From c3e0b6c839fa37bc9734a09dc8288d577557cb27 Mon Sep 17 00:00:00 2001
From: Arthur Petukhovsky
Date: Tue, 31 May 2022 11:10:50 +0300
Subject: [PATCH] Implement timeline-based metrics in safekeeper (#1823)

Now there is a timeline metrics collector, which goes through all
timelines and reports metrics only for the active ones.
---
 libs/metrics/src/lib.rs          |   1 +
 safekeeper/src/bin/safekeeper.rs |   6 +
 safekeeper/src/lib.rs            |   1 +
 safekeeper/src/metrics.rs        | 336 +++++++++++++++++++++++++++++++
 safekeeper/src/safekeeper.rs     |  37 +---
 safekeeper/src/timeline.rs       |  38 ++++
 safekeeper/src/wal_storage.rs    |  14 +-
 7 files changed, 384 insertions(+), 49 deletions(-)
 create mode 100644 safekeeper/src/metrics.rs

diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs
index 9929fc6d45..3b5da9f7ff 100644
--- a/libs/metrics/src/lib.rs
+++ b/libs/metrics/src/lib.rs
@@ -3,6 +3,7 @@
 //! Otherwise, we might not see all metrics registered via
 //! a default registry.
 use lazy_static::lazy_static;
+pub use prometheus::{core, default_registry, proto};
 pub use prometheus::{exponential_buckets, linear_buckets};
 pub use prometheus::{register_gauge, Gauge};
 pub use prometheus::{register_gauge_vec, GaugeVec};
diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index e792a854d5..9feb984c4f 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -264,6 +264,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: bo
         }
     }
 
+    // Register metrics collector for active timelines. It's important to do this
+    // after daemonizing, otherwise the process collector will be upset.
+    let registry = metrics::default_registry();
+    let timeline_collector = safekeeper::metrics::TimelineCollector::new();
+    registry.register(Box::new(timeline_collector))?;
+
     let signals = signals::install_shutdown_handlers()?;
     let mut threads = vec![];
     let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel();
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index c092f5185b..1fae9b00f8 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -14,6 +14,7 @@ pub mod control_file_upgrade;
 pub mod handler;
 pub mod http;
 pub mod json_ctrl;
+pub mod metrics;
 pub mod receive_wal;
 pub mod remove_wal;
 pub mod safekeeper;
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
new file mode 100644
index 0000000000..5a2e5f125f
--- /dev/null
+++ b/safekeeper/src/metrics.rs
@@ -0,0 +1,336 @@
+//! This module exports metrics for all active timelines.
+
+use std::time::{Instant, SystemTime};
+
+use metrics::{
+    core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
+    proto::MetricFamily,
+    Gauge, IntGaugeVec,
+};
+use postgres_ffi::xlog_utils::XLogSegNo;
+use utils::{lsn::Lsn, zid::ZTenantTimelineId};
+
+use crate::{
+    safekeeper::{SafeKeeperState, SafekeeperMemState},
+    timeline::{GlobalTimelines, ReplicaState},
+};
+
+pub struct FullTimelineInfo {
+    pub zttid: ZTenantTimelineId,
+    pub replicas: Vec<ReplicaState>,
+    pub wal_backup_active: bool,
+    pub timeline_is_active: bool,
+    pub num_computes: u32,
+    pub last_removed_segno: XLogSegNo,
+
+    pub epoch_start_lsn: Lsn,
+    pub mem_state: SafekeeperMemState,
+    pub persisted_state: SafeKeeperState,
+
+    pub flush_lsn: Lsn,
+}
+
+pub struct TimelineCollector {
+    descs: Vec<Desc>,
+    commit_lsn: GenericGaugeVec<AtomicU64>,
+    backup_lsn: GenericGaugeVec<AtomicU64>,
+    flush_lsn: GenericGaugeVec<AtomicU64>,
+    epoch_start_lsn: GenericGaugeVec<AtomicU64>,
+    peer_horizon_lsn: GenericGaugeVec<AtomicU64>,
+    remote_consistent_lsn: GenericGaugeVec<AtomicU64>,
+    feedback_ps_write_lsn: GenericGaugeVec<AtomicU64>,
+    feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
+    timeline_active: GenericGaugeVec<AtomicU64>,
+    wal_backup_active: GenericGaugeVec<AtomicU64>,
+    connected_computes: IntGaugeVec,
+    disk_usage: GenericGaugeVec<AtomicU64>,
+    acceptor_term: GenericGaugeVec<AtomicU64>,
+    collect_timeline_metrics: Gauge,
+}
+
+impl Default for TimelineCollector {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl TimelineCollector {
+    pub fn new() -> TimelineCollector {
+        let mut descs = Vec::new();
+
+        let commit_lsn = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_commit_lsn",
+                "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(commit_lsn.desc().into_iter().cloned());
+
+        let backup_lsn = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_backup_lsn",
+                "Current backup_lsn, up to which WAL is backed up, grouped by timeline",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(backup_lsn.desc().into_iter().cloned());
+
+        let flush_lsn = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_flush_lsn",
+                "Current flush_lsn, grouped by timeline",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(flush_lsn.desc().into_iter().cloned());
+
+        let epoch_start_lsn = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_epoch_start_lsn",
+                "Point since which compute generates new WAL in the current consensus term",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(epoch_start_lsn.desc().into_iter().cloned());
+
+        let peer_horizon_lsn = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_peer_horizon_lsn",
+                "LSN of the most lagging safekeeper",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(peer_horizon_lsn.desc().into_iter().cloned());
+
+        let remote_consistent_lsn = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_remote_consistent_lsn",
+                "LSN which is persisted to the remote storage in pageserver",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(remote_consistent_lsn.desc().into_iter().cloned());
+
+        let feedback_ps_write_lsn = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_feedback_ps_write_lsn",
+                "Last LSN received by the pageserver, acknowledged in the feedback",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(feedback_ps_write_lsn.desc().into_iter().cloned());
+
+        let feedback_last_time_seconds = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_feedback_last_time_seconds",
+                "Timestamp of the last feedback from the pageserver",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(feedback_last_time_seconds.desc().into_iter().cloned());
+
+        let timeline_active = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_timeline_active",
+                "Reports 1 for active timelines, 0 for inactive",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(timeline_active.desc().into_iter().cloned());
+
+        let wal_backup_active = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_wal_backup_active",
+                "Reports 1 for timelines with active WAL backup, 0 otherwise",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(wal_backup_active.desc().into_iter().cloned());
+
+        let connected_computes = IntGaugeVec::new(
+            Opts::new(
+                "safekeeper_connected_computes",
+                "Number of active compute connections",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(connected_computes.desc().into_iter().cloned());
+
+        let disk_usage = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_disk_usage_bytes",
+                "Estimated disk space used to store WAL segments",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(disk_usage.desc().into_iter().cloned());
+
+        let acceptor_term = GenericGaugeVec::new(
+            Opts::new("safekeeper_acceptor_term", "Current consensus term"),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(acceptor_term.desc().into_iter().cloned());
+
+        let collect_timeline_metrics = Gauge::new(
+            "safekeeper_collect_timeline_metrics_seconds",
+            "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
+        )
+        .unwrap();
+        descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
+
+        TimelineCollector {
+            descs,
+            commit_lsn,
+            backup_lsn,
+            flush_lsn,
+            epoch_start_lsn,
+            peer_horizon_lsn,
+            remote_consistent_lsn,
+            feedback_ps_write_lsn,
+            feedback_last_time_seconds,
+            timeline_active,
+            wal_backup_active,
+            connected_computes,
+            disk_usage,
+            acceptor_term,
+            collect_timeline_metrics,
+        }
+    }
+}
+
+impl Collector for TimelineCollector {
+    fn desc(&self) -> Vec<&Desc> {
+        self.descs.iter().collect()
+    }
+
+    fn collect(&self) -> Vec<MetricFamily> {
+        let start_collecting = Instant::now();
+
+        // reset all metrics to clean up inactive timelines
+        self.commit_lsn.reset();
+        self.backup_lsn.reset();
+        self.flush_lsn.reset();
+        self.epoch_start_lsn.reset();
+        self.peer_horizon_lsn.reset();
+        self.remote_consistent_lsn.reset();
+        self.feedback_ps_write_lsn.reset();
+        self.feedback_last_time_seconds.reset();
+        self.timeline_active.reset();
+        self.wal_backup_active.reset();
+        self.connected_computes.reset();
+        self.disk_usage.reset();
+        self.acceptor_term.reset();
+
+        let timelines = GlobalTimelines::active_timelines_metrics();
+
+        for tli in timelines {
+            let tenant_id = tli.zttid.tenant_id.to_string();
+            let timeline_id = tli.zttid.timeline_id.to_string();
+            let labels = &[tenant_id.as_str(), timeline_id.as_str()];
+
+            let mut most_advanced: Option = None;
+            for replica in tli.replicas.iter() {
+                if let Some(replica_feedback) = replica.zenith_feedback {
+                    if let Some(current) = most_advanced {
+                        if current.ps_writelsn < replica_feedback.ps_writelsn {
+                            most_advanced = Some(replica_feedback);
+                        }
+                    } else {
+                        most_advanced = Some(replica_feedback);
+                    }
+                }
+            }
+
+            self.commit_lsn
+                .with_label_values(labels)
+                .set(tli.mem_state.commit_lsn.into());
+            self.backup_lsn
+                .with_label_values(labels)
+                .set(tli.mem_state.backup_lsn.into());
+            self.flush_lsn
+                .with_label_values(labels)
+                .set(tli.flush_lsn.into());
+            self.epoch_start_lsn
+                .with_label_values(labels)
+                .set(tli.epoch_start_lsn.into());
+            self.peer_horizon_lsn
+                .with_label_values(labels)
+                .set(tli.mem_state.peer_horizon_lsn.into());
+            self.remote_consistent_lsn
+                .with_label_values(labels)
+                .set(tli.mem_state.remote_consistent_lsn.into());
+            self.timeline_active
+                .with_label_values(labels)
+                .set(tli.timeline_is_active as u64);
+            self.wal_backup_active
+                .with_label_values(labels)
+                .set(tli.wal_backup_active as u64);
+            self.connected_computes
+                .with_label_values(labels)
+                .set(tli.num_computes as i64);
+            self.acceptor_term
+                .with_label_values(labels)
+                .set(tli.persisted_state.acceptor_state.term as u64);
+
+            if let Some(feedback) = most_advanced {
+                self.feedback_ps_write_lsn
+                    .with_label_values(labels)
+                    .set(feedback.ps_writelsn);
+                if let Ok(unix_time) = feedback.ps_replytime.duration_since(SystemTime::UNIX_EPOCH)
+                {
+                    self.feedback_last_time_seconds
+                        .with_label_values(labels)
+                        .set(unix_time.as_secs());
+                }
+            }
+
+            if tli.last_removed_segno != 0 {
+                let segno_count = tli
+                    .flush_lsn
+                    .segment_number(tli.persisted_state.server.wal_seg_size as usize)
+                    - tli.last_removed_segno;
+                let disk_usage_bytes = segno_count * tli.persisted_state.server.wal_seg_size as u64;
+                self.disk_usage
+                    .with_label_values(labels)
+                    .set(disk_usage_bytes);
+            }
+        }
+
+        // collect MetricFamilys.
+        let mut mfs = Vec::new();
+        mfs.extend(self.commit_lsn.collect());
+        mfs.extend(self.backup_lsn.collect());
+        mfs.extend(self.flush_lsn.collect());
+        mfs.extend(self.epoch_start_lsn.collect());
+        mfs.extend(self.peer_horizon_lsn.collect());
+        mfs.extend(self.remote_consistent_lsn.collect());
+        mfs.extend(self.feedback_ps_write_lsn.collect());
+        mfs.extend(self.feedback_last_time_seconds.collect());
+        mfs.extend(self.timeline_active.collect());
+        mfs.extend(self.wal_backup_active.collect());
+        mfs.extend(self.connected_computes.collect());
+        mfs.extend(self.disk_usage.collect());
+        mfs.extend(self.acceptor_term.collect());
+
+        // report time it took to collect all info
+        let elapsed = start_collecting.elapsed().as_secs_f64();
+        self.collect_timeline_metrics.set(elapsed);
+        mfs.extend(self.collect_timeline_metrics.collect());
+
+        mfs
+    }
+}
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index df4b202063..1c00af7043 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -15,13 +15,10 @@ use std::fmt;
 use std::io::Read;
 use tracing::*;
 
-use lazy_static::lazy_static;
-
 use crate::control_file;
 use crate::send_wal::HotStandbyFeedback;
 use crate::wal_storage;
 
-use metrics::{register_gauge_vec, Gauge, GaugeVec};
 use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
 use utils::{
     bin_ser::LeSer,
@@ -487,45 +484,16 @@ impl AcceptorProposerMessage {
     }
 }
 
-lazy_static! {
-    // The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
-    // i64 is faster than f64, so update to u64 when available.
-    static ref COMMIT_LSN_GAUGE: GaugeVec = register_gauge_vec!(
-        "safekeeper_commit_lsn",
-        "Current commit_lsn (not necessarily persisted to disk), grouped by timeline",
-        &["tenant_id", "timeline_id"]
-    )
-    .expect("Failed to register safekeeper_commit_lsn gauge vec");
-}
-
-struct SafeKeeperMetrics {
-    commit_lsn: Gauge,
-    // WAL-related metrics are in WalStorageMetrics
-}
-
-impl SafeKeeperMetrics {
-    fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
-        let tenant_id = tenant_id.to_string();
-        let timeline_id = timeline_id.to_string();
-        Self {
-            commit_lsn: COMMIT_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
-        }
-    }
-}
-
 /// SafeKeeper which consumes events (messages from compute) and provides
 /// replies.
 pub struct SafeKeeper {
-    // Cached metrics so we don't have to recompute labels on each update.
-    metrics: SafeKeeperMetrics,
-
     /// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
     /// Note: be careful to set only if we are sure our WAL (term history) matches
     /// committed one.
     pub global_commit_lsn: Lsn,
     /// LSN since the proposer safekeeper currently talking to appends WAL;
     /// determines epoch switch point.
-    epoch_start_lsn: Lsn,
+    pub epoch_start_lsn: Lsn,
 
     pub inmem: SafekeeperMemState, // in memory part
     pub state: CTRL,               // persistent state storage
@@ -555,7 +523,6 @@ where
         wal_store.init_storage(&state)?;
 
         Ok(SafeKeeper {
-            metrics: SafeKeeperMetrics::new(state.tenant_id, ztli),
             global_commit_lsn: state.commit_lsn,
             epoch_start_lsn: Lsn(0),
             inmem: SafekeeperMemState {
@@ -757,7 +724,6 @@
         // upgrade.
         self.global_commit_lsn = max(self.global_commit_lsn, state.timeline_start_lsn);
         self.inmem.commit_lsn = max(self.inmem.commit_lsn, state.timeline_start_lsn);
-        self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
 
         // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment.
         self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn);
@@ -777,7 +743,6 @@
 
         assert!(commit_lsn >= self.inmem.commit_lsn);
         self.inmem.commit_lsn = commit_lsn;
-        self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
 
         // If new commit_lsn reached epoch switch, force sync of control
         // file: walproposer in sync mode is very interested when this
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 74a61410fd..2fc5bcc1f6 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -33,6 +33,7 @@ use crate::safekeeper::{
 };
 use crate::send_wal::HotStandbyFeedback;
 
+use crate::metrics::FullTimelineInfo;
 use crate::wal_storage;
 use crate::wal_storage::Storage as wal_storage_iface;
 use crate::SafeKeeperConf;
@@ -450,6 +451,33 @@ impl Timeline {
         shared_state.active
     }
 
+    /// Returns full timeline info, required for the metrics.
+    /// If the timeline is not active, returns None instead.
+    pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
+        let shared_state = self.mutex.lock().unwrap();
+        if !shared_state.active {
+            return None;
+        }
+
+        Some(FullTimelineInfo {
+            zttid: self.zttid,
+            replicas: shared_state
+                .replicas
+                .iter()
+                .filter_map(|r| r.as_ref())
+                .copied()
+                .collect(),
+            wal_backup_active: shared_state.wal_backup_active,
+            timeline_is_active: shared_state.active,
+            num_computes: shared_state.num_computes,
+            last_removed_segno: shared_state.last_removed_segno,
+            epoch_start_lsn: shared_state.sk.epoch_start_lsn,
+            mem_state: shared_state.sk.inmem.clone(),
+            persisted_state: shared_state.sk.state.clone(),
+            flush_lsn: shared_state.sk.wal_store.flush_lsn(),
+        })
+    }
+
     /// Timed wait for an LSN to be committed.
     ///
     /// Returns the last committed LSN, which will be at least
@@ -777,6 +805,16 @@ impl GlobalTimelines {
             .collect()
     }
 
+    /// Return FullTimelineInfo for all active timelines.
+    pub fn active_timelines_metrics() -> Vec<FullTimelineInfo> {
+        let state = TIMELINES_STATE.lock().unwrap();
+        state
+            .timelines
+            .iter()
+            .filter_map(|(_, tli)| tli.info_for_metrics())
+            .collect()
+    }
+
     fn delete_force_internal(
         conf: &SafeKeeperConf,
         zttid: &ZTenantTimelineId,
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index 7285cedc03..e3f1ce7333 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -31,20 +31,11 @@ use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
 
 use postgres_ffi::waldecoder::WalStreamDecoder;
 
-use metrics::{
-    register_gauge_vec, register_histogram_vec, Gauge, GaugeVec, Histogram, HistogramVec,
-    DISK_WRITE_SECONDS_BUCKETS,
-};
+use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
 
 lazy_static! {
     // The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
     // i64 is faster than f64, so update to u64 when available.
-    static ref FLUSH_LSN_GAUGE: GaugeVec = register_gauge_vec!(
-        "safekeeper_flush_lsn",
-        "Current flush_lsn, grouped by timeline",
-        &["tenant_id", "timeline_id"]
-    )
-    .expect("Failed to register safekeeper_flush_lsn gauge vec");
     static ref WRITE_WAL_BYTES: HistogramVec = register_histogram_vec!(
         "safekeeper_write_wal_bytes",
         "Bytes written to WAL in a single request, grouped by timeline",
@@ -69,7 +60,6 @@ lazy_static! {
 }
 
 struct WalStorageMetrics {
-    flush_lsn: Gauge,
     write_wal_bytes: Histogram,
     write_wal_seconds: Histogram,
     flush_wal_seconds: Histogram,
@@ -80,7 +70,6 @@ impl WalStorageMetrics {
         let tenant_id = zttid.tenant_id.to_string();
         let timeline_id = zttid.timeline_id.to_string();
         Self {
-            flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
             write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]),
             write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
             flush_wal_seconds: FLUSH_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
@@ -171,7 +160,6 @@ impl PhysicalStorage {
     /// Wrapper for flush_lsn updates that also updates metrics.
     fn update_flush_lsn(&mut self) {
        self.flush_record_lsn = self.write_record_lsn;
-        self.metrics.flush_lsn.set(self.flush_record_lsn.0 as f64);
     }
 
     /// Call fdatasync if config requires so.
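
Usage note (not part of the patch): once the collector is registered in start_safekeeper, every scrape of the default registry re-runs TimelineCollector::collect(), so only the currently active timelines appear in the output. Below is a minimal sketch of rendering these metrics, assuming the prometheus crate's TextEncoder is available to the caller; the render_metrics helper and the sample values are illustrative only, the patch itself does not include an exposition endpoint.

    // Gather and render the default registry, which now includes TimelineCollector.
    use metrics::{default_registry, proto::MetricFamily};
    use prometheus::{Encoder, TextEncoder};

    fn render_metrics() -> String {
        // Calling gather() invokes TimelineCollector::collect(), so every scrape
        // reflects only timelines that are active at that moment.
        let families: Vec<MetricFamily> = default_registry().gather();
        let mut buf = Vec::new();
        TextEncoder::new()
            .encode(&families, &mut buf)
            .expect("failed to encode metrics");
        // Output contains per-timeline series such as (values illustrative):
        //   safekeeper_commit_lsn{tenant_id="...",timeline_id="..."} 82345678
        //   safekeeper_timeline_active{tenant_id="...",timeline_id="..."} 1
        String::from_utf8(buf).expect("metrics output is valid UTF-8")
    }

Because collect() resets every gauge vector before repopulating it, series for timelines that have become inactive disappear on the next scrape instead of going stale.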