common_memory_manager/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use snafu::ensure;
18use tokio::sync::{Semaphore, TryAcquireError};
19
20use crate::error::{
21    MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
22};
23use crate::granularity::PermitGranularity;
24use crate::guard::MemoryGuard;
25use crate::policy::OnExhaustedPolicy;
26
/// Sink for the metrics emitted by the memory manager.
///
/// Implementors must be `Clone`, `Send`, `Sync` and `'static`, as demanded by
/// the supertrait bounds.
pub trait MemoryMetrics: Send + Sync + Clone + 'static {
    /// Records the configured memory limit, in bytes.
    fn set_limit(&self, bytes: i64);

    /// Records the amount of memory currently in use, in bytes.
    fn set_in_use(&self, bytes: i64);

    /// Counts one rejected acquisition, labelled with `reason`.
    fn inc_rejected(&self, reason: &str);
}
33
34/// Generic memory manager for quota-controlled operations.
35#[derive(Clone)]
36pub struct MemoryManager<M: MemoryMetrics> {
37    quota: Option<MemoryQuota<M>>,
38}
39
40impl<M: MemoryMetrics + Default> Default for MemoryManager<M> {
41    fn default() -> Self {
42        Self::new(0, M::default())
43    }
44}
45
46#[derive(Clone)]
47pub(crate) struct MemoryQuota<M: MemoryMetrics> {
48    pub(crate) semaphore: Arc<Semaphore>,
49    pub(crate) limit_permits: u32,
50    pub(crate) granularity: PermitGranularity,
51    pub(crate) metrics: M,
52}
53
54impl<M: MemoryMetrics> MemoryManager<M> {
55    /// Creates a new memory manager with the given limit in bytes.
56    /// `limit_bytes = 0` disables the limit.
57    pub fn new(limit_bytes: u64, metrics: M) -> Self {
58        Self::with_granularity(limit_bytes, PermitGranularity::default(), metrics)
59    }
60
61    /// Creates a new memory manager with specified granularity.
62    pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self {
63        if limit_bytes == 0 {
64            metrics.set_limit(0);
65            return Self { quota: None };
66        }
67
68        let limit_permits = granularity.bytes_to_permits(limit_bytes);
69        let limit_aligned_bytes = granularity.permits_to_bytes(limit_permits);
70        metrics.set_limit(limit_aligned_bytes as i64);
71
72        Self {
73            quota: Some(MemoryQuota {
74                semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
75                limit_permits,
76                granularity,
77                metrics,
78            }),
79        }
80    }
81
82    /// Returns the configured limit in bytes (0 if unlimited).
83    pub fn limit_bytes(&self) -> u64 {
84        self.quota
85            .as_ref()
86            .map(|quota| quota.permits_to_bytes(quota.limit_permits))
87            .unwrap_or(0)
88    }
89
90    /// Returns currently used bytes.
91    pub fn used_bytes(&self) -> u64 {
92        self.quota
93            .as_ref()
94            .map(|quota| quota.permits_to_bytes(quota.used_permits()))
95            .unwrap_or(0)
96    }
97
98    /// Returns available bytes.
99    pub fn available_bytes(&self) -> u64 {
100        self.quota
101            .as_ref()
102            .map(|quota| quota.permits_to_bytes(quota.available_permits_clamped()))
103            .unwrap_or(0)
104    }
105
106    /// Acquires memory, waiting if necessary until enough is available.
107    ///
108    /// # Errors
109    /// - Returns error if requested bytes exceed the total limit
110    /// - Returns error if the semaphore is unexpectedly closed
111    pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<M>> {
112        match &self.quota {
113            None => Ok(MemoryGuard::unlimited()),
114            Some(quota) => {
115                let permits = quota.bytes_to_permits(bytes);
116
117                ensure!(
118                    permits <= quota.limit_permits,
119                    MemoryLimitExceededSnafu {
120                        requested_bytes: bytes,
121                        limit_bytes: self.limit_bytes()
122                    }
123                );
124
125                let permit = quota
126                    .semaphore
127                    .clone()
128                    .acquire_many_owned(permits)
129                    .await
130                    .map_err(|_| MemorySemaphoreClosedSnafu.build())?;
131                quota.update_in_use_metric();
132                Ok(MemoryGuard::limited(permit, quota.clone()))
133            }
134        }
135    }
136
137    /// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
138    pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<M>> {
139        match &self.quota {
140            None => Some(MemoryGuard::unlimited()),
141            Some(quota) => {
142                let permits = quota.bytes_to_permits(bytes);
143
144                match quota.semaphore.clone().try_acquire_many_owned(permits) {
145                    Ok(permit) => {
146                        quota.update_in_use_metric();
147                        Some(MemoryGuard::limited(permit, quota.clone()))
148                    }
149                    Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
150                        quota.metrics.inc_rejected("try_acquire");
151                        None
152                    }
153                }
154            }
155        }
156    }
157
158    /// Acquires memory based on the given policy.
159    ///
160    /// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available
161    /// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available
162    ///
163    /// # Errors
164    /// - `MemoryLimitExceeded`: Requested bytes exceed the total limit (both policies), or memory is currently exhausted (Fail policy only)
165    /// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only)
166    /// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue)
167    pub async fn acquire_with_policy(
168        &self,
169        bytes: u64,
170        policy: OnExhaustedPolicy,
171    ) -> Result<MemoryGuard<M>> {
172        match policy {
173            OnExhaustedPolicy::Wait { timeout } => {
174                match tokio::time::timeout(timeout, self.acquire(bytes)).await {
175                    Ok(Ok(guard)) => Ok(guard),
176                    Ok(Err(e)) => Err(e),
177                    Err(_elapsed) => {
178                        // Timeout elapsed while waiting
179                        MemoryAcquireTimeoutSnafu {
180                            requested_bytes: bytes,
181                            waited: timeout,
182                        }
183                        .fail()
184                    }
185                }
186            }
187            OnExhaustedPolicy::Fail => self.try_acquire(bytes).ok_or_else(|| {
188                MemoryLimitExceededSnafu {
189                    requested_bytes: bytes,
190                    limit_bytes: self.limit_bytes(),
191                }
192                .build()
193            }),
194        }
195    }
196}
197
198impl<M: MemoryMetrics> MemoryQuota<M> {
199    pub(crate) fn bytes_to_permits(&self, bytes: u64) -> u32 {
200        self.granularity.bytes_to_permits(bytes)
201    }
202
203    pub(crate) fn permits_to_bytes(&self, permits: u32) -> u64 {
204        self.granularity.permits_to_bytes(permits)
205    }
206
207    pub(crate) fn used_permits(&self) -> u32 {
208        self.limit_permits
209            .saturating_sub(self.available_permits_clamped())
210    }
211
212    pub(crate) fn available_permits_clamped(&self) -> u32 {
213        self.semaphore
214            .available_permits()
215            .min(self.limit_permits as usize) as u32
216    }
217
218    pub(crate) fn update_in_use_metric(&self) {
219        let bytes = self.permits_to_bytes(self.used_permits());
220        self.metrics.set_in_use(bytes as i64);
221    }
222}