diff --git a/examples/iterating_docs_and_positions.rs b/examples/iterating_docs_and_positions.rs index 36bc4371c..d08b66a09 100644 --- a/examples/iterating_docs_and_positions.rs +++ b/examples/iterating_docs_and_positions.rs @@ -7,6 +7,7 @@ // the list of documents containing a term, getting // its term frequency, and accessing its positions. +use tantivy::codec::postings::PostingsReader as _; use tantivy::postings::Postings; // --- // Importing tantivy... diff --git a/src/codec/postings/mod.rs b/src/codec/postings/mod.rs index 1a547eea1..528461da5 100644 --- a/src/codec/postings/mod.rs +++ b/src/codec/postings/mod.rs @@ -19,7 +19,7 @@ pub trait PostingsCodec { fieldnorm_reader: Option, ) -> Self::PostingsSerializer; - /// Opens a `BlockSegmentPostings`. + /// Opens a `PostingsReader`. /// `doc_freq` is the number of documents in the posting list. /// `record_option` represents the amount of data available according to the schema. /// `requested_option` is the amount of data requested by the user. @@ -46,7 +46,9 @@ pub trait PostingsSerializer { // TODO docs // TODO Add blockwand trait -pub trait PostingsReader: Sized { +pub trait PostingsReader { + fn box_clone(&self) -> Box; + fn freq_reading_option(&self) -> FreqReadingOption; fn reset(&mut self, doc_freq: u32, postings_data: OwnedBytes) -> io::Result<()>; @@ -69,9 +71,6 @@ pub trait PostingsReader: Sized { fn advance(&mut self); - // TODO Move to the codec and use the serializer. - fn empty() -> Self; - fn block_max_score( &mut self, fieldnorm_reader: &FieldNormReader, diff --git a/src/codec/standard/postings/mod.rs b/src/codec/standard/postings/mod.rs index 74baa746c..463d05255 100644 --- a/src/codec/standard/postings/mod.rs +++ b/src/codec/standard/postings/mod.rs @@ -4,9 +4,9 @@ use crate::schema::IndexRecordOption; use crate::Score; mod block; +mod skip; mod standard_postings_reader; mod standard_postings_serializer; -mod skip; pub use standard_postings_reader::StandardPostingsReader; pub use standard_postings_serializer::StandardPostingsSerializer; diff --git a/src/codec/standard/postings/standard_postings_reader.rs b/src/codec/standard/postings/standard_postings_reader.rs index d9f2b9ca4..1e97ab46c 100644 --- a/src/codec/standard/postings/standard_postings_reader.rs +++ b/src/codec/standard/postings/standard_postings_reader.rs @@ -15,7 +15,7 @@ fn max_score>(mut it: I) -> Option { it.next().map(|first| it.fold(first, Score::max)) } -/// `BlockSegmentPostings` is a cursor iterating over blocks +/// `StandardPostingsReader` is a cursor iterating over blocks /// of documents. /// /// # Warning @@ -89,7 +89,7 @@ fn split_into_skips_and_postings( } impl StandardPostingsReader { - /// Opens a `BlockSegmentPostings`. + /// Opens a `StandardPostingsReader`. /// `doc_freq` is the number of documents in the posting list. /// `record_option` represents the amount of data available according to the schema. /// `requested_option` is the amount of data requested by the user. @@ -150,7 +150,7 @@ impl PostingsReader for StandardPostingsReader { // // This is useful for enumerating through a list of terms, // and consuming the associated posting lists while avoiding - // reallocating a `BlockSegmentPostings`. + // reallocating a `StandardPostingsReader`. // // # Warning // @@ -186,7 +186,7 @@ impl PostingsReader for StandardPostingsReader { /// returned by `.docs()` is empty. #[inline] fn docs(&self) -> &[DocId] { - debug_assert!(self.block_is_loaded()); + debug_assert!(self.block_loaded); self.doc_decoder.output_array() } @@ -199,14 +199,14 @@ impl PostingsReader for StandardPostingsReader { /// Return the array of `term freq` in the block. #[inline] fn freqs(&self) -> &[u32] { - debug_assert!(self.block_is_loaded()); + debug_assert!(self.block_loaded); self.freq_decoder.output_array() } /// Return the frequency at index `idx` of the block. #[inline] fn freq(&self, idx: usize) -> u32 { - debug_assert!(self.block_is_loaded()); + debug_assert!(self.block_loaded); self.freq_decoder.output(idx) } @@ -217,7 +217,7 @@ impl PostingsReader for StandardPostingsReader { /// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1` #[inline] fn block_len(&self) -> usize { - debug_assert!(self.block_is_loaded()); + debug_assert!(self.block_loaded); self.doc_decoder.output_len } @@ -256,20 +256,6 @@ impl PostingsReader for StandardPostingsReader { self.load_block(); } - /// Returns an empty segment postings object - fn empty() -> StandardPostingsReader { - StandardPostingsReader { - doc_decoder: BlockDecoder::with_val(TERMINATED), - block_loaded: true, - freq_decoder: BlockDecoder::with_val(1), - freq_reading_option: FreqReadingOption::NoFreq, - block_max_score_cache: None, - doc_freq: 0, - data: OwnedBytes::empty(), - skip_reader: SkipReader::new(OwnedBytes::empty(), 0, IndexRecordOption::Basic), - } - } - /// Returns the block_max_score for the current block. /// It does not require the block to be loaded. For instance, it is ok to call this method /// after having called `.shallow_advance(..)`. @@ -308,15 +294,29 @@ impl PostingsReader for StandardPostingsReader { // We do not cache it however, so that it gets computed when once block is loaded. bm25_weight.max_score() } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } } impl StandardPostingsReader { - pub(crate) fn skip_reader(&self) -> &SkipReader { - &self.skip_reader + /// Returns an empty segment postings object + pub fn empty() -> StandardPostingsReader { + StandardPostingsReader { + doc_decoder: BlockDecoder::with_val(TERMINATED), + block_loaded: true, + freq_decoder: BlockDecoder::with_val(1), + freq_reading_option: FreqReadingOption::NoFreq, + block_max_score_cache: None, + doc_freq: 0, + data: OwnedBytes::empty(), + skip_reader: SkipReader::new(OwnedBytes::empty(), 0, IndexRecordOption::Basic), + } } - pub(crate) fn block_is_loaded(&self) -> bool { - self.block_loaded + pub(crate) fn skip_reader(&self) -> &SkipReader { + &self.skip_reader } /// Dangerous API! This calls seeks the next block on the skip list, @@ -333,10 +333,10 @@ impl StandardPostingsReader { } pub(crate) fn load_block(&mut self) { - let offset = self.skip_reader.byte_offset(); - if self.block_is_loaded() { + if self.block_loaded { return; } + let offset = self.skip_reader.byte_offset(); match self.skip_reader.block_info() { BlockInfo::BitPacked { doc_num_bits, @@ -381,45 +381,6 @@ impl StandardPostingsReader { } self.block_loaded = true; } - - /// Returns the block_max_score for the current block. - /// It does not require the block to be loaded. For instance, it is ok to call this method - /// after having called `.shallow_advance(..)`. - /// - /// See `TermScorer::block_max_score(..)` for more information. - pub fn block_max_score( - &mut self, - fieldnorm_reader: &FieldNormReader, - bm25_weight: &Bm25Weight, - ) -> Score { - if let Some(score) = self.block_max_score_cache { - return score; - } - if let Some(skip_reader_max_score) = self.skip_reader.block_max_score(bm25_weight) { - // if we are on a full block, the skip reader should have the block max information - // for us - self.block_max_score_cache = Some(skip_reader_max_score); - return skip_reader_max_score; - } - // this is the last block of the segment posting list. - // If it is actually loaded, we can compute block max manually. - if self.block_is_loaded() { - let docs = self.doc_decoder.output_array().iter().cloned(); - let freqs = self.freq_decoder.output_array().iter().cloned(); - let bm25_scores = docs.zip(freqs).map(|(doc, term_freq)| { - let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc); - bm25_weight.score(fieldnorm_id, term_freq) - }); - let block_max_score = max_score(bm25_scores).unwrap_or(0.0); - self.block_max_score_cache = Some(block_max_score); - return block_max_score; - } - // We do not have access to any good block max value. We return bm25_weight.max_score() - // as it is a valid upperbound. - // - // We do not cache it however, so that it gets computed when once block is loaded. - bm25_weight.max_score() - } } #[cfg(test)] diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs index 53e52b3f6..4473abaed 100644 --- a/src/index/inverted_index_reader.rs +++ b/src/index/inverted_index_reader.rs @@ -9,6 +9,7 @@ use itertools::Itertools; #[cfg(feature = "quickwit")] use tantivy_fst::automaton::{AlwaysMatch, Automaton}; +use crate::codec::postings::PostingsReader as _; use crate::directory::FileSlice; use crate::positions::PositionReader; use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo}; diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index a8c8878a4..fea231f01 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -7,6 +7,7 @@ use common::ReadOnlyBitSet; use itertools::Itertools; use measure_time::debug_time; +use crate::codec::postings::PostingsReader as _; use crate::codec::{Codec, StandardCodec}; use crate::directory::WritePtr; use crate::docset::{DocSet, TERMINATED}; diff --git a/src/postings/block_segment_postings.rs b/src/postings/block_segment_postings.rs deleted file mode 100644 index bdbf9418d..000000000 --- a/src/postings/block_segment_postings.rs +++ /dev/null @@ -1,555 +0,0 @@ -use std::io; - -use common::VInt; - -use crate::directory::OwnedBytes; -use crate::fieldnorm::FieldNormReader; -use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE}; -use crate::postings::{BlockInfo, FreqReadingOption, SkipReader}; -use crate::query::Bm25Weight; -use crate::schema::IndexRecordOption; -use crate::{DocId, Score, TERMINATED}; - -fn max_score>(mut it: I) -> Option { - it.next().map(|first| it.fold(first, Score::max)) -} - -/// `BlockSegmentPostings` is a cursor iterating over blocks -/// of documents. -/// -/// # Warning -/// -/// While it is useful for some very specific high-performance -/// use cases, you should prefer using `SegmentPostings` for most usage. -#[derive(Clone)] -pub struct BlockSegmentPostings { - pub(crate) doc_decoder: BlockDecoder, - block_loaded: bool, - freq_decoder: BlockDecoder, - freq_reading_option: FreqReadingOption, - block_max_score_cache: Option, - doc_freq: u32, - data: OwnedBytes, - skip_reader: SkipReader, -} - -fn decode_bitpacked_block( - doc_decoder: &mut BlockDecoder, - freq_decoder_opt: Option<&mut BlockDecoder>, - data: &[u8], - doc_offset: DocId, - doc_num_bits: u8, - tf_num_bits: u8, - strict_delta: bool, -) { - let num_consumed_bytes = - doc_decoder.uncompress_block_sorted(data, doc_offset, doc_num_bits, strict_delta); - if let Some(freq_decoder) = freq_decoder_opt { - freq_decoder.uncompress_block_unsorted( - &data[num_consumed_bytes..], - tf_num_bits, - strict_delta, - ); - } -} - -fn decode_vint_block( - doc_decoder: &mut BlockDecoder, - freq_decoder_opt: Option<&mut BlockDecoder>, - data: &[u8], - doc_offset: DocId, - num_vint_docs: usize, -) { - let num_consumed_bytes = - doc_decoder.uncompress_vint_sorted(data, doc_offset, num_vint_docs, TERMINATED); - if let Some(freq_decoder) = freq_decoder_opt { - // if it's a json term with freq, containing less than 256 docs, we can reach here thinking - // we have a freq, despite not really having one. - if data.len() > num_consumed_bytes { - freq_decoder.uncompress_vint_unsorted( - &data[num_consumed_bytes..], - num_vint_docs, - TERMINATED, - ); - } - } -} - -fn split_into_skips_and_postings( - doc_freq: u32, - mut bytes: OwnedBytes, -) -> io::Result<(Option, OwnedBytes)> { - if doc_freq < COMPRESSION_BLOCK_SIZE as u32 { - return Ok((None, bytes)); - } - let skip_len = VInt::deserialize_u64(&mut bytes)? as usize; - let (skip_data, postings_data) = bytes.split(skip_len); - Ok((Some(skip_data), postings_data)) -} - -impl BlockSegmentPostings { - /// Opens a `BlockSegmentPostings`. - /// `doc_freq` is the number of documents in the posting list. - /// `record_option` represents the amount of data available according to the schema. - /// `requested_option` is the amount of data requested by the user. - /// If for instance, we do not request for term frequencies, this function will not decompress - /// term frequency blocks. - pub(crate) fn open( - doc_freq: u32, - bytes: OwnedBytes, - mut record_option: IndexRecordOption, - requested_option: IndexRecordOption, - ) -> io::Result { - let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?; - let skip_reader = match skip_data_opt { - Some(skip_data) => { - let block_count = doc_freq as usize / COMPRESSION_BLOCK_SIZE; - // 8 is the minimum size of a block with frequency (can be more if pos are stored - // too) - if skip_data.len() < 8 * block_count { - // the field might be encoded with frequency, but this term in particular isn't. - // This can happen for JSON field with term frequencies: - // - text terms are encoded with term freqs. - // - numerical terms are encoded without term freqs. - record_option = IndexRecordOption::Basic; - } - SkipReader::new(skip_data, doc_freq, record_option) - } - None => SkipReader::new(OwnedBytes::empty(), doc_freq, record_option), - }; - - let freq_reading_option = match (record_option, requested_option) { - (IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq, - (_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq, - (_, _) => FreqReadingOption::ReadFreq, - }; - - let mut block_segment_postings = BlockSegmentPostings { - doc_decoder: BlockDecoder::with_val(TERMINATED), - block_loaded: false, - freq_decoder: BlockDecoder::with_val(1), - freq_reading_option, - block_max_score_cache: None, - doc_freq, - data: postings_data, - skip_reader, - }; - block_segment_postings.load_block(); - Ok(block_segment_postings) - } - - /// Returns the block_max_score for the current block. - /// It does not require the block to be loaded. For instance, it is ok to call this method - /// after having called `.shallow_advance(..)`. - /// - /// See `TermScorer::block_max_score(..)` for more information. - pub fn block_max_score( - &mut self, - fieldnorm_reader: &FieldNormReader, - bm25_weight: &Bm25Weight, - ) -> Score { - if let Some(score) = self.block_max_score_cache { - return score; - } - if let Some(skip_reader_max_score) = self.skip_reader.block_max_score(bm25_weight) { - // if we are on a full block, the skip reader should have the block max information - // for us - self.block_max_score_cache = Some(skip_reader_max_score); - return skip_reader_max_score; - } - // this is the last block of the segment posting list. - // If it is actually loaded, we can compute block max manually. - if self.block_loaded { - let docs = self.doc_decoder.output_array().iter().cloned(); - let freqs = self.freq_decoder.output_array().iter().cloned(); - let bm25_scores = docs.zip(freqs).map(|(doc, term_freq)| { - let fieldnorm_id = fieldnorm_reader.fieldnorm_id(doc); - bm25_weight.score(fieldnorm_id, term_freq) - }); - let block_max_score = max_score(bm25_scores).unwrap_or(0.0); - self.block_max_score_cache = Some(block_max_score); - return block_max_score; - } - // We do not have access to any good block max value. We return bm25_weight.max_score() - // as it is a valid upperbound. - // - // We do not cache it however, so that it gets computed when once block is loaded. - bm25_weight.max_score() - } - - pub(crate) fn freq_reading_option(&self) -> FreqReadingOption { - self.freq_reading_option - } - - // Resets the block segment postings on another position - // in the postings file. - // - // This is useful for enumerating through a list of terms, - // and consuming the associated posting lists while avoiding - // reallocating a `BlockSegmentPostings`. - // - // # Warning - // - // This does not reset the positions list. - pub(crate) fn reset(&mut self, doc_freq: u32, postings_data: OwnedBytes) -> io::Result<()> { - let (skip_data_opt, postings_data) = - split_into_skips_and_postings(doc_freq, postings_data)?; - self.data = postings_data; - self.block_max_score_cache = None; - self.block_loaded = false; - if let Some(skip_data) = skip_data_opt { - self.skip_reader.reset(skip_data, doc_freq); - } else { - self.skip_reader.reset(OwnedBytes::empty(), doc_freq); - } - self.doc_freq = doc_freq; - self.load_block(); - Ok(()) - } - - /// Returns the overall number of documents in the block postings. - /// It does not take in account whether documents are deleted or not. - /// - /// This `doc_freq` is simply the sum of the length of all of the blocks - /// length, and it does not take in account deleted documents. - pub fn doc_freq(&self) -> u32 { - self.doc_freq - } - - /// Returns the array of docs in the current block. - /// - /// Before the first call to `.advance()`, the block - /// returned by `.docs()` is empty. - #[inline] - pub fn docs(&self) -> &[DocId] { - debug_assert!(self.block_loaded); - self.doc_decoder.output_array() - } - - /// Return the document at index `idx` of the block. - #[inline] - pub fn doc(&self, idx: usize) -> u32 { - self.doc_decoder.output(idx) - } - - /// Return the array of `term freq` in the block. - #[inline] - pub fn freqs(&self) -> &[u32] { - debug_assert!(self.block_loaded); - self.freq_decoder.output_array() - } - - /// Return the frequency at index `idx` of the block. - #[inline] - pub fn freq(&self, idx: usize) -> u32 { - debug_assert!(self.block_loaded); - self.freq_decoder.output(idx) - } - - /// Returns the length of the current block. - /// - /// All blocks have a length of `NUM_DOCS_PER_BLOCK`, - /// except the last block that may have a length - /// of any number between 1 and `NUM_DOCS_PER_BLOCK - 1` - #[inline] - pub fn block_len(&self) -> usize { - debug_assert!(self.block_loaded); - self.doc_decoder.output_len - } - - /// Position on a block that may contains `target_doc`. - /// - /// If all docs are smaller than target, the block loaded may be empty, - /// or be the last an incomplete VInt block. - pub fn seek(&mut self, target_doc: DocId) -> usize { - // Move to the block that might contain our document. - self.seek_block(target_doc); - self.load_block(); - - // At this point we are on the block that might contain our document. - let doc = self.doc_decoder.seek_within_block(target_doc); - - // The last block is not full and padded with TERMINATED, - // so we are guaranteed to have at least one value (real or padding) - // that is >= target_doc. - debug_assert!(doc < COMPRESSION_BLOCK_SIZE); - - // `doc` is now the first element >= `target_doc`. - // If all docs are smaller than target, the current block is incomplete and padded - // with TERMINATED. After the search, the cursor points to the first TERMINATED. - doc - } - - pub(crate) fn position_offset(&self) -> u64 { - self.skip_reader.position_offset() - } - - /// Dangerous API! This calls seeks the next block on the skip list, - /// but does not `.load_block()` afterwards. - /// - /// `.load_block()` needs to be called manually afterwards. - /// If all docs are smaller than target, the block loaded may be empty, - /// or be the last an incomplete VInt block. - pub(crate) fn seek_block(&mut self, target_doc: DocId) { - if self.skip_reader.seek(target_doc) { - self.block_max_score_cache = None; - self.block_loaded = false; - } - } - - fn load_block(&mut self) { - let offset = self.skip_reader.byte_offset(); - if self.block_loaded { - return; - } - match self.skip_reader.block_info() { - BlockInfo::BitPacked { - doc_num_bits, - strict_delta_encoded, - tf_num_bits, - .. - } => { - decode_bitpacked_block( - &mut self.doc_decoder, - if let FreqReadingOption::ReadFreq = self.freq_reading_option { - Some(&mut self.freq_decoder) - } else { - None - }, - &self.data.as_slice()[offset..], - self.skip_reader.last_doc_in_previous_block, - doc_num_bits, - tf_num_bits, - strict_delta_encoded, - ); - } - BlockInfo::VInt { num_docs } => { - let data = { - if num_docs == 0 { - &[] - } else { - &self.data.as_slice()[offset..] - } - }; - decode_vint_block( - &mut self.doc_decoder, - if let FreqReadingOption::ReadFreq = self.freq_reading_option { - Some(&mut self.freq_decoder) - } else { - None - }, - data, - self.skip_reader.last_doc_in_previous_block, - num_docs as usize, - ); - } - } - self.block_loaded = true; - } - - /// Advance to the next block. - pub fn advance(&mut self) { - self.skip_reader.advance(); - self.block_loaded = false; - self.block_max_score_cache = None; - self.load_block(); - } - - /// Returns an empty segment postings object - pub fn empty() -> BlockSegmentPostings { - BlockSegmentPostings { - doc_decoder: BlockDecoder::with_val(TERMINATED), - block_loaded: true, - freq_decoder: BlockDecoder::with_val(1), - freq_reading_option: FreqReadingOption::NoFreq, - block_max_score_cache: None, - doc_freq: 0, - data: OwnedBytes::empty(), - skip_reader: SkipReader::new(OwnedBytes::empty(), 0, IndexRecordOption::Basic), - } - } - - pub(crate) fn skip_reader(&self) -> &SkipReader { - &self.skip_reader - } -} - -#[cfg(test)] -mod tests { - use common::HasLen; - - use super::BlockSegmentPostings; - use crate::docset::{DocSet, TERMINATED}; - use crate::index::Index; - use crate::postings::compression::COMPRESSION_BLOCK_SIZE; - use crate::postings::postings::Postings; - use crate::postings::SegmentPostings; - use crate::schema::{IndexRecordOption, Schema, Term, INDEXED}; - use crate::DocId; - - #[test] - fn test_empty_segment_postings() { - let mut postings = SegmentPostings::empty(); - assert_eq!(postings.doc(), TERMINATED); - assert_eq!(postings.advance(), TERMINATED); - assert_eq!(postings.advance(), TERMINATED); - assert_eq!(postings.doc_freq(), 0); - assert_eq!(postings.len(), 0); - } - - #[test] - fn test_empty_postings_doc_returns_terminated() { - let mut postings = SegmentPostings::empty(); - assert_eq!(postings.doc(), TERMINATED); - assert_eq!(postings.advance(), TERMINATED); - } - - #[test] - fn test_empty_postings_doc_term_freq_returns_0() { - let postings = SegmentPostings::empty(); - assert_eq!(postings.term_freq(), 1); - } - - #[test] - fn test_empty_block_segment_postings() { - let mut postings = BlockSegmentPostings::empty(); - assert!(postings.docs().is_empty()); - assert_eq!(postings.doc_freq(), 0); - postings.advance(); - assert!(postings.docs().is_empty()); - assert_eq!(postings.doc_freq(), 0); - } - - #[test] - fn test_block_segment_postings() -> crate::Result<()> { - let mut block_segments = build_block_postings(&(0..100_000).collect::>())?; - let mut offset: u32 = 0u32; - // checking that the `doc_freq` is correct - assert_eq!(block_segments.doc_freq(), 100_000); - loop { - let block = block_segments.docs(); - if block.is_empty() { - break; - } - for (i, doc) in block.iter().cloned().enumerate() { - assert_eq!(offset + (i as u32), doc); - } - offset += block.len() as u32; - block_segments.advance(); - } - Ok(()) - } - - #[test] - fn test_skip_right_at_new_block() -> crate::Result<()> { - let mut doc_ids = (0..128).collect::>(); - // 128 is missing - doc_ids.push(129); - doc_ids.push(130); - { - let block_segments = build_block_postings(&doc_ids)?; - let mut docset = SegmentPostings::from_block_postings(block_segments, None); - assert_eq!(docset.seek(128), 129); - assert_eq!(docset.doc(), 129); - assert_eq!(docset.advance(), 130); - assert_eq!(docset.doc(), 130); - assert_eq!(docset.advance(), TERMINATED); - } - { - let block_segments = build_block_postings(&doc_ids).unwrap(); - let mut docset = SegmentPostings::from_block_postings(block_segments, None); - assert_eq!(docset.seek(129), 129); - assert_eq!(docset.doc(), 129); - assert_eq!(docset.advance(), 130); - assert_eq!(docset.doc(), 130); - assert_eq!(docset.advance(), TERMINATED); - } - { - let block_segments = build_block_postings(&doc_ids)?; - let mut docset = SegmentPostings::from_block_postings(block_segments, None); - assert_eq!(docset.doc(), 0); - assert_eq!(docset.seek(131), TERMINATED); - assert_eq!(docset.doc(), TERMINATED); - } - Ok(()) - } - - fn build_block_postings(docs: &[DocId]) -> crate::Result { - let mut schema_builder = Schema::builder(); - let int_field = schema_builder.add_u64_field("id", INDEXED); - let schema = schema_builder.build(); - let index = Index::create_in_ram(schema); - let mut index_writer = index.writer_for_tests()?; - let mut last_doc = 0u32; - for &doc in docs { - for _ in last_doc..doc { - index_writer.add_document(doc!(int_field=>1u64))?; - } - index_writer.add_document(doc!(int_field=>0u64))?; - last_doc = doc + 1; - } - index_writer.commit()?; - let searcher = index.reader()?.searcher(); - let segment_reader = searcher.segment_reader(0); - let inverted_index = segment_reader.inverted_index(int_field).unwrap(); - let term = Term::from_field_u64(int_field, 0u64); - let term_info = inverted_index.get_term_info(&term)?.unwrap(); - let block_postings = inverted_index - .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?; - Ok(block_postings) - } - - #[test] - fn test_block_segment_postings_seek() -> crate::Result<()> { - let mut docs = vec![0]; - for i in 0..1300 { - docs.push((i * i / 100) + i); - } - let mut block_postings = build_block_postings(&docs[..])?; - for i in &[0, 424, 10000] { - block_postings.seek(*i); - let docs = block_postings.docs(); - assert!(docs[0] <= *i); - assert!(docs.last().cloned().unwrap_or(0u32) >= *i); - } - block_postings.seek(100_000); - assert_eq!(block_postings.doc(COMPRESSION_BLOCK_SIZE - 1), TERMINATED); - Ok(()) - } - - #[test] - fn test_reset_block_segment_postings() -> crate::Result<()> { - let mut schema_builder = Schema::builder(); - let int_field = schema_builder.add_u64_field("id", INDEXED); - let schema = schema_builder.build(); - let index = Index::create_in_ram(schema); - let mut index_writer = index.writer_for_tests()?; - // create two postings list, one containing even number, - // the other containing odd numbers. - for i in 0..6 { - let doc = doc!(int_field=> (i % 2) as u64); - index_writer.add_document(doc)?; - } - index_writer.commit()?; - let searcher = index.reader()?.searcher(); - let segment_reader = searcher.segment_reader(0); - - let mut block_segments; - { - let term = Term::from_field_u64(int_field, 0u64); - let inverted_index = segment_reader.inverted_index(int_field)?; - let term_info = inverted_index.get_term_info(&term)?.unwrap(); - block_segments = inverted_index - .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?; - } - assert_eq!(block_segments.docs(), &[0, 2, 4]); - { - let term = Term::from_field_u64(int_field, 1u64); - let inverted_index = segment_reader.inverted_index(int_field)?; - let term_info = inverted_index.get_term_info(&term)?.unwrap(); - inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?; - } - assert_eq!(block_segments.docs(), &[1, 3, 5]); - Ok(()) - } -} diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 74723f7e0..c7a9adf92 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -4,7 +4,7 @@ mod block_search; pub(crate) use self::block_search::branchless_binary_search; -mod block_segment_postings; +// mod block_segment_postings; pub(crate) mod compression; mod indexing_context; mod json_postings_writer; @@ -15,13 +15,11 @@ mod postings_writer; mod recorder; mod segment_postings; mod serializer; -mod skip; mod term_info; pub(crate) use loaded_postings::LoadedPostings; pub(crate) use stacker::compute_table_memory_size; -pub use self::block_segment_postings::BlockSegmentPostings; pub(crate) use self::indexing_context::IndexingContext; pub(crate) use self::per_field_postings_writer::PerFieldPostingsWriter; pub use self::postings::Postings; @@ -30,8 +28,8 @@ pub(crate) use self::postings_writer::{ }; pub use self::segment_postings::SegmentPostings; pub use self::serializer::{FieldSerializer, InvertedIndexSerializer}; -pub(crate) use self::skip::{BlockInfo, SkipReader}; pub use self::term_info::TermInfo; +pub use crate::codec::standard::postings::StandardPostingsReader as BlockSegmentPostings; #[expect(clippy::enum_variant_names)] #[derive(Debug, PartialEq, Clone, Copy, Eq)] diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index f2ebf5587..1e064586a 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -1,5 +1,6 @@ use common::HasLen; +use crate::codec::postings::PostingsReader; use crate::docset::DocSet; use crate::fastfield::AliveBitSet; use crate::positions::PositionReader; diff --git a/src/postings/skip.rs b/src/postings/skip.rs deleted file mode 100644 index dd762ca46..000000000 --- a/src/postings/skip.rs +++ /dev/null @@ -1,448 +0,0 @@ -use crate::directory::OwnedBytes; -use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE}; -use crate::query::Bm25Weight; -use crate::schema::IndexRecordOption; -use crate::{DocId, Score, TERMINATED}; - -// doc num bits uses the following encoding: -// given 0b a b cdefgh -// |1|2|3| 4 | -// - 1: unused -// - 2: is delta-1 encoded. 0 if not, 1, if yes -// - 3: unused -// - 4: a 5 bit number in 0..32, the actual bitwidth. Bitpacking could in theory say this is 32 -// (requiring a 6th bit), but the biggest doc_id we can want to encode is TERMINATED-1, which can -// be represented on 31b without delta encoding. -fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 { - assert!(bitwidth < 32); - bitwidth | ((delta_1 as u8) << 6) -} - -fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) { - let delta_1 = ((raw_bitwidth >> 6) & 1) != 0; - let bitwidth = raw_bitwidth & 0x1f; - (bitwidth, delta_1) -} - -#[inline] -fn encode_block_wand_max_tf(max_tf: u32) -> u8 { - max_tf.min(u8::MAX as u32) as u8 -} - -#[inline] -fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 { - if max_tf_code == u8::MAX { - u32::MAX - } else { - max_tf_code as u32 - } -} - -#[inline] -fn read_u32(data: &[u8]) -> u32 { - u32::from_le_bytes(data[..4].try_into().unwrap()) -} - -#[inline] -fn write_u32(val: u32, buf: &mut Vec) { - buf.extend_from_slice(&val.to_le_bytes()); -} - -pub struct SkipSerializer { - buffer: Vec, -} - -impl SkipSerializer { - pub fn new() -> SkipSerializer { - SkipSerializer { buffer: Vec::new() } - } - - pub fn write_doc(&mut self, last_doc: DocId, doc_num_bits: u8) { - write_u32(last_doc, &mut self.buffer); - self.buffer.push(encode_bitwidth(doc_num_bits, true)); - } - - pub fn write_term_freq(&mut self, tf_num_bits: u8) { - self.buffer.push(tf_num_bits); - } - - pub fn write_total_term_freq(&mut self, tf_sum: u32) { - write_u32(tf_sum, &mut self.buffer); - } - - pub fn write_blockwand_max(&mut self, fieldnorm_id: u8, term_freq: u32) { - let block_wand_tf = encode_block_wand_max_tf(term_freq); - self.buffer - .extend_from_slice(&[fieldnorm_id, block_wand_tf]); - } - - pub fn data(&self) -> &[u8] { - &self.buffer[..] - } - - pub fn clear(&mut self) { - self.buffer.clear(); - } -} - -#[derive(Clone)] -pub(crate) struct SkipReader { - last_doc_in_block: DocId, - pub(crate) last_doc_in_previous_block: DocId, - owned_read: OwnedBytes, - skip_info: IndexRecordOption, - byte_offset: usize, - remaining_docs: u32, // number of docs remaining, including the - // documents in the current block. - block_info: BlockInfo, - - position_offset: u64, -} - -#[derive(Clone, Eq, PartialEq, Copy, Debug)] -pub(crate) enum BlockInfo { - BitPacked { - doc_num_bits: u8, - strict_delta_encoded: bool, - tf_num_bits: u8, - tf_sum: u32, - block_wand_fieldnorm_id: u8, - block_wand_term_freq: u32, - }, - VInt { - num_docs: u32, - }, -} - -impl Default for BlockInfo { - fn default() -> Self { - BlockInfo::VInt { num_docs: 0u32 } - } -} - -impl SkipReader { - pub fn new(data: OwnedBytes, doc_freq: u32, skip_info: IndexRecordOption) -> SkipReader { - let mut skip_reader = SkipReader { - last_doc_in_block: if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { - 0 - } else { - TERMINATED - }, - last_doc_in_previous_block: 0u32, - owned_read: data, - skip_info, - block_info: BlockInfo::VInt { num_docs: doc_freq }, - byte_offset: 0, - remaining_docs: doc_freq, - position_offset: 0u64, - }; - if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { - skip_reader.read_block_info(); - } - skip_reader - } - - pub fn reset(&mut self, data: OwnedBytes, doc_freq: u32) { - self.last_doc_in_block = if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { - 0 - } else { - TERMINATED - }; - self.last_doc_in_previous_block = 0u32; - self.owned_read = data; - self.block_info = BlockInfo::VInt { num_docs: doc_freq }; - self.byte_offset = 0; - self.remaining_docs = doc_freq; - self.position_offset = 0u64; - if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { - self.read_block_info(); - } - } - - // Returns the block max score for this block if available. - // - // The block max score is available for all full bitpacked block, - // but no available for the last VInt encoded incomplete block. - pub fn block_max_score(&self, bm25_weight: &Bm25Weight) -> Option { - match self.block_info { - BlockInfo::BitPacked { - block_wand_fieldnorm_id, - block_wand_term_freq, - .. - } => Some(bm25_weight.score(block_wand_fieldnorm_id, block_wand_term_freq)), - BlockInfo::VInt { .. } => None, - } - } - - pub(crate) fn last_doc_in_block(&self) -> DocId { - self.last_doc_in_block - } - - pub fn position_offset(&self) -> u64 { - self.position_offset - } - - #[inline] - pub fn byte_offset(&self) -> usize { - self.byte_offset - } - - fn read_block_info(&mut self) { - let bytes = self.owned_read.as_slice(); - let advance_len: usize; - self.last_doc_in_block = read_u32(bytes); - let (doc_num_bits, strict_delta_encoded) = decode_bitwidth(bytes[4]); - match self.skip_info { - IndexRecordOption::Basic => { - advance_len = 5; - self.block_info = BlockInfo::BitPacked { - doc_num_bits, - strict_delta_encoded, - tf_num_bits: 0, - tf_sum: 0, - block_wand_fieldnorm_id: 0, - block_wand_term_freq: 0, - }; - } - IndexRecordOption::WithFreqs => { - let tf_num_bits = bytes[5]; - let block_wand_fieldnorm_id = bytes[6]; - let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]); - advance_len = 8; - self.block_info = BlockInfo::BitPacked { - doc_num_bits, - strict_delta_encoded, - tf_num_bits, - tf_sum: 0, - block_wand_fieldnorm_id, - block_wand_term_freq, - }; - } - IndexRecordOption::WithFreqsAndPositions => { - let tf_num_bits = bytes[5]; - let tf_sum = read_u32(&bytes[6..10]); - let block_wand_fieldnorm_id = bytes[10]; - let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]); - advance_len = 12; - self.block_info = BlockInfo::BitPacked { - doc_num_bits, - strict_delta_encoded, - tf_num_bits, - tf_sum, - block_wand_fieldnorm_id, - block_wand_term_freq, - }; - } - } - self.owned_read.advance(advance_len); - } - - pub fn block_info(&self) -> BlockInfo { - self.block_info - } - - /// Advance the skip reader to the block that may contain the target. - /// - /// If the target is larger than all documents, the skip_reader - /// then advance to the last Variable In block. - pub fn seek(&mut self, target: DocId) -> bool { - if self.last_doc_in_block() >= target { - return false; - } - loop { - self.advance(); - if self.last_doc_in_block() >= target { - return true; - } - } - } - - pub fn advance(&mut self) { - match self.block_info { - BlockInfo::BitPacked { - doc_num_bits, - tf_num_bits, - tf_sum, - .. - } => { - self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32; - self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits); - self.position_offset += tf_sum as u64; - } - BlockInfo::VInt { num_docs } => { - debug_assert_eq!(num_docs, self.remaining_docs); - self.remaining_docs = 0; - self.byte_offset = usize::MAX; - } - } - self.last_doc_in_previous_block = self.last_doc_in_block; - if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 { - self.read_block_info(); - } else { - self.last_doc_in_block = TERMINATED; - self.block_info = BlockInfo::VInt { - num_docs: self.remaining_docs, - }; - } - } -} - -#[cfg(test)] -mod tests { - - use super::{ - decode_bitwidth, encode_bitwidth, BlockInfo, IndexRecordOption, SkipReader, SkipSerializer, - }; - use crate::directory::OwnedBytes; - use crate::postings::compression::COMPRESSION_BLOCK_SIZE; - - #[test] - fn test_encode_block_wand_max_tf() { - for tf in 0..255 { - assert_eq!(super::encode_block_wand_max_tf(tf), tf as u8); - } - for &tf in &[255, 256, 1_000_000, u32::MAX] { - assert_eq!(super::encode_block_wand_max_tf(tf), 255); - } - } - - #[test] - fn test_decode_block_wand_max_tf() { - for tf in 0..255 { - assert_eq!(super::decode_block_wand_max_tf(tf), tf as u32); - } - assert_eq!(super::decode_block_wand_max_tf(255), u32::MAX); - } - - #[test] - fn test_skip_with_freq() { - let buf = { - let mut skip_serializer = SkipSerializer::new(); - skip_serializer.write_doc(1u32, 2u8); - skip_serializer.write_term_freq(3u8); - skip_serializer.write_blockwand_max(13u8, 3u32); - skip_serializer.write_doc(5u32, 5u8); - skip_serializer.write_term_freq(2u8); - skip_serializer.write_blockwand_max(8u8, 2u32); - skip_serializer.data().to_owned() - }; - let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32; - let mut skip_reader = - SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::WithFreqs); - assert_eq!(skip_reader.last_doc_in_block(), 1u32); - assert_eq!( - skip_reader.block_info, - BlockInfo::BitPacked { - doc_num_bits: 2u8, - strict_delta_encoded: true, - tf_num_bits: 3u8, - tf_sum: 0, - block_wand_fieldnorm_id: 13, - block_wand_term_freq: 3 - } - ); - skip_reader.advance(); - assert_eq!(skip_reader.last_doc_in_block(), 5u32); - assert_eq!( - skip_reader.block_info(), - BlockInfo::BitPacked { - doc_num_bits: 5u8, - strict_delta_encoded: true, - tf_num_bits: 2u8, - tf_sum: 0, - block_wand_fieldnorm_id: 8, - block_wand_term_freq: 2 - } - ); - skip_reader.advance(); - assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 }); - skip_reader.advance(); - assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }); - skip_reader.advance(); - assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }); - } - - #[test] - fn test_skip_no_freq() { - let buf = { - let mut skip_serializer = SkipSerializer::new(); - skip_serializer.write_doc(1u32, 2u8); - skip_serializer.write_doc(5u32, 5u8); - skip_serializer.data().to_owned() - }; - let doc_freq = 3u32 + (COMPRESSION_BLOCK_SIZE * 2) as u32; - let mut skip_reader = - SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::Basic); - assert_eq!(skip_reader.last_doc_in_block(), 1u32); - assert_eq!( - skip_reader.block_info(), - BlockInfo::BitPacked { - doc_num_bits: 2u8, - strict_delta_encoded: true, - tf_num_bits: 0, - tf_sum: 0u32, - block_wand_fieldnorm_id: 0, - block_wand_term_freq: 0 - } - ); - skip_reader.advance(); - assert_eq!(skip_reader.last_doc_in_block(), 5u32); - assert_eq!( - skip_reader.block_info(), - BlockInfo::BitPacked { - doc_num_bits: 5u8, - strict_delta_encoded: true, - tf_num_bits: 0, - tf_sum: 0u32, - block_wand_fieldnorm_id: 0, - block_wand_term_freq: 0 - } - ); - skip_reader.advance(); - assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 3u32 }); - skip_reader.advance(); - assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }); - skip_reader.advance(); - assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }); - } - - #[test] - fn test_skip_multiple_of_block_size() { - let buf = { - let mut skip_serializer = SkipSerializer::new(); - skip_serializer.write_doc(1u32, 2u8); - skip_serializer.data().to_owned() - }; - let doc_freq = COMPRESSION_BLOCK_SIZE as u32; - let mut skip_reader = - SkipReader::new(OwnedBytes::new(buf), doc_freq, IndexRecordOption::Basic); - assert_eq!(skip_reader.last_doc_in_block(), 1u32); - assert_eq!( - skip_reader.block_info(), - BlockInfo::BitPacked { - doc_num_bits: 2u8, - strict_delta_encoded: true, - tf_num_bits: 0, - tf_sum: 0u32, - block_wand_fieldnorm_id: 0, - block_wand_term_freq: 0 - } - ); - skip_reader.advance(); - assert_eq!(skip_reader.block_info(), BlockInfo::VInt { num_docs: 0u32 }); - } - - #[test] - fn test_encode_decode_bitwidth() { - for bitwidth in 0..32 { - for delta_1 in [false, true] { - assert_eq!( - (bitwidth, delta_1), - decode_bitwidth(encode_bitwidth(bitwidth, delta_1)) - ); - } - } - assert_eq!(0b01000010, encode_bitwidth(0b10, true)); - assert_eq!(0b00000010, encode_bitwidth(0b10, false)); - } -} diff --git a/src/query/automaton_weight.rs b/src/query/automaton_weight.rs index 5f1053fb6..1ef960299 100644 --- a/src/query/automaton_weight.rs +++ b/src/query/automaton_weight.rs @@ -5,6 +5,7 @@ use common::BitSet; use tantivy_fst::Automaton; use super::phrase_prefix_query::prefix_end; +use crate::codec::postings::PostingsReader as _; use crate::index::SegmentReader; use crate::postings::TermInfo; use crate::query::{BitSetDocSet, ConstScorer, Explanation, Scorer, Weight}; diff --git a/src/query/phrase_query/regex_phrase_weight.rs b/src/query/phrase_query/regex_phrase_weight.rs index 4e850d2e2..4b1c7091a 100644 --- a/src/query/phrase_query/regex_phrase_weight.rs +++ b/src/query/phrase_query/regex_phrase_weight.rs @@ -4,6 +4,7 @@ use common::BitSet; use tantivy_fst::Regex; use super::PhraseScorer; +use crate::codec::postings::PostingsReader as _; use crate::fieldnorm::FieldNormReader; use crate::index::SegmentReader; use crate::postings::{LoadedPostings, Postings, SegmentPostings, TermInfo}; diff --git a/src/query/range_query/range_query.rs b/src/query/range_query/range_query.rs index 1893a06a5..193ff0346 100644 --- a/src/query/range_query/range_query.rs +++ b/src/query/range_query/range_query.rs @@ -5,6 +5,7 @@ use common::bounds::{map_bound, BoundsRange}; use common::BitSet; use super::range_query_fastfield::FastFieldRangeWeight; +use crate::codec::postings::PostingsReader as _; use crate::index::SegmentReader; use crate::query::explanation::does_not_match; use crate::query::range_query::is_type_valid_for_fastfield_range_query; diff --git a/src/query/term_query/term_scorer.rs b/src/query/term_query/term_scorer.rs index 00fb8ca0b..7e00e84a2 100644 --- a/src/query/term_query/term_scorer.rs +++ b/src/query/term_query/term_scorer.rs @@ -1,3 +1,4 @@ +use crate::codec::postings::PostingsReader as _; use crate::docset::DocSet; use crate::fieldnorm::FieldNormReader; use crate::postings::{FreqReadingOption, Postings, SegmentPostings};