chore: refine track memory metrics semantics (#7874)

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-03-29 20:32:49 -07:00
committed by GitHub
parent 8e7e4a91d2
commit 3904df5397
9 changed files with 140 additions and 21 deletions

View File

@@ -172,7 +172,7 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
true
}
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
quota.metrics.inc_rejected("try_acquire_additional");
quota.metrics.inc_exhausted("try_acquire_additional");
false
}
}

View File

@@ -45,5 +45,5 @@ impl MemoryMetrics for NoOpMetrics {
fn set_in_use(&self, _: i64) {}
#[inline(always)]
fn inc_rejected(&self, _: &str) {}
fn inc_exhausted(&self, _: &str) {}
}

View File

@@ -29,7 +29,8 @@ use crate::policy::OnExhaustedPolicy;
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
fn set_limit(&self, bytes: i64);
fn set_in_use(&self, bytes: i64);
fn inc_rejected(&self, reason: &str);
/// Record that immediate memory acquisition failed due to exhausted quota.
fn inc_exhausted(&self, reason: &str);
}
/// Generic memory manager for quota-controlled operations.
@@ -171,7 +172,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
Some(MemoryGuard::limited(quota.clone(), permit))
}
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
quota.metrics.inc_rejected("try_acquire");
quota.metrics.inc_exhausted("try_acquire");
None
}
}

View File

