pageserver: remove Tenant::shutdown barrier
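
Previously, `Tenant::shutdown` took a `completion::Barrier` argument and, when another shutdown was already in flight, returned a clone of the first caller's barrier as the `Err` value so the losing caller could `wait()` to join it. Tenant shutdowns are now serialized by `TenantSlot::InProgress`, so a concurrent call indicates a bug rather than a race to be joined: this commit removes the barrier plumbing, turns `TenantState::Stopping` into a unit variant by dropping its `progress` field, and makes `shutdown` return a new `ShutdownError::AlreadyStopping` that callers log instead of awaiting.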
@@ -9,7 +9,6 @@ use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
 use strum_macros;
 use utils::{
-    completion,
     generation::Generation,
     history_buffer::HistoryBufferWithDropCounter,
     id::{NodeId, TenantId, TimelineId},
@@ -78,12 +77,7 @@ pub enum TenantState {
     /// system is being shut down.
     ///
     /// Transitions out of this state are possible through `set_broken()`.
-    Stopping {
-        // Because of https://github.com/serde-rs/serde/issues/2105 this has to be a named field,
-        // otherwise it will not be skipped during deserialization
-        #[serde(skip)]
-        progress: completion::Barrier,
-    },
+    Stopping,
     /// The tenant is recognized by the pageserver, but can no longer be used for
     /// any operations.
     ///
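
The workaround deleted above deserves a note: `#[serde(skip)]` is not honored during deserialization of tuple-variant fields (serde-rs/serde#2105), so the barrier had to live in a named field. A standalone sketch of the mechanism, with hypothetical `State` and `Barrier` types standing in for `TenantState` and `completion::Barrier`, assuming the serde derive feature and serde_json:

    use serde::{Deserialize, Serialize};

    // Stand-in for `completion::Barrier`: not serializable, but Default,
    // which is all `#[serde(skip)]` needs to rebuild it on deserialization.
    #[derive(Default)]
    struct Barrier;

    #[derive(Serialize, Deserialize)]
    enum State {
        // A tuple variant `Stopping(#[serde(skip)] Barrier)` would not
        // round-trip (serde-rs/serde#2105); the named field is the workaround.
        Stopping {
            #[serde(skip)]
            progress: Barrier,
        },
    }

    fn main() {
        let json = serde_json::to_string(&State::Stopping { progress: Barrier }).unwrap();
        assert_eq!(json, r#"{"Stopping":{}}"#);
        // The skipped field is reconstructed via Default on the way back in.
        let State::Stopping { progress: _ } = serde_json::from_str(&json).unwrap();
    }

With `progress` gone, none of this is needed and `Stopping` becomes a plain unit variant.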
@@ -993,13 +987,7 @@ mod tests {
                 "Activating",
             ),
             (line!(), TenantState::Active, "Active"),
-            (
-                line!(),
-                TenantState::Stopping {
-                    progress: utils::completion::Barrier::default(),
-                },
-                "Stopping",
-            ),
+            (line!(), TenantState::Stopping {}, "Stopping"),
             (
                 line!(),
                 TenantState::Broken {
@@ -352,14 +352,14 @@ impl Debug for DeleteTimelineError {
 }

 pub enum SetStoppingError {
-    AlreadyStopping(completion::Barrier),
+    AlreadyStopping,
     Broken,
 }

 impl Debug for SetStoppingError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
+            Self::AlreadyStopping => f.debug_tuple("AlreadyStopping").finish(),
             Self::Broken => write!(f, "Broken"),
         }
     }
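
A side note on the `Debug` impl above: it was presumably hand-written because `completion::Barrier` does not implement `Debug` (an assumption; that type is not shown in this diff). With the payload removed, the whole impl could now be a derive:

    // Sketch: equivalent to the hand-written impl once the Barrier payload
    // is gone. The commit itself keeps the manual impl.
    #[derive(Debug)]
    pub enum SetStoppingError {
        AlreadyStopping,
        Broken,
    }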
@@ -415,6 +415,11 @@ enum CreateTimelineCause {
     Delete,
 }

+#[derive(Debug)]
+enum ShutdownError {
+    AlreadyStopping,
+}
+
 impl Tenant {
     /// Yet another helper for timeline initialization.
     ///
@@ -1803,16 +1808,7 @@ impl Tenant {
     /// - detach + ignore (freeze_and_flush == false)
     ///
     /// This will attempt to shutdown even if tenant is broken.
-    ///
-    /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
-    /// If the tenant is already shutting down, we return a clone of the first shutdown call's
-    /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
-    /// the ongoing shutdown.
-    async fn shutdown(
-        &self,
-        shutdown_progress: completion::Barrier,
-        freeze_and_flush: bool,
-    ) -> Result<(), completion::Barrier> {
+    async fn shutdown(&self, freeze_and_flush: bool) -> Result<(), ShutdownError> {
         span::debug_assert_current_span_has_tenant_id();
         // Set tenant (and its timlines) to Stoppping state.
         //
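
The deleted doc comment describes the `utils::completion` contract: `channel()` yields a guard and a `Barrier`, and every `Barrier::wait()` resolves once the guard is dropped. That module is not part of this diff, so the following is only a rough approximation built on tokio's `watch` channel, not the real implementation:

    use tokio::sync::watch;

    pub struct Completion {
        _tx: watch::Sender<()>, // completes the barrier when dropped
    }

    #[derive(Clone)]
    pub struct Barrier(watch::Receiver<()>);

    pub fn channel() -> (Completion, Barrier) {
        let (tx, rx) = watch::channel(());
        (Completion { _tx: tx }, Barrier(rx))
    }

    impl Barrier {
        pub async fn wait(mut self) {
            // `changed()` returns Err once the sender (the guard) is dropped,
            // which is exactly the completion signal here.
            while self.0.changed().await.is_ok() {}
        }
    }

Under the old contract, the winning caller's guard stayed alive for the duration of the shutdown, and every losing caller got a clone of the winner's `Barrier` back as the `Err` value to `wait()` on. The new return type drops that joining mechanism entirely.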
@@ -1832,15 +1828,17 @@ impl Tenant {
         // It's mesed up.
         // we just ignore the failure to stop

-        match self.set_stopping(shutdown_progress, false, false).await {
+        match self.set_stopping(false, false).await {
             Ok(()) => {}
             Err(SetStoppingError::Broken) => {
                 // assume that this is acceptable
             }
-            Err(SetStoppingError::AlreadyStopping(other)) => {
-                // give caller the option to wait for this this shutdown
-                info!("Tenant::shutdown: AlreadyStopping");
-                return Err(other);
+            Err(SetStoppingError::AlreadyStopping) => {
+                // This should not happen: individual tenant shutdowns are guarded by
+                // `[TenantSlot::InProgress]`, and when we shutdown all tenants during
+                // process shutdown, we just wait for those InProgress ones to finish.
+                error!("Called Tenant::shutdown while already stopping");
+                return Err(ShutdownError::AlreadyStopping);
             }
         };

@@ -1881,7 +1879,6 @@ impl Tenant {
     /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
     async fn set_stopping(
         &self,
-        progress: completion::Barrier,
         allow_transition_from_loading: bool,
         allow_transition_from_attaching: bool,
     ) -> Result<(), SetStoppingError> {
@@ -1898,7 +1895,7 @@ impl Tenant {
                 false
             }
             TenantState::Loading => allow_transition_from_loading,
-            TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
+            TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping => true,
         })
         .await
         .expect("cannot drop self.state while on a &self method");
@@ -1913,21 +1910,21 @@ impl Tenant {
                 if !allow_transition_from_attaching {
                     unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
                 };
-                *current_state = TenantState::Stopping { progress };
+                *current_state = TenantState::Stopping;
                 true
             }
             TenantState::Loading => {
                 if !allow_transition_from_loading {
                     unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
                 };
-                *current_state = TenantState::Stopping { progress };
+                *current_state = TenantState::Stopping;
                 true
             }
             TenantState::Active => {
                 // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
                 // are created after the transition to Stopping. That's harmless, as the Timelines
                 // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
-                *current_state = TenantState::Stopping { progress };
+                *current_state = TenantState::Stopping;
                 // Continue stopping outside the closure. We need to grab timelines.lock()
                 // and we plan to turn it into a tokio::sync::Mutex in a future patch.
                 true
@@ -1939,9 +1936,9 @@ impl Tenant {
                 err = Some(SetStoppingError::Broken);
                 false
             }
-            TenantState::Stopping { progress } => {
+            TenantState::Stopping => {
                 info!("Tenant is already in Stopping state");
-                err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
+                err = Some(SetStoppingError::AlreadyStopping);
                 false
             }
         });
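
The `set_stopping` hunks above edit the two halves of a watch-channel state machine: first wait until the tenant has left its transitional states, then flip the state inside a modify closure. A minimal sketch of that shape using tokio's `watch` API and a simplified state enum (the real `self.state` type and its surrounding calls are not shown in this diff):

    use tokio::sync::watch;

    #[derive(Debug, PartialEq)]
    enum State {
        Activating,
        Active,
        Stopping,
    }

    async fn set_stopping(state: &watch::Sender<State>) {
        // First half: block until activation has settled, mirroring the
        // `Activating => false, Active | Broken | Stopping => true` closure.
        let mut rx = state.subscribe();
        rx.wait_for(|s| !matches!(s, State::Activating))
            .await
            .expect("we hold the sender, so the channel cannot close");

        // Second half: transition inside the modify closure, as the diff
        // does with `*current_state = TenantState::Stopping`.
        state.send_modify(|current| {
            if *current == State::Active {
                *current = State::Stopping;
            }
        });
    }

    #[tokio::main]
    async fn main() {
        let (tx, _rx) = watch::channel(State::Active);
        set_stopping(&tx).await;
        assert_eq!(*tx.borrow(), State::Stopping);
    }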
@@ -4128,7 +4125,7 @@ mod tests {
         make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
         // so that all uploads finish & we can call harness.load() below again
         tenant
-            .shutdown(Default::default(), true)
+            .shutdown(true)
             .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
             .await
             .ok()
@@ -4169,7 +4166,7 @@ mod tests {

         // so that all uploads finish & we can call harness.load() below again
         tenant
-            .shutdown(Default::default(), true)
+            .shutdown(true)
             .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
             .await
             .ok()
@@ -4231,7 +4228,7 @@ mod tests {
         drop(tline);
         // so that all uploads finish & we can call harness.try_load() below again
         tenant
-            .shutdown(Default::default(), true)
+            .shutdown(true)
             .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
             .await
             .ok()
@@ -9,7 +9,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::{error, instrument, warn, Instrument, Span};

 use utils::{
-    backoff, completion, crashsafe, fs_ext,
+    backoff, crashsafe, fs_ext,
     id::{TenantId, TimelineId},
 };

@@ -391,10 +391,8 @@ impl DeleteTenantFlow {
         init_order: Option<InitializationOrder>,
         ctx: &RequestContext,
     ) -> Result<(), DeleteTenantError> {
-        let (_, progress) = completion::channel();
-
         tenant
-            .set_stopping(progress, false, true)
+            .set_stopping(false, true)
             .await
             .expect("cant be stopping or broken");

@@ -434,16 +432,13 @@ impl DeleteTenantFlow {
             Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))?
         });

-        // make pageserver shutdown not to wait for our completion
-        let (_, progress) = completion::channel();
-
         // It would be good to only set stopping here and continue shutdown in the background, but shutdown is not idempotent.
         // i e it is an error to do:
         // tenant.set_stopping
         // tenant.shutdown
         // Its also bad that we're holding tenants.read here.
         // TODO relax set_stopping to be idempotent?
-        if tenant.shutdown(progress, false).await.is_err() {
+        if tenant.shutdown(false).await.is_err() {
             return Err(DeleteTenantError::Other(anyhow::anyhow!(
                 "tenant shutdown is already in progress"
             )));
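
The non-idempotence the TODO above refers to is that `shutdown` begins by calling `set_stopping`, so a prior `set_stopping` makes it fail. A mock illustration (hypothetical types, not neon's real ones):

    #[derive(PartialEq)]
    enum State {
        Active,
        Stopping,
    }

    #[derive(Debug)]
    enum ShutdownError {
        AlreadyStopping,
    }

    struct MockTenant {
        state: State,
    }

    impl MockTenant {
        fn set_stopping(&mut self) -> Result<(), ShutdownError> {
            if self.state == State::Stopping {
                return Err(ShutdownError::AlreadyStopping);
            }
            self.state = State::Stopping;
            Ok(())
        }

        // shutdown layers on set_stopping, so the two cannot be called in
        // sequence: the second call observes Stopping and bails.
        fn shutdown(&mut self) -> Result<(), ShutdownError> {
            self.set_stopping()
        }
    }

    fn main() {
        let mut t = MockTenant { state: State::Active };
        t.set_stopping().unwrap();
        assert!(matches!(t.shutdown(), Err(ShutdownError::AlreadyStopping)));
    }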
@@ -27,7 +27,9 @@ use crate::deletion_queue::DeletionQueueClient;
 use crate::task_mgr::{self, TaskKind};
 use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
 use crate::tenant::delete::DeleteTenantFlow;
-use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
+use crate::tenant::{
+    create_tenant_files, AttachedTenantConf, ShutdownError, SpawnMode, Tenant, TenantState,
+};
 use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};

 use utils::crashsafe::path_with_suffix_extension;
@@ -532,8 +534,6 @@ pub(crate) async fn shutdown_all_tenants() {
 }

 async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
-    use utils::completion;
-
     // Under write lock (prevent any new tenants being created), extract the list
     // of tenants to shut down.
     let (in_progress_ops, tenants_to_shut_down) = {
@@ -597,14 +597,15 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
             async move {
                 let freeze_and_flush = true;

-                let res = {
-                    let (_guard, shutdown_progress) = completion::channel();
-                    tenant.shutdown(shutdown_progress, freeze_and_flush).await
-                };
-
-                if let Err(other_progress) = res {
-                    // join the another shutdown in progress
-                    other_progress.wait().await;
+                let res = { tenant.shutdown(freeze_and_flush).await };
+                if let Err(e) = res {
+                    match e {
+                        ShutdownError::AlreadyStopping => {
+                            // TODO: to ensure this can _never_ happen, we need to get rid of
+                            // the horrible DeleteTenantFlow::should_resume_deletion
+                            tracing::warn!(%tenant_id, "Tenant already stopping during shutdown");
+                        }
+                    }
                 }

                 // we cannot afford per tenant logging here, because if s3 is degraded, we are
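
Note the exhaustive `match e { ShutdownError::AlreadyStopping => ... }` over a single-variant enum where `if let` would do: presumably deliberate, since if `ShutdownError` ever grows another variant this call site stops compiling instead of silently logging the wrong message.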
@@ -785,8 +786,6 @@ pub(crate) async fn upsert_location(
     // for Attached->Attached transitions in the same generation. By this point,
     // if we see an attached tenant we know it will be discarded and should be
     // shut down.
-    let (_guard, progress) = utils::completion::channel();
-
     match tenant.get_attach_mode() {
         AttachmentMode::Single | AttachmentMode::Multi => {
             // Before we leave our state as the presumed holder of the latest generation,
@@ -799,11 +798,11 @@ pub(crate) async fn upsert_location(
         };

         info!("Shutting down attached tenant");
-        match tenant.shutdown(progress, false).await {
+        match tenant.shutdown(false).await {
             Ok(()) => {}
-            Err(barrier) => {
-                info!("Shutdown already in progress, waiting for it to complete");
-                barrier.wait().await;
+            Err(ShutdownError::AlreadyStopping) => {
+                // This shouldn't have happened, we are guarded by TenantSlot::InProgress
+                warn!("Shutdown unexpectedly already in progress");
             }
         }
     }
@@ -1472,8 +1471,6 @@ async fn remove_tenant_from_memory<V, F>(
 where
     F: std::future::Future<Output = anyhow::Result<V>>,
 {
-    use utils::completion;
-
     let mut tenant_guard = tenant_map_acquire_slot_impl(&tenant_id, tenants, Some(true))?;
     let tenant_slot = tenant_guard.take_value();

@@ -1484,9 +1481,6 @@ where
         _ => None,
     };

-    // allow pageserver shutdown to await for our completion
-    let (_guard, progress) = completion::channel();
-
     // If the tenant was attached, shut it down gracefully. For secondary
     // locations this part is not necessary
     match &attached_tenant {
@@ -1496,11 +1490,12 @@ where

             // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
             // that we can continue safely to cleanup.
-            match attached_tenant.shutdown(progress, freeze_and_flush).await {
+            match attached_tenant.shutdown(freeze_and_flush).await {
                 Ok(()) => {}
-                Err(_other) => {
-                    // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
-                    // wait for it but return an error right away because these are distinct requests.
+                Err(ShutdownError::AlreadyStopping) => {
+                    // This is unexpected: fail whatever operation has encountered this
+                    // buggy state.
+                    warn!(%tenant_id, "Tenant already stopping while trying to shut down");
                     return Err(TenantStateError::IsStopping(tenant_id));
                 }
             }