refactor: simplify scan memory tracking (#7827)

* refactor: simplify scan memory tracking

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: make confg-docs

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: by codex review comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: track_with_policy

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: minor change

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: mem granularity mb to kb

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: by review comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: by scan_memory_on_exhausted comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by review comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: typo

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-03-25 20:25:50 -07:00
committed by GitHub
parent 8d40b129f1
commit 8058ce7cf2
13 changed files with 469 additions and 449 deletions

1
Cargo.lock generated
View File

@@ -2657,6 +2657,7 @@ dependencies = [
"common-base",
"common-error",
"common-macro",
"common-memory-manager",
"common-telemetry",
"common-time",
"criterion 0.7.0",

View File

@@ -18,7 +18,7 @@
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.<br/>Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited.<br/>NOTE: This setting affects scan_memory_limit's privileged tier allocation.<br/>When set, 70% of queries get privileged memory access (full scan_memory_limit).<br/>The remaining 30% get standard tier access (70% of scan_memory_limit). |
| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
@@ -160,7 +160,8 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.<br/>Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit.<br/>NOTE: Works with max_concurrent_queries for tiered memory allocation.<br/>- If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.<br/>- If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.<br/>Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit. |
| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately.<br/>"fail" (default) fails fast and is the recommended option for most users.<br/>"wait" / "wait(<duration>)" waits for memory to become available. This is mainly<br/>for advanced tuning in bursty workloads where temporary contention is common and<br/>higher latency is acceptable.<br/>"wait" means "wait(10s)", not unlimited waiting. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
@@ -440,7 +441,7 @@
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited.<br/>NOTE: This setting affects scan_memory_limit's privileged tier allocation.<br/>When set, 70% of queries get privileged memory access (full scan_memory_limit).<br/>The remaining 30% get standard tier access (70% of scan_memory_limit). |
| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
@@ -552,7 +553,8 @@
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.<br/>Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit.<br/>NOTE: Works with max_concurrent_queries for tiered memory allocation.<br/>- If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.<br/>- If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.<br/>Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit. |
| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately.<br/>"fail" (default) fails fast and is the recommended option for most users.<br/>"wait" / "wait(<duration>)" waits for memory to become available. This is mainly<br/>for advanced tuning in bursty workloads where temporary contention is common and<br/>higher latency is acceptable.<br/>"wait" means "wait(10s)", not unlimited waiting. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |

View File

@@ -17,10 +17,7 @@ init_regions_in_background = false
## Parallelism of initializing regions.
init_regions_parallelism = 16
## The maximum current queries allowed to be executed. Zero means unlimited.
## NOTE: This setting affects scan_memory_limit's privileged tier allocation.
## When set, 70% of queries get privileged memory access (full scan_memory_limit).
## The remaining 30% get standard tier access (70% of scan_memory_limit).
## The maximum concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
@@ -535,10 +532,14 @@ allow_stale_entries = false
## Memory limit for table scans across all queries.
## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit.
## NOTE: Works with max_concurrent_queries for tiered memory allocation.
## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.
## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access.
scan_memory_limit = "50%"
## Controls what happens when a scan cannot get memory immediately.
## "fail" (default) fails fast and is the recommended option for most users.
## "wait" / "wait(<duration>)" waits for memory to become available. This is mainly
## for advanced tuning in bursty workloads where temporary contention is common and
## higher latency is acceptable.
## "wait" means "wait(10s)", not unlimited waiting.
scan_memory_on_exhausted = "fail"
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).

View File

@@ -23,10 +23,7 @@ init_regions_in_background = false
## Parallelism of initializing regions.
init_regions_parallelism = 16
## The maximum current queries allowed to be executed. Zero means unlimited.
## NOTE: This setting affects scan_memory_limit's privileged tier allocation.
## When set, 70% of queries get privileged memory access (full scan_memory_limit).
## The remaining 30% get standard tier access (70% of scan_memory_limit).
## The maximum concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
@@ -627,10 +624,14 @@ allow_stale_entries = false
## Memory limit for table scans across all queries.
## Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").
## Setting it to 0 disables the limit.
## NOTE: Works with max_concurrent_queries for tiered memory allocation.
## - If max_concurrent_queries is set: 70% of queries get full access, 30% get 70% access.
## - If max_concurrent_queries is 0 (unlimited): first 20 queries get full access, rest get 70% access.
scan_memory_limit = "50%"
## Controls what happens when a scan cannot get memory immediately.
## "fail" (default) fails fast and is the recommended option for most users.
## "wait" / "wait(<duration>)" waits for memory to become available. This is mainly
## for advanced tuning in bursty workloads where temporary contention is common and
## higher latency is acceptable.
## "wait" means "wait(10s)", not unlimited waiting.
scan_memory_on_exhausted = "fail"
## Minimum time interval between two compactions.
## To align with the old behavior, the default value is 0 (no restrictions).

View File

@@ -12,6 +12,7 @@ arc-swap = "1.6"
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-memory-manager.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true

View File

@@ -22,13 +22,17 @@ pub mod recordbatch;
pub mod util;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use common_base::readable_size::ReadableSize;
use common_error::ext::BoxedError;
use common_memory_manager::{
MemoryGuard, MemoryManager, MemoryMetrics, OnExhaustedPolicy, PermitGranularity,
};
use common_telemetry::tracing::Span;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::array::{ArrayRef, AsArray, StringBuilder};
@@ -42,7 +46,7 @@ use error::Result;
use futures::task::{Context, Poll};
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
use snafu::{ResultExt, ensure};
use snafu::{IntoError, ResultExt, ensure};
use crate::error::NewDfRecordBatchSnafu;
@@ -416,205 +420,93 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStream
}
}
/// Memory permit for a stream, providing privileged access or rate limiting.
///
/// The permit tracks whether this stream has privileged Top-K status.
/// When dropped, it automatically releases any privileged slot it holds.
pub struct MemoryPermit {
tracker: QueryMemoryTracker,
is_privileged: AtomicBool,
}
impl MemoryPermit {
/// Check if this permit currently has privileged status.
pub fn is_privileged(&self) -> bool {
self.is_privileged.load(Ordering::Acquire)
}
/// Ensure this permit has privileged status by acquiring a slot if available.
/// Returns true if privileged (either already privileged or just acquired privilege).
fn ensure_privileged(&self) -> bool {
if self.is_privileged.load(Ordering::Acquire) {
return true;
}
// Try to claim a privileged slot
self.tracker
.privileged_count
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
if count < self.tracker.privileged_slots {
Some(count + 1)
} else {
None
}
})
.map(|_| {
self.is_privileged.store(true, Ordering::Release);
true
})
.unwrap_or(false)
}
/// Track additional memory usage with this permit.
/// Returns error if limit is exceeded.
///
/// # Arguments
/// * `additional` - Additional memory size to track in bytes
/// * `stream_tracked` - Total memory already tracked by this stream
///
/// # Behavior
/// - Privileged streams: Can push global memory usage up to full limit
/// - Standard-tier streams: Can push global memory usage up to limit * standard_tier_memory_fraction (default: 0.7)
/// - Standard-tier streams automatically attempt to acquire privilege if slots become available
/// - The configured limit is absolute hard limit - no stream can exceed it
pub fn track(&self, additional: usize, stream_tracked: usize) -> Result<()> {
// Ensure privileged status if possible
let is_privileged = self.ensure_privileged();
self.tracker
.track_internal(additional, is_privileged, stream_tracked)
}
/// Release tracked memory.
///
/// # Arguments
/// * `amount` - Amount of memory to release in bytes
pub fn release(&self, amount: usize) {
self.tracker.release(amount);
}
}
impl Drop for MemoryPermit {
fn drop(&mut self) {
// Release privileged slot if we had one
if self.is_privileged.load(Ordering::Acquire) {
self.tracker
.privileged_count
.fetch_sub(1, Ordering::Release);
}
}
}
/// Memory tracker for RecordBatch streams. Clone to share the same limit across queries.
///
/// Implements a two-tier memory allocation strategy:
/// - **Privileged tier**: First N streams (default: 20) can use up to the full memory limit
/// - **Standard tier**: Remaining streams are restricted to a fraction of the limit (default: 70%)
/// - Privilege is granted on a first-come-first-served basis
/// - The configured limit is an absolute hard cap - no stream can exceed it
/// Each stream acquires quota independently from this tracker.
#[derive(Clone)]
pub struct QueryMemoryTracker {
current: Arc<AtomicUsize>,
limit: usize,
standard_tier_memory_fraction: f64,
privileged_count: Arc<AtomicUsize>,
privileged_slots: usize,
on_update: Option<Arc<dyn Fn(usize) + Send + Sync>>,
on_reject: Option<Arc<dyn Fn() + Send + Sync>>,
manager: MemoryManager<CallbackMemoryMetrics>,
metrics: CallbackMemoryMetrics,
on_exhausted_policy: OnExhaustedPolicy,
}
impl fmt::Debug for QueryMemoryTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueryMemoryTracker")
.field("current", &self.current.load(Ordering::Acquire))
.field("limit", &self.limit)
.field(
"standard_tier_memory_fraction",
&self.standard_tier_memory_fraction,
)
.field(
"privileged_count",
&self.privileged_count.load(Ordering::Acquire),
)
.field("privileged_slots", &self.privileged_slots)
.field("on_update", &self.on_update.is_some())
.field("on_reject", &self.on_reject.is_some())
.field("current", &self.current())
.field("limit", &self.limit())
.field("on_exhausted_policy", &self.on_exhausted_policy)
.field("on_update", &self.metrics.has_on_update())
.field("on_reject", &self.metrics.has_on_reject())
.finish()
}
}
impl QueryMemoryTracker {
// Default privileged slots when max_concurrent_queries is 0.
const DEFAULT_PRIVILEGED_SLOTS: usize = 20;
// Ratio for privileged tier: 70% queries get privileged access, standard tier uses 70% memory.
const DEFAULT_PRIVILEGED_TIER_RATIO: f64 = 0.7;
/// Create a new memory tracker with the given limit and max_concurrent_queries.
/// Calculates privileged slots as 70% of max_concurrent_queries (or 20 if max_concurrent_queries is 0).
///
/// # Arguments
/// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
/// * `max_concurrent_queries` - Maximum number of concurrent queries (0 = unlimited).
pub fn new(limit: usize, max_concurrent_queries: usize) -> Self {
let privileged_slots = Self::calculate_privileged_slots(max_concurrent_queries);
Self::with_privileged_slots(limit, privileged_slots)
}
/// Create a new memory tracker with custom privileged slots limit.
pub fn with_privileged_slots(limit: usize, privileged_slots: usize) -> Self {
Self::with_config(limit, privileged_slots, Self::DEFAULT_PRIVILEGED_TIER_RATIO)
}
/// Create a new memory tracker with full configuration.
///
/// # Arguments
/// * `limit` - Maximum memory usage in bytes (hard limit for all streams). 0 means unlimited.
/// * `privileged_slots` - Maximum number of streams that can get privileged status.
/// * `standard_tier_memory_fraction` - Memory fraction for standard-tier streams (range: [0.0, 1.0]).
///
/// # Panics
/// Panics if `standard_tier_memory_fraction` is not in the range [0.0, 1.0].
pub fn with_config(
/// Create a builder for a query memory tracker.
pub fn builder(
limit: usize,
privileged_slots: usize,
standard_tier_memory_fraction: f64,
) -> Self {
assert!(
(0.0..=1.0).contains(&standard_tier_memory_fraction),
"standard_tier_memory_fraction must be in [0.0, 1.0], got {}",
standard_tier_memory_fraction
);
Self {
current: Arc::new(AtomicUsize::new(0)),
on_exhausted_policy: OnExhaustedPolicy,
) -> QueryMemoryTrackerBuilder {
QueryMemoryTrackerBuilder {
limit,
standard_tier_memory_fraction,
privileged_count: Arc::new(AtomicUsize::new(0)),
privileged_slots,
on_exhausted_policy,
on_update: None,
on_reject: None,
}
}
/// Register a new permit for memory tracking.
/// The first `privileged_slots` permits get privileged status automatically.
/// The returned permit can be shared across multiple streams of the same query.
pub fn register_permit(&self) -> MemoryPermit {
// Try to claim a privileged slot
let is_privileged = self
.privileged_count
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
if count < self.privileged_slots {
Some(count + 1)
} else {
None
}
})
.is_ok();
MemoryPermit {
fn new_stream_tracker(&self) -> StreamMemoryTracker {
StreamMemoryTracker {
tracker: self.clone(),
is_privileged: AtomicBool::new(is_privileged),
guard: self.manager.try_acquire(0).unwrap(),
tracked_bytes: 0,
}
}
/// Get the current memory usage in bytes.
pub fn current(&self) -> usize {
self.manager.used_bytes() as usize
}
fn limit(&self) -> usize {
self.manager.limit_bytes() as usize
}
fn reject_error(
&self,
current: usize,
additional: usize,
stream_tracked: usize,
) -> error::Error {
let limit = self.limit();
let msg = format!(
"{} requested, {} used globally ({}%), {} used by this stream, hard limit: {}",
ReadableSize(additional as u64),
ReadableSize(current as u64),
if limit > 0 { current * 100 / limit } else { 0 },
ReadableSize(stream_tracked as u64),
ReadableSize(limit as u64)
);
error::ExceedMemoryLimitSnafu { msg }.build()
}
}
/// Builder for constructing a [`QueryMemoryTracker`] with optional callbacks.
pub struct QueryMemoryTrackerBuilder {
limit: usize,
on_exhausted_policy: OnExhaustedPolicy,
on_update: Option<UpdateCallback>,
on_reject: Option<RejectCallback>,
}
impl QueryMemoryTrackerBuilder {
/// Set a callback to be called whenever the usage changes successfully.
/// The callback receives the new total usage in bytes.
///
/// # Note
/// The callback is called after both successful `track()` and `release()` operations.
/// It is called even when `limit == 0` (unlimited mode) to track actual usage.
pub fn with_on_update<F>(mut self, on_update: F) -> Self
/// The callback is called after both successful `track()` and stream drop.
/// Usage is exact in unlimited mode and 1KB-aligned in limited mode.
pub fn on_update<F>(mut self, on_update: F) -> Self
where
F: Fn(usize) + Send + Sync + 'static,
{
@@ -627,7 +519,7 @@ impl QueryMemoryTracker {
/// # Note
/// This is only called when `track()` fails due to exceeding the limit.
/// It is never called when `limit == 0` (unlimited mode).
pub fn with_on_reject<F>(mut self, on_reject: F) -> Self
pub fn on_reject<F>(mut self, on_reject: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
@@ -635,105 +527,130 @@ impl QueryMemoryTracker {
self
}
/// Get the current memory usage in bytes.
pub fn current(&self) -> usize {
self.current.load(Ordering::Acquire)
}
/// Build a [`QueryMemoryTracker`] from this builder.
pub fn build(self) -> QueryMemoryTracker {
let metrics = CallbackMemoryMetrics::new(self.on_update, self.on_reject);
let manager = MemoryManager::with_granularity(
self.limit as u64,
PermitGranularity::Kilobyte,
metrics.clone(),
);
fn calculate_privileged_slots(max_concurrent_queries: usize) -> usize {
if max_concurrent_queries == 0 {
Self::DEFAULT_PRIVILEGED_SLOTS
QueryMemoryTracker {
manager,
metrics,
on_exhausted_policy: self.on_exhausted_policy,
}
}
}
struct StreamMemoryTracker {
tracker: QueryMemoryTracker,
guard: MemoryGuard<CallbackMemoryMetrics>,
tracked_bytes: usize,
}
type MemoryAcquireResult = std::result::Result<(), common_memory_manager::Error>;
impl StreamMemoryTracker {
fn try_track(&mut self, additional: usize) -> Result<()> {
if self.guard.try_acquire_additional(additional as u64) {
self.tracked_bytes = self.tracked_bytes.saturating_add(additional);
Ok(())
} else {
((max_concurrent_queries as f64 * Self::DEFAULT_PRIVILEGED_TIER_RATIO) as usize).max(1)
Err(self.reject_error(additional))
}
}
/// Internal method to track additional memory usage.
///
/// Called by `MemoryPermit::track()`. Use `MemoryPermit::track()` instead of calling this directly.
fn track_internal(
&self,
additional: usize,
is_privileged: bool,
stream_tracked: usize,
) -> Result<()> {
// Calculate effective global limit based on stream privilege
// Privileged streams: can push global usage up to full limit
// Standard-tier streams: can only push global usage up to fraction of limit
let effective_limit = if is_privileged {
self.limit
} else {
(self.limit as f64 * self.standard_tier_memory_fraction) as usize
};
let mut new_total = 0;
async fn track_with_policy(mut self, additional: usize) -> (Self, MemoryAcquireResult) {
let result = self
.current
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
new_total = current.saturating_add(additional);
.guard
.acquire_additional_with_policy(additional as u64, self.tracker.on_exhausted_policy)
.await;
if result.is_ok() {
self.tracked_bytes = self.tracked_bytes.saturating_add(additional);
}
(self, result)
}
if self.limit == 0 {
// Unlimited mode
return Some(new_total);
}
fn reject_error(&self, additional: usize) -> error::Error {
let current = self.tracker.current();
self.tracker
.reject_error(current, additional, self.tracked_bytes)
}
// Check if new global total exceeds effective limit
// The configured limit is absolute hard limit - no stream can exceed it
if new_total <= effective_limit {
Some(new_total)
} else {
None
}
});
match result {
Ok(_) => {
if let Some(callback) = &self.on_update {
callback(new_total);
}
Ok(())
fn wait_error(&self, additional: usize, source: common_memory_manager::Error) -> error::Error {
match source {
common_memory_manager::Error::MemoryLimitExceeded { .. } => {
self.reject_error(additional)
}
Err(current) => {
if let Some(callback) = &self.on_reject {
callback();
}
common_memory_manager::Error::MemoryAcquireTimeout { waited, .. } => {
let current = self.tracker.current();
let limit = self.tracker.limit();
let msg = format!(
"{} requested, {} used globally ({}%), {} used by this stream (privileged: {}), effective limit: {} ({}%), hard limit: {}",
"timed out waiting {:?} for {}, {} used globally ({}%), {} used by this stream, hard limit: {}",
waited,
ReadableSize(additional as u64),
ReadableSize(current as u64),
if self.limit > 0 {
current * 100 / self.limit
} else {
0
},
ReadableSize(stream_tracked as u64),
is_privileged,
ReadableSize(effective_limit as u64),
if self.limit > 0 {
effective_limit * 100 / self.limit
} else {
0
},
ReadableSize(self.limit as u64)
if limit > 0 { current * 100 / limit } else { 0 },
ReadableSize(self.tracked_bytes as u64),
ReadableSize(limit as u64)
);
error::ExceedMemoryLimitSnafu { msg }.fail()
error::ExceedMemoryLimitSnafu { msg }.build()
}
error => error::ExternalSnafu.into_error(BoxedError::new(error)),
}
}
}
type PendingTrackFuture = Pin<
Box<dyn Future<Output = (StreamMemoryTracker, RecordBatch, usize, MemoryAcquireResult)> + Send>,
>;
#[derive(Clone)]
struct CallbackMemoryMetrics {
inner: Arc<CallbackMemoryMetricsInner>,
}
type UpdateCallback = Arc<dyn Fn(usize) + Send + Sync>;
type RejectCallback = Arc<dyn Fn() + Send + Sync>;
struct CallbackMemoryMetricsInner {
on_update: Option<UpdateCallback>,
on_reject: Option<RejectCallback>,
}
impl CallbackMemoryMetrics {
fn new(on_update: Option<UpdateCallback>, on_reject: Option<RejectCallback>) -> Self {
Self {
inner: Arc::new(CallbackMemoryMetricsInner {
on_update,
on_reject,
}),
}
}
/// Release tracked memory.
///
/// # Arguments
/// * `amount` - Amount of memory to release in bytes
pub fn release(&self, amount: usize) {
if let Ok(old_value) =
self.current
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
Some(current.saturating_sub(amount))
})
&& let Some(callback) = &self.on_update
{
callback(old_value.saturating_sub(amount));
fn has_on_update(&self) -> bool {
self.inner.on_update.is_some()
}
fn has_on_reject(&self) -> bool {
self.inner.on_reject.is_some()
}
}
impl MemoryMetrics for CallbackMemoryMetrics {
fn set_limit(&self, _: i64) {}
fn set_in_use(&self, bytes: i64) {
if let Some(callback) = &self.inner.on_update {
callback(bytes.max(0) as usize);
}
}
fn inc_rejected(&self, _: &str) {
if let Some(callback) = &self.inner.on_reject {
callback();
}
}
}
@@ -741,38 +658,107 @@ impl QueryMemoryTracker {
/// A wrapper stream that tracks memory usage of RecordBatches.
pub struct MemoryTrackedStream {
inner: SendableRecordBatchStream,
permit: Arc<MemoryPermit>,
// Total tracked size, released when stream drops.
total_tracked: usize,
tracker: Option<StreamMemoryTracker>,
// Waiting stores a batch that has already been pulled from the inner stream but has not yet
// acquired additional quota. This keeps `poll_next()` non-blocking and allows bounded waits,
// at the cost of temporarily holding one untracked batch per blocked stream in memory.
waiting: Option<PendingTrackFuture>,
}
impl MemoryTrackedStream {
pub fn new(inner: SendableRecordBatchStream, permit: Arc<MemoryPermit>) -> Self {
pub fn new(inner: SendableRecordBatchStream, tracker: QueryMemoryTracker) -> Self {
Self {
inner,
permit,
total_tracked: 0,
tracker: Some(tracker.new_stream_tracker()),
waiting: None,
}
}
fn ready_tracker_mut(&mut self) -> &mut StreamMemoryTracker {
debug_assert!(
self.waiting.is_none(),
"a ready tracker must not coexist with a waiting future"
);
self.tracker.as_mut().unwrap()
}
fn enter_waiting(&mut self, batch: RecordBatch, additional: usize) {
debug_assert!(
self.waiting.is_none(),
"enter_waiting should only be called from the ready state"
);
debug_assert!(
self.tracker.is_some(),
"enter_waiting requires a tracker in the ready state"
);
let tracker = self.tracker.take().unwrap();
self.waiting = Some(Self::start_waiting(tracker, batch, additional));
}
fn start_waiting(
tracker: StreamMemoryTracker,
batch: RecordBatch,
additional: usize,
) -> PendingTrackFuture {
Box::pin(async move {
let (tracker, result) = tracker.track_with_policy(additional).await;
(tracker, batch, additional, result)
})
}
fn poll_waiting(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
let future = self.waiting.as_mut().unwrap();
match future.as_mut().poll(cx) {
Poll::Ready((tracker, batch, additional, result)) => {
let output = match result {
Ok(()) => Ok(batch),
Err(error) => Err(tracker.wait_error(additional, error)),
};
self.waiting = None;
self.tracker = Some(tracker);
Poll::Ready(Some(output))
}
Poll::Pending => Poll::Pending,
}
}
fn poll_batch(
&mut self,
batch: RecordBatch,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
let additional = batch.buffer_memory_size();
let tracker = self.ready_tracker_mut();
if let Err(error) = tracker.try_track(additional) {
match tracker.tracker.on_exhausted_policy {
OnExhaustedPolicy::Fail => return Poll::Ready(Some(Err(error))),
// `Wait` is a deliberate tradeoff: the batch has already been materialized, so we
// keep it in memory while waiting for quota instead of failing immediately. Under
// contention, real memory usage can therefore exceed `scan_memory_limit` by up to
// one buffered batch per blocked stream.
OnExhaustedPolicy::Wait { .. } => {
self.enter_waiting(batch, additional);
return self.poll_waiting(cx);
}
}
}
Poll::Ready(Some(Ok(batch)))
}
}
impl Stream for MemoryTrackedStream {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.waiting.is_some() {
return self.poll_waiting(cx);
}
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(batch))) => {
let additional = batch.buffer_memory_size();
if let Err(e) = self.permit.track(additional, self.total_tracked) {
return Poll::Ready(Some(Err(e)));
}
self.total_tracked += additional;
Poll::Ready(Some(Ok(batch)))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(batch))) => self.poll_batch(batch, cx),
Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
@@ -783,14 +769,6 @@ impl Stream for MemoryTrackedStream {
}
}
impl Drop for MemoryTrackedStream {
fn drop(&mut self) {
if self.total_tracked > 0 {
self.permit.release(self.total_tracked);
}
}
}
impl RecordBatchStream for MemoryTrackedStream {
fn schema(&self) -> SchemaRef {
self.inner.schema()
@@ -808,13 +786,34 @@ impl RecordBatchStream for MemoryTrackedStream {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use common_memory_manager::{OnExhaustedPolicy, PermitGranularity};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{BooleanVector, Int32Vector, StringVector};
use futures::StreamExt;
use tokio::time::{sleep, timeout};
use super::*;
fn large_string_batch(bytes: usize) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"payload",
ConcreteDataType::string_datatype(),
false,
)]));
let payload = "x".repeat(bytes);
let vector: VectorRef = Arc::new(StringVector::from(vec![payload]));
RecordBatch::new(schema, vec![vector]).unwrap()
}
fn aligned_tracked_bytes(bytes: usize) -> usize {
PermitGranularity::Kilobyte
.permits_to_bytes(PermitGranularity::Kilobyte.bytes_to_permits(bytes as u64))
as usize
}
#[test]
fn test_recordbatches_try_from_columns() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
@@ -896,156 +895,168 @@ mod tests {
assert_eq!(collected[1], batch2);
}
const MB: usize = 1024 * 1024;
#[test]
fn test_query_memory_tracker_basic() {
let tracker = Arc::new(QueryMemoryTracker::new(1000, 0));
let tracker =
Arc::new(QueryMemoryTracker::builder(10 * MB, OnExhaustedPolicy::Fail).build());
// Register first stream - should get privileged status
let permit1 = tracker.register_permit();
assert!(permit1.is_privileged());
let mut stream1 = tracker.new_stream_tracker();
assert!(stream1.try_track(5 * MB).is_ok());
assert_eq!(tracker.current(), 5 * MB);
// Privileged stream can use up to limit
assert!(permit1.track(500, 0).is_ok());
assert_eq!(tracker.current(), 500);
let mut stream2 = tracker.new_stream_tracker();
assert!(stream2.try_track(4 * MB).is_ok());
assert_eq!(tracker.current(), 9 * MB);
// Register second stream - also privileged
let permit2 = tracker.register_permit();
assert!(permit2.is_privileged());
// Can add more but cannot exceed hard limit (1000)
assert!(permit2.track(400, 0).is_ok());
assert_eq!(tracker.current(), 900);
permit1.release(500);
permit2.release(400);
drop(stream1);
drop(stream2);
assert_eq!(tracker.current(), 0);
}
#[test]
fn test_query_memory_tracker_privileged_limit() {
// Privileged slots = 2 for easy testing
// Limit: 1000, standard-tier fraction: 0.7 (default)
// Privileged can push global to 1000, standard-tier can push global to 700
let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 2));
fn test_query_memory_tracker_shared_global_limit() {
let tracker =
Arc::new(QueryMemoryTracker::builder(10 * MB, OnExhaustedPolicy::Fail).build());
let mut stream1 = tracker.new_stream_tracker();
let mut stream2 = tracker.new_stream_tracker();
// First 2 streams are privileged
let permit1 = tracker.register_permit();
let permit2 = tracker.register_permit();
assert!(permit1.is_privileged());
assert!(permit2.is_privileged());
assert!(stream1.try_track(3 * MB).is_ok());
assert_eq!(tracker.current(), 3 * MB);
assert!(stream2.try_track(6 * MB).is_ok());
assert_eq!(tracker.current(), 9 * MB);
// Third stream is standard-tier (not privileged)
let permit3 = tracker.register_permit();
assert!(!permit3.is_privileged());
// Privileged stream uses some memory
assert!(permit1.track(300, 0).is_ok());
assert_eq!(tracker.current(), 300);
// Standard-tier can add up to 400 (total becomes 700, its effective limit)
assert!(permit3.track(400, 0).is_ok());
assert_eq!(tracker.current(), 700);
// Standard-tier stream cannot push global beyond 700
let err = permit3.track(100, 400).unwrap_err();
let err = stream2.try_track(2 * MB).unwrap_err();
let err_msg = err.to_string();
assert!(err_msg.contains("400B used by this stream"));
assert!(err_msg.contains("effective limit: 700B (70%)"));
assert!(err_msg.contains("700B used globally (70%)"));
assert_eq!(tracker.current(), 700);
assert!(err_msg.contains("6.0MiB used by this stream"));
assert!(err_msg.contains("9.0MiB used globally (90%)"));
assert!(err_msg.contains("hard limit: 10.0MiB"));
assert_eq!(tracker.current(), 9 * MB);
permit1.release(300);
permit3.release(400);
drop(stream1);
assert_eq!(tracker.current(), 6 * MB);
drop(stream2);
assert_eq!(tracker.current(), 0);
}
#[test]
fn test_query_memory_tracker_promotion() {
// Privileged slots = 1 for easy testing
let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 1));
fn test_query_memory_tracker_hard_limit() {
let tracker =
Arc::new(QueryMemoryTracker::builder(10 * MB, OnExhaustedPolicy::Fail).build());
let mut stream = tracker.new_stream_tracker();
// First stream is privileged
let permit1 = tracker.register_permit();
assert!(permit1.is_privileged());
assert!(stream.try_track(9 * MB).is_ok());
assert_eq!(tracker.current(), 9 * MB);
// Second stream is standard-tier (can only use 500)
let permit2 = tracker.register_permit();
assert!(!permit2.is_privileged());
assert!(stream.try_track(2 * MB).is_err());
assert_eq!(tracker.current(), 9 * MB);
// Standard-tier can only track 500
assert!(permit2.track(400, 0).is_ok());
assert_eq!(tracker.current(), 400);
assert!(stream.try_track(MB).is_ok());
assert_eq!(tracker.current(), 10 * MB);
// Drop first permit to release privileged slot
drop(permit1);
assert!(stream.try_track(MB).is_err());
assert_eq!(tracker.current(), 10 * MB);
// Second stream can now be promoted and use more memory
assert!(permit2.track(500, 400).is_ok());
assert!(permit2.is_privileged());
assert_eq!(tracker.current(), 900);
permit2.release(900);
drop(stream);
assert_eq!(tracker.current(), 0);
}
#[test]
fn test_query_memory_tracker_privileged_hard_limit() {
// Test that the configured limit is absolute hard limit for all streams
// Privileged: can use full limit (1000)
// Standard-tier: can use 0.7x limit (700 with defaults)
let tracker = Arc::new(QueryMemoryTracker::new(1000, 0));
fn test_query_memory_tracker_unlimited() {
let tracker = Arc::new(QueryMemoryTracker::builder(0, OnExhaustedPolicy::Fail).build());
let mut stream = tracker.new_stream_tracker();
let permit1 = tracker.register_permit();
assert!(permit1.is_privileged());
// Privileged can use up to full limit (1000)
assert!(permit1.track(900, 0).is_ok());
assert_eq!(tracker.current(), 900);
// Privileged cannot exceed hard limit (1000)
assert!(permit1.track(200, 900).is_err());
assert_eq!(tracker.current(), 900);
// Can add within hard limit
assert!(permit1.track(100, 900).is_ok());
assert_eq!(tracker.current(), 1000);
// Cannot exceed even by 1 byte
assert!(permit1.track(1, 1000).is_err());
assert_eq!(tracker.current(), 1000);
permit1.release(1000);
assert!(stream.try_track(10 * MB).is_ok());
assert_eq!(tracker.current(), 10 * MB);
drop(stream);
assert_eq!(tracker.current(), 0);
}
#[test]
fn test_query_memory_tracker_standard_tier_fraction() {
// Test standard-tier streams use fraction of limit
// Limit: 1000, default fraction: 0.7, so standard-tier can use 700
let tracker = Arc::new(QueryMemoryTracker::with_privileged_slots(1000, 1));
fn test_query_memory_tracker_rounds_to_kilobytes() {
let tracker =
Arc::new(QueryMemoryTracker::builder(10 * MB, OnExhaustedPolicy::Fail).build());
let mut stream = tracker.new_stream_tracker();
let permit1 = tracker.register_permit();
assert!(permit1.is_privileged());
assert!(stream.try_track(1_537).is_ok());
assert_eq!(tracker.current(), 2 * 1024);
let permit2 = tracker.register_permit();
assert!(!permit2.is_privileged());
// Standard-tier can use up to 700 (1000 * 0.7 default)
assert!(permit2.track(600, 0).is_ok());
assert_eq!(tracker.current(), 600);
// Cannot exceed standard-tier limit (700)
assert!(permit2.track(200, 600).is_err());
assert_eq!(tracker.current(), 600);
// Can add within standard-tier limit
assert!(permit2.track(100, 600).is_ok());
assert_eq!(tracker.current(), 700);
// Cannot exceed standard-tier limit
assert!(permit2.track(1, 700).is_err());
assert_eq!(tracker.current(), 700);
permit2.release(700);
drop(stream);
assert_eq!(tracker.current(), 0);
}
#[tokio::test]
async fn test_memory_tracked_stream_waits_for_capacity() {
let tracker = QueryMemoryTracker::builder(
MB,
OnExhaustedPolicy::Wait {
timeout: Duration::from_millis(200),
},
)
.build();
let batch = large_string_batch(700 * 1024);
let expected_bytes = aligned_tracked_bytes(batch.buffer_memory_size());
let mut stream1 = MemoryTrackedStream::new(
RecordBatches::try_new(batch.schema.clone(), vec![batch.clone()])
.unwrap()
.as_stream(),
tracker.clone(),
);
let first = stream1.next().await.unwrap().unwrap();
assert_eq!(first.num_rows(), 1);
assert_eq!(tracker.current(), expected_bytes);
let stream2 = MemoryTrackedStream::new(
RecordBatches::try_new(batch.schema.clone(), vec![batch])
.unwrap()
.as_stream(),
tracker.clone(),
);
let waiter = tokio::spawn(async move {
let mut stream2 = stream2;
stream2.next().await.unwrap()
});
sleep(Duration::from_millis(50)).await;
assert!(!waiter.is_finished());
drop(stream1);
let second = waiter.await.unwrap().unwrap();
assert_eq!(second.num_rows(), 1);
}
#[tokio::test]
async fn test_memory_tracked_stream_wait_times_out() {
let tracker = QueryMemoryTracker::builder(
MB,
OnExhaustedPolicy::Wait {
timeout: Duration::from_millis(50),
},
)
.build();
let batch = large_string_batch(700 * 1024);
let mut stream1 = MemoryTrackedStream::new(
RecordBatches::try_new(batch.schema.clone(), vec![batch.clone()])
.unwrap()
.as_stream(),
tracker.clone(),
);
let first = stream1.next().await.unwrap().unwrap();
assert_eq!(first.num_rows(), 1);
let mut stream2 = MemoryTrackedStream::new(
RecordBatches::try_new(batch.schema.clone(), vec![batch])
.unwrap()
.as_stream(),
tracker,
);
let result = timeout(Duration::from_secs(1), stream2.next())
.await
.unwrap();
let error = result.unwrap().unwrap_err();
assert!(error.to_string().contains("timed out waiting"));
}
}

View File

@@ -538,7 +538,6 @@ impl DatanodeBuilder {
file_ref_manager,
partition_expr_fetcher.clone(),
plugins,
opts.max_concurrent_queries,
);
#[cfg(feature = "enterprise")]
@@ -581,7 +580,6 @@ impl DatanodeBuilder {
file_ref_manager,
partition_expr_fetcher,
plugins,
opts.max_concurrent_queries,
);
#[cfg(feature = "enterprise")]
@@ -603,7 +601,6 @@ impl DatanodeBuilder {
file_ref_manager,
partition_expr_fetcher.clone(),
plugins,
opts.max_concurrent_queries,
);
#[cfg(feature = "enterprise")]

View File

@@ -151,6 +151,11 @@ pub struct MitoConfig {
/// Memory limit for table scans across all queries. Setting it to 0 disables the limit.
/// Supports absolute size (e.g., "2GB") or percentage (e.g., "50%").
pub scan_memory_limit: MemoryLimit,
/// Behavior when scan memory tracking cannot acquire memory from the budget.
/// `wait` means `wait(10s)`, not unlimited waiting.
/// Defaults to [`OnExhaustedPolicy::Fail`], which intentionally differs from
/// [`OnExhaustedPolicy::default()`].
pub scan_memory_on_exhausted: OnExhaustedPolicy,
/// Index configs.
pub index: IndexConfig,
@@ -216,6 +221,7 @@ impl Default for MitoConfig {
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
allow_stale_entries: false,
scan_memory_limit: MemoryLimit::default(),
scan_memory_on_exhausted: OnExhaustedPolicy::Fail,
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),

View File

@@ -95,7 +95,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::error::UnexpectedSnafu;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
use common_recordbatch::{QueryMemoryTracker, SendableRecordBatchStream};
use common_stat::get_total_memory_bytes;
use common_telemetry::{info, tracing, warn};
use common_wal::options::WalOptions;
@@ -167,7 +167,6 @@ pub struct MitoEngineBuilder<'a, S: LogStore> {
file_ref_manager: FileReferenceManagerRef,
partition_expr_fetcher: PartitionExprFetcherRef,
plugins: Plugins,
max_concurrent_queries: usize,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: Option<BoxedExtensionRangeProviderFactory>,
}
@@ -183,7 +182,6 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
file_ref_manager: FileReferenceManagerRef,
partition_expr_fetcher: PartitionExprFetcherRef,
plugins: Plugins,
max_concurrent_queries: usize,
) -> Self {
Self {
data_home,
@@ -194,7 +192,6 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
file_ref_manager,
plugins,
partition_expr_fetcher,
max_concurrent_queries,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
}
@@ -230,13 +227,14 @@ impl<'a, S: LogStore> MitoEngineBuilder<'a, S> {
let total_memory = get_total_memory_bytes().max(0) as u64;
let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
let scan_memory_tracker =
QueryMemoryTracker::new(scan_memory_limit, self.max_concurrent_queries)
.with_on_update(|usage| {
QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
.on_update(|usage| {
SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
})
.with_on_reject(|| {
.on_reject(|| {
SCAN_REQUESTS_REJECTED_TOTAL.inc();
});
})
.build();
let inner = EngineInner {
workers,
@@ -285,7 +283,6 @@ impl MitoEngine {
file_ref_manager,
partition_expr_fetcher,
plugins,
0, // Default: no limit on concurrent queries
);
builder.try_build().await
}
@@ -1212,8 +1209,8 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}
fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
Some(Arc::new(self.inner.scan_memory_tracker.register_permit()))
fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
Some(self.inner.scan_memory_tracker.clone())
}
async fn get_committed_sequence(
@@ -1378,13 +1375,15 @@ impl MitoEngine {
let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone()));
let total_memory = get_total_memory_bytes().max(0) as u64;
let scan_memory_limit = config.scan_memory_limit.resolve(total_memory) as usize;
let scan_memory_tracker = QueryMemoryTracker::new(scan_memory_limit, 0)
.with_on_update(|usage| {
SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
})
.with_on_reject(|| {
SCAN_REQUESTS_REJECTED_TOTAL.inc();
});
let scan_memory_tracker =
QueryMemoryTracker::builder(scan_memory_limit, config.scan_memory_on_exhausted)
.on_update(|usage| {
SCAN_MEMORY_USAGE_BYTES.set(usage as i64);
})
.on_reject(|| {
SCAN_REQUESTS_REJECTED_TOTAL.inc();
})
.build();
Ok(MitoEngine {
inner: Arc::new(EngineInner {
workers: WorkerGroup::start_for_test(

View File

@@ -187,8 +187,8 @@ impl TableProvider for DummyTableProvider {
.handle_query(self.region_id, request.clone())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let query_memory_permit = self.engine.register_query_memory_permit();
let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_permit)?;
let query_memory_tracker = self.engine.query_memory_tracker();
let mut scan_exec = RegionScanExec::new(scanner, request, query_memory_tracker)?;
if let Some(query_ctx) = &self.query_ctx {
scan_exec.set_explain_verbose(query_ctx.explain_verbose());
}

View File

@@ -23,7 +23,7 @@ use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole
use api::region::RegionResponse;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream};
use common_recordbatch::{EmptyRecordBatchStream, QueryMemoryTracker, SendableRecordBatchStream};
use common_time::Timestamp;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PhysicalExpr};
@@ -886,8 +886,8 @@ pub trait RegionEngine: Send + Sync {
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError>;
/// Registers and returns a query memory permit.
fn register_query_memory_permit(&self) -> Option<Arc<MemoryPermit>> {
/// Returns the query memory tracker for scan execution.
fn query_memory_tracker(&self) -> Option<QueryMemoryTracker> {
None
}

View File

@@ -20,7 +20,7 @@ use std::time::Instant;
use common_error::ext::BoxedError;
use common_recordbatch::{
DfRecordBatch, DfSendableRecordBatchStream, MemoryPermit, MemoryTrackedStream,
DfRecordBatch, DfSendableRecordBatchStream, MemoryTrackedStream, QueryMemoryTracker,
SendableRecordBatchStream,
};
use common_telemetry::tracing::Span;
@@ -67,7 +67,7 @@ pub struct RegionScanExec {
// TODO(ruihang): handle TimeWindowed dist via this parameter
distribution: Option<TimeSeriesDistribution>,
explain_verbose: bool,
query_memory_permit: Option<Arc<MemoryPermit>>,
query_memory_tracker: Option<QueryMemoryTracker>,
}
impl std::fmt::Debug for RegionScanExec {
@@ -91,7 +91,7 @@ impl RegionScanExec {
pub fn new(
scanner: RegionScannerRef,
request: ScanRequest,
query_memory_permit: Option<Arc<MemoryPermit>>,
query_memory_tracker: Option<QueryMemoryTracker>,
) -> DfResult<Self> {
let arrow_schema = scanner.schema().arrow_schema().clone();
let scanner_props = scanner.properties();
@@ -226,7 +226,7 @@ impl RegionScanExec {
is_partition_set: false,
distribution: request.distribution,
explain_verbose: false,
query_memory_permit,
query_memory_tracker,
})
}
@@ -299,7 +299,7 @@ impl RegionScanExec {
is_partition_set: true,
distribution: self.distribution,
explain_verbose: self.explain_verbose,
query_memory_permit: self.query_memory_permit.clone(),
query_memory_tracker: self.query_memory_tracker.clone(),
})
}
@@ -387,8 +387,8 @@ impl ExecutionPlan for RegionScanExec {
.scan_partition(&ctx, &self.metric, partition)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream = if let Some(permit) = &self.query_memory_permit {
Box::pin(MemoryTrackedStream::new(stream, permit.clone()))
let stream = if let Some(tracker) = &self.query_memory_tracker {
Box::pin(MemoryTrackedStream::new(stream, tracker.clone()))
} else {
stream
};

View File

@@ -1548,6 +1548,7 @@ sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
max_concurrent_scan_files = 384
allow_stale_entries = false
scan_memory_on_exhausted = "fail"
min_compaction_interval = "0s"
default_experimental_flat_format = false