mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
Add job status
This commit is contained in:
@@ -6,9 +6,16 @@ pub trait Job: std::fmt::Debug + Send + Clone + 'static {
|
||||
fn run(&self);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum JobStatus {
|
||||
Ready,
|
||||
Running(Instant),
|
||||
Stuck, // TODO remember error
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct JobStatusTable<J: Job> {
|
||||
jobs: Vec<J>,
|
||||
jobs: Vec<(J, JobStatus)>,
|
||||
next: usize,
|
||||
begin: Instant,
|
||||
}
|
||||
@@ -37,23 +44,29 @@ impl<J: Job> Pool<J> {
|
||||
let mut job_table = self.job_table.lock().unwrap();
|
||||
loop {
|
||||
if job_table.next < job_table.jobs.len() {
|
||||
let job = job_table.jobs[job_table.next].clone();
|
||||
let curr = job_table.next;
|
||||
let (job, status) = job_table.jobs[curr].clone();
|
||||
job_table.next += 1;
|
||||
|
||||
// Run job without holding lock
|
||||
drop(job_table);
|
||||
// TODO don't run job if previous iteration is still running
|
||||
let result = panic::catch_unwind(AssertUnwindSafe(|| {
|
||||
job.run();
|
||||
}));
|
||||
job_table = self.job_table.lock().unwrap();
|
||||
if matches!(status, JobStatus::Ready) {
|
||||
// Run job without holding lock
|
||||
job_table.jobs[curr].1 = JobStatus::Running(Instant::now());
|
||||
drop(job_table);
|
||||
// TODO don't run job if previous iteration is still running
|
||||
let result = panic::catch_unwind(AssertUnwindSafe(|| {
|
||||
job.run();
|
||||
}));
|
||||
job_table = self.job_table.lock().unwrap();
|
||||
|
||||
match result {
|
||||
Ok(()) => {},
|
||||
Err(e) => {
|
||||
// TODO mark job as broken
|
||||
println!("Job panicked, thread is ok.");
|
||||
},
|
||||
match result {
|
||||
Ok(()) => {
|
||||
job_table.jobs[curr].1 = JobStatus::Ready;
|
||||
},
|
||||
Err(e) => {
|
||||
job_table.jobs[curr].1 = JobStatus::Stuck;
|
||||
println!("Job panicked, thread is ok.");
|
||||
},
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let since_last_cycle = Instant::now().duration_since(job_table.begin);
|
||||
@@ -71,7 +84,7 @@ impl<J: Job> Pool<J> {
|
||||
fn queue_job(&self, job: J) {
|
||||
// Add the job to the back of the queue
|
||||
let mut job_table = self.job_table.lock().unwrap();
|
||||
job_table.jobs.push(job);
|
||||
job_table.jobs.push((job, JobStatus::Ready));
|
||||
|
||||
// If the queue was empty, wake up the next worker thread to pick up
|
||||
// the job.
|
||||
|
||||
Reference in New Issue
Block a user