mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
fix(storage-scrubber): use default AWS authentication (#8299)
part of https://github.com/neondatabase/cloud/issues/14024 close https://github.com/neondatabase/neon/issues/7665 Things running in k8s container use this authentication: https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html while we did not configure the client to use it. This pull request simply uses the default s3 client credential chain for storage scrubber. It might break compatibility with minio. ## Summary of changes * Use default AWS credential provider chain. * Improvements for s3 errors, we now have detailed errors and correct backtrace on last trial of the operation. --------- Signed-off-by: Alex Chi Z <chi@neon.tech> Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
@@ -47,7 +47,7 @@ pub async fn find_large_objects(
|
||||
ignore_deltas: bool,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<LargeObjectListing> {
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
|
||||
|
||||
let objects_stream = tenants.map_ok(|tenant_shard_id| {
|
||||
|
||||
@@ -140,7 +140,7 @@ async fn find_garbage_inner(
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<GarbageList> {
|
||||
// Construct clients for S3 and for Console API
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
|
||||
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
|
||||
|
||||
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
|
||||
@@ -432,7 +432,7 @@ pub async fn purge_garbage(
|
||||
);
|
||||
|
||||
let (s3_client, target) =
|
||||
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;
|
||||
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
|
||||
|
||||
// Sanity checks on the incoming list
|
||||
if garbage_list.active_tenant_count == 0 {
|
||||
|
||||
@@ -15,17 +15,10 @@ use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::environment::EnvironmentVariableCredentialsProvider;
|
||||
use aws_config::imds::credentials::ImdsCredentialsProvider;
|
||||
use aws_config::meta::credentials::CredentialsProviderChain;
|
||||
use aws_config::profile::ProfileFileCredentialsProvider;
|
||||
use aws_config::retry::RetryConfig;
|
||||
use aws_config::sso::SsoCredentialsProvider;
|
||||
use aws_config::BehaviorVersion;
|
||||
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
|
||||
use aws_sdk_s3::{Client, Config};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
use anyhow::{anyhow, Context};
|
||||
use aws_sdk_s3::config::Region;
|
||||
use aws_sdk_s3::error::DisplayErrorContext;
|
||||
use aws_sdk_s3::Client;
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::ValueEnum;
|
||||
@@ -274,65 +267,21 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init_s3_client(bucket_region: Region) -> Client {
|
||||
let credentials_provider = {
|
||||
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
|
||||
let chain = CredentialsProviderChain::first_try(
|
||||
"env",
|
||||
EnvironmentVariableCredentialsProvider::new(),
|
||||
)
|
||||
// uses "AWS_PROFILE" / `aws sso login --profile <profile>`
|
||||
.or_else(
|
||||
"profile-sso",
|
||||
ProfileFileCredentialsProvider::builder().build(),
|
||||
);
|
||||
|
||||
// Use SSO if we were given an account ID
|
||||
match std::env::var("SSO_ACCOUNT_ID").ok() {
|
||||
Some(sso_account) => chain.or_else(
|
||||
"sso",
|
||||
SsoCredentialsProvider::builder()
|
||||
.account_id(sso_account)
|
||||
.role_name("PowerUserAccess")
|
||||
.start_url("https://neondb.awsapps.com/start")
|
||||
.region(bucket_region.clone())
|
||||
.build(),
|
||||
),
|
||||
None => chain,
|
||||
}
|
||||
.or_else(
|
||||
// Finally try IMDS
|
||||
"imds",
|
||||
ImdsCredentialsProvider::builder().build(),
|
||||
)
|
||||
};
|
||||
|
||||
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
|
||||
|
||||
let mut builder = Config::builder()
|
||||
.behavior_version(
|
||||
#[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
|
||||
BehaviorVersion::v2023_11_09(),
|
||||
)
|
||||
pub async fn init_s3_client(bucket_region: Region) -> Client {
|
||||
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
|
||||
.region(bucket_region)
|
||||
.retry_config(RetryConfig::adaptive().with_max_attempts(3))
|
||||
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
|
||||
.credentials_provider(credentials_provider);
|
||||
|
||||
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
|
||||
builder = builder.endpoint_url(endpoint)
|
||||
}
|
||||
|
||||
Client::from_conf(builder.build())
|
||||
.load()
|
||||
.await;
|
||||
Client::new(&config)
|
||||
}
|
||||
|
||||
fn init_remote(
|
||||
async fn init_remote(
|
||||
bucket_config: BucketConfig,
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
|
||||
let bucket_region = Region::new(bucket_config.region);
|
||||
let delimiter = "/".to_string();
|
||||
let s3_client = Arc::new(init_s3_client(bucket_region));
|
||||
let s3_client = Arc::new(init_s3_client(bucket_region).await);
|
||||
|
||||
let s3_root = match node_kind {
|
||||
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
|
||||
@@ -357,7 +306,7 @@ async fn list_objects_with_retries(
|
||||
s3_target: &S3Target,
|
||||
continuation_token: Option<String>,
|
||||
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
|
||||
for _ in 0..MAX_RETRIES {
|
||||
for trial in 0..MAX_RETRIES {
|
||||
match s3_client
|
||||
.list_objects_v2()
|
||||
.bucket(&s3_target.bucket_name)
|
||||
@@ -369,16 +318,22 @@ async fn list_objects_with_retries(
|
||||
{
|
||||
Ok(response) => return Ok(response),
|
||||
Err(e) => {
|
||||
if trial == MAX_RETRIES - 1 {
|
||||
return Err(e)
|
||||
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
|
||||
}
|
||||
error!(
|
||||
"list_objects_v2 query failed: {e}, bucket_name={}, prefix={}, delimiter={}",
|
||||
s3_target.bucket_name, s3_target.prefix_in_bucket, s3_target.delimiter
|
||||
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
|
||||
s3_target.bucket_name,
|
||||
s3_target.prefix_in_bucket,
|
||||
s3_target.delimiter,
|
||||
DisplayErrorContext(e),
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
|
||||
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
|
||||
}
|
||||
|
||||
async fn download_object_with_retries(
|
||||
|
||||
@@ -196,7 +196,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
concurrency,
|
||||
} => {
|
||||
let downloader =
|
||||
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
|
||||
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
|
||||
downloader.download().await
|
||||
}
|
||||
Command::PageserverPhysicalGc {
|
||||
|
||||
@@ -160,7 +160,7 @@ pub async fn pageserver_physical_gc(
|
||||
min_age: Duration,
|
||||
mode: GcMode,
|
||||
) -> anyhow::Result<GcSummary> {
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
|
||||
let tenants = if tenant_ids.is_empty() {
|
||||
futures::future::Either::Left(stream_tenants(&s3_client, &target))
|
||||
|
||||
@@ -199,7 +199,7 @@ pub async fn scan_metadata(
|
||||
bucket_config: BucketConfig,
|
||||
tenant_ids: Vec<TenantShardId>,
|
||||
) -> anyhow::Result<MetadataSummary> {
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
|
||||
|
||||
let tenants = if tenant_ids.is_empty() {
|
||||
futures::future::Either::Left(stream_tenants(&s3_client, &target))
|
||||
|
||||
@@ -106,7 +106,7 @@ pub async fn scan_safekeeper_metadata(
|
||||
let timelines = client.query(&query, &[]).await?;
|
||||
info!("loaded {} timelines", timelines.len());
|
||||
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
|
||||
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
|
||||
let console_config = ConsoleConfig::from_env()?;
|
||||
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);
|
||||
|
||||
|
||||
@@ -28,13 +28,13 @@ pub struct SnapshotDownloader {
|
||||
}
|
||||
|
||||
impl SnapshotDownloader {
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
bucket_config: BucketConfig,
|
||||
tenant_id: TenantId,
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
Ok(Self {
|
||||
s3_client,
|
||||
s3_root,
|
||||
@@ -215,7 +215,8 @@ impl SnapshotDownloader {
|
||||
}
|
||||
|
||||
pub async fn download(&self) -> anyhow::Result<()> {
|
||||
let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;
|
||||
let (s3_client, target) =
|
||||
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
|
||||
// Generate a stream of TenantShardId
|
||||
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;
|
||||
|
||||
Reference in New Issue
Block a user