storage controller: send startup compute notifications in background (#7495)

## Problem

Previously, we tried to send compute notifications in `startup_reconcile`
before completing that function, subject to a time limit. Any notifications
that did not complete within the time limit resulted in tenants having their
`pending_compute_notification` flag set, which causes them to spawn a
Reconciler the next time the background reconciler loop runs.

This caused two problems:
- Spawning a lot of reconcilers after startup caused a spike in memory usage
(this is addressed in https://github.com/neondatabase/neon/pull/7493)
- After https://github.com/neondatabase/neon/pull/7493, spawning lots of
reconcilers can block other operations: for example, a tenant creation
might fail for lack of reconciler semaphore units while the
controller is busy running all the Reconcilers for its startup compute
notifications.

When the code was first written, ComputeHook didn't have internal
ordering logic to ensure that notifications for a shard were sent in the
right order. Since that logic was added in
https://github.com/neondatabase/neon/pull/7088, we can rely on it and avoid
waiting for notifications to complete in `startup_reconcile`.
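
As an aside, here is a minimal, self-contained sketch of that ordering idea. It is not the actual ComputeHook code: the `Notification`/`TenantHookState` types and the `prepare`/`execute` names are invented for illustration (the real code uses `notify_prepare`, `notify_execute` and `MaybeSendResult`, shown in the diff below). The point is that each tenant keeps only its latest intended notification plus the generation last delivered, and a per-tenant async lock serializes sends, so a background sender that runs late can never overwrite a newer notification with a stale one.

```rust
use std::sync::{Arc, Mutex};

use tokio::sync::Mutex as AsyncMutex;

// Hypothetical stand-in for the per-shard attachment info sent to the control plane.
#[derive(Clone, Debug)]
struct Notification {
    generation: u64,
    pageserver: String,
}

#[derive(Default)]
struct TenantHookState {
    // Latest intended notification; updated synchronously by callers.
    latest: Mutex<Option<Notification>>,
    // Serializes sends and remembers the generation delivered so far.
    send_lock: AsyncMutex<u64>,
}

impl TenantHookState {
    // Synchronous phase: record the newest intent; cheap and never does I/O.
    fn prepare(&self, n: Notification) {
        *self.latest.lock().unwrap() = Some(n);
    }

    // Async phase: deliver whatever is newest. A sender that wakes up late sees
    // that a newer generation was already delivered and does nothing, so a
    // stale value can never overwrite a fresher one.
    async fn execute(&self) {
        let mut delivered = self.send_lock.lock().await;
        let snapshot = self.latest.lock().unwrap().clone();
        if let Some(n) = snapshot {
            if n.generation > *delivered {
                println!("sending notification: {n:?}"); // stand-in for the HTTP call
                *delivered = n.generation;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(TenantHookState::default());
    state.prepare(Notification { generation: 1, pageserver: "ps-1".into() });
    state.prepare(Notification { generation: 2, pageserver: "ps-2".into() });
    // Two concurrent senders: only the newest state (generation 2) is sent, once.
    tokio::join!(state.execute(), state.execute());
}
```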

Related to: https://github.com/neondatabase/neon/issues/7460

## Summary of changes

- Add a `notify_background` method to ComputeHook.
- Call this from `startup_reconcile` instead of sending notifications inline.
- Process completions from `notify_background` in `process_results`, and
if a notification failed, set the `pending_compute_notification`
flag on the shard so it is retried later (see the sketch after this list).
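
The sketch below is a simplified, self-contained illustration of this flow, using stand-in types (`ShardId`, `try_notify`) rather than the controller's real `TenantShardId`, `NotifyError` and ComputeHook internals: a fire-and-forget background task sends notifications with bounded parallelism and reports per-shard failures over an mpsc channel, while the consumer merely records which shards need a retry, mirroring what `notify_background` and the new arm in `process_results` do in the diff below.

```rust
use futures::StreamExt;
use tokio::sync::mpsc;

type ShardId = u32; // stand-in for TenantShardId

// Stand-in for ComputeHook::notify_execute: pretend odd-numbered shards fail.
async fn try_notify(shard: ShardId) -> Result<(), String> {
    if shard % 2 == 0 {
        Ok(())
    } else {
        Err("control plane unavailable".into())
    }
}

#[tokio::main]
async fn main() {
    // Bounded channel, continuously drained by the consumer (cf. the 512-slot
    // channel created in the service startup code below).
    let (result_tx, mut result_rx) = mpsc::channel::<Result<(), (ShardId, String)>>(512);

    // Producer: analogous to ComputeHook::notify_background, fire-and-forget.
    let shards: Vec<ShardId> = (0..8).collect();
    tokio::spawn(async move {
        let mut stream = futures::stream::iter(shards)
            .map(|shard| async move { try_notify(shard).await.map_err(|e| (shard, e)) })
            .buffered(4); // bounded parallelism, like API_CONCURRENCY
        while let Some(r) = stream.next().await {
            let _ = result_tx.send(r).await;
        }
        // Dropping result_tx closes the channel and ends the consumer loop.
    });

    // Consumer: analogous to the bg_compute_hook_result_rx arm of process_results.
    let mut pending_retry = Vec::new();
    while let Some(r) = result_rx.recv().await {
        if let Err((shard, e)) = r {
            // Equivalent of setting `pending_compute_notification = true`.
            eprintln!("shard {shard}: notify failed ({e}); will retry later");
            pending_retry.push(shard);
        }
    }
    println!("shards needing retry: {pending_retry:?}");
}
```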

The result is that we will only spawn lots of Reconcilers if the compute
notifications _fail_, not merely because they take a significant amount
of time.

Test coverage for this case is in
https://github.com/neondatabase/neon/pull/7475.

Commit 84914434e3 (parent b655c7030f) by John Spray, committed via GitHub
on 2024-04-29 09:59:22 +01:00. 2 changed files with 183 additions and 133 deletions.

Changed file 1 of 2:

@@ -3,11 +3,13 @@ use std::{collections::HashMap, time::Duration};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::{Method, StatusCode};
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use utils::{
backoff::{self},
id::{NodeId, TenantId},
@@ -420,48 +422,37 @@ impl ComputeHook {
.and_then(|x| x)
}
/// Call this to notify the compute (postgres) tier of new pageservers to use
/// for a tenant. notify() is called by each shard individually, and this function
/// will decide whether an update to the tenant is sent. An update is sent on the
/// condition that:
/// - We know a pageserver for every shard.
/// - All the shards have the same shard_count (i.e. we are not mid-split)
///
/// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
/// that is cancelled.
///
/// This function is fallible, including in the case that the control plane is transiently
/// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify(
/// Synchronous phase: update the per-tenant state for the next intended notification
fn notify_prepare(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
) -> MaybeSendResult {
let mut state_locked = self.state.lock().unwrap();
use std::collections::hash_map::Entry;
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
};
tenant.maybe_send(tenant_shard_id.tenant_id, None)
}
async fn notify_execute(
&self,
maybe_send_result: MaybeSendResult,
tenant_shard_id: TenantShardId,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let maybe_send_result = {
let mut state_locked = self.state.lock().unwrap();
use std::collections::hash_map::Entry;
let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
};
tenant.maybe_send(tenant_shard_id.tenant_id, None)
};
// Process result: we may get an update to send, or we may have to wait for a lock
// before trying again.
let (request, mut send_lock_guard) = match maybe_send_result {
@@ -469,7 +460,12 @@ impl ComputeHook {
return Ok(());
}
MaybeSendResult::AwaitLock(send_lock) => {
let send_locked = send_lock.lock_owned().await;
let send_locked = tokio::select! {
guard = send_lock.lock_owned() => {guard},
_ = cancel.cancelled() => {
return Err(NotifyError::ShuttingDown)
}
};
// Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
// we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
@@ -508,6 +504,94 @@ impl ComputeHook {
}
result
}
/// Infallible synchronous fire-and-forget version of notify(), that sends its results to
/// a channel. Something should consume the channel and arrange to try notifying again
/// if something failed.
pub(super) fn notify_background(
self: &Arc<Self>,
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
cancel: &CancellationToken,
) {
let mut maybe_sends = Vec::new();
for (tenant_shard_id, node_id, stripe_size) in notifications {
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
maybe_sends.push((tenant_shard_id, maybe_send_result))
}
let this = self.clone();
let cancel = cancel.clone();
tokio::task::spawn(async move {
// Construct an async stream of futures to invoke the compute notify function: we do this
// in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
// ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
// would mostly just be waiting on that semaphore.
let mut stream = futures::stream::iter(maybe_sends)
.map(|(tenant_shard_id, maybe_send_result)| {
let this = this.clone();
let cancel = cancel.clone();
async move {
this
.notify_execute(maybe_send_result, tenant_shard_id, &cancel)
.await.map_err(|e| (tenant_shard_id, e))
}.instrument(info_span!(
"notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
))
})
.buffered(API_CONCURRENCY);
loop {
tokio::select! {
next = stream.next() => {
match next {
Some(r) => {
result_tx.send(r).await.ok();
},
None => {
tracing::info!("Finished sending background compute notifications");
break;
}
}
},
_ = cancel.cancelled() => {
tracing::info!("Shutdown while running background compute notifications");
break;
}
};
}
});
}
/// Call this to notify the compute (postgres) tier of new pageservers to use
/// for a tenant. notify() is called by each shard individually, and this function
/// will decide whether an update to the tenant is sent. An update is sent on the
/// condition that:
/// - We know a pageserver for every shard.
/// - All the shards have the same shard_count (i.e. we are not mid-split)
///
/// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
/// that is cancelled.
///
/// This function is fallible, including in the case that the control plane is transiently
/// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify(
&self,
tenant_shard_id: TenantShardId,
node_id: NodeId,
stripe_size: ShardStripeSize,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
.await
}
}
#[cfg(test)]

Changed file 2 of 2:

@@ -8,6 +8,7 @@ use std::{
};
use crate::{
compute_hook::NotifyError,
id_lock_map::IdLockMap,
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
@@ -61,7 +62,7 @@ use utils::{
};
use crate::{
compute_hook::{self, ComputeHook},
compute_hook::ComputeHook,
heartbeater::{Heartbeater, PageserverState},
node::{AvailabilityTransition, Node},
persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
@@ -332,7 +333,12 @@ impl Service {
/// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
/// view of the world, and determine which pageservers are responsive.
#[instrument(skip_all)]
async fn startup_reconcile(self: &Arc<Service>) {
async fn startup_reconcile(
self: &Arc<Service>,
bg_compute_notify_result_tx: tokio::sync::mpsc::Sender<
Result<(), (TenantShardId, NotifyError)>,
>,
) {
// For all tenant shards, a vector of observed states on nodes (where None means
// indeterminate, same as in [`ObservedStateLocation`])
let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
@@ -351,10 +357,6 @@ impl Service {
.checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
.expect("Reconcile timeout is a modest constant");
let compute_notify_deadline = start_at
.checked_add((STARTUP_RECONCILE_TIMEOUT / 4) * 3)
.expect("Reconcile timeout is a modest constant");
// Accumulate a list of any tenant locations that ought to be detached
let mut cleanup = Vec::new();
@@ -380,6 +382,7 @@ impl Service {
let mut compute_notifications = Vec::new();
// Populate intent and observed states for all tenants, based on reported state on pageservers
tracing::info!("Populating tenant shards' states from initial pageserver scan...");
let shard_count = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
@@ -446,28 +449,27 @@ impl Service {
// Emit compute hook notifications for all tenants which are already stably attached. Other tenants
// will emit compute hook notifications when they reconcile.
//
// Ordering: we must complete these notification attempts before doing any other reconciliation for the
// tenants named here, because otherwise our calls to notify() might race with more recent values
// generated by reconciliation.
let notify_failures = self
.compute_notify_many(compute_notifications, compute_notify_deadline)
.await;
// Compute notify is fallible. If it fails here, do not delay overall startup: set the
// flag on these shards that they have a pending notification.
// Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
{
let mut locked = self.inner.write().unwrap();
for tenant_shard_id in notify_failures.into_iter() {
if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
shard.pending_compute_notification = true;
}
}
}
// Ordering: our calls to notify_background synchronously establish a relative order for these notifications vs. any later
// calls into the ComputeHook for the same tenant: we can leave these to run to completion in the background and any later
// calls will be correctly ordered wrt these.
//
// Concurrency: we call notify_background for all tenants, which will create O(N) tokio tasks, but almost all of them
// will just wait on the ComputeHook::API_CONCURRENCY semaphore immediately, so very cheap until they get that semaphore
// unit and start doing I/O.
tracing::info!(
"Sending {} compute notifications",
compute_notifications.len()
);
self.compute_hook.notify_background(
compute_notifications,
bg_compute_notify_result_tx.clone(),
&self.cancel,
);
// Finally, now that the service is up and running, launch reconcile operations for any tenants
// which require it: under normal circumstances this should only include tenants that were in some
// transient state before we restarted, or any tenants whose compute hooks failed above.
tracing::info!("Checking for shards in need of reconciliation...");
let reconcile_tasks = self.reconcile_all();
// We will not wait for these reconciliation tasks to run here: we're now done with startup and
// normal operations may proceed.
@@ -508,6 +510,7 @@ impl Service {
}
}
tracing::info!("Sending initial heartbeats...");
let res = self
.heartbeater
.heartbeat(Arc::new(nodes_to_heartbeat))
@@ -544,6 +547,7 @@ impl Service {
let mut node_list_futs = FuturesUnordered::new();
tracing::info!("Scanning shards on {} nodes...", nodes.len());
for node in nodes.values() {
node_list_futs.push({
async move {
@@ -663,72 +667,6 @@ impl Service {
}
}
/// Used during [`Self::startup_reconcile`]: issue many concurrent compute notifications.
///
/// Returns a set of any shards for which notifications where not acked within the deadline.
async fn compute_notify_many(
&self,
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
deadline: Instant,
) -> HashSet<TenantShardId> {
let attempt_shards = notifications.iter().map(|i| i.0).collect::<HashSet<_>>();
let mut success_shards = HashSet::new();
// Construct an async stream of futures to invoke the compute notify function: we do this
// in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
let mut stream = futures::stream::iter(notifications.into_iter())
.map(|(tenant_shard_id, node_id, stripe_size)| {
let compute_hook = self.compute_hook.clone();
let cancel = self.cancel.clone();
async move {
if let Err(e) = compute_hook
.notify(tenant_shard_id, node_id, stripe_size, &cancel)
.await
{
tracing::error!(
%tenant_shard_id,
%node_id,
"Failed to notify compute on startup for shard: {e}"
);
None
} else {
Some(tenant_shard_id)
}
}
})
.buffered(compute_hook::API_CONCURRENCY);
loop {
tokio::select! {
next = stream.next() => {
match next {
Some(Some(success_shard)) => {
// A notification succeeded
success_shards.insert(success_shard);
},
Some(None) => {
// A notification that failed
},
None => {
tracing::info!("Successfully sent all compute notifications");
break;
}
}
},
_ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
// Give up sending any that didn't succeed yet
tracing::info!("Reached deadline while sending compute notifications");
break;
}
};
}
attempt_shards
.difference(&success_shards)
.cloned()
.collect()
}
/// Long running background task that periodically wakes up and looks for shards that need
/// reconciliation. Reconciliation is fallible, so any reconciliation tasks that fail during
/// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
@@ -887,23 +825,45 @@ impl Service {
async fn process_results(
&self,
mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
mut bg_compute_hook_result_rx: tokio::sync::mpsc::Receiver<
Result<(), (TenantShardId, NotifyError)>,
>,
) {
loop {
// Wait for the next result, or for cancellation
let result = tokio::select! {
tokio::select! {
r = result_rx.recv() => {
match r {
Some(result) => {result},
Some(result) => {self.process_result(result);},
None => {break;}
}
}
_ = async{
match bg_compute_hook_result_rx.recv().await {
Some(result) => {
if let Err((tenant_shard_id, notify_error)) = result {
tracing::warn!("Marking shard {tenant_shard_id} for notification retry, due to error {notify_error}");
let mut locked = self.inner.write().unwrap();
if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
shard.pending_compute_notification = true;
}
}
},
None => {
// This channel is dead, but we don't want to terminate the outer loop{}: just wait for shutdown
self.cancel.cancelled().await;
}
}
} => {},
_ = self.cancel.cancelled() => {
break;
}
};
self.process_result(result);
}
// We should only fall through on shutdown
assert!(self.cancel.is_cancelled());
}
async fn process_aborts(
@@ -1064,6 +1024,10 @@ impl Service {
let (startup_completion, startup_complete) = utils::completion::channel();
// This channel is continuously consumed by process_results, so doesn't need to be very large.
let (bg_compute_notify_result_tx, bg_compute_notify_result_rx) =
tokio::sync::mpsc::channel(512);
let (delayed_reconcile_tx, delayed_reconcile_rx) =
tokio::sync::mpsc::channel(MAX_DELAYED_RECONCILES);
@@ -1101,7 +1065,9 @@ impl Service {
tokio::task::spawn(async move {
// Block shutdown until we're done (we must respect self.cancel)
if let Ok(_gate) = result_task_this.gate.enter() {
result_task_this.process_results(result_rx).await
result_task_this
.process_results(result_rx, bg_compute_notify_result_rx)
.await
}
});
@@ -1143,7 +1109,7 @@ impl Service {
return;
};
this.startup_reconcile().await;
this.startup_reconcile(bg_compute_notify_result_tx).await;
drop(startup_completion);
}
});