From 3975417cae3aa75cb589f9d4b0ca7c2d4f98e63c Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Tue, 24 May 2022 14:12:53 -0400 Subject: [PATCH] Add metadata --- pageserver/src/tenant_jobs/worker.rs | 37 +++++++++++++++++++--------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index 24e1050cb0..ecf0168d24 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -13,8 +13,13 @@ enum JobError { #[derive(Debug)] enum JobStatus where J::ErrorType: Debug { - Ready, - Running(Instant), // TODO add worker id + Ready { + scheduled_for: Instant, + }, + Running { + worker_name: String, + started_at: Instant, + }, Stuck(JobError), } @@ -47,7 +52,10 @@ impl Eq for Deadline where J::ErrorType: Debug { } #[derive(Debug)] struct JobStatusTable where J::ErrorType: Debug { + /// Complete summary of current state status: HashMap>, + + /// Index over status for finding the next scheduled job queue: BinaryHeap>, } @@ -84,11 +92,14 @@ impl Pool where J::ErrorType: Debug { } } - fn worker_main(&self) -> anyhow::Result<()> { + fn worker_main(&self, worker_name: String) -> anyhow::Result<()> { let mut job_table = self.job_table.lock().unwrap(); loop { if let Some(Deadline {job, ..}) = job_table.pop_due() { - job_table.set_status(&job, JobStatus::Running(Instant::now())); + job_table.set_status(&job, JobStatus::Running { + worker_name, + started_at: Instant::now(), + }); // Run job without holding lock drop(job_table); @@ -100,14 +111,16 @@ impl Pool where J::ErrorType: Debug { // Update job status match result { Ok(Ok(Some(reschedule_for))) => { - job_table.set_status(&job, JobStatus::Ready); + job_table.set_status(&job, JobStatus::Ready { + scheduled_for: reschedule_for, + }); job_table.queue.push(Deadline { job: job.clone(), start_by: reschedule_for, }) }, Ok(Ok(None)) => { - // TODO remove from job table + job_table.status.remove(&job); }, Ok(Err(e)) => { job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e))); @@ -133,12 +146,14 @@ impl Pool where J::ErrorType: Debug { } fn queue_job(&self, job: J) { - // Add the job to the back of the queue let mut job_table = self.job_table.lock().unwrap(); - job_table.status.insert(job.clone(), JobStatus::Ready); + let scheduled_for = Instant::now(); + job_table.status.insert(job.clone(), JobStatus::Ready { + scheduled_for, + }); job_table.queue.push(Deadline { job: job.clone(), - start_by: Instant::now(), + start_by: scheduled_for, }); self.condvar.notify_all(); @@ -184,7 +199,7 @@ mod tests { "test_worker_1", true, move || { - TEST_POOL.get().unwrap().worker_main() + TEST_POOL.get().unwrap().worker_main("test_worker_1".into()) }, ).unwrap(); @@ -195,7 +210,7 @@ mod tests { "test_worker_2", true, move || { - TEST_POOL.get().unwrap().worker_main() + TEST_POOL.get().unwrap().worker_main("test_worker_2".into()) }, ).unwrap();