feat: query mem limiter (#7078)

* feat: query mem limiter

* feat: config docs

* feat: frontend query limit config

* fix: unused imports

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: add metrics for query memory tracker

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: right position for tracker

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: avoid race condition

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: soft and hard limit

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: docs

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: when soft_limit == 0

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: upgrade limit algorithm

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: remove batch window

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: batch mem size

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: refine limit algorithm

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: get sys mem

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: minor change

* feat: move tracker up to the top stream

* feat: estimated_size for batch

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: minor refactor

* feat: connect scan_memory_limit to max_concurrent_queries

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: make callback clear

* feat: add unlimited enum

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: address review comments

* chore: comment on recursion_limit

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: refactor and put permit into RegionScanExec

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: multiple lazy static blocks

* chore: minor change

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
jeremyhi
2025-11-11 15:47:55 +08:00
committed by GitHub
parent afa8684ebd
commit c7fded29ee
24 changed files with 1118 additions and 22 deletions

Cargo.lock (generated)

@@ -10127,6 +10127,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-runtime",
"common-stat",
"common-telemetry",
"common-time",
"datafusion",


@@ -16,7 +16,7 @@
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited.<br/>NOTE: This setting affects scan_memory_limit's privileged tier allocation.<br/>When set, 70% of queries get privileged memory access (full scan_memory_limit).<br/>The remaining 30% get standard tier access (70% of scan_memory_limit). |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
@@ -104,6 +104,7 @@
| `flow.num_workers` | Integer | `0` | The number of flow workers in the flownode.<br/>If not set (or set to 0), the number of CPU cores divided by 2 is used. |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join).<br/>Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit (unbounded, default behavior).<br/>When this limit is reached, queries will fail with ResourceExhausted error.<br/>NOTE: This does NOT limit memory used by table scans. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
@@ -155,6 +156,7 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.<br/>Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit.<br/>NOTE: Works with max_concurrent_queries for tiered memory allocation.<br/>- If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.<br/>- If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -308,6 +310,7 @@
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `query.allow_query_fallback` | Bool | `false` | Whether to allow query fallback when push-down optimization fails.<br/>Defaults to false, meaning an error message is returned when push-down optimization fails. |
| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join).<br/>Supports absolute size (e.g., "4GB", "8GB") or percentage of system memory (e.g., "30%").<br/>Setting it to 0 disables the limit (unbounded, default behavior).<br/>When this limit is reached, queries will fail with ResourceExhausted error.<br/>NOTE: This does NOT limit memory used by table scans (only applies to datanodes). |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |
| `datanode.client.connect_timeout` | String | `10s` | -- |
@@ -446,7 +449,7 @@
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited.<br/>NOTE: This setting affects scan_memory_limit's privileged tier allocation.<br/>When set, 70% of queries get privileged memory access (full scan_memory_limit).<br/>The remaining 30% get standard tier access (70% of scan_memory_limit). |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
@@ -500,6 +503,7 @@
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `query` | -- | -- | The query engine options. |
| `query.parallelism` | Integer | `0` | Parallelism of the query engine.<br/>Default to 0, which means the number of CPU cores. |
| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join).<br/>Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit (unbounded, default behavior).<br/>When this limit is reached, queries will fail with ResourceExhausted error.<br/>NOTE: This does NOT limit memory used by table scans. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
@@ -553,6 +557,7 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.<br/>Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit.<br/>NOTE: Works with max_concurrent_queries for tiered memory allocation.<br/>- If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.<br/>- If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -673,5 +678,6 @@
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
| `query` | -- | -- | -- |
| `query.parallelism` | Integer | `1` | Parallelism of the query engine for query sent by flownode.<br/>Default to 1, so it won't use too much cpu or memory |
| `query.memory_pool_size` | String | `50%` | Memory pool size for query execution operators (aggregation, sorting, join).<br/>Supports absolute size (e.g., "1GB", "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit (unbounded, default behavior).<br/>When this limit is reached, queries will fail with ResourceExhausted error.<br/>NOTE: This does NOT limit memory used by table scans. |
| `memory` | -- | -- | The memory options. |
| `memory.enable_heap_profiling` | Bool | `true` | Whether to enable heap profiling activation during startup.<br/>When enabled, heap profiling will be activated if the `MALLOC_CONF` environment variable<br/>is set to "prof:true,prof_active:false". The official image adds this env variable.<br/>Default is true. |
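
The tiered rule documented above can be worked through with a minimal, illustrative sketch (not part of this commit): the node size and settings below are assumptions, and the constants simply mirror the documented defaults (70% of `max_concurrent_queries` get privileged slots, standard-tier streams are capped at 70% of `scan_memory_limit`).

```rust
// Minimal sketch of the documented tiering rule; all numbers are illustrative.
fn tier_budgets(total_memory: u64, scan_limit_pct: u64, max_concurrent_queries: usize) -> (usize, u64, u64) {
    // scan_memory_limit resolved from a percentage of system memory.
    let scan_limit = total_memory * scan_limit_pct / 100;
    // Privileged slots: 70% of max_concurrent_queries, or 20 when concurrency is unlimited (0).
    let privileged_slots = if max_concurrent_queries == 0 {
        20
    } else {
        ((max_concurrent_queries as f64 * 0.7) as usize).max(1)
    };
    // Standard-tier streams may only push global scan usage to 70% of the limit.
    let standard_tier_limit = (scan_limit as f64 * 0.7) as u64;
    (privileged_slots, scan_limit, standard_tier_limit)
}

fn main() {
    // Example: 32 GiB node, scan_memory_limit = "50%", max_concurrent_queries = 100.
    let (slots, full, standard) = tier_budgets(32 * 1024 * 1024 * 1024, 50, 100);
    println!("privileged slots: {slots}");                   // 70
    println!("privileged scan budget: {full} bytes");         // 17179869184 (16 GiB)
    println!("standard-tier scan budget: {standard} bytes");  // roughly 11.2 GiB
}
```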


@@ -18,6 +18,9 @@ init_regions_in_background = false
init_regions_parallelism = 16
## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
## NOTE: This setting affects scan_memory_limit's privileged tier allocation.
## When set, 70% of queries get privileged memory access (full scan_memory_limit).
## The remaining 30% get standard tier access (70% of scan_memory_limit).
max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
@@ -261,6 +264,13 @@ overwrite_entry_start_id = false
## Default to 0, which means the number of CPU cores.
parallelism = 0
## Memory pool size for query execution operators (aggregation, sorting, join).
## Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit (unbounded, default behavior).
## When this limit is reached, queries will fail with ResourceExhausted error.
## NOTE: This does NOT limit memory used by table scans.
memory_pool_size = "50%"
## The data storage options.
[storage]
## The working home directory.
@@ -501,6 +511,14 @@ max_concurrent_scan_files = 384
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
## Memory limit for table scans across all queries.
## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit.
## NOTE: Works with max_concurrent_queries for tiered memory allocation.
## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.
## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access.
scan_memory_limit = "50%"
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"


@@ -158,6 +158,13 @@ default_ratio = 1.0
## Default to 1, so it won't use too much cpu or memory
parallelism = 1
## Memory pool size for query execution operators (aggregation, sorting, join).
## Supports absolute size (e.g., "1GB", "2GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit (unbounded, default behavior).
## When this limit is reached, queries will fail with ResourceExhausted error.
## NOTE: This does NOT limit memory used by table scans.
memory_pool_size = "50%"
## The memory options.
[memory]
## Whether to enable heap profiling activation during startup.


@@ -256,6 +256,13 @@ parallelism = 0
## Defaults to false, meaning an error message is returned when push-down optimization fails.
allow_query_fallback = false
## Memory pool size for query execution operators (aggregation, sorting, join).
## Supports absolute size (e.g., "4GB", "8GB") or percentage of system memory (e.g., "30%").
## Setting it to 0 disables the limit (unbounded, default behavior).
## When this limit is reached, queries will fail with ResourceExhausted error.
## NOTE: This does NOT limit memory used by table scans (only applies to datanodes).
memory_pool_size = "50%"
## Datanode options.
[datanode]
## Datanode client options.


@@ -14,6 +14,9 @@ init_regions_in_background = false
init_regions_parallelism = 16
## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
## NOTE: This setting affects scan_memory_limit's privileged tier allocation.
## When set, 70% of queries get privileged memory access (full scan_memory_limit).
## The remaining 30% get standard tier access (70% of scan_memory_limit).
max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
@@ -365,6 +368,13 @@ max_running_procedures = 128
## Default to 0, which means the number of CPU cores.
parallelism = 0
## Memory pool size for query execution operators (aggregation, sorting, join).
## Supports absolute size (e.g., "2GB", "4GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit (unbounded, default behavior).
## When this limit is reached, queries will fail with ResourceExhausted error.
## NOTE: This does NOT limit memory used by table scans.
memory_pool_size = "50%"
## The data storage options.
[storage]
## The working home directory.
@@ -592,6 +602,14 @@ max_concurrent_scan_files = 384
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
## Memory limit for table scans across all queries.
## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit.
## NOTE: Works with max_concurrent_queries for tiered memory allocation.
## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.
## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access.
scan_memory_limit = "50%"
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"


@@ -15,6 +15,7 @@
use std::time::Duration;
use cmd::options::GreptimeOptions;
use common_base::memory_limit::MemoryLimit;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions};
@@ -74,6 +75,7 @@ fn test_load_datanode_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
scan_memory_limit: MemoryLimit::Percentage(50),
..Default::default()
}),
RegionEngineConfig::File(FileEngineConfig {}),
@@ -82,6 +84,10 @@ fn test_load_datanode_example_config() {
flush_metadata_region_interval: Duration::from_secs(30),
}),
],
query: QueryOptions {
memory_pool_size: MemoryLimit::Percentage(50),
..Default::default()
},
logging: LoggingOptions {
level: Some("info".to_string()),
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
@@ -155,6 +161,10 @@ fn test_load_frontend_example_config() {
cors_allowed_origins: vec!["https://example.com".to_string()],
..Default::default()
},
query: QueryOptions {
memory_pool_size: MemoryLimit::Percentage(50),
..Default::default()
},
..Default::default()
},
..Default::default()
@@ -242,6 +252,7 @@ fn test_load_flownode_example_config() {
query: QueryOptions {
parallelism: 1,
allow_query_fallback: false,
memory_pool_size: MemoryLimit::Percentage(50),
},
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
@@ -286,6 +297,7 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
scan_memory_limit: MemoryLimit::Percentage(50),
..Default::default()
}),
RegionEngineConfig::File(FileEngineConfig {}),
@@ -314,7 +326,10 @@ fn test_load_standalone_example_config() {
cors_allowed_origins: vec!["https://example.com".to_string()],
..Default::default()
},
query: QueryOptions {
memory_pool_size: MemoryLimit::Percentage(50),
..Default::default()
},
..Default::default()
},
..Default::default()


@@ -15,6 +15,7 @@
pub mod bit_vec;
pub mod bytes;
pub mod cancellation;
pub mod memory_limit;
pub mod plugins;
pub mod range_read;
#[allow(clippy::all)]


@@ -0,0 +1,265 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Display};
use std::str::FromStr;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::readable_size::ReadableSize;
/// Memory limit configuration that supports both absolute size and percentage.
///
/// Examples:
/// - Absolute size: "2GB", "4GiB", "512MB"
/// - Percentage: "50%", "75%"
/// - Unlimited: "unlimited", "0"
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemoryLimit {
/// Absolute memory size.
Size(ReadableSize),
/// Percentage of total system memory (0-100).
Percentage(u8),
/// No memory limit.
#[default]
Unlimited,
}
impl MemoryLimit {
/// Resolve the memory limit to bytes based on total system memory.
/// Returns 0 if the limit is unlimited.
pub fn resolve(&self, total_memory_bytes: u64) -> u64 {
match self {
MemoryLimit::Size(size) => size.as_bytes(),
MemoryLimit::Percentage(pct) => total_memory_bytes * (*pct as u64) / 100,
MemoryLimit::Unlimited => 0,
}
}
/// Returns true if this limit is unlimited.
pub fn is_unlimited(&self) -> bool {
match self {
MemoryLimit::Size(size) => size.as_bytes() == 0,
MemoryLimit::Percentage(pct) => *pct == 0,
MemoryLimit::Unlimited => true,
}
}
}
impl FromStr for MemoryLimit {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.trim();
if s.eq_ignore_ascii_case("unlimited") {
return Ok(MemoryLimit::Unlimited);
}
if let Some(pct_str) = s.strip_suffix('%') {
let pct = pct_str
.trim()
.parse::<u8>()
.map_err(|e| format!("invalid percentage value '{}': {}", pct_str, e))?;
if pct > 100 {
return Err(format!("percentage must be between 0 and 100, got {}", pct));
}
if pct == 0 {
Ok(MemoryLimit::Unlimited)
} else {
Ok(MemoryLimit::Percentage(pct))
}
} else {
let size = ReadableSize::from_str(s)?;
if size.as_bytes() == 0 {
Ok(MemoryLimit::Unlimited)
} else {
Ok(MemoryLimit::Size(size))
}
}
}
}
impl Display for MemoryLimit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MemoryLimit::Size(size) => write!(f, "{}", size),
MemoryLimit::Percentage(pct) => write!(f, "{}%", pct),
MemoryLimit::Unlimited => write!(f, "unlimited"),
}
}
}
impl Serialize for MemoryLimit {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
impl<'de> Deserialize<'de> for MemoryLimit {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
MemoryLimit::from_str(&s).map_err(serde::de::Error::custom)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_absolute_size() {
assert_eq!(
"2GB".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Size(ReadableSize(2 * 1024 * 1024 * 1024))
);
assert_eq!(
"512MB".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Size(ReadableSize(512 * 1024 * 1024))
);
assert_eq!("0".parse::<MemoryLimit>().unwrap(), MemoryLimit::Unlimited);
}
#[test]
fn test_parse_percentage() {
assert_eq!(
"50%".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Percentage(50)
);
assert_eq!(
"75%".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Percentage(75)
);
assert_eq!("0%".parse::<MemoryLimit>().unwrap(), MemoryLimit::Unlimited);
}
#[test]
fn test_parse_invalid() {
assert!("150%".parse::<MemoryLimit>().is_err());
assert!("-10%".parse::<MemoryLimit>().is_err());
assert!("invalid".parse::<MemoryLimit>().is_err());
}
#[test]
fn test_resolve() {
let total = 8 * 1024 * 1024 * 1024; // 8GB
assert_eq!(
MemoryLimit::Size(ReadableSize(2 * 1024 * 1024 * 1024)).resolve(total),
2 * 1024 * 1024 * 1024
);
assert_eq!(
MemoryLimit::Percentage(50).resolve(total),
4 * 1024 * 1024 * 1024
);
assert_eq!(MemoryLimit::Unlimited.resolve(total), 0);
}
#[test]
fn test_is_unlimited() {
assert!(MemoryLimit::Unlimited.is_unlimited());
assert!(!MemoryLimit::Size(ReadableSize(1024)).is_unlimited());
assert!(!MemoryLimit::Percentage(50).is_unlimited());
assert!(!MemoryLimit::Percentage(1).is_unlimited());
// Defensive: these states shouldn't exist via public API, but check anyway
assert!(MemoryLimit::Size(ReadableSize(0)).is_unlimited());
assert!(MemoryLimit::Percentage(0).is_unlimited());
}
#[test]
fn test_parse_100_percent() {
assert_eq!(
"100%".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Percentage(100)
);
}
#[test]
fn test_display_percentage() {
assert_eq!(MemoryLimit::Percentage(20).to_string(), "20%");
assert_eq!(MemoryLimit::Percentage(50).to_string(), "50%");
assert_eq!(MemoryLimit::Percentage(100).to_string(), "100%");
}
#[test]
fn test_parse_unlimited() {
assert_eq!(
"unlimited".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Unlimited
);
assert_eq!(
"UNLIMITED".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Unlimited
);
assert_eq!(
"Unlimited".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Unlimited
);
}
#[test]
fn test_display_unlimited() {
assert_eq!(MemoryLimit::Unlimited.to_string(), "unlimited");
}
#[test]
fn test_parse_display_roundtrip() {
let cases = vec![
"50%",
"100%",
"1%",
"2GB",
"512MB",
"unlimited",
"UNLIMITED",
"0", // normalized to unlimited
"0%", // normalized to unlimited
];
for input in cases {
let parsed = input.parse::<MemoryLimit>().unwrap();
let displayed = parsed.to_string();
let reparsed = displayed.parse::<MemoryLimit>().unwrap();
assert_eq!(
parsed, reparsed,
"round-trip failed: '{}' -> '{}' -> '{:?}'",
input, displayed, reparsed
);
}
}
#[test]
fn test_zero_normalization() {
// All forms of zero should normalize to Unlimited
assert_eq!("0".parse::<MemoryLimit>().unwrap(), MemoryLimit::Unlimited);
assert_eq!("0%".parse::<MemoryLimit>().unwrap(), MemoryLimit::Unlimited);
assert_eq!("0B".parse::<MemoryLimit>().unwrap(), MemoryLimit::Unlimited);
assert_eq!(
"0KB".parse::<MemoryLimit>().unwrap(),
MemoryLimit::Unlimited
);
// Unlimited always displays as "unlimited"
assert_eq!(MemoryLimit::Unlimited.to_string(), "unlimited");
}
}
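
As a quick usage sketch (not part of the patch), `MemoryLimit` round-trips through serde, so it can sit directly in a TOML-backed config struct; the `toml` dependency and the exact crate path are assumptions based on the diff above.

```rust
// Sketch only: assumes the `common_base::memory_limit` path added above and a `toml` dependency.
use std::str::FromStr;

use common_base::memory_limit::MemoryLimit;
use serde::Deserialize;

#[derive(Deserialize)]
struct ScanConfig {
    scan_memory_limit: MemoryLimit,
}

fn main() {
    // Percentage form, matching the "50%" default used in the config files.
    let cfg: ScanConfig = toml::from_str(r#"scan_memory_limit = "50%""#).unwrap();
    let total: u64 = 8 * 1024 * 1024 * 1024; // pretend the node has 8 GiB
    assert_eq!(cfg.scan_memory_limit.resolve(total), 4 * 1024 * 1024 * 1024);

    // "0" and "0%" normalize to Unlimited, which resolves to 0 (no limit).
    let unlimited = MemoryLimit::from_str("0").unwrap();
    assert!(unlimited.is_unlimited());
}
```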


@@ -193,6 +193,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Exceeded memory limit: {}", msg))]
ExceedMemoryLimit {
msg: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -229,6 +236,8 @@ impl ErrorExt for Error {
Error::StreamTimeout { .. } => StatusCode::Cancelled,
Error::StreamCancelled { .. } => StatusCode::Cancelled,
Error::ExceedMemoryLimit { .. } => StatusCode::RuntimeResourcesExhausted,
}
}


@@ -21,11 +21,14 @@ pub mod filter;
mod recordbatch;
pub mod util;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use common_base::readable_size::ReadableSize;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
@@ -406,6 +409,399 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStream
}
}
/// Memory permit for a stream, providing privileged access or rate limiting.
///
/// The permit tracks whether this stream currently holds privileged status.
/// When dropped, it automatically releases any privileged slot it holds.
pub struct MemoryPermit {
tracker: QueryMemoryTracker,
is_privileged: AtomicBool,
}
impl MemoryPermit {
/// Check if this permit currently has privileged status.
pub fn is_privileged(&self) -> bool {
self.is_privileged.load(Ordering::Acquire)
}
/// Ensure this permit has privileged status by acquiring a slot if available.
/// Returns true if privileged (either already privileged or just acquired privilege).
fn ensure_privileged(&self) -> bool {
if self.is_privileged.load(Ordering::Acquire) {
return true;
}
// Try to claim a privileged slot
self.tracker
.privileged_count
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
if count < self.tracker.privileged_slots {
Some(count + 1)
} else {
None
}
})
.map(|_| {
self.is_privileged.store(true, Ordering::Release);
true
})
.unwrap_or(false)
}
/// Track additional memory usage with this permit.
/// Returns error if limit is exceeded.
///
/// # Arguments
/// * `additional` - Additional memory size to track in bytes
/// * `stream_tracked` - Total memory already tracked by this stream
///
/// # Behavior
/// - Privileged streams: can push global memory usage up to the full limit
/// - Standard-tier streams: can push global memory usage up to limit * standard_tier_memory_fraction (default: 0.7)
/// - Standard-tier streams automatically attempt to acquire privilege if slots become available
/// - The configured limit is an absolute hard limit - no stream can exceed it
pub fn track(&self, additional: usize, stream_tracked: usize) -> Result<()> {
// Ensure privileged status if possible
let is_privileged = self.ensure_privileged();
self.tracker
.track_internal(additional, is_privileged, stream_tracked)
}
/// Release tracked memory.
///
/// # Arguments
/// * `amount` - Amount of memory to release in bytes
pub fn release(&self, amount: usize) {
self.tracker.release(amount);
}
}
impl Drop for MemoryPermit {
fn drop(&mut self) {
// Release privileged slot if we had one
if self.is_privileged.load(Ordering::Acquire) {
self.tracker
.privileged_count
.fetch_sub(1, Ordering::Release);
}
}
}
/// Memory tracker for RecordBatch streams. Clone to share the same limit across queries.
///
/// Implements a two-tier memory allocation strategy:
/// - **Privileged tier**: First N streams (default: 20) can use up to the full memory limit
/// - **Standard tier**: Remaining streams are restricted to a fraction of the limit (default: 70%)
/// - Privilege is granted on a first-come-first-served basis
/// - The configured limit is an absolute hard cap - no stream can exceed it
#[derive(Clone)]
pub struct QueryMemoryTracker {
current: Arc<AtomicUsize>,
limit: usize,
standard_tier_memory_fraction: f64,
privileged_count: Arc<AtomicUsize>,
privileged_slots: usize,
on_update: Option<Arc<dyn Fn(usize) + Send + Sync>>,
on_reject: Option<Arc<dyn Fn() + Send + Sync>>,
}
impl fmt::Debug for QueryMemoryTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueryMemoryTracker")
.field("current", &self.current.load(Ordering::Acquire))
.field("limit", &self.limit)
.field(
"standard_tier_memory_fraction",
&self.standard_tier_memory_fraction,
)
.field(
"privileged_count",
&self.privileged_count.load(Ordering::Acquire),
)
.field("privileged_slots", &self.privileged_slots)
.field("on_update", &self.on_update.is_some())
.field("on_reject", &self.on_reject.is_some())
.finish()
}
}
impl QueryMemoryTracker {
// Default privileged slots when max_concurrent_queries is 0.
const DEFAULT_PRIVILEGED_SLOTS: usize = 20;
// Ratio for the tiers: 70% of queries get privileged access, and the standard tier uses 70% of the memory limit.
const DEFAULT_PRIVILEGED_TIER_RATIO: f64 = 0.7;
/// Create a new memory tracker with the given limit and max_concurrent_queries.
/// Calculates privileged slots as 70% of max_concurrent_queries (or 20 if max_concurrent_queries is 0).
///
/// # Arguments
/// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
/// * `max_concurrent_queries` - Maximum number of concurrent queries (0 = unlimited).
pub fn new(limit: usize, max_concurrent_queries: usize) -> Self {
let privileged_slots = Self::calculate_privileged_slots(max_concurrent_queries);
Self::with_privileged_slots(limit, privileged_slots)
}
/// Create a new memory tracker with custom privileged slots limit.
pub fn with_privileged_slots(limit: usize, privileged_slots: usize) -> Self {
Self::with_config(limit, privileged_slots, Self::DEFAULT_PRIVILEGED_TIER_RATIO)
}
/// Create a new memory tracker with full configuration.
///
/// # Arguments
/// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
/// * `privileged_slots` - Maximum number of streams that can get privileged status.
/// * `standard_tier_memory_fraction` - Memory fraction for standard-tier streams (range: [0.0, 1.0]).
///
/// # Panics
/// Panics if `standard_tier_memory_fraction` is not in the range [0.0, 1.0].
pub fn with_config(
limit: usize,
privileged_slots: usize,
standard_tier_memory_fraction: f64,
) -> Self {
assert!(
(0.0..=1.0).contains(&standard_tier_memory_fraction),
"standard_tier_memory_fraction must be in [0.0, 1.0], got {}",
standard_tier_memory_fraction
);
Self {
current: Arc::new(AtomicUsize::new(0)),
limit,
standard_tier_memory_fraction,
privileged_count: Arc::new(AtomicUsize::new(0)),
privileged_slots,
on_update: None,
on_reject: None,
}
}
/// Register a new permit for memory tracking.
/// The first `privileged_slots` permits get privileged status automatically.
/// The returned permit can be shared across multiple streams of the same query.
pub fn register_permit(&self) -> MemoryPermit {
// Try to claim a privileged slot
let is_privileged = self
.privileged_count
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
if count < self.privileged_slots {
Some(count + 1)
} else {
None
}
})
.is_ok();
MemoryPermit {
tracker: self.clone(),
is_privileged: AtomicBool::new(is_privileged),
}
}
/// Set a callback to be called whenever the usage changes successfully.
/// The callback receives the new total usage in bytes.
///
/// # Note
/// The callback is called after both successful `track()` and `release()` operations.
/// It is called even when `limit == 0` (unlimited mode) to track actual usage.
pub fn with_on_update<F>(mut self, on_update: F) -> Self
where
F: Fn(usize) + Send + Sync + 'static,
{
self.on_update = Some(Arc::new(on_update));
self
}
/// Set a callback to be called when memory allocation is rejected.
///
/// # Note
/// This is only called when `track()` fails due to exceeding the limit.
/// It is never called when `limit == 0` (unlimited mode).
pub fn with_on_reject<F>(mut self, on_reject: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.on_reject = Some(Arc::new(on_reject));
self
}
/// Get the current memory usage in bytes.
pub fn current(&self) -> usize {
self.current.load(Ordering::Acquire)
}
fn calculate_privileged_slots(max_concurrent_queries: usize) -> usize {
if max_concurrent_queries == 0 {
Self::DEFAULT_PRIVILEGED_SLOTS
} else {
((max_concurrent_queries as f64 * Self::DEFAULT_PRIVILEGED_TIER_RATIO) as usize).max(1)
}
}
/// Internal method to track additional memory usage.
///
/// Called by `MemoryPermit::track()`. Use `MemoryPermit::track()` instead of calling this directly.
fn track_internal(
&self,
additional: usize,
is_privileged: bool,
stream_tracked: usize,
) -> Result<()> {
// Calculate effective global limit based on stream privilege
// Privileged streams: can push global usage up to full limit
// Standard-tier streams: can only push global usage up to fraction of limit
let effective_limit = if is_privileged {
self.limit
} else {
(self.limit as f64 * self.standard_tier_memory_fraction) as usize
};
let mut new_total = 0;
let result = self
.current
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
new_total = current.saturating_add(additional);
if self.limit == 0 {
// Unlimited mode
return Some(new_total);
}
// Check if new global total exceeds effective limit
// The configured limit is the absolute hard limit - no stream can exceed it
if new_total <= effective_limit {
Some(new_total)
} else {
None
}
});
match result {
Ok(_) => {
if let Some(callback) = &self.on_update {
callback(new_total);
}
Ok(())
}
Err(current) => {
if let Some(callback) = &self.on_reject {
callback();
}
let msg = format!(
"{} requested, {} used globally ({}%), {} used by this stream (privileged: {}), effective limit: {} ({}%), hard limit: {}",
ReadableSize(additional as u64),
ReadableSize(current as u64),
if self.limit > 0 {
current * 100 / self.limit
} else {
0
},
ReadableSize(stream_tracked as u64),
is_privileged,
ReadableSize(effective_limit as u64),
if self.limit > 0 {
effective_limit * 100 / self.limit
} else {
0
},
ReadableSize(self.limit as u64)
);
error::ExceedMemoryLimitSnafu { msg }.fail()
}
}
}
/// Release tracked memory.
///
/// # Arguments
/// * `amount` - Amount of memory to release in bytes
pub fn release(&self, amount: usize) {
if let Ok(old_value) =
self.current
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
Some(current.saturating_sub(amount))
})
&& let Some(callback) = &self.on_update
{
callback(old_value.saturating_sub(amount));
}
}
}
/// A wrapper stream that tracks memory usage of RecordBatches.
pub struct MemoryTrackedStream {
inner: SendableRecordBatchStream,
permit: Arc<MemoryPermit>,
// Total tracked size, released when stream drops.
total_tracked: usize,
}
impl MemoryTrackedStream {
pub fn new(inner: SendableRecordBatchStream, permit: Arc<MemoryPermit>) -> Self {
Self {
inner,
permit,
total_tracked: 0,
}
}
}
impl Stream for MemoryTrackedStream {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(batch))) => {
let additional = batch
.columns()
.iter()
.map(|c| c.memory_size())
.sum::<usize>();
if let Err(e) = self.permit.track(additional, self.total_tracked) {
return Poll::Ready(Some(Err(e)));
}
self.total_tracked += additional;
Poll::Ready(Some(Ok(batch)))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl Drop for MemoryTrackedStream {
fn drop(&mut self) {
if self.total_tracked > 0 {
self.permit.release(self.total_tracked);
}
}
}
impl RecordBatchStream for MemoryTrackedStream {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.inner.output_ordering()
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.inner.metrics()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -496,4 +892,157 @@ mod tests {
assert_eq!(collected[0], batch1);
assert_eq!(collected[1], batch2);
}
#[test]
fn test_query_memory_tracker_basic() {
let tracker = Arc::new(QueryMemoryTracker::new(1000, 0));
// Register first stream - should get privileged status
let permit1 = tracker.register_permit();
assert!(permit1.is_privileged());
// Privileged stream can use up to limit
assert!(permit1.track(500, 0).is_ok());
assert_eq!(tracker.current(), 500);
// Register second stream - also privileged
let permit2 = tracker.register_permit();
assert!(permit2.is_privileged());
// Can add more but cannot exceed hard limit (1000)
assert!(permit2.track(400, 0).is_ok());
assert_eq!(tracker.current(), 900);
permit1.release(500);
permit2.release(400);
assert_eq!(tracker.current(), 0);
}
#[test]
fn test_query_memory_tracker_privileged_limit() {
// Privileged slots = 2 for easy testing
// Limit: 1000, standard-tier fraction: 0.7 (default)
// Privileged can push global to 1000, standard-tier can push global to 700
let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 2));
// First 2 streams are privileged
let permit1 = tracker.register_permit();
let permit2 = tracker.register_permit();
assert!(permit1.is_privileged());
assert!(permit2.is_privileged());
// Third stream is standard-tier (not privileged)
let permit3 = tracker.register_permit();
assert!(!permit3.is_privileged());
// Privileged stream uses some memory
assert!(permit1.track(300, 0).is_ok());
assert_eq!(tracker.current(), 300);
// Standard-tier can add up to 400 (total becomes 700, its effective limit)
assert!(permit3.track(400, 0).is_ok());
assert_eq!(tracker.current(), 700);
// Standard-tier stream cannot push global beyond 700
let err = permit3.track(100, 400).unwrap_err();
let err_msg = err.to_string();
assert!(err_msg.contains("400B used by this stream"));
assert!(err_msg.contains("effective limit: 700B (70%)"));
assert!(err_msg.contains("700B used globally (70%)"));
assert_eq!(tracker.current(), 700);
permit1.release(300);
permit3.release(400);
assert_eq!(tracker.current(), 0);
}
#[test]
fn test_query_memory_tracker_promotion() {
// Privileged slots = 1 for easy testing
let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 1));
// First stream is privileged
let permit1 = tracker.register_permit();
assert!(permit1.is_privileged());
// Second stream is standard-tier (can only use 500)
let permit2 = tracker.register_permit();
assert!(!permit2.is_privileged());
// Standard-tier can only track 500
assert!(permit2.track(400, 0).is_ok());
assert_eq!(tracker.current(), 400);
// Drop first permit to release privileged slot
drop(permit1);
// Second stream can now be promoted and use more memory
assert!(permit2.track(500, 400).is_ok());
assert!(permit2.is_privileged());
assert_eq!(tracker.current(), 900);
permit2.release(900);
assert_eq!(tracker.current(), 0);
}
#[test]
fn test_query_memory_tracker_privileged_hard_limit() {
// Test that the configured limit is absolute hard limit for all streams
// Privileged: can use full limit (1000)
// Standard-tier: can use 0.7x limit (700 with defaults)
let tracker = Arc::new(QueryMemoryTracker::new(1000, 0));
let permit1 = tracker.register_permit();
assert!(permit1.is_privileged());
// Privileged can use up to full limit (1000)
assert!(permit1.track(900, 0).is_ok());
assert_eq!(tracker.current(), 900);
// Privileged cannot exceed hard limit (1000)
assert!(permit1.track(200, 900).is_err());
assert_eq!(tracker.current(), 900);
// Can add within hard limit
assert!(permit1.track(100, 900).is_ok());
assert_eq!(tracker.current(), 1000);
// Cannot exceed even by 1 byte
assert!(permit1.track(1, 1000).is_err());
assert_eq!(tracker.current(), 1000);
permit1.release(1000);
assert_eq!(tracker.current(), 0);
}
#[test]
fn test_query_memory_tracker_standard_tier_fraction() {
// Test standard-tier streams use fraction of limit
// Limit: 1000, default fraction: 0.7, so standard-tier can use 700
let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 1));
let permit1 = tracker.register_permit();
assert!(permit1.is_privileged());
let permit2 = tracker.register_permit();
assert!(!permit2.is_privileged());
// Standard-tier can use up to 700 (1000 * 0.7 default)
assert!(permit2.track(600, 0).is_ok());
assert_eq!(tracker.current(), 600);
// Cannot exceed standard-tier limit (700)
assert!(permit2.track(200, 600).is_err());
assert_eq!(tracker.current(), 600);
// Can add within standard-tier limit
assert!(permit2.track(100, 600).is_ok());
assert_eq!(tracker.current(), 700);
// Cannot exceed standard-tier limit
assert!(permit2.track(1, 700).is_err());
assert_eq!(tracker.current(), 700);
permit2.release(700);
assert_eq!(tracker.current(), 0);
}
}
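
A brief, hypothetical sketch of the pattern `register_permit` is documented for above, namely sharing one permit across several streams of the same query; the crate path is taken from this diff.

```rust
// Sketch only: one permit shared by two partition streams of the same query.
use std::sync::Arc;

use common_recordbatch::QueryMemoryTracker;

fn main() {
    // 1000-byte limit, max_concurrent_queries = 0 (default 20 privileged slots).
    let tracker = QueryMemoryTracker::new(1000, 0);
    let permit = Arc::new(tracker.register_permit());

    // Both streams account against the same permit and the shared global counter.
    let (stream_a, stream_b) = (Arc::clone(&permit), Arc::clone(&permit));
    stream_a.track(300, 0).unwrap();
    stream_b.track(400, 0).unwrap();
    assert_eq!(tracker.current(), 700);

    // Each stream releases what it tracked when it finishes
    // (MemoryTrackedStream does this automatically in its Drop impl).
    stream_a.release(300);
    stream_b.release(400);
    assert_eq!(tracker.current(), 0);
}
```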


@@ -522,6 +522,7 @@ impl DatanodeBuilder {
file_ref_manager,
partition_expr_fetcher.clone(),
plugins,
opts.max_concurrent_queries,
);
#[cfg(feature = "enterprise")]
@@ -564,6 +565,7 @@ impl DatanodeBuilder {
file_ref_manager,
partition_expr_fetcher,
plugins,
opts.max_concurrent_queries,
);
#[cfg(feature = "enterprise")]
@@ -585,6 +587,7 @@ impl DatanodeBuilder {
file_ref_manager,
partition_expr_fetcher.clone(),
plugins,
opts.max_concurrent_queries,
);
#[cfg(feature = "enterprise")]


@@ -21,6 +21,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_base::memory_limit::MemoryLimit;
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
@@ -132,6 +133,7 @@ impl Default for FlownodeOptions {
query: QueryOptions {
parallelism: 1,
allow_query_fallback: false,
memory_pool_size: MemoryLimit::default(),
},
user_provider: None,
memory: MemoryOptions::default(),


@@ -18,6 +18,7 @@ use std::cmp;
use std::path::Path;
use std::time::Duration;
use common_base::memory_limit::MemoryLimit;
use common_base::readable_size::ReadableSize;
use common_stat::{get_total_cpu_cores, get_total_memory_readable};
use common_telemetry::warn;
@@ -128,6 +129,9 @@ pub struct MitoConfig {
pub max_concurrent_scan_files: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
/// Memory limit for table scans across all queries. Setting it to 0 disables the limit.
/// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%").
pub scan_memory_limit: MemoryLimit,
/// Index configs.
pub index: IndexConfig,
@@ -182,6 +186,7 @@ impl Default for MitoConfig {
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
allow_stale_entries: false,
scan_memory_limit: MemoryLimit::default(),
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),


@@ -83,7 +83,8 @@ use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
use common_stat::get_total_memory_bytes;
use common_telemetry::{info, tracing, warn};
use common_wal::options::{WAL_OPTIONS_KEY, WalOptions};
use futures::future::{join_all, try_join_all};
@@ -122,7 +123,9 @@ use crate::extension::BoxedExtensionRangeProviderFactory;
use crate::gc::GcLimiterRef;
use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::metrics::{
HANDLE_REQUEST_ELAPSED, SCAN_MEMORY_USAGE_BYTES, SCAN_REQUESTS_REJECTED_TOTAL,
};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
@@ -147,6 +150,7 @@ pub struct MitoEngineBuilder<'a, S: LogStore> {
file_ref_manager: FileReferenceManagerRef,
partition_expr_fetcher: PartitionExprFetcherRef,
plugins: Plugins,
max_concurrent_queries: usize,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
@@ -162,6 +166,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
file_ref_manager: FileReferenceManagerRef,
partition_expr_fetcher: PartitionExprFetcherRef,
plugins: Plugins,
max_concurrent_queries: usize,
) -> Self {
Self {
data_home,
@@ -172,6 +177,7 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
file_ref_manager,
plugins,
partition_expr_fetcher,
max_concurrent_queries,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
}
@@ -204,10 +210,22 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
)
.await?;
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(self.log_store));
let total_memory = get_total_memory_bytes().max(0) as u64;
let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
let scan_memory_tracker =
QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries)
.with_on_update(|usage| {
SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
})
.with_on_reject(|| {
SCAN_REQUESTS_REJECTED_TOTAL.inc();
});
let inner = EngineInner {
workers,
config,
wal_raw_entry_reader,
scan_memory_tracker,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
};
@@ -250,6 +268,7 @@ impl MitoEngine {
file_ref_manager,
partition_expr_fetcher,
plugins,
0, // Default: no limit on concurrent queries
);
builder.try_build().await
}
@@ -614,6 +633,8 @@ struct EngineInner {
config: Arc<MitoConfig>,
/// The Wal raw entry reader.
wal_raw_entry_reader: Arc<dyn RawEntryReader>,
/// Memory tracker for table scans.
scan_memory_tracker: QueryMemoryTracker,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
@@ -1105,6 +1126,10 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}
fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
Some(Arc::new(self.inner.scan_memory_tracker.register_permit()))
}
async fn get_committed_sequence(
&self,
region_id: RegionId,
@@ -1249,6 +1274,15 @@ impl MitoEngine {
let config = Arc::new(config);
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
let total_memory = get_total_memory_bytes().max(0) as u64;
let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0)
.with_on_update(|usage| {
SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
})
.with_on_reject(|| {
SCAN_REQUESTS_REJECTED_TOTAL.inc();
});
Ok(MitoEngine {
inner: Arc::new(EngineInner {
workers: WorkerGroup::start_for_test(
@@ -1265,6 +1299,7 @@ impl MitoEngine {
.await?,
config,
wal_raw_entry_reader,
scan_memory_tracker,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
}),


@@ -36,6 +36,7 @@ pub const STAGING_TYPE: &str = "index_staging";
/// Recycle bin type label.
pub const RECYCLE_TYPE: &str = "recycle_bin";
// Write metrics.
lazy_static! {
/// Global write buffer size in bytes.
pub static ref WRITE_BUFFER_BYTES: IntGauge =
@@ -114,10 +115,10 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
// ------ End of write related metrics
}
// Compaction metrics
// Compaction metrics.
lazy_static! {
/// Timer of different stages in compaction.
/// - pick
/// - merge (in parallel)
@@ -156,8 +157,10 @@ lazy_static! {
"greptime_mito_inflight_compaction_count",
"inflight compaction count",
).unwrap();
}
// Query metrics.
// Query metrics.
lazy_static! {
/// Timer of different stages in query.
pub static ref READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_read_stage_elapsed",
@@ -207,9 +210,20 @@ lazy_static! {
"Number of rows returned in a scan task",
exponential_buckets(100.0, 10.0, 7).unwrap(),
).unwrap();
// ------- End of query metrics.
/// Gauge for scan memory usage in bytes.
pub static ref SCAN_MEMORY_USAGE_BYTES: IntGauge = register_int_gauge!(
"greptime_mito_scan_memory_usage_bytes",
"current scan memory usage in bytes"
).unwrap();
/// Counter of rejected scan requests due to memory limit.
pub static ref SCAN_REQUESTS_REJECTED_TOTAL: IntCounter = register_int_counter!(
"greptime_mito_scan_requests_rejected_total",
"total number of scan requests rejected due to memory limit"
).unwrap();
}
// Cache related metrics.
// Cache metrics.
lazy_static! {
/// Cache hit counter.
pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!(
"greptime_mito_cache_hit",
@@ -261,8 +275,10 @@ lazy_static! {
"mito cache eviction",
&[TYPE_LABEL, CACHE_EVICTION_CAUSE]
).unwrap();
// ------- End of cache metrics.
}
// Index metrics.
lazy_static! {
// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_ELAPSED: HistogramVec = register_histogram_vec!(
@@ -359,8 +375,9 @@ lazy_static! {
/// Counter of flush operations on intermediate files.
pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "intermediate"]);
// ------- End of index metrics.
}
lazy_static! {
/// Partition tree memtable data buffer freeze metrics
pub static ref PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_partition_tree_buffer_freeze_stage_elapsed",
@@ -405,7 +422,6 @@ lazy_static! {
}
// Use another block to avoid reaching the recursion limit.
lazy_static! {
/// Counter for compaction input file size.
pub static ref COMPACTION_INPUT_BYTES: Counter = register_counter!(


@@ -33,6 +33,7 @@ common-plugins.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-stat.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true


@@ -185,7 +185,8 @@ impl TableProvider for DummyTableProvider {
.handle_query(self.region_id, request.clone())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let mut scan_exec = RegionScanExec::new(scanner, request)?;
let query_memory_permit = self.engine.register_query_memory_permit();
let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?;
if let Some(query_ctx) = &self.query_ctx {
scan_exec.set_explain_verbose(query_ctx.explain_verbose());
}


@@ -62,6 +62,18 @@ lazy_static! {
"query push down fallback errors total"
)
.unwrap();
pub static ref QUERY_MEMORY_POOL_USAGE_BYTES: IntGauge = register_int_gauge!(
"greptime_query_memory_pool_usage_bytes",
"current query memory pool usage in bytes"
)
.unwrap();
pub static ref QUERY_MEMORY_POOL_REJECTED_TOTAL: IntCounter = register_int_counter!(
"greptime_query_memory_pool_rejected_total",
"total number of query memory allocations rejected"
)
.unwrap();
}
/// A stream to call the callback once a RecordBatch stream is done.


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::memory_limit::MemoryLimit;
use serde::{Deserialize, Serialize};
/// Query engine config
@@ -22,6 +23,10 @@ pub struct QueryOptions {
pub parallelism: usize,
/// Whether to allow query fallback when push down fails.
pub allow_query_fallback: bool,
/// Memory pool size for query execution. Setting it to 0 disables the limit (unbounded).
/// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%").
/// When this limit is reached, queries will fail with ResourceExhausted error.
pub memory_pool_size: MemoryLimit,
}
#[allow(clippy::derivable_impls)]
@@ -30,6 +35,7 @@ impl Default for QueryOptions {
Self {
parallelism: 0,
allow_query_fallback: false,
memory_pool_size: MemoryLimit::default(),
}
}
}


@@ -25,13 +25,17 @@ use common_function::handlers::{
FlowServiceHandlerRef, ProcedureServiceHandlerRef, TableMutationHandlerRef,
};
use common_function::state::FunctionState;
use common_stat::get_total_memory_bytes;
use common_telemetry::warn;
use datafusion::catalog::TableFunction;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
};
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
@@ -49,6 +53,7 @@ use crate::QueryEngineContext;
use crate::dist_plan::{
DistExtensionPlanner, DistPlannerAnalyzer, DistPlannerOptions, MergeSortExtensionPlanner,
};
use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
use crate::optimizer::ExtensionAnalyzerRule;
use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
@@ -100,7 +105,18 @@ impl QueryEngineState {
plugins: Plugins,
options: QueryOptionsNew,
) -> Self {
let runtime_env = Arc::new(RuntimeEnv::default());
let total_memory = get_total_memory_bytes().max(0) as u64;
let memory_pool_size = options.memory_pool_size.resolve(total_memory) as usize;
let runtime_env = if memory_pool_size > 0 {
Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(MetricsMemoryPool::new(memory_pool_size)))
.build()
.expect("Failed to build RuntimeEnv"),
)
} else {
Arc::new(RuntimeEnv::default())
};
let mut session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
if options.parallelism > 0 {
session_config = session_config.with_target_partitions(options.parallelism);
@@ -420,3 +436,66 @@ impl DfQueryPlanner {
}
}
}
/// A wrapper around GreedyMemoryPool that records metrics.
///
/// This wrapper intercepts all memory pool operations and updates
/// Prometheus metrics for monitoring query memory usage and rejections.
#[derive(Debug)]
struct MetricsMemoryPool {
inner: Arc<GreedyMemoryPool>,
}
impl MetricsMemoryPool {
fn new(limit: usize) -> Self {
Self {
inner: Arc::new(GreedyMemoryPool::new(limit)),
}
}
#[inline]
fn update_metrics(&self) {
QUERY_MEMORY_POOL_USAGE_BYTES.set(self.inner.reserved() as i64);
}
}
impl MemoryPool for MetricsMemoryPool {
fn register(&self, consumer: &MemoryConsumer) {
self.inner.register(consumer);
}
fn unregister(&self, consumer: &MemoryConsumer) {
self.inner.unregister(consumer);
}
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
self.inner.grow(reservation, additional);
self.update_metrics();
}
fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
self.inner.shrink(reservation, shrink);
self.update_metrics();
}
fn try_grow(
&self,
reservation: &MemoryReservation,
additional: usize,
) -> datafusion_common::Result<()> {
let result = self.inner.try_grow(reservation, additional);
if result.is_err() {
QUERY_MEMORY_POOL_REJECTED_TOTAL.inc();
}
self.update_metrics();
result
}
fn reserved(&self) -> usize {
self.inner.reserved()
}
fn memory_limit(&self) -> MemoryLimit {
self.inner.memory_limit()
}
}
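
For reference, a minimal sketch (not from this commit) of the DataFusion memory-pool behavior that `MetricsMemoryPool` wraps above: `GreedyMemoryPool` rejects reservations beyond the configured pool size, which is how a resolved `memory_pool_size` ultimately surfaces as the ResourceExhausted error mentioned in the config docs.

```rust
// Sketch only: exercises DataFusion's memory pool API as used by MetricsMemoryPool.
use std::sync::Arc;

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A tiny 1 KiB pool stands in for the resolved `memory_pool_size`.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
    let mut reservation = MemoryConsumer::new("example-operator").register(&pool);

    // Growing within the limit succeeds.
    assert!(reservation.try_grow(512).is_ok());

    // Exceeding the limit fails with a resources-exhausted error, which a query
    // observes as the ResourceExhausted failure described in the docs above.
    assert!(reservation.try_grow(1024).is_err());
}
```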


@@ -23,7 +23,7 @@ use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream};
use common_time::Timestamp;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
@@ -783,6 +783,11 @@ pub trait RegionEngine: Send + Sync {
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError>;
/// Registers and returns a query memory permit.
fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
None
}
/// Retrieves region's metadata.
async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;


@@ -19,7 +19,10 @@ use std::task::{Context, Poll};
use std::time::Instant;
use common_error::ext::BoxedError;
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
use common_recordbatch::{
DfRecordBatch, DfSendableRecordBatchStream, MemoryPermit, MemoryTrackedStream,
SendableRecordBatchStream,
};
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
@@ -49,7 +52,6 @@ use store_api::storage::{ScanRequest, TimeSeriesDistribution};
use crate::table::metrics::StreamMetrics;
/// A plan to read multiple partitions from a region of a table.
#[derive(Debug)]
pub struct RegionScanExec {
scanner: Arc<Mutex<RegionScannerRef>>,
arrow_schema: ArrowSchemaRef,
@@ -63,10 +65,32 @@ pub struct RegionScanExec {
// TODO(ruihang): handle TimeWindowed dist via this parameter
distribution: Option<TimeSeriesDistribution>,
explain_verbose: bool,
query_memory_permit: Option<Arc<MemoryPermit>>,
}
impl std::fmt::Debug for RegionScanExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RegionScanExec")
.field("scanner", &self.scanner)
.field("arrow_schema", &self.arrow_schema)
.field("output_ordering", &self.output_ordering)
.field("metric", &self.metric)
.field("properties", &self.properties)
.field("append_mode", &self.append_mode)
.field("total_rows", &self.total_rows)
.field("is_partition_set", &self.is_partition_set)
.field("distribution", &self.distribution)
.field("explain_verbose", &self.explain_verbose)
.finish()
}
}
impl RegionScanExec {
pub fn new(scanner: RegionScannerRef, request: ScanRequest) -> DfResult<Self> {
pub fn new(
scanner: RegionScannerRef,
request: ScanRequest,
query_memory_permit: Option<Arc<MemoryPermit>>,
) -> DfResult<Self> {
let arrow_schema = scanner.schema().arrow_schema().clone();
let scanner_props = scanner.properties();
let mut num_output_partition = scanner_props.num_partitions();
@@ -170,6 +194,7 @@ impl RegionScanExec {
is_partition_set: false,
distribution: request.distribution,
explain_verbose: false,
query_memory_permit,
})
}
@@ -237,6 +262,7 @@ impl RegionScanExec {
is_partition_set: true,
distribution: self.distribution,
explain_verbose: self.explain_verbose,
query_memory_permit: self.query_memory_permit.clone(),
})
}
@@ -320,6 +346,13 @@ impl ExecutionPlan for RegionScanExec {
.unwrap()
.scan_partition(&ctx, &self.metric, partition)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream = if let Some(permit) = &self.query_memory_permit {
Box::pin(MemoryTrackedStream::new(stream, permit.clone()))
} else {
stream
};
let stream_metrics = StreamMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
@@ -511,7 +544,7 @@ mod test {
let region_metadata = Arc::new(builder.build().unwrap());
let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
let plan = RegionScanExec::new(scanner, ScanRequest::default()).unwrap();
let plan = RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap();
let actual: SchemaRef = Arc::new(
plan.properties
.eq_properties


@@ -1584,6 +1584,8 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"enable_virtual_host_style =",
"cache_path =",
"cache_capacity =",
"memory_pool_size =",
"scan_memory_limit =",
"sas_token =",
"scope =",
"num_workers =",