diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs
index f7f77cdd23..8738386968 100644
--- a/storage_controller/src/metrics.rs
+++ b/storage_controller/src/metrics.rs
@@ -76,6 +76,9 @@ pub(crate) struct StorageControllerMetricGroup {
     /// How many shards would like to reconcile but were blocked by concurrency limits
     pub(crate) storage_controller_pending_reconciles: measured::Gauge,
 
+    /// How many shards keep failing to reconcile and are ignored when deciding whether to run optimizations
+    pub(crate) storage_controller_keep_failing_reconciles: measured::Gauge,
+
     /// HTTP request status counters for handled requests
     pub(crate) storage_controller_http_request_status: measured::CounterVec,
 
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index e4c494db8f..d76fcd6f7b 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -210,6 +210,10 @@ pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
 pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
 pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
 
+// Number of consecutive reconciliation errors that must occur for one shard
+// before the shard is ignored when deciding whether to run optimizations.
+const MAX_CONSECUTIVE_RECONCILIATION_ERRORS: usize = 5;
+
 // Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
 // This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
 // than they're being pushed onto the queue.
@@ -702,6 +706,36 @@ struct ShardMutationLocations {
 #[derive(Default, Clone)]
 struct TenantMutationLocations(BTreeMap);
 
+struct ReconcileAllResult {
+    spawned_reconciles: usize,
+    keep_failing_reconciles: usize,
+    has_delayed_reconciles: bool,
+}
+
+impl ReconcileAllResult {
+    fn new(
+        spawned_reconciles: usize,
+        keep_failing_reconciles: usize,
+        has_delayed_reconciles: bool,
+    ) -> Self {
+        assert!(
+            spawned_reconciles >= keep_failing_reconciles,
+            "It is impossible to have more keep-failing reconciles than spawned reconciles"
+        );
+        Self {
+            spawned_reconciles,
+            keep_failing_reconciles,
+            has_delayed_reconciles,
+        }
+    }
+
+    /// Optimizations may run only if there are no delayed reconciles and
+    /// all spawned reconciles are keep-failing ones.
+    fn can_run_optimizations(&self) -> bool {
+        !self.has_delayed_reconciles && self.spawned_reconciles == self.keep_failing_reconciles
+    }
+}
+
 impl Service {
     pub fn get_config(&self) -> &Config {
         &self.config
@@ -899,7 +933,7 @@ impl Service {
         // which require it: under normal circumstances this should only include tenants that were in some
         // transient state before we restarted, or any tenants whose compute hooks failed above.
         tracing::info!("Checking for shards in need of reconciliation...");
-        let reconcile_tasks = self.reconcile_all();
+        let reconcile_all_result = self.reconcile_all();
 
         // We will not wait for these reconciliation tasks to run here: we're now done with startup and
         // normal operations may proceed.
@@ -947,8 +981,9 @@ impl Service {
             }
         }
 
+        let spawned_reconciles = reconcile_all_result.spawned_reconciles;
         tracing::info!(
-            "Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"
+            "Startup complete, spawned {spawned_reconciles} reconciliation tasks ({shard_count} shards total)"
         );
     }
 
@@ -1199,8 +1234,8 @@ impl Service {
         while !self.reconcilers_cancel.is_cancelled() {
             tokio::select! {
                _ = interval.tick() => {
-                    let reconciles_spawned = self.reconcile_all();
-                    if reconciles_spawned == 0 {
+                    let reconcile_all_result = self.reconcile_all();
+                    if reconcile_all_result.can_run_optimizations() {
                         // Run optimizer only when we didn't find any other work to do
                         self.optimize_all().await;
                     }
@@ -1408,6 +1443,7 @@ impl Service {
 
         match result.result {
             Ok(()) => {
+                tenant.consecutive_errors_count = 0;
                 tenant.apply_observed_deltas(deltas);
                 tenant.waiter.advance(result.sequence);
             }
@@ -1426,6 +1462,8 @@ impl Service {
                     }
                 }
 
+                tenant.consecutive_errors_count = tenant.consecutive_errors_count.saturating_add(1);
+
                 // Ordering: populate last_error before advancing error_seq,
                 // so that waiters will see the correct error after waiting.
                 tenant.set_last_error(result.sequence, e);
@@ -8026,7 +8064,7 @@ impl Service {
-    /// Returns how many reconciliation tasks were started, or `1` if no reconciles were
-    /// spawned but some _would_ have been spawned if `reconciler_concurrency` units where
-    /// available. A return value of 0 indicates that everything is fully reconciled already.
-    fn reconcile_all(&self) -> usize {
+    /// Returns a [`ReconcileAllResult`] describing how many reconciles were spawned, how many
+    /// of them belong to shards that keep failing, and whether any shards wanted to reconcile
+    /// but were delayed by concurrency limits.
+    fn reconcile_all(&self) -> ReconcileAllResult {
         let mut locked = self.inner.write().unwrap();
         let (nodes, tenants, scheduler) = locked.parts_mut();
         let pageservers = nodes.clone();
@@ -8034,13 +8072,16 @@ impl Service {
         // This function is an efficient place to update lazy statistics, since we are walking
         // all tenants.
         let mut pending_reconciles = 0;
+        let mut keep_failing_reconciles = 0;
         let mut az_violations = 0;
 
         // If we find any tenants to drop from memory, stash them to offload after
         // we're done traversing the map of tenants.
        let mut drop_detached_tenants = Vec::new();
 
-        let mut reconciles_spawned = 0;
+        let mut spawned_reconciles = 0;
+        let mut has_delayed_reconciles = false;
+
         for shard in tenants.values_mut() {
             // Accumulate scheduling statistics
             if let (Some(attached), Some(preferred)) =
@@ -8060,18 +8101,32 @@ impl Service {
                 // If there is something delayed, then return a nonzero count so that
                 // callers like reconcile_all_now do not incorrectly get the impression
                 // that the system is in a quiescent state.
-                reconciles_spawned = std::cmp::max(1, reconciles_spawned);
+                has_delayed_reconciles = true;
                 pending_reconciles += 1;
                 continue;
             }
 
             // Eventual consistency: if an earlier reconcile job failed, and the shard is still
             // dirty, spawn another one
+            let consecutive_errors_count = shard.consecutive_errors_count;
             if self
                 .maybe_reconcile_shard(shard, &pageservers, ReconcilerPriority::Normal)
                 .is_some()
             {
-                reconciles_spawned += 1;
+                spawned_reconciles += 1;
+
+                // Count shards that keep failing. We still want to reconcile them
+                // so that a shard does not get stuck, but we don't want them to
+                // block optimizations.
+                if consecutive_errors_count >= MAX_CONSECUTIVE_RECONCILIATION_ERRORS {
+                    tracing::warn!(
+                        tenant_id=%shard.tenant_shard_id.tenant_id,
+                        shard_id=%shard.tenant_shard_id.shard_slug(),
+                        "Shard reconciliation keeps failing: {} consecutive errors",
+                        consecutive_errors_count
+                    );
+                    keep_failing_reconciles += 1;
+                }
             } else if shard.delayed_reconcile {
                 // Shard wanted to reconcile but for some reason couldn't.
                pending_reconciles += 1;
@@ -8110,7 +8165,16 @@ impl Service {
             .storage_controller_pending_reconciles
             .set(pending_reconciles as i64);
 
-        reconciles_spawned
+        metrics::METRICS_REGISTRY
+            .metrics_group
+            .storage_controller_keep_failing_reconciles
+            .set(keep_failing_reconciles as i64);
+
+        ReconcileAllResult::new(
+            spawned_reconciles,
+            keep_failing_reconciles,
+            has_delayed_reconciles,
+        )
     }
 
     /// `optimize` in this context means identifying shards which have valid scheduled locations, but
@@ -8783,13 +8847,13 @@ impl Service {
     /// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
     /// put the system into a quiescent state where future background reconciliations won't do anything.
     pub(crate) async fn reconcile_all_now(&self) -> Result {
-        let reconciles_spawned = self.reconcile_all();
-        let reconciles_spawned = if reconciles_spawned == 0 {
+        let reconcile_all_result = self.reconcile_all();
+        let mut spawned_reconciles = reconcile_all_result.spawned_reconciles;
+        if reconcile_all_result.can_run_optimizations() {
             // Only optimize when we are otherwise idle
-            self.optimize_all().await
-        } else {
-            reconciles_spawned
-        };
+            let optimization_reconciles = self.optimize_all().await;
+            spawned_reconciles += optimization_reconciles;
+        }
 
         let waiters = {
             let mut waiters = Vec::new();
@@ -8826,11 +8890,11 @@ impl Service {
 
         tracing::info!(
             "{} reconciles in reconcile_all, {} waiters",
-            reconciles_spawned,
+            spawned_reconciles,
             waiter_count
         );
 
-        Ok(std::cmp::max(waiter_count, reconciles_spawned))
+        Ok(std::cmp::max(waiter_count, spawned_reconciles))
     }
 
     async fn stop_reconciliations(&self, reason: StopReconciliationsReason) {
diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index 359921ecbf..0bfca5385e 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -131,6 +131,15 @@ pub(crate) struct TenantShard {
     #[serde(serialize_with = "read_last_error")]
     pub(crate) last_error: std::sync::Arc>>>,
 
+    /// Number of consecutive reconciliation errors that have occurred for this shard.
+    ///
+    /// When this count reaches MAX_CONSECUTIVE_RECONCILIATION_ERRORS, the tenant shard
+    /// is counted as keep-failing in `reconcile_all` calculations, which allows
+    /// optimizations to run even while some shards are failing.
+    ///
+    /// The counter is reset to 0 after a successful reconciliation.
+    pub(crate) consecutive_errors_count: usize,
+
     /// If we have a pending compute notification that for some reason we weren't able to send,
     /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
    /// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
@@ -594,6 +603,7 @@ impl TenantShard {
             waiter: Arc::new(SeqWait::new(Sequence(0))),
             error_waiter: Arc::new(SeqWait::new(Sequence(0))),
             last_error: Arc::default(),
+            consecutive_errors_count: 0,
             pending_compute_notification: false,
             scheduling_policy: ShardSchedulingPolicy::default(),
             preferred_node: None,
@@ -1859,6 +1869,7 @@ impl TenantShard {
             waiter: Arc::new(SeqWait::new(Sequence::initial())),
             error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
             last_error: Arc::default(),
+            consecutive_errors_count: 0,
             pending_compute_notification: false,
             delayed_reconcile: false,
             scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 290ebe456b..17439fd811 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -989,6 +989,102 @@ def test_storage_controller_compute_hook_retry(
     )
 
 
+@run_only_on_default_postgres("postgres behavior is not relevant")
+def test_storage_controller_compute_hook_keep_failing(
+    httpserver: HTTPServer,
+    neon_env_builder: NeonEnvBuilder,
+    httpserver_listen_address: ListenAddress,
+):
+    neon_env_builder.num_pageservers = 4
+    neon_env_builder.storage_controller_config = {"use_local_compute_notifications": False}
+    (host, port) = httpserver_listen_address
+    neon_env_builder.control_plane_hooks_api = f"http://{host}:{port}"
+
+    # Set up the control plane handler for compute notifications
+    status_by_tenant: dict[TenantId, int] = {}
+
+    def handler(request: Request):
+        notify_request = request.json
+        assert notify_request is not None
+        status = status_by_tenant[TenantId(notify_request["tenant_id"])]
+        log.info(f"Notify request[{status}]: {notify_request}")
+        return Response(status=status)
+
+    httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
+
+    # Run the neon environment
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    # Create two tenants:
+    # - The first tenant is banned by the control plane and contains only one shard
+    # - The second tenant is allowed by the control plane and contains four shards
+    banned_tenant = TenantId.generate()
+    status_by_tenant[banned_tenant] = 200  # we will ban this tenant later
+    env.create_tenant(banned_tenant, placement_policy='{"Attached": 1}')
+
+    shard_count = 4
+    allowed_tenant = TenantId.generate()
+    status_by_tenant[allowed_tenant] = 200
+    env.create_tenant(allowed_tenant, shard_count=shard_count, placement_policy='{"Attached": 1}')
+
+    # Find the pageserver of the banned tenant
+    banned_tenant_ps = env.get_tenant_pageserver(banned_tenant)
+    assert banned_tenant_ps is not None
+    alive_pageservers = [p for p in env.pageservers if p.id != banned_tenant_ps.id]
+
+    # Stop the pageserver and ban the tenant to trigger failed reconciliations
+    status_by_tenant[banned_tenant] = 423
+    banned_tenant_ps.stop()
+    env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG)
+    env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS)
+    env.storage_controller.allowed_errors.append(".*Shard reconciliation keeps failing.*")
+    env.storage_controller.node_configure(banned_tenant_ps.id, {"availability": "Offline"})
+
+    # Migrate all shards of the allowed tenant to the first alive pageserver
+    # to trigger storage controller optimizations due to affinity rules
+    for shard_number in range(shard_count):
+        env.storage_controller.tenant_shard_migrate(
+            TenantShardId(allowed_tenant, shard_number, shard_count),
+            alive_pageservers[0].id,
+            config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True),
+        )
+
+    # Make some reconcile_all calls to trigger optimizations
+    # RECONCILE_COUNT must be greater than storcon's MAX_CONSECUTIVE_RECONCILIATION_ERRORS
+    RECONCILE_COUNT = 12
+    for i in range(RECONCILE_COUNT):
+        try:
+            n = env.storage_controller.reconcile_all()
+            log.info(f"Reconciliation attempt {i} finished with success: {n}")
+        except StorageControllerApiException as e:
+            assert "Control plane tenant busy" in str(e)
+            log.info(f"Reconciliation attempt {i} finished with failure")
+
+        banned_descr = env.storage_controller.tenant_describe(banned_tenant)
+        assert banned_descr["shards"][0]["is_pending_compute_notification"] is True
+        time.sleep(2)
+
+    # Check that the allowed tenant shards are optimized due to affinity rules
+    locations = alive_pageservers[0].http_client().tenant_list_locations()["tenant_shards"]
+    not_optimized_shard_count = 0
+    for loc in locations:
+        tsi = TenantShardId.parse(loc[0])
+        if tsi.tenant_id != allowed_tenant:
+            continue
+        if loc[1]["mode"] == "AttachedSingle":
+            not_optimized_shard_count += 1
+        log.info(f"Shard {tsi} seen in mode {loc[1]['mode']}")
+
+    assert not_optimized_shard_count < shard_count, "At least one shard should be optimized"
+
+    # Unban the tenant and run reconciliations
+    status_by_tenant[banned_tenant] = 200
+    env.storage_controller.reconcile_all()
+    banned_descr = env.storage_controller.tenant_describe(banned_tenant)
+    assert banned_descr["shards"][0]["is_pending_compute_notification"] is False
+
+
 @run_only_on_default_postgres("this test doesn't start an endpoint")
 def test_storage_controller_compute_hook_revert(
     httpserver: HTTPServer,
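Reviewer note: the gate this change introduces is easiest to see in isolation. The sketch below mirrors the `ReconcileAllResult` struct and `can_run_optimizations` method added to `service.rs`; the `main` harness and the literal values are illustrative only, not part of the change. The rule it demonstrates: optimizations run when nothing was delayed and every spawned reconcile belongs to a keep-failing shard, so a persistently failing shard no longer starves the optimizer.

```rust
// Illustrative sketch only (not part of the diff): mirrors the ReconcileAllResult
// type added in service.rs so the optimization gate can be exercised in isolation.
struct ReconcileAllResult {
    spawned_reconciles: usize,
    keep_failing_reconciles: usize,
    has_delayed_reconciles: bool,
}

impl ReconcileAllResult {
    /// Optimizations may run only if nothing was delayed and every spawned
    /// reconcile belongs to a keep-failing shard.
    fn can_run_optimizations(&self) -> bool {
        !self.has_delayed_reconciles && self.spawned_reconciles == self.keep_failing_reconciles
    }
}

fn main() {
    // Fully idle: nothing spawned, nothing delayed -> optimize (previous behaviour too).
    let idle = ReconcileAllResult { spawned_reconciles: 0, keep_failing_reconciles: 0, has_delayed_reconciles: false };
    assert!(idle.can_run_optimizations());

    // Only a keep-failing shard is being reconciled -> still optimize (new behaviour).
    let stuck_only = ReconcileAllResult { spawned_reconciles: 1, keep_failing_reconciles: 1, has_delayed_reconciles: false };
    assert!(stuck_only.can_run_optimizations());

    // Healthy shards still have real reconciliation work -> hold off.
    let busy = ReconcileAllResult { spawned_reconciles: 3, keep_failing_reconciles: 1, has_delayed_reconciles: false };
    assert!(!busy.can_run_optimizations());

    // Delayed reconciles mean the system is not quiescent -> hold off.
    let delayed = ReconcileAllResult { spawned_reconciles: 0, keep_failing_reconciles: 0, has_delayed_reconciles: true };
    assert!(!delayed.can_run_optimizations());
}
```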