From 024109fbeb533b4574976a5899c27f56891de881 Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Fri, 26 May 2023 13:35:50 +0300
Subject: [PATCH] Allow for higher s3 concurrency (#4292)

We currently have a semaphore based rate limiter which we hope will
keep us under S3 limits. However, the semaphore does not consider time,
so I've been hesitant to raise the current concurrency limit of 100.
See #3698.

This PR introduces a leaky-bucket based rate limiter in place of the
`tokio::sync::Semaphore`, which will allow us to raise the limit later
on. The configuration changes are not included here.
---
 Cargo.lock                           | 12 ++++
 libs/remote_storage/Cargo.toml       |  2 +
 libs/remote_storage/src/lib.rs       |  2 +
 libs/remote_storage/src/s3_bucket.rs | 85 +++++++++-------------------
 4 files changed, 42 insertions(+), 59 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d390df94e0..69d161d2b1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2040,6 +2040,17 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
+[[package]]
+name = "leaky-bucket"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7"
+dependencies = [
+ "parking_lot",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "libc"
 version = "0.2.144"
@@ -3222,6 +3233,7 @@ dependencies = [
  "aws-smithy-http",
  "aws-types",
  "hyper",
+ "leaky-bucket",
  "metrics",
  "once_cell",
  "pin-project-lite",
diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml
index 0877a38dd9..5da02293a8 100644
--- a/libs/remote_storage/Cargo.toml
+++ b/libs/remote_storage/Cargo.toml
@@ -25,6 +25,8 @@ utils.workspace = true
 pin-project-lite.workspace = true
 workspace_hack.workspace = true
 
+leaky-bucket = "1.0"
+
 [dev-dependencies]
 tempfile.workspace = true
 test-context.workspace = true
diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs
index e0cc3ca543..f3ae2425f6 100644
--- a/libs/remote_storage/src/lib.rs
+++ b/libs/remote_storage/src/lib.rs
@@ -37,6 +37,8 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
 /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
 /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
 /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
+///
+/// The IAM rate limit should never be observed with a caching credentials provider.
 pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
 /// No limits on the client side, which currenltly means 1000 for AWS S3.
 /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index 0be8c72fe0..631caa6a48 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -21,10 +21,7 @@ use aws_sdk_s3::{
 };
 use aws_smithy_http::body::SdkBody;
 use hyper::Body;
-use tokio::{
-    io::{self, AsyncRead},
-    sync::Semaphore,
-};
+use tokio::io;
 use tokio_util::io::ReaderStream;
 use tracing::debug;
 
@@ -105,9 +102,8 @@ pub struct S3Bucket {
     prefix_in_bucket: Option<String>,
     max_keys_per_list_response: Option<i32>,
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
-    // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
-    // The helps to ensure we don't exceed the thresholds.
-    concurrency_limiter: Arc<Semaphore>,
+    concurrency_limiter: Arc<leaky_bucket::RateLimiter>,
 }
 
 #[derive(Default)]
@@ -158,12 +154,24 @@ impl S3Bucket {
             }
             prefix
         });
+
+        let rps = aws_config.concurrency_limit.get();
+        let concurrency_limiter = leaky_bucket::RateLimiter::builder()
+            .max(rps)
+            .initial(0)
+            // Refill by rps every second; this means the (rps+1)th request will have to wait
+            // for 1 second from the earliest one.
+            .refill(rps)
+            .interval(std::time::Duration::from_secs(1))
+            .fair(true)
+            .build();
+
         Ok(Self {
             client,
             bucket_name: aws_config.bucket_name.clone(),
             max_keys_per_list_response: aws_config.max_keys_per_list_response,
             prefix_in_bucket,
-            concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
+            concurrency_limiter: Arc::new(concurrency_limiter),
         })
     }
 
@@ -195,13 +203,10 @@ impl S3Bucket {
     }
 
     async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
-        let permit = self
-            .concurrency_limiter
-            .clone()
-            .acquire_owned()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 download")
-            .map_err(DownloadError::Other)?;
+        // While the download could take a long time, with `leaky_bucket` we have nothing to
+        // release once the download is done: with "requests per second" rate limiting on S3,
+        // a long-running request costs no more than a short one.
+        self.concurrency_limiter.clone().acquire_owned(1).await;
 
         metrics::inc_get_object();
 
@@ -219,10 +224,9 @@
                 let metadata = object_output.metadata().cloned().map(StorageMetadata);
                 Ok(Download {
                     metadata,
-                    download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
-                        permit,
+                    download_stream: Box::pin(io::BufReader::new(
                         object_output.body.into_async_read(),
-                    ))),
+                    )),
                 })
             }
             Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
@@ -238,32 +242,6 @@
     }
 }
 
-pin_project_lite::pin_project! {
-    /// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
-    struct RatelimitedAsyncRead<S> {
-        permit: tokio::sync::OwnedSemaphorePermit,
-        #[pin]
-        inner: S,
-    }
-}
-
-impl<S: AsyncRead> RatelimitedAsyncRead<S> {
-    fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
-        RatelimitedAsyncRead { permit, inner }
-    }
-}
-
-impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
-    fn poll_read(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut io::ReadBuf<'_>,
-    ) -> std::task::Poll<io::Result<()>> {
-        let this = self.project();
-        this.inner.poll_read(cx, buf)
-    }
-}
-
 #[async_trait::async_trait]
 impl RemoteStorage for S3Bucket {
     /// See the doc for `RemoteStorage::list_prefixes`
@@ -289,12 +267,7 @@
 
         let mut continuation_token = None;
         loop {
-            let _guard = self
-                .concurrency_limiter
-                .acquire()
-                .await
-                .context("Concurrency limiter semaphore got closed during S3 list")
-                .map_err(DownloadError::Other)?;
+            self.concurrency_limiter.acquire_one().await;
 
             metrics::inc_list_objects();
 
@@ -339,11 +312,9 @@
         to: &RemotePath,
         metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
-        let _guard = self
-            .concurrency_limiter
-            .acquire()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 upload")?;
+        // Similarly to downloads, the permit does not have to live through the upload; instead
+        // we are rate limiting requests per second.
+        self.concurrency_limiter.acquire_one().await;
 
         metrics::inc_put_object();
 
@@ -398,11 +369,7 @@
     }
 
     async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
-        let _guard = self
-            .concurrency_limiter
-            .acquire()
-            .await
-            .context("Concurrency limiter semaphore got closed during S3 delete")?;
+        self.concurrency_limiter.acquire_one().await;
 
         metrics::inc_delete_object();
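
Note: the following is a minimal, self-contained sketch of the rate limiting
behaviour introduced above, for reviewers who want to try it out. `RPS` stands
in for `aws_config.concurrency_limit.get()`, and the request loop is
hypothetical; only `leaky_bucket` and `tokio` (with macros) are assumed as
dependencies. It is illustrative, not part of the change.

    use std::sync::Arc;
    use std::time::{Duration, Instant};

    use leaky_bucket::RateLimiter;

    #[tokio::main]
    async fn main() {
        // Stand-in for `aws_config.concurrency_limit.get()`.
        const RPS: usize = 2;

        // Same builder call as in `S3Bucket::new`: start empty and refill
        // RPS tokens every second, so at most RPS requests are admitted per
        // second; excess requests wait for the next refill instead of failing.
        let limiter = Arc::new(
            RateLimiter::builder()
                .max(RPS)
                .initial(0)
                .refill(RPS)
                .interval(Duration::from_secs(1))
                .fair(true)
                .build(),
        );

        let started = Instant::now();
        for i in 0..5 {
            // Equivalent of the `acquire_one().await` / `acquire_owned(1).await`
            // calls in the patch: nothing is held or released afterwards.
            limiter.acquire_one().await;
            println!("request {i} admitted at {:?}", started.elapsed());
        }
        // With RPS = 2 and initial(0), requests 0 and 1 are admitted after the
        // first refill (~1s), 2 and 3 after ~2s, and 4 after ~3s.
    }

Unlike the removed `RatelimitedAsyncRead`, no state outlives the acquire call,
which is why long downloads and uploads no longer pin a permit.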