mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
This reverts commit 024109fbeb for it
failing to be speed up anything, but run into more errors.
See: #3698.
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -2040,17 +2040,6 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "leaky-bucket"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7"
|
||||
dependencies = [
|
||||
"parking_lot",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.144"
|
||||
@@ -3233,7 +3222,6 @@ dependencies = [
|
||||
"aws-smithy-http",
|
||||
"aws-types",
|
||||
"hyper",
|
||||
"leaky-bucket",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -25,8 +25,6 @@ utils.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
leaky-bucket = "1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile.workspace = true
|
||||
test-context.workspace = true
|
||||
|
||||
@@ -37,8 +37,6 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
|
||||
/// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html
|
||||
/// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests
|
||||
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
|
||||
///
|
||||
/// IAM ratelimit should never be observed with caching credentials provider.
|
||||
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// No limits on the client side, which currenltly means 1000 for AWS S3.
|
||||
/// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
|
||||
|
||||
@@ -21,7 +21,10 @@ use aws_sdk_s3::{
|
||||
};
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
use hyper::Body;
|
||||
use tokio::io;
|
||||
use tokio::{
|
||||
io::{self, AsyncRead},
|
||||
sync::Semaphore,
|
||||
};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -102,8 +105,9 @@ pub struct S3Bucket {
|
||||
prefix_in_bucket: Option<String>,
|
||||
max_keys_per_list_response: Option<i32>,
|
||||
// Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
|
||||
// Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
|
||||
// The helps to ensure we don't exceed the thresholds.
|
||||
concurrency_limiter: Arc<leaky_bucket::RateLimiter>,
|
||||
concurrency_limiter: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -154,24 +158,12 @@ impl S3Bucket {
|
||||
}
|
||||
prefix
|
||||
});
|
||||
|
||||
let rps = aws_config.concurrency_limit.get();
|
||||
let concurrency_limiter = leaky_bucket::RateLimiter::builder()
|
||||
.max(rps)
|
||||
.initial(0)
|
||||
// refill it by rps every second. this means the (rps+1)th request will have to wait for
|
||||
// 1 second from earliest.
|
||||
.refill(rps)
|
||||
.interval(std::time::Duration::from_secs(1))
|
||||
.fair(true)
|
||||
.build();
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
bucket_name: aws_config.bucket_name.clone(),
|
||||
max_keys_per_list_response: aws_config.max_keys_per_list_response,
|
||||
prefix_in_bucket,
|
||||
concurrency_limiter: Arc::new(concurrency_limiter),
|
||||
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -203,10 +195,13 @@ impl S3Bucket {
|
||||
}
|
||||
|
||||
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
|
||||
// while the download could take a long time with `leaky_bucket` we have nothing to release
|
||||
// once the download is done. this is because with "requests per second" rate limiting on
|
||||
// s3, there should be no meaning for the long requests.
|
||||
self.concurrency_limiter.clone().acquire_owned(1).await;
|
||||
let permit = self
|
||||
.concurrency_limiter
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 download")
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
metrics::inc_get_object();
|
||||
|
||||
@@ -224,9 +219,10 @@ impl S3Bucket {
|
||||
let metadata = object_output.metadata().cloned().map(StorageMetadata);
|
||||
Ok(Download {
|
||||
metadata,
|
||||
download_stream: Box::pin(io::BufReader::new(
|
||||
download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new(
|
||||
permit,
|
||||
object_output.body.into_async_read(),
|
||||
)),
|
||||
))),
|
||||
})
|
||||
}
|
||||
Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => {
|
||||
@@ -242,6 +238,32 @@ impl S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
/// An `AsyncRead` adapter which carries a permit for the lifetime of the value.
|
||||
struct RatelimitedAsyncRead<S> {
|
||||
permit: tokio::sync::OwnedSemaphorePermit,
|
||||
#[pin]
|
||||
inner: S,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead> RatelimitedAsyncRead<S> {
|
||||
fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self {
|
||||
RatelimitedAsyncRead { permit, inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead> AsyncRead for RatelimitedAsyncRead<S> {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let this = self.project();
|
||||
this.inner.poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RemoteStorage for S3Bucket {
|
||||
/// See the doc for `RemoteStorage::list_prefixes`
|
||||
@@ -267,7 +289,12 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
let mut continuation_token = None;
|
||||
loop {
|
||||
self.concurrency_limiter.acquire_one().await;
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 list")
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
metrics::inc_list_objects();
|
||||
|
||||
@@ -312,9 +339,11 @@ impl RemoteStorage for S3Bucket {
|
||||
to: &RemotePath,
|
||||
metadata: Option<StorageMetadata>,
|
||||
) -> anyhow::Result<()> {
|
||||
// similarly to downloads, the permit does not have live through the upload, but instead we
|
||||
// are rate limiting requests per second.
|
||||
self.concurrency_limiter.acquire_one().await;
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 upload")?;
|
||||
|
||||
metrics::inc_put_object();
|
||||
|
||||
@@ -369,7 +398,11 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
|
||||
self.concurrency_limiter.acquire_one().await;
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 delete")?;
|
||||
|
||||
metrics::inc_delete_object();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user