wrap docidmapping in struct

This commit is contained in:
Pascal Seitz
2021-06-25 12:52:57 +02:00
parent 8526434b63
commit 5c9e2ef036
2 changed files with 182 additions and 124 deletions

View File

@@ -2,13 +2,57 @@
//! to get mappings from old doc_id to new doc_id and vice versa, after sorting
//!
use super::SegmentWriter;
use super::{merger::SegmentReaderWithOrdinal, SegmentWriter};
use crate::{
schema::{Field, Schema},
DocId, IndexSortByField, Order, TantivyError,
};
use std::cmp::Reverse;
/// Struct to provide mapping from old doc_id to new doc_id and vice versa
use std::{cmp::Reverse, ops::Index};
/// Struct to provide mapping from new doc_id to old doc_id and segment.
#[derive(Clone)]
pub(crate) struct SegmentDocidMapping<'a> {
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
is_trivial: bool,
}
impl<'a> SegmentDocidMapping<'a> {
pub(crate) fn new(
new_doc_id_to_old_and_segment: Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
is_trivial: bool,
) -> Self {
Self {
new_doc_id_to_old_and_segment,
is_trivial,
}
}
pub(crate) fn iter(&self) -> impl Iterator<Item = &(DocId, SegmentReaderWithOrdinal)> {
self.new_doc_id_to_old_and_segment.iter()
}
pub(crate) fn len(&self) -> usize {
self.new_doc_id_to_old_and_segment.len()
}
pub(crate) fn is_trivial(&self) -> bool {
self.is_trivial
}
}
impl<'a> Index<usize> for SegmentDocidMapping<'a> {
type Output = (DocId, SegmentReaderWithOrdinal<'a>);
fn index(&self, idx: usize) -> &Self::Output {
&self.new_doc_id_to_old_and_segment[idx]
}
}
impl<'a> IntoIterator for SegmentDocidMapping<'a> {
type Item = (DocId, SegmentReaderWithOrdinal<'a>);
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.new_doc_id_to_old_and_segment.into_iter()
}
}
/// Struct to provide mapping from old doc_id to new doc_id and vice versa within a segment.
pub struct DocIdMapping {
new_doc_id_to_old: Vec<DocId>,
old_doc_id_to_new: Vec<DocId>,

View File

@@ -9,6 +9,7 @@ use crate::fastfield::MultiValuedFastFieldReader;
use crate::fieldnorm::FieldNormsSerializer;
use crate::fieldnorm::FieldNormsWriter;
use crate::fieldnorm::{FieldNormReader, FieldNormReaders};
use crate::indexer::doc_id_mapping::SegmentDocidMapping;
use crate::indexer::SegmentSerializer;
use crate::postings::Postings;
use crate::postings::{InvertedIndexSerializer, SegmentPostings};
@@ -232,7 +233,7 @@ impl IndexMerger {
fn write_fieldnorms(
&self,
mut fieldnorms_serializer: FieldNormsSerializer,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<()> {
let fields = FieldNormsWriter::fields_with_fieldnorm(&self.schema);
let mut fieldnorms_data = Vec::with_capacity(self.max_doc as usize);
@@ -244,7 +245,7 @@ impl IndexMerger {
.iter()
.map(|reader| reader.get_fieldnorms_reader(field))
.collect::<Result<_, _>>()?;
for (doc_id, reader_with_ordinal) in doc_id_mapping {
for (doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
let fieldnorms_reader =
&fieldnorms_readers[reader_with_ordinal.ordinal as usize];
let fieldnorm_id = fieldnorms_reader.fieldnorm_id(*doc_id);
@@ -269,7 +270,7 @@ impl IndexMerger {
&self,
fast_field_serializer: &mut CompositeFastFieldSerializer,
mut term_ord_mappings: HashMap<Field, TermOrdinalMapping>,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<()> {
for (field, field_entry) in self.schema.fields() {
let field_type = field_entry.field_type();
@@ -318,7 +319,7 @@ impl IndexMerger {
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<()> {
let (min_value, max_value) = self.readers.iter().map(|reader|{
let u64_reader: DynamicFastFieldReader<u64> = reader
@@ -351,7 +352,7 @@ impl IndexMerger {
};
#[derive(Clone)]
struct SortedDocidFieldAccessProvider<'a> {
doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
doc_id_mapping: &'a SegmentDocidMapping<'a>,
fast_field_readers: &'a Vec<DynamicFastFieldReader<u64>>,
}
impl<'a> FastFieldDataAccess for SortedDocidFieldAccessProvider<'a> {
@@ -364,7 +365,11 @@ impl IndexMerger {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
};
let iter = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
let iter1 = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize];
fast_field_reader.get(*doc_id)
});
let iter2 = doc_id_mapping.iter().map(|(doc_id, reader_with_ordinal)| {
let fast_field_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize];
fast_field_reader.get(*doc_id)
});
@@ -372,92 +377,12 @@ impl IndexMerger {
field,
stats,
fastfield_accessor,
iter.clone(),
iter,
iter1,
iter2,
)?;
Ok(())
} else {
let num_vals = self
.readers
.iter()
.map(|reader| reader.num_docs() as u64)
.sum();
let stats = FastFieldStats {
min_value,
max_value,
num_vals,
};
#[derive(Clone)]
struct DocidFieldAccessProvider<'a> {
segment_and_field_readers: Vec<(&'a SegmentReader, DynamicFastFieldReader<u64>)>,
}
impl<'a> FastFieldDataAccess for DocidFieldAccessProvider<'a> {
fn get_val(&self, doc: u64) -> u64 {
// Find the reader which will contain the doc_id.
let mut num_docs_so_far = 0;
let reader_ordinal = self
.segment_and_field_readers
.iter()
.position(|(segment_reader, _)| {
num_docs_so_far += segment_reader.num_docs() as u64;
num_docs_so_far > doc
})
.unwrap();
let (segment_reader, reader) =
&self.segment_and_field_readers[reader_ordinal as usize];
let pos_in_reader = doc - (num_docs_so_far - segment_reader.num_docs() as u64);
let docid = segment_reader
.doc_ids_alive()
.nth(pos_in_reader as usize)
.expect(&format!(
"unexpected error, could not find doc id in alive list docid {}, number of docids in segment {} ",
pos_in_reader,
segment_reader.doc_ids_alive().count()
));
reader.get(docid)
}
}
let segment_and_field_readers = self.readers.iter()
.map(|reader|{
let u64_reader: DynamicFastFieldReader<u64> = reader
.fast_fields()
.typed_fast_field_reader(field)
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
(reader, u64_reader)
}).collect::<Vec<_>>();
let iter1 = segment_and_field_readers
.iter()
.map(|(reader, u64_reader)| {
reader
.doc_ids_alive()
.map(move |doc_id| u64_reader.get(doc_id))
})
.flatten();
let iter2 = segment_and_field_readers
.iter()
.map(|(reader, u64_reader)| {
reader
.doc_ids_alive()
.map(move |doc_id| u64_reader.get(doc_id))
})
.flatten();
let fastfield_accessor = DocidFieldAccessProvider {
segment_and_field_readers: segment_and_field_readers.clone(),
};
fast_field_serializer.create_auto_detect_u64_fast_field(
field,
stats,
fastfield_accessor,
iter1,
iter2,
)?;
Ok(())
}
}
@@ -523,7 +448,7 @@ impl IndexMerger {
pub(crate) fn generate_doc_id_mapping(
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<Vec<(DocId, SegmentReaderWithOrdinal)>> {
) -> crate::Result<SegmentDocidMapping> {
let reader_and_field_accessors = self.get_reader_with_sort_field_accessor(sort_by_field)?;
// Loading the field accessor on demand causes a 15x regression
@@ -559,7 +484,7 @@ impl IndexMerger {
})
.map(|(doc_id, reader_with_id, _)| (doc_id, reader_with_id))
.collect::<Vec<_>>();
Ok(sorted_doc_ids)
Ok(SegmentDocidMapping::new(sorted_doc_ids, false))
}
// Creating the index file to point into the data, generic over `BytesFastFieldReader` and
@@ -571,7 +496,7 @@ impl IndexMerger {
fn write_1_n_fast_field_idx_generic<T: MultiValueLength>(
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
reader_and_field_accessors: &[(&SegmentReader, T)],
) -> crate::Result<Vec<u64>> {
let mut total_num_vals = 0u64;
@@ -610,7 +535,7 @@ impl IndexMerger {
let mut offsets = vec![];
let mut offset = 0;
for (doc_id, reader) in doc_id_mapping {
for (doc_id, reader) in doc_id_mapping.iter() {
let reader = &reader_and_field_accessors[reader.ordinal as usize].1;
offsets.push(offset);
offset += reader.get_len(*doc_id) as u64;
@@ -650,7 +575,7 @@ impl IndexMerger {
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<Vec<u64>> {
let reader_and_field_accessors = self.readers.iter().map(|reader|{
let u64s_reader: MultiValuedFastFieldReader<u64> = reader.fast_fields()
@@ -672,7 +597,7 @@ impl IndexMerger {
field: Field,
term_ordinal_mappings: &TermOrdinalMapping,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<()> {
// Multifastfield consists in 2 fastfields.
// The first serves as an index into the second one and is stricly increasing.
@@ -700,7 +625,7 @@ impl IndexMerger {
fast_field_serializer.new_u64_fast_field_with_idx(field, 0u64, max_term_ord, 1)?;
let mut vals = Vec::with_capacity(100);
if let Some(doc_id_mapping) = doc_id_mapping {
for (old_doc_id, reader_with_ordinal) in doc_id_mapping {
for (old_doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
let term_ordinal_mapping: &[TermOrdinal] =
term_ordinal_mappings.get_segment(reader_with_ordinal.ordinal as usize);
@@ -731,12 +656,35 @@ impl IndexMerger {
Ok(())
}
/// Creates a mapping if the segments are stacked. this is helpful to merge codelines between index
/// sorting and the others
pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result<SegmentDocidMapping> {
let mapping: Vec<_> = self
.readers
.iter()
.enumerate()
.map(|(ordinal, reader)| {
let reader_with_ordinal = SegmentReaderWithOrdinal {
ordinal: ordinal as u32,
reader,
};
reader
.doc_ids_alive()
.map(move |doc_id| (doc_id, reader_with_ordinal))
})
.flatten()
.collect();
Ok(SegmentDocidMapping::new(mapping, true))
}
fn write_multi_fast_field(
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<()> {
//if doc_id_mapping.is_none() {
//doc_id_mapping = &Some(self.get_doc_id_from_concatenated_data()?);
//}
// Multifastfield consists in 2 fastfields.
// The first serves as an index into the second one and is stricly increasing.
// The second contains the actual values.
@@ -784,17 +732,6 @@ impl IndexMerger {
max_value = 0;
}
let fast_field_readers = self
.readers
.iter()
.map(|reader| {
let ff_reader : MultiValuedFastFieldReader<u64> = reader.fast_fields()
.typed_fast_field_multi_reader(field)
.expect("Failed to find index for multivalued field. This is a bug in tantivy, please report.");
ff_reader
})
.collect::<Vec<_>>();
// We can now initialize our serializer, and push it the different values
let stats = FastFieldStats {
max_value,
@@ -803,7 +740,7 @@ impl IndexMerger {
};
if let Some(doc_id_mapping) = doc_id_mapping {
struct SortedDocidMultiValueAccessProvider<'a> {
doc_id_mapping: &'a Vec<(DocId, SegmentReaderWithOrdinal<'a>)>,
doc_id_mapping: &'a SegmentDocidMapping<'a>,
fast_field_readers: &'a Vec<MultiValuedFastFieldReader<u64>>,
offsets: Vec<u64>,
}
@@ -835,13 +772,22 @@ impl IndexMerger {
}
let fastfield_accessor = SortedDocidMultiValueAccessProvider {
doc_id_mapping,
fast_field_readers: &fast_field_readers,
fast_field_readers: &ff_readers,
offsets,
};
let iter = doc_id_mapping
let iter1 = doc_id_mapping
.iter()
.map(|(doc_id, reader_with_ordinal)| {
let ff_reader = &fast_field_readers[reader_with_ordinal.ordinal as usize];
let ff_reader = &ff_readers[reader_with_ordinal.ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
})
.flatten();
let iter2 = doc_id_mapping
.iter()
.map(|(doc_id, reader_with_ordinal)| {
let ff_reader = &ff_readers[reader_with_ordinal.ordinal as usize];
let mut vals = vec![];
ff_reader.get_vals(*doc_id, &mut vals);
vals.into_iter()
@@ -851,11 +797,79 @@ impl IndexMerger {
field,
stats,
fastfield_accessor,
iter.clone(),
iter,
iter1,
iter2,
1,
)?;
} else {
//struct DocidMultiValueAccessProvider<'a> {
//segment_and_field_readers:
//Vec<(&'a SegmentReader, MultiValuedFastFieldReader<u64>)>,
//offsets: Vec<u64>,
//}
//impl<'a> FastFieldDataAccess for DocidMultiValueAccessProvider<'a> {
//fn get_val(&self, pos: u64) -> u64 {
//// use the offsets index to find the new doc_id which will contain the position.
//// the offsets are stricly increasing so we can do a simple search on it.
//let new_docid = self
//.offsets
//.iter()
//.position(|&offset| offset > pos)
//.unwrap()
//- 1;
//// now we need to find the position of `pos` in the multivalued bucket
//let num_pos_covered_until_now = self.offsets[new_docid];
//let pos_in_values = pos - num_pos_covered_until_now;
////now we need to find the segment that contains this doc_id.
//let mut num_docs_so_far = 0;
//let reader_ordinal = self
//.segment_and_field_readers
//.iter()
//.position(|(segment_reader, _)| {
//num_docs_so_far += segment_reader.num_docs() as u64;
//num_docs_so_far > doc
//})
//.unwrap();
//let (segment_reader, reader) =
//&self.segment_and_field_readers[reader_ordinal as usize];
//let pos_in_reader = doc - (num_docs_so_far - segment_reader.num_docs() as u64);
//let docid = segment_reader
//.doc_ids_alive()
//.nth(pos_in_reader as usize)
//.expect(&format!(
//"unexpected error, could not find doc id in alive list docid {}, number of docids in segment {} ",
//pos_in_reader,
//segment_reader.doc_ids_alive().count()
//));
//let (old_doc_id, reader_with_ordinal) = self.doc_id_mapping[new_docid as usize];
//let num_vals = self.fast_field_readers[reader_with_ordinal.ordinal as usize]
//.get_len(old_doc_id);
//assert!(num_vals >= pos_in_values);
//let mut vals = vec![];
//self.fast_field_readers[reader_with_ordinal.ordinal as usize]
//.get_vals(old_doc_id, &mut vals);
//vals[pos_in_values as usize]
//}
//}
//let iter = self
//.readers
//.iter()
//.zip(&ff_readers)
//.map(move |(reader, ff_reader)| {
//reader
//.doc_ids_alive()
//.map(move |doc_id| {
//let mut vals = vec![];
//ff_reader.get_vals(doc_id, &mut vals);
//vals.into_iter()
//})
//.flatten()
//});
let mut serialize_vals = fast_field_serializer
.new_u64_fast_field_with_idx(field, min_value, max_value, 1)?;
for (reader, ff_reader) in self.readers.iter().zip(ff_readers) {
@@ -876,7 +890,7 @@ impl IndexMerger {
&self,
field: Field,
fast_field_serializer: &mut CompositeFastFieldSerializer,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<()> {
let reader_and_field_accessors = self
.readers
@@ -896,7 +910,7 @@ impl IndexMerger {
)?;
let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1);
if let Some(doc_id_mapping) = doc_id_mapping {
for (doc_id, reader_with_ordinal) in doc_id_mapping {
for (doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
let bytes_reader =
&reader_and_field_accessors[reader_with_ordinal.ordinal as usize].1;
let val = bytes_reader.get_bytes(*doc_id);
@@ -923,7 +937,7 @@ impl IndexMerger {
field_type: &FieldType,
serializer: &mut InvertedIndexSerializer,
fieldnorm_reader: Option<FieldNormReader>,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<Option<TermOrdinalMapping>> {
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
@@ -1112,7 +1126,7 @@ impl IndexMerger {
&self,
serializer: &mut InvertedIndexSerializer,
fieldnorm_readers: FieldNormReaders,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<HashMap<Field, TermOrdinalMapping>> {
let mut term_ordinal_mappings = HashMap::new();
for (field, field_entry) in self.schema.fields() {
@@ -1135,7 +1149,7 @@ impl IndexMerger {
fn write_storable_fields(
&self,
store_writer: &mut StoreWriter,
doc_id_mapping: &Option<Vec<(DocId, SegmentReaderWithOrdinal)>>,
doc_id_mapping: &Option<SegmentDocidMapping>,
) -> crate::Result<()> {
let store_readers: Vec<_> = self
.readers
@@ -1148,7 +1162,7 @@ impl IndexMerger {
.map(|(i, store)| store.iter_raw(self.readers[i].delete_bitset()))
.collect();
if let Some(doc_id_mapping) = doc_id_mapping {
for (old_doc_id, reader_with_ordinal) in doc_id_mapping {
for (old_doc_id, reader_with_ordinal) in doc_id_mapping.iter() {
let doc_bytes_it = &mut document_iterators[reader_with_ordinal.ordinal as usize];
if let Some(doc_bytes_res) = doc_bytes_it.next() {
let doc_bytes = doc_bytes_res?;
@@ -1204,12 +1218,12 @@ impl IndexMerger {
// If the documents are already sorted and stackable, we ignore the mapping and execute
// it as if there was no sorting
if self.is_disjunct_and_sorted_on_sort_property(sort_by_field)? {
None
Some(self.get_doc_id_from_concatenated_data()?)
} else {
Some(self.generate_doc_id_mapping(sort_by_field)?)
}
} else {
None
Some(self.get_doc_id_from_concatenated_data()?)
};
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {