diff --git a/src/error.rs b/src/error.rs index 7b80b9cb2..8b3c3e5e0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -113,6 +113,8 @@ pub enum TantivyError { /// The user requested the current operation be cancelled #[error("User requested cancel")] Cancelled, + #[error("Segment Merging failed: {0:#?}")] + MergeErrors(Vec), } impl From for TantivyError { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 943dc6a84..88404c247 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -395,6 +395,10 @@ impl IndexWriter { error!("Some merging thread failed {e:?}"); } + let merge_errors = self.segment_updater.get_merge_errors(); + if !merge_errors.is_empty() { + return Err(TantivyError::MergeErrors(merge_errors)); + } result } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 0bdb62131..641f172f6 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -26,8 +26,6 @@ use crate::indexer::{ }; use crate::{FutureResult, Opstamp, TantivyError}; -const PANIC_CAUGHT: &str = "Panic caught in merge thread"; - /// Save the index meta file. /// This operation is atomic: /// Either @@ -327,6 +325,7 @@ pub(crate) struct InnerSegmentUpdater { active_index_meta: RwLock>, pool: ThreadPool, merge_thread_pool: ThreadPool, + merge_errors: Arc>>, index: Index, segment_manager: SegmentManager, @@ -380,6 +379,7 @@ impl SegmentUpdater { active_index_meta: RwLock::new(Arc::new(index_meta)), pool, merge_thread_pool, + merge_errors: Default::default(), index, segment_manager, merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())), @@ -594,6 +594,7 @@ impl SegmentUpdater { FutureResult::create("Merge operation failed."); let cancel = self.cancel.box_clone(); + let merge_errors = self.merge_errors.clone(); self.merge_thread_pool.spawn(move || { // The fact that `merge_operation` is moved here is important. // Its lifetime is used to track how many merging thread are currently running, @@ -618,6 +619,8 @@ impl SegmentUpdater { if cfg!(test) { panic!("{merge_error:?}"); } + + merge_errors.write().unwrap().push(merge_error.clone()); let _send_result = merging_future_send.send(Err(merge_error)); } } @@ -632,6 +635,10 @@ impl SegmentUpdater { .get_mergeable_segments(&merge_segment_ids) } + pub(crate) fn get_merge_errors(&self) -> Vec { + self.merge_errors.read().unwrap().clone() + } + fn consider_merge_options(&self) { let (mut committed_segments, mut uncommitted_segments) = self.get_mergeable_segments(); if committed_segments.len() == 1 && committed_segments[0].num_deleted_docs() == 0 {