diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c8d897d074..d7f5958128 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2127,22 +2127,14 @@ impl Timeline { debug_assert_current_span_has_tenant_and_timeline_id(); // Regardless of whether we're going to try_freeze_and_flush - // or not, stop ingesting any more data. Walreceiver only provides - // cancellation but no "wait until gone", because it uses the Timeline::gate. - // So, only after the self.gate.close() below will we know for sure that - // no walreceiver tasks are left. - // For `try_freeze_and_flush=true`, this means that we might still be ingesting - // data during the call to `self.freeze_and_flush()` below. - // That's not ideal, but, we don't have the concept of a ChildGuard, - // which is what we'd need to properly model early shutdown of the walreceiver - // task sub-tree before the other Timeline task sub-trees. + // or not, stop ingesting any more data. let walreceiver = self.walreceiver.lock().unwrap().take(); tracing::debug!( is_some = walreceiver.is_some(), "Waiting for WalReceiverManager..." ); if let Some(walreceiver) = walreceiver { - walreceiver.cancel(); + walreceiver.shutdown().await; } // ... and inform any waiters for newer LSNs that there won't be any. self.last_record_lsn.shutdown(); diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 4f80073cc3..0f73eb839b 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -63,6 +63,7 @@ pub struct WalReceiver { /// All task spawned by [`WalReceiver::start`] and its children are sensitive to this token. /// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`. cancel: CancellationToken, + task: tokio::task::JoinHandle<()>, } impl WalReceiver { @@ -79,7 +80,7 @@ impl WalReceiver { let loop_status = Arc::new(std::sync::RwLock::new(None)); let manager_status = Arc::clone(&loop_status); let cancel = timeline.cancel.child_token(); - WALRECEIVER_RUNTIME.spawn({ + let task = WALRECEIVER_RUNTIME.spawn({ let cancel = cancel.clone(); async move { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -120,14 +121,25 @@ impl WalReceiver { Self { manager_status, cancel, + task, } } #[instrument(skip_all, level = tracing::Level::DEBUG)] - pub fn cancel(&self) { + pub async fn shutdown(self) { debug_assert_current_span_has_tenant_and_timeline_id(); debug!("cancelling walreceiver tasks"); self.cancel.cancel(); + match self.task.await { + Ok(()) => debug!("Shutdown success"), + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { + // already logged by panic hook + } + Err(je) => { + error!("shutdown walreceiver task join error: {je}") + } + } } pub(crate) fn status(&self) -> Option {