mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
193 Commits
getpage_ls
...
release-40
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e614a95853 | ||
|
|
850db4cc13 | ||
|
|
8a316b1277 | ||
|
|
4d13bae449 | ||
|
|
49377abd98 | ||
|
|
a6b2f4e54e | ||
|
|
face60d50b | ||
|
|
9768aa27f2 | ||
|
|
96b2e575e1 | ||
|
|
7222777784 | ||
|
|
5469fdede0 | ||
|
|
72aa6b9fdd | ||
|
|
ae0634b7be | ||
|
|
70711f32fa | ||
|
|
52a88af0aa | ||
|
|
b7a43bf817 | ||
|
|
dce91b33a4 | ||
|
|
23ee4f3050 | ||
|
|
46857e8282 | ||
|
|
368ab0ce54 | ||
|
|
a5987eebfd | ||
|
|
6686ede30f | ||
|
|
373c7057cc | ||
|
|
7d6ec16166 | ||
|
|
0e6fdc8a58 | ||
|
|
521438a5c6 | ||
|
|
07d7874bc8 | ||
|
|
1804111a02 | ||
|
|
cd0178efed | ||
|
|
333574be57 | ||
|
|
79a799a143 | ||
|
|
9da06af6c9 | ||
|
|
ce1753d036 | ||
|
|
67db8432b4 | ||
|
|
4e2e44e524 | ||
|
|
ed786104f3 | ||
|
|
84b74f2bd1 | ||
|
|
fec2ad6283 | ||
|
|
98eebd4682 | ||
|
|
2f74287c9b | ||
|
|
aee1bf95e3 | ||
|
|
b9de9d75ff | ||
|
|
7943b709e6 | ||
|
|
d7d066d493 | ||
|
|
e78ac22107 | ||
|
|
76a8f2bb44 | ||
|
|
8d59a8581f | ||
|
|
b1ddd01289 | ||
|
|
6eae4fc9aa | ||
|
|
765455bca2 | ||
|
|
4204960942 | ||
|
|
67345d66ea | ||
|
|
2266ee5971 | ||
|
|
b58445d855 | ||
|
|
36050e7f3d | ||
|
|
33360ed96d | ||
|
|
39a28d1108 | ||
|
|
efa6aa134f | ||
|
|
2c724e56e2 | ||
|
|
feff887c6f | ||
|
|
353d915fcf | ||
|
|
2e38098cbc | ||
|
|
a6fe5ea1ac | ||
|
|
05b0aed0c1 | ||
|
|
cd1705357d | ||
|
|
6bc7561290 | ||
|
|
fbd3ac14b5 | ||
|
|
e437787c8f | ||
|
|
3460dbf90b | ||
|
|
6b89d99677 | ||
|
|
6cc8ea86e4 | ||
|
|
e62a492d6f | ||
|
|
a475cdf642 | ||
|
|
7002c79a47 | ||
|
|
ee6cf357b4 | ||
|
|
e5c2086b5f | ||
|
|
5f1208296a | ||
|
|
88e8e473cd | ||
|
|
b0a77844f6 | ||
|
|
1baf464307 | ||
|
|
e9b8e81cea | ||
|
|
85d6194aa4 | ||
|
|
333a7a68ef | ||
|
|
6aa4e41bee | ||
|
|
840183e51f | ||
|
|
cbccc94b03 | ||
|
|
fce227df22 | ||
|
|
bd787e800f | ||
|
|
4a7704b4a3 | ||
|
|
ff1119da66 | ||
|
|
4c3ba1627b | ||
|
|
1407174fb2 | ||
|
|
ec9dcb1889 | ||
|
|
d11d781afc | ||
|
|
4e44565b71 | ||
|
|
4ed51ad33b | ||
|
|
1c1ebe5537 | ||
|
|
c19cb7f386 | ||
|
|
4b97d31b16 | ||
|
|
923ade3dd7 | ||
|
|
b04e711975 | ||
|
|
afd0a6b39a | ||
|
|
99752286d8 | ||
|
|
15df93363c | ||
|
|
bc0ab741af | ||
|
|
51d9dfeaa3 | ||
|
|
f63cb18155 | ||
|
|
0de603d88e | ||
|
|
240913912a | ||
|
|
91a4ea0de2 | ||
|
|
8608704f49 | ||
|
|
efef68ce99 | ||
|
|
8daefd24da | ||
|
|
46cc8b7982 | ||
|
|
38cd90dd0c | ||
|
|
a51b269f15 | ||
|
|
43bf6d0a0f | ||
|
|
15273a9b66 | ||
|
|
78aca668d0 | ||
|
|
acbf4148ea | ||
|
|
6508540561 | ||
|
|
a41b5244a8 | ||
|
|
2b3189be95 | ||
|
|
248563c595 | ||
|
|
14cd6ca933 | ||
|
|
eb36403e71 | ||
|
|
3c6f779698 | ||
|
|
f67f0c1c11 | ||
|
|
edb02d3299 | ||
|
|
664a69e65b | ||
|
|
478322ebf9 | ||
|
|
802f174072 | ||
|
|
47f9890bae | ||
|
|
262265daad | ||
|
|
300da5b872 | ||
|
|
7b22b5c433 | ||
|
|
ffca97bc1e | ||
|
|
cb356f3259 | ||
|
|
c85374295f | ||
|
|
4992160677 | ||
|
|
bd535b3371 | ||
|
|
d90c5a03af | ||
|
|
2d02cc9079 | ||
|
|
49ad94b99f | ||
|
|
948a217398 | ||
|
|
125381eae7 | ||
|
|
cd01bbc715 | ||
|
|
d8b5e3b88d | ||
|
|
06d25f2186 | ||
|
|
f759b561f3 | ||
|
|
ece0555600 | ||
|
|
73ea0a0b01 | ||
|
|
d8f6d6fd6f | ||
|
|
d24de169a7 | ||
|
|
0816168296 | ||
|
|
277b44d57a | ||
|
|
68c2c3880e | ||
|
|
49da498f65 | ||
|
|
2c76ba3dd7 | ||
|
|
dbe3dc69ad | ||
|
|
8e5bb3ed49 | ||
|
|
ab0be7b8da | ||
|
|
b4c55f5d24 | ||
|
|
ede70d833c | ||
|
|
70c3d18bb0 | ||
|
|
7a491f52c4 | ||
|
|
323c4ecb4f | ||
|
|
3d2466607e | ||
|
|
ed478b39f4 | ||
|
|
91585a558d | ||
|
|
93467eae1f | ||
|
|
f3aac81d19 | ||
|
|
979ad60c19 | ||
|
|
9316cb1b1f | ||
|
|
e7939a527a | ||
|
|
36d26665e1 | ||
|
|
873347f977 | ||
|
|
e814ac16f9 | ||
|
|
ad3055d386 | ||
|
|
94e03eb452 | ||
|
|
380f26ef79 | ||
|
|
3c5b7f59d7 | ||
|
|
fee89f80b5 | ||
|
|
41cce8eaf1 | ||
|
|
f88fe0218d | ||
|
|
cc856eca85 | ||
|
|
cf350c6002 | ||
|
|
0ce6b6a0a3 | ||
|
|
73f247d537 | ||
|
|
960be82183 | ||
|
|
806e5a6c19 | ||
|
|
8d5df07cce | ||
|
|
df7a9d1407 |
@@ -27,8 +27,8 @@ and old one if it exists.
|
||||
* the filecache: a struct that allows communication with the Postgres file cache.
|
||||
On startup, we connect to the filecache and hold on to the connection for the
|
||||
entire monitor lifetime.
|
||||
* the cgroup watcher: the `CgroupWatcher` manages the `neon-postgres` cgroup by
|
||||
listening for `memory.high` events and setting its `memory.{high,max}` values.
|
||||
* the cgroup watcher: the `CgroupWatcher` polls the `neon-postgres` cgroup's memory
|
||||
usage and sends rolling aggregates to the runner.
|
||||
* the runner: the runner marries the filecache and cgroup watcher together,
|
||||
communicating with the agent throught the `Dispatcher`, and then calling filecache
|
||||
and cgroup watcher functions as needed to upscale and downscale
|
||||
|
||||
@@ -1,161 +1,38 @@
|
||||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
fs,
|
||||
pin::pin,
|
||||
sync::atomic::{AtomicU64, Ordering},
|
||||
};
|
||||
use std::fmt::{self, Debug, Formatter};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use anyhow::{anyhow, Context};
|
||||
use cgroups_rs::{
|
||||
freezer::FreezerController,
|
||||
hierarchies::{self, is_cgroup2_unified_mode, UNIFIED_MOUNTPOINT},
|
||||
hierarchies::{self, is_cgroup2_unified_mode},
|
||||
memory::MemController,
|
||||
MaxValue,
|
||||
Subsystem::{Freezer, Mem},
|
||||
Subsystem,
|
||||
};
|
||||
use inotify::{EventStream, Inotify, WatchMask};
|
||||
use tokio::sync::mpsc::{self, error::TryRecvError};
|
||||
use tokio::time::{Duration, Instant};
|
||||
use tokio_stream::{Stream, StreamExt};
|
||||
use tokio::sync::watch;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::protocol::Resources;
|
||||
use crate::MiB;
|
||||
|
||||
/// Monotonically increasing counter of the number of memory.high events
|
||||
/// the cgroup has experienced.
|
||||
///
|
||||
/// We use this to determine if a modification to the `memory.events` file actually
|
||||
/// changed the `high` field. If not, we don't care about the change. When we
|
||||
/// read the file, we check the `high` field in the file against `MEMORY_EVENT_COUNT`
|
||||
/// to see if it changed since last time.
|
||||
pub static MEMORY_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
/// Monotonically increasing counter that gives each cgroup event a unique id.
|
||||
///
|
||||
/// This allows us to answer questions like "did this upscale arrive before this
|
||||
/// memory.high?". This static is also used by the `Sequenced` type to "tag" values
|
||||
/// with a sequence number. As such, prefer to used the `Sequenced` type rather
|
||||
/// than this static directly.
|
||||
static EVENT_SEQUENCE_NUMBER: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
/// A memory event type reported in memory.events.
|
||||
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
|
||||
pub enum MemoryEvent {
|
||||
Low,
|
||||
High,
|
||||
Max,
|
||||
Oom,
|
||||
OomKill,
|
||||
OomGroupKill,
|
||||
}
|
||||
|
||||
impl MemoryEvent {
|
||||
fn as_str(&self) -> &str {
|
||||
match self {
|
||||
MemoryEvent::Low => "low",
|
||||
MemoryEvent::High => "high",
|
||||
MemoryEvent::Max => "max",
|
||||
MemoryEvent::Oom => "oom",
|
||||
MemoryEvent::OomKill => "oom_kill",
|
||||
MemoryEvent::OomGroupKill => "oom_group_kill",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for MemoryEvent {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration for a `CgroupWatcher`
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
// The target difference between the total memory reserved for the cgroup
|
||||
// and the value of the cgroup's memory.high.
|
||||
//
|
||||
// In other words, memory.high + oom_buffer_bytes will equal the total memory that the cgroup may
|
||||
// use (equal to system memory, minus whatever's taken out for the file cache).
|
||||
oom_buffer_bytes: u64,
|
||||
/// Interval at which we should be fetching memory statistics
|
||||
memory_poll_interval: Duration,
|
||||
|
||||
// The amount of memory, in bytes, below a proposed new value for
|
||||
// memory.high that the cgroup's memory usage must be for us to downscale
|
||||
//
|
||||
// In other words, we can downscale only when:
|
||||
//
|
||||
// memory.current + memory_high_buffer_bytes < (proposed) memory.high
|
||||
//
|
||||
// TODO: there's some minor issues with this approach -- in particular, that we might have
|
||||
// memory in use by the kernel's page cache that we're actually ok with getting rid of.
|
||||
pub(crate) memory_high_buffer_bytes: u64,
|
||||
|
||||
// The maximum duration, in milliseconds, that we're allowed to pause
|
||||
// the cgroup for while waiting for the autoscaler-agent to upscale us
|
||||
max_upscale_wait: Duration,
|
||||
|
||||
// The required minimum time, in milliseconds, that we must wait before re-freezing
|
||||
// the cgroup while waiting for the autoscaler-agent to upscale us.
|
||||
do_not_freeze_more_often_than: Duration,
|
||||
|
||||
// The amount of memory, in bytes, that we should periodically increase memory.high
|
||||
// by while waiting for the autoscaler-agent to upscale us.
|
||||
//
|
||||
// This exists to avoid the excessive throttling that happens when a cgroup is above its
|
||||
// memory.high for too long. See more here:
|
||||
// https://github.com/neondatabase/autoscaling/issues/44#issuecomment-1522487217
|
||||
memory_high_increase_by_bytes: u64,
|
||||
|
||||
// The period, in milliseconds, at which we should repeatedly increase the value
|
||||
// of the cgroup's memory.high while we're waiting on upscaling and memory.high
|
||||
// is still being hit.
|
||||
//
|
||||
// Technically speaking, this actually serves as a rate limit to moderate responding to
|
||||
// memory.high events, but these are roughly equivalent if the process is still allocating
|
||||
// memory.
|
||||
memory_high_increase_every: Duration,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
/// Calculate the new value for the cgroups memory.high based on system memory
|
||||
pub fn calculate_memory_high_value(&self, total_system_mem: u64) -> u64 {
|
||||
total_system_mem.saturating_sub(self.oom_buffer_bytes)
|
||||
}
|
||||
/// The number of samples used in constructing aggregated memory statistics
|
||||
memory_history_len: usize,
|
||||
/// The number of most recent samples that will be periodically logged.
|
||||
///
|
||||
/// Each sample is logged exactly once. Increasing this value means that recent samples will be
|
||||
/// logged less frequently, and vice versa.
|
||||
///
|
||||
/// For simplicity, this value must be greater than or equal to `memory_history_len`.
|
||||
memory_history_log_interval: usize,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
oom_buffer_bytes: 100 * MiB,
|
||||
memory_high_buffer_bytes: 100 * MiB,
|
||||
// while waiting for upscale, don't freeze for more than 20ms every 1s
|
||||
max_upscale_wait: Duration::from_millis(20),
|
||||
do_not_freeze_more_often_than: Duration::from_millis(1000),
|
||||
// while waiting for upscale, increase memory.high by 10MiB every 25ms
|
||||
memory_high_increase_by_bytes: 10 * MiB,
|
||||
memory_high_increase_every: Duration::from_millis(25),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Used to represent data that is associated with a certain point in time, such
|
||||
/// as an upscale request or memory.high event.
|
||||
///
|
||||
/// Internally, creating a `Sequenced` uses a static atomic counter to obtain
|
||||
/// a unique sequence number. Sequence numbers are monotonically increasing,
|
||||
/// allowing us to answer questions like "did this upscale happen after this
|
||||
/// memory.high event?" by comparing the sequence numbers of the two events.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Sequenced<T> {
|
||||
seqnum: u64,
|
||||
data: T,
|
||||
}
|
||||
|
||||
impl<T> Sequenced<T> {
|
||||
pub fn new(data: T) -> Self {
|
||||
Self {
|
||||
seqnum: EVENT_SEQUENCE_NUMBER.fetch_add(1, Ordering::AcqRel),
|
||||
data,
|
||||
memory_poll_interval: Duration::from_millis(100),
|
||||
memory_history_len: 5, // use 500ms of history for decision-making
|
||||
memory_history_log_interval: 20, // but only log every ~2s (otherwise it's spammy)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -170,74 +47,14 @@ impl<T> Sequenced<T> {
|
||||
pub struct CgroupWatcher {
|
||||
pub config: Config,
|
||||
|
||||
/// The sequence number of the last upscale.
|
||||
///
|
||||
/// If we receive a memory.high event that has a _lower_ sequence number than
|
||||
/// `last_upscale_seqnum`, then we know it occured before the upscale, and we
|
||||
/// can safely ignore it.
|
||||
///
|
||||
/// Note: Like the `events` field, this doesn't _need_ interior mutability but we
|
||||
/// use it anyways so that methods take `&self`, not `&mut self`.
|
||||
last_upscale_seqnum: AtomicU64,
|
||||
|
||||
/// A channel on which we send messages to request upscale from the dispatcher.
|
||||
upscale_requester: mpsc::Sender<()>,
|
||||
|
||||
/// The actual cgroup we are watching and managing.
|
||||
cgroup: cgroups_rs::Cgroup,
|
||||
}
|
||||
|
||||
/// Read memory.events for the desired event type.
|
||||
///
|
||||
/// `path` specifies the path to the desired `memory.events` file.
|
||||
/// For more info, see the `memory.events` section of the [kernel docs]
|
||||
/// <https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files>
|
||||
fn get_event_count(path: &str, event: MemoryEvent) -> anyhow::Result<u64> {
|
||||
let contents = fs::read_to_string(path)
|
||||
.with_context(|| format!("failed to read memory.events from {path}"))?;
|
||||
|
||||
// Then contents of the file look like:
|
||||
// low 42
|
||||
// high 101
|
||||
// ...
|
||||
contents
|
||||
.lines()
|
||||
.filter_map(|s| s.split_once(' '))
|
||||
.find(|(e, _)| *e == event.as_str())
|
||||
.ok_or_else(|| anyhow!("failed to find entry for memory.{event} events in {path}"))
|
||||
.and_then(|(_, count)| {
|
||||
count
|
||||
.parse::<u64>()
|
||||
.with_context(|| format!("failed to parse memory.{event} as u64"))
|
||||
})
|
||||
}
|
||||
|
||||
/// Create an event stream that produces events whenever the file at the provided
|
||||
/// path is modified.
|
||||
fn create_file_watcher(path: &str) -> anyhow::Result<EventStream<[u8; 1024]>> {
|
||||
info!("creating file watcher for {path}");
|
||||
let inotify = Inotify::init().context("failed to initialize file watcher")?;
|
||||
inotify
|
||||
.watches()
|
||||
.add(path, WatchMask::MODIFY)
|
||||
.with_context(|| format!("failed to start watching {path}"))?;
|
||||
inotify
|
||||
// The inotify docs use [0u8; 1024] so we'll just copy them. We only need
|
||||
// to store one event at a time - if the event gets written over, that's
|
||||
// ok. We still see that there is an event. For more information, see:
|
||||
// https://man7.org/linux/man-pages/man7/inotify.7.html
|
||||
.into_event_stream([0u8; 1024])
|
||||
.context("failed to start inotify event stream")
|
||||
}
|
||||
|
||||
impl CgroupWatcher {
|
||||
/// Create a new `CgroupWatcher`.
|
||||
#[tracing::instrument(skip_all, fields(%name))]
|
||||
pub fn new(
|
||||
name: String,
|
||||
// A channel on which to send upscale requests
|
||||
upscale_requester: mpsc::Sender<()>,
|
||||
) -> anyhow::Result<(Self, impl Stream<Item = Sequenced<u64>>)> {
|
||||
pub fn new(name: String) -> anyhow::Result<Self> {
|
||||
// TODO: clarify exactly why we need v2
|
||||
// Make sure cgroups v2 (aka unified) are supported
|
||||
if !is_cgroup2_unified_mode() {
|
||||
@@ -245,410 +62,203 @@ impl CgroupWatcher {
|
||||
}
|
||||
let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name);
|
||||
|
||||
// Start monitoring the cgroup for memory events. In general, for
|
||||
// cgroups v2 (aka unified), metrics are reported in files like
|
||||
// > `/sys/fs/cgroup/{name}/{metric}`
|
||||
// We are looking for `memory.high` events, which are stored in the
|
||||
// file `memory.events`. For more info, see the `memory.events` section
|
||||
// of https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files
|
||||
let path = format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name);
|
||||
let memory_events = create_file_watcher(&path)
|
||||
.with_context(|| format!("failed to create event watcher for {path}"))?
|
||||
// This would be nice with with .inspect_err followed by .ok
|
||||
.filter_map(move |_| match get_event_count(&path, MemoryEvent::High) {
|
||||
Ok(high) => Some(high),
|
||||
Err(error) => {
|
||||
// TODO: Might want to just panic here
|
||||
warn!(?error, "failed to read high events count from {}", &path);
|
||||
None
|
||||
}
|
||||
})
|
||||
// Only report the event if the memory.high count increased
|
||||
.filter_map(|high| {
|
||||
if MEMORY_EVENT_COUNT.fetch_max(high, Ordering::AcqRel) < high {
|
||||
Some(high)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(Sequenced::new);
|
||||
|
||||
let initial_count = get_event_count(
|
||||
&format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name),
|
||||
MemoryEvent::High,
|
||||
)?;
|
||||
|
||||
info!(initial_count, "initial memory.high event count");
|
||||
|
||||
// Hard update `MEMORY_EVENT_COUNT` since there could have been processes
|
||||
// running in the cgroup before that caused it to be non-zero.
|
||||
MEMORY_EVENT_COUNT.fetch_max(initial_count, Ordering::AcqRel);
|
||||
|
||||
Ok((
|
||||
Self {
|
||||
cgroup,
|
||||
upscale_requester,
|
||||
last_upscale_seqnum: AtomicU64::new(0),
|
||||
config: Default::default(),
|
||||
},
|
||||
memory_events,
|
||||
))
|
||||
Ok(Self {
|
||||
cgroup,
|
||||
config: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
/// The entrypoint for the `CgroupWatcher`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn watch<E>(
|
||||
pub async fn watch(
|
||||
&self,
|
||||
// These are ~dependency injected~ (fancy, I know) because this function
|
||||
// should never return.
|
||||
// -> therefore: when we tokio::spawn it, we don't await the JoinHandle.
|
||||
// -> therefore: if we want to stick it in an Arc so many threads can access
|
||||
// it, methods can never take mutable access.
|
||||
// - note: we use the Arc strategy so that a) we can call this function
|
||||
// right here and b) the runner can call the set/get_memory methods
|
||||
// -> since calling recv() on a tokio::sync::mpsc::Receiver takes &mut self,
|
||||
// we just pass them in here instead of holding them in fields, as that
|
||||
// would require this method to take &mut self.
|
||||
mut upscales: mpsc::Receiver<Sequenced<Resources>>,
|
||||
events: E,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
E: Stream<Item = Sequenced<u64>>,
|
||||
{
|
||||
let mut wait_to_freeze = pin!(tokio::time::sleep(Duration::ZERO));
|
||||
let mut last_memory_high_increase_at: Option<Instant> = None;
|
||||
let mut events = pin!(events);
|
||||
|
||||
// Are we waiting to be upscaled? Could be true if we request upscale due
|
||||
// to a memory.high event and it does not arrive in time.
|
||||
let mut waiting_on_upscale = false;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
upscale = upscales.recv() => {
|
||||
let Sequenced { seqnum, data } = upscale
|
||||
.context("failed to listen on upscale notification channel")?;
|
||||
waiting_on_upscale = false;
|
||||
last_memory_high_increase_at = None;
|
||||
self.last_upscale_seqnum.store(seqnum, Ordering::Release);
|
||||
info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
|
||||
}
|
||||
event = events.next() => {
|
||||
let Some(Sequenced { seqnum, .. }) = event else {
|
||||
bail!("failed to listen for memory.high events")
|
||||
};
|
||||
// The memory.high came before our last upscale, so we consider
|
||||
// it resolved
|
||||
if self.last_upscale_seqnum.fetch_max(seqnum, Ordering::AcqRel) > seqnum {
|
||||
info!(
|
||||
"received memory.high event, but it came before our last upscale -> ignoring it"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// The memory.high came after our latest upscale. We don't
|
||||
// want to do anything yet, so peek the next event in hopes
|
||||
// that it's an upscale.
|
||||
if let Some(upscale_num) = self
|
||||
.upscaled(&mut upscales)
|
||||
.context("failed to check if we were upscaled")?
|
||||
{
|
||||
if upscale_num > seqnum {
|
||||
info!(
|
||||
"received memory.high event, but it came before our last upscale -> ignoring it"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// If it's been long enough since we last froze, freeze the
|
||||
// cgroup and request upscale
|
||||
if wait_to_freeze.is_elapsed() {
|
||||
info!("received memory.high event -> requesting upscale");
|
||||
waiting_on_upscale = self
|
||||
.handle_memory_high_event(&mut upscales)
|
||||
.await
|
||||
.context("failed to handle upscale")?;
|
||||
wait_to_freeze
|
||||
.as_mut()
|
||||
.reset(Instant::now() + self.config.do_not_freeze_more_often_than);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ok, we can't freeze, just request upscale
|
||||
if !waiting_on_upscale {
|
||||
info!("received memory.high event, but too soon to refreeze -> requesting upscale");
|
||||
|
||||
// Make check to make sure we haven't been upscaled in the
|
||||
// meantine (can happen if the agent independently decides
|
||||
// to upscale us again)
|
||||
if self
|
||||
.upscaled(&mut upscales)
|
||||
.context("failed to check if we were upscaled")?
|
||||
.is_some()
|
||||
{
|
||||
info!("no need to request upscaling because we got upscaled");
|
||||
continue;
|
||||
}
|
||||
self.upscale_requester
|
||||
.send(())
|
||||
.await
|
||||
.context("failed to request upscale")?;
|
||||
waiting_on_upscale = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Shoot, we can't freeze or and we're still waiting on upscale,
|
||||
// increase memory.high to reduce throttling
|
||||
let can_increase_memory_high = match last_memory_high_increase_at {
|
||||
None => true,
|
||||
Some(t) => t.elapsed() > self.config.memory_high_increase_every,
|
||||
};
|
||||
if can_increase_memory_high {
|
||||
info!(
|
||||
"received memory.high event, \
|
||||
but too soon to refreeze and already requested upscale \
|
||||
-> increasing memory.high"
|
||||
);
|
||||
|
||||
// Make check to make sure we haven't been upscaled in the
|
||||
// meantine (can happen if the agent independently decides
|
||||
// to upscale us again)
|
||||
if self
|
||||
.upscaled(&mut upscales)
|
||||
.context("failed to check if we were upscaled")?
|
||||
.is_some()
|
||||
{
|
||||
info!("no need to increase memory.high because got upscaled");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Request upscale anyways (the agent will handle deduplicating
|
||||
// requests)
|
||||
self.upscale_requester
|
||||
.send(())
|
||||
.await
|
||||
.context("failed to request upscale")?;
|
||||
|
||||
let memory_high =
|
||||
self.get_memory_high_bytes().context("failed to get memory.high")?;
|
||||
let new_high = memory_high + self.config.memory_high_increase_by_bytes;
|
||||
info!(
|
||||
current_high_bytes = memory_high,
|
||||
new_high_bytes = new_high,
|
||||
"updating memory.high"
|
||||
);
|
||||
self.set_memory_high_bytes(new_high)
|
||||
.context("failed to set memory.high")?;
|
||||
last_memory_high_increase_at = Some(Instant::now());
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("received memory.high event, but can't do anything");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a `memory.high`, returning whether we are still waiting on upscale
|
||||
/// by the time the function returns.
|
||||
///
|
||||
/// The general plan for handling a `memory.high` event is as follows:
|
||||
/// 1. Freeze the cgroup
|
||||
/// 2. Start a timer for `self.config.max_upscale_wait`
|
||||
/// 3. Request upscale
|
||||
/// 4. After the timer elapses or we receive upscale, thaw the cgroup.
|
||||
/// 5. Return whether or not we are still waiting for upscale. If we are,
|
||||
/// we'll increase the cgroups memory.high to avoid getting oom killed
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn handle_memory_high_event(
|
||||
&self,
|
||||
upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
|
||||
) -> anyhow::Result<bool> {
|
||||
// Immediately freeze the cgroup before doing anything else.
|
||||
info!("received memory.high event -> freezing cgroup");
|
||||
self.freeze().context("failed to freeze cgroup")?;
|
||||
|
||||
// We'll use this for logging durations
|
||||
let start_time = Instant::now();
|
||||
|
||||
// Await the upscale until we have to unfreeze
|
||||
let timed =
|
||||
tokio::time::timeout(self.config.max_upscale_wait, self.await_upscale(upscales));
|
||||
|
||||
// Request the upscale
|
||||
info!(
|
||||
wait = ?self.config.max_upscale_wait,
|
||||
"sending request for immediate upscaling",
|
||||
);
|
||||
self.upscale_requester
|
||||
.send(())
|
||||
.await
|
||||
.context("failed to request upscale")?;
|
||||
|
||||
let waiting_on_upscale = match timed.await {
|
||||
Ok(Ok(())) => {
|
||||
info!(elapsed = ?start_time.elapsed(), "received upscale in time");
|
||||
false
|
||||
}
|
||||
// **important**: unfreeze the cgroup before ?-reporting the error
|
||||
Ok(Err(e)) => {
|
||||
info!("error waiting for upscale -> thawing cgroup");
|
||||
self.thaw()
|
||||
.context("failed to thaw cgroup after errored waiting for upscale")?;
|
||||
Err(e.context("failed to await upscale"))?
|
||||
}
|
||||
Err(_) => {
|
||||
info!(elapsed = ?self.config.max_upscale_wait, "timed out waiting for upscale");
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
info!("thawing cgroup");
|
||||
self.thaw().context("failed to thaw cgroup")?;
|
||||
|
||||
Ok(waiting_on_upscale)
|
||||
}
|
||||
|
||||
/// Checks whether we were just upscaled, returning the upscale's sequence
|
||||
/// number if so.
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn upscaled(
|
||||
&self,
|
||||
upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
|
||||
) -> anyhow::Result<Option<u64>> {
|
||||
let Sequenced { seqnum, data } = match upscales.try_recv() {
|
||||
Ok(upscale) => upscale,
|
||||
Err(TryRecvError::Empty) => return Ok(None),
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
bail!("upscale notification channel was disconnected")
|
||||
}
|
||||
};
|
||||
|
||||
// Make sure to update the last upscale sequence number
|
||||
self.last_upscale_seqnum.store(seqnum, Ordering::Release);
|
||||
info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
|
||||
Ok(Some(seqnum))
|
||||
}
|
||||
|
||||
/// Await an upscale event, discarding any `memory.high` events received in
|
||||
/// the process.
|
||||
///
|
||||
/// This is used in `handle_memory_high_event`, where we need to listen
|
||||
/// for upscales in particular so we know if we can thaw the cgroup early.
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn await_upscale(
|
||||
&self,
|
||||
upscales: &mut mpsc::Receiver<Sequenced<Resources>>,
|
||||
updates: watch::Sender<(Instant, MemoryHistory)>,
|
||||
) -> anyhow::Result<()> {
|
||||
let Sequenced { seqnum, .. } = upscales
|
||||
.recv()
|
||||
.await
|
||||
.context("error listening for upscales")?;
|
||||
// this requirement makes the code a bit easier to work with; see the config for more.
|
||||
assert!(self.config.memory_history_len <= self.config.memory_history_log_interval);
|
||||
|
||||
self.last_upscale_seqnum.store(seqnum, Ordering::Release);
|
||||
Ok(())
|
||||
}
|
||||
let mut ticker = tokio::time::interval(self.config.memory_poll_interval);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
// ticker.reset_immediately(); // FIXME: enable this once updating to tokio >= 1.30.0
|
||||
|
||||
/// Get the cgroup's name.
|
||||
pub fn path(&self) -> &str {
|
||||
self.cgroup.path()
|
||||
}
|
||||
}
|
||||
let mem_controller = self.memory()?;
|
||||
|
||||
// Methods for manipulating the actual cgroup
|
||||
impl CgroupWatcher {
|
||||
/// Get a handle on the freezer subsystem.
|
||||
fn freezer(&self) -> anyhow::Result<&FreezerController> {
|
||||
if let Some(Freezer(freezer)) = self
|
||||
.cgroup
|
||||
.subsystems()
|
||||
.iter()
|
||||
.find(|sub| matches!(sub, Freezer(_)))
|
||||
{
|
||||
Ok(freezer)
|
||||
} else {
|
||||
anyhow::bail!("could not find freezer subsystem")
|
||||
// buffer for samples that will be logged. once full, it remains so.
|
||||
let history_log_len = self.config.memory_history_log_interval;
|
||||
let mut history_log_buf = vec![MemoryStatus::zeroed(); history_log_len];
|
||||
|
||||
for t in 0_u64.. {
|
||||
ticker.tick().await;
|
||||
|
||||
let now = Instant::now();
|
||||
let mem = Self::memory_usage(mem_controller);
|
||||
|
||||
let i = t as usize % history_log_len;
|
||||
history_log_buf[i] = mem;
|
||||
|
||||
// We're taking *at most* memory_history_len values; we may be bounded by the total
|
||||
// number of samples that have come in so far.
|
||||
let samples_count = (t + 1).min(self.config.memory_history_len as u64) as usize;
|
||||
// NB: in `ring_buf_recent_values_iter`, `i` is *inclusive*, which matches the fact
|
||||
// that we just inserted a value there, so the end of the iterator will *include* the
|
||||
// value at i, rather than stopping just short of it.
|
||||
let samples = ring_buf_recent_values_iter(&history_log_buf, i, samples_count);
|
||||
|
||||
let summary = MemoryHistory {
|
||||
avg_non_reclaimable: samples.map(|h| h.non_reclaimable).sum::<u64>()
|
||||
/ samples_count as u64,
|
||||
samples_count,
|
||||
samples_span: self.config.memory_poll_interval * (samples_count - 1) as u32,
|
||||
};
|
||||
|
||||
// Log the current history if it's time to do so. Because `history_log_buf` has length
|
||||
// equal to the logging interval, we can just log the entire buffer every time we set
|
||||
// the last entry, which also means that for this log line, we can ignore that it's a
|
||||
// ring buffer (because all the entries are in order of increasing time).
|
||||
if i == history_log_len - 1 {
|
||||
info!(
|
||||
history = ?MemoryStatus::debug_slice(&history_log_buf),
|
||||
summary = ?summary,
|
||||
"Recent cgroup memory statistics history"
|
||||
);
|
||||
}
|
||||
|
||||
updates
|
||||
.send((now, summary))
|
||||
.context("failed to send MemoryHistory")?;
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to freeze the cgroup.
|
||||
pub fn freeze(&self) -> anyhow::Result<()> {
|
||||
self.freezer()
|
||||
.context("failed to get freezer subsystem")?
|
||||
.freeze()
|
||||
.context("failed to freeze")
|
||||
}
|
||||
|
||||
/// Attempt to thaw the cgroup.
|
||||
pub fn thaw(&self) -> anyhow::Result<()> {
|
||||
self.freezer()
|
||||
.context("failed to get freezer subsystem")?
|
||||
.thaw()
|
||||
.context("failed to thaw")
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
/// Get a handle on the memory subsystem.
|
||||
///
|
||||
/// Note: this method does not require `self.memory_update_lock` because
|
||||
/// getting a handle to the subsystem does not access any of the files we
|
||||
/// care about, such as memory.high and memory.events
|
||||
fn memory(&self) -> anyhow::Result<&MemController> {
|
||||
if let Some(Mem(memory)) = self
|
||||
.cgroup
|
||||
self.cgroup
|
||||
.subsystems()
|
||||
.iter()
|
||||
.find(|sub| matches!(sub, Mem(_)))
|
||||
{
|
||||
Ok(memory)
|
||||
} else {
|
||||
anyhow::bail!("could not find memory subsystem")
|
||||
}
|
||||
}
|
||||
|
||||
/// Get cgroup current memory usage.
|
||||
pub fn current_memory_usage(&self) -> anyhow::Result<u64> {
|
||||
Ok(self
|
||||
.memory()
|
||||
.context("failed to get memory subsystem")?
|
||||
.memory_stat()
|
||||
.usage_in_bytes)
|
||||
}
|
||||
|
||||
/// Set cgroup memory.high threshold.
|
||||
pub fn set_memory_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
|
||||
self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64))
|
||||
}
|
||||
|
||||
/// Set the cgroup's memory.high to 'max', disabling it.
|
||||
pub fn unset_memory_high(&self) -> anyhow::Result<()> {
|
||||
self.set_memory_high_internal(MaxValue::Max)
|
||||
}
|
||||
|
||||
fn set_memory_high_internal(&self, value: MaxValue) -> anyhow::Result<()> {
|
||||
self.memory()
|
||||
.context("failed to get memory subsystem")?
|
||||
.set_mem(cgroups_rs::memory::SetMemory {
|
||||
low: None,
|
||||
high: Some(value),
|
||||
min: None,
|
||||
max: None,
|
||||
.find_map(|sub| match sub {
|
||||
Subsystem::Mem(c) => Some(c),
|
||||
_ => None,
|
||||
})
|
||||
.map_err(anyhow::Error::from)
|
||||
.ok_or_else(|| anyhow!("could not find memory subsystem"))
|
||||
}
|
||||
|
||||
/// Get memory.high threshold.
|
||||
pub fn get_memory_high_bytes(&self) -> anyhow::Result<u64> {
|
||||
let high = self
|
||||
.memory()
|
||||
.context("failed to get memory subsystem while getting memory statistics")?
|
||||
.get_mem()
|
||||
.map(|mem| mem.high)
|
||||
.context("failed to get memory statistics from subsystem")?;
|
||||
match high {
|
||||
Some(MaxValue::Max) => Ok(i64::MAX as u64),
|
||||
Some(MaxValue::Value(high)) => Ok(high as u64),
|
||||
None => anyhow::bail!("failed to read memory.high from memory subsystem"),
|
||||
/// Given a handle on the memory subsystem, returns the current memory information
|
||||
fn memory_usage(mem_controller: &MemController) -> MemoryStatus {
|
||||
let stat = mem_controller.memory_stat().stat;
|
||||
MemoryStatus {
|
||||
non_reclaimable: stat.active_anon + stat.inactive_anon,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function for `CgroupWatcher::watch`
|
||||
fn ring_buf_recent_values_iter<T>(
|
||||
buf: &[T],
|
||||
last_value_idx: usize,
|
||||
count: usize,
|
||||
) -> impl '_ + Iterator<Item = &T> {
|
||||
// Assertion carried over from `CgroupWatcher::watch`, to make the logic in this function
|
||||
// easier (we only have to add `buf.len()` once, rather than a dynamic number of times).
|
||||
assert!(count <= buf.len());
|
||||
|
||||
buf.iter()
|
||||
// 'cycle' because the values could wrap around
|
||||
.cycle()
|
||||
// with 'cycle', this skip is more like 'offset', and functionally this is
|
||||
// offsettting by 'last_value_idx - count (mod buf.len())', but we have to be
|
||||
// careful to avoid underflow, so we pre-add buf.len().
|
||||
// The '+ 1' is because `last_value_idx` is inclusive, rather than exclusive.
|
||||
.skip((buf.len() + last_value_idx + 1 - count) % buf.len())
|
||||
.take(count)
|
||||
}
|
||||
|
||||
/// Summary of recent memory usage
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct MemoryHistory {
|
||||
/// Rolling average of non-reclaimable memory usage samples over the last `history_period`
|
||||
pub avg_non_reclaimable: u64,
|
||||
|
||||
/// The number of samples used to construct this summary
|
||||
pub samples_count: usize,
|
||||
/// Total timespan between the first and last sample used for this summary
|
||||
pub samples_span: Duration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct MemoryStatus {
|
||||
non_reclaimable: u64,
|
||||
}
|
||||
|
||||
impl MemoryStatus {
|
||||
fn zeroed() -> Self {
|
||||
MemoryStatus { non_reclaimable: 0 }
|
||||
}
|
||||
|
||||
fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
|
||||
struct DS<'a>(&'a [MemoryStatus]);
|
||||
|
||||
impl<'a> Debug for DS<'a> {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
f.debug_struct("[MemoryStatus]")
|
||||
.field(
|
||||
"non_reclaimable[..]",
|
||||
&Fields(self.0, |stat: &MemoryStatus| {
|
||||
BytesToGB(stat.non_reclaimable)
|
||||
}),
|
||||
)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
struct Fields<'a, F>(&'a [MemoryStatus], F);
|
||||
|
||||
impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
f.debug_list().entries(self.0.iter().map(&self.1)).finish()
|
||||
}
|
||||
}
|
||||
|
||||
struct BytesToGB(u64);
|
||||
|
||||
impl Debug for BytesToGB {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
f.write_fmt(format_args!(
|
||||
"{:.3}Gi",
|
||||
self.0 as f64 / (1_u64 << 30) as f64
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
DS(slice)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn ring_buf_iter() {
|
||||
let buf = vec![0_i32, 1, 2, 3, 4, 5, 6, 7, 8, 9];
|
||||
|
||||
let values = |offset, count| {
|
||||
super::ring_buf_recent_values_iter(&buf, offset, count)
|
||||
.copied()
|
||||
.collect::<Vec<i32>>()
|
||||
};
|
||||
|
||||
// Boundary conditions: start, end, and entire thing:
|
||||
assert_eq!(values(0, 1), [0]);
|
||||
assert_eq!(values(3, 4), [0, 1, 2, 3]);
|
||||
assert_eq!(values(9, 4), [6, 7, 8, 9]);
|
||||
assert_eq!(values(9, 10), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
|
||||
|
||||
// "normal" operation: no wraparound
|
||||
assert_eq!(values(7, 4), [4, 5, 6, 7]);
|
||||
|
||||
// wraparound:
|
||||
assert_eq!(values(0, 4), [7, 8, 9, 0]);
|
||||
assert_eq!(values(1, 4), [8, 9, 0, 1]);
|
||||
assert_eq!(values(2, 4), [9, 0, 1, 2]);
|
||||
assert_eq!(values(2, 10), [3, 4, 5, 6, 7, 8, 9, 0, 1, 2]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,12 +12,10 @@ use futures::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
SinkExt, StreamExt,
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::info;
|
||||
|
||||
use crate::cgroup::Sequenced;
|
||||
use crate::protocol::{
|
||||
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, Resources, PROTOCOL_MAX_VERSION,
|
||||
OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, PROTOCOL_MAX_VERSION,
|
||||
PROTOCOL_MIN_VERSION,
|
||||
};
|
||||
|
||||
@@ -36,13 +34,6 @@ pub struct Dispatcher {
|
||||
/// We send messages to the agent through `sink`
|
||||
sink: SplitSink<WebSocket, Message>,
|
||||
|
||||
/// Used to notify the cgroup when we are upscaled.
|
||||
pub(crate) notify_upscale_events: mpsc::Sender<Sequenced<Resources>>,
|
||||
|
||||
/// When the cgroup requests upscale it will send on this channel. In response
|
||||
/// we send an `UpscaleRequst` to the agent.
|
||||
pub(crate) request_upscale_events: mpsc::Receiver<()>,
|
||||
|
||||
/// The protocol version we have agreed to use with the agent. This is negotiated
|
||||
/// during the creation of the dispatcher, and should be the highest shared protocol
|
||||
/// version.
|
||||
@@ -61,11 +52,7 @@ impl Dispatcher {
|
||||
/// 1. Wait for the agent to sent the range of protocols it supports.
|
||||
/// 2. Send a protocol version that works for us as well, or an error if there
|
||||
/// is no compatible version.
|
||||
pub async fn new(
|
||||
stream: WebSocket,
|
||||
notify_upscale_events: mpsc::Sender<Sequenced<Resources>>,
|
||||
request_upscale_events: mpsc::Receiver<()>,
|
||||
) -> anyhow::Result<Self> {
|
||||
pub async fn new(stream: WebSocket) -> anyhow::Result<Self> {
|
||||
let (mut sink, mut source) = stream.split();
|
||||
|
||||
// Figure out the highest protocol version we both support
|
||||
@@ -119,22 +106,10 @@ impl Dispatcher {
|
||||
Ok(Self {
|
||||
sink,
|
||||
source,
|
||||
notify_upscale_events,
|
||||
request_upscale_events,
|
||||
proto_version: highest_shared_version,
|
||||
})
|
||||
}
|
||||
|
||||
/// Notify the cgroup manager that we have received upscale and wait for
|
||||
/// the acknowledgement.
|
||||
#[tracing::instrument(skip_all, fields(?resources))]
|
||||
pub async fn notify_upscale(&self, resources: Sequenced<Resources>) -> anyhow::Result<()> {
|
||||
self.notify_upscale_events
|
||||
.send(resources)
|
||||
.await
|
||||
.context("failed to send resources and oneshot sender across channel")
|
||||
}
|
||||
|
||||
/// Send a message to the agent.
|
||||
///
|
||||
/// Although this function is small, it has one major benefit: it is the only
|
||||
|
||||
@@ -5,18 +5,16 @@
|
||||
//! all functionality.
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use axum::extract::ws::{Message, WebSocket};
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::{broadcast, watch};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::cgroup::{CgroupWatcher, Sequenced};
|
||||
use crate::cgroup::{self, CgroupWatcher};
|
||||
use crate::dispatcher::Dispatcher;
|
||||
use crate::filecache::{FileCacheConfig, FileCacheState};
|
||||
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
|
||||
@@ -28,7 +26,7 @@ use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args
|
||||
pub struct Runner {
|
||||
config: Config,
|
||||
filecache: Option<FileCacheState>,
|
||||
cgroup: Option<Arc<CgroupWatcher>>,
|
||||
cgroup: Option<CgroupState>,
|
||||
dispatcher: Dispatcher,
|
||||
|
||||
/// We "mint" new message ids by incrementing this counter and taking the value.
|
||||
@@ -45,6 +43,14 @@ pub struct Runner {
|
||||
kill: broadcast::Receiver<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct CgroupState {
|
||||
watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
|
||||
/// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
|
||||
/// requests.
|
||||
threshold: u64,
|
||||
}
|
||||
|
||||
/// Configuration for a `Runner`
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
@@ -62,16 +68,56 @@ pub struct Config {
|
||||
/// upscale resource amounts (because we might not *actually* have been upscaled yet). This field
|
||||
/// should be removed once we have a better solution there.
|
||||
sys_buffer_bytes: u64,
|
||||
|
||||
/// Minimum fraction of total system memory reserved *before* the the cgroup threshold; in
|
||||
/// other words, providing a ceiling for the highest value of the threshold by enforcing that
|
||||
/// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
|
||||
/// threshold.
|
||||
///
|
||||
/// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
|
||||
/// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
|
||||
/// memory.
|
||||
///
|
||||
/// The default value of `0.15` means that we *guarantee* sending upscale requests if the
|
||||
/// cgroup is using more than 85% of total memory (even if we're *not* separately reserving
|
||||
/// memory for the file cache).
|
||||
cgroup_min_overhead_fraction: f64,
|
||||
|
||||
cgroup_downscale_threshold_buffer_bytes: u64,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
sys_buffer_bytes: 100 * MiB,
|
||||
cgroup_min_overhead_fraction: 0.15,
|
||||
cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 {
|
||||
// If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup,
|
||||
// and thus be non-reclaimable, so we should allow for additional memory usage.
|
||||
//
|
||||
// If the file cache sits on disk, our desired stable system state is for it to be fully
|
||||
// page cached (its contents should only be paged to/from disk in situations where we can't
|
||||
// upscale fast enough). Page-cached memory is reclaimable, so we need to lower the
|
||||
// threshold for non-reclaimable memory so we scale up *before* the kernel starts paging
|
||||
// out the file cache.
|
||||
let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size);
|
||||
|
||||
// Even if we're not separately making room for the file cache (if it's in tmpfs), we still
|
||||
// want our threshold to be met gracefully instead of letting postgres get OOM-killed.
|
||||
// So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
|
||||
// remaining above the threshold.
|
||||
let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64;
|
||||
|
||||
memory_remaining_for_cgroup.min(max_threshold)
|
||||
}
|
||||
}
|
||||
|
||||
impl Runner {
|
||||
/// Create a new monitor.
|
||||
#[tracing::instrument(skip_all, fields(?config, ?args))]
|
||||
@@ -87,12 +133,7 @@ impl Runner {
|
||||
"invalid monitor Config: sys_buffer_bytes cannot be 0"
|
||||
);
|
||||
|
||||
// *NOTE*: the dispatcher and cgroup manager talk through these channels
|
||||
// so make sure they each get the correct half, nothing is droppped, etc.
|
||||
let (notified_send, notified_recv) = mpsc::channel(1);
|
||||
let (requesting_send, requesting_recv) = mpsc::channel(1);
|
||||
|
||||
let dispatcher = Dispatcher::new(ws, notified_send, requesting_recv)
|
||||
let dispatcher = Dispatcher::new(ws)
|
||||
.await
|
||||
.context("error creating new dispatcher")?;
|
||||
|
||||
@@ -106,46 +147,10 @@ impl Runner {
|
||||
kill,
|
||||
};
|
||||
|
||||
// If we have both the cgroup and file cache integrations enabled, it's possible for
|
||||
// temporary failures to result in cgroup throttling (from memory.high), that in turn makes
|
||||
// it near-impossible to connect to the file cache (because it times out). Unfortunately,
|
||||
// we *do* still want to determine the file cache size before setting the cgroup's
|
||||
// memory.high, so it's not as simple as just swapping the order.
|
||||
//
|
||||
// Instead, the resolution here is that on vm-monitor startup (note: happens on each
|
||||
// connection from autoscaler-agent, possibly multiple times per compute_ctl lifecycle), we
|
||||
// temporarily unset memory.high, to allow any existing throttling to dissipate. It's a bit
|
||||
// of a hacky solution, but helps with reliability.
|
||||
if let Some(name) = &args.cgroup {
|
||||
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
|
||||
// now, and then set limits later.
|
||||
info!("initializing cgroup");
|
||||
|
||||
let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send)
|
||||
.context("failed to create cgroup manager")?;
|
||||
|
||||
info!("temporarily unsetting memory.high");
|
||||
|
||||
// Temporarily un-set cgroup memory.high; see above.
|
||||
cgroup
|
||||
.unset_memory_high()
|
||||
.context("failed to unset memory.high")?;
|
||||
|
||||
let cgroup = Arc::new(cgroup);
|
||||
|
||||
let cgroup_clone = Arc::clone(&cgroup);
|
||||
spawn_with_cancel(
|
||||
token.clone(),
|
||||
|_| error!("cgroup watcher terminated"),
|
||||
async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await },
|
||||
);
|
||||
|
||||
state.cgroup = Some(cgroup);
|
||||
}
|
||||
|
||||
let mut file_cache_reserved_bytes = 0;
|
||||
let mem = get_total_system_memory();
|
||||
|
||||
let mut file_cache_disk_size = 0;
|
||||
|
||||
// We need to process file cache initialization before cgroup initialization, so that the memory
|
||||
// allocated to the file cache is appropriately taken into account when we decide the cgroup's
|
||||
// memory limits.
|
||||
@@ -156,7 +161,7 @@ impl Runner {
|
||||
false => FileCacheConfig::default_in_memory(),
|
||||
};
|
||||
|
||||
let mut file_cache = FileCacheState::new(connstr, config, token)
|
||||
let mut file_cache = FileCacheState::new(connstr, config, token.clone())
|
||||
.await
|
||||
.context("failed to create file cache")?;
|
||||
|
||||
@@ -181,23 +186,40 @@ impl Runner {
|
||||
if actual_size != new_size {
|
||||
info!("file cache size actually got set to {actual_size}")
|
||||
}
|
||||
// Mark the resources given to the file cache as reserved, but only if it's in memory.
|
||||
if !args.file_cache_on_disk {
|
||||
file_cache_reserved_bytes = actual_size;
|
||||
|
||||
if args.file_cache_on_disk {
|
||||
file_cache_disk_size = actual_size;
|
||||
}
|
||||
|
||||
state.filecache = Some(file_cache);
|
||||
}
|
||||
|
||||
if let Some(cgroup) = &state.cgroup {
|
||||
let available = mem - file_cache_reserved_bytes;
|
||||
let value = cgroup.config.calculate_memory_high_value(available);
|
||||
if let Some(name) = &args.cgroup {
|
||||
// Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
|
||||
// now, and then set limits later.
|
||||
info!("initializing cgroup");
|
||||
|
||||
info!(value, "setting memory.high");
|
||||
let cgroup =
|
||||
CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;
|
||||
|
||||
cgroup
|
||||
.set_memory_high_bytes(value)
|
||||
.context("failed to set cgroup memory.high")?;
|
||||
let init_value = cgroup::MemoryHistory {
|
||||
avg_non_reclaimable: 0,
|
||||
samples_count: 0,
|
||||
samples_span: Duration::ZERO,
|
||||
};
|
||||
let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));
|
||||
|
||||
spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
|
||||
cgroup.watch(hist_tx).await
|
||||
});
|
||||
|
||||
let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size);
|
||||
info!(threshold, "set initial cgroup threshold",);
|
||||
|
||||
state.cgroup = Some(CgroupState {
|
||||
watcher: hist_rx,
|
||||
threshold,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(state)
|
||||
@@ -217,28 +239,51 @@ impl Runner {
|
||||
|
||||
let requested_mem = target.mem;
|
||||
let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
|
||||
let expected_file_cache_mem_usage = self
|
||||
let (expected_file_cache_size, expected_file_cache_disk_size) = self
|
||||
.filecache
|
||||
.as_ref()
|
||||
.map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory))
|
||||
.unwrap_or(0);
|
||||
let mut new_cgroup_mem_high = 0;
|
||||
.map(|file_cache| {
|
||||
let size = file_cache.config.calculate_cache_size(usable_system_memory);
|
||||
match file_cache.config.in_memory {
|
||||
true => (size, 0),
|
||||
false => (size, size),
|
||||
}
|
||||
})
|
||||
.unwrap_or((0, 0));
|
||||
if let Some(cgroup) = &self.cgroup {
|
||||
new_cgroup_mem_high = cgroup
|
||||
let (last_time, last_history) = *cgroup.watcher.borrow();
|
||||
|
||||
// NB: The ordering of these conditions is intentional. During startup, we should deny
|
||||
// downscaling until we have enough information to determine that it's safe to do so
|
||||
// (i.e. enough samples have come in). But if it's been a while and we *still* haven't
|
||||
// received any information, we should *fail* instead of just denying downscaling.
|
||||
//
|
||||
// `last_time` is set to `Instant::now()` on startup, so checking `last_time.elapsed()`
|
||||
// serves double-duty: it trips if we haven't received *any* metrics for long enough,
|
||||
// OR if we haven't received metrics *recently enough*.
|
||||
//
|
||||
// TODO: make the duration here configurable.
|
||||
if last_time.elapsed() > Duration::from_secs(5) {
|
||||
bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
|
||||
} else if last_history.samples_count <= 1 {
|
||||
let status = "haven't received enough cgroup memory stats yet";
|
||||
info!(status, "discontinuing downscale");
|
||||
return Ok((false, status.to_owned()));
|
||||
}
|
||||
|
||||
let new_threshold = self
|
||||
.config
|
||||
.calculate_memory_high_value(usable_system_memory - expected_file_cache_mem_usage);
|
||||
.cgroup_threshold(usable_system_memory, expected_file_cache_disk_size);
|
||||
|
||||
let current = cgroup
|
||||
.current_memory_usage()
|
||||
.context("failed to fetch cgroup memory")?;
|
||||
let current = last_history.avg_non_reclaimable;
|
||||
|
||||
if new_cgroup_mem_high < current + cgroup.config.memory_high_buffer_bytes {
|
||||
if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
|
||||
let status = format!(
|
||||
"{}: {} MiB (new high) < {} (current usage) + {} (buffer)",
|
||||
"calculated memory.high too low",
|
||||
bytes_to_mebibytes(new_cgroup_mem_high),
|
||||
"{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
|
||||
"calculated memory threshold too low",
|
||||
bytes_to_mebibytes(new_threshold),
|
||||
bytes_to_mebibytes(current),
|
||||
bytes_to_mebibytes(cgroup.config.memory_high_buffer_bytes)
|
||||
bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
|
||||
);
|
||||
|
||||
info!(status, "discontinuing downscale");
|
||||
@@ -249,14 +294,14 @@ impl Runner {
|
||||
|
||||
// The downscaling has been approved. Downscale the file cache, then the cgroup.
|
||||
let mut status = vec![];
|
||||
let mut file_cache_mem_usage = 0;
|
||||
let mut file_cache_disk_size = 0;
|
||||
if let Some(file_cache) = &mut self.filecache {
|
||||
let actual_usage = file_cache
|
||||
.set_file_cache_size(expected_file_cache_mem_usage)
|
||||
.set_file_cache_size(expected_file_cache_size)
|
||||
.await
|
||||
.context("failed to set file cache size")?;
|
||||
if file_cache.config.in_memory {
|
||||
file_cache_mem_usage = actual_usage;
|
||||
if !file_cache.config.in_memory {
|
||||
file_cache_disk_size = actual_usage;
|
||||
}
|
||||
let message = format!(
|
||||
"set file cache size to {} MiB (in memory = {})",
|
||||
@@ -267,24 +312,18 @@ impl Runner {
|
||||
status.push(message);
|
||||
}
|
||||
|
||||
if let Some(cgroup) = &self.cgroup {
|
||||
let available_memory = usable_system_memory - file_cache_mem_usage;
|
||||
|
||||
if file_cache_mem_usage != expected_file_cache_mem_usage {
|
||||
new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
|
||||
}
|
||||
|
||||
// new_cgroup_mem_high is initialized to 0 but it is guaranteed to not be here
|
||||
// since it is properly initialized in the previous cgroup if let block
|
||||
cgroup
|
||||
.set_memory_high_bytes(new_cgroup_mem_high)
|
||||
.context("failed to set cgroup memory.high")?;
|
||||
if let Some(cgroup) = &mut self.cgroup {
|
||||
let new_threshold = self
|
||||
.config
|
||||
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
|
||||
|
||||
let message = format!(
|
||||
"set cgroup memory.high to {} MiB, of new max {} MiB",
|
||||
bytes_to_mebibytes(new_cgroup_mem_high),
|
||||
bytes_to_mebibytes(available_memory)
|
||||
"set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
|
||||
bytes_to_mebibytes(cgroup.threshold),
|
||||
bytes_to_mebibytes(new_threshold),
|
||||
bytes_to_mebibytes(usable_system_memory)
|
||||
);
|
||||
cgroup.threshold = new_threshold;
|
||||
info!("downscale: {message}");
|
||||
status.push(message);
|
||||
}
|
||||
@@ -305,8 +344,7 @@ impl Runner {
|
||||
let new_mem = resources.mem;
|
||||
let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
|
||||
|
||||
// Get the file cache's expected contribution to the memory usage
|
||||
let mut file_cache_mem_usage = 0;
|
||||
let mut file_cache_disk_size = 0;
|
||||
if let Some(file_cache) = &mut self.filecache {
|
||||
let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
|
||||
info!(
|
||||
@@ -319,8 +357,8 @@ impl Runner {
|
||||
.set_file_cache_size(expected_usage)
|
||||
.await
|
||||
.context("failed to set file cache size")?;
|
||||
if file_cache.config.in_memory {
|
||||
file_cache_mem_usage = actual_usage;
|
||||
if !file_cache.config.in_memory {
|
||||
file_cache_disk_size = actual_usage;
|
||||
}
|
||||
|
||||
if actual_usage != expected_usage {
|
||||
@@ -332,18 +370,18 @@ impl Runner {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(cgroup) = &self.cgroup {
|
||||
let available_memory = usable_system_memory - file_cache_mem_usage;
|
||||
let new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
|
||||
if let Some(cgroup) = &mut self.cgroup {
|
||||
let new_threshold = self
|
||||
.config
|
||||
.cgroup_threshold(usable_system_memory, file_cache_disk_size);
|
||||
|
||||
info!(
|
||||
target = bytes_to_mebibytes(new_cgroup_mem_high),
|
||||
total = bytes_to_mebibytes(new_mem),
|
||||
name = cgroup.path(),
|
||||
"updating cgroup memory.high",
|
||||
"set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
|
||||
bytes_to_mebibytes(cgroup.threshold),
|
||||
bytes_to_mebibytes(new_threshold),
|
||||
bytes_to_mebibytes(usable_system_memory)
|
||||
);
|
||||
cgroup
|
||||
.set_memory_high_bytes(new_cgroup_mem_high)
|
||||
.context("failed to set cgroup memory.high")?;
|
||||
cgroup.threshold = new_threshold;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -361,10 +399,6 @@ impl Runner {
|
||||
self.handle_upscale(granted)
|
||||
.await
|
||||
.context("failed to handle upscale")?;
|
||||
self.dispatcher
|
||||
.notify_upscale(Sequenced::new(granted))
|
||||
.await
|
||||
.context("failed to notify notify cgroup of upscale")?;
|
||||
Ok(Some(OutboundMsg::new(
|
||||
OutboundMsgKind::UpscaleConfirmation {},
|
||||
id,
|
||||
@@ -408,33 +442,53 @@ impl Runner {
|
||||
Err(e) => bail!("failed to receive kill signal: {e}")
|
||||
}
|
||||
}
|
||||
// we need to propagate an upscale request
|
||||
request = self.dispatcher.request_upscale_events.recv(), if self.cgroup.is_some() => {
|
||||
if request.is_none() {
|
||||
bail!("failed to listen for upscale event from cgroup")
|
||||
|
||||
// New memory stats from the cgroup, *may* need to request upscaling, if we've
|
||||
// exceeded the threshold
|
||||
result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
|
||||
result.context("failed to receive from cgroup memory stats watcher")?;
|
||||
|
||||
let cgroup = self.cgroup.as_ref().unwrap();
|
||||
|
||||
let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
|
||||
|
||||
// If we haven't exceeded the threshold, then we're all ok
|
||||
if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If it's been less than 1 second since the last time we requested upscaling,
|
||||
// ignore the event, to avoid spamming the agent (otherwise, this can happen
|
||||
// ~1k times per second).
|
||||
// Otherwise, we generally want upscaling. But, if it's been less than 1 second
|
||||
// since the last time we requested upscaling, ignore the event, to avoid
|
||||
// spamming the agent.
|
||||
if let Some(t) = self.last_upscale_request_at {
|
||||
let elapsed = t.elapsed();
|
||||
if elapsed < Duration::from_secs(1) {
|
||||
info!(elapsed_millis = elapsed.as_millis(), "cgroup asked for upscale but too soon to forward the request, ignoring");
|
||||
info!(
|
||||
elapsed_millis = elapsed.as_millis(),
|
||||
avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
|
||||
threshold = bytes_to_mebibytes(cgroup.threshold),
|
||||
"cgroup memory stats are high enough to upscale but too soon to forward the request, ignoring",
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
self.last_upscale_request_at = Some(Instant::now());
|
||||
|
||||
info!("cgroup asking for upscale; forwarding request");
|
||||
info!(
|
||||
avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
|
||||
threshold = bytes_to_mebibytes(cgroup.threshold),
|
||||
"cgroup memory stats are high enough to upscale, requesting upscale",
|
||||
);
|
||||
|
||||
self.counter += 2; // Increment, preserving parity (i.e. keep the
|
||||
// counter odd). See the field comment for more.
|
||||
self.dispatcher
|
||||
.send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
|
||||
.await
|
||||
.context("failed to send message")?;
|
||||
}
|
||||
},
|
||||
|
||||
// there is a message from the agent
|
||||
msg = self.dispatcher.source.next() => {
|
||||
if let Some(msg) = msg {
|
||||
@@ -462,11 +516,14 @@ impl Runner {
|
||||
Ok(Some(out)) => out,
|
||||
Ok(None) => continue,
|
||||
Err(e) => {
|
||||
let error = e.to_string();
|
||||
warn!(?error, "error handling message");
|
||||
// use {:#} for our logging because the display impl only
|
||||
// gives the outermost cause, and the debug impl
|
||||
// pretty-prints the error, whereas {:#} contains all the
|
||||
// causes, but is compact (no newlines).
|
||||
warn!(error = format!("{e:#}"), "error handling message");
|
||||
OutboundMsg::new(
|
||||
OutboundMsgKind::InternalError {
|
||||
error
|
||||
error: e.to_string(),
|
||||
},
|
||||
message.id
|
||||
)
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
//! and push them to a HTTP endpoint.
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::{mgr, LogicalSizeCalculationCause};
|
||||
use camino::Utf8PathBuf;
|
||||
use consumption_metrics::EventType;
|
||||
@@ -143,7 +144,7 @@ pub async fn collect_metrics(
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
tick_at.elapsed(),
|
||||
metric_collection_interval,
|
||||
"consumption_metrics_collect_metrics",
|
||||
BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -268,6 +269,11 @@ async fn calculate_synthetic_size_worker(
|
||||
}
|
||||
|
||||
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
|
||||
// TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
|
||||
// We can put in some prioritization for consumption metrics.
|
||||
// Same for the loop that fetches computed metrics.
|
||||
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
|
||||
// which turns out is really handy to understand the system.
|
||||
if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
|
||||
error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
|
||||
}
|
||||
@@ -277,7 +283,7 @@ async fn calculate_synthetic_size_worker(
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
tick_at.elapsed(),
|
||||
synthetic_size_calculation_interval,
|
||||
"consumption_metrics_synthetic_size_worker",
|
||||
BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1067,6 +1067,26 @@ pub(crate) static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
.expect("Failed to register tenant_task_events metric")
|
||||
});
|
||||
|
||||
pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT: Lazy<IntCounterVec> =
|
||||
Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_background_loop_semaphore_wait_start_count",
|
||||
"Counter for background loop concurrency-limiting semaphore acquire calls started",
|
||||
&["task"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT: Lazy<IntCounterVec> =
|
||||
Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_background_loop_semaphore_wait_finish_count",
|
||||
"Counter for background loop concurrency-limiting semaphore acquire calls finished",
|
||||
&["task"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) static BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_background_loop_period_overrun_count",
|
||||
|
||||
@@ -14,6 +14,73 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::completion;
|
||||
|
||||
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub(crate) enum BackgroundLoopKind {
|
||||
Compaction,
|
||||
Gc,
|
||||
Eviction,
|
||||
ConsumptionMetricsCollectMetrics,
|
||||
ConsumptionMetricsSyntheticSizeWorker,
|
||||
}
|
||||
|
||||
impl BackgroundLoopKind {
|
||||
fn as_static_str(&self) -> &'static str {
|
||||
let s: &'static str = self.into();
|
||||
s
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum RateLimitError {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
pub(crate) async fn concurrent_background_tasks_rate_limit(
|
||||
loop_kind: BackgroundLoopKind,
|
||||
_ctx: &RequestContext,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<impl Drop, RateLimitError> {
|
||||
crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT
|
||||
.with_label_values(&[loop_kind.as_static_str()])
|
||||
.inc();
|
||||
scopeguard::defer!(
|
||||
crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc();
|
||||
);
|
||||
tokio::select! {
|
||||
permit = CONCURRENT_BACKGROUND_TASKS.acquire() => {
|
||||
match permit {
|
||||
Ok(permit) => Ok(permit),
|
||||
Err(_closed) => unreachable!("we never close the semaphore"),
|
||||
}
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
Err(RateLimitError::Cancelled)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Start per tenant background loops: compaction and gc.
|
||||
pub fn start_background_loops(
|
||||
tenant: &Arc<Tenant>,
|
||||
@@ -116,7 +183,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
}
|
||||
};
|
||||
|
||||
warn_when_period_overrun(started_at.elapsed(), period, "compaction");
|
||||
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
@@ -184,7 +251,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
}
|
||||
};
|
||||
|
||||
warn_when_period_overrun(started_at.elapsed(), period, "gc");
|
||||
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc);
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
@@ -258,7 +325,11 @@ pub(crate) async fn random_init_delay(
|
||||
}
|
||||
|
||||
/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
|
||||
pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
|
||||
pub(crate) fn warn_when_period_overrun(
|
||||
elapsed: Duration,
|
||||
period: Duration,
|
||||
task: BackgroundLoopKind,
|
||||
) {
|
||||
// Duration::ZERO will happen because it's the "disable [bgtask]" value.
|
||||
if elapsed >= period && period != Duration::ZERO {
|
||||
// humantime does no significant digits clamping whereas Duration's debug is a bit more
|
||||
@@ -267,11 +338,11 @@ pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task
|
||||
warn!(
|
||||
?elapsed,
|
||||
period = %humantime::format_duration(period),
|
||||
task,
|
||||
?task,
|
||||
"task iteration took longer than the configured period"
|
||||
);
|
||||
crate::metrics::BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT
|
||||
.with_label_values(&[task, &format!("{}", period.as_secs())])
|
||||
.with_label_values(&[task.as_static_str(), &format!("{}", period.as_secs())])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer,
|
||||
};
|
||||
use crate::tenant::tasks::{BackgroundLoopKind, RateLimitError};
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
@@ -684,37 +685,17 @@ impl Timeline {
|
||||
) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = tokio::select! {
|
||||
permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
permit
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
let _permit = match super::tasks::concurrent_background_tasks_rate_limit(
|
||||
BackgroundLoopKind::Compaction,
|
||||
ctx,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(permit) => permit,
|
||||
Err(RateLimitError::Cancelled) => return Ok(()),
|
||||
};
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::{
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
storage_layer::PersistentLayer,
|
||||
tasks::{BackgroundLoopKind, RateLimitError},
|
||||
timeline::EvictionError,
|
||||
LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
@@ -129,7 +130,11 @@ impl Timeline {
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
let elapsed = start.elapsed();
|
||||
crate::tenant::tasks::warn_when_period_overrun(elapsed, p.period, "eviction");
|
||||
crate::tenant::tasks::warn_when_period_overrun(
|
||||
elapsed,
|
||||
p.period,
|
||||
BackgroundLoopKind::Eviction,
|
||||
);
|
||||
crate::metrics::EVICTION_ITERATION_DURATION
|
||||
.get_metric_with_label_values(&[
|
||||
&format!("{}", p.period.as_secs()),
|
||||
@@ -150,6 +155,17 @@ impl Timeline {
|
||||
) -> ControlFlow<()> {
|
||||
let now = SystemTime::now();
|
||||
|
||||
let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(
|
||||
BackgroundLoopKind::Eviction,
|
||||
ctx,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(permit) => permit,
|
||||
Err(RateLimitError::Cancelled) => return ControlFlow::Break(()),
|
||||
};
|
||||
|
||||
// If we evict layers but keep cached values derived from those layers, then
|
||||
// we face a storm of on-demand downloads after pageserver restart.
|
||||
// The reason is that the restart empties the caches, and so, the values
|
||||
|
||||
Reference in New Issue
Block a user