storcon: Make node deletion process cancellable (#12320)

## Problem

The current deletion operation is synchronous and blocking, which is
unsuitable for a potentially long-running task like node deletion: every shard
attached to the node must be rescheduled and reconciled elsewhere before the
node can be removed. For such operations, the standard HTTP request-response
pattern is not a good fit.

## Summary of Changes

- Added new `storcon_cli` commands, `NodeStartDelete` and
`NodeCancelDelete`, to start and cancel deletion asynchronously (see the
usage sketch below).
- Added corresponding `storcon` HTTP handlers to support the new
start/cancel deletion flow.
- Introduced a new type of background operation, `Delete`, to track and
manage the deletion process outside the request lifecycle.
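
For illustration, the intended flow looks like this when driven through the
Python test fixtures this PR extends (a minimal sketch; `env` is assumed to be
a started `NeonEnv` with at least one other schedulable pageserver, and the
method and enum names are the ones added in the diffs below):

```python
victim = env.pageservers[0]

# PUT /control/v1/node/{node_id}/delete: spawn the background delete operation.
env.storage_controller.node_delete(victim.id)

# The node keeps serving while its shards are migrated away under the new
# "Deleting" scheduling policy.
env.storage_controller.poll_node_status(
    victim.id,
    PageserverAvailability.ACTIVE,
    PageserverSchedulingPolicy.DELETING,
    max_attempts=6,
    backoff=2,
)

# DELETE /control/v1/node/{node_id}/delete: cancel the ongoing operation.
# Cancellation is asynchronous, so poll until the scheduling policy reverts.
env.storage_controller.cancel_node_delete(victim.id)
env.storage_controller.poll_node_status(
    victim.id,
    PageserverAvailability.ACTIVE,
    PageserverSchedulingPolicy.ACTIVE,
    max_attempts=6,
    backoff=2,
)
```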

---------

Co-authored-by: Aleksandr Sarantsev <aleksandr.sarantsev@databricks.com>
Authored by Aleksandr Sarantsev on 2025-07-04 18:08:09 +04:00; committed by GitHub.
Commit b2705cfee6 (parent 225267b3ae), 12 changed files with 698 additions and 172 deletions.

control_plane/storcon_cli/src/main.rs

@@ -65,12 +65,27 @@ enum Command {
         #[arg(long)]
         scheduling: Option<NodeSchedulingPolicy>,
     },
-    // Set a node status as deleted.
+    /// Exists for backup usage and will be removed in future.
+    /// Use [`Command::NodeStartDelete`] instead, if possible.
     NodeDelete {
         #[arg(long)]
         node_id: NodeId,
     },
+    /// Start deletion of the specified pageserver.
+    NodeStartDelete {
+        #[arg(long)]
+        node_id: NodeId,
+    },
+    /// Cancel deletion of the specified pageserver and wait for `timeout`
+    /// for the operation to be canceled. May be retried.
+    NodeCancelDelete {
+        #[arg(long)]
+        node_id: NodeId,
+        #[arg(long)]
+        timeout: humantime::Duration,
+    },
     /// Delete a tombstone of node from the storage controller.
+    /// This is used when we want to allow the node to be re-registered.
     NodeDeleteTombstone {
         #[arg(long)]
         node_id: NodeId,
     },
@@ -912,10 +927,43 @@ async fn main() -> anyhow::Result<()> {
                 .await?;
         }
         Command::NodeDelete { node_id } => {
+            eprintln!("Warning: This command is obsolete and will be removed in a future version");
+            eprintln!("Use `NodeStartDelete` instead, if possible");
             storcon_client
                 .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
                 .await?;
         }
+        Command::NodeStartDelete { node_id } => {
+            storcon_client
+                .dispatch::<(), ()>(
+                    Method::PUT,
+                    format!("control/v1/node/{node_id}/delete"),
+                    None,
+                )
+                .await?;
+            println!("Delete started for {node_id}");
+        }
+        Command::NodeCancelDelete { node_id, timeout } => {
+            storcon_client
+                .dispatch::<(), ()>(
+                    Method::DELETE,
+                    format!("control/v1/node/{node_id}/delete"),
+                    None,
+                )
+                .await?;
+            println!("Waiting for node {node_id} to quiesce on scheduling policy ...");
+            let final_policy =
+                wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
+                    !matches!(sched, NodeSchedulingPolicy::Deleting)
+                })
+                .await?;
+            println!(
+                "Delete was cancelled for node {node_id}. Scheduling policy is now {final_policy:?}"
+            );
+        }
         Command::NodeDeleteTombstone { node_id } => {
             storcon_client
                 .dispatch::<(), ()>(

libs/pageserver_api/src/controller_api.rs

@@ -386,6 +386,7 @@ pub enum NodeSchedulingPolicy {
     Pause,
     PauseForRestart,
     Draining,
+    Deleting,
 }

 impl FromStr for NodeSchedulingPolicy {
@@ -398,6 +399,7 @@ impl FromStr for NodeSchedulingPolicy {
             "pause" => Ok(Self::Pause),
             "pause_for_restart" => Ok(Self::PauseForRestart),
             "draining" => Ok(Self::Draining),
+            "deleting" => Ok(Self::Deleting),
             _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
         }
     }
@@ -412,6 +414,7 @@ impl From<NodeSchedulingPolicy> for String {
             Pause => "pause",
             PauseForRestart => "pause_for_restart",
             Draining => "draining",
+            Deleting => "deleting",
         }
         .to_string()
     }

storage_controller/src/background_node_operations.rs

@@ -6,6 +6,11 @@ use utils::id::NodeId;
 pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 64;

+#[derive(Copy, Clone)]
+pub(crate) struct Delete {
+    pub(crate) node_id: NodeId,
+}
+
 #[derive(Copy, Clone)]
 pub(crate) struct Drain {
     pub(crate) node_id: NodeId,
@@ -18,6 +23,7 @@ pub(crate) struct Fill {
 #[derive(Copy, Clone)]
 pub(crate) enum Operation {
+    Delete(Delete),
     Drain(Drain),
     Fill(Fill),
 }
@@ -30,6 +36,8 @@ pub(crate) enum OperationError {
     FinalizeError(Cow<'static, str>),
     #[error("Operation cancelled")]
     Cancelled,
+    #[error("Impossible constraint error: {0}")]
+    ImpossibleConstraint(Cow<'static, str>),
 }

 pub(crate) struct OperationHandler {
@@ -38,6 +46,12 @@ pub(crate) struct OperationHandler {
     pub(crate) cancel: CancellationToken,
 }

+impl Display for Delete {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "delete {}", self.node_id)
+    }
+}
+
 impl Display for Drain {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "drain {}", self.node_id)
@@ -53,6 +67,7 @@ impl Display for Fill {
 impl Display for Operation {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
+            Operation::Delete(op) => write!(f, "{op}"),
             Operation::Drain(op) => write!(f, "{op}"),
             Operation::Fill(op) => write!(f, "{op}"),
         }

storage_controller/src/http.rs

@@ -919,7 +919,7 @@ async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
 }

-async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+async fn handle_node_delete_old(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permissions(&req, Scope::Admin)?;

     let req = match maybe_forward(req).await {
@@ -931,7 +931,10 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     let state = get_state(&req);
     let node_id: NodeId = parse_request_param(&req, "node_id")?;
-    json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
+    json_response(
+        StatusCode::OK,
+        state.service.node_delete_old(node_id).await?,
+    )
 }

 async fn handle_tombstone_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -1051,6 +1054,42 @@ async fn handle_get_leader(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     json_response(StatusCode::OK, leader)
 }

+async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+    json_response(
+        StatusCode::OK,
+        state.service.start_node_delete(node_id).await?,
+    )
+}
+
+async fn handle_cancel_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Infra)?;
+
+    let req = match maybe_forward(req).await {
+        ForwardOutcome::Forwarded(res) => {
+            return res;
+        }
+        ForwardOutcome::NotForwarded(req) => req,
+    };
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+    json_response(
+        StatusCode::ACCEPTED,
+        state.service.cancel_node_delete(node_id).await?,
+    )
+}
+
 async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permissions(&req, Scope::Infra)?;

@@ -2221,8 +2260,14 @@ pub fn make_router(
         .post("/control/v1/node", |r| {
             named_request_span(r, handle_node_register, RequestName("control_v1_node"))
         })
+        // This endpoint is deprecated and will be removed in a future version.
+        // Use PUT /control/v1/node/:node_id/delete instead.
         .delete("/control/v1/node/:node_id", |r| {
-            named_request_span(r, handle_node_delete, RequestName("control_v1_node_delete"))
+            named_request_span(
+                r,
+                handle_node_delete_old,
+                RequestName("control_v1_node_delete"),
+            )
         })
         .get("/control/v1/node", |r| {
             named_request_span(r, handle_node_list, RequestName("control_v1_node"))
@@ -2247,6 +2292,20 @@ pub fn make_router(
         .get("/control/v1/leader", |r| {
             named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader"))
         })
+        .put("/control/v1/node/:node_id/delete", |r| {
+            named_request_span(
+                r,
+                handle_node_delete,
+                RequestName("control_v1_start_node_delete"),
+            )
+        })
+        .delete("/control/v1/node/:node_id/delete", |r| {
+            named_request_span(
+                r,
+                handle_cancel_node_delete,
+                RequestName("control_v1_cancel_node_delete"),
+            )
+        })
         .put("/control/v1/node/:node_id/drain", |r| {
             named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
         })

storage_controller/src/lib.rs

@@ -6,13 +6,13 @@ extern crate hyper0 as hyper;
 mod auth;
 mod background_node_operations;
 mod compute_hook;
-mod drain_utils;
 mod heartbeater;
 pub mod http;
 mod id_lock_map;
 mod leadership;
 pub mod metrics;
 mod node;
+mod operation_utils;
 mod pageserver_client;
 mod peer_client;
 pub mod persistence;

storage_controller/src/node.rs

@@ -201,6 +201,7 @@ impl Node {
         match self.scheduling {
             NodeSchedulingPolicy::Active => MaySchedule::Yes(utilization),
+            NodeSchedulingPolicy::Deleting => MaySchedule::No,
             NodeSchedulingPolicy::Draining => MaySchedule::No,
             NodeSchedulingPolicy::Filling => MaySchedule::Yes(utilization),
             NodeSchedulingPolicy::Pause => MaySchedule::No,

storage_controller/src/operation_utils.rs (renamed from drain_utils.rs)

@@ -10,63 +10,19 @@ use crate::node::Node;
 use crate::scheduler::Scheduler;
 use crate::tenant_shard::TenantShard;

-pub(crate) struct TenantShardIterator<F> {
-    tenants_accessor: F,
-    inspected_all_shards: bool,
-    last_inspected_shard: Option<TenantShardId>,
-}
-
-/// A simple iterator which can be used in tandem with [`crate::service::Service`]
-/// to iterate over all known tenant shard ids without holding the lock on the
-/// service state at all times.
-impl<F> TenantShardIterator<F>
-where
-    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
-{
-    pub(crate) fn new(tenants_accessor: F) -> Self {
-        Self {
-            tenants_accessor,
-            inspected_all_shards: false,
-            last_inspected_shard: None,
-        }
-    }
-
-    /// Returns the next tenant shard id if one exists
-    pub(crate) fn next(&mut self) -> Option<TenantShardId> {
-        if self.inspected_all_shards {
-            return None;
-        }
-
-        match (self.tenants_accessor)(self.last_inspected_shard) {
-            Some(tid) => {
-                self.last_inspected_shard = Some(tid);
-                Some(tid)
-            }
-            None => {
-                self.inspected_all_shards = true;
-                None
-            }
-        }
-    }
-
-    /// Returns true when the end of the iterator is reached and false otherwise
-    pub(crate) fn finished(&self) -> bool {
-        self.inspected_all_shards
-    }
-}
-
 /// Check that the state of the node being drained is as expected:
-/// node is present in memory and scheduling policy is set to [`NodeSchedulingPolicy::Draining`]
+/// node is present in memory and scheduling policy is set to expected_policy
 pub(crate) fn validate_node_state(
     node_id: &NodeId,
     nodes: Arc<HashMap<NodeId, Node>>,
+    expected_policy: NodeSchedulingPolicy,
 ) -> Result<(), OperationError> {
     let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
         format!("node {node_id} was removed").into(),
     ))?;

     let current_policy = node.get_scheduling();
-    if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
+    if current_policy != expected_policy {
         // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
         // about it
         return Err(OperationError::NodeStateChanged(
@@ -182,55 +138,3 @@ impl TenantShardDrain {
         }
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use utils::id::TenantId;
-    use utils::shard::{ShardCount, ShardNumber, TenantShardId};
-
-    use super::TenantShardIterator;
-
-    #[test]
-    fn test_tenant_shard_iterator() {
-        let tenant_id = TenantId::generate();
-        let shard_count = ShardCount(8);
-
-        let mut tenant_shards = Vec::default();
-        for i in 0..shard_count.0 {
-            tenant_shards.push((
-                TenantShardId {
-                    tenant_id,
-                    shard_number: ShardNumber(i),
-                    shard_count,
-                },
-                (),
-            ))
-        }
-
-        let tenant_shards = Arc::new(tenant_shards);
-
-        let mut tid_iter = TenantShardIterator::new({
-            let tenants = tenant_shards.clone();
-            move |last_inspected_shard: Option<TenantShardId>| {
-                let entry = match last_inspected_shard {
-                    Some(skip_past) => {
-                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
-                        cursor.nth(1)
-                    }
-                    None => tenants.first(),
-                };
-
-                entry.map(|(tid, _)| tid).copied()
-            }
-        });
-
-        let mut iterated_over = Vec::default();
-        while let Some(tid) = tid_iter.next() {
-            iterated_over.push((tid, ()));
-        }
-
-        assert_eq!(iterated_over, *tenant_shards);
-    }
-}

storage_controller/src/persistence.rs

@@ -635,18 +635,23 @@ impl Persistence {
         let updated = self
             .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
                 Box::pin(async move {
-                    // Check if the node is not marked as deleted
-                    let deleted_node: i64 = nodes
+                    let node: Option<NodePersistence> = nodes
                         .filter(node_id.eq(input_node_id.0 as i64))
-                        .filter(lifecycle.eq(String::from(NodeLifecycle::Deleted)))
-                        .count()
-                        .get_result(conn)
-                        .await?;
+                        .first::<NodePersistence>(conn)
+                        .await
+                        .optional()?;

-                    if deleted_node > 0 {
-                        return Err(DatabaseError::Logical(format!(
-                            "Node {input_node_id} is marked as deleted, re-attach is not allowed"
-                        )));
-                    }
+                    // Check if the node is not marked as deleted
+                    match node {
+                        Some(node) if matches!(NodeLifecycle::from_str(&node.lifecycle), Ok(NodeLifecycle::Deleted)) => {
+                            return Err(DatabaseError::Logical(format!(
+                                "Node {input_node_id} is marked as deleted, re-attach is not allowed"
+                            )));
+                        }
+                        _ => {
+                            // go through
+                        }
+                    };

                     let rows_updated = diesel::update(tenant_shards)
                         .filter(generation_pageserver.eq(input_node_id.0 as i64))
@@ -664,21 +669,23 @@ impl Persistence {
                         .load(conn)
                         .await?;

-                    // If the node went through a drain and restart phase before re-attaching,
-                    // then reset it's node scheduling policy to active.
-                    diesel::update(nodes)
-                        .filter(node_id.eq(input_node_id.0 as i64))
-                        .filter(
-                            scheduling_policy
-                                .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
-                                .or(scheduling_policy
-                                    .eq(String::from(NodeSchedulingPolicy::Draining)))
-                                .or(scheduling_policy
-                                    .eq(String::from(NodeSchedulingPolicy::Filling))),
-                        )
-                        .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
-                        .execute(conn)
-                        .await?;
+                    if let Some(node) = node {
+                        let old_scheduling_policy =
+                            NodeSchedulingPolicy::from_str(&node.scheduling_policy).unwrap();
+                        let new_scheduling_policy = match old_scheduling_policy {
+                            NodeSchedulingPolicy::Active => NodeSchedulingPolicy::Active,
+                            NodeSchedulingPolicy::PauseForRestart => NodeSchedulingPolicy::Active,
+                            NodeSchedulingPolicy::Draining => NodeSchedulingPolicy::Active,
+                            NodeSchedulingPolicy::Filling => NodeSchedulingPolicy::Active,
+                            NodeSchedulingPolicy::Pause => NodeSchedulingPolicy::Pause,
+                            NodeSchedulingPolicy::Deleting => NodeSchedulingPolicy::Pause,
+                        };
+                        diesel::update(nodes)
+                            .filter(node_id.eq(input_node_id.0 as i64))
+                            .set(scheduling_policy.eq(String::from(new_scheduling_policy)))
+                            .execute(conn)
+                            .await?;
+                    }

                     Ok(updated)
                 })
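
The net effect of the new re-attach logic: the transient policies
(`PauseForRestart`, `Draining`, `Filling`) still reset to `Active`, an
operator-set `Pause` is preserved, and a `Deleting` interrupted by a restart
lands in `Pause` instead of silently resuming. Restated outside the Diesel
query as a plain mapping (illustrative only; the string forms are the ones
from `NodeSchedulingPolicy`'s `FromStr`/`From<..> for String` impls):

```python
# Scheduling policy applied to a node when it re-attaches after a restart.
POLICY_AFTER_REATTACH = {
    "active": "active",
    "pause_for_restart": "active",  # normal restart cycle completes
    "draining": "active",
    "filling": "active",
    "pause": "pause",               # operator intent is preserved
    "deleting": "pause",            # an interrupted delete must be restarted explicitly
}

assert POLICY_AFTER_REATTACH["deleting"] == "pause"
```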

storage_controller/src/service.rs

@@ -1,8 +1,8 @@
 pub mod chaos_injector;
-mod context_iterator;
 pub mod feature_flag;
 pub(crate) mod safekeeper_reconciler;
 mod safekeeper_service;
+mod tenant_shard_iterator;

 use std::borrow::Cow;
 use std::cmp::Ordering;
@@ -16,7 +16,6 @@ use std::sync::{Arc, OnceLock};
 use std::time::{Duration, Instant, SystemTime};

 use anyhow::Context;
-use context_iterator::TenantShardContextIterator;
 use control_plane::storage_controller::{
     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
 };
@@ -55,6 +54,7 @@ use pageserver_client::{BlockUnblock, mgmt_api};
 use reqwest::{Certificate, StatusCode};
 use safekeeper_api::models::SafekeeperUtilization;
 use safekeeper_reconciler::SafekeeperReconcilers;
+use tenant_shard_iterator::{TenantShardExclusiveIterator, create_shared_shard_iterator};
 use tokio::sync::TryAcquireError;
 use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
@@ -68,10 +68,9 @@ use utils::sync::gate::{Gate, GateGuard};
 use utils::{failpoint_support, pausable_failpoint};

 use crate::background_node_operations::{
-    Drain, Fill, MAX_RECONCILES_PER_OPERATION, Operation, OperationError, OperationHandler,
+    Delete, Drain, Fill, MAX_RECONCILES_PER_OPERATION, Operation, OperationError, OperationHandler,
 };
 use crate::compute_hook::{self, ComputeHook, NotifyError};
-use crate::drain_utils::{self, TenantShardDrain, TenantShardIterator};
 use crate::heartbeater::{Heartbeater, PageserverState, SafekeeperState};
 use crate::id_lock_map::{
     IdLockMap, TracingExclusiveGuard, trace_exclusive_lock, trace_shared_lock,
@@ -79,6 +78,7 @@ use crate::id_lock_map::{
 use crate::leadership::Leadership;
 use crate::metrics;
 use crate::node::{AvailabilityTransition, Node};
+use crate::operation_utils::{self, TenantShardDrain};
 use crate::pageserver_client::PageserverClient;
 use crate::peer_client::GlobalObservedState;
 use crate::persistence::split_state::SplitState;
@@ -105,7 +105,7 @@ use crate::timeline_import::{
     TimelineImportFinalizeError, TimelineImportState, UpcallClient,
 };

-const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
+const WAITER_OPERATION_POLL_TIMEOUT: Duration = Duration::from_millis(500);

 // For operations that should be quick, like attaching a new tenant
 const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -581,7 +581,9 @@ impl From<ReconcileWaitError> for ApiError {
 impl From<OperationError> for ApiError {
     fn from(value: OperationError) -> Self {
         match value {
-            OperationError::NodeStateChanged(err) | OperationError::FinalizeError(err) => {
+            OperationError::NodeStateChanged(err)
+            | OperationError::FinalizeError(err)
+            | OperationError::ImpossibleConstraint(err) => {
                 ApiError::InternalServerError(anyhow::anyhow!(err))
             }
             OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
@@ -2414,6 +2416,7 @@ impl Service {
             NodeSchedulingPolicy::PauseForRestart
                 | NodeSchedulingPolicy::Draining
                 | NodeSchedulingPolicy::Filling
+                | NodeSchedulingPolicy::Deleting
         );

         let mut new_nodes = (**nodes).clone();
@@ -7055,7 +7058,7 @@ impl Service {
     /// If a node has any work on it, it will be rescheduled: this is "clean" in the sense
     /// that we don't leave any bad state behind in the storage controller, but unclean
     /// in the sense that we are not carefully draining the node.
-    pub(crate) async fn node_delete(&self, node_id: NodeId) -> Result<(), ApiError> {
+    pub(crate) async fn node_delete_old(&self, node_id: NodeId) -> Result<(), ApiError> {
         let _node_lock =
             trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Delete).await;
@@ -7089,7 +7092,7 @@ impl Service {
             }

             for (_tenant_id, mut schedule_context, shards) in
-                TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
+                TenantShardExclusiveIterator::new(tenants, ScheduleMode::Normal)
             {
                 for shard in shards {
                     if shard.deref_node(node_id) {
@@ -7158,6 +7161,171 @@ impl Service {
         Ok(())
     }

+    pub(crate) async fn delete_node(
+        self: &Arc<Self>,
+        node_id: NodeId,
+        policy_on_start: NodeSchedulingPolicy,
+        cancel: CancellationToken,
+    ) -> Result<(), OperationError> {
+        let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal).build();
+
+        let mut waiters: Vec<ReconcilerWaiter> = Vec::new();
+        let mut tid_iter = create_shared_shard_iterator(self.clone());
+
+        while !tid_iter.finished() {
+            if cancel.is_cancelled() {
+                match self
+                    .node_configure(node_id, None, Some(policy_on_start))
+                    .await
+                {
+                    Ok(()) => return Err(OperationError::Cancelled),
+                    Err(err) => {
+                        return Err(OperationError::FinalizeError(
+                            format!(
+                                "Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
+                                node_id, String::from(policy_on_start), err
+                            )
+                            .into(),
+                        ));
+                    }
+                }
+            }
+
+            operation_utils::validate_node_state(
+                &node_id,
+                self.inner.read().unwrap().nodes.clone(),
+                NodeSchedulingPolicy::Deleting,
+            )?;
+
+            while waiters.len() < MAX_RECONCILES_PER_OPERATION {
+                let tid = match tid_iter.next() {
+                    Some(tid) => tid,
+                    None => {
+                        break;
+                    }
+                };
+
+                let mut locked = self.inner.write().unwrap();
+                let (nodes, tenants, scheduler) = locked.parts_mut();
+
+                let tenant_shard = match tenants.get_mut(&tid) {
+                    Some(tenant_shard) => tenant_shard,
+                    None => {
+                        // Tenant shard was deleted by another operation. Skip it.
+                        continue;
+                    }
+                };
+
+                match tenant_shard.get_scheduling_policy() {
+                    ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
+                        // A migration during delete is classed as 'essential' because it is required to
+                        // uphold our availability goals for the tenant: this shard is eligible for migration.
+                    }
+                    ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
+                        // If we have been asked to avoid rescheduling this shard, then do not migrate it during a deletion
+                        tracing::warn!(
+                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                            "Skip migration during deletion because shard scheduling policy {:?} disallows it",
+                            tenant_shard.get_scheduling_policy(),
+                        );
+                        continue;
+                    }
+                }
+
+                if tenant_shard.deref_node(node_id) {
+                    // TODO(ephemeralsad): we should process all shards in a tenant at once, so
+                    // we can avoid settling the tenant unevenly.
+                    let mut schedule_context = ScheduleContext::new(ScheduleMode::Normal);
+                    if let Err(e) = tenant_shard.schedule(scheduler, &mut schedule_context) {
+                        tracing::error!(
+                            "Refusing to delete node, shard {} can't be rescheduled: {e}",
+                            tenant_shard.tenant_shard_id
+                        );
+                        return Err(OperationError::ImpossibleConstraint(e.to_string().into()));
+                    } else {
+                        tracing::info!(
+                            "Rescheduled shard {} away from node during deletion",
+                            tenant_shard.tenant_shard_id
+                        )
+                    }
+
+                    let waiter = self.maybe_configured_reconcile_shard(
+                        tenant_shard,
+                        nodes,
+                        reconciler_config,
+                    );
+                    if let Some(some) = waiter {
+                        waiters.push(some);
+                    }
+                }
+            }
+
+            waiters = self
+                .await_waiters_remainder(waiters, WAITER_OPERATION_POLL_TIMEOUT)
+                .await;
+
+            failpoint_support::sleep_millis_async!("sleepy-delete-loop", &cancel);
+        }
+
+        while !waiters.is_empty() {
+            if cancel.is_cancelled() {
+                match self
+                    .node_configure(node_id, None, Some(policy_on_start))
+                    .await
+                {
+                    Ok(()) => return Err(OperationError::Cancelled),
+                    Err(err) => {
+                        return Err(OperationError::FinalizeError(
+                            format!(
+                                "Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
+                                node_id, String::from(policy_on_start), err
+                            )
+                            .into(),
+                        ));
+                    }
+                }
+            }
+
+            tracing::info!("Awaiting {} pending delete reconciliations", waiters.len());
+
+            waiters = self
+                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
+                .await;
+        }
+
+        self.persistence
+            .set_tombstone(node_id)
+            .await
+            .map_err(|e| OperationError::FinalizeError(e.to_string().into()))?;
+
+        {
+            let mut locked = self.inner.write().unwrap();
+            let (nodes, _, scheduler) = locked.parts_mut();
+
+            scheduler.node_remove(node_id);
+
+            let mut nodes_mut = (**nodes).clone();
+            if let Some(mut removed_node) = nodes_mut.remove(&node_id) {
+                // Ensure that any reconciler holding an Arc<> to this node will
+                // drop out when trying to RPC to it (setting Offline state sets the
+                // cancellation token on the Node object).
+                removed_node.set_availability(NodeAvailability::Offline);
+            }
+            *nodes = Arc::new(nodes_mut);
+
+            metrics::METRICS_REGISTRY
+                .metrics_group
+                .storage_controller_pageserver_nodes
+                .set(nodes.len() as i64);
+            metrics::METRICS_REGISTRY
+                .metrics_group
+                .storage_controller_https_pageserver_nodes
+                .set(nodes.values().filter(|n| n.has_https_port()).count() as i64);
+        }
+
+        Ok(())
+    }
+
     pub(crate) async fn node_list(&self) -> Result<Vec<Node>, ApiError> {
         let nodes = {
             self.inner
@@ -7546,7 +7714,7 @@ impl Service {
         let mut tenants_affected: usize = 0;

         for (_tenant_id, mut schedule_context, shards) in
-            TenantShardContextIterator::new(tenants, ScheduleMode::Normal)
+            TenantShardExclusiveIterator::new(tenants, ScheduleMode::Normal)
         {
             for tenant_shard in shards {
                 let tenant_shard_id = tenant_shard.tenant_shard_id;
@@ -7717,6 +7885,142 @@ impl Service {
         self.node_configure(node_id, availability, scheduling).await
     }

+    pub(crate) async fn start_node_delete(
+        self: &Arc<Self>,
+        node_id: NodeId,
+    ) -> Result<(), ApiError> {
+        let (ongoing_op, node_policy, schedulable_nodes_count) = {
+            let locked = self.inner.read().unwrap();
+            let nodes = &locked.nodes;
+            let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+                anyhow::anyhow!("Node {} not registered", node_id).into(),
+            ))?;
+            let schedulable_nodes_count = nodes
+                .iter()
+                .filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
+                .count();
+
+            (
+                locked
+                    .ongoing_operation
+                    .as_ref()
+                    .map(|ongoing| ongoing.operation),
+                node.get_scheduling(),
+                schedulable_nodes_count,
+            )
+        };
+
+        if let Some(ongoing) = ongoing_op {
+            return Err(ApiError::PreconditionFailed(
+                format!("Background operation already ongoing for node: {ongoing}").into(),
+            ));
+        }
+
+        if schedulable_nodes_count == 0 {
+            return Err(ApiError::PreconditionFailed(
+                "No other schedulable nodes to move shards".into(),
+            ));
+        }
+
+        match node_policy {
+            NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
+                self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Deleting))
+                    .await?;
+
+                let cancel = self.cancel.child_token();
+                let gate_guard = self.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
+                let policy_on_start = node_policy;
+
+                self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
+                    operation: Operation::Delete(Delete { node_id }),
+                    cancel: cancel.clone(),
+                });
+
+                let span = tracing::info_span!(parent: None, "delete_node", %node_id);
+
+                tokio::task::spawn(
+                    {
+                        let service = self.clone();
+                        let cancel = cancel.clone();
+                        async move {
+                            let _gate_guard = gate_guard;
+
+                            scopeguard::defer! {
+                                let prev = service.inner.write().unwrap().ongoing_operation.take();
+
+                                if let Some(Operation::Delete(removed_delete)) = prev.map(|h| h.operation) {
+                                    assert_eq!(removed_delete.node_id, node_id, "We always take the same operation");
+                                } else {
+                                    panic!("We always remove the same operation")
+                                }
+                            }
+
+                            tracing::info!("Delete background operation starting");
+                            let res = service
+                                .delete_node(node_id, policy_on_start, cancel)
+                                .await;
+                            match res {
+                                Ok(()) => {
+                                    tracing::info!(
+                                        "Delete background operation completed successfully"
+                                    );
+                                }
+                                Err(OperationError::Cancelled) => {
+                                    tracing::info!("Delete background operation was cancelled");
+                                }
+                                Err(err) => {
+                                    tracing::error!(
+                                        "Delete background operation encountered: {err}"
+                                    )
+                                }
+                            }
+                        }
+                    }
+                    .instrument(span),
+                );
+            }
+            NodeSchedulingPolicy::Deleting => {
+                return Err(ApiError::Conflict(format!(
+                    "Node {node_id} has delete in progress"
+                )));
+            }
+            policy => {
+                return Err(ApiError::PreconditionFailed(
+                    format!("Node {node_id} cannot be deleted due to {policy:?} policy").into(),
+                ));
+            }
+        }
+
+        Ok(())
+    }
+
+    pub(crate) async fn cancel_node_delete(
+        self: &Arc<Self>,
+        node_id: NodeId,
+    ) -> Result<(), ApiError> {
+        {
+            let locked = self.inner.read().unwrap();
+            let nodes = &locked.nodes;
+            nodes.get(&node_id).ok_or(ApiError::NotFound(
+                anyhow::anyhow!("Node {} not registered", node_id).into(),
+            ))?;
+        }
+
+        if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
+            if let Operation::Delete(delete) = op_handler.operation {
+                if delete.node_id == node_id {
+                    tracing::info!("Cancelling background delete operation for node {node_id}");
+                    op_handler.cancel.cancel();
+                    return Ok(());
+                }
+            }
+        }
+
+        Err(ApiError::PreconditionFailed(
+            format!("Node {node_id} has no delete in progress").into(),
+        ))
+    }
+
     pub(crate) async fn start_node_drain(
         self: &Arc<Self>,
         node_id: NodeId,
@@ -8293,7 +8597,7 @@ impl Service {
         // to ignore the utilisation component of the score.

         for (_tenant_id, schedule_context, shards) in
-            TenantShardContextIterator::new(tenants, ScheduleMode::Speculative)
+            TenantShardExclusiveIterator::new(tenants, ScheduleMode::Speculative)
         {
             for shard in shards {
                 if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
@@ -9020,25 +9324,7 @@ impl Service {
         let mut waiters = Vec::new();

-        let mut tid_iter = TenantShardIterator::new({
-            let service = self.clone();
-            move |last_inspected_shard: Option<TenantShardId>| {
-                let locked = &service.inner.read().unwrap();
-                let tenants = &locked.tenants;
-                let entry = match last_inspected_shard {
-                    Some(skip_past) => {
-                        // Skip to the last seen tenant shard id
-                        let mut cursor = tenants.iter().skip_while(|(tid, _)| **tid != skip_past);
-
-                        // Skip past the last seen
-                        cursor.nth(1)
-                    }
-                    None => tenants.first_key_value(),
-                };
-
-                entry.map(|(tid, _)| tid).copied()
-            }
-        });
+        let mut tid_iter = create_shared_shard_iterator(self.clone());

         while !tid_iter.finished() {
             if cancel.is_cancelled() {
@@ -9058,7 +9344,11 @@ impl Service {
                 }
             }

-            drain_utils::validate_node_state(&node_id, self.inner.read().unwrap().nodes.clone())?;
+            operation_utils::validate_node_state(
+                &node_id,
+                self.inner.read().unwrap().nodes.clone(),
+                NodeSchedulingPolicy::Draining,
+            )?;

             while waiters.len() < MAX_RECONCILES_PER_OPERATION {
                 let tid = match tid_iter.next() {
@@ -9138,7 +9428,7 @@ impl Service {
             }

             waiters = self
-                .await_waiters_remainder(waiters, WAITER_FILL_DRAIN_POLL_TIMEOUT)
+                .await_waiters_remainder(waiters, WAITER_OPERATION_POLL_TIMEOUT)
                 .await;

             failpoint_support::sleep_millis_async!("sleepy-drain-loop", &cancel);
@@ -9432,7 +9722,7 @@ impl Service {
         }

         waiters = self
-            .await_waiters_remainder(waiters, WAITER_FILL_DRAIN_POLL_TIMEOUT)
+            .await_waiters_remainder(waiters, WAITER_OPERATION_POLL_TIMEOUT)
             .await;
     }

storage_controller/src/service/tenant_shard_iterator.rs (renamed from context_iterator.rs)

@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::sync::Arc;

 use utils::id::TenantId;
 use utils::shard::TenantShardId;
@@ -6,16 +7,21 @@ use utils::shard::TenantShardId;
 use crate::scheduler::{ScheduleContext, ScheduleMode};
 use crate::tenant_shard::TenantShard;

+use super::Service;
+
+/// Exclusive iterator over all tenant shards.
+/// It is used to iterate over a consistent view of the tenants' state at a specific point in time.
+///
 /// When making scheduling decisions, it is useful to have the ScheduleContext for a whole
 /// tenant while considering the individual shards within it. This iterator is a helper
 /// that gathers all the shards in a tenant and then yields them together with a ScheduleContext
 /// for the tenant.
-pub(super) struct TenantShardContextIterator<'a> {
+pub(super) struct TenantShardExclusiveIterator<'a> {
     schedule_mode: ScheduleMode,
     inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>,
 }

-impl<'a> TenantShardContextIterator<'a> {
+impl<'a> TenantShardExclusiveIterator<'a> {
     pub(super) fn new(
         tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
         schedule_mode: ScheduleMode,
@@ -27,7 +33,7 @@ impl<'a> TenantShardContextIterator<'a> {
     }
 }

-impl<'a> Iterator for TenantShardContextIterator<'a> {
+impl<'a> Iterator for TenantShardExclusiveIterator<'a> {
     type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>);

     fn next(&mut self) -> Option<Self::Item> {
@@ -52,13 +58,93 @@ impl<'a> Iterator for TenantShardContextIterator<'a> {
     }
 }

+/// Shared iterator over all tenant shards.
+/// It is used to iterate over all tenants without blocking other code that works with them.
+///
+/// A simple iterator which can be used in tandem with [`crate::service::Service`]
+/// to iterate over all known tenant shard ids without holding the lock on the
+/// service state at all times.
+pub(crate) struct TenantShardSharedIterator<F> {
+    tenants_accessor: F,
+    inspected_all_shards: bool,
+    last_inspected_shard: Option<TenantShardId>,
+}
+
+impl<F> TenantShardSharedIterator<F>
+where
+    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
+{
+    pub(crate) fn new(tenants_accessor: F) -> Self {
+        Self {
+            tenants_accessor,
+            inspected_all_shards: false,
+            last_inspected_shard: None,
+        }
+    }
+
+    pub(crate) fn finished(&self) -> bool {
+        self.inspected_all_shards
+    }
+}
+
+impl<F> Iterator for TenantShardSharedIterator<F>
+where
+    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
+{
+    // TODO(ephemeralsad): consider adding schedule context to the iterator
+    type Item = TenantShardId;
+
+    /// Returns the next tenant shard id if one exists
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.inspected_all_shards {
+            return None;
+        }
+
+        match (self.tenants_accessor)(self.last_inspected_shard) {
+            Some(tid) => {
+                self.last_inspected_shard = Some(tid);
+                Some(tid)
+            }
+            None => {
+                self.inspected_all_shards = true;
+                None
+            }
+        }
+    }
+}
+
+pub(crate) fn create_shared_shard_iterator(
+    service: Arc<Service>,
+) -> TenantShardSharedIterator<impl Fn(Option<TenantShardId>) -> Option<TenantShardId>> {
+    let tenants_accessor = move |last_inspected_shard: Option<TenantShardId>| {
+        let locked = &service.inner.read().unwrap();
+        let tenants = &locked.tenants;
+        let entry = match last_inspected_shard {
+            Some(skip_past) => {
+                // Skip to the last seen tenant shard id
+                let mut cursor = tenants.iter().skip_while(|(tid, _)| **tid != skip_past);

+                // Skip past the last seen
+                cursor.nth(1)
+            }
+            None => tenants.first_key_value(),
+        };
+
+        entry.map(|(tid, _)| tid).copied()
+    };
+
+    TenantShardSharedIterator::new(tenants_accessor)
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::BTreeMap;
     use std::str::FromStr;
+    use std::sync::Arc;

     use pageserver_api::controller_api::PlacementPolicy;
-    use utils::shard::{ShardCount, ShardNumber};
+    use utils::id::TenantId;
+    use utils::shard::{ShardCount, ShardNumber, TenantShardId};

     use super::*;
     use crate::scheduler::test_utils::make_test_nodes;
@@ -66,7 +152,7 @@ mod tests {
     use crate::tenant_shard::tests::make_test_tenant_with_id;

     #[test]
-    fn test_context_iterator() {
+    fn test_exclusive_shard_iterator() {
         // Hand-crafted tenant IDs to ensure they appear in the expected order when put into
         // a btreemap & iterated
         let mut t_1_shards = make_test_tenant_with_id(
@@ -106,7 +192,7 @@ mod tests {
             shard.schedule(&mut scheduler, &mut context).unwrap();
         }

-        let mut iter = TenantShardContextIterator::new(&mut tenants, ScheduleMode::Speculative);
+        let mut iter = TenantShardExclusiveIterator::new(&mut tenants, ScheduleMode::Speculative);
         let (tenant_id, context, shards) = iter.next().unwrap();
         assert_eq!(tenant_id, t1_id);
         assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
@@ -132,4 +218,46 @@ mod tests {
             shard.intent.clear(&mut scheduler);
         }
     }
+
+    #[test]
+    fn test_shared_shard_iterator() {
+        let tenant_id = TenantId::generate();
+        let shard_count = ShardCount(8);
+
+        let mut tenant_shards = Vec::default();
+        for i in 0..shard_count.0 {
+            tenant_shards.push((
+                TenantShardId {
+                    tenant_id,
+                    shard_number: ShardNumber(i),
+                    shard_count,
+                },
+                (),
+            ))
+        }
+
+        let tenant_shards = Arc::new(tenant_shards);
+
+        let tid_iter = TenantShardSharedIterator::new({
+            let tenants = tenant_shards.clone();
+            move |last_inspected_shard: Option<TenantShardId>| {
+                let entry = match last_inspected_shard {
+                    Some(skip_past) => {
+                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
+                        cursor.nth(1)
+                    }
+                    None => tenants.first(),
+                };
+
+                entry.map(|(tid, _)| tid).copied()
+            }
+        });
+
+        let mut iterated_over = Vec::default();
+        for tid in tid_iter {
+            iterated_over.push((tid, ()));
+        }
+
+        assert_eq!(iterated_over, *tenant_shards);
+    }
 }

test_runner/fixtures/neon_fixtures.py

@@ -1867,6 +1867,7 @@ class PageserverSchedulingPolicy(StrEnum):
     FILLING = "Filling"
     PAUSE = "Pause"
     PAUSE_FOR_RESTART = "PauseForRestart"
+    DELETING = "Deleting"

 class StorageControllerLeadershipStatus(StrEnum):
@@ -2075,14 +2076,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )

-    def node_delete(self, node_id):
-        log.info(f"node_delete({node_id})")
+    def node_delete_old(self, node_id):
+        log.info(f"node_delete_old({node_id})")
         self.request(
             "DELETE",
             f"{self.api}/control/v1/node/{node_id}",
             headers=self.headers(TokenScope.ADMIN),
         )

+    def node_delete(self, node_id):
+        log.info(f"node_delete({node_id})")
+        self.request(
+            "PUT",
+            f"{self.api}/control/v1/node/{node_id}/delete",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
+    def cancel_node_delete(self, node_id):
+        log.info(f"cancel_node_delete({node_id})")
+        self.request(
+            "DELETE",
+            f"{self.api}/control/v1/node/{node_id}/delete",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
     def tombstone_delete(self, node_id):
         log.info(f"tombstone_delete({node_id})")
         self.request(

test_runner/regress/test_storage_controller.py

@@ -2618,7 +2618,7 @@ def test_storage_controller_node_deletion(
         wait_until(assert_shards_migrated)

     log.info(f"Deleting pageserver {victim.id}")
-    env.storage_controller.node_delete(victim.id)
+    env.storage_controller.node_delete_old(victim.id)

     if not while_offline:
@@ -2653,6 +2653,60 @@ def test_storage_controller_node_deletion(
     env.storage_controller.consistency_check()

+def test_storage_controller_node_delete_cancellation(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_pageservers = 3
+    neon_env_builder.num_azs = 3
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tenant_count = 12
+    shard_count_per_tenant = 16
+    tenant_ids = []
+
+    for _ in range(0, tenant_count):
+        tid = TenantId.generate()
+        tenant_ids.append(tid)
+        env.create_tenant(
+            tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
+        )
+
+    # Sanity check: initial creations should not leave the system in an unstable scheduling state
+    assert env.storage_controller.reconcile_all() == 0
+
+    nodes = env.storage_controller.node_list()
+    assert len(nodes) == 3
+
+    env.storage_controller.configure_failpoints(("sleepy-delete-loop", "return(10000)"))
+
+    ps_id_to_delete = env.pageservers[0].id
+
+    env.storage_controller.warm_up_all_secondaries()
+
+    env.storage_controller.retryable_node_operation(
+        lambda ps_id: env.storage_controller.node_delete(ps_id),
+        ps_id_to_delete,
+        max_attempts=3,
+        backoff=2,
+    )
+
+    env.storage_controller.poll_node_status(
+        ps_id_to_delete,
+        PageserverAvailability.ACTIVE,
+        PageserverSchedulingPolicy.DELETING,
+        max_attempts=6,
+        backoff=2,
+    )
+
+    env.storage_controller.cancel_node_delete(ps_id_to_delete)
+
+    env.storage_controller.poll_node_status(
+        ps_id_to_delete,
+        PageserverAvailability.ACTIVE,
+        PageserverSchedulingPolicy.ACTIVE,
+        max_attempts=6,
+        backoff=2,
+    )
+
 @pytest.mark.parametrize("shard_count", [None, 2])
 def test_storage_controller_metadata_health(
     neon_env_builder: NeonEnvBuilder,
@@ -3208,7 +3262,7 @@ def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
     assert_nodes_count(3)

     ps = env.pageservers[0]
-    env.storage_controller.node_delete(ps.id)
+    env.storage_controller.node_delete_old(ps.id)

     # After deletion, the node count must be reduced
     assert_nodes_count(2)