Compare commits

...

24 Commits

Author           SHA1        Message                                    Date
Bojan Serafimov  8c78a5e9a6  Clippy                                     2022-05-25 11:31:22 -04:00
Bojan Serafimov  a8f0124af7  Make condvar shutdown-aware                2022-05-25 11:28:50 -04:00
Bojan Serafimov  88154f9871  clipppy                                    2022-05-24 23:50:21 -04:00
Bojan Serafimov  14b8dcab5b  clippy                                     2022-05-24 23:24:03 -04:00
Bojan Serafimov  aee8615b30  Merge branch 'main' into ps-thread-pool-2  2022-05-24 22:00:59 -04:00
Bojan Serafimov  079871f8e3  Fmt                                        2022-05-24 21:54:55 -04:00
Bojan Serafimov  f9c1fc8657  Add metric                                 2022-05-24 21:48:45 -04:00
Bojan Serafimov  32b4e0fda2  Fix                                        2022-05-24 15:14:33 -04:00
Bojan Serafimov  6b4e17ef89  Start workers                              2022-05-24 15:03:18 -04:00
Bojan Serafimov  07e6dd809d  Start jobs when tenant activates           2022-05-24 14:25:15 -04:00
Bojan Serafimov  b028f12b06  Fix                                        2022-05-24 14:14:29 -04:00
Bojan Serafimov  3975417cae  Add metadata                               2022-05-24 14:12:53 -04:00
Bojan Serafimov  98d8d65c83  Cleanup                                    2022-05-24 14:03:07 -04:00
Bojan Serafimov  1e9b628c25  Add period                                 2022-05-24 12:49:30 -04:00
Bojan Serafimov  31dc9e6abd  Use queue                                  2022-05-24 10:50:56 -04:00
Bojan Serafimov  0d5fee3ab7  Add gc pool                                2022-05-23 23:42:25 -04:00
Bojan Serafimov  dc47f9ccf1  Simplify                                   2022-05-23 23:18:25 -04:00
Bojan Serafimov  fefbff8981  Add job status                             2022-05-23 22:44:48 -04:00
Bojan Serafimov  eefc7aa792  Add todo                                   2022-05-23 22:33:53 -04:00
Bojan Serafimov  f3c71899be  Add recurring job                          2022-05-23 22:32:14 -04:00
Bojan Serafimov  de55b2f139  Handle panic                               2022-05-22 16:12:00 -04:00
Bojan Serafimov  5a19ac02c9  Add todo                                   2022-05-22 15:53:52 -04:00
Bojan Serafimov  13b374a2df  Add shutdown command                       2022-05-22 12:34:53 -04:00
Bojan Serafimov  b9814222f9  WIP                                        2022-05-12 12:28:46 -04:00
9 changed files with 445 additions and 114 deletions

View File

@@ -11,7 +11,13 @@ use daemonize::Daemonize;
use fail::FailScenario;
use pageserver::{
config::{defaults::*, PageServerConf},
http, page_cache, page_service, profiling, tenant_mgr, thread_mgr,
http, page_cache, page_service, profiling,
tenant_jobs::{
compaction::{CompactionJob, COMPACTION_POOL},
gc::{GcJob, GC_POOL},
worker::Pool,
},
tenant_mgr, thread_mgr,
thread_mgr::ThreadKind,
timelines, virtual_file, LOG_FILE_NAME,
};
@@ -302,6 +308,36 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
move || page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type),
)?;
// Spawn GC workers
GC_POOL.set(Pool::<GcJob>::new()).unwrap();
for i in 0..3 {
let name = format!("gc_worker_{}", i);
thread_mgr::spawn(
ThreadKind::GarbageCollectionWorker,
None,
None,
&name.clone(),
true,
move || GC_POOL.get().unwrap().worker_main(name),
)
.unwrap();
}
// Spawn compaction workers
COMPACTION_POOL.set(Pool::<CompactionJob>::new()).unwrap();
for i in 0..3 {
let name = format!("compaction_worker_{}", i);
thread_mgr::spawn(
ThreadKind::CompactionWorker,
None,
None,
&name.clone(),
true,
move || COMPACTION_POOL.get().unwrap().worker_main(name),
)
.unwrap();
}
signals.handle(|signal| match signal {
Signal::Quit => {
info!(

View File

@@ -12,8 +12,8 @@ pub mod reltag;
pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_jobs;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;

View File

@@ -0,0 +1,35 @@
use std::{ops::Add, time::Instant};
use crate::repository::Repository;
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use crate::tenant_mgr::{self, TenantState};
use super::worker::{Job, Pool};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompactionJob {
pub tenant: ZTenantId,
}
impl Job for CompactionJob {
type ErrorType = anyhow::Error;
fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
// Don't reschedule job if tenant isn't active
if !matches!(
tenant_mgr::get_tenant_state(self.tenant),
Some(TenantState::Active)
) {
return Ok(None);
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
repo.compaction_iteration()?;
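// Returning Some(t) reschedules this job to run again at time t.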
Ok(Some(Instant::now().add(repo.get_compaction_period())))
}
}
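// Global pool handle, initialized once in start_pageserver() before the workers are spawned.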
pub static COMPACTION_POOL: OnceCell<Pool<CompactionJob>> = OnceCell::new();

View File

@@ -0,0 +1,39 @@
use std::{ops::Add, time::Instant};
use crate::{
repository::Repository,
tenant_mgr::{self, TenantState},
};
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use super::worker::{Job, Pool};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct GcJob {
pub tenant: ZTenantId,
}
impl Job for GcJob {
type ErrorType = anyhow::Error;
fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
// Don't reschedule job if tenant isn't active
if !matches!(
tenant_mgr::get_tenant_state(self.tenant),
Some(TenantState::Active)
) {
return Ok(None);
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
let gc_horizon = repo.get_gc_horizon();
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
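// Reschedule one GC period from now; returning Ok(None) would drop the job instead.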
Ok(Some(Instant::now().add(repo.get_gc_period())))
}
}
pub static GC_POOL: OnceCell<Pool<GcJob>> = OnceCell::new();

View File

@@ -0,0 +1,3 @@
pub mod compaction;
pub mod gc;
pub mod worker;

View File

@@ -0,0 +1,294 @@
use lazy_static::lazy_static;
use metrics::{register_gauge_vec, GaugeVec};
use std::{
any::Any,
collections::{BinaryHeap, HashMap},
fmt::Debug,
hash::Hash,
panic::{self, AssertUnwindSafe},
sync::{Arc, Condvar, Mutex},
time::Instant,
};
use crate::thread_mgr::{get_shutdown_aware_condvar, is_shutdown_requested};
lazy_static! {
static ref POOL_UTILIZATION_GAUGE: GaugeVec = register_gauge_vec!(
"pageserver_pool_utilization",
"Number of bysy workers",
&["pool_name"]
)
.expect("Failed to register safekeeper_commit_lsn gauge vec");
}
pub trait Job: std::fmt::Debug + Send + Clone + PartialOrd + Ord + Hash + 'static {
type ErrorType;
fn run(&self) -> Result<Option<Instant>, Self::ErrorType>;
}
#[derive(Debug)]
enum JobError<J: Job> {
Panic(Box<dyn Any + Send>),
Error(J::ErrorType),
}
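/// Tracks where a job is in its lifecycle. A Stuck job failed or panicked and is not rescheduled.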
#[derive(Debug)]
enum JobStatus<J: Job>
where
J::ErrorType: Debug,
{
Ready {
#[allow(dead_code)]
scheduled_for: Instant,
},
Running {
#[allow(dead_code)]
worker_name: String,
#[allow(dead_code)]
started_at: Instant,
},
Stuck(JobError<J>),
}
// TODO make this generic event, put in different module
#[derive(Debug)]
struct Deadline<J: Job>
where
J::ErrorType: Debug,
{
start_by: Instant,
job: J,
}
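// Comparisons are deliberately reversed: BinaryHeap is a max-heap, so comparing
// other against self turns it into a min-heap that pops the earliest start_by first.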
impl<J: Job> PartialOrd for Deadline<J>
where
J::ErrorType: Debug,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
other.start_by.partial_cmp(&self.start_by)
}
}
impl<J: Job> Ord for Deadline<J>
where
J::ErrorType: Debug,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.start_by.cmp(&self.start_by)
}
}
impl<J: Job> PartialEq for Deadline<J>
where
J::ErrorType: Debug,
{
fn eq(&self, other: &Self) -> bool {
self.start_by == other.start_by
}
}
impl<J: Job> Eq for Deadline<J> where J::ErrorType: Debug {}
#[derive(Debug, Default)]
struct JobStatusTable<J: Job>
where
J::ErrorType: Debug,
{
/// Complete summary of current state
status: HashMap<J, JobStatus<J>>,
/// Index over status for finding the next scheduled job
queue: BinaryHeap<Deadline<J>>,
}
impl<J: Job> JobStatusTable<J>
where
J::ErrorType: Debug,
{
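/// Pop the next job whose deadline has already passed, if any.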
fn pop_due(&mut self) -> Option<Deadline<J>> {
if let Some(deadline) = self.queue.peek() {
if Instant::now() > deadline.start_by {
return self.queue.pop();
}
}
None
}
fn set_status(&mut self, job: &J, status: JobStatus<J>) {
let s = self.status.get_mut(job).expect("status not found");
*s = status;
}
}
#[derive(Debug)]
pub struct Pool<J: Job>
where
J::ErrorType: Debug,
{
job_table: Mutex<JobStatusTable<J>>,
condvar: Arc<Condvar>, // Notified when idle worker should wake up
}
impl<J: Job> Default for Pool<J>
where
J::ErrorType: Debug,
{
fn default() -> Self {
Self::new()
}
}
impl<J: Job> Pool<J>
where
J::ErrorType: Debug,
{
pub fn new() -> Self {
Pool {
job_table: Mutex::new(JobStatusTable::<J> {
status: HashMap::<J, JobStatus<J>>::new(),
queue: BinaryHeap::<Deadline<J>>::new(),
}),
condvar: get_shutdown_aware_condvar(),
}
}
pub fn worker_main(&self, worker_name: String) -> anyhow::Result<()> {
let mut job_table = self.job_table.lock().unwrap();
while !is_shutdown_requested() {
if let Some(Deadline { job, .. }) = job_table.pop_due() {
job_table.set_status(
&job,
JobStatus::Running {
worker_name: worker_name.clone(),
started_at: Instant::now(),
},
);
// Run job without holding lock
drop(job_table);
POOL_UTILIZATION_GAUGE
.with_label_values(&["todo_put_pool_name_here"])
.inc();
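// catch_unwind isolates a panicking job so it can't take down the worker thread.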
let result = panic::catch_unwind(AssertUnwindSafe(|| job.run()));
POOL_UTILIZATION_GAUGE
.with_label_values(&["todo_put_pool_name_here"])
.dec();
job_table = self.job_table.lock().unwrap();
// Update job status
match result {
Ok(Ok(Some(reschedule_for))) => {
job_table.set_status(
&job,
JobStatus::Ready {
scheduled_for: reschedule_for,
},
);
job_table.queue.push(Deadline {
job: job.clone(),
start_by: reschedule_for,
});
}
Ok(Ok(None)) => {
job_table.status.remove(&job);
}
Ok(Err(e)) => {
job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e)));
println!("Job errored, thread is ok.");
}
Err(e) => {
job_table.set_status(&job, JobStatus::Stuck(JobError::Panic(e)));
println!("Job panicked, thread is ok.");
}
}
} else {
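// Nothing is due yet: sleep until the next deadline, or indefinitely if the
// queue is empty. The condvar is notified by queue_job() and on shutdown.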
match job_table.queue.peek() {
Some(deadline) => {
let wait_time = deadline.start_by.duration_since(Instant::now());
job_table = self.condvar.wait_timeout(job_table, wait_time).unwrap().0;
}
None => {
job_table = self.condvar.wait(job_table).unwrap();
}
}
}
}
Ok(())
}
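/// Schedule a job to run as soon as possible, waking any idle workers.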
pub fn queue_job(&self, job: J) {
let mut job_table = self.job_table.lock().unwrap();
let scheduled_for = Instant::now();
job_table
.status
.insert(job.clone(), JobStatus::Ready { scheduled_for });
job_table.queue.push(Deadline {
job,
start_by: scheduled_for,
});
self.condvar.notify_all();
}
}
#[cfg(test)]
mod tests {
use std::{ops::Add, time::Duration};
use once_cell::sync::OnceCell;
use super::*;
use crate::thread_mgr::{self, ThreadKind};
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
struct PrintJob {
to_print: String,
}
impl Job for PrintJob {
type ErrorType = String;
fn run(&self) -> Result<Option<Instant>, String> {
if self.to_print == "pls panic" {
panic!("AAA");
}
println!("{}", self.to_print);
Ok(Some(Instant::now().add(Duration::from_millis(10))))
}
}
static TEST_POOL: OnceCell<Pool<PrintJob>> = OnceCell::new();
#[tokio::test]
async fn pool_1() {
TEST_POOL.set(Pool::<PrintJob>::new()).unwrap();
thread_mgr::spawn(
ThreadKind::GarbageCollectionWorker,
None,
None,
"test_worker_1",
true,
move || TEST_POOL.get().unwrap().worker_main("test_worker_1".into()),
)
.unwrap();
thread_mgr::spawn(
ThreadKind::GarbageCollectionWorker,
None,
None,
"test_worker_2",
true,
move || TEST_POOL.get().unwrap().worker_main("test_worker_2".into()),
)
.unwrap();
TEST_POOL.get().unwrap().queue_job(PrintJob {
to_print: "hello from job".to_string(),
});
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

View File

@@ -8,6 +8,8 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::storage_sync::index::RemoteIndex;
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt;
use crate::tenant_jobs::compaction::{CompactionJob, COMPACTION_POOL};
use crate::tenant_jobs::gc::{GcJob, GC_POOL};
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
@@ -173,8 +175,8 @@ pub fn shutdown_all_tenants() {
drop(m);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollectionWorker), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::CompactionWorker), None, None);
// Ok, no background threads running anymore. Flush any remaining data in
// memory to disk.
@@ -262,34 +264,20 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
// If the tenant is already active, nothing to do.
TenantState::Active => {}
// If it's Idle, launch the compactor and GC threads
// If it's Idle, launch the compactor and GC jobs
TenantState::Idle => {
thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
)?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
if let Err(e) = &gc_spawn_result {
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
// Important to activate before scheduling jobs
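// (a queued job re-checks the tenant state and deschedules itself if it isn't Active)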
tenant.state = TenantState::Active;
GC_POOL
.get()
.unwrap()
.queue_job(GcJob { tenant: tenant_id });
COMPACTION_POOL
.get()
.unwrap()
.queue_job(CompactionJob { tenant: tenant_id });
}
TenantState::Stopping => {

View File

@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
Ok(())
}
}
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
std::thread::sleep(compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.compaction_iteration()?;
}
trace!(
"compaction thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon = repo.get_gc_horizon();
// Garbage collect old files that are not needed for PITR anymore
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = repo.get_gc_period().as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
}
trace!(
"GC thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}

View File

@@ -37,7 +37,7 @@ use std::collections::HashMap;
use std::panic;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
@@ -59,6 +59,17 @@ lazy_static! {
/// Global registry of threads
static ref THREADS: Mutex<HashMap<u64, Arc<PageServerThread>>> = Mutex::new(HashMap::new());
// TODO make these per-thread for targeted shutdown, also for cleanup.
/// Condvars to notify after shutdown request
static ref SHUTDOWN_CONDVARS: Mutex<Vec<Arc<Condvar>>> = Mutex::new(Vec::new());
}
/// Return a condvar which will receive a notify_all() call when shutdown is requested
pub fn get_shutdown_aware_condvar() -> Arc<Condvar> {
let mut condvars = SHUTDOWN_CONDVARS.lock().unwrap();
condvars.push(Arc::new(Condvar::new()));
condvars.last().unwrap().clone()
}
// There is a Tokio watch channel for each thread, which can be used to signal the
@@ -94,11 +105,11 @@ pub enum ThreadKind {
// Thread that connects to a safekeeper to fetch WAL for one timeline.
WalReceiver,
// Thread that handles compaction of all timelines for a tenant.
Compactor,
// Worker that does compaction jobs
CompactionWorker,
// Thread that handles GC of a tenant
GarbageCollector,
// Worker that does GC jobs
GarbageCollectionWorker,
// Thread that flushes frozen in-memory layers to disk
LayerFlushThread,
@@ -297,6 +308,10 @@ pub fn shutdown_threads(
}
drop(threads);
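// Wake any workers blocked on shutdown-aware condvars so they can observe the request.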
for condvar in SHUTDOWN_CONDVARS.lock().unwrap().iter() {
condvar.notify_all();
}
for thread in victim_threads {
info!("waiting for {} to shut down", thread.name);
if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {