mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: move memory_manager to common crate (#7408)
* feat: move memory_manager to common crate Signed-off-by: jeremyhi <fengjiachun@gmail.com> * chore: add license header Signed-off-by: jeremyhi <fengjiachun@gmail.com> * fix: by AI comment Signed-off-by: jeremyhi <fengjiachun@gmail.com> --------- Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
14
Cargo.lock
generated
14
Cargo.lock
generated
@@ -2328,6 +2328,19 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-memory-manager"
|
||||
version = "1.0.0-beta.2"
|
||||
dependencies = [
|
||||
"common-error",
|
||||
"common-macro",
|
||||
"common-telemetry",
|
||||
"humantime",
|
||||
"serde",
|
||||
"snafu 0.8.6",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "common-meta"
|
||||
version = "1.0.0-beta.2"
|
||||
@@ -7651,6 +7664,7 @@ dependencies = [
|
||||
"common-function",
|
||||
"common-grpc",
|
||||
"common-macro",
|
||||
"common-memory-manager",
|
||||
"common-meta",
|
||||
"common-query",
|
||||
"common-recordbatch",
|
||||
|
||||
@@ -21,6 +21,7 @@ members = [
|
||||
"src/common/grpc-expr",
|
||||
"src/common/macro",
|
||||
"src/common/mem-prof",
|
||||
"src/common/memory-manager",
|
||||
"src/common/meta",
|
||||
"src/common/options",
|
||||
"src/common/plugins",
|
||||
@@ -266,6 +267,7 @@ common-grpc = { path = "src/common/grpc" }
|
||||
common-grpc-expr = { path = "src/common/grpc-expr" }
|
||||
common-macro = { path = "src/common/macro" }
|
||||
common-mem-prof = { path = "src/common/mem-prof" }
|
||||
common-memory-manager = { path = "src/common/memory-manager" }
|
||||
common-meta = { path = "src/common/meta" }
|
||||
common-options = { path = "src/common/options" }
|
||||
common-plugins = { path = "src/common/plugins" }
|
||||
|
||||
@@ -142,7 +142,7 @@
|
||||
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
|
||||
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
|
||||
| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
|
||||
| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "skip" |
|
||||
| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "fail" |
|
||||
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
|
||||
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
|
||||
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size`. |
|
||||
@@ -524,7 +524,7 @@
|
||||
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
|
||||
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
|
||||
| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
|
||||
| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "skip" |
|
||||
| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "fail" |
|
||||
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
|
||||
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it's default to 1/8 of OS memory with a max limitation of 1GB. |
|
||||
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it's default to 2 times of `global_write_buffer_size` |
|
||||
|
||||
@@ -457,7 +457,7 @@ compress_manifest = false
|
||||
#+ experimental_compaction_memory_limit = "0"
|
||||
|
||||
## Behavior when compaction cannot acquire memory from the budget.
|
||||
## Options: "wait" (default, 10s), "wait(<duration>)", "skip"
|
||||
## Options: "wait" (default, 10s), "wait(<duration>)", "fail"
|
||||
## @toml2docs:none-default="wait"
|
||||
#+ experimental_compaction_on_exhausted = "wait"
|
||||
|
||||
|
||||
@@ -551,7 +551,7 @@ compress_manifest = false
|
||||
#+ experimental_compaction_memory_limit = "0"
|
||||
|
||||
## Behavior when compaction cannot acquire memory from the budget.
|
||||
## Options: "wait" (default, 10s), "wait(<duration>)", "skip"
|
||||
## Options: "wait" (default, 10s), "wait(<duration>)", "fail"
|
||||
## @toml2docs:none-default="wait"
|
||||
#+ experimental_compaction_on_exhausted = "wait"
|
||||
|
||||
|
||||
20
src/common/memory-manager/Cargo.toml
Normal file
20
src/common/memory-manager/Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "common-memory-manager"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common-error = { workspace = true }
|
||||
common-macro = { workspace = true }
|
||||
common-telemetry = { workspace = true }
|
||||
humantime = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
snafu = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["rt", "macros"] }
|
||||
53
src/common/memory-manager/src/error.rs
Normal file
53
src/common/memory-manager/src/error.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use snafu::Snafu;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
#[derive(Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
#[stack_trace_debug]
|
||||
pub enum Error {
|
||||
#[snafu(display(
|
||||
"Memory limit exceeded: requested {requested_bytes} bytes, limit {limit_bytes} bytes"
|
||||
))]
|
||||
MemoryLimitExceeded {
|
||||
requested_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
},
|
||||
|
||||
#[snafu(display("Memory semaphore unexpectedly closed"))]
|
||||
MemorySemaphoreClosed,
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
use Error::*;
|
||||
|
||||
match self {
|
||||
MemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
MemorySemaphoreClosed => StatusCode::Unexpected,
|
||||
}
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
138
src/common/memory-manager/src/guard.rs
Normal file
138
src/common/memory-manager/src/guard.rs
Normal file
@@ -0,0 +1,138 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::{fmt, mem};
|
||||
|
||||
use common_telemetry::debug;
|
||||
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
|
||||
|
||||
use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes};
|
||||
|
||||
/// Guard representing a slice of reserved memory.
|
||||
pub struct MemoryGuard<M: MemoryMetrics> {
|
||||
pub(crate) state: GuardState<M>,
|
||||
}
|
||||
|
||||
pub(crate) enum GuardState<M: MemoryMetrics> {
|
||||
Unlimited,
|
||||
Limited {
|
||||
permit: OwnedSemaphorePermit,
|
||||
quota: MemoryQuota<M>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<M: MemoryMetrics> MemoryGuard<M> {
|
||||
pub(crate) fn unlimited() -> Self {
|
||||
Self {
|
||||
state: GuardState::Unlimited,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota<M>) -> Self {
|
||||
Self {
|
||||
state: GuardState::Limited { permit, quota },
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns granted quota in bytes.
|
||||
pub fn granted_bytes(&self) -> u64 {
|
||||
match &self.state {
|
||||
GuardState::Unlimited => 0,
|
||||
GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to allocate additional memory during task execution.
|
||||
///
|
||||
/// On success, merges the new memory into this guard and returns true.
|
||||
/// On failure, returns false and leaves this guard unchanged.
|
||||
pub fn request_additional(&mut self, bytes: u64) -> bool {
|
||||
match &mut self.state {
|
||||
GuardState::Unlimited => true,
|
||||
GuardState::Limited { permit, quota } => {
|
||||
if bytes == 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
let additional_permits = bytes_to_permits(bytes);
|
||||
|
||||
match quota
|
||||
.semaphore
|
||||
.clone()
|
||||
.try_acquire_many_owned(additional_permits)
|
||||
{
|
||||
Ok(additional_permit) => {
|
||||
permit.merge(additional_permit);
|
||||
quota.update_in_use_metric();
|
||||
debug!("Allocated additional {} bytes", bytes);
|
||||
true
|
||||
}
|
||||
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
|
||||
quota.metrics.inc_rejected("request_additional");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Releases a portion of granted memory back to the pool early,
|
||||
/// before the guard is dropped.
|
||||
///
|
||||
/// Returns true if the release succeeds or is a no-op; false if the request exceeds granted.
|
||||
pub fn early_release_partial(&mut self, bytes: u64) -> bool {
|
||||
match &mut self.state {
|
||||
GuardState::Unlimited => true,
|
||||
GuardState::Limited { permit, quota } => {
|
||||
if bytes == 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
let release_permits = bytes_to_permits(bytes);
|
||||
|
||||
match permit.split(release_permits as usize) {
|
||||
Some(released_permit) => {
|
||||
let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
|
||||
drop(released_permit);
|
||||
quota.update_in_use_metric();
|
||||
debug!("Early released {} bytes from memory guard", released_bytes);
|
||||
true
|
||||
}
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
|
||||
fn drop(&mut self) {
|
||||
if let GuardState::Limited { permit, quota } =
|
||||
mem::replace(&mut self.state, GuardState::Unlimited)
|
||||
{
|
||||
let bytes = permits_to_bytes(permit.num_permits() as u32);
|
||||
drop(permit);
|
||||
quota.update_in_use_metric();
|
||||
debug!("Released memory: {} bytes", bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: MemoryMetrics> fmt::Debug for MemoryGuard<M> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("MemoryGuard")
|
||||
.field("granted_bytes", &self.granted_bytes())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
47
src/common/memory-manager/src/lib.rs
Normal file
47
src/common/memory-manager/src/lib.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Generic memory management for resource-constrained operations.
|
||||
//!
|
||||
//! This crate provides a reusable memory quota system based on semaphores,
|
||||
//! allowing different subsystems (compaction, flush, index build, etc.) to
|
||||
//! share the same allocation logic while using their own metrics.
|
||||
|
||||
mod error;
|
||||
mod guard;
|
||||
mod manager;
|
||||
mod policy;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub use error::{Error, Result};
|
||||
pub use guard::MemoryGuard;
|
||||
pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES};
|
||||
pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy};
|
||||
|
||||
/// No-op metrics implementation for testing.
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
pub struct NoOpMetrics;
|
||||
|
||||
impl MemoryMetrics for NoOpMetrics {
|
||||
#[inline(always)]
|
||||
fn set_limit(&self, _: i64) {}
|
||||
|
||||
#[inline(always)]
|
||||
fn set_in_use(&self, _: i64) {}
|
||||
|
||||
#[inline(always)]
|
||||
fn inc_rejected(&self, _: &str) {}
|
||||
}
|
||||
173
src/common/memory-manager/src/manager.rs
Normal file
173
src/common/memory-manager/src/manager.rs
Normal file
@@ -0,0 +1,173 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use snafu::ensure;
|
||||
use tokio::sync::{Semaphore, TryAcquireError};
|
||||
|
||||
use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result};
|
||||
use crate::guard::MemoryGuard;
|
||||
|
||||
/// Minimum bytes controlled by one semaphore permit.
|
||||
pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
|
||||
|
||||
/// Trait for recording memory usage metrics.
|
||||
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
|
||||
fn set_limit(&self, bytes: i64);
|
||||
fn set_in_use(&self, bytes: i64);
|
||||
fn inc_rejected(&self, reason: &str);
|
||||
}
|
||||
|
||||
/// Generic memory manager for quota-controlled operations.
|
||||
#[derive(Clone)]
|
||||
pub struct MemoryManager<M: MemoryMetrics> {
|
||||
quota: Option<MemoryQuota<M>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct MemoryQuota<M: MemoryMetrics> {
|
||||
pub(crate) semaphore: Arc<Semaphore>,
|
||||
pub(crate) limit_permits: u32,
|
||||
pub(crate) metrics: M,
|
||||
}
|
||||
|
||||
impl<M: MemoryMetrics> MemoryManager<M> {
|
||||
/// Creates a new memory manager with the given limit in bytes.
|
||||
/// `limit_bytes = 0` disables the limit.
|
||||
pub fn new(limit_bytes: u64, metrics: M) -> Self {
|
||||
if limit_bytes == 0 {
|
||||
metrics.set_limit(0);
|
||||
return Self { quota: None };
|
||||
}
|
||||
|
||||
let limit_permits = bytes_to_permits(limit_bytes);
|
||||
let limit_aligned_bytes = permits_to_bytes(limit_permits);
|
||||
metrics.set_limit(limit_aligned_bytes as i64);
|
||||
|
||||
Self {
|
||||
quota: Some(MemoryQuota {
|
||||
semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
|
||||
limit_permits,
|
||||
metrics,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the configured limit in bytes (0 if unlimited).
|
||||
pub fn limit_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.limit_permits))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Returns currently used bytes.
|
||||
pub fn used_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.used_permits()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Returns available bytes.
|
||||
pub fn available_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.available_permits_clamped()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Acquires memory, waiting if necessary until enough is available.
|
||||
///
|
||||
/// # Errors
|
||||
/// - Returns error if requested bytes exceed the total limit
|
||||
/// - Returns error if the semaphore is unexpectedly closed
|
||||
pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<M>> {
|
||||
match &self.quota {
|
||||
None => Ok(MemoryGuard::unlimited()),
|
||||
Some(quota) => {
|
||||
let permits = bytes_to_permits(bytes);
|
||||
|
||||
ensure!(
|
||||
permits <= quota.limit_permits,
|
||||
MemoryLimitExceededSnafu {
|
||||
requested_bytes: bytes,
|
||||
limit_bytes: permits_to_bytes(quota.limit_permits),
|
||||
}
|
||||
);
|
||||
|
||||
let permit = quota
|
||||
.semaphore
|
||||
.clone()
|
||||
.acquire_many_owned(permits)
|
||||
.await
|
||||
.map_err(|_| MemorySemaphoreClosedSnafu.build())?;
|
||||
quota.update_in_use_metric();
|
||||
Ok(MemoryGuard::limited(permit, quota.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
|
||||
pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<M>> {
|
||||
match &self.quota {
|
||||
None => Some(MemoryGuard::unlimited()),
|
||||
Some(quota) => {
|
||||
let permits = bytes_to_permits(bytes);
|
||||
|
||||
match quota.semaphore.clone().try_acquire_many_owned(permits) {
|
||||
Ok(permit) => {
|
||||
quota.update_in_use_metric();
|
||||
Some(MemoryGuard::limited(permit, quota.clone()))
|
||||
}
|
||||
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
|
||||
quota.metrics.inc_rejected("try_acquire");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M: MemoryMetrics> MemoryQuota<M> {
|
||||
pub(crate) fn used_permits(&self) -> u32 {
|
||||
self.limit_permits
|
||||
.saturating_sub(self.available_permits_clamped())
|
||||
}
|
||||
|
||||
pub(crate) fn available_permits_clamped(&self) -> u32 {
|
||||
self.semaphore
|
||||
.available_permits()
|
||||
.min(self.limit_permits as usize) as u32
|
||||
}
|
||||
|
||||
pub(crate) fn update_in_use_metric(&self) {
|
||||
let bytes = permits_to_bytes(self.used_permits());
|
||||
self.metrics.set_in_use(bytes as i64);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn bytes_to_permits(bytes: u64) -> u32 {
|
||||
bytes
|
||||
.saturating_add(PERMIT_GRANULARITY_BYTES - 1)
|
||||
.saturating_div(PERMIT_GRANULARITY_BYTES)
|
||||
.min(Semaphore::MAX_PERMITS as u64)
|
||||
.min(u32::MAX as u64) as u32
|
||||
}
|
||||
|
||||
pub(crate) fn permits_to_bytes(permits: u32) -> u64 {
|
||||
(permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
|
||||
}
|
||||
83
src/common/memory-manager/src/policy.rs
Normal file
83
src/common/memory-manager/src/policy.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use humantime::{format_duration, parse_duration};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Default wait timeout for memory acquisition.
|
||||
pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Defines how to react when memory cannot be acquired immediately.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum OnExhaustedPolicy {
|
||||
/// Wait until enough memory is released, bounded by timeout.
|
||||
Wait { timeout: Duration },
|
||||
|
||||
/// Fail immediately if memory is not available.
|
||||
Fail,
|
||||
}
|
||||
|
||||
impl Default for OnExhaustedPolicy {
|
||||
fn default() -> Self {
|
||||
OnExhaustedPolicy::Wait {
|
||||
timeout: DEFAULT_MEMORY_WAIT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for OnExhaustedPolicy {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let text = match self {
|
||||
OnExhaustedPolicy::Fail => "fail".to_string(),
|
||||
OnExhaustedPolicy::Wait { timeout } if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT => {
|
||||
"wait".to_string()
|
||||
}
|
||||
OnExhaustedPolicy::Wait { timeout } => format!("wait({})", format_duration(*timeout)),
|
||||
};
|
||||
serializer.serialize_str(&text)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for OnExhaustedPolicy {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let raw = String::deserialize(deserializer)?;
|
||||
let lower = raw.to_ascii_lowercase();
|
||||
|
||||
// Accept both "skip" (legacy) and "fail".
|
||||
if lower == "skip" || lower == "fail" {
|
||||
return Ok(OnExhaustedPolicy::Fail);
|
||||
}
|
||||
if lower == "wait" {
|
||||
return Ok(OnExhaustedPolicy::default());
|
||||
}
|
||||
if lower.starts_with("wait(") && lower.ends_with(')') {
|
||||
let inner = &raw[5..raw.len() - 1];
|
||||
let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?;
|
||||
return Ok(OnExhaustedPolicy::Wait { timeout });
|
||||
}
|
||||
|
||||
Err(serde::de::Error::custom(format!(
|
||||
"invalid memory policy: {}, expected wait, wait(<duration>), fail",
|
||||
raw
|
||||
)))
|
||||
}
|
||||
}
|
||||
247
src/common/memory-manager/src/tests.rs
Normal file
247
src/common/memory-manager/src/tests.rs
Normal file
@@ -0,0 +1,247 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use tokio::time::{Duration, sleep};
|
||||
|
||||
use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES};
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_unlimited() {
|
||||
let manager = MemoryManager::new(0, NoOpMetrics);
|
||||
let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(manager.limit_bytes(), 0);
|
||||
assert_eq!(guard.granted_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_limited_success_and_release() {
|
||||
let bytes = 2 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = MemoryManager::new(bytes, NoOpMetrics);
|
||||
{
|
||||
let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES);
|
||||
drop(guard);
|
||||
}
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_exceeds_limit() {
|
||||
let limit = PERMIT_GRANULARITY_BYTES;
|
||||
let manager = MemoryManager::new(limit, NoOpMetrics);
|
||||
let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES);
|
||||
assert!(result.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn test_acquire_blocks_and_unblocks() {
|
||||
let bytes = 2 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = MemoryManager::new(bytes, NoOpMetrics);
|
||||
let guard = manager.try_acquire(bytes).unwrap();
|
||||
|
||||
// Spawn a task that will block on acquire()
|
||||
let waiter = {
|
||||
let manager = manager.clone();
|
||||
tokio::spawn(async move {
|
||||
// This will block until memory is available
|
||||
let _guard = manager.acquire(bytes).await.unwrap();
|
||||
})
|
||||
};
|
||||
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
// Release memory - this should unblock the waiter
|
||||
drop(guard);
|
||||
|
||||
// Waiter should complete now
|
||||
waiter.await.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_success() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
|
||||
let manager = MemoryManager::new(limit, NoOpMetrics);
|
||||
|
||||
// Acquire base quota (5MB)
|
||||
let base = 5 * PERMIT_GRANULARITY_BYTES;
|
||||
let mut guard = manager.try_acquire(base).unwrap();
|
||||
assert_eq!(guard.granted_bytes(), base);
|
||||
assert_eq!(manager.used_bytes(), base);
|
||||
|
||||
// Request additional memory (3MB) - should succeed and merge
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_exceeds_limit() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
|
||||
let manager = MemoryManager::new(limit, NoOpMetrics);
|
||||
|
||||
// Acquire base quota (5MB)
|
||||
let base = 5 * PERMIT_GRANULARITY_BYTES;
|
||||
let mut guard = manager.try_acquire(base).unwrap();
|
||||
|
||||
// Request additional memory (3MB) - should succeed
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Request more (3MB) - should fail (would exceed 10MB limit)
|
||||
let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES);
|
||||
assert!(!result);
|
||||
|
||||
// Still at 8MB
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_auto_release_on_guard_drop() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = MemoryManager::new(limit, NoOpMetrics);
|
||||
|
||||
{
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Request additional - memory is merged into guard
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// When guard drops, all memory (base + additional) is released together
|
||||
}
|
||||
|
||||
// After scope, all memory should be released
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_unlimited() {
|
||||
let manager = MemoryManager::new(0, NoOpMetrics); // Unlimited
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Should always succeed with unlimited manager
|
||||
assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 0);
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_zero_bytes() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = MemoryManager::new(limit, NoOpMetrics);
|
||||
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Request 0 bytes should succeed without affecting anything
|
||||
assert!(guard.request_additional(0));
|
||||
assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_early_release_partial_success() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = MemoryManager::new(limit, NoOpMetrics);
|
||||
|
||||
let mut guard = manager.try_acquire(8 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Release half
|
||||
assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Released memory should be available to others
|
||||
let _guard2 = manager.try_acquire(4 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_early_release_partial_exceeds_granted() {
|
||||
let manager = MemoryManager::new(10 * PERMIT_GRANULARITY_BYTES, NoOpMetrics);
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Try to release more than granted - should fail
|
||||
assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_early_release_partial_unlimited() {
|
||||
let manager = MemoryManager::new(0, NoOpMetrics);
|
||||
let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Unlimited guard - release should succeed (no-op)
|
||||
assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_and_early_release_symmetry() {
|
||||
let limit = 20 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = MemoryManager::new(limit, NoOpMetrics);
|
||||
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Request additional
|
||||
assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Early release some
|
||||
assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Request again
|
||||
assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Early release again
|
||||
assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
drop(guard);
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_small_allocation_rounds_up() {
|
||||
// Test that allocations smaller than PERMIT_GRANULARITY_BYTES
|
||||
// round up to 1 permit and can use request_additional()
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = MemoryManager::new(limit, NoOpMetrics);
|
||||
|
||||
let mut guard = manager.try_acquire(512 * 1024).unwrap(); // 512KB
|
||||
assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); // Rounds up to 1MB
|
||||
assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); // Can request more
|
||||
assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acquire_zero_bytes_lazy_allocation() {
|
||||
// Test that acquire(0) returns 0 permits but can request_additional() later
|
||||
let manager = MemoryManager::new(10 * PERMIT_GRANULARITY_BYTES, NoOpMetrics);
|
||||
|
||||
let mut guard = manager.try_acquire(0).unwrap();
|
||||
assert_eq!(guard.granted_bytes(), 0); // No permits consumed
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); // Lazy allocation
|
||||
assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
@@ -30,6 +30,7 @@ common-error.workspace = true
|
||||
common-grpc.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
common-memory-manager.workspace = true
|
||||
common-query.workspace = true
|
||||
common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
|
||||
@@ -30,6 +30,7 @@ use std::time::Instant;
|
||||
use api::v1::region::compact_request;
|
||||
use api::v1::region::compact_request::Options;
|
||||
use common_base::Plugins;
|
||||
use common_memory_manager::OnExhaustedPolicy;
|
||||
use common_meta::key::SchemaMetadataManagerRef;
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
use common_time::range::TimestampRange;
|
||||
@@ -47,7 +48,7 @@ use tokio::sync::mpsc::{self, Sender};
|
||||
use crate::access_layer::AccessLayerRef;
|
||||
use crate::cache::{CacheManagerRef, CacheStrategy};
|
||||
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
|
||||
use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy};
|
||||
use crate::compaction::memory_manager::CompactionMemoryManager;
|
||||
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
|
||||
use crate::compaction::task::CompactionTaskImpl;
|
||||
use crate::config::MitoConfig;
|
||||
@@ -809,6 +810,7 @@ mod tests {
|
||||
use tokio::sync::{Barrier, oneshot};
|
||||
|
||||
use super::*;
|
||||
use crate::compaction::memory_manager::{CompactionMemoryGuard, new_compaction_memory_manager};
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
use crate::region::ManifestContext;
|
||||
use crate::sst::FormatType;
|
||||
@@ -1181,7 +1183,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_concurrent_memory_competition() {
|
||||
let manager = Arc::new(CompactionMemoryManager::new(3 * 1024 * 1024)); // 3MB
|
||||
let manager = Arc::new(new_compaction_memory_manager(3 * 1024 * 1024)); // 3MB
|
||||
let barrier = Arc::new(Barrier::new(3));
|
||||
let mut handles = vec![];
|
||||
|
||||
@@ -1196,7 +1198,7 @@ mod tests {
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
let results: Vec<_> = futures::future::join_all(handles)
|
||||
let results: Vec<Option<CompactionMemoryGuard>> = futures::future::join_all(handles)
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|r| r.unwrap())
|
||||
|
||||
@@ -12,627 +12,39 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{fmt, mem};
|
||||
use common_memory_manager::{MemoryGuard, MemoryManager, MemoryMetrics};
|
||||
|
||||
use common_telemetry::debug;
|
||||
use humantime::{format_duration, parse_duration};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
|
||||
|
||||
use crate::error::{
|
||||
CompactionMemoryLimitExceededSnafu, CompactionMemorySemaphoreClosedSnafu, Result,
|
||||
};
|
||||
use crate::metrics::{
|
||||
COMPACTION_MEMORY_IN_USE, COMPACTION_MEMORY_LIMIT, COMPACTION_MEMORY_REJECTED,
|
||||
};
|
||||
|
||||
/// Minimum bytes controlled by one semaphore permit.
|
||||
const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
|
||||
/// Compaction-specific memory metrics implementation.
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
pub struct CompactionMemoryMetrics;
|
||||
|
||||
/// Default wait timeout for compaction memory.
|
||||
pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Defines how to react when compaction cannot acquire enough memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum OnExhaustedPolicy {
|
||||
/// Wait until enough memory is released, bounded by timeout.
|
||||
Wait { timeout: Duration },
|
||||
/// Skip the compaction if memory is not immediately available.
|
||||
Skip,
|
||||
impl MemoryMetrics for CompactionMemoryMetrics {
|
||||
fn set_limit(&self, bytes: i64) {
|
||||
COMPACTION_MEMORY_LIMIT.set(bytes);
|
||||
}
|
||||
|
||||
impl Default for OnExhaustedPolicy {
|
||||
fn default() -> Self {
|
||||
OnExhaustedPolicy::Wait {
|
||||
timeout: DEFAULT_MEMORY_WAIT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
fn set_in_use(&self, bytes: i64) {
|
||||
COMPACTION_MEMORY_IN_USE.set(bytes);
|
||||
}
|
||||
|
||||
impl Serialize for OnExhaustedPolicy {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let text = match self {
|
||||
OnExhaustedPolicy::Skip => "skip".to_string(),
|
||||
OnExhaustedPolicy::Wait { timeout } if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT => {
|
||||
"wait".to_string()
|
||||
}
|
||||
OnExhaustedPolicy::Wait { timeout } => {
|
||||
format!("wait({})", format_duration(*timeout))
|
||||
}
|
||||
};
|
||||
serializer.serialize_str(&text)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for OnExhaustedPolicy {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let raw = String::deserialize(deserializer)?;
|
||||
let lower = raw.to_ascii_lowercase();
|
||||
|
||||
if lower == "skip" {
|
||||
return Ok(OnExhaustedPolicy::Skip);
|
||||
}
|
||||
if lower == "wait" {
|
||||
return Ok(OnExhaustedPolicy::default());
|
||||
}
|
||||
if lower.starts_with("wait(") && lower.ends_with(')') {
|
||||
let inner = &raw[5..raw.len() - 1];
|
||||
let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?;
|
||||
return Ok(OnExhaustedPolicy::Wait { timeout });
|
||||
}
|
||||
|
||||
Err(serde::de::Error::custom(format!(
|
||||
"invalid compaction memory policy: {}, expected wait, wait(<duration>) or skip",
|
||||
raw
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Global memory manager for compaction tasks.
|
||||
#[derive(Clone)]
|
||||
pub struct CompactionMemoryManager {
|
||||
quota: Option<MemoryQuota>,
|
||||
}
|
||||
|
||||
/// Shared memory quota state across all compaction guards.
|
||||
#[derive(Clone)]
|
||||
struct MemoryQuota {
|
||||
semaphore: Arc<Semaphore>,
|
||||
// Maximum permits (aligned to PERMIT_GRANULARITY_BYTES).
|
||||
limit_permits: u32,
|
||||
}
|
||||
|
||||
impl CompactionMemoryManager {
|
||||
/// Creates a new memory manager with the given limit in bytes.
|
||||
/// `limit_bytes = 0` disables the limit.
|
||||
pub fn new(limit_bytes: u64) -> Self {
|
||||
if limit_bytes == 0 {
|
||||
COMPACTION_MEMORY_LIMIT.set(0);
|
||||
return Self { quota: None };
|
||||
}
|
||||
|
||||
let limit_permits = bytes_to_permits(limit_bytes);
|
||||
let limit_aligned_bytes = permits_to_bytes(limit_permits);
|
||||
COMPACTION_MEMORY_LIMIT.set(limit_aligned_bytes as i64);
|
||||
|
||||
Self {
|
||||
quota: Some(MemoryQuota {
|
||||
semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
|
||||
limit_permits,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the configured limit in bytes (0 if unlimited).
|
||||
pub fn limit_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.limit_permits))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Returns currently used bytes.
|
||||
pub fn used_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.used_permits()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Returns available bytes.
|
||||
pub fn available_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.available_permits_clamped()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Acquires memory, waiting if necessary until enough is available.
|
||||
///
|
||||
/// # Errors
|
||||
/// - Returns error if requested bytes exceed the total limit
|
||||
/// - Returns error if the semaphore is unexpectedly closed
|
||||
pub async fn acquire(&self, bytes: u64) -> Result<CompactionMemoryGuard> {
|
||||
match &self.quota {
|
||||
None => Ok(CompactionMemoryGuard::unlimited()),
|
||||
Some(quota) => {
|
||||
let permits = bytes_to_permits(bytes);
|
||||
|
||||
// Fail-fast: reject if request exceeds total limit.
|
||||
ensure!(
|
||||
permits <= quota.limit_permits,
|
||||
CompactionMemoryLimitExceededSnafu {
|
||||
requested_bytes: bytes,
|
||||
limit_bytes: permits_to_bytes(quota.limit_permits),
|
||||
}
|
||||
);
|
||||
|
||||
let permit = quota
|
||||
.semaphore
|
||||
.clone()
|
||||
.acquire_many_owned(permits)
|
||||
.await
|
||||
.map_err(|_| CompactionMemorySemaphoreClosedSnafu.build())?;
|
||||
quota.update_in_use_metric();
|
||||
Ok(CompactionMemoryGuard::limited(permit, quota.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
|
||||
pub fn try_acquire(&self, bytes: u64) -> Option<CompactionMemoryGuard> {
|
||||
match &self.quota {
|
||||
None => Some(CompactionMemoryGuard::unlimited()),
|
||||
Some(quota) => {
|
||||
let permits = bytes_to_permits(bytes);
|
||||
|
||||
match quota.semaphore.clone().try_acquire_many_owned(permits) {
|
||||
Ok(permit) => {
|
||||
quota.update_in_use_metric();
|
||||
Some(CompactionMemoryGuard::limited(permit, quota.clone()))
|
||||
}
|
||||
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
|
||||
fn inc_rejected(&self, reason: &str) {
|
||||
COMPACTION_MEMORY_REJECTED
|
||||
.with_label_values(&["try_acquire"])
|
||||
.with_label_values(&[reason])
|
||||
.inc();
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MemoryQuota {
|
||||
fn used_permits(&self) -> u32 {
|
||||
self.limit_permits
|
||||
.saturating_sub(self.available_permits_clamped())
|
||||
}
|
||||
|
||||
fn available_permits_clamped(&self) -> u32 {
|
||||
// Clamp to limit_permits to ensure we never report more available permits
|
||||
// than our configured limit, even if semaphore state becomes inconsistent.
|
||||
self.semaphore
|
||||
.available_permits()
|
||||
.min(self.limit_permits as usize) as u32
|
||||
}
|
||||
|
||||
fn update_in_use_metric(&self) {
|
||||
let bytes = permits_to_bytes(self.used_permits());
|
||||
COMPACTION_MEMORY_IN_USE.set(bytes as i64);
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard representing a slice of reserved compaction memory.
|
||||
///
|
||||
/// Memory is automatically released when this guard is dropped.
|
||||
pub struct CompactionMemoryGuard {
|
||||
state: GuardState,
|
||||
}
|
||||
|
||||
enum GuardState {
|
||||
Unlimited,
|
||||
Limited {
|
||||
// Holds all permits owned by this guard (base plus any additional).
|
||||
// Additional requests merge into this permit and are released together on drop.
|
||||
permit: OwnedSemaphorePermit,
|
||||
// Memory quota for requesting additional permits and updating metrics.
|
||||
quota: MemoryQuota,
|
||||
},
|
||||
}
|
||||
|
||||
impl CompactionMemoryGuard {
|
||||
fn unlimited() -> Self {
|
||||
Self {
|
||||
state: GuardState::Unlimited,
|
||||
}
|
||||
}
|
||||
|
||||
fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota) -> Self {
|
||||
Self {
|
||||
state: GuardState::Limited { permit, quota },
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns granted quota in bytes.
|
||||
pub fn granted_bytes(&self) -> u64 {
|
||||
match &self.state {
|
||||
GuardState::Unlimited => 0,
|
||||
GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to allocate additional memory during task execution.
|
||||
///
|
||||
/// On success, merges the new memory into this guard and returns true.
|
||||
/// On failure, returns false and leaves this guard unchanged.
|
||||
///
|
||||
/// # Behavior
|
||||
/// - Running tasks can request additional memory on top of their initial allocation
|
||||
/// - If total memory (all tasks) would exceed limit, returns false immediately
|
||||
/// - The task should gracefully handle false by failing or adjusting its strategy
|
||||
/// - The additional memory is merged into this guard and released together on drop
|
||||
pub fn request_additional(&mut self, bytes: u64) -> bool {
|
||||
match &mut self.state {
|
||||
GuardState::Unlimited => true,
|
||||
GuardState::Limited { permit, quota } => {
|
||||
// Early return for zero-byte requests (no-op)
|
||||
if bytes == 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
let additional_permits = bytes_to_permits(bytes);
|
||||
|
||||
// Try to acquire additional permits from the quota
|
||||
match quota
|
||||
.semaphore
|
||||
.clone()
|
||||
.try_acquire_many_owned(additional_permits)
|
||||
{
|
||||
Ok(additional_permit) => {
|
||||
// Merge into main permit
|
||||
permit.merge(additional_permit);
|
||||
quota.update_in_use_metric();
|
||||
|
||||
debug!("Allocated additional {} bytes", bytes);
|
||||
|
||||
true
|
||||
}
|
||||
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
|
||||
COMPACTION_MEMORY_REJECTED
|
||||
.with_label_values(&["request_additional"])
|
||||
.inc();
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Releases a portion of granted memory back to the pool early,
|
||||
/// before the guard is dropped.
|
||||
///
|
||||
/// This is useful when a task's memory requirement decreases during execution
|
||||
/// (e.g., after completing a memory-intensive phase). The guard remains valid
|
||||
/// with reduced quota, and the task can continue running.
|
||||
pub fn early_release_partial(&mut self, bytes: u64) -> bool {
|
||||
match &mut self.state {
|
||||
GuardState::Unlimited => true,
|
||||
GuardState::Limited { permit, quota } => {
|
||||
// Early return for zero-byte requests (no-op)
|
||||
if bytes == 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
let release_permits = bytes_to_permits(bytes);
|
||||
|
||||
// Split out the permits we want to release
|
||||
match permit.split(release_permits as usize) {
|
||||
Some(released_permit) => {
|
||||
let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
|
||||
|
||||
// Drop the split permit to return it to the quota
|
||||
drop(released_permit);
|
||||
quota.update_in_use_metric();
|
||||
|
||||
debug!(
|
||||
"Early released {} bytes from compaction memory guard",
|
||||
released_bytes
|
||||
);
|
||||
|
||||
true
|
||||
}
|
||||
None => {
|
||||
// Requested release exceeds granted amount
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CompactionMemoryGuard {
|
||||
fn drop(&mut self) {
|
||||
if let GuardState::Limited { permit, quota } =
|
||||
mem::replace(&mut self.state, GuardState::Unlimited)
|
||||
{
|
||||
let bytes = permits_to_bytes(permit.num_permits() as u32);
|
||||
|
||||
// Release permits before updating metrics to reflect latest usage.
|
||||
drop(permit);
|
||||
quota.update_in_use_metric();
|
||||
|
||||
debug!("Released compaction memory: {} bytes", bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for CompactionMemoryGuard {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("CompactionMemoryGuard")
|
||||
.field("granted_bytes", &self.granted_bytes())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
fn bytes_to_permits(bytes: u64) -> u32 {
|
||||
// Round up to the nearest permit.
|
||||
// Returns 0 for bytes=0, which allows lazy allocation via request_additional().
|
||||
// Non-zero bytes always round up to at least 1 permit due to the math.
|
||||
bytes
|
||||
.saturating_add(PERMIT_GRANULARITY_BYTES - 1)
|
||||
.saturating_div(PERMIT_GRANULARITY_BYTES)
|
||||
.min(Semaphore::MAX_PERMITS as u64)
|
||||
.min(u32::MAX as u64) as u32
|
||||
}
|
||||
|
||||
fn permits_to_bytes(permits: u32) -> u64 {
|
||||
(permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tokio::time::{Duration, sleep};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_unlimited() {
|
||||
let manager = CompactionMemoryManager::new(0);
|
||||
let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(manager.limit_bytes(), 0);
|
||||
assert_eq!(guard.granted_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_limited_success_and_release() {
|
||||
let bytes = 2 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(bytes);
|
||||
{
|
||||
let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES);
|
||||
drop(guard);
|
||||
}
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_exceeds_limit() {
|
||||
let limit = PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES);
|
||||
assert!(result.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn test_acquire_blocks_and_unblocks() {
|
||||
let bytes = 2 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(bytes);
|
||||
let guard = manager.try_acquire(bytes).unwrap();
|
||||
|
||||
// Spawn a task that will block on acquire()
|
||||
let waiter = {
|
||||
let manager = manager.clone();
|
||||
tokio::spawn(async move {
|
||||
// This will block until memory is available
|
||||
let _guard = manager.acquire(bytes).await.unwrap();
|
||||
})
|
||||
};
|
||||
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
// Release memory - this should unblock the waiter
|
||||
drop(guard);
|
||||
|
||||
// Waiter should complete now
|
||||
waiter.await.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_success() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
// Acquire base quota (5MB)
|
||||
let base = 5 * PERMIT_GRANULARITY_BYTES;
|
||||
let mut guard = manager.try_acquire(base).unwrap();
|
||||
assert_eq!(guard.granted_bytes(), base);
|
||||
assert_eq!(manager.used_bytes(), base);
|
||||
|
||||
// Request additional memory (3MB) - should succeed and merge
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_exceeds_limit() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
// Acquire base quota (5MB)
|
||||
let base = 5 * PERMIT_GRANULARITY_BYTES;
|
||||
let mut guard = manager.try_acquire(base).unwrap();
|
||||
|
||||
// Request additional memory (3MB) - should succeed
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Request more (3MB) - should fail (would exceed 10MB limit)
|
||||
let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES);
|
||||
assert!(!result);
|
||||
|
||||
// Still at 8MB
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_auto_release_on_guard_drop() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
{
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Request additional - memory is merged into guard
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// When guard drops, all memory (base + additional) is released together
|
||||
}
|
||||
|
||||
// After scope, all memory should be released
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_unlimited() {
|
||||
let manager = CompactionMemoryManager::new(0); // Unlimited
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Should always succeed with unlimited manager
|
||||
assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 0);
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_zero_bytes() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Request 0 bytes should succeed without affecting anything
|
||||
assert!(guard.request_additional(0));
|
||||
assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_early_release_partial_success() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
let mut guard = manager.try_acquire(8 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Release half
|
||||
assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 4 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Released memory should be available to others
|
||||
let _guard2 = manager.try_acquire(4 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_early_release_partial_exceeds_granted() {
|
||||
let manager = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES);
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Try to release more than granted - should fail
|
||||
assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_early_release_partial_unlimited() {
|
||||
let manager = CompactionMemoryManager::new(0);
|
||||
let mut guard = manager.try_acquire(100 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Unlimited guard - release should succeed (no-op)
|
||||
assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_and_early_release_symmetry() {
|
||||
let limit = 20 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Request additional
|
||||
assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 10 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Early release some
|
||||
assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 7 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Request again
|
||||
assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 9 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Early release again
|
||||
assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
drop(guard);
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_small_allocation_rounds_up() {
|
||||
// Test that allocations smaller than PERMIT_GRANULARITY_BYTES
|
||||
// round up to 1 permit and can use request_additional()
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
let mut guard = manager.try_acquire(512 * 1024).unwrap(); // 512KB
|
||||
assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES); // Rounds up to 1MB
|
||||
assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES)); // Can request more
|
||||
assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_acquire_zero_bytes_lazy_allocation() {
|
||||
// Test that acquire(0) returns 0 permits but can request_additional() later
|
||||
let manager = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
let mut guard = manager.try_acquire(0).unwrap();
|
||||
assert_eq!(guard.granted_bytes(), 0); // No permits consumed
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES)); // Lazy allocation
|
||||
assert_eq!(guard.granted_bytes(), 3 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
/// Compaction memory manager.
|
||||
pub type CompactionMemoryManager = MemoryManager<CompactionMemoryMetrics>;
|
||||
|
||||
/// Compaction memory guard.
|
||||
pub type CompactionMemoryGuard = MemoryGuard<CompactionMemoryMetrics>;
|
||||
|
||||
/// Helper to construct a compaction memory manager without passing metrics explicitly.
|
||||
pub fn new_compaction_memory_manager(limit_bytes: u64) -> CompactionMemoryManager {
|
||||
CompactionMemoryManager::new(limit_bytes, CompactionMemoryMetrics)
|
||||
}
|
||||
|
||||
@@ -16,17 +16,16 @@ use std::fmt::{Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_memory_manager::OnExhaustedPolicy;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use itertools::Itertools;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::compaction::compactor::{CompactionRegion, Compactor};
|
||||
use crate::compaction::memory_manager::{
|
||||
CompactionMemoryGuard, CompactionMemoryManager, OnExhaustedPolicy,
|
||||
};
|
||||
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
|
||||
use crate::compaction::picker::{CompactionTask, PickerOutput};
|
||||
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
|
||||
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu, MemoryAcquireFailedSnafu};
|
||||
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
|
||||
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
|
||||
use crate::region::RegionRoleState;
|
||||
@@ -130,7 +129,10 @@ impl CompactionTaskImpl {
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
timer.observe_duration();
|
||||
Err(e)
|
||||
Err(e).with_context(|_| MemoryAcquireFailedSnafu {
|
||||
region_id,
|
||||
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
|
||||
})
|
||||
}
|
||||
Err(_) => {
|
||||
timer.observe_duration();
|
||||
@@ -148,21 +150,20 @@ impl CompactionTaskImpl {
|
||||
}
|
||||
}
|
||||
}
|
||||
OnExhaustedPolicy::Skip => {
|
||||
// Try to acquire, skip if not immediately available
|
||||
OnExhaustedPolicy::Fail => {
|
||||
// Try to acquire, fail immediately if not available
|
||||
self.memory_manager
|
||||
.try_acquire(requested_bytes)
|
||||
.ok_or_else(|| {
|
||||
warn!(
|
||||
"Skipping compaction for region {} due to memory limit \
|
||||
(need {} bytes, limit {} bytes)",
|
||||
"Compaction memory exhausted for region {} (policy=fail, need {} bytes, limit {} bytes)",
|
||||
region_id, requested_bytes, limit_bytes
|
||||
);
|
||||
CompactionMemoryExhaustedSnafu {
|
||||
region_id,
|
||||
required_bytes: requested_bytes,
|
||||
limit_bytes,
|
||||
policy: "skip".to_string(),
|
||||
policy: "fail".to_string(),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
|
||||
@@ -20,13 +20,13 @@ use std::time::Duration;
|
||||
|
||||
use common_base::memory_limit::MemoryLimit;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_memory_manager::OnExhaustedPolicy;
|
||||
use common_stat::{get_total_cpu_cores, get_total_memory_readable};
|
||||
use common_telemetry::warn;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
|
||||
use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
|
||||
use crate::compaction::memory_manager::OnExhaustedPolicy;
|
||||
use crate::error::Result;
|
||||
use crate::gc::GcConfig;
|
||||
use crate::memtable::MemtableConfig;
|
||||
|
||||
@@ -19,6 +19,7 @@ use common_datasource::compression::CompressionType;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_macro::stack_trace_debug;
|
||||
use common_memory_manager;
|
||||
use common_runtime::JoinError;
|
||||
use common_time::Timestamp;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
@@ -1042,11 +1043,7 @@ pub enum Error {
|
||||
ManualCompactionOverride {},
|
||||
|
||||
#[snafu(display(
|
||||
"Compaction memory limit exceeded for region {}: required {} bytes, limit {} bytes (policy: {})",
|
||||
region_id,
|
||||
required_bytes,
|
||||
limit_bytes,
|
||||
policy
|
||||
"Compaction memory limit exceeded for region {region_id}: required {required_bytes} bytes, limit {limit_bytes} bytes (policy: {policy})",
|
||||
))]
|
||||
CompactionMemoryExhausted {
|
||||
region_id: RegionId,
|
||||
@@ -1057,20 +1054,12 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Requested compaction memory ({} bytes) exceeds total limit ({} bytes)",
|
||||
requested_bytes,
|
||||
limit_bytes
|
||||
))]
|
||||
CompactionMemoryLimitExceeded {
|
||||
requested_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Compaction memory semaphore unexpectedly closed"))]
|
||||
CompactionMemorySemaphoreClosed {
|
||||
#[snafu(display("Failed to acquire memory for region {region_id} (policy: {policy})"))]
|
||||
MemoryAcquireFailed {
|
||||
region_id: RegionId,
|
||||
policy: String,
|
||||
#[snafu(source)]
|
||||
source: common_memory_manager::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
@@ -1359,9 +1348,7 @@ impl ErrorExt for Error {
|
||||
|
||||
CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
|
||||
CompactionMemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
|
||||
CompactionMemorySemaphoreClosed { .. } => StatusCode::Unexpected,
|
||||
MemoryAcquireFailed { source, .. } => source.status_code(),
|
||||
|
||||
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use common_base::Plugins;
|
||||
use common_datasource::compression::CompressionType;
|
||||
use common_memory_manager::OnExhaustedPolicy;
|
||||
use common_test_util::temp_dir::{TempDir, create_temp_dir};
|
||||
use object_store::ObjectStore;
|
||||
use object_store::services::Fs;
|
||||
@@ -28,7 +29,7 @@ use tokio::sync::mpsc::Sender;
|
||||
use crate::access_layer::{AccessLayer, AccessLayerRef};
|
||||
use crate::cache::CacheManager;
|
||||
use crate::compaction::CompactionScheduler;
|
||||
use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy};
|
||||
use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Result;
|
||||
use crate::flush::FlushScheduler;
|
||||
@@ -101,7 +102,7 @@ impl SchedulerEnv {
|
||||
Arc::new(MitoConfig::default()),
|
||||
WorkerListener::default(),
|
||||
Plugins::new(),
|
||||
Arc::new(CompactionMemoryManager::new(0)),
|
||||
Arc::new(new_compaction_memory_manager(0)),
|
||||
OnExhaustedPolicy::default(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
|
||||
use crate::cache::write_cache::{WriteCache, WriteCacheRef};
|
||||
use crate::cache::{CacheManager, CacheManagerRef};
|
||||
use crate::compaction::CompactionScheduler;
|
||||
use crate::compaction::memory_manager::CompactionMemoryManager;
|
||||
use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
|
||||
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
|
||||
@@ -217,7 +217,7 @@ impl WorkerGroup {
|
||||
.experimental_compaction_memory_limit
|
||||
.resolve(total_memory);
|
||||
let compaction_memory_manager =
|
||||
Arc::new(CompactionMemoryManager::new(compaction_limit_bytes));
|
||||
Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
|
||||
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
|
||||
|
||||
let workers = (0..config.num_workers)
|
||||
@@ -405,7 +405,7 @@ impl WorkerGroup {
|
||||
.experimental_compaction_memory_limit
|
||||
.resolve(total_memory);
|
||||
let compaction_memory_manager =
|
||||
Arc::new(CompactionMemoryManager::new(compaction_limit_bytes));
|
||||
Arc::new(new_compaction_memory_manager(compaction_limit_bytes));
|
||||
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
|
||||
let workers = (0..config.num_workers)
|
||||
.map(|id| {
|
||||
|
||||
Reference in New Issue
Block a user