Prepare commit is public again (#1202)

- Simplified some of the prepare commit & segment updater code using
async.
- Made PrepareCommit public again.
This commit is contained in:
Paul Masurel
2021-11-12 23:25:39 +09:00
committed by GitHub
parent 33301a3eb4
commit 8802d125f8
5 changed files with 59 additions and 46 deletions

View File

@@ -465,10 +465,8 @@ impl IndexWriter {
}
/// Detects and removes the files that are not used by the index anymore.
pub fn garbage_collect_files(
&self,
) -> impl Future<Output = crate::Result<GarbageCollectionResult>> {
self.segment_updater.schedule_garbage_collect()
pub async fn garbage_collect_files(&self) -> crate::Result<GarbageCollectionResult> {
self.segment_updater.schedule_garbage_collect().await
}
/// Deletes all documents from the index
@@ -607,7 +605,7 @@ impl IndexWriter {
/// It is also possible to add a payload to the `commit`
/// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub(crate) fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.

View File

@@ -29,7 +29,7 @@ pub use self::index_writer::IndexWriter;
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
pub(crate) use self::prepared_commit::PreparedCommit;
pub use self::prepared_commit::PreparedCommit;
pub use self::segment_entry::SegmentEntry;
pub use self::segment_manager::SegmentManager;
pub use self::segment_serializer::SegmentSerializer;

View File

@@ -3,7 +3,7 @@ use crate::Opstamp;
use futures::executor::block_on;
/// A prepared commit
pub(crate) struct PreparedCommit<'a> {
pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
opstamp: Opstamp,
@@ -18,25 +18,38 @@ impl<'a> PreparedCommit<'a> {
}
}
pub(crate) fn opstamp(&self) -> Opstamp {
/// Returns the opstamp associated to the prepared commit.
pub fn opstamp(&self) -> Opstamp {
self.opstamp
}
pub(crate) fn set_payload(&mut self, payload: &str) {
/// Adds an arbitrary payload to the commit.
pub fn set_payload(&mut self, payload: &str) {
self.payload = Some(payload.to_string())
}
pub(crate) fn abort(self) -> crate::Result<Opstamp> {
/// Rollbacks any change.
pub fn abort(self) -> crate::Result<Opstamp> {
self.index_writer.rollback()
}
pub(crate) fn commit(self) -> crate::Result<Opstamp> {
/// Proceeds to commit.
/// See `.commit_async()`.
pub fn commit(self) -> crate::Result<Opstamp> {
block_on(self.commit_async())
}
/// Proceeds to commit.
///
/// Unfortunately, contrary to what `PrepareCommit` may suggests,
/// this operation is not at all really light.
/// At this point deletes have not been flushed yet.
pub async fn commit_async(self) -> crate::Result<Opstamp> {
info!("committing {}", self.opstamp);
block_on(
self.index_writer
.segment_updater()
.schedule_commit(self.opstamp, self.payload),
)?;
self.index_writer
.segment_updater()
.schedule_commit(self.opstamp, self.payload)
.await?;
Ok(self.opstamp)
}
}

View File

@@ -351,37 +351,39 @@ impl SegmentUpdater {
*self.merge_policy.write().unwrap() = arc_merge_policy;
}
fn schedule_future<T: 'static + Send, F: Future<Output = crate::Result<T>> + 'static + Send>(
async fn schedule_task<
T: 'static + Send,
F: Future<Output = crate::Result<T>> + 'static + Send,
>(
&self,
f: F,
) -> impl Future<Output = crate::Result<T>> {
let (sender, receiver) = oneshot::channel();
if self.is_alive() {
self.pool.spawn_ok(async move {
let _ = sender.send(f.await);
});
} else {
let _ = sender.send(Err(crate::TantivyError::SystemError(
task: F,
) -> crate::Result<T> {
if !self.is_alive() {
return Err(crate::TantivyError::SystemError(
"Segment updater killed".to_string(),
)));
));
}
receiver.unwrap_or_else(|_| {
let (sender, receiver) = oneshot::channel();
self.pool.spawn_ok(async move {
let task_result = task.await;
let _ = sender.send(task_result);
});
let task_result = receiver.await;
task_result.unwrap_or_else(|_| {
let err_msg =
"A segment_updater future did not success. This should never happen.".to_string();
Err(crate::TantivyError::SystemError(err_msg))
})
}
pub fn schedule_add_segment(
&self,
segment_entry: SegmentEntry,
) -> impl Future<Output = crate::Result<()>> {
pub async fn schedule_add_segment(&self, segment_entry: SegmentEntry) -> crate::Result<()> {
let segment_updater = self.clone();
self.schedule_future(async move {
self.schedule_task(async move {
segment_updater.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options().await;
Ok(())
})
.await
}
/// Orders `SegmentManager` to remove all segments
@@ -448,11 +450,9 @@ impl SegmentUpdater {
Ok(())
}
pub fn schedule_garbage_collect(
&self,
) -> impl Future<Output = crate::Result<GarbageCollectionResult>> {
pub async fn schedule_garbage_collect(&self) -> crate::Result<GarbageCollectionResult> {
let garbage_collect_future = garbage_collect_files(self.clone());
self.schedule_future(garbage_collect_future)
self.schedule_task(garbage_collect_future).await
}
/// List the files that are useful to the index.
@@ -470,13 +470,13 @@ impl SegmentUpdater {
files
}
pub fn schedule_commit(
pub(crate) async fn schedule_commit(
&self,
opstamp: Opstamp,
payload: Option<String>,
) -> impl Future<Output = crate::Result<()>> {
) -> crate::Result<()> {
let segment_updater: SegmentUpdater = self.clone();
self.schedule_future(async move {
self.schedule_task(async move {
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload)?;
@@ -484,6 +484,7 @@ impl SegmentUpdater {
segment_updater.consider_merge_options().await;
Ok(())
})
.await
}
fn store_meta(&self, index_meta: &IndexMeta) {
@@ -611,14 +612,14 @@ impl SegmentUpdater {
}
}
fn end_merge(
async fn end_merge(
&self,
merge_operation: MergeOperation,
mut after_merge_segment_entry: SegmentEntry,
) -> impl Future<Output = crate::Result<SegmentMeta>> {
) -> crate::Result<SegmentMeta> {
let segment_updater = self.clone();
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
let end_merge_future = self.schedule_future(async move {
self.schedule_task(async move {
info!("End merge {:?}", after_merge_segment_entry.meta());
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
@@ -661,8 +662,9 @@ impl SegmentUpdater {
let _ = garbage_collect_files(segment_updater).await;
Ok(())
});
end_merge_future.map_ok(|_| after_merge_segment_meta)
})
.await?;
Ok(after_merge_segment_meta)
}
/// Wait for current merging threads.

View File

@@ -174,7 +174,7 @@ pub use crate::indexer::demuxer::*;
pub use crate::indexer::merge_filtered_segments;
pub use crate::indexer::merge_indices;
pub use crate::indexer::operation::UserOperation;
pub use crate::indexer::IndexWriter;
pub use crate::indexer::{IndexWriter, PreparedCommit};
pub use crate::postings::Postings;
pub use crate::reader::LeasedItem;
pub use crate::schema::{Document, Term};