diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index e898c72ca..8ba8225bb 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -34,7 +34,9 @@ pub use self::reader::DynamicFastFieldReader; pub use self::reader::FastFieldReader; pub use self::readers::FastFieldReaders; pub use self::serializer::CompositeFastFieldSerializer; +pub use self::serializer::FastFieldDataAccess; pub use self::serializer::FastFieldSerializer; +pub use self::serializer::FastFieldStats; pub use self::writer::{FastFieldsWriter, IntFastFieldWriter}; use crate::schema::Cardinality; use crate::schema::FieldType; @@ -239,7 +241,7 @@ mod tests { #[test] pub fn test_fastfield() { - let test_fastfield = BitpackedFastFieldReader::::from(vec![100, 200, 300]); + let test_fastfield = DynamicFastFieldReader::::from(vec![100, 200, 300]); assert_eq!(test_fastfield.get(0), 100); assert_eq!(test_fastfield.get(1), 200); assert_eq!(test_fastfield.get(2), 300); @@ -268,10 +270,10 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(&path).unwrap(); - assert_eq!(file.len(), 36 as usize); + assert_eq!(file.len(), 37 as usize); let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(*FIELD).unwrap(); - let fast_field_reader = BitpackedFastFieldReader::::open(file)?; + let fast_field_reader = DynamicFastFieldReader::::open(file)?; assert_eq!(fast_field_reader.get(0), 13u64); assert_eq!(fast_field_reader.get(1), 14u64); assert_eq!(fast_field_reader.get(2), 2u64); @@ -299,11 +301,11 @@ mod tests { serializer.close()?; } let file = directory.open_read(&path)?; - assert_eq!(file.len(), 61 as usize); + assert_eq!(file.len(), 62 as usize); { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite.open_read(*FIELD).unwrap(); - let fast_field_reader = BitpackedFastFieldReader::::open(data)?; + let fast_field_reader = DynamicFastFieldReader::::open(data)?; assert_eq!(fast_field_reader.get(0), 4u64); assert_eq!(fast_field_reader.get(1), 14_082_001u64); assert_eq!(fast_field_reader.get(2), 3_052u64); @@ -335,11 +337,11 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(&path).unwrap(); - assert_eq!(file.len(), 34 as usize); + assert_eq!(file.len(), 35 as usize); { let fast_fields_composite = CompositeFile::open(&file).unwrap(); let data = fast_fields_composite.open_read(*FIELD).unwrap(); - let fast_field_reader = BitpackedFastFieldReader::::open(data)?; + let fast_field_reader = DynamicFastFieldReader::::open(data)?; for doc in 0..10_000 { assert_eq!(fast_field_reader.get(doc), 100_000u64); } @@ -367,11 +369,11 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(&path).unwrap(); - assert_eq!(file.len(), 80042 as usize); + assert_eq!(file.len(), 80043 as usize); { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite.open_read(*FIELD).unwrap(); - let fast_field_reader = BitpackedFastFieldReader::::open(data)?; + let fast_field_reader = DynamicFastFieldReader::::open(data)?; assert_eq!(fast_field_reader.get(0), 0u64); for doc in 1..10_001 { assert_eq!( @@ -406,11 +408,11 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(&path).unwrap(); - assert_eq!(file.len(), 17709 as usize); + assert_eq!(file.len(), 17710 as usize); { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite.open_read(i64_field).unwrap(); - let fast_field_reader = BitpackedFastFieldReader::::open(data)?; + let fast_field_reader = DynamicFastFieldReader::::open(data)?; assert_eq!(fast_field_reader.min_value(), -100i64); assert_eq!(fast_field_reader.max_value(), 9_999i64); @@ -450,7 +452,7 @@ mod tests { { let fast_fields_composite = CompositeFile::open(&file).unwrap(); let data = fast_fields_composite.open_read(i64_field).unwrap(); - let fast_field_reader = BitpackedFastFieldReader::::open(data)?; + let fast_field_reader = DynamicFastFieldReader::::open(data)?; assert_eq!(fast_field_reader.get(0u32), 0i64); } Ok(()) @@ -483,7 +485,7 @@ mod tests { { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite.open_read(*FIELD).unwrap(); - let fast_field_reader = BitpackedFastFieldReader::::open(data)?; + let fast_field_reader = DynamicFastFieldReader::::open(data)?; let mut a = 0u64; for _ in 0..n { diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index 1e1777c67..40428a328 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -1,4 +1,4 @@ -use crate::fastfield::serializer::DynamicFastFieldSerializer; +use crate::fastfield::serializer::BitpackedFastFieldSerializer; use crate::fastfield::serializer::FastFieldSerializer; use crate::fastfield::CompositeFastFieldSerializer; use crate::postings::UnorderedTermId; @@ -155,7 +155,7 @@ impl MultiValuedFastFieldWriter { } { // writing the values themselves. - let mut value_serializer: DynamicFastFieldSerializer<'_, _>; + let mut value_serializer: BitpackedFastFieldSerializer<'_, _>; match mapping_opt { Some(mapping) => { value_serializer = serializer.new_u64_fast_field_with_idx( diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index 7050a1b9a..1effdec54 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -67,8 +67,12 @@ pub enum DynamicFastFieldReader { impl DynamicFastFieldReader { /// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. pub fn open(file: FileSlice) -> crate::Result> { + let mut bytes = file.read_bytes()?; + let (mut id_bytes, data_bytes) = bytes.split(1); + let id = u8::deserialize(&mut id_bytes)?; + Ok(DynamicFastFieldReader::Bitpacked( - BitpackedFastFieldReader::open(file)?, + BitpackedFastFieldReader::open_from_bytes(data_bytes)?, )) } } @@ -113,6 +117,11 @@ impl BitpackedFastFieldReader { /// Opens a fast field given a file. pub fn open(file: FileSlice) -> crate::Result { let mut bytes = file.read_bytes()?; + let _id = u8::deserialize(&mut bytes)?; + Self::open_from_bytes(bytes) + } + /// Opens a fast field given a file. + pub fn open_from_bytes(mut bytes: OwnedBytes) -> crate::Result { let min_value = u64::deserialize(&mut bytes)?; let amplitude = u64::deserialize(&mut bytes)?; let max_value = min_value + amplitude; @@ -198,8 +207,8 @@ impl FastFieldReader for BitpackedFastFieldReader { } } -impl From> for BitpackedFastFieldReader { - fn from(vals: Vec) -> BitpackedFastFieldReader { +impl From> for DynamicFastFieldReader { + fn from(vals: Vec) -> DynamicFastFieldReader { let mut schema_builder = Schema::builder(); let field = schema_builder.add_u64_field("field", FAST); let schema = schema_builder.build(); @@ -231,6 +240,6 @@ impl From> for BitpackedFastFieldReader { let field_file = composite_file .open_read(field) .expect("File component not found"); - BitpackedFastFieldReader::open(field_file).unwrap() + DynamicFastFieldReader::open(field_file).unwrap() } } diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs deleted file mode 100644 index 28199bfa7..000000000 --- a/src/fastfield/serializer.rs +++ /dev/null @@ -1,256 +0,0 @@ -use crate::common::BinarySerializable; -use crate::common::CompositeWrite; -use crate::common::CountingWriter; -use crate::directory::WritePtr; -use crate::schema::Field; -use std::io::{self, Write}; -use tantivy_bitpacker::compute_num_bits; -use tantivy_bitpacker::BitPacker; - -/// `CompositeFastFieldSerializer` is in charge of serializing -/// fastfields on disk. -/// -/// Fast fields have differnt encodings like bit-packing. -/// -/// `FastFieldWriter`s are in charge of pushing the data to -/// the serializer. -/// The serializer expects to receive the following calls. -/// -/// * `new_u64_fast_field(...)` -/// * `add_val(...)` -/// * `add_val(...)` -/// * `add_val(...)` -/// * ... -/// * `close_field()` -/// * `new_u64_fast_field(...)` -/// * `add_val(...)` -/// * ... -/// * `close_field()` -/// * `close()` -pub struct CompositeFastFieldSerializer { - composite_write: CompositeWrite, -} - -impl CompositeFastFieldSerializer { - /// Constructor - pub fn from_write(write: WritePtr) -> io::Result { - // just making room for the pointer to header. - let composite_write = CompositeWrite::wrap(write); - Ok(CompositeFastFieldSerializer { composite_write }) - } - - /// Start serializing a new u64 fast field - pub fn new_u64_fast_field( - &mut self, - field: Field, - min_value: u64, - max_value: u64, - ) -> io::Result>> { - self.new_u64_fast_field_with_idx(field, min_value, max_value, 0) - } - - /// Start serializing a new u64 fast field - pub fn new_u64_fast_field_with_idx( - &mut self, - field: Field, - min_value: u64, - max_value: u64, - idx: usize, - ) -> io::Result>> { - let field_write = self.composite_write.for_field_with_idx(field, idx); - DynamicFastFieldSerializer::open(field_write, min_value, max_value) - } - - /// Start serializing a new [u8] fast field - pub fn new_bytes_fast_field_with_idx( - &mut self, - field: Field, - idx: usize, - ) -> FastBytesFieldSerializer<'_, CountingWriter> { - let field_write = self.composite_write.for_field_with_idx(field, idx); - FastBytesFieldSerializer { write: field_write } - } - - /// Closes the serializer - /// - /// After this call the data must be persistently save on disk. - pub fn close(self) -> io::Result<()> { - self.composite_write.close() - } -} - -#[derive(Debug, Clone)] -pub struct EstimationStats { - min_value: u64, - max_value: u64, -} -/// The FastFieldSerializer trait is the common interface -/// implemented by every fastfield serializer variant. -/// -/// `DynamicFastFieldSerializer` is the enum wrapping all variants. -/// It is used to create an serializer instance. -pub trait FastFieldSerializer { - /// add value to serializer - fn add_val(&mut self, val: u64) -> io::Result<()>; - /// finish serializing a field. - fn close_field(self) -> io::Result<()>; -} - -/// The FastFieldSerializerEstimate trait is required on all variants -/// of fast field compressions, to decide which one to choose. -pub trait FastFieldSerializerEstimate { - /// returns an estimate of the compression ratio. - fn estimate( - /*fastfield_accessor: impl FastFieldReader,*/ stats: EstimationStats, - ) -> (f32, &'static str); - /// the unique name of the compressor - fn name() -> &'static str; -} - -pub enum DynamicFastFieldSerializer<'a, W: Write> { - Bitpacked(BitpackedFastFieldSerializer<'a, W>), -} - -impl<'a, W: Write> DynamicFastFieldSerializer<'a, W> { - /// Creates a new fast field serializer. - /// - /// The serializer in fact encode the values by bitpacking - /// `(val - min_value)`. - /// - /// It requires a `min_value` and a `max_value` to compute - /// compute the minimum number of bits required to encode - /// values. - pub fn open( - write: &'a mut W, - min_value: u64, - max_value: u64, - ) -> io::Result> { - let stats = EstimationStats { - min_value, - max_value, - }; - let (_ratio, name) = ( - BitpackedFastFieldSerializer::>::estimate(stats), - BitpackedFastFieldSerializer::>::name(), - ); - Self::open_from_name(write, min_value, max_value, name) - } - - /// Creates a new fast field serializer. - /// - /// The serializer in fact encode the values by bitpacking - /// `(val - min_value)`. - /// - /// It requires a `min_value` and a `max_value` to compute - /// compute the minimum number of bits required to encode - /// values. - pub fn open_from_name( - write: &'a mut W, - min_value: u64, - max_value: u64, - name: &str, - ) -> io::Result> { - // Weirdly the W generic on BitpackedFastFieldSerializer needs to be set, - // although name() doesn't use it - let variant = if name == BitpackedFastFieldSerializer::>::name() { - DynamicFastFieldSerializer::Bitpacked(BitpackedFastFieldSerializer::open( - write, min_value, max_value, - )?) - } else { - panic!("unknown fastfield serializer {}", name); - }; - - Ok(variant) - } -} -impl<'a, W: Write> FastFieldSerializer for DynamicFastFieldSerializer<'a, W> { - fn add_val(&mut self, val: u64) -> io::Result<()> { - match self { - Self::Bitpacked(serializer) => serializer.add_val(val), - } - } - fn close_field(self) -> io::Result<()> { - match self { - Self::Bitpacked(serializer) => serializer.close_field(), - } - } -} - -pub struct BitpackedFastFieldSerializer<'a, W: Write> { - bit_packer: BitPacker, - write: &'a mut W, - min_value: u64, - num_bits: u8, -} - -impl<'a, W: Write> BitpackedFastFieldSerializer<'a, W> { - /// Creates a new fast field serializer. - /// - /// The serializer in fact encode the values by bitpacking - /// `(val - min_value)`. - /// - /// It requires a `min_value` and a `max_value` to compute - /// compute the minimum number of bits required to encode - /// values. - fn open( - write: &'a mut W, - min_value: u64, - max_value: u64, - ) -> io::Result> { - assert!(min_value <= max_value); - min_value.serialize(write)?; - let amplitude = max_value - min_value; - amplitude.serialize(write)?; - let num_bits = compute_num_bits(amplitude); - let bit_packer = BitPacker::new(); - Ok(BitpackedFastFieldSerializer { - bit_packer, - write, - min_value, - num_bits, - }) - } -} - -impl<'a, W: 'a + Write> FastFieldSerializer for BitpackedFastFieldSerializer<'a, W> { - /// Pushes a new value to the currently open u64 fast field. - fn add_val(&mut self, val: u64) -> io::Result<()> { - let val_to_write: u64 = val - self.min_value; - self.bit_packer - .write(val_to_write, self.num_bits, &mut self.write)?; - Ok(()) - } - fn close_field(mut self) -> io::Result<()> { - self.bit_packer.close(&mut self.write) - } -} - -impl<'a, W: 'a + Write> FastFieldSerializerEstimate for BitpackedFastFieldSerializer<'a, W> { - fn estimate( - /*_fastfield_accessor: impl FastFieldReader, */ stats: EstimationStats, - ) -> (f32, &'static str) { - let amplitude = stats.max_value - stats.min_value; - let num_bits = compute_num_bits(amplitude); - let num_bits_uncompressed = 64; - let ratio = num_bits as f32 / num_bits_uncompressed as f32; - let name = Self::name(); - (ratio, name) - } - fn name() -> &'static str { - "Bitpacked" - } -} - -pub struct FastBytesFieldSerializer<'a, W: Write> { - write: &'a mut W, -} - -impl<'a, W: Write> FastBytesFieldSerializer<'a, W> { - pub fn write_all(&mut self, vals: &[u8]) -> io::Result<()> { - self.write.write_all(vals) - } - - pub fn flush(&mut self) -> io::Result<()> { - self.write.flush() - } -} diff --git a/src/fastfield/serializer/bitpacked.rs b/src/fastfield/serializer/bitpacked.rs new file mode 100644 index 000000000..8dc6e9c4f --- /dev/null +++ b/src/fastfield/serializer/bitpacked.rs @@ -0,0 +1,97 @@ +use super::FastFieldDataAccess; +use super::FastFieldSerializer; +use super::FastFieldSerializerEstimate; +use super::FastFieldStats; +use crate::common::BinarySerializable; +use std::io::{self, Write}; +use tantivy_bitpacker::compute_num_bits; +use tantivy_bitpacker::BitPacker; + +pub struct BitpackedFastFieldSerializer<'a, W: 'a + Write> { + bit_packer: BitPacker, + write: &'a mut W, + min_value: u64, + num_bits: u8, +} + +impl<'a, W: Write> BitpackedFastFieldSerializer<'a, W> { + /// Creates a new fast field serializer. + /// + /// The serializer in fact encode the values by bitpacking + /// `(val - min_value)`. + /// + /// It requires a `min_value` and a `max_value` to compute + /// compute the minimum number of bits required to encode + /// values. + pub(crate) fn open( + write: &'a mut W, + min_value: u64, + max_value: u64, + ) -> io::Result> { + assert!(min_value <= max_value); + min_value.serialize(write)?; + let amplitude = max_value - min_value; + amplitude.serialize(write)?; + let num_bits = compute_num_bits(amplitude); + let bit_packer = BitPacker::new(); + Ok(BitpackedFastFieldSerializer { + bit_packer, + write, + min_value, + num_bits, + }) + } + /// Creates a new fast field serializer. + /// + /// The serializer in fact encode the values by bitpacking + /// `(val - min_value)`. + /// + /// It requires a `min_value` and a `max_value` to compute + /// compute the minimum number of bits required to encode + /// values. + pub(crate) fn create( + write: &'a mut W, + fastfield_accessor: &impl FastFieldDataAccess, + stats: FastFieldStats, + data_iter: impl Iterator, + ) -> io::Result<()> { + let mut serializer = Self::open(write, stats.min_value, stats.max_value)?; + + for val in data_iter { + serializer.add_val(val)?; + } + serializer.close_field()?; + + Ok(()) + } +} + +impl<'a, W: 'a + Write> FastFieldSerializer for BitpackedFastFieldSerializer<'a, W> { + /// Pushes a new value to the currently open u64 fast field. + fn add_val(&mut self, val: u64) -> io::Result<()> { + let val_to_write: u64 = val - self.min_value; + self.bit_packer + .write(val_to_write, self.num_bits, &mut self.write)?; + Ok(()) + } + fn close_field(mut self) -> io::Result<()> { + self.bit_packer.close(&mut self.write) + } +} + +impl<'a, W: 'a + Write> FastFieldSerializerEstimate for BitpackedFastFieldSerializer<'a, W> { + fn estimate( + _fastfield_accessor: &impl FastFieldDataAccess, + stats: FastFieldStats, + ) -> (f32, &'static str) { + let amplitude = stats.max_value - stats.min_value; + let num_bits = compute_num_bits(amplitude); + let num_bits_uncompressed = 64; + let ratio = num_bits as f32 / num_bits_uncompressed as f32; + let name = Self::codec_id().0; + (ratio, name) + } + fn codec_id() -> (&'static str, u8) { + ("Bitpacked", 1) + } +} diff --git a/src/fastfield/serializer/linearinterpol.rs b/src/fastfield/serializer/linearinterpol.rs new file mode 100644 index 000000000..b8a2c9c93 --- /dev/null +++ b/src/fastfield/serializer/linearinterpol.rs @@ -0,0 +1,83 @@ +use super::FastFieldDataAccess; +use super::FastFieldSerializer; +use super::FastFieldSerializerEstimate; +use super::FastFieldStats; +use crate::common::BinarySerializable; +use std::io::{self, Write}; +use tantivy_bitpacker::compute_num_bits; +use tantivy_bitpacker::BitPacker; + +/// Fastfield serializer, which tries to guess values by linear interpolation +/// and stores the difference. +pub struct LinearInterpolFastFieldSerializer<'a, W: 'a + Write> { + bit_packer: BitPacker, + write: &'a mut W, + min_value: u64, + num_bits: u8, +} + +impl<'a, W: Write> LinearInterpolFastFieldSerializer<'a, W> { + /// Creates a new fast field serializer. + /// + /// The serializer in fact encode the values by bitpacking + /// `(val - min_value)`. + /// + /// It requires a `min_value` and a `max_value` to compute + /// compute the minimum number of bits required to encode + /// values. + pub(crate) fn create( + write: &'a mut W, + fastfield_accessor: &impl FastFieldDataAccess, + stats: FastFieldStats, + data_iter: impl Iterator, + ) -> io::Result<()> { + assert!(stats.min_value <= stats.max_value); + stats.min_value.serialize(write)?; + let amplitude = stats.max_value - stats.min_value; + amplitude.serialize(write)?; + let num_bits = compute_num_bits(amplitude); + let mut serializer = LinearInterpolFastFieldSerializer { + bit_packer: BitPacker::new(), + write, + min_value: stats.min_value, + num_bits, + }; + + for val in data_iter { + serializer.add_val(val)?; + } + serializer.close_field()?; + + Ok(()) + } +} + +impl<'a, W: 'a + Write> FastFieldSerializer for LinearInterpolFastFieldSerializer<'a, W> { + /// Pushes a new value to the currently open u64 fast field. + fn add_val(&mut self, val: u64) -> io::Result<()> { + let val_to_write: u64 = val - self.min_value; + self.bit_packer + .write(val_to_write, self.num_bits, &mut self.write)?; + Ok(()) + } + fn close_field(mut self) -> io::Result<()> { + self.bit_packer.close(&mut self.write) + } +} + +impl<'a, W: 'a + Write> FastFieldSerializerEstimate for LinearInterpolFastFieldSerializer<'a, W> { + fn estimate( + _fastfield_accessor: &impl FastFieldDataAccess, + stats: FastFieldStats, + ) -> (f32, &'static str) { + let amplitude = stats.max_value - stats.min_value; + let num_bits = compute_num_bits(amplitude); + let num_bits_uncompressed = 64; + let ratio = num_bits as f32 / num_bits_uncompressed as f32; + let name = Self::codec_id().0; + (ratio, name) + } + fn codec_id() -> (&'static str, u8) { + ("Bitpacked", 2) + } +} diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs new file mode 100644 index 000000000..b8074fc4a --- /dev/null +++ b/src/fastfield/serializer/mod.rs @@ -0,0 +1,175 @@ +mod bitpacked; +mod linearinterpol; + +use crate::common::BinarySerializable; +use crate::common::CompositeWrite; +use crate::common::CountingWriter; +use crate::directory::WritePtr; +use crate::schema::Field; +use crate::DocId; +pub use bitpacked::BitpackedFastFieldSerializer; +use std::io::{self, Write}; + +/// FastFieldReader is the trait to access fast field data. +pub trait FastFieldDataAccess: Clone { + //type IteratorType: Iterator; + /// Return the value associated to the given document. + /// + /// Whenever possible use the Iterator passed to the fastfield creation instead, for performance reasons. + /// + /// # Panics + /// + /// May panic if `doc` is greater than the segment + fn get(&self, doc: DocId) -> u64; +} + +/// `CompositeFastFieldSerializer` is in charge of serializing +/// fastfields on disk. +/// +/// Fast fields have different encodings like bit-packing. +/// +/// `FastFieldWriter`s are in charge of pushing the data to +/// the serializer. +/// The serializer expects to receive the following calls. +/// +/// * `new_u64_fast_field(...)` +/// * `add_val(...)` +/// * `add_val(...)` +/// * `add_val(...)` +/// * ... +/// * `close_field()` +/// * `new_u64_fast_field(...)` +/// * `add_val(...)` +/// * ... +/// * `close_field()` +/// * `close()` +pub struct CompositeFastFieldSerializer { + composite_write: CompositeWrite, +} + +impl CompositeFastFieldSerializer { + /// Constructor + pub fn from_write(write: WritePtr) -> io::Result { + // just making room for the pointer to header. + let composite_write = CompositeWrite::wrap(write); + Ok(CompositeFastFieldSerializer { composite_write }) + } + + /// Serialize data into a new u64 fast field. The compression will be detected automatically. + pub fn create_auto_detect_u64_fast_field( + &mut self, + field: Field, + stats: FastFieldStats, + fastfield_accessor: impl FastFieldDataAccess, + data_iter_1: impl Iterator, + data_iter_2: impl Iterator, + ) -> io::Result<()> { + let field_write = self.composite_write.for_field_with_idx(field, 0); + + let (_ratio, (name, id)) = ( + BitpackedFastFieldSerializer::>::estimate(&fastfield_accessor, stats.clone()), + BitpackedFastFieldSerializer::>::codec_id(), + ); + + id.serialize(field_write)?; + if name == BitpackedFastFieldSerializer::>::codec_id().0 { + BitpackedFastFieldSerializer::create( + field_write, + &fastfield_accessor, + stats, + data_iter_1, + )?; + } else { + panic!("unknown fastfield serializer {}", name); + }; + + Ok(()) + } + + /// Start serializing a new u64 fast field + pub fn new_u64_fast_field( + &mut self, + field: Field, + min_value: u64, + max_value: u64, + ) -> io::Result>> { + self.new_u64_fast_field_with_idx(field, min_value, max_value, 0) + } + + /// Start serializing a new u64 fast field + pub fn new_u64_fast_field_with_idx( + &mut self, + field: Field, + min_value: u64, + max_value: u64, + idx: usize, + ) -> io::Result>> { + let field_write = self.composite_write.for_field_with_idx(field, idx); + // Prepend codec id to field data for compatibility with DynamicFastFieldReader. + let (_name, id) = BitpackedFastFieldSerializer::>::codec_id(); + id.serialize(field_write)?; + BitpackedFastFieldSerializer::open(field_write, min_value, max_value) + } + + /// Start serializing a new [u8] fast field + pub fn new_bytes_fast_field_with_idx( + &mut self, + field: Field, + idx: usize, + ) -> FastBytesFieldSerializer<'_, CountingWriter> { + let field_write = self.composite_write.for_field_with_idx(field, idx); + FastBytesFieldSerializer { write: field_write } + } + + /// Closes the serializer + /// + /// After this call the data must be persistently save on disk. + pub fn close(self) -> io::Result<()> { + self.composite_write.close() + } +} + +#[derive(Debug, Clone)] +pub struct FastFieldStats { + pub min_value: u64, + pub max_value: u64, + pub num_vals: u64, +} + +/// The FastFieldSerializer trait is the common interface +/// implemented by every fastfield serializer variant. +/// +/// `DynamicFastFieldSerializer` is the enum wrapping all variants. +/// It is used to create an serializer instance. +pub trait FastFieldSerializer { + /// add value to serializer + fn add_val(&mut self, val: u64) -> io::Result<()>; + /// finish serializing a field. + fn close_field(self) -> io::Result<()>; +} + +/// The FastFieldSerializerEstimate trait is required on all variants +/// of fast field compressions, to decide which one to choose. +pub trait FastFieldSerializerEstimate { + /// returns an estimate of the compression ratio. + fn estimate( + fastfield_accessor: &impl FastFieldDataAccess, + stats: FastFieldStats, + ) -> (f32, &'static str); + /// the unique (name, id) of the compressor. Used to distinguish when de/serializing. + fn codec_id() -> (&'static str, u8); +} + +pub struct FastBytesFieldSerializer<'a, W: Write> { + write: &'a mut W, +} + +impl<'a, W: Write> FastBytesFieldSerializer<'a, W> { + pub fn write_all(&mut self, vals: &[u8]) -> io::Result<()> { + self.write.write_all(vals) + } + + pub fn flush(&mut self) -> io::Result<()> { + self.write.flush() + } +} diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index e99fc7148..0cd9f30d8 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -1,4 +1,7 @@ use super::multivalued::MultiValuedFastFieldWriter; +use super::serializer::FastFieldStats; +use super::FastFieldDataAccess; +use super::FastFieldReader; use crate::common; use crate::fastfield::serializer::FastFieldSerializer; use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer}; @@ -6,6 +9,7 @@ use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema}; use crate::termdict::TermOrdinal; +use crate::DocId; use fnv::FnvHashMap; use std::collections::HashMap; use std::io; @@ -265,9 +269,9 @@ impl IntFastFieldWriter { self.add_val(val); } - /// Extract the stored data - pub(crate) fn get_data(&self) -> Vec { - self.vals.iter().collect::>() + /// get iterator over the data + pub(crate) fn iter(&self) -> impl Iterator + '_ { + self.vals.iter() } /// Push the fast fields value to the `FastFieldWriter`. @@ -281,17 +285,58 @@ impl IntFastFieldWriter { } else { (self.val_min, self.val_max) }; - let mut single_field_serializer = serializer.new_u64_fast_field(self.field, min, max)?; - if let Some(doc_id_map) = doc_id_map { - for doc_id in doc_id_map.iter_old_doc_ids() { - single_field_serializer.add_val(self.vals.get(*doc_id as usize))?; - } - } else { - for val in self.vals.iter() { - single_field_serializer.add_val(val)?; - } + let fastfield_accessor = WriterFastFieldAccessProvider { + doc_id_map, + vals: &self.vals, + }; + let stats = FastFieldStats { + min_value: min, + max_value: max, + num_vals: self.val_count as u64, }; - single_field_serializer.close_field() + if let Some(doc_id_map) = doc_id_map { + let iter = doc_id_map + .iter_old_doc_ids() + .map(|doc_id| self.vals.get(*doc_id as usize)); + serializer.create_auto_detect_u64_fast_field( + self.field, + stats, + fastfield_accessor, + iter.clone(), + iter, + )?; + } else { + serializer.create_auto_detect_u64_fast_field( + self.field, + stats, + fastfield_accessor, + self.vals.iter(), + self.vals.iter(), + )?; + }; + Ok(()) + } +} + +#[derive(Clone)] +struct WriterFastFieldAccessProvider<'map, 'bitp> { + doc_id_map: Option<&'map DocIdMapping>, + vals: &'bitp BlockedBitpacker, +} +impl<'map, 'bitp> FastFieldDataAccess for WriterFastFieldAccessProvider<'map, 'bitp> { + /// Return the value associated to the given document. + /// + /// This accessor should return as fast as possible. + /// + /// # Panics + /// + /// May panic if `doc` is greater than the segment + fn get(&self, doc: DocId) -> u64 { + if let Some(doc_id_map) = self.doc_id_map { + self.vals.get(doc_id_map.get_old_doc_id(doc) as usize) // consider extra FastFieldReader wrapper for non doc_id_map + } else { + self.vals.get(doc as usize) + } } } diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs index dd8f9aa7d..65c3f03f6 100644 --- a/src/indexer/doc_id_mapping.rs +++ b/src/indexer/doc_id_mapping.rs @@ -60,9 +60,8 @@ pub(crate) fn get_doc_id_mapping_from_field( })?; // create new doc_id to old doc_id index (used in fast_field_writers) - let data = fast_field.get_data(); - let mut doc_id_and_data = data - .into_iter() + let mut doc_id_and_data = fast_field + .iter() .enumerate() .map(|el| (el.0 as DocId, el.1)) .collect::>(); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 3bcc07a05..b227c6f98 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -3,8 +3,10 @@ use crate::error::DataCorruption; use crate::fastfield::CompositeFastFieldSerializer; use crate::fastfield::DeleteBitSet; use crate::fastfield::DynamicFastFieldReader; +use crate::fastfield::FastFieldDataAccess; use crate::fastfield::FastFieldReader; use crate::fastfield::FastFieldSerializer; +use crate::fastfield::FastFieldStats; use crate::fastfield::MultiValuedFastFieldReader; use crate::fieldnorm::FieldNormsSerializer; use crate::fieldnorm::FieldNormsWriter; @@ -344,21 +346,71 @@ impl IndexMerger { }) .collect::>(); if let Some(doc_id_mapping) = doc_id_mapping { - let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| { - ( - doc_id, - &fast_field_readers[reader_with_ordinal.ordinal as usize], - ) - }); - // add values in order of the new doc_ids - let mut fast_single_field_serializer = - fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?; - for (doc_id, field_reader) in sorted_doc_ids { - let val = field_reader.get(*doc_id); - fast_single_field_serializer.add_val(val)?; + //struct SortedDocidAccessor {}; + #[derive(Clone)] + struct SortedDocidFieldAccessProvider<'a> { + doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>, + fast_field_readers: &'a Vec>, } + impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> { + //type IteratorType = ; + //type IteratorType: std::iter::Map)>, _> + //type IteratorType = + //std::iter::Map)>, u64>; + fn get(&self, doc: DocId) -> u64 { + let (doc_id, reader_with_ordinal) = self.doc_id_mapping[doc as usize]; + self.fast_field_readers[reader_with_ordinal.ordinal as usize].get(doc_id) + } - fast_single_field_serializer.close_field()?; + //fn iter(&self) -> Self::IteratorType { + //self.doc_id_mapping + //.iter() + //.map(|(doc_id, reader_with_ordinal)| { + //let fast_field_reader = + //&self.fast_field_readers[reader_with_ordinal.ordinal as usize]; + //let val = self.field_reader.get(*doc_id); + //val + //}) + //} + } + let stats = FastFieldStats { + min_value, + max_value, + num_vals: doc_id_mapping.len() as u64, + }; + let fastfield_accessor = SortedDocidFieldAccessProvider { + doc_id_mapping, + fast_field_readers: &fast_field_readers, + }; + let iter = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| { + let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize]; + let val = fast_field_reader.get(*doc_id); + val + }); + fast_field_serializer.create_auto_detect_u64_fast_field( + field, + stats, + fastfield_accessor, + iter.clone(), + iter, + )?; + + //let sorted_doc_ids = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| { + //( + //doc_id, + //&fast_field_readers[reader_with_ordinal.ordinal as usize], + //) + //}); + //// add values in order of the new doc_ids + + //let mut fast_single_field_serializer = + //fast_field_serializer.new_u64_fast_field(field, min_value, max_value)?; + //for (doc_id, field_reader) in sorted_doc_ids { + //let val = field_reader.get(*doc_id); + //fast_single_field_serializer.add_val(val)?; + //} + + //fast_single_field_serializer.close_field()?; Ok(()) } else { let u64_readers = self.readers.iter()