From 8a488b8315bdcf31033620f1c8e56c0669b063a4 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 3 Jan 2020 11:04:30 +0900 Subject: [PATCH] Added soft commits --- CHANGELOG.md | 4 ++++ src/indexer/index_writer.rs | 35 +++++++++++++++++++++++++++++----- src/indexer/prepared_commit.rs | 18 +++++++++++------ src/indexer/segment_updater.rs | 12 ++++++++---- src/reader/meta_file_reader.rs | 2 +- 5 files changed, 55 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 90a9e4edf..1b6e95a92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +Tantivy 0.12.1 +===================== +- By default, `IndexReader`s are in `Manual` mode. + Tantivy 0.12.0 ====================== - Removing static dispatch in tokenizers for simplicity. (#762) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index d53bb9ad3..6a40062a2 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -643,7 +643,7 @@ impl IndexWriter { /// It is also possible to add a payload to the `commit` /// using this API. /// See [`PreparedCommit::set_payload()`](PreparedCommit.html) - pub fn prepare_commit(&mut self) -> crate::Result { + pub fn prepare_commit(&mut self, soft_commit: bool) -> crate::Result { // Here, because we join all of the worker threads, // all of the segment update for this commit have been // sent. @@ -671,7 +671,7 @@ impl IndexWriter { } let commit_opstamp = self.stamper.stamp(); - let prepared_commit = PreparedCommit::new(self, commit_opstamp); + let prepared_commit = PreparedCommit::new(self, commit_opstamp, soft_commit); info!("Prepared commit {}", commit_opstamp); Ok(prepared_commit) } @@ -691,7 +691,11 @@ impl IndexWriter { /// that made it in the commit. 
/// pub fn commit(&mut self) -> crate::Result { - self.prepare_commit()?.commit() + self.prepare_commit(false)?.commit() + } + + pub fn soft_commit(&mut self) -> crate::Result { + self.prepare_commit(true)?.commit() } pub(crate) fn segment_updater(&self) -> &SegmentUpdater { @@ -1048,7 +1052,8 @@ mod tests { index_writer.add_document(doc!(text_field => "a")); } { - let mut prepared_commit = index_writer.prepare_commit().expect("commit failed"); + let mut prepared_commit = + index_writer.prepare_commit(false).expect("commit failed"); prepared_commit.set_payload("first commit"); prepared_commit.commit().expect("commit failed"); } @@ -1081,7 +1086,8 @@ mod tests { index_writer.add_document(doc!(text_field => "a")); } { - let mut prepared_commit = index_writer.prepare_commit().expect("commit failed"); + let mut prepared_commit = + index_writer.prepare_commit(false).expect("commit failed"); prepared_commit.set_payload("first commit"); prepared_commit.abort().expect("commit failed"); } @@ -1276,4 +1282,23 @@ mod tests { assert_eq!(reader.searcher().num_docs(), 2u64); assert_eq!(searcher.num_docs(), 1u64); } + + #[test] + fn test_index_writer_reader_soft_commit() { + let mut schema_builder = schema::Schema::builder(); + let idfield = schema_builder.add_text_field("id", STRING); + schema_builder.add_text_field("optfield", STRING); + let index = Index::create_in_ram(schema_builder.build()); + let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); + index_writer.add_document(doc!(idfield=>"myid")); + assert!(index_writer.soft_commit().is_ok()); + let nrt_reader = index_writer.reader(2).unwrap(); + let normal_reader = index.reader_builder().try_into().unwrap(); + assert_eq!(nrt_reader.searcher().num_docs(), 1u64); + assert_eq!(normal_reader.searcher().num_docs(), 0u64); + assert!(index_writer.commit().is_ok()); + assert!(normal_reader.reload().is_ok()); + assert_eq!(nrt_reader.searcher().num_docs(), 1u64); + 
assert_eq!(normal_reader.searcher().num_docs(), 1u64); + } } diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index a9e8f7bc6..3b17b5443 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -7,14 +7,20 @@ pub struct PreparedCommit<'a> { index_writer: &'a mut IndexWriter, payload: Option, opstamp: Opstamp, + soft_commit: bool, } impl<'a> PreparedCommit<'a> { - pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit<'_> { + pub(crate) fn new( + index_writer: &'a mut IndexWriter, + opstamp: Opstamp, + soft_commit: bool, + ) -> PreparedCommit { PreparedCommit { index_writer, payload: None, opstamp, + soft_commit, } } @@ -32,11 +38,11 @@ impl<'a> PreparedCommit<'a> { pub fn commit(self) -> crate::Result { info!("committing {}", self.opstamp); - block_on( - self.index_writer - .segment_updater() - .schedule_commit(self.opstamp, self.payload), - )?; + block_on(self.index_writer.segment_updater().schedule_commit( + self.opstamp, + self.payload, + self.soft_commit, + ))?; block_on(self.index_writer.trigger_commit()); Ok(self.opstamp) } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 119739dfa..f67b44165 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -330,17 +330,21 @@ impl SegmentUpdater { &self, opstamp: Opstamp, payload: Option, + soft_commit: bool, ) -> impl Future> { let segment_updater: SegmentUpdater = self.clone(); let directory = self.index.directory().clone(); self.schedule_future(async move { let mut segment_entries = segment_updater.purge_deletes(opstamp)?; - for segment_entry in &mut segment_entries { - let directory = directory.clone(); - segment_entry.persist(directory)?; + if !soft_commit { + for segment_entry in &mut segment_entries { + segment_entry.persist(directory.clone())?; + } } segment_updater.segment_manager.commit(segment_entries); - segment_updater.save_metas(opstamp, payload)?; + if 
!soft_commit { + segment_updater.save_metas(opstamp, payload)?; + } let _ = garbage_collect_files(segment_updater.clone()).await; segment_updater.consider_merge_options().await; Ok(()) diff --git a/src/reader/meta_file_reader.rs b/src/reader/meta_file_reader.rs index 4f30b6051..4595e0538 100644 --- a/src/reader/meta_file_reader.rs +++ b/src/reader/meta_file_reader.rs @@ -49,7 +49,7 @@ impl IndexReaderBuilder { pub(crate) fn new(index: Index) -> IndexReaderBuilder { IndexReaderBuilder { num_searchers: num_cpus::get(), - reload_policy: ReloadPolicy::OnCommit, + reload_policy: ReloadPolicy::Manual, index, } }