diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 860fe4802a..e890c5e45e 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -278,7 +278,7 @@ pub struct Service {
     config: Config,
     persistence: Arc<Persistence>,
     compute_hook: Arc<ComputeHook>,
-    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
+    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,

     heartbeater: Heartbeater,

@@ -308,9 +308,15 @@ pub struct Service {
     // Process shutdown will fire this token
     cancel: CancellationToken,

+    // Child token of [`Service::cancel`] used by reconcilers
+    reconcilers_cancel: CancellationToken,
+
     // Background tasks will hold this gate
     gate: Gate,

+    // Reconcilers background tasks will hold this gate
+    reconcilers_gate: Gate,
+
     /// This waits for initial reconciliation with pageservers to complete. Until this barrier
     /// passes, it isn't safe to do any actions that mutate tenants.
     pub(crate) startup_complete: Barrier,
@@ -397,6 +403,11 @@ struct ShardUpdate {
     generation: Option<Generation>,
 }

+pub(crate) enum ReconcileResultRequest {
+    ReconcileResult(ReconcileResult),
+    Stop,
+}
+
 impl Service {
     pub fn get_config(&self) -> &Config {
         &self.config
@@ -753,7 +764,7 @@ impl Service {
         const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);

         let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
-        while !self.cancel.is_cancelled() {
+        while !self.reconcilers_cancel.is_cancelled() {
             tokio::select! {
               _ = interval.tick() => {
                 let reconciles_spawned = self.reconcile_all();
@@ -766,7 +777,7 @@ impl Service {
                     }
                 }
             }
-              _ = self.cancel.cancelled() => return
+              _ = self.reconcilers_cancel.cancelled() => return
             }
         }
     }
@@ -937,7 +948,7 @@ impl Service {

     async fn process_results(
         &self,
-        mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
+        mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResultRequest>,
         mut bg_compute_hook_result_rx: tokio::sync::mpsc::Receiver<
             Result<(), (TenantShardId, NotifyError)>,
         >,
@@ -947,8 +958,8 @@ impl Service {
             tokio::select! {
                 r = result_rx.recv() => {
                     match r {
-                        Some(result) => {self.process_result(result);},
-                        None => {break;}
+                        Some(ReconcileResultRequest::ReconcileResult(result)) => {self.process_result(result);},
+                        None | Some(ReconcileResultRequest::Stop) => {break;}
                     }
                 }
                 _ = async{
@@ -974,9 +985,6 @@ impl Service {
                 }
             };
         }
-
-        // We should only fall through on shutdown
-        assert!(self.cancel.is_cancelled());
     }

     async fn process_aborts(
@@ -1153,6 +1161,8 @@ impl Service {
             tokio::sync::mpsc::channel(MAX_DELAYED_RECONCILES);

         let cancel = CancellationToken::new();
+        let reconcilers_cancel = cancel.child_token();
+
         let heartbeater = Heartbeater::new(
             config.jwt_token.clone(),
             config.max_offline_interval,
@@ -1178,7 +1188,9 @@ impl Service {
             abort_tx,
             startup_complete: startup_complete.clone(),
             cancel,
+            reconcilers_cancel,
             gate: Gate::default(),
+            reconcilers_gate: Gate::default(),
             tenant_op_locks: Default::default(),
             node_op_locks: Default::default(),
         });
@@ -5132,7 +5144,7 @@ impl Service {
             }
         };

-        let Ok(gate_guard) = self.gate.enter() else {
+        let Ok(gate_guard) = self.reconcilers_gate.enter() else {
             // Gate closed: we're shutting down, drop out.
             return None;
         };
@@ -5145,7 +5157,7 @@ impl Service {
             &self.persistence,
             units,
             gate_guard,
-            &self.cancel,
+            &self.reconcilers_cancel,
         )
     }

@@ -5592,17 +5604,21 @@ impl Service {
     }

     pub async fn shutdown(&self) {
-        // Note that this already stops processing any results from reconciles: so
-        // we do not expect that our [`TenantShard`] objects will reach a neat
-        // final state.
-        self.cancel.cancel();
+        // Cancel all on-going reconciles and wait for them to exit the gate.
+        tracing::info!("Shutting down: cancelling and waiting for in-flight reconciles");
+        self.reconcilers_cancel.cancel();
+        self.reconcilers_gate.close().await;
+
+        // Signal the background loop in [`Service::process_results`] to exit once
+        // it has processed the results from all the reconciles we cancelled earlier.
+        tracing::info!("Shutting down: processing results from previously in-flight reconciles");
+        self.result_tx.send(ReconcileResultRequest::Stop).ok();
+        self.result_tx.closed().await;
+
+        // Background tasks hold gate guards: this notifies them of the cancellation and
+        // waits for them all to complete.
+        tracing::info!("Shutting down: cancelling and waiting for background tasks to exit");
+        self.cancel.cancel();
-
-        // The cancellation tokens in [`crate::reconciler::Reconciler`] are children
-        // of our cancellation token, so we do not need to explicitly cancel each of
-        // them.
-
-        // Background tasks and reconcilers hold gate guards: this waits for them all
-        // to complete.
         self.gate.close().await;
     }

diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs
index ee2ba6c4ee..670efae154 100644
--- a/storage_controller/src/tenant_shard.rs
+++ b/storage_controller/src/tenant_shard.rs
@@ -9,6 +9,7 @@ use crate::{
     persistence::TenantShardPersistence,
     reconciler::ReconcileUnits,
     scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
+    service::ReconcileResultRequest,
 };
 use pageserver_api::controller_api::{
     NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
@@ -1059,7 +1060,7 @@ impl TenantShard {
     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     pub(crate) fn spawn_reconciler(
         &mut self,
-        result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
+        result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
         pageservers: &Arc<HashMap<NodeId, Node>>,
         compute_hook: &Arc<ComputeHook>,
         service_config: &service::Config,
@@ -1183,7 +1184,9 @@ impl TenantShard {
                     pending_compute_notification: reconciler.compute_notify_failure,
                 };

-                result_tx.send(result).ok();
+                result_tx
+                    .send(ReconcileResultRequest::ReconcileResult(result))
+                    .ok();
             }
             .instrument(reconciler_span),
         );
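
For readers skimming the patch, a minimal, hypothetical sketch of the shutdown ordering introduced above follows. It is not the real `Service` type: the two `Gate`s are elided (only the cancellation tokens and the result channel are shown), `ShutdownSketch` is an invented name, and `ReconcileResult` is reduced to a placeholder.

```rust
// Hypothetical, simplified sketch (assumed names; not the actual Service impl).
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

struct ReconcileResult; // placeholder for the real reconcile outcome

enum ReconcileResultRequest {
    ReconcileResult(ReconcileResult),
    Stop,
}

struct ShutdownSketch {
    cancel: CancellationToken,
    // Child token (`cancel.child_token()`), so cancelling `cancel` also cancels reconcilers.
    reconcilers_cancel: CancellationToken,
    result_tx: mpsc::UnboundedSender<ReconcileResultRequest>,
}

impl ShutdownSketch {
    async fn shutdown(&self) {
        // 1. Stop reconcilers first so no new results are produced.
        //    (The real code also does `self.reconcilers_gate.close().await` here.)
        self.reconcilers_cancel.cancel();

        // 2. Ask the results loop to drain whatever is left and exit; waiting for
        //    the sender to observe closure confirms the receiver has been dropped.
        self.result_tx.send(ReconcileResultRequest::Stop).ok();
        self.result_tx.closed().await;

        // 3. Only then cancel and wait for the remaining background tasks.
        //    (The real code also does `self.gate.close().await` here.)
        self.cancel.cancel();
    }
}
```

The point is the ordering: reconcilers are cancelled and drained before the results loop is told to stop, and the service-wide token and gate are only shut down last.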