mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Handle panic
This commit is contained in:
@@ -1,4 +1,7 @@
|
||||
use std::{collections::VecDeque, sync::{Condvar, Mutex}};
|
||||
use std::{collections::VecDeque, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
// TODO maybe make jobs tenant-specific? Makes monitorin easier.
|
||||
|
||||
pub trait Job: std::fmt::Debug + Send + 'static {
|
||||
fn run(&self);
|
||||
@@ -30,10 +33,16 @@ impl<J: Job> Pool<J> {
|
||||
if let Some(command) = q.pop_front() {
|
||||
drop(q);
|
||||
match command {
|
||||
// TODO catch unwind. On error:
|
||||
// - report back so that only that tenant would get stuck
|
||||
WorkerCommand::DoJob(job) => job.run(),
|
||||
WorkerCommand::Shutdown => return Ok(()),
|
||||
WorkerCommand::DoJob(job) => {
|
||||
let result = panic::catch_unwind(AssertUnwindSafe(|| {
|
||||
job.run();
|
||||
}));
|
||||
if let Err(e) = result {
|
||||
// TODO mark job as broken
|
||||
println!("Job panicked, thread is ok.");
|
||||
}
|
||||
},
|
||||
}
|
||||
q = self.job_queue.lock().unwrap();
|
||||
} else {
|
||||
@@ -84,6 +93,9 @@ mod tests {
|
||||
|
||||
impl Job for PrintJob {
|
||||
fn run(&self) {
|
||||
if self.to_print == "pls panic" {
|
||||
panic!("AAA");
|
||||
}
|
||||
println!("{}", self.to_print);
|
||||
}
|
||||
}
|
||||
@@ -115,4 +127,34 @@ mod tests {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
TEST_POOL.get().unwrap().shutdown_one();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pool_panic() {
|
||||
TEST_POOL.set(Pool::<PrintJob>::new()).unwrap();
|
||||
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GarbageCollector, // change this
|
||||
None,
|
||||
None,
|
||||
"test_worker_1",
|
||||
true,
|
||||
move || {
|
||||
TEST_POOL.get().unwrap().worker_main()
|
||||
},
|
||||
).unwrap();
|
||||
|
||||
let j = PrintJob {
|
||||
to_print: "hello from job".to_string(),
|
||||
};
|
||||
let panic = PrintJob {
|
||||
to_print: "pls panic".to_string(),
|
||||
};
|
||||
|
||||
TEST_POOL.get().unwrap().queue_job(panic.clone());
|
||||
TEST_POOL.get().unwrap().queue_job(j.clone());
|
||||
TEST_POOL.get().unwrap().queue_job(j.clone());
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
TEST_POOL.get().unwrap().shutdown_one();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user