From 5ceb8c994d5c22737ef6c2c70349525f13cc225c Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 6 Mar 2025 11:25:02 +0000 Subject: [PATCH] pageserver: mark unarchival heatmap layers as cold (#11098) ## Problem On unarchival, we update the previous heatmap with all visible layers. When the primary generates a new heatmap it includes all those layers, so the secondary will download them. Since they're not actually resident on the primary (we didn't call the warm up API), they'll never be evicted, so they remain in the heatmap. We want these layers in the heatmap, since we might wish to warm-up an unarchived timeline after a shard migration. However, we don't want them to be downloaded on the secondary until we've warmed up the primary. ## Summary of Changes Include these layers in the heatmap and mark them as cold. All heatmap operations act on non-cold layers apart from the attached location warming up API, which will download the cold layers. Once the cold layers are downloaded on the primary, they'll be included in the next heatmap as hot and the secondary starts fetching them too. --- pageserver/src/tenant/secondary/downloader.rs | 14 ++--- pageserver/src/tenant/secondary/heatmap.rs | 24 ++++++-- pageserver/src/tenant/storage_layer/layer.rs | 4 +- pageserver/src/tenant/timeline.rs | 28 ++++++---- .../timeline/heatmap_layers_downloader.rs | 4 +- .../regress/test_pageserver_secondary.py | 55 ++++++++++++++++--- 6 files changed, 96 insertions(+), 33 deletions(-) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index a13b9323ac..5f3a0932c4 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -869,8 +869,7 @@ impl<'a> TenantDownloader<'a> { let heatmap_timeline = heatmap.timelines.get(heatmap_timeline_index).unwrap(); let layers_in_heatmap = heatmap_timeline - .layers - .iter() + .hot_layers() .map(|l| (&l.name, l.metadata.generation)) .collect::>(); let layers_on_disk = timeline_state @@ -1015,7 +1014,8 @@ impl<'a> TenantDownloader<'a> { // Accumulate updates to the state let mut touched = Vec::new(); - for layer in timeline.layers { + let timeline_id = timeline.timeline_id; + for layer in timeline.into_hot_layers() { if self.secondary_state.cancel.is_cancelled() { tracing::debug!("Cancelled -- dropping out of layer loop"); return (Err(UpdateError::Cancelled), touched); @@ -1040,7 +1040,7 @@ impl<'a> TenantDownloader<'a> { } match self - .download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx) + .download_layer(tenant_shard_id, &timeline_id, layer, ctx) .await { Ok(Some(layer)) => touched.push(layer), @@ -1148,7 +1148,7 @@ impl<'a> TenantDownloader<'a> { let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); let timeline_id = timeline.timeline_id; - tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len()); + tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count()); let (result, touched) = self .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx) @@ -1316,11 +1316,11 @@ async fn init_timeline_state( // As we iterate through layers found on disk, we will look up their metadata from this map. // Layers not present in metadata will be discarded. let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> = - heatmap.layers.iter().map(|l| (&l.name, l)).collect(); + heatmap.hot_layers().map(|l| (&l.name, l)).collect(); let last_heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> = if let Some(last_heatmap) = last_heatmap { - last_heatmap.layers.iter().map(|l| (&l.name, l)).collect() + last_heatmap.hot_layers().map(|l| (&l.name, l)).collect() } else { HashMap::new() }; diff --git a/pageserver/src/tenant/secondary/heatmap.rs b/pageserver/src/tenant/secondary/heatmap.rs index 4a938e9095..6dbb3f091f 100644 --- a/pageserver/src/tenant/secondary/heatmap.rs +++ b/pageserver/src/tenant/secondary/heatmap.rs @@ -42,7 +42,7 @@ pub(crate) struct HeatMapTimeline { #[serde_as(as = "DisplayFromStr")] pub(crate) timeline_id: TimelineId, - pub(crate) layers: Vec, + layers: Vec, } #[serde_as] @@ -53,8 +53,10 @@ pub(crate) struct HeatMapLayer { #[serde_as(as = "TimestampSeconds")] pub(crate) access_time: SystemTime, - // TODO: an actual 'heat' score that would let secondary locations prioritize downloading - // the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary. + + #[serde(default)] + pub(crate) cold: bool, // TODO: an actual 'heat' score that would let secondary locations prioritize downloading + // the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary. } impl HeatMapLayer { @@ -62,11 +64,13 @@ impl HeatMapLayer { name: LayerName, metadata: LayerFileMetadata, access_time: SystemTime, + cold: bool, ) -> Self { Self { name, metadata, access_time, + cold, } } } @@ -78,6 +82,18 @@ impl HeatMapTimeline { layers, } } + + pub(crate) fn into_hot_layers(self) -> impl Iterator { + self.layers.into_iter().filter(|l| !l.cold) + } + + pub(crate) fn hot_layers(&self) -> impl Iterator { + self.layers.iter().filter(|l| !l.cold) + } + + pub(crate) fn all_layers(&self) -> impl Iterator { + self.layers.iter() + } } pub(crate) struct HeatMapStats { @@ -92,7 +108,7 @@ impl HeatMapTenant { layers: 0, }; for timeline in &self.timelines { - for layer in &timeline.layers { + for layer in timeline.hot_layers() { stats.layers += 1; stats.bytes += layer.metadata.file_size; } diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index bde7fbc1f9..247092bf45 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1563,10 +1563,10 @@ impl LayerInner { self.access_stats.record_residence_event(); - self.status.as_ref().unwrap().send_replace(Status::Evicted); - *self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now()); + self.status.as_ref().unwrap().send_replace(Status::Evicted); + Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f646e621d3..4483ecfe94 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3648,7 +3648,7 @@ impl Timeline { let visible_non_resident = match previous_heatmap.as_deref() { Some(PreviousHeatmap::Active { heatmap, read_at, .. - }) => Some(heatmap.layers.iter().filter_map(|hl| { + }) => Some(heatmap.all_layers().filter_map(|hl| { let desc: PersistentLayerDesc = hl.name.clone().into(); let layer = guard.try_get_from_key(&desc.key())?; @@ -3664,7 +3664,7 @@ impl Timeline { return None; } - Some((desc, hl.metadata.clone(), hl.access_time)) + Some((desc, hl.metadata.clone(), hl.access_time, hl.cold)) })), Some(PreviousHeatmap::Obsolete) => None, None => None, @@ -3680,6 +3680,7 @@ impl Timeline { layer.layer_desc().clone(), layer.metadata(), last_activity_ts, + false, // these layers are not cold )) } LayerVisibilityHint::Covered => { @@ -3706,12 +3707,14 @@ impl Timeline { // Sort layers in order of which to download first. For a large set of layers to download, we // want to prioritize those layers which are most likely to still be in the resident many minutes // or hours later: + // - Cold layers go last for convenience when a human inspects the heatmap. // - Download L0s last, because they churn the fastest: L0s on a fast-writing tenant might // only exist for a few minutes before being compacted into L1s. // - For L1 & image layers, download most recent LSNs first: the older the LSN, the sooner // the layer is likely to be covered by an image layer during compaction. - layers.sort_by_key(|(desc, _meta, _atime)| { + layers.sort_by_key(|(desc, _meta, _atime, cold)| { std::cmp::Reverse(( + *cold, !LayerMap::is_l0(&desc.key_range, desc.is_delta), desc.lsn_range.end, )) @@ -3719,7 +3722,9 @@ impl Timeline { let layers = layers .into_iter() - .map(|(desc, meta, atime)| HeatMapLayer::new(desc.layer_name(), meta, atime)) + .map(|(desc, meta, atime, cold)| { + HeatMapLayer::new(desc.layer_name(), meta, atime, cold) + }) .collect(); Some(HeatMapTimeline::new(self.timeline_id, layers)) @@ -3739,6 +3744,7 @@ impl Timeline { name: vl.layer_desc().layer_name(), metadata: vl.metadata(), access_time: now, + cold: true, }; heatmap_layers.push(hl); } @@ -7040,6 +7046,7 @@ mod tests { use pageserver_api::key::Key; use pageserver_api::value::Value; + use std::iter::Iterator; use tracing::Instrument; use utils::id::TimelineId; use utils::lsn::Lsn; @@ -7053,8 +7060,8 @@ mod tests { use crate::tenant::{PreviousHeatmap, Timeline}; fn assert_heatmaps_have_same_layers(lhs: &HeatMapTimeline, rhs: &HeatMapTimeline) { - assert_eq!(lhs.layers.len(), rhs.layers.len()); - let lhs_rhs = lhs.layers.iter().zip(rhs.layers.iter()); + assert_eq!(lhs.all_layers().count(), rhs.all_layers().count()); + let lhs_rhs = lhs.all_layers().zip(rhs.all_layers()); for (l, r) in lhs_rhs { assert_eq!(l.name, r.name); assert_eq!(l.metadata, r.metadata); @@ -7132,10 +7139,11 @@ mod tests { assert_eq!(heatmap.timeline_id, timeline.timeline_id); // L0 should come last - assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name()); + let heatmap_layers = heatmap.all_layers().collect::>(); + assert_eq!(heatmap_layers.last().unwrap().name, l0_delta.layer_name()); let mut last_lsn = Lsn::MAX; - for layer in &heatmap.layers { + for layer in heatmap_layers { // Covered layer should be omitted assert!(layer.name != covered_delta.layer_name()); @@ -7264,7 +7272,7 @@ mod tests { .expect("Infallible while timeline is not shut down"); // Both layers should be in the heatmap - assert!(!heatmap.layers.is_empty()); + assert!(heatmap.all_layers().count() > 0); // Now simulate a migration. timeline @@ -7290,7 +7298,7 @@ mod tests { .await .expect("Infallible while timeline is not shut down"); - assert!(post_eviction_heatmap.layers.is_empty()); + assert_eq!(post_eviction_heatmap.all_layers().count(), 0); assert!(matches!( timeline.previous_heatmap.load().as_deref(), Some(PreviousHeatmap::Obsolete) diff --git a/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs b/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs index 6209b63de4..11df232a10 100644 --- a/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs +++ b/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs @@ -61,11 +61,11 @@ impl HeatmapLayersDownloader { tracing::info!( resident_size=%timeline.resident_physical_size(), - heatmap_layers=%heatmap.layers.len(), + heatmap_layers=%heatmap.all_layers().count(), "Starting heatmap layers download" ); - let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map( + let stream = futures::stream::iter(heatmap.all_layers().cloned().filter_map( |layer| { let ctx = ctx.attached_child(); let tl = timeline.clone(); diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index ab0f00db1c..b9e2934505 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -955,6 +955,17 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): raise RuntimeError(f"No heatmap for timeline: {tlid}") + def count_timeline_heatmap_layers(tlid) -> tuple[int, int]: + cold, hot = 0, 0 + layers = timeline_heatmap(tlid)["layers"] + for layer in layers: + if layer["cold"]: + cold += 1 + else: + hot += 1 + + return cold, hot + env.storage_controller.allowed_errors.extend( [ ".*Timed out.*downloading layers.*", @@ -988,13 +999,19 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id ) - def all_layers_downloaded(expected_layer_count: int): - local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id)) + def all_layers_downloaded(node, expected_layer_count: int): + local_layers_count = len(node.list_layers(tenant_id, timeline_id)) log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}") assert local_layers_count >= expected_layer_count - wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count)) + def no_layers_downloaded(node): + local_layers_count = len(node.list_layers(tenant_id, timeline_id)) + + log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}") + assert local_layers_count == 0 + + wait_until(lambda: all_layers_downloaded(ps_secondary, after_migration_heatmap_layers_count)) # Read everything and make sure that we're not downloading anything extra. # All hot layers should be available locally now. @@ -1047,13 +1064,35 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): wait_until(lambda: check_archival_state(TimelineArchivalState.UNARCHIVED, child_timeline_id)) ps_secondary.http_client().tenant_heatmap_upload(tenant_id) - log.info(f"Parent timeline heatmap size: {len(timeline_heatmap(timeline_id)['layers'])}") - log.info(f"Child timeline heatmap size: {len(timeline_heatmap(child_timeline_id)['layers'])}") - expected_locally = len(timeline_heatmap(timeline_id)["layers"]) - assert expected_locally > 0 + parent_cold, parent_hot = count_timeline_heatmap_layers(timeline_id) + child_cold, child_hot = count_timeline_heatmap_layers(child_timeline_id) + + log.info(f"Parent timeline heatmap size: cold={parent_cold}, hot={parent_hot}") + log.info(f"Child timeline heatmap size: cold={child_cold}, hot={child_hot}") + + # All layers in the heatmap should come from the generation on unarchival. + # Hence, they should be cold. + assert parent_cold > 0 + assert parent_hot == 0 + + expected_locally = parent_cold env.storage_controller.download_heatmap_layers( TenantShardId(tenant_id, shard_number=0, shard_count=0), child_timeline_id, recurse=True ) - wait_until(lambda: all_layers_downloaded(expected_locally)) + wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally)) + + for ps in env.pageservers: + ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "off")]) + + # The uploaded heatmap is still empty. Clean up all layers on the secondary. + ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100) + wait_until(lambda: no_layers_downloaded(ps_attached)) + + # Upload a new heatmap. The previously cold layers become hot since they're now resident. + ps_secondary.http_client().tenant_heatmap_upload(tenant_id) + + # Warm up the current secondary. + ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100) + wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally))