diff --git a/.github/ansible/prod.ap-southeast-1.hosts.yaml b/.github/ansible/prod.ap-southeast-1.hosts.yaml index 13b44f4052..8ccb67b04a 100644 --- a/.github/ansible/prod.ap-southeast-1.hosts.yaml +++ b/.github/ansible/prod.ap-southeast-1.hosts.yaml @@ -32,8 +32,6 @@ storage: hosts: safekeeper-0.ap-southeast-1.aws.neon.tech: ansible_host: i-0d6f1dc5161eef894 - safekeeper-1.ap-southeast-1.aws.neon.tech: - ansible_host: i-0e338adda8eb2d19f safekeeper-2.ap-southeast-1.aws.neon.tech: ansible_host: i-04fb63634e4679eb9 safekeeper-3.ap-southeast-1.aws.neon.tech: diff --git a/.github/workflows/deploy-dev.yml b/.github/workflows/deploy-dev.yml index 409517bf63..b080a29f7c 100644 --- a/.github/workflows/deploy-dev.yml +++ b/.github/workflows/deploy-dev.yml @@ -67,7 +67,7 @@ jobs: ./get_binaries.sh ansible-galaxy collection install sivel.toiletwater - ansible-playbook deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }} + ansible-playbook -v deploy.yaml -i staging.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_STAGING_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }} rm -f neon_install.tar.gz .neon_current_version - name: Cleanup ansible folder diff --git a/.github/workflows/deploy-prod.yml b/.github/workflows/deploy-prod.yml index 540d187274..6096ac8ab9 100644 --- a/.github/workflows/deploy-prod.yml +++ b/.github/workflows/deploy-prod.yml @@ -68,7 +68,7 @@ jobs: ./get_binaries.sh ansible-galaxy collection install sivel.toiletwater - ansible-playbook deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }} + ansible-playbook -v deploy.yaml -i prod.${{ matrix.target_region }}.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{ secrets.NEON_PRODUCTION_API_KEY }} -e SENTRY_URL_PAGESERVER=${{ secrets.SENTRY_URL_PAGESERVER }} -e SENTRY_URL_SAFEKEEPER=${{ secrets.SENTRY_URL_SAFEKEEPER }} rm -f neon_install.tar.gz .neon_current_version deploy-proxy-prod-new: diff --git a/Cargo.lock b/Cargo.lock index d154b4eaea..dab3d12263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3054,6 +3054,7 @@ dependencies = [ "hyper", "metrics", "once_cell", + "pin-project-lite", "serde", "serde_json", "tempfile", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 4382fbac32..15812e8439 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -21,7 +21,7 @@ toml_edit.workspace = true tracing.workspace = true metrics.workspace = true utils.workspace = true - +pin-project-lite.workspace = true workspace_hack.workspace = true [dev-dependencies] diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 18a2c5dedd..93f5e0596e 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -20,7 +20,10 @@ use aws_sdk_s3::{ }; use aws_smithy_http::body::SdkBody; use hyper::Body; -use tokio::{io, sync::Semaphore}; +use tokio::{ + io::{self, AsyncRead}, + sync::Semaphore, +}; use tokio_util::io::ReaderStream; use tracing::debug; @@ -102,7 +105,7 @@ pub struct S3Bucket { // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded. // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold. // The helps to ensure we don't exceed the thresholds. - concurrency_limiter: Semaphore, + concurrency_limiter: Arc, } #[derive(Default)] @@ -162,7 +165,7 @@ impl S3Bucket { client, bucket_name: aws_config.bucket_name.clone(), prefix_in_bucket, - concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()), + concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())), }) } @@ -194,9 +197,10 @@ impl S3Bucket { } async fn download_object(&self, request: GetObjectRequest) -> Result { - let _guard = self + let permit = self .concurrency_limiter - .acquire() + .clone() + .acquire_owned() .await .context("Concurrency limiter semaphore got closed during S3 download") .map_err(DownloadError::Other)?; @@ -217,9 +221,10 @@ impl S3Bucket { let metadata = object_output.metadata().cloned().map(StorageMetadata); Ok(Download { metadata, - download_stream: Box::pin(io::BufReader::new( + download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new( + permit, object_output.body.into_async_read(), - )), + ))), }) } Err(SdkError::ServiceError { @@ -240,6 +245,32 @@ impl S3Bucket { } } +pin_project_lite::pin_project! { + /// An `AsyncRead` adapter which carries a permit for the lifetime of the value. + struct RatelimitedAsyncRead { + permit: tokio::sync::OwnedSemaphorePermit, + #[pin] + inner: S, + } +} + +impl RatelimitedAsyncRead { + fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self { + RatelimitedAsyncRead { permit, inner } + } +} + +impl AsyncRead for RatelimitedAsyncRead { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.inner.poll_read(cx, buf) + } +} + #[async_trait::async_trait] impl RemoteStorage for S3Bucket { async fn list(&self) -> anyhow::Result> { diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index e9ce52d1ab..20d1d2bfb6 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -74,7 +74,7 @@ async fn compaction_loop(tenant_id: TenantId) { let period = tenant.get_compaction_period(); // TODO: we shouldn't need to await to find tenant and this could be moved outside of - // loop + // loop, #3501. There are also additional "allowed_errors" in tests. if first { first = false; if random_init_delay(period, &cancel).await.is_err() { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f2b0a98509..176eb61ff3 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1770,15 +1770,9 @@ impl Timeline { let calculation = async { let cancel = cancel.child_token(); let ctx = ctx.attached_child(); - tokio::task::spawn_blocking(move || { - // Run in a separate thread since this can do a lot of - // synchronous file IO without .await inbetween - // if there are no RemoteLayers that would require downloading. - let h = tokio::runtime::Handle::current(); - h.block_on(self_calculation.calculate_logical_size(init_lsn, cancel, &ctx)) - }) - .await - .context("Failed to spawn calculation result task")? + self_calculation + .calculate_logical_size(init_lsn, cancel, &ctx) + .await }; let timeline_state_cancellation = async { loop { @@ -1811,7 +1805,7 @@ impl Timeline { tokio::pin!(calculation); loop { tokio::select! { - res = &mut calculation => { return res } + res = &mut calculation => { return res } reason = timeline_state_cancellation => { debug!(reason = reason, "cancelling calculation"); cancel.cancel(); @@ -3745,6 +3739,7 @@ impl Timeline { remote_layer.ongoing_download.close(); } else { // Keep semaphore open. We'll drop the permit at the end of the function. + info!("on-demand download failed: {:?}", result.as_ref().unwrap_err()); } // Don't treat it as an error if the task that triggered the download diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 63196609cc..73f224039e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2080,6 +2080,8 @@ class NeonPageserver(PgProtocol): ".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock() ".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress ".*task iteration took longer than the configured period.*", + # this is until #3501 + ".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant", ] def start(