diff --git a/src/indexer/segment_entry.rs b/src/indexer/segment_entry.rs index 00308f5e1..b4a34e72b 100644 --- a/src/indexer/segment_entry.rs +++ b/src/indexer/segment_entry.rs @@ -40,6 +40,10 @@ impl SegmentEntry { } } + pub fn opstamp(&self) -> u64 { + self.opstamp + } + /// Return a reference to the segment entry deleted bitset. /// /// `DocId` in this bitset are flagged as deleted. diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 380124727..10d3a0c21 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -128,16 +128,14 @@ impl SegmentManager { }); } - pub fn commit(&self, segment_entries: Vec) { + pub fn commit(&self, opstamp: u64, segment_entries: Vec) { let mut registers_lock = self.write(); registers_lock.committed.clear(); registers_lock.committed_in_the_future.clear(); registers_lock.uncommitted.clear(); - for segment_entry in segment_entries { - registers_lock - .committed - .register_segment_entry(segment_entry); - } + registers_lock + .committed + .set_commit(opstamp, segment_entries); } pub fn soft_commit(&self, segment_entries: Vec) { diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 263e5e20e..c8fa2f4cf 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -17,6 +17,7 @@ use std::fmt::{self, Debug, Formatter}; #[derive(Default)] pub struct SegmentRegister { segment_states: HashMap, + opstamp_constraint: Option } impl Debug for SegmentRegister { @@ -71,10 +72,23 @@ impl SegmentRegister { /// If a segment entry associated to this `SegmentId` is already there, /// override it with the new `SegmentEntry`. pub fn register_segment_entry(&mut self, segment_entry: SegmentEntry) { + if let Some(expected_opstamp) = self.opstamp_constraint { + if expected_opstamp != segment_entry.opstamp() { + panic!(format!("Invalid segment. Expect opstamp {}, got {}.", expected_opstamp, segment_entry.opstamp())); + } + } let segment_id = segment_entry.segment_id(); self.segment_states.insert(segment_id, segment_entry); } + pub fn set_commit(&mut self, opstamp: u64, segment_entries: Vec) { + assert!(self.segment_states.is_empty()); + self.opstamp_constraint = Some(opstamp); + for segment_entry in segment_entries { + self.register_segment_entry(segment_entry); + } + } + pub fn remove_segment(&mut self, segment_id: &SegmentId) { self.segment_states.remove(segment_id); } @@ -90,7 +104,10 @@ impl SegmentRegister { let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None, opstamp); segment_states.insert(segment_id, segment_entry); } - SegmentRegister { segment_states } + SegmentRegister { + segment_states, + opstamp_constraint: Some(opstamp) + } } } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 453de0e6b..27e7f19bb 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -299,7 +299,7 @@ impl SegmentUpdater { // ... obviously we do not save the meta file. } else { // Hard_commit. We register the new segment entries as committed. - segment_updater.0.segment_manager.commit(segment_entries); + segment_updater.0.segment_manager.commit(opstamp, segment_entries); segment_updater.save_metas(opstamp, payload); } segment_updater.garbage_collect_files_exec();