diff --git a/libs/vm_monitor/src/cgroup.rs b/libs/vm_monitor/src/cgroup.rs
index 3254fa4501..15e972505e 100644
--- a/libs/vm_monitor/src/cgroup.rs
+++ b/libs/vm_monitor/src/cgroup.rs
@@ -431,14 +431,14 @@ impl CgroupWatcher {
                         .context("failed to request upscale")?;
                     let memory_high =
-                        self.get_high_bytes().context("failed to get memory.high")?;
+                        self.get_memory_high_bytes().context("failed to get memory.high")?;
                     let new_high = memory_high + self.config.memory_high_increase_by_bytes;
                     info!(
                         current_high_bytes = memory_high,
                         new_high_bytes = new_high,
                         "updating memory.high"
                     );
-                    self.set_high_bytes(new_high)
+                    self.set_memory_high_bytes(new_high)
                         .context("failed to set memory.high")?;
                     last_memory_high_increase_at = Some(Instant::now());
                     continue;
@@ -556,14 +556,6 @@ impl CgroupWatcher {
     }
 }
 
-/// Represents a set of limits we apply to a cgroup to control memory usage.
-///
-/// Setting these values also affects the thresholds for receiving usage alerts.
-#[derive(Debug)]
-pub struct MemoryLimits {
-    pub high: u64,
-}
-
 // Methods for manipulating the actual cgroup
 impl CgroupWatcher {
     /// Get a handle on the freezer subsystem.
@@ -624,50 +616,29 @@ impl CgroupWatcher {
     }
 
     /// Set cgroup memory.high threshold.
-    pub fn set_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
+    pub fn set_memory_high_bytes(&self, bytes: u64) -> anyhow::Result<()> {
+        self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64))
+    }
+
+    /// Set the cgroup's memory.high to 'max', disabling it.
+    pub fn unset_memory_high(&self) -> anyhow::Result<()> {
+        self.set_memory_high_internal(MaxValue::Max)
+    }
+
+    fn set_memory_high_internal(&self, value: MaxValue) -> anyhow::Result<()> {
         self.memory()
             .context("failed to get memory subsystem")?
             .set_mem(cgroups_rs::memory::SetMemory {
                 low: None,
-                high: Some(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64)),
+                high: Some(value),
                 min: None,
                 max: None,
             })
-            .context("failed to set memory.high")
-    }
-
-    /// Set cgroup memory.high and memory.max.
-    pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
-        info!(limits.high, path = self.path(), "writing new memory limits",);
-        self.memory()
-            .context("failed to get memory subsystem while setting memory limits")?
-            .set_mem(cgroups_rs::memory::SetMemory {
-                min: None,
-                low: None,
-                high: Some(MaxValue::Value(
-                    u64::min(limits.high, i64::MAX as u64) as i64
-                )),
-                max: None,
-            })
-            .context("failed to set memory limits")
-    }
-
-    /// Given some amount of available memory, set the desired cgroup memory limits
-    pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
-        let new_high = self.config.calculate_memory_high_value(available_memory);
-        let limits = MemoryLimits { high: new_high };
-        info!(
-            path = self.path(),
-            memory = ?limits,
-            "setting cgroup memory",
-        );
-        self.set_limits(&limits)
-            .context("failed to set cgroup memory limits")?;
-        Ok(())
+            .map_err(anyhow::Error::from)
     }
 
     /// Get memory.high threshold.
-    pub fn get_high_bytes(&self) -> anyhow::Result<u64> {
+    pub fn get_memory_high_bytes(&self) -> anyhow::Result<u64> {
         let high = self
             .memory()
             .context("failed to get memory subsystem while getting memory statistics")?
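
For context, the three setters above collapse into a single internal helper keyed on `cgroups_rs::MaxValue`: `MaxValue::Max` corresponds to writing the literal `max` to memory.high (disabling the threshold), while `MaxValue::Value` carries an i64, which is why the u64 byte count is clamped before the cast. Below is a minimal runnable sketch of that pattern, not the vm_monitor code itself; the enum and struct are local stand-ins so it runs without a real cgroup:

    // Stand-in for cgroups_rs::MaxValue: either a concrete byte limit or "max".
    #[derive(Debug, PartialEq)]
    enum MaxValue {
        Max,
        Value(i64),
    }

    // Stand-in for a cgroup: stores the value instead of writing memory.high.
    struct FakeCgroup {
        memory_high: MaxValue,
    }

    impl FakeCgroup {
        // Mirrors set_memory_high_bytes: clamp first, since Value holds an i64.
        fn set_memory_high_bytes(&mut self, bytes: u64) {
            self.set_memory_high_internal(MaxValue::Value(u64::min(bytes, i64::MAX as u64) as i64));
        }

        // Mirrors unset_memory_high: "max" disables the threshold entirely.
        fn unset_memory_high(&mut self) {
            self.set_memory_high_internal(MaxValue::Max);
        }

        // The single write path; the real helper calls set_mem() here.
        fn set_memory_high_internal(&mut self, value: MaxValue) {
            self.memory_high = value;
        }
    }

    fn main() {
        let mut cg = FakeCgroup { memory_high: MaxValue::Max };
        cg.set_memory_high_bytes(u64::MAX); // clamps instead of overflowing the cast
        assert_eq!(cg.memory_high, MaxValue::Value(i64::MAX));
        cg.unset_memory_high();
        assert_eq!(cg.memory_high, MaxValue::Max);
    }

Funneling both public setters through one internal write path keeps the clamping logic and the "max" case from drifting apart, which is the point of the consolidation.
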
diff --git a/libs/vm_monitor/src/runner.rs b/libs/vm_monitor/src/runner.rs
index 376017d784..09863c8936 100644
--- a/libs/vm_monitor/src/runner.rs
+++ b/libs/vm_monitor/src/runner.rs
@@ -16,7 +16,7 @@ use tokio::sync::mpsc;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info, warn};
 
-use crate::cgroup::{CgroupWatcher, MemoryLimits, Sequenced};
+use crate::cgroup::{CgroupWatcher, Sequenced};
 use crate::dispatcher::Dispatcher;
 use crate::filecache::{FileCacheConfig, FileCacheState};
 use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
@@ -106,6 +106,51 @@ impl Runner {
             kill,
         };
 
+        // If we have both the cgroup and file cache integrations enabled, it's possible for
+        // temporary failures to result in cgroup throttling (from memory.high), which in turn makes
+        // it near-impossible to connect to the file cache (because it times out). Unfortunately,
+        // we *do* still want to determine the file cache size before setting the cgroup's
+        // memory.high, so it's not as simple as just swapping the order.
+        //
+        // Instead, the resolution here is that on vm-monitor startup (note: happens on each
+        // connection from autoscaler-agent, possibly multiple times per compute_ctl lifecycle), we
+        // temporarily unset memory.high, to allow any existing throttling to dissipate. It's a bit
+        // of a hacky solution, but helps with reliability.
+        if let Some(name) = &args.cgroup {
+            // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
+            // now, and then set limits later.
+            info!("initializing cgroup");
+
+            let (cgroup, cgroup_event_stream) = CgroupWatcher::new(name.clone(), requesting_send)
+                .context("failed to create cgroup manager")?;
+
+            info!("temporarily unsetting memory.high");
+
+            // Temporarily un-set cgroup memory.high; see above.
+            cgroup
+                .unset_memory_high()
+                .context("failed to unset memory.high")?;
+
+            let cgroup = Arc::new(cgroup);
+
+            let cgroup_clone = Arc::clone(&cgroup);
+            spawn_with_cancel(
+                token.clone(),
+                |_| error!("cgroup watcher terminated"),
+                async move { cgroup_clone.watch(notified_recv, cgroup_event_stream).await },
+            );
+
+            state.cgroup = Some(cgroup);
+        } else {
+            // *NOTE*: We need to forget the sender so that its drop impl does not get run.
+            // This allows us to poll it in `Monitor::run` regardless of whether we
+            // are managing a cgroup or not. If we don't forget it, all receives will
+            // immediately return an error because the sender is dropped and it will
+            // claim all select! statements, effectively turning `Monitor::run` into
+            // `loop { fail to receive }`.
+            mem::forget(requesting_send);
+        }
+
         let mut file_cache_reserved_bytes = 0;
 
         let mem = get_total_system_memory();
@@ -119,7 +164,7 @@ impl Runner {
                 false => FileCacheConfig::default_in_memory(),
             };
 
-            let mut file_cache = FileCacheState::new(connstr, config, token.clone())
+            let mut file_cache = FileCacheState::new(connstr, config, token)
                 .await
                 .context("failed to create file cache")?;
@@ -152,35 +197,15 @@ impl Runner {
             state.filecache = Some(file_cache);
         }
 
-        if let Some(name) = &args.cgroup {
-            let (mut cgroup, cgroup_event_stream) =
-                CgroupWatcher::new(name.clone(), requesting_send)
-                    .context("failed to create cgroup manager")?;
-
+        if let Some(cgroup) = &state.cgroup {
             let available = mem - file_cache_reserved_bytes;
+            let value = cgroup.config.calculate_memory_high_value(available);
+
+            info!(value, "setting memory.high");
             cgroup
-                .set_memory_limits(available)
-                .context("failed to set cgroup memory limits")?;
-
-            let cgroup = Arc::new(cgroup);
-
-            // Some might call this . . . cgroup v2
-            let cgroup_clone = Arc::clone(&cgroup);
-
-            spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
-                cgroup_clone.watch(notified_recv, cgroup_event_stream).await
-            });
-
-            state.cgroup = Some(cgroup);
-        } else {
-            // *NOTE*: We need to forget the sender so that its drop impl does not get ran.
-            // This allows us to poll it in `Monitor::run` regardless of whether we
-            // are managing a cgroup or not. If we don't forget it, all receives will
-            // immediately return an error because the sender is droped and it will
-            // claim all select! statements, effectively turning `Monitor::run` into
-            // `loop { fail to receive }`.
-            mem::forget(requesting_send);
+                .set_memory_high_bytes(value)
+                .context("failed to set cgroup memory.high")?;
         }
 
         Ok(state)
@@ -257,14 +282,11 @@ impl Runner {
                 new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
             }
 
-            let limits = MemoryLimits {
-                // new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
-                // since it is properly initialized in the previous cgroup if let block
-                high: new_cgroup_mem_high,
-            };
+            // new_cgroup_mem_high is initialized to 0 but it is guaranteed to not be here
+            // since it is properly initialized in the previous cgroup if let block
             cgroup
-                .set_limits(&limits)
-                .context("failed to set cgroup memory limits")?;
+                .set_memory_high_bytes(new_cgroup_mem_high)
+                .context("failed to set cgroup memory.high")?;
 
             let message = format!(
                 "set cgroup memory.high to {} MiB, of new max {} MiB",
@@ -327,12 +349,9 @@ impl Runner {
                 name = cgroup.path(),
                 "updating cgroup memory.high",
             );
-            let limits = MemoryLimits {
-                high: new_cgroup_mem_high,
-            };
             cgroup
-                .set_limits(&limits)
-                .context("failed to set file cache size")?;
+                .set_memory_high_bytes(new_cgroup_mem_high)
+                .context("failed to set cgroup memory.high")?;
         }
 
         Ok(())
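
The runner change above amounts to a reordering of startup: lift memory.high first so the file cache connection cannot time out under leftover throttling, size the file cache, then set memory.high from the memory that remains. A hypothetical, self-contained sketch of that ordering follows; the `Cgroup` struct, the file cache size, and the 90% figure are illustrative stand-ins, and only `unset_memory_high`/`set_memory_high_bytes` mirror names from the diff:

    // Toy model of a cgroup: None models memory.high = "max" (no threshold).
    struct Cgroup {
        memory_high: Option<u64>,
    }

    impl Cgroup {
        fn unset_memory_high(&mut self) {
            self.memory_high = None;
        }

        fn set_memory_high_bytes(&mut self, bytes: u64) {
            self.memory_high = Some(bytes);
        }
    }

    fn main() {
        let total_mem: u64 = 4 << 30; // stand-in for get_total_system_memory()
        let mut cgroup = Cgroup { memory_high: Some(1 << 30) };

        // 1. Lift any existing memory.high throttling first, so connecting to the
        //    file cache can't time out under pressure left over from earlier failures.
        cgroup.unset_memory_high();

        // 2. Size the file cache while the threshold is lifted (stand-in value).
        let file_cache_reserved_bytes: u64 = 1 << 30;

        // 3. Only now derive memory.high from what's left over; 90% is a placeholder
        //    for config.calculate_memory_high_value(), not the real formula.
        let available = total_mem - file_cache_reserved_bytes;
        cgroup.set_memory_high_bytes(available / 10 * 9);

        assert_eq!(cgroup.memory_high, Some((3u64 << 30) / 10 * 9));
    }

Since Runner::new runs on every connection from the autoscaler-agent, step 1 also serves as recovery: a cgroup left throttled by an earlier failed startup gets its threshold cleared before the next attempt.
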