From de92f094aa8d34cf6f7a1bb59dabdd10f579391a Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 30 Jun 2021 15:51:32 +0900
Subject: [PATCH] Closes #1101 fix delete documents with sort by field

Closes #1101

* fix delete documents with sort by field

Co-authored-by: Andre-Philippe Paquet
---
 CHANGELOG.md                        |   4 +
 src/core/mod.rs                     |   1 -
 src/core/segment.rs                 |  20 +--
 src/fastfield/bytes/writer.rs       |   8 +-
 src/fastfield/multivalued/writer.rs |   8 +-
 src/fastfield/writer.rs             |   2 +-
 src/fieldnorm/writer.rs             |   2 +-
 src/indexer/doc_id_mapping.rs       |   4 +-
 src/indexer/doc_opstamp_mapping.rs  |  75 +++------
 src/indexer/index_writer.rs         | 217 +++++++++++++++++++++++++---
 src/indexer/merger.rs               |  16 +-
 src/indexer/segment_updater.rs      |   5 +-
 src/indexer/segment_writer.rs       |  55 +++----
 13 files changed, 273 insertions(+), 144 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0084b8af..bcf8e43a2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+Tantivy 0.15.2
+=========================
+- Major bugfix. Deleting documents was broken when the index was sorted by a field. (@appaquet, @fulmicoton) #1101
+
 Tantivy 0.15.1
 =========================
 - Major bugfix. DocStore panics when first block is deleted. (@appaquet) #1077
diff --git a/src/core/mod.rs b/src/core/mod.rs
index aa3679ee4..77587ab59 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -16,7 +16,6 @@ pub use self::index_meta::{
 pub use self::inverted_index_reader::InvertedIndexReader;
 pub use self::searcher::Searcher;
 pub use self::segment::Segment;
-pub use self::segment::SerializableSegment;
 pub use self::segment_component::SegmentComponent;
 pub use self::segment_id::SegmentId;
 pub use self::segment_reader::SegmentReader;
diff --git a/src/core/segment.rs b/src/core/segment.rs
index a4c135dbb..10158b196 100644
--- a/src/core/segment.rs
+++ b/src/core/segment.rs
@@ -1,13 +1,12 @@
 use super::SegmentComponent;
+use crate::core::Index;
 use crate::core::SegmentId;
 use crate::core::SegmentMeta;
 use crate::directory::error::{OpenReadError, OpenWriteError};
 use crate::directory::Directory;
 use crate::directory::{FileSlice, WritePtr};
-use crate::indexer::segment_serializer::SegmentSerializer;
 use crate::schema::Schema;
 use crate::Opstamp;
-use crate::{core::Index, indexer::doc_id_mapping::DocIdMapping};
 use std::fmt;
 use std::path::PathBuf;
 
@@ -90,20 +89,3 @@ impl Segment {
         Ok(write)
     }
 }
-
-pub trait SerializableSegment {
-    /// Writes a view of a segment by pushing information
-    /// to the `SegmentSerializer`.
-    ///
-    /// # Returns
-    /// The number of documents in the segment.
-    ///
-    /// doc_id_map is used when index is created and sorted, to map to the new doc_id order.
-    /// It is not used by the `IndexMerger`, since the doc_id_mapping on cross-segments works
-    /// differently
-    fn write(
-        &self,
-        serializer: SegmentSerializer,
-        doc_id_map: Option<&DocIdMapping>,
-    ) -> crate::Result<u32>;
-}
diff --git a/src/fastfield/bytes/writer.rs b/src/fastfield/bytes/writer.rs
index 3a4de980f..3f79d44f2 100644
--- a/src/fastfield/bytes/writer.rs
+++ b/src/fastfield/bytes/writer.rs
@@ -83,11 +83,11 @@ impl BytesFastFieldWriter {
         &'a self,
         doc_id_map: Option<&'b DocIdMapping>,
     ) -> impl Iterator {
-        let doc_id_iter = if let Some(doc_id_map) = doc_id_map {
-            Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = u32>>
+        let doc_id_iter: Box<dyn Iterator<Item = u32>> = if let Some(doc_id_map) = doc_id_map {
+            Box::new(doc_id_map.iter_old_doc_ids())
         } else {
-            Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32))
-                as Box<dyn Iterator<Item = u32>>
+            let max_doc = self.doc_index.len() as u32;
+            Box::new(0..max_doc)
         };
         doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
     }
diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs
index 9259a4116..faeffddc7 100644
--- a/src/fastfield/multivalued/writer.rs
+++ b/src/fastfield/multivalued/writer.rs
@@ -102,11 +102,11 @@ impl MultiValuedFastFieldWriter {
         &'a self,
         doc_id_map: Option<&'b DocIdMapping>,
     ) -> impl Iterator {
-        let doc_id_iter = if let Some(doc_id_map) = doc_id_map {
-            Box::new(doc_id_map.iter_old_doc_ids().cloned()) as Box<dyn Iterator<Item = DocId>>
+        let doc_id_iter: Box<dyn Iterator<Item = DocId>> = if let Some(doc_id_map) = doc_id_map {
+            Box::new(doc_id_map.iter_old_doc_ids())
         } else {
-            Box::new(self.doc_index.iter().enumerate().map(|el| el.0 as u32))
-                as Box<dyn Iterator<Item = DocId>>
+            let max_doc = self.doc_index.len() as DocId;
+            Box::new(0..max_doc)
         };
         doc_id_iter.map(move |doc_id| self.get_values_for_doc_id(doc_id))
     }
diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs
index 85e6666a7..a3b3bdb79 100644
--- a/src/fastfield/writer.rs
+++ b/src/fastfield/writer.rs
@@ -296,7 +296,7 @@ impl IntFastFieldWriter {
         if let Some(doc_id_map) = doc_id_map {
             let iter = doc_id_map
                 .iter_old_doc_ids()
-                .map(|doc_id| self.vals.get(*doc_id as usize));
+                .map(|doc_id| self.vals.get(doc_id as usize));
             serializer.create_auto_detect_u64_fast_field(
                 self.field,
                 stats,
diff --git a/src/fieldnorm/writer.rs b/src/fieldnorm/writer.rs
index 9c764ad4c..5284c7021 100644
--- a/src/fieldnorm/writer.rs
+++ b/src/fieldnorm/writer.rs
@@ -98,7 +98,7 @@ impl FieldNormsWriter {
                 let mut mapped_fieldnorm_values = vec![];
                 mapped_fieldnorm_values.resize(fieldnorm_values.len(), 0u8);
                 for (new_doc_id, old_doc_id) in doc_id_map.iter_old_doc_ids().enumerate() {
-                    mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[*old_doc_id as usize];
+                    mapped_fieldnorm_values[new_doc_id] = fieldnorm_values[old_doc_id as usize];
                 }
                 fieldnorms_serializer.serialize_field(field, &mapped_fieldnorm_values)?;
             } else {
diff --git a/src/indexer/doc_id_mapping.rs b/src/indexer/doc_id_mapping.rs
index 65c3f03f6..9b1a3e6a4 100644
--- a/src/indexer/doc_id_mapping.rs
+++ b/src/indexer/doc_id_mapping.rs
@@ -24,8 +24,8 @@ impl DocIdMapping {
         self.new_doc_id_to_old[doc_id as usize]
     }
     /// iterate over old doc_ids in order of the new doc_ids
-    pub fn iter_old_doc_ids(&self) -> std::slice::Iter<'_, DocId> {
-        self.new_doc_id_to_old.iter()
+    pub fn iter_old_doc_ids(&self) -> impl Iterator<Item = DocId> + Clone + '_ {
+        self.new_doc_id_to_old.iter().cloned()
     }
 }
diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs
index d54ccb69d..d5a905cab 100644
--- a/src/indexer/doc_opstamp_mapping.rs
+++ b/src/indexer/doc_opstamp_mapping.rs
@@ -14,35 +14,27 @@ use crate::Opstamp;
 // The doc to opstamp mapping stores precisely an array
 // indexed by doc id and storing the opstamp of the document.
 //
-// This mapping is (for the moment) stricly increasing
-// because of the way document id are allocated.
+// This mapping is NOT necessarily increasing, because
+// we might be sorting documents according to a fast field.
 #[derive(Clone)]
 pub enum DocToOpstampMapping<'a> {
     WithMap(&'a [Opstamp]),
     None,
 }
 
-impl<'a> From<&'a [u64]> for DocToOpstampMapping<'a> {
-    fn from(opstamps: &[Opstamp]) -> DocToOpstampMapping {
-        DocToOpstampMapping::WithMap(opstamps)
-    }
-}
-
 impl<'a> DocToOpstampMapping<'a> {
-    /// Given an opstamp return the limit doc id L
-    /// such that all doc id D such that
-    // D >= L iff opstamp(D) >= than `target_opstamp`.
-    //
-    // The edge case opstamp = some doc opstamp is in practise
-    // never called.
-    pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
-        match *self {
-            DocToOpstampMapping::WithMap(ref doc_opstamps) => {
-                match doc_opstamps.binary_search(&target_opstamp) {
-                    Ok(doc_id) | Err(doc_id) => doc_id as DocId,
-                }
+    /// Assess whether a document should be considered deleted given that it contains
+    /// a deleted term that was deleted at the opstamp: `delete_opstamp`.
+    ///
+    /// This function returns true if the `DocToOpstamp` mapping is none or if
+    /// the `doc_opstamp` is anterior to the delete opstamp.
+    pub fn is_deleted(&self, doc_id: DocId, delete_opstamp: Opstamp) -> bool {
+        match self {
+            Self::WithMap(doc_opstamps) => {
+                let doc_opstamp = doc_opstamps[doc_id as usize];
+                doc_opstamp < delete_opstamp
             }
-            DocToOpstampMapping::None => DocId::max_value(),
+            Self::None => true,
         }
     }
 }
@@ -55,40 +47,17 @@ mod tests {
     #[test]
     fn test_doc_to_opstamp_mapping_none() {
         let doc_to_opstamp_mapping = DocToOpstampMapping::None;
-        assert_eq!(
-            doc_to_opstamp_mapping.compute_doc_limit(1),
-            u32::max_value()
-        );
+        assert!(doc_to_opstamp_mapping.is_deleted(1u32, 0u64));
+        assert!(doc_to_opstamp_mapping.is_deleted(1u32, 2u64));
     }
 
     #[test]
-    fn test_doc_to_opstamp_mapping_complex() {
-        {
-            let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[][..]);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 0);
-        }
-        {
-            let doc_to_opstamp_mapping = DocToOpstampMapping::from(&[1u64][..]);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(2u64), 1);
-        }
-        {
-            let doc_to_opstamp_mapping =
-                DocToOpstampMapping::from(&[1u64, 12u64, 17u64, 23u64][..]);
-            assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(0u64), 0);
-            for i in 2u64..13u64 {
-                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 1);
-            }
-            for i in 13u64..18u64 {
-                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 2);
-            }
-            for i in 18u64..24u64 {
-                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 3);
-            }
-            for i in 24u64..30u64 {
-                assert_eq!(doc_to_opstamp_mapping.compute_doc_limit(i), 4);
-            }
-        }
+    fn test_doc_to_opstamp_mapping_with_map() {
+        let doc_to_opstamp_mapping = DocToOpstampMapping::WithMap(&[5u64, 1u64, 0u64, 4u64, 3u64]);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(0u32, 2u64), false);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(1u32, 2u64), true);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(2u32, 2u64), true);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(3u32, 2u64), false);
+        assert_eq!(doc_to_opstamp_mapping.is_deleted(4u32, 2u64), false);
     }
 }
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 718f5203f..d11017904 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -106,22 +106,18 @@ fn compute_deleted_bitset(
         }
 
         // A delete operation should only affect
