diff --git a/Cargo.lock b/Cargo.lock index 55e868a6d5..93efbadd79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4405,12 +4405,14 @@ dependencies = [ "async-stream", "aws-config", "aws-sdk-s3", + "aws-smithy-async", "bincode", "bytes", "chrono", "clap", "crc32c", "either", + "futures", "futures-util", "hex", "histogram", diff --git a/s3_scrubber/Cargo.toml b/s3_scrubber/Cargo.toml index fdae378d55..4d136472e0 100644 --- a/s3_scrubber/Cargo.toml +++ b/s3_scrubber/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] aws-sdk-s3.workspace = true +aws-smithy-async.workspace = true either.workspace = true tokio-rustls.workspace = true anyhow.workspace = true @@ -39,3 +40,5 @@ tracing-subscriber.workspace = true clap.workspace = true tracing-appender = "0.2" histogram = "0.7" + +futures.workspace = true diff --git a/s3_scrubber/src/lib.rs b/s3_scrubber/src/lib.rs index 8fb1346c8e..d2842877d0 100644 --- a/s3_scrubber/src/lib.rs +++ b/s3_scrubber/src/lib.rs @@ -16,10 +16,12 @@ use aws_config::environment::EnvironmentVariableCredentialsProvider; use aws_config::imds::credentials::ImdsCredentialsProvider; use aws_config::meta::credentials::CredentialsProviderChain; use aws_config::profile::ProfileFileCredentialsProvider; +use aws_config::retry::RetryConfig; use aws_config::sso::SsoCredentialsProvider; use aws_config::BehaviorVersion; -use aws_sdk_s3::config::Region; +use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep}; use aws_sdk_s3::{Client, Config}; +use aws_smithy_async::rt::sleep::TokioSleep; use clap::ValueEnum; use pageserver::tenant::TENANTS_SEGMENT_NAME; @@ -283,9 +285,13 @@ pub fn init_s3_client(account_id: Option, bucket_region: Region) -> Clie ) }; + let sleep_impl: Arc = Arc::new(TokioSleep::new()); + let mut builder = Config::builder() .behavior_version(BehaviorVersion::v2023_11_09()) .region(bucket_region) + .retry_config(RetryConfig::adaptive().with_max_attempts(3)) + .sleep_impl(SharedAsyncSleep::from(sleep_impl)) .credentials_provider(credentials_provider); if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") { diff --git a/s3_scrubber/src/main.rs b/s3_scrubber/src/main.rs index ef020edc2a..957213856b 100644 --- a/s3_scrubber/src/main.rs +++ b/s3_scrubber/src/main.rs @@ -1,3 +1,4 @@ +use pageserver_api::shard::TenantShardId; use s3_scrubber::garbage::{find_garbage, purge_garbage, PurgeMode}; use s3_scrubber::scan_metadata::scan_metadata; use s3_scrubber::{init_logging, BucketConfig, ConsoleConfig, NodeKind, TraversingDepth}; @@ -34,6 +35,8 @@ enum Command { ScanMetadata { #[arg(short, long, default_value_t = false)] json: bool, + #[arg(long = "tenant-id", num_args = 0..)] + tenant_ids: Vec, }, } @@ -57,35 +60,37 @@ async fn main() -> anyhow::Result<()> { )); match cli.command { - Command::ScanMetadata { json } => match scan_metadata(bucket_config.clone()).await { - Err(e) => { - tracing::error!("Failed: {e}"); - Err(e) - } - Ok(summary) => { - if json { - println!("{}", serde_json::to_string(&summary).unwrap()) - } else { - println!("{}", summary.summary_string()); + Command::ScanMetadata { json, tenant_ids } => { + match scan_metadata(bucket_config.clone(), tenant_ids).await { + Err(e) => { + tracing::error!("Failed: {e}"); + Err(e) } - if summary.is_fatal() { - Err(anyhow::anyhow!("Fatal scrub errors detected")) - } else if summary.is_empty() { - // Strictly speaking an empty bucket is a valid bucket, but if someone ran the - // scrubber they were likely expecting to scan something, and if we see no timelines - // at all then it's likely due to some configuration issues like a bad prefix - Err(anyhow::anyhow!( - "No timelines found in bucket {} prefix {}", - bucket_config.bucket, - bucket_config - .prefix_in_bucket - .unwrap_or("".to_string()) - )) - } else { - Ok(()) + Ok(summary) => { + if json { + println!("{}", serde_json::to_string(&summary).unwrap()) + } else { + println!("{}", summary.summary_string()); + } + if summary.is_fatal() { + Err(anyhow::anyhow!("Fatal scrub errors detected")) + } else if summary.is_empty() { + // Strictly speaking an empty bucket is a valid bucket, but if someone ran the + // scrubber they were likely expecting to scan something, and if we see no timelines + // at all then it's likely due to some configuration issues like a bad prefix + Err(anyhow::anyhow!( + "No timelines found in bucket {} prefix {}", + bucket_config.bucket, + bucket_config + .prefix_in_bucket + .unwrap_or("".to_string()) + )) + } else { + Ok(()) + } } } - }, + } Command::FindGarbage { node_kind, depth, diff --git a/s3_scrubber/src/scan_metadata.rs b/s3_scrubber/src/scan_metadata.rs index bcc4d2e618..bfde8f0213 100644 --- a/s3_scrubber/src/scan_metadata.rs +++ b/s3_scrubber/src/scan_metadata.rs @@ -187,10 +187,17 @@ Timeline layer count: {6} } /// Scan the pageserver metadata in an S3 bucket, reporting errors and statistics. -pub async fn scan_metadata(bucket_config: BucketConfig) -> anyhow::Result { +pub async fn scan_metadata( + bucket_config: BucketConfig, + tenant_ids: Vec, +) -> anyhow::Result { let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?; - let tenants = stream_tenants(&s3_client, &target); + let tenants = if tenant_ids.is_empty() { + futures::future::Either::Left(stream_tenants(&s3_client, &target)) + } else { + futures::future::Either::Right(futures::stream::iter(tenant_ids.into_iter().map(Ok))) + }; // How many tenants to process in parallel. We need to be mindful of pageservers // accessing the same per tenant prefixes, so use a lower setting than pageservers.