From 9ba2a87e69880f1bad63bcf3cd433eee054919dc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Wed, 19 Feb 2025 17:57:11 +0100
Subject: [PATCH] storcon: sk heartbeat fixes (#10891)

This PR does the following things:

* The initial heartbeat round blocks the storage controller from becoming
  online again. If all safekeepers are unresponsive, this can make storage
  controller startup very slow. The original intent of #10583 was that
  heartbeats don't affect normal functionality of the storage controller,
  so add a short timeout to prevent them from impeding storcon functionality.
* Fix the URL of the utilization endpoint.
* Don't send heartbeats to safekeepers which are decommissioned.

Part of https://github.com/neondatabase/neon/issues/9011

Context: https://neondb.slack.com/archives/C033RQ5SPDH/p1739966807592589
---
 safekeeper/client/src/mgmt_api.rs              |  2 +-
 storage_controller/src/heartbeater.rs          |  8 +++++++-
 storage_controller/src/safekeeper.rs           | 16 ++++++++++++----
 storage_controller/src/service.rs              |  8 +++++---
 test_runner/regress/test_storage_controller.py | 14 +++++++++++---
 5 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs
index 40e5afc4aa..5c305769dd 100644
--- a/safekeeper/client/src/mgmt_api.rs
+++ b/safekeeper/client/src/mgmt_api.rs
@@ -137,7 +137,7 @@ impl Client {
     }
 
     pub async fn utilization(&self) -> Result {
-        let uri = format!("{}/v1/utilization/", self.mgmt_api_endpoint);
+        let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
         let resp = self.get(&uri).await?;
         resp.json().await.map_err(Error::ReceiveBody)
     }
diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs
index 6f110d3294..1f20326398 100644
--- a/storage_controller/src/heartbeater.rs
+++ b/storage_controller/src/heartbeater.rs
@@ -10,7 +10,10 @@ use std::{
 };
 use tokio_util::sync::CancellationToken;
 
-use pageserver_api::{controller_api::NodeAvailability, models::PageserverUtilization};
+use pageserver_api::{
+    controller_api::{NodeAvailability, SkSchedulingPolicy},
+    models::PageserverUtilization,
+};
 use thiserror::Error;
 
 use utils::{id::NodeId, logging::SecretString};
@@ -311,6 +314,9 @@ impl HeartBeat for HeartbeaterTask
diff --git a/storage_controller/src/safekeeper.rs b/storage_controller/src/safekeeper.rs
--- a/storage_controller/src/safekeeper.rs
+++ b/storage_controller/src/safekeeper.rs
     ) -> Self {
+        let scheduling_policy = SkSchedulingPolicy::from_str(&skp.scheduling_policy).unwrap();
         Self {
             cancel,
             listen_http_addr: skp.host.clone(),
@@ -31,6 +33,7 @@ impl Safekeeper {
             id: NodeId(skp.id as u64),
             skp,
             availability: SafekeeperState::Offline,
+            scheduling_policy,
         }
     }
     pub(crate) fn base_url(&self) -> String {
@@ -46,6 +49,13 @@ impl Safekeeper {
     pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
         self.availability = availability;
     }
+    pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
+        self.scheduling_policy
+    }
+    pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
+        self.scheduling_policy = scheduling_policy;
+        self.skp.scheduling_policy = String::from(scheduling_policy);
+    }
     /// Perform an operation (which is given a [`SafekeeperClient`]) with retries
     pub(crate) async fn with_client_retries(
         &self,
@@ -129,10 +139,8 @@ impl Safekeeper {
                 self.id.0
             );
         }
-        self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
-            record,
-            SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
-        );
+        self.skp =
+            crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
         self.listen_http_port = http_port as u16;
         self.listen_http_addr = host;
     }
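
A note on the safekeeper.rs change above: the parsed policy is now cached next to the
persisted record, and set_scheduling_policy keeps the two in sync by converting the enum
back into the string stored in the database. The sketch below illustrates that
parse/serialize round-trip with a hypothetical stand-in enum; the real type is
pageserver_api::controller_api::SkSchedulingPolicy, whose variants and string forms may
differ beyond the three values exercised by the test ("Active", "Pause", "Decomissioned").

    use std::str::FromStr;

    // Hypothetical stand-in for SkSchedulingPolicy (illustration only).
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum SchedulingPolicy {
        Active,
        Pause,
        Decomissioned,
    }

    impl FromStr for SchedulingPolicy {
        type Err = String;
        fn from_str(s: &str) -> Result<Self, Self::Err> {
            match s {
                "Active" => Ok(Self::Active),
                "Pause" => Ok(Self::Pause),
                "Decomissioned" => Ok(Self::Decomissioned),
                other => Err(format!("unknown scheduling policy: {other}")),
            }
        }
    }

    impl From<SchedulingPolicy> for String {
        fn from(p: SchedulingPolicy) -> String {
            match p {
                SchedulingPolicy::Active => "Active",
                SchedulingPolicy::Pause => "Pause",
                SchedulingPolicy::Decomissioned => "Decomissioned",
            }
            .to_string()
        }
    }

    fn main() {
        // Parsing on load and serializing on store must round-trip exactly,
        // otherwise the cached enum and the persisted string drift apart.
        let p = SchedulingPolicy::from_str("Pause").unwrap();
        assert_eq!(String::from(p), "Pause");
        println!("round-trip ok: {p:?}");
    }

This is also why the service.rs diff below routes the policy update through
set_scheduling_policy instead of assigning sk.skp.scheduling_policy directly: going
through the setter means the cached enum can never drift from the persisted string.
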
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index dd4d93dc84..f47dd72579 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -819,7 +819,9 @@ impl Service {
             .heartbeater_ps
             .heartbeat(Arc::new(nodes_to_heartbeat))
             .await;
-        let res_sk = self.heartbeater_sk.heartbeat(all_sks).await;
+        // Put a small but reasonable timeout on the initial safekeeper heartbeats to avoid storage controller downtime
+        const SK_TIMEOUT: Duration = Duration::from_secs(5);
+        let res_sk = tokio::time::timeout(SK_TIMEOUT, self.heartbeater_sk.heartbeat(all_sks)).await;
 
         let mut online_nodes = HashMap::new();
         if let Ok(deltas) = res_ps {
@@ -837,7 +839,7 @@ impl Service {
         }
 
         let mut online_sks = HashMap::new();
-        if let Ok(deltas) = res_sk {
+        if let Ok(Ok(deltas)) = res_sk {
             for (node_id, status) in deltas.0 {
                 match status {
                     SafekeeperState::Available {
@@ -7960,7 +7962,7 @@ impl Service {
             let sk = safekeepers
                 .get_mut(&node_id)
                 .ok_or(DatabaseError::Logical("Not found".to_string()))?;
-            sk.skp.scheduling_policy = String::from(scheduling_policy);
+            sk.set_scheduling_policy(scheduling_policy);
 
             locked.safekeepers = Arc::new(safekeepers);
         }
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 88d30308f7..1d95312140 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -3238,12 +3238,17 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
     newest_info = target.get_safekeeper(inserted["id"])
     assert newest_info
     assert newest_info["scheduling_policy"] == "Pause"
-    target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
+    target.safekeeper_scheduling_policy(inserted["id"], "Active")
     newest_info = target.get_safekeeper(inserted["id"])
     assert newest_info
-    assert newest_info["scheduling_policy"] == "Decomissioned"
+    assert newest_info["scheduling_policy"] == "Active"
     # Ensure idempotency
-    target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
+    target.safekeeper_scheduling_policy(inserted["id"], "Active")
+    newest_info = target.get_safekeeper(inserted["id"])
+    assert newest_info
+    assert newest_info["scheduling_policy"] == "Active"
+    # Change back to Pause again
+    target.safekeeper_scheduling_policy(inserted["id"], "Pause")
 
     def storcon_heartbeat():
         assert env.storage_controller.log_contains(
@@ -3252,6 +3257,9 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
 
     wait_until(storcon_heartbeat)
 
+    # Now decommission it
+    target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
+
 
 def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
     compared = [dict(a), dict(b)]
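
For context on the Ok(Ok(deltas)) pattern introduced in service.rs: tokio::time::timeout
wraps the heartbeat future's own Result in an outer Result whose error means the deadline
elapsed, so a timeout and an inner failure are both treated as "no safekeeper deltas this
round" and no longer block startup. A minimal sketch of that shape, assuming a tokio
runtime and a made-up heartbeat_round function rather than the storage controller's real
heartbeater:

    use std::time::Duration;

    // Stand-in for the safekeeper heartbeat fan-out; imagine it hanging when
    // every safekeeper is unresponsive.
    async fn heartbeat_round() -> Result<Vec<u64>, &'static str> {
        Ok(vec![101, 102, 103])
    }

    #[tokio::main]
    async fn main() {
        // Same bound as the patch: don't let the initial safekeeper heartbeats
        // hold up storage controller startup for more than a few seconds.
        const SK_TIMEOUT: Duration = Duration::from_secs(5);

        match tokio::time::timeout(SK_TIMEOUT, heartbeat_round()).await {
            // Outer Ok: finished in time; inner Ok: the round succeeded.
            Ok(Ok(deltas)) => println!("applied {} safekeeper deltas", deltas.len()),
            Ok(Err(err)) => println!("heartbeat round failed: {err}"),
            // Outer Err: the deadline elapsed; startup continues without deltas.
            Err(_elapsed) => println!("safekeeper heartbeats timed out; continuing startup"),
        }
    }

To try the sketch, only the tokio crate is needed (the "full" feature covers the runtime,
macros and timers used here).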