diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs
index 1ef97e78eb..14cda0a289 100644
--- a/storage_controller/src/heartbeater.rs
+++ b/storage_controller/src/heartbeater.rs
@@ -31,6 +31,7 @@ pub(crate) enum PageserverState {
     Available {
         last_seen_at: Instant,
         utilization: PageserverUtilization,
+        new: bool,
     },
     Offline,
 }
@@ -127,6 +128,7 @@ impl HeartbeaterTask {
             heartbeat_futs.push({
                 let jwt_token = self.jwt_token.clone();
                 let cancel = self.cancel.clone();
+                let new_node = !self.state.contains_key(node_id);
 
                 // Clone the node and mark it as available such that the request
                 // goes through to the pageserver even when the node is marked offline.
@@ -159,6 +161,7 @@ impl HeartbeaterTask {
                         PageserverState::Available {
                             last_seen_at: Instant::now(),
                             utilization,
+                            new: new_node,
                         }
                     } else {
                         PageserverState::Offline
@@ -220,6 +223,7 @@ impl HeartbeaterTask {
                     }
                 },
                 Vacant(_) => {
+                    // This is a new node. Don't generate a delta for it.
                     deltas.push((node_id, ps_state.clone()));
                 }
             }
diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs
index 7b5513c908..34dcf0c642 100644
--- a/storage_controller/src/node.rs
+++ b/storage_controller/src/node.rs
@@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration};
 use pageserver_api::{
     controller_api::{
         NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
-        TenantLocateResponseShard,
+        TenantLocateResponseShard, UtilizationScore,
     },
     shard::TenantShardId,
 };
@@ -116,6 +116,16 @@ impl Node {
         match (self.availability, availability) {
             (Offline, Active(_)) => ToActive,
             (Active(_), Offline) => ToOffline,
+            // Consider the case when the storage controller handles the re-attach of a node
+            // before the heartbeats detect that the node is back online. We still need
+            // [`Service::node_configure`] to attempt reconciliations for shards with an
+            // unknown observed location.
+            // The unsavoury match arm below handles this situation.
+            (Active(lhs), Active(rhs))
+                if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() =>
+            {
+                ToActive
+            }
             _ => Unchanged,
         }
     }
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index cf6a95bf0b..926332f946 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -12,7 +12,7 @@ use crate::{
     id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
     persistence::{AbortShardSplitStatus, TenantFilter},
     reconciler::{ReconcileError, ReconcileUnits},
-    scheduler::{ScheduleContext, ScheduleMode},
+    scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
     tenant_shard::{
         MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
     },
@@ -747,29 +747,61 @@ impl Service {
         let res = self.heartbeater.heartbeat(nodes).await;
         if let Ok(deltas) = res {
             for (node_id, state) in deltas.0 {
-                let new_availability = match state {
-                    PageserverState::Available { utilization, .. } => NodeAvailability::Active(
-                        UtilizationScore(utilization.utilization_score),
+                let (new_node, new_availability) = match state {
+                    PageserverState::Available {
+                        utilization, new, ..
+                    } => (
+                        new,
+                        NodeAvailability::Active(UtilizationScore(
+                            utilization.utilization_score,
+                        )),
                     ),
-                    PageserverState::Offline => NodeAvailability::Offline,
+                    PageserverState::Offline => (false, NodeAvailability::Offline),
                 };
 
-                let res = self
-                    .node_configure(node_id, Some(new_availability), None)
-                    .await;
-                match res {
-                    Ok(()) => {}
-                    Err(ApiError::NotFound(_)) => {
-                        // This should be rare, but legitimate since the heartbeats are done
-                        // on a snapshot of the nodes.
-                        tracing::info!("Node {} was not found after heartbeat round", node_id);
+                if new_node {
+                    // When the heartbeats detect a newly added node, we don't wish
+                    // to attempt to reconcile the shards assigned to it. The node
+                    // is likely handling its re-attach response, so reconciling now
+                    // would be counterproductive.
+                    //
+                    // Instead, update the in-memory state with the details learned about the
+                    // node.
+                    let mut locked = self.inner.write().unwrap();
+                    let (nodes, _tenants, scheduler) = locked.parts_mut();
+
+                    let mut new_nodes = (**nodes).clone();
+
+                    if let Some(node) = new_nodes.get_mut(&node_id) {
+                        node.set_availability(new_availability);
+                        scheduler.node_upsert(node);
                     }
-                    Err(err) => {
-                        tracing::error!(
-                            "Failed to update node {} after heartbeat round: {}",
-                            node_id,
-                            err
-                        );
+
+                    locked.nodes = Arc::new(new_nodes);
+                } else {
+                    // This is the code path for genuine availability transitions (i.e. node
+                    // goes unavailable and/or comes back online).
+                    let res = self
+                        .node_configure(node_id, Some(new_availability), None)
+                        .await;
+
+                    match res {
+                        Ok(()) => {}
+                        Err(ApiError::NotFound(_)) => {
+                            // This should be rare, but legitimate since the heartbeats are done
+                            // on a snapshot of the nodes.
+                            tracing::info!(
+                                "Node {} was not found after heartbeat round",
+                                node_id
+                            );
+                        }
+                        Err(err) => {
+                            tracing::error!(
+                                "Failed to update node {} after heartbeat round: {}",
+                                node_id,
+                                err
+                            );
+                        }
                     }
                 }
             }
@@ -4316,6 +4348,16 @@ impl Service {
                         continue;
                     }
 
+                    if !new_nodes
+                        .values()
+                        .any(|n| matches!(n.may_schedule(), MaySchedule::Yes(_)))
+                    {
+                        // Special case for when all nodes are unavailable and/or unschedulable: there is no point
+                        // trying to reschedule since there's nowhere else to go. Without this
+                        // branch we incorrectly detach tenants in response to node unavailability.
+                        continue;
+                    }
+
                     if tenant_shard.intent.demote_attached(scheduler, node_id) {
                         tenant_shard.sequence = tenant_shard.sequence.next();
 
@@ -4353,6 +4395,12 @@ impl Service {
                 // When a node comes back online, we must reconcile any tenant that has a None observed
                 // location on the node.
                 for tenant_shard in locked.tenants.values_mut() {
+                    // If a reconciliation is already in progress, rely on the previous scheduling
+                    // decision and skip triggering a new reconciliation.
+                    if tenant_shard.reconciler.is_some() {
+                        continue;
+                    }
+
                     if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
                         if observed_loc.conf.is_none() {
                             self.maybe_reconcile_shard(tenant_shard, &new_nodes);
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index f41468210c..8624a45f45 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -934,19 +934,27 @@ class Failure:
     def clear(self, env: NeonEnv):
         raise NotImplementedError()
 
+    def nodes(self):
+        raise NotImplementedError()
+
 
 class NodeStop(Failure):
-    def __init__(self, pageserver_id, immediate):
-        self.pageserver_id = pageserver_id
+    def __init__(self, pageserver_ids, immediate):
+        self.pageserver_ids = pageserver_ids
         self.immediate = immediate
 
     def apply(self, env: NeonEnv):
-        pageserver = env.get_pageserver(self.pageserver_id)
-        pageserver.stop(immediate=self.immediate)
+        for ps_id in self.pageserver_ids:
+            pageserver = env.get_pageserver(ps_id)
+            pageserver.stop(immediate=self.immediate)
 
     def clear(self, env: NeonEnv):
-        pageserver = env.get_pageserver(self.pageserver_id)
-        pageserver.start()
+        for ps_id in self.pageserver_ids:
+            pageserver = env.get_pageserver(ps_id)
+            pageserver.start()
+
+    def nodes(self):
+        return self.pageserver_ids
 
 
 class PageserverFailpoint(Failure):
@@ -962,6 +970,9 @@ class PageserverFailpoint(Failure):
         pageserver = env.get_pageserver(self.pageserver_id)
         pageserver.http_client().configure_failpoints((self.failpoint, "off"))
 
+    def nodes(self):
+        return [self.pageserver_id]
+
 
 def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
     tenants = env.storage_controller.tenant_list()
@@ -985,8 +996,9 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
 @pytest.mark.parametrize(
     "failure",
     [
-        NodeStop(pageserver_id=1, immediate=False),
-        NodeStop(pageserver_id=1, immediate=True),
+        NodeStop(pageserver_ids=[1], immediate=False),
+        NodeStop(pageserver_ids=[1], immediate=True),
+        NodeStop(pageserver_ids=[1, 2], immediate=True),
         PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"),
     ],
 )
@@ -1039,33 +1051,50 @@ def test_storage_controller_heartbeats(
     wait_until(10, 1, tenants_placed)
 
     # ... then we apply the failure
-    offline_node_id = failure.pageserver_id
-    online_node_id = (set(range(1, len(env.pageservers) + 1)) - {offline_node_id}).pop()
-    env.get_pageserver(offline_node_id).allowed_errors.append(
-        # In the case of the failpoint failure, the impacted pageserver
-        # still believes it has the tenant attached since location
-        # config calls into it will fail due to being marked offline.
-        ".*Dropped remote consistent LSN updates.*",
-    )
+    offline_node_ids = set(failure.nodes())
+    online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids
+
+    for node_id in offline_node_ids:
+        env.get_pageserver(node_id).allowed_errors.append(
+            # In the case of the failpoint failure, the impacted pageserver
+            # still believes it has the tenant attached since location
+            # config calls into it will fail due to being marked offline.
+            ".*Dropped remote consistent LSN updates.*",
+        )
+
+        if len(offline_node_ids) > 1:
+            env.get_pageserver(node_id).allowed_errors.append(
+                ".*Scheduling error when marking pageserver.*offline.*",
+            )
 
     failure.apply(env)
 
     # ... expecting the heartbeats to mark it offline
-    def node_offline():
+    def nodes_offline():
         nodes = env.storage_controller.node_list()
         log.info(f"{nodes=}")
-        target = next(n for n in nodes if n["id"] == offline_node_id)
-        assert target["availability"] == "Offline"
+        for node in nodes:
+            if node["id"] in offline_node_ids:
+                assert node["availability"] == "Offline"
 
     # A node is considered offline if the last successful heartbeat
     # was more than 10 seconds ago (hardcoded in the storage controller).
-    wait_until(20, 1, node_offline)
+    wait_until(20, 1, nodes_offline)
 
     # .. expecting the tenant on the offline node to be migrated
     def tenant_migrated():
+        if len(online_node_ids) == 0:
+            time.sleep(5)
+            return
+
         node_to_tenants = build_node_to_tenants_map(env)
         log.info(f"{node_to_tenants=}")
-        assert set(node_to_tenants[online_node_id]) == set(tenant_ids)
+
+        observed_tenants = set()
+        for node_id in online_node_ids:
+            observed_tenants |= set(node_to_tenants[node_id])
+
+        assert observed_tenants == set(tenant_ids)
 
     wait_until(10, 1, tenant_migrated)
 
@@ -1073,31 +1102,24 @@ def test_storage_controller_heartbeats(
     failure.clear(env)
 
     # ... expecting the offline node to become active again
-    def node_online():
+    def nodes_online():
         nodes = env.storage_controller.node_list()
-        target = next(n for n in nodes if n["id"] == offline_node_id)
-        assert target["availability"] == "Active"
+        for node in nodes:
+            if node["id"] in online_node_ids:
+                assert node["availability"] == "Active"
 
-    wait_until(10, 1, node_online)
+    wait_until(10, 1, nodes_online)
 
     time.sleep(5)
 
-    # ... then we create a new tenant
-    tid = TenantId.generate()
-    env.storage_controller.tenant_create(tid)
-
-    # ... expecting it to be placed on the node that just came back online
-    tenants = env.storage_controller.tenant_list()
-    newest_tenant = next(t for t in tenants if t["tenant_shard_id"] == str(tid))
-    locations = list(newest_tenant["observed"]["locations"].keys())
-    locations = [int(node_id) for node_id in locations]
-    assert locations == [offline_node_id]
+    node_to_tenants = build_node_to_tenants_map(env)
+    log.info(f"Back online: {node_to_tenants=}")
 
     # ... expecting the storage controller to reach a consistent state
     def storage_controller_consistent():
         env.storage_controller.consistency_check()
 
-    wait_until(10, 1, storage_controller_consistent)
+    wait_until(30, 1, storage_controller_consistent)
 
 
 def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder):
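
The subtle piece of this change is the new `Active -> Active` arm in `node.rs`: a node that re-attaches before its first heartbeat is recorded as `Active` with the placeholder `UtilizationScore::worst()`, and the first real heartbeat must still count as a `ToActive` transition so that shards with an unknown observed location get reconciled. The sketch below is only an illustration of that rule; `UtilizationScore`, `NodeAvailability`, and `AvailabilityTransition` here are simplified local stand-ins, not the `pageserver_api` definitions, and it can be run as a standalone program.

```rust
// Standalone illustration of the availability-transition rule added above.
// The types below are simplified stand-ins, not the storage controller's real ones.

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct UtilizationScore(u64);

impl UtilizationScore {
    // Placeholder score assigned when a node re-attaches before its first heartbeat.
    fn worst() -> Self {
        UtilizationScore(u64::MAX)
    }
}

#[derive(Clone, Copy, Debug)]
enum NodeAvailability {
    Active(UtilizationScore),
    Offline,
}

#[derive(Debug, PartialEq)]
enum AvailabilityTransition {
    ToActive,
    ToOffline,
    Unchanged,
}

fn availability_transition(
    current: NodeAvailability,
    incoming: NodeAvailability,
) -> AvailabilityTransition {
    use AvailabilityTransition::*;
    use NodeAvailability::*;

    match (current, incoming) {
        (Offline, Active(_)) => ToActive,
        (Active(_), Offline) => ToOffline,
        // Re-attach was handled before the heartbeat: the stored score is still the
        // worst() placeholder, so the first real score counts as an activation.
        (Active(cur), Active(new))
            if cur == UtilizationScore::worst() && new < UtilizationScore::worst() =>
        {
            ToActive
        }
        _ => Unchanged,
    }
}

fn main() {
    let placeholder = NodeAvailability::Active(UtilizationScore::worst());
    let first_heartbeat = NodeAvailability::Active(UtilizationScore(10));

    // A plain offline -> active flip is an activation.
    assert_eq!(
        availability_transition(NodeAvailability::Offline, first_heartbeat),
        AvailabilityTransition::ToActive
    );
    // Re-attach placeholder followed by the first real heartbeat is also an activation.
    assert_eq!(
        availability_transition(placeholder, first_heartbeat),
        AvailabilityTransition::ToActive
    );
    // A repeated heartbeat with a real score is not a transition.
    assert_eq!(
        availability_transition(first_heartbeat, first_heartbeat),
        AvailabilityTransition::Unchanged
    );
    println!("transition rules behave as expected");
}
```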