From c632fc014e9318f494cc1dbecbd136f2190bc65b Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Sun, 4 Sep 2022 22:57:24 +0900
Subject: [PATCH] Refactoring fast fields codecs.

This removes the GCD part as a codec, and makes it so that fastfield
codecs all share the same normalization part (shift + gcd).
---
 fastfield_codecs/benches/bench.rs        |  66 ++++--
 fastfield_codecs/src/bitpacked.rs        | 113 ++--------
 fastfield_codecs/src/blockwise_linear.rs |  86 +++-----
 fastfield_codecs/src/column.rs           | 107 +++++++++
 fastfield_codecs/src/gcd.rs              |  41 +---
 fastfield_codecs/src/lib.rs              |  67 +++---
 fastfield_codecs/src/linear.rs           |  65 +++---
 fastfield_codecs/src/main.rs             |   2 +-
 fastfield_codecs/src/serialize.rs        | 264 ++++++++++++++---------
 src/fastfield/bytes/writer.rs            |  27 ++-
 src/fastfield/mod.rs                     |  93 +++-----
 src/fastfield/multivalued/mod.rs         |   1 +
 src/fastfield/multivalued/writer.rs      | 216 +++++++++++++++----
 src/fastfield/serializer/mod.rs          |  36 +---
 src/fastfield/writer.rs                  |   8 +-
 src/indexer/doc_id_mapping.rs            |   6 +
 src/indexer/merger.rs                    |  16 +-
 src/indexer/segment_writer.rs            |   4 +-
 18 files changed, 678 insertions(+), 540 deletions(-)

diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs
index b28b8af41..11f9fbfc6 100644
--- a/fastfield_codecs/benches/bench.rs
+++ b/fastfield_codecs/benches/bench.rs
@@ -4,14 +4,17 @@ extern crate test;
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use fastfield_codecs::bitpacked::BitpackedCodec;
     use fastfield_codecs::blockwise_linear::BlockwiseLinearCodec;
     use fastfield_codecs::linear::LinearCodec;
     use fastfield_codecs::*;
 
     fn get_data() -> Vec<u64> {
+        let mut rng = StdRng::seed_from_u64(2u64);
         let mut data: Vec<_> = (100..55000_u64)
-            .map(|num| num + rand::random::<u8>() as u64)
+            .map(|num| num + rng.gen::<u8>() as u64)
             .collect();
         data.push(99_000);
         data.insert(1000, 2000);
@@ -22,32 +25,59 @@ mod tests {
         data
     }
 
+    #[inline(never)]
     fn value_iter() -> impl Iterator<Item = u64> {
         0..20_000
     }
+    fn get_reader_for_bench<Codec: FastFieldCodec>(data: &[u64]) -> Codec::Reader {
+        let mut bytes = Vec::new();
+        let col = VecColumn::from(&data);
+        let normalized_header = fastfield_codecs::NormalizedHeader {
+            num_vals: col.num_vals(),
+            max_value: col.max_value(),
+        };
+        Codec::serialize(&VecColumn::from(data), &mut bytes).unwrap();
+        Codec::open_from_bytes(OwnedBytes::new(bytes), normalized_header).unwrap()
+    }
     fn bench_get<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
-        let mut bytes = vec![];
-        Codec::serialize(&mut bytes, &VecColumn::from(data)).unwrap();
-        let reader = Codec::open_from_bytes(OwnedBytes::new(bytes)).unwrap();
+        let col = get_reader_for_bench::<Codec>(data);
         b.iter(|| {
             let mut sum = 0u64;
             for pos in value_iter() {
-                let val = reader.get_val(pos as u64);
-                debug_assert_eq!(data[pos as usize], val);
+                let val = col.get_val(pos as u64);
                 sum = sum.wrapping_add(val);
             }
             sum
         });
     }
+
+    #[inline(never)]
+    fn bench_get_dynamic_helper(b: &mut Bencher, col: Arc<dyn Column>) {
+        b.iter(|| {
+            let mut sum = 0u64;
+            for pos in value_iter() {
+                let val = col.get_val(pos as u64);
+                sum = sum.wrapping_add(val);
+            }
+            sum
+        });
+    }
+
+    fn bench_get_dynamic<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
+        let col = Arc::new(get_reader_for_bench::<Codec>(data));
+        bench_get_dynamic_helper(b, col);
+    }
     fn bench_create<Codec: FastFieldCodec>(b: &mut Bencher, data: &[u64]) {
         let mut bytes = Vec::new();
         b.iter(|| {
             bytes.clear();
-            Codec::serialize(&mut bytes, &VecColumn::from(data)).unwrap();
+            Codec::serialize(&VecColumn::from(data), &mut bytes).unwrap();
         });
     }
 
     use ownedbytes::OwnedBytes;
+    use rand::rngs::StdRng;
+    use rand::{Rng, SeedableRng};
     use test::Bencher;

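    // -----------------------------------------------------------------------
    // Editorial sketch (not part of the patch): what the commit message means
    // by a shared "normalization part (shift + gcd)". `normalize_sketch` and
    // `gcd_u64` are hypothetical helpers, not tantivy APIs. Each codec now
    // only sees values normalized so that `min_value == 0`, and the reader
    // undoes the mapping with `min_value + gcd * val` when decoding.
    fn gcd_u64(mut a: u64, mut b: u64) -> u64 {
        while b != 0 {
            let t = a % b;
            a = b;
            b = t;
        }
        a
    }
    fn normalize_sketch(vals: &[u64]) -> (u64, u64, Vec<u64>) {
        let min_value = vals.iter().copied().min().unwrap_or(0);
        // Gcd of the deltas; a gcd of 1 means no gcd compression is possible.
        let gcd = vals.iter().map(|&v| v - min_value).fold(0u64, gcd_u64).max(1);
        let normalized = vals.iter().map(|&v| (v - min_value) / gcd).collect();
        (min_value, gcd, normalized)
    }
    // e.g. normalize_sketch(&[1_000, 3_000, 5_000]) == (1_000, 2_000, vec![0, 1, 2])
    // -----------------------------------------------------------------------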
    #[bench]
    fn bench_fastfield_bitpack_create(b: &mut Bencher) {
@@ -70,22 +100,28 @@ mod tests {
        bench_get::<BitpackedCodec>(b, &data);
    }
    #[bench]
+    fn bench_fastfield_bitpack_get_dynamic(b: &mut Bencher) {
+        let data: Vec<_> = get_data();
+        bench_get_dynamic::<BitpackedCodec>(b, &data);
+    }
+    #[bench]
    fn bench_fastfield_linearinterpol_get(b: &mut Bencher) {
        let data: Vec<_> = get_data();
        bench_get::<LinearCodec>(b, &data);
    }
    #[bench]
+    fn bench_fastfield_linearinterpol_get_dynamic(b: &mut Bencher) {
+        let data: Vec<_> = get_data();
+        bench_get_dynamic::<LinearCodec>(b, &data);
+    }
+    #[bench]
    fn bench_fastfield_multilinearinterpol_get(b: &mut Bencher) {
        let data: Vec<_> = get_data();
        bench_get::<BlockwiseLinearCodec>(b, &data);
    }
-    pub fn stats_from_vec(data: &[u64]) -> FastFieldStats {
-        let min_value = data.iter().cloned().min().unwrap_or(0);
-        let max_value = data.iter().cloned().max().unwrap_or(0);
-        FastFieldStats {
-            min_value,
-            max_value,
-            num_vals: data.len() as u64,
-        }
+    #[bench]
+    fn bench_fastfield_multilinearinterpol_get_dynamic(b: &mut Bencher) {
+        let data: Vec<_> = get_data();
+        bench_get_dynamic::<BlockwiseLinearCodec>(b, &data);
    }
 }
diff --git a/fastfield_codecs/src/bitpacked.rs b/fastfield_codecs/src/bitpacked.rs
index 7b696777a..d57f78b79 100644
--- a/fastfield_codecs/src/bitpacked.rs
+++ b/fastfield_codecs/src/bitpacked.rs
@@ -1,9 +1,9 @@
 use std::io::{self, Write};
 
-use common::BinarySerializable;
 use ownedbytes::OwnedBytes;
 use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
 
+use crate::serialize::NormalizedHeader;
 use crate::{Column, FastFieldCodec, FastFieldCodecType};
 
 /// Depending on the field type, a different
@@ -12,80 +12,25 @@ use crate::{Column, FastFieldCodec, FastFieldCodecType};
 pub struct BitpackedReader {
     data: OwnedBytes,
     bit_unpacker: BitUnpacker,
-    min_value_u64: u64,
-    max_value_u64: u64,
-    num_vals: u64,
+    normalized_header: NormalizedHeader,
 }
 
 impl Column for BitpackedReader {
     #[inline]
     fn get_val(&self, doc: u64) -> u64 {
-        self.min_value_u64 + self.bit_unpacker.get(doc, &self.data)
+        self.bit_unpacker.get(doc, &self.data)
     }
     #[inline]
     fn min_value(&self) -> u64 {
-        self.min_value_u64
+        0
     }
     #[inline]
     fn max_value(&self) -> u64 {
-        self.max_value_u64
+        self.normalized_header.max_value
     }
     #[inline]
     fn num_vals(&self) -> u64 {
-        self.num_vals
-    }
-}
-pub struct BitpackedSerializerLegacy<'a, W: 'a + Write> {
-    bit_packer: BitPacker,
-    write: &'a mut W,
-    min_value: u64,
-    num_vals: u64,
-    amplitude: u64,
-    num_bits: u8,
-}
-
-impl<'a, W: Write> BitpackedSerializerLegacy<'a, W> {
-    /// Creates a new fast field serializer.
-    ///
-    /// The serializer in fact encodes the values by bitpacking
-    /// `(val - min_value)`.
-    ///
-    /// It requires a `min_value` and a `max_value` to compute
-    /// the minimum number of bits required to encode values.
-    pub fn open(
-        write: &'a mut W,
-        min_value: u64,
-        max_value: u64,
-    ) -> io::Result<BitpackedSerializerLegacy<'a, W>> {
-        assert!(min_value <= max_value);
-        let amplitude = max_value - min_value;
-        let num_bits = compute_num_bits(amplitude);
-        let bit_packer = BitPacker::new();
-        Ok(BitpackedSerializerLegacy {
-            bit_packer,
-            write,
-            min_value,
-            num_vals: 0,
-            amplitude,
-            num_bits,
-        })
-    }
-    /// Pushes a new value to the currently open u64 fast field.
-    #[inline]
-    pub fn add_val(&mut self, val: u64) -> io::Result<()> {
-        let val_to_write: u64 = val - self.min_value;
-        self.bit_packer
-            .write(val_to_write, self.num_bits, &mut self.write)?;
-        self.num_vals += 1;
-        Ok(())
-    }
-    pub fn close_field(mut self) -> io::Result<()> {
-        self.bit_packer.close(&mut self.write)?;
-        self.min_value.serialize(&mut self.write)?;
-        self.amplitude.serialize(&mut self.write)?;
-        self.num_vals.serialize(&mut self.write)?;
-        Ok(())
+        self.normalized_header.num_vals
     }
 }
 
@@ -98,50 +43,34 @@ impl FastFieldCodec for BitpackedCodec {
     type Reader = BitpackedReader;
 
     /// Opens a fast field given a file.
-    fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
-        let footer_offset = bytes.len() - 24;
-        let (data, mut footer) = bytes.split(footer_offset);
-        let min_value = u64::deserialize(&mut footer)?;
-        let amplitude = u64::deserialize(&mut footer)?;
-        let num_vals = u64::deserialize(&mut footer)?;
-        let max_value = min_value + amplitude;
-        let num_bits = compute_num_bits(amplitude);
+    fn open_from_bytes(
+        data: OwnedBytes,
+        normalized_header: NormalizedHeader,
+    ) -> io::Result<Self::Reader> {
+        let num_bits = compute_num_bits(normalized_header.max_value);
         let bit_unpacker = BitUnpacker::new(num_bits);
         Ok(BitpackedReader {
             data,
             bit_unpacker,
-            min_value_u64: min_value,
-            max_value_u64: max_value,
-            num_vals,
+            normalized_header,
         })
     }
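    // Editorial note (not part of the patch): on a normalized column the
    // bitpacked codec needs `compute_num_bits(col.max_value())` bits per
    // value, so `estimate` below is simply that bit width over the 64 bits of
    // an uncompressed value. For example, a normalized max_value of 5 needs
    // 3 bits, for an estimated ratio of 3/64 ≈ 0.047.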
     /// Serializes data with the BitpackedFastFieldSerializer.
     ///
-    /// The serializer in fact encodes the values by bitpacking
-    /// `(val - min_value)`.
-    ///
-    /// It requires a `min_value` and a `max_value` to compute
-    /// the minimum number of bits required to encode values.
-    fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
-        let mut serializer = BitpackedSerializerLegacy::open(
-            write,
-            fastfield_accessor.min_value(),
-            fastfield_accessor.max_value(),
-        )?;
-
-        for val in fastfield_accessor.iter() {
-            serializer.add_val(val)?;
+    /// Ideally, a shift has been applied upstream on the column so that `col.min_value() == 0`.
+    fn serialize(col: &dyn Column, write: &mut impl Write) -> io::Result<()> {
+        let num_bits = compute_num_bits(col.max_value());
+        let mut bit_packer = BitPacker::new();
+        for val in col.iter() {
+            bit_packer.write(val, num_bits, write)?;
         }
-        serializer.close_field()?;
-
+        bit_packer.close(write)?;
         Ok(())
     }
 
-    fn estimate(fastfield_accessor: &impl Column) -> Option<f32> {
-        let amplitude = fastfield_accessor.max_value() - fastfield_accessor.min_value();
-        let num_bits = compute_num_bits(amplitude);
+    fn estimate(col: &impl Column) -> Option<f32> {
+        let num_bits = compute_num_bits(col.max_value());
         let num_bits_uncompressed = 64;
         Some(num_bits as f32 / num_bits_uncompressed as f32)
     }
diff --git a/fastfield_codecs/src/blockwise_linear.rs b/fastfield_codecs/src/blockwise_linear.rs
index 32dcaf005..02b4124a2 100644
--- a/fastfield_codecs/src/blockwise_linear.rs
+++ b/fastfield_codecs/src/blockwise_linear.rs
@@ -1,11 +1,12 @@
-use std::io;
 use std::sync::Arc;
+use std::{io, iter};
 
 use common::{BinarySerializable, CountingWriter, DeserializeFrom};
 use ownedbytes::OwnedBytes;
 use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
 
 use crate::line::Line;
+use crate::serialize::NormalizedHeader;
 use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn};
 
 const CHUNK_SIZE: usize = 512;
@@ -35,45 +36,6 @@ impl BinarySerializable for Block {
     }
 }
 
-#[derive(Debug)]
-struct BlockwiseLinearParams {
-    num_vals: u64,
-    min_value: u64,
-    max_value: u64,
-    blocks: Vec<Block>,
-}
-
-impl BinarySerializable for BlockwiseLinearParams {
-    fn serialize<W: io::Write>(&self, wrt: &mut W) -> io::Result<()> {
-        self.num_vals.serialize(wrt)?;
-        self.min_value.serialize(wrt)?;
-        self.max_value.serialize(wrt)?;
-        let expected_num_blocks = compute_num_blocks(self.num_vals);
-        assert_eq!(expected_num_blocks, self.blocks.len());
-        for block in &self.blocks {
-            block.serialize(wrt)?;
-        }
-        Ok(())
-    }
-
-    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
-        let num_vals = u64::deserialize(reader)?;
-        let min_value = u64::deserialize(reader)?;
-        let max_value = u64::deserialize(reader)?;
-        let num_blocks = compute_num_blocks(num_vals);
-        let mut blocks = Vec::with_capacity(num_blocks);
-        for _ in 0..num_blocks {
-            blocks.push(Block::deserialize(reader)?);
-        }
-        Ok(BlockwiseLinearParams {
-            num_vals,
-            min_value,
-            max_value,
-            blocks,
-        })
-    }
-}
-
 fn compute_num_blocks(num_vals: u64) -> usize {
     (num_vals as usize + CHUNK_SIZE - 1) / CHUNK_SIZE
 }
@@ -84,19 +46,27 @@ impl FastFieldCodec for BlockwiseLinearCodec {
     const CODEC_TYPE: crate::FastFieldCodecType = FastFieldCodecType::BlockwiseLinear;
     type Reader = BlockwiseLinearReader;
 
-    fn open_from_bytes(bytes: ownedbytes::OwnedBytes) -> io::Result<Self::Reader> {
+    fn open_from_bytes(
+        bytes: ownedbytes::OwnedBytes,
+        normalized_header: NormalizedHeader,
+    ) -> io::Result<Self::Reader> {
         let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
         let footer_offset = bytes.len() - 4 - footer_len as usize;
         let (data, mut footer) = bytes.split(footer_offset);
-        let mut params = BlockwiseLinearParams::deserialize(&mut footer)?;
+        let num_blocks = compute_num_blocks(normalized_header.num_vals);
+        let mut blocks: Vec<Block> = iter::repeat_with(|| Block::deserialize(&mut footer))
+            .take(num_blocks)
+            .collect::<io::Result<_>>()?;
+
         let mut start_offset = 0;
-        for block in params.blocks.iter_mut() {
+        for block in &mut blocks {
             block.data_start_offset = start_offset;
             start_offset += (block.bit_unpacker.bit_width() as usize) * CHUNK_SIZE / 8;
         }
         Ok(BlockwiseLinearReader {
-            params: Arc::new(params),
+            blocks: Arc::new(blocks),
             data,
+            normalized_header,
         })
     }
@@ -134,8 +104,8 @@ impl FastFieldCodec for BlockwiseLinearCodec {
     }
 
     fn serialize(
-        wrt: &mut impl io::Write,
         fastfield_accessor: &dyn crate::Column,
+        wrt: &mut impl io::Write,
     ) -> io::Result<()> {
         let mut buffer = Vec::with_capacity(CHUNK_SIZE);
         let num_vals = fastfield_accessor.num_vals();
@@ -171,20 +141,15 @@ impl FastFieldCodec for BlockwiseLinearCodec {
             });
         }
 
-        let params = BlockwiseLinearParams {
-            num_vals,
-            min_value: fastfield_accessor.min_value(),
-            max_value: fastfield_accessor.max_value(),
-            blocks,
-        };
         bit_packer.close(wrt)?;
 
+        assert_eq!(blocks.len(), compute_num_blocks(num_vals));
+
         let mut counting_wrt = CountingWriter::wrap(wrt);
-
-        params.serialize(&mut counting_wrt)?;
-
+        for block in &blocks {
+            block.serialize(&mut counting_wrt)?;
+        }
         let footer_len = counting_wrt.written_bytes();
-
         (footer_len as u32).serialize(&mut counting_wrt)?;
 
         Ok(())
@@ -193,7 +158,8 @@
 
 #[derive(Clone)]
 pub struct BlockwiseLinearReader {
-    params: Arc<BlockwiseLinearParams>,
+    blocks: Arc<Vec<Block>>,
+    normalized_header: NormalizedHeader,
     data: OwnedBytes,
 }
 
@@ -202,7 +168,7 @@ impl Column for BlockwiseLinearReader {
     fn get_val(&self, idx: u64) -> u64 {
         let block_id = (idx / CHUNK_SIZE as u64) as usize;
         let idx_within_block = idx % (CHUNK_SIZE as u64);
-        let block = &self.params.blocks[block_id];
+        let block = &self.blocks[block_id];
         let interpoled_val: u64 = block.line.eval(idx_within_block);
         let block_bytes = &self.data[block.data_start_offset..];
         let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes);
     }
 
     fn min_value(&self) -> u64 {
-        self.params.min_value
+        0u64
     }
 
     fn max_value(&self) -> u64 {
-        self.params.max_value
+        self.normalized_header.max_value
     }
 
     fn num_vals(&self) -> u64 {
-        self.params.num_vals
+        self.normalized_header.num_vals
     }
 }
diff --git a/fastfield_codecs/src/column.rs b/fastfield_codecs/src/column.rs
index de81542ef..03bb91c36 100644
--- a/fastfield_codecs/src/column.rs
+++ b/fastfield_codecs/src/column.rs
@@ -1,4 +1,5 @@
 use std::marker::PhantomData;
+use std::sync::Mutex;
 
 use tantivy_bitpacker::minmax;
 
@@ -59,6 +60,24 @@ pub struct VecColumn<'a, T = u64> {
     max_value: T,
 }
 
+impl<'a, C: Column<T>, T: Copy + PartialOrd> Column<T> for &'a C {
+    fn get_val(&self, idx: u64) -> T {
+        (*self).get_val(idx)
+    }
+
+    fn min_value(&self) -> T {
+        (*self).min_value()
+    }
+
+    fn max_value(&self) -> T {
+        (*self).max_value()
+    }
+
+    fn num_vals(&self) -> u64 {
+        (*self).num_vals()
+    }
+}
+
 impl<'a, T: Copy + PartialOrd> Column<T> for VecColumn<'a, T> {
     fn get_val(&self, position: u64) -> T {
         self.values[position as usize]
@@ -142,6 +161,87 @@
 where
 {
 }
 }
 
+pub struct RemappedColumn<T, M, C> {
+    column: C,
+    new_to_old_id_mapping: M,
+    min_max_cache: Mutex<Option<(T, T)>>,
+}
+
+impl<T, M, C> RemappedColumn<T, M, C>
+where
+    C: Column<T>,
+    M: Column<u32>,
+    T: Copy + Ord + Default,
+{
+    fn min_max(&self) -> (T, T) {
+        if let Some((min, max)) = self.min_max_cache.lock().unwrap().clone() {
+            return (min, max);
+        }
+        let (min, max) =
+            tantivy_bitpacker::minmax(self.iter()).unwrap_or((T::default(), T::default()));
+        *self.min_max_cache.lock().unwrap() = Some((min, max));
+        (min, max)
+    }
+}
+
+pub struct IterColumn<T>(T);
+
+impl<T> From<T> for IterColumn<T>
+where T: Iterator + Clone + ExactSizeIterator
+{
+    fn from(iter: T) -> Self {
+        IterColumn(iter)
+    }
+}
+
+impl<T> Column<T::Item> for IterColumn<T>
+where
+    T: Iterator + Clone + ExactSizeIterator,
+    T::Item: PartialOrd,
+{
+    fn get_val(&self, idx: u64) -> T::Item {
+        self.0.clone().nth(idx as usize).unwrap()
+    }
+    fn min_value(&self) -> T::Item {
+        self.0.clone().next().unwrap()
+    }
+
+    fn max_value(&self) -> T::Item {
+        self.0.clone().last().unwrap()
+    }
+
+    fn num_vals(&self) -> u64 {
+        self.0.len() as u64
+    }
+
+    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T::Item> + 'a> {
+        Box::new(self.0.clone())
+    }
+}
+
+impl<T, M, C> Column<T> for RemappedColumn<T, M, C>
+where
+    C: Column<T>,
+    M: Column<u32>,
+    T: Copy + Ord + Default,
+{
+    fn get_val(&self, idx: u64) -> T {
+        let old_id = self.new_to_old_id_mapping.get_val(idx);
+        self.column.get_val(old_id as u64)
+    }
+
+    fn min_value(&self) -> T {
+        self.min_max().0
+    }
+
+    fn max_value(&self) -> T {
+        self.min_max().1
+    }
+
+    fn num_vals(&self) -> u64 {
+        self.new_to_old_id_mapping.num_vals() as u64
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -158,4 +258,11 @@ mod tests {
         assert_eq!(mapped.get_val(0), 5);
         assert_eq!(mapped.get_val(1), 7);
     }
+
+    #[test]
+    fn test_range_as_col() {
+        let col = IterColumn::from(10..100);
+        assert_eq!(col.num_vals(), 90);
+        assert_eq!(col.max_value(), 99);
+    }
 }
diff --git a/fastfield_codecs/src/gcd.rs b/fastfield_codecs/src/gcd.rs
index 7cec65faa..7917d7ca4 100644
--- a/fastfield_codecs/src/gcd.rs
+++ b/fastfield_codecs/src/gcd.rs
@@ -1,36 +1,7 @@
-use std::io::{self, Write};
 use std::num::NonZeroU64;
 
-use common::BinarySerializable;
 use fastdivide::DividerU64;
 
-#[derive(Debug, Clone, Copy)]
-pub struct GCDParams {
-    pub gcd: u64,
-    pub min_value: u64,
-    pub num_vals: u64,
-}
-
-impl BinarySerializable for GCDParams {
-    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
-        self.gcd.serialize(writer)?;
-        self.min_value.serialize(writer)?;
-        self.num_vals.serialize(writer)?;
-        Ok(())
-    }
-
-    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
-        let gcd: u64 = u64::deserialize(reader)?;
-        let min_value: u64 = u64::deserialize(reader)?;
-        let num_vals: u64 = u64::deserialize(reader)?;
-        Ok(Self {
-            gcd,
-            min_value,
-            num_vals,
-        })
-    }
-}
-
 /// Compute the gcd of two non null numbers.
 ///
 /// It is recommended, but not required, to feed values such that `large >= small`.
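// Editorial sketch (not part of the patch): with gcd no longer a codec,
// `serialize` divides every value by the column's gcd before handing it to
// the selected codec. `fastdivide::DividerU64` (imported above and used in
// serialize.rs) turns the per-value division into a multiply-and-shift, since
// the divisor is fixed for the whole column. `divide_out_gcd` is a
// hypothetical helper illustrating the call pattern:
//
//     use fastdivide::DividerU64;
//
//     fn divide_out_gcd(vals: &[u64], min_value: u64, gcd: u64) -> Vec<u64> {
//         let divider = DividerU64::divide_by(gcd);
//         vals.iter().map(|&val| divider.divide(val - min_value)).collect()
//     }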
@@ -85,11 +56,7 @@ mod tests {
     ) -> io::Result<()> {
         let mut vals: Vec<i64> = (-4..=(num_vals as i64) - 5).map(|val| val * 1000).collect();
         let mut buffer: Vec<u8> = Vec::new();
-        crate::serialize(
-            VecColumn::from(&vals),
-            &mut buffer,
-            &[codec_type, FastFieldCodecType::Gcd],
-        )?;
+        crate::serialize(VecColumn::from(&vals), &mut buffer, &[codec_type])?;
         let buffer = OwnedBytes::new(buffer);
         let column = crate::open::<i64>(buffer.clone())?;
         assert_eq!(column.get_val(0), -4000i64);
@@ -131,11 +98,7 @@ mod tests {
     ) -> io::Result<()> {
         let mut vals: Vec<u64> = (1..=num_vals).map(|i| i as u64 * 1000u64).collect();
         let mut buffer: Vec<u8> = Vec::new();
-        crate::serialize(
-            VecColumn::from(&vals),
-            &mut buffer,
-            &[codec_type, FastFieldCodecType::Gcd],
-        )?;
+        crate::serialize(VecColumn::from(&vals), &mut buffer, &[codec_type])?;
         let buffer = OwnedBytes::new(buffer);
         let column = crate::open::<u64>(buffer.clone())?;
         assert_eq!(column.get_val(0), 1000u64);
diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs
index 92cf6961a..215acfd64 100644
--- a/fastfield_codecs/src/lib.rs
+++ b/fastfield_codecs/src/lib.rs
@@ -23,7 +23,7 @@ mod gcd;
 mod serialize;
 
 pub use self::column::{monotonic_map_column, Column, VecColumn};
-pub use self::serialize::{open, serialize, serialize_and_load};
+pub use self::serialize::{open, serialize, serialize_and_load, NormalizedHeader};
 
 #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
 #[repr(u8)]
- fn estimate(fastfield_accessor: &impl Column) -> Option; + fn estimate(column: &impl Column) -> Option; } -pub const ALL_CODEC_TYPES: [FastFieldCodecType; 4] = [ +pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [ FastFieldCodecType::Bitpacked, FastFieldCodecType::BlockwiseLinear, - FastFieldCodecType::Gcd, FastFieldCodecType::Linear, ]; @@ -176,19 +173,24 @@ mod tests { use crate::bitpacked::BitpackedCodec; use crate::blockwise_linear::BlockwiseLinearCodec; use crate::linear::LinearCodec; + use crate::serialize::Header; - pub fn create_and_validate( + pub(crate) fn create_and_validate( data: &[u64], name: &str, ) -> Option<(f32, f32)> { - let estimation = Codec::estimate(&VecColumn::from(data))?; + let col = &VecColumn::from(data); + let header = Header::compute_header(&col, &[Codec::CODEC_TYPE])?; + let normalized_col = header.normalize_column(col); + let estimation = Codec::estimate(&normalized_col)?; - let mut out: Vec = Vec::new(); - Codec::serialize(&mut out, &VecColumn::from(data)).unwrap(); + let mut out = Vec::new(); + let col = VecColumn::from(data); + serialize(col, &mut out, &[Codec::CODEC_TYPE]).unwrap(); let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0); - let reader = Codec::open_from_bytes(OwnedBytes::new(out)).unwrap(); + let reader = crate::open::(OwnedBytes::new(out)).unwrap(); assert_eq!(reader.num_vals(), data.len() as u64); for (doc, orig_val) in data.iter().copied().enumerate() { let val = reader.get_val(doc as u64); @@ -203,24 +205,42 @@ mod tests { proptest! { #![proptest_config(ProptestConfig::with_cases(100))] + #[test] - fn test_proptest_small(data in proptest::collection::vec(num_strategy(), 1..10)) { - create_and_validate::(&data, "proptest linearinterpol"); - create_and_validate::(&data, "proptest multilinearinterpol"); + fn test_proptest_small_bitpacked(data in proptest::collection::vec(num_strategy(), 1..10)) { create_and_validate::(&data, "proptest bitpacked"); } + + #[test] + fn test_proptest_small_linear(data in proptest::collection::vec(num_strategy(), 1..10)) { + create_and_validate::(&data, "proptest linearinterpol"); + } + + #[test] + fn test_proptest_small_blockwise_linear(data in proptest::collection::vec(num_strategy(), 1..10)) { + create_and_validate::(&data, "proptest multilinearinterpol"); + } } proptest! 
 {
         #![proptest_config(ProptestConfig::with_cases(10))]
+
         #[test]
-        fn test_proptest_large(data in proptest::collection::vec(num_strategy(), 1..6000)) {
-            create_and_validate::<LinearCodec>(&data, "proptest linearinterpol");
-            create_and_validate::<BlockwiseLinearCodec>(&data, "proptest multilinearinterpol");
+        fn test_proptest_large_bitpacked(data in proptest::collection::vec(num_strategy(), 1..6000)) {
             create_and_validate::<BitpackedCodec>(&data, "proptest bitpacked");
         }
 
+        #[test]
+        fn test_proptest_large_linear(data in proptest::collection::vec(num_strategy(), 1..6000)) {
+            create_and_validate::<LinearCodec>(&data, "proptest linearinterpol");
+        }
+
+        #[test]
+        fn test_proptest_large_blockwise_linear(data in proptest::collection::vec(num_strategy(), 1..6000)) {
+            create_and_validate::<BlockwiseLinearCodec>(&data, "proptest multilinearinterpol");
+        }
     }
 
+
     fn num_strategy() -> impl Strategy<Value = u64> {
         prop_oneof![
             1 => prop::num::u64::ANY.prop_map(|num| u64::MAX - (num % 10) ),
@@ -307,11 +327,8 @@ mod tests {
 
     #[test]
     fn estimation_prefer_bitpacked() {
-        let data: &[u64] = &[10, 10, 10, 10];
-
-        let data: VecColumn = data.into();
+        let data = VecColumn::from(&[10, 10, 10, 10]);
         let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
-
         let bitpacked_estimation = BitpackedCodec::estimate(&data).unwrap();
         assert_lt!(bitpacked_estimation, linear_interpol_estimation);
     }
@@ -341,7 +358,7 @@ mod tests {
                 count_codec += 1;
             }
         }
-        assert_eq!(count_codec, 4);
+        assert_eq!(count_codec, 3);
     }
 }
diff --git a/fastfield_codecs/src/linear.rs b/fastfield_codecs/src/linear.rs
index dc411b768..5cade79b3 100644
--- a/fastfield_codecs/src/linear.rs
+++ b/fastfield_codecs/src/linear.rs
@@ -1,10 +1,11 @@
 use std::io::{self, Write};
 
-use common::{BinarySerializable, CountingWriter, DeserializeFrom};
+use common::BinarySerializable;
 use ownedbytes::OwnedBytes;
 use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};
 
 use crate::line::Line;
+use crate::serialize::NormalizedHeader;
 use crate::{Column, FastFieldCodec, FastFieldCodecType};
 
 /// Depending on the field type, a different
@@ -12,28 +13,31 @@ use crate::{Column, FastFieldCodec, FastFieldCodecType};
 #[derive(Clone)]
 pub struct LinearReader {
     data: OwnedBytes,
-    footer: LinearParams,
+    linear_params: LinearParams,
+    header: NormalizedHeader,
 }
 
 impl Column for LinearReader {
     #[inline]
     fn get_val(&self, doc: u64) -> u64 {
-        let interpoled_val: u64 = self.footer.line.eval(doc);
-        let bitpacked_diff = self.footer.bit_unpacker.get(doc, &self.data);
+        let interpoled_val: u64 = self.linear_params.line.eval(doc);
+        let bitpacked_diff = self.linear_params.bit_unpacker.get(doc, &self.data);
         interpoled_val.wrapping_add(bitpacked_diff)
     }
     #[inline]
     fn min_value(&self) -> u64 {
-        self.footer.min_value
+        0u64
     }
+
     #[inline]
     fn max_value(&self) -> u64 {
-        self.footer.max_value
+        self.header.max_value
     }
+
     #[inline]
     fn num_vals(&self) -> u64 {
-        self.footer.num_vals
+        self.header.num_vals
     }
 }
 
@@ -43,33 +47,21 @@
 pub struct LinearCodec;
 
 #[derive(Debug, Clone)]
 struct LinearParams {
-    num_vals: u64,
-    min_value: u64,
-    max_value: u64,
     line: Line,
     bit_unpacker: BitUnpacker,
 }
 
 impl BinarySerializable for LinearParams {
     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
-        self.num_vals.serialize(writer)?;
-        self.min_value.serialize(writer)?;
-        self.max_value.serialize(writer)?;
         self.line.serialize(writer)?;
         self.bit_unpacker.bit_width().serialize(writer)?;
         Ok(())
     }
 
     fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
-        let num_vals = u64::deserialize(reader)?;
-        let min_value = u64::deserialize(reader)?;
-        let max_value = u64::deserialize(reader)?;
         let line = Line::deserialize(reader)?;
         let bit_width = u8::deserialize(reader)?;
         Ok(Self {
-            num_vals,
-            min_value,
-            max_value,
             line,
             bit_unpacker: BitUnpacker::new(bit_width),
         })
@@ -82,16 +74,17 @@ impl FastFieldCodec for LinearCodec {
     type Reader = LinearReader;
 
     /// Opens a fast field given a file.
-    fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader> {
-        let footer_len: u32 = (&bytes[bytes.len() - 4..]).deserialize()?;
-        let footer_offset = bytes.len() - 4 - footer_len as usize;
-        let (data, mut footer) = bytes.split(footer_offset);
-        let footer = LinearParams::deserialize(&mut footer)?;
-        Ok(LinearReader { data, footer })
+    fn open_from_bytes(mut data: OwnedBytes, header: NormalizedHeader) -> io::Result<Self::Reader> {
+        let linear_params = LinearParams::deserialize(&mut data)?;
+        Ok(LinearReader {
+            data,
+            linear_params,
+            header,
+        })
     }
 
     /// Creates a new fast field serializer.
-    fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
+    fn serialize(fastfield_accessor: &dyn Column, write: &mut impl Write) -> io::Result<()> {
         assert!(fastfield_accessor.min_value() <= fastfield_accessor.max_value());
 
         let line = Line::train(fastfield_accessor);
@@ -106,6 +99,12 @@
             .unwrap();
         let num_bits = compute_num_bits(max_offset_from_line);
 
+        let linear_params = LinearParams {
+            line,
+            bit_unpacker: BitUnpacker::new(num_bits),
+        };
+        linear_params.serialize(write)?;
+
         let mut bit_packer = BitPacker::new();
         for (pos, actual_value) in fastfield_accessor.iter().enumerate() {
             let calculated_value = line.eval(pos as u64);
@@ -114,19 +113,6 @@
         }
         bit_packer.close(write)?;
 
-        let footer = LinearParams {
-            num_vals: fastfield_accessor.num_vals(),
-            min_value: fastfield_accessor.min_value(),
-            max_value: fastfield_accessor.max_value(),
-            line,
-            bit_unpacker: BitUnpacker::new(num_bits),
-        };
-
-        let mut counting_wrt = CountingWriter::wrap(write);
-        footer.serialize(&mut counting_wrt)?;
-        let footer_len = counting_wrt.written_bytes();
-        (footer_len as u32).serialize(&mut counting_wrt)?;
-
         Ok(())
     }
 
@@ -225,7 +211,6 @@ mod tests {
     #[test]
     fn linear_interpol_fast_field_test_simple() {
         let data = (10..=20_u64).collect::<Vec<_>>();
-
         create_and_validate(&data, "simple monotonically");
     }
 
diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs
index cbe5b3198..2b9ff969b 100644
--- a/fastfield_codecs/src/main.rs
+++ b/fastfield_codecs/src/main.rs
@@ -113,7 +113,7 @@ pub fn serialize_with_codec<C: FastFieldCodec>(
     let data = Data(data);
     let estimation = C::estimate(&data)?;
     let mut out = Vec::new();
-    C::serialize(&mut out, &data).unwrap();
+    C::serialize(&data, &mut out).unwrap();
     let actual_compression = out.len() as f32 / (data.num_vals() * 8) as f32;
     Some((estimation, actual_compression, C::CODEC_TYPE))
 }
diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs
index 0deb36150..ff117f987 100644
--- a/fastfield_codecs/src/serialize.rs
+++ b/fastfield_codecs/src/serialize.rs
@@ -21,14 +21,13 @@ use std::io;
 use std::num::NonZeroU64;
 use std::sync::Arc;
 
-use common::BinarySerializable;
+use common::{BinarySerializable, VInt};
 use fastdivide::DividerU64;
 use log::warn;
 use ownedbytes::OwnedBytes;
 
 use crate::bitpacked::BitpackedCodec;
 use crate::blockwise_linear::BlockwiseLinearCodec;
-use crate::gcd::{find_gcd, GCDParams};
 use crate::linear::LinearCodec;
 use crate::{
     monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
@@ -46,125 +45,149 @@ fn codec_estimation<C: FastFieldCodec, D: Column>(
     }
 }
 
-fn write_header<W: io::Write>(codec_type: FastFieldCodecType, output: &mut W) -> io::Result<()> {
-    codec_type.to_code().serialize(output)?;
-    Ok(())
+#[derive(Debug, Copy, Clone)]
+pub struct NormalizedHeader {
+    pub num_vals: u64,
+    pub max_value: u64,
 }
 
-fn gcd_params(column: &impl Column) -> Option<GCDParams> {
-    let min_value = column.min_value();
-    let gcd = find_gcd(column.iter().map(|val| val - min_value)).map(NonZeroU64::get)?;
-    if gcd == 1 {
-        return None;
+#[derive(Debug, Copy, Clone)]
+pub(crate) struct Header {
+    num_vals: u64,
+    min_value: u64,
+    max_value: u64,
+    gcd: Option<NonZeroU64>,
+    codec_type: FastFieldCodecType,
+}
+
+impl Header {
+    pub fn normalized(self) -> NormalizedHeader {
+        let max_value =
+            (self.max_value - self.min_value) / self.gcd.map(|gcd| gcd.get()).unwrap_or(1);
+        NormalizedHeader {
+            num_vals: self.num_vals,
+            max_value,
+        }
+    }
+
+    pub fn normalize_column<C: Column>(&self, from_column: C) -> impl Column {
+        let min_value = self.min_value;
+        let gcd = self.gcd.map(|gcd| gcd.get()).unwrap_or(1);
+        let divider = DividerU64::divide_by(gcd);
+        monotonic_map_column(from_column, move |val| divider.divide(val - min_value))
+    }
+
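    // Editorial note (not part of the patch): the single `Header` replaces
    // the per-codec footers. For a column [1_000, 2_000, 3_000] stored with
    // the bitpacked codec, the serialized layout is:
    //
    //   VInt(num_vals = 3)
    //   VInt(min_value = 1_000)
    //   VInt(amplitude = max_value - min_value = 2_000)
    //   VInt(gcd = 1_000)   // a 0 is written when no gcd > 1 exists
    //   codec_type byte (Bitpacked = 1)
    //
    // followed by the codec payload for the normalized values [0, 1, 2].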
+    pub fn compute_header(
+        column: impl Column,
+        codecs: &[FastFieldCodecType],
+    ) -> Option<Header> {
+        let num_vals = column.num_vals();
+        let min_value = column.min_value();
+        let max_value = column.max_value();
+        let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value))
+            .filter(|gcd| gcd.get() > 1u64);
+        let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64));
+        let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value));
+        let codec_type = detect_codec(shifted_column, codecs)?;
+        Some(Header {
+            num_vals,
+            min_value,
+            max_value,
+            gcd,
+            codec_type,
+        })
+    }
+}
+
+impl BinarySerializable for Header {
+    fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
+        VInt(self.num_vals).serialize(writer)?;
+        VInt(self.min_value).serialize(writer)?;
+        VInt(self.max_value - self.min_value).serialize(writer)?;
+        if let Some(gcd) = self.gcd {
+            VInt(gcd.get()).serialize(writer)?;
+        } else {
+            VInt(0u64).serialize(writer)?;
+        }
+        self.codec_type.serialize(writer)?;
+        Ok(())
+    }
+
+    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
+        let num_vals = VInt::deserialize(reader)?.0;
+        let min_value = VInt::deserialize(reader)?.0;
+        let amplitude = VInt::deserialize(reader)?.0;
+        let max_value = min_value + amplitude;
+        let gcd_u64 = VInt::deserialize(reader)?.0;
+        let codec_type = FastFieldCodecType::deserialize(reader)?;
+        Ok(Header {
+            num_vals,
+            min_value,
+            max_value,
+            gcd: NonZeroU64::new(gcd_u64),
+            codec_type,
+        })
     }
-    Some(GCDParams {
-        gcd,
-        min_value,
-        num_vals: column.num_vals(),
-    })
 }
 
 /// Returns the correct reader wrapped in the `DynamicFastFieldReader` enum for the data.
 pub fn open<T: MonotonicallyMappableToU64>(
     mut bytes: OwnedBytes,
 ) -> io::Result<Arc<dyn Column<T>>> {
-    let codec_type = FastFieldCodecType::deserialize(&mut bytes)?;
-    open_from_id(bytes, codec_type)
-}
-
-fn open_codec_from_bytes<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
-    bytes: OwnedBytes,
-) -> io::Result<Arc<dyn Column<Item>>> {
-    let reader = C::open_from_bytes(bytes)?;
-    Ok(Arc::new(monotonic_map_column(reader, Item::from_u64)))
-}
-
-pub fn open_gcd_from_bytes<WrappedCodec: FastFieldCodec>(
-    bytes: OwnedBytes,
-) -> io::Result<impl Column> {
-    let footer_offset = bytes.len() - 24;
-    let (body, mut footer) = bytes.split(footer_offset);
-    let gcd_params = GCDParams::deserialize(&mut footer)?;
-    let gcd_remap = move |val: u64| gcd_params.min_value + gcd_params.gcd * val;
-    let reader: WrappedCodec::Reader = WrappedCodec::open_from_bytes(body)?;
-    Ok(monotonic_map_column(reader, gcd_remap))
-}
-
-fn open_codec_with_gcd<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
-    bytes: OwnedBytes,
-) -> io::Result<Arc<dyn Column<Item>>> {
-    let reader = open_gcd_from_bytes::<C>(bytes)?;
-    Ok(Arc::new(monotonic_map_column(reader, Item::from_u64)))
-}
-
-/// Returns the correct reader wrapped in the `DynamicFastFieldReader` enum for the data.
-fn open_from_id<Item: MonotonicallyMappableToU64>(
-    mut bytes: OwnedBytes,
-    codec_type: FastFieldCodecType,
-) -> io::Result<Arc<dyn Column<Item>>> {
-    match codec_type {
-        FastFieldCodecType::Bitpacked => open_codec_from_bytes::<BitpackedCodec, Item>(bytes),
-        FastFieldCodecType::Linear => open_codec_from_bytes::<LinearCodec, Item>(bytes),
+    let header = Header::deserialize(&mut bytes)?;
+    match header.codec_type {
+        FastFieldCodecType::Bitpacked => open_specific_codec::<BitpackedCodec, T>(bytes, &header),
+        FastFieldCodecType::Linear => open_specific_codec::<LinearCodec, T>(bytes, &header),
         FastFieldCodecType::BlockwiseLinear => {
-            open_codec_from_bytes::<BlockwiseLinearCodec, Item>(bytes)
-        }
-        FastFieldCodecType::Gcd => {
-            let codec_type = FastFieldCodecType::deserialize(&mut bytes)?;
-            match codec_type {
-                FastFieldCodecType::Bitpacked => open_codec_with_gcd::<BitpackedCodec, Item>(bytes),
-                FastFieldCodecType::Linear => open_codec_with_gcd::<LinearCodec, Item>(bytes),
-                FastFieldCodecType::BlockwiseLinear => {
-                    open_codec_with_gcd::<BlockwiseLinearCodec, Item>(bytes)
-                }
-                FastFieldCodecType::Gcd => Err(io::Error::new(
-                    io::ErrorKind::InvalidData,
-                    "Gcd codec wrapped into another gcd codec. This combination is not allowed.",
-                )),
-            }
+            open_specific_codec::<BlockwiseLinearCodec, T>(bytes, &header)
         }
     }
 }
 
+fn open_specific_codec<C: FastFieldCodec, Item: MonotonicallyMappableToU64>(
+    bytes: OwnedBytes,
+    header: &Header,
+) -> io::Result<Arc<dyn Column<Item>>> {
+    let normalized_header = header.normalized();
+    let reader = C::open_from_bytes(bytes, normalized_header)?;
+    let min_value = header.min_value;
+    if let Some(gcd) = header.gcd {
+        let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val * gcd.get());
+        Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
+    } else {
+        let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val);
+        Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping)))
+    }
+}
+
 pub fn serialize<T: MonotonicallyMappableToU64>(
     typed_column: impl Column<T>,
     output: &mut impl io::Write,
     codecs: &[FastFieldCodecType],
 ) -> io::Result<()> {
     let column = monotonic_map_column(typed_column, T::to_u64);
-    let gcd_params_opt = if codecs.contains(&FastFieldCodecType::Gcd) {
-        gcd_params(&column)
-    } else {
-        None
-    };
-
-    let gcd_params = if let Some(gcd_params) = gcd_params_opt {
-        gcd_params
-    } else {
-        return serialize_without_gcd(column, output, codecs);
-    };
-
-    write_header(FastFieldCodecType::Gcd, output)?;
-    let base_value = column.min_value();
-    let gcd_divider = DividerU64::divide_by(gcd_params.gcd);
-    let divided_fastfield_accessor =
-        monotonic_map_column(column, |val: u64| gcd_divider.divide(val - base_value));
-
-    serialize_without_gcd(divided_fastfield_accessor, output, codecs)?;
-
-    gcd_params.serialize(output)?;
+    let header = Header::compute_header(&column, codecs).ok_or_else(|| {
+        io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!(
+                "Data cannot be serialized with this list of codecs: {:?}",
+                codecs
+            ),
+        )
+    })?;
+    header.serialize(output)?;
+    let normalized_column = header.normalize_column(column);
+    assert_eq!(normalized_column.min_value(), 0u64);
+    serialize_given_codec(normalized_column, header.codec_type, output)?;
     Ok(())
 }
 
-fn serialize_without_gcd(
+fn detect_codec(
     column: impl Column,
-    output: &mut impl io::Write,
     codecs: &[FastFieldCodecType],
-) -> io::Result<()> {
+) -> Option<FastFieldCodecType> {
     let mut estimations = Vec::new();
     for &codec in codecs {
-        if codec == FastFieldCodecType::Gcd {
-            continue;
-        }
         match codec {
             FastFieldCodecType::Bitpacked => {
                 codec_estimation::<BitpackedCodec, _>(&column, &mut estimations);
             }
@@ -175,7 +198,6 @@
             FastFieldCodecType::BlockwiseLinear => {
                 codec_estimation::<BlockwiseLinearCodec, _>(&column, &mut estimations);
             }
-            FastFieldCodecType::Gcd => {}
         }
     }
     if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan()) {
@@ -187,25 +209,25 @@
     // removing nan values for codecs with broken calculations, and max values which disables
     // codecs
     estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX);
-    estimations.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
-    let (_ratio, codec_type) = estimations[0];
+    estimations
+        .sort_by(|(score_left, _), (score_right, _)| score_left.partial_cmp(&score_right).unwrap());
+    Some(estimations.first()?.1)
+}
 
-    write_header(codec_type, output)?;
+fn serialize_given_codec(
+    column: impl Column,
+    codec_type: FastFieldCodecType,
+    output: &mut impl io::Write,
+) -> io::Result<()> {
     match codec_type {
         FastFieldCodecType::Bitpacked => {
-            BitpackedCodec::serialize(output, &column)?;
+            BitpackedCodec::serialize(&column, output)?;
         }
         FastFieldCodecType::Linear => {
-            LinearCodec::serialize(output, &column)?;
+            LinearCodec::serialize(&column, output)?;
         }
         FastFieldCodecType::BlockwiseLinear => {
-            BlockwiseLinearCodec::serialize(output, &column)?;
-        }
-        FastFieldCodecType::Gcd => {
-            return Err(io::Error::new(
-                io::ErrorKind::InvalidData,
-                "GCD codec not supported.",
-            ));
+            BlockwiseLinearCodec::serialize(&column, output)?;
         }
     }
     output.flush()?;
@@ -230,4 +252,32 @@ mod tests {
         let restored: Vec<u64> = serialize_and_load(&original[..]).iter().collect();
         assert_eq!(&restored, &original[..]);
     }
+
+    #[test]
+    fn test_fastfield_bool_size_bitwidth_1() {
+        let mut buffer = Vec::new();
+        let col = VecColumn::from(&[false, true][..]);
+        serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap();
+        // 5 bytes of header, 1 byte of value, 7 bytes of padding.
+        assert_eq!(buffer.len(), 5 + 8);
+    }
+
+    #[test]
+    fn test_fastfield_bool_bit_size_bitwidth_0() {
+        let mut buffer = Vec::new();
+        let col = VecColumn::from(&[true][..]);
+        serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap();
+        // 5 bytes of header, 0 bytes of value, 7 bytes of padding.
+        assert_eq!(buffer.len(), 5 + 7);
+    }
+
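    // Editorial sketch (not part of the patch): an end-to-end round trip
    // through the new public API, mirroring the tests around it.
    fn round_trip_sketch() -> std::io::Result<()> {
        let vals: Vec<u64> = vec![1_000, 2_000, 3_000];
        let mut buffer = Vec::new();
        serialize(VecColumn::from(&vals[..]), &mut buffer, &ALL_CODEC_TYPES)?;
        // `open` reads the header, picks the matching codec reader, and wraps
        // it in the monotonic mapping `min_value + gcd * val`.
        let col = crate::open::<u64>(ownedbytes::OwnedBytes::new(buffer))?;
        assert_eq!(col.get_val(2), 3_000);
        Ok(())
    }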
+    #[test]
+    fn test_fastfield_gcd() {
+        let mut buffer = Vec::new();
+        let vals: Vec<u64> = (0..80).map(|val| (val % 7) * 1_000u64).collect();
+        let col = VecColumn::from(&vals[..]);
+        serialize(col, &mut buffer, &[FastFieldCodecType::Bitpacked]).unwrap();
+        // Values are stored over 3 bits.
+        assert_eq!(buffer.len(), 7 + (3 * 80 / 8) + 7);
+    }
 }
diff --git a/src/fastfield/bytes/writer.rs b/src/fastfield/bytes/writer.rs
index 2a98c1c08..7aee8ddb6 100644
--- a/src/fastfield/bytes/writer.rs
+++ b/src/fastfield/bytes/writer.rs
@@ -1,6 +1,9 @@
 use std::io;
 
+use fastfield_codecs::VecColumn;
+
 use crate::fastfield::serializer::CompositeFastFieldSerializer;
+use crate::fastfield::MultivalueStartIndex;
 use crate::indexer::doc_id_mapping::DocIdMapping;
 use crate::schema::{Document, Field, Value};
 use crate::DocId;
@@ -104,20 +107,26 @@ impl BytesFastFieldWriter {
     /// Serializes the fast field values by pushing them to the `FastFieldSerializer`.
     pub fn serialize(
-        &self,
+        mut self,
         serializer: &mut CompositeFastFieldSerializer,
         doc_id_map: Option<&DocIdMapping>,
     ) -> io::Result<()> {
         // writing the offset index
-        let mut doc_index_serializer =
-            serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?;
-        let mut offset = 0;
-        for vals in self.get_ordered_values(doc_id_map) {
-            doc_index_serializer.add_val(offset)?;
-            offset += vals.len() as u64;
+        // TODO FIXME No need to double the memory.
+        {
+            self.doc_index.push(self.vals.len() as u64);
+            let col = VecColumn::from(&self.doc_index[..]);
+            if let Some(doc_id_map) = doc_id_map {
+                let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
+                serializer.create_auto_detect_u64_fast_field_with_idx(
+                    self.field,
+                    multi_value_start_index,
+                    0,
+                )?;
+            } else {
+                serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 0)?;
+            }
         }
-        doc_index_serializer.add_val(self.vals.len() as u64)?;
-        doc_index_serializer.close_field()?;
         // writing the values themselves
         let mut value_serializer = serializer.new_bytes_fast_field_with_idx(self.field, 1);
         // the else could be removed, but this is faster (difference not benchmarked)
diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs
index be4713c7b..1513f9937 100644
--- a/src/fastfield/mod.rs
+++ b/src/fastfield/mod.rs
@@ -26,6 +26,7 @@ pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet};
 pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter};
 pub use self::error::{FastFieldNotAvailableError, Result};
 pub use self::facet_reader::FacetReader;
+pub(crate) use self::multivalued::MultivalueStartIndex;
 pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter};
 pub use self::readers::FastFieldReaders;
 pub(crate) use self::readers::{type_and_cardinality, FastType};
@@ -197,19 +198,18 @@ mod tests {
     use std::ops::Range;
     use std::path::Path;
     use std::sync::Arc;
-    use std::time::{Duration, SystemTime};
 
     use common::HasLen;
     use fastfield_codecs::{open, FastFieldCodecType};
     use once_cell::sync::Lazy;
     use rand::prelude::SliceRandom;
     use rand::rngs::StdRng;
-    use rand::SeedableRng;
+    use rand::{Rng, SeedableRng};
 
     use super::*;
     use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr};
     use crate::merge_policy::NoMergePolicy;
-    use crate::schema::{Document, Field, Schema, FAST, STRING, TEXT};
+    use crate::schema::{Document, Field, Schema, SchemaBuilder, FAST, STRING, TEXT};
     use crate::time::OffsetDateTime;
     use crate::{DateOptions, DatePrecision, Index, SegmentId, SegmentReader};
 
@@ -251,7 +251,7 @@ mod tests {
         serializer.close().unwrap();
     }
     let file = directory.open_read(path).unwrap();
-    assert_eq!(file.len(), 45);
+    assert_eq!(file.len(), 25);
     let composite_file = CompositeFile::open(&file)?;
     let fast_field_bytes = composite_file.open_read(*FIELD).unwrap().read_bytes()?;
    let fast_field_reader = open::<u64>(fast_field_bytes)?;
@@ -282,7 +282,7 @@ mod tests {
            serializer.close()?;
        }
        let file = directory.open_read(path)?;
-        assert_eq!(file.len(), 70);
+        assert_eq!(file.len(), 53);
        {
            let fast_fields_composite = CompositeFile::open(&file)?;
            let data = fast_fields_composite
@@ -321,7 +321,7 @@ mod tests {
            serializer.close().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 43);
+        assert_eq!(file.len(), 26);
        {
            let fast_fields_composite = CompositeFile::open(&file).unwrap();
            let data = fast_fields_composite
@@ -356,7 +356,7 @@ mod tests {
            serializer.close().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 80051);
+        assert_eq!(file.len(), 80040);
        {
            let fast_fields_composite = CompositeFile::open(&file)?;
            let data = fast_fields_composite
@@ -398,12 +398,7 @@ mod tests {
            serializer.close().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        // assert_eq!(file.len(), 17710 as usize); //bitpacked size
-        // assert_eq!(file.len(), 10175_usize); // linear interpol size
-        // assert_eq!(file.len(), 75_usize); // linear interpol size after calc improvement
-        // assert_eq!(file.len(), 1325_usize); // linear interpol size after switching to int based
-        assert_eq!(file.len(), 62_usize); // linear interpol size after switching to int based, off
-                                          // by one fix
+        assert_eq!(file.len(), 40_usize);
 
        {
            let fast_fields_composite = CompositeFile::open(&file)?;
@@ -843,7 +838,7 @@ mod tests {
            serializer.close().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 44);
+        assert_eq!(file.len(), 24);
        let composite_file = CompositeFile::open(&file)?;
        let data = composite_file.open_read(field).unwrap().read_bytes()?;
        let fast_field_reader = open::(data)?;
@@ -879,7 +874,7 @@
            serializer.close().unwrap();
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 56);
+        assert_eq!(file.len(), 36);
        let composite_file = CompositeFile::open(&file)?;
        let data = composite_file.open_read(field).unwrap().read_bytes()?;
        let fast_field_reader = open::(data)?;
@@ -897,24 +892,21 @@
        let directory: RamDirectory = RamDirectory::create();
 
        let mut schema_builder = Schema::builder();
-        schema_builder.add_bool_field("field_bool", FAST);
+        let field = schema_builder.add_bool_field("field_bool", FAST);
        let schema = schema_builder.build();
-        let field = schema.get_field("field_bool").unwrap();
 
        {
            let write: WritePtr = directory.open_write(path).unwrap();
-            let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap();
+            let mut serializer = CompositeFastFieldSerializer::from_write(write)?;
            let mut fast_field_writers = FastFieldsWriter::from_schema(&schema);
            let doc = Document::default();
            fast_field_writers.add_document(&doc);
-            fast_field_writers
-                .serialize(&mut serializer, &HashMap::new(), None)
-                .unwrap();
-            serializer.close().unwrap();
+            fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?;
+            serializer.close()?;
        }
        let file = directory.open_read(path).unwrap();
-        assert_eq!(file.len(), 43);
        let composite_file = CompositeFile::open(&file)?;
+        assert_eq!(file.len(), 23);
        let data = composite_file.open_read(field).unwrap().read_bytes()?;
        let fast_field_reader = open::<bool>(data)?;
        assert_eq!(fast_field_reader.get_val(0), false);
@@ -948,15 +940,10 @@
    pub fn test_gcd_date() -> crate::Result<()> {
        let size_prec_sec =
            test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Seconds)?;
+        assert_eq!(size_prec_sec, 28 + (1_000 * 13) / 8); // 13 bits per val = ceil(log2(number of seconds in 2 hours))
        let size_prec_micro =
            test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Microseconds)?;
-        assert!(size_prec_sec < size_prec_micro);
-
-        let size_prec_sec =
-            test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Seconds)?;
-        let size_prec_micro =
-            test_gcd_date_with_codec(FastFieldCodecType::Linear, DatePrecision::Microseconds)?;
-        assert!(size_prec_sec < size_prec_micro);
+        assert_eq!(size_prec_micro, 26 + (1_000 * 33) / 8); // 33 bits per val = ceil(log2(number of microseconds in 2 hours))
        Ok(())
    }
 
@@ -964,40 +951,26 @@
        codec_type: FastFieldCodecType,
        precision: DatePrecision,
    ) -> crate::Result<usize> {
-        let time1 = DateTime::from_timestamp_micros(
-            SystemTime::now()
-                .duration_since(SystemTime::UNIX_EPOCH)
-                .unwrap()
-                .as_secs() as i64,
-        );
-        let time2 = DateTime::from_timestamp_micros(
-            SystemTime::now()
-                .checked_sub(Duration::from_micros(4111))
-                .unwrap()
-                .duration_since(SystemTime::UNIX_EPOCH)
-                .unwrap()
-                .as_secs() as i64,
-        );
-
-        let time3 = DateTime::from_timestamp_micros(
-            SystemTime::now()
-                .checked_sub(Duration::from_millis(2000))
-                .unwrap()
-                .duration_since(SystemTime::UNIX_EPOCH)
-                .unwrap()
-                .as_secs() as i64,
-        );
-
-        let mut schema_builder = Schema::builder();
+        let mut rng = StdRng::seed_from_u64(2u64);
+        const T0: i64 = 1_662_345_825_012_529i64;
+        const ONE_HOUR_IN_MICROSECS: i64 = 3_600 * 1_000_000;
+        let times: Vec<DateTime> = std::iter::repeat_with(|| {
+            // +- One hour.
+            let t = T0 + rng.gen_range(-ONE_HOUR_IN_MICROSECS..ONE_HOUR_IN_MICROSECS);
+            DateTime::from_timestamp_micros(t)
+        })
+        .take(1_000)
+        .collect();
        let date_options = DateOptions::default()
            .set_fast(Cardinality::SingleValue)
            .set_precision(precision);
+        let mut schema_builder = SchemaBuilder::default();
        let field = schema_builder.add_date_field("field", date_options);
        let schema = schema_builder.build();
-        let docs = vec![doc!(field=>time1), doc!(field=>time2), doc!(field=>time3)];
+        let docs: Vec<Document> = times.iter().map(|time| doc!(field=>*time)).collect();
 
-        let directory = get_index(&docs, &schema, &[codec_type])?;
+        let directory = get_index(&docs[..], &schema, &[codec_type])?;
        let path = Path::new("test");
        let file = directory.open_read(path).unwrap();
        let composite_file = CompositeFile::open(&file)?;
 
        let len = file.len();
        let test_fastfield = open::<DateTime>(file.read_bytes()?)?;
 
-        assert_eq!(test_fastfield.get_val(0), time1.truncate(precision));
-        assert_eq!(test_fastfield.get_val(1), time2.truncate(precision));
-        assert_eq!(test_fastfield.get_val(2), time3.truncate(precision));
+        for (i, time) in times.iter().enumerate() {
+            assert_eq!(test_fastfield.get_val(i as u64), time.truncate(precision));
+        }
        Ok(len)
    }
 }
diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs
index d4b54a111..241a6f7ed 100644
--- a/src/fastfield/multivalued/mod.rs
+++ b/src/fastfield/multivalued/mod.rs
@@ -3,6 +3,7 @@ mod writer;
 
 pub use self::reader::MultiValuedFastFieldReader;
 pub use self::writer::MultiValuedFastFieldWriter;
+pub(crate) use self::writer::MultivalueStartIndex;
 
 #[cfg(test)]
 mod tests {
diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs
index a185ee90b..da0284e3a 100644
--- a/src/fastfield/multivalued/writer.rs
+++ b/src/fastfield/multivalued/writer.rs
@@ -1,10 +1,9 @@
 use std::io;
+use std::sync::Mutex;
 
-use fastfield_codecs::MonotonicallyMappableToU64;
+use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
 use fnv::FnvHashMap;
-use tantivy_bitpacker::minmax;
 
-use crate::fastfield::serializer::BitpackedSerializerLegacy;
 use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType};
 use crate::indexer::doc_id_mapping::DocIdMapping;
 use crate::postings::UnorderedTermId;
@@ -102,16 +101,6 @@ impl MultiValuedFastFieldWriter {
         }
     }
 
-    /// Register all of the values associated to a document.
-    ///
-    /// The method returns the `DocId` of the document that was
-    /// just written.
-    pub fn add_document_vals(&mut self, vals: &[UnorderedTermId]) -> DocId {
-        let doc = self.doc_index.len() as DocId;
-        self.next_doc();
-        self.vals.extend_from_slice(vals);
-        doc
-    }
     /// Returns an iterator over values per doc_id in ascending doc_id order.
     ///
     /// Normally the order is simply iterating self.doc_id_index.
@@ -151,39 +140,34 @@ impl MultiValuedFastFieldWriter {
     /// `tantivy` builds a mapping to convert this `UnorderedTermId` into
     /// term ordinals.
     pub fn serialize(
-        &self,
+        mut self,
         serializer: &mut CompositeFastFieldSerializer,
         mapping_opt: Option<&FnvHashMap<UnorderedTermId, TermOrdinal>>,
         doc_id_map: Option<&DocIdMapping>,
     ) -> io::Result<()> {
         {
-            // writing the offset index
-            let mut doc_index_serializer =
-                serializer.new_u64_fast_field_with_idx(self.field, 0, self.vals.len() as u64, 0)?;
-
-            let mut offset = 0;
-            for vals in self.get_ordered_values(doc_id_map) {
-                doc_index_serializer.add_val(offset)?;
-                offset += vals.len() as u64;
+            self.doc_index.push(self.vals.len() as u64);
+            let col = VecColumn::from(&self.doc_index[..]);
+            if let Some(doc_id_map) = doc_id_map {
+                let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map);
+                serializer.create_auto_detect_u64_fast_field_with_idx(
+                    self.field,
+                    multi_value_start_index,
+                    0,
+                )?;
+            } else {
+                serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 0)?;
             }
-            doc_index_serializer.add_val(self.vals.len() as u64)?;
-
-            doc_index_serializer.close_field()?;
         }
         {
-            // writing the values themselves.
-            let mut value_serializer: BitpackedSerializerLegacy<'_, _>;
+            // Writing the values themselves.
+            // TODO FIXME: Use less memory.
+            let mut values: Vec<u64> = Vec::new();
             if let Some(mapping) = mapping_opt {
-                value_serializer = serializer.new_u64_fast_field_with_idx(
-                    self.field,
-                    0u64,
-                    mapping.len() as u64,
-                    1,
-                )?;
-
                 if self.fast_field_type.is_facet() {
                     let mut doc_vals: Vec<u64> = Vec::with_capacity(100);
                     for vals in self.get_ordered_values(doc_id_map) {
+                        // In the case of facets, we want a vec of facet ords that is sorted.
                         doc_vals.clear();
                         let remapped_vals = vals
                             .iter()
@@ -191,7 +175,7 @@
                         doc_vals.extend(remapped_vals);
                         doc_vals.sort_unstable();
                         for &val in &doc_vals {
-                            value_serializer.add_val(val)?;
+                            values.push(val);
                         }
                     }
                 } else {
@@ -200,24 +184,172 @@
                         .iter()
                         .map(|val| *mapping.get(val).expect("Missing term ordinal"));
                     for val in remapped_vals {
-                        value_serializer.add_val(val)?;
+                        values.push(val);
                     }
                 }
             }
         } else {
-                let val_min_max = minmax(self.vals.iter().cloned());
-                let (val_min, val_max) = val_min_max.unwrap_or((0u64, 0u64));
-                value_serializer =
-                    serializer.new_u64_fast_field_with_idx(self.field, val_min, val_max, 1)?;
                 for vals in self.get_ordered_values(doc_id_map) {
                     // sort values in case of remapped doc_ids?
                    for &val in vals {
-                        value_serializer.add_val(val)?;
+                        values.push(val);
                    }
                }
            }
-            value_serializer.close_field()?;
+            let col = VecColumn::from(&values[..]);
+            serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 1)?;
        }
        Ok(())
    }
 }
+
+pub(crate) struct MultivalueStartIndex<'a, C: Column> {
+    column: &'a C,
+    doc_id_map: &'a DocIdMapping,
+    min_max_opt: Mutex<Option<(u64, u64)>>,
+    random_seeker: Mutex<MultivalueStartIndexRandomSeeker<'a, C>>,
+}
+
+struct MultivalueStartIndexRandomSeeker<'a, C: Column> {
+    seek_head: MultivalueStartIndexIter<'a, C>,
+    seek_next_id: u64,
+}
+
+impl<'a, C: Column> MultivalueStartIndex<'a, C> {
+    pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self {
+        assert_eq!(column.num_vals(), doc_id_map.num_old_doc_ids() as u64 + 1);
+        MultivalueStartIndex {
+            column,
+            doc_id_map,
+            min_max_opt: Mutex::default(),
+            random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker {
+                seek_head: MultivalueStartIndexIter {
+                    column,
+                    doc_id_map,
+                    new_doc_id: 0,
+                    offset: 0u64,
+                },
+                seek_next_id: 0u64,
+            }),
+        }
+    }
+
+    fn minmax(&self) -> (u64, u64) {
+        if let Some((min, max)) = self.min_max_opt.lock().unwrap().clone() {
+            return (min, max);
+        }
+        let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64));
+        *self.min_max_opt.lock().unwrap() = Some((min, max));
+        (min, max)
+    }
+}
+impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> {
+    fn get_val(&self, idx: u64) -> u64 {
+        let mut random_seeker_lock = self.random_seeker.lock().unwrap();
+        if random_seeker_lock.seek_next_id > idx {
+            *random_seeker_lock = MultivalueStartIndexRandomSeeker {
+                seek_head: MultivalueStartIndexIter {
+                    column: self.column,
+                    doc_id_map: self.doc_id_map,
+                    new_doc_id: 0,
+                    offset: 0u64,
+                },
+                seek_next_id: 0u64,
+            };
+        }
+        let to_skip = idx - random_seeker_lock.seek_next_id;
+        random_seeker_lock.seek_next_id = idx + 1;
+        random_seeker_lock.seek_head.nth(to_skip as usize).unwrap()
+    }
+
+    fn min_value(&self) -> u64 {
+        self.minmax().0
+    }
+
+    fn max_value(&self) -> u64 {
+        self.minmax().1
+    }
+
+    fn num_vals(&self) -> u64 {
+        (self.doc_id_map.num_new_doc_ids() + 1) as u64
+    }
+
+    fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
+        Box::new(MultivalueStartIndexIter {
+            column: &self.column,
+            doc_id_map: self.doc_id_map,
+            new_doc_id: 0,
+            offset: 0,
+        })
+    }
+}
+
+struct MultivalueStartIndexIter<'a, C: Column> {
+    pub column: &'a C,
+    pub doc_id_map: &'a DocIdMapping,
+    pub new_doc_id: usize,
+    pub offset: u64,
+}
+
+impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> {
+    type Item = u64;
+
+    fn next(&mut self) -> Option<u64> {
+        if self.new_doc_id > self.doc_id_map.num_new_doc_ids() {
+            return None;
+        }
+        let new_doc_id = self.new_doc_id;
+        self.new_doc_id += 1;
+        let start_offset = self.offset;
+        if new_doc_id < self.doc_id_map.num_new_doc_ids() {
+            let old_doc = self.doc_id_map.get_old_doc_id(new_doc_id as u32) as u64;
+            let num_vals_for_doc = self.column.get_val(old_doc + 1) - self.column.get_val(old_doc);
+            self.offset += num_vals_for_doc;
+        }
+        Some(start_offset)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_multivalue_start_index() {
+        let doc_id_mapping = DocIdMapping::from_new_id_to_old_id(vec![4, 1, 2]);
+        assert_eq!(doc_id_mapping.num_old_doc_ids(), 5);
+        let col = VecColumn::from(&[0u64, 3, 5, 10, 12, 16][..]);
+        let multivalue_start_index = MultivalueStartIndex::new(
+            &col, // 3, 2, 5, 2, 4
+            &doc_id_mapping,
+        );
+        assert_eq!(multivalue_start_index.num_vals(), 4);
+        assert_eq!(
+            multivalue_start_index.iter().collect::<Vec<u64>>(),
+            vec![0, 4, 6, 11]
+        ); // 4, 2, 5
+    }
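    // Editorial note (not part of the patch): worked example for the test
    // above. The old start offsets [0, 3, 5, 10, 12, 16] encode per-doc value
    // counts [3, 2, 5, 2, 4]; reordering the docs as [4, 1, 2] selects the
    // counts [4, 2, 5], and re-running the prefix sum over those counts
    // yields the remapped start offsets [0, 4, 6, 11].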
+
+    #[test]
+    fn test_multivalue_get_vals() {
+        let doc_id_mapping =
+            DocIdMapping::from_new_id_to_old_id(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
+        assert_eq!(doc_id_mapping.num_old_doc_ids(), 10);
+        let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]);
+        let multivalue_start_index = MultivalueStartIndex::new(&col, &doc_id_mapping);
+        assert_eq!(
+            multivalue_start_index.iter().collect::<Vec<u64>>(),
+            vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
+        );
+        assert_eq!(multivalue_start_index.num_vals(), 11);
+        assert_eq!(multivalue_start_index.get_val(3), 2);
+        assert_eq!(multivalue_start_index.get_val(5), 5);
+        assert_eq!(multivalue_start_index.get_val(8), 21);
+        assert_eq!(multivalue_start_index.get_val(4), 3);
+        assert_eq!(multivalue_start_index.get_val(0), 0);
+        assert_eq!(multivalue_start_index.get_val(10), 55);
+    }
+}
diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs
index 10b692e2f..1447c0455 100644
--- a/src/fastfield/serializer/mod.rs
+++ b/src/fastfield/serializer/mod.rs
@@ -1,7 +1,7 @@
 use std::io::{self, Write};
 
 use common::{BinarySerializable, CountingWriter};
-pub use fastfield_codecs::bitpacked::{BitpackedCodec, BitpackedSerializerLegacy};
+pub use fastfield_codecs::bitpacked::BitpackedCodec;
 pub use fastfield_codecs::{Column, FastFieldCodec, FastFieldStats};
 use fastfield_codecs::{FastFieldCodecType, MonotonicallyMappableToU64, ALL_CODEC_TYPES};
 
@@ -85,40 +85,6 @@ impl CompositeFastFieldSerializer {
         Ok(())
     }
 
-    /// Start serializing a new u64 fast field
-    pub fn serialize_into(
-        &mut self,
-        field: Field,
-        min_value: u64,
-        max_value: u64,
-    ) -> io::Result<BitpackedSerializerLegacy<'_, CountingWriter<WritePtr>>> {
-        self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
-    }
-
-    /// Start serializing a new u64 fast field
-    pub fn new_u64_fast_field(
-        &mut self,
-        field: Field,
-        min_value: u64,
-        max_value: u64,
-    ) -> io::Result<BitpackedSerializerLegacy<'_, CountingWriter<WritePtr>>> {
-        self.new_u64_fast_field_with_idx(field, min_value, max_value, 0)
-    }
-
-    /// Start serializing a new u64 fast field
-    pub fn new_u64_fast_field_with_idx(
-        &mut self,
-        field: Field,
-        min_value: u64,
-        max_value: u64,
-        idx: usize,
-    ) -> io::Result<BitpackedSerializerLegacy<'_, CountingWriter<WritePtr>>> {
-        let field_write = self.composite_write.for_field_with_idx(field, idx);
-        // Prepend codec id to field data for compatibility with DynamicFastFieldReader.
-        FastFieldCodecType::Bitpacked.serialize(field_write)?;
-        BitpackedSerializerLegacy::open(field_write, min_value, max_value)
-    }
-
     /// Start serializing a new [u8] fast field
     pub fn new_bytes_fast_field_with_idx(
         &mut self,
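// Editorial note (not part of the patch): the writer `serialize` methods in
// the hunks below now take `self` by value rather than `&self`. Serialization
// is the writers' terminal operation, and taking ownership lets buffered data
// (e.g. `doc_index`, `vals`) be mutated or moved into columns instead of
// copied.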
diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs
index 1c0adadb1..d93772b7e 100644
--- a/src/fastfield/writer.rs
+++ b/src/fastfield/writer.rs
@@ -211,12 +211,12 @@ impl FastFieldsWriter {
     /// Serializes all of the `FastFieldWriter`s by pushing them in
     /// order to the fast field serializer.
     pub fn serialize(
-        &self,
+        self,
         serializer: &mut CompositeFastFieldSerializer,
         mapping: &HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>,
         doc_id_map: Option<&DocIdMapping>,
     ) -> io::Result<()> {
-        for field_writer in &self.term_id_writers {
+        for field_writer in self.term_id_writers {
             let field = field_writer.field();
             field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
         }
             field_writer.serialize(serializer, doc_id_map)?;
         }
 
-        for field_writer in &self.multi_values_writers {
+        for field_writer in self.multi_values_writers {
             let field = field_writer.field();
             field_writer.serialize(serializer, mapping.get(&field), doc_id_map)?;
         }
-        for field_writer in &self.bytes_value_writers {
+        for field_writer in self.bytes_value_writers {
             field_writer.serialize(serializer, doc_id_map)?;
         }
         Ok(())
diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs
index 6cba76840..b9183c9eb 100644
--- a/src/indexer/doc_id_mapping.rs
+++ b/src/indexer/doc_id_mapping.rs
@@ -91,6 +91,12 @@ impl DocIdMapping {
             .map(|old_doc| els[*old_doc as usize])
             .collect()
     }
+    pub fn num_new_doc_ids(&self) -> usize {
+        self.new_doc_id_to_old.len()
+    }
+    pub fn num_old_doc_ids(&self) -> usize {
+        self.old_doc_id_to_new.len()
+    }
 }
 
 pub(crate) fn expect_field_id_for_sort_field(
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 57f8b629e..1c6682cbc 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -612,25 +612,23 @@ impl IndexMerger {
             .collect::<Vec<_>>();
         // We can now write the actual fast field values.
         // In the case of hierarchical facets, they are actually term ordinals.
-        let max_term_ord = term_ordinal_mappings.max_term_ord();
         {
-            let mut serialize_vals =
-                fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?;
-            let mut vals: Vec<u64> = Vec::with_capacity(100);
-
+            let mut vals = Vec::new();
+            let mut buffer = Vec::new();
             for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
                 let term_ordinal_mapping: &[TermOrdinal] =
                     term_ordinal_mappings.get_segment(old_doc_addr.segment_ord as usize);
 
                 let ff_reader = &fast_field_reader[old_doc_addr.segment_ord as usize];
-                ff_reader.get_vals(old_doc_addr.doc_id, &mut vals);
-                for &prev_term_ord in &vals {
+                ff_reader.get_vals(old_doc_addr.doc_id, &mut buffer);
+                for &prev_term_ord in &buffer {
                     let new_term_ord = term_ordinal_mapping[prev_term_ord as usize];
-                    serialize_vals.add_val(new_term_ord)?;
+                    vals.push(new_term_ord);
                 }
             }
-            serialize_vals.close_field()?;
+            let col = VecColumn::from(&vals[..]);
+            fast_field_serializer.create_auto_detect_u64_fast_field_with_idx(field, col, 1)?;
         }
         Ok(())
     }
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index d48f326ac..8d84d4954 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -138,7 +138,7 @@ impl SegmentWriter {
         remap_and_write(
             &self.per_field_postings_writers,
             self.ctx,
-            &self.fast_field_writers,
+            self.fast_field_writers,
             &self.fieldnorms_writer,
             &self.schema,
             self.segment_serializer,
@@ -345,7 +345,7 @@ impl SegmentWriter {
 fn remap_and_write(
     per_field_postings_writers: &PerFieldPostingsWriter,
     ctx: IndexingContext,
-    fast_field_writers: &FastFieldsWriter,
+    fast_field_writers: FastFieldsWriter,
     fieldnorms_writer: &FieldNormsWriter,
     schema: &Schema,
     mut serializer: SegmentSerializer,