//! Exposes the `Runner`, which handles messages received from the agent and
//! sends upscale requests.
//!
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
//! all functionality.
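//!
//! A rough usage sketch (hypothetical surrounding code: `ws`, `args`, `kill`, and `token` are
//! assumed to come from the websocket server that accepts agent connections):
//!
//! ```ignore
//! let mut runner = Runner::new(Config::default(), &args, ws, kill, token).await?;
//! runner.run().await?;
//! ```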

use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

use crate::cgroup::{CgroupWatcher, Sequenced};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};

/// Central struct that interacts with the agent, dispatcher, and cgroup to handle
/// signals from the agent.
#[derive(Debug)]
pub struct Runner {
    config: Config,
    filecache: Option<FileCacheState>,
    cgroup: Option<Arc<CgroupWatcher>>,
    dispatcher: Dispatcher,

/// We "mint" new message ids by incrementing this counter and taking the value.
|
|
///
|
|
/// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated
|
|
/// by us vs the autoscaler-agent.
|
|
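    ///
    /// A minimal sketch of the minting pattern, as used in `run` below:
    ///
    /// ```ignore
    /// self.counter += 2; // increment by 2 to preserve oddness
    /// let id = self.counter;
    /// ```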
    counter: usize,

    last_upscale_request_at: Option<Instant>,

    /// A signal to kill the main thread produced by `self.run()`. This is triggered
    /// when the server receives a new connection. When the thread receives the
    /// signal off this channel, it will gracefully shut down.
    kill: broadcast::Receiver<()>,
}

/// Configuration for a `Runner`
#[derive(Debug)]
pub struct Config {
    /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses
    /// before handing out the rest to userspace. This value is the estimated difference between
    /// the *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`.
    ///
    /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM
    /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)".
    ///
    /// We only use `sys_buffer_bytes` when calculating the system memory from the *external*
    /// memory size, rather than the memory size self-reported by the kernel.
    ///
    /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
    /// upscale resource amounts (because we might not *actually* have been upscaled yet). This
    /// field should be removed once we have a better solution there.
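    ///
    /// For example, with the default value of 100 MiB: if the agent reports 4096 MiB of memory,
    /// we treat `4096 MiB - 100 MiB = 3996 MiB` as usable, computed as
    /// `mem.saturating_sub(sys_buffer_bytes)` (see `try_downscale`/`handle_upscale` below).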
    sys_buffer_bytes: u64,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            sys_buffer_bytes: 100 * MiB,
        }
    }
}

impl Runner {
    /// Create a new monitor.
    #[tracing::instrument(skip_all, fields(?config, ?args))]
    pub async fn new(
        config: Config,
        args: &Args,
        ws: WebSocket,
        kill: broadcast::Receiver<()>,
        token: CancellationToken,
    ) -> anyhow::Result<Runner> {
        anyhow::ensure!(
            config.sys_buffer_bytes != 0,
            "invalid monitor Config: sys_buffer_bytes cannot be 0"
        );

        // *NOTE*: the dispatcher and cgroup manager talk through these channels,
        // so make sure they each get the correct half and nothing is dropped.
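        //
        // Concretely (see `Dispatcher::new` and the `CgroupWatcher` setup below):
        // `notified_send`/`notified_recv` carry upscale notifications from the dispatcher
        // to `CgroupWatcher::watch`, and `requesting_send`/`requesting_recv` carry upscale
        // requests from the `CgroupWatcher` back to the dispatcher.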
        let (notified_send, notified_recv) = mpsc::channel(1);
        let (requesting_send, requesting_recv) = mpsc::channel(1);

        let dispatcher = Dispatcher::new(ws, notified_send, requesting_recv)
            .await
            .context("error creating new dispatcher")?;

        let mut state = Runner {
            config,
            filecache: None,
            cgroup: None,
            dispatcher,
            counter: 1, // NB: must be odd, see the comment about the field for more.
            last_upscale_request_at: None,
            kill,
        };

        // If we have both the cgroup and file cache integrations enabled, it's possible for
        // temporary failures to result in cgroup throttling (from memory.high), which in turn
        // makes it near-impossible to connect to the file cache (because it times out).
        // Unfortunately, we *do* still want to determine the file cache size before setting the
        // cgroup's memory.high, so it's not as simple as just swapping the order.
        //
        // Instead, the resolution here is that on vm-monitor startup (note: this happens on each
        // connection from the autoscaler-agent, possibly multiple times per compute_ctl
        // lifecycle), we temporarily unset memory.high, to allow any existing throttling to
        // dissipate. It's a bit of a hacky solution, but it helps with reliability.
        if let Some(name) = &args.cgroup {
            // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
            // now, and then set limits later.
            info!("initializing cgroup");

            let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send)
                .context("failed to create cgroup manager")?;

            info!("temporarily unsetting memory.high");

            // Temporarily un-set cgroup memory.high; see above.
            cgroup
                .unset_memory_high()
                .context("failed to unset memory.high")?;

            let cgroup = Arc::new(cgroup);

            let cgroup_clone = Arc::clone(&cgroup);
            spawn_with_cancel(
                token.clone(),
                |_| error!("cgroup watcher terminated"),
                async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await },
            );

            state.cgroup = Some(cgroup);
        }

        let mut file_cache_reserved_bytes = 0;
        let mem = get_total_system_memory();

        // We need to process file cache initialization before cgroup initialization, so that the
        // memory allocated to the file cache is appropriately taken into account when we decide
        // the cgroup's memory limits.
        if let Some(connstr) = &args.pgconnstr {
            info!("initializing file cache");
            let config = match args.file_cache_on_disk {
                true => FileCacheConfig::default_on_disk(),
                false => FileCacheConfig::default_in_memory(),
            };

            let mut file_cache = FileCacheState::new(connstr, config, token)
                .await
                .context("failed to create file cache")?;

            let size = file_cache
                .get_file_cache_size()
                .await
                .context("error getting file cache size")?;

            let new_size = file_cache.config.calculate_cache_size(mem);
            info!(
                initial = bytes_to_mebibytes(size),
                new = bytes_to_mebibytes(new_size),
                "setting initial file cache size",
            );

            // Note: even if size == new_size, we want to explicitly set it, just
            // to make sure that we have the permissions to do so.
            let actual_size = file_cache
                .set_file_cache_size(new_size)
                .await
                .context("failed to set file cache size, possibly due to inadequate permissions")?;
            if actual_size != new_size {
                info!("file cache size actually got set to {actual_size}")
            }
            // Mark the resources given to the file cache as reserved, but only if it's in memory.
            if !args.file_cache_on_disk {
                file_cache_reserved_bytes = actual_size;
            }

            state.filecache = Some(file_cache);
        }

        if let Some(cgroup) = &state.cgroup {
            let available = mem - file_cache_reserved_bytes;
            let value = cgroup.config.calculate_memory_high_value(available);

            info!(value, "setting memory.high");

            cgroup
                .set_memory_high_bytes(value)
                .context("failed to set cgroup memory.high")?;
        }

        Ok(state)
    }

    /// Attempt to downscale filecache + cgroup
    #[tracing::instrument(skip_all, fields(?target))]
    pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
        // Nothing to adjust
        if self.cgroup.is_none() && self.filecache.is_none() {
            info!("no action needed for downscale (no cgroup or file cache enabled)");
            return Ok((
                true,
                "monitor is not managing cgroup or file cache".to_string(),
            ));
        }

        let requested_mem = target.mem;
        let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
        let expected_file_cache_mem_usage = self
            .filecache
            .as_ref()
            .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
            .unwrap_or(0);
        let mut new_cgroup_mem_high = 0;
        if let Some(cgroup) = &self.cgroup {
            new_cgroup_mem_high = cgroup
                .config
                .calculate_memory_high_value(usable_system_memory - expected_file_cache_mem_usage);

            let current = cgroup
                .current_memory_usage()
                .context("failed to fetch cgroup memory")?;

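            // Sanity check: refuse the downscale if the new memory.high would land below what
            // the cgroup is already using, plus a buffer. (Illustrative numbers: a proposed
            // high of 512 MiB against 480 MiB of current usage and a 64 MiB buffer is refused,
            // since 512 < 480 + 64.)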
            if new_cgroup_mem_high < current + cgroup.config.memory_high_buffer_bytes {
                let status = format!(
                    "{}: {} MiB (new high) < {} MiB (current usage) + {} MiB (buffer)",
                    "calculated memory.high too low",
                    bytes_to_mebibytes(new_cgroup_mem_high),
                    bytes_to_mebibytes(current),
                    bytes_to_mebibytes(cgroup.config.memory_high_buffer_bytes)
                );

                info!(status, "discontinuing downscale");

                return Ok((false, status));
            }
        }

        // The downscaling has been approved. Downscale the file cache, then the cgroup.
        let mut status = vec![];
        let mut file_cache_mem_usage = 0;
        if let Some(file_cache) = &mut self.filecache {
            let actual_usage = file_cache
                .set_file_cache_size(expected_file_cache_mem_usage)
                .await
                .context("failed to set file cache size")?;
            if file_cache.config.in_memory {
                file_cache_mem_usage = actual_usage;
            }
            let message = format!(
                "set file cache size to {} MiB (in memory = {})",
                bytes_to_mebibytes(actual_usage),
                file_cache.config.in_memory,
            );
            info!("downscale: {message}");
            status.push(message);
        }

        if let Some(cgroup) = &self.cgroup {
            let available_memory = usable_system_memory - file_cache_mem_usage;

            if file_cache_mem_usage != expected_file_cache_mem_usage {
                new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
            }

            // `new_cgroup_mem_high` is initialized to 0, but it is guaranteed not to be 0 here,
            // since it was properly set in the previous `if let Some(cgroup)` block above.
            cgroup
                .set_memory_high_bytes(new_cgroup_mem_high)
                .context("failed to set cgroup memory.high")?;

            let message = format!(
                "set cgroup memory.high to {} MiB, of new max {} MiB",
                bytes_to_mebibytes(new_cgroup_mem_high),
                bytes_to_mebibytes(available_memory)
            );
            info!("downscale: {message}");
            status.push(message);
        }

        // TODO: make this status thing less jank
        let status = status.join("; ");
        Ok((true, status))
    }

    /// Handle new resources
    #[tracing::instrument(skip_all, fields(?resources))]
    pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
        if self.filecache.is_none() && self.cgroup.is_none() {
            info!("no action needed for upscale (no cgroup or file cache enabled)");
            return Ok(());
        }

        let new_mem = resources.mem;
        let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);

        // Get the file cache's expected contribution to the memory usage
        let mut file_cache_mem_usage = 0;
        if let Some(file_cache) = &mut self.filecache {
            let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
            info!(
                target = bytes_to_mebibytes(expected_usage),
                total = bytes_to_mebibytes(new_mem),
                "updating file cache size",
            );

            let actual_usage = file_cache
                .set_file_cache_size(expected_usage)
                .await
                .context("failed to set file cache size")?;
            if file_cache.config.in_memory {
                file_cache_mem_usage = actual_usage;
            }

            if actual_usage != expected_usage {
                warn!(
                    "file cache was set to a different size than we wanted: target = {} MiB, actual = {} MiB",
                    bytes_to_mebibytes(expected_usage),
                    bytes_to_mebibytes(actual_usage)
                )
            }
        }

        if let Some(cgroup) = &self.cgroup {
            let available_memory = usable_system_memory - file_cache_mem_usage;
            let new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
            info!(
                target = bytes_to_mebibytes(new_cgroup_mem_high),
                total = bytes_to_mebibytes(new_mem),
                name = cgroup.path(),
                "updating cgroup memory.high",
            );
            cgroup
                .set_memory_high_bytes(new_cgroup_mem_high)
                .context("failed to set cgroup memory.high")?;
        }

        Ok(())
    }

    /// Take in a message and perform some action, such as downscaling or upscaling,
    /// and return a message to be sent back.
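    ///
    /// For example, a health check round-trips like this (a sketch; `id` is whatever the
    /// agent sent):
    ///
    /// ```ignore
    /// let msg = InboundMsg { inner: InboundMsgKind::HealthCheck {}, id };
    /// let reply = runner.process_message(msg).await?;
    /// // reply is Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id))
    /// ```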
    #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
    pub async fn process_message(
        &mut self,
        InboundMsg { inner, id }: InboundMsg,
    ) -> anyhow::Result<Option<OutboundMsg>> {
        match inner {
            InboundMsgKind::UpscaleNotification { granted } => {
                self.handle_upscale(granted)
                    .await
                    .context("failed to handle upscale")?;
                self.dispatcher
                    .notify_upscale(Sequenced::new(granted))
                    .await
                    .context("failed to notify cgroup of upscale")?;
                Ok(Some(OutboundMsg::new(
                    OutboundMsgKind::UpscaleConfirmation {},
                    id,
                )))
            }
            InboundMsgKind::DownscaleRequest { target } => self
                .try_downscale(target)
                .await
                .context("failed to downscale")
                .map(|(ok, status)| {
                    Some(OutboundMsg::new(
                        OutboundMsgKind::DownscaleResult { ok, status },
                        id,
                    ))
                }),
            InboundMsgKind::InvalidMessage { error } => {
                warn!(
                    %error, id, "received notification of an invalid message we sent"
                );
                Ok(None)
            }
            InboundMsgKind::InternalError { error } => {
                warn!(error, id, "agent experienced an internal error");
                Ok(None)
            }
            InboundMsgKind::HealthCheck {} => {
                Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
            }
        }
    }

    // TODO: don't propagate errors, probably just warn!?
    #[tracing::instrument(skip_all)]
    pub async fn run(&mut self) -> anyhow::Result<()> {
        info!("starting dispatcher");
        loop {
            tokio::select! {
                signal = self.kill.recv() => {
                    match signal {
                        Ok(()) => return Ok(()),
                        Err(e) => bail!("failed to receive kill signal: {e}")
                    }
                }
                // we need to propagate an upscale request
                request = self.dispatcher.request_upscale_events.recv(), if self.cgroup.is_some() => {
                    if request.is_none() {
                        bail!("failed to listen for upscale event from cgroup")
                    }

                    // If it's been less than 1 second since the last time we requested upscaling,
                    // ignore the event, to avoid spamming the agent (otherwise, this can happen
                    // ~1k times per second).
                    if let Some(t) = self.last_upscale_request_at {
                        let elapsed = t.elapsed();
                        if elapsed < Duration::from_secs(1) {
                            info!(elapsed_millis = elapsed.as_millis(), "cgroup asked for upscale but it's too soon to forward the request, ignoring");
                            continue;
                        }
                    }

                    self.last_upscale_request_at = Some(Instant::now());

                    info!("cgroup asking for upscale; forwarding request");
                    self.counter += 2; // Increment, preserving parity (i.e. keep the
                                       // counter odd). See the field comment for more.
                    self.dispatcher
                        .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
                        .await
                        .context("failed to send message")?;
                }
                // there is a message from the agent
                msg = self.dispatcher.source.next() => {
                    if let Some(msg) = msg {
                        // Don't use 'message' as a key, as the string also uses
                        // that for its key
                        info!(?msg, "received message");
                        match msg {
                            Ok(msg) => {
                                let message: InboundMsg = match msg {
                                    Message::Text(text) => {
                                        serde_json::from_str(&text).context("failed to deserialize text message")?
                                    }
                                    other => {
                                        warn!(
                                            // Don't use 'message' as a key, as the
                                            // string also uses that for its key
                                            msg = ?other,
                                            "agent should only send text messages but received a different type"
                                        );
                                        continue
                                    },
                                };

                                let out = match self.process_message(message.clone()).await {
                                    Ok(Some(out)) => out,
                                    Ok(None) => continue,
                                    Err(e) => {
                                        let error = e.to_string();
                                        warn!(?error, "error handling message");
                                        OutboundMsg::new(
                                            OutboundMsgKind::InternalError {
                                                error
                                            },
                                            message.id
                                        )
                                    }
                                };

                                self.dispatcher
                                    .send(out)
                                    .await
                                    .context("failed to send message")?;
                            }
                            Err(e) => warn!("{e}"),
                        }
                    } else {
                        anyhow::bail!("dispatcher connection closed")
                    }
                }
            }
        }
    }
}