diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 12de754c2b..d0dcd76192 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -62,7 +62,7 @@ use crate::{ task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ self, - storage_layer::{AsLayerDesc, LayerE}, + storage_layer::{AsLayerDesc, Layer}, timeline::EvictionError, Timeline, }, @@ -481,7 +481,7 @@ pub async fn disk_usage_eviction_task_iteration_impl( #[derive(Clone)] struct EvictionCandidate { timeline: Arc, - layer: Arc, + layer: Layer, last_activity_ts: SystemTime, } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 166100b2bf..ccaaa30ba5 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -606,7 +606,7 @@ impl RemoteTimelineClient { let upload_queue = guard.initialized_mut()?; // FIXME: we might be still including no longer existing files in the index_part because - // that consistency is built on strings and gentleman agreements, not Weak which + // that consistency is built on strings and gentleman agreements, not WeakLayer which // could be upgraded at the time of rendering of index_part. 
upload_queue .latest_files @@ -1365,7 +1365,7 @@ mod tests { context::RequestContext, tenant::{ harness::{TenantHarness, TIMELINE_ID}, - storage_layer::{LayerE, PersistentLayerDesc}, + storage_layer::{Layer, PersistentLayerDesc}, Tenant, Timeline, }, DEFAULT_PG_VERSION, @@ -1541,7 +1541,7 @@ mod tests { std::fs::write(timeline_path.join(filename.file_name()), content).unwrap(); } - let layer_file_1 = LayerE::for_written( + let layer_file_1 = Layer::for_written( harness.conf, &timeline, PersistentLayerDesc::from_filename( @@ -1556,7 +1556,7 @@ mod tests { // FIXME: need that api for local files assert!(layer_file_1.needs_download_blocking().unwrap().is_none()); - let layer_file_2 = LayerE::for_written( + let layer_file_2 = Layer::for_written( harness.conf, &timeline, PersistentLayerDesc::from_filename( @@ -1569,7 +1569,7 @@ mod tests { .unwrap(); assert!(layer_file_2.needs_download_blocking().unwrap().is_none()); - let layer_file_3 = LayerE::for_written( + let layer_file_3 = Layer::for_written( harness.conf, &timeline, PersistentLayerDesc::from_filename( @@ -1721,7 +1721,7 @@ mod tests { ) .unwrap(); - let layer_file_1 = LayerE::for_written( + let layer_file_1 = Layer::for_written( harness.conf, &timeline, PersistentLayerDesc::from_filename( diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 80aa47d2dd..da3e91c68a 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -81,7 +81,7 @@ pub struct ValueReconstructState { pub img: Option<(Lsn, Bytes)>, } -/// Return value from [`LayerE::get_value_reconstruct_data`] +/// Return value from [`Layer::get_value_reconstruct_data`] #[derive(Clone, Copy, Debug)] pub enum ValueReconstructResult { /// Got all the data needed to reconstruct the requested page @@ -312,7 +312,7 @@ impl LayerAccessStats { /// /// However when we want something evicted, we cannot evict it right away as there might be current /// reads happening on it. 
It has been for example searched from [`LayerMap`] but not yet -/// [`LayerE::get_value_reconstruct_data`]. +/// [`Layer::get_value_reconstruct_data`]. /// /// [`LayerMap`]: crate::tenant::layer_map::LayerMap enum ResidentOrWantedEvicted { @@ -363,9 +363,266 @@ impl ResidentOrWantedEvicted { /// LSN. /// /// This type models the on-disk layers, which can be evicted and on-demand downloaded. +#[derive(Clone)] +pub(crate) struct Layer(Arc); + +impl std::fmt::Display for Layer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.layer_desc().short_id()) + } +} + +impl std::fmt::Debug for Layer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self) + } +} + +impl AsLayerDesc for Layer { + fn layer_desc(&self) -> &PersistentLayerDesc { + self.0.layer_desc() + } +} + +impl Layer { + pub(crate) fn for_evicted( + conf: &'static PageServerConf, + timeline: &Arc, + file_name: LayerFileName, + metadata: LayerFileMetadata, + ) -> Self { + let path = conf + .timeline_path(&timeline.tenant_id, &timeline.timeline_id) + .join(file_name.file_name()); + + let desc = PersistentLayerDesc::from_filename( + timeline.tenant_id, + timeline.timeline_id, + file_name, + metadata.file_size(), + ); + + let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted); + + let outer = Arc::new(LayerInner { + conf, + path, + desc, + timeline: Arc::downgrade(timeline), + access_stats, + inner: heavier_once_cell::OnceCell::default(), + wanted_garbage_collected: AtomicBool::default(), + wanted_evicted: AtomicBool::default(), + version: AtomicUsize::default(), + have_remote_client: timeline.remote_client.is_some(), + status: tokio::sync::broadcast::channel(1).0, + }); + + debug_assert!(outer.needs_download_blocking().unwrap().is_some()); + + Layer(outer) + } + + pub(crate) fn for_resident( + conf: &'static PageServerConf, + timeline: &Arc, + file_name: LayerFileName, + metadata: 
LayerFileMetadata, + ) -> ResidentLayer { + let path = conf + .timeline_path(&timeline.tenant_id, &timeline.timeline_id) + .join(file_name.file_name()); + + let desc = PersistentLayerDesc::from_filename( + timeline.tenant_id, + timeline.timeline_id, + file_name, + metadata.file_size(), + ); + + let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident); + + let mut resident = None; + + let outer = Arc::new_cyclic(|owner| { + let inner = Arc::new(DownloadedLayer { + owner: owner.clone(), + kind: tokio::sync::OnceCell::default(), + }); + resident = Some(inner.clone()); + LayerInner { + conf, + path, + desc, + timeline: Arc::downgrade(timeline), + have_remote_client: timeline.remote_client.is_some(), + access_stats, + wanted_garbage_collected: AtomicBool::new(false), + wanted_evicted: AtomicBool::new(false), + inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)), + version: AtomicUsize::new(0), + status: tokio::sync::broadcast::channel(1).0, + } + }); + + debug_assert!(outer.needs_download_blocking().unwrap().is_none()); + + let downloaded = resident.expect("just initialized"); + + ResidentLayer { + downloaded, + owner: Layer(outer), + } + } + + pub(crate) fn for_written( + conf: &'static PageServerConf, + timeline: &Arc, + desc: PersistentLayerDesc, + ) -> anyhow::Result { + let path = conf + .timeline_path(&desc.tenant_id, &desc.timeline_id) + .join(desc.filename().to_string()); + + let mut resident = None; + + let outer = Arc::new_cyclic(|owner| { + let inner = Arc::new(DownloadedLayer { + owner: owner.clone(), + kind: tokio::sync::OnceCell::default(), + }); + resident = Some(inner.clone()); + LayerInner { + conf, + path, + desc, + timeline: Arc::downgrade(timeline), + have_remote_client: timeline.remote_client.is_some(), + access_stats: LayerAccessStats::empty_will_record_residence_event_later(), + wanted_garbage_collected: AtomicBool::new(false), + wanted_evicted: AtomicBool::new(false), + inner: 
heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)), + version: AtomicUsize::new(0), + status: tokio::sync::broadcast::channel(1).0, + } + }); + + // FIXME: ugly, but if we don't do this check here, any error will pop up at read time + // but we cannot check it because DeltaLayerWriter and ImageLayerWriter create the + // instances *before* renaming the file to final destination + // anyhow::ensure!( + // outer.needs_download_blocking()?.is_none(), + // "should not need downloading if it was just written" + // ); + + // FIXME: because we can now do garbage collection on drop, should we mark these files as + // garbage collected until they really get added to LayerMap? consider that files are + // written out to disk, fsynced, renamed by `{Delta,Image}LayerWriter`, then waiting for + // remaining files to be generated (compaction, create_image_layers) before being added to + // LayerMap. We could panic or just error out during that time, even for unrelated reasons, + // but the files would be left. + + Ok(ResidentLayer { + downloaded: resident.expect("just wrote Some"), + owner: Layer(outer), + }) + } + + pub(crate) async fn evict_and_wait( + &self, + rtc: &RemoteTimelineClient, + ) -> Result<(), super::timeline::EvictionError> { + self.0.evict_and_wait(rtc).await + } + + /// Delete the layer file when the `self` gets dropped, also schedule a remote index upload + /// then perhaps. + pub(crate) fn garbage_collect(&self) { + self.0.garbage_collect(); + } + + /// Return data needed to reconstruct given page at LSN. + /// + /// It is up to the caller to collect more data from previous layer and + /// perform WAL redo, if necessary. + /// + /// See ValueReconstructResult for possible return values. The collected data + /// is appended to reconstruct_data; the caller should pass an empty struct + /// on first call, or a struct with a cached older image of the page if one + /// is available. 
If this returns ValueReconstructResult::Continue, look up + /// the predecessor layer and call again with the same 'reconstruct_data' to + /// collect more data. + pub(crate) async fn get_value_reconstruct_data( + &self, + key: Key, + lsn_range: Range, + reconstruct_data: &mut ValueReconstructState, + ctx: &RequestContext, + ) -> anyhow::Result { + use anyhow::ensure; + + let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?; + self.0 + .access_stats + .record_access(LayerAccessKind::GetValueReconstructData, ctx); + + if self.layer_desc().is_delta { + ensure!(lsn_range.start >= self.layer_desc().lsn_range.start); + ensure!(self.layer_desc().key_range.contains(&key)); + } else { + ensure!(self.layer_desc().key_range.contains(&key)); + ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn()); + ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn()); + } + + layer + .get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0) + .await + } + + /// Download the layer if evicted. + /// + /// Will not error when it is already downloaded. + pub(crate) async fn get_or_download(&self) -> anyhow::Result<()> { + self.0.get_or_maybe_download(true, None).await?; + Ok(()) + } + + /// Creates a guard object which prohibits evicting this layer as long as the value is kept + /// around. 
+ pub(crate) async fn guard_against_eviction( + &self, + allow_download: bool, + ) -> anyhow::Result { + let downloaded = self.0.get_or_maybe_download(allow_download, None).await?; + + Ok(ResidentLayer { + downloaded, + owner: self.clone(), + }) + } + + pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { + self.0.info(reset) + } + + pub(crate) fn access_stats(&self) -> &LayerAccessStats { + &self.0.access_stats + } + + pub(crate) fn local_path(&self) -> &std::path::Path { + &self.0.path + } + + #[cfg(test)] + pub(crate) fn needs_download_blocking(&self) -> Result, std::io::Error> { + self.0.needs_download_blocking() + } +} + // TODO: // - internal arc, because I've now worked away majority of external wrapping -pub(crate) struct LayerE { +struct LayerInner { // only needed to check ondemand_download_behavior_treat_error_as_warn conf: &'static PageServerConf, path: PathBuf, @@ -380,7 +637,7 @@ pub(crate) struct LayerE { /// Initialization and deinitialization is done while holding a permit. 
inner: heavier_once_cell::OnceCell, - /// Do we want to garbage collect this when `LayerE` is dropped, where garbage collection + /// Do we want to garbage collect this when `LayerInner` is dropped, where garbage collection /// means: /// - schedule remote deletion /// - instant local deletion @@ -407,31 +664,25 @@ pub(crate) struct LayerE { status: tokio::sync::broadcast::Sender, } +impl std::fmt::Display for LayerInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.layer_desc().short_id()) + } +} + +impl AsLayerDesc for LayerInner { + fn layer_desc(&self) -> &PersistentLayerDesc { + &self.desc + } +} + #[derive(Debug, Clone, Copy)] enum Status { Evicted, Downloaded, } -impl std::fmt::Display for LayerE { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.layer_desc().short_id()) - } -} - -impl std::fmt::Debug for LayerE { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self) - } -} - -impl AsLayerDesc for LayerE { - fn layer_desc(&self) -> &PersistentLayerDesc { - &self.desc - } -} - -impl Drop for LayerE { +impl Drop for LayerInner { fn drop(&mut self) { if !*self.wanted_garbage_collected.get_mut() { // should we try to evict if the last wish was for eviction? 
@@ -488,152 +739,13 @@ impl Drop for LayerE { } } -impl LayerE { - pub(crate) fn for_evicted( - conf: &'static PageServerConf, - timeline: &Arc, - file_name: LayerFileName, - metadata: LayerFileMetadata, - ) -> Arc { - let path = conf - .timeline_path(&timeline.tenant_id, &timeline.timeline_id) - .join(file_name.file_name()); - - let desc = PersistentLayerDesc::from_filename( - timeline.tenant_id, - timeline.timeline_id, - file_name, - metadata.file_size(), - ); - - let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted); - - let outer = Arc::new(LayerE { - conf, - path, - desc, - timeline: Arc::downgrade(timeline), - access_stats, - inner: heavier_once_cell::OnceCell::default(), - wanted_garbage_collected: AtomicBool::default(), - wanted_evicted: AtomicBool::default(), - version: AtomicUsize::default(), - have_remote_client: timeline.remote_client.is_some(), - status: tokio::sync::broadcast::channel(1).0, - }); - - debug_assert!(outer.needs_download_blocking().unwrap().is_some()); - - outer - } - - pub(crate) fn for_resident( - conf: &'static PageServerConf, - timeline: &Arc, - file_name: LayerFileName, - metadata: LayerFileMetadata, - ) -> ResidentLayer { - let path = conf - .timeline_path(&timeline.tenant_id, &timeline.timeline_id) - .join(file_name.file_name()); - - let desc = PersistentLayerDesc::from_filename( - timeline.tenant_id, - timeline.timeline_id, - file_name, - metadata.file_size(), - ); - - let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident); - - let mut resident = None; - - let outer = Arc::new_cyclic(|owner| { - let inner = Arc::new(DownloadedLayer { - owner: owner.clone(), - kind: tokio::sync::OnceCell::default(), - }); - resident = Some(inner.clone()); - LayerE { - conf, - path, - desc, - timeline: Arc::downgrade(timeline), - have_remote_client: timeline.remote_client.is_some(), - access_stats, - wanted_garbage_collected: AtomicBool::new(false), - wanted_evicted: 
AtomicBool::new(false), - inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)), - version: AtomicUsize::new(0), - status: tokio::sync::broadcast::channel(1).0, - } - }); - - debug_assert!(outer.needs_download_blocking().unwrap().is_none()); - - let downloaded = resident.expect("just initialized"); - - ResidentLayer { - downloaded, - owner: outer, - } - } - - pub(crate) fn for_written( - conf: &'static PageServerConf, - timeline: &Arc, - desc: PersistentLayerDesc, - ) -> anyhow::Result { - let path = conf - .timeline_path(&desc.tenant_id, &desc.timeline_id) - .join(desc.filename().to_string()); - - let mut resident = None; - - let outer = Arc::new_cyclic(|owner| { - let inner = Arc::new(DownloadedLayer { - owner: owner.clone(), - kind: tokio::sync::OnceCell::default(), - }); - resident = Some(inner.clone()); - LayerE { - conf, - path, - desc, - timeline: Arc::downgrade(timeline), - have_remote_client: timeline.remote_client.is_some(), - access_stats: LayerAccessStats::empty_will_record_residence_event_later(), - wanted_garbage_collected: AtomicBool::new(false), - wanted_evicted: AtomicBool::new(false), - inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)), - version: AtomicUsize::new(0), - status: tokio::sync::broadcast::channel(1).0, - } - }); - - // FIXME: ugly, but if we don't do this check here, any error will pop up at read time - // but we cannot check it because DeltaLayerWriter and ImageLayerWriter create the - // instances *before* renaming the file to final destination - // anyhow::ensure!( - // outer.needs_download_blocking()?.is_none(), - // "should not need downloading if it was just written" - // ); - - // FIXME: because we can now do garbage collection on drop, should we mark these files as - // garbage collected until they get really get added to LayerMap? 
consider that files are - // written out to disk, fsynced, renamed by `{Delta,Image}LayerWriter`, then waiting for - // remaining files to be generated (compaction, create_image_layers) before being added to - // LayerMap. We could panic or just error out during that time, even for unrelated reasons, - // but the files would be left. - - Ok(ResidentLayer { - downloaded: resident.expect("just wrote Some"), - owner: outer, - }) +impl LayerInner { + fn garbage_collect(&self) { + self.wanted_garbage_collected.store(true, Ordering::Release); } pub(crate) async fn evict_and_wait( - self: &Arc, + &self, _: &RemoteTimelineClient, ) -> Result<(), super::timeline::EvictionError> { use tokio::sync::broadcast::error::RecvError; @@ -682,69 +794,6 @@ impl LayerE { Self::get_or_apply_evictedness(locked, &self.wanted_evicted) } - /// Delete the layer file when the `self` gets dropped, also schedule a remote index upload - /// then perhaps. - pub(crate) fn garbage_collect(&self) { - self.wanted_garbage_collected.store(true, Ordering::Release); - } - - /// Return data needed to reconstruct given page at LSN. - /// - /// It is up to the caller to collect more data from previous layer and - /// perform WAL redo, if necessary. - /// - /// See PageReconstructResult for possible return values. The collected data - /// is appended to reconstruct_data; the caller should pass an empty struct - /// on first call, or a struct with a cached older image of the page if one - /// is available. If this returns ValueReconstructResult::Continue, look up - /// the predecessor layer and call again with the same 'reconstruct_data' to - /// collect more data. 
- pub(crate) async fn get_value_reconstruct_data( - self: &Arc, - key: Key, - lsn_range: Range, - reconstruct_data: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> anyhow::Result { - use anyhow::ensure; - - let layer = self.get_or_maybe_download(true, Some(ctx)).await?; - self.access_stats - .record_access(LayerAccessKind::GetValueReconstructData, ctx); - - if self.layer_desc().is_delta { - ensure!(lsn_range.start >= self.layer_desc().lsn_range.start); - ensure!(self.layer_desc().key_range.contains(&key)); - } else { - ensure!(self.layer_desc().key_range.contains(&key)); - ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn()); - ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn()); - } - - layer - .get_value_reconstruct_data(key, lsn_range, reconstruct_data, self) - .await - } - - /// Creates a guard object which prohibit evicting this layer as long as the value is kept - /// around. - pub(crate) async fn guard_against_eviction( - self: &Arc, - allow_download: bool, - ) -> anyhow::Result { - let downloaded = self.get_or_maybe_download(allow_download, None).await?; - - Ok(ResidentLayer { - downloaded, - owner: self.clone(), - }) - } - - pub(crate) async fn get_or_download(self: &Arc) -> anyhow::Result<()> { - self.get_or_maybe_download(true, None).await?; - Ok(()) - } - fn get_or_apply_evictedness( guard: Option>, wanted_evicted: &AtomicBool, @@ -834,7 +883,7 @@ impl LayerE { } if !allow_download { - // this does look weird, but for LayerE the "downloading" means also changing + // this does look weird, but for LayerInner the "downloading" means also changing // internal once related state ... return Err(DownloadError::DownloadRequired); } @@ -845,7 +894,7 @@ impl LayerE { // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot // block tenant::mgr::remove_tenant_from_memory. 
- let this = self.clone(); + let this: Arc = self.clone(); crate::task_mgr::spawn( &tokio::runtime::Handle::current(), TaskKind::RemoteDownloadTask, @@ -955,21 +1004,16 @@ impl LayerE { ) } - pub(crate) fn local_path(&self) -> &std::path::Path { - // maybe it does make sense to have this or maybe not - &self.path - } - async fn needs_download(&self) -> Result, std::io::Error> { - match tokio::fs::metadata(self.local_path()).await { + match tokio::fs::metadata(&self.path).await { Ok(m) => Ok(self.is_file_present_and_good_size(&m)), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)), Err(e) => Err(e), } } - pub(crate) fn needs_download_blocking(&self) -> Result, std::io::Error> { - match self.local_path().metadata() { + fn needs_download_blocking(&self) -> Result, std::io::Error> { + match self.path.metadata() { Ok(m) => Ok(self.is_file_present_and_good_size(&m)), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)), Err(e) => Err(e), @@ -991,13 +1035,14 @@ impl LayerE { } } - pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { + fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.desc.filename().file_name(); let remote = self .needs_download_blocking() .map(|maybe| maybe.is_some()) - .unwrap_or(false); + .unwrap_or(true); + let access_stats = self.access_stats.as_api_model(reset); if self.desc.is_delta { @@ -1024,12 +1069,8 @@ impl LayerE { } } - pub(crate) fn access_stats(&self) -> &LayerAccessStats { - &self.access_stats - } - /// Our resident layer has been dropped, we might hold the lock elsewhere. 
fn on_drop(self: Arc) { let gc = self.wanted_garbage_collected.load(Ordering::Acquire); let evict = self.wanted_evicted.load(Ordering::Acquire); let can_evict = self.have_remote_client; @@ -1053,7 +1094,7 @@ impl LayerE { let eviction = { let span = tracing::info_span!(parent: span.clone(), "blocking"); async move { - // the layer is already gone, don't do anything. LayerE drop has already ran. + // the layer is already gone, don't do anything. LayerInner drop has already run. let Some(this) = this.upgrade() else { return; }; // deleted or detached timeline, don't do anything. @@ -1206,7 +1247,7 @@ impl std::fmt::Display for NeedsDownload { /// or garbage collection happens. #[derive(Clone)] pub(crate) struct ResidentLayer { - owner: Arc, + owner: Layer, downloaded: Arc, } @@ -1223,7 +1264,7 @@ impl std::fmt::Debug for ResidentLayer { } impl ResidentLayer { - pub(crate) fn drop_eviction_guard(self) -> Arc { + pub(crate) fn drop_eviction_guard(self) -> Layer { self.into() } @@ -1234,9 +1275,11 @@ impl ResidentLayer { ) -> anyhow::Result>> { use LayerKind::*; - match self.downloaded.get(&self.owner).await? { + let inner = &self.owner.0; + + match self.downloaded.get(inner).await? 
{ Delta(d) => { - self.owner + inner .access_stats .record_access(LayerAccessKind::KeyIter, ctx); @@ -1248,6 +1291,19 @@ impl ResidentLayer { Image(_) => anyhow::bail!("cannot load_keys on a image layer"), } } + + pub(crate) fn local_path(&self) -> &std::path::Path { + &self.owner.0.path + } + + pub(crate) fn access_stats(&self) -> &LayerAccessStats { + self.owner.access_stats() + } + + #[cfg(test)] + pub(crate) fn needs_download_blocking(&self) -> Result, std::io::Error> { + self.owner.needs_download_blocking() + } } impl AsLayerDesc for ResidentLayer { @@ -1256,34 +1312,26 @@ impl AsLayerDesc for ResidentLayer { } } -impl AsRef> for ResidentLayer { - fn as_ref(&self) -> &Arc { +impl AsRef for ResidentLayer { + fn as_ref(&self) -> &Layer { &self.owner } } /// Allow slimming down if we don't want the `2*usize` with eviction candidates? -impl From for Arc { +impl From for Layer { fn from(value: ResidentLayer) -> Self { value.owner } } -impl std::ops::Deref for ResidentLayer { - type Target = LayerE; - - fn deref(&self) -> &Self::Target { - &self.owner - } -} - #[derive(Debug, thiserror::Error)] #[error("Layer has been removed from LayerMap already")] pub(crate) struct RemovedFromLayerMap; /// Holds the actual downloaded layer, and handles evicting the file on drop. pub(crate) struct DownloadedLayer { - owner: Weak, + owner: Weak, kind: tokio::sync::OnceCell>, } @@ -1308,7 +1356,7 @@ impl Drop for DownloadedLayer { } impl DownloadedLayer { - async fn get(&self, owner: &LayerE) -> anyhow::Result<&LayerKind> { + async fn get(&self, owner: &LayerInner) -> anyhow::Result<&LayerKind> { // the owner is required so that we don't have to upgrade the self.owner, which will only // be used on drop. this way, initializing a DownloadedLayer without an owner is statically // impossible, so we can just not worry about it. 
@@ -1347,7 +1395,7 @@ impl DownloadedLayer { key: Key, lsn_range: Range, reconstruct_data: &mut ValueReconstructState, - owner: &LayerE, + owner: &LayerInner, ) -> anyhow::Result { use LayerKind::*; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index a5819e1d9a..1afdfbd30a 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -34,7 +34,7 @@ use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; -use crate::tenant::storage_layer::{LayerE, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::Timeline; use crate::virtual_file::VirtualFile; use crate::{walrecord, TEMP_FILE_SUFFIX}; @@ -561,7 +561,7 @@ impl DeltaLayerWriterInner { // fsync the file file.sync_all()?; - let layer = LayerE::for_written(self.conf, timeline, desc)?; + let layer = Layer::for_written(self.conf, timeline, desc)?; // Rename the file to its final name // // Note: This overwrites any existing file. There shouldn't be any. 
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 2977828092..c64c114a9c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -59,7 +59,7 @@ use utils::{ }; use super::filename::ImageFileName; -use super::{AsLayerDesc, LayerE, PersistentLayerDesc, ResidentLayer}; +use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer}; /// /// Header stored in the beginning of the file @@ -495,7 +495,7 @@ impl ImageLayerWriterInner { // fsync the file file.sync_all()?; - let layer = LayerE::for_written(self.conf, timeline, desc)?; + let layer = Layer::for_written(self.conf, timeline, desc)?; // Rename the file to its final name // diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7494c14f3b..946643f7f6 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -36,7 +36,7 @@ use crate::context::{ use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::delta_layer::DeltaEntry; use crate::tenant::storage_layer::{ - AsLayerDesc, DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStatsReset, LayerE, + AsLayerDesc, DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, Layer, LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, ValueReconstructState, }; use crate::tenant::timeline::logical_size::CurrentLogicalSize; @@ -978,7 +978,7 @@ impl Timeline { let Ok(local_layer) = local_layer.guard_against_eviction(false).await else { return Ok(Some(false)); }; - let local_layer: Arc = local_layer.into(); + let local_layer: Layer = local_layer.into(); let remote_client = self .remote_client @@ -1001,7 +1001,7 @@ impl Timeline { /// Evict a batch of layers. 
pub(crate) async fn evict_layers( &self, - layers_to_evict: &[Arc], + layers_to_evict: &[Layer], cancel: &CancellationToken, ) -> anyhow::Result>>> { let remote_client = self @@ -1030,7 +1030,7 @@ impl Timeline { async fn evict_layer_batch( &self, remote_client: &Arc, - layers_to_evict: &[Arc], + layers_to_evict: &[Layer], cancel: &CancellationToken, ) -> anyhow::Result>>> { // ensure that the layers have finished uploading @@ -1510,17 +1510,17 @@ impl Timeline { let layer = match decision { NeedsUpload(m) => { total_physical_size += m.file_size(); - let resident = LayerE::for_resident(conf, &this, name, m.clone()); + let resident = Layer::for_resident(conf, &this, name, m.clone()); let layer = resident.as_ref().clone(); needs_upload.push((resident, m)); layer } UseLocal(m) => { total_physical_size += m.file_size(); - LayerE::for_resident(conf, &this, name, m).drop_eviction_guard() + Layer::for_resident(conf, &this, name, m).drop_eviction_guard() } Evicted(remote) | UseRemote { remote, .. 
} => { - LayerE::for_evicted(conf, &this, name, remote) + Layer::for_evicted(conf, &this, name, remote) } }; @@ -1882,7 +1882,7 @@ impl Timeline { } } - async fn find_layer(&self, layer_file_name: &str) -> Option> { + async fn find_layer(&self, layer_file_name: &str) -> Option { let guard = self.layers.read().await; for historic_layer in guard.layer_map().iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); @@ -1901,7 +1901,7 @@ trait TraversalLayerExt { fn traversal_id(&self) -> TraversalId; } -impl TraversalLayerExt for Arc { +impl TraversalLayerExt for Layer { fn traversal_id(&self) -> TraversalId { self.local_path().display().to_string() } @@ -2140,7 +2140,7 @@ impl Timeline { result, cont_lsn, Box::new({ - let layer = Arc::clone(&layer); + let layer = layer.to_owned(); move || layer.traversal_id() }), )); @@ -2759,7 +2759,7 @@ impl Timeline { #[derive(Default)] struct CompactLevel0Phase1Result { new_layers: Vec, - deltas_to_compact: Vec>, + deltas_to_compact: Vec, } /// Top-level failure to compact. 
@@ -3738,7 +3738,7 @@ impl Timeline { let gc_layers = layers_to_remove .iter() .map(|x| guard.get_from_desc(x)) - .collect::>>(); + .collect::>(); result.layers_removed = gc_layers.len() as u64; @@ -4014,7 +4014,7 @@ pub(crate) struct DiskUsageEvictionInfo { } pub(crate) struct LocalLayerInfoForDiskUsageEviction { - pub layer: Arc, + pub layer: Layer, pub last_activity_ts: SystemTime, } @@ -4189,12 +4189,10 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> { #[cfg(test)] mod tests { - use std::sync::Arc; - use utils::{id::TimelineId, lsn::Lsn}; use crate::tenant::{ - harness::TenantHarness, storage_layer::LayerE, timeline::EvictionError, Timeline, + harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline, }; #[tokio::test] @@ -4287,7 +4285,7 @@ mod tests { .expect("no cancellation") } - async fn find_some_layer(timeline: &Timeline) -> Arc { + async fn find_some_layer(timeline: &Timeline) -> Layer { let layers = timeline.layers.read().await; let desc = layers .layer_map() diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index e96cbad802..b15f65c325 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -12,7 +12,7 @@ use crate::{ tenant::{ layer_map::{BatchedUpdates, LayerMap}, storage_layer::{ - AsLayerDesc, InMemoryLayer, LayerE, PersistentLayerDesc, PersistentLayerKey, + AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey, ResidentLayer, }, }, @@ -21,7 +21,7 @@ use crate::{ /// Provides semantic APIs to manipulate the layer map. pub(crate) struct LayerManager { layer_map: LayerMap, - layer_fmgr: LayerFileManager, + layer_fmgr: LayerFileManager, } /// After GC, the layer map changes will not be applied immediately. 
Users should manually apply the changes after @@ -42,7 +42,7 @@ impl LayerManager { } } - pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc { + pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer { self.layer_fmgr.get_from_desc(desc) } @@ -59,7 +59,7 @@ impl LayerManager { /// 2. next open layer (with disk disk_consistent_lsn LSN) pub(crate) fn initialize_local_layers( &mut self, - on_disk_layers: Vec>, + on_disk_layers: Vec, next_open_layer_at: Lsn, ) { let mut updates = self.layer_map.batch_update(); @@ -211,7 +211,7 @@ impl LayerManager { pub(crate) fn finish_compact_l0( &mut self, layer_removal_cs: &Arc>, - compact_from: Vec>, + compact_from: Vec, compact_to: &[ResidentLayer], metrics: &crate::metrics::TimelineMetrics, ) -> Result<()> { @@ -241,7 +241,7 @@ impl LayerManager { pub(crate) fn finish_gc_timeline( &mut self, layer_removal_cs: &Arc>, - gc_layers: Vec>, + gc_layers: Vec, ) -> Result { let mut updates = self.layer_map.batch_update(); for doomed_layer in gc_layers { @@ -257,9 +257,9 @@ impl LayerManager { /// Helper function to insert a layer into the layer map and file manager. fn insert_historic_layer( - layer: Arc, + layer: Layer, updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, + mapping: &mut LayerFileManager, ) { updates.insert_historic(layer.layer_desc().clone()); mapping.insert(layer); @@ -270,9 +270,9 @@ impl LayerManager { fn delete_historic_layer( // we cannot remove layers otherwise, since gc and compaction will race _layer_removal_cs: &Arc>, - layer: Arc, + layer: Layer, updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, + mapping: &mut LayerFileManager, ) -> anyhow::Result<()> { let desc = layer.layer_desc();