mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 11:30:37 +00:00
fix(Layer): always init after downloading in the spawned task (#7175)
Before this PR, cancellation for `LayerInner::get_or_maybe_download` could occur so that we have downloaded the layer file in the filesystem, but because of the cancellation chance, we have not set the internal `LayerInner::inner` or initialized the state. With the detached init support introduced in #7135 and in place in #7152, we can now initialize the internal state after successfully downloading in the spawned task. The next PR will fix the remaining problems that this PR leaves: - `Layer::keep_resident` is still used because - `Layer::get_or_maybe_download` always cancels an eviction, even when canceled Split off from #7030. Stacked on top of #7152. Cc: #5331.
This commit is contained in:
@@ -702,6 +702,11 @@ impl LayerInner {
|
||||
allow_download: bool,
|
||||
ctx: Option<&RequestContext>,
|
||||
) -> Result<Arc<DownloadedLayer>, DownloadError> {
|
||||
// get_or_init_detached can:
|
||||
// - be fast (mutex lock) OR uncontested semaphore permit acquire
|
||||
// - be slow (wait for semaphore permit or closing)
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
let (weak, permit) = {
|
||||
let locked = self
|
||||
.inner
|
||||
@@ -736,6 +741,8 @@ impl LayerInner {
|
||||
}
|
||||
};
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
if let Some(weak) = weak {
|
||||
// only drop the weak after dropping the heavier_once_cell guard
|
||||
assert!(
|
||||
@@ -744,86 +751,57 @@ impl LayerInner {
|
||||
);
|
||||
}
|
||||
|
||||
let timeline = self
|
||||
.timeline
|
||||
.upgrade()
|
||||
.ok_or_else(|| DownloadError::TimelineShutdown)?;
|
||||
|
||||
// count cancellations, which currently remain largely unexpected
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
// check if we really need to be downloaded; could have been already downloaded by a
|
||||
// cancelled previous attempt.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
let needs_download = needs_download?;
|
||||
|
||||
let Some(reason) = needs_download else {
|
||||
// the file is present locally, probably by a previous but cancelled call to
|
||||
// get_or_maybe_download. alternatively we might be running without remote storage.
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
return Ok(self.initialize_after_layer_is_on_disk(permit));
|
||||
};
|
||||
|
||||
if let NeedsDownload::NotFile(ft) = reason {
|
||||
return Err(DownloadError::NotFile(ft));
|
||||
}
|
||||
|
||||
if timeline.remote_client.as_ref().is_none() {
|
||||
return Err(DownloadError::NoRemoteStorage);
|
||||
}
|
||||
|
||||
if let Some(ctx) = ctx {
|
||||
self.check_expected_download(ctx)?;
|
||||
}
|
||||
|
||||
if !allow_download {
|
||||
// this does look weird, but for LayerInner the "downloading" means also changing
|
||||
// internal once related state ...
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
async move {
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// no need to make the evict_and_wait wait for the actual download to complete
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
|
||||
let timeline = self
|
||||
.timeline
|
||||
.upgrade()
|
||||
.ok_or_else(|| DownloadError::TimelineShutdown)?;
|
||||
|
||||
// count cancellations, which currently remain largely unexpected
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
// check if we really need to be downloaded; could have been already downloaded by a
|
||||
// cancelled previous attempt.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
let needs_download = match needs_download {
|
||||
Ok(reason) => reason,
|
||||
Err(e) => {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let Some(reason) = needs_download else {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
// the file is present locally, probably by a previous but cancelled call to
|
||||
// get_or_maybe_download. alternatively we might be running without remote storage.
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
let res = self.initialize_after_layer_is_on_disk(next_version, permit, false);
|
||||
return Ok(res);
|
||||
};
|
||||
|
||||
if let NeedsDownload::NotFile(ft) = reason {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::NotFile(ft));
|
||||
}
|
||||
|
||||
// only reset this after we've decided we really need to download. otherwise it'd
|
||||
// be impossible to mark cancelled downloads for eviction, like one could imagine
|
||||
// we would like to do for prefetching which was not needed.
|
||||
self.wanted_evicted.store(false, Ordering::Release);
|
||||
|
||||
if timeline.remote_client.as_ref().is_none() {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::NoRemoteStorage);
|
||||
}
|
||||
|
||||
if let Some(ctx) = ctx {
|
||||
let res = self.check_expected_download(ctx);
|
||||
if let Err(e) = res {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
if !allow_download {
|
||||
// this does look weird, but for LayerInner the "downloading" means also changing
|
||||
// internal once related state ...
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let permit = self.spawn_download_and_wait(timeline, permit).await;
|
||||
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
let res = self.download_init_and_wait(timeline, permit).await?;
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
let permit = permit?;
|
||||
|
||||
let res = self.initialize_after_layer_is_on_disk(next_version, permit, true);
|
||||
Ok(res)
|
||||
}
|
||||
.instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
|
||||
@@ -857,11 +835,11 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
/// Actual download, at most one is executed at the time.
|
||||
async fn spawn_download_and_wait(
|
||||
async fn download_init_and_wait(
|
||||
self: &Arc<Self>,
|
||||
timeline: Arc<Timeline>,
|
||||
permit: heavier_once_cell::InitPermit,
|
||||
) -> Result<heavier_once_cell::InitPermit, DownloadError> {
|
||||
) -> Result<Arc<DownloadedLayer>, DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
@@ -873,66 +851,24 @@ impl LayerInner {
|
||||
.enter()
|
||||
.map_err(|_| DownloadError::DownloadCancelled)?;
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
|
||||
tokio::task::spawn(
|
||||
async move {
|
||||
let _guard = guard;
|
||||
|
||||
let client = timeline
|
||||
.remote_client
|
||||
.as_ref()
|
||||
.expect("checked above with have_remote_client");
|
||||
drop(this.status.send(Status::Downloaded));
|
||||
|
||||
let result = client.download_layer_file(
|
||||
&this.desc.filename(),
|
||||
&this.metadata(),
|
||||
&timeline.cancel
|
||||
)
|
||||
.await;
|
||||
let res = this.download_and_init(timeline, permit).await;
|
||||
|
||||
let result = match result {
|
||||
Ok(size) => {
|
||||
timeline.metrics.resident_physical_size_add(size);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
let consecutive_failures =
|
||||
this.consecutive_failures.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let backoff = utils::backoff::exponential_backoff_duration_seconds(
|
||||
consecutive_failures.min(u32::MAX as usize) as u32,
|
||||
1.5,
|
||||
60.0,
|
||||
);
|
||||
|
||||
let backoff = std::time::Duration::from_secs_f64(backoff);
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(backoff) => {},
|
||||
_ = timeline.cancel.cancelled() => {},
|
||||
};
|
||||
|
||||
Err(e)
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(res) = tx.send((result, permit)) {
|
||||
if let Err(res) = tx.send(res) {
|
||||
match res {
|
||||
(Ok(()), _) => {
|
||||
// our caller is cancellation safe so this is fine; if someone
|
||||
// else requests the layer, they'll find it already downloaded.
|
||||
//
|
||||
// See counter [`LayerImplMetrics::inc_init_needed_no_download`]
|
||||
//
|
||||
// FIXME(#6028): however, could be that we should consider marking the
|
||||
// layer for eviction? alas, cannot: because only DownloadedLayer will
|
||||
// handle that.
|
||||
},
|
||||
(Err(e), _) => {
|
||||
// our caller is cancellation safe, but we might be racing with
|
||||
// another attempt to initialize. before we have cancellation
|
||||
// token support: these attempts should converge regardless of
|
||||
// their completion order.
|
||||
tracing::error!("layer file download failed, and additionally failed to communicate this to caller: {e:?}");
|
||||
Ok(_res) => {
|
||||
tracing::debug!("layer initialized, but caller has been cancelled");
|
||||
LAYER_IMPL_METRICS.inc_init_completed_without_requester();
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::info!(
|
||||
"layer file download failed, and caller has been cancelled: {e:?}"
|
||||
);
|
||||
LAYER_IMPL_METRICS.inc_download_failed_without_requester();
|
||||
}
|
||||
}
|
||||
@@ -942,41 +878,100 @@ impl LayerInner {
|
||||
);
|
||||
|
||||
match rx.await {
|
||||
Ok((Ok(()), permit)) => {
|
||||
if let Some(reason) = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PostStatFailed)?
|
||||
{
|
||||
// this is really a bug in needs_download or remote timeline client
|
||||
panic!("post-condition failed: needs_download returned {reason:?}");
|
||||
}
|
||||
|
||||
self.consecutive_failures.store(0, Ordering::Relaxed);
|
||||
tracing::info!(size=%self.desc.file_size, "on-demand download successful");
|
||||
|
||||
Ok(permit)
|
||||
}
|
||||
Ok((Err(e), _permit)) => {
|
||||
Ok(Ok(res)) => Ok(res),
|
||||
Ok(Err(e)) => {
|
||||
// sleep already happened in the spawned task, if it was not cancelled
|
||||
let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed);
|
||||
|
||||
match e.downcast_ref::<remote_storage::DownloadError>() {
|
||||
// If the download failed due to its cancellation token,
|
||||
// propagate the cancellation error upstream.
|
||||
Some(remote_storage::DownloadError::Cancelled) => {
|
||||
Err(DownloadError::DownloadCancelled)
|
||||
}
|
||||
_ => {
|
||||
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
|
||||
Err(DownloadError::DownloadFailed)
|
||||
}
|
||||
_ => Err(DownloadError::DownloadFailed),
|
||||
}
|
||||
}
|
||||
Err(_gone) => Err(DownloadError::DownloadCancelled),
|
||||
}
|
||||
}
|
||||
|
||||
async fn download_and_init(
|
||||
self: &Arc<LayerInner>,
|
||||
timeline: Arc<Timeline>,
|
||||
permit: heavier_once_cell::InitPermit,
|
||||
) -> anyhow::Result<Arc<DownloadedLayer>> {
|
||||
let client = timeline
|
||||
.remote_client
|
||||
.as_ref()
|
||||
.expect("checked before download_init_and_wait");
|
||||
|
||||
let result = client
|
||||
.download_layer_file(&self.desc.filename(), &self.metadata(), &timeline.cancel)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(size) => {
|
||||
assert_eq!(size, self.desc.file_size);
|
||||
|
||||
match self.needs_download().await {
|
||||
Ok(Some(reason)) => {
|
||||
// this is really a bug in needs_download or remote timeline client
|
||||
panic!("post-condition failed: needs_download returned {reason:?}");
|
||||
}
|
||||
Ok(None) => {
|
||||
// as expected
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("post-condition failed: needs_download errored: {e:?}");
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(size=%self.desc.file_size, "on-demand download successful");
|
||||
timeline
|
||||
.metrics
|
||||
.resident_physical_size_add(self.desc.file_size);
|
||||
self.consecutive_failures.store(0, Ordering::Relaxed);
|
||||
|
||||
let since_last_eviction = self
|
||||
.last_evicted_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.map(|ts| ts.elapsed());
|
||||
if let Some(since_last_eviction) = since_last_eviction {
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
Ok(self.initialize_after_layer_is_on_disk(permit))
|
||||
}
|
||||
Err(e) => {
|
||||
let consecutive_failures =
|
||||
1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
|
||||
|
||||
let backoff = utils::backoff::exponential_backoff_duration_seconds(
|
||||
consecutive_failures.min(u32::MAX as usize) as u32,
|
||||
1.5,
|
||||
60.0,
|
||||
);
|
||||
|
||||
let backoff = std::time::Duration::from_secs_f64(backoff);
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(backoff) => {},
|
||||
_ = timeline.cancel.cancelled() => {},
|
||||
};
|
||||
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initializes the `Self::inner` to a "resident" state.
|
||||
///
|
||||
/// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
|
||||
@@ -986,25 +981,22 @@ impl LayerInner {
|
||||
/// changes are made before we can write to the OnceCell in non-cancellable fashion.
|
||||
fn initialize_after_layer_is_on_disk(
|
||||
self: &Arc<LayerInner>,
|
||||
next_version: usize,
|
||||
permit: heavier_once_cell::InitPermit,
|
||||
downloaded: bool,
|
||||
) -> Arc<DownloadedLayer> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if downloaded {
|
||||
let since_last_eviction = self
|
||||
.last_evicted_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.map(|ts| ts.elapsed());
|
||||
if let Some(since_last_eviction) = since_last_eviction {
|
||||
// FIXME: this will not always be recorded correctly until #6028 (the no
|
||||
// download needed branch above)
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
}
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// only reset this after we've decided we really need to download. otherwise it'd
|
||||
// be impossible to mark cancelled downloads for eviction, like one could imagine
|
||||
// we would like to do for prefetching which was not needed.
|
||||
self.wanted_evicted.store(false, Ordering::Release);
|
||||
|
||||
// re-send the notification we've already sent when we started to download, just so
|
||||
// evict_and_wait does not need to wait for the download to complete. note that this is
|
||||
// sent when initializing after finding the file on the disk.
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
|
||||
let res = Arc::new(DownloadedLayer {
|
||||
owner: Arc::downgrade(self),
|
||||
@@ -1012,15 +1004,9 @@ impl LayerInner {
|
||||
version: next_version,
|
||||
});
|
||||
|
||||
// FIXME: this might now be double-accounted for !downloaded
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
let waiters = self.inner.initializer_count();
|
||||
if waiters > 0 {
|
||||
tracing::info!(waiters, "completing the on-demand download for other tasks");
|
||||
tracing::info!(waiters, "completing layer init for other tasks");
|
||||
}
|
||||
|
||||
let value = ResidentOrWantedEvicted::Resident(res.clone());
|
||||
@@ -1268,8 +1254,6 @@ pub(crate) enum DownloadError {
|
||||
DownloadCancelled,
|
||||
#[error("pre-condition: stat before download failed")]
|
||||
PreStatFailed(#[source] std::io::Error),
|
||||
#[error("post-condition: stat after download failed")]
|
||||
PostStatFailed(#[source] std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
@@ -1694,6 +1678,12 @@ impl LayerImplMetrics {
|
||||
self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
|
||||
}
|
||||
|
||||
/// Expected rare just as cancellations are rare, but we could have cancellations separate from
|
||||
/// the single caller which can start the download, so use this counter to separte them.
|
||||
fn inc_init_completed_without_requester(&self) {
|
||||
self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because cancellations are unexpected, and failures are unexpected
|
||||
fn inc_download_failed_without_requester(&self) {
|
||||
self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
|
||||
@@ -1778,6 +1768,7 @@ impl DeleteFailed {
|
||||
#[derive(enum_map::Enum)]
|
||||
enum RareEvent {
|
||||
RemoveOnDropFailed,
|
||||
InitCompletedWithoutRequester,
|
||||
DownloadFailedWithoutRequester,
|
||||
UpgradedWantedEvicted,
|
||||
InitWithoutDownload,
|
||||
@@ -1791,6 +1782,7 @@ impl RareEvent {
|
||||
|
||||
match self {
|
||||
RemoveOnDropFailed => "remove_on_drop_failed",
|
||||
InitCompletedWithoutRequester => "init_completed_without",
|
||||
DownloadFailedWithoutRequester => "download_failed_without",
|
||||
UpgradedWantedEvicted => "raced_wanted_evicted",
|
||||
InitWithoutDownload => "init_needed_no_download",
|
||||
|
||||
Reference in New Issue
Block a user