storcon: Skip the queue for request-initiated reconciles

Author: John Spray
Date: 2025-02-13 21:54:56 +01:00
Parent: a54853abd5
Commit: 6b5118313e
2 changed files with 43 additions and 19 deletions
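This commit gives request-initiated reconciles their own concurrency pool: maybe_reconcile_shard and maybe_configured_reconcile_shard take a new high_priority flag, and high-priority callers draw permits from a separate prio_reconciler_concurrency semaphore (1024 permits) instead of the background reconciler_concurrency limit, so API-driven work is not queued behind background reconciles. Below is a minimal, self-contained sketch of that two-semaphore selection; the Limits struct, the try_acquire helper, and the main demo (including the 4-permit background pool) are illustrative only, while the field names, the 1024-permit size, the high_priority flag, and the use of try_acquire_owned come straight from the diff.

use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};

// Sketch of the two-semaphore pattern introduced by this commit: background
// reconciles compete for a small pool, while request-initiated ("high
// priority") reconciles draw from a separate, much larger pool and therefore
// skip the background queue. Field names mirror the diff; the struct itself
// is illustrative.
struct Limits {
    reconciler_concurrency: Arc<Semaphore>,      // small; from config in the real service
    prio_reconciler_concurrency: Arc<Semaphore>, // large: 1024 permits in the diff
}

impl Limits {
    // Mirrors the acquisition logic in maybe_configured_reconcile_shard:
    // pick a pool based on the caller-supplied flag, then try_acquire_owned.
    fn try_acquire(&self, high_priority: bool) -> Result<OwnedSemaphorePermit, TryAcquireError> {
        if high_priority {
            self.prio_reconciler_concurrency.clone().try_acquire_owned()
        } else {
            self.reconciler_concurrency.clone().try_acquire_owned()
        }
    }
}

#[tokio::main]
async fn main() {
    let limits = Limits {
        reconciler_concurrency: Arc::new(Semaphore::new(4)),
        prio_reconciler_concurrency: Arc::new(Semaphore::new(1024)),
    };

    // Exhaust the background pool, as a busy background reconcile loop would.
    let _held: Vec<OwnedSemaphorePermit> =
        (0..4).map(|_| limits.try_acquire(false).unwrap()).collect();

    // Background work now fails to get a permit, but a request-initiated
    // reconcile still acquires one from the priority pool immediately.
    assert!(limits.try_acquire(false).is_err());
    assert!(limits.try_acquire(true).is_ok());
    println!("high-priority reconcile skipped the queue");
}

In the diff itself, the chosen permit is wrapped in ReconcileUnits and reconciliation only proceeds if acquisition succeeds; request paths (tenant config, shard split, detach, etc.) pass true, while background loops and the chaos injector pass false.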

File 1 of 2: storage controller Service implementation

@@ -413,6 +413,7 @@ pub struct Service {
// Limit how many Reconcilers we will spawn concurrently
reconciler_concurrency: Arc<tokio::sync::Semaphore>,
+ prio_reconciler_concurrency: Arc<tokio::sync::Semaphore>,
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
@@ -1189,7 +1190,7 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
if let Some(shard) = tenants.get_mut(&tenant_shard_id) {
shard.delayed_reconcile = false;
- self.maybe_reconcile_shard(shard, nodes);
+ self.maybe_reconcile_shard(shard, nodes, false);
}
if self.reconciler_concurrency.available_permits() == 0 {
@@ -1466,6 +1467,7 @@ impl Service {
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.reconciler_concurrency,
)),
+ prio_reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(1024)),
delayed_reconcile_tx,
abort_tx,
startup_complete: startup_complete.clone(),
@@ -2244,7 +2246,7 @@ impl Service {
tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.filter_map(|(_shard_id, shard)| {
- self.maybe_configured_reconcile_shard(shard, nodes, config)
+ self.maybe_configured_reconcile_shard(shard, nodes, config, true)
})
.collect::<Vec<_>>()
};
@@ -2713,7 +2715,7 @@ impl Service {
shard.schedule(scheduler, &mut schedule_context)?;
- let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
+ let maybe_waiter = self.maybe_reconcile_shard(shard, nodes, true);
if let Some(waiter) = maybe_waiter {
waiters.push(waiter);
}
@@ -2834,7 +2836,7 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
shard.config = config.clone();
- if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+ if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
waiters.push(waiter);
}
}
@@ -3116,7 +3118,7 @@ impl Service {
debug_assert!(shard.intent.get_attached().is_none());
debug_assert!(shard.intent.get_secondary().is_empty());
- if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+ if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
detach_waiters.push(waiter);
}
}
@@ -3268,7 +3270,7 @@ impl Service {
// In case scheduling is being switched back on, try it now.
shard.schedule(scheduler, &mut schedule_context).ok();
- self.maybe_reconcile_shard(shard, nodes);
+ self.maybe_reconcile_shard(shard, nodes, true);
}
Ok(())
@@ -4317,7 +4319,7 @@ impl Service {
tracing::warn!("Failed to schedule {tenant_shard_id} during shard abort: {e}")
}
- self.maybe_reconcile_shard(shard, nodes);
+ self.maybe_reconcile_shard(shard, nodes, true);
}
// We don't expect any new_shard_count shards to exist here, but drop them just in case
@@ -4483,7 +4485,8 @@ impl Service {
tracing::warn!("Failed to schedule child shard {child}: {e}");
}
// In the background, attach secondary locations for the new shards
- if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes) {
+ if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes, true)
+ {
waiters.push(waiter);
}
@@ -4848,7 +4851,7 @@ impl Service {
shard.intent.clear_secondary(scheduler);
// Run Reconciler to execute detach of secondary locations.
- if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+ if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes, true) {
waiters.push(waiter);
}
}
@@ -5114,7 +5117,7 @@ impl Service {
shard.sequence = shard.sequence.next();
}
- self.maybe_reconcile_shard(shard, nodes)
+ self.maybe_reconcile_shard(shard, nodes, true)
};
if let Some(waiter) = waiter {
@@ -5177,7 +5180,7 @@ impl Service {
);
}
- self.maybe_reconcile_shard(shard, nodes)
+ self.maybe_reconcile_shard(shard, nodes, true)
};
if let Some(waiter) = waiter {
@@ -5589,7 +5592,7 @@ impl Service {
)
}
- self.maybe_reconcile_shard(shard, nodes);
+ self.maybe_reconcile_shard(shard, nodes, true);
}
// Here we remove an existing observed location for the node we're removing, and it will
@@ -5958,7 +5961,10 @@ impl Service {
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
- if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
+ if self
+ .maybe_reconcile_shard(tenant_shard, nodes, false)
+ .is_some()
+ {
tenants_affected += 1;
};
}
@@ -5989,7 +5995,7 @@ impl Service {
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
- self.maybe_reconcile_shard(tenant_shard, nodes);
+ self.maybe_reconcile_shard(tenant_shard, nodes, false);
}
}
}
@@ -6353,8 +6359,14 @@ impl Service {
&self,
shard: &mut TenantShard,
nodes: &Arc<HashMap<NodeId, Node>>,
+ high_priority: bool,
) -> Option<ReconcilerWaiter> {
- self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::default())
+ self.maybe_configured_reconcile_shard(
+ shard,
+ nodes,
+ ReconcilerConfig::default(),
+ high_priority,
+ )
}
/// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
@@ -6363,6 +6375,7 @@ impl Service {
shard: &mut TenantShard,
nodes: &Arc<HashMap<NodeId, Node>>,
reconciler_config: ReconcilerConfig,
+ high_priority: bool,
) -> Option<ReconcilerWaiter> {
let reconcile_needed = shard.get_reconcile_needed(nodes);
@@ -6374,7 +6387,13 @@ impl Service {
}
};
- let units = match self.reconciler_concurrency.clone().try_acquire_owned() {
+ let acq = if high_priority {
+ self.prio_reconciler_concurrency.clone().try_acquire_owned()
+ } else {
+ self.reconciler_concurrency.clone().try_acquire_owned()
+ };
+ let units = match acq {
Ok(u) => ReconcileUnits::new(u),
Err(_) => {
tracing::info!(tenant_id=%shard.tenant_shard_id.tenant_id, shard_id=%shard.tenant_shard_id.shard_slug(),
@@ -6468,7 +6487,10 @@ impl Service {
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another one
- if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
+ if self
+ .maybe_reconcile_shard(shard, &pageservers, false)
+ .is_some()
+ {
reconciles_spawned += 1;
} else if shard.delayed_reconcile {
// Shard wanted to reconcile but for some reason couldn't.
@@ -6554,7 +6576,7 @@ impl Service {
tracing::info!(tenant_shard_id=%tenant_shard_id, "Applying optimization: {optimization:?}");
if shard.apply_optimization(scheduler, optimization) {
optimizations_applied += 1;
- if self.maybe_reconcile_shard(shard, nodes).is_some() {
+ if self.maybe_reconcile_shard(shard, nodes, false).is_some() {
reconciles_spawned += 1;
}
}
@@ -7221,6 +7243,7 @@ impl Service {
tenant_shard,
nodes,
reconciler_config,
+ false,
);
if let Some(some) = waiter {
waiters.push(some);
@@ -7513,6 +7536,7 @@ impl Service {
tenant_shard,
nodes,
reconciler_config,
+ false,
) {
waiters.push(waiter);
}

File 2 of 2: chaos injector (ChaosInjector)

@@ -88,7 +88,7 @@ impl ChaosInjector {
shard.intent.demote_attached(scheduler, old_location);
shard.intent.promote_attached(scheduler, new_location);
- self.service.maybe_reconcile_shard(shard, nodes);
+ self.service.maybe_reconcile_shard(shard, nodes, false);
}
async fn inject_chaos(&mut self) {