From 622b3b29936d0496808396e447e678177a58412d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Thu, 8 May 2025 17:13:11 +0200
Subject: [PATCH] Fixes for enabling --timelines-onto-safekeepers in tests (#11854)

Second PR with fixes extracted from #11712, relating to
`--timelines-onto-safekeepers`.

Does the following:

* Move safekeeper registration to `neon_local` instead of the test fixtures
* Pass the safekeeper JWT token if `--timelines-onto-safekeepers` is enabled
* Allow some warnings related to offline safekeepers (similarly to how we
  allow them for offline pageservers)
* Enable generations in the compute's config if `--timelines-onto-safekeepers`
  is enabled
* Fix the parallel `pull_timeline` race condition (the one that #11786 put
  off for later)

Fixes #11424
Part of #11670
---
 control_plane/src/bin/neon_local.rs       |   9 +-
 control_plane/src/storage_controller.rs   | 100 ++++++++++++++++--
 safekeeper/src/http/routes.rs             |   3 +-
 safekeeper/src/pull_timeline.rs           |  30 ++++--
 test_runner/fixtures/neon_fixtures.py     |  24 -----
 .../fixtures/pageserver/allowed_errors.py |   4 +
 6 files changed, 131 insertions(+), 39 deletions(-)

diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 610fa5f865..191a22f1de 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -1417,7 +1417,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
             let pageserver_id = args.endpoint_pageserver_id;
             let remote_ext_base_url = &args.remote_ext_base_url;
-            let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
+            let default_generation = env
+                .storage_controller
+                .timelines_onto_safekeepers
+                .then_some(1);
+            let safekeepers_generation = args
+                .safekeepers_generation
+                .or(default_generation)
+                .map(SafekeeperGeneration::new);
             // If --safekeepers argument is given, use only the listed
             // safekeeper nodes; otherwise all from the env.
             let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)?
             {
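Note on the `neon_local` change above: when `timelines_onto_safekeepers` is enabled
in the storage controller config, endpoints now default to safekeeper generation 1
unless a generation is passed explicitly via `args.safekeepers_generation`. A minimal
standalone sketch of that fallback, using a plain `u32` and `bool` in place of
`SafekeeperGeneration` and the real config type (the helper name is hypothetical;
the real logic lives inline in `handle_endpoint`):

    // Sketch only: simplified stand-in for the generation-defaulting logic.
    fn effective_generation(cli_generation: Option<u32>, timelines_onto_safekeepers: bool) -> Option<u32> {
        // When the storage controller manages safekeeper timelines, computes
        // start from generation 1 even if the CLI did not ask for one.
        let default_generation = timelines_onto_safekeepers.then_some(1);
        cli_generation.or(default_generation)
    }

    fn main() {
        assert_eq!(effective_generation(None, true), Some(1));
        assert_eq!(effective_generation(None, false), None);
        assert_eq!(effective_generation(Some(7), true), Some(7));
    }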
diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs
index a36815d27e..755d67a7ad 100644
--- a/control_plane/src/storage_controller.rs
+++ b/control_plane/src/storage_controller.rs
@@ -10,7 +10,8 @@ use camino::{Utf8Path, Utf8PathBuf};
 use hyper0::Uri;
 use nix::unistd::Pid;
 use pageserver_api::controller_api::{
-    NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
+    NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest,
+    SafekeeperSchedulingPolicyRequest, SkSchedulingPolicy, TenantCreateRequest,
     TenantCreateResponse, TenantLocateResponse,
 };
 use pageserver_api::models::{
@@ -20,7 +21,7 @@ use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api::ResponseErrorMessageExt;
 use pem::Pem;
 use postgres_backend::AuthType;
-use reqwest::Method;
+use reqwest::{Method, Response};
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 use tokio::process::Command;
@@ -570,6 +571,11 @@ impl StorageController {
             let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
                 .expect("failed to generate jwt token");
             args.push(format!("--peer-jwt-token={peer_jwt_token}"));
+
+            let claims = Claims::new(None, Scope::SafekeeperData);
+            let jwt_token =
+                encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
+            args.push(format!("--safekeeper-jwt-token={jwt_token}"));
         }
 
         if let Some(public_key) = &self.public_key {
@@ -614,6 +620,10 @@ impl StorageController {
             self.env.base_data_dir.display()
         ));
 
+        if self.env.safekeepers.iter().any(|sk| sk.auth_enabled) && self.private_key.is_none() {
+            anyhow::bail!("Safekeeper set up for auth but no private key specified");
+        }
+
         if self.config.timelines_onto_safekeepers {
             args.push("--timelines-onto-safekeepers".to_string());
         }
@@ -640,6 +650,10 @@ impl StorageController {
         )
         .await?;
 
+        if self.config.timelines_onto_safekeepers {
+            self.register_safekeepers().await?;
+        }
+
         Ok(())
     }
 
@@ -743,6 +757,23 @@ impl StorageController {
     where
         RQ: Serialize + Sized,
         RS: DeserializeOwned + Sized,
+    {
+        let response = self.dispatch_inner(method, path, body).await?;
+        Ok(response
+            .json()
+            .await
+            .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
+    }
+
+    /// Simple HTTP request wrapper for calling into storage controller
+    async fn dispatch_inner<RQ>(
+        &self,
+        method: reqwest::Method,
+        path: String,
+        body: Option<RQ>,
+    ) -> anyhow::Result<Response>
+    where
+        RQ: Serialize + Sized,
     {
         // In the special case of the `storage_controller start` subcommand, we wish
         // to use the API endpoint of the newly started storage controller in order
@@ -785,10 +816,31 @@ impl StorageController {
         let response = builder.send().await?;
         let response = response.error_from_body().await?;
 
-        Ok(response
-            .json()
-            .await
-            .map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
+        Ok(response)
+    }
+
+    /// Register the safekeepers in the storage controller
+    #[instrument(skip(self))]
+    async fn register_safekeepers(&self) -> anyhow::Result<()> {
+        for sk in self.env.safekeepers.iter() {
+            let sk_id = sk.id;
+            let body = serde_json::json!({
+                "id": sk_id,
+                "created_at": "2023-10-25T09:11:25Z",
+                "updated_at": "2024-08-28T11:32:43Z",
+                "region_id": "aws-us-east-2",
+                "host": "127.0.0.1",
+                "port": sk.pg_port,
+                "http_port": sk.http_port,
+                "https_port": sk.https_port,
+                "version": 5957,
+                "availability_zone_id": format!("us-east-2b-{sk_id}"),
+            });
+            self.upsert_safekeeper(sk_id, body).await?;
+            self.safekeeper_scheduling_policy(sk_id, SkSchedulingPolicy::Active)
+                .await?;
+        }
+        Ok(())
     }
 
     /// Call into the attach_hook API, for use before handing out attachments to pageservers
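The `dispatch`/`dispatch_inner` split above exists so that callers (such as the new
safekeeper registration) can inspect the raw HTTP status before deciding how to
handle the body, while `dispatch` keeps the JSON-decoding convenience. A rough
self-contained sketch of that shape, assuming reqwest with the `json` feature and a
hypothetical `ApiClient` with a `base_url` field (the real `StorageController` also
attaches auth headers and resolves its listen address):

    use anyhow::Context;
    use serde::Serialize;
    use serde::de::DeserializeOwned;

    struct ApiClient {
        http: reqwest::Client,
        base_url: String,
    }

    impl ApiClient {
        // Returns the raw response so callers can branch on the status code.
        async fn dispatch_inner<RQ: Serialize>(
            &self,
            method: reqwest::Method,
            path: &str,
            body: Option<RQ>,
        ) -> anyhow::Result<reqwest::Response> {
            let mut builder = self.http.request(method, format!("{}/{path}", self.base_url));
            if let Some(body) = body {
                builder = builder.json(&body);
            }
            builder.send().await.context("sending request")
        }

        // JSON-decoding convenience wrapper on top of dispatch_inner.
        async fn dispatch<RQ: Serialize, RS: DeserializeOwned>(
            &self,
            method: reqwest::Method,
            path: &str,
            body: Option<RQ>,
        ) -> anyhow::Result<RS> {
            let response = self.dispatch_inner(method, path, body).await?;
            response.json().await.context("decoding response body")
        }
    }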
@@ -816,6 +868,42 @@ impl StorageController {
         Ok(response.generation)
     }
 
+    #[instrument(skip(self))]
+    pub async fn upsert_safekeeper(
+        &self,
+        node_id: NodeId,
+        request: serde_json::Value,
+    ) -> anyhow::Result<()> {
+        let resp = self
+            .dispatch_inner::<serde_json::Value>(
+                Method::POST,
+                format!("control/v1/safekeeper/{node_id}"),
+                Some(request),
+            )
+            .await?;
+        if !resp.status().is_success() {
+            anyhow::bail!(
+                "upsert unsuccessful for safekeeper {node_id}: {}",
+                resp.status()
+            );
+        }
+        Ok(())
+    }
+
+    #[instrument(skip(self))]
+    pub async fn safekeeper_scheduling_policy(
+        &self,
+        node_id: NodeId,
+        scheduling_policy: SkSchedulingPolicy,
+    ) -> anyhow::Result<()> {
+        self.dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
+            Method::POST,
+            format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
+            Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
+        )
+        .await
+    }
+
     #[instrument(skip(self))]
     pub async fn inspect(
         &self,
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 2b2d721db2..1a25b07496 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -243,8 +243,7 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError>
diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs
--- a/safekeeper/src/pull_timeline.rs
+++ b/safekeeper/src/pull_timeline.rs
     ssl_ca_certs: Vec<Certificate>,
     global_timelines: Arc<GlobalTimelines>,
-) -> Result<PullTimelineResponse> {
+) -> Result<PullTimelineResponse, ApiError> {
     let existing_tli = global_timelines.get(TenantTimelineId::new(
         request.tenant_id,
         request.timeline_id,
@@ -411,7 +412,9 @@ pub async fn handle_request(
     for ssl_ca_cert in ssl_ca_certs {
         http_client = http_client.add_root_certificate(ssl_ca_cert);
    }
-    let http_client = http_client.build()?;
+    let http_client = http_client
+        .build()
+        .map_err(|e| ApiError::InternalServerError(e.into()))?;
 
     let http_hosts = request.http_hosts.clone();
 
@@ -443,10 +446,10 @@
     // offline and C comes online. Then we want a pull on C with A and B as hosts to work.
     let min_required_successful = (http_hosts.len() - 1).max(1);
     if statuses.len() < min_required_successful {
-        bail!(
+        return Err(ApiError::InternalServerError(anyhow::anyhow!(
            "only got {} successful status responses. required: {min_required_successful}",
            statuses.len()
-        )
+        )));
     }
 
     // Find the most advanced safekeeper
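The `min_required_successful` threshold above lets `pull_timeline` tolerate exactly
one unreachable peer (e.g. the safekeeper being replaced) while still demanding at
least one status response. A tiny sketch of the rule with plain numbers, using
`saturating_sub` only so the zero-host case cannot panic in the example:

    fn min_required_successful(num_hosts: usize) -> usize {
        // All peers but one must answer, and never fewer than one.
        num_hosts.saturating_sub(1).max(1)
    }

    fn main() {
        assert_eq!(min_required_successful(3), 2); // one of three donors may be down
        assert_eq!(min_required_successful(2), 1);
        assert_eq!(min_required_successful(1), 1); // a lone host must still answer
    }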
@@ -465,7 +468,7 @@
     assert!(status.tenant_id == request.tenant_id);
     assert!(status.timeline_id == request.timeline_id);
 
-    pull_timeline(
+    match pull_timeline(
         status,
         safekeeper_host,
         sk_auth_token,
@@ -473,6 +476,21 @@
         global_timelines,
     )
     .await
+    {
+        Ok(resp) => Ok(resp),
+        Err(e) => {
+            match e.downcast_ref::<TimelineError>() {
+                Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
+                    safekeeper_host: None,
+                }),
+                Some(TimelineError::CreationInProgress(_)) => {
+                    // We don't return success here because creation might still fail.
+                    Err(ApiError::Conflict("Creation in progress".to_owned()))
+                }
+                _ => Err(ApiError::InternalServerError(e)),
+            }
+        }
+    }
 }
 
 async fn pull_timeline(
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 370eca5130..547c640a40 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1409,30 +1409,6 @@ class NeonEnv:
         for f in futs:
             f.result()
 
-        # Last step: register safekeepers at the storage controller
-        if (
-            self.storage_controller_config is not None
-            and self.storage_controller_config.get("timelines_onto_safekeepers") is True
-        ):
-            for sk_id, sk in enumerate(self.safekeepers):
-                # 0 is an invalid safekeeper id
-                sk_id = sk_id + 1
-                body = {
-                    "id": sk_id,
-                    "created_at": "2023-10-25T09:11:25Z",
-                    "updated_at": "2024-08-28T11:32:43Z",
-                    "region_id": "aws-us-east-2",
-                    "host": "127.0.0.1",
-                    "port": sk.port.pg,
-                    "http_port": sk.port.http,
-                    "https_port": None,
-                    "version": 5957,
-                    "availability_zone_id": f"us-east-2b-{sk_id}",
-                }
-
-                self.storage_controller.on_safekeeper_deploy(sk_id, body)
-                self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
-
         self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
 
     def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py
index 24c856e279..43bffd919c 100755
--- a/test_runner/fixtures/pageserver/allowed_errors.py
+++ b/test_runner/fixtures/pageserver/allowed_errors.py
@@ -122,6 +122,10 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
     ".*Call to node.*management API.*failed.*Timeout.*",
     ".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
     ".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
+    # Many tests will take safekeepers offline
+    ".*Call to safekeeper.*management API.*failed.*receive body.*",
+    ".*Call to safekeeper.*management API.*failed.*ReceiveBody.*",
+    ".*Call to safekeeper.*management API.*failed.*Timeout.*",
     # Many tests will start up with a node offline
     ".*startup_reconcile: Could not scan node.*",
     # Tests run in dev mode
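On the parallel `pull_timeline` race fixed above: when two pulls for the same
timeline run concurrently, the loser now maps the resulting error instead of failing
the whole request. An already-existing timeline is reported as success without a
source host, while a creation that is still in flight surfaces as a conflict
(`ApiError::Conflict`), since it might still fail. A self-contained sketch of that
mapping with stand-in types (`TimelineError`, `PullOutcome`, and `map_pull_error`
here are illustrative, not the safekeeper's real definitions):

    use thiserror::Error;

    #[derive(Debug, Error)]
    enum TimelineError {
        #[error("timeline {0} already exists")]
        AlreadyExists(String),
        #[error("timeline {0} creation is in progress")]
        CreationInProgress(String),
    }

    #[derive(Debug)]
    enum PullOutcome {
        Done { safekeeper_host: Option<String> },
        Conflict(String),
        Internal(String),
    }

    fn map_pull_error(e: anyhow::Error) -> PullOutcome {
        match e.downcast_ref::<TimelineError>() {
            // A concurrent pull already created the timeline: treat it as done,
            // but report no source host since this request copied nothing.
            Some(TimelineError::AlreadyExists(_)) => PullOutcome::Done { safekeeper_host: None },
            // Creation might still fail, so return a conflict rather than success.
            Some(TimelineError::CreationInProgress(_)) => {
                PullOutcome::Conflict("creation in progress".to_owned())
            }
            _ => PullOutcome::Internal(format!("{e:#}")),
        }
    }

    fn main() {
        let err = anyhow::Error::new(TimelineError::AlreadyExists("ttid".to_owned()));
        println!("{:?}", map_pull_error(err));
    }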