From 7ea593db2292324e136d3325cd96217c9d652395 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 12 Feb 2024 17:13:35 +0200 Subject: [PATCH] refactor(LayerManager): resident layers query (#6634) Refactor out layer accesses so that we can have easy access to resident layers, which are needed for number of cases instead of layers for eviction. Simplifies the heatmap building by only using Layers, not RemoteTimelineClient. Cc: #5331 --- .../src/tenant/remote_timeline_client.rs | 17 ---- pageserver/src/tenant/storage_layer.rs | 8 +- pageserver/src/tenant/storage_layer/layer.rs | 4 - pageserver/src/tenant/timeline.rs | 97 ++++++------------- .../src/tenant/timeline/eviction_task.rs | 7 +- .../src/tenant/timeline/layer_manager.rs | 45 ++++++--- 6 files changed, 74 insertions(+), 104 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index e17dea01a8..483f53d5c8 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1700,23 +1700,6 @@ impl RemoteTimelineClient { } } } - - pub(crate) fn get_layers_metadata( - &self, - layers: Vec, - ) -> anyhow::Result>> { - let q = self.upload_queue.lock().unwrap(); - let q = match &*q { - UploadQueue::Stopped(_) | UploadQueue::Uninitialized => { - anyhow::bail!("queue is in state {}", q.as_str()) - } - UploadQueue::Initialized(inner) => inner, - }; - - let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned()); - - Ok(decorated.collect()) - } } pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath { diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6e9a4932d8..2d92baccbe 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -257,6 +257,12 @@ impl LayerAccessStats { ret } + /// Get the latest access timestamp, falling back to latest residence event, further falling + /// back to `SystemTime::now` for a usable timestamp for eviction. + pub(crate) fn latest_activity_or_now(&self) -> SystemTime { + self.latest_activity().unwrap_or_else(SystemTime::now) + } + /// Get the latest access timestamp, falling back to latest residence event. /// /// This function can only return `None` if there has not yet been a call to the @@ -271,7 +277,7 @@ impl LayerAccessStats { /// that that type can only be produced by inserting into the layer map. /// /// [`record_residence_event`]: Self::record_residence_event - pub(crate) fn latest_activity(&self) -> Option { + fn latest_activity(&self) -> Option { let locked = self.0.lock().unwrap(); let inner = &locked.for_eviction_policy; match inner.last_accesses.recent() { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index dd9de99477..bfcc031863 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1413,10 +1413,6 @@ impl ResidentLayer { &self.owner.0.path } - pub(crate) fn access_stats(&self) -> &LayerAccessStats { - self.owner.access_stats() - } - pub(crate) fn metadata(&self) -> LayerFileMetadata { self.owner.metadata() } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 74676277d5..625be7a644 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -12,6 +12,7 @@ use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use enumset::EnumSet; use fail::fail_point; +use futures::stream::StreamExt; use itertools::Itertools; use pageserver_api::{ keyspace::{key_range_size, KeySpaceAccum}, @@ -105,7 +106,7 @@ use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::config::TenantConf; -use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart}; +use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; @@ -1458,7 +1459,7 @@ impl Timeline { generation, shard_identity, pg_version, - layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())), + layers: Default::default(), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -2283,45 +2284,28 @@ impl Timeline { /// should treat this as a cue to simply skip doing any heatmap uploading /// for this timeline. pub(crate) async fn generate_heatmap(&self) -> Option { - let eviction_info = self.get_local_layers_for_disk_usage_eviction().await; + // no point in heatmaps without remote client + let _remote_client = self.remote_client.as_ref()?; - let remote_client = match &self.remote_client { - Some(c) => c, - None => return None, - }; + if !self.is_active() { + return None; + } - let layer_file_names = eviction_info - .resident_layers - .iter() - .map(|l| l.layer.get_name()) - .collect::>(); + let guard = self.layers.read().await; - let decorated = match remote_client.get_layers_metadata(layer_file_names) { - Ok(d) => d, - Err(_) => { - // Getting metadata only fails on Timeline in bad state. - return None; - } - }; + let resident = guard.resident_layers().map(|layer| { + let last_activity_ts = layer.access_stats().latest_activity_or_now(); - let heatmap_layers = std::iter::zip( - eviction_info.resident_layers.into_iter(), - decorated.into_iter(), - ) - .filter_map(|(layer, remote_info)| { - remote_info.map(|remote_info| { - HeatMapLayer::new( - layer.layer.get_name(), - IndexLayerMetadata::from(remote_info), - layer.last_activity_ts, - ) - }) + HeatMapLayer::new( + layer.layer_desc().filename(), + layer.metadata().into(), + last_activity_ts, + ) }); - Some(HeatMapTimeline::new( - self.timeline_id, - heatmap_layers.collect(), - )) + let layers = resident.collect().await; + + Some(HeatMapTimeline::new(self.timeline_id, layers)) } } @@ -4662,41 +4646,24 @@ impl Timeline { /// Returns non-remote layers for eviction. pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { let guard = self.layers.read().await; - let layers = guard.layer_map(); - let mut max_layer_size: Option = None; - let mut resident_layers = Vec::new(); - for l in layers.iter_historic_layers() { - let file_size = l.file_size(); - max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); + let resident_layers = guard + .resident_layers() + .map(|layer| { + let file_size = layer.layer_desc().file_size; + max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); - let l = guard.get_from_desc(&l); + let last_activity_ts = layer.access_stats().latest_activity_or_now(); - let l = match l.keep_resident().await { - Ok(Some(l)) => l, - Ok(None) => continue, - Err(e) => { - // these should not happen, but we cannot make them statically impossible right - // now. - tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}"); - continue; + EvictionCandidate { + layer: layer.into(), + last_activity_ts, + relative_last_activity: finite_f32::FiniteF32::ZERO, } - }; - - let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| { - // We only use this fallback if there's an implementation error. - // `latest_activity` already does rate-limited warn!() log. - debug!(layer=%l, "last_activity returns None, using SystemTime::now"); - SystemTime::now() - }); - - resident_layers.push(EvictionCandidate { - layer: l.drop_eviction_guard().into(), - last_activity_ts, - relative_last_activity: finite_f32::FiniteF32::ZERO, - }); - } + }) + .collect() + .await; DiskUsageEvictionInfo { max_layer_size, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 9bdd52e809..d87f78e35f 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -239,12 +239,7 @@ impl Timeline { } }; - let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| { - // We only use this fallback if there's an implementation error. - // `latest_activity` already does rate-limited warn!() log. - debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now"); - SystemTime::now() - }); + let last_activity_ts = hist_layer.access_stats().latest_activity_or_now(); let no_activity_for = match now.duration_since(last_activity_ts) { Ok(d) => d, diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index e38f5be209..ebcdcfdb4d 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -1,4 +1,5 @@ use anyhow::{bail, ensure, Context, Result}; +use futures::StreamExt; use pageserver_api::shard::TenantShardId; use std::{collections::HashMap, sync::Arc}; use tracing::trace; @@ -20,19 +21,13 @@ use crate::{ }; /// Provides semantic APIs to manipulate the layer map. +#[derive(Default)] pub(crate) struct LayerManager { layer_map: LayerMap, layer_fmgr: LayerFileManager, } impl LayerManager { - pub(crate) fn create() -> Self { - Self { - layer_map: LayerMap::default(), - layer_fmgr: LayerFileManager::new(), - } - } - pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer { self.layer_fmgr.get_from_desc(desc) } @@ -246,6 +241,32 @@ impl LayerManager { layer.delete_on_drop(); } + pub(crate) fn resident_layers(&self) -> impl futures::stream::Stream + '_ { + // for small layer maps, we most likely have all resident, but for larger more are likely + // to be evicted assuming lots of layers correlated with longer lifespan. + + let layers = self + .layer_map() + .iter_historic_layers() + .map(|desc| self.get_from_desc(&desc)); + + let layers = futures::stream::iter(layers); + + layers.filter_map(|layer| async move { + // TODO(#6028): this query does not really need to see the ResidentLayer + match layer.keep_resident().await { + Ok(Some(layer)) => Some(layer.drop_eviction_guard()), + Ok(None) => None, + Err(e) => { + // these should not happen, but we cannot make them statically impossible right + // now. + tracing::warn!(%layer, "failed to keep the layer resident: {e:#}"); + None + } + } + }) + } + pub(crate) fn contains(&self, layer: &Layer) -> bool { self.layer_fmgr.contains(layer) } @@ -253,6 +274,12 @@ impl LayerManager { pub(crate) struct LayerFileManager(HashMap); +impl Default for LayerFileManager { + fn default() -> Self { + Self(HashMap::default()) + } +} + impl LayerFileManager { fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T { // The assumption for the `expect()` is that all code maintains the following invariant: @@ -275,10 +302,6 @@ impl LayerFileManager { self.0.contains_key(&layer.layer_desc().key()) } - pub(crate) fn new() -> Self { - Self(HashMap::new()) - } - pub(crate) fn remove(&mut self, layer: &T) { let present = self.0.remove(&layer.layer_desc().key()); if present.is_none() && cfg!(debug_assertions) {