mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
Compare commits
3 Commits
v1.0.0-bet
...
feat/slow-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ed33e1aeb | ||
|
|
2fcaa5ebc3 | ||
|
|
e2517dec80 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7672,7 +7672,6 @@ dependencies = [
|
||||
"either",
|
||||
"futures",
|
||||
"greptime-proto",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"index",
|
||||
"itertools 0.14.0",
|
||||
|
||||
@@ -141,8 +141,6 @@
|
||||
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
|
||||
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
|
||||
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
|
||||
| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
|
||||
| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "skip" |
|
||||
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
|
||||
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it defaults to 1/8 of OS memory, capped at 1GB. |
|
||||
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it defaults to 2 times `global_write_buffer_size`. |
|
||||
@@ -523,8 +521,6 @@
|
||||
| `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). |
|
||||
| `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). |
|
||||
| `region_engine.mito.max_background_purges` | Integer | Auto | Max number of running background purge jobs (default: number of cpu cores). |
|
||||
| `region_engine.mito.experimental_compaction_memory_limit` | String | 0 | Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit. |
|
||||
| `region_engine.mito.experimental_compaction_on_exhausted` | String | wait | Behavior when compaction cannot acquire memory from the budget.<br/>Options: "wait" (default, 10s), "wait(<duration>)", "skip" |
|
||||
| `region_engine.mito.auto_flush_interval` | String | `1h` | Interval to auto flush a region if it has not flushed yet. |
|
||||
| `region_engine.mito.global_write_buffer_size` | String | Auto | Global write buffer size for all regions. If not set, it defaults to 1/8 of OS memory, capped at 1GB. |
|
||||
| `region_engine.mito.global_write_buffer_reject_size` | String | Auto | Global write buffer size threshold to reject write requests. If not set, it defaults to 2 times `global_write_buffer_size`. |
|
||||
|
||||
@@ -452,15 +452,6 @@ compress_manifest = false
|
||||
## @toml2docs:none-default="Auto"
|
||||
#+ max_background_purges = 8
|
||||
|
||||
## Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
|
||||
## @toml2docs:none-default="0"
|
||||
#+ experimental_compaction_memory_limit = "0"
|
||||
|
||||
## Behavior when compaction cannot acquire memory from the budget.
|
||||
## Options: "wait" (default, 10s), "wait(<duration>)", "skip"
|
||||
## @toml2docs:none-default="wait"
|
||||
#+ experimental_compaction_on_exhausted = "wait"
|
||||
|
||||
## Interval to auto flush a region if it has not flushed yet.
|
||||
auto_flush_interval = "1h"
|
||||
|
||||
|
||||
@@ -546,15 +546,6 @@ compress_manifest = false
|
||||
## @toml2docs:none-default="Auto"
|
||||
#+ max_background_purges = 8
|
||||
|
||||
## Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
|
||||
## @toml2docs:none-default="0"
|
||||
#+ experimental_compaction_memory_limit = "0"
|
||||
|
||||
## Behavior when compaction cannot acquire memory from the budget.
|
||||
## Options: "wait" (default, 10s), "wait(<duration>)", "skip"
|
||||
## @toml2docs:none-default="wait"
|
||||
#+ experimental_compaction_on_exhausted = "wait"
|
||||
|
||||
## Interval to auto flush a region if it has not flushed yet.
|
||||
auto_flush_interval = "1h"
|
||||
|
||||
|
||||
@@ -145,17 +145,6 @@ impl ObjbenchCommand {
|
||||
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
|
||||
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
|
||||
let num_row_groups = parquet_meta.num_row_groups() as u64;
|
||||
let max_row_group_uncompressed_size: u64 = parquet_meta
|
||||
.row_groups()
|
||||
.iter()
|
||||
.map(|rg| {
|
||||
rg.columns()
|
||||
.iter()
|
||||
.map(|c| c.uncompressed_size() as u64)
|
||||
.sum::<u64>()
|
||||
})
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
|
||||
println!(
|
||||
"{} Metadata loaded - rows: {}, size: {} bytes",
|
||||
@@ -171,7 +160,6 @@ impl ObjbenchCommand {
|
||||
time_range: Default::default(),
|
||||
level: 0,
|
||||
file_size,
|
||||
max_row_group_uncompressed_size,
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
|
||||
@@ -135,9 +135,6 @@ async fn test_full_gc_workflow() {
|
||||
);
|
||||
}
|
||||
|
||||
/// Due to https://github.com/rust-lang/rust/issues/100141, an `Instant` earlier than the process start time cannot be created on non-Linux OSes.
|
||||
/// This is fine since in real usage instant will always be after process start time
|
||||
#[cfg(target_os = "linux")]
|
||||
#[tokio::test]
|
||||
async fn test_tracker_cleanup() {
|
||||
init_default_ut_logging();
|
||||
|
||||
@@ -48,7 +48,6 @@ dotenv.workspace = true
|
||||
either.workspace = true
|
||||
futures.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
humantime.workspace = true
|
||||
index.workspace = true
|
||||
itertools.workspace = true
|
||||
greptime-proto.workspace = true
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
mod buckets;
|
||||
pub mod compactor;
|
||||
pub mod memory_manager;
|
||||
pub mod picker;
|
||||
pub mod run;
|
||||
mod task;
|
||||
@@ -47,8 +46,7 @@ use tokio::sync::mpsc::{self, Sender};
|
||||
use crate::access_layer::AccessLayerRef;
|
||||
use crate::cache::{CacheManagerRef, CacheStrategy};
|
||||
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
|
||||
use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy};
|
||||
use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
|
||||
use crate::compaction::picker::{CompactionTask, new_picker};
|
||||
use crate::compaction::task::CompactionTaskImpl;
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{
|
||||
@@ -106,15 +104,12 @@ pub(crate) struct CompactionScheduler {
|
||||
request_sender: Sender<WorkerRequestWithTime>,
|
||||
cache_manager: CacheManagerRef,
|
||||
engine_config: Arc<MitoConfig>,
|
||||
memory_manager: Arc<CompactionMemoryManager>,
|
||||
memory_policy: OnExhaustedPolicy,
|
||||
listener: WorkerListener,
|
||||
/// Plugins for the compaction scheduler.
|
||||
plugins: Plugins,
|
||||
}
|
||||
|
||||
impl CompactionScheduler {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn new(
|
||||
scheduler: SchedulerRef,
|
||||
request_sender: Sender<WorkerRequestWithTime>,
|
||||
@@ -122,8 +117,6 @@ impl CompactionScheduler {
|
||||
engine_config: Arc<MitoConfig>,
|
||||
listener: WorkerListener,
|
||||
plugins: Plugins,
|
||||
memory_manager: Arc<CompactionMemoryManager>,
|
||||
memory_policy: OnExhaustedPolicy,
|
||||
) -> Self {
|
||||
Self {
|
||||
scheduler,
|
||||
@@ -131,8 +124,6 @@ impl CompactionScheduler {
|
||||
request_sender,
|
||||
cache_manager,
|
||||
engine_config,
|
||||
memory_manager,
|
||||
memory_policy,
|
||||
listener,
|
||||
plugins,
|
||||
}
|
||||
@@ -438,8 +429,7 @@ impl CompactionScheduler {
|
||||
};
|
||||
|
||||
// Create a local compaction task.
|
||||
let estimated_bytes = estimate_compaction_bytes(&picker_output);
|
||||
let local_compaction_task = Box::new(CompactionTaskImpl {
|
||||
let mut local_compaction_task = Box::new(CompactionTaskImpl {
|
||||
request_sender,
|
||||
waiters,
|
||||
start_time,
|
||||
@@ -447,27 +437,18 @@ impl CompactionScheduler {
|
||||
picker_output,
|
||||
compaction_region,
|
||||
compactor: Arc::new(DefaultCompactor {}),
|
||||
memory_manager: self.memory_manager.clone(),
|
||||
memory_policy: self.memory_policy,
|
||||
estimated_memory_bytes: estimated_bytes,
|
||||
});
|
||||
|
||||
self.submit_compaction_task(local_compaction_task, region_id)
|
||||
}
|
||||
|
||||
fn submit_compaction_task(
|
||||
&mut self,
|
||||
mut task: Box<CompactionTaskImpl>,
|
||||
region_id: RegionId,
|
||||
) -> Result<()> {
|
||||
// Submit the compaction task.
|
||||
self.scheduler
|
||||
.schedule(Box::pin(async move {
|
||||
INFLIGHT_COMPACTION_COUNT.inc();
|
||||
task.run().await;
|
||||
local_compaction_task.run().await;
|
||||
INFLIGHT_COMPACTION_COUNT.dec();
|
||||
}))
|
||||
.map_err(|e| {
|
||||
error!(e; "Failed to submit compaction request for region {}", region_id);
|
||||
// If failed to submit the job, we need to remove the region from the scheduler.
|
||||
self.region_status.remove(®ion_id);
|
||||
e
|
||||
})
|
||||
@@ -777,20 +758,6 @@ fn get_expired_ssts(
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Estimates compaction memory as the sum of all input files' maximum row-group
|
||||
/// uncompressed sizes.
|
||||
fn estimate_compaction_bytes(picker_output: &PickerOutput) -> u64 {
|
||||
picker_output
|
||||
.outputs
|
||||
.iter()
|
||||
.flat_map(|output| output.inputs.iter())
|
||||
.map(|file: &FileHandle| {
|
||||
let meta = file.meta_ref();
|
||||
meta.max_row_group_uncompressed_size
|
||||
})
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Pending compaction request that is supposed to run after current task is finished,
|
||||
/// typically used for manual compactions.
|
||||
struct PendingCompaction {
|
||||
@@ -806,7 +773,7 @@ struct PendingCompaction {
|
||||
mod tests {
|
||||
use api::v1::region::StrictWindow;
|
||||
use common_datasource::compression::CompressionType;
|
||||
use tokio::sync::{Barrier, oneshot};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use super::*;
|
||||
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
|
||||
@@ -1178,39 +1145,4 @@ mod tests {
|
||||
assert_eq!(result.unwrap(), 0); // is there a better way to check this?
|
||||
assert_eq!(0, scheduler.region_status.len());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_concurrent_memory_competition() {
|
||||
let manager = Arc::new(CompactionMemoryManager::new(3 * 1024 * 1024)); // 3MB
|
||||
let barrier = Arc::new(Barrier::new(3));
|
||||
let mut handles = vec![];
|
||||
|
||||
// Spawn 3 tasks competing for memory, each trying to acquire 2MB
|
||||
for _i in 0..3 {
|
||||
let mgr = manager.clone();
|
||||
let bar = barrier.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
bar.wait().await; // Synchronize start
|
||||
mgr.try_acquire(2 * 1024 * 1024)
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
let results: Vec<_> = futures::future::join_all(handles)
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|r| r.unwrap())
|
||||
.collect();
|
||||
|
||||
// Only 1 should succeed (3MB limit, 2MB request, can only fit one)
|
||||
let succeeded = results.iter().filter(|r| r.is_some()).count();
|
||||
let failed = results.iter().filter(|r| r.is_none()).count();
|
||||
|
||||
assert_eq!(succeeded, 1, "Expected exactly 1 task to acquire memory");
|
||||
assert_eq!(failed, 2, "Expected 2 tasks to fail");
|
||||
|
||||
// Clean up
|
||||
drop(results);
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -396,7 +396,6 @@ impl DefaultCompactor {
|
||||
time_range: sst_info.time_range,
|
||||
level: output.output_level,
|
||||
file_size: sst_info.file_size,
|
||||
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
|
||||
available_indexes: sst_info.index_metadata.build_available_indexes(),
|
||||
indexes: sst_info.index_metadata.build_indexes(),
|
||||
index_file_size: sst_info.index_metadata.file_size,
|
||||
|
||||
@@ -1,638 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{fmt, mem};
|
||||
|
||||
use common_telemetry::debug;
|
||||
use humantime::{format_duration, parse_duration};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
|
||||
|
||||
use crate::error::{
|
||||
CompactionMemoryLimitExceededSnafu, CompactionMemorySemaphoreClosedSnafu, Result,
|
||||
};
|
||||
use crate::metrics::{
|
||||
COMPACTION_MEMORY_IN_USE, COMPACTION_MEMORY_LIMIT, COMPACTION_MEMORY_REJECTED,
|
||||
};
|
||||
|
||||
/// Number of bytes represented by a single semaphore permit (1 MB).
const PERMIT_GRANULARITY_BYTES: u64 = 1024 * 1024;

/// How long a compaction waits for memory by default (10 seconds).
pub const DEFAULT_MEMORY_WAIT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Defines how to react when compaction cannot acquire enough memory.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum OnExhaustedPolicy {
|
||||
/// Wait until enough memory is released, bounded by timeout.
|
||||
Wait { timeout: Duration },
|
||||
/// Skip the compaction if memory is not immediately available.
|
||||
Skip,
|
||||
}
|
||||
|
||||
impl Default for OnExhaustedPolicy {
|
||||
fn default() -> Self {
|
||||
OnExhaustedPolicy::Wait {
|
||||
timeout: DEFAULT_MEMORY_WAIT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for OnExhaustedPolicy {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let text = match self {
|
||||
OnExhaustedPolicy::Skip => "skip".to_string(),
|
||||
OnExhaustedPolicy::Wait { timeout } if *timeout == DEFAULT_MEMORY_WAIT_TIMEOUT => {
|
||||
"wait".to_string()
|
||||
}
|
||||
OnExhaustedPolicy::Wait { timeout } => {
|
||||
format!("wait({})", format_duration(*timeout))
|
||||
}
|
||||
};
|
||||
serializer.serialize_str(&text)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for OnExhaustedPolicy {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let raw = String::deserialize(deserializer)?;
|
||||
let lower = raw.to_ascii_lowercase();
|
||||
|
||||
if lower == "skip" {
|
||||
return Ok(OnExhaustedPolicy::Skip);
|
||||
}
|
||||
if lower == "wait" {
|
||||
return Ok(OnExhaustedPolicy::default());
|
||||
}
|
||||
if lower.starts_with("wait(") && lower.ends_with(')') {
|
||||
let inner = &raw[5..raw.len() - 1];
|
||||
let timeout = parse_duration(inner).map_err(serde::de::Error::custom)?;
|
||||
return Ok(OnExhaustedPolicy::Wait { timeout });
|
||||
}
|
||||
|
||||
Err(serde::de::Error::custom(format!(
|
||||
"invalid compaction memory policy: {}, expected wait, wait(<duration>) or skip",
|
||||
raw
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Global memory manager for compaction tasks.
|
||||
#[derive(Clone)]
|
||||
pub struct CompactionMemoryManager {
|
||||
quota: Option<MemoryQuota>,
|
||||
}
|
||||
|
||||
/// Shared memory quota state across all compaction guards.
|
||||
#[derive(Clone)]
|
||||
struct MemoryQuota {
|
||||
semaphore: Arc<Semaphore>,
|
||||
// Maximum permits (aligned to PERMIT_GRANULARITY_BYTES).
|
||||
limit_permits: u32,
|
||||
}
|
||||
|
||||
impl CompactionMemoryManager {
|
||||
/// Creates a new memory manager with the given limit in bytes.
|
||||
/// `limit_bytes = 0` disables the limit.
|
||||
pub fn new(limit_bytes: u64) -> Self {
|
||||
if limit_bytes == 0 {
|
||||
COMPACTION_MEMORY_LIMIT.set(0);
|
||||
return Self { quota: None };
|
||||
}
|
||||
|
||||
let limit_permits = bytes_to_permits(limit_bytes);
|
||||
let limit_aligned_bytes = permits_to_bytes(limit_permits);
|
||||
COMPACTION_MEMORY_LIMIT.set(limit_aligned_bytes as i64);
|
||||
|
||||
Self {
|
||||
quota: Some(MemoryQuota {
|
||||
semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
|
||||
limit_permits,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the configured limit in bytes (0 if unlimited).
|
||||
pub fn limit_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.limit_permits))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Returns currently used bytes.
|
||||
pub fn used_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.used_permits()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Returns available bytes.
|
||||
pub fn available_bytes(&self) -> u64 {
|
||||
self.quota
|
||||
.as_ref()
|
||||
.map(|quota| permits_to_bytes(quota.available_permits_clamped()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Acquires memory, waiting if necessary until enough is available.
|
||||
///
|
||||
/// # Errors
|
||||
/// - Returns error if requested bytes exceed the total limit
|
||||
/// - Returns error if the semaphore is unexpectedly closed
|
||||
pub async fn acquire(&self, bytes: u64) -> Result<CompactionMemoryGuard> {
|
||||
match &self.quota {
|
||||
None => Ok(CompactionMemoryGuard::unlimited()),
|
||||
Some(quota) => {
|
||||
let permits = bytes_to_permits(bytes);
|
||||
|
||||
// Fail-fast: reject if request exceeds total limit.
|
||||
ensure!(
|
||||
permits <= quota.limit_permits,
|
||||
CompactionMemoryLimitExceededSnafu {
|
||||
requested_bytes: bytes,
|
||||
limit_bytes: permits_to_bytes(quota.limit_permits),
|
||||
}
|
||||
);
|
||||
|
||||
let permit = quota
|
||||
.semaphore
|
||||
.clone()
|
||||
.acquire_many_owned(permits)
|
||||
.await
|
||||
.map_err(|_| CompactionMemorySemaphoreClosedSnafu.build())?;
|
||||
quota.update_in_use_metric();
|
||||
Ok(CompactionMemoryGuard::limited(permit, quota.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to acquire memory. Returns Some(guard) on success, None if insufficient.
|
||||
pub fn try_acquire(&self, bytes: u64) -> Option<CompactionMemoryGuard> {
|
||||
match &self.quota {
|
||||
None => Some(CompactionMemoryGuard::unlimited()),
|
||||
Some(quota) => {
|
||||
let permits = bytes_to_permits(bytes);
|
||||
|
||||
match quota.semaphore.clone().try_acquire_many_owned(permits) {
|
||||
Ok(permit) => {
|
||||
quota.update_in_use_metric();
|
||||
Some(CompactionMemoryGuard::limited(permit, quota.clone()))
|
||||
}
|
||||
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
|
||||
COMPACTION_MEMORY_REJECTED
|
||||
.with_label_values(&["try_acquire"])
|
||||
.inc();
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MemoryQuota {
|
||||
fn used_permits(&self) -> u32 {
|
||||
self.limit_permits
|
||||
.saturating_sub(self.available_permits_clamped())
|
||||
}
|
||||
|
||||
fn available_permits_clamped(&self) -> u32 {
|
||||
// Clamp to limit_permits to ensure we never report more available permits
|
||||
// than our configured limit, even if semaphore state becomes inconsistent.
|
||||
self.semaphore
|
||||
.available_permits()
|
||||
.min(self.limit_permits as usize) as u32
|
||||
}
|
||||
|
||||
fn update_in_use_metric(&self) {
|
||||
let bytes = permits_to_bytes(self.used_permits());
|
||||
COMPACTION_MEMORY_IN_USE.set(bytes as i64);
|
||||
}
|
||||
}
|
||||
|
||||
/// Guard representing a slice of reserved compaction memory.
|
||||
///
|
||||
/// Memory is automatically released when this guard is dropped.
|
||||
pub struct CompactionMemoryGuard {
|
||||
state: GuardState,
|
||||
}
|
||||
|
||||
enum GuardState {
|
||||
Unlimited,
|
||||
Limited {
|
||||
// Holds all permits owned by this guard (base plus any additional).
|
||||
// Additional requests merge into this permit and are released together on drop.
|
||||
permit: OwnedSemaphorePermit,
|
||||
// Memory quota for requesting additional permits and updating metrics.
|
||||
quota: MemoryQuota,
|
||||
},
|
||||
}
|
||||
|
||||
impl CompactionMemoryGuard {
|
||||
fn unlimited() -> Self {
|
||||
Self {
|
||||
state: GuardState::Unlimited,
|
||||
}
|
||||
}
|
||||
|
||||
fn limited(permit: OwnedSemaphorePermit, quota: MemoryQuota) -> Self {
|
||||
Self {
|
||||
state: GuardState::Limited { permit, quota },
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns granted quota in bytes.
|
||||
pub fn granted_bytes(&self) -> u64 {
|
||||
match &self.state {
|
||||
GuardState::Unlimited => 0,
|
||||
GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to allocate additional memory during task execution.
|
||||
///
|
||||
/// On success, merges the new memory into this guard and returns true.
|
||||
/// On failure, returns false and leaves this guard unchanged.
|
||||
///
|
||||
/// # Behavior
|
||||
/// - Running tasks can request additional memory on top of their initial allocation
|
||||
/// - If total memory (all tasks) would exceed limit, returns false immediately
|
||||
/// - The task should gracefully handle false by failing or adjusting its strategy
|
||||
/// - The additional memory is merged into this guard and released together on drop
|
||||
pub fn request_additional(&mut self, bytes: u64) -> bool {
|
||||
match &mut self.state {
|
||||
GuardState::Unlimited => true,
|
||||
GuardState::Limited { permit, quota } => {
|
||||
// Early return for zero-byte requests (no-op)
|
||||
if bytes == 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
let additional_permits = bytes_to_permits(bytes);
|
||||
|
||||
// Try to acquire additional permits from the quota
|
||||
match quota
|
||||
.semaphore
|
||||
.clone()
|
||||
.try_acquire_many_owned(additional_permits)
|
||||
{
|
||||
Ok(additional_permit) => {
|
||||
// Merge into main permit
|
||||
permit.merge(additional_permit);
|
||||
quota.update_in_use_metric();
|
||||
|
||||
debug!("Allocated additional {} bytes", bytes);
|
||||
|
||||
true
|
||||
}
|
||||
Err(TryAcquireError::NoPermits) | Err(TryAcquireError::Closed) => {
|
||||
COMPACTION_MEMORY_REJECTED
|
||||
.with_label_values(&["request_additional"])
|
||||
.inc();
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Releases a portion of granted memory back to the pool early,
|
||||
/// before the guard is dropped.
|
||||
///
|
||||
/// This is useful when a task's memory requirement decreases during execution
|
||||
/// (e.g., after completing a memory-intensive phase). The guard remains valid
|
||||
/// with reduced quota, and the task can continue running.
|
||||
pub fn early_release_partial(&mut self, bytes: u64) -> bool {
|
||||
match &mut self.state {
|
||||
GuardState::Unlimited => true,
|
||||
GuardState::Limited { permit, quota } => {
|
||||
// Early return for zero-byte requests (no-op)
|
||||
if bytes == 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
let release_permits = bytes_to_permits(bytes);
|
||||
|
||||
// Split out the permits we want to release
|
||||
match permit.split(release_permits as usize) {
|
||||
Some(released_permit) => {
|
||||
let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
|
||||
|
||||
// Drop the split permit to return it to the quota
|
||||
drop(released_permit);
|
||||
quota.update_in_use_metric();
|
||||
|
||||
debug!(
|
||||
"Early released {} bytes from compaction memory guard",
|
||||
released_bytes
|
||||
);
|
||||
|
||||
true
|
||||
}
|
||||
None => {
|
||||
// Requested release exceeds granted amount
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for CompactionMemoryGuard {
|
||||
fn drop(&mut self) {
|
||||
if let GuardState::Limited { permit, quota } =
|
||||
mem::replace(&mut self.state, GuardState::Unlimited)
|
||||
{
|
||||
let bytes = permits_to_bytes(permit.num_permits() as u32);
|
||||
|
||||
// Release permits before updating metrics to reflect latest usage.
|
||||
drop(permit);
|
||||
quota.update_in_use_metric();
|
||||
|
||||
debug!("Released compaction memory: {} bytes", bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for CompactionMemoryGuard {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("CompactionMemoryGuard")
|
||||
.field("granted_bytes", &self.granted_bytes())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
fn bytes_to_permits(bytes: u64) -> u32 {
|
||||
// Round up to the nearest permit.
|
||||
// Returns 0 for bytes=0, which allows lazy allocation via request_additional().
|
||||
// Non-zero bytes always round up to at least 1 permit due to the math.
|
||||
bytes
|
||||
.saturating_add(PERMIT_GRANULARITY_BYTES - 1)
|
||||
.saturating_div(PERMIT_GRANULARITY_BYTES)
|
||||
.min(Semaphore::MAX_PERMITS as u64)
|
||||
.min(u32::MAX as u64) as u32
|
||||
}
|
||||
|
||||
fn permits_to_bytes(permits: u32) -> u64 {
|
||||
(permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tokio::time::{Duration, sleep};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_unlimited() {
|
||||
let manager = CompactionMemoryManager::new(0);
|
||||
let guard = manager.try_acquire(10 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(manager.limit_bytes(), 0);
|
||||
assert_eq!(guard.granted_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_limited_success_and_release() {
|
||||
let bytes = 2 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(bytes);
|
||||
{
|
||||
let guard = manager.try_acquire(PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
assert_eq!(guard.granted_bytes(), PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), PERMIT_GRANULARITY_BYTES);
|
||||
drop(guard);
|
||||
}
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_acquire_exceeds_limit() {
|
||||
let limit = PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
let result = manager.try_acquire(limit + PERMIT_GRANULARITY_BYTES);
|
||||
assert!(result.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn test_acquire_blocks_and_unblocks() {
|
||||
let bytes = 2 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(bytes);
|
||||
let guard = manager.try_acquire(bytes).unwrap();
|
||||
|
||||
// Spawn a task that will block on acquire()
|
||||
let waiter = {
|
||||
let manager = manager.clone();
|
||||
tokio::spawn(async move {
|
||||
// This will block until memory is available
|
||||
let _guard = manager.acquire(bytes).await.unwrap();
|
||||
})
|
||||
};
|
||||
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
// Release memory - this should unblock the waiter
|
||||
drop(guard);
|
||||
|
||||
// Waiter should complete now
|
||||
waiter.await.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_success() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
// Acquire base quota (5MB)
|
||||
let base = 5 * PERMIT_GRANULARITY_BYTES;
|
||||
let mut guard = manager.try_acquire(base).unwrap();
|
||||
assert_eq!(guard.granted_bytes(), base);
|
||||
assert_eq!(manager.used_bytes(), base);
|
||||
|
||||
// Request additional memory (3MB) - should succeed and merge
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_exceeds_limit() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES; // 10MB limit
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
// Acquire base quota (5MB)
|
||||
let base = 5 * PERMIT_GRANULARITY_BYTES;
|
||||
let mut guard = manager.try_acquire(base).unwrap();
|
||||
|
||||
// Request additional memory (3MB) - should succeed
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// Request more (3MB) - should fail (would exceed 10MB limit)
|
||||
let result = guard.request_additional(3 * PERMIT_GRANULARITY_BYTES);
|
||||
assert!(!result);
|
||||
|
||||
// Still at 8MB
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(guard.granted_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_auto_release_on_guard_drop() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
{
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Request additional - memory is merged into guard
|
||||
assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(manager.used_bytes(), 8 * PERMIT_GRANULARITY_BYTES);
|
||||
|
||||
// When guard drops, all memory (base + additional) is released together
|
||||
}
|
||||
|
||||
// After scope, all memory should be released
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_unlimited() {
|
||||
let manager = CompactionMemoryManager::new(0); // Unlimited
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Should always succeed with unlimited manager
|
||||
assert!(guard.request_additional(100 * PERMIT_GRANULARITY_BYTES));
|
||||
assert_eq!(guard.granted_bytes(), 0);
|
||||
assert_eq!(manager.used_bytes(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_request_additional_zero_bytes() {
|
||||
let limit = 10 * PERMIT_GRANULARITY_BYTES;
|
||||
let manager = CompactionMemoryManager::new(limit);
|
||||
|
||||
let mut guard = manager.try_acquire(5 * PERMIT_GRANULARITY_BYTES).unwrap();
|
||||
|
||||
// Request 0 bytes should succeed without affecting anything
|
||||
assert!(guard.request_additional(0));
|
||||
assert_eq!(guard.granted_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
assert_eq!(manager.used_bytes(), 5 * PERMIT_GRANULARITY_BYTES);
|
||||
}
|
||||
|
||||
#[test]
fn test_early_release_partial_success() {
    let mgr = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES);
    let full_bytes = 8 * PERMIT_GRANULARITY_BYTES;
    let half_bytes = 4 * PERMIT_GRANULARITY_BYTES;

    let mut guard = mgr.try_acquire(full_bytes).unwrap();
    assert_eq!(mgr.used_bytes(), full_bytes);

    // Hand back half of the reservation while the guard is still alive.
    assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), half_bytes);
    assert_eq!(mgr.used_bytes(), half_bytes);

    // The bytes that were handed back can be reserved by a second guard.
    let _second = mgr.try_acquire(half_bytes).unwrap();
    assert_eq!(mgr.used_bytes(), full_bytes);
}
|
||||
|
||||
#[test]
fn test_early_release_partial_exceeds_granted() {
    let mgr = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES);
    let held_bytes = 5 * PERMIT_GRANULARITY_BYTES;
    let mut guard = mgr.try_acquire(held_bytes).unwrap();

    // Releasing more than the guard holds must be refused ...
    assert!(!guard.early_release_partial(10 * PERMIT_GRANULARITY_BYTES));

    // ... and the refusal must not disturb the accounting.
    assert_eq!(guard.granted_bytes(), held_bytes);
    assert_eq!(mgr.used_bytes(), held_bytes);
}
|
||||
|
||||
#[test]
fn test_early_release_partial_unlimited() {
    // A limit of 0 configures the manager as unlimited.
    let unlimited = CompactionMemoryManager::new(0);
    let requested_bytes = 100 * PERMIT_GRANULARITY_BYTES;
    let mut guard = unlimited.try_acquire(requested_bytes).unwrap();

    // For an unlimited guard the release is accepted as a no-op: it reports
    // success and the guard keeps reporting zero granted bytes.
    assert!(guard.early_release_partial(50 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 0);
}
|
||||
|
||||
#[test]
fn test_request_and_early_release_symmetry() {
    let unit = PERMIT_GRANULARITY_BYTES;
    let mgr = CompactionMemoryManager::new(20 * unit);

    // Start with 5 units held by the guard.
    let mut guard = mgr.try_acquire(5 * unit).unwrap();

    // Grow: 5 + 5 = 10 units.
    assert!(guard.request_additional(5 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 10 * unit);
    assert_eq!(mgr.used_bytes(), 10 * unit);

    // Shrink: 10 - 3 = 7 units.
    assert!(guard.early_release_partial(3 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 7 * unit);
    assert_eq!(mgr.used_bytes(), 7 * unit);

    // Grow again: 7 + 2 = 9 units.
    assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 9 * unit);
    assert_eq!(mgr.used_bytes(), 9 * unit);

    // Shrink again: 9 - 4 = 5 units.
    assert!(guard.early_release_partial(4 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 5 * unit);
    assert_eq!(mgr.used_bytes(), 5 * unit);

    // Dropping the guard returns whatever it still holds.
    drop(guard);
    assert_eq!(mgr.used_bytes(), 0);
}
|
||||
|
||||
#[test]
fn test_small_allocation_rounds_up() {
    // A request below PERMIT_GRANULARITY_BYTES is expected to be rounded up
    // to a single permit, and the guard can still grow afterwards through
    // request_additional().
    let mgr = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES);

    // 512 KiB request.
    let mut guard = mgr.try_acquire(512 * 1024).unwrap();
    let one_permit = PERMIT_GRANULARITY_BYTES;
    assert_eq!(guard.granted_bytes(), one_permit);

    // Two more permits on top of the rounded-up one gives three in total.
    assert!(guard.request_additional(2 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), 3 * one_permit);
}
|
||||
|
||||
#[test]
fn test_acquire_zero_bytes_lazy_allocation() {
    // Acquiring 0 bytes yields a guard that holds nothing yet, but which can
    // be grown later with request_additional().
    let mgr = CompactionMemoryManager::new(10 * PERMIT_GRANULARITY_BYTES);
    let mut guard = mgr.try_acquire(0).unwrap();

    // Nothing has been consumed so far.
    assert_eq!(guard.granted_bytes(), 0);
    assert_eq!(mgr.used_bytes(), 0);

    // The first real reservation happens lazily.
    let lazily_requested = 3 * PERMIT_GRANULARITY_BYTES;
    assert!(guard.request_additional(3 * PERMIT_GRANULARITY_BYTES));
    assert_eq!(guard.granted_bytes(), lazily_requested);
}
|
||||
}
|
||||
@@ -22,13 +22,10 @@ use snafu::ResultExt;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::compaction::compactor::{CompactionRegion, Compactor};
|
||||
use crate::compaction::memory_manager::{
|
||||
CompactionMemoryGuard, CompactionMemoryManager, OnExhaustedPolicy,
|
||||
};
|
||||
use crate::compaction::picker::{CompactionTask, PickerOutput};
|
||||
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
|
||||
use crate::error::CompactRegionSnafu;
|
||||
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
|
||||
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
|
||||
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
|
||||
use crate::region::RegionRoleState;
|
||||
use crate::request::{
|
||||
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult,
|
||||
@@ -55,12 +52,6 @@ pub(crate) struct CompactionTaskImpl {
|
||||
pub(crate) compactor: Arc<dyn Compactor>,
|
||||
/// Output of the picker.
|
||||
pub(crate) picker_output: PickerOutput,
|
||||
/// Memory manager to acquire memory budget.
|
||||
pub(crate) memory_manager: Arc<CompactionMemoryManager>,
|
||||
/// Policy when memory is exhausted.
|
||||
pub(crate) memory_policy: OnExhaustedPolicy,
|
||||
/// Estimated memory bytes needed for this compaction.
|
||||
pub(crate) estimated_memory_bytes: u64,
|
||||
}
|
||||
|
||||
impl Debug for CompactionTaskImpl {
|
||||
@@ -90,86 +81,6 @@ impl CompactionTaskImpl {
|
||||
.for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
|
||||
}
|
||||
|
||||
/// Acquires memory budget based on the configured policy.
|
||||
///
|
||||
/// Returns an error if memory cannot be acquired according to the policy.
|
||||
async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
|
||||
let region_id = self.compaction_region.region_id;
|
||||
let requested_bytes = self.estimated_memory_bytes;
|
||||
let limit_bytes = self.memory_manager.limit_bytes();
|
||||
|
||||
if limit_bytes > 0 && requested_bytes > limit_bytes {
|
||||
warn!(
|
||||
"Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request",
|
||||
region_id, requested_bytes, limit_bytes
|
||||
);
|
||||
return Err(CompactionMemoryExhaustedSnafu {
|
||||
region_id,
|
||||
required_bytes: requested_bytes,
|
||||
limit_bytes,
|
||||
policy: "exceed_limit".to_string(),
|
||||
}
|
||||
.build());
|
||||
}
|
||||
|
||||
match self.memory_policy {
|
||||
OnExhaustedPolicy::Wait {
|
||||
timeout: wait_timeout,
|
||||
} => {
|
||||
let timer = COMPACTION_MEMORY_WAIT.start_timer();
|
||||
|
||||
match tokio::time::timeout(
|
||||
wait_timeout,
|
||||
self.memory_manager.acquire(requested_bytes),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(guard)) => {
|
||||
timer.observe_duration();
|
||||
Ok(guard)
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
timer.observe_duration();
|
||||
Err(e)
|
||||
}
|
||||
Err(_) => {
|
||||
timer.observe_duration();
|
||||
warn!(
|
||||
"Compaction for region {} waited {:?} for {} bytes but timed out",
|
||||
region_id, wait_timeout, requested_bytes
|
||||
);
|
||||
CompactionMemoryExhaustedSnafu {
|
||||
region_id,
|
||||
required_bytes: requested_bytes,
|
||||
limit_bytes,
|
||||
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
OnExhaustedPolicy::Skip => {
|
||||
// Try to acquire, skip if not immediately available
|
||||
self.memory_manager
|
||||
.try_acquire(requested_bytes)
|
||||
.ok_or_else(|| {
|
||||
warn!(
|
||||
"Skipping compaction for region {} due to memory limit \
|
||||
(need {} bytes, limit {} bytes)",
|
||||
region_id, requested_bytes, limit_bytes
|
||||
);
|
||||
CompactionMemoryExhaustedSnafu {
|
||||
region_id,
|
||||
required_bytes: requested_bytes,
|
||||
limit_bytes,
|
||||
policy: "skip".to_string(),
|
||||
}
|
||||
.build()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove expired ssts files, update manifest immediately
|
||||
/// and apply the edit to region version.
|
||||
///
|
||||
@@ -311,7 +222,7 @@ impl CompactionTaskImpl {
|
||||
}
|
||||
|
||||
/// Handles compaction failure, notifies all waiters.
|
||||
pub(crate) fn on_failure(&mut self, err: Arc<error::Error>) {
|
||||
fn on_failure(&mut self, err: Arc<error::Error>) {
|
||||
COMPACTION_FAILURE_COUNT.inc();
|
||||
for waiter in self.waiters.drain(..) {
|
||||
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
|
||||
@@ -338,26 +249,6 @@ impl CompactionTaskImpl {
|
||||
#[async_trait::async_trait]
|
||||
impl CompactionTask for CompactionTaskImpl {
|
||||
async fn run(&mut self) {
|
||||
// Acquire memory budget before starting compaction
|
||||
let _memory_guard = match self.acquire_memory_with_policy().await {
|
||||
Ok(guard) => guard,
|
||||
Err(e) => {
|
||||
error!(e; "Failed to acquire memory for compaction, region id: {}", self.compaction_region.region_id);
|
||||
let err = Arc::new(e);
|
||||
self.on_failure(err.clone());
|
||||
let notify = BackgroundNotify::CompactionFailed(CompactionFailed {
|
||||
region_id: self.compaction_region.region_id,
|
||||
err,
|
||||
});
|
||||
self.send_to_worker(WorkerRequest::Background {
|
||||
region_id: self.compaction_region.region_id,
|
||||
notify,
|
||||
})
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let notify = match self.handle_expiration_and_compaction().await {
|
||||
Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
|
||||
region_id: self.compaction_region.region_id,
|
||||
|
||||
@@ -74,7 +74,6 @@ pub fn new_file_handle_with_size_and_sequence(
|
||||
),
|
||||
level,
|
||||
file_size,
|
||||
max_row_group_uncompressed_size: file_size,
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
|
||||
@@ -26,7 +26,6 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
|
||||
use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
|
||||
use crate::compaction::memory_manager::OnExhaustedPolicy;
|
||||
use crate::error::Result;
|
||||
use crate::gc::GcConfig;
|
||||
use crate::memtable::MemtableConfig;
|
||||
@@ -93,10 +92,6 @@ pub struct MitoConfig {
|
||||
pub max_background_compactions: usize,
|
||||
/// Max number of running background purge jobs (default: number of cpu cores).
|
||||
pub max_background_purges: usize,
|
||||
/// Memory budget for compaction tasks. Setting it to 0 or "unlimited" disables the limit.
|
||||
pub experimental_compaction_memory_limit: MemoryLimit,
|
||||
/// Behavior when compaction cannot acquire memory from the budget.
|
||||
pub experimental_compaction_on_exhausted: OnExhaustedPolicy,
|
||||
|
||||
// Flush configs:
|
||||
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
|
||||
@@ -183,8 +178,6 @@ impl Default for MitoConfig {
|
||||
max_background_flushes: divide_num_cpus(2),
|
||||
max_background_compactions: divide_num_cpus(4),
|
||||
max_background_purges: get_total_cpu_cores(),
|
||||
experimental_compaction_memory_limit: MemoryLimit::Unlimited,
|
||||
experimental_compaction_on_exhausted: OnExhaustedPolicy::default(),
|
||||
auto_flush_interval: Duration::from_secs(30 * 60),
|
||||
global_write_buffer_size: ReadableSize::gb(1),
|
||||
global_write_buffer_reject_size: ReadableSize::gb(2),
|
||||
|
||||
@@ -1041,40 +1041,6 @@ pub enum Error {
|
||||
#[snafu(display("Manual compaction is override by following operations."))]
|
||||
ManualCompactionOverride {},
|
||||
|
||||
#[snafu(display(
|
||||
"Compaction memory limit exceeded for region {}: required {} bytes, limit {} bytes (policy: {})",
|
||||
region_id,
|
||||
required_bytes,
|
||||
limit_bytes,
|
||||
policy
|
||||
))]
|
||||
CompactionMemoryExhausted {
|
||||
region_id: RegionId,
|
||||
required_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
policy: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Requested compaction memory ({} bytes) exceeds total limit ({} bytes)",
|
||||
requested_bytes,
|
||||
limit_bytes
|
||||
))]
|
||||
CompactionMemoryLimitExceeded {
|
||||
requested_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Compaction memory semaphore unexpectedly closed"))]
|
||||
CompactionMemorySemaphoreClosed {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Incompatible WAL provider change. This is typically caused by changing WAL provider in database config file without completely cleaning existing files. Global provider: {}, region provider: {}",
|
||||
global,
|
||||
@@ -1357,12 +1323,6 @@ impl ErrorExt for Error {
|
||||
|
||||
ManualCompactionOverride {} => StatusCode::Cancelled,
|
||||
|
||||
CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
|
||||
CompactionMemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
|
||||
CompactionMemorySemaphoreClosed { .. } => StatusCode::Unexpected,
|
||||
|
||||
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
ScanSeries { source, .. } => source.status_code(),
|
||||
|
||||
@@ -640,7 +640,6 @@ impl RegionFlushTask {
|
||||
time_range: sst_info.time_range,
|
||||
level: 0,
|
||||
file_size: sst_info.file_size,
|
||||
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
|
||||
available_indexes: sst_info.index_metadata.build_available_indexes(),
|
||||
indexes: sst_info.index_metadata.build_indexes(),
|
||||
index_file_size: sst_info.index_metadata.file_size,
|
||||
|
||||
@@ -244,7 +244,6 @@ async fn checkpoint_with_different_compression_types() {
|
||||
time_range: (0.into(), 10000000.into()),
|
||||
level: 0,
|
||||
file_size: 1024000,
|
||||
max_row_group_uncompressed_size: 1024000,
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
@@ -310,7 +309,6 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
|
||||
time_range: (0.into(), 10000000.into()),
|
||||
level: 0,
|
||||
file_size: 1024000,
|
||||
max_row_group_uncompressed_size: 1024000,
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
|
||||
@@ -974,19 +974,6 @@ impl EncodedBulkPart {
|
||||
/// Returns a `SstInfo` instance with information derived from this bulk part's metadata
|
||||
pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
|
||||
let unit = self.metadata.region_metadata.time_index_type().unit();
|
||||
let max_row_group_uncompressed_size: u64 = self
|
||||
.metadata
|
||||
.parquet_metadata
|
||||
.row_groups()
|
||||
.iter()
|
||||
.map(|rg| {
|
||||
rg.columns()
|
||||
.iter()
|
||||
.map(|c| c.uncompressed_size() as u64)
|
||||
.sum::<u64>()
|
||||
})
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
SstInfo {
|
||||
file_id,
|
||||
time_range: (
|
||||
@@ -994,7 +981,6 @@ impl EncodedBulkPart {
|
||||
Timestamp::new(self.metadata.max_timestamp, unit),
|
||||
),
|
||||
file_size: self.data.len() as u64,
|
||||
max_row_group_uncompressed_size,
|
||||
num_rows: self.metadata.num_rows,
|
||||
num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
|
||||
file_metadata: Some(self.metadata.parquet_metadata.clone()),
|
||||
|
||||
@@ -157,35 +157,6 @@ lazy_static! {
|
||||
"greptime_mito_inflight_compaction_count",
|
||||
"inflight compaction count",
|
||||
).unwrap();
|
||||
|
||||
/// Bytes reserved by compaction memory manager.
|
||||
pub static ref COMPACTION_MEMORY_IN_USE: IntGauge =
|
||||
register_int_gauge!(
|
||||
"greptime_mito_compaction_memory_in_use_bytes",
|
||||
"bytes currently reserved for compaction tasks",
|
||||
)
|
||||
.unwrap();
|
||||
/// Configured compaction memory limit.
|
||||
pub static ref COMPACTION_MEMORY_LIMIT: IntGauge =
|
||||
register_int_gauge!(
|
||||
"greptime_mito_compaction_memory_limit_bytes",
|
||||
"maximum bytes allowed for compaction tasks",
|
||||
)
|
||||
.unwrap();
|
||||
/// Wait time to obtain compaction memory.
|
||||
pub static ref COMPACTION_MEMORY_WAIT: Histogram = register_histogram!(
|
||||
"greptime_mito_compaction_memory_wait_seconds",
|
||||
"time waiting for compaction memory",
|
||||
// 0.01s ~ ~10s
|
||||
exponential_buckets(0.01, 2.0, 10).unwrap(),
|
||||
).unwrap();
|
||||
/// Counter of rejected compaction memory allocations.
|
||||
pub static ref COMPACTION_MEMORY_REJECTED: IntCounterVec =
|
||||
register_int_counter_vec!(
|
||||
"greptime_mito_compaction_memory_rejected_total",
|
||||
"number of compaction tasks rejected due to memory limit",
|
||||
&[TYPE_LABEL]
|
||||
).unwrap();
|
||||
}
|
||||
|
||||
// Query metrics.
|
||||
|
||||
@@ -1137,6 +1137,12 @@ impl ScanInput {
|
||||
self.files.len()
|
||||
}
|
||||
|
||||
/// Gets the file handle from a row group index.
|
||||
pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
|
||||
let file_index = index.index - self.num_memtables();
|
||||
&self.files[file_index]
|
||||
}
|
||||
|
||||
pub fn region_metadata(&self) -> &RegionMetadataRef {
|
||||
self.mapper.metadata()
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Utilities for scanners.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -26,6 +26,7 @@ use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder,
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::timestamp::timestamp_array_to_primitive;
|
||||
use futures::Stream;
|
||||
use lazy_static::lazy_static;
|
||||
use prometheus::IntGauge;
|
||||
use smallvec::SmallVec;
|
||||
use snafu::OptionExt;
|
||||
@@ -42,7 +43,7 @@ use crate::read::merge::{MergeMetrics, MergeMetricsReport};
|
||||
use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
|
||||
use crate::read::scan_region::StreamContext;
|
||||
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
use crate::sst::file::{FileTimeRange, RegionFileId};
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
|
||||
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
|
||||
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
|
||||
@@ -52,6 +53,70 @@ use crate::sst::parquet::flat_format::time_index_column_index;
|
||||
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
|
||||
use crate::sst::parquet::row_group::ParquetFetchMetrics;
|
||||
|
||||
lazy_static! {
    /// Duration above which a file scan is reported as slow.
    ///
    /// Taken from the `SLOW_FILE_SCAN_THRESHOLD_MS` environment variable,
    /// interpreted as milliseconds. Falls back to 1000ms (1 second) when the
    /// variable is unset or is not a valid `u64`.
    static ref SLOW_FILE_SCAN_THRESHOLD: Duration = {
        const DEFAULT_THRESHOLD_MS: u64 = 1000;

        let millis = match std::env::var("SLOW_FILE_SCAN_THRESHOLD_MS") {
            Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_THRESHOLD_MS),
            Err(_) => DEFAULT_THRESHOLD_MS,
        };
        Duration::from_millis(millis)
    };
}
|
||||
|
||||
/// Scan metrics collected for a single file.
#[derive(Default, Clone)]
pub struct FileScanMetrics {
    /// Number of ranges (row groups) read from this file.
    pub num_ranges: usize,
    /// Number of rows read from this file.
    pub num_rows: usize,
    /// Time spent building file ranges/parts (file-level preparation).
    pub build_part_cost: Duration,
    /// Time spent building readers for this file (accumulated across all ranges).
    pub build_reader_cost: Duration,
    /// Time spent scanning this file (accumulated across all ranges).
    pub scan_cost: Duration,
}

impl fmt::Debug for FileScanMetrics {
    /// Writes the metrics as a compact JSON-like object.
    ///
    /// `build_part_cost` is always written; every other field is omitted
    /// while it is still zero.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure so that a newly added field cannot be forgotten here.
        let Self {
            num_ranges,
            num_rows,
            build_part_cost,
            build_reader_cost,
            scan_cost,
        } = self;

        write!(f, "{{\"build_part_cost\":\"{:?}\"", build_part_cost)?;

        if *num_ranges > 0 {
            write!(f, ", \"num_ranges\":{}", num_ranges)?;
        }
        if *num_rows > 0 {
            write!(f, ", \"num_rows\":{}", num_rows)?;
        }
        if !build_reader_cost.is_zero() {
            write!(f, ", \"build_reader_cost\":\"{:?}\"", build_reader_cost)?;
        }
        if !scan_cost.is_zero() {
            write!(f, ", \"scan_cost\":\"{:?}\"", scan_cost)?;
        }

        f.write_str("}")
    }
}

impl FileScanMetrics {
    /// Adds every counter and duration of `other` onto `self`.
    pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
        // Destructure so that a newly added field cannot be forgotten here.
        let FileScanMetrics {
            num_ranges,
            num_rows,
            build_part_cost,
            build_reader_cost,
            scan_cost,
        } = other;

        self.num_ranges += *num_ranges;
        self.num_rows += *num_rows;
        self.build_part_cost += *build_part_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
    }
}
|
||||
|
||||
/// Verbose scan metrics for a partition.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct ScanMetricsSet {
|
||||
@@ -151,6 +216,8 @@ pub(crate) struct ScanMetricsSet {
|
||||
fetch_metrics: Option<ParquetFetchMetrics>,
|
||||
/// Metadata cache metrics.
|
||||
metadata_cache_metrics: Option<MetadataCacheMetrics>,
|
||||
/// Per-file scan metrics, only populated when explain_verbose is true.
|
||||
per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for ScanMetricsSet {
|
||||
@@ -199,6 +266,7 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
fulltext_index_apply_metrics,
|
||||
fetch_metrics,
|
||||
metadata_cache_metrics,
|
||||
per_file_metrics,
|
||||
} = self;
|
||||
|
||||
// Write core metrics
|
||||
@@ -326,6 +394,20 @@ impl fmt::Debug for ScanMetricsSet {
|
||||
write!(f, ", \"dedup_metrics\":{:?}", dedup_metrics)?;
|
||||
}
|
||||
|
||||
// Write per-file metrics if present and non-empty
|
||||
if let Some(file_metrics) = per_file_metrics
|
||||
&& !file_metrics.is_empty()
|
||||
{
|
||||
write!(f, ", \"per_file_metrics\": {{")?;
|
||||
for (i, (file_id, metrics)) in file_metrics.iter().enumerate() {
|
||||
if i > 0 {
|
||||
write!(f, ", ")?;
|
||||
}
|
||||
write!(f, "\"{}\": {:?}", file_id, metrics)?;
|
||||
}
|
||||
write!(f, "}}")?;
|
||||
}
|
||||
|
||||
write!(f, ", \"stream_eof\":{stream_eof}}}")
|
||||
}
|
||||
}
|
||||
@@ -432,6 +514,17 @@ impl ScanMetricsSet {
|
||||
.merge_from(metadata_cache_metrics);
|
||||
}
|
||||
|
||||
/// Merges per-file metrics.
|
||||
fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
|
||||
let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
|
||||
for (file_id, metrics) in other {
|
||||
self_file_metrics
|
||||
.entry(*file_id)
|
||||
.or_default()
|
||||
.merge_from(metrics);
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets distributor metrics.
|
||||
fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
|
||||
let SeriesDistributorMetrics {
|
||||
@@ -722,11 +815,20 @@ impl PartitionMetrics {
|
||||
}
|
||||
|
||||
/// Merges [ReaderMetrics] and `build_reader_cost`.
|
||||
pub fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
|
||||
pub fn merge_reader_metrics(
|
||||
&self,
|
||||
metrics: &ReaderMetrics,
|
||||
per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
|
||||
) {
|
||||
self.0.build_parts_cost.add_duration(metrics.build_cost);
|
||||
|
||||
let mut metrics_set = self.0.metrics.lock().unwrap();
|
||||
metrics_set.merge_reader_metrics(metrics);
|
||||
|
||||
// Merge per-file metrics if provided
|
||||
if let Some(file_metrics) = per_file_metrics {
|
||||
metrics_set.merge_per_file_metrics(file_metrics);
|
||||
}
|
||||
}
|
||||
|
||||
/// Finishes the query.
|
||||
@@ -938,13 +1040,44 @@ pub(crate) async fn scan_file_ranges(
|
||||
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
|
||||
.await?;
|
||||
part_metrics.inc_num_file_ranges(ranges.len());
|
||||
part_metrics.merge_reader_metrics(&reader_metrics);
|
||||
part_metrics.merge_reader_metrics(&reader_metrics, None);
|
||||
|
||||
// Creates initial per-file metrics with build_part_cost.
|
||||
let init_per_file_metrics = if part_metrics.explain_verbose() {
|
||||
let file = stream_ctx.input.file_from_index(index);
|
||||
let file_id = file.file_id();
|
||||
|
||||
let mut map = HashMap::new();
|
||||
map.insert(
|
||||
file_id,
|
||||
FileScanMetrics {
|
||||
build_part_cost: reader_metrics.build_cost,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
Some(map)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Warn if build_part_cost exceeds threshold
|
||||
if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD {
|
||||
let file = stream_ctx.input.file_from_index(index);
|
||||
let file_id = file.file_id();
|
||||
common_telemetry::warn!(
|
||||
"Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}",
|
||||
part_metrics.0.region_id,
|
||||
file_id,
|
||||
reader_metrics.build_cost
|
||||
);
|
||||
}
|
||||
|
||||
Ok(build_file_range_scan_stream(
|
||||
stream_ctx,
|
||||
part_metrics,
|
||||
read_type,
|
||||
ranges,
|
||||
init_per_file_metrics,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -964,13 +1097,44 @@ pub(crate) async fn scan_flat_file_ranges(
|
||||
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
|
||||
.await?;
|
||||
part_metrics.inc_num_file_ranges(ranges.len());
|
||||
part_metrics.merge_reader_metrics(&reader_metrics);
|
||||
part_metrics.merge_reader_metrics(&reader_metrics, None);
|
||||
|
||||
// Creates initial per-file metrics with build_part_cost.
|
||||
let init_per_file_metrics = if part_metrics.explain_verbose() {
|
||||
let file = stream_ctx.input.file_from_index(index);
|
||||
let file_id = file.file_id();
|
||||
|
||||
let mut map = HashMap::new();
|
||||
map.insert(
|
||||
file_id,
|
||||
FileScanMetrics {
|
||||
build_part_cost: reader_metrics.build_cost,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
Some(map)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Warn if build_part_cost exceeds threshold
|
||||
if reader_metrics.build_cost > *SLOW_FILE_SCAN_THRESHOLD {
|
||||
let file = stream_ctx.input.file_from_index(index);
|
||||
let file_id = file.file_id();
|
||||
common_telemetry::warn!(
|
||||
"Slow file part build detected - region_id: {}, file_id: {}, build_part_cost: {:?}",
|
||||
part_metrics.0.region_id,
|
||||
file_id,
|
||||
reader_metrics.build_cost
|
||||
);
|
||||
}
|
||||
|
||||
Ok(build_flat_file_range_scan_stream(
|
||||
stream_ctx,
|
||||
part_metrics,
|
||||
read_type,
|
||||
ranges,
|
||||
init_per_file_metrics,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -980,6 +1144,7 @@ pub fn build_file_range_scan_stream(
|
||||
part_metrics: PartitionMetrics,
|
||||
read_type: &'static str,
|
||||
ranges: SmallVec<[FileRange; 2]>,
|
||||
mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
|
||||
) -> impl Stream<Item = Result<Batch>> {
|
||||
try_stream! {
|
||||
let fetch_metrics = if part_metrics.explain_verbose() {
|
||||
@@ -1006,6 +1171,34 @@ pub fn build_file_range_scan_stream(
|
||||
}
|
||||
if let Source::PruneReader(reader) = source {
|
||||
let prune_metrics = reader.metrics();
|
||||
|
||||
// Warn if build_cost + scan_cost exceeds threshold
|
||||
let total_cost = build_cost + prune_metrics.scan_cost;
|
||||
if total_cost > *SLOW_FILE_SCAN_THRESHOLD {
|
||||
let file_id = range.file_handle().file_id();
|
||||
common_telemetry::warn!(
|
||||
"Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})",
|
||||
part_metrics.0.region_id,
|
||||
file_id,
|
||||
total_cost,
|
||||
build_cost,
|
||||
prune_metrics.scan_cost
|
||||
);
|
||||
}
|
||||
|
||||
// Update per-file metrics if tracking is enabled
|
||||
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
|
||||
let file_id = range.file_handle().file_id();
|
||||
let file_metrics = file_metrics_map
|
||||
.entry(file_id)
|
||||
.or_insert_with(FileScanMetrics::default);
|
||||
|
||||
file_metrics.num_ranges += 1;
|
||||
file_metrics.num_rows += prune_metrics.num_rows;
|
||||
file_metrics.build_reader_cost += build_cost;
|
||||
file_metrics.scan_cost += prune_metrics.scan_cost;
|
||||
}
|
||||
|
||||
reader_metrics.merge_from(&prune_metrics);
|
||||
}
|
||||
}
|
||||
@@ -1013,7 +1206,7 @@ pub fn build_file_range_scan_stream(
|
||||
// Reports metrics.
|
||||
reader_metrics.observe_rows(read_type);
|
||||
reader_metrics.filter_metrics.observe();
|
||||
part_metrics.merge_reader_metrics(reader_metrics);
|
||||
part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1023,6 +1216,7 @@ pub fn build_flat_file_range_scan_stream(
|
||||
part_metrics: PartitionMetrics,
|
||||
read_type: &'static str,
|
||||
ranges: SmallVec<[FileRange; 2]>,
|
||||
mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
|
||||
) -> impl Stream<Item = Result<RecordBatch>> {
|
||||
try_stream! {
|
||||
let fetch_metrics = if part_metrics.explain_verbose() {
|
||||
@@ -1058,13 +1252,41 @@ pub fn build_flat_file_range_scan_stream(
|
||||
}
|
||||
|
||||
let prune_metrics = reader.metrics();
|
||||
|
||||
// Warn if build_cost + scan_cost exceeds threshold
|
||||
let total_cost = build_cost + prune_metrics.scan_cost;
|
||||
if total_cost > *SLOW_FILE_SCAN_THRESHOLD {
|
||||
let file_id = range.file_handle().file_id();
|
||||
common_telemetry::warn!(
|
||||
"Slow file scan detected - region_id: {}, file_id: {}, total_cost: {:?} (build_reader_cost: {:?}, scan_cost: {:?})",
|
||||
part_metrics.0.region_id,
|
||||
file_id,
|
||||
total_cost,
|
||||
build_cost,
|
||||
prune_metrics.scan_cost
|
||||
);
|
||||
}
|
||||
|
||||
// Update per-file metrics if tracking is enabled
|
||||
if let Some(file_metrics_map) = per_file_metrics.as_mut() {
|
||||
let file_id = range.file_handle().file_id();
|
||||
let file_metrics = file_metrics_map
|
||||
.entry(file_id)
|
||||
.or_insert_with(FileScanMetrics::default);
|
||||
|
||||
file_metrics.num_ranges += 1;
|
||||
file_metrics.num_rows += prune_metrics.num_rows;
|
||||
file_metrics.build_reader_cost += build_cost;
|
||||
file_metrics.scan_cost += prune_metrics.scan_cost;
|
||||
}
|
||||
|
||||
reader_metrics.merge_from(&prune_metrics);
|
||||
}
|
||||
|
||||
// Reports metrics.
|
||||
reader_metrics.observe_rows(read_type);
|
||||
reader_metrics.filter_metrics.observe();
|
||||
part_metrics.merge_reader_metrics(reader_metrics);
|
||||
part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -425,7 +425,6 @@ mod tests {
|
||||
time_range: FileTimeRange::default(),
|
||||
level: 0,
|
||||
file_size: 1024,
|
||||
max_row_group_uncompressed_size: 1024,
|
||||
available_indexes: SmallVec::new(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
|
||||
@@ -180,8 +180,6 @@ pub struct FileMeta {
|
||||
pub level: Level,
|
||||
/// Size of the file.
|
||||
pub file_size: u64,
|
||||
/// Maximum uncompressed row group size of the file. 0 means unknown.
|
||||
pub max_row_group_uncompressed_size: u64,
|
||||
/// Available indexes of the file.
|
||||
pub available_indexes: IndexTypes,
|
||||
/// Created indexes of the file for each column.
|
||||
@@ -250,11 +248,7 @@ impl Debug for FileMeta {
|
||||
)
|
||||
})
|
||||
.field("level", &self.level)
|
||||
.field("file_size", &ReadableSize(self.file_size))
|
||||
.field(
|
||||
"max_row_group_uncompressed_size",
|
||||
&ReadableSize(self.max_row_group_uncompressed_size),
|
||||
);
|
||||
.field("file_size", &ReadableSize(self.file_size));
|
||||
if !self.available_indexes.is_empty() {
|
||||
debug_struct
|
||||
.field("available_indexes", &self.available_indexes)
|
||||
@@ -658,7 +652,6 @@ mod tests {
|
||||
time_range: FileTimeRange::default(),
|
||||
level,
|
||||
file_size: 0,
|
||||
max_row_group_uncompressed_size: 0,
|
||||
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
|
||||
indexes: vec![ColumnIndexMetadata {
|
||||
column_id: 0,
|
||||
@@ -710,7 +703,6 @@ mod tests {
|
||||
time_range: FileTimeRange::default(),
|
||||
level: 0,
|
||||
file_size: 0,
|
||||
max_row_group_uncompressed_size: 0,
|
||||
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
|
||||
indexes: vec![ColumnIndexMetadata {
|
||||
column_id: 0,
|
||||
|
||||
@@ -251,7 +251,6 @@ mod tests {
|
||||
time_range: FileTimeRange::default(),
|
||||
level: 0,
|
||||
file_size: 4096,
|
||||
max_row_group_uncompressed_size: 4096,
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
@@ -322,7 +321,6 @@ mod tests {
|
||||
time_range: FileTimeRange::default(),
|
||||
level: 0,
|
||||
file_size: 4096,
|
||||
max_row_group_uncompressed_size: 4096,
|
||||
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
|
||||
indexes: vec![ColumnIndexMetadata {
|
||||
column_id: 0,
|
||||
|
||||
@@ -224,7 +224,6 @@ mod tests {
|
||||
time_range: FileTimeRange::default(),
|
||||
level: 0,
|
||||
file_size: 4096,
|
||||
max_row_group_uncompressed_size: 4096,
|
||||
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
|
||||
indexes: vec![ColumnIndexMetadata {
|
||||
column_id: 0,
|
||||
|
||||
@@ -660,8 +660,8 @@ impl IndexBuildTask {
|
||||
// TODO(SNC123): optimize index batch
|
||||
loop {
|
||||
match parquet_reader.next_batch().await {
|
||||
Ok(Some(mut batch)) => {
|
||||
indexer.update(&mut batch).await;
|
||||
Ok(Some(batch)) => {
|
||||
indexer.update(&mut batch.clone()).await;
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(e) => {
|
||||
@@ -1549,7 +1549,6 @@ mod tests {
|
||||
region_id,
|
||||
file_id: sst_info.file_id,
|
||||
file_size: sst_info.file_size,
|
||||
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
|
||||
index_file_size: sst_info.index_metadata.file_size,
|
||||
num_rows: sst_info.num_rows as u64,
|
||||
num_row_groups: sst_info.num_row_groups,
|
||||
@@ -1624,7 +1623,6 @@ mod tests {
|
||||
region_id,
|
||||
file_id: sst_info.file_id,
|
||||
file_size: sst_info.file_size,
|
||||
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
|
||||
index_file_size: sst_info.index_metadata.file_size,
|
||||
num_rows: sst_info.num_rows as u64,
|
||||
num_row_groups: sst_info.num_row_groups,
|
||||
@@ -1728,7 +1726,6 @@ mod tests {
|
||||
region_id,
|
||||
file_id: sst_info.file_id,
|
||||
file_size: sst_info.file_size,
|
||||
max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
|
||||
index_file_size: sst_info.index_metadata.file_size,
|
||||
num_rows: sst_info.num_rows as u64,
|
||||
num_row_groups: sst_info.num_row_groups,
|
||||
|
||||
@@ -76,8 +76,6 @@ pub struct SstInfo {
|
||||
pub time_range: FileTimeRange,
|
||||
/// File size in bytes.
|
||||
pub file_size: u64,
|
||||
/// Maximum uncompressed row group size in bytes. 0 if unknown.
|
||||
pub max_row_group_uncompressed_size: u64,
|
||||
/// Number of rows.
|
||||
pub num_rows: usize,
|
||||
/// Number of row groups
|
||||
@@ -773,7 +771,6 @@ mod tests {
|
||||
time_range: info.time_range,
|
||||
level: 0,
|
||||
file_size: info.file_size,
|
||||
max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
|
||||
available_indexes: info.index_metadata.build_available_indexes(),
|
||||
indexes: info.index_metadata.build_indexes(),
|
||||
index_file_size: info.index_metadata.file_size,
|
||||
|
||||
@@ -213,23 +213,11 @@ where
|
||||
|
||||
// convert FileMetaData to ParquetMetaData
|
||||
let parquet_metadata = parse_parquet_metadata(file_meta)?;
|
||||
let max_row_group_uncompressed_size: u64 = parquet_metadata
|
||||
.row_groups()
|
||||
.iter()
|
||||
.map(|rg| {
|
||||
rg.columns()
|
||||
.iter()
|
||||
.map(|c| c.uncompressed_size() as u64)
|
||||
.sum::<u64>()
|
||||
})
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
let num_series = stats.series_estimator.finish();
|
||||
ssts.push(SstInfo {
|
||||
file_id: self.current_file,
|
||||
time_range,
|
||||
file_size,
|
||||
max_row_group_uncompressed_size,
|
||||
num_rows: stats.num_rows,
|
||||
num_row_groups: parquet_metadata.num_row_groups() as u64,
|
||||
file_metadata: Some(Arc::new(parquet_metadata)),
|
||||
|
||||
@@ -28,7 +28,6 @@ use tokio::sync::mpsc::Sender;
|
||||
use crate::access_layer::{AccessLayer, AccessLayerRef};
|
||||
use crate::cache::CacheManager;
|
||||
use crate::compaction::CompactionScheduler;
|
||||
use crate::compaction::memory_manager::{CompactionMemoryManager, OnExhaustedPolicy};
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::Result;
|
||||
use crate::flush::FlushScheduler;
|
||||
@@ -101,8 +100,6 @@ impl SchedulerEnv {
|
||||
Arc::new(MitoConfig::default()),
|
||||
WorkerListener::default(),
|
||||
Plugins::new(),
|
||||
Arc::new(CompactionMemoryManager::new(0)),
|
||||
OnExhaustedPolicy::default(),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -123,7 +123,6 @@ pub fn sst_file_handle_with_file_id(file_id: FileId, start_ms: i64, end_ms: i64)
|
||||
),
|
||||
level: 0,
|
||||
file_size: 0,
|
||||
max_row_group_uncompressed_size: 0,
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
|
||||
@@ -101,7 +101,6 @@ impl VersionControlBuilder {
|
||||
),
|
||||
level: 0,
|
||||
file_size: 0, // We don't care file size.
|
||||
max_row_group_uncompressed_size: 0,
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
@@ -193,7 +192,6 @@ pub(crate) fn apply_edit(
|
||||
),
|
||||
level: 0,
|
||||
file_size: 0, // We don't care file size.
|
||||
max_row_group_uncompressed_size: 0,
|
||||
available_indexes: Default::default(),
|
||||
indexes: Default::default(),
|
||||
index_file_size: 0,
|
||||
|
||||
@@ -41,7 +41,6 @@ use common_base::readable_size::ReadableSize;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::key::SchemaMetadataManagerRef;
|
||||
use common_runtime::JoinHandle;
|
||||
use common_stat::get_total_memory_bytes;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use futures::future::try_join_all;
|
||||
use object_store::manager::ObjectStoreManagerRef;
|
||||
@@ -59,7 +58,6 @@ use tokio::sync::{Mutex, Semaphore, mpsc, oneshot, watch};
|
||||
use crate::cache::write_cache::{WriteCache, WriteCacheRef};
|
||||
use crate::cache::{CacheManager, CacheManagerRef};
|
||||
use crate::compaction::CompactionScheduler;
|
||||
use crate::compaction::memory_manager::CompactionMemoryManager;
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{self, CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
|
||||
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
|
||||
@@ -207,17 +205,6 @@ impl WorkerGroup {
|
||||
.build(),
|
||||
);
|
||||
let time_provider = Arc::new(StdTimeProvider);
|
||||
let total_memory = get_total_memory_bytes();
|
||||
let total_memory = if total_memory > 0 {
|
||||
total_memory as u64
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let compaction_limit_bytes = config
|
||||
.experimental_compaction_memory_limit
|
||||
.resolve(total_memory);
|
||||
let compaction_memory_manager =
|
||||
Arc::new(CompactionMemoryManager::new(compaction_limit_bytes));
|
||||
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
|
||||
|
||||
let workers = (0..config.num_workers)
|
||||
@@ -234,7 +221,6 @@ impl WorkerGroup {
|
||||
purge_scheduler: purge_scheduler.clone(),
|
||||
listener: WorkerListener::default(),
|
||||
cache_manager: cache_manager.clone(),
|
||||
compaction_memory_manager: compaction_memory_manager.clone(),
|
||||
puffin_manager_factory: puffin_manager_factory.clone(),
|
||||
intermediate_manager: intermediate_manager.clone(),
|
||||
time_provider: time_provider.clone(),
|
||||
@@ -395,17 +381,6 @@ impl WorkerGroup {
|
||||
.write_cache(write_cache)
|
||||
.build(),
|
||||
);
|
||||
let total_memory = get_total_memory_bytes();
|
||||
let total_memory = if total_memory > 0 {
|
||||
total_memory as u64
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let compaction_limit_bytes = config
|
||||
.experimental_compaction_memory_limit
|
||||
.resolve(total_memory);
|
||||
let compaction_memory_manager =
|
||||
Arc::new(CompactionMemoryManager::new(compaction_limit_bytes));
|
||||
let gc_limiter = Arc::new(GcLimiter::new(config.gc.max_concurrent_gc_job));
|
||||
let workers = (0..config.num_workers)
|
||||
.map(|id| {
|
||||
@@ -421,7 +396,6 @@ impl WorkerGroup {
|
||||
purge_scheduler: purge_scheduler.clone(),
|
||||
listener: WorkerListener::new(listener.clone()),
|
||||
cache_manager: cache_manager.clone(),
|
||||
compaction_memory_manager: compaction_memory_manager.clone(),
|
||||
puffin_manager_factory: puffin_manager_factory.clone(),
|
||||
intermediate_manager: intermediate_manager.clone(),
|
||||
time_provider: time_provider.clone(),
|
||||
@@ -508,7 +482,6 @@ struct WorkerStarter<S> {
|
||||
purge_scheduler: SchedulerRef,
|
||||
listener: WorkerListener,
|
||||
cache_manager: CacheManagerRef,
|
||||
compaction_memory_manager: Arc<CompactionMemoryManager>,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
intermediate_manager: IntermediateManager,
|
||||
time_provider: TimeProviderRef,
|
||||
@@ -561,11 +534,9 @@ impl<S: LogStore> WorkerStarter<S> {
|
||||
self.compact_job_pool,
|
||||
sender.clone(),
|
||||
self.cache_manager.clone(),
|
||||
self.config.clone(),
|
||||
self.config,
|
||||
self.listener.clone(),
|
||||
self.plugins.clone(),
|
||||
self.compaction_memory_manager.clone(),
|
||||
self.config.experimental_compaction_on_exhausted,
|
||||
),
|
||||
stalled_requests: StalledRequests::default(),
|
||||
listener: self.listener,
|
||||
|
||||
@@ -36,8 +36,8 @@ use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
|
||||
use datafusion::physical_plan::expressions::{CastExpr as PhyCast, Column as PhyColumn};
|
||||
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
|
||||
use datafusion::physical_plan::{
|
||||
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
|
||||
Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
|
||||
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
|
||||
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
|
||||
};
|
||||
use datafusion::prelude::{Column, Expr};
|
||||
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
|
||||
@@ -180,33 +180,10 @@ impl HistogramFold {
|
||||
.index_of_column_by_name(None, &self.ts_column)
|
||||
.unwrap();
|
||||
|
||||
let tag_columns = exec_input
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(idx, field)| {
|
||||
if idx == le_column_index || idx == field_column_index || idx == ts_column_index {
|
||||
None
|
||||
} else {
|
||||
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut partition_exprs = tag_columns.clone();
|
||||
partition_exprs.push(Arc::new(PhyColumn::new(
|
||||
self.input.schema().field(ts_column_index).name(),
|
||||
ts_column_index,
|
||||
)) as _);
|
||||
|
||||
let output_schema: SchemaRef = self.output_schema.inner().clone();
|
||||
let properties = PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::Hash(
|
||||
partition_exprs.clone(),
|
||||
exec_input.output_partitioning().partition_count(),
|
||||
),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
);
|
||||
@@ -215,8 +192,6 @@ impl HistogramFold {
|
||||
field_column_index,
|
||||
ts_column_index,
|
||||
input: exec_input,
|
||||
tag_columns,
|
||||
partition_exprs,
|
||||
quantile: self.quantile.into(),
|
||||
output_schema,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
@@ -278,9 +253,6 @@ pub struct HistogramFoldExec {
|
||||
/// Index for field column in the schema of input.
|
||||
field_column_index: usize,
|
||||
ts_column_index: usize,
|
||||
/// Tag columns are all columns except `le`, `field` and `ts` columns.
|
||||
tag_columns: Vec<Arc<dyn PhysicalExpr>>,
|
||||
partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
|
||||
quantile: f64,
|
||||
metric: ExecutionPlanMetricsSet,
|
||||
properties: PlanProperties,
|
||||
@@ -297,10 +269,10 @@ impl ExecutionPlan for HistogramFoldExec {
|
||||
|
||||
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
|
||||
let mut cols = self
|
||||
.tag_columns
|
||||
.iter()
|
||||
.tag_col_exprs()
|
||||
.into_iter()
|
||||
.map(|expr| PhysicalSortRequirement {
|
||||
expr: expr.clone(),
|
||||
expr,
|
||||
options: None,
|
||||
})
|
||||
.collect::<Vec<PhysicalSortRequirement>>();
|
||||
@@ -335,7 +307,7 @@ impl ExecutionPlan for HistogramFoldExec {
|
||||
}
|
||||
|
||||
fn required_input_distribution(&self) -> Vec<Distribution> {
|
||||
vec![Distribution::HashPartitioned(self.partition_exprs.clone())]
|
||||
self.input.required_input_distribution()
|
||||
}
|
||||
|
||||
fn maintains_input_order(&self) -> Vec<bool> {
|
||||
@@ -352,27 +324,15 @@ impl ExecutionPlan for HistogramFoldExec {
|
||||
children: Vec<Arc<dyn ExecutionPlan>>,
|
||||
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
|
||||
assert!(!children.is_empty());
|
||||
let new_input = children[0].clone();
|
||||
let properties = PlanProperties::new(
|
||||
EquivalenceProperties::new(self.output_schema.clone()),
|
||||
Partitioning::Hash(
|
||||
self.partition_exprs.clone(),
|
||||
new_input.output_partitioning().partition_count(),
|
||||
),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
);
|
||||
Ok(Arc::new(Self {
|
||||
input: new_input,
|
||||
input: children[0].clone(),
|
||||
metric: self.metric.clone(),
|
||||
le_column_index: self.le_column_index,
|
||||
ts_column_index: self.ts_column_index,
|
||||
tag_columns: self.tag_columns.clone(),
|
||||
partition_exprs: self.partition_exprs.clone(),
|
||||
quantile: self.quantile,
|
||||
output_schema: self.output_schema.clone(),
|
||||
field_column_index: self.field_column_index,
|
||||
properties,
|
||||
properties: self.properties.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -434,6 +394,30 @@ impl ExecutionPlan for HistogramFoldExec {
|
||||
}
|
||||
}
|
||||
|
||||
impl HistogramFoldExec {
|
||||
/// Return all the [PhysicalExpr] of tag columns in order.
|
||||
///
|
||||
/// Tag columns are all columns except `le`, `field` and `ts` columns.
|
||||
pub fn tag_col_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
|
||||
self.input
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(idx, field)| {
|
||||
if idx == self.le_column_index
|
||||
|| idx == self.field_column_index
|
||||
|| idx == self.ts_column_index
|
||||
{
|
||||
None
|
||||
} else {
|
||||
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl DisplayAs for HistogramFoldExec {
|
||||
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match t {
|
||||
@@ -1067,83 +1051,9 @@ mod test {
|
||||
quantile: f64,
|
||||
ts_column_index: usize,
|
||||
) -> Arc<HistogramFoldExec> {
|
||||
let input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
|
||||
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
|
||||
MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(),
|
||||
)));
|
||||
let output_schema: SchemaRef = Arc::new(
|
||||
HistogramFold::convert_schema(&Arc::new(input.schema().to_dfschema().unwrap()), "le")
|
||||
.unwrap()
|
||||
.as_arrow()
|
||||
.clone(),
|
||||
);
|
||||
|
||||
let (tag_columns, partition_exprs, properties) =
|
||||
build_test_plan_properties(&input, output_schema.clone(), ts_column_index);
|
||||
|
||||
Arc::new(HistogramFoldExec {
|
||||
le_column_index: 1,
|
||||
field_column_index: 2,
|
||||
quantile,
|
||||
ts_column_index,
|
||||
input,
|
||||
output_schema,
|
||||
tag_columns,
|
||||
partition_exprs,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
properties,
|
||||
})
|
||||
}
|
||||
|
||||
type PlanPropsResult = (
|
||||
Vec<Arc<dyn PhysicalExpr>>,
|
||||
Vec<Arc<dyn PhysicalExpr>>,
|
||||
PlanProperties,
|
||||
);
|
||||
|
||||
fn build_test_plan_properties(
|
||||
input: &Arc<dyn ExecutionPlan>,
|
||||
output_schema: SchemaRef,
|
||||
ts_column_index: usize,
|
||||
) -> PlanPropsResult {
|
||||
let tag_columns = input
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(idx, field)| {
|
||||
if idx == 1 || idx == 2 || idx == ts_column_index {
|
||||
None
|
||||
} else {
|
||||
Some(Arc::new(PhyColumn::new(field.name(), idx)) as _)
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let partition_exprs = if tag_columns.is_empty() {
|
||||
vec![Arc::new(PhyColumn::new(
|
||||
input.schema().field(ts_column_index).name(),
|
||||
ts_column_index,
|
||||
)) as _]
|
||||
} else {
|
||||
tag_columns.clone()
|
||||
};
|
||||
|
||||
let properties = PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::Hash(
|
||||
partition_exprs.clone(),
|
||||
input.output_partitioning().partition_count(),
|
||||
),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
);
|
||||
|
||||
(tag_columns, partition_exprs, properties)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fold_overall() {
|
||||
let memory_exec: Arc<dyn ExecutionPlan> = Arc::new(prepare_test_data());
|
||||
let output_schema: SchemaRef = Arc::new(
|
||||
HistogramFold::convert_schema(
|
||||
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
|
||||
@@ -1153,17 +1063,50 @@ mod test {
|
||||
.as_arrow()
|
||||
.clone(),
|
||||
);
|
||||
let (tag_columns, partition_exprs, properties) =
|
||||
build_test_plan_properties(&memory_exec, output_schema.clone(), 0);
|
||||
let properties = PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
);
|
||||
|
||||
Arc::new(HistogramFoldExec {
|
||||
le_column_index: 1,
|
||||
field_column_index: 2,
|
||||
quantile,
|
||||
ts_column_index,
|
||||
input: memory_exec,
|
||||
output_schema,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
properties,
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fold_overall() {
|
||||
let memory_exec = Arc::new(prepare_test_data());
|
||||
let output_schema: SchemaRef = Arc::new(
|
||||
HistogramFold::convert_schema(
|
||||
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
|
||||
"le",
|
||||
)
|
||||
.unwrap()
|
||||
.as_arrow()
|
||||
.clone(),
|
||||
);
|
||||
let properties = PlanProperties::new(
|
||||
EquivalenceProperties::new(output_schema.clone()),
|
||||
Partitioning::UnknownPartitioning(1),
|
||||
EmissionType::Incremental,
|
||||
Boundedness::Bounded,
|
||||
);
|
||||
let fold_exec = Arc::new(HistogramFoldExec {
|
||||
le_column_index: 1,
|
||||
field_column_index: 2,
|
||||
quantile: 0.4,
|
||||
ts_column_index: 0,
|
||||
ts_column_index: 9999, // not exist but doesn't matter
|
||||
input: memory_exec,
|
||||
output_schema,
|
||||
tag_columns,
|
||||
partition_exprs,
|
||||
metric: ExecutionPlanMetricsSet::new(),
|
||||
properties,
|
||||
});
|
||||
|
||||
@@ -365,7 +365,7 @@ impl SeriesNormalizeStream {
|
||||
Arc::new(ts_column.clone()) as _
|
||||
} else {
|
||||
Arc::new(TimestampMillisecondArray::from_iter(
|
||||
ts_column.iter().map(|ts| ts.map(|ts| ts + self.offset)),
|
||||
ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
|
||||
))
|
||||
};
|
||||
let mut columns = input.columns().to_vec();
|
||||
@@ -518,11 +518,11 @@ mod test {
|
||||
"+---------------------+--------+------+\
|
||||
\n| timestamp | value | path |\
|
||||
\n+---------------------+--------+------+\
|
||||
\n| 1970-01-01T00:01:01 | 0.0 | foo |\
|
||||
\n| 1970-01-01T00:02:01 | 1.0 | foo |\
|
||||
\n| 1970-01-01T00:00:01 | 10.0 | foo |\
|
||||
\n| 1970-01-01T00:00:31 | 100.0 | foo |\
|
||||
\n| 1970-01-01T00:01:31 | 1000.0 | foo |\
|
||||
\n| 1970-01-01T00:00:59 | 0.0 | foo |\
|
||||
\n| 1970-01-01T00:01:59 | 1.0 | foo |\
|
||||
\n| 1969-12-31T23:59:59 | 10.0 | foo |\
|
||||
\n| 1970-01-01T00:00:29 | 100.0 | foo |\
|
||||
\n| 1970-01-01T00:01:29 | 1000.0 | foo |\
|
||||
\n+---------------------+--------+------+",
|
||||
);
|
||||
|
||||
|
||||
@@ -87,19 +87,11 @@ impl ParallelizeScan {
|
||||
&& order_expr.options.descending
|
||||
{
|
||||
for ranges in partition_ranges.iter_mut() {
|
||||
// Primary: end descending (larger end first)
|
||||
// Secondary: start descending (shorter range first when ends are equal)
|
||||
ranges.sort_by(|a, b| {
|
||||
b.end.cmp(&a.end).then_with(|| b.start.cmp(&a.start))
|
||||
});
|
||||
ranges.sort_by(|a, b| b.end.cmp(&a.end));
|
||||
}
|
||||
} else {
|
||||
for ranges in partition_ranges.iter_mut() {
|
||||
// Primary: start ascending (smaller start first)
|
||||
// Secondary: end ascending (shorter range first when starts are equal)
|
||||
ranges.sort_by(|a, b| {
|
||||
a.start.cmp(&b.start).then_with(|| a.end.cmp(&b.end))
|
||||
});
|
||||
ranges.sort_by(|a, b| a.start.cmp(&b.start));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -110,12 +110,12 @@ impl WindowedSortPhysicalRule {
|
||||
{
|
||||
sort_input
|
||||
} else {
|
||||
Arc::new(PartSortExec::try_new(
|
||||
Arc::new(PartSortExec::new(
|
||||
first_sort_expr.clone(),
|
||||
sort_exec.fetch(),
|
||||
scanner_info.partition_ranges.clone(),
|
||||
sort_input,
|
||||
)?)
|
||||
))
|
||||
};
|
||||
|
||||
let windowed_sort_exec = WindowedSortExec::try_new(
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1414,14 +1414,14 @@ impl PromPlanner {
|
||||
.clone()
|
||||
.gt_eq(DfExpr::Literal(
|
||||
ScalarValue::TimestampMillisecond(
|
||||
Some(self.ctx.start - offset_duration - self.ctx.lookback_delta - range),
|
||||
Some(self.ctx.start + offset_duration - self.ctx.lookback_delta - range),
|
||||
None,
|
||||
),
|
||||
None,
|
||||
))
|
||||
.and(time_index_expr.lt_eq(DfExpr::Literal(
|
||||
ScalarValue::TimestampMillisecond(
|
||||
Some(self.ctx.end - offset_duration + self.ctx.lookback_delta),
|
||||
Some(self.ctx.end + offset_duration + self.ctx.lookback_delta),
|
||||
None,
|
||||
),
|
||||
None,
|
||||
@@ -1437,14 +1437,14 @@ impl PromPlanner {
|
||||
.clone()
|
||||
.gt_eq(DfExpr::Literal(
|
||||
ScalarValue::TimestampMillisecond(
|
||||
Some(timestamp - offset_duration - lookback_delta - range),
|
||||
Some(timestamp + offset_duration - lookback_delta - range),
|
||||
None,
|
||||
),
|
||||
None,
|
||||
))
|
||||
.and(time_index_expr.clone().lt_eq(DfExpr::Literal(
|
||||
ScalarValue::TimestampMillisecond(
|
||||
Some(timestamp - offset_duration + lookback_delta),
|
||||
Some(timestamp + offset_duration + lookback_delta),
|
||||
None,
|
||||
),
|
||||
None,
|
||||
|
||||
@@ -84,31 +84,23 @@ pub struct WindowedSortExec {
|
||||
properties: PlanProperties,
|
||||
}
|
||||
|
||||
/// Checks that partition ranges are sorted correctly for the given sort direction.
|
||||
/// - Descending: sorted by (end DESC, start DESC) - shorter ranges first when ends are equal
|
||||
/// - Ascending: sorted by (start ASC, end ASC) - shorter ranges first when starts are equal
|
||||
pub fn check_partition_range_monotonicity(
|
||||
fn check_partition_range_monotonicity(
|
||||
ranges: &[Vec<PartitionRange>],
|
||||
descending: bool,
|
||||
) -> Result<()> {
|
||||
let is_valid = ranges.iter().all(|r| {
|
||||
if descending {
|
||||
// Primary: end descending, Secondary: start descending (shorter range first)
|
||||
r.windows(2)
|
||||
.all(|w| w[0].end > w[1].end || (w[0].end == w[1].end && w[0].start >= w[1].start))
|
||||
r.windows(2).all(|w| w[0].end >= w[1].end)
|
||||
} else {
|
||||
// Primary: start ascending, Secondary: end ascending (shorter range first)
|
||||
r.windows(2).all(|w| {
|
||||
w[0].start < w[1].start || (w[0].start == w[1].start && w[0].end <= w[1].end)
|
||||
})
|
||||
r.windows(2).all(|w| w[0].start <= w[1].start)
|
||||
}
|
||||
});
|
||||
|
||||
if !is_valid {
|
||||
let msg = if descending {
|
||||
"Input `PartitionRange`s are not sorted by (end DESC, start DESC)"
|
||||
"Input `PartitionRange`s's upper bound is not monotonic non-increase"
|
||||
} else {
|
||||
"Input `PartitionRange`s are not sorted by (start ASC, end ASC)"
|
||||
"Input `PartitionRange`s's lower bound is not monotonic non-decrease"
|
||||
};
|
||||
let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
|
||||
Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
|
||||
@@ -2837,9 +2829,8 @@ mod test {
|
||||
// generate input data
|
||||
for part_id in 0..rng.usize(0..part_cnt_bound) {
|
||||
let (start, end) = if descending {
|
||||
// Use 1..=range_offset_bound to ensure strictly decreasing end values
|
||||
let end = bound_val
|
||||
.map(|i| i - rng.i64(1..=range_offset_bound))
|
||||
.map(|i| i - rng.i64(0..range_offset_bound))
|
||||
.unwrap_or_else(|| rng.i64(..));
|
||||
bound_val = Some(end);
|
||||
let start = end - rng.i64(1..range_size_bound);
|
||||
@@ -2847,9 +2838,8 @@ mod test {
|
||||
let end = Timestamp::new(end, unit.into());
|
||||
(start, end)
|
||||
} else {
|
||||
// Use 1..=range_offset_bound to ensure strictly increasing start values
|
||||
let start = bound_val
|
||||
.map(|i| i + rng.i64(1..=range_offset_bound))
|
||||
.map(|i| i + rng.i64(0..range_offset_bound))
|
||||
.unwrap_or_else(|| rng.i64(..));
|
||||
bound_val = Some(start);
|
||||
let end = start + rng.i64(1..range_size_bound);
|
||||
|
||||
@@ -1495,8 +1495,6 @@ manifest_checkpoint_distance = 10
|
||||
experimental_manifest_keep_removed_file_count = 256
|
||||
experimental_manifest_keep_removed_file_ttl = "1h"
|
||||
compress_manifest = false
|
||||
experimental_compaction_memory_limit = "unlimited"
|
||||
experimental_compaction_on_exhausted = "wait"
|
||||
auto_flush_interval = "30m"
|
||||
enable_write_cache = false
|
||||
write_cache_path = ""
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
-- Minimal repro for histogram quantile over multi-partition input.
|
||||
create table histogram_gap_bucket (
|
||||
ts timestamp time index,
|
||||
le string,
|
||||
shard string,
|
||||
val double,
|
||||
primary key (shard, le)
|
||||
) partition on columns (shard) (
|
||||
shard < 'n',
|
||||
shard >= 'n'
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into histogram_gap_bucket values
|
||||
(0, '0.5', 'a', 1),
|
||||
(0, '1', 'a', 2),
|
||||
(0, '+Inf', 'a', 2),
|
||||
(0, '0.5', 'z', 2),
|
||||
(0, '1', 'z', 4),
|
||||
(0, '+Inf', 'z', 4),
|
||||
(10000, '0.5', 'a', 1),
|
||||
(10000, '1', 'a', 2),
|
||||
(10000, '+Inf', 'a', 2),
|
||||
(10000, '0.5', 'z', 1),
|
||||
(10000, '1', 'z', 3),
|
||||
(10000, '+Inf', 'z', 3);
|
||||
|
||||
Affected Rows: 12
|
||||
|
||||
-- Ensure the physical plan keeps the required repartition/order before folding buckets.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE Hash\(\[ts@1\],.* Hash([ts@1],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[le@0,\sts@1\],.* Hash([le@0, ts@1],REDACTED
|
||||
tql analyze (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
|
||||
|
||||
+-+-+-+
|
||||
| stage | node | plan_|
|
||||
+-+-+-+
|
||||
| 0_| 0_|_HistogramFoldExec: le=@0, field=@2, quantile=0.5 REDACTED
|
||||
|_|_|_SortExec: expr=[ts@1 ASC NULLS LAST, CAST(le@0 AS Float64) ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([ts@1],REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[sum(histogram_gap_bucket.val)] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[le@0 as le, ts@1 as ts], aggr=[sum(histogram_gap_bucket.val)] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED
|
||||
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["shard", "le"] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_AggregateExec: mode=FinalPartitioned, gby=[le@0 as le, ts@1 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=Hash([le@0, ts@1],REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[le@1 as le, ts@0 as ts], aggr=[__sum_state(histogram_gap_bucket.val)] REDACTED
|
||||
|_|_|_ProjectionExec: expr=[ts@0 as ts, le@1 as le, val@3 as val] REDACTED
|
||||
|_|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[10000], time index=[ts] REDACTED
|
||||
|_|_|_PromSeriesDivideExec: tags=["shard", "le"] REDACTED
|
||||
|_|_|_CooperativeExec REDACTED
|
||||
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 2_|
|
||||
+-+-+-+
|
||||
|
||||
-- SQLNESS SORT_RESULT 2 1
|
||||
tql eval (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
|
||||
|
||||
+---------------------+-------------------------------+
|
||||
| ts | sum(histogram_gap_bucket.val) |
|
||||
+---------------------+-------------------------------+
|
||||
| 1970-01-01T00:00:00 | 0.5 |
|
||||
| 1970-01-01T00:00:10 | 0.5833333333333334 |
|
||||
+---------------------+-------------------------------+
|
||||
|
||||
drop table histogram_gap_bucket;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -1,40 +0,0 @@
|
||||
-- Minimal repro for histogram quantile over multi-partition input.
|
||||
create table histogram_gap_bucket (
|
||||
ts timestamp time index,
|
||||
le string,
|
||||
shard string,
|
||||
val double,
|
||||
primary key (shard, le)
|
||||
) partition on columns (shard) (
|
||||
shard < 'n',
|
||||
shard >= 'n'
|
||||
);
|
||||
|
||||
insert into histogram_gap_bucket values
|
||||
(0, '0.5', 'a', 1),
|
||||
(0, '1', 'a', 2),
|
||||
(0, '+Inf', 'a', 2),
|
||||
(0, '0.5', 'z', 2),
|
||||
(0, '1', 'z', 4),
|
||||
(0, '+Inf', 'z', 4),
|
||||
(10000, '0.5', 'a', 1),
|
||||
(10000, '1', 'a', 2),
|
||||
(10000, '+Inf', 'a', 2),
|
||||
(10000, '0.5', 'z', 1),
|
||||
(10000, '1', 'z', 3),
|
||||
(10000, '+Inf', 'z', 3);
|
||||
|
||||
-- Ensure the physical plan keeps the required repartition/order before folding buckets.
|
||||
-- SQLNESS REPLACE (metrics.*) REDACTED
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
|
||||
-- SQLNESS REPLACE (-+) -
|
||||
-- SQLNESS REPLACE (\s\s+) _
|
||||
-- SQLNESS REPLACE Hash\(\[ts@1\],.* Hash([ts@1],REDACTED
|
||||
-- SQLNESS REPLACE Hash\(\[le@0,\sts@1\],.* Hash([le@0, ts@1],REDACTED
|
||||
tql analyze (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
|
||||
|
||||
-- SQLNESS SORT_RESULT 2 1
|
||||
tql eval (0, 10, '10s') histogram_quantile(0.5, sum by (le) (histogram_gap_bucket));
|
||||
|
||||
drop table histogram_gap_bucket;
|
||||
@@ -46,16 +46,6 @@ tql eval (1500, 1500, '1s') calculate_rate_offset_total;
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (1500, 1500, '1s') calculate_rate_offset_total offset 10m;
|
||||
|
||||
+---------------------+------+---+
|
||||
| ts | val | x |
|
||||
+---------------------+------+---+
|
||||
| 1970-01-01T00:25:00 | 30.0 | a |
|
||||
| 1970-01-01T00:25:00 | 60.0 | b |
|
||||
+---------------------+------+---+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (1500, 1500, '1s') calculate_rate_offset_total offset -10m;
|
||||
|
||||
+---------------------+-------+---+
|
||||
| ts | val | x |
|
||||
+---------------------+-------+---+
|
||||
@@ -64,13 +54,17 @@ tql eval (1500, 1500, '1s') calculate_rate_offset_total offset -10m;
|
||||
+---------------------+-------+---+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (0, 0, '1s') calculate_rate_offset_total offset 10m;
|
||||
tql eval (1500, 1500, '1s') calculate_rate_offset_total offset -10m;
|
||||
|
||||
++
|
||||
++
|
||||
+---------------------+------+---+
|
||||
| ts | val | x |
|
||||
+---------------------+------+---+
|
||||
| 1970-01-01T00:25:00 | 30.0 | a |
|
||||
| 1970-01-01T00:25:00 | 60.0 | b |
|
||||
+---------------------+------+---+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (0, 0, '1s') calculate_rate_offset_total offset -10m;
|
||||
tql eval (0, 0, '1s') calculate_rate_offset_total offset 10m;
|
||||
|
||||
+---------------------+------+---+
|
||||
| ts | val | x |
|
||||
@@ -79,9 +73,19 @@ tql eval (0, 0, '1s') calculate_rate_offset_total offset -10m;
|
||||
| 1970-01-01T00:00:00 | 40.0 | b |
|
||||
+---------------------+------+---+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (0, 0, '1s') calculate_rate_offset_total offset -10m;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
tql eval (3000, 3000, '1s') calculate_rate_offset_total offset 10m;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3000, 3000, '1s') calculate_rate_offset_total offset -10m;
|
||||
|
||||
+---------------------+-------+---+
|
||||
| ts | val | x |
|
||||
+---------------------+-------+---+
|
||||
@@ -89,13 +93,6 @@ tql eval (3000, 3000, '1s') calculate_rate_offset_total offset 10m;
|
||||
| 1970-01-01T00:50:00 | 80.0 | a |
|
||||
+---------------------+-------+---+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3000, 3000, '1s') calculate_rate_offset_total offset -10m;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3000, 3000, '1s') rate(calculate_rate_window_total[10m]);
|
||||
|
||||
++
|
||||
|
||||
@@ -42,16 +42,13 @@ tql eval (1500, 1500, '1s') calculate_rate_offset_total offset -10m;
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (0, 0, '1s') calculate_rate_offset_total offset 10m;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (0, 0, '1s') calculate_rate_offset_total offset -10m;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3000, 3000, '1s') calculate_rate_offset_total offset 10m;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3000, 3000, '1s') calculate_rate_offset_total offset -10m;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3000, 3000, '1s') rate(calculate_rate_window_total[10m]);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
-- Regression for offset direction: positive offsets should query past data.
|
||||
create table offset_direction (
|
||||
ts timestamp time index,
|
||||
val double,
|
||||
host string primary key
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into offset_direction values
|
||||
(940000, 10.0, 'a'),
|
||||
(1000000, 20.0, 'a'),
|
||||
(1060000, 30.0, 'a');
|
||||
|
||||
Affected Rows: 3
|
||||
|
||||
tql eval (1000, 1000, '1s') offset_direction;
|
||||
|
||||
+---------------------+------+------+
|
||||
| ts | val | host |
|
||||
+---------------------+------+------+
|
||||
| 1970-01-01T00:16:40 | 20.0 | a |
|
||||
+---------------------+------+------+
|
||||
|
||||
tql eval (1000, 1000, '1s') offset_direction offset 60s;
|
||||
|
||||
+---------------------+------+------+
|
||||
| ts | val | host |
|
||||
+---------------------+------+------+
|
||||
| 1970-01-01T00:16:40 | 10.0 | a |
|
||||
+---------------------+------+------+
|
||||
|
||||
tql eval (1000, 1000, '1s') offset_direction offset -60s;
|
||||
|
||||
+---------------------+------+------+
|
||||
| ts | val | host |
|
||||
+---------------------+------+------+
|
||||
| 1970-01-01T00:16:40 | 30.0 | a |
|
||||
+---------------------+------+------+
|
||||
|
||||
drop table offset_direction;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
-- Regression for offset direction: positive offsets should query past data.
|
||||
|
||||
create table offset_direction (
|
||||
ts timestamp time index,
|
||||
val double,
|
||||
host string primary key
|
||||
);
|
||||
|
||||
insert into offset_direction values
|
||||
(940000, 10.0, 'a'),
|
||||
(1000000, 20.0, 'a'),
|
||||
(1060000, 30.0, 'a');
|
||||
|
||||
tql eval (1000, 1000, '1s') offset_direction;
|
||||
|
||||
tql eval (1000, 1000, '1s') offset_direction offset 60s;
|
||||
|
||||
tql eval (1000, 1000, '1s') offset_direction offset -60s;
|
||||
|
||||
drop table offset_direction;
|
||||
@@ -319,7 +319,6 @@ insert into histogram4_bucket values
|
||||
|
||||
Affected Rows: 7
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
|
||||
|
||||
+---------------------+---+-----+
|
||||
@@ -333,7 +332,6 @@ drop table histogram4_bucket;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fff) (rate(greptime_servers_postgres_query_elapsed_bucket{instance=~"xxx"}[1m])));
|
||||
|
||||
++
|
||||
@@ -397,7 +395,6 @@ insert into histogram5_bucket values
|
||||
|
||||
Affected Rows: 12
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
|
||||
|
||||
+---------------------+---+--------------------+
|
||||
|
||||
@@ -184,12 +184,10 @@ insert into histogram4_bucket values
|
||||
-- INF here is missing
|
||||
;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (2900, 3000, '100s') histogram_quantile(0.9, histogram4_bucket);
|
||||
|
||||
drop table histogram4_bucket;
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval(0, 10, '10s') histogram_quantile(0.99, sum by(pod,instance, fff) (rate(greptime_servers_postgres_query_elapsed_bucket{instance=~"xxx"}[1m])));
|
||||
|
||||
-- test case where table exists but doesn't have 'le' column should raise error
|
||||
@@ -235,7 +233,7 @@ insert into histogram5_bucket values
|
||||
(3015000, "5", "a", 30),
|
||||
(3015000, "+Inf", "a", 50);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
|
||||
tql eval (3000, 3015, '3s') histogram_quantile(0.5, histogram5_bucket);
|
||||
|
||||
drop table histogram5_bucket;
|
||||
|
||||
@@ -676,10 +676,15 @@ WITH time_shifted AS (
|
||||
)
|
||||
SELECT * FROM time_shifted;
|
||||
|
||||
+----+-----+
|
||||
| ts | val |
|
||||
+----+-----+
|
||||
+----+-----+
|
||||
+---------------------+-----+
|
||||
| ts | val |
|
||||
+---------------------+-----+
|
||||
| 1970-01-01T00:00:00 | 3.0 |
|
||||
| 1970-01-01T00:00:10 | 3.0 |
|
||||
| 1970-01-01T00:00:20 | 3.0 |
|
||||
| 1970-01-01T00:00:30 | 3.0 |
|
||||
| 1970-01-01T00:00:40 | 3.0 |
|
||||
+---------------------+-----+
|
||||
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
-- SQLNESS REPLACE (partitioning.*) REDACTED
|
||||
@@ -697,7 +702,7 @@ SELECT * FROM time_shifted;
|
||||
| | PromInstantManipulate: range=[0..40000], lookback=[300000], interval=[10000], time index=[ts] |
|
||||
| | PromSeriesNormalize: offset=[50000], time index=[ts], filter NaN: [false] |
|
||||
| | PromSeriesDivide: tags=[] |
|
||||
| | Filter: metric.ts >= TimestampMillisecond(-350000, None) AND metric.ts <= TimestampMillisecond(290000, None) |
|
||||
| | Filter: metric.ts >= TimestampMillisecond(-250000, None) AND metric.ts <= TimestampMillisecond(390000, None) |
|
||||
| | TableScan: metric |
|
||||
| | ]] |
|
||||
| physical_plan | CooperativeExec |
|
||||
|
||||
Reference in New Issue
Block a user