diff --git a/Cargo.lock b/Cargo.lock index 5e7fce3e8d..0a434c6ee6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4064,6 +4064,7 @@ dependencies = [ "aws-config", "aws-credential-types", "aws-sdk-s3", + "aws-smithy-async", "aws-smithy-http", "aws-types", "azure_core", diff --git a/Cargo.toml b/Cargo.toml index 6be831a2b3..5b719d776b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ async-trait = "0.1" aws-config = { version = "0.56", default-features = false, features=["rustls"] } aws-sdk-s3 = "0.29" aws-smithy-http = "0.56" +aws-smithy-async = { version = "0.56", default-features = false, features=["rt-tokio"] } aws-credential-types = "0.56" aws-types = "0.56" axum = { version = "0.6.20", features = ["ws"] } diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 65b5392389..d7bcce28cb 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true anyhow.workspace = true async-trait.workspace = true once_cell.workspace = true +aws-smithy-async.workspace = true aws-smithy-http.workspace = true aws-types.workspace = true aws-config.workspace = true diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 560a2c14e9..ab3fd3fe62 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -4,23 +4,27 @@ //! allowing multiple api users to independently work with the same S3 bucket, if //! their bucket prefixes are both specified and different. -use std::borrow::Cow; +use std::{borrow::Cow, sync::Arc}; use anyhow::Context; use aws_config::{ environment::credentials::EnvironmentVariableCredentialsProvider, - imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain, - provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider, + imds::credentials::ImdsCredentialsProvider, + meta::credentials::CredentialsProviderChain, + provider_config::ProviderConfig, + retry::{RetryConfigBuilder, RetryMode}, + web_identity_token::WebIdentityTokenCredentialsProvider, }; use aws_credential_types::cache::CredentialsCache; use aws_sdk_s3::{ - config::{Config, Region}, + config::{AsyncSleep, Config, Region, SharedAsyncSleep}, error::SdkError, operation::get_object::GetObjectError, primitives::ByteStream, types::{Delete, ObjectIdentifier}, Client, }; +use aws_smithy_async::rt::sleep::TokioSleep; use aws_smithy_http::body::SdkBody; use hyper::Body; use scopeguard::ScopeGuard; @@ -83,10 +87,23 @@ impl S3Bucket { .or_else("imds", ImdsCredentialsProvider::builder().build()) }; + // AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off + let sleep_impl: Arc = Arc::new(TokioSleep::new()); + + // We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling + // responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one + // attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled. + let mut retry_config = RetryConfigBuilder::new(); + retry_config + .set_max_attempts(Some(1)) + .set_mode(Some(RetryMode::Adaptive)); + let mut config_builder = Config::builder() .region(region) .credentials_cache(CredentialsCache::lazy()) - .credentials_provider(credentials_provider); + .credentials_provider(credentials_provider) + .sleep_impl(SharedAsyncSleep::from(sleep_impl)) + .retry_config(retry_config.build()); if let Some(custom_endpoint) = aws_config.endpoint.clone() { config_builder = config_builder