From 5b6da9123caef7f600492eb734e30b627d67a2df Mon Sep 17 00:00:00 2001 From: Eric Ridge Date: Tue, 4 Mar 2025 15:42:53 -0500 Subject: [PATCH] feat: introduce a `MergeOptimizedInvertedIndexReader` (#32) This is probably a bit of a misnomer as it's really a "PgSearchOptimizedInvertedIndexReaderForMerge". What we've done here is copied `InvertedIndexReader` and internally adjusted it to hold onto the complete `OwnedBytes` of the index's postings and positions. One or two other small touch points were required to make other internal APIs compatabile with this but they don't otherwise change functionality or I/O patterns. `MergeOptimizedInvertedIndexReader` does change I/O patterns, however, in that the merge process now does two (potentially) very large reads when it obtains the new "merge optimized inverted index reader" for each segment. This changes access patterns such that all the reads happen up-front rather than term-by-term as the merge process is solving. A likely downside to this approach is that now pg_search will be, indirectly, holding onto a lot of heap-allocated memory that was read from its block storage. Perhaps in the (near) future we can further optimize the new `MergeOptimizedInvertedIndexReader` such that it pages in blocks of a few megabytes at a time, on demand, rather than the whole file. --- Some unit tests were also updated to resolve compilation problems by PR https://github.com/paradedb/tantivy/pull/31 that for some reason didn't show in CI. #weird --- columnar/src/columnar/merge/tests.rs | 6 + columnar/src/compat_tests.rs | 9 +- columnar/src/tests.rs | 8 +- src/index/inverted_index_reader.rs | 2 +- .../merge_optimized_inverted_index_reader.rs | 105 ++++++++++++++++++ src/index/mod.rs | 1 + src/index/segment_reader.rs | 71 ++++++++++++ src/indexer/merger.rs | 27 +++-- src/postings/block_segment_postings.rs | 5 +- src/postings/segment_postings.rs | 4 +- 10 files changed, 222 insertions(+), 16 deletions(-) create mode 100644 src/index/merge_optimized_inverted_index_reader.rs diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 4487c527c..359c12446 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -205,6 +205,7 @@ fn test_merge_columnar_numbers() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -233,6 +234,7 @@ fn test_merge_columnar_texts() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -282,6 +284,7 @@ fn test_merge_columnar_byte() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -338,6 +341,7 @@ fn test_merge_columnar_byte_with_missing() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -390,6 +394,7 @@ fn test_merge_columnar_different_types() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -455,6 +460,7 @@ fn test_merge_columnar_different_empty_cardinality() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); diff --git a/columnar/src/compat_tests.rs b/columnar/src/compat_tests.rs index e791f5a40..baa9c2b70 100644 --- a/columnar/src/compat_tests.rs +++ b/columnar/src/compat_tests.rs @@ -71,7 +71,14 @@ fn test_format(path: &str) { let columnar_readers = vec![&reader, &reader2]; let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]); let mut out = Vec::new(); - merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap(); + merge_columnar( + &columnar_readers, + &[], + merge_row_order.into(), + &mut out, + || false, + ) + .unwrap(); let reader = ColumnarReader::open(out).unwrap(); check_columns(&reader); } diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index 5fa537466..f9bb6ba61 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -641,7 +641,7 @@ proptest! { let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); let mut output: Vec = Vec::new(); let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into(); - crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output).unwrap(); + crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output, || false,).unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); let concat_rows: Vec> = columnar_docs.iter().flatten().cloned().collect(); let expected_merged_columnar = build_columnar(&concat_rows[..]); @@ -665,6 +665,7 @@ fn test_columnar_merging_empty_columnar() { &[], crate::MergeRowOrder::Stack(stack_merge_order), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -702,6 +703,7 @@ fn test_columnar_merging_number_columns() { &[], crate::MergeRowOrder::Stack(stack_merge_order), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -775,6 +777,7 @@ fn test_columnar_merge_and_remap( &[], shuffle_merge_order.into(), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -817,6 +820,7 @@ fn test_columnar_merge_empty() { &[], shuffle_merge_order.into(), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -843,6 +847,7 @@ fn test_columnar_merge_single_str_column() { &[], shuffle_merge_order.into(), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -875,6 +880,7 @@ fn test_delete_decrease_cardinality() { &[], shuffle_merge_order.into(), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs index 7314f8741..d3c0e84e2 100644 --- a/src/index/inverted_index_reader.rs +++ b/src/index/inverted_index_reader.rs @@ -211,7 +211,7 @@ impl InvertedIndexReader { .slice(term_info.postings_range.clone()); BlockSegmentPostings::open( term_info.doc_freq, - postings_data, + postings_data.read_bytes()?, self.record_option, requested_option, ) diff --git a/src/index/merge_optimized_inverted_index_reader.rs b/src/index/merge_optimized_inverted_index_reader.rs new file mode 100644 index 000000000..afe89bd85 --- /dev/null +++ b/src/index/merge_optimized_inverted_index_reader.rs @@ -0,0 +1,105 @@ +use std::io; + +use common::OwnedBytes; + +use crate::directory::FileSlice; +use crate::positions::PositionReader; +use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo}; +use crate::schema::IndexRecordOption; +use crate::termdict::TermDictionary; + +/// The inverted index reader is in charge of accessing +/// the inverted index associated with a specific field. +/// +/// This is optimized for merging in that it full reads +/// the postings and positions files into memory when opened. +/// This eliminates all disk I/O to these files during merging. +/// +/// NB: This is a copy/paste from [`InvertedIndexReader`] and trimmed +/// down to only include the methods required by the merge process. +pub(crate) struct MergeOptimizedInvertedIndexReader { + termdict: TermDictionary, + postings_bytes: OwnedBytes, + positions_bytes: OwnedBytes, + record_option: IndexRecordOption, +} + +impl MergeOptimizedInvertedIndexReader { + pub(crate) fn new( + termdict: TermDictionary, + postings_file_slice: FileSlice, + positions_file_slice: FileSlice, + record_option: IndexRecordOption, + ) -> io::Result { + let (_, postings_body) = postings_file_slice.split(8); + Ok(MergeOptimizedInvertedIndexReader { + termdict, + postings_bytes: postings_body.read_bytes()?, + positions_bytes: positions_file_slice.read_bytes()?, + record_option, + }) + } + + /// Creates an empty `InvertedIndexReader` object, which + /// contains no terms at all. + pub fn empty(record_option: IndexRecordOption) -> MergeOptimizedInvertedIndexReader { + MergeOptimizedInvertedIndexReader { + termdict: TermDictionary::empty(), + postings_bytes: FileSlice::empty().read_bytes().unwrap(), + positions_bytes: FileSlice::empty().read_bytes().unwrap(), + record_option, + } + } + + /// Return the term dictionary datastructure. + pub fn terms(&self) -> &TermDictionary { + &self.termdict + } + + /// Returns a block postings given a `term_info`. + /// This method is for an advanced usage only. + /// + /// Most users should prefer using [`Self::read_postings()`] instead. + pub fn read_block_postings_from_terminfo( + &self, + term_info: &TermInfo, + requested_option: IndexRecordOption, + ) -> io::Result { + let postings_data = self.postings_bytes.slice(term_info.postings_range.clone()); + BlockSegmentPostings::open( + term_info.doc_freq, + postings_data, + self.record_option, + requested_option, + ) + } + + /// Returns a posting object given a `term_info`. + /// This method is for an advanced usage only. + /// + /// Most users should prefer using [`Self::read_postings()`] instead. + pub fn read_postings_from_terminfo( + &self, + term_info: &TermInfo, + option: IndexRecordOption, + ) -> io::Result { + let option = option.downgrade(self.record_option); + + let block_postings = self.read_block_postings_from_terminfo(term_info, option)?; + let position_reader = { + if option.has_positions() { + let positions_data = self + .positions_bytes + .slice(term_info.positions_range.clone()); + let position_reader = PositionReader::open(positions_data)?; + Some(position_reader) + } else { + None + } + }; + Ok(SegmentPostings::from_block_postings( + block_postings, + position_reader, + )) + } +} diff --git a/src/index/mod.rs b/src/index/mod.rs index ef079e080..776074ba6 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -5,6 +5,7 @@ mod index; mod index_meta; mod inverted_index_reader; +pub mod merge_optimized_inverted_index_reader; mod segment; mod segment_component; mod segment_id; diff --git a/src/index/segment_reader.rs b/src/index/segment_reader.rs index 06f187119..2732ae4bd 100644 --- a/src/index/segment_reader.rs +++ b/src/index/segment_reader.rs @@ -13,6 +13,7 @@ use crate::directory::{CompositeFile, FileSlice}; use crate::error::DataCorruption; use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders}; use crate::fieldnorm::{FieldNormReader, FieldNormReaders}; +use crate::index::merge_optimized_inverted_index_reader::MergeOptimizedInvertedIndexReader; use crate::index::{InvertedIndexReader, Segment, SegmentComponent, SegmentId}; use crate::json_utils::json_path_sep_to_dot; use crate::schema::{Field, IndexRecordOption, Schema, Type}; @@ -262,6 +263,76 @@ impl SegmentReader { Ok(inv_idx_reader) } + /// Returns a field reader associated with the field given in argument that is optimized for + /// Tantivy's merge process. + /// + /// If the field was not present in the index during indexing time, + /// the InvertedIndexReader is empty. + /// + /// The field reader is in charge of iterating through the + /// term dictionary associated with a specific field, + /// and opening the posting list associated with any term. + /// + /// If the field is not marked as index, a warning is logged and an empty + /// `MergeOptimizedInvertedIndexReader` is returned. + /// Similarly, if the field is marked as indexed but no term has been indexed for the given + /// index, an empty `MergeOptimizedInvertedIndexReader` is returned (but no warning is logged). + pub(crate) fn merge_optimized_inverted_index( + &self, + field: Field, + ) -> crate::Result> { + let field_entry = self.schema.get_field_entry(field); + let field_type = field_entry.field_type(); + let record_option_opt = field_type.get_index_record_option(); + + if record_option_opt.is_none() { + warn!("Field {:?} does not seem indexed.", field_entry.name()); + } + + let postings_file_opt = self.postings_composite().open_read(field); + + if postings_file_opt.is_none() || record_option_opt.is_none() { + // no documents in the segment contained this field. + // As a result, no data is associated with the inverted index. + // + // Returns an empty inverted index. + let record_option = record_option_opt.unwrap_or(IndexRecordOption::Basic); + return Ok(Arc::new(MergeOptimizedInvertedIndexReader::empty( + record_option, + ))); + } + + let record_option = record_option_opt.unwrap(); + let postings_file = postings_file_opt.unwrap(); + + let termdict_file: FileSlice = + self.termdict_composite().open_read(field).ok_or_else(|| { + DataCorruption::comment_only(format!( + "Failed to open field {:?}'s term dictionary in the composite file. Has the \ + schema been modified?", + field_entry.name() + )) + })?; + + let positions_file = self.positions_composite().open_read(field).ok_or_else(|| { + let error_msg = format!( + "Failed to open field {:?}'s positions in the composite file. Has the schema been \ + modified?", + field_entry.name() + ); + DataCorruption::comment_only(error_msg) + })?; + + let inv_idx_reader = Arc::new(MergeOptimizedInvertedIndexReader::new( + TermDictionary::open(termdict_file)?, + postings_file, + positions_file, + record_option, + )?); + + Ok(inv_idx_reader) + } + /// Returns the list of fields that have been indexed in the segment. /// The field list includes the field defined in the schema as well as the fields /// that have been indexed as a part of a JSON field. diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index babc16693..26eb692d7 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -12,6 +12,7 @@ use crate::docset::{DocSet, TERMINATED}; use crate::error::DataCorruption; use crate::fastfield::AliveBitSet; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; +use crate::index::merge_optimized_inverted_index_reader::MergeOptimizedInvertedIndexReader; use crate::index::{Segment, SegmentComponent, SegmentReader}; use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping}; use crate::indexer::segment_updater::CancelSentinel; @@ -20,7 +21,7 @@ use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings}; use crate::schema::{value_type_to_column_type, Field, FieldType, Schema}; use crate::store::StoreWriter; use crate::termdict::{TermMerger, TermOrdinal}; -use crate::{DocAddress, DocId, InvertedIndexReader}; +use crate::{DocAddress, DocId}; /// Segment's max doc must be `< MAX_DOC_LIMIT`. /// @@ -309,10 +310,10 @@ impl IndexMerger { let mut max_term_ords: Vec = Vec::new(); - let field_readers: Vec> = self + let field_readers: Vec> = self .readers .iter() - .map(|reader| reader.inverted_index(indexed_field)) + .map(|reader| reader.merge_optimized_inverted_index(indexed_field)) .collect::>>()?; let mut field_term_streams = Vec::new(); @@ -369,10 +370,16 @@ impl IndexMerger { let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![]; + let mut cnt = 0; while merged_terms.advance() { - if self.cancel.wants_cancel() { - return Err(crate::TantivyError::Cancelled); + // calling `wants_cancel()` could be expensive so only do it so often + if cnt % 1000 == 0 { + if self.cancel.wants_cancel() { + return Err(crate::TantivyError::Cancelled); + } } + cnt += 1; + segment_postings_containing_the_term.clear(); let term_bytes: &[u8] = merged_terms.key(); @@ -381,7 +388,8 @@ impl IndexMerger { // Let's compute the list of non-empty posting lists for (segment_ord, term_info) in merged_terms.current_segment_ords_and_term_infos() { let segment_reader = &self.readers[segment_ord]; - let inverted_index: &InvertedIndexReader = &field_readers[segment_ord]; + let inverted_index: &MergeOptimizedInvertedIndexReader = + &field_readers[segment_ord]; let segment_postings = inverted_index .read_postings_from_terminfo(&term_info, segment_postings_option)?; let alive_bitset_opt = segment_reader.alive_bitset(); @@ -451,8 +459,11 @@ impl IndexMerger { let mut doc = segment_postings.doc(); while doc != TERMINATED { - if self.cancel.wants_cancel() { - return Err(crate::TantivyError::Cancelled); + if doc % 1000 == 0 { + // calling `wants_cancel()` could be expensive so only do it so often + if self.cancel.wants_cancel() { + return Err(crate::TantivyError::Cancelled); + } } // deleted doc are skipped as they do not have a `remapped_doc_id`. if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] { diff --git a/src/postings/block_segment_postings.rs b/src/postings/block_segment_postings.rs index cc8894294..26646cdbd 100644 --- a/src/postings/block_segment_postings.rs +++ b/src/postings/block_segment_postings.rs @@ -2,7 +2,7 @@ use std::io; use common::VInt; -use crate::directory::{FileSlice, OwnedBytes}; +use crate::directory::OwnedBytes; use crate::fieldnorm::FieldNormReader; use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE}; use crate::postings::{BlockInfo, FreqReadingOption, SkipReader}; @@ -96,11 +96,10 @@ impl BlockSegmentPostings { /// term frequency blocks. pub(crate) fn open( doc_freq: u32, - data: FileSlice, + bytes: OwnedBytes, mut record_option: IndexRecordOption, requested_option: IndexRecordOption, ) -> io::Result { - let bytes = data.read_bytes()?; let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?; let skip_reader = match skip_data_opt { Some(skip_data) => { diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index d9ba33eb2..2319916d3 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -81,7 +81,7 @@ impl SegmentPostings { } let block_segment_postings = BlockSegmentPostings::open( docs.len() as u32, - FileSlice::from(buffer), + FileSlice::from(buffer).read_bytes().unwrap(), IndexRecordOption::Basic, IndexRecordOption::Basic, ) @@ -129,7 +129,7 @@ impl SegmentPostings { .unwrap(); let block_segment_postings = BlockSegmentPostings::open( doc_and_tfs.len() as u32, - FileSlice::from(buffer), + FileSlice::from(buffer).read_bytes().unwrap(), IndexRecordOption::WithFreqs, IndexRecordOption::WithFreqs, )