Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-07 13:32:57 +00:00
storcon: implement endpoints for cancellation of drain and fill operations (#8029)
## Problem

There is no way to cancel in-flight drain and fill background operations.

## Summary of changes

Implement HTTP endpoints that allow cancelling background operations: `DELETE /control/v1/node/:node_id/drain` and `DELETE /control/v1/node/:node_id/fill`. When an operation is cancelled successfully, the node's scheduling policy reverts to `Active`.
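For illustration, a minimal sketch of cancelling a drain from a client. The base URL, node id, and token are placeholders, and the `reqwest`/`tokio` crates are assumptions of this sketch, not part of the change:

```rust
// Sketch only: storcon address, node id, and token are hypothetical values.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let storcon = "http://127.0.0.1:1234";
    let node_id = 1;

    // PUT on this path starts a background drain; DELETE cancels it.
    let resp = reqwest::Client::new()
        .delete(format!("{storcon}/control/v1/node/{node_id}/drain"))
        .bearer_auth("<admin-scoped-token>")
        .send()
        .await?;

    // The handler accepts the cancellation asynchronously with 202.
    assert_eq!(resp.status(), reqwest::StatusCode::ACCEPTED);
    Ok(())
}
```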
@@ -502,6 +502,17 @@ async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError
     json_response(StatusCode::ACCEPTED, ())
 }
 
+async fn handle_cancel_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+
+    state.service.cancel_node_drain(node_id).await?;
+
+    json_response(StatusCode::ACCEPTED, ())
+}
+
 async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permissions(&req, Scope::Admin)?;
 
@@ -513,6 +524,17 @@ async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError
     json_response(StatusCode::ACCEPTED, ())
 }
 
+async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+
+    state.service.cancel_node_fill(node_id).await?;
+
+    json_response(StatusCode::ACCEPTED, ())
+}
+
 async fn handle_tenant_shard_split(
     service: Arc<Service>,
     mut req: Request<Body>,
@@ -871,9 +893,23 @@ pub fn make_router(
         .put("/control/v1/node/:node_id/drain", |r| {
             named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
         })
+        .delete("/control/v1/node/:node_id/drain", |r| {
+            named_request_span(
+                r,
+                handle_cancel_node_drain,
+                RequestName("control_v1_cancel_node_drain"),
+            )
+        })
         .put("/control/v1/node/:node_id/fill", |r| {
             named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
         })
+        .delete("/control/v1/node/:node_id/fill", |r| {
+            named_request_span(
+                r,
+                handle_cancel_node_fill,
+                RequestName("control_v1_cancel_node_fill"),
+            )
+        })
-        // TODO(vlad): endpoint for cancelling drain and fill
         // Tenant Shard operations
         .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
@@ -4541,7 +4541,8 @@ impl Service {
         self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
             .await?;
 
-        let cancel = CancellationToken::new();
+        let cancel = self.cancel.child_token();
+        let gate_guard = self.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
 
         self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
             operation: Operation::Drain(Drain { node_id }),
@@ -4552,6 +4553,8 @@ impl Service {
             let service = self.clone();
             let cancel = cancel.clone();
             async move {
+                let _gate_guard = gate_guard;
+
                 scopeguard::defer! {
                     let prev = service.inner.write().unwrap().ongoing_operation.take();
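The switch from a fresh `CancellationToken` to `self.cancel.child_token()` makes per-operation cancellation and service shutdown share one mechanism: cancelling the child stops just this operation, while cancelling the parent on shutdown propagates to every child. A standalone sketch of that behaviour with `tokio-util` (names here are illustrative, not the service's):

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let service_cancel = CancellationToken::new(); // lives for the whole service
    let op_cancel = service_cancel.child_token();  // one per background operation

    let task = tokio::spawn({
        let op_cancel = op_cancel.clone();
        async move {
            // Stands in for the drain/fill loop polling `cancel.is_cancelled()`.
            op_cancel.cancelled().await;
        }
    });

    // Either of these unblocks the task: the DELETE endpoint cancels the
    // child directly; shutdown would cancel the parent (and all children).
    op_cancel.cancel();
    task.await.unwrap();
}
```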
@@ -4593,6 +4596,44 @@ impl Service {
         Ok(())
     }
 
+    pub(crate) async fn cancel_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
+        let (node_available, node_policy) = {
+            let locked = self.inner.read().unwrap();
+            let nodes = &locked.nodes;
+            let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+                anyhow::anyhow!("Node {} not registered", node_id).into(),
+            ))?;
+
+            (node.is_available(), node.get_scheduling())
+        };
+
+        if !node_available {
+            return Err(ApiError::ResourceUnavailable(
+                format!("Node {node_id} is currently unavailable").into(),
+            ));
+        }
+
+        if !matches!(node_policy, NodeSchedulingPolicy::Draining) {
+            return Err(ApiError::PreconditionFailed(
+                format!("Node {node_id} has no drain in progress").into(),
+            ));
+        }
+
+        if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
+            if let Operation::Drain(drain) = op_handler.operation {
+                if drain.node_id == node_id {
+                    tracing::info!("Cancelling background drain operation for node {node_id}");
+                    op_handler.cancel.cancel();
+                    return Ok(());
+                }
+            }
+        }
+
+        Err(ApiError::PreconditionFailed(
+            format!("Node {node_id} has no drain in progress").into(),
+        ))
+    }
+
     pub(crate) async fn start_node_fill(self: &Arc<Self>, node_id: NodeId) -> Result<(), ApiError> {
         let (ongoing_op, node_available, node_policy, total_nodes_count) = {
             let locked = self.inner.read().unwrap();
@@ -4635,7 +4676,8 @@ impl Service {
         self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
             .await?;
 
-        let cancel = CancellationToken::new();
+        let cancel = self.cancel.child_token();
+        let gate_guard = self.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
 
        self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
             operation: Operation::Fill(Fill { node_id }),
@@ -4646,6 +4688,8 @@ impl Service {
             let service = self.clone();
             let cancel = cancel.clone();
             async move {
+                let _gate_guard = gate_guard;
+
                 scopeguard::defer! {
                     let prev = service.inner.write().unwrap().ongoing_operation.take();
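Moving `gate_guard` into the spawned task (`let _gate_guard = gate_guard;`) keeps the service's shutdown gate open until the background operation exits, so shutdown waits for it instead of racing it. `Gate` is a neon-internal utility; as a rough standalone analogy (an assumption of this sketch, not what the code above uses), `tokio-util`'s `TaskTracker` gives the same wait-for-in-flight-work shape:

```rust
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();

    // Stands in for the spawned drain/fill task that holds the gate guard.
    tracker.spawn(async {
        // ... background work ...
    });

    // On shutdown: stop admitting new work, then wait for in-flight tasks.
    tracker.close();
    tracker.wait().await;
}
```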
@@ -4687,6 +4731,44 @@ impl Service {
         Ok(())
     }
 
+    pub(crate) async fn cancel_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
+        let (node_available, node_policy) = {
+            let locked = self.inner.read().unwrap();
+            let nodes = &locked.nodes;
+            let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+                anyhow::anyhow!("Node {} not registered", node_id).into(),
+            ))?;
+
+            (node.is_available(), node.get_scheduling())
+        };
+
+        if !node_available {
+            return Err(ApiError::ResourceUnavailable(
+                format!("Node {node_id} is currently unavailable").into(),
+            ));
+        }
+
+        if !matches!(node_policy, NodeSchedulingPolicy::Filling) {
+            return Err(ApiError::PreconditionFailed(
+                format!("Node {node_id} has no fill in progress").into(),
+            ));
+        }
+
+        if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() {
+            if let Operation::Fill(fill) = op_handler.operation {
+                if fill.node_id == node_id {
+                    tracing::info!("Cancelling background fill operation for node {node_id}");
+                    op_handler.cancel.cancel();
+                    return Ok(());
+                }
+            }
+        }
+
+        Err(ApiError::PreconditionFailed(
+            format!("Node {node_id} has no fill in progress").into(),
+        ))
+    }
+
     /// Helper for methods that will try and call pageserver APIs for
     /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
     /// is attached somewhere.
@@ -5286,7 +5368,21 @@ impl Service {
 
         while !inspected_all_shards {
             if cancel.is_cancelled() {
-                return Err(OperationError::Cancelled);
+                match self
+                    .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
+                    .await
+                {
+                    Ok(()) => return Err(OperationError::Cancelled),
+                    Err(err) => {
+                        return Err(OperationError::FinalizeError(
+                            format!(
+                                "Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
+                                node_id, err
+                            )
+                            .into(),
+                        ));
+                    }
+                }
             }
 
             {
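The pattern repeated in each cancellation check below is: on observing cancellation, first finalise by resetting the node's scheduling policy to `Active`, then report `Cancelled`; if the reset itself fails, surface that as a `FinalizeError` instead. A condensed sketch of that shape, with simplified types rather than the service's actual signatures:

```rust
enum OperationError {
    Cancelled,
    FinalizeError(String),
}

// `restore` stands in for node_configure(.., Some(NodeSchedulingPolicy::Active)).
async fn finalise_cancel(
    restore: impl std::future::Future<Output = Result<(), String>>,
) -> OperationError {
    match restore.await {
        Ok(()) => OperationError::Cancelled,
        Err(err) => OperationError::FinalizeError(format!(
            "Failed to finalise cancel by setting scheduling policy to Active: {err}"
        )),
    }
}

#[tokio::main]
async fn main() {
    // In the real loops: if cancel.is_cancelled() { return Err(<this>); }
    let _ = finalise_cancel(async { Ok(()) }).await;
}
```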
@@ -5356,9 +5452,29 @@ impl Service {
             waiters = self
                 .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
                 .await;
+
+            failpoint_support::sleep_millis_async!("sleepy-drain-loop");
         }
 
         while !waiters.is_empty() {
+            if cancel.is_cancelled() {
+                match self
+                    .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
+                    .await
+                {
+                    Ok(()) => return Err(OperationError::Cancelled),
+                    Err(err) => {
+                        return Err(OperationError::FinalizeError(
+                            format!(
+                                "Failed to finalise drain cancel of {} by setting scheduling policy to Active: {}",
+                                node_id, err
+                            )
+                            .into(),
+                        ));
+                    }
+                }
+            }
+
             tracing::info!("Awaiting {} pending drain reconciliations", waiters.len());
 
             waiters = self
@@ -5495,7 +5611,21 @@ impl Service {
         // we validate to ensure that it has not gone stale in the meantime.
         while !tids_to_promote.is_empty() {
             if cancel.is_cancelled() {
-                return Err(OperationError::Cancelled);
+                match self
+                    .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
+                    .await
+                {
+                    Ok(()) => return Err(OperationError::Cancelled),
+                    Err(err) => {
+                        return Err(OperationError::FinalizeError(
+                            format!(
+                                "Failed to finalise fill cancel of {} by setting scheduling policy to Active: {}",
+                                node_id, err
+                            )
+                            .into(),
+                        ));
+                    }
+                }
             }
 
             {
@@ -5563,6 +5693,24 @@ impl Service {
         }
 
         while !waiters.is_empty() {
+            if cancel.is_cancelled() {
+                match self
+                    .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
+                    .await
+                {
+                    Ok(()) => return Err(OperationError::Cancelled),
+                    Err(err) => {
+                        return Err(OperationError::FinalizeError(
+                            format!(
+                                "Failed to finalise fill cancel of {} by setting scheduling policy to Active: {}",
+                                node_id, err
+                            )
+                            .into(),
+                        ));
+                    }
+                }
+            }
+
             tracing::info!("Awaiting {} pending fill reconciliations", waiters.len());
 
             waiters = self
@@ -2249,6 +2249,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )
 
+    def cancel_node_drain(self, node_id):
+        log.info(f"cancel_node_drain({node_id})")
+        self.request(
+            "DELETE",
+            f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
     def node_fill(self, node_id):
         log.info(f"node_fill({node_id})")
         self.request(
@@ -2257,6 +2265,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
             headers=self.headers(TokenScope.ADMIN),
         )
 
+    def cancel_node_fill(self, node_id):
+        log.info(f"cancel_node_fill({node_id})")
+        self.request(
+            "DELETE",
+            f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
     def node_status(self, node_id):
         response = self.request(
             "GET",
@@ -1518,6 +1518,49 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
     workload.validate()
 
 
+def retryable_node_operation(op, ps_id, max_attempts, backoff):
+    while max_attempts > 0:
+        try:
+            op(ps_id)
+            return
+        except StorageControllerApiException as e:
+            max_attempts -= 1
+            log.info(f"Operation failed ({max_attempts} attempts left): {e}")
+
+            if max_attempts == 0:
+                raise e
+
+            time.sleep(backoff)
+
+
+def poll_node_status(env, node_id, desired_scheduling_policy, max_attempts, backoff):
+    log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
+    while max_attempts > 0:
+        try:
+            status = env.storage_controller.node_status(node_id)
+            policy = status["scheduling"]
+            if policy == desired_scheduling_policy:
+                return
+            else:
+                max_attempts -= 1
+                log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
+
+                if max_attempts == 0:
+                    raise AssertionError(
+                        f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
+                    )
+
+                time.sleep(backoff)
+        except StorageControllerApiException as e:
+            max_attempts -= 1
+            log.info(f"Status call failed ({max_attempts} retries left): {e}")
+
+            if max_attempts == 0:
+                raise e
+
+            time.sleep(backoff)
+
+
 def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
     """
     Graceful restart of storage controller clusters uses the drain and
@@ -1546,47 +1589,6 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
     nodes = env.storage_controller.node_list()
     assert len(nodes) == 2
 
-    def retryable_node_operation(op, ps_id, max_attempts, backoff):
-        while max_attempts > 0:
-            try:
-                op(ps_id)
-                return
-            except StorageControllerApiException as e:
-                max_attempts -= 1
-                log.info(f"Operation failed ({max_attempts} attempts left): {e}")
-
-                if max_attempts == 0:
-                    raise e
-
-                time.sleep(backoff)
-
-    def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff):
-        log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
-        while max_attempts > 0:
-            try:
-                status = env.storage_controller.node_status(node_id)
-                policy = status["scheduling"]
-                if policy == desired_scheduling_policy:
-                    return
-                else:
-                    max_attempts -= 1
-                    log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
-
-                    if max_attempts == 0:
-                        raise AssertionError(
-                            f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
-                        )
-
-                    time.sleep(backoff)
-            except StorageControllerApiException as e:
-                max_attempts -= 1
-                log.info(f"Status call failed ({max_attempts} retries left): {e}")
-
-                if max_attempts == 0:
-                    raise e
-
-                time.sleep(backoff)
-
 def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
     # Assert that all nodes have some attached shards
     assert len(shard_counts) == len(env.pageservers)
@@ -1602,7 +1604,7 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
         retryable_node_operation(
             lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
         )
-        poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
+        poll_node_status(env, ps.id, "PauseForRestart", max_attempts=6, backoff=5)
 
         shard_counts = get_node_shard_counts(env, tenant_ids)
         log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
@@ -1612,12 +1614,12 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
         assert sum(shard_counts.values()) == total_shards
 
         ps.restart()
-        poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
+        poll_node_status(env, ps.id, "Active", max_attempts=10, backoff=1)
 
         retryable_node_operation(
             lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
         )
-        poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
+        poll_node_status(env, ps.id, "Active", max_attempts=6, backoff=5)
 
         shard_counts = get_node_shard_counts(env, tenant_ids)
         log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
@@ -1627,3 +1629,43 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
     shard_counts = get_node_shard_counts(env, tenant_ids)
     log.info(f"Shard counts after rolling restart: {shard_counts}")
     assert_shard_counts_balanced(env, shard_counts, total_shards)
+
+
+def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_pageservers = 2
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tenant_count = 5
+    shard_count_per_tenant = 8
+    tenant_ids = []
+
+    for _ in range(0, tenant_count):
+        tid = TenantId.generate()
+        tenant_ids.append(tid)
+        env.neon_cli.create_tenant(
+            tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
+        )
+
+    # See sleep comment in the test above.
+    time.sleep(2)
+
+    nodes = env.storage_controller.node_list()
+    assert len(nodes) == 2
+
+    env.storage_controller.configure_failpoints(("sleepy-drain-loop", "return(2000)"))
+
+    ps_id_to_drain = env.pageservers[0].id
+
+    retryable_node_operation(
+        lambda ps_id: env.storage_controller.node_drain(ps_id),
+        ps_id_to_drain,
+        max_attempts=3,
+        backoff=2,
+    )
+
+    poll_node_status(env, ps_id_to_drain, "Draining", max_attempts=6, backoff=2)
+
+    env.storage_controller.cancel_node_drain(ps_id_to_drain)
+
+    poll_node_status(env, ps_id_to_drain, "Active", max_attempts=6, backoff=2)
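The test leans on the `sleepy-drain-loop` failpoint added in the drain loop: configuring it with `"return(2000)"` makes each iteration sleep 2000 ms, keeping the drain in flight long enough for the cancellation and the poll for `Active` to land. A minimal sketch of a sleep-style failpoint using the `fail` crate directly (an assumption of this sketch; the storage controller goes through neon's `failpoint_support::sleep_millis_async!` wrapper, and the `fail` crate's `failpoints` feature must be enabled for this to fire):

```rust
fn drain_iteration() {
    // With cfg "return(2000)", the closure runs with v = Some("2000").
    fail::fail_point!("sleepy-drain-loop", |v| {
        let ms: u64 = v.and_then(|s| s.parse().ok()).unwrap_or(0);
        std::thread::sleep(std::time::Duration::from_millis(ms));
    });
    // ... rest of the loop body ...
}

fn main() {
    fail::cfg("sleepy-drain-loop", "return(2000)").unwrap();
    drain_iteration(); // stalls ~2 s, leaving a window to cancel the drain
}
```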