fix: limit compaction input num 0.15 (#7117)

* fix/limit-compaction-input-num-0.15:
 **Add File Count Limit and Debug Logging in Compaction Process**

 - **`run.rs`**: Added a `num_files` method to `FileGroup` that returns the number of files in the group, used to enforce the input file limit during compaction.
 - **`twcs.rs`**: Introduced the `TWCS_MAX_INPUT_FILE_NUM` environment variable to cap the number of input files per compaction. Added debug logging for the resolved limit and info logging when the limit is enforced. Reworked the append-mode logic that skips large files and adjusted file selection so that the picked groups respect the new file count limit (a sketch of the environment-variable handling follows this list).
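
 For reference, a minimal standalone sketch of how such an environment-variable override can be read, with a fallback default and a warning on unparsable values. The constant and variable names mirror the patch; the free function `max_input_file_num` and the use of `eprintln!` in place of the crate's logging macros are illustrative only:

```rust
use std::str::FromStr;

/// Default used when the environment variable is unset or invalid (mirrors the patch).
const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;

/// Illustrative helper: resolve the limit from `TWCS_MAX_INPUT_FILE_NUM`.
fn max_input_file_num() -> usize {
    std::env::var("TWCS_MAX_INPUT_FILE_NUM")
        .ok()
        .and_then(|v| {
            usize::from_str(&v)
                .inspect_err(|_| {
                    // Unrecognized values are reported and then ignored.
                    eprintln!("Cannot recognize value for TWCS_MAX_INPUT_FILE_NUM: `{v}`");
                })
                .ok()
        })
        // Fall back to the default when the variable is unset or unparsable.
        .unwrap_or(DEFAULT_MAX_INPUT_FILE_NUM)
}

fn main() {
    println!("max compaction input files: {}", max_input_file_num());
}
```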

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/limit-compaction-input-num-0.15:
 **Enhancements in `twcs.rs`**

 - Introduced a `DEFAULT_MAX_INPUT_FILE_NUM` constant, set to 32, as the default for `max_input_file_num`.
 - Added error handling for the `TWCS_MAX_INPUT_FILE_NUM` environment variable: unrecognized values are logged with `warn` and the default is used instead.
 - Improved logging in `TwcsPicker` to include the current total number of input files when the max input file num limit is enforced (see the sketch after this list).
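
 As referenced above, a self-contained sketch of that enforcement step, assuming a simplified stand-in for `FileGroup` (in the real picker the groups come from the sorted-run selection; `enforce_max_input_file_num` is an illustrative name, not a function in the patch):

```rust
/// Simplified stand-in for the real `FileGroup` in run.rs.
struct FileGroup {
    files: Vec<u64>, // file ids, for illustration only
}

impl FileGroup {
    fn num_files(&self) -> usize {
        self.files.len()
    }
}

/// Keep whole groups, in order, while the running file total stays within the limit.
fn enforce_max_input_file_num(inputs: Vec<FileGroup>, max_input_file_num: usize) -> Vec<FileGroup> {
    let total_input_files: usize = inputs.iter().map(|g| g.num_files()).sum();
    if total_input_files <= max_input_file_num {
        return inputs;
    }
    let mut num_picked_files = 0;
    inputs
        .into_iter()
        .take_while(|g| {
            let current_group_file_num = g.num_files();
            if current_group_file_num + num_picked_files <= max_input_file_num {
                num_picked_files += current_group_file_num;
                true
            } else {
                false
            }
        })
        .collect()
}

fn main() {
    let inputs = vec![
        FileGroup { files: vec![1, 2] },
        FileGroup { files: vec![3, 4, 5] },
        FileGroup { files: vec![6] },
    ];
    let picked = enforce_max_input_file_num(inputs, 4);
    // Only the first group (2 files) fits: adding the second (3 files) would exceed 4.
    assert_eq!(picked.len(), 1);
}
```

 As in the patch, selection stops at the first group that would push the total past the limit; later, smaller groups are not considered.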

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: typo

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Author: Lei, HUANG
Date: 2025-10-20 20:28:14 +08:00
Committed by: GitHub
Parent: 53fc32b0da
Commit: 02af2cb3cd
15 changed files with 81 additions and 41 deletions

View File

@@ -29,7 +29,7 @@ use futures::TryStreamExt;
 use crate::error::InvalidArgumentsSnafu;
 use crate::metadata::common::StoreConfig;
-use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_fromatter};
+use crate::metadata::control::utils::{decode_key_value, get_table_id_by_name, json_formatter};
 use crate::Tool;

 /// Getting metadata from metadata store.
@@ -206,7 +206,7 @@ impl Tool for GetTableTool {
            println!(
                "{}\n{}",
                TableInfoKey::new(table_id),
-               json_fromatter(self.pretty, &*table_info)
+               json_formatter(self.pretty, &*table_info)
            );
        } else {
            println!("Table info not found");
@@ -221,7 +221,7 @@ impl Tool for GetTableTool {
            println!(
                "{}\n{}",
                TableRouteKey::new(table_id),
-               json_fromatter(self.pretty, &table_route)
+               json_formatter(self.pretty, &table_route)
            );
        } else {
            println!("Table route not found");

View File

@@ -27,7 +27,7 @@ pub fn decode_key_value(kv: KeyValue) -> CommonMetaResult<(String, String)> {
 }

 /// Formats a value as a JSON string.
-pub fn json_fromatter<T>(pretty: bool, value: &T) -> String
+pub fn json_formatter<T>(pretty: bool, value: &T) -> String
 where
     T: Serialize,
 {

View File

@@ -19,7 +19,7 @@ use opentelemetry::propagation::TextMapPropagator;
 use opentelemetry_sdk::propagation::TraceContextPropagator;
 use tracing_opentelemetry::OpenTelemetrySpanExt;

-// An wapper for `Futures` that provides tracing instrument adapters.
+// A wrapper for `Futures` that provides tracing instrument adapters.
 pub trait FutureExt: std::future::Future + Sized {
     fn trace(self, span: tracing::span::Span) -> tracing::instrument::Instrumented<Self>;
 }

View File

@@ -71,6 +71,6 @@ pub struct LinearStagePlan {
     /// The key expressions to use for the lookup relation.
     pub lookup_key: Vec<ScalarExpr>,
     /// The closure to apply to the concatenation of the key columns,
-    /// the stream value columns, and the lookup value colunms.
+    /// the stream value columns, and the lookup value columns.
     pub closure: JoinFilter,
 }

View File

@@ -525,7 +525,7 @@ impl MetaClient {
        self.heartbeat_client()?.ask_leader().await
    }

-   /// Returns a heartbeat bidirectional streaming: (sender, recever), the
+   /// Returns a heartbeat bidirectional streaming: (sender, receiver), the
    /// other end is the leader of `metasrv`.
    ///
    /// The `datanode` needs to use the sender to continuously send heartbeat

View File

@@ -1190,7 +1190,7 @@ mod tests {
            ));
            handles.push(handle);
        }

-       // Wait for candidates to registrate themselves and renew their leases at least once.
+       // Wait for candidates to register themselves and renew their leases at least once.
        tokio::time::sleep(candidate_lease_ttl / 2 + Duration::from_secs(1)).await;
        let (tx, _) = broadcast::channel(100);

View File

@@ -1012,7 +1012,7 @@ mod tests {
            ));
            handles.push(handle);
        }

-       // Wait for candidates to registrate themselves and renew their leases at least once.
+       // Wait for candidates to register themselves and renew their leases at least once.
        tokio::time::sleep(Duration::from_secs(3)).await;
        let (tx, _) = broadcast::channel(100);

View File

@@ -163,6 +163,10 @@ impl FileGroup {
         self.files.push(file);
     }

+    pub(crate) fn num_files(&self) -> usize {
+        self.files.len()
+    }
+
     #[cfg(test)]
     pub(crate) fn files(&self) -> &[FileHandle] {
         &self.files[..]

View File

@@ -16,9 +16,11 @@ use std::collections::hash_map::Entry;
 use std::collections::{BTreeMap, HashMap};
 use std::fmt::Debug;
 use std::num::NonZeroU64;
+use std::str::FromStr;

 use common_base::readable_size::ReadableSize;
-use common_telemetry::info;
+use common_telemetry::tracing::warn;
+use common_telemetry::{debug, info};
 use common_time::timestamp::TimeUnit;
 use common_time::timestamp_millis::BucketAligned;
 use common_time::Timestamp;
@@ -36,6 +38,9 @@ use crate::sst::version::LevelMeta;

 const LEVEL_COMPACTED: Level = 1;

+/// Default value for max compaction input file num.
+const DEFAULT_MAX_INPUT_FILE_NUM: usize = 32;
+
 /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
 /// candidates.
 #[derive(Debug)]
@@ -58,6 +63,17 @@ impl TwcsPicker {
         time_windows: &mut BTreeMap<i64, Window>,
         active_window: Option<i64>,
     ) -> Vec<CompactionOutput> {
+        let max_input_file_num = std::env::var("TWCS_MAX_INPUT_FILE_NUM")
+            .ok()
+            .and_then(|v| {
+                usize::from_str(&v)
+                    .inspect_err(|_| {
+                        warn!("Cannot recognize value for TWCS_MAX_INPUT_FILE_NUM: `{v}`");
+                    })
+                    .ok()
+            })
+            .unwrap_or(DEFAULT_MAX_INPUT_FILE_NUM);
+        debug!("Max compaction file num from env: {:?}", max_input_file_num);
         let mut output = vec![];
         for (window, files) in time_windows {
             if files.files.is_empty() {
@@ -66,20 +82,21 @@ impl TwcsPicker {
             let mut files_to_merge: Vec<_> = files.files().cloned().collect();

             // Filter out large files in append mode - they won't benefit from compaction
-            if self.append_mode {
-                if let Some(max_size) = self.max_output_file_size {
-                    let (kept_files, ignored_files) = files_to_merge
-                        .into_iter()
-                        .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
-                    files_to_merge = kept_files;
-                    info!(
-                        "Skipped {} large files in append mode for region {}, window {}, max_size: {}",
-                        ignored_files.len(),
-                        region_id,
-                        window,
-                        max_size
-                    );
-                }
+            if self.append_mode
+                && let Some(max_size) = self.max_output_file_size
+            {
+                let (kept_files, ignored_files) = files_to_merge
+                    .into_iter()
+                    .partition(|fg| fg.size() <= max_size as usize && fg.is_all_level_0());
+                files_to_merge = kept_files;
+                info!(
+                    "Compaction for {} skipped {} large files in append mode for region {}, window {}, max_size: {}",
+                    region_id,
+                    ignored_files.len(),
+                    region_id,
+                    window,
+                    max_size
+                );
             }

             let sorted_runs = find_sorted_runs(&mut files_to_merge);
@@ -91,7 +108,7 @@ impl TwcsPicker {
                 return output;
             }

-            let inputs = if found_runs > 1 {
+            let mut inputs = if found_runs > 1 {
                 reduce_runs(sorted_runs)
             } else {
                 let run = sorted_runs.last().unwrap();
@@ -101,8 +118,27 @@ impl TwcsPicker {
                 // no overlapping files, try merge small files
                 merge_seq_files(run.items(), self.max_output_file_size)
             };
-
-            if !inputs.is_empty() {
+            let total_input_files: usize = inputs.iter().map(|g| g.num_files()).sum();
+            if total_input_files > max_input_file_num {
+                let mut num_picked_files = 0;
+                inputs = inputs
+                    .into_iter()
+                    .take_while(|g| {
+                        let current_group_file_num = g.num_files();
+                        if current_group_file_num + num_picked_files <= max_input_file_num {
+                            num_picked_files += current_group_file_num;
+                            true
+                        } else {
+                            false
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                info!(
+                    "Compaction for region {} enforces max input file num limit: {}, current total: {}, input: {:?}",
+                    region_id, max_input_file_num, total_input_files, inputs
+                );
+            }
+            if inputs.len() > 1 {
                 log_pick_result(
                     region_id,
                     *window,
View File

@@ -76,7 +76,7 @@ pub struct RegionManifestOptions {
 ///     -RegionMetadataRef metadata
 /// }
 /// class RegionEdit {
-///     -VersionNumber regoin_version
+///     -VersionNumber region_version
 ///     -Vec~FileMeta~ files_to_add
 ///     -Vec~FileMeta~ files_to_remove
 ///     -SequenceNumber flushed_sequence

View File

@@ -70,7 +70,7 @@ impl ScalarCalculate {
        interval: Millisecond,
        input: LogicalPlan,
        time_index: &str,
-       tag_colunms: &[String],
+       tag_columns: &[String],
        field_column: &str,
        table_name: Option<&str>,
    ) -> Result<Self> {
@@ -97,7 +97,7 @@ impl ScalarCalculate {
            end,
            interval,
            time_index: time_index.to_string(),
-           tag_columns: tag_colunms.to_vec(),
+           tag_columns: tag_columns.to_vec(),
            field_column: field_column.to_string(),
            input,
            output_schema: Arc::new(schema),

View File

@@ -82,7 +82,7 @@ impl ExtensionPlanner for MergeSortExtensionPlanner {
        // and we only need to do a merge sort, otherwise fallback to quick sort
        let can_merge_sort = partition_cnt >= region_cnt;
        if can_merge_sort {
-           // TODO(discord9): use `SortPreversingMergeExec here`
+           // TODO(discord9): use `SortPreservingMergeExec here`
        }
        // for now merge sort only exist in logical plan, and have the same effect as `Sort`
        // doesn't change the execution plan, this will change in the future

View File

@@ -352,7 +352,7 @@ async fn dryrun_pipeline_inner(
    )
    .await?;

-   let colume_type_key = "colume_type";
+   let column_type_key = "column_type";
    let data_type_key = "data_type";
    let name_key = "name";

@@ -376,7 +376,7 @@ async fn dryrun_pipeline_inner(
                JsonValue::String(cs.datatype().as_str_name().to_string()),
            );
            map.insert(
-               colume_type_key.to_string(),
+               column_type_key.to_string(),
                JsonValue::String(cs.semantic_type().as_str_name().to_string()),
            );
            map.insert(
@@ -409,7 +409,7 @@ async fn dryrun_pipeline_inner(
            );
            map.insert(
                "semantic_type".to_string(),
-               schema[idx][colume_type_key].clone(),
+               schema[idx][column_type_key].clone(),
            );
            map.insert(
                "data_type".to_string(),

View File

@@ -51,7 +51,7 @@ mod tests {
    use crate::statements::statement::Statement;

    #[test]
-   fn test_display_for_tuncate_table() {
+   fn test_display_for_truncate_table() {
        let sql = r"truncate table t1;";
        let stmts: Vec<Statement> =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())

View File

@@ -3174,37 +3174,37 @@ transform:
        let dryrun_schema = json!([
            {
-               "colume_type": "FIELD",
+               "column_type": "FIELD",
                "data_type": "INT32",
                "fulltext": false,
                "name": "id1"
            },
            {
-               "colume_type": "FIELD",
+               "column_type": "FIELD",
                "data_type": "INT32",
                "fulltext": false,
                "name": "id2"
            },
            {
-               "colume_type": "FIELD",
+               "column_type": "FIELD",
                "data_type": "STRING",
                "fulltext": false,
                "name": "type"
            },
            {
-               "colume_type": "FIELD",
+               "column_type": "FIELD",
                "data_type": "STRING",
                "fulltext": false,
                "name": "log"
            },
            {
-               "colume_type": "FIELD",
+               "column_type": "FIELD",
                "data_type": "STRING",
                "fulltext": false,
                "name": "logger"
            },
            {
-               "colume_type": "TIMESTAMP",
+               "column_type": "TIMESTAMP",
                "data_type": "TIMESTAMP_NANOSECOND",
                "fulltext": false,
                "name": "time"