From ea5900f155d32149c5d001272c5d2f3bc367786e Mon Sep 17 00:00:00 2001 From: anastasia Date: Fri, 22 Oct 2021 20:01:57 +0300 Subject: [PATCH] Refactoring of checkpointer and GC. Move them to a separate tenant_threads module to detangle thread management from LayeredRepository implementation. --- pageserver/src/branches.rs | 3 +- pageserver/src/layered_repository.rs | 126 ++++++--------------------- pageserver/src/lib.rs | 10 +++ pageserver/src/repository.rs | 15 ++-- pageserver/src/tenant_mgr.rs | 86 +++++++++--------- pageserver/src/tenant_threads.rs | 125 ++++++++++++++++++++++++++ 6 files changed, 222 insertions(+), 143 deletions(-) create mode 100644 pageserver/src/tenant_threads.rs diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 6c0e808d12..cfbdd404ee 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -23,6 +23,7 @@ use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::tenant_mgr; use crate::walredo::WalRedoManager; +use crate::CheckpointConfig; use crate::{repository::Repository, PageServerConf}; use crate::{restore_local_repo, LOG_FILE_NAME}; @@ -238,7 +239,7 @@ fn bootstrap_timeline( timeline.writer().as_ref(), lsn, )?; - timeline.checkpoint()?; + timeline.checkpoint(CheckpointConfig::Forced)?; println!( "created initial timeline {} timeline.lsn {}", diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 6f201d83a7..2dc57c8987 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -30,7 +30,6 @@ use std::ops::{Bound::Included, Deref}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; -use std::thread::JoinHandle; use std::time::{Duration, Instant}; use crate::relish::*; @@ -40,6 +39,7 @@ use crate::tenant_mgr; use crate::walreceiver; use crate::walreceiver::IS_WAL_RECEIVER; use crate::walredo::WalRedoManager; +use crate::CheckpointConfig; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; @@ -219,6 +219,22 @@ impl Repository for LayeredRepository { }) } + fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> { + { + let timelines = self.timelines.lock().unwrap(); + + for (timelineid, timeline) in timelines.iter() { + let _entered = + info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid) + .entered(); + + timeline.checkpoint(cconf)?; + } + } + + Ok(()) + } + // Wait for all threads to complete and persist repository data before pageserver shutdown. fn shutdown(&self) -> Result<()> { trace!("LayeredRepository shutdown for tenant {}", self.tenantid); @@ -228,7 +244,7 @@ impl Repository for LayeredRepository { walreceiver::stop_wal_receiver(*timelineid); // Wait for syncing data to disk trace!("repo shutdown. checkpoint timeline {}", timelineid); - timeline.checkpoint()?; + timeline.checkpoint(CheckpointConfig::Forced)?; //TODO Wait for walredo process to shutdown too } @@ -309,90 +325,6 @@ impl LayeredRepository { } } - /// - /// Launch the checkpointer thread in given repository. - /// - pub fn launch_checkpointer_thread( - conf: &'static PageServerConf, - rc: Arc, - ) -> JoinHandle<()> { - std::thread::Builder::new() - .name("Checkpointer thread".into()) - .spawn(move || { - // FIXME: relaunch it? Panic is not good. 
- rc.checkpoint_loop(conf).expect("Checkpointer thread died"); - }) - .unwrap() - } - - /// - /// Checkpointer thread's main loop - /// - fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> { - while !tenant_mgr::shutdown_requested() { - std::thread::sleep(conf.checkpoint_period); - info!("checkpointer thread for tenant {} waking up", self.tenantid); - - // checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE - // bytes of WAL since last checkpoint. - { - let timelines = self.timelines.lock().unwrap(); - for (timelineid, timeline) in timelines.iter() { - let _entered = - info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid) - .entered(); - - STORAGE_TIME - .with_label_values(&["checkpoint_timed"]) - .observe_closure_duration(|| { - timeline.checkpoint_internal(conf.checkpoint_distance, false) - })? - } - // release lock on 'timelines' - } - } - trace!("Checkpointer thread shut down"); - Ok(()) - } - - /// - /// Launch the GC thread in given repository. - /// - pub fn launch_gc_thread( - conf: &'static PageServerConf, - rc: Arc, - ) -> JoinHandle<()> { - std::thread::Builder::new() - .name("GC thread".into()) - .spawn(move || { - // FIXME: relaunch it? Panic is not good. - rc.gc_loop(conf).expect("GC thread died"); - }) - .unwrap() - } - - /// - /// GC thread's main loop - /// - fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> { - while !tenant_mgr::shutdown_requested() { - // Garbage collect old files that are not needed for PITR anymore - if conf.gc_horizon > 0 { - self.gc_iteration(None, conf.gc_horizon, false).unwrap(); - } - - // TODO Write it in more adequate way using - // condvar.wait_timeout() or something - let mut sleep_time = conf.gc_period.as_secs(); - while sleep_time > 0 && !tenant_mgr::shutdown_requested() { - sleep_time -= 1; - std::thread::sleep(Duration::from_secs(1)); - } - info!("gc thread for tenant {} waking up", self.tenantid); - } - Ok(()) - } - /// Save timeline metadata to file fn save_metadata( conf: &'static PageServerConf, @@ -546,7 +478,7 @@ impl LayeredRepository { // so that they too can be garbage collected. That's // used in tests, so we want as deterministic results as possible. if checkpoint_before_gc { - timeline.checkpoint()?; + timeline.checkpoint(CheckpointConfig::Forced)?; info!("timeline {} checkpoint_before_gc done", timelineid); } @@ -867,11 +799,15 @@ impl Timeline for LayeredTimeline { /// Public entry point for checkpoint(). All the logic is in the private /// checkpoint_internal function, this public facade just wraps it for /// metrics collection. - fn checkpoint(&self) -> Result<()> { - STORAGE_TIME - .with_label_values(&["checkpoint_force"]) - //pass checkpoint_distance=0 to force checkpoint - .observe_closure_duration(|| self.checkpoint_internal(0, true)) + fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()> { + match cconf { + CheckpointConfig::Forced => STORAGE_TIME + .with_label_values(&["forced checkpoint"]) + .observe_closure_duration(|| self.checkpoint_internal(0)), + CheckpointConfig::Distance(distance) => STORAGE_TIME + .with_label_values(&["checkpoint"]) + .observe_closure_duration(|| self.checkpoint_internal(distance)), + } } fn get_last_record_lsn(&self) -> Lsn { @@ -1250,7 +1186,7 @@ impl LayeredTimeline { /// Flush to disk all data that was written with the put_* functions /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. 
-    fn checkpoint_internal(&self, checkpoint_distance: u64, forced: bool) -> Result<()> {
+    fn checkpoint_internal(&self, checkpoint_distance: u64) -> Result<()> {
         let mut write_guard = self.write_lock.lock().unwrap();
         let mut layers = self.layers.lock().unwrap();
 
@@ -1281,10 +1217,6 @@ impl LayeredTimeline {
         while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
             let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
 
-            if tenant_mgr::shutdown_requested() && !forced {
-                return Ok(());
-            }
-
             // Does this layer need freezing?
             //
             // Write out all in-memory layers that contain WAL older than CHECKPOINT_DISTANCE.
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 6bb5d5f7f6..114630a624 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -18,6 +18,7 @@ pub mod relish_storage;
 pub mod repository;
 pub mod restore_local_repo;
 pub mod tenant_mgr;
+pub mod tenant_threads;
 pub mod waldecoder;
 pub mod walreceiver;
 pub mod walredo;
@@ -164,6 +165,15 @@ impl PageServerConf {
     }
 }
 
+/// Config for the Repository checkpointer
+#[derive(Debug, Clone, Copy)]
+pub enum CheckpointConfig {
+    // Flush in-memory data that is older than this
+    Distance(u64),
+    // Flush all in-memory data
+    Forced,
+}
+
 /// External relish storage configuration, enough for creating a client for that storage.
 #[derive(Debug, Clone)]
 pub struct RelishStorageConfig {
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index c1bdb87944..9556bcc2d1 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -1,4 +1,5 @@
 use crate::relish::*;
+use crate::CheckpointConfig;
 use anyhow::Result;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
@@ -24,9 +25,9 @@ pub trait Repository: Send + Sync {
     /// Branch a timeline
     fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;
 
-    /// perform one garbage collection iteration.
-    /// garbage collection is periodically performed by gc thread,
-    /// but it can be explicitly requested through page server api.
+    /// Perform one garbage collection iteration, removing old data files from disk.
+    /// This function is called periodically by the GC thread.
+    /// It can also be requested explicitly through the page server API 'do_gc' command.
     ///
     /// 'timelineid' specifies the timeline to GC, or None for all.
     /// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval).
@@ -39,6 +40,10 @@
         horizon: u64,
         checkpoint_before_gc: bool,
     ) -> Result<GcResult>;
+
+    /// Perform one checkpoint iteration, flushing in-memory data to disk.
+    /// This function is called periodically by the checkpointer thread.
+    fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()>;
 }
 
 ///
@@ -144,7 +149,7 @@ pub trait Timeline: Send + Sync {
     ///
     /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
     /// know anything about them here in the repository.
- fn checkpoint(&self) -> Result<()>; + fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()>; /// Retrieve current logical size of the timeline /// @@ -714,7 +719,7 @@ mod tests { .contains(&TESTREL_A)); // Run checkpoint and garbage collection and check that it's still not visible - newtline.checkpoint()?; + newtline.checkpoint(CheckpointConfig::Forced)?; repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?; assert!(!newtline diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index c4f57a59a0..b47dfbd4ab 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -4,6 +4,7 @@ use crate::branches; use crate::layered_repository::LayeredRepository; use crate::repository::{Repository, Timeline}; +use crate::tenant_threads; use crate::walredo::PostgresRedoManager; use crate::PageServerConf; use anyhow::{anyhow, bail, Context, Result}; @@ -16,7 +17,6 @@ use std::fs; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; -use std::thread::JoinHandle; use zenith_utils::zid::{ZTenantId, ZTimelineId}; lazy_static! { @@ -28,7 +28,7 @@ struct Tenant { repo: Option>, } -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] pub enum TenantState { // This tenant only exists in cloud storage. It cannot be accessed. CloudOnly, @@ -41,10 +41,12 @@ pub enum TenantState { // This tenant exists on local disk, and the layer map has been loaded into memory. // The local disk might have some newer files that don't exist in cloud storage yet. Active, + // Tenant is active, but there is no walreceiver connection. + Idle, // This tenant exists on local disk, and the layer map has been loaded into memory. // The local disk might have some newer files that don't exist in cloud storage yet. // The tenant cannot be accessed anymore for any reason, but graceful shutdown. - //Stopping, + Stopping, } impl fmt::Display for TenantState { @@ -53,6 +55,8 @@ impl fmt::Display for TenantState { TenantState::CloudOnly => f.write_str("CloudOnly"), TenantState::Downloading => f.write_str("Downloading"), TenantState::Active => f.write_str("Active"), + TenantState::Idle => f.write_str("Idle"), + TenantState::Stopping => f.write_str("Stopping"), } } } @@ -61,18 +65,6 @@ fn access_tenants() -> MutexGuard<'static, HashMap> { TENANTS.lock().unwrap() } -struct TenantHandleEntry { - checkpointer_handle: Option>, - gc_handle: Option>, -} - -// Logically these handles belong to Repository, -// but it's just simpler to store them separately -lazy_static! 
{ - static ref TENANT_HANDLES: Mutex> = - Mutex::new(HashMap::new()); -} - static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false); pub fn init(conf: &'static PageServerConf) { @@ -106,21 +98,13 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) { true, )); - let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone()); - let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone()); - - let mut handles = TENANT_HANDLES.lock().unwrap(); - let h = TenantHandleEntry { - checkpointer_handle: Some(checkpointer_handle), - gc_handle: Some(gc_handle), - }; - - handles.insert(tenant_id, h); - let mut m = access_tenants(); let tenant = m.get_mut(&tenant_id).unwrap(); tenant.repo = Some(repo); tenant.state = TenantState::Active; + + // TODO Start these threads only if tenant actively receives some WAL + tenant_threads::start_tenant_threads(conf, tenant_id); } pub fn register_relish_download( @@ -142,8 +126,7 @@ pub fn register_relish_download( }); tenant.state = TenantState::Downloading; match &tenant.repo { - Some(repo) => - { + Some(repo) => { init_timeline(repo.as_ref(), timeline_id); tenant.state = TenantState::Active; return; @@ -171,27 +154,23 @@ pub fn shutdown_requested() -> bool { SHUTDOWN_REQUESTED.load(Ordering::Relaxed) } -pub fn stop_tenant_threads(tenantid: ZTenantId) { - let mut handles = TENANT_HANDLES.lock().unwrap(); - if let Some(h) = handles.get_mut(&tenantid) { - h.checkpointer_handle.take().map(JoinHandle::join); - debug!("checkpointer for tenant {} has stopped", tenantid); - h.gc_handle.take().map(JoinHandle::join); - debug!("gc for tenant {} has stopped", tenantid); - } -} - pub fn shutdown_all_tenants() -> Result<()> { SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed); let tenantids = list_tenantids()?; + + for tenantid in &tenantids { + set_tenant_state(*tenantid, TenantState::Stopping)?; + } + for tenantid in tenantids { - stop_tenant_threads(tenantid); + // Wait for checkpointer and GC to finish their job + tenant_threads::wait_for_tenant_threads_to_stop(tenantid); + let repo = get_repository_for_tenant(tenantid)?; debug!("shutdown tenant {}", tenantid); repo.shutdown()?; } - Ok(()) } @@ -223,6 +202,33 @@ pub fn create_repository_for_tenant( Ok(()) } +// If tenant is not found in the repository, return CloudOnly state +pub fn get_tenant_state(tenantid: ZTenantId) -> TenantState { + let m = access_tenants(); + match m.get(&tenantid) { + Some(tenant) => tenant.state, + None => TenantState::CloudOnly, + } +} + +pub fn set_tenant_state(tenantid: ZTenantId, state: TenantState) -> Result { + let mut m = access_tenants(); + let tenant = m.get_mut(&tenantid); + + match tenant { + Some(tenant) => { + if state == TenantState::Idle && tenant.state != TenantState::Active { + // Only Active tenant can become Idle + return Ok(tenant.state); + } + info!("set_tenant_state: {} -> {}", tenant.state, state); + tenant.state = state; + Ok(tenant.state) + } + None => bail!("Tenant not found for tenant {}", tenantid), + } +} + pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result> { let m = access_tenants(); let tenant = m diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs new file mode 100644 index 0000000000..3dc824180b --- /dev/null +++ b/pageserver/src/tenant_threads.rs @@ -0,0 +1,125 @@ +//! This module contains functions to serve per-tenant background processes, +//! 
such as checkpointer and GC
+use crate::tenant_mgr;
+use crate::tenant_mgr::TenantState;
+use crate::CheckpointConfig;
+use crate::PageServerConf;
+use anyhow::Result;
+use lazy_static::lazy_static;
+use std::collections::HashMap;
+use std::sync::Mutex;
+use std::thread::JoinHandle;
+use std::time::Duration;
+use tracing::*;
+use zenith_utils::zid::ZTenantId;
+
+struct TenantHandleEntry {
+    checkpointer_handle: Option<JoinHandle<()>>,
+    gc_handle: Option<JoinHandle<()>>,
+}
+
+// Preserve handles to wait for thread completion
+// at shutdown
+lazy_static! {
+    static ref TENANT_HANDLES: Mutex<HashMap<ZTenantId, TenantHandleEntry>> =
+        Mutex::new(HashMap::new());
+}
+
+pub fn start_tenant_threads(conf: &'static PageServerConf, tenantid: ZTenantId) {
+    // Ensure that old threads are stopped
+    wait_for_tenant_threads_to_stop(tenantid);
+
+    let checkpointer_handle = std::thread::Builder::new()
+        .name("Checkpointer thread".into())
+        .spawn(move || {
+            checkpoint_loop(tenantid, conf).expect("Checkpointer thread died");
+        })
+        .ok();
+
+    let gc_handle = std::thread::Builder::new()
+        .name("GC thread".into())
+        .spawn(move || {
+            gc_loop(tenantid, conf).expect("GC thread died");
+        })
+        .ok();
+
+    // TODO handle thread errors, if any
+
+    let mut handles = TENANT_HANDLES.lock().unwrap();
+    let h = TenantHandleEntry {
+        checkpointer_handle,
+        gc_handle,
+    };
+
+    handles.insert(tenantid, h);
+}
+
+pub fn wait_for_tenant_threads_to_stop(tenantid: ZTenantId) {
+    let mut handles = TENANT_HANDLES.lock().unwrap();
+    if let Some(h) = handles.get_mut(&tenantid) {
+        h.checkpointer_handle.take().map(JoinHandle::join);
+        trace!("checkpointer for tenant {} has stopped", tenantid);
+        h.gc_handle.take().map(JoinHandle::join);
+        trace!("gc for tenant {} has stopped", tenantid);
+    }
+    handles.remove(&tenantid);
+}
+
+///
+/// Checkpointer thread's main loop
+///
+fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
+    loop {
+        if tenant_mgr::get_tenant_state(tenantid) != TenantState::Active {
+            break;
+        }
+
+        std::thread::sleep(conf.checkpoint_period);
+        trace!("checkpointer thread for tenant {} waking up", tenantid);
+
+        // Checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE
+        // bytes of WAL since the last checkpoint.
+        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+        repo.checkpoint_iteration(CheckpointConfig::Distance(conf.checkpoint_distance))?;
+    }
+
+    trace!(
+        "checkpointer thread stopped for tenant {}, state is {}",
+        tenantid,
+        tenant_mgr::get_tenant_state(tenantid)
+    );
+    Ok(())
+}
+
+///
+/// GC thread's main loop
+///
+fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
+    loop {
+        if tenant_mgr::get_tenant_state(tenantid) != TenantState::Active {
+            break;
+        }
+
+        trace!("gc thread for tenant {} waking up", tenantid);
+
+        // Garbage collect old files that are not needed for PITR anymore
+        if conf.gc_horizon > 0 {
+            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
+            repo.gc_iteration(None, conf.gc_horizon, false).unwrap();
+        }
+
+        // TODO Rewrite this in a more adequate way using
+        // condvar.wait_timeout() or something similar
+        let mut sleep_time = conf.gc_period.as_secs();
+        while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == TenantState::Active {
+            sleep_time -= 1;
+            std::thread::sleep(Duration::from_secs(1));
+        }
+    }
+    trace!(
+        "GC thread stopped for tenant {}, state is {}",
+        tenantid,
+        tenant_mgr::get_tenant_state(tenantid)
+    );
+    Ok(())
+}
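
For readers following the refactoring: after this patch, the per-tenant checkpointer and GC threads live in tenant_threads.rs. start_tenant_threads() spawns them when a tenant becomes Active, both loops exit as soon as get_tenant_state() no longer returns Active, and wait_for_tenant_threads_to_stop() joins the stored handles during shutdown. The standalone sketch below illustrates that spawn/flag/join lifecycle using only the standard library; the names (spawn_worker, the AtomicBool flag) are illustrative and are not part of the pageserver code.

// Minimal, self-contained sketch of the per-tenant worker lifecycle, assuming
// a plain AtomicBool in place of the TenantState lookup used in the patch.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

fn spawn_worker(name: &str, active: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::Builder::new()
        .name(name.to_string())
        .spawn(move || {
            // Loop until the shared flag says the tenant is no longer active,
            // mirroring the `get_tenant_state(..) != TenantState::Active` check.
            while active.load(Ordering::Relaxed) {
                // ... one checkpoint or GC iteration would go here ...
                thread::sleep(Duration::from_millis(100));
            }
        })
        .expect("failed to spawn worker thread")
}

fn main() {
    let active = Arc::new(AtomicBool::new(true));

    // Analogous to start_tenant_threads(): spawn the loops and keep the handles.
    let checkpointer = spawn_worker("Checkpointer thread", Arc::clone(&active));
    let gc = spawn_worker("GC thread", Arc::clone(&active));

    thread::sleep(Duration::from_millis(300));

    // Analogous to shutdown: flip the state so the loops exit, then join the
    // handles, as wait_for_tenant_threads_to_stop() does via TENANT_HANDLES.
    active.store(false, Ordering::Relaxed);
    checkpointer.join().unwrap();
    gc.join().unwrap();
}

The real code keys the loops off TenantState in the shared tenant map rather than a boolean, which also lets an Idle or Stopping tenant wind its threads down without a global shutdown flag.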