From 2b25f0dfa08ec1f6d6f73fd08481571f406c437d Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Thu, 25 May 2023 22:05:11 +0100 Subject: [PATCH 01/10] Fix flakiness of test_metric_collection (#4346) ## Problem Test `test_metric_collection` become flaky: ``` AssertionError: assert not ['2023-05-25T14:03:41.644042Z ERROR metrics_collection: failed to send metrics: reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Domain("localhost")), port: Some(18022), path: "/billing/api/v1/usage_events", query: None, fragment: None }, source: hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" })) }', ...] ``` I suspect it is caused by having 2 places when we define `httpserver_listen_address` fixture (which is internally used by `pytest-httpserver` plugin) ## Summary of changes - Remove the definition of `httpserver_listen_address` from `test_runner/regress/test_ddl_forwarding.py` and keep one in `test_runner/fixtures/neon_fixtures.py` - Also remote unused `httpserver_listen_address` parameter from `test_proxy_metric_collection` --- test_runner/regress/test_ddl_forwarding.py | 11 +---------- test_runner/regress/test_metric_collection.py | 1 - 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/test_runner/regress/test_ddl_forwarding.py b/test_runner/regress/test_ddl_forwarding.py index 27ebd3c181..6bfa8fdbe7 100644 --- a/test_runner/regress/test_ddl_forwarding.py +++ b/test_runner/regress/test_ddl_forwarding.py @@ -4,21 +4,12 @@ from typing import Any, Dict, List, Optional, Tuple, Type import psycopg2 import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import ( - PortDistributor, - VanillaPostgres, -) +from fixtures.neon_fixtures import VanillaPostgres from pytest_httpserver import HTTPServer from werkzeug.wrappers.request import Request from werkzeug.wrappers.response import Response -@pytest.fixture(scope="session") -def httpserver_listen_address(port_distributor: PortDistributor): - port = port_distributor.get_port() - return ("localhost", port) - - def handle_db(dbs, roles, operation): if operation["op"] == "set": if "old_name" in operation and operation["old_name"] in dbs: diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index 00ea77f2e7..12e695bcbd 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -228,7 +228,6 @@ def proxy_with_metric_collector( @pytest.mark.asyncio async def test_proxy_metric_collection( httpserver: HTTPServer, - httpserver_listen_address, proxy_with_metric_collector: NeonProxy, vanilla_pg: VanillaPostgres, ): From 1367e2b0eebc08a9add60f087a5024014ab4d7a5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 26 May 2023 09:31:44 +0200 Subject: [PATCH 02/10] improve TenantState doc comments, repeating what's in the Mermaid diagram --- libs/pageserver_api/src/models.rs | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 5d1ac5ee15..e6130e0595 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -55,19 +55,38 @@ use bytes::{BufMut, Bytes, BytesMut}; )] #[serde(tag = "slug", content = "data")] pub enum TenantState { - /// This tenant is being loaded from local disk + /// This tenant is being loaded from local disk. + /// + /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. Loading, /// This tenant is being attached to the pageserver. + /// + /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. Attaching, /// The tenant is transitioning from Loading/Attaching to Active. + /// + /// While in this state, the individual timelines are being activated. + /// + /// `set_stopping()` and `set_broken()` do not work in this state and wait for it to pass. Activating, /// The tenant has finished activating and is open for business. + /// + /// Transitions out of this state are possible through `set_stopping()` and `set_broken()`. Active, - /// A tenant is recognized by pageserver, but it is being detached or the + /// The tenant is recognized by pageserver, but it is being detached or the /// system is being shut down. + /// + /// Transitions out of this state are possible through `set_broken()`. Stopping, - /// A tenant is recognized by the pageserver, but can no longer be used for - /// any operations, because it failed to be activated. + /// The tenant is recognized by the pageserver, but can no longer be used for + /// any operations. + /// + /// If the tenant fails to load or attach, it will transition to this state + /// and it is guaranteed that no background tasks are running in its name. + /// + /// The other way to transition into this state is from `Stopping` state + /// through `set_broken()` called from `remove_tenant_from_memory()`. That happens + /// if the cleanup future executed by `remove_tenant_from_memory()` fails. Broken { reason: String, backtrace: String }, } From b09beaa4fe3fca71dbeace2845e60d987229f643 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 26 May 2023 09:34:12 +0200 Subject: [PATCH 03/10] log while waiting for tenant to finish activation --- pageserver/src/tenant.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a3a1cb3ca5..f03443b6a7 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1716,7 +1716,13 @@ impl Tenant { // cannot stop before we're done activating, so wait out until we're done activating rx.wait_for(|state| match state { - TenantState::Activating | TenantState::Loading | TenantState::Attaching => false, // TODO log that we're waiting + TenantState::Activating | TenantState::Loading | TenantState::Attaching => { + info!( + "waiting for {} to turn Active|Broken|Stopping", + <&'static str>::from(state) + ); + false + } TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true, }) .await @@ -1770,7 +1776,13 @@ impl Tenant { // The load & attach routines own the tenant state until it has reached `Active`. // So, wait until it's done. rx.wait_for(|state| match state { - TenantState::Activating | TenantState::Loading | TenantState::Attaching => false, // TODO log that we're waiting + TenantState::Activating | TenantState::Loading | TenantState::Attaching => { + info!( + "waiting for {} to turn Active|Broken|Stopping", + <&'static str>::from(state) + ); + false + } TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping {} => true, }) .await From 024109fbeb533b4574976a5899c27f56891de881 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 26 May 2023 13:35:50 +0300 Subject: [PATCH 04/10] Allow for higher s3 concurrency (#4292) We currently have a semaphore based rate limiter which we hope will keep us under S3 limits. However, the semaphore does not consider time, so I've been hesitant to raise the concurrency limit of 100. See #3698. The PR Introduces a leaky-bucket based rate limiter instead of the `tokio::sync::Semaphore` which will allow us to raise the limit later on. The configuration changes are not contained here. --- Cargo.lock | 12 ++++ libs/remote_storage/Cargo.toml | 2 + libs/remote_storage/src/lib.rs | 2 + libs/remote_storage/src/s3_bucket.rs | 85 +++++++++------------------- 4 files changed, 42 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d390df94e0..69d161d2b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2040,6 +2040,17 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leaky-bucket" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7" +dependencies = [ + "parking_lot", + "tokio", + "tracing", +] + [[package]] name = "libc" version = "0.2.144" @@ -3222,6 +3233,7 @@ dependencies = [ "aws-smithy-http", "aws-types", "hyper", + "leaky-bucket", "metrics", "once_cell", "pin-project-lite", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 0877a38dd9..5da02293a8 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -25,6 +25,8 @@ utils.workspace = true pin-project-lite.workspace = true workspace_hack.workspace = true +leaky-bucket = "1.0" + [dev-dependencies] tempfile.workspace = true test-context.workspace = true diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index e0cc3ca543..f3ae2425f6 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -37,6 +37,8 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ +/// +/// IAM ratelimit should never be observed with caching credentials provider. pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; /// No limits on the client side, which currenltly means 1000 for AWS S3. /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 0be8c72fe0..631caa6a48 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -21,10 +21,7 @@ use aws_sdk_s3::{ }; use aws_smithy_http::body::SdkBody; use hyper::Body; -use tokio::{ - io::{self, AsyncRead}, - sync::Semaphore, -}; +use tokio::io; use tokio_util::io::ReaderStream; use tracing::debug; @@ -105,9 +102,8 @@ pub struct S3Bucket { prefix_in_bucket: Option, max_keys_per_list_response: Option, // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded. - // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold. // The helps to ensure we don't exceed the thresholds. - concurrency_limiter: Arc, + concurrency_limiter: Arc, } #[derive(Default)] @@ -158,12 +154,24 @@ impl S3Bucket { } prefix }); + + let rps = aws_config.concurrency_limit.get(); + let concurrency_limiter = leaky_bucket::RateLimiter::builder() + .max(rps) + .initial(0) + // refill it by rps every second. this means the (rps+1)th request will have to wait for + // 1 second from earliest. + .refill(rps) + .interval(std::time::Duration::from_secs(1)) + .fair(true) + .build(); + Ok(Self { client, bucket_name: aws_config.bucket_name.clone(), max_keys_per_list_response: aws_config.max_keys_per_list_response, prefix_in_bucket, - concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())), + concurrency_limiter: Arc::new(concurrency_limiter), }) } @@ -195,13 +203,10 @@ impl S3Bucket { } async fn download_object(&self, request: GetObjectRequest) -> Result { - let permit = self - .concurrency_limiter - .clone() - .acquire_owned() - .await - .context("Concurrency limiter semaphore got closed during S3 download") - .map_err(DownloadError::Other)?; + // while the download could take a long time with `leaky_bucket` we have nothing to release + // once the download is done. this is because with "requests per second" rate limiting on + // s3, there should be no meaning for the long requests. + self.concurrency_limiter.clone().acquire_owned(1).await; metrics::inc_get_object(); @@ -219,10 +224,9 @@ impl S3Bucket { let metadata = object_output.metadata().cloned().map(StorageMetadata); Ok(Download { metadata, - download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new( - permit, + download_stream: Box::pin(io::BufReader::new( object_output.body.into_async_read(), - ))), + )), }) } Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { @@ -238,32 +242,6 @@ impl S3Bucket { } } -pin_project_lite::pin_project! { - /// An `AsyncRead` adapter which carries a permit for the lifetime of the value. - struct RatelimitedAsyncRead { - permit: tokio::sync::OwnedSemaphorePermit, - #[pin] - inner: S, - } -} - -impl RatelimitedAsyncRead { - fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self { - RatelimitedAsyncRead { permit, inner } - } -} - -impl AsyncRead for RatelimitedAsyncRead { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut io::ReadBuf<'_>, - ) -> std::task::Poll> { - let this = self.project(); - this.inner.poll_read(cx, buf) - } -} - #[async_trait::async_trait] impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_prefixes` @@ -289,12 +267,7 @@ impl RemoteStorage for S3Bucket { let mut continuation_token = None; loop { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 list") - .map_err(DownloadError::Other)?; + self.concurrency_limiter.acquire_one().await; metrics::inc_list_objects(); @@ -339,11 +312,9 @@ impl RemoteStorage for S3Bucket { to: &RemotePath, metadata: Option, ) -> anyhow::Result<()> { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 upload")?; + // similarly to downloads, the permit does not have live through the upload, but instead we + // are rate limiting requests per second. + self.concurrency_limiter.acquire_one().await; metrics::inc_put_object(); @@ -398,11 +369,7 @@ impl RemoteStorage for S3Bucket { } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 delete")?; + self.concurrency_limiter.acquire_one().await; metrics::inc_delete_object(); From a560b28829f25d6be033cba589c6cbbf85dc55a1 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 26 May 2023 16:19:36 +0300 Subject: [PATCH 05/10] Make new tenant/timeline IDs mandatory in create APIs. (#4304) We used to generate the ID, if the caller didn't specify it. That's bad practice, however, because network is never fully reliable, so it's possible we create a new tenant but the caller doesn't know about it, and because it doesn't know the tenant ID, it has no way of retrying or checking if it succeeded. To discourage that, make it mandatory. The web control plane has not relied on the auto-generation for a long time. --- control_plane/src/pageserver.rs | 7 ++++++ .../compute_wrapper/shell/compute.sh | 20 +++++++++++----- libs/pageserver_api/src/models.rs | 16 ++++++------- pageserver/src/http/openapi_spec.yml | 4 ++++ pageserver/src/http/routes.rs | 17 ++++---------- test_runner/fixtures/pageserver/http.py | 8 +++---- test_runner/regress/test_auth.py | 19 ++++++++------- test_runner/regress/test_tenants.py | 23 ++++++++++--------- 8 files changed, 62 insertions(+), 52 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 6309494b71..149cfd00cb 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -370,6 +370,10 @@ impl PageServerNode { .remove("evictions_low_residence_duration_metric_threshold") .map(|x| x.to_string()), }; + + // If tenant ID was not specified, generate one + let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate()); + let request = models::TenantCreateRequest { new_tenant_id, config, @@ -495,6 +499,9 @@ impl PageServerNode { ancestor_timeline_id: Option, pg_version: Option, ) -> anyhow::Result { + // If timeline ID was not specified, generate one + let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate()); + self.http_request( Method::POST, format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id), diff --git a/docker-compose/compute_wrapper/shell/compute.sh b/docker-compose/compute_wrapper/shell/compute.sh index cef2b485f3..22660a63ce 100755 --- a/docker-compose/compute_wrapper/shell/compute.sh +++ b/docker-compose/compute_wrapper/shell/compute.sh @@ -1,6 +1,14 @@ #!/bin/bash set -eux +# Generate a random tenant or timeline ID +# +# Takes a variable name as argument. The result is stored in that variable. +generate_id() { + local -n resvar=$1 + printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM +} + PG_VERSION=${PG_VERSION:-14} SPEC_FILE_ORG=/var/db/postgres/specs/spec.json @@ -13,29 +21,29 @@ done echo "Page server is ready." echo "Create a tenant and timeline" +generate_id tenant_id PARAMS=( -sb -X POST -H "Content-Type: application/json" - -d "{}" + -d "{\"new_tenant_id\": \"${tenant_id}\"}" http://pageserver:9898/v1/tenant/ ) -tenant_id=$(curl "${PARAMS[@]}" | sed 's/"//g') +result=$(curl "${PARAMS[@]}") +echo $result | jq . +generate_id timeline_id PARAMS=( -sb -X POST -H "Content-Type: application/json" - -d "{\"tenant_id\":\"${tenant_id}\", \"pg_version\": ${PG_VERSION}}" + -d "{\"new_timeline_id\": \"${timeline_id}\", \"pg_version\": ${PG_VERSION}}" "http://pageserver:9898/v1/tenant/${tenant_id}/timeline/" ) result=$(curl "${PARAMS[@]}") echo $result | jq . echo "Overwrite tenant id and timeline id in spec file" -tenant_id=$(echo ${result} | jq -r .tenant_id) -timeline_id=$(echo ${result} | jq -r .timeline_id) - sed "s/TENANT_ID/${tenant_id}/" ${SPEC_FILE_ORG} > ${SPEC_FILE} sed -i "s/TIMELINE_ID/${timeline_id}/" ${SPEC_FILE} diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 3927ba3dad..540633d113 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -118,9 +118,8 @@ pub enum TimelineState { #[serde_as] #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { - #[serde(default)] - #[serde_as(as = "Option")] - pub new_timeline_id: Option, + #[serde_as(as = "DisplayFromStr")] + pub new_timeline_id: TimelineId, #[serde(default)] #[serde_as(as = "Option")] pub ancestor_timeline_id: Option, @@ -131,12 +130,11 @@ pub struct TimelineCreateRequest { } #[serde_as] -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TenantCreateRequest { - #[serde(default)] - #[serde_as(as = "Option")] - pub new_tenant_id: Option, + #[serde_as(as = "DisplayFromStr")] + pub new_tenant_id: TenantId, #[serde(flatten)] pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it } @@ -184,10 +182,10 @@ pub struct StatusResponse { } impl TenantCreateRequest { - pub fn new(new_tenant_id: Option) -> TenantCreateRequest { + pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest { TenantCreateRequest { new_tenant_id, - ..Default::default() + config: TenantConfig::default(), } } } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index e23d3f3a20..0d912c95e0 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -678,6 +678,8 @@ paths: application/json: schema: type: object + required: + - new_timeline_id properties: new_timeline_id: type: string @@ -936,6 +938,8 @@ components: allOf: - $ref: '#/components/schemas/TenantConfig' - type: object + required: + - new_tenant_id properties: new_tenant_id: type: string diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 25e0d88e70..30c219f773 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -301,9 +301,7 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result Err(ApiError::InternalServerError(err)), } } - .instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version)) + .instrument(info_span!("timeline_create", tenant = %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version)) .await } @@ -764,6 +762,8 @@ pub fn html_response(status: StatusCode, data: String) -> Result, } async fn tenant_create_handler(mut request: Request) -> Result, ApiError> { + let request_data: TenantCreateRequest = json_request(&mut request).await?; + let target_tenant_id = request_data.new_tenant_id; check_permission(&request, None)?; let _timer = STORAGE_TIME_GLOBAL @@ -771,17 +771,10 @@ async fn tenant_create_handler(mut request: Request) -> Result TenantId: if conf is not None: assert "new_tenant_id" not in conf.keys() res = self.post( f"http://localhost:{self.port}/v1/tenant", json={ - "new_tenant_id": str(new_tenant_id) if new_tenant_id else None, + "new_tenant_id": str(new_tenant_id), **(conf or {}), }, ) @@ -293,13 +293,13 @@ class PageserverHttpClient(requests.Session): self, pg_version: PgVersion, tenant_id: TenantId, - new_timeline_id: Optional[TimelineId] = None, + new_timeline_id: TimelineId, ancestor_timeline_id: Optional[TimelineId] = None, ancestor_start_lsn: Optional[Lsn] = None, **kwargs, ) -> Dict[Any, Any]: body: Dict[str, Any] = { - "new_timeline_id": str(new_timeline_id) if new_timeline_id else None, + "new_timeline_id": str(new_timeline_id), "ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None, "ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None, } diff --git a/test_runner/regress/test_auth.py b/test_runner/regress/test_auth.py index 3e4a0bfbbb..fb79748832 100644 --- a/test_runner/regress/test_auth.py +++ b/test_runner/regress/test_auth.py @@ -3,7 +3,7 @@ from contextlib import closing import pytest from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol from fixtures.pageserver.http import PageserverApiException -from fixtures.types import TenantId +from fixtures.types import TenantId, TimelineId def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): @@ -25,21 +25,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): ps.safe_psql("set FOO", password=tenant_token) ps.safe_psql("set FOO", password=pageserver_token) - new_timeline_id = env.neon_cli.create_branch( - "test_pageserver_auth", tenant_id=env.initial_tenant - ) - # tenant can create branches tenant_http_client.timeline_create( pg_version=env.pg_version, tenant_id=env.initial_tenant, - ancestor_timeline_id=new_timeline_id, + new_timeline_id=TimelineId.generate(), + ancestor_timeline_id=env.initial_timeline, ) # console can create branches for tenant pageserver_http_client.timeline_create( pg_version=env.pg_version, tenant_id=env.initial_tenant, - ancestor_timeline_id=new_timeline_id, + new_timeline_id=TimelineId.generate(), + ancestor_timeline_id=env.initial_timeline, ) # fail to create branch using token with different tenant_id @@ -49,18 +47,19 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder): invalid_tenant_http_client.timeline_create( pg_version=env.pg_version, tenant_id=env.initial_tenant, - ancestor_timeline_id=new_timeline_id, + new_timeline_id=TimelineId.generate(), + ancestor_timeline_id=env.initial_timeline, ) # create tenant using management token - pageserver_http_client.tenant_create() + pageserver_http_client.tenant_create(TenantId.generate()) # fail to create tenant using tenant token with pytest.raises( PageserverApiException, match="Forbidden: Attempt to access management api with tenant scope. Permission denied", ): - tenant_http_client.tenant_create() + tenant_http_client.tenant_create(TenantId.generate()) def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder): diff --git a/test_runner/regress/test_tenants.py b/test_runner/regress/test_tenants.py index 5642449ce6..6599fa7ba5 100644 --- a/test_runner/regress/test_tenants.py +++ b/test_runner/regress/test_tenants.py @@ -314,21 +314,22 @@ def test_pageserver_with_empty_tenants( client = env.pageserver.http_client() - tenant_with_empty_timelines_dir = client.tenant_create() - temp_timelines = client.timeline_list(tenant_with_empty_timelines_dir) + tenant_with_empty_timelines = TenantId.generate() + client.tenant_create(tenant_with_empty_timelines) + temp_timelines = client.timeline_list(tenant_with_empty_timelines) for temp_timeline in temp_timelines: client.timeline_delete( - tenant_with_empty_timelines_dir, TimelineId(temp_timeline["timeline_id"]) + tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"]) ) files_in_timelines_dir = sum( 1 for _p in Path.iterdir( - Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines_dir) / "timelines" + Path(env.repo_dir) / "tenants" / str(tenant_with_empty_timelines) / "timelines" ) ) assert ( files_in_timelines_dir == 0 - ), f"Tenant {tenant_with_empty_timelines_dir} should have an empty timelines/ directory" + ), f"Tenant {tenant_with_empty_timelines} should have an empty timelines/ directory" # Trigger timeline re-initialization after pageserver restart env.endpoints.stop_all() @@ -356,15 +357,15 @@ def test_pageserver_with_empty_tenants( assert env.pageserver.log_contains(".*Setting tenant as Broken state, reason:.*") - [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)] + [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)] assert ( loaded_tenant["state"]["slug"] == "Active" - ), "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation" + ), "Tenant {tenant_with_empty_timelines} with empty timelines dir should be active and ready for timeline creation" - loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines_dir) + loaded_tenant_status = client.tenant_status(tenant_with_empty_timelines) assert ( loaded_tenant_status["state"]["slug"] == "Active" - ), f"Tenant {tenant_with_empty_timelines_dir} without timelines dir should be active" + ), f"Tenant {tenant_with_empty_timelines} without timelines dir should be active" time.sleep(1) # to allow metrics propagation @@ -374,7 +375,7 @@ def test_pageserver_with_empty_tenants( "state": "Broken", } active_tenants_metric_filter = { - "tenant_id": str(tenant_with_empty_timelines_dir), + "tenant_id": str(tenant_with_empty_timelines), "state": "Active", } @@ -386,7 +387,7 @@ def test_pageserver_with_empty_tenants( assert ( tenant_active_count == 1 - ), f"Tenant {tenant_with_empty_timelines_dir} should have metric as active" + ), f"Tenant {tenant_with_empty_timelines} should have metric as active" tenant_broken_count = int( ps_metrics.query_one( From 339a3e314609ed00e675b455d0fdb98e908394d2 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 26 May 2023 14:49:42 +0100 Subject: [PATCH 06/10] GitHub Autocomment: comment commits for branches (#4335) ## Problem GitHub Autocomment script posts a comment only for PRs. It's harder to debug failed tests on main or release branches. ## Summary of changes - Change the GitHub Autocomment script to be able to post a comment to either a PR or a commit of a branch --- .github/workflows/build_and_test.yml | 6 +-- ...-test-report.js => comment-test-report.js} | 37 +++++++++++++++---- 2 files changed, 31 insertions(+), 12 deletions(-) rename scripts/{pr-comment-test-report.js => comment-test-report.js} (85%) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 6d89ce9994..336dea04eb 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -407,9 +407,7 @@ jobs: uses: ./.github/actions/allure-report-generate - uses: actions/github-script@v6 - if: > - !cancelled() && - github.event_name == 'pull_request' + if: ${{ !cancelled() }} with: # Retry script for 5XX server errors: https://github.com/actions/github-script#retries retries: 5 @@ -419,7 +417,7 @@ jobs: reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}", } - const script = require("./scripts/pr-comment-test-report.js") + const script = require("./scripts/comment-test-report.js") await script({ github, context, diff --git a/scripts/pr-comment-test-report.js b/scripts/comment-test-report.js similarity index 85% rename from scripts/pr-comment-test-report.js rename to scripts/comment-test-report.js index 3a7bba0daa..a7fd5b0bef 100644 --- a/scripts/pr-comment-test-report.js +++ b/scripts/comment-test-report.js @@ -1,5 +1,5 @@ // -// The script parses Allure reports and posts a comment with a summary of the test results to the PR. +// The script parses Allure reports and posts a comment with a summary of the test results to the PR or to the latest commit in the branch. // // The comment is updated on each run with the latest results. // @@ -7,7 +7,7 @@ // - uses: actions/github-script@v6 // with: // script: | -// const script = require("./scripts/pr-comment-test-report.js") +// const script = require("./scripts/comment-test-report.js") // await script({ // github, // context, @@ -35,8 +35,12 @@ class DefaultMap extends Map { module.exports = async ({ github, context, fetch, report }) => { // Marker to find the comment in the subsequent runs const startMarker = `` + // If we run the script in the PR or in the branch (main/release/...) + const isPullRequest = !!context.payload.pull_request + // Latest commit in PR or in the branch + const commitSha = isPullRequest ? context.payload.pull_request.head.sha : context.sha // Let users know that the comment is updated automatically - const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${context.payload.pull_request.head.sha} at ${new Date().toISOString()} :recycle:
` + const autoupdateNotice = `
The comment gets automatically updated with the latest test results
${commitSha} at ${new Date().toISOString()} :recycle:
` // GitHub bot id taken from (https://api.github.com/users/github-actions[bot]) const githubActionsBotId = 41898282 // Commend body itself @@ -166,22 +170,39 @@ module.exports = async ({ github, context, fetch, report }) => { commentBody += autoupdateNotice - const { data: comments } = await github.rest.issues.listComments({ - issue_number: context.payload.number, + let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha + if (isPullRequest) { + createCommentFn = github.rest.issues.createComment + listCommentsFn = github.rest.issues.listComments + updateCommentFn = github.rest.issues.updateComment + issueNumberOrSha = { + issue_number: context.payload.number, + } + } else { + updateCommentFn = github.rest.repos.updateCommitComment + listCommentsFn = github.rest.repos.listCommentsForCommit + createCommentFn = github.rest.repos.createCommitComment + issueNumberOrSha = { + commit_sha: commitSha, + } + } + + const { data: comments } = await listCommentsFn({ + ...issueNumberOrSha, ...ownerRepoParams, }) const comment = comments.find(comment => comment.user.id === githubActionsBotId && comment.body.startsWith(startMarker)) if (comment) { - await github.rest.issues.updateComment({ + await updateCommentFn({ comment_id: comment.id, body: commentBody, ...ownerRepoParams, }) } else { - await github.rest.issues.createComment({ - issue_number: context.payload.number, + await createCommentFn({ body: commentBody, + ...issueNumberOrSha, ...ownerRepoParams, }) } From be177f82dc5c9aa8166a3fdfbc03dbd8105d0c59 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 26 May 2023 18:37:17 +0300 Subject: [PATCH 07/10] Revert "Allow for higher s3 concurrency (#4292)" (#4356) This reverts commit 024109fbeb533b4574976a5899c27f56891de881 for it failing to be speed up anything, but run into more errors. See: #3698. --- Cargo.lock | 12 ---- libs/remote_storage/Cargo.toml | 2 - libs/remote_storage/src/lib.rs | 2 - libs/remote_storage/src/s3_bucket.rs | 85 +++++++++++++++++++--------- 4 files changed, 59 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69d161d2b1..d390df94e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2040,17 +2040,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "leaky-bucket" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d615fd0b579225f0d3c8d781af50a73644b571da8b5b50053ef2dcfa60dd51e7" -dependencies = [ - "parking_lot", - "tokio", - "tracing", -] - [[package]] name = "libc" version = "0.2.144" @@ -3233,7 +3222,6 @@ dependencies = [ "aws-smithy-http", "aws-types", "hyper", - "leaky-bucket", "metrics", "once_cell", "pin-project-lite", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 5da02293a8..0877a38dd9 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -25,8 +25,6 @@ utils.workspace = true pin-project-lite.workspace = true workspace_hack.workspace = true -leaky-bucket = "1.0" - [dev-dependencies] tempfile.workspace = true test-context.workspace = true diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index f3ae2425f6..e0cc3ca543 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -37,8 +37,6 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/UsingWithRDS.IAMDBAuth.html /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ -/// -/// IAM ratelimit should never be observed with caching credentials provider. pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; /// No limits on the client side, which currenltly means 1000 for AWS S3. /// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 631caa6a48..0be8c72fe0 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -21,7 +21,10 @@ use aws_sdk_s3::{ }; use aws_smithy_http::body::SdkBody; use hyper::Body; -use tokio::io; +use tokio::{ + io::{self, AsyncRead}, + sync::Semaphore, +}; use tokio_util::io::ReaderStream; use tracing::debug; @@ -102,8 +105,9 @@ pub struct S3Bucket { prefix_in_bucket: Option, max_keys_per_list_response: Option, // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded. + // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold. // The helps to ensure we don't exceed the thresholds. - concurrency_limiter: Arc, + concurrency_limiter: Arc, } #[derive(Default)] @@ -154,24 +158,12 @@ impl S3Bucket { } prefix }); - - let rps = aws_config.concurrency_limit.get(); - let concurrency_limiter = leaky_bucket::RateLimiter::builder() - .max(rps) - .initial(0) - // refill it by rps every second. this means the (rps+1)th request will have to wait for - // 1 second from earliest. - .refill(rps) - .interval(std::time::Duration::from_secs(1)) - .fair(true) - .build(); - Ok(Self { client, bucket_name: aws_config.bucket_name.clone(), max_keys_per_list_response: aws_config.max_keys_per_list_response, prefix_in_bucket, - concurrency_limiter: Arc::new(concurrency_limiter), + concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())), }) } @@ -203,10 +195,13 @@ impl S3Bucket { } async fn download_object(&self, request: GetObjectRequest) -> Result { - // while the download could take a long time with `leaky_bucket` we have nothing to release - // once the download is done. this is because with "requests per second" rate limiting on - // s3, there should be no meaning for the long requests. - self.concurrency_limiter.clone().acquire_owned(1).await; + let permit = self + .concurrency_limiter + .clone() + .acquire_owned() + .await + .context("Concurrency limiter semaphore got closed during S3 download") + .map_err(DownloadError::Other)?; metrics::inc_get_object(); @@ -224,9 +219,10 @@ impl S3Bucket { let metadata = object_output.metadata().cloned().map(StorageMetadata); Ok(Download { metadata, - download_stream: Box::pin(io::BufReader::new( + download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new( + permit, object_output.body.into_async_read(), - )), + ))), }) } Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { @@ -242,6 +238,32 @@ impl S3Bucket { } } +pin_project_lite::pin_project! { + /// An `AsyncRead` adapter which carries a permit for the lifetime of the value. + struct RatelimitedAsyncRead { + permit: tokio::sync::OwnedSemaphorePermit, + #[pin] + inner: S, + } +} + +impl RatelimitedAsyncRead { + fn new(permit: tokio::sync::OwnedSemaphorePermit, inner: S) -> Self { + RatelimitedAsyncRead { permit, inner } + } +} + +impl AsyncRead for RatelimitedAsyncRead { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.inner.poll_read(cx, buf) + } +} + #[async_trait::async_trait] impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_prefixes` @@ -267,7 +289,12 @@ impl RemoteStorage for S3Bucket { let mut continuation_token = None; loop { - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 list") + .map_err(DownloadError::Other)?; metrics::inc_list_objects(); @@ -312,9 +339,11 @@ impl RemoteStorage for S3Bucket { to: &RemotePath, metadata: Option, ) -> anyhow::Result<()> { - // similarly to downloads, the permit does not have live through the upload, but instead we - // are rate limiting requests per second. - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 upload")?; metrics::inc_put_object(); @@ -369,7 +398,11 @@ impl RemoteStorage for S3Bucket { } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - self.concurrency_limiter.acquire_one().await; + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 delete")?; metrics::inc_delete_object(); From 13d3f4c29f5627d0b8aa234544bb73847cae909c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 26 May 2023 17:13:53 +0200 Subject: [PATCH 08/10] set_stopping(): report in result if not transitioning to Stopping --- pageserver/src/tenant.rs | 44 +++++++++++++++++++++++++----------- pageserver/src/tenant/mgr.rs | 28 +++++++++++++++-------- 2 files changed, 49 insertions(+), 23 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f03443b6a7..65ad8ae8e3 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -447,6 +447,11 @@ pub enum DeleteTimelineError { Other(#[from] anyhow::Error), } +pub enum SetStoppingError { + AlreadyStopping, + Broken, +} + struct RemoteStartupData { index_part: IndexPart, remote_metadata: TimelineMetadata, @@ -1711,7 +1716,7 @@ impl Tenant { /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state. /// /// This function is not cancel-safe! - pub async fn set_stopping(&self) { + pub async fn set_stopping(&self) -> Result<(), SetStoppingError> { let mut rx = self.state.subscribe(); // cannot stop before we're done activating, so wait out until we're done activating @@ -1729,8 +1734,8 @@ impl Tenant { .expect("cannot drop self.state while on a &self method"); // we now know we're done activating, let's see whether this task is the winner to transition into Stopping - let mut stopping = false; - self.state.send_modify(|current_state| match current_state { + let mut err = None; + let stopping = self.state.send_if_modified(|current_state| match current_state { TenantState::Activating | TenantState::Loading | TenantState::Attaching => { unreachable!("we ensured above that we're done with activation, and, there is no re-activation") } @@ -1739,29 +1744,42 @@ impl Tenant { // are created after the transition to Stopping. That's harmless, as the Timelines // won't be accessible to anyone afterwards, because the Tenant is in Stopping state. *current_state = TenantState::Stopping; - stopping = true; // Continue stopping outside the closure. We need to grab timelines.lock() // and we plan to turn it into a tokio::sync::Mutex in a future patch. + true } TenantState::Broken { reason, .. } => { info!( "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}" ); + err = Some(SetStoppingError::Broken); + false } TenantState::Stopping => { info!("Tenant is already in Stopping state"); + err = Some(SetStoppingError::AlreadyStopping); + false } }); - - if stopping { - let timelines_accessor = self.timelines.lock().unwrap(); - let not_broken_timelines = timelines_accessor - .values() - .filter(|timeline| timeline.current_state() != TimelineState::Broken); - for timeline in not_broken_timelines { - timeline.set_state(TimelineState::Stopping); - } + match (stopping, err) { + (true, None) => {} // continue + (false, Some(err)) => return Err(err), + (true, Some(_)) => unreachable!( + "send_if_modified closure must error out if not transitioning to Stopping" + ), + (false, None) => unreachable!( + "send_if_modified closure must return true if transitioning to Stopping" + ), } + + let timelines_accessor = self.timelines.lock().unwrap(); + let not_broken_timelines = timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken); + for timeline in not_broken_timelines { + timeline.set_state(TimelineState::Stopping); + } + Ok(()) } /// Method for tenant::mgr to transition us into Broken state in case of a late failure in diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index d40468a5b3..185b5eb3c8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -19,7 +19,9 @@ use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; -use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState}; +use crate::tenant::{ + create_tenant_files, CreateTenantFilesMode, SetStoppingError, Tenant, TenantState, +}; use crate::IGNORED_TENANT_FILE_NAME; use utils::fs_ext::PathExt; @@ -247,7 +249,7 @@ pub async fn shutdown_all_tenants() { let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len()); for (_, tenant) in tenants_to_shut_down { // updates tenant state, forbidding new GC and compaction iterations from starting - tenant.set_stopping().await; + let _ = tenant.set_stopping().await; // TODO handle error tenants_to_freeze_and_flush.push(tenant); } @@ -587,14 +589,20 @@ where { let tenants_accessor = TENANTS.write().await; match tenants_accessor.get(&tenant_id) { - Some(tenant) => match tenant.current_state() { - TenantState::Attaching - | TenantState::Loading - | TenantState::Activating - | TenantState::Broken { .. } - | TenantState::Active => tenant.set_stopping().await, - TenantState::Stopping => return Err(TenantStateError::IsStopping(tenant_id)), - }, + Some(tenant) => { + match tenant.set_stopping().await { + Ok(()) => { + // we won, continue stopping procedure + } + Err(SetStoppingError::Broken) => { + // continue the procedure, let's hope the closure can deal with broken tenants + } + Err(SetStoppingError::AlreadyStopping) => { + // the tenant is already stopping or broken, don't do anything + return Err(TenantStateError::IsStopping(tenant_id)); + } + } + } None => return Err(TenantStateError::NotFound(tenant_id)), } } From e7c4ef9f4f8ac3e5e5a383839ba0b365e4e6039b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 26 May 2023 17:45:32 +0200 Subject: [PATCH 09/10] don't hold TENANTS lock while waiting for set_stopping() --- pageserver/src/tenant/mgr.rs | 58 +++++++++++++++++++++++++++++++++--- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 185b5eb3c8..6778e13911 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -10,6 +10,7 @@ use tokio::fs; use anyhow::Context; use once_cell::sync::Lazy; use tokio::sync::RwLock; +use tokio::task::JoinSet; use tracing::*; use remote_storage::GenericRemoteStorage; @@ -246,11 +247,56 @@ pub async fn shutdown_all_tenants() { } }; + // Set tenant (and its timlines) to Stoppping state. + // Since we can only transition into Stopping state after activation is complete, + // run it in a JoinSet so all tenants have a chance to stop before we git SIGKILLed. + // + // Transitioning tenants to Stopping state has a couple of non-obvious side effects: + // 1. Lock out any new requests to the tenants. + // 2. Signal cancellation to WAL receivers (we wait on it below). + // 3. Signal cancellation for othher tenant background loops. + // 4. ??? + // + // The waiting for the cancellation is not done uniformly. + // We certainly wait for WAL receivers to shut down. + // That is necessary so that no new data comes in before the freeze_and_flush. + // But the tenant background loops are joined-on in our caller. + // It's mesed up. + let mut join_set = JoinSet::new(); let mut tenants_to_freeze_and_flush = Vec::with_capacity(tenants_to_shut_down.len()); for (_, tenant) in tenants_to_shut_down { - // updates tenant state, forbidding new GC and compaction iterations from starting - let _ = tenant.set_stopping().await; // TODO handle error - tenants_to_freeze_and_flush.push(tenant); + join_set.spawn(async move { + match tenant.set_stopping().await { + Ok(()) => Ok(tenant), + Err(e) => Err((tenant, e)), + } + }); + } + while let Some(res) = join_set.join_next().await { + match res { + Err(join_error) if join_error.is_cancelled() => { + unreachable!("we are not cancelling any of the futures"); + } + Err(join_error) => { + // cannot really do anything, as this panic is likely a bug + error!("task that calls set_stopping() panicked, don't know which tenant this is, and probably freeze_and_flush won't work anyways: {join_error:#}"); + } + Ok(retval) => match retval { + Ok(tenant) => { + // success + debug!("tenant successfully stopped: {}", tenant.tenant_id); + tenants_to_freeze_and_flush.push(tenant); + } + // our task_mgr::shutdown_tasks are going to coalesce on that just fine + Err((tenant, SetStoppingError::AlreadyStopping)) => { + tenants_to_freeze_and_flush.push(tenant); + } + Err((tenant, SetStoppingError::Broken)) => { + warn!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id); + tenants_to_freeze_and_flush.push(tenant); + } + }, + } } // Shut down all existing walreceiver connections and stop accepting the new ones. @@ -266,8 +312,9 @@ pub async fn shutdown_all_tenants() { // On error, log it but continue with the shutdown for other tenants. for tenant in tenants_to_freeze_and_flush { let tenant_id = tenant.tenant_id(); - debug!("shutdown tenant {tenant_id}"); + debug!("freeze_and_flush tenant {tenant_id}"); + // TODO this could probably run in a JoinSet as well? if let Err(err) = tenant.freeze_and_flush().await { error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}"); } @@ -590,6 +637,9 @@ where let tenants_accessor = TENANTS.write().await; match tenants_accessor.get(&tenant_id) { Some(tenant) => { + let tenant = Arc::clone(tenant); + // don't hold TENANTS lock while set_stopping waits for activation to finish + drop(tenants_accessor); match tenant.set_stopping().await { Ok(()) => { // we won, continue stopping procedure From 9a4789ec7345b5a7879a673cb82d597ee33ce887 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 26 May 2023 18:22:41 +0200 Subject: [PATCH 10/10] demote warn line to info-level, as the log line in set_stopping() is also info!() This should fix the faile regress tests that barked on allowed_errors --- pageserver/src/tenant/mgr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 6778e13911..9934ba4b3c 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -292,7 +292,7 @@ pub async fn shutdown_all_tenants() { tenants_to_freeze_and_flush.push(tenant); } Err((tenant, SetStoppingError::Broken)) => { - warn!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id); + info!("tenant is broken, so stopping failed, freeze_and_flush is likely going to make noise as well: {}", tenant.tenant_id); tenants_to_freeze_and_flush.push(tenant); } },