fix: Use smaller buffers during merging (#71)

`MergeOptimizedInvertedIndexReader` was added in #32 in order to avoid making small reads to our underlying `FileHandle`. It did so by reading the entire content of the posting lists and positions at open time.

As that PR says:
> A likely downside to this approach is that now pg_search will be, indirectly, holding onto a lot of heap-allocated memory that was read from its block storage. Perhaps in the (near) future we can further optimize the new `MergeOptimizedInvertedIndexReader` such that it pages in blocks of a few megabytes at a time, on demand, rather than the whole file.

This PR makes that change. But it additionally removes code that was later added in #47 to borrow individual entries rather than creating `OwnedBytes` for them. I believe that this code was added due to a misunderstanding:

`OwnedBytes` is a total misnomer: the bytes are not "owned": they are immutably borrowed and reference counted. An `OwnedBytes` object can be created for any type which derefs to a slice of bytes, and can be cheaply cloned and sliced. So there is no need to actually borrow _or_ copy the buffer under the `OwnedBytes`. Removing the code that was doing so allows us to safely recreate our buffer without worrying about the lifetimes of buffers that we've handed out.
This commit is contained in:
Stu Hood
2025-10-20 10:02:09 -07:00
committed by Stu Hood
parent e0476d2eb2
commit 7183ac6cbc
11 changed files with 136 additions and 748 deletions

View File

@@ -0,0 +1,106 @@
use std::cell::RefCell;
use std::cmp::min;
use std::io;
use std::ops::Range;
use super::file_slice::FileSlice;
use super::{HasLen, OwnedBytes};
const DEFAULT_BUFFER_MAX_SIZE: usize = 4 * 1024 * 1024; // 4 MB
/// A buffered reader for a FileSlice.
///
/// Reads the underlying `FileSlice` in large, sequential chunks to amortize
/// the cost of `read_bytes` calls, while keeping peak memory usage under control.
///
/// TODO: Rather than wrapping a `FileSlice` in buffering, it will usually be better to adjust a
/// `FileHandle` to directly handle buffering itself (as that allows separate `FileSlice`s read
/// from the same `FileHandle` to share buffers.)
pub struct BufferedFileSlice {
file_slice: FileSlice,
buffer: RefCell<OwnedBytes>,
buffer_range: RefCell<Range<u64>>,
buffer_max_size: usize,
}
impl BufferedFileSlice {
/// Creates a new `BufferedFileSlice`.
///
/// The `buffer_max_size` is the amount of data that will be read from the
/// `FileSlice` on a buffer miss.
pub fn new(file_slice: FileSlice, buffer_max_size: usize) -> Self {
Self {
file_slice,
buffer: RefCell::new(OwnedBytes::empty()),
buffer_range: RefCell::new(0..0),
buffer_max_size,
}
}
/// Creates a new `BufferedFileSlice` with a default buffer max size.
pub fn new_with_default_buffer_size(file_slice: FileSlice) -> Self {
Self::new(file_slice, DEFAULT_BUFFER_MAX_SIZE)
}
/// Creates an empty `BufferedFileSlice`.
pub fn empty() -> Self {
Self::new(FileSlice::empty(), 0)
}
/// Returns an `OwnedBytes` corresponding to the given `required_range`.
///
/// If the requested range is not in the buffer, this will trigger a read
/// from the underlying `FileSlice`.
///
/// If the requested range is larger than the buffer_max_size, it will be read directly from the
/// source without buffering.
///
/// # Errors
///
/// Returns an `io::Error` if the underlying read fails or the range is
/// out of bounds.
pub fn get_bytes(&self, required_range: Range<u64>) -> io::Result<OwnedBytes> {
let buffer_range = self.buffer_range.borrow();
// Cache miss condition: the required range is not fully contained in the current buffer.
if required_range.start < buffer_range.start || required_range.end > buffer_range.end {
drop(buffer_range); // release borrow before mutating
if required_range.end > self.file_slice.len() as u64 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Requested range extends beyond the end of the file slice.",
));
}
if (required_range.end - required_range.start) as usize > self.buffer_max_size {
// This read is larger than our buffer max size.
// Read it directly and bypass the buffer to avoid churning.
return self
.file_slice
.read_bytes_slice(required_range.start as usize..required_range.end as usize);
}
let new_buffer_start = required_range.start;
let new_buffer_end = min(
new_buffer_start + self.buffer_max_size as u64,
self.file_slice.len() as u64,
);
let read_range = new_buffer_start..new_buffer_end;
let new_buffer = self
.file_slice
.read_bytes_slice(read_range.start as usize..read_range.end as usize)?;
self.buffer.replace(new_buffer);
self.buffer_range.replace(read_range);
}
// Now the data is guaranteed to be in the buffer.
let buffer = self.buffer.borrow();
let buffer_range = self.buffer_range.borrow();
let local_start = (required_range.start - buffer_range.start) as usize;
let local_end = (required_range.end - buffer_range.start) as usize;
Ok(buffer.slice(local_start..local_end))
}
}

