Resume tenant deletion on attach (#5039)
I'm still a bit nervous about the attach -> crash case, but it should work (unlike the timeline case). Ideally we would cover this with a test. This continues the tradition of adding bool flags to Tenant::set_stopping; the lifecycle project will probably help clean that up.
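In short, `Tenant::spawn_attach` now checks for a pending tenant deletion before doing a normal attach: on error it marks the tenant Broken, if a deletion is pending it resumes that deletion instead of attaching, and otherwise it attaches and activates as before. The snippet below is a condensed, self-contained sketch of that decision only (the types and helpers are simplified stand-ins, not the real pageserver APIs); the actual change is the diff that follows.

```rust
// Condensed sketch of the attach-time decision; every type and function here
// is a simplified stand-in for the real pageserver code in the diff below.
#[derive(Debug)]
enum TenantState {
    Attaching,
    Active,
    Broken(String),
}

struct Tenant {
    state: TenantState,
}

/// Stand-in for DeleteTenantFlow::should_resume_deletion: in the real code
/// this consults local/remote tenant deletion marks.
fn should_resume_deletion(deletion_mark_present: bool) -> Result<Option<()>, String> {
    Ok(if deletion_mark_present { Some(()) } else { None })
}

fn attach_flow(tenant: &mut Tenant, deletion_mark_present: bool) {
    // Any failure before activation marks the tenant Broken instead of
    // leaving it stuck in Attaching.
    let make_broken = |t: &mut Tenant, err: String| {
        t.state = TenantState::Broken(err);
    };

    let pending_deletion = match should_resume_deletion(deletion_mark_present) {
        Ok(pending) => pending,
        Err(err) => return make_broken(tenant, err),
    };

    if let Some(_deletion) = pending_deletion {
        // Stand-in for DeleteTenantFlow::resume_from_attach: finish the
        // interrupted deletion instead of completing the attach.
        println!("resuming tenant deletion instead of attaching");
        return;
    }

    // No pending deletion: normal attach, then activate.
    tenant.state = TenantState::Active;
}

fn main() {
    let mut tenant = Tenant {
        state: TenantState::Attaching,
    };
    attach_flow(&mut tenant, false);
    println!("state after attach: {:?}", tenant.state);
}
```

A consequence visible in the diff: `mgr::attach_tenant` no longer rejects an attach with "Tenant is marked as deleted on remote storage"; the attach is accepted and the pending deletion is driven to completion, which is why the test now waits for the tenant to become 404 instead of expecting an error.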
@@ -29,6 +29,7 @@ use std::collections::hash_map::Entry;
 use std::collections::BTreeSet;
 use std::collections::HashMap;
 use std::fmt::Debug;
+use std::fmt::Display;
 use std::fs;
 use std::fs::File;
 use std::fs::OpenOptions;
@@ -499,6 +500,7 @@ impl Tenant {
         conf: &'static PageServerConf,
         tenant_id: TenantId,
         broker_client: storage_broker::BrokerClientChannel,
+        tenants: &'static tokio::sync::RwLock<TenantsMap>,
         remote_storage: GenericRemoteStorage,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<Tenant>> {
@@ -513,7 +515,7 @@ impl Tenant {
             tenant_conf,
             wal_redo_manager,
             tenant_id,
-            Some(remote_storage),
+            Some(remote_storage.clone()),
         ));

         // Do all the hard work in the background
@@ -528,17 +530,61 @@ impl Tenant {
             "attach tenant",
             false,
             async move {
+                // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
+                let make_broken = |t: &Tenant, err: anyhow::Error| {
+                    error!("attach failed, setting tenant state to Broken: {err:?}");
+                    t.state.send_modify(|state| {
+                        assert_eq!(
+                            *state,
+                            TenantState::Attaching,
+                            "the attach task owns the tenant state until activation is complete"
+                        );
+                        *state = TenantState::broken_from_reason(err.to_string());
+                    });
+                };
+
+                let pending_deletion = {
+                    match DeleteTenantFlow::should_resume_deletion(
+                        conf,
+                        Some(&remote_storage),
+                        &tenant_clone,
+                    )
+                    .await
+                    {
+                        Ok(should_resume_deletion) => should_resume_deletion,
+                        Err(err) => {
+                            make_broken(&tenant_clone, anyhow::anyhow!(err));
+                            return Ok(());
+                        }
+                    }
+                };
+
+                info!("pending_deletion {}", pending_deletion.is_some());
+
+                if let Some(deletion) = pending_deletion {
+                    match DeleteTenantFlow::resume_from_attach(
+                        deletion,
+                        &tenant_clone,
+                        tenants,
+                        &ctx,
+                    )
+                    .await
+                    {
+                        Err(err) => {
+                            make_broken(&tenant_clone, anyhow::anyhow!(err));
+                            return Ok(());
+                        }
+                        Ok(()) => return Ok(()),
+                    }
+                }
+
                 match tenant_clone.attach(&ctx).await {
                     Ok(()) => {
                         info!("attach finished, activating");
                         tenant_clone.activate(broker_client, None, &ctx);
                     }
                     Err(e) => {
-                        error!("attach failed, setting tenant state to Broken: {:?}", e);
-                        tenant_clone.state.send_modify(|state| {
-                            assert_eq!(*state, TenantState::Attaching, "the attach task owns the tenant state until activation is complete");
-                            *state = TenantState::broken_from_reason(e.to_string());
-                        });
+                        make_broken(&tenant_clone, anyhow::anyhow!(e));
                     }
                 }
                 Ok(())
@@ -833,6 +879,7 @@ impl Tenant {
             "initial tenant load",
             false,
             async move {
+                // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
                 let make_broken = |t: &Tenant, err: anyhow::Error| {
                     error!("load failed, setting tenant state to Broken: {err:?}");
                     t.state.send_modify(|state| {
@@ -880,7 +927,7 @@ impl Tenant {
                         .as_mut()
                         .and_then(|x| x.initial_logical_size_attempt.take());

-                    match DeleteTenantFlow::resume(
+                    match DeleteTenantFlow::resume_from_load(
                         deletion,
                         &tenant_clone,
                         init_order.as_ref(),
@@ -902,7 +949,7 @@ impl Tenant {

                 match tenant_clone.load(init_order.as_ref(), &ctx).await {
                     Ok(()) => {
-                        debug!("load finished",);
+                        debug!("load finished");

                         tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
                     }
@@ -1795,7 +1842,7 @@ impl Tenant {
         // It's mesed up.
         // we just ignore the failure to stop

-        match self.set_stopping(shutdown_progress, false).await {
+        match self.set_stopping(shutdown_progress, false, false).await {
             Ok(()) => {}
             Err(SetStoppingError::Broken) => {
                 // assume that this is acceptable
@@ -1837,15 +1884,18 @@ impl Tenant {
     /// This function is not cancel-safe!
     ///
     /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
+    /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
     async fn set_stopping(
         &self,
         progress: completion::Barrier,
         allow_transition_from_loading: bool,
+        allow_transition_from_attaching: bool,
     ) -> Result<(), SetStoppingError> {
         let mut rx = self.state.subscribe();

         // cannot stop before we're done activating, so wait out until we're done activating
         rx.wait_for(|state| match state {
+            TenantState::Attaching if allow_transition_from_attaching => true,
             TenantState::Activating(_) | TenantState::Attaching => {
                 info!(
                     "waiting for {} to turn Active|Broken|Stopping",
@@ -1862,12 +1912,19 @@ impl Tenant {
         // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
         let mut err = None;
         let stopping = self.state.send_if_modified(|current_state| match current_state {
-            TenantState::Activating(_) | TenantState::Attaching => {
-                unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
+            TenantState::Activating(_) => {
+                unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
             }
+            TenantState::Attaching => {
+                if !allow_transition_from_attaching {
+                    unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
+                };
+                *current_state = TenantState::Stopping { progress };
+                true
+            }
             TenantState::Loading => {
                 if !allow_transition_from_loading {
-                    unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
+                    unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
                 };
                 *current_state = TenantState::Stopping { progress };
                 true
@@ -1943,7 +2000,8 @@ impl Tenant {
         self.set_broken_no_wait(reason)
     }

-    pub(crate) fn set_broken_no_wait(&self, reason: String) {
+    pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
+        let reason = reason.to_string();
         self.state.send_modify(|current_state| {
             match *current_state {
                 TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
@@ -275,8 +275,9 @@ pub(crate) async fn remote_delete_mark_exists(
 /// It is resumable from any step in case a crash/restart occurs.
 /// There are three entrypoints to the process:
 /// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
-/// 2. [`DeleteTenantFlow::resume`] is called during restarts when local or remote deletion marks are still there.
-/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
+/// 2. [`DeleteTenantFlow::resume_from_load`] is called during restarts when local or remote deletion marks are still there.
+/// 3. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed tenant is found to be deleted during attach process.
+/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
 #[derive(Default)]
 pub enum DeleteTenantFlow {
     #[default]
@@ -403,7 +404,7 @@ impl DeleteTenantFlow {
         }
     }

-    pub(crate) async fn resume(
+    pub(crate) async fn resume_from_load(
         guard: DeletionGuard,
         tenant: &Arc<Tenant>,
         init_order: Option<&InitializationOrder>,
@@ -413,7 +414,7 @@ impl DeleteTenantFlow {
         let (_, progress) = completion::channel();

         tenant
-            .set_stopping(progress, true)
+            .set_stopping(progress, true, false)
             .await
             .expect("cant be stopping or broken");

@@ -441,6 +442,31 @@ impl DeleteTenantFlow {
         .await
     }

+    pub(crate) async fn resume_from_attach(
+        guard: DeletionGuard,
+        tenant: &Arc<Tenant>,
+        tenants: &'static tokio::sync::RwLock<TenantsMap>,
+        ctx: &RequestContext,
+    ) -> Result<(), DeleteTenantError> {
+        let (_, progress) = completion::channel();
+
+        tenant
+            .set_stopping(progress, false, true)
+            .await
+            .expect("cant be stopping or broken");
+
+        tenant.attach(ctx).await.context("attach")?;
+
+        Self::background(
+            guard,
+            tenant.conf,
+            tenant.remote_storage.clone(),
+            tenants,
+            tenant,
+        )
+        .await
+    }
+
     async fn prepare(
         tenants: &tokio::sync::RwLock<TenantsMap>,
         tenant_id: TenantId,
@@ -27,7 +27,7 @@ use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
 use utils::fs_ext::PathExt;
 use utils::id::{TenantId, TimelineId};

-use super::delete::{remote_delete_mark_exists, DeleteTenantError};
+use super::delete::DeleteTenantError;
 use super::timeline::delete::DeleteTimelineFlow;

 /// The tenants known to the pageserver.
@@ -201,7 +201,8 @@ pub(crate) fn schedule_local_tenant_processing(
     let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
         info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
         if let Some(remote_storage) = remote_storage {
-            match Tenant::spawn_attach(conf, tenant_id, broker_client, remote_storage, ctx) {
+            match Tenant::spawn_attach(conf, tenant_id, broker_client, tenants, remote_storage, ctx)
+            {
                 Ok(tenant) => tenant,
                 Err(e) => {
                     error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -591,12 +592,6 @@ pub async fn attach_tenant(
     remote_storage: GenericRemoteStorage,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
-    // Temporary solution, proper one would be to resume deletion, but that needs more plumbing around Tenant::load/Tenant::attach
-    // Corresponding issue https://github.com/neondatabase/neon/issues/5006
-    if remote_delete_mark_exists(conf, &tenant_id, &remote_storage).await? {
-        return Err(anyhow::anyhow!("Tenant is marked as deleted on remote storage").into());
-    }
-
     tenant_map_insert(tenant_id, || {
         let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach)?;
         // TODO: tenant directory remains on disk if we bail out from here on.
@@ -315,4 +315,4 @@ MANY_SMALL_LAYERS_TENANT_CONFIG = {


 def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int:
-    return 40 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 10
+    return 40 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 15
@@ -292,9 +292,8 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
     )


-# TODO resume deletion (https://github.com/neondatabase/neon/issues/5006)
 @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
-def test_deleted_tenant_ignored_on_attach(
+def test_tenant_delete_is_resumed_on_attach(
     neon_env_builder: NeonEnvBuilder,
     remote_storage_kind: RemoteStorageKind,
     pg_bin: PgBin,
@@ -336,6 +335,8 @@ def test_deleted_tenant_ignored_on_attach(
         (
             # allow errors caused by failpoints
             f".*failpoint: {failpoint}",
+            # From deletion polling
+            f".*NotFound: tenant {env.initial_tenant}.*",
             # It appears when we stopped flush loop during deletion (attempt) and then pageserver is stopped
             ".*freeze_and_flush_on_shutdown.*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
             # error from http response is also logged
@@ -381,20 +382,17 @@ def test_deleted_tenant_ignored_on_attach(
     env.pageserver.start()

     # now we call attach
-    with pytest.raises(
-        PageserverApiException, match="Tenant is marked as deleted on remote storage"
-    ):
-        ps_http.tenant_attach(tenant_id=tenant_id)
+    ps_http.tenant_attach(tenant_id=tenant_id)

-    # delete should be resumed (not yet)
-    # wait_tenant_status_404(ps_http, tenant_id, iterations)
+    # delete should be resumed
+    wait_tenant_status_404(ps_http, tenant_id, iterations)

     # we shouldn've created tenant dir on disk
     tenant_path = env.tenant_dir(tenant_id=tenant_id)
     assert not tenant_path.exists()

     if remote_storage_kind in available_s3_storages():
-        assert_prefix_not_empty(
+        assert_prefix_empty(
             neon_env_builder,
             prefix="/".join(
                 (