From 8802d125f84fbd6dc1c3ef632bfa76d2cfa647ba Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 12 Nov 2021 23:25:39 +0900 Subject: [PATCH] Prepare commit is public again (#1202) - Simplified some of the prepare commit & segment updater code using async. - Made PrepareCommit public again. --- src/indexer/index_writer.rs | 8 ++--- src/indexer/mod.rs | 2 +- src/indexer/prepared_commit.rs | 33 +++++++++++++------ src/indexer/segment_updater.rs | 60 ++++++++++++++++++---------------- src/lib.rs | 2 +- 5 files changed, 59 insertions(+), 46 deletions(-) diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 0a8127b60..3ee60c875 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -465,10 +465,8 @@ impl IndexWriter { } /// Detects and removes the files that are not used by the index anymore. - pub fn garbage_collect_files( - &self, - ) -> impl Future> { - self.segment_updater.schedule_garbage_collect() + pub async fn garbage_collect_files(&self) -> crate::Result { + self.segment_updater.schedule_garbage_collect().await } /// Deletes all documents from the index @@ -607,7 +605,7 @@ impl IndexWriter { /// It is also possible to add a payload to the `commit` /// using this API. /// See [`PreparedCommit::set_payload()`](PreparedCommit.html) - pub(crate) fn prepare_commit(&mut self) -> crate::Result { + pub fn prepare_commit(&mut self) -> crate::Result { // Here, because we join all of the worker threads, // all of the segment update for this commit have been // sent. 
diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index a29671fc2..d8bec5200 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -29,7 +29,7 @@ pub use self::index_writer::IndexWriter; pub use self::log_merge_policy::LogMergePolicy; pub use self::merge_operation::MergeOperation; pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy}; -pub(crate) use self::prepared_commit::PreparedCommit; +pub use self::prepared_commit::PreparedCommit; pub use self::segment_entry::SegmentEntry; pub use self::segment_manager::SegmentManager; pub use self::segment_serializer::SegmentSerializer; diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index 510a19950..4ad71178a 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -3,7 +3,7 @@ use crate::Opstamp; use futures::executor::block_on; /// A prepared commit -pub(crate) struct PreparedCommit<'a> { +pub struct PreparedCommit<'a> { index_writer: &'a mut IndexWriter, payload: Option, opstamp: Opstamp, @@ -18,25 +18,38 @@ impl<'a> PreparedCommit<'a> { } } - pub(crate) fn opstamp(&self) -> Opstamp { + /// Returns the opstamp associated with the prepared commit. + pub fn opstamp(&self) -> Opstamp { self.opstamp } - pub(crate) fn set_payload(&mut self, payload: &str) { + /// Adds an arbitrary payload to the commit. + pub fn set_payload(&mut self, payload: &str) { self.payload = Some(payload.to_string()) } - pub(crate) fn abort(self) -> crate::Result { + /// Rolls back any change. + pub fn abort(self) -> crate::Result { self.index_writer.rollback() } - pub(crate) fn commit(self) -> crate::Result { + /// Proceeds to commit. + /// See `.commit_async()`. + pub fn commit(self) -> crate::Result { + block_on(self.commit_async()) + } + + /// Proceeds to commit. + /// + /// Unfortunately, contrary to what `PreparedCommit` may suggest, + /// this operation is not at all lightweight. + /// At this point deletes have not been flushed yet. 
+ pub async fn commit_async(self) -> crate::Result { info!("committing {}", self.opstamp); - block_on( - self.index_writer - .segment_updater() - .schedule_commit(self.opstamp, self.payload), - )?; + self.index_writer + .segment_updater() + .schedule_commit(self.opstamp, self.payload) + .await?; Ok(self.opstamp) } } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 4ae24c53c..f7f7518db 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -351,37 +351,39 @@ impl SegmentUpdater { *self.merge_policy.write().unwrap() = arc_merge_policy; } - fn schedule_future> + 'static + Send>( + async fn schedule_task< + T: 'static + Send, + F: Future> + 'static + Send, + >( &self, - f: F, - ) -> impl Future> { - let (sender, receiver) = oneshot::channel(); - if self.is_alive() { - self.pool.spawn_ok(async move { - let _ = sender.send(f.await); - }); - } else { - let _ = sender.send(Err(crate::TantivyError::SystemError( + task: F, + ) -> crate::Result { + if !self.is_alive() { + return Err(crate::TantivyError::SystemError( "Segment updater killed".to_string(), - ))); + )); } - receiver.unwrap_or_else(|_| { + let (sender, receiver) = oneshot::channel(); + self.pool.spawn_ok(async move { + let task_result = task.await; + let _ = sender.send(task_result); + }); + let task_result = receiver.await; + task_result.unwrap_or_else(|_| { let err_msg = "A segment_updater future did not success. 
This should never happen.".to_string(); Err(crate::TantivyError::SystemError(err_msg)) }) } - pub fn schedule_add_segment( - &self, - segment_entry: SegmentEntry, - ) -> impl Future> { + pub async fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> crate::Result<()> { let segment_updater = self.clone(); - self.schedule_future(async move { + self.schedule_task(async move { segment_updater.segment_manager.add_segment(segment_entry); segment_updater.consider_merge_options().await; Ok(()) }) + .await } /// Orders `SegmentManager` to remove all segments @@ -448,11 +450,9 @@ impl SegmentUpdater { Ok(()) } - pub fn schedule_garbage_collect( - &self, - ) -> impl Future> { + pub async fn schedule_garbage_collect(&self) -> crate::Result { let garbage_collect_future = garbage_collect_files(self.clone()); - self.schedule_future(garbage_collect_future) + self.schedule_task(garbage_collect_future).await } /// List the files that are useful to the index. @@ -470,13 +470,13 @@ impl SegmentUpdater { files } - pub fn schedule_commit( + pub(crate) async fn schedule_commit( &self, opstamp: Opstamp, payload: Option, - ) -> impl Future> { + ) -> crate::Result<()> { let segment_updater: SegmentUpdater = self.clone(); - self.schedule_future(async move { + self.schedule_task(async move { let segment_entries = segment_updater.purge_deletes(opstamp)?; segment_updater.segment_manager.commit(segment_entries); segment_updater.save_metas(opstamp, payload)?; @@ -484,6 +484,7 @@ impl SegmentUpdater { segment_updater.consider_merge_options().await; Ok(()) }) + .await } fn store_meta(&self, index_meta: &IndexMeta) { @@ -611,14 +612,14 @@ impl SegmentUpdater { } } - fn end_merge( + async fn end_merge( &self, merge_operation: MergeOperation, mut after_merge_segment_entry: SegmentEntry, - ) -> impl Future> { + ) -> crate::Result { let segment_updater = self.clone(); let after_merge_segment_meta = after_merge_segment_entry.meta().clone(); - let end_merge_future = self.schedule_future(async 
move { + self.schedule_task(async move { info!("End merge {:?}", after_merge_segment_entry.meta()); { let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); @@ -661,8 +662,9 @@ impl SegmentUpdater { let _ = garbage_collect_files(segment_updater).await; Ok(()) - }); - end_merge_future.map_ok(|_| after_merge_segment_meta) + }) + .await?; + Ok(after_merge_segment_meta) } /// Wait for current merging threads. diff --git a/src/lib.rs b/src/lib.rs index 3402cb2dd..f014103db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -174,7 +174,7 @@ pub use crate::indexer::demuxer::*; pub use crate::indexer::merge_filtered_segments; pub use crate::indexer::merge_indices; pub use crate::indexer::operation::UserOperation; -pub use crate::indexer::IndexWriter; +pub use crate::indexer::{IndexWriter, PreparedCommit}; pub use crate::postings::Postings; pub use crate::reader::LeasedItem; pub use crate::schema::{Document, Term};