From c0d0b99a32e97478c6dd156cc96a7bf8a74ad802 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 24 Nov 2025 14:18:23 +0800 Subject: [PATCH] feat: track query memory pool (#7219) Signed-off-by: jeremyhi --- src/query/src/query_engine/state.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index ffe64e7005..9328f5f736 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fmt; +use std::num::NonZeroUsize; use std::sync::{Arc, RwLock}; use async_trait::async_trait; @@ -34,6 +35,7 @@ use datafusion::execution::SessionStateBuilder; use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState}; use datafusion::execution::memory_pool::{ GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, + TrackConsumersPool, }; use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; use datafusion::physical_optimizer::PhysicalOptimizerRule; @@ -437,19 +439,25 @@ impl DfQueryPlanner { } } -/// A wrapper around GreedyMemoryPool that records metrics. +/// A wrapper around TrackConsumersPool that records metrics. /// /// This wrapper intercepts all memory pool operations and updates /// Prometheus metrics for monitoring query memory usage and rejections. #[derive(Debug)] struct MetricsMemoryPool { - inner: Arc, + inner: Arc>, } impl MetricsMemoryPool { + // Number of top memory consumers to report in OOM error messages + const TOP_CONSUMERS_TO_REPORT: usize = 5; + fn new(limit: usize) -> Self { Self { - inner: Arc::new(GreedyMemoryPool::new(limit)), + inner: Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(limit), + NonZeroUsize::new(Self::TOP_CONSUMERS_TO_REPORT).unwrap(), + )), } }