mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: introduce granularity for memory manager (#7416)
* feat: introduce granularity for memory manager Signed-off-by: jeremyhi <fengjiachun@gmail.com> * chore: add unit test Signed-off-by: jeremyhi <fengjiachun@gmail.com> * chore: remove granularity getter for mamanger Signed-off-by: jeremyhi <fengjiachun@gmail.com> * Update src/common/memory-manager/src/manager.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * feat: acquire_with_policy for manager Signed-off-by: jeremyhi <fengjiachun@gmail.com> --------- Signed-off-by: jeremyhi <fengjiachun@gmail.com> Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
@@ -35,6 +36,14 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Memory semaphore unexpectedly closed"))]
|
||||
MemorySemaphoreClosed,
|
||||
|
||||
#[snafu(display(
|
||||
"Timeout waiting for memory quota: requested {requested_bytes} bytes, waited {waited:?}"
|
||||
))]
|
||||
MemoryAcquireTimeout {
|
||||
requested_bytes: u64,
|
||||
waited: Duration,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -44,6 +53,7 @@ impl ErrorExt for Error {
|
||||
match self {
|
||||
MemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
MemorySemaphoreClosed => StatusCode::Unexpected,
|
||||
MemoryAcquireTimeout { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
168
src/common/memory-manager/src/granularity.rs
Normal file
168
src/common/memory-manager/src/granularity.rs
Normal file
@@ -0,0 +1,168 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
|
||||
/// Memory permit granularity for different use cases.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub enum PermitGranularity {
|
||||
/// 1 KB per permit
|
||||
///
|
||||
/// Use for:
|
||||
/// - HTTP/gRPC request limiting (small, high-concurrency operations)
|
||||
/// - Small batch operations
|
||||
/// - Scenarios requiring fine-grained fairness
|
||||
Kilobyte,
|
||||
|
||||
/// 1 MB per permit (default)
|
||||
///
|
||||
/// Use for:
|
||||
/// - Query execution memory management
|
||||
/// - Compaction memory control
|
||||
/// - Large, long-running operations
|
||||
#[default]
|
||||
Megabyte,
|
||||
}
|
||||
|
||||
impl PermitGranularity {
|
||||
/// Returns the number of bytes per permit.
|
||||
#[inline]
|
||||
pub const fn bytes(self) -> u64 {
|
||||
match self {
|
||||
Self::Kilobyte => 1024,
|
||||
Self::Megabyte => 1024 * 1024,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a human-readable string representation.
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Kilobyte => "1KB",
|
||||
Self::Megabyte => "1MB",
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts bytes to permits based on this granularity.
|
||||
///
|
||||
/// Rounds up to ensure the requested bytes are fully covered.
|
||||
/// Clamped to Semaphore::MAX_PERMITS.
|
||||
#[inline]
|
||||
pub fn bytes_to_permits(self, bytes: u64) -> u32 {
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
let granularity_bytes = self.bytes();
|
||||
bytes
|
||||
.saturating_add(granularity_bytes - 1)
|
||||
.saturating_div(granularity_bytes)
|
||||
.min(Semaphore::MAX_PERMITS as u64)
|
||||
.min(u32::MAX as u64) as u32
|
||||
}
|
||||
|
||||
/// Converts permits to bytes based on this granularity.
|
||||
#[inline]
|
||||
pub fn permits_to_bytes(self, permits: u32) -> u64 {
|
||||
(permits as u64).saturating_mul(self.bytes())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for PermitGranularity {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_bytes_to_permits_kilobyte() {
|
||||
let granularity = PermitGranularity::Kilobyte;
|
||||
|
||||
// Exact multiples
|
||||
assert_eq!(granularity.bytes_to_permits(1024), 1);
|
||||
assert_eq!(granularity.bytes_to_permits(2048), 2);
|
||||
assert_eq!(granularity.bytes_to_permits(10 * 1024), 10);
|
||||
|
||||
// Rounds up
|
||||
assert_eq!(granularity.bytes_to_permits(1), 1);
|
||||
assert_eq!(granularity.bytes_to_permits(1025), 2);
|
||||
assert_eq!(granularity.bytes_to_permits(2047), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bytes_to_permits_megabyte() {
|
||||
let granularity = PermitGranularity::Megabyte;
|
||||
|
||||
// Exact multiples
|
||||
assert_eq!(granularity.bytes_to_permits(1024 * 1024), 1);
|
||||
assert_eq!(granularity.bytes_to_permits(2 * 1024 * 1024), 2);
|
||||
|
||||
// Rounds up
|
||||
assert_eq!(granularity.bytes_to_permits(1), 1);
|
||||
assert_eq!(granularity.bytes_to_permits(1024), 1);
|
||||
assert_eq!(granularity.bytes_to_permits(1024 * 1024 + 1), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bytes_to_permits_zero_bytes() {
|
||||
assert_eq!(PermitGranularity::Kilobyte.bytes_to_permits(0), 0);
|
||||
assert_eq!(PermitGranularity::Megabyte.bytes_to_permits(0), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bytes_to_permits_clamps_to_maximum() {
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
let max_permits = (Semaphore::MAX_PERMITS as u64).min(u32::MAX as u64) as u32;
|
||||
|
||||
assert_eq!(
|
||||
PermitGranularity::Kilobyte.bytes_to_permits(u64::MAX),
|
||||
max_permits
|
||||
);
|
||||
assert_eq!(
|
||||
PermitGranularity::Megabyte.bytes_to_permits(u64::MAX),
|
||||
max_permits
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_permits_to_bytes() {
|
||||
assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(1), 1024);
|
||||
assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(10), 10 * 1024);
|
||||
|
||||
assert_eq!(PermitGranularity::Megabyte.permits_to_bytes(1), 1024 * 1024);
|
||||
assert_eq!(
|
||||
PermitGranularity::Megabyte.permits_to_bytes(10),
|
||||
10 * 1024 * 1024
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_round_trip_conversion() {
|
||||
// Kilobyte: bytes -> permits -> bytes (should round up)
|
||||
let kb = PermitGranularity::Kilobyte;
|
||||
let permits = kb.bytes_to_permits(1500);
|
||||
let bytes = kb.permits_to_bytes(permits);
|
||||
assert!(bytes >= 1500); // Must cover original request
|
||||
assert_eq!(bytes, 2048); // 2KB
|
||||
|
||||
// Megabyte: bytes -> permits -> bytes (should round up)
|
||||
let mb = PermitGranularity::Megabyte;
|
||||
let permits = mb.bytes_to_permits(1500);
|
||||
let bytes = mb.permits_to_bytes(permits);
|
||||
assert!(bytes >= 1500);
|
||||
assert_eq!(bytes, 1024 * 1024); // 1MB
|
||||
}
|
||||
}
|
||||
@@ -17,7 +17,7 @@ use std::{fmt, mem};
|
||||
use common_telemetry::debug;
|
||||
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
|
||||
|
||||
use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes};
|
||||
use crate::manager::{MemoryMetrics, MemoryQuota};
|
||||
|
||||
/// Guard representing a slice of reserved memory.
|
||||
pub struct MemoryGuard<M: MemoryMetrics> {
|
||||
@@ -49,7 +49,9 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
|
||||
pub fn granted_bytes(&self) -> u64 {
|
||||
match &self.state {
|
||||
GuardState::Unlimited => 0,
|
||||
GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
|
||||
GuardState::Limited { permit, quota } => {
|
||||
quota.permits_to_bytes(permit.num_permits() as u32)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +67,7 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
|
||||
return true;
|
||||
}
|
||||
|
||||
let additional_permits = bytes_to_permits(bytes);
|
||||
let additional_permits = quota.bytes_to_permits(bytes);
|
||||
|
||||
match quota
|
||||
.semaphore
|
||||
@@ -99,11 +101,12 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
|
||||
return true;
|
||||
}
|
||||
|
||||
let release_permits = bytes_to_permits(bytes);
|
||||
let release_permits = quota.bytes_to_permits(bytes);
|
||||
|
||||
match permit.split(release_permits as usize) {
|
||||
Some(released_permit) => {
|
||||
let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
|
||||
let released_bytes =
|
||||
quota.permits_to_bytes(released_permit.num_permits() as u32);
|
||||
drop(released_permit);
|
||||
quota.update_in_use_metric();
|
||||
debug!("Early released {} bytes from memory guard", released_bytes);
|
||||
@@ -121,7 +124,7 @@ impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
|
||||
if let GuardState::Limited { permit, quota } =
|
||||
mem::replace(&mut self.state, GuardState::Unlimited)
|
||||
{
|
||||
let bytes = permits_to_bytes(permit.num_permits() as u32);
|
||||
let bytes = quota.permits_to_bytes(permit.num_permits() as u32);
|
||||
drop(permit);
|
||||
quota.update_in_use_metric();
|
||||
debug!("Released memory: {} bytes", bytes);
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
//! share the same allocation logic while using their own metrics.
|
||||
|
||||
mod error;
|
||||
mod granularity;
|
||||
mod guard;
|
||||
mod manager;
|
||||
mod policy;
|
||||
@@ -27,8 +28,9 @@ mod policy;
|
||||
mod tests;
|
||||
|
||||
pub use error::{Error, Result};
|
||||
pub use granularity::PermitGranularity;
|
||||
pub use guard::MemoryGuard;
|
||||
pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES};
|
||||
pub use manager::{MemoryManager, MemoryMetrics};
|
||||
pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy};
|
||||
|
||||
/// No-op metrics implementation for testing.
|
||||
|
||||
@@ -17,11 +17,12 @@ use std::sync::Arc;
|
||||
use snafu::ensure;
|
||||
use tokio::sync::{Semaphore, TryAcquireError};
|
||||
|
||||
use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result};
|
||||
use crate::error::{
|
||||
MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
|
||||
};
|
||||
use crate::granularity::PermitGranularity;
|
||||
use crate::guard::MemoryGuard;
|
||||
|
||||
/// Minimum bytes controlled by one semaphore permit.
|
||||
pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
|
||||
use crate::policy::OnExhaustedPolicy;
|
||||
|
||||
/// Trait for recording memory usage metrics.
|
||||
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
|
||||
@@ -40,6 +41,7 @@ pub struct MemoryManager<M: MemoryMetrics> {
|
||||
pub(crate) struct MemoryQuota<M: MemoryMetrics> {
|
||||
pub(crate) semaphore: Arc<Semaphore>,
|
||||
pub(crate) limit_permits: u32,
|
||||
pub(crate) granularity: PermitGranularity,
|
||||
pub(crate) metrics: M,
|
||||
}
|
||||
|
||||
@@ -47,19 +49,25 @@ impl<M: MemoryMetrics> MemoryManager<M> {
|
||||
/// Creates a new memory manager with the given limit in bytes.
|
||||
/// `limit_bytes = 0` disables the limit.
|
||||
pub fn new(limit_bytes: u64, metrics: M) -> Self {
|
||||
Self::with_granularity(limit_bytes, PermitGranularity::default(), metrics)
|
||||
}
|
||||
|
||||
/// Creates a new memory manager with specified granularity.
|
||||
pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self {
|
||||
if limit_bytes == 0 {
|
||||
metrics.set_limit(0);
|
||||
return Self { quota: None };
|
||||
}
|
||||
|
||||
let limit_permits = bytes_to_permits(limit_bytes);
|
||||
let limit_aligned_bytes = permits_to_bytes(limit_permits);
|
||||
let limit_permits = granularity.bytes_to_permits(limit_bytes);
|
||||
let limit_aligned_bytes = granularity.permits_to_bytes(limit_permits);
|
||||
metrics.set_limit(limit_aligned_bytes as i64);
|
||||
|
||||
Self {
|
||||
quota: Some(MemoryQuota {
|
||||
semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
|
||||
limit_permits,
|
||||
granularity,
|
||||
metrics,
|
||||
}),
|
||||
}
|
||||
@@ -69,7 +77,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
|
||||
pub fn limit_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.limit_permits))
|
||||
.map(|quota| quota.permits_to_bytes(quota.limit_permits))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
@@ -77,7 +85,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
|
||||
pub fn used_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.used_permits()))
|
||||
.map(|quota| quota.permits_to_bytes(quota.used_permits()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
@@ -85,7 +93,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
|
||||
pub fn available_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.available_permits_clamped()))
|
||||
.map(|quota| quota.permits_to_bytes(quota.available_permits_clamped()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
@@ -98,13 +106,13 @@ impl<M: MemoryMetrics> MemoryManager<M> {
|
||||
match &self.quota {
|
||||
None => Ok(MemoryGuard::unlimited()),
|
||||
Some(quota) => {
|
||||
let permits = bytes_to_permits(bytes);
|
||||
let permits = quota.bytes_to_permits(bytes);
|
||||
|
||||
ensure!(
|
||||
permits <= quota.limit_permits,
|
||||
MemoryLimitExceededSnafu {
|
||||
requested_bytes: bytes,
|
||||
limit_bytes: permits_to_bytes(quota.limit_permits),
|
||||
limit_bytes: self.limit_bytes()
|
||||
}
|
||||
);
|
||||
|
||||
@@ -125,7 +133,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
|
||||
match &self.quota {
|
||||
None => Some(MemoryGuard::unlimited()),
|
||||
Some(quota) => {
|
||||
let permits = bytes_to_permits(bytes);
|
||||
let permits = quota.bytes_to_permits(bytes);
|
||||
|
||||
match quota.semaphore.clone().try_acquire_many_owned(permits) {
|
||||
Ok(permit) => {
|
||||
@@ -140,9 +148,56 @@ impl<M: MemoryMetrics> MemoryManager<M> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Acquires memory based on the given policy.
|
||||
///
|
||||
/// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available
|
||||
/// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available
|
||||
///
|
||||
/// # Errors
|
||||
/// - `MemoryLimitExceeded`: Requested bytes exceed the total limit (both policies), or memory is currently exhausted (Fail policy only)
|
||||
/// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only)
|
||||
/// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue)
|
||||
pub async fn acquire_with_policy(
|
||||
&self,
|
||||
bytes: u64,
|
||||
policy: OnExhaustedPolicy,
|
||||
) -> Result<MemoryGuard<M>> {
|
||||
match policy {
|
||||
OnExhaustedPolicy::Wait { timeout } => {
|
||||
match tokio::time::timeout(timeout, self.acquire(bytes)).await {
|
||||
Ok(Ok(guard)) => Ok(guard),
|
||||
Ok(Err(e)) => Err(e),
|
||||
Err(_elapsed) => {
|
||||
// Timeout elapsed while waiting
|
||||
MemoryAcquireTimeoutSnafu {
|
||||
requested_bytes: bytes,
|
||||
waited: timeout,
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
OnExhaustedPolicy::Fail => self.try_acquire(bytes).ok_or_else(|| {
|
||||
MemoryLimitExceededSnafu {
|
||||
requested_bytes: bytes,
|
||||
limit_bytes: self.limit_bytes(),
|
||||
}
|
||||
.build()
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: MemoryMetrics> MemoryQuota<M> {
|
||||
pub(crate) fn bytes_to_permits(&self, bytes: u64) -> u32 {
|
||||
self.granularity.bytes_to_permits(bytes)
|
||||
}
|
||||
|
||||
pub(crate) fn permits_to_bytes(&self, permits: u32) -> u64 {
|
||||
self.granularity.permits_to_bytes(permits)
|
||||
}
|
||||
|
||||
pub(crate) fn used_permits(&self) -> u32 {
|
||||
self.limit_permits
|
||||
.saturating_sub(self.available_permits_clamped())
|
||||
@@ -155,19 +210,7 @@ impl<M: MemoryMetrics> MemoryQuota<M> {
|
||||
}
|
||||
|
||||
pub(crate) fn update_in_use_metric(&self) {
|
||||
let bytes = permits_to_bytes(self.used_permits());
|
||||
let bytes = self.permits_to_bytes(self.used_permits());
|
||||
self.metrics.set_in_use(bytes as i64);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn bytes_to_permits(bytes: u64) -> u32 {
|
||||
bytes
|
||||
.saturating_add(PERMIT_GRANULARITY_BYTES - 1)
|
||||
.saturating_div(PERMIT_GRANULARITY_BYTES)
|
||||
.min(Semaphore::MAX_PERMITS as u64)
|
||||
.min(u32::MAX as u64) as u32
|
||||
}
|
||||
|
||||
pub(crate) fn permits_to_bytes(permits: u32) -> u64 {
|
||||
(permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
|
||||
}
|
||||
|
||||
@@ -14,7 +14,10 @@
|
||||
|
||||
use tokio::time::{Duration, sleep};
|
||||
|
||||
use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES};
|
||||
use crate::{MemoryManager, NoOpMetrics, PermitGranularity};
|
||||
|
||||
// Helper constant for tests - use default Megabyte granularity
|
||||
const PERMIT_GRANULARITY_BYTES: u64 = PermitGranularity::Megabyte.bytes();
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_unlimited() {
|
||||
|
||||
@@ -25,7 +25,7 @@ use tokio::sync::mpsc;
|
||||
use crate::compaction::compactor::{CompactionRegion, Compactor};
|
||||
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
|
||||
use crate::compaction::picker::{CompactionTask, PickerOutput};
|
||||
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu, MemoryAcquireFailedSnafu};
|
||||
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
|
||||
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
|
||||
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
|
||||
use crate::region::RegionRoleState;
|
||||
@@ -95,80 +95,16 @@ impl CompactionTaskImpl {
|
||||
async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
|
||||
let region_id = self.compaction_region.region_id;
|
||||
let requested_bytes = self.estimated_memory_bytes;
|
||||
let limit_bytes = self.memory_manager.limit_bytes();
|
||||
let policy = self.memory_policy;
|
||||
|
||||
if limit_bytes > 0 && requested_bytes > limit_bytes {
|
||||
warn!(
|
||||
"Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request",
|
||||
region_id, requested_bytes, limit_bytes
|
||||
);
|
||||
return Err(CompactionMemoryExhaustedSnafu {
|
||||
let _timer = COMPACTION_MEMORY_WAIT.start_timer();
|
||||
self.memory_manager
|
||||
.acquire_with_policy(requested_bytes, policy)
|
||||
.await
|
||||
.context(CompactionMemoryExhaustedSnafu {
|
||||
region_id,
|
||||
required_bytes: requested_bytes,
|
||||
limit_bytes,
|
||||
policy: "exceed_limit".to_string(),
|
||||
}
|
||||
.build());
|
||||
}
|
||||
|
||||
match self.memory_policy {
|
||||
OnExhaustedPolicy::Wait {
|
||||
timeout: wait_timeout,
|
||||
} => {
|
||||
let timer = COMPACTION_MEMORY_WAIT.start_timer();
|
||||
|
||||
match tokio::time::timeout(
|
||||
wait_timeout,
|
||||
self.memory_manager.acquire(requested_bytes),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(guard)) => {
|
||||
timer.observe_duration();
|
||||
Ok(guard)
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
timer.observe_duration();
|
||||
Err(e).with_context(|_| MemoryAcquireFailedSnafu {
|
||||
region_id,
|
||||
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
|
||||
})
|
||||
}
|
||||
Err(_) => {
|
||||
timer.observe_duration();
|
||||
warn!(
|
||||
"Compaction for region {} waited {:?} for {} bytes but timed out",
|
||||
region_id, wait_timeout, requested_bytes
|
||||
);
|
||||
CompactionMemoryExhaustedSnafu {
|
||||
region_id,
|
||||
required_bytes: requested_bytes,
|
||||
limit_bytes,
|
||||
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
OnExhaustedPolicy::Fail => {
|
||||
// Try to acquire, fail immediately if not available
|
||||
self.memory_manager
|
||||
.try_acquire(requested_bytes)
|
||||
.ok_or_else(|| {
|
||||
warn!(
|
||||
"Compaction memory exhausted for region {} (policy=fail, need {} bytes, limit {} bytes)",
|
||||
region_id, requested_bytes, limit_bytes
|
||||
);
|
||||
CompactionMemoryExhaustedSnafu {
|
||||
region_id,
|
||||
required_bytes: requested_bytes,
|
||||
limit_bytes,
|
||||
policy: "fail".to_string(),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
}
|
||||
}
|
||||
policy: format!("{policy:?}"),
|
||||
})
|
||||
}
|
||||
|
||||
/// Remove expired ssts files, update manifest immediately
|
||||
|
||||
@@ -1042,20 +1042,8 @@ pub enum Error {
|
||||
#[snafu(display("Manual compaction is override by following operations."))]
|
||||
ManualCompactionOverride {},
|
||||
|
||||
#[snafu(display(
|
||||
"Compaction memory limit exceeded for region {region_id}: required {required_bytes} bytes, limit {limit_bytes} bytes (policy: {policy})",
|
||||
))]
|
||||
#[snafu(display("Compaction memory exhausted for region {region_id} (policy: {policy})",))]
|
||||
CompactionMemoryExhausted {
|
||||
region_id: RegionId,
|
||||
required_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
policy: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to acquire memory for region {region_id} (policy: {policy})"))]
|
||||
MemoryAcquireFailed {
|
||||
region_id: RegionId,
|
||||
policy: String,
|
||||
#[snafu(source)]
|
||||
@@ -1359,9 +1347,7 @@ impl ErrorExt for Error {
|
||||
|
||||
ManualCompactionOverride {} => StatusCode::Cancelled,
|
||||
|
||||
CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
|
||||
MemoryAcquireFailed { source, .. } => source.status_code(),
|
||||
CompactionMemoryExhausted { source, .. } => source.status_code(),
|
||||
|
||||
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
|
||||
Reference in New Issue
Block a user