From 5778d714f0a4a6f009d9b88bd30a6e35127a6410 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 19 Jun 2024 11:55:30 +0100 Subject: [PATCH] storcon: add drain and fill background operations for graceful cluster restarts (#8014) ## Problem Pageserver restarts cause read availablity downtime for tenants. See `Motivation` section in the [RFC](https://github.com/neondatabase/neon/pull/7704). ## Summary of changes * Introduce a new `NodeSchedulingPolicy`: `PauseForRestart` * Implement the first take of drain and fill algorithms * Add a node status endpoint which can be polled to figure out when an operation is done The implementation follows the RFC, so it might be useful to peek at it as you're reviewing. Since the PR is rather chunky, I've made sure all commits build (with warnings), so you can review by commit if you prefer that. RFC: https://github.com/neondatabase/neon/pull/7704 Related https://github.com/neondatabase/neon/issues/7387 --- Cargo.lock | 1 + libs/pageserver_api/src/controller_api.rs | 3 + storage_controller/Cargo.toml | 1 + .../src/background_node_operations.rs | 59 ++ storage_controller/src/http.rs | 43 ++ storage_controller/src/lib.rs | 1 + storage_controller/src/node.rs | 7 +- storage_controller/src/persistence.rs | 22 +- storage_controller/src/scheduler.rs | 39 ++ storage_controller/src/service.rs | 571 +++++++++++++++++- storage_controller/src/tenant_shard.rs | 36 +- test_runner/fixtures/neon_fixtures.py | 24 + .../regress/test_storage_controller.py | 119 +++- 13 files changed, 905 insertions(+), 21 deletions(-) create mode 100644 storage_controller/src/background_node_operations.rs diff --git a/Cargo.lock b/Cargo.lock index 5eac648fd9..cf8a0b3286 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5754,6 +5754,7 @@ dependencies = [ "r2d2", "reqwest 0.12.4", "routerify", + "scopeguard", "serde", "serde_json", "strum", diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 1278f17ad2..a0d10dc665 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -209,6 +209,7 @@ pub enum NodeSchedulingPolicy { Active, Filling, Pause, + PauseForRestart, Draining, } @@ -220,6 +221,7 @@ impl FromStr for NodeSchedulingPolicy { "active" => Ok(Self::Active), "filling" => Ok(Self::Filling), "pause" => Ok(Self::Pause), + "pause_for_restart" => Ok(Self::PauseForRestart), "draining" => Ok(Self::Draining), _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")), } @@ -233,6 +235,7 @@ impl From for String { Active => "active", Filling => "filling", Pause => "pause", + PauseForRestart => "pause_for_restart", Draining => "draining", } .to_string() diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 194619a496..b54dea5d47 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -40,6 +40,7 @@ tokio.workspace = true tokio-util.workspace = true tracing.workspace = true measured.workspace = true +scopeguard.workspace = true strum.workspace = true strum_macros.workspace = true diff --git a/storage_controller/src/background_node_operations.rs b/storage_controller/src/background_node_operations.rs new file mode 100644 index 0000000000..74b7e7c849 --- /dev/null +++ b/storage_controller/src/background_node_operations.rs @@ -0,0 +1,59 @@ +use std::{borrow::Cow, fmt::Debug, fmt::Display}; + +use tokio_util::sync::CancellationToken; +use utils::id::NodeId; + +pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10; + +#[derive(Copy, Clone)] +pub(crate) struct Drain { + pub(crate) node_id: NodeId, +} + +#[derive(Copy, Clone)] +pub(crate) struct Fill { + pub(crate) node_id: NodeId, +} + +#[derive(Copy, Clone)] +pub(crate) enum Operation { + Drain(Drain), + Fill(Fill), +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum OperationError { + #[error("Node state changed during operation: {0}")] + NodeStateChanged(Cow<'static, str>), + #[error("Operation finalize error: {0}")] + FinalizeError(Cow<'static, str>), + #[error("Operation cancelled")] + Cancelled, +} + +pub(crate) struct OperationHandler { + pub(crate) operation: Operation, + #[allow(unused)] + pub(crate) cancel: CancellationToken, +} + +impl Display for Drain { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "drain {}", self.node_id) + } +} + +impl Display for Fill { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "fill {}", self.node_id) + } +} + +impl Display for Operation { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Operation::Drain(op) => write!(f, "{op}"), + Operation::Fill(op) => write!(f, "{op}"), + } + } +} diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index bbb6d2cb32..3e9951fb9e 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -480,6 +480,39 @@ async fn handle_node_configure(mut req: Request) -> Result, ) } +async fn handle_node_status(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let node_id: NodeId = parse_request_param(&req, "node_id")?; + + let node_status = state.service.get_node(node_id).await?; + + json_response(StatusCode::OK, node_status) +} + +async fn handle_node_drain(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let node_id: NodeId = parse_request_param(&req, "node_id")?; + + state.service.start_node_drain(node_id).await?; + + json_response(StatusCode::ACCEPTED, ()) +} + +async fn handle_node_fill(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let node_id: NodeId = parse_request_param(&req, "node_id")?; + + state.service.start_node_fill(node_id).await?; + + json_response(StatusCode::ACCEPTED, ()) +} + async fn handle_tenant_shard_split( service: Arc, mut req: Request, @@ -832,6 +865,16 @@ pub fn make_router( RequestName("control_v1_node_config"), ) }) + .get("/control/v1/node/:node_id", |r| { + named_request_span(r, handle_node_status, RequestName("control_v1_node_status")) + }) + .put("/control/v1/node/:node_id/drain", |r| { + named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain")) + }) + .put("/control/v1/node/:node_id/fill", |r| { + named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill")) + }) + // TODO(vlad): endpoint for cancelling drain and fill // Tenant Shard operations .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| { tenant_service_handler( diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs index 2ea490a14b..8caf638904 100644 --- a/storage_controller/src/lib.rs +++ b/storage_controller/src/lib.rs @@ -2,6 +2,7 @@ use serde::Serialize; use utils::seqwait::MonotonicCounter; mod auth; +mod background_node_operations; mod compute_hook; mod heartbeater; pub mod http; diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index 34dcf0c642..4d17dff9fe 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -59,6 +59,10 @@ impl Node { self.id } + pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy { + self.scheduling + } + pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) { self.scheduling = scheduling } @@ -151,6 +155,7 @@ impl Node { NodeSchedulingPolicy::Draining => MaySchedule::No, NodeSchedulingPolicy::Filling => MaySchedule::Yes(score), NodeSchedulingPolicy::Pause => MaySchedule::No, + NodeSchedulingPolicy::PauseForRestart => MaySchedule::No, } } @@ -167,7 +172,7 @@ impl Node { listen_http_port, listen_pg_addr, listen_pg_port, - scheduling: NodeSchedulingPolicy::Filling, + scheduling: NodeSchedulingPolicy::Active, availability: NodeAvailability::Offline, cancel: CancellationToken::new(), } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 67c05296d5..47caf7ae81 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -442,13 +442,15 @@ impl Persistence { #[tracing::instrument(skip_all, fields(node_id))] pub(crate) async fn re_attach( &self, - node_id: NodeId, + input_node_id: NodeId, ) -> DatabaseResult> { + use crate::schema::nodes::dsl::scheduling_policy; + use crate::schema::nodes::dsl::*; use crate::schema::tenant_shards::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::ReAttach, move |conn| { let rows_updated = diesel::update(tenant_shards) - .filter(generation_pageserver.eq(node_id.0 as i64)) + .filter(generation_pageserver.eq(input_node_id.0 as i64)) .set(generation.eq(generation + 1)) .execute(conn)?; @@ -457,9 +459,23 @@ impl Persistence { // TODO: UPDATE+SELECT in one query let updated = tenant_shards - .filter(generation_pageserver.eq(node_id.0 as i64)) + .filter(generation_pageserver.eq(input_node_id.0 as i64)) .select(TenantShardPersistence::as_select()) .load(conn)?; + + // If the node went through a drain and restart phase before re-attaching, + // then reset it's node scheduling policy to active. + diesel::update(nodes) + .filter(node_id.eq(input_node_id.0 as i64)) + .filter( + scheduling_policy + .eq(String::from(NodeSchedulingPolicy::PauseForRestart)) + .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining))) + .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))), + ) + .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active))) + .execute(conn)?; + Ok(updated) }) .await?; diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 4ab85509dc..0bd2eeac35 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -1,4 +1,5 @@ use crate::{node::Node, tenant_shard::TenantShard}; +use itertools::Itertools; use pageserver_api::controller_api::UtilizationScore; use serde::Serialize; use std::collections::HashMap; @@ -283,6 +284,44 @@ impl Scheduler { } } + // Check if the number of shards attached to a given node is lagging below + // the cluster average. If that's the case, the node should be filled. + pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize { + let Some(node) = self.nodes.get(&node_id) else { + debug_assert!(false); + tracing::error!("Scheduler missing node {node_id}"); + return 0; + }; + assert!(!self.nodes.is_empty()); + let expected_attached_shards_per_node = self.expected_attached_shard_count(); + + for (node_id, node) in self.nodes.iter() { + tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node); + } + + if node.attached_shard_count < expected_attached_shards_per_node { + expected_attached_shards_per_node - node.attached_shard_count + } else { + 0 + } + } + + pub(crate) fn expected_attached_shard_count(&self) -> usize { + let total_attached_shards: usize = + self.nodes.values().map(|n| n.attached_shard_count).sum(); + + assert!(!self.nodes.is_empty()); + total_attached_shards / self.nodes.len() + } + + pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> { + self.nodes + .iter() + .map(|(node_id, stats)| (*node_id, stats.attached_shard_count)) + .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse()) + .collect() + } + pub(crate) fn node_upsert(&mut self, node: &Node) { use std::collections::hash_map::Entry::*; match self.nodes.entry(node.get_id()) { diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 926332f946..c94af113db 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -8,13 +8,17 @@ use std::{ }; use crate::{ + background_node_operations::{ + Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION, + }, compute_hook::NotifyError, id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard}, persistence::{AbortShardSplitStatus, TenantFilter}, reconciler::{ReconcileError, ReconcileUnits}, scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, tenant_shard::{ - MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction, + MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization, + ScheduleOptimizationAction, }, }; use anyhow::Context; @@ -134,6 +138,11 @@ struct ServiceState { scheduler: Scheduler, + /// Ongoing background operation on the cluster if any is running. + /// Note that only one such operation may run at any given time, + /// hence the type choice. + ongoing_operation: Option, + /// Queue of tenants who are waiting for concurrency limits to permit them to reconcile delayed_reconcile_rx: tokio::sync::mpsc::Receiver, } @@ -185,6 +194,7 @@ impl ServiceState { tenants, nodes: Arc::new(nodes), scheduler, + ongoing_operation: None, delayed_reconcile_rx, } } @@ -296,6 +306,17 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(value: OperationError) -> Self { + match value { + OperationError::NodeStateChanged(err) | OperationError::FinalizeError(err) => { + ApiError::InternalServerError(anyhow::anyhow!(err)) + } + OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()), + } + } +} + #[allow(clippy::large_enum_variant)] enum TenantCreateOrUpdate { Create(TenantCreateRequest), @@ -1594,15 +1615,32 @@ impl Service { // Setting a node active unblocks any Reconcilers that might write to the location config API, // but those requests will not be accepted by the node until it has finished processing // the re-attach response. + // + // Additionally, reset the nodes scheduling policy to match the conditional update done + // in [`Persistence::re_attach`]. if let Some(node) = nodes.get(&reattach_req.node_id) { - if !node.is_available() { + let reset_scheduling = matches!( + node.get_scheduling(), + NodeSchedulingPolicy::PauseForRestart + | NodeSchedulingPolicy::Draining + | NodeSchedulingPolicy::Filling + ); + + if !node.is_available() || reset_scheduling { let mut new_nodes = (**nodes).clone(); if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) { - node.set_availability(NodeAvailability::Active(UtilizationScore::worst())); + if !node.is_available() { + node.set_availability(NodeAvailability::Active(UtilizationScore::worst())); + } + + if reset_scheduling { + node.set_scheduling(NodeSchedulingPolicy::Active); + } + scheduler.node_upsert(node); + let new_nodes = Arc::new(new_nodes); + *nodes = new_nodes; } - let new_nodes = Arc::new(new_nodes); - *nodes = new_nodes; } } @@ -1883,6 +1921,25 @@ impl Service { Ok(()) } + /// Same as [`Service::await_waiters`], but returns the waiters which are still + /// in progress + async fn await_waiters_remainder( + &self, + waiters: Vec, + timeout: Duration, + ) -> Vec { + let deadline = Instant::now().checked_add(timeout).unwrap(); + for waiter in waiters.iter() { + let timeout = deadline.duration_since(Instant::now()); + let _ = waiter.wait_timeout(timeout).await; + } + + waiters + .into_iter() + .filter(|waiter| matches!(waiter.get_status(), ReconcilerStatus::InProgress)) + .collect::>() + } + /// Part of [`Self::tenant_location_config`]: dissect an incoming location config request, /// and transform it into either a tenant creation of a series of shard updates. /// @@ -4164,6 +4221,18 @@ impl Service { Ok(nodes) } + pub(crate) async fn get_node(&self, node_id: NodeId) -> Result { + self.inner + .read() + .unwrap() + .nodes + .get(&node_id) + .cloned() + .ok_or(ApiError::NotFound( + format!("Node {node_id} not registered").into(), + )) + } + pub(crate) async fn node_register( &self, register_req: NodeRegisterRequest, @@ -4318,9 +4387,6 @@ impl Service { if let Some(scheduling) = scheduling { node.set_scheduling(scheduling); - - // TODO: once we have a background scheduling ticker for fill/drain, kick it - // to wake up and start working. } // Update the scheduler, in case the elegibility of the node for new shards has changed @@ -4411,7 +4477,7 @@ impl Service { // TODO: in the background, we should balance work back onto this pageserver } AvailabilityTransition::Unchanged => { - tracing::debug!("Node {} no change during config", node_id); + tracing::debug!("Node {} no availability change during config", node_id); } } @@ -4420,6 +4486,201 @@ impl Service { Ok(()) } + pub(crate) async fn start_node_drain( + self: &Arc, + node_id: NodeId, + ) -> Result<(), ApiError> { + let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = { + let locked = self.inner.read().unwrap(); + let nodes = &locked.nodes; + let node = nodes.get(&node_id).ok_or(ApiError::NotFound( + anyhow::anyhow!("Node {} not registered", node_id).into(), + ))?; + let schedulable_nodes_count = nodes + .iter() + .filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_))) + .count(); + + ( + locked + .ongoing_operation + .as_ref() + .map(|ongoing| ongoing.operation), + node.is_available(), + node.get_scheduling(), + schedulable_nodes_count, + ) + }; + + if let Some(ongoing) = ongoing_op { + return Err(ApiError::PreconditionFailed( + format!("Background operation already ongoing for node: {}", ongoing).into(), + )); + } + + if !node_available { + return Err(ApiError::ResourceUnavailable( + format!("Node {node_id} is currently unavailable").into(), + )); + } + + if schedulable_nodes_count == 0 { + return Err(ApiError::PreconditionFailed( + "No other schedulable nodes to drain to".into(), + )); + } + + match node_policy { + NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => { + self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining)) + .await?; + + let cancel = CancellationToken::new(); + + self.inner.write().unwrap().ongoing_operation = Some(OperationHandler { + operation: Operation::Drain(Drain { node_id }), + cancel: cancel.clone(), + }); + + tokio::task::spawn({ + let service = self.clone(); + let cancel = cancel.clone(); + async move { + scopeguard::defer! { + let prev = service.inner.write().unwrap().ongoing_operation.take(); + + if let Some(Operation::Drain(removed_drain)) = prev.map(|h| h.operation) { + assert_eq!(removed_drain.node_id, node_id, "We always take the same operation"); + } else { + panic!("We always remove the same operation") + } + } + + tracing::info!(%node_id, "Drain background operation starting"); + let res = service.drain_node(node_id, cancel).await; + match res { + Ok(()) => { + tracing::info!(%node_id, "Drain background operation completed successfully"); + } + Err(OperationError::Cancelled) => { + tracing::info!(%node_id, "Drain background operation was cancelled"); + } + Err(err) => { + tracing::error!(%node_id, "Drain background operation encountered: {err}") + } + } + } + }); + } + NodeSchedulingPolicy::Draining => { + return Err(ApiError::Conflict(format!( + "Node {node_id} has drain in progress" + ))); + } + policy => { + return Err(ApiError::PreconditionFailed( + format!("Node {node_id} cannot be drained due to {policy:?} policy").into(), + )); + } + } + + Ok(()) + } + + pub(crate) async fn start_node_fill(self: &Arc, node_id: NodeId) -> Result<(), ApiError> { + let (ongoing_op, node_available, node_policy, total_nodes_count) = { + let locked = self.inner.read().unwrap(); + let nodes = &locked.nodes; + let node = nodes.get(&node_id).ok_or(ApiError::NotFound( + anyhow::anyhow!("Node {} not registered", node_id).into(), + ))?; + + ( + locked + .ongoing_operation + .as_ref() + .map(|ongoing| ongoing.operation), + node.is_available(), + node.get_scheduling(), + nodes.len(), + ) + }; + + if let Some(ongoing) = ongoing_op { + return Err(ApiError::PreconditionFailed( + format!("Background operation already ongoing for node: {}", ongoing).into(), + )); + } + + if !node_available { + return Err(ApiError::ResourceUnavailable( + format!("Node {node_id} is currently unavailable").into(), + )); + } + + if total_nodes_count <= 1 { + return Err(ApiError::PreconditionFailed( + "No other nodes to fill from".into(), + )); + } + + match node_policy { + NodeSchedulingPolicy::Active => { + self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling)) + .await?; + + let cancel = CancellationToken::new(); + + self.inner.write().unwrap().ongoing_operation = Some(OperationHandler { + operation: Operation::Fill(Fill { node_id }), + cancel: cancel.clone(), + }); + + tokio::task::spawn({ + let service = self.clone(); + let cancel = cancel.clone(); + async move { + scopeguard::defer! { + let prev = service.inner.write().unwrap().ongoing_operation.take(); + + if let Some(Operation::Fill(removed_fill)) = prev.map(|h| h.operation) { + assert_eq!(removed_fill.node_id, node_id, "We always take the same operation"); + } else { + panic!("We always remove the same operation") + } + } + + tracing::info!(%node_id, "Fill background operation starting"); + let res = service.fill_node(node_id, cancel).await; + match res { + Ok(()) => { + tracing::info!(%node_id, "Fill background operation completed successfully"); + } + Err(OperationError::Cancelled) => { + tracing::info!(%node_id, "Fill background operation was cancelled"); + } + Err(err) => { + tracing::error!(%node_id, "Fill background operation encountered: {err}") + } + } + } + }); + } + NodeSchedulingPolicy::Filling => { + return Err(ApiError::Conflict(format!( + "Node {node_id} has fill in progress" + ))); + } + policy => { + return Err(ApiError::PreconditionFailed( + format!("Node {node_id} cannot be filled due to {policy:?} policy").into(), + )); + } + } + + Ok(()) + } + /// Helper for methods that will try and call pageserver APIs for /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant /// is attached somewhere. @@ -5004,4 +5265,296 @@ impl Service { // to complete. self.gate.close().await; } + + /// Drain a node by moving the shards attached to it as primaries. + /// This is a long running operation and it should run as a separate Tokio task. + pub(crate) async fn drain_node( + &self, + node_id: NodeId, + cancel: CancellationToken, + ) -> Result<(), OperationError> { + let mut last_inspected_shard: Option = None; + let mut inspected_all_shards = false; + let mut waiters = Vec::new(); + let mut schedule_context = ScheduleContext::default(); + + while !inspected_all_shards { + if cancel.is_cancelled() { + return Err(OperationError::Cancelled); + } + + { + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + + let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged( + format!("node {node_id} was removed").into(), + ))?; + + let current_policy = node.get_scheduling(); + if !matches!(current_policy, NodeSchedulingPolicy::Draining) { + // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think + // about it + return Err(OperationError::NodeStateChanged( + format!("node {node_id} changed state to {current_policy:?}").into(), + )); + } + + let mut cursor = tenants.iter_mut().skip_while({ + let skip_past = last_inspected_shard; + move |(tid, _)| match skip_past { + Some(last) => **tid != last, + None => false, + } + }); + + while waiters.len() < MAX_RECONCILES_PER_OPERATION { + let (tid, tenant_shard) = match cursor.next() { + Some(some) => some, + None => { + inspected_all_shards = true; + break; + } + }; + + if tenant_shard.intent.demote_attached(scheduler, node_id) { + match tenant_shard.schedule(scheduler, &mut schedule_context) { + Err(e) => { + tracing::warn!( + tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(), + "Scheduling error when draining pageserver {} : {e}", node_id + ); + } + Ok(()) => { + let scheduled_to = tenant_shard.intent.get_attached(); + tracing::info!( + tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(), + "Rescheduled shard while draining node {}: {} -> {:?}", + node_id, + node_id, + scheduled_to + ); + + let waiter = self.maybe_reconcile_shard(tenant_shard, nodes); + if let Some(some) = waiter { + waiters.push(some); + } + } + } + } + + last_inspected_shard = Some(*tid); + } + } + + waiters = self + .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT) + .await; + } + + while !waiters.is_empty() { + tracing::info!("Awaiting {} pending drain reconciliations", waiters.len()); + + waiters = self + .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT) + .await; + } + + // At this point we have done the best we could to drain shards from this node. + // Set the node scheduling policy to `[NodeSchedulingPolicy::PauseForRestart]` + // to complete the drain. + if let Err(err) = self + .node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart)) + .await + { + // This is not fatal. Anything that is polling the node scheduling policy to detect + // the end of the drain operations will hang, but all such places should enforce an + // overall timeout. The scheduling policy will be updated upon node re-attach and/or + // by the counterpart fill operation. + return Err(OperationError::FinalizeError( + format!( + "Failed to finalise drain of {node_id} by setting scheduling policy to PauseForRestart: {err}" + ) + .into(), + )); + } + + Ok(()) + } + + /// Create a node fill plan (pick secondaries to promote) that meets the following requirements: + /// 1. The node should be filled until it reaches the expected cluster average of + /// attached shards. If there are not enough secondaries on the node, the plan stops early. + /// 2. Select tenant shards to promote such that the number of attached shards is balanced + /// throughout the cluster. We achieve this by picking tenant shards from each node, + /// starting from the ones with the largest number of attached shards, until the node + /// reaches the expected cluster average. + fn fill_node_plan(&self, node_id: NodeId) -> Vec { + let mut locked = self.inner.write().unwrap(); + let fill_requirement = locked.scheduler.compute_fill_requirement(node_id); + + let mut tids_by_node = locked + .tenants + .iter_mut() + .filter_map(|(tid, tenant_shard)| { + if tenant_shard.intent.get_secondary().contains(&node_id) { + if let Some(primary) = tenant_shard.intent.get_attached() { + return Some((*primary, *tid)); + } + } + + None + }) + .into_group_map(); + + let expected_attached = locked.scheduler.expected_attached_shard_count(); + let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count(); + + let mut plan = Vec::new(); + for (node_id, attached) in nodes_by_load { + if plan.len() >= fill_requirement + || tids_by_node.is_empty() + || attached <= expected_attached + { + break; + } + + let can_take = attached - expected_attached; + let mut remove_node = false; + for _ in 0..can_take { + match tids_by_node.get_mut(&node_id) { + Some(tids) => match tids.pop() { + Some(tid) => { + plan.push(tid); + } + None => { + remove_node = true; + break; + } + }, + None => { + break; + } + } + } + + if remove_node { + tids_by_node.remove(&node_id); + } + } + + plan + } + + /// Fill a node by promoting its secondaries until the cluster is balanced + /// with regards to attached shard counts. Note that this operation only + /// makes sense as a counterpart to the drain implemented in [`Service::drain_node`]. + /// This is a long running operation and it should run as a separate Tokio task. + pub(crate) async fn fill_node( + &self, + node_id: NodeId, + cancel: CancellationToken, + ) -> Result<(), OperationError> { + // TODO(vlad): Currently this operates on the assumption that all + // secondaries are warm. This is not always true (e.g. we just migrated the + // tenant). Take that into consideration by checking the secondary status. + let mut tids_to_promote = self.fill_node_plan(node_id); + + let mut waiters = Vec::new(); + let mut schedule_context = ScheduleContext::default(); + + // Execute the plan we've composed above. Before aplying each move from the plan, + // we validate to ensure that it has not gone stale in the meantime. + while !tids_to_promote.is_empty() { + if cancel.is_cancelled() { + return Err(OperationError::Cancelled); + } + + { + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + + let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged( + format!("node {node_id} was removed").into(), + ))?; + + let current_policy = node.get_scheduling(); + if !matches!(current_policy, NodeSchedulingPolicy::Filling) { + // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think + // about it + return Err(OperationError::NodeStateChanged( + format!("node {node_id} changed state to {current_policy:?}").into(), + )); + } + + while waiters.len() < MAX_RECONCILES_PER_OPERATION { + if let Some(tid) = tids_to_promote.pop() { + if let Some(tenant_shard) = tenants.get_mut(&tid) { + // If the node being filled is not a secondary anymore, + // skip the promotion. + if !tenant_shard.intent.get_secondary().contains(&node_id) { + continue; + } + + let previously_attached_to = *tenant_shard.intent.get_attached(); + + tenant_shard.intent.promote_attached(scheduler, node_id); + match tenant_shard.schedule(scheduler, &mut schedule_context) { + Err(e) => { + tracing::warn!( + tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(), + "Scheduling error when filling pageserver {} : {e}", node_id + ); + } + Ok(()) => { + tracing::info!( + tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(), + "Rescheduled shard while filling node {}: {:?} -> {}", + node_id, + previously_attached_to, + node_id + ); + + if let Some(waiter) = + self.maybe_reconcile_shard(tenant_shard, nodes) + { + waiters.push(waiter); + } + } + } + } + } else { + break; + } + } + } + + waiters = self + .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT) + .await; + } + + while !waiters.is_empty() { + tracing::info!("Awaiting {} pending fill reconciliations", waiters.len()); + + waiters = self + .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT) + .await; + } + + if let Err(err) = self + .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active)) + .await + { + // This isn't a huge issue since the filling process starts upon request. However, it + // will prevent the next drain from starting. The only case in which this can fail + // is database unavailability. Such a case will require manual intervention. + return Err(OperationError::FinalizeError( + format!("Failed to finalise fill of {node_id} by setting scheduling policy to Active: {err}") + .into(), + )); + } + + Ok(()) + } } diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 77bbf4c604..d1b632755f 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -10,7 +10,9 @@ use crate::{ reconciler::ReconcileUnits, scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext}, }; -use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy}; +use pageserver_api::controller_api::{ + NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, +}; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, shard::{ShardIdentity, TenantShardId}, @@ -311,6 +313,12 @@ pub(crate) struct ReconcilerWaiter { seq: Sequence, } +pub(crate) enum ReconcilerStatus { + Done, + Failed, + InProgress, +} + #[derive(thiserror::Error, Debug)] pub(crate) enum ReconcileWaitError { #[error("Timeout waiting for shard {0}")] @@ -373,6 +381,16 @@ impl ReconcilerWaiter { Ok(()) } + + pub(crate) fn get_status(&self) -> ReconcilerStatus { + if self.seq_wait.would_wait_for(self.seq).is_err() { + ReconcilerStatus::Done + } else if self.error_seq_wait.would_wait_for(self.seq).is_err() { + ReconcilerStatus::Failed + } else { + ReconcilerStatus::InProgress + } + } } /// Having spawned a reconciler task, the tenant shard's state will carry enough @@ -652,13 +670,17 @@ impl TenantShard { let mut scores = all_pageservers .iter() .flat_map(|node_id| { - if matches!( - nodes - .get(node_id) - .map(|n| n.may_schedule()) - .unwrap_or(MaySchedule::No), - MaySchedule::No + let node = nodes.get(node_id); + if node.is_none() { + None + } else if matches!( + node.unwrap().get_scheduling(), + NodeSchedulingPolicy::Filling ) { + // If the node is currently filling, don't count it as a candidate to avoid, + // racing with the background fill. + None + } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) { None } else { let affinity_score = schedule_context.get_node_affinity(*node_id); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index aa55b6e4cb..bad93ff39a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2213,6 +2213,30 @@ class NeonStorageController(MetricsGetter, LogUtils): headers=self.headers(TokenScope.ADMIN), ) + def node_drain(self, node_id): + log.info(f"node_drain({node_id})") + self.request( + "PUT", + f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain", + headers=self.headers(TokenScope.ADMIN), + ) + + def node_fill(self, node_id): + log.info(f"node_fill({node_id})") + self.request( + "PUT", + f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill", + headers=self.headers(TokenScope.ADMIN), + ) + + def node_status(self, node_id): + response = self.request( + "GET", + f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + headers=self.headers(TokenScope.ADMIN), + ) + return response.json() + def node_list(self): response = self.request( "GET", diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 8624a45f45..30f96ceee8 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -40,7 +40,7 @@ from werkzeug.wrappers.response import Response def get_node_shard_counts(env: NeonEnv, tenant_ids): - counts: defaultdict[str, int] = defaultdict(int) + counts: defaultdict[int, int] = defaultdict(int) for tid in tenant_ids: for shard in env.storage_controller.locate(tid): counts[shard["node_id"]] += 1 @@ -1502,3 +1502,120 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto workload = Workload(env, tenant_id, timeline, branch_name=branch) workload.expect_rows = expect_rows workload.validate() + + +def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder): + """ + Graceful reststart of storage controller clusters use the drain and + fill hooks in order to migrate attachments away from pageservers before + restarting. In practice, Ansible will drive this process. + """ + neon_env_builder.num_pageservers = 2 + env = neon_env_builder.init_configs() + env.start() + + tenant_count = 5 + shard_count_per_tenant = 8 + total_shards = tenant_count * shard_count_per_tenant + tenant_ids = [] + + for _ in range(0, tenant_count): + tid = TenantId.generate() + tenant_ids.append(tid) + env.neon_cli.create_tenant( + tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant + ) + + # Give things a chance to settle. + # A call to `reconcile_until_idle` could be used here instead, + # however since all attachments are placed on the same node, + # we'd have to wait for a long time (2 minutes-ish) for optimizations + # to quiesce. + # TODO: once the initial attachment selection is fixed, update this + # to use `reconcile_until_idle`. + time.sleep(2) + + nodes = env.storage_controller.node_list() + assert len(nodes) == 2 + + def retryable_node_operation(op, ps_id, max_attempts, backoff): + while max_attempts > 0: + try: + op(ps_id) + return + except StorageControllerApiException as e: + max_attempts -= 1 + log.info(f"Operation failed ({max_attempts} attempts left): {e}") + + if max_attempts == 0: + raise e + + time.sleep(backoff) + + def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff): + log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy") + while max_attempts > 0: + try: + status = env.storage_controller.node_status(node_id) + policy = status["scheduling"] + if policy == desired_scheduling_policy: + return + else: + max_attempts -= 1 + log.info(f"Status call returned {policy=} ({max_attempts} attempts left)") + + if max_attempts == 0: + raise AssertionError( + f"Status for {node_id=} did not reach {desired_scheduling_policy=}" + ) + + time.sleep(backoff) + except StorageControllerApiException as e: + max_attempts -= 1 + log.info(f"Status call failed ({max_attempts} retries left): {e}") + + if max_attempts == 0: + raise e + + time.sleep(backoff) + + def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards): + # Assert that all nodes have some attached shards + assert len(shard_counts) == len(env.pageservers) + + min_shard_count = min(shard_counts.values()) + max_shard_count = max(shard_counts.values()) + + flake_factor = 5 / 100 + assert max_shard_count - min_shard_count <= int(total_shards * flake_factor) + + # Perform a graceful rolling restart + for ps in env.pageservers: + retryable_node_operation( + lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2 + ) + poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5) + + shard_counts = get_node_shard_counts(env, tenant_ids) + log.info(f"Shard counts after draining node {ps.id}: {shard_counts}") + # Assert that we've drained the node + assert shard_counts[ps.id] == 0 + # Assert that those shards actually went somewhere + assert sum(shard_counts.values()) == total_shards + + ps.restart() + poll_node_status(ps.id, "Active", max_attempts=10, backoff=1) + + retryable_node_operation( + lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2 + ) + poll_node_status(ps.id, "Active", max_attempts=6, backoff=5) + + shard_counts = get_node_shard_counts(env, tenant_ids) + log.info(f"Shard counts after filling node {ps.id}: {shard_counts}") + assert_shard_counts_balanced(env, shard_counts, total_shards) + + # Now check that shards are reasonably balanced + shard_counts = get_node_shard_counts(env, tenant_ids) + log.info(f"Shard counts after rolling restart: {shard_counts}") + assert_shard_counts_balanced(env, shard_counts, total_shards)