From c39c2d79dab2593acec261c911c0a6e87ff06447 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 22 Aug 2022 17:00:12 +0200 Subject: [PATCH] move iter to FastFieldDataAccess --- fastfield_codecs/benches/bench.rs | 18 +------ fastfield_codecs/src/bitpacked.rs | 4 +- fastfield_codecs/src/lib.rs | 26 +++++----- fastfield_codecs/src/linearinterpol.rs | 6 +-- fastfield_codecs/src/main.rs | 9 +--- fastfield_codecs/src/multilinearinterpol.rs | 4 +- src/fastfield/serializer/mod.rs | 55 ++++++-------------- src/fastfield/writer.rs | 34 +++++-------- src/indexer/merger.rs | 56 +++++++++++---------- 9 files changed, 77 insertions(+), 135 deletions(-) diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs index 768037d00..e81ccf99a 100644 --- a/fastfield_codecs/benches/bench.rs +++ b/fastfield_codecs/benches/bench.rs @@ -34,14 +34,7 @@ mod tests { data: &[u64], ) { let mut bytes = vec![]; - S::serialize( - &mut bytes, - &data, - stats_from_vec(data), - data.iter().cloned(), - data.iter().cloned(), - ) - .unwrap(); + S::serialize(&mut bytes, &data, stats_from_vec(data)).unwrap(); let reader = R::open_from_bytes(&bytes).unwrap(); b.iter(|| { for pos in value_iter() { @@ -52,14 +45,7 @@ mod tests { fn bench_create(b: &mut Bencher, data: &[u64]) { let mut bytes = vec![]; b.iter(|| { - S::serialize( - &mut bytes, - &data, - stats_from_vec(data), - data.iter().cloned(), - data.iter().cloned(), - ) - .unwrap(); + S::serialize(&mut bytes, &data, stats_from_vec(data)).unwrap(); }); } diff --git a/fastfield_codecs/src/bitpacked.rs b/fastfield_codecs/src/bitpacked.rs index a07416998..9a79f4665 100644 --- a/fastfield_codecs/src/bitpacked.rs +++ b/fastfield_codecs/src/bitpacked.rs @@ -113,13 +113,11 @@ impl FastFieldCodecSerializer for BitpackedFastFieldSerializer { write: &mut impl Write, _fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, - data_iter: impl Iterator, - _data_iter1: impl Iterator, ) -> io::Result<()> { let mut serializer = 
BitpackedFastFieldSerializerLegacy::open(write, stats.min_value, stats.max_value)?; - for val in data_iter { + for val in _fastfield_accessor.iter() { serializer.add_val(val)?; } serializer.close_field()?; diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index be17a1971..c72de4025 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -38,14 +38,13 @@ pub trait FastFieldCodecSerializer { fn estimate(fastfield_accessor: &impl FastFieldDataAccess, stats: FastFieldStats) -> f32; /// Serializes the data using the serializer into write. - /// There are multiple iterators, in case the codec needs to read the data multiple times. - /// The iterators should be preferred over using fastfield_accessor for performance reasons. + /// + /// The iterator returned by `fastfield_accessor.iter()` should be preferred over calling + /// `fastfield_accessor.get_val()` for performance reasons. fn serialize( write: &mut impl Write, fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, - data_iter: impl Iterator, - data_iter1: impl Iterator, ) -> io::Result<()>; } @@ -60,6 +59,9 @@ pub trait FastFieldDataAccess { /// /// May panic if `position` is greater than the index. 
fn get_val(&self, position: u64) -> u64; + + /// Returns an iterator over the data + fn iter<'a>(&'a self) -> Box + 'a>; } #[derive(Debug, Clone)] @@ -74,12 +76,19 @@ impl<'a> FastFieldDataAccess for &'a [u64] { fn get_val(&self, position: u64) -> u64 { self[position as usize] } + + fn iter<'b>(&'b self) -> Box + 'b> { + Box::new((self as &[u64]).iter().cloned()) + } } impl FastFieldDataAccess for Vec { fn get_val(&self, position: u64) -> u64 { self[position as usize] } + fn iter<'b>(&'b self) -> Box + 'b> { + Box::new((&self as &[u64]).iter().cloned()) + } } #[cfg(test)] @@ -99,14 +108,7 @@ mod tests { } let estimation = S::estimate(&data, crate::tests::stats_from_vec(data)); let mut out: Vec = Vec::new(); - S::serialize( - &mut out, - &data, - crate::tests::stats_from_vec(data), - data.iter().cloned(), - data.iter().cloned(), - ) - .unwrap(); + S::serialize(&mut out, &data, crate::tests::stats_from_vec(data)).unwrap(); let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0); diff --git a/fastfield_codecs/src/linearinterpol.rs b/fastfield_codecs/src/linearinterpol.rs index a8ea95672..6f5f76a80 100644 --- a/fastfield_codecs/src/linearinterpol.rs +++ b/fastfield_codecs/src/linearinterpol.rs @@ -116,8 +116,6 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer { write: &mut impl Write, fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, - data_iter: impl Iterator, - data_iter1: impl Iterator, ) -> io::Result<()> { assert!(stats.min_value <= stats.max_value); @@ -127,7 +125,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer { // calculate offset to ensure all values are positive let mut offset = 0; let mut rel_positive_max = 0; - for (pos, actual_value) in data_iter1.enumerate() { + for (pos, actual_value) in fastfield_accessor.iter().enumerate() { let calculated_value = get_calculated_value(first_val, pos as u64, slope); if calculated_value > actual_value { // negative value we need to apply an 
offset @@ -145,7 +143,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer { let num_bits = compute_num_bits(relative_max_value); let mut bit_packer = BitPacker::new(); - for (pos, val) in data_iter.enumerate() { + for (pos, val) in fastfield_accessor.iter().enumerate() { let calculated_value = get_calculated_value(first_val, pos as u64, slope); let diff = (val + offset) - calculated_value; bit_packer.write(diff, num_bits, write)?; diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs index 18fef5c60..fdb796c80 100644 --- a/fastfield_codecs/src/main.rs +++ b/fastfield_codecs/src/main.rs @@ -100,14 +100,7 @@ pub fn serialize_with_codec( } let estimation = S::estimate(&data, stats_from_vec(data)); let mut out = vec![]; - S::serialize( - &mut out, - &data, - stats_from_vec(data), - data.iter().cloned(), - data.iter().cloned(), - ) - .unwrap(); + S::serialize(&mut out, &data, stats_from_vec(data)).unwrap(); let actual_compression = out.len() as f32 / (data.len() * 8) as f32; (true, estimation, actual_compression, S::NAME) diff --git a/fastfield_codecs/src/multilinearinterpol.rs b/fastfield_codecs/src/multilinearinterpol.rs index 26b7c9e88..b701974ee 100644 --- a/fastfield_codecs/src/multilinearinterpol.rs +++ b/fastfield_codecs/src/multilinearinterpol.rs @@ -198,8 +198,6 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer { write: &mut impl Write, fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, - data_iter: impl Iterator, - _data_iter1: impl Iterator, ) -> io::Result<()> { assert!(stats.min_value <= stats.max_value); @@ -218,7 +216,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer { // Since we potentially apply multiple passes over the data, the data is cached. 
// Multiple iteration can be expensive (merge with index sorting can add lot of overhead per // iteration) - let data = data_iter.collect::>(); + let data = fastfield_accessor.iter().collect::>(); //// let's split this into chunks of CHUNK_SIZE for data_pos in (0..data.len() as u64).step_by(CHUNK_SIZE as usize).skip(1) { diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index d4765f2a0..c4883f97e 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -98,24 +98,13 @@ impl CompositeFastFieldSerializer { /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. - pub fn create_auto_detect_u64_fast_field( + pub fn create_auto_detect_u64_fast_field( &mut self, field: Field, stats: FastFieldStats, fastfield_accessor: impl FastFieldDataAccess, - iter_gen: F, - ) -> io::Result<()> - where - F: Fn() -> I, - I: Iterator, - { - self.create_auto_detect_u64_fast_field_with_idx( - field, - stats, - fastfield_accessor, - iter_gen, - 0, - ) + ) -> io::Result<()> { + self.create_auto_detect_u64_fast_field_with_idx(field, stats, fastfield_accessor, 0) } /// Serialize data into a new u64 fast field. The best compression codec will be chosen @@ -128,20 +117,15 @@ impl CompositeFastFieldSerializer { /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. 
- pub fn create_auto_detect_u64_fast_field_with_idx( + pub fn create_auto_detect_u64_fast_field_with_idx( &mut self, field: Field, stats: FastFieldStats, fastfield_accessor: impl FastFieldDataAccess, - iter_gen: F, idx: usize, - ) -> io::Result<()> - where - F: Fn() -> I, - I: Iterator, - { + ) -> io::Result<()> { let field_write = self.composite_write.for_field_with_idx(field, idx); - let gcd = find_gcd(iter_gen().map(|val| val - stats.min_value)) + let gcd = find_gcd(fastfield_accessor.iter().map(|val| val - stats.min_value)) .map(NonZeroU64::get) .unwrap_or(GCD_DEFAULT); @@ -152,8 +136,6 @@ impl CompositeFastFieldSerializer { field_write, stats, fastfield_accessor, - iter_gen(), - iter_gen(), ); } @@ -167,6 +149,13 @@ impl CompositeFastFieldSerializer { fn get_val(&self, position: u64) -> u64 { (self.fastfield_accessor.get_val(position) - self.min_value) / self.gcd } + fn iter<'b>(&'b self) -> Box + 'b> { + Box::new( + self.fastfield_accessor + .iter() + .map(|val| (val - self.min_value) / self.gcd), + ) + } } let fastfield_accessor = GCDWrappedFFAccess { @@ -181,16 +170,12 @@ impl CompositeFastFieldSerializer { max_value: (stats.max_value - stats.min_value) / gcd, num_vals: stats.num_vals, }; - let iter1 = iter_gen().map(|val| (val - min_value) / gcd); - let iter2 = iter_gen().map(|val| (val - min_value) / gcd); Self::create_auto_detect_u64_fast_field_with_idx_gcd( self.codec_enable_checker.clone(), field, field_write, stats, fastfield_accessor, - iter1, - iter2, )?; write_gcd_header(field_write, min_value, gcd)?; Ok(()) @@ -204,8 +189,6 @@ impl CompositeFastFieldSerializer { field_write: &mut CountingWriter, stats: FastFieldStats, fastfield_accessor: impl FastFieldDataAccess, - iter1: impl Iterator, - iter2: impl Iterator, ) -> io::Result<()> { let mut estimations = vec![]; @@ -250,21 +233,13 @@ impl CompositeFastFieldSerializer { Self::write_header(field_write, id)?; match name { BitpackedFastFieldSerializer::NAME => { - 
BitpackedFastFieldSerializer::serialize( - field_write, - &fastfield_accessor, - stats, - iter1, - iter2, - )?; + BitpackedFastFieldSerializer::serialize(field_write, &fastfield_accessor, stats)?; } LinearInterpolFastFieldSerializer::NAME => { LinearInterpolFastFieldSerializer::serialize( field_write, &fastfield_accessor, stats, - iter1, - iter2, )?; } MultiLinearInterpolFastFieldSerializer::NAME => { @@ -272,8 +247,6 @@ impl CompositeFastFieldSerializer { field_write, &fastfield_accessor, stats, - iter1, - iter2, )?; } _ => { diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index a19eda59f..d45722e16 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -369,28 +369,8 @@ impl IntFastFieldWriter { num_vals: self.val_count as u64, }; - if let Some(doc_id_map) = doc_id_map { - let iter_gen = || { - doc_id_map - .iter_old_doc_ids() - .map(|doc_id| self.vals.get(doc_id as usize)) - }; - serializer.create_auto_detect_u64_fast_field( - self.field, - stats, - fastfield_accessor, - iter_gen, - )?; - } else { - let iter_gen = || self.vals.iter(); + serializer.create_auto_detect_u64_fast_field(self.field, stats, fastfield_accessor)?; - serializer.create_auto_detect_u64_fast_field( - self.field, - stats, - fastfield_accessor, - iter_gen, - )?; - }; Ok(()) } } @@ -419,4 +399,16 @@ impl<'map, 'bitp> FastFieldDataAccess for WriterFastFieldAccessProvider<'map, 'b self.vals.get(doc as usize) } } + + fn iter<'a>(&'a self) -> Box + 'a> { + if let Some(doc_id_map) = self.doc_id_map { + Box::new( + doc_id_map + .iter_old_doc_ids() + .map(|doc_id| self.vals.get(doc_id as usize)), + ) + } else { + Box::new(self.vals.iter()) + } + } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index c6662398c..fa550df0b 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -134,7 +134,7 @@ impl TermOrdinalMapping { fn max_term_ord(&self) -> TermOrdinal { self.per_segment_new_term_ordinals .iter() - .flat_map(|term_ordinals| 
term_ordinals.iter().cloned().max()) + .flat_map(|term_ordinals| term_ordinals.iter().max()) .max() .unwrap_or_default() } @@ -383,22 +383,27 @@ impl IndexMerger { } = self.doc_id_mapping.get_old_doc_addr(doc as u32); self.fast_field_readers[segment_ord as usize].get(doc_id) } + + fn iter<'b>(&'b self) -> Box + 'b> { + Box::new( + self.doc_id_mapping + .iter_old_doc_addrs() + .map(|old_doc_addr| { + let fast_field_reader = + &self.fast_field_readers[old_doc_addr.segment_ord as usize]; + fast_field_reader.get(old_doc_addr.doc_id) + }), + ) + } } let fastfield_accessor = SortedDocIdFieldAccessProvider { doc_id_mapping, fast_field_readers: &fast_field_readers, }; - let iter_gen = || { - doc_id_mapping.iter_old_doc_addrs().map(|old_doc_addr| { - let fast_field_reader = &fast_field_readers[old_doc_addr.segment_ord as usize]; - fast_field_reader.get(old_doc_addr.doc_id) - }) - }; fast_field_serializer.create_auto_detect_u64_fast_field( field, stats, fastfield_accessor, - iter_gen, )?; Ok(()) @@ -559,13 +564,7 @@ impl IndexMerger { } offsets.push(offset); - let iter_gen = || offsets.iter().cloned(); - fast_field_serializer.create_auto_detect_u64_fast_field( - field, - stats, - &offsets[..], - iter_gen, - )?; + fast_field_serializer.create_auto_detect_u64_fast_field(field, stats, &offsets[..])?; Ok(offsets) } /// Returns the fastfield index (index for the data, not the data). 
@@ -746,7 +745,7 @@ impl IndexMerger { let new_doc_id: DocId = self.offsets .iter() - .position(|&offset| offset > pos) + .position(|offset| offset > pos) .expect("pos is out of bounds") as DocId - 1u32; @@ -764,27 +763,30 @@ impl IndexMerger { vals[pos_in_values as usize] } + + fn iter<'b>(&'b self) -> Box + 'b> { + Box::new( + self.doc_id_mapping + .iter_old_doc_addrs() + .flat_map(|old_doc_addr| { + let ff_reader = + &self.fast_field_readers[old_doc_addr.segment_ord as usize]; + let mut vals = Vec::new(); + ff_reader.get_vals(old_doc_addr.doc_id, &mut vals); + vals.into_iter() + }), + ) + } } let fastfield_accessor = SortedDocIdMultiValueAccessProvider { doc_id_mapping, fast_field_readers: &ff_readers, offsets, }; - let iter_gen = || { - doc_id_mapping - .iter_old_doc_addrs() - .flat_map(|old_doc_addr| { - let ff_reader = &ff_readers[old_doc_addr.segment_ord as usize]; - let mut vals = Vec::new(); - ff_reader.get_vals(old_doc_addr.doc_id, &mut vals); - vals.into_iter() - }) - }; fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( field, stats, fastfield_accessor, - iter_gen, 1, )?;