View File

@@ -6,6 +6,7 @@ pub use byteorder::LittleEndian as Endianness;
mod bitset;
pub mod bounds;
pub mod buffered_file_slice;
mod byte_count;
mod datetime;
pub mod file_slice;

View File

@@ -19,6 +19,7 @@ mod composite_file;
use std::io::BufWriter;
use std::path::PathBuf;
pub use common::buffered_file_slice::BufferedFileSlice;
pub use common::file_slice::{FileHandle, FileSlice};
pub use common::{AntiCallToken, OwnedBytes, TerminatingWrite};

View File

@@ -1,28 +1,25 @@
use std::io;
use common::OwnedBytes;
use crate::directory::FileSlice;
use crate::positions::borrowed_position_reader::BorrowedPositionReader;
use crate::postings::borrowed_block_segment_postings::BorrowedBlockSegmentPostings;
use crate::postings::borrowed_segment_postings::BorrowedSegmentPostings;
use crate::postings::TermInfo;
use crate::directory::{BufferedFileSlice, FileSlice};
use crate::positions::PositionReader;
use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo};
use crate::schema::IndexRecordOption;
use crate::termdict::TermDictionary;
/// The inverted index reader is in charge of accessing
/// the inverted index associated with a specific field.
///
/// This is optimized for merging in that it full reads
/// the postings and positions files into memory when opened.
/// This eliminates all disk I/O to these files during merging.
/// This is optimized for merging in that it uses a buffered reader
/// for the postings and positions files.
/// This eliminates most disk I/O to these files during merging, without
/// reading the entire file into memory at once.
///
/// NB: This is a copy/paste from [`InvertedIndexReader`] and trimmed
/// down to only include the methods required by the merge process.
pub(crate) struct MergeOptimizedInvertedIndexReader {
termdict: TermDictionary,
postings_bytes: OwnedBytes,
positions_bytes: OwnedBytes,
postings_reader: BufferedFileSlice,
positions_reader: BufferedFileSlice,
record_option: IndexRecordOption,
}
@@ -36,8 +33,8 @@ impl MergeOptimizedInvertedIndexReader {
let (_, postings_body) = postings_file_slice.split(8);
Ok(MergeOptimizedInvertedIndexReader {
termdict,
postings_bytes: postings_body.read_bytes()?,
positions_bytes: positions_file_slice.read_bytes()?,
postings_reader: BufferedFileSlice::new_with_default_buffer_size(postings_body),
positions_reader: BufferedFileSlice::new_with_default_buffer_size(positions_file_slice),
record_option,
})
}
@@ -47,8 +44,8 @@ impl MergeOptimizedInvertedIndexReader {
pub fn empty(record_option: IndexRecordOption) -> MergeOptimizedInvertedIndexReader {
MergeOptimizedInvertedIndexReader {
termdict: TermDictionary::empty(),
postings_bytes: FileSlice::empty().read_bytes().unwrap(),
positions_bytes: FileSlice::empty().read_bytes().unwrap(),
postings_reader: BufferedFileSlice::empty(),
positions_reader: BufferedFileSlice::empty(),
record_option,
}
}
@@ -66,9 +63,11 @@ impl MergeOptimizedInvertedIndexReader {
&self,
term_info: &TermInfo,
requested_option: IndexRecordOption,
) -> io::Result<BorrowedBlockSegmentPostings> {
let postings_data = &self.postings_bytes[term_info.postings_range.clone()];
BorrowedBlockSegmentPostings::open(
) -> io::Result<BlockSegmentPostings> {
let postings_data = self.postings_reader.get_bytes(
term_info.postings_range.start as u64..term_info.postings_range.end as u64,
)?;
BlockSegmentPostings::open(
term_info.doc_freq,
postings_data,
self.record_option,
@@ -84,20 +83,22 @@ impl MergeOptimizedInvertedIndexReader {
&self,
term_info: &TermInfo,
option: IndexRecordOption,
) -> io::Result<BorrowedSegmentPostings> {
) -> io::Result<SegmentPostings> {
let option = option.downgrade(self.record_option);
let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
let position_reader = {
if option.has_positions() {
let positions_data = &self.positions_bytes[term_info.positions_range.clone()];
let position_reader = BorrowedPositionReader::open(positions_data)?;
let positions_data = self.positions_reader.get_bytes(
term_info.positions_range.start as u64..term_info.positions_range.end as u64,
)?;
let position_reader = PositionReader::open(positions_data)?;
Some(position_reader)
} else {
None
}
};
Ok(BorrowedSegmentPostings::from_block_postings(
Ok(SegmentPostings::from_block_postings(
block_postings,
position_reader,
))

View File

@@ -17,8 +17,7 @@ use crate::index::{Segment, SegmentComponent, SegmentReader};
use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
use crate::indexer::segment_updater::CancelSentinel;
use crate::indexer::SegmentSerializer;
use crate::postings::borrowed_segment_postings::BorrowedSegmentPostings;
use crate::postings::InvertedIndexSerializer;
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
use crate::store::StoreWriter;
use crate::termdict::{TermMerger, TermOrdinal};
@@ -373,8 +372,7 @@ impl IndexMerger {
indexed. Have you modified the schema?",
);
let mut segment_postings_containing_the_term: Vec<(usize, BorrowedSegmentPostings)> =
vec![];
let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![];
let mut cnt = 0;
while merged_terms.advance() {

View File

@@ -1,155 +0,0 @@
use std::io;
use common::{BinarySerializable, VInt};
use crate::positions::COMPRESSION_BLOCK_SIZE;
use crate::postings::compression::{BlockDecoder, VIntDecoder};
/// When accessing the positions of a term, we get a positions_idx from the `Terminfo`.
/// This means we need to skip to the `nth` position efficiently.
///
/// Blocks are compressed using bitpacking, so `skip_read` contains the number of bits
/// (values can go from 0 to 32 bits) required to decompress every block.
///
/// A given block obviously takes `(128 x num_bit_for_the_block / num_bits_in_a_byte)`,
/// so skipping a block without decompressing it is just a matter of advancing that many
/// bytes.
#[derive(Clone)]
pub struct BorrowedPositionReader<'a> {
bit_widths: &'a [u8],
positions: &'a [u8],
block_decoder: BlockDecoder,
// offset, expressed in positions, for the first position of the block currently loaded
// block_offset is a multiple of COMPRESSION_BLOCK_SIZE.
block_offset: u64,
// offset, expressed in positions, for the position of the first block encoded
// in the `self.positions` bytes, and if bitpacked, compressed using the bitwidth in
// `self.bit_widths`.
//
// As we advance, anchor increases simultaneously with bit_widths and positions get consumed.
anchor_offset: u64,
// These are just copies used for .reset().
original_bit_widths: &'a [u8],
original_positions: &'a [u8],
}
impl<'a> BorrowedPositionReader<'a> {
/// Open and reads the term positions encoded into the positions_data owned bytes.
pub fn open(mut positions_data: &'a [u8]) -> io::Result<BorrowedPositionReader<'a>> {
let num_positions_bitpacked_blocks = VInt::deserialize(&mut positions_data)?.0 as usize;
let (bit_widths, positions) = positions_data.split_at(num_positions_bitpacked_blocks);
Ok(BorrowedPositionReader {
bit_widths,
positions,
block_decoder: BlockDecoder::default(),
block_offset: i64::MAX as u64,
anchor_offset: 0u64,
original_bit_widths: bit_widths,
original_positions: positions,
})
}
fn reset(&mut self) {
self.positions = self.original_positions;
self.bit_widths = self.original_bit_widths;
self.block_offset = i64::MAX as u64;
self.anchor_offset = 0u64;
}
/// Advance from num_blocks bitpacked blocks.
///
/// Panics if there are not that many remaining blocks.
fn advance_num_blocks(&mut self, num_blocks: usize) {
let num_bits: usize = self.bit_widths.as_ref()[..num_blocks]
.iter()
.cloned()
.map(|num_bits| num_bits as usize)
.sum();
let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8;
// self.bit_widths.advance(num_blocks);
let (_, rest) = self.bit_widths.split_at(num_blocks);
self.bit_widths = rest;
// self.positions.advance(num_bytes_to_skip);
let (_, rest) = self.positions.split_at(num_bytes_to_skip);
self.positions = rest;
self.anchor_offset += (num_blocks * COMPRESSION_BLOCK_SIZE) as u64;
}
/// block_rel_id is counted relatively to the anchor.
/// block_rel_id = 0 means the anchor block.
/// block_rel_id = i means the ith block after the anchor block.
fn load_block(&mut self, block_rel_id: usize) {
let bit_widths = self.bit_widths;
let byte_offset: usize = bit_widths[0..block_rel_id]
.iter()
.map(|&b| b as usize)
.sum::<usize>()
* COMPRESSION_BLOCK_SIZE
/ 8;
let compressed_data = &self.positions[byte_offset..];
if bit_widths.len() > block_rel_id {
// that block is bitpacked.
let bit_width = bit_widths[block_rel_id];
self.block_decoder
.uncompress_block_unsorted(compressed_data, bit_width, false);
} else {
// that block is vint encoded.
self.block_decoder
.uncompress_vint_unsorted_until_end(compressed_data);
}
self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64;
}
/// Fills a buffer with the positions `[offset..offset+output.len())` integers.
///
/// This function is optimized to be called with increasing values of `offset`.
pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) {
if offset < self.anchor_offset {
self.reset();
}
let delta_to_block_offset = offset as i64 - self.block_offset as i64;
if !(0..128).contains(&delta_to_block_offset) {
// The first position is not within the first block.
// (Note that it could be before or after)
// We need to possibly skip a few blocks, and decompress the first relevant block.
let delta_to_anchor_offset = offset - self.anchor_offset;
let num_blocks_to_skip =
(delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize;
self.advance_num_blocks(num_blocks_to_skip);
self.load_block(0);
} else {
// The request offset is within the loaded block.
// We still need to advance anchor_offset to our current block.
let num_blocks_to_skip =
((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize;
self.advance_num_blocks(num_blocks_to_skip);
}
// At this point, the block containing offset is loaded, and anchor has
// been updated to point to it as well.
for i in 1.. {
// we copy the part from block i - 1 that is relevant.
let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE;
let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block;
if remaining_in_block >= output.len() {
output.copy_from_slice(
&self.block_decoder.output_array()[offset_in_block..][..output.len()],
);
break;
}
output[..remaining_in_block]
.copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]);
output = &mut output[remaining_in_block..];
// we load block #i if necessary.
offset += remaining_in_block as u64;
self.load_block(i);
}
}
}

View File

@@ -28,7 +28,7 @@
//! * *VIntPosDeltas* := *VIntPosDelta*^(*P* % 128).
//!
//! The skip widths encoded separately makes it easy and fast to rapidly skip over n positions.
pub(crate) mod borrowed_position_reader;
mod reader;
mod serializer;

View File

@@ -1,223 +0,0 @@
use std::io;
use common::VInt;
use crate::postings::block_segment_postings::{decode_bitpacked_block, decode_vint_block};
use crate::postings::borrowed_skip_reader::BorrowedSkipReader;
use crate::postings::compression::{BlockDecoder, COMPRESSION_BLOCK_SIZE};
use crate::postings::{BlockInfo, FreqReadingOption};
use crate::schema::IndexRecordOption;
use crate::{DocId, Score, TERMINATED};
/// `BorrowedBlockSegmentPostings` is a cursor iterating over blocks
/// of documents.
///
/// # Warning
///
/// While it is useful for some very specific high-performance
/// use cases, you should prefer using `SegmentPostings` for most usage.
#[derive(Clone)]
pub struct BorrowedBlockSegmentPostings<'a> {
pub(crate) doc_decoder: BlockDecoder,
block_loaded: bool,
freq_decoder: BlockDecoder,
freq_reading_option: FreqReadingOption,
block_max_score_cache: Option<Score>,
doc_freq: u32,
data: &'a [u8],
skip_reader: BorrowedSkipReader<'a>,
}
fn split_into_skips_and_postings(
doc_freq: u32,
mut bytes: &[u8],
) -> io::Result<(Option<&[u8]>, &[u8])> {
if doc_freq < COMPRESSION_BLOCK_SIZE as u32 {
return Ok((None, bytes));
}
let skip_len = VInt::deserialize_u64(&mut bytes)? as usize;
let (skip_data, postings_data) = bytes.split_at(skip_len);
Ok((Some(skip_data), postings_data))
}
impl<'a> BorrowedBlockSegmentPostings<'a> {
/// Opens a `BorrowedBlockSegmentPostings`.
/// `doc_freq` is the number of documents in the posting list.
/// `record_option` represents the amount of data available according to the schema.
/// `requested_option` is the amount of data requested by the user.
/// If for instance, we do not request for term frequencies, this function will not decompress
/// term frequency blocks.
pub(crate) fn open(
doc_freq: u32,
bytes: &'a [u8],
mut record_option: IndexRecordOption,
requested_option: IndexRecordOption,
) -> io::Result<BorrowedBlockSegmentPostings<'a>> {
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?;
let skip_reader = match skip_data_opt {
Some(skip_data) => {
let block_count = doc_freq as usize / COMPRESSION_BLOCK_SIZE;
// 8 is the minimum size of a block with frequency (can be more if pos are stored
// too)
if skip_data.len() < 8 * block_count {
// the field might be encoded with frequency, but this term in particular isn't.
// This can happen for JSON field with term frequencies:
// - text terms are encoded with term freqs.
// - numerical terms are encoded without term freqs.
record_option = IndexRecordOption::Basic;
}
BorrowedSkipReader::new(skip_data, doc_freq, record_option)
}
None => BorrowedSkipReader::new(&[], doc_freq, record_option),
};
let freq_reading_option = match (record_option, requested_option) {
(IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
(_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
(_, _) => FreqReadingOption::ReadFreq,
};
let mut block_segment_postings = BorrowedBlockSegmentPostings {
doc_decoder: BlockDecoder::with_val(TERMINATED),
block_loaded: false,
freq_decoder: BlockDecoder::with_val(1),
freq_reading_option,
block_max_score_cache: None,
doc_freq,
data: postings_data,
skip_reader,
};
block_segment_postings.load_block();
Ok(block_segment_postings)
}
/// Returns the overall number of documents in the block postings.
/// It does not take in account whether documents are deleted or not.
///
/// This `doc_freq` is simply the sum of the length of all of the blocks
/// length, and it does not take in account deleted documents.
pub fn doc_freq(&self) -> u32 {
self.doc_freq
}
/// Returns a full block, regardless of whether the block is complete or incomplete (
/// as it happens for the last block of the posting list).
///
/// In the latter case, the block is guaranteed to be padded with the sentinel value:
/// `TERMINATED`. The array is also guaranteed to be aligned on 16 bytes = 128 bits.
///
/// This method is useful to run SSE2 linear search.
#[inline]
pub(crate) fn full_block(&self) -> &[DocId; COMPRESSION_BLOCK_SIZE] {
debug_assert!(self.block_is_loaded());
self.doc_decoder.full_output()
}
/// Return the document at index `idx` of the block.
#[inline]
pub fn doc(&self, idx: usize) -> u32 {
self.doc_decoder.output(idx)
}
/// Return the array of `term freq` in the block.
#[inline]
pub fn freqs(&self) -> &[u32] {
debug_assert!(self.block_is_loaded());
self.freq_decoder.output_array()
}
/// Return the frequency at index `idx` of the block.
#[inline]
pub fn freq(&self, idx: usize) -> u32 {
debug_assert!(self.block_is_loaded());
self.freq_decoder.output(idx)
}
/// Position on a block that may contains `target_doc`.
///
/// If all docs are smaller than target, the block loaded may be empty,
/// or be the last an incomplete VInt block.
pub fn seek(&mut self, target_doc: DocId) {
self.shallow_seek(target_doc);
self.load_block();
}
pub(crate) fn position_offset(&self) -> u64 {
self.skip_reader.position_offset()
}
/// Dangerous API! This calls seek on the skip list,
/// but does not `.load_block()` afterwards.
///
/// `.load_block()` needs to be called manually afterwards.
/// If all docs are smaller than target, the block loaded may be empty,
/// or be the last an incomplete VInt block.
pub(crate) fn shallow_seek(&mut self, target_doc: DocId) {
if self.skip_reader.seek(target_doc) {
self.block_max_score_cache = None;
self.block_loaded = false;
}
}
pub(crate) fn block_is_loaded(&self) -> bool {
self.block_loaded
}
pub(crate) fn load_block(&mut self) {
let offset = self.skip_reader.byte_offset();
if self.block_is_loaded() {
return;
}
match self.skip_reader.block_info() {
BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits,
..
} => {
decode_bitpacked_block(
&mut self.doc_decoder,
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
Some(&mut self.freq_decoder)
} else {
None
},
&self.data[offset..],
self.skip_reader.last_doc_in_previous_block,
doc_num_bits,
tf_num_bits,
strict_delta_encoded,
);
}
BlockInfo::VInt { num_docs } => {
let data = {
if num_docs == 0 {
&[]
} else {
&self.data[offset..]
}
};
decode_vint_block(
&mut self.doc_decoder,
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
Some(&mut self.freq_decoder)
} else {
None
},
data,
self.skip_reader.last_doc_in_previous_block,
num_docs as usize,
);
}
}
self.block_loaded = true;
}
/// Advance to the next block.
pub fn advance(&mut self) {
self.skip_reader.advance();
self.block_loaded = false;
self.block_max_score_cache = None;
self.load_block();
}
}

View File

@@ -1,179 +0,0 @@
use common::HasLen;
use crate::docset::DocSet;
use crate::fastfield::AliveBitSet;
use crate::positions::borrowed_position_reader::BorrowedPositionReader;
use crate::postings::borrowed_block_segment_postings::BorrowedBlockSegmentPostings;
use crate::postings::branchless_binary_search;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::{DocId, TERMINATED};
/// `SegmentPostings` represents the inverted list or postings associated with
/// a term in a `Segment`.
///
/// As we iterate through the `SegmentPostings`, the frequencies are optionally decoded.
/// Positions on the other hand, are optionally entirely decoded upfront.
#[derive(Clone)]
pub struct BorrowedSegmentPostings<'a> {
pub(crate) block_cursor: BorrowedBlockSegmentPostings<'a>,
cur: usize,
position_reader: Option<BorrowedPositionReader<'a>>,
}
impl BorrowedSegmentPostings<'_> {
/// Compute the number of non-deleted documents.
///
/// This method will clone and scan through the posting lists.
/// (this is a rather expensive operation).
pub fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 {
let mut docset = self.clone();
let mut doc_freq = 0;
loop {
let doc = docset.doc();
if doc == TERMINATED {
return doc_freq;
}
if alive_bitset.is_alive(doc) {
doc_freq += 1u32;
}
docset.advance();
}
}
/// Returns the overall number of documents in the block postings.
/// It does not take in account whether documents are deleted or not.
pub fn doc_freq(&self) -> u32 {
self.block_cursor.doc_freq()
}
/// Reads a Segment postings from an &[u8]
///
/// * `len` - number of document in the posting lists.
/// * `data` - data array. The complete data is not necessarily used.
/// * `freq_handler` - the freq handler is in charge of decoding frequencies and/or positions
pub(crate) fn from_block_postings<'a>(
segment_block_postings: BorrowedBlockSegmentPostings<'a>,
position_reader: Option<BorrowedPositionReader<'a>>,
) -> BorrowedSegmentPostings<'a> {
BorrowedSegmentPostings {
block_cursor: segment_block_postings,
cur: 0, // cursor within the block
position_reader,
}
}
pub fn term_freq(&self) -> u32 {
debug_assert!(
// Here we do not use the len of `freqs()`
// because it is actually ok to request for the freq of doc
// even if no frequency were encoded for the field.
//
// In that case we hit the block just as if the frequency had been
// decoded. The block is simply prefilled by the value 1.
self.cur < COMPRESSION_BLOCK_SIZE,
"Have you forgotten to call `.advance()` at least once before calling `.term_freq()`."
);
self.block_cursor.freq(self.cur)
}
/// Returns the positions offsetted with a given value.
/// It is not necessary to clear the `output` before calling this method.
/// The output vector will be resized to the `term_freq`.
fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
output.clear();
self.append_positions_with_offset(offset, output);
}
/// Returns the positions offsetted with a given value.
/// Data will be appended to the output.
fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let term_freq = self.term_freq();
let prev_len = output.len();
if let Some(position_reader) = self.position_reader.as_mut() {
debug_assert!(
!self.block_cursor.freqs().is_empty(),
"No positions available"
);
let read_offset = self.block_cursor.position_offset()
+ (self.block_cursor.freqs()[..self.cur]
.iter()
.cloned()
.sum::<u32>() as u64);
// TODO: instead of zeroing the output, we could use MaybeUninit or similar.
output.resize(prev_len + term_freq as usize, 0u32);
position_reader.read(read_offset, &mut output[prev_len..]);
let mut cum = offset;
for output_mut in output[prev_len..].iter_mut() {
cum += *output_mut;
*output_mut = cum;
}
}
}
/// Returns the positions of the term in the given document.
/// The output vector will be resized to the `term_freq`.
pub fn positions(&mut self, output: &mut Vec<u32>) {
self.positions_with_offset(0u32, output);
}
}
impl DocSet for BorrowedSegmentPostings<'_> {
// goes to the next element.
// next needs to be called a first time to point to the correct element.
#[inline]
fn advance(&mut self) -> DocId {
debug_assert!(self.block_cursor.block_is_loaded());
if self.cur == COMPRESSION_BLOCK_SIZE - 1 {
self.cur = 0;
self.block_cursor.advance();
} else {
self.cur += 1;
}
self.doc()
}
fn seek(&mut self, target: DocId) -> DocId {
debug_assert!(self.doc() <= target);
if self.doc() >= target {
return self.doc();
}
self.block_cursor.seek(target);
// At this point we are on the block, that might contain our document.
let output = self.block_cursor.full_block();
self.cur = branchless_binary_search(output, target);
// The last block is not full and padded with the value TERMINATED,
// so that we are guaranteed to have at least doc in the block (a real one or the padding)
// that is greater or equal to the target.
debug_assert!(self.cur < COMPRESSION_BLOCK_SIZE);
// `doc` is now the first element >= `target`
// If all docs are smaller than target the current block should be incomplemented and padded
// with the value `TERMINATED`.
//
// After the search, the cursor should point to the first value of TERMINATED.
let doc = output[self.cur];
debug_assert!(doc >= target);
debug_assert_eq!(doc, self.doc());
doc
}
/// Return the current document's `DocId`.
#[inline]
fn doc(&self) -> DocId {
self.block_cursor.doc(self.cur)
}
fn size_hint(&self) -> u32 {
self.len() as u32
}
}
impl HasLen for BorrowedSegmentPostings<'_> {
fn len(&self) -> usize {
self.block_cursor.doc_freq() as usize
}
}

