From c4474fef3adc36af7888f2e8d4ae1967dbb9dbb9 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Thu, 17 Nov 2022 17:07:02 +0800 Subject: [PATCH] add OptionalColumn --- fastfield_codecs/benches/bench.rs | 2 +- fastfield_codecs/src/bitpacked.rs | 21 +++ fastfield_codecs/src/blockwise_linear.rs | 27 ++++ fastfield_codecs/src/gcd.rs | 20 +-- fastfield_codecs/src/lib.rs | 20 ++- fastfield_codecs/src/linear.rs | 23 ++++ fastfield_codecs/src/optional_column.rs | 120 ++++++++++++++++++ fastfield_codecs/src/serialize.rs | 5 +- src/aggregation/agg_req_with_accessor.rs | 10 +- src/aggregation/bucket/histogram/histogram.rs | 99 +++++++++------ src/aggregation/bucket/range.rs | 31 +++-- src/aggregation/metric/average.rs | 41 +++--- src/aggregation/metric/stats.rs | 39 +++--- src/collector/filter_collector_wrapper.rs | 4 +- src/collector/histogram_collector.rs | 6 +- src/collector/top_score_collector.rs | 4 +- src/fastfield/mod.rs | 39 ++++-- src/fastfield/multivalued/mod.rs | 7 +- src/fastfield/readers.rs | 52 ++++++-- src/indexer/merger.rs | 6 +- src/indexer/merger_sorted_index_test.rs | 10 +- src/indexer/sorted_doc_id_column.rs | 20 ++- src/lib.rs | 2 +- 23 files changed, 454 insertions(+), 154 deletions(-) create mode 100644 fastfield_codecs/src/optional_column.rs diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs index 8f89f3ecf..bc7a25359 100644 --- a/fastfield_codecs/benches/bench.rs +++ b/fastfield_codecs/benches/bench.rs @@ -41,7 +41,7 @@ mod tests { ) -> Arc> { let mut buffer = Vec::new(); serialize(VecColumn::from(&column), &mut buffer, &ALL_CODEC_TYPES).unwrap(); - open(OwnedBytes::new(buffer)).unwrap() + open(OwnedBytes::new(buffer)).unwrap().to_full().unwrap() } #[bench] diff --git a/fastfield_codecs/src/bitpacked.rs b/fastfield_codecs/src/bitpacked.rs index 044debb96..4d25cb3b5 100644 --- a/fastfield_codecs/src/bitpacked.rs +++ b/fastfield_codecs/src/bitpacked.rs @@ -3,6 +3,7 @@ use std::io::{self, Write}; use 
ownedbytes::OwnedBytes; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; +use crate::optional_column::OptionalColumn; use crate::serialize::NormalizedHeader; use crate::{Column, FastFieldCodec, FastFieldCodecType}; @@ -35,6 +36,26 @@ impl Column for BitpackedReader { } } +impl OptionalColumn for BitpackedReader { + #[inline] + fn get_val(&self, doc: u32) -> Option { + Some(self.bit_unpacker.get(doc, &self.data)) + } + #[inline] + fn min_value(&self) -> Option { + // The BitpackedReader assumes a normalized vector. + Some(0) + } + #[inline] + fn max_value(&self) -> Option { + Some(self.normalized_header.max_value) + } + #[inline] + fn num_vals(&self) -> u32 { + self.normalized_header.num_vals + } +} + pub struct BitpackedCodec; impl FastFieldCodec for BitpackedCodec { diff --git a/fastfield_codecs/src/blockwise_linear.rs b/fastfield_codecs/src/blockwise_linear.rs index 553463cc7..60e8e26a9 100644 --- a/fastfield_codecs/src/blockwise_linear.rs +++ b/fastfield_codecs/src/blockwise_linear.rs @@ -6,6 +6,7 @@ use ownedbytes::OwnedBytes; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; use crate::line::Line; +use crate::optional_column::OptionalColumn; use crate::serialize::NormalizedHeader; use crate::{Column, FastFieldCodec, FastFieldCodecType, VecColumn}; @@ -184,3 +185,29 @@ impl Column for BlockwiseLinearReader { self.normalized_header.num_vals } } + +impl OptionalColumn for BlockwiseLinearReader { + #[inline] + fn get_val(&self, idx: u32) -> Option { + let block_id = (idx / CHUNK_SIZE as u32) as usize; + let idx_within_block = idx % (CHUNK_SIZE as u32); + let block = &self.blocks[block_id]; + let interpoled_val: u64 = block.line.eval(idx_within_block); + let block_bytes = &self.data[block.data_start_offset..]; + let bitpacked_diff = block.bit_unpacker.get(idx_within_block, block_bytes); + Some(interpoled_val.wrapping_add(bitpacked_diff)) + } + #[inline] + fn min_value(&self) -> Option { + // The BitpackedReader assumes a 
normalized vector. + Some(0) + } + #[inline] + fn max_value(&self) -> Option { + Some(self.normalized_header.max_value) + } + #[inline] + fn num_vals(&self) -> u32 { + self.normalized_header.num_vals + } +} diff --git a/fastfield_codecs/src/gcd.rs b/fastfield_codecs/src/gcd.rs index 7917d7ca4..96d706b75 100644 --- a/fastfield_codecs/src/gcd.rs +++ b/fastfield_codecs/src/gcd.rs @@ -59,11 +59,11 @@ mod tests { crate::serialize(VecColumn::from(&vals), &mut buffer, &[codec_type])?; let buffer = OwnedBytes::new(buffer); let column = crate::open::(buffer.clone())?; - assert_eq!(column.get_val(0), -4000i64); - assert_eq!(column.get_val(1), -3000i64); - assert_eq!(column.get_val(2), -2000i64); - assert_eq!(column.max_value(), (num_vals as i64 - 5) * 1000); - assert_eq!(column.min_value(), -4000i64); + assert_eq!(column.get_val(0), Some(-4000i64)); + assert_eq!(column.get_val(1), Some(-3000i64)); + assert_eq!(column.get_val(2), Some(-2000i64)); + assert_eq!(column.max_value(), Some((num_vals as i64 - 5) * 1000)); + assert_eq!(column.min_value(), Some(-4000i64)); // Can't apply gcd let mut buffer_without_gcd = Vec::new(); @@ -101,11 +101,11 @@ mod tests { crate::serialize(VecColumn::from(&vals), &mut buffer, &[codec_type])?; let buffer = OwnedBytes::new(buffer); let column = crate::open::(buffer.clone())?; - assert_eq!(column.get_val(0), 1000u64); - assert_eq!(column.get_val(1), 2000u64); - assert_eq!(column.get_val(2), 3000u64); - assert_eq!(column.max_value(), num_vals as u64 * 1000); - assert_eq!(column.min_value(), 1000u64); + assert_eq!(column.get_val(0), Some(1000u64)); + assert_eq!(column.get_val(1), Some(2000u64)); + assert_eq!(column.get_val(2), Some(3000u64)); + assert_eq!(column.max_value(), Some(num_vals as u64 * 1000)); + assert_eq!(column.min_value(), Some(1000u64)); // Can't apply gcd let mut buffer_without_gcd = Vec::new(); diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index f8a22732f..8e2415cb2 100644 --- 
a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -24,6 +24,8 @@ use monotonic_mapping::{ StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal, StrictlyMonotonicMappingToInternalBaseval, StrictlyMonotonicMappingToInternalGCDBaseval, }; +pub use optional_column::OptionalColumn; +use optional_column::ToOptionalColumn; use ownedbytes::OwnedBytes; use serialize::{Header, U128Header}; @@ -34,6 +36,7 @@ mod line; mod linear; mod monotonic_mapping; mod monotonic_mapping_u128; +mod optional_column; mod column; mod gcd; @@ -142,7 +145,7 @@ pub fn open_u128( /// Returns the correct codec reader wrapped in the `Arc` for the data. pub fn open( mut bytes: OwnedBytes, -) -> io::Result>> { +) -> io::Result>> { let header = Header::deserialize(&mut bytes)?; match header.codec_type { FastFieldCodecType::Bitpacked => open_specific_codec::(bytes, &header), @@ -156,7 +159,7 @@ pub fn open( fn open_specific_codec( bytes: OwnedBytes, header: &Header, -) -> io::Result>> { +) -> io::Result>> { let normalized_header = header.normalized(); let reader = C::open_from_bytes(bytes, normalized_header)?; let min_value = header.min_value; @@ -164,12 +167,16 @@ fn open_specific_codec( let mapping = StrictlyMonotonicMappingInverter::from( StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd.get(), min_value), ); - Ok(Arc::new(monotonic_map_column(reader, mapping))) + Ok(Arc::new(ToOptionalColumn::new(Arc::new( + monotonic_map_column(reader, mapping), + )))) } else { let mapping = StrictlyMonotonicMappingInverter::from( StrictlyMonotonicMappingToInternalBaseval::new(min_value), ); - Ok(Arc::new(monotonic_map_column(reader, mapping))) + Ok(Arc::new(ToOptionalColumn::new(Arc::new( + monotonic_map_column(reader, mapping), + )))) } } @@ -240,8 +247,9 @@ mod tests { for (doc, orig_val) in data.iter().copied().enumerate() { let val = reader.get_val(doc as u32); assert_eq!( - val, orig_val, - "val `{val}` does not match orig_val {orig_val:?}, in data set {name}, data \ + 
val, + Some(orig_val), + "val `{val:?}` does not match orig_val {orig_val:?}, in data set {name}, data \ `{data:?}`", ); } diff --git a/fastfield_codecs/src/linear.rs b/fastfield_codecs/src/linear.rs index d75eeea80..dceb46194 100644 --- a/fastfield_codecs/src/linear.rs +++ b/fastfield_codecs/src/linear.rs @@ -5,6 +5,7 @@ use ownedbytes::OwnedBytes; use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker}; use crate::line::Line; +use crate::optional_column::OptionalColumn; use crate::serialize::NormalizedHeader; use crate::{Column, FastFieldCodec, FastFieldCodecType}; @@ -42,6 +43,28 @@ impl Column for LinearReader { } } +impl OptionalColumn for LinearReader { + #[inline] + fn get_val(&self, doc: u32) -> Option { + let interpoled_val: u64 = self.linear_params.line.eval(doc); + let bitpacked_diff = self.linear_params.bit_unpacker.get(doc, &self.data); + Some(interpoled_val.wrapping_add(bitpacked_diff)) + } + #[inline] + fn min_value(&self) -> Option { + // The BitpackedReader assumes a normalized vector. + Some(0) + } + #[inline] + fn max_value(&self) -> Option { + Some(self.header.max_value) + } + #[inline] + fn num_vals(&self) -> u32 { + self.header.num_vals + } +} + /// Fastfield serializer, which tries to guess values by linear interpolation /// and stores the difference bitpacked. pub struct LinearCodec; diff --git a/fastfield_codecs/src/optional_column.rs b/fastfield_codecs/src/optional_column.rs new file mode 100644 index 000000000..c4efc76f9 --- /dev/null +++ b/fastfield_codecs/src/optional_column.rs @@ -0,0 +1,120 @@ +use std::ops::{Range, RangeInclusive}; +use std::sync::Arc; + +use crate::Column; + +/// `OptionalColumn` provides columnar access on a field. +pub trait OptionalColumn: Send + Sync { + /// Return the value associated with the given idx. + /// + /// This accessor should return as fast as possible. + /// + /// # Panics + /// + /// May panic if `idx` is greater than the column length. 
+ fn get_val(&self, idx: u32) -> Option; + + /// Fills an output buffer with the fast field values + /// associated with the `DocId` going from + /// `start` to `start + output.len()`. + /// + /// # Panics + /// + /// Must panic if `start + output.len()` is greater than + /// the segment's `maxdoc`. + + fn get_range(&self, start: u64, output: &mut [Option]) { + for (out, idx) in output.iter_mut().zip(start..) { + *out = self.get_val(idx as u32); + } + } + + /// Return the positions of values which are in the provided range. + fn get_docids_for_value_range( + &self, + value_range: RangeInclusive, + doc_id_range: Range, + positions: &mut Vec, + ) { + let doc_id_range = doc_id_range.start..doc_id_range.end.min(self.num_vals()); + + for idx in doc_id_range.start..doc_id_range.end { + let val = self.get_val(idx); + if let Some(val) = val { + if value_range.contains(&val) { + positions.push(idx); + } + } + } + } + + /// Returns the minimum value for this fast field. + /// + /// This min_value may not be exact. + /// For instance, the min value does not take into account possibly + /// deleted documents. All values are however guaranteed to be higher than + /// `.min_value()`. + fn min_value(&self) -> Option; + + /// Returns the maximum value for this fast field. + /// + /// This max_value may not be exact. + /// For instance, the max value does not take into account possibly + /// deleted documents. All values are however guaranteed to be lower than + /// `.max_value()`. + fn max_value(&self) -> Option; + + /// The number of values in the column. 
+ fn num_vals(&self) -> u32; + + /// Returns a iterator over the data + fn iter<'a>(&'a self) -> Box> + 'a> { + Box::new((0..self.num_vals()).map(|idx| self.get_val(idx))) + } + + /// return full column if all values are set and is not empty + fn to_full(&self) -> Option>> { + None + } +} + +/// Temporary wrapper to migrate to optional column +pub(crate) struct ToOptionalColumn { + column: Arc>, +} + +impl ToOptionalColumn { + pub(crate) fn new(column: Arc>) -> Self { + Self { column } + } +} + +impl OptionalColumn for ToOptionalColumn { + #[inline] + fn get_val(&self, idx: u32) -> Option { + let val = self.column.get_val(idx); + Some(val) + } + + fn min_value(&self) -> Option { + let min_value = self.column.min_value(); + Some(min_value) + } + + fn max_value(&self) -> Option { + let max_value = self.column.max_value(); + Some(max_value) + } + + fn num_vals(&self) -> u32 { + self.column.num_vals() + } + + fn iter(&self) -> Box> + '_> { + Box::new(self.column.iter().map(|el| Some(el))) + } + /// return full column if all values are set and is not empty + fn to_full(&self) -> Option>> { + Some(self.column.clone()) + } +} diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs index b0f9e15da..cfe5c663f 100644 --- a/fastfield_codecs/src/serialize.rs +++ b/fastfield_codecs/src/serialize.rs @@ -278,7 +278,10 @@ pub fn serialize_and_load( ) -> Arc> { let mut buffer = Vec::new(); super::serialize(VecColumn::from(&column), &mut buffer, &ALL_CODEC_TYPES).unwrap(); - super::open(OwnedBytes::new(buffer)).unwrap() + super::open(OwnedBytes::new(buffer)) + .unwrap() + .to_full() + .unwrap() } #[cfg(test)] diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 6e09749aa..387d9b39d 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -4,7 +4,7 @@ use std::rc::Rc; use std::sync::atomic::AtomicU32; use std::sync::Arc; -use fastfield_codecs::Column; +use 
fastfield_codecs::OptionalColumn; use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation}; use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation}; @@ -37,16 +37,16 @@ impl AggregationsWithAccessor { #[derive(Clone)] pub(crate) enum FastFieldAccessor { Multi(MultiValuedFastFieldReader), - Single(Arc>), + Single(Arc>), } impl FastFieldAccessor { - pub fn as_single(&self) -> Option<&dyn Column> { + pub fn as_single(&self) -> Option<&dyn OptionalColumn> { match self { FastFieldAccessor::Multi(_) => None, FastFieldAccessor::Single(reader) => Some(&**reader), } } - pub fn into_single(self) -> Option>> { + pub fn into_single(self) -> Option>> { match self { FastFieldAccessor::Multi(_) => None, FastFieldAccessor::Single(reader) => Some(reader), @@ -124,7 +124,7 @@ impl BucketAggregationWithAccessor { pub struct MetricAggregationWithAccessor { pub metric: MetricAggregation, pub field_type: Type, - pub accessor: Arc, + pub accessor: Arc, } impl MetricAggregationWithAccessor { diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index c2d0c1277..fb83ec57a 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -1,7 +1,7 @@ use std::cmp::Ordering; use std::fmt::Display; -use fastfield_codecs::Column; +use fastfield_codecs::OptionalColumn; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -263,13 +263,17 @@ impl SegmentHistogramCollector { req: &HistogramAggregation, sub_aggregation: &AggregationsWithAccessor, field_type: Type, - accessor: &dyn Column, + accessor: &dyn OptionalColumn, ) -> crate::Result { req.validate()?; - let min = f64_from_fastfield_u64(accessor.min_value(), &field_type); - let max = f64_from_fastfield_u64(accessor.max_value(), &field_type); + let min_max_u64 = accessor.min_value().zip(accessor.max_value()); + let min_max_f64 = min_max_u64.map(|(min, max)| { + let min = 
f64_from_fastfield_u64(min, &field_type); + let max = f64_from_fastfield_u64(max, &field_type); + (min, max) + }); - let (min, max) = get_req_min_max(req, Some((min, max))); + let (min, max) = get_req_min_max(req, min_max_f64); // We compute and generate the buckets range (min, max) based on the request and the min // max in the fast field, but this is likely not ideal when this is a subbucket, where many @@ -331,47 +335,58 @@ impl SegmentHistogramCollector { .expect("unexpected fast field cardinatility"); let mut iter = doc.chunks_exact(4); for docs in iter.by_ref() { - let val0 = self.f64_from_fastfield_u64(accessor.get_val(docs[0])); - let val1 = self.f64_from_fastfield_u64(accessor.get_val(docs[1])); - let val2 = self.f64_from_fastfield_u64(accessor.get_val(docs[2])); - let val3 = self.f64_from_fastfield_u64(accessor.get_val(docs[3])); + if let Some(val) = accessor.get_val(docs[0]) { + let val = self.f64_from_fastfield_u64(val); + let bucket_pos = get_bucket_num(val); + self.increment_bucket_if_in_bounds( + val, + &bounds, + bucket_pos, + docs[0], + &bucket_with_accessor.sub_aggregation, + )?; + } - let bucket_pos0 = get_bucket_num(val0); - let bucket_pos1 = get_bucket_num(val1); - let bucket_pos2 = get_bucket_num(val2); - let bucket_pos3 = get_bucket_num(val3); + if let Some(val) = accessor.get_val(docs[1]) { + let val = self.f64_from_fastfield_u64(val); + let bucket_pos = get_bucket_num(val); + self.increment_bucket_if_in_bounds( + val, + &bounds, + bucket_pos, + docs[1], + &bucket_with_accessor.sub_aggregation, + )?; + } - self.increment_bucket_if_in_bounds( - val0, - &bounds, - bucket_pos0, - docs[0], - &bucket_with_accessor.sub_aggregation, - )?; - self.increment_bucket_if_in_bounds( - val1, - &bounds, - bucket_pos1, - docs[1], - &bucket_with_accessor.sub_aggregation, - )?; - self.increment_bucket_if_in_bounds( - val2, - &bounds, - bucket_pos2, - docs[2], - &bucket_with_accessor.sub_aggregation, - )?; - self.increment_bucket_if_in_bounds( - val3, - 
&bounds, - bucket_pos3, - docs[3], - &bucket_with_accessor.sub_aggregation, - )?; + if let Some(val) = accessor.get_val(docs[2]) { + let val = self.f64_from_fastfield_u64(val); + let bucket_pos = get_bucket_num(val); + self.increment_bucket_if_in_bounds( + val, + &bounds, + bucket_pos, + docs[2], + &bucket_with_accessor.sub_aggregation, + )?; + } + + if let Some(val) = accessor.get_val(docs[3]) { + let val = self.f64_from_fastfield_u64(val); + let bucket_pos = get_bucket_num(val); + self.increment_bucket_if_in_bounds( + val, + &bounds, + bucket_pos, + docs[3], + &bucket_with_accessor.sub_aggregation, + )?; + } } for &doc in iter.remainder() { - let val = f64_from_fastfield_u64(accessor.get_val(doc), &self.field_type); + let Some(val) = accessor.get_val(doc).map(|val|f64_from_fastfield_u64(val, &self.field_type)) else{ + continue; + }; if !bounds.contains(val) { continue; } diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 333727536..30d0f94f3 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -267,20 +267,29 @@ impl SegmentRangeCollector { let val2 = accessor.get_val(docs[1]); let val3 = accessor.get_val(docs[2]); let val4 = accessor.get_val(docs[3]); - let bucket_pos1 = self.get_bucket_pos(val1); - let bucket_pos2 = self.get_bucket_pos(val2); - let bucket_pos3 = self.get_bucket_pos(val3); - let bucket_pos4 = self.get_bucket_pos(val4); - - self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation)?; - self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation)?; - self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation)?; - self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation)?; + if let Some(val) = val1 { + let bucket_pos = self.get_bucket_pos(val); + self.increment_bucket(bucket_pos, docs[0], &bucket_with_accessor.sub_aggregation)?; + } + if let Some(val) = val2 { + let bucket_pos = 
self.get_bucket_pos(val); + self.increment_bucket(bucket_pos, docs[1], &bucket_with_accessor.sub_aggregation)?; + } + if let Some(val) = val3 { + let bucket_pos = self.get_bucket_pos(val); + self.increment_bucket(bucket_pos, docs[2], &bucket_with_accessor.sub_aggregation)?; + } + if let Some(val) = val4 { + let bucket_pos = self.get_bucket_pos(val); + self.increment_bucket(bucket_pos, docs[3], &bucket_with_accessor.sub_aggregation)?; + } } for &doc in iter.remainder() { let val = accessor.get_val(doc); - let bucket_pos = self.get_bucket_pos(val); - self.increment_bucket(bucket_pos, doc, &bucket_with_accessor.sub_aggregation)?; + if let Some(val) = val { + let bucket_pos = self.get_bucket_pos(val); + self.increment_bucket(bucket_pos, doc, &bucket_with_accessor.sub_aggregation)?; + } } if force_flush { for bucket in &mut self.buckets { diff --git a/src/aggregation/metric/average.rs b/src/aggregation/metric/average.rs index 2f22430b4..1215d60d0 100644 --- a/src/aggregation/metric/average.rs +++ b/src/aggregation/metric/average.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use fastfield_codecs::Column; +use fastfield_codecs::OptionalColumn; use serde::{Deserialize, Serialize}; use crate::aggregation::f64_from_fastfield_u64; @@ -57,26 +57,33 @@ impl SegmentAverageCollector { data: Default::default(), } } - pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column) { + pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn OptionalColumn) { let mut iter = doc.chunks_exact(4); for docs in iter.by_ref() { - let val1 = field.get_val(docs[0]); - let val2 = field.get_val(docs[1]); - let val3 = field.get_val(docs[2]); - let val4 = field.get_val(docs[3]); - let val1 = f64_from_fastfield_u64(val1, &self.field_type); - let val2 = f64_from_fastfield_u64(val2, &self.field_type); - let val3 = f64_from_fastfield_u64(val3, &self.field_type); - let val4 = f64_from_fastfield_u64(val4, &self.field_type); - self.data.collect(val1); - self.data.collect(val2); - 
self.data.collect(val3); - self.data.collect(val4); + if let Some(val) = field.get_val(docs[0]) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.data.collect(val); + } + if let Some(val) = field.get_val(docs[1]) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.data.collect(val); + } + + if let Some(val) = field.get_val(docs[2]) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.data.collect(val); + } + + if let Some(val) = field.get_val(docs[3]) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.data.collect(val); + } } for &doc in iter.remainder() { - let val = field.get_val(doc); - let val = f64_from_fastfield_u64(val, &self.field_type); - self.data.collect(val); + if let Some(val) = field.get_val(doc) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.data.collect(val); + } } } } diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index f84944c26..ad3855852 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -1,4 +1,4 @@ -use fastfield_codecs::Column; +use fastfield_codecs::OptionalColumn; use serde::{Deserialize, Serialize}; use crate::aggregation::f64_from_fastfield_u64; @@ -163,26 +163,31 @@ impl SegmentStatsCollector { stats: IntermediateStats::default(), } } - pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column) { + pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn OptionalColumn) { let mut iter = doc.chunks_exact(4); for docs in iter.by_ref() { - let val1 = field.get_val(docs[0]); - let val2 = field.get_val(docs[1]); - let val3 = field.get_val(docs[2]); - let val4 = field.get_val(docs[3]); - let val1 = f64_from_fastfield_u64(val1, &self.field_type); - let val2 = f64_from_fastfield_u64(val2, &self.field_type); - let val3 = f64_from_fastfield_u64(val3, &self.field_type); - let val4 = f64_from_fastfield_u64(val4, &self.field_type); - self.stats.collect(val1); - 
self.stats.collect(val2); - self.stats.collect(val3); - self.stats.collect(val4); + if let Some(val) = field.get_val(docs[0]) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.stats.collect(val); + } + if let Some(val) = field.get_val(docs[1]) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.stats.collect(val); + } + if let Some(val) = field.get_val(docs[2]) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.stats.collect(val); + } + if let Some(val) = field.get_val(docs[3]) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.stats.collect(val); + } } for &doc in iter.remainder() { - let val = field.get_val(doc); - let val = f64_from_fastfield_u64(val, &self.field_type); - self.stats.collect(val); + if let Some(val) = field.get_val(doc) { + let val = f64_from_fastfield_u64(val, &self.field_type); + self.stats.collect(val); + } } } } diff --git a/src/collector/filter_collector_wrapper.rs b/src/collector/filter_collector_wrapper.rs index 15f52e29c..3d6893a49 100644 --- a/src/collector/filter_collector_wrapper.rs +++ b/src/collector/filter_collector_wrapper.rs @@ -130,7 +130,9 @@ where let fast_field_reader = segment_reader .fast_fields() - .typed_fast_field_reader(self.field)?; + .typed_fast_field_reader(self.field)? + .to_full() + .expect("temp migration solution"); let segment_collector = self .collector diff --git a/src/collector/histogram_collector.rs b/src/collector/histogram_collector.rs index dac0e19d9..ab3df527c 100644 --- a/src/collector/histogram_collector.rs +++ b/src/collector/histogram_collector.rs @@ -112,7 +112,11 @@ impl Collector for HistogramCollector { _segment_local_id: crate::SegmentOrdinal, segment: &crate::SegmentReader, ) -> crate::Result { - let ff_reader = segment.fast_fields().u64_lenient(self.field)?; + let ff_reader = segment + .fast_fields() + .u64_lenient(self.field)? 
+ .to_full() + .expect("temp migration solution"); Ok(SegmentHistogramCollector { histogram_computer: HistogramComputer { counts: vec![0; self.num_buckets], diff --git a/src/collector/top_score_collector.rs b/src/collector/top_score_collector.rs index fdd6fb1fd..57a82c691 100644 --- a/src/collector/top_score_collector.rs +++ b/src/collector/top_score_collector.rs @@ -156,7 +156,9 @@ impl CustomScorer for ScorerByField { // The conversion will then happen only on the top-K docs. let ff_reader = segment_reader .fast_fields() - .typed_fast_field_reader(self.field)?; + .typed_fast_field_reader(self.field)? + .to_full() + .expect("temp migration solution"); Ok(ScorerByFastFieldReader { ff_reader }) } } diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index a29ee238c..581acd12e 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -210,7 +210,7 @@ mod tests { assert_eq!(file.len(), 25); let composite_file = CompositeFile::open(&file)?; let fast_field_bytes = composite_file.open_read(*FIELD).unwrap().read_bytes()?; - let fast_field_reader = open::(fast_field_bytes)?; + let fast_field_reader = open::(fast_field_bytes)?.to_full().unwrap(); assert_eq!(fast_field_reader.get_val(0), 13u64); assert_eq!(fast_field_reader.get_val(1), 14u64); assert_eq!(fast_field_reader.get_val(2), 2u64); @@ -263,7 +263,7 @@ mod tests { .open_read(*FIELD) .unwrap() .read_bytes()?; - let fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); assert_eq!(fast_field_reader.get_val(0), 4u64); assert_eq!(fast_field_reader.get_val(1), 14_082_001u64); assert_eq!(fast_field_reader.get_val(2), 3_052u64); @@ -304,7 +304,7 @@ mod tests { .open_read(*FIELD) .unwrap() .read_bytes()?; - let fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); for doc in 0..10_000 { assert_eq!(fast_field_reader.get_val(doc), 100_000u64); } @@ -343,7 +343,7 @@ mod tests { .open_read(*FIELD) .unwrap() .read_bytes()?; - let 
fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); assert_eq!(fast_field_reader.get_val(0), 0u64); for doc in 1..10_001 { assert_eq!( @@ -386,7 +386,7 @@ mod tests { .open_read(i64_field) .unwrap() .read_bytes()?; - let fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); assert_eq!(fast_field_reader.min_value(), -100i64); assert_eq!(fast_field_reader.max_value(), 9_999i64); @@ -429,7 +429,7 @@ mod tests { .open_read(i64_field) .unwrap() .read_bytes()?; - let fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); assert_eq!(fast_field_reader.get_val(0), 0i64); } Ok(()) @@ -470,7 +470,7 @@ mod tests { .open_read(*FIELD) .unwrap() .read_bytes()?; - let fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); for a in 0..n { assert_eq!(fast_field_reader.get_val(a as u32), permutation[a as usize]); @@ -763,19 +763,28 @@ mod tests { let dates_fast_field = fast_fields.dates(multi_date_field).unwrap(); let mut dates = vec![]; { - assert_eq!(date_fast_field.get_val(0).into_timestamp_micros(), 1i64); + assert_eq!( + date_fast_field.get_val(0).unwrap().into_timestamp_micros(), + 1i64 + ); dates_fast_field.get_vals(0u32, &mut dates); assert_eq!(dates.len(), 2); assert_eq!(dates[0].into_timestamp_micros(), 2i64); assert_eq!(dates[1].into_timestamp_micros(), 3i64); } { - assert_eq!(date_fast_field.get_val(1).into_timestamp_micros(), 4i64); + assert_eq!( + date_fast_field.get_val(1).unwrap().into_timestamp_micros(), + 4i64 + ); dates_fast_field.get_vals(1u32, &mut dates); assert!(dates.is_empty()); } { - assert_eq!(date_fast_field.get_val(2).into_timestamp_micros(), 0i64); + assert_eq!( + date_fast_field.get_val(2).unwrap().into_timestamp_micros(), + 0i64 + ); dates_fast_field.get_vals(2u32, &mut dates); assert_eq!(dates.len(), 2); assert_eq!(dates[0].into_timestamp_micros(), 5i64); @@ -825,7 +834,7 @@ mod tests { 
assert_eq!(file.len(), 24); let composite_file = CompositeFile::open(&file)?; let data = composite_file.open_read(field).unwrap().read_bytes()?; - let fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); assert_eq!(fast_field_reader.get_val(0), true); assert_eq!(fast_field_reader.get_val(1), false); assert_eq!(fast_field_reader.get_val(2), true); @@ -863,7 +872,7 @@ mod tests { assert_eq!(file.len(), 36); let composite_file = CompositeFile::open(&file)?; let data = composite_file.open_read(field).unwrap().read_bytes()?; - let fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); for i in 0..25 { assert_eq!(fast_field_reader.get_val(i * 2), true); assert_eq!(fast_field_reader.get_val(i * 2 + 1), false); @@ -894,7 +903,7 @@ mod tests { let composite_file = CompositeFile::open(&file)?; assert_eq!(file.len(), 23); let data = composite_file.open_read(field).unwrap().read_bytes()?; - let fast_field_reader = open::(data)?; + let fast_field_reader = open::(data)?.to_full().unwrap(); assert_eq!(fast_field_reader.get_val(0), false); Ok(()) @@ -962,7 +971,9 @@ mod tests { let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(*FIELD).unwrap(); let len = file.len(); - let test_fastfield = open::(file.read_bytes()?)?; + let test_fastfield = open::(file.read_bytes()?)? 
+ .to_full() + .expect("temp migration solution"); for (i, time) in times.iter().enumerate() { assert_eq!(test_fastfield.get_val(i as u32), time.truncate(precision)); diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index e4e6ede70..defae6204 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -533,14 +533,17 @@ mod bench { .unwrap() .read_bytes() .unwrap(); - let idx_reader = fastfield_codecs::open(data_idx).unwrap(); + let idx_reader = fastfield_codecs::open(data_idx).unwrap().to_full().unwrap(); let data_vals = fast_fields_composite .open_read_with_idx(field, 1) .unwrap() .read_bytes() .unwrap(); - let vals_reader = fastfield_codecs::open(data_vals).unwrap(); + let vals_reader = fastfield_codecs::open(data_vals) + .unwrap() + .to_full() + .unwrap(); let fast_field_reader = MultiValuedFastFieldReader::open(idx_reader, vals_reader); b.iter(|| { let mut sum = 0u64; diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index 257c8345a..7114fdbe9 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -1,7 +1,7 @@ use std::net::Ipv6Addr; use std::sync::Arc; -use fastfield_codecs::{open, open_u128, Column}; +use fastfield_codecs::{open, open_u128, Column, OptionalColumn}; use super::multivalued::MultiValuedU128FastFieldReader; use crate::directory::{CompositeFile, FileSlice}; @@ -118,7 +118,7 @@ impl FastFieldReaders { &self, field: Field, index: usize, - ) -> crate::Result>> { + ) -> crate::Result>> { let fast_field_slice = self.fast_field_data(field, index)?; let bytes = fast_field_slice.read_bytes()?; let column = fastfield_codecs::open(bytes)?; @@ -128,7 +128,7 @@ impl FastFieldReaders { pub(crate) fn typed_fast_field_reader( &self, field: Field, - ) -> crate::Result>> { + ) -> crate::Result>> { self.typed_fast_field_reader_with_idx(field, 0) } @@ -138,7 +138,14 @@ impl FastFieldReaders { ) -> crate::Result> { let idx_reader = self.typed_fast_field_reader(field)?; let 
vals_reader = self.typed_fast_field_reader_with_idx(field, 1)?; - Ok(MultiValuedFastFieldReader::open(idx_reader, vals_reader)) + Ok(MultiValuedFastFieldReader::open( + idx_reader + .to_full() + .expect("multivalue fast fields are always full"), + vals_reader + .to_full() + .expect("multivalue fast fields are always full"), + )) } /// Returns the `u64` fast field reader reader associated with `field`. /// /// If `field` is not a u64 fast field, this method returns an Error. pub fn u64(&self, field: Field) -> crate::Result>> { self.check_type(field, FastType::U64, Cardinality::SingleValue)?; - self.typed_fast_field_reader(field) + Ok(self + .typed_fast_field_reader(field)? + .to_full() + .expect("temp migration solution")) } /// Returns the `ip` fast field reader reader associated to `field`. @@ -166,7 +176,10 @@ field: Field, ) -> crate::Result> { self.check_type(field, FastType::U128, Cardinality::MultiValues)?; - let idx_reader: Arc> = self.typed_fast_field_reader(field)?; + let idx_reader: Arc> = self + .typed_fast_field_reader(field)? + .to_full() + .expect("multivalue fast fields are always full"); + let bytes = self.fast_field_data(field, 1)?.read_bytes()?; let vals_reader = open_u128::(bytes)?; @@ -191,7 +204,10 @@ /// If `field` is not a u128 multi-valued fast field, this method returns an Error. pub fn u128s(&self, field: Field) -> crate::Result> { self.check_type(field, FastType::U128, Cardinality::MultiValues)?; - let idx_reader: Arc> = self.typed_fast_field_reader(field)?; + let idx_reader: Arc> = self + .typed_fast_field_reader(field)? + .to_full() + .expect("multivalue fast fields are always full"); + let bytes = self.fast_field_data(field, 1)?.read_bytes()?; let vals_reader = open_u128::(bytes)?; @@ -207,14 +223,14 @@ /// /// If not, the fastfield reader will returns the u64-value associated with the original /// FastValue.
- pub fn u64_lenient(&self, field: Field) -> crate::Result>> { - self.typed_fast_field_reader(field) + pub fn u64_lenient(&self, field: Field) -> crate::Result>> { + Ok(self.typed_fast_field_reader(field)?) } /// Returns the `i64` fast field reader reader associated with `field`. /// /// If `field` is not a i64 fast field, this method returns an Error. - pub fn i64(&self, field: Field) -> crate::Result>> { + pub fn i64(&self, field: Field) -> crate::Result>> { self.check_type(field, FastType::I64, Cardinality::SingleValue)?; self.typed_fast_field_reader(field) } @@ -222,7 +238,7 @@ impl FastFieldReaders { /// Returns the `date` fast field reader reader associated with `field`. /// /// If `field` is not a date fast field, this method returns an Error. - pub fn date(&self, field: Field) -> crate::Result>> { + pub fn date(&self, field: Field) -> crate::Result>> { self.check_type(field, FastType::Date, Cardinality::SingleValue)?; self.typed_fast_field_reader(field) } @@ -232,13 +248,16 @@ impl FastFieldReaders { /// If `field` is not a f64 fast field, this method returns an Error. pub fn f64(&self, field: Field) -> crate::Result>> { self.check_type(field, FastType::F64, Cardinality::SingleValue)?; - self.typed_fast_field_reader(field) + Ok(self + .typed_fast_field_reader(field)? + .to_full() + .expect("temp migration solution")) } /// Returns the `bool` fast field reader reader associated with `field`. /// /// If `field` is not a bool fast field, this method returns an Error. 
- pub fn bool(&self, field: Field) -> crate::Result>> { + pub fn bool(&self, field: Field) -> crate::Result>> { self.check_type(field, FastType::Bool, Cardinality::SingleValue)?; self.typed_fast_field_reader(field) } @@ -309,7 +328,12 @@ impl FastFieldReaders { let fast_field_idx_bytes = fast_field_idx_file.read_bytes()?; let idx_reader = open(fast_field_idx_bytes)?; let data = self.fast_field_data(field, 1)?; - BytesFastFieldReader::open(idx_reader, data) + BytesFastFieldReader::open( + idx_reader + .to_full() + .expect("multivalue fields are always full"), + data, + ) } else { Err(FastFieldNotAvailableError::new(field_entry).into()) } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index aa9d4df21..de2340600 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -465,7 +465,11 @@ impl IndexMerger { sort_by_field: &IndexSortByField, ) -> crate::Result> { let field_id = expect_field_id_for_sort_field(reader.schema(), sort_by_field)?; // for now expect fastfield, but not strictly required - let value_accessor = reader.fast_fields().u64_lenient(field_id)?; + let value_accessor = reader + .fast_fields() + .u64_lenient(field_id)? + .to_full() + .expect("temp migration solution"); Ok(value_accessor) } /// Collecting value_accessors into a vec to bind the lifetime. diff --git a/src/indexer/merger_sorted_index_test.rs b/src/indexer/merger_sorted_index_test.rs index ba41e62f0..700047480 100644 --- a/src/indexer/merger_sorted_index_test.rs +++ b/src/indexer/merger_sorted_index_test.rs @@ -535,11 +535,15 @@ mod bench_sorted_index_merge { b.iter(|| { let sorted_doc_ids = doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| { let reader = &merger.readers[doc_addr.segment_ord as usize]; - let u64_reader: Arc> = - reader.fast_fields().typed_fast_field_reader(field).expect( + let u64_reader: Arc> = reader + .fast_fields() + .typed_fast_field_reader(field) + .expect( "Failed to find a reader for single fast field. 
This is a tantivy bug and \ it should never happen.", - ); + ) + .to_full() + .unwrap(); (doc_addr.doc_id, reader, u64_reader) }); // add values in order of the new doc_ids diff --git a/src/indexer/sorted_doc_id_column.rs b/src/indexer/sorted_doc_id_column.rs index 75665bab0..9f68df792 100644 --- a/src/indexer/sorted_doc_id_column.rs +++ b/src/indexer/sorted_doc_id_column.rs @@ -46,11 +46,15 @@ impl<'a> RemappedDocIdColumn<'a> { let (min_value, max_value) = readers .iter() .filter_map(|reader| { - let u64_reader: Arc> = - reader.fast_fields().typed_fast_field_reader(field).expect( + let u64_reader: Arc> = reader + .fast_fields() + .typed_fast_field_reader(field) + .expect( "Failed to find a reader for single fast field. This is a tantivy bug and \ it should never happen.", - ); + ) + .to_full() + .expect("temp migration solution"); compute_min_max_val(&*u64_reader, reader) }) .reduce(|a, b| (a.0.min(b.0), a.1.max(b.1))) @@ -59,11 +63,15 @@ impl<'a> RemappedDocIdColumn<'a> { let fast_field_readers = readers .iter() .map(|reader| { - let u64_reader: Arc> = - reader.fast_fields().typed_fast_field_reader(field).expect( + let u64_reader: Arc> = reader + .fast_fields() + .typed_fast_field_reader(field) + .expect( "Failed to find a reader for single fast field. This is a tantivy bug and \ it should never happen.", - ); + ) + .to_full() + .expect("temp migration solution"); u64_reader }) .collect::>(); diff --git a/src/lib.rs b/src/lib.rs index f57b99d1e..8144a0719 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1044,7 +1044,7 @@ pub mod tests { let fast_field_reader_res = segment_reader.fast_fields().i64(fast_field_signed); assert!(fast_field_reader_res.is_ok()); let fast_field_reader = fast_field_reader_res.unwrap(); - assert_eq!(fast_field_reader.get_val(0), 4i64) + assert_eq!(fast_field_reader.get_val(0), Some(4i64)) } {