refactor(LayerManager): resident layers query (#6634)

Refactor out layer accesses so that we can have easy access to resident
layers, which are needed for number of cases instead of layers for
eviction. Simplifies the heatmap building by only using Layers, not
RemoteTimelineClient.

Cc: #5331
This commit is contained in:
Joonas Koivunen
2024-02-12 17:13:35 +02:00
committed by GitHub
parent 789a71c4ee
commit 7ea593db22
6 changed files with 74 additions and 104 deletions

View File

@@ -1700,23 +1700,6 @@ impl RemoteTimelineClient {
}
}
}
pub(crate) fn get_layers_metadata(
&self,
layers: Vec<LayerFileName>,
) -> anyhow::Result<Vec<Option<LayerFileMetadata>>> {
let q = self.upload_queue.lock().unwrap();
let q = match &*q {
UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", q.as_str())
}
UploadQueue::Initialized(inner) => inner,
};
let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned());
Ok(decorated.collect())
}
}
pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {

View File

@@ -257,6 +257,12 @@ impl LayerAccessStats {
ret
}
/// Get the latest access timestamp, falling back to latest residence event, further falling
/// back to `SystemTime::now` for a usable timestamp for eviction.
pub(crate) fn latest_activity_or_now(&self) -> SystemTime {
self.latest_activity().unwrap_or_else(SystemTime::now)
}
/// Get the latest access timestamp, falling back to latest residence event.
///
/// This function can only return `None` if there has not yet been a call to the
@@ -271,7 +277,7 @@ impl LayerAccessStats {
/// that that type can only be produced by inserting into the layer map.
///
/// [`record_residence_event`]: Self::record_residence_event
pub(crate) fn latest_activity(&self) -> Option<SystemTime> {
fn latest_activity(&self) -> Option<SystemTime> {
let locked = self.0.lock().unwrap();
let inner = &locked.for_eviction_policy;
match inner.last_accesses.recent() {

View File

@@ -1413,10 +1413,6 @@ impl ResidentLayer {
&self.owner.0.path
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
self.owner.access_stats()
}
pub(crate) fn metadata(&self) -> LayerFileMetadata {
self.owner.metadata()
}

View File

@@ -12,6 +12,7 @@ use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use fail::fail_point;
use futures::stream::StreamExt;
use itertools::Itertools;
use pageserver_api::{
keyspace::{key_range_size, KeySpaceAccum},
@@ -105,7 +106,7 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
@@ -1458,7 +1459,7 @@ impl Timeline {
generation,
shard_identity,
pg_version,
layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
layers: Default::default(),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -2283,45 +2284,28 @@ impl Timeline {
/// should treat this as a cue to simply skip doing any heatmap uploading
/// for this timeline.
pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
let eviction_info = self.get_local_layers_for_disk_usage_eviction().await;
// no point in heatmaps without remote client
let _remote_client = self.remote_client.as_ref()?;
let remote_client = match &self.remote_client {
Some(c) => c,
None => return None,
};
if !self.is_active() {
return None;
}
let layer_file_names = eviction_info
.resident_layers
.iter()
.map(|l| l.layer.get_name())
.collect::<Vec<_>>();
let guard = self.layers.read().await;
let decorated = match remote_client.get_layers_metadata(layer_file_names) {
Ok(d) => d,
Err(_) => {
// Getting metadata only fails on Timeline in bad state.
return None;
}
};
let resident = guard.resident_layers().map(|layer| {
let last_activity_ts = layer.access_stats().latest_activity_or_now();
let heatmap_layers = std::iter::zip(
eviction_info.resident_layers.into_iter(),
decorated.into_iter(),
)
.filter_map(|(layer, remote_info)| {
remote_info.map(|remote_info| {
HeatMapLayer::new(
layer.layer.get_name(),
IndexLayerMetadata::from(remote_info),
layer.last_activity_ts,
)
})
HeatMapLayer::new(
layer.layer_desc().filename(),
layer.metadata().into(),
last_activity_ts,
)
});
Some(HeatMapTimeline::new(
self.timeline_id,
heatmap_layers.collect(),
))
let layers = resident.collect().await;
Some(HeatMapTimeline::new(self.timeline_id, layers))
}
}
@@ -4662,41 +4646,24 @@ impl Timeline {
/// Returns non-remote layers for eviction.
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
for l in layers.iter_historic_layers() {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
let resident_layers = guard
.resident_layers()
.map(|layer| {
let file_size = layer.layer_desc().file_size;
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
let l = guard.get_from_desc(&l);
let last_activity_ts = layer.access_stats().latest_activity_or_now();
let l = match l.keep_resident().await {
Ok(Some(l)) => l,
Ok(None) => continue,
Err(e) => {
// these should not happen, but we cannot make them statically impossible right
// now.
tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}");
continue;
EvictionCandidate {
layer: layer.into(),
last_activity_ts,
relative_last_activity: finite_f32::FiniteF32::ZERO,
}
};
let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already does rate-limited warn!() log.
debug!(layer=%l, "last_activity returns None, using SystemTime::now");
SystemTime::now()
});
resident_layers.push(EvictionCandidate {
layer: l.drop_eviction_guard().into(),
last_activity_ts,
relative_last_activity: finite_f32::FiniteF32::ZERO,
});
}
})
.collect()
.await;
DiskUsageEvictionInfo {
max_layer_size,

View File

@@ -239,12 +239,7 @@ impl Timeline {
}
};
let last_activity_ts = hist_layer.access_stats().latest_activity().unwrap_or_else(|| {
// We only use this fallback if there's an implementation error.
// `latest_activity` already does rate-limited warn!() log.
debug!(layer=%hist_layer, "last_activity returns None, using SystemTime::now");
SystemTime::now()
});
let last_activity_ts = hist_layer.access_stats().latest_activity_or_now();
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,

View File

@@ -1,4 +1,5 @@
use anyhow::{bail, ensure, Context, Result};
use futures::StreamExt;
use pageserver_api::shard::TenantShardId;
use std::{collections::HashMap, sync::Arc};
use tracing::trace;
@@ -20,19 +21,13 @@ use crate::{
};
/// Provides semantic APIs to manipulate the layer map.
#[derive(Default)]
pub(crate) struct LayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager<Layer>,
}
impl LayerManager {
pub(crate) fn create() -> Self {
Self {
layer_map: LayerMap::default(),
layer_fmgr: LayerFileManager::new(),
}
}
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.layer_fmgr.get_from_desc(desc)
}
@@ -246,6 +241,32 @@ impl LayerManager {
layer.delete_on_drop();
}
pub(crate) fn resident_layers(&self) -> impl futures::stream::Stream<Item = Layer> + '_ {
// for small layer maps, we most likely have all resident, but for larger more are likely
// to be evicted assuming lots of layers correlated with longer lifespan.
let layers = self
.layer_map()
.iter_historic_layers()
.map(|desc| self.get_from_desc(&desc));
let layers = futures::stream::iter(layers);
layers.filter_map(|layer| async move {
// TODO(#6028): this query does not really need to see the ResidentLayer
match layer.keep_resident().await {
Ok(Some(layer)) => Some(layer.drop_eviction_guard()),
Ok(None) => None,
Err(e) => {
// these should not happen, but we cannot make them statically impossible right
// now.
tracing::warn!(%layer, "failed to keep the layer resident: {e:#}");
None
}
}
})
}
pub(crate) fn contains(&self, layer: &Layer) -> bool {
self.layer_fmgr.contains(layer)
}
@@ -253,6 +274,12 @@ impl LayerManager {
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
impl<T> Default for LayerFileManager<T> {
fn default() -> Self {
Self(HashMap::default())
}
}
impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
// The assumption for the `expect()` is that all code maintains the following invariant:
@@ -275,10 +302,6 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
self.0.contains_key(&layer.layer_desc().key())
}
pub(crate) fn new() -> Self {
Self(HashMap::new())
}
pub(crate) fn remove(&mut self, layer: &T) {
let present = self.0.remove(&layer.layer_desc().key());
if present.is_none() && cfg!(debug_assertions) {