diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index f729cad3c3..5aee13cfc6 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -12,6 +12,8 @@ pub mod disk_usage_eviction_task;
 pub mod http;
 pub mod import_datadir;
 pub mod l0_flush;
+
+use futures::{stream::FuturesUnordered, StreamExt};
 pub use pageserver_api::keyspace;
 use tokio_util::sync::CancellationToken;
 pub mod aux_file;
@@ -36,7 +38,7 @@ use tenant::{
     mgr::{BackgroundPurges, TenantManager},
     secondary,
 };
-use tracing::info;
+use tracing::{info, info_span};
 
 /// Current storage format version
 ///
@@ -85,6 +87,79 @@ pub async fn shutdown_pageserver(
     exit_code: i32,
 ) {
     use std::time::Duration;
+
+    // If the orderly shutdown below takes too long, we still want to make
+    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
+    //
+    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
+    // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
+    //
+    // We use a thread instead of a tokio task because the background runtime is likely busy
+    // with the final flushing / uploads. This activity has priority, and due to the lack
+    // of scheduling priority features in the tokio scheduler, using a separate thread is
+    // an effective priority booster.
+    let walredo_extraordinary_shutdown_thread_span = {
+        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
+        span.follows_from(tracing::Span::current());
+        span
+    };
+    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
+    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
+        let walredo_extraordinary_shutdown_thread_cancel =
+            walredo_extraordinary_shutdown_thread_cancel.clone();
+        move || {
+            let rt = tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+            let _entered = rt.enter();
+            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
+            if let Ok(()) = rt.block_on(tokio::time::timeout(
+                Duration::from_secs(8),
+                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
+            )) {
+                info!("cancellation requested");
+                return;
+            }
+            let managers = tenant::WALREDO_MANAGERS
+                .lock()
+                .unwrap()
+                // prevents new walredo managers from being inserted
+                .take()
+                .expect("only we take()");
+            // Use FuturesUnordered to get in queue early for each manager's
+            // heavier_once_cell semaphore wait list.
+            // Also, for idle tenants that for some reason haven't
+            // shut down yet, it's quite likely that we're not going
+            // to get Poll::Pending once.
+            let mut futs: FuturesUnordered<_> = managers
+                .into_iter()
+                .filter_map(|(_, mgr)| mgr.upgrade())
+                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
+                .collect();
+            info!(count=%futs.len(), "built FuturesUnordered");
+            let mut last_log_at = std::time::Instant::now();
+            #[derive(Debug, Default)]
+            struct Results {
+                initiated: u64,
+                already: u64,
+            }
+            let mut results = Results::default();
+            while let Some(we_initiated) = rt.block_on(futs.next()) {
+                if we_initiated {
+                    results.initiated += 1;
+                } else {
+                    results.already += 1;
+                }
+                if last_log_at.elapsed() > Duration::from_millis(100) {
+                    info!(remaining=%futs.len(), ?results, "progress");
+                    last_log_at = std::time::Instant::now();
+                }
+            }
+            info!(?results, "done");
+        }
+    });
+
     // Shut down the libpq endpoint task. This prevents new connections from
     // being accepted.
     let remaining_connections = timed(
@@ -160,6 +235,12 @@ pub async fn shutdown_pageserver(
         Duration::from_secs(1),
     )
     .await;
+
+    info!("cancel & join walredo_extraordinary_shutdown_thread");
+    walredo_extraordinary_shutdown_thread_cancel.cancel();
+    walredo_extraordinary_shutdown_thread.join().unwrap();
+    info!("walredo_extraordinary_shutdown_thread done");
+
     info!("Shut down successfully completed");
     std::process::exit(exit_code);
 }
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 5d0e963b4e..0f09241d22 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -33,6 +33,7 @@ use remote_storage::GenericRemoteStorage;
 use remote_storage::TimeoutOrCancel;
 use std::collections::BTreeMap;
 use std::fmt;
+use std::sync::Weak;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
 use tokio::io::BufReader;
@@ -312,14 +313,66 @@ impl std::fmt::Debug for Tenant {
 }
 
 pub(crate) enum WalRedoManager {
-    Prod(PostgresRedoManager),
+    Prod(WalredoManagerId, PostgresRedoManager),
     #[cfg(test)]
     Test(harness::TestRedoManager),
 }
 
-impl From<PostgresRedoManager> for WalRedoManager {
-    fn from(mgr: PostgresRedoManager) -> Self {
-        Self::Prod(mgr)
+#[derive(thiserror::Error, Debug)]
+#[error("pageserver is shutting down")]
+pub(crate) struct GlobalShutDown;
+
+impl WalRedoManager {
+    pub(crate) fn new(mgr: PostgresRedoManager) -> Result<Arc<Self>, GlobalShutDown> {
+        let id = WalredoManagerId::next();
+        let arc = Arc::new(Self::Prod(id, mgr));
+        let mut guard = WALREDO_MANAGERS.lock().unwrap();
+        match &mut *guard {
+            Some(map) => {
+                map.insert(id, Arc::downgrade(&arc));
+                Ok(arc)
+            }
+            None => Err(GlobalShutDown),
+        }
+    }
+}
+
+impl Drop for WalRedoManager {
+    fn drop(&mut self) {
+        match self {
+            Self::Prod(id, _) => {
+                let mut guard = WALREDO_MANAGERS.lock().unwrap();
+                if let Some(map) = &mut *guard {
+                    map.remove(id).expect("new() registers, drop() unregisters");
+                }
+            }
+            #[cfg(test)]
+            Self::Test(_) => {
+                // Not applicable to test redo manager
+            }
+        }
+    }
+}
+
+/// Global registry of all walredo managers so that [`crate::shutdown_pageserver`] can shut down
+/// the walredo processes outside of the regular order.
+///
+/// This is necessary to work around a systemd bug where it freezes if there are
+/// walredo processes left =>
+#[allow(clippy::type_complexity)]
+pub(crate) static WALREDO_MANAGERS: once_cell::sync::Lazy<
+    Mutex<Option<HashMap<WalredoManagerId, Weak<WalRedoManager>>>>,
+> = once_cell::sync::Lazy::new(|| Mutex::new(Some(HashMap::new())));
+#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
+pub(crate) struct WalredoManagerId(u64);
+impl WalredoManagerId {
+    pub fn next() -> Self {
+        static NEXT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
+        let id = NEXT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        if id == 0 {
+            panic!("WalredoManagerId::next() returned 0, indicating wraparound; IDs may no longer be unique");
+        }
+        Self(id)
     }
 }
 
@@ -331,19 +384,20 @@ impl From for WalRedoManager {
 }
 
 impl WalRedoManager {
-    pub(crate) async fn shutdown(&self) {
+    pub(crate) async fn shutdown(&self) -> bool {
         match self {
-            Self::Prod(mgr) => mgr.shutdown().await,
+            Self::Prod(_, mgr) => mgr.shutdown().await,
             #[cfg(test)]
             Self::Test(_) => {
                 // Not applicable to test redo manager
+                true
             }
         }
     }
 
     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
         match self {
-            Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
+            Self::Prod(_, mgr) => mgr.maybe_quiesce(idle_timeout),
             #[cfg(test)]
             Self::Test(_) => {
                 // Not applicable to test redo manager
@@ -363,7 +417,7 @@ impl WalRedoManager {
         pg_version: u32,
     ) -> Result {
         match self {
-            Self::Prod(mgr) => {
+            Self::Prod(_, mgr) => {
                 mgr.request_redo(key, lsn, base_img, records, pg_version)
                     .await
             }
@@ -377,7 +431,7 @@ impl WalRedoManager {
     pub(crate) fn status(&self) -> Option {
         match self {
-            WalRedoManager::Prod(m) => Some(m.status()),
+            WalRedoManager::Prod(_, m) => Some(m.status()),
             #[cfg(test)]
             WalRedoManager::Test(_) => None,
         }
     }
@@ -677,11 +731,9 @@ impl Tenant {
         init_order: Option,
         mode: SpawnMode,
         ctx: &RequestContext,
-    ) -> Arc<Tenant> {
-        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
-            conf,
-            tenant_shard_id,
-        )));
+    ) -> Result<Arc<Tenant>, GlobalShutDown> {
+        let wal_redo_manager =
+            WalRedoManager::new(PostgresRedoManager::new(conf, tenant_shard_id))?;
 
         let TenantSharedResources {
             broker_client,
@@ -880,7 +932,7 @@ impl Tenant {
             }
             .instrument(tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation)),
         );
-        tenant
+        Ok(tenant)
     }
 
     #[instrument(skip_all)]
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 58f8990892..b5568d37b5 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -55,7 +55,7 @@ use utils::id::{TenantId, TimelineId};
 use super::remote_timeline_client::remote_tenant_path;
 use super::secondary::SecondaryTenant;
 use super::timeline::detach_ancestor::PreparedTimelineDetach;
-use super::TenantSharedResources;
+use super::{GlobalShutDown, TenantSharedResources};
 
 /// For a tenant that appears in TenantsMap, it may either be
 /// - `Attached`: has a full Tenant object, is elegible to service
@@ -665,17 +665,20 @@ pub async fn init_tenant_mgr(
         let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
         let shard_identity = location_conf.shard;
         let slot = match location_conf.mode {
-            LocationMode::Attached(attached_conf) => TenantSlot::Attached(tenant_spawn(
-                conf,
-                tenant_shard_id,
-                &tenant_dir_path,
-                resources.clone(),
-                AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
-                shard_identity,
-                Some(init_order.clone()),
-                SpawnMode::Lazy,
-                &ctx,
-            )),
+            LocationMode::Attached(attached_conf) => TenantSlot::Attached(
+                tenant_spawn(
+                    conf,
+                    tenant_shard_id,
+                    &tenant_dir_path,
+                    resources.clone(),
+                    AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
+                    shard_identity,
+                    Some(init_order.clone()),
+                    SpawnMode::Lazy,
+                    &ctx,
+                )
+                .expect("global shutdown during init_tenant_mgr cannot happen"),
+            ),
             LocationMode::Secondary(secondary_conf) => {
                 info!(
                     tenant_id = %tenant_shard_id.tenant_id,
@@ -723,7 +726,7 @@ fn tenant_spawn(
     init_order: Option,
     mode: SpawnMode,
     ctx: &RequestContext,
-) -> Arc<Tenant> {
+) -> Result<Arc<Tenant>, GlobalShutDown> {
     // All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
     // path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
     // to avoid impacting prod runtime performance.
@@ -1190,7 +1193,10 @@ impl TenantManager {
                     None,
                     spawn_mode,
                     ctx,
-                );
+                )
+                .map_err(|_: GlobalShutDown| {
+                    UpsertLocationError::Unavailable(TenantMapError::ShuttingDown)
+                })?;
 
                 TenantSlot::Attached(tenant)
             }
@@ -1311,7 +1317,7 @@ impl TenantManager {
             None,
             SpawnMode::Eager,
             ctx,
-        );
+        )?;
 
         slot_guard.upsert(TenantSlot::Attached(tenant))?;
 
@@ -2045,7 +2051,7 @@ impl TenantManager {
             None,
             SpawnMode::Eager,
             ctx,
-        );
+        )?;
 
         slot_guard.upsert(TenantSlot::Attached(tenant))?;
 
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index ecae443079..3a7353c138 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -76,6 +76,7 @@ use crate::{
         metadata::TimelineMetadata,
         storage_layer::PersistentLayerDesc,
     },
+    walredo,
 };
 use crate::{
     context::{DownloadBehavior, RequestContext},
@@ -1000,7 +1001,10 @@ impl Timeline {
                     .for_get_kind(GetKind::Singular)
                     .observe(elapsed.as_secs_f64());
 
-                if cfg!(feature = "testing") && res.is_err() {
+                if cfg!(feature = "testing")
+                    && res.is_err()
+                    && !matches!(res, Err(PageReconstructError::Cancelled))
+                {
                     // it can only be walredo issue
                     use std::fmt::Write;
 
@@ -5466,20 +5470,22 @@ impl Timeline {
             } else {
                 trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
             };
-
-            let img = match self
+            let res = self
                 .walredo_mgr
                 .as_ref()
                 .context("timeline has no walredo manager")
                 .map_err(PageReconstructError::WalRedo)?
                 .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
-                .await
-                .context("reconstruct a page image")
-            {
+                .await;
+            let img = match res {
                 Ok(img) => img,
-                Err(e) => return Err(PageReconstructError::WalRedo(e)),
+                Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
+                Err(walredo::Error::Other(e)) => {
+                    return Err(PageReconstructError::WalRedo(
+                        e.context("reconstruct a page image"),
+                    ))
+                }
             };
-
             Ok(img)
         }
     }
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 5095beefd7..770081b3b4 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -241,6 +241,9 @@ impl PostgresRedoManager {
 
     /// Shut down the WAL redo manager.
     ///
+    /// Returns `true` if this call was the one that initiated shutdown.
+    /// `true` may be observed by no caller if the first caller stops polling.
+    ///
     /// After this future completes
     /// - no redo process is running
     /// - no new redo process will be spawned
@@ -250,22 +253,32 @@ impl PostgresRedoManager {
     /// # Cancel-Safety
     ///
     /// This method is cancellation-safe.
-    pub async fn shutdown(&self) {
+    pub async fn shutdown(&self) -> bool {
         // prevent new processes from being spawned
-        let permit = match self.redo_process.get_or_init_detached().await {
+        let maybe_permit = match self.redo_process.get_or_init_detached().await {
             Ok(guard) => {
-                let (proc, permit) = guard.take_and_deinit();
-                drop(proc); // this just drops the Arc, its refcount may not be zero yet
-                permit
+                if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
+                    None
+                } else {
+                    let (proc, permit) = guard.take_and_deinit();
+                    drop(proc); // this just drops the Arc, its refcount may not be zero yet
+                    Some(permit)
+                }
             }
-            Err(permit) => permit,
+            Err(permit) => Some(permit),
+        };
+        let it_was_us = if let Some(permit) = maybe_permit {
+            self.redo_process
+                .set(ProcessOnceCell::ManagerShutDown, permit);
+            true
+        } else {
+            false
         };
-        self.redo_process
-            .set(ProcessOnceCell::ManagerShutDown, permit);
         // wait for ongoing requests to drain and the refcounts of all Arc that
         // we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
         // for the underlying process.
        self.launched_processes.close().await;
+        it_was_us
     }
 
     /// This type doesn't have its own background task to check for idleness: we
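
Reviewer note (not part of the patch): the lib.rs hunk above combines a few pieces that are easier to see in isolation: a dedicated "extraordinary shutdown" thread with its own current-thread tokio runtime, a `CancellationToken` raced against a hard deadline via `tokio::time::timeout`, and a `FuturesUnordered` drained with `block_on` so all shutdown futures are polled concurrently. Below is a minimal, self-contained sketch of that pattern under assumed `tokio`, `tokio-util`, and `futures` dependencies. All names (`spawn_watchdog`, `slow_shutdown`) and timings are hypothetical illustration only; this is not pageserver code.

```rust
use std::time::Duration;

use futures::{stream::FuturesUnordered, StreamExt};
use tokio_util::sync::CancellationToken;

/// Hypothetical stand-in for `WalRedoManager::shutdown`: returns whether this
/// call was the one that initiated shutdown (here: always true).
async fn slow_shutdown(id: u64) -> bool {
    tokio::time::sleep(Duration::from_millis(10 * id)).await;
    true
}

/// Spawn a watchdog thread that waits up to `deadline` for the orderly shutdown
/// to signal completion via `cancel`, and otherwise force-drains all shutdown futures.
fn spawn_watchdog(cancel: CancellationToken, deadline: Duration) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        // A current-thread runtime owned by this thread: the watchdog is not at the
        // mercy of a busy multi-threaded runtime's scheduler, which is the point of
        // using a thread instead of a task.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        // Race the hard deadline against the "orderly shutdown finished" signal.
        if rt
            .block_on(tokio::time::timeout(deadline, cancel.cancelled()))
            .is_ok()
        {
            return; // orderly shutdown won the race; nothing extraordinary to do
        }
        // Deadline hit: poll all shutdown futures concurrently until they complete.
        let mut futs: FuturesUnordered<_> = (0..8u64).map(slow_shutdown).collect();
        let mut initiated = 0u64;
        while let Some(we_initiated) = rt.block_on(futs.next()) {
            if we_initiated {
                initiated += 1;
            }
        }
        println!("extraordinary shutdown initiated {initiated} shutdowns");
    })
}

fn main() {
    let cancel = CancellationToken::new();
    let watchdog = spawn_watchdog(cancel.clone(), Duration::from_millis(50));
    // Simulate an orderly shutdown that takes too long, so the watchdog fires.
    std::thread::sleep(Duration::from_millis(200));
    cancel.cancel();
    watchdog.join().unwrap();
}
```

The design point mirrors the comment in the patch: because the watchdog owns its own thread and runtime, it cannot be starved by a background runtime that is still busy flushing and uploading during the orderly part of shutdown.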