From d84c4907a95eabdd99648d21c25da2505d35ab38 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Mon, 23 May 2022 15:11:38 -0400 Subject: [PATCH] Add fair scheduler process --- pageserver/src/jobs/scheduler.rs | 177 ++++++++++++++++++++----------- pageserver/src/jobs/worker.rs | 11 +- 2 files changed, 117 insertions(+), 71 deletions(-) diff --git a/pageserver/src/jobs/scheduler.rs b/pageserver/src/jobs/scheduler.rs index 0eda7fd4b3..67fd8b6fbc 100644 --- a/pageserver/src/jobs/scheduler.rs +++ b/pageserver/src/jobs/scheduler.rs @@ -1,31 +1,14 @@ -use std::collections::VecDeque; -use std::time::Duration; +use std::collections::{HashMap, VecDeque}; +use std::ops::Add; +use tokio::time::{Duration, Instant}; use crate::thread_mgr::shutdown_watcher; +use tokio::sync::Mutex; use tokio::sync::mpsc::{Receiver, Sender, channel}; -use tokio::time::sleep; +use tokio::time::{sleep, sleep_until}; use super::worker::{Job, Worker, Report}; -#[derive(Debug)] -pub struct Sched { - /// Idle workers - workers: Vec>, - - /// Queued due jobs - jobs: VecDeque, - - /// Channel for registering due jobs - pub send_work: Sender, // TODO should job specify report destination? - recv_work: Receiver, - - /// Channel for enlisting idle workers - recv_worker: Receiver>, - - /// Channel where workers report results - recv_report: Receiver>, -} - pub struct Spawner { send_worker: Sender>, send_report: Sender>, @@ -50,26 +33,104 @@ impl Spawner { } } -impl Sched { - pub fn new() -> (Sched, Spawner) { +#[derive(Debug)] +pub enum Status { + Scheduled, + Running, + Stuck, +} + +pub struct Scheduler { + send_work: Sender, + recv_report: Receiver>, + + pub period: Duration, + pub chores: Mutex>, +} + +impl Scheduler { + pub async fn handle_report(&mut self, report: Report) { + let job = report.for_job; + match report.result { + Ok(()) => { + // Reschedule job to run again + if let Some(status) = self.chores.lock().await.get_mut(&job) { + *status = Status::Scheduled; + } + }, + Err(e) => { + // Remember error that got job stuck + println!("task panicked"); + if let Some(status) = self.chores.lock().await.get_mut(&job) { + *status = Status::Stuck; + } + } + } + } + + pub fn run(mut self) -> anyhow::Result<()> { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + runtime.block_on(async { + let mut next_iteration = Instant::now(); + loop { + tokio::select! { + _ = shutdown_watcher() => break, + _ = sleep_until(next_iteration) => { + next_iteration = Instant::now().add(self.period); + for (job, status) in self.chores.lock().await.iter_mut() { + if matches!(status, Status::Scheduled) { + self.send_work.send(job.clone()).await.unwrap(); + *status = Status::Running; + } + } + } + report = self.recv_report.recv() => { + let report = report.expect("report channel closed"); + self.handle_report(report).await; + } + } + } + }); + + Ok(()) + } +} + +#[derive(Debug)] +pub struct Board { + workers: Vec>, + jobs: VecDeque, + recv_work: Receiver, + recv_worker: Receiver>, +} + +impl Board { + pub fn new() -> (Board, Spawner, Scheduler) { let worker = channel::>(100); let work = channel::(100); let report = channel::>(100); - let sched = Sched { + let board = Board { workers: vec![], jobs: VecDeque::new(), recv_worker: worker.1, - send_work: work.0, recv_work: work.1, - recv_report: report.1, }; let spawner = Spawner { send_worker: worker.0, send_report: report.0, }; + let scheduler = Scheduler { + send_work: work.0, + recv_report: report.1, + period: Duration::from_millis(10), + chores: Mutex::new(HashMap::new()), + }; - (sched, spawner) + (board, spawner, scheduler) } @@ -92,24 +153,6 @@ impl Sched { } } - pub async fn handle_report(&mut self, report: Report) { - // Reschedule job to run again - let send_work = self.send_work.clone(); - let job = report.for_job; - match report.result { - Ok(()) => { - tokio::spawn(async move { - sleep(Duration::from_millis(10)).await; - send_work.send(job).await.unwrap(); - }); - }, - Err(e) => { - // TODO mark chore as blocked - println!("task panicked"); - } - } - } - pub fn run(mut self) -> anyhow::Result<()> { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -117,9 +160,8 @@ impl Sched { runtime.block_on(async { loop { - let shutdown_watcher = shutdown_watcher(); tokio::select! { - _ = shutdown_watcher => break, + _ = shutdown_watcher() => break, worker = self.recv_worker.recv() => { let worker = worker.expect("worker channel closed"); self.handle_worker(worker).await; @@ -128,10 +170,6 @@ impl Sched { let job = job.expect("job channel closed"); self.handle_job(job).await; }, - report = self.recv_report.recv() => { - let report = report.expect("report channel closed"); - self.handle_report(report).await; - } } } }); @@ -142,10 +180,10 @@ impl Sched { #[cfg(test)] mod tests { - use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}}; + use crate::thread_mgr::{self, ThreadKind}; use super::*; - #[derive(Debug, Clone, Eq, PartialEq)] + #[derive(Debug, Clone, Eq, PartialEq, Hash)] struct PrintJob { to_print: String } @@ -158,9 +196,25 @@ mod tests { #[tokio::test] async fn sched_1() { - let (sched, spawner) = Sched::::new(); + let (board, spawner, scheduler) = Board::::new(); - let send_work = sched.send_work.clone(); + // Schedule recurring job + let j = PrintJob { + to_print: "hello from job".to_string(), + }; + scheduler.chores.lock().await.insert(j, Status::Scheduled); + + // Spawn board + thread_mgr::spawn( + ThreadKind::GcScheduler, + None, + None, + "gc_scheduler", + true, + move || { + board.run() + }, + ).unwrap(); // Spawn scheduler thread_mgr::spawn( @@ -170,20 +224,17 @@ mod tests { "gc_scheduler", true, move || { - sched.run() + scheduler.run() }, ).unwrap(); + // Spawn worker spawner.spawn_worker(); - // Send a job - let j = PrintJob { - to_print: "hello from job".to_string(), - }; - send_work.send(j.clone()).await.unwrap(); - + // Wait for job to run a few times sleep(Duration::from_millis(100)).await; + // Cleanup thread_mgr::shutdown_threads(None, None, None); } } diff --git a/pageserver/src/jobs/worker.rs b/pageserver/src/jobs/worker.rs index 7ec3aa45a2..5064ebdcb4 100644 --- a/pageserver/src/jobs/worker.rs +++ b/pageserver/src/jobs/worker.rs @@ -4,19 +4,14 @@ use crate::thread_mgr::shutdown_watcher; use tokio::sync::mpsc::{Sender, channel}; use std::any::Any; +use std::hash::Hash; use std::panic::AssertUnwindSafe; use std::panic::catch_unwind; -pub trait Job: std::fmt::Debug + Send + 'static + Clone { +pub trait Job: std::fmt::Debug + Send + 'static + Clone + PartialEq + Eq + Hash { fn run(&self); } -// TODO make scheduler an async fn, leave rescheduling to chore_mgr -pub struct Work { - pub job: J, - pub when_done: Sender>, -} - #[derive(Debug)] pub struct Worker(pub Sender); @@ -66,7 +61,7 @@ mod tests { use crate::thread_mgr::{self, ThreadKind}; use super::*; - #[derive(Debug, Clone, Eq, PartialEq)] + #[derive(Debug, Clone, Eq, PartialEq, Hash)] struct PrintJob { to_print: String }