From 8058ce7cf22215f6bcdc17b7bbd1d858618170d1 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 25 Mar 2026 20:25:50 -0700 Subject: [PATCH] refactor: simplify scan memory tracking (#7827) * refactor: simplify scan memory tracking Signed-off-by: jeremyhi * chore: make confg-docs Signed-off-by: jeremyhi * chore: by codex review comment Signed-off-by: jeremyhi * feat: track_with_policy Signed-off-by: jeremyhi * chore: minor change Signed-off-by: jeremyhi * chore: mem granularity mb to kb Signed-off-by: jeremyhi * chore: by review comment Signed-off-by: jeremyhi * chore: by scan_memory_on_exhausted comment Signed-off-by: jeremyhi * fix: by review comment Signed-off-by: jeremyhi * chore: typo Signed-off-by: jeremyhi --------- Signed-off-by: jeremyhi --- Cargo.lock | 1 + config/config.md | 10 +- config/datanode.example.toml | 15 +- config/standalone.example.toml | 15 +- src/common/recordbatch/Cargo.toml | 1 + src/common/recordbatch/src/lib.rs | 807 +++++++++++++++-------------- src/datanode/src/datanode.rs | 3 - src/mito2/src/config.rs | 6 + src/mito2/src/engine.rs | 35 +- src/query/src/dummy_catalog.rs | 4 +- src/store-api/src/region_engine.rs | 6 +- src/table/src/table/scan.rs | 14 +- tests-integration/tests/http.rs | 1 + 13 files changed, 469 insertions(+), 449 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74440726d8..b3000970b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2657,6 +2657,7 @@ dependencies = [ "common-base", "common-error", "common-macro", + "common-memory-manager", "common-telemetry", "common-time", "criterion 0.7.0", diff --git a/config/config.md b/config/config.md index 2ac11dd6e6..4861675217 100644 --- a/config/config.md +++ b/config/config.md @@ -18,7 +18,7 @@ | `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.
Options: "wait" (default, 10s timeout), "wait()" (e.g., "wait(30s)"), "fail" | | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | -| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited.
NOTE: This setting affects scan_memory_limit's privileged tier allocation.
When set, 70% of queries get privileged memory access (full scan_memory_limit).
The remaining 30% get standard tier access (70% of scan_memory_limit). | +| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. | | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | @@ -160,7 +160,8 @@ | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | -| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.
Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit.
NOTE: Works with max_concurrent_queries for tiered memory allocation.
- If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.
- If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. | +| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.
Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit. | +| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately.
"fail" (default) fails fast and is the recommended option for most users.
"wait" / "wait()" waits for memory to become available. This is mainly
for advanced tuning in bursty workloads where temporary contention is common and
higher latency is acceptable.
"wait" means "wait(10s)", not unlimited waiting. | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | @@ -440,7 +441,7 @@ | `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.
It will block the datanode start if it can't receive leases in the heartbeat from metasrv. | | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | -| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited.
NOTE: This setting affects scan_memory_limit's privileged tier allocation.
When set, 70% of queries get privileged memory access (full scan_memory_limit).
The remaining 30% get standard tier access (70% of scan_memory_limit). | +| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | @@ -552,7 +553,8 @@ | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | -| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.
Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit.
NOTE: Works with max_concurrent_queries for tiered memory allocation.
- If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.
- If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. | +| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.
Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit. | +| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately.
"fail" (default) fails fast and is the recommended option for most users.
"wait" / "wait()" waits for memory to become available. This is mainly
for advanced tuning in bursty workloads where temporary contention is common and
higher latency is acceptable.
"wait" means "wait(10s)", not unlimited waiting. | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 2631a089e1..833a567d74 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -17,10 +17,7 @@ init_regions_in_background = false ## Parallelism of initializing regions. init_regions_parallelism = 16 -## The maximum current queries allowed to be executed. Zero means unlimited. -## NOTE: This setting affects scan_memory_limit's privileged tier allocation. -## When set, 70% of queries get privileged memory access (full scan_memory_limit). -## The remaining 30% get standard tier access (70% of scan_memory_limit). +## The maximum concurrent queries allowed to be executed. Zero means unlimited. max_concurrent_queries = 0 ## Enable telemetry to collect anonymous usage data. Enabled by default. @@ -535,10 +532,14 @@ allow_stale_entries = false ## Memory limit for table scans across all queries. ## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). ## Setting it to 0 disables the limit. -## NOTE: Works with max_concurrent_queries for tiered memory allocation. -## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access. -## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. scan_memory_limit = "50%" +## Controls what happens when a scan cannot get memory immediately. +## "fail" (default) fails fast and is the recommended option for most users. +## "wait" / "wait()" waits for memory to become available. This is mainly +## for advanced tuning in bursty workloads where temporary contention is common and +## higher latency is acceptable. +## "wait" means "wait(10s)", not unlimited waiting. +scan_memory_on_exhausted = "fail" ## Minimum time interval between two compactions. ## To align with the old behavior, the default value is 0 (no restrictions). diff --git a/config/standalone.example.toml b/config/standalone.example.toml index ef96406316..94c5feebf1 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -23,10 +23,7 @@ init_regions_in_background = false ## Parallelism of initializing regions. init_regions_parallelism = 16 -## The maximum current queries allowed to be executed. Zero means unlimited. -## NOTE: This setting affects scan_memory_limit's privileged tier allocation. -## When set, 70% of queries get privileged memory access (full scan_memory_limit). -## The remaining 30% get standard tier access (70% of scan_memory_limit). +## The maximum concurrent queries allowed to be executed. Zero means unlimited. max_concurrent_queries = 0 ## Enable telemetry to collect anonymous usage data. Enabled by default. @@ -627,10 +624,14 @@ allow_stale_entries = false ## Memory limit for table scans across all queries. ## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). ## Setting it to 0 disables the limit. -## NOTE: Works with max_concurrent_queries for tiered memory allocation. -## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access. -## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. scan_memory_limit = "50%" +## Controls what happens when a scan cannot get memory immediately. +## "fail" (default) fails fast and is the recommended option for most users. +## "wait" / "wait()" waits for memory to become available. This is mainly +## for advanced tuning in bursty workloads where temporary contention is common and +## higher latency is acceptable. +## "wait" means "wait(10s)", not unlimited waiting. +scan_memory_on_exhausted = "fail" ## Minimum time interval between two compactions. ## To align with the old behavior, the default value is 0 (no restrictions). diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml index 5887dc31c5..efc6b6f60e 100644 --- a/src/common/recordbatch/Cargo.toml +++ b/src/common/recordbatch/Cargo.toml @@ -12,6 +12,7 @@ arc-swap = "1.6" common-base.workspace = true common-error.workspace = true common-macro.workspace = true +common-memory-manager.workspace = true common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 85e0d5c496..0a2d697407 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -22,13 +22,17 @@ pub mod recordbatch; pub mod util; use std::fmt; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use adapter::RecordBatchMetrics; use arc_swap::ArcSwapOption; use common_base::readable_size::ReadableSize; +use common_error::ext::BoxedError; +use common_memory_manager::{ + MemoryGuard, MemoryManager, MemoryMetrics, OnExhaustedPolicy, PermitGranularity, +}; use common_telemetry::tracing::Span; pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder}; @@ -42,7 +46,7 @@ use error::Result; use futures::task::{Context, Poll}; use futures::{Stream, TryStreamExt}; pub use recordbatch::RecordBatch; -use snafu::{ResultExt, ensure}; +use snafu::{IntoError, ResultExt, ensure}; use crate::error::NewDfRecordBatchSnafu; @@ -416,205 +420,93 @@ impl> + Unpin> Stream for RecordBatchStream } } -/// Memory permit for a stream, providing privileged access or rate limiting. -/// -/// The permit tracks whether this stream has privileged Top-K status. -/// When dropped, it automatically releases any privileged slot it holds. -pub struct MemoryPermit { - tracker: QueryMemoryTracker, - is_privileged: AtomicBool, -} - -impl MemoryPermit { - /// Check if this permit currently has privileged status. - pub fn is_privileged(&self) -> bool { - self.is_privileged.load(Ordering::Acquire) - } - - /// Ensure this permit has privileged status by acquiring a slot if available. - /// Returns true if privileged (either already privileged or just acquired privilege). - fn ensure_privileged(&self) -> bool { - if self.is_privileged.load(Ordering::Acquire) { - return true; - } - - // Try to claim a privileged slot - self.tracker - .privileged_count - .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| { - if count < self.tracker.privileged_slots { - Some(count + 1) - } else { - None - } - }) - .map(|_| { - self.is_privileged.store(true, Ordering::Release); - true - }) - .unwrap_or(false) - } - - /// Track additional memory usage with this permit. - /// Returns error if limit is exceeded. - /// - /// # Arguments - /// * `additional` - Additional memory size to track in bytes - /// * `stream_tracked` - Total memory already tracked by this stream - /// - /// # Behavior - /// - Privileged streams: Can push global memory usage up to full limit - /// - Standard-tier streams: Can push global memory usage up to limit * standard_tier_memory_fraction (default: 0.7) - /// - Standard-tier streams automatically attempt to acquire privilege if slots become available - /// - The configured limit is absolute hard limit - no stream can exceed it - pub fn track(&self, additional: usize, stream_tracked: usize) -> Result<()> { - // Ensure privileged status if possible - let is_privileged = self.ensure_privileged(); - - self.tracker - .track_internal(additional, is_privileged, stream_tracked) - } - - /// Release tracked memory. - /// - /// # Arguments - /// * `amount` - Amount of memory to release in bytes - pub fn release(&self, amount: usize) { - self.tracker.release(amount); - } -} - -impl Drop for MemoryPermit { - fn drop(&mut self) { - // Release privileged slot if we had one - if self.is_privileged.load(Ordering::Acquire) { - self.tracker - .privileged_count - .fetch_sub(1, Ordering::Release); - } - } -} - /// Memory tracker for RecordBatch streams. Clone to share the same limit across queries. /// -/// Implements a two-tier memory allocation strategy: -/// - **Privileged tier**: First N streams (default: 20) can use up to the full memory limit -/// - **Standard tier**: Remaining streams are restricted to a fraction of the limit (default: 70%) -/// - Privilege is granted on a first-come-first-served basis -/// - The configured limit is an absolute hard cap - no stream can exceed it +/// Each stream acquires quota independently from this tracker. #[derive(Clone)] pub struct QueryMemoryTracker { - current: Arc, - limit: usize, - standard_tier_memory_fraction: f64, - privileged_count: Arc, - privileged_slots: usize, - on_update: Option>, - on_reject: Option>, + manager: MemoryManager, + metrics: CallbackMemoryMetrics, + on_exhausted_policy: OnExhaustedPolicy, } impl fmt::Debug for QueryMemoryTracker { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("QueryMemoryTracker") - .field("current", &self.current.load(Ordering::Acquire)) - .field("limit", &self.limit) - .field( - "standard_tier_memory_fraction", - &self.standard_tier_memory_fraction, - ) - .field( - "privileged_count", - &self.privileged_count.load(Ordering::Acquire), - ) - .field("privileged_slots", &self.privileged_slots) - .field("on_update", &self.on_update.is_some()) - .field("on_reject", &self.on_reject.is_some()) + .field("current", &self.current()) + .field("limit", &self.limit()) + .field("on_exhausted_policy", &self.on_exhausted_policy) + .field("on_update", &self.metrics.has_on_update()) + .field("on_reject", &self.metrics.has_on_reject()) .finish() } } impl QueryMemoryTracker { - // Default privileged slots when max_concurrent_queries is 0. - const DEFAULT_PRIVILEGED_SLOTS: usize = 20; - // Ratio for privileged tier: 70% queries get privileged access, standard tier uses 70% memory. - const DEFAULT_PRIVILEGED_TIER_RATIO: f64 = 0.7; - - /// Create a new memory tracker with the given limit and max_concurrent_queries. - /// Calculates privileged slots as 70% of max_concurrent_queries (or 20 if max_concurrent_queries is 0). - /// - /// # Arguments - /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited. - /// * `max_concurrent_queries` - Maximum number of concurrent queries (0 = unlimited). - pub fn new(limit: usize, max_concurrent_queries: usize) -> Self { - let privileged_slots = Self::calculate_privileged_slots(max_concurrent_queries); - Self::with_privileged_slots(limit, privileged_slots) - } - - /// Create a new memory tracker with custom privileged slots limit. - pub fn with_privileged_slots(limit: usize, privileged_slots: usize) -> Self { - Self::with_config(limit, privileged_slots, Self::DEFAULT_PRIVILEGED_TIER_RATIO) - } - - /// Create a new memory tracker with full configuration. - /// - /// # Arguments - /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited. - /// * `privileged_slots` - Maximum number of streams that can get privileged status. - /// * `standard_tier_memory_fraction` - Memory fraction for standard-tier streams (range: [0.0, 1.0]). - /// - /// # Panics - /// Panics if `standard_tier_memory_fraction` is not in the range [0.0, 1.0]. - pub fn with_config( + /// Create a builder for a query memory tracker. + pub fn builder( limit: usize, - privileged_slots: usize, - standard_tier_memory_fraction: f64, - ) -> Self { - assert!( - (0.0..=1.0).contains(&standard_tier_memory_fraction), - "standard_tier_memory_fraction must be in [0.0, 1.0], got {}", - standard_tier_memory_fraction - ); - - Self { - current: Arc::new(AtomicUsize::new(0)), + on_exhausted_policy: OnExhaustedPolicy, + ) -> QueryMemoryTrackerBuilder { + QueryMemoryTrackerBuilder { limit, - standard_tier_memory_fraction, - privileged_count: Arc::new(AtomicUsize::new(0)), - privileged_slots, + on_exhausted_policy, on_update: None, on_reject: None, } } - /// Register a new permit for memory tracking. - /// The first `privileged_slots` permits get privileged status automatically. - /// The returned permit can be shared across multiple streams of the same query. - pub fn register_permit(&self) -> MemoryPermit { - // Try to claim a privileged slot - let is_privileged = self - .privileged_count - .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| { - if count < self.privileged_slots { - Some(count + 1) - } else { - None - } - }) - .is_ok(); - - MemoryPermit { + fn new_stream_tracker(&self) -> StreamMemoryTracker { + StreamMemoryTracker { tracker: self.clone(), - is_privileged: AtomicBool::new(is_privileged), + guard: self.manager.try_acquire(0).unwrap(), + tracked_bytes: 0, } } + /// Get the current memory usage in bytes. + pub fn current(&self) -> usize { + self.manager.used_bytes() as usize + } + fn limit(&self) -> usize { + self.manager.limit_bytes() as usize + } + + fn reject_error( + &self, + current: usize, + additional: usize, + stream_tracked: usize, + ) -> error::Error { + let limit = self.limit(); + let msg = format!( + "{} requested, {} used globally ({}%), {} used by this stream, hard limit: {}", + ReadableSize(additional as u64), + ReadableSize(current as u64), + if limit > 0 { current * 100 / limit } else { 0 }, + ReadableSize(stream_tracked as u64), + ReadableSize(limit as u64) + ); + error::ExceedMemoryLimitSnafu { msg }.build() + } +} + +/// Builder for constructing a [`QueryMemoryTracker`] with optional callbacks. +pub struct QueryMemoryTrackerBuilder { + limit: usize, + on_exhausted_policy: OnExhaustedPolicy, + on_update: Option, + on_reject: Option, +} + +impl QueryMemoryTrackerBuilder { /// Set a callback to be called whenever the usage changes successfully. /// The callback receives the new total usage in bytes. /// /// # Note - /// The callback is called after both successful `track()` and `release()` operations. - /// It is called even when `limit == 0` (unlimited mode) to track actual usage. - pub fn with_on_update(mut self, on_update: F) -> Self + /// The callback is called after both successful `track()` and stream drop. + /// Usage is exact in unlimited mode and 1KB-aligned in limited mode. + pub fn on_update(mut self, on_update: F) -> Self where F: Fn(usize) + Send + Sync + 'static, { @@ -627,7 +519,7 @@ impl QueryMemoryTracker { /// # Note /// This is only called when `track()` fails due to exceeding the limit. /// It is never called when `limit == 0` (unlimited mode). - pub fn with_on_reject(mut self, on_reject: F) -> Self + pub fn on_reject(mut self, on_reject: F) -> Self where F: Fn() + Send + Sync + 'static, { @@ -635,105 +527,130 @@ impl QueryMemoryTracker { self } - /// Get the current memory usage in bytes. - pub fn current(&self) -> usize { - self.current.load(Ordering::Acquire) - } + /// Build a [`QueryMemoryTracker`] from this builder. + pub fn build(self) -> QueryMemoryTracker { + let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_reject); + let manager = MemoryManager::with_granularity( + self.limit as u64, + PermitGranularity::Kilobyte, + metrics.clone(), + ); - fn calculate_privileged_slots(max_concurrent_queries: usize) -> usize { - if max_concurrent_queries == 0 { - Self::DEFAULT_PRIVILEGED_SLOTS + QueryMemoryTracker { + manager, + metrics, + on_exhausted_policy: self.on_exhausted_policy, + } + } +} + +struct StreamMemoryTracker { + tracker: QueryMemoryTracker, + guard: MemoryGuard, + tracked_bytes: usize, +} + +type MemoryAcquireResult = std::result::Result<(), common_memory_manager::Error>; + +impl StreamMemoryTracker { + fn try_track(&mut self, additional: usize) -> Result<()> { + if self.guard.try_acquire_additional(additional as u64) { + self.tracked_bytes = self.tracked_bytes.saturating_add(additional); + Ok(()) } else { - ((max_concurrent_queries as f64 * Self::DEFAULT_PRIVILEGED_TIER_RATIO) as usize).max(1) + Err(self.reject_error(additional)) } } - /// Internal method to track additional memory usage. - /// - /// Called by `MemoryPermit::track()`. Use `MemoryPermit::track()` instead of calling this directly. - fn track_internal( - &self, - additional: usize, - is_privileged: bool, - stream_tracked: usize, - ) -> Result<()> { - // Calculate effective global limit based on stream privilege - // Privileged streams: can push global usage up to full limit - // Standard-tier streams: can only push global usage up to fraction of limit - let effective_limit = if is_privileged { - self.limit - } else { - (self.limit as f64 * self.standard_tier_memory_fraction) as usize - }; - - let mut new_total = 0; + async fn track_with_policy(mut self, additional: usize) -> (Self, MemoryAcquireResult) { let result = self - .current - .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| { - new_total = current.saturating_add(additional); + .guard + .acquire_additional_with_policy(additional as u64, self.tracker.on_exhausted_policy) + .await; + if result.is_ok() { + self.tracked_bytes = self.tracked_bytes.saturating_add(additional); + } + (self, result) + } - if self.limit == 0 { - // Unlimited mode - return Some(new_total); - } + fn reject_error(&self, additional: usize) -> error::Error { + let current = self.tracker.current(); + self.tracker + .reject_error(current, additional, self.tracked_bytes) + } - // Check if new global total exceeds effective limit - // The configured limit is absolute hard limit - no stream can exceed it - if new_total <= effective_limit { - Some(new_total) - } else { - None - } - }); - - match result { - Ok(_) => { - if let Some(callback) = &self.on_update { - callback(new_total); - } - Ok(()) + fn wait_error(&self, additional: usize, source: common_memory_manager::Error) -> error::Error { + match source { + common_memory_manager::Error::MemoryLimitExceeded { .. } => { + self.reject_error(additional) } - Err(current) => { - if let Some(callback) = &self.on_reject { - callback(); - } + common_memory_manager::Error::MemoryAcquireTimeout { waited, .. } => { + let current = self.tracker.current(); + let limit = self.tracker.limit(); let msg = format!( - "{} requested, {} used globally ({}%), {} used by this stream (privileged: {}), effective limit: {} ({}%), hard limit: {}", + "timed out waiting {:?} for {}, {} used globally ({}%), {} used by this stream, hard limit: {}", + waited, ReadableSize(additional as u64), ReadableSize(current as u64), - if self.limit > 0 { - current * 100 / self.limit - } else { - 0 - }, - ReadableSize(stream_tracked as u64), - is_privileged, - ReadableSize(effective_limit as u64), - if self.limit > 0 { - effective_limit * 100 / self.limit - } else { - 0 - }, - ReadableSize(self.limit as u64) + if limit > 0 { current * 100 / limit } else { 0 }, + ReadableSize(self.tracked_bytes as u64), + ReadableSize(limit as u64) ); - error::ExceedMemoryLimitSnafu { msg }.fail() + error::ExceedMemoryLimitSnafu { msg }.build() } + error => error::ExternalSnafu.into_error(BoxedError::new(error)), + } + } +} + +type PendingTrackFuture = Pin< + Box + Send>, +>; + +#[derive(Clone)] +struct CallbackMemoryMetrics { + inner: Arc, +} + +type UpdateCallback = Arc; +type RejectCallback = Arc; + +struct CallbackMemoryMetricsInner { + on_update: Option, + on_reject: Option, +} + +impl CallbackMemoryMetrics { + fn new(on_update: Option, on_reject: Option) -> Self { + Self { + inner: Arc::new(CallbackMemoryMetricsInner { + on_update, + on_reject, + }), } } - /// Release tracked memory. - /// - /// # Arguments - /// * `amount` - Amount of memory to release in bytes - pub fn release(&self, amount: usize) { - if let Ok(old_value) = - self.current - .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| { - Some(current.saturating_sub(amount)) - }) - && let Some(callback) = &self.on_update - { - callback(old_value.saturating_sub(amount)); + fn has_on_update(&self) -> bool { + self.inner.on_update.is_some() + } + + fn has_on_reject(&self) -> bool { + self.inner.on_reject.is_some() + } +} + +impl MemoryMetrics for CallbackMemoryMetrics { + fn set_limit(&self, _: i64) {} + + fn set_in_use(&self, bytes: i64) { + if let Some(callback) = &self.inner.on_update { + callback(bytes.max(0) as usize); + } + } + + fn inc_rejected(&self, _: &str) { + if let Some(callback) = &self.inner.on_reject { + callback(); } } } @@ -741,38 +658,107 @@ impl QueryMemoryTracker { /// A wrapper stream that tracks memory usage of RecordBatches. pub struct MemoryTrackedStream { inner: SendableRecordBatchStream, - permit: Arc, - // Total tracked size, released when stream drops. - total_tracked: usize, + tracker: Option, + // Waiting stores a batch that has already been pulled from the inner stream but has not yet + // acquired additional quota. This keeps `poll_next()` non-blocking and allows bounded waits, + // at the cost of temporarily holding one untracked batch per blocked stream in memory. + waiting: Option, } impl MemoryTrackedStream { - pub fn new(inner: SendableRecordBatchStream, permit: Arc) -> Self { + pub fn new(inner: SendableRecordBatchStream, tracker: QueryMemoryTracker) -> Self { Self { inner, - permit, - total_tracked: 0, + tracker: Some(tracker.new_stream_tracker()), + waiting: None, } } + + fn ready_tracker_mut(&mut self) -> &mut StreamMemoryTracker { + debug_assert!( + self.waiting.is_none(), + "a ready tracker must not coexist with a waiting future" + ); + self.tracker.as_mut().unwrap() + } + + fn enter_waiting(&mut self, batch: RecordBatch, additional: usize) { + debug_assert!( + self.waiting.is_none(), + "enter_waiting should only be called from the ready state" + ); + debug_assert!( + self.tracker.is_some(), + "enter_waiting requires a tracker in the ready state" + ); + let tracker = self.tracker.take().unwrap(); + self.waiting = Some(Self::start_waiting(tracker, batch, additional)); + } + + fn start_waiting( + tracker: StreamMemoryTracker, + batch: RecordBatch, + additional: usize, + ) -> PendingTrackFuture { + Box::pin(async move { + let (tracker, result) = tracker.track_with_policy(additional).await; + (tracker, batch, additional, result) + }) + } + + fn poll_waiting(&mut self, cx: &mut Context<'_>) -> Poll>> { + let future = self.waiting.as_mut().unwrap(); + match future.as_mut().poll(cx) { + Poll::Ready((tracker, batch, additional, result)) => { + let output = match result { + Ok(()) => Ok(batch), + Err(error) => Err(tracker.wait_error(additional, error)), + }; + self.waiting = None; + self.tracker = Some(tracker); + Poll::Ready(Some(output)) + } + Poll::Pending => Poll::Pending, + } + } + + fn poll_batch( + &mut self, + batch: RecordBatch, + cx: &mut Context<'_>, + ) -> Poll>> { + let additional = batch.buffer_memory_size(); + let tracker = self.ready_tracker_mut(); + + if let Err(error) = tracker.try_track(additional) { + match tracker.tracker.on_exhausted_policy { + OnExhaustedPolicy::Fail => return Poll::Ready(Some(Err(error))), + // `Wait` is a deliberate tradeoff: the batch has already been materialized, so we + // keep it in memory while waiting for quota instead of failing immediately. Under + // contention, real memory usage can therefore exceed `scan_memory_limit` by up to + // one buffered batch per blocked stream. + OnExhaustedPolicy::Wait { .. } => { + self.enter_waiting(batch, additional); + return self.poll_waiting(cx); + } + } + } + + Poll::Ready(Some(Ok(batch))) + } } impl Stream for MemoryTrackedStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.waiting.is_some() { + return self.poll_waiting(cx); + } + match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Ready(Some(Ok(batch))) => { - let additional = batch.buffer_memory_size(); - - if let Err(e) = self.permit.track(additional, self.total_tracked) { - return Poll::Ready(Some(Err(e))); - } - - self.total_tracked += additional; - - Poll::Ready(Some(Ok(batch))) - } - Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Ok(batch))) => self.poll_batch(batch, cx), + Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } @@ -783,14 +769,6 @@ impl Stream for MemoryTrackedStream { } } -impl Drop for MemoryTrackedStream { - fn drop(&mut self) { - if self.total_tracked > 0 { - self.permit.release(self.total_tracked); - } - } -} - impl RecordBatchStream for MemoryTrackedStream { fn schema(&self) -> SchemaRef { self.inner.schema() @@ -808,13 +786,34 @@ impl RecordBatchStream for MemoryTrackedStream { #[cfg(test)] mod tests { use std::sync::Arc; + use std::time::Duration; + use common_memory_manager::{OnExhaustedPolicy, PermitGranularity}; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{BooleanVector, Int32Vector, StringVector}; + use futures::StreamExt; + use tokio::time::{sleep, timeout}; use super::*; + fn large_string_batch(bytes: usize) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "payload", + ConcreteDataType::string_datatype(), + false, + )])); + let payload = "x".repeat(bytes); + let vector: VectorRef = Arc::new(StringVector::from(vec![payload])); + RecordBatch::new(schema, vec![vector]).unwrap() + } + + fn aligned_tracked_bytes(bytes: usize) -> usize { + PermitGranularity::Kilobyte + .permits_to_bytes(PermitGranularity::Kilobyte.bytes_to_permits(bytes as u64)) + as usize + } + #[test] fn test_recordbatches_try_from_columns() { let schema = Arc::new(Schema::new(vec![ColumnSchema::new( @@ -896,156 +895,168 @@ mod tests { assert_eq!(collected[1], batch2); } + const MB: usize = 1024 * 1024; + #[test] fn test_query_memory_tracker_basic() { - let tracker = Arc::new(QueryMemoryTracker::new(1000, 0)); + let tracker = + Arc::new(QueryMemoryTracker::builder(10 * MB, OnExhaustedPolicy::Fail).build()); - // Register first stream - should get privileged status - let permit1 = tracker.register_permit(); - assert!(permit1.is_privileged()); + let mut stream1 = tracker.new_stream_tracker(); + assert!(stream1.try_track(5 * MB).is_ok()); + assert_eq!(tracker.current(), 5 * MB); - // Privileged stream can use up to limit - assert!(permit1.track(500, 0).is_ok()); - assert_eq!(tracker.current(), 500); + let mut stream2 = tracker.new_stream_tracker(); + assert!(stream2.try_track(4 * MB).is_ok()); + assert_eq!(tracker.current(), 9 * MB); - // Register second stream - also privileged - let permit2 = tracker.register_permit(); - assert!(permit2.is_privileged()); - // Can add more but cannot exceed hard limit (1000) - assert!(permit2.track(400, 0).is_ok()); - assert_eq!(tracker.current(), 900); - - permit1.release(500); - permit2.release(400); + drop(stream1); + drop(stream2); assert_eq!(tracker.current(), 0); } #[test] - fn test_query_memory_tracker_privileged_limit() { - // Privileged slots = 2 for easy testing - // Limit: 1000, standard-tier fraction: 0.7 (default) - // Privileged can push global to 1000, standard-tier can push global to 700 - let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 2)); + fn test_query_memory_tracker_shared_global_limit() { + let tracker = + Arc::new(QueryMemoryTracker::builder(10 * MB, OnExhaustedPolicy::Fail).build()); + let mut stream1 = tracker.new_stream_tracker(); + let mut stream2 = tracker.new_stream_tracker(); - // First 2 streams are privileged - let permit1 = tracker.register_permit(); - let permit2 = tracker.register_permit(); - assert!(permit1.is_privileged()); - assert!(permit2.is_privileged()); + assert!(stream1.try_track(3 * MB).is_ok()); + assert_eq!(tracker.current(), 3 * MB); + assert!(stream2.try_track(6 * MB).is_ok()); + assert_eq!(tracker.current(), 9 * MB); - // Third stream is standard-tier (not privileged) - let permit3 = tracker.register_permit(); - assert!(!permit3.is_privileged()); - - // Privileged stream uses some memory - assert!(permit1.track(300, 0).is_ok()); - assert_eq!(tracker.current(), 300); - - // Standard-tier can add up to 400 (total becomes 700, its effective limit) - assert!(permit3.track(400, 0).is_ok()); - assert_eq!(tracker.current(), 700); - - // Standard-tier stream cannot push global beyond 700 - let err = permit3.track(100, 400).unwrap_err(); + let err = stream2.try_track(2 * MB).unwrap_err(); let err_msg = err.to_string(); - assert!(err_msg.contains("400B used by this stream")); - assert!(err_msg.contains("effective limit: 700B (70%)")); - assert!(err_msg.contains("700B used globally (70%)")); - assert_eq!(tracker.current(), 700); + assert!(err_msg.contains("6.0MiB used by this stream")); + assert!(err_msg.contains("9.0MiB used globally (90%)")); + assert!(err_msg.contains("hard limit: 10.0MiB")); + assert_eq!(tracker.current(), 9 * MB); - permit1.release(300); - permit3.release(400); + drop(stream1); + assert_eq!(tracker.current(), 6 * MB); + drop(stream2); assert_eq!(tracker.current(), 0); } #[test] - fn test_query_memory_tracker_promotion() { - // Privileged slots = 1 for easy testing - let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 1)); + fn test_query_memory_tracker_hard_limit() { + let tracker = + Arc::new(QueryMemoryTracker::builder(10 * MB, OnExhaustedPolicy::Fail).build()); + let mut stream = tracker.new_stream_tracker(); - // First stream is privileged - let permit1 = tracker.register_permit(); - assert!(permit1.is_privileged()); + assert!(stream.try_track(9 * MB).is_ok()); + assert_eq!(tracker.current(), 9 * MB); - // Second stream is standard-tier (can only use 500) - let permit2 = tracker.register_permit(); - assert!(!permit2.is_privileged()); + assert!(stream.try_track(2 * MB).is_err()); + assert_eq!(tracker.current(), 9 * MB); - // Standard-tier can only track 500 - assert!(permit2.track(400, 0).is_ok()); - assert_eq!(tracker.current(), 400); + assert!(stream.try_track(MB).is_ok()); + assert_eq!(tracker.current(), 10 * MB); - // Drop first permit to release privileged slot - drop(permit1); + assert!(stream.try_track(MB).is_err()); + assert_eq!(tracker.current(), 10 * MB); - // Second stream can now be promoted and use more memory - assert!(permit2.track(500, 400).is_ok()); - assert!(permit2.is_privileged()); - assert_eq!(tracker.current(), 900); - - permit2.release(900); + drop(stream); assert_eq!(tracker.current(), 0); } #[test] - fn test_query_memory_tracker_privileged_hard_limit() { - // Test that the configured limit is absolute hard limit for all streams - // Privileged: can use full limit (1000) - // Standard-tier: can use 0.7x limit (700 with defaults) - let tracker = Arc::new(QueryMemoryTracker::new(1000, 0)); + fn test_query_memory_tracker_unlimited() { + let tracker = Arc::new(QueryMemoryTracker::builder(0, OnExhaustedPolicy::Fail).build()); + let mut stream = tracker.new_stream_tracker(); - let permit1 = tracker.register_permit(); - assert!(permit1.is_privileged()); - - // Privileged can use up to full limit (1000) - assert!(permit1.track(900, 0).is_ok()); - assert_eq!(tracker.current(), 900); - - // Privileged cannot exceed hard limit (1000) - assert!(permit1.track(200, 900).is_err()); - assert_eq!(tracker.current(), 900); - - // Can add within hard limit - assert!(permit1.track(100, 900).is_ok()); - assert_eq!(tracker.current(), 1000); - - // Cannot exceed even by 1 byte - assert!(permit1.track(1, 1000).is_err()); - assert_eq!(tracker.current(), 1000); - - permit1.release(1000); + assert!(stream.try_track(10 * MB).is_ok()); + assert_eq!(tracker.current(), 10 * MB); + drop(stream); assert_eq!(tracker.current(), 0); } #[test] - fn test_query_memory_tracker_standard_tier_fraction() { - // Test standard-tier streams use fraction of limit - // Limit: 1000, default fraction: 0.7, so standard-tier can use 700 - let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 1)); + fn test_query_memory_tracker_rounds_to_kilobytes() { + let tracker = + Arc::new(QueryMemoryTracker::builder(10 * MB, OnExhaustedPolicy::Fail).build()); + let mut stream = tracker.new_stream_tracker(); - let permit1 = tracker.register_permit(); - assert!(permit1.is_privileged()); + assert!(stream.try_track(1_537).is_ok()); + assert_eq!(tracker.current(), 2 * 1024); - let permit2 = tracker.register_permit(); - assert!(!permit2.is_privileged()); - - // Standard-tier can use up to 700 (1000 * 0.7 default) - assert!(permit2.track(600, 0).is_ok()); - assert_eq!(tracker.current(), 600); - - // Cannot exceed standard-tier limit (700) - assert!(permit2.track(200, 600).is_err()); - assert_eq!(tracker.current(), 600); - - // Can add within standard-tier limit - assert!(permit2.track(100, 600).is_ok()); - assert_eq!(tracker.current(), 700); - - // Cannot exceed standard-tier limit - assert!(permit2.track(1, 700).is_err()); - assert_eq!(tracker.current(), 700); - - permit2.release(700); + drop(stream); assert_eq!(tracker.current(), 0); } + + #[tokio::test] + async fn test_memory_tracked_stream_waits_for_capacity() { + let tracker = QueryMemoryTracker::builder( + MB, + OnExhaustedPolicy::Wait { + timeout: Duration::from_millis(200), + }, + ) + .build(); + let batch = large_string_batch(700 * 1024); + let expected_bytes = aligned_tracked_bytes(batch.buffer_memory_size()); + + let mut stream1 = MemoryTrackedStream::new( + RecordBatches::try_new(batch.schema.clone(), vec![batch.clone()]) + .unwrap() + .as_stream(), + tracker.clone(), + ); + let first = stream1.next().await.unwrap().unwrap(); + assert_eq!(first.num_rows(), 1); + assert_eq!(tracker.current(), expected_bytes); + + let stream2 = MemoryTrackedStream::new( + RecordBatches::try_new(batch.schema.clone(), vec![batch]) + .unwrap() + .as_stream(), + tracker.clone(), + ); + let waiter = tokio::spawn(async move { + let mut stream2 = stream2; + stream2.next().await.unwrap() + }); + + sleep(Duration::from_millis(50)).await; + assert!(!waiter.is_finished()); + + drop(stream1); + let second = waiter.await.unwrap().unwrap(); + assert_eq!(second.num_rows(), 1); + } + + #[tokio::test] + async fn test_memory_tracked_stream_wait_times_out() { + let tracker = QueryMemoryTracker::builder( + MB, + OnExhaustedPolicy::Wait { + timeout: Duration::from_millis(50), + }, + ) + .build(); + let batch = large_string_batch(700 * 1024); + + let mut stream1 = MemoryTrackedStream::new( + RecordBatches::try_new(batch.schema.clone(), vec![batch.clone()]) + .unwrap() + .as_stream(), + tracker.clone(), + ); + let first = stream1.next().await.unwrap().unwrap(); + assert_eq!(first.num_rows(), 1); + + let mut stream2 = MemoryTrackedStream::new( + RecordBatches::try_new(batch.schema.clone(), vec![batch]) + .unwrap() + .as_stream(), + tracker, + ); + let result = timeout(Duration::from_secs(1), stream2.next()) + .await + .unwrap(); + let error = result.unwrap().unwrap_err(); + assert!(error.to_string().contains("timed out waiting")); + } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 3c62015179..859235fa9f 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -538,7 +538,6 @@ impl DatanodeBuilder { file_ref_manager, partition_expr_fetcher.clone(), plugins, - opts.max_concurrent_queries, ); #[cfg(feature = "enterprise")] @@ -581,7 +580,6 @@ impl DatanodeBuilder { file_ref_manager, partition_expr_fetcher, plugins, - opts.max_concurrent_queries, ); #[cfg(feature = "enterprise")] @@ -603,7 +601,6 @@ impl DatanodeBuilder { file_ref_manager, partition_expr_fetcher.clone(), plugins, - opts.max_concurrent_queries, ); #[cfg(feature = "enterprise")] diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 0eee067ab6..da0ec74022 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -151,6 +151,11 @@ pub struct MitoConfig { /// Memory limit for table scans across all queries. Setting it to 0 disables the limit. /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%"). pub scan_memory_limit: MemoryLimit, + /// Behavior when scan memory tracking cannot acquire memory from the budget. + /// `wait` means `wait(10s)`, not unlimited waiting. + /// Defaults to [`OnExhaustedPolicy::Fail`], which intentionally differs from + /// [`OnExhaustedPolicy::default()`]. + pub scan_memory_on_exhausted: OnExhaustedPolicy, /// Index configs. pub index: IndexConfig, @@ -216,6 +221,7 @@ impl Default for MitoConfig { max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES, allow_stale_entries: false, scan_memory_limit: MemoryLimit::default(), + scan_memory_on_exhausted: OnExhaustedPolicy::Fail, index: IndexConfig::default(), inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 1af79daff6..fbafe1da67 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -95,7 +95,7 @@ use common_base::Plugins; use common_error::ext::BoxedError; use common_meta::error::UnexpectedSnafu; use common_meta::key::SchemaMetadataManagerRef; -use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream}; +use common_recordbatch::{QueryMemoryTracker, SendableRecordBatchStream}; use common_stat::get_total_memory_bytes; use common_telemetry::{info, tracing, warn}; use common_wal::options::WalOptions; @@ -167,7 +167,6 @@ pub struct MitoEngineBuilder<'a, S: LogStore> { file_ref_manager: FileReferenceManagerRef, partition_expr_fetcher: PartitionExprFetcherRef, plugins: Plugins, - max_concurrent_queries: usize, #[cfg(feature = "enterprise")] extension_range_provider_factory: Option, } @@ -183,7 +182,6 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { file_ref_manager: FileReferenceManagerRef, partition_expr_fetcher: PartitionExprFetcherRef, plugins: Plugins, - max_concurrent_queries: usize, ) -> Self { Self { data_home, @@ -194,7 +192,6 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { file_ref_manager, plugins, partition_expr_fetcher, - max_concurrent_queries, #[cfg(feature = "enterprise")] extension_range_provider_factory: None, } @@ -230,13 +227,14 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { let total_memory = get_total_memory_bytes().max(0) as u64; let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize; let scan_memory_tracker = - QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries) - .with_on_update(|usage| { + QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted) + .on_update(|usage| { SCAN_MEMORY_USAGE_BYTES.set(usage as i64); }) - .with_on_reject(|| { + .on_reject(|| { SCAN_REQUESTS_REJECTED_TOTAL.inc(); - }); + }) + .build(); let inner = EngineInner { workers, @@ -285,7 +283,6 @@ impl MitoEngine { file_ref_manager, partition_expr_fetcher, plugins, - 0, // Default: no limit on concurrent queries ); builder.try_build().await } @@ -1212,8 +1209,8 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } - fn register_query_memory_permit(&self) -> Option> { - Some(Arc::new(self.inner.scan_memory_tracker.register_permit())) + fn query_memory_tracker(&self) -> Option { + Some(self.inner.scan_memory_tracker.clone()) } async fn get_committed_sequence( @@ -1378,13 +1375,15 @@ impl MitoEngine { let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone())); let total_memory = get_total_memory_bytes().max(0) as u64; let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize; - let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0) - .with_on_update(|usage| { - SCAN_MEMORY_USAGE_BYTES.set(usage as i64); - }) - .with_on_reject(|| { - SCAN_REQUESTS_REJECTED_TOTAL.inc(); - }); + let scan_memory_tracker = + QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted) + .on_update(|usage| { + SCAN_MEMORY_USAGE_BYTES.set(usage as i64); + }) + .on_reject(|| { + SCAN_REQUESTS_REJECTED_TOTAL.inc(); + }) + .build(); Ok(MitoEngine { inner: Arc::new(EngineInner { workers: WorkerGroup::start_for_test( diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 239cf7cea8..7ce85afbbb 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -187,8 +187,8 @@ impl TableProvider for DummyTableProvider { .handle_query(self.region_id, request.clone()) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; - let query_memory_permit = self.engine.register_query_memory_permit(); - let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?; + let query_memory_tracker = self.engine.query_memory_tracker(); + let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_tracker)?; if let Some(query_ctx) = &self.query_ctx { scan_exec.set_explain_verbose(query_ctx.explain_verbose()); } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index b3f460d01d..115c841f93 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -23,7 +23,7 @@ use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole use api::region::RegionResponse; use async_trait::async_trait; use common_error::ext::BoxedError; -use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream}; +use common_recordbatch::{EmptyRecordBatchStream, QueryMemoryTracker, SendableRecordBatchStream}; use common_time::Timestamp; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr}; @@ -886,8 +886,8 @@ pub trait RegionEngine: Send + Sync { request: ScanRequest, ) -> Result; - /// Registers and returns a query memory permit. - fn register_query_memory_permit(&self) -> Option> { + /// Returns the query memory tracker for scan execution. + fn query_memory_tracker(&self) -> Option { None } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index e2d8f794da..83319f2688 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -20,7 +20,7 @@ use std::time::Instant; use common_error::ext::BoxedError; use common_recordbatch::{ - DfRecordBatch, DfSendableRecordBatchStream, MemoryPermit, MemoryTrackedStream, + DfRecordBatch, DfSendableRecordBatchStream, MemoryTrackedStream, QueryMemoryTracker, SendableRecordBatchStream, }; use common_telemetry::tracing::Span; @@ -67,7 +67,7 @@ pub struct RegionScanExec { // TODO(ruihang): handle TimeWindowed dist via this parameter distribution: Option, explain_verbose: bool, - query_memory_permit: Option>, + query_memory_tracker: Option, } impl std::fmt::Debug for RegionScanExec { @@ -91,7 +91,7 @@ impl RegionScanExec { pub fn new( scanner: RegionScannerRef, request: ScanRequest, - query_memory_permit: Option>, + query_memory_tracker: Option, ) -> DfResult { let arrow_schema = scanner.schema().arrow_schema().clone(); let scanner_props = scanner.properties(); @@ -226,7 +226,7 @@ impl RegionScanExec { is_partition_set: false, distribution: request.distribution, explain_verbose: false, - query_memory_permit, + query_memory_tracker, }) } @@ -299,7 +299,7 @@ impl RegionScanExec { is_partition_set: true, distribution: self.distribution, explain_verbose: self.explain_verbose, - query_memory_permit: self.query_memory_permit.clone(), + query_memory_tracker: self.query_memory_tracker.clone(), }) } @@ -387,8 +387,8 @@ impl ExecutionPlan for RegionScanExec { .scan_partition(&ctx, &self.metric, partition) .map_err(|e| DataFusionError::External(Box::new(e)))?; - let stream = if let Some(permit) = &self.query_memory_permit { - Box::pin(MemoryTrackedStream::new(stream, permit.clone())) + let stream = if let Some(tracker) = &self.query_memory_tracker { + Box::pin(MemoryTrackedStream::new(stream, tracker.clone())) } else { stream }; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7ae59ae9fc..05a34eb5b7 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1548,6 +1548,7 @@ sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 max_concurrent_scan_files = 384 allow_stale_entries = false +scan_memory_on_exhausted = "fail" min_compaction_interval = "0s" default_experimental_flat_format = false