diff --git a/Cargo.lock b/Cargo.lock index dbbf330cf9..d1f3f1522e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5801,6 +5801,7 @@ dependencies = [ "r2d2", "reqwest 0.12.4", "routerify", + "scopeguard", "serde", "serde_json", "strum", diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 1278f17ad2..a0d10dc665 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -209,6 +209,7 @@ pub enum NodeSchedulingPolicy { Active, Filling, Pause, + PauseForRestart, Draining, } @@ -220,6 +221,7 @@ impl FromStr for NodeSchedulingPolicy { "active" => Ok(Self::Active), "filling" => Ok(Self::Filling), "pause" => Ok(Self::Pause), + "pause_for_restart" => Ok(Self::PauseForRestart), "draining" => Ok(Self::Draining), _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")), } @@ -233,6 +235,7 @@ impl From for String { Active => "active", Filling => "filling", Pause => "pause", + PauseForRestart => "pause_for_restart", Draining => "draining", } .to_string() diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 194619a496..b54dea5d47 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -40,6 +40,7 @@ tokio.workspace = true tokio-util.workspace = true tracing.workspace = true measured.workspace = true +scopeguard.workspace = true strum.workspace = true strum_macros.workspace = true diff --git a/storage_controller/src/background_node_operations.rs b/storage_controller/src/background_node_operations.rs new file mode 100644 index 0000000000..73fe48018e --- /dev/null +++ b/storage_controller/src/background_node_operations.rs @@ -0,0 +1,236 @@ +use std::{borrow::Cow, collections::HashMap, fmt::Debug, fmt::Display, sync::Arc}; + +use tokio::{sync::mpsc::error::TrySendError, task::JoinHandle}; +use tokio_util::sync::CancellationToken; +use utils::id::NodeId; + +use crate::service::Service; + +pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10; + +#[derive(Copy, Clone)] +pub(crate) struct Drain { + node_id: NodeId, +} + +#[derive(Copy, Clone)] +pub(crate) struct Fill { + node_id: NodeId, +} + +pub(crate) enum Operation { + Drain(Drain), + Fill(Fill), +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum OperationError { + #[error("Operation precondition failed: {0}")] + PreconditionFailed(Cow<'static, str>), + #[error("Node state changed during operation: {0}")] + NodeStateChanged(Cow<'static, str>), + #[error("Operation cancelled")] + Cancelled, + #[error("Shutting down")] + ShuttingDown, +} + +struct OperationHandler { + operation: Operation, + #[allow(unused)] + cancel: CancellationToken, + // TODO(vlad): maybe we don't need this handle after all. + // Unless we want to log the error at the end. + #[allow(unused)] + handle: JoinHandle>, +} + +#[derive(Default, Clone)] +struct OngoingOperations(Arc>>); + +pub(crate) struct Controller { + ongoing: OngoingOperations, + service: Arc, + sender: tokio::sync::mpsc::Sender, +} + +// TODO(vlad): write a little python test for this +impl Controller { + pub(crate) fn new(service: Arc) -> (Self, tokio::sync::mpsc::Receiver) { + let (operations_tx, operations_rx) = tokio::sync::mpsc::channel(1); + ( + Self { + ongoing: Default::default(), + service, + sender: operations_tx, + }, + operations_rx, + ) + } + + pub(crate) fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> { + if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) { + return Err(OperationError::PreconditionFailed( + format!( + "Background operation already ongoing for node: {}", + handler.operation + ) + .into(), + )); + } + + self.sender.try_send(Operation::Drain(Drain { node_id }))?; + + Ok(()) + } + + pub(crate) fn fill_node(&self, node_id: NodeId) -> Result<(), OperationError> { + if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) { + return Err(OperationError::PreconditionFailed( + format!( + "Background operation already ongoing for node: {}", + handler.operation + ) + .into(), + )); + } + + self.sender.try_send(Operation::Fill(Fill { node_id }))?; + + Ok(()) + } + + pub(crate) async fn handle_operations( + &self, + mut receiver: tokio::sync::mpsc::Receiver, + ) { + while let Some(op) = receiver.recv().await { + match op { + Operation::Drain(drain) => self.handle_drain(drain), + Operation::Fill(fill) => self.handle_fill(fill), + } + } + } + + fn handle_drain(&self, drain: Drain) { + let node_id = drain.node_id; + + let cancel = CancellationToken::new(); + + let (_holder, waiter) = utils::completion::channel(); + + let handle = tokio::task::spawn({ + let service = self.service.clone(); + let ongoing = self.ongoing.clone(); + let cancel = cancel.clone(); + + async move { + scopeguard::defer! { + let removed = ongoing.0.write().unwrap().remove(&drain.node_id); + if let Some(Operation::Drain(removed_drain)) = removed.map(|h| h.operation) { + assert_eq!(removed_drain.node_id, drain.node_id, "We always remove the same operation"); + } else { + panic!("We always remove the same operation") + } + } + + waiter.wait().await; + service.drain_node(drain.node_id, cancel).await + } + }); + + let replaced = self.ongoing.0.write().unwrap().insert( + node_id, + OperationHandler { + operation: Operation::Drain(drain), + cancel, + handle, + }, + ); + + assert!( + replaced.is_none(), + "The channel size is 1 and we checked before enqueing" + ); + } + + fn handle_fill(&self, fill: Fill) { + let node_id = fill.node_id; + + let cancel = CancellationToken::new(); + + let (_holder, waiter) = utils::completion::channel(); + + let handle = tokio::task::spawn({ + let service = self.service.clone(); + let ongoing = self.ongoing.clone(); + let cancel = cancel.clone(); + + async move { + scopeguard::defer! { + let removed = ongoing.0.write().unwrap().remove(&fill.node_id); + if let Some(Operation::Fill(removed_fill)) = removed.map(|h| h.operation) { + assert_eq!(removed_fill.node_id, fill.node_id, "We always remove the same operation"); + } else { + panic!("We always remove the same operation") + } + } + + waiter.wait().await; + service.fill_node(fill.node_id, cancel).await + } + }); + + let replaced = self.ongoing.0.write().unwrap().insert( + node_id, + OperationHandler { + operation: Operation::Fill(fill), + cancel, + handle, + }, + ); + + assert!( + replaced.is_none(), + "The channel size is 1 and we checked before enqueing" + ); + } +} + +impl From> for OperationError { + fn from(value: TrySendError) -> Self { + match value { + TrySendError::Full(_) => { + Self::PreconditionFailed("Too many background operation in progress".into()) + } + TrySendError::Closed(_) => Self::ShuttingDown, + } + } +} + +impl Display for Drain { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "drain {}", self.node_id) + } +} + +impl Display for Fill { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "fill {}", self.node_id) + } +} + +impl Display for Operation { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Operation::Drain(op) => write!(f, "{op}"), + Operation::Fill(op) => write!(f, "{op}"), + } + } +} + +impl Debug for Controller { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "backround_node_operations::Controller") + } +} diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index bbb6d2cb32..596675cc6f 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -480,6 +480,39 @@ async fn handle_node_configure(mut req: Request) -> Result, ) } +async fn handle_node_status(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let node_id: NodeId = parse_request_param(&req, "node_id")?; + + let node_status = state.service.get_node_status(node_id).await?; + + json_response(StatusCode::OK, node_status) +} + +async fn handle_node_drain(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let node_id: NodeId = parse_request_param(&req, "node_id")?; + + state.service.start_node_drain(node_id).await?; + + json_response(StatusCode::ACCEPTED, ()) +} + +async fn handle_node_fill(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let node_id: NodeId = parse_request_param(&req, "node_id")?; + + state.service.start_node_fill(node_id).await?; + + json_response(StatusCode::ACCEPTED, ()) +} + async fn handle_tenant_shard_split( service: Arc, mut req: Request, @@ -832,6 +865,16 @@ pub fn make_router( RequestName("control_v1_node_config"), ) }) + .get("/control/v1/node/:node_id", |r| { + named_request_span(r, handle_node_status, RequestName("control_v1_node_status")) + }) + .put("/control/v1/node/:node_id/drain", |r| { + named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain")) + }) + .put("/control/v1/node/:node_id/fill", |r| { + named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill")) + }) + // TODO(vlad): endpoint for cancelling drain and fill // Tenant Shard operations .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| { tenant_service_handler( diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs index 2ea490a14b..8caf638904 100644 --- a/storage_controller/src/lib.rs +++ b/storage_controller/src/lib.rs @@ -2,6 +2,7 @@ use serde::Serialize; use utils::seqwait::MonotonicCounter; mod auth; +mod background_node_operations; mod compute_hook; mod heartbeater; pub mod http; diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index 7b5513c908..da6e506bee 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -59,12 +59,17 @@ impl Node { self.id } + pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy { + self.scheduling + } + pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) { self.scheduling = scheduling } /// Does this registration request match `self`? This is used when deciding whether a registration /// request should be allowed to update an existing record with the same node ID. + /// how pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool { self.id == register_req.node_id && self.listen_http_addr == register_req.listen_http_addr @@ -141,6 +146,7 @@ impl Node { NodeSchedulingPolicy::Draining => MaySchedule::No, NodeSchedulingPolicy::Filling => MaySchedule::Yes(score), NodeSchedulingPolicy::Pause => MaySchedule::No, + NodeSchedulingPolicy::PauseForRestart => MaySchedule::No, } } @@ -157,7 +163,7 @@ impl Node { listen_http_port, listen_pg_addr, listen_pg_port, - scheduling: NodeSchedulingPolicy::Filling, + scheduling: NodeSchedulingPolicy::Active, availability: NodeAvailability::Offline, cancel: CancellationToken::new(), } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 67c05296d5..c287d6f111 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -442,13 +442,15 @@ impl Persistence { #[tracing::instrument(skip_all, fields(node_id))] pub(crate) async fn re_attach( &self, - node_id: NodeId, + input_node_id: NodeId, ) -> DatabaseResult> { + use crate::schema::nodes::dsl::scheduling_policy; + use crate::schema::nodes::dsl::*; use crate::schema::tenant_shards::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::ReAttach, move |conn| { let rows_updated = diesel::update(tenant_shards) - .filter(generation_pageserver.eq(node_id.0 as i64)) + .filter(generation_pageserver.eq(input_node_id.0 as i64)) .set(generation.eq(generation + 1)) .execute(conn)?; @@ -457,9 +459,23 @@ impl Persistence { // TODO: UPDATE+SELECT in one query let updated = tenant_shards - .filter(generation_pageserver.eq(node_id.0 as i64)) + .filter(generation_pageserver.eq(input_node_id.0 as i64)) .select(TenantShardPersistence::as_select()) .load(conn)?; + + // If the node went through a drain and restart phase before re-attaching, + // then reset it's node scheduling policy to active. + diesel::update(nodes) + .filter(node_id.eq(input_node_id.0 as i64)) + .filter( + scheduling_policy + // TODO(vlad): Do the same for `Filling` + .eq(String::from(NodeSchedulingPolicy::PauseForRestart)) + .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining))), + ) + .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active))) + .execute(conn)?; + Ok(updated) }) .await?; diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 3ff0d87988..49460b8d2d 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -29,6 +29,8 @@ pub enum MaySchedule { struct SchedulerNode { /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`]. shard_count: usize, + /// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`]. + attached_shard_count: usize, /// Whether this node is currently elegible to have new shards scheduled (this is derived /// from a node's availability state and scheduling policy). @@ -42,7 +44,9 @@ impl PartialEq for SchedulerNode { (MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No) ); - may_schedule_matches && self.shard_count == other.shard_count + may_schedule_matches + && self.shard_count == other.shard_count + && self.attached_shard_count == other.attached_shard_count } } @@ -138,6 +142,15 @@ impl ScheduleContext { } } +pub(crate) enum RefCountUpdate { + PromoteSecondary, + Attach, + Detach, + DemoteAttached, + AddSecondary, + RemoveSecondary, +} + impl Scheduler { pub(crate) fn new<'a>(nodes: impl Iterator) -> Self { let mut scheduler_nodes = HashMap::new(); @@ -146,6 +159,7 @@ impl Scheduler { node.get_id(), SchedulerNode { shard_count: 0, + attached_shard_count: 0, may_schedule: node.may_schedule(), }, ); @@ -171,6 +185,7 @@ impl Scheduler { node.get_id(), SchedulerNode { shard_count: 0, + attached_shard_count: 0, may_schedule: node.may_schedule(), }, ); @@ -179,7 +194,10 @@ impl Scheduler { for shard in shards { if let Some(node_id) = shard.intent.get_attached() { match expect_nodes.get_mut(node_id) { - Some(node) => node.shard_count += 1, + Some(node) => { + node.shard_count += 1; + node.attached_shard_count += 1; + } None => anyhow::bail!( "Tenant {} references nonexistent node {}", shard.tenant_shard_id, @@ -227,31 +245,67 @@ impl Scheduler { Ok(()) } - /// Increment the reference count of a node. This reference count is used to guide scheduling - /// decisions, not for memory management: it represents one tenant shard whose IntentState targets - /// this node. + /// Update the reference counts of a node. These reference counts are used to guide scheduling + /// decisions, not for memory management: they represent the number of tenant shard whose IntentState + /// targets this node and the number of tenants shars whose IntentState is attached to this + /// node. /// /// It is an error to call this for a node that is not known to the scheduler (i.e. passed into /// [`Self::new`] or [`Self::node_upsert`]) - pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) { + pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) { let Some(node) = self.nodes.get_mut(&node_id) else { - tracing::error!("Scheduler missing node {node_id}"); debug_assert!(false); + tracing::error!("Scheduler missing node {node_id}"); return; }; - node.shard_count += 1; + match update { + RefCountUpdate::PromoteSecondary => { + node.attached_shard_count += 1; + } + RefCountUpdate::Attach => { + node.shard_count += 1; + node.attached_shard_count += 1; + } + RefCountUpdate::Detach => { + node.shard_count -= 1; + node.attached_shard_count -= 1; + } + RefCountUpdate::DemoteAttached => { + node.attached_shard_count -= 1; + } + RefCountUpdate::AddSecondary => { + node.shard_count += 1; + } + RefCountUpdate::RemoveSecondary => { + node.shard_count -= 1; + } + } } - /// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`]. - pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) { - let Some(node) = self.nodes.get_mut(&node_id) else { + // Check if the number of shards attached to a give node is lagging below + // the cluster average. If that's the case, the node should be filled. + // TODO(vlad): We can probably be smarter about this. This could be expressed + // as standard deviation of attached shards while excluding paused nodes. + pub(crate) fn should_fill_node(&self, node_id: NodeId) -> bool { + let Some(node) = self.nodes.get(&node_id) else { debug_assert!(false); tracing::error!("Scheduler missing node {node_id}"); - return; + return true; }; - node.shard_count -= 1; + let total_attached_shards: usize = + self.nodes.values().map(|n| n.attached_shard_count).sum(); + + assert!(!self.nodes.is_empty()); + let expected_attached_shards_per_node = total_attached_shards / self.nodes.len(); + + // TODO(vlad): remove this + for (node_id, node) in self.nodes.iter() { + tracing::info!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node); + } + + node.attached_shard_count < expected_attached_shards_per_node } pub(crate) fn node_upsert(&mut self, node: &Node) { @@ -263,6 +317,7 @@ impl Scheduler { Vacant(entry) => { entry.insert(SchedulerNode { shard_count: 0, + attached_shard_count: 0, may_schedule: node.may_schedule(), }); } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 756dc10a2a..997e451e9f 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -8,13 +8,15 @@ use std::{ }; use crate::{ + background_node_operations::{self, Controller, OperationError, MAX_RECONCILES_PER_OPERATION}, compute_hook::NotifyError, id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard}, persistence::{AbortShardSplitStatus, TenantFilter}, reconciler::{ReconcileError, ReconcileUnits}, - scheduler::{ScheduleContext, ScheduleMode}, + scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, tenant_shard::{ - MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction, + MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization, + ScheduleOptimizationAction, }, }; use anyhow::Context; @@ -275,6 +277,8 @@ pub struct Service { /// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity. delayed_reconcile_tx: tokio::sync::mpsc::Sender, + background_operations_controller: std::sync::OnceLock, + // Process shutdown will fire this token cancel: CancellationToken, @@ -296,6 +300,19 @@ impl From for ApiError { } } +impl From for ApiError { + fn from(value: OperationError) -> Self { + match value { + OperationError::PreconditionFailed(err) => ApiError::PreconditionFailed(err.into()), + OperationError::NodeStateChanged(err) => { + ApiError::InternalServerError(anyhow::anyhow!(err)) + } + OperationError::ShuttingDown => ApiError::ShuttingDown, + OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()), + } + } +} + #[allow(clippy::large_enum_variant)] enum TenantCreateOrUpdate { Create(TenantCreateRequest), @@ -1090,12 +1107,20 @@ impl Service { delayed_reconcile_tx, abort_tx, startup_complete: startup_complete.clone(), + background_operations_controller: Default::default(), cancel, gate: Gate::default(), tenant_op_locks: Default::default(), node_op_locks: Default::default(), }); + let (controller, node_operations_receiver) = + background_node_operations::Controller::new(this.clone()); + + this.background_operations_controller + .set(controller) + .expect("This is the only code path that sets the controller"); + let result_task_this = this.clone(); tokio::task::spawn(async move { // Block shutdown until we're done (we must respect self.cancel) @@ -1167,6 +1192,19 @@ impl Service { } }); + tokio::task::spawn({ + let this = this.clone(); + let startup_complete = startup_complete.clone(); + async move { + startup_complete.wait().await; + this.background_operations_controller + .get() + .expect("Initialized at start up") + .handle_operations(node_operations_receiver) + .await; + } + }); + Ok(this) } @@ -1562,15 +1600,30 @@ impl Service { // Setting a node active unblocks any Reconcilers that might write to the location config API, // but those requests will not be accepted by the node until it has finished processing // the re-attach response. + // + // Additionally, reset the nodes scheduling policy to match the conditional update done + // in [`Persistence::re_attach`]. if let Some(node) = nodes.get(&reattach_req.node_id) { - if !node.is_available() { + let reset_scheduling = matches!( + node.get_scheduling(), + NodeSchedulingPolicy::PauseForRestart | NodeSchedulingPolicy::Draining + ); + + if !node.is_available() || reset_scheduling { let mut new_nodes = (**nodes).clone(); if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) { - node.set_availability(NodeAvailability::Active(UtilizationScore::worst())); + if !node.is_available() { + node.set_availability(NodeAvailability::Active(UtilizationScore::worst())); + } + + if reset_scheduling { + node.set_scheduling(NodeSchedulingPolicy::Active); + } + scheduler.node_upsert(node); + let new_nodes = Arc::new(new_nodes); + *nodes = new_nodes; } - let new_nodes = Arc::new(new_nodes); - *nodes = new_nodes; } } @@ -1851,6 +1904,23 @@ impl Service { Ok(()) } + async fn kick_waiters( + &self, + waiters: Vec, + timeout: Duration, + ) -> Vec { + let deadline = Instant::now().checked_add(timeout).unwrap(); + for waiter in waiters.iter() { + let timeout = deadline.duration_since(Instant::now()); + let _ = waiter.wait_timeout(timeout).await; + } + + waiters + .into_iter() + .filter(|waiter| matches!(waiter.get_status(), ReconcilerStatus::InProgress)) + .collect::>() + } + /// Part of [`Self::tenant_location_config`]: dissect an incoming location config request, /// and transform it into either a tenant creation of a series of shard updates. /// @@ -4128,6 +4198,18 @@ impl Service { Ok(nodes) } + pub(crate) async fn get_node_status(&self, node_id: NodeId) -> Result { + self.inner + .read() + .unwrap() + .nodes + .get(&node_id) + .cloned() + .ok_or(ApiError::NotFound( + format!("Node {node_id} not registered").into(), + )) + } + pub(crate) async fn node_register( &self, register_req: NodeRegisterRequest, @@ -4282,9 +4364,6 @@ impl Service { if let Some(scheduling) = scheduling { node.set_scheduling(scheduling); - - // TODO: once we have a background scheduling ticker for fill/drain, kick it - // to wake up and start working. } // Update the scheduler, in case the elegibility of the node for new shards has changed @@ -4312,7 +4391,7 @@ impl Service { continue; } - if tenant_shard.intent.demote_attached(node_id) { + if tenant_shard.intent.demote_attached(scheduler, node_id) { tenant_shard.sequence = tenant_shard.sequence.next(); // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters @@ -4359,7 +4438,7 @@ impl Service { // TODO: in the background, we should balance work back onto this pageserver } AvailabilityTransition::Unchanged => { - tracing::debug!("Node {} no change during config", node_id); + tracing::debug!("Node {} no availability change during config", node_id); } } @@ -4368,6 +4447,112 @@ impl Service { Ok(()) } + pub(crate) async fn start_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> { + let (node_available, node_policy, schedulable_nodes_count) = { + let locked = self.inner.read().unwrap(); + let nodes = &locked.nodes; + let node = nodes.get(&node_id).ok_or(ApiError::NotFound( + anyhow::anyhow!("Node {} not registered", node_id).into(), + ))?; + let schedulable_nodes_count = nodes + .iter() + .filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_))) + .count(); + + ( + node.is_available(), + node.get_scheduling(), + schedulable_nodes_count, + ) + }; + + if !node_available { + return Err(ApiError::ResourceUnavailable( + format!("Node {node_id} is currently unavailable").into(), + )); + } + + if schedulable_nodes_count == 0 { + return Err(ApiError::PreconditionFailed( + "No other schedulable nodes to drain to".into(), + )); + } + + match node_policy { + NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => { + self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining)) + .await?; + let controller = self + .background_operations_controller + .get() + .expect("Initialized at start up"); + controller.drain_node(node_id)?; + } + NodeSchedulingPolicy::Draining => { + return Err(ApiError::Conflict(format!( + "Node {node_id} has drain in progress" + ))); + } + policy => { + return Err(ApiError::PreconditionFailed( + format!("Node {node_id} cannot be drained due to {policy:?} policy").into(), + )); + } + } + + Ok(()) + } + + pub(crate) async fn start_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> { + let (node_available, node_policy, total_nodes_count) = { + let locked = self.inner.read().unwrap(); + let nodes = &locked.nodes; + let node = nodes.get(&node_id).ok_or(ApiError::NotFound( + anyhow::anyhow!("Node {} not registered", node_id).into(), + ))?; + + (node.is_available(), node.get_scheduling(), nodes.len()) + }; + + if !node_available { + return Err(ApiError::ResourceUnavailable( + format!("Node {node_id} is currently unavailable").into(), + )); + } + + if total_nodes_count <= 1 { + return Err(ApiError::PreconditionFailed( + "No other nodes to fill from".into(), + )); + } + + match node_policy { + // TODO: clear Draining, Filling and PauseForRestart on storage controller restart and + // re-attach + NodeSchedulingPolicy::Active => { + self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling)) + .await?; + let controller = self + .background_operations_controller + .get() + .expect("Initialized at start up"); + controller.fill_node(node_id)?; + } + NodeSchedulingPolicy::Filling => { + return Err(ApiError::Conflict(format!( + "Node {node_id} has fill in progress" + ))); + } + policy => { + return Err(ApiError::PreconditionFailed( + format!("Node {node_id} cannot be filled due to {policy:?} policy").into(), + )); + } + } + + Ok(()) + } + /// Helper for methods that will try and call pageserver APIs for /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant /// is attached somewhere. @@ -4952,4 +5137,197 @@ impl Service { // to complete. self.gate.close().await; } + + pub(crate) async fn drain_node( + &self, + node_id: NodeId, + cancel: CancellationToken, + ) -> Result<(), OperationError> { + tracing::info!(%node_id, "Starting drain background operation"); + + let last_inspected_shard: Option = None; + let mut inspected_all_shards = false; + let mut waiters = Vec::new(); + let mut schedule_context = ScheduleContext::default(); + + while !inspected_all_shards { + if cancel.is_cancelled() { + return Err(OperationError::Cancelled); + } + + { + let mut locked = self.inner.write().unwrap(); + tracing::info!(%node_id, "Drain got write lock"); + let (nodes, tenants, scheduler) = locked.parts_mut(); + + let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged( + format!("node {node_id} was removed").into(), + ))?; + + let current_policy = node.get_scheduling(); + if !matches!(current_policy, NodeSchedulingPolicy::Draining) { + // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think + // about it + return Err(OperationError::NodeStateChanged( + format!("node {node_id} changed state to {current_policy:?}").into(), + )); + } + + let mut cursor = + tenants + .iter_mut() + .skip_while(|(tid, _)| match last_inspected_shard { + Some(last) => **tid != last, + None => false, + }); + + while waiters.len() < MAX_RECONCILES_PER_OPERATION { + let (tid, tenant_shard) = match cursor.next() { + Some(some) => some, + None => { + inspected_all_shards = true; + break; + } + }; + + if tenant_shard.intent.demote_attached(scheduler, node_id) { + tenant_shard.sequence = tenant_shard.sequence.next(); + match tenant_shard.schedule(scheduler, &mut schedule_context) { + Err(e) => { + tracing::warn!(%tid, "Scheduling error when draining pageserver {} : {e}", node_id); + } + Ok(()) => { + let waiter = self.maybe_reconcile_shard(tenant_shard, nodes); + if let Some(some) = waiter { + waiters.push(some); + } + } + } + } + } + } + tracing::info!(%node_id, "Drain released write lock"); + + waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await; + } + + let _ = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await; + + // At this point we have done the best we could to drain shards from this node. + // Set the node scheduling policy to `[NodeSchedulingPolicy::PauseForRestart]` + // to complete the drain. + if let Err(err) = self + .node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart)) + .await + { + // This is not fatal. Anything that is polling the node scheduling policy to detect + // the end of the drain operations will hang, but all such places should enforce an + // overall timeout. The scheduling policy will be updated upon node re-attach and/or + // by the counterpart fill operation. + tracing::warn!(%node_id, "Failed to finalise drain by setting scheduling policy: {err}"); + } + + tracing::info!(%node_id, "Completed drain background operation"); + + Ok(()) + } + + // TODO: exit early when we are balanced + pub(crate) async fn fill_node( + &self, + node_id: NodeId, + cancel: CancellationToken, + ) -> Result<(), OperationError> { + // TODO(vlad): Currently this operates on the assumption that all + // secondaries are warm. This is not always true (we just migrated the + // tenant). Take that into consideration by checking the secondary status. + + // TODO(vlad): This is a subset of the optimizations applied by `Service::optimize_all`. + // The optimizations should be stopped while filing, othwewise we are going to fight for + // the same reconciler semaphore units. + + tracing::info!(%node_id, "Starting fill background operation"); + + let last_inspected_shard: Option = None; + let mut shards_left = true; + let mut waiters = Vec::new(); + + let mut should_fill_node = { + self.inner + .write() + .unwrap() + .scheduler + .should_fill_node(node_id) + }; + + while shards_left && should_fill_node { + if cancel.is_cancelled() { + return Err(OperationError::Cancelled); + } + + { + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, scheduler) = locked.parts_mut(); + + let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged( + format!("node {node_id} was removed").into(), + ))?; + + let current_policy = node.get_scheduling(); + if !matches!(current_policy, NodeSchedulingPolicy::Filling) { + // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think + // about it + return Err(OperationError::NodeStateChanged( + format!("node {node_id} changed state to {current_policy:?}").into(), + )); + } + + let mut cursor = + tenants + .iter_mut() + .skip_while(|(tid, _)| match last_inspected_shard { + Some(last) => **tid != last, + None => false, + }); + + while waiters.len() < MAX_RECONCILES_PER_OPERATION { + let (_tid, tenant_shard) = match cursor.next() { + Some(some) => some, + None => { + shards_left = false; + break; + } + }; + + if tenant_shard.intent.get_secondary().contains(&node_id) { + tenant_shard.intent.promote_attached(scheduler, node_id); + let waiter = self.maybe_reconcile_shard(tenant_shard, nodes); + if let Some(some) = waiter { + waiters.push(some); + } + } + } + + should_fill_node = scheduler.should_fill_node(node_id); + } + + waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await; + } + + let _ = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await; + + if let Err(err) = self + .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active)) + .await + { + // This isn't a huge issue since the filling process starts upon request. However, it + // will prevent the next drain from starting. The only case in which this can fail + // is database unavailability. Such a case will require manual intervention. + tracing::error!(%node_id, "Failed to finalise fill by setting scheduling policy: {err}"); + } + + tracing::info!(%node_id, "Completed fill background operation"); + + Ok(()) + } } diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index dda17f9887..1fd52beef0 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -8,9 +8,11 @@ use crate::{ metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome}, persistence::TenantShardPersistence, reconciler::ReconcileUnits, - scheduler::{AffinityScore, MaySchedule, ScheduleContext}, + scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext}, +}; +use pageserver_api::controller_api::{ + NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, }; -use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy}; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, shard::{ShardIdentity, TenantShardId}, @@ -153,7 +155,7 @@ impl IntentState { } pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option) -> Self { if let Some(node_id) = node_id { - scheduler.node_inc_ref(node_id); + scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach); } Self { attached: node_id, @@ -164,10 +166,10 @@ impl IntentState { pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option) { if self.attached != new_attached { if let Some(old_attached) = self.attached.take() { - scheduler.node_dec_ref(old_attached); + scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach); } if let Some(new_attached) = &new_attached { - scheduler.node_inc_ref(*new_attached); + scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach); } self.attached = new_attached; } @@ -177,22 +179,27 @@ impl IntentState { /// secondary to attached while maintaining the scheduler's reference counts. pub(crate) fn promote_attached( &mut self, - _scheduler: &mut Scheduler, + scheduler: &mut Scheduler, promote_secondary: NodeId, ) { // If we call this with a node that isn't in secondary, it would cause incorrect // scheduler reference counting, since we assume the node is already referenced as a secondary. debug_assert!(self.secondary.contains(&promote_secondary)); - // TODO: when scheduler starts tracking attached + secondary counts separately, we will - // need to call into it here. self.secondary.retain(|n| n != &promote_secondary); + + let demoted = self.attached; self.attached = Some(promote_secondary); + + scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary); + if let Some(demoted) = demoted { + scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached); + } } pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) { debug_assert!(!self.secondary.contains(&new_secondary)); - scheduler.node_inc_ref(new_secondary); + scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary); self.secondary.push(new_secondary); } @@ -200,27 +207,27 @@ impl IntentState { pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) { let index = self.secondary.iter().position(|n| *n == node_id); if let Some(index) = index { - scheduler.node_dec_ref(node_id); + scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary); self.secondary.remove(index); } } pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) { for secondary in self.secondary.drain(..) { - scheduler.node_dec_ref(secondary); + scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary); } } /// Remove the last secondary node from the list of secondaries pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) { if let Some(node_id) = self.secondary.pop() { - scheduler.node_dec_ref(node_id); + scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary); } } pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) { if let Some(old_attached) = self.attached.take() { - scheduler.node_dec_ref(old_attached); + scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach); } self.clear_secondary(scheduler); @@ -251,12 +258,11 @@ impl IntentState { /// forget the location on the offline node. /// /// Returns true if a change was made - pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool { + pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool { if self.attached == Some(node_id) { - // TODO: when scheduler starts tracking attached + secondary counts separately, we will - // need to call into it here. self.attached = None; self.secondary.push(node_id); + scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached); true } else { false @@ -307,6 +313,12 @@ pub(crate) struct ReconcilerWaiter { seq: Sequence, } +pub(crate) enum ReconcilerStatus { + Done, + Failed, + InProgress, +} + #[derive(thiserror::Error, Debug)] pub(crate) enum ReconcileWaitError { #[error("Timeout waiting for shard {0}")] @@ -369,6 +381,16 @@ impl ReconcilerWaiter { Ok(()) } + + pub(crate) fn get_status(&self) -> ReconcilerStatus { + if self.seq_wait.would_wait_for(self.seq).is_err() { + ReconcilerStatus::Done + } else if self.error_seq_wait.would_wait_for(self.seq).is_err() { + ReconcilerStatus::Failed + } else { + ReconcilerStatus::InProgress + } + } } /// Having spawned a reconciler task, the tenant shard's state will carry enough @@ -593,7 +615,7 @@ impl TenantShard { Secondary => { if let Some(node_id) = self.intent.get_attached() { // Populate secondary by demoting the attached node - self.intent.demote_attached(*node_id); + self.intent.demote_attached(scheduler, *node_id); modified = true; } else if self.intent.secondary.is_empty() { // Populate secondary by scheduling a fresh node @@ -648,13 +670,17 @@ impl TenantShard { let mut scores = all_pageservers .iter() .flat_map(|node_id| { - if matches!( - nodes - .get(node_id) - .map(|n| n.may_schedule()) - .unwrap_or(MaySchedule::No), - MaySchedule::No + let node = nodes.get(node_id); + if node.is_none() { + None + } else if matches!( + node.unwrap().get_scheduling(), + NodeSchedulingPolicy::Filling ) { + // If the node is currently filling, don't count it as a candidate to avoid, + // racing with the background fill. + None + } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) { None } else { let affinity_score = schedule_context.get_node_affinity(*node_id); @@ -783,7 +809,7 @@ impl TenantShard { old_attached_node_id, new_attached_node_id, }) => { - self.intent.demote_attached(old_attached_node_id); + self.intent.demote_attached(scheduler, old_attached_node_id); self.intent .promote_attached(scheduler, new_attached_node_id); } @@ -1321,7 +1347,9 @@ pub(crate) mod tests { assert_ne!(attached_node_id, secondary_node_id); // Notifying the attached node is offline should demote it to a secondary - let changed = tenant_shard.intent.demote_attached(attached_node_id); + let changed = tenant_shard + .intent + .demote_attached(&mut scheduler, attached_node_id); assert!(changed); assert!(tenant_shard.intent.attached.is_none()); assert_eq!(tenant_shard.intent.secondary.len(), 2); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6fdad2188c..6c87dbbfc2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2213,6 +2213,30 @@ class NeonStorageController(MetricsGetter, LogUtils): headers=self.headers(TokenScope.ADMIN), ) + def node_drain(self, node_id): + log.info(f"node_drain({node_id})") + self.request( + "PUT", + f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain", + headers=self.headers(TokenScope.ADMIN), + ) + + def node_fill(self, node_id): + log.info(f"node_fill({node_id})") + self.request( + "PUT", + f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill", + headers=self.headers(TokenScope.ADMIN), + ) + + def node_status(self, node_id): + response = self.request( + "GET", + f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + headers=self.headers(TokenScope.ADMIN), + ) + return response.json() + def node_list(self): response = self.request( "GET", diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 2031feaa83..38ed8a47a7 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -1477,3 +1477,110 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto workload = Workload(env, tenant_id, timeline, branch_name=branch) workload.expect_rows = expect_rows workload.validate() + + +def test_storage_controller_drain_and_fill_smoke(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_pageservers = 2 + env = neon_env_builder.init_configs() + env.start() + + tenant_count = 5 + shard_count_per_tenant = 8 + total_shards = tenant_count * shard_count_per_tenant + tenant_ids = [] + + for _ in range(0, tenant_count): + tid = TenantId.generate() + tenant_ids.append(tid) + env.neon_cli.create_tenant( + tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant + ) + + # Give things a chance to settle. + # TODO(vlad): replace with a wait + time.sleep(2) + + nodes = env.storage_controller.node_list() + assert len(nodes) == 2 + + def retryable_node_operation(op, ps_id, max_attempts, backoff): + while max_attempts > 0: + try: + op(ps_id) + return + except StorageControllerApiException as e: + max_attempts -= 1 + log.info(f"Operation failed ({max_attempts} attempts left): {e}") + + if max_attempts == 0: + raise e + + time.sleep(backoff) + + def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff): + log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy") + while max_attempts > 0: + try: + status = env.storage_controller.node_status(node_id) + policy = status["scheduling"] + if policy == desired_scheduling_policy: + return + else: + max_attempts -= 1 + log.info(f"Status call returned {policy=} ({max_attempts} attempts left)") + + if max_attempts == 0: + raise AssertionError( + f"Status for {node_id=} did not reach {desired_scheduling_policy=}" + ) + + time.sleep(backoff) + except StorageControllerApiException as e: + max_attempts -= 1 + log.info(f"Status call failed ({max_attempts} retries left): {e}") + + if max_attempts == 0: + raise e + + time.sleep(backoff) + + def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards): + # Assert that all nodes have some attached shards + assert len(shard_counts) == len(env.pageservers) + + min_shard_count = min(shard_counts.values()) + max_shard_count = max(shard_counts.values()) + + flake_factor = 5 / 100 + assert max_shard_count - min_shard_count <= int(total_shards * flake_factor) + + # Perform a graceful rolling restart + for ps in env.pageservers: + retryable_node_operation( + lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2 + ) + poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5) + + shard_counts = get_node_shard_counts(env, tenant_ids) + log.info(f"Shard counts after draining node {ps.id}: {shard_counts}") + # Assert that we've drained the node + assert shard_counts[str(ps.id)] == 0 + # Assert that those shards actually went somewhere + assert sum(shard_counts.values()) == total_shards + + ps.restart() + poll_node_status(ps.id, "Active", max_attempts=10, backoff=1) + + retryable_node_operation( + lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2 + ) + poll_node_status(ps.id, "Active", max_attempts=6, backoff=5) + + shard_counts = get_node_shard_counts(env, tenant_ids) + log.info(f"Shard counts after filling node {ps.id}: {shard_counts}") + assert_shard_counts_balanced(env, shard_counts, total_shards) + + # Now check that shards are reasonably balanced + shard_counts = get_node_shard_counts(env, tenant_ids) + log.info(f"Shard counts after rolling restart: {shard_counts}") + assert_shard_counts_balanced(env, shard_counts, total_shards)