diff --git a/Cargo.lock b/Cargo.lock
index 74440726d8..b3000970b3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2657,6 +2657,7 @@ dependencies = [
"common-base",
"common-error",
"common-macro",
+ "common-memory-manager",
"common-telemetry",
"common-time",
"criterion 0.7.0",
diff --git a/config/config.md b/config/config.md
index 2ac11dd6e6..4861675217 100644
--- a/config/config.md
+++ b/config/config.md
@@ -18,7 +18,7 @@
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted. Options: "wait" (default, 10s timeout), "wait()" (e.g., "wait(30s)"), "fail" |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup. By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
-| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. NOTE: This setting affects scan_memory_limit's privileged tier allocation. When set, 70% of queries get privileged memory access (full scan_memory_limit). The remaining 30% get standard tier access (70% of scan_memory_limit). |
+| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
@@ -160,7 +160,8 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
-| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. NOTE: Works with max_concurrent_queries for tiered memory allocation. - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access. - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. |
+| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. |
+| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately. "fail" (default) fails fast and is the recommended option for most users. "wait" / "wait()" waits for memory to become available. This is mainly for advanced tuning in bursty workloads where temporary contention is common and higher latency is acceptable. "wait" means "wait(10s)", not unlimited waiting. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions. To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -440,7 +441,7 @@
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases. It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup. By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
-| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. NOTE: This setting affects scan_memory_limit's privileged tier allocation. When set, 70% of queries get privileged memory access (full scan_memory_limit). The remaining 30% get standard tier access (70% of scan_memory_limit). |
+| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
@@ -552,7 +553,8 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
-| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. NOTE: Works with max_concurrent_queries for tiered memory allocation. - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access. - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. |
+| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. |
+| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately. "fail" (default) fails fast and is the recommended option for most users. "wait" / "wait()" waits for memory to become available. This is mainly for advanced tuning in bursty workloads where temporary contention is common and higher latency is acceptable. "wait" means "wait(10s)", not unlimited waiting. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions. To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 2631a089e1..833a567d74 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -17,10 +17,7 @@ init_regions_in_background = false
## Parallelism of initializing regions.
init_regions_parallelism = 16
-## The maximum current queries allowed to be executed. Zero means unlimited.
-## NOTE: This setting affects scan_memory_limit's privileged tier allocation.
-## When set, 70% of queries get privileged memory access (full scan_memory_limit).
-## The remaining 30% get standard tier access (70% of scan_memory_limit).
+## The maximum concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
@@ -535,10 +532,14 @@ allow_stale_entries = false
## Memory limit for table scans across all queries.
## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit.
-## NOTE: Works with max_concurrent_queries for tiered memory allocation.
-## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.
-## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access.
scan_memory_limit = "50%"
+## Controls what happens when a scan cannot get memory immediately.
+## "fail" (default) fails fast and is the recommended option for most users.
+## "wait" / "wait()" waits for memory to become available. This is mainly
+## for advanced tuning in bursty workloads where temporary contention is common and
+## higher latency is acceptable.
+## "wait" means "wait(10s)", not unlimited waiting.
+scan_memory_on_exhausted = "fail"
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index ef96406316..94c5feebf1 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -23,10 +23,7 @@ init_regions_in_background = false
## Parallelism of initializing regions.
init_regions_parallelism = 16
-## The maximum current queries allowed to be executed. Zero means unlimited.
-## NOTE: This setting affects scan_memory_limit's privileged tier allocation.
-## When set, 70% of queries get privileged memory access (full scan_memory_limit).
-## The remaining 30% get standard tier access (70% of scan_memory_limit).
+## The maximum concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
@@ -627,10 +624,14 @@ allow_stale_entries = false
## Memory limit for table scans across all queries.
## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit.
-## NOTE: Works with max_concurrent_queries for tiered memory allocation.
-## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.
-## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access.
scan_memory_limit = "50%"
+## Controls what happens when a scan cannot get memory immediately.
+## "fail" (default) fails fast and is the recommended option for most users.
+## "wait" / "wait()" waits for memory to become available. This is mainly
+## for advanced tuning in bursty workloads where temporary contention is common and
+## higher latency is acceptable.
+## "wait" means "wait(10s)", not unlimited waiting.
+scan_memory_on_exhausted = "fail"
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml
index 5887dc31c5..efc6b6f60e 100644
--- a/src/common/recordbatch/Cargo.toml
+++ b/src/common/recordbatch/Cargo.toml
@@ -12,6 +12,7 @@ arc-swap = "1.6"
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
+common-memory-manager.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index 85e0d5c496..0a2d697407 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -22,13 +22,17 @@ pub mod recordbatch;
pub mod util;
use std::fmt;
+use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use common_base::readable_size::ReadableSize;
+use common_error::ext::BoxedError;
+use common_memory_manager::{
+ MemoryGuard, MemoryManager, MemoryMetrics, OnExhaustedPolicy, PermitGranularity,
+};
use common_telemetry::tracing::Span;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder};
@@ -42,7 +46,7 @@ use error::Result;
use futures::task::{Context, Poll};
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
-use snafu::{ResultExt, ensure};
+use snafu::{IntoError, ResultExt, ensure};
use crate::error::NewDfRecordBatchSnafu;
@@ -416,205 +420,93 @@ impl> + Unpin> Stream for RecordBatchStream
}
}
-/// Memory permit for a stream, providing privileged access or rate limiting.
-///
-/// The permit tracks whether this stream has privileged Top-K status.
-/// When dropped, it automatically releases any privileged slot it holds.
-pub struct MemoryPermit {
- tracker: QueryMemoryTracker,
- is_privileged: AtomicBool,
-}
-
-impl MemoryPermit {
- /// Check if this permit currently has privileged status.
- pub fn is_privileged(&self) -> bool {
- self.is_privileged.load(Ordering::Acquire)
- }
-
- /// Ensure this permit has privileged status by acquiring a slot if available.
- /// Returns true if privileged (either already privileged or just acquired privilege).
- fn ensure_privileged(&self) -> bool {
- if self.is_privileged.load(Ordering::Acquire) {
- return true;
- }
-
- // Try to claim a privileged slot
- self.tracker
- .privileged_count
- .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
- if count < self.tracker.privileged_slots {
- Some(count + 1)
- } else {
- None
- }
- })
- .map(|_| {
- self.is_privileged.store(true, Ordering::Release);
- true
- })
- .unwrap_or(false)
- }
-
- /// Track additional memory usage with this permit.
- /// Returns error if limit is exceeded.
- ///
- /// # Arguments
- /// * `additional` - Additional memory size to track in bytes
- /// * `stream_tracked` - Total memory already tracked by this stream
- ///
- /// # Behavior
- /// - Privileged streams: Can push global memory usage up to full limit
- /// - Standard-tier streams: Can push global memory usage up to limit * standard_tier_memory_fraction (default: 0.7)
- /// - Standard-tier streams automatically attempt to acquire privilege if slots become available
- /// - The configured limit is absolute hard limit - no stream can exceed it
- pub fn track(&self, additional: usize, stream_tracked: usize) -> Result<()> {
- // Ensure privileged status if possible
- let is_privileged = self.ensure_privileged();
-
- self.tracker
- .track_internal(additional, is_privileged, stream_tracked)
- }
-
- /// Release tracked memory.
- ///
- /// # Arguments
- /// * `amount` - Amount of memory to release in bytes
- pub fn release(&self, amount: usize) {
- self.tracker.release(amount);
- }
-}
-
-impl Drop for MemoryPermit {
- fn drop(&mut self) {
- // Release privileged slot if we had one
- if self.is_privileged.load(Ordering::Acquire) {
- self.tracker
- .privileged_count
- .fetch_sub(1, Ordering::Release);
- }
- }
-}
-
/// Memory tracker for RecordBatch streams. Clone to share the same limit across queries.
///
-/// Implements a two-tier memory allocation strategy:
-/// - **Privileged tier**: First N streams (default: 20) can use up to the full memory limit
-/// - **Standard tier**: Remaining streams are restricted to a fraction of the limit (default: 70%)
-/// - Privilege is granted on a first-come-first-served basis
-/// - The configured limit is an absolute hard cap - no stream can exceed it
+/// Each stream acquires quota independently from this tracker.
#[derive(Clone)]
pub struct QueryMemoryTracker {
- current: Arc,
- limit: usize,
- standard_tier_memory_fraction: f64,
- privileged_count: Arc,
- privileged_slots: usize,
- on_update: Option>,
- on_reject: Option>,
+ manager: MemoryManager,
+ metrics: CallbackMemoryMetrics,
+ on_exhausted_policy: OnExhaustedPolicy,
}
impl fmt::Debug for QueryMemoryTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueryMemoryTracker")
- .field("current", &self.current.load(Ordering::Acquire))
- .field("limit", &self.limit)
- .field(
- "standard_tier_memory_fraction",
- &self.standard_tier_memory_fraction,
- )
- .field(
- "privileged_count",
- &self.privileged_count.load(Ordering::Acquire),
- )
- .field("privileged_slots", &self.privileged_slots)
- .field("on_update", &self.on_update.is_some())
- .field("on_reject", &self.on_reject.is_some())
+ .field("current", &self.current())
+ .field("limit", &self.limit())
+ .field("on_exhausted_policy", &self.on_exhausted_policy)
+ .field("on_update", &self.metrics.has_on_update())
+ .field("on_reject", &self.metrics.has_on_reject())
.finish()
}
}
impl QueryMemoryTracker {
- // Default privileged slots when max_concurrent_queries is 0.
- const DEFAULT_PRIVILEGED_SLOTS: usize = 20;
- // Ratio for privileged tier: 70% queries get privileged access, standard tier uses 70% memory.
- const DEFAULT_PRIVILEGED_TIER_RATIO: f64 = 0.7;
-
- /// Create a new memory tracker with the given limit and max_concurrent_queries.
- /// Calculates privileged slots as 70% of max_concurrent_queries (or 20 if max_concurrent_queries is 0).
- ///
- /// # Arguments
- /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
- /// * `max_concurrent_queries` - Maximum number of concurrent queries (0 = unlimited).
- pub fn new(limit: usize, max_concurrent_queries: usize) -> Self {
- let privileged_slots = Self::calculate_privileged_slots(max_concurrent_queries);
- Self::with_privileged_slots(limit, privileged_slots)
- }
-
- /// Create a new memory tracker with custom privileged slots limit.
- pub fn with_privileged_slots(limit: usize, privileged_slots: usize) -> Self {
- Self::with_config(limit, privileged_slots, Self::DEFAULT_PRIVILEGED_TIER_RATIO)
- }
-
- /// Create a new memory tracker with full configuration.
- ///
- /// # Arguments
- /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
- /// * `privileged_slots` - Maximum number of streams that can get privileged status.
- /// * `standard_tier_memory_fraction` - Memory fraction for standard-tier streams (range: [0.0, 1.0]).
- ///
- /// # Panics
- /// Panics if `standard_tier_memory_fraction` is not in the range [0.0, 1.0].
- pub fn with_config(
+ /// Create a builder for a query memory tracker.
+ pub fn builder(
limit: usize,
- privileged_slots: usize,
- standard_tier_memory_fraction: f64,
- ) -> Self {
- assert!(
- (0.0..=1.0).contains(&standard_tier_memory_fraction),
- "standard_tier_memory_fraction must be in [0.0, 1.0], got {}",
- standard_tier_memory_fraction
- );
-
- Self {
- current: Arc::new(AtomicUsize::new(0)),
+ on_exhausted_policy: OnExhaustedPolicy,
+ ) -> QueryMemoryTrackerBuilder {
+ QueryMemoryTrackerBuilder {
limit,
- standard_tier_memory_fraction,
- privileged_count: Arc::new(AtomicUsize::new(0)),
- privileged_slots,
+ on_exhausted_policy,
on_update: None,
on_reject: None,
}
}
- /// Register a new permit for memory tracking.
- /// The first `privileged_slots` permits get privileged status automatically.
- /// The returned permit can be shared across multiple streams of the same query.
- pub fn register_permit(&self) -> MemoryPermit {
- // Try to claim a privileged slot
- let is_privileged = self
- .privileged_count
- .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
- if count < self.privileged_slots {
- Some(count + 1)
- } else {
- None
- }
- })
- .is_ok();
-
- MemoryPermit {
+ fn new_stream_tracker(&self) -> StreamMemoryTracker {
+ StreamMemoryTracker {
tracker: self.clone(),
- is_privileged: AtomicBool::new(is_privileged),
+ guard: self.manager.try_acquire(0).unwrap(),
+ tracked_bytes: 0,
}
}
+ /// Get the current memory usage in bytes.
+ pub fn current(&self) -> usize {
+ self.manager.used_bytes() as usize
+ }
+ fn limit(&self) -> usize {
+ self.manager.limit_bytes() as usize
+ }
+
+ fn reject_error(
+ &self,
+ current: usize,
+ additional: usize,
+ stream_tracked: usize,
+ ) -> error::Error {
+ let limit = self.limit();
+ let msg = format!(
+ "{} requested, {} used globally ({}%), {} used by this stream, hard limit: {}",
+ ReadableSize(additional as u64),
+ ReadableSize(current as u64),
+ if limit > 0 { current * 100 / limit } else { 0 },
+ ReadableSize(stream_tracked as u64),
+ ReadableSize(limit as u64)
+ );
+ error::ExceedMemoryLimitSnafu { msg }.build()
+ }
+}
+
+/// Builder for constructing a [`QueryMemoryTracker`] with optional callbacks.
+pub struct QueryMemoryTrackerBuilder {
+ limit: usize,
+ on_exhausted_policy: OnExhaustedPolicy,
+ on_update: Option,
+ on_reject: Option,
+}
+
+impl QueryMemoryTrackerBuilder {
/// Set a callback to be called whenever the usage changes successfully.
/// The callback receives the new total usage in bytes.
///
/// # Note
- /// The callback is called after both successful `track()` and `release()` operations.
- /// It is called even when `limit == 0` (unlimited mode) to track actual usage.
- pub fn with_on_update(mut self, on_update: F) -> Self
+ /// The callback is called after both successful `track()` and stream drop.
+ /// Usage is exact in unlimited mode and 1KB-aligned in limited mode.
+ pub fn on_update(mut self, on_update: F) -> Self
where
F: Fn(usize) + Send + Sync + 'static,
{
@@ -627,7 +519,7 @@ impl QueryMemoryTracker {
/// # Note
/// This is only called when `track()` fails due to exceeding the limit.
/// It is never called when `limit == 0` (unlimited mode).
- pub fn with_on_reject(mut self, on_reject: F) -> Self
+ pub fn on_reject(mut self, on_reject: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
@@ -635,105 +527,130 @@ impl QueryMemoryTracker {
self
}
- /// Get the current memory usage in bytes.
- pub fn current(&self) -> usize {
- self.current.load(Ordering::Acquire)
- }
+ /// Build a [`QueryMemoryTracker`] from this builder.
+ pub fn build(self) -> QueryMemoryTracker {
+ let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_reject);
+ let manager = MemoryManager::with_granularity(
+ self.limit as u64,
+ PermitGranularity::Kilobyte,
+ metrics.clone(),
+ );
- fn calculate_privileged_slots(max_concurrent_queries: usize) -> usize {
- if max_concurrent_queries == 0 {
- Self::DEFAULT_PRIVILEGED_SLOTS
+ QueryMemoryTracker {
+ manager,
+ metrics,
+ on_exhausted_policy: self.on_exhausted_policy,
+ }
+ }
+}
+
+struct StreamMemoryTracker {
+ tracker: QueryMemoryTracker,
+ guard: MemoryGuard,
+ tracked_bytes: usize,
+}
+
+type MemoryAcquireResult = std::result::Result<(), common_memory_manager::Error>;
+
+impl StreamMemoryTracker {
+ fn try_track(&mut self, additional: usize) -> Result<()> {
+ if self.guard.try_acquire_additional(additional as u64) {
+ self.tracked_bytes = self.tracked_bytes.saturating_add(additional);
+ Ok(())
} else {
- ((max_concurrent_queries as f64 * Self::DEFAULT_PRIVILEGED_TIER_RATIO) as usize).max(1)
+ Err(self.reject_error(additional))
}
}
- /// Internal method to track additional memory usage.
- ///
- /// Called by `MemoryPermit::track()`. Use `MemoryPermit::track()` instead of calling this directly.
- fn track_internal(
- &self,
- additional: usize,
- is_privileged: bool,
- stream_tracked: usize,
- ) -> Result<()> {
- // Calculate effective global limit based on stream privilege
- // Privileged streams: can push global usage up to full limit
- // Standard-tier streams: can only push global usage up to fraction of limit
- let effective_limit = if is_privileged {
- self.limit
- } else {
- (self.limit as f64 * self.standard_tier_memory_fraction) as usize
- };
-
- let mut new_total = 0;
+ async fn track_with_policy(mut self, additional: usize) -> (Self, MemoryAcquireResult) {
let result = self
- .current
- .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
- new_total = current.saturating_add(additional);
+ .guard
+ .acquire_additional_with_policy(additional as u64, self.tracker.on_exhausted_policy)
+ .await;
+ if result.is_ok() {
+ self.tracked_bytes = self.tracked_bytes.saturating_add(additional);
+ }
+ (self, result)
+ }
- if self.limit == 0 {
- // Unlimited mode
- return Some(new_total);
- }
+ fn reject_error(&self, additional: usize) -> error::Error {
+ let current = self.tracker.current();
+ self.tracker
+ .reject_error(current, additional, self.tracked_bytes)
+ }
- // Check if new global total exceeds effective limit
- // The configured limit is absolute hard limit - no stream can exceed it
- if new_total <= effective_limit {
- Some(new_total)
- } else {
- None
- }
- });
-
- match result {
- Ok(_) => {
- if let Some(callback) = &self.on_update {
- callback(new_total);
- }
- Ok(())
+ fn wait_error(&self, additional: usize, source: common_memory_manager::Error) -> error::Error {
+ match source {
+ common_memory_manager::Error::MemoryLimitExceeded { .. } => {
+ self.reject_error(additional)
}
- Err(current) => {
- if let Some(callback) = &self.on_reject {
- callback();
- }
+ common_memory_manager::Error::MemoryAcquireTimeout { waited, .. } => {
+ let current = self.tracker.current();
+ let limit = self.tracker.limit();
let msg = format!(
- "{} requested, {} used globally ({}%), {} used by this stream (privileged: {}), effective limit: {} ({}%), hard limit: {}",
+ "timed out waiting {:?} for {}, {} used globally ({}%), {} used by this stream, hard limit: {}",
+ waited,
ReadableSize(additional as u64),
ReadableSize(current as u64),
- if self.limit > 0 {
- current * 100 / self.limit
- } else {
- 0
- },
- ReadableSize(stream_tracked as u64),
- is_privileged,
- ReadableSize(effective_limit as u64),
- if self.limit > 0 {
- effective_limit * 100 / self.limit
- } else {
- 0
- },
- ReadableSize(self.limit as u64)
+ if limit > 0 { current * 100 / limit } else { 0 },
+ ReadableSize(self.tracked_bytes as u64),
+ ReadableSize(limit as u64)
);
- error::ExceedMemoryLimitSnafu { msg }.fail()
+ error::ExceedMemoryLimitSnafu { msg }.build()
}
+ error => error::ExternalSnafu.into_error(BoxedError::new(error)),
+ }
+ }
+}
+
+type PendingTrackFuture = Pin<
+ Box + Send>,
+>;
+
+#[derive(Clone)]
+struct CallbackMemoryMetrics {
+ inner: Arc,
+}
+
+type UpdateCallback = Arc;
+type RejectCallback = Arc;
+
+struct CallbackMemoryMetricsInner {
+ on_update: Option,
+ on_reject: Option,
+}
+
+impl CallbackMemoryMetrics {
+ fn new(on_update: Option, on_reject: Option) -> Self {
+ Self {
+ inner: Arc::new(CallbackMemoryMetricsInner {
+ on_update,
+ on_reject,
+ }),
}
}
- /// Release tracked memory.
- ///
- /// # Arguments
- /// * `amount` - Amount of memory to release in bytes
- pub fn release(&self, amount: usize) {
- if let Ok(old_value) =
- self.current
- .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
- Some(current.saturating_sub(amount))
- })
- && let Some(callback) = &self.on_update
- {
- callback(old_value.saturating_sub(amount));
+ fn has_on_update(&self) -> bool {
+ self.inner.on_update.is_some()
+ }
+
+ fn has_on_reject(&self) -> bool {
+ self.inner.on_reject.is_some()
+ }
+}
+
+impl MemoryMetrics for CallbackMemoryMetrics {
+ fn set_limit(&self, _: i64) {}
+
+ fn set_in_use(&self, bytes: i64) {
+ if let Some(callback) = &self.inner.on_update {
+ callback(bytes.max(0) as usize);
+ }
+ }
+
+ fn inc_rejected(&self, _: &str) {
+ if let Some(callback) = &self.inner.on_reject {
+ callback();
}
}
}
@@ -741,38 +658,107 @@ impl QueryMemoryTracker {
/// A wrapper stream that tracks memory usage of RecordBatches.
pub struct MemoryTrackedStream {
inner: SendableRecordBatchStream,
- permit: Arc,
- // Total tracked size, released when stream drops.
- total_tracked: usize,
+ tracker: Option,
+ // Waiting stores a batch that has already been pulled from the inner stream but has not yet
+ // acquired additional quota. This keeps `poll_next()` non-blocking and allows bounded waits,
+ // at the cost of temporarily holding one untracked batch per blocked stream in memory.
+ waiting: Option,
}
impl MemoryTrackedStream {
- pub fn new(inner: SendableRecordBatchStream, permit: Arc) -> Self {
+ pub fn new(inner: SendableRecordBatchStream, tracker: QueryMemoryTracker) -> Self {
Self {
inner,
- permit,
- total_tracked: 0,
+ tracker: Some(tracker.new_stream_tracker()),
+ waiting: None,
}
}
+
+ fn ready_tracker_mut(&mut self) -> &mut StreamMemoryTracker {
+ debug_assert!(
+ self.waiting.is_none(),
+ "a ready tracker must not coexist with a waiting future"
+ );
+ self.tracker.as_mut().unwrap()
+ }
+
+ fn enter_waiting(&mut self, batch: RecordBatch, additional: usize) {
+ debug_assert!(
+ self.waiting.is_none(),
+ "enter_waiting should only be called from the ready state"
+ );
+ debug_assert!(
+ self.tracker.is_some(),
+ "enter_waiting requires a tracker in the ready state"
+ );
+ let tracker = self.tracker.take().unwrap();
+ self.waiting = Some(Self::start_waiting(tracker, batch, additional));
+ }
+
+ fn start_waiting(
+ tracker: StreamMemoryTracker,
+ batch: RecordBatch,
+ additional: usize,
+ ) -> PendingTrackFuture {
+ Box::pin(async move {
+ let (tracker, result) = tracker.track_with_policy(additional).await;
+ (tracker, batch, additional, result)
+ })
+ }
+
+ fn poll_waiting(&mut self, cx: &mut Context<'_>) -> Poll