diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index b2fa1d648..5861b5c46 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::borrow::BorrowMut; use std::collections::HashSet; use std::io::Write; @@ -23,7 +24,9 @@ use crate::indexer::{ DefaultMergePolicy, MergeCandidate, MergeOperation, MergePolicy, SegmentEntry, SegmentSerializer, }; -use crate::{FutureResult, Opstamp}; +use crate::{FutureResult, Opstamp, TantivyError}; + +const PANIC_CAUGHT: &str = "Panic caught in merge thread"; /// Save the index meta file. /// This operation is atomic: @@ -287,6 +290,15 @@ impl SegmentUpdater { let merge_thread_pool = ThreadPoolBuilder::new() .thread_name(|i| format!("merge_thread_{i}")) .num_threads(num_merge_threads) + .panic_handler(move |panic| { + // We don't print the panic content itself, + // it is already printed during the unwinding + if let Some(message) = panic.downcast_ref::<&str>() { + if *message != PANIC_CAUGHT { + error!("uncaught merge panic") + } + } + }) .build() .map_err(|_| { crate::TantivyError::SystemError( @@ -506,11 +518,34 @@ impl SegmentUpdater { // Its lifetime is used to track how many merging thread are currently running, // as well as which segment is currently in merge and therefore should not be // candidate for another merge. - match merge( - &segment_updater.index, - segment_entries, - merge_operation.target_opstamp(), - ) { + let merge_panic_res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + merge( + &segment_updater.index, + segment_entries, + merge_operation.target_opstamp(), + ) + })); + let merge_res = match merge_panic_res { + Ok(merge_res) => merge_res, + Err(panic_err) => { + let panic_str = if let Some(msg) = panic_err.downcast_ref::<&str>() { + *msg + } else if let Some(msg) = panic_err.downcast_ref::() { + msg.as_str() + } else { + "UNKNOWN" + }; + let _send_result = merging_future_send.send(Err(TantivyError::SystemError( + format!("Merge thread panicked: {panic_str}"), + ))); + // Resume unwinding because we forced unwind safety with + // `std::panic::AssertUnwindSafe` Use a specific message so + // the panic_handler can double check that we properly caught the panic. + let boxed_panic_message: Box = Box::new(PANIC_CAUGHT); + std::panic::resume_unwind(boxed_panic_message); + } + }; + match merge_res { Ok(after_merge_segment_entry) => { let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry); let _send_result = merging_future_send.send(res);