Part 1 of 2, for moving the file cache onto disk. Because VMs are created by the control plane (and that's where the filesystem for the file cache is defined), we can't rely on any kind of synchronization between releases, so the change needs to be feature-gated (kind of), with the default remaining the same for now. See also: neondatabase/cloud#6593
#![cfg(target_os = "linux")]

use anyhow::Context;
use axum::{
    extract::{ws::WebSocket, State, WebSocketUpgrade},
    response::Response,
};
use axum::{routing::get, Router, Server};
use clap::Parser;
use futures::Future;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with agent
pub mod dispatcher;
pub mod protocol;

pub mod cgroup;
pub mod filecache;
pub mod runner;

/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
#[derive(Debug, Parser)]
pub struct Args {
    /// The name of the cgroup we should monitor for memory.high events. This
    /// is the cgroup that postgres should be running in.
    #[arg(short, long)]
    pub cgroup: Option<String>,

    /// The connection string for the Postgres file cache we should manage.
    #[arg(short, long)]
    pub pgconnstr: Option<String>,

    /// Flag to signal that the Postgres file cache is on disk (i.e. not in memory aside from the
    /// kernel's page cache), and therefore should not count against available memory.
    //
    // NB: Ideally this flag would directly refer to whether the file cache is in memory (rather
    // than a roundabout way, via whether it's on disk), but in order to be backwards compatible
    // during the switch away from an in-memory file cache, we had to default to the previous
    // behavior.
    #[arg(long)]
    pub file_cache_on_disk: bool,

    /// The address we should listen on for connection requests. For the
    /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369.
    #[arg(short, long)]
    pub addr: String,
}

impl Args {
    pub fn addr(&self) -> &str {
        &self.addr
    }
}
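// Hedged example (not part of the original file): a sketch of how the clap
// derive above maps to CLI flags. The cgroup name "neon-postgres" is purely
// illustrative; `--file-cache-on-disk` defaults to false, matching the
// previous (in-memory file cache) behavior described in the NB comment above.
#[cfg(test)]
mod args_example {
    use super::Args;
    use clap::Parser;

    #[test]
    fn parses_expected_flags() {
        let args = Args::parse_from([
            "vm-monitor",
            "--addr",
            "0.0.0.0:10301",
            "--cgroup",
            "neon-postgres",
            "--file-cache-on-disk",
        ]);
        assert_eq!(args.addr(), "0.0.0.0:10301");
        assert_eq!(args.cgroup.as_deref(), Some("neon-postgres"));
        assert!(args.file_cache_on_disk);

        // Omitting the flag keeps the default (file cache assumed in memory).
        let args = Args::parse_from(["vm-monitor", "--addr", "127.0.0.1:10369"]);
        assert!(!args.file_cache_on_disk);
    }
}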
/// The number of bytes in one mebibyte.
#[allow(non_upper_case_globals)]
const MiB: u64 = 1 << 20;

/// Convert a quantity in bytes to a quantity in mebibytes, generally for display
/// purposes. (Most calculations in this crate use bytes directly)
pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
    (bytes as f32) / (MiB as f32)
}
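// Hedged example (not in the original source): the conversion above is a plain
// division by 2^20, so 512 MiB worth of bytes maps back to exactly 512.0.
#[cfg(test)]
mod conversion_example {
    use super::{bytes_to_mebibytes, MiB};

    #[test]
    fn bytes_round_trip_through_mebibytes() {
        assert_eq!(bytes_to_mebibytes(512 * MiB), 512.0);
    }
}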
pub fn get_total_system_memory() -> u64 {
    System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}
/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
    /// Used to close old connections.
    ///
    /// When a new connection is made, we send a message signalling to the old
    /// connection to close.
    pub sender: broadcast::Sender<()>,

    /// Used to cancel all spawned tasks in the monitor.
    pub token: CancellationToken,

    // The CLI args
    pub args: &'static Args,
}
/// Spawn a task that may get cancelled by the provided [`CancellationToken`].
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If it is not desirable for the future to
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
pub fn spawn_with_cancel<T, F>(
    token: CancellationToken,
    f: F,
    future: T,
) -> JoinHandle<Option<T::Output>>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
    F: FnOnce(&T::Output) + Send + 'static,
{
    tokio::spawn(async move {
        tokio::select! {
            _ = token.cancelled() => {
                info!("received global kill signal");
                None
            }
            res = future => {
                f(&res);
                Some(res)
            }
        }
    })
}
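// Hedged usage sketch (not part of the original file): a future that should
// never resolve is spawned with a cancellation token; cancelling the token
// makes the task exit with `None`, and `f` would only run if the future
// unexpectedly completed. `std::future::pending` stands in here for something
// long-running like `CgroupWatcher::watch`.
#[cfg(test)]
mod spawn_with_cancel_example {
    use super::spawn_with_cancel;
    use tokio_util::sync::CancellationToken;
    use tracing::error;

    #[tokio::test]
    async fn cancelled_task_yields_none() {
        let token = CancellationToken::new();
        let handle = spawn_with_cancel(
            token.clone(),
            |out: &()| error!(?out, "watcher exited unexpectedly"),
            std::future::pending::<()>(),
        );

        token.cancel();
        assert!(handle.await.unwrap().is_none());
    }
}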
/// The entrypoint to the binary.
///
/// Set up tracing, parse arguments, and start an http server.
pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> {
    // This channel is used to close old connections. When a new connection is
    // made, we send a message signalling to the old connection to close.
    let (sender, _) = tokio::sync::broadcast::channel::<()>(1);

    let app = Router::new()
        // This route gets upgraded to a websocket connection. We only support
        // one connection at a time, which we enforce by killing old connections
        // when we receive a new one.
        .route("/monitor", get(ws_handler))
        .with_state(ServerState {
            sender,
            token,
            args,
        });

    let addr = args.addr();
    let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail"))
        .with_context(|| format!("failed to bind to {addr}"))?;

    info!(addr, "server bound");

    bound
        .serve(app.into_make_service())
        .await
        .context("server exited")?;

    Ok(())
}
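// Hedged sketch (not part of the original file) of how a caller such as
// compute_ctl might drive `start`: the `&'static Args` can be obtained by
// leaking the parsed arguments, and the token gives the caller a way to shut
// the monitor down. The address and cgroup values here are illustrative only.
#[allow(dead_code)]
async fn example_caller() -> anyhow::Result<()> {
    let args: &'static Args = Box::leak(Box::new(Args::parse_from([
        "vm-monitor",
        "--addr",
        "0.0.0.0:10301",
        "--cgroup",
        "neon-postgres",
    ])));
    let token = CancellationToken::new();
    start(args, token).await
}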
/// Handles incoming websocket connections.
///
/// If we are already connected to an agent, we kill that old connection
/// and accept the new one.
#[tracing::instrument(name = "/monitor", skip_all, fields(?args))]
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(ServerState {
        sender,
        token,
        args,
    }): State<ServerState>,
) -> Response {
    // Kill the old monitor
    info!("closing old connection if there is one");
    let _ = sender.send(());

    // Start the new one. Wow, the cycle of death and rebirth
    let closer = sender.subscribe();
    ws.on_upgrade(|ws| start_monitor(ws, args, closer, token))
}
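// Hedged sketch (not in the original file) of the single-connection pattern
// above in isolation: the existing connection's receiver sees the `()` sent
// when a new connection arrives, which is its signal to shut down, and the
// new connection subscribes only after that send.
#[cfg(test)]
mod single_connection_example {
    use tokio::sync::broadcast;

    #[tokio::test]
    async fn new_connection_signals_the_old_one() {
        let (sender, _) = broadcast::channel::<()>(1);

        // The first ("old") connection subscribes when it is accepted.
        let mut old_closer = sender.subscribe();

        // A second connection arrives: signal the old one, then subscribe.
        let _ = sender.send(());
        let _new_closer = sender.subscribe();

        // The old connection observes the close signal.
        assert!(old_closer.recv().await.is_ok());
    }
}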
/// Starts the monitor. If startup fails or the monitor exits, an error will
/// be logged and our internal state will be reset to allow for new connections.
#[tracing::instrument(skip_all, fields(?args))]
async fn start_monitor(
    ws: WebSocket,
    args: &Args,
    kill: broadcast::Receiver<()>,
    token: CancellationToken,
) {
    info!("accepted new websocket connection -> starting monitor");
    let timeout = Duration::from_secs(4);
    let monitor = tokio::time::timeout(
        timeout,
        Runner::new(Default::default(), args, ws, kill, token),
    )
    .await;
    let mut monitor = match monitor {
        Ok(Ok(monitor)) => monitor,
        Ok(Err(error)) => {
            error!(?error, "failed to create monitor");
            return;
        }
        Err(_) => {
            error!(
                ?timeout,
                "creating monitor timed out (probably waiting to receive protocol range)"
            );
            return;
        }
    };
    info!("connected to agent");

    match monitor.run().await {
        Ok(()) => info!("monitor was killed due to new connection"),
        Err(e) => error!(error = ?e, "monitor terminated unexpectedly"),
    }
}