Mirror of https://github.com/neondatabase/neon.git
storage_scrubber: migrate FindGarbage to remote_storage (#8548)
Uses the APIs newly added in #8541, `stream_tenants_generic` and `stream_objects_with_retries`, and extends them with `list_objects_with_retries_generic` and `stream_tenant_timelines_generic`, to migrate the scrubber's `find-garbage` command to `GenericRemoteStorage`. Part of https://github.com/neondatabase/neon/issues/7547
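The core of the migration is replacing direct S3 SDK list calls with the storage-agnostic `GenericRemoteStorage::list` API. Below is a minimal sketch (not part of the commit) using only the calls visible in this diff (`RemotePath::from_string`, `ListingMode`, `list(Some(&prefix), mode, None, &cancel)`); the `"tenants/"` prefix is a placeholder for illustration:

use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath};
use tokio_util::sync::CancellationToken;

// Sketch: one delimited listing through the storage-agnostic API, instead of
// issuing list_objects_v2 against an aws_sdk_s3::Client directly.
async fn list_one_level(remote_client: &GenericRemoteStorage) -> anyhow::Result<()> {
    let cancel = CancellationToken::new();
    // Placeholder prefix, for illustration only.
    let prefix = RemotePath::from_string("tenants/")?;
    let listing = remote_client
        .list(Some(&prefix), ListingMode::WithDelimiter, None, &cancel)
        .await?;
    // WithDelimiter: sub-prefixes arrive in `prefixes`, objects in `keys`.
    for p in listing.prefixes {
        println!("prefix: {}", p.get_path());
    }
    Ok(())
}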
@@ -144,6 +144,7 @@ impl RemotePath {
 ///
 /// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
 /// NoDelimiter mode will only populate `keys`.
+#[derive(Copy, Clone)]
 pub enum ListingMode {
     WithDelimiter,
     NoDelimiter,
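The two modes mirror S3 delimiter semantics: `WithDelimiter` groups keys one level deep under common prefixes, while `NoDelimiter` returns every key under the prefix. A hedged sketch of the two result shapes, assuming the `Listing` fields (`prefixes`, `keys`) used later in this diff:

use remote_storage::{GenericRemoteStorage, ListingMode, RemotePath};
use tokio_util::sync::CancellationToken;

async fn compare_modes(
    client: &GenericRemoteStorage,
    prefix: &RemotePath,
) -> anyhow::Result<()> {
    let cancel = CancellationToken::new();
    // WithDelimiter: immediate "sub-directories" land in `prefixes`,
    // objects directly under `prefix` land in `keys`.
    let delimited = client
        .list(Some(prefix), ListingMode::WithDelimiter, None, &cancel)
        .await?;
    println!(
        "{} sub-prefixes, {} direct keys",
        delimited.prefixes.len(),
        delimited.keys.len()
    );
    // NoDelimiter: every key under `prefix`; `prefixes` stays empty.
    let flat = client
        .list(Some(prefix), ListingMode::NoDelimiter, None, &cancel)
        .await?;
    println!("{} keys in total", flat.keys.len());
    Ok(())
}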
@@ -19,8 +19,8 @@ use utils::id::TenantId;
 
 use crate::{
     cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
-    init_remote, init_remote_generic, list_objects_with_retries,
-    metadata_stream::{stream_tenant_timelines, stream_tenants},
+    init_remote_generic, list_objects_with_retries_generic,
+    metadata_stream::{stream_tenant_timelines_generic, stream_tenants_generic},
     BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
 };
 
@@ -153,7 +153,7 @@ async fn find_garbage_inner(
     node_kind: NodeKind,
 ) -> anyhow::Result<GarbageList> {
     // Construct clients for S3 and for Console API
-    let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
+    let (remote_client, target) = init_remote_generic(bucket_config.clone(), node_kind).await?;
     let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
 
     // Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -179,7 +179,7 @@ async fn find_garbage_inner(
 
     // Enumerate Tenants in S3, and check if each one exists in Console
     tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
-    let tenants = stream_tenants(&s3_client, &target);
+    let tenants = stream_tenants_generic(&remote_client, &target);
     let tenants_checked = tenants.map_ok(|t| {
         let api_client = cloud_admin_api_client.clone();
         let console_cache = console_cache.clone();
@@ -237,25 +237,26 @@ async fn find_garbage_inner(
         // Special case: If it's missing in console, check for known bugs that would enable us to conclusively
         // identify it as purge-able anyway
        if console_result.is_none() {
-            let timelines = stream_tenant_timelines(&s3_client, &target, tenant_shard_id)
-                .await?
-                .collect::<Vec<_>>()
-                .await;
+            let timelines =
+                stream_tenant_timelines_generic(&remote_client, &target, tenant_shard_id)
+                    .await?
+                    .collect::<Vec<_>>()
+                    .await;
             if timelines.is_empty() {
                 // No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
-                let tenant_objects = list_objects_with_retries(
-                    &s3_client,
+                let tenant_objects = list_objects_with_retries_generic(
+                    &remote_client,
+                    ListingMode::WithDelimiter,
                     &target.tenant_root(&tenant_shard_id),
-                    None,
                 )
                 .await?;
-                let object = tenant_objects.contents.as_ref().unwrap().first().unwrap();
-                if object.key.as_ref().unwrap().ends_with("heatmap-v1.json") {
+                let object = tenant_objects.keys.first().unwrap();
+                if object.key.get_path().as_str().ends_with("heatmap-v1.json") {
                     tracing::info!("Tenant {tenant_shard_id}: is missing in console and is only a heatmap (known historic deletion bug)");
                     garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
                     continue;
                 } else {
-                    tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key.as_ref().unwrap());
+                    tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key);
                 }
             } else {
                 // A console-unknown tenant with timelines: check if these timelines only contain initdb.tar.zst, from the initial
@@ -264,24 +265,18 @@ async fn find_garbage_inner(
 
                 for timeline_r in timelines {
                     let timeline = timeline_r?;
-                    let timeline_objects = list_objects_with_retries(
-                        &s3_client,
+                    let timeline_objects = list_objects_with_retries_generic(
+                        &remote_client,
+                        ListingMode::WithDelimiter,
                         &target.timeline_root(&timeline),
-                        None,
                     )
                     .await?;
-                    if timeline_objects
-                        .common_prefixes
-                        .as_ref()
-                        .map(|v| v.len())
-                        .unwrap_or(0)
-                        > 0
-                    {
+                    if !timeline_objects.prefixes.is_empty() {
                         // Sub-paths? Unexpected
                         any_non_initdb = true;
                     } else {
-                        let object = timeline_objects.contents.as_ref().unwrap().first().unwrap();
-                        if object.key.as_ref().unwrap().ends_with("initdb.tar.zst") {
+                        let object = timeline_objects.keys.first().unwrap();
+                        if object.key.get_path().as_str().ends_with("initdb.tar.zst") {
                             tracing::info!("Timeline {timeline} contains only initdb.tar.zst");
                         } else {
                             any_non_initdb = true;
@@ -336,7 +331,8 @@ async fn find_garbage_inner(
 
     // Construct a stream of all timelines within active tenants
     let active_tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
-    let timelines = active_tenants.map_ok(|t| stream_tenant_timelines(&s3_client, &target, *t));
+    let timelines =
+        active_tenants.map_ok(|t| stream_tenant_timelines_generic(&remote_client, &target, *t));
     let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
     let timelines = timelines.try_flatten();
 
@@ -427,6 +427,7 @@ async fn list_objects_with_retries(
     Err(anyhow!("unreachable unless MAX_RETRIES==0"))
 }
 
+/// Listing possibly large amounts of keys in a streaming fashion.
 fn stream_objects_with_retries<'a>(
     storage_client: &'a GenericRemoteStorage,
     listing_mode: ListingMode,
@@ -465,6 +466,45 @@ fn stream_objects_with_retries<'a>(
     }
 }
 
+/// If you want to list a bounded amount of prefixes or keys. For larger numbers of keys/prefixes,
+/// use [`stream_objects_with_retries`] instead.
+async fn list_objects_with_retries_generic(
+    remote_client: &GenericRemoteStorage,
+    listing_mode: ListingMode,
+    s3_target: &S3Target,
+) -> anyhow::Result<Listing> {
+    let cancel = CancellationToken::new();
+    let prefix_str = &s3_target
+        .prefix_in_bucket
+        .strip_prefix("/")
+        .unwrap_or(&s3_target.prefix_in_bucket);
+    let prefix = RemotePath::from_string(prefix_str)?;
+    for trial in 0..MAX_RETRIES {
+        match remote_client
+            .list(Some(&prefix), listing_mode, None, &cancel)
+            .await
+        {
+            Ok(response) => return Ok(response),
+            Err(e) => {
+                if trial == MAX_RETRIES - 1 {
+                    return Err(e)
+                        .with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
+                }
+                error!(
+                    "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
+                    s3_target.bucket_name,
+                    s3_target.prefix_in_bucket,
+                    s3_target.delimiter,
+                    DisplayErrorContext(e),
+                );
+                let backoff_time = 1 << trial.max(5);
+                tokio::time::sleep(Duration::from_secs(backoff_time)).await;
+            }
+        }
+    }
+    panic!("MAX_RETRIES is not allowed to be 0");
+}
+
 async fn download_object_with_retries(
     s3_client: &Client,
     bucket_name: &str,
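The retry loop above sleeps `1 << trial.max(5)` seconds between attempts. Note that `max(5)` floors the exponent, so every retry waits at least 2^5 = 32 seconds; a conventional capped exponential backoff would use `min(5)` instead. A small standalone illustration of the two schedules (the `MAX_RETRIES` value here is made up; the scrubber's constant is not shown in this diff):

fn main() {
    const MAX_RETRIES: u32 = 8; // hypothetical value, for illustration only
    for trial in 0..MAX_RETRIES {
        // As written in the diff: a floor, so even the first retry waits 32 seconds.
        let floored: u64 = 1 << trial.max(5);
        // Conventional capped backoff: 1, 2, 4, 8, 16, then 32 seconds.
        let capped: u64 = 1 << trial.min(5);
        println!("trial {trial}: floored = {floored}s, capped = {capped}s");
    }
}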
@@ -189,6 +189,63 @@ pub async fn stream_tenant_timelines<'a>(
     })
 }
 
+/// Given a `TenantShardId`, output a stream of the timelines within that tenant, discovered
+/// using a listing. The listing is done before the stream is built, so that this
+/// function can be used to generate concurrency on a stream using buffer_unordered.
+pub async fn stream_tenant_timelines_generic<'a>(
+    remote_client: &'a GenericRemoteStorage,
+    target: &'a RootTarget,
+    tenant: TenantShardId,
+) -> anyhow::Result<impl Stream<Item = Result<TenantShardTimelineId, anyhow::Error>> + 'a> {
+    let mut timeline_ids: Vec<Result<TimelineId, anyhow::Error>> = Vec::new();
+    let timelines_target = target.timelines_root(&tenant);
+
+    let mut objects_stream = std::pin::pin!(stream_objects_with_retries(
+        remote_client,
+        ListingMode::WithDelimiter,
+        &timelines_target
+    ));
+    loop {
+        tracing::debug!("Listing in {tenant}");
+        let fetch_response = match objects_stream.next().await {
+            None => break,
+            Some(Err(e)) => {
+                timeline_ids.push(Err(e));
+                break;
+            }
+            Some(Ok(r)) => r,
+        };
+
+        let new_entry_ids = fetch_response
+            .prefixes
+            .iter()
+            .filter_map(|prefix| -> Option<&str> {
+                prefix
+                    .get_path()
+                    .as_str()
+                    .strip_prefix(&timelines_target.prefix_in_bucket)?
+                    .strip_suffix('/')
+            })
+            .map(|entry_id_str| {
+                entry_id_str
+                    .parse::<TimelineId>()
+                    .with_context(|| format!("Incorrect entry id str: {entry_id_str}"))
+            });
+
+        for i in new_entry_ids {
+            timeline_ids.push(i);
+        }
+    }
+
+    tracing::debug!("Yielding for {}", tenant);
+    Ok(stream! {
+        for i in timeline_ids {
+            let id = i?;
+            yield Ok(TenantShardTimelineId::new(tenant, id));
+        }
+    })
+}
+
 pub(crate) fn stream_listing<'a>(
     s3_client: &'a Client,
     target: &'a S3Target,
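As the doc comment on `stream_tenant_timelines_generic` notes, the listing completes before the stream is returned, so buffering the per-tenant futures parallelizes the remote I/O itself rather than just the stream plumbing. A sketch of that fan-out pattern, mirroring the `find_garbage_inner` call site above; the scrubber's own items (`stream_tenant_timelines_generic`, `GenericRemoteStorage`, `RootTarget`, `TenantShardId`, `S3_CONCURRENCY`) are assumed in scope:

use futures_util::{StreamExt, TryStreamExt};

// Sketch: one listing future per tenant, at most S3_CONCURRENCY in flight,
// flattened into a single stream of timeline ids.
async fn walk_timelines(
    remote_client: &GenericRemoteStorage,
    target: &RootTarget,
    active_tenants: &[TenantShardId],
) -> anyhow::Result<()> {
    let tenants = tokio_stream::iter(active_tenants.iter().map(Ok));
    let timelines =
        tenants.map_ok(|t| stream_tenant_timelines_generic(remote_client, target, *t));
    let timelines = timelines.try_buffer_unordered(S3_CONCURRENCY);
    let mut timelines = std::pin::pin!(timelines.try_flatten());
    let mut count = 0usize;
    while let Some(ttid) = timelines.next().await {
        let _ttid = ttid?;
        count += 1;
    }
    tracing::debug!("visited {count} timelines");
    Ok(())
}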