controller: limit Reconciler concurrency (#7493)

## Problem

Storage controller memory can spike very high if we have many tenants
and they all try to reconcile at the same time.

Related:
- https://github.com/neondatabase/neon/issues/7463
- https://github.com/neondatabase/neon/issues/7460

Not closing those issues in this PR, because the test coverage for them
will be in https://github.com/neondatabase/neon/pull/7475

## Summary of changes

- Add a CLI arg `--reconciler-concurrency`, defaulting to 128
- Add a semaphore with this many units to `Service`
- In `maybe_reconcile_shard`, try to acquire a semaphore unit. If we can't
get one, return a `ReconcilerWaiter` for a future sequence number, and push
the `TenantShardId` onto a channel of delayed IDs (see the sketch after this
list).
- In `process_result`, consume from the channel of delayed IDs while there
are semaphore units available, and call `maybe_reconcile_shard` again for
these delayed shards.
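
Schematically, the mechanism looks like the minimal sketch below. It uses the same tokio primitives as the change itself (a `Semaphore` and a bounded `mpsc` channel), but `ShardId`, the queue sizes, and the wiring are illustrative stand-ins rather than the controller's real types:

```rust
use std::sync::Arc;

use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, Semaphore};

#[derive(Debug, Clone, Copy)]
struct ShardId(u32); // stand-in for TenantShardId

#[tokio::main]
async fn main() {
    let concurrency = Arc::new(Semaphore::new(2)); // cf. --reconciler-concurrency
    let (delayed_tx, mut delayed_rx) = mpsc::channel::<ShardId>(16); // cf. MAX_DELAYED_RECONCILES

    // cf. maybe_reconcile_shard: spawn if a unit is free, otherwise enqueue.
    for n in 0..4u32 {
        let shard = ShardId(n);
        match concurrency.clone().try_acquire_owned() {
            Ok(permit) => {
                tokio::spawn(async move {
                    let _units = permit; // held (RAII) until the reconcile finishes
                    // ... reconcile I/O would happen here ...
                });
            }
            Err(_) => match delayed_tx.try_send(shard) {
                Ok(()) => {} // retried when a running job completes
                Err(TrySendError::Full(_)) => {
                    // Safe to drop: the background reconcile loop retries dirty shards.
                    eprintln!("delayed_reconcile queue is full");
                }
                Err(TrySendError::Closed(_)) => {} // shutting down
            },
        }
    }

    // cf. process_result: after a Reconciler completes and releases its units,
    // drain delayed shards while units remain.
    while concurrency.available_permits() > 0 {
        let Ok(shard) = delayed_rx.try_recv() else { break };
        println!("re-attempting reconcile for {shard:?}");
    }
}
```

Holding the permit as an RAII guard for the lifetime of the spawned job is what actually bounds memory; the real change wraps it in `ReconcileUnits` and arranges for it to drop before the result is processed, so freed units are visible when `process_result` drains the queue.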

This has been tested in https://github.com/neondatabase/neon/pull/7475,
but that PR will land separately because it contains other changes and
its test needs stabilizing. This change is worth merging sooner because
it fixes a practical issue with larger shard counts.
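
Regarding the "waiter for a future sequence number" mentioned in the summary: the idea is to bump the shard's sequence strictly past anything a running or completed Reconciler has seen, so that a waiter on that sequence can only be satisfied by a Reconciler spawned later. A toy sketch of the invariant, with hypothetical `running`/`completed` fields standing in for the real `SeqWait` state:

```rust
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Sequence(u64);

struct Shard {
    sequence: Sequence,        // version of the intent we want reconciled
    running: Option<Sequence>, // sequence of an in-flight Reconciler, if any
    completed: Sequence,       // highest sequence a Reconciler has finished with
}

impl Shard {
    // cf. ensure_sequence_ahead(): afterwards, a waiter on `self.sequence`
    // can only complete once some future Reconciler is spawned and runs.
    fn ensure_sequence_ahead(&mut self) {
        let max_seen = self.running.unwrap_or(self.completed).max(self.completed);
        if self.sequence <= max_seen {
            self.sequence = Sequence(max_seen.0 + 1);
        }
    }
}

fn main() {
    let mut shard = Shard {
        sequence: Sequence(5),
        running: Some(Sequence(5)),
        completed: Sequence(4),
    };
    shard.ensure_sequence_ahead();
    assert_eq!(shard.sequence, Sequence(6)); // strictly ahead of the in-flight run
}
```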
John Spray, 2024-04-25 10:46:07 +01:00 (committed by GitHub)
parent c18d3340b5 · commit e8814b6f81
4 changed files with 238 additions and 47 deletions


@@ -9,7 +9,9 @@ use std::time::Duration;
use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
-use storage_controller::service::{Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT};
+use storage_controller::service::{
+    Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
+};
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
use utils::auth::{JwtAuth, SwappableJwtAuth};
@@ -64,6 +66,10 @@ struct Cli {
/// Grace period before marking unresponsive pageserver offline
#[arg(long)]
max_unavailable_interval: Option<humantime::Duration>,
/// Maximum number of reconcilers that may run in parallel
#[arg(long)]
reconciler_concurrency: Option<usize>,
}
enum StrictMode {
@@ -243,6 +249,9 @@ async fn async_main() -> anyhow::Result<()> {
.max_unavailable_interval
.map(humantime::Duration::into)
.unwrap_or(MAX_UNAVAILABLE_INTERVAL_DEFAULT),
reconciler_concurrency: args
.reconciler_concurrency
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
};
// After loading secrets & config, but before starting anything else, apply database migrations


@@ -51,6 +51,10 @@ pub(super) struct Reconciler {
/// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
pub(crate) compute_notify_failure: bool,
/// The Reconciler is responsible for keeping alive the semaphore units that limit how many
/// Reconcilers we will spawn concurrently.
pub(crate) _resource_units: ReconcileUnits,
/// A means to abort background reconciliation: it is essential to
/// call this when something changes in the original TenantShard that
/// will make this reconciliation impossible or unnecessary, for
@@ -66,6 +70,19 @@ pub(super) struct Reconciler {
pub(crate) persistence: Arc<Persistence>,
}
/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
pub(crate) struct ReconcileUnits {
_sem_units: tokio::sync::OwnedSemaphorePermit,
}
impl ReconcileUnits {
pub(crate) fn new(sem_units: tokio::sync::OwnedSemaphorePermit) -> Self {
Self {
_sem_units: sem_units,
}
}
}
/// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
/// reference counting for Scheduler. The IntentState is what the scheduler works with,
/// and the TargetState is just the instruction for a particular Reconciler run.


@@ -10,8 +10,9 @@ use std::{
use crate::{
id_lock_map::IdLockMap,
persistence::{AbortShardSplitStatus, TenantFilter},
-reconciler::ReconcileError,
+reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
tenant_shard::ReconcileNeeded,
};
use anyhow::Context;
use control_plane::storage_controller::{
@@ -48,7 +49,7 @@ use pageserver_api::{
},
};
use pageserver_client::mgmt_api;
-use tokio::sync::OwnedRwLockWriteGuard;
+use tokio::sync::{mpsc::error::TrySendError, OwnedRwLockWriteGuard};
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{
@@ -90,6 +91,13 @@ pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
// This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
// than they're being pushed onto the queue.
const MAX_DELAYED_RECONCILES: usize = 10000;
// Top level state available to all HTTP handlers
struct ServiceState {
tenants: BTreeMap<TenantShardId, TenantShard>,
@@ -97,6 +105,9 @@ struct ServiceState {
nodes: Arc<HashMap<NodeId, Node>>,
scheduler: Scheduler,
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
}
impl ServiceState {
@@ -104,11 +115,13 @@ impl ServiceState {
nodes: HashMap<NodeId, Node>,
tenants: BTreeMap<TenantShardId, TenantShard>,
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
) -> Self {
Self {
tenants,
nodes: Arc::new(nodes),
scheduler,
delayed_reconcile_rx,
}
}
@@ -142,6 +155,9 @@ pub struct Config {
/// considered active. Once the grace period elapses, the next heartbeat failure will
/// mark the pageserver offline.
pub max_unavailable_interval: Duration,
/// How many Reconcilers may be spawned concurrently
pub reconciler_concurrency: usize,
}
impl From<DatabaseError> for ApiError {
@@ -180,6 +196,17 @@ pub struct Service {
// that transition it to/from Active.
node_op_locks: IdLockMap<NodeId>,
// Limit how many Reconcilers we will spawn concurrently
reconciler_concurrency: Arc<tokio::sync::Semaphore>,
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
///
/// Note that this state logically lives inside ServiceInner, but carrying Sender here makes the code simpler
/// by avoiding needing a &mut ref to something inside the ServiceInner. This could be optimized to
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
// Process shutdown will fire this token
cancel: CancellationToken,
@@ -742,8 +769,9 @@ impl Service {
}
/// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation
-/// was successful, this will update the observed state of the tenant such that subsequent
-/// calls to [`TenantShard::maybe_reconcile`] will do nothing.
+/// was successful and intent hasn't changed since the Reconciler was spawned, this will update
+/// the observed state of the tenant such that subsequent calls to [`TenantShard::get_reconcile_needed`]
+/// will indicate that reconciliation is not needed.
#[instrument(skip_all, fields(
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
sequence=%result.sequence
@@ -804,6 +832,21 @@ impl Service {
}
}
}
// Maybe some other work can proceed now that this job finished.
if self.reconciler_concurrency.available_permits() > 0 {
while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
let (nodes, tenants, _scheduler) = locked.parts_mut();
if let Some(shard) = tenants.get_mut(&tenant_shard_id) {
shard.delayed_reconcile = false;
self.maybe_reconcile_shard(shard, nodes);
}
if self.reconciler_concurrency.available_permits() == 0 {
break;
}
}
}
}
async fn process_results(
@@ -986,6 +1029,9 @@ impl Service {
let (startup_completion, startup_complete) = utils::completion::channel();
let (delayed_reconcile_tx, delayed_reconcile_rx) =
tokio::sync::mpsc::channel(MAX_DELAYED_RECONCILES);
let cancel = CancellationToken::new();
let heartbeater = Heartbeater::new(
config.jwt_token.clone(),
@@ -994,13 +1040,20 @@ impl Service {
);
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
-nodes, tenants, scheduler,
+nodes,
+tenants,
+scheduler,
+delayed_reconcile_rx,
))),
config: config.clone(),
persistence,
-compute_hook: Arc::new(ComputeHook::new(config)),
+compute_hook: Arc::new(ComputeHook::new(config.clone())),
result_tx,
heartbeater,
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.reconciler_concurrency,
)),
delayed_reconcile_tx,
abort_tx,
startup_complete: startup_complete.clone(),
cancel,
@@ -1535,7 +1588,7 @@ impl Service {
let (response, waiters) = self.do_tenant_create(create_req).await?;
-if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
+if let Err(e) = self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
// Avoid deadlock: reconcile may fail while notifying compute, if the cloud control plane refuses to
// accept compute notifications while it is in the process of creating. Reconciliation will
// be retried in the background.
@@ -4053,20 +4106,64 @@ impl Service {
Ok(())
}
-/// Convenience wrapper around [`TenantShard::maybe_reconcile`] that provides
-/// all the references to parts of Self that are needed
+/// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
fn maybe_reconcile_shard(
&self,
shard: &mut TenantShard,
nodes: &Arc<HashMap<NodeId, Node>>,
) -> Option<ReconcilerWaiter> {
-shard.maybe_reconcile(
+let reconcile_needed = shard.get_reconcile_needed(nodes);
match reconcile_needed {
ReconcileNeeded::No => return None,
ReconcileNeeded::WaitExisting(waiter) => return Some(waiter),
ReconcileNeeded::Yes => {
// Fall through to try and acquire units for spawning reconciler
}
};
let units = match self.reconciler_concurrency.clone().try_acquire_owned() {
Ok(u) => ReconcileUnits::new(u),
Err(_) => {
tracing::info!(tenant_id=%shard.tenant_shard_id.tenant_id, shard_id=%shard.tenant_shard_id.shard_slug(),
"Concurrency limited: enqueued for reconcile later");
if !shard.delayed_reconcile {
match self.delayed_reconcile_tx.try_send(shard.tenant_shard_id) {
Err(TrySendError::Closed(_)) => {
// Weird mid-shutdown case?
}
Err(TrySendError::Full(_)) => {
// It is safe to skip sending our ID in the channel: we will eventually get retried by the background reconcile task.
tracing::warn!(
"Many shards are waiting to reconcile: delayed_reconcile queue is full"
);
}
Ok(()) => {
shard.delayed_reconcile = true;
}
}
}
// We won't spawn a reconciler, but we will construct a waiter that waits for the shard's sequence
// number to advance. When this function is eventually called again and succeeds in getting units,
// it will spawn a reconciler that makes this waiter complete.
return Some(shard.future_reconcile_waiter());
}
};
let Ok(gate_guard) = self.gate.enter() else {
// Gate closed: we're shutting down, drop out.
return None;
};
shard.spawn_reconciler(
&self.result_tx,
nodes,
&self.compute_hook,
&self.config,
&self.persistence,
-&self.gate,
+units,
+gate_guard,
&self.cancel,
)
}
@@ -4088,6 +4185,11 @@ impl Service {
schedule_context = ScheduleContext::default();
}
// Skip checking if this shard is already enqueued for reconciliation
if shard.delayed_reconcile {
continue;
}
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another one
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {


@@ -7,6 +7,7 @@ use std::{
use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
reconciler::ReconcileUnits,
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
};
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
@@ -22,7 +23,7 @@ use utils::{
generation::Generation,
id::NodeId,
seqwait::{SeqWait, SeqWaitError},
-sync::gate::Gate,
+sync::gate::GateGuard,
};
use crate::{
@@ -95,6 +96,10 @@ pub(crate) struct TenantShard {
/// reconciliation, and timeline creation.
pub(crate) splitting: SplitState,
/// If a tenant was enqueued for later reconcile due to hitting the concurrency limit, this flag
/// is set. This flag is cleared when the tenant is popped off the delay queue.
pub(crate) delayed_reconcile: bool,
/// Optionally wait for reconciliation to complete up to a particular
/// sequence number.
#[serde(skip)]
@@ -113,8 +118,8 @@ pub(crate) struct TenantShard {
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<String>>,
/// If we have a pending compute notification that for some reason we weren't able to send,
-/// set this to true. If this is set, calls to [`Self::maybe_reconcile`] will run a task to retry
-/// sending it. This is the mechanism by which compute notifications are included in the scope
+/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
+/// and trigger a Reconciler run. This is the mechanism by which compute notifications are included in the scope
/// of state that we publish externally in an eventually consistent way.
pub(crate) pending_compute_notification: bool,
@@ -353,6 +358,17 @@ pub(crate) struct ReconcilerHandle {
cancel: CancellationToken,
}
pub(crate) enum ReconcileNeeded {
/// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
/// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
No,
/// shard has a reconciler running, and its intent hasn't changed since that one was
/// spawned: wait for the existing reconciler rather than spawning a new one.
WaitExisting(ReconcilerWaiter),
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
Yes,
}
/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
@@ -396,6 +412,7 @@ impl TenantShard {
reconciler: None,
splitting: SplitState::Idle,
sequence: Sequence(1),
delayed_reconcile: false,
waiter: Arc::new(SeqWait::new(Sequence(0))),
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
last_error: Arc::default(),
@@ -831,16 +848,10 @@ impl TenantShard {
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
-pub(crate) fn maybe_reconcile(
+pub(crate) fn get_reconcile_needed(
&mut self,
-result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
pageservers: &Arc<HashMap<NodeId, Node>>,
-compute_hook: &Arc<ComputeHook>,
-service_config: &service::Config,
-persistence: &Arc<Persistence>,
-gate: &Gate,
-cancel: &CancellationToken,
-) -> Option<ReconcilerWaiter> {
+) -> ReconcileNeeded {
// If there are any ambiguous observed states, and the nodes they refer to are available,
// we should reconcile to clean them up.
let mut dirty_observed = false;
@@ -863,7 +874,7 @@ impl TenantShard {
if !do_reconcile {
tracing::info!("Not dirty, no reconciliation needed.");
-return None;
+return ReconcileNeeded::No;
}
// If we are currently splitting, then never start a reconciler task: the splitting logic
@@ -871,7 +882,7 @@ impl TenantShard {
// up top, so that we only log this message if we would otherwise have done a reconciliation.
if !matches!(self.splitting, SplitState::Idle) {
tracing::info!("Refusing to reconcile, splitting in progress");
-return None;
+return ReconcileNeeded::No;
}
// Reconcile already in flight for the current sequence?
@@ -881,7 +892,7 @@ impl TenantShard {
"Reconciliation already in progress for sequence {:?}",
self.sequence,
);
-return Some(ReconcilerWaiter {
+return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
error_seq_wait: self.error_waiter.clone(),
@@ -900,10 +911,67 @@ impl TenantShard {
// We only reach this point if there is work to do and we're going to skip
// doing it: warn, to make it obvious why this tenant isn't doing what it ought to.
tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
-return None;
+return ReconcileNeeded::No;
}
}
ReconcileNeeded::Yes
}
/// Ensure the sequence number is set to a value where waiting for this value will make us wait
/// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
///
/// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
/// that the waiter will not complete until some future Reconciler is constructed and run.
fn ensure_sequence_ahead(&mut self) {
// Find the highest sequence for which a Reconciler has previously run or is currently
// running
let max_seen = std::cmp::max(
self.reconciler
.as_ref()
.map(|r| r.sequence)
.unwrap_or(Sequence(0)),
std::cmp::max(self.waiter.load(), self.error_waiter.load()),
);
if self.sequence <= max_seen {
self.sequence = max_seen.next();
}
}
/// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
///
/// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
/// you would like to wait until one gets spawned in the background.
pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
self.ensure_sequence_ahead();
ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
error_seq_wait: self.error_waiter.clone(),
error: self.last_error.clone(),
seq: self.sequence,
}
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn spawn_reconciler(
&mut self,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
service_config: &service::Config,
persistence: &Arc<Persistence>,
units: ReconcileUnits,
gate_guard: GateGuard,
cancel: &CancellationToken,
) -> Option<ReconcilerWaiter> {
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
// doing our sequence's work.
let old_handle = self.reconciler.take();
// Build list of nodes from which the reconciler should detach
let mut detach = Vec::new();
for node_id in self.observed.locations.keys() {
@@ -919,18 +987,9 @@ impl TenantShard {
}
}
-// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
-// doing our sequence's work.
-let old_handle = self.reconciler.take();
-let Ok(gate_guard) = gate.enter() else {
-// Shutting down, don't start a reconciler
-return None;
-};
// Advance the sequence before spawning a reconciler, so that sequence waiters
// can distinguish between before+after the reconcile completes.
-self.sequence = self.sequence.next();
+self.ensure_sequence_ahead();
let reconciler_cancel = cancel.child_token();
let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
@@ -945,6 +1004,7 @@ impl TenantShard {
compute_hook: compute_hook.clone(),
service_config: service_config.clone(),
_gate_guard: gate_guard,
_resource_units: units,
cancel: reconciler_cancel.clone(),
persistence: persistence.clone(),
compute_notify_failure: false,
@@ -1011,16 +1071,18 @@ impl TenantShard {
status: outcome_label,
});
-result_tx
-.send(ReconcileResult {
-sequence: reconcile_seq,
-result,
-tenant_shard_id: reconciler.tenant_shard_id,
-generation: reconciler.generation,
-observed: reconciler.observed,
-pending_compute_notification: reconciler.compute_notify_failure,
-})
-.ok();
+// Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
+// try and schedule more work in response to our result.
+let result = ReconcileResult {
+sequence: reconcile_seq,
+result,
+tenant_shard_id: reconciler.tenant_shard_id,
+generation: reconciler.generation,
+observed: reconciler.observed,
+pending_compute_notification: reconciler.compute_notify_failure,
+};
+result_tx.send(result).ok();
}
.instrument(reconciler_span),
);
@@ -1111,6 +1173,7 @@ impl TenantShard {
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
})
}