mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
Reduce metrics footprint in safekeeper (#2491)
Fixes bugs with metrics in control_file and wal_storage, where we haven't deleted metrics for inactive timelines.
This commit is contained in:
committed by
GitHub
parent
19fa410ff8
commit
7eebb45ea6
@@ -2,7 +2,6 @@
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Read, Write};
|
||||
@@ -10,8 +9,8 @@ use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use crate::control_file_upgrade::upgrade_control_file;
|
||||
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
|
||||
use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
|
||||
use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
|
||||
use utils::{bin_ser::LeSer, id::TenantTimelineId};
|
||||
|
||||
use crate::SafeKeeperConf;
|
||||
@@ -24,16 +23,6 @@ const CONTROL_FILE_NAME: &str = "safekeeper.control";
|
||||
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
|
||||
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
|
||||
|
||||
static PERSIST_CONTROL_FILE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"safekeeper_persist_control_file_seconds",
|
||||
"Seconds to persist and sync control file, grouped by timeline",
|
||||
&["tenant_id", "timeline_id"],
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
|
||||
});
|
||||
|
||||
/// Storage should keep actual state inside of it. It should implement Deref
|
||||
/// trait to access state fields and have persist method for updating that state.
|
||||
pub trait Storage: Deref<Target = SafeKeeperState> {
|
||||
@@ -46,7 +35,6 @@ pub struct FileStorage {
|
||||
// save timeline dir to avoid reconstructing it every time
|
||||
timeline_dir: PathBuf,
|
||||
conf: SafeKeeperConf,
|
||||
persist_control_file_seconds: Histogram,
|
||||
|
||||
/// Last state persisted to disk.
|
||||
state: SafeKeeperState,
|
||||
@@ -56,16 +44,12 @@ impl FileStorage {
|
||||
/// Initialize storage by loading state from disk.
|
||||
pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
|
||||
let timeline_dir = conf.timeline_dir(ttid);
|
||||
let tenant_id = ttid.tenant_id.to_string();
|
||||
let timeline_id = ttid.timeline_id.to_string();
|
||||
|
||||
let state = Self::load_control_file_conf(conf, ttid)?;
|
||||
|
||||
Ok(FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
state,
|
||||
})
|
||||
}
|
||||
@@ -77,14 +61,10 @@ impl FileStorage {
|
||||
state: SafeKeeperState,
|
||||
) -> Result<FileStorage> {
|
||||
let timeline_dir = conf.timeline_dir(ttid);
|
||||
let tenant_id = ttid.tenant_id.to_string();
|
||||
let timeline_id = ttid.timeline_id.to_string();
|
||||
|
||||
let store = FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
state,
|
||||
};
|
||||
|
||||
@@ -175,7 +155,7 @@ impl Storage for FileStorage {
|
||||
/// persists state durably to underlying storage
|
||||
/// for description see https://lwn.net/Articles/457667/
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
let _timer = &self.persist_control_file_seconds.start_timer();
|
||||
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
|
||||
|
||||
// write data to safekeeper.control.partial
|
||||
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
//! This module exports metrics for all active timelines.
|
||||
//! Global safekeeper mertics and per-timeline safekeeper metrics.
|
||||
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
use ::metrics::{register_histogram, GaugeVec, Histogram, DISK_WRITE_SECONDS_BUCKETS};
|
||||
use anyhow::Result;
|
||||
use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
|
||||
proto::MetricFamily,
|
||||
Gauge, IntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_ffi::XLogSegNo;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
@@ -16,6 +19,85 @@ use crate::{
|
||||
GlobalTimelines,
|
||||
};
|
||||
|
||||
// Global metrics across all timelines.
|
||||
pub static WRITE_WAL_BYTES: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"safekeeper_write_wal_bytes",
|
||||
"Bytes written to WAL in a single request",
|
||||
vec![
|
||||
1.0,
|
||||
10.0,
|
||||
100.0,
|
||||
1024.0,
|
||||
8192.0,
|
||||
128.0 * 1024.0,
|
||||
1024.0 * 1024.0,
|
||||
10.0 * 1024.0 * 1024.0
|
||||
]
|
||||
)
|
||||
.expect("Failed to register safekeeper_write_wal_bytes histogram")
|
||||
});
|
||||
pub static WRITE_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"safekeeper_write_wal_seconds",
|
||||
"Seconds spent writing and syncing WAL to a disk in a single request",
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_write_wal_seconds histogram")
|
||||
});
|
||||
pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"safekeeper_flush_wal_seconds",
|
||||
"Seconds spent syncing WAL to a disk",
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_flush_wal_seconds histogram")
|
||||
});
|
||||
pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"safekeeper_persist_control_file_seconds",
|
||||
"Seconds to persist and sync control file",
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_persist_control_file_seconds histogram vec")
|
||||
});
|
||||
|
||||
/// Metrics for WalStorage in a single timeline.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct WalStorageMetrics {
|
||||
/// How much bytes were written in total.
|
||||
write_wal_bytes: u64,
|
||||
/// How much time spent writing WAL to disk, waiting for write(2).
|
||||
write_wal_seconds: f64,
|
||||
/// How much time spent syncing WAL to disk, waiting for fsync(2).
|
||||
flush_wal_seconds: f64,
|
||||
}
|
||||
|
||||
impl WalStorageMetrics {
|
||||
pub fn observe_write_bytes(&mut self, bytes: usize) {
|
||||
self.write_wal_bytes += bytes as u64;
|
||||
WRITE_WAL_BYTES.observe(bytes as f64);
|
||||
}
|
||||
|
||||
pub fn observe_write_seconds(&mut self, seconds: f64) {
|
||||
self.write_wal_seconds += seconds;
|
||||
WRITE_WAL_SECONDS.observe(seconds);
|
||||
}
|
||||
|
||||
pub fn observe_flush_seconds(&mut self, seconds: f64) {
|
||||
self.flush_wal_seconds += seconds;
|
||||
FLUSH_WAL_SECONDS.observe(seconds);
|
||||
}
|
||||
}
|
||||
|
||||
/// Accepts a closure that returns a result, and returns the duration of the closure.
|
||||
pub fn time_io_closure(closure: impl FnOnce() -> Result<()>) -> Result<f64> {
|
||||
let start = std::time::Instant::now();
|
||||
closure()?;
|
||||
Ok(start.elapsed().as_secs_f64())
|
||||
}
|
||||
|
||||
/// Metrics for a single timeline.
|
||||
pub struct FullTimelineInfo {
|
||||
pub ttid: TenantTimelineId,
|
||||
pub replicas: Vec<ReplicaState>,
|
||||
@@ -29,8 +111,11 @@ pub struct FullTimelineInfo {
|
||||
pub persisted_state: SafeKeeperState,
|
||||
|
||||
pub flush_lsn: Lsn,
|
||||
|
||||
pub wal_storage: WalStorageMetrics,
|
||||
}
|
||||
|
||||
/// Collects metrics for all active timelines.
|
||||
pub struct TimelineCollector {
|
||||
descs: Vec<Desc>,
|
||||
commit_lsn: GenericGaugeVec<AtomicU64>,
|
||||
@@ -46,6 +131,9 @@ pub struct TimelineCollector {
|
||||
connected_computes: IntGaugeVec,
|
||||
disk_usage: GenericGaugeVec<AtomicU64>,
|
||||
acceptor_term: GenericGaugeVec<AtomicU64>,
|
||||
written_wal_bytes: GenericGaugeVec<AtomicU64>,
|
||||
written_wal_seconds: GaugeVec,
|
||||
flushed_wal_seconds: GaugeVec,
|
||||
collect_timeline_metrics: Gauge,
|
||||
}
|
||||
|
||||
@@ -186,6 +274,36 @@ impl TimelineCollector {
|
||||
.unwrap();
|
||||
descs.extend(acceptor_term.desc().into_iter().cloned());
|
||||
|
||||
let written_wal_bytes = GenericGaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_written_wal_bytes_total",
|
||||
"Number of WAL bytes written to disk, grouped by timeline",
|
||||
),
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(written_wal_bytes.desc().into_iter().cloned());
|
||||
|
||||
let written_wal_seconds = GaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_written_wal_seconds_total",
|
||||
"Total time spent in write(2) writing WAL to disk, grouped by timeline",
|
||||
),
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(written_wal_seconds.desc().into_iter().cloned());
|
||||
|
||||
let flushed_wal_seconds = GaugeVec::new(
|
||||
Opts::new(
|
||||
"safekeeper_flushed_wal_seconds_total",
|
||||
"Total time spent in fsync(2) flushing WAL to disk, grouped by timeline",
|
||||
),
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(flushed_wal_seconds.desc().into_iter().cloned());
|
||||
|
||||
let collect_timeline_metrics = Gauge::new(
|
||||
"safekeeper_collect_timeline_metrics_seconds",
|
||||
"Time spent collecting timeline metrics, including obtaining mutex lock for all timelines",
|
||||
@@ -208,6 +326,9 @@ impl TimelineCollector {
|
||||
connected_computes,
|
||||
disk_usage,
|
||||
acceptor_term,
|
||||
written_wal_bytes,
|
||||
written_wal_seconds,
|
||||
flushed_wal_seconds,
|
||||
collect_timeline_metrics,
|
||||
}
|
||||
}
|
||||
@@ -235,6 +356,9 @@ impl Collector for TimelineCollector {
|
||||
self.connected_computes.reset();
|
||||
self.disk_usage.reset();
|
||||
self.acceptor_term.reset();
|
||||
self.written_wal_bytes.reset();
|
||||
self.written_wal_seconds.reset();
|
||||
self.flushed_wal_seconds.reset();
|
||||
|
||||
let timelines = GlobalTimelines::get_all();
|
||||
|
||||
@@ -292,6 +416,15 @@ impl Collector for TimelineCollector {
|
||||
self.acceptor_term
|
||||
.with_label_values(labels)
|
||||
.set(tli.persisted_state.acceptor_state.term as u64);
|
||||
self.written_wal_bytes
|
||||
.with_label_values(labels)
|
||||
.set(tli.wal_storage.write_wal_bytes);
|
||||
self.written_wal_seconds
|
||||
.with_label_values(labels)
|
||||
.set(tli.wal_storage.write_wal_seconds);
|
||||
self.flushed_wal_seconds
|
||||
.with_label_values(labels)
|
||||
.set(tli.wal_storage.flush_wal_seconds);
|
||||
|
||||
if let Some(feedback) = most_advanced {
|
||||
self.feedback_ps_write_lsn
|
||||
@@ -332,6 +465,9 @@ impl Collector for TimelineCollector {
|
||||
mfs.extend(self.connected_computes.collect());
|
||||
mfs.extend(self.disk_usage.collect());
|
||||
mfs.extend(self.acceptor_term.collect());
|
||||
mfs.extend(self.written_wal_bytes.collect());
|
||||
mfs.extend(self.written_wal_seconds.collect());
|
||||
mfs.extend(self.flushed_wal_seconds.collect());
|
||||
|
||||
// report time it took to collect all info
|
||||
let elapsed = start_collecting.elapsed().as_secs_f64();
|
||||
|
||||
@@ -998,6 +998,10 @@ mod tests {
|
||||
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
|
||||
Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
|
||||
}
|
||||
|
||||
fn get_metrics(&self) -> crate::metrics::WalStorageMetrics {
|
||||
crate::metrics::WalStorageMetrics::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -534,6 +534,7 @@ impl Timeline {
|
||||
mem_state: state.sk.inmem.clone(),
|
||||
persisted_state: state.sk.state.clone(),
|
||||
flush_lsn: state.sk.wal_store.flush_lsn(),
|
||||
wal_storage: state.sk.wal_store.get_metrics(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -8,11 +8,11 @@
|
||||
//! Note that last file has `.partial` suffix, that's different from postgres.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
|
||||
use std::io::{self, Seek, SeekFrom};
|
||||
use std::pin::Pin;
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_ffi::v14::xlog_utils::{
|
||||
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName,
|
||||
};
|
||||
@@ -27,6 +27,7 @@ use tracing::*;
|
||||
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
use crate::metrics::{time_io_closure, WalStorageMetrics};
|
||||
use crate::safekeeper::SafeKeeperState;
|
||||
|
||||
use crate::wal_backup::read_object;
|
||||
@@ -36,67 +37,8 @@ use postgres_ffi::XLOG_BLCKSZ;
|
||||
|
||||
use postgres_ffi::v14::waldecoder::WalStreamDecoder;
|
||||
|
||||
use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||
|
||||
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
|
||||
// i64 is faster than f64, so update to u64 when available.
|
||||
static WRITE_WAL_BYTES: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"safekeeper_write_wal_bytes",
|
||||
"Bytes written to WAL in a single request, grouped by timeline",
|
||||
&["tenant_id", "timeline_id"],
|
||||
vec![
|
||||
1.0,
|
||||
10.0,
|
||||
100.0,
|
||||
1024.0,
|
||||
8192.0,
|
||||
128.0 * 1024.0,
|
||||
1024.0 * 1024.0,
|
||||
10.0 * 1024.0 * 1024.0
|
||||
]
|
||||
)
|
||||
.expect("Failed to register safekeeper_write_wal_bytes histogram vec")
|
||||
});
|
||||
static WRITE_WAL_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"safekeeper_write_wal_seconds",
|
||||
"Seconds spent writing and syncing WAL to a disk in a single request, grouped by timeline",
|
||||
&["tenant_id", "timeline_id"],
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_write_wal_seconds histogram vec")
|
||||
});
|
||||
static FLUSH_WAL_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"safekeeper_flush_wal_seconds",
|
||||
"Seconds spent syncing WAL to a disk, grouped by timeline",
|
||||
&["tenant_id", "timeline_id"],
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_flush_wal_seconds histogram vec")
|
||||
});
|
||||
|
||||
struct WalStorageMetrics {
|
||||
write_wal_bytes: Histogram,
|
||||
write_wal_seconds: Histogram,
|
||||
flush_wal_seconds: Histogram,
|
||||
}
|
||||
|
||||
impl WalStorageMetrics {
|
||||
fn new(ttid: &TenantTimelineId) -> Self {
|
||||
let tenant_id = ttid.tenant_id.to_string();
|
||||
let timeline_id = ttid.timeline_id.to_string();
|
||||
Self {
|
||||
write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
flush_wal_seconds: FLUSH_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Storage {
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn;
|
||||
@@ -113,6 +55,9 @@ pub trait Storage {
|
||||
/// Remove all segments <= given segno. Returns closure as we want to do
|
||||
/// that without timeline lock.
|
||||
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>>;
|
||||
|
||||
/// Get metrics for this timeline.
|
||||
fn get_metrics(&self) -> WalStorageMetrics;
|
||||
}
|
||||
|
||||
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
|
||||
@@ -187,7 +132,7 @@ impl PhysicalStorage {
|
||||
}
|
||||
|
||||
Ok(PhysicalStorage {
|
||||
metrics: WalStorageMetrics::new(ttid),
|
||||
metrics: WalStorageMetrics::default(),
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
wal_seg_size,
|
||||
@@ -200,28 +145,26 @@ impl PhysicalStorage {
|
||||
}
|
||||
|
||||
/// Call fdatasync if config requires so.
|
||||
fn fdatasync_file(&self, file: &mut File) -> Result<()> {
|
||||
fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
|
||||
if !self.conf.no_sync {
|
||||
self.metrics
|
||||
.flush_wal_seconds
|
||||
.observe_closure_duration(|| file.sync_data())?;
|
||||
.observe_flush_seconds(time_io_closure(|| Ok(file.sync_data()?))?);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call fsync if config requires so.
|
||||
fn fsync_file(&self, file: &mut File) -> Result<()> {
|
||||
fn fsync_file(&mut self, file: &mut File) -> Result<()> {
|
||||
if !self.conf.no_sync {
|
||||
self.metrics
|
||||
.flush_wal_seconds
|
||||
.observe_closure_duration(|| file.sync_all())?;
|
||||
.observe_flush_seconds(time_io_closure(|| Ok(file.sync_all()?))?);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Open or create WAL segment file. Caller must call seek to the wanted position.
|
||||
/// Returns `file` and `is_partial`.
|
||||
fn open_or_create(&self, segno: XLogSegNo) -> Result<(File, bool)> {
|
||||
fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
|
||||
@@ -335,13 +278,10 @@ impl Storage for PhysicalStorage {
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
let _timer = self.metrics.write_wal_seconds.start_timer();
|
||||
self.write_exact(startpos, buf)?;
|
||||
}
|
||||
|
||||
let write_seconds = time_io_closure(|| self.write_exact(startpos, buf))?;
|
||||
// WAL is written, updating write metrics
|
||||
self.metrics.write_wal_bytes.observe(buf.len() as f64);
|
||||
self.metrics.observe_write_seconds(write_seconds);
|
||||
self.metrics.observe_write_bytes(buf.len());
|
||||
|
||||
// figure out last record's end lsn for reporting (if we got the
|
||||
// whole record)
|
||||
@@ -444,6 +384,10 @@ impl Storage for PhysicalStorage {
|
||||
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
|
||||
})
|
||||
}
|
||||
|
||||
fn get_metrics(&self) -> WalStorageMetrics {
|
||||
self.metrics.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove all WAL segments in timeline_dir that match the given predicate.
|
||||
|
||||
Reference in New Issue
Block a user