diff --git a/Cargo.toml b/Cargo.toml index fb3e708ae..2cb8eb61e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy" readme = "README.md" keywords = ["search", "information", "retrieval"] edition = "2021" -rust-version = "1.86" +rust-version = "1.92" exclude = ["benches/*.json", "benches/*.txt"] [dependencies] diff --git a/src/collector/mod.rs b/src/collector/mod.rs index 0f8360d8d..e46c49c18 100644 --- a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -301,11 +301,14 @@ pub trait SegmentCollector: 'static { /// The query pushes the scored document to the collector via this method. fn collect(&mut self, doc: DocId, score: Score); - /// The query pushes the scored document to the collector via this method. + /// The query pushes the matched documents to the collector via this method. /// This method is used when the collector does not require scoring. /// - /// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the - /// buffer size passed to the collector. + /// `docs` is a block of matched doc ids. Doc ids are produced in increasing + /// order, in windows of [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN), + /// but several windows are accumulated before being flushed here, so the + /// block may be larger than `COLLECT_BLOCK_BUFFER_LEN`. Implementations must + /// not assume any particular maximum length. fn collect_block(&mut self, docs: &[DocId]) { for doc in docs { self.collect(*doc, 0.0); diff --git a/src/docset.rs b/src/docset.rs index c02bbbfc3..af4dedca9 100644 --- a/src/docset.rs +++ b/src/docset.rs @@ -11,9 +11,14 @@ use crate::DocId; /// to compare `[u32; 4]`. pub const TERMINATED: DocId = i32::MAX as u32; -/// The collect_block method on `SegmentCollector` uses a buffer of this size. -/// Passed results to `collect_block` will not exceed this size and will be -/// exactly this size as long as we can fill the buffer. +/// Window size used by [`DocSet::fill_buffer`]: a single `fill_buffer` call +/// writes at most this many doc ids, and exactly this many as long as the +/// `DocSet` is not exhausted. +/// +/// Note that this is *not* the maximum length of the slice passed to +/// `SegmentCollector::collect_block`: the collection loop accumulates several +/// such windows into a larger buffer before flushing it, so `collect_block` +/// may receive a block larger than `COLLECT_BLOCK_BUFFER_LEN`. pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64; /// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`]. diff --git a/src/query/boolean_query/boolean_weight.rs b/src/query/boolean_query/boolean_weight.rs index f62cffb57..3aafbd115 100644 --- a/src/query/boolean_query/boolean_weight.rs +++ b/src/query/boolean_query/boolean_weight.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use crate::docset::COLLECT_BLOCK_BUFFER_LEN; use crate::index::SegmentReader; use crate::postings::FreqReadingOption; use crate::query::disjunction::Disjunction; @@ -531,13 +530,12 @@ impl Weight for BooleanWeight crate::Result<()> { let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?; let num_docs = reader.num_docs(); - let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN]; match scorer { SpecializedScorer::TermUnion(term_scorers) => { let mut union_scorer = BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs); - for_each_docset_buffered(&mut union_scorer, &mut buffer, callback); + for_each_docset_buffered(&mut union_scorer, callback); } SpecializedScorer::TermIntersection(term_scorers) => { let boxed_scorers: Vec> = term_scorers @@ -545,10 +543,10 @@ impl Weight for BooleanWeight) .collect(); let mut intersection = intersect_scorers(boxed_scorers, num_docs); - for_each_docset_buffered(intersection.as_mut(), &mut buffer, callback); + for_each_docset_buffered(intersection.as_mut(), callback); } SpecializedScorer::Other(mut scorer) => { - for_each_docset_buffered(scorer.as_mut(), &mut buffer, callback); + for_each_docset_buffered(scorer.as_mut(), callback); } } Ok(()) diff --git a/src/query/term_query/term_weight.rs b/src/query/term_query/term_weight.rs index 89b527cca..5bea86943 100644 --- a/src/query/term_query/term_weight.rs +++ b/src/query/term_query/term_weight.rs @@ -1,5 +1,5 @@ use super::term_scorer::TermScorer; -use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN}; +use crate::docset::DocSet; use crate::fieldnorm::FieldNormReader; use crate::index::SegmentReader; use crate::postings::SegmentPostings; @@ -92,13 +92,11 @@ impl Weight for TermWeight { ) -> crate::Result<()> { match self.specialized_scorer(reader, 1.0)? { TermOrEmptyOrAllScorer::TermScorer(mut term_scorer) => { - let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN]; - for_each_docset_buffered(&mut term_scorer, &mut buffer, callback); + for_each_docset_buffered(&mut term_scorer, callback); } TermOrEmptyOrAllScorer::Empty => {} TermOrEmptyOrAllScorer::AllMatch(mut all_scorer) => { - let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN]; - for_each_docset_buffered(&mut all_scorer, &mut buffer, callback); + for_each_docset_buffered(&mut all_scorer, callback); } }; diff --git a/src/query/weight.rs b/src/query/weight.rs index 23ff55c04..fff518133 100644 --- a/src/query/weight.rs +++ b/src/query/weight.rs @@ -17,18 +17,56 @@ pub(crate) fn for_each_scorer( } } -/// Iterates through all of the documents matched by the DocSet -/// `DocSet`. +/// Number of `COLLECT_BLOCK_BUFFER_LEN`-sized windows accumulated into the large +/// buffer before it is flushed to the collector via `collect_block`. +const NUM_WINDOWS_PER_BLOCK: usize = 32; +/// Size of the buffer accumulated before invoking the callback (2_048 = 32 * 64). +/// `fill_buffer` keeps writing `COLLECT_BLOCK_BUFFER_LEN`-sized windows; this only +/// changes how much we accumulate before flushing. +const LARGE_COLLECT_BUFFER_LEN: usize = COLLECT_BLOCK_BUFFER_LEN * NUM_WINDOWS_PER_BLOCK; + +/// Iterates through all of the documents matched by the `DocSet`, flushing +/// blocks of up to `LARGE_COLLECT_BUFFER_LEN` doc ids to `callback`. +/// +/// `fill_buffer` only ever writes `COLLECT_BLOCK_BUFFER_LEN` doc ids at a time, +/// so we accumulate several such windows into a single larger buffer before +/// handing it to the collector. This amortizes the per-`collect_block` overhead +/// (virtual dispatch, aggregation setup) over more documents. #[inline] pub(crate) fn for_each_docset_buffered( docset: &mut T, - buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN], mut callback: impl FnMut(&[DocId]), ) { + // Heap-allocated once per call (i.e. once per segment in the no-score path). + // `new_zeroed_slice` zeroes directly on the heap, avoiding a 2_048-element + // stack temporary. + // SAFETY: an all-zero bit pattern is a valid value for every `DocId` (u32), + // so the zeroed slice is fully initialized. + let mut buffer: Box<[DocId]> = + unsafe { Box::new_zeroed_slice(LARGE_COLLECT_BUFFER_LEN).assume_init() }; loop { - let num_items = docset.fill_buffer(buffer); - callback(&buffer[..num_items]); - if num_items != buffer.len() { + let mut filled = 0; + let mut reached_end = false; + // Fill the large buffer one `COLLECT_BLOCK_BUFFER_LEN` window at a time. + // `chunks_exact_mut` yields windows of exactly `COLLECT_BLOCK_BUFFER_LEN` + // because `LARGE_COLLECT_BUFFER_LEN` is a multiple of it (empty remainder). + // The windows are contiguous and filled in order, so the doc ids always + // occupy the contiguous prefix `buffer[..filled]`. + for window in buffer.chunks_exact_mut(COLLECT_BLOCK_BUFFER_LEN) { + // SAFETY: each `window` is a slice of exactly `COLLECT_BLOCK_BUFFER_LEN` + // elements, so reinterpreting its start pointer as a fixed-size array + // reference of that length is valid. + let window: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN] = + unsafe { &mut *window.as_mut_ptr().cast::<[DocId; COLLECT_BLOCK_BUFFER_LEN]>() }; + let num_items = docset.fill_buffer(window); + filled += num_items; + if num_items != COLLECT_BLOCK_BUFFER_LEN { + reached_end = true; + break; + } + } + callback(&buffer[..filled]); + if reached_end { break; } } @@ -104,9 +142,7 @@ pub trait Weight: Send + Sync + 'static { callback: &mut dyn FnMut(&[DocId]), ) -> crate::Result<()> { let mut docset = self.scorer(reader, 1.0)?; - - let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN]; - for_each_docset_buffered(&mut docset, &mut buffer, callback); + for_each_docset_buffered(&mut docset, callback); Ok(()) }