diff --git a/compute_tools/README.md b/compute_tools/README.md index 305ccae5dd..22a7de7cb7 100644 --- a/compute_tools/README.md +++ b/compute_tools/README.md @@ -19,9 +19,10 @@ Also `compute_ctl` spawns two separate service threads: - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the last activity requests. -If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM -compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates -downscaling and (eventually) will request immediate upscaling under resource pressure. +If the `AUTOSCALING` environment variable is set, `compute_ctl` will start the +`vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes, +`vm-monitor` communicates with the VM autoscaling system. It coordinates +downscaling and requests immediate upscaling under resource pressure. Usage example: ```sh diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index b330dafa99..447da813f2 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -20,9 +20,10 @@ //! - `http-endpoint` runs a Hyper HTTP API server, which serves readiness and the //! last activity requests. //! -//! If the `vm-informant` binary is present at `/bin/vm-informant`, it will also be started. For VM -//! compute nodes, `vm-informant` communicates with the VM autoscaling system. It coordinates -//! downscaling and (eventually) will request immediate upscaling under resource pressure. +//! If the `AUTOSCALING` environment variable is set, `compute_ctl` will start the +//! `vm-monitor` located in [`neon/libs/vm_monitor`]. For VM compute nodes, +//! `vm-monitor` communicates with the VM autoscaling system. It coordinates +//! downscaling and requests immediate upscaling under resource pressure. //! //! Usage example: //! 
```sh diff --git a/libs/vm_monitor/README.md b/libs/vm_monitor/README.md index 4c5a196107..53cdecd9f3 100644 --- a/libs/vm_monitor/README.md +++ b/libs/vm_monitor/README.md @@ -16,3 +16,19 @@ in the `neon-postgres` cgroup and set its `memory.{max,high}`. * See also: [`neondatabase/vm-monitor`](https://github.com/neondatabase/vm-monitor/), where initial development of the monitor happened. The repository is no longer maintained but the commit history may be useful for debugging. + +## Structure + +The `vm-monitor` is loosely comprised of a few systems. These are: +* the server: this is just a simple `axum` server that accepts requests and +upgrades them to websocket connections. The server only allows one connection at +a time. This means that upon receiving a new connection, the server will terminate +the old one if it exists. +* the filecache: a struct that allows communication with the Postgres file cache. +On startup, we connect to the filecache and hold on to the connection for the +entire monitor lifetime. +* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by +listening for `memory.high` events and setting its `memory.{high,max}` values. +* the runner: the runner marries the filecache and cgroup watcher together, +communicating with the agent through the `Dispatcher`, and then calling filecache +and cgroup watcher functions as needed to upscale and downscale. diff --git a/libs/vm_monitor/src/dispatcher.rs b/libs/vm_monitor/src/dispatcher.rs index 9d3b966700..109a68fff1 100644 --- a/libs/vm_monitor/src/dispatcher.rs +++ b/libs/vm_monitor/src/dispatcher.rs @@ -1,7 +1,7 @@ //! Managing the websocket connection and other signals in the monitor. //! //! Contains types that manage the interaction (not data interchange, see `protocol`) -//! between informant and monitor, allowing us to to process and send messages in a +//! between agent and monitor, allowing us to process and send messages in a //! straightforward way. 
The dispatcher also manages that signals that come from //! the cgroup (requesting upscale), and the signals that go to the cgroup //! (notifying it of upscale). @@ -24,16 +24,16 @@ use crate::protocol::{ /// The central handler for all communications in the monitor. /// /// The dispatcher has two purposes: -/// 1. Manage the connection to the informant, sending and receiving messages. +/// 1. Manage the connection to the agent, sending and receiving messages. /// 2. Communicate with the cgroup manager, notifying it when upscale is received, -/// and sending a message to the informant when the cgroup manager requests +/// and sending a message to the agent when the cgroup manager requests /// upscale. #[derive(Debug)] pub struct Dispatcher { - /// We read informant messages of of `source` + /// We read agent messages off of `source` pub(crate) source: SplitStream, - /// We send messages to the informant through `sink` + /// We send messages to the agent through `sink` sink: SplitSink, /// Used to notify the cgroup when we are upscaled. @@ -43,7 +43,7 @@ pub struct Dispatcher { /// we send an `UpscaleRequst` to the agent. pub(crate) request_upscale_events: mpsc::Receiver<()>, - /// The protocol version we have agreed to use with the informant. This is negotiated + /// The protocol version we have agreed to use with the agent. This is negotiated /// during the creation of the dispatcher, and should be the highest shared protocol /// version. /// @@ -56,9 +56,9 @@ pub struct Dispatcher { impl Dispatcher { /// Creates a new dispatcher using the passed-in connection. /// - /// Performs a negotiation with the informant to determine the highest protocol + /// Performs a negotiation with the agent to determine the highest protocol /// version that both support. This consists of two steps: - /// 1. Wait for the informant to sent the range of protocols it supports. + /// 1. Wait for the agent to send the range of protocols it supports. /// 2. 
Send a protocol version that works for us as well, or an error if there /// is no compatible version. pub async fn new( @@ -69,7 +69,7 @@ impl Dispatcher { let (mut sink, mut source) = stream.split(); // Figure out the highest protocol version we both support - info!("waiting for informant to send protocol version range"); + info!("waiting for agent to send protocol version range"); let Some(message) = source.next().await else { bail!("websocket connection closed while performing protocol handshake") }; @@ -79,7 +79,7 @@ impl Dispatcher { let Message::Text(message_text) = message else { // All messages should be in text form, since we don't do any // pinging/ponging. See nhooyr/websocket's implementation and the - // informant/agent for more info + // agent for more info bail!("received non-text message during proocol handshake: {message:?}") }; @@ -88,32 +88,30 @@ impl Dispatcher { max: PROTOCOL_MAX_VERSION, }; - let informant_range: ProtocolRange = serde_json::from_str(&message_text) + let agent_range: ProtocolRange = serde_json::from_str(&message_text) .context("failed to deserialize protocol version range")?; - info!(range = ?informant_range, "received protocol version range"); + info!(range = ?agent_range, "received protocol version range"); - let highest_shared_version = match monitor_range.highest_shared_version(&informant_range) { + let highest_shared_version = match monitor_range.highest_shared_version(&agent_range) { Ok(version) => { sink.send(Message::Text( serde_json::to_string(&ProtocolResponse::Version(version)).unwrap(), )) .await - .context("failed to notify informant of negotiated protocol version")?; + .context("failed to notify agent of negotiated protocol version")?; version } Err(e) => { sink.send(Message::Text( serde_json::to_string(&ProtocolResponse::Error(format!( "Received protocol version range {} which does not overlap with {}", - informant_range, monitor_range + agent_range, monitor_range ))) .unwrap(), )) .await - .context( - "failed to 
notify informant of no overlap between protocol version ranges", - )?; + .context("failed to notify agent of no overlap between protocol version ranges")?; Err(e).context("error determining suitable protocol version range")? } }; @@ -137,7 +135,7 @@ impl Dispatcher { .context("failed to send resources and oneshot sender across channel") } - /// Send a message to the informant. + /// Send a message to the agent. /// /// Although this function is small, it has one major benefit: it is the only /// way to send data accross the connection, and you can only pass in a proper diff --git a/libs/vm_monitor/src/lib.rs b/libs/vm_monitor/src/lib.rs index 82d5d4b4d0..59a1435e1b 100644 --- a/libs/vm_monitor/src/lib.rs +++ b/libs/vm_monitor/src/lib.rs @@ -146,7 +146,7 @@ pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Res /// Handles incoming websocket connections. /// -/// If we are already to connected to an informant, we kill that old connection +/// If we are already connected to an agent, we kill that old connection /// and accept the new one. #[tracing::instrument(name = "/monitor", skip_all, fields(?args))] pub async fn ws_handler( @@ -196,7 +196,7 @@ async fn start_monitor( return; } }; - info!("connected to informant"); + info!("connected to agent"); match monitor.run().await { Ok(()) => info!("monitor was killed due to new connection"), diff --git a/libs/vm_monitor/src/protocol.rs b/libs/vm_monitor/src/protocol.rs index c6f1f0f718..5f07435503 100644 --- a/libs/vm_monitor/src/protocol.rs +++ b/libs/vm_monitor/src/protocol.rs @@ -1,13 +1,13 @@ -//! Types representing protocols and actual informant-monitor messages. +//! Types representing protocols and actual agent-monitor messages. //! //! The pervasive use of serde modifiers throughout this module is to ease //! serialization on the go side. Because go does not have enums (which model //! messages well), it is harder to model messages, and we accomodate that with //! serde. //! -//! 
*Note*: the informant sends and receives messages in different ways. +//! *Note*: the agent sends and receives messages in different ways. //! -//! The informant serializes messages in the form and then sends them. The use +//! The agent serializes messages in the form and then sends them. The use //! of `#[serde(tag = "type", content = "content")]` allows us to use `Type` //! to determine how to deserialize `Content`. //! ```ignore @@ -25,9 +25,9 @@ //! Id uint64 //! } //! ``` -//! After reading the type field, the informant will decode the entire message +//! After reading the type field, the agent will decode the entire message //! again, this time into the correct type using the embedded fields. -//! Because the informant cannot just extract the json contained in a certain field +//! Because the agent cannot just extract the json contained in a certain field //! (it initially deserializes to `map[string]interface{}`), we keep the fields //! at the top level, so the entire piece of json can be deserialized into a struct, //! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored. @@ -37,7 +37,7 @@ use std::cmp; use serde::{de::Error, Deserialize, Serialize}; -/// A Message we send to the informant. +/// A Message we send to the agent. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct OutboundMsg { #[serde(flatten)] @@ -51,31 +51,31 @@ impl OutboundMsg { } } -/// The different underlying message types we can send to the informant. +/// The different underlying message types we can send to the agent. #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(tag = "type")] pub enum OutboundMsgKind { - /// Indicates that the informant sent an invalid message, i.e, we couldn't + /// Indicates that the agent sent an invalid message, i.e, we couldn't /// properly deserialize it. InvalidMessage { error: String }, /// Indicates that we experienced an internal error while processing a message. 
/// For example, if a cgroup operation fails while trying to handle an upscale, /// we return `InternalError`. InternalError { error: String }, - /// Returned to the informant once we have finished handling an upscale. If the + /// Returned to the agent once we have finished handling an upscale. If the /// handling was unsuccessful, an `InternalError` will get returned instead. /// *Note*: this is a struct variant because of the way go serializes struct{} UpscaleConfirmation {}, /// Indicates to the monitor that we are urgently requesting resources. /// *Note*: this is a struct variant because of the way go serializes struct{} UpscaleRequest {}, - /// Returned to the informant once we have finished attempting to downscale. If + /// Returned to the agent once we have finished attempting to downscale. If /// an error occured trying to do so, an `InternalError` will get returned instead. /// However, if we are simply unsuccessful (for example, do to needing the resources), /// that gets included in the `DownscaleResult`. DownscaleResult { // FIXME for the future (once the informant is deprecated) - // As of the time of writing, the informant/agent version of this struct is + // As of the time of writing, the agent/informant version of this struct is // called api.DownscaleResult. This struct has uppercase fields which are // serialized as such. Thus, we serialize using uppercase names so we don't // have to make a breaking change to the agent<->informant protocol. Once @@ -88,12 +88,12 @@ pub enum OutboundMsgKind { status: String, }, /// Part of the bidirectional heartbeat. The heartbeat is initiated by the - /// informant. + /// agent. /// *Note*: this is a struct variant because of the way go serializes struct{} HealthCheck {}, } -/// A message received form the informant. +/// A message received from the agent. 
#[derive(Serialize, Deserialize, Debug, Clone)] pub struct InboundMsg { #[serde(flatten)] @@ -101,7 +101,7 @@ pub struct InboundMsg { pub(crate) id: usize, } -/// The different underlying message types we can receive from the informant. +/// The different underlying message types we can receive from the agent. #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(tag = "type", content = "content")] pub enum InboundMsgKind { @@ -120,14 +120,14 @@ pub enum InboundMsgKind { /// when done. DownscaleRequest { target: Resources }, /// Part of the bidirectional heartbeat. The heartbeat is initiated by the - /// informant. + /// agent. /// *Note*: this is a struct variant because of the way go serializes struct{} HealthCheck {}, } /// Represents the resources granted to a VM. #[derive(Serialize, Deserialize, Debug, Clone, Copy)] -// Renamed because the agent/informant has multiple resources types: +// Renamed because the agent has multiple resources types: // `Resources` (milliCPU/memory slots) // `Allocation` (vCPU/bytes) <- what we correspond to #[serde(rename(serialize = "Allocation", deserialize = "Allocation"))] @@ -151,7 +151,7 @@ pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0; pub struct ProtocolVersion(u8); impl ProtocolVersion { - /// Represents v1.0 of the informant<-> monitor protocol - the initial version + /// Represents v1.0 of the agent<-> monitor protocol - the initial version /// /// Currently the latest version. const V1_0: ProtocolVersion = ProtocolVersion(1); diff --git a/libs/vm_monitor/src/runner.rs b/libs/vm_monitor/src/runner.rs index cf871707f0..356f941b5a 100644 --- a/libs/vm_monitor/src/runner.rs +++ b/libs/vm_monitor/src/runner.rs @@ -1,4 +1,4 @@ -//! Exposes the `Runner`, which handles messages received from informant and +//! Exposes the `Runner`, which handles messages received from agent and //! sends upscale requests. //! //! 
This is the "Monitor" part of the monitor binary and is the main entrypoint for @@ -21,8 +21,8 @@ use crate::filecache::{FileCacheConfig, FileCacheState}; use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources}; use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB}; -/// Central struct that interacts with informant, dispatcher, and cgroup to handle -/// signals from the informant. +/// Central struct that interacts with agent, dispatcher, and cgroup to handle +/// signals from the agent. #[derive(Debug)] pub struct Runner { config: Config, @@ -371,7 +371,7 @@ impl Runner { Ok(None) } InboundMsgKind::InternalError { error } => { - warn!(error, id, "informant experienced an internal error"); + warn!(error, id, "agent experienced an internal error"); Ok(None) } InboundMsgKind::HealthCheck {} => { @@ -405,7 +405,7 @@ impl Runner { .await .context("failed to send message")?; } - // there is a message from the informant + // there is a message from the agent msg = self.dispatcher.source.next() => { if let Some(msg) = msg { // Don't use 'message' as a key as the string also uses @@ -422,7 +422,7 @@ impl Runner { // Don't use 'message' as a key as the // string also uses that for its key msg = ?other, - "informant should only send text messages but received different type" + "agent should only send text messages but received different type" ); continue },