mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
wip
This commit is contained in:
@@ -727,7 +727,7 @@ impl RemoteTimelineClient {
|
||||
reason: "no need for a downloads gauge",
|
||||
},
|
||||
);
|
||||
download::download_layer_file(
|
||||
let fut = download::download_layer_file(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
self.tenant_shard_id,
|
||||
@@ -744,8 +744,11 @@ impl RemoteTimelineClient {
|
||||
RemoteOpFileKind::Layer,
|
||||
RemoteOpKind::Download,
|
||||
Arc::clone(&self.metrics),
|
||||
)
|
||||
.await?
|
||||
);
|
||||
/*tokio::select! {
|
||||
res = fut => res,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
|
||||
}*/fut.await?
|
||||
};
|
||||
|
||||
REMOTE_ONDEMAND_DOWNLOADED_LAYERS.inc();
|
||||
|
||||
@@ -190,6 +190,8 @@ async fn download_object(
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Starting layer download");
|
||||
|
||||
pausable_failpoint!(
|
||||
"before-downloading-layer-stream-pausable",
|
||||
ctx.cancellation_token()
|
||||
|
||||
@@ -337,10 +337,14 @@ impl Layer {
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.0
|
||||
// good
|
||||
let fut = self.0
|
||||
.get_or_maybe_download(true, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone())
|
||||
.await
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_context| crnt_perf_context.clone());
|
||||
/*tokio::select! {
|
||||
res = fut => res,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(GetVectoredError::Cancelled),
|
||||
}*/fut.await
|
||||
.map_err(|err| match err {
|
||||
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
|
||||
GetVectoredError::Cancelled
|
||||
@@ -1041,10 +1045,11 @@ impl LayerInner {
|
||||
//
|
||||
// if we are cancelled while doing this `stat` the `self.inner` will be uninitialized. a
|
||||
// pending eviction will try to evict even upon finding an uninitialized `self.inner`.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
let needs_download = tokio::select! {
|
||||
dl = self.needs_download() => dl,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
|
||||
}
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
@@ -1054,8 +1059,12 @@ impl LayerInner {
|
||||
// the file is present locally because eviction has not had a chance to run yet
|
||||
|
||||
#[cfg(test)]
|
||||
self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
|
||||
.await?;
|
||||
//self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload)
|
||||
//.await?;
|
||||
tokio::select! {
|
||||
dl = self.failpoint(failpoints::FailpointKind::AfterDeterminingLayerNeedsNoDownload) => dl?,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
|
||||
}
|
||||
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
@@ -1092,10 +1101,16 @@ impl LayerInner {
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self
|
||||
let fut = self
|
||||
.download_init_and_wait(timeline, permit, ctx.attached_child())
|
||||
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
|
||||
.await?;
|
||||
;
|
||||
let res = tokio::select! {
|
||||
res = fut => res,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(DownloadError::DownloadCancelled),
|
||||
}?;
|
||||
// bad
|
||||
//fut.await?;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
Ok(res)
|
||||
|
||||
@@ -1345,9 +1345,12 @@ impl Timeline {
|
||||
})
|
||||
.attached_child();
|
||||
|
||||
self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
let fut = self.get_vectored_reconstruct_data(query.clone(), reconstruct_state, &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone());
|
||||
/*tokio::select! {
|
||||
res = fut => res,
|
||||
_ = ctx.cancellation_token().cancelled() => return Err(GetVectoredError::Cancelled),
|
||||
}*/fut.await
|
||||
};
|
||||
|
||||
if let Err(err) = traversal_res {
|
||||
|
||||
Reference in New Issue
Block a user