From 91e89714f433bc221df99e13b45617fee3b8b7a5 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 8 Feb 2019 14:42:52 +0900 Subject: [PATCH] Added soft commits --- src/indexer/index_writer.rs | 16 ++++-- src/indexer/prepared_commit.rs | 6 ++- src/indexer/segment_manager.rs | 40 +++++++++++++-- src/indexer/segment_register.rs | 12 +++-- src/indexer/segment_updater.rs | 86 ++++++++++++++++++--------------- 5 files changed, 110 insertions(+), 50 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 2d74b46fe..83fe75848 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -549,6 +549,16 @@ impl IndexWriter { /// using this API. /// See [`PreparedCommit::set_payload()`](PreparedCommit.html) pub fn prepare_commit(&mut self) -> Result { + info!("Preparing commit"); + self.prepare_commit_internal(false) + } + + pub(crate) fn prepare_commit_soft(&mut self) -> Result { + info!("Preparing soft commit"); + self.prepare_commit_internal(true) + } + + pub(crate) fn prepare_commit_internal(&mut self, soft: bool) -> Result { // Here, because we join all of the worker threads, // all of the segment update for this commit have been // sent. @@ -571,13 +581,13 @@ impl IndexWriter { let indexing_worker_result = worker_handle .join() .map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?; - indexing_worker_result?; - // add a new worker for the next generation. + // add a new worker for the next generation, whether the worker failed or not. self.add_indexing_worker()?; + indexing_worker_result?; } let commit_opstamp = self.stamper.stamp(); - let prepared_commit = PreparedCommit::new(self, commit_opstamp); + let prepared_commit = PreparedCommit::new(self, commit_opstamp, soft); info!("Prepared commit {}", commit_opstamp); Ok(prepared_commit) } diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index 4728af01a..c6a0bdd30 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -6,14 +6,16 @@ pub struct PreparedCommit<'a> { index_writer: &'a mut IndexWriter, payload: Option, opstamp: u64, + soft: bool } impl<'a> PreparedCommit<'a> { - pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit { + pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64, soft: bool) -> PreparedCommit { PreparedCommit { index_writer, payload: None, opstamp, + soft } } @@ -33,7 +35,7 @@ impl<'a> PreparedCommit<'a> { info!("committing {}", self.opstamp); self.index_writer .segment_updater() - .commit(self.opstamp, self.payload)?; + .commit(self.opstamp, self.payload, self.soft)?; Ok(self.opstamp) } } diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 4e3a7e7e4..1b84a0d0b 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -16,6 +16,14 @@ use Result as TantivyResult; struct SegmentRegisters { uncommitted: SegmentRegister, committed: SegmentRegister, + // soft commits can advance committed segment to a future delete + // opstamp. + // + // In that case the same `SegmentId` can appear in both `committed` + // and in `committed_in_the_future`. + // + // TODO: which one should be considered for merges? + committed_in_the_future: SegmentRegister } /// The segment manager stores the list of segments @@ -63,6 +71,7 @@ impl SegmentManager { registers: RwLock::new(SegmentRegisters { uncommitted: SegmentRegister::default(), committed: SegmentRegister::new(segment_metas, delete_cursor), + committed_in_the_future: SegmentRegister::default() }), } } @@ -121,9 +130,34 @@ impl SegmentManager { pub fn commit(&self, segment_entries: Vec) { let mut registers_lock = self.write(); registers_lock.committed.clear(); + registers_lock.committed_in_the_future.clear(); registers_lock.uncommitted.clear(); for segment_entry in segment_entries { - registers_lock.committed.add_segment_entry(segment_entry); + registers_lock.committed.register_segment_entry(segment_entry); + } + } + + pub fn soft_commit(&self, segment_entries: Vec) { + let mut registers_lock = self.write(); + for segment_entry in segment_entries { + let segment_id = segment_entry.segment_id(); + if let Some(committed_segment_entry) = registers_lock.committed.get(&segment_id) { + // this is a committed segment. + if committed_segment_entry.meta().delete_opstamp() == segment_entry.meta().delete_opstamp() { + // Actually, there was no change made to the segment...No need to do anything. + continue; + } + // Our `segment_entry` is a commited in which *future* deletes (as in, sent after the last + // commit) + // Let's append it to a dedicated register for that. + registers_lock.committed_in_the_future.register_segment_entry(segment_entry); + // TODO make sure we use `committed_in_the_future` segments, + // when we `commit`, to avoid replaying deletes several times. + + } else if let Some(uncommitted_segment) = registers_lock.uncommitted.get(&segment_id) { + // This will override our previous entry. + registers_lock.uncommitted.register_segment_entry(segment_entry); + } } } @@ -160,7 +194,7 @@ impl SegmentManager { pub fn add_segment(&self, segment_entry: SegmentEntry) { let mut registers_lock = self.write(); - registers_lock.uncommitted.add_segment_entry(segment_entry); + registers_lock.uncommitted.register_segment_entry(segment_entry); } pub fn end_merge( @@ -188,7 +222,7 @@ impl SegmentManager { for segment_id in before_merge_segment_ids { target_register.remove_segment(segment_id); } - target_register.add_segment_entry(after_merge_segment_entry); + target_register.register_segment_entry(after_merge_segment_entry); } pub fn committed_segment_metas(&self) -> Vec { diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 74234f2de..eb6ab7590 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -66,7 +66,11 @@ impl SegmentRegister { .all(|segment_id| self.segment_states.contains_key(segment_id)) } - pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) { + /// Registers a `SegmentEntry`. + /// + /// If a segment entry associated to this `SegmentId` is already there, + /// override it with the new `SegmentEntry`. + pub fn register_segment_entry(&mut self, segment_entry: SegmentEntry) { let segment_id = segment_entry.segment_id(); self.segment_states.insert(segment_id, segment_entry); } @@ -117,20 +121,20 @@ mod tests { { let segment_meta = SegmentMeta::new(segment_id_a, 0u32); let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); - segment_register.add_segment_entry(segment_entry); + segment_register.register_segment_entry(segment_entry); } assert_eq!(segment_ids(&segment_register), vec![segment_id_a]); { let segment_meta = SegmentMeta::new(segment_id_b, 0u32); let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); - segment_register.add_segment_entry(segment_entry); + segment_register.register_segment_entry(segment_entry); } segment_register.remove_segment(&segment_id_a); segment_register.remove_segment(&segment_id_b); { let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32); let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None); - segment_register.add_segment_entry(segment_entry); + segment_register.register_segment_entry(segment_entry); } assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]); } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 9f8e9577f..c0d65e375 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -220,10 +220,9 @@ impl SegmentUpdater { !self.0.killed.load(Ordering::Acquire) } - /// Apply deletes up to the target opstamp to all segments. + /// Apply deletes up to the target opstamp to all segments (committed and uncommitted). /// - /// Tne method returns copies of the segment entries, - /// updated with the delete information. + /// Tne method returns copies of the segment entries, updated with the delete information. fn purge_deletes(&self, target_opstamp: u64) -> Result> { let mut segment_entries = self.0.segment_manager.segment_entries(); for segment_entry in &mut segment_entries { @@ -234,35 +233,36 @@ impl SegmentUpdater { } pub fn save_metas(&self, opstamp: u64, commit_message: Option) { - if self.is_alive() { - let index = &self.0.index; - let directory = index.directory(); - let mut commited_segment_metas = self.0.segment_manager.committed_segment_metas(); - - // We sort segment_readers by number of documents. - // This is an heuristic to make multithreading more efficient. - // - // This is not done at the searcher level because I had a strange - // use case in which I was dealing with a large static index, - // dispatched over 5 SSD drives. - // - // A `UnionDirectory` makes it possible to read from these - // 5 different drives and creates a meta.json on the fly. - // In order to optimize the throughput, it creates a lasagna of segments - // from the different drives. - // - // Segment 1 from disk 1, Segment 1 from disk 2, etc. - commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32)); - let index_meta = IndexMeta { - segments: commited_segment_metas, - schema: index.schema(), - opstamp, - payload: commit_message, - }; - save_metas(&index_meta, directory.box_clone().borrow_mut()) - .expect("Could not save metas."); - self.store_meta(&index_meta); + if !self.is_alive() { + return; } + let index = &self.0.index; + let directory = index.directory(); + let mut commited_segment_metas = self.0.segment_manager.committed_segment_metas(); + + // We sort segment_readers by number of documents. + // This is an heuristic to make multithreading more efficient. + // + // This is not done at the searcher level because I had a strange + // use case in which I was dealing with a large static index, + // dispatched over 5 SSD drives. + // + // A `UnionDirectory` makes it possible to read from these + // 5 different drives and creates a meta.json on the fly. + // In order to optimize the throughput, it creates a lasagna of segments + // from the different drives. + // + // Segment 1 from disk 1, Segment 1 from disk 2, etc. + commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32)); + let index_meta = IndexMeta { + segments: commited_segment_metas, + schema: index.schema(), + opstamp, + payload: commit_message, + }; + save_metas(&index_meta, directory.box_clone().borrow_mut()) + .expect("Could not save metas."); + self.store_meta(&index_meta); } pub fn garbage_collect_files(&self) -> Result<()> { @@ -280,17 +280,27 @@ impl SegmentUpdater { .garbage_collect(|| self.0.segment_manager.list_files()); } - pub fn commit(&self, opstamp: u64, payload: Option) -> Result<()> { + pub fn commit(&self, opstamp: u64, payload: Option, soft: bool) -> Result<()> { self.run_async(move |segment_updater| { - if segment_updater.is_alive() { - let segment_entries = segment_updater - .purge_deletes(opstamp) - .expect("Failed purge deletes"); + let segment_entries = segment_updater + .purge_deletes(opstamp) + .expect("Failed purge deletes"); + if soft { + // Soft commit. + // + // The list `segment_entries` above is what we might want to use as searchable + // segment. However, we do not want to mark them as committed, and we want + // to keep the current set of committed segment. + segment_updater.0.segment_manager.soft_commit(segment_entries); + // ... obviously we do not save the meta file. + } else { + // Hard_commit. We register the new segment entries as committed. segment_updater.0.segment_manager.commit(segment_entries); segment_updater.save_metas(opstamp, payload); - segment_updater.garbage_collect_files_exec(); - segment_updater.consider_merge_options(); } + segment_updater.garbage_collect_files_exec(); + segment_updater.consider_merge_options(); + }) .wait() }