diff --git a/Cargo.lock b/Cargo.lock
index 6781ac3895..bdf431f7ad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2328,6 +2328,19 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "common-memory-manager"
+version = "1.0.0-beta.2"
+dependencies = [
+ "common-error",
+ "common-macro",
+ "common-telemetry",
+ "humantime",
+ "serde",
+ "snafu 0.8.6",
+ "tokio",
+]
+
[[package]]
name = "common-meta"
version = "1.0.0-beta.2"
@@ -7651,6 +7664,7 @@ dependencies = [
"common-function",
"common-grpc",
"common-macro",
+ "common-memory-manager",
"common-meta",
"common-query",
"common-recordbatch",
diff --git a/Cargo.toml b/Cargo.toml
index c327702ca8..a8a09ae993 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ members = [
"src/common/grpc-expr",
"src/common/macro",
"src/common/mem-prof",
+ "src/common/memory-manager",
"src/common/meta",
"src/common/options",
"src/common/plugins",
@@ -266,6 +267,7 @@ common-grpc = { path = "src/common/grpc" }
common-grpc-expr = { path = "src/common/grpc-expr" }
common-macro = { path = "src/common/macro" }
common-mem-prof = { path = "src/common/mem-prof" }
+common-memory-manager = { path = "src/common/memory-manager" }
common-meta = { path = "src/common/meta" }
common-options = { path = "src/common/options" }
common-plugins = { path = "src/common/plugins" }
diff --git a/config/config.md b/config/config.md
index dc663cead4..56eda451a4 100644
--- a/config/config.md
+++ b/config/config.md
@@ -142,7 +142,7 @@
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
-| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "skip" |
+| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "fail" |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
@@ -524,7 +524,7 @@
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
-| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "skip" |
+| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.
Options: "wait" (default, 10s), "wait()", "fail" |
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 0159fc237b..d159d52ae1 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -457,7 +457,7 @@ compress_manifest = false
#+ experimental_compaction_memory_limit = "0"
## Behavior when compaction cannot acquire memory from the budget.
-## Options: "wait" (default, 10s), "wait()", "skip"
+## Options: "wait" (default, 10s), "wait()", "fail"
## @toml2docs:none-default="wait"
#+ experimental_compaction_on_exhausted = "wait"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 653b0cb4af..ff0b8c3d5e 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -551,7 +551,7 @@ compress_manifest = false
#+ experimental_compaction_memory_limit = "0"
## Behavior when compaction cannot acquire memory from the budget.
-## Options: "wait" (default, 10s), "wait()", "skip"
+## Options: "wait" (default, 10s), "wait()", "fail"
## @toml2docs:none-default="wait"
#+ experimental_compaction_on_exhausted = "wait"
diff --git a/src/common/memory-manager/Cargo.toml b/src/common/memory-manager/Cargo.toml
new file mode 100644
index 0000000000..a6be50f774
--- /dev/null
+++ b/src/common/memory-manager/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "common-memory-manager"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[lints]
+workspace = true
+
+[dependencies]
+common-error = { workspace = true }
+common-macro = { workspace = true }
+common-telemetry = { workspace = true }
+humantime = { workspace = true }
+serde = { workspace = true }
+snafu = { workspace = true }
+tokio = { workspace = true, features = ["sync"] }
+
+[dev-dependencies]
+tokio = { workspace = true, features = ["rt", "macros"] }
diff --git a/src/common/memory-manager/src/error.rs b/src/common/memory-manager/src/error.rs
new file mode 100644
index 0000000000..5ff7d74ad6
--- /dev/null
+++ b/src/common/memory-manager/src/error.rs
@@ -0,0 +1,53 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+
+use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
+use common_macro::stack_trace_debug;
+use snafu::Snafu;
+
+pub type Result = std::result::Result;
+
+#[derive(Snafu)]
+#[snafu(visibility(pub))]
+#[stack_trace_debug]
+pub enum Error {
+ #[snafu(display(
+ "Memory limit exceeded: requested {requested_bytes} bytes, limit {limit_bytes} bytes"
+ ))]
+ MemoryLimitExceeded {
+ requested_bytes: u64,
+ limit_bytes: u64,
+ },
+
+ #[snafu(display("Memory semaphore unexpectedly closed"))]
+ MemorySemaphoreClosed,
+}
+
+impl ErrorExt for Error {
+ fn status_code(&self) -> StatusCode {
+ use Error::*;
+
+ match self {
+ MemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
+ MemorySemaphoreClosed => StatusCode::Unexpected,
+ }
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+}
diff --git a/src/common/memory-manager/src/guard.rs b/src/common/memory-manager/src/guard.rs
new file mode 100644
index 0000000000..e72e16ab5b
--- /dev/null
+++ b/src/common/memory-manager/src/guard.rs
@@ -0,0 +1,138 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::{fmt, mem};
+
+use common_telemetry::debug;
+use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
+
+use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes};
+
+/// Guard representing a slice of reserved memory.
+pub struct MemoryGuard {
+ pub(crate) state: GuardState,
+}
+
+pub(crate) enum GuardState {
+ Unlimited,
+ Limited {
+ permit: OwnedSemaphorePermit,
+ quota: MemoryQuota,
+ },
+}
+
+impl MemoryGuard {
+ pub(crate) fn unlimited() -> Self {
+ Self {
+ state: GuardState::Unlimited,
+ }
+ }
+
+ pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota) -> Self {
+ Self {
+ state: GuardState::Limited { permit, quota },
+ }
+ }
+
+ /// Returns granted quota in bytes.
+ pub fn granted_bytes(&self) -> u64 {
+ match &self.state {
+ GuardState::Unlimited => 0,
+ GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
+ }
+ }
+
+ /// Tries to allocate additional memory during task execution.
+ ///
+ /// On success, merges the new memory into this guard and returns true.
+ /// On failure, returns false and leaves this guard unchanged.
+ pub fn request_additional(&mut self, bytes: u64) -> bool {
+ match &mut self.state {
+ GuardState::Unlimited => true,
+ GuardState::Limited { permit, quota } => {
+ if bytes == 0 {
+ return true;
+ }
+
+ let additional_permits = bytes_to_permits(bytes);
+
+ match quota
+ .semaphore
+ .clone()
+ .try_acquire_many_owned(additional_permits)
+ {
+ Ok(additional_permit) => {
+ permit.merge(additional_permit);
+ quota.update_in_use_metric();
+ debug!("Allocated additional {} bytes", bytes);
+ true
+ }
+ Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
+ quota.metrics.inc_rejected("request_additional");
+ false
+ }
+ }
+ }
+ }
+ }
+
+ /// Releases a portion of granted memory back to the pool early,
+ /// before the guard is dropped.
+ ///
+ /// Returns true if the release succeeds or is a no-op; false if the request exceeds granted.
+ pub fn early_release_partial(&mut self, bytes: u64) -> bool {
+ match &mut self.state {
+ GuardState::Unlimited => true,
+ GuardState::Limited { permit, quota } => {
+ if bytes == 0 {
+ return true;
+ }
+
+ let release_permits = bytes_to_permits(bytes);
+
+ match permit.split(release_permits as usize) {
+ Some(released_permit) => {
+ let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
+ drop(released_permit);
+ quota.update_in_use_metric();
+ debug!("Early released {} bytes from memory guard", released_bytes);
+ true
+ }
+ None => false,
+ }
+ }
+ }
+ }
+}
+
+impl Drop for MemoryGuard {
+ fn drop(&mut self) {
+ if let GuardState::Limited { permit, quota } =
+ mem::replace(&mut self.state, GuardState::Unlimited)
+ {
+ let bytes = permits_to_bytes(permit.num_permits() as u32);
+ drop(permit);
+ quota.update_in_use_metric();
+ debug!("Released memory: {} bytes", bytes);
+ }
+ }
+}
+
+impl fmt::Debug for MemoryGuard {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MemoryGuard")
+ .field("granted_bytes", &self.granted_bytes())
+ .finish()
+ }
+}
diff --git a/src/common/memory-manager/src/lib.rs b/src/common/memory-manager/src/lib.rs
new file mode 100644
index 0000000000..61d52f6366
--- /dev/null
+++ b/src/common/memory-manager/src/lib.rs
@@ -0,0 +1,47 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Generic memory management for resource-constrained operations.
+//!
+//! This crate provides a reusable memory quota system based on semaphores,
+//! allowing different subsystems (compaction, flush, index build, etc.) to
+//! share the same allocation logic while using their own metrics.
+
+mod error;
+mod guard;
+mod manager;
+mod policy;
+
+#[cfg(test)]
+mod tests;
+
+pub use error::{Error, Result};
+pub use guard::MemoryGuard;
+pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES};
+pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy};
+
+/// No-op metrics implementation for testing.
+#[derive(Clone, Copy, Debug, Default)]
+pub struct NoOpMetrics;
+
+impl MemoryMetrics for NoOpMetrics {
+ #[inline(always)]
+ fn set_limit(&self, _: i64) {}
+
+ #[inline(always)]
+ fn set_in_use(&self, _: i64) {}
+
+ #[inline(always)]
+ fn inc_rejected(&self, _: &str) {}
+}
diff --git a/src/common/memory-manager/src/manager.rs b/src/common/memory-manager/src/manager.rs
new file mode 100644
index 0000000000..8cc7b937e4
--- /dev/null
+++ b/src/common/memory-manager/src/manager.rs
@@ -0,0 +1,173 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use snafu::ensure;
+use tokio::sync::{Semaphore, TryAcquireError};
+
+use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result};
+use crate::guard::MemoryGuard;
+
+/// Minimum bytes controlled by one semaphore permit.
+pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
+
+/// Trait for recording memory usage metrics.
+pub trait MemoryMetrics: Clone + Send + Sync + 'static {
+ fn set_limit(&self, bytes: i64);
+ fn set_in_use(&self, bytes: i64);
+ fn inc_rejected(&self, reason: &str);
+}
+
+/// Generic memory manager for quota-controlled operations.
+#[derive(Clone)]
+pub struct MemoryManager {
+ quota: Option>,
+}
+
+#[derive(Clone)]
+pub(crate) struct MemoryQuota {
+ pub(crate) semaphore: Arc,
+ pub(crate) limit_permits: u32,
+ pub(crate) metrics: M,
+}
+
+impl MemoryManager {
+ /// Creates a new memory manager with the given limit in bytes.
+ /// `limit_bytes = 0` disables the limit.
+ pub fn new(limit_bytes: u64, metrics: M) -> Self {
+ if limit_bytes == 0 {
+ metrics.set_limit(0);
+ return Self { quota: None };
+ }
+
+ let limit_permits = bytes_to_permits(limit_bytes);
+ let limit_aligned_bytes = permits_to_bytes(limit_permits);
+ metrics.set_limit(limit_aligned_bytes as i64);
+
+ Self {
+ quota: Some(MemoryQuota {
+ semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
+ limit_permits,
+ metrics,
+ }),
+ }
+ }
+
+ /// Returns the configured limit in bytes (0 if unlimited).
+ pub fn limit_bytes(&self) -> u64 {
+ self.quota
+ .as_ref()
+ .map(|quota| permits_to_bytes(quota.limit_permits))
+ .unwrap_or(0)
+ }
+
+ /// Returns currently used bytes.
+ pub fn used_bytes(&self) -> u64 {
+ self.quota
+ .as_ref()
+ .map(|quota| permits_to_bytes(quota.used_permits()))
+ .unwrap_or(0)
+ }
+
+ /// Returns available bytes.
+ pub fn available_bytes(&self) -> u64 {
+ self.quota
+ .as_ref()
+ .map(|quota| permits_to_bytes(quota.available_permits_clamped()))
+ .unwrap_or(0)
+ }
+
+ /// Acquires memory, waiting if necessary until enough is available.
+ ///
+ /// # Errors
+ /// - Returns error if requested bytes exceed the total limit
+ /// - Returns error if the semaphore is unexpectedly closed
+ pub async fn acquire(&self, bytes: u64) -> Result> {
+ match &self.quota {
+ None => Ok(MemoryGuard::unlimited()),
+ Some(quota) => {
+ let permits = bytes_to_permits(bytes);
+
+ ensure!(
+ permits <= quota.limit_permits,
+ MemoryLimitExceededSnafu {
+ requested_bytes: bytes,
+ limit_bytes: permits_to_bytes(quota.limit_permits),
+ }
+ );
+
+ let permit = quota
+ .semaphore
+ .clone()
+ .acquire_many_owned(permits)
+ .await
+ .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
+ quota.update_in_use_metric();
+ Ok(MemoryGuard::limited(permit, quota.clone()))
+ }
+ }
+ }
+
+ /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
+ pub fn try_acquire(&self, bytes: u64) -> Option> {
+ match &self.quota {
+ None => Some(MemoryGuard::unlimited()),
+ Some(quota) => {
+ let permits = bytes_to_permits(bytes);
+
+ match quota.semaphore.clone().try_acquire_many_owned(permits) {
+ Ok(permit) => {
+ quota.update_in_use_metric();
+ Some(MemoryGuard::limited(permit, quota.clone()))
+ }
+ Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
+ quota.metrics.inc_rejected("try_acquire");
+ None
+ }
+ }
+ }
+ }
+ }
+}
+
+impl MemoryQuota {
+ pub(crate) fn used_permits(&self) -> u32 {
+ self.limit_permits
+ .saturating_sub(self.available_permits_clamped())
+ }
+
+ pub(crate) fn available_permits_clamped(&self) -> u32 {
+ self.semaphore
+ .available_permits()
+ .min(self.limit_permits as usize) as u32
+ }
+
+ pub(crate) fn update_in_use_metric(&self) {
+ let bytes = permits_to_bytes(self.used_permits());
+ self.metrics.set_in_use(bytes as i64);
+ }
+}
+
+pub(crate) fn bytes_to_permits(bytes: u64) -> u32 {
+ bytes
+ .saturating_add(PERMIT_GRANULARITY_BYTES - 1)
+ .saturating_div(PERMIT_GRANULARITY_BYTES)
+ .min(Semaphore::MAX_PERMITS as u64)
+ .min(u32::MAX as u64) as u32
+}
+
+pub(crate) fn permits_to_bytes(permits: u32) -> u64 {
+ (permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
+}
diff --git a/src/common/memory-manager/src/policy.rs b/src/common/memory-manager/src/policy.rs
new file mode 100644
index 0000000000..3f19568b8f
--- /dev/null
+++ b/src/common/memory-manager/src/policy.rs
@@ -0,0 +1,83 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use humantime::{format_duration, parse_duration};
+use serde::{Deserialize, Serialize};
+
+/// Default wait timeout for memory acquisition.
+pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Defines how to react when memory cannot be acquired immediately.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum OnExhaustedPolicy {
+ /// Wait until enough memory is released, bounded by timeout.
+ Wait { timeout: Duration },
+
+ /// Fail immediately if memory is not available.
+ Fail,
+}
+
+impl Default for OnExhaustedPolicy {
+ fn default() -> Self {
+ OnExhaustedPolicy::Wait {
+ timeout: DEFAULT_MEMORY_WAIT_TIMEOUT,
+ }
+ }
+}
+
+impl Serialize for OnExhaustedPolicy {
+ fn serialize(&self, serializer: S) -> Result
+ where
+ S: serde::Serializer,
+ {
+ let text = match self {
+ OnExhaustedPolicy::Fail => "fail".to_string(),
+ OnExhaustedPolicy::Wait { timeout } if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT => {
+ "wait".to_string()
+ }
+ OnExhaustedPolicy::Wait { timeout } => format!("wait({})", format_duration(*timeout)),
+ };
+ serializer.serialize_str(&text)
+ }
+}
+
+impl<'de> Deserialize<'de> for OnExhaustedPolicy {
+ fn deserialize(deserializer: D) -> Result
+ where
+ D: serde::Deserializer<'de>,
+ {
+ let raw = String::deserialize(deserializer)?;
+ let lower = raw.to_ascii_lowercase();
+
+ // Accept both "skip" (legacy) and "fail".
+ if lower == "skip" || lower == "fail" {
+ return Ok(OnExhaustedPolicy::Fail);
+ }
+ if lower == "wait" {
+ return Ok(OnExhaustedPolicy::default());
+ }
+ if lower.starts_with("wait(") && lower.ends_with(')') {
+ let inner = &raw[5..raw.len() - 1];
+ let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?;
+ return Ok(OnExhaustedPolicy::Wait { timeout });
+ }
+
+ Err(serde::de::Error::custom(format!(
+ "invalid memory policy: {}, expected wait, wait(), fail",
+ raw
+ )))
+ }
+}
diff --git a/src/common/memory-manager/src/tests.rs b/src/common/memory-manager/src/tests.rs
new file mode 100644
index 0000000000..3a928f9c7c
--- /dev/null
+++ b/src/common/memory-manager/src/tests.rs
@@ -0,0 +1,247 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use tokio::time::{Duration, sleep};
+
+use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES};
+
+#[test]
+fn test_try_acquire_unlimited() {
+ let manager = MemoryManager::new(0, NoOpMetrics);
+ let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap();
+ assert_eq!(manager.limit_bytes(), 0);
+ assert_eq!(guard.granted_bytes(), 0);
+}
+
+#[test]
+fn test_try_acquire_limited_success_and_release() {
+ let bytes = 2 * PERMIT_GRANULARITY_BYTES;
+ let manager = MemoryManager::new(bytes, NoOpMetrics);
+ {
+ let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap();
+ assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES);
+ drop(guard);
+ }
+ assert_eq!(manager.used_bytes(), 0);
+}
+
+#[test]
+fn test_try_acquire_exceeds_limit() {
+ let limit = PERMIT_GRANULARITY_BYTES;
+ let manager = MemoryManager::new(limit, NoOpMetrics);
+ let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES);
+ assert!(result.is_none());
+}
+
+#[tokio::test(flavor = "current_thread")]
+async fn test_acquire_blocks_and_unblocks() {
+ let bytes = 2 * PERMIT_GRANULARITY_BYTES;
+ let manager = MemoryManager::new(bytes, NoOpMetrics);
+ let guard = manager.try_acquire(bytes).unwrap();
+
+ // Spawn a task that will block on acquire()
+ let waiter = {
+ let manager = manager.clone();
+ tokio::spawn(async move {
+ // This will block until memory is available
+ let _guard = manager.acquire(bytes).await.unwrap();
+ })
+ };
+
+ sleep(Duration::from_millis(10)).await;
+ // Release memory - this should unblock the waiter
+ drop(guard);
+
+ // Waiter should complete now
+ waiter.await.unwrap();
+}
+
+#[test]
+fn test_request_additional_success() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
+ let manager = MemoryManager::new(limit, NoOpMetrics);
+
+ // Acquire base quota (5MB)
+ let base = 5 * PERMIT_GRANULARITY_BYTES;
+ let mut guard = manager.try_acquire(base).unwrap();
+ assert_eq!(guard.granted_bytes(), base);
+ assert_eq!(manager.used_bytes(), base);
+
+ // Request additional memory (3MB) - should succeed and merge
+ assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+}
+
+#[test]
+fn test_request_additional_exceeds_limit() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
+ let manager = MemoryManager::new(limit, NoOpMetrics);
+
+ // Acquire base quota (5MB)
+ let base = 5 * PERMIT_GRANULARITY_BYTES;
+ let mut guard = manager.try_acquire(base).unwrap();
+
+ // Request additional memory (3MB) - should succeed
+ assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+
+ // Request more (3MB) - should fail (would exceed 10MB limit)
+ let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES);
+ assert!(!result);
+
+ // Still at 8MB
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+}
+
+#[test]
+fn test_request_additional_auto_release_on_guard_drop() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES;
+ let manager = MemoryManager::new(limit, NoOpMetrics);
+
+ {
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Request additional - memory is merged into guard
+ assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+
+ // When guard drops, all memory (base + additional) is released together
+ }
+
+ // After scope, all memory should be released
+ assert_eq!(manager.used_bytes(), 0);
+}
+
+#[test]
+fn test_request_additional_unlimited() {
+ let manager = MemoryManager::new(0, NoOpMetrics); // Unlimited
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Should always succeed with unlimited manager
+ assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 0);
+ assert_eq!(manager.used_bytes(), 0);
+}
+
+#[test]
+fn test_request_additional_zero_bytes() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES;
+ let manager = MemoryManager::new(limit, NoOpMetrics);
+
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Request 0 bytes should succeed without affecting anything
+ assert!(guard.request_additional(0));
+ assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+}
+
+#[test]
+fn test_early_release_partial_success() {
+ let limit = 10 * PERMIT_GRANULARITY_BYTES;
+ let manager = MemoryManager::new(limit, NoOpMetrics);
+
+ let mut guard = manager.try_acquire(8 * PERMIT_GRANULARITY_BYTES).unwrap();
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+
+ // Release half
+ assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
+
+ // Released memory should be available to others
+ let _guard2 = manager.try_acquire(4 * PERMIT_GRANULARITY_BYTES).unwrap();
+ assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
+}
+
+#[test]
+fn test_early_release_partial_exceeds_granted() {
+ let manager = MemoryManager::new(10 * PERMIT_GRANULARITY_BYTES, NoOpMetrics);
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Try to release more than granted - should fail
+ assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+}
+
+#[test]
+fn test_early_release_partial_unlimited() {
+ let manager = MemoryManager::new(0, NoOpMetrics);
+ let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Unlimited guard - release should succeed (no-op)
+ assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 0);
+}
+
+#[test]
+fn test_request_and_early_release_symmetry() {
+ let limit = 20 * PERMIT_GRANULARITY_BYTES;
+ let manager = MemoryManager::new(limit, NoOpMetrics);
+
+ let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
+
+ // Request additional
+ assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
+
+ // Early release some
+ assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
+
+ // Request again
+ assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
+
+ // Early release again
+ assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
+ assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+ assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
+
+ drop(guard);
+ assert_eq!(manager.used_bytes(), 0);
+}
+
+#[test]
+fn test_small_allocation_rounds_up() {
+ // Test that allocations smaller than PERMIT_GRANULARITY_BYTES
+ // round up to 1 permit and can use request_additional()
+ let limit = 10 * PERMIT_GRANULARITY_BYTES;
+ let manager = MemoryManager::new(limit, NoOpMetrics);
+
+ let mut guard = manager.try_acquire(512 * 1024).unwrap(); // 512KB
+ assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); // Rounds up to 1MB
+ assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); // Can request more
+ assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
+}
+
+#[test]
+fn test_acquire_zero_bytes_lazy_allocation() {
+ // Test that acquire(0) returns 0 permits but can request_additional() later
+ let manager = MemoryManager::new(10 * PERMIT_GRANULARITY_BYTES, NoOpMetrics);
+
+ let mut guard = manager.try_acquire(0).unwrap();
+ assert_eq!(guard.granted_bytes(), 0); // No permits consumed
+ assert_eq!(manager.used_bytes(), 0);
+
+ assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); // Lazy allocation
+ assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
+}
diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml
index 68d996723e..c453534317 100644
--- a/src/mito2/Cargo.toml
+++ b/src/mito2/Cargo.toml
@@ -30,6 +30,7 @@ common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
+common-memory-manager.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index bda22ac652..3bb3fe932f 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -30,6 +30,7 @@ use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
+use common_memory_manager::OnExhaustedPolicy;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
use common_time::range::TimestampRange;
@@ -47,7 +48,7 @@ use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
-use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy};
+use crate::compaction::memory_manager::CompactionMemoryManager;
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
@@ -809,6 +810,7 @@ mod tests {
use tokio::sync::{Barrier, oneshot};
use super::*;
+ use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::ManifestContext;
use crate::sst::FormatType;
@@ -1181,7 +1183,7 @@ mod tests {
#[tokio::test]
async fn test_concurrent_memory_competition() {
- let manager = Arc::new(CompactionMemoryManager::new(3 * 1024 * 1024)); // 3MB
+ let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB
let barrier = Arc::new(Barrier::new(3));
let mut handles = vec![];
@@ -1196,7 +1198,7 @@ mod tests {
handles.push(handle);
}
- let results: Vec<_> = futures::future::join_all(handles)
+ let results: Vec