mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
collect lfc stats from vm-monitor
Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -6644,6 +6644,8 @@ dependencies = [
|
||||
"clap",
|
||||
"futures",
|
||||
"inotify 0.10.2",
|
||||
"once_cell",
|
||||
"prometheus",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sysinfo",
|
||||
|
||||
@@ -16,6 +16,8 @@ axum.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
inotify.workspace = true
|
||||
once_cell.workspace = true
|
||||
prometheus.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
sysinfo.workspace = true
|
||||
|
||||
@@ -54,6 +54,15 @@ pub struct FileCacheConfig {
|
||||
spread_factor: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FileCacheStats {
|
||||
pub lfc_hits: u64,
|
||||
pub lfc_writes: u64,
|
||||
pub lfc_misses: u64,
|
||||
pub lfc_used: u64,
|
||||
pub lfc_working_set_size: u64,
|
||||
}
|
||||
|
||||
impl Default for FileCacheConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@@ -222,6 +231,40 @@ impl FileCacheState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get file cache status
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn get_file_cache_stats(&mut self) -> anyhow::Result<FileCacheStats> {
|
||||
let queries = [
|
||||
"select lfc_value as lfc_hits from neon.neon_lfc_stats where lfc_key='file_cache_hits'",
|
||||
"select lfc_value as lfc_writes from neon.neon_lfc_stats where lfc_key='file_cache_writes'",
|
||||
"select lfc_value as lfc_misses from neon.neon_lfc_stats where lfc_key='file_cache_misses'",
|
||||
"select lfc_value as lfc_used from neon.neon_lfc_stats where lfc_key='file_cache_used'",
|
||||
"select neon.approximate_working_set_size(false) as approximate_working_set_size",
|
||||
];
|
||||
let mut result = Vec::with_capacity(queries.len());
|
||||
for query in queries {
|
||||
let item = self
|
||||
.query_with_retry(query, &[])
|
||||
.await
|
||||
.context("failed to query pg for file cache size")?
|
||||
.first()
|
||||
.ok_or_else(|| anyhow!("file cache size query returned no rows"))?
|
||||
// the queries will always return a value >= 0
|
||||
.try_get::<_, i64>(0)
|
||||
// Since the size of the table is not negative, the cast is sound.
|
||||
.map(|bytes| bytes as u64)
|
||||
.context("failed to extract file cache stat from query result")?;
|
||||
result.push(item);
|
||||
}
|
||||
Ok(FileCacheStats {
|
||||
lfc_hits: result[0],
|
||||
lfc_writes: result[1],
|
||||
lfc_misses: result[2],
|
||||
lfc_used: result[3],
|
||||
lfc_working_set_size: result[4],
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the current size of the file cache.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
|
||||
|
||||
@@ -2,14 +2,17 @@
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
#![cfg(target_os = "linux")]
|
||||
|
||||
use ::prometheus::TextEncoder;
|
||||
use anyhow::Context;
|
||||
use axum::{
|
||||
extract::{ws::WebSocket, State, WebSocketUpgrade},
|
||||
response::Response,
|
||||
routing::get,
|
||||
Router, Server,
|
||||
};
|
||||
use axum::{routing::get, Router, Server};
|
||||
use clap::Parser;
|
||||
use futures::Future;
|
||||
use prometheus::Encoder;
|
||||
use std::{fmt::Debug, time::Duration};
|
||||
use sysinfo::{RefreshKind, System, SystemExt};
|
||||
use tokio::{sync::broadcast, task::JoinHandle};
|
||||
@@ -24,6 +27,7 @@ pub mod protocol;
|
||||
|
||||
pub mod cgroup;
|
||||
pub mod filecache;
|
||||
pub mod metrics;
|
||||
pub mod runner;
|
||||
|
||||
/// The vm-monitor is an autoscaling component started by compute_ctl.
|
||||
@@ -126,6 +130,17 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res
|
||||
// one connection at a time, which we enforce by killing old connections
|
||||
// when we receive a new one.
|
||||
.route("/monitor", get(ws_handler))
|
||||
// Get promethus metrics
|
||||
.route(
|
||||
"/metrics",
|
||||
get(|| async {
|
||||
let mut buffer = vec![];
|
||||
let encoder = TextEncoder::new();
|
||||
let metric_families = prometheus::gather();
|
||||
encoder.encode(&metric_families, &mut buffer).unwrap();
|
||||
buffer
|
||||
}),
|
||||
)
|
||||
.with_state(ServerState {
|
||||
sender,
|
||||
token,
|
||||
|
||||
17
libs/vm_monitor/src/metrics.rs
Normal file
17
libs/vm_monitor/src/metrics.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
use once_cell::sync::Lazy;
|
||||
use prometheus::{opts, register_int_gauge, IntGauge};
|
||||
|
||||
pub(crate) static METRICS_LFC_HITS: Lazy<IntGauge> =
|
||||
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_hits", "",)).unwrap());
|
||||
|
||||
pub(crate) static METRICS_LFC_MISSES: Lazy<IntGauge> =
|
||||
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_misses", "",)).unwrap());
|
||||
|
||||
pub(crate) static METRICS_LFC_USED: Lazy<IntGauge> =
|
||||
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_used", "",)).unwrap());
|
||||
|
||||
pub(crate) static METRICS_LFC_WRITES: Lazy<IntGauge> =
|
||||
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_writes", "",)).unwrap());
|
||||
|
||||
pub(crate) static METRICS_LFC_WORKING_SET_SIZE: Lazy<IntGauge> =
|
||||
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_working_set_size", "",)).unwrap());
|
||||
@@ -413,10 +413,23 @@ impl Runner {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_lfc_stats(&mut self) -> anyhow::Result<()> {
|
||||
if let Some(ref mut filecache) = self.filecache {
|
||||
let stats = filecache.get_file_cache_stats().await?;
|
||||
crate::metrics::METRICS_LFC_HITS.set(stats.lfc_hits as i64);
|
||||
crate::metrics::METRICS_LFC_MISSES.set(stats.lfc_misses as i64);
|
||||
crate::metrics::METRICS_LFC_USED.set(stats.lfc_used as i64);
|
||||
crate::metrics::METRICS_LFC_WRITES.set(stats.lfc_writes as i64);
|
||||
crate::metrics::METRICS_LFC_WORKING_SET_SIZE.set(stats.lfc_working_set_size as i64);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: don't propagate errors, probably just warn!?
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn run(&mut self) -> anyhow::Result<()> {
|
||||
info!("starting dispatcher");
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||
loop {
|
||||
tokio::select! {
|
||||
signal = self.kill.recv() => {
|
||||
@@ -523,6 +536,12 @@ impl Runner {
|
||||
anyhow::bail!("dispatcher connection closed")
|
||||
}
|
||||
}
|
||||
|
||||
_ = interval.tick() => {
|
||||
if let Err(e) = self.update_lfc_stats().await {
|
||||
warn!("{e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user