storcon: Ignore keep-failing reconciles (#12391)

## Problem

Currently, if `storcon` (storage controller) reconciliations for a shard keep
failing, the system freezes optimizations indefinitely. This can starve
optimizations for several days, until the underlying reconciliation issues are
resolved manually. To mitigate this, we now detect persistently failing
reconciliations and exclude them from the optimization decision.

## Summary of Changes

- A tenant shard reconciliation is now considered "keep-failing" once it
fails 5 consecutive times. Keep-failing shards are excluded from the
optimization readiness check (see the sketch after this list).
- Added a new metric, `storage_controller_keep_failing_reconciles`, to
monitor such cases.
- Added a warning log message when a reconciliation is marked as
"keep-failing".

---------

Co-authored-by: Aleksandr Sarantsev <aleksandr.sarantsev@databricks.com>
Author: Aleksandr Sarantsev
Date: 2025-07-03 20:21:36 +04:00
Commit: b96983a31c (parent: 3ed28661b1)
4 changed files with 191 additions and 17 deletions


```diff
@@ -76,6 +76,9 @@ pub(crate) struct StorageControllerMetricGroup {
     /// How many shards would like to reconcile but were blocked by concurrency limits
     pub(crate) storage_controller_pending_reconciles: measured::Gauge,
 
+    /// How many shards are keep-failing and are ignored when deciding whether to run optimizations
+    pub(crate) storage_controller_keep_failing_reconciles: measured::Gauge,
+
     /// HTTP request status counters for handled requests
     pub(crate) storage_controller_http_request_status:
         measured::CounterVec<HttpRequestStatusLabelGroupSet>,
```


```diff
@@ -210,6 +210,10 @@ pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
 pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
 pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
 
+// Number of consecutive reconciliation errors for a single shard after which
+// the shard is ignored when deciding whether to run optimizations.
+const MAX_CONSECUTIVE_RECONCILIATION_ERRORS: usize = 5;
+
 // Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
 // This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
 // than they're being pushed onto the queue.
```
```diff
@@ -702,6 +706,36 @@ struct ShardMutationLocations {
 #[derive(Default, Clone)]
 struct TenantMutationLocations(BTreeMap<TenantShardId, ShardMutationLocations>);
 
+struct ReconcileAllResult {
+    spawned_reconciles: usize,
+    keep_failing_reconciles: usize,
+    has_delayed_reconciles: bool,
+}
+
+impl ReconcileAllResult {
+    fn new(
+        spawned_reconciles: usize,
+        keep_failing_reconciles: usize,
+        has_delayed_reconciles: bool,
+    ) -> Self {
+        assert!(
+            spawned_reconciles >= keep_failing_reconciles,
+            "It is impossible to have more keep-failing reconciles than spawned reconciles"
+        );
+        Self {
+            spawned_reconciles,
+            keep_failing_reconciles,
+            has_delayed_reconciles,
+        }
+    }
+
+    /// We can run optimizations only if we don't have any delayed reconciles and
+    /// all spawned reconciles are also keep-failing reconciles.
+    fn can_run_optimizations(&self) -> bool {
+        !self.has_delayed_reconciles && self.spawned_reconciles == self.keep_failing_reconciles
+    }
+}
+
 impl Service {
     pub fn get_config(&self) -> &Config {
         &self.config
```
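
To make the gating rule concrete, here is a small illustration (not part of
the diff) of how `can_run_optimizations` behaves:

```rust
// All spawned reconciles are keep-failing and nothing was delayed:
// the optimizer may run despite the failures.
assert!(ReconcileAllResult::new(3, 3, false).can_run_optimizations());

// One spawned reconcile is healthy, i.e. real work is still in flight:
// hold optimizations until the system quiesces.
assert!(!ReconcileAllResult::new(3, 2, false).can_run_optimizations());

// Reconciles delayed by concurrency limits also block optimizations,
// even if nothing was spawned in this pass.
assert!(!ReconcileAllResult::new(0, 0, true).can_run_optimizations());
```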
```diff
@@ -899,7 +933,7 @@ impl Service {
         // which require it: under normal circumstances this should only include tenants that were in some
         // transient state before we restarted, or any tenants whose compute hooks failed above.
         tracing::info!("Checking for shards in need of reconciliation...");
-        let reconcile_tasks = self.reconcile_all();
+        let reconcile_all_result = self.reconcile_all();
 
         // We will not wait for these reconciliation tasks to run here: we're now done with startup and
         // normal operations may proceed.
```
```diff
@@ -947,8 +981,9 @@
             }
         }
 
+        let spawned_reconciles = reconcile_all_result.spawned_reconciles;
         tracing::info!(
-            "Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"
+            "Startup complete, spawned {spawned_reconciles} reconciliation tasks ({shard_count} shards total)"
         );
     }
```
```diff
@@ -1199,8 +1234,8 @@ impl Service {
         while !self.reconcilers_cancel.is_cancelled() {
             tokio::select! {
                 _ = interval.tick() => {
-                    let reconciles_spawned = self.reconcile_all();
-                    if reconciles_spawned == 0 {
+                    let reconcile_all_result = self.reconcile_all();
+                    if reconcile_all_result.can_run_optimizations() {
                         // Run optimizer only when we didn't find any other work to do
                         self.optimize_all().await;
                     }
```
```diff
@@ -1408,6 +1443,7 @@
         match result.result {
             Ok(()) => {
+                tenant.consecutive_errors_count = 0;
                 tenant.apply_observed_deltas(deltas);
                 tenant.waiter.advance(result.sequence);
             }
```
```diff
@@ -1426,6 +1462,8 @@
                     }
                 }
 
+                tenant.consecutive_errors_count = tenant.consecutive_errors_count.saturating_add(1);
+
                 // Ordering: populate last_error before advancing error_seq,
                 // so that waiters will see the correct error after waiting.
                 tenant.set_last_error(result.sequence, e);
```
```diff
@@ -8026,7 +8064,7 @@ impl Service {
     /// Returns how many reconciliation tasks were started, or `1` if no reconciles were
     /// spawned but some _would_ have been spawned if `reconciler_concurrency` units were
     /// available. A return value of 0 indicates that everything is fully reconciled already.
-    fn reconcile_all(&self) -> usize {
+    fn reconcile_all(&self) -> ReconcileAllResult {
         let mut locked = self.inner.write().unwrap();
         let (nodes, tenants, scheduler) = locked.parts_mut();
         let pageservers = nodes.clone();
```
```diff
@@ -8034,13 +8072,16 @@
         // This function is an efficient place to update lazy statistics, since we are walking
         // all tenants.
         let mut pending_reconciles = 0;
+        let mut keep_failing_reconciles = 0;
         let mut az_violations = 0;
 
         // If we find any tenants to drop from memory, stash them to offload after
         // we're done traversing the map of tenants.
         let mut drop_detached_tenants = Vec::new();
 
-        let mut reconciles_spawned = 0;
+        let mut spawned_reconciles = 0;
+        let mut has_delayed_reconciles = false;
         for shard in tenants.values_mut() {
             // Accumulate scheduling statistics
             if let (Some(attached), Some(preferred)) =
```
```diff
@@ -8060,18 +8101,32 @@
                 // If there is something delayed, then return a nonzero count so that
                 // callers like reconcile_all_now do not incorrectly get the impression
                 // that the system is in a quiescent state.
-                reconciles_spawned = std::cmp::max(1, reconciles_spawned);
+                has_delayed_reconciles = true;
                 pending_reconciles += 1;
                 continue;
             }
 
             // Eventual consistency: if an earlier reconcile job failed, and the shard is still
             // dirty, spawn another one
+            let consecutive_errors_count = shard.consecutive_errors_count;
             if self
                 .maybe_reconcile_shard(shard, &pageservers, ReconcilerPriority::Normal)
                 .is_some()
             {
-                reconciles_spawned += 1;
+                spawned_reconciles += 1;
+
+                // Count shards that are keep-failing. We still want to reconcile them
+                // to avoid a situation where a shard is stuck.
+                // But we don't want to consider them when deciding to run optimizations.
+                if consecutive_errors_count >= MAX_CONSECUTIVE_RECONCILIATION_ERRORS {
+                    tracing::warn!(
+                        tenant_id=%shard.tenant_shard_id.tenant_id,
+                        shard_id=%shard.tenant_shard_id.shard_slug(),
+                        "Shard reconciliation is keep-failing: {} errors",
+                        consecutive_errors_count
+                    );
+                    keep_failing_reconciles += 1;
+                }
             } else if shard.delayed_reconcile {
                 // Shard wanted to reconcile but for some reason couldn't.
                 pending_reconciles += 1;
```
```diff
@@ -8110,7 +8165,16 @@
             .storage_controller_pending_reconciles
             .set(pending_reconciles as i64);
 
-        reconciles_spawned
+        metrics::METRICS_REGISTRY
+            .metrics_group
+            .storage_controller_keep_failing_reconciles
+            .set(keep_failing_reconciles as i64);
+
+        ReconcileAllResult::new(
+            spawned_reconciles,
+            keep_failing_reconciles,
+            has_delayed_reconciles,
+        )
     }
 
     /// `optimize` in this context means identifying shards which have valid scheduled locations, but
```
```diff
@@ -8783,13 +8847,13 @@
     /// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
     /// put the system into a quiescent state where future background reconciliations won't do anything.
     pub(crate) async fn reconcile_all_now(&self) -> Result<usize, ReconcileWaitError> {
-        let reconciles_spawned = self.reconcile_all();
-        let reconciles_spawned = if reconciles_spawned == 0 {
+        let reconcile_all_result = self.reconcile_all();
+        let mut spawned_reconciles = reconcile_all_result.spawned_reconciles;
+        if reconcile_all_result.can_run_optimizations() {
             // Only optimize when we are otherwise idle
-            self.optimize_all().await
-        } else {
-            reconciles_spawned
-        };
+            let optimization_reconciles = self.optimize_all().await;
+            spawned_reconciles += optimization_reconciles;
+        }
 
         let waiters = {
             let mut waiters = Vec::new();
```
```diff
@@ -8826,11 +8890,11 @@
         tracing::info!(
             "{} reconciles in reconcile_all, {} waiters",
-            reconciles_spawned,
+            spawned_reconciles,
             waiter_count
         );
 
-        Ok(std::cmp::max(waiter_count, reconciles_spawned))
+        Ok(std::cmp::max(waiter_count, spawned_reconciles))
     }
 
     async fn stop_reconciliations(&self, reason: StopReconciliationsReason) {
```


```diff
@@ -131,6 +131,15 @@ pub(crate) struct TenantShard {
     #[serde(serialize_with = "read_last_error")]
     pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
 
+    /// Number of consecutive reconciliation errors that have occurred for this shard.
+    ///
+    /// When this count reaches MAX_CONSECUTIVE_RECONCILIATION_ERRORS, the tenant shard
+    /// is counted as keep-failing in `reconcile_all` calculations, which allows
+    /// optimizations to run even while some shards keep failing.
+    ///
+    /// The counter is reset to 0 after a successful reconciliation.
+    pub(crate) consecutive_errors_count: usize,
+
     /// If we have a pending compute notification that for some reason we weren't able to send,
     /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
     /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
```
```diff
@@ -594,6 +603,7 @@ impl TenantShard {
             waiter: Arc::new(SeqWait::new(Sequence(0))),
             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
             last_error: Arc::default(),
+            consecutive_errors_count: 0,
             pending_compute_notification: false,
             scheduling_policy: ShardSchedulingPolicy::default(),
             preferred_node: None,
```
```diff
@@ -1859,6 +1869,7 @@ impl TenantShard {
             waiter: Arc::new(SeqWait::new(Sequence::initial())),
             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
             last_error: Arc::default(),
+            consecutive_errors_count: 0,
             pending_compute_notification: false,
             delayed_reconcile: false,
             scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
```


```diff
@@ -989,6 +989,102 @@ def test_storage_controller_compute_hook_retry(
     )
 
 
+@run_only_on_default_postgres("postgres behavior is not relevant")
+def test_storage_controller_compute_hook_keep_failing(
+    httpserver: HTTPServer,
+    neon_env_builder: NeonEnvBuilder,
+    httpserver_listen_address: ListenAddress,
+):
+    neon_env_builder.num_pageservers = 4
+    neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
+    (host, port) = httpserver_listen_address
+    neon_env_builder.control_plane_hooks_api = f"http://{host}:{port}"
+
+    # Set up the control plane (CP) handler for compute notifications
+    status_by_tenant: dict[TenantId, int] = {}
+
+    def handler(request: Request):
+        notify_request = request.json
+        assert notify_request is not None
+        status = status_by_tenant[TenantId(notify_request["tenant_id"])]
+        log.info(f"Notify request[{status}]: {notify_request}")
+        return Response(status=status)
+
+    httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
+
+    # Run the neon environment
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    # Create two tenants:
+    # - The first tenant is banned by the CP and contains only one shard
+    # - The second tenant is allowed by the CP and contains four shards
+    banned_tenant = TenantId.generate()
+    status_by_tenant[banned_tenant] = 200  # we will ban this tenant later
+    env.create_tenant(banned_tenant, placement_policy='{"Attached": 1}')
+
+    shard_count = 4
+    allowed_tenant = TenantId.generate()
+    status_by_tenant[allowed_tenant] = 200
+    env.create_tenant(allowed_tenant, shard_count=shard_count, placement_policy='{"Attached": 1}')
+
+    # Find the pageserver of the banned tenant
+    banned_tenant_ps = env.get_tenant_pageserver(banned_tenant)
+    assert banned_tenant_ps is not None
+    alive_pageservers = [p for p in env.pageservers if p.id != banned_tenant_ps.id]
+
+    # Stop the pageserver and ban the tenant to trigger failing reconciliations
+    status_by_tenant[banned_tenant] = 423
+    banned_tenant_ps.stop()
+
+    env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG)
+    env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS)
+    env.storage_controller.allowed_errors.append(".*Shard reconciliation is keep-failing.*")
+    env.storage_controller.node_configure(banned_tenant_ps.id, {"availability": "Offline"})
+
+    # Migrate all allowed tenant shards to the first alive pageserver
+    # to trigger storage controller optimizations due to affinity rules
+    for shard_number in range(shard_count):
+        env.storage_controller.tenant_shard_migrate(
+            TenantShardId(allowed_tenant, shard_number, shard_count),
+            alive_pageservers[0].id,
+            config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True),
+        )
+
+    # Make some reconcile_all calls to trigger optimizations.
+    # RECONCILE_COUNT must be greater than storcon's MAX_CONSECUTIVE_RECONCILIATION_ERRORS
+    RECONCILE_COUNT = 12
+    for i in range(RECONCILE_COUNT):
+        try:
+            n = env.storage_controller.reconcile_all()
+            log.info(f"Reconciliation attempt {i} finished with success: {n}")
+        except StorageControllerApiException as e:
+            assert "Control plane tenant busy" in str(e)
+            log.info(f"Reconciliation attempt {i} finished with failure")
+
+        banned_descr = env.storage_controller.tenant_describe(banned_tenant)
+        assert banned_descr["shards"][0]["is_pending_compute_notification"] is True
+        time.sleep(2)
+
+    # Check that the allowed tenant shards are optimized due to affinity rules
+    locations = alive_pageservers[0].http_client().tenant_list_locations()["tenant_shards"]
+    not_optimized_shard_count = 0
+    for loc in locations:
+        tsi = TenantShardId.parse(loc[0])
+        if tsi.tenant_id != allowed_tenant:
+            continue
+        if loc[1]["mode"] == "AttachedSingle":
+            not_optimized_shard_count += 1
+        log.info(f"Shard {tsi} seen in mode {loc[1]['mode']}")
+    assert not_optimized_shard_count < shard_count, "At least one shard should be optimized"
+
+    # Unban the tenant and run reconciliations
+    status_by_tenant[banned_tenant] = 200
+    env.storage_controller.reconcile_all()
+
+    banned_descr = env.storage_controller.tenant_describe(banned_tenant)
+    assert banned_descr["shards"][0]["is_pending_compute_notification"] is False
+
+
 @run_only_on_default_postgres("this test doesn't start an endpoint")
 def test_storage_controller_compute_hook_revert(
     httpserver: HTTPServer,
```