safekeeper: add walreceiver metrics (#9450)

## Problem

We don't have any observability for Safekeeper WAL receiver queues.

## Summary of changes

Adds a few WAL receiver metrics:

* `safekeeper_wal_receivers`: gauge of currently connected WAL
receivers.
* `safekeeper_wal_receiver_queue_depth`: histogram of queue depths per
receiver, sampled every 5 seconds.
* `safekeeper_wal_receiver_queue_depth_total`: gauge of total queued
messages across all receivers.
* `safekeeper_wal_receiver_queue_size_total`: gauge of total queued
message sizes across all receivers.

There are already metrics for ingested WAL volume: `written_wal_bytes`
counter per timeline, and `safekeeper_write_wal_bytes` per-request
histogram.
This commit is contained in:
Erik Grinaker
2024-11-04 16:22:46 +01:00
committed by GitHub
parent 8ad1dbce72
commit 0d5a512825
4 changed files with 234 additions and 9 deletions

View File

@@ -110,6 +110,23 @@ static MAXRSS_KB: Lazy<IntGauge> = Lazy::new(|| {
pub const DISK_FSYNC_SECONDS_BUCKETS: &[f64] =
&[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0];
/// Constructs histogram buckets that are powers of two starting at 1 (i.e. 2^0), covering the end
/// points. For example, passing start=5,end=20 yields 4,8,16,32 as does start=4,end=32.
pub fn pow2_buckets(start: usize, end: usize) -> Vec<f64> {
assert_ne!(start, 0);
assert!(start <= end);
let start = match start.checked_next_power_of_two() {
Some(n) if n == start => n, // start already power of two
Some(n) => n >> 1, // power of two below start
None => panic!("start too large"),
};
let end = end.checked_next_power_of_two().expect("end too large");
std::iter::successors(Some(start), |n| n.checked_mul(2))
.take_while(|n| n <= &end)
.map(|n| n as f64)
.collect()
}
pub struct BuildInfo {
pub revision: &'static str,
pub build_tag: &'static str,
@@ -595,3 +612,67 @@ where
self.dec.collect_into(metadata, labels, name, &mut enc.0)
}
}
#[cfg(test)]
mod tests {
use super::*;
const POW2_BUCKETS_MAX: usize = 1 << (usize::BITS - 1);
#[test]
fn pow2_buckets_cases() {
assert_eq!(pow2_buckets(1, 1), vec![1.0]);
assert_eq!(pow2_buckets(1, 2), vec![1.0, 2.0]);
assert_eq!(pow2_buckets(1, 3), vec![1.0, 2.0, 4.0]);
assert_eq!(pow2_buckets(1, 4), vec![1.0, 2.0, 4.0]);
assert_eq!(pow2_buckets(1, 5), vec![1.0, 2.0, 4.0, 8.0]);
assert_eq!(pow2_buckets(1, 6), vec![1.0, 2.0, 4.0, 8.0]);
assert_eq!(pow2_buckets(1, 7), vec![1.0, 2.0, 4.0, 8.0]);
assert_eq!(pow2_buckets(1, 8), vec![1.0, 2.0, 4.0, 8.0]);
assert_eq!(
pow2_buckets(1, 200),
vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0]
);
assert_eq!(pow2_buckets(1, 8), vec![1.0, 2.0, 4.0, 8.0]);
assert_eq!(pow2_buckets(2, 8), vec![2.0, 4.0, 8.0]);
assert_eq!(pow2_buckets(3, 8), vec![2.0, 4.0, 8.0]);
assert_eq!(pow2_buckets(4, 8), vec![4.0, 8.0]);
assert_eq!(pow2_buckets(5, 8), vec![4.0, 8.0]);
assert_eq!(pow2_buckets(6, 8), vec![4.0, 8.0]);
assert_eq!(pow2_buckets(7, 8), vec![4.0, 8.0]);
assert_eq!(pow2_buckets(8, 8), vec![8.0]);
assert_eq!(pow2_buckets(20, 200), vec![16.0, 32.0, 64.0, 128.0, 256.0]);
// Largest valid values.
assert_eq!(
pow2_buckets(1, POW2_BUCKETS_MAX).len(),
usize::BITS as usize
);
assert_eq!(pow2_buckets(POW2_BUCKETS_MAX, POW2_BUCKETS_MAX).len(), 1);
}
#[test]
#[should_panic]
fn pow2_buckets_zero_start() {
pow2_buckets(0, 1);
}
#[test]
#[should_panic]
fn pow2_buckets_end_lt_start() {
pow2_buckets(2, 1);
}
#[test]
#[should_panic]
fn pow2_buckets_end_overflow_min() {
pow2_buckets(1, POW2_BUCKETS_MAX + 1);
}
#[test]
#[should_panic]
fn pow2_buckets_end_overflow_max() {
pow2_buckets(1, usize::MAX);
}
}

View File

@@ -5,23 +5,23 @@ use std::{
time::{Instant, SystemTime},
};
use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_FSYNC_SECONDS_BUCKETS};
use anyhow::Result;
use futures::Future;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
pow2_buckets,
proto::MetricFamily,
register_histogram_vec, register_int_counter, register_int_counter_pair,
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge,
HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGaugeVec,
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair,
register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, Gauge, GaugeVec,
Histogram, HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec,
IntGauge, IntGaugeVec, DISK_FSYNC_SECONDS_BUCKETS,
};
use once_cell::sync::Lazy;
use postgres_ffi::XLogSegNo;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::TenantTimelineId, lsn::Lsn};
use utils::{id::TenantTimelineId, lsn::Lsn, pageserver_feedback::PageserverFeedback};
use crate::{
receive_wal::MSG_QUEUE_SIZE,
state::{TimelineMemState, TimelinePersistentState},
GlobalTimelines,
};
@@ -204,6 +204,44 @@ pub static WAL_BACKUP_TASKS: Lazy<IntCounterPair> = Lazy::new(|| {
)
.expect("Failed to register safekeeper_wal_backup_tasks_finished_total counter")
});
pub static WAL_RECEIVERS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"safekeeper_wal_receivers",
"Number of currently connected WAL receivers (i.e. connected computes)"
)
.expect("Failed to register safekeeper_wal_receivers")
});
pub static WAL_RECEIVER_QUEUE_DEPTH: Lazy<Histogram> = Lazy::new(|| {
// Use powers of two buckets, but add a bucket at 0 and the max queue size to track empty and
// full queues respectively.
let mut buckets = pow2_buckets(1, MSG_QUEUE_SIZE);
buckets.insert(0, 0.0);
buckets.insert(buckets.len() - 1, (MSG_QUEUE_SIZE - 1) as f64);
assert!(buckets.len() <= 12, "too many histogram buckets");
register_histogram!(
"safekeeper_wal_receiver_queue_depth",
"Number of queued messages per WAL receiver (sampled every 5 seconds)",
buckets
)
.expect("Failed to register safekeeper_wal_receiver_queue_depth histogram")
});
pub static WAL_RECEIVER_QUEUE_DEPTH_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"safekeeper_wal_receiver_queue_depth_total",
"Total number of queued messages across all WAL receivers",
)
.expect("Failed to register safekeeper_wal_receiver_queue_depth_total gauge")
});
// TODO: consider adding a per-receiver queue_size histogram. This will require wrapping the Tokio
// MPSC channel to update counters on send, receive, and drop, while forwarding all other methods.
pub static WAL_RECEIVER_QUEUE_SIZE_TOTAL: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"safekeeper_wal_receiver_queue_size_total",
"Total memory byte size of queued messages across all WAL receivers",
)
.expect("Failed to register safekeeper_wal_receiver_queue_size_total gauge")
});
// Metrics collected on operations on the storage repository.
#[derive(strum_macros::EnumString, strum_macros::Display, strum_macros::IntoStaticStr)]

