diff --git a/fastfield_codecs/Cargo.toml b/fastfield_codecs/Cargo.toml
index f66f91c96..2ee7e0093 100644
--- a/fastfield_codecs/Cargo.toml
+++ b/fastfield_codecs/Cargo.toml
@@ -16,7 +16,7 @@ prettytable-rs = {version="0.9.0", optional= true}
 rand = {version="0.8.3", optional= true}
 fastdivide = "0.4"
 log = "0.4"
-itertools = { version="0.10.3", optional=true}
+itertools = { version = "0.10.3", optional = true }
 measure_time = { version="0.8.2", optional=true}
 
 [dev-dependencies]
diff --git a/fastfield_codecs/src/compact_space.rs b/fastfield_codecs/src/compact_space.rs
index f62b99322..c8184913d 100644
--- a/fastfield_codecs/src/compact_space.rs
+++ b/fastfield_codecs/src/compact_space.rs
@@ -12,8 +12,7 @@
 /// The codec is created to compress ip addresses, but may be employed in other use cases.
 use std::{
     cmp::Ordering,
-    collections::BinaryHeap,
-    convert::{TryFrom, TryInto},
+    collections::BTreeSet,
     io::{self, Write},
     net::{IpAddr, Ipv6Addr},
     ops::RangeInclusive,
@@ -24,6 +23,10 @@ use ownedbytes::OwnedBytes;
 use tantivy_bitpacker::{self, BitPacker, BitUnpacker};
 
 use crate::column::{ColumnV2, ColumnV2Ext};
+use crate::compact_space::build_compact_space::get_compact_space;
+
+mod blank_range;
+mod build_compact_space;
 
 pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
     let ip_addr_v6: Ipv6Addr = match ip_addr {
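The body of `ip_to_u128` is cut off by the hunk context above. A standalone sketch of the mapping it performs (a reconstruction; the crate's exact body may differ): IPv4 addresses are embedded into the IPv6 space via `to_ipv6_mapped`, then the 16 octets are read big-endian.

```rust
use std::net::{IpAddr, Ipv6Addr};

// Reconstruction of the mapping, assuming to_ipv6_mapped + big-endian bytes.
fn ip_to_u128(ip_addr: IpAddr) -> u128 {
    let ip_addr_v6: Ipv6Addr = match ip_addr {
        IpAddr::V4(v4) => v4.to_ipv6_mapped(),
        IpAddr::V6(v6) => v6,
    };
    u128::from_be_bytes(ip_addr_v6.octets())
}

fn main() {
    let ip: IpAddr = "127.0.0.1".parse().unwrap();
    // IPv4-mapped addresses land in ::ffff:0:0/96, so IPv4 data clusters in
    // a tiny region of the u128 space - ideal for the blank-based codec.
    assert_eq!(ip_to_u128(ip), 0xffff_7f00_0001u128);
}
```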
@@ -39,256 +42,8 @@ pub fn ip_to_u128(ip_addr: IpAddr) -> u128 {
 /// The number is taken by looking at a real dataset. It is optimized for larger datasets.
 const COST_PER_BLANK_IN_BITS: usize = 36;
 
-/// The range of a blank in value space.
-///
-/// A blank is an unoccupied space in the data.
-/// Ordered by size
-///
-/// Use the try_into(), invalid ranges will be rejected.
-///
-/// TODO: move to own module to force try_into
-#[derive(Debug, Eq, PartialEq, Clone)]
-struct BlankRange {
-    blank_range: RangeInclusive<u128>,
-}
-impl TryFrom<RangeInclusive<u128>> for BlankRange {
-    type Error = &'static str;
-    fn try_from(range: RangeInclusive<u128>) -> Result<Self, Self::Error> {
-        let blank_size = range.end().saturating_sub(*range.start());
-        if blank_size < 2 {
-            Err("invalid range")
-        } else {
-            Ok(BlankRange { blank_range: range })
-        }
-    }
-}
-impl BlankRange {
-    fn blank_size(&self) -> u128 {
-        self.blank_range.end() - self.blank_range.start()
-    }
-}
-
-impl Ord for BlankRange {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.blank_size().cmp(&other.blank_size())
-    }
-}
-impl PartialOrd for BlankRange {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        self.blank_size().partial_cmp(&other.blank_size())
-    }
-}
-
-/// Put the blanks for the sorted values into a binary heap
-fn get_blanks(values_sorted: &[u128]) -> BinaryHeap<BlankRange> {
-    let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
-    let mut add_range = |blank_range: RangeInclusive<u128>| {
-        let blank_range: Result<BlankRange, _> = blank_range.try_into();
-        if let Ok(blank_range) = blank_range {
-            blanks.push(blank_range);
-        }
-    };
-    for values in values_sorted.windows(2) {
-        let blank_range = values[0] + 1..=values[1] - 1;
-        add_range(blank_range);
-    }
-    if let Some(first_val) = values_sorted.first().filter(|first_val| **first_val != 0) {
-        let blank_range = 0..=first_val - 1;
-        add_range(blank_range);
-    }
-
-    if let Some(last_val) = values_sorted
-        .last()
-        .filter(|last_val| **last_val != u128::MAX)
-    {
-        let blank_range = last_val + 1..=u128::MAX;
-        add_range(blank_range);
-    }
-    blanks
-}
-
-struct BlankCollector {
-    blanks: Vec<BlankRange>,
-    staged_blanks_sum: u128,
-}
-impl BlankCollector {
-    fn new() -> Self {
-        Self {
-            blanks: vec![],
-            staged_blanks_sum: 0,
-        }
-    }
-    fn stage_blank(&mut self, blank: BlankRange) {
-        self.staged_blanks_sum += blank.blank_size();
-        self.blanks.push(blank);
-    }
-    fn drain(&mut self) -> std::vec::Drain<'_, BlankRange> {
-        self.staged_blanks_sum = 0;
-        self.blanks.drain(..)
-    }
-    fn staged_blanks_sum(&self) -> u128 {
-        self.staged_blanks_sum
-    }
-    fn num_blanks(&self) -> usize {
-        self.blanks.len()
-    }
-}
-fn num_bits(val: u128) -> u8 {
-    (128u32 - val.leading_zeros()) as u8
-}
-
-/// Will collect blanks and add them to compact space if more bits are saved than cost from
-/// metadata.
-fn get_compact_space(
-    values_deduped_sorted: &[u128],
-    total_num_values: usize,
-    cost_per_blank: usize,
-) -> CompactSpace {
-    let mut blanks = get_blanks(values_deduped_sorted);
-    let mut amplitude_compact_space = u128::MAX;
-    let mut amplitude_bits: u8 = num_bits(amplitude_compact_space);
-
-    let mut compact_space = CompactSpaceBuilder::new();
-    if values_deduped_sorted.is_empty() {
-        return compact_space.finish();
-    }
-
-    let mut blank_collector = BlankCollector::new();
-    // We will stage blanks until they reduce the compact space by 1 bit.
-    // Binary heap to process the gaps by their size
-    while let Some(blank_range) = blanks.pop() {
-        blank_collector.stage_blank(blank_range);
-
-        let staged_spaces_sum: u128 = blank_collector.staged_blanks_sum();
-        // +1 for later added null value
-        let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1;
-        let amplitude_new_bits = num_bits(amplitude_new_compact_space);
-        if amplitude_bits == amplitude_new_bits {
-            continue;
-        }
-        let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values;
-        // TODO: Maybe calculate exact cost of blanks and run this more expensive computation only,
-        // when amplitude_new_bits changes
-        let cost = blank_collector.num_blanks() * cost_per_blank;
-        if cost >= saved_bits {
-            // Continue here, since although we walk over the blanks by size,
-            // we can potentially save a lot at the last bits, which are smaller blanks
-            //
-            // E.g. if the first range reduces the compact space by 1000 from 2000 to 1000, which
-            // saves 11-10=1 bit and the next range reduces the compact space by 950 to
-            // 50, which saves 10-6=4 bit
-            continue;
-        }
-
-        amplitude_compact_space = amplitude_new_compact_space;
-        amplitude_bits = amplitude_new_bits;
-        compact_space.add_blanks(blank_collector.drain().map(|blank| blank.blank_range));
-    }
-
-    // special case, when we don't collected any blanks because:
-    // * the data is empty
-    // * the algorithm did decide it's not worth the cost, which can be the case for single values
-    //
-    // We drain one collected blank unconditionally, so the empty case is reserved for empty
-    // data, and therefore empty compact_space means the data is empty and no data is covered
-    // (conversely to all data) and we can assign null to it.
-    if compact_space.is_empty() {
-        compact_space.add_blanks(
-            blank_collector
-                .drain()
-                .map(|blank| blank.blank_range)
-                .take(1),
-        );
-    }
-
-    compact_space.finish()
-}
-
 #[derive(Debug, Clone, Eq, PartialEq)]
-struct CompactSpaceBuilder {
-    blanks: Vec<RangeInclusive<u128>>,
-}
-
-impl CompactSpaceBuilder {
-    /// Creates a new compact space builder which will initially cover the whole space.
-    fn new() -> Self {
-        Self { blanks: vec![] }
-    }
-
-    /// Assumes that repeated add_blank calls don't overlap and are not adjacent,
-    /// e.g. [3..=5, 5..=10] is not allowed
-    ///
-    /// Both of those assumptions are true when blanks are produced from sorted values.
-    fn add_blanks(&mut self, blank: impl Iterator<Item = RangeInclusive<u128>>) {
-        self.blanks.extend(blank);
-    }
-
-    fn is_empty(&self) -> bool {
-        self.blanks.is_empty()
-    }
-
-    /// Convert blanks to covered space and assign null value
-    fn finish(mut self) -> CompactSpace {
-        // sort by start since ranges are not allowed to overlap
-        self.blanks.sort_by_key(|blank| *blank.start());
-
-        // Between the blanks
-        let mut covered_space = self
-            .blanks
-            .windows(2)
-            .map(|blanks| {
-                assert!(
-                    blanks[0].end() < blanks[1].start(),
-                    "overlapping or adjacent ranges detected"
-                );
-                *blanks[0].end() + 1..=*blanks[1].start() - 1
-            })
-            .collect::<Vec<_>>();
-
-        // Outside the blanks
-        if let Some(first_blank_start) = self.blanks.first().map(RangeInclusive::start) {
-            if *first_blank_start != 0 {
-                covered_space.insert(0, 0..=first_blank_start - 1);
-            }
-        }
-
-        if let Some(last_blank_end) = self.blanks.last().map(RangeInclusive::end) {
-            if *last_blank_end != u128::MAX {
-                covered_space.push(last_blank_end + 1..=u128::MAX);
-            }
-        }
-
-        // Extend the first range and assign the null value to it.
-        let null_value = if let Some(first_covered_space) = covered_space.first_mut() {
-            // in case the first covered space ends at u128::MAX, assign null to the beginning
-            if *first_covered_space.end() == u128::MAX {
-                *first_covered_space = first_covered_space.start() - 1..=*first_covered_space.end();
-                *first_covered_space.start()
-            } else {
-                *first_covered_space = *first_covered_space.start()..=first_covered_space.end() + 1;
-                *first_covered_space.end()
-            }
-        } else {
-            covered_space.push(0..=0); // empty data case
-            0u128
-        };
-
-        let mut compact_start: u64 = 0;
-        let mut ranges_and_compact_start = Vec::with_capacity(covered_space.len());
-        for cov in covered_space {
-            let covered_range_len = cov.end() - cov.start() + 1; // e.g. 0..=1 covered space 1-0+1= 2
-            ranges_and_compact_start.push((cov, compact_start));
-            compact_start += covered_range_len as u64;
-        }
-        CompactSpace {
-            ranges_and_compact_start,
-            null_value,
-        }
-    }
-}
-
-#[derive(Debug, Clone, Eq, PartialEq)]
-struct CompactSpace {
+pub struct CompactSpace {
     ranges_and_compact_start: Vec<(RangeInclusive<u128>, u64)>,
     pub null_value: u128,
 }
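To make `ranges_and_compact_start` concrete: each covered range is stored together with the compact-space offset at which it begins, so a lookup is "find the covered range, then add the offset within it". A minimal sketch with a hypothetical `to_compact` helper (the crate's own lookup may be implemented differently):

```rust
use std::ops::RangeInclusive;

// Hypothetical helper mirroring the (range, compact_start) pairs above.
fn to_compact(ranges: &[(RangeInclusive<u128>, u64)], value: u128) -> Option<u64> {
    ranges
        .iter()
        .find(|(range, _)| range.contains(&value))
        .map(|(range, compact_start)| compact_start + (value - range.start()) as u64)
}

fn main() {
    // Two covered ranges packed back to back: 10..=20 occupies compact
    // values 0..=10, so 1000..=1010 starts at compact value 11.
    let ranges = vec![(10u128..=20, 0u64), (1000u128..=1010, 11u64)];
    assert_eq!(to_compact(&ranges, 10), Some(0));
    assert_eq!(to_compact(&ranges, 1005), Some(16));
    assert_eq!(to_compact(&ranges, 500), None); // value falls in a blank
}
```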
@@ -400,24 +155,19 @@ pub struct IPCodecParams {
 }
 
 impl CompactSpaceCompressor {
-    pub fn null_value(&self) -> u128 {
-        self.params.null_value
+    pub fn null_value_compact_space(&self) -> u64 {
+        self.params.null_value_compact_space
     }
-    /// Taking the vals as Vec may cost a lot of memory.
-    /// It is used to sort the vals.
-    ///
-    /// The lower memory alternative to just store the index (u32) and use that as sorting may be an
-    /// issue for the merge case, where random access is more expensive.
-    ///
-    /// TODO: Should we take Option<u128> here? (better api, but 24bytes instead 16 per element)
-    pub fn train_from(mut vals: Vec<u128>) -> Self {
-        let total_num_values = vals.len(); // TODO: Null values should be here too
-        vals.sort();
-        // We don't care for duplicates
-        vals.dedup();
-        vals.shrink_to_fit();
-        train(&vals, total_num_values)
+    /// Collecting the vals into a BTreeSet to sort and dedup them may cost a lot of memory.
+    pub fn train_from(
+        vals: impl Iterator<Item = u128>,
+        total_num_values_incl_nulls: usize,
+    ) -> Self {
+        let mut tree = BTreeSet::new();
+        tree.extend(vals);
+        assert!(tree.len() <= total_num_values_incl_nulls);
+        train(&tree, total_num_values_incl_nulls)
     }
 
     fn to_compact(&self, value: u128) -> u64 {
@@ -435,22 +185,25 @@ impl CompactSpaceCompressor {
         Ok(())
     }
 
-    pub fn compress(self, vals: impl Iterator<Item = u128>) -> io::Result<Vec<u8>> {
+    pub fn compress(self, vals: impl Iterator<Item = Option<u128>>) -> io::Result<Vec<u8>> {
         let mut output = vec![];
         self.compress_into(vals, &mut output)?;
         Ok(output)
     }
 
-    /// TODO: Should we take Option<u128> here? Otherwise the caller has to replace None with
-    /// `self.null_value()`
+
     pub fn compress_into(
         self,
-        vals: impl Iterator<Item = u128>,
+        vals: impl Iterator<Item = Option<u128>>,
         write: &mut impl Write,
     ) -> io::Result<()> {
         let mut bitpacker = BitPacker::default();
         let mut num_vals = 0;
         for val in vals {
-            let compact = self.to_compact(val);
+            let compact = if let Some(val) = val {
+                self.to_compact(val)
+            } else {
+                self.null_value_compact_space()
+            };
             bitpacker
                 .write(compact, self.params.num_bits, write)
                 .unwrap();
@@ -462,7 +215,7 @@ impl CompactSpaceCompressor {
     }
 }
 
-fn train(values_sorted: &[u128], total_num_values: usize) -> CompactSpaceCompressor {
+fn train(values_sorted: &BTreeSet<u128>, total_num_values: usize) -> CompactSpaceCompressor {
     let compact_space = get_compact_space(values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
     let null_value = compact_space.null_value;
     let null_compact_space = compact_space
@@ -476,8 +229,8 @@ fn train(values_sorted: &BTreeSet<u128>, total_num_values: usize) -> CompactSpaceCompres
     );
     let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64);
 
-    let min_value = *values_sorted.first().unwrap_or(&0);
-    let max_value = *values_sorted.last().unwrap_or(&0);
+    let min_value = *values_sorted.iter().next().unwrap_or(&0);
+    let max_value = *values_sorted.iter().last().unwrap_or(&0);
     let compressor = CompactSpaceCompressor {
         params: IPCodecParams {
             compact_space,
@@ -491,8 +244,8 @@ fn train(values_sorted: &BTreeSet<u128>, total_num_values: usize) -> CompactSpaceCompres
         },
     };
 
-    let max_value = *values_sorted.last().unwrap_or(&0u128).max(&null_value);
-    assert!(compressor.to_compact(max_value) < amplitude_compact_space as u64);
+    let max_value_in_value_space = max_value.max(null_value);
+    assert!(compressor.to_compact(max_value_in_value_space) < amplitude_compact_space as u64);
 
     compressor
 }
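With the new signatures, callers pass nulls as `None` instead of substituting a sentinel value. A hedged usage sketch (illustrative, not a doc test from the crate):

```rust
use fastfield_codecs::CompactSpaceCompressor;

// Train on the present values only, but report the total row count
// (including nulls) so the cost model sees the real number of values.
fn compress_with_nulls(vals: &[Option<u128>]) -> std::io::Result<Vec<u8>> {
    let compressor =
        CompactSpaceCompressor::train_from(vals.iter().flatten().copied(), vals.len());
    compressor.compress(vals.iter().copied())
}
```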
@@ -700,27 +453,13 @@ mod tests {
 
     use super::*;
 
-    #[test]
-    fn test_binary_heap_pop_order() {
-        let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
-        blanks.push(BlankRange {
-            blank_range: 0..=10,
-        });
-        blanks.push(BlankRange {
-            blank_range: 100..=200,
-        });
-        blanks.push(BlankRange {
-            blank_range: 100..=110,
-        });
-        assert_eq!(blanks.pop().unwrap().blank_size(), 100);
-        assert_eq!(blanks.pop().unwrap().blank_size(), 10);
-    }
-
     #[test]
     fn compact_space_test() {
         let ips = &[
             2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260,
-        ];
+        ]
+        .into_iter()
+        .collect();
         let compact_space = get_compact_space(ips, ips.len(), 11);
         assert_eq!(compact_space.null_value, 5);
         let amplitude = compact_space.amplitude_compact_space();
@@ -745,7 +484,7 @@ mod tests {
 
     #[test]
     fn compact_space_amplitude_test() {
-        let ips = &[100000u128, 1000000];
+        let ips = &[100000u128, 1000000].into_iter().collect();
         let compact_space = get_compact_space(ips, ips.len(), 1);
         assert_eq!(compact_space.null_value, 100001);
         let amplitude = compact_space.amplitude_compact_space();
@@ -754,8 +493,7 @@ mod tests {
     fn test_all(data: OwnedBytes, expected: &[u128]) {
         let decompressor = CompactSpaceDecompressor::open(data).unwrap();
 
-        for idx in 0..decompressor.params.num_vals as usize {
-            let expected_val = expected[idx];
+        for (idx, expected_val) in expected.iter().cloned().enumerate() {
             let val = decompressor.get(idx as u64);
             assert_eq!(val, Some(expected_val));
             let positions = decompressor.get_range(expected_val.saturating_sub(1)..=expected_val);
@@ -771,8 +509,11 @@ mod tests {
     }
 
     fn test_aux_vals(u128_vals: &[u128]) -> OwnedBytes {
-        let compressor = CompactSpaceCompressor::train_from(u128_vals.to_vec());
-        let data = compressor.compress(u128_vals.iter().cloned()).unwrap();
+        let compressor =
+            CompactSpaceCompressor::train_from(u128_vals.iter().cloned(), u128_vals.len());
+        let data = compressor
+            .compress(u128_vals.iter().cloned().map(Some))
+            .unwrap();
         let data = OwnedBytes::new(data);
         test_all(data.clone(), u128_vals);
         data
@@ -846,8 +587,8 @@ mod tests {
     #[test]
     fn test_null() {
         let vals = &[2u128];
-        let compressor = CompactSpaceCompressor::train_from(vals.to_vec());
-        let vals = vec![compressor.null_value(), 2u128];
+        let compressor = CompactSpaceCompressor::train_from(vals.iter().cloned(), vals.len());
+        let vals = vec![None, Some(2u128)];
         let data = compressor.compress(vals.iter().cloned()).unwrap();
         let decomp = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
         let positions = decomp.get_range(0..=1);
@@ -885,9 +626,8 @@ mod tests {
             1_000_000,
             5_000_000_000,
         ];
-        let compressor = CompactSpaceCompressor::train_from(vals.to_vec());
-        // let vals = vec![compressor.null_value(), 2u128];
-        let data = compressor.compress(vals.iter().cloned()).unwrap();
+        let compressor = CompactSpaceCompressor::train_from(vals.iter().cloned(), vals.len());
+        let data = compressor.compress(vals.iter().cloned().map(Some)).unwrap();
         let decomp = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
 
         assert_eq!(decomp.get_range(199..=200), vec![0]);
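The round trip that `test_aux_vals` and `test_null` exercise looks like this in isolation (a sketch assuming the API shapes visible in this diff; `get_range` returns the positions of documents whose value lies in the queried value range):

```rust
use fastfield_codecs::{CompactSpaceCompressor, CompactSpaceDecompressor};
use ownedbytes::OwnedBytes;

fn main() {
    let vals: Vec<u128> = vec![2, 4, 1000, 1001, 1002];
    let compressor = CompactSpaceCompressor::train_from(vals.iter().copied(), vals.len());
    let data = compressor.compress(vals.iter().copied().map(Some)).unwrap();

    let decompressor = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
    // Positions (document ordinals) of all values in 1000..=1002.
    assert_eq!(decompressor.get_range(1000..=1002), vec![2, 3, 4]);
}
```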
diff --git a/fastfield_codecs/src/compact_space/blank_range.rs b/fastfield_codecs/src/compact_space/blank_range.rs
new file mode 100644
index 000000000..b68508318
--- /dev/null
+++ b/fastfield_codecs/src/compact_space/blank_range.rs
@@ -0,0 +1,42 @@
+use std::ops::RangeInclusive;
+
+/// The range of a blank in value space.
+///
+/// A blank is an unoccupied space in the data.
+/// Ordered by size.
+///
+/// Use try_into(); invalid ranges will be rejected.
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub(crate) struct BlankRange {
+    blank_range: RangeInclusive<u128>,
+}
+impl TryFrom<RangeInclusive<u128>> for BlankRange {
+    type Error = &'static str;
+    fn try_from(range: RangeInclusive<u128>) -> Result<Self, Self::Error> {
+        let blank_size = range.end().saturating_sub(*range.start());
+        if blank_size < 2 {
+            Err("invalid range")
+        } else {
+            Ok(BlankRange { blank_range: range })
+        }
+    }
+}
+impl BlankRange {
+    pub(crate) fn blank_size(&self) -> u128 {
+        self.blank_range.end() - self.blank_range.start()
+    }
+    pub(crate) fn blank_range(&self) -> RangeInclusive<u128> {
+        self.blank_range.clone()
+    }
+}
+
+impl Ord for BlankRange {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.blank_size().cmp(&other.blank_size())
+    }
+}
+impl PartialOrd for BlankRange {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        self.blank_size().partial_cmp(&other.blank_size())
+    }
+}
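This file encodes two invariants: `try_from` rejects ranges that span fewer than two values, and `Ord` compares by blank size, so a `BinaryHeap` pops the largest gap first. A standalone sketch of both behaviors (plain std types, not the `BlankRange` wrapper):

```rust
use std::collections::BinaryHeap;
use std::ops::RangeInclusive;

// Size of a gap, or None if it is too small to be worth tracking.
fn blank_size(range: &RangeInclusive<u128>) -> Option<u128> {
    let size = range.end().saturating_sub(*range.start());
    (size >= 2).then_some(size)
}

fn main() {
    assert_eq!(blank_size(&(5..=6)), None); // too small: rejected
    let mut heap: BinaryHeap<u128> = [0..=10, 100..=200, 100..=110]
        .iter()
        .filter_map(blank_size)
        .collect();
    assert_eq!(heap.pop(), Some(100)); // largest blank first
    assert_eq!(heap.pop(), Some(10));
}
```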
diff --git a/fastfield_codecs/src/compact_space/build_compact_space.rs b/fastfield_codecs/src/compact_space/build_compact_space.rs
new file mode 100644
index 000000000..b088729d4
--- /dev/null
+++ b/fastfield_codecs/src/compact_space/build_compact_space.rs
@@ -0,0 +1,237 @@
+use std::collections::{BTreeSet, BinaryHeap};
+use std::ops::RangeInclusive;
+
+use itertools::Itertools;
+
+use super::blank_range::BlankRange;
+use super::CompactSpace;
+
+/// Put the blanks for the sorted values into a binary heap
+fn get_blanks(values_sorted: &BTreeSet<u128>) -> BinaryHeap<BlankRange> {
+    let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
+    let mut add_range = |blank_range: RangeInclusive<u128>| {
+        let blank_range: Result<BlankRange, _> = blank_range.try_into();
+        if let Ok(blank_range) = blank_range {
+            blanks.push(blank_range);
+        }
+    };
+    for (first, second) in values_sorted.iter().tuple_windows() {
+        // Correctness (overflow): the values are deduped and sorted (BTreeSet property), which
+        // means there is always space between two values.
+        let blank_range = first + 1..=second - 1;
+        add_range(blank_range);
+    }
+
+    // Replace after stabilization of https://github.com/rust-lang/rust/issues/62924
+    // Add preceding range if values don't start at 0
+    if let Some(first_val) = values_sorted.iter().next() {
+        if *first_val != 0 {
+            let blank_range = 0..=first_val - 1;
+            add_range(blank_range);
+        }
+    }
+
+    // Add succeeding range if values don't end at u128::MAX
+    if let Some(last_val) = values_sorted.iter().last() {
+        if *last_val != u128::MAX {
+            let blank_range = last_val + 1..=u128::MAX;
+            add_range(blank_range);
+        }
+    }
+    blanks
+}
+
+struct BlankCollector {
+    blanks: Vec<BlankRange>,
+    staged_blanks_sum: u128,
+}
+impl BlankCollector {
+    fn new() -> Self {
+        Self {
+            blanks: vec![],
+            staged_blanks_sum: 0,
+        }
+    }
+    fn stage_blank(&mut self, blank: BlankRange) {
+        self.staged_blanks_sum += blank.blank_size();
+        self.blanks.push(blank);
+    }
+    fn drain(&mut self) -> std::vec::Drain<'_, BlankRange> {
+        self.staged_blanks_sum = 0;
+        self.blanks.drain(..)
+    }
+    fn staged_blanks_sum(&self) -> u128 {
+        self.staged_blanks_sum
+    }
+    fn num_blanks(&self) -> usize {
+        self.blanks.len()
+    }
+}
+fn num_bits(val: u128) -> u8 {
+    (128u32 - val.leading_zeros()) as u8
+}
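Before `get_compact_space` below, it helps to see the cost model with numbers (illustrative values, not from a real dataset): removing blanks shrinks the amplitude of the compact space, and a batch of staged blanks pays off only if the bits saved across all values exceed the 36-bit metadata cost per blank.

```rust
fn num_bits(val: u128) -> u8 {
    (128u32 - val.leading_zeros()) as u8
}

fn main() {
    let total_num_values = 10_000usize;
    let cost_per_blank = 36usize; // COST_PER_BLANK_IN_BITS

    // Suppose staged blanks shrink the amplitude from 2^40 to 2^20:
    let saved_per_value = num_bits(1 << 40) - num_bits(1 << 20); // 41 - 21 = 20
    let saved_bits = saved_per_value as usize * total_num_values; // 200_000

    // Even 100 staged blanks cost only 3_600 bits of metadata,
    // far below the savings, so they would be accepted.
    assert!(100 * cost_per_blank < saved_bits);
}
```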
+
+/// Collects blanks and adds them to the compact space if they save more bits than their
+/// metadata costs.
+pub fn get_compact_space(
+    values_deduped_sorted: &BTreeSet<u128>,
+    total_num_values: usize,
+    cost_per_blank: usize,
+) -> CompactSpace {
+    let mut blanks = get_blanks(values_deduped_sorted);
+    let mut amplitude_compact_space = u128::MAX;
+    let mut amplitude_bits: u8 = num_bits(amplitude_compact_space);
+
+    let mut compact_space = CompactSpaceBuilder::new();
+    if values_deduped_sorted.is_empty() {
+        return compact_space.finish();
+    }
+
+    let mut blank_collector = BlankCollector::new();
+    // We will stage blanks until they reduce the compact space by at least 1 bit.
+    // Binary heap to process the gaps by their size
+    while let Some(blank_range) = blanks.pop() {
+        blank_collector.stage_blank(blank_range);
+
+        let staged_spaces_sum: u128 = blank_collector.staged_blanks_sum();
+        // +1 for the null value that is added later
+        let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1;
+        let amplitude_new_bits = num_bits(amplitude_new_compact_space);
+        if amplitude_bits == amplitude_new_bits {
+            continue;
+        }
+        let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values;
+        // TODO: Maybe calculate the exact cost of the blanks and run this more expensive
+        // computation only when amplitude_new_bits changes
+        let cost = blank_collector.num_blanks() * cost_per_blank;
+        if cost >= saved_bits {
+            // Continue here: although we walk over the blanks by size, the smaller blanks
+            // further down can still save a lot of bits.
+            //
+            // E.g. if the first range reduces the compact space by 1000 from 2000 to 1000, which
+            // saves 11-10=1 bit, the next range may reduce the compact space by 950 to
+            // 50, which saves 10-6=4 bits
+            continue;
+        }
+
+        amplitude_compact_space = amplitude_new_compact_space;
+        amplitude_bits = amplitude_new_bits;
+        compact_space.add_blanks(blank_collector.drain().map(|blank| blank.blank_range()));
+    }
+
+    // Special case: we didn't collect any blanks because
+    // * the data is empty (early exit), or
+    // * the algorithm decided it's not worth the cost, which can be the case for single values.
+    //
+    // We drain one collected blank unconditionally, so the empty case is reserved for empty
+    // data; an empty compact_space therefore means the data is empty, nothing is covered
+    // (conversely to all data), and we can assign null to it.
+    if compact_space.is_empty() {
+        compact_space.add_blanks(
+            blank_collector
+                .drain()
+                .map(|blank| blank.blank_range())
+                .take(1),
+        );
+    }
+
+    compact_space.finish()
+}
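The `continue` in the loop above is load-bearing; executing the numbers from its comment shows why the scan must not stop at the first rejected blank:

```rust
fn num_bits(val: u128) -> u8 {
    (128u32 - val.leading_zeros()) as u8
}

fn main() {
    // First blank: amplitude 2000 -> 1000 saves only 11 - 10 = 1 bit per value.
    assert_eq!(num_bits(2000) - num_bits(1000), 1);
    // A later, smaller blank: amplitude 1000 -> 50 saves 10 - 6 = 4 bits per value.
    assert_eq!(num_bits(1000) - num_bits(50), 4);
}
```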
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+struct CompactSpaceBuilder {
+    blanks: Vec<RangeInclusive<u128>>,
+}
+
+impl CompactSpaceBuilder {
+    /// Creates a new compact space builder which will initially cover the whole space.
+    fn new() -> Self {
+        Self { blanks: vec![] }
+    }
+
+    /// Assumes that repeated add_blanks calls don't overlap and are not adjacent,
+    /// e.g. [3..=5, 5..=10] is not allowed.
+    ///
+    /// Both of those assumptions are true when blanks are produced from sorted values.
+    fn add_blanks(&mut self, blank: impl Iterator<Item = RangeInclusive<u128>>) {
+        self.blanks.extend(blank);
+    }
+
+    fn is_empty(&self) -> bool {
+        self.blanks.is_empty()
+    }
+
+    /// Convert blanks to covered space and assign the null value
+    fn finish(mut self) -> CompactSpace {
+        // Sort by start since ranges are not allowed to overlap
+        self.blanks.sort_by_key(|blank| *blank.start());
+
+        // Between the blanks
+        let mut covered_space = self
+            .blanks
+            .windows(2)
+            .map(|blanks| {
+                assert!(
+                    blanks[0].end() < blanks[1].start(),
+                    "overlapping or adjacent ranges detected"
+                );
+                *blanks[0].end() + 1..=*blanks[1].start() - 1
+            })
+            .collect::<Vec<_>>();
+
+        // Outside the blanks
+        if let Some(first_blank_start) = self.blanks.first().map(RangeInclusive::start) {
+            if *first_blank_start != 0 {
+                covered_space.insert(0, 0..=first_blank_start - 1);
+            }
+        }
+
+        if let Some(last_blank_end) = self.blanks.last().map(RangeInclusive::end) {
+            if *last_blank_end != u128::MAX {
+                covered_space.push(last_blank_end + 1..=u128::MAX);
+            }
+        }
+
+        // Extend the first range and assign the null value to it.
+        let null_value = if let Some(first_covered_space) = covered_space.first_mut() {
+            // In case the first covered space ends at u128::MAX, assign null to the beginning
+            if *first_covered_space.end() == u128::MAX {
+                *first_covered_space = first_covered_space.start() - 1..=*first_covered_space.end();
+                *first_covered_space.start()
+            } else {
+                *first_covered_space = *first_covered_space.start()..=first_covered_space.end() + 1;
+                *first_covered_space.end()
+            }
+        } else {
+            covered_space.push(0..=0); // empty data case
+            0u128
+        };
+
+        let mut compact_start: u64 = 0;
+        let mut ranges_and_compact_start = Vec::with_capacity(covered_space.len());
+        for cov in covered_space {
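`finish` complements the blanks into covered ranges over the full u128 space (plus the null-value extension). A simplified standalone version of just the complement step, assuming sorted, non-adjacent blanks that do not touch `u128::MAX`:

```rust
use std::ops::RangeInclusive;

fn blanks_to_covered(blanks: &[RangeInclusive<u128>]) -> Vec<RangeInclusive<u128>> {
    let mut covered = Vec::new();
    let mut next_start = 0u128;
    for blank in blanks {
        if *blank.start() > next_start {
            covered.push(next_start..=blank.start() - 1);
        }
        next_start = blank.end() + 1; // safe: blanks don't touch u128::MAX here
    }
    covered.push(next_start..=u128::MAX);
    covered
}

fn main() {
    let covered = blanks_to_covered(&[5..=10, 100..=200]);
    assert_eq!(covered, vec![0..=4, 11..=99, 201..=u128::MAX]);
}
```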
+            let covered_range_len = cov.end() - cov.start() + 1; // e.g. 0..=1 covers two values: 1 - 0 + 1 = 2
+            ranges_and_compact_start.push((cov, compact_start));
+            compact_start += covered_range_len as u64;
+        }
+        CompactSpace {
+            ranges_and_compact_start,
+            null_value,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_binary_heap_pop_order() {
+        let mut blanks: BinaryHeap<BlankRange> = BinaryHeap::new();
+        blanks.push((0..=10).try_into().unwrap());
+        blanks.push((100..=200).try_into().unwrap());
+        blanks.push((100..=110).try_into().unwrap());
+        assert_eq!(blanks.pop().unwrap().blank_size(), 100);
+        assert_eq!(blanks.pop().unwrap().blank_size(), 10);
+    }
+}
diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs
index 439d9ecc6..11d166816 100644
--- a/fastfield_codecs/src/main.rs
+++ b/fastfield_codecs/src/main.rs
@@ -7,9 +7,12 @@ use std::net::{IpAddr, Ipv6Addr};
 use std::str::FromStr;
 
 use fastfield_codecs::{
-    Column, CompactSpaceCompressor, FastFieldCodecType, FastFieldStats, VecColumn,
+    Column, CompactSpaceCompressor, CompactSpaceDecompressor, FastFieldCodecType, FastFieldStats,
+    VecColumn,
 };
 use itertools::Itertools;
+use measure_time::print_time;
+use ownedbytes::OwnedBytes;
 use prettytable::{Cell, Row, Table};
 
 fn print_set_stats(ip_addrs: &[u128]) {
@@ -51,12 +54,6 @@ fn print_set_stats(ip_addrs: &[u128]) {
             b.1.cmp(&a.1)
         }
     });
-
-    // println!("\n\n----\nIP Address histogram");
-    // println!("IPAddrCount\tFrequency");
-    // for (ip_addr_count, times) in cnts {
-    //     println!("{}\t{}", ip_addr_count, times);
-    //}
 }
 
 fn ip_dataset() -> Vec<u128> {
@@ -96,9 +93,10 @@ fn bench_ip() {
     {
         let mut data = vec![];
         for dataset in dataset.chunks(50_000) {
-            let compressor = CompactSpaceCompressor::train_from(dataset.to_vec());
+            let compressor =
+                CompactSpaceCompressor::train_from(dataset.iter().cloned(), dataset.len());
             compressor
-                .compress_into(dataset.iter().cloned(), &mut data)
+                .compress_into(dataset.iter().cloned().map(Some), &mut data)
                 .unwrap();
         }
         let compression = data.len() as f64 / (dataset.len() * 16) as f64;
@@ -109,8 +107,10 @@ fn bench_ip() {
        println!(
             "Compression 50_000 chunks {:.2}",
             compression
         );
     }
 
-    let compressor = CompactSpaceCompressor::train_from(dataset.to_vec());
-    let data = compressor.compress(dataset.iter().cloned()).unwrap();
+    let compressor = CompactSpaceCompressor::train_from(dataset.iter().cloned(), dataset.len());
+    let data = compressor
+        .compress(dataset.iter().cloned().map(Some))
+        .unwrap();
     let compression = data.len() as f64 / (dataset.len() * 16) as f64;
     println!("Compression {:.2}", compression);
@@ -119,12 +119,13 @@ fn bench_ip() {
     println!(
         "Num bits per elem: {}",
         (data.len() * 8) as f32 / dataset.len() as f32
     );
 
-    // let decompressor = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
-    // for i in 11100..11150 {
-    //     print_time!("get range");
-    //     let doc_values = decompressor.get_range(dataset[i]..=dataset[i]);
-    //     println!("{:?}", doc_values.len());
-    //}
+    let decompressor = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap();
+    // Sample some ranges
+    for value in dataset.iter().take(1110).skip(1100).cloned() {
+        print_time!("get range");
+        let doc_values = decompressor.get_range(value..=value);
+        println!("{:?}", doc_values.len());
+    }
 }
 
 fn main() {
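For reference, the two metrics the benchmark prints are the same measurement in different units (a u128 is 16 raw bytes); with made-up numbers:

```rust
fn main() {
    let num_vals = 50_000usize;
    let compressed_len = 25_000usize; // hypothetical output size in bytes

    // Fraction of the raw size (16 bytes per u128)...
    let compression = compressed_len as f64 / (num_vals * 16) as f64;
    // ...and the same measurement expressed as bits per element.
    let bits_per_elem = (compressed_len * 8) as f32 / num_vals as f32;

    assert!((compression - 0.03125).abs() < 1e-9);
    assert_eq!(bits_per_elem, 4.0);
}
```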