-        // document that were inserted after it.
-        //
-        // Limit doc helps identify the first document
-        // that may be affected by the delete operation.
-        let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
+        // document that were inserted before it.
         let inverted_index = segment_reader.inverted_index(delete_op.term.field())?;
         if let Some(mut docset) =
             inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)?
         {
-            let mut deleted_doc = docset.doc();
-            while deleted_doc != TERMINATED {
-                if deleted_doc < limit_doc {
-                    delete_bitset.insert(deleted_doc);
+            let mut doc_matching_deleted_term = docset.doc();
+            while doc_matching_deleted_term != TERMINATED {
+                if doc_opstamps.is_deleted(doc_matching_deleted_term, delete_op.opstamp) {
+                    delete_bitset.insert(doc_matching_deleted_term);
                     might_have_changed = true;
                 }
-                deleted_doc = docset.advance();
+                doc_matching_deleted_term = docset.advance();
             }
         }
         delete_cursor.advance();
@@ -230,14 +226,8 @@ fn index_documents(
 
     let segment_with_max_doc = segment.with_max_doc(max_doc);
 
-    let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
-
-    let delete_bitset_opt = apply_deletes(
-        &segment_with_max_doc,
-        &mut delete_cursor,
-        &doc_opstamps,
-        last_docstamp,
-    )?;
+    let delete_bitset_opt =
+        apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
 
     let meta = segment_with_max_doc.meta().clone();
     meta.untrack_temp_docstore();
@@ -247,19 +237,26 @@ fn index_documents(
     Ok(true)
 }
 
+/// `doc_opstamps` is required to be non-empty.
 fn apply_deletes(
     segment: &Segment,
     mut delete_cursor: &mut DeleteCursor,
     doc_opstamps: &[Opstamp],
-    last_docstamp: Opstamp,
 ) -> crate::Result<Option<BitSet>> {
     if delete_cursor.get().is_none() {
         // if there are no delete operation in the queue, no need
        // to even open the segment.
         return Ok(None);
     }
+
+    let max_doc_opstamp: Opstamp = doc_opstamps
+        .iter()
+        .cloned()
+        .max()
+        .expect("Empty DocOpstamp is forbidden");
+
     let segment_reader = SegmentReader::open(segment)?;
-    let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
+    let doc_to_opstamps = DocToOpstampMapping::WithMap(doc_opstamps);
     let max_doc = segment.meta().max_doc();
     let mut deleted_bitset = BitSet::with_max_value(max_doc);
@@ -268,7 +265,7 @@ fn apply_deletes(
         &segment_reader,
         &mut delete_cursor,
         &doc_to_opstamps,
-        last_docstamp,
+        max_doc_opstamp,
     )?;
     Ok(if may_have_deletes {
         Some(deleted_bitset)
@@ -784,17 +781,22 @@ impl Drop for IndexWriter {
 
 #[cfg(test)]
 mod tests {
+    use proptest::prelude::*;
+    use proptest::prop_oneof;
+    use proptest::strategy::Strategy;
     use super::super::operation::UserOperation;
     use crate::collector::TopDocs;
     use crate::directory::error::LockError;
     use crate::error::*;
+    use crate::fastfield::FastFieldReader;
     use crate::indexer::NoMergePolicy;
     use crate::query::TermQuery;
-    use crate::schema::{self, IndexRecordOption, STRING};
+    use crate::schema::{self, IndexRecordOption, FAST, INDEXED, STRING};
     use crate::Index;
     use crate::ReloadPolicy;
     use crate::Term;
+    use crate::{IndexSettings, IndexSortByField, Order};
 
     #[test]
     fn test_operations_group() {
@@ -1282,6 +1284,177 @@ mod tests {
         assert!(commit_again.is_ok());
     }
 
+    #[test]
+    fn test_delete_with_sort_by_field() -> crate::Result<()> {
+        let mut schema_builder = schema::Schema::builder();
+        let id_field =
+            schema_builder.add_u64_field("id", schema::INDEXED | schema::STORED | schema::FAST);
+        let schema = schema_builder.build();
+
+        let settings = IndexSettings {
+            sort_by_field: Some(IndexSortByField {
+                field: "id".to_string(),
+                order: Order::Desc,
+            }),
+            ..Default::default()
+        };
+
+        let index = Index::builder()
+            .schema(schema)
+            .settings(settings)
+            .create_in_ram()?;
+        let index_reader = index.reader()?;
+        let mut index_writer = index.writer_for_tests()?;
+
+        // create and delete docs in same commit
+        for id in 0u64..5u64 {
+            index_writer.add_document(doc!(id_field => id));
+        }
+        for id in 2u64..4u64 {
+            index_writer.delete_term(Term::from_field_u64(id_field, id));
+        }
+        for id in 5u64..10u64 {
+            index_writer.add_document(doc!(id_field => id));
+        }
+        index_writer.commit()?;
+        index_reader.reload()?;
+
+        let searcher = index_reader.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+
+        let segment_reader = searcher.segment_reader(0);
+        assert_eq!(segment_reader.num_docs(), 8);
+        assert_eq!(segment_reader.max_doc(), 10);
+        let fast_field_reader = segment_reader.fast_fields().u64(id_field)?;
+        let in_order_alive_ids: Vec<u64> = segment_reader
+            .doc_ids_alive()
+            .map(|doc| fast_field_reader.get(doc))
+            .collect();
+        assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 1, 0]);
+        Ok(())
+    }
+
+    #[derive(Debug, Clone, Copy)]
+    enum IndexingOp {
+        AddDoc { id: u64 },
+        DeleteDoc { id: u64 },
+    }
+
+    fn operation_strategy() -> impl Strategy<Value = IndexingOp> {
+        prop_oneof![
+            (0u64..10u64).prop_map(|id| IndexingOp::DeleteDoc { id }),
+            (0u64..10u64).prop_map(|id| IndexingOp::AddDoc { id }),
+        ]
+    }
+
+    fn expected_ids(ops: &[IndexingOp]) -> Vec<u64> {
+        let mut ids = Vec::new();
+        for &op in ops {
+            match op {
+                IndexingOp::AddDoc { id } => {
+                    ids.push(id);
+                }
+                IndexingOp::DeleteDoc { id } => {
+                    ids.retain(|&id_val| id_val != id);
+                }
+            }
+        }
+        ids.sort();
+        ids
+    }
+
+    fn test_operation_strategy(ops: &[IndexingOp]) -> crate::Result<()> {
+        let mut schema_builder = schema::Schema::builder();
schema_builder.add_u64_field("id", FAST | INDEXED); + let schema = schema_builder.build(); + let settings = IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "id".to_string(), + order: Order::Asc, + }), + ..Default::default() + }; + let index = Index::builder() + .schema(schema) + .settings(settings) + .create_in_ram()?; + let mut index_writer = index.writer_for_tests()?; + for &op in ops { + match op { + IndexingOp::AddDoc { id } => { + index_writer.add_document(doc!(id_field=>id)); + } + IndexingOp::DeleteDoc { id } => { + index_writer.delete_term(Term::from_field_u64(id_field, id)); + } + } + } + index_writer.commit()?; + let searcher = index.reader()?.searcher(); + let ids: Vec = searcher + .segment_readers() + .iter() + .flat_map(|segment_reader| { + let ff_reader = segment_reader.fast_fields().u64(id_field).unwrap(); + segment_reader + .doc_ids_alive() + .map(move |doc| ff_reader.get(doc)) + }) + .collect(); + + assert_eq!(ids, expected_ids(ops)); + Ok(()) + } + + proptest! { + #[test] + fn test_delete_with_sort_proptest(ops in proptest::collection::vec(operation_strategy(), 1..10)) { + assert!(test_operation_strategy(&ops[..]).is_ok()); + } + } + + #[test] + fn test_delete_with_sort_by_field_last_opstamp_is_not_max() -> crate::Result<()> { + let mut schema_builder = schema::Schema::builder(); + let sort_by_field = schema_builder.add_u64_field("sort_by", FAST); + let id_field = schema_builder.add_u64_field("id", INDEXED); + let schema = schema_builder.build(); + + let settings = IndexSettings { + sort_by_field: Some(IndexSortByField { + field: "sort_by".to_string(), + order: Order::Asc, + }), + ..Default::default() + }; + + let index = Index::builder() + .schema(schema) + .settings(settings) + .create_in_ram()?; + let mut index_writer = index.writer_for_tests()?; + + // We add a doc... + index_writer.add_document(doc!(sort_by_field => 2u64, id_field => 0u64)); + // And remove it. + index_writer.delete_term(Term::from_field_u64(id_field, 0u64)); + // We add another doc. + index_writer.add_document(doc!(sort_by_field=>1u64, id_field => 0u64)); + + // The expected result is a segment with + // maxdoc = 2 + // numdoc = 1. 
+        index_writer.commit()?;
+
+        let searcher = index.reader()?.searcher();
+        assert_eq!(searcher.segment_readers().len(), 1);
+
+        let segment_reader = searcher.segment_reader(0);
+        assert_eq!(segment_reader.max_doc(), 2);
+        assert_eq!(segment_reader.num_deleted_docs(), 1);
+        Ok(())
+    }
+
     #[test]
     fn test_index_doc_missing_field() {
         let mut schema_builder = schema::Schema::builder();
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index e6b683f5f..5810a6e55 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1,4 +1,3 @@
-use super::doc_id_mapping::DocIdMapping;
 use crate::error::DataCorruption;
 use crate::fastfield::CompositeFastFieldSerializer;
 use crate::fastfield::DeleteBitSet;
@@ -19,11 +18,11 @@ use crate::schema::{Field, Schema};
 use crate::store::StoreWriter;
 use crate::termdict::TermMerger;
 use crate::termdict::TermOrdinal;
+use crate::IndexSortByField;
 use crate::{common::HasLen, fastfield::MultiValueLength};
 use crate::{common::MAX_DOC_LIMIT, IndexSettings};
 use crate::{core::Segment, indexer::doc_id_mapping::expect_field_id_for_sort_field};
 use crate::{core::SegmentReader, Order};
-use crate::{core::SerializableSegment, IndexSortByField};
 use crate::{
     docset::{DocSet, TERMINATED},
     SegmentOrdinal,
@@ -1084,14 +1083,13 @@ impl IndexMerger {
         }
         Ok(())
     }
-}
 
-impl SerializableSegment for IndexMerger {
-    fn write(
-        &self,
-        mut serializer: SegmentSerializer,
-        _: Option<&DocIdMapping>,
-    ) -> crate::Result<u32> {
+    /// Writes the merged segment by pushing information
+    /// to the `SegmentSerializer`.
+    ///
+    /// # Returns
+    /// The number of documents in the resulting segment.
+    pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
         let doc_id_mapping = if let Some(sort_by_field) =
             self.index_settings.sort_by_field.as_ref()
         {
             // If the documents are already sorted and stackable, we ignore the mapping and execute
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 0be2f93de..c4de031da 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -5,7 +5,6 @@ use crate::core::IndexSettings;
 use crate::core::Segment;
 use crate::core::SegmentId;
 use crate::core::SegmentMeta;
-use crate::core::SerializableSegment;
 use crate::core::META_FILEPATH;
 use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
 use crate::indexer::delete_queue::DeleteCursor;
@@ -140,7 +139,7 @@ fn merge(
     // ... we just serialize this index merger in our new segment to merge the segments.
     let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone(), true)?;
-    let num_docs = merger.write(segment_serializer, None)?;
+    let num_docs = merger.write(segment_serializer)?;
 
     let merged_segment_id = merged_segment.id();
 
@@ -209,7 +208,7 @@ pub fn merge_segments(
         &segments[..],
     )?;
     let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
-    let num_docs = merger.write(segment_serializer, None)?;
+    let num_docs = merger.write(segment_serializer)?;
 
     let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
 
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index ac737340e..c4d58d59c 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -12,12 +12,12 @@ use crate::schema::Schema;
 use crate::schema::Term;
 use crate::schema::Value;
 use crate::schema::{Field, FieldEntry};
+use crate::store::StoreReader;
 use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
 use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
 use crate::tokenizer::{TokenStreamChain, Tokenizer};
 use crate::Opstamp;
 use crate::{core::Segment, store::StoreWriter};
-use crate::{core::SerializableSegment, store::StoreReader};
 use crate::{DocId, SegmentComponent};
 
@@ -36,6 +36,20 @@ fn initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
     }
 }
 
+fn remap_doc_opstamps(
+    opstamps: Vec<Opstamp>,
+    doc_id_mapping_opt: Option<&DocIdMapping>,
+) -> Vec<Opstamp> {
+    if let Some(doc_id_mapping_opt) = doc_id_mapping_opt {
+        doc_id_mapping_opt
+            .iter_old_doc_ids()
+            .map(|doc| opstamps[doc as usize])
+            .collect()
+    } else {
+        opstamps
+    }
+}
+
 /// A `SegmentWriter` is in charge of creating segment index from a
 /// set of documents.
 ///
@@ -112,14 +126,15 @@ impl SegmentWriter {
             .clone()
             .map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
             .transpose()?;
-        write(
+        remap_and_write(
             &self.multifield_postings,
             &self.fast_field_writers,
             &self.fieldnorms_writer,
             self.segment_serializer,
             mapping.as_ref(),
         )?;
-        Ok(self.doc_opstamps)
+        let doc_opstamps = remap_doc_opstamps(self.doc_opstamps, mapping.as_ref());
+        Ok(doc_opstamps)
     }
 
     pub fn mem_usage(&self) -> usize {
@@ -315,8 +330,12 @@ impl SegmentWriter {
     }
 }
 
-// This method is used as a trick to workaround the borrow checker
-fn write(
+/// This method is used as a trick to workaround the borrow checker
+/// Writes a view of a segment by pushing information
+/// to the `SegmentSerializer`.
+///
+/// `doc_id_map` is used to map to the new doc_id order.
+fn remap_and_write(
     multifield_postings: &MultiFieldPostingsWriter,
     fast_field_writers: &FastFieldsWriter,
     fieldnorms_writer: &FieldNormsWriter,
@@ -340,6 +359,7 @@ fn write(
         &term_ord_map,
         doc_id_map,
     )?;
+
     // finalize temp docstore and create version, which reflects the doc_id_map
     if let Some(doc_id_map) = doc_id_map {
         let store_write = serializer
@@ -356,31 +376,16 @@ fn write(
                 .segment()
                 .open_read(SegmentComponent::TempStore)?,
         )?;
+
         for old_doc_id in doc_id_map.iter_old_doc_ids() {
-            let doc_bytes = store_read.get_document_bytes(*old_doc_id)?;
+            let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
             serializer.get_store_writer().store_bytes(&doc_bytes)?;
         }
     }
-    serializer.close()?;
-    Ok(())
-}
-
-impl SerializableSegment for SegmentWriter {
-    fn write(
-        &self,
-        serializer: SegmentSerializer,
-        doc_id_map: Option<&DocIdMapping>,
-    ) -> crate::Result<u32> {
-        let max_doc = self.max_doc;
-        write(
-            &self.multifield_postings,
-            &self.fast_field_writers,
-            &self.fieldnorms_writer,
-            serializer,
-            doc_id_map,
-        )?;
-        Ok(max_doc)
-    }
+    serializer.close()?;
+
+    Ok(())
 }
 
 #[cfg(test)]