From 0d6d58bd3e98cb459d8ce0b7ee78c5b3a7890f17 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 28 Feb 2025 10:36:53 +0000 Subject: [PATCH] pageserver: make heatmap layer download API more cplane friendly (#10957) ## Problem We intend for cplane to use the heatmap layer download API to warm up timelines after unarchival. It's tricky for them to recurse in the ancestors, and the current implementation doesn't work well when unarchiving a chain of branches and warming them up. ## Summary of changes * Add a `recurse` flag to the API. When the flag is set, the operation recurses into the parent timeline after the current one is done. * Be resilient to warming up a chain of unarchived branches. Let's say we unarchived `B` and `C` from the `A -> B -> C` branch hierarchy. `B` got unarchived first. We generated the unarchival heatmaps and stash them in `A` and `B`. When `C` unarchived, it dropped it's unarchival heatmap since `A` and `B` already had one. If `C` needed layers from `A` and `B`, it was out of luck. Now, when choosing whether to keep an unarchival heatmap we look at its end LSN. If it's more inclusive than what we currently have, keep it. --- pageserver/client/src/mgmt_api.rs | 4 ++ pageserver/src/http/routes.rs | 5 +- pageserver/src/pgdatadir_mapping.rs | 2 + pageserver/src/tenant.rs | 8 ++- pageserver/src/tenant/timeline.rs | 53 +++++++++++-------- .../timeline/heatmap_layers_downloader.rs | 25 ++++++++- storage_controller/src/http.rs | 3 +- storage_controller/src/pageserver_client.rs | 8 ++- storage_controller/src/service.rs | 8 ++- test_runner/fixtures/neon_fixtures.py | 13 ++++- .../regress/test_pageserver_secondary.py | 42 ++++++--------- 11 files changed, 111 insertions(+), 60 deletions(-) diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index f19b4e964d..37c914c4e9 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -480,6 +480,7 @@ impl Client { tenant_shard_id: TenantShardId, timeline_id: TimelineId, concurrency: Option, + recurse: bool, ) -> Result<()> { let mut path = reqwest::Url::parse(&format!( "{}/v1/tenant/{}/timeline/{}/download_heatmap_layers", @@ -487,6 +488,9 @@ impl Client { )) .expect("Cannot build URL"); + path.query_pairs_mut() + .append_pair("recurse", &format!("{}", recurse)); + if let Some(concurrency) = concurrency { path.query_pairs_mut() .append_pair("concurrency", &format!("{}", concurrency)); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index b738d22740..a3ee31d6e6 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1435,6 +1435,7 @@ async fn timeline_download_heatmap_layers_handler( let desired_concurrency = parse_query_param(&request, "concurrency")?.unwrap_or(DEFAULT_CONCURRENCY); + let recurse = parse_query_param(&request, "recurse")?.unwrap_or(false); check_permission(&request, Some(tenant_shard_id.tenant_id))?; @@ -1451,9 +1452,7 @@ async fn timeline_download_heatmap_layers_handler( .unwrap_or(DEFAULT_MAX_CONCURRENCY); let concurrency = std::cmp::min(max_concurrency, desired_concurrency); - timeline - .start_heatmap_layers_download(concurrency, &ctx) - .await?; + timeline.start_heatmap_layers_download(concurrency, recurse, &ctx)?; json_response(StatusCode::ACCEPTED, ()) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 787b1b895c..c10dfb4542 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1052,6 +1052,8 @@ impl Timeline { ) -> Result { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); + fail::fail_point!("skip-logical-size-calculation", |_| { Ok(0) }); + // Fetch list of database dirs and iterate them let buf = self.get(DBDIR_KEY, lsn, ctx).await?; let dbdir = DbDirectory::des(&buf)?; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 9243f131ad..11d656eb25 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1153,12 +1153,15 @@ impl Tenant { let mut tline_ending_at = Some((&timeline, timeline.get_last_record_lsn())); while let Some((tline, end_lsn)) = tline_ending_at { let unarchival_heatmap = tline.generate_unarchival_heatmap(end_lsn).await; - if !tline.is_previous_heatmap_active() { + // Another unearchived timeline might have generated a heatmap for this ancestor. + // If the current branch point greater than the previous one use the the heatmap + // we just generated - it should include more layers. + if !tline.should_keep_previous_heatmap(end_lsn) { tline .previous_heatmap .store(Some(Arc::new(unarchival_heatmap))); } else { - tracing::info!("Previous heatmap still active. Dropping unarchival heatmap.") + tracing::info!("Previous heatmap preferred. Dropping unarchival heatmap.") } match tline.ancestor_timeline() { @@ -1939,6 +1942,7 @@ impl Tenant { hs.0.remove(&timeline_id).map(|h| PreviousHeatmap::Active { heatmap: h, read_at: hs.1, + end_lsn: None, }) }); part_downloads.spawn( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 662088fbde..851f84f603 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -442,6 +442,8 @@ pub(crate) enum PreviousHeatmap { Active { heatmap: HeatMapTimeline, read_at: std::time::Instant, + // End LSN covered by the heatmap if known + end_lsn: Option, }, Obsolete, } @@ -3570,12 +3572,16 @@ impl Timeline { Ok(layer) } - pub(super) fn is_previous_heatmap_active(&self) -> bool { - self.previous_heatmap - .load() - .as_ref() - .map(|prev| matches!(**prev, PreviousHeatmap::Active { .. })) - .unwrap_or(false) + pub(super) fn should_keep_previous_heatmap(&self, new_heatmap_end_lsn: Lsn) -> bool { + let crnt = self.previous_heatmap.load(); + match crnt.as_deref() { + Some(PreviousHeatmap::Active { end_lsn, .. }) => match end_lsn { + Some(crnt_end_lsn) => *crnt_end_lsn > new_heatmap_end_lsn, + None => true, + }, + Some(PreviousHeatmap::Obsolete) => false, + None => false, + } } /// The timeline heatmap is a hint to secondary locations from the primary location, @@ -3603,26 +3609,26 @@ impl Timeline { // heatamp. let previous_heatmap = self.previous_heatmap.load(); let visible_non_resident = match previous_heatmap.as_deref() { - Some(PreviousHeatmap::Active { heatmap, read_at }) => { - Some(heatmap.layers.iter().filter_map(|hl| { - let desc: PersistentLayerDesc = hl.name.clone().into(); - let layer = guard.try_get_from_key(&desc.key())?; + Some(PreviousHeatmap::Active { + heatmap, read_at, .. + }) => Some(heatmap.layers.iter().filter_map(|hl| { + let desc: PersistentLayerDesc = hl.name.clone().into(); + let layer = guard.try_get_from_key(&desc.key())?; - if layer.visibility() == LayerVisibilityHint::Covered { - return None; - } + if layer.visibility() == LayerVisibilityHint::Covered { + return None; + } - if layer.is_likely_resident() { - return None; - } + if layer.is_likely_resident() { + return None; + } - if layer.last_evicted_at().happened_after(*read_at) { - return None; - } + if layer.last_evicted_at().happened_after(*read_at) { + return None; + } - Some((desc, hl.metadata.clone(), hl.access_time)) - })) - } + Some((desc, hl.metadata.clone(), hl.access_time)) + })), Some(PreviousHeatmap::Obsolete) => None, None => None, }; @@ -3709,6 +3715,7 @@ impl Timeline { PreviousHeatmap::Active { heatmap, read_at: Instant::now(), + end_lsn: Some(end_lsn), } } @@ -7046,6 +7053,7 @@ mod tests { .store(Some(Arc::new(PreviousHeatmap::Active { heatmap: heatmap.clone(), read_at: std::time::Instant::now(), + end_lsn: None, }))); // Generate a new heatmap and assert that it contains the same layers as the old one. @@ -7148,6 +7156,7 @@ mod tests { .store(Some(Arc::new(PreviousHeatmap::Active { heatmap: heatmap.clone(), read_at: std::time::Instant::now(), + end_lsn: None, }))); // Evict all the layers in the previous heatmap diff --git a/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs b/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs index 184c830464..6209b63de4 100644 --- a/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs +++ b/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs @@ -32,6 +32,7 @@ impl HeatmapLayersDownloader { fn new( timeline: Arc, concurrency: usize, + recurse: bool, ctx: RequestContext, ) -> Result { let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?; @@ -98,6 +99,20 @@ impl HeatmapLayersDownloader { }, _ = cancel.cancelled() => { tracing::info!("Heatmap layers download cancelled"); + return; + } + } + + if recurse { + if let Some(ancestor) = timeline.ancestor_timeline() { + let ctx = ctx.attached_child(); + let res = + ancestor.start_heatmap_layers_download(concurrency, recurse, &ctx); + if let Err(err) = res { + tracing::info!( + "Failed to start heatmap layers download for ancestor: {err}" + ); + } } } } @@ -140,14 +155,20 @@ impl HeatmapLayersDownloader { } impl Timeline { - pub(crate) async fn start_heatmap_layers_download( + pub(crate) fn start_heatmap_layers_download( self: &Arc, concurrency: usize, + recurse: bool, ctx: &RequestContext, ) -> Result<(), ApiError> { let mut locked = self.heatmap_layers_downloader.lock().unwrap(); if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) { - let dl = HeatmapLayersDownloader::new(self.clone(), concurrency, ctx.attached_child())?; + let dl = HeatmapLayersDownloader::new( + self.clone(), + concurrency, + recurse, + ctx.attached_child(), + )?; *locked = Some(dl); Ok(()) } else { diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index de4d45adbe..64f0be3c23 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -524,9 +524,10 @@ async fn handle_tenant_timeline_download_heatmap_layers( let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; let concurrency: Option = parse_query_param(&req, "concurrency")?; + let recurse = parse_query_param(&req, "recurse")?.unwrap_or(false); service - .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency) + .tenant_timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency, recurse) .await?; json_response(StatusCode::OK, ()) diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index e9c54414a3..d6127c355a 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -281,13 +281,19 @@ impl PageserverClient { tenant_shard_id: TenantShardId, timeline_id: TimelineId, concurrency: Option, + recurse: bool, ) -> Result<()> { measured_request!( "download_heatmap_layers", crate::metrics::Method::Post, &self.node_id_label, self.inner - .timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency) + .timeline_download_heatmap_layers( + tenant_shard_id, + timeline_id, + concurrency, + recurse + ) .await ) } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 91ce4b83e0..9ba9504718 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -3774,6 +3774,7 @@ impl Service { tenant_shard_id: TenantShardId, timeline_id: TimelineId, concurrency: Option, + recurse: bool, ) -> Result<(), ApiError> { let _tenant_lock = trace_shared_lock( &self.tenant_op_locks, @@ -3811,7 +3812,12 @@ impl Service { targets, |tenant_shard_id, client| async move { client - .timeline_download_heatmap_layers(tenant_shard_id, timeline_id, concurrency) + .timeline_download_heatmap_layers( + tenant_shard_id, + timeline_id, + concurrency, + recurse, + ) .await }, 1, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5159ad4e3b..73c8406237 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2469,12 +2469,21 @@ class NeonStorageController(MetricsGetter, LogUtils): response.raise_for_status() return [TenantShardId.parse(tid) for tid in response.json()["updated"]] - def download_heatmap_layers(self, tenant_shard_id: TenantShardId, timeline_id: TimelineId): + def download_heatmap_layers( + self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, recurse: bool | None = None + ): + url = ( + f"{self.api}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers" + ) + if recurse is not None: + url = url + f"?recurse={str(recurse).lower()}" + response = self.request( "POST", - f"{self.api}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers", + url, headers=self.headers(TokenScope.ADMIN), ) + response.raise_for_status() def __enter__(self) -> Self: diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index a9b897b741..ab0f00db1c 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -938,9 +938,12 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): # Expect lots of layers assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10 - # Simulate large data by making layer downloads artifically slow for ps in env.pageservers: + # Simulate large data by making layer downloads artifically slow ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")]) + # Make the initial logical size calculation lie. Otherwise it on demand downloads + # layers and makes accounting difficult. + ps.http_client().configure_failpoints(("skip-logical-size-calculation", "return")) def timeline_heatmap(tlid): assert env.pageserver_remote_storage is not None @@ -952,21 +955,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): raise RuntimeError(f"No heatmap for timeline: {tlid}") - # Upload a heatmap, so that secondaries have something to download - ps_attached.http_client().tenant_heatmap_upload(tenant_id) - heatmap_before_migration = timeline_heatmap(timeline_id) - - # This has no chance to succeed: we have lots of layers and each one takes at least 1000ms. - # However, it pulls the heatmap, which will be important later. - http_client = env.storage_controller.pageserver_api() - (status, progress) = http_client.tenant_secondary_download(tenant_id, wait_ms=4000) - assert status == 202 - assert progress["heatmap_mtime"] is not None - assert progress["layers_downloaded"] > 0 - assert progress["bytes_downloaded"] > 0 - assert progress["layers_total"] > progress["layers_downloaded"] - assert progress["bytes_total"] > progress["bytes_downloaded"] - env.storage_controller.allowed_errors.extend( [ ".*Timed out.*downloading layers.*", @@ -975,6 +963,7 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): # Use a custom configuration that gives up earlier than usual. # We can't hydrate everything anyway because of the failpoints. + # Implicitly, this also uploads a heatmap from the current attached location. config = StorageControllerMigrationConfig( secondary_warmup_timeout="5s", secondary_download_request_timeout="2s" ) @@ -988,22 +977,17 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): ps_secondary.http_client().tenant_heatmap_upload(tenant_id) heatmap_after_migration = timeline_heatmap(timeline_id) - assert len(heatmap_before_migration["layers"]) > 0 + local_layers = ps_secondary.list_layers(tenant_id, timeline_id) + # We download 1 layer per second and give up within 5 seconds. + assert len(local_layers) < 10 after_migration_heatmap_layers_count = len(heatmap_after_migration["layers"]) - assert len(heatmap_before_migration["layers"]) <= after_migration_heatmap_layers_count - log.info(f"Heatmap size after cold migration is {after_migration_heatmap_layers_count}") env.storage_controller.download_heatmap_layers( TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id ) - # Now simulate the case where a child timeline is archived, parent layers - # are evicted and the child is unarchived. When the child is unarchived, - # itself and the parent update their heatmaps to contain layers needed by the - # child. One can warm up the timeline hierarchy since the heatmaps are ready. - def all_layers_downloaded(expected_layer_count: int): local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id)) @@ -1011,8 +995,9 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): assert local_layers_count >= expected_layer_count wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count)) - ps_secondary.http_client().tenant_heatmap_upload(tenant_id) + # Read everything and make sure that we're not downloading anything extra. + # All hot layers should be available locally now. before = ( ps_secondary.http_client() .get_metrics() @@ -1030,6 +1015,11 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): workload.stop() assert before == after + # Now simulate the case where a child timeline is archived, parent layers + # are evicted and the child is unarchived. When the child is unarchived, + # itself and the parent update their heatmaps to contain layers needed by the + # child. One can warm up the timeline hierarchy since the heatmaps are ready. + def check_archival_state(state: TimelineArchivalState, tline): timelines = ( timeline["timeline_id"] @@ -1064,6 +1054,6 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): assert expected_locally > 0 env.storage_controller.download_heatmap_layers( - TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id + TenantShardId(tenant_id, shard_number=0, shard_count=0), child_timeline_id, recurse=True ) wait_until(lambda: all_layers_downloaded(expected_locally))