# storcon: shutdown with clean observed state (#8494)
## Problem

Storcon shutdown did not produce a clean observed state. This is not a problem at the moment, but we will need to stop all reconciles with a clean observed state for rolling restarts.

I tried to test this by collecting the observed state during shutdown and comparing it with the in-memory observed state, but that doesn't work because many tests use the cursed attach hook to create tenants directly through the pageserver.

## Summary of Changes

Rework storcon shutdown as follows (a minimal sketch of the combined sequence is shown below):

* Reconcilers get a separate cancellation token which is a child token of the global `Service::cancel`.
* Reconcilers get a separate gate.
* Add a mechanism to drain the reconciler result queue before stopping result processing.
* Put all of this together into a clean shutdown sequence.

Related: https://github.com/neondatabase/cloud/issues/14701
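The sequence combines three ingredients: a child cancellation token for reconcilers, a gate to wait for them, and a `Stop` sentinel to drain the result queue. Below is a minimal, self-contained sketch of that ordering using `tokio` and `tokio_util`; it illustrates the pattern only, not storcon's actual code, and names such as `ResultRequest` are stand-ins (storcon waits on a `Gate` rather than task handles).

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

// Stand-in for storcon's ReconcileResult / ReconcileResultRequest.
enum ResultRequest {
    Result(u32),
    Stop, // sentinel: the queue is drained past this point, exit
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    // Reconcilers get a child token: it can be cancelled on its own,
    // and cancelling the parent cancels it too.
    let reconcilers_cancel = cancel.child_token();

    let (result_tx, mut result_rx) = mpsc::unbounded_channel();

    // A stand-in reconciler: runs until its token fires, then reports
    // one final result.
    let reconciler = tokio::spawn({
        let tx = result_tx.clone();
        let cancel = reconcilers_cancel.clone();
        async move {
            cancel.cancelled().await;
            tx.send(ResultRequest::Result(42)).ok();
        }
    });

    // The result-processing loop: handles everything queued before Stop.
    let processor = tokio::spawn(async move {
        while let Some(req) = result_rx.recv().await {
            match req {
                ResultRequest::Result(r) => println!("processed result {r}"),
                ResultRequest::Stop => break,
            }
        }
    });

    // The shutdown sequence, in the order the PR describes:
    reconcilers_cancel.cancel(); // 1. stop reconcilers only
    reconciler.await.unwrap();   // 2. wait for them (storcon uses a Gate)
    result_tx.send(ResultRequest::Stop).ok(); // 3. drain queued results
    processor.await.unwrap();
    cancel.cancel();             // 4. now stop everything else
}
```

Note that cancelling `reconcilers_cancel` does not cancel the parent token, which is what lets the service keep processing results between steps 1 and 4.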
```diff
@@ -278,7 +278,7 @@ pub struct Service {
     config: Config,
     persistence: Arc<Persistence>,
     compute_hook: Arc<ComputeHook>,
-    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
+    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
 
     heartbeater: Heartbeater,
 
@@ -308,9 +308,15 @@ pub struct Service {
     // Process shutdown will fire this token
     cancel: CancellationToken,
 
+    // Child token of [`Service::cancel`] used by reconcilers
+    reconcilers_cancel: CancellationToken,
+
     // Background tasks will hold this gate
     gate: Gate,
 
+    // Reconcilers background tasks will hold this gate
+    reconcilers_gate: Gate,
+
     /// This waits for initial reconciliation with pageservers to complete. Until this barrier
     /// passes, it isn't safe to do any actions that mutate tenants.
     pub(crate) startup_complete: Barrier,
@@ -397,6 +403,11 @@ struct ShardUpdate {
     generation: Option<Generation>,
 }
 
+pub(crate) enum ReconcileResultRequest {
+    ReconcileResult(ReconcileResult),
+    Stop,
+}
+
 impl Service {
     pub fn get_config(&self) -> &Config {
         &self.config
```
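`ReconcileResultRequest::Stop` works as a drain marker because tokio's mpsc channels are FIFO: anything a reconciler sent before shutdown is received before the sentinel. A tiny sketch of that property (the `Request` type here is a stand-in, not the storcon code):

```rust
use tokio::sync::mpsc;

enum Request {
    Result(u32),
    Stop,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    // Results queued before the sentinel...
    tx.send(Request::Result(1)).ok();
    tx.send(Request::Result(2)).ok();
    // ...are all delivered before it: the channel is FIFO.
    tx.send(Request::Stop).ok();

    while let Some(req) = rx.recv().await {
        match req {
            Request::Result(n) => println!("processed {n}"),
            Request::Stop => break, // everything sent before Stop was handled
        }
    }
}
```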
```diff
@@ -753,7 +764,7 @@ impl Service {
         const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
 
         let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
-        while !self.cancel.is_cancelled() {
+        while !self.reconcilers_cancel.is_cancelled() {
             tokio::select! {
                 _ = interval.tick() => {
                     let reconciles_spawned = self.reconcile_all();
@@ -766,7 +777,7 @@ impl Service {
                         }
                     }
                 }
-                _ = self.cancel.cancelled() => return
+                _ = self.reconcilers_cancel.cancelled() => return
             }
         }
     }
@@ -937,7 +948,7 @@ impl Service {
 
     async fn process_results(
         &self,
-        mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
+        mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResultRequest>,
         mut bg_compute_hook_result_rx: tokio::sync::mpsc::Receiver<
             Result<(), (TenantShardId, NotifyError)>,
         >,
@@ -947,8 +958,8 @@
             tokio::select! {
                 r = result_rx.recv() => {
                     match r {
-                        Some(result) => {self.process_result(result);},
-                        None => {break;}
+                        Some(ReconcileResultRequest::ReconcileResult(result)) => {self.process_result(result);},
+                        None | Some(ReconcileResultRequest::Stop) => {break;}
                     }
                 }
                 _ = async{
@@ -974,9 +985,6 @@
                 }
             };
         }
-
-        // We should only fall through on shutdown
-        assert!(self.cancel.is_cancelled());
     }
 
     async fn process_aborts(
```
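The `while !cancelled` plus `tokio::select!` shape of `background_reconcile` is a common tokio idiom: do periodic work, but return promptly when the token fires. A minimal standalone version, with a `println!` standing in for `reconcile_all`:

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Periodic loop in the shape of `background_reconcile`: do work on a
// timer, bail out promptly when the token fires.
async fn background_loop(cancel: CancellationToken) {
    let mut interval = tokio::time::interval(Duration::from_millis(100));
    while !cancel.is_cancelled() {
        tokio::select! {
            _ = interval.tick() => {
                println!("tick: would call reconcile_all() here");
            }
            _ = cancel.cancelled() => return,
        }
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let task = tokio::spawn(background_loop(cancel.child_token()));
    tokio::time::sleep(Duration::from_millis(250)).await;
    cancel.cancel(); // cancelling the parent also fires the child token
    task.await.unwrap();
}
```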
```diff
@@ -1153,6 +1161,8 @@
             tokio::sync::mpsc::channel(MAX_DELAYED_RECONCILES);
 
         let cancel = CancellationToken::new();
+        let reconcilers_cancel = cancel.child_token();
+
         let heartbeater = Heartbeater::new(
             config.jwt_token.clone(),
             config.max_offline_interval,
@@ -1178,7 +1188,9 @@
             abort_tx,
             startup_complete: startup_complete.clone(),
             cancel,
+            reconcilers_cancel,
             gate: Gate::default(),
+            reconcilers_gate: Gate::default(),
             tenant_op_locks: Default::default(),
             node_op_locks: Default::default(),
         });
@@ -5132,7 +5144,7 @@
             }
         };
 
-        let Ok(gate_guard) = self.gate.enter() else {
+        let Ok(gate_guard) = self.reconcilers_gate.enter() else {
             // Gate closed: we're shutting down, drop out.
             return None;
         };
@@ -5145,7 +5157,7 @@
             &self.persistence,
             units,
             gate_guard,
-            &self.cancel,
+            &self.reconcilers_cancel,
         )
     }
 
```
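The diff only shows the `Gate` surface: `Gate::default()`, `enter()` returning a guard (fallible once the gate is closed, which the `let Ok(gate_guard) = ... else` above relies on), and `close().await`. The type itself lives in Neon's utility crate. As a rough mental model only, a gate can be emulated with an mpsc channel whose receiver completes once every cloned sender is dropped; this toy version is an illustration of the concept, not Neon's implementation:

```rust
use tokio::sync::mpsc;

// The guard is just a cloned Sender; dropping it "exits the gate".
#[allow(dead_code)]
struct GateGuard(mpsc::Sender<()>);

struct Gate {
    tx: mpsc::Sender<()>,
    rx: mpsc::Receiver<()>,
}

impl Gate {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel(1);
        Gate { tx, rx }
    }

    fn enter(&self) -> GateGuard {
        GateGuard(self.tx.clone())
    }

    async fn close(mut self) {
        drop(self.tx); // stop handing out new guards
        // recv() returns None only when every Sender clone is dropped,
        // i.e. when all guard holders have finished.
        while self.rx.recv().await.is_some() {}
    }
}

#[tokio::main]
async fn main() {
    let gate = Gate::new();
    let guard = gate.enter();
    let worker = tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        drop(guard); // done: exit the gate
    });
    gate.close().await; // completes once the worker drops its guard
    worker.await.unwrap();
}
```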
```diff
@@ -5592,17 +5604,21 @@
     }
 
     pub async fn shutdown(&self) {
-        // Note that this already stops processing any results from reconciles: so
-        // we do not expect that our [`TenantShard`] objects will reach a neat
-        // final state.
+        // Cancel all on-going reconciles and wait for them to exit the gate.
+        tracing::info!("Shutting down: cancelling and waiting for in-flight reconciles");
+        self.reconcilers_cancel.cancel();
+        self.reconcilers_gate.close().await;
+
+        // Signal the background loop in [`Service::process_results`] to exit once
+        // it has processed the results from all the reconciles we cancelled earlier.
+        tracing::info!("Shutting down: processing results from previously in-flight reconciles");
+        self.result_tx.send(ReconcileResultRequest::Stop).ok();
+        self.result_tx.closed().await;
+
+        // Background tasks hold gate guards: this notifies them of the cancellation and
+        // waits for them all to complete.
+        tracing::info!("Shutting down: cancelling and waiting for background tasks to exit");
         self.cancel.cancel();
 
-        // The cancellation tokens in [`crate::reconciler::Reconciler`] are children
-        // of our cancellation token, so we do not need to explicitly cancel each of
-        // them.
-
-        // Background tasks and reconcilers hold gate guards: this waits for them all
-        // to complete.
         self.gate.close().await;
     }
```
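One subtle step above is `self.result_tx.closed().await`: tokio's `UnboundedSender::closed()` completes when the receiver is dropped, which here happens when `process_results` breaks out of its loop on `Stop` and returns, dropping its `result_rx`. A small sketch of that handshake (names are stand-ins):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<&'static str>();
    tx.send("last result").ok();
    tx.send("stop").ok();

    let consumer = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if msg == "stop" {
                break; // rx is dropped when this task returns
            }
            println!("processed: {msg}");
        }
    });

    // Completes only once the receiver is dropped, i.e. after the
    // consumer has drained everything up to the sentinel.
    tx.closed().await;
    consumer.await.unwrap();
}
```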
The remaining hunks are in the `TenantShard` module, which adopts the new `ReconcileResultRequest` type:

```diff
@@ -9,6 +9,7 @@ use crate::{
     persistence::TenantShardPersistence,
     reconciler::ReconcileUnits,
     scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
+    service::ReconcileResultRequest,
 };
 use pageserver_api::controller_api::{
     NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
@@ -1059,7 +1060,7 @@ impl TenantShard {
     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     pub(crate) fn spawn_reconciler(
         &mut self,
-        result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
+        result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
         pageservers: &Arc<HashMap<NodeId, Node>>,
         compute_hook: &Arc<ComputeHook>,
         service_config: &service::Config,
@@ -1183,7 +1184,9 @@
                     pending_compute_notification: reconciler.compute_notify_failure,
                 };
 
-                result_tx.send(result).ok();
+                result_tx
+                    .send(ReconcileResultRequest::ReconcileResult(result))
+                    .ok();
             }
             .instrument(reconciler_span),
         );
```