From 7183ac6cbcddccc52e5136ab6620f44eb0f45fe0 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Mon, 20 Oct 2025 10:02:09 -0700 Subject: [PATCH] fix: Use smaller buffers during merging (#71) `MergeOptimizedInvertedIndexReader` was added in #32 in order to avoid making small reads to our underlying `FileHandle`. It did so by reading the entire content of the posting lists and positions at open time. As that PR says: > A likely downside to this approach is that now pg_search will be, indirectly, holding onto a lot of heap-allocated memory that was read from its block storage. Perhaps in the (near) future we can further optimize the new `MergeOptimizedInvertedIndexReader` such that it pages in blocks of a few megabytes at a time, on demand, rather than the whole file. This PR makes that change. But it additionally removes code that was later added in #47 to borrow individual entries rather than creating `OwnedBytes` for them. I believe that this code was added due to a misunderstanding: `OwnedBytes` is a total misnomer: the bytes are not "owned": they are immutably borrowed and reference counted. An `OwnedBytes` object can be created for any type which derefs to a slice of bytes, and can be cheaply cloned and sliced. So there is no need to actually borrow _or_ copy the buffer under the `OwnedBytes`. Removing the code that was doing so allows us to safely recreate our buffer without worrying about the lifetimes of buffers that we've handed out. 
--- common/src/buffered_file_slice.rs | 106 +++++++++ common/src/lib.rs | 1 + src/directory/mod.rs | 1 + .../merge_optimized_inverted_index_reader.rs | 47 ++-- src/indexer/merger.rs | 6 +- src/positions/borrowed_position_reader.rs | 155 ------------ src/positions/mod.rs | 2 +- .../borrowed_block_segment_postings.rs | 223 ------------------ src/postings/borrowed_segment_postings.rs | 179 -------------- src/postings/borrowed_skip_reader.rs | 160 ------------- src/postings/mod.rs | 4 +- 11 files changed, 136 insertions(+), 748 deletions(-) create mode 100644 common/src/buffered_file_slice.rs delete mode 100644 src/positions/borrowed_position_reader.rs delete mode 100644 src/postings/borrowed_block_segment_postings.rs delete mode 100644 src/postings/borrowed_segment_postings.rs delete mode 100644 src/postings/borrowed_skip_reader.rs diff --git a/common/src/buffered_file_slice.rs b/common/src/buffered_file_slice.rs new file mode 100644 index 000000000..fe96fe22d --- /dev/null +++ b/common/src/buffered_file_slice.rs @@ -0,0 +1,106 @@ +use std::cell::RefCell; +use std::cmp::min; +use std::io; +use std::ops::Range; + +use super::file_slice::FileSlice; +use super::{HasLen, OwnedBytes}; + +const DEFAULT_BUFFER_MAX_SIZE: usize = 4 * 1024 * 1024; // 4 MB + +/// A buffered reader for a FileSlice. +/// +/// Reads the underlying `FileSlice` in large, sequential chunks to amortize +/// the cost of `read_bytes` calls, while keeping peak memory usage under control. +/// +/// TODO: Rather than wrapping a `FileSlice` in buffering, it will usually be better to adjust a +/// `FileHandle` to directly handle buffering itself (as that allows separate `FileSlice`s read +/// from the same `FileHandle` to share buffers.) +pub struct BufferedFileSlice { + file_slice: FileSlice, + buffer: RefCell<OwnedBytes>, + buffer_range: RefCell<Range<u64>>, + buffer_max_size: usize, +} + +impl BufferedFileSlice { + /// Creates a new `BufferedFileSlice`. 
+ /// + /// The `buffer_max_size` is the amount of data that will be read from the + /// `FileSlice` on a buffer miss. + pub fn new(file_slice: FileSlice, buffer_max_size: usize) -> Self { + Self { + file_slice, + buffer: RefCell::new(OwnedBytes::empty()), + buffer_range: RefCell::new(0..0), + buffer_max_size, + } + } + + /// Creates a new `BufferedFileSlice` with a default buffer max size. + pub fn new_with_default_buffer_size(file_slice: FileSlice) -> Self { + Self::new(file_slice, DEFAULT_BUFFER_MAX_SIZE) + } + + /// Creates an empty `BufferedFileSlice`. + pub fn empty() -> Self { + Self::new(FileSlice::empty(), 0) + } + + /// Returns an `OwnedBytes` corresponding to the given `required_range`. + /// + /// If the requested range is not in the buffer, this will trigger a read + /// from the underlying `FileSlice`. + /// + /// If the requested range is larger than the buffer_max_size, it will be read directly from the + /// source without buffering. + /// + /// # Errors + /// + /// Returns an `io::Error` if the underlying read fails or the range is + /// out of bounds. + pub fn get_bytes(&self, required_range: Range<u64>) -> io::Result<OwnedBytes> { + let buffer_range = self.buffer_range.borrow(); + + // Cache miss condition: the required range is not fully contained in the current buffer. + if required_range.start < buffer_range.start || required_range.end > buffer_range.end { + drop(buffer_range); // release borrow before mutating + + if required_range.end > self.file_slice.len() as u64 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Requested range extends beyond the end of the file slice.", + )); + } + + if (required_range.end - required_range.start) as usize > self.buffer_max_size { + // This read is larger than our buffer max size. + // Read it directly and bypass the buffer to avoid churning. 
+ return self + .file_slice + .read_bytes_slice(required_range.start as usize..required_range.end as usize); + } + + let new_buffer_start = required_range.start; + let new_buffer_end = min( + new_buffer_start + self.buffer_max_size as u64, + self.file_slice.len() as u64, + ); + let read_range = new_buffer_start..new_buffer_end; + + let new_buffer = self + .file_slice + .read_bytes_slice(read_range.start as usize..read_range.end as usize)?; + + self.buffer.replace(new_buffer); + self.buffer_range.replace(read_range); + } + + // Now the data is guaranteed to be in the buffer. + let buffer = self.buffer.borrow(); + let buffer_range = self.buffer_range.borrow(); + let local_start = (required_range.start - buffer_range.start) as usize; + let local_end = (required_range.end - buffer_range.start) as usize; + Ok(buffer.slice(local_start..local_end)) + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index 4e64af11c..b1cb4745f 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -6,6 +6,7 @@ pub use byteorder::LittleEndian as Endianness; mod bitset; pub mod bounds; +pub mod buffered_file_slice; mod byte_count; mod datetime; pub mod file_slice; diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 0392a9a0b..f0f15ae32 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -19,6 +19,7 @@ mod composite_file; use std::io::BufWriter; use std::path::PathBuf; +pub use common::buffered_file_slice::BufferedFileSlice; pub use common::file_slice::{FileHandle, FileSlice}; pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite}; diff --git a/src/index/merge_optimized_inverted_index_reader.rs b/src/index/merge_optimized_inverted_index_reader.rs index 3dc0534a4..1df710400 100644 --- a/src/index/merge_optimized_inverted_index_reader.rs +++ b/src/index/merge_optimized_inverted_index_reader.rs @@ -1,28 +1,25 @@ use std::io; -use common::OwnedBytes; - -use crate::directory::FileSlice; -use crate::positions::borrowed_position_reader::BorrowedPositionReader; 
-use crate::postings::borrowed_block_segment_postings::BorrowedBlockSegmentPostings; -use crate::postings::borrowed_segment_postings::BorrowedSegmentPostings; -use crate::postings::TermInfo; +use crate::directory::{BufferedFileSlice, FileSlice}; +use crate::positions::PositionReader; +use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo}; use crate::schema::IndexRecordOption; use crate::termdict::TermDictionary; /// The inverted index reader is in charge of accessing /// the inverted index associated with a specific field. /// -/// This is optimized for merging in that it full reads -/// the postings and positions files into memory when opened. -/// This eliminates all disk I/O to these files during merging. +/// This is optimized for merging in that it uses a buffered reader +/// for the postings and positions files. +/// This eliminates most disk I/O to these files during merging, without +/// reading the entire file into memory at once. /// /// NB: This is a copy/paste from [`InvertedIndexReader`] and trimmed /// down to only include the methods required by the merge process. 
pub(crate) struct MergeOptimizedInvertedIndexReader { termdict: TermDictionary, - postings_bytes: OwnedBytes, - positions_bytes: OwnedBytes, + postings_reader: BufferedFileSlice, + positions_reader: BufferedFileSlice, record_option: IndexRecordOption, } @@ -36,8 +33,8 @@ impl MergeOptimizedInvertedIndexReader { let (_, postings_body) = postings_file_slice.split(8); Ok(MergeOptimizedInvertedIndexReader { termdict, - postings_bytes: postings_body.read_bytes()?, - positions_bytes: positions_file_slice.read_bytes()?, + postings_reader: BufferedFileSlice::new_with_default_buffer_size(postings_body), + positions_reader: BufferedFileSlice::new_with_default_buffer_size(positions_file_slice), record_option, }) } @@ -47,8 +44,8 @@ impl MergeOptimizedInvertedIndexReader { pub fn empty(record_option: IndexRecordOption) -> MergeOptimizedInvertedIndexReader { MergeOptimizedInvertedIndexReader { termdict: TermDictionary::empty(), - postings_bytes: FileSlice::empty().read_bytes().unwrap(), - positions_bytes: FileSlice::empty().read_bytes().unwrap(), + postings_reader: BufferedFileSlice::empty(), + positions_reader: BufferedFileSlice::empty(), record_option, } } @@ -66,9 +63,11 @@ impl MergeOptimizedInvertedIndexReader { &self, term_info: &TermInfo, requested_option: IndexRecordOption, - ) -> io::Result<BorrowedBlockSegmentPostings> { - let postings_data = &self.postings_bytes[term_info.postings_range.clone()]; - BorrowedBlockSegmentPostings::open( + ) -> io::Result<BlockSegmentPostings> { + let postings_data = self.postings_reader.get_bytes( + term_info.postings_range.start as u64..term_info.postings_range.end as u64, + )?; + BlockSegmentPostings::open( term_info.doc_freq, postings_data, self.record_option, @@ -84,20 +83,22 @@ impl MergeOptimizedInvertedIndexReader { &self, term_info: &TermInfo, option: IndexRecordOption, - ) -> io::Result<BorrowedSegmentPostings> { + ) -> io::Result<SegmentPostings> { let option = option.downgrade(self.record_option); let block_postings = self.read_block_postings_from_terminfo(term_info, option)?; let position_reader = { if 
option.has_positions() { - let positions_data = &self.positions_bytes[term_info.positions_range.clone()]; - let position_reader = BorrowedPositionReader::open(positions_data)?; + let positions_data = self.positions_reader.get_bytes( + term_info.positions_range.start as u64..term_info.positions_range.end as u64, + )?; + let position_reader = PositionReader::open(positions_data)?; Some(position_reader) } else { None } }; - Ok(BorrowedSegmentPostings::from_block_postings( + Ok(SegmentPostings::from_block_postings( block_postings, position_reader, )) diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 0566d9016..b7188c316 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -17,8 +17,7 @@ use crate::index::{Segment, SegmentComponent, SegmentReader}; use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping}; use crate::indexer::segment_updater::CancelSentinel; use crate::indexer::SegmentSerializer; -use crate::postings::borrowed_segment_postings::BorrowedSegmentPostings; -use crate::postings::InvertedIndexSerializer; +use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings}; use crate::schema::{value_type_to_column_type, Field, FieldType, Schema}; use crate::store::StoreWriter; use crate::termdict::{TermMerger, TermOrdinal}; @@ -373,8 +372,7 @@ impl IndexMerger { indexed. 
Have you modified the schema?", ); - let mut segment_postings_containing_the_term: Vec<(usize, BorrowedSegmentPostings)> = - vec![]; + let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![]; let mut cnt = 0; while merged_terms.advance() { diff --git a/src/positions/borrowed_position_reader.rs b/src/positions/borrowed_position_reader.rs deleted file mode 100644 index a3b7fa862..000000000 --- a/src/positions/borrowed_position_reader.rs +++ /dev/null @@ -1,155 +0,0 @@ -use std::io; - -use common::{BinarySerializable, VInt}; - -use crate::positions::COMPRESSION_BLOCK_SIZE; -use crate::postings::compression::{BlockDecoder, VIntDecoder}; - -/// When accessing the positions of a term, we get a positions_idx from the `Terminfo`. -/// This means we need to skip to the `nth` position efficiently. -/// -/// Blocks are compressed using bitpacking, so `skip_read` contains the number of bits -/// (values can go from 0 to 32 bits) required to decompress every block. -/// -/// A given block obviously takes `(128 x num_bit_for_the_block / num_bits_in_a_byte)`, -/// so skipping a block without decompressing it is just a matter of advancing that many -/// bytes. - -#[derive(Clone)] -pub struct BorrowedPositionReader<'a> { - bit_widths: &'a [u8], - positions: &'a [u8], - - block_decoder: BlockDecoder, - - // offset, expressed in positions, for the first position of the block currently loaded - // block_offset is a multiple of COMPRESSION_BLOCK_SIZE. - block_offset: u64, - // offset, expressed in positions, for the position of the first block encoded - // in the `self.positions` bytes, and if bitpacked, compressed using the bitwidth in - // `self.bit_widths`. - // - // As we advance, anchor increases simultaneously with bit_widths and positions get consumed. - anchor_offset: u64, - - // These are just copies used for .reset(). 
- original_bit_widths: &'a [u8], - original_positions: &'a [u8], -} - -impl<'a> BorrowedPositionReader<'a> { - /// Open and reads the term positions encoded into the positions_data owned bytes. - pub fn open(mut positions_data: &'a [u8]) -> io::Result<BorrowedPositionReader<'a>> { - let num_positions_bitpacked_blocks = VInt::deserialize(&mut positions_data)?.0 as usize; - let (bit_widths, positions) = positions_data.split_at(num_positions_bitpacked_blocks); - Ok(BorrowedPositionReader { - bit_widths, - positions, - block_decoder: BlockDecoder::default(), - block_offset: i64::MAX as u64, - anchor_offset: 0u64, - original_bit_widths: bit_widths, - original_positions: positions, - }) - } - - fn reset(&mut self) { - self.positions = self.original_positions; - self.bit_widths = self.original_bit_widths; - self.block_offset = i64::MAX as u64; - self.anchor_offset = 0u64; - } - - /// Advance from num_blocks bitpacked blocks. - /// - /// Panics if there are not that many remaining blocks. - fn advance_num_blocks(&mut self, num_blocks: usize) { - let num_bits: usize = self.bit_widths.as_ref()[..num_blocks] - .iter() - .cloned() - .map(|num_bits| num_bits as usize) - .sum(); - let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8; - - // self.bit_widths.advance(num_blocks); - let (_, rest) = self.bit_widths.split_at(num_blocks); - self.bit_widths = rest; - - // self.positions.advance(num_bytes_to_skip); - let (_, rest) = self.positions.split_at(num_bytes_to_skip); - self.positions = rest; - - self.anchor_offset += (num_blocks * COMPRESSION_BLOCK_SIZE) as u64; - } - - /// block_rel_id is counted relatively to the anchor. - /// block_rel_id = 0 means the anchor block. - /// block_rel_id = i means the ith block after the anchor block. 
- fn load_block(&mut self, block_rel_id: usize) { - let bit_widths = self.bit_widths; - let byte_offset: usize = bit_widths[0..block_rel_id] - .iter() - .map(|&b| b as usize) - .sum::<usize>() - * COMPRESSION_BLOCK_SIZE - / 8; - let compressed_data = &self.positions[byte_offset..]; - if bit_widths.len() > block_rel_id { - // that block is bitpacked. - let bit_width = bit_widths[block_rel_id]; - self.block_decoder - .uncompress_block_unsorted(compressed_data, bit_width, false); - } else { - // that block is vint encoded. - self.block_decoder - .uncompress_vint_unsorted_until_end(compressed_data); - } - self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64; - } - - /// Fills a buffer with the positions `[offset..offset+output.len())` integers. - /// - /// This function is optimized to be called with increasing values of `offset`. - pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) { - if offset < self.anchor_offset { - self.reset(); - } - let delta_to_block_offset = offset as i64 - self.block_offset as i64; - if !(0..128).contains(&delta_to_block_offset) { - // The first position is not within the first block. - // (Note that it could be before or after) - // We need to possibly skip a few blocks, and decompress the first relevant block. - let delta_to_anchor_offset = offset - self.anchor_offset; - let num_blocks_to_skip = - (delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize; - self.advance_num_blocks(num_blocks_to_skip); - self.load_block(0); - } else { - // The request offset is within the loaded block. - // We still need to advance anchor_offset to our current block. - let num_blocks_to_skip = - ((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize; - self.advance_num_blocks(num_blocks_to_skip); - } - - // At this point, the block containing offset is loaded, and anchor has - // been updated to point to it as well. - for i in 1.. 
{ - // we copy the part from block i - 1 that is relevant. - let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE; - let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block; - if remaining_in_block >= output.len() { - output.copy_from_slice( - &self.block_decoder.output_array()[offset_in_block..][..output.len()], - ); - break; - } - output[..remaining_in_block] - .copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]); - output = &mut output[remaining_in_block..]; - // we load block #i if necessary. - offset += remaining_in_block as u64; - self.load_block(i); - } - } -} diff --git a/src/positions/mod.rs b/src/positions/mod.rs index da9ee5844..865e0e5d8 100644 --- a/src/positions/mod.rs +++ b/src/positions/mod.rs @@ -28,7 +28,7 @@ //! * *VIntPosDeltas* := *VIntPosDelta*^(*P* % 128). //! //! The skip widths encoded separately makes it easy and fast to rapidly skip over n positions. -pub(crate) mod borrowed_position_reader; + mod reader; mod serializer; diff --git a/src/postings/borrowed_block_segment_postings.rs b/src/postings/borrowed_block_segment_postings.rs deleted file mode 100644 index c09e3d849..000000000 --- a/src/postings/borrowed_block_segment_postings.rs +++ /dev/null @@ -1,223 +0,0 @@ -use std::io; - -use common::VInt; - -use crate::postings::block_segment_postings::{decode_bitpacked_block, decode_vint_block}; -use crate::postings::borrowed_skip_reader::BorrowedSkipReader; -use crate::postings::compression::{BlockDecoder, COMPRESSION_BLOCK_SIZE}; -use crate::postings::{BlockInfo, FreqReadingOption}; -use crate::schema::IndexRecordOption; -use crate::{DocId, Score, TERMINATED}; - -/// `BorrowedBlockSegmentPostings` is a cursor iterating over blocks -/// of documents. -/// -/// # Warning -/// -/// While it is useful for some very specific high-performance -/// use cases, you should prefer using `SegmentPostings` for most usage. 
-#[derive(Clone)] -pub struct BorrowedBlockSegmentPostings<'a> { - pub(crate) doc_decoder: BlockDecoder, - block_loaded: bool, - freq_decoder: BlockDecoder, - freq_reading_option: FreqReadingOption, - block_max_score_cache: Option<Score>, - doc_freq: u32, - data: &'a [u8], - skip_reader: BorrowedSkipReader<'a>, -} - -fn split_into_skips_and_postings( - doc_freq: u32, - mut bytes: &[u8], -) -> io::Result<(Option<&[u8]>, &[u8])> { - if doc_freq < COMPRESSION_BLOCK_SIZE as u32 { - return Ok((None, bytes)); - } - let skip_len = VInt::deserialize_u64(&mut bytes)? as usize; - let (skip_data, postings_data) = bytes.split_at(skip_len); - Ok((Some(skip_data), postings_data)) -} - -impl<'a> BorrowedBlockSegmentPostings<'a> { - /// Opens a `BorrowedBlockSegmentPostings`. - /// `doc_freq` is the number of documents in the posting list. - /// `record_option` represents the amount of data available according to the schema. - /// `requested_option` is the amount of data requested by the user. - /// If for instance, we do not request for term frequencies, this function will not decompress - /// term frequency blocks. - pub(crate) fn open( - doc_freq: u32, - bytes: &'a [u8], - mut record_option: IndexRecordOption, - requested_option: IndexRecordOption, - ) -> io::Result<BorrowedBlockSegmentPostings<'a>> { - let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?; - let skip_reader = match skip_data_opt { - Some(skip_data) => { - let block_count = doc_freq as usize / COMPRESSION_BLOCK_SIZE; - // 8 is the minimum size of a block with frequency (can be more if pos are stored - // too) - if skip_data.len() < 8 * block_count { - // the field might be encoded with frequency, but this term in particular isn't. - // This can happen for JSON field with term frequencies: - // - text terms are encoded with term freqs. - // - numerical terms are encoded without term freqs. 
- record_option = IndexRecordOption::Basic; - } - BorrowedSkipReader::new(skip_data, doc_freq, record_option) - } - None => BorrowedSkipReader::new(&[], doc_freq, record_option), - }; - - let freq_reading_option = match (record_option, requested_option) { - (IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq, - (_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq, - (_, _) => FreqReadingOption::ReadFreq, - }; - - let mut block_segment_postings = BorrowedBlockSegmentPostings { - doc_decoder: BlockDecoder::with_val(TERMINATED), - block_loaded: false, - freq_decoder: BlockDecoder::with_val(1), - freq_reading_option, - block_max_score_cache: None, - doc_freq, - data: postings_data, - skip_reader, - }; - block_segment_postings.load_block(); - Ok(block_segment_postings) - } - - /// Returns the overall number of documents in the block postings. - /// It does not take in account whether documents are deleted or not. - /// - /// This `doc_freq` is simply the sum of the length of all of the blocks - /// length, and it does not take in account deleted documents. - pub fn doc_freq(&self) -> u32 { - self.doc_freq - } - - /// Returns a full block, regardless of whether the block is complete or incomplete ( - /// as it happens for the last block of the posting list). - /// - /// In the latter case, the block is guaranteed to be padded with the sentinel value: - /// `TERMINATED`. The array is also guaranteed to be aligned on 16 bytes = 128 bits. - /// - /// This method is useful to run SSE2 linear search. - #[inline] - pub(crate) fn full_block(&self) -> &[DocId; COMPRESSION_BLOCK_SIZE] { - debug_assert!(self.block_is_loaded()); - self.doc_decoder.full_output() - } - - /// Return the document at index `idx` of the block. - #[inline] - pub fn doc(&self, idx: usize) -> u32 { - self.doc_decoder.output(idx) - } - - /// Return the array of `term freq` in the block. 
- #[inline] - pub fn freqs(&self) -> &[u32] { - debug_assert!(self.block_is_loaded()); - self.freq_decoder.output_array() - } - - /// Return the frequency at index `idx` of the block. - #[inline] - pub fn freq(&self, idx: usize) -> u32 { - debug_assert!(self.block_is_loaded()); - self.freq_decoder.output(idx) - } - - /// Position on a block that may contains `target_doc`. - /// - /// If all docs are smaller than target, the block loaded may be empty, - /// or be the last an incomplete VInt block. - pub fn seek(&mut self, target_doc: DocId) { - self.shallow_seek(target_doc); - self.load_block(); - } - - pub(crate) fn position_offset(&self) -> u64 { - self.skip_reader.position_offset() - } - - /// Dangerous API! This calls seek on the skip list, - /// but does not `.load_block()` afterwards. - /// - /// `.load_block()` needs to be called manually afterwards. - /// If all docs are smaller than target, the block loaded may be empty, - /// or be the last an incomplete VInt block. - pub(crate) fn shallow_seek(&mut self, target_doc: DocId) { - if self.skip_reader.seek(target_doc) { - self.block_max_score_cache = None; - self.block_loaded = false; - } - } - - pub(crate) fn block_is_loaded(&self) -> bool { - self.block_loaded - } - - pub(crate) fn load_block(&mut self) { - let offset = self.skip_reader.byte_offset(); - if self.block_is_loaded() { - return; - } - match self.skip_reader.block_info() { - BlockInfo::BitPacked { - doc_num_bits, - strict_delta_encoded, - tf_num_bits, - .. - } => { - decode_bitpacked_block( - &mut self.doc_decoder, - if let FreqReadingOption::ReadFreq = self.freq_reading_option { - Some(&mut self.freq_decoder) - } else { - None - }, - &self.data[offset..], - self.skip_reader.last_doc_in_previous_block, - doc_num_bits, - tf_num_bits, - strict_delta_encoded, - ); - } - BlockInfo::VInt { num_docs } => { - let data = { - if num_docs == 0 { - &[] - } else { - &self.data[offset..] 
- } - }; - decode_vint_block( - &mut self.doc_decoder, - if let FreqReadingOption::ReadFreq = self.freq_reading_option { - Some(&mut self.freq_decoder) - } else { - None - }, - data, - self.skip_reader.last_doc_in_previous_block, - num_docs as usize, - ); - } - } - self.block_loaded = true; - } - - /// Advance to the next block. - pub fn advance(&mut self) { - self.skip_reader.advance(); - self.block_loaded = false; - self.block_max_score_cache = None; - self.load_block(); - } -} diff --git a/src/postings/borrowed_segment_postings.rs b/src/postings/borrowed_segment_postings.rs deleted file mode 100644 index b2f18dc51..000000000 --- a/src/postings/borrowed_segment_postings.rs +++ /dev/null @@ -1,179 +0,0 @@ -use common::HasLen; - -use crate::docset::DocSet; -use crate::fastfield::AliveBitSet; -use crate::positions::borrowed_position_reader::BorrowedPositionReader; -use crate::postings::borrowed_block_segment_postings::BorrowedBlockSegmentPostings; -use crate::postings::branchless_binary_search; -use crate::postings::compression::COMPRESSION_BLOCK_SIZE; -use crate::{DocId, TERMINATED}; - -/// `SegmentPostings` represents the inverted list or postings associated with -/// a term in a `Segment`. -/// -/// As we iterate through the `SegmentPostings`, the frequencies are optionally decoded. -/// Positions on the other hand, are optionally entirely decoded upfront. -#[derive(Clone)] -pub struct BorrowedSegmentPostings<'a> { - pub(crate) block_cursor: BorrowedBlockSegmentPostings<'a>, - cur: usize, - position_reader: Option<BorrowedPositionReader<'a>>, -} - -impl BorrowedSegmentPostings<'_> { - /// Compute the number of non-deleted documents. - /// - /// This method will clone and scan through the posting lists. - /// (this is a rather expensive operation). 
- pub fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 { - let mut docset = self.clone(); - let mut doc_freq = 0; - loop { - let doc = docset.doc(); - if doc == TERMINATED { - return doc_freq; - } - if alive_bitset.is_alive(doc) { - doc_freq += 1u32; - } - docset.advance(); - } - } - - /// Returns the overall number of documents in the block postings. - /// It does not take in account whether documents are deleted or not. - pub fn doc_freq(&self) -> u32 { - self.block_cursor.doc_freq() - } - - /// Reads a Segment postings from an &[u8] - /// - /// * `len` - number of document in the posting lists. - /// * `data` - data array. The complete data is not necessarily used. - /// * `freq_handler` - the freq handler is in charge of decoding frequencies and/or positions - pub(crate) fn from_block_postings<'a>( - segment_block_postings: BorrowedBlockSegmentPostings<'a>, - position_reader: Option<BorrowedPositionReader<'a>>, - ) -> BorrowedSegmentPostings<'a> { - BorrowedSegmentPostings { - block_cursor: segment_block_postings, - cur: 0, // cursor within the block - position_reader, - } - } - - pub fn term_freq(&self) -> u32 { - debug_assert!( - // Here we do not use the len of `freqs()` - // because it is actually ok to request for the freq of doc - // even if no frequency were encoded for the field. - // - // In that case we hit the block just as if the frequency had been - // decoded. The block is simply prefilled by the value 1. - self.cur < COMPRESSION_BLOCK_SIZE, - "Have you forgotten to call `.advance()` at least once before calling `.term_freq()`." - ); - self.block_cursor.freq(self.cur) - } - - /// Returns the positions offsetted with a given value. - /// It is not necessary to clear the `output` before calling this method. - /// The output vector will be resized to the `term_freq`. 
- fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) { - output.clear(); - self.append_positions_with_offset(offset, output); - } - - /// Returns the positions offsetted with a given value. - /// Data will be appended to the output. - fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) { - let term_freq = self.term_freq(); - let prev_len = output.len(); - if let Some(position_reader) = self.position_reader.as_mut() { - debug_assert!( - !self.block_cursor.freqs().is_empty(), - "No positions available" - ); - let read_offset = self.block_cursor.position_offset() - + (self.block_cursor.freqs()[..self.cur] - .iter() - .cloned() - .sum::<u32>() as u64); - // TODO: instead of zeroing the output, we could use MaybeUninit or similar. - output.resize(prev_len + term_freq as usize, 0u32); - position_reader.read(read_offset, &mut output[prev_len..]); - let mut cum = offset; - for output_mut in output[prev_len..].iter_mut() { - cum += *output_mut; - *output_mut = cum; - } - } - } - - /// Returns the positions of the term in the given document. - /// The output vector will be resized to the `term_freq`. - pub fn positions(&mut self, output: &mut Vec<u32>) { - self.positions_with_offset(0u32, output); - } -} - -impl DocSet for BorrowedSegmentPostings<'_> { - // goes to the next element. - // next needs to be called a first time to point to the correct element. - #[inline] - fn advance(&mut self) -> DocId { - debug_assert!(self.block_cursor.block_is_loaded()); - if self.cur == COMPRESSION_BLOCK_SIZE - 1 { - self.cur = 0; - self.block_cursor.advance(); - } else { - self.cur += 1; - } - self.doc() - } - - fn seek(&mut self, target: DocId) -> DocId { - debug_assert!(self.doc() <= target); - if self.doc() >= target { - return self.doc(); - } - - self.block_cursor.seek(target); - - // At this point we are on the block, that might contain our document. 
- let output = self.block_cursor.full_block(); - self.cur = branchless_binary_search(output, target); - - // The last block is not full and padded with the value TERMINATED, - // so that we are guaranteed to have at least doc in the block (a real one or the padding) - // that is greater or equal to the target. - debug_assert!(self.cur < COMPRESSION_BLOCK_SIZE); - - // `doc` is now the first element >= `target` - - // If all docs are smaller than target the current block should be incomplemented and padded - // with the value `TERMINATED`. - // - // After the search, the cursor should point to the first value of TERMINATED. - let doc = output[self.cur]; - debug_assert!(doc >= target); - debug_assert_eq!(doc, self.doc()); - doc - } - - /// Return the current document's `DocId`. - #[inline] - fn doc(&self) -> DocId { - self.block_cursor.doc(self.cur) - } - - fn size_hint(&self) -> u32 { - self.len() as u32 - } -} - -impl HasLen for BorrowedSegmentPostings<'_> { - fn len(&self) -> usize { - self.block_cursor.doc_freq() as usize - } -} diff --git a/src/postings/borrowed_skip_reader.rs b/src/postings/borrowed_skip_reader.rs deleted file mode 100644 index 9e92bf3e1..000000000 --- a/src/postings/borrowed_skip_reader.rs +++ /dev/null @@ -1,160 +0,0 @@ -use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE}; -use crate::postings::skip::{decode_bitwidth, decode_block_wand_max_tf, read_u32}; -use crate::postings::BlockInfo; -use crate::schema::IndexRecordOption; -use crate::{DocId, TERMINATED}; - -#[derive(Clone)] -pub(crate) struct BorrowedSkipReader<'a> { - last_doc_in_block: DocId, - pub(crate) last_doc_in_previous_block: DocId, - owned_read: &'a [u8], - skip_info: IndexRecordOption, - byte_offset: usize, - remaining_docs: u32, // number of docs remaining, including the - // documents in the current block. 
- block_info: BlockInfo, - - position_offset: u64, -} - -impl<'a> BorrowedSkipReader<'a> { - pub fn new( - data: &'a [u8], - doc_freq: u32, - skip_info: IndexRecordOption, - ) -> BorrowedSkipReader<'a> { - let mut skip_reader = BorrowedSkipReader { - last_doc_in_block: if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { - 0 - } else { - TERMINATED - }, - last_doc_in_previous_block: 0u32, - owned_read: data, - skip_info, - block_info: BlockInfo::VInt { num_docs: doc_freq }, - byte_offset: 0, - remaining_docs: doc_freq, - position_offset: 0u64, - }; - if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { - skip_reader.read_block_info(); - } - skip_reader - } - - pub(crate) fn last_doc_in_block(&self) -> DocId { - self.last_doc_in_block - } - - pub fn position_offset(&self) -> u64 { - self.position_offset - } - - #[inline] - pub fn byte_offset(&self) -> usize { - self.byte_offset - } - - fn read_block_info(&mut self) { - let bytes = self.owned_read; - let advance_len: usize; - self.last_doc_in_block = read_u32(bytes); - let (doc_num_bits, strict_delta_encoded) = decode_bitwidth(bytes[4]); - match self.skip_info { - IndexRecordOption::Basic => { - advance_len = 5; - self.block_info = BlockInfo::BitPacked { - doc_num_bits, - strict_delta_encoded, - tf_num_bits: 0, - tf_sum: 0, - block_wand_fieldnorm_id: 0, - block_wand_term_freq: 0, - }; - } - IndexRecordOption::WithFreqs => { - let tf_num_bits = bytes[5]; - let block_wand_fieldnorm_id = bytes[6]; - let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]); - advance_len = 8; - self.block_info = BlockInfo::BitPacked { - doc_num_bits, - strict_delta_encoded, - tf_num_bits, - tf_sum: 0, - block_wand_fieldnorm_id, - block_wand_term_freq, - }; - } - IndexRecordOption::WithFreqsAndPositions => { - let tf_num_bits = bytes[5]; - let tf_sum = read_u32(&bytes[6..10]); - let block_wand_fieldnorm_id = bytes[10]; - let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]); - advance_len = 12; - self.block_info = BlockInfo::BitPacked { 
- doc_num_bits, - strict_delta_encoded, - tf_num_bits, - tf_sum, - block_wand_fieldnorm_id, - block_wand_term_freq, - }; - } - } - // self.owned_read.advance(advance_len); - let (_, rest) = self.owned_read.split_at(advance_len); - self.owned_read = rest; - } - - pub fn block_info(&self) -> BlockInfo { - self.block_info - } - - /// Advance the skip reader to the block that may contain the target. - /// - /// If the target is larger than all documents, the skip_reader - /// then advance to the last Variable In block. - pub fn seek(&mut self, target: DocId) -> bool { - if self.last_doc_in_block() >= target { - return false; - } - loop { - self.advance(); - if self.last_doc_in_block() >= target { - return true; - } - } - } - - pub fn advance(&mut self) { - match self.block_info { - BlockInfo::BitPacked { - doc_num_bits, - tf_num_bits, - tf_sum, - .. - } => { - self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32; - self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits); - self.position_offset += tf_sum as u64; - } - BlockInfo::VInt { num_docs } => { - debug_assert_eq!(num_docs, self.remaining_docs); - self.remaining_docs = 0; - self.byte_offset = usize::MAX; - } - } - self.last_doc_in_previous_block = self.last_doc_in_block; - if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 { - self.read_block_info(); - } else { - self.last_doc_in_block = TERMINATED; - self.block_info = BlockInfo::VInt { - num_docs: self.remaining_docs, - }; - } - } -} diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 45360a173..64795955f 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -5,9 +5,7 @@ mod block_search; pub(crate) use self::block_search::branchless_binary_search; pub(crate) mod block_segment_postings; -pub(crate) mod borrowed_block_segment_postings; -pub(crate) mod borrowed_segment_postings; -mod borrowed_skip_reader; + pub(crate) mod compression; mod indexing_context; mod json_postings_writer;