mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-30 23:20:40 +00:00
perf: remove general overhead during segment merging (#47)
This commit is contained in:
@@ -1,3 +1,3 @@
|
||||
#! /bin/bash
|
||||
|
||||
cargo +stable nextest run --features mmap,stopwords,lz4-compression,zstd-compression,failpoints --verbose --workspace
|
||||
cargo +stable nextest run --features quickwit,mmap,stopwords,lz4-compression,zstd-compression,failpoints --verbose --workspace
|
||||
|
||||
@@ -3,8 +3,10 @@ use std::io;
|
||||
use common::OwnedBytes;
|
||||
|
||||
use crate::directory::FileSlice;
|
||||
use crate::positions::PositionReader;
|
||||
use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo};
|
||||
use crate::positions::borrowed_position_reader::BorrowedPositionReader;
|
||||
use crate::postings::borrowed_block_segment_postings::BorrowedBlockSegmentPostings;
|
||||
use crate::postings::borrowed_segment_postings::BorrowedSegmentPostings;
|
||||
use crate::postings::TermInfo;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::termdict::TermDictionary;
|
||||
|
||||
@@ -64,9 +66,9 @@ impl MergeOptimizedInvertedIndexReader {
|
||||
&self,
|
||||
term_info: &TermInfo,
|
||||
requested_option: IndexRecordOption,
|
||||
) -> io::Result<BlockSegmentPostings> {
|
||||
let postings_data = self.postings_bytes.slice(term_info.postings_range.clone());
|
||||
BlockSegmentPostings::open(
|
||||
) -> io::Result<BorrowedBlockSegmentPostings> {
|
||||
let postings_data = &self.postings_bytes[term_info.postings_range.clone()];
|
||||
BorrowedBlockSegmentPostings::open(
|
||||
term_info.doc_freq,
|
||||
postings_data,
|
||||
self.record_option,
|
||||
@@ -82,22 +84,20 @@ impl MergeOptimizedInvertedIndexReader {
|
||||
&self,
|
||||
term_info: &TermInfo,
|
||||
option: IndexRecordOption,
|
||||
) -> io::Result<SegmentPostings> {
|
||||
) -> io::Result<BorrowedSegmentPostings> {
|
||||
let option = option.downgrade(self.record_option);
|
||||
|
||||
let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
|
||||
let position_reader = {
|
||||
if option.has_positions() {
|
||||
let positions_data = self
|
||||
.positions_bytes
|
||||
.slice(term_info.positions_range.clone());
|
||||
let position_reader = PositionReader::open(positions_data)?;
|
||||
let positions_data = &self.positions_bytes[term_info.positions_range.clone()];
|
||||
let position_reader = BorrowedPositionReader::open(positions_data)?;
|
||||
Some(position_reader)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
Ok(SegmentPostings::from_block_postings(
|
||||
Ok(BorrowedSegmentPostings::from_block_postings(
|
||||
block_postings,
|
||||
position_reader,
|
||||
))
|
||||
|
||||
@@ -17,7 +17,8 @@ use crate::index::{Segment, SegmentComponent, SegmentReader};
|
||||
use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
|
||||
use crate::indexer::segment_updater::CancelSentinel;
|
||||
use crate::indexer::SegmentSerializer;
|
||||
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
|
||||
use crate::postings::borrowed_segment_postings::BorrowedSegmentPostings;
|
||||
use crate::postings::InvertedIndexSerializer;
|
||||
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
|
||||
use crate::store::StoreWriter;
|
||||
use crate::termdict::{TermMerger, TermOrdinal};
|
||||
@@ -372,7 +373,8 @@ impl IndexMerger {
|
||||
indexed. Have you modified the schema?",
|
||||
);
|
||||
|
||||
let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![];
|
||||
let mut segment_postings_containing_the_term: Vec<(usize, BorrowedSegmentPostings)> =
|
||||
vec![];
|
||||
|
||||
let mut cnt = 0;
|
||||
while merged_terms.advance() {
|
||||
|
||||
155
src/positions/borrowed_position_reader.rs
Normal file
155
src/positions/borrowed_position_reader.rs
Normal file
@@ -0,0 +1,155 @@
|
||||
use std::io;
|
||||
|
||||
use common::{BinarySerializable, VInt};
|
||||
|
||||
use crate::positions::COMPRESSION_BLOCK_SIZE;
|
||||
use crate::postings::compression::{BlockDecoder, VIntDecoder};
|
||||
|
||||
/// When accessing the positions of a term, we get a positions_idx from the `Terminfo`.
|
||||
/// This means we need to skip to the `nth` position efficiently.
|
||||
///
|
||||
/// Blocks are compressed using bitpacking, so `skip_read` contains the number of bits
|
||||
/// (values can go from 0 to 32 bits) required to decompress every block.
|
||||
///
|
||||
/// A given block obviously takes `(128 x num_bit_for_the_block / num_bits_in_a_byte)`,
|
||||
/// so skipping a block without decompressing it is just a matter of advancing that many
|
||||
/// bytes.
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BorrowedPositionReader<'a> {
|
||||
bit_widths: &'a [u8],
|
||||
positions: &'a [u8],
|
||||
|
||||
block_decoder: BlockDecoder,
|
||||
|
||||
// offset, expressed in positions, for the first position of the block currently loaded
|
||||
// block_offset is a multiple of COMPRESSION_BLOCK_SIZE.
|
||||
block_offset: u64,
|
||||
// offset, expressed in positions, for the position of the first block encoded
|
||||
// in the `self.positions` bytes, and if bitpacked, compressed using the bitwidth in
|
||||
// `self.bit_widths`.
|
||||
//
|
||||
// As we advance, anchor increases simultaneously with bit_widths and positions get consumed.
|
||||
anchor_offset: u64,
|
||||
|
||||
// These are just copies used for .reset().
|
||||
original_bit_widths: &'a [u8],
|
||||
original_positions: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a> BorrowedPositionReader<'a> {
|
||||
/// Open and reads the term positions encoded into the positions_data owned bytes.
|
||||
pub fn open(mut positions_data: &'a [u8]) -> io::Result<BorrowedPositionReader<'a>> {
|
||||
let num_positions_bitpacked_blocks = VInt::deserialize(&mut positions_data)?.0 as usize;
|
||||
let (bit_widths, positions) = positions_data.split_at(num_positions_bitpacked_blocks);
|
||||
Ok(BorrowedPositionReader {
|
||||
bit_widths,
|
||||
positions,
|
||||
block_decoder: BlockDecoder::default(),
|
||||
block_offset: i64::MAX as u64,
|
||||
anchor_offset: 0u64,
|
||||
original_bit_widths: bit_widths,
|
||||
original_positions: positions,
|
||||
})
|
||||
}
|
||||
|
||||
fn reset(&mut self) {
|
||||
self.positions = self.original_positions;
|
||||
self.bit_widths = self.original_bit_widths;
|
||||
self.block_offset = i64::MAX as u64;
|
||||
self.anchor_offset = 0u64;
|
||||
}
|
||||
|
||||
/// Advance from num_blocks bitpacked blocks.
|
||||
///
|
||||
/// Panics if there are not that many remaining blocks.
|
||||
fn advance_num_blocks(&mut self, num_blocks: usize) {
|
||||
let num_bits: usize = self.bit_widths.as_ref()[..num_blocks]
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(|num_bits| num_bits as usize)
|
||||
.sum();
|
||||
let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8;
|
||||
|
||||
// self.bit_widths.advance(num_blocks);
|
||||
let (_, rest) = self.bit_widths.split_at(num_blocks);
|
||||
self.bit_widths = rest;
|
||||
|
||||
// self.positions.advance(num_bytes_to_skip);
|
||||
let (_, rest) = self.positions.split_at(num_bytes_to_skip);
|
||||
self.positions = rest;
|
||||
|
||||
self.anchor_offset += (num_blocks * COMPRESSION_BLOCK_SIZE) as u64;
|
||||
}
|
||||
|
||||
/// block_rel_id is counted relatively to the anchor.
|
||||
/// block_rel_id = 0 means the anchor block.
|
||||
/// block_rel_id = i means the ith block after the anchor block.
|
||||
fn load_block(&mut self, block_rel_id: usize) {
|
||||
let bit_widths = self.bit_widths;
|
||||
let byte_offset: usize = bit_widths[0..block_rel_id]
|
||||
.iter()
|
||||
.map(|&b| b as usize)
|
||||
.sum::<usize>()
|
||||
* COMPRESSION_BLOCK_SIZE
|
||||
/ 8;
|
||||
let compressed_data = &self.positions[byte_offset..];
|
||||
if bit_widths.len() > block_rel_id {
|
||||
// that block is bitpacked.
|
||||
let bit_width = bit_widths[block_rel_id];
|
||||
self.block_decoder
|
||||
.uncompress_block_unsorted(compressed_data, bit_width, false);
|
||||
} else {
|
||||
// that block is vint encoded.
|
||||
self.block_decoder
|
||||
.uncompress_vint_unsorted_until_end(compressed_data);
|
||||
}
|
||||
self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64;
|
||||
}
|
||||
|
||||
/// Fills a buffer with the positions `[offset..offset+output.len())` integers.
|
||||
///
|
||||
/// This function is optimized to be called with increasing values of `offset`.
|
||||
pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) {
|
||||
if offset < self.anchor_offset {
|
||||
self.reset();
|
||||
}
|
||||
let delta_to_block_offset = offset as i64 - self.block_offset as i64;
|
||||
if !(0..128).contains(&delta_to_block_offset) {
|
||||
// The first position is not within the first block.
|
||||
// (Note that it could be before or after)
|
||||
// We need to possibly skip a few blocks, and decompress the first relevant block.
|
||||
let delta_to_anchor_offset = offset - self.anchor_offset;
|
||||
let num_blocks_to_skip =
|
||||
(delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize;
|
||||
self.advance_num_blocks(num_blocks_to_skip);
|
||||
self.load_block(0);
|
||||
} else {
|
||||
// The request offset is within the loaded block.
|
||||
// We still need to advance anchor_offset to our current block.
|
||||
let num_blocks_to_skip =
|
||||
((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize;
|
||||
self.advance_num_blocks(num_blocks_to_skip);
|
||||
}
|
||||
|
||||
// At this point, the block containing offset is loaded, and anchor has
|
||||
// been updated to point to it as well.
|
||||
for i in 1.. {
|
||||
// we copy the part from block i - 1 that is relevant.
|
||||
let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE;
|
||||
let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block;
|
||||
if remaining_in_block >= output.len() {
|
||||
output.copy_from_slice(
|
||||
&self.block_decoder.output_array()[offset_in_block..][..output.len()],
|
||||
);
|
||||
break;
|
||||
}
|
||||
output[..remaining_in_block]
|
||||
.copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]);
|
||||
output = &mut output[remaining_in_block..];
|
||||
// we load block #i if necessary.
|
||||
offset += remaining_in_block as u64;
|
||||
self.load_block(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -28,6 +28,7 @@
|
||||
//! * *VIntPosDeltas* := *VIntPosDelta*^(*P* % 128).
|
||||
//!
|
||||
//! The skip widths encoded separately makes it easy and fast to rapidly skip over n positions.
|
||||
pub(crate) mod borrowed_position_reader;
|
||||
mod reader;
|
||||
mod serializer;
|
||||
|
||||
|
||||
@@ -45,13 +45,109 @@ impl<W: io::Write> PositionSerializer<W> {
|
||||
|
||||
/// Writes all of the given positions delta.
|
||||
pub fn write_positions_delta(&mut self, mut positions_delta: &[u32]) {
|
||||
while !positions_delta.is_empty() {
|
||||
let remaining_block_len = self.remaining_block_len();
|
||||
let num_to_write = remaining_block_len.min(positions_delta.len());
|
||||
self.block.extend(&positions_delta[..num_to_write]);
|
||||
positions_delta = &positions_delta[num_to_write..];
|
||||
if self.remaining_block_len() == 0 {
|
||||
self.flush_block();
|
||||
match positions_delta.len() {
|
||||
0 => {}
|
||||
1 => {
|
||||
if self.remaining_block_len() == 0 {
|
||||
self.flush_block();
|
||||
}
|
||||
self.block.push(positions_delta[0]);
|
||||
}
|
||||
2 => {
|
||||
let rem = self.remaining_block_len();
|
||||
if rem < 2 {
|
||||
if rem == 1 {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[1]);
|
||||
} else {
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
}
|
||||
} else {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
}
|
||||
}
|
||||
3 => {
|
||||
let rem = self.remaining_block_len();
|
||||
match rem {
|
||||
3.. => {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
self.block.push(positions_delta[2]);
|
||||
}
|
||||
2 => {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[2]);
|
||||
}
|
||||
1 => {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[1]);
|
||||
self.block.push(positions_delta[2]);
|
||||
}
|
||||
0 => {
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
self.block.push(positions_delta[2]);
|
||||
}
|
||||
}
|
||||
}
|
||||
4 => {
|
||||
let rem = self.remaining_block_len();
|
||||
match rem {
|
||||
4.. => {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
self.block.push(positions_delta[2]);
|
||||
self.block.push(positions_delta[3]);
|
||||
}
|
||||
3 => {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
self.block.push(positions_delta[2]);
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[3]);
|
||||
}
|
||||
2 => {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[2]);
|
||||
self.block.push(positions_delta[3]);
|
||||
}
|
||||
1 => {
|
||||
self.block.push(positions_delta[0]);
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[1]);
|
||||
self.block.push(positions_delta[2]);
|
||||
self.block.push(positions_delta[3]);
|
||||
}
|
||||
0 => {
|
||||
self.flush_block();
|
||||
self.block.push(positions_delta[0]);
|
||||
self.block.push(positions_delta[1]);
|
||||
self.block.push(positions_delta[2]);
|
||||
self.block.push(positions_delta[3]);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
while !positions_delta.is_empty() {
|
||||
let remaining_block_len = self.remaining_block_len();
|
||||
let num_to_write = remaining_block_len.min(positions_delta.len());
|
||||
self.block
|
||||
.extend_from_slice(&positions_delta[..num_to_write]);
|
||||
positions_delta = &positions_delta[num_to_write..];
|
||||
if self.remaining_block_len() == 0 {
|
||||
self.flush_block();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::query::Bm25Weight;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::{DocId, Score, TERMINATED};
|
||||
|
||||
fn max_score<I: Iterator<Item = Score>>(mut it: I) -> Option<Score> {
|
||||
pub(crate) fn max_score<I: Iterator<Item = Score>>(mut it: I) -> Option<Score> {
|
||||
it.next().map(|first| it.fold(first, Score::max))
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ pub struct BlockSegmentPostings {
|
||||
skip_reader: SkipReader,
|
||||
}
|
||||
|
||||
fn decode_bitpacked_block(
|
||||
pub(crate) fn decode_bitpacked_block(
|
||||
doc_decoder: &mut BlockDecoder,
|
||||
freq_decoder_opt: Option<&mut BlockDecoder>,
|
||||
data: &[u8],
|
||||
@@ -53,7 +53,7 @@ fn decode_bitpacked_block(
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_vint_block(
|
||||
pub(crate) fn decode_vint_block(
|
||||
doc_decoder: &mut BlockDecoder,
|
||||
freq_decoder_opt: Option<&mut BlockDecoder>,
|
||||
data: &[u8],
|
||||
|
||||
223
src/postings/borrowed_block_segment_postings.rs
Normal file
223
src/postings/borrowed_block_segment_postings.rs
Normal file
@@ -0,0 +1,223 @@
|
||||
use std::io;
|
||||
|
||||
use common::VInt;
|
||||
|
||||
use crate::postings::block_segment_postings::{decode_bitpacked_block, decode_vint_block};
|
||||
use crate::postings::borrowed_skip_reader::BorrowedSkipReader;
|
||||
use crate::postings::compression::{BlockDecoder, COMPRESSION_BLOCK_SIZE};
|
||||
use crate::postings::{BlockInfo, FreqReadingOption};
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::{DocId, Score, TERMINATED};
|
||||
|
||||
/// `BorrowedBlockSegmentPostings` is a cursor iterating over blocks
|
||||
/// of documents.
|
||||
///
|
||||
/// # Warning
|
||||
///
|
||||
/// While it is useful for some very specific high-performance
|
||||
/// use cases, you should prefer using `SegmentPostings` for most usage.
|
||||
#[derive(Clone)]
|
||||
pub struct BorrowedBlockSegmentPostings<'a> {
|
||||
pub(crate) doc_decoder: BlockDecoder,
|
||||
block_loaded: bool,
|
||||
freq_decoder: BlockDecoder,
|
||||
freq_reading_option: FreqReadingOption,
|
||||
block_max_score_cache: Option<Score>,
|
||||
doc_freq: u32,
|
||||
data: &'a [u8],
|
||||
skip_reader: BorrowedSkipReader<'a>,
|
||||
}
|
||||
|
||||
fn split_into_skips_and_postings(
|
||||
doc_freq: u32,
|
||||
mut bytes: &[u8],
|
||||
) -> io::Result<(Option<&[u8]>, &[u8])> {
|
||||
if doc_freq < COMPRESSION_BLOCK_SIZE as u32 {
|
||||
return Ok((None, bytes));
|
||||
}
|
||||
let skip_len = VInt::deserialize_u64(&mut bytes)? as usize;
|
||||
let (skip_data, postings_data) = bytes.split_at(skip_len);
|
||||
Ok((Some(skip_data), postings_data))
|
||||
}
|
||||
|
||||
impl<'a> BorrowedBlockSegmentPostings<'a> {
|
||||
/// Opens a `BorrowedBlockSegmentPostings`.
|
||||
/// `doc_freq` is the number of documents in the posting list.
|
||||
/// `record_option` represents the amount of data available according to the schema.
|
||||
/// `requested_option` is the amount of data requested by the user.
|
||||
/// If for instance, we do not request for term frequencies, this function will not decompress
|
||||
/// term frequency blocks.
|
||||
pub(crate) fn open(
|
||||
doc_freq: u32,
|
||||
bytes: &'a [u8],
|
||||
mut record_option: IndexRecordOption,
|
||||
requested_option: IndexRecordOption,
|
||||
) -> io::Result<BorrowedBlockSegmentPostings<'a>> {
|
||||
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?;
|
||||
let skip_reader = match skip_data_opt {
|
||||
Some(skip_data) => {
|
||||
let block_count = doc_freq as usize / COMPRESSION_BLOCK_SIZE;
|
||||
// 8 is the minimum size of a block with frequency (can be more if pos are stored
|
||||
// too)
|
||||
if skip_data.len() < 8 * block_count {
|
||||
// the field might be encoded with frequency, but this term in particular isn't.
|
||||
// This can happen for JSON field with term frequencies:
|
||||
// - text terms are encoded with term freqs.
|
||||
// - numerical terms are encoded without term freqs.
|
||||
record_option = IndexRecordOption::Basic;
|
||||
}
|
||||
BorrowedSkipReader::new(skip_data, doc_freq, record_option)
|
||||
}
|
||||
None => BorrowedSkipReader::new(&[], doc_freq, record_option),
|
||||
};
|
||||
|
||||
let freq_reading_option = match (record_option, requested_option) {
|
||||
(IndexRecordOption::Basic, _) => FreqReadingOption::NoFreq,
|
||||
(_, IndexRecordOption::Basic) => FreqReadingOption::SkipFreq,
|
||||
(_, _) => FreqReadingOption::ReadFreq,
|
||||
};
|
||||
|
||||
let mut block_segment_postings = BorrowedBlockSegmentPostings {
|
||||
doc_decoder: BlockDecoder::with_val(TERMINATED),
|
||||
block_loaded: false,
|
||||
freq_decoder: BlockDecoder::with_val(1),
|
||||
freq_reading_option,
|
||||
block_max_score_cache: None,
|
||||
doc_freq,
|
||||
data: postings_data,
|
||||
skip_reader,
|
||||
};
|
||||
block_segment_postings.load_block();
|
||||
Ok(block_segment_postings)
|
||||
}
|
||||
|
||||
/// Returns the overall number of documents in the block postings.
|
||||
/// It does not take in account whether documents are deleted or not.
|
||||
///
|
||||
/// This `doc_freq` is simply the sum of the length of all of the blocks
|
||||
/// length, and it does not take in account deleted documents.
|
||||
pub fn doc_freq(&self) -> u32 {
|
||||
self.doc_freq
|
||||
}
|
||||
|
||||
/// Returns a full block, regardless of whether the block is complete or incomplete (
|
||||
/// as it happens for the last block of the posting list).
|
||||
///
|
||||
/// In the latter case, the block is guaranteed to be padded with the sentinel value:
|
||||
/// `TERMINATED`. The array is also guaranteed to be aligned on 16 bytes = 128 bits.
|
||||
///
|
||||
/// This method is useful to run SSE2 linear search.
|
||||
#[inline]
|
||||
pub(crate) fn full_block(&self) -> &[DocId; COMPRESSION_BLOCK_SIZE] {
|
||||
debug_assert!(self.block_is_loaded());
|
||||
self.doc_decoder.full_output()
|
||||
}
|
||||
|
||||
/// Return the document at index `idx` of the block.
|
||||
#[inline]
|
||||
pub fn doc(&self, idx: usize) -> u32 {
|
||||
self.doc_decoder.output(idx)
|
||||
}
|
||||
|
||||
/// Return the array of `term freq` in the block.
|
||||
#[inline]
|
||||
pub fn freqs(&self) -> &[u32] {
|
||||
debug_assert!(self.block_is_loaded());
|
||||
self.freq_decoder.output_array()
|
||||
}
|
||||
|
||||
/// Return the frequency at index `idx` of the block.
|
||||
#[inline]
|
||||
pub fn freq(&self, idx: usize) -> u32 {
|
||||
debug_assert!(self.block_is_loaded());
|
||||
self.freq_decoder.output(idx)
|
||||
}
|
||||
|
||||
/// Position on a block that may contains `target_doc`.
|
||||
///
|
||||
/// If all docs are smaller than target, the block loaded may be empty,
|
||||
/// or be the last an incomplete VInt block.
|
||||
pub fn seek(&mut self, target_doc: DocId) {
|
||||
self.shallow_seek(target_doc);
|
||||
self.load_block();
|
||||
}
|
||||
|
||||
pub(crate) fn position_offset(&self) -> u64 {
|
||||
self.skip_reader.position_offset()
|
||||
}
|
||||
|
||||
/// Dangerous API! This calls seek on the skip list,
|
||||
/// but does not `.load_block()` afterwards.
|
||||
///
|
||||
/// `.load_block()` needs to be called manually afterwards.
|
||||
/// If all docs are smaller than target, the block loaded may be empty,
|
||||
/// or be the last an incomplete VInt block.
|
||||
pub(crate) fn shallow_seek(&mut self, target_doc: DocId) {
|
||||
if self.skip_reader.seek(target_doc) {
|
||||
self.block_max_score_cache = None;
|
||||
self.block_loaded = false;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn block_is_loaded(&self) -> bool {
|
||||
self.block_loaded
|
||||
}
|
||||
|
||||
pub(crate) fn load_block(&mut self) {
|
||||
let offset = self.skip_reader.byte_offset();
|
||||
if self.block_is_loaded() {
|
||||
return;
|
||||
}
|
||||
match self.skip_reader.block_info() {
|
||||
BlockInfo::BitPacked {
|
||||
doc_num_bits,
|
||||
strict_delta_encoded,
|
||||
tf_num_bits,
|
||||
..
|
||||
} => {
|
||||
decode_bitpacked_block(
|
||||
&mut self.doc_decoder,
|
||||
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
|
||||
Some(&mut self.freq_decoder)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
&self.data[offset..],
|
||||
self.skip_reader.last_doc_in_previous_block,
|
||||
doc_num_bits,
|
||||
tf_num_bits,
|
||||
strict_delta_encoded,
|
||||
);
|
||||
}
|
||||
BlockInfo::VInt { num_docs } => {
|
||||
let data = {
|
||||
if num_docs == 0 {
|
||||
&[]
|
||||
} else {
|
||||
&self.data[offset..]
|
||||
}
|
||||
};
|
||||
decode_vint_block(
|
||||
&mut self.doc_decoder,
|
||||
if let FreqReadingOption::ReadFreq = self.freq_reading_option {
|
||||
Some(&mut self.freq_decoder)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
data,
|
||||
self.skip_reader.last_doc_in_previous_block,
|
||||
num_docs as usize,
|
||||
);
|
||||
}
|
||||
}
|
||||
self.block_loaded = true;
|
||||
}
|
||||
|
||||
/// Advance to the next block.
|
||||
pub fn advance(&mut self) {
|
||||
self.skip_reader.advance();
|
||||
self.block_loaded = false;
|
||||
self.block_max_score_cache = None;
|
||||
self.load_block();
|
||||
}
|
||||
}
|
||||
179
src/postings/borrowed_segment_postings.rs
Normal file
179
src/postings/borrowed_segment_postings.rs
Normal file
@@ -0,0 +1,179 @@
|
||||
use common::HasLen;
|
||||
|
||||
use crate::docset::DocSet;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::positions::borrowed_position_reader::BorrowedPositionReader;
|
||||
use crate::postings::borrowed_block_segment_postings::BorrowedBlockSegmentPostings;
|
||||
use crate::postings::branchless_binary_search;
|
||||
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
|
||||
use crate::{DocId, TERMINATED};
|
||||
|
||||
/// `SegmentPostings` represents the inverted list or postings associated with
|
||||
/// a term in a `Segment`.
|
||||
///
|
||||
/// As we iterate through the `SegmentPostings`, the frequencies are optionally decoded.
|
||||
/// Positions on the other hand, are optionally entirely decoded upfront.
|
||||
#[derive(Clone)]
|
||||
pub struct BorrowedSegmentPostings<'a> {
|
||||
pub(crate) block_cursor: BorrowedBlockSegmentPostings<'a>,
|
||||
cur: usize,
|
||||
position_reader: Option<BorrowedPositionReader<'a>>,
|
||||
}
|
||||
|
||||
impl BorrowedSegmentPostings<'_> {
|
||||
/// Compute the number of non-deleted documents.
|
||||
///
|
||||
/// This method will clone and scan through the posting lists.
|
||||
/// (this is a rather expensive operation).
|
||||
pub fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 {
|
||||
let mut docset = self.clone();
|
||||
let mut doc_freq = 0;
|
||||
loop {
|
||||
let doc = docset.doc();
|
||||
if doc == TERMINATED {
|
||||
return doc_freq;
|
||||
}
|
||||
if alive_bitset.is_alive(doc) {
|
||||
doc_freq += 1u32;
|
||||
}
|
||||
docset.advance();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the overall number of documents in the block postings.
|
||||
/// It does not take in account whether documents are deleted or not.
|
||||
pub fn doc_freq(&self) -> u32 {
|
||||
self.block_cursor.doc_freq()
|
||||
}
|
||||
|
||||
/// Reads a Segment postings from an &[u8]
|
||||
///
|
||||
/// * `len` - number of document in the posting lists.
|
||||
/// * `data` - data array. The complete data is not necessarily used.
|
||||
/// * `freq_handler` - the freq handler is in charge of decoding frequencies and/or positions
|
||||
pub(crate) fn from_block_postings<'a>(
|
||||
segment_block_postings: BorrowedBlockSegmentPostings<'a>,
|
||||
position_reader: Option<BorrowedPositionReader<'a>>,
|
||||
) -> BorrowedSegmentPostings<'a> {
|
||||
BorrowedSegmentPostings {
|
||||
block_cursor: segment_block_postings,
|
||||
cur: 0, // cursor within the block
|
||||
position_reader,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn term_freq(&self) -> u32 {
|
||||
debug_assert!(
|
||||
// Here we do not use the len of `freqs()`
|
||||
// because it is actually ok to request for the freq of doc
|
||||
// even if no frequency were encoded for the field.
|
||||
//
|
||||
// In that case we hit the block just as if the frequency had been
|
||||
// decoded. The block is simply prefilled by the value 1.
|
||||
self.cur < COMPRESSION_BLOCK_SIZE,
|
||||
"Have you forgotten to call `.advance()` at least once before calling `.term_freq()`."
|
||||
);
|
||||
self.block_cursor.freq(self.cur)
|
||||
}
|
||||
|
||||
/// Returns the positions offsetted with a given value.
|
||||
/// It is not necessary to clear the `output` before calling this method.
|
||||
/// The output vector will be resized to the `term_freq`.
|
||||
fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
|
||||
output.clear();
|
||||
self.append_positions_with_offset(offset, output);
|
||||
}
|
||||
|
||||
/// Returns the positions offsetted with a given value.
|
||||
/// Data will be appended to the output.
|
||||
fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
|
||||
let term_freq = self.term_freq();
|
||||
let prev_len = output.len();
|
||||
if let Some(position_reader) = self.position_reader.as_mut() {
|
||||
debug_assert!(
|
||||
!self.block_cursor.freqs().is_empty(),
|
||||
"No positions available"
|
||||
);
|
||||
let read_offset = self.block_cursor.position_offset()
|
||||
+ (self.block_cursor.freqs()[..self.cur]
|
||||
.iter()
|
||||
.cloned()
|
||||
.sum::<u32>() as u64);
|
||||
// TODO: instead of zeroing the output, we could use MaybeUninit or similar.
|
||||
output.resize(prev_len + term_freq as usize, 0u32);
|
||||
position_reader.read(read_offset, &mut output[prev_len..]);
|
||||
let mut cum = offset;
|
||||
for output_mut in output[prev_len..].iter_mut() {
|
||||
cum += *output_mut;
|
||||
*output_mut = cum;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the positions of the term in the given document.
|
||||
/// The output vector will be resized to the `term_freq`.
|
||||
pub fn positions(&mut self, output: &mut Vec<u32>) {
|
||||
self.positions_with_offset(0u32, output);
|
||||
}
|
||||
}
|
||||
|
||||
impl DocSet for BorrowedSegmentPostings<'_> {
|
||||
// goes to the next element.
|
||||
// next needs to be called a first time to point to the correct element.
|
||||
#[inline]
|
||||
fn advance(&mut self) -> DocId {
|
||||
debug_assert!(self.block_cursor.block_is_loaded());
|
||||
if self.cur == COMPRESSION_BLOCK_SIZE - 1 {
|
||||
self.cur = 0;
|
||||
self.block_cursor.advance();
|
||||
} else {
|
||||
self.cur += 1;
|
||||
}
|
||||
self.doc()
|
||||
}
|
||||
|
||||
fn seek(&mut self, target: DocId) -> DocId {
|
||||
debug_assert!(self.doc() <= target);
|
||||
if self.doc() >= target {
|
||||
return self.doc();
|
||||
}
|
||||
|
||||
self.block_cursor.seek(target);
|
||||
|
||||
// At this point we are on the block, that might contain our document.
|
||||
let output = self.block_cursor.full_block();
|
||||
self.cur = branchless_binary_search(output, target);
|
||||
|
||||
// The last block is not full and padded with the value TERMINATED,
|
||||
// so that we are guaranteed to have at least doc in the block (a real one or the padding)
|
||||
// that is greater or equal to the target.
|
||||
debug_assert!(self.cur < COMPRESSION_BLOCK_SIZE);
|
||||
|
||||
// `doc` is now the first element >= `target`
|
||||
|
||||
// If all docs are smaller than target the current block should be incomplemented and padded
|
||||
// with the value `TERMINATED`.
|
||||
//
|
||||
// After the search, the cursor should point to the first value of TERMINATED.
|
||||
let doc = output[self.cur];
|
||||
debug_assert!(doc >= target);
|
||||
debug_assert_eq!(doc, self.doc());
|
||||
doc
|
||||
}
|
||||
|
||||
/// Return the current document's `DocId`.
|
||||
#[inline]
|
||||
fn doc(&self) -> DocId {
|
||||
self.block_cursor.doc(self.cur)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> u32 {
|
||||
self.len() as u32
|
||||
}
|
||||
}
|
||||
|
||||
impl HasLen for BorrowedSegmentPostings<'_> {
|
||||
fn len(&self) -> usize {
|
||||
self.block_cursor.doc_freq() as usize
|
||||
}
|
||||
}
|
||||
160
src/postings/borrowed_skip_reader.rs
Normal file
160
src/postings/borrowed_skip_reader.rs
Normal file
@@ -0,0 +1,160 @@
|
||||
use crate::postings::compression::{compressed_block_size, COMPRESSION_BLOCK_SIZE};
|
||||
use crate::postings::skip::{decode_bitwidth, decode_block_wand_max_tf, read_u32};
|
||||
use crate::postings::BlockInfo;
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::{DocId, TERMINATED};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct BorrowedSkipReader<'a> {
|
||||
last_doc_in_block: DocId,
|
||||
pub(crate) last_doc_in_previous_block: DocId,
|
||||
owned_read: &'a [u8],
|
||||
skip_info: IndexRecordOption,
|
||||
byte_offset: usize,
|
||||
remaining_docs: u32, // number of docs remaining, including the
|
||||
// documents in the current block.
|
||||
block_info: BlockInfo,
|
||||
|
||||
position_offset: u64,
|
||||
}
|
||||
|
||||
impl<'a> BorrowedSkipReader<'a> {
|
||||
pub fn new(
|
||||
data: &'a [u8],
|
||||
doc_freq: u32,
|
||||
skip_info: IndexRecordOption,
|
||||
) -> BorrowedSkipReader<'a> {
|
||||
let mut skip_reader = BorrowedSkipReader {
|
||||
last_doc_in_block: if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
|
||||
0
|
||||
} else {
|
||||
TERMINATED
|
||||
},
|
||||
last_doc_in_previous_block: 0u32,
|
||||
owned_read: data,
|
||||
skip_info,
|
||||
block_info: BlockInfo::VInt { num_docs: doc_freq },
|
||||
byte_offset: 0,
|
||||
remaining_docs: doc_freq,
|
||||
position_offset: 0u64,
|
||||
};
|
||||
if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 {
|
||||
skip_reader.read_block_info();
|
||||
}
|
||||
skip_reader
|
||||
}
|
||||
|
||||
pub(crate) fn last_doc_in_block(&self) -> DocId {
|
||||
self.last_doc_in_block
|
||||
}
|
||||
|
||||
pub fn position_offset(&self) -> u64 {
|
||||
self.position_offset
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn byte_offset(&self) -> usize {
|
||||
self.byte_offset
|
||||
}
|
||||
|
||||
fn read_block_info(&mut self) {
|
||||
let bytes = self.owned_read;
|
||||
let advance_len: usize;
|
||||
self.last_doc_in_block = read_u32(bytes);
|
||||
let (doc_num_bits, strict_delta_encoded) = decode_bitwidth(bytes[4]);
|
||||
match self.skip_info {
|
||||
IndexRecordOption::Basic => {
|
||||
advance_len = 5;
|
||||
self.block_info = BlockInfo::BitPacked {
|
||||
doc_num_bits,
|
||||
strict_delta_encoded,
|
||||
tf_num_bits: 0,
|
||||
tf_sum: 0,
|
||||
block_wand_fieldnorm_id: 0,
|
||||
block_wand_term_freq: 0,
|
||||
};
|
||||
}
|
||||
IndexRecordOption::WithFreqs => {
|
||||
let tf_num_bits = bytes[5];
|
||||
let block_wand_fieldnorm_id = bytes[6];
|
||||
let block_wand_term_freq = decode_block_wand_max_tf(bytes[7]);
|
||||
advance_len = 8;
|
||||
self.block_info = BlockInfo::BitPacked {
|
||||
doc_num_bits,
|
||||
strict_delta_encoded,
|
||||
tf_num_bits,
|
||||
tf_sum: 0,
|
||||
block_wand_fieldnorm_id,
|
||||
block_wand_term_freq,
|
||||
};
|
||||
}
|
||||
IndexRecordOption::WithFreqsAndPositions => {
|
||||
let tf_num_bits = bytes[5];
|
||||
let tf_sum = read_u32(&bytes[6..10]);
|
||||
let block_wand_fieldnorm_id = bytes[10];
|
||||
let block_wand_term_freq = decode_block_wand_max_tf(bytes[11]);
|
||||
advance_len = 12;
|
||||
self.block_info = BlockInfo::BitPacked {
|
||||
doc_num_bits,
|
||||
strict_delta_encoded,
|
||||
tf_num_bits,
|
||||
tf_sum,
|
||||
block_wand_fieldnorm_id,
|
||||
block_wand_term_freq,
|
||||
};
|
||||
}
|
||||
}
|
||||
// self.owned_read.advance(advance_len);
|
||||
let (_, rest) = self.owned_read.split_at(advance_len);
|
||||
self.owned_read = rest;
|
||||
}
|
||||
|
||||
pub fn block_info(&self) -> BlockInfo {
|
||||
self.block_info
|
||||
}
|
||||
|
||||
/// Advance the skip reader to the block that may contain the target.
|
||||
///
|
||||
/// If the target is larger than all documents, the skip_reader
|
||||
/// then advance to the last Variable In block.
|
||||
pub fn seek(&mut self, target: DocId) -> bool {
|
||||
if self.last_doc_in_block() >= target {
|
||||
return false;
|
||||
}
|
||||
loop {
|
||||
self.advance();
|
||||
if self.last_doc_in_block() >= target {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn advance(&mut self) {
|
||||
match self.block_info {
|
||||
BlockInfo::BitPacked {
|
||||
doc_num_bits,
|
||||
tf_num_bits,
|
||||
tf_sum,
|
||||
..
|
||||
} => {
|
||||
self.remaining_docs -= COMPRESSION_BLOCK_SIZE as u32;
|
||||
self.byte_offset += compressed_block_size(doc_num_bits + tf_num_bits);
|
||||
self.position_offset += tf_sum as u64;
|
||||
}
|
||||
BlockInfo::VInt { num_docs } => {
|
||||
debug_assert_eq!(num_docs, self.remaining_docs);
|
||||
self.remaining_docs = 0;
|
||||
self.byte_offset = usize::MAX;
|
||||
}
|
||||
}
|
||||
self.last_doc_in_previous_block = self.last_doc_in_block;
|
||||
if self.remaining_docs >= COMPRESSION_BLOCK_SIZE as u32 {
|
||||
self.read_block_info();
|
||||
} else {
|
||||
self.last_doc_in_block = TERMINATED;
|
||||
self.block_info = BlockInfo::VInt {
|
||||
num_docs: self.remaining_docs,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,10 @@ mod block_search;
|
||||
|
||||
pub(crate) use self::block_search::branchless_binary_search;
|
||||
|
||||
mod block_segment_postings;
|
||||
pub(crate) mod block_segment_postings;
|
||||
pub(crate) mod borrowed_block_segment_postings;
|
||||
pub(crate) mod borrowed_segment_postings;
|
||||
mod borrowed_skip_reader;
|
||||
pub(crate) mod compression;
|
||||
mod indexing_context;
|
||||
mod json_postings_writer;
|
||||
@@ -15,7 +18,7 @@ mod postings_writer;
|
||||
mod recorder;
|
||||
mod segment_postings;
|
||||
mod serializer;
|
||||
mod skip;
|
||||
pub(crate) mod skip;
|
||||
mod term_info;
|
||||
|
||||
pub(crate) use loaded_postings::LoadedPostings;
|
||||
|
||||
@@ -10,23 +10,23 @@ use crate::{DocId, Score, TERMINATED};
|
||||
// - 1: unused
|
||||
// - 2: is delta-1 encoded. 0 if not, 1, if yes
|
||||
// - 3: a 6 bit number in 0..=32, the actual bitwidth
|
||||
fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
|
||||
pub(crate) fn encode_bitwidth(bitwidth: u8, delta_1: bool) -> u8 {
|
||||
bitwidth | ((delta_1 as u8) << 6)
|
||||
}
|
||||
|
||||
fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) {
|
||||
let delta_1 = ((raw_bitwidth >> 6) & 1) != 0;
|
||||
pub(crate) fn decode_bitwidth(raw_bitwidth: u8) -> (u8, bool) {
|
||||
let delta_1 = (raw_bitwidth >> 6 & 1) != 0;
|
||||
let bitwidth = raw_bitwidth & 0x3f;
|
||||
(bitwidth, delta_1)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
|
||||
pub(crate) fn encode_block_wand_max_tf(max_tf: u32) -> u8 {
|
||||
max_tf.min(u8::MAX as u32) as u8
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
|
||||
pub(crate) fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
|
||||
if max_tf_code == u8::MAX {
|
||||
u32::MAX
|
||||
} else {
|
||||
@@ -35,12 +35,12 @@ fn decode_block_wand_max_tf(max_tf_code: u8) -> u32 {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn read_u32(data: &[u8]) -> u32 {
|
||||
pub(crate) fn read_u32(data: &[u8]) -> u32 {
|
||||
u32::from_le_bytes(data[..4].try_into().unwrap())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_u32(val: u32, buf: &mut Vec<u8>) {
|
||||
pub(crate) fn write_u32(val: u32, buf: &mut Vec<u8>) {
|
||||
buf.extend_from_slice(&val.to_le_bytes());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user