Added soft commits

This commit is contained in:
Paul Masurel
2019-02-08 14:42:52 +09:00
parent 45e62d4329
commit 91e89714f4
5 changed files with 110 additions and 50 deletions

View File

@@ -549,6 +549,16 @@ impl IndexWriter {
/// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub fn prepare_commit(&mut self) -> Result<PreparedCommit> {
info!("Preparing commit");
self.prepare_commit_internal(false)
}
pub(crate) fn prepare_commit_soft(&mut self) -> Result<PreparedCommit> {
info!("Preparing soft commit");
self.prepare_commit_internal(true)
}
pub(crate) fn prepare_commit_internal(&mut self, soft: bool) -> Result<PreparedCommit> {
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
@@ -571,13 +581,13 @@ impl IndexWriter {
let indexing_worker_result = worker_handle
.join()
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
indexing_worker_result?;
// add a new worker for the next generation.
// add a new worker for the next generation, whether the worker failed or not.
self.add_indexing_worker()?;
indexing_worker_result?;
}
let commit_opstamp = self.stamper.stamp();
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
let prepared_commit = PreparedCommit::new(self, commit_opstamp, soft);
info!("Prepared commit {}", commit_opstamp);
Ok(prepared_commit)
}

View File

@@ -6,14 +6,16 @@ pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
opstamp: u64,
soft: bool
}
impl<'a> PreparedCommit<'a> {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64, soft: bool) -> PreparedCommit {
PreparedCommit {
index_writer,
payload: None,
opstamp,
soft
}
}
@@ -33,7 +35,7 @@ impl<'a> PreparedCommit<'a> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
.commit(self.opstamp, self.payload)?;
.commit(self.opstamp, self.payload, self.soft)?;
Ok(self.opstamp)
}
}

View File

