diff --git a/src/cli/src/metadata/control/get.rs b/src/cli/src/metadata/control/get.rs
index 38a488b53d..1ea713123c 100644
--- a/src/cli/src/metadata/control/get.rs
+++ b/src/cli/src/metadata/control/get.rs
@@ -29,7 +29,7 @@ use futures::TryStreamExt;
 
 use crate::error::InvalidArgumentsSnafu;
 use crate::metadata::common::StoreConfig;
-use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
+use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_formatter};
 use crate::Tool;
 
 /// Getting metadata from metadata store.
@@ -206,7 +206,7 @@ impl Tool for GetTableTool {
             println!(
                 "{}\n{}",
                 TableInfoKey::new(table_id),
-                json_fromatter(self.pretty, &*table_info)
+                json_formatter(self.pretty, &*table_info)
             );
         } else {
             println!("Table info not found");
@@ -221,7 +221,7 @@ impl Tool for GetTableTool {
             println!(
                 "{}\n{}",
                 TableRouteKey::new(table_id),
-                json_fromatter(self.pretty, &table_route)
+                json_formatter(self.pretty, &table_route)
             );
         } else {
             println!("Table route not found");
diff --git a/src/cli/src/metadata/control/utils.rs b/src/cli/src/metadata/control/utils.rs
index f557cb6be3..d563f310a5 100644
--- a/src/cli/src/metadata/control/utils.rs
+++ b/src/cli/src/metadata/control/utils.rs
@@ -27,7 +27,7 @@ pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
 }
 
 /// Formats a value as a JSON string.
-pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
+pub fn json_formatter<T>(pretty: bool, value: &T) -> String
 where
     T: Serialize,
 {
diff --git a/src/common/telemetry/src/tracing_context.rs b/src/common/telemetry/src/tracing_context.rs
index bf9c3dd916..f103d48c69 100644
--- a/src/common/telemetry/src/tracing_context.rs
+++ b/src/common/telemetry/src/tracing_context.rs
@@ -19,7 +19,7 @@ use opentelemetry::propagation::TextMapPropagator;
 use opentelemetry_sdk::propagation::TraceContextPropagator;
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 
-// An wapper for `Futures` that provides tracing instrument adapters.
+// A wrapper for `Futures` that provides tracing instrument adapters.
 pub trait FutureExt: std::future::Future + Sized {
     fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self>;
 }
diff --git a/src/flow/src/plan/join.rs b/src/flow/src/plan/join.rs
index 1a437dd00d..411b00ee02 100644
--- a/src/flow/src/plan/join.rs
+++ b/src/flow/src/plan/join.rs
@@ -71,6 +71,6 @@ pub struct LinearStagePlan {
     /// The key expressions to use for the lookup relation.
     pub lookup_key: Vec<ScalarExpr>,
     /// The closure to apply to the concatenation of the key columns,
-    /// the stream value columns, and the lookup value colunms.
+    /// the stream value columns, and the lookup value columns.
    pub closure: JoinFilter,
 }
diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs
index ebf2a6a167..cf7e953062 100644
--- a/src/meta-client/src/client.rs
+++ b/src/meta-client/src/client.rs
@@ -525,7 +525,7 @@ impl MetaClient {
         self.heartbeat_client()?.ask_leader().await
     }
 
-    /// Returns a heartbeat bidirectional streaming: (sender, recever), the
+    /// Returns a bidirectional heartbeat stream: (sender, receiver); the
     /// other end is the leader of `metasrv`.
     ///
     /// The `datanode` needs to use the sender to continuously send heartbeat
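Review note on the `json_fromatter` → `json_formatter` rename: the patch only touches the identifier. For reference, here is a minimal sketch of a helper matching the signature shown in utils.rs; the body below is an assumption for illustration, not code from this patch:

```rust
use serde::Serialize;

/// Formats a value as a JSON string: pretty-printed when `pretty` is true.
/// Sketch only; the real body lives in src/cli/src/metadata/control/utils.rs.
pub fn json_formatter<T>(pretty: bool, value: &T) -> String
where
    T: Serialize,
{
    let result = if pretty {
        serde_json::to_string_pretty(value)
    } else {
        serde_json::to_string(value)
    };
    // Degrade to an error string rather than panicking on serialization failure.
    result.unwrap_or_else(|e| format!("<failed to serialize: {e}>"))
}
```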
diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs
index e36e1bfacd..d9a9405cad 100644
--- a/src/meta-srv/src/election/rds/mysql.rs
+++ b/src/meta-srv/src/election/rds/mysql.rs
@@ -1190,7 +1190,7 @@ mod tests {
            ));
            handles.push(handle);
        }
-        // Wait for candidates to registrate themselves and renew their leases at least once.
+        // Wait for candidates to register themselves and renew their leases at least once.
        tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;
 
        let (tx, _) = broadcast::channel(100);
diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs
index 7caa3a249b..e216d987fc 100644
--- a/src/meta-srv/src/election/rds/postgres.rs
+++ b/src/meta-srv/src/election/rds/postgres.rs
@@ -1012,7 +1012,7 @@ mod tests {
            ));
            handles.push(handle);
        }
-        // Wait for candidates to registrate themselves and renew their leases at least once.
+        // Wait for candidates to register themselves and renew their leases at least once.
        tokio::time::sleep(Duration::from_secs(3)).await;
 
        let (tx, _) = broadcast::channel(100);
diff --git a/src/mito2/src/compaction/run.rs b/src/mito2/src/compaction/run.rs
index cb73aa4607..c148f57585 100644
--- a/src/mito2/src/compaction/run.rs
+++ b/src/mito2/src/compaction/run.rs
@@ -163,6 +163,10 @@ impl FileGroup {
         self.files.push(file);
     }
 
+    pub(crate) fn num_files(&self) -> usize {
+        self.files.len()
+    }
+
     #[cfg(test)]
     pub(crate) fn files(&self) -> &[FileHandle] {
         &self.files[..]
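Review note: the twcs.rs change below reads an optional override from the `TWCS_MAX_INPUT_FILE_NUM` environment variable. A self-contained sketch of the same parse-with-fallback shape, with `eprintln!` standing in for the crate's telemetry macros (assumed stand-in, not the patch's code):

```rust
use std::str::FromStr;

const DEFAULT_MAX: usize = 32;

/// Parse an optional usize override from the environment, falling back to
/// `DEFAULT_MAX` when the variable is absent or unparseable.
fn max_from_env(var: &str) -> usize {
    std::env::var(var)
        .ok()
        .and_then(|v| {
            usize::from_str(&v)
                // Log and discard malformed values instead of failing.
                .inspect_err(|_| eprintln!("Cannot recognize value for {var}: `{v}`"))
                .ok()
        })
        .unwrap_or(DEFAULT_MAX)
}

fn main() {
    // Unset (or malformed) values fall back to the default.
    assert_eq!(max_from_env("TWCS_MAX_INPUT_FILE_NUM_EXAMPLE"), DEFAULT_MAX);
}
```

Absent or malformed values silently fall back to the default, so the variable is safe to leave unset.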
diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs
index 0ec7a542dd..c37a5375bd 100644
--- a/src/mito2/src/compaction/twcs.rs
+++ b/src/mito2/src/compaction/twcs.rs
@@ -16,9 +16,11 @@ use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
 use std::num::NonZeroU64;
+use std::str::FromStr;
 
 use common_base::readable_size::ReadableSize;
-use common_telemetry::info;
+use common_telemetry::tracing::warn;
+use common_telemetry::{debug, info};
 use common_time::timestamp::TimeUnit;
 use common_time::timestamp_millis::BucketAligned;
 use common_time::Timestamp;
@@ -36,6 +38,9 @@ use crate::sst::version::LevelMeta;
 
 const LEVEL_COMPACTED: Level = 1;
 
+/// Default value for max compaction input file num.
+const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
+
 /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
 /// candidates.
 #[derive(Debug)]
@@ -58,6 +63,17 @@ impl TwcsPicker {
         time_windows: &mut BTreeMap<i64, Window>,
         active_window: Option<i64>,
     ) -> Vec<CompactionOutput> {
+        let max_input_file_num = std::env::var("TWCS_MAX_INPUT_FILE_NUM")
+            .ok()
+            .and_then(|v| {
+                usize::from_str(&v)
+                    .inspect_err(|_| {
+                        warn!("Cannot recognize value for TWCS_MAX_INPUT_FILE_NUM: `{v}`");
+                    })
+                    .ok()
+            })
+            .unwrap_or(DEFAULT_MAX_INPUT_FILE_NUM);
+        debug!("Max compaction input file num: {}", max_input_file_num);
         let mut output = vec![];
         for (window, files) in time_windows {
             if files.files.is_empty() {
@@ -66,20 +82,20 @@ impl TwcsPicker {
             let mut files_to_merge: Vec<_> = files.files().cloned().collect();
 
             // Filter out large files in append mode - they won't benefit from compaction
-            if self.append_mode {
-                if let Some(max_size) = self.max_output_file_size {
-                    let (kept_files, ignored_files) = files_to_merge
-                        .into_iter()
-                        .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
-                    files_to_merge = kept_files;
-                    info!(
-                        "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
-                        ignored_files.len(),
-                        region_id,
-                        window,
-                        max_size
-                    );
-                }
+            if self.append_mode
+                && let Some(max_size) = self.max_output_file_size
+            {
+                let (kept_files, ignored_files) = files_to_merge
+                    .into_iter()
+                    .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
+                files_to_merge = kept_files;
+                info!(
+                    "Compaction for region {} skipped {} large files in append mode, window {}, max_size: {}",
+                    region_id,
+                    ignored_files.len(),
+                    window,
+                    max_size
+                );
             }
 
             let sorted_runs = find_sorted_runs(&mut files_to_merge);
@@ -91,7 +107,7 @@ impl TwcsPicker {
                 return output;
             }
 
-            let inputs = if found_runs > 1 {
+            let mut inputs = if found_runs > 1 {
                 reduce_runs(sorted_runs)
             } else {
                 let run = sorted_runs.last().unwrap();
@@ -101,8 +117,27 @@ impl TwcsPicker {
                 // no overlapping files, try merge small files
                 merge_seq_files(run.items(), self.max_output_file_size)
             };
-
-            if !inputs.is_empty() {
+            let total_input_files: usize = inputs.iter().map(|g| g.num_files()).sum();
+            if total_input_files > max_input_file_num {
+                let mut num_picked_files = 0;
+                inputs = inputs
+                    .into_iter()
+                    .take_while(|g| {
+                        let current_group_file_num = g.num_files();
+                        if current_group_file_num + num_picked_files <= max_input_file_num {
+                            num_picked_files += current_group_file_num;
+                            true
+                        } else {
+                            false
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                info!(
+                    "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
+                    region_id, max_input_file_num, total_input_files, inputs
+                );
+            }
+            if inputs.len() > 1 {
                 log_pick_result(
                     region_id,
                     *window,
diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs
index b5f43a9984..2c47879fc3 100644
--- a/src/mito2/src/manifest/manager.rs
+++ b/src/mito2/src/manifest/manager.rs
@@ -76,7 +76,7 @@ pub struct RegionManifestOptions {
 ///     -RegionMetadataRef metadata
 /// }
 /// class RegionEdit {
-///     -VersionNumber regoin_version
+///     -VersionNumber region_version
 ///     -Vec~FileMeta~ files_to_add
 ///     -Vec~FileMeta~ files_to_remove
 ///     -SequenceNumber flushed_sequence
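Review note: `num_files()` (added to `FileGroup` earlier in this patch) feeds the cap above. A standalone sketch of the truncation logic, using a hypothetical `Group` type in place of mito2's `FileGroup`:

```rust
// Hypothetical stand-in for mito2's FileGroup, just to illustrate the cap logic.
struct Group {
    files: Vec<u64>, // file ids
}

impl Group {
    fn num_files(&self) -> usize {
        self.files.len()
    }
}

/// Keep whole groups from the front until the next group would exceed `max`;
/// mirrors the take_while added to TwcsPicker::build_output.
fn enforce_cap(groups: Vec<Group>, max: usize) -> Vec<Group> {
    let mut picked = 0;
    groups
        .into_iter()
        .take_while(|g| {
            if picked + g.num_files() <= max {
                picked += g.num_files();
                true
            } else {
                false
            }
        })
        .collect()
}

fn main() {
    let groups = vec![
        Group { files: vec![1, 2] },
        Group { files: vec![3, 4, 5] },
        Group { files: vec![6] },
    ];
    // With max = 4, only the first group (2 files) fits in full; the 3-file
    // group would push the total to 5, so truncation stops there.
    assert_eq!(enforce_cap(groups, 4).len(), 1);
}
```

Like the patch's `take_while`, this stops at the first group that does not fit, even if a later, smaller group would still fit under the cap.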
diff --git a/src/promql/src/extension_plan/scalar_calculate.rs b/src/promql/src/extension_plan/scalar_calculate.rs
index 86363cc154..e29aadebab 100644
--- a/src/promql/src/extension_plan/scalar_calculate.rs
+++ b/src/promql/src/extension_plan/scalar_calculate.rs
@@ -70,7 +70,7 @@ impl ScalarCalculate {
         interval: Millisecond,
         input: LogicalPlan,
         time_index: &str,
-        tag_colunms: &[String],
+        tag_columns: &[String],
         field_column: &str,
         table_name: Option<&str>,
     ) -> Result<Self> {
@@ -97,7 +97,7 @@ impl ScalarCalculate {
             end,
             interval,
             time_index: time_index.to_string(),
-            tag_columns: tag_colunms.to_vec(),
+            tag_columns: tag_columns.to_vec(),
             field_column: field_column.to_string(),
             input,
             output_schema: Arc::new(schema),
diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs
index 1d79cc204a..a7cc939993 100644
--- a/src/query/src/dist_plan/planner.rs
+++ b/src/query/src/dist_plan/planner.rs
@@ -82,7 +82,7 @@ impl ExtensionPlanner for MergeSortExtensionPlanner {
         // and we only need to do a merge sort, otherwise fallback to quick sort
         let can_merge_sort = partition_cnt >= region_cnt;
         if can_merge_sort {
-            // TODO(discord9): use `SortPreversingMergeExec here`
+            // TODO(discord9): use `SortPreservingMergeExec` here
         }
         // for now merge sort only exist in logical plan, and have the same effect as `Sort`
         // doesn't change the execution plan, this will change in the future
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 0400e92140..763d8d143a 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -352,7 +352,7 @@ async fn dryrun_pipeline_inner(
     )
     .await?;
 
-    let colume_type_key = "colume_type";
+    let column_type_key = "column_type";
     let data_type_key = "data_type";
     let name_key = "name";
 
@@ -376,7 +376,7 @@ async fn dryrun_pipeline_inner(
                 JsonValue::String(cs.datatype().as_str_name().to_string()),
             );
             map.insert(
-                colume_type_key.to_string(),
+                column_type_key.to_string(),
                 JsonValue::String(cs.semantic_type().as_str_name().to_string()),
             );
             map.insert(
@@ -409,7 +409,7 @@ async fn dryrun_pipeline_inner(
                 );
                 map.insert(
                     "semantic_type".to_string(),
-                    schema[idx][colume_type_key].clone(),
+                    schema[idx][column_type_key].clone(),
                 );
                 map.insert(
                     "data_type".to_string(),
diff --git a/src/sql/src/statements/truncate.rs b/src/sql/src/statements/truncate.rs
index 710b5f72df..8a0673a32b 100644
--- a/src/sql/src/statements/truncate.rs
+++ b/src/sql/src/statements/truncate.rs
@@ -51,7 +51,7 @@ mod tests {
     use crate::statements::statement::Statement;
 
     #[test]
-    fn test_display_for_tuncate_table() {
+    fn test_display_for_truncate_table() {
         let sql = r"truncate table t1;";
         let stmts: Vec<Statement> =
             ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index f994706e37..19226c0b14 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -3174,37 +3174,37 @@ transform:
 
     let dryrun_schema = json!([
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "INT32",
             "fulltext": false,
             "name": "id1"
         },
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "INT32",
             "fulltext": false,
             "name": "id2"
         },
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "STRING",
             "fulltext": false,
             "name": "type"
         },
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "STRING",
             "fulltext": false,
             "name": "log"
         },
         {
-            "colume_type": "FIELD",
+            "column_type": "FIELD",
             "data_type": "STRING",
             "fulltext": false,
             "name": "logger"
         },
         {
-            "colume_type": "TIMESTAMP",
+            "column_type": "TIMESTAMP",
             "data_type": "TIMESTAMP_NANOSECOND",
             "fulltext": false,
             "name": "time"
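Review note: renaming the dryrun response key from `colume_type` to `column_type` is a client-visible change. A hedged sketch of consumer-side handling that tolerates both spellings during a mixed-version rollout (hypothetical compatibility shim, not part of this patch):

```rust
use serde_json::{json, Value};

/// Extract the semantic type of one schema column from a dryrun response.
fn semantic_type_of(column: &Value) -> Option<&str> {
    // Prefer the new key, but accept the pre-rename spelling so the client
    // works against servers from before and after this patch.
    column
        .get("column_type")
        .or_else(|| column.get("colume_type"))
        .and_then(Value::as_str)
}

fn main() {
    let column = json!({
        "column_type": "FIELD",
        "data_type": "INT32",
        "fulltext": false,
        "name": "id1"
    });
    assert_eq!(semantic_type_of(&column), Some("FIELD"));
}
```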