diff --git a/pageserver/src/jobs/scheduler.rs b/pageserver/src/jobs/scheduler.rs index 0eda7fd4b3..c0b5a65cba 100644 --- a/pageserver/src/jobs/scheduler.rs +++ b/pageserver/src/jobs/scheduler.rs @@ -5,7 +5,10 @@ use crate::thread_mgr::shutdown_watcher; use tokio::sync::mpsc::{Receiver, Sender, channel}; use tokio::time::sleep; -use super::worker::{Job, Worker, Report}; +use super::worker::{Job, Worker, Report, Work}; + +// TODO spawn a tokio task for each tenant +// x) Why not use the simpler worker implementation then? #[derive(Debug)] pub struct Sched { @@ -13,22 +16,18 @@ pub struct Sched { workers: Vec>, /// Queued due jobs - jobs: VecDeque, + work_queue: VecDeque>, /// Channel for registering due jobs - pub send_work: Sender, // TODO should job specify report destination? - recv_work: Receiver, + pub send_work: Sender>, + recv_work: Receiver>, /// Channel for enlisting idle workers recv_worker: Receiver>, - - /// Channel where workers report results - recv_report: Receiver>, } pub struct Spawner { send_worker: Sender>, - send_report: Sender>, } impl Spawner { @@ -36,7 +35,6 @@ impl Spawner { use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}}; let enlist = self.send_worker.clone(); - let report = self.send_report.clone(); thread_mgr::spawn( ThreadKind::GcWorker, None, @@ -44,7 +42,7 @@ impl Spawner { "gc_worker_1", true, move || { - run_worker(enlist, report) + run_worker(enlist) }, ).unwrap(); } @@ -53,63 +51,41 @@ impl Spawner { impl Sched { pub fn new() -> (Sched, Spawner) { let worker = channel::>(100); - let work = channel::(100); - let report = channel::>(100); + let work = channel::>(100); let sched = Sched { workers: vec![], - jobs: VecDeque::new(), + work_queue: VecDeque::new(), recv_worker: worker.1, send_work: work.0, recv_work: work.1, - recv_report: report.1, }; let spawner = Spawner { send_worker: worker.0, - send_report: report.0, }; (sched, spawner) } - - pub async fn handle_job(&mut self, job: J) { + pub async fn handle_work(&mut self, work: Work) { // Assign to a worker if any are availabe - while let Some(w) = self.workers.pop() { - if let Ok(()) = w.0.send(job.clone()).await { + while let Some(worker) = self.workers.pop() { + if let Ok(()) = worker.0.send(work.clone()).await { return; } } - self.jobs.push_back(job); + self.work_queue.push_back(work); } pub async fn handle_worker(&mut self, worker: Worker) { // Assign jobs if any are queued - if let Some(j) = self.jobs.pop_front() { + if let Some(j) = self.work_queue.pop_front() { worker.0.send(j).await.ok(); } else { self.workers.push(worker); } } - pub async fn handle_report(&mut self, report: Report) { - // Reschedule job to run again - let send_work = self.send_work.clone(); - let job = report.for_job; - match report.result { - Ok(()) => { - tokio::spawn(async move { - sleep(Duration::from_millis(10)).await; - send_work.send(job).await.unwrap(); - }); - }, - Err(e) => { - // TODO mark chore as blocked - println!("task panicked"); - } - } - } - pub fn run(mut self) -> anyhow::Result<()> { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -124,14 +100,10 @@ impl Sched { let worker = worker.expect("worker channel closed"); self.handle_worker(worker).await; }, - job = self.recv_work.recv() => { - let job = job.expect("job channel closed"); - self.handle_job(job).await; + work = self.recv_work.recv() => { + let work = work.expect("job channel closed"); + self.handle_work(work).await; }, - report = self.recv_report.recv() => { - let report = report.expect("report channel closed"); - self.handle_report(report).await; - } } } }); @@ -142,7 +114,7 @@ impl Sched { #[cfg(test)] mod tests { - use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}}; + use crate::thread_mgr::{self, ThreadKind}; use super::*; #[derive(Debug, Clone, Eq, PartialEq)] @@ -177,10 +149,14 @@ mod tests { spawner.spawn_worker(); // Send a job - let j = PrintJob { - to_print: "hello from job".to_string(), + let when_done = channel::>(100); + let work = Work { + job: PrintJob { + to_print: "hello from job".to_string(), + }, + when_done: when_done.0, }; - send_work.send(j.clone()).await.unwrap(); + send_work.send(work.clone()).await.unwrap(); sleep(Duration::from_millis(100)).await; diff --git a/pageserver/src/jobs/worker.rs b/pageserver/src/jobs/worker.rs index 7ec3aa45a2..01de4b950d 100644 --- a/pageserver/src/jobs/worker.rs +++ b/pageserver/src/jobs/worker.rs @@ -12,13 +12,14 @@ pub trait Job: std::fmt::Debug + Send + 'static + Clone { } // TODO make scheduler an async fn, leave rescheduling to chore_mgr +#[derive(Debug, Clone)] pub struct Work { pub job: J, pub when_done: Sender>, } -#[derive(Debug)] -pub struct Worker(pub Sender); +#[derive(Debug, Clone)] +pub struct Worker(pub Sender>); #[derive(Debug)] pub struct Report { @@ -26,26 +27,26 @@ pub struct Report { pub result: Result<(), Box> } -pub fn run_worker(enlist: Sender>, report: Sender>) -> anyhow::Result<()> { +pub fn run_worker(enlist: Sender>) -> anyhow::Result<()> { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; runtime.block_on(async { loop { - let (send_work, mut get_work) = channel::(100); + let (send_work, mut get_work) = channel::>(100); enlist.send(Worker(send_work)).await.unwrap(); let shutdown_watcher = shutdown_watcher(); tokio::select! { _ = shutdown_watcher => break, - j = get_work.recv() => { - if let Some(job) = j { + w = get_work.recv() => { + if let Some(work) = w { let result = catch_unwind(AssertUnwindSafe(|| { - job.run(); + work.job.run(); })); - report.send(Report { - for_job: job, + work.when_done.send(Report { + for_job: work.job, result: result, }).await.unwrap(); } else { @@ -80,7 +81,6 @@ mod tests { #[tokio::test] async fn worker_1() { let mut worker = channel::>(100); - let mut result = channel::>(100); thread_mgr::spawn( ThreadKind::GcWorker, @@ -89,19 +89,23 @@ mod tests { "gc_worker_1", true, move || { - run_worker(worker.0, result.0) + run_worker(worker.0) }, ).unwrap(); - let j = PrintJob { - to_print: "hello from job".to_string(), + let mut when_done = channel::>(100); + let work = Work { + job: PrintJob { + to_print: "hello from job".to_string(), + }, + when_done: when_done.0, }; - let w = worker.1.recv().await.unwrap(); - w.0.send(j.clone()).await.unwrap(); + let worker = worker.1.recv().await.unwrap(); + worker.0.send(work.clone()).await.unwrap(); println!("waiting for result"); - let report = result.1.recv().await.unwrap(); - assert_eq!(j, report.for_job); + let report = when_done.1.recv().await.unwrap(); + assert_eq!(work.job, report.for_job); println!("got result"); thread_mgr::shutdown_threads(None, None, None);