View File

@@ -1,160 +0,0 @@
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
use crate::postings::skip::{decode_bitwidth, decode_block_wand_max_tf, read_u32};
use crate::postings::BlockInfo;
use crate::schema::IndexRecordOption;
use crate::{DocId, TERMINATED};
#[derive(Clone)]
pub(crate) struct BorrowedSkipReader<'a> {
last_doc_in_block: DocId,
pub(crate) last_doc_in_previous_block: DocId,
owned_read: &'a [u8],
skip_info: IndexRecordOption,
byte_offset: usize,
remaining_docs: u32, // number of docs remaining, including the
// documents in the current block.
block_info: BlockInfo,
position_offset: u64,
}
impl<'a> BorrowedSkipReader<'a> {
pub fn new(
data: &'a [u8],
doc_freq: u32,
skip_info: IndexRecordOption,
) -> BorrowedSkipReader<'a> {
let mut skip_reader = BorrowedSkipReader {
last_doc_in_block: if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
0
} else {
TERMINATED
},
last_doc_in_previous_block: 0u32,
owned_read: data,
skip_info,
block_info: BlockInfo::VInt { num_docs: doc_freq },
byte_offset: 0,
remaining_docs: doc_freq,
position_offset: 0u64,
};
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
skip_reader.read_block_info();
}
skip_reader
}
pub(crate) fn last_doc_in_block(&self) -> DocId {
self.last_doc_in_block
}
pub fn position_offset(&self) -> u64 {
self.position_offset
}
#[inline]
pub fn byte_offset(&self) -> usize {
self.byte_offset
}
fn read_block_info(&mut self) {
let bytes = self.owned_read;
let advance_len: usize;
self.last_doc_in_block = read_u32(bytes);
let (doc_num_bits, strict_delta_encoded) = decode_bitwidth(bytes[4]);
match self.skip_info {
IndexRecordOption::Basic => {
advance_len = 5;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits: 0,
tf_sum: 0,
block_wand_fieldnorm_id: 0,
block_wand_term_freq: 0,
};
}
IndexRecordOption::WithFreqs => {
let tf_num_bits = bytes[5];
let block_wand_fieldnorm_id = bytes[6];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
advance_len = 8;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits,
tf_sum: 0,
block_wand_fieldnorm_id,
block_wand_term_freq,
};
}
IndexRecordOption::WithFreqsAndPositions => {
let tf_num_bits = bytes[5];
let tf_sum = read_u32(&bytes[6..10]);
let block_wand_fieldnorm_id = bytes[10];
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
advance_len = 12;
self.block_info = BlockInfo::BitPacked {
doc_num_bits,
strict_delta_encoded,
tf_num_bits,
tf_sum,
block_wand_fieldnorm_id,
block_wand_term_freq,
};
}
}
// self.owned_read.advance(advance_len);
let (_, rest) = self.owned_read.split_at(advance_len);
self.owned_read = rest;
}
pub fn block_info(&self) -> BlockInfo {
self.block_info
}
/// Advance the skip reader to the block that may contain the target.
///
/// If the target is larger than all documents, the skip_reader
/// then advance to the last Variable In block.
pub fn seek(&mut self, target: DocId) -> bool {
if self.last_doc_in_block() >= target {
return false;
}
loop {
self.advance();
if self.last_doc_in_block() >= target {
return true;
}
}
}
pub fn advance(&mut self) {
match self.block_info {
BlockInfo::BitPacked {
doc_num_bits,
tf_num_bits,
tf_sum,
..
} => {
self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32;
self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits);
self.position_offset += tf_sum as u64;
}
BlockInfo::VInt { num_docs } => {
debug_assert_eq!(num_docs, self.remaining_docs);
self.remaining_docs = 0;
self.byte_offset = usize::MAX;
}
}
self.last_doc_in_previous_block = self.last_doc_in_block;
if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 {
self.read_block_info();
} else {
self.last_doc_in_block = TERMINATED;
self.block_info = BlockInfo::VInt {
num_docs: self.remaining_docs,
};
}
}
}

View File

@@ -5,9 +5,7 @@ mod block_search;
pub(crate) use self::block_search::branchless_binary_search;
pub(crate) mod block_segment_postings;
pub(crate) mod borrowed_block_segment_postings;
pub(crate) mod borrowed_segment_postings;
mod borrowed_skip_reader;
pub(crate) mod compression;
mod indexing_context;
mod json_postings_writer;