diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs
index 0036b7d0f6..701c4b3b2e 100644
--- a/control_plane/storcon_cli/src/main.rs
+++ b/control_plane/storcon_cli/src/main.rs
@@ -65,12 +65,27 @@ enum Command {
         #[arg(long)]
         scheduling: Option<NodeSchedulingPolicy>,
     },
-    // Set a node status as deleted.
+    /// Exists for backup usage and will be removed in future.
+    /// Use [`Command::NodeStartDelete`] instead, if possible.
     NodeDelete {
         #[arg(long)]
        node_id: NodeId,
     },
+    /// Start deletion of the specified pageserver.
+    NodeStartDelete {
+        #[arg(long)]
+        node_id: NodeId,
+    },
+    /// Cancel deletion of the specified pageserver and wait for `timeout`
+    /// for the operation to be canceled. May be retried.
+    NodeCancelDelete {
+        #[arg(long)]
+        node_id: NodeId,
+        #[arg(long)]
+        timeout: humantime::Duration,
+    },
     /// Delete a tombstone of node from the storage controller.
+    /// This is used when we want to allow the node to be re-registered.
     NodeDeleteTombstone {
         #[arg(long)]
         node_id: NodeId,
@@ -912,10 +927,43 @@ async fn main() -> anyhow::Result<()> {
                 .await?;
         }
         Command::NodeDelete { node_id } => {
+            eprintln!("Warning: This command is obsolete and will be removed in a future version");
+            eprintln!("Use `NodeStartDelete` instead, if possible");
             storcon_client
                 .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
                 .await?;
         }
+        Command::NodeStartDelete { node_id } => {
+            storcon_client
+                .dispatch::<(), ()>(
+                    Method::PUT,
+                    format!("control/v1/node/{node_id}/delete"),
+                    None,
+                )
+                .await?;
+            println!("Delete started for {node_id}");
+        }
+        Command::NodeCancelDelete { node_id, timeout } => {
+            storcon_client
+                .dispatch::<(), ()>(
+                    Method::DELETE,
+                    format!("control/v1/node/{node_id}/delete"),
+                    None,
+                )
+                .await?;
+
+            println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
+
+            let final_policy =
+                wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
+                    !matches!(sched, NodeSchedulingPolicy::Deleting)
+                })
+                .await?;
+
+            println!(
+                "Delete was cancelled for node {node_id}. Scheduling policy is now {final_policy:?}"
+            );
+        }
         Command::NodeDeleteTombstone { node_id } => {
             storcon_client
                 .dispatch::<(), ()>(
diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs
index dc9fab2bdb..a8c7083b17 100644
--- a/libs/pageserver_api/src/controller_api.rs
+++ b/libs/pageserver_api/src/controller_api.rs
@@ -386,6 +386,7 @@ pub enum NodeSchedulingPolicy {
     Pause,
     PauseForRestart,
     Draining,
+    Deleting,
 }
 
 impl FromStr for NodeSchedulingPolicy {
@@ -398,6 +399,7 @@ impl FromStr for NodeSchedulingPolicy {
             "pause" => Ok(Self::Pause),
             "pause_for_restart" => Ok(Self::PauseForRestart),
             "draining" => Ok(Self::Draining),
+            "deleting" => Ok(Self::Deleting),
             _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
         }
     }
@@ -412,6 +414,7 @@ impl From<NodeSchedulingPolicy> for String {
             Pause => "pause",
             PauseForRestart => "pause_for_restart",
             Draining => "draining",
+            Deleting => "deleting",
         }
         .to_string()
     }
diff --git a/storage_controller/src/background_node_operations.rs b/storage_controller/src/background_node_operations.rs
index a630316f46..7a111f6329 100644
--- a/storage_controller/src/background_node_operations.rs
+++ b/storage_controller/src/background_node_operations.rs
@@ -6,6 +6,11 @@ use utils::id::NodeId;
 
 pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 64;
 
+#[derive(Copy, Clone)]
+pub(crate) struct Delete {
+    pub(crate) node_id: NodeId,
+}
+
 #[derive(Copy, Clone)]
 pub(crate) struct Drain {
     pub(crate) node_id: NodeId,
@@ -18,6 +23,7 @@ pub(crate) struct Fill {
 
 #[derive(Copy, Clone)]
 pub(crate) enum Operation {
+    Delete(Delete),
     Drain(Drain),
     Fill(Fill),
 }
@@ -30,6 +36,8 @@ pub(crate) enum OperationError {
     FinalizeError(Cow<'static, str>),
     #[error("Operation cancelled")]
     Cancelled,
+    #[error("Impossible constraint error: {0}")]
+    ImpossibleConstraint(Cow<'static, str>),
 }
 
 pub(crate) struct OperationHandler {
@@ -38,6 +46,12 @@ pub(crate) struct OperationHandler {
     pub(crate) cancel: CancellationToken,
 }
 
+impl Display for Delete {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "delete {}", self.node_id)
+    }
+}
+
 impl Display for Drain {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "drain {}", self.node_id)
     }
@@ -53,6 +67,7 @@ impl Display for Fill {
 impl Display for Operation {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
+            Operation::Delete(op) => write!(f, "{op}"),
             Operation::Drain(op) => write!(f, "{op}"),
             Operation::Fill(op) => write!(f, "{op}"),
         }
diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 66c44b5674..ee446ea65d 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -919,7 +919,7 @@ async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError
     json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
 }
 
-async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+async fn handle_node_delete_old(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permissions(&req, Scope::Admin)?;
 
     let req = match maybe_forward(req).await {
         ForwardOutcome::Forwarded(res) => {
             return res;
         }
         ForwardOutcome::NotForwarded(req) => req,
     };
@@ -931,7 +931,10 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiErr
 
     let state = get_state(&req);
     let node_id: NodeId = parse_request_param(&req, "node_id")?;
-    json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
+    json_response(
+        StatusCode::OK,
+        state.service.node_delete_old(node_id).await?,
+    )
 }
 
 async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -1051,6 +1054,42 @@ async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiErro
     json_response(StatusCode::OK, leader)
 }
 
+async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+    json_response(
+        StatusCode::OK,
+        state.service.start_node_delete(node_id).await?,
+    )
+}
+
+async fn handle_cancel_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Infra)?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+    json_response(
+        StatusCode::ACCEPTED,
+        state.service.cancel_node_delete(node_id).await?,
+    )
+}
+
 async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permissions(&req, Scope::Infra)?;
 
@@ -2221,8 +2260,14 @@ pub fn make_router(
         .post("/control/v1/node", |r| {
             named_request_span(r, handle_node_register, RequestName("control_v1_node"))
         })
+        // This endpoint is deprecated and will be removed in a future version.
+        // Use PUT /control/v1/node/:node_id/delete instead.
         .delete("/control/v1/node/:node_id", |r| {
-            named_request_span(r, handle_node_delete, RequestName("control_v1_node_delete"))
+            named_request_span(
+                r,
+                handle_node_delete_old,
+                RequestName("control_v1_node_delete"),
+            )
         })
         .get("/control/v1/node", |r| {
             named_request_span(r, handle_node_list, RequestName("control_v1_node"))
         })
@@ -2247,6 +2292,20 @@
         .get("/control/v1/leader", |r| {
             named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
         })
+        .put("/control/v1/node/:node_id/delete", |r| {
+            named_request_span(
+                r,
+                handle_node_delete,
+                RequestName("control_v1_start_node_delete"),
+            )
+        })
+        .delete("/control/v1/node/:node_id/delete", |r| {
+            named_request_span(
+                r,
+                handle_cancel_node_delete,
+                RequestName("control_v1_cancel_node_delete"),
+            )
+        })
         .put("/control/v1/node/:node_id/drain", |r| {
             named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
         })
diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs
index a9ec511431..36e3c5dc6c 100644
--- a/storage_controller/src/lib.rs
+++ b/storage_controller/src/lib.rs
@@ -6,13 +6,13 @@ extern crate hyper0 as hyper;
 mod auth;
 mod background_node_operations;
 mod compute_hook;
-mod drain_utils;
 mod heartbeater;
 pub mod http;
 mod id_lock_map;
 mod leadership;
 pub mod metrics;
 mod node;
+mod operation_utils;
 mod pageserver_client;
 mod peer_client;
 pub mod persistence;
diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs
index cba007d75f..6642c72f3c 100644
--- a/storage_controller/src/node.rs
+++ b/storage_controller/src/node.rs
@@ -201,6 +201,7 @@ impl Node {
         match self.scheduling {
             NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
+            NodeSchedulingPolicy::Deleting => MaySchedule::No,
             NodeSchedulingPolicy::Draining => MaySchedule::No,
             NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
             NodeSchedulingPolicy::Pause => MaySchedule::No,
diff --git a/storage_controller/src/drain_utils.rs b/storage_controller/src/operation_utils.rs
similarity index 64%
rename from storage_controller/src/drain_utils.rs
rename to storage_controller/src/operation_utils.rs
index 0dae7b8147..af86010ab7 100644
--- a/storage_controller/src/drain_utils.rs
+++ b/storage_controller/src/operation_utils.rs
@@ -10,63 +10,19 @@ use crate::node::Node;
 use crate::scheduler::Scheduler;
 use crate::tenant_shard::TenantShard;
 
-pub(crate) struct TenantShardIterator<F> {
-    tenants_accessor: F,
-    inspected_all_shards: bool,
-    last_inspected_shard: Option<TenantShardId>,
-}
-
-/// A simple iterator which can be used in tandem with [`crate::service::Service`]
-/// to iterate over all known tenant shard ids without holding the lock on the
-/// service state at all times.
-impl<F> TenantShardIterator<F>
-where
-    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
-{
-    pub(crate) fn new(tenants_accessor: F) -> Self {
-        Self {
-            tenants_accessor,
-            inspected_all_shards: false,
-            last_inspected_shard: None,
-        }
-    }
-
-    /// Returns the next tenant shard id if one exists
-    pub(crate) fn next(&mut self) -> Option<TenantShardId> {
-        if self.inspected_all_shards {
-            return None;
-        }
-
-        match (self.tenants_accessor)(self.last_inspected_shard) {
-            Some(tid) => {
-                self.last_inspected_shard = Some(tid);
-                Some(tid)
-            }
-            None => {
-                self.inspected_all_shards = true;
-                None
-            }
-        }
-    }
-
-    /// Returns true when the end of the iterator is reached and false otherwise
-    pub(crate) fn finished(&self) -> bool {
-        self.inspected_all_shards
-    }
-}
-
 /// Check that the state of the node being drained is as expected:
-/// node is present in memory and scheduling policy is set to [`NodeSchedulingPolicy::Draining`]
+/// node is present in memory and scheduling policy is set to expected_policy
 pub(crate) fn validate_node_state(
     node_id: &NodeId,
     nodes: Arc<HashMap<NodeId, Node>>,
+    expected_policy: NodeSchedulingPolicy,
 ) -> Result<(), OperationError> {
     let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
         format!("node {node_id} was removed").into(),
     ))?;
 
     let current_policy = node.get_scheduling();
-    if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
+    if current_policy != expected_policy {
         // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
         // about it
         return Err(OperationError::NodeStateChanged(
@@ -182,55 +138,3 @@ impl TenantShardDrain {
         }
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use utils::id::TenantId;
-    use utils::shard::{ShardCount, ShardNumber, TenantShardId};
-
-    use super::TenantShardIterator;
-
-    #[test]
-    fn test_tenant_shard_iterator() {
-        let tenant_id = TenantId::generate();
-        let shard_count = ShardCount(8);
-
-        let mut tenant_shards = Vec::default();
-        for i in 0..shard_count.0 {
-            tenant_shards.push((
-                TenantShardId {
-                    tenant_id,
-                    shard_number: ShardNumber(i),
-                    shard_count,
-                },
-                (),
-            ))
-        }
-
-        let tenant_shards = Arc::new(tenant_shards);
-
-        let mut tid_iter = TenantShardIterator::new({
-            let tenants = tenant_shards.clone();
-            move |last_inspected_shard: Option<TenantShardId>| {
-                let entry = match last_inspected_shard {
-                    Some(skip_past) => {
-                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
-                        cursor.nth(1)
-                    }
-                    None => tenants.first(),
-                };
-
-                entry.map(|(tid, _)| tid).copied()
-            }
-        });
-
-        let mut iterated_over = Vec::default();
-        while let Some(tid) = tid_iter.next() {
-            iterated_over.push((tid, ()));
-        }
-
-        assert_eq!(iterated_over, *tenant_shards);
-    }
-}
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index aaf71624ae..ed9a268064 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -635,18 +635,23 @@ impl Persistence {
         let updated = self
             .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
                 Box::pin(async move {
-                    // Check if the node is not marked as deleted
-                    let deleted_node: i64 = nodes
+                    let node: Option<NodePersistence> = nodes
                         .filter(node_id.eq(input_node_id.0 as i64))
-                        .filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
-                        .count()
-                        .get_result(conn)
-                        .await?;
-                    if deleted_node > 0 {
-                        return Err(DatabaseError::Logical(format!(
-                            "Node {input_node_id} is marked as deleted, re-attach is not allowed"
-                        )));
-                    }
+                        .first::<NodePersistence>(conn)
+                        .await
+                        .optional()?;
+
+                    // Check if the node is not marked as deleted
+                    match node {
+                        Some(node) if matches!(NodeLifecycle::from_str(&node.lifecycle), Ok(NodeLifecycle::Deleted)) => {
+                            return Err(DatabaseError::Logical(format!(
+                                "Node {input_node_id} is marked as deleted, re-attach is not allowed"
+                            )));
+                        }
+                        _ => {
+                            // go through
+                        }
+                    };
 
                     let rows_updated = diesel::update(tenant_shards)
                         .filter(generation_pageserver.eq(input_node_id.0 as i64))
@@ -664,21 +669,23 @@
                         .load(conn)
                         .await?;
 
-                    // If the node went through a drain and restart phase before re-attaching,
-                    // then reset it's node scheduling policy to active.
-                    diesel::update(nodes)
-                        .filter(node_id.eq(input_node_id.0 as i64))
-                        .filter(
-                            scheduling_policy
-                                .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
-                                .or(scheduling_policy
-                                    .eq(String::from(NodeSchedulingPolicy::Draining)))
-                                .or(scheduling_policy
-                                    .eq(String::from(NodeSchedulingPolicy::Filling))),
-                        )
-                        .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
-                        .execute(conn)
-                        .await?;
+                    if let Some(node) = node {
+                        let old_scheduling_policy =
+                            NodeSchedulingPolicy::from_str(&node.scheduling_policy).unwrap();
+                        let new_scheduling_policy = match old_scheduling_policy {
+                            NodeSchedulingPolicy::Active => NodeSchedulingPolicy::Active,
+                            NodeSchedulingPolicy::PauseForRestart => NodeSchedulingPolicy::Active,
+                            NodeSchedulingPolicy::Draining => NodeSchedulingPolicy::Active,
+                            NodeSchedulingPolicy::Filling => NodeSchedulingPolicy::Active,
+                            NodeSchedulingPolicy::Pause => NodeSchedulingPolicy::Pause,
+                            NodeSchedulingPolicy::Deleting => NodeSchedulingPolicy::Pause,
+                        };
+                        diesel::update(nodes)
+                            .filter(node_id.eq(input_node_id.0 as i64))
+                            .set(scheduling_policy.eq(String::from(new_scheduling_policy)))
+                            .execute(conn)
+                            .await?;
+                    }
 
                     Ok(updated)
                 })
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 75b0876b38..9360225396 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -1,8 +1,8 @@
 pub mod chaos_injector;
-mod context_iterator;
 pub mod feature_flag;
 pub(crate) mod safekeeper_reconciler;
 mod safekeeper_service;
+mod tenant_shard_iterator;
 
 use std::borrow::Cow;
 use std::cmp::Ordering;
@@ -16,7 +16,6 @@ use std::sync::{Arc, OnceLock};
 use std::time::{Duration, Instant, SystemTime};
 
 use anyhow::Context;
-use context_iterator::TenantShardContextIterator;
 use control_plane::storage_controller::{
     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
 };
@@ -55,6 +54,7 @@ use pageserver_client::{BlockUnblock, mgmt_api};
 use reqwest::{Certificate, StatusCode};
 use safekeeper_api::models::SafekeeperUtilization;
 use safekeeper_reconciler::SafekeeperReconcilers;
+use tenant_shard_iterator::{TenantShardExclusiveIterator, create_shared_shard_iterator};
 use tokio::sync::TryAcquireError;
 use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
@@ -68,10 +68,9 @@ use utils::sync::gate::{Gate, GateGuard};
 use utils::{failpoint_support, pausable_failpoint};
 
 use crate::background_node_operations::{
-    Drain, Fill, MAX_RECONCILES_PER_OPERATION, Operation, OperationError, OperationHandler,
+    Delete, Drain, Fill, MAX_RECONCILES_PER_OPERATION, Operation, OperationError, OperationHandler,
 };
 use crate::compute_hook::{self, ComputeHook, NotifyError};
-use crate::drain_utils::{self, TenantShardDrain, TenantShardIterator};
 use crate::heartbeater::{Heartbeater, PageserverState, SafekeeperState};
 use crate::id_lock_map::{
     IdLockMap, TracingExclusiveGuard, trace_exclusive_lock, trace_shared_lock,
@@ -79,6 +78,7 @@ use crate::id_lock_map::{
 use crate::leadership::Leadership;
 use crate::metrics;
 use crate::node::{AvailabilityTransition, Node};
+use crate::operation_utils::{self, TenantShardDrain};
 use crate::pageserver_client::PageserverClient;
 use crate::peer_client::GlobalObservedState;
 use crate::persistence::split_state::SplitState;
@@ -105,7 +105,7 @@ use crate::timeline_import::{
     TimelineImportFinalizeError, TimelineImportState, UpcallClient,
 };
 
-const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
+const WAITER_OPERATION_POLL_TIMEOUT: Duration = Duration::from_millis(500);
 
 // For operations that should be quick, like attaching a new tenant
 const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -581,7 +581,9 @@ impl From for ApiError {
 impl From<OperationError> for ApiError {
     fn from(value: OperationError) -> Self {
         match value {
-            OperationError::NodeStateChanged(err) | OperationError::FinalizeError(err) => {
+            OperationError::NodeStateChanged(err)
+            | OperationError::FinalizeError(err)
+            | OperationError::ImpossibleConstraint(err) => {
                 ApiError::InternalServerError(anyhow::anyhow!(err))
             }
             OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
@@ -2414,6 +2416,7 @@ impl Service {
             NodeSchedulingPolicy::PauseForRestart
                 | NodeSchedulingPolicy::Draining
                 | NodeSchedulingPolicy::Filling
+                | NodeSchedulingPolicy::Deleting
         );
 
         let mut new_nodes = (**nodes).clone();
@@ -7055,7 +7058,7 @@ impl Service {
     /// If a node has any work on it, it will be rescheduled: this is "clean" in the sense
     /// that we don't leave any bad state behind in the storage controller, but unclean
     /// in the sense that we are not carefully draining the node.
-    pub(crate) async fn node_delete(&self, node_id: NodeId) -> Result<(), ApiError> {
+    pub(crate) async fn node_delete_old(&self, node_id: NodeId) -> Result<(), ApiError> {
         let _node_lock =
             trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Delete).await;
@@ -7089,7 +7092,7 @@ impl Service {
             }
 
             for (_tenant_id, mut schedule_context, shards) in
-                TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
+                TenantShardExclusiveIterator::new(tenants, ScheduleMode::Normal)
             {
                 for shard in shards {
                     if shard.deref_node(node_id) {
@@ -7158,6 +7161,171 @@ impl Service {
         Ok(())
     }
 
+    pub(crate) async fn delete_node(
+        self: &Arc<Self>,
+        node_id: NodeId,
+        policy_on_start: NodeSchedulingPolicy,
+        cancel: CancellationToken,
+    ) -> Result<(), OperationError> {
+        let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal).build();
+
+        let mut waiters: Vec<ReconcilerWaiter> = Vec::new();
+        let mut tid_iter = create_shared_shard_iterator(self.clone());
+
+        while !tid_iter.finished() {
+            if cancel.is_cancelled() {
+                match self
+                    .node_configure(node_id, None, Some(policy_on_start))
+                    .await
+                {
+                    Ok(()) => return Err(OperationError::Cancelled),
+                    Err(err) => {
+                        return Err(OperationError::FinalizeError(
+                            format!(
+                                "Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
+                                node_id, String::from(policy_on_start), err
+                            )
+                            .into(),
+                        ));
+                    }
+                }
+            }
+
+            operation_utils::validate_node_state(
+                &node_id,
+                self.inner.read().unwrap().nodes.clone(),
+                NodeSchedulingPolicy::Deleting,
+            )?;
+
+            while waiters.len() < MAX_RECONCILES_PER_OPERATION {
+                let tid = match tid_iter.next() {
+                    Some(tid) => tid,
+                    None => {
+                        break;
+                    }
+                };
+
+                let mut locked = self.inner.write().unwrap();
+                let (nodes, tenants, scheduler) = locked.parts_mut();
+
+                let tenant_shard = match tenants.get_mut(&tid) {
+                    Some(tenant_shard) => tenant_shard,
+                    None => {
+                        // Tenant shard was deleted by another operation. Skip it.
+                        continue;
+                    }
+                };
+
+                match tenant_shard.get_scheduling_policy() {
+                    ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
+                        // A migration during delete is classed as 'essential' because it is required to
+                        // uphold our availability goals for the tenant: this shard is eligible for migration.
+                    }
+                    ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
+                        // If we have been asked to avoid rescheduling this shard, then do not migrate it during a deletion
+                        tracing::warn!(
+                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                            "Skip migration during deletion because shard scheduling policy {:?} disallows it",
+                            tenant_shard.get_scheduling_policy(),
+                        );
+                        continue;
+                    }
+                }
+
+                if tenant_shard.deref_node(node_id) {
+                    // TODO(ephemeralsad): we should process all shards in a tenant at once, so
+                    // we can avoid settling the tenant unevenly.
+                    let mut schedule_context = ScheduleContext::new(ScheduleMode::Normal);
+                    if let Err(e) = tenant_shard.schedule(scheduler, &mut schedule_context) {
+                        tracing::error!(
+                            "Refusing to delete node, shard {} can't be rescheduled: {e}",
+                            tenant_shard.tenant_shard_id
+                        );
+                        return Err(OperationError::ImpossibleConstraint(e.to_string().into()));
+                    } else {
+                        tracing::info!(
+                            "Rescheduled shard {} away from node during deletion",
+                            tenant_shard.tenant_shard_id
+                        )
+                    }
+
+                    let waiter = self.maybe_configured_reconcile_shard(
+                        tenant_shard,
+                        nodes,
+                        reconciler_config,
+                    );
+                    if let Some(some) = waiter {
+                        waiters.push(some);
+                    }
+                }
+            }
+
+            waiters = self
+                .await_waiters_remainder(waiters, WAITER_OPERATION_POLL_TIMEOUT)
+                .await;
+
+            failpoint_support::sleep_millis_async!("sleepy-delete-loop", &cancel);
+        }
+
+        while !waiters.is_empty() {
+            if cancel.is_cancelled() {
+                match self
+                    .node_configure(node_id, None, Some(policy_on_start))
+                    .await
+                {
+                    Ok(()) => return Err(OperationError::Cancelled),
+                    Err(err) => {
+                        return Err(OperationError::FinalizeError(
+                            format!(
+                                "Failed to finalise drain cancel of {} by setting scheduling policy to {}: {}",
+                                node_id, String::from(policy_on_start), err
+                            )
+                            .into(),
+                        ));
+                    }
+                }
+            }
+
+            tracing::info!("Awaiting {} pending delete reconciliations", waiters.len());
+
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
+        }
+
+        self.persistence
+            .set_tombstone(node_id)
+            .await
+            .map_err(|e| OperationError::FinalizeError(e.to_string().into()))?;
+
+        {
+            let mut locked = self.inner.write().unwrap();
+            let (nodes, _, scheduler) = locked.parts_mut();
+
+            scheduler.node_remove(node_id);
+
+            let mut nodes_mut = (**nodes).clone();
+            if let Some(mut removed_node) = nodes_mut.remove(&node_id) {
+                // Ensure that any reconciler holding an Arc<> to this node will
+                // drop out when trying to RPC to it (setting Offline state sets the
+                // cancellation token on the Node object).
+                removed_node.set_availability(NodeAvailability::Offline);
+            }
+            *nodes = Arc::new(nodes_mut);
+
+            metrics::METRICS_REGISTRY
+                .metrics_group
+                .storage_controller_pageserver_nodes
+                .set(nodes.len() as i64);
+            metrics::METRICS_REGISTRY
+                .metrics_group
+                .storage_controller_https_pageserver_nodes
+                .set(nodes.values().filter(|n| n.has_https_port()).count() as i64);
+        }
+
+        Ok(())
+    }
+
     pub(crate) async fn node_list(&self) -> Result<Vec<Node>, ApiError> {
         let nodes = {
             self.inner
@@ -7546,7 +7714,7 @@ impl Service {
         let mut tenants_affected: usize = 0;
 
         for (_tenant_id, mut schedule_context, shards) in
-            TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
+            TenantShardExclusiveIterator::new(tenants, ScheduleMode::Normal)
         {
             for tenant_shard in shards {
                 let tenant_shard_id = tenant_shard.tenant_shard_id;
@@ -7717,6 +7885,142 @@ impl Service {
         self.node_configure(node_id, availability, scheduling).await
     }
 
+    pub(crate) async fn start_node_delete(
+        self: &Arc<Self>,
+        node_id: NodeId,
+    ) -> Result<(), ApiError> {
+        let (ongoing_op, node_policy, schedulable_nodes_count) = {
+            let locked = self.inner.read().unwrap();
+            let nodes = &locked.nodes;
+            let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+                anyhow::anyhow!("Node {} not registered", node_id).into(),
+            ))?;
+            let schedulable_nodes_count = nodes
+                .iter()
+                .filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
+                .count();
+
+            (
+                locked
+                    .ongoing_operation
+                    .as_ref()
+                    .map(|ongoing| ongoing.operation),
+                node.get_scheduling(),
+                schedulable_nodes_count,
+            )
+        };
+
+        if let Some(ongoing) = ongoing_op {
+            return Err(ApiError::PreconditionFailed(
+                format!("Background operation already ongoing for node: {ongoing}").into(),
+            ));
+        }
+
+        if schedulable_nodes_count == 0 {
+            return Err(ApiError::PreconditionFailed(
+                "No other schedulable nodes to move shards".into(),
+            ));
+        }
+
+        match node_policy {
+            NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
+                self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Deleting))
+                    .await?;
+
+                let cancel = self.cancel.child_token();
+                let gate_guard = self.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
+                let policy_on_start = node_policy;
+
+                self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
+                    operation: Operation::Delete(Delete { node_id }),
+                    cancel: cancel.clone(),
+                });
+
+                let span = tracing::info_span!(parent: None, "delete_node", %node_id);
+
+                tokio::task::spawn(
+                    {
+                        let service = self.clone();
+                        let cancel = cancel.clone();
+                        async move {
+                            let _gate_guard = gate_guard;
+
+                            scopeguard::defer! {
+                                let prev = service.inner.write().unwrap().ongoing_operation.take();
+
+                                if let Some(Operation::Delete(removed_delete)) = prev.map(|h| h.operation) {
+                                    assert_eq!(removed_delete.node_id, node_id, "We always take the same operation");
+                                } else {
+                                    panic!("We always remove the same operation")
+                                }
+                            }
+
+                            tracing::info!("Delete background operation starting");
+                            let res = service
+                                .delete_node(node_id, policy_on_start, cancel)
+                                .await;
+                            match res {
+                                Ok(()) => {
+                                    tracing::info!(
+                                        "Delete background operation completed successfully"
+                                    );
+                                }
+                                Err(OperationError::Cancelled) => {
+                                    tracing::info!("Delete background operation was cancelled");
+                                }
+                                Err(err) => {
+                                    tracing::error!(
+                                        "Delete background operation encountered: {err}"
+                                    )
+                                }
+                            }
+                        }
+                    }
+                    .instrument(span),
+                );
+            }
+            NodeSchedulingPolicy::Deleting => {
+                return Err(ApiError::Conflict(format!(
+                    "Node {node_id} has delete in progress"
+                )));
+            }
+            policy => {
+                return Err(ApiError::PreconditionFailed(
+                    format!("Node {node_id} cannot be deleted due to {policy:?} policy").into(),
+                ));
+            }
+        }
+
+        Ok(())
+    }
+
+    pub(crate) async fn cancel_node_delete(
+        self: &Arc<Self>,
+        node_id: NodeId,
+    ) -> Result<(), ApiError> {
+        {
+            let locked = self.inner.read().unwrap();
+            let nodes = &locked.nodes;
+            nodes.get(&node_id).ok_or(ApiError::NotFound(
+                anyhow::anyhow!("Node {} not registered", node_id).into(),
+            ))?;
+        }
+
+        if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
+            if let Operation::Delete(delete) = op_handler.operation {
+                if delete.node_id == node_id {
+                    tracing::info!("Cancelling background delete operation for node {node_id}");
+                    op_handler.cancel.cancel();
+                    return Ok(());
+                }
+            }
+        }
+
+        Err(ApiError::PreconditionFailed(
+            format!("Node {node_id} has no delete in progress").into(),
+        ))
+    }
+
     pub(crate) async fn start_node_drain(
         self: &Arc<Self>,
         node_id: NodeId,
@@ -8293,7 +8597,7 @@ impl Service {
         // to ignore the utilisation component of the score.
         for (_tenant_id, schedule_context, shards) in
-            TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
+            TenantShardExclusiveIterator::new(tenants, ScheduleMode::Speculative)
         {
             for shard in shards {
                 if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
@@ -9020,25 +9324,7 @@ impl Service {
 
         let mut waiters = Vec::new();
 
-        let mut tid_iter = TenantShardIterator::new({
-            let service = self.clone();
-            move |last_inspected_shard: Option<TenantShardId>| {
-                let locked = &service.inner.read().unwrap();
-                let tenants = &locked.tenants;
-                let entry = match last_inspected_shard {
-                    Some(skip_past) => {
-                        // Skip to the last seen tenant shard id
-                        let mut cursor = tenants.iter().skip_while(|(tid, _)| **tid != skip_past);
-
-                        // Skip past the last seen
-                        cursor.nth(1)
-                    }
-                    None => tenants.first_key_value(),
-                };
-
-                entry.map(|(tid, _)| tid).copied()
-            }
-        });
+        let mut tid_iter = create_shared_shard_iterator(self.clone());
 
         while !tid_iter.finished() {
             if cancel.is_cancelled() {
@@ -9058,7 +9344,11 @@
             }
         }
 
-        drain_utils::validate_node_state(&node_id, self.inner.read().unwrap().nodes.clone())?;
+        operation_utils::validate_node_state(
+            &node_id,
+            self.inner.read().unwrap().nodes.clone(),
+            NodeSchedulingPolicy::Draining,
+        )?;
 
         while waiters.len() < MAX_RECONCILES_PER_OPERATION {
             let tid = match tid_iter.next() {
@@ -9138,7 +9428,7 @@
            }
 
            waiters = self
-                .await_waiters_remainder(waiters, WAITER_FILL_DRAIN_POLL_TIMEOUT)
+                .await_waiters_remainder(waiters, WAITER_OPERATION_POLL_TIMEOUT)
                .await;
 
            failpoint_support::sleep_millis_async!("sleepy-drain-loop", &cancel);
@@ -9432,7 +9722,7 @@
        }
 
        waiters = self
-            .await_waiters_remainder(waiters, WAITER_FILL_DRAIN_POLL_TIMEOUT)
+            .await_waiters_remainder(waiters, WAITER_OPERATION_POLL_TIMEOUT)
            .await;
    }
diff --git a/storage_controller/src/service/context_iterator.rs b/storage_controller/src/service/tenant_shard_iterator.rs
similarity index 52%
rename from storage_controller/src/service/context_iterator.rs
rename to storage_controller/src/service/tenant_shard_iterator.rs
index c4784e5e36..576b94b3a4 100644
--- a/storage_controller/src/service/context_iterator.rs
+++ b/storage_controller/src/service/tenant_shard_iterator.rs
@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::sync::Arc;
 
 use utils::id::TenantId;
 use utils::shard::TenantShardId;
@@ -6,16 +7,21 @@ use utils::shard::TenantShardId;
 use crate::scheduler::{ScheduleContext, ScheduleMode};
 use crate::tenant_shard::TenantShard;
 
+use super::Service;
+
+/// Exclusive iterator over all tenant shards.
+/// It is used to iterate over consistent tenants state at specific point in time.
+///
 /// When making scheduling decisions, it is useful to have the ScheduleContext for a whole
 /// tenant while considering the individual shards within it. This iterator is a helper
 /// that gathers all the shards in a tenant and then yields them together with a ScheduleContext
 /// for the tenant.
-pub(super) struct TenantShardContextIterator<'a> {
+pub(super) struct TenantShardExclusiveIterator<'a> {
     schedule_mode: ScheduleMode,
     inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>,
 }
 
-impl<'a> TenantShardContextIterator<'a> {
+impl<'a> TenantShardExclusiveIterator<'a> {
     pub(super) fn new(
         tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
         schedule_mode: ScheduleMode,
@@ -27,7 +33,7 @@ impl<'a> TenantShardContextIterator<'a> {
     }
 }
 
-impl<'a> Iterator for TenantShardContextIterator<'a> {
+impl<'a> Iterator for TenantShardExclusiveIterator<'a> {
     type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>);
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -52,13 +58,93 @@
     }
 }
 
+/// Shared iterator over all tenant shards.
+/// It is used to iterate over all tenants without blocking another code, working with tenants
+///
+/// A simple iterator which can be used in tandem with [`crate::service::Service`]
+/// to iterate over all known tenant shard ids without holding the lock on the
+/// service state at all times.
+pub(crate) struct TenantShardSharedIterator<F> {
+    tenants_accessor: F,
+    inspected_all_shards: bool,
+    last_inspected_shard: Option<TenantShardId>,
+}
+
+impl<F> TenantShardSharedIterator<F>
+where
+    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
+{
+    pub(crate) fn new(tenants_accessor: F) -> Self {
+        Self {
+            tenants_accessor,
+            inspected_all_shards: false,
+            last_inspected_shard: None,
+        }
+    }
+
+    pub(crate) fn finished(&self) -> bool {
+        self.inspected_all_shards
+    }
+}
+
+impl<F> Iterator for TenantShardSharedIterator<F>
+where
+    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
+{
+    // TODO(ephemeralsad): consider adding schedule context to the iterator
+    type Item = TenantShardId;
+
+    /// Returns the next tenant shard id if one exists
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.inspected_all_shards {
+            return None;
+        }
+
+        match (self.tenants_accessor)(self.last_inspected_shard) {
+            Some(tid) => {
+                self.last_inspected_shard = Some(tid);
+                Some(tid)
+            }
+            None => {
+                self.inspected_all_shards = true;
+                None
+            }
+        }
+    }
+}
+
+pub(crate) fn create_shared_shard_iterator(
+    service: Arc<Service>,
+) -> TenantShardSharedIterator<impl Fn(Option<TenantShardId>) -> Option<TenantShardId>> {
+    let tenants_accessor = move |last_inspected_shard: Option<TenantShardId>| {
+        let locked = &service.inner.read().unwrap();
+        let tenants = &locked.tenants;
+        let entry = match last_inspected_shard {
+            Some(skip_past) => {
+                // Skip to the last seen tenant shard id
+                let mut cursor = tenants.iter().skip_while(|(tid, _)| **tid != skip_past);
+
+                // Skip past the last seen
+                cursor.nth(1)
+            }
+            None => tenants.first_key_value(),
+        };
+
+        entry.map(|(tid, _)| tid).copied()
+    };
+
+    TenantShardSharedIterator::new(tenants_accessor)
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::BTreeMap;
     use std::str::FromStr;
+    use std::sync::Arc;
 
     use pageserver_api::controller_api::PlacementPolicy;
-    use utils::shard::{ShardCount, ShardNumber};
+    use utils::id::TenantId;
+    use utils::shard::{ShardCount, ShardNumber, TenantShardId};
 
     use super::*;
     use crate::scheduler::test_utils::make_test_nodes;
@@ -66,7 +152,7 @@
     use crate::tenant_shard::tests::make_test_tenant_with_id;
 
     #[test]
-    fn test_context_iterator() {
+    fn test_exclusive_shard_iterator() {
         // Hand-crafted tenant IDs to ensure they appear in the expected order when put into
         // a btreemap & iterated
         let mut t_1_shards = make_test_tenant_with_id(
@@ -106,7 +192,7 @@
             shard.schedule(&mut scheduler, &mut context).unwrap();
         }
 
-        let mut iter = TenantShardContextIterator::new(&mut tenants, ScheduleMode::Speculative);
+        let mut iter = TenantShardExclusiveIterator::new(&mut tenants, ScheduleMode::Speculative);
         let (tenant_id, context, shards) = iter.next().unwrap();
         assert_eq!(tenant_id, t1_id);
         assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
@@ -132,4 +218,46 @@
             shard.intent.clear(&mut scheduler);
         }
     }
+
+    #[test]
+    fn test_shared_shard_iterator() {
+        let tenant_id = TenantId::generate();
+        let shard_count = ShardCount(8);
+
+        let mut tenant_shards = Vec::default();
+        for i in 0..shard_count.0 {
+            tenant_shards.push((
+                TenantShardId {
+                    tenant_id,
+                    shard_number: ShardNumber(i),
+                    shard_count,
+                },
+                (),
+            ))
+        }
+
+        let tenant_shards = Arc::new(tenant_shards);
+
+        let tid_iter = TenantShardSharedIterator::new({
+            let tenants = tenant_shards.clone();
+            move |last_inspected_shard: Option<TenantShardId>| {
+                let entry = match last_inspected_shard {
+                    Some(skip_past) => {
+                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
+                        cursor.nth(1)
+                    }
+                    None => tenants.first(),
+                };
+
+                entry.map(|(tid, _)| tid).copied()
+            }
+        });
+
+        let mut iterated_over = Vec::default();
+        for tid in tid_iter {
+            iterated_over.push((tid, ()));
+        }
+
+        assert_eq!(iterated_over, *tenant_shards);
+    }
 }
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 2031ec132e..52ff977162 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1867,6 +1867,7 @@ class PageserverSchedulingPolicy(StrEnum):
     FILLING = "Filling"
     PAUSE = "Pause"
     PAUSE_FOR_RESTART = "PauseForRestart"
+    DELETING = "Deleting"
 
 
 class StorageControllerLeadershipStatus(StrEnum):
@@ -2075,14 +2076,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )
 
-    def node_delete(self, node_id):
-        log.info(f"node_delete({node_id})")
+    def node_delete_old(self, node_id):
+        log.info(f"node_delete_old({node_id})")
         self.request(
             "DELETE",
             f"{self.api}/control/v1/node/{node_id}",
             headers=self.headers(TokenScope.ADMIN),
         )
 
+    def node_delete(self, node_id):
+        log.info(f"node_delete({node_id})")
+        self.request(
+            "PUT",
+            f"{self.api}/control/v1/node/{node_id}/delete",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
+    def cancel_node_delete(self, node_id):
+        log.info(f"cancel_node_delete({node_id})")
+        self.request(
+            "DELETE",
+            f"{self.api}/control/v1/node/{node_id}/delete",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
     def tombstone_delete(self, node_id):
         log.info(f"tombstone_delete({node_id})")
         self.request(
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index c644ff569e..8471ab9f57 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -2618,7 +2618,7 @@ def test_storage_controller_node_deletion(
     wait_until(assert_shards_migrated)
 
     log.info(f"Deleting pageserver {victim.id}")
-    env.storage_controller.node_delete(victim.id)
+    env.storage_controller.node_delete_old(victim.id)
 
     if not while_offline:
@@ -2653,6 +2653,60 @@
     env.storage_controller.consistency_check()
 
 
+def test_storage_controller_node_delete_cancellation(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_pageservers = 3
+    neon_env_builder.num_azs = 3
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tenant_count = 12
+    shard_count_per_tenant = 16
+    tenant_ids = []
+
+    for _ in range(0, tenant_count):
+        tid = TenantId.generate()
+        tenant_ids.append(tid)
+        env.create_tenant(
+            tid,
+            placement_policy='{"Attached":1}',
+            shard_count=shard_count_per_tenant,
+        )
+
+    # Sanity check: initial creations should not leave the system in an unstable scheduling state
+    assert env.storage_controller.reconcile_all() == 0
+
+    nodes = env.storage_controller.node_list()
+    assert len(nodes) == 3
+
+    env.storage_controller.configure_failpoints(("sleepy-delete-loop", "return(10000)"))
+
+    ps_id_to_delete = env.pageservers[0].id
+
+    env.storage_controller.warm_up_all_secondaries()
+    env.storage_controller.retryable_node_operation(
+        lambda ps_id: env.storage_controller.node_delete(ps_id),
+        ps_id_to_delete,
+        max_attempts=3,
+        backoff=2,
+    )
+
+    env.storage_controller.poll_node_status(
+        ps_id_to_delete,
+        PageserverAvailability.ACTIVE,
+        PageserverSchedulingPolicy.DELETING,
+        max_attempts=6,
+        backoff=2,
+    )
+
+    env.storage_controller.cancel_node_delete(ps_id_to_delete)
+
+    env.storage_controller.poll_node_status(
+        ps_id_to_delete,
+        PageserverAvailability.ACTIVE,
+        PageserverSchedulingPolicy.ACTIVE,
+        max_attempts=6,
+        backoff=2,
+    )
+
+
 @pytest.mark.parametrize("shard_count", [None, 2])
 def test_storage_controller_metadata_health(
     neon_env_builder: NeonEnvBuilder,
@@ -3208,7 +3262,7 @@ def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
     assert_nodes_count(3)
 
     ps = env.pageservers[0]
-    env.storage_controller.node_delete(ps.id)
+    env.storage_controller.node_delete_old(ps.id)
 
     # After deletion, the node count must be reduced
     assert_nodes_count(2)
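
For manual experimentation outside the regress suite, the flow exercised by the new cancellation test can be driven directly through the fixture helpers added in this patch. The sketch below is illustrative only and is not part of the diff: it assumes a started NeonEnv (`env`), and it assumes `PageserverAvailability` is importable from fixtures.neon_fixtures alongside `PageserverSchedulingPolicy`; only `node_delete`, `cancel_node_delete`, `poll_node_status`, and the `Deleting` policy value come from this change.

# Illustrative sketch (not part of the diff): start a node deletion, observe the
# Deleting scheduling policy, then cancel and wait for the policy to settle again.
# Import paths are assumptions; adjust to wherever these enums live in your tree.
from fixtures.neon_fixtures import PageserverAvailability, PageserverSchedulingPolicy


def delete_then_cancel(env, node_id: int) -> None:
    # PUT /control/v1/node/:node_id/delete - kicks off the background deletion.
    env.storage_controller.node_delete(node_id)

    # While shards migrate away, the node reports the new Deleting scheduling policy.
    env.storage_controller.poll_node_status(
        node_id,
        PageserverAvailability.ACTIVE,
        PageserverSchedulingPolicy.DELETING,
        max_attempts=6,
        backoff=2,
    )

    # DELETE /control/v1/node/:node_id/delete - cancels the operation; the policy
    # is restored to what it was when the delete started (Active here).
    env.storage_controller.cancel_node_delete(node_id)
    env.storage_controller.poll_node_status(
        node_id,
        PageserverAvailability.ACTIVE,
        PageserverSchedulingPolicy.ACTIVE,
        max_attempts=6,
        backoff=2,
    )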