From e7c4ef9f4f8ac3e5e5a383839ba0b365e4e6039b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 26 May 2023 17:45:32 +0200 Subject: [PATCH] don't hold TENANTS lock while waiting for set_stopping() --- pageserver/src/tenant/mgr.rs | 58 +++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 185b5eb3c8..6778e13911 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -10,6 +10,7 @@ use tokio::fs; use anyhow::Context; use once_cell::sync::Lazy; use tokio::sync::RwLock; +use tokio::task::JoinSet; use tracing::*; use remote_storage::GenericRemoteStorage; @@ -246,11 +247,56 @@ pub async fn shutdown_all_tenants() { } }; + // Set tenant (and its timlines) to Stoppping state. + // Since we can only transition into Stopping state after activation is complete, + // run it in a JoinSet so all tenants have a chance to stop before we git SIGKILLed. + // + // Transitioning tenants to Stopping state has a couple of non-obvious side effects: + // 1. Lock out any new requests to the tenants. + // 2. Signal cancellation to WAL receivers (we wait on it below). + // 3. Signal cancellation for othher tenant background loops. + // 4. ??? + // + // The waiting for the cancellation is not done uniformly. + // We certainly wait for WAL receivers to shut down. + // That is necessary so that no new data comes in before the freeze_and_flush. + // But the tenant background loops are joined-on in our caller. + // It's mesed up. + let mut join_set = JoinSet::new(); let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len()); for (_, tenant) in tenants_to_shut_down { - // updates tenant state, forbidding new GC and compaction iterations from starting - let _ = tenant.set_stopping().await; // TODO handle error - tenants_to_freeze_and_flush.push(tenant); + join_set.spawn(async move { + match tenant.set_stopping().await { + Ok(()) => Ok(tenant), + Err(e) => Err((tenant, e)), + } + }); + } + while let Some(res) = join_set.join_next().await { + match res { + Err(join_error) if join_error.is_cancelled() => { + unreachable!("we are not cancelling any of the futures"); + } + Err(join_error) => { + // cannot really do anything, as this panic is likely a bug + error!("task that calls set_stopping() panicked, don't know which tenant this is, and probably freeze_and_flush won't work anyways: {join_error:#}"); + } + Ok(retval) => match retval { + Ok(tenant) => { + // success + debug!("tenant successfully stopped: {}", tenant.tenant_id); + tenants_to_freeze_and_flush.push(tenant); + } + // our task_mgr::shutdown_tasks are going to coalesce on that just fine + Err((tenant, SetStoppingError::AlreadyStopping)) => { + tenants_to_freeze_and_flush.push(tenant); + } + Err((tenant, SetStoppingError::Broken)) => { + warn!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id); + tenants_to_freeze_and_flush.push(tenant); + } + }, + } } // Shut down all existing walreceiver connections and stop accepting the new ones. @@ -266,8 +312,9 @@ pub async fn shutdown_all_tenants() { // On error, log it but continue with the shutdown for other tenants. for tenant in tenants_to_freeze_and_flush { let tenant_id = tenant.tenant_id(); - debug!("shutdown tenant {tenant_id}"); + debug!("freeze_and_flush tenant {tenant_id}"); + // TODO this could probably run in a JoinSet as well? if let Err(err) = tenant.freeze_and_flush().await { error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}"); } @@ -590,6 +637,9 @@ where let tenants_accessor = TENANTS.write().await; match tenants_accessor.get(&tenant_id) { Some(tenant) => { + let tenant = Arc::clone(tenant); + // don't hold TENANTS lock while set_stopping waits for activation to finish + drop(tenants_accessor); match tenant.set_stopping().await { Ok(()) => { // we won, continue stopping procedure