Compare commits

...

5 Commits

Author SHA1 Message Date
Arpad Müller
896420acce wip 2025-07-01 01:15:19 +02:00
Arpad Müller
9d858b8cbe pass the right cancellation token 2025-06-30 16:08:18 +02:00
Arpad Müller
5415b6cb0d pass through cancellations 2025-06-30 16:08:18 +02:00
Arpad Müller
216ec91ef3 Respect cancellation for child generation 2025-06-30 16:08:18 +02:00
Arpad Müller
29d4f0638e Add cancellation token to RequestContext 2025-06-30 16:08:18 +02:00
8 changed files with 94 additions and 41 deletions

View File

@@ -92,6 +92,7 @@
use std::{sync::Arc, time::Duration};
use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::{id::TimelineId, shard::TenantShardId};
@@ -117,6 +118,7 @@ pub struct RequestContext {
scope: Scope,
perf_span: Option<PerfSpan>,
perf_span_dispatch: Option<Dispatch>,
cancel: CancellationToken,
}
#[derive(Clone)]
@@ -263,6 +265,10 @@ pub struct RequestContextBuilder {
impl RequestContextBuilder {
/// Creates a builder with default settings for the given `task_kind`.
///
/// The context under construction receives a freshly created
/// [`CancellationToken`]; use `new_with_cancel` to supply an existing one.
pub fn new(task_kind: TaskKind) -> Self {
    let cancel = CancellationToken::new();
    Self::new_with_cancel(task_kind, cancel)
}
/// A new builder with default settings, with the ability to specify the cancellation token
pub(crate) fn new_with_cancel(task_kind: TaskKind, cancel: CancellationToken) -> Self {
Self {
inner: RequestContext {
task_kind,
@@ -273,6 +279,7 @@ impl RequestContextBuilder {
scope: Scope::new_global(),
perf_span: None,
perf_span_dispatch: None,
cancel,
},
}
}
@@ -358,11 +365,18 @@ impl RequestContextBuilder {
self.inner
}
pub fn attached_child(self) -> RequestContext {
pub fn attached_child(mut self) -> RequestContext {
self.inner.cancel = self.inner.cancel.child_token();
self.inner
}
pub fn detached_child(self) -> RequestContext {
pub fn detached_child(mut self) -> RequestContext {
self.inner.cancel = CancellationToken::new();
self.inner
}
/// Finishes the builder and returns the context, with its cancellation token
/// replaced by the caller-provided `cancel`.
pub fn detached_child_with_cancel(self, cancel: CancellationToken) -> RequestContext {
    let mut ctx = self.inner;
    ctx.cancel = cancel;
    ctx
}
}
@@ -382,6 +396,7 @@ impl RequestContext {
scope: self.scope.clone(),
perf_span: self.perf_span.clone(),
perf_span_dispatch: self.perf_span_dispatch.clone(),
cancel: self.cancel.clone(),
}
}
@@ -427,6 +442,19 @@ impl RequestContext {
.detached_child()
}
/// Variant of [`Self::detached_child`] in which the caller supplies the
/// cancellation token carried by the returned context.
///
/// The new context is built from `self` via [`RequestContextBuilder`], with
/// `task_kind` and `download_behavior` set to the given values.
pub fn detached_child_with_cancel(
    &self,
    task_kind: TaskKind,
    download_behavior: DownloadBehavior,
    cancel: CancellationToken,
) -> Self {
    let builder = RequestContextBuilder::from(self)
        .task_kind(task_kind)
        .download_behavior(download_behavior);
    builder.detached_child_with_cancel(cancel)
}
/// Create a child of context `self` for a task that shall not outlive `self`.
///
/// Use this when fanning-out work to other async tasks.
@@ -603,6 +631,10 @@ impl RequestContext {
pub(crate) fn has_perf_span(&self) -> bool {
self.perf_span.is_some()
}
pub(crate) fn cancellation_token(&self) -> &CancellationToken {
&self.cancel
}
}
/// [`Future`] extension trait that allows for creating performance

View File

@@ -727,7 +727,7 @@ impl RemoteTimelineClient {
reason: "no need for a downloads gauge",
},
);
download::download_layer_file(
let fut = download::download_layer_file(
self.conf,
&self.storage_impl,
self.tenant_shard_id,
@@ -744,8 +744,11 @@ impl RemoteTimelineClient {
RemoteOpFileKind::Layer,
RemoteOpKind::Download,
Arc::clone(&self.metrics),
)
.await?
);
/*tokio::select! {
res = fut => res,
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
}*/fut.await?
};
REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();

View File

@@ -190,7 +190,13 @@ async fn download_object(
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
tracing::info!("Starting layer download");
pausable_failpoint!(
"before-downloading-layer-stream-pausable",
ctx.cancellation_token()
)
.map_err(|_| DownloadError::Cancelled)?;
let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(

View File

@@ -337,10 +337,14 @@ impl Layer {
})
.attached_child();
self.0
// good
let fut = self.0
.get_or_maybe_download(true, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
.await
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone());
/*tokio::select! {
res = fut => res,
_ = ctx.cancellation_token().cancelled() => return Err(GetVectoredError::Cancelled),
}*/fut.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
@@ -1041,10 +1045,11 @@ impl LayerInner {
//
// if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
// pending eviction will try to evict even upon finding an uninitialized `self.inner`.
let needs_download = self
.needs_download()
.await
.map_err(DownloadError::PreStatFailed);
let needs_download = tokio::select! {
dl = self.needs_download() => dl,
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
}
.map_err(DownloadError::PreStatFailed);
scopeguard::ScopeGuard::into_inner(init_cancelled);
@@ -1054,8 +1059,12 @@ impl LayerInner {
// the file is present locally because eviction has not had a chance to run yet
#[cfg(test)]
self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
.await?;
//self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
//.await?;
tokio::select! {
dl = self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload) => dl?,
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
}
LAYER_IMPL_METRICS.inc_init_needed_no_download();
@@ -1092,10 +1101,16 @@ impl LayerInner {
tracing::info!(%reason, "downloading on-demand");
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let res = self
let fut = self
.download_init_and_wait(timeline, permit, ctx.attached_child())
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
.await?;
;
let res = tokio::select! {
res = fut => res,
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
}?;
// bad
//fut.await?;
scopeguard::ScopeGuard::into_inner(init_cancelled);
Ok(res)

View File

@@ -1345,9 +1345,12 @@ impl Timeline {
})
.attached_child();
self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
let fut = self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone());
/*tokio::select! {
res = fut => res,
_ = ctx.cancellation_token().cancelled() => return Err(GetVectoredError::Cancelled),
}*/fut.await
};
if let Err(err) = traversal_res {
@@ -4441,7 +4444,7 @@ impl Timeline {
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
while let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
if cancel.is_cancelled() {
if cancel.is_cancelled() || ctx.cancellation_token().is_cancelled() {
return Err(GetVectoredError::Cancelled);
}

View File

@@ -36,7 +36,7 @@ use utils::postgres_client::{ConnectionConfigArgs, wal_stream_connection_config}
use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
use crate::context::{DownloadBehavior, RequestContext};
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::metrics::{
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
@@ -536,16 +536,18 @@ impl ConnectionManagerState {
let protocol = self.conf.protocol;
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
DownloadBehavior::Download,
);
let ctx_builder = RequestContextBuilder::from(ctx)
.task_kind(TaskKind::WalReceiverConnectionHandler)
.download_behavior(DownloadBehavior::Download);
let span = info_span!("connection", %node_id);
let connection_handle = self.spawn(move |events_sender, cancellation| {
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
let ctx = ctx_builder.detached_child_with_cancel(cancellation.clone());
let res = super::walreceiver_connection::handle_walreceiver_connection(
timeline,
protocol,

View File

@@ -275,20 +275,12 @@ pub(super) async fn handle_walreceiver_connection(
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
let walingest_res = select! {
walingest_res = walingest_future => walingest_res,
_ = cancellation.cancelled() => {
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
debug!("Connection cancelled");
return Err(WalReceiverError::Cancelled);
},
};
let mut walingest = walingest_res.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
_ => WalReceiverError::Other(e.into()),
})?;
let (format, compression) = match protocol {
PostgresClientProtocol::Interpreted {

View File

@@ -299,7 +299,7 @@ where
//
let mut request_storage = Some(request);
for attempt in 1.. {
if self.cancel.is_cancelled() {
if self.cancel.is_cancelled() || self.ctx.cancellation_token().is_cancelled() {
return Err(FlushTaskError::Cancelled);
}
let result = async {