From a852bc5e3936ec2f8fb5ec9511ef32269542aff5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Thu, 3 Jul 2025 18:27:43 +0200
Subject: [PATCH] Add new activating scheduling policy for safekeepers
 (#12441)

When deploying new safekeepers, we don't want to send traffic to them
immediately: they might not be ready yet by the time the deploy script
registers them with the storage controller.

For pageservers, the storcon solves this by not scheduling anything to
a node until there has been a positive heartbeat response. We can't do
the same for safekeepers, otherwise a single down safekeeper would mean
we can't create new timelines in smaller regions where there are only
three safekeepers in total.

So far we have created safekeepers as `pause`, but that adds a manual
step to safekeeper deployment which is prone to oversight, and we want
things to be automated. So we introduce a new state `activating` that
acts just like `pause`, except that we automatically transition the
policy to `active` once we get a positive heartbeat from the
safekeeper. For `pause`, the safekeeper always stays paused.
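To illustrate the intended transitions, a minimal self-contained Rust
sketch (hypothetical helper and enum names, not the storage
controller's actual types): only `activating` plus a positive heartbeat
moves a safekeeper to `active`; every other policy is left to the
operator.

    // Sketch only: hypothetical names, not the storage controller's real types.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum SkSchedulingPolicy {
        Active,
        Activating,
        Pause,
        Decomissioned,
    }

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum Heartbeat {
        Available,
        Offline,
    }

    // The only automatic transition: a safekeeper in `activating` that answers
    // a heartbeat becomes `active`. Every other state is left untouched.
    fn next_policy(current: SkSchedulingPolicy, heartbeat: Heartbeat) -> SkSchedulingPolicy {
        match (current, heartbeat) {
            (SkSchedulingPolicy::Activating, Heartbeat::Available) => SkSchedulingPolicy::Active,
            (other, _) => other,
        }
    }

    fn main() {
        use Heartbeat::*;
        use SkSchedulingPolicy::*;
        assert_eq!(next_policy(Activating, Available), Active);
        assert_eq!(next_policy(Activating, Offline), Activating);
        assert_eq!(next_policy(Pause, Available), Pause);
        assert_eq!(next_policy(Decomissioned, Available), Decomissioned);
    }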
---
 libs/pageserver_api/src/controller_api.rs |  3 +
 .../down.sql                              |  1 +
 .../up.sql                                |  1 +
 storage_controller/src/persistence.rs     | 42 ++++++++++
 storage_controller/src/service.rs         | 61 +++++++++++----
 .../src/service/safekeeper_service.rs     | 47 +++++++-----
 .../regress/test_storage_controller.py    | 76 ++++++++++++++++---
 7 files changed, 187 insertions(+), 44 deletions(-)
 create mode 100644 storage_controller/migrations/2025-07-02-170751_safekeeper_default_no_pause/down.sql
 create mode 100644 storage_controller/migrations/2025-07-02-170751_safekeeper_default_no_pause/up.sql

diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs
index a8080a57e9..dc9fab2bdb 100644
--- a/libs/pageserver_api/src/controller_api.rs
+++ b/libs/pageserver_api/src/controller_api.rs
@@ -420,6 +420,7 @@ impl From for String {
 #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
 pub enum SkSchedulingPolicy {
     Active,
+    Activating,
     Pause,
     Decomissioned,
 }
@@ -430,6 +431,7 @@ impl FromStr for SkSchedulingPolicy {
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         Ok(match s {
             "active" => Self::Active,
+            "activating" => Self::Activating,
             "pause" => Self::Pause,
             "decomissioned" => Self::Decomissioned,
             _ => {
@@ -446,6 +448,7 @@ impl From<SkSchedulingPolicy> for String {
         use SkSchedulingPolicy::*;
         match value {
             Active => "active",
+            Activating => "activating",
             Pause => "pause",
             Decomissioned => "decomissioned",
         }
diff --git a/storage_controller/migrations/2025-07-02-170751_safekeeper_default_no_pause/down.sql b/storage_controller/migrations/2025-07-02-170751_safekeeper_default_no_pause/down.sql
new file mode 100644
index 0000000000..bc9b501189
--- /dev/null
+++ b/storage_controller/migrations/2025-07-02-170751_safekeeper_default_no_pause/down.sql
@@ -0,0 +1 @@
+ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'pause';
\ No newline at end of file
diff --git a/storage_controller/migrations/2025-07-02-170751_safekeeper_default_no_pause/up.sql b/storage_controller/migrations/2025-07-02-170751_safekeeper_default_no_pause/up.sql
new file mode 100644
index 0000000000..18c89bed7b
--- /dev/null
+++ b/storage_controller/migrations/2025-07-02-170751_safekeeper_default_no_pause/up.sql
@@ -0,0 +1 @@
+ALTER TABLE safekeepers ALTER COLUMN scheduling_policy SET DEFAULT 'activating';
\ No newline at end of file
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index 56f4d03111..aaf71624ae 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -1388,6 +1388,48 @@ impl Persistence {
         .await
     }
 
+    /// Activate the given safekeeper, ensuring that there is no TOCTOU.
+    /// Returns `Some` if the safekeeper was indeed activating (or already active); any other state returns `None`.
+    pub(crate) async fn activate_safekeeper(&self, id_: i64) -> Result<Option<()>, DatabaseError> {
+        use crate::schema::safekeepers::dsl::*;
+
+        self.with_conn(move |conn| {
+            Box::pin(async move {
+                #[derive(Insertable, AsChangeset)]
+                #[diesel(table_name = crate::schema::safekeepers)]
+                struct UpdateSkSchedulingPolicy<'a> {
+                    id: i64,
+                    scheduling_policy: &'a str,
+                }
+                let scheduling_policy_active = String::from(SkSchedulingPolicy::Active);
+                let scheduling_policy_activating = String::from(SkSchedulingPolicy::Activating);
+
+                let rows_affected = diesel::update(
+                    safekeepers.filter(id.eq(id_)).filter(
+                        scheduling_policy
+                            .eq(scheduling_policy_activating)
+                            .or(scheduling_policy.eq(&scheduling_policy_active)),
+                    ),
+                )
+                .set(scheduling_policy.eq(&scheduling_policy_active))
+                .execute(conn)
+                .await?;
+
+                if rows_affected == 0 {
+                    return Ok(None);
+                }
+                if rows_affected != 1 {
+                    return Err(DatabaseError::Logical(format!(
+                        "unexpected number of rows ({rows_affected})",
+                    )));
+                }
+
+                Ok(Some(()))
+            })
+        })
+        .await
+    }
+
     /// Persist timeline. Returns if the timeline was newly inserted. If it wasn't, we haven't done any writes.
     pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<bool> {
         use crate::schema::timelines;
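As a minimal sketch of the semantics (hypothetical `Policy` and
`activate` names, not the diesel code above): the guarded UPDATE in
`activate_safekeeper` behaves like the following in-memory
compare-and-set. Because the state check and the write happen in a
single statement, there is no separate read-then-write window, which is
what rules out the TOCTOU mentioned in the doc comment.

    // Sketch only: an in-memory stand-in for the guarded UPDATE against the database.
    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum Policy {
        Active,
        Activating,
        Pause,
    }

    // Flip the row to `Active` only if it is currently `Activating` (or already
    // `Active`); any other state, or a missing row, yields `None` and is untouched.
    fn activate(rows: &mut HashMap<i64, Policy>, id: i64) -> Option<()> {
        let policy = rows.get_mut(&id)?;
        if matches!(*policy, Policy::Activating | Policy::Active) {
            *policy = Policy::Active;
            Some(())
        } else {
            None
        }
    }

    fn main() {
        let mut rows = HashMap::from([(1, Policy::Activating), (2, Policy::Pause)]);
        assert_eq!(activate(&mut rows, 1), Some(())); // activating -> active
        assert_eq!(activate(&mut rows, 1), Some(())); // idempotent once active
        assert_eq!(activate(&mut rows, 2), None);     // pause is never auto-activated
        assert_eq!(activate(&mut rows, 3), None);     // unknown id
        assert_eq!(rows[&2], Policy::Pause);
    }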
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index d76fcd6f7b..75b0876b38 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -31,8 +31,8 @@ use pageserver_api::controller_api::{
     AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
     NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
     ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
-    TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
-    TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
+    SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
+    TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
     TenantShardMigrateRequest, TenantShardMigrateResponse,
 };
 use pageserver_api::models::{
@@ -1249,7 +1249,7 @@ impl Service {
     }
 
     /// Heartbeat all storage nodes once in a while.
     #[instrument(skip_all)]
-    async fn spawn_heartbeat_driver(&self) {
+    async fn spawn_heartbeat_driver(self: &Arc<Self>) {
         self.startup_complete.clone().wait().await;
         let mut interval = tokio::time::interval(self.config.heartbeat_interval);
@@ -1376,18 +1376,51 @@ impl Service {
                 }
             }
             if let Ok(deltas) = res_sk {
-                let mut locked = self.inner.write().unwrap();
-                let mut safekeepers = (*locked.safekeepers).clone();
-                for (id, state) in deltas.0 {
-                    let Some(sk) = safekeepers.get_mut(&id) else {
-                        tracing::info!(
-                            "Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}"
-                        );
-                        continue;
-                    };
-                    sk.set_availability(state);
+                let mut to_activate = Vec::new();
+                {
+                    let mut locked = self.inner.write().unwrap();
+                    let mut safekeepers = (*locked.safekeepers).clone();
+
+                    for (id, state) in deltas.0 {
+                        let Some(sk) = safekeepers.get_mut(&id) else {
+                            tracing::info!(
+                                "Couldn't update safekeeper safekeeper state for id {id} from heartbeat={state:?}"
+                            );
+                            continue;
+                        };
+                        if sk.scheduling_policy() == SkSchedulingPolicy::Activating
+                            && let SafekeeperState::Available { .. } = state
+                        {
+                            to_activate.push(id);
+                        }
+                        sk.set_availability(state);
+                    }
+                    locked.safekeepers = Arc::new(safekeepers);
+                }
+                for sk_id in to_activate {
+                    // TODO this can race with set_scheduling_policy (can create disjoint DB <-> in-memory state)
+                    tracing::info!("Activating safekeeper {sk_id}");
+                    match self.persistence.activate_safekeeper(sk_id.0 as i64).await {
+                        Ok(Some(())) => {}
+                        Ok(None) => {
+                            tracing::info!(
+                                "safekeeper {sk_id} has been removed from db or has different scheduling policy than active or activating"
+                            );
+                        }
+                        Err(e) => {
+                            tracing::warn!("couldn't apply activation of {sk_id} to db: {e}");
+                            continue;
+                        }
+                    }
+                    if let Err(e) = self
+                        .set_safekeeper_scheduling_policy_in_mem(sk_id, SkSchedulingPolicy::Active)
+                        .await
+                    {
+                        tracing::info!("couldn't activate safekeeper {sk_id} in memory: {e}");
+                        continue;
+                    }
+                    tracing::info!("Activation of safekeeper {sk_id} done");
                 }
-                locked.safekeepers = Arc::new(safekeepers);
             }
         }
     }
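A minimal sketch of the locking pattern used in the heartbeat driver
above (assumed shapes, not the real `Service` state): the set of
safekeepers to promote is computed while the in-memory lock is held,
and the guard is released before any of the per-safekeeper database
writes run.

    // Sketch only: assumed shapes, not the storage controller's real state.
    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum Policy {
        Active,
        Activating,
        Pause,
    }

    type SharedState = Arc<RwLock<HashMap<u64, Policy>>>;

    // Decide which safekeepers to promote while holding the lock; the guard is
    // dropped when this function returns, before any database I/O happens.
    fn collect_to_activate(state: &SharedState, heartbeat_ok: &[u64]) -> Vec<u64> {
        let state = state.read().unwrap();
        heartbeat_ok
            .iter()
            .copied()
            .filter(|id| state.get(id) == Some(&Policy::Activating))
            .collect()
    }

    fn main() {
        let state: SharedState = Arc::new(RwLock::new(HashMap::from([
            (1, Policy::Activating),
            (2, Policy::Pause),
            (3, Policy::Active),
        ])));
        assert_eq!(collect_to_activate(&state, &[1, 2, 3]), vec![1]);
        // ...the database writes and the in-memory policy updates would follow
        // here, one safekeeper at a time, without the lock held across awaits.
    }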
diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs
index 1c0e30f639..90ea48dd7b 100644
--- a/storage_controller/src/service/safekeeper_service.rs
+++ b/storage_controller/src/service/safekeeper_service.rs
@@ -805,7 +805,7 @@ impl Service {
             Safekeeper::from_persistence(
                 crate::persistence::SafekeeperPersistence::from_upsert(
                     record,
-                    SkSchedulingPolicy::Pause,
+                    SkSchedulingPolicy::Activating,
                 ),
                 CancellationToken::new(),
                 use_https,
@@ -846,27 +846,36 @@ impl Service {
             .await?;
         let node_id = NodeId(id as u64);
         // After the change has been persisted successfully, update the in-memory state
-        {
-            let mut locked = self.inner.write().unwrap();
-            let mut safekeepers = (*locked.safekeepers).clone();
-            let sk = safekeepers
-                .get_mut(&node_id)
-                .ok_or(DatabaseError::Logical("Not found".to_string()))?;
-            sk.set_scheduling_policy(scheduling_policy);
+        self.set_safekeeper_scheduling_policy_in_mem(node_id, scheduling_policy)
+            .await
+    }
 
-            match scheduling_policy {
-                SkSchedulingPolicy::Active => {
-                    locked
-                        .safekeeper_reconcilers
-                        .start_reconciler(node_id, self);
-                }
-                SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
-                    locked.safekeeper_reconcilers.stop_reconciler(node_id);
-                }
+    pub(crate) async fn set_safekeeper_scheduling_policy_in_mem(
+        self: &Arc<Self>,
+        node_id: NodeId,
+        scheduling_policy: SkSchedulingPolicy,
+    ) -> Result<(), DatabaseError> {
+        let mut locked = self.inner.write().unwrap();
+        let mut safekeepers = (*locked.safekeepers).clone();
+        let sk = safekeepers
+            .get_mut(&node_id)
+            .ok_or(DatabaseError::Logical("Not found".to_string()))?;
+        sk.set_scheduling_policy(scheduling_policy);
+
+        match scheduling_policy {
+            SkSchedulingPolicy::Active => {
+                locked
+                    .safekeeper_reconcilers
+                    .start_reconciler(node_id, self);
+            }
+            SkSchedulingPolicy::Decomissioned
+            | SkSchedulingPolicy::Pause
+            | SkSchedulingPolicy::Activating => {
+                locked.safekeeper_reconcilers.stop_reconciler(node_id);
             }
-
-            locked.safekeepers = Arc::new(safekeepers);
         }
+
+        locked.safekeepers = Arc::new(safekeepers);
         Ok(())
     }
 
newest_info["scheduling_policy"] == "Active" + + wait_until(safekeeper_is_active) + + target.safekeeper_scheduling_policy(inserted["id"], "Activating") + + wait_until(safekeeper_is_active) + + # Now decomission it + target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned") + + def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool: compared = [dict(a), dict(b)]