Implement timeline-based metrics in safekeeper (#1823)

There is now a timeline metrics collector, which iterates over all timelines and reports metrics only for the active ones.
This commit is contained in:
Arthur Petukhovsky
2022-05-31 11:10:50 +03:00
committed by GitHub
parent 36281e3b47
commit c3e0b6c839
7 changed files with 384 additions and 49 deletions

View File

@@ -3,6 +3,7 @@
//! Otherwise, we might not see all metrics registered via
//! a default registry.
use lazy_static::lazy_static;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};

View File

@@ -264,6 +264,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
}
}
// Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset.
let registry = metrics::default_registry();
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
registry.register(Box::new(timeline_collector))?;
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel();

View File

@@ -14,6 +14,7 @@ pub mod control_file_upgrade;
pub mod handler;
pub mod http;
pub mod json_ctrl;
pub mod metrics;
pub mod receive_wal;
pub mod remove_wal;
pub mod safekeeper;

336
safekeeper/src/metrics.rs Normal file
View File

@@ -0,0 +1,336 @@
//! This module exports metrics for all active timelines.
use std::time::{Instant, SystemTime};
use metrics::{
core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
proto::MetricFamily,
Gauge, IntGaugeVec,
};
use postgres_ffi::xlog_utils::XLogSegNo;
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use crate::{
safekeeper::{SafeKeeperState, SafekeeperMemState},
timeline::{GlobalTimelines, ReplicaState},
};
/// Snapshot of everything the metrics collector needs to report on one
/// timeline. Built under the timeline mutex (see `Timeline::info_for_metrics`)
/// so that `TimelineCollector::collect` can read it without holding any locks.
pub struct FullTimelineInfo {
    pub zttid: ZTenantTimelineId,
    // Feedback state of each connected replica (only the present entries).
    pub replicas: Vec<ReplicaState>,
    pub wal_backup_active: bool,
    pub timeline_is_active: bool,
    pub num_computes: u32,
    // Highest WAL segment number already removed from disk; used for the
    // disk-usage estimate. Presumably 0 when nothing was removed yet —
    // TODO confirm against Timeline state initialization.
    pub last_removed_segno: XLogSegNo,
    pub epoch_start_lsn: Lsn,
    // In-memory (not necessarily persisted) safekeeper state.
    pub mem_state: SafekeeperMemState,
    // Persistent control-file state.
    pub persisted_state: SafeKeeperState,
    pub flush_lsn: Lsn,
}
/// Prometheus collector that reports per-timeline metrics for all *active*
/// timelines. Registered once at startup; on every scrape `collect()` resets
/// all gauge vecs and repopulates them from `GlobalTimelines`, so labels for
/// timelines that became inactive disappear from the output.
pub struct TimelineCollector {
    // Descriptors of all metrics below, returned by `Collector::desc`.
    descs: Vec<Desc>,
    commit_lsn: GenericGaugeVec<AtomicU64>,
    backup_lsn: GenericGaugeVec<AtomicU64>,
    flush_lsn: GenericGaugeVec<AtomicU64>,
    epoch_start_lsn: GenericGaugeVec<AtomicU64>,
    peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
    remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
    feedback_ps_write_lsn: GenericGaugeVec<AtomicU64>,
    // Unix timestamp (whole seconds) of the freshest pageserver feedback.
    feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
    timeline_active: GenericGaugeVec<AtomicU64>,
    wal_backup_active: GenericGaugeVec<AtomicU64>,
    connected_computes: IntGaugeVec,
    disk_usage: GenericGaugeVec<AtomicU64>,
    acceptor_term: GenericGaugeVec<AtomicU64>,
    // Time spent inside `collect()` itself, in seconds (self-instrumentation).
    collect_timeline_metrics: Gauge,
}
impl Default for TimelineCollector {
fn default() -> Self {
Self::new()
}
}
impl TimelineCollector {
pub fn new() -> TimelineCollector {
let mut descs = Vec::new();
let commit_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_commit_lsn",
"Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(commit_lsn.desc().into_iter().cloned());
let backup_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_backup_lsn",
"Current backup_lsn, up to which WAL is backed up, grouped by timeline",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(backup_lsn.desc().into_iter().cloned());
let flush_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_flush_lsn",
"Current flush_lsn, grouped by timeline",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(flush_lsn.desc().into_iter().cloned());
let epoch_start_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_epoch_start_lsn",
"Point since which compute generates new WAL in the current consensus term",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(epoch_start_lsn.desc().into_iter().cloned());
let peer_horizon_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_peer_horizon_lsn",
"LSN of the most lagging safekeeper",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
let remote_consistent_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_remote_consistent_lsn",
"LSN which is persisted to the remote storage in pageserver",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
let feedback_ps_write_lsn = GenericGaugeVec::new(
Opts::new(
"safekeeper_feedback_ps_write_lsn",
"Last LSN received by the pageserver, acknowledged in the feedback",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(feedback_ps_write_lsn.desc().into_iter().cloned());
let feedback_last_time_seconds = GenericGaugeVec::new(
Opts::new(
"safekeeper_feedback_last_time_seconds",
"Timestamp of the last feedback from the pageserver",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
let timeline_active = GenericGaugeVec::new(
Opts::new(
"safekeeper_timeline_active",
"Reports 1 for active timelines, 0 for inactive",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(timeline_active.desc().into_iter().cloned());
let wal_backup_active = GenericGaugeVec::new(
Opts::new(
"safekeeper_wal_backup_active",
"Reports 1 for timelines with active WAL backup, 0 otherwise",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(wal_backup_active.desc().into_iter().cloned());
let connected_computes = IntGaugeVec::new(
Opts::new(
"safekeeper_connected_computes",
"Number of active compute connections",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(connected_computes.desc().into_iter().cloned());
let disk_usage = GenericGaugeVec::new(
Opts::new(
"safekeeper_disk_usage_bytes",
"Estimated disk space used to store WAL segments",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(disk_usage.desc().into_iter().cloned());
let acceptor_term = GenericGaugeVec::new(
Opts::new("safekeeper_acceptor_term", "Current consensus term"),
&["tenant_id", "timeline_id"],
)
.unwrap();
descs.extend(acceptor_term.desc().into_iter().cloned());
let collect_timeline_metrics = Gauge::new(
"safekeeper_collect_timeline_metrics_seconds",
"Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
)
.unwrap();
descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
TimelineCollector {
descs,
commit_lsn,
backup_lsn,
flush_lsn,
epoch_start_lsn,
peer_horizon_lsn,
remote_consistent_lsn,
feedback_ps_write_lsn,
feedback_last_time_seconds,
timeline_active,
wal_backup_active,
connected_computes,
disk_usage,
acceptor_term,
collect_timeline_metrics,
}
}
}
impl Collector for TimelineCollector {
    fn desc(&self) -> Vec<&Desc> {
        self.descs.iter().collect()
    }

    /// Called on every Prometheus scrape. Resets all per-timeline gauge vecs,
    /// then repopulates them from a fresh snapshot of all active timelines,
    /// so label sets belonging to timelines that went inactive are dropped
    /// from the output. NOTE: order matters — the resets must happen before
    /// any `set()` below.
    fn collect(&self) -> Vec<MetricFamily> {
        let start_collecting = Instant::now();

        // reset all metrics to clean up inactive timelines
        self.commit_lsn.reset();
        self.backup_lsn.reset();
        self.flush_lsn.reset();
        self.epoch_start_lsn.reset();
        self.peer_horizon_lsn.reset();
        self.remote_consistent_lsn.reset();
        self.feedback_ps_write_lsn.reset();
        self.feedback_last_time_seconds.reset();
        self.timeline_active.reset();
        self.wal_backup_active.reset();
        self.connected_computes.reset();
        self.disk_usage.reset();
        self.acceptor_term.reset();

        // Snapshot of every active timeline; locks are taken and released
        // inside this call, not held while we fill the gauges.
        let timelines = GlobalTimelines::active_timelines_metrics();

        for tli in timelines {
            let tenant_id = tli.zttid.tenant_id.to_string();
            let timeline_id = tli.zttid.timeline_id.to_string();
            let labels = &[tenant_id.as_str(), timeline_id.as_str()];

            // Pick the replica feedback with the highest ps_writelsn.
            // Strict `<` means the first replica wins ties.
            let mut most_advanced: Option<utils::pq_proto::ZenithFeedback> = None;
            for replica in tli.replicas.iter() {
                if let Some(replica_feedback) = replica.zenith_feedback {
                    if let Some(current) = most_advanced {
                        if current.ps_writelsn < replica_feedback.ps_writelsn {
                            most_advanced = Some(replica_feedback);
                        }
                    } else {
                        most_advanced = Some(replica_feedback);
                    }
                }
            }

            self.commit_lsn
                .with_label_values(labels)
                .set(tli.mem_state.commit_lsn.into());
            self.backup_lsn
                .with_label_values(labels)
                .set(tli.mem_state.backup_lsn.into());
            self.flush_lsn
                .with_label_values(labels)
                .set(tli.flush_lsn.into());
            self.epoch_start_lsn
                .with_label_values(labels)
                .set(tli.epoch_start_lsn.into());
            self.peer_horizon_lsn
                .with_label_values(labels)
                .set(tli.mem_state.peer_horizon_lsn.into());
            self.remote_consistent_lsn
                .with_label_values(labels)
                .set(tli.mem_state.remote_consistent_lsn.into());
            self.timeline_active
                .with_label_values(labels)
                .set(tli.timeline_is_active as u64);
            self.wal_backup_active
                .with_label_values(labels)
                .set(tli.wal_backup_active as u64);
            self.connected_computes
                .with_label_values(labels)
                .set(tli.num_computes as i64);
            self.acceptor_term
                .with_label_values(labels)
                .set(tli.persisted_state.acceptor_state.term as u64);

            // Feedback metrics are only reported when at least one replica
            // has sent feedback; otherwise the reset above leaves them absent.
            if let Some(feedback) = most_advanced {
                self.feedback_ps_write_lsn
                    .with_label_values(labels)
                    .set(feedback.ps_writelsn);
                // Truncated to whole seconds since the Unix epoch; a reply
                // time before the epoch is silently skipped.
                if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH)
                {
                    self.feedback_last_time_seconds
                        .with_label_values(labels)
                        .set(unix_time.as_secs());
                }
            }

            // Disk usage estimate: (flush segment number - last removed
            // segment number) * segment size. Only computed when
            // last_removed_segno != 0 — presumably 0 means no segment was
            // removed yet, so the estimate would be wrong; TODO confirm.
            if tli.last_removed_segno != 0 {
                let segno_count = tli
                    .flush_lsn
                    .segment_number(tli.persisted_state.server.wal_seg_size as usize)
                    - tli.last_removed_segno;
                let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
                self.disk_usage
                    .with_label_values(labels)
                    .set(disk_usage_bytes);
            }
        }

        // collect MetricFamilys.
        let mut mfs = Vec::new();
        mfs.extend(self.commit_lsn.collect());
        mfs.extend(self.backup_lsn.collect());
        mfs.extend(self.flush_lsn.collect());
        mfs.extend(self.epoch_start_lsn.collect());
        mfs.extend(self.peer_horizon_lsn.collect());
        mfs.extend(self.remote_consistent_lsn.collect());
        mfs.extend(self.feedback_ps_write_lsn.collect());
        mfs.extend(self.feedback_last_time_seconds.collect());
        mfs.extend(self.timeline_active.collect());
        mfs.extend(self.wal_backup_active.collect());
        mfs.extend(self.connected_computes.collect());
        mfs.extend(self.disk_usage.collect());
        mfs.extend(self.acceptor_term.collect());

        // report time it took to collect all info
        let elapsed = start_collecting.elapsed().as_secs_f64();
        self.collect_timeline_metrics.set(elapsed);
        mfs.extend(self.collect_timeline_metrics.collect());

        mfs
    }
}

View File

@@ -15,13 +15,10 @@ use std::fmt;
use std::io::Read;
use tracing::*;
use lazy_static::lazy_static;
use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use metrics::{register_gauge_vec, Gauge, GaugeVec};
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
use utils::{
bin_ser::LeSer,
@@ -487,45 +484,16 @@ impl AcceptorProposerMessage {
}
}
lazy_static! {
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
// i64 is faster than f64, so update to u64 when available.
static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_commit_lsn",
"Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
&["tenant_id", "timeline_id"]
)
.expect("Failed to register safekeeper_commit_lsn gauge vec");
}
struct SafeKeeperMetrics {
commit_lsn: Gauge,
// WAL-related metrics are in WalStorageMetrics
}
impl SafeKeeperMetrics {
fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
let tenant_id = tenant_id.to_string();
let timeline_id = timeline_id.to_string();
Self {
commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
}
}
}
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
// Cached metrics so we don't have to recompute labels on each update.
metrics: SafeKeeperMetrics,
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
/// Note: be careful to set only if we are sure our WAL (term history) matches
/// committed one.
pub global_commit_lsn: Lsn,
/// LSN since the proposer safekeeper currently talking to appends WAL;
/// determines epoch switch point.
epoch_start_lsn: Lsn,
pub epoch_start_lsn: Lsn,
pub inmem: SafekeeperMemState, // in memory part
pub state: CTRL, // persistent state storage
@@ -555,7 +523,6 @@ where
wal_store.init_storage(&state)?;
Ok(SafeKeeper {
metrics: SafeKeeperMetrics::new(state.tenant_id, ztli),
global_commit_lsn: state.commit_lsn,
epoch_start_lsn: Lsn(0),
inmem: SafekeeperMemState {
@@ -757,7 +724,6 @@ where
// upgrade.
self.global_commit_lsn = max(self.global_commit_lsn, state.timeline_start_lsn);
self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
// Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
@@ -777,7 +743,6 @@ where
assert!(commit_lsn >= self.inmem.commit_lsn);
self.inmem.commit_lsn = commit_lsn;
self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
// If new commit_lsn reached epoch switch, force sync of control
// file: walproposer in sync mode is very interested when this

View File

@@ -33,6 +33,7 @@ use crate::safekeeper::{
};
use crate::send_wal::HotStandbyFeedback;
use crate::metrics::FullTimelineInfo;
use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
@@ -450,6 +451,33 @@ impl Timeline {
shared_state.active
}
/// Returns full timeline info, required for the metrics.
/// If the timeline is not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
if !shared_state.active {
return None;
}
Some(FullTimelineInfo {
zttid: self.zttid,
replicas: shared_state
.replicas
.iter()
.filter_map(|r| r.as_ref())
.copied()
.collect(),
wal_backup_active: shared_state.wal_backup_active,
timeline_is_active: shared_state.active,
num_computes: shared_state.num_computes,
last_removed_segno: shared_state.last_removed_segno,
epoch_start_lsn: shared_state.sk.epoch_start_lsn,
mem_state: shared_state.sk.inmem.clone(),
persisted_state: shared_state.sk.state.clone(),
flush_lsn: shared_state.sk.wal_store.flush_lsn(),
})
}
/// Timed wait for an LSN to be committed.
///
/// Returns the last committed LSN, which will be at least
@@ -777,6 +805,16 @@ impl GlobalTimelines {
.collect()
}
/// Return FullTimelineInfo for all active timelines.
pub fn active_timelines_metrics() -> Vec<FullTimelineInfo> {
let state = TIMELINES_STATE.lock().unwrap();
state
.timelines
.iter()
.filter_map(|(_, tli)| tli.info_for_metrics())
.collect()
}
fn delete_force_internal(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,

View File

@@ -31,20 +31,11 @@ use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
use postgres_ffi::waldecoder::WalStreamDecoder;
use metrics::{
register_gauge_vec, register_histogram_vec, Gauge, GaugeVec, Histogram, HistogramVec,
DISK_WRITE_SECONDS_BUCKETS,
};
use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
lazy_static! {
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
// i64 is faster than f64, so update to u64 when available.
static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!(
"safekeeper_flush_lsn",
"Current flush_lsn, grouped by timeline",
&["tenant_id", "timeline_id"]
)
.expect("Failed to register safekeeper_flush_lsn gauge vec");
static ref WRITE_WAL_BYTES: HistogramVec = register_histogram_vec!(
"safekeeper_write_wal_bytes",
"Bytes written to WAL in a single request, grouped by timeline",
@@ -69,7 +60,6 @@ lazy_static! {
}
struct WalStorageMetrics {
flush_lsn: Gauge,
write_wal_bytes: Histogram,
write_wal_seconds: Histogram,
flush_wal_seconds: Histogram,
@@ -80,7 +70,6 @@ impl WalStorageMetrics {
let tenant_id = zttid.tenant_id.to_string();
let timeline_id = zttid.timeline_id.to_string();
Self {
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]),
write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
flush_wal_seconds: FLUSH_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
@@ -171,7 +160,6 @@ impl PhysicalStorage {
/// Wrapper for flush_lsn updates that also updates metrics.
fn update_flush_lsn(&mut self) {
self.flush_record_lsn = self.write_record_lsn;
self.metrics.flush_lsn.set(self.flush_record_lsn.0 as f64);
}
/// Call fdatasync if config requires so.