@@ -16,6 +16,14 @@ use Result as TantivyResult;
struct SegmentRegisters {
uncommitted: SegmentRegister,
committed: SegmentRegister,
// soft commits can advance committed segment to a future delete
// opstamp.
//
// In that case the same `SegmentId` can appear in both `committed`
// and in `committed_in_the_future`.
//
// TODO: which one should be considered for merges?
committed_in_the_future: SegmentRegister
}
/// The segment manager stores the list of segments
@@ -63,6 +71,7 @@ impl SegmentManager {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas, delete_cursor),
committed_in_the_future: SegmentRegister::default()
}),
}
}
@@ -121,9 +130,34 @@ impl SegmentManager {
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
registers_lock.committed.clear();
registers_lock.committed_in_the_future.clear();
registers_lock.uncommitted.clear();
for segment_entry in segment_entries {
registers_lock.committed.add_segment_entry(segment_entry);
registers_lock.committed.register_segment_entry(segment_entry);
}
}
pub fn soft_commit(&self, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
for segment_entry in segment_entries {
let segment_id = segment_entry.segment_id();
if let Some(committed_segment_entry) = registers_lock.committed.get(&segment_id) {
// this is a committed segment.
if committed_segment_entry.meta().delete_opstamp() == segment_entry.meta().delete_opstamp() {
// Actually, there was no change made to the segment...No need to do anything.
continue;
}
// Our `segment_entry` is a commited in which *future* deletes (as in, sent after the last
// commit)
// Let's append it to a dedicated register for that.
registers_lock.committed_in_the_future.register_segment_entry(segment_entry);
// TODO make sure we use `committed_in_the_future` segments,
// when we `commit`, to avoid replaying deletes several times.
} else if let Some(uncommitted_segment) = registers_lock.uncommitted.get(&segment_id) {
// This will override our previous entry.
registers_lock.uncommitted.register_segment_entry(segment_entry);
}
}
}
@@ -160,7 +194,7 @@ impl SegmentManager {
pub fn add_segment(&self, segment_entry: SegmentEntry) {
let mut registers_lock = self.write();
registers_lock.uncommitted.add_segment_entry(segment_entry);
registers_lock.uncommitted.register_segment_entry(segment_entry);
}
pub fn end_merge(
@@ -188,7 +222,7 @@ impl SegmentManager {
for segment_id in before_merge_segment_ids {
target_register.remove_segment(segment_id);
}
target_register.add_segment_entry(after_merge_segment_entry);
target_register.register_segment_entry(after_merge_segment_entry);
}
pub fn committed_segment_metas(&self) -> Vec<SegmentMeta> {

View File

@@ -66,7 +66,11 @@ impl SegmentRegister {
.all(|segment_id| self.segment_states.contains_key(segment_id))
}
pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) {
/// Registers a `SegmentEntry`.
///
/// If a segment entry associated to this `SegmentId` is already there,
/// override it with the new `SegmentEntry`.
pub fn register_segment_entry(&mut self, segment_entry: SegmentEntry) {
let segment_id = segment_entry.segment_id();
self.segment_states.insert(segment_id, segment_entry);
}
@@ -117,20 +121,20 @@ mod tests {
{
let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
segment_register.register_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
{
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
segment_register.register_segment_entry(segment_entry);
}
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
segment_register.register_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]);
}

View File

@@ -220,10 +220,9 @@ impl SegmentUpdater {
!self.0.killed.load(Ordering::Acquire)
}
/// Apply deletes up to the target opstamp to all segments.
/// Apply deletes up to the target opstamp to all segments (committed and uncommitted).
///
/// Tne method returns copies of the segment entries,
/// updated with the delete information.
/// Tne method returns copies of the segment entries, updated with the delete information.
fn purge_deletes(&self, target_opstamp: u64) -> Result<Vec<SegmentEntry>> {
let mut segment_entries = self.0.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
@@ -234,35 +233,36 @@ impl SegmentUpdater {
}
pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
if self.is_alive() {
let index = &self.0.index;
let directory = index.directory();
let mut commited_segment_metas = self.0.segment_manager.committed_segment_metas();
// We sort segment_readers by number of documents.
// This is an heuristic to make multithreading more efficient.
//
// This is not done at the searcher level because I had a strange
// use case in which I was dealing with a large static index,
// dispatched over 5 SSD drives.
//
// A `UnionDirectory` makes it possible to read from these
// 5 different drives and creates a meta.json on the fly.
// In order to optimize the throughput, it creates a lasagna of segments
// from the different drives.
//
// Segment 1 from disk 1, Segment 1 from disk 2, etc.
commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
let index_meta = IndexMeta {
segments: commited_segment_metas,
schema: index.schema(),
opstamp,
payload: commit_message,
};
save_metas(&index_meta, directory.box_clone().borrow_mut())
.expect("Could not save metas.");
self.store_meta(&index_meta);
if !self.is_alive() {
return;
}
let index = &self.0.index;
let directory = index.directory();
let mut commited_segment_metas = self.0.segment_manager.committed_segment_metas();
// We sort segment_readers by number of documents.
// This is an heuristic to make multithreading more efficient.
//
// This is not done at the searcher level because I had a strange
// use case in which I was dealing with a large static index,
// dispatched over 5 SSD drives.
//
// A `UnionDirectory` makes it possible to read from these
// 5 different drives and creates a meta.json on the fly.
// In order to optimize the throughput, it creates a lasagna of segments
// from the different drives.
//
// Segment 1 from disk 1, Segment 1 from disk 2, etc.
commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
let index_meta = IndexMeta {
segments: commited_segment_metas,
schema: index.schema(),
opstamp,
payload: commit_message,
};
save_metas(&index_meta, directory.box_clone().borrow_mut())
.expect("Could not save metas.");
self.store_meta(&index_meta);
}
pub fn garbage_collect_files(&self) -> Result<()> {
@@ -280,17 +280,27 @@ impl SegmentUpdater {
.garbage_collect(|| self.0.segment_manager.list_files());
}
pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
pub fn commit(&self, opstamp: u64, payload: Option<String>, soft: bool) -> Result<()> {
self.run_async(move |segment_updater| {
if segment_updater.is_alive() {
let segment_entries = segment_updater
.purge_deletes(opstamp)
.expect("Failed purge deletes");
let segment_entries = segment_updater
.purge_deletes(opstamp)
.expect("Failed purge deletes");
if soft {
// Soft commit.
//
// The list `segment_entries` above is what we might want to use as searchable
// segment. However, we do not want to mark them as committed, and we want
// to keep the current set of committed segment.
segment_updater.0.segment_manager.soft_commit(segment_entries);
// ... obviously we do not save the meta file.
} else {
// Hard_commit. We register the new segment entries as committed.
segment_updater.0.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload);
segment_updater.garbage_collect_files_exec();
segment_updater.consider_merge_options();
}
segment_updater.garbage_collect_files_exec();
segment_updater.consider_merge_options();
})
.wait()
}