mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-02 16:40:43 +00:00
Buffer up to 2048 doc ids in for_each_docset_buffered
The no-score collection path (Weight::for_each_no_score) handed the collector's collect_block one COLLECT_BLOCK_BUFFER_LEN (64) block at a time. For aggregations this is the dominant path, and 64 docs per collect_block under-amortizes the per-call overhead. for_each_docset_buffered now owns a 2048-element heap buffer and fills it through successive fill_buffer calls over 64-element windows, flushing a single larger block to collect_block. fill_buffer keeps its 64-element window contract, so no DocSet implementation changes. The buffer is allocated with Box::new_zeroed_slice (stable since 1.92, hence the MSRV bump) to zero directly on the heap.
This commit is contained in:
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
readme = "README.md"
|
||||
keywords = ["search", "information", "retrieval"]
|
||||
edition = "2021"
|
||||
rust-version = "1.86"
|
||||
rust-version = "1.92"
|
||||
exclude = ["benches/*.json", "benches/*.txt"]
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -301,11 +301,14 @@ pub trait SegmentCollector: 'static {
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
fn collect(&mut self, doc: DocId, score: Score);
|
||||
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
/// The query pushes the matched documents to the collector via this method.
|
||||
/// This method is used when the collector does not require scoring.
|
||||
///
|
||||
/// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the
|
||||
/// buffer size passed to the collector.
|
||||
/// `docs` is a block of matched doc ids. Doc ids are produced in increasing
|
||||
/// order, in windows of [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN),
|
||||
/// but several windows are accumulated before being flushed here, so the
|
||||
/// block may be larger than `COLLECT_BLOCK_BUFFER_LEN`. Implementations must
|
||||
/// not assume any particular maximum length.
|
||||
fn collect_block(&mut self, docs: &[DocId]) {
|
||||
for doc in docs {
|
||||
self.collect(*doc, 0.0);
|
||||
|
||||
@@ -11,9 +11,14 @@ use crate::DocId;
|
||||
/// to compare `[u32; 4]`.
|
||||
pub const TERMINATED: DocId = i32::MAX as u32;
|
||||
|
||||
/// The collect_block method on `SegmentCollector` uses a buffer of this size.
|
||||
/// Passed results to `collect_block` will not exceed this size and will be
|
||||
/// exactly this size as long as we can fill the buffer.
|
||||
/// Window size used by [`DocSet::fill_buffer`]: a single `fill_buffer` call
|
||||
/// writes at most this many doc ids, and exactly this many as long as the
|
||||
/// `DocSet` is not exhausted.
|
||||
///
|
||||
/// Note that this is *not* the maximum length of the slice passed to
|
||||
/// `SegmentCollector::collect_block`: the collection loop accumulates several
|
||||
/// such windows into a larger buffer before flushing it, so `collect_block`
|
||||
/// may receive a block larger than `COLLECT_BLOCK_BUFFER_LEN`.
|
||||
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
|
||||
|
||||
/// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`].
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
|
||||
use crate::index::SegmentReader;
|
||||
use crate::postings::FreqReadingOption;
|
||||
use crate::query::disjunction::Disjunction;
|
||||
@@ -531,13 +530,12 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
|
||||
) -> crate::Result<()> {
|
||||
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
|
||||
let num_docs = reader.num_docs();
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
|
||||
match scorer {
|
||||
SpecializedScorer::TermUnion(term_scorers) => {
|
||||
let mut union_scorer =
|
||||
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
|
||||
for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut union_scorer, callback);
|
||||
}
|
||||
SpecializedScorer::TermIntersection(term_scorers) => {
|
||||
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
|
||||
@@ -545,10 +543,10 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
|
||||
.map(|term_scorer| Box::new(term_scorer) as Box<dyn Scorer>)
|
||||
.collect();
|
||||
let mut intersection = intersect_scorers(boxed_scorers, num_docs);
|
||||
for_each_docset_buffered(intersection.as_mut(), &mut buffer, callback);
|
||||
for_each_docset_buffered(intersection.as_mut(), callback);
|
||||
}
|
||||
SpecializedScorer::Other(mut scorer) => {
|
||||
for_each_docset_buffered(scorer.as_mut(), &mut buffer, callback);
|
||||
for_each_docset_buffered(scorer.as_mut(), callback);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::term_scorer::TermScorer;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::docset::DocSet;
|
||||
use crate::fieldnorm::FieldNormReader;
|
||||
use crate::index::SegmentReader;
|
||||
use crate::postings::SegmentPostings;
|
||||
@@ -92,13 +92,11 @@ impl Weight for TermWeight {
|
||||
) -> crate::Result<()> {
|
||||
match self.specialized_scorer(reader, 1.0)? {
|
||||
TermOrEmptyOrAllScorer::TermScorer(mut term_scorer) => {
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut term_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut term_scorer, callback);
|
||||
}
|
||||
TermOrEmptyOrAllScorer::Empty => {}
|
||||
TermOrEmptyOrAllScorer::AllMatch(mut all_scorer) => {
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut all_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut all_scorer, callback);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -17,18 +17,56 @@ pub(crate) fn for_each_scorer<TScorer: Scorer + ?Sized>(
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates through all of the documents matched by the DocSet
|
||||
/// `DocSet`.
|
||||
/// Number of `COLLECT_BLOCK_BUFFER_LEN`-sized windows accumulated into the large
|
||||
/// buffer before it is flushed to the collector via `collect_block`.
|
||||
const NUM_WINDOWS_PER_BLOCK: usize = 32;
|
||||
/// Size of the buffer accumulated before invoking the callback (2_048 = 32 * 64).
|
||||
/// `fill_buffer` keeps writing `COLLECT_BLOCK_BUFFER_LEN`-sized windows; this only
|
||||
/// changes how much we accumulate before flushing.
|
||||
const LARGE_COLLECT_BUFFER_LEN: usize = COLLECT_BLOCK_BUFFER_LEN * NUM_WINDOWS_PER_BLOCK;
|
||||
|
||||
/// Iterates through all of the documents matched by the `DocSet`, flushing
|
||||
/// blocks of up to `LARGE_COLLECT_BUFFER_LEN` doc ids to `callback`.
|
||||
///
|
||||
/// `fill_buffer` only ever writes `COLLECT_BLOCK_BUFFER_LEN` doc ids at a time,
|
||||
/// so we accumulate several such windows into a single larger buffer before
|
||||
/// handing it to the collector. This amortizes the per-`collect_block` overhead
|
||||
/// (virtual dispatch, aggregation setup) over more documents.
|
||||
#[inline]
|
||||
pub(crate) fn for_each_docset_buffered<T: DocSet + ?Sized>(
|
||||
docset: &mut T,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
mut callback: impl FnMut(&[DocId]),
|
||||
) {
|
||||
// Heap-allocated once per call (i.e. once per segment in the no-score path).
|
||||
// `new_zeroed_slice` zeroes directly on the heap, avoiding a 2_048-element
|
||||
// stack temporary.
|
||||
// SAFETY: an all-zero bit pattern is a valid value for every `DocId` (u32),
|
||||
// so the zeroed slice is fully initialized.
|
||||
let mut buffer: Box<[DocId]> =
|
||||
unsafe { Box::new_zeroed_slice(LARGE_COLLECT_BUFFER_LEN).assume_init() };
|
||||
loop {
|
||||
let num_items = docset.fill_buffer(buffer);
|
||||
callback(&buffer[..num_items]);
|
||||
if num_items != buffer.len() {
|
||||
let mut filled = 0;
|
||||
let mut reached_end = false;
|
||||
// Fill the large buffer one `COLLECT_BLOCK_BUFFER_LEN` window at a time.
|
||||
// `chunks_exact_mut` yields windows of exactly `COLLECT_BLOCK_BUFFER_LEN`
|
||||
// because `LARGE_COLLECT_BUFFER_LEN` is a multiple of it (empty remainder).
|
||||
// The windows are contiguous and filled in order, so the doc ids always
|
||||
// occupy the contiguous prefix `buffer[..filled]`.
|
||||
for window in buffer.chunks_exact_mut(COLLECT_BLOCK_BUFFER_LEN) {
|
||||
// SAFETY: each `window` is a slice of exactly `COLLECT_BLOCK_BUFFER_LEN`
|
||||
// elements, so reinterpreting its start pointer as a fixed-size array
|
||||
// reference of that length is valid.
|
||||
let window: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN] =
|
||||
unsafe { &mut *window.as_mut_ptr().cast::<[DocId; COLLECT_BLOCK_BUFFER_LEN]>() };
|
||||
let num_items = docset.fill_buffer(window);
|
||||
filled += num_items;
|
||||
if num_items != COLLECT_BLOCK_BUFFER_LEN {
|
||||
reached_end = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
callback(&buffer[..filled]);
|
||||
if reached_end {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -104,9 +142,7 @@ pub trait Weight: Send + Sync + 'static {
|
||||
callback: &mut dyn FnMut(&[DocId]),
|
||||
) -> crate::Result<()> {
|
||||
let mut docset = self.scorer(reader, 1.0)?;
|
||||
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut docset, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut docset, callback);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user