diff --git a/fastfield_codecs/src/bitpacked.rs b/fastfield_codecs/src/bitpacked.rs index b77591279..c2549841b 100644 --- a/fastfield_codecs/src/bitpacked.rs +++ b/fastfield_codecs/src/bitpacked.rs @@ -68,7 +68,9 @@ impl FastFieldCodec for BitpackedCodec { assert_eq!(column.min_value(), 0u64); let num_bits = compute_num_bits(column.max_value()); let mut bit_packer = BitPacker::new(); - for val in column.iter() { + let mut reader = column.reader(); + while reader.advance() { + let val = reader.get(); bit_packer.write(val, num_bits, write)?; } bit_packer.close(write)?; diff --git a/fastfield_codecs/src/blockwise_linear.rs b/fastfield_codecs/src/blockwise_linear.rs index f6053d097..04e638354 100644 --- a/fastfield_codecs/src/blockwise_linear.rs +++ b/fastfield_codecs/src/blockwise_linear.rs @@ -75,7 +75,9 @@ impl FastFieldCodec for BlockwiseLinearCodec { if column.num_vals() < 10 * CHUNK_SIZE as u64 { return None; } - let mut first_chunk: Vec = column.iter().take(CHUNK_SIZE as usize).collect(); + let mut first_chunk: Vec = crate::iter_from_reader(column.reader()) + .take(CHUNK_SIZE as usize) + .collect(); let line = Line::train(&VecColumn::from(&first_chunk)); for (i, buffer_val) in first_chunk.iter_mut().enumerate() { let interpolated_val = line.eval(i as u64); @@ -109,7 +111,7 @@ impl FastFieldCodec for BlockwiseLinearCodec { let num_blocks = compute_num_blocks(num_vals); let mut blocks = Vec::with_capacity(num_blocks); - let mut vals = column.iter(); + let mut vals = crate::iter_from_reader(column.reader()); let mut bit_packer = BitPacker::new(); diff --git a/fastfield_codecs/src/column.rs b/fastfield_codecs/src/column.rs index 62311546e..ebe8d49d5 100644 --- a/fastfield_codecs/src/column.rs +++ b/fastfield_codecs/src/column.rs @@ -3,10 +3,11 @@ use std::ops::RangeInclusive; use tantivy_bitpacker::minmax; -pub trait Column: Send + Sync { +pub trait Column: Send + Sync { /// Return a `ColumnReader`. fn reader(&self) -> Box + '_> { - Box::new(ColumnReaderAdapter { column: self }) + // Box::new(ColumnReaderAdapter { column: self, idx: 0, }) + Box::new(ColumnReaderAdapter::from(self)) } /// Return the value associated to the given idx. @@ -65,39 +66,70 @@ pub trait Column: Send + Sync { fn max_value(&self) -> T; fn num_vals(&self) -> u64; - - /// Returns a iterator over the data - /// - /// TODO get rid of `.iter()` and extend ColumnReader instead. - fn iter(&self) -> Box + '_> { - Box::new((0..self.num_vals()).map(|idx| self.get_val(idx))) - } } /// `ColumnReader` makes it possible to read forward through a column. -/// -/// TODO add methods to make it possible to scan the column and replace `.iter()` pub trait ColumnReader { - fn seek(&mut self, idx: u64) -> T; + /// Advance the reader to the target_idx. + /// + /// After a successful call to seek, + /// `.get()` should returns `column.get_val(target_idx)`. + fn seek(&mut self, target_idx: u64) -> T; + + fn advance(&mut self) -> bool; + + /// Get the current value without advancing the reader + fn get(&self) -> T; } -pub(crate) struct ColumnReaderAdapter<'a, C: ?Sized> { +pub fn iter_from_reader<'a, T: 'static>( + mut column_reader: Box + 'a>, +) -> impl Iterator + 'a { + std::iter::from_fn(move || { + if !column_reader.advance() { + return None; + } + Some(column_reader.get()) + }) +} + +pub(crate) struct ColumnReaderAdapter<'a, C: ?Sized, T> { column: &'a C, + idx: u64, + len: u64, + _phantom: PhantomData, } -impl<'a, C: ?Sized> From<&'a C> for ColumnReaderAdapter<'a, C> { +impl<'a, C: Column + ?Sized, T: Copy + PartialOrd + 'static> From<&'a C> + for ColumnReaderAdapter<'a, C, T> +{ fn from(column: &'a C) -> Self { - ColumnReaderAdapter { column } + ColumnReaderAdapter { + column, + idx: u64::MAX, + len: column.num_vals(), + _phantom: PhantomData, + } } } -impl<'a, T, C: ?Sized> ColumnReader for ColumnReaderAdapter<'a, C> +impl<'a, T, C: ?Sized> ColumnReader for ColumnReaderAdapter<'a, C, T> where C: Column, - T: PartialOrd, + T: PartialOrd + Copy + 'static, { fn seek(&mut self, idx: u64) -> T { - self.column.get_val(idx) + self.idx = idx; + self.get() + } + + fn advance(&mut self) -> bool { + self.idx = self.idx.wrapping_add(1); + self.idx < self.len + } + + fn get(&self) -> T { + self.column.get_val(self.idx) } } @@ -107,7 +139,9 @@ pub struct VecColumn<'a, T = u64> { max_value: T, } -impl<'a, C: Column, T: Copy + PartialOrd> Column for &'a C { +impl<'a, C: Column, T> Column for &'a C +where T: Copy + PartialOrd + 'static +{ fn get_val(&self, idx: u64) -> T { (*self).get_val(idx) } @@ -128,24 +162,16 @@ impl<'a, C: Column, T: Copy + PartialOrd> Column for &'a C { (*self).reader() } - fn iter(&self) -> Box + '_> { - (*self).iter() - } - fn get_range(&self, start: u64, output: &mut [T]) { (*self).get_range(start, output) } } -impl<'a, T: Copy + PartialOrd + Send + Sync> Column for VecColumn<'a, T> { +impl<'a, T: Copy + PartialOrd + Send + Sync + 'static> Column for VecColumn<'a, T> { fn get_val(&self, position: u64) -> T { self.values[position as usize] } - fn iter(&self) -> Box + '_> { - Box::new(self.values.iter().copied()) - } - fn min_value(&self) -> T { self.min_value } @@ -184,15 +210,15 @@ struct MonotonicMappingColumn { } /// Creates a view of a column transformed by a monotonic mapping. -pub fn monotonic_map_column( +pub fn monotonic_map_column( from_column: C, monotonic_mapping: T, ) -> impl Column where C: Column, T: Fn(Input) -> Output + Send + Sync, - Input: Send + Sync, - Output: Send + Sync, + Input: Send + Sync + 'static, + Output: Send + Sync + 'static, { MonotonicMappingColumn { from_column, @@ -201,13 +227,13 @@ where } } -impl Column +impl Column for MonotonicMappingColumn where C: Column, T: Fn(Input) -> Output + Send + Sync, - Input: Send + Sync, - Output: Send + Sync, + Input: Send + Sync + 'static, + Output: Send + Sync + 'static, { #[inline] fn get_val(&self, idx: u64) -> Output { @@ -229,13 +255,9 @@ where self.from_column.num_vals() } - fn iter(&self) -> Box + '_> { - Box::new(self.from_column.iter().map(&self.monotonic_mapping)) - } - fn reader(&self) -> Box + '_> { Box::new(MonotonicMappingColumnReader { - col_reader: ColumnReaderAdapter::from(&self.from_column), + col_reader: self.from_column.reader(), monotonic_mapping: &self.monotonic_mapping, intermdiary_type: PhantomData, }) @@ -245,22 +267,30 @@ where // and we do not have any specialized implementation anyway. } -struct MonotonicMappingColumnReader<'a, ColR, Transform, U> { - col_reader: ColR, +struct MonotonicMappingColumnReader<'a, Transform, U> { + col_reader: Box + 'a>, monotonic_mapping: &'a Transform, intermdiary_type: PhantomData, } -impl<'a, U, V, ColR, Transform> ColumnReader - for MonotonicMappingColumnReader<'a, ColR, Transform, U> +impl<'a, U, V, Transform> ColumnReader for MonotonicMappingColumnReader<'a, Transform, U> where - ColR: ColumnReader + 'a, + U: Copy, + V: Copy, Transform: Fn(U) -> V, { fn seek(&mut self, idx: u64) -> V { let intermediary_value = self.col_reader.seek(idx); (*self.monotonic_mapping)(intermediary_value) } + + fn advance(&mut self) -> bool { + self.col_reader.advance() + } + + fn get(&self) -> V { + (*self.monotonic_mapping)(self.col_reader.get()) + } } pub struct IterColumn(T); @@ -276,7 +306,7 @@ where T: Iterator + Clone + ExactSizeIterator impl Column for IterColumn where T: Iterator + Clone + ExactSizeIterator + Send + Sync, - T::Item: PartialOrd, + T::Item: PartialOrd + Copy + 'static, { fn get_val(&self, idx: u64) -> T::Item { self.0.clone().nth(idx as usize).unwrap() @@ -293,10 +323,6 @@ where fn num_vals(&self) -> u64 { self.0.len() as u64 } - - fn iter(&self) -> Box + '_> { - Box::new(self.0.clone()) - } } #[cfg(test)] @@ -329,7 +355,7 @@ mod tests { let vals: Vec = (-1..99).map(i64::to_u64).collect(); let col = VecColumn::from(&vals); let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64); - let val_i64s: Vec = mapped.iter().collect(); + let val_i64s: Vec = crate::iter_from_reader(mapped.reader()).collect(); for i in 0..100 { assert_eq!(val_i64s[i as usize], mapped.get_val(i)); } @@ -343,7 +369,7 @@ mod tests { assert_eq!(mapped.min_value(), -10i64); assert_eq!(mapped.max_value(), 980i64); assert_eq!(mapped.num_vals(), 100); - let val_i64s: Vec = mapped.iter().collect(); + let val_i64s: Vec = crate::iter_from_reader(mapped.reader()).collect(); assert_eq!(val_i64s.len(), 100); for i in 0..100 { assert_eq!(val_i64s[i as usize], mapped.get_val(i)); diff --git a/fastfield_codecs/src/compact_space/mod.rs b/fastfield_codecs/src/compact_space/mod.rs index 389bccf6e..d05773579 100644 --- a/fastfield_codecs/src/compact_space/mod.rs +++ b/fastfield_codecs/src/compact_space/mod.rs @@ -22,7 +22,7 @@ use ownedbytes::OwnedBytes; use tantivy_bitpacker::{self, BitPacker, BitUnpacker}; use crate::compact_space::build_compact_space::get_compact_space; -use crate::Column; +use crate::{iter_from_reader, Column, ColumnReader}; mod blank_range; mod build_compact_space; @@ -173,11 +173,14 @@ impl CompactSpaceCompressor { /// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals. pub fn train_from(column: &impl Column) -> Self { let mut values_sorted = BTreeSet::new(); - values_sorted.extend(column.iter()); + let total_num_values = column.num_vals(); + values_sorted.extend(iter_from_reader(column.reader())); + let compact_space = get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS); + let amplitude_compact_space = compact_space.amplitude_compact_space(); assert!( @@ -218,11 +221,12 @@ impl CompactSpaceCompressor { pub fn compress_into( self, - vals: impl Iterator, + mut vals: Box + '_>, write: &mut impl Write, ) -> io::Result<()> { let mut bitpacker = BitPacker::default(); - for val in vals { + while vals.advance() { + let val = vals.get(); let compact = self .params .compact_space @@ -300,13 +304,13 @@ impl Column for CompactSpaceDecompressor { self.params.num_vals } - #[inline] - fn iter(&self) -> Box + '_> { - Box::new(self.iter()) - } fn get_between_vals(&self, range: RangeInclusive) -> Vec { self.get_between_vals(range) } + + fn reader(&self) -> Box + '_> { + Box::new(self.specialized_reader()) + } } impl CompactSpaceDecompressor { @@ -410,18 +414,13 @@ impl CompactSpaceDecompressor { positions } - #[inline] - fn iter_compact(&self) -> impl Iterator + '_ { - (0..self.params.num_vals) - .map(move |idx| self.params.bit_unpacker.get(idx as u64, &self.data) as u64) - } - - #[inline] - fn iter(&self) -> impl Iterator + '_ { - // TODO: Performance. It would be better to iterate on the ranges and check existence via - // the bit_unpacker. - self.iter_compact() - .map(|compact| self.compact_to_u128(compact)) + fn specialized_reader(&self) -> CompactSpaceReader<'_> { + CompactSpaceReader { + data: self.data.as_slice(), + params: &self.params, + idx: 0u64, + len: self.params.num_vals, + } } #[inline] @@ -439,6 +438,30 @@ impl CompactSpaceDecompressor { } } +pub struct CompactSpaceReader<'a> { + data: &'a [u8], + params: &'a IPCodecParams, + idx: u64, + len: u64, +} + +impl<'a> ColumnReader for CompactSpaceReader<'a> { + fn seek(&mut self, target_idx: u64) -> u128 { + self.idx = target_idx; + self.get() + } + + fn advance(&mut self) -> bool { + self.idx = self.idx.wrapping_add(1); + self.idx < self.len + } + + fn get(&self) -> u128 { + let compact_code = self.params.bit_unpacker.get(self.idx, self.data); + self.params.compact_space.compact_to_u128(compact_code) + } +} + #[cfg(test)] mod tests { diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index a55847e8a..05afa730b 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -29,7 +29,7 @@ mod serialize; use self::bitpacked::BitpackedCodec; use self::blockwise_linear::BlockwiseLinearCodec; -pub use self::column::{monotonic_map_column, Column, ColumnReader, VecColumn}; +pub use self::column::{iter_from_reader, monotonic_map_column, Column, ColumnReader, VecColumn}; use self::linear::LinearCodec; pub use self::monotonic_mapping::MonotonicallyMappableToU64; pub use self::serialize::{ diff --git a/fastfield_codecs/src/linear.rs b/fastfield_codecs/src/linear.rs index 722135357..7c46e2733 100644 --- a/fastfield_codecs/src/linear.rs +++ b/fastfield_codecs/src/linear.rs @@ -89,8 +89,7 @@ impl FastFieldCodec for LinearCodec { assert_eq!(column.min_value(), 0); let line = Line::train(column); - let max_offset_from_line = column - .iter() + let max_offset_from_line = crate::iter_from_reader(column.reader()) .enumerate() .map(|(pos, actual_value)| { let calculated_value = line.eval(pos as u64); @@ -107,7 +106,12 @@ impl FastFieldCodec for LinearCodec { linear_params.serialize(write)?; let mut bit_packer = BitPacker::new(); - for (pos, actual_value) in column.iter().enumerate() { + let mut col_reader = column.reader(); + for pos in 0.. { + if !col_reader.advance() { + break; + } + let actual_value = col_reader.get(); let calculated_value = line.eval(pos as u64); let offset = actual_value.wrapping_sub(calculated_value); bit_packer.write(offset, num_bits, write)?; diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs index 92f55f5d0..de6365178 100644 --- a/fastfield_codecs/src/serialize.rs +++ b/fastfield_codecs/src/serialize.rs @@ -31,8 +31,8 @@ use crate::blockwise_linear::BlockwiseLinearCodec; use crate::compact_space::CompactSpaceCompressor; use crate::linear::LinearCodec; use crate::{ - monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64, - VecColumn, ALL_CODEC_TYPES, + iter_from_reader, monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, + MonotonicallyMappableToU64, VecColumn, ALL_CODEC_TYPES, }; /// The normalized header gives some parameters after applying the following @@ -79,8 +79,9 @@ impl Header { let num_vals = column.num_vals(); let min_value = column.min_value(); let max_value = column.max_value(); - let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value)) - .filter(|gcd| gcd.get() > 1u64); + let gcd = + crate::gcd::find_gcd(iter_from_reader(column.reader()).map(|val| val - min_value)) + .filter(|gcd| gcd.get() > 1u64); let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64)); let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value)); let codec_type = detect_codec(shifted_column, codecs)?; @@ -131,7 +132,7 @@ pub fn estimate( ) -> Option { let column = monotonic_map_column(typed_column, T::to_u64); let min_value = column.min_value(); - let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value)) + let gcd = crate::gcd::find_gcd(iter_from_reader(column.reader()).map(|val| val - min_value)) .filter(|gcd| gcd.get() > 1u64); let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64)); let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value)); @@ -149,7 +150,7 @@ pub fn serialize_u128( // TODO write header, to later support more codecs let compressor = CompactSpaceCompressor::train_from(&typed_column); compressor - .compress_into(typed_column.iter(), output) + .compress_into(typed_column.reader(), output) .unwrap(); Ok(()) @@ -240,7 +241,8 @@ mod tests { #[test] fn test_serialize_deserialize() { let original = [1u64, 5u64, 10u64]; - let restored: Vec = serialize_and_load(&original[..]).iter().collect(); + let restored: Vec = + crate::iter_from_reader(serialize_and_load(&original[..]).reader()).collect(); assert_eq!(&restored, &original[..]); } diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index ad4c0e2f5..b908b1161 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -41,6 +41,7 @@ mod error; mod facet_reader; mod multivalued; mod readers; +mod remapped_column; mod serializer; mod writer; @@ -424,7 +425,7 @@ mod tests { permutation } - fn test_intfastfield_permutation_with_data(permutation: Vec) -> crate::Result<()> { + fn test_intfastfield_permutation_with_data(permutation: &[u64]) -> crate::Result<()> { let path = Path::new("test"); let n = permutation.len(); let directory = RamDirectory::create(); @@ -432,7 +433,7 @@ mod tests { let write: WritePtr = directory.open_write(Path::new("test"))?; let mut serializer = CompositeFastFieldSerializer::from_write(write)?; let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA); - for &x in &permutation { + for &x in permutation { fast_field_writers.add_document(&doc!(*FIELD=>x)); } fast_field_writers.serialize(&mut serializer, &HashMap::new(), None)?; @@ -446,7 +447,6 @@ mod tests { .unwrap() .read_bytes()?; let fast_field_reader = open::(data)?; - for a in 0..n { assert_eq!(fast_field_reader.get_val(a as u64), permutation[a as usize]); } @@ -455,16 +455,23 @@ mod tests { } #[test] - fn test_intfastfield_permutation_gcd() -> crate::Result<()> { - let permutation = generate_permutation_gcd(); - test_intfastfield_permutation_with_data(permutation)?; + fn test_intfastfield_simple() -> crate::Result<()> { + let permutation = &[1, 2, 3]; + test_intfastfield_permutation_with_data(&permutation[..])?; Ok(()) } #[test] fn test_intfastfield_permutation() -> crate::Result<()> { let permutation = generate_permutation(); - test_intfastfield_permutation_with_data(permutation)?; + test_intfastfield_permutation_with_data(&permutation)?; + Ok(()) + } + + #[test] + fn test_intfastfield_permutation_gcd() -> crate::Result<()> { + let permutation = generate_permutation_gcd(); + test_intfastfield_permutation_with_data(&permutation)?; Ok(()) } diff --git a/src/fastfield/multivalued/multivalue_start_index.rs b/src/fastfield/multivalued/multivalue_start_index.rs index 111b3eacf..a7f2ef003 100644 --- a/src/fastfield/multivalued/multivalue_start_index.rs +++ b/src/fastfield/multivalued/multivalue_start_index.rs @@ -1,6 +1,7 @@ use fastfield_codecs::{Column, ColumnReader}; use crate::indexer::doc_id_mapping::DocIdMapping; +use crate::DocId; pub(crate) struct MultivalueStartIndex<'a, C: Column> { column: &'a C, @@ -10,38 +11,63 @@ pub(crate) struct MultivalueStartIndex<'a, C: Column> { } struct MultivalueStartIndexReader<'a, C: Column> { - seek_head: MultivalueStartIndexIter<'a, C>, - seek_next_id: u64, + column: &'a C, + doc_id_map: &'a DocIdMapping, + idx: u64, + val: u64, + len: u64, } impl<'a, C: Column> MultivalueStartIndexReader<'a, C> { fn new(column: &'a C, doc_id_map: &'a DocIdMapping) -> Self { Self { - seek_head: MultivalueStartIndexIter { - column, - doc_id_map, - new_doc_id: 0, - offset: 0u64, - }, - seek_next_id: 0u64, + column, + doc_id_map, + idx: u64::MAX, + val: 0, + len: doc_id_map.num_new_doc_ids() as u64 + 1, } } fn reset(&mut self) { - self.seek_next_id = 0; - self.seek_head.new_doc_id = 0; - self.seek_head.offset = 0; + self.idx = u64::MAX; + self.val = 0; } } impl<'a, C: Column> ColumnReader for MultivalueStartIndexReader<'a, C> { fn seek(&mut self, idx: u64) -> u64 { - if self.seek_next_id > idx { + if self.idx > idx { self.reset(); + self.advance(); } - let to_skip = idx - self.seek_next_id; - self.seek_next_id = idx + 1; - self.seek_head.nth(to_skip as usize).unwrap() + for _ in self.idx..idx { + self.advance(); + } + self.get() + } + + fn advance(&mut self) -> bool { + if self.idx == u64::MAX { + self.idx = 0; + self.val = 0; + return true; + } + let new_doc_id: DocId = self.idx as DocId; + self.idx += 1; + if self.idx >= self.len { + self.idx = self.len; + return false; + } + let old_doc: DocId = self.doc_id_map.get_old_doc_id(new_doc_id); + let num_vals_for_doc = + self.column.get_val(old_doc as u64 + 1) - self.column.get_val(old_doc as u64); + self.val += num_vals_for_doc; + true + } + + fn get(&self) -> u64 { + self.val } } @@ -83,10 +109,6 @@ impl<'a, C: Column> Column for MultivalueStartIndex<'a, C> { fn num_vals(&self) -> u64 { (self.doc_id_map.num_new_doc_ids() + 1) as u64 } - - fn iter<'b>(&'b self) -> Box + 'b> { - Box::new(MultivalueStartIndexIter::new(self.column, self.doc_id_map)) - } } struct MultivalueStartIndexIter<'a, C: Column> { @@ -143,7 +165,8 @@ mod tests { ); assert_eq!(multivalue_start_index.num_vals(), 4); assert_eq!( - multivalue_start_index.iter().collect::>(), + fastfield_codecs::iter_from_reader(multivalue_start_index.reader()) + .collect::>(), vec![0, 4, 6, 11] ); // 4, 2, 5 } @@ -156,7 +179,8 @@ mod tests { let col = VecColumn::from(&[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][..]); let multivalue_start_index = MultivalueStartIndex::new(&col, &doc_id_mapping); assert_eq!( - multivalue_start_index.iter().collect::>(), + fastfield_codecs::iter_from_reader(multivalue_start_index.reader()) + .collect::>(), vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55] ); assert_eq!(multivalue_start_index.num_vals(), 11); diff --git a/src/fastfield/remapped_column.rs b/src/fastfield/remapped_column.rs new file mode 100644 index 000000000..34c34dc6d --- /dev/null +++ b/src/fastfield/remapped_column.rs @@ -0,0 +1,112 @@ +use fastfield_codecs::{Column, ColumnReader}; +use tantivy_bitpacker::BlockedBitpacker; + +use crate::indexer::doc_id_mapping::DocIdMapping; +use crate::DocId; + +#[derive(Clone)] +pub(crate) struct WriterFastFieldColumn<'map, 'bitp> { + pub(crate) doc_id_mapping_opt: Option<&'map DocIdMapping>, + pub(crate) vals: &'bitp BlockedBitpacker, + pub(crate) min_value: u64, + pub(crate) max_value: u64, + pub(crate) num_vals: u64, +} + +impl<'map, 'bitp> Column for WriterFastFieldColumn<'map, 'bitp> { + /// Return the value associated to the given doc. + /// + /// Whenever possible use the Iterator passed to the fastfield creation instead, for performance + /// reasons. + /// + /// # Panics + /// + /// May panic if `doc` is greater than the index. + fn get_val(&self, doc: u64) -> u64 { + if let Some(doc_id_map) = self.doc_id_mapping_opt { + self.vals + .get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra + // FastFieldReader wrapper for + // non doc_id_map + } else { + self.vals.get(doc as usize) + } + } + + fn reader(&self) -> Box { + if let Some(doc_id_mapping) = self.doc_id_mapping_opt { + Box::new(RemappedColumnReader { + doc_id_mapping, + vals: self.vals, + idx: u64::MAX, + len: doc_id_mapping.num_new_doc_ids() as u64, + }) + } else { + Box::new(BitpackedColumnReader { + vals: self.vals, + idx: u64::MAX, + len: self.num_vals, + }) + } + } + + fn min_value(&self) -> u64 { + self.min_value + } + + fn max_value(&self) -> u64 { + self.max_value + } + + fn num_vals(&self) -> u64 { + self.num_vals + } +} + +struct RemappedColumnReader<'a> { + doc_id_mapping: &'a DocIdMapping, + vals: &'a BlockedBitpacker, + idx: u64, + len: u64, +} + +impl<'a> ColumnReader for RemappedColumnReader<'a> { + fn seek(&mut self, target_idx: u64) -> u64 { + assert!(target_idx < self.len); + self.idx = target_idx; + self.get() + } + + fn advance(&mut self) -> bool { + self.idx = self.idx.wrapping_add(1); + self.idx < self.len + } + + fn get(&self) -> u64 { + let old_doc_id: DocId = self.doc_id_mapping.get_old_doc_id(self.idx as DocId); + self.vals.get(old_doc_id as usize) + } +} + +struct BitpackedColumnReader<'a> { + vals: &'a BlockedBitpacker, + idx: u64, + len: u64, +} + +impl<'a> ColumnReader for BitpackedColumnReader<'a> { + fn seek(&mut self, target_idx: u64) -> u64 { + assert!(target_idx < self.len); + self.idx = target_idx; + self.get() + } + + fn advance(&mut self) -> bool { + self.idx = self.idx.wrapping_add(1); + self.idx < self.len + } + + fn get(&self) -> u64 { + self.vals.get(self.idx as usize) + } +} diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index a7d70635b..8ef65d073 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -2,12 +2,13 @@ use std::collections::HashMap; use std::io; use common; -use fastfield_codecs::{Column, MonotonicallyMappableToU64}; +use fastfield_codecs::MonotonicallyMappableToU64; use fnv::FnvHashMap; use tantivy_bitpacker::BlockedBitpacker; use super::multivalued::MultiValuedFastFieldWriter; use super::FastFieldType; +use crate::fastfield::remapped_column::WriterFastFieldColumn; use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer}; use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; @@ -351,7 +352,7 @@ impl IntFastFieldWriter { pub fn serialize( &self, serializer: &mut CompositeFastFieldSerializer, - doc_id_map: Option<&DocIdMapping>, + doc_id_mapping_opt: Option<&DocIdMapping>, ) -> io::Result<()> { let (min, max) = if self.val_min > self.val_max { (0, 0) @@ -359,8 +360,8 @@ impl IntFastFieldWriter { (self.val_min, self.val_max) }; - let fastfield_accessor = WriterFastFieldAccessProvider { - doc_id_map, + let fastfield_accessor = WriterFastFieldColumn { + doc_id_mapping_opt, vals: &self.vals, min_value: min, max_value: max, @@ -372,57 +373,3 @@ impl IntFastFieldWriter { Ok(()) } } - -#[derive(Clone)] -struct WriterFastFieldAccessProvider<'map, 'bitp> { - doc_id_map: Option<&'map DocIdMapping>, - vals: &'bitp BlockedBitpacker, - min_value: u64, - max_value: u64, - num_vals: u64, -} - -impl<'map, 'bitp> Column for WriterFastFieldAccessProvider<'map, 'bitp> { - /// Return the value associated to the given doc. - /// - /// Whenever possible use the Iterator passed to the fastfield creation instead, for performance - /// reasons. - /// - /// # Panics - /// - /// May panic if `doc` is greater than the index. - fn get_val(&self, doc: u64) -> u64 { - if let Some(doc_id_map) = self.doc_id_map { - self.vals - .get(doc_id_map.get_old_doc_id(doc as u32) as usize) // consider extra - // FastFieldReader wrapper for - // non doc_id_map - } else { - self.vals.get(doc as usize) - } - } - - fn iter(&self) -> Box + '_> { - if let Some(doc_id_map) = self.doc_id_map { - Box::new( - doc_id_map - .iter_old_doc_ids() - .map(|doc_id| self.vals.get(doc_id as usize)), - ) - } else { - Box::new(self.vals.iter()) - } - } - - fn min_value(&self) -> u64 { - self.min_value - } - - fn max_value(&self) -> u64 { - self.max_value - } - - fn num_vals(&self) -> u64 { - self.num_vals - } -} diff --git a/src/indexer/sorted_doc_id_column.rs b/src/indexer/sorted_doc_id_column.rs index 1f84c20ac..abb888867 100644 --- a/src/indexer/sorted_doc_id_column.rs +++ b/src/indexer/sorted_doc_id_column.rs @@ -1,11 +1,11 @@ use std::sync::Arc; -use fastfield_codecs::Column; +use fastfield_codecs::{Column, ColumnReader}; use itertools::Itertools; use crate::indexer::doc_id_mapping::SegmentDocIdMapping; use crate::schema::Field; -use crate::{DocAddress, SegmentReader}; +use crate::{DocAddress, DocId, SegmentReader}; pub(crate) struct SortedDocIdColumn<'a> { doc_id_mapping: &'a SegmentDocIdMapping, @@ -87,17 +87,14 @@ impl<'a> Column for SortedDocIdColumn<'a> { self.fast_field_readers[segment_ord as usize].get_val(doc_id as u64) } - fn iter(&self) -> Box + '_> { - Box::new( - self.doc_id_mapping - .iter_old_doc_addrs() - .map(|old_doc_addr| { - let fast_field_reader = - &self.fast_field_readers[old_doc_addr.segment_ord as usize]; - fast_field_reader.get_val(old_doc_addr.doc_id as u64) - }), - ) + fn reader(&self) -> Box + '_> { + Box::new(SortedDocIdColumnReader { + doc_id_mapping: self.doc_id_mapping, + fast_field_readers: &self.fast_field_readers[..], + new_doc_id: u32::MAX, + }) } + fn min_value(&self) -> u64 { self.min_value } @@ -110,3 +107,27 @@ impl<'a> Column for SortedDocIdColumn<'a> { self.num_vals } } + +struct SortedDocIdColumnReader<'a> { + doc_id_mapping: &'a SegmentDocIdMapping, + fast_field_readers: &'a [Arc], + new_doc_id: DocId, +} + +impl<'a> ColumnReader for SortedDocIdColumnReader<'a> { + fn seek(&mut self, target_idx: u64) -> u64 { + assert!(target_idx < self.doc_id_mapping.len() as u64); + self.new_doc_id = target_idx as u32; + self.get() + } + + fn advance(&mut self) -> bool { + self.new_doc_id = self.new_doc_id.wrapping_add(1); + self.new_doc_id < self.doc_id_mapping.len() as u32 + } + + fn get(&self) -> u64 { + let old_doc = self.doc_id_mapping.get_old_doc_addr(self.new_doc_id); + self.fast_field_readers[old_doc.segment_ord as usize].get_val(old_doc.doc_id as u64) + } +} diff --git a/src/indexer/sorted_doc_id_multivalue_column.rs b/src/indexer/sorted_doc_id_multivalue_column.rs index bcd86baad..e816ad572 100644 --- a/src/indexer/sorted_doc_id_multivalue_column.rs +++ b/src/indexer/sorted_doc_id_multivalue_column.rs @@ -1,6 +1,6 @@ use std::cmp; -use fastfield_codecs::Column; +use fastfield_codecs::{Column, ColumnReader}; use crate::fastfield::{MultiValueLength, MultiValuedFastFieldReader}; use crate::indexer::doc_id_mapping::SegmentDocIdMapping; @@ -95,18 +95,6 @@ impl<'a> Column for SortedDocIdMultiValueColumn<'a> { vals[pos_in_values as usize] } - fn iter(&self) -> Box + '_> { - Box::new( - self.doc_id_mapping - .iter_old_doc_addrs() - .flat_map(|old_doc_addr| { - let ff_reader = &self.fast_field_readers[old_doc_addr.segment_ord as usize]; - let mut vals = Vec::new(); - ff_reader.get_vals(old_doc_addr.doc_id, &mut vals); - vals.into_iter() - }), - ) - } fn min_value(&self) -> u64 { self.min_value } @@ -118,4 +106,80 @@ impl<'a> Column for SortedDocIdMultiValueColumn<'a> { fn num_vals(&self) -> u64 { self.num_vals } + + fn reader(&self) -> Box + '_> { + let mut reader = SortedDocMultiValueColumnReader { + doc_id_mapping: self.doc_id_mapping, + fast_field_readers: &self.fast_field_readers[..], + new_doc_id: u32::MAX, + in_buffer_idx: 0, + buffer: Vec::new(), + idx: u64::MAX, + }; + reader.reset(); + Box::new(reader) + } +} + +struct SortedDocMultiValueColumnReader<'a> { + doc_id_mapping: &'a SegmentDocIdMapping, + fast_field_readers: &'a [MultiValuedFastFieldReader], + + new_doc_id: DocId, + in_buffer_idx: usize, + buffer: Vec, + idx: u64, +} + +impl<'a> SortedDocMultiValueColumnReader<'a> { + fn fill(&mut self) { + let old_doc = self.doc_id_mapping.get_old_doc_addr(self.new_doc_id); + let ff_reader = &self.fast_field_readers[old_doc.segment_ord as usize]; + ff_reader.get_vals(old_doc.doc_id, &mut self.buffer); + self.in_buffer_idx = 0; + } + + fn reset(&mut self) { + self.buffer.clear(); + self.idx = u64::MAX; + self.in_buffer_idx = 0; + self.new_doc_id = u32::MAX; + } +} + +impl<'a> ColumnReader for SortedDocMultiValueColumnReader<'a> { + fn seek(&mut self, target_idx: u64) -> u64 { + if target_idx < self.idx { + self.reset(); + self.advance(); + } + for _ in self.idx..target_idx { + // TODO could be optimized. + assert!(self.advance()); + } + self.get() + } + + fn advance(&mut self) -> bool { + loop { + self.in_buffer_idx += 1; + if self.in_buffer_idx < self.buffer.len() { + self.idx = self.idx.wrapping_add(1); + return true; + } + self.new_doc_id = self.new_doc_id.wrapping_add(1); + if self.new_doc_id >= self.doc_id_mapping.len() as u32 { + return false; + } + self.fill(); + if !self.buffer.is_empty() { + self.idx = self.idx.wrapping_add(1); + return true; + } + } + } + + fn get(&self) -> u64 { + self.buffer[self.in_buffer_idx] + } }