diff --git a/fastfield_codecs/src/column.rs b/fastfield_codecs/src/column.rs index 00e0c092b..02dbb804f 100644 --- a/fastfield_codecs/src/column.rs +++ b/fastfield_codecs/src/column.rs @@ -152,6 +152,42 @@ impl<'a, T: Copy + PartialOrd + Send + Sync> Column for VecColumn<'a, T> { } } +impl<'a, T: Copy + PartialOrd> ColumnV2 for VecColumn<'a, T> { + fn get_val(&self, position: u64) -> Option { + Some(self.values[position as usize]) + } + + fn min_value(&self) -> T { + self.min_value + } + + fn max_value(&self) -> T { + self.max_value + } + + fn num_vals(&self) -> u64 { + self.values.len() as u64 + } +} + +impl<'a, T: Copy + PartialOrd> ColumnV2 for VecColumn<'a, Option> { + fn get_val(&self, position: u64) -> Option { + self.values[position as usize] + } + + fn min_value(&self) -> T { + self.min_value.unwrap() + } + + fn max_value(&self) -> T { + self.max_value.unwrap() + } + + fn num_vals(&self) -> u64 { + self.values.len() as u64 + } +} + impl<'a, T: Copy + Ord + Default, V> From<&'a V> for VecColumn<'a, T> where V: AsRef<[T]> + ?Sized { diff --git a/fastfield_codecs/src/compact_space/build_compact_space.rs b/fastfield_codecs/src/compact_space/build_compact_space.rs index 8cdf589f7..3b7006500 100644 --- a/fastfield_codecs/src/compact_space/build_compact_space.rs +++ b/fastfield_codecs/src/compact_space/build_compact_space.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeSet, BinaryHeap}; +use std::iter; use std::ops::RangeInclusive; use itertools::Itertools; @@ -9,35 +10,16 @@ use super::{CompactSpace, RangeMapping}; /// Put the blanks for the sorted values into a binary heap fn get_blanks(values_sorted: &BTreeSet) -> BinaryHeap { let mut blanks: BinaryHeap = BinaryHeap::new(); - let mut add_range = |blank_range: RangeInclusive| { - let blank_range: Result = blank_range.try_into(); - if let Ok(blank_range) = blank_range { - blanks.push(blank_range); - } - }; for (first, second) in values_sorted.iter().tuple_windows() { // Correctness Overflow: the values are deduped and sorted (BTreeSet property), that means // there's always space between two values. let blank_range = first + 1..=second - 1; - add_range(blank_range); - } - - // Replace after stabilization of https://github.com/rust-lang/rust/issues/62924 - // Add preceeding range if values don't start at 0 - if let Some(first_val) = values_sorted.iter().next() { - if *first_val != 0 { - let blank_range = 0..=first_val - 1; - add_range(blank_range); + let blank_range: Result = blank_range.try_into(); + if let Ok(blank_range) = blank_range { + blanks.push(blank_range); } } - // Add succeeding range if values don't end at u128::MAX - if let Some(last_val) = values_sorted.iter().last() { - if *last_val != u128::MAX { - let blank_range = last_val + 1..=u128::MAX; - add_range(blank_range); - } - } blanks } @@ -75,32 +57,46 @@ fn num_bits(val: u128) -> u8 { /// metadata. pub fn get_compact_space( values_deduped_sorted: &BTreeSet, - total_num_values: usize, + total_num_values: u64, cost_per_blank: usize, ) -> CompactSpace { - let mut compact_space = CompactSpaceBuilder::new(); + let mut compact_space_builder = CompactSpaceBuilder::new(); if values_deduped_sorted.is_empty() { - return compact_space.finish(); + return compact_space_builder.finish(); } let mut blanks: BinaryHeap = get_blanks(values_deduped_sorted); - let mut amplitude_compact_space = u128::MAX; + // Replace after stabilization of https://github.com/rust-lang/rust/issues/62924 + + // We start by space that's limited to min_value..=max_value + let min_value = *values_deduped_sorted.iter().next().unwrap_or(&0); + let max_value = *values_deduped_sorted.iter().last().unwrap_or(&0); + + // +1 for null, in case min and max covers the whole space, we are off by one. + let mut amplitude_compact_space = (max_value - min_value).saturating_add(1); + if min_value != 0 { + compact_space_builder.add_blanks(iter::once(0..=min_value - 1)); + } + if max_value != u128::MAX { + compact_space_builder.add_blanks(iter::once(max_value + 1..=u128::MAX)); + } + let mut amplitude_bits: u8 = num_bits(amplitude_compact_space); let mut blank_collector = BlankCollector::new(); - // We will stage blanks until they reduce the compact space by 1 bit. + // We will stage blanks until they reduce the compact space by at least 1 bit and then flush + // them if the metadata cost is lower than the total number of saved bits. // Binary heap to process the gaps by their size while let Some(blank_range) = blanks.pop() { blank_collector.stage_blank(blank_range); let staged_spaces_sum: u128 = blank_collector.staged_blanks_sum(); - // +1 for later added null value - let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum + 1; + let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum; let amplitude_new_bits = num_bits(amplitude_new_compact_space); if amplitude_bits == amplitude_new_bits { continue; } - let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values; + let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values as usize; // TODO: Maybe calculate exact cost of blanks and run this more expensive computation only, // when amplitude_new_bits changes let cost = blank_collector.num_staged_blanks() * cost_per_blank; @@ -116,7 +112,7 @@ pub fn get_compact_space( amplitude_compact_space = amplitude_new_compact_space; amplitude_bits = amplitude_new_bits; - compact_space.add_blanks(blank_collector.drain().map(|blank| blank.blank_range())); + compact_space_builder.add_blanks(blank_collector.drain().map(|blank| blank.blank_range())); } // special case, when we don't collected any blanks because: @@ -126,8 +122,8 @@ pub fn get_compact_space( // We drain one collected blank unconditionally, so the empty case is reserved for empty // data, and therefore empty compact_space means the data is empty and no data is covered // (conversely to all data) and we can assign null to it. - if compact_space.is_empty() { - compact_space.add_blanks( + if compact_space_builder.is_empty() { + compact_space_builder.add_blanks( blank_collector .drain() .map(|blank| blank.blank_range()) @@ -135,7 +131,14 @@ pub fn get_compact_space( ); } - compact_space.finish() + let compact_space = compact_space_builder.finish(); + if max_value - min_value != u128::MAX { + debug_assert_eq!( + compact_space.amplitude_compact_space(), + amplitude_compact_space + ); + } + compact_space } #[derive(Debug, Clone, Eq, PartialEq)] @@ -146,7 +149,7 @@ struct CompactSpaceBuilder { impl CompactSpaceBuilder { /// Creates a new compact space builder which will initially cover the whole space. fn new() -> Self { - Self { blanks: vec![] } + Self { blanks: Vec::new() } } /// Assumes that repeated add_blank calls don't overlap and are not adjacent, diff --git a/fastfield_codecs/src/compact_space/mod.rs b/fastfield_codecs/src/compact_space/mod.rs index d64bd923e..7428ca8e7 100644 --- a/fastfield_codecs/src/compact_space/mod.rs +++ b/fastfield_codecs/src/compact_space/mod.rs @@ -123,7 +123,7 @@ impl CompactSpace { fn amplitude_compact_space(&self) -> u128 { self.ranges_mapping .last() - .map(|last_range| last_range.compact_end() as u128 + 1) + .map(|last_range| last_range.compact_end() as u128) .unwrap_or(1) // compact space starts at 1, 0 == null } @@ -133,7 +133,7 @@ impl CompactSpace { /// Returns either Ok(the value in the compact space) or if it is outside the compact space the /// Err(position where it would be inserted) - fn to_compact(&self, value: u128) -> Result { + fn u128_to_compact(&self, value: u128) -> Result { self.ranges_mapping .binary_search_by(|probe| { let value_range = &probe.value_range; @@ -153,7 +153,7 @@ impl CompactSpace { } /// Unpacks a value from compact space u64 to u128 space - fn unpack(&self, compact: u64) -> u128 { + fn compact_to_u128(&self, compact: u64) -> u128 { let pos = self .ranges_mapping .binary_search_by_key(&compact, |range_mapping| range_mapping.compact_start) @@ -182,14 +182,39 @@ pub struct IPCodecParams { impl CompactSpaceCompressor { /// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals. - pub fn train_from( - vals: impl Iterator, - total_num_values_incl_nulls: usize, - ) -> Self { - let mut tree = BTreeSet::new(); - tree.extend(vals); - assert!(tree.len() <= total_num_values_incl_nulls); - train(&tree, total_num_values_incl_nulls) + pub fn train_from(column: impl ColumnV2) -> Self { + let mut values_sorted = BTreeSet::new(); + values_sorted.extend(column.iter().flatten()); + let total_num_values = column.num_vals(); + + let compact_space = + get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS); + let amplitude_compact_space = compact_space.amplitude_compact_space(); + + assert!( + amplitude_compact_space <= u64::MAX as u128, + "case unsupported." + ); + + let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64); + let min_value = *values_sorted.iter().next().unwrap_or(&0); + let max_value = *values_sorted.iter().last().unwrap_or(&0); + assert_eq!( + compact_space + .u128_to_compact(max_value) + .expect("could not convert max value to compact space"), + amplitude_compact_space as u64 + ); + CompactSpaceCompressor { + params: IPCodecParams { + compact_space, + bit_unpacker: BitUnpacker::new(num_bits), + min_value, + max_value, + num_vals: total_num_values as u64, + num_bits, + }, + } } fn write_footer(self, writer: &mut impl Write) -> io::Result<()> { @@ -216,12 +241,15 @@ impl CompactSpaceCompressor { let mut bitpacker = BitPacker::default(); for val in vals { let compact = if let Some(val) = val { - self.params.compact_space.to_compact(val).map_err(|_| { - io::Error::new( - io::ErrorKind::InvalidData, - "Could not convert value to compact_space. This is a bug.", - ) - })? + self.params + .compact_space + .u128_to_compact(val) + .map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidData, + "Could not convert value to compact_space. This is a bug.", + ) + })? } else { NULL_VALUE_COMPACT_SPACE }; @@ -233,36 +261,6 @@ impl CompactSpaceCompressor { } } -fn train(values_sorted: &BTreeSet, total_num_values: usize) -> CompactSpaceCompressor { - let compact_space = get_compact_space(values_sorted, total_num_values, COST_PER_BLANK_IN_BITS); - let amplitude_compact_space = compact_space.amplitude_compact_space(); - - assert!( - amplitude_compact_space <= u64::MAX as u128, - "case unsupported." - ); - - let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64); - let min_value = *values_sorted.iter().next().unwrap_or(&0); - let max_value = *values_sorted.iter().last().unwrap_or(&0); - assert!( - compact_space - .to_compact(max_value) - .expect("could not convert max value to compact space") - < amplitude_compact_space as u64 - ); - CompactSpaceCompressor { - params: IPCodecParams { - compact_space, - bit_unpacker: BitUnpacker::new(num_bits), - min_value, - max_value, - num_vals: total_num_values as u64, - num_bits, - }, - } -} - #[derive(Debug, Clone)] pub struct CompactSpaceDecompressor { data: OwnedBytes, @@ -353,12 +351,12 @@ impl CompactSpaceDecompressor { /// /// and we want a mapping for 1005, there is no equivalent compact space. We instead return an /// error with the index of the next range. - fn to_compact(&self, value: u128) -> Result { - self.params.compact_space.to_compact(value) + fn u128_to_compact(&self, value: u128) -> Result { + self.params.compact_space.u128_to_compact(value) } fn compact_to_u128(&self, compact: u64) -> u128 { - self.params.compact_space.unpack(compact) + self.params.compact_space.compact_to_u128(compact) } /// Comparing on compact space: 1.08 GElements/s, which equals a throughput of 17,3 Gb/s @@ -372,8 +370,8 @@ impl CompactSpaceDecompressor { let from_value = *range.start(); let to_value = *range.end(); assert!(to_value >= from_value); - let compact_from = self.to_compact(from_value); - let compact_to = self.to_compact(to_value); + let compact_from = self.u128_to_compact(from_value); + let compact_to = self.u128_to_compact(to_value); // Quick return, if both ranges fall into the same non-mapped space, the range can't cover // any values, so we can early exit @@ -477,6 +475,7 @@ impl CompactSpaceDecompressor { mod tests { use super::*; + use crate::VecColumn; #[test] fn compact_space_test() { @@ -485,12 +484,12 @@ mod tests { ] .into_iter() .collect(); - let compact_space = get_compact_space(ips, ips.len(), 11); + let compact_space = get_compact_space(ips, ips.len() as u64, 11); let amplitude = compact_space.amplitude_compact_space(); - assert_eq!(amplitude, 20); - assert_eq!(3, compact_space.to_compact(2).unwrap()); - assert_eq!(4, compact_space.to_compact(3).unwrap()); - assert_eq!(compact_space.to_compact(100).unwrap_err(), 1); + assert_eq!(amplitude, 17); + assert_eq!(1, compact_space.u128_to_compact(2).unwrap()); + assert_eq!(2, compact_space.u128_to_compact(3).unwrap()); + assert_eq!(compact_space.u128_to_compact(100).unwrap_err(), 1); for (num1, num2) in (0..3).tuple_windows() { assert_eq!( @@ -508,17 +507,17 @@ mod tests { ); for ip in ips { - let compact = compact_space.to_compact(*ip).unwrap(); - assert_eq!(compact_space.unpack(compact), *ip); + let compact = compact_space.u128_to_compact(*ip).unwrap(); + assert_eq!(compact_space.compact_to_u128(compact), *ip); } } #[test] fn compact_space_amplitude_test() { let ips = &[100000u128, 1000000].into_iter().collect(); - let compact_space = get_compact_space(ips, ips.len(), 1); + let compact_space = get_compact_space(ips, ips.len() as u64, 1); let amplitude = compact_space.amplitude_compact_space(); - assert_eq!(amplitude, 3); + assert_eq!(amplitude, 2); } fn test_all(data: OwnedBytes, expected: &[Option]) { @@ -547,10 +546,7 @@ mod tests { } fn test_aux_vals_opt(u128_vals: &[Option]) -> OwnedBytes { - let compressor = CompactSpaceCompressor::train_from( - u128_vals.iter().cloned().flatten(), - u128_vals.len(), - ); + let compressor = CompactSpaceCompressor::train_from(VecColumn::from(u128_vals)); let data = compressor.compress(u128_vals.iter().cloned()).unwrap(); let data = OwnedBytes::new(data); test_all(data.clone(), u128_vals); @@ -628,9 +624,8 @@ mod tests { #[test] fn test_null() { - let vals = &[2u128]; - let compressor = CompactSpaceCompressor::train_from(vals.iter().cloned(), 2); let vals = vec![None, Some(2u128)]; + let compressor = CompactSpaceCompressor::train_from(VecColumn::from(&vals)); let data = compressor.compress(vals.iter().cloned()).unwrap(); let decomp = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap(); let positions = decomp.get_range(0..=1); @@ -668,7 +663,7 @@ mod tests { 1_000_000, 5_000_000_000, ]; - let compressor = CompactSpaceCompressor::train_from(vals.iter().cloned(), vals.len()); + let compressor = CompactSpaceCompressor::train_from(VecColumn::from(vals)); let data = compressor.compress(vals.iter().cloned().map(Some)).unwrap(); let decomp = CompactSpaceDecompressor::open(OwnedBytes::new(data)).unwrap(); @@ -696,6 +691,12 @@ mod tests { let _data = test_aux_vals(vals); } + #[test] + fn test_bug4() { + let vals = &[340282366920938463463374607431768211455, 0]; + let _data = test_aux_vals(vals); + } + #[test] fn test_first_large_gaps() { let vals = &[1_000_000_000u128; 100]; diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs index 11d166816..ef39df0b9 100644 --- a/fastfield_codecs/src/main.rs +++ b/fastfield_codecs/src/main.rs @@ -93,8 +93,7 @@ fn bench_ip() { { let mut data = vec![]; for dataset in dataset.chunks(50_000) { - let compressor = - CompactSpaceCompressor::train_from(dataset.iter().cloned(), dataset.len()); + let compressor = CompactSpaceCompressor::train_from(VecColumn::from(dataset)); compressor .compress_into(dataset.iter().cloned().map(Some), &mut data) .unwrap(); @@ -107,7 +106,7 @@ fn bench_ip() { ); } - let compressor = CompactSpaceCompressor::train_from(dataset.iter().cloned(), dataset.len()); + let compressor = CompactSpaceCompressor::train_from(VecColumn::from(&dataset)); let data = compressor .compress(dataset.iter().cloned().map(Some)) .unwrap();