chore: allow merge_foreground to ignore the store (#40)

This commit is contained in:
Ming
2025-04-28 16:04:01 -04:00
committed by Stu Hood
parent b0660ba196
commit 0e1a7e213e
4 changed files with 54 additions and 10 deletions

View File

@@ -559,7 +559,9 @@ impl<D: Document> IndexWriter<D> {
///
/// `segment_ids` is required to be non-empty.
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> FutureResult<Option<SegmentMeta>> {
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
// This entry point always passes `false` for `ignore_store`; only
// `merge_foreground` lets the caller choose the flag.
let merge_operation = self
.segment_updater
.make_merge_operation(segment_ids, false);
// Clone the updater handle and hand the operation to `start_merge`;
// whatever `start_merge` returns is returned to the caller unchanged.
let segment_updater = self.segment_updater.clone();
segment_updater.start_merge(merge_operation)
}
@@ -573,8 +575,11 @@ impl<D: Document> IndexWriter<D> {
pub fn merge_foreground(
&mut self,
segment_ids: &[SegmentId],
// Forwarded verbatim into the merge operation built below. When `true`,
// the merger appears to skip writing the stored fields of the merged
// segment (NOTE(review): confirm against `IndexMerger::write`).
ignore_store: bool,
) -> crate::Result<Option<SegmentMeta>> {
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
let merge_operation = self
.segment_updater
.make_merge_operation(segment_ids, ignore_store);
// Delegate to the segment updater's `merge_foreground` and return its
// result as-is.
self.segment_updater.merge_foreground(merge_operation)
}

View File

@@ -47,6 +47,7 @@ pub struct MergeOperation {
pub(crate) struct InnerMergeOperation {
/// Opstamp recorded when the operation was created.
target_opstamp: Opstamp,
/// Ids of the segments covered by this merge operation.
segment_ids: Vec<SegmentId>,
/// `true` if the store should be ignored during the merge.
ignore_store: bool,
}
impl MergeOperation {
@@ -54,10 +55,12 @@ impl MergeOperation {
inventory: &MergeOperationInventory,
target_opstamp: Opstamp,
segment_ids: Vec<SegmentId>,
ignore_store: bool,
) -> MergeOperation {
let inner_merge_operation = InnerMergeOperation {
target_opstamp,
segment_ids,
ignore_store,
};
MergeOperation {
inner: inventory.track(inner_merge_operation),
@@ -74,4 +77,9 @@ impl MergeOperation {
pub fn segment_ids(&self) -> &[SegmentId] {
// Borrow the stored ids as a slice; nothing is copied.
&self.inner.segment_ids[..]
}
/// Returns `true` if the store should be ignored during merge.
///
/// This is the `ignore_store` value that was passed to `MergeOperation::new`,
/// returned unchanged.
pub fn ignore_store(&self) -> bool {
self.inner.ignore_store
}
}

View File

@@ -83,6 +83,7 @@ pub struct IndexMerger {
pub(crate) readers: Vec<SegmentReader>,
max_doc: u32,
cancel: Box<dyn CancelSentinel>,
ignore_store: bool,
}
struct DeltaComputer {
@@ -152,9 +153,10 @@ impl IndexMerger {
schema: Schema,
segments: &[Segment],
cancel: Box<dyn CancelSentinel>,
ignore_store: bool,
) -> crate::Result<IndexMerger> {
let alive_bitset = segments.iter().map(|_| None).collect_vec();
Self::open_with_custom_alive_set(schema, segments, alive_bitset, cancel)
Self::open_with_custom_alive_set(schema, segments, alive_bitset, cancel, ignore_store)
}
// Create merge with a custom delete set.
@@ -174,6 +176,7 @@ impl IndexMerger {
segments: &[Segment],
alive_bitset_opt: Vec<Option<AliveBitSet>>,
cancel: Box<dyn CancelSentinel>,
ignore_store: bool,
) -> crate::Result<IndexMerger> {
let mut readers = vec![];
for (segment, new_alive_bitset_opt) in segments.iter().zip(alive_bitset_opt) {
@@ -198,6 +201,7 @@ impl IndexMerger {
readers,
max_doc,
cancel,
ignore_store,
})
}
@@ -578,7 +582,9 @@ impl IndexMerger {
)?;
debug!("write-storagefields");
self.write_storable_fields(serializer.get_store_writer())?;
if !self.ignore_store {
self.write_storable_fields(serializer.get_store_writer())?;
}
debug!("write-fastfields");
self.write_fast_fields(serializer.get_fast_field_write(), doc_id_mapping)?;
@@ -826,7 +832,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.merge_foreground(&segment_ids)?;
index_writer.merge_foreground(&segment_ids, false)?;
}
{
reader.reload()?;

View File

@@ -129,6 +129,7 @@ fn merge(
mut segment_entries: Vec<SegmentEntry>,
target_opstamp: Opstamp,
cancel: Box<dyn CancelSentinel>,
ignore_store: bool,
) -> crate::Result<Option<SegmentEntry>> {
let num_docs = segment_entries
.iter()
@@ -155,7 +156,7 @@ fn merge(
.collect();
// An IndexMerger is like a "view" of our merged segments.
let merger = IndexMerger::open(index.schema(), &segments[..], cancel)?;
let merger = IndexMerger::open(index.schema(), &segments[..], cancel, ignore_store)?;
// ... we just serialize this index merger in our new segment to merge the segments.
let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?;
@@ -272,6 +273,7 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
segments,
filter_doc_ids,
cancel,
false,
)?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment)?;
let num_docs = merger.write(segment_serializer)?;
@@ -543,9 +545,18 @@ impl SegmentUpdater {
self.active_index_meta.read().unwrap().clone()
}
pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation {
pub(crate) fn make_merge_operation(
&self,
segment_ids: &[SegmentId],
// Stored on the resulting `MergeOperation` and readable through
// `MergeOperation::ignore_store`.
ignore_store: bool,
) -> MergeOperation {
// The operation is stamped with the opstamp of the meta returned by
// `load_meta()` at the time of the call.
let commit_opstamp = self.load_meta().opstamp;
MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec())
// `segment_ids` is copied into an owned `Vec`, so the operation does not
// borrow from the caller's slice.
MergeOperation::new(
&self.merge_operations,
commit_opstamp,
segment_ids.to_vec(),
ignore_store,
)
}
// Starts a merge operation. This function will block until the merge operation is effectively
@@ -605,6 +616,7 @@ impl SegmentUpdater {
segment_entries,
merge_operation.target_opstamp(),
cancel,
false,
) {
Ok(after_merge_segment_entry) => {
let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
@@ -651,6 +663,7 @@ impl SegmentUpdater {
segment_entries,
merge_operation.target_opstamp(),
cancel,
merge_operation.ignore_store(),
) {
Ok(after_merge_segment_entry) => {
segment_updater.end_merge_foreground(merge_operation, after_merge_segment_entry)
@@ -687,7 +700,12 @@ impl SegmentUpdater {
.compute_merge_candidates(Some(self.index.directory()), &uncommitted_segments)
.into_iter()
.map(|merge_candidate| {
MergeOperation::new(&self.merge_operations, current_opstamp, merge_candidate.0)
MergeOperation::new(
&self.merge_operations,
current_opstamp,
merge_candidate.0,
false,
)
})
.collect();
@@ -696,7 +714,12 @@ impl SegmentUpdater {
.compute_merge_candidates(Some(self.index.directory()), &committed_segments)
.into_iter()
.map(|merge_candidate: MergeCandidate| {
MergeOperation::new(&self.merge_operations, commit_opstamp, merge_candidate.0)
MergeOperation::new(
&self.merge_operations,
commit_opstamp,
merge_candidate.0,
false,
)
});
merge_candidates.extend(committed_merge_candidates);
@@ -1280,6 +1303,7 @@ mod tests {
&segments[..],
filter_segments,
Box::new(|| false),
false,
)?;
let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();
@@ -1296,6 +1320,7 @@ mod tests {
&segments[..],
filter_segments,
Box::new(|| false),
false,
)?;
let doc_ids_alive: Vec<_> = merger.readers[0].doc_ids_alive().collect();