wip: LayerE

Joonas Koivunen
2023-08-08 18:52:16 +03:00
parent 6afeb3c6f7
commit 235a8cbd28

@@ -12,6 +12,7 @@ use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Key;
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use anyhow::Result;
use bytes::Bytes;
use enum_map::EnumMap;
@@ -23,9 +24,11 @@ use pageserver_api::models::{
};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
use tracing::Instrument;
use utils::history_buffer::HistoryBufferWithDropCounter;
use utils::rate_limit::RateLimit;
@@ -41,6 +44,10 @@ pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use remote_layer::RemoteLayer;
use self::delta_layer::DeltaEntry;
use super::remote_timeline_client::RemoteTimelineClient;
use super::Timeline;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
@@ -321,6 +328,885 @@ impl LayerAccessStats {
}
}
/// The download-ness ([`DownloadedLayer`]) can be either resident or wanted evicted.
///
/// However, when we want something evicted, we cannot evict it right away as there might be
/// reads currently happening on it: it may, for example, have been found via [`LayerMap`] but
/// not yet read with [`Layer::get_value_reconstruct_data`].
///
/// [`LayerMap`]: crate::tenant::layer_map::LayerMap
enum ResidentOrWantedEvicted {
Resident(Arc<DownloadedLayer>),
WantedEvicted(Weak<DownloadedLayer>),
}
impl ResidentOrWantedEvicted {
fn get(&self) -> Option<Arc<DownloadedLayer>> {
match self {
ResidentOrWantedEvicted::Resident(strong) => Some(strong.clone()),
ResidentOrWantedEvicted::WantedEvicted(weak) => weak.upgrade(),
}
}
/// When eviction is first requested, drop down to holding a [`Weak`].
///
/// Returns a reference to the [`Weak`]; the downgrade happens only on the first request.
fn downgrade(&mut self) -> &Weak<DownloadedLayer> {
let _was_first = match self {
ResidentOrWantedEvicted::Resident(strong) => {
let weak = Arc::downgrade(strong);
*self = ResidentOrWantedEvicted::WantedEvicted(weak);
// returning the weak is not useful, because the drop could already have run with
// the replacement above, and that will take care of cleaning up the Option we are in
true
}
ResidentOrWantedEvicted::WantedEvicted(_) => false,
};
match self {
ResidentOrWantedEvicted::WantedEvicted(ref weak) => weak,
_ => unreachable!("just wrote wanted evicted"),
}
}
}
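// A minimal sketch of the intended transitions; `downloaded` is a hypothetical
// `Arc<DownloadedLayer>` binding:
//
//     let mut state = ResidentOrWantedEvicted::Resident(downloaded.clone());
//     assert!(state.get().is_some()); // resident: get() hands out strong clones
//     state.downgrade();              // first eviction request: Resident -> WantedEvicted
//     drop(downloaded);               // the last strong reference goes away...
//     assert!(state.get().is_none()); // ...so the Weak no longer upgrades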
// TODO:
// - internal arc, because I've now worked away the majority of the external wrapping
// - load time api which checks that files are present; fixmes in load time, remote timeline
//   client tests
pub(crate) struct LayerE {
// do we really need this?
conf: &'static PageServerConf,
path: PathBuf,
desc: PersistentLayerDesc,
/// Should this be weak? A strong reference would probably form a runtime cycle which
/// leaks `Timeline`s on detach.
timeline: Weak<Timeline>,
access_stats: LayerAccessStats,
/// This is a mutex, because we want to be able to
/// - `Option::take(&mut self)` to drop the `Arc` allocation
/// - `ResidentOrWantedEvicted::downgrade(&mut self)`
inner: tokio::sync::Mutex<Option<ResidentOrWantedEvicted>>,
/// Do we want to garbage collect this when `LayerE` is dropped, where garbage collection
/// means:
/// - schedule remote deletion
/// - instant local deletion
wanted_garbage_collected: AtomicBool,
/// Accessed with `Ordering::Acquire`/`Ordering::Release` to establish a happens-before
/// relationship, allowing a wait-less `evict`.
///
/// FIXME: this is likely a bogus assumption; there is still time for us to set the flag in
/// `evict` after the task holding the lock has made the check and is dropping the mutex guard.
///
/// However, eviction will try to evict this again, so maybe it's fine?
wanted_evicted: AtomicBool,
/// The version makes sure we will in fact only evict a file if no new guard has been created
/// for it in the meantime.
version: AtomicUsize,
have_remote_client: bool,
}
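// Taken together, `inner`, `wanted_evicted` and `version` form the eviction state machine:
// `wanted_evicted` records the intent, `inner` is downgraded from strong to weak, and
// `version` invalidates any already-spawned eviction task once a new download begins.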
impl std::fmt::Display for LayerE {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}
impl AsLayerDesc for LayerE {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
impl Drop for LayerE {
fn drop(&mut self) {
if !*self.wanted_garbage_collected.get_mut() {
// should we try to evict if the last wish was for eviction?
// feels like there's some hazard of overcrowding near shutdown, but we don't
// run drops during shutdown (yet)
return;
}
let span = tracing::info_span!(parent: None, "layer_drop", tenant_id = %self.layer_desc().tenant_id, timeline_id = %self.layer_desc().timeline_id, layer = %self);
// SEMITODO: yes, this is sync; could spawn as well...
let _g = span.entered();
if let Err(e) = std::fs::remove_file(&self.path) {
tracing::error!(layer = %self, "failed to remove garbage collected layer: {e}");
} else if let Some(timeline) = self.timeline.upgrade() {
timeline
.metrics
.resident_physical_size_gauge
.sub(self.layer_desc().file_size);
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res =
remote_client.schedule_layer_file_deletion(&[self.layer_desc().filename()]);
if let Err(e) = res {
if !timeline.is_active() {
// downgrade the warning to info maybe?
}
tracing::warn!(layer=%self, "scheduling deletion on drop failed: {e:#}");
}
}
} else {
// no need to nag that timeline is gone
}
}
}
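// A note on the drop-time sequence above: the local file is removed first, then the
// resident size gauge is decremented, and only then is the remote deletion scheduled.
// failures are logged but not propagated, since Drop cannot return errors.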
impl LayerE {
pub(crate) fn new(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
filename: &LayerFileName,
file_size: u64,
access_stats: LayerAccessStats,
) -> LayerE {
let desc = PersistentLayerDesc::from_filename(
timeline.tenant_id,
timeline.timeline_id,
filename.clone(),
file_size,
);
let path = conf
.timeline_path(&desc.tenant_id, &desc.timeline_id)
.join(desc.filename().to_string());
LayerE {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats,
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner: Default::default(),
version: AtomicUsize::new(0),
}
}
pub(crate) fn for_written(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
desc: PersistentLayerDesc,
) -> anyhow::Result<ResidentLayer> {
let path = conf
.timeline_path(&desc.tenant_id, &desc.timeline_id)
.join(desc.filename().to_string());
let mut resident = None;
let outer = Arc::new_cyclic(|owner| {
let inner = Arc::new(DownloadedLayer {
owner: owner.clone(),
kind: tokio::sync::OnceCell::default(),
});
resident = Some(inner.clone());
LayerE {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner: tokio::sync::Mutex::new(Some(ResidentOrWantedEvicted::Resident(inner))),
version: AtomicUsize::new(0),
}
});
// FIXME: ugly, but if we don't do this check here, any error will pop up at read time;
// however, we cannot check it, because DeltaLayerWriter and ImageLayerWriter create the
// instances *before* renaming the file to its final destination
// anyhow::ensure!(
// outer.needs_download_blocking()?.is_none(),
// "should not need downloading if it was just written"
// );
// FIXME: because we can now do garbage collection on drop, should we mark these files as
// garbage collected until they really get added to LayerMap? consider that files are
// written out to disk, fsynced, and renamed by `{Delta,Image}LayerWriter`, then wait for the
// remaining files to be generated (compaction, create_image_layers) before being added to
// LayerMap. We could panic or just error out during that time, even for unrelated reasons,
// and the files would be left behind.
Ok(ResidentLayer {
_downloaded: resident.expect("just wrote Some"),
owner: outer,
})
}
/// Evict the layer file as soon as possible, but then allow redownloads to happen.
pub(crate) async fn evict(
&self,
_: &RemoteTimelineClient,
) -> Result<bool, super::timeline::EvictionError> {
assert!(
self.have_remote_client,
"refusing to evict without a remote timeline client"
);
self.wanted_evicted.store(true, Ordering::Release);
let Ok(mut guard) = self.inner.try_lock() else {
// we don't need to wait around if there is a download ongoing, because that might reset
// wanted_evicted; however, it's also possible that we are present and just being accessed
// by someone else.
return Ok(false);
};
if let Some(either) = guard.as_mut() {
// now, this might immediately cause the drop fn to run, but that will only act in the
// background
let weak = either.downgrade();
let right_away = weak.upgrade().is_none();
Ok(right_away)
} else {
// already evicted; the wanted_evicted will be reset by next download
Err(super::timeline::EvictionError::FileNotFound)
}
}
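// A hedged sketch of how a caller might interpret the result; `layer` and `rtc` are
// hypothetical bindings:
//
//     match layer.evict(&rtc).await {
//         Ok(true) => { /* last strong ref gone; eviction proceeds in the background */ }
//         Ok(false) => { /* a reader, download, or guard still holds the file */ }
//         Err(e) => { /* already evicted; e.g. EvictionError::FileNotFound */ }
//     }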
/// Delete the layer file when `self` gets dropped, and perhaps also schedule a remote index
/// upload.
pub(crate) fn garbage_collect(&self) {
self.wanted_garbage_collected.store(true, Ordering::Release);
}
pub(crate) async fn get_value_reconstruct_data(
self: &Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let layer = self.get_or_download(Some(ctx)).await?;
self.access_stats
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
layer
.get_value_reconstruct_data(key, lsn_range, reconstruct_data)
.await
}
pub(crate) async fn load_keys(
self: &Arc<Self>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<DeltaEntry<ResidentDeltaLayer>>> {
let layer = self.get_or_download(Some(ctx)).await?;
self.access_stats
.record_access(LayerAccessKind::KeyIter, ctx);
layer.load_keys().await
}
/// Creates a guard object which prohibits evicting this layer as long as the value is kept
/// alive.
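///
/// A sketch of the intended use, with a hypothetical `layer: Arc<LayerE>` binding:
///
/// ```ignore
/// // download if necessary, then pin the file on disk
/// let resident: ResidentLayer = layer.guard_against_eviction(true).await?;
/// // with allow_download == false, this instead errors if the layer is not resident
/// ```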
pub(crate) async fn guard_against_eviction(
self: &Arc<Self>,
allow_download: bool,
) -> anyhow::Result<ResidentLayer> {
let downloaded = if !allow_download {
self.get()
.await
.ok_or_else(|| anyhow::anyhow!("layer {self} is not downloaded"))
} else {
self.get_or_download(None).await
}?;
Ok(ResidentLayer {
_downloaded: downloaded,
owner: self.clone(),
})
}
async fn get(&self) -> Option<Arc<DownloadedLayer>> {
let mut locked = self.inner.lock().await;
Self::get_or_apply_evictedness(&mut locked, &self.wanted_evicted)
}
fn get_or_apply_evictedness(
guard: &mut tokio::sync::MutexGuard<'_, Option<ResidentOrWantedEvicted>>,
wanted_evicted: &AtomicBool,
) -> Option<Arc<DownloadedLayer>> {
if let Some(x) = &mut **guard {
let ret = x.get();
if let Some(won) = ret {
// there are no guarantees that we will always get to observe a concurrent call
// to evict
if wanted_evicted.load(Ordering::Acquire) {
x.downgrade();
}
return Some(won);
}
}
None
}
/// Cancellation safe.
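///
/// A sketch of the flow, descriptive rather than normative:
/// 1. if a strong reference already exists, or the weak still upgrades, return it
/// 2. otherwise honor the [`RequestContext`]'s download behavior (Download/Warn/Error)
/// 3. spawn the download via task_mgr so it survives caller cancellation, and await its
///    completion through a oneshot channel
/// 4. re-check `needs_download`, then store and return a fresh [`DownloadedLayer`]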
pub(crate) async fn get_or_download(
self: &Arc<Self>,
ctx: Option<&RequestContext>,
) -> anyhow::Result<Arc<DownloadedLayer>> {
let mut locked = self.inner.lock().await;
if let Some(strong) = Self::get_or_apply_evictedness(&mut locked, &self.wanted_evicted) {
return Ok(strong);
}
if let Some(ctx) = ctx {
use crate::context::DownloadBehavior::*;
let b = ctx.download_behavior();
match b {
Download => {}
Warn | Error => {
warn!(
"unexpectedly on-demand downloading remote layer {self} for task kind {:?}",
ctx.task_kind()
);
crate::metrics::UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
let really_error = matches!(b, Error)
&& !self.conf.ondemand_download_behavior_treat_error_as_warn;
if really_error {
// originally this returned
// return Err(PageReconstructError::NeedsDownload(
// TenantTimelineId::new(self.tenant_id, self.timeline_id),
// remote_layer.filename(),
// ))
//
// this check is only probabilistic; seems like a flakiness footgun
anyhow::bail!("refusing to download layer {self} due to RequestContext")
}
}
}
}
// disable any scheduled but not yet running eviction deletions for this
self.version.fetch_add(1, Ordering::Relaxed);
// what to do if we have a concurrent eviction request while we are downloading? eviction
// apis use ResidentLayer, so evict could be moved there, or we just reset the state here.
self.wanted_evicted.store(false, Ordering::Release);
// drop the old one; we only held a weak to it, or it had never been initialized
locked.take();
// technically the mutex guard could be dropped here, and the `Option` does seem extra
let Some(timeline) = self.timeline.upgrade() else { anyhow::bail!("timeline has gone already") };
let task_name = format!("download layer {}", self);
let can_ever_evict = timeline.remote_client.as_ref().is_some();
let needs_download = self
.needs_download()
.await
.context("check if layer file is present")?;
if let Some(reason) = needs_download {
if !can_ever_evict {
anyhow::bail!("refusing to attempt downloading {self} because no remote timeline client, reason: {reason}")
};
if self.wanted_garbage_collected.load(Ordering::Acquire) {
// it will fail because we should have already scheduled a delete and an
// index update
tracing::info!(%reason, "downloading a wanted garbage collected layer, this might fail");
// FIXME: we probably do not gc delete until the file goes away...? unsure
} else {
tracing::debug!(%reason, "downloading layer");
}
let (tx, rx) = tokio::sync::oneshot::channel();
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.
let this = self.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::RemoteDownloadTask,
Some(self.desc.tenant_id),
Some(self.desc.timeline_id),
&task_name,
false,
async move {
let client = timeline
.remote_client
.as_ref()
.expect("checked above with can_ever_evict");
let result = client
.download_layer_file(
&this.desc.filename(),
&crate::tenant::remote_timeline_client::index::LayerFileMetadata::new(
this.desc.file_size,
),
)
.await;
match result {
Ok(size) => {
timeline.metrics.resident_physical_size_gauge.add(size);
let _ = tx.send(());
}
Err(e) => {
// TODO: the temp file might still be around, metrics might be off
tracing::error!("layer file download failed: {e:?}",);
}
}
Ok(())
}
.in_current_span(),
);
if rx.await.is_err() {
return Err(anyhow::anyhow!("downloading failed, possibly due to shutdown"));
}
// FIXME: we need backoff here so we never spiral into a download loop
anyhow::ensure!(
self.needs_download()
.await
.context("test if downloading is still needed")?
.is_none(),
"post-condition for downloading: no longer needs downloading"
);
} else {
// the file is present locally and we could even be running without remote
// storage
}
// the assumption is that we own the layer's residence; no operator should go in
// and delete random files. this would become evident when trying to access the file
// an Nth time (N>1) after the VirtualFile has been evicted in between.
//
// we could support this by looping on NotFound from the layer access methods, but
// it's difficult to implement this so that the operator does not delete
// not-yet-uploaded files.
let res = Arc::new(DownloadedLayer {
owner: Arc::downgrade(self),
kind: tokio::sync::OnceCell::default(),
});
*locked = Some(if self.wanted_evicted.load(Ordering::Acquire) {
// because we reset wanted_evicted near the beginning, this means someone requested
// eviction of this layer while we were downloading.
//
// perhaps evict should only be possible via ResidentLayer, because this makes my head
// spin. the caller of this function will still get the proper `Arc<DownloadedLayer>`.
//
// the risk is that eviction becomes too flaky.
ResidentOrWantedEvicted::WantedEvicted(Arc::downgrade(&res))
} else {
ResidentOrWantedEvicted::Resident(res.clone())
});
Ok(res)
}
pub(crate) fn local_path(&self) -> &std::path::Path {
// maybe it does make sense to have this or maybe not
&self.path
}
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match tokio::fs::metadata(self.local_path()).await {
Ok(m) => Ok(self.is_file_present_and_good_size(&m)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
Err(e) => Err(e),
}
}
pub(crate) fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match self.local_path().metadata() {
Ok(m) => Ok(self.is_file_present_and_good_size(&m)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
Err(e) => Err(e),
}
}
fn is_file_present_and_good_size(&self, m: &std::fs::Metadata) -> Option<NeedsDownload> {
// in the future, this should include hashing the file with sha2-256, hopefully rarely,
// because info uses this as well
if !m.is_file() {
Some(NeedsDownload::NotFile)
} else if m.len() != self.desc.file_size {
Some(NeedsDownload::WrongSize {
actual: m.len(),
expected: self.desc.file_size,
})
} else {
None
}
}
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
let remote = self
.needs_download_blocking()
.map(|maybe| maybe.is_some())
.unwrap_or(false);
let access_stats = self.access_stats.as_api_model(reset);
if self.desc.is_delta {
let lsn_range = &self.desc.lsn_range;
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: self.desc.file_size,
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote,
access_stats,
}
} else {
let lsn = self.desc.image_layer_lsn();
HistoricLayerInfo::Image {
layer_file_name,
layer_file_size: self.desc.file_size,
lsn_start: lsn,
remote,
access_stats,
}
}
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
/// Our resident layer has been dropped; we might hold the lock elsewhere.
fn on_drop(self: Arc<LayerE>) {
let gc = self.wanted_garbage_collected.load(Ordering::Acquire);
let evict = self.wanted_evicted.load(Ordering::Acquire);
let can_evict = self.have_remote_client;
if gc {
// do nothing now; act only when the whole layer is dropped. gc will end up dropping the
// whole layer, provided there is no reference cycle.
} else if can_evict && evict {
// we could remove this right now, but ... we really should not block or do anything here.
// instead, spawn a task which first does a version check; that version is also incremented
// on get_or_download, so we will not collide?
let version = self.version.load(Ordering::Relaxed);
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_id, timeline_id = %self.desc.timeline_id, layer=%self);
// downgrade in case there's a queue backing up, or we are just tearing stuff down and
// would soon delete it anyway.
let this = Arc::downgrade(&self);
drop(self);
let eviction = {
let span = tracing::info_span!(parent: span.clone(), "blocking");
async move {
// the layer is already gone, don't do anything. LayerE's drop has already run.
let Some(this) = this.upgrade() else { return; };
// deleted or detached timeline, don't do anything.
let Some(timeline) = this.timeline.upgrade() else { return; };
let mut guard = this.inner.lock().await;
// relaxed ordering: we don't have any other atomics pending
if version != this.version.load(Ordering::Relaxed) {
// the downloadedness state has advanced; we might no longer be the latest eviction
// work, so don't do anything.
return;
}
// free the DownloadedLayer allocation
let taken = guard.take();
assert!(matches!(taken, None | Some(ResidentOrWantedEvicted::WantedEvicted(_))), "this is what the version is supposed to guard against but we could just undo it and remove version?");
if !this.wanted_evicted.load(Ordering::Acquire) {
// if there's already interest, should we just early exit? this is not
// currently *cleared* on interest, maybe it shouldn't?
// FIXME: wanted_evicted cannot be unset right now
return;
}
let path = this.path.to_owned();
let capture_mtime_and_delete = tokio::task::spawn_blocking({
let span = span.clone();
move || {
let _e = span.entered();
// FIXME: we can now initialize the mtime during first get_or_download,
// and track that in-memory for the following? does that help?
let m = path.metadata()?;
let local_layer_mtime = m.modified()?;
std::fs::remove_file(&path)?;
Ok::<_, std::io::Error>(local_layer_mtime)
}
});
match capture_mtime_and_delete.await {
Ok(Ok(local_layer_mtime)) => {
let duration =
std::time::SystemTime::now().duration_since(local_layer_mtime);
match duration {
Ok(elapsed) => {
timeline
.metrics
.evictions_with_low_residence_duration
.read()
.unwrap()
.observe(elapsed);
tracing::info!(
residence_millis = elapsed.as_millis(),
"evicted layer after known residence period"
);
}
Err(_) => {
tracing::info!("evicted layer after unknown residence period");
}
}
timeline
.metrics
.resident_physical_size_gauge
.sub(this.desc.file_size);
}
Ok(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::info!("failed to evict file from disk, it was already gone");
}
Ok(Err(e)) => {
tracing::warn!("failed to evict file from disk: {e:#}");
}
Err(je) if je.is_cancelled() => unreachable!("unsupported"),
Err(je) if je.is_panic() => { /* already logged */ }
Err(je) => {
tracing::warn!(error = ?je, "unexpected join_error while evicting the file")
}
}
}
}
.instrument(span);
crate::task_mgr::BACKGROUND_RUNTIME.spawn(eviction);
}
}
}
pub(crate) enum NeedsDownload {
NotFound,
NotFile,
WrongSize { actual: u64, expected: u64 },
}
impl std::fmt::Display for NeedsDownload {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NeedsDownload::NotFound => write!(f, "file was not found"),
NeedsDownload::NotFile => write!(f, "path is not a file"),
NeedsDownload::WrongSize { actual, expected } => {
write!(f, "file size mismatch {actual} vs. {expected}")
}
}
}
}
impl NeedsDownload {
pub(crate) fn is_not_found(&self) -> bool {
matches!(self, NeedsDownload::NotFound)
}
pub(crate) fn actual_size(&self) -> Option<u64> {
match self {
NeedsDownload::WrongSize { actual, .. } => Some(*actual),
_ => None,
}
}
}
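// A sketch of how a caller might branch on the reason; `layer` is a hypothetical
// `LayerE` binding:
//
//     if let Some(reason) = layer.needs_download().await? {
//         if reason.is_not_found() {
//             // nothing on local disk: an on-demand download is required
//         } else if let Some(actual) = reason.actual_size() {
//             // wrong-sized local file of `actual` bytes: re-download over it
//         }
//     }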
/// Holds both `Arc`s, requiring that both components stay resident while this value is alive,
/// and that no eviction or garbage collection happens.
#[derive(Clone)]
pub(crate) struct ResidentLayer {
// field order matters: we want the downloaded layer to be dropped before the owner, so that
// ... at least this is how the code expects it right now. The added spawn carrying a weak
// should protect us, but it's theoretically possible for that spawn to keep the LayerE
// alive and evict before garbage_collect.
_downloaded: Arc<DownloadedLayer>,
owner: Arc<LayerE>,
}
impl std::fmt::Display for ResidentLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.owner)
}
}
impl ResidentLayer {
pub(crate) fn local_path(&self) -> &std::path::Path {
&self.owner.path
}
}
impl AsLayerDesc for ResidentLayer {
fn layer_desc(&self) -> &PersistentLayerDesc {
self.owner.layer_desc()
}
}
impl AsRef<Arc<LayerE>> for ResidentLayer {
fn as_ref(&self) -> &Arc<LayerE> {
&self.owner
}
}
/// Allow slimming down if we don't want the `2*usize` with eviction candidates?
impl From<ResidentLayer> for Arc<LayerE> {
fn from(value: ResidentLayer) -> Self {
value.owner
}
}
impl std::ops::Deref for ResidentLayer {
type Target = LayerE;
fn deref(&self) -> &Self::Target {
&self.owner
}
}
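// Deref lets callers reach LayerE's inherent methods directly; with a hypothetical
// `resident: ResidentLayer` binding:
//
//     let _stats = resident.access_stats(); // resolves through Deref to LayerE
//     resident.garbage_collect();           // marks the owner for deletion-on-drop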
#[derive(Debug, thiserror::Error)]
#[error("Layer has been removed from LayerMap already")]
pub(crate) struct RemovedFromLayerMap;
/// Holds the actual downloaded layer, and handles evicting the file on drop.
pub(crate) struct DownloadedLayer {
owner: Weak<LayerE>,
kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
}
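// `kind` is initialized lazily on the first `get`; a failed load is cached as the `Err`
// and re-reported (stringified) on every later access, making load failures permanent.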
impl Drop for DownloadedLayer {
fn drop(&mut self) {
if let Some(owner) = self.owner.upgrade() {
owner.on_drop();
} else {
// no need to do anything, we are shutting down
}
}
}
impl DownloadedLayer {
async fn get(&self) -> anyhow::Result<&LayerKind> {
self.kind
.get_or_init(|| async {
let Some(owner) = self.owner.upgrade() else {
anyhow::bail!("Cannot init, the layer has already been dropped");
};
// there is nothing async here, but it should be async
if owner.desc.is_delta {
let summary = Some(delta_layer::Summary::expected(
owner.desc.tenant_id,
owner.desc.timeline_id,
owner.desc.key_range.clone(),
owner.desc.lsn_range.clone(),
));
delta_layer::DeltaLayerInner::load(&owner.path, summary).map(LayerKind::Delta)
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
owner.desc.tenant_id,
owner.desc.timeline_id,
owner.desc.key_range.clone(),
lsn,
));
image_layer::ImageLayerInner::load(&owner.path, lsn, summary)
.map(LayerKind::Image)
}
// this should be a permanent failure
.context("load layer")
})
.await
.as_ref()
.map_err(|e| {
// errors are not cloneable, so we cannot do better than stringify
anyhow::anyhow!("layer loading failed: {e:#}")
})
}
async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
) -> anyhow::Result<ValueReconstructResult> {
use LayerKind::*;
// FIXME: some asserts are left behind in DeltaLayer, ImageLayer
match self.get().await? {
Delta(d) => {
d.get_value_reconstruct_data(key, lsn_range, reconstruct_data)
.await
}
Image(i) => i.get_value_reconstruct_data(key, reconstruct_data).await,
}
}
/// Loads all keys stored in the layer. Returns key, lsn and value size.
async fn load_keys(self: &Arc<Self>) -> anyhow::Result<Vec<DeltaEntry<ResidentDeltaLayer>>> {
use LayerKind::*;
match self.get().await? {
Delta(_) => {
let resident = ResidentDeltaLayer(self.clone());
delta_layer::DeltaLayerInner::load_keys(&resident)
.await
.context("Layer index is corrupted")
}
Image(_) => anyhow::bail!("cannot load_keys on an image layer"),
}
}
}
#[derive(Clone)]
pub(crate) struct ResidentDeltaLayer(Arc<DownloadedLayer>);
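// Invariant: only constructed in load_keys after `get().await?` has returned
// LayerKind::Delta, so the OnceCell holds Ok(Delta) whenever AsRef runs.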
impl AsRef<delta_layer::DeltaLayerInner> for ResidentDeltaLayer {
fn as_ref(&self) -> &delta_layer::DeltaLayerInner {
use LayerKind::*;
let kind = self
.0
.kind
.get()
.expect("ResidentDeltaLayer must not be created before the delta is init");
match kind {
Ok(Delta(ref d)) => d,
Err(_) => unreachable!("ResidentDeltaLayer must not be created for failed loads"),
_ => unreachable!("checked before creating ResidentDeltaLayer"),
}
}
}
/// Wrapper around an actual layer implementation.
#[derive(Debug)]
enum LayerKind {
Delta(delta_layer::DeltaLayerInner),
Image(image_layer::ImageLayerInner),
}
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`](super::layer_map::LayerMap).
///