@@ -437,7 +437,8 @@ impl fmt::Debug for QueryMemoryTracker {
.field("limit", &self.limit())
.field("on_exhausted_policy", &self.on_exhausted_policy)
.field("on_update", &self.metrics.has_on_update())
.field("on_reject", &self.metrics.has_on_reject())
.field("on_exhausted", &self.metrics.has_on_exhausted())
.field("on_rejected", &self.metrics.has_on_rejected())
.finish()
}
}
@@ -452,6 +453,7 @@ impl QueryMemoryTracker {
limit,
on_exhausted_policy,
on_update: None,
on_exhausted: None,
on_reject: None,
}
}
@@ -489,6 +491,10 @@ impl QueryMemoryTracker {
);
error::ExceedMemoryLimitSnafu { msg }.build()
}
fn inc_rejected(&self) {
self.metrics.inc_rejected();
}
}
/// Builder for constructing a [`QueryMemoryTracker`] with optional callbacks.
@@ -496,6 +502,7 @@ pub struct QueryMemoryTrackerBuilder {
limit: usize,
on_exhausted_policy: OnExhaustedPolicy,
on_update: Option<UpdateCallback>,
on_exhausted: Option<UnitCallback>,
on_reject: Option<RejectCallback>,
}
@@ -514,11 +521,21 @@ impl QueryMemoryTrackerBuilder {
self
}
/// Set a callback to be called when memory allocation is rejected.
/// Set a callback to be called when memory is unavailable for immediate acquisition.
///
/// # Note
/// This is only called when `track()` fails due to exceeding the limit.
/// This is called when the non-blocking allocation fast path fails.
/// Requests using `OnExhaustedPolicy::Wait` may still succeed after waiting.
/// It is never called when `limit == 0` (unlimited mode).
pub fn on_exhausted<F>(mut self, on_exhausted: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.on_exhausted = Some(Arc::new(on_exhausted));
self
}
/// Set a callback to be called when the request ultimately fails due to memory pressure.
pub fn on_reject<F>(mut self, on_reject: F) -> Self
where
F: Fn() + Send + Sync + 'static,
@@ -529,7 +546,7 @@ impl QueryMemoryTrackerBuilder {
/// Build a [`QueryMemoryTracker`] from this builder.
pub fn build(self) -> QueryMemoryTracker {
let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_reject);
let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_exhausted, self.on_reject);
let manager = MemoryManager::with_granularity(
self.limit as u64,
PermitGranularity::Kilobyte,
@@ -553,6 +570,10 @@ struct StreamMemoryTracker {
type MemoryAcquireResult = std::result::Result<(), common_memory_manager::Error>;
impl StreamMemoryTracker {
fn inc_rejected(&self) {
self.tracker.inc_rejected();
}
fn try_track(&mut self, additional: usize) -> Result<()> {
if self.guard.try_acquire_additional(additional as u64) {
self.tracked_bytes = self.tracked_bytes.saturating_add(additional);
@@ -613,18 +634,25 @@ struct CallbackMemoryMetrics {
}
type UpdateCallback = Arc<dyn Fn(usize) + Send + Sync>;
type RejectCallback = Arc<dyn Fn() + Send + Sync>;
type UnitCallback = Arc<dyn Fn() + Send + Sync>;
type RejectCallback = UnitCallback;
struct CallbackMemoryMetricsInner {
on_update: Option<UpdateCallback>,
on_exhausted: Option<UnitCallback>,
on_reject: Option<RejectCallback>,
}
impl CallbackMemoryMetrics {
fn new(on_update: Option<UpdateCallback>, on_reject: Option<RejectCallback>) -> Self {
fn new(
on_update: Option<UpdateCallback>,
on_exhausted: Option<UnitCallback>,
on_reject: Option<RejectCallback>,
) -> Self {
Self {
inner: Arc::new(CallbackMemoryMetricsInner {
on_update,
on_exhausted,
on_reject,
}),
}
@@ -634,9 +662,19 @@ impl CallbackMemoryMetrics {
self.inner.on_update.is_some()
}
fn has_on_reject(&self) -> bool {
fn has_on_exhausted(&self) -> bool {
self.inner.on_exhausted.is_some()
}
fn has_on_rejected(&self) -> bool {
self.inner.on_reject.is_some()
}
fn inc_rejected(&self) {
if let Some(callback) = &self.inner.on_reject {
callback();
}
}
}
impl MemoryMetrics for CallbackMemoryMetrics {
@@ -648,8 +686,8 @@ impl MemoryMetrics for CallbackMemoryMetrics {
}
}
fn inc_rejected(&self, _: &str) {
if let Some(callback) = &self.inner.on_reject {
fn inc_exhausted(&self, _: &str) {
if let Some(callback) = &self.inner.on_exhausted {
callback();
}
}
@@ -712,7 +750,10 @@ impl MemoryTrackedStream {
Poll::Ready((tracker, batch, additional, result)) => {
let output = match result {
Ok(()) => Ok(batch),
Err(error) => Err(tracker.wait_error(additional, error)),
Err(error) => {
tracker.inc_rejected();
Err(tracker.wait_error(additional, error))
}
};
self.waiting = None;
self.tracker = Some(tracker);
@@ -732,7 +773,10 @@ impl MemoryTrackedStream {
if let Err(error) = tracker.try_track(additional) {
match tracker.tracker.on_exhausted_policy {
OnExhaustedPolicy::Fail => return Poll::Ready(Some(Err(error))),
OnExhaustedPolicy::Fail => {
tracker.inc_rejected();
return Poll::Ready(Some(Err(error)));
}
// `Wait` is a deliberate tradeoff: the batch has already been materialized, so we
// keep it in memory while waiting for quota instead of failing immediately. Under
// contention, real memory usage can therefore exceed `scan_memory_limit` by up to
@@ -786,6 +830,7 @@ impl RecordBatchStream for MemoryTrackedStream {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use common_memory_manager::{OnExhaustedPolicy, PermitGranularity};
@@ -988,12 +1033,22 @@ mod tests {
#[tokio::test]
async fn test_memory_tracked_stream_waits_for_capacity() {
let exhausted = Arc::new(AtomicUsize::new(0));
let rejected = Arc::new(AtomicUsize::new(0));
let exhausted_counter = exhausted.clone();
let rejected_counter = rejected.clone();
let tracker = QueryMemoryTracker::builder(
MB,
OnExhaustedPolicy::Wait {
timeout: Duration::from_millis(200),
},
)
.on_exhausted(move || {
exhausted_counter.fetch_add(1, Ordering::Relaxed);
})
.on_reject(move || {
rejected_counter.fetch_add(1, Ordering::Relaxed);
})
.build();
let batch = large_string_batch(700 * 1024);
let expected_bytes = aligned_tracked_bytes(batch.buffer_memory_size());
@@ -1025,16 +1080,28 @@ mod tests {
drop(stream1);
let second = waiter.await.unwrap().unwrap();
assert_eq!(second.num_rows(), 1);
assert_eq!(exhausted.load(Ordering::Relaxed), 1);
assert_eq!(rejected.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn test_memory_tracked_stream_wait_times_out() {
let exhausted = Arc::new(AtomicUsize::new(0));
let rejected = Arc::new(AtomicUsize::new(0));
let exhausted_counter = exhausted.clone();
let rejected_counter = rejected.clone();
let tracker = QueryMemoryTracker::builder(
MB,
OnExhaustedPolicy::Wait {
timeout: Duration::from_millis(50),
},
)
.on_exhausted(move || {
exhausted_counter.fetch_add(1, Ordering::Relaxed);
})
.on_reject(move || {
rejected_counter.fetch_add(1, Ordering::Relaxed);
})
.build();
let batch = large_string_batch(700 * 1024);
@@ -1058,5 +1125,44 @@ mod tests {
.unwrap();
let error = result.unwrap().unwrap_err();
assert!(error.to_string().contains("timed out waiting"));
assert_eq!(exhausted.load(Ordering::Relaxed), 1);
assert_eq!(rejected.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn test_memory_tracked_stream_fail_policy_rejects_immediately() {
let exhausted = Arc::new(AtomicUsize::new(0));
let rejected = Arc::new(AtomicUsize::new(0));
let exhausted_counter = exhausted.clone();
let rejected_counter = rejected.clone();
let tracker = QueryMemoryTracker::builder(MB, OnExhaustedPolicy::Fail)
.on_exhausted(move || {
exhausted_counter.fetch_add(1, Ordering::Relaxed);
})
.on_reject(move || {
rejected_counter.fetch_add(1, Ordering::Relaxed);
})
.build();
let batch = large_string_batch(700 * 1024);
let mut stream1 = MemoryTrackedStream::new(
RecordBatches::try_new(batch.schema.clone(), vec![batch.clone()])
.unwrap()
.as_stream(),
tracker.clone(),
);
let first = stream1.next().await.unwrap().unwrap();
assert_eq!(first.num_rows(), 1);
let mut stream2 = MemoryTrackedStream::new(
RecordBatches::try_new(batch.schema.clone(), vec![batch])
.unwrap()
.as_stream(),
tracker,
);
let result = stream2.next().await.unwrap();
assert!(result.is_err());
assert_eq!(exhausted.load(Ordering::Relaxed), 1);
assert_eq!(rejected.load(Ordering::Relaxed), 1);
}
}

View File

@@ -31,7 +31,7 @@ impl MemoryMetrics for CompactionMemoryMetrics {
COMPACTION_MEMORY_IN_USE.set(bytes);
}
fn inc_rejected(&self, reason: &str) {
fn inc_exhausted(&self, reason: &str) {
COMPACTION_MEMORY_REJECTED
.with_label_values(&[reason])
.inc();

View File

@@ -138,7 +138,8 @@ use crate::gc::GcLimiterRef;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::{
HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL,
HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_EXHAUSTED_TOTAL, SCAN_MEMORY_USAGE_BYTES,
SCAN_REQUESTS_REJECTED_TOTAL,
};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
@@ -231,6 +232,9 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
.on_update(|usage| {
SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
})
.on_exhausted(|| {
SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
})
.on_reject(|| {
SCAN_REQUESTS_REJECTED_TOTAL.inc();
})
@@ -1380,6 +1384,9 @@ impl MitoEngine {
.on_update(|usage| {
SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
})
.on_exhausted(|| {
SCAN_MEMORY_EXHAUSTED_TOTAL.inc();
})
.on_reject(|| {
SCAN_REQUESTS_REJECTED_TOTAL.inc();
})

View File

@@ -244,10 +244,15 @@ lazy_static! {
"greptime_mito_scan_memory_usage_bytes",
"current scan memory usage in bytes"
).unwrap();
/// Counter of rejected scan requests due to memory limit.
/// Counter of scan allocation attempts that could not acquire memory immediately.
pub static ref SCAN_MEMORY_EXHAUSTED_TOTAL: IntCounter = register_int_counter!(
"greptime_mito_scan_memory_exhausted_total",
"total number of times scan memory was unavailable for immediate acquisition"
).unwrap();
/// Counter of scan requests that ultimately failed due to memory pressure.
pub static ref SCAN_REQUESTS_REJECTED_TOTAL: IntCounter = register_int_counter!(
"greptime_mito_scan_requests_rejected_total",
"total number of scan requests rejected due to memory limit"
"total number of scan requests that ultimately failed due to memory limit"
).unwrap();
/// Gauge for active file range builders in the pruner.
pub static ref PRUNER_ACTIVE_BUILDERS: IntGauge = register_int_gauge!(

View File

@@ -361,7 +361,7 @@ lazy_static! {
"maximum bytes allowed for all concurrent request bodies and messages"
).unwrap();
/// Total number of rejected requests due to memory exhaustion.
/// Total number of requests rejected due to memory exhaustion.
pub static ref REQUEST_MEMORY_REJECTED: IntCounterVec = register_int_counter_vec!(
"greptime_servers_request_memory_rejected_total",
"number of requests rejected due to memory limit",

View File

@@ -34,7 +34,7 @@ impl MemoryMetrics for RequestMemoryMetrics {
REQUEST_MEMORY_IN_USE.set(bytes);
}
fn inc_rejected(&self, reason: &str) {
fn inc_exhausted(&self, reason: &str) {
REQUEST_MEMORY_REJECTED.with_label_values(&[reason]).inc();
}
}