diff --git a/Cargo.lock b/Cargo.lock
index aa0d77cb6b..2401644109 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4288,6 +4288,7 @@ dependencies = [
  "clap",
  "crc32c",
  "either",
+ "futures",
  "futures-util",
  "hex",
  "histogram",
diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml
index a590ab61ba..4d136472e0 100644
--- a/s3_scrubber/Cargo.toml
+++ b/s3_scrubber/Cargo.toml
@@ -40,3 +40,5 @@ tracing-subscriber.workspace = true
 clap.workspace = true
 tracing-appender = "0.2"
 histogram = "0.7"
+
+futures.workspace = true
diff --git a/s3_scrubber/src/main.rs b/s3_scrubber/src/main.rs
index ef020edc2a..957213856b 100644
--- a/s3_scrubber/src/main.rs
+++ b/s3_scrubber/src/main.rs
@@ -1,3 +1,4 @@
+use pageserver_api::shard::TenantShardId;
 use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode};
 use s3_scrubber::scan_metadata::scan_metadata;
 use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth};
@@ -34,6 +35,8 @@ enum Command {
     ScanMetadata {
         #[arg(short, long, default_value_t = false)]
         json: bool,
+        #[arg(long = "tenant-id", num_args = 0..)]
+        tenant_ids: Vec<TenantShardId>,
     },
 }
 
@@ -57,35 +60,37 @@ async fn main() -> anyhow::Result<()> {
     ));
 
     match cli.command {
-        Command::ScanMetadata { json } => match scan_metadata(bucket_config.clone()).await {
-            Err(e) => {
-                tracing::error!("Failed: {e}");
-                Err(e)
-            }
-            Ok(summary) => {
-                if json {
-                    println!("{}", serde_json::to_string(&summary).unwrap())
-                } else {
-                    println!("{}", summary.summary_string());
+        Command::ScanMetadata { json, tenant_ids } => {
+            match scan_metadata(bucket_config.clone(), tenant_ids).await {
+                Err(e) => {
+                    tracing::error!("Failed: {e}");
+                    Err(e)
                 }
-                if summary.is_fatal() {
-                    Err(anyhow::anyhow!("Fatal scrub errors detected"))
-                } else if summary.is_empty() {
-                    // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
-                    // scrubber they were likely expecting to scan something, and if we see no timelines
-                    // at all then it's likely due to some configuration issues like a bad prefix
-                    Err(anyhow::anyhow!(
-                        "No timelines found in bucket {} prefix {}",
-                        bucket_config.bucket,
-                        bucket_config
-                            .prefix_in_bucket
-                            .unwrap_or("".to_string())
-                    ))
-                } else {
-                    Ok(())
+                Ok(summary) => {
+                    if json {
+                        println!("{}", serde_json::to_string(&summary).unwrap())
+                    } else {
+                        println!("{}", summary.summary_string());
+                    }
+                    if summary.is_fatal() {
+                        Err(anyhow::anyhow!("Fatal scrub errors detected"))
+                    } else if summary.is_empty() {
+                        // Strictly speaking an empty bucket is a valid bucket, but if someone ran the
+                        // scrubber they were likely expecting to scan something, and if we see no timelines
+                        // at all then it's likely due to some configuration issues like a bad prefix
+                        Err(anyhow::anyhow!(
+                            "No timelines found in bucket {} prefix {}",
+                            bucket_config.bucket,
+                            bucket_config
+                                .prefix_in_bucket
+                                .unwrap_or("".to_string())
+                        ))
+                    } else {
+                        Ok(())
+                    }
                 }
             }
-        },
+        }
         Command::FindGarbage {
             node_kind,
             depth,
diff --git a/s3_scrubber/src/scan_metadata.rs b/s3_scrubber/src/scan_metadata.rs
index 91347ca21b..fd5ebf8f16 100644
--- a/s3_scrubber/src/scan_metadata.rs
+++ b/s3_scrubber/src/scan_metadata.rs
@@ -180,10 +180,17 @@ Timeline layer count: {6}
 }
 
 /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics.
-pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result<MetadataSummary> {
+pub async fn scan_metadata(
+    bucket_config: BucketConfig,
+    tenant_ids: Vec<TenantShardId>,
+) -> anyhow::Result<MetadataSummary> {
     let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
 
-    let tenants = stream_tenants(&s3_client, &target);
+    let tenants = if tenant_ids.is_empty() {
+        futures::future::Either::Left(stream_tenants(&s3_client, &target))
+    } else {
+        futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok)))
+    };
 
     // How many tenants to process in parallel. We need to be mindful of pageservers
     // accessing the same per tenant prefixes, so use a lower setting than pageservers.
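
For reference, a minimal, self-contained sketch of the `futures::future::Either` pattern the `scan_metadata` change relies on: `Either` implements `Stream` when both arms are streams with the same `Item`, so one binding can hold either the bucket-listing stream or an in-memory stream built from the ids passed on the command line, without boxing. The names here (`tenant_id_stream`, the `0..4` "discovery" range) are invented for illustration and are not part of the scrubber.

```rust
// Standalone sketch (assumed names; not the scrubber's real types).
// Depends on the `futures` and `anyhow` crates only.
use futures::future::Either;
use futures::stream::{self, Stream, StreamExt, TryStreamExt};

/// Yield either "discovered" ids (0..4 stands in for an S3 listing) or only
/// the ids supplied explicitly. Both arms have Item = anyhow::Result<u64>, so
/// `Either` unifies them into one concrete `Stream` type.
fn tenant_id_stream(explicit: Vec<u64>) -> impl Stream<Item = anyhow::Result<u64>> {
    if explicit.is_empty() {
        Either::Left(stream::iter(0u64..4).map(anyhow::Ok))
    } else {
        Either::Right(stream::iter(explicit.into_iter().map(anyhow::Ok)))
    }
}

fn main() -> anyhow::Result<()> {
    futures::executor::block_on(async {
        // Passing explicit ids skips the "discovery" arm, mirroring the new
        // --tenant-id flag; an empty Vec would stream 0, 1, 2, 3 instead.
        let ids: Vec<u64> = tenant_id_stream(vec![7, 9]).try_collect().await?;
        assert_eq!(ids, vec![7, 9]);
        Ok(())
    })
}
```

On the CLI side, `num_args = 0..` lets clap collect any number of `--tenant-id` values into `tenant_ids`; when the flag is omitted the vector is empty and `scan_metadata` falls through to the `stream_tenants` arm, so the default behaviour of scanning every tenant in the bucket is unchanged.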