mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Compare commits
5 Commits
skyzh/try-
...
arpad/no_d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
896420acce | ||
|
|
9d858b8cbe | ||
|
|
5415b6cb0d | ||
|
|
216ec91ef3 | ||
|
|
29d4f0638e |
@@ -92,6 +92,7 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
use utils::{id::TimelineId, shard::TenantShardId};
|
||||
|
||||
@@ -117,6 +118,7 @@ pub struct RequestContext {
|
||||
scope: Scope,
|
||||
perf_span: Option<PerfSpan>,
|
||||
perf_span_dispatch: Option<Dispatch>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -263,6 +265,10 @@ pub struct RequestContextBuilder {
|
||||
impl RequestContextBuilder {
|
||||
/// A new builder with default settings
|
||||
pub fn new(task_kind: TaskKind) -> Self {
|
||||
Self::new_with_cancel(task_kind, CancellationToken::new())
|
||||
}
|
||||
/// A new builder with default settings, with ability to specify the cancellation token
|
||||
pub(crate) fn new_with_cancel(task_kind: TaskKind, cancel: CancellationToken) -> Self {
|
||||
Self {
|
||||
inner: RequestContext {
|
||||
task_kind,
|
||||
@@ -273,6 +279,7 @@ impl RequestContextBuilder {
|
||||
scope: Scope::new_global(),
|
||||
perf_span: None,
|
||||
perf_span_dispatch: None,
|
||||
cancel,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -358,11 +365,18 @@ impl RequestContextBuilder {
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn attached_child(self) -> RequestContext {
|
||||
pub fn attached_child(mut self) -> RequestContext {
|
||||
self.inner.cancel = self.inner.cancel.child_token();
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn detached_child(self) -> RequestContext {
|
||||
pub fn detached_child(mut self) -> RequestContext {
|
||||
self.inner.cancel = CancellationToken::new();
|
||||
self.inner
|
||||
}
|
||||
|
||||
pub fn detached_child_with_cancel(mut self, cancel: CancellationToken) -> RequestContext {
|
||||
self.inner.cancel = cancel;
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
@@ -382,6 +396,7 @@ impl RequestContext {
|
||||
scope: self.scope.clone(),
|
||||
perf_span: self.perf_span.clone(),
|
||||
perf_span_dispatch: self.perf_span_dispatch.clone(),
|
||||
cancel: self.cancel.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -427,6 +442,19 @@ impl RequestContext {
|
||||
.detached_child()
|
||||
}
|
||||
|
||||
/// Like [`Self::detached_child`], but with the ability to specify the cancellation token
|
||||
pub fn detached_child_with_cancel(
|
||||
&self,
|
||||
task_kind: TaskKind,
|
||||
download_behavior: DownloadBehavior,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
RequestContextBuilder::from(self)
|
||||
.task_kind(task_kind)
|
||||
.download_behavior(download_behavior)
|
||||
.detached_child_with_cancel(cancel)
|
||||
}
|
||||
|
||||
/// Create a child of context `self` for a task that shall not outlive `self`.
|
||||
///
|
||||
/// Use this when fanning-out work to other async tasks.
|
||||
@@ -603,6 +631,10 @@ impl RequestContext {
|
||||
pub(crate) fn has_perf_span(&self) -> bool {
|
||||
self.perf_span.is_some()
|
||||
}
|
||||
|
||||
pub(crate) fn cancellation_token(&self) -> &CancellationToken {
|
||||
&self.cancel
|
||||
}
|
||||
}
|
||||
|
||||
/// [`Future`] extension trait that allow for creating performance
|
||||
|
||||
@@ -727,7 +727,7 @@ impl RemoteTimelineClient {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
download::download_layer_file(
|
||||
let fut = download::download_layer_file(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
self.tenant_shard_id,
|
||||
@@ -744,8 +744,11 @@ impl RemoteTimelineClient {
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
)
|
||||
.await?
|
||||
);
|
||||
/*tokio::select! {
|
||||
res = fut => res,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
|
||||
}*/fut.await?
|
||||
};
|
||||
|
||||
REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
|
||||
|
||||
@@ -190,7 +190,13 @@ async fn download_object(
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
tracing::info!("Starting layer download");
|
||||
|
||||
pausable_failpoint!(
|
||||
"before-downloading-layer-stream-pausable",
|
||||
ctx.cancellation_token()
|
||||
)
|
||||
.map_err(|_| DownloadError::Cancelled)?;
|
||||
|
||||
let dst_path = destination_file.path().to_owned();
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
|
||||
|
||||
@@ -337,10 +337,14 @@ impl Layer {
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.0
|
||||
// good
|
||||
let fut = self.0
|
||||
.get_or_maybe_download(true, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
|
||||
.await
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone());
|
||||
/*tokio::select! {
|
||||
res = fut => res,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(GetVectoredError::Cancelled),
|
||||
}*/fut.await
|
||||
.map_err(|err| match err {
|
||||
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
|
||||
GetVectoredError::Cancelled
|
||||
@@ -1041,10 +1045,11 @@ impl LayerInner {
|
||||
//
|
||||
// if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
|
||||
// pending eviction will try to evict even upon finding an uninitialized `self.inner`.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
let needs_download = tokio::select! {
|
||||
dl = self.needs_download() => dl,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
|
||||
}
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
@@ -1054,8 +1059,12 @@ impl LayerInner {
|
||||
// the file is present locally because eviction has not had a chance to run yet
|
||||
|
||||
#[cfg(test)]
|
||||
self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
|
||||
.await?;
|
||||
//self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
|
||||
//.await?;
|
||||
tokio::select! {
|
||||
dl = self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload) => dl?,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
|
||||
}
|
||||
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
@@ -1092,10 +1101,16 @@ impl LayerInner {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
let fut = self
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await?;
|
||||
;
|
||||
let res = tokio::select! {
|
||||
res = fut => res,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
|
||||
}?;
|
||||
// bad
|
||||
//fut.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
Ok(res)
|
||||
|
||||
@@ -1345,9 +1345,12 @@ impl Timeline {
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
let fut = self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone());
|
||||
/*tokio::select! {
|
||||
res = fut => res,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(GetVectoredError::Cancelled),
|
||||
}*/fut.await
|
||||
};
|
||||
|
||||
if let Err(err) = traversal_res {
|
||||
@@ -4441,7 +4444,7 @@ impl Timeline {
|
||||
let mut image_covered_keyspace = KeySpaceRandomAccum::new();
|
||||
|
||||
while let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
|
||||
if cancel.is_cancelled() {
|
||||
if cancel.is_cancelled() || ctx.cancellation_token().is_cancelled() {
|
||||
return Err(GetVectoredError::Cancelled);
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ use utils::postgres_client::{ConnectionConfigArgs, wal_stream_connection_config}
|
||||
|
||||
use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
|
||||
use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::metrics::{
|
||||
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
|
||||
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
|
||||
@@ -536,16 +536,18 @@ impl ConnectionManagerState {
|
||||
let protocol = self.conf.protocol;
|
||||
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
|
||||
let timeline = Arc::clone(&self.timeline);
|
||||
let ctx = ctx.detached_child(
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
DownloadBehavior::Download,
|
||||
);
|
||||
|
||||
let ctx_builder = RequestContextBuilder::from(ctx)
|
||||
.task_kind(TaskKind::WalReceiverConnectionHandler)
|
||||
.download_behavior(DownloadBehavior::Download);
|
||||
|
||||
let span = info_span!("connection", %node_id);
|
||||
let connection_handle = self.spawn(move |events_sender, cancellation| {
|
||||
async move {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let ctx = ctx_builder.detached_child_with_cancel(cancellation.clone());
|
||||
|
||||
let res = super::walreceiver_connection::handle_walreceiver_connection(
|
||||
timeline,
|
||||
protocol,
|
||||
|
||||
@@ -275,20 +275,12 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let copy_stream = replication_client.copy_both_simple(&query).await?;
|
||||
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
|
||||
|
||||
let walingest_future = WalIngest::new(timeline.as_ref(), startpoint, &ctx);
|
||||
let walingest_res = select! {
|
||||
walingest_res = walingest_future => walingest_res,
|
||||
_ = cancellation.cancelled() => {
|
||||
// We are doing reads in WalIngest::new, and those can hang as they come from the network.
|
||||
// Timeline cancellation hits the walreceiver cancellation token before it hits the timeline global one.
|
||||
debug!("Connection cancelled");
|
||||
return Err(WalReceiverError::Cancelled);
|
||||
},
|
||||
};
|
||||
let mut walingest = walingest_res.map_err(|e| match e.kind {
|
||||
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
|
||||
_ => WalReceiverError::Other(e.into()),
|
||||
})?;
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
|
||||
.await
|
||||
.map_err(|e| match e.kind {
|
||||
crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
|
||||
_ => WalReceiverError::Other(e.into()),
|
||||
})?;
|
||||
|
||||
let (format, compression) = match protocol {
|
||||
PostgresClientProtocol::Interpreted {
|
||||
|
||||
@@ -299,7 +299,7 @@ where
|
||||
//
|
||||
let mut request_storage = Some(request);
|
||||
for attempt in 1.. {
|
||||
if self.cancel.is_cancelled() {
|
||||
if self.cancel.is_cancelled() || self.ctx.cancellation_token().is_cancelled() {
|
||||
return Err(FlushTaskError::Cancelled);
|
||||
}
|
||||
let result = async {
|
||||
|
||||
Reference in New Issue
Block a user