mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-05 21:20:37 +00:00
pageserver: re-work remote deletion markers
- Store them under timelines/
- Read them as part of timeline listing, rather than with a separate GET.
This commit is contained in:
@@ -190,6 +190,11 @@ struct TimelinePreload {
|
||||
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
|
||||
}
|
||||
|
||||
pub(crate) struct TenantPreload {
|
||||
deleting: bool,
|
||||
timelines: HashMap<TimelineId, TimelinePreload>,
|
||||
}
|
||||
|
||||
///
|
||||
/// Tenant consists of multiple timelines. Keep them in a hash table.
|
||||
///
|
||||
@@ -653,10 +658,31 @@ impl Tenant {
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_tenant_load_remote.take());
|
||||
|
||||
let preload = match &remote_storage {
|
||||
Some(remote_storage) => Some(
|
||||
match tenant_clone
|
||||
.preload(remote_storage)
|
||||
.instrument(
|
||||
tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_id),
|
||||
)
|
||||
.await {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(e));
|
||||
return Ok(());
|
||||
}
|
||||
},
|
||||
),
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Remote preload is complete.
|
||||
drop(remote_load_completion);
|
||||
|
||||
let pending_deletion = {
|
||||
match DeleteTenantFlow::should_resume_deletion(
|
||||
conf,
|
||||
remote_storage.as_ref(),
|
||||
preload.as_ref().map(|p| p.deleting).unwrap_or(false),
|
||||
&tenant_clone,
|
||||
)
|
||||
.await
|
||||
@@ -675,7 +701,6 @@ impl Tenant {
|
||||
// as we are no longer loading, signal completion by dropping
|
||||
// the completion while we resume deletion
|
||||
drop(_completion);
|
||||
drop(remote_load_completion);
|
||||
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
|
||||
let _ = init_order
|
||||
.as_mut()
|
||||
@@ -691,6 +716,7 @@ impl Tenant {
|
||||
match DeleteTenantFlow::resume_from_attach(
|
||||
deletion,
|
||||
&tenant_clone,
|
||||
preload,
|
||||
tenants,
|
||||
init_order,
|
||||
&ctx,
|
||||
@@ -705,7 +731,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
match tenant_clone.attach(init_order, &ctx).await {
|
||||
match tenant_clone.attach(init_order, preload, &ctx).await {
|
||||
Ok(()) => {
|
||||
info!("attach finished, activating");
|
||||
tenant_clone.activate(broker_client, None, &ctx);
|
||||
@@ -725,6 +751,30 @@ impl Tenant {
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
pub(crate) async fn preload(
|
||||
self: &Arc<Tenant>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
) -> anyhow::Result<TenantPreload> {
|
||||
// Get list of remote timelines
|
||||
// download index files for every tenant timeline
|
||||
info!("listing remote timelines");
|
||||
let (remote_timeline_ids, other_keys) =
|
||||
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
|
||||
info!("found {} timelines", remote_timeline_ids.len());
|
||||
|
||||
let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
|
||||
for k in other_keys {
|
||||
info!("found non timeline key {k}");
|
||||
}
|
||||
|
||||
Ok(TenantPreload {
|
||||
deleting,
|
||||
timelines: self
|
||||
.load_timeline_metadata(remote_timeline_ids, remote_storage)
|
||||
.await?,
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
@@ -733,34 +783,21 @@ impl Tenant {
|
||||
async fn attach(
|
||||
self: &Arc<Tenant>,
|
||||
mut init_order: Option<InitializationOrder>,
|
||||
preload: Option<TenantPreload>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
crate::failpoint_support::sleep_millis_async!("before-attaching-tenant");
|
||||
|
||||
let remote_storage = match &self.remote_storage {
|
||||
Some(rs) => rs,
|
||||
let preload = match preload {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
init_order
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_tenant_load_remote.take());
|
||||
|
||||
// Deprecated dev mode: load from local disk state instead of remote storage
|
||||
return self.load_local(init_order, ctx).await;
|
||||
}
|
||||
};
|
||||
|
||||
// Get list of remote timelines
|
||||
// download index files for every tenant timeline
|
||||
info!("listing remote timelines");
|
||||
let remote_timeline_ids =
|
||||
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
|
||||
info!("found {} timelines", remote_timeline_ids.len());
|
||||
let preload = self
|
||||
.load_timeline_metadata(remote_timeline_ids, remote_storage)
|
||||
.await?;
|
||||
|
||||
// Signal that we have completed remote phase
|
||||
init_order
|
||||
.as_mut()
|
||||
@@ -772,7 +809,7 @@ impl Tenant {
|
||||
let mut remote_index_and_client = HashMap::new();
|
||||
let mut timeline_ancestors = HashMap::new();
|
||||
let mut existent_timelines = HashSet::new();
|
||||
for (timeline_id, preload) in preload {
|
||||
for (timeline_id, preload) in preload.timelines {
|
||||
// In this context a timeline "exists" if it has any content in remote storage: this will
|
||||
// be our cue to not delete any corresponding local directory
|
||||
existent_timelines.insert(timeline_id);
|
||||
@@ -3567,8 +3604,12 @@ pub(crate) mod harness {
|
||||
.await?;
|
||||
}
|
||||
LoadMode::Remote => {
|
||||
let preload = tenant
|
||||
.preload(&self.remote_storage)
|
||||
.instrument(info_span!("try_load_preload", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
tenant
|
||||
.attach(None, ctx)
|
||||
.attach(None, Some(preload), ctx)
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, instrument, warn, Instrument, Span};
|
||||
@@ -25,11 +25,9 @@ use super::{
|
||||
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
|
||||
span,
|
||||
timeline::delete::DeleteTimelineFlow,
|
||||
tree_sort_timelines, DeleteTimelineError, Tenant,
|
||||
tree_sort_timelines, DeleteTimelineError, Tenant, TenantPreload,
|
||||
};
|
||||
|
||||
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum DeleteTenantError {
|
||||
#[error("GetTenant {0}")]
|
||||
@@ -60,7 +58,7 @@ fn remote_tenant_delete_mark_path(
|
||||
.context("Failed to strip workdir prefix")
|
||||
.and_then(RemotePath::new)
|
||||
.context("tenant path")?;
|
||||
Ok(tenant_remote_path.join(Utf8Path::new("deleted")))
|
||||
Ok(tenant_remote_path.join(Utf8Path::new("timelines/deleted")))
|
||||
}
|
||||
|
||||
async fn create_remote_delete_mark(
|
||||
@@ -239,32 +237,6 @@ async fn cleanup_remaining_fs_traces(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remote_delete_mark_exists(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
) -> anyhow::Result<bool> {
|
||||
// If remote storage is there we rely on it
|
||||
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id).context("path")?;
|
||||
|
||||
let result = backoff::retry(
|
||||
|| async { remote_storage.download(&remote_mark_path).await },
|
||||
|e| matches!(e, DownloadError::NotFound),
|
||||
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
|
||||
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
|
||||
"fetch_tenant_deletion_mark",
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(_) => Ok(true),
|
||||
Err(DownloadError::NotFound) => Ok(false),
|
||||
Err(e) => Err(anyhow::anyhow!(e)).context("remote_delete_mark_exists")?,
|
||||
}
|
||||
}
|
||||
|
||||
/// Orchestrates tenant shut down of all tasks, removes its in-memory structures,
|
||||
/// and deletes its data from both disk and s3.
|
||||
/// The sequence of steps:
|
||||
@@ -377,7 +349,7 @@ impl DeleteTenantFlow {
|
||||
|
||||
pub(crate) async fn should_resume_deletion(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
remote_mark_exists: bool,
|
||||
tenant: &Tenant,
|
||||
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
|
||||
let acquire = |t: &Tenant| {
|
||||
@@ -388,18 +360,13 @@ impl DeleteTenantFlow {
|
||||
)
|
||||
};
|
||||
|
||||
let tenant_id = tenant.tenant_id;
|
||||
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
|
||||
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
|
||||
if remote_mark_exists {
|
||||
return Ok(acquire(tenant));
|
||||
}
|
||||
|
||||
let remote_storage = match remote_storage {
|
||||
Some(remote_storage) => remote_storage,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
if remote_delete_mark_exists(conf, &tenant_id, remote_storage).await? {
|
||||
let tenant_id = tenant.tenant_id;
|
||||
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
|
||||
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
|
||||
Ok(acquire(tenant))
|
||||
} else {
|
||||
Ok(None)
|
||||
@@ -409,6 +376,7 @@ impl DeleteTenantFlow {
|
||||
pub(crate) async fn resume_from_attach(
|
||||
guard: DeletionGuard,
|
||||
tenant: &Arc<Tenant>,
|
||||
preload: Option<TenantPreload>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
ctx: &RequestContext,
|
||||
@@ -420,7 +388,10 @@ impl DeleteTenantFlow {
|
||||
.await
|
||||
.expect("cant be stopping or broken");
|
||||
|
||||
tenant.attach(init_order, ctx).await.context("attach")?;
|
||||
tenant
|
||||
.attach(init_order, preload, ctx)
|
||||
.await
|
||||
.context("attach")?;
|
||||
|
||||
Self::background(
|
||||
guard,
|
||||
|
||||
@@ -19,7 +19,7 @@ use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::Generation;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage};
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
@@ -170,43 +170,50 @@ pub fn is_temp_download_file(path: &Utf8Path) -> bool {
|
||||
pub async fn list_remote_timelines(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<HashSet<TimelineId>> {
|
||||
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
|
||||
let remote_path = remote_timelines_path(&tenant_id);
|
||||
|
||||
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
|
||||
anyhow::bail!("storage-sync-list-remote-timelines");
|
||||
});
|
||||
|
||||
let timelines = download_retry(
|
||||
|| storage.list_prefixes(Some(&remote_path)),
|
||||
&format!("list prefixes for {tenant_id}"),
|
||||
let listing = download_retry(
|
||||
|| storage.list(Some(&remote_path), ListingMode::WithDelimiter),
|
||||
&format!("list timelines for {tenant_id}"),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut timeline_ids = HashSet::new();
|
||||
let mut other_prefixes = HashSet::new();
|
||||
|
||||
for timeline_remote_storage_key in timelines {
|
||||
tracing::info!("list_remote_timelines prefixes:");
|
||||
for p in &listing.prefixes {
|
||||
tracing::info!(" '{p}'");
|
||||
}
|
||||
tracing::info!("list_remote_timelines keys:");
|
||||
for p in &listing.keys {
|
||||
tracing::info!(" '{p}'");
|
||||
}
|
||||
|
||||
for timeline_remote_storage_key in listing.prefixes {
|
||||
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
|
||||
})?;
|
||||
|
||||
let timeline_id: TimelineId = object_name
|
||||
.parse()
|
||||
.with_context(|| format!("parse object name into timeline id '{object_name}'"))?;
|
||||
|
||||
// list_prefixes is assumed to return unique names. Ensure this here.
|
||||
// NB: it's safer to bail out than warn-log this because the pageserver
|
||||
// needs to absolutely know about _all_ timelines that exist, so that
|
||||
// GC knows all the branchpoints. If we skipped over a timeline instead,
|
||||
// GC could delete a layer that's still needed by that timeline.
|
||||
anyhow::ensure!(
|
||||
!timeline_ids.contains(&timeline_id),
|
||||
"list_prefixes contains duplicate timeline id {timeline_id}"
|
||||
);
|
||||
timeline_ids.insert(timeline_id);
|
||||
match object_name.parse::<TimelineId>() {
|
||||
Ok(t) => timeline_ids.insert(t),
|
||||
Err(_) => other_prefixes.insert(object_name.to_string()),
|
||||
};
|
||||
}
|
||||
|
||||
Ok(timeline_ids)
|
||||
for key in listing.keys {
|
||||
let object_name = key
|
||||
.object_name()
|
||||
.ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
|
||||
other_prefixes.insert(object_name.to_string());
|
||||
}
|
||||
|
||||
Ok((timeline_ids, other_prefixes))
|
||||
}
|
||||
|
||||
async fn do_download_index_part(
|
||||
|
||||
@@ -66,10 +66,6 @@ def test_tenant_reattach(
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
# Thats because of UnreliableWrapper's injected failures
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
|
||||
)
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
with endpoint.cursor() as cur:
|
||||
@@ -116,7 +112,7 @@ def test_tenant_reattach(
|
||||
assert query_scalar(cur, "SELECT count(*) FROM t") == 100000
|
||||
|
||||
# Check that we had to retry the downloads
|
||||
assert env.pageserver.log_contains(".*list prefixes.*failed, will retry.*")
|
||||
assert env.pageserver.log_contains(".*list timelines.*failed, will retry.*")
|
||||
assert env.pageserver.log_contains(".*download.*failed, will retry.*")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user