Compare commits

...

10 Commits

Author SHA1 Message Date
John Spray
1e392a00e4 pageserver: add GcInfo::notify_child_keyspace 2024-07-06 19:51:57 +01:00
John Spray
3a6101bd21 pageserver: initialize layer visibility opportunistically 2024-07-06 19:51:57 +01:00
John Spray
4dbb8f4c18 pageserver: exclude covered layers from heatmap 2024-07-06 19:51:57 +01:00
John Spray
d7bca9fcdb pageserver: implementation of update_layer_visibility 2024-07-06 19:51:57 +01:00
John Spray
c78c810118 pageserver: move out common compaction step 2024-07-06 18:38:49 +01:00
John Spray
b9f1fa5edb pageserver: add LayerVisibility 2024-07-06 18:38:49 +01:00
John Spray
b874f1dc94 pageserver: maintain GcInfo incrementally (2/2) 2024-07-06 16:08:24 +01:00
John Spray
8b6e076983 pageserver: hold child timeline IDs in retain_lsns 2024-07-06 16:08:24 +01:00
John Spray
9d042caa0d pageserver: maintain GcInfo incrementally 2024-07-06 16:08:24 +01:00
John Spray
a33b3d93f4 pageserver: ordered entries in sparse keyspace 2024-07-06 16:08:24 +01:00
12 changed files with 506 additions and 121 deletions

View File

@@ -500,6 +500,15 @@ static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static VISIBLE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_visible_physical_size",
"The size of the layer files which are visible, i.e. may be needed to serve reads.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_resident_physical_size_global",
@@ -2130,6 +2139,7 @@ pub(crate) struct TimelineMetrics {
pub archival_size: UIntGauge,
pub standby_horizon_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
pub visible_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub aux_file_size_gauge: IntGauge,
@@ -2216,6 +2226,9 @@ impl TimelineMetrics {
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let visible_physical_size_gauge = VISIBLE_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
// TODO: we shouldn't expose this metric
let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
@@ -2266,6 +2279,7 @@ impl TimelineMetrics {
archival_size,
standby_horizon_gauge,
resident_physical_size_gauge,
visible_physical_size_gauge,
current_logical_size_gauge,
aux_file_size_gauge,
directory_entries_count_gauge,
@@ -2317,6 +2331,7 @@ impl TimelineMetrics {
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
let _ = VISIBLE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
let _ = metric.remove_label_values(&[tenant_id, shard_id, timeline_id]);
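The new pageserver_visible_physical_size gauge follows the same lifecycle as the existing resident-size gauge: register the labelled family once, materialize a per-timeline child at timeline creation, overwrite it on each recalculation, and drop the labelled series on shutdown. A standalone sketch of that lifecycle (not part of this diff), using the prometheus crate's IntGaugeVec and once_cell's Lazy as stand-ins for the pageserver's UIntGaugeVec wrapper, with illustrative label values:

use once_cell::sync::Lazy;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

static VISIBLE_PHYSICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "pageserver_visible_physical_size",
        "Size of layer files considered visible (not covered by image layers)",
        &["tenant_id", "shard_id", "timeline_id"]
    )
    .expect("failed to define a metric")
});

fn main() {
    let labels = ["example-tenant", "0001", "example-timeline"];
    // On timeline creation: materialize the labelled child gauge.
    let gauge = VISIBLE_PHYSICAL_SIZE
        .get_metric_with_label_values(&labels)
        .unwrap();
    // After each visibility recalculation: overwrite with the new total in bytes.
    gauge.set(128 << 20);
    // On timeline shutdown/deletion: drop the labelled series so it is no longer exported.
    let _ = VISIBLE_PHYSICAL_SIZE.remove_label_values(&labels);
}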

View File

@@ -931,7 +931,7 @@ impl Timeline {
result.to_keyspace(),
/* AUX sparse key space */
SparseKeySpace(KeySpace {
ranges: vec![repl_origin_key_range(), Key::metadata_aux_key_range()],
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
}),
))
}
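The swap above (from the "ordered entries in sparse keyspace" commit) suggests the sparse keyspace's ranges are expected in ascending, non-overlapping key order. A minimal, hypothetical invariant check for that ordering (u64 stands in for Key; this is not a pageserver API):

use std::ops::Range;

// Ranges of a sparse keyspace are expected in ascending, non-overlapping key order.
fn is_ordered(ranges: &[Range<u64>]) -> bool {
    ranges.windows(2).all(|w| w[0].end <= w[1].start)
}

fn main() {
    assert!(is_ordered(&[0..10, 20..30]));
    assert!(!is_ordered(&[20..30, 0..10])); // out of order, as before the swap above
}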

View File

@@ -19,6 +19,7 @@ use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::TimelineState;
@@ -30,6 +31,7 @@ use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::collections::BTreeMap;
use std::fmt;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
@@ -92,14 +94,12 @@ use crate::tenant::storage_layer::ImageLayer;
use crate::walredo;
use crate::InitializationOrder;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::ops::Bound::Included;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -632,6 +632,11 @@ impl Tenant {
timeline.maybe_spawn_flush_loop();
}
}
if let Some(ancestor) = timeline.get_ancestor_timeline() {
let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
ancestor_gc_info.insert_child(timeline.timeline_id, timeline.get_ancestor_lsn());
}
};
// Sanity check: a timeline should have some content.
@@ -1733,6 +1738,9 @@ impl Tenant {
.values()
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, background_jobs_can_start);
@@ -2765,6 +2773,56 @@ impl Tenant {
.await
}
/// Populate all Timelines' `GcInfo` with information about their children. We do not set the
/// PITR cutoffs here, because that requires I/O: this is done later, before GC, by [`Self::refresh_gc_info_internal`]
///
/// Subsequently, parent-child relationships are updated incrementally during timeline creation/deletion.
fn initialize_gc_info(
&self,
timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
) {
// This function must be called before activation: after activation timeline create/delete operations
// might happen, and this function is not safe to run concurrently with those.
assert!(!self.is_active());
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId, Option<KeySpace>)>> =
BTreeMap::new();
timelines.iter().for_each(|(timeline_id, timeline_entry)| {
if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
ancestor_children.push((timeline_entry.get_ancestor_lsn(), *timeline_id, None));
}
});
// The number of bytes we always keep, irrespective of PITR: this is a constant across timelines
let horizon = self.get_gc_horizon();
// Populate each timeline's GcInfo with information about its child branches
for timeline in timelines.values() {
let mut branchpoints: Vec<(Lsn, TimelineId, Option<KeySpace>)> = all_branchpoints
.remove(&timeline.timeline_id)
.unwrap_or_default();
branchpoints.sort_by_key(|b| b.0);
let mut target = timeline.gc_info.write().unwrap();
target.retain_lsns = branchpoints;
let horizon_cutoff = timeline
.get_last_record_lsn()
.checked_sub(horizon)
.unwrap_or(Lsn(0));
target.cutoffs = GcCutoffs {
horizon: horizon_cutoff,
pitr: Lsn::INVALID,
};
}
}
async fn refresh_gc_info_internal(
&self,
target_timeline_id: Option<TimelineId>,
@@ -2787,6 +2845,11 @@ impl Tenant {
.cloned()
.collect::<Vec<_>>();
if target_timeline_id.is_some() && timelines.is_empty() {
// We were asked to act on a particular timeline and it wasn't found
return Err(GcError::TimelineNotFound);
}
let mut gc_cutoffs: HashMap<TimelineId, GcCutoffs> =
HashMap::with_capacity(timelines.len());
@@ -2809,68 +2872,15 @@ impl Tenant {
// because that will stall branch creation.
let gc_cs = self.gc_cs.lock().await;
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let (all_branchpoints, timelines): (BTreeSet<(TimelineId, Lsn)>, _) = {
let timelines = self.timelines.lock().unwrap();
let mut all_branchpoints = BTreeSet::new();
let timelines = {
if let Some(target_timeline_id) = target_timeline_id.as_ref() {
if timelines.get(target_timeline_id).is_none() {
return Err(GcError::TimelineNotFound);
}
};
timelines
.iter()
.map(|(_timeline_id, timeline_entry)| {
if let Some(ancestor_timeline_id) =
&timeline_entry.get_ancestor_timeline_id()
{
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timeline_id) = target_timeline_id {
if ancestor_timeline_id == &timeline_id {
all_branchpoints.insert((
*ancestor_timeline_id,
timeline_entry.get_ancestor_lsn(),
));
}
}
// Collect branchpoints for all timelines
else {
all_branchpoints.insert((
*ancestor_timeline_id,
timeline_entry.get_ancestor_lsn(),
));
}
}
timeline_entry.clone()
})
.collect::<Vec<_>>()
};
(all_branchpoints, timelines)
};
// Ok, we now know all the branch points.
// Update the GC information for each timeline.
let mut gc_timelines = Vec::with_capacity(timelines.len());
for timeline in timelines {
// If target_timeline is specified, ignore all other timelines
// We filtered the timeline list above
if let Some(target_timeline_id) = target_timeline_id {
if timeline.timeline_id != target_timeline_id {
continue;
}
assert_eq!(target_timeline_id, timeline.timeline_id);
}
let branchpoints: Vec<Lsn> = all_branchpoints
.range((
Included((timeline.timeline_id, Lsn(0))),
Included((timeline.timeline_id, Lsn(u64::MAX))),
))
.map(|&x| x.1)
.collect();
{
let mut target = timeline.gc_info.write().unwrap();
@@ -2908,20 +2918,12 @@ impl Tenant {
.0,
);
match gc_cutoffs.remove(&timeline.timeline_id) {
Some(cutoffs) => {
target.retain_lsns = branchpoints;
target.cutoffs = cutoffs;
}
None => {
// reasons for this being unavailable:
// - this timeline was created while we were finding cutoffs
// - lsn for timestamp search fails for this timeline repeatedly
//
// in both cases, refreshing the branchpoints is correct.
target.retain_lsns = branchpoints;
}
};
// Apply the cutoffs we found to the Timeline's GcInfo. Why might we _not_ have cutoffs for a timeline?
// - this timeline was created while we were finding cutoffs
// - lsn for timestamp search fails for this timeline repeatedly
if let Some(cutoffs) = gc_cutoffs.remove(&timeline.timeline_id) {
target.cutoffs = cutoffs;
}
}
gc_timelines.push(timeline);
@@ -4305,7 +4307,7 @@ mod tests {
{
let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
assert_eq!(branchpoints.len(), 1);
assert_eq!(branchpoints[0], Lsn(0x40));
assert_eq!(branchpoints[0], (Lsn(0x40), NEW_TIMELINE_ID, None));
}
// You can read the key from the child branch even though the parent is
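initialize_gc_info above builds, in one pass, a map from each ancestor to its children's (branch LSN, child id) pairs, then installs each list, sorted by LSN, as the ancestor's retain_lsns. A standalone sketch of that grouping step (not from these commits; TimelineId and Lsn are simplified stand-ins):

use std::collections::BTreeMap;

type TimelineId = u64;
type Lsn = u64;

// For each timeline with an ancestor, record (branch LSN, child id) under the ancestor.
fn branchpoints_by_ancestor(
    timelines: &[(TimelineId, Option<(TimelineId, Lsn)>)],
) -> BTreeMap<TimelineId, Vec<(Lsn, TimelineId)>> {
    let mut all: BTreeMap<TimelineId, Vec<(Lsn, TimelineId)>> = BTreeMap::new();
    for (timeline_id, ancestor) in timelines {
        if let Some((ancestor_id, ancestor_lsn)) = ancestor {
            all.entry(*ancestor_id)
                .or_default()
                .push((*ancestor_lsn, *timeline_id));
        }
    }
    // Each parent's retain_lsns list is kept sorted by branch LSN.
    for children in all.values_mut() {
        children.sort_by_key(|c| c.0);
    }
    all
}

fn main() {
    // Timeline 1 is the root; 2 and 3 branched from it at LSNs 0x40 and 0x20.
    let timelines = [(1, None), (2, Some((1, 0x40))), (3, Some((1, 0x20)))];
    let map = branchpoints_by_ancestor(&timelines);
    assert_eq!(map[&1], vec![(0x20, 3), (0x40, 2)]);
}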

View File

@@ -51,7 +51,7 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::keyspace::{KeySpace, KeySpaceAccum, KeySpaceRandomAccum};
use std::collections::{HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
@@ -61,7 +61,7 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::{LayerVisibility, PersistentLayerDesc};
///
/// LayerMap tracks what layers exist on a timeline.
@@ -870,6 +870,164 @@ impl LayerMap {
println!("End dump LayerMap");
Ok(())
}
/// `read_points` represent the tip of a timeline and any branch points, i.e. the places
/// where we expect to serve reads.
///
/// This function is O(N) and should be called infrequently. The caller is responsible for
/// looking up and updating the Layer objects for these layer descriptors.
pub(crate) fn get_visibility(
&self,
mut read_points: Vec<(Lsn, KeySpace)>,
) -> (Vec<(Arc<PersistentLayerDesc>, LayerVisibility)>, KeySpace) {
// This is like a KeySpace, but written for efficient subtraction of layers and unions with KeySpaces
struct KeyShadow {
// FIXME: consider efficiency. KeySpace is a flat vector, so in principle fairly inefficient for
// repeatedly calling contains(), BUT as we iterate through the layermap we expect the shadow to shrink
// to something quite small, and for small collections an algorithmically expensive vector is often better
// for performance than a more algorithmically cheap data structure.
inner: KeySpace,
}
impl KeyShadow {
fn new(keyspace: KeySpace) -> Self {
Self { inner: keyspace }
}
fn contains(&self, range: Range<Key>) -> bool {
self.inner.overlaps(&range)
}
/// Return true if anything was removed.
fn subtract(&mut self, range: Range<Key>) -> bool {
let removed = self.inner.remove_overlapping_with(&KeySpace {
ranges: vec![range],
});
!removed.ranges.is_empty()
}
fn union_with(&mut self, keyspace: KeySpace) {
let mut accum = KeySpaceRandomAccum::new();
let prev = std::mem::take(&mut self.inner);
accum.add_keyspace(prev);
accum.add_keyspace(keyspace);
self.inner = accum.to_keyspace();
}
}
// The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
// and a ReadPoint (the branch tip or a child branch point) unions its keyspace back into it.
read_points.sort_by_key(|rp| rp.0);
let mut shadow = KeyShadow::new(
read_points
.pop()
.expect("Every timeline has at least one read point")
.1,
);
// We will interleave all our read points and layers into a sorted collection
enum Item {
ReadPoint { lsn: Lsn, keyspace: KeySpace },
Layer(Arc<PersistentLayerDesc>),
}
let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
items.extend(self.iter_historic_layers().map(Item::Layer));
items.extend(read_points.into_iter().map(|rp| Item::ReadPoint {
lsn: rp.0,
keyspace: rp.1,
}));
// Ordering: we want to iterate like this:
// 1. Highest LSNs first
// 2. Consider ReadPoints before image layers if they're at the same LSN
items.sort_by_key(|item| {
std::cmp::Reverse(match item {
Item::ReadPoint {
lsn,
keyspace: _keyspace,
} => (*lsn, 0),
Item::Layer(layer) => {
if layer.is_delta() {
(layer.get_lsn_range().end, 1)
} else {
(layer.image_layer_lsn(), 2)
}
}
})
});
let mut results = Vec::with_capacity(self.historic.len());
// TODO: handle delta layers properly with multiple read points: if a read point intersects a delta layer, we might already
// have encountered it and marked it as not-visible. We need to keep track of which delta layers we are currently within, and
// when we encounter a ReadPoint, update the delta layer's visibility as needed.
let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
for item in items {
let (reached_lsn, is_readpoint) = match &item {
Item::ReadPoint {
lsn,
keyspace: _keyspace,
} => (lsn, true),
Item::Layer(layer) => (&layer.lsn_range.start, false),
};
maybe_covered_deltas.retain(|d| {
if *reached_lsn >= d.lsn_range.start && is_readpoint {
// We encountered a readpoint within the delta layer: it is visible
results.push((d.clone(), LayerVisibility::Visible));
false
} else if *reached_lsn < d.lsn_range.start {
// We passed the layer's range without encountering a read point: it is not visible
results.push((d.clone(), LayerVisibility::Covered));
false
} else {
// We're still in the delta layer: continue iterating
true
}
});
match item {
Item::ReadPoint {
lsn: _lsn,
keyspace,
} => {
shadow.union_with(keyspace);
}
Item::Layer(layer) => {
let visibility = if layer.is_delta() {
if shadow.contains(layer.get_key_range()) {
LayerVisibility::Visible
} else {
// If a layer isn't visible based on current state, we must defer deciding whether
// it is truly not visible until we have advanced past the delta's range: we might
// encounter another branch point within this delta layer's LSN range.
maybe_covered_deltas.push(layer);
continue;
}
} else if shadow.subtract(layer.get_key_range()) {
// An image layer, which overlapped with the shadow
LayerVisibility::Visible
} else {
// An image layer, which did not overlap with the shadow
LayerVisibility::Covered
};
results.push((layer, visibility));
}
}
}
// Drain any remaining maybe_covered deltas
results.extend(
maybe_covered_deltas
.into_iter()
.map(|d| (d, LayerVisibility::Covered)),
);
(results, shadow.inner)
}
}
#[cfg(test)]
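get_visibility above sweeps from the highest LSN downward, shrinking a "shadow" keyspace at image layers and re-expanding it at read points (the branch tip and child branch points). A toy model of that sweep (not part of this diff), using small-integer keys and only image layers; the deferral logic for delta layers is omitted:

use std::collections::BTreeSet;
use std::ops::Range;

#[derive(Debug, PartialEq)]
enum Visibility {
    Visible,
    Covered,
}

enum Item {
    // A place reads are served from: the branch tip or a child's branch point.
    ReadPoint { lsn: u64, keys: BTreeSet<u32> },
    // An image layer covering `range` at `lsn`.
    Image { lsn: u64, range: Range<u32> },
}

fn sweep(mut items: Vec<Item>) -> Vec<(u64, Visibility)> {
    // Highest LSN first; read points sort before image layers at the same LSN.
    items.sort_by_key(|i| {
        std::cmp::Reverse(match i {
            Item::ReadPoint { lsn, .. } => (*lsn, 0),
            Item::Image { lsn, .. } => (*lsn, 1),
        })
    });
    let mut shadow: BTreeSet<u32> = BTreeSet::new();
    let mut out = Vec::new();
    for item in items {
        match item {
            // A read point re-exposes its keys: anything below may be needed again.
            Item::ReadPoint { keys, .. } => shadow.extend(keys),
            Item::Image { lsn, range } => {
                let hit: Vec<u32> = shadow
                    .iter()
                    .copied()
                    .filter(|k| range.contains(k))
                    .collect();
                if hit.is_empty() {
                    // Nothing above still reads through this range: the image is covered.
                    out.push((lsn, Visibility::Covered));
                } else {
                    // This image serves those keys, so layers below no longer need to.
                    for k in hit {
                        shadow.remove(&k);
                    }
                    out.push((lsn, Visibility::Visible));
                }
            }
        }
    }
    out
}

fn main() {
    let items = vec![
        Item::ReadPoint { lsn: 100, keys: (0..10).collect() }, // branch tip
        Item::Image { lsn: 80, range: 0..10 }, // covers everything above it: visible
        Item::Image { lsn: 50, range: 0..5 },  // shadowed by the LSN 80 image: covered
        Item::ReadPoint { lsn: 40, keys: (0..10).collect() }, // a child branch point
        Item::Image { lsn: 30, range: 0..10 }, // visible again, for the child branch
    ];
    assert_eq!(
        sweep(items),
        vec![
            (80, Visibility::Visible),
            (50, Visibility::Covered),
            (30, Visibility::Visible)
        ]
    );
}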

View File

@@ -521,6 +521,10 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
Ok(&self.historic_coverage)
}
pub(crate) fn len(&self) -> usize {
self.layers.len()
}
}
#[test]

View File

@@ -271,10 +271,14 @@ pub(super) async fn gather_inputs(
let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
.retain_lsns
.iter()
.filter(|&&lsn| lsn > ancestor_lsn)
.copied()
// this assumes there are no other retain_lsns than the branchpoints
.map(|lsn| (lsn, LsnKind::BranchPoint))
.filter_map(|(lsn, _child_id, _)| {
if lsn > &ancestor_lsn {
// this assumes there are no other retain_lsns than the branchpoints
Some((*lsn, LsnKind::BranchPoint))
} else {
None
}
})
.collect::<Vec<_>>();
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));

View File

@@ -457,6 +457,26 @@ pub enum ValueReconstructResult {
Missing,
}
#[derive(Debug, Clone)]
pub(crate) enum LayerVisibility {
/// A Visible layer might be read while serving a read, because there is not an image layer between it
/// and a readable LSN (the tip of the branch or a child's branch point)
Visible,
/// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
/// a branch or ephemeral endpoint at an LSN below the layer that covers this.
Covered,
/// Calculating layer visibility requires I/O, so until this has happened layers are loaded
/// in this state. Note that newly written layers may be marked Visible immediately; this uninitialized
/// state is for existing layers constructed while loading a timeline.
Uninitialized,
}
impl Default for LayerVisibility {
fn default() -> Self {
Self::Uninitialized
}
}
#[derive(Debug)]
pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
@@ -468,6 +488,7 @@ pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
struct LayerAccessStatsLocked {
for_scraping_api: LayerAccessStatsInner,
for_eviction_policy: LayerAccessStatsInner,
visibility: LayerVisibility,
}
impl LayerAccessStatsLocked {
@@ -591,7 +612,13 @@ impl LayerAccessStats {
inner.count_by_access_kind[access_kind] += 1;
inner.task_kind_flag |= ctx.task_kind();
inner.last_accesses.write(this_access);
})
});
// We may access a layer marked as Covered, if a new branch was created that depends on
// this layer, and background updates to layer visibility didn't notice it yet
if !matches!(locked.visibility, LayerVisibility::Visible) {
locked.visibility = LayerVisibility::Visible;
}
}
fn as_api_model(
@@ -673,6 +700,28 @@ impl LayerAccessStats {
},
}
}
pub(crate) fn set_visibility(&self, visibility: LayerVisibility) {
self.0.lock().unwrap().visibility = visibility;
}
pub(crate) fn get_visibility(&self) -> LayerVisibility {
self.0.lock().unwrap().visibility.clone()
}
/// Summarize how likely this layer is to be used: its access time (if accessed), and its visibility hint.
pub(crate) fn atime_visibility(&self) -> (Option<SystemTime>, LayerVisibility) {
let state = self.0.lock().unwrap();
(
state
.for_eviction_policy
.last_accesses
.recent()
.map(|a| a.when),
state.visibility.clone(),
)
}
}
/// Get a layer descriptor from a layer.
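The doc comments above say the visibility hint may feed eviction and secondary-download decisions. A sketch of one such consumer (an assumption for illustration, not the pageserver's actual eviction policy): order candidates so covered layers go first, and least recently accessed first among equals, treating Uninitialized conservatively like Visible.

use std::time::SystemTime;

#[derive(Clone, Debug, PartialEq)]
enum LayerVisibility {
    Visible,
    Covered,
    Uninitialized,
}

struct Candidate {
    name: &'static str,
    atime: Option<SystemTime>, // as returned by LayerAccessStats::atime_visibility()
    visibility: LayerVisibility,
}

fn eviction_order(mut candidates: Vec<Candidate>) -> Vec<&'static str> {
    candidates.sort_by_key(|c| {
        // Covered layers are the cheapest to give up; among equals, evict least
        // recently accessed first (None = never accessed sorts before Some).
        let visible = !matches!(c.visibility, LayerVisibility::Covered);
        (visible, c.atime)
    });
    candidates.into_iter().map(|c| c.name).collect()
}

fn main() {
    let now = SystemTime::now();
    let order = eviction_order(vec![
        Candidate { name: "visible-recent", atime: Some(now), visibility: LayerVisibility::Visible },
        Candidate { name: "covered-stale", atime: None, visibility: LayerVisibility::Covered },
        Candidate { name: "uninitialized", atime: None, visibility: LayerVisibility::Uninitialized },
    ]);
    assert_eq!(order, vec!["covered-stale", "uninitialized", "visible-recent"]);
}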

View File

@@ -250,6 +250,8 @@ impl Layer {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
// Newly created layers are marked visible by default: the usual case is that they were created to be read.
access_stats.set_visibility(super::LayerVisibility::Visible);
let local_path = local_layer_path(
conf,

View File

@@ -30,7 +30,7 @@ use pageserver_api::{
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId},
};
use rand::Rng;
use serde_with::serde_as;
@@ -135,7 +135,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::{config::TenantConf, storage_layer::LayerVisibility};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -453,12 +453,12 @@ pub struct WalReceiverInfo {
/// Garbage Collection.
#[derive(Default)]
pub(crate) struct GcInfo {
/// Specific LSNs that are needed.
/// Record which parts of this timeline's history are still needed by children
///
/// Currently, this includes all points where child branches have
/// been forked off from. In the future, could also include
/// explicit user-defined snapshot points.
pub(crate) retain_lsns: Vec<Lsn>,
/// Optionally store each child's keyspace at their branch LSN: parts of the keyspace not covered here may be dropped during GC, as
/// the child will never read them. For example, a child which has covered its whole keyspace with image layers
/// will put an empty keyspace here. Children populate this: if it is None, presume the child may read any part of the keyspace.
pub(crate) retain_lsns: Vec<(Lsn, TimelineId, Option<KeySpace>)>,
/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
@@ -474,6 +474,23 @@ impl GcInfo {
pub(crate) fn min_cutoff(&self) -> Lsn {
self.cutoffs.select_min()
}
pub(super) fn insert_child(&mut self, child_id: TimelineId, child_lsn: Lsn) {
self.retain_lsns.push((child_lsn, child_id, None));
self.retain_lsns.sort_by_key(|i| i.0);
}
pub(super) fn remove_child(&mut self, child_id: TimelineId) {
self.retain_lsns.retain(|i| i.1 != child_id);
}
/// When the child re-calculates which parts of the keyspace it will read from the ancestor, it posts
/// an update to the parent using this function, to enable the parent to perhaps GC more layers.
pub(super) fn notify_child_keyspace(&mut self, child_id: TimelineId, key_space: KeySpace) {
// `retain_lsns` is sorted by LSN rather than by timeline ID, so locate the child with a linear scan.
if let Some(entry) = self.retain_lsns.iter_mut().find(|i| i.1 == child_id) {
entry.2 = Some(key_space);
}
}
}
/// The `GcInfo` component describing which Lsns need to be retained.
@@ -1793,9 +1810,26 @@ impl Timeline {
}
match self.get_compaction_algorithm_settings().kind {
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await?,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await?,
}
if self.shard_identity.count >= ShardCount::new(2) {
// Limit the number of layer rewrites to the number of partitions: this means its
// runtime should be comparable to a full round of image layer creations, rather than
// being potentially much longer.
// TODO: make `partitioning` a sync lock: see comment in `repartition()` for why there's no
// real async use.
let rewrite_max = self.partitioning.try_lock().unwrap().0 .0.parts.len();
self.compact_shard_ancestors(rewrite_max, ctx).await?;
}
// TODO: be more selective: call this once at startup, and thereafter only when some branching changes or
// when image layers are generated.
self.update_layer_visibility(ctx).await?;
Ok(())
}
/// Mutate the timeline with a [`TimelineWriter`].
@@ -2967,6 +3001,17 @@ impl Timeline {
.set((calculated_size, metrics_guard.calculation_result_saved()))
.ok()
.expect("only this task sets it");
// As a nice-to-have, calculate layer visibilities. Otherwise this will
// be initialized on first compaction. Doing it as early as possible
// enables code that depends on layer visibility (like uploading heatmaps)
// to execute earlier, rather than waiting for compaction.
match self.update_layer_visibility(&background_ctx).await {
Ok(_) | Err(CompactionError::ShuttingDown) => {}
Err(e) => {
tracing::warn!("Initial layer visibility calculation failed: {e}");
}
}
}
pub(crate) fn spawn_ondemand_logical_size_calculation(
@@ -3144,7 +3189,8 @@ impl Timeline {
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
/// indicating which layers are currently on-disk on the primary.
/// indicating which layers the secondary should download to give itself a warm
/// cache, enabling it to take over as the attached location without degrading performance.
///
/// None is returned if the Timeline is in a state where uploading a heatmap
/// doesn't make sense, such as shutting down or initializing. The caller
@@ -3157,19 +3203,32 @@ impl Timeline {
let guard = self.layers.read().await;
let resident = guard.likely_resident_layers().map(|layer| {
let last_activity_ts = layer.access_stats().latest_activity_or_now();
let mut resident_visible_layers = Vec::new();
let now = SystemTime::now();
for layer in guard.likely_resident_layers() {
let (atime, visibility) = layer.access_stats().atime_visibility();
HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
last_activity_ts,
)
});
match visibility {
LayerVisibility::Uninitialized => {
// Refuse to generate a heatmap at all until layer visibility is initialized
return None;
}
LayerVisibility::Covered => {
// This layer is covered: exclude it from the heatmap because a secondary
// node is highly unlikely to need this layer in the event that it takes over as attached
}
LayerVisibility::Visible => resident_visible_layers.push(HeatMapLayer::new(
layer.layer_desc().layer_name(),
layer.metadata(),
atime.unwrap_or(now),
)),
}
}
let layers = resident.collect();
Some(HeatMapTimeline::new(self.timeline_id, layers))
Some(HeatMapTimeline::new(
self.timeline_id,
resident_visible_layers,
))
}
/// Returns true if the given lsn is or was an ancestor branchpoint.
@@ -5035,7 +5094,11 @@ impl Timeline {
let horizon_cutoff = min(gc_info.cutoffs.horizon, self.get_disk_consistent_lsn());
let pitr_cutoff = gc_info.cutoffs.pitr;
let retain_lsns = gc_info.retain_lsns.clone();
let retain_lsns = gc_info
.retain_lsns
.iter()
.map(|(lsn, _child_id, _)| *lsn)
.collect();
// Gets the maximum LSN that holds the valid lease.
//

View File

@@ -19,14 +19,14 @@ use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
use crate::tenant::storage_layer::{AsLayerDesc, LayerVisibility, PersistentLayerDesc};
use crate::tenant::timeline::{drop_rlock, Hole, ImageLayerCreationOutcome};
use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
@@ -100,7 +100,7 @@ impl Timeline {
// Define partitioning schema if needed
// FIXME: the match should only cover repartitioning, not the next steps
let partition_count = match self
match self
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
@@ -140,7 +140,6 @@ impl Timeline {
.await?;
self.upload_new_image_layers(image_layers)?;
partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -152,19 +151,9 @@ impl Timeline {
if !self.cancel.is_cancelled() {
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
}
1
}
};
if self.shard_identity.count >= ShardCount::new(2) {
// Limit the number of layer rewrites to the number of partitions: this means its
// runtime should be comparable to a full round of image layer creations, rather than
// being potentially much longer.
let rewrite_max = partition_count;
self.compact_shard_ancestors(rewrite_max, ctx).await?;
}
Ok(())
}
@@ -176,7 +165,7 @@ impl Timeline {
///
/// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
/// how much work it will try to do in each compaction pass.
async fn compact_shard_ancestors(
pub(super) async fn compact_shard_ancestors(
self: &Arc<Self>,
rewrite_max: usize,
ctx: &RequestContext,
@@ -358,6 +347,88 @@ impl Timeline {
Ok(())
}
/// A post-compaction step to update the LayerVisibility of layers covered by image layers. This
/// should also be called when new branches are created.
///
/// Sweep through the layer map, identifying layers which are covered by image layers
/// such that they do not need to be available to service reads. The resulting LayerVisibility
/// values may be used as an input to eviction and secondary downloads to de-prioritize layers
/// that we know won't be needed for reads.
pub(super) async fn update_layer_visibility(
&self,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
// Start with a keyspace representing all the keys we need to read from the tip of the branch
let head_lsn = self.get_last_record_lsn();
let (mut head_keyspace, sparse_ks) = self.collect_keyspace(head_lsn, ctx).await?;
// Converting the sparse part of the keyspace into the dense keyspace is safe in this context
// because we will never iterate through the keys.
head_keyspace.merge(&sparse_ks.0);
// We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
// are implicitly visible, because newly written layers are marked Visible on creation, and we never modify that here.
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
let mut visible_size: u64 = 0;
// FIXME: we only get accurate keyspaces from children if they've already run update_layer_visibility themselves. At startup all the timelines
// initialize this in arbitrary order (at the end of initial_logical_size_calculation). We should coordinate these. Perhaps at the very start
// of the tenant compaction task we should do all the timelines' layer visibility calculations in a leaf-first order?
let readable_points = {
let children = self.gc_info.read().unwrap().retain_lsns.clone();
let mut readable_points = Vec::with_capacity(children.len() + 1);
for (child_lsn, _child_timeline_id, child_keyspace) in &children {
let keyspace = match child_keyspace {
Some(ks) => ks.clone(),
None => {
// The child has not posted information about which parts of the keyspace they depend on: presume they depend on all of it.
let (mut keyspace, sparse_keyspace) =
self.collect_keyspace(*child_lsn, ctx).await?;
keyspace.merge(&sparse_keyspace.0);
keyspace
}
};
readable_points.push((*child_lsn, keyspace));
}
readable_points.push((head_lsn, head_keyspace));
readable_points
};
let (layer_visibility, shadow) = layer_map.get_visibility(readable_points);
for (layer_desc, visibility) in layer_visibility {
// FIXME: do a more efficient bulk zip() through the layers rather than N log N lookups getting each one
let layer = layer_manager.get_from_desc(&layer_desc);
if matches!(visibility, LayerVisibility::Visible) {
visible_size += layer.metadata().file_size;
}
layer.access_stats().set_visibility(visibility);
}
if let Some(ancestor) = &self.ancestor_timeline {
// Having calculated the readable keyspace after walking back through all this timeline's layers, the resulting keyspace is the remaining
// keys for which reads may still fall through to the parent branch. Notify the parent branch of this, so that they may GC layers which
// do not overlap with this keyspace, and so that they may use this as an input to their own visibility updates.
ancestor
.gc_info
.write()
.unwrap()
.notify_child_keyspace(self.timeline_id, shadow);
}
// Also include in the visible size all the layers which we would never update visibility on
// TODO: getter that doesn't spuriously construct a Vec<>
for layer in layer_map.get_level0_deltas().unwrap() {
visible_size += layer.file_size;
}
self.metrics.visible_physical_size_gauge.set(visible_size);
Ok(())
}
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files.
async fn compact_level0(
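The FIXME inside update_layer_visibility above suggests running the per-timeline visibility calculations leaf-first, so every parent sees up-to-date child keyspaces. A sketch of such an ordering (an assumption illustrating the FIXME's suggestion, not code from this diff; types are stand-ins):

use std::collections::HashMap;

type TimelineId = u32;

// Order timelines so that children come before their ancestors.
fn leaf_first(ancestor_of: &HashMap<TimelineId, Option<TimelineId>>) -> Vec<TimelineId> {
    // Depth = number of ancestors; deeper (leafier) timelines are visited first.
    let depth = |mut id: TimelineId| {
        let mut d = 0usize;
        while let Some(Some(parent)) = ancestor_of.get(&id) {
            d += 1;
            id = *parent;
        }
        d
    };
    let mut ids: Vec<TimelineId> = ancestor_of.keys().copied().collect();
    ids.sort_by_key(|id| std::cmp::Reverse(depth(*id)));
    ids
}

fn main() {
    let mut t = HashMap::new();
    t.insert(1, None);    // root
    t.insert(2, Some(1)); // child of 1
    t.insert(3, Some(2)); // grandchild: its visibility pass runs first
    assert_eq!(leaf_first(&t), vec![3, 2, 1]);
}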

View File

@@ -148,14 +148,14 @@ async fn cleanup_remaining_timeline_fs_traces(
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
async fn remove_timeline_from_tenant(
tenant: &Tenant,
timeline_id: TimelineId,
timeline: &Timeline,
_: &DeletionGuard, // using it as a witness
) -> anyhow::Result<()> {
// Remove the timeline from the map.
let mut timelines = tenant.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
@@ -163,8 +163,14 @@ async fn remove_timeline_from_tenant(
panic!("Timeline grew children while we removed layer files");
}
// Unlink from parent
if let Some(ancestor) = timeline.get_ancestor_timeline() {
let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
ancestor_gc_info.remove_child(timeline.timeline_id);
}
timelines
.remove(&timeline_id)
.remove(&timeline.timeline_id)
.expect("timeline that we were deleting was concurrently removed from 'timelines' map");
drop(timelines);
@@ -293,6 +299,9 @@ impl DeleteTimelineFlow {
{
let mut locked = tenant.timelines.lock().unwrap();
locked.insert(timeline_id, Arc::clone(&timeline));
// Note that we do not insert this into the parent branch's GcInfo: the parent is not obliged to retain
// any data for child timelines being deleted.
}
guard.mark_in_progress()?;
@@ -413,7 +422,7 @@ impl DeleteTimelineFlow {
pausable_failpoint!("in_progress_delete");
remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?;
remove_timeline_from_tenant(tenant, timeline, &guard).await?;
*guard = Self::Finished;

View File

@@ -255,6 +255,14 @@ impl LayerManager {
new_layer.layer_desc().lsn_range
);
// Transfer visibility hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
// be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
// always marking rewritten layers as visible.
new_layer
.as_ref()
.access_stats()
.set_visibility(old_layer.access_stats().get_visibility());
// Safety: we may never rewrite the same file in-place. Callers are responsible
// for ensuring that they only rewrite layers after something changes the path,
// such as an increment in the generation number.