diff --git a/Cargo.lock b/Cargo.lock
index d390df94e0..69d161d2b1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2040,6 +2040,17 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
+[[package]]
+name = "leaky-bucket"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7"
+dependencies = [
+ "parking_lot",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "libc"
 version = "0.2.144"
@@ -3222,6 +3233,7 @@ dependencies = [
  "aws-smithy-http",
  "aws-types",
  "hyper",
+ "leaky-bucket",
  "metrics",
  "once_cell",
  "pin-project-lite",
diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml
index 0877a38dd9..5da02293a8 100644
--- a/libs/remote_storage/Cargo.toml
+++ b/libs/remote_storage/Cargo.toml
@@ -25,6 +25,8 @@ utils.workspace = true
 pin-project-lite.workspace = true
 workspace_hack.workspace = true
 
+leaky-bucket = "1.0"
+
 [dev-dependencies]
 tempfile.workspace = true
 test-context.workspace = true
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index e0cc3ca543..f3ae2425f6 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -37,6 +37,8 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
 /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
 /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
 /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
+///
+/// The IAM rate limit should never be observed with a caching credentials provider.
 pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
 /// No limits on the client side, which currenltly means 1000 for AWS S3.
 /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index 0be8c72fe0..631caa6a48 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -21,10 +21,7 @@ use aws_sdk_s3::{
 };
 use aws_smithy_http::body::SdkBody;
 use hyper::Body;
-use tokio::{
-    io::{self, AsyncRead},
-    sync::Semaphore,
-};
+use tokio::io;
 use tokio_util::io::ReaderStream;
 use tracing::debug;
 
@@ -105,9 +102,8 @@ pub struct S3Bucket {
     prefix_in_bucket: Option<String>,
     max_keys_per_list_response: Option<i32>,
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
-    // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
     // The helps to ensure we don't exceed the thresholds.
-    concurrency_limiter: Arc<Semaphore>,
+    concurrency_limiter: Arc<leaky_bucket::RateLimiter>,
 }
 
 #[derive(Default)]
@@ -158,12 +154,24 @@ impl S3Bucket {
             }
             prefix
         });
+
+        let rps = aws_config.concurrency_limit.get();
+        let concurrency_limiter = leaky_bucket::RateLimiter::builder()
+            .max(rps)
+            .initial(0)
+            // refill it by rps every second. this means the (rps+1)th request will have to wait for
+            // 1 second from the earliest request.
+            .refill(rps)
+            .interval(std::time::Duration::from_secs(1))
+            .fair(true)
+            .build();
+
         Ok(Self {
             client,
             bucket_name: aws_config.bucket_name.clone(),
             max_keys_per_list_response: aws_config.max_keys_per_list_response,
             prefix_in_bucket,
-            concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
+            concurrency_limiter: Arc::new(concurrency_limiter),
         })
     }
 
@@ -195,13 +203,10 @@
     }
 
     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
-        let permit = self
-            .concurrency_limiter
-            .clone()
-            .acquire_owned()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 download")
-            .map_err(DownloadError::Other)?;
+        // while the download could take a long time, with `leaky_bucket` we have nothing to release
+        // once the download is done. this is because with "requests per second" rate limiting on
+        // s3, the duration of the request does not matter.
+        self.concurrency_limiter.clone().acquire_owned(1).await;
 
         metrics::inc_get_object();
 
@@ -219,10 +224,9 @@
                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
                 Ok(Download {
                     metadata,
-                    download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
-                        permit,
+                    download_stream: Box::pin(io::BufReader::new(
                         object_output.body.into_async_read(),
-                    ))),
+                    )),
                 })
             }
             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
@@ -238,32 +242,6 @@
     }
 }
 
-pin_project_lite::pin_project! {
-    /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
-    struct RatelimitedAsyncRead<S> {
-        permit: tokio::sync::OwnedSemaphorePermit,
-        #[pin]
-        inner: S,
-    }
-}
-
-impl<S> RatelimitedAsyncRead<S> {
-    fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
-        RatelimitedAsyncRead { permit, inner }
-    }
-}
-
-impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
-    fn poll_read(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut io::ReadBuf<'_>,
-    ) -> std::task::Poll<io::Result<()>> {
-        let this = self.project();
-        this.inner.poll_read(cx, buf)
-    }
-}
-
 #[async_trait::async_trait]
 impl RemoteStorage for S3Bucket {
     /// See the doc for `RemoteStorage::list_prefixes`
@@ -289,12 +267,7 @@
         let mut continuation_token = None;
         loop {
-            let _guard = self
-                .concurrency_limiter
-                .acquire()
-                .await
-                .context("Concurrency limiter semaphore got closed during S3 list")
-                .map_err(DownloadError::Other)?;
+            self.concurrency_limiter.acquire_one().await;
 
             metrics::inc_list_objects();
 
@@ -339,11 +312,9 @@
         to: &RemotePath,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
-        let _guard = self
-            .concurrency_limiter
-            .acquire()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 upload")?;
+        // similarly to downloads, the permit does not have to live through the upload; instead we
+        // are rate limiting requests per second.
+        self.concurrency_limiter.acquire_one().await;
 
         metrics::inc_put_object();
 
@@ -398,11 +369,7 @@
     }
 
     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
-        let _guard = self
-            .concurrency_limiter
-            .acquire()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 delete")?;
+        self.concurrency_limiter.acquire_one().await;
 
         metrics::inc_delete_object();
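
For reviewers, a minimal standalone sketch (not part of the diff) of how the `leaky_bucket::RateLimiter` configured above behaves. It reuses the same builder options as the PR: `max(rps)`, `initial(0)`, `refill(rps)`, a one-second interval and fair queuing. The concrete `rps` value, the `#[tokio::main]` runtime setup and the timing printout are illustrative assumptions only; in the PR, `rps` comes from `aws_config.concurrency_limit.get()`.

use std::sync::Arc;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Illustrative value; the PR derives this from the S3 config's concurrency_limit.
    let rps = 5;

    let limiter = Arc::new(
        leaky_bucket::RateLimiter::builder()
            .max(rps) // never accumulate more than `rps` tokens
            .initial(0) // start empty, so the first burst waits for the first refill
            .refill(rps) // add `rps` tokens per interval...
            .interval(Duration::from_secs(1)) // ...once per second
            .fair(true) // serve waiters in FIFO order
            .build(),
    );

    let started = Instant::now();

    // Issue rps + 1 acquisitions back to back: each request costs one token, so one
    // refill admits the first `rps` requests and the extra one has to wait for the
    // next refill, roughly a second after the earliest request. This mirrors the
    // "(rps+1)th request" comment in the builder above.
    for i in 0..=rps {
        limiter.acquire_one().await;
        println!("request {i} admitted at {:?}", started.elapsed());
    }
}

The point of the switch is visible here: unlike the removed semaphore, no token is held for the duration of a download or upload, so long-running transfers no longer reduce the request budget of other operations; only the rate at which requests are started is capped.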