From 176f7e852aad55ee3768b4c532d47d3bb9d1ebd9 Mon Sep 17 00:00:00 2001 From: Eric Ridge Date: Mon, 9 Jun 2025 13:57:05 -0400 Subject: [PATCH] perf: remove general overhead during segment merging (#47) --- runtests.sh | 2 +- .../merge_optimized_inverted_index_reader.rs | 22 +- src/indexer/merger.rs | 6 +- src/positions/borrowed_position_reader.rs | 155 ++++++++++++ src/positions/mod.rs | 1 + src/positions/serializer.rs | 110 ++++++++- src/postings/block_segment_postings.rs | 6 +- .../borrowed_block_segment_postings.rs | 223 ++++++++++++++++++ src/postings/borrowed_segment_postings.rs | 179 ++++++++++++++ src/postings/borrowed_skip_reader.rs | 160 +++++++++++++ src/postings/mod.rs | 7 +- src/postings/skip.rs | 14 +- 12 files changed, 852 insertions(+), 33 deletions(-) create mode 100644 src/positions/borrowed_position_reader.rs create mode 100644 src/postings/borrowed_block_segment_postings.rs create mode 100644 src/postings/borrowed_segment_postings.rs create mode 100644 src/postings/borrowed_skip_reader.rs diff --git a/runtests.sh b/runtests.sh index c915be42f..4d4367a41 100755 --- a/runtests.sh +++ b/runtests.sh @@ -1,3 +1,3 @@ #! 
/bin/bash -cargo +stable nextest run --features mmap,stopwords,lz4-compression,zstd-compression,failpoints --verbose --workspace +cargo +stable nextest run --features quickwit,mmap,stopwords,lz4-compression,zstd-compression,failpoints --verbose --workspace diff --git a/src/index/merge_optimized_inverted_index_reader.rs b/src/index/merge_optimized_inverted_index_reader.rs index afe89bd85..3dc0534a4 100644 --- a/src/index/merge_optimized_inverted_index_reader.rs +++ b/src/index/merge_optimized_inverted_index_reader.rs @@ -3,8 +3,10 @@ use std::io; use common::OwnedBytes; use crate::directory::FileSlice; -use crate::positions::PositionReader; -use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo}; +use crate::positions::borrowed_position_reader::BorrowedPositionReader; +use crate::postings::borrowed_block_segment_postings::BorrowedBlockSegmentPostings; +use crate::postings::borrowed_segment_postings::BorrowedSegmentPostings; +use crate::postings::TermInfo; use crate::schema::IndexRecordOption; use crate::termdict::TermDictionary; @@ -64,9 +66,9 @@ impl MergeOptimizedInvertedIndexReader { &self, term_info: &TermInfo, requested_option: IndexRecordOption, - ) -> io::Result { - let postings_data = self.postings_bytes.slice(term_info.postings_range.clone()); - BlockSegmentPostings::open( + ) -> io::Result { + let postings_data = &self.postings_bytes[term_info.postings_range.clone()]; + BorrowedBlockSegmentPostings::open( term_info.doc_freq, postings_data, self.record_option, @@ -82,22 +84,20 @@ impl MergeOptimizedInvertedIndexReader { &self, term_info: &TermInfo, option: IndexRecordOption, - ) -> io::Result { + ) -> io::Result { let option = option.downgrade(self.record_option); let block_postings = self.read_block_postings_from_terminfo(term_info, option)?; let position_reader = { if option.has_positions() { - let positions_data = self - .positions_bytes - .slice(term_info.positions_range.clone()); - let position_reader = 
PositionReader::open(positions_data)?; + let positions_data = &self.positions_bytes[term_info.positions_range.clone()]; + let position_reader = BorrowedPositionReader::open(positions_data)?; Some(position_reader) } else { None } }; - Ok(SegmentPostings::from_block_postings( + Ok(BorrowedSegmentPostings::from_block_postings( block_postings, position_reader, )) diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index b7188c316..0566d9016 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -17,7 +17,8 @@ use crate::index::{Segment, SegmentComponent, SegmentReader}; use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping}; use crate::indexer::segment_updater::CancelSentinel; use crate::indexer::SegmentSerializer; -use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings}; +use crate::postings::borrowed_segment_postings::BorrowedSegmentPostings; +use crate::postings::InvertedIndexSerializer; use crate::schema::{value_type_to_column_type, Field, FieldType, Schema}; use crate::store::StoreWriter; use crate::termdict::{TermMerger, TermOrdinal}; @@ -372,7 +373,8 @@ impl IndexMerger { indexed. Have you modified the schema?", ); - let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![]; + let mut segment_postings_containing_the_term: Vec<(usize, BorrowedSegmentPostings)> = + vec![]; let mut cnt = 0; while merged_terms.advance() { diff --git a/src/positions/borrowed_position_reader.rs b/src/positions/borrowed_position_reader.rs new file mode 100644 index 000000000..a3b7fa862 --- /dev/null +++ b/src/positions/borrowed_position_reader.rs @@ -0,0 +1,155 @@ +use std::io; + +use common::{BinarySerializable, VInt}; + +use crate::positions::COMPRESSION_BLOCK_SIZE; +use crate::postings::compression::{BlockDecoder, VIntDecoder}; + +/// When accessing the positions of a term, we get a positions_idx from the `Terminfo`. +/// This means we need to skip to the `nth` position efficiently. 
+/// +/// Blocks are compressed using bitpacking, so `bit_widths` contains the number of bits +/// (values can go from 0 to 32 bits) required to decompress every block. +/// +/// A given block obviously takes `(128 x num_bit_for_the_block / num_bits_in_a_byte)`, +/// so skipping a block without decompressing it is just a matter of advancing that many +/// bytes. + +#[derive(Clone)] +pub struct BorrowedPositionReader<'a> { + bit_widths: &'a [u8], + positions: &'a [u8], + + block_decoder: BlockDecoder, + + // offset, expressed in positions, for the first position of the block currently loaded + // block_offset is a multiple of COMPRESSION_BLOCK_SIZE. + block_offset: u64, + // offset, expressed in positions, for the position of the first block encoded + // in the `self.positions` bytes, and if bitpacked, compressed using the bitwidth in + // `self.bit_widths`. + // + // As we advance, anchor increases simultaneously with bit_widths and positions get consumed. + anchor_offset: u64, + + // These are just copies used for .reset(). + original_bit_widths: &'a [u8], + original_positions: &'a [u8], +} + +impl<'a> BorrowedPositionReader<'a> { + /// Opens and reads the term positions encoded in the borrowed `positions_data` bytes. + pub fn open(mut positions_data: &'a [u8]) -> io::Result> { + let num_positions_bitpacked_blocks = VInt::deserialize(&mut positions_data)?.0 as usize; + let (bit_widths, positions) = positions_data.split_at(num_positions_bitpacked_blocks); + Ok(BorrowedPositionReader { + bit_widths, + positions, + block_decoder: BlockDecoder::default(), + block_offset: i64::MAX as u64, + anchor_offset: 0u64, + original_bit_widths: bit_widths, + original_positions: positions, + }) + } + + fn reset(&mut self) { + self.positions = self.original_positions; + self.bit_widths = self.original_bit_widths; + self.block_offset = i64::MAX as u64; + self.anchor_offset = 0u64; + } + + /// Advances by `num_blocks` bitpacked blocks. 
+ /// + /// Panics if there are not that many remaining blocks. + fn advance_num_blocks(&mut self, num_blocks: usize) { + let num_bits: usize = self.bit_widths.as_ref()[..num_blocks] + .iter() + .cloned() + .map(|num_bits| num_bits as usize) + .sum(); + let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8; + + // self.bit_widths.advance(num_blocks); + let (_, rest) = self.bit_widths.split_at(num_blocks); + self.bit_widths = rest; + + // self.positions.advance(num_bytes_to_skip); + let (_, rest) = self.positions.split_at(num_bytes_to_skip); + self.positions = rest; + + self.anchor_offset += (num_blocks * COMPRESSION_BLOCK_SIZE) as u64; + } + + /// block_rel_id is counted relatively to the anchor. + /// block_rel_id = 0 means the anchor block. + /// block_rel_id = i means the ith block after the anchor block. + fn load_block(&mut self, block_rel_id: usize) { + let bit_widths = self.bit_widths; + let byte_offset: usize = bit_widths[0..block_rel_id] + .iter() + .map(|&b| b as usize) + .sum::() + * COMPRESSION_BLOCK_SIZE + / 8; + let compressed_data = &self.positions[byte_offset..]; + if bit_widths.len() > block_rel_id { + // that block is bitpacked. + let bit_width = bit_widths[block_rel_id]; + self.block_decoder + .uncompress_block_unsorted(compressed_data, bit_width, false); + } else { + // that block is vint encoded. + self.block_decoder + .uncompress_vint_unsorted_until_end(compressed_data); + } + self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64; + } + + /// Fills a buffer with the positions `[offset..offset+output.len())` integers. + /// + /// This function is optimized to be called with increasing values of `offset`. + pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) { + if offset < self.anchor_offset { + self.reset(); + } + let delta_to_block_offset = offset as i64 - self.block_offset as i64; + if !(0..128).contains(&delta_to_block_offset) { + // The first position is not within the first block. 
+ // (Note that it could be before or after) + // We need to possibly skip a few blocks, and decompress the first relevant block. + let delta_to_anchor_offset = offset - self.anchor_offset; + let num_blocks_to_skip = + (delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize; + self.advance_num_blocks(num_blocks_to_skip); + self.load_block(0); + } else { + // The request offset is within the loaded block. + // We still need to advance anchor_offset to our current block. + let num_blocks_to_skip = + ((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize; + self.advance_num_blocks(num_blocks_to_skip); + } + + // At this point, the block containing offset is loaded, and anchor has + // been updated to point to it as well. + for i in 1.. { + // we copy the part from block i - 1 that is relevant. + let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE; + let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block; + if remaining_in_block >= output.len() { + output.copy_from_slice( + &self.block_decoder.output_array()[offset_in_block..][..output.len()], + ); + break; + } + output[..remaining_in_block] + .copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]); + output = &mut output[remaining_in_block..]; + // we load block #i if necessary. + offset += remaining_in_block as u64; + self.load_block(i); + } + } +} diff --git a/src/positions/mod.rs b/src/positions/mod.rs index a96546d54..da9ee5844 100644 --- a/src/positions/mod.rs +++ b/src/positions/mod.rs @@ -28,6 +28,7 @@ //! * *VIntPosDeltas* := *VIntPosDelta*^(*P* % 128). //! //! The skip widths encoded separately makes it easy and fast to rapidly skip over n positions. 
+pub(crate) mod borrowed_position_reader; mod reader; mod serializer; diff --git a/src/positions/serializer.rs b/src/positions/serializer.rs index f41923e8b..a4028cd5f 100644 --- a/src/positions/serializer.rs +++ b/src/positions/serializer.rs @@ -45,13 +45,109 @@ impl PositionSerializer { /// Writes all of the given positions delta. pub fn write_positions_delta(&mut self, mut positions_delta: &[u32]) { - while !positions_delta.is_empty() { - let remaining_block_len = self.remaining_block_len(); - let num_to_write = remaining_block_len.min(positions_delta.len()); - self.block.extend(&positions_delta[..num_to_write]); - positions_delta = &positions_delta[num_to_write..]; - if self.remaining_block_len() == 0 { - self.flush_block(); + match positions_delta.len() { + 0 => {} + 1 => { + if self.remaining_block_len() == 0 { + self.flush_block(); + } + self.block.push(positions_delta[0]); + } + 2 => { + let rem = self.remaining_block_len(); + if rem < 2 { + if rem == 1 { + self.block.push(positions_delta[0]); + self.flush_block(); + self.block.push(positions_delta[1]); + } else { + self.flush_block(); + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + } + } else { + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + } + } + 3 => { + let rem = self.remaining_block_len(); + match rem { + 3.. 
=> { + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + self.block.push(positions_delta[2]); + } + 2 => { + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + self.flush_block(); + self.block.push(positions_delta[2]); + } + 1 => { + self.block.push(positions_delta[0]); + self.flush_block(); + self.block.push(positions_delta[1]); + self.block.push(positions_delta[2]); + } + 0 => { + self.flush_block(); + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + self.block.push(positions_delta[2]); + } + } + } + 4 => { + let rem = self.remaining_block_len(); + match rem { + 4.. => { + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + self.block.push(positions_delta[2]); + self.block.push(positions_delta[3]); + } + 3 => { + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + self.block.push(positions_delta[2]); + self.flush_block(); + self.block.push(positions_delta[3]); + } + 2 => { + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + self.flush_block(); + self.block.push(positions_delta[2]); + self.block.push(positions_delta[3]); + } + 1 => { + self.block.push(positions_delta[0]); + self.flush_block(); + self.block.push(positions_delta[1]); + self.block.push(positions_delta[2]); + self.block.push(positions_delta[3]); + } + 0 => { + self.flush_block(); + self.block.push(positions_delta[0]); + self.block.push(positions_delta[1]); + self.block.push(positions_delta[2]); + self.block.push(positions_delta[3]); + } + } + } + _ => { + while !positions_delta.is_empty() { + let remaining_block_len = self.remaining_block_len(); + let num_to_write = remaining_block_len.min(positions_delta.len()); + self.block + .extend_from_slice(&positions_delta[..num_to_write]); + positions_delta = &positions_delta[num_to_write..]; + if self.remaining_block_len() == 0 { + self.flush_block(); + } + } } } } diff --git 
a/src/postings/block_segment_postings.rs b/src/postings/block_segment_postings.rs index 26646cdbd..518bcf777 100644 --- a/src/postings/block_segment_postings.rs +++ b/src/postings/block_segment_postings.rs @@ -10,7 +10,7 @@ use crate::query::Bm25Weight; use crate::schema::IndexRecordOption; use crate::{DocId, Score, TERMINATED}; -fn max_score>(mut it: I) -> Option { +pub(crate) fn max_score>(mut it: I) -> Option { it.next().map(|first| it.fold(first, Score::max)) } @@ -33,7 +33,7 @@ pub struct BlockSegmentPostings { skip_reader: SkipReader, } -fn decode_bitpacked_block( +pub(crate) fn decode_bitpacked_block( doc_decoder: &mut BlockDecoder, freq_decoder_opt: Option<&mut BlockDecoder>, data: &[u8], @@ -53,7 +53,7 @@ fn decode_bitpacked_block( } } -fn decode_vint_block( +pub(crate) fn decode_vint_block( doc_decoder: &mut BlockDecoder, freq_decoder_opt: Option<&mut BlockDecoder>, data: &[u8], diff --git a/src/postings/borrowed_block_segment_postings.rs b/src/postings/borrowed_block_segment_postings.rs new file mode 100644 index 000000000..c09e3d849 --- /dev/null +++ b/src/postings/borrowed_block_segment_postings.rs @@ -0,0 +1,223 @@ +use std::io; + +use common::VInt; + +use crate::postings::block_segment_postings::{decode_bitpacked_block, decode_vint_block}; +use crate::postings::borrowed_skip_reader::BorrowedSkipReader; +use crate::postings::compression::{BlockDecoder, COMPRESSION_BLOCK_SIZE}; +use crate::postings::{BlockInfo, FreqReadingOption}; +use crate::schema::IndexRecordOption; +use crate::{DocId, Score, TERMINATED}; + +/// `BorrowedBlockSegmentPostings` is a cursor iterating over blocks +/// of documents. +/// +/// # Warning +/// +/// While it is useful for some very specific high-performance +/// use cases, you should prefer using `SegmentPostings` for most usage. 
+#[derive(Clone)] +pub struct BorrowedBlockSegmentPostings<'a> { + pub(crate) doc_decoder: BlockDecoder, + block_loaded: bool, + freq_decoder: BlockDecoder, + freq_reading_option: FreqReadingOption, + block_max_score_cache: Option, + doc_freq: u32, + data: &'a [u8], + skip_reader: BorrowedSkipReader<'a>, +} + +fn split_into_skips_and_postings( + doc_freq: u32, + mut bytes: &[u8], +) -> io::Result<(Option<&[u8]>, &[u8])> { + if doc_freq < COMPRESSION_BLOCK_SIZE as u32 { + return Ok((None, bytes)); + } + let skip_len = VInt::deserialize_u64(&mut bytes)? as usize; + let (skip_data, postings_data) = bytes.split_at(skip_len); + Ok((Some(skip_data), postings_data)) +} + +impl<'a> BorrowedBlockSegmentPostings<'a> { + /// Opens a `BorrowedBlockSegmentPostings`. + /// `doc_freq` is the number of documents in the posting list. + /// `record_option` represents the amount of data available according to the schema. + /// `requested_option` is the amount of data requested by the user. + /// If for instance, we do not request for term frequencies, this function will not decompress + /// term frequency blocks. + pub(crate) fn open( + doc_freq: u32, + bytes: &'a [u8], + mut record_option: IndexRecordOption, + requested_option: IndexRecordOption, + ) -> io::Result> { + let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?; + let skip_reader = match skip_data_opt { + Some(skip_data) => { + let block_count = doc_freq as usize / COMPRESSION_BLOCK_SIZE; + // 8 is the minimum size of a block with frequency (can be more if pos are stored + // too) + if skip_data.len() < 8 * block_count { + // the field might be encoded with frequency, but this term in particular isn't. + // This can happen for JSON field with term frequencies: + // - text terms are encoded with term freqs. + // - numerical terms are encoded without term freqs. 
+ record_option = IndexRecordOption::Basic; + } + BorrowedSkipReader::new(skip_data, doc_freq, record_option) + } + None => BorrowedSkipReader::new(&[], doc_freq, record_option), + }; + + let freq_reading_option = match (record_option, requested_option) { + (IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq, + (_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq, + (_, _) => FreqReadingOption::ReadFreq, + }; + + let mut block_segment_postings = BorrowedBlockSegmentPostings { + doc_decoder: BlockDecoder::with_val(TERMINATED), + block_loaded: false, + freq_decoder: BlockDecoder::with_val(1), + freq_reading_option, + block_max_score_cache: None, + doc_freq, + data: postings_data, + skip_reader, + }; + block_segment_postings.load_block(); + Ok(block_segment_postings) + } + + /// Returns the overall number of documents in the block postings. + /// It does not take in account whether documents are deleted or not. + /// + /// This `doc_freq` is simply the sum of the length of all of the blocks + /// length, and it does not take in account deleted documents. + pub fn doc_freq(&self) -> u32 { + self.doc_freq + } + + /// Returns a full block, regardless of whether the block is complete or incomplete ( + /// as it happens for the last block of the posting list). + /// + /// In the latter case, the block is guaranteed to be padded with the sentinel value: + /// `TERMINATED`. The array is also guaranteed to be aligned on 16 bytes = 128 bits. + /// + /// This method is useful to run SSE2 linear search. + #[inline] + pub(crate) fn full_block(&self) -> &[DocId; COMPRESSION_BLOCK_SIZE] { + debug_assert!(self.block_is_loaded()); + self.doc_decoder.full_output() + } + + /// Return the document at index `idx` of the block. + #[inline] + pub fn doc(&self, idx: usize) -> u32 { + self.doc_decoder.output(idx) + } + + /// Return the array of `term freq` in the block. 
+ #[inline] + pub fn freqs(&self) -> &[u32] { + debug_assert!(self.block_is_loaded()); + self.freq_decoder.output_array() + } + + /// Return the frequency at index `idx` of the block. + #[inline] + pub fn freq(&self, idx: usize) -> u32 { + debug_assert!(self.block_is_loaded()); + self.freq_decoder.output(idx) + } + + /// Positions the cursor on a block that may contain `target_doc`. + /// + /// If all docs are smaller than target, the block loaded may be empty, + /// or be the last, incomplete VInt block. + pub fn seek(&mut self, target_doc: DocId) { + self.shallow_seek(target_doc); + self.load_block(); + } + + pub(crate) fn position_offset(&self) -> u64 { + self.skip_reader.position_offset() + } + + /// Dangerous API! This calls seek on the skip list, + /// but does not `.load_block()` afterwards. + /// + /// `.load_block()` needs to be called manually afterwards. + /// If all docs are smaller than target, the block loaded may be empty, + /// or be the last, incomplete VInt block. + pub(crate) fn shallow_seek(&mut self, target_doc: DocId) { + if self.skip_reader.seek(target_doc) { + self.block_max_score_cache = None; + self.block_loaded = false; + } + } + + pub(crate) fn block_is_loaded(&self) -> bool { + self.block_loaded + } + + pub(crate) fn load_block(&mut self) { + let offset = self.skip_reader.byte_offset(); + if self.block_is_loaded() { + return; + } + match self.skip_reader.block_info() { + BlockInfo::BitPacked { + doc_num_bits, + strict_delta_encoded, + tf_num_bits, + .. + } => { + decode_bitpacked_block( + &mut self.doc_decoder, + if let FreqReadingOption::ReadFreq = self.freq_reading_option { + Some(&mut self.freq_decoder) + } else { + None + }, + &self.data[offset..], + self.skip_reader.last_doc_in_previous_block, + doc_num_bits, + tf_num_bits, + strict_delta_encoded, + ); + } + BlockInfo::VInt { num_docs } => { + let data = { + if num_docs == 0 { + &[] + } else { + &self.data[offset..] 
+ } + }; + decode_vint_block( + &mut self.doc_decoder, + if let FreqReadingOption::ReadFreq = self.freq_reading_option { + Some(&mut self.freq_decoder) + } else { + None + }, + data, + self.skip_reader.last_doc_in_previous_block, + num_docs as usize, + ); + } + } + self.block_loaded = true; + } + + /// Advance to the next block. + pub fn advance(&mut self) { + self.skip_reader.advance(); + self.block_loaded = false; + self.block_max_score_cache = None; + self.load_block(); + } +} diff --git a/src/postings/borrowed_segment_postings.rs b/src/postings/borrowed_segment_postings.rs new file mode 100644 index 000000000..b2f18dc51 --- /dev/null +++ b/src/postings/borrowed_segment_postings.rs @@ -0,0 +1,179 @@ +use common::HasLen; + +use crate::docset::DocSet; +use crate::fastfield::AliveBitSet; +use crate::positions::borrowed_position_reader::BorrowedPositionReader; +use crate::postings::borrowed_block_segment_postings::BorrowedBlockSegmentPostings; +use crate::postings::branchless_binary_search; +use crate::postings::compression::COMPRESSION_BLOCK_SIZE; +use crate::{DocId, TERMINATED}; + +/// `BorrowedSegmentPostings` represents the inverted list or postings associated with +/// a term in a `Segment`. +/// +/// As we iterate through the `BorrowedSegmentPostings`, the frequencies are optionally decoded. +/// Positions on the other hand, are optionally entirely decoded upfront. +#[derive(Clone)] +pub struct BorrowedSegmentPostings<'a> { + pub(crate) block_cursor: BorrowedBlockSegmentPostings<'a>, + cur: usize, + position_reader: Option>, +} + +impl BorrowedSegmentPostings<'_> { + /// Compute the number of non-deleted documents. + /// + /// This method will clone and scan through the posting lists. + /// (this is a rather expensive operation). 
+ pub fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 { + let mut docset = self.clone(); + let mut doc_freq = 0; + loop { + let doc = docset.doc(); + if doc == TERMINATED { + return doc_freq; + } + if alive_bitset.is_alive(doc) { + doc_freq += 1u32; + } + docset.advance(); + } + } + + /// Returns the overall number of documents in the block postings. + /// It does not take into account whether documents are deleted or not. + pub fn doc_freq(&self) -> u32 { + self.block_cursor.doc_freq() + } + + /// Wraps already opened block postings into a `BorrowedSegmentPostings`. + /// + /// * `segment_block_postings` - block cursor the postings iterate over. + /// * `position_reader` - reader used to fetch term positions; when it is `None`, + /// `positions()` leaves its output empty. + pub(crate) fn from_block_postings<'a>( + segment_block_postings: BorrowedBlockSegmentPostings<'a>, + position_reader: Option>, + ) -> BorrowedSegmentPostings<'a> { + BorrowedSegmentPostings { + block_cursor: segment_block_postings, + cur: 0, // cursor within the block + position_reader, + } + } + + pub fn term_freq(&self) -> u32 { + debug_assert!( + // Here we do not use the len of `freqs()` + // because it is actually ok to request for the freq of doc + // even if no frequency were encoded for the field. + // + // In that case we hit the block just as if the frequency had been + // decoded. The block is simply prefilled by the value 1. + self.cur < COMPRESSION_BLOCK_SIZE, + "Have you forgotten to call `.advance()` at least once before calling `.term_freq()`." + ); + self.block_cursor.freq(self.cur) + } + + /// Returns the positions offset by a given value. + /// It is not necessary to clear the `output` before calling this method. + /// The output vector will be resized to the `term_freq`. 
+ fn positions_with_offset(&mut self, offset: u32, output: &mut Vec) { + output.clear(); + self.append_positions_with_offset(offset, output); + } + + /// Returns the positions offsetted with a given value. + /// Data will be appended to the output. + fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec) { + let term_freq = self.term_freq(); + let prev_len = output.len(); + if let Some(position_reader) = self.position_reader.as_mut() { + debug_assert!( + !self.block_cursor.freqs().is_empty(), + "No positions available" + ); + let read_offset = self.block_cursor.position_offset() + + (self.block_cursor.freqs()[..self.cur] + .iter() + .cloned() + .sum::() as u64); + // TODO: instead of zeroing the output, we could use MaybeUninit or similar. + output.resize(prev_len + term_freq as usize, 0u32); + position_reader.read(read_offset, &mut output[prev_len..]); + let mut cum = offset; + for output_mut in output[prev_len..].iter_mut() { + cum += *output_mut; + *output_mut = cum; + } + } + } + + /// Returns the positions of the term in the given document. + /// The output vector will be resized to the `term_freq`. + pub fn positions(&mut self, output: &mut Vec) { + self.positions_with_offset(0u32, output); + } +} + +impl DocSet for BorrowedSegmentPostings<'_> { + // goes to the next element. + // next needs to be called a first time to point to the correct element. + #[inline] + fn advance(&mut self) -> DocId { + debug_assert!(self.block_cursor.block_is_loaded()); + if self.cur == COMPRESSION_BLOCK_SIZE - 1 { + self.cur = 0; + self.block_cursor.advance(); + } else { + self.cur += 1; + } + self.doc() + } + + fn seek(&mut self, target: DocId) -> DocId { + debug_assert!(self.doc() <= target); + if self.doc() >= target { + return self.doc(); + } + + self.block_cursor.seek(target); + + // At this point we are on the block, that might contain our document. 
+ let output = self.block_cursor.full_block(); + self.cur = branchless_binary_search(output, target); + + // The last block is not full and padded with the value TERMINATED, + // so that we are guaranteed to have at least one doc in the block (a real one or the padding) + // that is greater or equal to the target. + debug_assert!(self.cur < COMPRESSION_BLOCK_SIZE); + + // `doc` is now the first element >= `target` + + // If all docs are smaller than target the current block should be incomplete and padded + // with the value `TERMINATED`. + // + // After the search, the cursor should point to the first value of TERMINATED. + let doc = output[self.cur]; + debug_assert!(doc >= target); + debug_assert_eq!(doc, self.doc()); + doc + } + + /// Return the current document's `DocId`. + #[inline] + fn doc(&self) -> DocId { + self.block_cursor.doc(self.cur) + } + + fn size_hint(&self) -> u32 { + self.len() as u32 + } +} + +impl HasLen for BorrowedSegmentPostings<'_> { + fn len(&self) -> usize { + self.block_cursor.doc_freq() as usize + } +} diff --git a/src/postings/borrowed_skip_reader.rs b/src/postings/borrowed_skip_reader.rs new file mode 100644 index 000000000..9e92bf3e1 --- /dev/null +++ b/src/postings/borrowed_skip_reader.rs @@ -0,0 +1,160 @@ +use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE}; +use crate::postings::skip::{decode_bitwidth, decode_block_wand_max_tf, read_u32}; +use crate::postings::BlockInfo; +use crate::schema::IndexRecordOption; +use crate::{DocId, TERMINATED}; + +#[derive(Clone)] +pub(crate) struct BorrowedSkipReader<'a> { + last_doc_in_block: DocId, + pub(crate) last_doc_in_previous_block: DocId, + owned_read: &'a [u8], + skip_info: IndexRecordOption, + byte_offset: usize, + remaining_docs: u32, // number of docs remaining, including the + // documents in the current block. 
+ block_info: BlockInfo, + + position_offset: u64, +} + +impl<'a> BorrowedSkipReader<'a> { + pub fn new( + data: &'a [u8], + doc_freq: u32, + skip_info: IndexRecordOption, + ) -> BorrowedSkipReader<'a> { + let mut skip_reader = BorrowedSkipReader { + last_doc_in_block: if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { + 0 + } else { + TERMINATED + }, + last_doc_in_previous_block: 0u32, + owned_read: data, + skip_info, + block_info: BlockInfo::VInt { num_docs: doc_freq }, + byte_offset: 0, + remaining_docs: doc_freq, + position_offset: 0u64, + }; + if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { + skip_reader.read_block_info(); + } + skip_reader + } + + pub(crate) fn last_doc_in_block(&self) -> DocId { + self.last_doc_in_block + } + + pub fn position_offset(&self) -> u64 { + self.position_offset + } + + #[inline] + pub fn byte_offset(&self) -> usize { + self.byte_offset + } + + fn read_block_info(&mut self) { + let bytes = self.owned_read; + let advance_len: usize; + self.last_doc_in_block = read_u32(bytes); + let (doc_num_bits, strict_delta_encoded) = decode_bitwidth(bytes[4]); + match self.skip_info { + IndexRecordOption::Basic => { + advance_len = 5; + self.block_info = BlockInfo::BitPacked { + doc_num_bits, + strict_delta_encoded, + tf_num_bits: 0, + tf_sum: 0, + block_wand_fieldnorm_id: 0, + block_wand_term_freq: 0, + }; + } + IndexRecordOption::WithFreqs => { + let tf_num_bits = bytes[5]; + let block_wand_fieldnorm_id = bytes[6]; + let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]); + advance_len = 8; + self.block_info = BlockInfo::BitPacked { + doc_num_bits, + strict_delta_encoded, + tf_num_bits, + tf_sum: 0, + block_wand_fieldnorm_id, + block_wand_term_freq, + }; + } + IndexRecordOption::WithFreqsAndPositions => { + let tf_num_bits = bytes[5]; + let tf_sum = read_u32(&bytes[6..10]); + let block_wand_fieldnorm_id = bytes[10]; + let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]); + advance_len = 12; + self.block_info = BlockInfo::BitPacked { 
+ doc_num_bits, + strict_delta_encoded, + tf_num_bits, + tf_sum, + block_wand_fieldnorm_id, + block_wand_term_freq, + }; + } + } + // self.owned_read.advance(advance_len); + let (_, rest) = self.owned_read.split_at(advance_len); + self.owned_read = rest; + } + + pub fn block_info(&self) -> BlockInfo { + self.block_info + } + + /// Advances the skip reader to the block that may contain the target. + /// + /// If the target is larger than all documents, the skip reader + /// advances to the last (VInt-encoded) block. + pub fn seek(&mut self, target: DocId) -> bool { + if self.last_doc_in_block() >= target { + return false; + } + loop { + self.advance(); + if self.last_doc_in_block() >= target { + return true; + } + } + } + + pub fn advance(&mut self) { + match self.block_info { + BlockInfo::BitPacked { + doc_num_bits, + tf_num_bits, + tf_sum, + .. + } => { + self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32; + self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits); + self.position_offset += tf_sum as u64; + } + BlockInfo::VInt { num_docs } => { + debug_assert_eq!(num_docs, self.remaining_docs); + self.remaining_docs = 0; + self.byte_offset = usize::MAX; + } + } + self.last_doc_in_previous_block = self.last_doc_in_block; + if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 { + self.read_block_info(); + } else { + self.last_doc_in_block = TERMINATED; + self.block_info = BlockInfo::VInt { + num_docs: self.remaining_docs, + }; + } + } +} diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 41b9ab0c3..566dabbf3 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -4,7 +4,10 @@ mod block_search; pub(crate) use self::block_search::branchless_binary_search; -mod block_segment_postings; +pub(crate) mod block_segment_postings; +pub(crate) mod borrowed_block_segment_postings; +pub(crate) mod borrowed_segment_postings; +mod borrowed_skip_reader; pub(crate) mod compression; mod indexing_context; mod json_postings_writer; @@ -15,7 +18,7 @@ 
mod postings_writer; mod recorder; mod segment_postings; mod serializer; -mod skip; +pub(crate) mod skip; mod term_info; pub(crate) use loaded_postings::LoadedPostings; diff --git a/src/postings/skip.rs b/src/postings/skip.rs index c36690444..6e56a4b57 100644 --- a/src/postings/skip.rs +++ b/src/postings/skip.rs @@ -10,23 +10,23 @@ use crate::{DocId, Score, TERMINATED}; // - 1: unused // - 2: is delta-1 encoded. 0 if not, 1, if yes // - 3: a 6 bit number in 0..=32, the actual bitwidth -fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 { +pub(crate) fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 { bitwidth | ((delta_1 as u8) << 6) } -fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) { - let delta_1 = ((raw_bitwidth >> 6) & 1) != 0; +pub(crate) fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) { + let delta_1 = (raw_bitwidth >> 6 & 1) != 0; let bitwidth = raw_bitwidth & 0x3f; (bitwidth, delta_1) } #[inline] -fn encode_block_wand_max_tf(max_tf: u32) -> u8 { +pub(crate) fn encode_block_wand_max_tf(max_tf: u32) -> u8 { max_tf.min(u8::MAX as u32) as u8 } #[inline] -fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 { +pub(crate) fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 { if max_tf_code == u8::MAX { u32::MAX } else { @@ -35,12 +35,12 @@ fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 { } #[inline] -fn read_u32(data: &[u8]) -> u32 { +pub(crate) fn read_u32(data: &[u8]) -> u32 { u32::from_le_bytes(data[..4].try_into().unwrap()) } #[inline] -fn write_u32(val: u32, buf: &mut Vec) { +pub(crate) fn write_u32(val: u32, buf: &mut Vec) { buf.extend_from_slice(&val.to_le_bytes()); }