From 31dc9e6abd891c091b2fe7866de00778a083fa4c Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Tue, 24 May 2022 10:50:56 -0400 Subject: [PATCH] Use queue --- pageserver/src/tenant_jobs/compaction.rs | 2 +- pageserver/src/tenant_jobs/gc.rs | 2 +- pageserver/src/tenant_jobs/worker.rs | 124 ++++++++++++++--------- 3 files changed, 78 insertions(+), 50 deletions(-) diff --git a/pageserver/src/tenant_jobs/compaction.rs b/pageserver/src/tenant_jobs/compaction.rs index 5df7540f87..f163288435 100644 --- a/pageserver/src/tenant_jobs/compaction.rs +++ b/pageserver/src/tenant_jobs/compaction.rs @@ -7,7 +7,7 @@ use crate::tenant_mgr::{self, TenantState}; use super::worker::{Job, Pool}; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct CompactionJob { pub tenant: ZTenantId, } diff --git a/pageserver/src/tenant_jobs/gc.rs b/pageserver/src/tenant_jobs/gc.rs index 0ecfdd90f9..2d53c16165 100644 --- a/pageserver/src/tenant_jobs/gc.rs +++ b/pageserver/src/tenant_jobs/gc.rs @@ -4,7 +4,7 @@ use crate::{repository::Repository, tenant_mgr::{self, TenantState}}; use super::worker::{Job, Pool}; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct GcJob { pub tenant: ZTenantId, } diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index 94af61663e..c2184a6f80 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -1,6 +1,6 @@ -use std::{any::Any, fmt::Debug, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}}; +use std::{any::Any, collections::{BinaryHeap, HashMap}, fmt::Debug, hash::Hash, ops::Add, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}}; -pub trait Job: std::fmt::Debug + Send + Clone + 'static { +pub trait Job: std::fmt::Debug + Send + Clone + PartialOrd + Ord + Hash + 'static { type ErrorType; fn run(&self) -> Result<(), Self::ErrorType>; } @@ -18,42 +18,51 @@ enum JobStatus where J::ErrorType: Debug { Stuck(JobError), } +#[derive(Debug)] +struct Deadline where J::ErrorType: Debug { + start_by: Instant, + job: J, +} + +impl PartialOrd for Deadline where J::ErrorType: Debug { + fn partial_cmp(&self, other: &Self) -> Option { + other.start_by.partial_cmp(&self.start_by) + } +} + +impl Ord for Deadline where J::ErrorType: Debug { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + other.start_by.cmp(&self.start_by) + } +} + +impl PartialEq for Deadline where J::ErrorType: Debug { + fn eq(&self, other: &Self) -> bool { + self.start_by == other.start_by + } +} + +impl Eq for Deadline where J::ErrorType: Debug { } + #[derive(Debug)] struct JobStatusTable where J::ErrorType: Debug { - // TODO this vec is no good. Too much index arithmetic. - jobs: Vec<(J, JobStatus)>, - next: usize, - begin: Instant, - period: Duration, + status: HashMap>, + queue: BinaryHeap>, } impl JobStatusTable where J::ErrorType: Debug { - fn next(&mut self) -> Option<(usize, J)> { - while self.next < self.jobs.len() { - let curr = self.next; - self.next += 1; - - match self.jobs[curr].1 { - JobStatus::Ready => { - self.jobs[curr].1 = JobStatus::Running(Instant::now()); - return Some((curr, self.jobs[curr].0.clone())) - } - JobStatus::Running(_) => println!("Job already running, skipping this round"), - JobStatus::Stuck(_) => println!("Job stuck, skipping"), + fn pop_due(&mut self) -> Option> { + if let Some(deadline) = self.queue.peek() { + if Instant::now() > deadline.start_by { + return self.queue.pop(); } } None } - fn check_end_of_cycle(&mut self) -> Option { - let until_next = self.period.saturating_sub(Instant::now().duration_since(self.begin)); - if until_next.is_zero() { - self.next = 0; - self.begin = Instant::now(); - None - } else { - Some(until_next) - } + fn set_status(&mut self, job: &J, status: JobStatus) { + let s = self.status.get_mut(job).expect("status not found"); + *s = status; } } @@ -67,10 +76,8 @@ impl Pool where J::ErrorType: Debug { fn new() -> Self { Pool { job_table: Mutex::new(JobStatusTable:: { - jobs: vec![], - next: 0, - begin: Instant::now(), - period: Duration::from_millis(10), + status: HashMap::>::new(), + queue: BinaryHeap::>::new(), }), condvar: Condvar::new(), } @@ -79,7 +86,9 @@ impl Pool where J::ErrorType: Debug { fn worker_main(&self) -> anyhow::Result<()> { let mut job_table = self.job_table.lock().unwrap(); loop { - if let Some((id, job)) = job_table.next() { + if let Some(Deadline {job, ..}) = job_table.pop_due() { + job_table.set_status(&job, JobStatus::Running(Instant::now())); + // Run job without holding lock drop(job_table); let result = panic::catch_unwind(AssertUnwindSafe(|| { @@ -90,20 +99,30 @@ impl Pool where J::ErrorType: Debug { // Update job status match result { Ok(Ok(())) => { - job_table.jobs[id].1 = JobStatus::Ready; + job_table.set_status(&job, JobStatus::Ready); + job_table.queue.push(Deadline { + job: job.clone(), + start_by: Instant::now().add(Duration::from_millis(10)), + }) }, Ok(Err(e)) => { - job_table.jobs[id].1 = JobStatus::Stuck(JobError::Error(e)); + job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e))); println!("Job errored, thread is ok."); }, Err(e) => { - job_table.jobs[id].1 = JobStatus::Stuck(JobError::Panic(e)); + job_table.set_status(&job, JobStatus::Stuck(JobError::Panic(e))); println!("Job panicked, thread is ok."); }, } } else { - if let Some(wait_time) = job_table.check_end_of_cycle() { - job_table = self.condvar.wait_timeout(job_table, wait_time).unwrap().0; + match job_table.queue.peek() { + Some(deadline) => { + let wait_time = deadline.start_by.duration_since(Instant::now()); + job_table = self.condvar.wait_timeout(job_table, wait_time).unwrap().0; + } + None => { + job_table = self.condvar.wait(job_table).unwrap(); + } } } } @@ -112,12 +131,13 @@ impl Pool where J::ErrorType: Debug { fn queue_job(&self, job: J) { // Add the job to the back of the queue let mut job_table = self.job_table.lock().unwrap(); - job_table.jobs.push((job, JobStatus::Ready)); + job_table.status.insert(job.clone(), JobStatus::Ready); + job_table.queue.push(Deadline { + job: job.clone(), + start_by: Instant::now(), + }); - // Notify workers if they're waiting for work. - if job_table.next == job_table.jobs.len() - 1 { - self.condvar.notify_one(); - } + self.condvar.notify_all(); } } @@ -130,7 +150,7 @@ mod tests { use crate::thread_mgr::{self, ThreadKind}; use super::*; - #[derive(Debug, Clone, Eq, PartialEq)] + #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] struct PrintJob { to_print: String } @@ -164,11 +184,19 @@ mod tests { }, ).unwrap(); + thread_mgr::spawn( + ThreadKind::GarbageCollector, // change this + None, + None, + "test_worker_2", + true, + move || { + TEST_POOL.get().unwrap().worker_main() + }, + ).unwrap(); + TEST_POOL.get().unwrap().queue_job(PrintJob { - to_print: "hello from job 1".to_string(), - }); - TEST_POOL.get().unwrap().queue_job(PrintJob { - to_print: "hello from job 2".to_string(), + to_print: "hello from job".to_string(), }); tokio::time::sleep(Duration::from_millis(100)).await;