From 02af2cb3cd5d6f0da1d7ebcb99f7d817278bfdf8 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Mon, 20 Oct 2025 20:28:14 +0800
Subject: [PATCH] fix: limit compaction input num 0.15 (#7117)

* fix/limit-compaction-input-num-0.15:
  **Add File Count Limit and Debug Logging in Compaction Process**

  - **`run.rs`**: Introduced a new method `num_files` in `FileGroup` to count
    files. This aids in managing file limits during compaction.
  - **`twcs.rs`**: Implemented an environment variable `TWCS_MAX_INPUT_FILE_NUM`
    to limit the number of input files during compaction. Added debug logging
    to track the maximum compaction file number and info logging to enforce the
    file limit. Enhanced logic to skip large files in append mode and adjusted
    the compaction process to respect the new file count limit.

Signed-off-by: Lei, HUANG

* fix/limit-compaction-input-num-0.15:
  **Enhancements in `twcs.rs`**

  - Introduced a default value for `max_input_file_num` with the
    `DEFAULT_MAX_INPUT_FILE_NUM` constant set to 32.
  - Added error handling for the environment variable `TWCS_MAX_INPUT_FILE_NUM`,
    using `warn` to log unrecognized values.
  - Improved logging in `TwcsPicker` to include the current total input files
    when enforcing the max input file number limit.

Signed-off-by: Lei, HUANG

* fix: typo

Signed-off-by: Lei, HUANG

---------

Signed-off-by: Lei, HUANG
---
 src/cli/src/metadata/control/get.rs           |  6 +-
 src/cli/src/metadata/control/utils.rs         |  2 +-
 src/common/telemetry/src/tracing_context.rs   |  2 +-
 src/flow/src/plan/join.rs                     |  2 +-
 src/meta-client/src/client.rs                 |  2 +-
 src/meta-srv/src/election/rds/mysql.rs        |  2 +-
 src/meta-srv/src/election/rds/postgres.rs     |  2 +-
 src/mito2/src/compaction/run.rs               |  4 ++
 src/mito2/src/compaction/twcs.rs              | 72 ++++++++++++++-----
 src/mito2/src/manifest/manager.rs             |  2 +-
 .../src/extension_plan/scalar_calculate.rs    |  4 +-
 src/query/src/dist_plan/planner.rs            |  2 +-
 src/servers/src/http/event.rs                 |  6 +-
 src/sql/src/statements/truncate.rs            |  2 +-
 tests-integration/tests/http.rs               | 12 ++--
 15 files changed, 81 insertions(+), 41 deletions(-)
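
Note: the picker reads `TWCS_MAX_INPUT_FILE_NUM` on every compaction pick. When the
variable is unset, the default `DEFAULT_MAX_INPUT_FILE_NUM` (32) is used; when it is
set but cannot be parsed as a `usize`, a warning is logged and the default is used as
well. To override the limit, export the variable in the environment of the datanode
process before startup, for example (illustrative only; the exact launch command
depends on the deployment):

    TWCS_MAX_INPUT_FILE_NUM=16 ./greptime datanode start ...
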
diff --git a/src/cli/src/metadata/control/get.rs b/src/cli/src/metadata/control/get.rs
index 38a488b53d..1ea713123c 100644
--- a/src/cli/src/metadata/control/get.rs
+++ b/src/cli/src/metadata/control/get.rs
@@ -29,7 +29,7 @@ use futures::TryStreamExt;
 
 use crate::error::InvalidArgumentsSnafu;
 use crate::metadata::common::StoreConfig;
-use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
+use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_formatter};
 use crate::Tool;
 
 /// Getting metadata from metadata store.
@@ -206,7 +206,7 @@ impl Tool for GetTableTool {
             println!(
                 "{}\n{}",
                 TableInfoKey::new(table_id),
-                json_fromatter(self.pretty, &*table_info)
+                json_formatter(self.pretty, &*table_info)
             );
         } else {
             println!("Table info not found");
@@ -221,7 +221,7 @@ impl Tool for GetTableTool {
             println!(
                 "{}\n{}",
                 TableRouteKey::new(table_id),
-                json_fromatter(self.pretty, &table_route)
+                json_formatter(self.pretty, &table_route)
             );
         } else {
             println!("Table route not found");
diff --git a/src/cli/src/metadata/control/utils.rs b/src/cli/src/metadata/control/utils.rs
index f557cb6be3..d563f310a5 100644
--- a/src/cli/src/metadata/control/utils.rs
+++ b/src/cli/src/metadata/control/utils.rs
@@ -27,7 +27,7 @@ pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
 }
 
 /// Formats a value as a JSON string.
-pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
+pub fn json_formatter<T>(pretty: bool, value: &T) -> String
 where
     T: Serialize,
 {
diff --git a/src/common/telemetry/src/tracing_context.rs b/src/common/telemetry/src/tracing_context.rs
index bf9c3dd916..f103d48c69 100644
--- a/src/common/telemetry/src/tracing_context.rs
+++ b/src/common/telemetry/src/tracing_context.rs
@@ -19,7 +19,7 @@ use opentelemetry::propagation::TextMapPropagator;
 use opentelemetry_sdk::propagation::TraceContextPropagator;
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 
-// An wapper for `Futures` that provides tracing instrument adapters.
+// A wrapper for `Futures` that provides tracing instrument adapters.
 pub trait FutureExt: std::future::Future + Sized {
     fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented;
 }
diff --git a/src/flow/src/plan/join.rs b/src/flow/src/plan/join.rs
index 1a437dd00d..411b00ee02 100644
--- a/src/flow/src/plan/join.rs
+++ b/src/flow/src/plan/join.rs
@@ -71,6 +71,6 @@ pub struct LinearStagePlan {
     /// The key expressions to use for the lookup relation.
     pub lookup_key: Vec,
     /// The closure to apply to the concatenation of the key columns,
-    /// the stream value columns, and the lookup value colunms.
+    /// the stream value columns, and the lookup value columns.
     pub closure: JoinFilter,
 }
diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs
index ebf2a6a167..cf7e953062 100644
--- a/src/meta-client/src/client.rs
+++ b/src/meta-client/src/client.rs
@@ -525,7 +525,7 @@ impl MetaClient {
         self.heartbeat_client()?.ask_leader().await
     }
 
-    /// Returns a heartbeat bidirectional streaming: (sender, recever), the
+    /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
     /// other end is the leader of `metasrv`.
     ///
     /// The `datanode` needs to use the sender to continuously send heartbeat
diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs
index e36e1bfacd..d9a9405cad 100644
--- a/src/meta-srv/src/election/rds/mysql.rs
+++ b/src/meta-srv/src/election/rds/mysql.rs
@@ -1190,7 +1190,7 @@ mod tests {
             ));
             handles.push(handle);
         }
-        // Wait for candidates to registrate themselves and renew their leases at least once.
+        // Wait for candidates to register themselves and renew their leases at least once.
         tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;
 
         let (tx, _) = broadcast::channel(100);
diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs
index 7caa3a249b..e216d987fc 100644
--- a/src/meta-srv/src/election/rds/postgres.rs
+++ b/src/meta-srv/src/election/rds/postgres.rs
@@ -1012,7 +1012,7 @@ mod tests {
             ));
             handles.push(handle);
         }
-        // Wait for candidates to registrate themselves and renew their leases at least once.
+        // Wait for candidates to register themselves and renew their leases at least once.
         tokio::time::sleep(Duration::from_secs(3)).await;
 
         let (tx, _) = broadcast::channel(100);
diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs
index cb73aa4607..c148f57585 100644
--- a/src/mito2/src/compaction/run.rs
+++ b/src/mito2/src/compaction/run.rs
@@ -163,6 +163,10 @@ impl FileGroup {
         self.files.push(file);
     }
 
+    pub(crate) fn num_files(&self) -> usize {
+        self.files.len()
+    }
+
     #[cfg(test)]
     pub(crate) fn files(&self) -> &[FileHandle] {
         &self.files[..]
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index 0ec7a542dd..c37a5375bd 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -16,9 +16,11 @@ use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
 use std::num::NonZeroU64;
+use std::str::FromStr;
 
 use common_base::readable_size::ReadableSize;
-use common_telemetry::info;
+use common_telemetry::tracing::warn;
+use common_telemetry::{debug, info};
 use common_time::timestamp::TimeUnit;
 use common_time::timestamp_millis::BucketAligned;
 use common_time::Timestamp;
@@ -36,6 +38,9 @@ use crate::sst::version::LevelMeta;
 
 const LEVEL_COMPACTED: Level = 1;
 
+/// Default value for max compaction input file num.
+const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
+
 /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
 /// candidates.
 #[derive(Debug)]
@@ -58,6 +63,17 @@ impl TwcsPicker {
         time_windows: &mut BTreeMap,
         active_window: Option,
     ) -> Vec {
+        let max_input_file_num = std::env::var("TWCS_MAX_INPUT_FILE_NUM")
+            .ok()
+            .and_then(|v| {
+                usize::from_str(&v)
+                    .inspect_err(|_| {
+                        warn!("Cannot recognize value for TWCS_MAX_INPUT_FILE_NUM: `{v}`");
+                    })
+                    .ok()
+            })
+            .unwrap_or(DEFAULT_MAX_INPUT_FILE_NUM);
+        debug!("Max compaction file num from env: {:?}", max_input_file_num);
         let mut output = vec![];
         for (window, files) in time_windows {
             if files.files.is_empty() {
@@ -66,20 +82,21 @@ impl TwcsPicker {
             let mut files_to_merge: Vec<_> = files.files().cloned().collect();
 
             // Filter out large files in append mode - they won't benefit from compaction
-            if self.append_mode {
-                if let Some(max_size) = self.max_output_file_size {
-                    let (kept_files, ignored_files) = files_to_merge
-                        .into_iter()
-                        .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
-                    files_to_merge = kept_files;
-                    info!(
-                        "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
-                        ignored_files.len(),
-                        region_id,
-                        window,
-                        max_size
-                    );
-                }
+            if self.append_mode
+                && let Some(max_size) = self.max_output_file_size
+            {
+                let (kept_files, ignored_files) = files_to_merge
+                    .into_iter()
+                    .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
+                files_to_merge = kept_files;
+                info!(
+                    "Compaction for {} skipped {} large files in append mode for region {}, window {}, max_size: {}",
+                    region_id,
+                    ignored_files.len(),
+                    region_id,
+                    window,
+                    max_size
+                );
             }
 
             let sorted_runs = find_sorted_runs(&mut files_to_merge);
@@ -91,7 +108,7 @@ impl TwcsPicker {
                 return output;
             }
 
-            let inputs = if found_runs > 1 {
+            let mut inputs = if found_runs > 1 {
                 reduce_runs(sorted_runs)
             } else {
                 let run = sorted_runs.last().unwrap();
@@ -101,8 +118,27 @@ impl TwcsPicker {
                 // no overlapping files, try merge small files
                 merge_seq_files(run.items(), self.max_output_file_size)
             };
-
-            if !inputs.is_empty() {
+            let total_input_files: usize = inputs.iter().map(|g| g.num_files()).sum();
+            if total_input_files > max_input_file_num {
+                let mut num_picked_files = 0;
+                inputs = inputs
+                    .into_iter()
+                    .take_while(|g| {
+                        let current_group_file_num = g.num_files();
+                        if current_group_file_num + num_picked_files <= max_input_file_num {
+                            num_picked_files += current_group_file_num;
+                            true
+                        } else {
+                            false
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                info!(
+                    "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
+                    region_id, max_input_file_num, total_input_files, inputs
+                );
+            }
+            if inputs.len() > 1 {
                 log_pick_result(
                     region_id,
                     *window,
diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs
index b5f43a9984..2c47879fc3 100644
--- a/src/mito2/src/manifest/manager.rs
+++ b/src/mito2/src/manifest/manager.rs
@@ -76,7 +76,7 @@ pub struct RegionManifestOptions {
 ///     -RegionMetadataRef metadata
 /// }
 /// class RegionEdit {
-///     -VersionNumber regoin_version
+///     -VersionNumber region_version
 ///     -Vec~FileMeta~ files_to_add
 ///     -Vec~FileMeta~ files_to_remove
 ///     -SequenceNumber flushed_sequence
diff --git a/src/promql/src/extension_plan/scalar_calculate.rs b/src/promql/src/extension_plan/scalar_calculate.rs
index 86363cc154..e29aadebab 100644
--- a/src/promql/src/extension_plan/scalar_calculate.rs
+++ b/src/promql/src/extension_plan/scalar_calculate.rs
@@ -70,7 +70,7 @@ impl ScalarCalculate {
         interval: Millisecond,
         input: LogicalPlan,
         time_index: &str,
-        tag_colunms: &[String],
+        tag_columns: &[String],
         field_column: &str,
         table_name: Option<&str>,
     ) -> Result {
@@ -97,7 +97,7 @@ impl ScalarCalculate {
             end,
             interval,
             time_index: time_index.to_string(),
-            tag_columns: tag_colunms.to_vec(),
+            tag_columns: tag_columns.to_vec(),
             field_column: field_column.to_string(),
             input,
             output_schema: Arc::new(schema),
diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs
index 1d79cc204a..a7cc939993 100644
--- a/src/query/src/dist_plan/planner.rs
+++ b/src/query/src/dist_plan/planner.rs
@@ -82,7 +82,7 @@ impl ExtensionPlanner for MergeSortExtensionPlanner {
         // and we only need to do a merge sort, otherwise fallback to quick sort
         let can_merge_sort = partition_cnt >= region_cnt;
         if can_merge_sort {
-            // TODO(discord9): use `SortPreversingMergeExec here`
+            // TODO(discord9): use `SortPreservingMergeExec here`
         }
         // for now merge sort only exist in logical plan, and have the same effect as `Sort`
         // doesn't change the execution plan, this will change in the future
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 0400e92140..763d8d143a 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -352,7 +352,7 @@ async fn dryrun_pipeline_inner(
     )
     .await?;
 
-    let colume_type_key = "colume_type";
+    let column_type_key = "column_type";
     let data_type_key = "data_type";
     let name_key = "name";
 
@@ -376,7 +376,7 @@ async fn dryrun_pipeline_inner(
                     JsonValue::String(cs.datatype().as_str_name().to_string()),
                 );
                 map.insert(
-                    colume_type_key.to_string(),
+                    column_type_key.to_string(),
                     JsonValue::String(cs.semantic_type().as_str_name().to_string()),
                 );
                 map.insert(
@@ -409,7 +409,7 @@ async fn dryrun_pipeline_inner(
                 );
                 map.insert(
                     "semantic_type".to_string(),
-                    schema[idx][colume_type_key].clone(),
+                    schema[idx][column_type_key].clone(),
                 );
                 map.insert(
                     "data_type".to_string(),
diff --git a/src/sql/src/statements/truncate.rs b/src/sql/src/statements/truncate.rs
index 710b5f72df..8a0673a32b 100644
--- a/src/sql/src/statements/truncate.rs
+++ b/src/sql/src/statements/truncate.rs
@@ -51,7 +51,7 @@ mod tests {
     use crate::statements::statement::Statement;
 
     #[test]
-    fn test_display_for_tuncate_table() {
+    fn test_display_for_truncate_table() {
         let sql = r"truncate table t1;";
         let stmts: Vec =
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index f994706e37..19226c0b14 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -3174,37 +3174,37 @@ transform:
     let dryrun_schema = json!([
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "INT32",
             "fulltext": false,
             "name": "id1"
         },
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "INT32",
             "fulltext": false,
             "name": "id2"
         },
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "STRING",
             "fulltext": false,
             "name": "type"
         },
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "STRING",
             "fulltext": false,
             "name": "log"
         },
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "STRING",
             "fulltext": false,
             "name": "logger"
         },
         {
-            "colume_type": "TIMESTAMP",
+            "column_type": "TIMESTAMP",
             "data_type": "TIMESTAMP_NANOSECOND",
             "fulltext": false,
             "name": "time"