pageserver: shutdown all walredo managers 8s into shutdown (#8572)
# Motivation

The working theory for hung systemd during PS deploys (https://github.com/neondatabase/cloud/issues/11387) is that leftover walredo processes trigger a race condition. In https://github.com/neondatabase/neon/pull/8150 I arranged for a clean Tenant shutdown to actually kill its walredo processes. But many prod machines don't manage to shut down all their tenants before the 10s systemd timeout hits and, presumably, triggers the race condition in systemd / the Linux kernel that leaves systemd frozen.

# Solution

This PR bolts on a rather ugly mechanism to shut down walredo managers out of order, 8s after we've received the SIGTERM from systemd.

# Changes

- Add a global registry of `Weak<WalRedoManager>`.
- Add a special thread, spawned during `shutdown_pageserver`, that sleeps for 8s, then shuts down all redo managers in the registry and prevents new redo managers from being created.
- Propagate the new failure mode of tenant spawning throughout the code base.
- Make sure a shut-down walredo manager results in `PageReconstructError::Cancelled`, so that `Timeline::get` calls arriving after the shutdown do the right thing.
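To make the registry mechanism concrete, here is a minimal, self-contained sketch of the same pattern: a global map of `Weak` handles behind a `Mutex<Option<...>>`, where `take()`-ing the `Option` both hands the shutdown thread every live manager and refuses new registrations. The `Manager`, `register`, and `REGISTRY` names are illustrative only (they are not the pageserver's types), the grace period is shortened from 8s to 100ms, and `std::sync::LazyLock` stands in for the `once_cell::sync::Lazy` used in the actual change:

```rust
use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Mutex, Weak};
use std::time::Duration;

/// Illustrative stand-in for a walredo manager; not the pageserver's type.
struct Manager {
    id: u64,
}

impl Manager {
    fn shutdown(&self) {
        println!("manager {} shut down", self.id);
    }
}

/// Global registry of weak handles. `None` means "shutting down, refuse new managers".
static REGISTRY: LazyLock<Mutex<Option<HashMap<u64, Weak<Manager>>>>> =
    LazyLock::new(|| Mutex::new(Some(HashMap::new())));

/// Registration fails once the shutdown thread has `take()`n the map.
fn register(id: u64) -> Result<Arc<Manager>, &'static str> {
    let mgr = Arc::new(Manager { id });
    match &mut *REGISTRY.lock().unwrap() {
        Some(map) => {
            map.insert(id, Arc::downgrade(&mgr));
            Ok(mgr)
        }
        None => Err("pageserver is shutting down"),
    }
}

fn main() {
    let m1 = register(1).unwrap();
    let _m2 = register(2).unwrap();
    drop(m1); // a dropped manager leaves only a dead Weak entry behind

    // The "extraordinary shutdown" thread: wait out a grace period, then shut down
    // whatever is still alive, outside the regular shutdown order.
    let t = std::thread::spawn(|| {
        std::thread::sleep(Duration::from_millis(100)); // stands in for the 8s grace period
        let map = REGISTRY.lock().unwrap().take().expect("only we take()");
        for (_, weak) in map {
            if let Some(mgr) = weak.upgrade() {
                mgr.shutdown();
            }
        }
    });
    t.join().unwrap();

    // New managers are refused after the take().
    assert!(register(3).is_err());
}
```

The actual change additionally drains the managers concurrently via `FuturesUnordered` and records whether each `shutdown()` call was the one that initiated shutdown, but the registry/`take()` idea is the same.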
Committed by: GitHub
Parent: d6c79b77df
Commit: 980d506bda

@@ -12,6 +12,8 @@ pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

use futures::{stream::FuturesUnordered, StreamExt};
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
pub mod aux_file;

@@ -36,7 +38,7 @@ use tenant::{
    mgr::{BackgroundPurges, TenantManager},
    secondary,
};
use tracing::info;
use tracing::{info, info_span};

/// Current storage format version
///

@@ -85,6 +87,79 @@ pub async fn shutdown_pageserver(
    exit_code: i32,
) {
    use std::time::Duration;

    // If the orderly shutdown below takes too long, we still want to make
    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
    //
    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
    // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
    //
    // We use a thread instead of a tokio task because the background runtime is likely busy
    // with the final flushing / uploads. This activity here has priority, and due to lack
    // of scheduling priority features in the tokio scheduler, using a separate thread is
    // an effective priority booster.
    let walredo_extraordinary_shutdown_thread_span = {
        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
        span.follows_from(tracing::Span::current());
        span
    };
    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
        let walredo_extraordinary_shutdown_thread_cancel =
            walredo_extraordinary_shutdown_thread_cancel.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let _entered = rt.enter();
            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
            if let Ok(()) = rt.block_on(tokio::time::timeout(
                Duration::from_secs(8),
                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
            )) {
                info!("cancellation requested");
                return;
            }
            let managers = tenant::WALREDO_MANAGERS
                .lock()
                .unwrap()
                // prevents new walredo managers from being inserted
                .take()
                .expect("only we take()");
            // Use FuturesUnordered to get in queue early for each manager's
            // heavier_once_cell semaphore wait list.
            // Also, for idle tenants that for some reason haven't
            // shut down yet, it's quite likely that we're not going
            // to get Poll::Pending even once.
            let mut futs: FuturesUnordered<_> = managers
                .into_iter()
                .filter_map(|(_, mgr)| mgr.upgrade())
                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
                .collect();
            info!(count=%futs.len(), "built FuturesUnordered");
            let mut last_log_at = std::time::Instant::now();
            #[derive(Debug, Default)]
            struct Results {
                initiated: u64,
                already: u64,
            }
            let mut results = Results::default();
            while let Some(we_initiated) = rt.block_on(futs.next()) {
                if we_initiated {
                    results.initiated += 1;
                } else {
                    results.already += 1;
                }
                if last_log_at.elapsed() > Duration::from_millis(100) {
                    info!(remaining=%futs.len(), ?results, "progress");
                    last_log_at = std::time::Instant::now();
                }
            }
            info!(?results, "done");
        }
    });

    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    let remaining_connections = timed(

@@ -160,6 +235,12 @@ pub async fn shutdown_pageserver(
        Duration::from_secs(1),
    )
    .await;

    info!("cancel & join walredo_extraordinary_shutdown_thread");
    walredo_extraordinary_shutdown_thread_cancel.cancel();
    walredo_extraordinary_shutdown_thread.join().unwrap();
    info!("walredo_extraordinary_shutdown_thread done");

    info!("Shut down successfully completed");
    std::process::exit(exit_code);
}

@@ -33,6 +33,7 @@ use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;

@@ -312,14 +313,66 @@ impl std::fmt::Debug for Tenant {
}

pub(crate) enum WalRedoManager {
    Prod(PostgresRedoManager),
    Prod(WalredoManagerId, PostgresRedoManager),
    #[cfg(test)]
    Test(harness::TestRedoManager),
}

impl From<PostgresRedoManager> for WalRedoManager {
    fn from(mgr: PostgresRedoManager) -> Self {
        Self::Prod(mgr)
#[derive(thiserror::Error, Debug)]
#[error("pageserver is shutting down")]
pub(crate) struct GlobalShutDown;

impl WalRedoManager {
    pub(crate) fn new(mgr: PostgresRedoManager) -> Result<Arc<Self>, GlobalShutDown> {
        let id = WalredoManagerId::next();
        let arc = Arc::new(Self::Prod(id, mgr));
        let mut guard = WALREDO_MANAGERS.lock().unwrap();
        match &mut *guard {
            Some(map) => {
                map.insert(id, Arc::downgrade(&arc));
                Ok(arc)
            }
            None => Err(GlobalShutDown),
        }
    }
}

impl Drop for WalRedoManager {
    fn drop(&mut self) {
        match self {
            Self::Prod(id, _) => {
                let mut guard = WALREDO_MANAGERS.lock().unwrap();
                if let Some(map) = &mut *guard {
                    map.remove(id).expect("new() registers, drop() unregisters");
                }
            }
            #[cfg(test)]
            Self::Test(_) => {
                // Not applicable to test redo manager
            }
        }
    }
}

/// Global registry of all walredo managers so that [`crate::shutdown_pageserver`] can shut down
/// the walredo processes outside of the regular order.
///
/// This is necessary to work around a systemd bug where it freezes if there are
/// walredo processes left => <https://github.com/neondatabase/cloud/issues/11387>
#[allow(clippy::type_complexity)]
pub(crate) static WALREDO_MANAGERS: once_cell::sync::Lazy<
    Mutex<Option<HashMap<WalredoManagerId, Weak<WalRedoManager>>>>,
> = once_cell::sync::Lazy::new(|| Mutex::new(Some(HashMap::new())));
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub(crate) struct WalredoManagerId(u64);
impl WalredoManagerId {
    pub fn next() -> Self {
        static NEXT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
        let id = NEXT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        if id == 0 {
panic!("WalredoManagerId::new() returned 0, indicating wraparound, risking it's no longer unique");
|
||||
        }
        Self(id)
    }
}

@@ -331,19 +384,20 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}

impl WalRedoManager {
    pub(crate) async fn shutdown(&self) {
    pub(crate) async fn shutdown(&self) -> bool {
        match self {
            Self::Prod(mgr) => mgr.shutdown().await,
            Self::Prod(_, mgr) => mgr.shutdown().await,
            #[cfg(test)]
            Self::Test(_) => {
                // Not applicable to test redo manager
                true
            }
        }
    }

    pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
        match self {
            Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
            Self::Prod(_, mgr) => mgr.maybe_quiesce(idle_timeout),
            #[cfg(test)]
            Self::Test(_) => {
                // Not applicable to test redo manager

@@ -363,7 +417,7 @@ impl WalRedoManager {
        pg_version: u32,
    ) -> Result<bytes::Bytes, walredo::Error> {
        match self {
            Self::Prod(mgr) => {
            Self::Prod(_, mgr) => {
                mgr.request_redo(key, lsn, base_img, records, pg_version)
                    .await
            }

@@ -377,7 +431,7 @@ impl WalRedoManager {

    pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
        match self {
            WalRedoManager::Prod(m) => Some(m.status()),
            WalRedoManager::Prod(_, m) => Some(m.status()),
            #[cfg(test)]
            WalRedoManager::Test(_) => None,
        }

@@ -677,11 +731,9 @@ impl Tenant {
        init_order: Option<InitializationOrder>,
        mode: SpawnMode,
        ctx: &RequestContext,
    ) -> Arc<Tenant> {
        let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
            conf,
            tenant_shard_id,
        )));
    ) -> Result<Arc<Tenant>, GlobalShutDown> {
        let wal_redo_manager =
            WalRedoManager::new(PostgresRedoManager::new(conf, tenant_shard_id))?;

        let TenantSharedResources {
            broker_client,

@@ -880,7 +932,7 @@ impl Tenant {
            }
            .instrument(tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation)),
        );
        tenant
        Ok(tenant)
    }

    #[instrument(skip_all)]

@@ -55,7 +55,7 @@ use utils::id::{TenantId, TimelineId};
use super::remote_timeline_client::remote_tenant_path;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
use super::TenantSharedResources;
use super::{GlobalShutDown, TenantSharedResources};

/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is eligible to service

@@ -665,17 +665,20 @@ pub async fn init_tenant_mgr(
        let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
        let shard_identity = location_conf.shard;
        let slot = match location_conf.mode {
            LocationMode::Attached(attached_conf) => TenantSlot::Attached(tenant_spawn(
                conf,
                tenant_shard_id,
                &tenant_dir_path,
                resources.clone(),
                AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
                shard_identity,
                Some(init_order.clone()),
                SpawnMode::Lazy,
                &ctx,
            )),
            LocationMode::Attached(attached_conf) => TenantSlot::Attached(
                tenant_spawn(
                    conf,
                    tenant_shard_id,
                    &tenant_dir_path,
                    resources.clone(),
                    AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
                    shard_identity,
                    Some(init_order.clone()),
                    SpawnMode::Lazy,
                    &ctx,
                )
                .expect("global shutdown during init_tenant_mgr cannot happen"),
            ),
            LocationMode::Secondary(secondary_conf) => {
                info!(
                    tenant_id = %tenant_shard_id.tenant_id,

@@ -723,7 +726,7 @@ fn tenant_spawn(
    init_order: Option<InitializationOrder>,
    mode: SpawnMode,
    ctx: &RequestContext,
) -> Arc<Tenant> {
) -> Result<Arc<Tenant>, GlobalShutDown> {
    // All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
    // path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
    // to avoid impacting prod runtime performance.

@@ -1190,7 +1193,10 @@ impl TenantManager {
                None,
                spawn_mode,
                ctx,
            );
            )
            .map_err(|_: GlobalShutDown| {
                UpsertLocationError::Unavailable(TenantMapError::ShuttingDown)
            })?;

            TenantSlot::Attached(tenant)
        }

@@ -1311,7 +1317,7 @@ impl TenantManager {
            None,
            SpawnMode::Eager,
            ctx,
        );
        )?;

        slot_guard.upsert(TenantSlot::Attached(tenant))?;

@@ -2045,7 +2051,7 @@ impl TenantManager {
            None,
            SpawnMode::Eager,
            ctx,
        );
        )?;

        slot_guard.upsert(TenantSlot::Attached(tenant))?;

@@ -76,6 +76,7 @@ use crate::{
        metadata::TimelineMetadata,
        storage_layer::PersistentLayerDesc,
    },
    walredo,
};
use crate::{
    context::{DownloadBehavior, RequestContext},

@@ -1000,7 +1001,10 @@ impl Timeline {
            .for_get_kind(GetKind::Singular)
            .observe(elapsed.as_secs_f64());

        if cfg!(feature = "testing") && res.is_err() {
        if cfg!(feature = "testing")
            && res.is_err()
            && !matches!(res, Err(PageReconstructError::Cancelled))
        {
            // it can only be a walredo issue
            use std::fmt::Write;

@@ -5466,20 +5470,22 @@ impl Timeline {
        } else {
            trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
        };

        let img = match self
        let res = self
            .walredo_mgr
            .as_ref()
            .context("timeline has no walredo manager")
            .map_err(PageReconstructError::WalRedo)?
            .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
            .await
            .context("reconstruct a page image")
        {
            .await;
        let img = match res {
            Ok(img) => img,
            Err(e) => return Err(PageReconstructError::WalRedo(e)),
            Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
            Err(walredo::Error::Other(e)) => {
                return Err(PageReconstructError::WalRedo(
                    e.context("reconstruct a page image"),
                ))
            }
        };

        Ok(img)
    }
}

@@ -241,6 +241,9 @@ impl PostgresRedoManager {

    /// Shut down the WAL redo manager.
    ///
    /// Returns `true` if this call was the one that initiated shutdown.
    /// `true` may be observed by no caller if the first caller stops polling.
    ///
    /// After this future completes
    /// - no redo process is running
    /// - no new redo process will be spawned

@@ -250,22 +253,32 @@ impl PostgresRedoManager {
    /// # Cancel-Safety
    ///
    /// This method is cancellation-safe.
    pub async fn shutdown(&self) {
    pub async fn shutdown(&self) -> bool {
        // prevent new processes from being spawned
        let permit = match self.redo_process.get_or_init_detached().await {
        let maybe_permit = match self.redo_process.get_or_init_detached().await {
            Ok(guard) => {
                let (proc, permit) = guard.take_and_deinit();
                drop(proc); // this just drops the Arc, its refcount may not be zero yet
                permit
                if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
                    None
                } else {
                    let (proc, permit) = guard.take_and_deinit();
                    drop(proc); // this just drops the Arc, its refcount may not be zero yet
                    Some(permit)
                }
            }
            Err(permit) => permit,
            Err(permit) => Some(permit),
        };
        let it_was_us = if let Some(permit) = maybe_permit {
            self.redo_process
                .set(ProcessOnceCell::ManagerShutDown, permit);
            true
        } else {
            false
        };
        self.redo_process
            .set(ProcessOnceCell::ManagerShutDown, permit);
        // wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
        // we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
        // for the underlying process.
        self.launched_processes.close().await;
        it_was_us
    }

    /// This type doesn't have its own background task to check for idleness: we