From 8270b58f3925340a2a32b008d559902aa0457de5 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 14 Jun 2024 11:28:11 +0100 Subject: [PATCH 1/4] storcon: handle reattach and heartbeat race Consider the case when the storage controller handles the re-attach of a node before the heartbeats detect that the node is back online. We still need to reconfigure the node (by calling `Service::node_configure`) to migrate attachments back onto the node. In order to determine if node reconfiguration is required, we call into `Node::get_availability_transition`. This commit updates the function to consider the transition from "node just re-attached" (with no utilisation score) to "node responded to the first heartbeat after a period of unavailablity" (with some utilisation score). --- storage_controller/src/node.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index 7b5513c908..f7a034bc8b 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration}; use pageserver_api::{ controller_api::{ NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, - TenantLocateResponseShard, + TenantLocateResponseShard, UtilizationScore, }, shard::TenantShardId, }; @@ -116,6 +116,15 @@ impl Node { match (self.availability, availability) { (Offline, Active(_)) => ToActive, (Active(_), Offline) => ToOffline, + // Consider the case when the storage controller handles the re-attach of a node + // before the heartbeats detect that the node is back online. We still need + // [`Service::node_configure`] to migrate attachments back onto the node. + // The unsavoury match arm below handles this situation. + (Active(lhs), Active(rhs)) + if lhs == UtilizationScore::worst() && rhs < UtilizationScore::worst() => + { + ToActive + } _ => Unchanged, } } From 677c1662a42ce2a73bcb7f76879d04d9a8548341 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 14 Jun 2024 11:32:04 +0100 Subject: [PATCH 2/4] storcon: do not detach tenants when all nodes are unvailable Previously, when all nodes in the cluster became unavailable at the same time, we would detach all tenant shards. This is due to a bug in `Service::node_configure`. If all nodes are unavailable, there's no chance of reschedulling anything, so we should leave the intent states untouced. This commit adds a special case which detects this situation and skips any reschedullings. --- storage_controller/src/service.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 1e81b5c5a2..ff9c1434f3 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -4312,6 +4312,13 @@ impl Service { continue; } + if !new_nodes.values().any(Node::is_available) { + // Special case for when all nodes are unavailable: there is no point + // trying to reschedule since there's nowhere else to go. Without this + // branch we incorrectly detach tenants in response to node unavailability. + continue; + } + if tenant_shard.intent.demote_attached(scheduler, node_id) { tenant_shard.sequence = tenant_shard.sequence.next(); From d3887504a01c29a6cd80add2486431369ab27272 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 14 Jun 2024 11:34:41 +0100 Subject: [PATCH 3/4] tests: test heartbeats when the entire cluster goes for lunch --- .../regress/test_storage_controller.py | 94 ++++++++++++------- 1 file changed, 58 insertions(+), 36 deletions(-) diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 2031feaa83..d6ee3384e0 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -931,19 +931,27 @@ class Failure: def clear(self, env: NeonEnv): raise NotImplementedError() + def nodes(self): + raise NotImplementedError() + class NodeStop(Failure): - def __init__(self, pageserver_id, immediate): - self.pageserver_id = pageserver_id + def __init__(self, pageserver_ids, immediate): + self.pageserver_ids = pageserver_ids self.immediate = immediate def apply(self, env: NeonEnv): - pageserver = env.get_pageserver(self.pageserver_id) - pageserver.stop(immediate=self.immediate) + for ps_id in self.pageserver_ids: + pageserver = env.get_pageserver(ps_id) + pageserver.stop(immediate=self.immediate) def clear(self, env: NeonEnv): - pageserver = env.get_pageserver(self.pageserver_id) - pageserver.start() + for ps_id in self.pageserver_ids: + pageserver = env.get_pageserver(ps_id) + pageserver.start() + + def nodes(self): + return self.pageserver_ids class PageserverFailpoint(Failure): @@ -959,6 +967,9 @@ class PageserverFailpoint(Failure): pageserver = env.get_pageserver(self.pageserver_id) pageserver.http_client().configure_failpoints((self.failpoint, "off")) + def nodes(self): + return [self.pageserver_id] + def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]: tenants = env.storage_controller.tenant_list() @@ -982,8 +993,9 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]: @pytest.mark.parametrize( "failure", [ - NodeStop(pageserver_id=1, immediate=False), - NodeStop(pageserver_id=1, immediate=True), + NodeStop(pageserver_ids=[1], immediate=False), + NodeStop(pageserver_ids=[1], immediate=True), + NodeStop(pageserver_ids=[1, 2], immediate=True), PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"), ], ) @@ -1036,33 +1048,50 @@ def test_storage_controller_heartbeats( wait_until(10, 1, tenants_placed) # ... then we apply the failure - offline_node_id = failure.pageserver_id - online_node_id = (set(range(1, len(env.pageservers) + 1)) - {offline_node_id}).pop() - env.get_pageserver(offline_node_id).allowed_errors.append( - # In the case of the failpoint failure, the impacted pageserver - # still believes it has the tenant attached since location - # config calls into it will fail due to being marked offline. - ".*Dropped remote consistent LSN updates.*", - ) + offline_node_ids = set(failure.nodes()) + online_node_ids = set(range(1, len(env.pageservers) + 1)) - offline_node_ids + + for node_id in offline_node_ids: + env.get_pageserver(node_id).allowed_errors.append( + # In the case of the failpoint failure, the impacted pageserver + # still believes it has the tenant attached since location + # config calls into it will fail due to being marked offline. + ".*Dropped remote consistent LSN updates.*", + ) + + if len(offline_node_ids) > 1: + env.get_pageserver(node_id).allowed_errors.append( + ".*Scheduling error when marking pageserver.*offline.*", + ) failure.apply(env) # ... expecting the heartbeats to mark it offline - def node_offline(): + def nodes_offline(): nodes = env.storage_controller.node_list() log.info(f"{nodes=}") - target = next(n for n in nodes if n["id"] == offline_node_id) - assert target["availability"] == "Offline" + for node in nodes: + if node["id"] in offline_node_ids: + assert node["availability"] == "Offline" # A node is considered offline if the last successful heartbeat # was more than 10 seconds ago (hardcoded in the storage controller). - wait_until(20, 1, node_offline) + wait_until(20, 1, nodes_offline) # .. expecting the tenant on the offline node to be migrated def tenant_migrated(): + if len(online_node_ids) == 0: + time.sleep(5) + return + node_to_tenants = build_node_to_tenants_map(env) log.info(f"{node_to_tenants=}") - assert set(node_to_tenants[online_node_id]) == set(tenant_ids) + + observed_tenants = set() + for node_id in online_node_ids: + observed_tenants |= set(node_to_tenants[node_id]) + + assert observed_tenants == set(tenant_ids) wait_until(10, 1, tenant_migrated) @@ -1070,31 +1099,24 @@ def test_storage_controller_heartbeats( failure.clear(env) # ... expecting the offline node to become active again - def node_online(): + def nodes_online(): nodes = env.storage_controller.node_list() - target = next(n for n in nodes if n["id"] == offline_node_id) - assert target["availability"] == "Active" + for node in nodes: + if node["id"] in online_node_ids: + assert node["availability"] == "Active" - wait_until(10, 1, node_online) + wait_until(10, 1, nodes_online) time.sleep(5) - # ... then we create a new tenant - tid = TenantId.generate() - env.storage_controller.tenant_create(tid) - - # ... expecting it to be placed on the node that just came back online - tenants = env.storage_controller.tenant_list() - newest_tenant = next(t for t in tenants if t["tenant_shard_id"] == str(tid)) - locations = list(newest_tenant["observed"]["locations"].keys()) - locations = [int(node_id) for node_id in locations] - assert locations == [offline_node_id] + node_to_tenants = build_node_to_tenants_map(env) + log.info(f"Back online: {node_to_tenants=}") # ... expecting the storage controller to reach a consistent state def storage_controller_consistent(): env.storage_controller.consistency_check() - wait_until(10, 1, storage_controller_consistent) + wait_until(30, 1, storage_controller_consistent) def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder): From a0fa1b928dc05fe5bdc07cb0bc1ea0245454e76d Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 14 Jun 2024 15:21:34 +0100 Subject: [PATCH 4/4] storcon: refine heartbeat handling of new addded nodes --- storage_controller/src/heartbeater.rs | 4 ++ storage_controller/src/service.rs | 76 ++++++++++++++++++++------- 2 files changed, 61 insertions(+), 19 deletions(-) diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs index 1ef97e78eb..14cda0a289 100644 --- a/storage_controller/src/heartbeater.rs +++ b/storage_controller/src/heartbeater.rs @@ -31,6 +31,7 @@ pub(crate) enum PageserverState { Available { last_seen_at: Instant, utilization: PageserverUtilization, + new: bool, }, Offline, } @@ -127,6 +128,7 @@ impl HeartbeaterTask { heartbeat_futs.push({ let jwt_token = self.jwt_token.clone(); let cancel = self.cancel.clone(); + let new_node = !self.state.contains_key(node_id); // Clone the node and mark it as available such that the request // goes through to the pageserver even when the node is marked offline. @@ -159,6 +161,7 @@ impl HeartbeaterTask { PageserverState::Available { last_seen_at: Instant::now(), utilization, + new: new_node, } } else { PageserverState::Offline @@ -220,6 +223,7 @@ impl HeartbeaterTask { } }, Vacant(_) => { + // This is a new node. Don't generate a delta for it. deltas.push((node_id, ps_state.clone())); } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ff9c1434f3..07c8065b5c 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -747,29 +747,61 @@ impl Service { let res = self.heartbeater.heartbeat(nodes).await; if let Ok(deltas) = res { for (node_id, state) in deltas.0 { - let new_availability = match state { - PageserverState::Available { utilization, .. } => NodeAvailability::Active( - UtilizationScore(utilization.utilization_score), + let (new_node, new_availability) = match state { + PageserverState::Available { + utilization, new, .. + } => ( + new, + NodeAvailability::Active(UtilizationScore( + utilization.utilization_score, + )), ), - PageserverState::Offline => NodeAvailability::Offline, + PageserverState::Offline => (false, NodeAvailability::Offline), }; - let res = self - .node_configure(node_id, Some(new_availability), None) - .await; - match res { - Ok(()) => {} - Err(ApiError::NotFound(_)) => { - // This should be rare, but legitimate since the heartbeats are done - // on a snapshot of the nodes. - tracing::info!("Node {} was not found after heartbeat round", node_id); + if new_node { + // When the heartbeats detect a newly added node, we don't wish + // to attempt to reconcile the shards assigned to it. The node + // is likely handling it's re-attach response, so reconciling now + // would be counterproductive. + // + // Instead, update the in-memory state with the details learned about the + // node. + let mut locked = self.inner.write().unwrap(); + let (nodes, _tenants, scheduler) = locked.parts_mut(); + + let mut new_nodes = (**nodes).clone(); + + if let Some(node) = new_nodes.get_mut(&node_id) { + node.set_availability(new_availability); + scheduler.node_upsert(node); } - Err(err) => { - tracing::error!( - "Failed to update node {} after heartbeat round: {}", - node_id, - err - ); + + locked.nodes = Arc::new(new_nodes); + } else { + // This is the code path for geniune availability transitions (i.e node + // goes unavailable and/or comes back online). + let res = self + .node_configure(node_id, Some(new_availability), None) + .await; + + match res { + Ok(()) => {} + Err(ApiError::NotFound(_)) => { + // This should be rare, but legitimate since the heartbeats are done + // on a snapshot of the nodes. + tracing::info!( + "Node {} was not found after heartbeat round", + node_id + ); + } + Err(err) => { + tracing::error!( + "Failed to update node {} after heartbeat round: {}", + node_id, + err + ); + } } } } @@ -4356,6 +4388,12 @@ impl Service { // When a node comes back online, we must reconcile any tenant that has a None observed // location on the node. for tenant_shard in locked.tenants.values_mut() { + // If a reconciliation is already in progress, rely on the previous scheduling + // decision and skip triggering a new reconciliation. + if tenant_shard.reconciler.is_some() { + continue; + } + if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) { if observed_loc.conf.is_none() { self.maybe_reconcile_shard(tenant_shard, &new_nodes);