mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
WIP working on simple scheduler
This commit is contained in:
2
pageserver/src/jobs/mod.rs
Normal file
2
pageserver/src/jobs/mod.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
pub mod scheduler;
|
||||
pub mod worker;
|
||||
115
pageserver/src/jobs/scheduler.rs
Normal file
115
pageserver/src/jobs/scheduler.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::thread_mgr::shutdown_watcher;
|
||||
use tokio::sync::mpsc::{Receiver, Sender, channel};
|
||||
use tokio::time::sleep;
|
||||
|
||||
use super::worker::{Job, Worker, Report};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Sched<J: Job> {
|
||||
pub worker: (Sender<Worker<J>>, Receiver<Worker<J>>),
|
||||
pub work: (Sender<J>, Receiver<J>),
|
||||
pub report: (Sender<Report<J>>, Receiver<Report<J>>),
|
||||
}
|
||||
|
||||
impl<J: Job> Sched<J> {
|
||||
pub fn new() -> Self {
|
||||
Sched {
|
||||
worker: channel::<Worker<J>>(100),
|
||||
work: channel::<J>(100),
|
||||
report: channel::<Report<J>>(100),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(mut self) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
runtime.block_on(async {
|
||||
let mut workers = Vec::<Worker<J>>::new();
|
||||
loop {
|
||||
let shutdown_watcher = shutdown_watcher();
|
||||
tokio::select! {
|
||||
_ = shutdown_watcher => break,
|
||||
w = self.worker.1.recv() => workers.push(w.expect("oopsie")),
|
||||
j = self.work.1.recv() => {
|
||||
if let Some(w) = workers.pop() {
|
||||
w.0.send(j.unwrap()).await.unwrap();
|
||||
} else {
|
||||
// no workers available, spawn?
|
||||
}
|
||||
},
|
||||
r = self.report.1.recv() => {
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}};
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
struct PrintJob {
|
||||
to_print: String
|
||||
}
|
||||
|
||||
impl Job for PrintJob {
|
||||
fn run(&self) {
|
||||
println!("{}", self.to_print);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sched_1() {
|
||||
let s = Sched::<PrintJob>::new();
|
||||
|
||||
let send_work = s.work.0.clone();
|
||||
|
||||
// Used for workers
|
||||
let enlist = s.worker.0.clone();
|
||||
let report = s.report.0.clone();
|
||||
|
||||
// Spawn scheduler
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GcScheduler,
|
||||
None,
|
||||
None,
|
||||
"gc_scheduler",
|
||||
true,
|
||||
move || {
|
||||
s.run()
|
||||
},
|
||||
).unwrap();
|
||||
|
||||
// Spawn worker 1
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GcWorker,
|
||||
None,
|
||||
None,
|
||||
"gc_worker_1",
|
||||
true,
|
||||
move || {
|
||||
run_worker(enlist, report)
|
||||
},
|
||||
).unwrap();
|
||||
|
||||
// Send a job
|
||||
let j = PrintJob {
|
||||
to_print: "hello from job".to_string(),
|
||||
};
|
||||
send_work.send(j.clone()).await.unwrap();
|
||||
|
||||
sleep(Duration::from_millis(1000)).await;
|
||||
|
||||
thread_mgr::shutdown_threads(None, None, None);
|
||||
}
|
||||
}
|
||||
96
pageserver/src/jobs/worker.rs
Normal file
96
pageserver/src/jobs/worker.rs
Normal file
@@ -0,0 +1,96 @@
|
||||
//!
|
||||
//! Worker thread that can be used in a thread pool to process jobs.
|
||||
//!
|
||||
use crate::thread_mgr::shutdown_watcher;
|
||||
use tokio::sync::mpsc::{Sender, channel};
|
||||
|
||||
pub trait Job: std::fmt::Debug {
|
||||
fn run(&self);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Worker<J: Job>(pub Sender<J>);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Report<J: Job> {
|
||||
for_job: J,
|
||||
}
|
||||
|
||||
pub fn run_worker<J: Job>(enlist: Sender<Worker<J>>, report: Sender<Report<J>>) -> anyhow::Result<()> {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
runtime.block_on(async {
|
||||
loop {
|
||||
let (send_work, mut get_work) = channel::<J>(100);
|
||||
enlist.send(Worker(send_work)).await.unwrap();
|
||||
|
||||
let shutdown_watcher = shutdown_watcher();
|
||||
tokio::select! {
|
||||
_ = shutdown_watcher => break,
|
||||
j = get_work.recv() => {
|
||||
let job = j.unwrap();
|
||||
job.run();
|
||||
report.send(Report {
|
||||
for_job: job,
|
||||
}).await.unwrap();
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::thread_mgr::{self, ThreadKind};
|
||||
use super::*;
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
struct PrintJob {
|
||||
to_print: String
|
||||
}
|
||||
|
||||
impl Job for PrintJob {
|
||||
fn run(&self) {
|
||||
println!("{}", self.to_print);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn worker_1() {
|
||||
let mut worker = channel::<Worker<PrintJob>>(100);
|
||||
let mut result = channel::<Report<PrintJob>>(100);
|
||||
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GcWorker,
|
||||
None,
|
||||
None,
|
||||
"gc_worker_1",
|
||||
true,
|
||||
move || {
|
||||
run_worker(worker.0, result.0)
|
||||
},
|
||||
).unwrap();
|
||||
|
||||
let j = PrintJob {
|
||||
to_print: "hello from job".to_string(),
|
||||
};
|
||||
let w = worker.1.recv().await.unwrap();
|
||||
w.0.send(j.clone()).await.unwrap();
|
||||
|
||||
println!("waiting for result");
|
||||
let report = result.1.recv().await.unwrap();
|
||||
assert_eq!(j, report.for_job);
|
||||
println!("got result");
|
||||
|
||||
thread_mgr::shutdown_threads(None, None, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn worker_cancellation() {
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,7 @@ pub mod walingest;
|
||||
pub mod walreceiver;
|
||||
pub mod walrecord;
|
||||
pub mod walredo;
|
||||
pub mod jobs;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use tracing::info;
|
||||
|
||||
@@ -97,6 +97,12 @@ pub enum ThreadKind {
|
||||
// Thread that handles compaction of all timelines for a tenant.
|
||||
Compactor,
|
||||
|
||||
// Thread that schedules GC tasks
|
||||
GcScheduler,
|
||||
|
||||
// Thread that works on GC tasks
|
||||
GcWorker,
|
||||
|
||||
// Thread that handles GC of a tenant
|
||||
GarbageCollector,
|
||||
|
||||
|
||||
Reference in New Issue
Block a user