diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 165ae61379..aeced98859 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2144,14 +2144,31 @@ impl Timeline { debug_assert_current_span_has_tenant_and_timeline_id(); // Regardless of whether we're going to try_freeze_and_flush - // or not, stop ingesting any more data. + // cancel walreceiver to stop ingesting more data asap. + // + // Note that we're accepting a race condition here where we may + // do the final flush below, before walreceiver observes the + // cancellation and exits. + // This means we may open a new InMemoryLayer after the final flush below. + // Flush loop is also still running for a short while, so, in theory, it + // could also make its way into the upload queue. + // + // If we wait for the shutdown of the walreceiver before moving on to the + // flush, then that would be avoided. But we don't do it because the + // walreceiver entertains reads internally, which means that it possibly + // depends on the download of layers. Layer download is only sensitive to + // the cancellation of the entire timeline, so cancelling the walreceiver + // will have no effect on the individual get requests. + // This would cause problems when there is a lot of ongoing downloads or + // there is S3 unavailabilities, i.e. detach, deletion, etc would hang, + // and we can't deallocate resources of the timeline, etc. let walreceiver = self.walreceiver.lock().unwrap().take(); tracing::debug!( is_some = walreceiver.is_some(), "Waiting for WalReceiverManager..." ); if let Some(walreceiver) = walreceiver { - walreceiver.shutdown().await; + walreceiver.cancel().await; } // ... and inform any waiters for newer LSNs that there won't be any. self.last_record_lsn.shutdown(); diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 633c94a010..3fe6c21a7d 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -63,7 +63,6 @@ pub struct WalReceiver { /// All task spawned by [`WalReceiver::start`] and its children are sensitive to this token. /// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`. cancel: CancellationToken, - task: tokio::task::JoinHandle<()>, } impl WalReceiver { @@ -80,7 +79,7 @@ impl WalReceiver { let loop_status = Arc::new(std::sync::RwLock::new(None)); let manager_status = Arc::clone(&loop_status); let cancel = timeline.cancel.child_token(); - let task = WALRECEIVER_RUNTIME.spawn({ + let _task = WALRECEIVER_RUNTIME.spawn({ let cancel = cancel.clone(); async move { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -121,25 +120,14 @@ impl WalReceiver { Self { manager_status, cancel, - task, } } #[instrument(skip_all, level = tracing::Level::DEBUG)] - pub async fn shutdown(self) { + pub async fn cancel(self) { debug_assert_current_span_has_tenant_and_timeline_id(); debug!("cancelling walreceiver tasks"); self.cancel.cancel(); - match self.task.await { - Ok(()) => debug!("Shutdown success"), - Err(je) if je.is_cancelled() => unreachable!("not used"), - Err(je) if je.is_panic() => { - // already logged by panic hook - } - Err(je) => { - error!("shutdown walreceiver task join error: {je}") - } - } } pub(crate) fn status(&self) -> Option { diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index f8b0849c73..f619c69599 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -275,20 +275,12 @@ pub(super) async fn handle_walreceiver_connection( let copy_stream = replication_client.copy_both_simple(&query).await?; let mut physical_stream = pin!(ReplicationStream::new(copy_stream)); - let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx); - let walingest_res = select! { - walingest_res = walingest_future => walingest_res, - _ = cancellation.cancelled() => { - // We are doing reads in WalIngest::new, and those can hang as they come from the network. - // Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one. - debug!("Connection cancelled"); - return Err(WalReceiverError::Cancelled); - }, - }; - let mut walingest = walingest_res.map_err(|e| match e.kind { - crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled, - _ => WalReceiverError::Other(e.into()), - })?; + let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx) + .await + .map_err(|e| match e.kind { + crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled, + _ => WalReceiverError::Other(e.into()), + })?; let (format, compression) = match protocol { PostgresClientProtocol::Interpreted {