diff --git a/Cargo.lock b/Cargo.lock
index ee23582145..ef68e3f55c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10127,6 +10127,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-runtime",
+ "common-stat",
"common-telemetry",
"common-time",
"datafusion",
diff --git a/config/config.md b/config/config.md
index b18b70cd20..6f8a19a49a 100644
--- a/config/config.md
+++ b/config/config.md
@@ -16,7 +16,7 @@
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup. By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
-| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
+| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. NOTE: This setting affects scan_memory_limit's privileged tier allocation. When set, 70% of queries get privileged memory access (up to the full scan_memory_limit). The remaining 30% get standard tier access (up to 70% of scan_memory_limit). |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
@@ -104,6 +104,7 @@
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode. Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine. Default to 0, which means the number of CPU cores. |
+| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join). Supports an absolute size (e.g., "2GB", "4GB") or a percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit (unbounded, the default when unset). When this limit is reached, queries fail with a ResourceExhausted error. NOTE: This does NOT limit memory used by table scans. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data. - `File`: the data is stored in the local file system. - `S3`: the data is stored in the S3 object storage. - `Gcs`: the data is stored in the Google Cloud Storage. - `Azblob`: the data is stored in the Azure Blob Storage. - `Oss`: the data is stored in the Aliyun OSS. |
@@ -155,6 +156,7 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
+| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports an absolute size (e.g., "2GB") or a percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. NOTE: Works with max_concurrent_queries for tiered memory allocation. - If max_concurrent_queries is set: 70% of queries can use the full limit, while the rest are capped at 70% of it. - If max_concurrent_queries is 0 (unlimited): the first 20 queries can use the full limit, and the rest are capped at 70% of it. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions. To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -308,6 +310,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine. Default to 0, which means the number of CPU cores. |
| `query.allow_query_fallback` | Bool | `false` | Whether to allow query fallback when push down optimize fails. Default to false, meaning when push down optimize failed, return error msg |
+| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join). Supports an absolute size (e.g., "4GB", "8GB") or a percentage of system memory (e.g., "30%"). Setting it to 0 disables the limit (unbounded, the default when unset). When this limit is reached, queries fail with a ResourceExhausted error. NOTE: This does NOT limit memory used by table scans (scans run on datanodes and are limited there by scan_memory_limit). |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |
| `datanode.client.connect_timeout` | String | `10s` | -- |
@@ -446,7 +449,7 @@
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases. It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup. By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
-| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
+| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. NOTE: This setting affects scan_memory_limit's privileged tier allocation. When set, 70% of queries get privileged memory access (up to the full scan_memory_limit). The remaining 30% get standard tier access (up to 70% of scan_memory_limit). |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
@@ -500,6 +503,7 @@
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL. **It's only used when the provider is `kafka`**.
This option ensures that when Kafka messages are deleted, the system can still successfully replay memtable data without throwing an out-of-range error. However, enabling this option might lead to unexpected data loss, as the system will skip over missing entries instead of treating them as critical errors. |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine. Default to 0, which means the number of CPU cores. |
+| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join). Supports an absolute size (e.g., "2GB", "4GB") or a percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit (unbounded, the default when unset). When this limit is reached, queries fail with a ResourceExhausted error. NOTE: This does NOT limit memory used by table scans. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data. - `File`: the data is stored in the local file system. - `S3`: the data is stored in the S3 object storage. - `Gcs`: the data is stored in the Google Cloud Storage. - `Azblob`: the data is stored in the Azure Blob Storage. - `Oss`: the data is stored in the Aliyun OSS. |
@@ -553,6 +557,7 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
+| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports an absolute size (e.g., "2GB") or a percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. NOTE: Works with max_concurrent_queries for tiered memory allocation. - If max_concurrent_queries is set: 70% of queries can use the full limit, while the rest are capped at 70% of it. - If max_concurrent_queries is 0 (unlimited): the first 20 queries can use the full limit, and the rest are capped at 70% of it. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions. To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -673,5 +678,6 @@
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
| `query` | -- | -- | -- |
| `query.parallelism` | Integer | `1` | Parallelism of the query engine for query sent by flownode. Default to 1, so it won't use too much cpu or memory |
+| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join). Supports an absolute size (e.g., "1GB", "2GB") or a percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit (unbounded, the default when unset). When this limit is reached, queries fail with a ResourceExhausted error. NOTE: This does NOT limit memory used by table scans. |
| `memory` | -- | -- | The memory options. |
| `memory.enable_heap_profiling` | Bool | `true` | Whether to enable heap profiling activation during startup. When enabled, heap profiling will be activated if the `MALLOC_CONF` environment variable is set to "prof:true,prof_active:false". The official image adds this env variable. Default is true. |
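
To make the tiering documented above concrete, here is a minimal sketch of the effective-limit arithmetic, assuming an 8 GiB host and the `scan_memory_limit = "50%"` default from the table; the 0.7 constant mirrors the documented standard-tier fraction, and the function name is illustrative:

```rust
// Sketch of the tiered scan-memory arithmetic documented above.
fn effective_scan_limit(scan_limit_bytes: u64, is_privileged: bool) -> u64 {
    const STANDARD_TIER_FRACTION: f64 = 0.7;
    if is_privileged {
        scan_limit_bytes // privileged queries may use the full limit
    } else {
        (scan_limit_bytes as f64 * STANDARD_TIER_FRACTION) as u64
    }
}

fn main() {
    let total_memory: u64 = 8 * 1024 * 1024 * 1024; // assume an 8 GiB host
    let scan_limit = total_memory * 50 / 100; // "50%" resolves to 4 GiB

    // Privileged tier: may push global scan usage up to the full 4 GiB.
    assert_eq!(effective_scan_limit(scan_limit, true), 4 << 30);
    // Standard tier: capped at 70% of the limit (~2.8 GiB).
    assert_eq!(
        effective_scan_limit(scan_limit, false),
        (scan_limit as f64 * 0.7) as u64
    );
}
```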
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index c2cf183025..4e8605b387 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -18,6 +18,9 @@ init_regions_in_background = false
init_regions_parallelism = 16
## The maximum current queries allowed to be executed. Zero means unlimited.
+## NOTE: This setting affects scan_memory_limit's privileged tier allocation.
+## When set, 70% of queries get privileged memory access (up to the full scan_memory_limit).
+## The remaining 30% get standard tier access (up to 70% of scan_memory_limit).
max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
@@ -261,6 +264,13 @@ overwrite_entry_start_id = false
## Default to 0, which means the number of CPU cores.
parallelism = 0
+## Memory pool size for query execution operators (aggregation, sorting, join).
+## Supports an absolute size (e.g., "2GB", "4GB") or a percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit (unbounded, the default when unset).
+## When this limit is reached, queries fail with a ResourceExhausted error.
+## NOTE: This does NOT limit memory used by table scans.
+memory_pool_size = "50%"
+
## The data storage options.
[storage]
## The working home directory.
@@ -501,6 +511,14 @@ max_concurrent_scan_files = 384
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
+## Memory limit for table scans across all queries.
+## Supports an absolute size (e.g., "2GB") or a percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit.
+## NOTE: Works with max_concurrent_queries for tiered memory allocation.
+## - If max_concurrent_queries is set: 70% of queries can use the full limit, while the rest are capped at 70% of it.
+## - If max_concurrent_queries is 0 (unlimited): the first 20 queries can use the full limit, and the rest are capped at 70% of it.
+scan_memory_limit = "50%"
+
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index 81ff25f283..4e44c1ecbb 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -158,6 +158,13 @@ default_ratio = 1.0
## Default to 1, so it won't use too much cpu or memory
parallelism = 1
+## Memory pool size for query execution operators (aggregation, sorting, join).
+## Supports an absolute size (e.g., "1GB", "2GB") or a percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit (unbounded, the default when unset).
+## When this limit is reached, queries fail with a ResourceExhausted error.
+## NOTE: This does NOT limit memory used by table scans.
+memory_pool_size = "50%"
+
## The memory options.
[memory]
## Whether to enable heap profiling activation during startup.
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index 70c61c82c7..04d763c18f 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -256,6 +256,13 @@ parallelism = 0
## Default to false, meaning when push down optimize failed, return error msg
allow_query_fallback = false
+## Memory pool size for query execution operators (aggregation, sorting, join).
+## Supports an absolute size (e.g., "4GB", "8GB") or a percentage of system memory (e.g., "30%").
+## Setting it to 0 disables the limit (unbounded, the default when unset).
+## When this limit is reached, queries fail with a ResourceExhausted error.
+## NOTE: This does NOT limit memory used by table scans (scans run on datanodes and are limited there by scan_memory_limit).
+memory_pool_size = "50%"
+
## Datanode options.
[datanode]
## Datanode client options.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 1857f91ee9..bfdb507969 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -14,6 +14,9 @@ init_regions_in_background = false
init_regions_parallelism = 16
## The maximum current queries allowed to be executed. Zero means unlimited.
+## NOTE: This setting affects scan_memory_limit's privileged tier allocation.
+## When set, 70% of queries get privileged memory access (up to the full scan_memory_limit).
+## The remaining 30% get standard tier access (up to 70% of scan_memory_limit).
max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
@@ -365,6 +368,13 @@ max_running_procedures = 128
## Default to 0, which means the number of CPU cores.
parallelism = 0
+## Memory pool size for query execution operators (aggregation, sorting, join).
+## Supports an absolute size (e.g., "2GB", "4GB") or a percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit (unbounded, the default when unset).
+## When this limit is reached, queries fail with a ResourceExhausted error.
+## NOTE: This does NOT limit memory used by table scans.
+memory_pool_size = "50%"
+
## The data storage options.
[storage]
## The working home directory.
@@ -592,6 +602,14 @@ max_concurrent_scan_files = 384
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
+## Memory limit for table scans across all queries.
+## Supports an absolute size (e.g., "2GB") or a percentage of system memory (e.g., "20%").
+## Setting it to 0 disables the limit.
+## NOTE: Works with max_concurrent_queries for tiered memory allocation.
+## - If max_concurrent_queries is set: 70% of queries can use the full limit, while the rest are capped at 70% of it.
+## - If max_concurrent_queries is 0 (unlimited): the first 20 queries can use the full limit, and the rest are capped at 70% of it.
+scan_memory_limit = "50%"
+
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
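
For reference, the privileged-slot count mentioned in the comments above is derived from `max_concurrent_queries` as follows; this sketch mirrors the 70%/20 defaults described in this diff, and the sample values are illustrative:

```rust
// How many queries get privileged scan-memory access, per the rules above.
fn privileged_slots(max_concurrent_queries: usize) -> usize {
    const DEFAULT_PRIVILEGED_SLOTS: usize = 20; // used when queries are unlimited
    const PRIVILEGED_TIER_RATIO: f64 = 0.7; // 70% of allowed queries are privileged
    if max_concurrent_queries == 0 {
        DEFAULT_PRIVILEGED_SLOTS
    } else {
        ((max_concurrent_queries as f64 * PRIVILEGED_TIER_RATIO) as usize).max(1)
    }
}

fn main() {
    assert_eq!(privileged_slots(0), 20); // unlimited: first 20 queries
    assert_eq!(privileged_slots(100), 70); // 70% of 100
    assert_eq!(privileged_slots(1), 1); // never rounds down to zero
}
```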
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index 94a5107bbb..222012bfd8 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -15,6 +15,7 @@
use std::time::Duration;
use cmd::options::GreptimeOptions;
+use common_base::memory_limit::MemoryLimit;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions};
@@ -74,6 +75,7 @@ fn test_load_datanode_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
+ scan_memory_limit: MemoryLimit::Percentage(50),
..Default::default()
}),
RegionEngineConfig::File(FileEngineConfig {}),
@@ -82,6 +84,10 @@ fn test_load_datanode_example_config() {
flush_metadata_region_interval: Duration::from_secs(30),
}),
],
+ query: QueryOptions {
+ memory_pool_size: MemoryLimit::Percentage(50),
+ ..Default::default()
+ },
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
@@ -155,6 +161,10 @@ fn test_load_frontend_example_config() {
cors_allowed_origins: vec!["https://example.com".to_string()],
..Default::default()
},
+ query: QueryOptions {
+ memory_pool_size: MemoryLimit::Percentage(50),
+ ..Default::default()
+ },
..Default::default()
},
..Default::default()
@@ -242,6 +252,7 @@ fn test_load_flownode_example_config() {
query: QueryOptions {
parallelism: 1,
allow_query_fallback: false,
+ memory_pool_size: MemoryLimit::Percentage(50),
},
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
@@ -286,6 +297,7 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
+ scan_memory_limit: MemoryLimit::Percentage(50),
..Default::default()
}),
RegionEngineConfig::File(FileEngineConfig {}),
@@ -314,7 +326,10 @@ fn test_load_standalone_example_config() {
cors_allowed_origins: vec!["https://example.com".to_string()],
..Default::default()
},
-
+ query: QueryOptions {
+ memory_pool_size: MemoryLimit::Percentage(50),
+ ..Default::default()
+ },
..Default::default()
},
..Default::default()
diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs
index 1f530c2753..91ee9d3343 100644
--- a/src/common/base/src/lib.rs
+++ b/src/common/base/src/lib.rs
@@ -15,6 +15,7 @@
pub mod bit_vec;
pub mod bytes;
pub mod cancellation;
+pub mod memory_limit;
pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]
diff --git a/src/common/base/src/memory_limit.rs b/src/common/base/src/memory_limit.rs
new file mode 100644
index 0000000000..7129a4a027
--- /dev/null
+++ b/src/common/base/src/memory_limit.rs
@@ -0,0 +1,265 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::{self, Display};
+use std::str::FromStr;
+
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
+
+use crate::readable_size::ReadableSize;
+
+/// Memory limit configuration that supports both absolute size and percentage.
+///
+/// Examples:
+/// - Absolute size: "2GB", "4GiB", "512MB"
+/// - Percentage: "50%", "75%"
+/// - Unlimited: "unlimited", "0"
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum MemoryLimit {
+ /// Absolute memory size.
+ Size(ReadableSize),
+ /// Percentage of total system memory (0-100).
+ Percentage(u8),
+ /// No memory limit.
+ #[default]
+ Unlimited,
+}
+
+impl MemoryLimit {
+ /// Resolve the memory limit to bytes based on total system memory.
+ /// Returns 0 if the limit is unlimited.
+ pub fn resolve(&self, total_memory_bytes: u64) -> u64 {
+ match self {
+ MemoryLimit::Size(size) => size.as_bytes(),
+ MemoryLimit::Percentage(pct) => total_memory_bytes * (*pct as u64) / 100,
+ MemoryLimit::Unlimited => 0,
+ }
+ }
+
+ /// Returns true if this limit is unlimited.
+ pub fn is_unlimited(&self) -> bool {
+ match self {
+ MemoryLimit::Size(size) => size.as_bytes() == 0,
+ MemoryLimit::Percentage(pct) => *pct == 0,
+ MemoryLimit::Unlimited => true,
+ }
+ }
+}
+
+impl FromStr for MemoryLimit {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let s = s.trim();
+
+ if s.eq_ignore_ascii_case("unlimited") {
+ return Ok(MemoryLimit::Unlimited);
+ }
+
+ if let Some(pct_str) = s.strip_suffix('%') {
+ let pct = pct_str
+ .trim()
+ .parse::<u8>()
+ .map_err(|e| format!("invalid percentage value '{}': {}", pct_str, e))?;
+
+ if pct > 100 {
+ return Err(format!("percentage must be between 0 and 100, got {}", pct));
+ }
+
+ if pct == 0 {
+ Ok(MemoryLimit::Unlimited)
+ } else {
+ Ok(MemoryLimit::Percentage(pct))
+ }
+ } else {
+ let size = ReadableSize::from_str(s)?;
+ if size.as_bytes() == 0 {
+ Ok(MemoryLimit::Unlimited)
+ } else {
+ Ok(MemoryLimit::Size(size))
+ }
+ }
+ }
+}
+
+impl Display for MemoryLimit {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ MemoryLimit::Size(size) => write!(f, "{}", size),
+ MemoryLimit::Percentage(pct) => write!(f, "{}%", pct),
+ MemoryLimit::Unlimited => write!(f, "unlimited"),
+ }
+ }
+}
+
+impl Serialize for MemoryLimit {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ serializer.serialize_str(&self.to_string())
+ }
+}
+
+impl<'de> Deserialize<'de> for MemoryLimit {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let s = String::deserialize(deserializer)?;
+ MemoryLimit::from_str(&s).map_err(serde::de::Error::custom)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_absolute_size() {
+ assert_eq!(
+ "2GB".parse::<MemoryLimit>().unwrap(),
+ MemoryLimit::Size(ReadableSize(2 * 1024 * 1024 * 1024))
+ );
+ assert_eq!(
+ "512MB".parse::<MemoryLimit>().unwrap(),
+ MemoryLimit::Size(ReadableSize(512 * 1024 * 1024))
+ );
+ assert_eq!("0".parse::<MemoryLimit>().unwrap(), MemoryLimit::Unlimited);
+ }
+
+ #[test]
+ fn test_parse_percentage() {
+ assert_eq!(
+ "50%".parse::<MemoryLimit>().unwrap(),
+ MemoryLimit::Percentage(50)
+ );
+ assert_eq!(
+ "75%".parse::<MemoryLimit>().unwrap(),
+ MemoryLimit::Percentage(75)
+ );
+ assert_eq!("0%".parse::<MemoryLimit>().unwrap(), MemoryLimit::Unlimited);
+ }
+
+ #[test]
+ fn test_parse_invalid() {
+ assert!("150%".parse::().is_err());
+ assert!("-10%".parse::().is_err());
+ assert!("invalid".parse::().is_err());
+ }
+
+ #[test]
+ fn test_resolve() {
+ let total = 8 * 1024 * 1024 * 1024; // 8GB
+
+ assert_eq!(
+ MemoryLimit::Size(ReadableSize(2 * 1024 * 1024 * 1024)).resolve(total),
+ 2 * 1024 * 1024 * 1024
+ );
+ assert_eq!(
+ MemoryLimit::Percentage(50).resolve(total),
+ 4 * 1024 * 1024 * 1024
+ );
+ assert_eq!(MemoryLimit::Unlimited.resolve(total), 0);
+ }
+
+ #[test]
+ fn test_is_unlimited() {
+ assert!(MemoryLimit::Unlimited.is_unlimited());
+ assert!(!MemoryLimit::Size(ReadableSize(1024)).is_unlimited());
+ assert!(!MemoryLimit::Percentage(50).is_unlimited());
+ assert!(!MemoryLimit::Percentage(1).is_unlimited());
+
+ // Defensive: these states shouldn't exist via public API, but check anyway
+ assert!(MemoryLimit::Size(ReadableSize(0)).is_unlimited());
+ assert!(MemoryLimit::Percentage(0).is_unlimited());
+ }
+
+ #[test]
+ fn test_parse_100_percent() {
+ assert_eq!(
+ "100%".parse::<MemoryLimit>().unwrap(),
+ MemoryLimit::Percentage(100)
+ );
+ }
+
+ #[test]
+ fn test_display_percentage() {
+ assert_eq!(MemoryLimit::Percentage(20).to_string(), "20%");
+ assert_eq!(MemoryLimit::Percentage(50).to_string(), "50%");
+ assert_eq!(MemoryLimit::Percentage(100).to_string(), "100%");
+ }
+
+ #[test]
+ fn test_parse_unlimited() {
+ assert_eq!(
+ "unlimited".parse::<MemoryLimit>().unwrap(),
+ MemoryLimit::Unlimited
+ );
+ assert_eq!(
+ "UNLIMITED".parse::<MemoryLimit>().unwrap(),
+ MemoryLimit::Unlimited
+ );
+ assert_eq!(
+ "Unlimited".parse::<MemoryLimit>().unwrap(),
+ MemoryLimit::Unlimited
+ );
+ }
+
+ #[test]
+ fn test_display_unlimited() {
+ assert_eq!(MemoryLimit::Unlimited.to_string(), "unlimited");
+ }
+
+ #[test]
+ fn test_parse_display_roundtrip() {
+ let cases = vec![
+ "50%",
+ "100%",
+ "1%",
+ "2GB",
+ "512MB",
+ "unlimited",
+ "UNLIMITED",
+ "0", // normalized to unlimited
+ "0%", // normalized to unlimited
+ ];
+
+ for input in cases {
+ let parsed = input.parse::<MemoryLimit>().unwrap();
+ let displayed = parsed.to_string();
+ let reparsed = displayed.parse::<MemoryLimit>().unwrap();
+ assert_eq!(
+ parsed, reparsed,
+ "round-trip failed: '{}' -> '{}' -> '{:?}'",
+ input, displayed, reparsed
+ );
+ }
+ }
+
+ #[test]
+ fn test_zero_normalization() {
+ // All forms of zero should normalize to Unlimited
+ assert_eq!("0".parse::().unwrap(), MemoryLimit::Unlimited);
+ assert_eq!("0%".parse::().unwrap(), MemoryLimit::Unlimited);
+ assert_eq!("0B".parse::().unwrap(), MemoryLimit::Unlimited);
+ assert_eq!(
+ "0KB".parse::().unwrap(),
+ MemoryLimit::Unlimited
+ );
+
+ // Unlimited always displays as "unlimited"
+ assert_eq!(MemoryLimit::Unlimited.to_string(), "unlimited");
+ }
+}
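
A short usage sketch for the type added above, exercising only the API defined in this file; the 8 GiB total-memory figure is an illustrative assumption:

```rust
use std::str::FromStr;

use common_base::memory_limit::MemoryLimit;

fn main() {
    let total: u64 = 8 * 1024 * 1024 * 1024; // pretend the host has 8 GiB

    let pct = MemoryLimit::from_str("50%").unwrap();
    assert_eq!(pct.resolve(total), 4 * 1024 * 1024 * 1024); // 50% of 8 GiB

    let abs = MemoryLimit::from_str("2GB").unwrap();
    assert_eq!(abs.resolve(total), 2 * 1024 * 1024 * 1024); // absolute sizes ignore total

    // "0", "0%", and "unlimited" all normalize to Unlimited, which resolves to 0.
    let none = MemoryLimit::from_str("unlimited").unwrap();
    assert!(none.is_unlimited());
    assert_eq!(none.resolve(total), 0);
}
```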
diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs
index e07d152d2d..2584b41b25 100644
--- a/src/common/recordbatch/src/error.rs
+++ b/src/common/recordbatch/src/error.rs
@@ -193,6 +193,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
+
+ #[snafu(display("Exceeded memory limit: {}", msg))]
+ ExceedMemoryLimit {
+ msg: String,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
impl ErrorExt for Error {
@@ -229,6 +236,8 @@ impl ErrorExt for Error {
Error::StreamTimeout { .. } => StatusCode::Cancelled,
Error::StreamCancelled { .. } => StatusCode::Cancelled,
+
+ Error::ExceedMemoryLimit { .. } => StatusCode::RuntimeResourcesExhausted,
}
}
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index 7ae4a419d6..1f5f28e87f 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -21,11 +21,14 @@ pub mod filter;
mod recordbatch;
pub mod util;
+use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
+use common_base::readable_size::ReadableSize;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
@@ -406,6 +409,399 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
}
}
+/// Memory permit for a stream, granting either privileged or standard-tier memory access.
+///
+/// The permit tracks whether this stream holds one of the first-come privileged slots.
+/// When dropped, it automatically releases any privileged slot it holds.
+pub struct MemoryPermit {
+ tracker: QueryMemoryTracker,
+ is_privileged: AtomicBool,
+}
+
+impl MemoryPermit {
+ /// Check if this permit currently has privileged status.
+ pub fn is_privileged(&self) -> bool {
+ self.is_privileged.load(Ordering::Acquire)
+ }
+
+ /// Ensure this permit has privileged status by acquiring a slot if available.
+ /// Returns true if privileged (either already privileged or just acquired privilege).
+ fn ensure_privileged(&self) -> bool {
+ if self.is_privileged.load(Ordering::Acquire) {
+ return true;
+ }
+
+ // Try to claim a privileged slot
+ self.tracker
+ .privileged_count
+ .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
+ if count < self.tracker.privileged_slots {
+ Some(count + 1)
+ } else {
+ None
+ }
+ })
+ .map(|_| {
+ self.is_privileged.store(true, Ordering::Release);
+ true
+ })
+ .unwrap_or(false)
+ }
+
+ /// Track additional memory usage with this permit.
+ /// Returns error if limit is exceeded.
+ ///
+ /// # Arguments
+ /// * `additional` - Additional memory size to track in bytes
+ /// * `stream_tracked` - Total memory already tracked by this stream
+ ///
+ /// # Behavior
+ /// - Privileged streams: Can push global memory usage up to full limit
+ /// - Standard-tier streams: Can push global memory usage up to limit * standard_tier_memory_fraction (default: 0.7)
+ /// - Standard-tier streams automatically attempt to acquire privilege if slots become available
+/// - The configured limit is an absolute hard limit - no stream can exceed it
+ pub fn track(&self, additional: usize, stream_tracked: usize) -> Result<()> {
+ // Ensure privileged status if possible
+ let is_privileged = self.ensure_privileged();
+
+ self.tracker
+ .track_internal(additional, is_privileged, stream_tracked)
+ }
+
+ /// Release tracked memory.
+ ///
+ /// # Arguments
+ /// * `amount` - Amount of memory to release in bytes
+ pub fn release(&self, amount: usize) {
+ self.tracker.release(amount);
+ }
+}
+
+impl Drop for MemoryPermit {
+ fn drop(&mut self) {
+ // Release privileged slot if we had one
+ if self.is_privileged.load(Ordering::Acquire) {
+ self.tracker
+ .privileged_count
+ .fetch_sub(1, Ordering::Release);
+ }
+ }
+}
+
+/// Memory tracker for RecordBatch streams. Clone to share the same limit across queries.
+///
+/// Implements a two-tier memory allocation strategy:
+/// - **Privileged tier**: First N streams (default: 20) can use up to the full memory limit
+/// - **Standard tier**: Remaining streams are restricted to a fraction of the limit (default: 70%)
+/// - Privilege is granted on a first-come-first-served basis
+/// - The configured limit is an absolute hard cap - no stream can exceed it
+#[derive(Clone)]
+pub struct QueryMemoryTracker {
+ current: Arc<AtomicUsize>,
+ limit: usize,
+ standard_tier_memory_fraction: f64,
+ privileged_count: Arc<AtomicUsize>,
+ privileged_slots: usize,
+ on_update: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ on_reject: Option<Arc<dyn Fn() + Send + Sync>>,
+}
+
+impl fmt::Debug for QueryMemoryTracker {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("QueryMemoryTracker")
+ .field("current", &self.current.load(Ordering::Acquire))
+ .field("limit", &self.limit)
+ .field(
+ "standard_tier_memory_fraction",
+ &self.standard_tier_memory_fraction,
+ )
+ .field(
+ "privileged_count",
+ &self.privileged_count.load(Ordering::Acquire),
+ )
+ .field("privileged_slots", &self.privileged_slots)
+ .field("on_update", &self.on_update.is_some())
+ .field("on_reject", &self.on_reject.is_some())
+ .finish()
+ }
+}
+
+impl QueryMemoryTracker {
+ // Default privileged slots when max_concurrent_queries is 0.
+ const DEFAULT_PRIVILEGED_SLOTS: usize = 20;
+ // Ratio for the privileged tier: 70% of queries get privileged access; the standard tier uses 70% of the memory limit.
+ const DEFAULT_PRIVILEGED_TIER_RATIO: f64 = 0.7;
+
+ /// Create a new memory tracker with the given limit and max_concurrent_queries.
+ /// Calculates privileged slots as 70% of max_concurrent_queries (or 20 if max_concurrent_queries is 0).
+ ///
+ /// # Arguments
+ /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
+ /// * `max_concurrent_queries` - Maximum number of concurrent queries (0 = unlimited).
+ pub fn new(limit: usize, max_concurrent_queries: usize) -> Self {
+ let privileged_slots = Self::calculate_privileged_slots(max_concurrent_queries);
+ Self::with_privileged_slots(limit, privileged_slots)
+ }
+
+ /// Create a new memory tracker with custom privileged slots limit.
+ pub fn with_privileged_slots(limit: usize, privileged_slots: usize) -> Self {
+ Self::with_config(limit, privileged_slots, Self::DEFAULT_PRIVILEGED_TIER_RATIO)
+ }
+
+ /// Create a new memory tracker with full configuration.
+ ///
+ /// # Arguments
+ /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
+ /// * `privileged_slots` - Maximum number of streams that can get privileged status.
+ /// * `standard_tier_memory_fraction` - Memory fraction for standard-tier streams (range: [0.0, 1.0]).
+ ///
+ /// # Panics
+ /// Panics if `standard_tier_memory_fraction` is not in the range [0.0, 1.0].
+ pub fn with_config(
+ limit: usize,
+ privileged_slots: usize,
+ standard_tier_memory_fraction: f64,
+ ) -> Self {
+ assert!(
+ (0.0..=1.0).contains(&standard_tier_memory_fraction),
+ "standard_tier_memory_fraction must be in [0.0, 1.0], got {}",
+ standard_tier_memory_fraction
+ );
+
+ Self {
+ current: Arc::new(AtomicUsize::new(0)),
+ limit,
+ standard_tier_memory_fraction,
+ privileged_count: Arc::new(AtomicUsize::new(0)),
+ privileged_slots,
+ on_update: None,
+ on_reject: None,
+ }
+ }
+
+ /// Register a new permit for memory tracking.
+ /// The first `privileged_slots` permits get privileged status automatically.
+ /// The returned permit can be shared across multiple streams of the same query.
+ pub fn register_permit(&self) -> MemoryPermit {
+ // Try to claim a privileged slot
+ let is_privileged = self
+ .privileged_count
+ .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
+ if count < self.privileged_slots {
+ Some(count + 1)
+ } else {
+ None
+ }
+ })
+ .is_ok();
+
+ MemoryPermit {
+ tracker: self.clone(),
+ is_privileged: AtomicBool::new(is_privileged),
+ }
+ }
+
+ /// Set a callback to be called whenever the usage changes successfully.
+ /// The callback receives the new total usage in bytes.
+ ///
+ /// # Note
+ /// The callback is called after both successful `track()` and `release()` operations.
+ /// It is called even when `limit == 0` (unlimited mode) to track actual usage.
+ pub fn with_on_update<F>(mut self, on_update: F) -> Self
+ where
+ F: Fn(usize) + Send + Sync + 'static,
+ {
+ self.on_update = Some(Arc::new(on_update));
+ self
+ }
+
+ /// Set a callback to be called when memory allocation is rejected.
+ ///
+ /// # Note
+ /// This is only called when `track()` fails due to exceeding the limit.
+ /// It is never called when `limit == 0` (unlimited mode).
+ pub fn with_on_reject<F>(mut self, on_reject: F) -> Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.on_reject = Some(Arc::new(on_reject));
+ self
+ }
+
+ /// Get the current memory usage in bytes.
+ pub fn current(&self) -> usize {
+ self.current.load(Ordering::Acquire)
+ }
+
+ fn calculate_privileged_slots(max_concurrent_queries: usize) -> usize {
+ if max_concurrent_queries == 0 {
+ Self::DEFAULT_PRIVILEGED_SLOTS
+ } else {
+ ((max_concurrent_queries as f64 * Self::DEFAULT_PRIVILEGED_TIER_RATIO) as usize).max(1)
+ }
+ }
+
+ /// Internal method to track additional memory usage.
+ ///
+ /// Called by `MemoryPermit::track()`. Use `MemoryPermit::track()` instead of calling this directly.
+ fn track_internal(
+ &self,
+ additional: usize,
+ is_privileged: bool,
+ stream_tracked: usize,
+ ) -> Result<()> {
+ // Calculate effective global limit based on stream privilege
+ // Privileged streams: can push global usage up to full limit
+ // Standard-tier streams: can only push global usage up to fraction of limit
+ let effective_limit = if is_privileged {
+ self.limit
+ } else {
+ (self.limit as f64 * self.standard_tier_memory_fraction) as usize
+ };
+
+ let mut new_total = 0;
+ let result = self
+ .current
+ .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
+ new_total = current.saturating_add(additional);
+
+ if self.limit == 0 {
+ // Unlimited mode
+ return Some(new_total);
+ }
+
+ // Check if new global total exceeds effective limit
+ // The configured limit is an absolute hard limit - no stream can exceed it
+ if new_total <= effective_limit {
+ Some(new_total)
+ } else {
+ None
+ }
+ });
+
+ match result {
+ Ok(_) => {
+ if let Some(callback) = &self.on_update {
+ callback(new_total);
+ }
+ Ok(())
+ }
+ Err(current) => {
+ if let Some(callback) = &self.on_reject {
+ callback();
+ }
+ let msg = format!(
+ "{} requested, {} used globally ({}%), {} used by this stream (privileged: {}), effective limit: {} ({}%), hard limit: {}",
+ ReadableSize(additional as u64),
+ ReadableSize(current as u64),
+ if self.limit > 0 {
+ current * 100 / self.limit
+ } else {
+ 0
+ },
+ ReadableSize(stream_tracked as u64),
+ is_privileged,
+ ReadableSize(effective_limit as u64),
+ if self.limit > 0 {
+ effective_limit * 100 / self.limit
+ } else {
+ 0
+ },
+ ReadableSize(self.limit as u64)
+ );
+ error::ExceedMemoryLimitSnafu { msg }.fail()
+ }
+ }
+ }
+
+ /// Release tracked memory.
+ ///
+ /// # Arguments
+ /// * `amount` - Amount of memory to release in bytes
+ pub fn release(&self, amount: usize) {
+ if let Ok(old_value) =
+ self.current
+ .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
+ Some(current.saturating_sub(amount))
+ })
+ && let Some(callback) = &self.on_update
+ {
+ callback(old_value.saturating_sub(amount));
+ }
+ }
+}
+
+/// A wrapper stream that tracks memory usage of RecordBatches.
+pub struct MemoryTrackedStream {
+ inner: SendableRecordBatchStream,
+ permit: Arc<MemoryPermit>,
+ // Total tracked size, released when stream drops.
+ total_tracked: usize,
+}
+
+impl MemoryTrackedStream {
+ pub fn new(inner: SendableRecordBatchStream, permit: Arc<MemoryPermit>) -> Self {
+ Self {
+ inner,
+ permit,
+ total_tracked: 0,
+ }
+ }
+}
+
+impl Stream for MemoryTrackedStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
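
The diff is truncated inside `poll_next`. For orientation, here is a hedged sketch of how the pieces introduced above are meant to compose, using only APIs defined in this diff (inside the common-recordbatch crate); the limit, slot count, and batch size are illustrative assumptions:

```rust
// Sketch of how QueryMemoryTracker, MemoryPermit, and a tracked stream interact.
use std::sync::Arc;

fn run_query(tracker: &QueryMemoryTracker) {
    // One permit per query; the first `privileged_slots` permits are privileged.
    let permit = Arc::new(tracker.register_permit());

    // A tracked stream charges each batch it yields against the shared limit...
    let batch_size = 1 << 20; // pretend a 1 MiB batch arrived
    let mut stream_tracked = 0;
    if permit.track(batch_size, stream_tracked).is_err() {
        // The query fails with ExceedMemoryLimit
        // (StatusCode::RuntimeResourcesExhausted).
        return;
    }
    stream_tracked += batch_size;

    // ...and releases everything it tracked when it finishes, which is the
    // bookkeeping MemoryTrackedStream performs around the wrapped stream.
    permit.release(stream_tracked);
}
```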