Compare commits

...

2 Commits

Author SHA1 Message Date
Heikki Linnakangas
e67746c41c Implement new vm-monitor - controller autoscaling protocol
This is the VM monitor implementation of the RFC at
https://github.com/neondatabase/neon/pull/8111.

I tried to keep the user-visible behavior unchanged from what we have
today. Improving the autoscaling algorithm is a separate topic, the
point of this work is just to move the algorithm from the autoscaler
agent to the VM monitor. That lays the groundwork for improving it
later, based on more metrics and signals inside the VM.

Some notable changes:

- I removed all the cgroup managing stuff. Instead of polling the
  cgroup memory threshold, this polls the overall system memory usage.

- The scaling algorithm is based on a sliding window of load average and
  memory usage over the last minute (a worked example of the defaults
  follows below). I'm not sure how close that is to the algorithm used
  by the autoscaler agent; I couldn't find a description of exactly
  what algorithm is used there. I think this is close, but if not, it
  can be changed to match the agent's current algorithm more closely.
  I copied the LoadAverageFractionTarget and MemoryUsageFractionTarget
  settings from the autoscaler agent, with the defaults I found in the
  repo, but I'm not sure if we use different settings in production.

- I also didn't fully understand how the memory history logging in the
  VM monitor worked; it was used to trigger upscaling. There is only
  one memory scaling codepath now, based on the max over a 1-minute
  sliding window.
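
As a worked example of the defaults in this change (these are the values
in the diff below; production settings may differ): with
LoadAverageFractionTarget = 0.9, a 1-minute load average of 1.8 yields a
goal of 1.8 / 0.9 = 2.0 CPUs, and with MemoryUsageFractionTarget = 0.75,
3 GiB of used memory yields a goal of 3 GiB / 0.75 = 4 GiB. Both goals
are then rounded up to multiples of cpu_quantum (0.25 CPUs) and
mem_quantum (512 MiB), and the request sent to the agent is the max of
the quantized goals over the sliding window.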
2024-06-19 16:58:10 +03:00
Heikki Linnakangas
cd77bb991d Update sysinfo crate 2024-06-19 16:51:30 +03:00
11 changed files with 245 additions and 635 deletions

Cargo.lock generated
View File

@@ -1056,19 +1056,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cgroups-rs"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb3af90c8d48ad5f432d8afb521b5b40c2a2fce46dd60e05912de51c47fba64"
dependencies = [
"libc",
"log",
"nix 0.25.1",
"regex",
"thiserror",
]
[[package]]
name = "chrono"
version = "0.4.38"
@@ -2877,9 +2864,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.150"
version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "libloading"
@@ -3139,18 +3126,6 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nix"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
dependencies = [
"autocfg",
"bitflags 1.3.2",
"cfg-if",
"libc",
]
[[package]]
name = "nix"
version = "0.26.4"
@@ -4493,9 +4468,9 @@ dependencies = [
[[package]]
name = "rayon"
version = "1.7.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
dependencies = [
"either",
"rayon-core",
@@ -4503,14 +4478,12 @@ dependencies = [
[[package]]
name = "rayon-core"
version = "1.11.0"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]]
@@ -5928,9 +5901,9 @@ dependencies = [
[[package]]
name = "sysinfo"
version = "0.29.7"
version = "0.30.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "165d6d8539689e3d3bc8b98ac59541e1f21c7de7c85d60dc80e43ae0ed2113db"
checksum = "732ffa00f53e6b2af46208fba5718d9662a421049204e156328b66791ffa15ae"
dependencies = [
"cfg-if",
"core-foundation-sys",
@@ -5938,7 +5911,7 @@ dependencies = [
"ntapi",
"once_cell",
"rayon",
"winapi",
"windows 0.52.0",
]
[[package]]
@@ -6811,7 +6784,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"cgroups-rs",
"clap",
"futures",
"inotify 0.10.2",

View File

@@ -144,7 +144,7 @@ rustls = "0.22"
rustls-pemfile = "2"
rustls-split = "0.3"
scopeguard = "1.1"
sysinfo = "0.29.2"
sysinfo = "0.30.12"
sd-notify = "0.4.1"
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }

View File

@@ -25,6 +25,3 @@ tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = "0.3.3"

View File

@@ -5,7 +5,6 @@
// The monitor was previously started by vm-builder, and for testing purposes,
// we can mimic that setup with this binary.
#[cfg(target_os = "linux")]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
use clap::Parser;
@@ -26,8 +25,3 @@ async fn main() -> anyhow::Result<()> {
let token = CancellationToken::new();
vm_monitor::start(args, token).await
}
#[cfg(not(target_os = "linux"))]
fn main() {
panic!("the monitor requires cgroups, which are only available on linux")
}

View File

@@ -1,363 +0,0 @@
use std::fmt::{self, Debug, Formatter};
use std::time::{Duration, Instant};
use anyhow::{anyhow, Context};
use cgroups_rs::{
hierarchies::{self, is_cgroup2_unified_mode},
memory::MemController,
Subsystem,
};
use tokio::sync::watch;
use tracing::{info, warn};
/// Configuration for a `CgroupWatcher`
#[derive(Debug, Clone)]
pub struct Config {
/// Interval at which we should be fetching memory statistics
memory_poll_interval: Duration,
/// The number of samples used in constructing aggregated memory statistics
memory_history_len: usize,
/// The number of most recent samples that will be periodically logged.
///
/// Each sample is logged exactly once. Increasing this value means that recent samples will be
/// logged less frequently, and vice versa.
///
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
memory_history_log_interval: usize,
/// The maximum time allowed to elapse before the next iteration is logged, even if unchanged
memory_history_log_noskip_interval: Duration,
}
impl Default for Config {
fn default() -> Self {
Self {
memory_poll_interval: Duration::from_millis(100),
memory_history_len: 5, // use 500ms of history for decision-making
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
memory_history_log_noskip_interval: Duration::from_secs(15), // but only if it's changed, or 15 seconds have passed
}
}
}
/// Responds to `MonitorEvents` to manage the cgroup: preventing it from being
/// OOM killed or throttling.
///
/// The `CgroupWatcher` primarily achieves this by reading from a stream of
/// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the
/// cgroup happy.
#[derive(Debug)]
pub struct CgroupWatcher {
pub config: Config,
/// The actual cgroup we are watching and managing.
cgroup: cgroups_rs::Cgroup,
}
impl CgroupWatcher {
/// Create a new `CgroupWatcher`.
#[tracing::instrument(skip_all, fields(%name))]
pub fn new(name: String) -> anyhow::Result<Self> {
// TODO: clarify exactly why we need v2
// Make sure cgroups v2 (aka unified) are supported
if !is_cgroup2_unified_mode() {
anyhow::bail!("cgroups v2 not supported");
}
let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
Ok(Self {
cgroup,
config: Default::default(),
})
}
/// The entrypoint for the `CgroupWatcher`.
#[tracing::instrument(skip_all)]
pub async fn watch(
&self,
updates: watch::Sender<(Instant, MemoryHistory)>,
) -> anyhow::Result<()> {
// this requirement makes the code a bit easier to work with; see the config for more.
assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
let mem_controller = self.memory()?;
// buffer for samples that will be logged. once full, it remains so.
let history_log_len = self.config.memory_history_log_interval;
let max_skip = self.config.memory_history_log_noskip_interval;
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
let mut last_logged_memusage = MemoryStatus::zeroed();
// Ensure that we're tracking a value that's definitely in the past, as Instant::now is only guaranteed to be non-decreasing on Rust's T1-supported systems.
let mut can_skip_logs_until = Instant::now() - max_skip;
for t in 0_u64.. {
ticker.tick().await;
let now = Instant::now();
let mem = Self::memory_usage(mem_controller);
let i = t as usize % history_log_len;
history_log_buf[i] = mem;
// We're taking *at most* memory_history_len values; we may be bounded by the total
// number of samples that have come in so far.
let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
// NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
// that we just inserted a value there, so the end of the iterator will *include* the
// value at i, rather than stopping just short of it.
let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
let summary = MemoryHistory {
avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
/ samples_count as u64,
samples_count,
samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
};
// Log the current history if it's time to do so. Because `history_log_buf` has length
// equal to the logging interval, we can just log the entire buffer every time we set
// the last entry, which also means that for this log line, we can ignore that it's a
// ring buffer (because all the entries are in order of increasing time).
//
// We skip logging the data if data hasn't meaningfully changed in a while, unless
// we've already ignored previous iterations for the last max_skip period.
if i == history_log_len - 1
&& (now > can_skip_logs_until
|| !history_log_buf
.iter()
.all(|usage| last_logged_memusage.status_is_close_or_similar(usage)))
{
info!(
history = ?MemoryStatus::debug_slice(&history_log_buf),
summary = ?summary,
"Recent cgroup memory statistics history"
);
can_skip_logs_until = now + max_skip;
last_logged_memusage = *history_log_buf.last().unwrap();
}
updates
.send((now, summary))
.context("failed to send MemoryHistory")?;
}
unreachable!()
}
/// Get a handle on the memory subsystem.
fn memory(&self) -> anyhow::Result<&MemController> {
self.cgroup
.subsystems()
.iter()
.find_map(|sub| match sub {
Subsystem::Mem(c) => Some(c),
_ => None,
})
.ok_or_else(|| anyhow!("could not find memory subsystem"))
}
/// Given a handle on the memory subsystem, returns the current memory information
fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
let stat = mem_controller.memory_stat().stat;
MemoryStatus {
non_reclaimable: stat.active_anon + stat.inactive_anon,
}
}
}
// Helper function for `CgroupWatcher::watch`
fn ring_buf_recent_values_iter<T>(
buf: &[T],
last_value_idx: usize,
count: usize,
) -> impl '_ + Iterator<Item = &T> {
// Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
// easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
assert!(count <= buf.len());
buf.iter()
// 'cycle' because the values could wrap around
.cycle()
// with 'cycle', this skip is more like 'offset', and functionally this is
// offsetting by 'last_value_idx - count (mod buf.len())', but we have to be
// careful to avoid underflow, so we pre-add buf.len().
// The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
.skip((buf.len() + last_value_idx + 1 - count) % buf.len())
.take(count)
}
/// Summary of recent memory usage
#[derive(Debug, Copy, Clone)]
pub struct MemoryHistory {
/// Rolling average of non-reclaimable memory usage samples over the last `history_period`
pub avg_non_reclaimable: u64,
/// The number of samples used to construct this summary
pub samples_count: usize,
/// Total timespan between the first and last sample used for this summary
pub samples_span: Duration,
}
#[derive(Debug, Copy, Clone)]
pub struct MemoryStatus {
non_reclaimable: u64,
}
impl MemoryStatus {
fn zeroed() -> Self {
MemoryStatus { non_reclaimable: 0 }
}
fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
struct DS<'a>(&'a [MemoryStatus]);
impl<'a> Debug for DS<'a> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("[MemoryStatus]")
.field(
"non_reclaimable[..]",
&Fields(self.0, |stat: &MemoryStatus| {
BytesToGB(stat.non_reclaimable)
}),
)
.finish()
}
}
struct Fields<'a, F>(&'a [MemoryStatus], F);
impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_list().entries(self.0.iter().map(&self.1)).finish()
}
}
struct BytesToGB(u64);
impl Debug for BytesToGB {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.write_fmt(format_args!(
"{:.3}Gi",
self.0 as f64 / (1_u64 << 30) as f64
))
}
}
DS(slice)
}
/// Check if the other memory status is a close or similar result.
/// Returns true if the two values differ by less than 1/8 of the
/// smaller value, and by less than 128MiB.
/// See tests::check_similarity_behaviour for examples of behaviour
fn status_is_close_or_similar(&self, other: &MemoryStatus) -> bool {
let margin;
let diff;
if self.non_reclaimable >= other.non_reclaimable {
margin = other.non_reclaimable / 8;
diff = self.non_reclaimable - other.non_reclaimable;
} else {
margin = self.non_reclaimable / 8;
diff = other.non_reclaimable - self.non_reclaimable;
}
diff < margin && diff < 128 * 1024 * 1024
}
}
#[cfg(test)]
mod tests {
#[test]
fn ring_buf_iter() {
let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let values = |offset, count| {
super::ring_buf_recent_values_iter(&buf, offset, count)
.copied()
.collect::<Vec<i32>>()
};
// Boundary conditions: start, end, and entire thing:
assert_eq!(values(0, 1), [0]);
assert_eq!(values(3, 4), [0, 1, 2, 3]);
assert_eq!(values(9, 4), [6, 7, 8, 9]);
assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
// "normal" operation: no wraparound
assert_eq!(values(7, 4), [4, 5, 6, 7]);
// wraparound:
assert_eq!(values(0, 4), [7, 8, 9, 0]);
assert_eq!(values(1, 4), [8, 9, 0, 1]);
assert_eq!(values(2, 4), [9, 0, 1, 2]);
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
}
#[test]
fn check_similarity_behaviour() {
// This all accesses private methods, so we can't actually run this
// as doctests, because doctests run as an external crate.
let mut small = super::MemoryStatus {
non_reclaimable: 1024,
};
let mut large = super::MemoryStatus {
non_reclaimable: 1024 * 1024 * 1024 * 1024,
};
// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
// inequality is symmetric
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
small.non_reclaimable = 64;
large.non_reclaimable = (small.non_reclaimable / 8) * 9;
// objects are self-similar, no matter the size
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
// values are similar if the larger value is larger by less than
// 12.5%, i.e. 1/8 of the smaller value.
// In the example above, large is exactly 12.5% larger, so this doesn't
// match.
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));
assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
// The 1/8 rule only applies up to 128MiB of difference
small.non_reclaimable = 1024 * 1024 * 1024 * 1024;
large.non_reclaimable = small.non_reclaimable / 8 * 9;
assert!(small.status_is_close_or_similar(&small));
assert!(large.status_is_close_or_similar(&large));
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// the large value is put just above the threshold
large.non_reclaimable = small.non_reclaimable + 128 * 1024 * 1024;
assert!(large.status_is_close_or_similar(&large));
assert!(!small.status_is_close_or_similar(&large));
assert!(!large.status_is_close_or_similar(&small));
// now below
large.non_reclaimable -= 1;
assert!(large.status_is_close_or_similar(&large));
assert!(small.status_is_close_or_similar(&large));
assert!(large.status_is_close_or_similar(&small));
}
}

View File

@@ -2,9 +2,7 @@
//!
//! Contains types that manage the interaction (not data interchange, see `protocol`)
//! between agent and monitor, allowing us to process and send messages in a
//! straightforward way. The dispatcher also manages the signals that come from
//! the cgroup (requesting upscale), and the signals that go to the cgroup
//! (notifying it of upscale).
//! straightforward way.
use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket};
@@ -21,11 +19,7 @@ use crate::protocol::{
/// The central handler for all communications in the monitor.
///
/// The dispatcher has two purposes:
/// 1. Manage the connection to the agent, sending and receiving messages.
/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
/// and sending a message to the agent when the cgroup manager requests
/// upscale.
/// The dispatcher manages the connection to the agent, sending and receiving messages.
#[derive(Debug)]
pub struct Dispatcher {
/// We read agent messages off of `source`

View File

@@ -11,7 +11,6 @@ use axum::{routing::get, Router, Server};
use clap::Parser;
use futures::Future;
use std::{fmt::Debug, time::Duration};
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
@@ -22,18 +21,18 @@ use runner::Runner;
pub mod dispatcher;
pub mod protocol;
pub mod cgroup;
pub mod filecache;
pub mod runner;
pub mod sliding_window;
/// The vm-monitor is an autoscaling component started by compute_ctl.
///
/// It carries out autoscaling decisions (upscaling/downscaling) and responds to
/// memory pressure by making requests to the autoscaler-agent.
#[derive(Debug, Parser)]
pub struct Args {
/// The name of the cgroup we should monitor for memory.high events. This
/// is the cgroup that postgres should be running in.
/// Unused but accepted for backwards compatibility.
#[arg(short, long)]
pub cgroup: Option<String>,
@@ -63,10 +62,6 @@ pub fn bytes_to_mebibytes(bytes: u64) -> f32 {
(bytes as f32) / (MiB as f32)
}
pub fn get_total_system_memory() -> u64 {
System::new_with_specifics(RefreshKind::new().with_memory()).total_memory()
}
/// Global app state for the Axum server
#[derive(Debug, Clone)]
pub struct ServerState {
@@ -87,8 +82,7 @@ pub struct ServerState {
///
/// This is mainly meant to be called with futures that will be pending for a very
/// long time, or are not meant to return. If it is not desirable for the future to
/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can
/// be logged with `f`.
/// ever resolve, the error can be logged with `f`.
pub fn spawn_with_cancel<T, F>(
token: CancellationToken,
f: F,

View File

@@ -59,16 +59,15 @@ pub enum OutboundMsgKind {
/// properly deserialize it.
InvalidMessage { error: String },
/// Indicates that we experienced an internal error while processing a message.
/// For example, if a cgroup operation fails while trying to handle an upscale,
/// For example, if enlarging the file cache fails while trying to handle an upscale,
/// we return `InternalError`.
InternalError { error: String },
/// Returned to the agent once we have finished handling an upscale. If the
/// handling was unsuccessful, an `InternalError` will get returned instead.
/// *Note*: this is a struct variant because of the way Go serializes struct{}
UpscaleConfirmation {},
/// Indicates to the monitor that we are urgently requesting resources.
/// *Note*: this is a struct variant because of the way Go serializes struct{}
UpscaleRequest {},
/// Indicates to the monitor that we are requesting the VM to be scaled to this size.
ScaleRequest { target: Resources },
/// Returned to the agent once we have finished attempting to downscale. If
/// an error occurred trying to do so, an `InternalError` will get returned instead.
/// However, if we are simply unsuccessful (for example, due to needing the resources),
@@ -126,7 +125,7 @@ pub enum InboundMsgKind {
}
/// Represents the resources granted to a VM.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
// Renamed because the agent has multiple resource types:
// `Resources` (milliCPU/memory slots)
// `Allocation` (vCPU/bytes) <- what we correspond to

View File

@@ -10,23 +10,23 @@ use std::time::{Duration, Instant};
use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
use sysinfo::System;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing::{debug, info, warn};
use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
use crate::sliding_window::SlidingMax;
use crate::{bytes_to_mebibytes, Args, MiB};
/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
/// Central struct that tracks the desired scaling target, and interacts with the agent
/// and dispatcher to handle signals from the agent.
#[derive(Debug)]
pub struct Runner {
config: Config,
filecache: Option<FileCacheState>,
cgroup: Option<CgroupState>,
dispatcher: Dispatcher,
/// We "mint" new message ids by incrementing this counter and taking the value.
@@ -35,7 +35,12 @@ pub struct Runner {
/// by us vs the autoscaler-agent.
counter: usize,
last_upscale_request_at: Option<Instant>,
last_scale_request: Option<(Resources, Instant)>,
cpu_window: SlidingMax<f64, Instant>,
mem_window: SlidingMax<u64, Instant>,
system: System,
/// A signal to kill the main thread produced by `self.run()`. This is triggered
/// when the server receives a new connection. When the thread receives the
@@ -43,14 +48,6 @@ pub struct Runner {
kill: broadcast::Receiver<()>,
}
#[derive(Debug)]
struct CgroupState {
watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
/// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
/// requests.
threshold: u64,
}
/// Configuration for a `Runner`
#[derive(Debug)]
pub struct Config {
@@ -69,55 +66,45 @@ pub struct Config {
/// should be removed once we have a better solution there.
sys_buffer_bytes: u64,
/// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
/// other words, providing a ceiling for the highest value of the threshold by enforcing that
/// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
/// threshold.
///
/// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
/// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
/// memory.
///
/// The default value of `0.15` means that we *guarantee* sending upscale requests if the
/// cgroup is using more than 85% of total memory (even if we're *not* separately reserving
/// memory for the file cache).
cgroup_min_overhead_fraction: f64,
/// Interval at which we poll memory and CPU statistics for scaling decisions.
poll_interval: Duration,
cgroup_downscale_threshold_buffer_bytes: u64,
/// The resources requested from the agent are calculated based on the Max of memory
/// usage and load average over a sliding window of the last X seconds. This controls
/// the length of the window to consider.
sliding_window_length: Duration,
/// Desired fraction of current CPU that the load average should be. For example, with a value
/// of 0.7, we'd want load average to sit at 0.7 × CPU, scaling CPU to make this happen.
load_average_fraction_target: f64,
/// Desired fraction of current memory that we would like to be using. For example, with a value
/// of 0.7, on a 4GB VM we'd like to be using 2.8GB of memory.
memory_usage_fraction_target: f64,
/// When requesting scaling to a certain # of CPUs, the request is rounded up to the
/// nearest multiple of 'cpu_quantum'. For example, if the desired # of CPUs based on the
/// usage is 3.1, and cpu_quantum is 0.25, we'd request 3.25 CPUs.
cpu_quantum: f64,
/// Like 'cpu_quantum', but for memory. In bytes.
mem_quantum: u64,
}
impl Default for Config {
fn default() -> Self {
Self {
sys_buffer_bytes: 100 * MiB,
cgroup_min_overhead_fraction: 0.15,
cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
poll_interval: Duration::from_millis(100),
sliding_window_length: Duration::from_secs(60),
cpu_quantum: 0.25,
mem_quantum: 512 * 1024 * 1024,
load_average_fraction_target: 0.9,
memory_usage_fraction_target: 0.75,
}
}
}
impl Config {
fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 {
// If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup,
// and thus be non-reclaimable, so we should allow for additional memory usage.
//
// If the file cache sits on disk, our desired stable system state is for it to be fully
// page cached (its contents should only be paged to/from disk in situations where we can't
// upscale fast enough). Page-cached memory is reclaimable, so we need to lower the
// threshold for non-reclaimable memory so we scale up *before* the kernel starts paging
// out the file cache.
let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size);
// Even if we're not separately making room for the file cache (if it's in tmpfs), we still
// want our threshold to be met gracefully instead of letting postgres get OOM-killed.
// So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
// remaining above the threshold.
let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64;
memory_remaining_for_cgroup.min(max_threshold)
}
}
impl Runner {
/// Create a new monitor.
#[tracing::instrument(skip_all, fields(?config, ?args))]
@@ -137,23 +124,25 @@ impl Runner {
.await
.context("error creating new dispatcher")?;
let now = Instant::now();
let mut state = Runner {
config,
filecache: None,
cgroup: None,
dispatcher,
counter: 1, // NB: must be odd, see the comment about the field for more.
last_upscale_request_at: None,
last_scale_request: None,
cpu_window: SlidingMax::new(0.0, now),
mem_window: SlidingMax::new(0, now),
kill,
system: System::new(),
};
let mem = get_total_system_memory();
state.system.refresh_specifics(
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
);
let mut file_cache_disk_size = 0;
let mem = state.system.total_memory();
// We need to process file cache initialization before cgroup initialization, so that the memory
// allocated to the file cache is appropriately taken into account when we decide the cgroup's
// memory limits.
if let Some(connstr) = &args.pgconnstr {
info!("initializing file cache");
let config = FileCacheConfig::default();
@@ -184,51 +173,19 @@ impl Runner {
info!("file cache size actually got set to {actual_size}")
}
file_cache_disk_size = actual_size;
state.filecache = Some(file_cache);
}
if let Some(name) = &args.cgroup {
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
// now, and then set limits later.
info!("initializing cgroup");
let cgroup =
CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;
let init_value = cgroup::MemoryHistory {
avg_non_reclaimable: 0,
samples_count: 0,
samples_span: Duration::ZERO,
};
let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));
spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
cgroup.watch(hist_tx).await
});
let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size);
info!(threshold, "set initial cgroup threshold",);
state.cgroup = Some(CgroupState {
watcher: hist_rx,
threshold,
});
}
Ok(state)
}
/// Attempt to downscale filecache + cgroup
/// Attempt to downscale filecache
#[tracing::instrument(skip_all, fields(?target))]
pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
// Nothing to adjust
if self.cgroup.is_none() && self.filecache.is_none() {
info!("no action needed for downscale (no cgroup or file cache enabled)");
return Ok((
true,
"monitor is not managing cgroup or file cache".to_string(),
));
if self.filecache.is_none() {
info!("no action needed for downscale (no file cache enabled)");
return Ok((true, "monitor is not managing file cache".to_string()));
}
let requested_mem = target.mem;
@@ -238,57 +195,14 @@ impl Runner {
.as_ref()
.map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
.unwrap_or(0);
if let Some(cgroup) = &self.cgroup {
let (last_time, last_history) = *cgroup.watcher.borrow();
// NB: The ordering of these conditions is intentional. During startup, we should deny
// downscaling until we have enough information to determine that it's safe to do so
// (i.e. enough samples have come in). But if it's been a while and we *still* haven't
// received any information, we should *fail* instead of just denying downscaling.
//
// `last_time` is set to `Instant::now()` on startup, so checking `last_time.elapsed()`
// serves double-duty: it trips if we haven't received *any* metrics for long enough,
// OR if we haven't received metrics *recently enough*.
//
// TODO: make the duration here configurable.
if last_time.elapsed() > Duration::from_secs(5) {
bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
} else if last_history.samples_count <= 1 {
let status = "haven't received enough cgroup memory stats yet";
info!(status, "discontinuing downscale");
return Ok((false, status.to_owned()));
}
let new_threshold = self
.config
.cgroup_threshold(usable_system_memory, expected_file_cache_size);
let current = last_history.avg_non_reclaimable;
if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
let status = format!(
"{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
"calculated memory threshold too low",
bytes_to_mebibytes(new_threshold),
bytes_to_mebibytes(current),
bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
);
info!(status, "discontinuing downscale");
return Ok((false, status));
}
}
// The downscaling has been approved. Downscale the file cache, then the cgroup.
// The downscaling has been approved. Downscale the file cache.
let mut status = vec![];
let mut file_cache_disk_size = 0;
if let Some(file_cache) = &mut self.filecache {
let actual_usage = file_cache
.set_file_cache_size(expected_file_cache_size)
.await
.context("failed to set file cache size")?;
file_cache_disk_size = actual_usage;
let message = format!(
"set file cache size to {} MiB",
bytes_to_mebibytes(actual_usage),
@@ -297,22 +211,6 @@ impl Runner {
status.push(message);
}
if let Some(cgroup) = &mut self.cgroup {
let new_threshold = self
.config
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
let message = format!(
"set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
bytes_to_mebibytes(cgroup.threshold),
bytes_to_mebibytes(new_threshold),
bytes_to_mebibytes(usable_system_memory)
);
cgroup.threshold = new_threshold;
info!("downscale: {message}");
status.push(message);
}
// TODO: make this status thing less jank
let status = status.join("; ");
Ok((true, status))
@@ -321,15 +219,14 @@ impl Runner {
/// Handle new resources
#[tracing::instrument(skip_all, fields(?resources))]
pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
if self.filecache.is_none() && self.cgroup.is_none() {
info!("no action needed for upscale (no cgroup or file cache enabled)");
if self.filecache.is_none() {
info!("no action needed for upscale (file cache is disabled)");
return Ok(());
}
let new_mem = resources.mem;
let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
let mut file_cache_disk_size = 0;
if let Some(file_cache) = &mut self.filecache {
let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
info!(
@@ -342,7 +239,6 @@ impl Runner {
.set_file_cache_size(expected_usage)
.await
.context("failed to set file cache size")?;
file_cache_disk_size = actual_usage;
if actual_usage != expected_usage {
warn!(
@@ -353,20 +249,6 @@ impl Runner {
}
}
if let Some(cgroup) = &mut self.cgroup {
let new_threshold = self
.config
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
info!(
"set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
bytes_to_mebibytes(cgroup.threshold),
bytes_to_mebibytes(new_threshold),
bytes_to_mebibytes(usable_system_memory)
);
cgroup.threshold = new_threshold;
}
Ok(())
}
@@ -413,10 +295,57 @@ impl Runner {
}
}
/// Calculate the desired size of the VM, based on the CPU and memory usage right now.
pub fn calculate_raw_target(&mut self) -> Resources {
self.system.refresh_specifics(
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
);
// For CPU:
//
// We use the 1 minute load average to measure "current" CPU usage. Target # of
// CPUs is at the point where:
//
// (CPUs) * (LoadAverageFractionTarget) == (load average).
let load_avg_1min = System::load_average().one;
let goal_cpus = load_avg_1min / self.config.load_average_fraction_target;
// For Memory:
//
// Target point is where (Mem) * (MemoryUsageFractionTarget) == (Mem Usage)
let used_memory = self.system.used_memory();
let goal_memory_bytes: u64 =
(used_memory as f64 / self.config.memory_usage_fraction_target) as u64;
debug!("load avg: {load_avg_1min} used memory: {used_memory}");
Resources {
cpu: goal_cpus,
mem: goal_memory_bytes,
}
}
/// To avoid overly fine-grained requests to the agent, round up the request to a
/// multiple of the CPU and memory size of one Compute Unit.
///
/// We still track CPU and memory separately though. The autoscaler agent will combine
/// the CPU and memory requests into a single "# of Compute Units" measure.
fn quantize_resources(&self, res: Resources) -> Resources {
Resources {
cpu: (res.cpu / self.config.cpu_quantum).ceil() * self.config.cpu_quantum,
mem: ((res.mem as f64 / self.config.mem_quantum as f64).ceil()
* self.config.mem_quantum as f64) as u64,
}
}
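A note on ordering, continuing the 0.25-CPU quantum example from the
Config docs above: because `run()` (below) quantizes the raw target
*before* feeding it into the sliding windows, raw CPU goals of 2.02,
2.14, and 2.21 all quantize to 2.25 CPUs, leaving the window max
unchanged — so no new scale request is sent while the goal wanders
within one quantum.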
// TODO: don't propagate errors, probably just warn!?
#[tracing::instrument(skip_all)]
pub async fn run(&mut self) -> anyhow::Result<()> {
info!("starting dispatcher");
let mut ticker = tokio::time::interval(self.config.poll_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
loop {
tokio::select! {
signal = self.kill.recv() => {
@@ -426,25 +355,39 @@ impl Runner {
}
}
// New memory stats from the cgroup, *may* need to request upscaling, if we've
// exceeded the threshold
result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
result.context("failed to receive from cgroup memory stats watcher")?;
// Time to re-evaluate the scaling target
_ = ticker.tick() => {
let now = Instant::now();
let cgroup = self.cgroup.as_ref().unwrap();
// Calculate the desired resources based on current usage
let target_now = self.calculate_raw_target();
let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
// Round it up to the nearest CU sizes, to avoid overly fine-grained
// requests.
let quantized_target_now = self.quantize_resources(target_now);
// If we haven't exceeded the threshold, then we're all ok
if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
continue;
}
// Smoothen using sliding windows.
self.cpu_window.add_sample(quantized_target_now.cpu, now);
self.cpu_window.trim(now - self.config.sliding_window_length);
self.mem_window.add_sample(quantized_target_now.mem, now);
self.mem_window.trim(now - self.config.sliding_window_length);
// Otherwise, we generally want upscaling. But, if it's been less than 1 second
// since the last time we requested upscaling, ignore the event, to avoid
// spamming the agent.
if let Some(t) = self.last_upscale_request_at {
let elapsed = t.elapsed();
let sliding_target = Resources {
cpu: *self.cpu_window.get_max(),
mem: *self.mem_window.get_max(),
};
// If no change, we're all ok.
//
// XXX: If the agent doesn't perform the scaling, should we retry after a while though?
if let Some((last_request_target, last_request_at)) = self.last_scale_request {
if last_request_target == sliding_target {
continue;
}
// If it's been less than 1 second since the last time we requested
// scaling, don't send a request to avoid spamming the agent.
let elapsed = now.duration_since(last_request_at);
if elapsed < Duration::from_secs(1) {
// *Ideally* we'd like to log here that we're ignoring the fact that the
// memory stats are too high, but in practice this can result in
@@ -453,20 +396,28 @@ impl Runner {
// See https://github.com/neondatabase/neon/issues/5865 for more.
continue;
}
info!(
old_target_cpu = last_request_target.cpu,
old_target_mem = last_request_target.mem,
target_cpu = sliding_target.cpu,
target_mem = sliding_target.mem,
"scaling target changed, requesting scaling",
);
} else {
info!(
target_cpu = sliding_target.cpu,
target_mem = sliding_target.mem,
"no previous scaling request, requesting initial scale",
);
}
self.last_upscale_request_at = Some(Instant::now());
info!(
avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
threshold = bytes_to_mebibytes(cgroup.threshold),
"cgroup memory stats are high enough to upscale, requesting upscale",
);
self.last_scale_request = Some((sliding_target, now));
self.counter += 2; // Increment, preserving parity (i.e. keep the
// counter odd). See the field comment for more.
self.dispatcher
.send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
.send(OutboundMsg::new(OutboundMsgKind::ScaleRequest { target: sliding_target }, self.counter))
.await
.context("failed to send message")?;
},

View File

@@ -0,0 +1,72 @@
use std::collections::VecDeque;
/// Maintain a sliding window for calculating Max over a period of time.
///
/// The window maintains a queue of samples. Each sample consists of a
/// "value" and the timestamp that it was measured at.
///
/// The queue is ordered by time; the newest samples are at the front.
/// When a new sample is added, we delete any older samples in the
/// queue with a value less than or equal to the new one, because they
/// cannot affect the max anymore. This means that the queue is always
/// also ordered by value, with the greatest value at the back:
///
///     front                 back
///
///                        #
///                   #    #
///              #    #    #
///         #    #    #    #
///
///
/// V: Value
/// T: Time unit
#[derive(Debug)]
pub struct SlidingMax<V, T> {
samples: VecDeque<(V, T)>,
}
impl<V: std::cmp::PartialOrd, T: std::cmp::PartialOrd> SlidingMax<V, T> {
pub fn new(initial_val: V, initial_time: T) -> SlidingMax<V, T> {
SlidingMax {
samples: VecDeque::from([(initial_val, initial_time)]),
}
}
/// Add a new sample to the window.
///
/// We assume that the time is >= the time of any existing sample
/// in the queue. We don't check that, and the code won't panic if
/// you violate it; it just might not produce the correct result
/// until the disordered samples have fallen off the window.
pub fn add_sample(&mut self, sample: V, time: T) {
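// Pop older samples from the front whose values are <= the new
// sample; the new sample is both newer and at least as large, so
// they can never be the window max again.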
while let Some((v, _t)) = self.samples.front() {
if sample < *v {
break;
} else {
self.samples.pop_front();
continue;
}
}
self.samples.push_front((sample, time))
}
/// Remove samples older than 'threshold' from the window
pub fn trim(&mut self, threshold: T) {
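// Note: always keep at least one sample, so that get_max() has
// something to return.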
while self.samples.len() >= 2 {
let (_v, t) = self.samples.back().unwrap();
if *t < threshold {
self.samples.pop_back();
} else {
break;
}
}
}
/// Get the current max over the window
pub fn get_max(&self) -> &V {
&self.samples.back().unwrap().0
}
}
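
As a quick illustration of this API, a hypothetical usage sketch (not
part of the diff; the import path assumes the `pub mod sliding_window;`
declaration added to lib.rs above):

use std::time::{Duration, Instant};
use vm_monitor::sliding_window::SlidingMax;

fn main() {
    let t0 = Instant::now();
    // Seed the window with an initial sample.
    let mut window = SlidingMax::new(0.0_f64, t0);
    // A spike, followed by a lower reading.
    window.add_sample(2.5, t0 + Duration::from_secs(10));
    window.add_sample(1.0, t0 + Duration::from_secs(20));
    // The max still reflects the spike...
    assert_eq!(*window.get_max(), 2.5);
    // ...until samples older than the threshold are trimmed away.
    window.trim(t0 + Duration::from_secs(11));
    assert_eq!(*window.get_max(), 1.0);
}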

View File

@@ -18,7 +18,6 @@ use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::SystemExt;
use tokio::fs;
use anyhow::Context;
@@ -484,9 +483,10 @@ pub async fn init_tenant_mgr(
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
// Initialize dynamic limits that depend on system resources
let system_memory =
sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
.total_memory();
let system_memory = sysinfo::System::new_with_specifics(
sysinfo::RefreshKind::new().with_memory(sysinfo::MemoryRefreshKind::new().with_ram()),
)
.total_memory();
let max_ephemeral_layer_bytes =
conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
tracing::info!("Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory");
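
As a worked example of this formula (512 is a hypothetical setting
here, not necessarily the shipped default): with
ephemeral_bytes_per_memory_kb = 512 and 8 GiB of system memory,
system_memory / 1024 is 8,388,608 KiB, so the limit is
512 × 8,388,608 bytes = 4 GiB, i.e. half of RAM.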