mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-01 08:00:41 +00:00
Optimize buffered union scoring with block refills
Add horizon-limited buffering APIs for docsets and scorers so buffered union can refill from block-oriented postings while preserving term frequencies. This lets term scorers score buffered docs directly and reduces per-document refill overhead for dense unions.
This commit is contained in:
@@ -138,6 +138,31 @@ pub trait DocSet: Send {
|
||||
buffer.len()
|
||||
}
|
||||
|
||||
/// Fills a given mutable buffer with the next doc ids smaller than `horizon`.
|
||||
///
|
||||
/// Unlike [`DocSet::fill_buffer`], this method must not advance past a doc id greater than or
|
||||
/// equal to `horizon`.
|
||||
fn fill_buffer_up_to(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
if self.doc() == TERMINATED {
|
||||
return 0;
|
||||
}
|
||||
for (pos, buffer_val) in buffer.iter_mut().enumerate() {
|
||||
let doc = self.doc();
|
||||
if doc >= horizon {
|
||||
return pos;
|
||||
}
|
||||
*buffer_val = doc;
|
||||
if self.advance() == TERMINATED {
|
||||
return pos + 1;
|
||||
}
|
||||
}
|
||||
buffer.len()
|
||||
}
|
||||
|
||||
/// Returns the current document
|
||||
/// Right after creating a new `DocSet`, the docset points to the first document.
|
||||
///
|
||||
@@ -251,6 +276,14 @@ impl DocSet for &mut dyn DocSet {
|
||||
(**self).fill_buffer(buffer)
|
||||
}
|
||||
|
||||
fn fill_buffer_up_to(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
(**self).fill_buffer_up_to(horizon, buffer)
|
||||
}
|
||||
|
||||
fn fill_bitset_block(
|
||||
&mut self,
|
||||
min_doc: DocId,
|
||||
|
||||
@@ -169,10 +169,8 @@ mod macros;
|
||||
mod future_result;
|
||||
|
||||
// Re-exports
|
||||
pub use columnar;
|
||||
pub use common::{ByteCount, DateTime};
|
||||
pub use query_grammar;
|
||||
pub use time;
|
||||
pub use {columnar, query_grammar, time};
|
||||
|
||||
pub use crate::error::TantivyError;
|
||||
pub use crate::future_result::FutureResult;
|
||||
|
||||
@@ -240,6 +240,42 @@ impl BlockSegmentPostings {
|
||||
self.freq_decoder.output_array()
|
||||
}
|
||||
|
||||
pub(crate) fn copy_docs_and_term_freqs(
|
||||
&self,
|
||||
start: usize,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId],
|
||||
term_freqs: &mut [u32],
|
||||
) -> usize {
|
||||
debug_assert_eq!(docs.len(), term_freqs.len());
|
||||
let block_docs = self.docs();
|
||||
let available = block_docs.len().saturating_sub(start);
|
||||
let max_len = available.min(docs.len());
|
||||
if max_len == 0 {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let source_docs = &block_docs[start..start + max_len];
|
||||
let len = if source_docs[max_len - 1] < horizon {
|
||||
max_len
|
||||
} else {
|
||||
source_docs
|
||||
.iter()
|
||||
.position(|&doc| doc >= horizon)
|
||||
.unwrap_or(max_len)
|
||||
};
|
||||
|
||||
docs[..len].copy_from_slice(&source_docs[..len]);
|
||||
|
||||
let block_freqs = self.freq_output_array();
|
||||
if block_freqs.len() >= start + len {
|
||||
term_freqs[..len].copy_from_slice(&block_freqs[start..start + len]);
|
||||
} else {
|
||||
term_freqs[..len].fill(1);
|
||||
}
|
||||
len
|
||||
}
|
||||
|
||||
/// Return the frequency at index `idx` of the block.
|
||||
#[inline]
|
||||
pub fn freq(&self, idx: usize) -> u32 {
|
||||
|
||||
@@ -532,6 +532,16 @@ pub(crate) mod tests {
|
||||
fn score(&mut self) -> Score {
|
||||
self.0.score()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
self.0.can_score_doc()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
self.0.score_doc(doc, term_freq)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn test_skip_against_unoptimized<F: Fn() -> Box<dyn DocSet>>(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use common::HasLen;
|
||||
|
||||
use crate::docset::DocSet;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::positions::PositionReader;
|
||||
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
|
||||
@@ -151,6 +151,34 @@ impl SegmentPostings {
|
||||
position_reader,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
let mut filled = 0;
|
||||
while filled < COLLECT_BLOCK_BUFFER_LEN && self.doc() < horizon {
|
||||
let copied = self.block_cursor.copy_docs_and_term_freqs(
|
||||
self.cur,
|
||||
horizon,
|
||||
&mut docs[filled..],
|
||||
&mut term_freqs[filled..],
|
||||
);
|
||||
if copied == 0 {
|
||||
break;
|
||||
}
|
||||
filled += copied;
|
||||
self.cur += copied;
|
||||
|
||||
if self.cur == COMPRESSION_BLOCK_SIZE {
|
||||
self.cur = 0;
|
||||
self.block_cursor.advance();
|
||||
}
|
||||
}
|
||||
filled
|
||||
}
|
||||
}
|
||||
|
||||
impl DocSet for SegmentPostings {
|
||||
|
||||
@@ -109,6 +109,16 @@ impl Scorer for AllScorer {
|
||||
fn score(&mut self) -> Score {
|
||||
1.0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
1.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -91,10 +91,14 @@ fn into_box_scorer<TScoreCombiner: ScoreCombiner>(
|
||||
num_docs: u32,
|
||||
) -> Box<dyn Scorer> {
|
||||
match scorer {
|
||||
SpecializedScorer::TermUnion(term_scorers) => {
|
||||
let union_scorer =
|
||||
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
|
||||
Box::new(union_scorer)
|
||||
SpecializedScorer::TermUnion(mut term_scorers) => {
|
||||
if term_scorers.len() == 1 {
|
||||
Box::new(term_scorers.pop().unwrap())
|
||||
} else {
|
||||
let union_scorer =
|
||||
BufferedUnionScorer::build(term_scorers, score_combiner_fn, num_docs);
|
||||
Box::new(union_scorer)
|
||||
}
|
||||
}
|
||||
SpecializedScorer::TermIntersection(term_scorers) => {
|
||||
let boxed_scorers: Vec<Box<dyn Scorer>> = term_scorers
|
||||
|
||||
@@ -112,6 +112,14 @@ impl<S: Scorer> DocSet for BoostScorer<S> {
|
||||
self.underlying.fill_buffer(buffer)
|
||||
}
|
||||
|
||||
fn fill_buffer_up_to(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
buffer: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
self.underlying.fill_buffer_up_to(horizon, buffer)
|
||||
}
|
||||
|
||||
fn doc(&self) -> u32 {
|
||||
self.underlying.doc()
|
||||
}
|
||||
@@ -138,6 +146,27 @@ impl<S: Scorer> Scorer for BoostScorer<S> {
|
||||
fn score(&mut self) -> Score {
|
||||
self.underlying.score() * self.boost
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
self.underlying.can_score_doc()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
self.underlying.score_doc(doc, term_freq) * self.boost
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
self.underlying
|
||||
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -141,6 +141,16 @@ impl<TDocSet: DocSet + 'static> Scorer for ConstScorer<TDocSet> {
|
||||
fn score(&mut self) -> Score {
|
||||
self.score
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
self.score
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -315,6 +315,20 @@ mod tests {
|
||||
fn score(&mut self) -> Score {
|
||||
self.foo.get(self.cursor).map(|x| x.1).unwrap_or(0.0)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, _term_freq: u32) -> Score {
|
||||
self.foo
|
||||
.iter()
|
||||
.find(|(candidate_doc, _)| *candidate_doc == doc)
|
||||
.map(|(_, score)| *score)
|
||||
.unwrap_or(0.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -59,6 +59,16 @@ impl Scorer for EmptyScorer {
|
||||
fn score(&mut self) -> Score {
|
||||
0.0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
0.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -111,6 +111,16 @@ where
|
||||
fn score(&mut self) -> Score {
|
||||
self.underlying_docset.score()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
self.underlying_docset.can_score_doc()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
self.underlying_docset.score_doc(doc, term_freq)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,5 +1,40 @@
|
||||
use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::query::Scorer;
|
||||
use crate::Score;
|
||||
use crate::{DocId, Score};
|
||||
|
||||
struct ScoreOnlyScorer {
|
||||
doc: DocId,
|
||||
score: Score,
|
||||
}
|
||||
|
||||
impl DocSet for ScoreOnlyScorer {
|
||||
fn advance(&mut self) -> DocId {
|
||||
self.doc = TERMINATED;
|
||||
TERMINATED
|
||||
}
|
||||
|
||||
fn doc(&self) -> DocId {
|
||||
self.doc
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> u32 {
|
||||
1
|
||||
}
|
||||
}
|
||||
|
||||
impl Scorer for ScoreOnlyScorer {
|
||||
fn score(&mut self) -> Score {
|
||||
self.score
|
||||
}
|
||||
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
self.score
|
||||
}
|
||||
}
|
||||
|
||||
/// The `ScoreCombiner` trait defines how to compute
|
||||
/// an overall score given a list of scores.
|
||||
@@ -10,6 +45,12 @@ pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
|
||||
/// or not.
|
||||
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer);
|
||||
|
||||
/// Aggregates the score combiner with an already computed score.
|
||||
fn update_score(&mut self, doc: DocId, score: Score) {
|
||||
let mut scorer = ScoreOnlyScorer { doc, score };
|
||||
self.update(&mut scorer);
|
||||
}
|
||||
|
||||
/// Clears the score combiner state back to its initial state.
|
||||
fn clear(&mut self);
|
||||
|
||||
@@ -27,6 +68,8 @@ pub struct DoNothingCombiner;
|
||||
impl ScoreCombiner for DoNothingCombiner {
|
||||
fn update<TScorer: Scorer>(&mut self, _scorer: &mut TScorer) {}
|
||||
|
||||
fn update_score(&mut self, _doc: DocId, _score: Score) {}
|
||||
|
||||
fn clear(&mut self) {}
|
||||
|
||||
#[inline]
|
||||
@@ -42,10 +85,16 @@ pub struct SumCombiner {
|
||||
}
|
||||
|
||||
impl ScoreCombiner for SumCombiner {
|
||||
#[inline]
|
||||
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer) {
|
||||
self.score += scorer.score();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn update_score(&mut self, _doc: DocId, score: Score) {
|
||||
self.score += score;
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.score = 0.0;
|
||||
}
|
||||
@@ -77,12 +126,19 @@ impl DisjunctionMaxCombiner {
|
||||
}
|
||||
|
||||
impl ScoreCombiner for DisjunctionMaxCombiner {
|
||||
#[inline]
|
||||
fn update<TScorer: Scorer>(&mut self, scorer: &mut TScorer) {
|
||||
let score = scorer.score();
|
||||
self.max = Score::max(score, self.max);
|
||||
self.sum += score;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn update_score(&mut self, _doc: DocId, score: Score) {
|
||||
self.max = Score::max(score, self.max);
|
||||
self.sum += score;
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.max = 0.0;
|
||||
self.sum = 0.0;
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::ops::DerefMut;
|
||||
|
||||
use downcast_rs::impl_downcast;
|
||||
|
||||
use crate::docset::DocSet;
|
||||
use crate::Score;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::{DocId, Score};
|
||||
|
||||
/// Scored set of documents matching a query within a specific segment.
|
||||
///
|
||||
@@ -13,6 +13,29 @@ pub trait Scorer: downcast_rs::Downcast + DocSet + 'static {
|
||||
///
|
||||
/// This method will perform a bit of computation and is not cached.
|
||||
fn score(&mut self) -> Score;
|
||||
|
||||
/// Returns true if [`Scorer::score_doc`] can score arbitrary buffered docs without
|
||||
/// repositioning the scorer.
|
||||
fn can_score_doc(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns the score for `doc` with its term frequency.
|
||||
fn score_doc(&mut self, _doc: DocId, _term_freq: u32) -> Score {
|
||||
panic!("score_doc is not supported by this scorer. You need check can_score_doc() before calling this method.")
|
||||
}
|
||||
|
||||
/// Fills docs and term frequencies up to `horizon`.
|
||||
fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
let len = DocSet::fill_buffer_up_to(self, horizon, docs);
|
||||
term_freqs[..len].fill(1);
|
||||
len
|
||||
}
|
||||
}
|
||||
|
||||
impl_downcast!(Scorer);
|
||||
@@ -22,4 +45,25 @@ impl Scorer for Box<dyn Scorer> {
|
||||
fn score(&mut self) -> Score {
|
||||
self.deref_mut().score()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
self.as_ref().can_score_doc()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
self.deref_mut().score_doc(doc, term_freq)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
self.deref_mut()
|
||||
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::docset::DocSet;
|
||||
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
|
||||
use crate::fieldnorm::FieldNormReader;
|
||||
use crate::postings::{BlockSegmentPostings, FreqReadingOption, Postings, SegmentPostings};
|
||||
use crate::query::bm25::Bm25Weight;
|
||||
@@ -147,6 +147,27 @@ impl Scorer for TermScorer {
|
||||
let term_freq = self.term_freq();
|
||||
self.similarity_weight.score(fieldnorm_id, term_freq)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn can_score_doc(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn score_doc(&mut self, doc: DocId, term_freq: u32) -> Score {
|
||||
let fieldnorm_id = self.fieldnorm_reader.fieldnorm_id(doc);
|
||||
self.similarity_weight.score(fieldnorm_id, term_freq)
|
||||
}
|
||||
|
||||
fn fill_buffer_up_to_with_term_freqs(
|
||||
&mut self,
|
||||
horizon: DocId,
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
) -> usize {
|
||||
self.postings
|
||||
.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::{DocId, Score};
|
||||
// of upcoming document IDs (the "horizon").
|
||||
const HORIZON_NUM_TINYBITSETS: usize = HORIZON as usize / 64;
|
||||
const HORIZON: u32 = 64u32 * 64u32;
|
||||
const GROUPED_INSERT_MAX_BUCKET_SPAN: u32 = 2;
|
||||
|
||||
/// Creates a `DocSet` that iterate through the union of two or more `DocSet`s.
|
||||
pub struct BufferedUnionScorer<TScorer, TScoreCombiner = DoNothingCombiner> {
|
||||
@@ -36,9 +37,168 @@ pub struct BufferedUnionScorer<TScorer, TScoreCombiner = DoNothingCombiner> {
|
||||
score: Score,
|
||||
/// Number of documents in the segment.
|
||||
num_docs: u32,
|
||||
/// Scratch buffer for block-based refill.
|
||||
refill_docs: [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
/// Scratch buffer for term frequencies matching `refill_docs`.
|
||||
refill_term_freqs: [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
/// Whether all children support scoring buffered docs after advancing.
|
||||
use_score_doc_refill: bool,
|
||||
}
|
||||
|
||||
fn refill_scorer<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
#[inline]
|
||||
fn union_bucket(
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
bucket_pos: u32,
|
||||
tinyset: TinySet,
|
||||
) {
|
||||
debug_assert!((bucket_pos as usize) < HORIZON_NUM_TINYBITSETS);
|
||||
// `bucket` comes from a doc delta below `HORIZON`; there are exactly
|
||||
// `HORIZON / 64` buckets in the refill window.
|
||||
bitsets[bucket_pos as usize] = bitsets[bucket_pos as usize].union(tinyset);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn insert_delta(bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS], delta: DocId) {
|
||||
debug_assert!(delta < HORIZON);
|
||||
// `delta < HORIZON`, so `delta / 64` is in the bitset array. The bit
|
||||
// offset is reduced modulo 64 before being inserted in the TinySet.
|
||||
bitsets[delta as usize / 64].insert_mut(delta % 64u32);
|
||||
}
|
||||
|
||||
fn insert_and_score_full_buffer<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorer: &mut TScorer,
|
||||
docs: &[DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &[u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
min_doc: DocId,
|
||||
) {
|
||||
debug_assert!(docs.windows(2).all(|pair| pair[0] < pair[1]));
|
||||
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] - min_doc < HORIZON);
|
||||
|
||||
let first_delta = docs[0] - min_doc;
|
||||
let last_delta = docs[COLLECT_BLOCK_BUFFER_LEN - 1] - min_doc;
|
||||
let first_bucket = first_delta / 64;
|
||||
let last_bucket = last_delta / 64;
|
||||
|
||||
// Common for very dense scorers: 64 distinct doc ids in one 64-doc bucket
|
||||
// means all bits in that bucket are present.
|
||||
if first_bucket == last_bucket {
|
||||
union_bucket(bitsets, first_bucket, TinySet::full());
|
||||
score_full_buffer(scorer, docs, term_freqs, score_combiner, min_doc);
|
||||
return;
|
||||
}
|
||||
|
||||
// 64 sorted distinct integers spanning exactly 64 values are consecutive.
|
||||
// If they cross a TinySet boundary, this is just the suffix of the first
|
||||
// bucket plus the prefix of the second bucket.
|
||||
if last_delta - first_delta == COLLECT_BLOCK_BUFFER_LEN as u32 - 1 {
|
||||
union_bucket(
|
||||
bitsets,
|
||||
first_bucket,
|
||||
TinySet::range_greater_or_equal(first_delta % 64u32),
|
||||
);
|
||||
union_bucket(
|
||||
bitsets,
|
||||
last_bucket,
|
||||
TinySet::range_lower((last_delta + 1) % 64u32),
|
||||
);
|
||||
score_full_buffer(scorer, docs, term_freqs, score_combiner, min_doc);
|
||||
return;
|
||||
}
|
||||
|
||||
// Grouping wins only for very dense buffers that hit the same TinySet many
|
||||
// times. Once the 64 docs are spread farther, a straight pass is cheaper.
|
||||
if last_bucket - first_bucket <= GROUPED_INSERT_MAX_BUCKET_SPAN {
|
||||
let mut bucket = first_bucket;
|
||||
let mut tinyset = TinySet::empty();
|
||||
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
|
||||
let delta = doc - min_doc;
|
||||
let delta_bucket = delta / 64;
|
||||
if delta_bucket != bucket {
|
||||
union_bucket(bitsets, bucket, tinyset);
|
||||
bucket = delta_bucket;
|
||||
tinyset = TinySet::empty();
|
||||
}
|
||||
tinyset.insert_mut(delta % 64u32);
|
||||
let score = scorer.score_doc(doc, term_freq);
|
||||
update_score_combiner(score_combiner, delta, doc, score);
|
||||
}
|
||||
union_bucket(bitsets, bucket, tinyset);
|
||||
} else {
|
||||
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
|
||||
let delta = doc - min_doc;
|
||||
insert_delta(bitsets, delta);
|
||||
// TODO: score_doc access the field_norm reader for each _term_, instead of once per doc.
|
||||
// We could optimize this by caching the field norm for the doc, and reusing it for all
|
||||
// terms in the doc.
|
||||
let score = scorer.score_doc(doc, term_freq);
|
||||
update_score_combiner(score_combiner, delta, doc, score);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn update_score_combiner<TScoreCombiner: ScoreCombiner>(
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
delta: DocId,
|
||||
doc: DocId,
|
||||
score: Score,
|
||||
) {
|
||||
debug_assert!(delta < HORIZON);
|
||||
// Full and partial refill only buffer docs below `horizon`, so their
|
||||
// deltas are always in the score-combiner window.
|
||||
score_combiner[delta as usize].update_score(doc, score);
|
||||
}
|
||||
|
||||
fn score_full_buffer<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorer: &mut TScorer,
|
||||
docs: &[DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &[u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
min_doc: DocId,
|
||||
) {
|
||||
for (&doc, &term_freq) in docs.iter().zip(term_freqs.iter()) {
|
||||
let score = scorer.score_doc(doc, term_freq);
|
||||
update_score_combiner(score_combiner, doc - min_doc, doc, score);
|
||||
}
|
||||
}
|
||||
|
||||
fn refill_scorer_with_score_docs<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorer: &mut TScorer,
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
min_doc: DocId,
|
||||
horizon: DocId,
|
||||
) {
|
||||
loop {
|
||||
let len = scorer.fill_buffer_up_to_with_term_freqs(horizon, docs, term_freqs);
|
||||
if len == COLLECT_BLOCK_BUFFER_LEN {
|
||||
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] != TERMINATED);
|
||||
debug_assert!(docs[COLLECT_BLOCK_BUFFER_LEN - 1] < horizon);
|
||||
insert_and_score_full_buffer(
|
||||
scorer,
|
||||
docs,
|
||||
term_freqs,
|
||||
bitsets,
|
||||
score_combiner,
|
||||
min_doc,
|
||||
);
|
||||
} else {
|
||||
for (&doc, &term_freq) in docs[..len].iter().zip(term_freqs[..len].iter()) {
|
||||
let delta = doc - min_doc;
|
||||
insert_delta(bitsets, delta);
|
||||
let score = scorer.score_doc(doc, term_freq);
|
||||
update_score_combiner(score_combiner, delta, doc, score);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn refill_scorer_from_current_doc<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorer: &mut TScorer,
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
@@ -50,9 +210,9 @@ fn refill_scorer<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
if doc >= horizon {
|
||||
break;
|
||||
}
|
||||
// add this document
|
||||
let delta = doc - min_doc;
|
||||
bitsets[(delta / 64) as usize].insert_mut(delta % 64u32);
|
||||
insert_delta(bitsets, delta);
|
||||
debug_assert!(delta < HORIZON);
|
||||
score_combiner[delta as usize].update(scorer);
|
||||
scorer.advance();
|
||||
}
|
||||
@@ -62,11 +222,26 @@ fn refill<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
|
||||
scorers: &mut Vec<TScorer>,
|
||||
bitsets: &mut [TinySet; HORIZON_NUM_TINYBITSETS],
|
||||
score_combiner: &mut [TScoreCombiner; HORIZON as usize],
|
||||
docs: &mut [DocId; COLLECT_BLOCK_BUFFER_LEN],
|
||||
term_freqs: &mut [u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
min_doc: DocId,
|
||||
use_score_doc_refill: bool,
|
||||
) {
|
||||
let horizon = min_doc + HORIZON;
|
||||
for scorer in scorers.iter_mut() {
|
||||
refill_scorer(scorer, bitsets, score_combiner, min_doc, horizon);
|
||||
if use_score_doc_refill {
|
||||
refill_scorer_with_score_docs(
|
||||
scorer,
|
||||
bitsets,
|
||||
score_combiner,
|
||||
docs,
|
||||
term_freqs,
|
||||
min_doc,
|
||||
horizon,
|
||||
);
|
||||
} else {
|
||||
refill_scorer_from_current_doc(scorer, bitsets, score_combiner, min_doc, horizon);
|
||||
}
|
||||
}
|
||||
scorers.retain(|scorer| scorer.doc() != TERMINATED);
|
||||
}
|
||||
@@ -78,6 +253,7 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
|
||||
score_combiner_fn: impl FnOnce() -> TScoreCombiner,
|
||||
num_docs: u32,
|
||||
) -> BufferedUnionScorer<TScorer, TScoreCombiner> {
|
||||
let use_score_doc_refill = docsets.iter().all(Scorer::can_score_doc);
|
||||
let non_empty_docsets: Vec<TScorer> = docsets
|
||||
.into_iter()
|
||||
.filter(|docset| docset.doc() != TERMINATED)
|
||||
@@ -91,6 +267,9 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
|
||||
doc: 0,
|
||||
score: 0.0,
|
||||
num_docs,
|
||||
refill_docs: [TERMINATED; COLLECT_BLOCK_BUFFER_LEN],
|
||||
refill_term_freqs: [1u32; COLLECT_BLOCK_BUFFER_LEN],
|
||||
use_score_doc_refill,
|
||||
};
|
||||
if union.refill() {
|
||||
union.advance();
|
||||
@@ -111,7 +290,10 @@ impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> BufferedUnionScorer<TScorer
|
||||
&mut self.docsets,
|
||||
&mut self.bitsets,
|
||||
&mut self.scores,
|
||||
&mut self.refill_docs,
|
||||
&mut self.refill_term_freqs,
|
||||
min_doc,
|
||||
self.use_score_doc_refill,
|
||||
);
|
||||
true
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user