mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
storage controller: improve safety of shard splits coinciding with controller restarts (#11412)
## Problem The graceful leadership transfer process involves calling step_down on the old controller, but this was not waiting for shard splits to complete, and the new controller could therefore end up trying to abort a shard split while it was still going on. We mitigated this already in #11256 by avoiding the case where shard split completion would update the database incorrectly, but this was a fragile fix because it assumes that is the only problematic part of the split running concurrently. Precursors: - #11290 - #11256 Closes: #11254 ## Summary of changes - Hold the reconciler gate from shard splits, so that step_down will wait for them. Splits should always be fairly prompt, so it is okay to wait here. - Defense in depth: if step_down times out (hardcoded 10 second limit), then fully terminate the controller process rather than letting it continue running, potentially doing split-brainy things. This makes sense because the new controller will always declare itself leader unilaterally if step_down fails, so leaving an old controller running is not beneficial. - Tests: extend `test_storage_controller_leadership_transfer_during_split` to separately exercise the case of a split holding up step_down, and the case where the overall timeout on step_down is hit and the controller terminates.
This commit is contained in:
@@ -1235,8 +1235,18 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
let state = get_state(&req);
|
||||
json_response(StatusCode::OK, state.service.step_down().await)
|
||||
// Spawn a background task: once we start stepping down, we must finish: if the client drops
|
||||
// their request we should avoid stopping in some part-stepped-down state.
|
||||
let handle = tokio::spawn(async move {
|
||||
let state = get_state(&req);
|
||||
state.service.step_down().await
|
||||
});
|
||||
|
||||
let result = handle
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
json_response(StatusCode::OK, result)
|
||||
}
|
||||
|
||||
async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
|
||||
@@ -61,7 +61,7 @@ use utils::completion::Barrier;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::Gate;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::{failpoint_support, pausable_failpoint};
|
||||
|
||||
use crate::background_node_operations::{
|
||||
@@ -594,6 +594,8 @@ struct TenantShardSplitAbort {
|
||||
new_stripe_size: Option<ShardStripeSize>,
|
||||
/// Until this abort op is complete, no other operations may be done on the tenant
|
||||
_tenant_lock: TracingExclusiveGuard<TenantOperations>,
|
||||
/// The reconciler gate for the duration of the split operation, and any included abort.
|
||||
_gate: GateGuard,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
@@ -1460,7 +1462,7 @@ impl Service {
|
||||
// Retry until shutdown: we must keep this request object alive until it is properly
|
||||
// processed, as it holds a lock guard that prevents other operations trying to do things
|
||||
// to the tenant while it is in a weird part-split state.
|
||||
while !self.cancel.is_cancelled() {
|
||||
while !self.reconcilers_cancel.is_cancelled() {
|
||||
match self.abort_tenant_shard_split(&op).await {
|
||||
Ok(_) => break,
|
||||
Err(e) => {
|
||||
@@ -1473,9 +1475,12 @@ impl Service {
|
||||
// when we retry, so that the abort op will succeed. If the abort op is failing
|
||||
// for some other reason, we will keep retrying forever, or until a human notices
|
||||
// and does something about it (either fixing a pageserver or restarting the controller).
|
||||
tokio::time::timeout(Duration::from_secs(5), self.cancel.cancelled())
|
||||
.await
|
||||
.ok();
|
||||
tokio::time::timeout(
|
||||
Duration::from_secs(5),
|
||||
self.reconcilers_cancel.cancelled(),
|
||||
)
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4910,7 +4915,7 @@ impl Service {
|
||||
1,
|
||||
10,
|
||||
Duration::from_secs(5),
|
||||
&self.cancel,
|
||||
&self.reconcilers_cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -5161,6 +5166,11 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
let _gate = self
|
||||
.reconcilers_gate
|
||||
.enter()
|
||||
.map_err(|_| ApiError::ShuttingDown)?;
|
||||
|
||||
let new_shard_count = ShardCount::new(split_req.new_shard_count);
|
||||
let new_stripe_size = split_req.new_stripe_size;
|
||||
|
||||
@@ -5188,6 +5198,7 @@ impl Service {
|
||||
new_shard_count,
|
||||
new_stripe_size,
|
||||
_tenant_lock,
|
||||
_gate,
|
||||
})
|
||||
// Ignore error sending: that just means we're shutting down: aborts are ephemeral so it's fine to drop it.
|
||||
.ok();
|
||||
@@ -5527,7 +5538,10 @@ impl Service {
|
||||
"failpoint".to_string()
|
||||
)));
|
||||
|
||||
failpoint_support::sleep_millis_async!("shard-split-post-remote-sleep", &self.cancel);
|
||||
failpoint_support::sleep_millis_async!(
|
||||
"shard-split-post-remote-sleep",
|
||||
&self.reconcilers_cancel
|
||||
);
|
||||
|
||||
tracing::info!(
|
||||
"Split {} into {}",
|
||||
@@ -5585,7 +5599,7 @@ impl Service {
|
||||
stripe_size,
|
||||
preferred_az: preferred_az_id.as_ref().map(Cow::Borrowed),
|
||||
},
|
||||
&self.cancel,
|
||||
&self.reconcilers_cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -8670,9 +8684,24 @@ impl Service {
|
||||
failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
|
||||
|
||||
self.inner.write().unwrap().step_down();
|
||||
// TODO: would it make sense to have a time-out for this?
|
||||
self.stop_reconciliations(StopReconciliationsReason::SteppingDown)
|
||||
.await;
|
||||
|
||||
// Wait for reconciliations to stop, or terminate this process if they
|
||||
// fail to stop in time (this indicates a bug in shutdown)
|
||||
tokio::select! {
|
||||
_ = self.stop_reconciliations(StopReconciliationsReason::SteppingDown) => {
|
||||
tracing::info!("Reconciliations stopped, proceeding with step down");
|
||||
}
|
||||
_ = async {
|
||||
failpoint_support::sleep_millis_async!("step-down-delay-timeout");
|
||||
tokio::time::sleep(Duration::from_secs(10)).await
|
||||
} => {
|
||||
tracing::warn!("Step down timed out while waiting for reconciliation gate, terminating process");
|
||||
|
||||
// The caller may proceed to act as leader when it sees this request fail: reduce the chance
|
||||
// of a split-brain situation by terminating this controller instead of leaving it up in a partially-shut-down state.
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
let mut global_observed = GlobalObservedState::default();
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
@@ -2892,10 +2892,12 @@ def test_storage_controller_leadership_transfer(
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("step_down_times_out", [False, True])
|
||||
def test_storage_controller_leadership_transfer_during_split(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
storage_controller_proxy: StorageControllerProxy,
|
||||
port_distributor: PortDistributor,
|
||||
step_down_times_out: bool,
|
||||
):
|
||||
"""
|
||||
Exercise a race between shard splitting and graceful leadership transfer. This is
|
||||
@@ -2936,6 +2938,18 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# We are testing scenarios where the step down API does not complete: either because it is stuck
|
||||
# doing a shard split, or because it totally times out on some other failpoint.
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*step_down.*request was dropped before completing.*",
|
||||
".*step_down.*operation timed out.*",
|
||||
".*Send step down request failed, will retry.*",
|
||||
".*Send step down request still failed after.*retries.*",
|
||||
".*Leader .+ did not respond to step-down request.*",
|
||||
]
|
||||
)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
||||
# Start a shard split
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
@@ -2943,6 +2957,14 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
)
|
||||
pause_failpoint = "shard-split-pre-complete"
|
||||
env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
|
||||
|
||||
if not step_down_times_out:
|
||||
# Prevent the timeout self-terminate code from executing: we will block step down on the
|
||||
# shard split itself
|
||||
env.storage_controller.configure_failpoints(
|
||||
("step-down-delay-timeout", "return(3600000)")
|
||||
)
|
||||
|
||||
split_fut = executor.submit(
|
||||
env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
|
||||
)
|
||||
@@ -2961,12 +2983,20 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
|
||||
)
|
||||
|
||||
if step_down_times_out:
|
||||
# Step down will time out, original controller will terminate itself
|
||||
env.storage_controller.allowed_errors.extend([".*terminating process.*"])
|
||||
else:
|
||||
# Step down does not time out: original controller hits its shard split completion
|
||||
# code path and realises that it must not purge the parent shards from the database.
|
||||
env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
|
||||
|
||||
def passed_split_abort():
|
||||
try:
|
||||
log.info("Checking log for pattern...")
|
||||
assert env.storage_controller.log_contains(
|
||||
".*Using observed state received from leader.*"
|
||||
)
|
||||
# This log is indicative of entering startup_reconcile, which happens
|
||||
# after the point we would abort shard splits
|
||||
assert env.storage_controller.log_contains(".*Populating tenant shards.*")
|
||||
except Exception:
|
||||
log.exception("Failed to find pattern in log")
|
||||
raise
|
||||
@@ -2975,34 +3005,42 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
|
||||
assert env.storage_controller.log_contains(".*Aborting shard split.*")
|
||||
|
||||
# Proxy is still talking to original controller here: disable its pause failpoint so
|
||||
# that its shard split can run to completion.
|
||||
log.info("Disabling failpoint")
|
||||
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
|
||||
# on handling the shard split request.
|
||||
env.storage_controller.request(
|
||||
"PUT",
|
||||
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
|
||||
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
if step_down_times_out:
|
||||
# We will let the old controller hit a timeout path where it terminates itself, rather than
|
||||
# completing step_down and trying to complete a shard split
|
||||
def old_controller_terminated():
|
||||
assert env.storage_controller.log_contains(".*terminating process.*")
|
||||
|
||||
def previous_stepped_down():
|
||||
assert (
|
||||
env.storage_controller.get_leadership_status()
|
||||
== StorageControllerLeadershipStatus.STEPPED_DOWN
|
||||
wait_until(old_controller_terminated)
|
||||
else:
|
||||
# Proxy is still talking to original controller here: disable its pause failpoint so
|
||||
# that its shard split can run to completion.
|
||||
log.info("Disabling failpoint")
|
||||
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
|
||||
# on handling the shard split request.
|
||||
env.storage_controller.request(
|
||||
"PUT",
|
||||
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
|
||||
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
log.info("Awaiting step down")
|
||||
wait_until(previous_stepped_down)
|
||||
def previous_stepped_down():
|
||||
assert (
|
||||
env.storage_controller.get_leadership_status()
|
||||
== StorageControllerLeadershipStatus.STEPPED_DOWN
|
||||
)
|
||||
|
||||
# Let the shard split complete: this may happen _after_ the replacement has come up
|
||||
# and tried to clean up the databases
|
||||
log.info("Unblocking & awaiting shard split")
|
||||
with pytest.raises(Exception, match="Unexpected child shard count"):
|
||||
# This split fails when it tries to persist results, because it encounters
|
||||
# changes already made by the new controller's abort-on-startup
|
||||
split_fut.result()
|
||||
log.info("Awaiting step down")
|
||||
wait_until(previous_stepped_down)
|
||||
|
||||
# Let the shard split complete: this may happen _after_ the replacement has come up
|
||||
# and tried to clean up the databases
|
||||
log.info("Unblocking & awaiting shard split")
|
||||
with pytest.raises(Exception, match="Unexpected child shard count"):
|
||||
# This split fails when it tries to persist results, because it encounters
|
||||
# changes already made by the new controller's abort-on-startup
|
||||
split_fut.result()
|
||||
|
||||
log.info("Routing to new leader")
|
||||
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
|
||||
@@ -3020,13 +3058,14 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
env.storage_controller.wait_until_ready()
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
# Check that the stepped down instance forwards requests
|
||||
# to the new leader while it's still running.
|
||||
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
|
||||
env.storage_controller.tenant_shard_dump()
|
||||
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
|
||||
status = env.storage_controller.node_status(env.pageservers[0].id)
|
||||
assert status["scheduling"] == "Pause"
|
||||
if not step_down_times_out:
|
||||
# Check that the stepped down instance forwards requests
|
||||
# to the new leader while it's still running.
|
||||
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
|
||||
env.storage_controller.tenant_shard_dump()
|
||||
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
|
||||
status = env.storage_controller.node_status(env.pageservers[0].id)
|
||||
assert status["scheduling"] == "Pause"
|
||||
|
||||
|
||||
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
Reference in New Issue
Block a user