From cc70fc802d2107122b330dba6ce8e2d8f8799189 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 6 Dec 2024 12:51:41 +0000 Subject: [PATCH] pageserver: add metric for number of wal records received by each shard (#10035) ## Problem With the current metrics we can't identify which shards are ingesting data at any given time. ## Summary of changes Add a metric for the number of wal records received for processing by each shard. This is per (tenant, timeline, shard). --- pageserver/src/metrics.rs | 20 +++++++++++++++++++ .../walreceiver/walreceiver_connection.rs | 8 ++++++++ test_runner/fixtures/metrics.py | 1 + 3 files changed, 29 insertions(+) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index e3b6f43bc4..62bf9acf01 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2204,6 +2204,15 @@ pub(crate) static WAL_INGEST: Lazy = Lazy::new(|| WalIngestMet .expect("failed to define a metric"), }); +pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_timeline_wal_records_received", + "Number of WAL records received per shard", + &["tenant_id", "shard_id", "timeline_id"] + ) + .expect("failed to define a metric") +}); + pub(crate) static WAL_REDO_TIME: Lazy = Lazy::new(|| { register_histogram!( "pageserver_wal_redo_seconds", @@ -2431,6 +2440,7 @@ pub(crate) struct TimelineMetrics { pub evictions_with_low_residence_duration: std::sync::RwLock, /// Number of valid LSN leases. pub valid_lsn_lease_count_gauge: UIntGauge, + pub wal_records_received: IntCounter, shutdown: std::sync::atomic::AtomicBool, } @@ -2588,6 +2598,10 @@ impl TimelineMetrics { .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) .unwrap(); + let wal_records_received = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED + .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) + .unwrap(); + TimelineMetrics { tenant_id, shard_id, @@ -2620,6 +2634,7 @@ impl TimelineMetrics { evictions_with_low_residence_duration, ), valid_lsn_lease_count_gauge, + wal_records_received, shutdown: std::sync::atomic::AtomicBool::default(), } } @@ -2757,6 +2772,11 @@ impl TimelineMetrics { shard_id, timeline_id, ]); + let _ = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED.remove_label_values(&[ + tenant_id, + shard_id, + timeline_id, + ]); } } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index d90ffbfa2c..3f10eeda60 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -369,6 +369,13 @@ pub(super) async fn handle_walreceiver_connection( // advances it to its end LSN. 0 is just an initialization placeholder. let mut modification = timeline.begin_modification(Lsn(0)); + if !records.is_empty() { + timeline + .metrics + .wal_records_received + .inc_by(records.len() as u64); + } + for interpreted in records { if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) && uncommitted_records > 0 @@ -510,6 +517,7 @@ pub(super) async fn handle_walreceiver_connection( } // Ingest the records without immediately committing them. + timeline.metrics.wal_records_received.inc(); let ingested = walingest .ingest_record(interpreted, &mut modification, &ctx) .await diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index 1278ed1aef..52ed7da36b 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -175,6 +175,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = ( counter("pageserver_tenant_throttling_count_accounted_finish"), counter("pageserver_tenant_throttling_wait_usecs_sum"), counter("pageserver_tenant_throttling_count"), + counter("pageserver_timeline_wal_records_received"), *histogram("pageserver_page_service_batch_size"), *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS, # "pageserver_directory_entries_count", -- only used if above a certain threshold