diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index b2c5dfe58a..815f5c940f 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -56,6 +56,10 @@ enum Command { #[arg(long)] scheduling: Option, }, + NodeDelete { + #[arg(long)] + node_id: NodeId, + }, /// Modify a tenant's policies in the storage controller TenantPolicy { #[arg(long)] @@ -357,13 +361,16 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Delete status: {}", status); } Command::Nodes {} => { - let resp = storcon_client + let mut resp = storcon_client .dispatch::<(), Vec>( Method::GET, "control/v1/node".to_string(), None, ) .await?; + + resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr)); + let mut table = comfy_table::Table::new(); table.set_header(["Id", "Hostname", "Scheduling", "Availability"]); for node in resp { @@ -395,13 +402,16 @@ async fn main() -> anyhow::Result<()> { .await?; } Command::Tenants {} => { - let resp = storcon_client + let mut resp = storcon_client .dispatch::<(), Vec>( Method::GET, "control/v1/tenant".to_string(), None, ) .await?; + + resp.sort_by(|a, b| a.tenant_id.cmp(&b.tenant_id)); + let mut table = comfy_table::Table::new(); table.set_header([ "TenantId", @@ -650,6 +660,11 @@ async fn main() -> anyhow::Result<()> { .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None) .await?; } + Command::NodeDelete { node_id } => { + storcon_client + .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None) + .await?; + } Command::TenantSetTimeBasedEviction { tenant_id, period, diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 7446ad53a2..3a62c0dd4f 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -456,6 +456,14 @@ async fn handle_node_drop(req: Request) -> Result, ApiError json_response(StatusCode::OK, state.service.node_drop(node_id).await?) } +async fn handle_node_delete(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let node_id: NodeId = parse_request_param(&req, "node_id")?; + json_response(StatusCode::OK, state.service.node_delete(node_id).await?) +} + async fn handle_node_configure(mut req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; @@ -878,6 +886,9 @@ pub fn make_router( .post("/control/v1/node", |r| { named_request_span(r, handle_node_register, RequestName("control_v1_node")) }) + .delete("/control/v1/node/:node_id", |r| { + named_request_span(r, handle_node_delete, RequestName("control_v1_node_delete")) + }) .get("/control/v1/node", |r| { named_request_span(r, handle_node_list, RequestName("control_v1_node")) }) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index aada1939ee..b6e2b53191 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -2,6 +2,7 @@ use std::{ borrow::Cow, cmp::Ordering, collections::{BTreeMap, HashMap, HashSet}, + ops::Deref, path::PathBuf, str::FromStr, sync::Arc, @@ -115,12 +116,14 @@ enum TenantOperations { SecondaryDownload, TimelineCreate, TimelineDelete, + AttachHook, } #[derive(Clone, strum_macros::Display)] enum NodeOperations { Register, Configure, + Delete, } pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128; @@ -845,9 +848,10 @@ impl Service { tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(), sequence=%result.sequence ))] - fn process_result(&self, result: ReconcileResult) { + fn process_result(&self, mut result: ReconcileResult) { let mut locked = self.inner.write().unwrap(); - let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else { + let (nodes, tenants, _scheduler) = locked.parts_mut(); + let Some(tenant) = tenants.get_mut(&result.tenant_shard_id) else { // A reconciliation result might race with removing a tenant: drop results for // tenants that aren't in our map. return; @@ -864,6 +868,13 @@ impl Service { // Let the TenantShard know it is idle. tenant.reconcile_complete(result.sequence); + // In case a node was deleted while this reconcile is in flight, filter it out of the update we will + // make to the tenant + result + .observed + .locations + .retain(|node_id, _loc| nodes.contains_key(node_id)); + match result.result { Ok(()) => { for (node_id, loc) in &result.observed.locations { @@ -873,6 +884,7 @@ impl Service { tracing::info!("Setting observed location {} to None", node_id,) } } + tenant.observed = result.observed; tenant.waiter.advance(result.sequence); } @@ -1109,8 +1121,16 @@ impl Service { // We will populate intent properly later in [`Self::startup_reconcile`], initially populate // it with what we can infer: the node for which a generation was most recently issued. let mut intent = IntentState::new(); - if let Some(generation_pageserver) = tsp.generation_pageserver { - intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64))); + if let Some(generation_pageserver) = tsp.generation_pageserver.map(|n| NodeId(n as u64)) + { + if nodes.contains_key(&generation_pageserver) { + intent.set_attached(&mut scheduler, Some(generation_pageserver)); + } else { + // If a node was removed before being completely drained, it is legal for it to leave behind a `generation_pageserver` referring + // to a non-existent node, because node deletion doesn't block on completing the reconciliations that will issue new generations + // on different pageservers. + tracing::warn!("Tenant shard {tenant_shard_id} references non-existent node {generation_pageserver} in database, will be rescheduled"); + } } let new_tenant = TenantShard::from_persistent(tsp, intent)?; @@ -1237,7 +1257,7 @@ impl Service { let _tenant_lock = trace_exclusive_lock( &self.tenant_op_locks, attach_req.tenant_shard_id.tenant_id, - TenantOperations::ShardSplit, + TenantOperations::AttachHook, ) .await; @@ -4210,8 +4230,6 @@ impl Service { /// This is for debug/support only: we simply drop all state for a tenant, without /// detaching or deleting it on pageservers. We do not try and re-schedule any /// tenants that were on this node. - /// - /// TODO: proper node deletion API that unhooks things more gracefully pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> { self.persistence.delete_node(node_id).await?; @@ -4219,6 +4237,7 @@ impl Service { for shard in locked.tenants.values_mut() { shard.deref_node(node_id); + shard.observed.locations.remove(&node_id); } let mut nodes = (*locked.nodes).clone(); @@ -4230,6 +4249,94 @@ impl Service { Ok(()) } + /// If a node has any work on it, it will be rescheduled: this is "clean" in the sense + /// that we don't leave any bad state behind in the storage controller, but unclean + /// in the sense that we are not carefully draining the node. + pub(crate) async fn node_delete(&self, node_id: NodeId) -> Result<(), ApiError> { + let _node_lock = + trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Delete).await; + + // 1. Atomically update in-memory state: + // - set the scheduling state to Pause to make subsequent scheduling ops skip it + // - update shards' intents to exclude the node, and reschedule any shards whose intents we modified. + // - drop the node from the main nodes map, so that when running reconciles complete they do not + // re-insert references to this node into the ObservedState of shards + // - drop the node from the scheduler + { + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + + { + let mut nodes_mut = (*nodes).deref().clone(); + match nodes_mut.get_mut(&node_id) { + Some(node) => { + // We do not bother setting this in the database, because we're about to delete the row anyway, and + // if we crash it would not be desirable to leave the node paused after a restart. + node.set_scheduling(NodeSchedulingPolicy::Pause); + } + None => { + tracing::info!( + "Node not found: presuming this is a retry and returning success" + ); + return Ok(()); + } + } + + *nodes = Arc::new(nodes_mut); + } + + for (tenant_shard_id, shard) in tenants { + if shard.deref_node(node_id) { + // FIXME: we need to build a ScheduleContext that reflects this shard's peers, otherwise + // it won't properly do anti-affinity. + let mut schedule_context = ScheduleContext::default(); + + if let Err(e) = shard.schedule(scheduler, &mut schedule_context) { + // TODO: implement force flag to remove a node even if we can't reschedule + // a tenant + tracing::error!("Refusing to delete node, shard {tenant_shard_id} can't be rescheduled: {e}"); + return Err(e.into()); + } else { + tracing::info!( + "Rescheduled shard {tenant_shard_id} away from node during deletion" + ) + } + + self.maybe_reconcile_shard(shard, nodes); + } + + // Here we remove an existing observed location for the node we're removing, and it will + // not be re-added by a reconciler's completion because we filter out removed nodes in + // process_result. + // + // Note that we update the shard's observed state _after_ calling maybe_reconcile_shard: that + // means any reconciles we spawned will know about the node we're deleting, enabling them + // to do live migrations if it's still online. + shard.observed.locations.remove(&node_id); + } + + scheduler.node_remove(node_id); + + { + let mut nodes_mut = (**nodes).clone(); + nodes_mut.remove(&node_id); + *nodes = Arc::new(nodes_mut); + } + } + + // Note: some `generation_pageserver` columns on tenant shards in the database may still refer to + // the removed node, as this column means "The pageserver to which this generation was issued", and + // their generations won't get updated until the reconcilers moving them away from this node complete. + // That is safe because in Service::spawn we only use generation_pageserver if it refers to a node + // that exists. + + // 2. Actually delete the node from the database and from in-memory state + tracing::info!("Deleting node from database"); + self.persistence.delete_node(node_id).await?; + + Ok(()) + } + pub(crate) async fn node_list(&self) -> Result, ApiError> { let nodes = { self.inner diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 2ddab58aaf..2574dc297a 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -1229,18 +1229,27 @@ impl TenantShard { } } - // If we had any state at all referring to this node ID, drop it. Does not - // attempt to reschedule. - pub(crate) fn deref_node(&mut self, node_id: NodeId) { + /// If we had any state at all referring to this node ID, drop it. Does not + /// attempt to reschedule. + /// + /// Returns true if we modified the node's intent state. + pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool { + let mut intent_modified = false; + + // Drop if this node was our attached intent if self.intent.attached == Some(node_id) { self.intent.attached = None; + intent_modified = true; } + // Drop from the list of secondaries, and check if we modified it + let had_secondaries = self.intent.secondary.len(); self.intent.secondary.retain(|n| n != &node_id); - - self.observed.locations.remove(&node_id); + intent_modified |= self.intent.secondary.len() != had_secondaries; debug_assert!(!self.intent.all_pageservers().contains(&node_id)); + + intent_modified } pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5ca31644a9..463e4a3b01 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2287,6 +2287,14 @@ class NeonStorageController(MetricsGetter, LogUtils): headers=self.headers(TokenScope.ADMIN), ) + def node_delete(self, node_id): + log.info(f"node_delete({node_id})") + self.request( + "DELETE", + f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + headers=self.headers(TokenScope.ADMIN), + ) + def node_drain(self, node_id): log.info(f"node_drain({node_id})") self.request( diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 65649e0c0a..1e5e320e0e 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -93,6 +93,29 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif( ) +def fixup_storage_controller(env: NeonEnv): + """ + After importing a repo_dir, we need to massage the storage controller's state a bit: it will have + initially started up with no nodes, but some tenants, and thereby those tenants won't be scheduled + anywhere. + + After NeonEnv.start() is done (i.e. nodes are started + registered), call this function to get + the storage controller into a good state. + + This function should go away once compat tests carry the controller database in their snapshots, so + that the controller properly remembers nodes between creating + restoring the snapshot. + """ + env.storage_controller.allowed_errors.extend( + [ + ".*Tenant shard .+ references non-existent node.*", + ".*Failed to schedule tenant .+ at startup.*", + ] + ) + env.storage_controller.stop() + env.storage_controller.start() + env.storage_controller.reconcile_until_idle() + + @pytest.mark.xdist_group("compatibility") @pytest.mark.order(before="test_forward_compatibility") def test_create_snapshot( @@ -175,6 +198,7 @@ def test_backward_compatibility( neon_env_builder.num_safekeepers = 3 env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo") neon_env_builder.start() + fixup_storage_controller(env) check_neon_works( env, @@ -263,6 +287,7 @@ def test_forward_compatibility( assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version) neon_env_builder.start() + fixup_storage_controller(env) # ensure the specified pageserver is running assert env.pageserver.log_contains("git-env:" + prev_pageserver_version) diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index d37f7aae3d..741f16685e 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -1611,3 +1611,91 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder): env.storage_controller.cancel_node_drain(ps_id_to_drain) env.storage_controller.poll_node_status(ps_id_to_drain, "Active", max_attempts=6, backoff=2) + + +@pytest.mark.parametrize("while_offline", [True, False]) +def test_storage_controller_node_deletion( + neon_env_builder: NeonEnvBuilder, + compute_reconfigure_listener: ComputeReconfigure, + while_offline: bool, +): + """ + Test that deleting a node works & properly reschedules everything that was on the node. + """ + neon_env_builder.num_pageservers = 3 + env = neon_env_builder.init_configs() + env.start() + + tenant_count = 10 + shard_count_per_tenant = 8 + tenant_ids = [] + for _ in range(0, tenant_count): + tid = TenantId.generate() + tenant_ids.append(tid) + env.neon_cli.create_tenant( + tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant + ) + + victim = env.pageservers[-1] + + # The procedure a human would follow is: + # 1. Mark pageserver scheduling=pause + # 2. Mark pageserver availability=offline to trigger migrations away from it + # 3. Wait for attachments to all move elsewhere + # 4. Call deletion API + # 5. Stop the node. + + env.storage_controller.node_configure(victim.id, {"scheduling": "Pause"}) + + if while_offline: + victim.stop(immediate=True) + env.storage_controller.node_configure(victim.id, {"availability": "Offline"}) + + def assert_shards_migrated(): + counts = get_node_shard_counts(env, tenant_ids) + elsewhere = sum(v for (k, v) in counts.items() if k != victim.id) + log.info(f"Shards on nodes other than on victim: {elsewhere}") + assert elsewhere == tenant_count * shard_count_per_tenant + + wait_until(30, 1, assert_shards_migrated) + + log.info(f"Deleting pageserver {victim.id}") + env.storage_controller.node_delete(victim.id) + + if not while_offline: + + def assert_victim_evacuated(): + counts = get_node_shard_counts(env, tenant_ids) + count = counts[victim.id] + log.info(f"Shards on node {victim.id}: {count}") + assert count == 0 + + wait_until(30, 1, assert_victim_evacuated) + + # The node should be gone from the list API + assert victim.id not in [n["id"] for n in env.storage_controller.node_list()] + + # No tenants should refer to the node in their intent + for tenant_id in tenant_ids: + describe = env.storage_controller.tenant_describe(tenant_id) + for shard in describe["shards"]: + assert shard["node_attached"] != victim.id + assert victim.id not in shard["node_secondary"] + + # Reconciles running during deletion should all complete + # FIXME: this currently doesn't work because the deletion schedules shards without a proper ScheduleContext, resulting + # in states that background_reconcile wants to optimize, but can't proceed with migrations yet because this is a short3 + # test that hasn't uploaded any heatmaps for secondaries. + # In the interim, just do a reconcile_all to enable the consistency check. + # env.storage_controller.reconcile_until_idle() + env.storage_controller.reconcile_all() + + # Controller should pass its own consistency checks + env.storage_controller.consistency_check() + + # The node should stay gone across a restart + env.storage_controller.stop() + env.storage_controller.start() + assert victim.id not in [n["id"] for n in env.storage_controller.node_list()] + env.storage_controller.reconcile_all() # FIXME: workaround for optimizations happening on startup, see FIXME above. + env.storage_controller.consistency_check()