From 3904df5397af595749dbce7334295b121750cef8 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sun, 29 Mar 2026 20:32:49 -0700 Subject: [PATCH] chore: refine track memory metrics semantics (#7874) Signed-off-by: jeremyhi --- src/common/memory-manager/src/guard.rs | 2 +- src/common/memory-manager/src/lib.rs | 2 +- src/common/memory-manager/src/manager.rs | 5 +- src/common/recordbatch/src/lib.rs | 128 +++++++++++++++++++-- src/mito2/src/compaction/memory_manager.rs | 2 +- src/mito2/src/engine.rs | 9 +- src/mito2/src/metrics.rs | 9 +- src/servers/src/metrics.rs | 2 +- src/servers/src/request_memory_metrics.rs | 2 +- 9 files changed, 140 insertions(+), 21 deletions(-) diff --git a/src/common/memory-manager/src/guard.rs b/src/common/memory-manager/src/guard.rs index ad3111581b..784b72830d 100644 --- a/src/common/memory-manager/src/guard.rs +++ b/src/common/memory-manager/src/guard.rs @@ -172,7 +172,7 @@ impl MemoryGuard { true } Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { - quota.metrics.inc_rejected("try_acquire_additional"); + quota.metrics.inc_exhausted("try_acquire_additional"); false } } diff --git a/src/common/memory-manager/src/lib.rs b/src/common/memory-manager/src/lib.rs index b1d858ef89..983c6ca524 100644 --- a/src/common/memory-manager/src/lib.rs +++ b/src/common/memory-manager/src/lib.rs @@ -45,5 +45,5 @@ impl MemoryMetrics for NoOpMetrics { fn set_in_use(&self, _: i64) {} #[inline(always)] - fn inc_rejected(&self, _: &str) {} + fn inc_exhausted(&self, _: &str) {} } diff --git a/src/common/memory-manager/src/manager.rs b/src/common/memory-manager/src/manager.rs index 8cca5f220c..282f51e315 100644 --- a/src/common/memory-manager/src/manager.rs +++ b/src/common/memory-manager/src/manager.rs @@ -29,7 +29,8 @@ use crate::policy::OnExhaustedPolicy; pub trait MemoryMetrics: Clone + Send + Sync + 'static { fn set_limit(&self, bytes: i64); fn set_in_use(&self, bytes: i64); - fn inc_rejected(&self, reason: &str); + /// Record that immediate memory acquisition failed due to exhausted quota. + fn inc_exhausted(&self, reason: &str); } /// Generic memory manager for quota-controlled operations. @@ -171,7 +172,7 @@ impl MemoryManager { Some(MemoryGuard::limited(quota.clone(), permit)) } Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { - quota.metrics.inc_rejected("try_acquire"); + quota.metrics.inc_exhausted("try_acquire"); None } } diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 0a2d697407..d84e9e9d26 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -437,7 +437,8 @@ impl fmt::Debug for QueryMemoryTracker { .field("limit", &self.limit()) .field("on_exhausted_policy", &self.on_exhausted_policy) .field("on_update", &self.metrics.has_on_update()) - .field("on_reject", &self.metrics.has_on_reject()) + .field("on_exhausted", &self.metrics.has_on_exhausted()) + .field("on_rejected", &self.metrics.has_on_rejected()) .finish() } } @@ -452,6 +453,7 @@ impl QueryMemoryTracker { limit, on_exhausted_policy, on_update: None, + on_exhausted: None, on_reject: None, } } @@ -489,6 +491,10 @@ impl QueryMemoryTracker { ); error::ExceedMemoryLimitSnafu { msg }.build() } + + fn inc_rejected(&self) { + self.metrics.inc_rejected(); + } } /// Builder for constructing a [`QueryMemoryTracker`] with optional callbacks. @@ -496,6 +502,7 @@ pub struct QueryMemoryTrackerBuilder { limit: usize, on_exhausted_policy: OnExhaustedPolicy, on_update: Option, + on_exhausted: Option, on_reject: Option, } @@ -514,11 +521,21 @@ impl QueryMemoryTrackerBuilder { self } - /// Set a callback to be called when memory allocation is rejected. + /// Set a callback to be called when memory is unavailable for immediate acquisition. /// /// # Note - /// This is only called when `track()` fails due to exceeding the limit. + /// This is called when the non-blocking allocation fast path fails. + /// Requests using `OnExhaustedPolicy::Wait` may still succeed after waiting. /// It is never called when `limit == 0` (unlimited mode). + pub fn on_exhausted(mut self, on_exhausted: F) -> Self + where + F: Fn() + Send + Sync + 'static, + { + self.on_exhausted = Some(Arc::new(on_exhausted)); + self + } + + /// Set a callback to be called when the request ultimately fails due to memory pressure. pub fn on_reject(mut self, on_reject: F) -> Self where F: Fn() + Send + Sync + 'static, @@ -529,7 +546,7 @@ impl QueryMemoryTrackerBuilder { /// Build a [`QueryMemoryTracker`] from this builder. pub fn build(self) -> QueryMemoryTracker { - let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_reject); + let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_exhausted, self.on_reject); let manager = MemoryManager::with_granularity( self.limit as u64, PermitGranularity::Kilobyte, @@ -553,6 +570,10 @@ struct StreamMemoryTracker { type MemoryAcquireResult = std::result::Result<(), common_memory_manager::Error>; impl StreamMemoryTracker { + fn inc_rejected(&self) { + self.tracker.inc_rejected(); + } + fn try_track(&mut self, additional: usize) -> Result<()> { if self.guard.try_acquire_additional(additional as u64) { self.tracked_bytes = self.tracked_bytes.saturating_add(additional); @@ -613,18 +634,25 @@ struct CallbackMemoryMetrics { } type UpdateCallback = Arc; -type RejectCallback = Arc; +type UnitCallback = Arc; +type RejectCallback = UnitCallback; struct CallbackMemoryMetricsInner { on_update: Option, + on_exhausted: Option, on_reject: Option, } impl CallbackMemoryMetrics { - fn new(on_update: Option, on_reject: Option) -> Self { + fn new( + on_update: Option, + on_exhausted: Option, + on_reject: Option, + ) -> Self { Self { inner: Arc::new(CallbackMemoryMetricsInner { on_update, + on_exhausted, on_reject, }), } @@ -634,9 +662,19 @@ impl CallbackMemoryMetrics { self.inner.on_update.is_some() } - fn has_on_reject(&self) -> bool { + fn has_on_exhausted(&self) -> bool { + self.inner.on_exhausted.is_some() + } + + fn has_on_rejected(&self) -> bool { self.inner.on_reject.is_some() } + + fn inc_rejected(&self) { + if let Some(callback) = &self.inner.on_reject { + callback(); + } + } } impl MemoryMetrics for CallbackMemoryMetrics { @@ -648,8 +686,8 @@ impl MemoryMetrics for CallbackMemoryMetrics { } } - fn inc_rejected(&self, _: &str) { - if let Some(callback) = &self.inner.on_reject { + fn inc_exhausted(&self, _: &str) { + if let Some(callback) = &self.inner.on_exhausted { callback(); } } @@ -712,7 +750,10 @@ impl MemoryTrackedStream { Poll::Ready((tracker, batch, additional, result)) => { let output = match result { Ok(()) => Ok(batch), - Err(error) => Err(tracker.wait_error(additional, error)), + Err(error) => { + tracker.inc_rejected(); + Err(tracker.wait_error(additional, error)) + } }; self.waiting = None; self.tracker = Some(tracker); @@ -732,7 +773,10 @@ impl MemoryTrackedStream { if let Err(error) = tracker.try_track(additional) { match tracker.tracker.on_exhausted_policy { - OnExhaustedPolicy::Fail => return Poll::Ready(Some(Err(error))), + OnExhaustedPolicy::Fail => { + tracker.inc_rejected(); + return Poll::Ready(Some(Err(error))); + } // `Wait` is a deliberate tradeoff: the batch has already been materialized, so we // keep it in memory while waiting for quota instead of failing immediately. Under // contention, real memory usage can therefore exceed `scan_memory_limit` by up to @@ -786,6 +830,7 @@ impl RecordBatchStream for MemoryTrackedStream { #[cfg(test)] mod tests { use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use common_memory_manager::{OnExhaustedPolicy, PermitGranularity}; @@ -988,12 +1033,22 @@ mod tests { #[tokio::test] async fn test_memory_tracked_stream_waits_for_capacity() { + let exhausted = Arc::new(AtomicUsize::new(0)); + let rejected = Arc::new(AtomicUsize::new(0)); + let exhausted_counter = exhausted.clone(); + let rejected_counter = rejected.clone(); let tracker = QueryMemoryTracker::builder( MB, OnExhaustedPolicy::Wait { timeout: Duration::from_millis(200), }, ) + .on_exhausted(move || { + exhausted_counter.fetch_add(1, Ordering::Relaxed); + }) + .on_reject(move || { + rejected_counter.fetch_add(1, Ordering::Relaxed); + }) .build(); let batch = large_string_batch(700 * 1024); let expected_bytes = aligned_tracked_bytes(batch.buffer_memory_size()); @@ -1025,16 +1080,28 @@ mod tests { drop(stream1); let second = waiter.await.unwrap().unwrap(); assert_eq!(second.num_rows(), 1); + assert_eq!(exhausted.load(Ordering::Relaxed), 1); + assert_eq!(rejected.load(Ordering::Relaxed), 0); } #[tokio::test] async fn test_memory_tracked_stream_wait_times_out() { + let exhausted = Arc::new(AtomicUsize::new(0)); + let rejected = Arc::new(AtomicUsize::new(0)); + let exhausted_counter = exhausted.clone(); + let rejected_counter = rejected.clone(); let tracker = QueryMemoryTracker::builder( MB, OnExhaustedPolicy::Wait { timeout: Duration::from_millis(50), }, ) + .on_exhausted(move || { + exhausted_counter.fetch_add(1, Ordering::Relaxed); + }) + .on_reject(move || { + rejected_counter.fetch_add(1, Ordering::Relaxed); + }) .build(); let batch = large_string_batch(700 * 1024); @@ -1058,5 +1125,44 @@ mod tests { .unwrap(); let error = result.unwrap().unwrap_err(); assert!(error.to_string().contains("timed out waiting")); + assert_eq!(exhausted.load(Ordering::Relaxed), 1); + assert_eq!(rejected.load(Ordering::Relaxed), 1); + } + + #[tokio::test] + async fn test_memory_tracked_stream_fail_policy_rejects_immediately() { + let exhausted = Arc::new(AtomicUsize::new(0)); + let rejected = Arc::new(AtomicUsize::new(0)); + let exhausted_counter = exhausted.clone(); + let rejected_counter = rejected.clone(); + let tracker = QueryMemoryTracker::builder(MB, OnExhaustedPolicy::Fail) + .on_exhausted(move || { + exhausted_counter.fetch_add(1, Ordering::Relaxed); + }) + .on_reject(move || { + rejected_counter.fetch_add(1, Ordering::Relaxed); + }) + .build(); + let batch = large_string_batch(700 * 1024); + + let mut stream1 = MemoryTrackedStream::new( + RecordBatches::try_new(batch.schema.clone(), vec![batch.clone()]) + .unwrap() + .as_stream(), + tracker.clone(), + ); + let first = stream1.next().await.unwrap().unwrap(); + assert_eq!(first.num_rows(), 1); + + let mut stream2 = MemoryTrackedStream::new( + RecordBatches::try_new(batch.schema.clone(), vec![batch]) + .unwrap() + .as_stream(), + tracker, + ); + let result = stream2.next().await.unwrap(); + assert!(result.is_err()); + assert_eq!(exhausted.load(Ordering::Relaxed), 1); + assert_eq!(rejected.load(Ordering::Relaxed), 1); } } diff --git a/src/mito2/src/compaction/memory_manager.rs b/src/mito2/src/compaction/memory_manager.rs index 8cbb5d293a..94b0779254 100644 --- a/src/mito2/src/compaction/memory_manager.rs +++ b/src/mito2/src/compaction/memory_manager.rs @@ -31,7 +31,7 @@ impl MemoryMetrics for CompactionMemoryMetrics { COMPACTION_MEMORY_IN_USE.set(bytes); } - fn inc_rejected(&self, reason: &str) { + fn inc_exhausted(&self, reason: &str) { COMPACTION_MEMORY_REJECTED .with_label_values(&[reason]) .inc(); diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index fbafe1da67..d1c30c3ff6 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -138,7 +138,8 @@ use crate::gc::GcLimiterRef; use crate::manifest::action::RegionEdit; use crate::memtable::MemtableStats; use crate::metrics::{ - HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL, + HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_EXHAUSTED_TOTAL, SCAN_MEMORY_USAGE_BYTES, + SCAN_REQUESTS_REJECTED_TOTAL, }; use crate::read::scan_region::{ScanRegion, Scanner}; use crate::read::stream::ScanBatchStream; @@ -231,6 +232,9 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { .on_update(|usage| { SCAN_MEMORY_USAGE_BYTES.set(usage as i64); }) + .on_exhausted(|| { + SCAN_MEMORY_EXHAUSTED_TOTAL.inc(); + }) .on_reject(|| { SCAN_REQUESTS_REJECTED_TOTAL.inc(); }) @@ -1380,6 +1384,9 @@ impl MitoEngine { .on_update(|usage| { SCAN_MEMORY_USAGE_BYTES.set(usage as i64); }) + .on_exhausted(|| { + SCAN_MEMORY_EXHAUSTED_TOTAL.inc(); + }) .on_reject(|| { SCAN_REQUESTS_REJECTED_TOTAL.inc(); }) diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index c0537567f9..30a7ac765c 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -244,10 +244,15 @@ lazy_static! { "greptime_mito_scan_memory_usage_bytes", "current scan memory usage in bytes" ).unwrap(); - /// Counter of rejected scan requests due to memory limit. + /// Counter of scan allocation attempts that could not acquire memory immediately. + pub static ref SCAN_MEMORY_EXHAUSTED_TOTAL: IntCounter = register_int_counter!( + "greptime_mito_scan_memory_exhausted_total", + "total number of times scan memory was unavailable for immediate acquisition" + ).unwrap(); + /// Counter of scan requests that ultimately failed due to memory pressure. pub static ref SCAN_REQUESTS_REJECTED_TOTAL: IntCounter = register_int_counter!( "greptime_mito_scan_requests_rejected_total", - "total number of scan requests rejected due to memory limit" + "total number of scan requests that ultimately failed due to memory limit" ).unwrap(); /// Gauge for active file range builders in the pruner. pub static ref PRUNER_ACTIVE_BUILDERS: IntGauge = register_int_gauge!( diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 37f923b73d..e3bff7fdbc 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -361,7 +361,7 @@ lazy_static! { "maximum bytes allowed for all concurrent request bodies and messages" ).unwrap(); - /// Total number of rejected requests due to memory exhaustion. + /// Total number of requests rejected due to memory exhaustion. pub static ref REQUEST_MEMORY_REJECTED: IntCounterVec = register_int_counter_vec!( "greptime_servers_request_memory_rejected_total", "number of requests rejected due to memory limit", diff --git a/src/servers/src/request_memory_metrics.rs b/src/servers/src/request_memory_metrics.rs index 4298830f18..68f52816f4 100644 --- a/src/servers/src/request_memory_metrics.rs +++ b/src/servers/src/request_memory_metrics.rs @@ -34,7 +34,7 @@ impl MemoryMetrics for RequestMemoryMetrics { REQUEST_MEMORY_IN_USE.set(bytes); } - fn inc_rejected(&self, reason: &str) { + fn inc_exhausted(&self, reason: &str) { REQUEST_MEMORY_REJECTED.with_label_values(&[reason]).inc(); } }