pass through cancellations

This commit is contained in:
Arpad Müller
2025-06-28 03:03:56 +02:00
parent 216ec91ef3
commit 5415b6cb0d
6 changed files with 37 additions and 24 deletions

View File

@@ -361,11 +361,6 @@ impl RequestContextBuilder {
self
}
pub(crate) fn cancel(mut self, cancel: CancellationToken) -> Self {
self.inner.cancel = cancel;
self
}
pub fn root(self) -> RequestContext {
self.inner
}
@@ -379,6 +374,11 @@ impl RequestContextBuilder {
self.inner.cancel = CancellationToken::new();
self.inner
}
pub fn detached_child_with_cancel(mut self, cancel: CancellationToken) -> RequestContext {
self.inner.cancel = cancel;
self.inner
}
}
impl RequestContext {
@@ -439,10 +439,22 @@ impl RequestContext {
RequestContextBuilder::from(self)
.task_kind(task_kind)
.download_behavior(download_behavior)
.cancel(CancellationToken::new())
.detached_child()
}
/// Like [`Self::detached_child`], but with the ability to specify the cancellation token
pub fn detached_child_with_cancel(
&self,
task_kind: TaskKind,
download_behavior: DownloadBehavior,
cancel: CancellationToken,
) -> Self {
RequestContextBuilder::from(self)
.task_kind(task_kind)
.download_behavior(download_behavior)
.detached_child_with_cancel(cancel)
}
/// Create a child of context `self` for a task that shall not outlive `self`.
///
/// Use this when fanning-out work to other async tasks.
@@ -619,6 +631,10 @@ impl RequestContext {
pub(crate) fn has_perf_span(&self) -> bool {
self.perf_span.is_some()
}
pub(crate) fn cancellation_token(&self) -> &CancellationToken {
&self.cancel
}
}
/// [`Future`] extension trait that allow for creating performance

View File

@@ -190,7 +190,11 @@ async fn download_object(
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
pausable_failpoint!(
"before-downloading-layer-stream-pausable",
ctx.cancellation_token()
)
.map_err(|_| DownloadError::Cancelled)?;
let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(

View File

@@ -4441,7 +4441,7 @@ impl Timeline {
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
while let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
if cancel.is_cancelled() {
if cancel.is_cancelled() || ctx.cancellation_token().is_cancelled() {
return Err(GetVectoredError::Cancelled);
}

View File

@@ -536,9 +536,10 @@ impl ConnectionManagerState {
let protocol = self.conf.protocol;
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
let ctx = ctx.detached_child_with_cancel(
TaskKind::WalReceiverConnectionHandler,
DownloadBehavior::Download,
self.cancel.clone()
);
let span = info_span!("connection", %node_id);

View File

@@ -275,20 +275,12 @@ pub(super) async fn handle_walreceiver_connection(
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
let walingest_res = select! {
walingest_res = walingest_future => walingest_res,
_ = cancellation.cancelled() => {
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
debug!("Connection cancelled");
return Err(WalReceiverError::Cancelled);
},
};
let mut walingest = walingest_res.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let (format, compression) = match protocol {
PostgresClientProtocol::Interpreted {

View File

@@ -299,7 +299,7 @@ where
//
let mut request_storage = Some(request);
for attempt in 1.. {
if self.cancel.is_cancelled() {
if self.cancel.is_cancelled() || self.ctx.cancellation_token().is_cancelled() {
return Err(FlushTaskError::Cancelled);
}
let result = async {