From e961e0d3df1e7040221300fbb3d3e654257e4cad Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 20 Mar 2024 20:37:47 +0200 Subject: [PATCH] fix(Layer): always init after downloading in the spawned task (#7175) Before this PR, cancellation for `LayerInner::get_or_maybe_download` could occur so that we have downloaded the layer file in the filesystem, but because of the cancellation chance, we have not set the internal `LayerInner::inner` or initialized the state. With the detached init support introduced in #7135 and in place in #7152, we can now initialize the internal state after successfully downloading in the spawned task. The next PR will fix the remaining problems that this PR leaves: - `Layer::keep_resident` is still used because - `Layer::get_or_maybe_download` always cancels an eviction, even when canceled Split off from #7030. Stacked on top of #7152. Cc: #5331. --- pageserver/src/tenant/storage_layer/layer.rs | 350 +++++++++---------- 1 file changed, 171 insertions(+), 179 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index eed423c3e6..626fd69ef3 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -702,6 +702,11 @@ impl LayerInner { allow_download: bool, ctx: Option<&RequestContext>, ) -> Result, DownloadError> { + // get_or_init_detached can: + // - be fast (mutex lock) OR uncontested semaphore permit acquire + // - be slow (wait for semaphore permit or closing) + let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled()); + let (weak, permit) = { let locked = self .inner @@ -736,6 +741,8 @@ impl LayerInner { } }; + scopeguard::ScopeGuard::into_inner(init_cancelled); + if let Some(weak) = weak { // only drop the weak after dropping the heavier_once_cell guard assert!( @@ -744,86 +751,57 @@ impl LayerInner { ); } + let timeline = self + .timeline + .upgrade() + .ok_or_else(|| 
DownloadError::TimelineShutdown)?; + + // count cancellations, which currently remain largely unexpected + let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled()); + + // check if we really need to be downloaded; could have been already downloaded by a + // cancelled previous attempt. + let needs_download = self + .needs_download() + .await + .map_err(DownloadError::PreStatFailed); + + scopeguard::ScopeGuard::into_inner(init_cancelled); + + let needs_download = needs_download?; + + let Some(reason) = needs_download else { + // the file is present locally, probably by a previous but cancelled call to + // get_or_maybe_download. alternatively we might be running without remote storage. + LAYER_IMPL_METRICS.inc_init_needed_no_download(); + + return Ok(self.initialize_after_layer_is_on_disk(permit)); + }; + + if let NeedsDownload::NotFile(ft) = reason { + return Err(DownloadError::NotFile(ft)); + } + + if timeline.remote_client.as_ref().is_none() { + return Err(DownloadError::NoRemoteStorage); + } + + if let Some(ctx) = ctx { + self.check_expected_download(ctx)?; + } + + if !allow_download { + // this does look weird, but for LayerInner the "downloading" means also changing + // internal once related state ... + return Err(DownloadError::DownloadRequired); + } + async move { - // disable any scheduled but not yet running eviction deletions for this - let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed); - - // no need to make the evict_and_wait wait for the actual download to complete - drop(self.status.send(Status::Downloaded)); - - let timeline = self - .timeline - .upgrade() - .ok_or_else(|| DownloadError::TimelineShutdown)?; - - // count cancellations, which currently remain largely unexpected - let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled()); - - // check if we really need to be downloaded; could have been already downloaded by a - // cancelled previous attempt. 
- let needs_download = self - .needs_download() - .await - .map_err(DownloadError::PreStatFailed); - - let needs_download = match needs_download { - Ok(reason) => reason, - Err(e) => { - scopeguard::ScopeGuard::into_inner(init_cancelled); - return Err(e); - } - }; - - let Some(reason) = needs_download else { - scopeguard::ScopeGuard::into_inner(init_cancelled); - - // the file is present locally, probably by a previous but cancelled call to - // get_or_maybe_download. alternatively we might be running without remote storage. - LAYER_IMPL_METRICS.inc_init_needed_no_download(); - - let res = self.initialize_after_layer_is_on_disk(next_version, permit, false); - return Ok(res); - }; - - if let NeedsDownload::NotFile(ft) = reason { - scopeguard::ScopeGuard::into_inner(init_cancelled); - return Err(DownloadError::NotFile(ft)); - } - - // only reset this after we've decided we really need to download. otherwise it'd - // be impossible to mark cancelled downloads for eviction, like one could imagine - // we would like to do for prefetching which was not needed. - self.wanted_evicted.store(false, Ordering::Release); - - if timeline.remote_client.as_ref().is_none() { - scopeguard::ScopeGuard::into_inner(init_cancelled); - return Err(DownloadError::NoRemoteStorage); - } - - if let Some(ctx) = ctx { - let res = self.check_expected_download(ctx); - if let Err(e) = res { - scopeguard::ScopeGuard::into_inner(init_cancelled); - return Err(e); - } - } - - if !allow_download { - // this does look weird, but for LayerInner the "downloading" means also changing - // internal once related state ... 
- scopeguard::ScopeGuard::into_inner(init_cancelled); - return Err(DownloadError::DownloadRequired); - } - tracing::info!(%reason, "downloading on-demand"); - let permit = self.spawn_download_and_wait(timeline, permit).await; - + let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled()); + let res = self.download_init_and_wait(timeline, permit).await?; scopeguard::ScopeGuard::into_inner(init_cancelled); - - let permit = permit?; - - let res = self.initialize_after_layer_is_on_disk(next_version, permit, true); Ok(res) } .instrument(tracing::info_span!("get_or_maybe_download", layer=%self)) @@ -857,11 +835,11 @@ impl LayerInner { } /// Actual download, at most one is executed at the time. - async fn spawn_download_and_wait( + async fn download_init_and_wait( self: &Arc, timeline: Arc, permit: heavier_once_cell::InitPermit, - ) -> Result { + ) -> Result, DownloadError> { debug_assert_current_span_has_tenant_and_timeline_id(); let (tx, rx) = tokio::sync::oneshot::channel(); @@ -873,66 +851,24 @@ impl LayerInner { .enter() .map_err(|_| DownloadError::DownloadCancelled)?; - tokio::task::spawn(async move { - + tokio::task::spawn( + async move { let _guard = guard; - let client = timeline - .remote_client - .as_ref() - .expect("checked above with have_remote_client"); + drop(this.status.send(Status::Downloaded)); - let result = client.download_layer_file( - &this.desc.filename(), - &this.metadata(), - &timeline.cancel - ) - .await; + let res = this.download_and_init(timeline, permit).await; - let result = match result { - Ok(size) => { - timeline.metrics.resident_physical_size_add(size); - Ok(()) - } - Err(e) => { - let consecutive_failures = - this.consecutive_failures.fetch_add(1, Ordering::Relaxed); - - let backoff = utils::backoff::exponential_backoff_duration_seconds( - consecutive_failures.min(u32::MAX as usize) as u32, - 1.5, - 60.0, - ); - - let backoff = std::time::Duration::from_secs_f64(backoff); - - tokio::select! 
{ - _ = tokio::time::sleep(backoff) => {}, - _ = timeline.cancel.cancelled() => {}, - }; - - Err(e) - } - }; - - if let Err(res) = tx.send((result, permit)) { + if let Err(res) = tx.send(res) { match res { - (Ok(()), _) => { - // our caller is cancellation safe so this is fine; if someone - // else requests the layer, they'll find it already downloaded. - // - // See counter [`LayerImplMetrics::inc_init_needed_no_download`] - // - // FIXME(#6028): however, could be that we should consider marking the - // layer for eviction? alas, cannot: because only DownloadedLayer will - // handle that. - }, - (Err(e), _) => { - // our caller is cancellation safe, but we might be racing with - // another attempt to initialize. before we have cancellation - // token support: these attempts should converge regardless of - // their completion order. - tracing::error!("layer file download failed, and additionally failed to communicate this to caller: {e:?}"); + Ok(_res) => { + tracing::debug!("layer initialized, but caller has been cancelled"); + LAYER_IMPL_METRICS.inc_init_completed_without_requester(); + } + Err(e) => { + tracing::info!( + "layer file download failed, and caller has been cancelled: {e:?}" + ); LAYER_IMPL_METRICS.inc_download_failed_without_requester(); } } @@ -942,41 +878,100 @@ impl LayerInner { ); match rx.await { - Ok((Ok(()), permit)) => { - if let Some(reason) = self - .needs_download() - .await - .map_err(DownloadError::PostStatFailed)? 
- { - // this is really a bug in needs_download or remote timeline client - panic!("post-condition failed: needs_download returned {reason:?}"); - } - - self.consecutive_failures.store(0, Ordering::Relaxed); - tracing::info!(size=%self.desc.file_size, "on-demand download successful"); - - Ok(permit) - } - Ok((Err(e), _permit)) => { + Ok(Ok(res)) => Ok(res), + Ok(Err(e)) => { // sleep already happened in the spawned task, if it was not cancelled - let consecutive_failures = self.consecutive_failures.load(Ordering::Relaxed); - match e.downcast_ref::() { // If the download failed due to its cancellation token, // propagate the cancellation error upstream. Some(remote_storage::DownloadError::Cancelled) => { Err(DownloadError::DownloadCancelled) } - _ => { - tracing::error!(consecutive_failures, "layer file download failed: {e:#}"); - Err(DownloadError::DownloadFailed) - } + _ => Err(DownloadError::DownloadFailed), } } Err(_gone) => Err(DownloadError::DownloadCancelled), } } + async fn download_and_init( + self: &Arc, + timeline: Arc, + permit: heavier_once_cell::InitPermit, + ) -> anyhow::Result> { + let client = timeline + .remote_client + .as_ref() + .expect("checked before download_init_and_wait"); + + let result = client + .download_layer_file(&self.desc.filename(), &self.metadata(), &timeline.cancel) + .await; + + match result { + Ok(size) => { + assert_eq!(size, self.desc.file_size); + + match self.needs_download().await { + Ok(Some(reason)) => { + // this is really a bug in needs_download or remote timeline client + panic!("post-condition failed: needs_download returned {reason:?}"); + } + Ok(None) => { + // as expected + } + Err(e) => { + panic!("post-condition failed: needs_download errored: {e:?}"); + } + } + + tracing::info!(size=%self.desc.file_size, "on-demand download successful"); + timeline + .metrics + .resident_physical_size_add(self.desc.file_size); + self.consecutive_failures.store(0, Ordering::Relaxed); + + let since_last_eviction = self + 
.last_evicted_at + .lock() + .unwrap() + .take() + .map(|ts| ts.elapsed()); + if let Some(since_last_eviction) = since_last_eviction { + LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction); + } + + self.access_stats.record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::ResidenceChange, + ); + + Ok(self.initialize_after_layer_is_on_disk(permit)) + } + Err(e) => { + let consecutive_failures = + 1 + self.consecutive_failures.fetch_add(1, Ordering::Relaxed); + + tracing::error!(consecutive_failures, "layer file download failed: {e:#}"); + + let backoff = utils::backoff::exponential_backoff_duration_seconds( + consecutive_failures.min(u32::MAX as usize) as u32, + 1.5, + 60.0, + ); + + let backoff = std::time::Duration::from_secs_f64(backoff); + + tokio::select! { + _ = tokio::time::sleep(backoff) => {}, + _ = timeline.cancel.cancelled() => {}, + }; + + Err(e) + } + } + } + /// Initializes the `Self::inner` to a "resident" state. /// /// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download` @@ -986,25 +981,22 @@ impl LayerInner { /// changes are made before we can write to the OnceCell in non-cancellable fashion. 
fn initialize_after_layer_is_on_disk( self: &Arc, - next_version: usize, permit: heavier_once_cell::InitPermit, - downloaded: bool, ) -> Arc { debug_assert_current_span_has_tenant_and_timeline_id(); - if downloaded { - let since_last_eviction = self - .last_evicted_at - .lock() - .unwrap() - .take() - .map(|ts| ts.elapsed()); - if let Some(since_last_eviction) = since_last_eviction { - // FIXME: this will not always be recorded correctly until #6028 (the no - // download needed branch above) - LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction); - } - } + // disable any scheduled but not yet running eviction deletions for this + let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed); + + // only reset this after we've decided we really need to download. otherwise it'd + // be impossible to mark cancelled downloads for eviction, like one could imagine + // we would like to do for prefetching which was not needed. + self.wanted_evicted.store(false, Ordering::Release); + + // re-send the notification we've already sent when we started to download, just so + // evict_and_wait does not need to wait for the download to complete. note that this is + // sent when initializing after finding the file on the disk. 
+ drop(self.status.send(Status::Downloaded)); let res = Arc::new(DownloadedLayer { owner: Arc::downgrade(self), @@ -1012,15 +1004,9 @@ impl LayerInner { version: next_version, }); - // FIXME: this might now be double-accounted for !downloaded - self.access_stats.record_residence_event( - LayerResidenceStatus::Resident, - LayerResidenceEventReason::ResidenceChange, - ); - let waiters = self.inner.initializer_count(); if waiters > 0 { - tracing::info!(waiters, "completing the on-demand download for other tasks"); + tracing::info!(waiters, "completing layer init for other tasks"); } let value = ResidentOrWantedEvicted::Resident(res.clone()); @@ -1268,8 +1254,6 @@ pub(crate) enum DownloadError { DownloadCancelled, #[error("pre-condition: stat before download failed")] PreStatFailed(#[source] std::io::Error), - #[error("post-condition: stat after download failed")] - PostStatFailed(#[source] std::io::Error), } #[derive(Debug, PartialEq)] @@ -1694,6 +1678,12 @@ impl LayerImplMetrics { self.rare_counters[RareEvent::RemoveOnDropFailed].inc(); } + /// Expected rare just as cancellations are rare, but we could have cancellations separate from + /// the single caller which can start the download, so use this counter to separate them.
+ fn inc_init_completed_without_requester(&self) { + self.rare_counters[RareEvent::InitCompletedWithoutRequester].inc(); + } + /// Expected rare because cancellations are unexpected, and failures are unexpected fn inc_download_failed_without_requester(&self) { self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc(); @@ -1778,6 +1768,7 @@ impl DeleteFailed { #[derive(enum_map::Enum)] enum RareEvent { RemoveOnDropFailed, + InitCompletedWithoutRequester, DownloadFailedWithoutRequester, UpgradedWantedEvicted, InitWithoutDownload, @@ -1791,6 +1782,7 @@ impl RareEvent { match self { RemoveOnDropFailed => "remove_on_drop_failed", + InitCompletedWithoutRequester => "init_completed_without", DownloadFailedWithoutRequester => "download_failed_without", UpgradedWantedEvicted => "raced_wanted_evicted", InitWithoutDownload => "init_needed_no_download",