mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 07:30:37 +00:00
Allow for higher s3 concurrency (#4292)
We currently have a semaphore based rate limiter which we hope will keep us under S3 limits. However, the semaphore does not consider time, so I've been hesitant to raise the concurrency limit of 100. See #3698. The PR Introduces a leaky-bucket based rate limiter instead of the `tokio::sync::Semaphore` which will allow us to raise the limit later on. The configuration changes are not contained here.
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -2040,6 +2040,17 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "leaky-bucket"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7"
|
||||
dependencies = [
|
||||
"parking_lot",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.144"
|
||||
@@ -3222,6 +3233,7 @@ dependencies = [
|
||||
"aws-smithy-http",
|
||||
"aws-types",
|
||||
"hyper",
|
||||
"leaky-bucket",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -25,6 +25,8 @@ utils.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
leaky-bucket = "1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile.workspace = true
|
||||
test-context.workspace = true
|
||||
|
||||
@@ -37,6 +37,8 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
|
||||
/// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
|
||||
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
|
||||
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
|
||||
///
|
||||
/// IAM ratelimit should never be observed with caching credentials provider.
|
||||
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// No limits on the client side, which currenltly means 1000 for AWS S3.
|
||||
/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
|
||||
|
||||
@@ -21,10 +21,7 @@ use aws_sdk_s3::{
|
||||
};
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
use hyper::Body;
|
||||
use tokio::{
|
||||
io::{self, AsyncRead},
|
||||
sync::Semaphore,
|
||||
};
|
||||
use tokio::io;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -105,9 +102,8 @@ pub struct S3Bucket {
|
||||
prefix_in_bucket: Option<String>,
|
||||
max_keys_per_list_response: Option<i32>,
|
||||
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
|
||||
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
|
||||
// The helps to ensure we don't exceed the thresholds.
|
||||
concurrency_limiter: Arc<Semaphore>,
|
||||
concurrency_limiter: Arc<leaky_bucket::RateLimiter>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -158,12 +154,24 @@ impl S3Bucket {
|
||||
}
|
||||
prefix
|
||||
});
|
||||
|
||||
let rps = aws_config.concurrency_limit.get();
|
||||
let concurrency_limiter = leaky_bucket::RateLimiter::builder()
|
||||
.max(rps)
|
||||
.initial(0)
|
||||
// refill it by rps every second. this means the (rps+1)th request will have to wait for
|
||||
// 1 second from earliest.
|
||||
.refill(rps)
|
||||
.interval(std::time::Duration::from_secs(1))
|
||||
.fair(true)
|
||||
.build();
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
bucket_name: aws_config.bucket_name.clone(),
|
||||
max_keys_per_list_response: aws_config.max_keys_per_list_response,
|
||||
prefix_in_bucket,
|
||||
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
|
||||
concurrency_limiter: Arc::new(concurrency_limiter),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -195,13 +203,10 @@ impl S3Bucket {
|
||||
}
|
||||
|
||||
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
|
||||
let permit = self
|
||||
.concurrency_limiter
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 download")
|
||||
.map_err(DownloadError::Other)?;
|
||||
// while the download could take a long time with `leaky_bucket` we have nothing to release
|
||||
// once the download is done. this is because with "requests per second" rate limiting on
|
||||
// s3, there should be no meaning for the long requests.
|
||||
self.concurrency_limiter.clone().acquire_owned(1).await;
|
||||
|
||||
metrics::inc_get_object();
|
||||
|
||||
@@ -219,10 +224,9 @@ impl S3Bucket {
|
||||
let metadata = object_output.metadata().cloned().map(StorageMetadata);
|
||||
Ok(Download {
|
||||
metadata,
|
||||
download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
|
||||
permit,
|
||||
download_stream: Box::pin(io::BufReader::new(
|
||||
object_output.body.into_async_read(),
|
||||
))),
|
||||
)),
|
||||
})
|
||||
}
|
||||
Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
|
||||
@@ -238,32 +242,6 @@ impl S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
/// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
|
||||
struct RatelimitedAsyncRead<S> {
|
||||
permit: tokio::sync::OwnedSemaphorePermit,
|
||||
#[pin]
|
||||
inner: S,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead> RatelimitedAsyncRead<S> {
|
||||
fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
|
||||
RatelimitedAsyncRead { permit, inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let this = self.project();
|
||||
this.inner.poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RemoteStorage for S3Bucket {
|
||||
/// See the doc for `RemoteStorage::list_prefixes`
|
||||
@@ -289,12 +267,7 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 list")
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.concurrency_limiter.acquire_one().await;
|
||||
|
||||
metrics::inc_list_objects();
|
||||
|
||||
@@ -339,11 +312,9 @@ impl RemoteStorage for S3Bucket {
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
) -> anyhow::Result<()> {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 upload")?;
|
||||
// similarly to downloads, the permit does not have live through the upload, but instead we
|
||||
// are rate limiting requests per second.
|
||||
self.concurrency_limiter.acquire_one().await;
|
||||
|
||||
metrics::inc_put_object();
|
||||
|
||||
@@ -398,11 +369,7 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 delete")?;
|
||||
self.concurrency_limiter.acquire_one().await;
|
||||
|
||||
metrics::inc_delete_object();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user