View File

@@ -3,6 +3,10 @@
//! sends replies back.
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::{
WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
WAL_RECEIVER_QUEUE_SIZE_TOTAL,
};
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
@@ -86,6 +90,7 @@ impl WalReceivers {
};
self.update_num(&shared);
WAL_RECEIVERS.inc();
WalReceiverGuard {
id: pos,
@@ -144,6 +149,7 @@ impl WalReceivers {
let mut shared = self.mutex.lock();
shared.slots[id] = None;
self.update_num(&shared);
WAL_RECEIVERS.dec();
}
/// Broadcast pageserver feedback to connected walproposers.
@@ -390,6 +396,7 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
loop {
let started = Instant::now();
let size = next_msg.size();
match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
Ok(()) => {}
// Slow send, log a message and keep trying. Log context has timeline ID.
@@ -409,6 +416,11 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
// WalAcceptor terminated.
Err(SendTimeoutError::Closed(_)) => return Ok(()),
}
// Update metrics. Will be decremented in WalAcceptor.
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);
next_msg = read_message(pgb_reader).await?;
}
}
@@ -466,6 +478,12 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
/// walproposer, even when it's writing a steady stream of messages.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// The metrics computation interval.
///
/// The Prometheus poll interval is 60 seconds at the time of writing. We sample the queue depth
/// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
const METRICS_INTERVAL: Duration = Duration::from_secs(5);
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx.
///
@@ -512,12 +530,15 @@ impl WalAcceptor {
async fn run(&mut self) -> anyhow::Result<()> {
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
// Periodically flush the WAL.
// Periodically flush the WAL and compute metrics.
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
flush_ticker.tick().await; // skip the initial, immediate tick
// Tracks unflushed appends.
let mut metrics_ticker = tokio::time::interval(METRICS_INTERVAL);
metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
// Tracks whether we have unflushed appends.
let mut dirty = false;
loop {
@@ -529,6 +550,10 @@ impl WalAcceptor {
break;
};
// Update gauge metrics.
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
// Update walreceiver state in shmem for reporting.
if let ProposerAcceptorMessage::Elected(_) = &msg {
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
@@ -565,6 +590,12 @@ impl WalAcceptor {
.process_msg(&ProposerAcceptorMessage::FlushWAL)
.await?
}
// Update histogram metrics periodically.
_ = metrics_ticker.tick() => {
WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64);
None // no reply
}
};
// Send reply, if any.
@@ -585,3 +616,14 @@ impl WalAcceptor {
Ok(())
}
}
/// On drop, drain msg_rx and update metrics to avoid leaks.
impl Drop for WalAcceptor {
fn drop(&mut self) {
self.msg_rx.close(); // prevent further sends
while let Ok(msg) = self.msg_rx.try_recv() {
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
}
}
}

View File

@@ -422,6 +422,70 @@ impl ProposerAcceptorMessage {
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
}
/// The memory size of the message, including byte slices.
pub fn size(&self) -> usize {
const BASE_SIZE: usize = std::mem::size_of::<ProposerAcceptorMessage>();
// For most types, the size is just the base enum size including the nested structs. Some
// types also contain byte slices; add them.
//
// We explicitly list all fields, to draw attention here when new fields are added.
let mut size = BASE_SIZE;
size += match self {
Self::Greeting(ProposerGreeting {
protocol_version: _,
pg_version: _,
proposer_id: _,
system_id: _,
timeline_id: _,
tenant_id: _,
tli: _,
wal_seg_size: _,
}) => 0,
Self::VoteRequest(VoteRequest { term: _ }) => 0,
Self::Elected(ProposerElected {
term: _,
start_streaming_at: _,
term_history: _,
timeline_start_lsn: _,
}) => 0,
Self::AppendRequest(AppendRequest {
h:
AppendRequestHeader {
term: _,
term_start_lsn: _,
begin_lsn: _,
end_lsn: _,
commit_lsn: _,
truncate_lsn: _,
proposer_uuid: _,
},
wal_data,
}) => wal_data.len(),
Self::NoFlushAppendRequest(AppendRequest {
h:
AppendRequestHeader {
term: _,
term_start_lsn: _,
begin_lsn: _,
end_lsn: _,
commit_lsn: _,
truncate_lsn: _,
proposer_uuid: _,
},
wal_data,
}) => wal_data.len(),
Self::FlushWAL => 0,
};
size
}
}
/// Acceptor -> Proposer messages