vm-monitor: Switch from memory.high to polling memory.stat (#5524)

tl;dr it's really hard to avoid throttling from memory.high, and it
counts tmpfs & page cache usage, so it's also hard to make sense of.

In the interest of fixing things quickly with something that should be
*good enough*, this PR switches to instead periodically fetch memory
statistics from the cgroup's memory.stat and use that data to determine
if and when we should upscale.

This PR fixes #5444, which has a lot more detail on the difficulties
we've hit with memory.high. This PR also supersedes #5488.
This commit is contained in:
Em Sharnoff
2023-10-17 15:30:40 -07:00
committed by GitHub
parent 543b8153c6
commit 9fe5cc6a82
4 changed files with 366 additions and 738 deletions

View File

@@ -27,8 +27,8 @@ and old one if it exists.
* the filecache: a struct that allows communication with the Postgres file cache. * the filecache: a struct that allows communication with the Postgres file cache.
On startup, we connect to the filecache and hold on to the connection for the On startup, we connect to the filecache and hold on to the connection for the
entire monitor lifetime. entire monitor lifetime.
* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by * the cgroup watcher: the `CgroupWatcher` polls the `neon-postgres` cgroup's memory
listening for `memory.high` events and setting its `memory.{high,max}` values. usage and sends rolling aggregates to the runner.
* the runner: the runner marries the filecache and cgroup watcher together, * the runner: the runner marries the filecache and cgroup watcher together,
communicating with the agent throught the `Dispatcher`, and then calling filecache communicating with the agent throught the `Dispatcher`, and then calling filecache
and cgroup watcher functions as needed to upscale and downscale and cgroup watcher functions as needed to upscale and downscale

View File

@@ -1,161 +1,38 @@
use std::{ use std::fmt::{self, Debug, Formatter};
fmt::{Debug, Display}, use std::time::{Duration, Instant};
fs,
pin::pin,
sync::atomic::{AtomicU64, Ordering},
};
use anyhow::{anyhow, bail, Context}; use anyhow::{anyhow, Context};
use cgroups_rs::{ use cgroups_rs::{
freezer::FreezerController, hierarchies::{self, is_cgroup2_unified_mode},
hierarchies::{self, is_cgroup2_unified_mode, UNIFIED_MOUNTPOINT},
memory::MemController, memory::MemController,
MaxValue, Subsystem,
Subsystem::{Freezer, Mem},
}; };
use inotify::{EventStream, Inotify, WatchMask}; use tokio::sync::watch;
use tokio::sync::mpsc::{self, error::TryRecvError};
use tokio::time::{Duration, Instant};
use tokio_stream::{Stream, StreamExt};
use tracing::{info, warn}; use tracing::{info, warn};
use crate::protocol::Resources;
use crate::MiB;
/// Monotonically increasing counter of the number of memory.high events
/// the cgroup has experienced.
///
/// We use this to determine if a modification to the `memory.events` file actually
/// changed the `high` field. If not, we don't care about the change. When we
/// read the file, we check the `high` field in the file against `MEMORY_EVENT_COUNT`
/// to see if it changed since last time.
pub static MEMORY_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
/// Monotonically increasing counter that gives each cgroup event a unique id.
///
/// This allows us to answer questions like "did this upscale arrive before this
/// memory.high?". This static is also used by the `Sequenced` type to "tag" values
/// with a sequence number. As such, prefer to used the `Sequenced` type rather
/// than this static directly.
static EVENT_SEQUENCE_NUMBER: AtomicU64 = AtomicU64::new(0);
/// A memory event type reported in memory.events.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum MemoryEvent {
Low,
High,
Max,
Oom,
OomKill,
OomGroupKill,
}
impl MemoryEvent {
fn as_str(&self) -> &str {
match self {
MemoryEvent::Low => "low",
MemoryEvent::High => "high",
MemoryEvent::Max => "max",
MemoryEvent::Oom => "oom",
MemoryEvent::OomKill => "oom_kill",
MemoryEvent::OomGroupKill => "oom_group_kill",
}
}
}
impl Display for MemoryEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
/// Configuration for a `CgroupWatcher` /// Configuration for a `CgroupWatcher`
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Config { pub struct Config {
// The target difference between the total memory reserved for the cgroup /// Interval at which we should be fetching memory statistics
// and the value of the cgroup's memory.high. memory_poll_interval: Duration,
//
// In other words, memory.high + oom_buffer_bytes will equal the total memory that the cgroup may
// use (equal to system memory, minus whatever's taken out for the file cache).
oom_buffer_bytes: u64,
// The amount of memory, in bytes, below a proposed new value for /// The number of samples used in constructing aggregated memory statistics
// memory.high that the cgroup's memory usage must be for us to downscale memory_history_len: usize,
// /// The number of most recent samples that will be periodically logged.
// In other words, we can downscale only when: ///
// /// Each sample is logged exactly once. Increasing this value means that recent samples will be
// memory.current + memory_high_buffer_bytes < (proposed) memory.high /// logged less frequently, and vice versa.
// ///
// TODO: there's some minor issues with this approach -- in particular, that we might have /// For simplicity, this value must be greater than or equal to `memory_history_len`.
// memory in use by the kernel's page cache that we're actually ok with getting rid of. memory_history_log_interval: usize,
pub(crate) memory_high_buffer_bytes: u64,
// The maximum duration, in milliseconds, that we're allowed to pause
// the cgroup for while waiting for the autoscaler-agent to upscale us
max_upscale_wait: Duration,
// The required minimum time, in milliseconds, that we must wait before re-freezing
// the cgroup while waiting for the autoscaler-agent to upscale us.
do_not_freeze_more_often_than: Duration,
// The amount of memory, in bytes, that we should periodically increase memory.high
// by while waiting for the autoscaler-agent to upscale us.
//
// This exists to avoid the excessive throttling that happens when a cgroup is above its
// memory.high for too long. See more here:
// https://github.com/neondatabase/autoscaling/issues/44#issuecomment-1522487217
memory_high_increase_by_bytes: u64,
// The period, in milliseconds, at which we should repeatedly increase the value
// of the cgroup's memory.high while we're waiting on upscaling and memory.high
// is still being hit.
//
// Technically speaking, this actually serves as a rate limit to moderate responding to
// memory.high events, but these are roughly equivalent if the process is still allocating
// memory.
memory_high_increase_every: Duration,
}
impl Config {
/// Calculate the new value for the cgroups memory.high based on system memory
pub fn calculate_memory_high_value(&self, total_system_mem: u64) -> u64 {
total_system_mem.saturating_sub(self.oom_buffer_bytes)
}
} }
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Self { Self {
oom_buffer_bytes: 100 * MiB, memory_poll_interval: Duration::from_millis(100),
memory_high_buffer_bytes: 100 * MiB, memory_history_len: 5, // use 500ms of history for decision-making
// while waiting for upscale, don't freeze for more than 20ms every 1s memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
max_upscale_wait: Duration::from_millis(20),
do_not_freeze_more_often_than: Duration::from_millis(1000),
// while waiting for upscale, increase memory.high by 10MiB every 25ms
memory_high_increase_by_bytes: 10 * MiB,
memory_high_increase_every: Duration::from_millis(25),
}
}
}
/// Used to represent data that is associated with a certain point in time, such
/// as an upscale request or memory.high event.
///
/// Internally, creating a `Sequenced` uses a static atomic counter to obtain
/// a unique sequence number. Sequence numbers are monotonically increasing,
/// allowing us to answer questions like "did this upscale happen after this
/// memory.high event?" by comparing the sequence numbers of the two events.
#[derive(Debug, Clone)]
pub struct Sequenced<T> {
seqnum: u64,
data: T,
}
impl<T> Sequenced<T> {
pub fn new(data: T) -> Self {
Self {
seqnum: EVENT_SEQUENCE_NUMBER.fetch_add(1, Ordering::AcqRel),
data,
} }
} }
} }
@@ -170,74 +47,14 @@ impl<T> Sequenced<T> {
pub struct CgroupWatcher { pub struct CgroupWatcher {
pub config: Config, pub config: Config,
/// The sequence number of the last upscale.
///
/// If we receive a memory.high event that has a _lower_ sequence number than
/// `last_upscale_seqnum`, then we know it occured before the upscale, and we
/// can safely ignore it.
///
/// Note: Like the `events` field, this doesn't _need_ interior mutability but we
/// use it anyways so that methods take `&self`, not `&mut self`.
last_upscale_seqnum: AtomicU64,
/// A channel on which we send messages to request upscale from the dispatcher.
upscale_requester: mpsc::Sender<()>,
/// The actual cgroup we are watching and managing. /// The actual cgroup we are watching and managing.
cgroup: cgroups_rs::Cgroup, cgroup: cgroups_rs::Cgroup,
} }
/// Read memory.events for the desired event type.
///
/// `path` specifies the path to the desired `memory.events` file.
/// For more info, see the `memory.events` section of the [kernel docs]
/// <https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files>
fn get_event_count(path: &str, event: MemoryEvent) -> anyhow::Result<u64> {
let contents = fs::read_to_string(path)
.with_context(|| format!("failed to read memory.events from {path}"))?;
// Then contents of the file look like:
// low 42
// high 101
// ...
contents
.lines()
.filter_map(|s| s.split_once(' '))
.find(|(e, _)| *e == event.as_str())
.ok_or_else(|| anyhow!("failed to find entry for memory.{event} events in {path}"))
.and_then(|(_, count)| {
count
.parse::<u64>()
.with_context(|| format!("failed to parse memory.{event} as u64"))
})
}
/// Create an event stream that produces events whenever the file at the provided
/// path is modified.
fn create_file_watcher(path: &str) -> anyhow::Result<EventStream<[u8; 1024]>> {
info!("creating file watcher for {path}");
let inotify = Inotify::init().context("failed to initialize file watcher")?;
inotify
.watches()
.add(path, WatchMask::MODIFY)
.with_context(|| format!("failed to start watching {path}"))?;
inotify
// The inotify docs use [0u8; 1024] so we'll just copy them. We only need
// to store one event at a time - if the event gets written over, that's
// ok. We still see that there is an event. For more information, see:
// https://man7.org/linux/man-pages/man7/inotify.7.html
.into_event_stream([0u8; 1024])
.context("failed to start inotify event stream")
}
impl CgroupWatcher { impl CgroupWatcher {
/// Create a new `CgroupWatcher`. /// Create a new `CgroupWatcher`.
#[tracing::instrument(skip_all, fields(%name))] #[tracing::instrument(skip_all, fields(%name))]
pub fn new( pub fn new(name: String) -> anyhow::Result<Self> {
name: String,
// A channel on which to send upscale requests
upscale_requester: mpsc::Sender<()>,
) -> anyhow::Result<(Self, impl Stream<Item = Sequenced<u64>>)> {
// TODO: clarify exactly why we need v2 // TODO: clarify exactly why we need v2
// Make sure cgroups v2 (aka unified) are supported // Make sure cgroups v2 (aka unified) are supported
if !is_cgroup2_unified_mode() { if !is_cgroup2_unified_mode() {
@@ -245,410 +62,203 @@ impl CgroupWatcher {
} }
let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name); let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
// Start monitoring the cgroup for memory events. In general, for Ok(Self {
// cgroups v2 (aka unified), metrics are reported in files like cgroup,
// > `/sys/fs/cgroup/{name}/{metric}` config: Default::default(),
// We are looking for `memory.high` events, which are stored in the })
// file `memory.events`. For more info, see the `memory.events` section
// of https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files
let path = format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name);
let memory_events = create_file_watcher(&path)
.with_context(|| format!("failed to create event watcher for {path}"))?
// This would be nice with with .inspect_err followed by .ok
.filter_map(move |_| match get_event_count(&path, MemoryEvent::High) {
Ok(high) => Some(high),
Err(error) => {
// TODO: Might want to just panic here
warn!(?error, "failed to read high events count from {}", &path);
None
}
})
// Only report the event if the memory.high count increased
.filter_map(|high| {
if MEMORY_EVENT_COUNT.fetch_max(high, Ordering::AcqRel) < high {
Some(high)
} else {
None
}
})
.map(Sequenced::new);
let initial_count = get_event_count(
&format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name),
MemoryEvent::High,
)?;
info!(initial_count, "initial memory.high event count");
// Hard update `MEMORY_EVENT_COUNT` since there could have been processes
// running in the cgroup before that caused it to be non-zero.
MEMORY_EVENT_COUNT.fetch_max(initial_count, Ordering::AcqRel);
Ok((
Self {
cgroup,
upscale_requester,
last_upscale_seqnum: AtomicU64::new(0),
config: Default::default(),
},
memory_events,
))
} }
/// The entrypoint for the `CgroupWatcher`. /// The entrypoint for the `CgroupWatcher`.
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn watch<E>( pub async fn watch(
&self, &self,
// These are ~dependency injected~ (fancy, I know) because this function updates: watch::Sender<(Instant, MemoryHistory)>,
// should never return.
// -> therefore: when we tokio::spawn it, we don't await the JoinHandle.
// -> therefore: if we want to stick it in an Arc so many threads can access
// it, methods can never take mutable access.
// - note: we use the Arc strategy so that a) we can call this function
// right here and b) the runner can call the set/get_memory methods
// -> since calling recv() on a tokio::sync::mpsc::Receiver takes &mut self,
// we just pass them in here instead of holding them in fields, as that
// would require this method to take &mut self.
mut upscales: mpsc::Receiver<Sequenced<Resources>>,
events: E,
) -> anyhow::Result<()>
where
E: Stream<Item = Sequenced<u64>>,
{
let mut wait_to_freeze = pin!(tokio::time::sleep(Duration::ZERO));
let mut last_memory_high_increase_at: Option<Instant> = None;
let mut events = pin!(events);
// Are we waiting to be upscaled? Could be true if we request upscale due
// to a memory.high event and it does not arrive in time.
let mut waiting_on_upscale = false;
loop {
tokio::select! {
upscale = upscales.recv() => {
let Sequenced { seqnum, data } = upscale
.context("failed to listen on upscale notification channel")?;
waiting_on_upscale = false;
last_memory_high_increase_at = None;
self.last_upscale_seqnum.store(seqnum, Ordering::Release);
info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
}
event = events.next() => {
let Some(Sequenced { seqnum, .. }) = event else {
bail!("failed to listen for memory.high events")
};
// The memory.high came before our last upscale, so we consider
// it resolved
if self.last_upscale_seqnum.fetch_max(seqnum, Ordering::AcqRel) > seqnum {
info!(
"received memory.high event, but it came before our last upscale -> ignoring it"
);
continue;
}
// The memory.high came after our latest upscale. We don't
// want to do anything yet, so peek the next event in hopes
// that it's an upscale.
if let Some(upscale_num) = self
.upscaled(&mut upscales)
.context("failed to check if we were upscaled")?
{
if upscale_num > seqnum {
info!(
"received memory.high event, but it came before our last upscale -> ignoring it"
);
continue;
}
}
// If it's been long enough since we last froze, freeze the
// cgroup and request upscale
if wait_to_freeze.is_elapsed() {
info!("received memory.high event -> requesting upscale");
waiting_on_upscale = self
.handle_memory_high_event(&mut upscales)
.await
.context("failed to handle upscale")?;
wait_to_freeze
.as_mut()
.reset(Instant::now() + self.config.do_not_freeze_more_often_than);
continue;
}
// Ok, we can't freeze, just request upscale
if !waiting_on_upscale {
info!("received memory.high event, but too soon to refreeze -> requesting upscale");
// Make check to make sure we haven't been upscaled in the
// meantine (can happen if the agent independently decides
// to upscale us again)
if self
.upscaled(&mut upscales)
.context("failed to check if we were upscaled")?
.is_some()
{
info!("no need to request upscaling because we got upscaled");
continue;
}
self.upscale_requester
.send(())
.await
.context("failed to request upscale")?;
waiting_on_upscale = true;
continue;
}
// Shoot, we can't freeze or and we're still waiting on upscale,
// increase memory.high to reduce throttling
let can_increase_memory_high = match last_memory_high_increase_at {
None => true,
Some(t) => t.elapsed() > self.config.memory_high_increase_every,
};
if can_increase_memory_high {
info!(
"received memory.high event, \
but too soon to refreeze and already requested upscale \
-> increasing memory.high"
);
// Make check to make sure we haven't been upscaled in the
// meantine (can happen if the agent independently decides
// to upscale us again)
if self
.upscaled(&mut upscales)
.context("failed to check if we were upscaled")?
.is_some()
{
info!("no need to increase memory.high because got upscaled");
continue;
}
// Request upscale anyways (the agent will handle deduplicating
// requests)
self.upscale_requester
.send(())
.await
.context("failed to request upscale")?;
let memory_high =
self.get_memory_high_bytes().context("failed to get memory.high")?;
let new_high = memory_high + self.config.memory_high_increase_by_bytes;
info!(
current_high_bytes = memory_high,
new_high_bytes = new_high,
"updating memory.high"
);
self.set_memory_high_bytes(new_high)
.context("failed to set memory.high")?;
last_memory_high_increase_at = Some(Instant::now());
continue;
}
info!("received memory.high event, but can't do anything");
}
};
}
}
/// Handle a `memory.high`, returning whether we are still waiting on upscale
/// by the time the function returns.
///
/// The general plan for handling a `memory.high` event is as follows:
/// 1. Freeze the cgroup
/// 2. Start a timer for `self.config.max_upscale_wait`
/// 3. Request upscale
/// 4. After the timer elapses or we receive upscale, thaw the cgroup.
/// 5. Return whether or not we are still waiting for upscale. If we are,
/// we'll increase the cgroups memory.high to avoid getting oom killed
#[tracing::instrument(skip_all)]
async fn handle_memory_high_event(
&self,
upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
) -> anyhow::Result<bool> {
// Immediately freeze the cgroup before doing anything else.
info!("received memory.high event -> freezing cgroup");
self.freeze().context("failed to freeze cgroup")?;
// We'll use this for logging durations
let start_time = Instant::now();
// Await the upscale until we have to unfreeze
let timed =
tokio::time::timeout(self.config.max_upscale_wait, self.await_upscale(upscales));
// Request the upscale
info!(
wait = ?self.config.max_upscale_wait,
"sending request for immediate upscaling",
);
self.upscale_requester
.send(())
.await
.context("failed to request upscale")?;
let waiting_on_upscale = match timed.await {
Ok(Ok(())) => {
info!(elapsed = ?start_time.elapsed(), "received upscale in time");
false
}
// **important**: unfreeze the cgroup before ?-reporting the error
Ok(Err(e)) => {
info!("error waiting for upscale -> thawing cgroup");
self.thaw()
.context("failed to thaw cgroup after errored waiting for upscale")?;
Err(e.context("failed to await upscale"))?
}
Err(_) => {
info!(elapsed = ?self.config.max_upscale_wait, "timed out waiting for upscale");
true
}
};
info!("thawing cgroup");
self.thaw().context("failed to thaw cgroup")?;
Ok(waiting_on_upscale)
}
/// Checks whether we were just upscaled, returning the upscale's sequence
/// number if so.
#[tracing::instrument(skip_all)]
fn upscaled(
&self,
upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
) -> anyhow::Result<Option<u64>> {
let Sequenced { seqnum, data } = match upscales.try_recv() {
Ok(upscale) => upscale,
Err(TryRecvError::Empty) => return Ok(None),
Err(TryRecvError::Disconnected) => {
bail!("upscale notification channel was disconnected")
}
};
// Make sure to update the last upscale sequence number
self.last_upscale_seqnum.store(seqnum, Ordering::Release);
info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
Ok(Some(seqnum))
}
/// Await an upscale event, discarding any `memory.high` events received in
/// the process.
///
/// This is used in `handle_memory_high_event`, where we need to listen
/// for upscales in particular so we know if we can thaw the cgroup early.
#[tracing::instrument(skip_all)]
async fn await_upscale(
&self,
upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let Sequenced { seqnum, .. } = upscales // this requirement makes the code a bit easier to work with; see the config for more.
.recv() assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
.await
.context("error listening for upscales")?;
self.last_upscale_seqnum.store(seqnum, Ordering::Release); let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
Ok(()) ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
} // ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
/// Get the cgroup's name. let mem_controller = self.memory()?;
pub fn path(&self) -> &str {
self.cgroup.path()
}
}
// Methods for manipulating the actual cgroup // buffer for samples that will be logged. once full, it remains so.
impl CgroupWatcher { let history_log_len = self.config.memory_history_log_interval;
/// Get a handle on the freezer subsystem. let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
fn freezer(&self) -> anyhow::Result<&FreezerController> {
if let Some(Freezer(freezer)) = self for t in 0_u64.. {
.cgroup ticker.tick().await;
.subsystems()
.iter() let now = Instant::now();
.find(|sub| matches!(sub, Freezer(_))) let mem = Self::memory_usage(mem_controller);
{
Ok(freezer) let i = t as usize % history_log_len;
} else { history_log_buf[i] = mem;
anyhow::bail!("could not find freezer subsystem")
// We're taking *at most* memory_history_len values; we may be bounded by the total
// number of samples that have come in so far.
let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
// NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
// that we just inserted a value there, so the end of the iterator will *include* the
// value at i, rather than stopping just short of it.
let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
let summary = MemoryHistory {
avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
/ samples_count as u64,
samples_count,
samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
};
// Log the current history if it's time to do so. Because `history_log_buf` has length
// equal to the logging interval, we can just log the entire buffer every time we set
// the last entry, which also means that for this log line, we can ignore that it's a
// ring buffer (because all the entries are in order of increasing time).
if i == history_log_len - 1 {
info!(
history = ?MemoryStatus::debug_slice(&history_log_buf),
summary = ?summary,
"Recent cgroup memory statistics history"
);
}
updates
.send((now, summary))
.context("failed to send MemoryHistory")?;
} }
}
/// Attempt to freeze the cgroup. unreachable!()
pub fn freeze(&self) -> anyhow::Result<()> {
self.freezer()
.context("failed to get freezer subsystem")?
.freeze()
.context("failed to freeze")
}
/// Attempt to thaw the cgroup.
pub fn thaw(&self) -> anyhow::Result<()> {
self.freezer()
.context("failed to get freezer subsystem")?
.thaw()
.context("failed to thaw")
} }
/// Get a handle on the memory subsystem. /// Get a handle on the memory subsystem.
///
/// Note: this method does not require `self.memory_update_lock` because
/// getting a handle to the subsystem does not access any of the files we
/// care about, such as memory.high and memory.events
fn memory(&self) -> anyhow::Result<&MemController> { fn memory(&self) -> anyhow::Result<&MemController> {
if let Some(Mem(memory)) = self self.cgroup
.cgroup
.subsystems() .subsystems()
.iter() .iter()
.find(|sub| matches!(sub, Mem(_))) .find_map(|sub| match sub {
{ Subsystem::Mem(c) => Some(c),
Ok(memory) _ => None,
} else {
anyhow::bail!("could not find memory subsystem")
}
}
/// Get cgroup current memory usage.
pub fn current_memory_usage(&self) -> anyhow::Result<u64> {
Ok(self
.memory()
.context("failed to get memory subsystem")?
.memory_stat()
.usage_in_bytes)
}
/// Set cgroup memory.high threshold.
pub fn set_memory_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64))
}
/// Set the cgroup's memory.high to 'max', disabling it.
pub fn unset_memory_high(&self) -> anyhow::Result<()> {
self.set_memory_high_internal(MaxValue::Max)
}
fn set_memory_high_internal(&self, value: MaxValue) -> anyhow::Result<()> {
self.memory()
.context("failed to get memory subsystem")?
.set_mem(cgroups_rs::memory::SetMemory {
low: None,
high: Some(value),
min: None,
max: None,
}) })
.map_err(anyhow::Error::from) .ok_or_else(|| anyhow!("could not find memory subsystem"))
} }
/// Get memory.high threshold. /// Given a handle on the memory subsystem, returns the current memory information
pub fn get_memory_high_bytes(&self) -> anyhow::Result<u64> { fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
let high = self let stat = mem_controller.memory_stat().stat;
.memory() MemoryStatus {
.context("failed to get memory subsystem while getting memory statistics")? non_reclaimable: stat.active_anon + stat.inactive_anon,
.get_mem()
.map(|mem| mem.high)
.context("failed to get memory statistics from subsystem")?;
match high {
Some(MaxValue::Max) => Ok(i64::MAX as u64),
Some(MaxValue::Value(high)) => Ok(high as u64),
None => anyhow::bail!("failed to read memory.high from memory subsystem"),
} }
} }
} }
// Helper function for `CgroupWatcher::watch`
fn ring_buf_recent_values_iter<T>(
buf: &[T],
last_value_idx: usize,
count: usize,
) -> impl '_ + Iterator<Item = &T> {
// Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
// easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
assert!(count <= buf.len());
buf.iter()
// 'cycle' because the values could wrap around
.cycle()
// with 'cycle', this skip is more like 'offset', and functionally this is
// offsettting by 'last_value_idx - count (mod buf.len())', but we have to be
// careful to avoid underflow, so we pre-add buf.len().
// The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
.skip((buf.len() + last_value_idx + 1 - count) % buf.len())
.take(count)
}
/// Summary of recent memory usage
#[derive(Debug, Copy, Clone)]
pub struct MemoryHistory {
/// Rolling average of non-reclaimable memory usage samples over the last `history_period`
pub avg_non_reclaimable: u64,
/// The number of samples used to construct this summary
pub samples_count: usize,
/// Total timespan between the first and last sample used for this summary
pub samples_span: Duration,
}
#[derive(Debug, Copy, Clone)]
pub struct MemoryStatus {
non_reclaimable: u64,
}
impl MemoryStatus {
fn zeroed() -> Self {
MemoryStatus { non_reclaimable: 0 }
}
fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
struct DS<'a>(&'a [MemoryStatus]);
impl<'a> Debug for DS<'a> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("[MemoryStatus]")
.field(
"non_reclaimable[..]",
&Fields(self.0, |stat: &MemoryStatus| {
BytesToGB(stat.non_reclaimable)
}),
)
.finish()
}
}
struct Fields<'a, F>(&'a [MemoryStatus], F);
impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_list().entries(self.0.iter().map(&self.1)).finish()
}
}
struct BytesToGB(u64);
impl Debug for BytesToGB {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.write_fmt(format_args!(
"{:.3}Gi",
self.0 as f64 / (1_u64 << 30) as f64
))
}
}
DS(slice)
}
}
#[cfg(test)]
mod tests {
#[test]
fn ring_buf_iter() {
let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let values = |offset, count| {
super::ring_buf_recent_values_iter(&buf, offset, count)
.copied()
.collect::<Vec<i32>>()
};
// Boundary conditions: start, end, and entire thing:
assert_eq!(values(0, 1), [0]);
assert_eq!(values(3, 4), [0, 1, 2, 3]);
assert_eq!(values(9, 4), [6, 7, 8, 9]);
assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
// "normal" operation: no wraparound
assert_eq!(values(7, 4), [4, 5, 6, 7]);
// wraparound:
assert_eq!(values(0, 4), [7, 8, 9, 0]);
assert_eq!(values(1, 4), [8, 9, 0, 1]);
assert_eq!(values(2, 4), [9, 0, 1, 2]);
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
}
}

