diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 6c7dff815..a3f83c857 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -12,13 +12,15 @@ //! //! //! Fields have to be declared as `FAST` in the schema. -//! Currently supported fields are: u64, i64, f64, bytes and text. +//! Currently supported fields are: u64, i64, f64, bytes, ip and text. //! //! Fast fields are stored in with [different codecs](fastfield_codecs). The best codec is detected //! automatically, when serializing. //! //! Read access performance is comparable to that of an array lookup. +use std::net::Ipv6Addr; + use fastfield_codecs::MonotonicallyMappableToU64; pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveBitSet}; @@ -28,7 +30,7 @@ pub use self::facet_reader::FacetReader; pub(crate) use self::multivalued::{get_fastfield_codecs_for_multivalue, MultivalueStartIndex}; pub use self::multivalued::{ MultiValueIndex, MultiValueU128FastFieldWriter, MultiValuedFastFieldReader, - MultiValuedFastFieldWriter, MultiValuedU128FastFieldReader, + MultiValuedFastFieldWriter, }; pub(crate) use self::readers::type_and_cardinality; pub use self::readers::FastFieldReaders; @@ -47,6 +49,33 @@ mod readers; mod serializer; mod writer; +/// Trait for types that provide a zero value. +/// +/// The resulting value is never used, just as placeholder, e.g. for `vec.resize()`. +pub trait MakeZero { + /// Build a default value. This default value is never used, so the value does not + /// really matter. + fn make_zero() -> Self; +} + +impl MakeZero for T { + fn make_zero() -> Self { + T::from_u64(0) + } +} + +impl MakeZero for u128 { + fn make_zero() -> Self { + 0 + } +} + +impl MakeZero for Ipv6Addr { + fn make_zero() -> Self { + Ipv6Addr::from(0u128.to_be_bytes()) + } +} + /// Trait for types that are allowed for fast fields: /// (u64, i64 and f64, bool, DateTime). pub trait FastValue: @@ -54,12 +83,6 @@ pub trait FastValue: { /// Returns the `schema::Type` for this FastValue. fn to_type() -> Type; - - /// Build a default value. This default value is never used, so the value does not - /// really matter. - fn make_zero() -> Self { - Self::from_u64(0u64) - } } impl FastValue for u64 { @@ -101,12 +124,6 @@ impl FastValue for DateTime { fn to_type() -> Type { Type::Date } - - fn make_zero() -> Self { - DateTime { - timestamp_micros: 0, - } - } } fn value_to_u64(value: &Value) -> crate::Result { @@ -520,11 +537,6 @@ mod tests { Ok(()) } - #[test] - fn test_default_date() { - assert_eq!(0, DateTime::make_zero().into_timestamp_secs()); - } - fn get_vals_for_docs(ff: &MultiValuedFastFieldReader, docs: Range) -> Vec { let mut all = vec![]; diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index e4e6ede70..4e9957815 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -5,7 +5,7 @@ mod writer; use fastfield_codecs::FastFieldCodecType; pub use index::MultiValueIndex; -pub use self::reader::{MultiValuedFastFieldReader, MultiValuedU128FastFieldReader}; +pub use self::reader::MultiValuedFastFieldReader; pub(crate) use self::writer::MultivalueStartIndex; pub use self::writer::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter}; diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs index 5593737ee..57484ca82 100644 --- a/src/fastfield/multivalued/reader.rs +++ b/src/fastfield/multivalued/reader.rs @@ -1,107 +1,30 @@ use std::ops::{Range, RangeInclusive}; use std::sync::Arc; -use fastfield_codecs::{Column, MonotonicallyMappableToU128}; +use fastfield_codecs::Column; use super::MultiValueIndex; -use crate::fastfield::FastValue; +use crate::fastfield::MakeZero; use crate::DocId; -/// Reader for a multivalued `u64` fast field. +/// Reader for a multivalued fast field. /// -/// The reader is implemented as two `u64` fast field. +/// The reader is implemented as two fast fields, one u64 fast field for the index and one for the +/// values. /// -/// The `vals_reader` will access the concatenated list of all -/// values for all reader. -/// The `idx_reader` associated, for each document, the index of its first value. -/// Stores the start position for each document. +/// The `vals_reader` will access the concatenated list of all values. +/// The `idx_reader` associates, for each document, the index of its first value. #[derive(Clone)] -pub struct MultiValuedFastFieldReader { - idx_reader: MultiValueIndex, - vals_reader: Arc>, -} - -impl MultiValuedFastFieldReader { - pub(crate) fn open( - idx_reader: Arc>, - vals_reader: Arc>, - ) -> MultiValuedFastFieldReader { - MultiValuedFastFieldReader { - idx_reader: MultiValueIndex::new(idx_reader), - vals_reader, - } - } - - /// Returns the array of values associated with the given `doc`. - #[inline] - fn get_vals_for_range(&self, range: Range, vals: &mut Vec) { - let len = (range.end - range.start) as usize; - vals.resize(len, Item::make_zero()); - self.vals_reader - .get_range(range.start as u64, &mut vals[..]); - } - - /// Returns the array of values associated with the given `doc`. - #[inline] - pub fn get_vals(&self, doc: DocId, vals: &mut Vec) { - let range = self.idx_reader.range(doc); - self.get_vals_for_range(range, vals); - } - - /// returns the multivalue index - pub fn get_index_reader(&self) -> &MultiValueIndex { - &self.idx_reader - } - - /// Returns the minimum value for this fast field. - /// - /// The min value does not take in account of possible - /// deleted document, and should be considered as a lower bound - /// of the actual minimum value. - pub fn min_value(&self) -> Item { - self.vals_reader.min_value() - } - - /// Returns the maximum value for this fast field. - /// - /// The max value does not take in account of possible - /// deleted document, and should be considered as an upper bound - /// of the actual maximum value. - pub fn max_value(&self) -> Item { - self.vals_reader.max_value() - } - - /// Returns the number of values associated with the document `DocId`. - #[inline] - pub fn num_vals(&self, doc: DocId) -> u32 { - self.idx_reader.num_vals_for_doc(doc) - } - - /// Returns the overall number of values in this field. - #[inline] - pub fn total_num_vals(&self) -> u32 { - self.idx_reader.total_num_vals() - } -} - -/// Reader for a multivalued `u128` fast field. -/// -/// The reader is implemented as a `u64` fast field for the index and a `u128` fast field. -/// -/// The `vals_reader` will access the concatenated list of all -/// values for all reader. -/// The `idx_reader` associated, for each document, the index of its first value. -#[derive(Clone)] -pub struct MultiValuedU128FastFieldReader { +pub struct MultiValuedFastFieldReader { idx_reader: MultiValueIndex, vals_reader: Arc>, } -impl MultiValuedU128FastFieldReader { +impl MultiValuedFastFieldReader { pub(crate) fn open( idx_reader: Arc>, vals_reader: Arc>, - ) -> MultiValuedU128FastFieldReader { + ) -> MultiValuedFastFieldReader { Self { idx_reader: MultiValueIndex::new(idx_reader), vals_reader, @@ -122,7 +45,7 @@ impl MultiValuedU128FastFieldReader { #[inline] fn get_vals_for_range(&self, range: Range, vals: &mut Vec) { let len = (range.end - range.start) as usize; - vals.resize(len, T::from_u128(0)); + vals.resize(len, T::make_zero()); self.vals_reader .get_range(range.start as u64, &mut vals[..]); } diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index 257c8345a..6da352867 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -3,11 +3,9 @@ use std::sync::Arc; use fastfield_codecs::{open, open_u128, Column}; -use super::multivalued::MultiValuedU128FastFieldReader; +use super::multivalued::MultiValuedFastFieldReader; use crate::directory::{CompositeFile, FileSlice}; -use crate::fastfield::{ - BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader, -}; +use crate::fastfield::{BytesFastFieldReader, FastFieldNotAvailableError, FastValue}; use crate::schema::{Cardinality, Field, FieldType, Schema}; use crate::space_usage::PerFieldSpaceUsage; use crate::{DateTime, TantivyError}; @@ -161,20 +159,14 @@ impl FastFieldReaders { /// Returns the `ip` fast field reader reader associated to `field`. /// /// If `field` is not a u128 fast field, this method returns an Error. - pub fn ip_addrs( - &self, - field: Field, - ) -> crate::Result> { + pub fn ip_addrs(&self, field: Field) -> crate::Result> { self.check_type(field, FastType::U128, Cardinality::MultiValues)?; let idx_reader: Arc> = self.typed_fast_field_reader(field)?; let bytes = self.fast_field_data(field, 1)?.read_bytes()?; let vals_reader = open_u128::(bytes)?; - Ok(MultiValuedU128FastFieldReader::open( - idx_reader, - vals_reader, - )) + Ok(MultiValuedFastFieldReader::open(idx_reader, vals_reader)) } /// Returns the `u128` fast field reader reader associated to `field`. @@ -189,17 +181,14 @@ impl FastFieldReaders { /// Returns the `u128` multi-valued fast field reader reader associated to `field`. /// /// If `field` is not a u128 multi-valued fast field, this method returns an Error. - pub fn u128s(&self, field: Field) -> crate::Result> { + pub fn u128s(&self, field: Field) -> crate::Result> { self.check_type(field, FastType::U128, Cardinality::MultiValues)?; let idx_reader: Arc> = self.typed_fast_field_reader(field)?; let bytes = self.fast_field_data(field, 1)?.read_bytes()?; let vals_reader = open_u128::(bytes)?; - Ok(MultiValuedU128FastFieldReader::open( - idx_reader, - vals_reader, - )) + Ok(MultiValuedFastFieldReader::open(idx_reader, vals_reader)) } /// Returns the `u64` fast field reader reader associated with `field`, regardless of whether diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 0f37852db..91ba30198 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -13,7 +13,7 @@ use crate::docset::{DocSet, TERMINATED}; use crate::error::DataCorruption; use crate::fastfield::{ get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer, - MultiValueIndex, MultiValuedFastFieldReader, MultiValuedU128FastFieldReader, + MultiValueIndex, MultiValuedFastFieldReader, }; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping}; @@ -331,18 +331,18 @@ impl IndexMerger { fast_field_serializer: &mut CompositeFastFieldSerializer, doc_id_mapping: &SegmentDocIdMapping, ) -> crate::Result<()> { - let segment_and_ff_readers: Vec<(&SegmentReader, MultiValuedU128FastFieldReader)> = - self.readers - .iter() - .map(|segment_reader| { - let ff_reader: MultiValuedU128FastFieldReader = - segment_reader.fast_fields().u128s(field).expect( - "Failed to find index for multivalued field. This is a bug in \ - tantivy, please report.", - ); - (segment_reader, ff_reader) - }) - .collect::>(); + let segment_and_ff_readers: Vec<(&SegmentReader, MultiValuedFastFieldReader)> = self + .readers + .iter() + .map(|segment_reader| { + let ff_reader: MultiValuedFastFieldReader = + segment_reader.fast_fields().u128s(field).expect( + "Failed to find index for multivalued field. This is a bug in tantivy, \ + please report.", + ); + (segment_reader, ff_reader) + }) + .collect::>(); Self::write_1_n_fast_field_idx_generic( field, diff --git a/src/lib.rs b/src/lib.rs index 79fe68987..7b670e4e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -144,7 +144,7 @@ use crate::time::{OffsetDateTime, PrimitiveDateTime, UtcOffset}; /// All constructors and conversions are provided as explicit /// functions and not by implementing any `From`/`Into` traits /// to prevent unintended usage. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct DateTime { // Timestamp in microseconds. pub(crate) timestamp_micros: i64, diff --git a/src/query/mod.rs b/src/query/mod.rs index 9bf897ed6..ed0672070 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -16,7 +16,6 @@ mod phrase_query; mod query; mod query_parser; mod range_query; -mod range_query_ip_fastfield; mod regex_query; mod reqopt_scorer; mod scorer; diff --git a/src/query/range_query/fast_field_range_query.rs b/src/query/range_query/fast_field_range_query.rs new file mode 100644 index 000000000..b493c6537 --- /dev/null +++ b/src/query/range_query/fast_field_range_query.rs @@ -0,0 +1,209 @@ +use std::ops::RangeInclusive; +use std::sync::Arc; + +use fastfield_codecs::Column; + +use crate::fastfield::{MakeZero, MultiValuedFastFieldReader}; +use crate::{DocId, DocSet, TERMINATED}; + +/// Helper to have a cursor over a vec of docids +struct VecCursor { + docs: Vec, + current_pos: usize, +} +impl VecCursor { + fn new() -> Self { + Self { + docs: Vec::with_capacity(32), + current_pos: 0, + } + } + fn next(&mut self) -> Option { + self.current_pos += 1; + self.current() + } + #[inline] + fn current(&self) -> Option { + self.docs.get(self.current_pos).copied() + } + fn get_cleared_data(&mut self) -> &mut Vec { + self.docs.clear(); + self.current_pos = 0; + &mut self.docs + } + fn last_value(&self) -> Option { + self.docs.iter().last().cloned() + } + fn is_empty(&self) -> bool { + self.current().is_none() + } +} + +pub(crate) enum FastFieldCardinality { + SingleValue(Arc>), + MultiValue(MultiValuedFastFieldReader), +} + +impl FastFieldCardinality { + fn num_docs(&self) -> u32 { + match self { + FastFieldCardinality::SingleValue(single_value) => single_value.num_vals(), + FastFieldCardinality::MultiValue(multi_value) => { + multi_value.get_index_reader().num_docs() + } + } + } +} + +pub(crate) struct RangeDocSet { + /// The range filter on the values. + value_range: RangeInclusive, + fast_field: FastFieldCardinality, + /// The next docid start range to fetch (inclusive). + next_fetch_start: u32, + /// Number of docs range checked in a batch. + /// + /// There are two patterns. + /// - We do a full scan. => We can load large chunks. We don't know in advance if seek call + /// will come, so we start with small chunks + /// - We load docs, interspersed with seek calls. When there are big jumps in the seek, we + /// should load small chunks. When the seeks are small, we can employ the same strategy as on a + /// full scan. + fetch_horizon: u32, + /// Current batch of loaded docs. + loaded_docs: VecCursor, + last_seek_pos_opt: Option, +} + +const DEFAULT_FETCH_HORIZON: u32 = 128; +impl RangeDocSet { + pub(crate) fn new(value_range: RangeInclusive, fast_field: FastFieldCardinality) -> Self { + let mut range_docset = Self { + value_range, + fast_field, + loaded_docs: VecCursor::new(), + next_fetch_start: 0, + fetch_horizon: DEFAULT_FETCH_HORIZON, + last_seek_pos_opt: None, + }; + range_docset.reset_fetch_range(); + range_docset.fetch_block(); + range_docset + } + + fn reset_fetch_range(&mut self) { + self.fetch_horizon = DEFAULT_FETCH_HORIZON; + } + + /// Returns true if more data could be fetched + fn fetch_block(&mut self) { + const MAX_HORIZON: u32 = 100_000; + while self.loaded_docs.is_empty() { + let finished_to_end = self.fetch_horizon(self.fetch_horizon); + if finished_to_end { + break; + } + // Fetch more data, increase horizon. Horizon only gets reset when doing a seek. + self.fetch_horizon = (self.fetch_horizon * 2).min(MAX_HORIZON); + } + } + + /// check if the distance between the seek calls is large + fn is_last_seek_distance_large(&self, new_seek: DocId) -> bool { + if let Some(last_seek_pos) = self.last_seek_pos_opt { + (new_seek - last_seek_pos) >= 128 + } else { + true + } + } + + /// Fetches a block for docid range [next_fetch_start .. next_fetch_start + HORIZON] + fn fetch_horizon(&mut self, horizon: u32) -> bool { + let mut finished_to_end = false; + + let limit = self.fast_field.num_docs(); + let mut end = self.next_fetch_start + horizon; + if end >= limit { + end = limit; + finished_to_end = true; + } + + match &self.fast_field { + FastFieldCardinality::MultiValue(multi) => { + let last_value = self.loaded_docs.last_value(); + + multi.get_docids_for_value_range( + self.value_range.clone(), + self.next_fetch_start..end, + self.loaded_docs.get_cleared_data(), + ); + // In case of multivalues, we may have an overlap of the same docid between fetching + // blocks + if let Some(last_value) = last_value { + while self.loaded_docs.current() == Some(last_value) { + self.loaded_docs.next(); + } + } + } + FastFieldCardinality::SingleValue(single) => { + single.get_docids_for_value_range( + self.value_range.clone(), + self.next_fetch_start..end, + self.loaded_docs.get_cleared_data(), + ); + } + } + self.next_fetch_start = end; + + finished_to_end + } +} + +impl DocSet for RangeDocSet { + #[inline] + fn advance(&mut self) -> DocId { + if let Some(docid) = self.loaded_docs.next() { + docid + } else { + if self.next_fetch_start >= self.fast_field.num_docs() { + return TERMINATED; + } + self.fetch_block(); + self.loaded_docs.current().unwrap_or(TERMINATED) + } + } + + #[inline] + fn doc(&self) -> DocId { + self.loaded_docs.current().unwrap_or(TERMINATED) + } + + /// Advances the `DocSet` forward until reaching the target, or going to the + /// lowest [`DocId`] greater than the target. + /// + /// If the end of the `DocSet` is reached, [`TERMINATED`] is returned. + /// + /// Calling `.seek(target)` on a terminated `DocSet` is legal. Implementation + /// of `DocSet` should support it. + /// + /// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a `DocSet`. + fn seek(&mut self, target: DocId) -> DocId { + if self.is_last_seek_distance_large(target) { + self.reset_fetch_range(); + } + if target > self.next_fetch_start { + self.next_fetch_start = target; + } + let mut doc = self.doc(); + debug_assert!(doc <= target); + while doc < target { + doc = self.advance(); + } + self.last_seek_pos_opt = Some(target); + doc + } + + fn size_hint(&self) -> u32 { + 0 // heuristic possible by checking number of hits when fetching a block + } +} diff --git a/src/query/range_query/mod.rs b/src/query/range_query/mod.rs new file mode 100644 index 000000000..a1f07f498 --- /dev/null +++ b/src/query/range_query/mod.rs @@ -0,0 +1,5 @@ +mod fast_field_range_query; +mod range_query; +mod range_query_ip_fastfield; + +pub use self::range_query::RangeQuery; diff --git a/src/query/range_query.rs b/src/query/range_query/range_query.rs similarity index 99% rename from src/query/range_query.rs rename to src/query/range_query/range_query.rs index 91332b008..6ce2dbd70 100644 --- a/src/query/range_query.rs +++ b/src/query/range_query/range_query.rs @@ -6,7 +6,7 @@ use common::BitSet; use crate::core::SegmentReader; use crate::error::TantivyError; use crate::query::explanation::does_not_match; -use crate::query::range_query_ip_fastfield::IPFastFieldRangeWeight; +use crate::query::range_query::range_query_ip_fastfield::IPFastFieldRangeWeight; use crate::query::{BitSetDocSet, ConstScorer, EnableScoring, Explanation, Query, Scorer, Weight}; use crate::schema::{Field, IndexRecordOption, Term, Type}; use crate::termdict::{TermDictionary, TermStreamer}; diff --git a/src/query/range_query_ip_fastfield.rs b/src/query/range_query/range_query_ip_fastfield.rs similarity index 69% rename from src/query/range_query_ip_fastfield.rs rename to src/query/range_query/range_query_ip_fastfield.rs index a696e3a40..90e22c858 100644 --- a/src/query/range_query_ip_fastfield.rs +++ b/src/query/range_query/range_query_ip_fastfield.rs @@ -4,16 +4,15 @@ use std::net::Ipv6Addr; use std::ops::{Bound, RangeInclusive}; -use std::sync::Arc; use common::BinarySerializable; -use fastfield_codecs::{Column, MonotonicallyMappableToU128}; +use fastfield_codecs::MonotonicallyMappableToU128; +use super::fast_field_range_query::{FastFieldCardinality, RangeDocSet}; use super::range_query::map_bound; -use super::{ConstScorer, Explanation, Scorer, Weight}; -use crate::fastfield::MultiValuedU128FastFieldReader; +use crate::query::{ConstScorer, Explanation, Scorer, Weight}; use crate::schema::{Cardinality, Field}; -use crate::{DocId, DocSet, Score, SegmentReader, TantivyError, TERMINATED}; +use crate::{DocId, DocSet, Score, SegmentReader, TantivyError}; /// `IPFastFieldRangeWeight` uses the ip address fast field to execute range queries. pub struct IPFastFieldRangeWeight { @@ -51,9 +50,9 @@ impl Weight for IPFastFieldRangeWeight { ip_addr_fast_field.min_value(), ip_addr_fast_field.max_value(), ); - let docset = IpRangeDocSet::new( + let docset = RangeDocSet::new( value_range, - IpFastFieldCardinality::SingleValue(ip_addr_fast_field), + FastFieldCardinality::SingleValue(ip_addr_fast_field), ); Ok(Box::new(ConstScorer::new(docset, boost))) } @@ -65,9 +64,9 @@ impl Weight for IPFastFieldRangeWeight { ip_addr_fast_field.min_value(), ip_addr_fast_field.max_value(), ); - let docset = IpRangeDocSet::new( + let docset = RangeDocSet::new( value_range, - IpFastFieldCardinality::MultiValue(ip_addr_fast_field), + FastFieldCardinality::MultiValue(ip_addr_fast_field), ); Ok(Box::new(ConstScorer::new(docset, boost))) } @@ -108,211 +107,6 @@ fn bound_to_value_range( start_value..=end_value } -/// Helper to have a cursor over a vec of docids -struct VecCursor { - docs: Vec, - current_pos: usize, -} -impl VecCursor { - fn new() -> Self { - Self { - docs: Vec::with_capacity(32), - current_pos: 0, - } - } - fn next(&mut self) -> Option { - self.current_pos += 1; - self.current() - } - #[inline] - fn current(&self) -> Option { - self.docs.get(self.current_pos).copied() - } - fn get_cleared_data(&mut self) -> &mut Vec { - self.docs.clear(); - self.current_pos = 0; - &mut self.docs - } - fn last_value(&self) -> Option { - self.docs.iter().last().cloned() - } - fn is_empty(&self) -> bool { - self.current_pos >= self.docs.len() - } -} - -pub(crate) enum IpFastFieldCardinality { - SingleValue(Arc>), - MultiValue(MultiValuedU128FastFieldReader), -} - -impl IpFastFieldCardinality { - fn num_docs(&self) -> u32 { - match self { - IpFastFieldCardinality::SingleValue(single_value) => single_value.num_vals(), - IpFastFieldCardinality::MultiValue(multi_value) => { - multi_value.get_index_reader().num_docs() - } - } - } -} - -struct IpRangeDocSet { - /// The range filter on the values. - value_range: RangeInclusive, - ip_addr_fast_field: IpFastFieldCardinality, - /// The next docid start range to fetch (inclusive). - next_fetch_start: u32, - /// Number of docs range checked in a batch. - /// - /// There are two patterns. - /// - We do a full scan. => We can load large chunks. We don't know in advance if seek call - /// will come, so we start with small chunks - /// - We load docs, interspersed with seek calls. When there are big jumps in the seek, we - /// should load small chunks. When the seeks are small, we can employ the same strategy as on a - /// full scan. - fetch_horizon: u32, - /// Current batch of loaded docs. - loaded_docs: VecCursor, - last_seek_pos_opt: Option, -} - -const DEFAULT_FETCH_HORIZON: u32 = 128; -impl IpRangeDocSet { - fn new( - value_range: RangeInclusive, - ip_addr_fast_field: IpFastFieldCardinality, - ) -> Self { - let mut ip_range_docset = Self { - value_range, - ip_addr_fast_field, - loaded_docs: VecCursor::new(), - next_fetch_start: 0, - fetch_horizon: DEFAULT_FETCH_HORIZON, - last_seek_pos_opt: None, - }; - ip_range_docset.reset_fetch_range(); - ip_range_docset.fetch_block(); - ip_range_docset - } - - fn reset_fetch_range(&mut self) { - self.fetch_horizon = DEFAULT_FETCH_HORIZON; - } - - /// Returns true if more data could be fetched - fn fetch_block(&mut self) { - const MAX_HORIZON: u32 = 100_000; - while self.loaded_docs.is_empty() { - let finished_to_end = self.fetch_horizon(self.fetch_horizon); - if finished_to_end { - break; - } - // Fetch more data, increase horizon. Horizon only gets reset when doing a seek. - self.fetch_horizon = (self.fetch_horizon * 2).min(MAX_HORIZON); - } - } - - /// check if the distance between the seek calls is large - fn is_last_seek_distance_large(&self, new_seek: DocId) -> bool { - if let Some(last_seek_pos) = self.last_seek_pos_opt { - (new_seek - last_seek_pos) >= 128 - } else { - true - } - } - - /// Fetches a block for docid range [next_fetch_start .. next_fetch_start + HORIZON] - fn fetch_horizon(&mut self, horizon: u32) -> bool { - let mut finished_to_end = false; - - let limit = self.ip_addr_fast_field.num_docs(); - let mut end = self.next_fetch_start + horizon; - if end >= limit { - end = limit; - finished_to_end = true; - } - - match &self.ip_addr_fast_field { - IpFastFieldCardinality::MultiValue(multi) => { - let last_value = self.loaded_docs.last_value(); - - multi.get_docids_for_value_range( - self.value_range.clone(), - self.next_fetch_start..end, - self.loaded_docs.get_cleared_data(), - ); - // In case of multivalues, we may have an overlap of the same docid between fetching - // blocks - if let Some(last_value) = last_value { - while self.loaded_docs.current() == Some(last_value) { - self.loaded_docs.next(); - } - } - } - IpFastFieldCardinality::SingleValue(single) => { - single.get_docids_for_value_range( - self.value_range.clone(), - self.next_fetch_start..end, - self.loaded_docs.get_cleared_data(), - ); - } - } - self.next_fetch_start = end; - - finished_to_end - } -} - -impl DocSet for IpRangeDocSet { - #[inline] - fn advance(&mut self) -> DocId { - if let Some(docid) = self.loaded_docs.next() { - docid - } else { - if self.next_fetch_start >= self.ip_addr_fast_field.num_docs() { - return TERMINATED; - } - self.fetch_block(); - self.loaded_docs.current().unwrap_or(TERMINATED) - } - } - - #[inline] - fn doc(&self) -> DocId { - self.loaded_docs.current().unwrap_or(TERMINATED) - } - - /// Advances the `DocSet` forward until reaching the target, or going to the - /// lowest [`DocId`] greater than the target. - /// - /// If the end of the `DocSet` is reached, [`TERMINATED`] is returned. - /// - /// Calling `.seek(target)` on a terminated `DocSet` is legal. Implementation - /// of `DocSet` should support it. - /// - /// Calling `seek(TERMINATED)` is also legal and is the normal way to consume a `DocSet`. - fn seek(&mut self, target: DocId) -> DocId { - if self.is_last_seek_distance_large(target) { - self.reset_fetch_range(); - } - if target > self.next_fetch_start { - self.next_fetch_start = target; - } - let mut doc = self.doc(); - debug_assert!(doc <= target); - while doc < target { - doc = self.advance(); - } - self.last_seek_pos_opt = Some(target); - doc - } - - fn size_hint(&self) -> u32 { - 0 // heuristic possible by checking number of hits when fetching a block - } -} - #[cfg(test)] mod tests { use proptest::prelude::ProptestConfig; @@ -401,7 +195,7 @@ mod tests { let index = Index::create_in_ram(schema); { - let mut index_writer = index.writer(3_000_000).unwrap(); + let mut index_writer = index.writer(10_000_000).unwrap(); for doc in docs.iter() { index_writer .add_document(doc!(