don't hold TENANTS lock while waiting for set_stopping()

This commit is contained in:
Christian Schwarz
2023-05-26 17:45:32 +02:00
parent 13d3f4c29f
commit e7c4ef9f4f

View File

@@ -10,6 +10,7 @@ use tokio::fs;
use anyhow::Context;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::*;
use remote_storage::GenericRemoteStorage;
@@ -246,11 +247,56 @@ pub async fn shutdown_all_tenants() {
}
};
// Set tenant (and its timlines) to Stoppping state.
// Since we can only transition into Stopping state after activation is complete,
// run it in a JoinSet so all tenants have a chance to stop before we git SIGKILLed.
//
// Transitioning tenants to Stopping state has a couple of non-obvious side effects:
// 1. Lock out any new requests to the tenants.
// 2. Signal cancellation to WAL receivers (we wait on it below).
// 3. Signal cancellation for othher tenant background loops.
// 4. ???
//
// The waiting for the cancellation is not done uniformly.
// We certainly wait for WAL receivers to shut down.
// That is necessary so that no new data comes in before the freeze_and_flush.
// But the tenant background loops are joined-on in our caller.
// It's mesed up.
let mut join_set = JoinSet::new();
let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len());
for (_, tenant) in tenants_to_shut_down {
// updates tenant state, forbidding new GC and compaction iterations from starting
let _ = tenant.set_stopping().await; // TODO handle error
tenants_to_freeze_and_flush.push(tenant);
join_set.spawn(async move {
match tenant.set_stopping().await {
Ok(()) => Ok(tenant),
Err(e) => Err((tenant, e)),
}
});
}
while let Some(res) = join_set.join_next().await {
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures");
}
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls set_stopping() panicked, don't know which tenant this is, and probably freeze_and_flush won't work anyways: {join_error:#}");
}
Ok(retval) => match retval {
Ok(tenant) => {
// success
debug!("tenant successfully stopped: {}", tenant.tenant_id);
tenants_to_freeze_and_flush.push(tenant);
}
// our task_mgr::shutdown_tasks are going to coalesce on that just fine
Err((tenant, SetStoppingError::AlreadyStopping)) => {
tenants_to_freeze_and_flush.push(tenant);
}
Err((tenant, SetStoppingError::Broken)) => {
warn!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id);
tenants_to_freeze_and_flush.push(tenant);
}
},
}
}
// Shut down all existing walreceiver connections and stop accepting the new ones.
@@ -266,8 +312,9 @@ pub async fn shutdown_all_tenants() {
// On error, log it but continue with the shutdown for other tenants.
for tenant in tenants_to_freeze_and_flush {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
debug!("freeze_and_flush tenant {tenant_id}");
// TODO this could probably run in a JoinSet as well?
if let Err(err) = tenant.freeze_and_flush().await {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
@@ -590,6 +637,9 @@ where
let tenants_accessor = TENANTS.write().await;
match tenants_accessor.get(&tenant_id) {
Some(tenant) => {
let tenant = Arc::clone(tenant);
// don't hold TENANTS lock while set_stopping waits for activation to finish
drop(tenants_accessor);
match tenant.set_stopping().await {
Ok(()) => {
// we won, continue stopping procedure