From 9c48b5c4ab5321ba45048c42b21c6eba70d519ce Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 22 Feb 2024 14:01:06 +0000 Subject: [PATCH] controller: improved handling of offline nodes (#6846) Stacks on https://github.com/neondatabase/neon/pull/6823 - Pending a heartbeating mechanism (#6844 ), use /re-attach calls as a cue to mark an offline node as active, so that a node which is unavailable during controller startup doesn't require manual intervention if it later starts/restarts. - Tweak scheduling logic so that when we schedule the attached location for a tenant, we prefer to select from secondary locations rather than picking a fresh one. This is an interim state until we implement #6844 and full chaos testing for handling failures. --- control_plane/attachment_service/src/http.rs | 9 +- .../attachment_service/src/scheduler.rs | 90 +++++++---- .../attachment_service/src/service.rs | 10 +- .../attachment_service/src/tenant_state.rs | 148 ++++++++++++++++-- control_plane/src/pageserver.rs | 36 +++-- test_runner/regress/test_sharding_service.py | 9 +- 6 files changed, 230 insertions(+), 72 deletions(-) diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs index d85753bedc..15ae2a26b4 100644 --- a/control_plane/attachment_service/src/http.rs +++ b/control_plane/attachment_service/src/http.rs @@ -66,14 +66,7 @@ fn get_state(request: &Request) -> &HttpState { async fn handle_re_attach(mut req: Request) -> Result, ApiError> { let reattach_req = json_request::(&mut req).await?; let state = get_state(&req); - json_response( - StatusCode::OK, - state - .service - .re_attach(reattach_req) - .await - .map_err(ApiError::InternalServerError)?, - ) + json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?) } /// Pageserver calls into this before doing deletions, to confirm that it still diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs index fb3c7f634c..7059071bee 100644 --- a/control_plane/attachment_service/src/scheduler.rs +++ b/control_plane/attachment_service/src/scheduler.rs @@ -175,6 +175,33 @@ impl Scheduler { } } + /// Where we have several nodes to choose from, for example when picking a secondary location + /// to promote to an attached location, this method may be used to pick the best choice based + /// on the scheduler's knowledge of utilization and availability. + /// + /// If the input is empty, or all the nodes are not elegible for scheduling, return None: the + /// caller can pick a node some other way. + pub(crate) fn node_preferred(&self, nodes: &[NodeId]) -> Option { + if nodes.is_empty() { + return None; + } + + let node = nodes + .iter() + .map(|node_id| { + let may_schedule = self + .nodes + .get(node_id) + .map(|n| n.may_schedule) + .unwrap_or(false); + (*node_id, may_schedule) + }) + .max_by_key(|(_n, may_schedule)| *may_schedule); + + // If even the preferred node has may_schedule==false, return None + node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None }) + } + pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result { if self.nodes.is_empty() { return Err(ScheduleError::NoPageservers); @@ -224,44 +251,45 @@ impl Scheduler { } } +#[cfg(test)] +pub(crate) mod test_utils { + + use crate::node::Node; + use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy}; + use std::collections::HashMap; + use utils::id::NodeId; + /// Test helper: synthesize the requested number of nodes, all in active state. + /// + /// Node IDs start at one. + pub(crate) fn make_test_nodes(n: u64) -> HashMap { + (1..n + 1) + .map(|i| { + ( + NodeId(i), + Node { + id: NodeId(i), + availability: NodeAvailability::Active, + scheduling: NodeSchedulingPolicy::Active, + listen_http_addr: format!("httphost-{i}"), + listen_http_port: 80 + i as u16, + listen_pg_addr: format!("pghost-{i}"), + listen_pg_port: 5432 + i as u16, + }, + ) + }) + .collect() + } +} + #[cfg(test)] mod tests { use super::*; - use std::collections::HashMap; - - use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy}; use utils::id::NodeId; - use crate::{node::Node, tenant_state::IntentState}; - + use crate::tenant_state::IntentState; #[test] fn scheduler_basic() -> anyhow::Result<()> { - let mut nodes = HashMap::new(); - nodes.insert( - NodeId(1), - Node { - id: NodeId(1), - availability: NodeAvailability::Active, - scheduling: NodeSchedulingPolicy::Active, - listen_http_addr: String::new(), - listen_http_port: 0, - listen_pg_addr: String::new(), - listen_pg_port: 0, - }, - ); - - nodes.insert( - NodeId(2), - Node { - id: NodeId(2), - availability: NodeAvailability::Active, - scheduling: NodeSchedulingPolicy::Active, - listen_http_addr: String::new(), - listen_http_port: 0, - listen_pg_addr: String::new(), - listen_pg_port: 0, - }, - ); + let nodes = test_utils::make_test_nodes(2); let mut scheduler = Scheduler::new(nodes.values()); let mut t1_intent = IntentState::new(); diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 6366348017..0b9a7d8a69 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -914,7 +914,15 @@ impl Service { pub(crate) async fn re_attach( &self, reattach_req: ReAttachRequest, - ) -> anyhow::Result { + ) -> Result { + // Take a re-attach as indication that the node is available: this is a precursor to proper + // heartbeating in https://github.com/neondatabase/neon/issues/6844 + self.node_configure(NodeConfigureRequest { + node_id: reattach_req.node_id, + availability: Some(NodeAvailability::Active), + scheduling: None, + })?; + // Ordering: we must persist generation number updates before making them visible in the in-memory state let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?; diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index 7970207e27..3cfffc6c45 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -143,6 +143,23 @@ impl IntentState { } } + /// Like set_attached, but the node is from [`Self::secondary`]. This swaps the node from + /// secondary to attached while maintaining the scheduler's reference counts. + pub(crate) fn promote_attached( + &mut self, + _scheduler: &mut Scheduler, + promote_secondary: NodeId, + ) { + // If we call this with a node that isn't in secondary, it would cause incorrect + // scheduler reference counting, since we assume the node is already referenced as a secondary. + debug_assert!(self.secondary.contains(&promote_secondary)); + + // TODO: when scheduler starts tracking attached + secondary counts separately, we will + // need to call into it here. + self.secondary.retain(|n| n != &promote_secondary); + self.attached = Some(promote_secondary); + } + pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) { debug_assert!(!self.secondary.contains(&new_secondary)); scheduler.node_inc_ref(new_secondary); @@ -197,6 +214,8 @@ impl IntentState { /// Returns true if a change was made pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool { if self.attached == Some(node_id) { + // TODO: when scheduler starts tracking attached + secondary counts separately, we will + // need to call into it here. self.attached = None; self.secondary.push(node_id); true @@ -370,6 +389,9 @@ impl TenantState { // All remaining observed locations generate secondary intents. This includes None // observations, as these may well have some local content on disk that is usable (this // is an edge case that might occur if we restarted during a migration or other change) + // + // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`] + // will take care of promoting one of these secondaries to be attached. self.observed.locations.keys().for_each(|node_id| { if Some(*node_id) != self.intent.attached { self.intent.secondary.push(*node_id); @@ -377,6 +399,33 @@ impl TenantState { }); } + /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the + /// attached pageserver for a shard. + /// + /// Returns whether we modified it, and the NodeId selected. + fn schedule_attached( + &mut self, + scheduler: &mut Scheduler, + ) -> Result<(bool, NodeId), ScheduleError> { + // No work to do if we already have an attached tenant + if let Some(node_id) = self.intent.attached { + return Ok((false, node_id)); + } + + if let Some(promote_secondary) = scheduler.node_preferred(&self.intent.secondary) { + // Promote a secondary + tracing::debug!("Promoted secondary {} to attached", promote_secondary); + self.intent.promote_attached(scheduler, promote_secondary); + Ok((true, promote_secondary)) + } else { + // Pick a fresh node: either we had no secondaries or none were schedulable + let node_id = scheduler.schedule_shard(&self.intent.secondary)?; + tracing::debug!("Selected {} as attached", node_id); + self.intent.set_attached(scheduler, Some(node_id)); + Ok((true, node_id)) + } + } + pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> { // TODO: before scheduling new nodes, check if any existing content in // self.intent refers to pageservers that are offline, and pick other @@ -387,19 +436,15 @@ impl TenantState { // Build the set of pageservers already in use by this tenant, to avoid scheduling // more work on the same pageservers we're already using. - let mut used_pageservers = self.intent.all_pageservers(); let mut modified = false; use PlacementPolicy::*; match self.policy { Single => { // Should have exactly one attached, and zero secondaries - if self.intent.attached.is_none() { - let node_id = scheduler.schedule_shard(&used_pageservers)?; - self.intent.set_attached(scheduler, Some(node_id)); - used_pageservers.push(node_id); - modified = true; - } + let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?; + modified |= modified_attached; + if !self.intent.secondary.is_empty() { self.intent.clear_secondary(scheduler); modified = true; @@ -407,13 +452,10 @@ impl TenantState { } Double(secondary_count) => { // Should have exactly one attached, and N secondaries - if self.intent.attached.is_none() { - let node_id = scheduler.schedule_shard(&used_pageservers)?; - self.intent.set_attached(scheduler, Some(node_id)); - used_pageservers.push(node_id); - modified = true; - } + let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?; + modified |= modified_attached; + let mut used_pageservers = vec![attached_node_id]; while self.intent.secondary.len() < secondary_count { let node_id = scheduler.schedule_shard(&used_pageservers)?; self.intent.push_secondary(scheduler, node_id); @@ -702,3 +744,83 @@ impl TenantState { } } } + +#[cfg(test)] +pub(crate) mod tests { + use pageserver_api::shard::{ShardCount, ShardNumber}; + use utils::id::TenantId; + + use crate::scheduler::test_utils::make_test_nodes; + + use super::*; + + fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState { + let tenant_id = TenantId::generate(); + let shard_number = ShardNumber(0); + let shard_count = ShardCount::new(1); + + let tenant_shard_id = TenantShardId { + tenant_id, + shard_number, + shard_count, + }; + TenantState::new( + tenant_shard_id, + ShardIdentity::new( + shard_number, + shard_count, + pageserver_api::shard::ShardStripeSize(32768), + ) + .unwrap(), + policy, + ) + } + + /// Test the scheduling behaviors used when a tenant configured for HA is subject + /// to nodes being marked offline. + #[test] + fn tenant_ha_scheduling() -> anyhow::Result<()> { + // Start with three nodes. Our tenant will only use two. The third one is + // expected to remain unused. + let mut nodes = make_test_nodes(3); + + let mut scheduler = Scheduler::new(nodes.values()); + + let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1)); + tenant_state + .schedule(&mut scheduler) + .expect("we have enough nodes, scheduling should work"); + + // Expect to initially be schedule on to different nodes + assert_eq!(tenant_state.intent.secondary.len(), 1); + assert!(tenant_state.intent.attached.is_some()); + + let attached_node_id = tenant_state.intent.attached.unwrap(); + let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap(); + assert_ne!(attached_node_id, secondary_node_id); + + // Notifying the attached node is offline should demote it to a secondary + let changed = tenant_state.intent.notify_offline(attached_node_id); + assert!(changed); + + // Update the scheduler state to indicate the node is offline + nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline; + scheduler.node_upsert(nodes.get(&attached_node_id).unwrap()); + + // Scheduling the node should promote the still-available secondary node to attached + tenant_state + .schedule(&mut scheduler) + .expect("active nodes are available"); + assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id); + + // The original attached node should have been retained as a secondary + assert_eq!( + *tenant_state.intent.secondary.iter().last().unwrap(), + attached_node_id + ); + + tenant_state.intent.clear(&mut scheduler); + + Ok(()) + } +} diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 8dd86bad96..5909477586 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -210,6 +210,25 @@ impl PageServerNode { update_config: bool, register: bool, ) -> anyhow::Result<()> { + // Register the node with the storage controller before starting pageserver: pageserver must be registered to + // successfully call /re-attach and finish starting up. + if register { + let attachment_service = AttachmentService::from_env(&self.env); + let (pg_host, pg_port) = + parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); + let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr) + .expect("Unable to parse listen_http_addr"); + attachment_service + .node_register(NodeRegisterRequest { + node_id: self.conf.id, + listen_pg_addr: pg_host.to_string(), + listen_pg_port: pg_port.unwrap_or(5432), + listen_http_addr: http_host.to_string(), + listen_http_port: http_port.unwrap_or(80), + }) + .await?; + } + // TODO: using a thread here because start_process() is not async but we need to call check_status() let datadir = self.repo_path(); print!( @@ -248,23 +267,6 @@ impl PageServerNode { ) .await?; - if register { - let attachment_service = AttachmentService::from_env(&self.env); - let (pg_host, pg_port) = - parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); - let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr) - .expect("Unable to parse listen_http_addr"); - attachment_service - .node_register(NodeRegisterRequest { - node_id: self.conf.id, - listen_pg_addr: pg_host.to_string(), - listen_pg_port: pg_port.unwrap_or(5432), - listen_http_addr: http_host.to_string(), - listen_http_port: http_port.unwrap_or(80), - }) - .await?; - } - Ok(()) } diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index e62d239d77..00c3a1628e 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -272,8 +272,13 @@ def test_sharding_service_onboarding( env.broker.try_start() env.attachment_service.start() - # This is the pageserver where we'll initially create the tenant - env.pageservers[0].start(register=False) + # This is the pageserver where we'll initially create the tenant. Run it in emergency + # mode so that it doesn't talk to storage controller, and do not register it. + env.pageservers[0].allowed_errors.append(".*Emergency mode!.*") + env.pageservers[0].start( + overrides=("--pageserver-config-override=control_plane_emergency_mode=true",), + register=False, + ) origin_ps = env.pageservers[0] # This is the pageserver managed by the sharding service, where the tenant