Catch panics during merges (#2582)

* Adding panic handler for the rayon merge thread pool

* Return panic message in error

---------

Co-authored-by: Paul Masurel <paul.masurel@datadoghq.com>
This commit is contained in:
Remi Dettai
2025-03-05 10:36:48 +01:00
committed by GitHub
parent c48c649436
commit 89b052cd42

View File

@@ -1,3 +1,4 @@
use std::any::Any;
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::io::Write;
@@ -23,7 +24,9 @@ use crate::indexer::{
DefaultMergePolicy, MergeCandidate, MergeOperation, MergePolicy, SegmentEntry,
SegmentSerializer,
};
use crate::{FutureResult, Opstamp};
use crate::{FutureResult, Opstamp, TantivyError};
const PANIC_CAUGHT: &str = "Panic caught in merge thread";
/// Save the index meta file.
/// This operation is atomic:
@@ -287,6 +290,15 @@ impl SegmentUpdater {
let merge_thread_pool = ThreadPoolBuilder::new()
.thread_name(|i| format!("merge_thread_{i}"))
.num_threads(num_merge_threads)
.panic_handler(move |panic| {
// We don't print the panic content itself,
// it is already printed during the unwinding
if let Some(message) = panic.downcast_ref::<&str>() {
if *message != PANIC_CAUGHT {
error!("uncaught merge panic")
}
}
})
.build()
.map_err(|_| {
crate::TantivyError::SystemError(
@@ -506,11 +518,34 @@ impl SegmentUpdater {
// Its lifetime is used to track how many merging thread are currently running,
// as well as which segment is currently in merge and therefore should not be
// candidate for another merge.
match merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
) {
let merge_panic_res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
)
}));
let merge_res = match merge_panic_res {
Ok(merge_res) => merge_res,
Err(panic_err) => {
let panic_str = if let Some(msg) = panic_err.downcast_ref::<&str>() {
*msg
} else if let Some(msg) = panic_err.downcast_ref::<String>() {
msg.as_str()
} else {
"UNKNOWN"
};
let _send_result = merging_future_send.send(Err(TantivyError::SystemError(
format!("Merge thread panicked: {panic_str}"),
)));
// Resume unwinding because we forced unwind safety with
// `std::panic::AssertUnwindSafe` Use a specific message so
// the panic_handler can double check that we properly caught the panic.
let boxed_panic_message: Box<dyn Any + Send> = Box::new(PANIC_CAUGHT);
std::panic::resume_unwind(boxed_panic_message);
}
};
match merge_res {
Ok(after_merge_segment_entry) => {
let res = segment_updater.end_merge(merge_operation, after_merge_segment_entry);
let _send_result = merging_future_send.send(res);