From de55b2f139c319870dd722ec2498f1a66c0b7a90 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Sun, 22 May 2022 16:12:00 -0400 Subject: [PATCH] Handle panic --- pageserver/src/jobs.rs | 50 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/pageserver/src/jobs.rs b/pageserver/src/jobs.rs index bf6aeda96b..94f84b63d1 100644 --- a/pageserver/src/jobs.rs +++ b/pageserver/src/jobs.rs @@ -1,4 +1,7 @@ -use std::{collections::VecDeque, sync::{Condvar, Mutex}}; +use std::{collections::VecDeque, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}}; +use tracing::{debug, error, info, warn}; + +// TODO maybe make jobs tenant-specific? Makes monitoring easier. pub trait Job: std::fmt::Debug + Send + 'static { fn run(&self); @@ -30,10 +33,16 @@ impl Pool { if let Some(command) = q.pop_front() { drop(q); match command { - // TODO catch unwind. On error: - // - report back so that only that tenant would get stuck - WorkerCommand::DoJob(job) => job.run(), WorkerCommand::Shutdown => return Ok(()), + WorkerCommand::DoJob(job) => { + let result = panic::catch_unwind(AssertUnwindSafe(|| { + job.run(); + })); + if let Err(e) = result { + // TODO mark job as broken + println!("Job panicked, thread is ok."); + } + }, } q = self.job_queue.lock().unwrap(); } else { @@ -84,6 +93,9 @@ mod tests { impl Job for PrintJob { fn run(&self) { + if self.to_print == "pls panic" { + panic!("AAA"); + } println!("{}", self.to_print); } } @@ -115,4 +127,34 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; TEST_POOL.get().unwrap().shutdown_one(); } + + #[tokio::test] + async fn pool_panic() { + TEST_POOL.set(Pool::::new()).unwrap(); + + thread_mgr::spawn( + ThreadKind::GarbageCollector, // change this + None, + None, + "test_worker_1", + true, + move || { + TEST_POOL.get().unwrap().worker_main() + }, + ).unwrap(); + + let j = PrintJob { + to_print: "hello from job".to_string(), + }; + let panic = PrintJob { + to_print: "pls 
panic".to_string(), + }; + + TEST_POOL.get().unwrap().queue_job(panic.clone()); + TEST_POOL.get().unwrap().queue_job(j.clone()); + TEST_POOL.get().unwrap().queue_job(j.clone()); + + tokio::time::sleep(Duration::from_millis(100)).await; + TEST_POOL.get().unwrap().shutdown_one(); + } }