diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 864abad574..cc9534f05d 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -87,6 +87,24 @@ jobs:
     uses: ./.github/workflows/build-build-tools-image.yml
     secrets: inherit

+  lint-openapi-spec:
+    runs-on: ubuntu-22.04
+    needs: [ meta, check-permissions ]
+    # We do need to run this in `.*-rc-pr` because of hotfixes.
+    if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
+    steps:
+      - name: Harden the runner (Audit all outbound calls)
+        uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
+        with:
+          egress-policy: audit
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+      - uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3.4.0
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+      - run: make lint-openapi-spec
+
   check-codestyle-python:
     needs: [ meta, check-permissions, build-build-tools-image ]
     # No need to run on `main` because we do this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
diff --git a/Cargo.lock b/Cargo.lock
index 7edc974e93..8a3e3a61eb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4499,6 +4499,7 @@ name = "pageserver_client_grpc"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "bytes",
  "compute_api",
  "futures",
  "pageserver_api",
diff --git a/Makefile b/Makefile
index 4b31e26810..d07ac907b4 100644
--- a/Makefile
+++ b/Makefile
@@ -220,6 +220,15 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
 setup-pre-commit-hook:
 	ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit

+.PHONY: lint-openapi-spec
+lint-openapi-spec:
+	# operation-2xx-response: pageserver timeline delete returns 404 on success
+	find . -iname "openapi_spec.y*ml" -exec\
+	docker run --rm -v ${PWD}:/spec ghcr.io/redocly/cli:1.34.4\
+	--skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\
+	--skip-rule=no-server-example.com --skip-rule=operation-2xx-response\
+	lint {} \+
+
 # Targets for building PostgreSQL are defined in postgres.mk.
 #
 # But if the caller has indicated that PostgreSQL is already
diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml
index eaf33d1f82..3c58b284b3 100644
--- a/compute_tools/src/http/openapi_spec.yaml
+++ b/compute_tools/src/http/openapi_spec.yaml
@@ -416,15 +416,6 @@ components:
         total_startup_ms:
           type: integer

-    Info:
-      type: object
-      description: Information about VM/Pod.
-      required:
-        - num_cpus
-      properties:
-        num_cpus:
-          type: integer
-
     DbsAndRoles:
       type: object
       description: Databases and Roles
@@ -642,26 +633,6 @@ components:
           description: Promote error, if any
          type: string

-    InstalledExtensions:
-      type: object
-      properties:
-        extensions:
-          description: Contains list of installed extensions.
-          type: array
-          items:
-            type: object
-            properties:
-              extname:
-                type: string
-              version:
-                type: string
-              items:
-                type: string
-              n_databases:
-                type: integer
-              owned_by_superuser:
-                type: integer
-
     SetRoleGrantsRequest:
       type: object
       required:
diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml
index 0a8bcad2ef..84e27abb84 100644
--- a/pageserver/client_grpc/Cargo.toml
+++ b/pageserver/client_grpc/Cargo.toml
@@ -6,6 +6,7 @@ license.workspace = true

 [dependencies]
 anyhow.workspace = true
+bytes.workspace = true
 compute_api.workspace = true
 futures.workspace = true
 pageserver_api.workspace = true
diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs
index 5bccdeede3..c21ce2e47d 100644
--- a/pageserver/client_grpc/src/client.rs
+++ b/pageserver/client_grpc/src/client.rs
@@ -2,13 +2,15 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use anyhow::anyhow;
+use futures::stream::FuturesUnordered;
+use futures::{FutureExt as _, StreamExt as _};
 use tracing::instrument;

 use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
 use crate::retry::Retry;
+use crate::split::GetPageSplitter;
 use compute_api::spec::PageserverProtocol;
-use pageserver_api::key::{Key, rel_block_to_key};
-use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
+use pageserver_api::shard::ShardStripeSize;
 use pageserver_page_api as page_api;
 use utils::id::{TenantId, TimelineId};
 use utils::shard::{ShardCount, ShardIndex, ShardNumber};
@@ -78,10 +80,11 @@ impl PageserverClient {
             .await
     }

-    /// Fetches a page. The `request_id` must be unique across all in-flight requests.
+    /// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically
+    /// splits requests that straddle shard boundaries, and assembles the responses.
     ///
