diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d5d69e26a7..62ae739a18 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -9,7 +9,18 @@ use clap::{App, Arg}; use daemonize::Daemonize; use fail::FailScenario; -use pageserver::{LOG_FILE_NAME, config::{defaults::*, PageServerConf}, http, page_cache, page_service, profiling, tenant_jobs::{compaction::{COMPACTION_POOL, CompactionJob}, gc::{GC_POOL, GcJob}, worker::Pool}, tenant_mgr, thread_mgr, thread_mgr::ThreadKind, timelines, virtual_file}; +use pageserver::{ + config::{defaults::*, PageServerConf}, + http, page_cache, page_service, profiling, + tenant_jobs::{ + compaction::{CompactionJob, COMPACTION_POOL}, + gc::{GcJob, GC_POOL}, + worker::Pool, + }, + tenant_mgr, thread_mgr, + thread_mgr::ThreadKind, + timelines, virtual_file, LOG_FILE_NAME, +}; use utils::{ auth::JwtAuth, http::endpoint, @@ -310,10 +321,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() None, &name.clone(), true, - move || { - GC_POOL.get().unwrap().worker_main(name.clone()) - }, - ).unwrap(); + move || GC_POOL.get().unwrap().worker_main(name.clone()), + ) + .unwrap(); } // Spawn compaction workers @@ -326,10 +336,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() None, &name.clone(), true, - move || { - GC_POOL.get().unwrap().worker_main(name.clone()) - }, - ).unwrap(); + move || GC_POOL.get().unwrap().worker_main(name.clone()), + ) + .unwrap(); } signals.handle(|signal| match signal { diff --git a/pageserver/src/jobs.rs b/pageserver/src/jobs.rs index e69de29bb2..8b13789179 100644 --- a/pageserver/src/jobs.rs +++ b/pageserver/src/jobs.rs @@ -0,0 +1 @@ + diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index bde54debd2..b2ef18761b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -12,6 +12,7 @@ pub mod reltag; pub mod repository; pub mod storage_sync; pub mod tenant_config; +pub mod tenant_jobs; pub mod tenant_mgr; pub mod thread_mgr; pub mod timelines; @@ -20,7 +21,6 @@ pub mod walingest; pub mod walreceiver; pub mod walrecord; pub mod walredo; -pub mod tenant_jobs; use lazy_static::lazy_static; use tracing::info; diff --git a/pageserver/src/tenant_jobs/compaction.rs b/pageserver/src/tenant_jobs/compaction.rs index c680d9fda6..a12f3c5768 100644 --- a/pageserver/src/tenant_jobs/compaction.rs +++ b/pageserver/src/tenant_jobs/compaction.rs @@ -1,14 +1,13 @@ use std::{ops::Add, time::Instant}; +use crate::repository::Repository; use once_cell::sync::OnceCell; use utils::zid::ZTenantId; -use crate::repository::Repository; use crate::tenant_mgr::{self, TenantState}; use super::worker::{Job, Pool}; - #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct CompactionJob { pub tenant: ZTenantId, @@ -19,7 +18,10 @@ impl Job for CompactionJob { fn run(&self) -> Result, Self::ErrorType> { // Don't reschedule job if tenant isn't active - if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) { + if !matches!( + tenant_mgr::get_tenant_state(self.tenant), + Some(TenantState::Active) + ) { return Ok(None); } diff --git a/pageserver/src/tenant_jobs/gc.rs b/pageserver/src/tenant_jobs/gc.rs index cbb47bf439..cd9ce5aa2a 100644 --- a/pageserver/src/tenant_jobs/gc.rs +++ b/pageserver/src/tenant_jobs/gc.rs @@ -1,8 +1,11 @@ use std::{ops::Add, time::Instant}; +use crate::{ + repository::Repository, + tenant_mgr::{self, TenantState}, +}; use once_cell::sync::OnceCell; use utils::zid::ZTenantId; -use crate::{repository::Repository, tenant_mgr::{self, TenantState}}; use super::worker::{Job, Pool}; @@ -16,7 +19,10 @@ impl Job for GcJob { fn run(&self) -> Result, Self::ErrorType> { // Don't reschedule job if tenant isn't active - if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) { + if !matches!( + tenant_mgr::get_tenant_state(self.tenant), + Some(TenantState::Active) + ) { return Ok(None); } diff --git a/pageserver/src/tenant_jobs/mod.rs b/pageserver/src/tenant_jobs/mod.rs index f71bdec4d5..f7beb525f9 100644 --- a/pageserver/src/tenant_jobs/mod.rs +++ b/pageserver/src/tenant_jobs/mod.rs @@ -1,3 +1,3 @@ -pub mod worker; pub mod compaction; pub mod gc; +pub mod worker; diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index 0a490b0394..cb710c9bea 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -1,6 +1,15 @@ -use std::{any::Any, collections::{BinaryHeap, HashMap}, fmt::Debug, hash::Hash, ops::Add, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}}; use lazy_static::lazy_static; use metrics::{register_gauge_vec, GaugeVec}; +use std::{ + any::Any, + collections::{BinaryHeap, HashMap}, + fmt::Debug, + hash::Hash, + ops::Add, + panic::{self, AssertUnwindSafe}, + sync::{Condvar, Mutex}, + time::{Duration, Instant}, +}; lazy_static! { static ref POOL_UTILIZATION_GAUGE: GaugeVec = register_gauge_vec!( @@ -23,7 +32,10 @@ enum JobError { } #[derive(Debug)] -enum JobStatus where J::ErrorType: Debug { +enum JobStatus +where + J::ErrorType: Debug, +{ Ready { scheduled_for: Instant, }, @@ -36,33 +48,48 @@ enum JobStatus where J::ErrorType: Debug { // TODO make this generic event, put in different module #[derive(Debug)] -struct Deadline where J::ErrorType: Debug { +struct Deadline +where + J::ErrorType: Debug, +{ start_by: Instant, job: J, } -impl PartialOrd for Deadline where J::ErrorType: Debug { +impl PartialOrd for Deadline +where + J::ErrorType: Debug, +{ fn partial_cmp(&self, other: &Self) -> Option { other.start_by.partial_cmp(&self.start_by) } } -impl Ord for Deadline where J::ErrorType: Debug { +impl Ord for Deadline +where + J::ErrorType: Debug, +{ fn cmp(&self, other: &Self) -> std::cmp::Ordering { other.start_by.cmp(&self.start_by) } } -impl PartialEq for Deadline where J::ErrorType: Debug { +impl PartialEq for Deadline +where + J::ErrorType: Debug, +{ fn eq(&self, other: &Self) -> bool { self.start_by == other.start_by } } -impl Eq for Deadline where J::ErrorType: Debug { } +impl Eq for Deadline where J::ErrorType: Debug {} #[derive(Debug)] -struct JobStatusTable where J::ErrorType: Debug { +struct JobStatusTable +where + J::ErrorType: Debug, +{ /// Complete summary of current state status: HashMap>, @@ -70,7 +97,10 @@ struct JobStatusTable where J::ErrorType: Debug { queue: BinaryHeap>, } -impl JobStatusTable where J::ErrorType: Debug { +impl JobStatusTable +where + J::ErrorType: Debug, +{ fn pop_due(&mut self) -> Option> { if let Some(deadline) = self.queue.peek() { if Instant::now() > deadline.start_by { @@ -87,12 +117,18 @@ impl JobStatusTable where J::ErrorType: Debug { } #[derive(Debug)] -pub struct Pool where J::ErrorType: Debug { +pub struct Pool +where + J::ErrorType: Debug, +{ job_table: Mutex>, - condvar: Condvar, // Notified when idle worker should wake up + condvar: Condvar, // Notified when idle worker should wake up } -impl Pool where J::ErrorType: Debug { +impl Pool +where + J::ErrorType: Debug, +{ pub fn new() -> Self { Pool { job_table: Mutex::new(JobStatusTable:: { @@ -107,43 +143,51 @@ impl Pool where J::ErrorType: Debug { pub fn worker_main(&self, worker_name: String) -> anyhow::Result<()> { let mut job_table = self.job_table.lock().unwrap(); loop { - if let Some(Deadline {job, ..}) = job_table.pop_due() { - job_table.set_status(&job, JobStatus::Running { - worker_name: worker_name.clone(), - started_at: Instant::now(), - }); + if let Some(Deadline { job, .. }) = job_table.pop_due() { + job_table.set_status( + &job, + JobStatus::Running { + worker_name: worker_name.clone(), + started_at: Instant::now(), + }, + ); // Run job without holding lock drop(job_table); - POOL_UTILIZATION_GAUGE.with_label_values(&["todo_put_pool_name_here"]).inc(); - let result = panic::catch_unwind(AssertUnwindSafe(|| { - job.run() - })); - POOL_UTILIZATION_GAUGE.with_label_values(&["todo_put_pool_name_here"]).dec(); + POOL_UTILIZATION_GAUGE + .with_label_values(&["todo_put_pool_name_here"]) + .inc(); + let result = panic::catch_unwind(AssertUnwindSafe(|| job.run())); + POOL_UTILIZATION_GAUGE + .with_label_values(&["todo_put_pool_name_here"]) + .dec(); job_table = self.job_table.lock().unwrap(); // Update job status match result { Ok(Ok(Some(reschedule_for))) => { - job_table.set_status(&job, JobStatus::Ready { - scheduled_for: reschedule_for, - }); + job_table.set_status( + &job, + JobStatus::Ready { + scheduled_for: reschedule_for, + }, + ); job_table.queue.push(Deadline { job: job.clone(), start_by: reschedule_for, }); - }, + } Ok(Ok(None)) => { job_table.status.remove(&job); - }, + } Ok(Err(e)) => { job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e))); println!("Job errored, thread is ok."); - }, + } Err(e) => { job_table.set_status(&job, JobStatus::Stuck(JobError::Panic(e))); println!("Job panicked, thread is ok."); - }, + } } } else { match job_table.queue.peek() { @@ -162,9 +206,9 @@ impl Pool where J::ErrorType: Debug { pub fn queue_job(&self, job: J) { let mut job_table = self.job_table.lock().unwrap(); let scheduled_for = Instant::now(); - job_table.status.insert(job.clone(), JobStatus::Ready { - scheduled_for, - }); + job_table + .status + .insert(job.clone(), JobStatus::Ready { scheduled_for }); job_table.queue.push(Deadline { job: job.clone(), start_by: scheduled_for, @@ -180,12 +224,12 @@ mod tests { use once_cell::sync::OnceCell; - use crate::thread_mgr::{self, ThreadKind}; use super::*; + use crate::thread_mgr::{self, ThreadKind}; #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] struct PrintJob { - to_print: String + to_print: String, } impl Job for PrintJob { @@ -207,26 +251,24 @@ mod tests { TEST_POOL.set(Pool::::new()).unwrap(); thread_mgr::spawn( - ThreadKind::GarbageCollector, // change this + ThreadKind::GarbageCollector, // change this None, None, "test_worker_1", true, - move || { - TEST_POOL.get().unwrap().worker_main("test_worker_1".into()) - }, - ).unwrap(); + move || TEST_POOL.get().unwrap().worker_main("test_worker_1".into()), + ) + .unwrap(); thread_mgr::spawn( - ThreadKind::GarbageCollector, // change this + ThreadKind::GarbageCollector, // change this None, None, "test_worker_2", true, - move || { - TEST_POOL.get().unwrap().worker_main("test_worker_2".into()) - }, - ).unwrap(); + move || TEST_POOL.get().unwrap().worker_main("test_worker_2".into()), + ) + .unwrap(); TEST_POOL.get().unwrap().queue_job(PrintJob { to_print: "hello from job".to_string(), diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 6387676e63..9cac82e3a7 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -8,8 +8,8 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate}; use crate::storage_sync::index::RemoteIndex; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::tenant_config::TenantConfOpt; -use crate::tenant_jobs::compaction::{COMPACTION_POOL, CompactionJob}; -use crate::tenant_jobs::gc::{GC_POOL, GcJob}; +use crate::tenant_jobs::compaction::{CompactionJob, COMPACTION_POOL}; +use crate::tenant_jobs::gc::{GcJob, GC_POOL}; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::timelines; @@ -245,13 +245,15 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> { // Important to activate before scheduling jobs tenant.state = TenantState::Active; - GC_POOL.get().unwrap().queue_job(GcJob { - tenant: tenant_id, - }); + GC_POOL + .get() + .unwrap() + .queue_job(GcJob { tenant: tenant_id }); - COMPACTION_POOL.get().unwrap().queue_job(CompactionJob { - tenant: tenant_id, - }); + COMPACTION_POOL + .get() + .unwrap() + .queue_job(CompactionJob { tenant: tenant_id }); } TenantState::Stopping => {