From 943220df9b4e320947116ca7fa45bd421c709b7b Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 24 Jun 2024 17:18:54 +0000
Subject: [PATCH] implement a global gate + cancellation mechanism for live
 walredo processes, hook up to shutdown

---
 pageserver/src/bin/pageserver.rs  |  6 +++
 pageserver/src/tenant.rs          | 12 ++---
 pageserver/src/walredo.rs         | 75 +++++++++++++++++++++++++++----
 pageserver/src/walredo/process.rs | 57 ++++++++++++++++++-----
 4 files changed, 127 insertions(+), 23 deletions(-)

diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index ba5b2608bd..db5e0ae788 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -394,6 +394,11 @@ fn start_pageserver(
         deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
     }
 
+    // Set up global walredo manager state
+    let walredo_global_state = BACKGROUND_RUNTIME.block_on(
+        pageserver::walredo::GlobalState::spawn(conf, shutdown_pageserver.clone()),
+    );
+
     // Up to this point no significant I/O has been done: this should have been fast. Record
     // duration prior to starting I/O intensive phase of startup.
     startup_checkpoint(started_startup_at, "initial", "Starting loading tenants");
@@ -429,6 +434,7 @@ fn start_pageserver(
                 broker_client: broker_client.clone(),
                 remote_storage: remote_storage.clone(),
                 deletion_queue_client,
+                walredo_global_state: Arc::clone(&walredo_global_state),
             },
             order,
             shutdown_pageserver.clone(),
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 203451baa5..b13521e0ed 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -166,6 +166,7 @@ pub struct TenantSharedResources {
     pub broker_client: storage_broker::BrokerClientChannel,
     pub remote_storage: GenericRemoteStorage,
     pub deletion_queue_client: DeletionQueueClient,
+    pub walredo_global_state: Arc<crate::walredo::GlobalState>,
 }
 
 /// A [`Tenant`] is really an _attached_ tenant. The configuration
@@ -650,17 +651,18 @@ impl Tenant {
         mode: SpawnMode,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Tenant>> {
-        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
-            conf,
-            tenant_shard_id,
-        )));
-
         let TenantSharedResources {
             broker_client,
             remote_storage,
             deletion_queue_client,
+            walredo_global_state,
         } = resources;
 
+        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
+            walredo_global_state,
+            tenant_shard_id,
+        )));
+
         let attach_mode = attached_conf.location.attach_mode;
         let generation = attached_conf.location.generation;
 
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 5b43098380..1cb1623ac2 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -35,13 +35,60 @@ use anyhow::Context;
 use bytes::{Bytes, BytesMut};
 use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
 use pageserver_api::shard::TenantShardId;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::lsn::Lsn;
+use utils::sync::gate::Gate;
 use utils::sync::heavier_once_cell;
 
+pub struct GlobalState {
+    conf: &'static PageServerConf,
+    /// Watched by the [`Self::actor`].
+    shutdown: CancellationToken,
+    /// Set by [`Self::actor`] when [`Self::shutdown`] is cancelled.
+    /// We do this to avoid the Mutex lock inside the `CancellationToken`.
+    shutdown_bool: AtomicBool,
+    pub(self) spawn_gate: Gate,
+}
+
+impl GlobalState {
+    pub async fn spawn(
+        conf: &'static PageServerConf,
+        shutdown: CancellationToken,
+    ) -> Arc<Self> {
+        let state = Arc::new(GlobalState {
+            conf,
+            shutdown,
+            shutdown_bool: AtomicBool::new(false), // if `shutdown` is cancelled already, the task spawned below will set it promptly
+            spawn_gate: Gate::default(),
+        });
+        tokio::spawn({
+            let state = Arc::clone(&state);
+            async move {
+                info!("starting");
+                state.actor().await;
+                info!("done");
+            }
+            .instrument(info_span!(parent: None, "walredo_global_state"))
+        });
+        state
+    }
+    async fn actor(self: Arc<Self>) {
+        self.shutdown.cancelled().await;
+        info!("propagating cancellation");
+        self.shutdown_bool.store(true, Ordering::Relaxed);
+        self.spawn_gate.close().await;
+        info!("all walredo processes have been killed and no new ones will be spawned");
+    }
+    pub(self) fn is_shutdown_requested(self: &Arc<Self>) -> bool {
+        self.shutdown_bool.load(Ordering::Relaxed)
+    }
+}
+
 ///
 /// This is the real implementation that uses a Postgres process to
 /// perform WAL replay. Only one thread can use the process at a time,
@@ -50,8 +97,8 @@ use utils::sync::heavier_once_cell;
 /// records.
 ///
 pub struct PostgresRedoManager {
+    global_state: Arc<GlobalState>,
     tenant_shard_id: TenantShardId,
-    conf: &'static PageServerConf,
     last_redo_at: std::sync::Mutex<Option<Instant>>,
     /// The current [`process::WalRedoProcess`] that is used by new redo requests.
     /// We use [`heavier_once_cell`] for coalescing the spawning, but the redo
@@ -78,9 +125,10 @@ pub enum Error {
 
 macro_rules! bail {
     ($($arg:tt)*) => {
-        return Err(Error::Other(anyhow::anyhow!($($arg)*)));
+        return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
     }
 }
+pub(self) use bail;
 
 ///
 /// Public interface of WAL redo manager
@@ -124,7 +172,7 @@ impl PostgresRedoManager {
                     img,
                     base_img_lsn,
                     &records[batch_start..i],
-                    self.conf.wal_redo_timeout,
+                    self.global_state.conf.wal_redo_timeout,
                     pg_version,
                 )
                 .await
@@ -145,7 +193,7 @@ impl PostgresRedoManager {
                 img,
                 base_img_lsn,
                 &records[batch_start..],
-                self.conf.wal_redo_timeout,
+                self.global_state.conf.wal_redo_timeout,
                 pg_version,
             )
             .await
@@ -175,13 +223,13 @@ impl PostgresRedoManager {
     /// Create a new PostgresRedoManager.
     ///
     pub fn new(
-        conf: &'static PageServerConf,
+        global_state: Arc<GlobalState>,
        tenant_shard_id: TenantShardId,
     ) -> PostgresRedoManager {
         // The actual process is launched lazily, on first request.
         PostgresRedoManager {
+            global_state,
             tenant_shard_id,
-            conf,
             last_redo_at: std::sync::Mutex::default(),
             redo_process: heavier_once_cell::OnceCell::default(),
         }
     }
@@ -232,11 +280,16 @@ impl PostgresRedoManager {
         let start = Instant::now();
         let proc = Arc::new(
             process::WalRedoProcess::launch(
-                self.conf,
+                &self.global_state,
                 self.tenant_shard_id,
                 pg_version,
             )
-            .context("launch walredo process")?,
+            .map_err(|e| match e {
+                process::LaunchError::Cancelled => Error::Cancelled,
+                process::LaunchError::Other(e) => {
+                    Error::Other(e.context("launch walredo process"))
+                }
+            })?,
         );
         let duration = start.elapsed();
         WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
@@ -257,6 +310,12 @@ impl PostgresRedoManager {
             .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
             .await
             .context("apply_wal_records");
+        if result.is_err() {
+            // avoid reporting an error that is just a consequence of shutdown; surface Cancelled instead
+            if self.global_state.shutdown.is_cancelled() {
+                return Err(Error::Cancelled);
+            }
+        }
 
         let duration = started_at.elapsed();
 
diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs
index 9140d4f6aa..c07f0a24d2 100644
--- a/pageserver/src/walredo/process.rs
+++ b/pageserver/src/walredo/process.rs
@@ -18,13 +18,22 @@ use std::sync::atomic::AtomicUsize;
 use std::{
     collections::VecDeque,
     process::{Command, Stdio},
+    sync::Arc,
     time::Duration,
 };
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tracing::{debug, error, instrument, Instrument};
-use utils::{lsn::Lsn, poison::Poison};
+use utils::{
+    lsn::Lsn,
+    poison::Poison,
+    sync::gate::{GateError, GateGuard},
+};
+
+use super::GlobalState;
 
 pub struct WalRedoProcess {
+    global_state: Arc<GlobalState>,
+    _spawn_gate_guard: GateGuard,
     #[allow(dead_code)]
     conf: &'static PageServerConf,
     #[cfg(feature = "testing")]
@@ -49,18 +58,33 @@ struct ProcessOutput {
     n_processed_responses: usize,
 }
 
+#[derive(Debug, thiserror::Error)]
+pub(super) enum LaunchError {
+    #[error("cancelled")]
+    Cancelled,
+    #[error(transparent)]
+    Other(#[from] anyhow::Error),
+}
+
 impl WalRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
     //
     #[instrument(skip_all,fields(pg_version=pg_version))]
     pub(crate) fn launch(
-        conf: &'static PageServerConf,
+        global_state: &Arc<GlobalState>,
         tenant_shard_id: TenantShardId,
         pg_version: u32,
-    ) -> anyhow::Result<Self> {
+    ) -> Result<Self, LaunchError> {
         crate::span::debug_assert_current_span_has_tenant_id();
 
+        let conf = global_state.conf;
+
+        let spawn_gate_guard = match global_state.spawn_gate.enter() {
+            Ok(guard) => guard,
+            Err(GateError::GateClosed) => return Err(LaunchError::Cancelled),
+        };
+
         let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
         let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
@@ -144,6 +168,8 @@ impl WalRedoProcess {
         );
 
         Ok(Self {
+            global_state: global_state.clone(),
+            _spawn_gate_guard: spawn_gate_guard,
             conf,
             #[cfg(feature = "testing")]
             tenant_shard_id,
@@ -189,9 +215,13 @@ impl WalRedoProcess {
         base_img: &Option<Bytes>,
         records: &[(Lsn, NeonWalRecord)],
         wal_redo_timeout: Duration,
-    ) -> anyhow::Result<Bytes> {
+    ) -> Result<Bytes, super::Error> {
         debug_assert_current_span_has_tenant_id();
 
+        if self.global_state.is_shutdown_requested() {
+            return Err(super::Error::Cancelled);
+        }
+
         let tag = protocol::BufferTag { rel, blknum };
 
         // Serialize all the messages to send the WAL redo process first.
@@ -216,17 +246,19 @@ impl WalRedoProcess { { protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf); } else { - anyhow::bail!("tried to pass neon wal record to postgres WAL redo"); + super::bail!("tried to pass neon wal record to postgres WAL redo"); } } protocol::build_get_page_msg(tag, &mut writebuf); WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); - let Ok(res) = - tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await - else { - anyhow::bail!("WAL redo timed out"); - }; + let res = + // TODO: we should tokio::select! on the self.global_state.shutdown here, + // but, that requires thinking through the perf implications + tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)) + .await + .map_err(|_elapsed| anyhow::anyhow!("WAL redo timed out"))? + .map_err(super::Error::Other); if res.is_err() { // not all of these can be caused by this particular input, however these are so rare @@ -377,6 +409,11 @@ impl Drop for WalRedoProcess { .take() .expect("we only do this once") .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop); + + // The spawn gate is supposed to track running walredo processes. + // => must keep guard alive until the process is dead. + let _ = &self._spawn_gate_guard; + // no way to wait for stderr_logger_task from Drop because that is async only } }
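
Note (not part of the patch): the core pattern above is a process-wide `GlobalState` that mirrors a `CancellationToken` into a cheap `AtomicBool` for hot-path checks, and uses a gate (`utils::sync::gate::Gate`) so shutdown can wait for every live walredo process to exit and refuse to admit new ones. The sketch below illustrates that pattern outside the pageserver codebase. It is a minimal, self-contained analogy only: it substitutes tokio-util's `TaskTracker` for the internal `Gate` (an assumption; unlike `Gate::enter()`, `TaskTracker::close()` does not reject late entrants, so workers here check the shutdown flag themselves before starting).

```rust
// Minimal sketch of the shutdown-propagation pattern, assuming:
//   tokio      = { version = "1", features = ["full"] }
//   tokio-util = { version = "0.7", features = ["rt"] }
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

struct GlobalState {
    shutdown: CancellationToken,
    // Mirror of `shutdown`, readable on hot paths without touching the token internals.
    shutdown_bool: AtomicBool,
    // Tracks live "walredo processes"; shutdown waits for all of them to finish.
    workers: TaskTracker,
}

impl GlobalState {
    fn spawn(shutdown: CancellationToken) -> Arc<Self> {
        let state = Arc::new(GlobalState {
            shutdown,
            shutdown_bool: AtomicBool::new(false),
            workers: TaskTracker::new(),
        });
        // The actor: wait for shutdown, flip the cheap flag, then wait for all
        // tracked workers (analogous to `Gate::close().await` in the patch).
        tokio::spawn({
            let state = Arc::clone(&state);
            async move {
                state.shutdown.cancelled().await;
                state.shutdown_bool.store(true, Ordering::Relaxed);
                state.workers.close();
                state.workers.wait().await;
                println!("all workers finished; shutdown may proceed");
            }
        });
        state
    }

    fn is_shutdown_requested(&self) -> bool {
        self.shutdown_bool.load(Ordering::Relaxed)
    }
}

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();
    let state = GlobalState::spawn(shutdown.clone());

    // A worker: refuses to start once shutdown was requested, and polls the
    // cheap flag on its hot path (like `apply_wal_records` in the patch).
    if !state.is_shutdown_requested() {
        state.workers.spawn({
            let state = Arc::clone(&state);
            async move {
                while !state.is_shutdown_requested() {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                println!("worker observed shutdown, exiting");
            }
        });
    }

    tokio::time::sleep(Duration::from_millis(50)).await;
    shutdown.cancel();
    // Give the actor a moment to observe cancellation and drain the tracker.
    tokio::time::sleep(Duration::from_millis(100)).await;
}
```

The `Relaxed` ordering mirrors the patch: the boolean is a performance shortcut for frequent checks, not a synchronization point; the authoritative signal remains the `CancellationToken` plus the gate/tracker drain.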