mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-19 21:30:38 +00:00
Compare commits
1 Commits
hackathon/
...
skyzh/lfc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
393f323106 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -6644,6 +6644,8 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"futures",
|
"futures",
|
||||||
"inotify 0.10.2",
|
"inotify 0.10.2",
|
||||||
|
"once_cell",
|
||||||
|
"prometheus",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sysinfo",
|
"sysinfo",
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ axum.workspace = true
|
|||||||
clap.workspace = true
|
clap.workspace = true
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
inotify.workspace = true
|
inotify.workspace = true
|
||||||
|
once_cell.workspace = true
|
||||||
|
prometheus.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
sysinfo.workspace = true
|
sysinfo.workspace = true
|
||||||
|
|||||||
@@ -54,6 +54,15 @@ pub struct FileCacheConfig {
|
|||||||
spread_factor: f64,
|
spread_factor: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct FileCacheStats {
|
||||||
|
pub lfc_hits: u64,
|
||||||
|
pub lfc_writes: u64,
|
||||||
|
pub lfc_misses: u64,
|
||||||
|
pub lfc_used: u64,
|
||||||
|
pub lfc_working_set_size: u64,
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for FileCacheConfig {
|
impl Default for FileCacheConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -222,6 +231,40 @@ impl FileCacheState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get file cache status
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
pub async fn get_file_cache_stats(&mut self) -> anyhow::Result<FileCacheStats> {
|
||||||
|
let queries = [
|
||||||
|
"select lfc_value as lfc_hits from neon.neon_lfc_stats where lfc_key='file_cache_hits'",
|
||||||
|
"select lfc_value as lfc_writes from neon.neon_lfc_stats where lfc_key='file_cache_writes'",
|
||||||
|
"select lfc_value as lfc_misses from neon.neon_lfc_stats where lfc_key='file_cache_misses'",
|
||||||
|
"select lfc_value as lfc_used from neon.neon_lfc_stats where lfc_key='file_cache_used'",
|
||||||
|
"select neon.approximate_working_set_size(false) as approximate_working_set_size",
|
||||||
|
];
|
||||||
|
let mut result = Vec::with_capacity(queries.len());
|
||||||
|
for query in queries {
|
||||||
|
let item = self
|
||||||
|
.query_with_retry(query, &[])
|
||||||
|
.await
|
||||||
|
.context("failed to query pg for file cache size")?
|
||||||
|
.first()
|
||||||
|
.ok_or_else(|| anyhow!("file cache size query returned no rows"))?
|
||||||
|
// the queries will always return a value >= 0
|
||||||
|
.try_get::<_, i64>(0)
|
||||||
|
// Since the size of the table is not negative, the cast is sound.
|
||||||
|
.map(|bytes| bytes as u64)
|
||||||
|
.context("failed to extract file cache stat from query result")?;
|
||||||
|
result.push(item);
|
||||||
|
}
|
||||||
|
Ok(FileCacheStats {
|
||||||
|
lfc_hits: result[0],
|
||||||
|
lfc_writes: result[1],
|
||||||
|
lfc_misses: result[2],
|
||||||
|
lfc_used: result[3],
|
||||||
|
lfc_working_set_size: result[4],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the current size of the file cache.
|
/// Get the current size of the file cache.
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
|
pub async fn get_file_cache_size(&mut self) -> anyhow::Result<u64> {
|
||||||
|
|||||||
@@ -2,14 +2,17 @@
|
|||||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||||
#![cfg(target_os = "linux")]
|
#![cfg(target_os = "linux")]
|
||||||
|
|
||||||
|
use ::prometheus::TextEncoder;
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{ws::WebSocket, State, WebSocketUpgrade},
|
extract::{ws::WebSocket, State, WebSocketUpgrade},
|
||||||
response::Response,
|
response::Response,
|
||||||
|
routing::get,
|
||||||
|
Router, Server,
|
||||||
};
|
};
|
||||||
use axum::{routing::get, Router, Server};
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
|
use prometheus::Encoder;
|
||||||
use std::{fmt::Debug, time::Duration};
|
use std::{fmt::Debug, time::Duration};
|
||||||
use sysinfo::{RefreshKind, System, SystemExt};
|
use sysinfo::{RefreshKind, System, SystemExt};
|
||||||
use tokio::{sync::broadcast, task::JoinHandle};
|
use tokio::{sync::broadcast, task::JoinHandle};
|
||||||
@@ -24,6 +27,7 @@ pub mod protocol;
|
|||||||
|
|
||||||
pub mod cgroup;
|
pub mod cgroup;
|
||||||
pub mod filecache;
|
pub mod filecache;
|
||||||
|
pub mod metrics;
|
||||||
pub mod runner;
|
pub mod runner;
|
||||||
|
|
||||||
/// The vm-monitor is an autoscaling component started by compute_ctl.
|
/// The vm-monitor is an autoscaling component started by compute_ctl.
|
||||||
@@ -126,6 +130,17 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res
|
|||||||
// one connection at a time, which we enforce by killing old connections
|
// one connection at a time, which we enforce by killing old connections
|
||||||
// when we receive a new one.
|
// when we receive a new one.
|
||||||
.route("/monitor", get(ws_handler))
|
.route("/monitor", get(ws_handler))
|
||||||
|
// Get promethus metrics
|
||||||
|
.route(
|
||||||
|
"/metrics",
|
||||||
|
get(|| async {
|
||||||
|
let mut buffer = vec![];
|
||||||
|
let encoder = TextEncoder::new();
|
||||||
|
let metric_families = prometheus::gather();
|
||||||
|
encoder.encode(&metric_families, &mut buffer).unwrap();
|
||||||
|
buffer
|
||||||
|
}),
|
||||||
|
)
|
||||||
.with_state(ServerState {
|
.with_state(ServerState {
|
||||||
sender,
|
sender,
|
||||||
token,
|
token,
|
||||||
|
|||||||
17
libs/vm_monitor/src/metrics.rs
Normal file
17
libs/vm_monitor/src/metrics.rs
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use prometheus::{opts, register_int_gauge, IntGauge};
|
||||||
|
|
||||||
|
pub(crate) static METRICS_LFC_HITS: Lazy<IntGauge> =
|
||||||
|
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_hits", "",)).unwrap());
|
||||||
|
|
||||||
|
pub(crate) static METRICS_LFC_MISSES: Lazy<IntGauge> =
|
||||||
|
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_misses", "",)).unwrap());
|
||||||
|
|
||||||
|
pub(crate) static METRICS_LFC_USED: Lazy<IntGauge> =
|
||||||
|
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_used", "",)).unwrap());
|
||||||
|
|
||||||
|
pub(crate) static METRICS_LFC_WRITES: Lazy<IntGauge> =
|
||||||
|
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_writes", "",)).unwrap());
|
||||||
|
|
||||||
|
pub(crate) static METRICS_LFC_WORKING_SET_SIZE: Lazy<IntGauge> =
|
||||||
|
Lazy::new(|| register_int_gauge!(opts!("vm_monitor_lfc_working_set_size", "",)).unwrap());
|
||||||
@@ -413,10 +413,23 @@ impl Runner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn update_lfc_stats(&mut self) -> anyhow::Result<()> {
|
||||||
|
if let Some(ref mut filecache) = self.filecache {
|
||||||
|
let stats = filecache.get_file_cache_stats().await?;
|
||||||
|
crate::metrics::METRICS_LFC_HITS.set(stats.lfc_hits as i64);
|
||||||
|
crate::metrics::METRICS_LFC_MISSES.set(stats.lfc_misses as i64);
|
||||||
|
crate::metrics::METRICS_LFC_USED.set(stats.lfc_used as i64);
|
||||||
|
crate::metrics::METRICS_LFC_WRITES.set(stats.lfc_writes as i64);
|
||||||
|
crate::metrics::METRICS_LFC_WORKING_SET_SIZE.set(stats.lfc_working_set_size as i64);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: don't propagate errors, probably just warn!?
|
// TODO: don't propagate errors, probably just warn!?
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub async fn run(&mut self) -> anyhow::Result<()> {
|
pub async fn run(&mut self) -> anyhow::Result<()> {
|
||||||
info!("starting dispatcher");
|
info!("starting dispatcher");
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
signal = self.kill.recv() => {
|
signal = self.kill.recv() => {
|
||||||
@@ -523,6 +536,12 @@ impl Runner {
|
|||||||
anyhow::bail!("dispatcher connection closed")
|
anyhow::bail!("dispatcher connection closed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_ = interval.tick() => {
|
||||||
|
if let Err(e) = self.update_lfc_stats().await {
|
||||||
|
warn!("{e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user