From 393f323106cfb7010f962fd96fad4b5cd5ffb526 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Wed, 3 Apr 2024 17:11:45 -0400 Subject: [PATCH] collect lfc stats from vm-monitor Signed-off-by: Alex Chi Z --- Cargo.lock | 2 ++ libs/vm_monitor/Cargo.toml | 2 ++ libs/vm_monitor/src/filecache.rs | 43 ++++++++++++++++++++++++++++++++ libs/vm_monitor/src/lib.rs | 17 ++++++++++++- libs/vm_monitor/src/metrics.rs | 17 +++++++++++++ libs/vm_monitor/src/runner.rs | 19 ++++++++++++++ 6 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 libs/vm_monitor/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index dae406e4ae..2264fd673a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6644,6 +6644,8 @@ dependencies = [ "clap", "futures", "inotify 0.10.2", + "once_cell", + "prometheus", "serde", "serde_json", "sysinfo", diff --git a/libs/vm_monitor/Cargo.toml b/libs/vm_monitor/Cargo.toml index 46e9f880a1..aaf19ecf04 100644 --- a/libs/vm_monitor/Cargo.toml +++ b/libs/vm_monitor/Cargo.toml @@ -16,6 +16,8 @@ axum.workspace = true clap.workspace = true futures.workspace = true inotify.workspace = true +once_cell.workspace = true +prometheus.workspace = true serde.workspace = true serde_json.workspace = true sysinfo.workspace = true diff --git a/libs/vm_monitor/src/filecache.rs b/libs/vm_monitor/src/filecache.rs index fe71e11197..3414a57f96 100644 --- a/libs/vm_monitor/src/filecache.rs +++ b/libs/vm_monitor/src/filecache.rs @@ -54,6 +54,15 @@ pub struct FileCacheConfig { spread_factor: f64, } +#[derive(Debug)] +pub struct FileCacheStats { + pub lfc_hits: u64, + pub lfc_writes: u64, + pub lfc_misses: u64, + pub lfc_used: u64, + pub lfc_working_set_size: u64, +} + impl Default for FileCacheConfig { fn default() -> Self { Self { @@ -222,6 +231,40 @@ impl FileCacheState { } } + /// Get file cache statistics + #[tracing::instrument(skip_all)] + pub async fn get_file_cache_stats(&mut self) -> anyhow::Result<FileCacheStats> { + let queries = [ + "select lfc_value as lfc_hits from neon.neon_lfc_stats where 
lfc_key='file_cache_hits'", + "select lfc_value as lfc_writes from neon.neon_lfc_stats where lfc_key='file_cache_writes'", + "select lfc_value as lfc_misses from neon.neon_lfc_stats where lfc_key='file_cache_misses'", + "select lfc_value as lfc_used from neon.neon_lfc_stats where lfc_key='file_cache_used'", + "select neon.approximate_working_set_size(false) as approximate_working_set_size", + ]; + let mut result = Vec::with_capacity(queries.len()); + for query in queries { + let item = self + .query_with_retry(query, &[]) + .await + .context("failed to query pg for file cache size")? + .first() + .ok_or_else(|| anyhow!("file cache size query returned no rows"))? + // the queries will always return a value >= 0 + .try_get::<_, i64>(0) + // Since none of the queried values is negative, the cast is sound. + .map(|bytes| bytes as u64) + .context("failed to extract file cache stat from query result")?; + result.push(item); + } + Ok(FileCacheStats { + lfc_hits: result[0], + lfc_writes: result[1], + lfc_misses: result[2], + lfc_used: result[3], + lfc_working_set_size: result[4], + }) + } + /// Get the current size of the file cache. 
#[tracing::instrument(skip_all)] pub async fn get_file_cache_size(&mut self) -> anyhow::Result { diff --git a/libs/vm_monitor/src/lib.rs b/libs/vm_monitor/src/lib.rs index 89ca91fdd7..f4ffcee3f2 100644 --- a/libs/vm_monitor/src/lib.rs +++ b/libs/vm_monitor/src/lib.rs @@ -2,14 +2,17 @@ #![deny(clippy::undocumented_unsafe_blocks)] #![cfg(target_os = "linux")] +use ::prometheus::TextEncoder; use anyhow::Context; use axum::{ extract::{ws::WebSocket, State, WebSocketUpgrade}, response::Response, + routing::get, + Router, Server, }; -use axum::{routing::get, Router, Server}; use clap::Parser; use futures::Future; +use prometheus::Encoder; use std::{fmt::Debug, time::Duration}; use sysinfo::{RefreshKind, System, SystemExt}; use tokio::{sync::broadcast, task::JoinHandle}; @@ -24,6 +27,7 @@ pub mod protocol; pub mod cgroup; pub mod filecache; +pub mod metrics; pub mod runner; /// The vm-monitor is an autoscaling component started by compute_ctl. @@ -126,6 +130,17 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res // one connection at a time, which we enforce by killing old connections // when we receive a new one. 
.route("/monitor", get(ws_handler)) + // Get Prometheus metrics + .route( + "/metrics", + get(|| async { + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + let metric_families = prometheus::gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + buffer + }), + ) .with_state(ServerState { sender, token, diff --git a/libs/vm_monitor/src/metrics.rs b/libs/vm_monitor/src/metrics.rs new file mode 100644 index 0000000000..eebe32f570 --- /dev/null +++ b/libs/vm_monitor/src/metrics.rs @@ -0,0 +1,17 @@ +use once_cell::sync::Lazy; +use prometheus::{opts, register_int_gauge, IntGauge}; + +pub(crate) static METRICS_LFC_HITS: Lazy<IntGauge> = + Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_hits", "",)).unwrap()); + +pub(crate) static METRICS_LFC_MISSES: Lazy<IntGauge> = + Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_misses", "",)).unwrap()); + +pub(crate) static METRICS_LFC_USED: Lazy<IntGauge> = + Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_used", "",)).unwrap()); + +pub(crate) static METRICS_LFC_WRITES: Lazy<IntGauge> = + Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_writes", "",)).unwrap()); + +pub(crate) static METRICS_LFC_WORKING_SET_SIZE: Lazy<IntGauge> = + Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_working_set_size", "",)).unwrap()); diff --git a/libs/vm_monitor/src/runner.rs b/libs/vm_monitor/src/runner.rs index ca02637ecf..6af0a7eee4 100644 --- a/libs/vm_monitor/src/runner.rs +++ b/libs/vm_monitor/src/runner.rs @@ -413,10 +413,23 @@ impl Runner { } } + pub async fn update_lfc_stats(&mut self) -> anyhow::Result<()> { + if let Some(ref mut filecache) = self.filecache { + let stats = filecache.get_file_cache_stats().await?; + crate::metrics::METRICS_LFC_HITS.set(stats.lfc_hits as i64); + crate::metrics::METRICS_LFC_MISSES.set(stats.lfc_misses as i64); + crate::metrics::METRICS_LFC_USED.set(stats.lfc_used as i64); + crate::metrics::METRICS_LFC_WRITES.set(stats.lfc_writes as i64); + 
crate::metrics::METRICS_LFC_WORKING_SET_SIZE.set(stats.lfc_working_set_size as i64); + } + Ok(()) + } + // TODO: don't propagate errors, probably just warn!? #[tracing::instrument(skip_all)] pub async fn run(&mut self) -> anyhow::Result<()> { info!("starting dispatcher"); + let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { tokio::select! { signal = self.kill.recv() => { @@ -523,6 +536,12 @@ impl Runner { anyhow::bail!("dispatcher connection closed") } } + + _ = interval.tick() => { + if let Err(e) = self.update_lfc_stats().await { + warn!("{e}"); + } + } } } }