diff --git a/pageserver/src/tenant_jobs/compaction.rs b/pageserver/src/tenant_jobs/compaction.rs index f163288435..6369e95035 100644 --- a/pageserver/src/tenant_jobs/compaction.rs +++ b/pageserver/src/tenant_jobs/compaction.rs @@ -31,5 +31,5 @@ impl Job for CompactionJob { pub static COMPACTION_SCHEDULER: OnceCell> = OnceCell::new(); -// TODO init pool with compaction interval // TODO spawn 20 worker threads +// TODO add tasks when tenant activates diff --git a/pageserver/src/tenant_jobs/gc.rs b/pageserver/src/tenant_jobs/gc.rs index 2d53c16165..41c6e9844a 100644 --- a/pageserver/src/tenant_jobs/gc.rs +++ b/pageserver/src/tenant_jobs/gc.rs @@ -15,6 +15,7 @@ impl Job for GcJob { fn run(&self) -> Result<(), Self::ErrorType> { if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) { // TODO Maybe record this as "didn't run"? + // TODO Maybe unschedule? return Ok(()); } @@ -30,5 +31,5 @@ impl Job for GcJob { pub static GC_POOL: OnceCell> = OnceCell::new(); -// TODO init gc pool with gc interval // TODO spawn 20 worker threads +// TODO add tasks when tenant activates diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index c2184a6f80..e1c4c4f4d3 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -18,6 +18,12 @@ enum JobStatus where J::ErrorType: Debug { Stuck(JobError), } +#[derive(Debug)] +struct JobEntry where J::ErrorType: Debug { + period: Duration, + status: JobStatus, +} + #[derive(Debug)] struct Deadline where J::ErrorType: Debug { start_by: Instant, @@ -46,7 +52,7 @@ impl Eq for Deadline where J::ErrorType: Debug { } #[derive(Debug)] struct JobStatusTable where J::ErrorType: Debug { - status: HashMap>, + status: HashMap>, queue: BinaryHeap>, } @@ -62,7 +68,7 @@ impl JobStatusTable where J::ErrorType: Debug { fn set_status(&mut self, job: &J, status: JobStatus) { let s = self.status.get_mut(job).expect("status not found"); - *s = status; + (*s).status = status; } } @@ -76,7 +82,7 @@ impl Pool where J::ErrorType: Debug { fn new() -> Self { Pool { job_table: Mutex::new(JobStatusTable:: { - status: HashMap::>::new(), + status: HashMap::>::new(), queue: BinaryHeap::>::new(), }), condvar: Condvar::new(), @@ -100,9 +106,10 @@ impl Pool where J::ErrorType: Debug { match result { Ok(Ok(())) => { job_table.set_status(&job, JobStatus::Ready); + let period = job_table.status.get(&job).unwrap().period; job_table.queue.push(Deadline { job: job.clone(), - start_by: Instant::now().add(Duration::from_millis(10)), + start_by: Instant::now().add(period), }) }, Ok(Err(e)) => { @@ -128,10 +135,13 @@ impl Pool where J::ErrorType: Debug { } } - fn queue_job(&self, job: J) { + fn queue_job(&self, job: J, period: Duration) { // Add the job to the back of the queue let mut job_table = self.job_table.lock().unwrap(); - job_table.status.insert(job.clone(), JobStatus::Ready); + job_table.status.insert(job.clone(), JobEntry { + period, + status: JobStatus::Ready, + }); job_table.queue.push(Deadline { job: job.clone(), start_by: Instant::now(), @@ -197,36 +207,7 @@ mod tests { TEST_POOL.get().unwrap().queue_job(PrintJob { to_print: "hello from job".to_string(), - }); - - tokio::time::sleep(Duration::from_millis(100)).await; - } - - #[tokio::test] - async fn pool_panic() { - TEST_POOL.set(Pool::::new()).unwrap(); - - thread_mgr::spawn( - ThreadKind::GarbageCollector, // change this - None, - None, - "test_worker_1", - true, - move || { - TEST_POOL.get().unwrap().worker_main() - }, - ).unwrap(); - - let j = PrintJob { - to_print: "hello from job".to_string(), - }; - let panic = PrintJob { - to_print: "pls panic".to_string(), - }; - - TEST_POOL.get().unwrap().queue_job(panic.clone()); - TEST_POOL.get().unwrap().queue_job(j.clone()); - TEST_POOL.get().unwrap().queue_job(j.clone()); + }, Duration::from_millis(10)); tokio::time::sleep(Duration::from_millis(100)).await; }