mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
refactor: split guard_against_eviction into three
- download - keep_resident - download_and_keep_resident No need to bool enum.
This commit is contained in:
@@ -216,19 +216,35 @@ impl Layer {
|
||||
|
||||
/// Download the layer if evicted.
|
||||
///
|
||||
/// Will not error when it is already downloaded.
|
||||
pub(crate) async fn get_or_download(&self) -> anyhow::Result<()> {
|
||||
/// Will not error when the layer is already downloaded.
|
||||
pub(crate) async fn download(&self) -> anyhow::Result<()> {
|
||||
self.0.get_or_maybe_download(true, None).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a guard object which prohibit evicting this layer as long as the value is kept
|
||||
/// around.
|
||||
pub(crate) async fn guard_against_eviction(
|
||||
&self,
|
||||
allow_download: bool,
|
||||
) -> anyhow::Result<ResidentLayer> {
|
||||
let downloaded = self.0.get_or_maybe_download(allow_download, None).await?;
|
||||
/// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
|
||||
/// while the guard exists.
|
||||
///
|
||||
/// Returns None if the layer is currently evicted.
|
||||
pub(crate) async fn keep_resident(&self) -> anyhow::Result<Option<ResidentLayer>> {
|
||||
let downloaded = match self.0.get_or_maybe_download(false, None).await {
|
||||
Ok(d) => d,
|
||||
// technically there are a lot of possible errors, but in practice it should only be
|
||||
// DownloadRequired which is tripped up. could work to improve this situation
|
||||
// statically later.
|
||||
Err(DownloadError::DownloadRequired) => return Ok(None),
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
Ok(Some(ResidentLayer {
|
||||
downloaded,
|
||||
owner: self.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
/// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
|
||||
pub(crate) async fn download_and_keep_resident(&self) -> anyhow::Result<ResidentLayer> {
|
||||
let downloaded = self.0.get_or_maybe_download(true, None).await?;
|
||||
|
||||
Ok(ResidentLayer {
|
||||
downloaded,
|
||||
|
||||
@@ -965,7 +965,7 @@ impl Timeline {
|
||||
return Ok(Some(false));
|
||||
}
|
||||
|
||||
layer.guard_against_eviction(true).await?;
|
||||
layer.download().await?;
|
||||
|
||||
Ok(Some(true))
|
||||
}
|
||||
@@ -977,7 +977,7 @@ impl Timeline {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let Ok(local_layer) = local_layer.guard_against_eviction(false).await else { return Ok(Some(false)); };
|
||||
let Some(local_layer) = local_layer.keep_resident().await? else { return Ok(Some(false)); };
|
||||
|
||||
let local_layer: Layer = local_layer.into();
|
||||
|
||||
@@ -2935,7 +2935,6 @@ impl Timeline {
|
||||
//
|
||||
// This failpoint is a superset of both of the cases.
|
||||
if cfg!(feature = "testing") {
|
||||
// FIXME: utils does not depend on `fail` so there's no non-macro answer to this
|
||||
let active = (|| {
|
||||
::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
|
||||
false
|
||||
@@ -2947,7 +2946,7 @@ impl Timeline {
|
||||
// we are just faking these layers as being produced again for this failpoint
|
||||
new_layers.push(
|
||||
delta
|
||||
.guard_against_eviction(true)
|
||||
.download_and_keep_resident()
|
||||
.await
|
||||
.context("download layer for failpoint")?,
|
||||
);
|
||||
@@ -2980,14 +2979,17 @@ impl Timeline {
|
||||
let first_level0_delta = level0_deltas_iter.next().unwrap();
|
||||
let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
|
||||
let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
|
||||
deltas_to_compact.push(first_level0_delta.guard_against_eviction(true).await?);
|
||||
|
||||
// FIXME: downloading while holding layer_removal_cs is not great, but we will remove that
|
||||
// soon
|
||||
deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
|
||||
for l in level0_deltas_iter {
|
||||
let lsn_range = &l.layer_desc().lsn_range;
|
||||
|
||||
if lsn_range.start != prev_lsn_end {
|
||||
break;
|
||||
}
|
||||
deltas_to_compact.push(l.guard_against_eviction(true).await?);
|
||||
deltas_to_compact.push(l.download_and_keep_resident().await?);
|
||||
prev_lsn_end = lsn_range.end;
|
||||
}
|
||||
let lsn_range = Range {
|
||||
@@ -3951,7 +3953,7 @@ impl Timeline {
|
||||
|
||||
js.spawn(
|
||||
async move {
|
||||
let res = next.get_or_download().await;
|
||||
let res = next.download().await;
|
||||
(next, res)
|
||||
}
|
||||
.instrument(span),
|
||||
@@ -4049,7 +4051,16 @@ impl Timeline {
|
||||
|
||||
let l = guard.get_from_desc(&l);
|
||||
|
||||
let Ok(l) = l.guard_against_eviction(false).await else { continue; };
|
||||
let l = match l.keep_resident().await {
|
||||
Ok(Some(l)) => l,
|
||||
Ok(None) => continue,
|
||||
Err(e) => {
|
||||
// these should not happen, but we cannot make them statically impossible right
|
||||
// now.
|
||||
tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
|
||||
// We only use this fallback if there's an implementation error.
|
||||
@@ -4218,8 +4229,12 @@ mod tests {
|
||||
.expect("just configured this");
|
||||
|
||||
let layer = find_some_layer(&timeline).await;
|
||||
let layer = layer.guard_against_eviction(false).await.unwrap();
|
||||
let layer = layer.drop_eviction_guard();
|
||||
let layer = layer
|
||||
.keep_resident()
|
||||
.await
|
||||
.expect("no download => no downloading errors")
|
||||
.expect("should had been resident")
|
||||
.drop_eviction_guard();
|
||||
|
||||
let cancel = tokio_util::sync::CancellationToken::new();
|
||||
let batch = [layer];
|
||||
@@ -4245,7 +4260,8 @@ mod tests {
|
||||
|
||||
let (first, second) = (only_one(first), only_one(second));
|
||||
|
||||
batch[0].guard_against_eviction(false).await.unwrap_err();
|
||||
let res = batch[0].keep_resident().await;
|
||||
assert!(matches!(res, Ok(None)), "{res:?}");
|
||||
|
||||
match (first, second) {
|
||||
(Ok(()), Ok(())) => {
|
||||
|
||||
@@ -200,9 +200,16 @@ impl Timeline {
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
let hist_layer = guard.get_from_desc(&hist_layer);
|
||||
|
||||
// funny: this is the best way to get local layers is to lock them into
|
||||
// memory for the duration of eviction
|
||||
let Ok(guard) = hist_layer.guard_against_eviction(false).await else { continue; };
|
||||
let guard = match hist_layer.keep_resident().await {
|
||||
Ok(Some(l)) => l,
|
||||
Ok(None) => continue,
|
||||
Err(e) => {
|
||||
// these should not happen, but we cannot make them statically impossible right
|
||||
// now.
|
||||
tracing::warn!(layer=%hist_layer, "failed to keep the layer resident: {e:#}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
|
||||
// We only use this fallback if there's an implementation error.
|
||||
|
||||
@@ -283,7 +283,7 @@ impl LayerManager {
|
||||
// map index without actually rebuilding the index.
|
||||
updates.remove_historic(desc);
|
||||
mapping.remove(&layer);
|
||||
layer.garbage_collect();
|
||||
layer.garbage_collect_on_drop();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user