fix: compaction can and should use borrowed DeltaEntrys

otherwise we risk evicting the L0 while we are reading different blobs
out of it.
This commit is contained in:
Joonas Koivunen
2023-08-24 14:14:09 +03:00
parent dc97970215
commit e5d00b6c2a
2 changed files with 42 additions and 22 deletions

View File

@@ -10,6 +10,7 @@ use crate::config::PageServerConf;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Key;
use crate::task_mgr::TaskKind;
use crate::tenant::storage_layer::delta_layer::Ref;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use anyhow::Result;
@@ -563,10 +564,10 @@ impl LayerE {
debug_assert!(outer.needs_download_blocking().unwrap().is_none());
let _downloaded = resident.expect("just initialized");
let downloaded = resident.expect("just initialized");
ResidentLayer {
_downloaded,
downloaded,
owner: outer,
}
}
@@ -619,7 +620,7 @@ impl LayerE {
// but the files would be left.
Ok(ResidentLayer {
_downloaded: resident.expect("just wrote Some"),
downloaded: resident.expect("just wrote Some"),
owner: outer,
})
}
@@ -718,17 +719,6 @@ impl LayerE {
.await
}
pub(crate) async fn load_keys(
self: &Arc<Self>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<DeltaEntry<ResidentDeltaLayer>>> {
let layer = self.get_or_maybe_download(true, Some(ctx)).await?;
self.access_stats
.record_access(LayerAccessKind::KeyIter, ctx);
layer.load_keys().await
}
/// Creates a guard object which prohibit evicting this layer as long as the value is kept
/// around.
pub(crate) async fn guard_against_eviction(
@@ -738,7 +728,7 @@ impl LayerE {
let downloaded = self.get_or_maybe_download(allow_download, None).await?;
Ok(ResidentLayer {
_downloaded: downloaded,
downloaded,
owner: self.clone(),
})
}
@@ -1229,7 +1219,7 @@ impl NeedsDownload {
#[derive(Clone)]
pub(crate) struct ResidentLayer {
owner: Arc<LayerE>,
_downloaded: Arc<DownloadedLayer>,
downloaded: Arc<DownloadedLayer>,
}
impl std::fmt::Display for ResidentLayer {
@@ -1248,6 +1238,31 @@ impl ResidentLayer {
pub(crate) fn drop_eviction_guard(self) -> Arc<LayerE> {
self.into()
}
pub(crate) async fn load_keys(
&self,
ctx: &RequestContext,
) -> anyhow::Result<Vec<DeltaEntry<Ref<&'_ delta_layer::DeltaLayerInner>>>> {
use LayerKind::*;
match self.downloaded.get().await? {
Delta(d) => {
self.owner
.access_stats
.record_access(LayerAccessKind::KeyIter, ctx);
// this is valid because the DownloadedLayer::kind is a OnceCell, not a
// Mutex<OnceCell>, so we cannot go and deinitialize the value with OnceCell::take
// while it's being held.
let resident = Ref(d);
delta_layer::DeltaLayerInner::load_keys(&resident)
.await
.context("Layer index is corrupted")
}
Image(_) => anyhow::bail!("cannot load_keys on a image layer"),
}
}
}
impl AsLayerDesc for ResidentLayer {
@@ -1361,12 +1376,14 @@ impl DownloadedLayer {
}
/// Loads all keys stored in the layer. Returns key, lsn and value size.
async fn load_keys(self: &Arc<Self>) -> anyhow::Result<Vec<DeltaEntry<ResidentDeltaLayer>>> {
pub(crate) async fn load_keys(
&self,
) -> anyhow::Result<Vec<DeltaEntry<Ref<&'_ delta_layer::DeltaLayerInner>>>> {
use LayerKind::*;
match self.get().await? {
Delta(_) => {
let resident = ResidentDeltaLayer(self.clone());
Delta(d) => {
let resident = Ref(d);
delta_layer::DeltaLayerInner::load_keys(&resident)
.await
.context("Layer index is corrupted")

View File

@@ -3027,14 +3027,14 @@ impl Timeline {
let first_level0_delta = level0_deltas_iter.next().unwrap();
let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
deltas_to_compact.push(first_level0_delta.clone());
deltas_to_compact.push(first_level0_delta.guard_against_eviction(true).await?);
for l in level0_deltas_iter {
let lsn_range = &l.layer_desc().lsn_range;
if lsn_range.start != prev_lsn_end {
break;
}
deltas_to_compact.push(l.clone());
deltas_to_compact.push(l.guard_against_eviction(true).await?);
prev_lsn_end = lsn_range.end;
}
let lsn_range = Range {
@@ -3353,7 +3353,10 @@ impl Timeline {
Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
deltas_to_compact: deltas_to_compact
.into_iter()
.map(|x| x.drop_eviction_guard())
.collect::<Vec<_>>(),
})
}