mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
refactor: split get_or_maybe_download
This commit is contained in:
@@ -508,7 +508,8 @@ impl LayerInner {
|
||||
None
|
||||
}
|
||||
|
||||
/// Cancellation safe.
|
||||
/// Should be cancellation safe, but cancellation is troublesome together with the spawned
|
||||
/// download.
|
||||
async fn get_or_maybe_download(
|
||||
self: &Arc<Self>,
|
||||
allow_download: bool,
|
||||
@@ -546,36 +547,10 @@ impl LayerInner {
|
||||
return Err(DownloadError::NoRemoteStorage);
|
||||
}
|
||||
|
||||
if self.wanted_garbage_collected.load(Ordering::Acquire) {
|
||||
// it will fail because we should had already scheduled a delete and an
|
||||
// index update
|
||||
tracing::info!(%reason, "downloading a wanted garbage collected layer, this might fail");
|
||||
// FIXME: we probably do not gc delete until the file goes away...? unsure
|
||||
} else {
|
||||
tracing::debug!(%reason, "downloading layer");
|
||||
}
|
||||
tracing::debug!(%reason, "downloading layer");
|
||||
|
||||
if let Some(ctx) = ctx {
|
||||
use crate::context::DownloadBehavior::*;
|
||||
let b = ctx.download_behavior();
|
||||
match b {
|
||||
Download => {}
|
||||
Warn | Error => {
|
||||
tracing::warn!(
|
||||
"unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
|
||||
ctx.task_kind()
|
||||
);
|
||||
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
|
||||
|
||||
let really_error = matches!(b, Error)
|
||||
&& !self.conf.ondemand_download_behavior_treat_error_as_warn;
|
||||
|
||||
if really_error {
|
||||
// this check is only probablistic, seems like flakyness footgun
|
||||
return Err(DownloadError::ContextAndConfigReallyDeniesDownloads);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.check_expected_download(ctx)?;
|
||||
}
|
||||
|
||||
if !allow_download {
|
||||
@@ -584,102 +559,7 @@ impl LayerInner {
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
let task_name = format!("download layer {}", self);
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
|
||||
// block tenant::mgr::remove_tenant_from_memory.
|
||||
|
||||
let this: Arc<Self> = self.clone();
|
||||
crate::task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
crate::task_mgr::TaskKind::RemoteDownloadTask,
|
||||
Some(self.desc.tenant_id),
|
||||
Some(self.desc.timeline_id),
|
||||
&task_name,
|
||||
false,
|
||||
async move {
|
||||
let client = timeline
|
||||
.remote_client
|
||||
.as_ref()
|
||||
.expect("checked above with have_remote_client");
|
||||
|
||||
let result = client.download_layer_file(
|
||||
&this.desc.filename(),
|
||||
&LayerFileMetadata::new(
|
||||
this.desc.file_size,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let result = match result {
|
||||
Ok(size) => {
|
||||
timeline.metrics.resident_physical_size_gauge.add(size);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
Err(e)
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(res) = tx.send(result) {
|
||||
match res {
|
||||
Ok(()) => {
|
||||
// our caller is cancellation safe so this is fine; if someone
|
||||
// else requests the layer, they'll find it already downloaded
|
||||
// or redownload.
|
||||
//
|
||||
// however, could be that we should consider marking the layer
|
||||
// for eviction? alas, cannot: because only DownloadedLayer
|
||||
// will handle that.
|
||||
},
|
||||
Err(e) => {
|
||||
// our caller is cancellation safe, but we might be racing with
|
||||
// another attempt to initialize. before we have cancellation
|
||||
// token support: these attempts should converge regardless of
|
||||
// their completion order.
|
||||
tracing::error!("layer file download failed, and additionally failed to communicate this to caller: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
match rx.await {
|
||||
Ok(Ok(())) => {
|
||||
if let Some(reason) = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PostStatFailed)?
|
||||
{
|
||||
// this is really a bug in needs_download or remote timeline client
|
||||
panic!("post-condition failed: needs_download returned {reason:?}");
|
||||
}
|
||||
|
||||
self.consecutive_failures.store(0, Ordering::Relaxed);
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
let consecutive_failures =
|
||||
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
|
||||
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
|
||||
let backoff = utils::backoff::exponential_backoff_duration_seconds(
|
||||
consecutive_failures.min(u32::MAX as usize) as u32,
|
||||
1.5,
|
||||
60.0,
|
||||
);
|
||||
let backoff = std::time::Duration::from_secs_f64(backoff);
|
||||
|
||||
// unless we get cancelled, we will hold off the semaphore init
|
||||
tokio::time::sleep(backoff).await;
|
||||
|
||||
return Err(DownloadError::DownloadFailed);
|
||||
}
|
||||
Err(_gone) => {
|
||||
return Err(DownloadError::DownloadCancelled);
|
||||
}
|
||||
}
|
||||
self.spawn_download_and_wait(timeline).await?;
|
||||
} else {
|
||||
// the file is present locally, probably by a previous but cancelled call to
|
||||
// get_or_maybe_download.
|
||||
@@ -712,6 +592,134 @@ impl LayerInner {
|
||||
)
|
||||
}
|
||||
|
||||
fn check_expected_download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
|
||||
use crate::context::DownloadBehavior::*;
|
||||
let b = ctx.download_behavior();
|
||||
match b {
|
||||
Download => Ok(()),
|
||||
Warn | Error => {
|
||||
tracing::warn!(
|
||||
"unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
|
||||
ctx.task_kind()
|
||||
);
|
||||
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
|
||||
|
||||
let really_error =
|
||||
matches!(b, Error) && !self.conf.ondemand_download_behavior_treat_error_as_warn;
|
||||
|
||||
if really_error {
|
||||
// this check is only probablistic, seems like flakyness footgun
|
||||
Err(DownloadError::ContextAndConfigReallyDeniesDownloads)
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Actual download, at most one is executed at the time.
|
||||
async fn spawn_download_and_wait(
|
||||
self: &Arc<Self>,
|
||||
timeline: Arc<Timeline>,
|
||||
) -> Result<(), DownloadError> {
|
||||
let task_name = format!("download layer {}", self);
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
|
||||
// block tenant::mgr::remove_tenant_from_memory.
|
||||
|
||||
let this: Arc<Self> = self.clone();
|
||||
crate::task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
crate::task_mgr::TaskKind::RemoteDownloadTask,
|
||||
Some(self.desc.tenant_id),
|
||||
Some(self.desc.timeline_id),
|
||||
&task_name,
|
||||
false,
|
||||
async move {
|
||||
let client = timeline
|
||||
.remote_client
|
||||
.as_ref()
|
||||
.expect("checked above with have_remote_client");
|
||||
|
||||
let result = client.download_layer_file(
|
||||
&this.desc.filename(),
|
||||
&LayerFileMetadata::new(
|
||||
this.desc.file_size,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let result = match result {
|
||||
Ok(size) => {
|
||||
timeline.metrics.resident_physical_size_gauge.add(size);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
Err(e)
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(res) = tx.send(result) {
|
||||
match res {
|
||||
Ok(()) => {
|
||||
// our caller is cancellation safe so this is fine; if someone
|
||||
// else requests the layer, they'll find it already downloaded
|
||||
// or redownload.
|
||||
//
|
||||
// however, could be that we should consider marking the layer
|
||||
// for eviction? alas, cannot: because only DownloadedLayer
|
||||
// will handle that.
|
||||
},
|
||||
Err(e) => {
|
||||
// our caller is cancellation safe, but we might be racing with
|
||||
// another attempt to initialize. before we have cancellation
|
||||
// token support: these attempts should converge regardless of
|
||||
// their completion order.
|
||||
tracing::error!("layer file download failed, and additionally failed to communicate this to caller: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
match rx.await {
|
||||
Ok(Ok(())) => {
|
||||
if let Some(reason) = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PostStatFailed)?
|
||||
{
|
||||
// this is really a bug in needs_download or remote timeline client
|
||||
panic!("post-condition failed: needs_download returned {reason:?}");
|
||||
}
|
||||
|
||||
self.consecutive_failures.store(0, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
let consecutive_failures =
|
||||
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
|
||||
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
|
||||
let backoff = utils::backoff::exponential_backoff_duration_seconds(
|
||||
consecutive_failures.min(u32::MAX as usize) as u32,
|
||||
1.5,
|
||||
60.0,
|
||||
);
|
||||
let backoff = std::time::Duration::from_secs_f64(backoff);
|
||||
|
||||
// unless we get cancelled, we will hold off the semaphore init
|
||||
tokio::time::sleep(backoff).await;
|
||||
|
||||
Err(DownloadError::DownloadFailed)
|
||||
}
|
||||
Err(_gone) => Err(DownloadError::DownloadCancelled),
|
||||
}
|
||||
}
|
||||
|
||||
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
match tokio::fs::metadata(&self.path).await {
|
||||
Ok(m) => Ok(self.is_file_present_and_good_size(&m)),
|
||||
|
||||
Reference in New Issue
Block a user