From de0e96d2be85812460ffeddabbd0e5bd6d82b912 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 3 Oct 2023 10:22:11 +0200
Subject: [PATCH] remote_storage: separate semaphores for read and write ops
 (#5440)

Before this PR, a compaction that queues a lot of uploads could grab all
the semaphore permits. Any readers that need on-demand downloads would
queue up, causing getpage@lsn outliers.

Internal context:
https://neondb.slack.com/archives/C05NXJFNRPA/p1696264359425419?thread_ts=1696250393.840899&cid=C05NXJFNRPA
---
 libs/remote_storage/src/s3_bucket.rs | 46 +++++++++++++++++++++++++---
 1 file changed, 41 insertions(+), 5 deletions(-)

diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index acab953904..fc6d7fa61b 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -47,10 +47,47 @@ pub struct S3Bucket {
     bucket_name: String,
     prefix_in_bucket: Option<String>,
     max_keys_per_list_response: Option<i32>,
+    concurrency_limiter: ConcurrencyLimiter,
+}
+
+struct ConcurrencyLimiter {
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
     // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
     // The helps to ensure we don't exceed the thresholds.
-    concurrency_limiter: Arc<Semaphore>,
+    write: Arc<Semaphore>,
+    read: Arc<Semaphore>,
+}
+
+impl ConcurrencyLimiter {
+    fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
+        match kind {
+            RequestKind::Get => &self.read,
+            RequestKind::Put => &self.write,
+            RequestKind::List => &self.read,
+            RequestKind::Delete => &self.write,
+        }
+    }
+
+    async fn acquire(
+        &self,
+        kind: RequestKind,
+    ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
+        self.for_kind(kind).acquire().await
+    }
+
+    async fn acquire_owned(
+        &self,
+        kind: RequestKind,
+    ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
+        Arc::clone(self.for_kind(kind)).acquire_owned().await
+    }
+
+    fn new(limit: usize) -> ConcurrencyLimiter {
+        Self {
+            read: Arc::new(Semaphore::new(limit)),
+            write: Arc::new(Semaphore::new(limit)),
+        }
+    }
 }
 
 #[derive(Default)]
@@ -117,7 +154,7 @@ impl S3Bucket {
             bucket_name: aws_config.bucket_name.clone(),
             max_keys_per_list_response: aws_config.max_keys_per_list_response,
             prefix_in_bucket,
-            concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
+            concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
         })
     }
 
@@ -156,7 +193,7 @@ impl S3Bucket {
         let started_at = start_counting_cancelled_wait(kind);
         let permit = self
            .concurrency_limiter
-            .acquire()
+            .acquire(kind)
            .await
            .expect("semaphore is never closed");
 
@@ -172,8 +209,7 @@
         let started_at = start_counting_cancelled_wait(kind);
         let permit = self
            .concurrency_limiter
-            .clone()
-            .acquire_owned()
+            .acquire_owned(kind)
            .await
            .expect("semaphore is never closed");
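
Illustrative sketch (not part of the patch): a minimal, standalone program
showing the effect of the split limiter. The two-variant RequestKind and the
hard-coded permit limit are simplified stand-ins for the real types and
configuration in s3_bucket.rs; only the read/write split itself mirrors the
change above. With a single shared semaphore, the two Put acquisitions below
would leave no permit for the Get; with separate semaphores, the Get proceeds
immediately.

// Standalone sketch: simplified stand-ins for RequestKind and the permit limit.
use std::sync::Arc;
use tokio::sync::Semaphore;

#[derive(Clone, Copy)]
enum RequestKind {
    Get,
    Put,
}

struct ConcurrencyLimiter {
    read: Arc<Semaphore>,
    write: Arc<Semaphore>,
}

impl ConcurrencyLimiter {
    fn new(limit: usize) -> Self {
        Self {
            read: Arc::new(Semaphore::new(limit)),
            write: Arc::new(Semaphore::new(limit)),
        }
    }

    // Route each request kind to its own semaphore, as the patch does.
    fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
        match kind {
            RequestKind::Get => &self.read,
            RequestKind::Put => &self.write,
        }
    }

    async fn acquire(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> {
        self.for_kind(kind)
            .acquire()
            .await
            .expect("semaphore is never closed")
    }
}

#[tokio::main]
async fn main() {
    let limiter = ConcurrencyLimiter::new(2);

    // A write-heavy phase (think compaction uploads) holds every write permit...
    let _w1 = limiter.acquire(RequestKind::Put).await;
    let _w2 = limiter.acquire(RequestKind::Put).await;

    // ...yet a read (an on-demand download) still gets a permit without queuing.
    let _r = limiter.acquire(RequestKind::Get).await;
    println!("read permit acquired despite exhausted write permits");
}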