mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
pageserver: add metric for number of wal records received by each shard (#10035)
## Problem With the current metrics we can't identify which shards are ingesting data at any given time. ## Summary of changes Add a metric for the number of wal records received for processing by each shard. This is per (tenant, timeline, shard).
This commit is contained in:
@@ -2204,6 +2204,15 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
|
||||
.expect("failed to define a metric"),
|
||||
});
|
||||
|
||||
pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_timeline_wal_records_received",
|
||||
"Number of WAL records received per shard",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wal_redo_seconds",
|
||||
@@ -2431,6 +2440,7 @@ pub(crate) struct TimelineMetrics {
|
||||
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
|
||||
/// Number of valid LSN leases.
|
||||
pub valid_lsn_lease_count_gauge: UIntGauge,
|
||||
pub wal_records_received: IntCounter,
|
||||
shutdown: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
@@ -2588,6 +2598,10 @@ impl TimelineMetrics {
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
let wal_records_received = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
shard_id,
|
||||
@@ -2620,6 +2634,7 @@ impl TimelineMetrics {
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
valid_lsn_lease_count_gauge,
|
||||
wal_records_received,
|
||||
shutdown: std::sync::atomic::AtomicBool::default(),
|
||||
}
|
||||
}
|
||||
@@ -2757,6 +2772,11 @@ impl TimelineMetrics {
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
let _ = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED.remove_label_values(&[
|
||||
tenant_id,
|
||||
shard_id,
|
||||
timeline_id,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -369,6 +369,13 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
if !records.is_empty() {
|
||||
timeline
|
||||
.metrics
|
||||
.wal_records_received
|
||||
.inc_by(records.len() as u64);
|
||||
}
|
||||
|
||||
for interpreted in records {
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
@@ -510,6 +517,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
|
||||
// Ingest the records without immediately committing them.
|
||||
timeline.metrics.wal_records_received.inc();
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
|
||||
@@ -175,6 +175,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
counter("pageserver_tenant_throttling_count_accounted_finish"),
|
||||
counter("pageserver_tenant_throttling_wait_usecs_sum"),
|
||||
counter("pageserver_tenant_throttling_count"),
|
||||
counter("pageserver_timeline_wal_records_received"),
|
||||
*histogram("pageserver_page_service_batch_size"),
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
# "pageserver_directory_entries_count", -- only used if above a certain threshold
|
||||
|
||||
Reference in New Issue
Block a user