View File

@@ -12,12 +12,10 @@ use futures::{
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
SinkExt, StreamExt, SinkExt, StreamExt,
}; };
use tokio::sync::mpsc;
use tracing::info; use tracing::info;
use crate::cgroup::Sequenced;
use crate::protocol::{ use crate::protocol::{
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, Resources, PROTOCOL_MAX_VERSION, OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION,
PROTOCOL_MIN_VERSION, PROTOCOL_MIN_VERSION,
}; };
@@ -36,13 +34,6 @@ pub struct Dispatcher {
/// We send messages to the agent through `sink` /// We send messages to the agent through `sink`
sink: SplitSink<WebSocket, Message>, sink: SplitSink<WebSocket, Message>,
/// Used to notify the cgroup when we are upscaled.
pub(crate) notify_upscale_events: mpsc::Sender<Sequenced<Resources>>,
/// When the cgroup requests upscale it will send on this channel. In response
/// we send an `UpscaleRequst` to the agent.
pub(crate) request_upscale_events: mpsc::Receiver<()>,
/// The protocol version we have agreed to use with the agent. This is negotiated /// The protocol version we have agreed to use with the agent. This is negotiated
/// during the creation of the dispatcher, and should be the highest shared protocol /// during the creation of the dispatcher, and should be the highest shared protocol
/// version. /// version.
@@ -61,11 +52,7 @@ impl Dispatcher {
/// 1. Wait for the agent to sent the range of protocols it supports. /// 1. Wait for the agent to sent the range of protocols it supports.
/// 2. Send a protocol version that works for us as well, or an error if there /// 2. Send a protocol version that works for us as well, or an error if there
/// is no compatible version. /// is no compatible version.
pub async fn new( pub async fn new(stream: WebSocket) -> anyhow::Result<Self> {
stream: WebSocket,
notify_upscale_events: mpsc::Sender<Sequenced<Resources>>,
request_upscale_events: mpsc::Receiver<()>,
) -> anyhow::Result<Self> {
let (mut sink, mut source) = stream.split(); let (mut sink, mut source) = stream.split();
// Figure out the highest protocol version we both support // Figure out the highest protocol version we both support
@@ -119,22 +106,10 @@ impl Dispatcher {
Ok(Self { Ok(Self {
sink, sink,
source, source,
notify_upscale_events,
request_upscale_events,
proto_version: highest_shared_version, proto_version: highest_shared_version,
}) })
} }
/// Notify the cgroup manager that we have received upscale and wait for
/// the acknowledgement.
#[tracing::instrument(skip_all, fields(?resources))]
pub async fn notify_upscale(&self, resources: Sequenced<Resources>) -> anyhow::Result<()> {
self.notify_upscale_events
.send(resources)
.await
.context("failed to send resources and oneshot sender across channel")
}
/// Send a message to the agent. /// Send a message to the agent.
/// ///
/// Although this function is small, it has one major benefit: it is the only /// Although this function is small, it has one major benefit: it is the only

View File

@@ -5,18 +5,16 @@
//! all functionality. //! all functionality.
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket}; use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt; use futures::StreamExt;
use tokio::sync::broadcast; use tokio::sync::{broadcast, watch};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use crate::cgroup::{CgroupWatcher, Sequenced}; use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher; use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState}; use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources}; use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
@@ -28,7 +26,7 @@ use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args
pub struct Runner { pub struct Runner {
config: Config, config: Config,
filecache: Option<FileCacheState>, filecache: Option<FileCacheState>,
cgroup: Option<Arc<CgroupWatcher>>, cgroup: Option<CgroupState>,
dispatcher: Dispatcher, dispatcher: Dispatcher,
/// We "mint" new message ids by incrementing this counter and taking the value. /// We "mint" new message ids by incrementing this counter and taking the value.
@@ -45,6 +43,14 @@ pub struct Runner {
kill: broadcast::Receiver<()>, kill: broadcast::Receiver<()>,
} }
#[derive(Debug)]
struct CgroupState {
watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
/// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
/// requests.
threshold: u64,
}
/// Configuration for a `Runner` /// Configuration for a `Runner`
#[derive(Debug)] #[derive(Debug)]
pub struct Config { pub struct Config {
@@ -62,16 +68,56 @@ pub struct Config {
/// upscale resource amounts (because we might not *actually* have been upscaled yet). This field /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field
/// should be removed once we have a better solution there. /// should be removed once we have a better solution there.
sys_buffer_bytes: u64, sys_buffer_bytes: u64,
/// Minimum fraction of total system memory reserved *before* the the cgroup threshold; in
/// other words, providing a ceiling for the highest value of the threshold by enforcing that
/// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
/// threshold.
///
/// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
/// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
/// memory.
///
/// The default value of `0.15` means that we *guarantee* sending upscale requests if the
/// cgroup is using more than 85% of total memory (even if we're *not* separately reserving
/// memory for the file cache).
cgroup_min_overhead_fraction: f64,
cgroup_downscale_threshold_buffer_bytes: u64,
} }
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Self { Self {
sys_buffer_bytes: 100 * MiB, sys_buffer_bytes: 100 * MiB,
cgroup_min_overhead_fraction: 0.15,
cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
} }
} }
} }
impl Config {
fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 {
// If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup,
// and thus be non-reclaimable, so we should allow for additional memory usage.
//
// If the file cache sits on disk, our desired stable system state is for it to be fully
// page cached (its contents should only be paged to/from disk in situations where we can't
// upscale fast enough). Page-cached memory is reclaimable, so we need to lower the
// threshold for non-reclaimable memory so we scale up *before* the kernel starts paging
// out the file cache.
let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size);
// Even if we're not separately making room for the file cache (if it's in tmpfs), we still
// want our threshold to be met gracefully instead of letting postgres get OOM-killed.
// So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
// remaining above the threshold.
let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64;
memory_remaining_for_cgroup.min(max_threshold)
}
}
impl Runner { impl Runner {
/// Create a new monitor. /// Create a new monitor.
#[tracing::instrument(skip_all, fields(?config, ?args))] #[tracing::instrument(skip_all, fields(?config, ?args))]
@@ -87,12 +133,7 @@ impl Runner {
"invalid monitor Config: sys_buffer_bytes cannot be 0" "invalid monitor Config: sys_buffer_bytes cannot be 0"
); );
// *NOTE*: the dispatcher and cgroup manager talk through these channels let dispatcher = Dispatcher::new(ws)
// so make sure they each get the correct half, nothing is droppped, etc.
let (notified_send, notified_recv) = mpsc::channel(1);
let (requesting_send, requesting_recv) = mpsc::channel(1);
let dispatcher = Dispatcher::new(ws, notified_send, requesting_recv)
.await .await
.context("error creating new dispatcher")?; .context("error creating new dispatcher")?;
@@ -106,46 +147,10 @@ impl Runner {
kill, kill,
}; };
// If we have both the cgroup and file cache integrations enabled, it's possible for
// temporary failures to result in cgroup throttling (from memory.high), that in turn makes
// it near-impossible to connect to the file cache (because it times out). Unfortunately,
// we *do* still want to determine the file cache size before setting the cgroup's
// memory.high, so it's not as simple as just swapping the order.
//
// Instead, the resolution here is that on vm-monitor startup (note: happens on each
// connection from autoscaler-agent, possibly multiple times per compute_ctl lifecycle), we
// temporarily unset memory.high, to allow any existing throttling to dissipate. It's a bit
// of a hacky solution, but helps with reliability.
if let Some(name) = &args.cgroup {
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
// now, and then set limits later.
info!("initializing cgroup");
let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send)
.context("failed to create cgroup manager")?;
info!("temporarily unsetting memory.high");
// Temporarily un-set cgroup memory.high; see above.
cgroup
.unset_memory_high()
.context("failed to unset memory.high")?;
let cgroup = Arc::new(cgroup);
let cgroup_clone = Arc::clone(&cgroup);
spawn_with_cancel(
token.clone(),
|_| error!("cgroup watcher terminated"),
async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await },
);
state.cgroup = Some(cgroup);
}
let mut file_cache_reserved_bytes = 0;
let mem = get_total_system_memory(); let mem = get_total_system_memory();
let mut file_cache_disk_size = 0;
// We need to process file cache initialization before cgroup initialization, so that the memory // We need to process file cache initialization before cgroup initialization, so that the memory
// allocated to the file cache is appropriately taken into account when we decide the cgroup's // allocated to the file cache is appropriately taken into account when we decide the cgroup's
// memory limits. // memory limits.
@@ -156,7 +161,7 @@ impl Runner {
false => FileCacheConfig::default_in_memory(), false => FileCacheConfig::default_in_memory(),
}; };
let mut file_cache = FileCacheState::new(connstr, config, token) let mut file_cache = FileCacheState::new(connstr, config, token.clone())
.await .await
.context("failed to create file cache")?; .context("failed to create file cache")?;
@@ -181,23 +186,40 @@ impl Runner {
if actual_size != new_size { if actual_size != new_size {
info!("file cache size actually got set to {actual_size}") info!("file cache size actually got set to {actual_size}")
} }
// Mark the resources given to the file cache as reserved, but only if it's in memory.
if !args.file_cache_on_disk { if args.file_cache_on_disk {
file_cache_reserved_bytes = actual_size; file_cache_disk_size = actual_size;
} }
state.filecache = Some(file_cache); state.filecache = Some(file_cache);
} }
if let Some(cgroup) = &state.cgroup { if let Some(name) = &args.cgroup {
let available = mem - file_cache_reserved_bytes; // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
let value = cgroup.config.calculate_memory_high_value(available); // now, and then set limits later.
info!("initializing cgroup");
info!(value, "setting memory.high"); let cgroup =
CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;
cgroup let init_value = cgroup::MemoryHistory {
.set_memory_high_bytes(value) avg_non_reclaimable: 0,
.context("failed to set cgroup memory.high")?; samples_count: 0,
samples_span: Duration::ZERO,
};
let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));
spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
cgroup.watch(hist_tx).await
});
let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size);
info!(threshold, "set initial cgroup threshold",);
state.cgroup = Some(CgroupState {
watcher: hist_rx,
threshold,
});
} }
Ok(state) Ok(state)
@@ -217,28 +239,40 @@ impl Runner {
let requested_mem = target.mem; let requested_mem = target.mem;
let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes); let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
let expected_file_cache_mem_usage = self let (expected_file_cache_size, expected_file_cache_disk_size) = self
.filecache .filecache
.as_ref() .as_ref()
.map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory)) .map(|file_cache| {
.unwrap_or(0); let size = file_cache.config.calculate_cache_size(usable_system_memory);
let mut new_cgroup_mem_high = 0; match file_cache.config.in_memory {
true => (size, 0),
false => (size, size),
}
})
.unwrap_or((0, 0));
if let Some(cgroup) = &self.cgroup { if let Some(cgroup) = &self.cgroup {
new_cgroup_mem_high = cgroup let (last_time, last_history) = *cgroup.watcher.borrow();
// TODO: make the duration here configurable.
if last_time.elapsed() > Duration::from_secs(5) {
bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
} else if last_history.samples_count <= 1 {
bail!("haven't received enough cgroup memory stats yet");
}
let new_threshold = self
.config .config
.calculate_memory_high_value(usable_system_memory - expected_file_cache_mem_usage); .cgroup_threshold(usable_system_memory, expected_file_cache_disk_size);
let current = cgroup let current = last_history.avg_non_reclaimable;
.current_memory_usage()
.context("failed to fetch cgroup memory")?;
if new_cgroup_mem_high < current + cgroup.config.memory_high_buffer_bytes { if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
let status = format!( let status = format!(
"{}: {} MiB (new high) < {} (current usage) + {} (buffer)", "{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
"calculated memory.high too low", "calculated memory threshold too low",
bytes_to_mebibytes(new_cgroup_mem_high), bytes_to_mebibytes(new_threshold),
bytes_to_mebibytes(current), bytes_to_mebibytes(current),
bytes_to_mebibytes(cgroup.config.memory_high_buffer_bytes) bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
); );
info!(status, "discontinuing downscale"); info!(status, "discontinuing downscale");
@@ -249,14 +283,14 @@ impl Runner {
// The downscaling has been approved. Downscale the file cache, then the cgroup. // The downscaling has been approved. Downscale the file cache, then the cgroup.
let mut status = vec![]; let mut status = vec![];
let mut file_cache_mem_usage = 0; let mut file_cache_disk_size = 0;
if let Some(file_cache) = &mut self.filecache { if let Some(file_cache) = &mut self.filecache {
let actual_usage = file_cache let actual_usage = file_cache
.set_file_cache_size(expected_file_cache_mem_usage) .set_file_cache_size(expected_file_cache_size)
.await .await
.context("failed to set file cache size")?; .context("failed to set file cache size")?;
if file_cache.config.in_memory { if !file_cache.config.in_memory {
file_cache_mem_usage = actual_usage; file_cache_disk_size = actual_usage;
} }
let message = format!( let message = format!(
"set file cache size to {} MiB (in memory = {})", "set file cache size to {} MiB (in memory = {})",
@@ -267,24 +301,18 @@ impl Runner {
status.push(message); status.push(message);
} }
if let Some(cgroup) = &self.cgroup { if let Some(cgroup) = &mut self.cgroup {
let available_memory = usable_system_memory - file_cache_mem_usage; let new_threshold = self
.config
if file_cache_mem_usage != expected_file_cache_mem_usage { .cgroup_threshold(usable_system_memory, file_cache_disk_size);
new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
}
// new_cgroup_mem_high is initialized to 0 but it is guaranteed to not be here
// since it is properly initialized in the previous cgroup if let block
cgroup
.set_memory_high_bytes(new_cgroup_mem_high)
.context("failed to set cgroup memory.high")?;
let message = format!( let message = format!(
"set cgroup memory.high to {} MiB, of new max {} MiB", "set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
bytes_to_mebibytes(new_cgroup_mem_high), bytes_to_mebibytes(cgroup.threshold),
bytes_to_mebibytes(available_memory) bytes_to_mebibytes(new_threshold),
bytes_to_mebibytes(usable_system_memory)
); );
cgroup.threshold = new_threshold;
info!("downscale: {message}"); info!("downscale: {message}");
status.push(message); status.push(message);
} }
@@ -305,8 +333,7 @@ impl Runner {
let new_mem = resources.mem; let new_mem = resources.mem;
let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes); let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
// Get the file cache's expected contribution to the memory usage let mut file_cache_disk_size = 0;
let mut file_cache_mem_usage = 0;
if let Some(file_cache) = &mut self.filecache { if let Some(file_cache) = &mut self.filecache {
let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory); let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
info!( info!(
@@ -319,8 +346,8 @@ impl Runner {
.set_file_cache_size(expected_usage) .set_file_cache_size(expected_usage)
.await .await
.context("failed to set file cache size")?; .context("failed to set file cache size")?;
if file_cache.config.in_memory { if !file_cache.config.in_memory {
file_cache_mem_usage = actual_usage; file_cache_disk_size = actual_usage;
} }
if actual_usage != expected_usage { if actual_usage != expected_usage {
@@ -332,18 +359,18 @@ impl Runner {
} }
} }
if let Some(cgroup) = &self.cgroup { if let Some(cgroup) = &mut self.cgroup {
let available_memory = usable_system_memory - file_cache_mem_usage; let new_threshold = self
let new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory); .config
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
info!( info!(
target = bytes_to_mebibytes(new_cgroup_mem_high), "set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
total = bytes_to_mebibytes(new_mem), bytes_to_mebibytes(cgroup.threshold),
name = cgroup.path(), bytes_to_mebibytes(new_threshold),
"updating cgroup memory.high", bytes_to_mebibytes(usable_system_memory)
); );
cgroup cgroup.threshold = new_threshold;
.set_memory_high_bytes(new_cgroup_mem_high)
.context("failed to set cgroup memory.high")?;
} }
Ok(()) Ok(())
@@ -361,10 +388,6 @@ impl Runner {
self.handle_upscale(granted) self.handle_upscale(granted)
.await .await
.context("failed to handle upscale")?; .context("failed to handle upscale")?;
self.dispatcher
.notify_upscale(Sequenced::new(granted))
.await
.context("failed to notify notify cgroup of upscale")?;
Ok(Some(OutboundMsg::new( Ok(Some(OutboundMsg::new(
OutboundMsgKind::UpscaleConfirmation {}, OutboundMsgKind::UpscaleConfirmation {},
id, id,
@@ -408,33 +431,53 @@ impl Runner {
Err(e) => bail!("failed to receive kill signal: {e}") Err(e) => bail!("failed to receive kill signal: {e}")
} }
} }
// we need to propagate an upscale request
request = self.dispatcher.request_upscale_events.recv(), if self.cgroup.is_some() => { // New memory stats from the cgroup, *may* need to request upscaling, if we've
if request.is_none() { // exceeded the threshold
bail!("failed to listen for upscale event from cgroup") result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
result.context("failed to receive from cgroup memory stats watcher")?;
let cgroup = self.cgroup.as_ref().unwrap();
let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
// If we haven't exceeded the threshold, then we're all ok
if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
continue;
} }
// If it's been less than 1 second since the last time we requested upscaling, // Otherwise, we generally want upscaling. But, if it's been less than 1 second
// ignore the event, to avoid spamming the agent (otherwise, this can happen // since the last time we requested upscaling, ignore the event, to avoid
// ~1k times per second). // spamming the agent.
if let Some(t) = self.last_upscale_request_at { if let Some(t) = self.last_upscale_request_at {
let elapsed = t.elapsed(); let elapsed = t.elapsed();
if elapsed < Duration::from_secs(1) { if elapsed < Duration::from_secs(1) {
info!(elapsed_millis = elapsed.as_millis(), "cgroup asked for upscale but too soon to forward the request, ignoring"); info!(
elapsed_millis = elapsed.as_millis(),
avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
threshold = bytes_to_mebibytes(cgroup.threshold),
"cgroup memory stats are high enough to upscale but too soon to forward the request, ignoring",
);
continue; continue;
} }
} }
self.last_upscale_request_at = Some(Instant::now()); self.last_upscale_request_at = Some(Instant::now());
info!("cgroup asking for upscale; forwarding request"); info!(
avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
threshold = bytes_to_mebibytes(cgroup.threshold),
"cgroup memory stats are high enough to upscale, requesting upscale",
);
self.counter += 2; // Increment, preserving parity (i.e. keep the self.counter += 2; // Increment, preserving parity (i.e. keep the
// counter odd). See the field comment for more. // counter odd). See the field comment for more.
self.dispatcher self.dispatcher
.send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter)) .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
.await .await
.context("failed to send message")?; .context("failed to send message")?;
} },
// there is a message from the agent // there is a message from the agent
msg = self.dispatcher.source.next() => { msg = self.dispatcher.source.next() => {
if let Some(msg) = msg { if let Some(msg) = msg {