From 3128eeff019f2c1d0fb3d51dbe430d7c5533162e Mon Sep 17 00:00:00 2001 From: Felix Prasanna <91577249+fprasx@users.noreply.github.com> Date: Thu, 24 Aug 2023 15:54:37 -0400 Subject: [PATCH] compute_ctl: add vm-monitor (#4946) Co-authored-by: Em Sharnoff --- .github/workflows/build_and_test.yml | 1 - Cargo.lock | 217 +++++++-- Cargo.toml | 8 +- compute_tools/Cargo.toml | 3 + compute_tools/src/bin/compute_ctl.rs | 91 +++- libs/vm_monitor/Cargo.toml | 31 ++ libs/vm_monitor/README.md | 18 + libs/vm_monitor/src/bin/monitor.rs | 33 ++ libs/vm_monitor/src/cgroup.rs | 691 +++++++++++++++++++++++++++ libs/vm_monitor/src/dispatcher.rs | 155 ++++++ libs/vm_monitor/src/filecache.rs | 306 ++++++++++++ libs/vm_monitor/src/lib.rs | 205 ++++++++ libs/vm_monitor/src/protocol.rs | 241 ++++++++++ libs/vm_monitor/src/runner.rs | 456 ++++++++++++++++++ workspace_hack/Cargo.toml | 4 +- 15 files changed, 2416 insertions(+), 44 deletions(-) create mode 100644 libs/vm_monitor/Cargo.toml create mode 100644 libs/vm_monitor/README.md create mode 100644 libs/vm_monitor/src/bin/monitor.rs create mode 100644 libs/vm_monitor/src/cgroup.rs create mode 100644 libs/vm_monitor/src/dispatcher.rs create mode 100644 libs/vm_monitor/src/filecache.rs create mode 100644 libs/vm_monitor/src/lib.rs create mode 100644 libs/vm_monitor/src/protocol.rs create mode 100644 libs/vm_monitor/src/runner.rs diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 33472e9aa4..b39d008664 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -775,7 +775,6 @@ jobs: run: | ./vm-builder \ -enable-file-cache \ - -enable-monitor \ -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} \ -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} diff --git a/Cargo.lock b/Cargo.lock index 5de1164487..c78b7605b8 100644 --- 
a/Cargo.lock +++ b/Cargo.lock @@ -190,7 +190,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -201,7 +201,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -553,12 +553,13 @@ dependencies = [ [[package]] name = "axum" -version = "0.6.18" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", + "base64 0.21.1", "bitflags", "bytes", "futures-util", @@ -573,7 +574,13 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", "sync_wrapper", + "tokio", + "tokio-tungstenite 0.20.0", "tower", "tower-layer", "tower-service", @@ -673,7 +680,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.16", + "syn 2.0.28", "which", ] @@ -765,6 +772,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cgroups-rs" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb3af90c8d48ad5f432d8afb521b5b40c2a2fce46dd60e05912de51c47fba64" +dependencies = [ + "libc", + "log", + "nix 0.25.1", + "regex", + "thiserror", +] + [[package]] name = "chrono" version = "0.4.24" @@ -849,7 +869,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -907,6 +927,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-compression", + "cfg-if", "chrono", "clap", "compute_api", @@ -925,6 +946,7 @@ dependencies = [ "tar", 
"tokio", "tokio-postgres", + "tokio-util", "toml_edit", "tracing", "tracing-opentelemetry", @@ -932,6 +954,7 @@ dependencies = [ "tracing-utils", "url", "utils", + "vm_monitor", "workspace_hack", "zstd", ] @@ -978,7 +1001,7 @@ dependencies = [ "comfy-table", "compute_api", "git-version", - "nix", + "nix 0.26.2", "once_cell", "pageserver_api", "postgres", @@ -1184,7 +1207,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -1195,7 +1218,7 @@ checksum = "29a358ff9f12ec09c3e61fef9b5a9902623a695a46a917b07f269bff1445611a" dependencies = [ "darling_core", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -1260,7 +1283,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -1316,7 +1339,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -1512,7 +1535,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -1863,8 +1886,8 @@ dependencies = [ "hyper", "pin-project", "tokio", - "tokio-tungstenite", - "tungstenite", + "tokio-tungstenite 0.18.0", + "tungstenite 0.18.0", ] [[package]] @@ -1928,6 +1951,19 @@ dependencies = [ "libc", ] +[[package]] +name = "inotify" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd168d97690d0b8c412d6b6c10360277f4d7ee495c5d0d5d5fe0854923255cc" +dependencies = [ + "bitflags", + "futures-core", + "inotify-sys", + "libc", + "tokio", +] + [[package]] name = "inotify-sys" version = "0.1.5" @@ -2251,6 +2287,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4" +dependencies = [ + 
"autocfg", + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "nix" version = "0.26.2" @@ -2285,7 +2333,7 @@ dependencies = [ "crossbeam-channel", "filetime", "fsevent-sys", - "inotify", + "inotify 0.9.6", "kqueue", "libc", "mio", @@ -2293,6 +2341,15 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.3" @@ -2386,7 +2443,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -2573,7 +2630,7 @@ dependencies = [ "hyper", "itertools", "metrics", - "nix", + "nix 0.26.2", "num-traits", "num_cpus", "once_cell", @@ -2774,7 +2831,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -2971,7 +3028,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1" dependencies = [ "proc-macro2", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -2982,9 +3039,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.64" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -3146,9 +3203,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.27" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500" +checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" dependencies = [ "proc-macro2", ] @@ -3799,22 +3856,22 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.163" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.163" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -3828,6 +3885,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_spanned" version = "0.6.2" @@ -3874,7 +3941,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -4112,9 +4179,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.16" +version = "2.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01" +checksum = "04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567" dependencies = [ "proc-macro2", "quote", @@ -4139,6 +4206,21 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sysinfo" +version = "0.29.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"165d6d8539689e3d3bc8b98ac59541e1f21c7de7c85d60dc80e43ae0ed2113db" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + [[package]] name = "tar" version = "0.4.40" @@ -4229,7 +4311,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -4344,7 +4426,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -4450,7 +4532,19 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.18.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.20.0", ] [[package]] @@ -4642,7 +4736,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", ] [[package]] @@ -4771,6 +4865,25 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.16.0" @@ -4898,7 +5011,7 @@ dependencies = [ "hyper", "jsonwebtoken", "metrics", - "nix", + "nix 0.26.2", "once_cell", "pin-project-lite", "pq_proto", @@ -4953,6 +5066,28 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" 
+[[package]] +name = "vm_monitor" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "cgroups-rs", + "clap", + "futures", + "inotify 0.10.2", + "serde", + "serde_json", + "sysinfo", + "tokio", + "tokio-postgres", + "tokio-stream", + "tokio-util", + "tracing", + "tracing-subscriber", + "workspace_hack", +] + [[package]] name = "vsimd" version = "0.8.0" @@ -5023,7 +5158,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", "wasm-bindgen-shared", ] @@ -5057,7 +5192,7 @@ checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.16", + "syn 2.0.28", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5342,12 +5477,14 @@ name = "workspace_hack" version = "0.1.0" dependencies = [ "anyhow", + "axum", "bytes", "cc", "chrono", "clap", "clap_builder", "crossbeam-utils", + "digest", "either", "fail", "futures", @@ -5356,6 +5493,7 @@ dependencies = [ "futures-executor", "futures-sink", "futures-util", + "hyper", "itertools", "libc", "log", @@ -5377,7 +5515,7 @@ dependencies = [ "smallvec", "socket2 0.4.9", "syn 1.0.109", - "syn 2.0.16", + "syn 2.0.28", "tokio", "tokio-rustls 0.23.4", "tokio-util", @@ -5386,7 +5524,6 @@ dependencies = [ "tower", "tracing", "tracing-core", - "tracing-subscriber", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 7c916ad61d..f38e67f487 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "libs/remote_storage", "libs/tracing-utils", "libs/postgres_ffi/wal_craft", + "libs/vm_monitor", ] [workspace.package] @@ -41,12 +42,14 @@ aws-sdk-s3 = "0.27" aws-smithy-http = "0.55" aws-credential-types = "0.55" aws-types = "0.55" +axum = { version = "0.6.20", features = ["ws"] } base64 = "0.13.0" bincode = "1.3" bindgen = "0.65" bstr = "1.0" byteorder = "1.4" bytes = "1.0" +cfg-if = "1.0.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } clap = { version = "4.0", features = ["derive"] } 
close_fds = "0.3.2" @@ -74,6 +77,7 @@ humantime = "2.1" humantime-serde = "1.1.1" hyper = "0.14" hyper-tungstenite = "0.9" +inotify = "0.10.2" itertools = "0.10" jsonwebtoken = "8" libc = "0.2" @@ -105,6 +109,7 @@ rustls = "0.20" rustls-pemfile = "1" rustls-split = "0.3" scopeguard = "1.1" +sysinfo = "0.29.2" sentry = { version = "0.30", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1" @@ -134,7 +139,7 @@ tonic = {version = "0.9", features = ["tls", "tls-roots"]} tracing = "0.1" tracing-error = "0.2.0" tracing-opentelemetry = "0.19.0" -tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter"] } +tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json"] } url = "2.2" uuid = { version = "1.2", features = ["v4", "serde"] } walkdir = "2.3.2" @@ -170,6 +175,7 @@ storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main br tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" } tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" } utils = { version = "0.1", path = "./libs/utils/" } +vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" } ## Common library dependency workspace_hack = { version = "0.1", path = "./workspace_hack/" } diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 08dcc21c7a..6c93befaa3 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true anyhow.workspace = true async-compression.workspace = true chrono.workspace = true +cfg-if.workspace = true clap.workspace = true flate2.workspace = true futures.workspace = true @@ -23,6 +24,7 @@ tar.workspace = true reqwest = { workspace = true, features = ["json"] } tokio = { workspace = true, features = ["rt", 
"rt-multi-thread"] } tokio-postgres.workspace = true +tokio-util.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true tracing-subscriber.workspace = true @@ -34,4 +36,5 @@ utils.workspace = true workspace_hack.workspace = true toml_edit.workspace = true remote_storage = { version = "0.1", path = "../libs/remote_storage/" } +vm_monitor = { version = "0.1", path = "../libs/vm_monitor/" } zstd = "0.12.4" diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 343bb41d3b..ba8f4ab2b9 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -35,7 +35,6 @@ //! use std::collections::HashMap; use std::fs::File; -use std::panic; use std::path::Path; use std::process::exit; use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock}; @@ -271,6 +270,55 @@ fn main() -> Result<()> { } }; + // Start the vm-monitor if directed to. The vm-monitor only runs on linux + // because it requires cgroups. + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + use std::env; + use tokio_util::sync::CancellationToken; + use tracing::warn; + let vm_monitor_addr = matches.get_one::("vm-monitor-addr"); + let cgroup = matches.get_one::("filecache-connstr"); + let file_cache_connstr = matches.get_one::("cgroup"); + + // Only make a runtime if we need to. + // Note: it seems like you can make a runtime in an inner scope and + // if you start a task in it it won't be dropped. However, make it + // in the outermost scope just to be safe. 
+ let rt = match (env::var_os("AUTOSCALING"), vm_monitor_addr) { + (None, None) => None, + (None, Some(_)) => { + warn!("--vm-monitor-addr option set but AUTOSCALING env var not present"); + None + } + (Some(_), None) => { + panic!("AUTOSCALING env var present but --vm-monitor-addr option not set") + } + (Some(_), Some(_)) => Some( + tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .enable_all() + .build() + .expect("failed to create tokio runtime for monitor"), + ), + }; + + // This token is used internally by the monitor to clean up all threads + let token = CancellationToken::new(); + + let vm_monitor = &rt.as_ref().map(|rt| { + rt.spawn(vm_monitor::start( + Box::leak(Box::new(vm_monitor::Args { + cgroup: cgroup.cloned(), + pgconnstr: file_cache_connstr.cloned(), + addr: vm_monitor_addr.cloned().unwrap(), + })), + token.clone(), + )) + }); + } + } + // Wait for the child Postgres process forever. In this state Ctrl+C will // propagate to Postgres and it will be shut down as well. if let Some(mut pg) = pg { @@ -284,6 +332,24 @@ fn main() -> Result<()> { exit_code = ecode.code() } + // Terminate the vm_monitor so it releases the file watcher on + // /sys/fs/cgroup/neon-postgres. + // Note: the vm-monitor only runs on linux because it requires cgroups. + cfg_if::cfg_if! 
{ + if #[cfg(target_os = "linux")] { + if let Some(handle) = vm_monitor { + // Kills all threads spawned by the monitor + token.cancel(); + // Kills the actual task running the monitor + handle.abort(); + + // If handle is some, rt must have been used to produce it, and + // hence is also some + rt.unwrap().shutdown_timeout(Duration::from_secs(2)); + } + } + } + // Maybe sync safekeepers again, to speed up next startup let compute_state = compute.state.lock().unwrap().clone(); let pspec = compute_state.pspec.as_ref().expect("spec must be set"); @@ -393,6 +459,29 @@ fn cli() -> clap::Command { .long("remote-ext-config") .value_name("REMOTE_EXT_CONFIG"), ) + // TODO(fprasx): we currently have default arguments because the cloud PR + // to pass them in hasn't been merged yet. We should get rid of them once + // the PR is merged. + .arg( + Arg::new("vm-monitor-addr") + .long("vm-monitor-addr") + .default_value("127.0.0.1:10369") + .value_name("VM_MONITOR_ADDR"), + ) + .arg( + Arg::new("cgroup") + .long("cgroup") + .default_value("neon-postgres") + .value_name("CGROUP"), + ) + .arg( + Arg::new("filecache-connstr") + .long("filecache-connstr") + .default_value( + "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable", + ) + .value_name("FILECACHE_CONNSTR"), + ) } #[test] diff --git a/libs/vm_monitor/Cargo.toml b/libs/vm_monitor/Cargo.toml new file mode 100644 index 0000000000..26b976830a --- /dev/null +++ b/libs/vm_monitor/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "vm_monitor" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[[bin]] +name = "vm-monitor" +path = "./src/bin/monitor.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow.workspace = true +axum.workspace = true +clap.workspace = true +futures.workspace = true +inotify.workspace = true +serde.workspace = true +serde_json.workspace = true +sysinfo.workspace = true +tokio.workspace = true 
+tokio-postgres.workspace = true +tokio-stream.workspace = true +tokio-util.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +workspace_hack = { version = "0.1", path = "../../workspace_hack" } + +[target.'cfg(target_os = "linux")'.dependencies] +cgroups-rs = "0.3.3" diff --git a/libs/vm_monitor/README.md b/libs/vm_monitor/README.md new file mode 100644 index 0000000000..4c5a196107 --- /dev/null +++ b/libs/vm_monitor/README.md @@ -0,0 +1,18 @@ +# `vm-monitor` + +The `vm-monitor` (or just monitor) is a core component of the autoscaling system, +along with the `autoscale-scheduler` and the `autoscaler-agent`s. The monitor has +two primary roles: 1) notifying agents when immediate upscaling is necessary due +to memory conditions and 2) managing Postgres' file cache and a cgroup to carry +out upscaling and downscaling decisions. + +## More on scaling + +We scale CPU and memory using NeonVM, our in-house QEMU tool for use with Kubernetes. +To control thresholds for receiving memory usage notifications, we start Postgres +in the `neon-postgres` cgroup and set its `memory.{max,high}`. + +* See also: [`neondatabase/autoscaling`](https://github.com/neondatabase/autoscaling/) +* See also: [`neondatabase/vm-monitor`](https://github.com/neondatabase/vm-monitor/), +where initial development of the monitor happened. The repository is no longer +maintained but the commit history may be useful for debugging. diff --git a/libs/vm_monitor/src/bin/monitor.rs b/libs/vm_monitor/src/bin/monitor.rs new file mode 100644 index 0000000000..873618fc31 --- /dev/null +++ b/libs/vm_monitor/src/bin/monitor.rs @@ -0,0 +1,33 @@ +// We expose a standalone binary _and_ start the monitor in `compute_ctl` so that +// we can test the monitor as part of the entire autoscaling system in +// neondatabase/autoscaling. +// +// The monitor was previously started by vm-builder, and for testing purposes, +// we can mimic that setup with this binary. 
+ +#[cfg(target_os = "linux")] +#[tokio::main] +async fn main() -> anyhow::Result<()> { + use clap::Parser; + use tokio_util::sync::CancellationToken; + use tracing_subscriber::EnvFilter; + use vm_monitor::Args; + + let subscriber = tracing_subscriber::fmt::Subscriber::builder() + .json() + .with_file(true) + .with_line_number(true) + .with_span_list(true) + .with_env_filter(EnvFilter::from_default_env()) + .finish(); + tracing::subscriber::set_global_default(subscriber)?; + + let args: &'static Args = Box::leak(Box::new(Args::parse())); + let token = CancellationToken::new(); + vm_monitor::start(args, token).await +} + +#[cfg(not(target_os = "linux"))] +fn main() { + panic!("the monitor requires cgroups, which are only available on linux") +} diff --git a/libs/vm_monitor/src/cgroup.rs b/libs/vm_monitor/src/cgroup.rs new file mode 100644 index 0000000000..84a1a07b9d --- /dev/null +++ b/libs/vm_monitor/src/cgroup.rs @@ -0,0 +1,691 @@ +use std::{ + fmt::{Debug, Display}, + fs, + pin::pin, + sync::atomic::{AtomicU64, Ordering}, +}; + +use anyhow::{anyhow, bail, Context}; +use cgroups_rs::{ + freezer::FreezerController, + hierarchies::{self, is_cgroup2_unified_mode, UNIFIED_MOUNTPOINT}, + memory::MemController, + MaxValue, + Subsystem::{Freezer, Mem}, +}; +use inotify::{EventStream, Inotify, WatchMask}; +use tokio::sync::mpsc::{self, error::TryRecvError}; +use tokio::time::{Duration, Instant}; +use tokio_stream::{Stream, StreamExt}; +use tracing::{info, warn}; + +use crate::protocol::Resources; +use crate::MiB; + +/// Monotonically increasing counter of the number of memory.high events +/// the cgroup has experienced. +/// +/// We use this to determine if a modification to the `memory.events` file actually +/// changed the `high` field. If not, we don't care about the change. When we +/// read the file, we check the `high` field in the file against `MEMORY_EVENT_COUNT` +/// to see if it changed since last time. 
+pub static MEMORY_EVENT_COUNT: AtomicU64 = AtomicU64::new(0); + +/// Monotonically increasing counter that gives each cgroup event a unique id. +/// +/// This allows us to answer questions like "did this upscale arrive before this +/// memory.high?". This static is also used by the `Sequenced` type to "tag" values +/// with a sequence number. As such, prefer to used the `Sequenced` type rather +/// than this static directly. +static EVENT_SEQUENCE_NUMBER: AtomicU64 = AtomicU64::new(0); + +/// A memory event type reported in memory.events. +#[derive(Debug, Eq, PartialEq, Copy, Clone)] +pub enum MemoryEvent { + Low, + High, + Max, + Oom, + OomKill, + OomGroupKill, +} + +impl MemoryEvent { + fn as_str(&self) -> &str { + match self { + MemoryEvent::Low => "low", + MemoryEvent::High => "high", + MemoryEvent::Max => "max", + MemoryEvent::Oom => "oom", + MemoryEvent::OomKill => "oom_kill", + MemoryEvent::OomGroupKill => "oom_group_kill", + } + } +} + +impl Display for MemoryEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Configuration for a `CgroupWatcher` +#[derive(Debug, Clone)] +pub struct Config { + // The target difference between the total memory reserved for the cgroup + // and the value of the cgroup's memory.high. + // + // In other words, memory.high + oom_buffer_bytes will equal the total memory that the cgroup may + // use (equal to system memory, minus whatever's taken out for the file cache). + oom_buffer_bytes: u64, + + // The amount of memory, in bytes, below a proposed new value for + // memory.high that the cgroup's memory usage must be for us to downscale + // + // In other words, we can downscale only when: + // + // memory.current + memory_high_buffer_bytes < (proposed) memory.high + // + // TODO: there's some minor issues with this approach -- in particular, that we might have + // memory in use by the kernel's page cache that we're actually ok with getting rid of. 
+ pub(crate) memory_high_buffer_bytes: u64, + + // The maximum duration, in milliseconds, that we're allowed to pause + // the cgroup for while waiting for the autoscaler-agent to upscale us + max_upscale_wait: Duration, + + // The required minimum time, in milliseconds, that we must wait before re-freezing + // the cgroup while waiting for the autoscaler-agent to upscale us. + do_not_freeze_more_often_than: Duration, + + // The amount of memory, in bytes, that we should periodically increase memory.high + // by while waiting for the autoscaler-agent to upscale us. + // + // This exists to avoid the excessive throttling that happens when a cgroup is above its + // memory.high for too long. See more here: + // https://github.com/neondatabase/autoscaling/issues/44#issuecomment-1522487217 + memory_high_increase_by_bytes: u64, + + // The period, in milliseconds, at which we should repeatedly increase the value + // of the cgroup's memory.high while we're waiting on upscaling and memory.high + // is still being hit. + // + // Technically speaking, this actually serves as a rate limit to moderate responding to + // memory.high events, but these are roughly equivalent if the process is still allocating + // memory. 
+ memory_high_increase_every: Duration, +} + +impl Config { + /// Calculate the new value for the cgroups memory.high based on system memory + pub fn calculate_memory_high_value(&self, total_system_mem: u64) -> u64 { + total_system_mem.saturating_sub(self.oom_buffer_bytes) + } +} + +impl Default for Config { + fn default() -> Self { + Self { + oom_buffer_bytes: 100 * MiB, + memory_high_buffer_bytes: 100 * MiB, + // while waiting for upscale, don't freeze for more than 20ms every 1s + max_upscale_wait: Duration::from_millis(20), + do_not_freeze_more_often_than: Duration::from_millis(1000), + // while waiting for upscale, increase memory.high by 10MiB every 25ms + memory_high_increase_by_bytes: 10 * MiB, + memory_high_increase_every: Duration::from_millis(25), + } + } +} + +/// Used to represent data that is associated with a certain point in time, such +/// as an upscale request or memory.high event. +/// +/// Internally, creating a `Sequenced` uses a static atomic counter to obtain +/// a unique sequence number. Sequence numbers are monotonically increasing, +/// allowing us to answer questions like "did this upscale happen after this +/// memory.high event?" by comparing the sequence numbers of the two events. +#[derive(Debug, Clone)] +pub struct Sequenced { + seqnum: u64, + data: T, +} + +impl Sequenced { + pub fn new(data: T) -> Self { + Self { + seqnum: EVENT_SEQUENCE_NUMBER.fetch_add(1, Ordering::AcqRel), + data, + } + } +} + +/// Responds to `MonitorEvents` to manage the cgroup: preventing it from being +/// OOM killed or throttling. +/// +/// The `CgroupWatcher` primarily achieves this by reading from a stream of +/// `MonitorEvent`s. See `main_signals_loop` for details on how to keep the +/// cgroup happy. +#[derive(Debug)] +pub struct CgroupWatcher { + pub config: Config, + + /// The sequence number of the last upscale. 
+ /// + /// If we receive a memory.high event that has a _lower_ sequence number than + /// `last_upscale_seqnum`, then we know it occured before the upscale, and we + /// can safely ignore it. + /// + /// Note: Like the `events` field, this doesn't _need_ interior mutability but we + /// use it anyways so that methods take `&self`, not `&mut self`. + last_upscale_seqnum: AtomicU64, + + /// A channel on which we send messages to request upscale from the dispatcher. + upscale_requester: mpsc::Sender<()>, + + /// The actual cgroup we are watching and managing. + cgroup: cgroups_rs::Cgroup, +} + +/// Read memory.events for the desired event type. +/// +/// `path` specifies the path to the desired `memory.events` file. +/// For more info, see the `memory.events` section of the [kernel docs] +/// +fn get_event_count(path: &str, event: MemoryEvent) -> anyhow::Result { + let contents = fs::read_to_string(path) + .with_context(|| format!("failed to read memory.events from {path}"))?; + + // Then contents of the file look like: + // low 42 + // high 101 + // ... + contents + .lines() + .filter_map(|s| s.split_once(' ')) + .find(|(e, _)| *e == event.as_str()) + .ok_or_else(|| anyhow!("failed to find entry for memory.{event} events in {path}")) + .and_then(|(_, count)| { + count + .parse::() + .with_context(|| format!("failed to parse memory.{event} as u64")) + }) +} + +/// Create an event stream that produces events whenever the file at the provided +/// path is modified. +fn create_file_watcher(path: &str) -> anyhow::Result> { + info!("creating file watcher for {path}"); + let inotify = Inotify::init().context("failed to initialize file watcher")?; + inotify + .watches() + .add(path, WatchMask::MODIFY) + .with_context(|| format!("failed to start watching {path}"))?; + inotify + // The inotify docs use [0u8; 1024] so we'll just copy them. We only need + // to store one event at a time - if the event gets written over, that's + // ok. We still see that there is an event. 
For more information, see: + // https://man7.org/linux/man-pages/man7/inotify.7.html + .into_event_stream([0u8; 1024]) + .context("failed to start inotify event stream") +} + +impl CgroupWatcher { + /// Create a new `CgroupWatcher`. + #[tracing::instrument(skip_all, fields(%name))] + pub fn new( + name: String, + // A channel on which to send upscale requests + upscale_requester: mpsc::Sender<()>, + ) -> anyhow::Result<(Self, impl Stream>)> { + // TODO: clarify exactly why we need v2 + // Make sure cgroups v2 (aka unified) are supported + if !is_cgroup2_unified_mode() { + anyhow::bail!("cgroups v2 not supported"); + } + let cgroup = cgroups_rs::Cgroup::load(hierarchies::auto(), &name); + + // Start monitoring the cgroup for memory events. In general, for + // cgroups v2 (aka unified), metrics are reported in files like + // > `/sys/fs/cgroup/{name}/{metric}` + // We are looking for `memory.high` events, which are stored in the + // file `memory.events`. For more info, see the `memory.events` section + // of https://docs.kernel.org/admin-guide/cgroup-v2.html#memory-interface-files + let path = format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name); + let memory_events = create_file_watcher(&path) + .with_context(|| format!("failed to create event watcher for {path}"))? 
+ // This would be nice with .inspect_err followed by .ok
+ .filter_map(move |_| match get_event_count(&path, MemoryEvent::High) {
+ Ok(high) => Some(high),
+ Err(error) => {
+ // TODO: Might want to just panic here
+ warn!(?error, "failed to read high events count from {}", &path);
+ None
+ }
+ })
+ // Only report the event if the memory.high count increased
+ .filter_map(|high| {
+ if MEMORY_EVENT_COUNT.fetch_max(high, Ordering::AcqRel) < high {
+ Some(high)
+ } else {
+ None
+ }
+ })
+ .map(Sequenced::new);
+
+ let initial_count = get_event_count(
+ &format!("{}/{}/memory.events", UNIFIED_MOUNTPOINT, &name),
+ MemoryEvent::High,
+ )?;
+
+ info!(initial_count, "initial memory.high event count");
+
+ // Hard update `MEMORY_EVENT_COUNT` since there could have been processes
+ // running in the cgroup before that caused it to be non-zero.
+ MEMORY_EVENT_COUNT.fetch_max(initial_count, Ordering::AcqRel);
+
+ Ok((
+ Self {
+ cgroup,
+ upscale_requester,
+ last_upscale_seqnum: AtomicU64::new(0),
+ config: Default::default(),
+ },
+ memory_events,
+ ))
+ }
+
+ /// The entrypoint for the `CgroupWatcher`.
+ #[tracing::instrument(skip_all)]
+ pub async fn watch(
+ &self,
+ // These are ~dependency injected~ (fancy, I know) because this function
+ // should never return.
+ // -> therefore: when we tokio::spawn it, we don't await the JoinHandle.
+ // -> therefore: if we want to stick it in an Arc so many threads can access
+ // it, methods can never take mutable access.
+ // - note: we use the Arc strategy so that a) we can call this function
+ // right here and b) the runner can call the set/get_memory methods
+ // -> since calling recv() on a tokio::sync::mpsc::Receiver takes &mut self,
+ // we just pass them in here instead of holding them in fields, as that
+ // would require this method to take &mut self.
+ mut upscales: mpsc::Receiver>, + events: E, + ) -> anyhow::Result<()> + where + E: Stream>, + { + // There are several actions might do when receiving a `memory.high`, + // such as freezing the cgroup, or increasing its `memory.high`. We don't + // want to do these things too often (because postgres needs to run, and + // we only have so much memory). These timers serve as rate limits for this. + let mut wait_to_freeze = pin!(tokio::time::sleep(Duration::ZERO)); + let mut wait_to_increase_memory_high = pin!(tokio::time::sleep(Duration::ZERO)); + let mut events = pin!(events); + + // Are we waiting to be upscaled? Could be true if we request upscale due + // to a memory.high event and it does not arrive in time. + let mut waiting_on_upscale = false; + + loop { + tokio::select! { + upscale = upscales.recv() => { + let Sequenced { seqnum, data } = upscale + .context("failed to listen on upscale notification channel")?; + self.last_upscale_seqnum.store(seqnum, Ordering::Release); + info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale"); + } + event = events.next() => { + let Some(Sequenced { seqnum, .. }) = event else { + bail!("failed to listen for memory.high events") + }; + // The memory.high came before our last upscale, so we consider + // it resolved + if self.last_upscale_seqnum.fetch_max(seqnum, Ordering::AcqRel) > seqnum { + info!( + "received memory.high event, but it came before our last upscale -> ignoring it" + ); + continue; + } + + // The memory.high came after our latest upscale. We don't + // want to do anything yet, so peek the next event in hopes + // that it's an upscale. + if let Some(upscale_num) = self + .upscaled(&mut upscales) + .context("failed to check if we were upscaled")? 
+ {
+ if upscale_num > seqnum {
+ info!(
+ "received memory.high event, but it came before our last upscale -> ignoring it"
+ );
+ continue;
+ }
+ }
+
+ // If it's been long enough since we last froze, freeze the
+ // cgroup and request upscale
+ if wait_to_freeze.is_elapsed() {
+ info!("received memory.high event -> requesting upscale");
+ waiting_on_upscale = self
+ .handle_memory_high_event(&mut upscales)
+ .await
+ .context("failed to handle upscale")?;
+ wait_to_freeze
+ .as_mut()
+ .reset(Instant::now() + self.config.do_not_freeze_more_often_than);
+ continue;
+ }
+
+ // Ok, we can't freeze, just request upscale
+ if !waiting_on_upscale {
+ info!("received memory.high event, but too soon to refreeze -> requesting upscale");
+
+ // Check to make sure we haven't been upscaled in the
+ // meantime (can happen if the agent independently decides
+ // to upscale us again)
+ if self
+ .upscaled(&mut upscales)
+ .context("failed to check if we were upscaled")?
+ .is_some()
+ {
+ info!("no need to request upscaling because we got upscaled");
+ continue;
+ }
+ self.upscale_requester
+ .send(())
+ .await
+ .context("failed to request upscale")?;
+ continue;
+ }
+
+ // Shoot, we can't freeze and we're still waiting on upscale,
+ // increase memory.high to reduce throttling
+ if wait_to_increase_memory_high.is_elapsed() {
+ info!(
+ "received memory.high event, \
+ but too soon to refreeze and already requested upscale \
+ -> increasing memory.high"
+ );
+
+ // Check to make sure we haven't been upscaled in the
+ // meantime (can happen if the agent independently decides
+ // to upscale us again)
+ if self
+ .upscaled(&mut upscales)
+ .context("failed to check if we were upscaled")?
+ .is_some() + { + info!("no need to increase memory.high because got upscaled"); + continue; + } + + // Request upscale anyways (the agent will handle deduplicating + // requests) + self.upscale_requester + .send(()) + .await + .context("failed to request upscale")?; + + let memory_high = + self.get_high_bytes().context("failed to get memory.high")?; + let new_high = memory_high + self.config.memory_high_increase_by_bytes; + info!( + current_high_bytes = memory_high, + new_high_bytes = new_high, + "updating memory.high" + ); + self.set_high_bytes(new_high) + .context("failed to set memory.high")?; + wait_to_increase_memory_high + .as_mut() + .reset(Instant::now() + self.config.memory_high_increase_every) + } + + // we can't do anything + } + }; + } + } + + /// Handle a `memory.high`, returning whether we are still waiting on upscale + /// by the time the function returns. + /// + /// The general plan for handling a `memory.high` event is as follows: + /// 1. Freeze the cgroup + /// 2. Start a timer for `self.config.max_upscale_wait` + /// 3. Request upscale + /// 4. After the timer elapses or we receive upscale, thaw the cgroup. + /// 5. Return whether or not we are still waiting for upscale. If we are, + /// we'll increase the cgroups memory.high to avoid getting oom killed + #[tracing::instrument(skip_all)] + async fn handle_memory_high_event( + &self, + upscales: &mut mpsc::Receiver>, + ) -> anyhow::Result { + // Immediately freeze the cgroup before doing anything else. 
+ info!("received memory.high event -> freezing cgroup"); + self.freeze().context("failed to freeze cgroup")?; + + // We'll use this for logging durations + let start_time = Instant::now(); + + // Await the upscale until we have to unfreeze + let timed = + tokio::time::timeout(self.config.max_upscale_wait, self.await_upscale(upscales)); + + // Request the upscale + info!( + wait = ?self.config.max_upscale_wait, + "sending request for immediate upscaling", + ); + self.upscale_requester + .send(()) + .await + .context("failed to request upscale")?; + + let waiting_on_upscale = match timed.await { + Ok(Ok(())) => { + info!(elapsed = ?start_time.elapsed(), "received upscale in time"); + false + } + // **important**: unfreeze the cgroup before ?-reporting the error + Ok(Err(e)) => { + info!("error waiting for upscale -> thawing cgroup"); + self.thaw() + .context("failed to thaw cgroup after errored waiting for upscale")?; + Err(e.context("failed to await upscale"))? + } + Err(_) => { + info!(elapsed = ?self.config.max_upscale_wait, "timed out waiting for upscale"); + true + } + }; + + info!("thawing cgroup"); + self.thaw().context("failed to thaw cgroup")?; + + Ok(waiting_on_upscale) + } + + /// Checks whether we were just upscaled, returning the upscale's sequence + /// number if so. + #[tracing::instrument(skip_all)] + fn upscaled( + &self, + upscales: &mut mpsc::Receiver>, + ) -> anyhow::Result> { + let Sequenced { seqnum, data } = match upscales.try_recv() { + Ok(upscale) => upscale, + Err(TryRecvError::Empty) => return Ok(None), + Err(TryRecvError::Disconnected) => { + bail!("upscale notification channel was disconnected") + } + }; + + // Make sure to update the last upscale sequence number + self.last_upscale_seqnum.store(seqnum, Ordering::Release); + info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale"); + Ok(Some(seqnum)) + } + + /// Await an upscale event, discarding any `memory.high` events received in + /// the process. 
+ /// + /// This is used in `handle_memory_high_event`, where we need to listen + /// for upscales in particular so we know if we can thaw the cgroup early. + #[tracing::instrument(skip_all)] + async fn await_upscale( + &self, + upscales: &mut mpsc::Receiver>, + ) -> anyhow::Result<()> { + let Sequenced { seqnum, .. } = upscales + .recv() + .await + .context("error listening for upscales")?; + + self.last_upscale_seqnum.store(seqnum, Ordering::Release); + Ok(()) + } + + /// Get the cgroup's name. + pub fn path(&self) -> &str { + self.cgroup.path() + } +} + +/// Represents a set of limits we apply to a cgroup to control memory usage. +/// +/// Setting these values also affects the thresholds for receiving usage alerts. +#[derive(Debug)] +pub struct MemoryLimits { + high: u64, + max: u64, +} + +impl MemoryLimits { + pub fn new(high: u64, max: u64) -> Self { + Self { max, high } + } +} + +// Methods for manipulating the actual cgroup +impl CgroupWatcher { + /// Get a handle on the freezer subsystem. + fn freezer(&self) -> anyhow::Result<&FreezerController> { + if let Some(Freezer(freezer)) = self + .cgroup + .subsystems() + .iter() + .find(|sub| matches!(sub, Freezer(_))) + { + Ok(freezer) + } else { + anyhow::bail!("could not find freezer subsystem") + } + } + + /// Attempt to freeze the cgroup. + pub fn freeze(&self) -> anyhow::Result<()> { + self.freezer() + .context("failed to get freezer subsystem")? + .freeze() + .context("failed to freeze") + } + + /// Attempt to thaw the cgroup. + pub fn thaw(&self) -> anyhow::Result<()> { + self.freezer() + .context("failed to get freezer subsystem")? + .thaw() + .context("failed to thaw") + } + + /// Get a handle on the memory subsystem. 
+ /// + /// Note: this method does not require `self.memory_update_lock` because + /// getting a handle to the subsystem does not access any of the files we + /// care about, such as memory.high and memory.events + fn memory(&self) -> anyhow::Result<&MemController> { + if let Some(Mem(memory)) = self + .cgroup + .subsystems() + .iter() + .find(|sub| matches!(sub, Mem(_))) + { + Ok(memory) + } else { + anyhow::bail!("could not find memory subsystem") + } + } + + /// Get cgroup current memory usage. + pub fn current_memory_usage(&self) -> anyhow::Result { + Ok(self + .memory() + .context("failed to get memory subsystem")? + .memory_stat() + .usage_in_bytes) + } + + /// Set cgroup memory.high threshold. + pub fn set_high_bytes(&self, bytes: u64) -> anyhow::Result<()> { + self.memory() + .context("failed to get memory subsystem")? + .set_mem(cgroups_rs::memory::SetMemory { + low: None, + high: Some(MaxValue::Value(bytes.min(i64::MAX as u64) as i64)), + min: None, + max: None, + }) + .context("failed to set memory.high") + } + + /// Set cgroup memory.high and memory.max. + pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> { + info!( + limits.high, + limits.max, + path = self.path(), + "writing new memory limits", + ); + self.memory() + .context("failed to get memory subsystem while setting memory limits")? 
+ .set_mem(cgroups_rs::memory::SetMemory { + min: None, + low: None, + high: Some(MaxValue::Value(limits.high.min(i64::MAX as u64) as i64)), + max: Some(MaxValue::Value(limits.max.min(i64::MAX as u64) as i64)), + }) + .context("failed to set memory limits") + } + + /// Given some amount of available memory, set the desired cgroup memory limits + pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> { + let new_high = self.config.calculate_memory_high_value(available_memory); + let limits = MemoryLimits::new(new_high, available_memory); + info!( + path = self.path(), + memory = ?limits, + "setting cgroup memory", + ); + self.set_limits(&limits) + .context("failed to set cgroup memory limits")?; + Ok(()) + } + + /// Get memory.high threshold. + pub fn get_high_bytes(&self) -> anyhow::Result { + let high = self + .memory() + .context("failed to get memory subsystem while getting memory statistics")? + .get_mem() + .map(|mem| mem.high) + .context("failed to get memory statistics from subsystem")?; + match high { + Some(MaxValue::Max) => Ok(i64::MAX as u64), + Some(MaxValue::Value(high)) => Ok(high as u64), + None => anyhow::bail!("failed to read memory.high from memory subsystem"), + } + } +} diff --git a/libs/vm_monitor/src/dispatcher.rs b/libs/vm_monitor/src/dispatcher.rs new file mode 100644 index 0000000000..9d3b966700 --- /dev/null +++ b/libs/vm_monitor/src/dispatcher.rs @@ -0,0 +1,155 @@ +//! Managing the websocket connection and other signals in the monitor. +//! +//! Contains types that manage the interaction (not data interchange, see `protocol`) +//! between informant and monitor, allowing us to to process and send messages in a +//! straightforward way. The dispatcher also manages that signals that come from +//! the cgroup (requesting upscale), and the signals that go to the cgroup +//! (notifying it of upscale). 
+
+use anyhow::{bail, Context};
+use axum::extract::ws::{Message, WebSocket};
+use futures::{
+ stream::{SplitSink, SplitStream},
+ SinkExt, StreamExt,
+};
+use tokio::sync::mpsc;
+use tracing::info;
+
+use crate::cgroup::Sequenced;
+use crate::protocol::{
+ OutboundMsg, ProtocolRange, ProtocolResponse, ProtocolVersion, Resources, PROTOCOL_MAX_VERSION,
+ PROTOCOL_MIN_VERSION,
+};
+
+/// The central handler for all communications in the monitor.
+///
+/// The dispatcher has two purposes:
+/// 1. Manage the connection to the informant, sending and receiving messages.
+/// 2. Communicate with the cgroup manager, notifying it when upscale is received,
+/// and sending a message to the informant when the cgroup manager requests
+/// upscale.
+#[derive(Debug)]
+pub struct Dispatcher {
+ /// We read informant messages off of `source`
+ pub(crate) source: SplitStream,
+
+ /// We send messages to the informant through `sink`
+ sink: SplitSink,
+
+ /// Used to notify the cgroup when we are upscaled.
+ pub(crate) notify_upscale_events: mpsc::Sender>,
+
+ /// When the cgroup requests upscale it will send on this channel. In response
+ /// we send an `UpscaleRequest` to the agent.
+ pub(crate) request_upscale_events: mpsc::Receiver<()>,
+
+ /// The protocol version we have agreed to use with the informant. This is negotiated
+ /// during the creation of the dispatcher, and should be the highest shared protocol
+ /// version.
+ ///
+ // NOTE: currently unused, but will almost certainly be used in the future
+ // as the protocol changes
+ #[allow(unused)]
+ pub(crate) proto_version: ProtocolVersion,
+}
+
+impl Dispatcher {
+ /// Creates a new dispatcher using the passed-in connection.
+ ///
+ /// Performs a negotiation with the informant to determine the highest protocol
+ /// version that both support. This consists of two steps:
+ /// 1. Wait for the informant to send the range of protocols it supports.
+ /// 2.
Send a protocol version that works for us as well, or an error if there + /// is no compatible version. + pub async fn new( + stream: WebSocket, + notify_upscale_events: mpsc::Sender>, + request_upscale_events: mpsc::Receiver<()>, + ) -> anyhow::Result { + let (mut sink, mut source) = stream.split(); + + // Figure out the highest protocol version we both support + info!("waiting for informant to send protocol version range"); + let Some(message) = source.next().await else { + bail!("websocket connection closed while performing protocol handshake") + }; + + let message = message.context("failed to read protocol version range off connection")?; + + let Message::Text(message_text) = message else { + // All messages should be in text form, since we don't do any + // pinging/ponging. See nhooyr/websocket's implementation and the + // informant/agent for more info + bail!("received non-text message during proocol handshake: {message:?}") + }; + + let monitor_range = ProtocolRange { + min: PROTOCOL_MIN_VERSION, + max: PROTOCOL_MAX_VERSION, + }; + + let informant_range: ProtocolRange = serde_json::from_str(&message_text) + .context("failed to deserialize protocol version range")?; + + info!(range = ?informant_range, "received protocol version range"); + + let highest_shared_version = match monitor_range.highest_shared_version(&informant_range) { + Ok(version) => { + sink.send(Message::Text( + serde_json::to_string(&ProtocolResponse::Version(version)).unwrap(), + )) + .await + .context("failed to notify informant of negotiated protocol version")?; + version + } + Err(e) => { + sink.send(Message::Text( + serde_json::to_string(&ProtocolResponse::Error(format!( + "Received protocol version range {} which does not overlap with {}", + informant_range, monitor_range + ))) + .unwrap(), + )) + .await + .context( + "failed to notify informant of no overlap between protocol version ranges", + )?; + Err(e).context("error determining suitable protocol version range")? 
+ } + }; + + Ok(Self { + sink, + source, + notify_upscale_events, + request_upscale_events, + proto_version: highest_shared_version, + }) + } + + /// Notify the cgroup manager that we have received upscale and wait for + /// the acknowledgement. + #[tracing::instrument(skip_all, fields(?resources))] + pub async fn notify_upscale(&self, resources: Sequenced) -> anyhow::Result<()> { + self.notify_upscale_events + .send(resources) + .await + .context("failed to send resources and oneshot sender across channel") + } + + /// Send a message to the informant. + /// + /// Although this function is small, it has one major benefit: it is the only + /// way to send data accross the connection, and you can only pass in a proper + /// `MonitorMessage`. Without safeguards like this, it's easy to accidentally + /// serialize the wrong thing and send it, since `self.sink.send` will take + /// any string. + pub async fn send(&mut self, message: OutboundMsg) -> anyhow::Result<()> { + info!(?message, "sending message"); + let json = serde_json::to_string(&message).context("failed to serialize message")?; + self.sink + .send(Message::Text(json)) + .await + .context("stream error sending message") + } +} diff --git a/libs/vm_monitor/src/filecache.rs b/libs/vm_monitor/src/filecache.rs new file mode 100644 index 0000000000..33c76de35e --- /dev/null +++ b/libs/vm_monitor/src/filecache.rs @@ -0,0 +1,306 @@ +//! Logic for configuring and scaling the Postgres file cache. + +use std::num::NonZeroU64; + +use crate::MiB; +use anyhow::{anyhow, Context}; +use tokio_postgres::{types::ToSql, Client, NoTls, Row}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info}; + +/// Manages Postgres' file cache by keeping a connection open. +#[derive(Debug)] +pub struct FileCacheState { + client: Client, + conn_str: String, + pub(crate) config: FileCacheConfig, + + /// A token for cancelling spawned threads during shutdown. 
+ token: CancellationToken, +} + +#[derive(Debug)] +pub struct FileCacheConfig { + /// Whether the file cache is *actually* stored in memory (e.g. by writing to + /// a tmpfs or shmem file). If true, the size of the file cache will be counted against the + /// memory available for the cgroup. + pub(crate) in_memory: bool, + + /// The size of the file cache, in terms of the size of the resource it consumes + /// (currently: only memory) + /// + /// For example, setting `resource_multipler = 0.75` gives the cache a target size of 75% of total + /// resources. + /// + /// This value must be strictly between 0 and 1. + resource_multiplier: f64, + + /// The required minimum amount of memory, in bytes, that must remain available + /// after subtracting the file cache. + /// + /// This value must be non-zero. + min_remaining_after_cache: NonZeroU64, + + /// Controls the rate of increase in the file cache's size as it grows from zero + /// (when total resources equals min_remaining_after_cache) to the desired size based on + /// `resource_multiplier`. + /// + /// A `spread_factor` of zero means that all additional resources will go to the cache until it + /// reaches the desired size. Setting `spread_factor` to N roughly means "for every 1 byte added to + /// the cache's size, N bytes are reserved for the rest of the system, until the cache gets to + /// its desired size". + /// + /// This value must be >= 0, and must retain an increase that is more than what would be given by + /// `resource_multiplier`. For example, setting `resource_multiplier` = 0.75 but `spread_factor` = 1 + /// would be invalid, because `spread_factor` would induce only 50% usage - never reaching the 75% + /// as desired by `resource_multiplier`. + /// + /// `spread_factor` is too large if `(spread_factor + 1) * resource_multiplier >= 1`. 
+ spread_factor: f64, +} + +impl Default for FileCacheConfig { + fn default() -> Self { + Self { + in_memory: true, + // 75 % + resource_multiplier: 0.75, + // 640 MiB; (512 + 128) + min_remaining_after_cache: NonZeroU64::new(640 * MiB).unwrap(), + // ensure any increase in file cache size is split 90-10 with 10% to other memory + spread_factor: 0.1, + } + } +} + +impl FileCacheConfig { + /// Make sure fields of the config are consistent. + pub fn validate(&self) -> anyhow::Result<()> { + // Single field validity + anyhow::ensure!( + 0.0 < self.resource_multiplier && self.resource_multiplier < 1.0, + "resource_multiplier must be between 0.0 and 1.0 exclusive, got {}", + self.resource_multiplier + ); + anyhow::ensure!( + self.spread_factor >= 0.0, + "spread_factor must be >= 0, got {}", + self.spread_factor + ); + + // Check that `resource_multiplier` and `spread_factor` are valid w.r.t. each other. + // + // As shown in `calculate_cache_size`, we have two lines resulting from `resource_multiplier` and + // `spread_factor`, respectively. They are: + // + // `total` `min_remaining_after_cache` + // size = ————————————————————— - ————————————————————————————— + // `spread_factor` + 1 `spread_factor` + 1 + // + // and + // + // size = `resource_multiplier` × total + // + // .. where `total` is the total resources. These are isomorphic to the typical 'y = mx + b' + // form, with y = "size" and x = "total". + // + // These lines intersect at: + // + // `min_remaining_after_cache` + // ——————————————————————————————————————————————————— + // 1 - `resource_multiplier` × (`spread_factor` + 1) + // + // We want to ensure that this value (a) exists, and (b) is >= `min_remaining_after_cache`. This is + // guaranteed when '`resource_multiplier` × (`spread_factor` + 1)' is less than 1. + // (We also need it to be >= 0, but that's already guaranteed.) 
+ + let intersect_factor = self.resource_multiplier * (self.spread_factor + 1.0); + anyhow::ensure!( + intersect_factor < 1.0, + "incompatible resource_multipler and spread_factor" + ); + Ok(()) + } + + /// Calculate the desired size of the cache, given the total memory + pub fn calculate_cache_size(&self, total: u64) -> u64 { + // *Note*: all units are in bytes, until the very last line. + let available = total.saturating_sub(self.min_remaining_after_cache.get()); + if available == 0 { + return 0; + } + + // Conversions to ensure we don't overflow from floating-point ops + let size_from_spread = + 0_i64.max((available as f64 / (1.0 + self.spread_factor)) as i64) as u64; + + let size_from_normal = (total as f64 * self.resource_multiplier) as u64; + + let byte_size = size_from_spread.min(size_from_normal); + + // The file cache operates in units of mebibytes, so the sizes we produce should + // be rounded to a mebibyte. We round down to be conservative. + byte_size / MiB * MiB + } +} + +impl FileCacheState { + /// Connect to the file cache. + #[tracing::instrument(skip_all, fields(%conn_str, ?config))] + pub async fn new( + conn_str: &str, + config: FileCacheConfig, + token: CancellationToken, + ) -> anyhow::Result { + config.validate().context("file cache config is invalid")?; + + info!(conn_str, "connecting to Postgres file cache"); + let client = FileCacheState::connect(conn_str, token.clone()) + .await + .context("failed to connect to postgres file cache")?; + + let conn_str = conn_str.to_string(); + Ok(Self { + client, + config, + conn_str, + token, + }) + } + + /// Connect to Postgres. + /// + /// Aborts the spawned thread if the kill signal is received. This is not + /// a method as it is called in [`FileCacheState::new`]. 
+ #[tracing::instrument(skip_all, fields(%conn_str))]
+ async fn connect(conn_str: &str, token: CancellationToken) -> anyhow::Result {
+ let (client, conn) = tokio_postgres::connect(conn_str, NoTls)
+ .await
+ .context("failed to connect to pg client")?;
+
+ // The connection object performs the actual communication with the database,
+ // so spawn it off to run on its own. See tokio-postgres docs.
+ crate::spawn_with_cancel(
+ token,
+ |res| {
+ if let Err(error) = res {
+ error!(%error, "postgres error")
+ }
+ },
+ conn,
+ );
+
+ Ok(client)
+ }
+
+ /// Execute a query with a retry if necessary.
+ ///
+ /// If the initial query fails, we restart the database connection and attempt
+ /// it again.
+ #[tracing::instrument(skip_all, fields(%statement))]
+ pub async fn query_with_retry(
+ &mut self,
+ statement: &str,
+ params: &[&(dyn ToSql + Sync)],
+ ) -> anyhow::Result> {
+ match self
+ .client
+ .query(statement, params)
+ .await
+ .context("failed to execute query")
+ {
+ Ok(rows) => Ok(rows),
+ Err(e) => {
+ error!(error = ?e, "postgres error: {e} -> retrying");
+
+ let client = FileCacheState::connect(&self.conn_str, self.token.clone())
+ .await
+ .context("failed to connect to postgres file cache")?;
+ info!("successfully reconnected to postgres client");
+
+ // Replace the old client and attempt the query with the new one
+ self.client = client;
+ self.client
+ .query(statement, params)
+ .await
+ .context("failed to execute query a second time")
+ }
+ }
+ }
+
+ /// Get the current size of the file cache.
+ #[tracing::instrument(skip_all)]
+ pub async fn get_file_cache_size(&mut self) -> anyhow::Result {
+ self.query_with_retry(
+ // The file cache GUC variable is in MiB, but the conversion with
+ // pg_size_bytes means that the end result we get is in bytes.
+ "SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit'));",
+ &[],
+ )
+ .await
+ .context("failed to query pg for file cache size")?
+ .first() + .ok_or_else(|| anyhow!("file cache size query returned no rows"))? + // pg_size_bytes returns a bigint which is the same as an i64. + .try_get::<_, i64>(0) + // Since the size of the table is not negative, the cast is sound. + .map(|bytes| bytes as u64) + .context("failed to extract file cache size from query result") + } + + /// Attempt to set the file cache size, returning the size it was actually + /// set to. + #[tracing::instrument(skip_all, fields(%num_bytes))] + pub async fn set_file_cache_size(&mut self, num_bytes: u64) -> anyhow::Result { + let max_bytes = self + // The file cache GUC variable is in MiB, but the conversion with pg_size_bytes + // means that the end result we get is in bytes. + .query_with_retry( + "SELECT pg_size_bytes(current_setting('neon.max_file_cache_size'));", + &[], + ) + .await + .context("failed to query pg for max file cache size")? + .first() + .ok_or_else(|| anyhow!("max file cache size query returned no rows"))? + .try_get::<_, i64>(0) + .map(|bytes| bytes as u64) + .context("failed to extract max file cache size from query result")?; + + let max_mb = max_bytes / MiB; + let num_mb = (num_bytes / MiB).max(max_mb); + + let capped = if num_bytes > max_bytes { + " (capped by maximum size)" + } else { + "" + }; + + info!( + size = num_mb, + max = max_mb, + "updating file cache size {capped}", + ); + + // note: even though the normal ways to get the cache size produce values with trailing "MB" + // (hence why we call pg_size_bytes in `get_file_cache_size`'s query), the format + // it expects to set the value is "integer number of MB" without trailing units. + // For some reason, this *really* wasn't working with normal arguments, so that's + // why we're constructing the query here. 
+ self.client + .query( + &format!("ALTER SYSTEM SET neon.file_cache_size_limit = {};", num_mb), + &[], + ) + .await + .context("failed to change file cache size limit")?; + + // must use pg_reload_conf to have the settings change take effect + self.client + .execute("SELECT pg_reload_conf();", &[]) + .await + .context("failed to reload config")?; + + Ok(num_mb * MiB) + } +} diff --git a/libs/vm_monitor/src/lib.rs b/libs/vm_monitor/src/lib.rs new file mode 100644 index 0000000000..da5b450b66 --- /dev/null +++ b/libs/vm_monitor/src/lib.rs @@ -0,0 +1,205 @@ +#![cfg(target_os = "linux")] + +use anyhow::Context; +use axum::{ + extract::{ws::WebSocket, State, WebSocketUpgrade}, + response::Response, +}; +use axum::{routing::get, Router, Server}; +use clap::Parser; +use futures::Future; +use std::{fmt::Debug, time::Duration}; +use sysinfo::{RefreshKind, System, SystemExt}; +use tokio::{sync::broadcast, task::JoinHandle}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info}; + +use runner::Runner; + +// Code that interfaces with agent +pub mod dispatcher; +pub mod protocol; + +pub mod cgroup; +pub mod filecache; +pub mod runner; + +/// The vm-monitor is an autoscaling component started by compute_ctl. +/// +/// It carries out autoscaling decisions (upscaling/downscaling) and responds to +/// memory pressure by making requests to the autoscaler-agent. +#[derive(Debug, Parser)] +pub struct Args { + /// The name of the cgroup we should monitor for memory.high events. This + /// is the cgroup that postgres should be running in. + #[arg(short, long)] + pub cgroup: Option, + + /// The connection string for the Postgres file cache we should manage. + #[arg(short, long)] + pub pgconnstr: Option, + + /// The address we should listen on for connection requests. For the + /// agent, this is 0.0.0.0:10301. For the informant, this is 127.0.0.1:10369. 
+ #[arg(short, long)] + pub addr: String, +} + +impl Args { + pub fn addr(&self) -> &str { + &self.addr + } +} + +/// The number of bytes in one mebibyte. +#[allow(non_upper_case_globals)] +const MiB: u64 = 1 << 20; + +/// Convert a quantity in bytes to a quantity in mebibytes, generally for display +/// purposes. (Most calculations in this crate use bytes directly) +pub fn bytes_to_mebibytes(bytes: u64) -> f32 { + (bytes as f32) / (MiB as f32) +} + +pub fn get_total_system_memory() -> u64 { + System::new_with_specifics(RefreshKind::new().with_memory()).total_memory() +} + +/// Global app state for the Axum server +#[derive(Debug, Clone)] +pub struct ServerState { + /// Used to close old connections. + /// + /// When a new connection is made, we send a message signalling to the old + /// connection to close. + pub sender: broadcast::Sender<()>, + + /// Used to cancel all spawned threads in the monitor. + pub token: CancellationToken, + + // The CLI args + pub args: &'static Args, +} + +/// Spawn a thread that may get cancelled by the provided [`CancellationToken`]. +/// +/// This is mainly meant to be called with futures that will be pending for a very +/// long time, or are not mean to return. If it is not desirable for the future to +/// ever resolve, such as in the case of [`cgroup::CgroupWatcher::watch`], the error can +/// be logged with `f`. +pub fn spawn_with_cancel( + token: CancellationToken, + f: F, + future: T, +) -> JoinHandle> +where + T: Future + Send + 'static, + T::Output: Send + 'static, + F: FnOnce(&T::Output) + Send + 'static, +{ + tokio::spawn(async move { + tokio::select! { + _ = token.cancelled() => { + info!("received global kill signal"); + None + } + res = future => { + f(&res); + Some(res) + } + } + }) +} + +/// The entrypoint to the binary. +/// +/// Set up tracing, parse arguments, and start an http server. 
+pub async fn start(args: &'static Args, token: CancellationToken) -> anyhow::Result<()> { + // This channel is used to close old connections. When a new connection is + // made, we send a message signalling to the old connection to close. + let (sender, _) = tokio::sync::broadcast::channel::<()>(1); + + let app = Router::new() + // This route gets upgraded to a websocket connection. We only support + // one connection at a time, which we enforce by killing old connections + // when we receive a new one. + .route("/monitor", get(ws_handler)) + .with_state(ServerState { + sender, + token, + args, + }); + + let addr = args.addr(); + let bound = Server::try_bind(&addr.parse().expect("parsing address should not fail")) + .with_context(|| format!("failed to bind to {addr}"))?; + + info!(addr, "server bound"); + + bound + .serve(app.into_make_service()) + .await + .context("server exited")?; + + Ok(()) +} + +/// Handles incoming websocket connections. +/// +/// If we are already to connected to an informant, we kill that old connection +/// and accept the new one. +#[tracing::instrument(name = "/monitor", skip_all, fields(?args))] +pub async fn ws_handler( + ws: WebSocketUpgrade, + State(ServerState { + sender, + token, + args, + }): State, +) -> Response { + // Kill the old monitor + info!("closing old connection if there is one"); + let _ = sender.send(()); + + // Start the new one. Wow, the cycle of death and rebirth + let closer = sender.subscribe(); + ws.on_upgrade(|ws| start_monitor(ws, args, closer, token)) +} + +/// Starts the monitor. If startup fails or the monitor exits, an error will +/// be logged and our internal state will be reset to allow for new connections. 
+#[tracing::instrument(skip_all, fields(?args))] +async fn start_monitor( + ws: WebSocket, + args: &Args, + kill: broadcast::Receiver<()>, + token: CancellationToken, +) { + info!("accepted new websocket connection -> starting monitor"); + let timeout = Duration::from_secs(4); + let monitor = tokio::time::timeout( + timeout, + Runner::new(Default::default(), args, ws, kill, token), + ) + .await; + let mut monitor = match monitor { + Ok(Ok(monitor)) => monitor, + Ok(Err(error)) => { + error!(?error, "failed to create monitor"); + return; + } + Err(_) => { + error!( + ?timeout, + "creating monitor timed out (probably waiting to receive protocol range)" + ); + return; + } + }; + info!("connected to informant"); + + match monitor.run().await { + Ok(()) => info!("monitor was killed due to new connection"), + Err(e) => error!(error = ?e, "monitor terminated by itself"), + } +} diff --git a/libs/vm_monitor/src/protocol.rs b/libs/vm_monitor/src/protocol.rs new file mode 100644 index 0000000000..c6f1f0f718 --- /dev/null +++ b/libs/vm_monitor/src/protocol.rs @@ -0,0 +1,241 @@ +//! Types representing protocols and actual informant-monitor messages. +//! +//! The pervasive use of serde modifiers throughout this module is to ease +//! serialization on the go side. Because go does not have enums (which model +//! messages well), it is harder to model messages, and we accomodate that with +//! serde. +//! +//! *Note*: the informant sends and receives messages in different ways. +//! +//! The informant serializes messages in the form and then sends them. The use +//! of `#[serde(tag = "type", content = "content")]` allows us to use `Type` +//! to determine how to deserialize `Content`. +//! ```ignore +//! struct { +//! Content any +//! Type string +//! Id uint64 +//! } +//! ``` +//! and receives messages in the form: +//! ```ignore +//! struct { +//! {fields embedded} +//! Type string +//! Id uint64 +//! } +//! ``` +//! 
After reading the type field, the informant will decode the entire message +//! again, this time into the correct type using the embedded fields. +//! Because the informant cannot just extract the json contained in a certain field +//! (it initially deserializes to `map[string]interface{}`), we keep the fields +//! at the top level, so the entire piece of json can be deserialized into a struct, +//! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored. + +use core::fmt; +use std::cmp; + +use serde::{de::Error, Deserialize, Serialize}; + +/// A Message we send to the informant. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct OutboundMsg { + #[serde(flatten)] + pub(crate) inner: OutboundMsgKind, + pub(crate) id: usize, +} + +impl OutboundMsg { + pub fn new(inner: OutboundMsgKind, id: usize) -> Self { + Self { inner, id } + } +} + +/// The different underlying message types we can send to the informant. +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "type")] +pub enum OutboundMsgKind { + /// Indicates that the informant sent an invalid message, i.e, we couldn't + /// properly deserialize it. + InvalidMessage { error: String }, + /// Indicates that we experienced an internal error while processing a message. + /// For example, if a cgroup operation fails while trying to handle an upscale, + /// we return `InternalError`. + InternalError { error: String }, + /// Returned to the informant once we have finished handling an upscale. If the + /// handling was unsuccessful, an `InternalError` will get returned instead. + /// *Note*: this is a struct variant because of the way go serializes struct{} + UpscaleConfirmation {}, + /// Indicates to the monitor that we are urgently requesting resources. + /// *Note*: this is a struct variant because of the way go serializes struct{} + UpscaleRequest {}, + /// Returned to the informant once we have finished attempting to downscale. 
If + /// an error occured trying to do so, an `InternalError` will get returned instead. + /// However, if we are simply unsuccessful (for example, do to needing the resources), + /// that gets included in the `DownscaleResult`. + DownscaleResult { + // FIXME for the future (once the informant is deprecated) + // As of the time of writing, the informant/agent version of this struct is + // called api.DownscaleResult. This struct has uppercase fields which are + // serialized as such. Thus, we serialize using uppercase names so we don't + // have to make a breaking change to the agent<->informant protocol. Once + // the informant has been superseded by the monitor, we can add the correct + // struct tags to api.DownscaleResult without causing a breaking change, + // since we don't need to support the agent<->informant protocol anymore. + #[serde(rename = "Ok")] + ok: bool, + #[serde(rename = "Status")] + status: String, + }, + /// Part of the bidirectional heartbeat. The heartbeat is initiated by the + /// informant. + /// *Note*: this is a struct variant because of the way go serializes struct{} + HealthCheck {}, +} + +/// A message received form the informant. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct InboundMsg { + #[serde(flatten)] + pub(crate) inner: InboundMsgKind, + pub(crate) id: usize, +} + +/// The different underlying message types we can receive from the informant. +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "type", content = "content")] +pub enum InboundMsgKind { + /// Indicates that the we sent an invalid message, i.e, we couldn't + /// properly deserialize it. + InvalidMessage { error: String }, + /// Indicates that the informan experienced an internal error while processing + /// a message. For example, if it failed to request upsacle from the agent, it + /// would return an `InternalError`. + InternalError { error: String }, + /// Indicates to us that we have been granted more resources. 
We should respond + /// with an `UpscaleConfirmation` when done handling the resources (increasins + /// file cache size, cgorup memory limits). + UpscaleNotification { granted: Resources }, + /// A request to reduce resource usage. We should response with a `DownscaleResult`, + /// when done. + DownscaleRequest { target: Resources }, + /// Part of the bidirectional heartbeat. The heartbeat is initiated by the + /// informant. + /// *Note*: this is a struct variant because of the way go serializes struct{} + HealthCheck {}, +} + +/// Represents the resources granted to a VM. +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +// Renamed because the agent/informant has multiple resources types: +// `Resources` (milliCPU/memory slots) +// `Allocation` (vCPU/bytes) <- what we correspond to +#[serde(rename(serialize = "Allocation", deserialize = "Allocation"))] +pub struct Resources { + /// Number of vCPUs + pub(crate) cpu: f64, + /// Bytes of memory + pub(crate) mem: u64, +} + +impl Resources { + pub fn new(cpu: f64, mem: u64) -> Self { + Self { cpu, mem } + } +} + +pub const PROTOCOL_MIN_VERSION: ProtocolVersion = ProtocolVersion::V1_0; +pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0; + +#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] +pub struct ProtocolVersion(u8); + +impl ProtocolVersion { + /// Represents v1.0 of the informant<-> monitor protocol - the initial version + /// + /// Currently the latest version. + const V1_0: ProtocolVersion = ProtocolVersion(1); +} + +impl fmt::Display for ProtocolVersion { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + ProtocolVersion(0) => f.write_str(""), + ProtocolVersion::V1_0 => f.write_str("v1.0"), + other => write!(f, ""), + } + } +} + +/// A set of protocol bounds that determines what we are speaking. +/// +/// These bounds are inclusive. 
+#[derive(Debug)] +pub struct ProtocolRange { + pub min: ProtocolVersion, + pub max: ProtocolVersion, +} + +// Use a custom deserialize impl to ensure that `self.min <= self.max` +impl<'de> Deserialize<'de> for ProtocolRange { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize)] + struct InnerProtocolRange { + min: ProtocolVersion, + max: ProtocolVersion, + } + let InnerProtocolRange { min, max } = InnerProtocolRange::deserialize(deserializer)?; + if min > max { + Err(D::Error::custom(format!( + "min version = {min} is greater than max version = {max}", + ))) + } else { + Ok(ProtocolRange { min, max }) + } + } +} + +impl fmt::Display for ProtocolRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.min == self.max { + f.write_fmt(format_args!("{}", self.max)) + } else { + f.write_fmt(format_args!("{} to {}", self.min, self.max)) + } + } +} + +impl ProtocolRange { + /// Find the highest shared version between two `ProtocolRange`'s + pub fn highest_shared_version(&self, other: &Self) -> anyhow::Result { + // We first have to make sure the ranges are overlapping. Once we know + // this, we can merge the ranges by taking the max of the mins and the + // mins of the maxes. 
+ if self.min > other.max { + anyhow::bail!( + "Non-overlapping bounds: other.max = {} was less than self.min = {}", + other.max, + self.min, + ) + } else if self.max < other.min { + anyhow::bail!( + "Non-overlappinng bounds: self.max = {} was less than other.min = {}", + self.max, + other.min + ) + } else { + Ok(cmp::min(self.max, other.max)) + } + } +} + +/// We send this to the monitor after negotiating which protocol to use +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub enum ProtocolResponse { + Error(String), + Version(ProtocolVersion), +} diff --git a/libs/vm_monitor/src/runner.rs b/libs/vm_monitor/src/runner.rs new file mode 100644 index 0000000000..7f247386e9 --- /dev/null +++ b/libs/vm_monitor/src/runner.rs @@ -0,0 +1,456 @@ +//! Exposes the `Runner`, which handles messages received from informant and +//! sends upscale requests. +//! +//! This is the "Monitor" part of the monitor binary and is the main entrypoint for +//! all functionality. + +use std::sync::Arc; +use std::{fmt::Debug, mem}; + +use anyhow::{bail, Context}; +use axum::extract::ws::{Message, WebSocket}; +use futures::StreamExt; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +use crate::cgroup::{CgroupWatcher, MemoryLimits, Sequenced}; +use crate::dispatcher::Dispatcher; +use crate::filecache::{FileCacheConfig, FileCacheState}; +use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources}; +use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB}; + +/// Central struct that interacts with informant, dispatcher, and cgroup to handle +/// signals from the informant. +#[derive(Debug)] +pub struct Runner { + config: Config, + filecache: Option, + cgroup: Option>, + dispatcher: Dispatcher, + + /// We "mint" new message ids by incrementing this counter and taking the value. 
+ /// + /// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated + /// by us vs the autoscaler-agent. + counter: usize, + + /// A signal to kill the main thread produced by `self.run()`. This is triggered + /// when the server receives a new connection. When the thread receives the + /// signal off this channel, it will gracefully shutdown. + kill: broadcast::Receiver<()>, +} + +/// Configuration for a `Runner` +#[derive(Debug)] +pub struct Config { + /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses before + /// handing out the rest to userspace. This value is the estimated difference between the + /// *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`. + /// + /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM + /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)". + /// + /// We only use `sys_buffer_bytes` when calculating the system memory from the *external* memory + /// size, rather than the self-reported memory size, according to the kernel. + /// + /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's + /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field + /// should be removed once we have a better solution there. + sys_buffer_bytes: u64, +} + +impl Default for Config { + fn default() -> Self { + Self { + sys_buffer_bytes: 100 * MiB, + } + } +} + +impl Runner { + /// Create a new monitor. 
+ #[tracing::instrument(skip_all, fields(?config, ?args))] + pub async fn new( + config: Config, + args: &Args, + ws: WebSocket, + kill: broadcast::Receiver<()>, + token: CancellationToken, + ) -> anyhow::Result { + anyhow::ensure!( + config.sys_buffer_bytes != 0, + "invalid monitor Config: sys_buffer_bytes cannot be 0" + ); + + // *NOTE*: the dispatcher and cgroup manager talk through these channels + // so make sure they each get the correct half, nothing is droppped, etc. + let (notified_send, notified_recv) = mpsc::channel(1); + let (requesting_send, requesting_recv) = mpsc::channel(1); + + let dispatcher = Dispatcher::new(ws, notified_send, requesting_recv) + .await + .context("error creating new dispatcher")?; + + let mut state = Runner { + config, + filecache: None, + cgroup: None, + dispatcher, + counter: 1, // NB: must be odd, see the comment about the field for more. + kill, + }; + + let mut file_cache_reserved_bytes = 0; + let mem = get_total_system_memory(); + + // We need to process file cache initialization before cgroup initialization, so that the memory + // allocated to the file cache is appropriately taken into account when we decide the cgroup's + // memory limits. 
+ if let Some(connstr) = &args.pgconnstr { + info!("initializing file cache"); + let config: FileCacheConfig = Default::default(); + if !config.in_memory { + panic!("file cache not in-memory implemented") + } + + let mut file_cache = FileCacheState::new(connstr, config, token.clone()) + .await + .context("failed to create file cache")?; + + let size = file_cache + .get_file_cache_size() + .await + .context("error getting file cache size")?; + + let new_size = file_cache.config.calculate_cache_size(mem); + info!( + initial = bytes_to_mebibytes(size), + new = bytes_to_mebibytes(new_size), + "setting initial file cache size", + ); + + // note: even if size == new_size, we want to explicitly set it, just + // to make sure that we have the permissions to do so + let actual_size = file_cache + .set_file_cache_size(new_size) + .await + .context("failed to set file cache size, possibly due to inadequate permissions")?; + if actual_size != new_size { + info!("file cache size actually got set to {actual_size}") + } + file_cache_reserved_bytes = actual_size; + + state.filecache = Some(file_cache); + } + + if let Some(name) = &args.cgroup { + let (mut cgroup, cgroup_event_stream) = + CgroupWatcher::new(name.clone(), requesting_send) + .context("failed to create cgroup manager")?; + + let available = mem - file_cache_reserved_bytes; + + cgroup + .set_memory_limits(available) + .context("failed to set cgroup memory limits")?; + + let cgroup = Arc::new(cgroup); + + // Some might call this . . . cgroup v2 + let cgroup_clone = Arc::clone(&cgroup); + + spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move { + cgroup_clone.watch(notified_recv, cgroup_event_stream).await + }); + + state.cgroup = Some(cgroup); + } else { + // *NOTE*: We need to forget the sender so that its drop impl does not get ran. + // This allows us to poll it in `Monitor::run` regardless of whether we + // are managing a cgroup or not. 
If we don't forget it, all receives will + // immediately return an error because the sender is droped and it will + // claim all select! statements, effectively turning `Monitor::run` into + // `loop { fail to receive }`. + mem::forget(requesting_send); + } + + Ok(state) + } + + /// Attempt to downscale filecache + cgroup + #[tracing::instrument(skip_all, fields(?target))] + pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> { + // Nothing to adjust + if self.cgroup.is_none() && self.filecache.is_none() { + info!("no action needed for downscale (no cgroup or file cache enabled)"); + return Ok(( + true, + "monitor is not managing cgroup or file cache".to_string(), + )); + } + + let requested_mem = target.mem; + let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes); + let expected_file_cache_mem_usage = self + .filecache + .as_ref() + .map(|file_cache| file_cache.config.calculate_cache_size(usable_system_memory)) + .unwrap_or(0); + let mut new_cgroup_mem_high = 0; + if let Some(cgroup) = &self.cgroup { + new_cgroup_mem_high = cgroup + .config + .calculate_memory_high_value(usable_system_memory - expected_file_cache_mem_usage); + + let current = cgroup + .current_memory_usage() + .context("failed to fetch cgroup memory")?; + + if new_cgroup_mem_high < current + cgroup.config.memory_high_buffer_bytes { + let status = format!( + "{}: {} MiB (new high) < {} (current usage) + {} (buffer)", + "calculated memory.high too low", + bytes_to_mebibytes(new_cgroup_mem_high), + bytes_to_mebibytes(current), + bytes_to_mebibytes(cgroup.config.memory_high_buffer_bytes) + ); + + info!(status, "discontinuing downscale"); + + return Ok((false, status)); + } + } + + // The downscaling has been approved. Downscale the file cache, then the cgroup. 
+ let mut status = vec![]; + let mut file_cache_mem_usage = 0; + if let Some(file_cache) = &mut self.filecache { + if !file_cache.config.in_memory { + panic!("file cache not in-memory unimplemented") + } + + let actual_usage = file_cache + .set_file_cache_size(expected_file_cache_mem_usage) + .await + .context("failed to set file cache size")?; + file_cache_mem_usage = actual_usage; + let message = format!( + "set file cache size to {} MiB", + bytes_to_mebibytes(actual_usage) + ); + info!("downscale: {message}"); + status.push(message); + } + + if let Some(cgroup) = &self.cgroup { + let available_memory = usable_system_memory - file_cache_mem_usage; + + if file_cache_mem_usage != expected_file_cache_mem_usage { + new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory); + } + + let limits = MemoryLimits::new( + // new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here + // since it is properly initialized in the previous cgroup if let block + new_cgroup_mem_high, + available_memory, + ); + cgroup + .set_limits(&limits) + .context("failed to set cgroup memory limits")?; + + let message = format!( + "set cgroup memory.high to {} MiB, of new max {} MiB", + bytes_to_mebibytes(new_cgroup_mem_high), + bytes_to_mebibytes(available_memory) + ); + info!("downscale: {message}"); + status.push(message); + } + + // TODO: make this status thing less jank + let status = status.join("; "); + Ok((true, status)) + } + + /// Handle new resources + #[tracing::instrument(skip_all, fields(?resources))] + pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> { + if self.filecache.is_none() && self.cgroup.is_none() { + info!("no action needed for upscale (no cgroup or file cache enabled)"); + return Ok(()); + } + + let new_mem = resources.mem; + let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes); + + // Get the file cache's expected contribution to the memory usage + let mut 
file_cache_mem_usage = 0; + if let Some(file_cache) = &mut self.filecache { + if !file_cache.config.in_memory { + panic!("file cache not in-memory unimplemented"); + } + + let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory); + info!( + target = bytes_to_mebibytes(expected_usage), + total = bytes_to_mebibytes(new_mem), + "updating file cache size", + ); + + let actual_usage = file_cache + .set_file_cache_size(expected_usage) + .await + .context("failed to set file cache size")?; + + if actual_usage != expected_usage { + warn!( + "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib", + bytes_to_mebibytes(expected_usage), + bytes_to_mebibytes(actual_usage) + ) + } + file_cache_mem_usage = actual_usage; + } + + if let Some(cgroup) = &self.cgroup { + let available_memory = usable_system_memory - file_cache_mem_usage; + let new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory); + info!( + target = bytes_to_mebibytes(new_cgroup_mem_high), + total = bytes_to_mebibytes(new_mem), + name = cgroup.path(), + "updating cgroup memory.high", + ); + let limits = MemoryLimits::new(new_cgroup_mem_high, available_memory); + cgroup + .set_limits(&limits) + .context("failed to set file cache size")?; + } + + Ok(()) + } + + /// Take in a message and perform some action, such as downscaling or upscaling, + /// and return a message to be send back. 
+ #[tracing::instrument(skip_all, fields(%id, message = ?inner))] + pub async fn process_message( + &mut self, + InboundMsg { inner, id }: InboundMsg, + ) -> anyhow::Result> { + match inner { + InboundMsgKind::UpscaleNotification { granted } => { + self.handle_upscale(granted) + .await + .context("failed to handle upscale")?; + self.dispatcher + .notify_upscale(Sequenced::new(granted)) + .await + .context("failed to notify notify cgroup of upscale")?; + Ok(Some(OutboundMsg::new( + OutboundMsgKind::UpscaleConfirmation {}, + id, + ))) + } + InboundMsgKind::DownscaleRequest { target } => self + .try_downscale(target) + .await + .context("failed to downscale") + .map(|(ok, status)| { + Some(OutboundMsg::new( + OutboundMsgKind::DownscaleResult { ok, status }, + id, + )) + }), + InboundMsgKind::InvalidMessage { error } => { + warn!( + %error, id, "received notification of an invalid message we sent" + ); + Ok(None) + } + InboundMsgKind::InternalError { error } => { + warn!(error, id, "informant experienced an internal error"); + Ok(None) + } + InboundMsgKind::HealthCheck {} => { + Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id))) + } + } + } + + // TODO: don't propagate errors, probably just warn!? + #[tracing::instrument(skip_all)] + pub async fn run(&mut self) -> anyhow::Result<()> { + info!("starting dispatcher"); + loop { + tokio::select! { + signal = self.kill.recv() => { + match signal { + Ok(()) => return Ok(()), + Err(e) => bail!("failed to receive kill signal: {e}") + } + } + // we need to propagate an upscale request + request = self.dispatcher.request_upscale_events.recv() => { + if request.is_none() { + bail!("failed to listen for upscale event from cgroup") + } + info!("cgroup asking for upscale; forwarding request"); + self.counter += 2; // Increment, preserving parity (i.e. keep the + // counter odd). See the field comment for more. 
+ self.dispatcher + .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter)) + .await + .context("failed to send message")?; + } + // there is a message from the informant + msg = self.dispatcher.source.next() => { + if let Some(msg) = msg { + info!(message = ?msg, "received message"); + match msg { + Ok(msg) => { + let message: InboundMsg = match msg { + Message::Text(text) => { + serde_json::from_str(&text).context("failed to deserialize text message")? + } + other => { + warn!( + message = ?other, + "informant should only send text messages but received different type" + ); + continue + }, + }; + + let out = match self.process_message(message.clone()).await { + Ok(Some(out)) => out, + Ok(None) => continue, + Err(e) => { + let error = e.to_string(); + warn!(%error, "error handling message"); + OutboundMsg::new( + OutboundMsgKind::InternalError { + error + }, + message.id + ) + } + }; + + self.dispatcher + .send(out) + .await + .context("failed to send message")?; + } + Err(e) => warn!("{e}"), + } + } else { + anyhow::bail!("dispatcher connection closed") + } + } + } + } + } +} diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index dcac4677a5..831cc6f6b1 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -14,11 +14,13 @@ publish = false ### BEGIN HAKARI SECTION [dependencies] anyhow = { version = "1", features = ["backtrace"] } +axum = { version = "0.6", features = ["ws"] } bytes = { version = "1", features = ["serde"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } clap = { version = "4", features = ["derive", "string"] } clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] } crossbeam-utils = { version = "0.8" } +digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1" } fail = { version = "0.5", default-features = false, features = ["failpoints"] } futures = { 
version = "0.3" } @@ -27,6 +29,7 @@ futures-core = { version = "0.3" } futures-executor = { version = "0.3" } futures-sink = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } +hyper = { version = "0.14", features = ["full"] } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } log = { version = "0.4", default-features = false, features = ["std"] } @@ -55,7 +58,6 @@ toml_edit = { version = "0.19", features = ["serde"] } tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] } tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" } -tracing-subscriber = { version = "0.3", default-features = false, features = ["env-filter", "fmt", "json", "smallvec", "tracing-log"] } url = { version = "2", features = ["serde"] } [build-dependencies]