From 304641afe5a3d6bcdc6c57cdd7a50a34c47c8ac8 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Mon, 23 May 2022 00:42:14 -0400 Subject: [PATCH] Refactor --- pageserver/src/jobs/scheduler.rs | 167 ++++++++++++++++++++----------- pageserver/src/jobs/worker.rs | 8 +- 2 files changed, 115 insertions(+), 60 deletions(-) diff --git a/pageserver/src/jobs/scheduler.rs b/pageserver/src/jobs/scheduler.rs index 67e6e0b4d3..0eda7fd4b3 100644 --- a/pageserver/src/jobs/scheduler.rs +++ b/pageserver/src/jobs/scheduler.rs @@ -7,20 +7,106 @@ use tokio::time::sleep; use super::worker::{Job, Worker, Report}; - #[derive(Debug)] pub struct Sched { - pub worker: (Sender>, Receiver>), - pub work: (Sender, Receiver), - pub report: (Sender>, Receiver>), + /// Idle workers + workers: Vec>, + + /// Queued due jobs + jobs: VecDeque, + + /// Channel for registering due jobs + pub send_work: Sender, // TODO should job specify report destination? + recv_work: Receiver, + + /// Channel for enlisting idle workers + recv_worker: Receiver>, + + /// Channel where workers report results + recv_report: Receiver>, +} + +pub struct Spawner { + send_worker: Sender>, + send_report: Sender>, +} + +impl Spawner { + pub fn spawn_worker(&self) { + use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}}; + + let enlist = self.send_worker.clone(); + let report = self.send_report.clone(); + thread_mgr::spawn( + ThreadKind::GcWorker, + None, + None, + "gc_worker_1", + true, + move || { + run_worker(enlist, report) + }, + ).unwrap(); + } } impl Sched { - pub fn new() -> Self { - Sched { - worker: channel::>(100), - work: channel::(100), - report: channel::>(100), + pub fn new() -> (Sched, Spawner) { + let worker = channel::>(100); + let work = channel::(100); + let report = channel::>(100); + + let sched = Sched { + workers: vec![], + jobs: VecDeque::new(), + recv_worker: worker.1, + send_work: work.0, + recv_work: work.1, + recv_report: report.1, + }; + let spawner = Spawner { + send_worker: 
worker.0, + send_report: report.0, + }; + + (sched, spawner) + } + + + pub async fn handle_job(&mut self, job: J) { + // Assign to a worker if any are available + while let Some(w) = self.workers.pop() { + if let Ok(()) = w.0.send(job.clone()).await { + return; + } + } + self.jobs.push_back(job); + } + + pub async fn handle_worker(&mut self, worker: Worker) { + // Assign jobs if any are queued + if let Some(j) = self.jobs.pop_front() { + worker.0.send(j).await.ok(); + } else { + self.workers.push(worker); + } + } + + pub async fn handle_report(&mut self, report: Report) { + // Reschedule job to run again + let send_work = self.send_work.clone(); + let job = report.for_job; + match report.result { + Ok(()) => { + tokio::spawn(async move { + sleep(Duration::from_millis(10)).await; + send_work.send(job).await.unwrap(); + }); + }, + Err(e) => { + // TODO mark chore as blocked + println!("task panicked"); + } } } @@ -30,44 +116,21 @@ impl Sched { .build()?; runtime.block_on(async { - let mut workers = Vec::>::new(); - let mut jobs = VecDeque::::new(); loop { let shutdown_watcher = shutdown_watcher(); tokio::select! 
{ _ = shutdown_watcher => break, - worker = self.worker.1.recv() => { - // Assign to next job in queue, if nonempty - if let Some(j) = jobs.pop_front() { - worker.unwrap().0.send(j).await.unwrap(); - } else { - workers.push(worker.unwrap()); - } + worker = self.recv_worker.recv() => { + let worker = worker.expect("worker channel closed"); + self.handle_worker(worker).await; }, - job = self.work.1.recv() => { - // Assign to first worker in pool, if nonempty - if let Some(w) = workers.pop() { - w.0.send(job.unwrap()).await.unwrap(); - } else { - jobs.push_back(job.unwrap()); - } + job = self.recv_work.recv() => { + let job = job.expect("job channel closed"); + self.handle_job(job).await; }, - report = self.report.1.recv() => { - // Reschedule job to run again - let send_work = self.work.0.clone(); - let report = report.unwrap(); - let job = report.for_job; - match report.result { - Ok(()) => { - tokio::spawn(async move { - sleep(Duration::from_millis(10)).await; - send_work.send(job).await.unwrap(); - }); - }, - Err(e) => { - println!("task panicked"); - } - } + report = self.recv_report.recv() => { + let report = report.expect("report channel closed"); + self.handle_report(report).await; } } } @@ -95,13 +158,9 @@ mod tests { #[tokio::test] async fn sched_1() { - let s = Sched::::new(); + let (sched, spawner) = Sched::::new(); - let send_work = s.work.0.clone(); - - // Used for workers - let enlist = s.worker.0.clone(); - let report = s.report.0.clone(); + let send_work = sched.send_work.clone(); // Spawn scheduler thread_mgr::spawn( @@ -111,21 +170,11 @@ mod tests { "gc_scheduler", true, move || { - s.run() + sched.run() }, ).unwrap(); - // Spawn worker 1 - thread_mgr::spawn( - ThreadKind::GcWorker, - None, - None, - "gc_worker_1", - true, - move || { - run_worker(enlist, report) - }, - ).unwrap(); + spawner.spawn_worker(); // Send a job let j = PrintJob { diff --git a/pageserver/src/jobs/worker.rs b/pageserver/src/jobs/worker.rs index e0f56fa931..7ec3aa45a2 100644 
--- a/pageserver/src/jobs/worker.rs +++ b/pageserver/src/jobs/worker.rs @@ -7,10 +7,16 @@ use std::any::Any; use std::panic::AssertUnwindSafe; use std::panic::catch_unwind; -pub trait Job: std::fmt::Debug + Send + 'static { +pub trait Job: std::fmt::Debug + Send + 'static + Clone { fn run(&self); } +// TODO make scheduler an async fn, leave rescheduling to chore_mgr +pub struct Work { + pub job: J, + pub when_done: Sender>, +} + #[derive(Debug)] pub struct Worker(pub Sender);