diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs index 9b1a3e6a4..61c076f48 100644 --- a/src/indexer/doc_id_mapping.rs +++ b/src/indexer/doc_id_mapping.rs @@ -2,13 +2,57 @@ //! to get mappings from old doc_id to new doc_id and vice versa, after sorting //! -use super::SegmentWriter; +use super::{merger::SegmentReaderWithOrdinal, SegmentWriter}; use crate::{ schema::{Field, Schema}, DocId, IndexSortByField, Order, TantivyError, }; -use std::cmp::Reverse; -/// Struct to provide mapping from old doc_id to new doc_id and vice versa +use std::{cmp::Reverse, ops::Index}; + +/// Struct to provide mapping from new doc_id to old doc_id and segment. +#[derive(Clone)] +pub(crate) struct SegmentDocidMapping<'a> { + new_doc_id_to_old_and_segment: Vec<(DocId, SegmentReaderWithOrdinal<'a>)>, + is_trivial: bool, +} + +impl<'a> SegmentDocidMapping<'a> { + pub(crate) fn new( + new_doc_id_to_old_and_segment: Vec<(DocId, SegmentReaderWithOrdinal<'a>)>, + is_trivial: bool, + ) -> Self { + Self { + new_doc_id_to_old_and_segment, + is_trivial, + } + } + pub(crate) fn iter(&self) -> impl Iterator { + self.new_doc_id_to_old_and_segment.iter() + } + pub(crate) fn len(&self) -> usize { + self.new_doc_id_to_old_and_segment.len() + } + pub(crate) fn is_trivial(&self) -> bool { + self.is_trivial + } +} +impl<'a> Index for SegmentDocidMapping<'a> { + type Output = (DocId, SegmentReaderWithOrdinal<'a>); + + fn index(&self, idx: usize) -> &Self::Output { + &self.new_doc_id_to_old_and_segment[idx] + } +} +impl<'a> IntoIterator for SegmentDocidMapping<'a> { + type Item = (DocId, SegmentReaderWithOrdinal<'a>); + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.new_doc_id_to_old_and_segment.into_iter() + } +} + +/// Struct to provide mapping from old doc_id to new doc_id and vice versa within a segment. pub struct DocIdMapping { new_doc_id_to_old: Vec, old_doc_id_to_new: Vec, diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 16f19556d..9fa190798 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -9,6 +9,7 @@ use crate::fastfield::MultiValuedFastFieldReader; use crate::fieldnorm::FieldNormsSerializer; use crate::fieldnorm::FieldNormsWriter; use crate::fieldnorm::{FieldNormReader, FieldNormReaders}; +use crate::indexer::doc_id_mapping::SegmentDocidMapping; use crate::indexer::SegmentSerializer; use crate::postings::Postings; use crate::postings::{InvertedIndexSerializer, SegmentPostings}; @@ -232,7 +233,7 @@ impl IndexMerger { fn write_fieldnorms( &self, mut fieldnorms_serializer: FieldNormsSerializer, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result<()> { let fields = FieldNormsWriter::fields_with_fieldnorm(&self.schema); let mut fieldnorms_data = Vec::with_capacity(self.max_doc as usize); @@ -244,7 +245,7 @@ impl IndexMerger { .iter() .map(|reader| reader.get_fieldnorms_reader(field)) .collect::>()?; - for (doc_id, reader_with_ordinal) in doc_id_mapping { + for (doc_id, reader_with_ordinal) in doc_id_mapping.iter() { let fieldnorms_reader = &fieldnorms_readers[reader_with_ordinal.ordinal as usize]; let fieldnorm_id = fieldnorms_reader.fieldnorm_id(*doc_id); @@ -269,7 +270,7 @@ impl IndexMerger { &self, fast_field_serializer: &mut CompositeFastFieldSerializer, mut term_ord_mappings: HashMap, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result<()> { for (field, field_entry) in self.schema.fields() { let field_type = field_entry.field_type(); @@ -318,7 +319,7 @@ impl IndexMerger { &self, field: Field, fast_field_serializer: &mut CompositeFastFieldSerializer, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result<()> { let (min_value, max_value) = self.readers.iter().map(|reader|{ let u64_reader: DynamicFastFieldReader = reader @@ -351,7 +352,7 @@ impl IndexMerger { }; #[derive(Clone)] struct SortedDocidFieldAccessProvider<'a> { - doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>, + doc_id_mapping: &'a SegmentDocidMapping<'a>, fast_field_readers: &'a Vec>, } impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> { @@ -364,7 +365,11 @@ impl IndexMerger { doc_id_mapping, fast_field_readers: &fast_field_readers, }; - let iter = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| { + let iter1 = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| { + let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize]; + fast_field_reader.get(*doc_id) + }); + let iter2 = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| { let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize]; fast_field_reader.get(*doc_id) }); @@ -372,92 +377,12 @@ impl IndexMerger { field, stats, fastfield_accessor, - iter.clone(), - iter, + iter1, + iter2, )?; Ok(()) } else { - let num_vals = self - .readers - .iter() - .map(|reader| reader.num_docs() as u64) - .sum(); - - let stats = FastFieldStats { - min_value, - max_value, - num_vals, - }; - #[derive(Clone)] - struct DocidFieldAccessProvider<'a> { - segment_and_field_readers: Vec<(&'a SegmentReader, DynamicFastFieldReader)>, - } - impl<'a> FastFieldDataAccess for DocidFieldAccessProvider<'a> { - fn get_val(&self, doc: u64) -> u64 { - // Find the reader which will contain the doc_id. - let mut num_docs_so_far = 0; - let reader_ordinal = self - .segment_and_field_readers - .iter() - .position(|(segment_reader, _)| { - num_docs_so_far += segment_reader.num_docs() as u64; - - num_docs_so_far > doc - }) - .unwrap(); - - let (segment_reader, reader) = - &self.segment_and_field_readers[reader_ordinal as usize]; - let pos_in_reader = doc - (num_docs_so_far - segment_reader.num_docs() as u64); - - let docid = segment_reader - .doc_ids_alive() - .nth(pos_in_reader as usize) - .expect(&format!( - "unexpected error, could not find doc id in alive list docid {}, number of docids in segment {} ", - pos_in_reader, - segment_reader.doc_ids_alive().count() - )); - reader.get(docid) - } - } - let segment_and_field_readers = self.readers.iter() - .map(|reader|{ - let u64_reader: DynamicFastFieldReader = reader - .fast_fields() - .typed_fast_field_reader(field) - .expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen."); - (reader, u64_reader) - }).collect::>(); - - let iter1 = segment_and_field_readers - .iter() - .map(|(reader, u64_reader)| { - reader - .doc_ids_alive() - .map(move |doc_id| u64_reader.get(doc_id)) - }) - .flatten(); - let iter2 = segment_and_field_readers - .iter() - .map(|(reader, u64_reader)| { - reader - .doc_ids_alive() - .map(move |doc_id| u64_reader.get(doc_id)) - }) - .flatten(); - - let fastfield_accessor = DocidFieldAccessProvider { - segment_and_field_readers: segment_and_field_readers.clone(), - }; - fast_field_serializer.create_auto_detect_u64_fast_field( - field, - stats, - fastfield_accessor, - iter1, - iter2, - )?; Ok(()) } } @@ -523,7 +448,7 @@ impl IndexMerger { pub(crate) fn generate_doc_id_mapping( &self, sort_by_field: &IndexSortByField, - ) -> crate::Result> { + ) -> crate::Result { let reader_and_field_accessors = self.get_reader_with_sort_field_accessor(sort_by_field)?; // Loading the field accessor on demand causes a 15x regression @@ -559,7 +484,7 @@ impl IndexMerger { }) .map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id)) .collect::>(); - Ok(sorted_doc_ids) + Ok(SegmentDocidMapping::new(sorted_doc_ids, false)) } // Creating the index file to point into the data, generic over `BytesFastFieldReader` and @@ -571,7 +496,7 @@ impl IndexMerger { fn write_1_n_fast_field_idx_generic( field: Field, fast_field_serializer: &mut CompositeFastFieldSerializer, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, reader_and_field_accessors: &[(&SegmentReader, T)], ) -> crate::Result> { let mut total_num_vals = 0u64; @@ -610,7 +535,7 @@ impl IndexMerger { let mut offsets = vec![]; let mut offset = 0; - for (doc_id, reader) in doc_id_mapping { + for (doc_id, reader) in doc_id_mapping.iter() { let reader = &reader_and_field_accessors[reader.ordinal as usize].1; offsets.push(offset); offset += reader.get_len(*doc_id) as u64; @@ -650,7 +575,7 @@ impl IndexMerger { &self, field: Field, fast_field_serializer: &mut CompositeFastFieldSerializer, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result> { let reader_and_field_accessors = self.readers.iter().map(|reader|{ let u64s_reader: MultiValuedFastFieldReader = reader.fast_fields() @@ -672,7 +597,7 @@ impl IndexMerger { field: Field, term_ordinal_mappings: &TermOrdinalMapping, fast_field_serializer: &mut CompositeFastFieldSerializer, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result<()> { // Multifastfield consists in 2 fastfields. // The first serves as an index into the second one and is stricly increasing. @@ -700,7 +625,7 @@ impl IndexMerger { fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?; let mut vals = Vec::with_capacity(100); if let Some(doc_id_mapping) = doc_id_mapping { - for (old_doc_id, reader_with_ordinal) in doc_id_mapping { + for (old_doc_id, reader_with_ordinal) in doc_id_mapping.iter() { let term_ordinal_mapping: &[TermOrdinal] = term_ordinal_mappings.get_segment(reader_with_ordinal.ordinal as usize); @@ -731,12 +656,35 @@ impl IndexMerger { Ok(()) } + /// Creates a mapping if the segments are stacked. this is helpful to merge codelines between index + /// sorting and the others + pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result { + let mapping: Vec<_> = self + .readers + .iter() + .enumerate() + .map(|(ordinal, reader)| { + let reader_with_ordinal = SegmentReaderWithOrdinal { + ordinal: ordinal as u32, + reader, + }; + reader + .doc_ids_alive() + .map(move |doc_id| (doc_id, reader_with_ordinal)) + }) + .flatten() + .collect(); + Ok(SegmentDocidMapping::new(mapping, true)) + } fn write_multi_fast_field( &self, field: Field, fast_field_serializer: &mut CompositeFastFieldSerializer, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result<()> { + //if doc_id_mapping.is_none() { + //doc_id_mapping = &Some(self.get_doc_id_from_concatenated_data()?); + //} // Multifastfield consists in 2 fastfields. // The first serves as an index into the second one and is stricly increasing. // The second contains the actual values. @@ -784,17 +732,6 @@ impl IndexMerger { max_value = 0; } - let fast_field_readers = self - .readers - .iter() - .map(|reader| { - let ff_reader : MultiValuedFastFieldReader = reader.fast_fields() - .typed_fast_field_multi_reader(field) - .expect("Failed to find index for multivalued field. This is a bug in tantivy, please report."); - ff_reader - }) - .collect::>(); - // We can now initialize our serializer, and push it the different values let stats = FastFieldStats { max_value, @@ -803,7 +740,7 @@ impl IndexMerger { }; if let Some(doc_id_mapping) = doc_id_mapping { struct SortedDocidMultiValueAccessProvider<'a> { - doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>, + doc_id_mapping: &'a SegmentDocidMapping<'a>, fast_field_readers: &'a Vec>, offsets: Vec, } @@ -835,13 +772,22 @@ impl IndexMerger { } let fastfield_accessor = SortedDocidMultiValueAccessProvider { doc_id_mapping, - fast_field_readers: &fast_field_readers, + fast_field_readers: &ff_readers, offsets, }; - let iter = doc_id_mapping + let iter1 = doc_id_mapping .iter() .map(|(doc_id, reader_with_ordinal)| { - let ff_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize]; + let ff_reader = &ff_readers[reader_with_ordinal.ordinal as usize]; + let mut vals = vec![]; + ff_reader.get_vals(*doc_id, &mut vals); + vals.into_iter() + }) + .flatten(); + let iter2 = doc_id_mapping + .iter() + .map(|(doc_id, reader_with_ordinal)| { + let ff_reader = &ff_readers[reader_with_ordinal.ordinal as usize]; let mut vals = vec![]; ff_reader.get_vals(*doc_id, &mut vals); vals.into_iter() @@ -851,11 +797,79 @@ impl IndexMerger { field, stats, fastfield_accessor, - iter.clone(), - iter, + iter1, + iter2, 1, )?; } else { + //struct DocidMultiValueAccessProvider<'a> { + //segment_and_field_readers: + //Vec<(&'a SegmentReader, MultiValuedFastFieldReader)>, + //offsets: Vec, + //} + //impl<'a> FastFieldDataAccess for DocidMultiValueAccessProvider<'a> { + //fn get_val(&self, pos: u64) -> u64 { + //// use the offsets index to find the new doc_id which will contain the position. + //// the offsets are stricly increasing so we can do a simple search on it. + //let new_docid = self + //.offsets + //.iter() + //.position(|&offset| offset > pos) + //.unwrap() + //- 1; + + //// now we need to find the position of `pos` in the multivalued bucket + //let num_pos_covered_until_now = self.offsets[new_docid]; + //let pos_in_values = pos - num_pos_covered_until_now; + + ////now we need to find the segment that contains this doc_id. + //let mut num_docs_so_far = 0; + //let reader_ordinal = self + //.segment_and_field_readers + //.iter() + //.position(|(segment_reader, _)| { + //num_docs_so_far += segment_reader.num_docs() as u64; + //num_docs_so_far > doc + //}) + //.unwrap(); + + //let (segment_reader, reader) = + //&self.segment_and_field_readers[reader_ordinal as usize]; + //let pos_in_reader = doc - (num_docs_so_far - segment_reader.num_docs() as u64); + //let docid = segment_reader + //.doc_ids_alive() + //.nth(pos_in_reader as usize) + //.expect(&format!( + //"unexpected error, could not find doc id in alive list docid {}, number of docids in segment {} ", + //pos_in_reader, + //segment_reader.doc_ids_alive().count() + //)); + + //let (old_doc_id, reader_with_ordinal) = self.doc_id_mapping[new_docid as usize]; + //let num_vals = self.fast_field_readers[reader_with_ordinal.ordinal as usize] + //.get_len(old_doc_id); + //assert!(num_vals >= pos_in_values); + //let mut vals = vec![]; + //self.fast_field_readers[reader_with_ordinal.ordinal as usize] + //.get_vals(old_doc_id, &mut vals); + + //vals[pos_in_values as usize] + //} + //} + //let iter = self + //.readers + //.iter() + //.zip(&ff_readers) + //.map(move |(reader, ff_reader)| { + //reader + //.doc_ids_alive() + //.map(move |doc_id| { + //let mut vals = vec![]; + //ff_reader.get_vals(doc_id, &mut vals); + //vals.into_iter() + //}) + //.flatten() + //}); let mut serialize_vals = fast_field_serializer .new_u64_fast_field_with_idx(field, min_value, max_value, 1)?; for (reader, ff_reader) in self.readers.iter().zip(ff_readers) { @@ -876,7 +890,7 @@ impl IndexMerger { &self, field: Field, fast_field_serializer: &mut CompositeFastFieldSerializer, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result<()> { let reader_and_field_accessors = self .readers @@ -896,7 +910,7 @@ impl IndexMerger { )?; let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1); if let Some(doc_id_mapping) = doc_id_mapping { - for (doc_id, reader_with_ordinal) in doc_id_mapping { + for (doc_id, reader_with_ordinal) in doc_id_mapping.iter() { let bytes_reader = &reader_and_field_accessors[reader_with_ordinal.ordinal as usize].1; let val = bytes_reader.get_bytes(*doc_id); @@ -923,7 +937,7 @@ impl IndexMerger { field_type: &FieldType, serializer: &mut InvertedIndexSerializer, fieldnorm_reader: Option, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result> { let mut positions_buffer: Vec = Vec::with_capacity(1_000); let mut delta_computer = DeltaComputer::new(); @@ -1112,7 +1126,7 @@ impl IndexMerger { &self, serializer: &mut InvertedIndexSerializer, fieldnorm_readers: FieldNormReaders, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result> { let mut term_ordinal_mappings = HashMap::new(); for (field, field_entry) in self.schema.fields() { @@ -1135,7 +1149,7 @@ impl IndexMerger { fn write_storable_fields( &self, store_writer: &mut StoreWriter, - doc_id_mapping: &Option>, + doc_id_mapping: &Option, ) -> crate::Result<()> { let store_readers: Vec<_> = self .readers @@ -1148,7 +1162,7 @@ impl IndexMerger { .map(|(i, store)| store.iter_raw(self.readers[i].delete_bitset())) .collect(); if let Some(doc_id_mapping) = doc_id_mapping { - for (old_doc_id, reader_with_ordinal) in doc_id_mapping { + for (old_doc_id, reader_with_ordinal) in doc_id_mapping.iter() { let doc_bytes_it = &mut document_iterators[reader_with_ordinal.ordinal as usize]; if let Some(doc_bytes_res) = doc_bytes_it.next() { let doc_bytes = doc_bytes_res?; @@ -1204,12 +1218,12 @@ impl IndexMerger { // If the documents are already sorted and stackable, we ignore the mapping and execute // it as if there was no sorting if self.is_disjunct_and_sorted_on_sort_property(sort_by_field)? { - None + Some(self.get_doc_id_from_concatenated_data()?) } else { Some(self.generate_doc_id_mapping(sort_by_field)?) } } else { - None + Some(self.get_doc_id_from_concatenated_data()?) }; if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {