diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs index 1422545f2f..2ef802229d 100644 --- a/storage_scrubber/src/find_large_objects.rs +++ b/storage_scrubber/src/find_large_objects.rs @@ -47,7 +47,7 @@ pub async fn find_large_objects( ignore_deltas: bool, concurrency: usize, ) -> anyhow::Result { - let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; + let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; let tenants = std::pin::pin!(stream_tenants(&s3_client, &target)); let objects_stream = tenants.map_ok(|tenant_shard_id| { diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs index ce0ff10ec6..0450851988 100644 --- a/storage_scrubber/src/garbage.rs +++ b/storage_scrubber/src/garbage.rs @@ -140,7 +140,7 @@ async fn find_garbage_inner( node_kind: NodeKind, ) -> anyhow::Result { // Construct clients for S3 and for Console API - let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?; + let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?; let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config)); // Build a set of console-known tenants, for quickly eliminating known-active tenants without having @@ -432,7 +432,7 @@ pub async fn purge_garbage( ); let (s3_client, target) = - init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?; + init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?; // Sanity checks on the incoming list if garbage_list.active_tenant_count == 0 { diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs index 8f567b22e0..9102ad9906 100644 --- a/storage_scrubber/src/lib.rs +++ b/storage_scrubber/src/lib.rs @@ -15,17 +15,10 @@ use std::fmt::Display; use std::sync::Arc; use std::time::Duration; -use anyhow::Context; -use aws_config::environment::EnvironmentVariableCredentialsProvider; -use aws_config::imds::credentials::ImdsCredentialsProvider; -use aws_config::meta::credentials::CredentialsProviderChain; -use aws_config::profile::ProfileFileCredentialsProvider; -use aws_config::retry::RetryConfig; -use aws_config::sso::SsoCredentialsProvider; -use aws_config::BehaviorVersion; -use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep}; -use aws_sdk_s3::{Client, Config}; -use aws_smithy_async::rt::sleep::TokioSleep; +use anyhow::{anyhow, Context}; +use aws_sdk_s3::config::Region; +use aws_sdk_s3::error::DisplayErrorContext; +use aws_sdk_s3::Client; use camino::{Utf8Path, Utf8PathBuf}; use clap::ValueEnum; @@ -274,65 +267,21 @@ pub fn init_logging(file_name: &str) -> Option { } } -pub fn init_s3_client(bucket_region: Region) -> Client { - let credentials_provider = { - // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" - let chain = CredentialsProviderChain::first_try( - "env", - EnvironmentVariableCredentialsProvider::new(), - ) - // uses "AWS_PROFILE" / `aws sso login --profile ` - .or_else( - "profile-sso", - ProfileFileCredentialsProvider::builder().build(), - ); - - // Use SSO if we were given an account ID - match std::env::var("SSO_ACCOUNT_ID").ok() { - Some(sso_account) => chain.or_else( - "sso", - SsoCredentialsProvider::builder() - .account_id(sso_account) - .role_name("PowerUserAccess") - .start_url("https://neondb.awsapps.com/start") - .region(bucket_region.clone()) - .build(), - ), - None => chain, - } - .or_else( - // Finally try IMDS - "imds", - ImdsCredentialsProvider::builder().build(), - ) - }; - - let sleep_impl: Arc = Arc::new(TokioSleep::new()); - - let mut builder = Config::builder() - .behavior_version( - #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */ - BehaviorVersion::v2023_11_09(), - ) +pub async fn init_s3_client(bucket_region: Region) -> Client { + let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28()) .region(bucket_region) - .retry_config(RetryConfig::adaptive().with_max_attempts(3)) - .sleep_impl(SharedAsyncSleep::from(sleep_impl)) - .credentials_provider(credentials_provider); - - if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") { - builder = builder.endpoint_url(endpoint) - } - - Client::from_conf(builder.build()) + .load() + .await; + Client::new(&config) } -fn init_remote( +async fn init_remote( bucket_config: BucketConfig, node_kind: NodeKind, ) -> anyhow::Result<(Arc, RootTarget)> { let bucket_region = Region::new(bucket_config.region); let delimiter = "/".to_string(); - let s3_client = Arc::new(init_s3_client(bucket_region)); + let s3_client = Arc::new(init_s3_client(bucket_region).await); let s3_root = match node_kind { NodeKind::Pageserver => RootTarget::Pageserver(S3Target { @@ -357,7 +306,7 @@ async fn list_objects_with_retries( s3_target: &S3Target, continuation_token: Option, ) -> anyhow::Result { - for _ in 0..MAX_RETRIES { + for trial in 0..MAX_RETRIES { match s3_client .list_objects_v2() .bucket(&s3_target.bucket_name) @@ -369,16 +318,22 @@ async fn list_objects_with_retries( { Ok(response) => return Ok(response), Err(e) => { + if trial == MAX_RETRIES - 1 { + return Err(e) + .with_context(|| format!("Failed to list objects {MAX_RETRIES} times")); + } error!( - "list_objects_v2 query failed: {e}, bucket_name={}, prefix={}, delimiter={}", - s3_target.bucket_name, s3_target.prefix_in_bucket, s3_target.delimiter + "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}", + s3_target.bucket_name, + s3_target.prefix_in_bucket, + s3_target.delimiter, + DisplayErrorContext(e), ); tokio::time::sleep(Duration::from_secs(1)).await; } } } - - anyhow::bail!("Failed to list objects {MAX_RETRIES} times") + Err(anyhow!("unreachable unless MAX_RETRIES==0")) } async fn download_object_with_retries( diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index 16a26613d2..d816121192 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -196,7 +196,7 @@ async fn main() -> anyhow::Result<()> { concurrency, } => { let downloader = - SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?; + SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?; downloader.download().await } Command::PageserverPhysicalGc { diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs index 0146433128..fb8fbc1635 100644 --- a/storage_scrubber/src/pageserver_physical_gc.rs +++ b/storage_scrubber/src/pageserver_physical_gc.rs @@ -160,7 +160,7 @@ pub async fn pageserver_physical_gc( min_age: Duration, mode: GcMode, ) -> anyhow::Result { - let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; + let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; let tenants = if tenant_ids.is_empty() { futures::future::Either::Left(stream_tenants(&s3_client, &target)) diff --git a/storage_scrubber/src/scan_pageserver_metadata.rs b/storage_scrubber/src/scan_pageserver_metadata.rs index af74ffa4cd..df4f29acf7 100644 --- a/storage_scrubber/src/scan_pageserver_metadata.rs +++ b/storage_scrubber/src/scan_pageserver_metadata.rs @@ -199,7 +199,7 @@ pub async fn scan_metadata( bucket_config: BucketConfig, tenant_ids: Vec, ) -> anyhow::Result { - let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?; + let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?; let tenants = if tenant_ids.is_empty() { futures::future::Either::Left(stream_tenants(&s3_client, &target)) diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs index 24051b03de..553adf8f46 100644 --- a/storage_scrubber/src/scan_safekeeper_metadata.rs +++ b/storage_scrubber/src/scan_safekeeper_metadata.rs @@ -106,7 +106,7 @@ pub async fn scan_safekeeper_metadata( let timelines = client.query(&query, &[]).await?; info!("loaded {} timelines", timelines.len()); - let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?; + let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?; let console_config = ConsoleConfig::from_env()?; let cloud_admin_api_client = CloudAdminApiClient::new(console_config); diff --git a/storage_scrubber/src/tenant_snapshot.rs b/storage_scrubber/src/tenant_snapshot.rs index 450b337235..5a75f8d40e 100644 --- a/storage_scrubber/src/tenant_snapshot.rs +++ b/storage_scrubber/src/tenant_snapshot.rs @@ -28,13 +28,13 @@ pub struct SnapshotDownloader { } impl SnapshotDownloader { - pub fn new( + pub async fn new( bucket_config: BucketConfig, tenant_id: TenantId, output_path: Utf8PathBuf, concurrency: usize, ) -> anyhow::Result { - let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; + let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; Ok(Self { s3_client, s3_root, @@ -215,7 +215,8 @@ impl SnapshotDownloader { } pub async fn download(&self) -> anyhow::Result<()> { - let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?; + let (s3_client, target) = + init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?; // Generate a stream of TenantShardId let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;