diff --git a/Cargo.lock b/Cargo.lock
index aac00fdc98..6781ac3895 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7672,6 +7672,7 @@ dependencies = [
"either",
"futures",
"greptime-proto",
+ "humantime",
"humantime-serde",
"index",
"itertools 0.14.0",
diff --git a/config/config.md b/config/config.md
index 8b499ca7ee..dc663cead4 100644
--- a/config/config.md
+++ b/config/config.md
@@ -141,6 +141,8 @@
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
+| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
+| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "skip" |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
@@ -521,6 +523,8 @@
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
+| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
+| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "skip" |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index bb769c4625..0159fc237b 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -452,6 +452,15 @@ compress_manifest = false
## @toml2docs:none-default="Auto"
#+ max_background_purges = 8
+## Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
+## @toml2docs:none-default="0"
+#+ experimental_compaction_memory_limit = "0"
+
+## Behavior when compaction cannot acquire memory from the budget.
+## Options: "wait" (default, 10s), "wait()", "skip"
+## @toml2docs:none-default="wait"
+#+ experimental_compaction_on_exhausted = "wait"
+
## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 661067d2a1..653b0cb4af 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -546,6 +546,15 @@ compress_manifest = false
## @toml2docs:none-default="Auto"
#+ max_background_purges = 8
+## Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
+## @toml2docs:none-default="0"
+#+ experimental_compaction_memory_limit = "0"
+
+## Behavior when compaction cannot acquire memory from the budget.
+## Options: "wait" (default, 10s), "wait()", "skip"
+## @toml2docs:none-default="wait"
+#+ experimental_compaction_on_exhausted = "wait"
+
## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs
index 61b47552eb..0a3f27b77e 100644
--- a/src/cmd/src/datanode/objbench.rs
+++ b/src/cmd/src/datanode/objbench.rs
@@ -145,6 +145,17 @@ impl ObjbenchCommand {
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
let num_row_groups = parquet_meta.num_row_groups() as u64;
+ let max_row_group_uncompressed_size: u64 = parquet_meta
+ .row_groups()
+ .iter()
+ .map(|rg| {
+ rg.columns()
+ .iter()
+ .map(|c| c.uncompressed_size() as u64)
+ .sum::()
+ })
+ .max()
+ .unwrap_or(0);
println!(
"{} Metadata loaded - rows: {}, size: {} bytes",
@@ -160,6 +171,7 @@ impl ObjbenchCommand {
time_range: Default::default(),
level: 0,
file_size,
+ max_row_group_uncompressed_size,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml
index a3686251bb..68d996723e 100644
--- a/src/mito2/Cargo.toml
+++ b/src/mito2/Cargo.toml
@@ -48,6 +48,7 @@ dotenv.workspace = true
either.workspace = true
futures.workspace = true
humantime-serde.workspace = true
+humantime.workspace = true
index.workspace = true
itertools.workspace = true
greptime-proto.workspace = true
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 4fe93f11fb..bda22ac652 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -14,6 +14,7 @@
mod buckets;
pub mod compactor;
+pub mod memory_manager;
pub mod picker;
pub mod run;
mod task;
@@ -46,7 +47,8 @@ use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
-use crate::compaction::picker::{CompactionTask, new_picker};
+use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy};
+use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
@@ -104,12 +106,15 @@ pub(crate) struct CompactionScheduler {
request_sender: Sender,
cache_manager: CacheManagerRef,
engine_config: Arc,
+ memory_manager: Arc,
+ memory_policy: OnExhaustedPolicy,
listener: WorkerListener,
/// Plugins for the compaction scheduler.
plugins: Plugins,
}
impl CompactionScheduler {
+ #[allow(clippy::too_many_arguments)]
pub(crate) fn new(
scheduler: SchedulerRef,
request_sender: Sender,
@@ -117,6 +122,8 @@ impl CompactionScheduler {
engine_config: Arc,
listener: WorkerListener,
plugins: Plugins,
+ memory_manager: Arc,
+ memory_policy: OnExhaustedPolicy,
) -> Self {
Self {
scheduler,
@@ -124,6 +131,8 @@ impl CompactionScheduler {
request_sender,
cache_manager,
engine_config,
+ memory_manager,
+ memory_policy,
listener,
plugins,
}
@@ -429,7 +438,8 @@ impl CompactionScheduler {
};
// Create a local compaction task.
- let mut local_compaction_task = Box::new(CompactionTaskImpl {
+ let estimated_bytes = estimate_compaction_bytes(&picker_output);
+ let local_compaction_task = Box::new(CompactionTaskImpl {
request_sender,
waiters,
start_time,
@@ -437,18 +447,27 @@ impl CompactionScheduler {
picker_output,
compaction_region,
compactor: Arc::new(DefaultCompactor {}),
+ memory_manager: self.memory_manager.clone(),
+ memory_policy: self.memory_policy,
+ estimated_memory_bytes: estimated_bytes,
});
- // Submit the compaction task.
+ self.submit_compaction_task(local_compaction_task, region_id)
+ }
+
+ fn submit_compaction_task(
+ &mut self,
+ mut task: Box,
+ region_id: RegionId,
+ ) -> Result<()> {
self.scheduler
.schedule(Box::pin(async move {
INFLIGHT_COMPACTION_COUNT.inc();
- local_compaction_task.run().await;
+ task.run().await;
INFLIGHT_COMPACTION_COUNT.dec();
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);
- // If failed to submit the job, we need to remove the region from the scheduler.
self.region_status.remove(®ion_id);
e
})
@@ -758,6 +777,20 @@ fn get_expired_ssts(
.collect()
}
+/// Estimates compaction memory as the sum of all input files' maximum row-group
+/// uncompressed sizes.
+fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
+ picker_output
+ .outputs
+ .iter()
+ .flat_map(|output| output.inputs.iter())
+ .map(|file: &FileHandle| {
+ let meta = file.meta_ref();
+ meta.max_row_group_uncompressed_size
+ })
+ .sum()
+}
+
/// Pending compaction request that is supposed to run after current task is finished,
/// typically used for manual compactions.
struct PendingCompaction {
@@ -773,7 +806,7 @@ struct PendingCompaction {
mod tests {
use api::v1::region::StrictWindow;
use common_datasource::compression::CompressionType;
- use tokio::sync::oneshot;
+ use tokio::sync::{Barrier, oneshot};
use super::*;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
@@ -1145,4 +1178,39 @@ mod tests {
assert_eq!(result.unwrap(), 0); // is there a better way to check this?
assert_eq!(0, scheduler.region_status.len());
}
+
+ #[tokio::test]
+ async fn test_concurrent_memory_competition() {
+ let manager = Arc::new(CompactionMemoryManager::new(3 * 1024 * 1024)); // 3MB
+ let barrier = Arc::new(Barrier::new(3));
+ let mut handles = vec![];
+
+ // Spawn 3 tasks competing for memory, each trying to acquire 2MB
+ for _i in 0..3 {
+ let mgr = manager.clone();
+ let bar = barrier.clone();
+ let handle = tokio::spawn(async move {
+ bar.wait().await; // Synchronize start
+ mgr.try_acquire(2 * 1024 * 1024)
+ });
+ handles.push(handle);
+ }
+
+ let results: Vec<_> = futures::future::join_all(handles)
+ .await
+ .into_iter()
+ .map(|r| r.unwrap())
+ .collect();
+
+ // Only 1 should succeed (3MB limit, 2MB request, can only fit one)
+ let succeeded = results.iter().filter(|r| r.is_some()).count();
+ let failed = results.iter().filter(|r| r.is_none()).count();
+
+ assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
+ assert_eq!(failed, 2, "Expected 2 tasks to fail");
+
+ // Clean up
+ drop(results);
+ assert_eq!(manager.used_bytes(), 0);
+ }
}
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 3945c2d31e..4f9089c13d 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -396,6 +396,7 @@ impl DefaultCompactor {
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
+ max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
diff --git a/src/mito2/src/compaction/memory_manager.rs b/src/mito2/src/compaction/memory_manager.rs
new file mode 100644
index 0000000000..831e4c4e8b
--- /dev/null
+++ b/src/mito2/src/compaction/memory_manager.rs
@@ -0,0 +1,638 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+use std::{fmt, mem};
+
+use common_telemetry::debug;
+use humantime::{format_duration, parse_duration};
+use serde::{Deserialize, Serialize};
+use snafu::ensure;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
+
+use crate::error::{
+ CompactionMemoryLimitExceededSnafu, CompactionMemorySemaphoreClosedSnafu, Result,
+};
+use crate::metrics::{
+ COMPACTION_MEMORY_IN_USE, COMPACTION_MEMORY_LIMIT, COMPACTION_MEMORY_REJECTED,
+};
+
+/// Minimum bytes controlled by one semaphore permit.
+const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
+
+/// Default wait timeout for compaction memory.
+pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Defines how to react when compaction cannot acquire enough memory.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum OnExhaustedPolicy {
+ /// Wait until enough memory is released, bounded by timeout.
+ Wait { timeout: Duration },
+ /// Skip the compaction if memory is not immediately available.
+ Skip,
+}
+
+impl Default for OnExhaustedPolicy {
+ fn default() -> Self {
+ OnExhaustedPolicy::Wait {
+ timeout: DEFAULT_MEMORY_WAIT_TIMEOUT,
+ }
+ }
+}
+
+impl Serialize for OnExhaustedPolicy {
+ fn serialize(&self, serializer: S) -> Result
+ where
+ S: serde::Serializer,
+ {
+ let text = match self {
+ OnExhaustedPolicy::Skip => "skip".to_string(),
+ OnExhaustedPolicy::Wait { timeout } if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT => {
+ "wait".to_string()
+ }
+ OnExhaustedPolicy::Wait { timeout } => {
+ format!("wait({})", format_duration(*timeout))
+ }
+ };
+ serializer.serialize_str(&text)
+ }
+}
+
+impl<'de> Deserialize<'de> for OnExhaustedPolicy {
+ fn deserialize(deserializer: D) -> Result
+ where
+ D: serde::Deserializer<'de>,
+ {
+ let raw = String::deserialize(deserializer)?;
+ let lower = raw.to_ascii_lowercase();
+
+ if lower == "skip" {
+ return Ok(OnExhaustedPolicy::Skip);
+ }
+ if lower == "wait" {
+ return Ok(OnExhaustedPolicy::default());
+ }
+ if lower.starts_with("wait(") && lower.ends_with(')') {
+ let inner = &raw[5..raw.len() - 1];
+ let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?;
+ return Ok(OnExhaustedPolicy::Wait { timeout });
+ }
+
+ Err(serde::de::Error::custom(format!(
+ "invalid compaction memory policy: {}, expected wait, wait() or skip",
+ raw
+ )))
+ }
+}
+
+/// Global memory manager for compaction tasks.
+#[derive(Clone)]
+pub struct CompactionMemoryManager {
+ quota: Option,
+}
+
+/// Shared memory quota state across all compaction guards.
+#[derive(Clone)]
+struct MemoryQuota {
+ semaphore: Arc,
+ // Maximum permits (aligned to PERMIT_GRANULARITY_BYTES).
+ limit_permits: u32,
+}
+
+impl CompactionMemoryManager {
+ /// Creates a new memory manager with the given limit in bytes.
+ /// `limit_bytes = 0` disables the limit.
+ pub fn new(limit_bytes: u64) -> Self {
+ if limit_bytes == 0 {
+ COMPACTION_MEMORY_LIMIT.set(0);
+ return Self { quota: None };
+ }
+
+ let limit_permits = bytes_to_permits(limit_bytes);
+ let limit_aligned_bytes = permits_to_bytes(limit_permits);
+ COMPACTION_MEMORY_LIMIT.set(limit_aligned_bytes as i64);
+
+ Self {
+ quota: Some(MemoryQuota {
+ semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
+ limit_permits,
+ }),
+ }
+ }
+
+ /// Returns the configured limit in bytes (0 if unlimited).
+ pub fn limit_bytes(&self) -> u64 {
+ self.quota
+ .as_ref()
+ .map(|quota| permits_to_bytes(quota.limit_permits))
+ .unwrap_or(0)
+ }
+
+ /// Returns currently used bytes.
+ pub fn used_bytes(&self) -> u64 {
+ self.quota
+ .as_ref()
+ .map(|quota| permits_to_bytes(quota.used_permits()))
+ .unwrap_or(0)
+ }
+
+ /// Returns available bytes.
+ pub fn available_bytes(&self) -> u64 {
+ self.quota
+ .as_ref()
+ .map(|quota| permits_to_bytes(quota.available_permits_clamped()))
+ .unwrap_or(0)
+ }
+
+ /// Acquires memory, waiting if necessary until enough is available.
+ ///
+ /// # Errors
+ /// - Returns error if requested bytes exceed the total limit
+ /// - Returns error if the semaphore is unexpectedly closed
+ pub async fn acquire(&self, bytes: u64) -> Result {
+ match &self.quota {
+ None => Ok(CompactionMemoryGuard::unlimited()),
+ Some(quota) => {
+ let permits = bytes_to_permits(bytes);
+
+ // Fail-fast: reject if request exceeds total limit.
+ ensure!(
+ permits <= quota.limit_permits,
+ CompactionMemoryLimitExceededSnafu {
+ requested_bytes: bytes,
+ limit_bytes: permits_to_bytes(quota.limit_permits),
+ }
+ );
+
+ let permit = quota
+ .semaphore
+ .clone()
+ .acquire_many_owned(permits)
+ .await
+ .map_err(|_| CompactionMemorySemaphoreClosedSnafu.build())?;
+ quota.update_in_use_metric();
+ Ok(CompactionMemoryGuard::limited(permit, quota.clone()))
+ }
+ }
+ }
+
+ /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
+ pub fn try_acquire(&self, bytes: u64) -> Option {
+ match &self.quota {
+ None => Some(CompactionMemoryGuard::unlimited()),
+ Some(quota) => {
+ let permits = bytes_to_permits(bytes);
+
+ match quota.semaphore.clone().try_acquire_many_owned(permits) {
+ Ok(permit) => {
+ quota.update_in_use_metric();
+ Some(CompactionMemoryGuard::limited(permit, quota.clone()))
+ }
+ Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
+ COMPACTION_MEMORY_REJECTED
+ .with_label_values(&["try_acquire"])
+ .inc();
+ None
+ }
+ }
+ }
+ }
+ }
+}
+
+impl MemoryQuota {
+ fn used_permits(&self) -> u32 {
+ self.limit_permits
+ .saturating_sub(self.available_permits_clamped())
+ }
+
+ fn available_permits_clamped(&self) -> u32 {
+ // Clamp to limit_permits to ensure we never report more available permits
+ // than our configured limit, even if semaphore state becomes inconsistent.
+ self.semaphore
+ .available_permits()
+ .min(self.limit_permits as usize) as u32
+ }
+
+ fn update_in_use_metric(&self) {
+ let bytes = permits_to_bytes(self.used_permits());
+ COMPACTION_MEMORY_IN_USE.set(bytes as i64);
+ }
+}
+
+/// Guard representing a slice of reserved compaction memory.
+///
+/// Memory is automatically released when this guard is dropped.
+pub struct CompactionMemoryGuard {
+ state: GuardState,
+}
+
+enum GuardState {
+ Unlimited,
+ Limited {
+ // Holds all permits owned by this guard (base plus any additional).
+ // Additional requests merge into this permit and are released together on drop.
+ permit: OwnedSemaphorePermit,
+ // Memory quota for requesting additional permits and updating metrics.
+ quota: MemoryQuota,
+ },
+}
+
+impl CompactionMemoryGuard {
+ fn unlimited() -> Self {
+ Self {
+ state: GuardState::Unlimited,
+ }
+ }
+
+ fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota) -> Self {
+ Self {
+ state: GuardState::Limited { permit, quota },
+ }
+ }
+
+ /// Returns granted quota in bytes.
+ pub fn granted_bytes(&self) -> u64 {
+ match &self.state {
+ GuardState::Unlimited => 0,
+ GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
+ }
+ }
+
+ /// Tries to allocate additional memory during task execution.
+ ///
+ /// On success, merges the new memory into this guard and returns true.
+ /// On failure, returns false and leaves this guard unchanged.
+ ///
+ /// # Behavior
+ /// - Running tasks can request additional memory on top of their initial allocation
+ /// - If total memory (all tasks) would exceed limit, returns false immediately
+ /// - The task should gracefully handle false by failing or adjusting its strategy
+ /// - The additional memory is merged into this guard and released together on drop
+ pub fn request_additional(&mut self, bytes: u64) -> bool {
+ match &mut self.state {
+ GuardState::Unlimited => true,
+ GuardState::Limited { permit, quota } => {
+ // Early return for zero-byte requests (no-op)
+ if bytes == 0 {
+ return true;
+ }
+
+ let additional_permits = bytes_to_permits(bytes);
+
+ // Try to acquire additional permits from the quota
+ match quota
+ .semaphore
+ .clone()
+ .try_acquire_many_owned(additional_permits)
+ {
+ Ok(additional_permit) => {
+ // Merge into main permit
+ permit.merge(additional_permit);
+ quota.update_in_use_metric();
+
+ debug!("Allocated additional {} bytes", bytes);
+
+ true
+ }
+ Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
+ COMPACTION_MEMORY_REJECTED
+ .with_label_values(&["request_additional"])
+ .inc();
+ false
+ }
+ }
+ }
+ }
+ }
+
+ /// Releases a portion of granted memory back to the pool early,
+ /// before the guard is dropped.
+ ///
+ /// This is useful when a task's memory requirement decreases during execution
+ /// (e.g., after completing a memory-intensive phase). The guard remains valid
+ /// with reduced quota, and the task can continue running.
+ pub fn early_release_partial(&mut self, bytes: u64) -> bool {
+ match &mut self.state {
+ GuardState::Unlimited => true,
+ GuardState::Limited { permit, quota } => {
+ // Early return for zero-byte requests (no-op)
+ if bytes == 0 {
+ return true;
+ }
+
+ let release_permits = bytes_to_permits(bytes);
+
+ // Split out the permits we want to release
+ match permit.split(release_permits as usize) {
+ Some(released_permit) => {
+ let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
+
+ // Drop the split permit to return it to the quota
+ drop(released_permit);
+ quota.update_in_use_metric();
+
+ debug!(
+ "Early released {} bytes from compaction memory guard",
+ released_bytes
+ );
+
+ true
+ }
+ None => {
+ // Requested release exceeds granted amount
+ false
+ }
+ }
+ }
+ }
+ }
+}
+
+impl Drop for CompactionMemoryGuard {
+ fn drop(&mut self) {
+ if let GuardState::Limited { permit, quota } =
+ mem::replace(&mut self.state, GuardState::Unlimited)
+ {
+ let bytes = permits_to_bytes(permit.num_permits() as u32);
+
+ // Release permits before updating metrics to reflect latest usage.
+ drop(permit);
+ quota.update_in_use_metric();
+
+ debug!("Released compaction memory: {} bytes", bytes);
+ }
+ }
+}
+
+impl fmt::Debug for CompactionMemoryGuard {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("CompactionMemoryGuard")
+ .field("granted_bytes", &self.granted_bytes())
+ .finish()
+ }
+}
+
+fn bytes_to_permits(bytes: u64) -> u32 {
+ // Round up to the nearest permit.
+ // Returns 0 for bytes=0, which allows lazy allocation via request_additional().
+ // Non-zero bytes always round up to at least 1 permit due to the math.
+ bytes
+ .saturating_add(PERMIT_GRANULARITY_BYTES - 1)
+ .saturating_div(PERMIT_GRANULARITY_BYTES)
+ .min(Semaphore::MAX_PERMITS as u64)
+ .min(u32::MAX as u64) as u32
+}
+
+fn permits_to_bytes(permits: u32) -> u64 {
+ (permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
+}
+
+#[cfg(test)]
+mod tests {
+ use tokio::time::{Duration, sleep};
+
+ use super::*;
+
+ #[test]
+ fn test_try_acquire_unlimited() {
+ let manager = CompactionMemoryManager::new(0);
+ let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap();
+ assert_eq!(manager.limit_bytes(), 0);
+ assert_eq!(guard.granted_bytes(), 0);
+ }
+
+ #[test]
+ fn test_try_acquire_limited_success_and_release() {
+ let bytes = 2 * PERMIT_GRANULARITY_BYTES;
+ let manager = CompactionMemoryManager::new(bytes);
+ {
+ let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap();
+ assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES);
+ drop(guard);
+ }
+ assert_eq!(manager.used_bytes(), 0);
+ }
+
+ #[test]
+ fn test_try_acquire_exceeds_limit() {
+ let limit = PERMIT_GRANULARITY_BYTES;
+ let manager = CompactionMemoryManager::new(limit);
+ let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES);
+ assert!(result.is_none());
+ }
+
+ #[tokio::test(flavor = "current_thread")]
+ async fn test_acquire_blocks_and_unblocks() {
+ let bytes = 2 * PERMIT_GRANULARITY_BYTES;
+ let manager = CompactionMemoryManager::new(bytes);
+ let guard = manager.try_acquire(bytes).unwrap();
+
+ // Spawn a task that will block on acquire()
+ let waiter = {
+ let manager = manager.clone();
+ tokio::spawn(async move {
+ // This will block until memory is available
+ let _guard = manager.acquire(bytes).await.unwrap();
+ })
+ };
+
+ sleep(Duration::from_millis(10)).await;
+ // Release memory - this should unblock the waiter
+ drop(guard);
+
+ // Waiter should complete now
+ waiter.await.unwrap();
+ }
+
+ #[test]
+ fn test_request_additional_success() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
+ let manager = CompactionMemoryManager::new(limit);
+
+ // Acquire base quota (5MB)
+ let base = 5 * PERMIT_GRANULARITY_BYTES;
+ let mut guard = manager.try_acquire(base).unwrap();
+ assert_eq!(guard.granted_bytes(), base);
+ assert_eq!(manager.used_bytes(), base);
+
+ // Request additional memory (3MB) - should succeed and merge
+ assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+ }
+
+ #[test]
+ fn test_request_additional_exceeds_limit() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
+ let manager = CompactionMemoryManager::new(limit);
+
+ // Acquire base quota (5MB)
+ let base = 5 * PERMIT_GRANULARITY_BYTES;
+ let mut guard = manager.try_acquire(base).unwrap();
+
+ // Request additional memory (3MB) - should succeed
+ assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+
+ // Request more (3MB) - should fail (would exceed 10MB limit)
+ let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES);
+ assert!(!result);
+
+ // Still at 8MB
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+ }
+
+ #[test]
+ fn test_request_additional_auto_release_on_guard_drop() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES;
+ let manager = CompactionMemoryManager::new(limit);
+
+ {
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Request additional - memory is merged into guard
+ assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+
+ // When guard drops, all memory (base + additional) is released together
+ }
+
+ // After scope, all memory should be released
+ assert_eq!(manager.used_bytes(), 0);
+ }
+
+ #[test]
+ fn test_request_additional_unlimited() {
+ let manager = CompactionMemoryManager::new(0); // Unlimited
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Should always succeed with unlimited manager
+ assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 0);
+ assert_eq!(manager.used_bytes(), 0);
+ }
+
+ #[test]
+ fn test_request_additional_zero_bytes() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES;
+ let manager = CompactionMemoryManager::new(limit);
+
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Request 0 bytes should succeed without affecting anything
+ assert!(guard.request_additional(0));
+ assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+ }
+
+ #[test]
+ fn test_early_release_partial_success() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES;
+ let manager = CompactionMemoryManager::new(limit);
+
+ let mut guard = manager.try_acquire(8 * PERMIT_GRANULARITY_BYTES).unwrap();
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+
+ // Release half
+ assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
+
+ // Released memory should be available to others
+ let _guard2 = manager.try_acquire(4 * PERMIT_GRANULARITY_BYTES).unwrap();
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+ }
+
+ #[test]
+ fn test_early_release_partial_exceeds_granted() {
+ let manager = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES);
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Try to release more than granted - should fail
+ assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+ }
+
+ #[test]
+ fn test_early_release_partial_unlimited() {
+ let manager = CompactionMemoryManager::new(0);
+ let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Unlimited guard - release should succeed (no-op)
+ assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 0);
+ }
+
+ #[test]
+ fn test_request_and_early_release_symmetry() {
+ let limit = 20 * PERMIT_GRANULARITY_BYTES;
+ let manager = CompactionMemoryManager::new(limit);
+
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Request additional
+ assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
+
+ // Early release some
+ assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
+
+ // Request again
+ assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
+
+ // Early release again
+ assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+
+ drop(guard);
+ assert_eq!(manager.used_bytes(), 0);
+ }
+
+ #[test]
+ fn test_small_allocation_rounds_up() {
+ // Test that allocations smaller than PERMIT_GRANULARITY_BYTES
+ // round up to 1 permit and can use request_additional()
+ let limit = 10 * PERMIT_GRANULARITY_BYTES;
+ let manager = CompactionMemoryManager::new(limit);
+
+ let mut guard = manager.try_acquire(512 * 1024).unwrap(); // 512KB
+ assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); // Rounds up to 1MB
+ assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); // Can request more
+ assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
+ }
+
+ #[test]
+ fn test_acquire_zero_bytes_lazy_allocation() {
+ // Test that acquire(0) returns 0 permits but can request_additional() later
+ let manager = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES);
+
+ let mut guard = manager.try_acquire(0).unwrap();
+ assert_eq!(guard.granted_bytes(), 0); // No permits consumed
+ assert_eq!(manager.used_bytes(), 0);
+
+ assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); // Lazy allocation
+ assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
+ }
+}
diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs
index c952f4ba97..831d01fa02 100644
--- a/src/mito2/src/compaction/task.rs
+++ b/src/mito2/src/compaction/task.rs
@@ -22,10 +22,13 @@ use snafu::ResultExt;
use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
+use crate::compaction::memory_manager::{
+ CompactionMemoryGuard, CompactionMemoryManager, OnExhaustedPolicy,
+};
use crate::compaction::picker::{CompactionTask, PickerOutput};
-use crate::error::CompactRegionSnafu;
+use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
-use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
+use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
use crate::region::RegionRoleState;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
@@ -52,6 +55,12 @@ pub(crate) struct CompactionTaskImpl {
pub(crate) compactor: Arc,
/// Output of the picker.
pub(crate) picker_output: PickerOutput,
+ /// Memory manager to acquire memory budget.
+ pub(crate) memory_manager: Arc,
+ /// Policy when memory is exhausted.
+ pub(crate) memory_policy: OnExhaustedPolicy,
+ /// Estimated memory bytes needed for this compaction.
+ pub(crate) estimated_memory_bytes: u64,
}
impl Debug for CompactionTaskImpl {
@@ -81,6 +90,86 @@ impl CompactionTaskImpl {
.for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
}
+ /// Acquires memory budget based on the configured policy.
+ ///
+ /// Returns an error if memory cannot be acquired according to the policy.
+ async fn acquire_memory_with_policy(&self) -> error::Result {
+ let region_id = self.compaction_region.region_id;
+ let requested_bytes = self.estimated_memory_bytes;
+ let limit_bytes = self.memory_manager.limit_bytes();
+
+ if limit_bytes > 0 && requested_bytes > limit_bytes {
+ warn!(
+ "Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request",
+ region_id, requested_bytes, limit_bytes
+ );
+ return Err(CompactionMemoryExhaustedSnafu {
+ region_id,
+ required_bytes: requested_bytes,
+ limit_bytes,
+ policy: "exceed_limit".to_string(),
+ }
+ .build());
+ }
+
+ match self.memory_policy {
+ OnExhaustedPolicy::Wait {
+ timeout: wait_timeout,
+ } => {
+ let timer = COMPACTION_MEMORY_WAIT.start_timer();
+
+ match tokio::time::timeout(
+ wait_timeout,
+ self.memory_manager.acquire(requested_bytes),
+ )
+ .await
+ {
+ Ok(Ok(guard)) => {
+ timer.observe_duration();
+ Ok(guard)
+ }
+ Ok(Err(e)) => {
+ timer.observe_duration();
+ Err(e)
+ }
+ Err(_) => {
+ timer.observe_duration();
+ warn!(
+ "Compaction for region {} waited {:?} for {} bytes but timed out",
+ region_id, wait_timeout, requested_bytes
+ );
+ CompactionMemoryExhaustedSnafu {
+ region_id,
+ required_bytes: requested_bytes,
+ limit_bytes,
+ policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
+ }
+ .fail()
+ }
+ }
+ }
+ OnExhaustedPolicy::Skip => {
+ // Try to acquire, skip if not immediately available
+ self.memory_manager
+ .try_acquire(requested_bytes)
+ .ok_or_else(|| {
+ warn!(
+ "Skipping compaction for region {} due to memory limit \
+ (need {} bytes, limit {} bytes)",
+ region_id, requested_bytes, limit_bytes
+ );
+ CompactionMemoryExhaustedSnafu {
+ region_id,
+ required_bytes: requested_bytes,
+ limit_bytes,
+ policy: "skip".to_string(),
+ }
+ .build()
+ })
+ }
+ }
+ }
+
/// Remove expired ssts files, update manifest immediately
/// and apply the edit to region version.
///
@@ -222,7 +311,7 @@ impl CompactionTaskImpl {
}
/// Handles compaction failure, notifies all waiters.
- fn on_failure(&mut self, err: Arc) {
+ pub(crate) fn on_failure(&mut self, err: Arc) {
COMPACTION_FAILURE_COUNT.inc();
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
@@ -249,6 +338,26 @@ impl CompactionTaskImpl {
#[async_trait::async_trait]
impl CompactionTask for CompactionTaskImpl {
async fn run(&mut self) {
+ // Acquire memory budget before starting compaction
+ let _memory_guard = match self.acquire_memory_with_policy().await {
+ Ok(guard) => guard,
+ Err(e) => {
+ error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
+ let err = Arc::new(e);
+ self.on_failure(err.clone());
+ let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
+ region_id: self.compaction_region.region_id,
+ err,
+ });
+ self.send_to_worker(WorkerRequest::Background {
+ region_id: self.compaction_region.region_id,
+ notify,
+ })
+ .await;
+ return;
+ }
+ };
+
let notify = match self.handle_expiration_and_compaction().await {
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,
diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs
index d5cf251a68..6061e294bd 100644
--- a/src/mito2/src/compaction/test_util.rs
+++ b/src/mito2/src/compaction/test_util.rs
@@ -74,6 +74,7 @@ pub fn new_file_handle_with_size_and_sequence(
),
level,
file_size,
+ max_row_group_uncompressed_size: file_size,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 53cc745fe5..11ce9de9ea 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
+use crate::compaction::memory_manager::OnExhaustedPolicy;
use crate::error::Result;
use crate::gc::GcConfig;
use crate::memtable::MemtableConfig;
@@ -92,6 +93,10 @@ pub struct MitoConfig {
pub max_background_compactions: usize,
/// Max number of running background purge jobs (default: number of cpu cores).
pub max_background_purges: usize,
+ /// Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
+ pub experimental_compaction_memory_limit: MemoryLimit,
+ /// Behavior when compaction cannot acquire memory from the budget.
+ pub experimental_compaction_on_exhausted: OnExhaustedPolicy,
// Flush configs:
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
@@ -178,6 +183,8 @@ impl Default for MitoConfig {
max_background_flushes: divide_num_cpus(2),
max_background_compactions: divide_num_cpus(4),
max_background_purges: get_total_cpu_cores(),
+ experimental_compaction_memory_limit: MemoryLimit::Unlimited,
+ experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index d357c68774..90a735381b 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -1041,6 +1041,40 @@ pub enum Error {
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
+ #[snafu(display(
+ "Compaction memory limit exceeded for region {}: required {} bytes, limit {} bytes (policy: {})",
+ region_id,
+ required_bytes,
+ limit_bytes,
+ policy
+ ))]
+ CompactionMemoryExhausted {
+ region_id: RegionId,
+ required_bytes: u64,
+ limit_bytes: u64,
+ policy: String,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display(
+ "Requested compaction memory ({} bytes) exceeds total limit ({} bytes)",
+ requested_bytes,
+ limit_bytes
+ ))]
+ CompactionMemoryLimitExceeded {
+ requested_bytes: u64,
+ limit_bytes: u64,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Compaction memory semaphore unexpectedly closed"))]
+ CompactionMemorySemaphoreClosed {
+ #[snafu(implicit)]
+ location: Location,
+ },
+
#[snafu(display(
"Incompatible WAL provider change. This is typically caused by changing WAL provider in database config file without completely cleaning existing files. Global provider: {}, region provider: {}",
global,
@@ -1323,6 +1357,12 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,
+ CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted,
+
+ CompactionMemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
+
+ CompactionMemorySemaphoreClosed { .. } => StatusCode::Unexpected,
+
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
ScanSeries { source, .. } => source.status_code(),
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 8371fd52c7..695f919979 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -640,6 +640,7 @@ impl RegionFlushTask {
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
+ max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
available_indexes: sst_info.index_metadata.build_available_indexes(),
indexes: sst_info.index_metadata.build_indexes(),
index_file_size: sst_info.index_metadata.file_size,
diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs
index d709bb96f0..c584ae3276 100644
--- a/src/mito2/src/manifest/tests/checkpoint.rs
+++ b/src/mito2/src/manifest/tests/checkpoint.rs
@@ -244,6 +244,7 @@ async fn checkpoint_with_different_compression_types() {
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
+ max_row_group_uncompressed_size: 1024000,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
@@ -309,6 +310,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec)
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
+ max_row_group_uncompressed_size: 1024000,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 6aedf9a5d7..4bf6d0fadb 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -974,6 +974,19 @@ impl EncodedBulkPart {
/// Returns a `SstInfo` instance with information derived from this bulk part's metadata
pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
let unit = self.metadata.region_metadata.time_index_type().unit();
+ let max_row_group_uncompressed_size: u64 = self
+ .metadata
+ .parquet_metadata
+ .row_groups()
+ .iter()
+ .map(|rg| {
+ rg.columns()
+ .iter()
+ .map(|c| c.uncompressed_size() as u64)
+ .sum::()
+ })
+ .max()
+ .unwrap_or(0);
SstInfo {
file_id,
time_range: (
@@ -981,6 +994,7 @@ impl EncodedBulkPart {
Timestamp::new(self.metadata.max_timestamp, unit),
),
file_size: self.data.len() as u64,
+ max_row_group_uncompressed_size,
num_rows: self.metadata.num_rows,
num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
file_metadata: Some(self.metadata.parquet_metadata.clone()),
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index 1e35ae1c06..be5f4945fd 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -157,6 +157,35 @@ lazy_static! {
"greptime_mito_inflight_compaction_count",
"inflight compaction count",
).unwrap();
+
+ /// Bytes reserved by compaction memory manager.
+ pub static ref COMPACTION_MEMORY_IN_USE: IntGauge =
+ register_int_gauge!(
+ "greptime_mito_compaction_memory_in_use_bytes",
+ "bytes currently reserved for compaction tasks",
+ )
+ .unwrap();
+ /// Configured compaction memory limit.
+ pub static ref COMPACTION_MEMORY_LIMIT: IntGauge =
+ register_int_gauge!(
+ "greptime_mito_compaction_memory_limit_bytes",
+ "maximum bytes allowed for compaction tasks",
+ )
+ .unwrap();
+ /// Wait time to obtain compaction memory.
+ pub static ref COMPACTION_MEMORY_WAIT: Histogram = register_histogram!(
+ "greptime_mito_compaction_memory_wait_seconds",
+ "time waiting for compaction memory",
+ // 0.01s ~ ~10s
+ exponential_buckets(0.01, 2.0, 10).unwrap(),
+ ).unwrap();
+ /// Counter of rejected compaction memory allocations.
+ pub static ref COMPACTION_MEMORY_REJECTED: IntCounterVec =
+ register_int_counter_vec!(
+ "greptime_mito_compaction_memory_rejected_total",
+ "number of compaction tasks rejected due to memory limit",
+ &[TYPE_LABEL]
+ ).unwrap();
}
// Query metrics.
diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs
index 1191349b04..59920ad945 100644
--- a/src/mito2/src/remap_manifest.rs
+++ b/src/mito2/src/remap_manifest.rs
@@ -425,6 +425,7 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 1024,
+ max_row_group_uncompressed_size: 1024,
available_indexes: SmallVec::new(),
indexes: Default::default(),
index_file_size: 0,
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index 7ab8ee132b..94209d7b0c 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -180,6 +180,8 @@ pub struct FileMeta {
pub level: Level,
/// Size of the file.
pub file_size: u64,
+ /// Maximum uncompressed row group size of the file. 0 means unknown.
+ pub max_row_group_uncompressed_size: u64,
/// Available indexes of the file.
pub available_indexes: IndexTypes,
/// Created indexes of the file for each column.
@@ -248,7 +250,11 @@ impl Debug for FileMeta {
)
})
.field("level", &self.level)
- .field("file_size", &ReadableSize(self.file_size));
+ .field("file_size", &ReadableSize(self.file_size))
+ .field(
+ "max_row_group_uncompressed_size",
+ &ReadableSize(self.max_row_group_uncompressed_size),
+ );
if !self.available_indexes.is_empty() {
debug_struct
.field("available_indexes", &self.available_indexes)
@@ -652,6 +658,7 @@ mod tests {
time_range: FileTimeRange::default(),
level,
file_size: 0,
+ max_row_group_uncompressed_size: 0,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
@@ -703,6 +710,7 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 0,
+ max_row_group_uncompressed_size: 0,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs
index 2715223a51..5b23f3e069 100644
--- a/src/mito2/src/sst/file_purger.rs
+++ b/src/mito2/src/sst/file_purger.rs
@@ -251,6 +251,7 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
+ max_row_group_uncompressed_size: 4096,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
@@ -321,6 +322,7 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
+ max_row_group_uncompressed_size: 4096,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs
index 5cb7dc8c88..9e69bd42cf 100644
--- a/src/mito2/src/sst/file_ref.rs
+++ b/src/mito2/src/sst/file_ref.rs
@@ -224,6 +224,7 @@ mod tests {
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
+ max_row_group_uncompressed_size: 4096,
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
indexes: vec![ColumnIndexMetadata {
column_id: 0,
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index ecd8f87343..f17d11aff7 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -1549,6 +1549,7 @@ mod tests {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
+ max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
@@ -1623,6 +1624,7 @@ mod tests {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
+ max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
@@ -1726,6 +1728,7 @@ mod tests {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
+ max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 37918553d5..21187bedd3 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -76,6 +76,8 @@ pub struct SstInfo {
pub time_range: FileTimeRange,
/// File size in bytes.
pub file_size: u64,
+ /// Maximum uncompressed row group size in bytes. 0 if unknown.
+ pub max_row_group_uncompressed_size: u64,
/// Number of rows.
pub num_rows: usize,
/// Number of row groups
@@ -771,6 +773,7 @@ mod tests {
time_range: info.time_range,
level: 0,
file_size: info.file_size,
+ max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
available_indexes: info.index_metadata.build_available_indexes(),
indexes: info.index_metadata.build_indexes(),
index_file_size: info.index_metadata.file_size,
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index e30c7fa0b8..8c03a51368 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -213,11 +213,23 @@ where
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
+ let max_row_group_uncompressed_size: u64 = parquet_metadata
+ .row_groups()
+ .iter()
+ .map(|rg| {
+ rg.columns()
+ .iter()
+ .map(|c| c.uncompressed_size() as u64)
+ .sum::()
+ })
+ .max()
+ .unwrap_or(0);
let num_series = stats.series_estimator.finish();
ssts.push(SstInfo {
file_id: self.current_file,
time_range,
file_size,
+ max_row_group_uncompressed_size,
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),
diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs
index b481acf317..bdf8ca05e6 100644
--- a/src/mito2/src/test_util/scheduler_util.rs
+++ b/src/mito2/src/test_util/scheduler_util.rs
@@ -28,6 +28,7 @@ use tokio::sync::mpsc::Sender;
use crate::access_layer::{AccessLayer, AccessLayerRef};
use crate::cache::CacheManager;
use crate::compaction::CompactionScheduler;
+use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy};
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::FlushScheduler;
@@ -100,6 +101,8 @@ impl SchedulerEnv {
Arc::new(MitoConfig::default()),
WorkerListener::default(),
Plugins::new(),
+ Arc::new(CompactionMemoryManager::new(0)),
+ OnExhaustedPolicy::default(),
)
}
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index d92c628067..9f7b6a0658 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -123,6 +123,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
),
level: 0,
file_size: 0,
+ max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index b381b16c05..7a407ece4f 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -101,6 +101,7 @@ impl VersionControlBuilder {
),
level: 0,
file_size: 0, // We don't care file size.
+ max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
@@ -192,6 +193,7 @@ pub(crate) fn apply_edit(
),
level: 0,
file_size: 0, // We don't care file size.
+ max_row_group_uncompressed_size: 0,
available_indexes: Default::default(),
indexes: Default::default(),
index_file_size: 0,
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 05114d63a1..69f41e905a 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -41,6 +41,7 @@ use common_base::readable_size::ReadableSize;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
+use common_stat::get_total_memory_bytes;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
@@ -58,6 +59,7 @@ use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
+use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::config::MitoConfig;
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
@@ -205,6 +207,17 @@ impl WorkerGroup {
.build(),
);
let time_provider = Arc::new(StdTimeProvider);
+ let total_memory = get_total_memory_bytes();
+ let total_memory = if total_memory > 0 {
+ total_memory as u64
+ } else {
+ 0
+ };
+ let compaction_limit_bytes = config
+ .experimental_compaction_memory_limit
+ .resolve(total_memory);
+ let compaction_memory_manager =
+ Arc::new(CompactionMemoryManager::new(compaction_limit_bytes));
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
let workers = (0..config.num_workers)
@@ -221,6 +234,7 @@ impl WorkerGroup {
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
+ compaction_memory_manager: compaction_memory_manager.clone(),
puffin_manager_factory: puffin_manager_factory.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
@@ -381,6 +395,17 @@ impl WorkerGroup {
.write_cache(write_cache)
.build(),
);
+ let total_memory = get_total_memory_bytes();
+ let total_memory = if total_memory > 0 {
+ total_memory as u64
+ } else {
+ 0
+ };
+ let compaction_limit_bytes = config
+ .experimental_compaction_memory_limit
+ .resolve(total_memory);
+ let compaction_memory_manager =
+ Arc::new(CompactionMemoryManager::new(compaction_limit_bytes));
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
let workers = (0..config.num_workers)
.map(|id| {
@@ -396,6 +421,7 @@ impl WorkerGroup {
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
+ compaction_memory_manager: compaction_memory_manager.clone(),
puffin_manager_factory: puffin_manager_factory.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
@@ -482,6 +508,7 @@ struct WorkerStarter {
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
+ compaction_memory_manager: Arc,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
time_provider: TimeProviderRef,
@@ -534,9 +561,11 @@ impl WorkerStarter {
self.compact_job_pool,
sender.clone(),
self.cache_manager.clone(),
- self.config,
+ self.config.clone(),
self.listener.clone(),
self.plugins.clone(),
+ self.compaction_memory_manager.clone(),
+ self.config.experimental_compaction_on_exhausted,
),
stalled_requests: StalledRequests::default(),
listener: self.listener,
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 7192396db5..e91f2c7fac 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1495,6 +1495,8 @@ manifest_checkpoint_distance = 10
experimental_manifest_keep_removed_file_count = 256
experimental_manifest_keep_removed_file_ttl = "1h"
compress_manifest = false
+experimental_compaction_memory_limit = "unlimited"
+experimental_compaction_on_exhausted = "wait"
auto_flush_interval = "30m"
enable_write_cache = false
write_cache_path = ""