From f3c71899be0c3bebba336fd64e3b4532d15ee25e Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Mon, 23 May 2022 22:32:14 -0400 Subject: [PATCH] Add recurring job --- pageserver/src/jobs.rs | 160 ----------------------- pageserver/src/lib.rs | 2 +- pageserver/src/tenant_jobs/mod.rs | 2 + pageserver/src/tenant_jobs/scheduler.rs | 14 +++ pageserver/src/tenant_jobs/worker.rs | 161 ++++++++++++++++++++++++ 5 files changed, 178 insertions(+), 161 deletions(-) create mode 100644 pageserver/src/tenant_jobs/mod.rs create mode 100644 pageserver/src/tenant_jobs/scheduler.rs create mode 100644 pageserver/src/tenant_jobs/worker.rs diff --git a/pageserver/src/jobs.rs b/pageserver/src/jobs.rs index 94f84b63d1..e69de29bb2 100644 --- a/pageserver/src/jobs.rs +++ b/pageserver/src/jobs.rs @@ -1,160 +0,0 @@ -use std::{collections::VecDeque, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}}; -use tracing::{debug, error, info, warn}; - -// TODO maybe make jobs tenant-specific? Makes monitorin easier. 
- -pub trait Job: std::fmt::Debug + Send + 'static { - fn run(&self); -} - -#[derive(Debug)] -enum WorkerCommand { - Shutdown, - DoJob(J), -} - -#[derive(Debug)] -struct Pool { - job_queue: Mutex>>, - condvar: Condvar, // Notified when queue becomes nonempty -} - -impl Pool { - fn new() -> Self { - Pool { - job_queue: Mutex::new(VecDeque::>::new()), - condvar: Condvar::new(), - } - } - - fn worker_main(&self) -> anyhow::Result<()> { - let mut q = self.job_queue.lock().unwrap(); - loop { - if let Some(command) = q.pop_front() { - drop(q); - match command { - WorkerCommand::Shutdown => return Ok(()), - WorkerCommand::DoJob(job) => { - let result = panic::catch_unwind(AssertUnwindSafe(|| { - job.run(); - })); - if let Err(e) = result { - // TODO mark job as broken - println!("Job panicked, thread is ok."); - } - }, - } - q = self.job_queue.lock().unwrap(); - } else { - q = self.condvar.wait(q).unwrap(); - } - } - } - - fn queue_job(&self, job: J) { - // Add the job to the back of the queue - let mut q = self.job_queue.lock().unwrap(); - q.push_back(WorkerCommand::DoJob(job)); - - // If the queue was empty, wake up the next worker thread to pick up - // the job. - if q.len() == 1 { - self.condvar.notify_one(); - } - } - - fn shutdown_one(&self) { - // Add shutdown command to the front of the queue - let mut q = self.job_queue.lock().unwrap(); - q.push_front(WorkerCommand::Shutdown); - - // If the queue was empty, wake up the next worker thread. - if q.len() == 1 { - self.condvar.notify_one(); - } - - // TODO wait? 
- } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use once_cell::sync::OnceCell; - - use crate::thread_mgr::{self, ThreadKind}; - use super::*; - - #[derive(Debug, Clone, Eq, PartialEq)] - struct PrintJob { - to_print: String - } - - impl Job for PrintJob { - fn run(&self) { - if self.to_print == "pls panic" { - panic!("AAA"); - } - println!("{}", self.to_print); - } - } - - static TEST_POOL: OnceCell> = OnceCell::new(); - - #[tokio::test] - async fn pool_1() { - TEST_POOL.set(Pool::::new()).unwrap(); - - thread_mgr::spawn( - ThreadKind::GarbageCollector, // change this - None, - None, - "test_worker_1", - true, - move || { - TEST_POOL.get().unwrap().worker_main() - }, - ).unwrap(); - - let j = PrintJob { - to_print: "hello from job".to_string(), - }; - TEST_POOL.get().unwrap().queue_job(j.clone()); - TEST_POOL.get().unwrap().queue_job(j.clone()); - TEST_POOL.get().unwrap().queue_job(j.clone()); - - tokio::time::sleep(Duration::from_millis(100)).await; - TEST_POOL.get().unwrap().shutdown_one(); - } - - #[tokio::test] - async fn pool_panic() { - TEST_POOL.set(Pool::::new()).unwrap(); - - thread_mgr::spawn( - ThreadKind::GarbageCollector, // change this - None, - None, - "test_worker_1", - true, - move || { - TEST_POOL.get().unwrap().worker_main() - }, - ).unwrap(); - - let j = PrintJob { - to_print: "hello from job".to_string(), - }; - let panic = PrintJob { - to_print: "pls panic".to_string(), - }; - - TEST_POOL.get().unwrap().queue_job(panic.clone()); - TEST_POOL.get().unwrap().queue_job(j.clone()); - TEST_POOL.get().unwrap().queue_job(j.clone()); - - tokio::time::sleep(Duration::from_millis(100)).await; - TEST_POOL.get().unwrap().shutdown_one(); - } -} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 8bc709cbc6..afb1f3801a 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -21,7 +21,7 @@ pub mod walingest; pub mod walreceiver; pub mod walrecord; pub mod walredo; -pub mod jobs; +pub mod tenant_jobs; use 
lazy_static::lazy_static; use tracing::info; diff --git a/pageserver/src/tenant_jobs/mod.rs b/pageserver/src/tenant_jobs/mod.rs new file mode 100644 index 0000000000..f414520992 --- /dev/null +++ b/pageserver/src/tenant_jobs/mod.rs @@ -0,0 +1,2 @@ +pub mod worker; +pub mod scheduler; diff --git a/pageserver/src/tenant_jobs/scheduler.rs b/pageserver/src/tenant_jobs/scheduler.rs new file mode 100644 index 0000000000..64bac43b34 --- /dev/null +++ b/pageserver/src/tenant_jobs/scheduler.rs @@ -0,0 +1,14 @@ +use std::time::Duration; + +use super::worker::Job; + + +enum JobStatus { + Active, + Stuck, +} + +struct Scheduler { + interval: Duration, + jobs: Vec<(J, JobStatus)> +} diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs new file mode 100644 index 0000000000..f0b36a3975 --- /dev/null +++ b/pageserver/src/tenant_jobs/worker.rs @@ -0,0 +1,161 @@ +use std::{collections::VecDeque, ops::Sub, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}}; + +// TODO maybe make jobs tenant-specific? Makes monitoring easier. 
+ +pub trait Job: std::fmt::Debug + Send + Clone + 'static { + fn run(&self); +} + +#[derive(Debug)] +struct JobStatusTable { + jobs: Vec, + next: usize, + begin: Instant, +} + +#[derive(Debug)] +struct Pool { + job_table: Mutex>, + condvar: Condvar, // Notified when queue becomes nonempty + period: Duration, +} + +impl Pool { + fn new() -> Self { + Pool { + job_table: Mutex::new(JobStatusTable:: { + jobs: vec![], + next: 0, + begin: Instant::now(), + }), + condvar: Condvar::new(), + period: Duration::from_millis(10), + } + } + + fn worker_main(&self) -> anyhow::Result<()> { + let mut job_table = self.job_table.lock().unwrap(); + loop { + if job_table.next < job_table.jobs.len() { + let job = job_table.jobs[job_table.next].clone(); + job_table.next += 1; + + // Run job without holding lock + drop(job_table); + let result = panic::catch_unwind(AssertUnwindSafe(|| { + job.run(); + })); + job_table = self.job_table.lock().unwrap(); + + match result { + Ok(()) => {}, + Err(e) => { + // TODO mark job as broken + println!("Job panicked, thread is ok."); + }, + } + } else { + let since_last_cycle = Instant::now().duration_since(job_table.begin); + let until_next_cycle = self.period.saturating_sub(since_last_cycle); + if until_next_cycle.is_zero() { + job_table.next = 0; + job_table.begin = Instant::now(); + } else { + job_table = self.condvar.wait_timeout(job_table, until_next_cycle).unwrap().0; + } + } + } + } + + fn queue_job(&self, job: J) { + // Add the job to the back of the queue + let mut job_table = self.job_table.lock().unwrap(); + job_table.jobs.push(job); + + // If the queue was empty, wake up the next worker thread to pick up + // the job. 
+ if job_table.next == job_table.jobs.len() - 1 { + self.condvar.notify_one(); + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use once_cell::sync::OnceCell; + + use crate::thread_mgr::{self, ThreadKind}; + use super::*; + + #[derive(Debug, Clone, Eq, PartialEq)] + struct PrintJob { + to_print: String + } + + impl Job for PrintJob { + fn run(&self) { + if self.to_print == "pls panic" { + panic!("AAA"); + } + println!("{}", self.to_print); + } + } + + static TEST_POOL: OnceCell> = OnceCell::new(); + + #[tokio::test] + async fn pool_1() { + TEST_POOL.set(Pool::::new()).unwrap(); + + thread_mgr::spawn( + ThreadKind::GarbageCollector, // change this + None, + None, + "test_worker_1", + true, + move || { + TEST_POOL.get().unwrap().worker_main() + }, + ).unwrap(); + + let j = PrintJob { + to_print: "hello from job".to_string(), + }; + TEST_POOL.get().unwrap().queue_job(j.clone()); + TEST_POOL.get().unwrap().queue_job(j.clone()); + TEST_POOL.get().unwrap().queue_job(j.clone()); + + tokio::time::sleep(Duration::from_millis(100)).await; + } + + #[tokio::test] + async fn pool_panic() { + TEST_POOL.set(Pool::::new()).unwrap(); + + thread_mgr::spawn( + ThreadKind::GarbageCollector, // change this + None, + None, + "test_worker_1", + true, + move || { + TEST_POOL.get().unwrap().worker_main() + }, + ).unwrap(); + + let j = PrintJob { + to_print: "hello from job".to_string(), + }; + let panic = PrintJob { + to_print: "pls panic".to_string(), + }; + + TEST_POOL.get().unwrap().queue_job(panic.clone()); + TEST_POOL.get().unwrap().queue_job(j.clone()); + TEST_POOL.get().unwrap().queue_job(j.clone()); + + tokio::time::sleep(Duration::from_millis(100)).await; + } +}