mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
refactor(layer): use detached init (#7152)
The second part of work towards fixing `Layer::keep_resident` so that it does not need to repair the internal state. #7135 added a nicer API for initialization. This PR uses it to remove a few indentation levels and the loop construction. The next PR #7175 will use the refactorings done in this PR, and always initialize the internal state after a download. Cc: #5331
This commit is contained in:
@@ -702,181 +702,132 @@ impl LayerInner {
|
||||
allow_download: bool,
|
||||
ctx: Option<&RequestContext>,
|
||||
) -> Result<Arc<DownloadedLayer>, DownloadError> {
|
||||
let mut init_permit = None;
|
||||
let (weak, permit) = {
|
||||
let locked = self
|
||||
.inner
|
||||
.get_or_init_detached()
|
||||
.await
|
||||
.map(|mut guard| guard.get_and_upgrade().ok_or(guard));
|
||||
|
||||
loop {
|
||||
let download = move |permit| {
|
||||
async move {
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
match locked {
|
||||
// this path could had been a RwLock::read
|
||||
Ok(Ok((strong, upgraded))) if !upgraded => return Ok(strong),
|
||||
Ok(Ok((strong, _))) => {
|
||||
// when upgraded back, the Arc<DownloadedLayer> is still available, but
|
||||
// previously a `evict_and_wait` was received.
|
||||
self.wanted_evicted.store(false, Ordering::Relaxed);
|
||||
|
||||
// no need to make the evict_and_wait wait for the actual download to complete
|
||||
// error out any `evict_and_wait`
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
|
||||
let timeline = self
|
||||
.timeline
|
||||
.upgrade()
|
||||
.ok_or_else(|| DownloadError::TimelineShutdown)?;
|
||||
|
||||
// count cancellations, which currently remain largely unexpected
|
||||
let init_cancelled =
|
||||
scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
let can_ever_evict = timeline.remote_client.as_ref().is_some();
|
||||
|
||||
// check if we really need to be downloaded; could have been already downloaded by a
|
||||
// cancelled previous attempt.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
let needs_download = match needs_download {
|
||||
Ok(reason) => reason,
|
||||
Err(e) => {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let (permit, downloaded) = if let Some(reason) = needs_download {
|
||||
if let NeedsDownload::NotFile(ft) = reason {
|
||||
return Err(DownloadError::NotFile(ft));
|
||||
}
|
||||
|
||||
// only reset this after we've decided we really need to download. otherwise it'd
|
||||
// be impossible to mark cancelled downloads for eviction, like one could imagine
|
||||
// we would like to do for prefetching which was not needed.
|
||||
self.wanted_evicted.store(false, Ordering::Release);
|
||||
|
||||
if !can_ever_evict {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::NoRemoteStorage);
|
||||
}
|
||||
|
||||
if let Some(ctx) = ctx {
|
||||
let res = self.check_expected_download(ctx);
|
||||
if let Err(e) = res {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
if !allow_download {
|
||||
// this does look weird, but for LayerInner the "downloading" means also changing
|
||||
// internal once related state ...
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let permit = self.spawn_download_and_wait(timeline, permit).await;
|
||||
|
||||
let permit = match permit {
|
||||
Ok(permit) => permit,
|
||||
Err(e) => {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
(permit, true)
|
||||
} else {
|
||||
// the file is present locally, probably by a previous but cancelled call to
|
||||
// get_or_maybe_download. alternatively we might be running without remote storage.
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
(permit, false)
|
||||
};
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
if downloaded {
|
||||
let since_last_eviction = self
|
||||
.last_evicted_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.map(|ts| ts.elapsed());
|
||||
|
||||
if let Some(since_last_eviction) = since_last_eviction {
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
}
|
||||
|
||||
let res = Arc::new(DownloadedLayer {
|
||||
owner: Arc::downgrade(self),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: next_version,
|
||||
});
|
||||
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
let waiters = self.inner.initializer_count();
|
||||
if waiters > 0 {
|
||||
tracing::info!(
|
||||
waiters,
|
||||
"completing the on-demand download for other tasks"
|
||||
);
|
||||
}
|
||||
|
||||
Ok((ResidentOrWantedEvicted::Resident(res), permit))
|
||||
}
|
||||
.instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
|
||||
};
|
||||
|
||||
if let Some(init_permit) = init_permit.take() {
|
||||
// use the already held initialization permit because it is impossible to hit the
|
||||
// below paths anymore essentially limiting the max loop iterations to 2.
|
||||
let (value, init_permit) = download(init_permit).await?;
|
||||
let mut guard = self.inner.set(value, init_permit);
|
||||
let (strong, _upgraded) = guard
|
||||
.get_and_upgrade()
|
||||
.expect("init creates strong reference, we held the init permit");
|
||||
return Ok(strong);
|
||||
}
|
||||
|
||||
let (weak, permit) = {
|
||||
let mut locked = self.inner.get_or_init(download).await?;
|
||||
|
||||
if let Some((strong, upgraded)) = locked.get_and_upgrade() {
|
||||
if upgraded {
|
||||
// when upgraded back, the Arc<DownloadedLayer> is still available, but
|
||||
// previously a `evict_and_wait` was received.
|
||||
self.wanted_evicted.store(false, Ordering::Relaxed);
|
||||
|
||||
// error out any `evict_and_wait`
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
LAYER_IMPL_METRICS
|
||||
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
|
||||
}
|
||||
LAYER_IMPL_METRICS
|
||||
.inc_eviction_cancelled(EvictionCancelled::UpgradedBackOnAccess);
|
||||
|
||||
return Ok(strong);
|
||||
} else {
|
||||
}
|
||||
Ok(Err(mut guard)) => {
|
||||
// path to here: the evict_blocking is stuck on spawn_blocking queue.
|
||||
//
|
||||
// reset the contents, deactivating the eviction and causing a
|
||||
// EvictionCancelled::LostToDownload or EvictionCancelled::VersionCheckFailed.
|
||||
locked.take_and_deinit()
|
||||
let (weak, permit) = guard.take_and_deinit();
|
||||
(Some(weak), permit)
|
||||
}
|
||||
};
|
||||
|
||||
// unlock first, then drop the weak, but because upgrade failed, we
|
||||
// know it cannot be a problem.
|
||||
Err(permit) => (None, permit),
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(weak) = weak {
|
||||
// only drop the weak after dropping the heavier_once_cell guard
|
||||
assert!(
|
||||
matches!(weak, ResidentOrWantedEvicted::WantedEvicted(..)),
|
||||
"unexpected {weak:?}, ResidentOrWantedEvicted::get_and_upgrade has a bug"
|
||||
);
|
||||
|
||||
init_permit = Some(permit);
|
||||
|
||||
LAYER_IMPL_METRICS.inc_retried_get_or_maybe_download();
|
||||
}
|
||||
|
||||
async move {
|
||||
// disable any scheduled but not yet running eviction deletions for this
|
||||
let next_version = 1 + self.version.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// no need to make the evict_and_wait wait for the actual download to complete
|
||||
drop(self.status.send(Status::Downloaded));
|
||||
|
||||
let timeline = self
|
||||
.timeline
|
||||
.upgrade()
|
||||
.ok_or_else(|| DownloadError::TimelineShutdown)?;
|
||||
|
||||
// count cancellations, which currently remain largely unexpected
|
||||
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
|
||||
|
||||
// check if we really need to be downloaded; could have been already downloaded by a
|
||||
// cancelled previous attempt.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed);
|
||||
|
||||
let needs_download = match needs_download {
|
||||
Ok(reason) => reason,
|
||||
Err(e) => {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let Some(reason) = needs_download else {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
// the file is present locally, probably by a previous but cancelled call to
|
||||
// get_or_maybe_download. alternatively we might be running without remote storage.
|
||||
LAYER_IMPL_METRICS.inc_init_needed_no_download();
|
||||
|
||||
let res = self.initialize_after_layer_is_on_disk(next_version, permit, false);
|
||||
return Ok(res);
|
||||
};
|
||||
|
||||
if let NeedsDownload::NotFile(ft) = reason {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::NotFile(ft));
|
||||
}
|
||||
|
||||
// only reset this after we've decided we really need to download. otherwise it'd
|
||||
// be impossible to mark cancelled downloads for eviction, like one could imagine
|
||||
// we would like to do for prefetching which was not needed.
|
||||
self.wanted_evicted.store(false, Ordering::Release);
|
||||
|
||||
if timeline.remote_client.as_ref().is_none() {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::NoRemoteStorage);
|
||||
}
|
||||
|
||||
if let Some(ctx) = ctx {
|
||||
let res = self.check_expected_download(ctx);
|
||||
if let Err(e) = res {
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
if !allow_download {
|
||||
// this does look weird, but for LayerInner the "downloading" means also changing
|
||||
// internal once related state ...
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
return Err(DownloadError::DownloadRequired);
|
||||
}
|
||||
|
||||
tracing::info!(%reason, "downloading on-demand");
|
||||
|
||||
let permit = self.spawn_download_and_wait(timeline, permit).await;
|
||||
|
||||
scopeguard::ScopeGuard::into_inner(init_cancelled);
|
||||
|
||||
let permit = permit?;
|
||||
|
||||
let res = self.initialize_after_layer_is_on_disk(next_version, permit, true);
|
||||
Ok(res)
|
||||
}
|
||||
.instrument(tracing::info_span!("get_or_maybe_download", layer=%self))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Nag or fail per RequestContext policy
|
||||
@@ -1026,6 +977,59 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// Initializes the `Self::inner` to a "resident" state.
|
||||
///
|
||||
/// Callers are assumed to ensure that the file is actually on disk with `Self::needs_download`
|
||||
/// before calling this method.
|
||||
///
|
||||
/// If this method is ever made async, it needs to be cancellation safe so that no state
|
||||
/// changes are made before we can write to the OnceCell in non-cancellable fashion.
|
||||
fn initialize_after_layer_is_on_disk(
|
||||
self: &Arc<LayerInner>,
|
||||
next_version: usize,
|
||||
permit: heavier_once_cell::InitPermit,
|
||||
downloaded: bool,
|
||||
) -> Arc<DownloadedLayer> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
if downloaded {
|
||||
let since_last_eviction = self
|
||||
.last_evicted_at
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.map(|ts| ts.elapsed());
|
||||
if let Some(since_last_eviction) = since_last_eviction {
|
||||
// FIXME: this will not always be recorded correctly until #6028 (the no
|
||||
// download needed branch above)
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
}
|
||||
|
||||
let res = Arc::new(DownloadedLayer {
|
||||
owner: Arc::downgrade(self),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
version: next_version,
|
||||
});
|
||||
|
||||
// FIXME: this might now be double-accounted for !downloaded
|
||||
self.access_stats.record_residence_event(
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
let waiters = self.inner.initializer_count();
|
||||
if waiters > 0 {
|
||||
tracing::info!(waiters, "completing the on-demand download for other tasks");
|
||||
}
|
||||
|
||||
let value = ResidentOrWantedEvicted::Resident(res.clone());
|
||||
|
||||
self.inner.set(value, permit);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
match tokio::fs::metadata(&self.path).await {
|
||||
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
|
||||
@@ -1690,11 +1694,6 @@ impl LayerImplMetrics {
|
||||
self.rare_counters[RareEvent::RemoveOnDropFailed].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because requires a race with `evict_blocking` and `get_or_maybe_download`.
|
||||
fn inc_retried_get_or_maybe_download(&self) {
|
||||
self.rare_counters[RareEvent::RetriedGetOrMaybeDownload].inc();
|
||||
}
|
||||
|
||||
/// Expected rare because cancellations are unexpected, and failures are unexpected
|
||||
fn inc_download_failed_without_requester(&self) {
|
||||
self.rare_counters[RareEvent::DownloadFailedWithoutRequester].inc();
|
||||
@@ -1779,7 +1778,6 @@ impl DeleteFailed {
|
||||
#[derive(enum_map::Enum)]
|
||||
enum RareEvent {
|
||||
RemoveOnDropFailed,
|
||||
RetriedGetOrMaybeDownload,
|
||||
DownloadFailedWithoutRequester,
|
||||
UpgradedWantedEvicted,
|
||||
InitWithoutDownload,
|
||||
@@ -1793,7 +1791,6 @@ impl RareEvent {
|
||||
|
||||
match self {
|
||||
RemoveOnDropFailed => "remove_on_drop_failed",
|
||||
RetriedGetOrMaybeDownload => "retried_gomd",
|
||||
DownloadFailedWithoutRequester => "download_failed_without",
|
||||
UpgradedWantedEvicted => "raced_wanted_evicted",
|
||||
InitWithoutDownload => "init_needed_no_download",
|
||||
|
||||
@@ -254,6 +254,8 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
let h = TenantHarness::create("residency_check_while_evict_and_wait_on_clogged_spawn_blocking")
|
||||
.unwrap();
|
||||
let (tenant, ctx) = h.load().await;
|
||||
let span = h.span();
|
||||
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
|
||||
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
|
||||
@@ -292,6 +294,7 @@ async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
|
||||
// because no actual eviction happened, we get to just reinitialize the DownloadedLayer
|
||||
layer
|
||||
.keep_resident()
|
||||
.instrument(download_span)
|
||||
.await
|
||||
.expect("keep_resident should had reinitialized without downloading")
|
||||
.expect("ResidentLayer");
|
||||
|
||||
Reference in New Issue
Block a user