From 590301df08b4eb3e8afc7afa7e3a91b6ab5dc420 Mon Sep 17 00:00:00 2001
From: Alexander Sarantcev <99037063+ephemeralsad@users.noreply.github.com>
Date: Fri, 6 Jun 2025 14:16:55 +0400
Subject: [PATCH] storcon: Introduce deletion tombstones to support flaky node
 scenario (#12096)

## Problem

Removed nodes can re-add themselves on restart if they are not properly
tombstoned. We need a mechanism (e.g. a soft-delete flag) to prevent this,
especially when the node is unreachable. More details in #12036.

## Summary of changes

- Introduced a `NodeLifecycle` enum to represent node lifecycle states.
- Added a string representation of `NodeLifecycle` to the `nodes` table.
- Implemented node removal using a tombstone mechanism.
- Introduced `/debug/v1/tombstone*` handlers to manage the tombstone state.
---
 control_plane/storcon_cli/src/main.rs     |  41 +++++++
 libs/pageserver_api/src/controller_api.rs |  29 +++++
 .../down.sql                              |   1 +
 .../up.sql                                |   1 +
 storage_controller/src/http.rs            |  50 ++++++++
 storage_controller/src/node.rs            |   6 +-
 storage_controller/src/persistence.rs     | 115 +++++++++++++++---
 storage_controller/src/schema.rs          |   1 +
 storage_controller/src/service.rs         |  57 ++++++++-
 test_runner/fixtures/neon_fixtures.py     |  16 +++
 .../regress/test_storage_controller.py    |  52 ++++++++
 11 files changed, 345 insertions(+), 24 deletions(-)
 create mode 100644 storage_controller/migrations/2025-06-01-201442_add_lifecycle_to_nodes/down.sql
 create mode 100644 storage_controller/migrations/2025-06-01-201442_add_lifecycle_to_nodes/up.sql

diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs
index 19c686dcfd..1a9e944e07 100644
--- a/control_plane/storcon_cli/src/main.rs
+++ b/control_plane/storcon_cli/src/main.rs
@@ -61,10 +61,16 @@ enum Command {
         #[arg(long)]
         scheduling: Option<NodeSchedulingPolicy>,
     },
+    // Mark a node as deleted.
     NodeDelete {
         #[arg(long)]
         node_id: NodeId,
     },
+    /// Delete a node's tombstone from the storage controller.
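+    /// A node must be soft-deleted first (see `NodeDelete` above); only then can its
+    /// tombstone be removed. Hypothetical invocation, assuming clap's default
+    /// kebab-case naming of the variants below:
+    ///
+    ///   storcon_cli node-delete --node-id 1            # soft delete, leaves a tombstone
+    ///   storcon_cli node-tombstones                    # list tombstoned nodes
+    ///   storcon_cli node-delete-tombstone --node-id 1  # hard delete the tombstoned node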
+ NodeDeleteTombstone { + #[arg(long)] + node_id: NodeId, + }, /// Modify a tenant's policies in the storage controller TenantPolicy { #[arg(long)] @@ -82,6 +88,8 @@ enum Command { }, /// List nodes known to the storage controller Nodes {}, + /// List soft deleted nodes known to the storage controller + NodeTombstones {}, /// List tenants known to the storage controller Tenants { /// If this field is set, it will list the tenants on a specific node @@ -900,6 +908,39 @@ async fn main() -> anyhow::Result<()> { .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None) .await?; } + Command::NodeDeleteTombstone { node_id } => { + storcon_client + .dispatch::<(), ()>( + Method::DELETE, + format!("debug/v1/tombstone/{node_id}"), + None, + ) + .await?; + } + Command::NodeTombstones {} => { + let mut resp = storcon_client + .dispatch::<(), Vec>( + Method::GET, + "debug/v1/tombstone".to_string(), + None, + ) + .await?; + + resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr)); + + let mut table = comfy_table::Table::new(); + table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]); + for node in resp { + table.add_row([ + format!("{}", node.id), + node.listen_http_addr, + node.availability_zone_id, + format!("{:?}", node.scheduling), + format!("{:?}", node.availability), + ]); + } + println!("{table}"); + } Command::TenantSetTimeBasedEviction { tenant_id, period, diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index c5b49edba0..ae792cc81c 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -344,6 +344,35 @@ impl Default for ShardSchedulingPolicy { } } +#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)] +pub enum NodeLifecycle { + Active, + Deleted, +} + +impl FromStr for NodeLifecycle { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "active" => Ok(Self::Active), + "deleted" => Ok(Self::Deleted), + _ => Err(anyhow::anyhow!("Unknown node lifecycle '{s}'")), + } + } +} + +impl From for String { + fn from(value: NodeLifecycle) -> String { + use NodeLifecycle::*; + match value { + Active => "active", + Deleted => "deleted", + } + .to_string() + } +} + #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)] pub enum NodeSchedulingPolicy { Active, diff --git a/storage_controller/migrations/2025-06-01-201442_add_lifecycle_to_nodes/down.sql b/storage_controller/migrations/2025-06-01-201442_add_lifecycle_to_nodes/down.sql new file mode 100644 index 0000000000..a09acb916b --- /dev/null +++ b/storage_controller/migrations/2025-06-01-201442_add_lifecycle_to_nodes/down.sql @@ -0,0 +1 @@ +ALTER TABLE nodes DROP COLUMN lifecycle; diff --git a/storage_controller/migrations/2025-06-01-201442_add_lifecycle_to_nodes/up.sql b/storage_controller/migrations/2025-06-01-201442_add_lifecycle_to_nodes/up.sql new file mode 100644 index 0000000000..e03a0cadba --- /dev/null +++ b/storage_controller/migrations/2025-06-01-201442_add_lifecycle_to_nodes/up.sql @@ -0,0 +1 @@ +ALTER TABLE nodes ADD COLUMN lifecycle VARCHAR NOT NULL DEFAULT 'active'; diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 2b1c0db12f..705b81077e 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -907,6 +907,42 @@ async fn handle_node_delete(req: Request) -> Result, ApiErr json_response(StatusCode::OK, state.service.node_delete(node_id).await?) 
} +async fn handle_tombstone_list(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let req = match maybe_forward(req).await { + ForwardOutcome::Forwarded(res) => { + return res; + } + ForwardOutcome::NotForwarded(req) => req, + }; + + let state = get_state(&req); + let mut nodes = state.service.tombstone_list().await?; + nodes.sort_by_key(|n| n.get_id()); + let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::>(); + + json_response(StatusCode::OK, api_nodes) +} + +async fn handle_tombstone_delete(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let req = match maybe_forward(req).await { + ForwardOutcome::Forwarded(res) => { + return res; + } + ForwardOutcome::NotForwarded(req) => req, + }; + + let state = get_state(&req); + let node_id: NodeId = parse_request_param(&req, "node_id")?; + json_response( + StatusCode::OK, + state.service.tombstone_delete(node_id).await?, + ) +} + async fn handle_node_configure(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; @@ -2062,6 +2098,20 @@ pub fn make_router( .post("/debug/v1/node/:node_id/drop", |r| { named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop")) }) + .delete("/debug/v1/tombstone/:node_id", |r| { + named_request_span( + r, + handle_tombstone_delete, + RequestName("debug_v1_tombstone_delete"), + ) + }) + .get("/debug/v1/tombstone", |r| { + named_request_span( + r, + handle_tombstone_list, + RequestName("debug_v1_tombstone_list"), + ) + }) .post("/debug/v1/tenant/:tenant_id/import", |r| { named_request_span( r, diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index e180c49b43..8e0f1873e5 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::time::Duration; use pageserver_api::controller_api::{ - AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, + AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeLifecycle, NodeRegisterRequest, NodeSchedulingPolicy, TenantLocateResponseShard, }; use pageserver_api::shard::TenantShardId; @@ -29,6 +29,7 @@ pub(crate) struct Node { availability: NodeAvailability, scheduling: NodeSchedulingPolicy, + lifecycle: NodeLifecycle, listen_http_addr: String, listen_http_port: u16, @@ -228,6 +229,7 @@ impl Node { listen_pg_addr, listen_pg_port, scheduling: NodeSchedulingPolicy::Active, + lifecycle: NodeLifecycle::Active, availability: NodeAvailability::Offline, availability_zone_id, use_https, @@ -239,6 +241,7 @@ impl Node { NodePersistence { node_id: self.id.0 as i64, scheduling_policy: self.scheduling.into(), + lifecycle: self.lifecycle.into(), listen_http_addr: self.listen_http_addr.clone(), listen_http_port: self.listen_http_port as i32, listen_https_port: self.listen_https_port.map(|x| x as i32), @@ -263,6 +266,7 @@ impl Node { availability: NodeAvailability::Offline, scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy) .expect("Bad scheduling policy in DB"), + lifecycle: NodeLifecycle::from_str(&np.lifecycle).expect("Bad lifecycle in DB"), listen_http_addr: np.listen_http_addr, listen_http_port: np.listen_http_port as u16, listen_https_port: np.listen_https_port.map(|x| x as u16), diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 052c0f02eb..2edfe3a338 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -19,7 +19,7 @@ use futures::FutureExt; use 
futures::future::BoxFuture;
 use itertools::Itertools;
 use pageserver_api::controller_api::{
-    AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
+    AvailabilityZone, MetadataHealthRecord, NodeLifecycle, NodeSchedulingPolicy, PlacementPolicy,
     SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
 };
 use pageserver_api::models::{ShardImportStatus, TenantConfig};
@@ -102,6 +102,7 @@ pub(crate) enum DatabaseOperation {
     UpdateNode,
     DeleteNode,
     ListNodes,
+    ListTombstones,
     BeginShardSplit,
     CompleteShardSplit,
     AbortShardSplit,
@@ -357,6 +358,8 @@ impl Persistence {
     }
 
     /// When a node is first registered, persist it before using it for anything
+    /// If the provided node_id already exists, this returns an error.
+    /// The common case for this is a node that was marked for deletion trying to re-register.
     pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
         let np = &node.to_persistent();
         self.with_measured_conn(DatabaseOperation::InsertNode, move |conn| {
@@ -373,19 +376,41 @@ impl Persistence {
 
     /// At startup, populate the list of nodes which our shards may be placed on
     pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
-        let nodes: Vec<NodePersistence> = self
+        use crate::schema::nodes::dsl::*;
+
+        let result: Vec<NodePersistence> = self
             .with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
                 Box::pin(async move {
                     Ok(crate::schema::nodes::table
+                        .filter(lifecycle.ne(String::from(NodeLifecycle::Deleted)))
                         .load::<NodePersistence>(conn)
                         .await?)
                 })
             })
             .await?;
 
-        tracing::info!("list_nodes: loaded {} nodes", nodes.len());
+        tracing::info!("list_nodes: loaded {} nodes", result.len());
 
-        Ok(nodes)
+        Ok(result)
+    }
+
+    pub(crate) async fn list_tombstones(&self) -> DatabaseResult<Vec<NodePersistence>> {
+        use crate::schema::nodes::dsl::*;
+
+        let result: Vec<NodePersistence> = self
+            .with_measured_conn(DatabaseOperation::ListTombstones, move |conn| {
+                Box::pin(async move {
+                    Ok(crate::schema::nodes::table
+                        .filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
+                        .load::<NodePersistence>(conn)
+                        .await?)
+                })
+            })
+            .await?;
+
+        tracing::info!("list_tombstones: loaded {} nodes", result.len());
+
+        Ok(result)
     }
 
     pub(crate) async fn update_node(
@@ -404,6 +429,7 @@ impl Persistence {
             Box::pin(async move {
                 let updated = diesel::update(nodes)
                     .filter(node_id.eq(input_node_id.0 as i64))
+                    .filter(lifecycle.ne(String::from(NodeLifecycle::Deleted)))
                     .set(values)
                     .execute(conn)
                     .await?;
@@ -447,6 +473,57 @@ impl Persistence {
         .await
     }
 
+    /// A tombstone is a special state in which the node is not removed from the database,
+    /// but is no longer available for use. Its main purpose is to prevent a flaky node
+    /// from re-registering.
+    pub(crate) async fn set_tombstone(&self, del_node_id: NodeId) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        self.update_node(
+            del_node_id,
+            lifecycle.eq(String::from(NodeLifecycle::Deleted)),
+        )
+        .await
+    }
+
+    pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
+        use crate::schema::nodes::dsl::*;
+        self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| {
+            Box::pin(async move {
+                // A node can only be hard-deleted if it already has a tombstone,
+                // so check that its lifecycle is set to deleted first.
+ let node_to_delete = nodes + .filter(node_id.eq(del_node_id.0 as i64)) + .first::(conn) + .await + .optional()?; + + if let Some(np) = node_to_delete { + let lc = NodeLifecycle::from_str(&np.lifecycle).map_err(|e| { + DatabaseError::Logical(format!( + "Node {} has invalid lifecycle: {}", + del_node_id, e + )) + })?; + + if lc != NodeLifecycle::Deleted { + return Err(DatabaseError::Logical(format!( + "Node {} was not soft deleted before, cannot hard delete it", + del_node_id + ))); + } + + diesel::delete(nodes) + .filter(node_id.eq(del_node_id.0 as i64)) + .execute(conn) + .await?; + } + + Ok(()) + }) + }) + .await + } + /// At startup, load the high level state for shards, such as their config + policy. This will /// be enriched at runtime with state discovered on pageservers. /// @@ -543,21 +620,6 @@ impl Persistence { .await } - pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> { - use crate::schema::nodes::dsl::*; - self.with_measured_conn(DatabaseOperation::DeleteNode, move |conn| { - Box::pin(async move { - diesel::delete(nodes) - .filter(node_id.eq(del_node_id.0 as i64)) - .execute(conn) - .await?; - - Ok(()) - }) - }) - .await - } - /// When a tenant invokes the /re-attach API, this function is responsible for doing an efficient /// batched increment of the generations of all tenants whose generation_pageserver is equal to /// the node that called /re-attach. @@ -571,6 +633,20 @@ impl Persistence { let updated = self .with_measured_conn(DatabaseOperation::ReAttach, move |conn| { Box::pin(async move { + // Check if the node is not marked as deleted + let deleted_node: i64 = nodes + .filter(node_id.eq(input_node_id.0 as i64)) + .filter(lifecycle.eq(String::from(NodeLifecycle::Deleted))) + .count() + .get_result(conn) + .await?; + if deleted_node > 0 { + return Err(DatabaseError::Logical(format!( + "Node {} is marked as deleted, re-attach is not allowed", + input_node_id + ))); + } + let rows_updated = diesel::update(tenant_shards) .filter(generation_pageserver.eq(input_node_id.0 as i64)) .set(generation.eq(generation + 1)) @@ -2048,6 +2124,7 @@ pub(crate) struct NodePersistence { pub(crate) listen_pg_port: i32, pub(crate) availability_zone_id: String, pub(crate) listen_https_port: Option, + pub(crate) lifecycle: String, } /// Tenant metadata health status that are stored durably. diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index 20be9bb5ca..f5807cfcd2 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -33,6 +33,7 @@ diesel::table! { listen_pg_port -> Int4, availability_zone_id -> Varchar, listen_https_port -> Nullable, + lifecycle -> Varchar, } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 790797bae2..cb29993e8c 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -166,6 +166,7 @@ enum NodeOperations { Register, Configure, Delete, + DeleteTombstone, } /// The leadership status for the storage controller process. @@ -6909,7 +6910,7 @@ impl Service { /// detaching or deleting it on pageservers. We do not try and re-schedule any /// tenants that were on this node. 
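+    /// Note: the node's database row is not removed here. `node_drop` now only marks the
+    /// row with a `deleted` lifecycle tombstone, so the node cannot silently re-register;
+    /// the row can later be purged via the `DELETE /debug/v1/tombstone/:node_id` handler.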
    pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
-        self.persistence.delete_node(node_id).await?;
+        self.persistence.set_tombstone(node_id).await?;
 
         let mut locked = self.inner.write().unwrap();
@@ -7033,9 +7034,10 @@ impl Service {
         // That is safe because in Service::spawn we only use generation_pageserver if it refers to a node
         // that exists.
 
-        // 2. Actually delete the node from the database and from in-memory state
+        // 2. Actually delete the node from in-memory state and write a tombstone to the database
+        // to prevent the node from registering again.
         tracing::info!("Deleting node from database");
-        self.persistence.delete_node(node_id).await?;
+        self.persistence.set_tombstone(node_id).await?;
 
         Ok(())
     }
@@ -7054,6 +7056,35 @@ impl Service {
         Ok(nodes)
     }
 
+    pub(crate) async fn tombstone_list(&self) -> Result<Vec<Node>, ApiError> {
+        self.persistence
+            .list_tombstones()
+            .await?
+            .into_iter()
+            .map(|np| Node::from_persistent(np, false))
+            .collect::<Result<Vec<Node>, _>>()
+            .map_err(ApiError::InternalServerError)
+    }
+
+    pub(crate) async fn tombstone_delete(&self, node_id: NodeId) -> Result<(), ApiError> {
+        let _node_lock = trace_exclusive_lock(
+            &self.node_op_locks,
+            node_id,
+            NodeOperations::DeleteTombstone,
+        )
+        .await;
+
+        if matches!(self.get_node(node_id).await, Err(ApiError::NotFound(_))) {
+            self.persistence.delete_node(node_id).await?;
+            Ok(())
+        } else {
+            Err(ApiError::Conflict(format!(
+                "Node {} is in use, consider using the tombstone API first",
+                node_id
+            )))
+        }
+    }
+
     pub(crate) async fn get_node(&self, node_id: NodeId) -> Result<Node, ApiError> {
         self.inner
             .read()
@@ -7224,7 +7255,25 @@ impl Service {
         };
 
         match registration_status {
-            RegistrationStatus::New => self.persistence.insert_node(&new_node).await?,
+            RegistrationStatus::New => {
+                self.persistence.insert_node(&new_node).await.map_err(|e| {
+                    if matches!(
+                        e,
+                        crate::persistence::DatabaseError::Query(
+                            diesel::result::Error::DatabaseError(
+                                diesel::result::DatabaseErrorKind::UniqueViolation,
+                                _,
+                            )
+                        )
+                    ) {
+                        // The node may have been soft-deleted via the tombstone API, in which
+                        // case it does not show up in the list of nodes.
+                        // If you see this error, check tombstones first.
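+                        // A stale tombstone can be inspected and purged through the debug API
+                        // added in this change, e.g.:
+                        //   GET    /debug/v1/tombstone            - list soft-deleted nodes
+                        //   DELETE /debug/v1/tombstone/{node_id}  - purge, allowing re-registration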
+                        ApiError::Conflict(format!("Node {} already exists", new_node.get_id()))
+                    } else {
+                        ApiError::from(e)
+                    }
+                })?;
+            }
             RegistrationStatus::NeedUpdate => {
                 self.persistence
                     .update_node_on_registration(
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index db3f080261..5223e34baf 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2054,6 +2054,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )
 
+    def tombstone_delete(self, node_id):
+        log.info(f"tombstone_delete({node_id})")
+        self.request(
+            "DELETE",
+            f"{self.api}/debug/v1/tombstone/{node_id}",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
     def node_drain(self, node_id):
         log.info(f"node_drain({node_id})")
         self.request(
@@ -2110,6 +2118,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
         )
         return response.json()
 
+    def tombstone_list(self):
+        response = self.request(
+            "GET",
+            f"{self.api}/debug/v1/tombstone",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+        return response.json()
+
     def tenant_shard_dump(self):
         """
         Debug listing API: dumps the internal map of tenant shards
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 346ef0951d..5e0dd780c3 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -3093,6 +3093,58 @@ def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvB
     wait_until(reconfigure_node_again)
 
 
+def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_pageservers = 3
+
+    env = neon_env_builder.init_start()
+
+    def assert_nodes_count(n: int):
+        nodes = env.storage_controller.node_list()
+        assert len(nodes) == n
+
+    # All three nodes must be present before deletion
+    assert_nodes_count(3)
+
+    ps = env.pageservers[0]
+    env.storage_controller.node_delete(ps.id)
+
+    # After deletion, the node count must be reduced
+    assert_nodes_count(2)
+
+    # Restart the tombstoned pageserver in a separate thread
+    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+        log.info("Restarting tombstoned pageserver...")
+        ps.stop()
+        ps_start_fut = executor.submit(lambda: ps.start(await_active=False))
+
+        # Even after the deleted pageserver restarts, the node count must stay
+        # reduced: the tombstone prevents re-registration
+        assert_nodes_count(2)
+
+        tombstones = env.storage_controller.tombstone_list()
+        assert len(tombstones) == 1 and tombstones[0]["id"] == ps.id
+
+        env.storage_controller.tombstone_delete(ps.id)
+
+        tombstones = env.storage_controller.tombstone_list()
+        assert len(tombstones) == 0
+
+        # Wait for the pageserver start operation to complete.
+        # If it fails with an exception, we try restarting the pageserver since the failure
+        # may be due to the storage controller refusing to register the node.
+        # However, if we get a TimeoutError that means the pageserver is completely hung,
+        # which is an unexpected failure mode that we'll let propagate up.
+        try:
+            ps_start_fut.result(timeout=20)
+        except TimeoutError:
+            raise
+        except Exception:
+            log.info("Restarting deleted pageserver...")
+            ps.restart()
+
+    # Finally, the node can register again once its tombstone is deleted
+    wait_until(lambda: assert_nodes_count(3))
+
+
 def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder):
     """
     The storage controller is meant to handle the case where a timeline CRUD operation races