# storage controller: prioritize reconciles for user-facing operations (#10822)

## Problem

Some situations can produce a large backlog of pending reconciles. If
reconciles are then processed more slowly than expected, that backlog can
prevent us from responding promptly to user requests such as
tenant/timeline CRUD.

This is a cleaner implementation of the hotfix in
https://github.com/neondatabase/neon/pull/10815

## Summary of changes

- Introduce a second semaphore for high-priority tasks, with a
configurable number of units (default 256). The intent is that in
practice these user-facing requests should never have to wait (see the
sketch below).
- Use the high-priority semaphore for tenant/timeline CRUD and shard
splitting operations; use normal priority for everything else.
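
For intuition, here is a minimal, self-contained sketch of the acquisition rule this change introduces (names like `Pools` and `Priority` are hypothetical stand-ins; in the real patch the logic lives in `Service::get_reconciler_units` and wraps the permit in `ReconcileUnits`):

```rust
use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};

#[derive(Debug, Copy, Clone)]
enum Priority {
    Normal,
    High,
}

/// Hypothetical stand-in for the controller's two reconcile concurrency pools.
struct Pools {
    normal: Arc<Semaphore>, // default 128 units
    high: Arc<Semaphore>,   // default 256 units
}

impl Pools {
    /// Non-blocking acquisition: high-priority work tries its own pool first and
    /// may then steal a unit from the normal pool; normal work never steals.
    fn try_acquire(&self, priority: Priority) -> Result<OwnedSemaphorePermit, TryAcquireError> {
        match priority {
            Priority::Normal => self.normal.clone().try_acquire_owned(),
            Priority::High => match self.high.clone().try_acquire_owned() {
                Ok(permit) => Ok(permit),
                // High-priority pool exhausted: fall back to the normal pool.
                Err(TryAcquireError::NoPermits) => self.normal.clone().try_acquire_owned(),
                Err(e) => Err(e),
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let pools = Pools {
        normal: Arc::new(Semaphore::new(128)),
        high: Arc::new(Semaphore::new(256)),
    };
    // Succeeds as long as either pool has units available.
    let permit = pools.try_acquire(Priority::High).expect("units available");
    drop(permit); // returning the unit to the pool it came from
}
```

Because stealing is one-directional, a backlog of normal-priority (background) reconciles can never consume high-priority units, while urgent user-facing work can still proceed when its own pool is momentarily exhausted.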
Author: John Spray
Committed: 2025-02-14 13:25:43 +00:00 (via GitHub)
Commit: a82a6631fd (parent da7496e1ee)
4 changed files with 143 additions and 32 deletions

```diff

@@ -12,7 +12,8 @@ use storage_controller::persistence::Persistence;
 use storage_controller::service::chaos_injector::ChaosInjector;
 use storage_controller::service::{
     Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
-    MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
+    MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
+    PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
 };
 use tokio::signal::unix::SignalKind;
 use tokio_util::sync::CancellationToken;
@@ -75,10 +76,14 @@ struct Cli {
     #[arg(long)]
     split_threshold: Option<u64>,
 
-    /// Maximum number of reconcilers that may run in parallel
+    /// Maximum number of normal-priority reconcilers that may run in parallel
     #[arg(long)]
     reconciler_concurrency: Option<usize>,
 
+    /// Maximum number of high-priority reconcilers that may run in parallel
+    #[arg(long)]
+    priority_reconciler_concurrency: Option<usize>,
+
     /// How long to wait for the initial database connection to be available.
     #[arg(long, default_value = "5s")]
     db_connect_timeout: humantime::Duration,
@@ -289,6 +294,9 @@ async fn async_main() -> anyhow::Result<()> {
         reconciler_concurrency: args
             .reconciler_concurrency
             .unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
+        priority_reconciler_concurrency: args
+            .priority_reconciler_concurrency
+            .unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT),
         split_threshold: args.split_threshold,
         neon_local_repo_dir: args.neon_local_repo_dir,
         max_secondary_lag_bytes: args.max_secondary_lag_bytes,

```

```diff

@@ -91,9 +91,10 @@ pub(crate) struct ReconcilerConfigBuilder {
 }
 
 impl ReconcilerConfigBuilder {
-    pub(crate) fn new() -> Self {
+    /// Priority is special: you must pick one thoughtfully, do not just use 'normal' as the default
+    pub(crate) fn new(priority: ReconcilerPriority) -> Self {
         Self {
-            config: ReconcilerConfig::default(),
+            config: ReconcilerConfig::new(priority),
         }
     }
 
@@ -129,8 +130,18 @@ impl ReconcilerConfigBuilder {
     }
 }
 
-#[derive(Default, Debug, Copy, Clone)]
+// Higher priorities are used for user-facing tasks, so that a long backlog of housekeeping work (e.g. reconciling on startup, rescheduling
+// things on node changes) does not starve user-facing tasks.
+#[derive(Debug, Copy, Clone)]
+pub(crate) enum ReconcilerPriority {
+    Normal,
+    High,
+}
+
+#[derive(Debug, Copy, Clone)]
 pub(crate) struct ReconcilerConfig {
+    pub(crate) priority: ReconcilerPriority,
+
     // During live migration give up on warming-up the secondary
     // after this timeout.
     secondary_warmup_timeout: Option<Duration>,
@@ -145,6 +156,18 @@ pub(crate) struct ReconcilerConfig {
 }
 
 impl ReconcilerConfig {
+    /// Configs are always constructed with an explicit priority, to force callers to think about whether
+    /// the operation they're scheduling is high-priority or not. Normal priority is not a safe default, because
+    /// scheduling something user-facing at normal priority can result in it getting starved out by background work.
+    pub(crate) fn new(priority: ReconcilerPriority) -> Self {
+        Self {
+            priority,
+            secondary_warmup_timeout: None,
+            secondary_download_request_timeout: None,
+            tenant_creation_hint: false,
+        }
+    }
+
     pub(crate) fn get_secondary_warmup_timeout(&self) -> Duration {
         const SECONDARY_WARMUP_TIMEOUT_DEFAULT: Duration = Duration::from_secs(300);
         self.secondary_warmup_timeout
@@ -164,7 +187,9 @@ impl ReconcilerConfig {
 
 impl From<&MigrationConfig> for ReconcilerConfig {
     fn from(value: &MigrationConfig) -> Self {
-        let mut builder = ReconcilerConfigBuilder::new();
+        // Run reconciler at high priority because MigrationConfig comes from human requests that should
+        // be presumed urgent.
+        let mut builder = ReconcilerConfigBuilder::new(ReconcilerPriority::High);
 
         if let Some(timeout) = value.secondary_warmup_timeout {
             builder = builder.secondary_warmup_timeout(timeout)

```

```diff

@@ -30,7 +30,10 @@ use crate::{
         AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
         ShardGenerationState, TenantFilter,
     },
-    reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
+    reconciler::{
+        ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder,
+        ReconcilerPriority,
+    },
     safekeeper::Safekeeper,
     scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
     tenant_shard::{
@@ -79,7 +82,7 @@ use pageserver_api::{
     },
 };
 use pageserver_client::{mgmt_api, BlockUnblock};
-use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::{mpsc::error::TrySendError, TryAcquireError};
 use tokio_util::sync::CancellationToken;
 use utils::{
     completion::Barrier,
@@ -195,6 +198,7 @@ pub(crate) enum LeadershipStatus {
 }
 
 pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
+pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
 
 // Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
 // This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
@@ -366,9 +370,12 @@ pub struct Config {
     /// and/or upon handling the re-attach request from a node.
     pub max_warming_up_interval: Duration,
 
-    /// How many Reconcilers may be spawned concurrently
+    /// How many normal-priority Reconcilers may be spawned concurrently
     pub reconciler_concurrency: usize,
 
+    /// How many high-priority Reconcilers may be spawned concurrently
+    pub priority_reconciler_concurrency: usize,
+
     /// How large must a shard grow in bytes before we split it?
     /// None disables auto-splitting.
     pub split_threshold: Option<u64>,
@@ -436,9 +443,14 @@ pub struct Service {
     // that transition it to/from Active.
     node_op_locks: IdLockMap<NodeId, NodeOperations>,
 
-    // Limit how many Reconcilers we will spawn concurrently
+    // Limit how many Reconcilers we will spawn concurrently for normal-priority tasks such as background reconciliations
+    // and reconciliation on startup.
     reconciler_concurrency: Arc<tokio::sync::Semaphore>,
 
+    // Limit how many Reconcilers we will spawn concurrently for high-priority tasks such as tenant/timeline CRUD, which
+    // a human user might be waiting for.
+    priority_reconciler_concurrency: Arc<tokio::sync::Semaphore>,
+
     /// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
     /// Send into this queue to promptly attempt to reconcile this shard next time units are available.
     ///
@@ -1263,12 +1275,15 @@ impl Service {
         }
 
         // Maybe some other work can proceed now that this job finished.
+        //
+        // Only bother with this if we have some semaphore units available in the normal-priority semaphore (these
+        // reconciles are scheduled at `[ReconcilerPriority::Normal]`).
         if self.reconciler_concurrency.available_permits() > 0 {
             while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
                 let (nodes, tenants, _scheduler) = locked.parts_mut();
                 if let Some(shard) = tenants.get_mut(&tenant_shard_id) {
                     shard.delayed_reconcile = false;
-                    self.maybe_reconcile_shard(shard, nodes);
+                    self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal);
                 }
 
                 if self.reconciler_concurrency.available_permits() == 0 {
@@ -1565,6 +1580,9 @@ impl Service {
             reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
                 config.reconciler_concurrency,
             )),
+            priority_reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
+                config.priority_reconciler_concurrency,
+            )),
             delayed_reconcile_tx,
             abort_tx,
             startup_complete: startup_complete.clone(),
@@ -2337,7 +2355,7 @@ impl Service {
         let waiters = {
             let mut locked = self.inner.write().unwrap();
             let (nodes, tenants, _scheduler) = locked.parts_mut();
-            let config = ReconcilerConfigBuilder::new()
+            let config = ReconcilerConfigBuilder::new(ReconcilerPriority::High)
                 .tenant_creation_hint(true)
                 .build();
             tenants
@@ -2812,7 +2830,8 @@ impl Service {
                     shard.schedule(scheduler, &mut schedule_context)?;
 
-                    let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
+                    let maybe_waiter =
+                        self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
                     if let Some(waiter) = maybe_waiter {
                         waiters.push(waiter);
                     }
@@ -2933,7 +2952,9 @@ impl Service {
             let (nodes, tenants, _scheduler) = locked.parts_mut();
             for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
                 shard.config = config.clone();
-                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+                if let Some(waiter) =
+                    self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
+                {
                     waiters.push(waiter);
                 }
             }
@@ -3215,7 +3236,9 @@ impl Service {
                 debug_assert!(shard.intent.get_attached().is_none());
                 debug_assert!(shard.intent.get_secondary().is_empty());
 
-                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+                if let Some(waiter) =
+                    self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
+                {
                     detach_waiters.push(waiter);
                 }
             }
@@ -3367,7 +3390,7 @@ impl Service {
                 // In case scheduling is being switched back on, try it now.
                 shard.schedule(scheduler, &mut schedule_context).ok();
 
-                self.maybe_reconcile_shard(shard, nodes);
+                self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
             }
 
         Ok(())
@@ -4416,7 +4439,7 @@ impl Service {
                 tracing::warn!("Failed to schedule {tenant_shard_id} during shard abort: {e}")
             }
 
-            self.maybe_reconcile_shard(shard, nodes);
+            self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
         }
 
         // We don't expect any new_shard_count shards to exist here, but drop them just in case
// We don't expect any new_shard_count shards to exist here, but drop them just in case
@@ -4582,7 +4605,11 @@ impl Service {
                     tracing::warn!("Failed to schedule child shard {child}: {e}");
                 }
 
                 // In the background, attach secondary locations for the new shards
-                if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes) {
+                if let Some(waiter) = self.maybe_reconcile_shard(
+                    &mut child_state,
+                    nodes,
+                    ReconcilerPriority::High,
+                ) {
                     waiters.push(waiter);
                 }
@@ -4947,7 +4974,9 @@ impl Service {
                 shard.intent.clear_secondary(scheduler);
 
                 // Run Reconciler to execute detach of secondary locations.
-                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+                if let Some(waiter) =
+                    self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
+                {
                     waiters.push(waiter);
                 }
             }
@@ -5215,7 +5244,7 @@ impl Service {
             let reconciler_config = match migrate_req.migration_config {
                 Some(cfg) => (&cfg).into(),
-                None => ReconcilerConfig::default(),
+                None => ReconcilerConfig::new(ReconcilerPriority::High),
             };
 
             self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config)
@@ -5281,7 +5310,7 @@ impl Service {
                 );
             }
 
-            self.maybe_reconcile_shard(shard, nodes)
+            self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
         };
 
         if let Some(waiter) = waiter {
@@ -5693,7 +5722,7 @@ impl Service {
                 )
             }
 
-            self.maybe_reconcile_shard(shard, nodes);
+            self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal);
         }
 
         // Here we remove an existing observed location for the node we're removing, and it will
@@ -6062,7 +6091,14 @@ impl Service {
                         tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
                     }
                     Ok(()) => {
-                        if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
+                        if self
+                            .maybe_reconcile_shard(
+                                tenant_shard,
+                                nodes,
+                                ReconcilerPriority::Normal,
+                            )
+                            .is_some()
+                        {
                             tenants_affected += 1;
                         };
                     }
@@ -6093,7 +6129,11 @@ impl Service {
             if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
                 if observed_loc.conf.is_none() {
-                    self.maybe_reconcile_shard(tenant_shard, nodes);
+                    self.maybe_reconcile_shard(
+                        tenant_shard,
+                        nodes,
+                        ReconcilerPriority::Normal,
+                    );
                 }
             }
         }
@@ -6457,8 +6497,36 @@ impl Service {
         &self,
         shard: &mut TenantShard,
         nodes: &Arc<HashMap<NodeId, Node>>,
+        priority: ReconcilerPriority,
     ) -> Option<ReconcilerWaiter> {
-        self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::default())
+        self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::new(priority))
+    }
+
+    /// Before constructing a Reconciler, acquire semaphore units from the appropriate concurrency limit (depends on priority)
+    fn get_reconciler_units(
+        &self,
+        priority: ReconcilerPriority,
+    ) -> Result<ReconcileUnits, TryAcquireError> {
+        let units = match priority {
+            ReconcilerPriority::Normal => self.reconciler_concurrency.clone().try_acquire_owned(),
+            ReconcilerPriority::High => {
+                match self
+                    .priority_reconciler_concurrency
+                    .clone()
+                    .try_acquire_owned()
+                {
+                    Ok(u) => Ok(u),
+                    Err(TryAcquireError::NoPermits) => {
+                        // If the high priority semaphore is exhausted, then high priority tasks may steal units from
+                        // the normal priority semaphore.
+                        self.reconciler_concurrency.clone().try_acquire_owned()
+                    }
+                    Err(e) => Err(e),
+                }
+            }
+        };
+
+        units.map(ReconcileUnits::new)
     }
 
     /// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
@@ -6478,8 +6546,8 @@ impl Service {
             }
         };
 
-        let units = match self.reconciler_concurrency.clone().try_acquire_owned() {
-            Ok(u) => ReconcileUnits::new(u),
+        let units = match self.get_reconciler_units(reconciler_config.priority) {
+            Ok(u) => u,
             Err(_) => {
                 tracing::info!(tenant_id=%shard.tenant_shard_id.tenant_id, shard_id=%shard.tenant_shard_id.shard_slug(),
                                 "Concurrency limited: enqueued for reconcile later");
@@ -6572,7 +6640,10 @@ impl Service {
             // Eventual consistency: if an earlier reconcile job failed, and the shard is still
            // dirty, spawn another one
-            if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
+            if self
+                .maybe_reconcile_shard(shard, &pageservers, ReconcilerPriority::Normal)
+                .is_some()
+            {
                 reconciles_spawned += 1;
             } else if shard.delayed_reconcile {
                 // Shard wanted to reconcile but for some reason couldn't.
@@ -6658,7 +6729,10 @@ impl Service {
                     tracing::info!(tenant_shard_id=%tenant_shard_id, "Applying optimization: {optimization:?}");
                     if shard.apply_optimization(scheduler, optimization) {
                         optimizations_applied += 1;
-                        if self.maybe_reconcile_shard(shard, nodes).is_some() {
+                        if self
+                            .maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal)
+                            .is_some()
+                        {
                             reconciles_spawned += 1;
                         }
                     }
@@ -7208,7 +7282,7 @@ impl Service {
         // to not stall the operation when a cold secondary is encountered.
         const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
         const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
-        let reconciler_config = ReconcilerConfigBuilder::new()
+        let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
            .secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
            .secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
            .build();
@@ -7541,7 +7615,7 @@ impl Service {
     ) -> Result<(), OperationError> {
         const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
         const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
-        let reconciler_config = ReconcilerConfigBuilder::new()
+        let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
            .secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
            .secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
            .build();

```

```diff

@@ -88,7 +88,11 @@ impl ChaosInjector {
             shard.intent.demote_attached(scheduler, old_location);
             shard.intent.promote_attached(scheduler, new_location);
 
-            self.service.maybe_reconcile_shard(shard, nodes);
+            self.service.maybe_reconcile_shard(
+                shard,
+                nodes,
+                crate::reconciler::ReconcilerPriority::Normal,
+            );
         }
     }
 
     async fn inject_chaos(&mut self) {
```