From c7fded29ee845b51b8a102d25dd52b53c123521c Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 11 Nov 2025 15:47:55 +0800 Subject: [PATCH] feat: query mem limiter (#7078) * feat: query mem limiter * feat: config docs * feat: frontend query limit config * fix: unused imports Signed-off-by: jeremyhi * feat: add metrics for query memory tracker Signed-off-by: jeremyhi * fix: right postion for tracker Signed-off-by: jeremyhi * fix: avoid race condition Signed-off-by: jeremyhi * feat: soft and hard limit Signed-off-by: jeremyhi * feat: docs Signed-off-by: jeremyhi * fix: when soft_limit == 0 Signed-off-by: jeremyhi * feat: upgrade limit algorithm Signed-off-by: jeremyhi * fix: remove batch window Signed-off-by: jeremyhi * chore: batch mem size Signed-off-by: jeremyhi * feat: refine limit algorithm Signed-off-by: jeremyhi * fix: get sys mem Signed-off-by: jeremyhi * chore: minor change * feat: up tracker to the top stream * feat: estimated_size for batch Signed-off-by: jeremyhi * chore: minor refactor * feat: scan_memory_limit connect to max_concurrent_queries Signed-off-by: jeremyhi * chore: make callback clearly * feat: add unlimted enum Signed-off-by: jeremyhi * chore: by review comment * chore: comment on recursion_limit Signed-off-by: jeremyhi * feat: refactor and put permit into RegionScanExec Signed-off-by: jeremyhi * chore: multiple lazy static blocks * chore: minor change Signed-off-by: jeremyhi --------- Signed-off-by: jeremyhi --- Cargo.lock | 1 + config/config.md | 10 +- config/datanode.example.toml | 18 + config/flownode.example.toml | 7 + config/frontend.example.toml | 7 + config/standalone.example.toml | 18 + src/cmd/tests/load_config_test.rs | 17 +- src/common/base/src/lib.rs | 1 + src/common/base/src/memory_limit.rs | 265 ++++++++++++++ src/common/recordbatch/src/error.rs | 9 + src/common/recordbatch/src/lib.rs | 549 ++++++++++++++++++++++++++++ src/datanode/src/datanode.rs | 3 + src/flow/src/adapter.rs | 2 + src/mito2/src/config.rs | 5 + src/mito2/src/engine.rs | 39 +- src/mito2/src/metrics.rs | 34 +- src/query/Cargo.toml | 1 + src/query/src/dummy_catalog.rs | 3 +- src/query/src/metrics.rs | 12 + src/query/src/options.rs | 6 + src/query/src/query_engine/state.rs | 83 ++++- src/store-api/src/region_engine.rs | 7 +- src/table/src/table/scan.rs | 41 ++- tests-integration/tests/http.rs | 2 + 24 files changed, 1118 insertions(+), 22 deletions(-) create mode 100644 src/common/base/src/memory_limit.rs diff --git a/Cargo.lock b/Cargo.lock index ee23582145..ef68e3f55c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10127,6 +10127,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-runtime", + "common-stat", "common-telemetry", "common-time", "datafusion", diff --git a/config/config.md b/config/config.md index b18b70cd20..6f8a19a49a 100644 --- a/config/config.md +++ b/config/config.md @@ -16,7 +16,7 @@ | `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. | | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | -| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. | +| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
NOTE: This setting affects scan_memory_limit's privileged tier allocation.
When set, 70% of queries get privileged memory access (full scan_memory_limit).
The remaining 30% get standard tier access (70% of scan_memory_limit). | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. | | `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. | | `runtime` | -- | -- | The runtime options. | @@ -104,6 +104,7 @@ | `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.
Not setting(or set to 0) this value will use the number of CPU cores divided by 2. | | `query` | -- | -- | The query engine options. | | `query.parallelism` | Integer | `0` | Parallelism of the query engine.
Default to 0, which means the number of CPU cores. | +| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join).
Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit (unbounded, default behavior).
When this limit is reached, queries will fail with ResourceExhausted error.
NOTE: This does NOT limit memory used by table scans. | | `storage` | -- | -- | The data storage options. | | `storage.data_home` | String | `./greptimedb_data` | The working home directory. | | `storage.type` | String | `File` | The storage type used to store the data.
- `File`: the data is stored in the local file system.
- `S3`: the data is stored in the S3 object storage.
- `Gcs`: the data is stored in the Google Cloud Storage.
- `Azblob`: the data is stored in the Azure Blob Storage.
- `Oss`: the data is stored in the Aliyun OSS. | @@ -155,6 +156,7 @@ | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | +| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.
Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit.
NOTE: Works with max_concurrent_queries for tiered memory allocation.
- If max_concurrent_queries is set: 70% of queries get full access to the limit; the remaining 30% are capped at 70% of it.
- If max_concurrent_queries is 0 (unlimited): the first 20 queries get full access; the rest are capped at 70% of the limit. | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | @@ -308,6 +310,7 @@ | `query` | -- | -- | The query engine options. | | `query.parallelism` | Integer | `0` | Parallelism of the query engine.
Default to 0, which means the number of CPU cores. | | `query.allow_query_fallback` | Bool | `false` | Whether to allow query fallback when push down optimize fails.
Default to false, meaning when push down optimize failed, return error msg | +| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join).
Supports absolute size (e.g., "4GB", "8GB") or percentage of system memory (e.g., "30%").
Setting it to 0 disables the limit (unbounded, default behavior).
When this limit is reached, queries will fail with ResourceExhausted error.
NOTE: This does NOT limit memory used by table scans (only applies to datanodes). | | `datanode` | -- | -- | Datanode options. | | `datanode.client` | -- | -- | Datanode client options. | | `datanode.client.connect_timeout` | String | `10s` | -- | @@ -446,7 +449,7 @@ | `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.
It will block the datanode start if it can't receive leases in the heartbeat from metasrv. | | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | -| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. | +| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
NOTE: This setting affects scan_memory_limit's privileged tier allocation.
When set, 70% of queries get privileged memory access (full scan_memory_limit).
The remaining 30% get standard tier access (70% of scan_memory_limit). | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | @@ -500,6 +503,7 @@ | `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.
**It's only used when the provider is `kafka`**.

This option ensures that when Kafka messages are deleted, the system
can still successfully replay memtable data without throwing an
out-of-range error.
However, enabling this option might lead to unexpected data loss,
as the system will skip over missing entries instead of treating
them as critical errors. | | `query` | -- | -- | The query engine options. | | `query.parallelism` | Integer | `0` | Parallelism of the query engine.
Default to 0, which means the number of CPU cores. | +| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join).
Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit (unbounded, default behavior).
When this limit is reached, queries will fail with ResourceExhausted error.
NOTE: This does NOT limit memory used by table scans. | | `storage` | -- | -- | The data storage options. | | `storage.data_home` | String | `./greptimedb_data` | The working home directory. | | `storage.type` | String | `File` | The storage type used to store the data.
- `File`: the data is stored in the local file system.
- `S3`: the data is stored in the S3 object storage.
- `Gcs`: the data is stored in the Google Cloud Storage.
- `Azblob`: the data is stored in the Azure Blob Storage.
- `Oss`: the data is stored in the Aliyun OSS. | @@ -553,6 +557,7 @@ | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | +| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.
Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit.
NOTE: Works with max_concurrent_queries for tiered memory allocation.
- If max_concurrent_queries is set: 70% of queries get full access to the limit; the remaining 30% are capped at 70% of it.
- If max_concurrent_queries is 0 (unlimited): the first 20 queries get full access; the rest are capped at 70% of the limit. | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | | `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | @@ -673,5 +678,6 @@ | `tracing.tokio_console_addr` | String | Unset | The tokio console address. | | `query` | -- | -- | -- | | `query.parallelism` | Integer | `1` | Parallelism of the query engine for query sent by flownode.
Default to 1, so it won't use too much cpu or memory | +| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join).
Supports absolute size (e.g., "1GB", "2GB") or percentage of system memory (e.g., "20%").
Setting it to 0 disables the limit (unbounded, default behavior).
When this limit is reached, queries will fail with ResourceExhausted error.
NOTE: This does NOT limit memory used by table scans. | | `memory` | -- | -- | The memory options. | | `memory.enable_heap_profiling` | Bool | `true` | Whether to enable heap profiling activation during startup.
When enabled, heap profiling will be activated if the `MALLOC_CONF` environment variable
is set to "prof:true,prof_active:false". The official image adds this env variable.
Default is true. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index c2cf183025..4e8605b387 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -18,6 +18,9 @@ init_regions_in_background = false init_regions_parallelism = 16 ## The maximum current queries allowed to be executed. Zero means unlimited. +## NOTE: This setting affects scan_memory_limit's privileged tier allocation. +## When set, 70% of queries get privileged memory access (full scan_memory_limit). +## The remaining 30% get standard tier access (70% of scan_memory_limit). max_concurrent_queries = 0 ## Enable telemetry to collect anonymous usage data. Enabled by default. @@ -261,6 +264,13 @@ overwrite_entry_start_id = false ## Default to 0, which means the number of CPU cores. parallelism = 0 +## Memory pool size for query execution operators (aggregation, sorting, join). +## Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%"). +## Setting it to 0 disables the limit (unbounded, default behavior). +## When this limit is reached, queries will fail with ResourceExhausted error. +## NOTE: This does NOT limit memory used by table scans. +memory_pool_size = "50%" + ## The data storage options. [storage] ## The working home directory. @@ -501,6 +511,14 @@ max_concurrent_scan_files = 384 ## Whether to allow stale WAL entries read during replay. allow_stale_entries = false +## Memory limit for table scans across all queries. +## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). +## Setting it to 0 disables the limit. +## NOTE: Works with max_concurrent_queries for tiered memory allocation. +## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access. +## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. +scan_memory_limit = "50%" + ## Minimum time interval between two compactions. ## To align with the old behavior, the default value is 0 (no restrictions). min_compaction_interval = "0m" diff --git a/config/flownode.example.toml b/config/flownode.example.toml index 81ff25f283..4e44c1ecbb 100644 --- a/config/flownode.example.toml +++ b/config/flownode.example.toml @@ -158,6 +158,13 @@ default_ratio = 1.0 ## Default to 1, so it won't use too much cpu or memory parallelism = 1 +## Memory pool size for query execution operators (aggregation, sorting, join). +## Supports absolute size (e.g., "1GB", "2GB") or percentage of system memory (e.g., "20%"). +## Setting it to 0 disables the limit (unbounded, default behavior). +## When this limit is reached, queries will fail with ResourceExhausted error. +## NOTE: This does NOT limit memory used by table scans. +memory_pool_size = "50%" + ## The memory options. [memory] ## Whether to enable heap profiling activation during startup. diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 70c61c82c7..04d763c18f 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -256,6 +256,13 @@ parallelism = 0 ## Default to false, meaning when push down optimize failed, return error msg allow_query_fallback = false +## Memory pool size for query execution operators (aggregation, sorting, join). +## Supports absolute size (e.g., "4GB", "8GB") or percentage of system memory (e.g., "30%"). +## Setting it to 0 disables the limit (unbounded, default behavior). +## When this limit is reached, queries will fail with ResourceExhausted error. 
+## NOTE: This does NOT limit memory used by table scans (only applies to datanodes). +memory_pool_size = "50%" + ## Datanode options. [datanode] ## Datanode client options. diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 1857f91ee9..bfdb507969 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -14,6 +14,9 @@ init_regions_in_background = false init_regions_parallelism = 16 ## The maximum current queries allowed to be executed. Zero means unlimited. +## NOTE: This setting affects scan_memory_limit's privileged tier allocation. +## When set, 70% of queries get privileged memory access (full scan_memory_limit). +## The remaining 30% get standard tier access (70% of scan_memory_limit). max_concurrent_queries = 0 ## Enable telemetry to collect anonymous usage data. Enabled by default. @@ -365,6 +368,13 @@ max_running_procedures = 128 ## Default to 0, which means the number of CPU cores. parallelism = 0 +## Memory pool size for query execution operators (aggregation, sorting, join). +## Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%"). +## Setting it to 0 disables the limit (unbounded, default behavior). +## When this limit is reached, queries will fail with ResourceExhausted error. +## NOTE: This does NOT limit memory used by table scans. +memory_pool_size = "50%" + ## The data storage options. [storage] ## The working home directory. @@ -592,6 +602,14 @@ max_concurrent_scan_files = 384 ## Whether to allow stale WAL entries read during replay. allow_stale_entries = false +## Memory limit for table scans across all queries. +## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). +## Setting it to 0 disables the limit. +## NOTE: Works with max_concurrent_queries for tiered memory allocation. +## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access. +## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. +scan_memory_limit = "50%" + ## Minimum time interval between two compactions. ## To align with the old behavior, the default value is 0 (no restrictions). 
min_compaction_interval = "0m" diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 94a5107bbb..222012bfd8 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -15,6 +15,7 @@ use std::time::Duration; use cmd::options::GreptimeOptions; +use common_base::memory_limit::MemoryLimit; use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_options::datanode::{ClientOptions, DatanodeClientOptions}; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions}; @@ -74,6 +75,7 @@ fn test_load_datanode_example_config() { RegionEngineConfig::Mito(MitoConfig { auto_flush_interval: Duration::from_secs(3600), write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)), + scan_memory_limit: MemoryLimit::Percentage(50), ..Default::default() }), RegionEngineConfig::File(FileEngineConfig {}), @@ -82,6 +84,10 @@ fn test_load_datanode_example_config() { flush_metadata_region_interval: Duration::from_secs(30), }), ], + query: QueryOptions { + memory_pool_size: MemoryLimit::Percentage(50), + ..Default::default() + }, logging: LoggingOptions { level: Some("info".to_string()), dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR), @@ -155,6 +161,10 @@ fn test_load_frontend_example_config() { cors_allowed_origins: vec!["https://example.com".to_string()], ..Default::default() }, + query: QueryOptions { + memory_pool_size: MemoryLimit::Percentage(50), + ..Default::default() + }, ..Default::default() }, ..Default::default() @@ -242,6 +252,7 @@ fn test_load_flownode_example_config() { query: QueryOptions { parallelism: 1, allow_query_fallback: false, + memory_pool_size: MemoryLimit::Percentage(50), }, meta_client: Some(MetaClientOptions { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], @@ -286,6 +297,7 @@ fn test_load_standalone_example_config() { RegionEngineConfig::Mito(MitoConfig { auto_flush_interval: Duration::from_secs(3600), write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)), + scan_memory_limit: MemoryLimit::Percentage(50), ..Default::default() }), RegionEngineConfig::File(FileEngineConfig {}), @@ -314,7 +326,10 @@ fn test_load_standalone_example_config() { cors_allowed_origins: vec!["https://example.com".to_string()], ..Default::default() }, - + query: QueryOptions { + memory_pool_size: MemoryLimit::Percentage(50), + ..Default::default() + }, ..Default::default() }, ..Default::default() diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 1f530c2753..91ee9d3343 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -15,6 +15,7 @@ pub mod bit_vec; pub mod bytes; pub mod cancellation; +pub mod memory_limit; pub mod plugins; pub mod range_read; #[allow(clippy::all)] diff --git a/src/common/base/src/memory_limit.rs b/src/common/base/src/memory_limit.rs new file mode 100644 index 0000000000..7129a4a027 --- /dev/null +++ b/src/common/base/src/memory_limit.rs @@ -0,0 +1,265 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self, Display}; +use std::str::FromStr; + +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use crate::readable_size::ReadableSize; + +/// Memory limit configuration that supports both absolute size and percentage. +/// +/// Examples: +/// - Absolute size: "2GB", "4GiB", "512MB" +/// - Percentage: "50%", "75%" +/// - Unlimited: "unlimited", "0" +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum MemoryLimit { + /// Absolute memory size. + Size(ReadableSize), + /// Percentage of total system memory (0-100). + Percentage(u8), + /// No memory limit. + #[default] + Unlimited, +} + +impl MemoryLimit { + /// Resolve the memory limit to bytes based on total system memory. + /// Returns 0 if the limit is unlimited. + pub fn resolve(&self, total_memory_bytes: u64) -> u64 { + match self { + MemoryLimit::Size(size) => size.as_bytes(), + MemoryLimit::Percentage(pct) => total_memory_bytes * (*pct as u64) / 100, + MemoryLimit::Unlimited => 0, + } + } + + /// Returns true if this limit is unlimited. + pub fn is_unlimited(&self) -> bool { + match self { + MemoryLimit::Size(size) => size.as_bytes() == 0, + MemoryLimit::Percentage(pct) => *pct == 0, + MemoryLimit::Unlimited => true, + } + } +} + +impl FromStr for MemoryLimit { + type Err = String; + + fn from_str(s: &str) -> Result { + let s = s.trim(); + + if s.eq_ignore_ascii_case("unlimited") { + return Ok(MemoryLimit::Unlimited); + } + + if let Some(pct_str) = s.strip_suffix('%') { + let pct = pct_str + .trim() + .parse::() + .map_err(|e| format!("invalid percentage value '{}': {}", pct_str, e))?; + + if pct > 100 { + return Err(format!("percentage must be between 0 and 100, got {}", pct)); + } + + if pct == 0 { + Ok(MemoryLimit::Unlimited) + } else { + Ok(MemoryLimit::Percentage(pct)) + } + } else { + let size = ReadableSize::from_str(s)?; + if size.as_bytes() == 0 { + Ok(MemoryLimit::Unlimited) + } else { + Ok(MemoryLimit::Size(size)) + } + } + } +} + +impl Display for MemoryLimit { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MemoryLimit::Size(size) => write!(f, "{}", size), + MemoryLimit::Percentage(pct) => write!(f, "{}%", pct), + MemoryLimit::Unlimited => write!(f, "unlimited"), + } + } +} + +impl Serialize for MemoryLimit { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl<'de> Deserialize<'de> for MemoryLimit { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + MemoryLimit::from_str(&s).map_err(serde::de::Error::custom) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_absolute_size() { + assert_eq!( + "2GB".parse::().unwrap(), + MemoryLimit::Size(ReadableSize(2 * 1024 * 1024 * 1024)) + ); + assert_eq!( + "512MB".parse::().unwrap(), + MemoryLimit::Size(ReadableSize(512 * 1024 * 1024)) + ); + assert_eq!("0".parse::().unwrap(), MemoryLimit::Unlimited); + } + + #[test] + fn test_parse_percentage() { + assert_eq!( + "50%".parse::().unwrap(), + MemoryLimit::Percentage(50) + ); + assert_eq!( + "75%".parse::().unwrap(), + MemoryLimit::Percentage(75) + ); + assert_eq!("0%".parse::().unwrap(), MemoryLimit::Unlimited); + } + + #[test] + fn test_parse_invalid() { + assert!("150%".parse::().is_err()); + assert!("-10%".parse::().is_err()); + assert!("invalid".parse::().is_err()); + } 
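    // An illustrative sketch, not part of the original patch: the parse-then-resolve
    // flow that callers (e.g. an engine builder) use to turn a config string into a
    // concrete byte budget. The 8 GiB total system memory is an assumed figure.
    #[test]
    fn parse_then_resolve_sketch() {
        let total: u64 = 8 * 1024 * 1024 * 1024; // assumed total system memory
        assert_eq!(
            "50%".parse::<MemoryLimit>().unwrap().resolve(total),
            4 * 1024 * 1024 * 1024
        );
        assert_eq!(
            "2GB".parse::<MemoryLimit>().unwrap().resolve(total),
            2 * 1024 * 1024 * 1024
        );
        // "unlimited" (or "0") resolves to 0, which callers interpret as no limit.
        assert_eq!("unlimited".parse::<MemoryLimit>().unwrap().resolve(total), 0);
    }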
+ + #[test] + fn test_resolve() { + let total = 8 * 1024 * 1024 * 1024; // 8GB + + assert_eq!( + MemoryLimit::Size(ReadableSize(2 * 1024 * 1024 * 1024)).resolve(total), + 2 * 1024 * 1024 * 1024 + ); + assert_eq!( + MemoryLimit::Percentage(50).resolve(total), + 4 * 1024 * 1024 * 1024 + ); + assert_eq!(MemoryLimit::Unlimited.resolve(total), 0); + } + + #[test] + fn test_is_unlimited() { + assert!(MemoryLimit::Unlimited.is_unlimited()); + assert!(!MemoryLimit::Size(ReadableSize(1024)).is_unlimited()); + assert!(!MemoryLimit::Percentage(50).is_unlimited()); + assert!(!MemoryLimit::Percentage(1).is_unlimited()); + + // Defensive: these states shouldn't exist via public API, but check anyway + assert!(MemoryLimit::Size(ReadableSize(0)).is_unlimited()); + assert!(MemoryLimit::Percentage(0).is_unlimited()); + } + + #[test] + fn test_parse_100_percent() { + assert_eq!( + "100%".parse::().unwrap(), + MemoryLimit::Percentage(100) + ); + } + + #[test] + fn test_display_percentage() { + assert_eq!(MemoryLimit::Percentage(20).to_string(), "20%"); + assert_eq!(MemoryLimit::Percentage(50).to_string(), "50%"); + assert_eq!(MemoryLimit::Percentage(100).to_string(), "100%"); + } + + #[test] + fn test_parse_unlimited() { + assert_eq!( + "unlimited".parse::().unwrap(), + MemoryLimit::Unlimited + ); + assert_eq!( + "UNLIMITED".parse::().unwrap(), + MemoryLimit::Unlimited + ); + assert_eq!( + "Unlimited".parse::().unwrap(), + MemoryLimit::Unlimited + ); + } + + #[test] + fn test_display_unlimited() { + assert_eq!(MemoryLimit::Unlimited.to_string(), "unlimited"); + } + + #[test] + fn test_parse_display_roundtrip() { + let cases = vec![ + "50%", + "100%", + "1%", + "2GB", + "512MB", + "unlimited", + "UNLIMITED", + "0", // normalized to unlimited + "0%", // normalized to unlimited + ]; + + for input in cases { + let parsed = input.parse::().unwrap(); + let displayed = parsed.to_string(); + let reparsed = displayed.parse::().unwrap(); + assert_eq!( + parsed, reparsed, + "round-trip failed: '{}' -> '{}' -> '{:?}'", + input, displayed, reparsed + ); + } + } + + #[test] + fn test_zero_normalization() { + // All forms of zero should normalize to Unlimited + assert_eq!("0".parse::().unwrap(), MemoryLimit::Unlimited); + assert_eq!("0%".parse::().unwrap(), MemoryLimit::Unlimited); + assert_eq!("0B".parse::().unwrap(), MemoryLimit::Unlimited); + assert_eq!( + "0KB".parse::().unwrap(), + MemoryLimit::Unlimited + ); + + // Unlimited always displays as "unlimited" + assert_eq!(MemoryLimit::Unlimited.to_string(), "unlimited"); + } +} diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index e07d152d2d..2584b41b25 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -193,6 +193,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Exceeded memory limit: {}", msg))] + ExceedMemoryLimit { + msg: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -229,6 +236,8 @@ impl ErrorExt for Error { Error::StreamTimeout { .. } => StatusCode::Cancelled, Error::StreamCancelled { .. } => StatusCode::Cancelled, + + Error::ExceedMemoryLimit { .. 
} => StatusCode::RuntimeResourcesExhausted, } } diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 7ae4a419d6..1f5f28e87f 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -21,11 +21,14 @@ pub mod filter; mod recordbatch; pub mod util; +use std::fmt; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use adapter::RecordBatchMetrics; use arc_swap::ArcSwapOption; +use common_base::readable_size::ReadableSize; pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::arrow::compute::SortOptions; pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch; @@ -406,6 +409,399 @@ impl> + Unpin> Stream for RecordBatchStream } } +/// Memory permit for a stream, providing privileged access or rate limiting. +/// +/// The permit tracks whether this stream has privileged Top-K status. +/// When dropped, it automatically releases any privileged slot it holds. +pub struct MemoryPermit { + tracker: QueryMemoryTracker, + is_privileged: AtomicBool, +} + +impl MemoryPermit { + /// Check if this permit currently has privileged status. + pub fn is_privileged(&self) -> bool { + self.is_privileged.load(Ordering::Acquire) + } + + /// Ensure this permit has privileged status by acquiring a slot if available. + /// Returns true if privileged (either already privileged or just acquired privilege). + fn ensure_privileged(&self) -> bool { + if self.is_privileged.load(Ordering::Acquire) { + return true; + } + + // Try to claim a privileged slot + self.tracker + .privileged_count + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| { + if count < self.tracker.privileged_slots { + Some(count + 1) + } else { + None + } + }) + .map(|_| { + self.is_privileged.store(true, Ordering::Release); + true + }) + .unwrap_or(false) + } + + /// Track additional memory usage with this permit. + /// Returns error if limit is exceeded. + /// + /// # Arguments + /// * `additional` - Additional memory size to track in bytes + /// * `stream_tracked` - Total memory already tracked by this stream + /// + /// # Behavior + /// - Privileged streams: Can push global memory usage up to full limit + /// - Standard-tier streams: Can push global memory usage up to limit * standard_tier_memory_fraction (default: 0.7) + /// - Standard-tier streams automatically attempt to acquire privilege if slots become available + /// - The configured limit is absolute hard limit - no stream can exceed it + pub fn track(&self, additional: usize, stream_tracked: usize) -> Result<()> { + // Ensure privileged status if possible + let is_privileged = self.ensure_privileged(); + + self.tracker + .track_internal(additional, is_privileged, stream_tracked) + } + + /// Release tracked memory. + /// + /// # Arguments + /// * `amount` - Amount of memory to release in bytes + pub fn release(&self, amount: usize) { + self.tracker.release(amount); + } +} + +impl Drop for MemoryPermit { + fn drop(&mut self) { + // Release privileged slot if we had one + if self.is_privileged.load(Ordering::Acquire) { + self.tracker + .privileged_count + .fetch_sub(1, Ordering::Release); + } + } +} + +/// Memory tracker for RecordBatch streams. Clone to share the same limit across queries. 
+/// +/// Implements a two-tier memory allocation strategy: +/// - **Privileged tier**: First N streams (default: 20) can use up to the full memory limit +/// - **Standard tier**: Remaining streams are restricted to a fraction of the limit (default: 70%) +/// - Privilege is granted on a first-come-first-served basis +/// - The configured limit is an absolute hard cap - no stream can exceed it +#[derive(Clone)] +pub struct QueryMemoryTracker { + current: Arc, + limit: usize, + standard_tier_memory_fraction: f64, + privileged_count: Arc, + privileged_slots: usize, + on_update: Option>, + on_reject: Option>, +} + +impl fmt::Debug for QueryMemoryTracker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("QueryMemoryTracker") + .field("current", &self.current.load(Ordering::Acquire)) + .field("limit", &self.limit) + .field( + "standard_tier_memory_fraction", + &self.standard_tier_memory_fraction, + ) + .field( + "privileged_count", + &self.privileged_count.load(Ordering::Acquire), + ) + .field("privileged_slots", &self.privileged_slots) + .field("on_update", &self.on_update.is_some()) + .field("on_reject", &self.on_reject.is_some()) + .finish() + } +} + +impl QueryMemoryTracker { + // Default privileged slots when max_concurrent_queries is 0. + const DEFAULT_PRIVILEGED_SLOTS: usize = 20; + // Ratio for privileged tier: 70% queries get privileged access, standard tier uses 70% memory. + const DEFAULT_PRIVILEGED_TIER_RATIO: f64 = 0.7; + + /// Create a new memory tracker with the given limit and max_concurrent_queries. + /// Calculates privileged slots as 70% of max_concurrent_queries (or 20 if max_concurrent_queries is 0). + /// + /// # Arguments + /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited. + /// * `max_concurrent_queries` - Maximum number of concurrent queries (0 = unlimited). + pub fn new(limit: usize, max_concurrent_queries: usize) -> Self { + let privileged_slots = Self::calculate_privileged_slots(max_concurrent_queries); + Self::with_privileged_slots(limit, privileged_slots) + } + + /// Create a new memory tracker with custom privileged slots limit. + pub fn with_privileged_slots(limit: usize, privileged_slots: usize) -> Self { + Self::with_config(limit, privileged_slots, Self::DEFAULT_PRIVILEGED_TIER_RATIO) + } + + /// Create a new memory tracker with full configuration. + /// + /// # Arguments + /// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited. + /// * `privileged_slots` - Maximum number of streams that can get privileged status. + /// * `standard_tier_memory_fraction` - Memory fraction for standard-tier streams (range: [0.0, 1.0]). + /// + /// # Panics + /// Panics if `standard_tier_memory_fraction` is not in the range [0.0, 1.0]. + pub fn with_config( + limit: usize, + privileged_slots: usize, + standard_tier_memory_fraction: f64, + ) -> Self { + assert!( + (0.0..=1.0).contains(&standard_tier_memory_fraction), + "standard_tier_memory_fraction must be in [0.0, 1.0], got {}", + standard_tier_memory_fraction + ); + + Self { + current: Arc::new(AtomicUsize::new(0)), + limit, + standard_tier_memory_fraction, + privileged_count: Arc::new(AtomicUsize::new(0)), + privileged_slots, + on_update: None, + on_reject: None, + } + } + + /// Register a new permit for memory tracking. + /// The first `privileged_slots` permits get privileged status automatically. + /// The returned permit can be shared across multiple streams of the same query. 
+ pub fn register_permit(&self) -> MemoryPermit { + // Try to claim a privileged slot + let is_privileged = self + .privileged_count + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| { + if count < self.privileged_slots { + Some(count + 1) + } else { + None + } + }) + .is_ok(); + + MemoryPermit { + tracker: self.clone(), + is_privileged: AtomicBool::new(is_privileged), + } + } + + /// Set a callback to be called whenever the usage changes successfully. + /// The callback receives the new total usage in bytes. + /// + /// # Note + /// The callback is called after both successful `track()` and `release()` operations. + /// It is called even when `limit == 0` (unlimited mode) to track actual usage. + pub fn with_on_update(mut self, on_update: F) -> Self + where + F: Fn(usize) + Send + Sync + 'static, + { + self.on_update = Some(Arc::new(on_update)); + self + } + + /// Set a callback to be called when memory allocation is rejected. + /// + /// # Note + /// This is only called when `track()` fails due to exceeding the limit. + /// It is never called when `limit == 0` (unlimited mode). + pub fn with_on_reject(mut self, on_reject: F) -> Self + where + F: Fn() + Send + Sync + 'static, + { + self.on_reject = Some(Arc::new(on_reject)); + self + } + + /// Get the current memory usage in bytes. + pub fn current(&self) -> usize { + self.current.load(Ordering::Acquire) + } + + fn calculate_privileged_slots(max_concurrent_queries: usize) -> usize { + if max_concurrent_queries == 0 { + Self::DEFAULT_PRIVILEGED_SLOTS + } else { + ((max_concurrent_queries as f64 * Self::DEFAULT_PRIVILEGED_TIER_RATIO) as usize).max(1) + } + } + + /// Internal method to track additional memory usage. + /// + /// Called by `MemoryPermit::track()`. Use `MemoryPermit::track()` instead of calling this directly. + fn track_internal( + &self, + additional: usize, + is_privileged: bool, + stream_tracked: usize, + ) -> Result<()> { + // Calculate effective global limit based on stream privilege + // Privileged streams: can push global usage up to full limit + // Standard-tier streams: can only push global usage up to fraction of limit + let effective_limit = if is_privileged { + self.limit + } else { + (self.limit as f64 * self.standard_tier_memory_fraction) as usize + }; + + let mut new_total = 0; + let result = self + .current + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| { + new_total = current.saturating_add(additional); + + if self.limit == 0 { + // Unlimited mode + return Some(new_total); + } + + // Check if new global total exceeds effective limit + // The configured limit is absolute hard limit - no stream can exceed it + if new_total <= effective_limit { + Some(new_total) + } else { + None + } + }); + + match result { + Ok(_) => { + if let Some(callback) = &self.on_update { + callback(new_total); + } + Ok(()) + } + Err(current) => { + if let Some(callback) = &self.on_reject { + callback(); + } + let msg = format!( + "{} requested, {} used globally ({}%), {} used by this stream (privileged: {}), effective limit: {} ({}%), hard limit: {}", + ReadableSize(additional as u64), + ReadableSize(current as u64), + if self.limit > 0 { + current * 100 / self.limit + } else { + 0 + }, + ReadableSize(stream_tracked as u64), + is_privileged, + ReadableSize(effective_limit as u64), + if self.limit > 0 { + effective_limit * 100 / self.limit + } else { + 0 + }, + ReadableSize(self.limit as u64) + ); + error::ExceedMemoryLimitSnafu { msg }.fail() + } + } + } + + /// Release tracked memory. 
+ /// + /// # Arguments + /// * `amount` - Amount of memory to release in bytes + pub fn release(&self, amount: usize) { + if let Ok(old_value) = + self.current + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| { + Some(current.saturating_sub(amount)) + }) + && let Some(callback) = &self.on_update + { + callback(old_value.saturating_sub(amount)); + } + } +} + +/// A wrapper stream that tracks memory usage of RecordBatches. +pub struct MemoryTrackedStream { + inner: SendableRecordBatchStream, + permit: Arc, + // Total tracked size, released when stream drops. + total_tracked: usize, +} + +impl MemoryTrackedStream { + pub fn new(inner: SendableRecordBatchStream, permit: Arc) -> Self { + Self { + inner, + permit, + total_tracked: 0, + } + } +} + +impl Stream for MemoryTrackedStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_next(cx) { + Poll::Ready(Some(Ok(batch))) => { + let additional = batch + .columns() + .iter() + .map(|c| c.memory_size()) + .sum::(); + + if let Err(e) = self.permit.track(additional, self.total_tracked) { + return Poll::Ready(Some(Err(e))); + } + + self.total_tracked += additional; + + Poll::Ready(Some(Ok(batch))) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl Drop for MemoryTrackedStream { + fn drop(&mut self) { + if self.total_tracked > 0 { + self.permit.release(self.total_tracked); + } + } +} + +impl RecordBatchStream for MemoryTrackedStream { + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + self.inner.output_ordering() + } + + fn metrics(&self) -> Option { + self.inner.metrics() + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -496,4 +892,157 @@ mod tests { assert_eq!(collected[0], batch1); assert_eq!(collected[1], batch2); } + + #[test] + fn test_query_memory_tracker_basic() { + let tracker = Arc::new(QueryMemoryTracker::new(1000, 0)); + + // Register first stream - should get privileged status + let permit1 = tracker.register_permit(); + assert!(permit1.is_privileged()); + + // Privileged stream can use up to limit + assert!(permit1.track(500, 0).is_ok()); + assert_eq!(tracker.current(), 500); + + // Register second stream - also privileged + let permit2 = tracker.register_permit(); + assert!(permit2.is_privileged()); + // Can add more but cannot exceed hard limit (1000) + assert!(permit2.track(400, 0).is_ok()); + assert_eq!(tracker.current(), 900); + + permit1.release(500); + permit2.release(400); + assert_eq!(tracker.current(), 0); + } + + #[test] + fn test_query_memory_tracker_privileged_limit() { + // Privileged slots = 2 for easy testing + // Limit: 1000, standard-tier fraction: 0.7 (default) + // Privileged can push global to 1000, standard-tier can push global to 700 + let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 2)); + + // First 2 streams are privileged + let permit1 = tracker.register_permit(); + let permit2 = tracker.register_permit(); + assert!(permit1.is_privileged()); + assert!(permit2.is_privileged()); + + // Third stream is standard-tier (not privileged) + let permit3 = tracker.register_permit(); + assert!(!permit3.is_privileged()); + + // Privileged stream uses some memory + assert!(permit1.track(300, 0).is_ok()); + assert_eq!(tracker.current(), 300); + + // 
Standard-tier can add up to 400 (total becomes 700, its effective limit) + assert!(permit3.track(400, 0).is_ok()); + assert_eq!(tracker.current(), 700); + + // Standard-tier stream cannot push global beyond 700 + let err = permit3.track(100, 400).unwrap_err(); + let err_msg = err.to_string(); + assert!(err_msg.contains("400B used by this stream")); + assert!(err_msg.contains("effective limit: 700B (70%)")); + assert!(err_msg.contains("700B used globally (70%)")); + assert_eq!(tracker.current(), 700); + + permit1.release(300); + permit3.release(400); + assert_eq!(tracker.current(), 0); + } + + #[test] + fn test_query_memory_tracker_promotion() { + // Privileged slots = 1 for easy testing + let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 1)); + + // First stream is privileged + let permit1 = tracker.register_permit(); + assert!(permit1.is_privileged()); + + // Second stream is standard-tier (can only use 500) + let permit2 = tracker.register_permit(); + assert!(!permit2.is_privileged()); + + // Standard-tier can only track 500 + assert!(permit2.track(400, 0).is_ok()); + assert_eq!(tracker.current(), 400); + + // Drop first permit to release privileged slot + drop(permit1); + + // Second stream can now be promoted and use more memory + assert!(permit2.track(500, 400).is_ok()); + assert!(permit2.is_privileged()); + assert_eq!(tracker.current(), 900); + + permit2.release(900); + assert_eq!(tracker.current(), 0); + } + + #[test] + fn test_query_memory_tracker_privileged_hard_limit() { + // Test that the configured limit is absolute hard limit for all streams + // Privileged: can use full limit (1000) + // Standard-tier: can use 0.7x limit (700 with defaults) + let tracker = Arc::new(QueryMemoryTracker::new(1000, 0)); + + let permit1 = tracker.register_permit(); + assert!(permit1.is_privileged()); + + // Privileged can use up to full limit (1000) + assert!(permit1.track(900, 0).is_ok()); + assert_eq!(tracker.current(), 900); + + // Privileged cannot exceed hard limit (1000) + assert!(permit1.track(200, 900).is_err()); + assert_eq!(tracker.current(), 900); + + // Can add within hard limit + assert!(permit1.track(100, 900).is_ok()); + assert_eq!(tracker.current(), 1000); + + // Cannot exceed even by 1 byte + assert!(permit1.track(1, 1000).is_err()); + assert_eq!(tracker.current(), 1000); + + permit1.release(1000); + assert_eq!(tracker.current(), 0); + } + + #[test] + fn test_query_memory_tracker_standard_tier_fraction() { + // Test standard-tier streams use fraction of limit + // Limit: 1000, default fraction: 0.7, so standard-tier can use 700 + let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 1)); + + let permit1 = tracker.register_permit(); + assert!(permit1.is_privileged()); + + let permit2 = tracker.register_permit(); + assert!(!permit2.is_privileged()); + + // Standard-tier can use up to 700 (1000 * 0.7 default) + assert!(permit2.track(600, 0).is_ok()); + assert_eq!(tracker.current(), 600); + + // Cannot exceed standard-tier limit (700) + assert!(permit2.track(200, 600).is_err()); + assert_eq!(tracker.current(), 600); + + // Can add within standard-tier limit + assert!(permit2.track(100, 600).is_ok()); + assert_eq!(tracker.current(), 700); + + // Cannot exceed standard-tier limit + assert!(permit2.track(1, 700).is_err()); + assert_eq!(tracker.current(), 700); + + permit2.release(700); + assert_eq!(tracker.current(), 0); + } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 50d0ef4076..6b370c7eb6 100644 --- 
a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -522,6 +522,7 @@ impl DatanodeBuilder { file_ref_manager, partition_expr_fetcher.clone(), plugins, + opts.max_concurrent_queries, ); #[cfg(feature = "enterprise")] @@ -564,6 +565,7 @@ impl DatanodeBuilder { file_ref_manager, partition_expr_fetcher, plugins, + opts.max_concurrent_queries, ); #[cfg(feature = "enterprise")] @@ -585,6 +587,7 @@ impl DatanodeBuilder { file_ref_manager, partition_expr_fetcher.clone(), plugins, + opts.max_concurrent_queries, ); #[cfg(feature = "enterprise")] diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 9721d49040..a8d2482faf 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; +use common_base::memory_limit::MemoryLimit; use common_config::Configurable; use common_error::ext::BoxedError; use common_meta::key::TableMetadataManagerRef; @@ -132,6 +133,7 @@ impl Default for FlownodeOptions { query: QueryOptions { parallelism: 1, allow_query_fallback: false, + memory_pool_size: MemoryLimit::default(), }, user_provider: None, memory: MemoryOptions::default(), diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index e76a8dbe19..6b787f070f 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -18,6 +18,7 @@ use std::cmp; use std::path::Path; use std::time::Duration; +use common_base::memory_limit::MemoryLimit; use common_base::readable_size::ReadableSize; use common_stat::{get_total_cpu_cores, get_total_memory_readable}; use common_telemetry::warn; @@ -128,6 +129,9 @@ pub struct MitoConfig { pub max_concurrent_scan_files: usize, /// Whether to allow stale entries read during replay. pub allow_stale_entries: bool, + /// Memory limit for table scans across all queries. Setting it to 0 disables the limit. + /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%"). + pub scan_memory_limit: MemoryLimit, /// Index configs. 
pub index: IndexConfig, @@ -182,6 +186,7 @@ impl Default for MitoConfig { parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES, allow_stale_entries: false, + scan_memory_limit: MemoryLimit::default(), index: IndexConfig::default(), inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c63c7cf99b..22d55b7a57 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -83,7 +83,8 @@ use async_trait::async_trait; use common_base::Plugins; use common_error::ext::BoxedError; use common_meta::key::SchemaMetadataManagerRef; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream}; +use common_stat::get_total_memory_bytes; use common_telemetry::{info, tracing, warn}; use common_wal::options::{WAL_OPTIONS_KEY, WalOptions}; use futures::future::{join_all, try_join_all}; @@ -122,7 +123,9 @@ use crate::extension::BoxedExtensionRangeProviderFactory; use crate::gc::GcLimiterRef; use crate::manifest::action::RegionEdit; use crate::memtable::MemtableStats; -use crate::metrics::HANDLE_REQUEST_ELAPSED; +use crate::metrics::{ + HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL, +}; use crate::read::scan_region::{ScanRegion, Scanner}; use crate::read::stream::ScanBatchStream; use crate::region::MitoRegionRef; @@ -147,6 +150,7 @@ pub struct MitoEngineBuilder<'a, S: LogStore> { file_ref_manager: FileReferenceManagerRef, partition_expr_fetcher: PartitionExprFetcherRef, plugins: Plugins, + max_concurrent_queries: usize, #[cfg(feature = "enterprise")] extension_range_provider_factory: Option, } @@ -162,6 +166,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { file_ref_manager: FileReferenceManagerRef, partition_expr_fetcher: PartitionExprFetcherRef, plugins: Plugins, + max_concurrent_queries: usize, ) -> Self { Self { data_home, @@ -172,6 +177,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { file_ref_manager, plugins, partition_expr_fetcher, + max_concurrent_queries, #[cfg(feature = "enterprise")] extension_range_provider_factory: None, } @@ -204,10 +210,22 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> { ) .await?; let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store)); + let total_memory = get_total_memory_bytes().max(0) as u64; + let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize; + let scan_memory_tracker = + QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries) + .with_on_update(|usage| { + SCAN_MEMORY_USAGE_BYTES.set(usage as i64); + }) + .with_on_reject(|| { + SCAN_REQUESTS_REJECTED_TOTAL.inc(); + }); + let inner = EngineInner { workers, config, wal_raw_entry_reader, + scan_memory_tracker, #[cfg(feature = "enterprise")] extension_range_provider_factory: None, }; @@ -250,6 +268,7 @@ impl MitoEngine { file_ref_manager, partition_expr_fetcher, plugins, + 0, // Default: no limit on concurrent queries ); builder.try_build().await } @@ -614,6 +633,8 @@ struct EngineInner { config: Arc, /// The Wal raw entry reader. wal_raw_entry_reader: Arc, + /// Memory tracker for table scans. 
+ scan_memory_tracker: QueryMemoryTracker, #[cfg(feature = "enterprise")] extension_range_provider_factory: Option, } @@ -1105,6 +1126,10 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } + fn register_query_memory_permit(&self) -> Option> { + Some(Arc::new(self.inner.scan_memory_tracker.register_permit())) + } + async fn get_committed_sequence( &self, region_id: RegionId, @@ -1249,6 +1274,15 @@ impl MitoEngine { let config = Arc::new(config); let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone())); + let total_memory = get_total_memory_bytes().max(0) as u64; + let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize; + let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0) + .with_on_update(|usage| { + SCAN_MEMORY_USAGE_BYTES.set(usage as i64); + }) + .with_on_reject(|| { + SCAN_REQUESTS_REJECTED_TOTAL.inc(); + }); Ok(MitoEngine { inner: Arc::new(EngineInner { workers: WorkerGroup::start_for_test( @@ -1265,6 +1299,7 @@ impl MitoEngine { .await?, config, wal_raw_entry_reader, + scan_memory_tracker, #[cfg(feature = "enterprise")] extension_range_provider_factory: None, }), diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index fd8110b526..dcf8752dcd 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -36,6 +36,7 @@ pub const STAGING_TYPE: &str = "index_staging"; /// Recycle bin type label. pub const RECYCLE_TYPE: &str = "recycle_bin"; +// Write metrics. lazy_static! { /// Global write buffer size in bytes. pub static ref WRITE_BUFFER_BYTES: IntGauge = @@ -114,10 +115,10 @@ lazy_static! { &[TYPE_LABEL] ) .unwrap(); - // ------ End of write related metrics +} - - // Compaction metrics +// Compaction metrics. +lazy_static! { /// Timer of different stages in compaction. /// - pick /// - merge (in parallel) @@ -156,8 +157,10 @@ lazy_static! { "greptime_mito_inflight_compaction_count", "inflight compaction count", ).unwrap(); +} - // Query metrics. +// Query metrics. +lazy_static! { /// Timer of different stages in query. pub static ref READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_mito_read_stage_elapsed", @@ -207,9 +210,20 @@ lazy_static! { "Number of rows returned in a scan task", exponential_buckets(100.0, 10.0, 7).unwrap(), ).unwrap(); - // ------- End of query metrics. + /// Gauge for scan memory usage in bytes. + pub static ref SCAN_MEMORY_USAGE_BYTES: IntGauge = register_int_gauge!( + "greptime_mito_scan_memory_usage_bytes", + "current scan memory usage in bytes" + ).unwrap(); + /// Counter of rejected scan requests due to memory limit. + pub static ref SCAN_REQUESTS_REJECTED_TOTAL: IntCounter = register_int_counter!( + "greptime_mito_scan_requests_rejected_total", + "total number of scan requests rejected due to memory limit" + ).unwrap(); +} - // Cache related metrics. +// Cache metrics. +lazy_static! { /// Cache hit counter. pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!( "greptime_mito_cache_hit", @@ -261,8 +275,10 @@ lazy_static! { "mito cache eviction", &[TYPE_LABEL, CACHE_EVICTION_CAUSE] ).unwrap(); - // ------- End of cache metrics. +} +// Index metrics. +lazy_static! { // Index metrics. /// Timer of index application. pub static ref INDEX_APPLY_ELAPSED: HistogramVec = register_histogram_vec!( @@ -359,8 +375,9 @@ lazy_static! { /// Counter of flush operations on intermediate files. 
pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL .with_label_values(&["flush", "intermediate"]); - // ------- End of index metrics. +} +lazy_static! { /// Partition tree memtable data buffer freeze metrics pub static ref PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_partition_tree_buffer_freeze_stage_elapsed", @@ -405,7 +422,6 @@ lazy_static! { } -// Use another block to avoid reaching the recursion limit. lazy_static! { /// Counter for compaction input file size. pub static ref COMPACTION_INPUT_BYTES: Counter = register_counter!( diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 344d7bd5fc..89a356e9a1 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -33,6 +33,7 @@ common-plugins.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true +common-stat.workspace = true common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 798ae52549..907b5e8c99 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -185,7 +185,8 @@ impl TableProvider for DummyTableProvider { .handle_query(self.region_id, request.clone()) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; - let mut scan_exec = RegionScanExec::new(scanner, request)?; + let query_memory_permit = self.engine.register_query_memory_permit(); + let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?; if let Some(query_ctx) = &self.query_ctx { scan_exec.set_explain_verbose(query_ctx.explain_verbose()); } diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index 290f368a8f..e0d02e9a3d 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -62,6 +62,18 @@ lazy_static! { "query push down fallback errors total" ) .unwrap(); + + pub static ref QUERY_MEMORY_POOL_USAGE_BYTES: IntGauge = register_int_gauge!( + "greptime_query_memory_pool_usage_bytes", + "current query memory pool usage in bytes" + ) + .unwrap(); + + pub static ref QUERY_MEMORY_POOL_REJECTED_TOTAL: IntCounter = register_int_counter!( + "greptime_query_memory_pool_rejected_total", + "total number of query memory allocations rejected" + ) + .unwrap(); } /// A stream to call the callback once a RecordBatch stream is done. diff --git a/src/query/src/options.rs b/src/query/src/options.rs index 25e1a0a2a0..50ca1177a5 100644 --- a/src/query/src/options.rs +++ b/src/query/src/options.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_base::memory_limit::MemoryLimit; use serde::{Deserialize, Serialize}; /// Query engine config @@ -22,6 +23,10 @@ pub struct QueryOptions { pub parallelism: usize, /// Whether to allow query fallback when push down fails. pub allow_query_fallback: bool, + /// Memory pool size for query execution. Setting it to 0 disables the limit (unbounded). + /// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%"). + /// When this limit is reached, queries will fail with ResourceExhausted error. 
+    pub memory_pool_size: MemoryLimit,
 }
 
 #[allow(clippy::derivable_impls)]
@@ -30,6 +35,7 @@ impl Default for QueryOptions {
         Self {
             parallelism: 0,
             allow_query_fallback: false,
+            memory_pool_size: MemoryLimit::default(),
         }
     }
 }
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index 7b53f385e8..ffe64e7005 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -25,13 +25,17 @@ use common_function::handlers::{
     FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
 };
 use common_function::state::FunctionState;
+use common_stat::get_total_memory_bytes;
 use common_telemetry::warn;
 use datafusion::catalog::TableFunction;
 use datafusion::dataframe::DataFrame;
 use datafusion::error::Result as DfResult;
 use datafusion::execution::SessionStateBuilder;
 use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
-use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::execution::memory_pool::{
+    GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
+};
+use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
 use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
@@ -49,6 +53,7 @@ use crate::QueryEngineContext;
 use crate::dist_plan::{
     DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
 };
+use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
 use crate::optimizer::ExtensionAnalyzerRule;
 use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
 use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
@@ -100,7 +105,18 @@ impl QueryEngineState {
         plugins: Plugins,
         options: QueryOptionsNew,
     ) -> Self {
-        let runtime_env = Arc::new(RuntimeEnv::default());
+        let total_memory = get_total_memory_bytes().max(0) as u64;
+        let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
+        let runtime_env = if memory_pool_size > 0 {
+            Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
+                    .build()
+                    .expect("Failed to build RuntimeEnv"),
+            )
+        } else {
+            Arc::new(RuntimeEnv::default())
+        };
         let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
         if options.parallelism > 0 {
             session_config = session_config.with_target_partitions(options.parallelism);
@@ -420,3 +436,66 @@ impl DfQueryPlanner {
         }
     }
 }
+
+/// A wrapper around GreedyMemoryPool that records metrics.
+///
+/// This wrapper intercepts all memory pool operations and updates
+/// Prometheus metrics for monitoring query memory usage and rejections.
+#[derive(Debug)]
+struct MetricsMemoryPool {
+    inner: Arc<GreedyMemoryPool>,
+}
+
+impl MetricsMemoryPool {
+    fn new(limit: usize) -> Self {
+        Self {
+            inner: Arc::new(GreedyMemoryPool::new(limit)),
+        }
+    }
+
+    #[inline]
+    fn update_metrics(&self) {
+        QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
+    }
+}
+
+impl MemoryPool for MetricsMemoryPool {
+    fn register(&self, consumer: &MemoryConsumer) {
+        self.inner.register(consumer);
+    }
+
+    fn unregister(&self, consumer: &MemoryConsumer) {
+        self.inner.unregister(consumer);
+    }
+
+    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+        self.inner.grow(reservation, additional);
+        self.update_metrics();
+    }
+
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+        self.inner.shrink(reservation, shrink);
+        self.update_metrics();
+    }
+
+    fn try_grow(
+        &self,
+        reservation: &MemoryReservation,
+        additional: usize,
+    ) -> datafusion_common::Result<()> {
+        let result = self.inner.try_grow(reservation, additional);
+        if result.is_err() {
+            QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
+        }
+        self.update_metrics();
+        result
+    }
+
+    fn reserved(&self) -> usize {
+        self.inner.reserved()
+    }
+
+    fn memory_limit(&self) -> MemoryLimit {
+        self.inner.memory_limit()
+    }
+}
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index 80254fd39c..1a19f68551 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -23,7 +23,7 @@ use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole
 use api::region::RegionResponse;
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
-use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
+use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream};
 use common_time::Timestamp;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
@@ -783,6 +783,11 @@ pub trait RegionEngine: Send + Sync {
         request: ScanRequest,
     ) -> Result;
 
+    /// Registers and returns a query memory permit.
+    fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
+        None
+    }
+
     /// Retrieves region's metadata.
     async fn get_metadata(&self, region_id: RegionId) -> Result;
 
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 1dc3982ed2..c67f5431e8 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -19,7 +19,10 @@ use std::task::{Context, Poll};
 use std::time::Instant;
 
 use common_error::ext::BoxedError;
-use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
+use common_recordbatch::{
+    DfRecordBatch, DfSendableRecordBatchStream, MemoryPermit, MemoryTrackedStream,
+    SendableRecordBatchStream,
+};
 use common_telemetry::tracing::Span;
 use common_telemetry::tracing_context::TracingContext;
 use common_telemetry::warn;
@@ -49,7 +52,6 @@ use store_api::storage::{ScanRequest, TimeSeriesDistribution};
 use crate::table::metrics::StreamMetrics;
 
 /// A plan to read multiple partitions from a region of a table.
-#[derive(Debug)]
 pub struct RegionScanExec {
     scanner: Arc>,
     arrow_schema: ArrowSchemaRef,
@@ -63,10 +65,32 @@ pub struct RegionScanExec {
     // TODO(ruihang): handle TimeWindowed dist via this parameter
     distribution: Option<TimeSeriesDistribution>,
     explain_verbose: bool,
+    query_memory_permit: Option<Arc<MemoryPermit>>,
+}
+
+impl std::fmt::Debug for RegionScanExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RegionScanExec")
+            .field("scanner", &self.scanner)
+            .field("arrow_schema", &self.arrow_schema)
+            .field("output_ordering", &self.output_ordering)
+            .field("metric", &self.metric)
+            .field("properties", &self.properties)
+            .field("append_mode", &self.append_mode)
+            .field("total_rows", &self.total_rows)
+            .field("is_partition_set", &self.is_partition_set)
+            .field("distribution", &self.distribution)
+            .field("explain_verbose", &self.explain_verbose)
+            .finish()
+    }
 }
 
 impl RegionScanExec {
-    pub fn new(scanner: RegionScannerRef, request: ScanRequest) -> DfResult<Self> {
+    pub fn new(
+        scanner: RegionScannerRef,
+        request: ScanRequest,
+        query_memory_permit: Option<Arc<MemoryPermit>>,
+    ) -> DfResult<Self> {
         let arrow_schema = scanner.schema().arrow_schema().clone();
         let scanner_props = scanner.properties();
         let mut num_output_partition = scanner_props.num_partitions();
@@ -170,6 +194,7 @@ impl RegionScanExec {
             is_partition_set: false,
             distribution: request.distribution,
             explain_verbose: false,
+            query_memory_permit,
         })
     }
 
@@ -237,6 +262,7 @@ impl RegionScanExec {
             is_partition_set: true,
             distribution: self.distribution,
             explain_verbose: self.explain_verbose,
+            query_memory_permit: self.query_memory_permit.clone(),
         })
     }
 
@@ -320,6 +346,13 @@ impl ExecutionPlan for RegionScanExec {
             .unwrap()
             .scan_partition(&ctx, &self.metric, partition)
             .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let stream = if let Some(permit) = &self.query_memory_permit {
+            Box::pin(MemoryTrackedStream::new(stream, permit.clone()))
+        } else {
+            stream
+        };
+
         let stream_metrics = StreamMetrics::new(&self.metric, partition);
         Ok(Box::pin(StreamWithMetricWrapper {
             stream,
@@ -511,7 +544,7 @@ mod test {
         let region_metadata = Arc::new(builder.build().unwrap());
 
         let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
-        let plan = RegionScanExec::new(scanner, ScanRequest::default()).unwrap();
+        let plan = RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap();
         let actual: SchemaRef = Arc::new(
             plan.properties
                 .eq_properties
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index b4d437cd99..b90541f6da 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1584,6 +1584,8 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
         "enable_virtual_host_style =",
         "cache_path =",
         "cache_capacity =",
+        "memory_pool_size =",
+        "scan_memory_limit =",
         "sas_token =",
         "scope =",
         "num_workers =",
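
For reference, a minimal sketch of how a percentage-or-absolute limit can resolve against total system memory, in the spirit of the `resolve(total_memory)` calls above. The enum name, variants, and rounding here are assumptions made for illustration; the actual type is `common_base::memory_limit::MemoryLimit` from src/common/base/src/memory_limit.rs, which is added by this patch but not shown here and may differ in detail.

// Illustrative sketch only; not the implementation added by this patch.
#[derive(Debug, Clone, Copy, Default)]
enum MemoryLimitSketch {
    /// "0" in the config: no limit at all.
    #[default]
    Unlimited,
    /// Absolute size, e.g. "2GB" parsed into bytes.
    Bytes(u64),
    /// Relative size, e.g. "50%" of total system memory.
    Percentage(u8),
}

impl MemoryLimitSketch {
    /// Turns the configured limit into a concrete byte count, mirroring how
    /// the patch calls `resolve(total_memory)` before sizing the query memory
    /// pool and the scan memory tracker.
    fn resolve(&self, total_memory_bytes: u64) -> u64 {
        match self {
            // Callers treat 0 as "unbounded".
            Self::Unlimited => 0,
            Self::Bytes(n) => *n,
            Self::Percentage(p) => total_memory_bytes * u64::from(*p) / 100,
        }
    }
}

fn main() {
    let total = 8 * 1024 * 1024 * 1024u64; // pretend the host has 8 GiB
    assert_eq!(MemoryLimitSketch::Percentage(50).resolve(total), total / 2);
    assert_eq!(MemoryLimitSketch::Unlimited.resolve(total), 0);
}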
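
And a small, self-contained sketch of the pool behavior that MetricsMemoryPool builds on: DataFusion's GreedyMemoryPool hands out reservations until the configured limit is exhausted, after which `try_grow` returns an error. That is the point where the wrapper above increments QUERY_MEMORY_POOL_REJECTED_TOTAL, and `reserved()` is the value it exports as the usage gauge. The consumer name and byte counts below are arbitrary illustration values, not taken from the patch.

use std::sync::Arc;

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A 1 MiB pool, analogous to `memory_pool_size` after `resolve()`.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

    // Each operator registers a consumer and grows its reservation as it buffers data.
    let mut reservation = MemoryConsumer::new("example_operator").register(&pool);
    assert!(reservation.try_grow(512 * 1024).is_ok());

    // Exceeding the limit makes `try_grow` return an error; the wrapper in this
    // patch counts such rejections.
    assert!(reservation.try_grow(2 * 1024 * 1024).is_err());

    // Usage stays at the successfully reserved amount; the wrapper reports this
    // value from `reserved()` as the usage gauge.
    assert_eq!(pool.reserved(), 512 * 1024);
}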