diff --git a/common/src/lib.rs b/common/src/lib.rs
index 0a21fbbc5..4463c46e1 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -11,7 +11,10 @@ mod writer;
 
 pub use bitset::*;
 pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};
-pub use vint::{read_u32_vint, read_u32_vint_no_advance, serialize_vint_u32, write_u32_vint, VInt};
+pub use vint::{
+    deserialize_vint_u128, read_u32_vint, read_u32_vint_no_advance, serialize_vint_u128,
+    serialize_vint_u32, write_u32_vint, VInt, VIntU128,
+};
 pub use writer::{AntiCallToken, CountingWriter, TerminatingWrite};
 
 /// Has length trait
diff --git a/common/src/vint.rs b/common/src/vint.rs
index 0385a7f6a..de11451df 100644
--- a/common/src/vint.rs
+++ b/common/src/vint.rs
@@ -5,6 +5,75 @@ use byteorder::{ByteOrder, LittleEndian};
 
 use super::BinarySerializable;
 
+/// Variable int serializes a u128 number
+pub fn serialize_vint_u128(mut val: u128, output: &mut Vec<u8>) {
+    loop {
+        let next_byte: u8 = (val % 128u128) as u8;
+        val /= 128u128;
+        if val == 0 {
+            output.push(next_byte | STOP_BIT);
+            return;
+        } else {
+            output.push(next_byte);
+        }
+    }
+}
+
+/// Deserializes a u128 number
+///
+/// Returns the number and the slice after the vint
+pub fn deserialize_vint_u128(data: &[u8]) -> io::Result<(u128, &[u8])> {
+    let mut result = 0u128;
+    let mut shift = 0u64;
+    for i in 0..19 {
+        let b = data[i];
+        result |= u128::from(b % 128u8) << shift;
+        if b >= STOP_BIT {
+            return Ok((result, &data[i + 1..]));
+        }
+        shift += 7;
+    }
+    Err(io::Error::new(
+        io::ErrorKind::InvalidData,
+        "Failed to deserialize u128 vint",
+    ))
+}
+
+/// Wrapper over a `u128` that serializes as a variable int.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub struct VIntU128(pub u128);
+
+impl BinarySerializable for VIntU128 {
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        let mut buffer = vec![];
+        serialize_vint_u128(self.0, &mut buffer);
+        writer.write_all(&buffer)
+    }
+
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
+        let mut bytes = reader.bytes();
+        let mut result = 0u128;
+        let mut shift = 0u64;
+        loop {
+            match bytes.next() {
+                Some(Ok(b)) => {
+                    result |= u128::from(b % 128u8) << shift;
+                    if b >= STOP_BIT {
+                        return Ok(VIntU128(result));
+                    }
+                    shift += 7;
+                }
+                _ => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "Reach end of buffer while reading VInt",
+                    ));
+                }
+            }
+        }
+    }
+}
+
 /// Wrapper over a `u64` that serializes as a variable int.
#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct VInt(pub u64); @@ -176,6 +245,7 @@ impl BinarySerializable for VInt { mod tests { use super::{serialize_vint_u32, BinarySerializable, VInt}; + use crate::vint::{deserialize_vint_u128, serialize_vint_u128, VIntU128}; fn aux_test_vint(val: u64) { let mut v = [14u8; 10]; @@ -217,6 +287,26 @@ mod tests { assert_eq!(&buffer[..len_vint], res2, "array wrong for {}", val); } + fn aux_test_vint_u128(val: u128) { + let mut data = vec![]; + serialize_vint_u128(val, &mut data); + let (deser_val, _data) = deserialize_vint_u128(&data).unwrap(); + assert_eq!(val, deser_val); + + let mut out = vec![]; + VIntU128(val).serialize(&mut out).unwrap(); + let deser_val = VIntU128::deserialize(&mut &out[..]).unwrap(); + assert_eq!(val, deser_val.0); + } + + #[test] + fn test_vint_u128() { + aux_test_vint_u128(0); + aux_test_vint_u128(1); + aux_test_vint_u128(u128::MAX / 3); + aux_test_vint_u128(u128::MAX); + } + #[test] fn test_vint_u32() { aux_test_serialize_vint_u32(0); diff --git a/fastfield_codecs/Cargo.toml b/fastfield_codecs/Cargo.toml index cddee15de..a53a8ceeb 100644 --- a/fastfield_codecs/Cargo.toml +++ b/fastfield_codecs/Cargo.toml @@ -16,6 +16,8 @@ prettytable-rs = {version="0.9.0", optional= true} rand = {version="0.8.3", optional= true} fastdivide = "0.4" log = "0.4" +itertools = { version = "0.10.3" } +measure_time = { version="0.8.2", optional=true} [dev-dependencies] more-asserts = "0.3.0" @@ -23,7 +25,7 @@ proptest = "1.0.0" rand = "0.8.3" [features] -bin = ["prettytable-rs", "rand"] +bin = ["prettytable-rs", "rand", "measure_time"] default = ["bin"] unstable = [] diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs index c30df44e5..87e9c8baa 100644 --- a/fastfield_codecs/benches/bench.rs +++ b/fastfield_codecs/benches/bench.rs @@ -4,9 +4,222 @@ extern crate test; #[cfg(test)] mod tests { + use std::iter; use std::sync::Arc; use fastfield_codecs::*; + use rand::prelude::*; + + use super::*; + + // Warning: this generates the same permutation at each call + fn generate_permutation() -> Vec { + let mut permutation: Vec = (0u64..100_000u64).collect(); + permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); + permutation + } + + fn generate_random() -> Vec { + let mut permutation: Vec = (0u64..100_000u64) + .map(|el| el + random::() as u64) + .collect(); + permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); + permutation + } + + // Warning: this generates the same permutation at each call + fn generate_permutation_gcd() -> Vec { + let mut permutation: Vec = (1u64..100_000u64).map(|el| el * 1000).collect(); + permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); + permutation + } + + pub fn serialize_and_load( + column: &[T], + ) -> Arc> { + let mut buffer = Vec::new(); + serialize(VecColumn::from(&column), &mut buffer, &ALL_CODEC_TYPES).unwrap(); + open(OwnedBytes::new(buffer)).unwrap() + } + + #[bench] + fn bench_intfastfield_jumpy_veclookup(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + b.iter(|| { + let mut a = 0u64; + for _ in 0..n { + a = permutation[a as usize]; + } + a + }); + } + + #[bench] + fn bench_intfastfield_jumpy_fflookup(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + let column: Arc> = serialize_and_load(&permutation); + b.iter(|| { + let mut a = 0u64; + for _ in 0..n { + a = column.get_val(a as u64); + } + a + }); + } + + fn get_exp_data() -> Vec { + let mut data = vec![]; + for i in 0..100 { + let 
num = i * i; + data.extend(iter::repeat(i as u64).take(num)); + } + data.shuffle(&mut StdRng::from_seed([1u8; 32])); + + // lengt = 328350 + data + } + + fn get_data_50percent_item() -> (u128, u128, Vec) { + let mut permutation = get_exp_data(); + let major_item = 20; + let minor_item = 10; + permutation.extend(iter::repeat(major_item).take(permutation.len())); + permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); + let permutation = permutation.iter().map(|el| *el as u128).collect::>(); + (major_item as u128, minor_item as u128, permutation) + } + fn get_u128_column_random() -> Arc> { + let permutation = generate_random(); + let permutation = permutation.iter().map(|el| *el as u128).collect::>(); + get_u128_column_from_data(&permutation) + } + + fn get_u128_column_from_data(data: &[u128]) -> Arc> { + let mut out = vec![]; + serialize_u128(VecColumn::from(&data), &mut out).unwrap(); + let out = OwnedBytes::new(out); + open_u128(out).unwrap() + } + + #[bench] + fn bench_intfastfield_getrange_u128_50percent_hit(b: &mut Bencher) { + let (major_item, _minor_item, data) = get_data_50percent_item(); + let column = get_u128_column_from_data(&data); + + b.iter(|| column.get_between_vals(major_item..=major_item)); + } + + #[bench] + fn bench_intfastfield_getrange_u128_single_hit(b: &mut Bencher) { + let (_major_item, minor_item, data) = get_data_50percent_item(); + let column = get_u128_column_from_data(&data); + + b.iter(|| column.get_between_vals(minor_item..=minor_item)); + } + + #[bench] + fn bench_intfastfield_getrange_u128_hit_all(b: &mut Bencher) { + let (_major_item, _minor_item, data) = get_data_50percent_item(); + let column = get_u128_column_from_data(&data); + + b.iter(|| column.get_between_vals(0..=u128::MAX)); + } + + #[bench] + fn bench_intfastfield_scan_all_fflookup_u128(b: &mut Bencher) { + let column = get_u128_column_random(); + + b.iter(|| { + let mut a = 0u128; + for i in 0u64..column.num_vals() as u64 { + a += column.get_val(i); + } + a + }); + } + + #[bench] + fn bench_intfastfield_jumpy_stride5_u128(b: &mut Bencher) { + let column = get_u128_column_random(); + + b.iter(|| { + let n = column.num_vals(); + let mut a = 0u128; + for i in (0..n / 5).map(|val| val * 5) { + a += column.get_val(i as u64); + } + a + }); + } + + #[bench] + fn bench_intfastfield_stride7_vec(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + b.iter(|| { + let mut a = 0u64; + for i in (0..n / 7).map(|val| val * 7) { + a += permutation[i as usize]; + } + a + }); + } + + #[bench] + fn bench_intfastfield_stride7_fflookup(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + let column: Arc> = serialize_and_load(&permutation); + b.iter(|| { + let mut a = 0u64; + for i in (0..n / 7).map(|val| val * 7) { + a += column.get_val(i as u64); + } + a + }); + } + + #[bench] + fn bench_intfastfield_scan_all_fflookup(b: &mut Bencher) { + let permutation = generate_permutation(); + let n = permutation.len(); + let column: Arc> = serialize_and_load(&permutation); + b.iter(|| { + let mut a = 0u64; + for i in 0u64..n as u64 { + a += column.get_val(i); + } + a + }); + } + + #[bench] + fn bench_intfastfield_scan_all_fflookup_gcd(b: &mut Bencher) { + let permutation = generate_permutation_gcd(); + let n = permutation.len(); + let column: Arc> = serialize_and_load(&permutation); + b.iter(|| { + let mut a = 0u64; + for i in 0..n as u64 { + a += column.get_val(i); + } + a + }); + } + + #[bench] + fn bench_intfastfield_scan_all_vec(b: &mut 
Bencher) { + let permutation = generate_permutation(); + b.iter(|| { + let mut a = 0u64; + for i in 0..permutation.len() { + a += permutation[i as usize] as u64; + } + a + }); + } fn get_data() -> Vec { let mut rng = StdRng::seed_from_u64(2u64); @@ -28,12 +241,14 @@ mod tests { } fn get_reader_for_bench(data: &[u64]) -> Codec::Reader { let mut bytes = Vec::new(); + let min_value = *data.iter().min().unwrap(); + let data = data.iter().map(|el| *el - min_value).collect::>(); let col = VecColumn::from(&data); let normalized_header = fastfield_codecs::NormalizedHeader { num_vals: col.num_vals(), max_value: col.max_value(), }; - Codec::serialize(&VecColumn::from(data), &mut bytes).unwrap(); + Codec::serialize(&VecColumn::from(&data), &mut bytes).unwrap(); Codec::open_from_bytes(OwnedBytes::new(bytes), normalized_header).unwrap() } fn bench_get(b: &mut Bencher, data: &[u64]) { @@ -65,10 +280,13 @@ mod tests { bench_get_dynamic_helper(b, col); } fn bench_create(b: &mut Bencher, data: &[u64]) { + let min_value = *data.iter().min().unwrap(); + let data = data.iter().map(|el| *el - min_value).collect::>(); + let mut bytes = Vec::new(); b.iter(|| { bytes.clear(); - Codec::serialize(&VecColumn::from(data), &mut bytes).unwrap(); + Codec::serialize(&VecColumn::from(&data), &mut bytes).unwrap(); }); } diff --git a/fastfield_codecs/src/column.rs b/fastfield_codecs/src/column.rs index e9d2e9db5..eb73be542 100644 --- a/fastfield_codecs/src/column.rs +++ b/fastfield_codecs/src/column.rs @@ -1,9 +1,10 @@ use std::marker::PhantomData; +use std::ops::RangeInclusive; use std::sync::Mutex; use tantivy_bitpacker::minmax; -pub trait Column: Send + Sync { +pub trait Column: Send + Sync { /// Return the value associated to the given idx. /// /// This accessor should return as fast as possible. @@ -28,6 +29,19 @@ pub trait Column: Send + Sync { } } + /// Return the positions of values which are in the provided range. + #[inline] + fn get_between_vals(&self, range: RangeInclusive) -> Vec { + let mut vals = Vec::new(); + for idx in 0..self.num_vals() { + let val = self.get_val(idx); + if range.contains(&val) { + vals.push(idx); + } + } + vals + } + /// Returns the minimum value for this fast field. /// /// This min_value may not be exact. @@ -131,7 +145,7 @@ struct MonotonicMappingColumn { } /// Creates a view of a column transformed by a monotonic mapping. -pub fn monotonic_map_column( +pub fn monotonic_map_column( from_column: C, monotonic_mapping: T, ) -> impl Column @@ -148,7 +162,8 @@ where } } -impl Column for MonotonicMappingColumn +impl Column + for MonotonicMappingColumn where C: Column, T: Fn(Input) -> Output + Send + Sync, @@ -217,7 +232,9 @@ where T: Iterator + Clone + ExactSizeIterator } impl Column for IterColumn -where T: Iterator + Clone + ExactSizeIterator + Send + Sync +where + T: Iterator + Clone + ExactSizeIterator + Send + Sync, + T::Item: PartialOrd, { fn get_val(&self, idx: u64) -> T::Item { self.0.clone().nth(idx as usize).unwrap() diff --git a/fastfield_codecs/src/compact_space/blank_range.rs b/fastfield_codecs/src/compact_space/blank_range.rs new file mode 100644 index 000000000..a1f265f00 --- /dev/null +++ b/fastfield_codecs/src/compact_space/blank_range.rs @@ -0,0 +1,43 @@ +use std::ops::RangeInclusive; + +/// The range of a blank in value space. +/// +/// A blank is an unoccupied space in the data. +/// Use try_into() to construct. +/// A range has to have at least length of 3. Invalid ranges will be rejected. +/// +/// Ordered by range length. 
+#[derive(Debug, Eq, PartialEq, Clone)] +pub(crate) struct BlankRange { + blank_range: RangeInclusive, +} +impl TryFrom> for BlankRange { + type Error = &'static str; + fn try_from(range: RangeInclusive) -> Result { + let blank_size = range.end().saturating_sub(*range.start()); + if blank_size < 2 { + Err("invalid range") + } else { + Ok(BlankRange { blank_range: range }) + } + } +} +impl BlankRange { + pub(crate) fn blank_size(&self) -> u128 { + self.blank_range.end() - self.blank_range.start() + 1 + } + pub(crate) fn blank_range(&self) -> RangeInclusive { + self.blank_range.clone() + } +} + +impl Ord for BlankRange { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.blank_size().cmp(&other.blank_size()) + } +} +impl PartialOrd for BlankRange { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.blank_size().cmp(&other.blank_size())) + } +} diff --git a/fastfield_codecs/src/compact_space/build_compact_space.rs b/fastfield_codecs/src/compact_space/build_compact_space.rs new file mode 100644 index 000000000..02bbe97f8 --- /dev/null +++ b/fastfield_codecs/src/compact_space/build_compact_space.rs @@ -0,0 +1,231 @@ +use std::collections::{BTreeSet, BinaryHeap}; +use std::iter; +use std::ops::RangeInclusive; + +use itertools::Itertools; + +use super::blank_range::BlankRange; +use super::{CompactSpace, RangeMapping}; + +/// Put the blanks for the sorted values into a binary heap +fn get_blanks(values_sorted: &BTreeSet) -> BinaryHeap { + let mut blanks: BinaryHeap = BinaryHeap::new(); + for (first, second) in values_sorted.iter().tuple_windows() { + // Correctness Overflow: the values are deduped and sorted (BTreeSet property), that means + // there's always space between two values. + let blank_range = first + 1..=second - 1; + let blank_range: Result = blank_range.try_into(); + if let Ok(blank_range) = blank_range { + blanks.push(blank_range); + } + } + + blanks +} + +struct BlankCollector { + blanks: Vec, + staged_blanks_sum: u128, +} +impl BlankCollector { + fn new() -> Self { + Self { + blanks: vec![], + staged_blanks_sum: 0, + } + } + fn stage_blank(&mut self, blank: BlankRange) { + self.staged_blanks_sum += blank.blank_size(); + self.blanks.push(blank); + } + fn drain(&mut self) -> impl Iterator + '_ { + self.staged_blanks_sum = 0; + self.blanks.drain(..) + } + fn staged_blanks_sum(&self) -> u128 { + self.staged_blanks_sum + } + fn num_staged_blanks(&self) -> usize { + self.blanks.len() + } +} +fn num_bits(val: u128) -> u8 { + (128u32 - val.leading_zeros()) as u8 +} + +/// Will collect blanks and add them to compact space if more bits are saved than cost from +/// metadata. +pub fn get_compact_space( + values_deduped_sorted: &BTreeSet, + total_num_values: u64, + cost_per_blank: usize, +) -> CompactSpace { + let mut compact_space_builder = CompactSpaceBuilder::new(); + if values_deduped_sorted.is_empty() { + return compact_space_builder.finish(); + } + + let mut blanks: BinaryHeap = get_blanks(values_deduped_sorted); + // Replace after stabilization of https://github.com/rust-lang/rust/issues/62924 + + // We start by space that's limited to min_value..=max_value + let min_value = *values_deduped_sorted.iter().next().unwrap_or(&0); + let max_value = *values_deduped_sorted.iter().last().unwrap_or(&0); + + // +1 for null, in case min and max covers the whole space, we are off by one. 
+ let mut amplitude_compact_space = (max_value - min_value).saturating_add(1); + if min_value != 0 { + compact_space_builder.add_blanks(iter::once(0..=min_value - 1)); + } + if max_value != u128::MAX { + compact_space_builder.add_blanks(iter::once(max_value + 1..=u128::MAX)); + } + + let mut amplitude_bits: u8 = num_bits(amplitude_compact_space); + + let mut blank_collector = BlankCollector::new(); + // We will stage blanks until they reduce the compact space by at least 1 bit and then flush + // them if the metadata cost is lower than the total number of saved bits. + // Binary heap to process the gaps by their size + while let Some(blank_range) = blanks.pop() { + blank_collector.stage_blank(blank_range); + + let staged_spaces_sum: u128 = blank_collector.staged_blanks_sum(); + let amplitude_new_compact_space = amplitude_compact_space - staged_spaces_sum; + let amplitude_new_bits = num_bits(amplitude_new_compact_space); + if amplitude_bits == amplitude_new_bits { + continue; + } + let saved_bits = (amplitude_bits - amplitude_new_bits) as usize * total_num_values as usize; + // TODO: Maybe calculate exact cost of blanks and run this more expensive computation only, + // when amplitude_new_bits changes + let cost = blank_collector.num_staged_blanks() * cost_per_blank; + if cost >= saved_bits { + // Continue here, since although we walk over the blanks by size, + // we can potentially save a lot at the last bits, which are smaller blanks + // + // E.g. if the first range reduces the compact space by 1000 from 2000 to 1000, which + // saves 11-10=1 bit and the next range reduces the compact space by 950 to + // 50, which saves 10-6=4 bit + continue; + } + + amplitude_compact_space = amplitude_new_compact_space; + amplitude_bits = amplitude_new_bits; + compact_space_builder.add_blanks(blank_collector.drain().map(|blank| blank.blank_range())); + } + + // special case, when we don't collected any blanks because: + // * the data is empty (early exit) + // * the algorithm did decide it's not worth the cost, which can be the case for single values + // + // We drain one collected blank unconditionally, so the empty case is reserved for empty + // data, and therefore empty compact_space means the data is empty and no data is covered + // (conversely to all data) and we can assign null to it. + if compact_space_builder.is_empty() { + compact_space_builder.add_blanks( + blank_collector + .drain() + .map(|blank| blank.blank_range()) + .take(1), + ); + } + + let compact_space = compact_space_builder.finish(); + if max_value - min_value != u128::MAX { + debug_assert_eq!( + compact_space.amplitude_compact_space(), + amplitude_compact_space + ); + } + compact_space +} + +#[derive(Debug, Clone, Eq, PartialEq)] +struct CompactSpaceBuilder { + blanks: Vec>, +} + +impl CompactSpaceBuilder { + /// Creates a new compact space builder which will initially cover the whole space. + fn new() -> Self { + Self { blanks: Vec::new() } + } + + /// Assumes that repeated add_blank calls don't overlap and are not adjacent, + /// e.g. [3..=5, 5..=10] is not allowed + /// + /// Both of those assumptions are true when blanks are produced from sorted values. + fn add_blanks(&mut self, blank: impl Iterator>) { + self.blanks.extend(blank); + } + + fn is_empty(&self) -> bool { + self.blanks.is_empty() + } + + /// Convert blanks to covered space and assign null value + fn finish(mut self) -> CompactSpace { + // sort by start. 
ranges are not allowed to overlap + self.blanks.sort_unstable_by_key(|blank| *blank.start()); + + let mut covered_space = Vec::with_capacity(self.blanks.len()); + + // begining of the blanks + if let Some(first_blank_start) = self.blanks.first().map(RangeInclusive::start) { + if *first_blank_start != 0 { + covered_space.push(0..=first_blank_start - 1); + } + } + + // Between the blanks + let between_blanks = self.blanks.iter().tuple_windows().map(|(left, right)| { + assert!( + left.end() < right.start(), + "overlapping or adjacent ranges detected" + ); + *left.end() + 1..=*right.start() - 1 + }); + covered_space.extend(between_blanks); + + // end of the blanks + if let Some(last_blank_end) = self.blanks.last().map(RangeInclusive::end) { + if *last_blank_end != u128::MAX { + covered_space.push(last_blank_end + 1..=u128::MAX); + } + } + + if covered_space.is_empty() { + covered_space.push(0..=0); // empty data case + }; + + let mut compact_start: u64 = 1; // 0 is reserved for `null` + let mut ranges_mapping: Vec = Vec::with_capacity(covered_space.len()); + for cov in covered_space { + let range_mapping = super::RangeMapping { + value_range: cov, + compact_start, + }; + let covered_range_len = range_mapping.range_length(); + ranges_mapping.push(range_mapping); + compact_start += covered_range_len as u64; + } + // println!("num ranges {}", ranges_mapping.len()); + CompactSpace { ranges_mapping } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_binary_heap_pop_order() { + let mut blanks: BinaryHeap = BinaryHeap::new(); + blanks.push((0..=10).try_into().unwrap()); + blanks.push((100..=200).try_into().unwrap()); + blanks.push((100..=110).try_into().unwrap()); + assert_eq!(blanks.pop().unwrap().blank_size(), 101); + assert_eq!(blanks.pop().unwrap().blank_size(), 11); + } +} diff --git a/fastfield_codecs/src/compact_space/mod.rs b/fastfield_codecs/src/compact_space/mod.rs new file mode 100644 index 000000000..d137b30f4 --- /dev/null +++ b/fastfield_codecs/src/compact_space/mod.rs @@ -0,0 +1,666 @@ +/// This codec takes a large number space (u128) and reduces it to a compact number space. +/// +/// It will find spaces in the number range. For example: +/// +/// 100, 101, 102, 103, 104, 50000, 50001 +/// could be mapped to +/// 100..104 -> 0..4 +/// 50000..50001 -> 5..6 +/// +/// Compact space 0..=6 requires much less bits than 100..=50001 +/// +/// The codec is created to compress ip addresses, but may be employed in other use cases. +use std::{ + cmp::Ordering, + collections::BTreeSet, + io::{self, Write}, + ops::RangeInclusive, +}; + +use common::{BinarySerializable, CountingWriter, VInt, VIntU128}; +use ownedbytes::OwnedBytes; +use tantivy_bitpacker::{self, BitPacker, BitUnpacker}; + +use crate::compact_space::build_compact_space::get_compact_space; +use crate::Column; + +mod blank_range; +mod build_compact_space; + +/// The cost per blank is quite hard actually, since blanks are delta encoded, the actual cost of +/// blanks depends on the number of blanks. +/// +/// The number is taken by looking at a real dataset. It is optimized for larger datasets. 
+const COST_PER_BLANK_IN_BITS: usize = 36;
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct CompactSpace {
+    ranges_mapping: Vec<RangeMapping>,
+}
+
+/// Maps the range from the original space to compact_start + range.len()
+#[derive(Debug, Clone, Eq, PartialEq)]
+struct RangeMapping {
+    value_range: RangeInclusive<u128>,
+    compact_start: u64,
+}
+impl RangeMapping {
+    fn range_length(&self) -> u64 {
+        (self.value_range.end() - self.value_range.start()) as u64 + 1
+    }
+
+    // The last value of the compact space in this range
+    fn compact_end(&self) -> u64 {
+        self.compact_start + self.range_length() - 1
+    }
+}
+
+impl BinarySerializable for CompactSpace {
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        VInt(self.ranges_mapping.len() as u64).serialize(writer)?;
+
+        let mut prev_value = 0;
+        for value_range in self
+            .ranges_mapping
+            .iter()
+            .map(|range_mapping| &range_mapping.value_range)
+        {
+            let blank_delta_start = value_range.start() - prev_value;
+            VIntU128(blank_delta_start).serialize(writer)?;
+            prev_value = *value_range.start();
+
+            let blank_delta_end = value_range.end() - prev_value;
+            VIntU128(blank_delta_end).serialize(writer)?;
+            prev_value = *value_range.end();
+        }
+
+        Ok(())
+    }
+
+    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
+        let num_ranges = VInt::deserialize(reader)?.0;
+        let mut ranges_mapping: Vec<RangeMapping> = vec![];
+        let mut value = 0u128;
+        let mut compact_start = 1u64; // 0 is reserved for `null`
+        for _ in 0..num_ranges {
+            let blank_delta_start = VIntU128::deserialize(reader)?.0;
+            value += blank_delta_start;
+            let blank_start = value;
+
+            let blank_delta_end = VIntU128::deserialize(reader)?.0;
+            value += blank_delta_end;
+            let blank_end = value;
+
+            let range_mapping = RangeMapping {
+                value_range: blank_start..=blank_end,
+                compact_start,
+            };
+            let range_length = range_mapping.range_length();
+            ranges_mapping.push(range_mapping);
+            compact_start += range_length as u64;
+        }
+
+        Ok(Self { ranges_mapping })
+    }
+}
+
+impl CompactSpace {
+    /// Amplitude is the value range of the compact space including the sentinel value used to
+    /// identify null values. The compact space is 0..=amplitude.
+    ///
+    /// It's only used to verify we don't exceed u64 number space, which would indicate a bug.
+    fn amplitude_compact_space(&self) -> u128 {
+        self.ranges_mapping
+            .last()
+            .map(|last_range| last_range.compact_end() as u128)
+            .unwrap_or(1) // compact space starts at 1, 0 == null
+    }
+
+    fn get_range_mapping(&self, pos: usize) -> &RangeMapping {
+        &self.ranges_mapping[pos]
+    }
+
+    /// Returns either Ok(the value in the compact space) or if it is outside the compact space the
+    /// Err(position where it would be inserted)
+    fn u128_to_compact(&self, value: u128) -> Result<u64, usize> {
+        self.ranges_mapping
+            .binary_search_by(|probe| {
+                let value_range = &probe.value_range;
+                if value < *value_range.start() {
+                    Ordering::Greater
+                } else if value > *value_range.end() {
+                    Ordering::Less
+                } else {
+                    Ordering::Equal
+                }
+            })
+            .map(|pos| {
+                let range_mapping = &self.ranges_mapping[pos];
+                let pos_in_range = (value - range_mapping.value_range.start()) as u64;
+                range_mapping.compact_start + pos_in_range
+            })
+    }
+
+    /// Unpacks a value from compact space u64 to u128 space
+    fn compact_to_u128(&self, compact: u64) -> u128 {
+        let pos = self
+            .ranges_mapping
+            .binary_search_by_key(&compact, |range_mapping| range_mapping.compact_start)
+            // Correctness: Overflow.
The first range starts at compact space 0, the error from + // binary search can never be 0 + .map_or_else(|e| e - 1, |v| v); + + let range_mapping = &self.ranges_mapping[pos]; + let diff = compact - range_mapping.compact_start; + range_mapping.value_range.start() + diff as u128 + } +} + +pub struct CompactSpaceCompressor { + params: IPCodecParams, +} +#[derive(Debug, Clone)] +pub struct IPCodecParams { + compact_space: CompactSpace, + bit_unpacker: BitUnpacker, + min_value: u128, + max_value: u128, + num_vals: u64, + num_bits: u8, +} + +impl CompactSpaceCompressor { + /// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals. + pub fn train_from(column: &impl Column) -> Self { + let mut values_sorted = BTreeSet::new(); + values_sorted.extend(column.iter()); + let total_num_values = column.num_vals(); + + let compact_space = + get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS); + let amplitude_compact_space = compact_space.amplitude_compact_space(); + + assert!( + amplitude_compact_space <= u64::MAX as u128, + "case unsupported." + ); + + let num_bits = tantivy_bitpacker::compute_num_bits(amplitude_compact_space as u64); + let min_value = *values_sorted.iter().next().unwrap_or(&0); + let max_value = *values_sorted.iter().last().unwrap_or(&0); + assert_eq!( + compact_space + .u128_to_compact(max_value) + .expect("could not convert max value to compact space"), + amplitude_compact_space as u64 + ); + CompactSpaceCompressor { + params: IPCodecParams { + compact_space, + bit_unpacker: BitUnpacker::new(num_bits), + min_value, + max_value, + num_vals: total_num_values as u64, + num_bits, + }, + } + } + + fn write_footer(self, writer: &mut impl Write) -> io::Result<()> { + let writer = &mut CountingWriter::wrap(writer); + self.params.serialize(writer)?; + + let footer_len = writer.written_bytes() as u32; + footer_len.serialize(writer)?; + + Ok(()) + } + + pub fn compress_into( + self, + vals: impl Iterator, + write: &mut impl Write, + ) -> io::Result<()> { + let mut bitpacker = BitPacker::default(); + for val in vals { + let compact = self + .params + .compact_space + .u128_to_compact(val) + .map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidData, + "Could not convert value to compact_space. 
This is a bug.", + ) + })?; + bitpacker.write(compact, self.params.num_bits, write)?; + } + bitpacker.close(write)?; + self.write_footer(write)?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct CompactSpaceDecompressor { + data: OwnedBytes, + params: IPCodecParams, +} + +impl BinarySerializable for IPCodecParams { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + // header flags for future optional dictionary encoding + let footer_flags = 0u64; + footer_flags.serialize(writer)?; + + VIntU128(self.min_value).serialize(writer)?; + VIntU128(self.max_value).serialize(writer)?; + VIntU128(self.num_vals as u128).serialize(writer)?; + self.num_bits.serialize(writer)?; + + self.compact_space.serialize(writer)?; + + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let _header_flags = u64::deserialize(reader)?; + let min_value = VIntU128::deserialize(reader)?.0; + let max_value = VIntU128::deserialize(reader)?.0; + let num_vals = VIntU128::deserialize(reader)?.0 as u64; + let num_bits = u8::deserialize(reader)?; + let compact_space = CompactSpace::deserialize(reader)?; + + Ok(Self { + compact_space, + bit_unpacker: BitUnpacker::new(num_bits), + min_value, + max_value, + num_vals, + num_bits, + }) + } +} + +impl Column for CompactSpaceDecompressor { + #[inline] + fn get_val(&self, doc: u64) -> u128 { + self.get(doc) + } + + fn min_value(&self) -> u128 { + self.min_value() + } + + fn max_value(&self) -> u128 { + self.max_value() + } + + fn num_vals(&self) -> u64 { + self.params.num_vals + } + + #[inline] + fn iter<'a>(&'a self) -> Box + 'a> { + Box::new(self.iter()) + } + fn get_between_vals(&self, range: RangeInclusive) -> Vec { + self.get_between_vals(range) + } +} + +impl CompactSpaceDecompressor { + pub fn open(data: OwnedBytes) -> io::Result { + let (data_slice, footer_len_bytes) = data.split_at(data.len() - 4); + let footer_len = u32::deserialize(&mut &footer_len_bytes[..])?; + + let data_footer = &data_slice[data_slice.len() - footer_len as usize..]; + let params = IPCodecParams::deserialize(&mut &data_footer[..])?; + let decompressor = CompactSpaceDecompressor { data, params }; + + Ok(decompressor) + } + + /// Converting to compact space for the decompressor is more complex, since we may get values + /// which are outside the compact space. e.g. if we map + /// 1000 => 5 + /// 2000 => 6 + /// + /// and we want a mapping for 1005, there is no equivalent compact space. We instead return an + /// error with the index of the next range. 
+ fn u128_to_compact(&self, value: u128) -> Result { + self.params.compact_space.u128_to_compact(value) + } + + fn compact_to_u128(&self, compact: u64) -> u128 { + self.params.compact_space.compact_to_u128(compact) + } + + /// Comparing on compact space: Random dataset 0,24 (50% random hit) - 1.05 GElements/s + /// Comparing on compact space: Real dataset 1.08 GElements/s + /// + /// Comparing on original space: Real dataset .06 GElements/s (not completely optimized) + pub fn get_between_vals(&self, range: RangeInclusive) -> Vec { + if range.start() > range.end() { + return Vec::new(); + } + let from_value = *range.start(); + let to_value = *range.end(); + assert!(to_value >= from_value); + let compact_from = self.u128_to_compact(from_value); + let compact_to = self.u128_to_compact(to_value); + + // Quick return, if both ranges fall into the same non-mapped space, the range can't cover + // any values, so we can early exit + match (compact_to, compact_from) { + (Err(pos1), Err(pos2)) if pos1 == pos2 => return Vec::new(), + _ => {} + } + + let compact_from = compact_from.unwrap_or_else(|pos| { + // Correctness: Out of bounds, if this value is Err(last_index + 1), we early exit, + // since the to_value also mapps into the same non-mapped space + let range_mapping = self.params.compact_space.get_range_mapping(pos); + range_mapping.compact_start + }); + // If there is no compact space, we go to the closest upperbound compact space + let compact_to = compact_to.unwrap_or_else(|pos| { + // Correctness: Overflow, if this value is Err(0), we early exit, + // since the from_value also mapps into the same non-mapped space + + // Get end of previous range + let pos = pos - 1; + let range_mapping = self.params.compact_space.get_range_mapping(pos); + range_mapping.compact_end() + }); + + let range = compact_from..=compact_to; + let mut positions = Vec::new(); + + let step_size = 4; + let cutoff = self.params.num_vals - self.params.num_vals % step_size; + + let mut push_if_in_range = |idx, val| { + if range.contains(&val) { + positions.push(idx); + } + }; + let get_val = |idx| self.params.bit_unpacker.get(idx as u64, &self.data); + // unrolled loop + for idx in (0..cutoff).step_by(step_size as usize) { + let idx1 = idx; + let idx2 = idx + 1; + let idx3 = idx + 2; + let idx4 = idx + 3; + let val1 = get_val(idx1); + let val2 = get_val(idx2); + let val3 = get_val(idx3); + let val4 = get_val(idx4); + push_if_in_range(idx1, val1); + push_if_in_range(idx2, val2); + push_if_in_range(idx3, val3); + push_if_in_range(idx4, val4); + } + + // handle rest + for idx in cutoff..self.params.num_vals { + push_if_in_range(idx, get_val(idx)); + } + + positions + } + + #[inline] + fn iter_compact(&self) -> impl Iterator + '_ { + (0..self.params.num_vals) + .map(move |idx| self.params.bit_unpacker.get(idx as u64, &self.data) as u64) + } + + #[inline] + fn iter(&self) -> impl Iterator + '_ { + // TODO: Performance. It would be better to iterate on the ranges and check existence via + // the bit_unpacker. 
+ self.iter_compact() + .map(|compact| self.compact_to_u128(compact)) + } + + #[inline] + pub fn get(&self, idx: u64) -> u128 { + let compact = self.params.bit_unpacker.get(idx, &self.data); + self.compact_to_u128(compact) + } + + pub fn min_value(&self) -> u128 { + self.params.min_value + } + + pub fn max_value(&self) -> u128 { + self.params.max_value + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use crate::{open_u128, serialize_u128, VecColumn}; + + #[test] + fn compact_space_test() { + let ips = &[ + 2u128, 4u128, 1000, 1001, 1002, 1003, 1004, 1005, 1008, 1010, 1012, 1260, + ] + .into_iter() + .collect(); + let compact_space = get_compact_space(ips, ips.len() as u64, 11); + let amplitude = compact_space.amplitude_compact_space(); + assert_eq!(amplitude, 17); + assert_eq!(1, compact_space.u128_to_compact(2).unwrap()); + assert_eq!(2, compact_space.u128_to_compact(3).unwrap()); + assert_eq!(compact_space.u128_to_compact(100).unwrap_err(), 1); + + for (num1, num2) in (0..3).tuple_windows() { + assert_eq!( + compact_space.get_range_mapping(num1).compact_end() + 1, + compact_space.get_range_mapping(num2).compact_start + ); + } + + let mut output: Vec = Vec::new(); + compact_space.serialize(&mut output).unwrap(); + + assert_eq!( + compact_space, + CompactSpace::deserialize(&mut &output[..]).unwrap() + ); + + for ip in ips { + let compact = compact_space.u128_to_compact(*ip).unwrap(); + assert_eq!(compact_space.compact_to_u128(compact), *ip); + } + } + + #[test] + fn compact_space_amplitude_test() { + let ips = &[100000u128, 1000000].into_iter().collect(); + let compact_space = get_compact_space(ips, ips.len() as u64, 1); + let amplitude = compact_space.amplitude_compact_space(); + assert_eq!(amplitude, 2); + } + + fn test_all(data: OwnedBytes, expected: &[u128]) { + let decompressor = CompactSpaceDecompressor::open(data).unwrap(); + for (idx, expected_val) in expected.iter().cloned().enumerate() { + let val = decompressor.get(idx as u64); + assert_eq!(val, expected_val); + + let test_range = |range: RangeInclusive| { + let expected_positions = expected + .iter() + .positions(|val| range.contains(val)) + .map(|pos| pos as u64) + .collect::>(); + let positions = decompressor.get_between_vals(range); + assert_eq!(positions, expected_positions); + }; + + test_range(expected_val.saturating_sub(1)..=expected_val); + test_range(expected_val..=expected_val); + test_range(expected_val..=expected_val.saturating_add(1)); + test_range(expected_val.saturating_sub(1)..=expected_val.saturating_add(1)); + } + } + + fn test_aux_vals(u128_vals: &[u128]) -> OwnedBytes { + let mut out = Vec::new(); + serialize_u128(VecColumn::from(u128_vals), &mut out).unwrap(); + + let data = OwnedBytes::new(out); + test_all(data.clone(), u128_vals); + data + } + + #[test] + fn test_range_1() { + let vals = &[ + 1u128, + 100u128, + 3u128, + 99999u128, + 100000u128, + 100001u128, + 4_000_211_221u128, + 4_000_211_222u128, + 333u128, + ]; + let data = test_aux_vals(vals); + let decomp = CompactSpaceDecompressor::open(data).unwrap(); + let positions = decomp.get_between_vals(0..=1); + assert_eq!(positions, vec![0]); + let positions = decomp.get_between_vals(0..=2); + assert_eq!(positions, vec![0]); + let positions = decomp.get_between_vals(0..=3); + assert_eq!(positions, vec![0, 2]); + assert_eq!(decomp.get_between_vals(99999u128..=99999u128), vec![3]); + assert_eq!(decomp.get_between_vals(99999u128..=100000u128), vec![3, 4]); + assert_eq!(decomp.get_between_vals(99998u128..=100000u128), vec![3, 4]); + 
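+        // Note: `get_between_vals` returns the doc *positions* (indices into `vals`),
+        // not the stored values; 99999 and 100000 sit at indices 3 and 4 here.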
assert_eq!(decomp.get_between_vals(99998u128..=99999u128), vec![3]); + assert_eq!(decomp.get_between_vals(99998u128..=99998u128), vec![]); + assert_eq!(decomp.get_between_vals(333u128..=333u128), vec![8]); + assert_eq!(decomp.get_between_vals(332u128..=333u128), vec![8]); + assert_eq!(decomp.get_between_vals(332u128..=334u128), vec![8]); + assert_eq!(decomp.get_between_vals(333u128..=334u128), vec![8]); + + assert_eq!( + decomp.get_between_vals(4_000_211_221u128..=5_000_000_000u128), + vec![6, 7] + ); + } + + #[test] + fn test_empty() { + let vals = &[]; + let data = test_aux_vals(vals); + let _decomp = CompactSpaceDecompressor::open(data).unwrap(); + } + + #[test] + fn test_range_2() { + let vals = &[ + 100u128, + 99999u128, + 100000u128, + 100001u128, + 4_000_211_221u128, + 4_000_211_222u128, + 333u128, + ]; + let data = test_aux_vals(vals); + let decomp = CompactSpaceDecompressor::open(data).unwrap(); + let positions = decomp.get_between_vals(0..=5); + assert_eq!(positions, vec![]); + let positions = decomp.get_between_vals(0..=100); + assert_eq!(positions, vec![0]); + let positions = decomp.get_between_vals(0..=105); + assert_eq!(positions, vec![0]); + } + + #[test] + fn test_range_3() { + let vals = &[ + 200u128, + 201, + 202, + 203, + 204, + 204, + 206, + 207, + 208, + 209, + 210, + 1_000_000, + 5_000_000_000, + ]; + let mut out = Vec::new(); + serialize_u128(VecColumn::from(vals), &mut out).unwrap(); + let decomp = open_u128(OwnedBytes::new(out)).unwrap(); + + assert_eq!(decomp.get_between_vals(199..=200), vec![0]); + assert_eq!(decomp.get_between_vals(199..=201), vec![0, 1]); + assert_eq!(decomp.get_between_vals(200..=200), vec![0]); + assert_eq!(decomp.get_between_vals(1_000_000..=1_000_000), vec![11]); + } + + #[test] + fn test_bug1() { + let vals = &[9223372036854775806]; + let _data = test_aux_vals(vals); + } + + #[test] + fn test_bug2() { + let vals = &[340282366920938463463374607431768211455u128]; + let _data = test_aux_vals(vals); + } + + #[test] + fn test_bug3() { + let vals = &[340282366920938463463374607431768211454]; + let _data = test_aux_vals(vals); + } + + #[test] + fn test_bug4() { + let vals = &[340282366920938463463374607431768211455, 0]; + let _data = test_aux_vals(vals); + } + + #[test] + fn test_first_large_gaps() { + let vals = &[1_000_000_000u128; 100]; + let _data = test_aux_vals(vals); + } + use itertools::Itertools; + use proptest::prelude::*; + + fn num_strategy() -> impl Strategy { + prop_oneof![ + 1 => prop::num::u128::ANY.prop_map(|num| u128::MAX - (num % 10) ), + 1 => prop::num::u128::ANY.prop_map(|num| i64::MAX as u128 + 5 - (num % 10) ), + 1 => prop::num::u128::ANY.prop_map(|num| i128::MAX as u128 + 5 - (num % 10) ), + 1 => prop::num::u128::ANY.prop_map(|num| num % 10 ), + 20 => prop::num::u128::ANY, + ] + } + + proptest! 
{ + #![proptest_config(ProptestConfig::with_cases(10))] + + #[test] + fn compress_decompress_random(vals in proptest::collection::vec(num_strategy() + , 1..1000)) { + let _data = test_aux_vals(&vals); + } + } +} diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index bf4a1ad34..88bc2953d 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -12,11 +12,13 @@ use std::io::Write; use std::sync::Arc; use common::BinarySerializable; +use compact_space::CompactSpaceDecompressor; use ownedbytes::OwnedBytes; use serialize::Header; mod bitpacked; mod blockwise_linear; +mod compact_space; mod line; mod linear; mod monotonic_mapping; @@ -30,8 +32,9 @@ use self::blockwise_linear::BlockwiseLinearCodec; pub use self::column::{monotonic_map_column, Column, VecColumn}; use self::linear::LinearCodec; pub use self::monotonic_mapping::MonotonicallyMappableToU64; -use self::serialize::NormalizedHeader; -pub use self::serialize::{estimate, serialize, serialize_and_load}; +pub use self::serialize::{ + estimate, serialize, serialize_and_load, serialize_u128, NormalizedHeader, +}; #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] #[repr(u8)] @@ -69,6 +72,11 @@ impl FastFieldCodecType { } } +/// Returns the correct codec reader wrapped in the `Arc` for the data. +pub fn open_u128(bytes: OwnedBytes) -> io::Result>> { + Ok(Arc::new(CompactSpaceDecompressor::open(bytes)?)) +} + /// Returns the correct codec reader wrapped in the `Arc` for the data. pub fn open( mut bytes: OwnedBytes, @@ -330,121 +338,3 @@ mod tests { assert_eq!(count_codec, 3); } } - -#[cfg(all(test, feature = "unstable"))] -mod bench { - use std::sync::Arc; - - use rand::prelude::*; - use test::{self, Bencher}; - - use crate::Column; - - // Warning: this generates the same permutation at each call - fn generate_permutation() -> Vec { - let mut permutation: Vec = (0u64..100_000u64).collect(); - permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); - permutation - } - - // Warning: this generates the same permutation at each call - fn generate_permutation_gcd() -> Vec { - let mut permutation: Vec = (1u64..100_000u64).map(|el| el * 1000).collect(); - permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); - permutation - } - - #[bench] - fn bench_intfastfield_jumpy_veclookup(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - b.iter(|| { - let mut a = 0u64; - for _ in 0..n { - a = permutation[a as usize]; - } - a - }); - } - - #[bench] - fn bench_intfastfield_jumpy_fflookup(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - let column: Arc> = crate::serialize_and_load(&permutation); - b.iter(|| { - let mut a = 0u64; - for _ in 0..n { - a = column.get_val(a as u64); - } - a - }); - } - - #[bench] - fn bench_intfastfield_stride7_vec(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - b.iter(|| { - let mut a = 0u64; - for i in (0..n / 7).map(|val| val * 7) { - a += permutation[i as usize]; - } - a - }); - } - - #[bench] - fn bench_intfastfield_stride7_fflookup(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = permutation.len(); - let column: Arc> = crate::serialize_and_load(&permutation); - b.iter(|| { - let mut a = 0u64; - for i in (0..n / 7).map(|val| val * 7) { - a += column.get_val(i as u64); - } - a - }); - } - - #[bench] - fn bench_intfastfield_scan_all_fflookup(b: &mut Bencher) { - let permutation = generate_permutation(); - let n = 
permutation.len(); - let column: Arc> = crate::serialize_and_load(&permutation); - b.iter(|| { - let mut a = 0u64; - for i in 0u64..n as u64 { - a += column.get_val(i); - } - a - }); - } - - #[bench] - fn bench_intfastfield_scan_all_fflookup_gcd(b: &mut Bencher) { - let permutation = generate_permutation_gcd(); - let n = permutation.len(); - let column: Arc> = crate::serialize_and_load(&permutation); - b.iter(|| { - let mut a = 0u64; - for i in 0..n as u64 { - a += column.get_val(i); - } - a - }); - } - - #[bench] - fn bench_intfastfield_scan_all_vec(b: &mut Bencher) { - let permutation = generate_permutation(); - b.iter(|| { - let mut a = 0u64; - for i in 0..permutation.len() { - a += permutation[i as usize] as u64; - } - a - }); - } -} diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs index 082d2c4bc..d3d9c06f8 100644 --- a/fastfield_codecs/src/main.rs +++ b/fastfield_codecs/src/main.rs @@ -1,9 +1,130 @@ #[macro_use] extern crate prettytable; -use fastfield_codecs::{Column, FastFieldCodecType, VecColumn}; +use std::collections::HashSet; +use std::env; +use std::io::BufRead; +use std::net::{IpAddr, Ipv6Addr}; +use std::str::FromStr; + +use fastfield_codecs::{open_u128, serialize_u128, Column, FastFieldCodecType, VecColumn}; +use itertools::Itertools; +use measure_time::print_time; +use ownedbytes::OwnedBytes; use prettytable::{Cell, Row, Table}; +fn print_set_stats(ip_addrs: &[u128]) { + println!("NumIps\t{}", ip_addrs.len()); + let ip_addr_set: HashSet = ip_addrs.iter().cloned().collect(); + println!("NumUniqueIps\t{}", ip_addr_set.len()); + let ratio_unique = ip_addr_set.len() as f64 / ip_addrs.len() as f64; + println!("RatioUniqueOverTotal\t{ratio_unique:.4}"); + + // histogram + let mut ip_addrs = ip_addrs.to_vec(); + ip_addrs.sort(); + let mut cnts: Vec = ip_addrs + .into_iter() + .dedup_with_count() + .map(|(cnt, _)| cnt) + .collect(); + cnts.sort(); + + let top_256_cnt: usize = cnts.iter().rev().take(256).sum(); + let top_128_cnt: usize = cnts.iter().rev().take(128).sum(); + let top_64_cnt: usize = cnts.iter().rev().take(64).sum(); + let top_8_cnt: usize = cnts.iter().rev().take(8).sum(); + let total: usize = cnts.iter().sum(); + + println!("{}", total); + println!("{}", top_256_cnt); + println!("{}", top_128_cnt); + println!("Percentage Top8 {:02}", top_8_cnt as f32 / total as f32); + println!("Percentage Top64 {:02}", top_64_cnt as f32 / total as f32); + println!("Percentage Top128 {:02}", top_128_cnt as f32 / total as f32); + println!("Percentage Top256 {:02}", top_256_cnt as f32 / total as f32); + + let mut cnts: Vec<(usize, usize)> = cnts.into_iter().dedup_with_count().collect(); + cnts.sort_by(|a, b| { + if a.1 == b.1 { + a.0.cmp(&b.0) + } else { + b.1.cmp(&a.1) + } + }); +} + +fn ip_dataset() -> Vec { + let mut ip_addr_v4 = 0; + + let stdin = std::io::stdin(); + let ip_addrs: Vec = stdin + .lock() + .lines() + .flat_map(|line| { + let line = line.unwrap(); + let line = line.trim(); + let ip_addr = IpAddr::from_str(line.trim()).ok()?; + if ip_addr.is_ipv4() { + ip_addr_v4 += 1; + } + let ip_addr_v6: Ipv6Addr = match ip_addr { + IpAddr::V4(v4) => v4.to_ipv6_mapped(), + IpAddr::V6(v6) => v6, + }; + Some(ip_addr_v6) + }) + .map(|ip_v6| u128::from_be_bytes(ip_v6.octets())) + .collect(); + + println!("IpAddrsAny\t{}", ip_addrs.len()); + println!("IpAddrsV4\t{}", ip_addr_v4); + + ip_addrs +} + +fn bench_ip() { + let dataset = ip_dataset(); + print_set_stats(&dataset); + + // Chunks + { + let mut data = vec![]; + for dataset in dataset.chunks(500_000) { + 
+            serialize_u128(VecColumn::from(dataset), &mut data).unwrap();
+        }
+        let compression = data.len() as f64 / (dataset.len() * 16) as f64;
+        println!("Compression 500_000 chunks {:.4}", compression);
+        println!(
+            "Num Bits per elem {:.2}",
+            (data.len() * 8) as f32 / dataset.len() as f32
+        );
+    }
+
+    let mut data = vec![];
+    serialize_u128(VecColumn::from(&dataset), &mut data).unwrap();
+
+    let compression = data.len() as f64 / (dataset.len() * 16) as f64;
+    println!("Compression {:.2}", compression);
+    println!(
+        "Num Bits per elem {:.2}",
+        (data.len() * 8) as f32 / dataset.len() as f32
+    );
+
+    let decompressor = open_u128(OwnedBytes::new(data)).unwrap();
+    // Sample some ranges
+    for value in dataset.iter().take(1110).skip(1100).cloned() {
+        print_time!("get range");
+        let doc_values = decompressor.get_between_vals(value..=value);
+        println!("{:?}", doc_values.len());
+    }
+}
+
 fn main() {
+    if env::args().nth(1).unwrap() == "bench_ip" {
+        bench_ip();
+        return;
+    }
+
     let mut table = Table::new();
 
     // Add a row per time
diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs
index 4753bb443..92f55f5d0 100644
--- a/fastfield_codecs/src/serialize.rs
+++ b/fastfield_codecs/src/serialize.rs
@@ -28,6 +28,7 @@ use ownedbytes::OwnedBytes;
 
 use crate::bitpacked::BitpackedCodec;
 use crate::blockwise_linear::BlockwiseLinearCodec;
+use crate::compact_space::CompactSpaceCompressor;
 use crate::linear::LinearCodec;
 use crate::{
     monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
@@ -141,6 +142,19 @@
     }
 }
 
+pub fn serialize_u128(
+    typed_column: impl Column<u128>,
+    output: &mut impl io::Write,
+) -> io::Result<()> {
+    // TODO write header, to later support more codecs
+    let compressor = CompactSpaceCompressor::train_from(&typed_column);
+    compressor
+        .compress_into(typed_column.iter(), output)
+        .unwrap();
+
+    Ok(())
+}
+
 pub fn serialize<T: MonotonicallyMappableToU64>(
     typed_column: impl Column<T>,
     output: &mut impl io::Write,
@@ -215,7 +229,7 @@ pub fn serialize_and_load(
     column: &[T],
 ) -> Arc<dyn Column<T>> {
     let mut buffer = Vec::new();
-    super::serialize(VecColumn::from(column), &mut buffer, &ALL_CODEC_TYPES).unwrap();
+    super::serialize(VecColumn::from(&column), &mut buffer, &ALL_CODEC_TYPES).unwrap();
     super::open(OwnedBytes::new(buffer)).unwrap()
 }
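
Taken together, the new public surface for u128 fast fields is small: `serialize_u128` trains a `CompactSpaceCompressor` and writes the bit-packed column, `open_u128` returns an `Arc<dyn Column<u128>>`, and `Column::get_between_vals` returns the positions of all documents whose value falls into an inclusive range. A minimal round-trip sketch, mirroring the tests in this patch (crate paths `fastfield_codecs` and `ownedbytes` as used above; the concrete values are illustrative):

    use std::sync::Arc;

    use fastfield_codecs::{open_u128, serialize_u128, Column, VecColumn};
    use ownedbytes::OwnedBytes;

    fn roundtrip() -> std::io::Result<()> {
        // Sparse u128 values (e.g. IPv6 addresses) with large gaps compress well here.
        let vals: Vec<u128> = vec![2, 4, 1000, 1001, 1002, 1003, 4_000_211_221];

        let mut buffer = Vec::new();
        serialize_u128(VecColumn::from(&vals), &mut buffer)?;

        let column: Arc<dyn Column<u128>> = open_u128(OwnedBytes::new(buffer))?;
        assert_eq!(column.get_val(2), 1000);

        // Positions (doc indices) of all values inside the inclusive range.
        let positions = column.get_between_vals(1000..=1003);
        assert_eq!(positions, vec![2, 3, 4, 5]);
        Ok(())
    }

Under the hood the values are remapped into the compact space described in `compact_space/mod.rs`, so the range query is evaluated against the bit-packed compact values rather than raw `u128`s.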