feat: IndexWriter::wait_merging_threads() return Err on merge failure (#34)

This commit is contained in:
Eric Ridge
2025-03-24 10:50:16 -04:00
committed by Stu Hood
parent 75a8384c2b
commit 2560de3a01
3 changed files with 15 additions and 2 deletions

View File

@@ -113,6 +113,8 @@ pub enum TantivyError {
/// The user requested the current operation be cancelled
#[error("User requested cancel")]
Cancelled,
#[error("Segment Merging failed: {0:#?}")]
MergeErrors(Vec<TantivyError>),
}
impl From<io::Error> for TantivyError {

View File

@@ -395,6 +395,10 @@ impl<D: Document> IndexWriter<D> {
error!("Some merging thread failed {e:?}");
}
let merge_errors = self.segment_updater.get_merge_errors();
if !merge_errors.is_empty() {
return Err(TantivyError::MergeErrors(merge_errors));
}
result
}

View File

@@ -26,8 +26,6 @@ use crate::indexer::{
};
use crate::{FutureResult, Opstamp, TantivyError};
const PANIC_CAUGHT: &str = "Panic caught in merge thread";
/// Save the index meta file.
/// This operation is atomic:
/// Either
@@ -327,6 +325,7 @@ pub(crate) struct InnerSegmentUpdater {
active_index_meta: RwLock<Arc<IndexMeta>>,
pool: ThreadPool,
merge_thread_pool: ThreadPool,
merge_errors: Arc<RwLock<Vec<TantivyError>>>,
index: Index,
segment_manager: SegmentManager,
@@ -380,6 +379,7 @@ impl SegmentUpdater {
active_index_meta: RwLock::new(Arc::new(index_meta)),
pool,
merge_thread_pool,
merge_errors: Default::default(),
index,
segment_manager,
merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
@@ -594,6 +594,7 @@ impl SegmentUpdater {
FutureResult::create("Merge operation failed.");
let cancel = self.cancel.box_clone();
let merge_errors = self.merge_errors.clone();
self.merge_thread_pool.spawn(move || {
// The fact that `merge_operation` is moved here is important.
// Its lifetime is used to track how many merging thread are currently running,
@@ -618,6 +619,8 @@ impl SegmentUpdater {
if cfg!(test) {
panic!("{merge_error:?}");
}
merge_errors.write().unwrap().push(merge_error.clone());
let _send_result = merging_future_send.send(Err(merge_error));
}
}
@@ -632,6 +635,10 @@ impl SegmentUpdater {
.get_mergeable_segments(&merge_segment_ids)
}
pub(crate) fn get_merge_errors(&self) -> Vec<TantivyError> {
self.merge_errors.read().unwrap().clone()
}
fn consider_merge_options(&self) {
let (mut committed_segments, mut uncommitted_segments) = self.get_mergeable_segments();
if committed_segments.len() == 1 && committed_segments[0].num_deleted_docs() == 0 {