diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index e17dea01a8..483f53d5c8 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1700,23 +1700,6 @@ impl RemoteTimelineClient { } } } - - pub(crate) fn get_layers_metadata( - &self, - layers: Vec, - ) -> anyhow::Result>> { - let q = self.upload_queue.lock().unwrap(); - let q = match &*q { - UploadQueue::Stopped(_) | UploadQueue::Uninitialized => { - anyhow::bail!("queue is in state {}", q.as_str()) - } - UploadQueue::Initialized(inner) => inner, - }; - - let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned()); - - Ok(decorated.collect()) - } } pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath { diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6e9a4932d8..2d92baccbe 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -257,6 +257,12 @@ impl LayerAccessStats { ret } + /// Get the latest access timestamp, falling back to latest residence event, further falling + /// back to `SystemTime::now` for a usable timestamp for eviction. + pub(crate) fn latest_activity_or_now(&self) -> SystemTime { + self.latest_activity().unwrap_or_else(SystemTime::now) + } + /// Get the latest access timestamp, falling back to latest residence event. /// /// This function can only return `None` if there has not yet been a call to the @@ -271,7 +277,7 @@ impl LayerAccessStats { /// that that type can only be produced by inserting into the layer map. /// /// [`record_residence_event`]: Self::record_residence_event - pub(crate) fn latest_activity(&self) -> Option { + fn latest_activity(&self) -> Option { let locked = self.0.lock().unwrap(); let inner = &locked.for_eviction_policy; match inner.last_accesses.recent() { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index dd9de99477..bfcc031863 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1413,10 +1413,6 @@ impl ResidentLayer { &self.owner.0.path } - pub(crate) fn access_stats(&self) -> &LayerAccessStats { - self.owner.access_stats() - } - pub(crate) fn metadata(&self) -> LayerFileMetadata { self.owner.metadata() } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 74676277d5..625be7a644 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -12,6 +12,7 @@ use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use enumset::EnumSet; use fail::fail_point; +use futures::stream::StreamExt; use itertools::Itertools; use pageserver_api::{ keyspace::{key_range_size, KeySpaceAccum}, @@ -105,7 +106,7 @@ use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::config::TenantConf; -use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart}; +use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; @@ -1458,7 +1459,7 @@ impl Timeline { generation, shard_identity, pg_version, - layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())), + layers: Default::default(), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -2283,45 +2284,28 @@ impl Timeline { /// should treat this as a cue to simply skip doing any heatmap uploading /// for this timeline. pub(crate) async fn generate_heatmap(&self) -> Option { - let eviction_info = self.get_local_layers_for_disk_usage_eviction().await; + // no point in heatmaps without remote client + let _remote_client = self.remote_client.as_ref()?; - let remote_client = match &self.remote_client { - Some(c) => c, - None => return None, - }; + if !self.is_active() { + return None; + } - let layer_file_names = eviction_info - .resident_layers - .iter() - .map(|l| l.layer.get_name()) - .collect::>(); + let guard = self.layers.read().await; - let decorated = match remote_client.get_layers_metadata(layer_file_names) { - Ok(d) => d, - Err(_) => { - // Getting metadata only fails on Timeline in bad state. - return None; - } - }; + let resident = guard.resident_layers().map(|layer| { + let last_activity_ts = layer.access_stats().latest_activity_or_now(); - let heatmap_layers = std::iter::zip( - eviction_info.resident_layers.into_iter(), - decorated.into_iter(), - ) - .filter_map(|(layer, remote_info)| { - remote_info.map(|remote_info| { - HeatMapLayer::new( - layer.layer.get_name(), - IndexLayerMetadata::from(remote_info), - layer.last_activity_ts, - ) - }) + HeatMapLayer::new( + layer.layer_desc().filename(), + layer.metadata().into(), + last_activity_ts, + ) }); - Some(HeatMapTimeline::new( - self.timeline_id, - heatmap_layers.collect(), - )) + let layers = resident.collect().await; + + Some(HeatMapTimeline::new(self.timeline_id, layers)) } } @@ -4662,41 +4646,24 @@ impl Timeline { /// Returns non-remote layers for eviction. pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { let guard = self.layers.read().await; - let layers = guard.layer_map(); - let mut max_layer_size: Option = None; - let mut resident_layers = Vec::new(); - for l in layers.iter_historic_layers() { - let file_size = l.file_size(); - max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); + let resident_layers = guard + .resident_layers() + .map(|layer| { + let file_size = layer.layer_desc().file_size; + max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); - let l = guard.get_from_desc(&l); + let last_activity_ts = layer.access_stats().latest_activity_or_now(); - let l = match l.keep_resident().await { - Ok(Some(l)) => l, - Ok(None) => continue, - Err(e) => { - // these should not happen, but we cannot make them statically impossible right - // now. - tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}"); - continue; + EvictionCandidate { + layer: layer.into(), + last_activity_ts, + relative_last_activity: finite_f32::FiniteF32::ZERO, } - }; - - let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| { - // We only use this fallback if there's an implementation error. - // `latest_activity` already does rate-limited warn!() log. - debug!(layer=%l, "last_activity returns None, using SystemTime::now"); - SystemTime::now() - }); - - resident_layers.push(EvictionCandidate { - layer: l.drop_eviction_guard().into(), - last_activity_ts, - relative_last_activity: finite_f32::FiniteF32::ZERO, - }); - } + }) + .collect() + .await; DiskUsageEvictionInfo { max_layer_size, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 9bdd52e809..d87f78e35f 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -239,12 +239,7 @@ impl Timeline { } }; - let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| { - // We only use this fallback if there's an implementation error. - // `latest_activity` already does rate-limited warn!() log. - debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now"); - SystemTime::now() - }); + let last_activity_ts = hist_layer.access_stats().latest_activity_or_now(); let no_activity_for = match now.duration_since(last_activity_ts) { Ok(d) => d, diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index e38f5be209..ebcdcfdb4d 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -1,4 +1,5 @@ use anyhow::{bail, ensure, Context, Result}; +use futures::StreamExt; use pageserver_api::shard::TenantShardId; use std::{collections::HashMap, sync::Arc}; use tracing::trace; @@ -20,19 +21,13 @@ use crate::{ }; /// Provides semantic APIs to manipulate the layer map. +#[derive(Default)] pub(crate) struct LayerManager { layer_map: LayerMap, layer_fmgr: LayerFileManager, } impl LayerManager { - pub(crate) fn create() -> Self { - Self { - layer_map: LayerMap::default(), - layer_fmgr: LayerFileManager::new(), - } - } - pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer { self.layer_fmgr.get_from_desc(desc) } @@ -246,6 +241,32 @@ impl LayerManager { layer.delete_on_drop(); } + pub(crate) fn resident_layers(&self) -> impl futures::stream::Stream + '_ { + // for small layer maps, we most likely have all resident, but for larger more are likely + // to be evicted assuming lots of layers correlated with longer lifespan. + + let layers = self + .layer_map() + .iter_historic_layers() + .map(|desc| self.get_from_desc(&desc)); + + let layers = futures::stream::iter(layers); + + layers.filter_map(|layer| async move { + // TODO(#6028): this query does not really need to see the ResidentLayer + match layer.keep_resident().await { + Ok(Some(layer)) => Some(layer.drop_eviction_guard()), + Ok(None) => None, + Err(e) => { + // these should not happen, but we cannot make them statically impossible right + // now. + tracing::warn!(%layer, "failed to keep the layer resident: {e:#}"); + None + } + } + }) + } + pub(crate) fn contains(&self, layer: &Layer) -> bool { self.layer_fmgr.contains(layer) } @@ -253,6 +274,12 @@ impl LayerManager { pub(crate) struct LayerFileManager(HashMap); +impl Default for LayerFileManager { + fn default() -> Self { + Self(HashMap::default()) + } +} + impl LayerFileManager { fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T { // The assumption for the `expect()` is that all code maintains the following invariant: @@ -275,10 +302,6 @@ impl LayerFileManager { self.0.contains_key(&layer.layer_desc().key()) } - pub(crate) fn new() -> Self { - Self(HashMap::new()) - } - pub(crate) fn remove(&mut self, layer: &T) { let present = self.0.remove(&layer.layer_desc().key()); if present.is_none() && cfg!(debug_assertions) {