From 32f9cc528692bb88d0dd15b561f344409da88820 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 15 Dec 2025 21:15:33 +0800 Subject: [PATCH] feat: move memory_manager to common crate (#7408) * feat: move memory_manager to common crate Signed-off-by: jeremyhi * chore: add license header Signed-off-by: jeremyhi * fix: by AI comment Signed-off-by: jeremyhi --------- Signed-off-by: jeremyhi --- Cargo.lock | 14 + Cargo.toml | 2 + config/config.md | 4 +- config/datanode.example.toml | 2 +- config/standalone.example.toml | 2 +- src/common/memory-manager/Cargo.toml | 20 + src/common/memory-manager/src/error.rs | 53 ++ src/common/memory-manager/src/guard.rs | 138 +++++ src/common/memory-manager/src/lib.rs | 47 ++ src/common/memory-manager/src/manager.rs | 173 ++++++ src/common/memory-manager/src/policy.rs | 83 +++ src/common/memory-manager/src/tests.rs | 247 ++++++++ src/mito2/Cargo.toml | 1 + src/mito2/src/compaction.rs | 8 +- src/mito2/src/compaction/memory_manager.rs | 636 +-------------------- src/mito2/src/compaction/task.rs | 21 +- src/mito2/src/config.rs | 2 +- src/mito2/src/error.rs | 31 +- src/mito2/src/test_util/scheduler_util.rs | 5 +- src/mito2/src/worker.rs | 6 +- 20 files changed, 838 insertions(+), 657 deletions(-) create mode 100644 src/common/memory-manager/Cargo.toml create mode 100644 src/common/memory-manager/src/error.rs create mode 100644 src/common/memory-manager/src/guard.rs create mode 100644 src/common/memory-manager/src/lib.rs create mode 100644 src/common/memory-manager/src/manager.rs create mode 100644 src/common/memory-manager/src/policy.rs create mode 100644 src/common/memory-manager/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 6781ac3895..bdf431f7ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2328,6 +2328,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "common-memory-manager" +version = "1.0.0-beta.2" +dependencies = [ + "common-error", + "common-macro", + "common-telemetry", + "humantime", + "serde", + "snafu 0.8.6", + "tokio", +] + [[package]] name = "common-meta" version = "1.0.0-beta.2" @@ -7651,6 +7664,7 @@ dependencies = [ "common-function", "common-grpc", "common-macro", + "common-memory-manager", "common-meta", "common-query", "common-recordbatch", diff --git a/Cargo.toml b/Cargo.toml index c327702ca8..a8a09ae993 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "src/common/grpc-expr", "src/common/macro", "src/common/mem-prof", + "src/common/memory-manager", "src/common/meta", "src/common/options", "src/common/plugins", @@ -266,6 +267,7 @@ common-grpc = { path = "src/common/grpc" } common-grpc-expr = { path = "src/common/grpc-expr" } common-macro = { path = "src/common/macro" } common-mem-prof = { path = "src/common/mem-prof" } +common-memory-manager = { path = "src/common/memory-manager" } common-meta = { path = "src/common/meta" } common-options = { path = "src/common/options" } common-plugins = { path = "src/common/plugins" } diff --git a/config/config.md b/config/config.md index dc663cead4..56eda451a4 100644 --- a/config/config.md +++ b/config/config.md @@ -142,7 +142,7 @@ | `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). | | `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). | | `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. | -| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "skip" | +| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "fail" | | `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. | | `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. | | `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. | @@ -524,7 +524,7 @@ | `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). | | `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). | | `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. | -| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "skip" | +| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "fail" | | `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. | | `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. | | `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 0159fc237b..d159d52ae1 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -457,7 +457,7 @@ compress_manifest = false #+ experimental_compaction_memory_limit = "0" ## Behavior when compaction cannot acquire memory from the budget. -## Options: "wait" (default, 10s), "wait()", "skip" +## Options: "wait" (default, 10s), "wait()", "fail" ## @toml2docs:none-default="wait" #+ experimental_compaction_on_exhausted = "wait" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 653b0cb4af..ff0b8c3d5e 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -551,7 +551,7 @@ compress_manifest = false #+ experimental_compaction_memory_limit = "0" ## Behavior when compaction cannot acquire memory from the budget. -## Options: "wait" (default, 10s), "wait()", "skip" +## Options: "wait" (default, 10s), "wait()", "fail" ## @toml2docs:none-default="wait" #+ experimental_compaction_on_exhausted = "wait" diff --git a/src/common/memory-manager/Cargo.toml b/src/common/memory-manager/Cargo.toml new file mode 100644 index 0000000000..a6be50f774 --- /dev/null +++ b/src/common/memory-manager/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "common-memory-manager" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +common-error = { workspace = true } +common-macro = { workspace = true } +common-telemetry = { workspace = true } +humantime = { workspace = true } +serde = { workspace = true } +snafu = { workspace = true } +tokio = { workspace = true, features = ["sync"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt", "macros"] } diff --git a/src/common/memory-manager/src/error.rs b/src/common/memory-manager/src/error.rs new file mode 100644 index 0000000000..5ff7d74ad6 --- /dev/null +++ b/src/common/memory-manager/src/error.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::Snafu; + +pub type Result = std::result::Result; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display( + "Memory limit exceeded: requested {requested_bytes} bytes, limit {limit_bytes} bytes" + ))] + MemoryLimitExceeded { + requested_bytes: u64, + limit_bytes: u64, + }, + + #[snafu(display("Memory semaphore unexpectedly closed"))] + MemorySemaphoreClosed, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + MemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted, + MemorySemaphoreClosed => StatusCode::Unexpected, + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/memory-manager/src/guard.rs b/src/common/memory-manager/src/guard.rs new file mode 100644 index 0000000000..e72e16ab5b --- /dev/null +++ b/src/common/memory-manager/src/guard.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{fmt, mem}; + +use common_telemetry::debug; +use tokio::sync::{OwnedSemaphorePermit, TryAcquireError}; + +use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes}; + +/// Guard representing a slice of reserved memory. +pub struct MemoryGuard { + pub(crate) state: GuardState, +} + +pub(crate) enum GuardState { + Unlimited, + Limited { + permit: OwnedSemaphorePermit, + quota: MemoryQuota, + }, +} + +impl MemoryGuard { + pub(crate) fn unlimited() -> Self { + Self { + state: GuardState::Unlimited, + } + } + + pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota) -> Self { + Self { + state: GuardState::Limited { permit, quota }, + } + } + + /// Returns granted quota in bytes. + pub fn granted_bytes(&self) -> u64 { + match &self.state { + GuardState::Unlimited => 0, + GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32), + } + } + + /// Tries to allocate additional memory during task execution. + /// + /// On success, merges the new memory into this guard and returns true. + /// On failure, returns false and leaves this guard unchanged. + pub fn request_additional(&mut self, bytes: u64) -> bool { + match &mut self.state { + GuardState::Unlimited => true, + GuardState::Limited { permit, quota } => { + if bytes == 0 { + return true; + } + + let additional_permits = bytes_to_permits(bytes); + + match quota + .semaphore + .clone() + .try_acquire_many_owned(additional_permits) + { + Ok(additional_permit) => { + permit.merge(additional_permit); + quota.update_in_use_metric(); + debug!("Allocated additional {} bytes", bytes); + true + } + Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { + quota.metrics.inc_rejected("request_additional"); + false + } + } + } + } + } + + /// Releases a portion of granted memory back to the pool early, + /// before the guard is dropped. + /// + /// Returns true if the release succeeds or is a no-op; false if the request exceeds granted. + pub fn early_release_partial(&mut self, bytes: u64) -> bool { + match &mut self.state { + GuardState::Unlimited => true, + GuardState::Limited { permit, quota } => { + if bytes == 0 { + return true; + } + + let release_permits = bytes_to_permits(bytes); + + match permit.split(release_permits as usize) { + Some(released_permit) => { + let released_bytes = permits_to_bytes(released_permit.num_permits() as u32); + drop(released_permit); + quota.update_in_use_metric(); + debug!("Early released {} bytes from memory guard", released_bytes); + true + } + None => false, + } + } + } + } +} + +impl Drop for MemoryGuard { + fn drop(&mut self) { + if let GuardState::Limited { permit, quota } = + mem::replace(&mut self.state, GuardState::Unlimited) + { + let bytes = permits_to_bytes(permit.num_permits() as u32); + drop(permit); + quota.update_in_use_metric(); + debug!("Released memory: {} bytes", bytes); + } + } +} + +impl fmt::Debug for MemoryGuard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MemoryGuard") + .field("granted_bytes", &self.granted_bytes()) + .finish() + } +} diff --git a/src/common/memory-manager/src/lib.rs b/src/common/memory-manager/src/lib.rs new file mode 100644 index 0000000000..61d52f6366 --- /dev/null +++ b/src/common/memory-manager/src/lib.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Generic memory management for resource-constrained operations. +//! +//! This crate provides a reusable memory quota system based on semaphores, +//! allowing different subsystems (compaction, flush, index build, etc.) to +//! share the same allocation logic while using their own metrics. + +mod error; +mod guard; +mod manager; +mod policy; + +#[cfg(test)] +mod tests; + +pub use error::{Error, Result}; +pub use guard::MemoryGuard; +pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES}; +pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy}; + +/// No-op metrics implementation for testing. +#[derive(Clone, Copy, Debug, Default)] +pub struct NoOpMetrics; + +impl MemoryMetrics for NoOpMetrics { + #[inline(always)] + fn set_limit(&self, _: i64) {} + + #[inline(always)] + fn set_in_use(&self, _: i64) {} + + #[inline(always)] + fn inc_rejected(&self, _: &str) {} +} diff --git a/src/common/memory-manager/src/manager.rs b/src/common/memory-manager/src/manager.rs new file mode 100644 index 0000000000..8cc7b937e4 --- /dev/null +++ b/src/common/memory-manager/src/manager.rs @@ -0,0 +1,173 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use snafu::ensure; +use tokio::sync::{Semaphore, TryAcquireError}; + +use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result}; +use crate::guard::MemoryGuard; + +/// Minimum bytes controlled by one semaphore permit. +pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB + +/// Trait for recording memory usage metrics. +pub trait MemoryMetrics: Clone + Send + Sync + 'static { + fn set_limit(&self, bytes: i64); + fn set_in_use(&self, bytes: i64); + fn inc_rejected(&self, reason: &str); +} + +/// Generic memory manager for quota-controlled operations. +#[derive(Clone)] +pub struct MemoryManager { + quota: Option>, +} + +#[derive(Clone)] +pub(crate) struct MemoryQuota { + pub(crate) semaphore: Arc, + pub(crate) limit_permits: u32, + pub(crate) metrics: M, +} + +impl MemoryManager { + /// Creates a new memory manager with the given limit in bytes. + /// `limit_bytes = 0` disables the limit. + pub fn new(limit_bytes: u64, metrics: M) -> Self { + if limit_bytes == 0 { + metrics.set_limit(0); + return Self { quota: None }; + } + + let limit_permits = bytes_to_permits(limit_bytes); + let limit_aligned_bytes = permits_to_bytes(limit_permits); + metrics.set_limit(limit_aligned_bytes as i64); + + Self { + quota: Some(MemoryQuota { + semaphore: Arc::new(Semaphore::new(limit_permits as usize)), + limit_permits, + metrics, + }), + } + } + + /// Returns the configured limit in bytes (0 if unlimited). + pub fn limit_bytes(&self) -> u64 { + self.quota + .as_ref() + .map(|quota| permits_to_bytes(quota.limit_permits)) + .unwrap_or(0) + } + + /// Returns currently used bytes. + pub fn used_bytes(&self) -> u64 { + self.quota + .as_ref() + .map(|quota| permits_to_bytes(quota.used_permits())) + .unwrap_or(0) + } + + /// Returns available bytes. + pub fn available_bytes(&self) -> u64 { + self.quota + .as_ref() + .map(|quota| permits_to_bytes(quota.available_permits_clamped())) + .unwrap_or(0) + } + + /// Acquires memory, waiting if necessary until enough is available. + /// + /// # Errors + /// - Returns error if requested bytes exceed the total limit + /// - Returns error if the semaphore is unexpectedly closed + pub async fn acquire(&self, bytes: u64) -> Result> { + match &self.quota { + None => Ok(MemoryGuard::unlimited()), + Some(quota) => { + let permits = bytes_to_permits(bytes); + + ensure!( + permits <= quota.limit_permits, + MemoryLimitExceededSnafu { + requested_bytes: bytes, + limit_bytes: permits_to_bytes(quota.limit_permits), + } + ); + + let permit = quota + .semaphore + .clone() + .acquire_many_owned(permits) + .await + .map_err(|_| MemorySemaphoreClosedSnafu.build())?; + quota.update_in_use_metric(); + Ok(MemoryGuard::limited(permit, quota.clone())) + } + } + } + + /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient. + pub fn try_acquire(&self, bytes: u64) -> Option> { + match &self.quota { + None => Some(MemoryGuard::unlimited()), + Some(quota) => { + let permits = bytes_to_permits(bytes); + + match quota.semaphore.clone().try_acquire_many_owned(permits) { + Ok(permit) => { + quota.update_in_use_metric(); + Some(MemoryGuard::limited(permit, quota.clone())) + } + Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { + quota.metrics.inc_rejected("try_acquire"); + None + } + } + } + } + } +} + +impl MemoryQuota { + pub(crate) fn used_permits(&self) -> u32 { + self.limit_permits + .saturating_sub(self.available_permits_clamped()) + } + + pub(crate) fn available_permits_clamped(&self) -> u32 { + self.semaphore + .available_permits() + .min(self.limit_permits as usize) as u32 + } + + pub(crate) fn update_in_use_metric(&self) { + let bytes = permits_to_bytes(self.used_permits()); + self.metrics.set_in_use(bytes as i64); + } +} + +pub(crate) fn bytes_to_permits(bytes: u64) -> u32 { + bytes + .saturating_add(PERMIT_GRANULARITY_BYTES - 1) + .saturating_div(PERMIT_GRANULARITY_BYTES) + .min(Semaphore::MAX_PERMITS as u64) + .min(u32::MAX as u64) as u32 +} + +pub(crate) fn permits_to_bytes(permits: u32) -> u64 { + (permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES) +} diff --git a/src/common/memory-manager/src/policy.rs b/src/common/memory-manager/src/policy.rs new file mode 100644 index 0000000000..3f19568b8f --- /dev/null +++ b/src/common/memory-manager/src/policy.rs @@ -0,0 +1,83 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use humantime::{format_duration, parse_duration}; +use serde::{Deserialize, Serialize}; + +/// Default wait timeout for memory acquisition. +pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Defines how to react when memory cannot be acquired immediately. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OnExhaustedPolicy { + /// Wait until enough memory is released, bounded by timeout. + Wait { timeout: Duration }, + + /// Fail immediately if memory is not available. + Fail, +} + +impl Default for OnExhaustedPolicy { + fn default() -> Self { + OnExhaustedPolicy::Wait { + timeout: DEFAULT_MEMORY_WAIT_TIMEOUT, + } + } +} + +impl Serialize for OnExhaustedPolicy { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let text = match self { + OnExhaustedPolicy::Fail => "fail".to_string(), + OnExhaustedPolicy::Wait { timeout } if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT => { + "wait".to_string() + } + OnExhaustedPolicy::Wait { timeout } => format!("wait({})", format_duration(*timeout)), + }; + serializer.serialize_str(&text) + } +} + +impl<'de> Deserialize<'de> for OnExhaustedPolicy { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let raw = String::deserialize(deserializer)?; + let lower = raw.to_ascii_lowercase(); + + // Accept both "skip" (legacy) and "fail". + if lower == "skip" || lower == "fail" { + return Ok(OnExhaustedPolicy::Fail); + } + if lower == "wait" { + return Ok(OnExhaustedPolicy::default()); + } + if lower.starts_with("wait(") && lower.ends_with(')') { + let inner = &raw[5..raw.len() - 1]; + let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?; + return Ok(OnExhaustedPolicy::Wait { timeout }); + } + + Err(serde::de::Error::custom(format!( + "invalid memory policy: {}, expected wait, wait(), fail", + raw + ))) + } +} diff --git a/src/common/memory-manager/src/tests.rs b/src/common/memory-manager/src/tests.rs new file mode 100644 index 0000000000..3a928f9c7c --- /dev/null +++ b/src/common/memory-manager/src/tests.rs @@ -0,0 +1,247 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tokio::time::{Duration, sleep}; + +use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES}; + +#[test] +fn test_try_acquire_unlimited() { + let manager = MemoryManager::new(0, NoOpMetrics); + let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap(); + assert_eq!(manager.limit_bytes(), 0); + assert_eq!(guard.granted_bytes(), 0); +} + +#[test] +fn test_try_acquire_limited_success_and_release() { + let bytes = 2 * PERMIT_GRANULARITY_BYTES; + let manager = MemoryManager::new(bytes, NoOpMetrics); + { + let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap(); + assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES); + drop(guard); + } + assert_eq!(manager.used_bytes(), 0); +} + +#[test] +fn test_try_acquire_exceeds_limit() { + let limit = PERMIT_GRANULARITY_BYTES; + let manager = MemoryManager::new(limit, NoOpMetrics); + let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES); + assert!(result.is_none()); +} + +#[tokio::test(flavor = "current_thread")] +async fn test_acquire_blocks_and_unblocks() { + let bytes = 2 * PERMIT_GRANULARITY_BYTES; + let manager = MemoryManager::new(bytes, NoOpMetrics); + let guard = manager.try_acquire(bytes).unwrap(); + + // Spawn a task that will block on acquire() + let waiter = { + let manager = manager.clone(); + tokio::spawn(async move { + // This will block until memory is available + let _guard = manager.acquire(bytes).await.unwrap(); + }) + }; + + sleep(Duration::from_millis(10)).await; + // Release memory - this should unblock the waiter + drop(guard); + + // Waiter should complete now + waiter.await.unwrap(); +} + +#[test] +fn test_request_additional_success() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit + let manager = MemoryManager::new(limit, NoOpMetrics); + + // Acquire base quota (5MB) + let base = 5 * PERMIT_GRANULARITY_BYTES; + let mut guard = manager.try_acquire(base).unwrap(); + assert_eq!(guard.granted_bytes(), base); + assert_eq!(manager.used_bytes(), base); + + // Request additional memory (3MB) - should succeed and merge + assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); +} + +#[test] +fn test_request_additional_exceeds_limit() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit + let manager = MemoryManager::new(limit, NoOpMetrics); + + // Acquire base quota (5MB) + let base = 5 * PERMIT_GRANULARITY_BYTES; + let mut guard = manager.try_acquire(base).unwrap(); + + // Request additional memory (3MB) - should succeed + assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + + // Request more (3MB) - should fail (would exceed 10MB limit) + let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES); + assert!(!result); + + // Still at 8MB + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES); +} + +#[test] +fn test_request_additional_auto_release_on_guard_drop() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; + let manager = MemoryManager::new(limit, NoOpMetrics); + + { + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Request additional - memory is merged into guard + assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + + // When guard drops, all memory (base + additional) is released together + } + + // After scope, all memory should be released + assert_eq!(manager.used_bytes(), 0); +} + +#[test] +fn test_request_additional_unlimited() { + let manager = MemoryManager::new(0, NoOpMetrics); // Unlimited + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Should always succeed with unlimited manager + assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 0); + assert_eq!(manager.used_bytes(), 0); +} + +#[test] +fn test_request_additional_zero_bytes() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; + let manager = MemoryManager::new(limit, NoOpMetrics); + + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Request 0 bytes should succeed without affecting anything + assert!(guard.request_additional(0)); + assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); +} + +#[test] +fn test_early_release_partial_success() { + let limit = 10 * PERMIT_GRANULARITY_BYTES; + let manager = MemoryManager::new(limit, NoOpMetrics); + + let mut guard = manager.try_acquire(8 * PERMIT_GRANULARITY_BYTES).unwrap(); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); + + // Release half + assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 4 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 4 * PERMIT_GRANULARITY_BYTES); + + // Released memory should be available to others + let _guard2 = manager.try_acquire(4 * PERMIT_GRANULARITY_BYTES).unwrap(); + assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); +} + +#[test] +fn test_early_release_partial_exceeds_granted() { + let manager = MemoryManager::new(10 * PERMIT_GRANULARITY_BYTES, NoOpMetrics); + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Try to release more than granted - should fail + assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); +} + +#[test] +fn test_early_release_partial_unlimited() { + let manager = MemoryManager::new(0, NoOpMetrics); + let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Unlimited guard - release should succeed (no-op) + assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 0); +} + +#[test] +fn test_request_and_early_release_symmetry() { + let limit = 20 * PERMIT_GRANULARITY_BYTES; + let manager = MemoryManager::new(limit, NoOpMetrics); + + let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); + + // Request additional + assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES); + + // Early release some + assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 7 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 7 * PERMIT_GRANULARITY_BYTES); + + // Request again + assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 9 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 9 * PERMIT_GRANULARITY_BYTES); + + // Early release again + assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES)); + assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); + + drop(guard); + assert_eq!(manager.used_bytes(), 0); +} + +#[test] +fn test_small_allocation_rounds_up() { + // Test that allocations smaller than PERMIT_GRANULARITY_BYTES + // round up to 1 permit and can use request_additional() + let limit = 10 * PERMIT_GRANULARITY_BYTES; + let manager = MemoryManager::new(limit, NoOpMetrics); + + let mut guard = manager.try_acquire(512 * 1024).unwrap(); // 512KB + assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); // Rounds up to 1MB + assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); // Can request more + assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES); +} + +#[test] +fn test_acquire_zero_bytes_lazy_allocation() { + // Test that acquire(0) returns 0 permits but can request_additional() later + let manager = MemoryManager::new(10 * PERMIT_GRANULARITY_BYTES, NoOpMetrics); + + let mut guard = manager.try_acquire(0).unwrap(); + assert_eq!(guard.granted_bytes(), 0); // No permits consumed + assert_eq!(manager.used_bytes(), 0); + + assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); // Lazy allocation + assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES); +} diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 68d996723e..c453534317 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -30,6 +30,7 @@ common-error.workspace = true common-grpc.workspace = true common-macro.workspace = true common-meta.workspace = true +common-memory-manager.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index bda22ac652..3bb3fe932f 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -30,6 +30,7 @@ use std::time::Instant; use api::v1::region::compact_request; use api::v1::region::compact_request::Options; use common_base::Plugins; +use common_memory_manager::OnExhaustedPolicy; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{debug, error, info, warn}; use common_time::range::TimestampRange; @@ -47,7 +48,7 @@ use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor}; -use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy}; +use crate::compaction::memory_manager::CompactionMemoryManager; use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker}; use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; @@ -809,6 +810,7 @@ mod tests { use tokio::sync::{Barrier, oneshot}; use super::*; + use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::ManifestContext; use crate::sst::FormatType; @@ -1181,7 +1183,7 @@ mod tests { #[tokio::test] async fn test_concurrent_memory_competition() { - let manager = Arc::new(CompactionMemoryManager::new(3 * 1024 * 1024)); // 3MB + let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB let barrier = Arc::new(Barrier::new(3)); let mut handles = vec![]; @@ -1196,7 +1198,7 @@ mod tests { handles.push(handle); } - let results: Vec<_> = futures::future::join_all(handles) + let results: Vec> = futures::future::join_all(handles) .await .into_iter() .map(|r| r.unwrap()) diff --git a/src/mito2/src/compaction/memory_manager.rs b/src/mito2/src/compaction/memory_manager.rs index 831e4c4e8b..8cbb5d293a 100644 --- a/src/mito2/src/compaction/memory_manager.rs +++ b/src/mito2/src/compaction/memory_manager.rs @@ -12,627 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; -use std::time::Duration; -use std::{fmt, mem}; +use common_memory_manager::{MemoryGuard, MemoryManager, MemoryMetrics}; -use common_telemetry::debug; -use humantime::{format_duration, parse_duration}; -use serde::{Deserialize, Serialize}; -use snafu::ensure; -use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError}; - -use crate::error::{ - CompactionMemoryLimitExceededSnafu, CompactionMemorySemaphoreClosedSnafu, Result, -}; use crate::metrics::{ COMPACTION_MEMORY_IN_USE, COMPACTION_MEMORY_LIMIT, COMPACTION_MEMORY_REJECTED, }; -/// Minimum bytes controlled by one semaphore permit. -const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB +/// Compaction-specific memory metrics implementation. +#[derive(Clone, Copy, Debug, Default)] +pub struct CompactionMemoryMetrics; -/// Default wait timeout for compaction memory. -pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10); +impl MemoryMetrics for CompactionMemoryMetrics { + fn set_limit(&self, bytes: i64) { + COMPACTION_MEMORY_LIMIT.set(bytes); + } -/// Defines how to react when compaction cannot acquire enough memory. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum OnExhaustedPolicy { - /// Wait until enough memory is released, bounded by timeout. - Wait { timeout: Duration }, - /// Skip the compaction if memory is not immediately available. - Skip, -} + fn set_in_use(&self, bytes: i64) { + COMPACTION_MEMORY_IN_USE.set(bytes); + } -impl Default for OnExhaustedPolicy { - fn default() -> Self { - OnExhaustedPolicy::Wait { - timeout: DEFAULT_MEMORY_WAIT_TIMEOUT, - } + fn inc_rejected(&self, reason: &str) { + COMPACTION_MEMORY_REJECTED + .with_label_values(&[reason]) + .inc(); } } -impl Serialize for OnExhaustedPolicy { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let text = match self { - OnExhaustedPolicy::Skip => "skip".to_string(), - OnExhaustedPolicy::Wait { timeout } if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT => { - "wait".to_string() - } - OnExhaustedPolicy::Wait { timeout } => { - format!("wait({})", format_duration(*timeout)) - } - }; - serializer.serialize_str(&text) - } -} - -impl<'de> Deserialize<'de> for OnExhaustedPolicy { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let raw = String::deserialize(deserializer)?; - let lower = raw.to_ascii_lowercase(); - - if lower == "skip" { - return Ok(OnExhaustedPolicy::Skip); - } - if lower == "wait" { - return Ok(OnExhaustedPolicy::default()); - } - if lower.starts_with("wait(") && lower.ends_with(')') { - let inner = &raw[5..raw.len() - 1]; - let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?; - return Ok(OnExhaustedPolicy::Wait { timeout }); - } - - Err(serde::de::Error::custom(format!( - "invalid compaction memory policy: {}, expected wait, wait() or skip", - raw - ))) - } -} - -/// Global memory manager for compaction tasks. -#[derive(Clone)] -pub struct CompactionMemoryManager { - quota: Option, -} - -/// Shared memory quota state across all compaction guards. -#[derive(Clone)] -struct MemoryQuota { - semaphore: Arc, - // Maximum permits (aligned to PERMIT_GRANULARITY_BYTES). - limit_permits: u32, -} - -impl CompactionMemoryManager { - /// Creates a new memory manager with the given limit in bytes. - /// `limit_bytes = 0` disables the limit. - pub fn new(limit_bytes: u64) -> Self { - if limit_bytes == 0 { - COMPACTION_MEMORY_LIMIT.set(0); - return Self { quota: None }; - } - - let limit_permits = bytes_to_permits(limit_bytes); - let limit_aligned_bytes = permits_to_bytes(limit_permits); - COMPACTION_MEMORY_LIMIT.set(limit_aligned_bytes as i64); - - Self { - quota: Some(MemoryQuota { - semaphore: Arc::new(Semaphore::new(limit_permits as usize)), - limit_permits, - }), - } - } - - /// Returns the configured limit in bytes (0 if unlimited). - pub fn limit_bytes(&self) -> u64 { - self.quota - .as_ref() - .map(|quota| permits_to_bytes(quota.limit_permits)) - .unwrap_or(0) - } - - /// Returns currently used bytes. - pub fn used_bytes(&self) -> u64 { - self.quota - .as_ref() - .map(|quota| permits_to_bytes(quota.used_permits())) - .unwrap_or(0) - } - - /// Returns available bytes. - pub fn available_bytes(&self) -> u64 { - self.quota - .as_ref() - .map(|quota| permits_to_bytes(quota.available_permits_clamped())) - .unwrap_or(0) - } - - /// Acquires memory, waiting if necessary until enough is available. - /// - /// # Errors - /// - Returns error if requested bytes exceed the total limit - /// - Returns error if the semaphore is unexpectedly closed - pub async fn acquire(&self, bytes: u64) -> Result { - match &self.quota { - None => Ok(CompactionMemoryGuard::unlimited()), - Some(quota) => { - let permits = bytes_to_permits(bytes); - - // Fail-fast: reject if request exceeds total limit. - ensure!( - permits <= quota.limit_permits, - CompactionMemoryLimitExceededSnafu { - requested_bytes: bytes, - limit_bytes: permits_to_bytes(quota.limit_permits), - } - ); - - let permit = quota - .semaphore - .clone() - .acquire_many_owned(permits) - .await - .map_err(|_| CompactionMemorySemaphoreClosedSnafu.build())?; - quota.update_in_use_metric(); - Ok(CompactionMemoryGuard::limited(permit, quota.clone())) - } - } - } - - /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient. - pub fn try_acquire(&self, bytes: u64) -> Option { - match &self.quota { - None => Some(CompactionMemoryGuard::unlimited()), - Some(quota) => { - let permits = bytes_to_permits(bytes); - - match quota.semaphore.clone().try_acquire_many_owned(permits) { - Ok(permit) => { - quota.update_in_use_metric(); - Some(CompactionMemoryGuard::limited(permit, quota.clone())) - } - Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { - COMPACTION_MEMORY_REJECTED - .with_label_values(&["try_acquire"]) - .inc(); - None - } - } - } - } - } -} - -impl MemoryQuota { - fn used_permits(&self) -> u32 { - self.limit_permits - .saturating_sub(self.available_permits_clamped()) - } - - fn available_permits_clamped(&self) -> u32 { - // Clamp to limit_permits to ensure we never report more available permits - // than our configured limit, even if semaphore state becomes inconsistent. - self.semaphore - .available_permits() - .min(self.limit_permits as usize) as u32 - } - - fn update_in_use_metric(&self) { - let bytes = permits_to_bytes(self.used_permits()); - COMPACTION_MEMORY_IN_USE.set(bytes as i64); - } -} - -/// Guard representing a slice of reserved compaction memory. -/// -/// Memory is automatically released when this guard is dropped. -pub struct CompactionMemoryGuard { - state: GuardState, -} - -enum GuardState { - Unlimited, - Limited { - // Holds all permits owned by this guard (base plus any additional). - // Additional requests merge into this permit and are released together on drop. - permit: OwnedSemaphorePermit, - // Memory quota for requesting additional permits and updating metrics. - quota: MemoryQuota, - }, -} - -impl CompactionMemoryGuard { - fn unlimited() -> Self { - Self { - state: GuardState::Unlimited, - } - } - - fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota) -> Self { - Self { - state: GuardState::Limited { permit, quota }, - } - } - - /// Returns granted quota in bytes. - pub fn granted_bytes(&self) -> u64 { - match &self.state { - GuardState::Unlimited => 0, - GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32), - } - } - - /// Tries to allocate additional memory during task execution. - /// - /// On success, merges the new memory into this guard and returns true. - /// On failure, returns false and leaves this guard unchanged. - /// - /// # Behavior - /// - Running tasks can request additional memory on top of their initial allocation - /// - If total memory (all tasks) would exceed limit, returns false immediately - /// - The task should gracefully handle false by failing or adjusting its strategy - /// - The additional memory is merged into this guard and released together on drop - pub fn request_additional(&mut self, bytes: u64) -> bool { - match &mut self.state { - GuardState::Unlimited => true, - GuardState::Limited { permit, quota } => { - // Early return for zero-byte requests (no-op) - if bytes == 0 { - return true; - } - - let additional_permits = bytes_to_permits(bytes); - - // Try to acquire additional permits from the quota - match quota - .semaphore - .clone() - .try_acquire_many_owned(additional_permits) - { - Ok(additional_permit) => { - // Merge into main permit - permit.merge(additional_permit); - quota.update_in_use_metric(); - - debug!("Allocated additional {} bytes", bytes); - - true - } - Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { - COMPACTION_MEMORY_REJECTED - .with_label_values(&["request_additional"]) - .inc(); - false - } - } - } - } - } - - /// Releases a portion of granted memory back to the pool early, - /// before the guard is dropped. - /// - /// This is useful when a task's memory requirement decreases during execution - /// (e.g., after completing a memory-intensive phase). The guard remains valid - /// with reduced quota, and the task can continue running. - pub fn early_release_partial(&mut self, bytes: u64) -> bool { - match &mut self.state { - GuardState::Unlimited => true, - GuardState::Limited { permit, quota } => { - // Early return for zero-byte requests (no-op) - if bytes == 0 { - return true; - } - - let release_permits = bytes_to_permits(bytes); - - // Split out the permits we want to release - match permit.split(release_permits as usize) { - Some(released_permit) => { - let released_bytes = permits_to_bytes(released_permit.num_permits() as u32); - - // Drop the split permit to return it to the quota - drop(released_permit); - quota.update_in_use_metric(); - - debug!( - "Early released {} bytes from compaction memory guard", - released_bytes - ); - - true - } - None => { - // Requested release exceeds granted amount - false - } - } - } - } - } -} - -impl Drop for CompactionMemoryGuard { - fn drop(&mut self) { - if let GuardState::Limited { permit, quota } = - mem::replace(&mut self.state, GuardState::Unlimited) - { - let bytes = permits_to_bytes(permit.num_permits() as u32); - - // Release permits before updating metrics to reflect latest usage. - drop(permit); - quota.update_in_use_metric(); - - debug!("Released compaction memory: {} bytes", bytes); - } - } -} - -impl fmt::Debug for CompactionMemoryGuard { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("CompactionMemoryGuard") - .field("granted_bytes", &self.granted_bytes()) - .finish() - } -} - -fn bytes_to_permits(bytes: u64) -> u32 { - // Round up to the nearest permit. - // Returns 0 for bytes=0, which allows lazy allocation via request_additional(). - // Non-zero bytes always round up to at least 1 permit due to the math. - bytes - .saturating_add(PERMIT_GRANULARITY_BYTES - 1) - .saturating_div(PERMIT_GRANULARITY_BYTES) - .min(Semaphore::MAX_PERMITS as u64) - .min(u32::MAX as u64) as u32 -} - -fn permits_to_bytes(permits: u32) -> u64 { - (permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES) -} - -#[cfg(test)] -mod tests { - use tokio::time::{Duration, sleep}; - - use super::*; - - #[test] - fn test_try_acquire_unlimited() { - let manager = CompactionMemoryManager::new(0); - let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap(); - assert_eq!(manager.limit_bytes(), 0); - assert_eq!(guard.granted_bytes(), 0); - } - - #[test] - fn test_try_acquire_limited_success_and_release() { - let bytes = 2 * PERMIT_GRANULARITY_BYTES; - let manager = CompactionMemoryManager::new(bytes); - { - let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap(); - assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES); - drop(guard); - } - assert_eq!(manager.used_bytes(), 0); - } - - #[test] - fn test_try_acquire_exceeds_limit() { - let limit = PERMIT_GRANULARITY_BYTES; - let manager = CompactionMemoryManager::new(limit); - let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES); - assert!(result.is_none()); - } - - #[tokio::test(flavor = "current_thread")] - async fn test_acquire_blocks_and_unblocks() { - let bytes = 2 * PERMIT_GRANULARITY_BYTES; - let manager = CompactionMemoryManager::new(bytes); - let guard = manager.try_acquire(bytes).unwrap(); - - // Spawn a task that will block on acquire() - let waiter = { - let manager = manager.clone(); - tokio::spawn(async move { - // This will block until memory is available - let _guard = manager.acquire(bytes).await.unwrap(); - }) - }; - - sleep(Duration::from_millis(10)).await; - // Release memory - this should unblock the waiter - drop(guard); - - // Waiter should complete now - waiter.await.unwrap(); - } - - #[test] - fn test_request_additional_success() { - let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit - let manager = CompactionMemoryManager::new(limit); - - // Acquire base quota (5MB) - let base = 5 * PERMIT_GRANULARITY_BYTES; - let mut guard = manager.try_acquire(base).unwrap(); - assert_eq!(guard.granted_bytes(), base); - assert_eq!(manager.used_bytes(), base); - - // Request additional memory (3MB) - should succeed and merge - assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); - } - - #[test] - fn test_request_additional_exceeds_limit() { - let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit - let manager = CompactionMemoryManager::new(limit); - - // Acquire base quota (5MB) - let base = 5 * PERMIT_GRANULARITY_BYTES; - let mut guard = manager.try_acquire(base).unwrap(); - - // Request additional memory (3MB) - should succeed - assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); - - // Request more (3MB) - should fail (would exceed 10MB limit) - let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES); - assert!(!result); - - // Still at 8MB - assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); - assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES); - } - - #[test] - fn test_request_additional_auto_release_on_guard_drop() { - let limit = 10 * PERMIT_GRANULARITY_BYTES; - let manager = CompactionMemoryManager::new(limit); - - { - let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); - - // Request additional - memory is merged into guard - assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); - - // When guard drops, all memory (base + additional) is released together - } - - // After scope, all memory should be released - assert_eq!(manager.used_bytes(), 0); - } - - #[test] - fn test_request_additional_unlimited() { - let manager = CompactionMemoryManager::new(0); // Unlimited - let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); - - // Should always succeed with unlimited manager - assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 0); - assert_eq!(manager.used_bytes(), 0); - } - - #[test] - fn test_request_additional_zero_bytes() { - let limit = 10 * PERMIT_GRANULARITY_BYTES; - let manager = CompactionMemoryManager::new(limit); - - let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); - - // Request 0 bytes should succeed without affecting anything - assert!(guard.request_additional(0)); - assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); - } - - #[test] - fn test_early_release_partial_success() { - let limit = 10 * PERMIT_GRANULARITY_BYTES; - let manager = CompactionMemoryManager::new(limit); - - let mut guard = manager.try_acquire(8 * PERMIT_GRANULARITY_BYTES).unwrap(); - assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); - - // Release half - assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 4 * PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), 4 * PERMIT_GRANULARITY_BYTES); - - // Released memory should be available to others - let _guard2 = manager.try_acquire(4 * PERMIT_GRANULARITY_BYTES).unwrap(); - assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES); - } - - #[test] - fn test_early_release_partial_exceeds_granted() { - let manager = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES); - let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); - - // Try to release more than granted - should fail - assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); - } - - #[test] - fn test_early_release_partial_unlimited() { - let manager = CompactionMemoryManager::new(0); - let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap(); - - // Unlimited guard - release should succeed (no-op) - assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 0); - } - - #[test] - fn test_request_and_early_release_symmetry() { - let limit = 20 * PERMIT_GRANULARITY_BYTES; - let manager = CompactionMemoryManager::new(limit); - - let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap(); - - // Request additional - assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES); - - // Early release some - assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 7 * PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), 7 * PERMIT_GRANULARITY_BYTES); - - // Request again - assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 9 * PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), 9 * PERMIT_GRANULARITY_BYTES); - - // Early release again - assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES); - assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES); - - drop(guard); - assert_eq!(manager.used_bytes(), 0); - } - - #[test] - fn test_small_allocation_rounds_up() { - // Test that allocations smaller than PERMIT_GRANULARITY_BYTES - // round up to 1 permit and can use request_additional() - let limit = 10 * PERMIT_GRANULARITY_BYTES; - let manager = CompactionMemoryManager::new(limit); - - let mut guard = manager.try_acquire(512 * 1024).unwrap(); // 512KB - assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); // Rounds up to 1MB - assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); // Can request more - assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES); - } - - #[test] - fn test_acquire_zero_bytes_lazy_allocation() { - // Test that acquire(0) returns 0 permits but can request_additional() later - let manager = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES); - - let mut guard = manager.try_acquire(0).unwrap(); - assert_eq!(guard.granted_bytes(), 0); // No permits consumed - assert_eq!(manager.used_bytes(), 0); - - assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); // Lazy allocation - assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES); - } +/// Compaction memory manager. +pub type CompactionMemoryManager = MemoryManager; + +/// Compaction memory guard. +pub type CompactionMemoryGuard = MemoryGuard; + +/// Helper to construct a compaction memory manager without passing metrics explicitly. +pub fn new_compaction_memory_manager(limit_bytes: u64) -> CompactionMemoryManager { + CompactionMemoryManager::new(limit_bytes, CompactionMemoryMetrics) } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 831d01fa02..87a3ad7349 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -16,17 +16,16 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::Instant; +use common_memory_manager::OnExhaustedPolicy; use common_telemetry::{error, info, warn}; use itertools::Itertools; use snafu::ResultExt; use tokio::sync::mpsc; use crate::compaction::compactor::{CompactionRegion, Compactor}; -use crate::compaction::memory_manager::{ - CompactionMemoryGuard, CompactionMemoryManager, OnExhaustedPolicy, -}; +use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager}; use crate::compaction::picker::{CompactionTask, PickerOutput}; -use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu}; +use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu, MemoryAcquireFailedSnafu}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED}; use crate::region::RegionRoleState; @@ -130,7 +129,10 @@ impl CompactionTaskImpl { } Ok(Err(e)) => { timer.observe_duration(); - Err(e) + Err(e).with_context(|_| MemoryAcquireFailedSnafu { + region_id, + policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()), + }) } Err(_) => { timer.observe_duration(); @@ -148,21 +150,20 @@ impl CompactionTaskImpl { } } } - OnExhaustedPolicy::Skip => { - // Try to acquire, skip if not immediately available + OnExhaustedPolicy::Fail => { + // Try to acquire, fail immediately if not available self.memory_manager .try_acquire(requested_bytes) .ok_or_else(|| { warn!( - "Skipping compaction for region {} due to memory limit \ - (need {} bytes, limit {} bytes)", + "Compaction memory exhausted for region {} (policy=fail, need {} bytes, limit {} bytes)", region_id, requested_bytes, limit_bytes ); CompactionMemoryExhaustedSnafu { region_id, required_bytes: requested_bytes, limit_bytes, - policy: "skip".to_string(), + policy: "fail".to_string(), } .build() }) diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 11ce9de9ea..94593ea497 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -20,13 +20,13 @@ use std::time::Duration; use common_base::memory_limit::MemoryLimit; use common_base::readable_size::ReadableSize; +use common_memory_manager::OnExhaustedPolicy; use common_stat::{get_total_cpu_cores, get_total_memory_readable}; use common_telemetry::warn; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT; -use crate::compaction::memory_manager::OnExhaustedPolicy; use crate::error::Result; use crate::gc::GcConfig; use crate::memtable::MemtableConfig; diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 90a735381b..b23e2340ec 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -19,6 +19,7 @@ use common_datasource::compression::CompressionType; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use common_memory_manager; use common_runtime::JoinError; use common_time::Timestamp; use common_time::timestamp::TimeUnit; @@ -1042,11 +1043,7 @@ pub enum Error { ManualCompactionOverride {}, #[snafu(display( - "Compaction memory limit exceeded for region {}: required {} bytes, limit {} bytes (policy: {})", - region_id, - required_bytes, - limit_bytes, - policy + "Compaction memory limit exceeded for region {region_id}: required {required_bytes} bytes, limit {limit_bytes} bytes (policy: {policy})", ))] CompactionMemoryExhausted { region_id: RegionId, @@ -1057,20 +1054,12 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Requested compaction memory ({} bytes) exceeds total limit ({} bytes)", - requested_bytes, - limit_bytes - ))] - CompactionMemoryLimitExceeded { - requested_bytes: u64, - limit_bytes: u64, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Compaction memory semaphore unexpectedly closed"))] - CompactionMemorySemaphoreClosed { + #[snafu(display("Failed to acquire memory for region {region_id} (policy: {policy})"))] + MemoryAcquireFailed { + region_id: RegionId, + policy: String, + #[snafu(source)] + source: common_memory_manager::Error, #[snafu(implicit)] location: Location, }, @@ -1359,9 +1348,7 @@ impl ErrorExt for Error { CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted, - CompactionMemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted, - - CompactionMemorySemaphoreClosed { .. } => StatusCode::Unexpected, + MemoryAcquireFailed { source, .. } => source.status_code(), IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments, diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index bdf8ca05e6..9f91a51747 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, Mutex}; use common_base::Plugins; use common_datasource::compression::CompressionType; +use common_memory_manager::OnExhaustedPolicy; use common_test_util::temp_dir::{TempDir, create_temp_dir}; use object_store::ObjectStore; use object_store::services::Fs; @@ -28,7 +29,7 @@ use tokio::sync::mpsc::Sender; use crate::access_layer::{AccessLayer, AccessLayerRef}; use crate::cache::CacheManager; use crate::compaction::CompactionScheduler; -use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy}; +use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager}; use crate::config::MitoConfig; use crate::error::Result; use crate::flush::FlushScheduler; @@ -101,7 +102,7 @@ impl SchedulerEnv { Arc::new(MitoConfig::default()), WorkerListener::default(), Plugins::new(), - Arc::new(CompactionMemoryManager::new(0)), + Arc::new(new_compaction_memory_manager(0)), OnExhaustedPolicy::default(), ) } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 69f41e905a..b398f92f42 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -59,7 +59,7 @@ use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch}; use crate::cache::write_cache::{WriteCache, WriteCacheRef}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::CompactionScheduler; -use crate::compaction::memory_manager::CompactionMemoryManager; +use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager}; use crate::config::MitoConfig; use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; @@ -217,7 +217,7 @@ impl WorkerGroup { .experimental_compaction_memory_limit .resolve(total_memory); let compaction_memory_manager = - Arc::new(CompactionMemoryManager::new(compaction_limit_bytes)); + Arc::new(new_compaction_memory_manager(compaction_limit_bytes)); let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job)); let workers = (0..config.num_workers) @@ -405,7 +405,7 @@ impl WorkerGroup { .experimental_compaction_memory_limit .resolve(total_memory); let compaction_memory_manager = - Arc::new(CompactionMemoryManager::new(compaction_limit_bytes)); + Arc::new(new_compaction_memory_manager(compaction_limit_bytes)); let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job)); let workers = (0..config.num_workers) .map(|id| {