From 65e2aae6e469d4c77e72fdf1a8b97135ac6e45f1 Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Fri, 4 Apr 2025 11:52:59 +0100
Subject: [PATCH] pageserver/secondary: deregister IO metrics (#11283)

## Problem

IO metrics for secondary locations do not get deregistered when the
timeline is removed.

## Summary of changes

Stash the request context to be used for downloads in
`SecondaryDetailTimeline`. These objects match the lifetime of the
secondary timeline location pretty well. When the timeline is removed,
deregister the metrics too.

Closes https://github.com/neondatabase/neon/issues/11156
---
 pageserver/src/metrics.rs                     |   6 +-
 pageserver/src/tenant/secondary.rs            |   7 ++
 pageserver/src/tenant/secondary/downloader.rs | 116 +++++++++++++++---
 .../regress/test_pageserver_secondary.py      |  67 ++++++++++
 4 files changed, 177 insertions(+), 19 deletions(-)

diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 9820d50e7b..d8497288ca 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1248,13 +1248,13 @@ pub(crate) static STORAGE_IO_TIME_METRIC: Lazy = Lazy::new(Storag
 
 #[derive(Clone, Copy)]
 #[repr(usize)]
-enum StorageIoSizeOperation {
+pub(crate) enum StorageIoSizeOperation {
     Read,
     Write,
 }
 
 impl StorageIoSizeOperation {
-    const VARIANTS: &'static [&'static str] = &["read", "write"];
+    pub(crate) const VARIANTS: &'static [&'static str] = &["read", "write"];
 
     fn as_str(&self) -> &'static str {
         Self::VARIANTS[*self as usize]
@@ -1262,7 +1262,7 @@ impl StorageIoSizeOperation {
 }
 
 // Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
-static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
+pub(crate) static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
     register_uint_gauge_vec!(
         "pageserver_io_operations_bytes_total",
         "Total amount of bytes read/written in IO operations",
diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs
index a378961620..2fa0ed9be9 100644
--- a/pageserver/src/tenant/secondary.rs
+++ b/pageserver/src/tenant/secondary.rs
@@ -167,10 +167,17 @@ impl SecondaryTenant {
 
         self.validate_metrics();
 
+        // Metrics are subtracted from and/or removed eagerly.
+        // Deletions are done in the background via [`BackgroundPurges::spawn`].
         let tenant_id = self.tenant_shard_id.tenant_id.to_string();
         let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
         let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
         let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
+
+        self.detail
+            .lock()
+            .unwrap()
+            .drain_timelines(&self.tenant_shard_id, &self.resident_size_metric);
     }
 
     pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index 1cf0241631..60cf7ac79e 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -4,6 +4,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime};
 
+use crate::metrics::{STORAGE_IO_SIZE, StorageIoSizeOperation};
 use camino::Utf8PathBuf;
 use chrono::format::{DelayedFormat, StrftimeItems};
 use futures::Future;
@@ -124,15 +125,53 @@ impl OnDiskState {
     }
 }
 
-#[derive(Debug, Clone, Default)]
 pub(super) struct SecondaryDetailTimeline {
     on_disk_layers: HashMap<LayerName, OnDiskState>,
 
     /// We remember when layers were evicted, to prevent re-downloading them.
     pub(super) evicted_at: HashMap<LayerName, SystemTime>,
+
+    ctx: RequestContext,
+}
+
+impl Clone for SecondaryDetailTimeline {
+    fn clone(&self) -> Self {
+        Self {
+            on_disk_layers: self.on_disk_layers.clone(),
+            evicted_at: self.evicted_at.clone(),
+            // This is a bit awkward. The downloader code operates on a snapshot
+            // of the secondary list to avoid locking it for extended periods of time.
+            // No particularly strong reason to choose [`RequestContext::detached_child`],
+            // but makes more sense than [`RequestContext::attached_child`].
+            ctx: self
+                .ctx
+                .detached_child(self.ctx.task_kind(), self.ctx.download_behavior()),
+        }
+    }
+}
+
+impl std::fmt::Debug for SecondaryDetailTimeline {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SecondaryDetailTimeline")
+            .field("on_disk_layers", &self.on_disk_layers)
+            .field("evicted_at", &self.evicted_at)
+            .finish()
+    }
 }
 
 impl SecondaryDetailTimeline {
+    pub(super) fn empty(ctx: RequestContext) -> Self {
+        SecondaryDetailTimeline {
+            on_disk_layers: Default::default(),
+            evicted_at: Default::default(),
+            ctx,
+        }
+    }
+
+    pub(super) fn context(&self) -> &RequestContext {
+        &self.ctx
+    }
+
     pub(super) fn remove_layer(
         &mut self,
         name: &LayerName,
@@ -258,18 +297,50 @@ impl SecondaryDetail {
 
     pub(super) fn remove_timeline(
         &mut self,
+        tenant_shard_id: &TenantShardId,
         timeline_id: &TimelineId,
         resident_metric: &UIntGauge,
     ) {
         let removed = self.timelines.remove(timeline_id);
         if let Some(removed) = removed {
-            resident_metric.sub(
-                removed
-                    .on_disk_layers
-                    .values()
-                    .map(|l| l.metadata.file_size)
-                    .sum(),
-            );
+            Self::clear_timeline_metrics(tenant_shard_id, timeline_id, removed, resident_metric);
+        }
+    }
+
+    pub(super) fn drain_timelines(
+        &mut self,
+        tenant_shard_id: &TenantShardId,
+        resident_metric: &UIntGauge,
+    ) {
+        for (timeline_id, removed) in self.timelines.drain() {
+            Self::clear_timeline_metrics(tenant_shard_id, &timeline_id, removed, resident_metric);
+        }
+    }
+
+    fn clear_timeline_metrics(
+        tenant_shard_id: &TenantShardId,
+        timeline_id: &TimelineId,
+        detail: SecondaryDetailTimeline,
+        resident_metric: &UIntGauge,
+    ) {
+        resident_metric.sub(
+            detail
+                .on_disk_layers
+                .values()
+                .map(|l| l.metadata.file_size)
+                .sum(),
+        );
+
+        let shard_id = format!("{}", tenant_shard_id.shard_slug());
+        let tenant_id = tenant_shard_id.tenant_id.to_string();
+        let timeline_id = timeline_id.to_string();
+        for op in StorageIoSizeOperation::VARIANTS {
+            let _ = STORAGE_IO_SIZE.remove_label_values(&[
+                op,
+                tenant_id.as_str(),
+                shard_id.as_str(),
+                timeline_id.as_str(),
+            ]);
         }
     }
 
@@ -727,6 +798,7 @@ impl<'a> TenantDownloader<'a> {
                 last_heatmap,
                 timeline,
                 &self.secondary_state.resident_size_metric,
+                ctx,
             )
             .await;
 
@@ -774,7 +846,6 @@ impl<'a> TenantDownloader<'a> {
 
         // Download the layers in the heatmap
         for timeline in heatmap.timelines {
-            let ctx = &ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline.timeline_id);
             let timeline_state = timeline_states
                 .remove(&timeline.timeline_id)
                 .expect("Just populated above");
@@ -917,7 +988,11 @@ impl<'a> TenantDownloader<'a> {
 
         for delete_timeline in &delete_timelines {
            // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
            // from disk fails that will be a fatal error.
-            detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
+            detail.remove_timeline(
+                self.secondary_state.get_tenant_shard_id(),
+                delete_timeline,
+                &self.secondary_state.resident_size_metric,
+            );
         }
     }
 
@@ -1013,7 +1088,6 @@ impl<'a> TenantDownloader<'a> {
         timeline: HeatMapTimeline,
         timeline_state: SecondaryDetailTimeline,
         deadline: Instant,
-        ctx: &RequestContext,
     ) -> (Result<(), UpdateError>, Vec) {
         // Accumulate updates to the state
         let mut touched = Vec::new();
@@ -1044,7 +1118,12 @@ impl<'a> TenantDownloader<'a> {
             }
 
             match self
-                .download_layer(tenant_shard_id, &timeline_id, layer, ctx)
+                .download_layer(
+                    tenant_shard_id,
+                    &timeline_id,
+                    layer,
+                    timeline_state.context(),
+                )
                 .await
             {
                 Ok(Some(layer)) => touched.push(layer),
@@ -1155,13 +1234,16 @@ impl<'a> TenantDownloader<'a> {
         tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());
 
         let (result, touched) = self
-            .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
+            .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline)
             .await;
 
         // Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
         {
             let mut detail = self.secondary_state.detail.lock().unwrap();
-            let timeline_detail = detail.timelines.entry(timeline_id).or_default();
+            let timeline_detail = detail.timelines.entry(timeline_id).or_insert_with(|| {
+                let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline_id);
+                SecondaryDetailTimeline::empty(ctx)
+            });
 
             tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
             touched.into_iter().for_each(|t| {
@@ -1295,10 +1377,12 @@ async fn init_timeline_state(
     last_heatmap: Option<&HeatMapTimeline>,
     heatmap: &HeatMapTimeline,
     resident_metric: &UIntGauge,
+    ctx: &RequestContext,
 ) -> SecondaryDetailTimeline {
-    let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
-    let mut detail = SecondaryDetailTimeline::default();
+    let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &heatmap.timeline_id);
+    let mut detail = SecondaryDetailTimeline::empty(ctx);
 
+    let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
     let mut dir = match tokio::fs::read_dir(&timeline_path).await {
         Ok(d) => d,
         Err(e) => {
diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py
index 3749df2229..c73a592d98 100644
--- a/test_runner/regress/test_pageserver_secondary.py
+++ b/test_runner/regress/test_pageserver_secondary.py
@@ -1099,3 +1099,70 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder):
     # Warm up the current secondary.
     ps_attached.http_client().tenant_secondary_download(tenant_id, wait_ms=100)
     wait_until(lambda: all_layers_downloaded(ps_secondary, expected_locally))
+
+
+@run_only_on_default_postgres("PG version is not interesting here")
+@pytest.mark.parametrize("action", ["delete_timeline", "detach"])
+def test_io_metrics_match_secondary_timeline_lifecycle(
+    neon_env_builder: NeonEnvBuilder, action: str
+):
+    """
+    Check that IO metrics for secondary timelines are de-registered when the timeline
+    is removed
+    """
+    neon_env_builder.num_pageservers = 2
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tenant_id = TenantId.generate()
+    parent_timeline_id = TimelineId.generate()
+
+    # We do heatmap uploads and pulls manually
+    tenant_conf = {"heatmap_period": "0s"}
+    env.create_tenant(
+        tenant_id, parent_timeline_id, conf=tenant_conf, placement_policy='{"Attached":1}'
+    )
+
+    child_timeline_id = env.create_branch("foo", tenant_id)
+
+    attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
+    ps_attached = env.get_pageserver(attached_to_id)
+    ps_secondary = next(p for p in env.pageservers if p != ps_attached)
+
+    ps_attached.http_client().tenant_heatmap_upload(tenant_id)
+    status, _ = ps_secondary.http_client().tenant_secondary_download(tenant_id, wait_ms=5000)
+    assert status == 200
+
+    labels = {
+        "operation": "write",
+        "tenant_id": str(tenant_id),
+        "timeline_id": str(child_timeline_id),
+    }
+    bytes_written = (
+        ps_secondary.http_client()
+        .get_metrics()
+        .query_one("pageserver_io_operations_bytes_total", labels)
+        .value
+    )
+
+    assert bytes_written == 0
+
+    if action == "delete_timeline":
+        env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id)
+        ps_attached.http_client().tenant_heatmap_upload(tenant_id)
+        status, _ = ps_secondary.http_client().tenant_secondary_download(tenant_id, wait_ms=5000)
+        assert status == 200
+    elif action == "detach":
+        env.storage_controller.tenant_policy_update(tenant_id, {"placement": {"Attached": 0}})
+        env.storage_controller.reconcile_until_idle()
+    else:
+        raise Exception("Unexpected action")
+
+    assert (
+        len(
+            ps_secondary.http_client()
+            .get_metrics()
+            .query_all("pageserver_io_operations_bytes_total", labels)
+        )
+        == 0
+    )
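
For reference, the deregistration pattern in this patch boils down to one thing: per-timeline label sets on a gauge vec must be removed explicitly once the owning object goes away, otherwise the stale series keeps being exported forever. Below is a minimal sketch of that pattern. It uses the plain `prometheus` crate rather than the pageserver's `metrics` wrapper, and the metric name `example_io_operations_bytes_total` and the helper `deregister_timeline_io_metrics` are illustrative assumptions, not part of this patch.

```rust
use once_cell::sync::Lazy;
use prometheus::{IntGaugeVec, register_int_gauge_vec};

// One series per (operation, tenant, shard, timeline) combination.
static IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "example_io_operations_bytes_total",
        "Total bytes read/written in IO operations",
        &["operation", "tenant_id", "shard_id", "timeline_id"]
    )
    .expect("failed to register metric")
});

// Hypothetical helper mirroring the shape of `clear_timeline_metrics`:
// drop every per-operation series belonging to one timeline.
fn deregister_timeline_io_metrics(tenant_id: &str, shard_id: &str, timeline_id: &str) {
    for op in ["read", "write"] {
        // Removal is best-effort: the series may never have been created.
        let _ = IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
    }
}

fn main() {
    // Simulate some IO accounting for one timeline.
    IO_SIZE
        .with_label_values(&["write", "t1", "0001", "tl1"])
        .add(4096);

    // Tear the timeline down: its series disappear from the gathered output,
    // which is what the regression test above checks via query_all().
    deregister_timeline_io_metrics("t1", "0001", "tl1");

    let leftovers: usize = prometheus::gather()
        .iter()
        .filter(|mf| mf.get_name() == "example_io_operations_bytes_total")
        .map(|mf| mf.get_metric().len())
        .sum();
    assert_eq!(leftovers, 0);
}
```

Until `remove_label_values` is called, a series created via `with_label_values` stays in the registry indefinitely, which is exactly the leak described in the Problem section above.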