From 95eccd6cde6ae5efdd955991c935e889cbe0d8f9 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 17 Dec 2025 19:08:51 +0800 Subject: [PATCH] feat: introduce granularity for memory manager (#7416) * feat: introduce granularity for memory manager Signed-off-by: jeremyhi * chore: add unit test Signed-off-by: jeremyhi * chore: remove granularity getter for manager Signed-off-by: jeremyhi * Update src/common/memory-manager/src/manager.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * feat: acquire_with_policy for manager Signed-off-by: jeremyhi --------- Signed-off-by: jeremyhi Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- src/common/memory-manager/src/error.rs | 10 ++ src/common/memory-manager/src/granularity.rs | 168 +++++++++++++++++++ src/common/memory-manager/src/guard.rs | 15 +- src/common/memory-manager/src/lib.rs | 4 +- src/common/memory-manager/src/manager.rs | 93 +++++++--- src/common/memory-manager/src/tests.rs | 5 +- src/mito2/src/compaction/task.rs | 82 +-------- src/mito2/src/error.rs | 18 +- 8 files changed, 273 insertions(+), 122 deletions(-) create mode 100644 src/common/memory-manager/src/granularity.rs diff --git a/src/common/memory-manager/src/error.rs b/src/common/memory-manager/src/error.rs index 5ff7d74ad6..455b3c6a6d 100644 --- a/src/common/memory-manager/src/error.rs +++ b/src/common/memory-manager/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::time::Duration; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -35,6 +36,14 @@ pub enum Error { #[snafu(display("Memory semaphore unexpectedly closed"))] MemorySemaphoreClosed, + + #[snafu(display( + "Timeout waiting for memory quota: requested {requested_bytes} bytes, waited {waited:?}" + ))] + MemoryAcquireTimeout { + requested_bytes: u64, + waited: Duration, + }, } impl ErrorExt for Error { @@ -44,6 +53,7 @@ impl ErrorExt for Error { match self { MemoryLimitExceeded { .. 
} => StatusCode::RuntimeResourcesExhausted, MemorySemaphoreClosed => StatusCode::Unexpected, + MemoryAcquireTimeout { .. } => StatusCode::RuntimeResourcesExhausted, } } diff --git a/src/common/memory-manager/src/granularity.rs b/src/common/memory-manager/src/granularity.rs new file mode 100644 index 0000000000..2aa0d885ba --- /dev/null +++ b/src/common/memory-manager/src/granularity.rs @@ -0,0 +1,168 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; + +/// Memory permit granularity for different use cases. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum PermitGranularity { + /// 1 KB per permit + /// + /// Use for: + /// - HTTP/gRPC request limiting (small, high-concurrency operations) + /// - Small batch operations + /// - Scenarios requiring fine-grained fairness + Kilobyte, + + /// 1 MB per permit (default) + /// + /// Use for: + /// - Query execution memory management + /// - Compaction memory control + /// - Large, long-running operations + #[default] + Megabyte, +} + +impl PermitGranularity { + /// Returns the number of bytes per permit. + #[inline] + pub const fn bytes(self) -> u64 { + match self { + Self::Kilobyte => 1024, + Self::Megabyte => 1024 * 1024, + } + } + + /// Returns a human-readable string representation. 
+ pub const fn as_str(self) -> &'static str { + match self { + Self::Kilobyte => "1KB", + Self::Megabyte => "1MB", + } + } + + /// Converts bytes to permits based on this granularity. + /// + /// Rounds up to ensure the requested bytes are fully covered. + /// Clamped to Semaphore::MAX_PERMITS. + #[inline] + pub fn bytes_to_permits(self, bytes: u64) -> u32 { + use tokio::sync::Semaphore; + + let granularity_bytes = self.bytes(); + bytes + .saturating_add(granularity_bytes - 1) + .saturating_div(granularity_bytes) + .min(Semaphore::MAX_PERMITS as u64) + .min(u32::MAX as u64) as u32 + } + + /// Converts permits to bytes based on this granularity. + #[inline] + pub fn permits_to_bytes(self, permits: u32) -> u64 { + (permits as u64).saturating_mul(self.bytes()) + } +} + +impl fmt::Display for PermitGranularity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bytes_to_permits_kilobyte() { + let granularity = PermitGranularity::Kilobyte; + + // Exact multiples + assert_eq!(granularity.bytes_to_permits(1024), 1); + assert_eq!(granularity.bytes_to_permits(2048), 2); + assert_eq!(granularity.bytes_to_permits(10 * 1024), 10); + + // Rounds up + assert_eq!(granularity.bytes_to_permits(1), 1); + assert_eq!(granularity.bytes_to_permits(1025), 2); + assert_eq!(granularity.bytes_to_permits(2047), 2); + } + + #[test] + fn test_bytes_to_permits_megabyte() { + let granularity = PermitGranularity::Megabyte; + + // Exact multiples + assert_eq!(granularity.bytes_to_permits(1024 * 1024), 1); + assert_eq!(granularity.bytes_to_permits(2 * 1024 * 1024), 2); + + // Rounds up + assert_eq!(granularity.bytes_to_permits(1), 1); + assert_eq!(granularity.bytes_to_permits(1024), 1); + assert_eq!(granularity.bytes_to_permits(1024 * 1024 + 1), 2); + } + + #[test] + fn test_bytes_to_permits_zero_bytes() { + assert_eq!(PermitGranularity::Kilobyte.bytes_to_permits(0), 0); + 
assert_eq!(PermitGranularity::Megabyte.bytes_to_permits(0), 0); + } + + #[test] + fn test_bytes_to_permits_clamps_to_maximum() { + use tokio::sync::Semaphore; + + let max_permits = (Semaphore::MAX_PERMITS as u64).min(u32::MAX as u64) as u32; + + assert_eq!( + PermitGranularity::Kilobyte.bytes_to_permits(u64::MAX), + max_permits + ); + assert_eq!( + PermitGranularity::Megabyte.bytes_to_permits(u64::MAX), + max_permits + ); + } + + #[test] + fn test_permits_to_bytes() { + assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(1), 1024); + assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(10), 10 * 1024); + + assert_eq!(PermitGranularity::Megabyte.permits_to_bytes(1), 1024 * 1024); + assert_eq!( + PermitGranularity::Megabyte.permits_to_bytes(10), + 10 * 1024 * 1024 + ); + } + + #[test] + fn test_round_trip_conversion() { + // Kilobyte: bytes -> permits -> bytes (should round up) + let kb = PermitGranularity::Kilobyte; + let permits = kb.bytes_to_permits(1500); + let bytes = kb.permits_to_bytes(permits); + assert!(bytes >= 1500); // Must cover original request + assert_eq!(bytes, 2048); // 2KB + + // Megabyte: bytes -> permits -> bytes (should round up) + let mb = PermitGranularity::Megabyte; + let permits = mb.bytes_to_permits(1500); + let bytes = mb.permits_to_bytes(permits); + assert!(bytes >= 1500); + assert_eq!(bytes, 1024 * 1024); // 1MB + } +} diff --git a/src/common/memory-manager/src/guard.rs b/src/common/memory-manager/src/guard.rs index e72e16ab5b..6bad763b6a 100644 --- a/src/common/memory-manager/src/guard.rs +++ b/src/common/memory-manager/src/guard.rs @@ -17,7 +17,7 @@ use std::{fmt, mem}; use common_telemetry::debug; use tokio::sync::{OwnedSemaphorePermit, TryAcquireError}; -use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes}; +use crate::manager::{MemoryMetrics, MemoryQuota}; /// Guard representing a slice of reserved memory. 
pub struct MemoryGuard { @@ -49,7 +49,9 @@ impl MemoryGuard { pub fn granted_bytes(&self) -> u64 { match &self.state { GuardState::Unlimited => 0, - GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32), + GuardState::Limited { permit, quota } => { + quota.permits_to_bytes(permit.num_permits() as u32) + } } } @@ -65,7 +67,7 @@ impl MemoryGuard { return true; } - let additional_permits = bytes_to_permits(bytes); + let additional_permits = quota.bytes_to_permits(bytes); match quota .semaphore @@ -99,11 +101,12 @@ impl MemoryGuard { return true; } - let release_permits = bytes_to_permits(bytes); + let release_permits = quota.bytes_to_permits(bytes); match permit.split(release_permits as usize) { Some(released_permit) => { - let released_bytes = permits_to_bytes(released_permit.num_permits() as u32); + let released_bytes = + quota.permits_to_bytes(released_permit.num_permits() as u32); drop(released_permit); quota.update_in_use_metric(); debug!("Early released {} bytes from memory guard", released_bytes); @@ -121,7 +124,7 @@ impl Drop for MemoryGuard { if let GuardState::Limited { permit, quota } = mem::replace(&mut self.state, GuardState::Unlimited) { - let bytes = permits_to_bytes(permit.num_permits() as u32); + let bytes = quota.permits_to_bytes(permit.num_permits() as u32); drop(permit); quota.update_in_use_metric(); debug!("Released memory: {} bytes", bytes); diff --git a/src/common/memory-manager/src/lib.rs b/src/common/memory-manager/src/lib.rs index 61d52f6366..b1d858ef89 100644 --- a/src/common/memory-manager/src/lib.rs +++ b/src/common/memory-manager/src/lib.rs @@ -19,6 +19,7 @@ //! share the same allocation logic while using their own metrics. 
mod error; +mod granularity; mod guard; mod manager; mod policy; @@ -27,8 +28,9 @@ mod policy; mod tests; pub use error::{Error, Result}; +pub use granularity::PermitGranularity; pub use guard::MemoryGuard; -pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES}; +pub use manager::{MemoryManager, MemoryMetrics}; pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy}; /// No-op metrics implementation for testing. diff --git a/src/common/memory-manager/src/manager.rs b/src/common/memory-manager/src/manager.rs index 8cc7b937e4..b29e08445c 100644 --- a/src/common/memory-manager/src/manager.rs +++ b/src/common/memory-manager/src/manager.rs @@ -17,11 +17,12 @@ use std::sync::Arc; use snafu::ensure; use tokio::sync::{Semaphore, TryAcquireError}; -use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result}; +use crate::error::{ + MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result, +}; +use crate::granularity::PermitGranularity; use crate::guard::MemoryGuard; - -/// Minimum bytes controlled by one semaphore permit. -pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB +use crate::policy::OnExhaustedPolicy; /// Trait for recording memory usage metrics. pub trait MemoryMetrics: Clone + Send + Sync + 'static { @@ -40,6 +41,7 @@ pub struct MemoryManager { pub(crate) struct MemoryQuota { pub(crate) semaphore: Arc, pub(crate) limit_permits: u32, + pub(crate) granularity: PermitGranularity, pub(crate) metrics: M, } @@ -47,19 +49,25 @@ impl MemoryManager { /// Creates a new memory manager with the given limit in bytes. /// `limit_bytes = 0` disables the limit. pub fn new(limit_bytes: u64, metrics: M) -> Self { + Self::with_granularity(limit_bytes, PermitGranularity::default(), metrics) + } + + /// Creates a new memory manager with specified granularity. 
+ pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self { if limit_bytes == 0 { metrics.set_limit(0); return Self { quota: None }; } - let limit_permits = bytes_to_permits(limit_bytes); - let limit_aligned_bytes = permits_to_bytes(limit_permits); + let limit_permits = granularity.bytes_to_permits(limit_bytes); + let limit_aligned_bytes = granularity.permits_to_bytes(limit_permits); metrics.set_limit(limit_aligned_bytes as i64); Self { quota: Some(MemoryQuota { semaphore: Arc::new(Semaphore::new(limit_permits as usize)), limit_permits, + granularity, metrics, }), } @@ -69,7 +77,7 @@ impl MemoryManager { pub fn limit_bytes(&self) -> u64 { self.quota .as_ref() - .map(|quota| permits_to_bytes(quota.limit_permits)) + .map(|quota| quota.permits_to_bytes(quota.limit_permits)) .unwrap_or(0) } @@ -77,7 +85,7 @@ impl MemoryManager { pub fn used_bytes(&self) -> u64 { self.quota .as_ref() - .map(|quota| permits_to_bytes(quota.used_permits())) + .map(|quota| quota.permits_to_bytes(quota.used_permits())) .unwrap_or(0) } @@ -85,7 +93,7 @@ impl MemoryManager { pub fn available_bytes(&self) -> u64 { self.quota .as_ref() - .map(|quota| permits_to_bytes(quota.available_permits_clamped())) + .map(|quota| quota.permits_to_bytes(quota.available_permits_clamped())) .unwrap_or(0) } @@ -98,13 +106,13 @@ impl MemoryManager { match &self.quota { None => Ok(MemoryGuard::unlimited()), Some(quota) => { - let permits = bytes_to_permits(bytes); + let permits = quota.bytes_to_permits(bytes); ensure!( permits <= quota.limit_permits, MemoryLimitExceededSnafu { requested_bytes: bytes, - limit_bytes: permits_to_bytes(quota.limit_permits), + limit_bytes: self.limit_bytes() } ); @@ -125,7 +133,7 @@ impl MemoryManager { match &self.quota { None => Some(MemoryGuard::unlimited()), Some(quota) => { - let permits = bytes_to_permits(bytes); + let permits = quota.bytes_to_permits(bytes); match quota.semaphore.clone().try_acquire_many_owned(permits) { Ok(permit) => { @@ 
-140,9 +148,56 @@ impl MemoryManager { } } } + + /// Acquires memory based on the given policy. + /// + /// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available + /// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available + /// + /// # Errors + /// - `MemoryLimitExceeded`: Requested bytes exceed the total limit (both policies), or memory is currently exhausted (Fail policy only) + /// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only) + /// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue) + pub async fn acquire_with_policy( + &self, + bytes: u64, + policy: OnExhaustedPolicy, + ) -> Result> { + match policy { + OnExhaustedPolicy::Wait { timeout } => { + match tokio::time::timeout(timeout, self.acquire(bytes)).await { + Ok(Ok(guard)) => Ok(guard), + Ok(Err(e)) => Err(e), + Err(_elapsed) => { + // Timeout elapsed while waiting + MemoryAcquireTimeoutSnafu { + requested_bytes: bytes, + waited: timeout, + } + .fail() + } + } + } + OnExhaustedPolicy::Fail => self.try_acquire(bytes).ok_or_else(|| { + MemoryLimitExceededSnafu { + requested_bytes: bytes, + limit_bytes: self.limit_bytes(), + } + .build() + }), + } + } } impl MemoryQuota { + pub(crate) fn bytes_to_permits(&self, bytes: u64) -> u32 { + self.granularity.bytes_to_permits(bytes) + } + + pub(crate) fn permits_to_bytes(&self, permits: u32) -> u64 { + self.granularity.permits_to_bytes(permits) + } + pub(crate) fn used_permits(&self) -> u32 { self.limit_permits .saturating_sub(self.available_permits_clamped()) @@ -155,19 +210,7 @@ impl MemoryQuota { } pub(crate) fn update_in_use_metric(&self) { - let bytes = permits_to_bytes(self.used_permits()); + let bytes = self.permits_to_bytes(self.used_permits()); self.metrics.set_in_use(bytes as i64); } } - -pub(crate) fn bytes_to_permits(bytes: u64) -> u32 { - bytes - .saturating_add(PERMIT_GRANULARITY_BYTES - 1) - 
.saturating_div(PERMIT_GRANULARITY_BYTES) - .min(Semaphore::MAX_PERMITS as u64) - .min(u32::MAX as u64) as u32 -} - -pub(crate) fn permits_to_bytes(permits: u32) -> u64 { - (permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES) -} diff --git a/src/common/memory-manager/src/tests.rs b/src/common/memory-manager/src/tests.rs index 3a928f9c7c..c35889c00f 100644 --- a/src/common/memory-manager/src/tests.rs +++ b/src/common/memory-manager/src/tests.rs @@ -14,7 +14,10 @@ use tokio::time::{Duration, sleep}; -use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES}; +use crate::{MemoryManager, NoOpMetrics, PermitGranularity}; + +// Helper constant for tests - use default Megabyte granularity +const PERMIT_GRANULARITY_BYTES: u64 = PermitGranularity::Megabyte.bytes(); #[test] fn test_try_acquire_unlimited() { diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 87a3ad7349..77dc6f3c36 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc; use crate::compaction::compactor::{CompactionRegion, Compactor}; use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager}; use crate::compaction::picker::{CompactionTask, PickerOutput}; -use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu, MemoryAcquireFailedSnafu}; +use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED}; use crate::region::RegionRoleState; @@ -95,80 +95,16 @@ impl CompactionTaskImpl { async fn acquire_memory_with_policy(&self) -> error::Result { let region_id = self.compaction_region.region_id; let requested_bytes = self.estimated_memory_bytes; - let limit_bytes = self.memory_manager.limit_bytes(); + let policy = self.memory_policy; - if limit_bytes > 0 && 
requested_bytes > limit_bytes { - warn!( - "Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request", - region_id, requested_bytes, limit_bytes - ); - return Err(CompactionMemoryExhaustedSnafu { + let _timer = COMPACTION_MEMORY_WAIT.start_timer(); + self.memory_manager + .acquire_with_policy(requested_bytes, policy) + .await + .context(CompactionMemoryExhaustedSnafu { region_id, - required_bytes: requested_bytes, - limit_bytes, - policy: "exceed_limit".to_string(), - } - .build()); - } - - match self.memory_policy { - OnExhaustedPolicy::Wait { - timeout: wait_timeout, - } => { - let timer = COMPACTION_MEMORY_WAIT.start_timer(); - - match tokio::time::timeout( - wait_timeout, - self.memory_manager.acquire(requested_bytes), - ) - .await - { - Ok(Ok(guard)) => { - timer.observe_duration(); - Ok(guard) - } - Ok(Err(e)) => { - timer.observe_duration(); - Err(e).with_context(|_| MemoryAcquireFailedSnafu { - region_id, - policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()), - }) - } - Err(_) => { - timer.observe_duration(); - warn!( - "Compaction for region {} waited {:?} for {} bytes but timed out", - region_id, wait_timeout, requested_bytes - ); - CompactionMemoryExhaustedSnafu { - region_id, - required_bytes: requested_bytes, - limit_bytes, - policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()), - } - .fail() - } - } - } - OnExhaustedPolicy::Fail => { - // Try to acquire, fail immediately if not available - self.memory_manager - .try_acquire(requested_bytes) - .ok_or_else(|| { - warn!( - "Compaction memory exhausted for region {} (policy=fail, need {} bytes, limit {} bytes)", - region_id, requested_bytes, limit_bytes - ); - CompactionMemoryExhaustedSnafu { - region_id, - required_bytes: requested_bytes, - limit_bytes, - policy: "fail".to_string(), - } - .build() - }) - } - } + policy: format!("{policy:?}"), + }) } /// Remove expired ssts files, update manifest immediately diff --git a/src/mito2/src/error.rs 
b/src/mito2/src/error.rs index cda2c75403..ae7b02be7c 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1042,20 +1042,8 @@ pub enum Error { #[snafu(display("Manual compaction is override by following operations."))] ManualCompactionOverride {}, - #[snafu(display( - "Compaction memory limit exceeded for region {region_id}: required {required_bytes} bytes, limit {limit_bytes} bytes (policy: {policy})", - ))] + #[snafu(display("Compaction memory exhausted for region {region_id} (policy: {policy})",))] CompactionMemoryExhausted { - region_id: RegionId, - required_bytes: u64, - limit_bytes: u64, - policy: String, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to acquire memory for region {region_id} (policy: {policy})"))] - MemoryAcquireFailed { region_id: RegionId, policy: String, #[snafu(source)] @@ -1359,9 +1347,7 @@ impl ErrorExt for Error { ManualCompactionOverride {} => StatusCode::Cancelled, - CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted, - - MemoryAcquireFailed { source, .. } => source.status_code(), + CompactionMemoryExhausted { source, .. } => source.status_code(), IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,