diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 4028cd7023..3278ff9cf6 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -413,6 +413,7 @@ pub struct Service {
 
     // Limit how many Reconcilers we will spawn concurrently
     reconciler_concurrency: Arc<tokio::sync::Semaphore>,
+    prio_reconciler_concurrency: Arc<tokio::sync::Semaphore>,
 
     /// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
     /// Send into this queue to promptly attempt to reconcile this shard next time units are available.
@@ -1189,7 +1190,7 @@ impl Service {
                 let (nodes, tenants, _scheduler) = locked.parts_mut();
                 if let Some(shard) = tenants.get_mut(&tenant_shard_id) {
                     shard.delayed_reconcile = false;
-                    self.maybe_reconcile_shard(shard, nodes);
+                    self.maybe_reconcile_shard(shard, nodes, false);
                 }
 
                 if self.reconciler_concurrency.available_permits() == 0 {
@@ -1466,6 +1467,7 @@ impl Service {
             reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
                 config.reconciler_concurrency,
             )),
+            prio_reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(1024)),
             delayed_reconcile_tx,
             abort_tx,
             startup_complete: startup_complete.clone(),
@@ -2244,7 +2246,7 @@ impl Service {
             tenants
                 .range_mut(TenantShardId::tenant_range(tenant_id))
                 .filter_map(|(_shard_id, shard)| {
-                    self.maybe_configured_reconcile_shard(shard, nodes, config)
+                    self.maybe_configured_reconcile_shard(shard, nodes, config, true)
                 })
                 .collect::<Vec<_>>()
         };
@@ -2713,7 +2715,7 @@ impl Service {
 
                     shard.schedule(scheduler, &mut schedule_context)?;
 
-                    let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
+                    let maybe_waiter = self.maybe_reconcile_shard(shard, nodes, true);
                     if let Some(waiter) = maybe_waiter {
                         waiters.push(waiter);
                     }
@@ -2834,7 +2836,7 @@ impl Service {
             let (nodes, tenants, _scheduler) = locked.parts_mut();
             for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
                 shard.config = config.clone();
-                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
                     waiters.push(waiter);
                 }
             }
@@ -3116,7 +3118,7 @@ impl Service {
                 debug_assert!(shard.intent.get_attached().is_none());
                 debug_assert!(shard.intent.get_secondary().is_empty());
 
-                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
                     detach_waiters.push(waiter);
                 }
             }
@@ -3268,7 +3270,7 @@ impl Service {
                 // In case scheduling is being switched back on, try it now.
                 shard.schedule(scheduler, &mut schedule_context).ok();
 
-                self.maybe_reconcile_shard(shard, nodes);
+                self.maybe_reconcile_shard(shard, nodes, true);
             }
 
             Ok(())
@@ -4317,7 +4319,7 @@ impl Service {
                     tracing::warn!("Failed to schedule {tenant_shard_id} during shard abort: {e}")
                 }
 
-                self.maybe_reconcile_shard(shard, nodes);
+                self.maybe_reconcile_shard(shard, nodes, true);
             }
 
             // We don't expect any new_shard_count shards to exist here, but drop them just in case
@@ -4483,7 +4485,8 @@ impl Service {
                     tracing::warn!("Failed to schedule child shard {child}: {e}");
                 }
                 // In the background, attach secondary locations for the new shards
-                if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes) {
+                if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes, true)
+                {
                     waiters.push(waiter);
                 }
 
@@ -4848,7 +4851,7 @@ impl Service {
                 shard.intent.clear_secondary(scheduler);
 
                 // Run Reconciler to execute detach fo secondary locations.
-                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
                     waiters.push(waiter);
                 }
             }
@@ -5114,7 +5117,7 @@ impl Service {
                 shard.sequence = shard.sequence.next();
             }
 
-            self.maybe_reconcile_shard(shard, nodes)
+            self.maybe_reconcile_shard(shard, nodes, true)
         };
 
         if let Some(waiter) = waiter {
@@ -5177,7 +5180,7 @@ impl Service {
                 );
             }
 
-            self.maybe_reconcile_shard(shard, nodes)
+            self.maybe_reconcile_shard(shard, nodes, true)
         };
 
         if let Some(waiter) = waiter {
@@ -5589,7 +5592,7 @@ impl Service {
                     )
                 }
 
-                self.maybe_reconcile_shard(shard, nodes);
+                self.maybe_reconcile_shard(shard, nodes, true);
             }
 
             // Here we remove an existing observed location for the node we're removing, and it will
@@ -5958,7 +5961,10 @@ impl Service {
                             tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
                         }
                         Ok(()) => {
-                            if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
+                            if self
+                                .maybe_reconcile_shard(tenant_shard, nodes, false)
+                                .is_some()
+                            {
                                 tenants_affected += 1;
                             };
                         }
@@ -5989,7 +5995,7 @@ impl Service {
                     if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id)
                     {
                         if observed_loc.conf.is_none() {
-                            self.maybe_reconcile_shard(tenant_shard, nodes);
+                            self.maybe_reconcile_shard(tenant_shard, nodes, false);
                         }
                     }
                 }
@@ -6353,8 +6359,14 @@ impl Service {
         &self,
         shard: &mut TenantShard,
         nodes: &Arc<HashMap<NodeId, Node>>,
+        high_priority: bool,
     ) -> Option<ReconcilerWaiter> {
-        self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::default())
+        self.maybe_configured_reconcile_shard(
+            shard,
+            nodes,
+            ReconcilerConfig::default(),
+            high_priority,
+        )
     }
 
     /// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
@@ -6363,6 +6375,7 @@ impl Service {
         shard: &mut TenantShard,
         nodes: &Arc<HashMap<NodeId, Node>>,
         reconciler_config: ReconcilerConfig,
+        high_priority: bool,
     ) -> Option<ReconcilerWaiter> {
         let reconcile_needed = shard.get_reconcile_needed(nodes);
 
@@ -6374,7 +6387,13 @@ impl Service {
             }
         };
 
-        let units = match self.reconciler_concurrency.clone().try_acquire_owned() {
+        let acq = if high_priority {
+            self.prio_reconciler_concurrency.clone().try_acquire_owned()
+        } else {
+            self.reconciler_concurrency.clone().try_acquire_owned()
+        };
+
+        let units = match acq {
             Ok(u) => ReconcileUnits::new(u),
             Err(_) => {
                 tracing::info!(tenant_id=%shard.tenant_shard_id.tenant_id, shard_id=%shard.tenant_shard_id.shard_slug(),
@@ -6468,7 +6487,10 @@ impl Service {
 
             // Eventual consistency: if an earlier reconcile job failed, and the shard is still
             // dirty, spawn another rone
-            if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
+            if self
+                .maybe_reconcile_shard(shard, &pageservers, false)
+                .is_some()
+            {
                 reconciles_spawned += 1;
             } else if shard.delayed_reconcile {
                 // Shard wanted to reconcile but for some reason couldn't.
@@ -6554,7 +6576,7 @@ impl Service {
                 tracing::info!(tenant_shard_id=%tenant_shard_id, "Applying optimization: {optimization:?}");
                 if shard.apply_optimization(scheduler, optimization) {
                     optimizations_applied += 1;
-                    if self.maybe_reconcile_shard(shard, nodes).is_some() {
+                    if self.maybe_reconcile_shard(shard, nodes, false).is_some() {
                         reconciles_spawned += 1;
                     }
                 }
@@ -7221,6 +7243,7 @@ impl Service {
                     tenant_shard,
                     nodes,
                     reconciler_config,
+                    false,
                 );
                 if let Some(some) = waiter {
                     waiters.push(some);
                 }
@@ -7513,6 +7536,7 @@ impl Service {
                 tenant_shard,
                 nodes,
                 reconciler_config,
+                false,
             ) {
                 waiters.push(waiter);
             }
diff --git a/storage_controller/src/service/chaos_injector.rs b/storage_controller/src/service/chaos_injector.rs
index 91d7183fde..5b08aafcf8 100644
--- a/storage_controller/src/service/chaos_injector.rs
+++ b/storage_controller/src/service/chaos_injector.rs
@@ -88,7 +88,7 @@ impl ChaosInjector {
         shard.intent.demote_attached(scheduler, old_location);
         shard.intent.promote_attached(scheduler, new_location);
 
-        self.service.maybe_reconcile_shard(shard, nodes);
+        self.service.maybe_reconcile_shard(shard, nodes, false);
     }
 
     async fn inject_chaos(&mut self) {
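To make the concurrency split easier to follow outside the diff context, here is a minimal, self-contained sketch of the acquisition pattern the patch introduces: high-priority reconciles draw permits from a separate, generously sized semaphore so they are not starved by background reconciles that exhaust `reconciler_concurrency`. The `ReconcileLimits` struct, its method names, and the `main` demo below are illustrative only; the real change lives in `Service` and `maybe_configured_reconcile_shard` as shown above, and the 1024-permit figure simply mirrors the diff.

```rust
use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};

/// Illustrative stand-in for the two semaphore fields the patch adds to Service.
struct ReconcileLimits {
    background: Arc<Semaphore>,
    high_priority: Arc<Semaphore>,
}

impl ReconcileLimits {
    fn new(background_permits: usize) -> Self {
        Self {
            background: Arc::new(Semaphore::new(background_permits)),
            // Mirrors the hard-coded 1024 permits given to prio_reconciler_concurrency.
            high_priority: Arc::new(Semaphore::new(1024)),
        }
    }

    /// Pick a pool based on priority and try to take a permit without waiting,
    /// in the same way the patched maybe_configured_reconcile_shard selects a
    /// semaphore before calling try_acquire_owned().
    fn try_acquire(&self, high_priority: bool) -> Option<OwnedSemaphorePermit> {
        let pool = if high_priority {
            self.high_priority.clone()
        } else {
            self.background.clone()
        };
        match pool.try_acquire_owned() {
            Ok(permit) => Some(permit),
            Err(TryAcquireError::NoPermits | TryAcquireError::Closed) => None,
        }
    }
}

#[tokio::main]
async fn main() {
    let limits = ReconcileLimits::new(1);

    // Exhaust the background pool, then confirm a high-priority request still succeeds.
    let _background_permit = limits.try_acquire(false).expect("first background permit");
    assert!(limits.try_acquire(false).is_none());
    assert!(limits.try_acquire(true).is_some());
}
```

Because both pools are polled with `try_acquire_owned`, a caller that finds its pool empty still gets `None` rather than blocking, which is what routes the shard into the existing `Err(_)` fallback path seen in the hunk at `@@ -6374,7 +6387,13 @@`.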