#![cfg(target_os = "linux")]

use anyhow::Context;
use axum::{
    extract::{ws::WebSocket, State, WebSocketUpgrade},
    response::Response,
};
use axum::{routing::get, Router, Server};
use clap::Parser;
use futures::Future;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with agent
pub mod dispatcher;
pub mod protocol;

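// Resource management: cgroup memory monitoring, the Postgres file cache, and
// the monitor's main loop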
pub mod cgroup;
pub mod filecache;
pub mod runner;

/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
#[derive(Debug, Parser)]
pub struct Args {
    /// The name of the cgroup we should monitor for memory.high events. This
    /// is the cgroup that postgres should be running in.
    #[arg(short, long)]
    pub cgroup: Option<String>,

    /// The connection string for the Postgres file cache we should manage.
    #[arg(short, long)]
    pub pgconnstr: Option<String>,

    /// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
    /// kernel's page cache), and therefore should not count against available memory.
    //
    // NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
    // than a roundabout way, via whether it's on disk), but in order to be backwards compatible
    // during the switch away from an in-memory file cache, we had to default to the previous
    // behavior.
    #[arg(long)]
    pub file_cache_on_disk: bool,

    /// The address we should listen on for connection requests. For the
    /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
    #[arg(short, long)]
    pub addr: String,
}

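// Example invocation (illustrative values; in practice compute_ctl supplies the
// real cgroup name, connection string, and listen address):
//
//     vm-monitor --cgroup neon-postgres --addr 0.0.0.0:10301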

impl Args {
    pub fn addr(&self) -> &str {
        &self.addr
    }
}

/// The number of bytes in one mebibyte.
#[allow(non_upper_case_globals)]
const MiB: u64 = 1 << 20;

/// Convert a quantity in bytes to a quantity in mebibytes, generally for display
/// purposes. (Most calculations in this crate use bytes directly)
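///
/// For example, `bytes_to_mebibytes(2 * MiB)` is `2.0`.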
pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
    (bytes as f32) / (MiB as f32)
}

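/// Returns the total memory of the system, as reported by [`sysinfo`].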
pub fn get_total_system_memory() -> u64 {
    System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}

/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
    /// Used to close old connections.
    ///
    /// When a new connection is made, we send a message signalling to the old
    /// connection to close.
    pub sender: broadcast::Sender<()>,

    /// Used to cancel all spawned threads in the monitor.
    pub token: CancellationToken,

    // The CLI args
    pub args: &'static Args,
}

/// Spawn a thread that may get cancelled by the provided [`CancellationToken`].
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If it is not desirable for the future to
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
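///
/// A usage sketch (hypothetical caller; `token` and `watcher` are stand-ins):
///
/// ```ignore
/// spawn_with_cancel(
///     token.clone(),
///     |res| error!(?res, "cgroup watcher exited unexpectedly"),
///     async move { watcher.watch(/* ... */).await },
/// );
/// ```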
pub fn spawn_with_cancel<T, F>(
    token: CancellationToken,
    f: F,
    future: T,
) -> JoinHandle<Option<T::Output>>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
    F: FnOnce(&T::Output) + Send + 'static,
{
    tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => {
                info!("received global kill signal");
                None
            }
            res = future => {
                f(&res);
                Some(res)
            }
        }
    })
}

/// The entrypoint to the binary.
///
/// Set up tracing, parse arguments, and start an http server.
pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
    // This channel is used to close old connections. When a new connection is
    // made, we send a message signalling to the old connection to close.
    let (sender, _) = tokio::sync::broadcast::channel::<()>(1);

    let app = Router::new()
        // This route gets upgraded to a websocket connection. We only support
        // one connection at a time, which we enforce by killing old connections
        // when we receive a new one.
        .route("/monitor", get(ws_handler))
        .with_state(ServerState {
            sender,
            token,
            args,
        });

    let addr = args.addr();
    let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail"))
        .with_context(|| format!("failed to bind to {addr}"))?;

    info!(addr, "server bound");

    bound
        .serve(app.into_make_service())
        .await
        .context("server exited")?;

    Ok(())
}

/// Handles incoming websocket connections.
///
/// If we are already connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(ServerState {
        sender,
        token,
        args,
    }): State<ServerState>,
) -> Response {
    // Kill the old monitor
    info!("closing old connection if there is one");
    let _ = sender.send(());

    // Start the new one. Wow, the cycle of death and rebirth
    let closer = sender.subscribe();
    ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
}

/// Starts the monitor. If startup fails or the monitor exits, an error will
/// be logged and our internal state will be reset to allow for new connections.
#[tracing::instrument(skip_all)]
async fn start_monitor(
    ws: WebSocket,
    args: &Args,
    kill: broadcast::Receiver<()>,
    token: CancellationToken,
) {
    info!(
        ?args,
        "accepted new websocket connection -> starting monitor"
    );
    let timeout = Duration::from_secs(4);
    let monitor = tokio::time::timeout(
        timeout,
        Runner::new(Default::default(), args, ws, kill, token),
    )
    .await;
    let mut monitor = match monitor {
        Ok(Ok(monitor)) => monitor,
        Ok(Err(error)) => {
            error!(?error, "failed to create monitor");
            return;
        }
        Err(_) => {
            error!(
                ?timeout,
                "creating monitor timed out (probably waiting to receive protocol range)"
            );
            return;
        }
    };
    info!("connected to agent");

    match monitor.run().await {
        Ok(()) => info!("monitor was killed due to new connection"),
        Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
    }
}