From 5415b6cb0d670c2a9de93546c5cb7e649deaebdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Sat, 28 Jun 2025 03:03:56 +0200 Subject: [PATCH] pass through cancellations --- pageserver/src/context.rs | 28 +++++++++++++++---- .../tenant/remote_timeline_client/download.rs | 6 +++- pageserver/src/tenant/timeline.rs | 2 +- .../walreceiver/connection_manager.rs | 3 +- .../walreceiver/walreceiver_connection.rs | 20 ++++--------- .../owned_buffers_io/write/flush.rs | 2 +- 6 files changed, 37 insertions(+), 24 deletions(-) diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index 4d624c72c7..2d73eecf92 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -361,11 +361,6 @@ impl RequestContextBuilder { self } - pub(crate) fn cancel(mut self, cancel: CancellationToken) -> Self { - self.inner.cancel = cancel; - self - } - pub fn root(self) -> RequestContext { self.inner } @@ -379,6 +374,11 @@ impl RequestContextBuilder { self.inner.cancel = CancellationToken::new(); self.inner } + + pub fn detached_child_with_cancel(mut self, cancel: CancellationToken) -> RequestContext { + self.inner.cancel = cancel; + self.inner + } } impl RequestContext { @@ -439,10 +439,22 @@ impl RequestContext { RequestContextBuilder::from(self) .task_kind(task_kind) .download_behavior(download_behavior) - .cancel(CancellationToken::new()) .detached_child() } + /// Like [`Self::detached_child`], but with the ability to specify the cancellation token + pub fn detached_child_with_cancel( + &self, + task_kind: TaskKind, + download_behavior: DownloadBehavior, + cancel: CancellationToken, + ) -> Self { + RequestContextBuilder::from(self) + .task_kind(task_kind) + .download_behavior(download_behavior) + .detached_child_with_cancel(cancel) + } + /// Create a child of context `self` for a task that shall not outlive `self`. /// /// Use this when fanning-out work to other async tasks. @@ -619,6 +631,10 @@ impl RequestContext { pub(crate) fn has_perf_span(&self) -> bool { self.perf_span.is_some() } + + pub(crate) fn cancellation_token(&self) -> &CancellationToken { + &self.cancel + } } /// [`Future`] extension trait that allow for creating performance diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 84989e0fb8..91a4eb21b0 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -190,7 +190,11 @@ async fn download_object( .download(src_path, &DownloadOpts::default(), cancel) .await?; - pausable_failpoint!("before-downloading-layer-stream-pausable"); + pausable_failpoint!( + "before-downloading-layer-stream-pausable", + ctx.cancellation_token() + ) + .map_err(|_| DownloadError::Cancelled)?; let dst_path = destination_file.path().to_owned(); let mut buffered = owned_buffers_io::write::BufferedWriter::::new( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 08bc6d4a59..b32e9a6bda 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4441,7 +4441,7 @@ impl Timeline { let mut image_covered_keyspace = KeySpaceRandomAccum::new(); while let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() { - if cancel.is_cancelled() { + if cancel.is_cancelled() || ctx.cancellation_token().is_cancelled() { return Err(GetVectoredError::Cancelled); } diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 7e0b0e9b25..36c8c518b0 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -536,9 +536,10 @@ impl ConnectionManagerState { let protocol = self.conf.protocol; let validate_wal_contiguity = self.conf.validate_wal_contiguity; let timeline = Arc::clone(&self.timeline); - let ctx = ctx.detached_child( + let ctx = ctx.detached_child_with_cancel( TaskKind::WalReceiverConnectionHandler, DownloadBehavior::Download, + self.cancel.clone() ); let span = info_span!("connection", %node_id); diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 6d52da1f00..e91bd5d43a 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -275,20 +275,12 @@ pub(super) async fn handle_walreceiver_connection( let copy_stream = replication_client.copy_both_simple(&query).await?; let mut physical_stream = pin!(ReplicationStream::new(copy_stream)); - let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx); - let walingest_res = select! { - walingest_res = walingest_future => walingest_res, - _ = cancellation.cancelled() => { - // We are doing reads in WalIngest::new, and those can hang as they come from the network. - // Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one. - debug!("Connection cancelled"); - return Err(WalReceiverError::Cancelled); - }, - }; - let mut walingest = walingest_res.map_err(|e| match e.kind { - crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled, - _ => WalReceiverError::Other(e.into()), - })?; + let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx) + .await + .map_err(|e| match e.kind { + crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled, + _ => WalReceiverError::Other(e.into()), + })?; let (format, compression) = match protocol { PostgresClientProtocol::Interpreted { diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index ac9867e8b4..9fcc72df8f 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -299,7 +299,7 @@ where // let mut request_storage = Some(request); for attempt in 1.. { - if self.cancel.is_cancelled() { + if self.cancel.is_cancelled() || self.ctx.cancellation_token().is_cancelled() { return Err(FlushTaskError::Cancelled); } let result = async {