diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index 4856e8de43..433ff71d41 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -6,9 +6,16 @@ pub trait Job: std::fmt::Debug + Send + Clone + 'static { fn run(&self); } +#[derive(Debug, Clone)] +enum JobStatus { + Ready, + Running(Instant), + Stuck, // TODO remember error +} + #[derive(Debug)] struct JobStatusTable { - jobs: Vec, + jobs: Vec<(J, JobStatus)>, next: usize, begin: Instant, } @@ -37,23 +44,29 @@ impl Pool { let mut job_table = self.job_table.lock().unwrap(); loop { if job_table.next < job_table.jobs.len() { - let job = job_table.jobs[job_table.next].clone(); + let curr = job_table.next; + let (job, status) = job_table.jobs[curr].clone(); job_table.next += 1; - // Run job without holding lock - drop(job_table); - // TODO don't run job if previous iteration is still running - let result = panic::catch_unwind(AssertUnwindSafe(|| { - job.run(); - })); - job_table = self.job_table.lock().unwrap(); + if matches!(status, JobStatus::Ready) { + // Run job without holding lock + job_table.jobs[curr].1 = JobStatus::Running(Instant::now()); + drop(job_table); + // TODO don't run job if previous iteration is still running + let result = panic::catch_unwind(AssertUnwindSafe(|| { + job.run(); + })); + job_table = self.job_table.lock().unwrap(); - match result { - Ok(()) => {}, - Err(e) => { - // TODO mark job as broken - println!("Job panicked, thread is ok."); - }, + match result { + Ok(()) => { + job_table.jobs[curr].1 = JobStatus::Ready; + }, + Err(e) => { + job_table.jobs[curr].1 = JobStatus::Stuck; + println!("Job panicked, thread is ok."); + }, + } } } else { let since_last_cycle = Instant::now().duration_since(job_table.begin); @@ -71,7 +84,7 @@ impl Pool { fn queue_job(&self, job: J) { // Add the job to the back of the queue let mut job_table = self.job_table.lock().unwrap(); - job_table.jobs.push(job); + job_table.jobs.push((job, JobStatus::Ready)); // If the queue was empty, wake up the next worker thread to pick up // the job.