-    /// Unlike the `page_api::Client`, this client automatically converts `status_code` into
-    /// `tonic::Status` errors. All responses will have `GetPageStatusCode::Ok`.
+    /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
+    /// errors. All responses will have `GetPageStatusCode::Ok`.
     #[instrument(skip_all, fields(
         req_id = %req.request_id,
         rel = %req.rel,
@@ -93,22 +96,55 @@ impl PageserverClient {
         &self,
         req: page_api::GetPageRequest,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        // TODO: this needs to split batch requests across shards and reassemble responses into a
-        // single response. It must also re-split the batch in case the shard map changes. For now,
-        // just use the first page.
-        let key = rel_block_to_key(
-            req.rel,
-            req.block_numbers
-                .first()
-                .copied()
-                .ok_or_else(|| tonic::Status::invalid_argument("no block numbers provided"))?,
-        );
+        // Make sure we have at least one page.
+        if req.block_numbers.is_empty() {
+            return Err(tonic::Status::invalid_argument("no block number"));
+        }

-        self.retry
+        // Fast path: request is for a single shard.
+        if let Some(shard_id) =
+            GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
+        {
+            return self.get_page_for_shard(shard_id, req).await;
+        }
+
+        // Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
+        // reassemble the responses.
+        //
+        // TODO: when we support shard map updates, we need to detect when it changes and re-split
+        // the request on errors.
+        let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
+
+        let mut shard_requests: FuturesUnordered<_> = splitter
+            .drain_requests()
+            .map(|(shard_id, shard_req)| {
+                // NB: each request will retry internally.
+                self.get_page_for_shard(shard_id, shard_req)
+                    .map(move |result| result.map(|resp| (shard_id, resp)))
+            })
+            .collect();
+
+        while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
+            splitter.add_response(shard_id, shard_response)?;
+        }
+
+        splitter.assemble_response()
+    }
+
+    /// Fetches pages that belong to the given shard.
+    #[instrument(skip_all, fields(shard = %shard_id))]
+    async fn get_page_for_shard(
+        &self,
+        shard_id: ShardIndex,
+        req: page_api::GetPageRequest,
+    ) -> tonic::Result<page_api::GetPageResponse> {
+        let resp = self
+            .retry
             .with(async || {
-                let stream = self.shards.get_for_key(key).stream().await;
+                let stream = self.shards.get(shard_id)?.stream().await;
                 let resp = stream.send(req.clone()).await?;
+                // Convert per-request errors into a tonic::Status.
                 if resp.status_code != page_api::GetPageStatusCode::Ok {
                     return Err(tonic::Status::new(
                         resp.status_code.into(),
@@ -118,7 +154,18 @@
                 Ok(resp)
             })
-            .await
+            .await?;
+
+        // Make sure we got the right number of pages.
+        // NB: check outside of the retry loop, since we don't want to retry this.
+        let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
+        if expected != actual {
+            return Err(tonic::Status::internal(format!(
+                "expected {expected} pages for shard {shard_id}, got {actual}",
+            )));
+        }
+
+        Ok(resp)
     }

     /// Returns the size of a relation, as # of blocks.
@@ -216,13 +263,6 @@ impl Shards {
             .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
     }

-    /// Looks up the shard that owns the given key.
-    fn get_for_key(&self, key: Key) -> &Shard {
-        let shard_number = key_to_shard_number(self.count, self.stripe_size, &key);
-        self.get(ShardIndex::new(shard_number, self.count))
-            .expect("must exist")
-    }
-
     /// Returns shard 0.
     fn get_zero(&self) -> &Shard {
         self.get(ShardIndex::new(ShardNumber(0), self.count))
diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs
index 2a59f9868c..3fc7178be2 100644
--- a/pageserver/client_grpc/src/lib.rs
+++ b/pageserver/client_grpc/src/lib.rs
@@ -1,5 +1,6 @@
 mod client;
 mod pool;
 mod retry;
+mod split;

 pub use client::PageserverClient;
diff --git a/pageserver/client_grpc/src/split.rs b/pageserver/client_grpc/src/split.rs
new file mode 100644
index 0000000000..5bbcaab393
--- /dev/null
+++ b/pageserver/client_grpc/src/split.rs
@@ -0,0 +1,172 @@
+use std::collections::HashMap;
+
+use bytes::Bytes;
+
+use pageserver_api::key::rel_block_to_key;
+use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
+use pageserver_page_api as page_api;
+use utils::shard::{ShardCount, ShardIndex};
+
+/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
+/// TODO: add tests for this.
+pub struct GetPageSplitter {
+    /// The original request ID. Used for all shard requests.
+    request_id: page_api::RequestID,
+    /// Split requests by shard index.
+    requests: HashMap<ShardIndex, page_api::GetPageRequest>,
+    /// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble
+    /// the response pages in the same order as the original request.
+    block_shards: Vec<ShardIndex>,
+    /// Page responses by shard index. Will be assembled into a single response.
+    responses: HashMap<ShardIndex, Vec<Bytes>>,
+}
+
+impl GetPageSplitter {
+    /// Checks if the given request only touches a single shard, and returns the shard ID. This is
+    /// the common case, so we check first in order to avoid unnecessary allocations and overhead.
+    /// The caller must ensure that the request has at least one block number, or this will panic.
+    pub fn is_single_shard(
+        req: &page_api::GetPageRequest,
+        count: ShardCount,
+        stripe_size: ShardStripeSize,
+    ) -> Option<ShardIndex> {
+        // Fast path: unsharded tenant.
+        if count.is_unsharded() {
+            return Some(ShardIndex::unsharded());
+        }
+
+        // Find the base shard index for the first page, and compare with the rest.
+        let key = rel_block_to_key(req.rel, *req.block_numbers.first().expect("no pages"));
+        let shard_number = key_to_shard_number(count, stripe_size, &key);
+
+        req.block_numbers
+            .iter()
+            .skip(1) // computed above
+            .all(|&blkno| {
+                let key = rel_block_to_key(req.rel, blkno);
+                key_to_shard_number(count, stripe_size, &key) == shard_number
+            })
+            .then_some(ShardIndex::new(shard_number, count))
+    }
+
+    /// Splits the given request.
+    pub fn split(
+        req: page_api::GetPageRequest,
+        count: ShardCount,
+        stripe_size: ShardStripeSize,
+    ) -> Self {
+        // The caller should make sure we don't split requests unnecessarily.
+        debug_assert!(
+            Self::is_single_shard(&req, count, stripe_size).is_none(),
+            "unnecessary request split"
+        );
+
+        // Split the requests by shard index.
+        let mut requests = HashMap::with_capacity(2); // common case
+        let mut block_shards = Vec::with_capacity(req.block_numbers.len());
+        for blkno in req.block_numbers {
+            let key = rel_block_to_key(req.rel, blkno);
+            let shard_number = key_to_shard_number(count, stripe_size, &key);
+            let shard_id = ShardIndex::new(shard_number, count);
+
+            let shard_req = requests
+                .entry(shard_id)
+                .or_insert_with(|| page_api::GetPageRequest {
+                    request_id: req.request_id,
+                    request_class: req.request_class,
+                    rel: req.rel,
+                    read_lsn: req.read_lsn,
+                    block_numbers: Vec::new(),
+                });
+            shard_req.block_numbers.push(blkno);
+            block_shards.push(shard_id);
+        }
+
+        Self {
+            request_id: req.request_id,
+            responses: HashMap::with_capacity(requests.len()),
+            requests,
+            block_shards,
+        }
+    }
+
+    /// Drains the per-shard requests, moving them out of the hashmap to avoid extra allocations.
+    pub fn drain_requests(
+        &mut self,
+    ) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
+        self.requests.drain()
+    }
+
+    /// Adds a response from the given shard.
+    #[allow(clippy::result_large_err)]
+    pub fn add_response(
+        &mut self,
+        shard_id: ShardIndex,
+        response: page_api::GetPageResponse,
+    ) -> tonic::Result<()> {
+        // The caller should already have converted status codes into tonic::Status.
+        assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
+
+        // Make sure the response matches the request ID.
+        if response.request_id != self.request_id {
+            return Err(tonic::Status::internal(format!(
+                "response ID {} does not match request ID {}",
+                response.request_id, self.request_id
+            )));
+        }
+
+        // Add the response data to the map.
+        let old = self.responses.insert(shard_id, response.page_images);
+
+        if old.is_some() {
+            return Err(tonic::Status::internal(format!(
+                "duplicate response for shard {shard_id}",
+            )));
+        }
+
+        Ok(())
+    }
+
+    /// Assembles the shard responses into a single response. Responses must be present for all
+    /// relevant shards, and the total number of pages must match the original request.
+    #[allow(clippy::result_large_err)]
+    pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
+        let mut response = page_api::GetPageResponse {
+            request_id: self.request_id,
+            status_code: page_api::GetPageStatusCode::Ok,
+            reason: None,
+            page_images: Vec::with_capacity(self.block_shards.len()),
+        };
+
+        // Set up per-shard page iterators we can pull from.
+        let mut shard_responses = HashMap::with_capacity(self.responses.len());
+        for (shard_id, responses) in self.responses {
+            shard_responses.insert(shard_id, responses.into_iter());
+        }
+
+        // Reassemble the responses in the same order as the original request.
+        for shard_id in &self.block_shards {
+            let page = shard_responses
+                .get_mut(shard_id)
+                .ok_or_else(|| {
+                    tonic::Status::internal(format!("missing response for shard {shard_id}"))
+                })?
+                .next()
+                .ok_or_else(|| {
+                    tonic::Status::internal(format!("missing page from shard {shard_id}"))
+                })?;
+            response.page_images.push(page);
+        }
+
+        // Make sure there are no additional pages.
+        for (shard_id, mut pages) in shard_responses {
+            if pages.next().is_some() {
+                return Err(tonic::Status::internal(format!(
+                    "extra pages returned from shard {shard_id}"
+                )));
+            }
+        }
+
+        Ok(response)
+    }
+}
diff --git a/pageserver/src/feature_resolver.rs b/pageserver/src/feature_resolver.rs
index 65cac8eea1..f0178fd9b3 100644
--- a/pageserver/src/feature_resolver.rs
+++ b/pageserver/src/feature_resolver.rs
@@ -409,11 +409,12 @@ impl TenantFeatureResolver {
     /// Refresh the cached properties and flags on the critical path.
     pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
-        let mut remote_size_mb = None;
+        let mut remote_size_mb = Some(0.0);
         for timeline in tenant_shard.list_timelines() {
             let size = timeline.metrics.resident_physical_size_get();
             if size == 0 {
                 remote_size_mb = None;
+                break;
             }
             if let Some(ref mut remote_size_mb) = remote_size_mb {
                 *remote_size_mb += size as f64 / 1024.0 / 1024.0;
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index e8d1367d6c..3ffc80f19a 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -116,26 +116,6 @@ paths:
         schema:
           type: string

-  /v1/tenant/{tenant_id}/timeline:
-    parameters:
-      - name: tenant_id
-        in: path
-        required: true
-        schema:
-          type: string
-    get:
-      description: Get timelines for tenant
-      responses:
-        "200":
-          description: TimelineInfo
-          content:
-            application/json:
-              schema:
-                type: array
-                items:
-                  $ref: "#/components/schemas/TimelineInfo"
-
   /v1/tenant/{tenant_id}/timeline/{timeline_id}:
     parameters:
       - name: tenant_id
        in: path
        required: true
        schema:
          type: string
@@ -618,7 +598,7 @@ paths:
           schema:
             $ref: "#/components/schemas/SecondaryProgress"

-  /v1/tenant/{tenant_id}/timeline/:
+  /v1/tenant/{tenant_id}/timeline:
     parameters:
       - name: tenant_id
        in: path
@@ -685,6 +665,17 @@ paths:
             application/json:
               schema:
                 $ref: "#/components/schemas/Error"
+    get:
+      description: Get timelines for tenant
+      responses:
+        "200":
+          description: TimelineInfo
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: "#/components/schemas/TimelineInfo"

   /v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor:
     parameters:
@@ -767,7 +758,7 @@ paths:
                 $ref: "#/components/schemas/ServiceUnavailableError"

-  /v1/tenant/:
+  /v1/tenant:
     get:
       description: Get tenants list
       responses:
@@ -847,7 +838,7 @@ paths:
                 items:
                   $ref: "#/components/schemas/TenantInfo"

-  /v1/tenant/{tenant_id}/config/:
+  /v1/tenant/{tenant_id}/config:
     parameters:
       - name: tenant_id
        in: path
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 36f7346c6f..c888b616c7 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -3694,6 +3694,23 @@ async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow
     Ok(())
 }

+async fn force_refresh_feature_flag(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+
+    let state = get_state(&request);
+    let tenant = state
+        .tenant_manager
+        .get_attached_tenant_shard(tenant_shard_id)?;
+    tenant
+        .feature_resolver
+        .refresh_properties_and_flags(&tenant);
+    json_response(StatusCode::OK, ())
+}
+
 async fn tenant_evaluate_feature_flag(
     request: Request<Body>,
     _cancel: CancellationToken,
@@ -4159,6 +4176,9 @@ pub fn make_router(
         .get("/v1/tenant/:tenant_shard_id/feature_flag/:flag_key", |r| {
             api_handler(r, tenant_evaluate_feature_flag)
         })
+        .post("/v1/tenant/:tenant_shard_id/force_refresh_feature_flag", |r| {
+            api_handler(r, force_refresh_feature_flag)
+        })
         .put("/v1/feature_flag/:flag_key", |r| {
             testing_api_handler("force override feature flag - put", r, force_override_feature_flag_for_testing_put)
         })
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index d9037f2d08..79cfba8da6 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -1247,3 +1247,10 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         )
         self.verbose_error(res)
         return res.json()
+
+    def force_refresh_feature_flag(self, tenant_id: TenantId | TenantShardId):
+        res = self.post(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/force_refresh_feature_flag",
+        )
+        self.verbose_error(res)
+        return res.json()
diff --git a/test_runner/random_ops/test_random_ops.py b/test_runner/random_ops/test_random_ops.py
index d3815c40bb..5c43b06bc5 100644
--- a/test_runner/random_ops/test_random_ops.py
+++ b/test_runner/random_ops/test_random_ops.py
@@ -117,7 +117,9 @@ class NeonBranch:
     def create_child_branch(self) -> NeonBranch | None:
         return self.project.create_branch(self.id)

-    def create_ro_endpoint(self) -> NeonEndpoint:
+    def create_ro_endpoint(self) -> NeonEndpoint | None:
+        if not self.project.check_limit_endpoints():
+            return None
         return NeonEndpoint(
             self.project,
             self.neon_api.create_endpoint(self.project_id, self.id, "read_only", {})["endpoint"],
@@ -151,11 +153,26 @@ class NeonBranch:
             return
         self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
         parent_id: str = res["branch"]["parent_id"]
+        # XXX Retry get parent details to work around the issue
+        # https://databricks.atlassian.net/browse/LKB-279
+        target_time = datetime.now() + timedelta(seconds=30)
+        while datetime.now() < target_time:
+            try:
+                parent_def = self.neon_api.get_branch_details(self.project_id, parent_id)
+            except HTTPError as he:
+                if he.response.status_code == 404:
+                    log.info("Branch not found, waiting...")
+                    time.sleep(1)
+                else:
+                    raise HTTPError(he) from he
+            else:
+                break
+        else:
+            raise RuntimeError(f"Branch {parent_id} not found")
+
         # Creates an object for the parent branch
         # After the reset operation a new parent branch is created
-        parent = NeonBranch(
-            self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
-        )
+        parent = NeonBranch(self.project, parent_def, True)
         self.project.branches[parent_id] = parent
         self.parent = parent
         parent.children[self.id] = self
@@ -168,29 +185,21 @@ class NeonBranch:
         source_timestamp: str | None = None,
         preserve_under_name: str | None = None,
     ) -> dict[str, Any] | None:
+        if not self.project.check_limit_branches():
+            return None
         endpoints = [ep for ep in self.endpoints.values() if ep.type == "read_only"]
         # Terminate all the benchmarks running to prevent errors. Errors in benchmark during pgbench are expected
         for ep in endpoints:
             ep.terminate_benchmark()
         self.terminate_benchmark()
-        try:
-            res: dict[str, Any] = self.neon_api.restore_branch(
-                self.project_id,
-                self.id,
-                source_branch_id,
-                source_lsn,
-                source_timestamp,
-                preserve_under_name,
-            )
-        except HTTPError as he:
-            if (
-                he.response.status_code == 422
-                and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
-            ):
-                log.info("Branch limit exceeded, skipping")
-                return None
-            else:
-                raise HTTPError(he) from he
+        res: dict[str, Any] = self.neon_api.restore_branch(
+            self.project_id,
+            self.id,
+            source_branch_id,
+            source_lsn,
+            source_timestamp,
+            preserve_under_name,
+        )
         self.project.wait()
         self.start_benchmark()
         for ep in endpoints:
@@ -239,19 +248,30 @@ class NeonProject:
     def delete(self) -> None:
         self.neon_api.delete_project(self.id)

+    def check_limit_branches(self) -> bool:
+        if self.limits["max_branches"] == -1 or len(self.branches) < self.limits["max_branches"]:
+            return True
+        log.info("branch limit exceeded (%s/%s)", len(self.branches), self.limits["max_branches"])
+        return False
+
+    def check_limit_endpoints(self) -> bool:
+        if (
+            self.limits["max_read_only_endpoints"] == -1
+            or self.read_only_endpoints_total < self.limits["max_read_only_endpoints"]
+        ):
+            return True
+        log.info(
+            "Maximum read only endpoint limit exceeded (%s/%s)",
+            self.read_only_endpoints_total,
+            self.limits["max_read_only_endpoints"],
+        )
+        return False
+
     def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
         self.wait()
-        try:
-            branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
-        except HTTPError as he:
-            if (
-                he.response.status_code == 422
-                and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
-            ):
-                log.info("Branch limit exceeded, skipping")
-                return None
-            else:
-                raise HTTPError(he) from he
+        if not self.check_limit_branches():
+            return None
+        branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
         new_branch = NeonBranch(self, branch_def)
         self.wait()
         return new_branch
@@ -388,17 +408,9 @@ def do_action(project: NeonProject, action: str) -> bool:
     log.info("Action: %s", action)
     if action == "new_branch":
         log.info("Trying to create a new branch")
-        if 0 <= project.limits["max_branches"] <= len(project.branches):
-            log.info(
-                "Maximum branch limit exceeded (%s of %s)",
-                len(project.branches),
-                project.limits["max_branches"],
-            )
-            return False
         parent = project.branches[
             random.choice(list(set(project.branches.keys()) - project.reset_branches))
         ]
-        log.info("Parent: %s", parent)
         child = parent.create_child_branch()
         if child is None:
             return False
@@ -413,16 +425,11 @@
         log.info("Leaf branches not found, skipping")
         return False
     elif action == "new_ro_endpoint":
-        if 0 <= project.limits["max_read_only_endpoints"] <= project.read_only_endpoints_total:
-            log.info(
-                "Maximum read only endpoint limit exceeded (%s of %s)",
-                project.read_only_endpoints_total,
-                project.limits["max_read_only_endpoints"],
-            )
-            return False
         ep = random.choice(
             [br for br in project.branches.values() if br.id not in project.reset_branches]
         ).create_ro_endpoint()
+        if ep is None:
+            return False
         log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
         ep.start_benchmark()
     elif action == "delete_ro_endpoint":
diff --git a/test_runner/regress/test_feature_flag.py b/test_runner/regress/test_feature_flag.py
index 2712d13dcc..c6c192b6f1 100644
--- a/test_runner/regress/test_feature_flag.py
+++ b/test_runner/regress/test_feature_flag.py
@@ -49,3 +49,12 @@ def test_feature_flag(neon_env_builder: NeonEnvBuilder):
             env.initial_tenant, "test-feature-flag"
         )["result"]
     )
+
+    env.pageserver.http_client().force_refresh_feature_flag(env.initial_tenant)
+
+    # Check if the properties exist
+    result = env.pageserver.http_client().evaluate_feature_flag_multivariate(
+        env.initial_tenant, "test-feature-flag"
+    )
+    assert "tenant_remote_size_mb" in result["properties"]
+    assert "tenant_id" in result["properties"]
diff --git a/test_runner/regress/test_subscriber_branching.py b/test_runner/regress/test_subscriber_branching.py
index 83bebc19be..63772f7cd4 100644
--- a/test_runner/regress/test_subscriber_branching.py
+++ b/test_runner/regress/test_subscriber_branching.py
@@ -332,7 +332,7 @@ def test_multiple_subscription_branching(neon_simple_env: NeonEnv):

     last_insert_lsn = query_scalar(cursor, "select pg_current_wal_insert_lsn();")

-    def start_publisher_workload(table_num: int, duration: int):
+    def start_publisher_workload(i: int, duration: int):
         start = time.time()
         with endpoint.cursor(dbname="publisher_db") as cur:
             while time.time() - start < duration: