From 29d56111de678d9059e1808ab115cc2c3f0c756d Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 7 Sep 2022 18:28:17 +0800 Subject: [PATCH] refactor, fix api refactor fix clippy fix docs remove unused code fix bytesfield index api flaw --- fastfield_codecs/benches/bench.rs | 3 - fastfield_codecs/src/column.rs | 2 +- fastfield_codecs/src/lib.rs | 88 +++++++++-------------- fastfield_codecs/src/monotonic_mapping.rs | 60 ++++++++++++++++ fastfield_codecs/src/serialize.rs | 43 ++--------- src/fastfield/bytes/writer.rs | 3 +- src/fastfield/multivalued/writer.rs | 62 ++++++++-------- src/fastfield/serializer/mod.rs | 40 ++++------- src/indexer/merger.rs | 10 +-- 9 files changed, 148 insertions(+), 163 deletions(-) create mode 100644 fastfield_codecs/src/monotonic_mapping.rs diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs index 11f9fbfc6..c30df44e5 100644 --- a/fastfield_codecs/benches/bench.rs +++ b/fastfield_codecs/benches/bench.rs @@ -6,9 +6,6 @@ extern crate test; mod tests { use std::sync::Arc; - use fastfield_codecs::bitpacked::BitpackedCodec; - use fastfield_codecs::blockwise_linear::BlockwiseLinearCodec; - use fastfield_codecs::linear::LinearCodec; use fastfield_codecs::*; fn get_data() -> Vec { diff --git a/fastfield_codecs/src/column.rs b/fastfield_codecs/src/column.rs index dcaea689d..14cdb97b0 100644 --- a/fastfield_codecs/src/column.rs +++ b/fastfield_codecs/src/column.rs @@ -176,7 +176,7 @@ where T: Copy + Ord + Default, { fn min_max(&self) -> (T, T) { - if let Some((min, max)) = self.min_max_cache.lock().unwrap().clone() { + if let Some((min, max)) = *self.min_max_cache.lock().unwrap() { return (min, max); } let (min, max) = diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index e17ca3b6b..9a11f986a 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -9,21 +9,28 @@ extern crate test; use std::io; use std::io::Write; +use std::sync::Arc; use common::BinarySerializable; use ownedbytes::OwnedBytes; +use serialize::Header; mod bitpacked; mod blockwise_linear; pub(crate) mod line; mod linear; +mod monotonic_mapping; mod column; mod gcd; mod serialize; +pub use self::bitpacked::BitpackedCodec; +pub use self::blockwise_linear::BlockwiseLinearCodec; pub use self::column::{monotonic_map_column, Column, VecColumn}; -pub use self::serialize::{estimate, open, serialize, serialize_and_load, NormalizedHeader}; +pub use self::linear::LinearCodec; +pub use self::monotonic_mapping::MonotonicallyMappableToU64; +pub use self::serialize::{estimate, serialize, serialize_and_load, NormalizedHeader}; #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)] #[repr(u8)] @@ -61,70 +68,39 @@ impl FastFieldCodecType { } } -pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy { - /// Converts a value to u64. - /// - /// Internally all fast field values are encoded as u64. - fn to_u64(self) -> u64; - - /// Converts a value from u64 - /// - /// Internally all fast field values are encoded as u64. - /// **Note: To be used for converting encoded Term, Posting values.** - fn from_u64(val: u64) -> Self; -} - -impl MonotonicallyMappableToU64 for u64 { - fn to_u64(self) -> u64 { - self - } - - fn from_u64(val: u64) -> Self { - val - } -} - -impl MonotonicallyMappableToU64 for i64 { - #[inline(always)] - fn to_u64(self) -> u64 { - common::i64_to_u64(self) - } - - #[inline(always)] - fn from_u64(val: u64) -> Self { - common::u64_to_i64(val) - } -} - -impl MonotonicallyMappableToU64 for bool { - #[inline(always)] - fn to_u64(self) -> u64 { - if self { - 1 - } else { - 0 +/// Returns the correct codec reader wrapped in the `Arc` for the data. +pub fn open( + mut bytes: OwnedBytes, +) -> io::Result>> { + let header = Header::deserialize(&mut bytes)?; + match header.codec_type { + FastFieldCodecType::Bitpacked => open_specific_codec::(bytes, &header), + FastFieldCodecType::Linear => open_specific_codec::(bytes, &header), + FastFieldCodecType::BlockwiseLinear => { + open_specific_codec::(bytes, &header) } } - - #[inline(always)] - fn from_u64(val: u64) -> Self { - val > 0 - } } -impl MonotonicallyMappableToU64 for f64 { - fn to_u64(self) -> u64 { - common::f64_to_u64(self) - } - - fn from_u64(val: u64) -> Self { - common::u64_to_f64(val) +fn open_specific_codec( + bytes: OwnedBytes, + header: &Header, +) -> io::Result>> { + let normalized_header = header.normalized(); + let reader = C::open_from_bytes(bytes, normalized_header)?; + let min_value = header.min_value; + if let Some(gcd) = header.gcd { + let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val * gcd.get()); + Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping))) + } else { + let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val); + Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping))) } } /// The FastFieldSerializerEstimate trait is required on all variants /// of fast field compressions, to decide which one to choose. -trait FastFieldCodec: 'static { +pub trait FastFieldCodec: 'static { /// A codex needs to provide a unique name and id, which is /// used for debugging and de/serialization. const CODEC_TYPE: FastFieldCodecType; diff --git a/fastfield_codecs/src/monotonic_mapping.rs b/fastfield_codecs/src/monotonic_mapping.rs new file mode 100644 index 000000000..6b65e63e4 --- /dev/null +++ b/fastfield_codecs/src/monotonic_mapping.rs @@ -0,0 +1,60 @@ +pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy { + /// Converts a value to u64. + /// + /// Internally all fast field values are encoded as u64. + fn to_u64(self) -> u64; + + /// Converts a value from u64 + /// + /// Internally all fast field values are encoded as u64. + /// **Note: To be used for converting encoded Term, Posting values.** + fn from_u64(val: u64) -> Self; +} + +impl MonotonicallyMappableToU64 for u64 { + fn to_u64(self) -> u64 { + self + } + + fn from_u64(val: u64) -> Self { + val + } +} + +impl MonotonicallyMappableToU64 for i64 { + #[inline(always)] + fn to_u64(self) -> u64 { + common::i64_to_u64(self) + } + + #[inline(always)] + fn from_u64(val: u64) -> Self { + common::u64_to_i64(val) + } +} + +impl MonotonicallyMappableToU64 for bool { + #[inline(always)] + fn to_u64(self) -> u64 { + if self { + 1 + } else { + 0 + } + } + + #[inline(always)] + fn from_u64(val: u64) -> Self { + val > 0 + } +} + +impl MonotonicallyMappableToU64 for f64 { + fn to_u64(self) -> u64 { + common::f64_to_u64(self) + } + + fn from_u64(val: u64) -> Self { + common::u64_to_f64(val) + } +} diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs index 28ed40d76..4753bb443 100644 --- a/fastfield_codecs/src/serialize.rs +++ b/fastfield_codecs/src/serialize.rs @@ -47,11 +47,11 @@ pub struct NormalizedHeader { #[derive(Debug, Copy, Clone)] pub(crate) struct Header { - num_vals: u64, - min_value: u64, - max_value: u64, - gcd: Option, - codec_type: FastFieldCodecType, + pub num_vals: u64, + pub min_value: u64, + pub max_value: u64, + pub gcd: Option, + pub codec_type: FastFieldCodecType, } impl Header { @@ -124,36 +124,6 @@ impl BinarySerializable for Header { } } -/// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. -pub fn open( - mut bytes: OwnedBytes, -) -> io::Result>> { - let header = Header::deserialize(&mut bytes)?; - match header.codec_type { - FastFieldCodecType::Bitpacked => open_specific_codec::(bytes, &header), - FastFieldCodecType::Linear => open_specific_codec::(bytes, &header), - FastFieldCodecType::BlockwiseLinear => { - open_specific_codec::(bytes, &header) - } - } -} - -fn open_specific_codec( - bytes: OwnedBytes, - header: &Header, -) -> io::Result>> { - let normalized_header = header.normalized(); - let reader = C::open_from_bytes(bytes, normalized_header)?; - let min_value = header.min_value; - if let Some(gcd) = header.gcd { - let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val * gcd.get()); - Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping))) - } else { - let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val); - Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping))) - } -} - pub fn estimate( typed_column: impl Column, codec_type: FastFieldCodecType, @@ -217,8 +187,7 @@ fn detect_codec( // removing nan values for codecs with broken calculations, and max values which disables // codecs estimations.retain(|estimation| !estimation.0.is_nan() && estimation.0 != f32::MAX); - estimations - .sort_by(|(score_left, _), (score_right, _)| score_left.partial_cmp(&score_right).unwrap()); + estimations.sort_by(|(score_left, _), (score_right, _)| score_left.total_cmp(score_right)); Some(estimations.first()?.1) } diff --git a/src/fastfield/bytes/writer.rs b/src/fastfield/bytes/writer.rs index 7aee8ddb6..aa16a45e7 100644 --- a/src/fastfield/bytes/writer.rs +++ b/src/fastfield/bytes/writer.rs @@ -112,7 +112,6 @@ impl BytesFastFieldWriter { doc_id_map: Option<&DocIdMapping>, ) -> io::Result<()> { // writing the offset index - // TODO FIXME No need to double the memory. { self.doc_index.push(self.vals.len() as u64); let col = VecColumn::from(&self.doc_index[..]); @@ -128,7 +127,7 @@ impl BytesFastFieldWriter { } } // writing the values themselves - let mut value_serializer = serializer.new_bytes_fast_field_with_idx(self.field, 1); + let mut value_serializer = serializer.new_bytes_fast_field(self.field); // the else could be removed, but this is faster (difference not benchmarked) if let Some(doc_id_map) = doc_id_map { for vals in self.get_ordered_values(Some(doc_id_map)) { diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index a8bd244ab..1b6e2787c 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -142,7 +142,7 @@ impl MultiValuedFastFieldWriter { pub fn serialize( mut self, serializer: &mut CompositeFastFieldSerializer, - mapping_opt: Option<&FnvHashMap>, + term_mapping_opt: Option<&FnvHashMap>, doc_id_map: Option<&DocIdMapping>, ) -> io::Result<()> { { @@ -163,7 +163,7 @@ impl MultiValuedFastFieldWriter { // Writing the values themselves. // TODO FIXME: Use less memory. let mut values: Vec = Vec::new(); - if let Some(mapping) = mapping_opt { + if let Some(term_mapping) = term_mapping_opt { if self.fast_field_type.is_facet() { let mut doc_vals: Vec = Vec::with_capacity(100); for vals in self.get_ordered_values(doc_id_map) { @@ -171,7 +171,7 @@ impl MultiValuedFastFieldWriter { doc_vals.clear(); let remapped_vals = vals .iter() - .map(|val| *mapping.get(val).expect("Missing term ordinal")); + .map(|val| *term_mapping.get(val).expect("Missing term ordinal")); doc_vals.extend(remapped_vals); doc_vals.sort_unstable(); for &val in &doc_vals { @@ -182,7 +182,7 @@ impl MultiValuedFastFieldWriter { for vals in self.get_ordered_values(doc_id_map) { let remapped_vals = vals .iter() - .map(|val| *mapping.get(val).expect("Missing term ordinal")); + .map(|val| *term_mapping.get(val).expect("Missing term ordinal")); for val in remapped_vals { values.push(val); } @@ -214,6 +214,19 @@ struct MultivalueStartIndexRandomSeeker<'a, C: Column> { seek_head: MultivalueStartIndexIter<'a, C>, seek_next_id: u64, } +impl<'a, C: Column> MultivalueStartIndexRandomSeeker<'a, C> { + fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self { + Self { + seek_head: MultivalueStartIndexIter { + column, + doc_id_map, + new_doc_id: 0, + offset: 0u64, + }, + seek_next_id: 0u64, + } + } +} impl<'a, C: Column> MultivalueStartIndex<'a, C> { pub fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self { @@ -222,20 +235,12 @@ impl<'a, C: Column> MultivalueStartIndex<'a, C> { column, doc_id_map, min_max_opt: Mutex::default(), - random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker { - seek_head: MultivalueStartIndexIter { - column, - doc_id_map, - new_doc_id: 0, - offset: 0u64, - }, - seek_next_id: 0u64, - }), + random_seeker: Mutex::new(MultivalueStartIndexRandomSeeker::new(column, doc_id_map)), } } fn minmax(&self) -> (u64, u64) { - if let Some((min, max)) = self.min_max_opt.lock().unwrap().clone() { + if let Some((min, max)) = *self.min_max_opt.lock().unwrap() { return (min, max); } let (min, max) = tantivy_bitpacker::minmax(self.iter()).unwrap_or((0u64, 0u64)); @@ -247,15 +252,8 @@ impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> { fn get_val(&self, idx: u64) -> u64 { let mut random_seeker_lock = self.random_seeker.lock().unwrap(); if random_seeker_lock.seek_next_id > idx { - *random_seeker_lock = MultivalueStartIndexRandomSeeker { - seek_head: MultivalueStartIndexIter { - column: self.column, - doc_id_map: self.doc_id_map, - new_doc_id: 0, - offset: 0u64, - }, - seek_next_id: 0u64, - }; + *random_seeker_lock = + MultivalueStartIndexRandomSeeker::new(self.column, self.doc_id_map); } let to_skip = idx - random_seeker_lock.seek_next_id; random_seeker_lock.seek_next_id = idx + 1; @@ -275,12 +273,7 @@ impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> { } fn iter<'b>(&'b self) -> Box + 'b> { - Box::new(MultivalueStartIndexIter { - column: &self.column, - doc_id_map: self.doc_id_map, - new_doc_id: 0, - offset: 0, - }) + Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map)) } } @@ -291,6 +284,17 @@ struct MultivalueStartIndexIter<'a, C: Column> { pub offset: u64, } +impl<'a, C: Column> MultivalueStartIndexIter<'a, C> { + fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self { + Self { + column, + doc_id_map, + new_doc_id: 0, + offset: 0, + } + } +} + impl<'a, C: Column> Iterator for MultivalueStartIndexIter<'a, C> { type Item = u64; diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index 5d88adf9e..307c9e060 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -1,6 +1,6 @@ use std::io::{self, Write}; -use common::{BinarySerializable, CountingWriter}; +use common::CountingWriter; pub use fastfield_codecs::{Column, FastFieldStats}; use fastfield_codecs::{FastFieldCodecType, MonotonicallyMappableToU64, ALL_CODEC_TYPES}; @@ -16,16 +16,14 @@ use crate::schema::Field; /// the serializer. /// The serializer expects to receive the following calls. /// -/// * `new_u64_fast_field(...)` -/// * `add_val(...)` -/// * `add_val(...)` -/// * `add_val(...)` +/// * `create_auto_detect_u64_fast_field(...)` +/// * `create_auto_detect_u64_fast_field(...)` /// * ... -/// * `close_field()` -/// * `new_u64_fast_field(...)` -/// * `add_val(...)` +/// * `let bytes_fastfield = new_bytes_fast_field(...)` +/// * `bytes_fastfield.write_all(...)` +/// * `bytes_fastfield.write_all(...)` +/// * `bytes_fastfield.flush()` /// * ... -/// * `close_field()` /// * `close()` pub struct CompositeFastFieldSerializer { composite_write: CompositeWrite, @@ -33,17 +31,16 @@ pub struct CompositeFastFieldSerializer { } impl CompositeFastFieldSerializer { - /// Constructor + /// New fast field serializer with all codec types pub fn from_write(write: WritePtr) -> io::Result { Self::from_write_with_codec(write, &ALL_CODEC_TYPES) } - /// Constructor + /// New fast field serializer with allowed codec types pub fn from_write_with_codec( write: WritePtr, codec_types: &[FastFieldCodecType], ) -> io::Result { - // just making room for the pointer to header. let composite_write = CompositeWrite::wrap(write); Ok(CompositeFastFieldSerializer { composite_write, @@ -61,16 +58,6 @@ impl CompositeFastFieldSerializer { self.create_auto_detect_u64_fast_field_with_idx(field, fastfield_accessor, 0) } - /// Serialize data into a new u64 fast field. The best compression codec will be chosen - /// automatically. - pub fn write_header( - field_write: &mut W, - codec_type: FastFieldCodecType, - ) -> io::Result<()> { - codec_type.to_code().serialize(field_write)?; - Ok(()) - } - /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. pub fn create_auto_detect_u64_fast_field_with_idx( @@ -84,13 +71,14 @@ impl CompositeFastFieldSerializer { Ok(()) } - /// Start serializing a new [u8] fast field - pub fn new_bytes_fast_field_with_idx( + /// Start serializing a new [u8] fast field. + /// + /// The bytes will be stored as is, no compression will be applied. + pub fn new_bytes_fast_field( &mut self, field: Field, - idx: usize, ) -> FastBytesFieldSerializer<'_, CountingWriter> { - let field_write = self.composite_write.for_field_with_idx(field, idx); + let field_write = self.composite_write.for_field_with_idx(field, 1); FastBytesFieldSerializer { write: field_write } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 016916778..335b3ea88 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -130,14 +130,6 @@ impl TermOrdinalMapping { fn get_segment(&self, segment_ord: usize) -> &[TermOrdinal] { &(self.per_segment_new_term_ordinals[segment_ord])[..] } - - fn max_term_ord(&self) -> TermOrdinal { - self.per_segment_new_term_ordinals - .iter() - .flat_map(|term_ordinals| term_ordinals.iter().max().cloned()) - .max() - .unwrap_or_default() - } } struct DeltaComputer { @@ -814,7 +806,7 @@ impl IndexMerger { doc_id_mapping, &reader_and_field_accessors, )?; - let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1); + let mut serialize_vals = fast_field_serializer.new_bytes_fast_field(field); for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() { let bytes_reader = &reader_and_field_accessors[old_doc_addr.segment_ord as usize].1;