diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 3e9951fb9e..680e6f09c4 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -502,6 +502,17 @@ async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::ACCEPTED, ())
}
+async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let state = get_state(&req);
+ let node_id: NodeId = parse_request_param(&req, "node_id")?;
+
+ state.service.cancel_node_drain(node_id).await?;
+
+ json_response(StatusCode::ACCEPTED, ())
+}
+
async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
@@ -513,6 +524,17 @@ async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::ACCEPTED, ())
}
+async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let state = get_state(&req);
+ let node_id: NodeId = parse_request_param(&req, "node_id")?;
+
+ state.service.cancel_node_fill(node_id).await?;
+
+ json_response(StatusCode::ACCEPTED, ())
+}
+
async fn handle_tenant_shard_split(
service: Arc<Service>,
mut req: Request<Body>,
@@ -871,9 +893,22 @@ pub fn make_router(
.put("/control/v1/node/:node_id/drain", |r| {
named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
})
+ .delete("/control/v1/node/:node_id/drain", |r| {
+ named_request_span(
+ r,
+ handle_cancel_node_drain,
+ RequestName("control_v1_cancel_node_drain"),
+ )
+ })
.put("/control/v1/node/:node_id/fill", |r| {
named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
})
+ .delete("/control/v1/node/:node_id/fill", |r| {
+ named_request_span(
+ r,
+ handle_cancel_node_fill,
+ RequestName("control_v1_cancel_node_fill"),
+ )
+ })
- // TODO(vlad): endpoint for cancelling drain and fill
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 792f68cc5a..752fb2c161 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -4541,7 +4541,8 @@ impl Service {
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
.await?;
- let cancel = CancellationToken::new();
+ let cancel = self.cancel.child_token();
+ let gate_guard = self.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
operation: Operation::Drain(Drain { node_id }),
@@ -4552,6 +4553,8 @@ impl Service {
let service = self.clone();
let cancel = cancel.clone();
async move {
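+ // `cancel` is a child of the service-wide cancellation token and `gate_guard`
+ // holds the service gate open, so a controller shutdown both cancels this
+ // background task and waits for it to finish before exiting.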
+ let _gate_guard = gate_guard;
+
scopeguard::defer! {
let prev = service.inner.write().unwrap().ongoing_operation.take();
@@ -4593,6 +4596,44 @@ impl Service {
Ok(())
}
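+ /// Request cancellation of an in-flight drain of `node_id`. The background drain
+ /// task observes the cancellation token, resets the node's scheduling policy to
+ /// Active and then exits.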
+ pub(crate) async fn cancel_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
+ let (node_available, node_policy) = {
+ let locked = self.inner.read().unwrap();
+ let nodes = &locked.nodes;
+ let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+ anyhow::anyhow!("Node {} not registered", node_id).into(),
+ ))?;
+
+ (node.is_available(), node.get_scheduling())
+ };
+
+ if !node_available {
+ return Err(ApiError::ResourceUnavailable(
+ format!("Node {node_id} is currently unavailable").into(),
+ ));
+ }
+
+ if !matches!(node_policy, NodeSchedulingPolicy::Draining) {
+ return Err(ApiError::PreconditionFailed(
+ format!("Node {node_id} has no drain in progress").into(),
+ ));
+ }
+
+ if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
+ if let Operation::Drain(drain) = op_handler.operation {
+ if drain.node_id == node_id {
+ tracing::info!("Cancelling background drain operation for node {node_id}");
+ op_handler.cancel.cancel();
+ return Ok(());
+ }
+ }
+ }
+
+ Err(ApiError::PreconditionFailed(
+ format!("Node {node_id} has no drain in progress").into(),
+ ))
+ }
+
pub(crate) async fn start_node_fill(self: &Arc<Self>, node_id: NodeId) -> Result<(), ApiError> {
let (ongoing_op, node_available, node_policy, total_nodes_count) = {
let locked = self.inner.read().unwrap();
@@ -4635,7 +4676,8 @@ impl Service {
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
.await?;
- let cancel = CancellationToken::new();
+ let cancel = self.cancel.child_token();
+ let gate_guard = self.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
operation: Operation::Fill(Fill { node_id }),
@@ -4646,6 +4688,8 @@ impl Service {
let service = self.clone();
let cancel = cancel.clone();
async move {
+ let _gate_guard = gate_guard;
+
scopeguard::defer! {
let prev = service.inner.write().unwrap().ongoing_operation.take();
@@ -4687,6 +4731,44 @@ impl Service {
Ok(())
}
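+ /// Request cancellation of an in-flight fill of `node_id`. The background fill
+ /// task observes the cancellation token, resets the node's scheduling policy to
+ /// Active and then exits.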
+ pub(crate) async fn cancel_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
+ let (node_available, node_policy) = {
+ let locked = self.inner.read().unwrap();
+ let nodes = &locked.nodes;
+ let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+ anyhow::anyhow!("Node {} not registered", node_id).into(),
+ ))?;
+
+ (node.is_available(), node.get_scheduling())
+ };
+
+ if !node_available {
+ return Err(ApiError::ResourceUnavailable(
+ format!("Node {node_id} is currently unavailable").into(),
+ ));
+ }
+
+ if !matches!(node_policy, NodeSchedulingPolicy::Filling) {
+ return Err(ApiError::PreconditionFailed(
+ format!("Node {node_id} has no fill in progress").into(),
+ ));
+ }
+
+ if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
+ if let Operation::Fill(fill) = op_handler.operation {
+ if fill.node_id == node_id {
+ tracing::info!("Cancelling background drain operation for node {node_id}");
+ op_handler.cancel.cancel();
+ return Ok(());
+ }
+ }
+ }
+
+ Err(ApiError::PreconditionFailed(
+ format!("Node {node_id} has no fill in progress").into(),
+ ))
+ }
+
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.
@@ -5286,7 +5368,21 @@ impl Service {
while !inspected_all_shards {
if cancel.is_cancelled() {
- return Err(OperationError::Cancelled);
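+ // Drain was cancelled: return the node to the Active scheduling policy
+ // before surfacing the cancellation to the caller.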
+ match self
+ .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
+ .await
+ {
+ Ok(()) => return Err(OperationError::Cancelled),
+ Err(err) => {
+ return Err(OperationError::FinalizeError(
+ format!(
+ "Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
+ node_id, err
+ )
+ .into(),
+ ));
+ }
+ }
}
{
@@ -5356,9 +5452,29 @@ impl Service {
waiters = self
.await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
.await;
+
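+ // Test-only failpoint used to slow the drain loop down so that tests can
+ // cancel the drain while it is still in progress.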
+ failpoint_support::sleep_millis_async!("sleepy-drain-loop");
}
while !waiters.is_empty() {
+ if cancel.is_cancelled() {
+ match self
+ .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
+ .await
+ {
+ Ok(()) => return Err(OperationError::Cancelled),
+ Err(err) => {
+ return Err(OperationError::FinalizeError(
+ format!(
+ "Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
+ node_id, err
+ )
+ .into(),
+ ));
+ }
+ }
+ }
+
tracing::info!("Awaiting {} pending drain reconciliations", waiters.len());
waiters = self
@@ -5495,7 +5611,21 @@ impl Service {
// we validate to ensure that it has not gone stale in the meantime.
while !tids_to_promote.is_empty() {
if cancel.is_cancelled() {
- return Err(OperationError::Cancelled);
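+ // Fill was cancelled: return the node to the Active scheduling policy
+ // before surfacing the cancellation to the caller.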
+ match self
+ .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
+ .await
+ {
+ Ok(()) => return Err(OperationError::Cancelled),
+ Err(err) => {
+ return Err(OperationError::FinalizeError(
+ format!(
+ "Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
+ node_id, err
+ )
+ .into(),
+ ));
+ }
+ }
}
{
@@ -5563,6 +5693,24 @@ impl Service {
}
while !waiters.is_empty() {
+ if cancel.is_cancelled() {
+ match self
+ .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
+ .await
+ {
+ Ok(()) => return Err(OperationError::Cancelled),
+ Err(err) => {
+ return Err(OperationError::FinalizeError(
+ format!(
+ "Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
+ node_id, err
+ )
+ .into(),
+ ));
+ }
+ }
+ }
+
tracing::info!("Awaiting {} pending fill reconciliations", waiters.len());
waiters = self
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 4ff1705ca4..b624c84fad 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2249,6 +2249,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
+ def cancel_node_drain(self, node_id):
+ log.info(f"cancel_node_drain({node_id})")
+ self.request(
+ "DELETE",
+ f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
+ headers=self.headers(TokenScope.ADMIN),
+ )
+
def node_fill(self, node_id):
log.info(f"node_fill({node_id})")
self.request(
@@ -2257,6 +2265,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
+ def cancel_node_fill(self, node_id):
+ log.info(f"cancel_node_fill({node_id})")
+ self.request(
+ "DELETE",
+ f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
+ headers=self.headers(TokenScope.ADMIN),
+ )
+
def node_status(self, node_id):
response = self.request(
"GET",
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index d72377e33e..9cc13ecfdb 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -1518,6 +1518,49 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
workload.validate()
+def retryable_node_operation(op, ps_id, max_attempts, backoff):
+ while max_attempts > 0:
+ try:
+ op(ps_id)
+ return
+ except StorageControllerApiException as e:
+ max_attempts -= 1
+ log.info(f"Operation failed ({max_attempts} attempts left): {e}")
+
+ if max_attempts == 0:
+ raise e
+
+ time.sleep(backoff)
+
+
+def poll_node_status(env, node_id, desired_scheduling_policy, max_attempts, backoff):
+ log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
+ while max_attempts > 0:
+ try:
+ status = env.storage_controller.node_status(node_id)
+ policy = status["scheduling"]
+ if policy == desired_scheduling_policy:
+ return
+ else:
+ max_attempts -= 1
+ log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
+
+ if max_attempts == 0:
+ raise AssertionError(
+ f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
+ )
+
+ time.sleep(backoff)
+ except StorageControllerApiException as e:
+ max_attempts -= 1
+ log.info(f"Status call failed ({max_attempts} retries left): {e}")
+
+ if max_attempts == 0:
+ raise e
+
+ time.sleep(backoff)
+
+
def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
"""
Graceful restart of storage controller clusters uses the drain and
@@ -1546,47 +1589,6 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
nodes = env.storage_controller.node_list()
assert len(nodes) == 2
- def retryable_node_operation(op, ps_id, max_attempts, backoff):
- while max_attempts > 0:
- try:
- op(ps_id)
- return
- except StorageControllerApiException as e:
- max_attempts -= 1
- log.info(f"Operation failed ({max_attempts} attempts left): {e}")
-
- if max_attempts == 0:
- raise e
-
- time.sleep(backoff)
-
- def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff):
- log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
- while max_attempts > 0:
- try:
- status = env.storage_controller.node_status(node_id)
- policy = status["scheduling"]
- if policy == desired_scheduling_policy:
- return
- else:
- max_attempts -= 1
- log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
-
- if max_attempts == 0:
- raise AssertionError(
- f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
- )
-
- time.sleep(backoff)
- except StorageControllerApiException as e:
- max_attempts -= 1
- log.info(f"Status call failed ({max_attempts} retries left): {e}")
-
- if max_attempts == 0:
- raise e
-
- time.sleep(backoff)
-
def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
# Assert that all nodes have some attached shards
assert len(shard_counts) == len(env.pageservers)
@@ -1602,7 +1604,7 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
- poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
+ poll_node_status(env, ps.id, "PauseForRestart", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
@@ -1612,12 +1614,12 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
assert sum(shard_counts.values()) == total_shards
ps.restart()
- poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
+ poll_node_status(env, ps.id, "Active", max_attempts=10, backoff=1)
retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
- poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
+ poll_node_status(env, ps.id, "Active", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
@@ -1627,3 +1629,43 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after rolling restart: {shard_counts}")
assert_shard_counts_balanced(env, shard_counts, total_shards)
+
+
+def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
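+ """
+ Drain a pageserver, cancel the drain via the DELETE endpoint and verify that
+ the node's scheduling policy returns to Active.
+ """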
+ neon_env_builder.num_pageservers = 2
+ env = neon_env_builder.init_configs()
+ env.start()
+
+ tenant_count = 5
+ shard_count_per_tenant = 8
+ tenant_ids = []
+
+ for _ in range(0, tenant_count):
+ tid = TenantId.generate()
+ tenant_ids.append(tid)
+ env.neon_cli.create_tenant(
+ tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
+ )
+
+ # See sleep comment in the test above.
+ time.sleep(2)
+
+ nodes = env.storage_controller.node_list()
+ assert len(nodes) == 2
+
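+ # Slow down the drain loop (the failpoint sleeps for 2000ms on each iteration)
+ # so that the drain is still in progress when we cancel it below.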
+ env.storage_controller.configure_failpoints(("sleepy-drain-loop", "return(2000)"))
+
+ ps_id_to_drain = env.pageservers[0].id
+
+ retryable_node_operation(
+ lambda ps_id: env.storage_controller.node_drain(ps_id),
+ ps_id_to_drain,
+ max_attempts=3,
+ backoff=2,
+ )
+
+ poll_node_status(env, ps_id_to_drain, "Draining", max_attempts=6, backoff=2)
+
+ env.storage_controller.cancel_node_drain(ps_id_to_drain)
+
+ poll_node_status(env, ps_id_to_drain, "Active", max_attempts=6, backoff=2)