From 2b25f0dfa08ec1f6d6f73fd08481571f406c437d Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Thu, 25 May 2023 22:05:11 +0100 Subject: [PATCH 1/5] Fix flakiness of test_metric_collection (#4346) ## Problem Test `test_metric_collection` become flaky: ``` AssertionError: assert not ['2023-05-25T14:03:41.644042Z ERROR metrics_collection: failed to send metrics: reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("localhost")), port: Some(18022), path: "/billing/api/v1/usage_events", query: None, fragment: None }, source: hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" })) }', ...] ``` I suspect it is caused by having 2 places when we define `httpserver_listen_address` fixture (which is internally used by `pytest-httpserver` plugin) ## Summary of changes - Remove the definition of `httpserver_listen_address` from `test_runner/regress/test_ddl_forwarding.py` and keep one in `test_runner/fixtures/neon_fixtures.py` - Also remote unused `httpserver_listen_address` parameter from `test_proxy_metric_collection` --- test_runner/regress/test_ddl_forwarding.py | 11 +---------- test_runner/regress/test_metric_collection.py | 1 - 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/test_runner/regress/test_ddl_forwarding.py b/test_runner/regress/test_ddl_forwarding.py index 27ebd3c181..6bfa8fdbe7 100644 --- a/test_runner/regress/test_ddl_forwarding.py +++ b/test_runner/regress/test_ddl_forwarding.py @@ -4,21 +4,12 @@ from typing import Any, Dict, List, Optional, Tuple, Type import psycopg2 import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import ( - PortDistributor, - VanillaPostgres, -) +from fixtures.neon_fixtures import VanillaPostgres from pytest_httpserver import HTTPServer from werkzeug.wrappers.request import Request from werkzeug.wrappers.response import Response -@pytest.fixture(scope="session") -def httpserver_listen_address(port_distributor: PortDistributor): - port = port_distributor.get_port() - return ("localhost", port) - - def handle_db(dbs, roles, operation): if operation["op"] == "set": if "old_name" in operation and operation["old_name"] in dbs: diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index 00ea77f2e7..12e695bcbd 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -228,7 +228,6 @@ def proxy_with_metric_collector( @pytest.mark.asyncio async def test_proxy_metric_collection( httpserver: HTTPServer, - httpserver_listen_address, proxy_with_metric_collector: NeonProxy, vanilla_pg: VanillaPostgres, ): From 024109fbeb533b4574976a5899c27f56891de881 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 26 May 2023 13:35:50 +0300 Subject: [PATCH 2/5] Allow for higher s3 concurrency (#4292) We currently have a semaphore based rate limiter which we hope will keep us under S3 limits. However, the semaphore does not consider time, so I've been hesitant to raise the concurrency limit of 100. See #3698. The PR Introduces a leaky-bucket based rate limiter instead of the `tokio::sync::Semaphore` which will allow us to raise the limit later on. The configuration changes are not contained here. --- Cargo.lock | 12 ++++ libs/remote_storage/Cargo.toml | 2 + libs/remote_storage/src/lib.rs | 2 + libs/remote_storage/src/s3_bucket.rs | 85 +++++++++------------------- 4 files changed, 42 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d390df94e0..69d161d2b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2040,6 +2040,17 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leaky-bucket" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7" +dependencies = [ + "parking_lot", + "tokio", + "tracing", +] + [[package]] name = "libc" version = "0.2.144" @@ -3222,6 +3233,7 @@ dependencies = [ "aws-smithy-http", "aws-types", "hyper", + "leaky-bucket", "metrics", "once_cell", "pin-project-lite", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 0877a38dd9..5da02293a8 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -25,6 +25,8 @@ utils.workspace = true pin-project-lite.workspace = true workspace_hack.workspace = true +leaky-bucket = "1.0" + [dev-dependencies] tempfile.workspace = true test-context.workspace = true diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index e0cc3ca543..f3ae2425f6 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -37,6 +37,8 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ +/// +/// IAM ratelimit should never be observed with caching credentials provider. pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; /// No limits on the client side, which currenltly means 1000 for AWS S3. /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 0be8c72fe0..631caa6a48 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -21,10 +21,7 @@ use aws_sdk_s3::{ }; use aws_smithy_http::body::SdkBody; use hyper::Body; -use tokio::{ - io::{self, AsyncRead}, - sync::Semaphore, -}; +use tokio::io; use tokio_util::io::ReaderStream; use tracing::debug; @@ -105,9 +102,8 @@ pub struct S3Bucket { prefix_in_bucket: Option, max_keys_per_list_response: Option, // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded. - // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold. // The helps to ensure we don't exceed the thresholds. - concurrency_limiter: Arc, + concurrency_limiter: Arc, } #[derive(Default)] @@ -158,12 +154,24 @@ impl S3Bucket { } prefix }); + + let rps = aws_config.concurrency_limit.get(); + let concurrency_limiter = leaky_bucket::RateLimiter::builder() + .max(rps) + .initial(0) + // refill it by rps every second. this means the (rps+1)th request will have to wait for + // 1 second from earliest. + .refill(rps) + .interval(std::time::Duration::from_secs(1)) + .fair(true) + .build(); + Ok(Self { client, bucket_name: aws_config.bucket_name.clone(), max_keys_per_list_response: aws_config.max_keys_per_list_response, prefix_in_bucket, - concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())), + concurrency_limiter: Arc::new(concurrency_limiter), }) } @@ -195,13 +203,10 @@ impl S3Bucket { } async fn download_object(&self, request: GetObjectRequest) -> Result { - let permit = self - .concurrency_limiter - .clone() - .acquire_owned() - .await - .context("Concurrency limiter semaphore got closed during S3 download") - .map_err(DownloadError::Other)?; + // while the download could take a long time with `leaky_bucket` we have nothing to release + // once the download is done. this is because with "requests per second" rate limiting on + // s3, there should be no meaning for the long requests. + self.concurrency_limiter.clone().acquire_owned(1).await; metrics::inc_get_object(); @@ -219,10 +224,9 @@ impl S3Bucket { let metadata = object_output.metadata().cloned().map(StorageMetadata); Ok(Download { metadata, - download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new( - permit, + download_stream: Box::pin(io::BufReader::new( object_output.body.into_async_read(), - ))), + )), }) } Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { @@ -238,32 +242,6 @@ impl S3Bucket { } } -pin_project_lite::pin_project! { - /// An `AsyncRead` adapter which carries a permit for the lifetime of the value. - struct RatelimitedAsyncRead { - permit: tokio::sync::OwnedSemaphorePermit, - #[pin] - inner: S, - } -} - -impl RatelimitedAsyncRead { - fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self { - RatelimitedAsyncRead { permit, inner } - } -} - -impl AsyncRead for RatelimitedAsyncRead { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut io::ReadBuf<'_>, - ) -> std::task::Poll> { - let this = self.project(); - this.inner.poll_read(cx, buf) - } -} - #[async_trait::async_trait] impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_prefixes` @@ -289,12 +267,7 @@ impl RemoteStorage for S3Bucket { let mut continuation_token = None; loop { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 list") - .map_err(DownloadError::Other)?; + self.concurrency_limiter.acquire_one().await; metrics::inc_list_objects(); @@ -339,11 +312,9 @@ impl RemoteStorage for S3Bucket { to: &RemotePath, metadata: Option, ) -> anyhow::Result<()> { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 upload")?; + // similarly to downloads, the permit does not have live through the upload, but instead we + // are rate limiting requests per second. + self.concurrency_limiter.acquire_one().await; metrics::inc_put_object(); @@ -398,11 +369,7 @@ impl RemoteStorage for S3Bucket { } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 delete")?; + self.concurrency_limiter.acquire_one().await; metrics::inc_delete_object(); From a560b28829f25d6be033cba589c6cbbf85dc55a1 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 26 May 2023 16:19:36 +0300 Subject: [PATCH 3/5] Make new tenant/timeline IDs mandatory in create APIs. (#4304) We used to generate the ID, if the caller didn't specify it. That's bad practice, however, because network is never fully reliable, so it's possible we create a new tenant but the caller doesn't know about it, and because it doesn't know the tenant ID, it has no way of retrying or checking if it succeeded. To discourage that, make it mandatory. The web control plane has not relied on the auto-generation for a long time. --- control_plane/src/pageserver.rs | 7 ++++++ .../compute_wrapper/shell/compute.sh | 20 +++++++++++----- libs/pageserver_api/src/models.rs | 16 ++++++------- pageserver/src/http/openapi_spec.yml | 4 ++++ pageserver/src/http/routes.rs | 17 ++++---------- test_runner/fixtures/pageserver/http.py | 8 +++---- test_runner/regress/test_auth.py | 19 ++++++++------- test_runner/regress/test_tenants.py | 23 ++++++++++--------- 8 files changed, 62 insertions(+), 52 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 6309494b71..149cfd00cb 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -370,6 +370,10 @@ impl PageServerNode { .remove("evictions_low_residence_duration_metric_threshold") .map(|x| x.to_string()), }; + + // If tenant ID was not specified, generate one + let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate()); + let request = models::TenantCreateRequest { new_tenant_id, config, @@ -495,6 +499,9 @@ impl PageServerNode { ancestor_timeline_id: Option, pg_version: Option, ) -> anyhow::Result { + // If timeline ID was not specified, generate one + let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate()); + self.http_request( Method::POST, format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id), diff --git a/docker-compose/compute_wrapper/shell/compute.sh b/docker-compose/compute_wrapper/shell/compute.sh index cef2b485f3..22660a63ce 100755 --- a/docker-compose/compute_wrapper/shell/compute.sh +++ b/docker-compose/compute_wrapper/shell/compute.sh @@ -1,6 +1,14 @@ #!/bin/bash set -eux +# Generate a random tenant or timeline ID +# +# Takes a variable name as argument. The result is stored in that variable. +generate_id() { + local -n resvar=$1 + printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM +} + PG_VERSION=${PG_VERSION:-14} SPEC_FILE_ORG=/var/db/postgres/specs/spec.json @@ -13,29 +21,29 @@ done echo "Page server is ready." echo "Create a tenant and timeline" +generate_id tenant_id PARAMS=( -sb -X POST -H "Content-Type: application/json" - -d "{}" + -d "{\"new_tenant_id\": \"${tenant_id}\"}" http://pageserver:9898/v1/tenant/ ) -tenant_id=$(curl "${PARAMS[@]}" | sed 's/"//g') +result=$(curl "${PARAMS[@]}") +echo $result | jq . +generate_id timeline_id PARAMS=( -sb -X POST -H "Content-Type: application/json" - -d "{\"tenant_id\":\"${tenant_id}\", \"pg_version\": ${PG_VERSION}}" + -d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}" "http://pageserver:9898/v1/tenant/${tenant_id}/timeline/" ) result=$(curl "${PARAMS[@]}") echo $result | jq . echo "Overwrite tenant id and timeline id in spec file" -tenant_id=$(echo ${result} | jq -r .tenant_id) -timeline_id=$(echo ${result} | jq -r .timeline_id) - sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE} sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE} diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 3927ba3dad..540633d113 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -118,9 +118,8 @@ pub enum TimelineState { #[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { - #[serde(default)] - #[serde_as(as = "Option")] - pub new_timeline_id: Option, + #[serde_as(as = "DisplayFromStr")] + pub new_timeline_id: TimelineId, #[serde(default)] #[serde_as(as = "Option")] pub ancestor_timeline_id: Option, @@ -131,12 +130,11 @@ pub struct TimelineCreateRequest { } #[serde_as] -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TenantCreateRequest { - #[serde(default)] - #[serde_as(as = "Option")] - pub new_tenant_id: Option, + #[serde_as(as = "DisplayFromStr")] + pub new_tenant_id: TenantId, #[serde(flatten)] pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it } @@ -184,10 +182,10 @@ pub struct StatusResponse { } impl TenantCreateRequest { - pub fn new(new_tenant_id: Option) -> TenantCreateRequest { + pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest { TenantCreateRequest { new_tenant_id, - ..Default::default() + config: TenantConfig::default(), } } } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index e23d3f3a20..0d912c95e0 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -678,6 +678,8 @@ paths: application/json: schema: type: object + required: + - new_timeline_id properties: new_timeline_id: type: string @@ -936,6 +938,8 @@ components: allOf: - $ref: '#/components/schemas/TenantConfig' - type: object + required: + - new_tenant_id properties: new_tenant_id: type: string diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 25e0d88e70..30c219f773 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -301,9 +301,7 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result Err(ApiError::InternalServerError(err)), } } - .instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version)) + .instrument(info_span!("timeline_create", tenant = %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version)) .await } @@ -764,6 +762,8 @@ pub fn html_response(status: StatusCode, data: String) -> Result, } async fn tenant_create_handler(mut request: Request) -> Result, ApiError> { + let request_data: TenantCreateRequest = json_request(&mut request).await?; + let target_tenant_id = request_data.new_tenant_id; check_permission(&request, None)?; let _timer = STORAGE_TIME_GLOBAL @@ -771,17 +771,10 @@ async fn tenant_create_handler(mut request: Request) -> Result TenantId: if conf is not None: assert "new_tenant_id" not in conf.keys() res = self.post( f"http://localhost:{self.port}/v1/tenant", json={ - "new_tenant_id": str(new_tenant_id) if new_tenant_id else None, + "new_tenant_id": str(new_tenant_id), **(conf or {}), }, ) @@ -293,13 +293,13 @@ class PageserverHttpClient(requests.Session): self, pg_version: PgVersion, tenant_id: TenantId, - new_timeline_id: Optional[TimelineId] = None, + new_timeline_id: TimelineId, ancestor_timeline_id: Optional[TimelineId] = None, ancestor_start_lsn: Optional[Lsn] = None, **kwargs, ) -> Dict[Any, Any]: body: Dict[str, Any] = { - "new_timeline_id": str(new_timeline_id) if new_timeline_id else None, + "new_timeline_id": str(new_timeline_id), "ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None, "ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None, } diff --git a/test_runner/regress/test_auth.py b/test_runner/regress/test_auth.py index 3e4a0bfbbb..fb79748832 100644 --- a/test_runner/regress/test_auth.py +++ b/test_runner/regress/test_auth.py @@ -3,7 +3,7 @@ from contextlib import closing import pytest from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol from fixtures.pageserver.http import PageserverApiException -from fixtures.types import TenantId +from fixtures.types import TenantId, TimelineId def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): @@ -25,21 +25,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): ps.safe_psql("set FOO", password=tenant_token) ps.safe_psql("set FOO", password=pageserver_token) - new_timeline_id = env.neon_cli.create_branch( - "test_pageserver_auth", tenant_id=env.initial_tenant - ) - # tenant can create branches tenant_http_client.timeline_create( pg_version=env.pg_version, tenant_id=env.initial_tenant, - ancestor_timeline_id=new_timeline_id, + new_timeline_id=TimelineId.generate(), + ancestor_timeline_id=env.initial_timeline, ) # console can create branches for tenant pageserver_http_client.timeline_create( pg_version=env.pg_version, tenant_id=env.initial_tenant, - ancestor_timeline_id=new_timeline_id, + new_timeline_id=TimelineId.generate(), + ancestor_timeline_id=env.initial_timeline, ) # fail to create branch using token with different tenant_id @@ -49,18 +47,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): invalid_tenant_http_client.timeline_create( pg_version=env.pg_version, tenant_id=env.initial_tenant, - ancestor_timeline_id=new_timeline_id, + new_timeline_id=TimelineId.generate(), + ancestor_timeline_id=env.initial_timeline, ) # create tenant using management token - pageserver_http_client.tenant_create() + pageserver_http_client.tenant_create(TenantId.generate()) # fail to create tenant using tenant token with pytest.raises( PageserverApiException, match="Forbidden: Attempt to access management api with tenant scope. Permission denied", ): - tenant_http_client.tenant_create() + tenant_http_client.tenant_create(TenantId.generate()) def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder): diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 5642449ce6..6599fa7ba5 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -314,21 +314,22 @@ def test_pageserver_with_empty_tenants( client = env.pageserver.http_client() - tenant_with_empty_timelines_dir = client.tenant_create() - temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir) + tenant_with_empty_timelines = TenantId.generate() + client.tenant_create(tenant_with_empty_timelines) + temp_timelines = client.timeline_list(tenant_with_empty_timelines) for temp_timeline in temp_timelines: client.timeline_delete( - tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"]) + tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) ) files_in_timelines_dir = sum( 1 for _p in Path.iterdir( - Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines" + Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines" ) ) assert ( files_in_timelines_dir == 0 - ), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory" + ), f"Tenant {tenant_with_empty_timelines} should have an empty timelines/ directory" # Trigger timeline re-initialization after pageserver restart env.endpoints.stop_all() @@ -356,15 +357,15 @@ def test_pageserver_with_empty_tenants( assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*") - [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)] + [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)] assert ( loaded_tenant["state"]["slug"] == "Active" - ), "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation" + ), "Tenant {tenant_with_empty_timelines} with empty timelines dir should be active and ready for timeline creation" - loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines_dir) + loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines) assert ( loaded_tenant_status["state"]["slug"] == "Active" - ), f"Tenant {tenant_with_empty_timelines_dir} without timelines dir should be active" + ), f"Tenant {tenant_with_empty_timelines} without timelines dir should be active" time.sleep(1) # to allow metrics propagation @@ -374,7 +375,7 @@ def test_pageserver_with_empty_tenants( "state": "Broken", } active_tenants_metric_filter = { - "tenant_id": str(tenant_with_empty_timelines_dir), + "tenant_id": str(tenant_with_empty_timelines), "state": "Active", } @@ -386,7 +387,7 @@ def test_pageserver_with_empty_tenants( assert ( tenant_active_count == 1 - ), f"Tenant {tenant_with_empty_timelines_dir} should have metric as active" + ), f"Tenant {tenant_with_empty_timelines} should have metric as active" tenant_broken_count = int( ps_metrics.query_one( From 339a3e314609ed00e675b455d0fdb98e908394d2 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 26 May 2023 14:49:42 +0100 Subject: [PATCH 4/5] GitHub Autocomment: comment commits for branches (#4335) ## Problem GitHub Autocomment script posts a comment only for PRs. It's harder to debug failed tests on main or release branches. ## Summary of changes - Change the GitHub Autocomment script to be able to post a comment to either a PR or a commit of a branch --- .github/workflows/build_and_test.yml | 6 +-- ...-test-report.js => comment-test-report.js} | 37 +++++++++++++++---- 2 files changed, 31 insertions(+), 12 deletions(-) rename scripts/{pr-comment-test-report.js => comment-test-report.js} (85%) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 6d89ce9994..336dea04eb 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -407,9 +407,7 @@ jobs: uses: ./.github/actions/allure-report-generate - uses: actions/github-script@v6 - if: > - !cancelled() && - github.event_name == 'pull_request' + if: ${{ !cancelled() }} with: # Retry script for 5XX server errors: https://github.com/actions/github-script#retries retries: 5 @@ -419,7 +417,7 @@ jobs: reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}", } - const script = require("./scripts/pr-comment-test-report.js") + const script = require("./scripts/comment-test-report.js") await script({ github, context, diff --git a/scripts/pr-comment-test-report.js b/scripts/comment-test-report.js similarity index 85% rename from scripts/pr-comment-test-report.js rename to scripts/comment-test-report.js index 3a7bba0daa..a7fd5b0bef 100644 --- a/scripts/pr-comment-test-report.js +++ b/scripts/comment-test-report.js @@ -1,5 +1,5 @@ // -// The script parses Allure reports and posts a comment with a summary of the test results to the PR. +// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch. // // The comment is updated on each run with the latest results. // @@ -7,7 +7,7 @@ // - uses: actions/github-script@v6 // with: // script: | -// const script = require("./scripts/pr-comment-test-report.js") +// const script = require("./scripts/comment-test-report.js") // await script({ // github, // context, @@ -35,8 +35,12 @@ class DefaultMap extends Map { module.exports = async ({ github, context, fetch, report }) => { // Marker to find the comment in the subsequent runs const startMarker = `` + // If we run the script in the PR or in the branch (main/release/...) + const isPullRequest = !!context.payload.pull_request + // Latest commit in PR or in the branch + const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha // Let users know that the comment is updated automatically - const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:
` + const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${commitSha} at ${new Date().toISOString()} :recycle:
` // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) const githubActionsBotId = 41898282 // Commend body itself @@ -166,22 +170,39 @@ module.exports = async ({ github, context, fetch, report }) => { commentBody += autoupdateNotice - const { data: comments } = await github.rest.issues.listComments({ - issue_number: context.payload.number, + let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha + if (isPullRequest) { + createCommentFn = github.rest.issues.createComment + listCommentsFn = github.rest.issues.listComments + updateCommentFn = github.rest.issues.updateComment + issueNumberOrSha = { + issue_number: context.payload.number, + } + } else { + updateCommentFn = github.rest.repos.updateCommitComment + listCommentsFn = github.rest.repos.listCommentsForCommit + createCommentFn = github.rest.repos.createCommitComment + issueNumberOrSha = { + commit_sha: commitSha, + } + } + + const { data: comments } = await listCommentsFn({ + ...issueNumberOrSha, ...ownerRepoParams, }) const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker)) if (comment) { - await github.rest.issues.updateComment({ + await updateCommentFn({ comment_id: comment.id, body: commentBody, ...ownerRepoParams, }) } else { - await github.rest.issues.createComment({ - issue_number: context.payload.number, + await createCommentFn({ body: commentBody, + ...issueNumberOrSha, ...ownerRepoParams, }) } From be177f82dc5c9aa8166a3fdfbc03dbd8105d0c59 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 26 May 2023 18:37:17 +0300 Subject: [PATCH 5/5] Revert "Allow for higher s3 concurrency (#4292)" (#4356) This reverts commit 024109fbeb533b4574976a5899c27f56891de881 for it failing to be speed up anything, but run into more errors. See: #3698. --- Cargo.lock | 12 ---- libs/remote_storage/Cargo.toml | 2 - libs/remote_storage/src/lib.rs | 2 - libs/remote_storage/src/s3_bucket.rs | 85 +++++++++++++++++++--------- 4 files changed, 59 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69d161d2b1..d390df94e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2040,17 +2040,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "leaky-bucket" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7" -dependencies = [ - "parking_lot", - "tokio", - "tracing", -] - [[package]] name = "libc" version = "0.2.144" @@ -3233,7 +3222,6 @@ dependencies = [ "aws-smithy-http", "aws-types", "hyper", - "leaky-bucket", "metrics", "once_cell", "pin-project-lite", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 5da02293a8..0877a38dd9 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -25,8 +25,6 @@ utils.workspace = true pin-project-lite.workspace = true workspace_hack.workspace = true -leaky-bucket = "1.0" - [dev-dependencies] tempfile.workspace = true test-context.workspace = true diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index f3ae2425f6..e0cc3ca543 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -37,8 +37,6 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ -/// -/// IAM ratelimit should never be observed with caching credentials provider. pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; /// No limits on the client side, which currenltly means 1000 for AWS S3. /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 631caa6a48..0be8c72fe0 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -21,7 +21,10 @@ use aws_sdk_s3::{ }; use aws_smithy_http::body::SdkBody; use hyper::Body; -use tokio::io; +use tokio::{ + io::{self, AsyncRead}, + sync::Semaphore, +}; use tokio_util::io::ReaderStream; use tracing::debug; @@ -102,8 +105,9 @@ pub struct S3Bucket { prefix_in_bucket: Option, max_keys_per_list_response: Option, // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded. + // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold. // The helps to ensure we don't exceed the thresholds. - concurrency_limiter: Arc, + concurrency_limiter: Arc, } #[derive(Default)] @@ -154,24 +158,12 @@ impl S3Bucket { } prefix }); - - let rps = aws_config.concurrency_limit.get(); - let concurrency_limiter = leaky_bucket::RateLimiter::builder() - .max(rps) - .initial(0) - // refill it by rps every second. this means the (rps+1)th request will have to wait for - // 1 second from earliest. - .refill(rps) - .interval(std::time::Duration::from_secs(1)) - .fair(true) - .build(); - Ok(Self { client, bucket_name: aws_config.bucket_name.clone(), max_keys_per_list_response: aws_config.max_keys_per_list_response, prefix_in_bucket, - concurrency_limiter: Arc::new(concurrency_limiter), + concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())), }) } @@ -203,10 +195,13 @@ impl S3Bucket { } async fn download_object(&self, request: GetObjectRequest) -> Result { - // while the download could take a long time with `leaky_bucket` we have nothing to release - // once the download is done. this is because with "requests per second" rate limiting on - // s3, there should be no meaning for the long requests. - self.concurrency_limiter.clone().acquire_owned(1).await; + let permit = self + .concurrency_limiter + .clone() + .acquire_owned() + .await + .context("Concurrency limiter semaphore got closed during S3 download") + .map_err(DownloadError::Other)?; metrics::inc_get_object(); @@ -224,9 +219,10 @@ impl S3Bucket { let metadata = object_output.metadata().cloned().map(StorageMetadata); Ok(Download { metadata, - download_stream: Box::pin(io::BufReader::new( + download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new( + permit, object_output.body.into_async_read(), - )), + ))), }) } Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { @@ -242,6 +238,32 @@ impl S3Bucket { } } +pin_project_lite::pin_project! { + /// An `AsyncRead` adapter which carries a permit for the lifetime of the value. + struct RatelimitedAsyncRead { + permit: tokio::sync::OwnedSemaphorePermit, + #[pin] + inner: S, + } +} + +impl RatelimitedAsyncRead { + fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self { + RatelimitedAsyncRead { permit, inner } + } +} + +impl AsyncRead for RatelimitedAsyncRead { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.inner.poll_read(cx, buf) + } +} + #[async_trait::async_trait] impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_prefixes` @@ -267,7 +289,12 @@ impl RemoteStorage for S3Bucket { let mut continuation_token = None; loop { - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 list") + .map_err(DownloadError::Other)?; metrics::inc_list_objects(); @@ -312,9 +339,11 @@ impl RemoteStorage for S3Bucket { to: &RemotePath, metadata: Option, ) -> anyhow::Result<()> { - // similarly to downloads, the permit does not have live through the upload, but instead we - // are rate limiting requests per second. - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 upload")?; metrics::inc_put_object(); @@ -369,7 +398,11 @@ impl RemoteStorage for S3Bucket { } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 delete")?; metrics::inc_delete_object();