diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 07d1618272..c71eac0672 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -566,8 +566,10 @@ pub(crate) async fn shutdown_all_tenants() { async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { use utils::completion; - // Atomically, 1. extract the list of tenants to shut down and 2. prevent creation of new tenants. - let (in_progress_ops, tenants_to_shut_down) = { + let mut join_set = JoinSet::new(); + + // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants. + let (total_in_progress, total_attached) = { let mut m = tenants.write().unwrap(); match &mut *m { TenantsMap::Initializing => { @@ -577,78 +579,67 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { } TenantsMap::Open(tenants) => { let mut shutdown_state = HashMap::new(); - let mut in_progress_ops = Vec::new(); - let mut tenants_to_shut_down = Vec::new(); + let mut total_in_progress = 0; + let mut total_attached = 0; - for (k, v) in tenants.drain() { + for (tenant_id, v) in tenants.drain() { match v { TenantSlot::Attached(t) => { - tenants_to_shut_down.push(t.clone()); - shutdown_state.insert(k, TenantSlot::Attached(t)); + shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone())); + join_set.spawn( + async move { + let freeze_and_flush = true; + + let res = { + let (_guard, shutdown_progress) = completion::channel(); + t.shutdown(shutdown_progress, freeze_and_flush).await + }; + + if let Err(other_progress) = res { + // join the another shutdown in progress + other_progress.wait().await; + } + + // we cannot afford per tenant logging here, because if s3 is degraded, we are + // going to log too many lines + debug!("tenant successfully stopped"); + } + .instrument(info_span!("shutdown", %tenant_id)), + ); + + total_attached += 1; } TenantSlot::Secondary => { - shutdown_state.insert(k, TenantSlot::Secondary); + shutdown_state.insert(tenant_id, TenantSlot::Secondary); } TenantSlot::InProgress(notify) => { // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will // wait for their notifications to fire in this function. - in_progress_ops.push(notify); + join_set.spawn(async move { + notify.wait().await; + }); + + total_in_progress += 1; } } } *m = TenantsMap::ShuttingDown(shutdown_state); - (in_progress_ops, tenants_to_shut_down) + (total_in_progress, total_attached) } TenantsMap::ShuttingDown(_) => { - // TODO: it is possible that detach and shutdown happen at the same time. as a - // result, during shutdown we do not wait for detach. error!("already shutting down, this function isn't supposed to be called more than once"); return; } } }; + let started_at = std::time::Instant::now(); + info!( "Waiting for {} InProgress tenants and {} Attached tenants to shut down", - in_progress_ops.len(), - tenants_to_shut_down.len() + total_in_progress, total_attached ); - for barrier in in_progress_ops { - barrier.wait().await; - } - - info!( - "InProgress tenants shut down, waiting for {} Attached tenants to shut down", - tenants_to_shut_down.len() - ); - let started_at = std::time::Instant::now(); - let mut join_set = JoinSet::new(); - for tenant in tenants_to_shut_down { - let tenant_id = tenant.get_tenant_id(); - join_set.spawn( - async move { - let freeze_and_flush = true; - - let res = { - let (_guard, shutdown_progress) = completion::channel(); - tenant.shutdown(shutdown_progress, freeze_and_flush).await - }; - - if let Err(other_progress) = res { - // join the another shutdown in progress - other_progress.wait().await; - } - - // we cannot afford per tenant logging here, because if s3 is degraded, we are - // going to log too many lines - - debug!("tenant successfully stopped"); - } - .instrument(info_span!("shutdown", %tenant_id)), - ); - } - let total = join_set.len(); let mut panicked = 0; let mut buffering = true; @@ -661,7 +652,7 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { match joined { Ok(()) => {} Err(join_error) if join_error.is_cancelled() => { - unreachable!("we are not cancelling any of the futures"); + unreachable!("we are not cancelling any of the tasks"); } Err(join_error) if join_error.is_panic() => { // cannot really do anything, as this panic is likely a bug