From baffed8c6ae075cf0999e97f9718d8ac19d3ca6d Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Fri, 12 Dec 2025 14:49:58 +0800 Subject: [PATCH] feat: mem manager on compaction (#7305) * feat: mem manager on compaction Signed-off-by: jeremyhi * fix: by copilot review comment Signed-off-by: jeremyhi * feat: experimental_ Signed-off-by: jeremyhi * fix: refine estimate_compaction_bytes Signed-off-by: jeremyhi * feat: make them into config example Signed-off-by: jeremyhi * chore: by copilot comment Signed-off-by: jeremyhi * Update src/mito2/src/compaction.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * fix: dedup the regions waiting Signed-off-by: jeremyhi * chore: by comment Signed-off-by: jeremyhi * chore: minor change Signed-off-by: jeremyhi * feat: add AdditionalMemoryGuard for the running compaction task Signed-off-by: jeremyhi * refactor: do OnExhaustedPolicy before running task Signed-off-by: jeremyhi * refactor: use OwnedSemaphorePermit to impl guard Signed-off-by: jeremyhi * feat: add early_release_partial method to release a portion of memory Signed-off-by: jeremyhi * fix: 0 bytes make request_additional unlimited Signed-off-by: jeremyhi * fix: fail-fast on acquire Signed-off-by: jeremyhi --------- Signed-off-by: jeremyhi Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- Cargo.lock | 1 + config/config.md | 4 + config/datanode.example.toml | 9 + config/standalone.example.toml | 9 + src/cmd/src/datanode/objbench.rs | 12 + src/mito2/Cargo.toml | 1 + src/mito2/src/compaction.rs | 80 ++- src/mito2/src/compaction/compactor.rs | 1 + src/mito2/src/compaction/memory_manager.rs | 638 +++++++++++++++++++++ src/mito2/src/compaction/task.rs | 115 +++- src/mito2/src/compaction/test_util.rs | 1 + src/mito2/src/config.rs | 7 + src/mito2/src/error.rs | 40 ++ src/mito2/src/flush.rs | 1 + src/mito2/src/manifest/tests/checkpoint.rs | 2 + src/mito2/src/memtable/bulk/part.rs | 14 + src/mito2/src/metrics.rs | 29 + src/mito2/src/remap_manifest.rs | 1 + src/mito2/src/sst/file.rs | 10 +- src/mito2/src/sst/file_purger.rs | 2 + src/mito2/src/sst/file_ref.rs | 1 + src/mito2/src/sst/index.rs | 3 + src/mito2/src/sst/parquet.rs | 3 + src/mito2/src/sst/parquet/writer.rs | 12 + src/mito2/src/test_util/scheduler_util.rs | 3 + src/mito2/src/test_util/sst_util.rs | 1 + src/mito2/src/test_util/version_util.rs | 2 + src/mito2/src/worker.rs | 31 +- tests-integration/tests/http.rs | 2 + 29 files changed, 1024 insertions(+), 11 deletions(-) create mode 100644 src/mito2/src/compaction/memory_manager.rs diff --git a/Cargo.lock b/Cargo.lock index aac00fdc98..6781ac3895 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7672,6 +7672,7 @@ dependencies = [ "either", "futures", "greptime-proto", + "humantime", "humantime-serde", "index", "itertools 0.14.0", diff --git a/config/config.md b/config/config.md index 8b499ca7ee..dc663cead4 100644 --- a/config/config.md +++ b/config/config.md @@ -141,6 +141,8 @@ | `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). | | `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). | | `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). | +| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. 
|
+| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "skip" |
 | `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
 | `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
 | `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
@@ -521,6 +523,8 @@
 | `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
 | `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
 | `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
+| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
+| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "skip" |
 | `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
 | `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
 | `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index bb769c4625..0159fc237b 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -452,6 +452,15 @@ compress_manifest = false
 ## @toml2docs:none-default="Auto"
 #+ max_background_purges = 8
 
+## Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
+## @toml2docs:none-default="0"
+#+ experimental_compaction_memory_limit = "0"
+
+## Behavior when compaction cannot acquire memory from the budget.
+## Options: "wait" (default, 10s), "wait(<duration>)", "skip"
+## @toml2docs:none-default="wait"
+#+ experimental_compaction_on_exhausted = "wait"
+
 ## Interval to auto flush a region if it has not flushed yet.
 auto_flush_interval = "1h"
 
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 661067d2a1..653b0cb4af 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -546,6 +546,15 @@ compress_manifest = false
 ## @toml2docs:none-default="Auto"
 #+ max_background_purges = 8
 
+## Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
+## @toml2docs:none-default="0"
+#+ experimental_compaction_memory_limit = "0"
+
+## Behavior when compaction cannot acquire memory from the budget.
+## Options: "wait" (default, 10s), "wait(<duration>)", "skip"
+## @toml2docs:none-default="wait"
+#+ experimental_compaction_on_exhausted = "wait"
+
 ## Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h" diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index 61b47552eb..0a3f27b77e 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -145,6 +145,17 @@ impl ObjbenchCommand { let region_meta = extract_region_metadata(&self.source, &parquet_meta)?; let num_rows = parquet_meta.file_metadata().num_rows() as u64; let num_row_groups = parquet_meta.num_row_groups() as u64; + let max_row_group_uncompressed_size: u64 = parquet_meta + .row_groups() + .iter() + .map(|rg| { + rg.columns() + .iter() + .map(|c| c.uncompressed_size() as u64) + .sum::() + }) + .max() + .unwrap_or(0); println!( "{} Metadata loaded - rows: {}, size: {} bytes", @@ -160,6 +171,7 @@ impl ObjbenchCommand { time_range: Default::default(), level: 0, file_size, + max_row_group_uncompressed_size, available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index a3686251bb..68d996723e 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -48,6 +48,7 @@ dotenv.workspace = true either.workspace = true futures.workspace = true humantime-serde.workspace = true +humantime.workspace = true index.workspace = true itertools.workspace = true greptime-proto.workspace = true diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 4fe93f11fb..bda22ac652 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -14,6 +14,7 @@ mod buckets; pub mod compactor; +pub mod memory_manager; pub mod picker; pub mod run; mod task; @@ -46,7 +47,8 @@ use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor}; -use crate::compaction::picker::{CompactionTask, new_picker}; +use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy}; +use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker}; use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; use crate::error::{ @@ -104,12 +106,15 @@ pub(crate) struct CompactionScheduler { request_sender: Sender, cache_manager: CacheManagerRef, engine_config: Arc, + memory_manager: Arc, + memory_policy: OnExhaustedPolicy, listener: WorkerListener, /// Plugins for the compaction scheduler. plugins: Plugins, } impl CompactionScheduler { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( scheduler: SchedulerRef, request_sender: Sender, @@ -117,6 +122,8 @@ impl CompactionScheduler { engine_config: Arc, listener: WorkerListener, plugins: Plugins, + memory_manager: Arc, + memory_policy: OnExhaustedPolicy, ) -> Self { Self { scheduler, @@ -124,6 +131,8 @@ impl CompactionScheduler { request_sender, cache_manager, engine_config, + memory_manager, + memory_policy, listener, plugins, } @@ -429,7 +438,8 @@ impl CompactionScheduler { }; // Create a local compaction task. - let mut local_compaction_task = Box::new(CompactionTaskImpl { + let estimated_bytes = estimate_compaction_bytes(&picker_output); + let local_compaction_task = Box::new(CompactionTaskImpl { request_sender, waiters, start_time, @@ -437,18 +447,27 @@ impl CompactionScheduler { picker_output, compaction_region, compactor: Arc::new(DefaultCompactor {}), + memory_manager: self.memory_manager.clone(), + memory_policy: self.memory_policy, + estimated_memory_bytes: estimated_bytes, }); - // Submit the compaction task. 
+ self.submit_compaction_task(local_compaction_task, region_id) + } + + fn submit_compaction_task( + &mut self, + mut task: Box, + region_id: RegionId, + ) -> Result<()> { self.scheduler .schedule(Box::pin(async move { INFLIGHT_COMPACTION_COUNT.inc(); - local_compaction_task.run().await; + task.run().await; INFLIGHT_COMPACTION_COUNT.dec(); })) .map_err(|e| { error!(e; "Failed to submit compaction request for region {}", region_id); - // If failed to submit the job, we need to remove the region from the scheduler. self.region_status.remove(®ion_id); e }) @@ -758,6 +777,20 @@ fn get_expired_ssts( .collect() } +/// Estimates compaction memory as the sum of all input files' maximum row-group +/// uncompressed sizes. +fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 { + picker_output + .outputs + .iter() + .flat_map(|output| output.inputs.iter()) + .map(|file: &FileHandle| { + let meta = file.meta_ref(); + meta.max_row_group_uncompressed_size + }) + .sum() +} + /// Pending compaction request that is supposed to run after current task is finished, /// typically used for manual compactions. struct PendingCompaction { @@ -773,7 +806,7 @@ struct PendingCompaction { mod tests { use api::v1::region::StrictWindow; use common_datasource::compression::CompressionType; - use tokio::sync::oneshot; + use tokio::sync::{Barrier, oneshot}; use super::*; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; @@ -1145,4 +1178,39 @@ mod tests { assert_eq!(result.unwrap(), 0); // is there a better way to check this? assert_eq!(0, scheduler.region_status.len()); } + + #[tokio::test] + async fn test_concurrent_memory_competition() { + let manager = Arc::new(CompactionMemoryManager::new(3 * 1024 * 1024)); // 3MB + let barrier = Arc::new(Barrier::new(3)); + let mut handles = vec![]; + + // Spawn 3 tasks competing for memory, each trying to acquire 2MB + for _i in 0..3 { + let mgr = manager.clone(); + let bar = barrier.clone(); + let handle = tokio::spawn(async move { + bar.wait().await; // Synchronize start + mgr.try_acquire(2 * 1024 * 1024) + }); + handles.push(handle); + } + + let results: Vec<_> = futures::future::join_all(handles) + .await + .into_iter() + .map(|r| r.unwrap()) + .collect(); + + // Only 1 should succeed (3MB limit, 2MB request, can only fit one) + let succeeded = results.iter().filter(|r| r.is_some()).count(); + let failed = results.iter().filter(|r| r.is_none()).count(); + + assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory"); + assert_eq!(failed, 2, "Expected 2 tasks to fail"); + + // Clean up + drop(results); + assert_eq!(manager.used_bytes(), 0); + } } diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 3945c2d31e..4f9089c13d 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -396,6 +396,7 @@ impl DefaultCompactor { time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, + max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size, available_indexes: sst_info.index_metadata.build_available_indexes(), indexes: sst_info.index_metadata.build_indexes(), index_file_size: sst_info.index_metadata.file_size, diff --git a/src/mito2/src/compaction/memory_manager.rs b/src/mito2/src/compaction/memory_manager.rs new file mode 100644 index 0000000000..831e4c4e8b --- /dev/null +++ b/src/mito2/src/compaction/memory_manager.rs @@ -0,0 +1,638 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache 
License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+use std::{fmt, mem};
+
+use common_telemetry::debug;
+use humantime::{format_duration, parse_duration};
+use serde::{Deserialize, Serialize};
+use snafu::ensure;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
+
+use crate::error::{
+    CompactionMemoryLimitExceededSnafu, CompactionMemorySemaphoreClosedSnafu, Result,
+};
+use crate::metrics::{
+    COMPACTION_MEMORY_IN_USE, COMPACTION_MEMORY_LIMIT, COMPACTION_MEMORY_REJECTED,
+};
+
+/// Minimum bytes controlled by one semaphore permit.
+const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
+
+/// Default wait timeout for compaction memory.
+pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Defines how to react when compaction cannot acquire enough memory.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum OnExhaustedPolicy {
+    /// Wait until enough memory is released, bounded by timeout.
+    Wait { timeout: Duration },
+    /// Skip the compaction if memory is not immediately available.
+    Skip,
+}
+
+impl Default for OnExhaustedPolicy {
+    fn default() -> Self {
+        OnExhaustedPolicy::Wait {
+            timeout: DEFAULT_MEMORY_WAIT_TIMEOUT,
+        }
+    }
+}
+
+impl Serialize for OnExhaustedPolicy {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let text = match self {
+            OnExhaustedPolicy::Skip => "skip".to_string(),
+            OnExhaustedPolicy::Wait { timeout } if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT => {
+                "wait".to_string()
+            }
+            OnExhaustedPolicy::Wait { timeout } => {
+                format!("wait({})", format_duration(*timeout))
+            }
+        };
+        serializer.serialize_str(&text)
+    }
+}
+
+impl<'de> Deserialize<'de> for OnExhaustedPolicy {
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let raw = String::deserialize(deserializer)?;
+        let lower = raw.to_ascii_lowercase();
+
+        if lower == "skip" {
+            return Ok(OnExhaustedPolicy::Skip);
+        }
+        if lower == "wait" {
+            return Ok(OnExhaustedPolicy::default());
+        }
+        if lower.starts_with("wait(") && lower.ends_with(')') {
+            let inner = &raw[5..raw.len() - 1];
+            let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?;
+            return Ok(OnExhaustedPolicy::Wait { timeout });
+        }
+
+        Err(serde::de::Error::custom(format!(
+            "invalid compaction memory policy: {}, expected wait, wait(<duration>) or skip",
+            raw
+        )))
+    }
+}
+
+/// Global memory manager for compaction tasks.
+#[derive(Clone)]
+pub struct CompactionMemoryManager {
+    quota: Option<MemoryQuota>,
+}
+
+/// Shared memory quota state across all compaction guards.
+#[derive(Clone)]
+struct MemoryQuota {
+    semaphore: Arc<Semaphore>,
+    // Maximum permits (aligned to PERMIT_GRANULARITY_BYTES).
+    limit_permits: u32,
+}
+
+impl CompactionMemoryManager {
+    /// Creates a new memory manager with the given limit in bytes.
+    /// `limit_bytes = 0` disables the limit.
+    pub fn new(limit_bytes: u64) -> Self {
+        if limit_bytes == 0 {
+            COMPACTION_MEMORY_LIMIT.set(0);
+            return Self { quota: None };
+        }
+
+        let limit_permits = bytes_to_permits(limit_bytes);
+        let limit_aligned_bytes = permits_to_bytes(limit_permits);
+        COMPACTION_MEMORY_LIMIT.set(limit_aligned_bytes as i64);
+
+        Self {
+            quota: Some(MemoryQuota {
+                semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
+                limit_permits,
+            }),
+        }
+    }
+
+    /// Returns the configured limit in bytes (0 if unlimited).
+    pub fn limit_bytes(&self) -> u64 {
+        self.quota
+            .as_ref()
+            .map(|quota| permits_to_bytes(quota.limit_permits))
+            .unwrap_or(0)
+    }
+
+    /// Returns currently used bytes.
+    pub fn used_bytes(&self) -> u64 {
+        self.quota
+            .as_ref()
+            .map(|quota| permits_to_bytes(quota.used_permits()))
+            .unwrap_or(0)
+    }
+
+    /// Returns available bytes.
+    pub fn available_bytes(&self) -> u64 {
+        self.quota
+            .as_ref()
+            .map(|quota| permits_to_bytes(quota.available_permits_clamped()))
+            .unwrap_or(0)
+    }
+
+    /// Acquires memory, waiting if necessary until enough is available.
+    ///
+    /// # Errors
+    /// - Returns error if requested bytes exceed the total limit
+    /// - Returns error if the semaphore is unexpectedly closed
+    pub async fn acquire(&self, bytes: u64) -> Result<CompactionMemoryGuard> {
+        match &self.quota {
+            None => Ok(CompactionMemoryGuard::unlimited()),
+            Some(quota) => {
+                let permits = bytes_to_permits(bytes);
+
+                // Fail-fast: reject if request exceeds total limit.
+                ensure!(
+                    permits <= quota.limit_permits,
+                    CompactionMemoryLimitExceededSnafu {
+                        requested_bytes: bytes,
+                        limit_bytes: permits_to_bytes(quota.limit_permits),
+                    }
+                );
+
+                let permit = quota
+                    .semaphore
+                    .clone()
+                    .acquire_many_owned(permits)
+                    .await
+                    .map_err(|_| CompactionMemorySemaphoreClosedSnafu.build())?;
+                quota.update_in_use_metric();
+                Ok(CompactionMemoryGuard::limited(permit, quota.clone()))
+            }
+        }
+    }
+
+    /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
+    pub fn try_acquire(&self, bytes: u64) -> Option<CompactionMemoryGuard> {
+        match &self.quota {
+            None => Some(CompactionMemoryGuard::unlimited()),
+            Some(quota) => {
+                let permits = bytes_to_permits(bytes);
+
+                match quota.semaphore.clone().try_acquire_many_owned(permits) {
+                    Ok(permit) => {
+                        quota.update_in_use_metric();
+                        Some(CompactionMemoryGuard::limited(permit, quota.clone()))
+                    }
+                    Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
+                        COMPACTION_MEMORY_REJECTED
+                            .with_label_values(&["try_acquire"])
+                            .inc();
+                        None
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl MemoryQuota {
+    fn used_permits(&self) -> u32 {
+        self.limit_permits
+            .saturating_sub(self.available_permits_clamped())
+    }
+
+    fn available_permits_clamped(&self) -> u32 {
+        // Clamp to limit_permits to ensure we never report more available permits
+        // than our configured limit, even if semaphore state becomes inconsistent.
+        self.semaphore
+            .available_permits()
+            .min(self.limit_permits as usize) as u32
+    }
+
+    fn update_in_use_metric(&self) {
+        let bytes = permits_to_bytes(self.used_permits());
+        COMPACTION_MEMORY_IN_USE.set(bytes as i64);
+    }
+}
+
+/// Guard representing a slice of reserved compaction memory.
+///
+/// Memory is automatically released when this guard is dropped.
+pub struct CompactionMemoryGuard {
+    state: GuardState,
+}
+
+enum GuardState {
+    Unlimited,
+    Limited {
+        // Holds all permits owned by this guard (base plus any additional).
+        // Additional requests merge into this permit and are released together on drop.
+ permit: OwnedSemaphorePermit, + // Memory quota for requesting additional permits and updating metrics. + quota: MemoryQuota, + }, +} + +impl CompactionMemoryGuard { + fn unlimited() -> Self { + Self { + state: GuardState::Unlimited, + } + } + + fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota) -> Self { + Self { + state: GuardState::Limited { permit, quota }, + } + } + + /// Returns granted quota in bytes. + pub fn granted_bytes(&self) -> u64 { + match &self.state { + GuardState::Unlimited => 0, + GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32), + } + } + + /// Tries to allocate additional memory during task execution. + /// + /// On success, merges the new memory into this guard and returns true. + /// On failure, returns false and leaves this guard unchanged. + /// + /// # Behavior + /// - Running tasks can request additional memory on top of their initial allocation + /// - If total memory (all tasks) would exceed limit, returns false immediately + /// - The task should gracefully handle false by failing or adjusting its strategy + /// - The additional memory is merged into this guard and released together on drop + pub fn request_additional(&mut self, bytes: u64) -> bool { + match &mut self.state { + GuardState::Unlimited => true, + GuardState::Limited { permit, quota } => { + // Early return for zero-byte requests (no-op) + if bytes == 0 { + return true; + } + + let additional_permits = bytes_to_permits(bytes); + + // Try to acquire additional permits from the quota + match quota + .semaphore + .clone() + .try_acquire_many_owned(additional_permits) + { + Ok(additional_permit) => { + // Merge into main permit + permit.merge(additional_permit); + quota.update_in_use_metric(); + + debug!("Allocated additional {} bytes", bytes); + + true + } + Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { + COMPACTION_MEMORY_REJECTED + .with_label_values(&["request_additional"]) + .inc(); + false + } + } + } + } + } + + /// Releases a portion of granted memory back to the pool early, + /// before the guard is dropped. + /// + /// This is useful when a task's memory requirement decreases during execution + /// (e.g., after completing a memory-intensive phase). The guard remains valid + /// with reduced quota, and the task can continue running. + pub fn early_release_partial(&mut self, bytes: u64) -> bool { + match &mut self.state { + GuardState::Unlimited => true, + GuardState::Limited { permit, quota } => { + // Early return for zero-byte requests (no-op) + if bytes == 0 { + return true; + } + + let release_permits = bytes_to_permits(bytes); + + // Split out the permits we want to release + match permit.split(release_permits as usize) { + Some(released_permit) => { + let released_bytes = permits_to_bytes(released_permit.num_permits() as u32); + + // Drop the split permit to return it to the quota + drop(released_permit); + quota.update_in_use_metric(); + + debug!( + "Early released {} bytes from compaction memory guard", + released_bytes + ); + + true + } + None => { + // Requested release exceeds granted amount + false + } + } + } + } + } +} + +impl Drop for CompactionMemoryGuard { + fn drop(&mut self) { + if let GuardState::Limited { permit, quota } = + mem::replace(&mut self.state, GuardState::Unlimited) + { + let bytes = permits_to_bytes(permit.num_permits() as u32); + + // Release permits before updating metrics to reflect latest usage. 
+ drop(permit); + quota.update_in_use_metric(); + + debug!("Released compaction memory: {} bytes", bytes); + } + } +} + +impl fmt::Debug for CompactionMemoryGuard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CompactionMemoryGuard") + .field("granted_bytes", &self.granted_bytes()) + .finish() + } +} + +fn bytes_to_permits(bytes: u64) -> u32 { + // Round up to the nearest permit. + // Returns 0 for bytes=0, which allows lazy allocation via request_additional(). + // Non-zero bytes always round up to at least 1 permit due to the math. + bytes + .saturating_add(PERMIT_GRANULARITY_BYTES - 1) + .saturating_div(PERMIT_GRANULARITY_BYTES) + .min(Semaphore::MAX_PERMITS as u64) + .min(u32::MAX as u64) as u32 +} + +fn permits_to_bytes(permits: u32) -> u64 { + (permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES) +} + +#[cfg(test)] +mod tests { + use tokio::time::{Duration, sleep}; + + use super::*; + + #[test] + fn test_try_acquire_unlimited() { + let manager = CompactionMemoryManager::new(0); + let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap(); + assert_eq!(manager.limit_bytes(), 0); + assert_eq!(guard.granted_bytes(), 0); + } + + #[test] + fn test_try_acquire_limited_success_and_release() { + let bytes = 2 * PERMIT_GRANULARITY_BYTES; + let manager = CompactionMemoryManager::new(bytes); + { + let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap(); + assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES); + drop(guard); + } + assert_eq!(manager.used_bytes(), 0); + } + + #[test] + fn test_try_acquire_exceeds_limit() { + let limit = PERMIT_GRANULARITY_BYTES; + let manager = CompactionMemoryManager::new(limit); + let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES); + assert!(result.is_none()); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_acquire_blocks_and_unblocks() { + let bytes = 2 * PERMIT_GRANULARITY_BYTES; + let manager = CompactionMemoryManager::new(bytes); + let guard = manager.try_acquire(bytes).unwrap(); + + // Spawn a task that will block on acquire() + let waiter = { + let manager = manager.clone(); + tokio::spawn(async move { + // This will block until memory is available + let _guard = manager.acquire(bytes).await.unwrap(); + }) + }; + + sleep(Duration::from_millis(10)).await; + // Release memory - this should unblock the waiter + drop(guard); + + // Waiter should complete now + waiter.await.unwrap(); + } + + #[test] + fn test_request_additional_success() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit + let manager = CompactionMemoryManager::new(limit); + + // Acquire base quota (5MB) + let base = 5 * PERMIT_GRANULARITY_BYTES; + let mut guard = manager.try_acquire(base).unwrap(); + assert_eq!(guard.granted_bytes(), base); + assert_eq!(manager.used_bytes(), base); + + // Request additional memory (3MB) - should succeed and merge + assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + } + + #[test] + fn test_request_additional_exceeds_limit() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit + let manager = CompactionMemoryManager::new(limit); + + // Acquire base quota (5MB) + let base = 5 * PERMIT_GRANULARITY_BYTES; + let mut guard = manager.try_acquire(base).unwrap(); + + // Request additional memory (3MB) - should succeed + 
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + + // Request more (3MB) - should fail (would exceed 10MB limit) + let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES); + assert!(!result); + + // Still at 8MB + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + } + + #[test] + fn test_request_additional_auto_release_on_guard_drop() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; + let manager = CompactionMemoryManager::new(limit); + + { + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Request additional - memory is merged into guard + assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + + // When guard drops, all memory (base + additional) is released together + } + + // After scope, all memory should be released + assert_eq!(manager.used_bytes(), 0); + } + + #[test] + fn test_request_additional_unlimited() { + let manager = CompactionMemoryManager::new(0); // Unlimited + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Should always succeed with unlimited manager + assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 0); + assert_eq!(manager.used_bytes(), 0); + } + + #[test] + fn test_request_additional_zero_bytes() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; + let manager = CompactionMemoryManager::new(limit); + + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Request 0 bytes should succeed without affecting anything + assert!(guard.request_additional(0)); + assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + } + + #[test] + fn test_early_release_partial_success() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; + let manager = CompactionMemoryManager::new(limit); + + let mut guard = manager.try_acquire(8 * PERMIT_GRANULARITY_BYTES).unwrap(); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + + // Release half + assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 4 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 4 * PERMIT_GRANULARITY_BYTES); + + // Released memory should be available to others + let _guard2 = manager.try_acquire(4 * PERMIT_GRANULARITY_BYTES).unwrap(); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + } + + #[test] + fn test_early_release_partial_exceeds_granted() { + let manager = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES); + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Try to release more than granted - should fail + assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + } + + #[test] + fn test_early_release_partial_unlimited() { + let manager = CompactionMemoryManager::new(0); + let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Unlimited guard - release should succeed (no-op) + assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 0); + } + + #[test] + fn 
test_request_and_early_release_symmetry() { + let limit = 20 * PERMIT_GRANULARITY_BYTES; + let manager = CompactionMemoryManager::new(limit); + + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Request additional + assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES); + + // Early release some + assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 7 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 7 * PERMIT_GRANULARITY_BYTES); + + // Request again + assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 9 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 9 * PERMIT_GRANULARITY_BYTES); + + // Early release again + assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + + drop(guard); + assert_eq!(manager.used_bytes(), 0); + } + + #[test] + fn test_small_allocation_rounds_up() { + // Test that allocations smaller than PERMIT_GRANULARITY_BYTES + // round up to 1 permit and can use request_additional() + let limit = 10 * PERMIT_GRANULARITY_BYTES; + let manager = CompactionMemoryManager::new(limit); + + let mut guard = manager.try_acquire(512 * 1024).unwrap(); // 512KB + assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); // Rounds up to 1MB + assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); // Can request more + assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES); + } + + #[test] + fn test_acquire_zero_bytes_lazy_allocation() { + // Test that acquire(0) returns 0 permits but can request_additional() later + let manager = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES); + + let mut guard = manager.try_acquire(0).unwrap(); + assert_eq!(guard.granted_bytes(), 0); // No permits consumed + assert_eq!(manager.used_bytes(), 0); + + assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); // Lazy allocation + assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES); + } +} diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index c952f4ba97..831d01fa02 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -22,10 +22,13 @@ use snafu::ResultExt; use tokio::sync::mpsc; use crate::compaction::compactor::{CompactionRegion, Compactor}; +use crate::compaction::memory_manager::{ + CompactionMemoryGuard, CompactionMemoryManager, OnExhaustedPolicy, +}; use crate::compaction::picker::{CompactionTask, PickerOutput}; -use crate::error::CompactRegionSnafu; +use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; +use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED}; use crate::region::RegionRoleState; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult, @@ -52,6 +55,12 @@ pub(crate) struct CompactionTaskImpl { pub(crate) compactor: Arc, /// Output of the picker. pub(crate) picker_output: PickerOutput, + /// Memory manager to acquire memory budget. 
+    pub(crate) memory_manager: Arc<CompactionMemoryManager>,
+    /// Policy when memory is exhausted.
+    pub(crate) memory_policy: OnExhaustedPolicy,
+    /// Estimated memory bytes needed for this compaction.
+    pub(crate) estimated_memory_bytes: u64,
 }
 
 impl Debug for CompactionTaskImpl {
@@ -81,6 +90,86 @@ impl CompactionTaskImpl {
             .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
     }
 
+    /// Acquires memory budget based on the configured policy.
+    ///
+    /// Returns an error if memory cannot be acquired according to the policy.
+    async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
+        let region_id = self.compaction_region.region_id;
+        let requested_bytes = self.estimated_memory_bytes;
+        let limit_bytes = self.memory_manager.limit_bytes();
+
+        if limit_bytes > 0 && requested_bytes > limit_bytes {
+            warn!(
+                "Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request",
+                region_id, requested_bytes, limit_bytes
+            );
+            return Err(CompactionMemoryExhaustedSnafu {
+                region_id,
+                required_bytes: requested_bytes,
+                limit_bytes,
+                policy: "exceed_limit".to_string(),
+            }
+            .build());
+        }
+
+        match self.memory_policy {
+            OnExhaustedPolicy::Wait {
+                timeout: wait_timeout,
+            } => {
+                let timer = COMPACTION_MEMORY_WAIT.start_timer();
+
+                match tokio::time::timeout(
+                    wait_timeout,
+                    self.memory_manager.acquire(requested_bytes),
+                )
+                .await
+                {
+                    Ok(Ok(guard)) => {
+                        timer.observe_duration();
+                        Ok(guard)
+                    }
+                    Ok(Err(e)) => {
+                        timer.observe_duration();
+                        Err(e)
+                    }
+                    Err(_) => {
+                        timer.observe_duration();
+                        warn!(
+                            "Compaction for region {} waited {:?} for {} bytes but timed out",
+                            region_id, wait_timeout, requested_bytes
+                        );
+                        CompactionMemoryExhaustedSnafu {
+                            region_id,
+                            required_bytes: requested_bytes,
+                            limit_bytes,
+                            policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
+                        }
+                        .fail()
+                    }
+                }
+            }
+            OnExhaustedPolicy::Skip => {
+                // Try to acquire, skip if not immediately available
+                self.memory_manager
+                    .try_acquire(requested_bytes)
+                    .ok_or_else(|| {
+                        warn!(
+                            "Skipping compaction for region {} due to memory limit \
+                             (need {} bytes, limit {} bytes)",
+                            region_id, requested_bytes, limit_bytes
+                        );
+                        CompactionMemoryExhaustedSnafu {
+                            region_id,
+                            required_bytes: requested_bytes,
+                            limit_bytes,
+                            policy: "skip".to_string(),
+                        }
+                        .build()
+                    })
+            }
+        }
+    }
+
     /// Remove expired ssts files, update manifest immediately
     /// and apply the edit to region version.
     ///
@@ -222,7 +311,7 @@
     }
 
     /// Handles compaction failure, notifies all waiters.
-    fn on_failure(&mut self, err: Arc<Error>) {
+    pub(crate) fn on_failure(&mut self, err: Arc<Error>) {
         COMPACTION_FAILURE_COUNT.inc();
         for waiter in self.waiters.drain(..)
{ waiter.send(Err(err.clone()).context(CompactRegionSnafu { @@ -249,6 +338,26 @@ impl CompactionTaskImpl { #[async_trait::async_trait] impl CompactionTask for CompactionTaskImpl { async fn run(&mut self) { + // Acquire memory budget before starting compaction + let _memory_guard = match self.acquire_memory_with_policy().await { + Ok(guard) => guard, + Err(e) => { + error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id); + let err = Arc::new(e); + self.on_failure(err.clone()); + let notify = BackgroundNotify::CompactionFailed(CompactionFailed { + region_id: self.compaction_region.region_id, + err, + }); + self.send_to_worker(WorkerRequest::Background { + region_id: self.compaction_region.region_id, + notify, + }) + .await; + return; + } + }; + let notify = match self.handle_expiration_and_compaction().await { Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished { region_id: self.compaction_region.region_id, diff --git a/src/mito2/src/compaction/test_util.rs b/src/mito2/src/compaction/test_util.rs index d5cf251a68..6061e294bd 100644 --- a/src/mito2/src/compaction/test_util.rs +++ b/src/mito2/src/compaction/test_util.rs @@ -74,6 +74,7 @@ pub fn new_file_handle_with_size_and_sequence( ), level, file_size, + max_row_group_uncompressed_size: file_size, available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 53cc745fe5..11ce9de9ea 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT; +use crate::compaction::memory_manager::OnExhaustedPolicy; use crate::error::Result; use crate::gc::GcConfig; use crate::memtable::MemtableConfig; @@ -92,6 +93,10 @@ pub struct MitoConfig { pub max_background_compactions: usize, /// Max number of running background purge jobs (default: number of cpu cores). pub max_background_purges: usize, + /// Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. + pub experimental_compaction_memory_limit: MemoryLimit, + /// Behavior when compaction cannot acquire memory from the budget. + pub experimental_compaction_on_exhausted: OnExhaustedPolicy, // Flush configs: /// Interval to auto flush a region if it has not flushed yet (default 30 min). 
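
The two new `MitoConfig` fields above round-trip through the custom serde impls in `memory_manager.rs`. As a reference for the accepted option grammar, here is a minimal standalone sketch of the same parsing rules; `Policy` and `parse_policy` are illustrative stand-ins for the patch's `OnExhaustedPolicy` deserializer, assuming only the `humantime` crate:

```rust
use std::time::Duration;

/// Stand-in for OnExhaustedPolicy in this sketch.
#[derive(Debug, PartialEq)]
enum Policy {
    Wait { timeout: Duration },
    Skip,
}

/// Matches DEFAULT_MEMORY_WAIT_TIMEOUT in the patch.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

/// Accepts "wait", "wait(<duration>)", or "skip", mirroring the Deserialize impl.
fn parse_policy(raw: &str) -> Result<Policy, String> {
    let lower = raw.to_ascii_lowercase();
    if lower == "skip" {
        return Ok(Policy::Skip);
    }
    if lower == "wait" {
        return Ok(Policy::Wait { timeout: DEFAULT_TIMEOUT });
    }
    if lower.starts_with("wait(") && lower.ends_with(')') {
        // Slice out the duration text between "wait(" and ")".
        let inner = &raw[5..raw.len() - 1];
        let timeout = humantime::parse_duration(inner).map_err(|e| e.to_string())?;
        return Ok(Policy::Wait { timeout });
    }
    Err(format!("invalid compaction memory policy: {raw}"))
}

fn main() {
    assert_eq!(parse_policy("wait").unwrap(), Policy::Wait { timeout: DEFAULT_TIMEOUT });
    assert_eq!(
        parse_policy("wait(30s)").unwrap(),
        Policy::Wait { timeout: Duration::from_secs(30) }
    );
    assert_eq!(parse_policy("skip").unwrap(), Policy::Skip);
    assert!(parse_policy("block").is_err());
}
```

So `experimental_compaction_on_exhausted = "wait(30s)"` overrides the 10s default, while an unrecognized string fails at config load.
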
@@ -178,6 +183,8 @@ impl Default for MitoConfig { max_background_flushes: divide_num_cpus(2), max_background_compactions: divide_num_cpus(4), max_background_purges: get_total_cpu_cores(), + experimental_compaction_memory_limit: MemoryLimit::Unlimited, + experimental_compaction_on_exhausted: OnExhaustedPolicy::default(), auto_flush_interval: Duration::from_secs(30 * 60), global_write_buffer_size: ReadableSize::gb(1), global_write_buffer_reject_size: ReadableSize::gb(2), diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d357c68774..90a735381b 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1041,6 +1041,40 @@ pub enum Error { #[snafu(display("Manual compaction is override by following operations."))] ManualCompactionOverride {}, + #[snafu(display( + "Compaction memory limit exceeded for region {}: required {} bytes, limit {} bytes (policy: {})", + region_id, + required_bytes, + limit_bytes, + policy + ))] + CompactionMemoryExhausted { + region_id: RegionId, + required_bytes: u64, + limit_bytes: u64, + policy: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Requested compaction memory ({} bytes) exceeds total limit ({} bytes)", + requested_bytes, + limit_bytes + ))] + CompactionMemoryLimitExceeded { + requested_bytes: u64, + limit_bytes: u64, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Compaction memory semaphore unexpectedly closed"))] + CompactionMemorySemaphoreClosed { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Incompatible WAL provider change. This is typically caused by changing WAL provider in database config file without completely cleaning existing files. Global provider: {}, region provider: {}", global, @@ -1323,6 +1357,12 @@ impl ErrorExt for Error { ManualCompactionOverride {} => StatusCode::Cancelled, + CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted, + + CompactionMemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted, + + CompactionMemorySemaphoreClosed { .. } => StatusCode::Unexpected, + IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments, ScanSeries { source, .. 
} => source.status_code(), diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 8371fd52c7..695f919979 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -640,6 +640,7 @@ impl RegionFlushTask { time_range: sst_info.time_range, level: 0, file_size: sst_info.file_size, + max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size, available_indexes: sst_info.index_metadata.build_available_indexes(), indexes: sst_info.index_metadata.build_indexes(), index_file_size: sst_info.index_metadata.file_size, diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index d709bb96f0..c584ae3276 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -244,6 +244,7 @@ async fn checkpoint_with_different_compression_types() { time_range: (0.into(), 10000000.into()), level: 0, file_size: 1024000, + max_row_group_uncompressed_size: 1024000, available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, @@ -309,6 +310,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec) time_range: (0.into(), 10000000.into()), level: 0, file_size: 1024000, + max_row_group_uncompressed_size: 1024000, available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 6aedf9a5d7..4bf6d0fadb 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -974,6 +974,19 @@ impl EncodedBulkPart { /// Returns a `SstInfo` instance with information derived from this bulk part's metadata pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo { let unit = self.metadata.region_metadata.time_index_type().unit(); + let max_row_group_uncompressed_size: u64 = self + .metadata + .parquet_metadata + .row_groups() + .iter() + .map(|rg| { + rg.columns() + .iter() + .map(|c| c.uncompressed_size() as u64) + .sum::() + }) + .max() + .unwrap_or(0); SstInfo { file_id, time_range: ( @@ -981,6 +994,7 @@ impl EncodedBulkPart { Timestamp::new(self.metadata.max_timestamp, unit), ), file_size: self.data.len() as u64, + max_row_group_uncompressed_size, num_rows: self.metadata.num_rows, num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64, file_metadata: Some(self.metadata.parquet_metadata.clone()), diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 1e35ae1c06..be5f4945fd 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -157,6 +157,35 @@ lazy_static! { "greptime_mito_inflight_compaction_count", "inflight compaction count", ).unwrap(); + + /// Bytes reserved by compaction memory manager. + pub static ref COMPACTION_MEMORY_IN_USE: IntGauge = + register_int_gauge!( + "greptime_mito_compaction_memory_in_use_bytes", + "bytes currently reserved for compaction tasks", + ) + .unwrap(); + /// Configured compaction memory limit. + pub static ref COMPACTION_MEMORY_LIMIT: IntGauge = + register_int_gauge!( + "greptime_mito_compaction_memory_limit_bytes", + "maximum bytes allowed for compaction tasks", + ) + .unwrap(); + /// Wait time to obtain compaction memory. + pub static ref COMPACTION_MEMORY_WAIT: Histogram = register_histogram!( + "greptime_mito_compaction_memory_wait_seconds", + "time waiting for compaction memory", + // 0.01s ~ ~10s + exponential_buckets(0.01, 2.0, 10).unwrap(), + ).unwrap(); + /// Counter of rejected compaction memory allocations. 
+ pub static ref COMPACTION_MEMORY_REJECTED: IntCounterVec = + register_int_counter_vec!( + "greptime_mito_compaction_memory_rejected_total", + "number of compaction tasks rejected due to memory limit", + &[TYPE_LABEL] + ).unwrap(); } // Query metrics. diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs index 1191349b04..59920ad945 100644 --- a/src/mito2/src/remap_manifest.rs +++ b/src/mito2/src/remap_manifest.rs @@ -425,6 +425,7 @@ mod tests { time_range: FileTimeRange::default(), level: 0, file_size: 1024, + max_row_group_uncompressed_size: 1024, available_indexes: SmallVec::new(), indexes: Default::default(), index_file_size: 0, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 7ab8ee132b..94209d7b0c 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -180,6 +180,8 @@ pub struct FileMeta { pub level: Level, /// Size of the file. pub file_size: u64, + /// Maximum uncompressed row group size of the file. 0 means unknown. + pub max_row_group_uncompressed_size: u64, /// Available indexes of the file. pub available_indexes: IndexTypes, /// Created indexes of the file for each column. @@ -248,7 +250,11 @@ impl Debug for FileMeta { ) }) .field("level", &self.level) - .field("file_size", &ReadableSize(self.file_size)); + .field("file_size", &ReadableSize(self.file_size)) + .field( + "max_row_group_uncompressed_size", + &ReadableSize(self.max_row_group_uncompressed_size), + ); if !self.available_indexes.is_empty() { debug_struct .field("available_indexes", &self.available_indexes) @@ -652,6 +658,7 @@ mod tests { time_range: FileTimeRange::default(), level, file_size: 0, + max_row_group_uncompressed_size: 0, available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), indexes: vec![ColumnIndexMetadata { column_id: 0, @@ -703,6 +710,7 @@ mod tests { time_range: FileTimeRange::default(), level: 0, file_size: 0, + max_row_group_uncompressed_size: 0, available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), indexes: vec![ColumnIndexMetadata { column_id: 0, diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 2715223a51..5b23f3e069 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -251,6 +251,7 @@ mod tests { time_range: FileTimeRange::default(), level: 0, file_size: 4096, + max_row_group_uncompressed_size: 4096, available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, @@ -321,6 +322,7 @@ mod tests { time_range: FileTimeRange::default(), level: 0, file_size: 4096, + max_row_group_uncompressed_size: 4096, available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), indexes: vec![ColumnIndexMetadata { column_id: 0, diff --git a/src/mito2/src/sst/file_ref.rs b/src/mito2/src/sst/file_ref.rs index 5cb7dc8c88..9e69bd42cf 100644 --- a/src/mito2/src/sst/file_ref.rs +++ b/src/mito2/src/sst/file_ref.rs @@ -224,6 +224,7 @@ mod tests { time_range: FileTimeRange::default(), level: 0, file_size: 4096, + max_row_group_uncompressed_size: 4096, available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]), indexes: vec![ColumnIndexMetadata { column_id: 0, diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index ecd8f87343..f17d11aff7 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -1549,6 +1549,7 @@ mod tests { region_id, file_id: sst_info.file_id, file_size: sst_info.file_size, + max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size, index_file_size: 
sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, @@ -1623,6 +1624,7 @@ mod tests { region_id, file_id: sst_info.file_id, file_size: sst_info.file_size, + max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size, index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, @@ -1726,6 +1728,7 @@ mod tests { region_id, file_id: sst_info.file_id, file_size: sst_info.file_size, + max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size, index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 37918553d5..21187bedd3 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -76,6 +76,8 @@ pub struct SstInfo { pub time_range: FileTimeRange, /// File size in bytes. pub file_size: u64, + /// Maximum uncompressed row group size in bytes. 0 if unknown. + pub max_row_group_uncompressed_size: u64, /// Number of rows. pub num_rows: usize, /// Number of row groups @@ -771,6 +773,7 @@ mod tests { time_range: info.time_range, level: 0, file_size: info.file_size, + max_row_group_uncompressed_size: info.max_row_group_uncompressed_size, available_indexes: info.index_metadata.build_available_indexes(), indexes: info.index_metadata.build_indexes(), index_file_size: info.index_metadata.file_size, diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index e30c7fa0b8..8c03a51368 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -213,11 +213,23 @@ where // convert FileMetaData to ParquetMetaData let parquet_metadata = parse_parquet_metadata(file_meta)?; + let max_row_group_uncompressed_size: u64 = parquet_metadata + .row_groups() + .iter() + .map(|rg| { + rg.columns() + .iter() + .map(|c| c.uncompressed_size() as u64) + .sum::() + }) + .max() + .unwrap_or(0); let num_series = stats.series_estimator.finish(); ssts.push(SstInfo { file_id: self.current_file, time_range, file_size, + max_row_group_uncompressed_size, num_rows: stats.num_rows, num_row_groups: parquet_metadata.num_row_groups() as u64, file_metadata: Some(Arc::new(parquet_metadata)), diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index b481acf317..bdf8ca05e6 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -28,6 +28,7 @@ use tokio::sync::mpsc::Sender; use crate::access_layer::{AccessLayer, AccessLayerRef}; use crate::cache::CacheManager; use crate::compaction::CompactionScheduler; +use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy}; use crate::config::MitoConfig; use crate::error::Result; use crate::flush::FlushScheduler; @@ -100,6 +101,8 @@ impl SchedulerEnv { Arc::new(MitoConfig::default()), WorkerListener::default(), Plugins::new(), + Arc::new(CompactionMemoryManager::new(0)), + OnExhaustedPolicy::default(), ) } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index d92c628067..9f7b6a0658 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -123,6 +123,7 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64) ), level: 0, file_size: 0, + max_row_group_uncompressed_size: 0, available_indexes: 
Default::default(), indexes: Default::default(), index_file_size: 0, diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index b381b16c05..7a407ece4f 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -101,6 +101,7 @@ impl VersionControlBuilder { ), level: 0, file_size: 0, // We don't care file size. + max_row_group_uncompressed_size: 0, available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, @@ -192,6 +193,7 @@ pub(crate) fn apply_edit( ), level: 0, file_size: 0, // We don't care file size. + max_row_group_uncompressed_size: 0, available_indexes: Default::default(), indexes: Default::default(), index_file_size: 0, diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 05114d63a1..69f41e905a 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -41,6 +41,7 @@ use common_base::readable_size::ReadableSize; use common_error::ext::BoxedError; use common_meta::key::SchemaMetadataManagerRef; use common_runtime::JoinHandle; +use common_stat::get_total_memory_bytes; use common_telemetry::{error, info, warn}; use futures::future::try_join_all; use object_store::manager::ObjectStoreManagerRef; @@ -58,6 +59,7 @@ use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch}; use crate::cache::write_cache::{WriteCache, WriteCacheRef}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::CompactionScheduler; +use crate::compaction::memory_manager::CompactionMemoryManager; use crate::config::MitoConfig; use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; @@ -205,6 +207,17 @@ impl WorkerGroup { .build(), ); let time_provider = Arc::new(StdTimeProvider); + let total_memory = get_total_memory_bytes(); + let total_memory = if total_memory > 0 { + total_memory as u64 + } else { + 0 + }; + let compaction_limit_bytes = config + .experimental_compaction_memory_limit + .resolve(total_memory); + let compaction_memory_manager = + Arc::new(CompactionMemoryManager::new(compaction_limit_bytes)); let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job)); let workers = (0..config.num_workers) @@ -221,6 +234,7 @@ impl WorkerGroup { purge_scheduler: purge_scheduler.clone(), listener: WorkerListener::default(), cache_manager: cache_manager.clone(), + compaction_memory_manager: compaction_memory_manager.clone(), puffin_manager_factory: puffin_manager_factory.clone(), intermediate_manager: intermediate_manager.clone(), time_provider: time_provider.clone(), @@ -381,6 +395,17 @@ impl WorkerGroup { .write_cache(write_cache) .build(), ); + let total_memory = get_total_memory_bytes(); + let total_memory = if total_memory > 0 { + total_memory as u64 + } else { + 0 + }; + let compaction_limit_bytes = config + .experimental_compaction_memory_limit + .resolve(total_memory); + let compaction_memory_manager = + Arc::new(CompactionMemoryManager::new(compaction_limit_bytes)); let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job)); let workers = (0..config.num_workers) .map(|id| { @@ -396,6 +421,7 @@ impl WorkerGroup { purge_scheduler: purge_scheduler.clone(), listener: WorkerListener::new(listener.clone()), cache_manager: cache_manager.clone(), + compaction_memory_manager: compaction_memory_manager.clone(), puffin_manager_factory: puffin_manager_factory.clone(), intermediate_manager: intermediate_manager.clone(), 
time_provider: time_provider.clone(), @@ -482,6 +508,7 @@ struct WorkerStarter { purge_scheduler: SchedulerRef, listener: WorkerListener, cache_manager: CacheManagerRef, + compaction_memory_manager: Arc, puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, time_provider: TimeProviderRef, @@ -534,9 +561,11 @@ impl WorkerStarter { self.compact_job_pool, sender.clone(), self.cache_manager.clone(), - self.config, + self.config.clone(), self.listener.clone(), self.plugins.clone(), + self.compaction_memory_manager.clone(), + self.config.experimental_compaction_on_exhausted, ), stalled_requests: StalledRequests::default(), listener: self.listener, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7192396db5..e91f2c7fac 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1495,6 +1495,8 @@ manifest_checkpoint_distance = 10 experimental_manifest_keep_removed_file_count = 256 experimental_manifest_keep_removed_file_ttl = "1h" compress_manifest = false +experimental_compaction_memory_limit = "unlimited" +experimental_compaction_on_exhausted = "wait" auto_flush_interval = "30m" enable_write_cache = false write_cache_path = ""
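
A closing note on the mechanics the new guard relies on: `CompactionMemoryGuard` is built on tokio's owned semaphore permits, where `OwnedSemaphorePermit::merge` backs `request_additional` and `split` backs `early_release_partial`. A self-contained sketch of those primitives at the patch's 1 MiB permit granularity (all names here are local to the example, not part of the patch):

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // A 10 MiB budget expressed as 10 one-MiB permits, as the manager does.
    let budget = Arc::new(Semaphore::new(10));

    // Base reservation (5 MiB) held as one owned permit bundle.
    let mut guard = budget.clone().acquire_many_owned(5).await.unwrap();

    // request_additional(): try to grab more permits and merge them in,
    // so everything is released together when the bundle drops.
    let extra = budget.clone().try_acquire_many_owned(3).unwrap();
    guard.merge(extra);
    assert_eq!(guard.num_permits(), 8);

    // early_release_partial(): split permits off and drop them to hand
    // budget back while the guard stays valid with a reduced quota.
    let released = guard.split(4).unwrap();
    drop(released);
    assert_eq!(guard.num_permits(), 4);
    assert_eq!(budget.available_permits(), 6);

    // Dropping the guard returns the rest, as in CompactionMemoryGuard::drop.
    drop(guard);
    assert_eq!(budget.available_permits(), 10);
}
```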
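
The permit accounting also explains `test_small_allocation_rounds_up`: every request is rounded up to whole 1 MiB permits. A tiny sketch of that arithmetic, using `div_ceil` where the patch uses saturating ops:

```rust
const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MiB, as in memory_manager.rs

fn bytes_to_permits(bytes: u64) -> u64 {
    // Round up, so any non-zero request costs at least one permit;
    // zero stays zero, enabling lazy allocation via request_additional().
    bytes.div_ceil(PERMIT_GRANULARITY_BYTES)
}

fn permits_to_bytes(permits: u64) -> u64 {
    permits * PERMIT_GRANULARITY_BYTES
}

fn main() {
    assert_eq!(bytes_to_permits(0), 0);
    assert_eq!(bytes_to_permits(512 * 1024), 1); // 512 KiB is granted a full 1 MiB
    assert_eq!(bytes_to_permits(3 << 20), 3);    // exact multiples stay exact
    assert_eq!(permits_to_bytes(3), 3 << 20);
}
```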