diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 68e2de077..c5103a604 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -559,7 +559,9 @@ impl IndexWriter { /// /// `segment_ids` is required to be non-empty. pub fn merge(&mut self, segment_ids: &[SegmentId]) -> FutureResult> { - let merge_operation = self.segment_updater.make_merge_operation(segment_ids); + let merge_operation = self + .segment_updater + .make_merge_operation(segment_ids, false); let segment_updater = self.segment_updater.clone(); segment_updater.start_merge(merge_operation) } @@ -573,8 +575,11 @@ impl IndexWriter { pub fn merge_foreground( &mut self, segment_ids: &[SegmentId], + ignore_store: bool, ) -> crate::Result> { - let merge_operation = self.segment_updater.make_merge_operation(segment_ids); + let merge_operation = self + .segment_updater + .make_merge_operation(segment_ids, ignore_store); self.segment_updater.merge_foreground(merge_operation) } diff --git a/src/indexer/merge_operation.rs b/src/indexer/merge_operation.rs index 073319002..acdda6056 100644 --- a/src/indexer/merge_operation.rs +++ b/src/indexer/merge_operation.rs @@ -47,6 +47,7 @@ pub struct MergeOperation { pub(crate) struct InnerMergeOperation { target_opstamp: Opstamp, segment_ids: Vec, + ignore_store: bool, } impl MergeOperation { @@ -54,10 +55,12 @@ impl MergeOperation { inventory: &MergeOperationInventory, target_opstamp: Opstamp, segment_ids: Vec, + ignore_store: bool, ) -> MergeOperation { let inner_merge_operation = InnerMergeOperation { target_opstamp, segment_ids, + ignore_store, }; MergeOperation { inner: inventory.track(inner_merge_operation), @@ -74,4 +77,9 @@ impl MergeOperation { pub fn segment_ids(&self) -> &[SegmentId] { &self.inner.segment_ids[..] } + + /// Returns true if the store should be ignored during merge. + pub fn ignore_store(&self) -> bool { + self.inner.ignore_store + } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 6ac48470e..b7188c316 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -83,6 +83,7 @@ pub struct IndexMerger { pub(crate) readers: Vec, max_doc: u32, cancel: Box, + ignore_store: bool, } struct DeltaComputer { @@ -152,9 +153,10 @@ impl IndexMerger { schema: Schema, segments: &[Segment], cancel: Box, + ignore_store: bool, ) -> crate::Result { let alive_bitset = segments.iter().map(|_| None).collect_vec(); - Self::open_with_custom_alive_set(schema, segments, alive_bitset, cancel) + Self::open_with_custom_alive_set(schema, segments, alive_bitset, cancel, ignore_store) } // Create merge with a custom delete set. @@ -174,6 +176,7 @@ impl IndexMerger { segments: &[Segment], alive_bitset_opt: Vec>, cancel: Box, + ignore_store: bool, ) -> crate::Result { let mut readers = vec![]; for (segment, new_alive_bitset_opt) in segments.iter().zip(alive_bitset_opt) { @@ -198,6 +201,7 @@ impl IndexMerger { readers, max_doc, cancel, + ignore_store, }) } @@ -578,7 +582,9 @@ impl IndexMerger { )?; debug!("write-storagefields"); - self.write_storable_fields(serializer.get_store_writer())?; + if !self.ignore_store { + self.write_storable_fields(serializer.get_store_writer())?; + } debug!("write-fastfields"); self.write_fast_fields(serializer.get_fast_field_write(), doc_id_mapping)?; @@ -826,7 +832,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer: IndexWriter = index.writer_for_tests()?; - index_writer.merge_foreground(&segment_ids)?; + index_writer.merge_foreground(&segment_ids, false)?; } { reader.reload()?; diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 267b50f2e..18e142108 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -129,6 +129,7 @@ fn merge( mut segment_entries: Vec, target_opstamp: Opstamp, cancel: Box, + ignore_store: bool, ) -> crate::Result> { let num_docs = segment_entries .iter() @@ -155,7 +156,7 @@ fn merge( .collect(); // An IndexMerger is like a "view" of our merged segments. - let merger = IndexMerger::open(index.schema(), &segments[..], cancel)?; + let merger = IndexMerger::open(index.schema(), &segments[..], cancel, ignore_store)?; // ... we just serialize this index merger in our new segment to merge the segments. let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?; @@ -272,6 +273,7 @@ pub fn merge_filtered_segments>>( segments, filter_doc_ids, cancel, + false, )?; let segment_serializer = SegmentSerializer::for_segment(merged_segment)?; let num_docs = merger.write(segment_serializer)?; @@ -543,9 +545,18 @@ impl SegmentUpdater { self.active_index_meta.read().unwrap().clone() } - pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation { + pub(crate) fn make_merge_operation( + &self, + segment_ids: &[SegmentId], + ignore_store: bool, + ) -> MergeOperation { let commit_opstamp = self.load_meta().opstamp; - MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec()) + MergeOperation::new( + &self.merge_operations, + commit_opstamp, + segment_ids.to_vec(), + ignore_store, + ) } // Starts a merge operation. This function will block until the merge operation is effectively @@ -605,6 +616,7 @@ impl SegmentUpdater { segment_entries, merge_operation.target_opstamp(), cancel, + false, ) { Ok(after_merge_segment_entry) => { let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry); @@ -651,6 +663,7 @@ impl SegmentUpdater { segment_entries, merge_operation.target_opstamp(), cancel, + merge_operation.ignore_store(), ) { Ok(after_merge_segment_entry) => { segment_updater.end_merge_foreground(merge_operation, after_merge_segment_entry) @@ -687,7 +700,12 @@ impl SegmentUpdater { .compute_merge_candidates(Some(self.index.directory()), &uncommitted_segments) .into_iter() .map(|merge_candidate| { - MergeOperation::new(&self.merge_operations, current_opstamp, merge_candidate.0) + MergeOperation::new( + &self.merge_operations, + current_opstamp, + merge_candidate.0, + false, + ) }) .collect(); @@ -696,7 +714,12 @@ impl SegmentUpdater { .compute_merge_candidates(Some(self.index.directory()), &committed_segments) .into_iter() .map(|merge_candidate: MergeCandidate| { - MergeOperation::new(&self.merge_operations, commit_opstamp, merge_candidate.0) + MergeOperation::new( + &self.merge_operations, + commit_opstamp, + merge_candidate.0, + false, + ) }); merge_candidates.extend(committed_merge_candidates); @@ -1280,6 +1303,7 @@ mod tests { &segments[..], filter_segments, Box::new(|| false), + false, )?; let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect(); @@ -1296,6 +1320,7 @@ mod tests { &segments[..], filter_segments, Box::new(|| false), + false, )?; let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();