diff --git a/pageserver/src/jobs/mod.rs b/pageserver/src/jobs/mod.rs new file mode 100644 index 0000000000..be2b292734 --- /dev/null +++ b/pageserver/src/jobs/mod.rs @@ -0,0 +1,2 @@ +pub mod scheduler; +pub mod worker; diff --git a/pageserver/src/jobs/scheduler.rs b/pageserver/src/jobs/scheduler.rs new file mode 100644 index 0000000000..8e2ec7d8f5 --- /dev/null +++ b/pageserver/src/jobs/scheduler.rs @@ -0,0 +1,115 @@ +use std::time::Duration; + +use crate::thread_mgr::shutdown_watcher; +use tokio::sync::mpsc::{Receiver, Sender, channel}; +use tokio::time::sleep; + +use super::worker::{Job, Worker, Report}; + +#[derive(Debug)] +pub struct Sched { + pub worker: (Sender>, Receiver>), + pub work: (Sender, Receiver), + pub report: (Sender>, Receiver>), +} + +impl Sched { + pub fn new() -> Self { + Sched { + worker: channel::>(100), + work: channel::(100), + report: channel::>(100), + } + } + + pub fn run(mut self) -> anyhow::Result<()> { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + runtime.block_on(async { + let mut workers = Vec::>::new(); + loop { + let shutdown_watcher = shutdown_watcher(); + tokio::select! { + _ = shutdown_watcher => break, + w = self.worker.1.recv() => workers.push(w.expect("oopsie")), + j = self.work.1.recv() => { + if let Some(w) = workers.pop() { + w.0.send(j.unwrap()).await.unwrap(); + } else { + // no workers available, spawn? + } + }, + r = self.report.1.recv() => { + // TODO + } + } + } + }); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}}; + use super::*; + + #[derive(Debug, Clone, Eq, PartialEq)] + struct PrintJob { + to_print: String + } + + impl Job for PrintJob { + fn run(&self) { + println!("{}", self.to_print); + } + } + + #[tokio::test] + async fn sched_1() { + let s = Sched::::new(); + + let send_work = s.work.0.clone(); + + // Used for workers + let enlist = s.worker.0.clone(); + let report = s.report.0.clone(); + + // Spawn scheduler + thread_mgr::spawn( + ThreadKind::GcScheduler, + None, + None, + "gc_scheduler", + true, + move || { + s.run() + }, + ).unwrap(); + + // Spawn worker 1 + thread_mgr::spawn( + ThreadKind::GcWorker, + None, + None, + "gc_worker_1", + true, + move || { + run_worker(enlist, report) + }, + ).unwrap(); + + // Send a job + let j = PrintJob { + to_print: "hello from job".to_string(), + }; + send_work.send(j.clone()).await.unwrap(); + + sleep(Duration::from_millis(1000)).await; + + thread_mgr::shutdown_threads(None, None, None); + } +} diff --git a/pageserver/src/jobs/worker.rs b/pageserver/src/jobs/worker.rs new file mode 100644 index 0000000000..0dfdf9a1d9 --- /dev/null +++ b/pageserver/src/jobs/worker.rs @@ -0,0 +1,96 @@ +//! +//! Worker thread that can be used in a thread pool to process jobs. +//! +use crate::thread_mgr::shutdown_watcher; +use tokio::sync::mpsc::{Sender, channel}; + +pub trait Job: std::fmt::Debug { + fn run(&self); +} + +#[derive(Debug)] +pub struct Worker(pub Sender); + +#[derive(Debug)] +pub struct Report { + for_job: J, +} + +pub fn run_worker(enlist: Sender>, report: Sender>) -> anyhow::Result<()> { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + runtime.block_on(async { + loop { + let (send_work, mut get_work) = channel::(100); + enlist.send(Worker(send_work)).await.unwrap(); + + let shutdown_watcher = shutdown_watcher(); + tokio::select! { + _ = shutdown_watcher => break, + j = get_work.recv() => { + let job = j.unwrap(); + job.run(); + report.send(Report { + for_job: job, + }).await.unwrap(); + } + }; + } + }); + + Ok(()) +} + + +#[cfg(test)] +mod tests { + use crate::thread_mgr::{self, ThreadKind}; + use super::*; + + #[derive(Debug, Clone, Eq, PartialEq)] + struct PrintJob { + to_print: String + } + + impl Job for PrintJob { + fn run(&self) { + println!("{}", self.to_print); + } + } + + #[tokio::test] + async fn worker_1() { + let mut worker = channel::>(100); + let mut result = channel::>(100); + + thread_mgr::spawn( + ThreadKind::GcWorker, + None, + None, + "gc_worker_1", + true, + move || { + run_worker(worker.0, result.0) + }, + ).unwrap(); + + let j = PrintJob { + to_print: "hello from job".to_string(), + }; + let w = worker.1.recv().await.unwrap(); + w.0.send(j.clone()).await.unwrap(); + + println!("waiting for result"); + let report = result.1.recv().await.unwrap(); + assert_eq!(j, report.for_job); + println!("got result"); + + thread_mgr::shutdown_threads(None, None, None); + } + + #[test] + fn worker_cancellation() { + } +} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 83985069ec..8bc709cbc6 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -21,6 +21,7 @@ pub mod walingest; pub mod walreceiver; pub mod walrecord; pub mod walredo; +pub mod jobs; use lazy_static::lazy_static; use tracing::info; diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index b908f220ee..2014252b3c 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -97,6 +97,12 @@ pub enum ThreadKind { // Thread that handles compaction of all timelines for a tenant. Compactor, + // Thread that schedules GC tasks + GcScheduler, + + // Thread that works on GC tasks + GcWorker, + // Thread that handles GC of a tenant GarbageCollector,