mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-30 15:10:40 +00:00
feat: introduce a MergeOptimizedInvertedIndexReader (#32)
This is probably a bit of a misnomer as it's really a "PgSearchOptimizedInvertedIndexReaderForMerge". What we've done here is copied `InvertedIndexReader` and internally adjusted it to hold onto the complete `OwnedBytes` of the index's postings and positions. One or two other small touch points were required to make other internal APIs compatabile with this but they don't otherwise change functionality or I/O patterns. `MergeOptimizedInvertedIndexReader` does change I/O patterns, however, in that the merge process now does two (potentially) very large reads when it obtains the new "merge optimized inverted index reader" for each segment. This changes access patterns such that all the reads happen up-front rather than term-by-term as the merge process is solving. A likely downside to this approach is that now pg_search will be, indirectly, holding onto a lot of heap-allocated memory that was read from its block storage. Perhaps in the (near) future we can further optimize the new `MergeOptimizedInvertedIndexReader` such that it pages in blocks of a few megabytes at a time, on demand, rather than the whole file. --- Some unit tests were also updated to resolve compilation problems by PR https://github.com/paradedb/tantivy/pull/31 that for some reason didn't show in CI. #weird
This commit is contained in:
@@ -205,6 +205,7 @@ fn test_merge_columnar_numbers() {
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut buffer,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
@@ -233,6 +234,7 @@ fn test_merge_columnar_texts() {
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut buffer,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
@@ -282,6 +284,7 @@ fn test_merge_columnar_byte() {
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut buffer,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
@@ -338,6 +341,7 @@ fn test_merge_columnar_byte_with_missing() {
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut buffer,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
@@ -390,6 +394,7 @@ fn test_merge_columnar_different_types() {
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut buffer,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
@@ -455,6 +460,7 @@ fn test_merge_columnar_different_empty_cardinality() {
|
||||
&[],
|
||||
MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut buffer,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let columnar_reader = ColumnarReader::open(buffer).unwrap();
|
||||
|
||||
@@ -71,7 +71,14 @@ fn test_format(path: &str) {
|
||||
let columnar_readers = vec![&reader, &reader2];
|
||||
let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]);
|
||||
let mut out = Vec::new();
|
||||
merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap();
|
||||
merge_columnar(
|
||||
&columnar_readers,
|
||||
&[],
|
||||
merge_row_order.into(),
|
||||
&mut out,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let reader = ColumnarReader::open(out).unwrap();
|
||||
check_columns(&reader);
|
||||
}
|
||||
|
||||
@@ -641,7 +641,7 @@ proptest! {
|
||||
let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect();
|
||||
let mut output: Vec<u8> = Vec::new();
|
||||
let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into();
|
||||
crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output).unwrap();
|
||||
crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output, || false,).unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
let concat_rows: Vec<Vec<(&'static str, ColumnValue)>> = columnar_docs.iter().flatten().cloned().collect();
|
||||
let expected_merged_columnar = build_columnar(&concat_rows[..]);
|
||||
@@ -665,6 +665,7 @@ fn test_columnar_merging_empty_columnar() {
|
||||
&[],
|
||||
crate::MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut output,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
@@ -702,6 +703,7 @@ fn test_columnar_merging_number_columns() {
|
||||
&[],
|
||||
crate::MergeRowOrder::Stack(stack_merge_order),
|
||||
&mut output,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
@@ -775,6 +777,7 @@ fn test_columnar_merge_and_remap(
|
||||
&[],
|
||||
shuffle_merge_order.into(),
|
||||
&mut output,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
@@ -817,6 +820,7 @@ fn test_columnar_merge_empty() {
|
||||
&[],
|
||||
shuffle_merge_order.into(),
|
||||
&mut output,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
@@ -843,6 +847,7 @@ fn test_columnar_merge_single_str_column() {
|
||||
&[],
|
||||
shuffle_merge_order.into(),
|
||||
&mut output,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
@@ -875,6 +880,7 @@ fn test_delete_decrease_cardinality() {
|
||||
&[],
|
||||
shuffle_merge_order.into(),
|
||||
&mut output,
|
||||
|| false,
|
||||
)
|
||||
.unwrap();
|
||||
let merged_columnar = ColumnarReader::open(output).unwrap();
|
||||
|
||||
@@ -211,7 +211,7 @@ impl InvertedIndexReader {
|
||||
.slice(term_info.postings_range.clone());
|
||||
BlockSegmentPostings::open(
|
||||
term_info.doc_freq,
|
||||
postings_data,
|
||||
postings_data.read_bytes()?,
|
||||
self.record_option,
|
||||
requested_option,
|
||||
)
|
||||
|
||||
105
src/index/merge_optimized_inverted_index_reader.rs
Normal file
105
src/index/merge_optimized_inverted_index_reader.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
use std::io;
|
||||
|
||||
use common::OwnedBytes;
|
||||
|
||||
use crate::directory::FileSlice;
|
||||
use crate::positions::PositionReader;
|
||||
use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo};
|
||||
use crate::schema::IndexRecordOption;
|
||||
use crate::termdict::TermDictionary;
|
||||
|
||||
/// The inverted index reader is in charge of accessing
|
||||
/// the inverted index associated with a specific field.
|
||||
///
|
||||
/// This is optimized for merging in that it full reads
|
||||
/// the postings and positions files into memory when opened.
|
||||
/// This eliminates all disk I/O to these files during merging.
|
||||
///
|
||||
/// NB: This is a copy/paste from [`InvertedIndexReader`] and trimmed
|
||||
/// down to only include the methods required by the merge process.
|
||||
pub(crate) struct MergeOptimizedInvertedIndexReader {
|
||||
termdict: TermDictionary,
|
||||
postings_bytes: OwnedBytes,
|
||||
positions_bytes: OwnedBytes,
|
||||
record_option: IndexRecordOption,
|
||||
}
|
||||
|
||||
impl MergeOptimizedInvertedIndexReader {
|
||||
pub(crate) fn new(
|
||||
termdict: TermDictionary,
|
||||
postings_file_slice: FileSlice,
|
||||
positions_file_slice: FileSlice,
|
||||
record_option: IndexRecordOption,
|
||||
) -> io::Result<MergeOptimizedInvertedIndexReader> {
|
||||
let (_, postings_body) = postings_file_slice.split(8);
|
||||
Ok(MergeOptimizedInvertedIndexReader {
|
||||
termdict,
|
||||
postings_bytes: postings_body.read_bytes()?,
|
||||
positions_bytes: positions_file_slice.read_bytes()?,
|
||||
record_option,
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates an empty `InvertedIndexReader` object, which
|
||||
/// contains no terms at all.
|
||||
pub fn empty(record_option: IndexRecordOption) -> MergeOptimizedInvertedIndexReader {
|
||||
MergeOptimizedInvertedIndexReader {
|
||||
termdict: TermDictionary::empty(),
|
||||
postings_bytes: FileSlice::empty().read_bytes().unwrap(),
|
||||
positions_bytes: FileSlice::empty().read_bytes().unwrap(),
|
||||
record_option,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the term dictionary datastructure.
|
||||
pub fn terms(&self) -> &TermDictionary {
|
||||
&self.termdict
|
||||
}
|
||||
|
||||
/// Returns a block postings given a `term_info`.
|
||||
/// This method is for an advanced usage only.
|
||||
///
|
||||
/// Most users should prefer using [`Self::read_postings()`] instead.
|
||||
pub fn read_block_postings_from_terminfo(
|
||||
&self,
|
||||
term_info: &TermInfo,
|
||||
requested_option: IndexRecordOption,
|
||||
) -> io::Result<BlockSegmentPostings> {
|
||||
let postings_data = self.postings_bytes.slice(term_info.postings_range.clone());
|
||||
BlockSegmentPostings::open(
|
||||
term_info.doc_freq,
|
||||
postings_data,
|
||||
self.record_option,
|
||||
requested_option,
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns a posting object given a `term_info`.
|
||||
/// This method is for an advanced usage only.
|
||||
///
|
||||
/// Most users should prefer using [`Self::read_postings()`] instead.
|
||||
pub fn read_postings_from_terminfo(
|
||||
&self,
|
||||
term_info: &TermInfo,
|
||||
option: IndexRecordOption,
|
||||
) -> io::Result<SegmentPostings> {
|
||||
let option = option.downgrade(self.record_option);
|
||||
|
||||
let block_postings = self.read_block_postings_from_terminfo(term_info, option)?;
|
||||
let position_reader = {
|
||||
if option.has_positions() {
|
||||
let positions_data = self
|
||||
.positions_bytes
|
||||
.slice(term_info.positions_range.clone());
|
||||
let position_reader = PositionReader::open(positions_data)?;
|
||||
Some(position_reader)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
Ok(SegmentPostings::from_block_postings(
|
||||
block_postings,
|
||||
position_reader,
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@
|
||||
mod index;
|
||||
mod index_meta;
|
||||
mod inverted_index_reader;
|
||||
pub mod merge_optimized_inverted_index_reader;
|
||||
mod segment;
|
||||
mod segment_component;
|
||||
mod segment_id;
|
||||
|
||||
@@ -13,6 +13,7 @@ use crate::directory::{CompositeFile, FileSlice};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders};
|
||||
use crate::fieldnorm::{FieldNormReader, FieldNormReaders};
|
||||
use crate::index::merge_optimized_inverted_index_reader::MergeOptimizedInvertedIndexReader;
|
||||
use crate::index::{InvertedIndexReader, Segment, SegmentComponent, SegmentId};
|
||||
use crate::json_utils::json_path_sep_to_dot;
|
||||
use crate::schema::{Field, IndexRecordOption, Schema, Type};
|
||||
@@ -262,6 +263,76 @@ impl SegmentReader {
|
||||
Ok(inv_idx_reader)
|
||||
}
|
||||
|
||||
/// Returns a field reader associated with the field given in argument that is optimized for
|
||||
/// Tantivy's merge process.
|
||||
///
|
||||
/// If the field was not present in the index during indexing time,
|
||||
/// the InvertedIndexReader is empty.
|
||||
///
|
||||
/// The field reader is in charge of iterating through the
|
||||
/// term dictionary associated with a specific field,
|
||||
/// and opening the posting list associated with any term.
|
||||
///
|
||||
/// If the field is not marked as index, a warning is logged and an empty
|
||||
/// `MergeOptimizedInvertedIndexReader` is returned.
|
||||
/// Similarly, if the field is marked as indexed but no term has been indexed for the given
|
||||
/// index, an empty `MergeOptimizedInvertedIndexReader` is returned (but no warning is logged).
|
||||
pub(crate) fn merge_optimized_inverted_index(
|
||||
&self,
|
||||
field: Field,
|
||||
) -> crate::Result<Arc<MergeOptimizedInvertedIndexReader>> {
|
||||
let field_entry = self.schema.get_field_entry(field);
|
||||
let field_type = field_entry.field_type();
|
||||
let record_option_opt = field_type.get_index_record_option();
|
||||
|
||||
if record_option_opt.is_none() {
|
||||
warn!("Field {:?} does not seem indexed.", field_entry.name());
|
||||
}
|
||||
|
||||
let postings_file_opt = self.postings_composite().open_read(field);
|
||||
|
||||
if postings_file_opt.is_none() || record_option_opt.is_none() {
|
||||
// no documents in the segment contained this field.
|
||||
// As a result, no data is associated with the inverted index.
|
||||
//
|
||||
// Returns an empty inverted index.
|
||||
let record_option = record_option_opt.unwrap_or(IndexRecordOption::Basic);
|
||||
return Ok(Arc::new(MergeOptimizedInvertedIndexReader::empty(
|
||||
record_option,
|
||||
)));
|
||||
}
|
||||
|
||||
let record_option = record_option_opt.unwrap();
|
||||
let postings_file = postings_file_opt.unwrap();
|
||||
|
||||
let termdict_file: FileSlice =
|
||||
self.termdict_composite().open_read(field).ok_or_else(|| {
|
||||
DataCorruption::comment_only(format!(
|
||||
"Failed to open field {:?}'s term dictionary in the composite file. Has the \
|
||||
schema been modified?",
|
||||
field_entry.name()
|
||||
))
|
||||
})?;
|
||||
|
||||
let positions_file = self.positions_composite().open_read(field).ok_or_else(|| {
|
||||
let error_msg = format!(
|
||||
"Failed to open field {:?}'s positions in the composite file. Has the schema been \
|
||||
modified?",
|
||||
field_entry.name()
|
||||
);
|
||||
DataCorruption::comment_only(error_msg)
|
||||
})?;
|
||||
|
||||
let inv_idx_reader = Arc::new(MergeOptimizedInvertedIndexReader::new(
|
||||
TermDictionary::open(termdict_file)?,
|
||||
postings_file,
|
||||
positions_file,
|
||||
record_option,
|
||||
)?);
|
||||
|
||||
Ok(inv_idx_reader)
|
||||
}
|
||||
|
||||
/// Returns the list of fields that have been indexed in the segment.
|
||||
/// The field list includes the field defined in the schema as well as the fields
|
||||
/// that have been indexed as a part of a JSON field.
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::docset::{DocSet, TERMINATED};
|
||||
use crate::error::DataCorruption;
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
|
||||
use crate::index::merge_optimized_inverted_index_reader::MergeOptimizedInvertedIndexReader;
|
||||
use crate::index::{Segment, SegmentComponent, SegmentReader};
|
||||
use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
|
||||
use crate::indexer::segment_updater::CancelSentinel;
|
||||
@@ -20,7 +21,7 @@ use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
|
||||
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
|
||||
use crate::store::StoreWriter;
|
||||
use crate::termdict::{TermMerger, TermOrdinal};
|
||||
use crate::{DocAddress, DocId, InvertedIndexReader};
|
||||
use crate::{DocAddress, DocId};
|
||||
|
||||
/// Segment's max doc must be `< MAX_DOC_LIMIT`.
|
||||
///
|
||||
@@ -309,10 +310,10 @@ impl IndexMerger {
|
||||
|
||||
let mut max_term_ords: Vec<TermOrdinal> = Vec::new();
|
||||
|
||||
let field_readers: Vec<Arc<InvertedIndexReader>> = self
|
||||
let field_readers: Vec<Arc<MergeOptimizedInvertedIndexReader>> = self
|
||||
.readers
|
||||
.iter()
|
||||
.map(|reader| reader.inverted_index(indexed_field))
|
||||
.map(|reader| reader.merge_optimized_inverted_index(indexed_field))
|
||||
.collect::<crate::Result<Vec<_>>>()?;
|
||||
|
||||
let mut field_term_streams = Vec::new();
|
||||
@@ -369,10 +370,16 @@ impl IndexMerger {
|
||||
|
||||
let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![];
|
||||
|
||||
let mut cnt = 0;
|
||||
while merged_terms.advance() {
|
||||
if self.cancel.wants_cancel() {
|
||||
return Err(crate::TantivyError::Cancelled);
|
||||
// calling `wants_cancel()` could be expensive so only do it so often
|
||||
if cnt % 1000 == 0 {
|
||||
if self.cancel.wants_cancel() {
|
||||
return Err(crate::TantivyError::Cancelled);
|
||||
}
|
||||
}
|
||||
cnt += 1;
|
||||
|
||||
segment_postings_containing_the_term.clear();
|
||||
let term_bytes: &[u8] = merged_terms.key();
|
||||
|
||||
@@ -381,7 +388,8 @@ impl IndexMerger {
|
||||
// Let's compute the list of non-empty posting lists
|
||||
for (segment_ord, term_info) in merged_terms.current_segment_ords_and_term_infos() {
|
||||
let segment_reader = &self.readers[segment_ord];
|
||||
let inverted_index: &InvertedIndexReader = &field_readers[segment_ord];
|
||||
let inverted_index: &MergeOptimizedInvertedIndexReader =
|
||||
&field_readers[segment_ord];
|
||||
let segment_postings = inverted_index
|
||||
.read_postings_from_terminfo(&term_info, segment_postings_option)?;
|
||||
let alive_bitset_opt = segment_reader.alive_bitset();
|
||||
@@ -451,8 +459,11 @@ impl IndexMerger {
|
||||
|
||||
let mut doc = segment_postings.doc();
|
||||
while doc != TERMINATED {
|
||||
if self.cancel.wants_cancel() {
|
||||
return Err(crate::TantivyError::Cancelled);
|
||||
if doc % 1000 == 0 {
|
||||
// calling `wants_cancel()` could be expensive so only do it so often
|
||||
if self.cancel.wants_cancel() {
|
||||
return Err(crate::TantivyError::Cancelled);
|
||||
}
|
||||
}
|
||||
// deleted doc are skipped as they do not have a `remapped_doc_id`.
|
||||
if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] {
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::io;
|
||||
|
||||
use common::VInt;
|
||||
|
||||
use crate::directory::{FileSlice, OwnedBytes};
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::fieldnorm::FieldNormReader;
|
||||
use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE};
|
||||
use crate::postings::{BlockInfo, FreqReadingOption, SkipReader};
|
||||
@@ -96,11 +96,10 @@ impl BlockSegmentPostings {
|
||||
/// term frequency blocks.
|
||||
pub(crate) fn open(
|
||||
doc_freq: u32,
|
||||
data: FileSlice,
|
||||
bytes: OwnedBytes,
|
||||
mut record_option: IndexRecordOption,
|
||||
requested_option: IndexRecordOption,
|
||||
) -> io::Result<BlockSegmentPostings> {
|
||||
let bytes = data.read_bytes()?;
|
||||
let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?;
|
||||
let skip_reader = match skip_data_opt {
|
||||
Some(skip_data) => {
|
||||
|
||||
@@ -81,7 +81,7 @@ impl SegmentPostings {
|
||||
}
|
||||
let block_segment_postings = BlockSegmentPostings::open(
|
||||
docs.len() as u32,
|
||||
FileSlice::from(buffer),
|
||||
FileSlice::from(buffer).read_bytes().unwrap(),
|
||||
IndexRecordOption::Basic,
|
||||
IndexRecordOption::Basic,
|
||||
)
|
||||
@@ -129,7 +129,7 @@ impl SegmentPostings {
|
||||
.unwrap();
|
||||
let block_segment_postings = BlockSegmentPostings::open(
|
||||
doc_and_tfs.len() as u32,
|
||||
FileSlice::from(buffer),
|
||||
FileSlice::from(buffer).read_bytes().unwrap(),
|
||||
IndexRecordOption::WithFreqs,
|
||||
IndexRecordOption::WithFreqs,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user