Compare commits

..

14 Commits

Author SHA1 Message Date
Paul Masurel
01d670f60c renaming 2026-01-16 15:44:30 +01:00
Paul Masurel
b77338b590 compiling 2026-01-16 15:18:31 +01:00
Paul Masurel
c75fa94d25 blop 2026-01-16 14:09:21 +01:00
Paul Masurel
cf632673ac blop 2026-01-16 13:56:21 +01:00
Paul Masurel
6f00d96127 blop 2026-01-16 13:45:51 +01:00
Paul Masurel
a5ccb62c99 Removing block postings public accessors 2026-01-16 11:38:01 +01:00
Paul Masurel
c42505a043 added method to lift Postings into TermScorer 2026-01-16 11:21:03 +01:00
Paul Masurel
3e57eb9add Generic TermScorer 2026-01-15 18:06:26 +01:00
Paul Masurel
0955b44ce1 blop 2026-01-15 14:54:42 +01:00
Paul Masurel
783a2a6bef Removing read_block_postings 2026-01-15 12:02:10 +01:00
Paul Masurel
1e3c353e21 blop 2026-01-14 11:43:38 +01:00
Paul Masurel
799e88adbd blop 2026-01-13 20:21:22 +01:00
Paul Masurel
1d5fe6bc7c blop 2026-01-13 19:06:34 +01:00
Paul Masurel
d768b2a491 blop 2026-01-13 17:03:12 +01:00
36 changed files with 940 additions and 1480 deletions

View File

@@ -91,46 +91,10 @@ fn main() -> tantivy::Result<()> {
}
}
// A `Term` is a text token associated with a field.
// Let's go through all docs containing the term `title:the` and access their position
let term_the = Term::from_field_text(title, "the");
// Some other powerful operations (especially `.skip_to`) may be useful to consume these
// Some other powerful operations (especially `.seek`) may be useful to consume these
// posting lists rapidly.
// You can check for them in the [`DocSet`](https://docs.rs/tantivy/~0/tantivy/trait.DocSet.html) trait
// and the [`Postings`](https://docs.rs/tantivy/~0/tantivy/trait.Postings.html) trait
// Also, for some VERY specific high performance use case like an OLAP analysis of logs,
// you can get better performance by accessing directly the blocks of doc ids.
for segment_reader in searcher.segment_readers() {
// A segment contains different data structure.
// Inverted index stands for the combination of
// - the term dictionary
// - the inverted lists associated with each terms and their positions
let inverted_index = segment_reader.inverted_index(title)?;
// This segment posting object is like a cursor over the documents matching the term.
// The `IndexRecordOption` arguments tells tantivy we will be interested in both term
// frequencies and positions.
//
// If you don't need all this information, you may get better performance by decompressing
// less information.
if let Some(mut block_segment_postings) =
inverted_index.read_block_postings(&term_the, IndexRecordOption::Basic)?
{
loop {
let docs = block_segment_postings.docs();
if docs.is_empty() {
break;
}
// Once again these docs MAY contains deleted documents as well.
let docs = block_segment_postings.docs();
// Prints `Docs [0, 2].`
println!("Docs {docs:?}");
block_segment_postings.advance();
}
}
}
Ok(())
}

View File

@@ -1,18 +1,26 @@
mod postings;
mod standard;
pub mod postings;
pub mod standard;
use std::borrow::Cow;
use std::io;
use common::OwnedBytes;
use serde::{Deserialize, Serialize};
pub use standard::StandardCodec;
use crate::codec::postings::PostingsCodec;
use crate::postings::Postings;
use crate::schema::IndexRecordOption;
pub trait Codec: Clone + std::fmt::Debug + Send + Sync + 'static {
type PostingsCodec;
type PostingsCodec: PostingsCodec;
const NAME: &'static str;
fn from_json_props(json_value: &serde_json::Value) -> crate::Result<Self>;
fn to_json_props(&self) -> serde_json::Value;
fn postings_codec(&self) -> &Self::PostingsCodec;
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@@ -47,3 +55,36 @@ impl Default for CodecConfiguration {
CodecConfiguration::from_codec(&StandardCodec)
}
}
/// Object-safe view over a codec, meant to be used as `dyn ObjectSafeCodec`.
///
/// It exposes postings loading without any associated type: the postings are returned
/// type-erased, as a `Box<dyn Postings>`.
pub trait ObjectSafeCodec: 'static + Send + Sync {
/// Loads the postings of one term and returns them as a boxed `dyn Postings`.
///
/// * `doc_freq` - number of documents in the posting list.
/// * `postings_data` - bytes holding the posting list.
/// * `record_option` - amount of data available according to the schema.
/// * `requested_option` - amount of data requested by the user.
/// * `positions_data` - bytes holding the positions, if any were provided by the caller.
///
/// Returns an `io::Error` if the underlying postings could not be loaded.
fn load_postings_type_erased(
&self,
doc_freq: u32,
postings_data: OwnedBytes,
record_option: IndexRecordOption,
requested_option: IndexRecordOption,
positions_data: Option<OwnedBytes>,
) -> io::Result<Box<dyn Postings>>;
}
/// Blanket implementation: every `Codec` can be used as an `ObjectSafeCodec`.
impl<TCodec: Codec> ObjectSafeCodec for TCodec {
    /// Delegates to the codec's `PostingsCodec::load_postings` and boxes the concrete
    /// postings it returns into a `Box<dyn Postings>`.
    fn load_postings_type_erased(
        &self,
        doc_freq: u32,
        postings_data: OwnedBytes,
        record_option: IndexRecordOption,
        requested_option: IndexRecordOption,
        positions_data: Option<OwnedBytes>,
    ) -> io::Result<Box<dyn Postings>> {
        let postings_codec = self.postings_codec();
        // The concrete type here is `<TCodec::PostingsCodec as PostingsCodec>::Postings`.
        let typed_postings = postings_codec.load_postings(
            doc_freq,
            postings_data,
            record_option,
            requested_option,
            positions_data,
        )?;
        // Type erasure happens through the unsized coercion to the return type.
        Ok(Box::new(typed_postings))
    }
}

View File

@@ -1,23 +1,40 @@
use std::io;
use common::OwnedBytes;
use crate::fieldnorm::FieldNormReader;
use crate::postings::{FreqReadingOption, Postings};
use crate::query::Bm25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score};
pub trait PostingsCodec {
pub trait PostingsCodec: Send + Sync + 'static {
type PostingsSerializer: PostingsSerializer;
}
type Postings: Postings;
pub trait PostingsSerializer {
fn new(
fn new_serializer(
&self,
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> Self;
) -> Self::PostingsSerializer;
fn load_postings(
&self,
doc_freq: u32,
postings_data: OwnedBytes,
record_option: IndexRecordOption,
requested_option: IndexRecordOption,
positions_data: Option<OwnedBytes>,
) -> io::Result<Self::Postings>;
}
pub trait PostingsSerializer {
fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool);
fn write_doc(&mut self, doc_id: DocId, term_freq: u32);
fn close_term(&mut self, doc_freq: u32, wrt: &mut impl io::Write) -> io::Result<()>;
fn clear(&mut self);
}

View File

@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::codec::standard::postings::StandardPostingsCodec;
use crate::codec::Codec;
mod postings;
pub mod postings;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct StandardCodec;
@@ -26,4 +26,8 @@ impl Codec for StandardCodec {
fn to_json_props(&self) -> serde_json::Value {
serde_json::Value::Null
}
fn postings_codec(&self) -> &Self::PostingsCodec {
&StandardPostingsCodec
}
}

View File

