diff --git a/Cargo.lock b/Cargo.lock index 94f7a3eca1..1f65f1289c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2488,7 +2488,6 @@ version = "1.0.0-rc.2" dependencies = [ "common-error", "common-macro", - "common-telemetry", "humantime", "serde", "snafu 0.8.6", diff --git a/src/common/memory-manager/Cargo.toml b/src/common/memory-manager/Cargo.toml index a6be50f774..6686c98167 100644 --- a/src/common/memory-manager/Cargo.toml +++ b/src/common/memory-manager/Cargo.toml @@ -10,7 +10,6 @@ workspace = true [dependencies] common-error = { workspace = true } common-macro = { workspace = true } -common-telemetry = { workspace = true } humantime = { workspace = true } serde = { workspace = true } snafu = { workspace = true } diff --git a/src/common/memory-manager/src/guard.rs b/src/common/memory-manager/src/guard.rs index 770b6dec24..ad3111581b 100644 --- a/src/common/memory-manager/src/guard.rs +++ b/src/common/memory-manager/src/guard.rs @@ -14,14 +14,13 @@ use std::{fmt, mem}; -use common_telemetry::debug; use snafu::ensure; use tokio::sync::{OwnedSemaphorePermit, TryAcquireError}; use crate::error::{ MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result, }; -use crate::manager::{MemoryMetrics, MemoryQuota}; +use crate::manager::{MemoryMetrics, MemoryQuota, UnlimitedMemoryQuota}; use crate::policy::OnExhaustedPolicy; /// Guard representing a slice of reserved memory. @@ -30,31 +29,57 @@ pub struct MemoryGuard { } pub(crate) enum GuardState { - Unlimited, + Released, + Unlimited { + quota: UnlimitedMemoryQuota, + granted_bytes: u64, + }, Limited { - permit: OwnedSemaphorePermit, quota: MemoryQuota, + permit: OwnedSemaphorePermit, }, } +impl GuardState { + fn release(self) { + match self { + GuardState::Released => {} + GuardState::Unlimited { + quota, + granted_bytes, + } => { + quota.sub_in_use(granted_bytes); + } + GuardState::Limited { quota, permit } => { + quota.release_permit(permit); + } + } + } +} + impl MemoryGuard { - pub(crate) fn unlimited() -> Self { + pub(crate) fn unlimited(quota: UnlimitedMemoryQuota, bytes: u64) -> Self { + quota.add_in_use(bytes); Self { - state: GuardState::Unlimited, + state: GuardState::Unlimited { + quota, + granted_bytes: bytes, + }, } } - pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota) -> Self { + pub(crate) fn limited(quota: MemoryQuota, permit: OwnedSemaphorePermit) -> Self { Self { - state: GuardState::Limited { permit, quota }, + state: GuardState::Limited { quota, permit }, } } /// Returns granted quota in bytes. pub fn granted_bytes(&self) -> u64 { match &self.state { - GuardState::Unlimited => 0, - GuardState::Limited { permit, quota } => { + GuardState::Released => 0, + GuardState::Unlimited { granted_bytes, .. } => *granted_bytes, + GuardState::Limited { quota, permit } => { quota.permits_to_bytes(permit.num_permits() as u32) } } @@ -68,13 +93,24 @@ impl MemoryGuard { /// - Returns error if requested bytes would exceed the manager's total limit /// - Returns error if the semaphore is unexpectedly closed pub async fn acquire_additional(&mut self, bytes: u64) -> Result<()> { - match &mut self.state { - GuardState::Unlimited => Ok(()), - GuardState::Limited { permit, quota } => { - if bytes == 0 { - return Ok(()); - } + if bytes == 0 { + return Ok(()); + } + match &mut self.state { + GuardState::Released => { + debug_assert!(false, "released memory guard state should not be reused"); + Ok(()) + } + GuardState::Unlimited { + quota, + granted_bytes, + } => { + quota.add_in_use(bytes); + *granted_bytes = granted_bytes.saturating_add(bytes); + Ok(()) + } + GuardState::Limited { quota, permit } => { let additional_permits = quota.bytes_to_permits(bytes); let current_permits = permit.num_permits() as u32; @@ -95,7 +131,6 @@ impl MemoryGuard { permit.merge(additional_permit); quota.update_in_use_metric(); - debug!("Acquired additional {} bytes", bytes); Ok(()) } } @@ -106,13 +141,24 @@ impl MemoryGuard { /// On success, merges the new memory into this guard and returns true. /// On failure, returns false and leaves this guard unchanged. pub fn try_acquire_additional(&mut self, bytes: u64) -> bool { - match &mut self.state { - GuardState::Unlimited => true, - GuardState::Limited { permit, quota } => { - if bytes == 0 { - return true; - } + if bytes == 0 { + return true; + } + match &mut self.state { + GuardState::Released => { + debug_assert!(false, "released memory guard state should not be reused"); + false + } + GuardState::Unlimited { + quota, + granted_bytes, + } => { + quota.add_in_use(bytes); + *granted_bytes = granted_bytes.saturating_add(bytes); + true + } + GuardState::Limited { quota, permit } => { let additional_permits = quota.bytes_to_permits(bytes); match quota @@ -123,7 +169,6 @@ impl MemoryGuard { Ok(additional_permit) => { permit.merge(additional_permit); quota.update_in_use_metric(); - debug!("Acquired additional {} bytes", bytes); true } Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { @@ -168,7 +213,8 @@ impl MemoryGuard { MemoryLimitExceededSnafu { requested_bytes: bytes, limit_bytes: match &self.state { - GuardState::Unlimited => 0, // unreachable: unlimited mode always succeeds + GuardState::Released => 0, + GuardState::Unlimited { .. } => 0, // unreachable: unlimited mode always succeeds GuardState::Limited { quota, .. } => { quota.permits_to_bytes(quota.limit_permits) } @@ -184,22 +230,30 @@ impl MemoryGuard { /// /// Returns true if the release succeeds or is a no-op; false if the request exceeds granted. pub fn release_partial(&mut self, bytes: u64) -> bool { + if bytes == 0 { + return true; + } + match &mut self.state { - GuardState::Unlimited => true, - GuardState::Limited { permit, quota } => { - if bytes == 0 { - return true; + GuardState::Released => true, + GuardState::Unlimited { + quota, + granted_bytes, + } => { + if bytes > *granted_bytes { + return false; } + quota.sub_in_use(bytes); + *granted_bytes = granted_bytes.saturating_sub(bytes); + true + } + GuardState::Limited { quota, permit } => { let release_permits = quota.bytes_to_permits(bytes); match permit.split(release_permits as usize) { Some(released_permit) => { - let released_bytes = - quota.permits_to_bytes(released_permit.num_permits() as u32); - drop(released_permit); - quota.update_in_use_metric(); - debug!("Released {} bytes from memory guard", released_bytes); + quota.release_permit(released_permit); true } None => false, @@ -211,14 +265,7 @@ impl MemoryGuard { impl Drop for MemoryGuard { fn drop(&mut self) { - if let GuardState::Limited { permit, quota } = - mem::replace(&mut self.state, GuardState::Unlimited) - { - let bytes = quota.permits_to_bytes(permit.num_permits() as u32); - drop(permit); - quota.update_in_use_metric(); - debug!("Released memory: {} bytes", bytes); - } + mem::replace(&mut self.state, GuardState::Released).release(); } } diff --git a/src/common/memory-manager/src/manager.rs b/src/common/memory-manager/src/manager.rs index 50360d2a31..8cca5f220c 100644 --- a/src/common/memory-manager/src/manager.rs +++ b/src/common/memory-manager/src/manager.rs @@ -13,9 +13,10 @@ // limitations under the License. use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use snafu::ensure; -use tokio::sync::{Semaphore, TryAcquireError}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError}; use crate::error::{ MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result, @@ -34,7 +35,7 @@ pub trait MemoryMetrics: Clone + Send + Sync + 'static { /// Generic memory manager for quota-controlled operations. #[derive(Clone)] pub struct MemoryManager { - quota: Option>, + quota: MemoryQuotaState, } impl Default for MemoryManager { @@ -51,6 +52,18 @@ pub(crate) struct MemoryQuota { pub(crate) metrics: M, } +#[derive(Clone)] +pub(crate) struct UnlimitedMemoryQuota { + pub(crate) current_bytes: Arc, + pub(crate) metrics: M, +} + +#[derive(Clone)] +pub(crate) enum MemoryQuotaState { + Unlimited(UnlimitedMemoryQuota), + Limited(MemoryQuota), +} + impl MemoryManager { /// Creates a new memory manager with the given limit in bytes. /// `limit_bytes = 0` disables the limit. @@ -62,7 +75,12 @@ impl MemoryManager { pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self { if limit_bytes == 0 { metrics.set_limit(0); - return Self { quota: None }; + return Self { + quota: MemoryQuotaState::Unlimited(UnlimitedMemoryQuota { + current_bytes: Arc::new(AtomicU64::new(0)), + metrics, + }), + }; } let limit_permits = granularity.bytes_to_permits(limit_bytes); @@ -70,7 +88,7 @@ impl MemoryManager { metrics.set_limit(limit_aligned_bytes as i64); Self { - quota: Some(MemoryQuota { + quota: MemoryQuotaState::Limited(MemoryQuota { semaphore: Arc::new(Semaphore::new(limit_permits as usize)), limit_permits, granularity, @@ -81,26 +99,30 @@ impl MemoryManager { /// Returns the configured limit in bytes (0 if unlimited). pub fn limit_bytes(&self) -> u64 { - self.quota - .as_ref() - .map(|quota| quota.permits_to_bytes(quota.limit_permits)) - .unwrap_or(0) + match &self.quota { + MemoryQuotaState::Unlimited(_) => 0, + MemoryQuotaState::Limited(quota) => quota.permits_to_bytes(quota.limit_permits), + } } /// Returns currently used bytes. pub fn used_bytes(&self) -> u64 { - self.quota - .as_ref() - .map(|quota| quota.permits_to_bytes(quota.used_permits())) - .unwrap_or(0) + match &self.quota { + MemoryQuotaState::Unlimited(quota) => quota.current_bytes.load(Ordering::Acquire), + MemoryQuotaState::Limited(quota) => quota.permits_to_bytes(quota.used_permits()), + } } /// Returns available bytes. + /// + /// Unlimited managers report `u64::MAX`. pub fn available_bytes(&self) -> u64 { - self.quota - .as_ref() - .map(|quota| quota.permits_to_bytes(quota.available_permits_clamped())) - .unwrap_or(0) + match &self.quota { + MemoryQuotaState::Unlimited(_) => u64::MAX, + MemoryQuotaState::Limited(quota) => { + quota.permits_to_bytes(quota.available_permits_clamped()) + } + } } /// Acquires memory, waiting if necessary until enough is available. @@ -110,8 +132,8 @@ impl MemoryManager { /// - Returns error if the semaphore is unexpectedly closed pub async fn acquire(&self, bytes: u64) -> Result> { match &self.quota { - None => Ok(MemoryGuard::unlimited()), - Some(quota) => { + MemoryQuotaState::Unlimited(quota) => Ok(MemoryGuard::unlimited(quota.clone(), bytes)), + MemoryQuotaState::Limited(quota) => { let permits = quota.bytes_to_permits(bytes); ensure!( @@ -129,7 +151,7 @@ impl MemoryManager { .await .map_err(|_| MemorySemaphoreClosedSnafu.build())?; quota.update_in_use_metric(); - Ok(MemoryGuard::limited(permit, quota.clone())) + Ok(MemoryGuard::limited(quota.clone(), permit)) } } } @@ -137,14 +159,16 @@ impl MemoryManager { /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient. pub fn try_acquire(&self, bytes: u64) -> Option> { match &self.quota { - None => Some(MemoryGuard::unlimited()), - Some(quota) => { + MemoryQuotaState::Unlimited(quota) => { + Some(MemoryGuard::unlimited(quota.clone(), bytes)) + } + MemoryQuotaState::Limited(quota) => { let permits = quota.bytes_to_permits(bytes); match quota.semaphore.clone().try_acquire_many_owned(permits) { Ok(permit) => { quota.update_in_use_metric(); - Some(MemoryGuard::limited(permit, quota.clone())) + Some(MemoryGuard::limited(quota.clone(), permit)) } Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => { quota.metrics.inc_rejected("try_acquire"); @@ -219,4 +243,49 @@ impl MemoryQuota { let bytes = self.permits_to_bytes(self.used_permits()); self.metrics.set_in_use(bytes as i64); } + + pub(crate) fn release_permit(&self, permit: OwnedSemaphorePermit) { + drop(permit); + self.update_in_use_metric(); + } +} + +impl UnlimitedMemoryQuota { + pub(crate) fn add_in_use(&self, bytes: u64) { + if bytes == 0 { + return; + } + + let previous = self + .current_bytes + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| { + Some(current.saturating_add(bytes)) + }) + .unwrap(); + let new_total = previous.saturating_add(bytes); + debug_assert!( + new_total >= previous, + "unlimited memory usage counter overflowed" + ); + self.metrics.set_in_use(new_total as i64); + } + + pub(crate) fn sub_in_use(&self, bytes: u64) { + if bytes == 0 { + return; + } + + let previous = self + .current_bytes + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| { + Some(current.saturating_sub(bytes)) + }) + .unwrap(); + debug_assert!( + previous >= bytes, + "unlimited memory usage counter underflowed: current={previous}, release={bytes}" + ); + let new_total = previous.saturating_sub(bytes); + self.metrics.set_in_use(new_total as i64); + } } diff --git a/src/common/memory-manager/src/tests.rs b/src/common/memory-manager/src/tests.rs index 886eef9dac..fe02703f0b 100644 --- a/src/common/memory-manager/src/tests.rs +++ b/src/common/memory-manager/src/tests.rs @@ -24,7 +24,9 @@ fn test_try_acquire_unlimited() { let manager = MemoryManager::new(0, NoOpMetrics); let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap(); assert_eq!(manager.limit_bytes(), 0); - assert_eq!(guard.granted_bytes(), 0); + assert_eq!(manager.available_bytes(), u64::MAX); + assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES); } #[test] @@ -136,7 +138,10 @@ fn test_request_additional_unlimited() { // Should always succeed with unlimited manager assert!(guard.try_acquire_additional(100 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 0); + assert_eq!(guard.granted_bytes(), 105 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 105 * PERMIT_GRANULARITY_BYTES); + + drop(guard); assert_eq!(manager.used_bytes(), 0); } @@ -187,9 +192,10 @@ fn test_early_release_partial_unlimited() { let manager = MemoryManager::new(0, NoOpMetrics); let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap(); - // Unlimited guard - release should succeed (no-op) + // Unlimited guard should track and release exact bytes. assert!(guard.release_partial(50 * PERMIT_GRANULARITY_BYTES)); - assert_eq!(guard.granted_bytes(), 0); + assert_eq!(guard.granted_bytes(), 50 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 50 * PERMIT_GRANULARITY_BYTES); } #[test] @@ -406,6 +412,6 @@ async fn test_acquire_additional_unlimited() { .acquire_additional(1000 * PERMIT_GRANULARITY_BYTES) .await .unwrap(); - assert_eq!(guard.granted_bytes(), 0); - assert_eq!(manager.used_bytes(), 0); + assert_eq!(guard.granted_bytes(), 1000 * PERMIT_GRANULARITY_BYTES); + assert_eq!(manager.used_bytes(), 1000 * PERMIT_GRANULARITY_BYTES); }