From b1fe8259b44ba0d0f0ce4d777edbc0e7e76ebd62 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Tue, 9 Jul 2024 13:41:37 -0400 Subject: [PATCH] fix(storage-scrubber): use default AWS authentication (#8299) part of https://github.com/neondatabase/cloud/issues/14024 close https://github.com/neondatabase/neon/issues/7665 Things running in k8s containers use this authentication: https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html while we did not configure the client to use it. This pull request simply uses the default s3 client credential chain for storage scrubber. It might break compatibility with minio. ## Summary of changes * Use default AWS credential provider chain. * Improvements for s3 errors: we now have detailed errors and a correct backtrace on the last trial of the operation. --------- Signed-off-by: Alex Chi Z Co-authored-by: Joonas Koivunen --- storage_scrubber/src/find_large_objects.rs | 2 +- storage_scrubber/src/garbage.rs | 4 +- storage_scrubber/src/lib.rs | 89 +++++-------------- storage_scrubber/src/main.rs | 2 +- .../src/pageserver_physical_gc.rs | 2 +- .../src/scan_pageserver_metadata.rs | 2 +- .../src/scan_safekeeper_metadata.rs | 2 +- storage_scrubber/src/tenant_snapshot.rs | 7 +- 8 files changed, 33 insertions(+), 77 deletions(-) diff --git a/storage_scrubber/src/find_large_objects.rs b/storage_scrubber/src/find_large_objects.rs index 1422545f2f..2ef802229d 100644 --- a/storage_scrubber/src/find_large_objects.rs +++ b/storage_scrubber/src/find_large_objects.rs @@ -47,7 +47,7 @@ pub async fn find_large_objects( ignore_deltas: bool, concurrency: usize, ) -> anyhow::Result { - let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; + let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; let tenants = std::pin::pin!(stream_tenants(&s3_client, &target)); let objects_stream = tenants.map_ok(|tenant_shard_id| { diff --git a/storage_scrubber/src/garbage.rs 
b/storage_scrubber/src/garbage.rs index ce0ff10ec6..0450851988 100644 --- a/storage_scrubber/src/garbage.rs +++ b/storage_scrubber/src/garbage.rs @@ -140,7 +140,7 @@ async fn find_garbage_inner( node_kind: NodeKind, ) -> anyhow::Result { // Construct clients for S3 and for Console API - let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?; + let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?; let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config)); // Build a set of console-known tenants, for quickly eliminating known-active tenants without having @@ -432,7 +432,7 @@ pub async fn purge_garbage( ); let (s3_client, target) = - init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?; + init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?; // Sanity checks on the incoming list if garbage_list.active_tenant_count == 0 { diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs index 8f567b22e0..9102ad9906 100644 --- a/storage_scrubber/src/lib.rs +++ b/storage_scrubber/src/lib.rs @@ -15,17 +15,10 @@ use std::fmt::Display; use std::sync::Arc; use std::time::Duration; -use anyhow::Context; -use aws_config::environment::EnvironmentVariableCredentialsProvider; -use aws_config::imds::credentials::ImdsCredentialsProvider; -use aws_config::meta::credentials::CredentialsProviderChain; -use aws_config::profile::ProfileFileCredentialsProvider; -use aws_config::retry::RetryConfig; -use aws_config::sso::SsoCredentialsProvider; -use aws_config::BehaviorVersion; -use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep}; -use aws_sdk_s3::{Client, Config}; -use aws_smithy_async::rt::sleep::TokioSleep; +use anyhow::{anyhow, Context}; +use aws_sdk_s3::config::Region; +use aws_sdk_s3::error::DisplayErrorContext; +use aws_sdk_s3::Client; use camino::{Utf8Path, Utf8PathBuf}; use clap::ValueEnum; @@ -274,65 +267,21 @@ pub fn init_logging(file_name: &str) -> 
Option { } } -pub fn init_s3_client(bucket_region: Region) -> Client { - let credentials_provider = { - // uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" - let chain = CredentialsProviderChain::first_try( - "env", - EnvironmentVariableCredentialsProvider::new(), - ) - // uses "AWS_PROFILE" / `aws sso login --profile ` - .or_else( - "profile-sso", - ProfileFileCredentialsProvider::builder().build(), - ); - - // Use SSO if we were given an account ID - match std::env::var("SSO_ACCOUNT_ID").ok() { - Some(sso_account) => chain.or_else( - "sso", - SsoCredentialsProvider::builder() - .account_id(sso_account) - .role_name("PowerUserAccess") - .start_url("https://neondb.awsapps.com/start") - .region(bucket_region.clone()) - .build(), - ), - None => chain, - } - .or_else( - // Finally try IMDS - "imds", - ImdsCredentialsProvider::builder().build(), - ) - }; - - let sleep_impl: Arc = Arc::new(TokioSleep::new()); - - let mut builder = Config::builder() - .behavior_version( - #[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */ - BehaviorVersion::v2023_11_09(), - ) +pub async fn init_s3_client(bucket_region: Region) -> Client { + let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28()) .region(bucket_region) - .retry_config(RetryConfig::adaptive().with_max_attempts(3)) - .sleep_impl(SharedAsyncSleep::from(sleep_impl)) - .credentials_provider(credentials_provider); - - if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") { - builder = builder.endpoint_url(endpoint) - } - - Client::from_conf(builder.build()) + .load() + .await; + Client::new(&config) } -fn init_remote( +async fn init_remote( bucket_config: BucketConfig, node_kind: NodeKind, ) -> anyhow::Result<(Arc, RootTarget)> { let bucket_region = Region::new(bucket_config.region); let delimiter = "/".to_string(); - let s3_client = Arc::new(init_s3_client(bucket_region)); + let s3_client = Arc::new(init_s3_client(bucket_region).await); let s3_root = match node_kind { 
NodeKind::Pageserver => RootTarget::Pageserver(S3Target { @@ -357,7 +306,7 @@ async fn list_objects_with_retries( s3_target: &S3Target, continuation_token: Option, ) -> anyhow::Result { - for _ in 0..MAX_RETRIES { + for trial in 0..MAX_RETRIES { match s3_client .list_objects_v2() .bucket(&s3_target.bucket_name) @@ -369,16 +318,22 @@ async fn list_objects_with_retries( { Ok(response) => return Ok(response), Err(e) => { + if trial == MAX_RETRIES - 1 { + return Err(e) + .with_context(|| format!("Failed to list objects {MAX_RETRIES} times")); + } error!( - "list_objects_v2 query failed: {e}, bucket_name={}, prefix={}, delimiter={}", - s3_target.bucket_name, s3_target.prefix_in_bucket, s3_target.delimiter + "list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}", + s3_target.bucket_name, + s3_target.prefix_in_bucket, + s3_target.delimiter, + DisplayErrorContext(e), ); tokio::time::sleep(Duration::from_secs(1)).await; } } } - - anyhow::bail!("Failed to list objects {MAX_RETRIES} times") + Err(anyhow!("unreachable unless MAX_RETRIES==0")) } async fn download_object_with_retries( diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index 16a26613d2..d816121192 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -196,7 +196,7 @@ async fn main() -> anyhow::Result<()> { concurrency, } => { let downloader = - SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?; + SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?; downloader.download().await } Command::PageserverPhysicalGc { diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs index 0146433128..fb8fbc1635 100644 --- a/storage_scrubber/src/pageserver_physical_gc.rs +++ b/storage_scrubber/src/pageserver_physical_gc.rs @@ -160,7 +160,7 @@ pub async fn pageserver_physical_gc( min_age: Duration, mode: GcMode, ) -> anyhow::Result { - let (s3_client, 
target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; + let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; let tenants = if tenant_ids.is_empty() { futures::future::Either::Left(stream_tenants(&s3_client, &target)) diff --git a/storage_scrubber/src/scan_pageserver_metadata.rs b/storage_scrubber/src/scan_pageserver_metadata.rs index af74ffa4cd..df4f29acf7 100644 --- a/storage_scrubber/src/scan_pageserver_metadata.rs +++ b/storage_scrubber/src/scan_pageserver_metadata.rs @@ -199,7 +199,7 @@ pub async fn scan_metadata( bucket_config: BucketConfig, tenant_ids: Vec, ) -> anyhow::Result { - let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?; + let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?; let tenants = if tenant_ids.is_empty() { futures::future::Either::Left(stream_tenants(&s3_client, &target)) diff --git a/storage_scrubber/src/scan_safekeeper_metadata.rs b/storage_scrubber/src/scan_safekeeper_metadata.rs index 24051b03de..553adf8f46 100644 --- a/storage_scrubber/src/scan_safekeeper_metadata.rs +++ b/storage_scrubber/src/scan_safekeeper_metadata.rs @@ -106,7 +106,7 @@ pub async fn scan_safekeeper_metadata( let timelines = client.query(&query, &[]).await?; info!("loaded {} timelines", timelines.len()); - let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?; + let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?; let console_config = ConsoleConfig::from_env()?; let cloud_admin_api_client = CloudAdminApiClient::new(console_config); diff --git a/storage_scrubber/src/tenant_snapshot.rs b/storage_scrubber/src/tenant_snapshot.rs index 450b337235..5a75f8d40e 100644 --- a/storage_scrubber/src/tenant_snapshot.rs +++ b/storage_scrubber/src/tenant_snapshot.rs @@ -28,13 +28,13 @@ pub struct SnapshotDownloader { } impl SnapshotDownloader { - pub fn new( + pub async fn new( bucket_config: BucketConfig, tenant_id: 
TenantId, output_path: Utf8PathBuf, concurrency: usize, ) -> anyhow::Result { - let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?; + let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?; Ok(Self { s3_client, s3_root, @@ -215,7 +215,8 @@ impl SnapshotDownloader { } pub async fn download(&self) -> anyhow::Result<()> { - let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?; + let (s3_client, target) = + init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?; // Generate a stream of TenantShardId let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;