feat: track unlimited usage in memory manager (#7811)

* feat: track unlimited usage in memory manager

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: by gemini comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: remove unused import

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-03-15 20:52:27 -07:00
committed by GitHub
parent 306e8398cf
commit c6f1ef8aec
5 changed files with 192 additions and 72 deletions

1
Cargo.lock generated
View File

@@ -2488,7 +2488,6 @@ version = "1.0.0-rc.2"
dependencies = [
"common-error",
"common-macro",
"common-telemetry",
"humantime",
"serde",
"snafu 0.8.6",

View File

@@ -10,7 +10,6 @@ workspace = true
[dependencies]
common-error = { workspace = true }
common-macro = { workspace = true }
common-telemetry = { workspace = true }
humantime = { workspace = true }
serde = { workspace = true }
snafu = { workspace = true }

View File

@@ -14,14 +14,13 @@
use std::{fmt, mem};
use common_telemetry::debug;
use snafu::ensure;
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use crate::error::{
MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
};
use crate::manager::{MemoryMetrics, MemoryQuota};
use crate::manager::{MemoryMetrics, MemoryQuota, UnlimitedMemoryQuota};
use crate::policy::OnExhaustedPolicy;
/// Guard representing a slice of reserved memory.
@@ -30,31 +29,57 @@ pub struct MemoryGuard<M: MemoryMetrics> {
}
pub(crate) enum GuardState<M: MemoryMetrics> {
Unlimited,
Released,
Unlimited {
quota: UnlimitedMemoryQuota<M>,
granted_bytes: u64,
},
Limited {
permit: OwnedSemaphorePermit,
quota: MemoryQuota<M>,
permit: OwnedSemaphorePermit,
},
}
impl<M: MemoryMetrics> GuardState<M> {
fn release(self) {
match self {
GuardState::Released => {}
GuardState::Unlimited {
quota,
granted_bytes,
} => {
quota.sub_in_use(granted_bytes);
}
GuardState::Limited { quota, permit } => {
quota.release_permit(permit);
}
}
}
}
impl<M: MemoryMetrics> MemoryGuard<M> {
pub(crate) fn unlimited() -> Self {
pub(crate) fn unlimited(quota: UnlimitedMemoryQuota<M>, bytes: u64) -> Self {
quota.add_in_use(bytes);
Self {
state: GuardState::Unlimited,
state: GuardState::Unlimited {
quota,
granted_bytes: bytes,
},
}
}
pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota<M>) -> Self {
pub(crate) fn limited(quota: MemoryQuota<M>, permit: OwnedSemaphorePermit) -> Self {
Self {
state: GuardState::Limited { permit, quota },
state: GuardState::Limited { quota, permit },
}
}
/// Returns granted quota in bytes.
pub fn granted_bytes(&self) -> u64 {
match &self.state {
GuardState::Unlimited => 0,
GuardState::Limited { permit, quota } => {
GuardState::Released => 0,
GuardState::Unlimited { granted_bytes, .. } => *granted_bytes,
GuardState::Limited { quota, permit } => {
quota.permits_to_bytes(permit.num_permits() as u32)
}
}
@@ -68,13 +93,24 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
/// - Returns error if requested bytes would exceed the manager's total limit
/// - Returns error if the semaphore is unexpectedly closed
pub async fn acquire_additional(&mut self, bytes: u64) -> Result<()> {
match &mut self.state {
GuardState::Unlimited => Ok(()),
GuardState::Limited { permit, quota } => {
if bytes == 0 {
return Ok(());
}
if bytes == 0 {
return Ok(());
}
match &mut self.state {
GuardState::Released => {
debug_assert!(false, "released memory guard state should not be reused");
Ok(())
}
GuardState::Unlimited {
quota,
granted_bytes,
} => {
quota.add_in_use(bytes);
*granted_bytes = granted_bytes.saturating_add(bytes);
Ok(())
}
GuardState::Limited { quota, permit } => {
let additional_permits = quota.bytes_to_permits(bytes);
let current_permits = permit.num_permits() as u32;
@@ -95,7 +131,6 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
permit.merge(additional_permit);
quota.update_in_use_metric();
debug!("Acquired additional {} bytes", bytes);
Ok(())
}
}
@@ -106,13 +141,24 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
/// On success, merges the new memory into this guard and returns true.
/// On failure, returns false and leaves this guard unchanged.
pub fn try_acquire_additional(&mut self, bytes: u64) -> bool {
match &mut self.state {
GuardState::Unlimited => true,
GuardState::Limited { permit, quota } => {
if bytes == 0 {
return true;
}
if bytes == 0 {
return true;
}
match &mut self.state {
GuardState::Released => {
debug_assert!(false, "released memory guard state should not be reused");
false
}
GuardState::Unlimited {
quota,
granted_bytes,
} => {
quota.add_in_use(bytes);
*granted_bytes = granted_bytes.saturating_add(bytes);
true
}
GuardState::Limited { quota, permit } => {
let additional_permits = quota.bytes_to_permits(bytes);
match quota
@@ -123,7 +169,6 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
Ok(additional_permit) => {
permit.merge(additional_permit);
quota.update_in_use_metric();
debug!("Acquired additional {} bytes", bytes);
true
}
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
@@ -168,7 +213,8 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
MemoryLimitExceededSnafu {
requested_bytes: bytes,
limit_bytes: match &self.state {
GuardState::Unlimited => 0, // unreachable: unlimited mode always succeeds
GuardState::Released => 0,
GuardState::Unlimited { .. } => 0, // unreachable: unlimited mode always succeeds
GuardState::Limited { quota, .. } => {
quota.permits_to_bytes(quota.limit_permits)
}
@@ -184,22 +230,30 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
///
/// Returns true if the release succeeds or is a no-op; false if the request exceeds granted.
pub fn release_partial(&mut self, bytes: u64) -> bool {
if bytes == 0 {
return true;
}
match &mut self.state {
GuardState::Unlimited => true,
GuardState::Limited { permit, quota } => {
if bytes == 0 {
return true;
GuardState::Released => true,
GuardState::Unlimited {
quota,
granted_bytes,
} => {
if bytes > *granted_bytes {
return false;
}
quota.sub_in_use(bytes);
*granted_bytes = granted_bytes.saturating_sub(bytes);
true
}
GuardState::Limited { quota, permit } => {
let release_permits = quota.bytes_to_permits(bytes);
match permit.split(release_permits as usize) {
Some(released_permit) => {
let released_bytes =
quota.permits_to_bytes(released_permit.num_permits() as u32);
drop(released_permit);
quota.update_in_use_metric();
debug!("Released {} bytes from memory guard", released_bytes);
quota.release_permit(released_permit);
true
}
None => false,
@@ -211,14 +265,7 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
fn drop(&mut self) {
if let GuardState::Limited { permit, quota } =
mem::replace(&mut self.state, GuardState::Unlimited)
{
let bytes = quota.permits_to_bytes(permit.num_permits() as u32);
drop(permit);
quota.update_in_use_metric();
debug!("Released memory: {} bytes", bytes);
}
mem::replace(&mut self.state, GuardState::Released).release();
}
}

View File

@@ -13,9 +13,10 @@
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use snafu::ensure;
use tokio::sync::{Semaphore, TryAcquireError};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
use crate::error::{
MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
@@ -34,7 +35,7 @@ pub trait MemoryMetrics: Clone + Send + Sync + 'static {
/// Generic memory manager for quota-controlled operations.
#[derive(Clone)]
pub struct MemoryManager<M: MemoryMetrics> {
quota: Option<MemoryQuota<M>>,
quota: MemoryQuotaState<M>,
}
impl<M: MemoryMetrics + Default> Default for MemoryManager<M> {
@@ -51,6 +52,18 @@ pub(crate) struct MemoryQuota<M: MemoryMetrics> {
pub(crate) metrics: M,
}
#[derive(Clone)]
pub(crate) struct UnlimitedMemoryQuota<M: MemoryMetrics> {
pub(crate) current_bytes: Arc<AtomicU64>,
pub(crate) metrics: M,
}
#[derive(Clone)]
pub(crate) enum MemoryQuotaState<M: MemoryMetrics> {
Unlimited(UnlimitedMemoryQuota<M>),
Limited(MemoryQuota<M>),
}
impl<M: MemoryMetrics> MemoryManager<M> {
/// Creates a new memory manager with the given limit in bytes.
/// `limit_bytes = 0` disables the limit.
@@ -62,7 +75,12 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self {
if limit_bytes == 0 {
metrics.set_limit(0);
return Self { quota: None };
return Self {
quota: MemoryQuotaState::Unlimited(UnlimitedMemoryQuota {
current_bytes: Arc::new(AtomicU64::new(0)),
metrics,
}),
};
}
let limit_permits = granularity.bytes_to_permits(limit_bytes);
@@ -70,7 +88,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
metrics.set_limit(limit_aligned_bytes as i64);
Self {
quota: Some(MemoryQuota {
quota: MemoryQuotaState::Limited(MemoryQuota {
semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
limit_permits,
granularity,
@@ -81,26 +99,30 @@ impl<M: MemoryMetrics> MemoryManager<M> {
/// Returns the configured limit in bytes (0 if unlimited).
pub fn limit_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| quota.permits_to_bytes(quota.limit_permits))
.unwrap_or(0)
match &self.quota {
MemoryQuotaState::Unlimited(_) => 0,
MemoryQuotaState::Limited(quota) => quota.permits_to_bytes(quota.limit_permits),
}
}
/// Returns currently used bytes.
pub fn used_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| quota.permits_to_bytes(quota.used_permits()))
.unwrap_or(0)
match &self.quota {
MemoryQuotaState::Unlimited(quota) => quota.current_bytes.load(Ordering::Acquire),
MemoryQuotaState::Limited(quota) => quota.permits_to_bytes(quota.used_permits()),
}
}
/// Returns available bytes.
///
/// Unlimited managers report `u64::MAX`.
pub fn available_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| quota.permits_to_bytes(quota.available_permits_clamped()))
.unwrap_or(0)
match &self.quota {
MemoryQuotaState::Unlimited(_) => u64::MAX,
MemoryQuotaState::Limited(quota) => {
quota.permits_to_bytes(quota.available_permits_clamped())
}
}
}
/// Acquires memory, waiting if necessary until enough is available.
@@ -110,8 +132,8 @@ impl<M: MemoryMetrics> MemoryManager<M> {
/// - Returns error if the semaphore is unexpectedly closed
pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<M>> {
match &self.quota {
None => Ok(MemoryGuard::unlimited()),
Some(quota) => {
MemoryQuotaState::Unlimited(quota) => Ok(MemoryGuard::unlimited(quota.clone(), bytes)),
MemoryQuotaState::Limited(quota) => {
let permits = quota.bytes_to_permits(bytes);
ensure!(
@@ -129,7 +151,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
.await
.map_err(|_| MemorySemaphoreClosedSnafu.build())?;
quota.update_in_use_metric();
Ok(MemoryGuard::limited(permit, quota.clone()))
Ok(MemoryGuard::limited(quota.clone(), permit))
}
}
}
@@ -137,14 +159,16 @@ impl<M: MemoryMetrics> MemoryManager<M> {
/// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<M>> {
match &self.quota {
None => Some(MemoryGuard::unlimited()),
Some(quota) => {
MemoryQuotaState::Unlimited(quota) => {
Some(MemoryGuard::unlimited(quota.clone(), bytes))
}
MemoryQuotaState::Limited(quota) => {
let permits = quota.bytes_to_permits(bytes);
match quota.semaphore.clone().try_acquire_many_owned(permits) {
Ok(permit) => {
quota.update_in_use_metric();
Some(MemoryGuard::limited(permit, quota.clone()))
Some(MemoryGuard::limited(quota.clone(), permit))
}
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
quota.metrics.inc_rejected("try_acquire");
@@ -219,4 +243,49 @@ impl<M: MemoryMetrics> MemoryQuota<M> {
let bytes = self.permits_to_bytes(self.used_permits());
self.metrics.set_in_use(bytes as i64);
}
pub(crate) fn release_permit(&self, permit: OwnedSemaphorePermit) {
drop(permit);
self.update_in_use_metric();
}
}
impl<M: MemoryMetrics> UnlimitedMemoryQuota<M> {
pub(crate) fn add_in_use(&self, bytes: u64) {
if bytes == 0 {
return;
}
let previous = self
.current_bytes
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
Some(current.saturating_add(bytes))
})
.unwrap();
let new_total = previous.saturating_add(bytes);
debug_assert!(
new_total >= previous,
"unlimited memory usage counter overflowed"
);
self.metrics.set_in_use(new_total as i64);
}
pub(crate) fn sub_in_use(&self, bytes: u64) {
if bytes == 0 {
return;
}
let previous = self
.current_bytes
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
Some(current.saturating_sub(bytes))
})
.unwrap();
debug_assert!(
previous >= bytes,
"unlimited memory usage counter underflowed: current={previous}, release={bytes}"
);
let new_total = previous.saturating_sub(bytes);
self.metrics.set_in_use(new_total as i64);
}
}

View File

@@ -24,7 +24,9 @@ fn test_try_acquire_unlimited() {
let manager = MemoryManager::new(0, NoOpMetrics);
let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap();
assert_eq!(manager.limit_bytes(), 0);
assert_eq!(guard.granted_bytes(), 0);
assert_eq!(manager.available_bytes(), u64::MAX);
assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
}
#[test]
@@ -136,7 +138,10 @@ fn test_request_additional_unlimited() {
// Should always succeed with unlimited manager
assert!(guard.try_acquire_additional(100 * PERMIT_GRANULARITY_BYTES));
assert_eq!(guard.granted_bytes(), 0);
assert_eq!(guard.granted_bytes(), 105 * PERMIT_GRANULARITY_BYTES);
assert_eq!(manager.used_bytes(), 105 * PERMIT_GRANULARITY_BYTES);
drop(guard);
assert_eq!(manager.used_bytes(), 0);
}
@@ -187,9 +192,10 @@ fn test_early_release_partial_unlimited() {
let manager = MemoryManager::new(0, NoOpMetrics);
let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap();
// Unlimited guard - release should succeed (no-op)
// Unlimited guard should track and release exact bytes.
assert!(guard.release_partial(50 * PERMIT_GRANULARITY_BYTES));
assert_eq!(guard.granted_bytes(), 0);
assert_eq!(guard.granted_bytes(), 50 * PERMIT_GRANULARITY_BYTES);
assert_eq!(manager.used_bytes(), 50 * PERMIT_GRANULARITY_BYTES);
}
#[test]
@@ -406,6 +412,6 @@ async fn test_acquire_additional_unlimited() {
.acquire_additional(1000 * PERMIT_GRANULARITY_BYTES)
.await
.unwrap();
assert_eq!(guard.granted_bytes(), 0);
assert_eq!(manager.used_bytes(), 0);
assert_eq!(guard.granted_bytes(), 1000 * PERMIT_GRANULARITY_BYTES);
assert_eq!(manager.used_bytes(), 1000 * PERMIT_GRANULARITY_BYTES);
}