diff --git a/pageserver/src/tenant_jobs/mod.rs b/pageserver/src/tenant_jobs/mod.rs index f414520992..2c8b83993a 100644 --- a/pageserver/src/tenant_jobs/mod.rs +++ b/pageserver/src/tenant_jobs/mod.rs @@ -1,2 +1 @@ pub mod worker; -pub mod scheduler; diff --git a/pageserver/src/tenant_jobs/scheduler.rs b/pageserver/src/tenant_jobs/scheduler.rs deleted file mode 100644 index 64bac43b34..0000000000 --- a/pageserver/src/tenant_jobs/scheduler.rs +++ /dev/null @@ -1,14 +0,0 @@ -use std::time::Duration; - -use super::worker::Job; - - -enum JobStatus { - Active, - Stuck, -} - -struct Scheduler { - interval: Duration, - jobs: Vec<(J, JobStatus)> -} diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index 433ff71d41..4b8fd1bcd3 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, ops::Sub, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}}; +use std::{panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}}; // TODO maybe make jobs tenant-specific? Makes monitorin easier. @@ -9,22 +9,53 @@ pub trait Job: std::fmt::Debug + Send + Clone + 'static { #[derive(Debug, Clone)] enum JobStatus { Ready, - Running(Instant), + Running(Instant), // TODO add worker id Stuck, // TODO remember error } #[derive(Debug)] struct JobStatusTable { + // TODO this vec is no good. Too much index arithmetic. jobs: Vec<(J, JobStatus)>, next: usize, begin: Instant, + period: Duration, +} + +impl JobStatusTable { + fn next(&mut self) -> Option<(usize, J)> { + while self.next < self.jobs.len() { + let curr = self.next; + self.next += 1; + + match self.jobs[curr].1 { + JobStatus::Ready => { + self.jobs[curr].1 = JobStatus::Running(Instant::now()); + return Some((curr, self.jobs[curr].0.clone())) + } + JobStatus::Running(_) => println!("Job already running, skipping this round"), + JobStatus::Stuck => println!("Job stuck, skipping"), + } + } + None + } + + fn check_end_of_cycle(&mut self) -> Option { + let until_next = self.period.saturating_sub(Instant::now().duration_since(self.begin)); + if until_next.is_zero() { + self.next = 0; + self.begin = Instant::now(); + None + } else { + Some(until_next) + } + } } #[derive(Debug)] struct Pool { job_table: Mutex>, - condvar: Condvar, // Notified when queue becomes nonempty - period: Duration, + condvar: Condvar, // Notified when idle worker should wake up } impl Pool { @@ -34,48 +65,36 @@ impl Pool { jobs: vec![], next: 0, begin: Instant::now(), + period: Duration::from_millis(10), }), condvar: Condvar::new(), - period: Duration::from_millis(10), } } fn worker_main(&self) -> anyhow::Result<()> { let mut job_table = self.job_table.lock().unwrap(); loop { - if job_table.next < job_table.jobs.len() { - let curr = job_table.next; - let (job, status) = job_table.jobs[curr].clone(); - job_table.next += 1; + if let Some((id, job)) = job_table.next() { + // Run job without holding lock + drop(job_table); + let result = panic::catch_unwind(AssertUnwindSafe(|| { + job.run(); + })); + job_table = self.job_table.lock().unwrap(); - if matches!(status, JobStatus::Ready) { - // Run job without holding lock - job_table.jobs[curr].1 = JobStatus::Running(Instant::now()); - drop(job_table); - // TODO don't run job if previous iteration is still running - let result = panic::catch_unwind(AssertUnwindSafe(|| { - job.run(); - })); - job_table = self.job_table.lock().unwrap(); - - match result { - Ok(()) => { - job_table.jobs[curr].1 = JobStatus::Ready; - }, - Err(e) => { - job_table.jobs[curr].1 = JobStatus::Stuck; - println!("Job panicked, thread is ok."); - }, - } + // Update job status + match result { + Ok(()) => { + job_table.jobs[id].1 = JobStatus::Ready; + }, + Err(e) => { + job_table.jobs[id].1 = JobStatus::Stuck; + println!("Job panicked, thread is ok."); + }, } } else { - let since_last_cycle = Instant::now().duration_since(job_table.begin); - let until_next_cycle = self.period.saturating_sub(since_last_cycle); - if until_next_cycle.is_zero() { - job_table.next = 0; - job_table.begin = Instant::now(); - } else { - job_table = self.condvar.wait_timeout(job_table, until_next_cycle).unwrap().0; + if let Some(wait_time) = job_table.check_end_of_cycle() { + job_table = self.condvar.wait_timeout(job_table, wait_time).unwrap().0; } } } @@ -86,8 +105,7 @@ impl Pool { let mut job_table = self.job_table.lock().unwrap(); job_table.jobs.push((job, JobStatus::Ready)); - // If the queue was empty, wake up the next worker thread to pick up - // the job. + // Notify workers if they're waiting for work. if job_table.next == job_table.jobs.len() - 1 { self.condvar.notify_one(); } @@ -134,12 +152,12 @@ mod tests { }, ).unwrap(); - let j = PrintJob { - to_print: "hello from job".to_string(), - }; - TEST_POOL.get().unwrap().queue_job(j.clone()); - TEST_POOL.get().unwrap().queue_job(j.clone()); - TEST_POOL.get().unwrap().queue_job(j.clone()); + TEST_POOL.get().unwrap().queue_job(PrintJob { + to_print: "hello from job 1".to_string(), + }); + TEST_POOL.get().unwrap().queue_job(PrintJob { + to_print: "hello from job 2".to_string(), + }); tokio::time::sleep(Duration::from_millis(100)).await; }