mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-10 11:02:55 +00:00
opstamp constraint
This commit is contained in:
@@ -40,6 +40,10 @@ impl SegmentEntry {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn opstamp(&self) -> u64 {
|
||||
self.opstamp
|
||||
}
|
||||
|
||||
/// Return a reference to the segment entry deleted bitset.
|
||||
///
|
||||
/// `DocId` in this bitset are flagged as deleted.
|
||||
|
||||
@@ -128,16 +128,14 @@ impl SegmentManager {
|
||||
});
|
||||
}
|
||||
|
||||
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
|
||||
pub fn commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
|
||||
let mut registers_lock = self.write();
|
||||
registers_lock.committed.clear();
|
||||
registers_lock.committed_in_the_future.clear();
|
||||
registers_lock.uncommitted.clear();
|
||||
for segment_entry in segment_entries {
|
||||
registers_lock
|
||||
.committed
|
||||
.register_segment_entry(segment_entry);
|
||||
}
|
||||
registers_lock
|
||||
.committed
|
||||
.set_commit(opstamp, segment_entries);
|
||||
}
|
||||
|
||||
pub fn soft_commit(&self, segment_entries: Vec<SegmentEntry>) {
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::fmt::{self, Debug, Formatter};
|
||||
#[derive(Default)]
|
||||
pub struct SegmentRegister {
|
||||
segment_states: HashMap<SegmentId, SegmentEntry>,
|
||||
opstamp_constraint: Option<u64>
|
||||
}
|
||||
|
||||
impl Debug for SegmentRegister {
|
||||
@@ -71,10 +72,23 @@ impl SegmentRegister {
|
||||
/// If a segment entry associated to this `SegmentId` is already there,
|
||||
/// override it with the new `SegmentEntry`.
|
||||
pub fn register_segment_entry(&mut self, segment_entry: SegmentEntry) {
|
||||
if let Some(expected_opstamp) = self.opstamp_constraint {
|
||||
if expected_opstamp != segment_entry.opstamp() {
|
||||
panic!(format!("Invalid segment. Expect opstamp {}, got {}.", expected_opstamp, segment_entry.opstamp()));
|
||||
}
|
||||
}
|
||||
let segment_id = segment_entry.segment_id();
|
||||
self.segment_states.insert(segment_id, segment_entry);
|
||||
}
|
||||
|
||||
pub fn set_commit(&mut self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
|
||||
assert!(self.segment_states.is_empty());
|
||||
self.opstamp_constraint = Some(opstamp);
|
||||
for segment_entry in segment_entries {
|
||||
self.register_segment_entry(segment_entry);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove_segment(&mut self, segment_id: &SegmentId) {
|
||||
self.segment_states.remove(segment_id);
|
||||
}
|
||||
@@ -90,7 +104,10 @@ impl SegmentRegister {
|
||||
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None, opstamp);
|
||||
segment_states.insert(segment_id, segment_entry);
|
||||
}
|
||||
SegmentRegister { segment_states }
|
||||
SegmentRegister {
|
||||
segment_states,
|
||||
opstamp_constraint: Some(opstamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -299,7 +299,7 @@ impl SegmentUpdater {
|
||||
// ... obviously we do not save the meta file.
|
||||
} else {
|
||||
// Hard_commit. We register the new segment entries as committed.
|
||||
segment_updater.0.segment_manager.commit(segment_entries);
|
||||
segment_updater.0.segment_manager.commit(opstamp, segment_entries);
|
||||
segment_updater.save_metas(opstamp, payload);
|
||||
}
|
||||
segment_updater.garbage_collect_files_exec();
|
||||
|
||||
Reference in New Issue
Block a user