Introduce flag for deletion API

This commit is contained in:
Aleksandr Sarantsev
2025-07-08 17:20:15 +04:00
parent 2f3fc7cb57
commit c8475ed008
5 changed files with 64 additions and 46 deletions

View File

@@ -75,6 +75,12 @@ enum Command {
NodeStartDelete {
#[arg(long)]
node_id: NodeId,
/// When `force` is true, skip waiting for shards to prewarm during migration.
/// This can significantly speed up node deletion since prewarming all shards
/// can take considerable time, but may result in slower initial access to
/// migrated shards until they warm up naturally.
#[arg(long)]
force: bool,
},
/// Cancel deletion of the specified pageserver and wait for `timeout`
/// for the operation to be canceled. May be retried.
@@ -933,13 +939,14 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
.await?;
}
Command::NodeStartDelete { node_id } => {
Command::NodeStartDelete { node_id, force } => {
let query = if force {
format!("control/v1/node/{node_id}/delete?force=true")
} else {
format!("control/v1/node/{node_id}/delete")
};
storcon_client
.dispatch::<(), ()>(
Method::PUT,
format!("control/v1/node/{node_id}/delete"),
None,
)
.dispatch::<(), ()>(Method::PUT, query, None)
.await?;
println!("Delete started for {node_id}");
}

View File

@@ -1066,9 +1066,10 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiErr
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let force: bool = parse_query_param(&req, "force")?.unwrap_or(false);
json_response(
StatusCode::OK,
state.service.start_node_delete(node_id).await?,
state.service.start_node_delete(node_id, force).await?,
)
}

View File

@@ -7165,6 +7165,7 @@ impl Service {
self: &Arc<Self>,
node_id: NodeId,
policy_on_start: NodeSchedulingPolicy,
force: bool,
cancel: CancellationToken,
) -> Result<(), OperationError> {
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal).build();
@@ -7172,23 +7173,28 @@ impl Service {
let mut waiters: Vec<ReconcilerWaiter> = Vec::new();
let mut tid_iter = create_shared_shard_iterator(self.clone());
while !tid_iter.finished() {
if cancel.is_cancelled() {
let process_cancel = || async {
// Attempt to restore the node to its original scheduling policy
match self
.node_configure(node_id, None, Some(policy_on_start))
.await
{
Ok(()) => return Err(OperationError::Cancelled),
Ok(()) => Err(OperationError::Cancelled),
Err(err) => {
return Err(OperationError::FinalizeError(
Err(OperationError::FinalizeError(
format!(
"Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
node_id, String::from(policy_on_start), err
)
.into(),
));
))
}
}
};
while !tid_iter.finished() {
if cancel.is_cancelled() {
return process_cancel().await;
}
operation_utils::validate_node_state(
@@ -7249,13 +7255,24 @@ impl Service {
)
}
// Do not wait for any reconciliations to finish if the deletion has been forced.
let waiter = self.maybe_configured_reconcile_shard(
tenant_shard,
nodes,
reconciler_config,
);
if let Some(some) = waiter {
waiters.push(some);
if force {
// Here we remove an existing observed location for the node we're removing, and it will
// not be re-added by a reconciler's completion because we filter out removed nodes in
// process_result.
//
// Note that we update the shard's observed state _after_ calling maybe_configured_reconcile_shard:
// that means any reconciles we spawned will know about the node we're deleting,
// enabling them to do live migrations if it's still online.
tenant_shard.observed.locations.remove(&node_id);
} else if let Some(waiter) = waiter {
waiters.push(waiter);
}
}
}
@@ -7269,21 +7286,7 @@ impl Service {
while !waiters.is_empty() {
if cancel.is_cancelled() {
match self
.node_configure(node_id, None, Some(policy_on_start))
.await
{
Ok(()) => return Err(OperationError::Cancelled),
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise drain cancel of {} by setting scheduling policy to {}: {}",
node_id, String::from(policy_on_start), err
)
.into(),
));
}
}
return process_cancel().await;
}
tracing::info!("Awaiting {} pending delete reconciliations", waiters.len());
@@ -7888,6 +7891,7 @@ impl Service {
pub(crate) async fn start_node_delete(
self: &Arc<Self>,
node_id: NodeId,
force: bool,
) -> Result<(), ApiError> {
let (ongoing_op, node_policy, schedulable_nodes_count) = {
let locked = self.inner.read().unwrap();
@@ -7957,7 +7961,7 @@ impl Service {
tracing::info!("Delete background operation starting");
let res = service
.delete_node(node_id, policy_on_start, cancel)
.delete_node(node_id, policy_on_start, force, cancel)
.await;
match res {
Ok(()) => {

View File

@@ -2084,11 +2084,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
def node_delete(self, node_id):
def node_delete(self, node_id, force: bool = False):
log.info(f"node_delete({node_id})")
query = f"{self.api}/control/v1/node/{node_id}/delete"
if force:
query += "?force=true"
self.request(
"PUT",
f"{self.api}/control/v1/node/{node_id}/delete",
query,
headers=self.headers(TokenScope.ADMIN),
)

View File

@@ -2621,7 +2621,7 @@ def test_storage_controller_node_deletion(
wait_until(assert_shards_migrated)
log.info(f"Deleting pageserver {victim.id}")
env.storage_controller.node_delete_old(victim.id)
env.storage_controller.node_delete(victim.id, force=True)
if not while_offline:
@@ -2634,8 +2634,11 @@ def test_storage_controller_node_deletion(
wait_until(assert_victim_evacuated)
# The node should be gone from the list API
def assert_victim_gone():
assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]
wait_until(assert_victim_gone)
# No tenants should refer to the node in their intent
for tenant_id in tenant_ids:
describe = env.storage_controller.tenant_describe(tenant_id)
@@ -3265,10 +3268,10 @@ def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
assert_nodes_count(3)
ps = env.pageservers[0]
env.storage_controller.node_delete_old(ps.id)
env.storage_controller.node_delete(ps.id, force=True)
# After deletion, the node count must be reduced
assert_nodes_count(2)
wait_until(lambda: assert_nodes_count(2))
# Running pageserver CLI init in a separate thread
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: