diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs
index 22ed34cc00..6be3f9abb2 100644
--- a/safekeeper/src/control_file.rs
+++ b/safekeeper/src/control_file.rs
@@ -2,7 +2,6 @@
 use anyhow::{bail, ensure, Context, Result};
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
-use once_cell::sync::Lazy;
 use std::fs::{self, File, OpenOptions};
 use std::io::{Read, Write};
@@ -10,8 +9,8 @@ use std::ops::Deref;
 use std::path::{Path, PathBuf};

 use crate::control_file_upgrade::upgrade_control_file;
+use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
 use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
-use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
 use utils::{bin_ser::LeSer, id::TenantTimelineId};

 use crate::SafeKeeperConf;
@@ -24,16 +23,6 @@ const CONTROL_FILE_NAME: &str = "safekeeper.control";
 const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
 pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();

-static PERSIST_CONTROL_FILE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
-    register_histogram_vec!(
-        "safekeeper_persist_control_file_seconds",
-        "Seconds to persist and sync control file, grouped by timeline",
-        &["tenant_id", "timeline_id"],
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
-    )
-    .expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
-});
-
 /// Storage should keep actual state inside of it. It should implement Deref
 /// trait to access state fields and have persist method for updating that state.
 pub trait Storage: Deref<Target = SafeKeeperState> {
@@ -46,7 +35,6 @@ pub struct FileStorage {
     // save timeline dir to avoid reconstructing it every time
     timeline_dir: PathBuf,
     conf: SafeKeeperConf,
-    persist_control_file_seconds: Histogram,

     /// Last state persisted to disk.
     state: SafeKeeperState,
@@ -56,16 +44,12 @@ impl FileStorage {
     /// Initialize storage by loading state from disk.
     pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
         let timeline_dir = conf.timeline_dir(ttid);
-        let tenant_id = ttid.tenant_id.to_string();
-        let timeline_id = ttid.timeline_id.to_string();

         let state = Self::load_control_file_conf(conf, ttid)?;

         Ok(FileStorage {
             timeline_dir,
             conf: conf.clone(),
-            persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
-                .with_label_values(&[&tenant_id, &timeline_id]),
             state,
         })
     }
@@ -77,14 +61,10 @@ impl FileStorage {
         state: SafeKeeperState,
     ) -> Result<FileStorage> {
         let timeline_dir = conf.timeline_dir(ttid);
-        let tenant_id = ttid.tenant_id.to_string();
-        let timeline_id = ttid.timeline_id.to_string();

         let store = FileStorage {
             timeline_dir,
             conf: conf.clone(),
-            persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
-                .with_label_values(&[&tenant_id, &timeline_id]),
             state,
         };

@@ -175,7 +155,7 @@ impl Storage for FileStorage {
     /// persists state durably to underlying storage
     /// for description see https://lwn.net/Articles/457667/
     fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
-        let _timer = &self.persist_control_file_seconds.start_timer();
+        let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();

         // write data to safekeeper.control.partial
         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
index 851a568aec..51138df776 100644
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -1,12 +1,15 @@
-//! This module exports metrics for all active timelines.
+//! Global safekeeper metrics and per-timeline safekeeper metrics.

 use std::time::{Instant, SystemTime};

+use ::metrics::{register_histogram, GaugeVec, Histogram, DISK_WRITE_SECONDS_BUCKETS};
+use anyhow::Result;
 use metrics::{
     core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
     proto::MetricFamily,
     Gauge, IntGaugeVec,
 };
+use once_cell::sync::Lazy;
 use postgres_ffi::XLogSegNo;
 use utils::{id::TenantTimelineId, lsn::Lsn};

@@ -16,6 +19,85 @@ use crate::{
     GlobalTimelines,
 };

+// Global metrics across all timelines.
+pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
+    register_histogram!(
+        "safekeeper_write_wal_bytes",
+        "Bytes written to WAL in a single request",
+        vec![
+            1.0,
+            10.0,
+            100.0,
+            1024.0,
+            8192.0,
+            128.0 * 1024.0,
+            1024.0 * 1024.0,
+            10.0 * 1024.0 * 1024.0
+        ]
+    )
+    .expect("Failed to register safekeeper_write_wal_bytes histogram")
+});
+pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
+    register_histogram!(
+        "safekeeper_write_wal_seconds",
+        "Seconds spent writing and syncing WAL to a disk in a single request",
+        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+    )
+    .expect("Failed to register safekeeper_write_wal_seconds histogram")
+});
+pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
+    register_histogram!(
+        "safekeeper_flush_wal_seconds",
+        "Seconds spent syncing WAL to a disk",
+        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+    )
+    .expect("Failed to register safekeeper_flush_wal_seconds histogram")
+});
+pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
+    register_histogram!(
+        "safekeeper_persist_control_file_seconds",
+        "Seconds to persist and sync control file",
+        DISK_WRITE_SECONDS_BUCKETS.to_vec()
+    )
+    .expect("Failed to register safekeeper_persist_control_file_seconds histogram")
+});
+
+/// Metrics for WalStorage in a single timeline.
+#[derive(Clone, Default)]
+pub struct WalStorageMetrics {
+    /// How many bytes were written in total.
+    write_wal_bytes: u64,
+    /// How much time was spent writing WAL to disk, waiting for write(2).
+    write_wal_seconds: f64,
+    /// How much time was spent syncing WAL to disk, waiting for fsync(2).
+    flush_wal_seconds: f64,
+}
+
+impl WalStorageMetrics {
+    pub fn observe_write_bytes(&mut self, bytes: usize) {
+        self.write_wal_bytes += bytes as u64;
+        WRITE_WAL_BYTES.observe(bytes as f64);
+    }
+
+    pub fn observe_write_seconds(&mut self, seconds: f64) {
+        self.write_wal_seconds += seconds;
+        WRITE_WAL_SECONDS.observe(seconds);
+    }
+
+    pub fn observe_flush_seconds(&mut self, seconds: f64) {
+        self.flush_wal_seconds += seconds;
+        FLUSH_WAL_SECONDS.observe(seconds);
+    }
+}
+
+/// Accepts a closure that returns a result, and returns the duration of the closure.
+pub fn time_io_closure(closure: impl FnOnce() -> Result<()>) -> Result<f64> {
+    let start = std::time::Instant::now();
+    closure()?;
+    Ok(start.elapsed().as_secs_f64())
+}
+
+/// Metrics for a single timeline.
 pub struct FullTimelineInfo {
     pub ttid: TenantTimelineId,
     pub replicas: Vec<ReplicaState>,
@@ -29,8 +111,11 @@ pub struct FullTimelineInfo {

     pub persisted_state: SafeKeeperState,
     pub flush_lsn: Lsn,
+
+    pub wal_storage: WalStorageMetrics,
 }

+/// Collects metrics for all active timelines.
 pub struct TimelineCollector {
     descs: Vec<Desc>,
     commit_lsn: GenericGaugeVec<AtomicU64>,
@@ -46,6 +131,9 @@ pub struct TimelineCollector {
     connected_computes: IntGaugeVec,
     disk_usage: GenericGaugeVec<AtomicU64>,
     acceptor_term: GenericGaugeVec<AtomicU64>,
+    written_wal_bytes: GenericGaugeVec<AtomicU64>,
+    written_wal_seconds: GaugeVec,
+    flushed_wal_seconds: GaugeVec,
     collect_timeline_metrics: Gauge,
 }
@@ -186,6 +274,36 @@ impl TimelineCollector {
         .unwrap();
         descs.extend(acceptor_term.desc().into_iter().cloned());

+        let written_wal_bytes = GenericGaugeVec::new(
+            Opts::new(
+                "safekeeper_written_wal_bytes_total",
+                "Number of WAL bytes written to disk, grouped by timeline",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(written_wal_bytes.desc().into_iter().cloned());
+
+        let written_wal_seconds = GaugeVec::new(
+            Opts::new(
+                "safekeeper_written_wal_seconds_total",
+                "Total time spent in write(2) writing WAL to disk, grouped by timeline",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(written_wal_seconds.desc().into_iter().cloned());
+
+        let flushed_wal_seconds = GaugeVec::new(
+            Opts::new(
+                "safekeeper_flushed_wal_seconds_total",
+                "Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+        descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
+
         let collect_timeline_metrics = Gauge::new(
             "safekeeper_collect_timeline_metrics_seconds",
             "Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
@@ -208,6 +326,9 @@ impl TimelineCollector {
             connected_computes,
             disk_usage,
             acceptor_term,
+            written_wal_bytes,
+            written_wal_seconds,
+            flushed_wal_seconds,
             collect_timeline_metrics,
         }
     }
@@ -235,6 +356,9 @@ impl Collector for TimelineCollector {
         self.connected_computes.reset();
         self.disk_usage.reset();
         self.acceptor_term.reset();
+        self.written_wal_bytes.reset();
+        self.written_wal_seconds.reset();
+        self.flushed_wal_seconds.reset();

         let timelines = GlobalTimelines::get_all();

@@ -292,6 +416,15 @@ impl Collector for TimelineCollector {
             self.acceptor_term
                 .with_label_values(labels)
                 .set(tli.persisted_state.acceptor_state.term as u64);
+            self.written_wal_bytes
+                .with_label_values(labels)
+                .set(tli.wal_storage.write_wal_bytes);
+            self.written_wal_seconds
+                .with_label_values(labels)
+                .set(tli.wal_storage.write_wal_seconds);
+            self.flushed_wal_seconds
+                .with_label_values(labels)
+                .set(tli.wal_storage.flush_wal_seconds);

             if let Some(feedback) = most_advanced {
                 self.feedback_ps_write_lsn
@@ -332,6 +465,9 @@ impl Collector for TimelineCollector {
         mfs.extend(self.connected_computes.collect());
         mfs.extend(self.disk_usage.collect());
         mfs.extend(self.acceptor_term.collect());
+        mfs.extend(self.written_wal_bytes.collect());
+        mfs.extend(self.written_wal_seconds.collect());
+        mfs.extend(self.flushed_wal_seconds.collect());

         // report time it took to collect all info
         let elapsed = start_collecting.elapsed().as_secs_f64();
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index d34a77e02b..65340ac0ed 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -998,6 +998,10 @@ mod tests {
         fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
             Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
         }
+
+        fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
+            crate::metrics::WalStorageMetrics::default()
+        }
     }

     #[test]
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 4000815857..ec29e13931 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -534,6 +534,7 @@ impl Timeline {
                 mem_state: state.sk.inmem.clone(),
                 persisted_state: state.sk.state.clone(),
                 flush_lsn: state.sk.wal_store.flush_lsn(),
+                wal_storage: state.sk.wal_store.get_metrics(),
             })
         } else {
             None
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index ea613dd0f1..692bd18342 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -8,11 +8,11 @@
 //! Note that last file has `.partial` suffix, that's different from postgres.

 use anyhow::{bail, Context, Result};
+
 use std::io::{self, Seek, SeekFrom};
 use std::pin::Pin;
 use tokio::io::AsyncRead;

-use once_cell::sync::Lazy;
 use postgres_ffi::v14::xlog_utils::{
     find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName,
 };
@@ -27,6 +27,7 @@ use tracing::*;
 use utils::{id::TenantTimelineId, lsn::Lsn};

+use crate::metrics::{time_io_closure, WalStorageMetrics};
 use crate::safekeeper::SafeKeeperState;
 use crate::wal_backup::read_object;
@@ -36,67 +37,8 @@ use postgres_ffi::XLOG_BLCKSZ;
 use postgres_ffi::v14::waldecoder::WalStreamDecoder;

-use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
-
 use tokio::io::{AsyncReadExt, AsyncSeekExt};

-// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
-// i64 is faster than f64, so update to u64 when available.
-static WRITE_WAL_BYTES: Lazy<HistogramVec> = Lazy::new(|| {
-    register_histogram_vec!(
-        "safekeeper_write_wal_bytes",
-        "Bytes written to WAL in a single request, grouped by timeline",
-        &["tenant_id", "timeline_id"],
-        vec![
-            1.0,
-            10.0,
-            100.0,
-            1024.0,
-            8192.0,
-            128.0 * 1024.0,
-            1024.0 * 1024.0,
-            10.0 * 1024.0 * 1024.0
-        ]
-    )
-    .expect("Failed to register safekeeper_write_wal_bytes histogram vec")
-});
-static WRITE_WAL_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
-    register_histogram_vec!(
-        "safekeeper_write_wal_seconds",
-        "Seconds spent writing and syncing WAL to a disk in a single request, grouped by timeline",
-        &["tenant_id", "timeline_id"],
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
-    )
-    .expect("Failed to register safekeeper_write_wal_seconds histogram vec")
-});
-static FLUSH_WAL_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
-    register_histogram_vec!(
-        "safekeeper_flush_wal_seconds",
-        "Seconds spent syncing WAL to a disk, grouped by timeline",
-        &["tenant_id", "timeline_id"],
-        DISK_WRITE_SECONDS_BUCKETS.to_vec()
-    )
-    .expect("Failed to register safekeeper_flush_wal_seconds histogram vec")
-});
-
-struct WalStorageMetrics {
-    write_wal_bytes: Histogram,
-    write_wal_seconds: Histogram,
-    flush_wal_seconds: Histogram,
-}
-
-impl WalStorageMetrics {
-    fn new(ttid: &TenantTimelineId) -> Self {
-        let tenant_id = ttid.tenant_id.to_string();
-        let timeline_id = ttid.timeline_id.to_string();
-        Self {
-            write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]),
-            write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
-            flush_wal_seconds: FLUSH_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
-        }
-    }
-}
-
 pub trait Storage {
     /// LSN of last durably stored WAL record.
     fn flush_lsn(&self) -> Lsn;
@@ -113,6 +55,9 @@ pub trait Storage {
     /// Remove all segments <= given segno. Returns closure as we want to do
     /// that without timeline lock.
     fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>>;
+
+    /// Get metrics for this timeline.
+    fn get_metrics(&self) -> WalStorageMetrics;
 }

 /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
@@ -187,7 +132,7 @@ impl PhysicalStorage {
         }

         Ok(PhysicalStorage {
-            metrics: WalStorageMetrics::new(ttid),
+            metrics: WalStorageMetrics::default(),
             timeline_dir,
             conf: conf.clone(),
             wal_seg_size,
@@ -200,28 +145,26 @@ impl PhysicalStorage {
     }

     /// Call fdatasync if config requires so.
-    fn fdatasync_file(&self, file: &mut File) -> Result<()> {
+    fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
         if !self.conf.no_sync {
             self.metrics
-                .flush_wal_seconds
-                .observe_closure_duration(|| file.sync_data())?;
+                .observe_flush_seconds(time_io_closure(|| Ok(file.sync_data()?))?);
         }
         Ok(())
     }

     /// Call fsync if config requires so.
-    fn fsync_file(&self, file: &mut File) -> Result<()> {
+    fn fsync_file(&mut self, file: &mut File) -> Result<()> {
         if !self.conf.no_sync {
             self.metrics
-                .flush_wal_seconds
-                .observe_closure_duration(|| file.sync_all())?;
+                .observe_flush_seconds(time_io_closure(|| Ok(file.sync_all()?))?);
         }
         Ok(())
     }

     /// Open or create WAL segment file. Caller must call seek to the wanted position.
     /// Returns `file` and `is_partial`.
-    fn open_or_create(&self, segno: XLogSegNo) -> Result<(File, bool)> {
+    fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
         let (wal_file_path, wal_file_partial_path) =
             wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
@@ -335,13 +278,10 @@ impl Storage for PhysicalStorage {
             );
         }

-        {
-            let _timer = self.metrics.write_wal_seconds.start_timer();
-            self.write_exact(startpos, buf)?;
-        }
-
+        let write_seconds = time_io_closure(|| self.write_exact(startpos, buf))?;
         // WAL is written, updating write metrics
-        self.metrics.write_wal_bytes.observe(buf.len() as f64);
+        self.metrics.observe_write_seconds(write_seconds);
+        self.metrics.observe_write_bytes(buf.len());

         // figure out last record's end lsn for reporting (if we got the
         // whole record)
@@ -444,6 +384,10 @@ impl Storage for PhysicalStorage {
             remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
         })
     }
+
+    fn get_metrics(&self) -> WalStorageMetrics {
+        self.metrics.clone()
+    }
 }

 /// Remove all WAL segments in timeline_dir that match the given predicate.
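
Below is a minimal, hypothetical sketch (not part of the patch) of how the new plumbing is meant to be used by an implementation of the WAL Storage trait: I/O is timed with time_io_closure, the result is fed into the per-timeline WalStorageMetrics (which also updates the global histograms as a side effect of the observe_* calls), and a snapshot is handed back via get_metrics() for FullTimelineInfo / TimelineCollector. The MemStorage type and its fields are invented here purely for illustration; the real implementation is PhysicalStorage above.

// Hypothetical, in-crate example; assumes the safekeeper crate context from this diff.
use anyhow::Result;

use crate::metrics::{time_io_closure, WalStorageMetrics};

/// Toy storage that keeps WAL in memory; stands in for PhysicalStorage.
struct MemStorage {
    metrics: WalStorageMetrics,
    buf: Vec<u8>,
}

impl MemStorage {
    fn new() -> Self {
        MemStorage {
            metrics: WalStorageMetrics::default(),
            buf: Vec::new(),
        }
    }

    fn write(&mut self, data: &[u8]) -> Result<()> {
        // Time only the I/O part; an error returned by the closure is propagated.
        let seconds = time_io_closure(|| {
            self.buf.extend_from_slice(data); // stand-in for the real write(2) path
            Ok(())
        })?;
        // Each observe_* call bumps the per-timeline sums (exported by
        // TimelineCollector as safekeeper_written_wal_*_total gauges) and
        // observes the global safekeeper_write_wal_* histograms.
        self.metrics.observe_write_seconds(seconds);
        self.metrics.observe_write_bytes(data.len());
        Ok(())
    }

    /// What Storage::get_metrics() returns; TimelineCollector reads the
    /// accumulated per-timeline sums from this snapshot when scraping.
    fn get_metrics(&self) -> WalStorageMetrics {
        self.metrics.clone()
    }
}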