@@ -1,11 +1,11 @@
use std::io;
use common::VInt;
use common::{OwnedBytes, VInt};
use crate::directory::{FileSlice, OwnedBytes};
use crate::codec::standard::postings::skip::{BlockInfo, SkipReader};
use crate::fieldnorm::FieldNormReader;
use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE};
use crate::postings::{BlockInfo, FreqReadingOption, SkipReader};
use crate::postings::compression::{BlockDecoder, VIntDecoder as _, COMPRESSION_BLOCK_SIZE};
use crate::postings::FreqReadingOption;
use crate::query::Bm25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED};
@@ -16,13 +16,8 @@ fn max_score<I: Iterator<Item = Score>>(mut it: I) -> Option<Score> {
/// `BlockSegmentPostings` is a cursor iterating over blocks
/// of documents.
///
/// # Warning
///
/// While it is useful for some very specific high-performance
/// use cases, you should prefer using `SegmentPostings` for most usage.
#[derive(Clone)]
pub struct BlockSegmentPostings {
pub(crate) struct BlockSegmentPostings {
pub(crate) doc_decoder: BlockDecoder,
block_loaded: bool,
freq_decoder: BlockDecoder,
@@ -88,7 +83,7 @@ fn split_into_skips_and_postings(
}
impl BlockSegmentPostings {
/// Opens a `BlockSegmentPostings`.
/// Opens a `StandardPostingsReader`.
/// `doc_freq` is the number of documents in the posting list.
/// `record_option` represents the amount of data available according to the schema.
/// `requested_option` is the amount of data requested by the user.
@@ -96,11 +91,10 @@ impl BlockSegmentPostings {
/// term frequency blocks.
pub(crate) fn open(
doc_freq: u32,
data: FileSlice,
bytes: OwnedBytes,
mut record_option: IndexRecordOption,
requested_option: IndexRecordOption,
) -> io::Result<BlockSegmentPostings> {
let bytes = data.read_bytes()?;
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?;
let skip_reader = match skip_data_opt {
Some(skip_data) => {
@@ -138,76 +132,13 @@ impl BlockSegmentPostings {
block_segment_postings.load_block();
Ok(block_segment_postings)
}
}
/// Returns the block_max_score for the current block.
/// It does not require the block to be loaded. For instance, it is ok to call this method
/// after having called `.shallow_advance(..)`.
///
/// See `TermScorer::block_max_score(..)` for more information.
pub fn block_max_score(
&mut self,
fieldnorm_reader: &FieldNormReader,
bm25_weight: &Bm25Weight,
) -> Score {
if let Some(score) = self.block_max_score_cache {
return score;
}
if let Some(skip_reader_max_score) = self.skip_reader.block_max_score(bm25_weight) {
// if we are on a full block, the skip reader should have the block max information
// for us
self.block_max_score_cache = Some(skip_reader_max_score);
return skip_reader_max_score;
}
// this is the last block of the segment posting list.
// If it is actually loaded, we can compute block max manually.
if self.block_is_loaded() {
let docs = self.doc_decoder.output_array().iter().cloned();
let freqs = self.freq_decoder.output_array().iter().cloned();
let bm25_scores = docs.zip(freqs).map(|(doc, term_freq)| {
let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc);
bm25_weight.score(fieldnorm_id, term_freq)
});
let block_max_score = max_score(bm25_scores).unwrap_or(0.0);
self.block_max_score_cache = Some(block_max_score);
return block_max_score;
}
// We do not have access to any good block max value. We return bm25_weight.max_score()
// as it is a valid upperbound.
//
// We do not cache it however, so that it gets computed when once block is loaded.
bm25_weight.max_score()
}
pub(crate) fn freq_reading_option(&self) -> FreqReadingOption {
impl BlockSegmentPostings {
pub fn freq_reading_option(&self) -> FreqReadingOption {
self.freq_reading_option
}
// Resets the block segment postings on another position
// in the postings file.
//
// This is useful for enumerating through a list of terms,
// and consuming the associated posting lists while avoiding
// reallocating a `BlockSegmentPostings`.
//
// # Warning
//
// This does not reset the positions list.
pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: OwnedBytes) -> io::Result<()> {
let (skip_data_opt, postings_data) =
split_into_skips_and_postings(doc_freq, postings_data)?;
self.data = postings_data;
self.block_max_score_cache = None;
self.block_loaded = false;
if let Some(skip_data) = skip_data_opt {
self.skip_reader.reset(skip_data, doc_freq);
} else {
self.skip_reader.reset(OwnedBytes::empty(), doc_freq);
}
self.doc_freq = doc_freq;
self.load_block();
Ok(())
}
/// Returns the overall number of documents in the block postings.
/// It does not take into account whether documents are deleted or not.
///
@@ -223,41 +154,31 @@ impl BlockSegmentPostings {
/// returned by `.docs()` is empty.
#[inline]
pub fn docs(&self) -> &[DocId] {
debug_assert!(self.block_is_loaded());
debug_assert!(self.block_loaded);
self.doc_decoder.output_array()
}
/// Return the document at index `idx` of the block.
#[inline]
pub fn doc(&self, idx: usize) -> u32 {
debug_assert!(self.block_loaded);
self.doc_decoder.output(idx)
}
/// Return the array of `term freq` in the block.
#[inline]
pub fn freqs(&self) -> &[u32] {
debug_assert!(self.block_is_loaded());
debug_assert!(self.block_loaded);
self.freq_decoder.output_array()
}
/// Return the frequency at index `idx` of the block.
#[inline]
pub fn freq(&self, idx: usize) -> u32 {
debug_assert!(self.block_is_loaded());
debug_assert!(self.block_loaded);
self.freq_decoder.output(idx)
}
/// Returns the length of the current block.
///
/// All blocks have a length of `NUM_DOCS_PER_BLOCK`,
/// except the last block that may have a length
/// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1`
#[inline]
pub fn block_len(&self) -> usize {
debug_assert!(self.block_is_loaded());
self.doc_decoder.output_len
}
/// Position on a block that may contain `target_doc`.
///
/// If all docs are smaller than target, the block loaded may be empty,
@@ -281,10 +202,77 @@ impl BlockSegmentPostings {
doc
}
pub(crate) fn position_offset(&self) -> u64 {
pub fn position_offset(&self) -> u64 {
self.skip_reader.position_offset()
}
/// Advance to the next block.
///
/// Moves the skip reader forward, drops the cached block max score of the previous
/// block, and loads the new block.
pub fn advance(&mut self) {
self.skip_reader.advance();
// `load_block` returns immediately when `block_loaded` is set, so the flag has to be
// cleared before calling it.
self.block_loaded = false;
// The cached score belongs to the block we just left.
self.block_max_score_cache = None;
self.load_block();
}
/// Returns the block_max_score for the current block.
/// It does not require the block to be loaded. For instance, it is ok to call this method
/// after having called `.shallow_advance(..)`.
///
/// The value is looked up in this order:
/// 1. the score cached by a previous call for this block,
/// 2. the block max score provided by the skip reader,
/// 3. if the block is loaded, the maximum BM25 score computed over its docs and term freqs,
/// 4. otherwise `bm25_weight.max_score()`, which is not cached.
///
/// See `TermScorer::block_max_score(..)` for more information.
pub fn block_max_score(
&mut self,
fieldnorm_reader: &FieldNormReader,
bm25_weight: &Bm25Weight,
) -> Score {
// Fast path: the score was already computed for this block.
if let Some(score) = self.block_max_score_cache {
return score;
}
if let Some(skip_reader_max_score) = self.skip_reader.block_max_score(bm25_weight) {
// If we are on a full block, the skip reader should have the block max information
// for us.
self.block_max_score_cache = Some(skip_reader_max_score);
return skip_reader_max_score;
}
// This is the last block of the segment posting list.
// If it is actually loaded, we can compute the block max manually.
if self.block_loaded {
let docs = self.doc_decoder.output_array().iter().cloned();
let freqs = self.freq_decoder.output_array().iter().cloned();
let bm25_scores = docs.zip(freqs).map(|(doc, term_freq)| {
let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc);
bm25_weight.score(fieldnorm_id, term_freq)
});
// `max_score` yields `None` when there is no score at all; fall back to 0.0.
let block_max_score = max_score(bm25_scores).unwrap_or(0.0);
self.block_max_score_cache = Some(block_max_score);
return block_max_score;
}
// We do not have access to any good block max value. We return bm25_weight.max_score()
// as it is a valid upper bound.
//
// We do not cache it however, so that the exact value gets computed once the block is
// loaded.
bm25_weight.max_score()
}
}
impl BlockSegmentPostings {
/// Returns an empty segment postings object
///
/// The object has a `doc_freq` of 0, no postings data, and a skip reader built over
/// empty bytes.
pub fn empty() -> BlockSegmentPostings {
BlockSegmentPostings {
// NOTE(review): presumably fills the decoder output with `TERMINATED` so that reading
// a doc yields the end-of-docset sentinel; confirm against `BlockDecoder::with_val`.
doc_decoder: BlockDecoder::with_val(TERMINATED),
// Marked as loaded so that `load_block` returns immediately.
block_loaded: true,
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option: FreqReadingOption::NoFreq,
block_max_score_cache: None,
doc_freq: 0,
data: OwnedBytes::empty(),
skip_reader: SkipReader::new(OwnedBytes::empty(), 0, IndexRecordOption::Basic),
}
}
/// Returns a reference to the skip reader driving this block cursor.
pub(crate) fn skip_reader(&self) -> &SkipReader {
&self.skip_reader
}
/// Dangerous API! This call seeks the next block on the skip list,
/// but does not `.load_block()` afterwards.
///
@@ -298,15 +286,11 @@ impl BlockSegmentPostings {
}
}
pub(crate) fn block_is_loaded(&self) -> bool {
self.block_loaded
}
pub(crate) fn load_block(&mut self) {
let offset = self.skip_reader.byte_offset();
if self.block_is_loaded() {
if self.block_loaded {
return;
}
let offset = self.skip_reader.byte_offset();
match self.skip_reader.block_info() {
BlockInfo::BitPacked {
doc_num_bits,
@@ -351,32 +335,6 @@ impl BlockSegmentPostings {
}
self.block_loaded = true;
}
/// Advance to the next block.
pub fn advance(&mut self) {
self.skip_reader.advance();
self.block_loaded = false;
self.block_max_score_cache = None;
self.load_block();
}
/// Returns an empty segment postings object
pub fn empty() -> BlockSegmentPostings {
BlockSegmentPostings {
doc_decoder: BlockDecoder::with_val(TERMINATED),
block_loaded: true,
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option: FreqReadingOption::NoFreq,
block_max_score_cache: None,
doc_freq: 0,
data: OwnedBytes::empty(),
skip_reader: SkipReader::new(OwnedBytes::empty(), 0, IndexRecordOption::Basic),
}
}
pub(crate) fn skip_reader(&self) -> &SkipReader {
&self.skip_reader
}
}
#[cfg(test)]
@@ -387,10 +345,8 @@ mod tests {
use crate::docset::{DocSet, TERMINATED};
use crate::index::Index;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::postings::postings::Postings;
use crate::postings::SegmentPostings;
use crate::postings::{Postings as _, SegmentPostings};
use crate::schema::{IndexRecordOption, Schema, Term, INDEXED};
use crate::DocId;
#[test]
fn test_empty_segment_postings() {
@@ -479,31 +435,6 @@ mod tests {
Ok(())
}
fn build_block_postings(docs: &[DocId]) -> crate::Result<BlockSegmentPostings> {
let mut schema_builder = Schema::builder();
let int_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
let mut last_doc = 0u32;
for &doc in docs {
for _ in last_doc..doc {
index_writer.add_document(doc!(int_field=>1u64))?;
}
index_writer.add_document(doc!(int_field=>0u64))?;
last_doc = doc + 1;
}
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field).unwrap();
let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term)?.unwrap();
let block_postings = inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?;
Ok(block_postings)
}
#[test]
fn test_block_segment_postings_seek() -> crate::Result<()> {
let mut docs = vec![0];
@@ -539,7 +470,7 @@ mod tests {
let searcher = index.reader()?.searcher();
let segment_reader = searcher.segment_reader(0);
let mut block_segments;
let block_segments;
{
let term = Term::from_field_u64(int_field, 0u64);
let inverted_index = segment_reader.inverted_index(int_field)?;
@@ -548,13 +479,6 @@ mod tests {
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?;
}
assert_eq!(block_segments.docs(), &[0, 2, 4]);
{
let term = Term::from_field_u64(int_field, 1u64);
let inverted_index = segment_reader.inverted_index(int_field)?;
let term_info = inverted_index.get_term_info(&term)?.unwrap();
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?;
}
assert_eq!(block_segments.docs(), &[1, 3, 5]);
Ok(())
}
}

View File

@@ -1,13 +1,52 @@
use std::io;
use crate::codec::postings::PostingsCodec;
use crate::codec::standard::postings::block_segment_postings::BlockSegmentPostings;
use crate::codec::standard::postings::segment_postings::SegmentPostings;
use crate::fieldnorm::FieldNormReader;
use crate::positions::PositionReader;
use crate::schema::IndexRecordOption;
use crate::Score;
mod block;
mod postings_serializer;
mod block_segment_postings;
mod segment_postings;
mod skip;
mod standard_postings_serializer;
pub use postings_serializer::StandardPostingsSerializer;
pub use standard_postings_serializer::StandardPostingsSerializer;
pub struct StandardPostingsCodec;
impl PostingsCodec for StandardPostingsCodec {
    type PostingsSerializer = StandardPostingsSerializer;
    type Postings = SegmentPostings;

    /// Creates a fresh `StandardPostingsSerializer`.
    ///
    /// All three arguments are forwarded unchanged to `StandardPostingsSerializer::new`.
    fn new_serializer(
        &self,
        avg_fieldnorm: Score,
        mode: IndexRecordOption,
        fieldnorm_reader: Option<FieldNormReader>,
    ) -> Self::PostingsSerializer {
        StandardPostingsSerializer::new(avg_fieldnorm, mode, fieldnorm_reader)
    }

    /// Opens the postings of one term as a `SegmentPostings`.
    ///
    /// * `doc_freq` - number of documents in the posting list.
    /// * `postings_data` - bytes of the posting list.
    /// * `record_option` - amount of data available according to the schema.
    /// * `requested_option` - amount of data requested by the user.
    /// * `positions_data_opt` - bytes of the positions; when present, a `PositionReader`
    ///   is opened over them.
    ///
    /// Returns an `io::Error` if the block postings or the position reader cannot be opened.
    fn load_postings(
        &self,
        doc_freq: u32,
        postings_data: common::OwnedBytes,
        record_option: IndexRecordOption,
        requested_option: IndexRecordOption,
        positions_data_opt: Option<common::OwnedBytes>,
    ) -> io::Result<Self::Postings> {
        // Rationalize record_option/requested_option.
        //
        // Only the *request* is capped by what was recorded. `record_option` describes how
        // the bytes were encoded and must reach `BlockSegmentPostings::open` untouched:
        // overwriting it with the downgraded request would make the reader treat postings
        // recorded with frequencies as if they had been recorded without them.
        let requested_option = requested_option.downgrade(record_option);
        let block_segment_postings =
            BlockSegmentPostings::open(doc_freq, postings_data, record_option, requested_option)?;
        let position_reader = positions_data_opt.map(PositionReader::open).transpose()?;
        Ok(SegmentPostings::from_block_postings(
            block_segment_postings,
            position_reader,
        ))
    }
}

View File

@@ -1,11 +1,13 @@
use common::HasLen;
use common::{BitSet, HasLen};
use super::BlockSegmentPostings;
use crate::docset::DocSet;
use crate::fastfield::AliveBitSet;
use crate::fieldnorm::FieldNormReader;
use crate::positions::PositionReader;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::postings::{BlockSegmentPostings, Postings};
use crate::{DocId, TERMINATED};
use crate::postings::{FreqReadingOption, Postings};
use crate::query::Bm25Weight;
use crate::{DocId, Score, TERMINATED};
/// `SegmentPostings` represents the inverted list or postings associated with
/// a term in a `Segment`.
@@ -29,31 +31,6 @@ impl SegmentPostings {
}
}
/// Compute the number of non-deleted documents.
///
/// This method will clone and scan through the posting lists.
/// (this is a rather expensive operation).
pub fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 {
let mut docset = self.clone();
let mut doc_freq = 0;
loop {
let doc = docset.doc();
if doc == TERMINATED {
return doc_freq;
}
if alive_bitset.is_alive(doc) {
doc_freq += 1u32;
}
docset.advance();
}
}
/// Returns the overall number of documents in the block postings.
/// It does not take in account whether documents are deleted or not.
pub fn doc_freq(&self) -> u32 {
self.block_cursor.doc_freq()
}
/// Creates a segment postings object with the given documents
/// and no frequency encoded.
///
@@ -64,13 +41,19 @@ impl SegmentPostings {
/// buffer with the serialized data.
#[cfg(test)]
pub fn create_from_docs(docs: &[u32]) -> SegmentPostings {
use crate::directory::FileSlice;
use crate::postings::serializer::PostingsSerializer;
use common::OwnedBytes;
use crate::schema::IndexRecordOption;
let mut buffer = Vec::new();
{
use crate::codec::postings::PostingsSerializer;
let mut postings_serializer =
PostingsSerializer::new(0.0, IndexRecordOption::Basic, None);
crate::codec::standard::postings::StandardPostingsSerializer::new(
0.0,
IndexRecordOption::Basic,
None,
);
postings_serializer.new_term(docs.len() as u32, false);
for &doc in docs {
postings_serializer.write_doc(doc, 1u32);
@@ -81,7 +64,7 @@ impl SegmentPostings {
}
let block_segment_postings = BlockSegmentPostings::open(
docs.len() as u32,
FileSlice::from(buffer),
OwnedBytes::new(buffer),
IndexRecordOption::Basic,
IndexRecordOption::Basic,
)
@@ -95,9 +78,11 @@ impl SegmentPostings {
doc_and_tfs: &[(u32, u32)],
fieldnorms: Option<&[u32]>,
) -> SegmentPostings {
use crate::directory::FileSlice;
use common::OwnedBytes;
use crate::codec::postings::PostingsSerializer as _;
use crate::codec::standard::postings::StandardPostingsSerializer;
use crate::fieldnorm::FieldNormReader;
use crate::postings::serializer::PostingsSerializer;
use crate::schema::IndexRecordOption;
use crate::Score;
let mut buffer: Vec<u8> = Vec::new();
@@ -114,7 +99,7 @@ impl SegmentPostings {
total_num_tokens as Score / fieldnorms.len() as Score
})
.unwrap_or(0.0);
let mut postings_serializer = PostingsSerializer::new(
let mut postings_serializer = StandardPostingsSerializer::new(
average_field_norm,
IndexRecordOption::WithFreqs,
fieldnorm_reader,
@@ -128,7 +113,7 @@ impl SegmentPostings {
.unwrap();
let block_segment_postings = BlockSegmentPostings::open(
doc_and_tfs.len() as u32,
FileSlice::from(buffer),
OwnedBytes::new(buffer),
IndexRecordOption::WithFreqs,
IndexRecordOption::WithFreqs,
)
@@ -158,7 +143,6 @@ impl DocSet for SegmentPostings {
// next needs to be called a first time to point to the correct element.
#[inline]
fn advance(&mut self) -> DocId {
debug_assert!(self.block_cursor.block_is_loaded());
if self.cur == COMPRESSION_BLOCK_SIZE - 1 {
self.cur = 0;
self.block_cursor.advance();
@@ -191,6 +175,19 @@ impl DocSet for SegmentPostings {
fn size_hint(&self) -> u32 {
self.len() as u32
}
/// Inserts the docs of the currently loaded block, and of every block after it, into
/// `bitset`. The block cursor is advanced until it exposes an empty block.
///
/// NOTE(review): the whole current block is inserted, regardless of the in-block
/// position `self.cur`; confirm callers only use this on untouched postings.
fn fill_bitset(&mut self, bitset: &mut BitSet) {
    // An empty block marks the end of the posting list.
    while !self.block_cursor.docs().is_empty() {
        for &doc in self.block_cursor.docs() {
            bitset.insert(doc);
        }
        self.block_cursor.advance();
    }
}
}
impl HasLen for SegmentPostings {
@@ -200,6 +197,15 @@ impl HasLen for SegmentPostings {
}
impl Postings for SegmentPostings {
/// Consumes these boxed postings and wraps them, together with the fieldnorm reader and
/// the similarity weight, into a boxed `TermScorer`.
fn new_term_scorer(
    self: Box<Self>,
    fieldnorm_reader: FieldNormReader,
    similarity_weight: Bm25Weight,
) -> Box<dyn crate::query::Scorer> {
    // Move the postings out of the box: `TermScorer::new` takes them by value.
    let segment_postings: Self = *self;
    let term_scorer = crate::query::term_query::TermScorer::new(
        segment_postings,
        fieldnorm_reader,
        similarity_weight,
    );
    Box::new(term_scorer)
}
/// Returns the frequency associated with the current document.
/// If the schema is set up so that no frequency have been encoded,
/// this method should always return 1.
@@ -221,6 +227,12 @@ impl Postings for SegmentPostings {
self.block_cursor.freq(self.cur)
}
/// Returns the overall number of documents in the block postings.
/// It does not take into account whether documents are deleted or not.
fn doc_freq(&self) -> u32 {
self.block_cursor.doc_freq()
}
fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let term_freq = self.term_freq();
let prev_len = output.len();
@@ -244,6 +256,29 @@ impl Postings for SegmentPostings {
}
}
}
/// Always returns `true` for `SegmentPostings`.
fn supports_block_max(&self) -> bool {
true
}
/// Calls `seek_block(target_doc)` on the block cursor, then returns the block max score
/// of the block the cursor ends up on.
///
/// * `target_doc` - doc id handed to the block cursor's `seek_block`.
/// * `fieldnorm_reader` / `similarity_weight` - forwarded to the block cursor's
///   `block_max_score`.
fn seek_block(
    &mut self,
    target_doc: crate::DocId,
    fieldnorm_reader: &FieldNormReader,
    similarity_weight: &Bm25Weight,
) -> Score {
    self.block_cursor.seek_block(target_doc);
    // Both arguments are already references; pass them through without re-borrowing.
    self.block_cursor
        .block_max_score(fieldnorm_reader, similarity_weight)
}
/// Returns the last doc of the current block, as reported by the block cursor's skip reader.
fn last_doc_in_block(&self) -> crate::DocId {
self.block_cursor.skip_reader().last_doc_in_block()
}
/// Returns the `FreqReadingOption` of the underlying block cursor.
fn freq_reading_option(&self) -> FreqReadingOption {
self.block_cursor.freq_reading_option()
}
}
#[cfg(test)]

View File

@@ -30,29 +30,28 @@ pub struct StandardPostingsSerializer {
term_has_freq: bool,
}
impl PostingsSerializer for StandardPostingsSerializer {
fn new(
impl StandardPostingsSerializer {
pub fn new(
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> StandardPostingsSerializer {
Self {
last_doc_id_encoded: 0,
block_encoder: BlockEncoder::new(),
block: Box::new(Block::new()),
postings_write: Vec::new(),
skip_write: SkipSerializer::new(),
last_doc_id_encoded: 0u32,
mode,
fieldnorm_reader,
bm25_weight: None,
avg_fieldnorm,
term_has_freq: false,
}
}
}
impl PostingsSerializer for StandardPostingsSerializer {
fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) {
self.bm25_weight = None;
@@ -124,6 +123,11 @@ impl PostingsSerializer for StandardPostingsSerializer {
self.bm25_weight = None;
Ok(())
}
/// Clears the pending block and resets the last encoded doc id to 0.
fn clear(&mut self) {
self.block.clear();
self.last_doc_id_encoded = 0;
}
}
impl StandardPostingsSerializer {
@@ -179,9 +183,4 @@ impl StandardPostingsSerializer {
}
self.block.clear();
}
fn clear(&mut self) {
self.block.clear();
self.last_doc_id_encoded = 0;
}
}

View File

@@ -4,7 +4,7 @@ use common::{replace_in_place, JsonPathWriter};
use rustc_hash::FxHashMap;
use crate::indexer::indexing_term::IndexingTerm;
use crate::postings::{IndexingContext, IndexingPosition, PostingsWriter};
use crate::postings::{IndexingContext, IndexingPosition, PostingsWriter as _, PostingsWriterEnum};
use crate::schema::document::{ReferenceValue, ReferenceValueLeaf, Value};
use crate::schema::{Type, DATE_TIME_PRECISION_INDEXED};
use crate::time::format_description::well_known::Rfc3339;
@@ -80,7 +80,7 @@ fn index_json_object<'a, V: Value<'a>>(
text_analyzer: &mut TextAnalyzer,
term_buffer: &mut IndexingTerm,
json_path_writer: &mut JsonPathWriter,
postings_writer: &mut dyn PostingsWriter,
postings_writer: &mut PostingsWriterEnum,
ctx: &mut IndexingContext,
positions_per_path: &mut IndexingPositionsPerPath,
) {
@@ -110,7 +110,7 @@ pub(crate) fn index_json_value<'a, V: Value<'a>>(
text_analyzer: &mut TextAnalyzer,
term_buffer: &mut IndexingTerm,
json_path_writer: &mut JsonPathWriter,
postings_writer: &mut dyn PostingsWriter,
postings_writer: &mut PostingsWriterEnum,
ctx: &mut IndexingContext,
positions_per_path: &mut IndexingPositionsPerPath,
) {

View File

@@ -1,5 +1,7 @@
use std::borrow::{Borrow, BorrowMut};
use common::BitSet;
use crate::fastfield::AliveBitSet;
use crate::DocId;
@@ -106,6 +108,15 @@ pub trait DocSet: Send {
buffer.len()
}
/// Inserts the current document and every following document of this docset into
/// `bitset`, advancing the docset until it returns `TERMINATED`.
///
/// TODO(review): document the size required of the bitset; presumably it must be able
/// to hold every doc id this docset can return. Confirm against `BitSet::insert`.
fn fill_bitset(&mut self, bitset: &mut BitSet) {
    let mut current = self.doc();
    loop {
        if current == TERMINATED {
            break;
        }
        bitset.insert(current);
        current = self.advance();
    }
}
/// Returns the current document
/// Right after creating a new `DocSet`, the docset points to the first document.
///

View File

@@ -1,4 +1,5 @@
use std::io;
use std::sync::Arc;
use common::json_path_writer::JSON_END_OF_PATH;
use common::{BinarySerializable, ByteCount};
@@ -9,9 +10,9 @@ use itertools::Itertools;
#[cfg(feature = "quickwit")]
use tantivy_fst::automaton::{AlwaysMatch, Automaton};
use crate::codec::{ObjectSafeCodec, StandardCodec};
use crate::directory::FileSlice;
use crate::positions::PositionReader;
use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo};
use crate::postings::{Postings, TermInfo};
use crate::schema::{IndexRecordOption, Term, Type};
use crate::termdict::TermDictionary;
@@ -33,6 +34,7 @@ pub struct InvertedIndexReader {
positions_file_slice: FileSlice,
record_option: IndexRecordOption,
total_num_tokens: u64,
codec: Arc<dyn ObjectSafeCodec>,
}
/// Object that records the amount of space used by a field in an inverted index.
@@ -68,6 +70,7 @@ impl InvertedIndexReader {
postings_file_slice: FileSlice,
positions_file_slice: FileSlice,
record_option: IndexRecordOption,
codec: Arc<dyn ObjectSafeCodec>,
) -> io::Result<InvertedIndexReader> {
let (total_num_tokens_slice, postings_body) = postings_file_slice.split(8);
let total_num_tokens = u64::deserialize(&mut total_num_tokens_slice.read_bytes()?)?;
@@ -77,6 +80,7 @@ impl InvertedIndexReader {
positions_file_slice,
record_option,
total_num_tokens,
codec,
})
}
@@ -89,6 +93,7 @@ impl InvertedIndexReader {
positions_file_slice: FileSlice::empty(),
record_option,
total_num_tokens: 0u64,
codec: Arc::new(StandardCodec),
}
}
@@ -160,62 +165,26 @@ impl InvertedIndexReader {
Ok(fields)
}
/// Resets the block segment to another position of the postings
/// file.
///
/// This is useful for enumerating through a list of terms,
/// and consuming the associated posting lists while avoiding
/// reallocating a [`BlockSegmentPostings`].
///
/// # Warning
///
/// This does not reset the positions list.
pub fn reset_block_postings_from_terminfo(
&self,
term_info: &TermInfo,
block_postings: &mut BlockSegmentPostings,
) -> io::Result<()> {
let postings_slice = self
.postings_file_slice
.slice(term_info.postings_range.clone());
let postings_bytes = postings_slice.read_bytes()?;
block_postings.reset(term_info.doc_freq, postings_bytes)?;
Ok(())
}
/// Returns a block postings given a `Term`.
/// This method is for an advanced usage only.
///
/// Most users should prefer using [`Self::read_postings()`] instead.
pub fn read_block_postings(
&self,
term: &Term,
option: IndexRecordOption,
) -> io::Result<Option<BlockSegmentPostings>> {
self.get_term_info(term)?
.map(move |term_info| self.read_block_postings_from_terminfo(&term_info, option))
.transpose()
}
/// Returns a block postings given a `term_info`.
/// This method is for an advanced usage only.
///
/// Most users should prefer using [`Self::read_postings()`] instead.
pub fn read_block_postings_from_terminfo(
&self,
term_info: &TermInfo,
requested_option: IndexRecordOption,
) -> io::Result<BlockSegmentPostings> {
let postings_data = self
.postings_file_slice
.slice(term_info.postings_range.clone());
BlockSegmentPostings::open(
term_info.doc_freq,
postings_data,
self.record_option,
requested_option,
)
}
// /// Returns a block postings given a `term_info`.
// /// This method is for an advanced usage only.
// ///
// /// Most users should prefer using [`Self::read_postings()`] instead.
// pub fn read_block_postings_from_terminfo(
// &self,
// term_info: &TermInfo,
// requested_option: IndexRecordOption,
// ) -> io::Result<BlockSegmentPostings> {
// let postings_data = self
// .postings_file_slice
// .slice(term_info.postings_range.clone())
// .read_bytes()?;
// BlockSegmentPostings::open(
// term_info.doc_freq,
// postings_data,
// self.record_option,
// requested_option,
// )
// }
/// Returns a posting object given a `term_info`.
/// This method is for an advanced usage only.
@@ -225,25 +194,32 @@ impl InvertedIndexReader {
&self,
term_info: &TermInfo,
option: IndexRecordOption,
) -> io::Result<SegmentPostings> {
) -> io::Result<Box<dyn Postings>> {
let option = option.downgrade(self.record_option);
let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
let position_reader = {
let postings_data = self
.postings_file_slice
.slice(term_info.postings_range.clone())
.read_bytes()?;
let positions_data = {
if option.has_positions() {
let positions_data = self
.positions_file_slice
.read_bytes_slice(term_info.positions_range.clone())?;
let position_reader = PositionReader::open(positions_data)?;
Some(position_reader)
.slice(term_info.positions_range.clone())
.read_bytes()?;
Some(positions_data)
} else {
None
}
};
Ok(SegmentPostings::from_block_postings(
block_postings,
position_reader,
))
let postings = self.codec.load_postings_type_erased(
term_info.doc_freq,
postings_data,
self.record_option,
option,
positions_data,
)?;
Ok(postings)
}
/// Returns the total number of tokens recorded for all documents
@@ -266,7 +242,7 @@ impl InvertedIndexReader {
&self,
term: &Term,
option: IndexRecordOption,
) -> io::Result<Option<SegmentPostings>> {
) -> io::Result<Option<Box<dyn Postings>>> {
self.get_term_info(term)?
.map(move |term_info| self.read_postings_from_terminfo(&term_info, option))
.transpose()

View File

@@ -6,6 +6,7 @@ use common::{ByteCount, HasLen};
use fnv::FnvHashMap;
use itertools::Itertools;
use crate::codec::ObjectSafeCodec;
use crate::directory::{CompositeFile, FileSlice};
use crate::error::DataCorruption;
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
@@ -47,6 +48,8 @@ pub struct SegmentReader {
store_file: FileSlice,
alive_bitset_opt: Option<AliveBitSet>,
schema: Schema,
codec: Arc<dyn ObjectSafeCodec>,
}
impl SegmentReader {
@@ -149,6 +152,7 @@ impl SegmentReader {
segment: &Segment<C>,
custom_bitset: Option<AliveBitSet>,
) -> crate::Result<SegmentReader> {
let codec: Arc<dyn ObjectSafeCodec> = Arc::new(segment.index().codec().clone());
let termdict_file = segment.open_read(SegmentComponent::Terms)?;
let termdict_composite = CompositeFile::open(&termdict_file)?;
@@ -204,6 +208,7 @@ impl SegmentReader {
alive_bitset_opt,
positions_composite,
schema,
codec,
})
}
@@ -273,6 +278,7 @@ impl SegmentReader {
postings_file,
positions_file,
record_option,
self.codec.clone(),
)?);
// by releasing the lock in between, we may end up opening the inverted index

View File

@@ -7,6 +7,7 @@ use common::ReadOnlyBitSet;
use itertools::Itertools;
use measure_time::debug_time;
use crate::codec::{Codec, StandardCodec};
use crate::directory::WritePtr;
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
@@ -15,7 +16,7 @@ use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer,
use crate::index::{Segment, SegmentComponent, SegmentReader};
use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
use crate::indexer::SegmentSerializer;
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::postings::{FreqReadingOption, InvertedIndexSerializer, Postings};
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
use crate::store::StoreWriter;
use crate::termdict::{TermMerger, TermOrdinal};
@@ -76,10 +77,11 @@ fn estimate_total_num_tokens(readers: &[SegmentReader], field: Field) -> crate::
Ok(total_num_tokens)
}
pub struct IndexMerger {
pub struct IndexMerger<C: Codec = StandardCodec> {
schema: Schema,
pub(crate) readers: Vec<SegmentReader>,
max_doc: u32,
codec: C,
}
struct DeltaComputer {
@@ -144,11 +146,8 @@ fn extract_fast_field_required_columns(schema: &Schema) -> Vec<(String, ColumnTy
.collect()
}
impl IndexMerger {
pub fn open<C: crate::codec::Codec>(
schema: Schema,
segments: &[Segment<C>],
) -> crate::Result<IndexMerger> {
impl<C: Codec> IndexMerger<C> {
pub fn open(schema: Schema, segments: &[Segment<C>]) -> crate::Result<IndexMerger<C>> {
let alive_bitset = segments.iter().map(|_| None).collect_vec();
Self::open_with_custom_alive_set(schema, segments, alive_bitset)
}
@@ -165,11 +164,15 @@ impl IndexMerger {
// This can be used to merge but also apply an additional filter.
// One use case is demux, which is basically taking a list of
// segments and partitions them e.g. by a value in a field.
pub fn open_with_custom_alive_set<C: crate::codec::Codec>(
//
// # Panics if segments is empty.
pub fn open_with_custom_alive_set(
schema: Schema,
segments: &[Segment<C>],
alive_bitset_opt: Vec<Option<AliveBitSet>>,
) -> crate::Result<IndexMerger> {
) -> crate::Result<IndexMerger<C>> {
assert!(!segments.is_empty());
let codec = segments[0].index().codec().clone();
let mut readers = vec![];
for (segment, new_alive_bitset_opt) in segments.iter().zip(alive_bitset_opt) {
if segment.meta().num_docs() > 0 {
@@ -192,6 +195,7 @@ impl IndexMerger {
schema,
readers,
max_doc,
codec,
})
}
@@ -290,7 +294,7 @@ impl IndexMerger {
&self,
indexed_field: Field,
_field_type: &FieldType,
serializer: &mut InvertedIndexSerializer,
serializer: &mut InvertedIndexSerializer<C>,
fieldnorm_reader: Option<FieldNormReader>,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
@@ -358,7 +362,7 @@ impl IndexMerger {
indexed. Have you modified the schema?",
);
let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![];
let mut segment_postings_containing_the_term: Vec<(usize, Box<dyn Postings>)> = vec![];
while merged_terms.advance() {
segment_postings_containing_the_term.clear();
@@ -398,11 +402,10 @@ impl IndexMerger {
assert!(!segment_postings_containing_the_term.is_empty());
let has_term_freq = {
let has_term_freq = !segment_postings_containing_the_term[0]
let has_term_freq = segment_postings_containing_the_term[0]
.1
.block_cursor
.freqs()
.is_empty();
.freq_reading_option()
== FreqReadingOption::ReadFreq;
for (_, postings) in &segment_postings_containing_the_term[1..] {
// This may look like a strange way to test whether we have term freqs or not.
// With JSON object, the schema is not sufficient to know whether a term
@@ -418,7 +421,7 @@ impl IndexMerger {
//
// Overall the reliable way to know if we have actual frequencies loaded or not
// is to check whether the actual decoded array is empty or not.
if has_term_freq == postings.block_cursor.freqs().is_empty() {
// Every remaining segment must agree with the first one on whether term frequencies
// are actually loaded. Comparing against `has_term_freq` (instead of requiring
// `ReadFreq` unconditionally) keeps terms without frequencies mergeable.
if has_term_freq != (postings.freq_reading_option() == FreqReadingOption::ReadFreq) {
return Err(DataCorruption::comment_only(
"Term freqs are inconsistent across segments",
)
@@ -470,7 +473,7 @@ impl IndexMerger {
fn write_postings(
&self,
serializer: &mut InvertedIndexSerializer,
serializer: &mut InvertedIndexSerializer<C>,
fieldnorm_readers: FieldNormReaders,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
@@ -528,10 +531,7 @@ impl IndexMerger {
///
/// # Returns
/// The number of documents in the resulting segment.
pub fn write<C: crate::codec::Codec>(
&self,
mut serializer: SegmentSerializer<C>,
) -> crate::Result<u32> {
pub fn write(&self, mut serializer: SegmentSerializer<C>) -> crate::Result<u32> {
let doc_id_mapping = self.get_doc_id_from_concatenated_data()?;
debug!("write-fieldnorms");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
@@ -1527,7 +1527,7 @@ mod tests {
.term_scorer_for_test(searcher.segment_reader(0u32), 1.0)?
.unwrap();
assert_eq!(term_scorer.doc(), 0);
assert_nearly_equals!(term_scorer.block_max_score(), 0.0079681855);
assert_nearly_equals!(term_scorer.seek_block(0), 0.0079681855);
assert_nearly_equals!(term_scorer.score(), 0.0079681855);
for _ in 0..81 {
writer.add_document(doc!(text=>"hello happy tax payer"))?;
@@ -1546,7 +1546,7 @@ mod tests {
// there.
for doc in segment_reader.doc_ids_alive() {
assert_eq!(term_scorer.doc(), doc);
assert_nearly_equals!(term_scorer.block_max_score(), 0.003478312);
assert_nearly_equals!(term_scorer.seek_block(doc), 0.003478312);
assert_nearly_equals!(term_scorer.score(), 0.003478312);
term_scorer.advance();
}
@@ -1571,7 +1571,7 @@ mod tests {
// the difference compared to before is intrinsic to the bm25 formula. no worries there.
for doc in segment_reader.doc_ids_alive() {
assert_eq!(term_scorer.doc(), doc);
assert_nearly_equals!(term_scorer.block_max_score(), 0.003478312);
assert_nearly_equals!(term_scorer.seek_block(doc), 0.003478312);
assert_nearly_equals!(term_scorer.score(), 0.003478312);
term_scorer.advance();
}

View File

@@ -13,7 +13,7 @@ pub struct SegmentSerializer<C: crate::codec::Codec> {
pub(crate) store_writer: StoreWriter,
fast_field_write: WritePtr,
fieldnorms_serializer: Option<FieldNormsSerializer>,
postings_serializer: InvertedIndexSerializer,
postings_serializer: InvertedIndexSerializer<C>,
}
impl<C: crate::codec::Codec> SegmentSerializer<C> {
@@ -55,7 +55,7 @@ impl<C: crate::codec::Codec> SegmentSerializer<C> {
}
/// Accessor to the `PostingsSerializer`.
pub fn get_postings_serializer(&mut self) -> &mut InvertedIndexSerializer {
pub fn get_postings_serializer(&mut self) -> &mut InvertedIndexSerializer<C> {
&mut self.postings_serializer
}

View File

@@ -115,7 +115,7 @@ fn merge<Codec: crate::codec::Codec>(
.collect();
// An IndexMerger is like a "view" of our merged segments.
let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
let merger: IndexMerger<Codec> = IndexMerger::open(index.schema(), &segments[..])?;
// ... we just serialize this index merger in our new segment to merge the segments.
let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?;
@@ -186,12 +186,12 @@ pub fn merge_indices<Codec: crate::codec::Codec>(
/// meant to work if you have an `IndexWriter` running for the origin indices, or
/// the destination `Index`.
#[doc(hidden)]
pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Directory>>>(
segments: &[Segment<Codec>],
pub fn merge_filtered_segments<C: crate::codec::Codec, T: Into<Box<dyn Directory>>>(
segments: &[Segment<C>],
target_settings: IndexSettings,
filter_doc_ids: Vec<Option<AliveBitSet>>,
output_directory: T,
) -> crate::Result<Index<Codec>> {
) -> crate::Result<Index<C>> {
if segments.is_empty() {
// If there are no indices to merge, there is no need to do anything.
return Err(crate::TantivyError::InvalidArgument(
@@ -212,7 +212,7 @@ pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Direc
));
}
let mut merged_index: Index<Codec> = Index::builder()
let mut merged_index: Index<C> = Index::builder()
.schema(target_schema.clone())
.codec(segments[0].index().codec().clone())
.settings(target_settings.clone())
@@ -220,7 +220,7 @@ pub fn merge_filtered_segments<Codec: crate::codec::Codec, T: Into<Box<dyn Direc
let merged_segment = merged_index.new_segment();
let merged_segment_id = merged_segment.id();
let merger: IndexMerger =
let merger: IndexMerger<C> =
IndexMerger::open_with_custom_alive_set(merged_index.schema(), segments, filter_doc_ids)?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment)?;
let num_docs = merger.write(segment_serializer)?;

View File

@@ -13,7 +13,7 @@ use crate::indexer::segment_serializer::SegmentSerializer;
use crate::json_utils::{index_json_value, IndexingPositionsPerPath};
use crate::postings::{
compute_table_memory_size, serialize_postings, IndexingContext, IndexingPosition,
PerFieldPostingsWriter, PostingsWriter,
PerFieldPostingsWriter, PostingsWriter, PostingsWriterEnum,
};
use crate::schema::document::{Document, Value};
use crate::schema::{FieldEntry, FieldType, Schema, DATE_TIME_PRECISION_INDEXED};
@@ -173,7 +173,7 @@ impl<Codec: crate::codec::Codec> SegmentWriter<Codec> {
}
let (term_buffer, ctx) = (&mut self.term_buffer, &mut self.ctx);
let postings_writer: &mut dyn PostingsWriter =
let postings_writer: &mut PostingsWriterEnum =
self.per_field_postings_writers.get_for_field_mut(field);
term_buffer.clear_with_field(field);

View File

@@ -3,6 +3,7 @@ use std::io;
use common::json_path_writer::JSON_END_OF_PATH;
use stacker::Addr;
use crate::codec::Codec;
use crate::indexer::indexing_term::IndexingTerm;
use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::postings::postings_writer::SpecializedPostingsWriter;
@@ -52,12 +53,12 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
}
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
fn serialize<C: Codec>(
&self,
ordered_term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
serializer: &mut FieldSerializer<C>,
) -> io::Result<()> {
let mut term_buffer = JsonTermSerializer(Vec::with_capacity(48));
let mut buffer_lender = BufferLender::default();

View File

@@ -1,5 +1,8 @@
use crate::docset::{DocSet, TERMINATED};
use crate::postings::{Postings, SegmentPostings};
use crate::fieldnorm::FieldNormReader;
use crate::postings::Postings;
use crate::query::term_query::TermScorer;
use crate::query::{Bm25Weight, Scorer};
use crate::DocId;
/// `LoadedPostings` is a `DocSet` and `Postings` implementation.
@@ -22,19 +25,27 @@ pub struct LoadedPostings {
}
impl LoadedPostings {
/// Moves the loaded postings out of their box and wraps them in a boxed `TermScorer`.
///
/// NOTE(review): the `Postings` impl for `LoadedPostings` defines a method with the same
/// signature and body; this inherent copy looks redundant — confirm before keeping both.
fn new_term_scorer(
    self: Box<Self>,
    fieldnorm_reader: FieldNormReader,
    similarity_weight: Bm25Weight,
) -> Box<dyn Scorer> {
    let loaded_postings = *self;
    let term_scorer = TermScorer::new(loaded_postings, fieldnorm_reader, similarity_weight);
    Box::new(term_scorer)
}
/// Creates a new `LoadedPostings` from a `SegmentPostings`.
///
/// It will also preload positions, if positions are available in the SegmentPostings.
pub fn load(segment_postings: &mut SegmentPostings) -> LoadedPostings {
let num_docs = segment_postings.doc_freq() as usize;
pub fn load(postings: &mut Box<dyn Postings>) -> LoadedPostings {
let num_docs = postings.doc_freq() as usize;
let mut doc_ids = Vec::with_capacity(num_docs);
let mut positions = Vec::with_capacity(num_docs);
let mut position_offsets = Vec::with_capacity(num_docs);
while segment_postings.doc() != TERMINATED {
while postings.doc() != TERMINATED {
position_offsets.push(positions.len() as u32);
doc_ids.push(segment_postings.doc());
segment_postings.append_positions_with_offset(0, &mut positions);
segment_postings.advance();
doc_ids.push(postings.doc());
postings.append_positions_with_offset(0, &mut positions);
postings.advance();
}
position_offsets.push(positions.len() as u32);
LoadedPostings {
@@ -88,6 +99,14 @@ impl DocSet for LoadedPostings {
}
}
impl Postings for LoadedPostings {
/// Builds a `TermScorer` over these loaded postings and returns it as a boxed `Scorer`.
fn new_term_scorer(
    self: Box<Self>,
    fieldnorm_reader: FieldNormReader,
    similarity_weight: Bm25Weight,
) -> Box<dyn Scorer> {
    let loaded_postings = *self;
    let term_scorer = TermScorer::new(loaded_postings, fieldnorm_reader, similarity_weight);
    Box::new(term_scorer)
}
fn term_freq(&self) -> u32 {
let start = self.position_offsets[self.cursor] as usize;
let end = self.position_offsets[self.cursor + 1] as usize;
@@ -101,6 +120,10 @@ impl Postings for LoadedPostings {
output.push(*pos + offset);
}
}
/// Loaded postings always report `ReadFreq`.
fn freq_reading_option(&self) -> super::FreqReadingOption {
    use super::FreqReadingOption::ReadFreq;
    ReadFreq
}
}
#[cfg(test)]

View File

@@ -4,7 +4,7 @@ mod block_search;
pub(crate) use self::block_search::branchless_binary_search;
mod block_segment_postings;
// mod block_segment_postings;
pub(crate) mod compression;
mod indexing_context;
mod json_postings_writer;
@@ -13,27 +13,24 @@ mod per_field_postings_writer;
mod postings;
mod postings_writer;
mod recorder;
mod segment_postings;
mod serializer;
mod skip;
mod term_info;
pub(crate) use loaded_postings::LoadedPostings;
pub(crate) use stacker::compute_table_memory_size;
pub use self::block_segment_postings::BlockSegmentPostings;
pub(crate) use self::indexing_context::IndexingContext;
pub(crate) use self::per_field_postings_writer::PerFieldPostingsWriter;
pub use self::postings::Postings;
pub(crate) use self::postings_writer::{serialize_postings, IndexingPosition, PostingsWriter};
pub use self::segment_postings::SegmentPostings;
pub(crate) use self::postings_writer::{
serialize_postings, IndexingPosition, PostingsWriter, PostingsWriterEnum,
};
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub use self::term_info::TermInfo;
#[expect(clippy::enum_variant_names)]
#[derive(Debug, PartialEq, Clone, Copy, Eq)]
pub(crate) enum FreqReadingOption {
pub enum FreqReadingOption {
NoFreq,
SkipFreq,
ReadFreq,

View File

@@ -1,4 +1,9 @@
use crate::docset::DocSet;
use crate::fastfield::AliveBitSet;
use crate::fieldnorm::FieldNormReader;
use crate::postings::FreqReadingOption;
use crate::query::{Bm25Weight, Scorer};
use crate::{Score, TERMINATED};
/// Postings (also called inverted list)
///
@@ -11,9 +16,42 @@ use crate::docset::DocSet;
/// but other implementations mocking `SegmentPostings` exist,
/// for merging segments or for testing.
pub trait Postings: DocSet + 'static {
/// Consumes the boxed postings, together with a fieldnorm reader and a BM25 weight, and
/// returns a boxed [`Scorer`].
///
/// NOTE(review): this default body is still `todo!()` and panics when called, so
/// implementors have to override it (`LoadedPostings` does so by building a `TermScorer`).
fn new_term_scorer(
self: Box<Self>,
fieldnorm_reader: FieldNormReader,
similarity_weight: Bm25Weight,
) -> Box<dyn Scorer> {
// let self_dyn: Box<dyn Postings> = self;
todo!();
}
/// The number of times the term appears in the document.
fn term_freq(&self) -> u32;
/// Returns a document count for this posting list — presumably the number of documents
/// containing the term (`LoadedPostings::load` uses it to size its buffers); TODO confirm.
///
/// NOTE(review): this default body is still `todo!()` and panics when called.
fn doc_freq(&self) -> u32 {
todo!();
}
/// Compute the number of non-deleted documents.
///
/// NOTE(review): the default body is currently `todo!()` and panics when called. The
/// commented-out code below is the intended algorithm: clone the posting list, scan it
/// until `TERMINATED` and count the docs that `alive_bitset` reports as alive
/// (a rather expensive operation).
fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 {
todo!();
// let mut docset = self.clone();
// let mut doc_freq = 0;
// loop {
// let doc = docset.doc();
// if doc == TERMINATED {
// return doc_freq;
// }
// if alive_bitset.is_alive(doc) {
// doc_freq += 1u32;
// }
// docset.advance();
// }
}
/// Returns the positions offsetted with a given value.
/// It is not necessary to clear the `output` before calling this method.
/// The output vector will be resized to the `term_freq`.
@@ -31,9 +69,41 @@ pub trait Postings: DocSet + 'static {
fn positions(&mut self, output: &mut Vec<u32>) {
self.positions_with_offset(0u32, output);
}
/// Reports how term frequencies are handled when reading this posting list
/// (see [`FreqReadingOption`]).
///
/// The segment merger compares the returned value against `FreqReadingOption::ReadFreq`
/// to find out whether term frequencies are available.
fn freq_reading_option(&self) -> FreqReadingOption;
// TODO see if we can put that in a lift to PostingsWithBlockMax trait.
/// Returns `true` if this posting list supports the block-max operations used by
/// Block-WAND (`seek_block` and `last_doc_in_block`).
///
/// Defaults to `false`.
fn supports_block_max(&self) -> bool {
false
}
// TODO document
/// Block-max operation taking a target doc, a fieldnorm reader and a BM25 weight, and
/// returning a [`Score`].
///
/// NOTE(review): presumably positions the postings on the block containing `target_doc`
/// and returns that block's maximum score — confirm against an implementation.
///
/// Only allowed for block max: the default body is `unimplemented!()` and panics.
fn seek_block(
&mut self,
target_doc: crate::DocId,
fieldnorm_reader: &FieldNormReader,
similarity_weight: &Bm25Weight,
) -> Score {
unimplemented!()
}
// TODO
/// Block-max operation returning a `DocId` — presumably the last document of the current
/// block; TODO confirm.
///
/// Only allowed for block max: the default body is `unimplemented!()` and panics.
fn last_doc_in_block(&self) -> crate::DocId {
unimplemented!()
}
}
impl Postings for Box<dyn Postings> {
/// Unwraps the outer box and delegates to the inner boxed postings.
fn new_term_scorer(
    self: Box<Self>,
    fieldnorm_reader: FieldNormReader,
    similarity_weight: Bm25Weight,
) -> Box<dyn Scorer> {
    let inner_postings = *self;
    inner_postings.new_term_scorer(fieldnorm_reader, similarity_weight)
}
/// Delegates to the boxed postings.
fn term_freq(&self) -> u32 {
    let inner_postings = &**self;
    inner_postings.term_freq()
}
@@ -41,4 +111,25 @@ impl Postings for Box<dyn Postings> {
fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
(**self).append_positions_with_offset(offset, output);
}
/// Delegates to the boxed postings.
fn supports_block_max(&self) -> bool {
    let inner_postings = &**self;
    inner_postings.supports_block_max()
}
/// Delegates to the boxed postings.
fn seek_block(
    &mut self,
    target_doc: crate::DocId,
    fieldnorm_reader: &FieldNormReader,
    similarity_weight: &Bm25Weight,
) -> Score {
    let inner_postings = &mut **self;
    inner_postings.seek_block(target_doc, fieldnorm_reader, similarity_weight)
}
/// Delegates to the boxed postings.
fn last_doc_in_block(&self) -> crate::DocId {
    let inner_postings = &**self;
    inner_postings.last_doc_in_block()
}
/// Delegates to the boxed postings.
fn freq_reading_option(&self) -> FreqReadingOption {
    let inner_postings = &**self;
    inner_postings.freq_reading_option()
}
}

View File

@@ -4,6 +4,7 @@ use std::ops::Range;
use stacker::Addr;
use crate::codec::Codec;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::indexing_term::IndexingTerm;
use crate::indexer::path_to_unordered_id::OrderedPathId;
@@ -48,12 +49,12 @@ fn make_field_partition(
/// Serialize the inverted index.
/// It pushes all term, one field at a time, towards the
/// postings serializer.
pub(crate) fn serialize_postings(
pub(crate) fn serialize_postings<C: Codec>(
ctx: IndexingContext,
schema: Schema,
per_field_postings_writers: &PerFieldPostingsWriter,
fieldnorm_readers: FieldNormReaders,
serializer: &mut InvertedIndexSerializer,
serializer: &mut InvertedIndexSerializer<C>,
) -> crate::Result<()> {
// Replace unordered ids by ordered ids to be able to sort
let unordered_id_to_ordered_id: Vec<OrderedPathId> =
@@ -166,12 +167,12 @@ impl PostingsWriter for PostingsWriterEnum {
}
}
fn serialize(
fn serialize<C: Codec>(
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
serializer: &mut FieldSerializer<C>,
) -> io::Result<()> {
match self {
PostingsWriterEnum::DocId(writer) => {
@@ -254,12 +255,12 @@ pub(crate) trait PostingsWriter: Send + Sync {
/// Serializes the postings on disk.
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
fn serialize<C: Codec>(
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
serializer: &mut FieldSerializer<C>,
) -> io::Result<()>;
/// Tokenize a text and subscribe all of its token.
@@ -311,12 +312,12 @@ pub(crate) struct SpecializedPostingsWriter<Rec: Recorder> {
impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
#[inline]
pub(crate) fn serialize_one_term(
pub(crate) fn serialize_one_term<C: Codec>(
term: &[u8],
addr: Addr,
buffer_lender: &mut BufferLender,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
serializer: &mut FieldSerializer<C>,
) -> io::Result<()> {
let recorder: Rec = ctx.term_index.read(addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
@@ -357,12 +358,12 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
});
}
fn serialize(
fn serialize<C: Codec>(
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
_ordered_id_to_path: &[&str],
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
serializer: &mut FieldSerializer<C>,
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (_field, _path_id, term, addr) in term_addrs {

View File

@@ -1,6 +1,7 @@
use common::read_u32_vint;
use stacker::{ExpUnrolledLinkedList, MemoryArena};
use crate::codec::Codec;
use crate::postings::FieldSerializer;
use crate::DocId;
@@ -67,10 +68,10 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static {
/// Close the document. It will help record the term frequency.
fn close_doc(&mut self, arena: &mut MemoryArena);
/// Pushes the postings information to the serializer.
fn serialize(
fn serialize<C: Codec>(
&self,
arena: &MemoryArena,
serializer: &mut FieldSerializer<'_>,
serializer: &mut FieldSerializer<C>,
buffer_lender: &mut BufferLender,
);
/// Returns the number of document containing this term.
@@ -110,10 +111,10 @@ impl Recorder for DocIdRecorder {
#[inline]
fn close_doc(&mut self, _arena: &mut MemoryArena) {}
fn serialize(
fn serialize<C: Codec>(
&self,
arena: &MemoryArena,
serializer: &mut FieldSerializer<'_>,
serializer: &mut FieldSerializer<C>,
buffer_lender: &mut BufferLender,
) {
let buffer = buffer_lender.lend_u8();
@@ -178,10 +179,10 @@ impl Recorder for TermFrequencyRecorder {
self.current_tf = 0;
}
fn serialize(
fn serialize<C: Codec>(
&self,
arena: &MemoryArena,
serializer: &mut FieldSerializer<'_>,
serializer: &mut FieldSerializer<C>,
buffer_lender: &mut BufferLender,
) {
let buffer = buffer_lender.lend_u8();
@@ -235,10 +236,10 @@ impl Recorder for TfAndPositionRecorder {
self.stack.writer(arena).write_u32_vint(POSITION_END);
}
fn serialize(
fn serialize<C: Codec>(
&self,
arena: &MemoryArena,
serializer: &mut FieldSerializer<'_>,
serializer: &mut FieldSerializer<C>,
buffer_lender: &mut BufferLender,
) {
let (buffer_u8, buffer_positions) = buffer_lender.lend_all();

View File

@@ -1,16 +1,14 @@
use std::cmp::Ordering;
use std::io::{self, Write};
use common::{BinarySerializable, CountingWriter, VInt};
use common::{BinarySerializable, CountingWriter};
use super::TermInfo;
use crate::codec::postings::PostingsSerializer;
use crate::codec::Codec;
use crate::directory::{CompositeWrite, WritePtr};
use crate::fieldnorm::FieldNormReader;
use crate::index::Segment;
use crate::positions::PositionSerializer;
use crate::postings::compression::{BlockEncoder, VIntEncoder, COMPRESSION_BLOCK_SIZE};
use crate::postings::skip::SkipSerializer;
use crate::query::Bm25Weight;
use crate::schema::{Field, FieldEntry, FieldType, IndexRecordOption, Schema};
use crate::termdict::TermDictionaryBuilder;
use crate::{DocId, Score};
@@ -46,24 +44,27 @@ use crate::{DocId, Score};
///
/// A description of the serialization format is
/// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html).
pub struct InvertedIndexSerializer {
pub struct InvertedIndexSerializer<C: Codec> {
terms_write: CompositeWrite<WritePtr>,
postings_write: CompositeWrite<WritePtr>,
positions_write: CompositeWrite<WritePtr>,
schema: Schema,
codec: C,
}
impl InvertedIndexSerializer {
use crate::codec::postings::PostingsCodec;
impl<C: Codec> InvertedIndexSerializer<C> {
/// Open a new `InvertedIndexSerializer` for the given segment
pub fn open<C: crate::codec::Codec>(
segment: &mut Segment<C>,
) -> crate::Result<InvertedIndexSerializer> {
pub fn open(segment: &mut Segment<C>) -> crate::Result<InvertedIndexSerializer<C>> {
use crate::index::SegmentComponent::{Positions, Postings, Terms};
let codec = segment.index().codec().clone();
let inv_index_serializer = InvertedIndexSerializer {
terms_write: CompositeWrite::wrap(segment.open_write(Terms)?),
postings_write: CompositeWrite::wrap(segment.open_write(Postings)?),
positions_write: CompositeWrite::wrap(segment.open_write(Positions)?),
schema: segment.schema(),
codec,
};
Ok(inv_index_serializer)
}
@@ -77,7 +78,7 @@ impl InvertedIndexSerializer {
field: Field,
total_num_tokens: u64,
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer<'_>> {
) -> io::Result<FieldSerializer<'_, C>> {
let field_entry: &FieldEntry = self.schema.get_field_entry(field);
let term_dictionary_write = self.terms_write.for_field(field);
let postings_write = self.postings_write.for_field(field);
@@ -90,6 +91,7 @@ impl InvertedIndexSerializer {
postings_write,
positions_write,
fieldnorm_reader,
&self.codec,
)
}
@@ -104,9 +106,9 @@ impl InvertedIndexSerializer {
/// The field serializer is in charge of
/// the serialization of a specific field.
pub struct FieldSerializer<'a> {
pub struct FieldSerializer<'a, C: Codec> {
term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter<WritePtr>>,
postings_serializer: PostingsSerializer,
postings_serializer: <C::PostingsCodec as PostingsCodec>::PostingsSerializer,
positions_serializer_opt: Option<PositionSerializer<&'a mut CountingWriter<WritePtr>>>,
current_term_info: TermInfo,
term_open: bool,
@@ -114,7 +116,7 @@ pub struct FieldSerializer<'a> {
postings_start_offset: u64,
}
impl<'a> FieldSerializer<'a> {
impl<'a, C: Codec> FieldSerializer<'a, C> {
fn create(
field_type: &FieldType,
total_num_tokens: u64,
@@ -122,7 +124,8 @@ impl<'a> FieldSerializer<'a> {
postings_write: &'a mut CountingWriter<WritePtr>,
positions_write: &'a mut CountingWriter<WritePtr>,
fieldnorm_reader: Option<FieldNormReader>,
) -> io::Result<FieldSerializer<'a>> {
codec: &C,
) -> io::Result<FieldSerializer<'a, C>> {
total_num_tokens.serialize(postings_write)?;
let index_record_option = field_type
.index_record_option()
@@ -132,8 +135,11 @@ impl<'a> FieldSerializer<'a> {
.as_ref()
.map(|ff_reader| total_num_tokens as Score / ff_reader.num_docs() as Score)
.unwrap_or(0.0);
let postings_serializer =
PostingsSerializer::new(average_fieldnorm, index_record_option, fieldnorm_reader);
let postings_serializer = codec.postings_codec().new_serializer(
average_fieldnorm,
index_record_option,
fieldnorm_reader,
);
let positions_serializer_opt = if index_record_option.has_positions() {
Some(PositionSerializer::new(positions_write))
} else {
@@ -250,223 +256,3 @@ impl<'a> FieldSerializer<'a> {
Ok(())
}
}
/// Fixed-capacity staging buffer holding up to `COMPRESSION_BLOCK_SIZE` doc ids and
/// their term frequencies.
struct Block {
    doc_ids: [DocId; COMPRESSION_BLOCK_SIZE],
    term_freqs: [u32; COMPRESSION_BLOCK_SIZE],
    len: usize,
}

impl Block {
    /// Creates an empty block with zeroed storage.
    fn new() -> Self {
        Self {
            doc_ids: [0u32; COMPRESSION_BLOCK_SIZE],
            term_freqs: [0u32; COMPRESSION_BLOCK_SIZE],
            len: 0,
        }
    }

    /// Doc ids appended since the last `clear`.
    fn doc_ids(&self) -> &[DocId] {
        let filled = self.len;
        &self.doc_ids[..filled]
    }

    /// Term frequencies appended since the last `clear`, parallel to `doc_ids`.
    fn term_freqs(&self) -> &[u32] {
        let filled = self.len;
        &self.term_freqs[..filled]
    }

    /// Empties the block; the underlying storage is left as is.
    fn clear(&mut self) {
        self.len = 0;
    }

    /// Appends one `(doc, term_freq)` pair. Panics if the block is already full.
    fn append_doc(&mut self, doc: DocId, term_freq: u32) {
        let slot = self.len;
        self.doc_ids[slot] = doc;
        self.term_freqs[slot] = term_freq;
        self.len += 1;
    }

    /// Returns `true` once `COMPRESSION_BLOCK_SIZE` pairs have been appended.
    fn is_full(&self) -> bool {
        COMPRESSION_BLOCK_SIZE == self.len
    }

    /// Returns `true` if nothing has been appended since the last `clear`.
    fn is_empty(&self) -> bool {
        0 == self.len
    }

    /// Last doc id of a full block. Panics if the block is not full.
    fn last_doc(&self) -> DocId {
        assert_eq!(self.len, COMPRESSION_BLOCK_SIZE);
        let last_slot = COMPRESSION_BLOCK_SIZE - 1;
        self.doc_ids[last_slot]
    }
}
pub struct PostingsSerializer {
last_doc_id_encoded: u32,
block_encoder: BlockEncoder,
block: Box<Block>,
postings_write: Vec<u8>,
skip_write: SkipSerializer,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
bm25_weight: Option<Bm25Weight>,
avg_fieldnorm: Score, /* Average number of term in the field for that segment.
* this value is used to compute the block wand information. */
term_has_freq: bool,
}
impl PostingsSerializer {
pub fn new(
avg_fieldnorm: Score,
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> PostingsSerializer {
PostingsSerializer {
block_encoder: BlockEncoder::new(),
block: Box::new(Block::new()),
postings_write: Vec::new(),
skip_write: SkipSerializer::new(),
last_doc_id_encoded: 0u32,
mode,
fieldnorm_reader,
bm25_weight: None,
avg_fieldnorm,
term_has_freq: false,
}
}
pub fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) {
self.bm25_weight = None;
self.term_has_freq = self.mode.has_freq() && record_term_freq;
if !self.term_has_freq {
return;
}
let num_docs_in_segment: u64 =
if let Some(fieldnorm_reader) = self.fieldnorm_reader.as_ref() {
fieldnorm_reader.num_docs() as u64
} else {
return;
};
if num_docs_in_segment == 0 {
return;
}
self.bm25_weight = Some(Bm25Weight::for_one_term_without_explain(
term_doc_freq as u64,
num_docs_in_segment,
self.avg_fieldnorm,
));
}
fn write_block(&mut self) {
{
// encode the doc ids
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_sorted(self.block.doc_ids(), self.last_doc_id_encoded);
self.last_doc_id_encoded = self.block.last_doc();
self.skip_write
.write_doc(self.last_doc_id_encoded, num_bits);
// last el block 0, offset block 1,
self.postings_write.extend(block_encoded);
}
if self.term_has_freq {
let (num_bits, block_encoded): (u8, &[u8]) = self
.block_encoder
.compress_block_unsorted(self.block.term_freqs(), true);
self.postings_write.extend(block_encoded);
self.skip_write.write_term_freq(num_bits);
if self.mode.has_positions() {
// We serialize the sum of term freqs within the skip information
// in order to navigate through positions.
let sum_freq = self.block.term_freqs().iter().cloned().sum();
self.skip_write.write_total_term_freq(sum_freq);
}
let mut blockwand_params = (0u8, 0u32);
if let Some(bm25_weight) = self.bm25_weight.as_ref() {
if let Some(fieldnorm_reader) = self.fieldnorm_reader.as_ref() {
let docs = self.block.doc_ids().iter().cloned();
let term_freqs = self.block.term_freqs().iter().cloned();
let fieldnorms = docs.map(|doc| fieldnorm_reader.fieldnorm_id(doc));
blockwand_params = fieldnorms
.zip(term_freqs)
.max_by(
|(left_fieldnorm_id, left_term_freq),
(right_fieldnorm_id, right_term_freq)| {
let left_score =
bm25_weight.tf_factor(*left_fieldnorm_id, *left_term_freq);
let right_score =
bm25_weight.tf_factor(*right_fieldnorm_id, *right_term_freq);
left_score
.partial_cmp(&right_score)
.unwrap_or(Ordering::Equal)
},
)
.unwrap();
}
}
let (fieldnorm_id, term_freq) = blockwand_params;
self.skip_write.write_blockwand_max(fieldnorm_id, term_freq);
}
self.block.clear();
}
pub fn write_doc(&mut self, doc_id: DocId, term_freq: u32) {
self.block.append_doc(doc_id, term_freq);
if self.block.is_full() {
self.write_block();
}
}
/// Finishes the posting list of the current term and writes it to `output_write`.
///
/// Postings still buffered in the last, incomplete block are appended to the postings
/// buffer using variable-int encoding. When `doc_freq` is at least
/// `COMPRESSION_BLOCK_SIZE`, the skip data is emitted first, prefixed by its length
/// serialized as a `VInt`; the postings buffer is emitted after it.
///
/// On success the skip buffer and the postings buffer are cleared and the BM25 weight
/// is dropped.
///
/// # Errors
///
/// Propagates any I/O error raised while writing.
pub fn close_term(
    &mut self,
    doc_freq: u32,
    output_write: &mut impl std::io::Write,
) -> io::Result<()> {
    let has_pending_docs = !self.block.is_empty();
    if has_pending_docs {
        // The number of doc ids is not a perfect multiple of the block size:
        // the leftover doc ids are encoded using variable int encoding.
        let vint_doc_ids = self
            .block_encoder
            .compress_vint_sorted(self.block.doc_ids(), self.last_doc_id_encoded);
        self.postings_write.write_all(vint_doc_ids)?;
        // ... and so are the matching term frequencies.
        if self.term_has_freq {
            let vint_term_freqs = self
                .block_encoder
                .compress_vint_unsorted(self.block.term_freqs());
            self.postings_write.write_all(vint_term_freqs)?;
        }
        self.block.clear();
    }

    // Skip data is only emitted when the term has at least one full block.
    let has_skip_data = doc_freq >= COMPRESSION_BLOCK_SIZE as u32;
    if has_skip_data {
        let skip_bytes = self.skip_write.data();
        VInt(skip_bytes.len() as u64).serialize(output_write)?;
        output_write.write_all(skip_bytes)?;
    }
    output_write.write_all(&self.postings_write[..])?;

    // Get the per-term buffers ready for the next term.
    self.skip_write.clear();
    self.postings_write.clear();
    self.bm25_weight = None;
    Ok(())
}
/// Resets the last encoded doc id to 0 and empties the pending block.
fn clear(&mut self) {
    self.last_doc_id_encoded = 0;
    self.block.clear();
}
}

View File

@@ -1,448 +0,0 @@
use crate::directory::OwnedBytes;
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::query::Bm25Weight;
use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED};
// The doc bitwidth byte stored in a skip entry uses the following layout
// (most significant bit first):
//
//   bit 7     : unused
//   bit 6     : 1 if the block is delta-1 encoded, 0 otherwise
//   bit 5     : unused
//   bits 4..0 : the actual bitwidth, a 5 bit number in 0..32
//
// Bitpacking could in theory report a bitwidth of 32 (requiring a 6th bit), but the
// biggest doc_id we can want to encode is TERMINATED-1, which can be represented on
// 31 bits without delta encoding.

/// Packs `bitwidth` and the `delta_1` flag into a single byte.
///
/// # Panics
///
/// Panics if `bitwidth` is 32 or more.
fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
    assert!(bitwidth < 32);
    let delta_1_flag: u8 = if delta_1 { 0b0100_0000 } else { 0 };
    bitwidth | delta_1_flag
}
/// Splits a raw bitwidth byte into `(bitwidth, delta_1)`.
///
/// The bitwidth is read from the 5 low bits and the delta-1 flag from bit 6;
/// the two remaining bits are ignored.
fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) {
    const BITWIDTH_MASK: u8 = 0b0001_1111;
    const DELTA_1_FLAG: u8 = 0b0100_0000;
    let bitwidth = raw_bitwidth & BITWIDTH_MASK;
    let delta_1 = (raw_bitwidth & DELTA_1_FLAG) != 0;
    (bitwidth, delta_1)
}
/// Encodes a block-max term frequency on a single byte.
///
/// Values that do not fit in a `u8` saturate to `u8::MAX`.
#[inline]
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
    u8::try_from(max_tf).unwrap_or(u8::MAX)
}
/// Decodes a block-max term frequency code.
///
/// The code `u8::MAX` is decoded as `u32::MAX`; every other code decodes to itself.
#[inline]
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
    match max_tf_code {
        u8::MAX => u32::MAX,
        code => u32::from(code),
    }
}
/// Reads a little-endian `u32` from the first 4 bytes of `data`.
///
/// # Panics
///
/// Panics if `data` holds fewer than 4 bytes.
#[inline]
fn read_u32(data: &[u8]) -> u32 {
    let mut le_bytes = [0u8; 4];
    le_bytes.copy_from_slice(&data[..4]);
    u32::from_le_bytes(le_bytes)
}
/// Appends `val` to `buf` as 4 little-endian bytes.
#[inline]
fn write_u32(val: u32, buf: &mut Vec<u8>) {
    buf.extend(val.to_le_bytes());
}
/// Accumulates the serialized skip entries of a posting list in an in-memory buffer.
///
/// Each `write_*` method appends one component of a skip entry; the caller is
/// responsible for calling them in the order expected by the reader.
pub struct SkipSerializer {
    buffer: Vec<u8>,
}

impl SkipSerializer {
    /// Creates a serializer with an empty buffer.
    pub fn new() -> SkipSerializer {
        let buffer = Vec::new();
        SkipSerializer { buffer }
    }

    /// Appends the last doc id of a block (little-endian `u32`) followed by its doc
    /// bitwidth byte, always flagged as delta-1 encoded.
    pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) {
        write_u32(last_doc, &mut self.buffer);
        let bitwidth_code = encode_bitwidth(doc_num_bits, true);
        self.buffer.push(bitwidth_code);
    }

    /// Appends the bitwidth used for the term frequencies of a block.
    pub fn write_term_freq(&mut self, tf_num_bits: u8) {
        self.buffer.push(tf_num_bits);
    }

    /// Appends the sum of the term frequencies of a block (little-endian `u32`).
    pub fn write_total_term_freq(&mut self, tf_sum: u32) {
        write_u32(tf_sum, &mut self.buffer);
    }

    /// Appends the block-wand parameters of a block: the fieldnorm id, then the
    /// term frequency encoded on one byte.
    pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) {
        let max_tf_code = encode_block_wand_max_tf(term_freq);
        self.buffer.push(fieldnorm_id);
        self.buffer.push(max_tf_code);
    }

    /// Returns the bytes serialized so far.
    pub fn data(&self) -> &[u8] {
        self.buffer.as_slice()
    }

    /// Empties the buffer.
    pub fn clear(&mut self) {
        self.buffer.clear();
    }
}
// Cursor over the serialized skip entries of a posting list.
//
// It exposes the description of the current block (`block_info`) and keeps track of
// the byte and position offsets accumulated over the blocks that were skipped.
#[derive(Clone)]
pub(crate) struct SkipReader {
// Last doc id of the current block, or TERMINATED when no full block remains.
last_doc_in_block: DocId,
// Value `last_doc_in_block` had before the latest call to `advance` (0 initially).
pub(crate) last_doc_in_previous_block: DocId,
// Skip data that has not been consumed yet; advanced past each entry that is read.
owned_read: OwnedBytes,
// Selects the layout of a skip entry in `read_block_info`.
skip_info: IndexRecordOption,
// Sum of the compressed sizes of the bitpacked blocks passed so far.
// Set to usize::MAX once the final VInt block has been passed.
byte_offset: usize,
remaining_docs: u32, // number of docs remaining, including the
// documents in the current block.
// Description of the current block.
block_info: BlockInfo,
// Sum of the `tf_sum` of the bitpacked blocks passed so far.
position_offset: u64,
}
/// Description of the block a `SkipReader` is currently positioned on.
#[derive(Clone, Eq, PartialEq, Copy, Debug)]
pub(crate) enum BlockInfo {
    /// A full block, described by a skip entry.
    BitPacked {
        /// Bitwidth decoded from the doc bitwidth byte of the skip entry.
        doc_num_bits: u8,
        /// Delta-1 flag decoded from the doc bitwidth byte of the skip entry.
        strict_delta_encoded: bool,
        /// Bitwidth of the term frequencies (0 when the entry carries none).
        tf_num_bits: u8,
        /// Sum of the term frequencies of the block (0 when the entry carries none).
        tf_sum: u32,
        /// Fieldnorm id used to compute the block max score.
        block_wand_fieldnorm_id: u8,
        /// Term frequency used to compute the block max score.
        block_wand_term_freq: u32,
    },
    /// The trailing block, which has no skip entry.
    VInt {
        /// Number of documents in the block.
        num_docs: u32,
    },
}

impl Default for BlockInfo {
    /// Returns an empty `VInt` block.
    fn default() -> Self {
        Self::VInt { num_docs: 0 }
    }
}
impl SkipReader {
    /// Creates a skip reader over `data` for a term appearing in `doc_freq` documents.
    ///
    /// `skip_info` selects the layout of the skip entries. If the posting list holds
    /// at least one full block, the first skip entry is read right away.
    pub fn new(data: OwnedBytes, doc_freq: u32, skip_info: IndexRecordOption) -> SkipReader {
        let has_full_block = doc_freq >= COMPRESSION_BLOCK_SIZE as u32;
        let mut skip_reader = SkipReader {
            last_doc_in_block: if has_full_block { 0 } else { TERMINATED },
            last_doc_in_previous_block: 0u32,
            owned_read: data,
            skip_info,
            byte_offset: 0,
            remaining_docs: doc_freq,
            block_info: BlockInfo::VInt { num_docs: doc_freq },
            position_offset: 0u64,
        };
        if has_full_block {
            skip_reader.read_block_info();
        }
        skip_reader
    }

    /// Repositions this reader at the start of another posting list, reusing the
    /// current `skip_info`.
    pub fn reset(&mut self, data: OwnedBytes, doc_freq: u32) {
        let has_full_block = doc_freq >= COMPRESSION_BLOCK_SIZE as u32;
        self.owned_read = data;
        self.byte_offset = 0;
        self.position_offset = 0u64;
        self.remaining_docs = doc_freq;
        self.last_doc_in_previous_block = 0u32;
        self.last_doc_in_block = if has_full_block { 0 } else { TERMINATED };
        self.block_info = BlockInfo::VInt { num_docs: doc_freq };
        if has_full_block {
            self.read_block_info();
        }
    }

    /// Returns the block max score for this block if available.
    ///
    /// The block max score is available for all full bitpacked blocks,
    /// but not for the last, incomplete, VInt encoded block.
    pub fn block_max_score(&self, bm25_weight: &Bm25Weight) -> Option<Score> {
        if let BlockInfo::BitPacked {
            block_wand_fieldnorm_id,
            block_wand_term_freq,
            ..
        } = self.block_info
        {
            Some(bm25_weight.score(block_wand_fieldnorm_id, block_wand_term_freq))
        } else {
            None
        }
    }

    /// Returns the last doc id of the current block, or `TERMINATED` when no full
    /// block remains.
    pub(crate) fn last_doc_in_block(&self) -> DocId {
        self.last_doc_in_block
    }

    /// Returns the sum of the `tf_sum` of the blocks passed so far.
    pub fn position_offset(&self) -> u64 {
        self.position_offset
    }

    /// Returns the sum of the compressed sizes of the blocks passed so far.
    #[inline]
    pub fn byte_offset(&self) -> usize {
        self.byte_offset
    }

    /// Decodes the skip entry at the head of the remaining skip data into
    /// `last_doc_in_block` and `block_info`, then consumes it.
    ///
    /// The entry starts with the last doc id (4 bytes) and the doc bitwidth byte.
    /// Depending on `skip_info` it is followed by nothing (5 bytes in total), by the
    /// term freq bitwidth and the block-wand pair (8 bytes), or by the term freq
    /// bitwidth, the term freq sum and the block-wand pair (12 bytes).
    fn read_block_info(&mut self) {
        let bytes = self.owned_read.as_slice();
        self.last_doc_in_block = read_u32(bytes);
        let (doc_num_bits, strict_delta_encoded) = decode_bitwidth(bytes[4]);
        let (entry_len, tf_num_bits, tf_sum, block_wand_fieldnorm_id, block_wand_term_freq): (
            usize,
            u8,
            u32,
            u8,
            u32,
        ) = match self.skip_info {
            IndexRecordOption::Basic => (5, 0, 0, 0, 0),
            IndexRecordOption::WithFreqs => (
                8,
                bytes[5],
                0,
                bytes[6],
                decode_block_wand_max_tf(bytes[7]),
            ),
            IndexRecordOption::WithFreqsAndPositions => (
                12,
                bytes[5],
                read_u32(&bytes[6..10]),
                bytes[10],
                decode_block_wand_max_tf(bytes[11]),
            ),
        };
        self.block_info = BlockInfo::BitPacked {
            doc_num_bits,
            strict_delta_encoded,
            tf_num_bits,
            tf_sum,
            block_wand_fieldnorm_id,
            block_wand_term_freq,
        };
        self.owned_read.advance(entry_len);
    }

    /// Returns the description of the current block.
    pub fn block_info(&self) -> BlockInfo {
        self.block_info
    }

    /// Advance the skip reader to the block that may contain the target.
    ///
    /// If the target is larger than all documents, the skip reader
    /// advances to the last variable int block.
    ///
    /// Returns `true` if the reader moved to another block, `false` if it was
    /// already positioned on a suitable block.
    pub fn seek(&mut self, target: DocId) -> bool {
        let mut moved = false;
        while self.last_doc_in_block() < target {
            self.advance();
            moved = true;
        }
        moved
    }

    /// Moves to the next block, accounting for the block being left in
    /// `remaining_docs`, `byte_offset` and `position_offset`.
    pub fn advance(&mut self) {
        match self.block_info {
            BlockInfo::VInt { num_docs } => {
                debug_assert_eq!(num_docs, self.remaining_docs);
                self.remaining_docs = 0;
                self.byte_offset = usize::MAX;
            }
            BlockInfo::BitPacked {
                doc_num_bits,
                tf_num_bits,
                tf_sum,
                ..
            } => {
                self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32;
                self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits);
                self.position_offset += u64::from(tf_sum);
            }
        }
        self.last_doc_in_previous_block = self.last_doc_in_block;
        let has_full_block = self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32;
        if !has_full_block {
            // Only the trailing VInt block (possibly empty) is left.
            self.last_doc_in_block = TERMINATED;
            self.block_info = BlockInfo::VInt {
                num_docs: self.remaining_docs,
            };
            return;
        }
        self.read_block_info();
    }
}
// Unit tests for the skip entry encoding helpers, `SkipSerializer` and `SkipReader`.
#[cfg(test)]
mod tests {
use super::{
decode_bitwidth, encode_bitwidth, BlockInfo, IndexRecordOption, SkipReader, SkipSerializer,
};
use crate::directory::OwnedBytes;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
// Term frequencies below 255 are encoded as is; anything larger saturates to 255.
#[test]
fn test_encode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8);
}
for &tf in &[255, 256, 1_000_000, u32::MAX] {
assert_eq!(super::encode_block_wand_max_tf(tf), 255);
}
}
// Codes below 255 decode to themselves; 255 decodes to u32::MAX.
#[test]
fn test_decode_block_wand_max_tf() {
for tf in 0..255 {
assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32);
}
assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX);
}
// Two full blocks with term frequencies followed by a VInt block of 3 docs.
#[test]
fn test_skip_with_freq() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.write_term_freq(3u8);
skip_serializer.write_blockwand_max(13u8, 3u32);
skip_serializer.write_doc(5u32, 5u8);
skip_serializer.write_term_freq(2u8);
skip_serializer.write_blockwand_max(8u8, 2u32);
skip_serializer.data().to_owned()
};
let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::WithFreqs);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info,
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 3u8,
tf_sum: 0,
block_wand_fieldnorm_id: 13,
block_wand_term_freq: 3
}
);
skip_reader.advance();
assert_eq!(skip_reader.last_doc_in_block(), 5u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 5u8,
strict_delta_encoded: true,
tf_num_bits: 2u8,
tf_sum: 0,
block_wand_fieldnorm_id: 8,
block_wand_term_freq: 2
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 });
// Advancing past the last block keeps yielding an empty VInt block.
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
// Same scenario without term frequencies: all freq-related fields read as 0.
#[test]
fn test_skip_no_freq() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.write_doc(5u32, 5u8);
skip_serializer.data().to_owned()
};
let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::Basic);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.last_doc_in_block(), 5u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 5u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
// A doc freq that is exactly one block: the block is followed by an empty VInt block.
#[test]
fn test_skip_multiple_of_block_size() {
let buf = {
let mut skip_serializer = SkipSerializer::new();
skip_serializer.write_doc(1u32, 2u8);
skip_serializer.data().to_owned()
};
let doc_freq = COMPRESSION_BLOCK_SIZE as u32;
let mut skip_reader =
SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::Basic);
assert_eq!(skip_reader.last_doc_in_block(), 1u32);
assert_eq!(
skip_reader.block_info(),
BlockInfo::BitPacked {
doc_num_bits: 2u8,
strict_delta_encoded: true,
tf_num_bits: 0,
tf_sum: 0u32,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0
}
);
skip_reader.advance();
assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 });
}
// Round-trips every valid bitwidth with both flag values, then pins the bit layout.
#[test]
fn test_encode_decode_bitwidth() {
for bitwidth in 0..32 {
for delta_1 in [false, true] {
assert_eq!(
(bitwidth, delta_1),
decode_bitwidth(encode_bitwidth(bitwidth, delta_1))
);
}
}
assert_eq!(0b01000010, encode_bitwidth(0b10, true));
assert_eq!(0b00000010, encode_bitwidth(0b10, false));
}
}

View File

@@ -10,7 +10,7 @@ use crate::postings::TermInfo;
use crate::query::{BitSetDocSet, ConstScorer, Explanation, Scorer, Weight};
use crate::schema::{Field, IndexRecordOption};
use crate::termdict::{TermDictionary, TermStreamer};
use crate::{DocId, Score, TantivyError};
use crate::{DocId, DocSet, Score, TantivyError};
/// A weight struct for Fuzzy Term and Regex Queries
pub struct AutomatonWeight<A> {
@@ -92,18 +92,9 @@ where
let mut term_stream = self.automaton_stream(term_dict)?;
while term_stream.advance() {
let term_info = term_stream.value();
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic)?;
loop {
let docs = block_segment_postings.docs();
if docs.is_empty() {
break;
}
for &doc in docs {
doc_bitset.insert(doc);
}
block_segment_postings.advance();
}
let mut block_segment_postings =
inverted_index.read_postings_from_terminfo(term_info, IndexRecordOption::Basic)?;
block_segment_postings.fill_bitset(&mut doc_bitset);
}
let doc_bitset = BitSetDocSet::from(doc_bitset);
let const_scorer = ConstScorer::new(doc_bitset, boost);

View File

@@ -4,143 +4,143 @@ use crate::query::term_query::TermScorer;
use crate::query::Scorer;
use crate::{DocId, DocSet, Score, TERMINATED};
/// Takes `term_scorers` sorted by their current `doc()` and a threshold, and
/// returns `(before_pivot_len, pivot_len, pivot_doc)` defined as follows:
/// - `pivot_doc` lowest document that has a chance of exceeding (>) the threshold score.
/// - `before_pivot_len` number of term_scorers such that term_scorer.doc() < pivot.
/// - `pivot_len` number of term_scorers such that term_scorer.doc() <= pivot.
///
/// We always have `before_pivot_len` < `pivot_len`.
///
/// `None` is returned if we establish that no document can exceed the threshold.
fn find_pivot_doc(
    term_scorers: &[TermScorerWithMaxScore],
    threshold: Score,
) -> Option<(usize, usize, DocId)> {
    // Accumulate the max scores until their sum can beat the threshold.
    let mut max_score_sum = 0.0;
    let mut pivot = None;
    for (ord, term_scorer) in term_scorers.iter().enumerate() {
        max_score_sum += term_scorer.max_score;
        if max_score_sum > threshold {
            pivot = Some((ord, term_scorer.doc()));
            break;
        }
    }
    let (before_pivot_len, pivot_doc) = pivot?;
    if pivot_doc == TERMINATED {
        return None;
    }
    // Some other term_scorers may be positioned on the same document as the pivot:
    // they are part of the pivot range as well.
    let first_after_pivot = before_pivot_len + 1;
    let also_on_pivot = term_scorers[first_after_pivot..]
        .iter()
        .take_while(|term_scorer| term_scorer.doc() == pivot_doc)
        .count();
    Some((before_pivot_len, first_after_pivot + also_on_pivot, pivot_doc))
}
// /// Takes a term_scorers sorted by their current doc() and a threshold and returns
// /// Returns (pivot_len, pivot_ord) defined as follows:
// /// - `pivot_doc` lowest document that has a chance of exceeding (>) the threshold score.
// /// - `before_pivot_len` number of term_scorers such that term_scorer.doc() < pivot.
// /// - `pivot_len` number of term_scorers such that term_scorer.doc() <= pivot.
// ///
// /// We always have `before_pivot_len` < `pivot_len`.
// ///
// /// `None` is returned if we establish that no document can exceed the threshold.
// fn find_pivot_doc(
// term_scorers: &[TermScorerWithMaxScore],
// threshold: Score,
// ) -> Option<(usize, usize, DocId)> {
// let mut max_score = 0.0;
// let mut before_pivot_len = 0;
// let mut pivot_doc = TERMINATED;
// while before_pivot_len < term_scorers.len() {
// let term_scorer = &term_scorers[before_pivot_len];
// max_score += term_scorer.max_score;
// if max_score > threshold {
// pivot_doc = term_scorer.doc();
// break;
// }
// before_pivot_len += 1;
// }
// if pivot_doc == TERMINATED {
// return None;
// }
// // Right now i is an ordinal, we want a len.
// let mut pivot_len = before_pivot_len + 1;
// // Some other term_scorer may be positioned on the same document.
// pivot_len += term_scorers[pivot_len..]
// .iter()
// .take_while(|term_scorer| term_scorer.doc() == pivot_doc)
// .count();
// Some((before_pivot_len, pivot_len, pivot_doc))
// }
/// Advance the scorer with best score among the scorers[..pivot_len] to
/// the next doc candidate defined by the min of `last_doc_in_block + 1` for
/// scorer in scorers[..pivot_len] and `scorer.doc()` for scorer in scorers[pivot_len..].
/// Note: before and after calling this method, scorers need to be sorted by their `.doc()`.
fn block_max_was_too_low_advance_one_scorer(
scorers: &mut [TermScorerWithMaxScore],
pivot_len: usize,
) {
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
let mut scorer_to_seek = pivot_len - 1;
let mut global_max_score = scorers[scorer_to_seek].max_score;
let mut doc_to_seek_after = scorers[scorer_to_seek].last_doc_in_block();
for scorer_ord in (0..pivot_len - 1).rev() {
let scorer = &scorers[scorer_ord];
if scorer.last_doc_in_block() <= doc_to_seek_after {
doc_to_seek_after = scorer.last_doc_in_block();
}
if scorers[scorer_ord].max_score > global_max_score {
global_max_score = scorers[scorer_ord].max_score;
scorer_to_seek = scorer_ord;
}
}
// Add +1 to go to the next block unless we are already at the end.
if doc_to_seek_after != TERMINATED {
doc_to_seek_after += 1;
}
for scorer in &scorers[pivot_len..] {
if scorer.doc() <= doc_to_seek_after {
doc_to_seek_after = scorer.doc();
}
}
scorers[scorer_to_seek].seek(doc_to_seek_after);
// /// Advance the scorer with best score among the scorers[..pivot_len] to
// /// the next doc candidate defined by the min of `last_doc_in_block + 1` for
// /// scorer in scorers[..pivot_len] and `scorer.doc()` for scorer in scorers[pivot_len..].
// /// Note: before and after calling this method, scorers need to be sorted by their `.doc()`.
// fn block_max_was_too_low_advance_one_scorer(
// scorers: &mut [TermScorerWithMaxScore],
// pivot_len: usize,
// ) {
// debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
// let mut scorer_to_seek = pivot_len - 1;
// let mut global_max_score = scorers[scorer_to_seek].max_score;
// let mut doc_to_seek_after = scorers[scorer_to_seek].last_doc_in_block();
// for scorer_ord in (0..pivot_len - 1).rev() {
// let scorer = &scorers[scorer_ord];
// if scorer.last_doc_in_block() <= doc_to_seek_after {
// doc_to_seek_after = scorer.last_doc_in_block();
// }
// if scorers[scorer_ord].max_score > global_max_score {
// global_max_score = scorers[scorer_ord].max_score;
// scorer_to_seek = scorer_ord;
// }
// }
// // Add +1 to go to the next block unless we are already at the end.
// if doc_to_seek_after != TERMINATED {
// doc_to_seek_after += 1;
// }
// for scorer in &scorers[pivot_len..] {
// if scorer.doc() <= doc_to_seek_after {
// doc_to_seek_after = scorer.doc();
// }
// }
// scorers[scorer_to_seek].seek(doc_to_seek_after);
restore_ordering(scorers, scorer_to_seek);
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
}
// restore_ordering(scorers, scorer_to_seek);
// debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
// }
// Given a list of term_scorers and an `ord`, and assuming that `term_scorers` is sorted
// except for `term_scorers[ord]`, which might be in advance compared to its rank,
// bubble up `term_scorers[ord]` in order to restore the ordering.
fn restore_ordering(term_scorers: &mut [TermScorerWithMaxScore], ord: usize) {
    let doc = term_scorers[ord].doc();
    let mut pos = ord;
    // Swap the scorer with its right neighbor for as long as that neighbor is behind it.
    while pos + 1 < term_scorers.len() && term_scorers[pos + 1].doc() < doc {
        term_scorers.swap(pos + 1, pos);
        pos += 1;
    }
    debug_assert!(is_sorted(term_scorers.iter().map(|scorer| scorer.doc())));
}
// // Given a list of term_scorers and a `ord` and assuming that `term_scorers[ord]` is sorted
// // except term_scorers[ord] that might be in advance compared to its ranks,
// // bubble up term_scorers[ord] in order to restore the ordering.
// fn restore_ordering(term_scorers: &mut [TermScorerWithMaxScore], ord: usize) {
// let doc = term_scorers[ord].doc();
// for i in ord + 1..term_scorers.len() {
// if term_scorers[i].doc() >= doc {
// break;
// }
// term_scorers.swap(i, i - 1);
// }
// debug_assert!(is_sorted(term_scorers.iter().map(|scorer| scorer.doc())));
// }
// Attempts to advance all term_scorers in `&term_scorers[0..before_pivot_len]` to the pivot.
// If this works, returns `true`.
// If this fails (ie: one of the term_scorers does not contain `pivot_doc` and its seek goes
// past the pivot), reorders the term_scorers to ensure the list is still sorted and
// returns `false`.
// If a term_scorer reaches TERMINATED in the process, it is removed before reordering,
// and `false` is returned.
fn align_scorers(
    term_scorers: &mut Vec<TermScorerWithMaxScore>,
    pivot_doc: DocId,
    before_pivot_len: usize,
) -> bool {
    debug_assert_ne!(pivot_doc, TERMINATED);
    for i in (0..before_pivot_len).rev() {
        let new_doc = term_scorers[i].seek(pivot_doc);
        if new_doc == pivot_doc {
            continue;
        }
        // We went past the pivot.
        if new_doc == TERMINATED {
            term_scorers.swap_remove(i);
        }
        // We just go through the outer loop mechanic (Note that pivot is
        // still a possible candidate).
        //
        // Termination is still guaranteed since we can only consider the same
        // pivot at most term_scorers.len() - 1 times.
        restore_ordering(term_scorers, i);
        return false;
    }
    true
}
// // Attempts to advance all term_scorers between `&term_scorers[0..before_len]` to the pivot.
// // If this works, return true.
// // If this fails (ie: one of the term_scorer does not contain `pivot_doc` and seek goes past the
// // pivot), reorder the term_scorers to ensure the list is still sorted and returns `false`.
// // If a term_scorer reach TERMINATED in the process return false remove the term_scorer and
// return. fn align_scorers(
// term_scorers: &mut Vec<TermScorerWithMaxScore>,
// pivot_doc: DocId,
// before_pivot_len: usize,
// ) -> bool {
// debug_assert_ne!(pivot_doc, TERMINATED);
// for i in (0..before_pivot_len).rev() {
// let new_doc = term_scorers[i].seek(pivot_doc);
// if new_doc != pivot_doc {
// if new_doc == TERMINATED {
// term_scorers.swap_remove(i);
// }
// // We went past the pivot.
// // We just go through the outer loop mechanic (Note that pivot is
// // still a possible candidate).
// //
// // Termination is still guaranteed since we can only consider the same
// // pivot at most term_scorers.len() - 1 times.
// restore_ordering(term_scorers, i);
// return false;
// }
// }
// true
// }
// Assumes term_scorers[..pivot_len] are positioned on the same doc (pivot_doc).
// Advances term_scorers[..pivot_len], removes every scorer that is terminated,
// and restores the ordering of term_scorers.
fn advance_all_scorers_on_pivot(term_scorers: &mut Vec<TermScorerWithMaxScore>, pivot_len: usize) {
    term_scorers[..pivot_len].iter_mut().for_each(|term_scorer| {
        term_scorer.advance();
    });
    // TODO use drain_filter when available.
    let mut ord = 0;
    while ord < term_scorers.len() {
        if term_scorers[ord].doc() != TERMINATED {
            ord += 1;
            continue;
        }
        // `swap_remove` moves the last scorer into `ord`: it is examined on the next turn.
        term_scorers.swap_remove(ord);
    }
    term_scorers.sort_by_key(|scorer| scorer.doc());
}
// // Assumes terms_scorers[..pivot_len] are positioned on the same doc (pivot_doc).
// // Advance term_scorers[..pivot_len] and out of these removes the terminated scores.
// // Restores the ordering of term_scorers.
// fn advance_all_scorers_on_pivot(term_scorers: &mut Vec<TermScorerWithMaxScore>, pivot_len: usize)
// { for term_scorer in &mut term_scorers[..pivot_len] {
// term_scorer.advance();
// }
// // TODO use drain_filter when available.
// let mut i = 0;
// while i != term_scorers.len() {
// if term_scorers[i].doc() == TERMINATED {
// term_scorers.swap_remove(i);
// } else {
// i += 1;
// }
// }
// term_scorers.sort_by_key(|scorer| scorer.doc());
// }
/// Implements the WAND (Weak AND) algorithm for dynamic pruning
/// described in the paper "Faster Top-k Document Retrieval Using Block-Max Indexes".
@@ -150,65 +150,63 @@ pub fn block_wand(
mut threshold: Score,
callback: &mut dyn FnMut(u32, Score) -> Score,
) {
let mut scorers: Vec<TermScorerWithMaxScore> = scorers
.iter_mut()
.map(TermScorerWithMaxScore::from)
.collect();
scorers.sort_by_key(|scorer| scorer.doc());
// At this point we need to ensure that the scorers are sorted!
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
while let Some((before_pivot_len, pivot_len, pivot_doc)) =
find_pivot_doc(&scorers[..], threshold)
{
debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
debug_assert_ne!(pivot_doc, TERMINATED);
debug_assert!(before_pivot_len < pivot_len);
// let mut scorers: Vec<TermScorerWithMaxScore> = scorers
// .iter_mut()
// .map(TermScorerWithMaxScore::from)
// .collect();
// scorers.sort_by_key(|scorer| scorer.doc());
// // At this point we need to ensure that the scorers are sorted!
// debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
// while let Some((before_pivot_len, pivot_len, pivot_doc)) =
// find_pivot_doc(&scorers[..], threshold)
// {
// debug_assert!(is_sorted(scorers.iter().map(|scorer| scorer.doc())));
// debug_assert_ne!(pivot_doc, TERMINATED);
// debug_assert!(before_pivot_len < pivot_len);
let block_max_score_upperbound: Score = scorers[..pivot_len]
.iter_mut()
.map(|scorer| {
scorer.seek_block(pivot_doc);
scorer.block_max_score()
})
.sum();
// let block_max_score_upperbound: Score = scorers[..pivot_len]
// .iter_mut()
// .map(|scorer| scorer.seek_block(pivot_doc))
// .sum();
// Beware after shallow advance, skip readers can be in advance compared to
// the segment posting lists.
//
// `block_segment_postings.load_block()` need to be called separately.
if block_max_score_upperbound <= threshold {
// Block max condition was not reached
// We could get away by simply advancing the scorers to DocId + 1 but it would
// be inefficient. The optimization requires proper explanation and was
// isolated in a different function.
block_max_was_too_low_advance_one_scorer(&mut scorers, pivot_len);
continue;
}
// // Beware after shallow advance, skip readers can be in advance compared to
// // the segment posting lists.
// //
// // `block_segment_postings.load_block()` need to be called separately.
// if block_max_score_upperbound <= threshold {
// // Block max condition was not reached
// // We could get away by simply advancing the scorers to DocId + 1 but it would
// // be inefficient. The optimization requires proper explanation and was
// // isolated in a different function.
// block_max_was_too_low_advance_one_scorer(&mut scorers, pivot_len);
// continue;
// }
// Block max condition is observed.
//
// Let's try and advance all scorers before the pivot to the pivot.
if !align_scorers(&mut scorers, pivot_doc, before_pivot_len) {
// At least of the scorer does not contain the pivot.
//
// Let's stop scoring this pivot and go through the pivot selection again.
// Note that the current pivot is not necessarily a bad candidate and it
// may be picked again.
continue;
}
// // Block max condition is observed.
// //
// // Let's try and advance all scorers before the pivot to the pivot.
// if !align_scorers(&mut scorers, pivot_doc, before_pivot_len) {
// // At least of the scorer does not contain the pivot.
// //
// // Let's stop scoring this pivot and go through the pivot selection again.
// // Note that the current pivot is not necessarily a bad candidate and it
// // may be picked again.
// continue;
// }
// At this point, all scorers are positioned on the doc.
let score = scorers[..pivot_len]
.iter_mut()
.map(|scorer| scorer.score())
.sum();
// // At this point, all scorers are positioned on the doc.
// let score = scorers[..pivot_len]
// .iter_mut()
// .map(|scorer| scorer.score())
// .sum();
if score > threshold {
threshold = callback(pivot_doc, score);
}
// let's advance all of the scorers that are currently positioned on the pivot.
advance_all_scorers_on_pivot(&mut scorers, pivot_len);
}
// if score > threshold {
// threshold = callback(pivot_doc, score);
// }
// // let's advance all of the scorers that are currently positioned on the pivot.
// advance_all_scorers_on_pivot(&mut scorers, pivot_len);
// }
todo!();
}
/// Specialized version of [`block_wand`] for a single scorer.
@@ -224,40 +222,42 @@ pub fn block_wand_single_scorer(
mut threshold: Score,
callback: &mut dyn FnMut(u32, Score) -> Score,
) {
let mut doc = scorer.doc();
loop {
// We position the scorer on a block that can reach
// the threshold.
while scorer.block_max_score() < threshold {
let last_doc_in_block = scorer.last_doc_in_block();
if last_doc_in_block == TERMINATED {
return;
}
doc = last_doc_in_block + 1;
scorer.seek_block(doc);
}
// Seek will effectively load that block.
doc = scorer.seek(doc);
if doc == TERMINATED {
break;
}
loop {
let score = scorer.score();
if score > threshold {
threshold = callback(doc, score);
}
debug_assert!(doc <= scorer.last_doc_in_block());
if doc == scorer.last_doc_in_block() {
break;
}
doc = scorer.advance();
if doc == TERMINATED {
return;
}
}
doc += 1;
scorer.seek_block(doc);
}
todo!();
// let mut doc = scorer.doc();
// let mut block_max_score = scorer.seek_block(doc);
// loop {
// // We position the scorer on a block that can reach
// // the threshold.
// while block_max_score < threshold {
// let last_doc_in_block = scorer.last_doc_in_block();
// if last_doc_in_block == TERMINATED {
// return;
// }
// doc = last_doc_in_block + 1;
// block_max_score = scorer.seek_block(doc);
// }
// // Seek will effectively load that block.
// doc = scorer.seek(doc);
// if doc == TERMINATED {
// break;
// }
// loop {
// let score = scorer.score();
// if score > threshold {
// threshold = callback(doc, score);
// }
// debug_assert!(doc <= scorer.last_doc_in_block());
// if doc == scorer.last_doc_in_block() {
// break;
// }
// doc = scorer.advance();
// if doc == TERMINATED {
// return;
// }
// }
// doc += 1;
// block_max_score = scorer.seek_block(doc);
// }
}
struct TermScorerWithMaxScore<'a> {
@@ -305,6 +305,7 @@ mod tests {
use proptest::prelude::*;
use crate::postings::SegmentPostings;
use crate::query::score_combiner::SumCombiner;
use crate::query::term_query::TermScorer;
use crate::query::{Bm25Weight, BufferedUnionScorer, Scorer};

View File

@@ -24,7 +24,7 @@ mod reqopt_scorer;
mod scorer;
mod set_query;
mod size_hint;
mod term_query;
pub(crate) mod term_query;
mod union;
mod weight;

View File

@@ -1,7 +1,7 @@
use super::{prefix_end, PhrasePrefixScorer};
use crate::fieldnorm::FieldNormReader;
use crate::index::SegmentReader;
use crate::postings::SegmentPostings;
use crate::postings::Postings;
use crate::query::bm25::Bm25Weight;
use crate::query::explanation::does_not_match;
use crate::query::{EmptyScorer, Explanation, Scorer, Weight};
@@ -46,13 +46,13 @@ impl PhrasePrefixWeight {
&self,
reader: &SegmentReader,
boost: Score,
) -> crate::Result<Option<PhrasePrefixScorer<SegmentPostings>>> {
) -> crate::Result<Option<Box<dyn Scorer>>> {
let similarity_weight_opt = self
.similarity_weight_opt
.as_ref()
.map(|similarity_weight| similarity_weight.boost_by(boost));
let fieldnorm_reader = self.fieldnorm_reader(reader)?;
let mut term_postings_list = Vec::new();
let mut term_postings_list: Vec<(usize, Box<dyn Postings>)> = Vec::new();
for &(offset, ref term) in &self.phrase_terms {
if let Some(postings) = reader
.inverted_index(term.field())?
@@ -103,42 +103,44 @@ impl PhrasePrefixWeight {
}
}
Ok(Some(PhrasePrefixScorer::new(
// TODO make this specialized.
Ok(Some(Box::new(PhrasePrefixScorer::new(
term_postings_list,
similarity_weight_opt,
fieldnorm_reader,
suffixes,
self.prefix.0,
)))
))))
}
}
// `Weight` implementation for phrase-prefix queries: builds the per-segment scorer and
// (normally) the score explanation for a single document.
impl Weight for PhrasePrefixWeight {
/// Returns the scorer produced by `phrase_scorer` for this segment, or an `EmptyScorer`
/// when `phrase_scorer` yields `None`.
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
if let Some(scorer) = self.phrase_scorer(reader, boost)? {
Ok(Box::new(scorer))
Ok(scorer)
} else {
Ok(Box::new(EmptyScorer))
}
}
/// Explains the score of `doc`, or fails with `does_not_match(doc)` when the phrase
/// scorer is absent or cannot be positioned on `doc`.
fn explain(&self, reader: &SegmentReader, doc: DocId) -> crate::Result<Explanation> {
let scorer_opt = self.phrase_scorer(reader, 1.0)?;
if scorer_opt.is_none() {
return Err(does_not_match(doc));
}
let mut scorer = scorer_opt.unwrap();
if scorer.seek(doc) != doc {
return Err(does_not_match(doc));
}
let fieldnorm_reader = self.fieldnorm_reader(reader)?;
let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc);
let phrase_count = scorer.phrase_count();
let mut explanation = Explanation::new("Phrase Prefix Scorer", scorer.score());
if let Some(similarity_weight) = self.similarity_weight_opt.as_ref() {
explanation.add_detail(similarity_weight.explain(fieldnorm_id, phrase_count));
}
Ok(explanation)
// NOTE(review): `todo!()` panics as soon as it is reached, so calling `explain` aborts
// instead of returning an `Explanation`. The body kept below in comments calls
// `scorer.phrase_count()`, which is presumably no longer reachable now that
// `phrase_scorer` returns a `Box<dyn Scorer>` -- confirm and restore before shipping.
todo!();
// let scorer_opt = self.phrase_scorer(reader, 1.0)?;
// if scorer_opt.is_none() {
// return Err(does_not_match(doc));
// }
// let mut scorer = scorer_opt.unwrap();
// if scorer.seek(doc) != doc {
// return Err(does_not_match(doc));
// }
// let fieldnorm_reader = self.fieldnorm_reader(reader)?;
// let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc);
// let phrase_count = scorer.phrase_count();
// let mut explanation = Explanation::new("Phrase Prefix Scorer", scorer.score());
// if let Some(similarity_weight) = self.similarity_weight_opt.as_ref() {
// explanation.add_detail(similarity_weight.explain(fieldnorm_id, phrase_count));
// }
// Ok(explanation)
}
}

View File

@@ -1,7 +1,6 @@
use super::PhraseScorer;
use crate::fieldnorm::FieldNormReader;
use crate::index::SegmentReader;
use crate::postings::SegmentPostings;
use crate::query::bm25::Bm25Weight;
use crate::query::explanation::does_not_match;
use crate::query::{EmptyScorer, Explanation, Scorer, Weight};
@@ -43,13 +42,14 @@ impl PhraseWeight {
&self,
reader: &SegmentReader,
boost: Score,
) -> crate::Result<Option<PhraseScorer<SegmentPostings>>> {
) -> crate::Result<Option<Box<dyn Scorer>>> {
let similarity_weight_opt = self
.similarity_weight_opt
.as_ref()
.map(|similarity_weight| similarity_weight.boost_by(boost));
let fieldnorm_reader = self.fieldnorm_reader(reader)?;
let mut term_postings_list = Vec::new();
// TODO make it specialized
for &(offset, ref term) in &self.phrase_terms {
if let Some(postings) = reader
.inverted_index(term.field())?
@@ -60,12 +60,12 @@ impl PhraseWeight {
return Ok(None);
}
}
Ok(Some(PhraseScorer::new(
Ok(Some(Box::new(PhraseScorer::new(
term_postings_list,
similarity_weight_opt,
fieldnorm_reader,
self.slop,
)))
))))
}
pub fn slop(&mut self, slop: u32) {
@@ -76,29 +76,30 @@ impl PhraseWeight {
// `Weight` implementation for phrase queries: builds the per-segment scorer and
// (normally) the score explanation for a single document.
impl Weight for PhraseWeight {
/// Returns the scorer produced by `phrase_scorer` for this segment, or an `EmptyScorer`
/// when `phrase_scorer` yields `None`.
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
if let Some(scorer) = self.phrase_scorer(reader, boost)? {
Ok(Box::new(scorer))
Ok(scorer)
} else {
Ok(Box::new(EmptyScorer))
}
}
/// Explains the score of `doc`, or fails with `does_not_match(doc)` when the phrase
/// scorer is absent or cannot be positioned on `doc`.
fn explain(&self, reader: &SegmentReader, doc: DocId) -> crate::Result<Explanation> {
let scorer_opt = self.phrase_scorer(reader, 1.0)?;
if scorer_opt.is_none() {
return Err(does_not_match(doc));
}
let mut scorer = scorer_opt.unwrap();
if scorer.seek(doc) != doc {
return Err(does_not_match(doc));
}
let fieldnorm_reader = self.fieldnorm_reader(reader)?;
let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc);
let phrase_count = scorer.phrase_count();
let mut explanation = Explanation::new("Phrase Scorer", scorer.score());
if let Some(similarity_weight) = self.similarity_weight_opt.as_ref() {
explanation.add_detail(similarity_weight.explain(fieldnorm_id, phrase_count));
}
Ok(explanation)
// NOTE(review): `todo!()` panics as soon as it is reached, so calling `explain` aborts
// instead of returning an `Explanation`. The body kept below in comments calls
// `scorer.phrase_count()`, which is presumably no longer reachable now that
// `phrase_scorer` returns a `Box<dyn Scorer>` -- confirm and restore before shipping.
todo!();
// let scorer_opt = self.phrase_scorer(reader, 1.0)?;
// if scorer_opt.is_none() {
// return Err(does_not_match(doc));
// }
// let mut scorer = scorer_opt.unwrap();
// if scorer.seek(doc) != doc {
// return Err(does_not_match(doc));
// }
// let fieldnorm_reader = self.fieldnorm_reader(reader)?;
// let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc);
// let phrase_count = scorer.phrase_count();
// let mut explanation = Explanation::new("Phrase Scorer", scorer.score());
// if let Some(similarity_weight) = self.similarity_weight_opt.as_ref() {
// explanation.add_detail(similarity_weight.explain(fieldnorm_id, phrase_count));
// }
// Ok(explanation)
}
}

View File

@@ -6,7 +6,7 @@ use tantivy_fst::Regex;
use super::PhraseScorer;
use crate::fieldnorm::FieldNormReader;
use crate::index::SegmentReader;
use crate::postings::{LoadedPostings, Postings, SegmentPostings, TermInfo};
use crate::postings::{LoadedPostings, Postings, TermInfo};
use crate::query::bm25::Bm25Weight;
use crate::query::explanation::does_not_match;
use crate::query::union::{BitSetPostingUnion, SimpleUnion};
@@ -103,18 +103,9 @@ impl RegexPhraseWeight {
term_info: &TermInfo,
doc_bitset: &mut BitSet,
) -> crate::Result<()> {
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic)?;
loop {
let docs = block_segment_postings.docs();
if docs.is_empty() {
break;
}
for &doc in docs {
doc_bitset.insert(doc);
}
block_segment_postings.advance();
}
let mut segment_postings =
inverted_index.read_postings_from_terminfo(term_info, IndexRecordOption::Basic)?;
segment_postings.fill_bitset(doc_bitset);
Ok(())
}
@@ -188,7 +179,7 @@ impl RegexPhraseWeight {
// - Bucket 1: Terms appearing in 0.1% to 1% of documents
// - Bucket 2: Terms appearing in 1% to 10% of documents
// - Bucket 3: Terms appearing in more than 10% of documents
let mut buckets: Vec<(BitSet, Vec<SegmentPostings>)> = (0..4)
let mut buckets: Vec<(BitSet, Vec<Box<dyn Postings>>)> = (0..4)
.map(|_| (BitSet::with_max_value(max_doc), Vec::new()))
.collect();

View File

@@ -11,7 +11,7 @@ use crate::query::range_query::is_type_valid_for_fastfield_range_query;
use crate::query::{BitSetDocSet, ConstScorer, EnableScoring, Explanation, Query, Scorer, Weight};
use crate::schema::{Field, IndexRecordOption, Term, Type};
use crate::termdict::{TermDictionary, TermStreamer};
use crate::{DocId, Score};
use crate::{DocId, DocSet, Score};
/// `RangeQuery` matches all documents that have at least one term within a defined range.
///
@@ -228,18 +228,9 @@ impl Weight for InvertedIndexRangeWeight {
}
processed_count += 1;
let term_info = term_range.value();
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic)?;
loop {
let docs = block_segment_postings.docs();
if docs.is_empty() {
break;
}
for &doc in block_segment_postings.docs() {
doc_bitset.insert(doc);
}
block_segment_postings.advance();
}
let mut postings =
inverted_index.read_postings_from_terminfo(term_info, IndexRecordOption::Basic)?;
postings.fill_bitset(&mut doc_bitset);
}
let doc_bitset = BitSetDocSet::from(doc_bitset);
Ok(Box::new(ConstScorer::new(doc_bitset, boost)))

View File

@@ -4,6 +4,7 @@ mod term_weight;
pub use self::term_query::TermQuery;
pub use self::term_scorer::TermScorer;
#[cfg(test)]
mod tests {

View File

@@ -1,23 +1,27 @@
use crate::codec::postings::PostingsCodec;
use crate::codec::{Codec, StandardCodec};
use crate::docset::DocSet;
use crate::fieldnorm::FieldNormReader;
use crate::postings::{FreqReadingOption, Postings, SegmentPostings};
use crate::postings::{FreqReadingOption, Postings};
use crate::query::bm25::Bm25Weight;
use crate::query::{Explanation, Scorer};
use crate::{DocId, Score};
#[derive(Clone)]
pub struct TermScorer {
postings: SegmentPostings,
pub struct TermScorer<
TPostings: Postings = <<StandardCodec as Codec>::PostingsCodec as PostingsCodec>::Postings,
> {
postings: TPostings,
fieldnorm_reader: FieldNormReader,
similarity_weight: Bm25Weight,
}
impl TermScorer {
impl<TPostings: Postings> TermScorer<TPostings> {
pub fn new(
postings: SegmentPostings,
postings: TPostings,
fieldnorm_reader: FieldNormReader,
similarity_weight: Bm25Weight,
) -> TermScorer {
) -> TermScorer<TPostings> {
TermScorer {
postings,
fieldnorm_reader,
@@ -25,53 +29,9 @@ impl TermScorer {
}
}
pub(crate) fn seek_block(&mut self, target_doc: DocId) {
self.postings.block_cursor.seek_block(target_doc);
}
#[cfg(test)]
pub fn create_for_test(
doc_and_tfs: &[(DocId, u32)],
fieldnorms: &[u32],
similarity_weight: Bm25Weight,
) -> TermScorer {
assert!(!doc_and_tfs.is_empty());
assert!(
doc_and_tfs
.iter()
.map(|(doc, _tf)| *doc)
.max()
.unwrap_or(0u32)
< fieldnorms.len() as u32
);
let segment_postings =
SegmentPostings::create_from_docs_and_tfs(doc_and_tfs, Some(fieldnorms));
let fieldnorm_reader = FieldNormReader::for_test(fieldnorms);
TermScorer::new(segment_postings, fieldnorm_reader, similarity_weight)
}
/// See `FreqReadingOption`.
pub(crate) fn freq_reading_option(&self) -> FreqReadingOption {
self.postings.block_cursor.freq_reading_option()
}
/// Returns the maximum score for the current block.
///
/// In some rare case, the result may not be exact. In this case a lower value is returned,
/// (and may lead us to return a lesser document).
///
/// At index time, we store the (fieldnorm_id, term frequency) pair that maximizes the
/// score assuming the average fieldnorm computed on this segment.
///
/// Though extremely rare, it is theoretically possible that the actual average fieldnorm
/// is different enough from the current segment average fieldnorm that the maximum over a
/// specific is achieved on a different document.
///
/// (The result is on the other hand guaranteed to be correct if there is only one segment).
pub fn block_max_score(&mut self) -> Score {
self.postings
.block_cursor
.block_max_score(&self.fieldnorm_reader, &self.similarity_weight)
self.postings.freq_reading_option()
}
pub fn term_freq(&self) -> u32 {
@@ -93,11 +53,39 @@ impl TermScorer {
}
pub fn last_doc_in_block(&self) -> DocId {
self.postings.block_cursor.skip_reader().last_doc_in_block()
self.postings.last_doc_in_block()
}
/// Forwards `target_doc` to the underlying postings' `seek_block`, handing it this
/// scorer's fieldnorm reader and BM25 weight, and returns the score it reports.
///
/// NOTE(review): presumably the returned value is the block-max score of the block
/// that contains `target_doc` -- confirm against the `Postings` trait.
pub(crate) fn seek_block(&mut self, target_doc: DocId) -> Score {
    // Borrow the two read-only fields up front; they are disjoint from `postings`,
    // which is the only field that needs mutable access.
    let fieldnorms = &self.fieldnorm_reader;
    let bm25_weight = &self.similarity_weight;
    self.postings.seek_block(target_doc, fieldnorms, bm25_weight)
}
}
impl DocSet for TermScorer {
impl TermScorer {
    /// Builds a `TermScorer` over postings created in memory from `doc_and_tfs`,
    /// for use in unit tests.
    ///
    /// * `doc_and_tfs` - `(doc, term_freq)` pairs used to create the postings.
    /// * `fieldnorms` - fieldnorm values; every doc id in `doc_and_tfs` must be a valid
    ///   index into this slice.
    /// * `similarity_weight` - BM25 weight handed to the scorer.
    ///
    /// # Panics
    ///
    /// Panics if `doc_and_tfs` is empty, or if its largest doc id is not smaller than
    /// `fieldnorms.len()`.
    #[cfg(test)]
    pub fn create_for_test(
        doc_and_tfs: &[(DocId, u32)],
        fieldnorms: &[u32],
        similarity_weight: Bm25Weight,
    ) -> TermScorer {
        // `SegmentPostings` is not imported by this module, so the postings type is named
        // through the standard codec. This is the same projection used as the default
        // type parameter of `TermScorer`, which is the type this function returns.
        type StandardPostings =
            <<StandardCodec as Codec>::PostingsCodec as PostingsCodec>::Postings;

        assert!(!doc_and_tfs.is_empty());
        let max_doc = doc_and_tfs
            .iter()
            .map(|&(doc, _tf)| doc)
            .max()
            .unwrap_or(0u32);
        // Widen the doc id instead of narrowing the length, so the comparison cannot
        // be affected by truncation.
        assert!((max_doc as usize) < fieldnorms.len());

        let postings: StandardPostings =
            StandardPostings::create_from_docs_and_tfs(doc_and_tfs, Some(fieldnorms));
        let fieldnorm_reader = FieldNormReader::for_test(fieldnorms);
        TermScorer::new(postings, fieldnorm_reader, similarity_weight)
    }
}
impl<TPostings: Postings> DocSet for TermScorer<TPostings> {
#[inline]
fn advance(&mut self) -> DocId {
self.postings.advance()
@@ -118,7 +106,7 @@ impl DocSet for TermScorer {
}
}
impl Scorer for TermScorer {
impl<TPostings: Postings> Scorer for TermScorer<TPostings> {
#[inline]
fn score(&mut self) -> Score {
let fieldnorm_id = self.fieldnorm_id();
@@ -154,7 +142,7 @@ mod tests {
crate::assert_nearly_equals!(max_scorer, 1.3990127);
assert_eq!(term_scorer.doc(), 2);
assert_eq!(term_scorer.term_freq(), 3);
assert_nearly_equals!(term_scorer.block_max_score(), 1.3676447);
assert_nearly_equals!(term_scorer.seek_block(2), 1.3676447);
assert_nearly_equals!(term_scorer.score(), 1.0892314);
assert_eq!(term_scorer.advance(), 3);
assert_eq!(term_scorer.doc(), 3);
@@ -217,7 +205,7 @@ mod tests {
let docs: Vec<DocId> = (0..term_doc_freq).map(|doc| doc as DocId).collect();
for block in docs.chunks(COMPRESSION_BLOCK_SIZE) {
let block_max_score: Score = term_scorer.block_max_score();
let block_max_score: Score = term_scorer.seek_block(0);
let mut block_max_score_computed: Score = 0.0;
for &doc in block {
assert_eq!(term_scorer.doc(), doc);
@@ -245,14 +233,12 @@ mod tests {
let fieldnorms: Vec<u32> = std::iter::repeat_n(20u32, 300).collect();
let bm25_weight = Bm25Weight::for_one_term(10, 129, 20.0);
let mut docs = TermScorer::create_for_test(&doc_tfs[..], &fieldnorms[..], bm25_weight);
assert_nearly_equals!(docs.block_max_score(), 2.5161593);
docs.seek_block(135);
assert_nearly_equals!(docs.block_max_score(), 3.4597192);
docs.seek_block(256);
assert_nearly_equals!(docs.seek_block(0), 2.5161593);
assert_nearly_equals!(docs.seek_block(135), 3.4597192);
// the block is not loaded yet.
assert_nearly_equals!(docs.block_max_score(), 5.2971773);
assert_nearly_equals!(docs.seek_block(256), 5.2971773);
assert_eq!(256, docs.seek(256));
assert_nearly_equals!(docs.block_max_score(), 3.9539647);
assert_nearly_equals!(docs.seek_block(256), 3.9539647);
}
fn test_block_wand_aux(term_query: &TermQuery, searcher: &Searcher) -> crate::Result<()> {
@@ -279,8 +265,8 @@ mod tests {
{
let mut term_scorer = term_weight.term_scorer_for_test(reader, 1.0)?.unwrap();
for d in docs {
term_scorer.seek_block(d);
block_max_scores_b.push(term_scorer.block_max_score());
let block_max_score = term_scorer.seek_block(d);
block_max_scores_b.push(block_max_score);
}
}
for (l, r) in block_max_scores

View File

@@ -2,7 +2,7 @@ use super::term_scorer::TermScorer;
use crate::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN};
use crate::fieldnorm::FieldNormReader;
use crate::index::SegmentReader;
use crate::postings::SegmentPostings;
use crate::postings::Postings;
use crate::query::bm25::Bm25Weight;
use crate::query::explanation::does_not_match;
use crate::query::weight::{for_each_docset_buffered, for_each_scorer};
@@ -18,7 +18,7 @@ pub struct TermWeight {
}
enum TermOrEmptyOrAllScorer {
TermScorer(Box<TermScorer>),
TermScorer(Box<dyn Scorer>),
Empty,
AllMatch(AllScorer),
}
@@ -39,18 +39,19 @@ impl Weight for TermWeight {
}
/// Explains the score of `doc` for this term weight, dispatching on the variant returned
/// by `specialized_scorer`.
fn explain(&self, reader: &SegmentReader, doc: DocId) -> crate::Result<Explanation> {
match self.specialized_scorer(reader, 1.0)? {
TermOrEmptyOrAllScorer::TermScorer(mut term_scorer) => {
if term_scorer.doc() > doc || term_scorer.seek(doc) != doc {
return Err(does_not_match(doc));
}
let mut explanation = term_scorer.explain();
explanation.add_context(format!("Term={:?}", self.term,));
Ok(explanation)
}
TermOrEmptyOrAllScorer::Empty => Err(does_not_match(doc)),
TermOrEmptyOrAllScorer::AllMatch(_) => AllWeight.explain(reader, doc),
}
// NOTE(review): `todo!()` panics as soon as it is reached, so calling `explain` aborts
// instead of returning an `Explanation`. The body kept below in comments calls
// `term_scorer.explain()`, which is presumably not available on the boxed `dyn Scorer`
// now stored in `TermOrEmptyOrAllScorer::TermScorer` -- confirm and restore before shipping.
todo!();
// match self.specialized_scorer(reader, 1.0)? {
// TermOrEmptyOrAllScorer::TermScorer(mut term_scorer) => {
// if term_scorer.doc() > doc || term_scorer.seek(doc) != doc {
// return Err(does_not_match(doc));
// }
// let mut explanation = term_scorer.explain();
// explanation.add_context(format!("Term={:?}", self.term,));
// Ok(explanation)
// }
// TermOrEmptyOrAllScorer::Empty => Err(does_not_match(doc)),
// TermOrEmptyOrAllScorer::AllMatch(_) => AllWeight.explain(reader, doc),
// }
}
fn count(&self, reader: &SegmentReader) -> crate::Result<u32> {
@@ -124,11 +125,12 @@ impl Weight for TermWeight {
let specialized_scorer = self.specialized_scorer(reader, 1.0)?;
match specialized_scorer {
TermOrEmptyOrAllScorer::TermScorer(term_scorer) => {
crate::query::boolean_query::block_wand_single_scorer(
*term_scorer,
threshold,
callback,
);
todo!();
// crate::query::boolean_query::block_wand_single_scorer(
// *term_scorer,
// threshold,
// callback,
// );
}
TermOrEmptyOrAllScorer::Empty => {}
TermOrEmptyOrAllScorer::AllMatch(_) => {
@@ -168,10 +170,10 @@ impl TermWeight {
&self,
reader: &SegmentReader,
boost: Score,
) -> crate::Result<Option<TermScorer>> {
) -> crate::Result<Option<Box<dyn Scorer>>> {
let scorer = self.specialized_scorer(reader, boost)?;
Ok(match scorer {
TermOrEmptyOrAllScorer::TermScorer(scorer) => Some(*scorer),
TermOrEmptyOrAllScorer::TermScorer(scorer) => Some(scorer),
_ => None,
})
}
@@ -196,14 +198,14 @@ impl TermWeight {
)));
}
let segment_postings: SegmentPostings =
let segment_postings: Box<dyn Postings> =
inverted_index.read_postings_from_terminfo(&term_info, self.index_record_option)?;
let fieldnorm_reader = self.fieldnorm_reader(reader)?;
let similarity_weight = self.similarity_weight.boost_by(boost);
Ok(TermOrEmptyOrAllScorer::TermScorer(Box::new(
TermScorer::new(segment_postings, fieldnorm_reader, similarity_weight),
)))
Ok(TermOrEmptyOrAllScorer::TermScorer(
segment_postings.new_term_scorer(fieldnorm_reader, similarity_weight),
))
}
fn fieldnorm_reader(&self, segment_reader: &SegmentReader) -> crate::Result<FieldNormReader> {

View File

@@ -1,7 +1,7 @@
use std::cell::RefCell;
use crate::docset::DocSet;
use crate::postings::Postings;
use crate::postings::{FreqReadingOption, Postings};
use crate::query::BitSetDocSet;
use crate::DocId;
@@ -31,6 +31,15 @@ impl<TDocSet: DocSet> BitSetPostingUnion<TDocSet> {
}
impl<TDocSet: Postings> Postings for BitSetPostingUnion<TDocSet> {
/// Consumes the boxed union and wraps it, together with `fieldnorm_reader` and
/// `similarity_weight`, into a boxed `TermScorer`.
fn new_term_scorer(
    self: Box<Self>,
    fieldnorm_reader: crate::fieldnorm::FieldNormReader,
    similarity_weight: crate::query::Bm25Weight,
) -> Box<dyn crate::query::Scorer> {
    // Move the union out of its box so the scorer owns it by value.
    let union_postings = *self;
    let term_scorer = crate::query::term_query::TermScorer::new(
        union_postings,
        fieldnorm_reader,
        similarity_weight,
    );
    Box::new(term_scorer)
}
fn term_freq(&self) -> u32 {
let curr_doc = self.bitset.doc();
let mut term_freq = 0;
@@ -46,6 +55,10 @@ impl<TDocSet: Postings> Postings for BitSetPostingUnion<TDocSet> {
term_freq
}
/// Always reports `FreqReadingOption::ReadFreq`; the wrapped docsets are not consulted.
fn freq_reading_option(&self) -> FreqReadingOption {
FreqReadingOption::ReadFreq
}
fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let curr_doc = self.bitset.doc();
let mut docsets = self.docsets.borrow_mut();

View File

@@ -1,5 +1,5 @@
use crate::docset::{DocSet, TERMINATED};
use crate::postings::Postings;
use crate::postings::{FreqReadingOption, Postings};
use crate::DocId;
/// A `SimpleUnion` is a `DocSet` that is the union of multiple `DocSet`.
@@ -45,6 +45,15 @@ impl<TDocSet: DocSet> SimpleUnion<TDocSet> {
}
impl<TDocSet: Postings> Postings for SimpleUnion<TDocSet> {
/// Consumes the boxed union and wraps it, together with `fieldnorm_reader` and
/// `similarity_weight`, into a boxed `TermScorer`.
fn new_term_scorer(
    self: Box<Self>,
    fieldnorm_reader: crate::fieldnorm::FieldNormReader,
    similarity_weight: crate::query::Bm25Weight,
) -> Box<dyn crate::query::Scorer> {
    // Move the union out of its box so the scorer owns it by value.
    let union_postings = *self;
    let term_scorer = crate::query::term_query::TermScorer::new(
        union_postings,
        fieldnorm_reader,
        similarity_weight,
    );
    Box::new(term_scorer)
}
fn term_freq(&self) -> u32 {
let mut term_freq = 0;
for docset in &self.docsets {
@@ -56,6 +65,10 @@ impl<TDocSet: Postings> Postings for SimpleUnion<TDocSet> {
term_freq
}
/// Always reports `FreqReadingOption::ReadFreq`; the wrapped docsets are not consulted.
fn freq_reading_option(&self) -> FreqReadingOption {
FreqReadingOption::ReadFreq
}
fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
for docset in &mut self.docsets {
let doc = docset.doc();