mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-02 00:20:42 +00:00
Compare commits
2 Commits
paul.masur
...
larger-col
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
468850e9f4 | ||
|
|
a27c64998f |
@@ -11,7 +11,7 @@ repository = "https://github.com/quickwit-oss/tantivy"
|
||||
readme = "README.md"
|
||||
keywords = ["search", "information", "retrieval"]
|
||||
edition = "2021"
|
||||
rust-version = "1.86"
|
||||
rust-version = "1.92"
|
||||
exclude = ["benches/*.json", "benches/*.txt"]
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -121,7 +121,7 @@ pub struct FileSlice {
|
||||
|
||||
impl fmt::Debug for FileSlice {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "FileSlice({:?}, {:?})", &self.data, self.range)
|
||||
write!(f, "FileSlice({:?}, {:?})", self.data, self.range)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -301,11 +301,14 @@ pub trait SegmentCollector: 'static {
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
fn collect(&mut self, doc: DocId, score: Score);
|
||||
|
||||
/// The query pushes the scored document to the collector via this method.
|
||||
/// The query pushes the matched documents to the collector via this method.
|
||||
/// This method is used when the collector does not require scoring.
|
||||
///
|
||||
/// See [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN) for the
|
||||
/// buffer size passed to the collector.
|
||||
/// `docs` is a block of matched doc ids. Doc ids are produced in increasing
|
||||
/// order, in windows of [`COLLECT_BLOCK_BUFFER_LEN`](crate::COLLECT_BLOCK_BUFFER_LEN),
|
||||
/// but several windows are accumulated before being flushed here, so the
|
||||
/// block may be larger than `COLLECT_BLOCK_BUFFER_LEN`. Implementations must
|
||||
/// not assume any particular maximum length.
|
||||
fn collect_block(&mut self, docs: &[DocId]) {
|
||||
for doc in docs {
|
||||
self.collect(*doc, 0.0);
|
||||
|
||||
@@ -52,7 +52,7 @@ impl<T: FastValue> SortKeyComputer for SortByStaticFastValue<T> {
|
||||
if schema_type != T::to_type() {
|
||||
return Err(crate::TantivyError::SchemaError(format!(
|
||||
"Field `{}` is of type {schema_type:?}, not of the type {:?}.",
|
||||
&self.field,
|
||||
self.field,
|
||||
T::to_type()
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -11,9 +11,14 @@ use crate::DocId;
|
||||
/// to compare `[u32; 4]`.
|
||||
pub const TERMINATED: DocId = i32::MAX as u32;
|
||||
|
||||
/// The collect_block method on `SegmentCollector` uses a buffer of this size.
|
||||
/// Passed results to `collect_block` will not exceed this size and will be
|
||||
/// exactly this size as long as we can fill the buffer.
|
||||
/// Window size used by [`DocSet::fill_buffer`]: a single `fill_buffer` call
|
||||
/// writes at most this many doc ids, and exactly this many as long as the
|
||||
/// `DocSet` is not exhausted.
|
||||
///
|
||||
/// Note that this is *not* the maximum length of the slice passed to
|
||||
/// `SegmentCollector::collect_block`: the collection loop accumulates several
|
||||
/// such windows into a larger buffer before flushing it, so `collect_block`
|
||||
/// may receive a block larger than `COLLECT_BLOCK_BUFFER_LEN`.
|
||||
pub const COLLECT_BLOCK_BUFFER_LEN: usize = 64;
|
||||
|
||||
/// Number of `TinySet` (64-bit) buckets in a block used by [`DocSet::fill_bitset_block`].
|
||||
|
||||
@@ -322,7 +322,7 @@ impl SegmentReader {
|
||||
// Without expand dots enabled dots need to be escaped.
|
||||
let escaped_json_path = json_path.replace('.', "\\.");
|
||||
let full_path = format!("{field_name}.{escaped_json_path}");
|
||||
let full_path_unescaped = format!("{}.{}", field_name, &json_path);
|
||||
let full_path_unescaped = format!("{}.{}", field_name, json_path);
|
||||
map_to_canonical.insert(full_path_unescaped, full_path.to_string());
|
||||
full_path
|
||||
} else {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::docset::COLLECT_BLOCK_BUFFER_LEN;
|
||||
use crate::index::SegmentReader;
|
||||
use crate::postings::FreqReadingOption;
|
||||
use crate::query::disjunction::Disjunction;
|
||||
@@ -531,13 +530,12 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
|
||||
) -> crate::Result<()> {
|
||||
let scorer = self.complex_scorer(reader, 1.0, || DoNothingCombiner)?;
|
||||
let num_docs = reader.num_docs();
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
|
||||
match scorer {
|
||||
SpecializedScorer::TermUnion(term_scorers) => {
|
||||
let mut union_scorer =
|
||||
BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn, num_docs);
|
||||
for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut union_scorer, callback);
|
||||
}
|
||||
SpecializedScorer::TermIntersection(term_scorers) => {
|
||||
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
|
||||
@@ -545,10 +543,10 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin
|
||||
.map(|term_scorer| Box::new(term_scorer) as Box<dyn Scorer>)
|
||||
.collect();
|
||||
let mut intersection = intersect_scorers(boxed_scorers, num_docs);
|
||||
for_each_docset_buffered(intersection.as_mut(), &mut buffer, callback);
|
||||
for_each_docset_buffered(intersection.as_mut(), callback);
|
||||
}
|
||||
SpecializedScorer::Other(mut scorer) => {
|
||||
for_each_docset_buffered(scorer.as_mut(), &mut buffer, callback);
|
||||
for_each_docset_buffered(scorer.as_mut(), callback);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use super::term_scorer::TermScorer;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::docset::DocSet;
|
||||
use crate::fieldnorm::FieldNormReader;
|
||||
use crate::index::SegmentReader;
|
||||
use crate::postings::SegmentPostings;
|
||||
@@ -92,13 +92,11 @@ impl Weight for TermWeight {
|
||||
) -> crate::Result<()> {
|
||||
match self.specialized_scorer(reader, 1.0)? {
|
||||
TermOrEmptyOrAllScorer::TermScorer(mut term_scorer) => {
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut term_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut term_scorer, callback);
|
||||
}
|
||||
TermOrEmptyOrAllScorer::Empty => {}
|
||||
TermOrEmptyOrAllScorer::AllMatch(mut all_scorer) => {
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut all_scorer, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut all_scorer, callback);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -17,18 +17,56 @@ pub(crate) fn for_each_scorer<TScorer: Scorer + ?Sized>(
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates through all of the documents matched by the DocSet
|
||||
/// `DocSet`.
|
||||
/// Number of `COLLECT_BLOCK_BUFFER_LEN`-sized windows accumulated into the large
|
||||
/// buffer before it is flushed to the collector via `collect_block`.
|
||||
const NUM_WINDOWS_PER_BLOCK: usize = 32;
|
||||
/// Size of the buffer accumulated before invoking the callback (2_048 = 32 * 64).
|
||||
/// `fill_buffer` keeps writing `COLLECT_BLOCK_BUFFER_LEN`-sized windows; this only
|
||||
/// changes how much we accumulate before flushing.
|
||||
const LARGE_COLLECT_BUFFER_LEN: usize = COLLECT_BLOCK_BUFFER_LEN * NUM_WINDOWS_PER_BLOCK;
|
||||
|
||||
/// Iterates through all of the documents matched by the `DocSet`, flushing
|
||||
/// blocks of up to `LARGE_COLLECT_BUFFER_LEN` doc ids to `callback`.
|
||||
///
|
||||
/// `fill_buffer` only ever writes `COLLECT_BLOCK_BUFFER_LEN` doc ids at a time,
|
||||
/// so we accumulate several such windows into a single larger buffer before
|
||||
/// handing it to the collector. This amortizes the per-`collect_block` overhead
|
||||
/// (virtual dispatch, aggregation setup) over more documents.
|
||||
#[inline]
|
||||
pub(crate) fn for_each_docset_buffered<T: DocSet + ?Sized>(
|
||||
docset: &mut T,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
mut callback: impl FnMut(&[DocId]),
|
||||
) {
|
||||
// Heap-allocated once per call (i.e. once per segment in the no-score path).
|
||||
// `new_zeroed_slice` zeroes directly on the heap, avoiding a 2_048-element
|
||||
// stack temporary.
|
||||
// SAFETY: an all-zero bit pattern is a valid value for every `DocId` (u32),
|
||||
// so the zeroed slice is fully initialized.
|
||||
let mut buffer: Box<[DocId]> =
|
||||
unsafe { Box::new_zeroed_slice(LARGE_COLLECT_BUFFER_LEN).assume_init() };
|
||||
loop {
|
||||
let num_items = docset.fill_buffer(buffer);
|
||||
callback(&buffer[..num_items]);
|
||||
if num_items != buffer.len() {
|
||||
let mut filled = 0;
|
||||
let mut reached_end = false;
|
||||
// Fill the large buffer one `COLLECT_BLOCK_BUFFER_LEN` window at a time.
|
||||
// `chunks_exact_mut` yields windows of exactly `COLLECT_BLOCK_BUFFER_LEN`
|
||||
// because `LARGE_COLLECT_BUFFER_LEN` is a multiple of it (empty remainder).
|
||||
// The windows are contiguous and filled in order, so the doc ids always
|
||||
// occupy the contiguous prefix `buffer[..filled]`.
|
||||
for window in buffer.chunks_exact_mut(COLLECT_BLOCK_BUFFER_LEN) {
|
||||
// SAFETY: each `window` is a slice of exactly `COLLECT_BLOCK_BUFFER_LEN`
|
||||
// elements, so reinterpreting its start pointer as a fixed-size array
|
||||
// reference of that length is valid.
|
||||
let window: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN] =
|
||||
unsafe { &mut *window.as_mut_ptr().cast::<[DocId; COLLECT_BLOCK_BUFFER_LEN]>() };
|
||||
let num_items = docset.fill_buffer(window);
|
||||
filled += num_items;
|
||||
if num_items != COLLECT_BLOCK_BUFFER_LEN {
|
||||
reached_end = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
callback(&buffer[..filled]);
|
||||
if reached_end {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -104,9 +142,7 @@ pub trait Weight: Send + Sync + 'static {
|
||||
callback: &mut dyn FnMut(&[DocId]),
|
||||
) -> crate::Result<()> {
|
||||
let mut docset = self.scorer(reader, 1.0)?;
|
||||
|
||||
let mut buffer = [0u32; COLLECT_BLOCK_BUFFER_LEN];
|
||||
for_each_docset_buffered(&mut docset, &mut buffer, callback);
|
||||
for_each_docset_buffered(&mut docset, callback);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -94,13 +94,7 @@ impl SkipIndex {
|
||||
byte_range: 0..first_layer_len,
|
||||
};
|
||||
for layer in &self.layers {
|
||||
if let Some(checkpoint) =
|
||||
layer.seek_start_at_offset(target, cur_checkpoint.byte_range.start)
|
||||
{
|
||||
cur_checkpoint = checkpoint;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
cur_checkpoint = layer.seek_start_at_offset(target, cur_checkpoint.byte_range.start)?;
|
||||
}
|
||||
Some(cur_checkpoint)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user