mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
scrubber: allow restricting find_garbage to a partial tenant id prefix (#9814)
Adds support to the `find_garbage` command to restrict itself to a partial tenant ID prefix, say `a`, and then it only traverses tenants with IDs starting with `a`. One can now pass the `--tenant-id-prefix` parameter. That way, one can shard the `find_garbage` command and make it run in parallel. The PR also does a change of how `remote_storage` first removes trailing `/`s, only to then add them in the listing function. It turns out that this isn't neccessary and it prevents the prefix functionality from working. S3 doesn't do this either.
This commit is contained in:
@@ -21,7 +21,7 @@ use utils::{backoff, id::TenantId};
|
||||
use crate::{
|
||||
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
|
||||
init_remote, list_objects_with_retries,
|
||||
metadata_stream::{stream_tenant_timelines, stream_tenants},
|
||||
metadata_stream::{stream_tenant_timelines, stream_tenants_maybe_prefix},
|
||||
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth, MAX_RETRIES,
|
||||
};
|
||||
|
||||
@@ -118,9 +118,17 @@ pub async fn find_garbage(
|
||||
console_config: ConsoleConfig,
|
||||
depth: TraversingDepth,
|
||||
node_kind: NodeKind,
|
||||
tenant_id_prefix: Option<String>,
|
||||
output_path: String,
|
||||
) -> anyhow::Result<()> {
|
||||
let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?;
|
||||
let garbage = find_garbage_inner(
|
||||
bucket_config,
|
||||
console_config,
|
||||
depth,
|
||||
node_kind,
|
||||
tenant_id_prefix,
|
||||
)
|
||||
.await?;
|
||||
let serialized = serde_json::to_vec_pretty(&garbage)?;
|
||||
|
||||
tokio::fs::write(&output_path, &serialized).await?;
|
||||
@@ -152,6 +160,7 @@ async fn find_garbage_inner(
|
||||
console_config: ConsoleConfig,
|
||||
depth: TraversingDepth,
|
||||
node_kind: NodeKind,
|
||||
tenant_id_prefix: Option<String>,
|
||||
) -> anyhow::Result<GarbageList> {
|
||||
// Construct clients for S3 and for Console API
|
||||
let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
|
||||
@@ -178,7 +187,7 @@ async fn find_garbage_inner(
|
||||
|
||||
// Enumerate Tenants in S3, and check if each one exists in Console
|
||||
tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
|
||||
let tenants = stream_tenants(&remote_client, &target);
|
||||
let tenants = stream_tenants_maybe_prefix(&remote_client, &target, tenant_id_prefix);
|
||||
let tenants_checked = tenants.map_ok(|t| {
|
||||
let api_client = cloud_admin_api_client.clone();
|
||||
let console_cache = console_cache.clone();
|
||||
|
||||
@@ -54,6 +54,8 @@ enum Command {
|
||||
node_kind: NodeKind,
|
||||
#[arg(short, long, default_value_t=TraversingDepth::Tenant)]
|
||||
depth: TraversingDepth,
|
||||
#[arg(short, long, default_value=None)]
|
||||
tenant_id_prefix: Option<String>,
|
||||
#[arg(short, long, default_value_t = String::from("garbage.json"))]
|
||||
output_path: String,
|
||||
},
|
||||
@@ -209,10 +211,19 @@ async fn main() -> anyhow::Result<()> {
|
||||
Command::FindGarbage {
|
||||
node_kind,
|
||||
depth,
|
||||
tenant_id_prefix,
|
||||
output_path,
|
||||
} => {
|
||||
let console_config = ConsoleConfig::from_env()?;
|
||||
find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
|
||||
find_garbage(
|
||||
bucket_config,
|
||||
console_config,
|
||||
depth,
|
||||
node_kind,
|
||||
tenant_id_prefix,
|
||||
output_path,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Command::PurgeGarbage {
|
||||
input_path,
|
||||
|
||||
@@ -17,9 +17,20 @@ use utils::id::{TenantId, TimelineId};
|
||||
pub fn stream_tenants<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a RootTarget,
|
||||
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
|
||||
stream_tenants_maybe_prefix(remote_client, target, None)
|
||||
}
|
||||
/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
|
||||
pub fn stream_tenants_maybe_prefix<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a RootTarget,
|
||||
tenant_id_prefix: Option<String>,
|
||||
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
|
||||
try_stream! {
|
||||
let tenants_target = target.tenants_root();
|
||||
let mut tenants_target = target.tenants_root();
|
||||
if let Some(tenant_id_prefix) = tenant_id_prefix {
|
||||
tenants_target.prefix_in_bucket += &tenant_id_prefix;
|
||||
}
|
||||
let mut tenants_stream =
|
||||
std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
|
||||
while let Some(chunk) = tenants_stream.next().await {
|
||||
|
||||
Reference in New Issue
Block a user