diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 4487c527c..359c12446 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -205,6 +205,7 @@ fn test_merge_columnar_numbers() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -233,6 +234,7 @@ fn test_merge_columnar_texts() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -282,6 +284,7 @@ fn test_merge_columnar_byte() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -338,6 +341,7 @@ fn test_merge_columnar_byte_with_missing() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -390,6 +394,7 @@ fn test_merge_columnar_different_types() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); @@ -455,6 +460,7 @@ fn test_merge_columnar_different_empty_cardinality() { &[], MergeRowOrder::Stack(stack_merge_order), &mut buffer, + || false, ) .unwrap(); let columnar_reader = ColumnarReader::open(buffer).unwrap(); diff --git a/columnar/src/compat_tests.rs b/columnar/src/compat_tests.rs index e791f5a40..baa9c2b70 100644 --- a/columnar/src/compat_tests.rs +++ b/columnar/src/compat_tests.rs @@ -71,7 +71,14 @@ fn test_format(path: &str) { let columnar_readers = vec![&reader, &reader2]; let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]); let mut out = Vec::new(); - merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap(); + merge_columnar( + &columnar_readers, + &[], + merge_row_order.into(), + &mut out, + || false, + ) + .unwrap(); let reader = ColumnarReader::open(out).unwrap(); check_columns(&reader); } diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index 5fa537466..f9bb6ba61 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -641,7 +641,7 @@ proptest! { let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); let mut output: Vec = Vec::new(); let stack_merge_order = StackMergeOrder::stack(&columnar_readers_arr[..]).into(); - crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output).unwrap(); + crate::merge_columnar(&columnar_readers_arr[..], &[], stack_merge_order, &mut output, || false,).unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); let concat_rows: Vec> = columnar_docs.iter().flatten().cloned().collect(); let expected_merged_columnar = build_columnar(&concat_rows[..]); @@ -665,6 +665,7 @@ fn test_columnar_merging_empty_columnar() { &[], crate::MergeRowOrder::Stack(stack_merge_order), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -702,6 +703,7 @@ fn test_columnar_merging_number_columns() { &[], crate::MergeRowOrder::Stack(stack_merge_order), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -775,6 +777,7 @@ fn test_columnar_merge_and_remap( &[], shuffle_merge_order.into(), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -817,6 +820,7 @@ fn test_columnar_merge_empty() { &[], shuffle_merge_order.into(), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -843,6 +847,7 @@ fn test_columnar_merge_single_str_column() { &[], shuffle_merge_order.into(), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); @@ -875,6 +880,7 @@ fn test_delete_decrease_cardinality() { &[], shuffle_merge_order.into(), &mut output, + || false, ) .unwrap(); let merged_columnar = ColumnarReader::open(output).unwrap(); diff --git a/src/index/inverted_index_reader.rs b/src/index/inverted_index_reader.rs index 7314f8741..d3c0e84e2 100644 --- a/src/index/inverted_index_reader.rs +++ b/src/index/inverted_index_reader.rs @@ -211,7 +211,7 @@ impl InvertedIndexReader { .slice(term_info.postings_range.clone()); BlockSegmentPostings::open( term_info.doc_freq, - postings_data, + postings_data.read_bytes()?, self.record_option, requested_option, ) diff --git a/src/index/merge_optimized_inverted_index_reader.rs b/src/index/merge_optimized_inverted_index_reader.rs new file mode 100644 index 000000000..afe89bd85 --- /dev/null +++ b/src/index/merge_optimized_inverted_index_reader.rs @@ -0,0 +1,105 @@ +use std::io; + +use common::OwnedBytes; + +use crate::directory::FileSlice; +use crate::positions::PositionReader; +use crate::postings::{BlockSegmentPostings, SegmentPostings, TermInfo}; +use crate::schema::IndexRecordOption; +use crate::termdict::TermDictionary; + +/// The inverted index reader is in charge of accessing +/// the inverted index associated with a specific field. +/// +/// This is optimized for merging in that it full reads +/// the postings and positions files into memory when opened. +/// This eliminates all disk I/O to these files during merging. +/// +/// NB: This is a copy/paste from [`InvertedIndexReader`] and trimmed +/// down to only include the methods required by the merge process. +pub(crate) struct MergeOptimizedInvertedIndexReader { + termdict: TermDictionary, + postings_bytes: OwnedBytes, + positions_bytes: OwnedBytes, + record_option: IndexRecordOption, +} + +impl MergeOptimizedInvertedIndexReader { + pub(crate) fn new( + termdict: TermDictionary, + postings_file_slice: FileSlice, + positions_file_slice: FileSlice, + record_option: IndexRecordOption, + ) -> io::Result { + let (_, postings_body) = postings_file_slice.split(8); + Ok(MergeOptimizedInvertedIndexReader { + termdict, + postings_bytes: postings_body.read_bytes()?, + positions_bytes: positions_file_slice.read_bytes()?, + record_option, + }) + } + + /// Creates an empty `InvertedIndexReader` object, which + /// contains no terms at all. + pub fn empty(record_option: IndexRecordOption) -> MergeOptimizedInvertedIndexReader { + MergeOptimizedInvertedIndexReader { + termdict: TermDictionary::empty(), + postings_bytes: FileSlice::empty().read_bytes().unwrap(), + positions_bytes: FileSlice::empty().read_bytes().unwrap(), + record_option, + } + } + + /// Return the term dictionary datastructure. + pub fn terms(&self) -> &TermDictionary { + &self.termdict + } + + /// Returns a block postings given a `term_info`. + /// This method is for an advanced usage only. + /// + /// Most users should prefer using [`Self::read_postings()`] instead. + pub fn read_block_postings_from_terminfo( + &self, + term_info: &TermInfo, + requested_option: IndexRecordOption, + ) -> io::Result { + let postings_data = self.postings_bytes.slice(term_info.postings_range.clone()); + BlockSegmentPostings::open( + term_info.doc_freq, + postings_data, + self.record_option, + requested_option, + ) + } + + /// Returns a posting object given a `term_info`. + /// This method is for an advanced usage only. + /// + /// Most users should prefer using [`Self::read_postings()`] instead. + pub fn read_postings_from_terminfo( + &self, + term_info: &TermInfo, + option: IndexRecordOption, + ) -> io::Result { + let option = option.downgrade(self.record_option); + + let block_postings = self.read_block_postings_from_terminfo(term_info, option)?; + let position_reader = { + if option.has_positions() { + let positions_data = self + .positions_bytes + .slice(term_info.positions_range.clone()); + let position_reader = PositionReader::open(positions_data)?; + Some(position_reader) + } else { + None + } + }; + Ok(SegmentPostings::from_block_postings( + block_postings, + position_reader, + )) + } +} diff --git a/src/index/mod.rs b/src/index/mod.rs index ef079e080..776074ba6 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -5,6 +5,7 @@ mod index; mod index_meta; mod inverted_index_reader; +pub mod merge_optimized_inverted_index_reader; mod segment; mod segment_component; mod segment_id; diff --git a/src/index/segment_reader.rs b/src/index/segment_reader.rs index 06f187119..2732ae4bd 100644 --- a/src/index/segment_reader.rs +++ b/src/index/segment_reader.rs @@ -13,6 +13,7 @@ use crate::directory::{CompositeFile, FileSlice}; use crate::error::DataCorruption; use crate::fastfield::{intersect_alive_bitsets, AliveBitSet, FacetReader, FastFieldReaders}; use crate::fieldnorm::{FieldNormReader, FieldNormReaders}; +use crate::index::merge_optimized_inverted_index_reader::MergeOptimizedInvertedIndexReader; use crate::index::{InvertedIndexReader, Segment, SegmentComponent, SegmentId}; use crate::json_utils::json_path_sep_to_dot; use crate::schema::{Field, IndexRecordOption, Schema, Type}; @@ -262,6 +263,76 @@ impl SegmentReader { Ok(inv_idx_reader) } + /// Returns a field reader associated with the field given in argument that is optimized for + /// Tantivy's merge process. + /// + /// If the field was not present in the index during indexing time, + /// the InvertedIndexReader is empty. + /// + /// The field reader is in charge of iterating through the + /// term dictionary associated with a specific field, + /// and opening the posting list associated with any term. + /// + /// If the field is not marked as index, a warning is logged and an empty + /// `MergeOptimizedInvertedIndexReader` is returned. + /// Similarly, if the field is marked as indexed but no term has been indexed for the given + /// index, an empty `MergeOptimizedInvertedIndexReader` is returned (but no warning is logged). + pub(crate) fn merge_optimized_inverted_index( + &self, + field: Field, + ) -> crate::Result> { + let field_entry = self.schema.get_field_entry(field); + let field_type = field_entry.field_type(); + let record_option_opt = field_type.get_index_record_option(); + + if record_option_opt.is_none() { + warn!("Field {:?} does not seem indexed.", field_entry.name()); + } + + let postings_file_opt = self.postings_composite().open_read(field); + + if postings_file_opt.is_none() || record_option_opt.is_none() { + // no documents in the segment contained this field. + // As a result, no data is associated with the inverted index. + // + // Returns an empty inverted index. + let record_option = record_option_opt.unwrap_or(IndexRecordOption::Basic); + return Ok(Arc::new(MergeOptimizedInvertedIndexReader::empty( + record_option, + ))); + } + + let record_option = record_option_opt.unwrap(); + let postings_file = postings_file_opt.unwrap(); + + let termdict_file: FileSlice = + self.termdict_composite().open_read(field).ok_or_else(|| { + DataCorruption::comment_only(format!( + "Failed to open field {:?}'s term dictionary in the composite file. Has the \ + schema been modified?", + field_entry.name() + )) + })?; + + let positions_file = self.positions_composite().open_read(field).ok_or_else(|| { + let error_msg = format!( + "Failed to open field {:?}'s positions in the composite file. Has the schema been \ + modified?", + field_entry.name() + ); + DataCorruption::comment_only(error_msg) + })?; + + let inv_idx_reader = Arc::new(MergeOptimizedInvertedIndexReader::new( + TermDictionary::open(termdict_file)?, + postings_file, + positions_file, + record_option, + )?); + + Ok(inv_idx_reader) + } + /// Returns the list of fields that have been indexed in the segment. /// The field list includes the field defined in the schema as well as the fields /// that have been indexed as a part of a JSON field. diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index babc16693..26eb692d7 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -12,6 +12,7 @@ use crate::docset::{DocSet, TERMINATED}; use crate::error::DataCorruption; use crate::fastfield::AliveBitSet; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; +use crate::index::merge_optimized_inverted_index_reader::MergeOptimizedInvertedIndexReader; use crate::index::{Segment, SegmentComponent, SegmentReader}; use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping}; use crate::indexer::segment_updater::CancelSentinel; @@ -20,7 +21,7 @@ use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings}; use crate::schema::{value_type_to_column_type, Field, FieldType, Schema}; use crate::store::StoreWriter; use crate::termdict::{TermMerger, TermOrdinal}; -use crate::{DocAddress, DocId, InvertedIndexReader}; +use crate::{DocAddress, DocId}; /// Segment's max doc must be `< MAX_DOC_LIMIT`. /// @@ -309,10 +310,10 @@ impl IndexMerger { let mut max_term_ords: Vec = Vec::new(); - let field_readers: Vec> = self + let field_readers: Vec> = self .readers .iter() - .map(|reader| reader.inverted_index(indexed_field)) + .map(|reader| reader.merge_optimized_inverted_index(indexed_field)) .collect::>>()?; let mut field_term_streams = Vec::new(); @@ -369,10 +370,16 @@ impl IndexMerger { let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![]; + let mut cnt = 0; while merged_terms.advance() { - if self.cancel.wants_cancel() { - return Err(crate::TantivyError::Cancelled); + // calling `wants_cancel()` could be expensive so only do it so often + if cnt % 1000 == 0 { + if self.cancel.wants_cancel() { + return Err(crate::TantivyError::Cancelled); + } } + cnt += 1; + segment_postings_containing_the_term.clear(); let term_bytes: &[u8] = merged_terms.key(); @@ -381,7 +388,8 @@ impl IndexMerger { // Let's compute the list of non-empty posting lists for (segment_ord, term_info) in merged_terms.current_segment_ords_and_term_infos() { let segment_reader = &self.readers[segment_ord]; - let inverted_index: &InvertedIndexReader = &field_readers[segment_ord]; + let inverted_index: &MergeOptimizedInvertedIndexReader = + &field_readers[segment_ord]; let segment_postings = inverted_index .read_postings_from_terminfo(&term_info, segment_postings_option)?; let alive_bitset_opt = segment_reader.alive_bitset(); @@ -451,8 +459,11 @@ impl IndexMerger { let mut doc = segment_postings.doc(); while doc != TERMINATED { - if self.cancel.wants_cancel() { - return Err(crate::TantivyError::Cancelled); + if doc % 1000 == 0 { + // calling `wants_cancel()` could be expensive so only do it so often + if self.cancel.wants_cancel() { + return Err(crate::TantivyError::Cancelled); + } } // deleted doc are skipped as they do not have a `remapped_doc_id`. if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] { diff --git a/src/postings/block_segment_postings.rs b/src/postings/block_segment_postings.rs index cc8894294..26646cdbd 100644 --- a/src/postings/block_segment_postings.rs +++ b/src/postings/block_segment_postings.rs @@ -2,7 +2,7 @@ use std::io; use common::VInt; -use crate::directory::{FileSlice, OwnedBytes}; +use crate::directory::OwnedBytes; use crate::fieldnorm::FieldNormReader; use crate::postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE}; use crate::postings::{BlockInfo, FreqReadingOption, SkipReader}; @@ -96,11 +96,10 @@ impl BlockSegmentPostings { /// term frequency blocks. pub(crate) fn open( doc_freq: u32, - data: FileSlice, + bytes: OwnedBytes, mut record_option: IndexRecordOption, requested_option: IndexRecordOption, ) -> io::Result { - let bytes = data.read_bytes()?; let (skip_data_opt, postings_data) = split_into_skips_and_postings(doc_freq, bytes)?; let skip_reader = match skip_data_opt { Some(skip_data) => { diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index d9ba33eb2..2319916d3 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -81,7 +81,7 @@ impl SegmentPostings { } let block_segment_postings = BlockSegmentPostings::open( docs.len() as u32, - FileSlice::from(buffer), + FileSlice::from(buffer).read_bytes().unwrap(), IndexRecordOption::Basic, IndexRecordOption::Basic, ) @@ -129,7 +129,7 @@ impl SegmentPostings { .unwrap(); let block_segment_postings = BlockSegmentPostings::open( doc_and_tfs.len() as u32, - FileSlice::from(buffer), + FileSlice::from(buffer).read_bytes().unwrap(), IndexRecordOption::WithFreqs, IndexRecordOption::WithFreqs, )