From 3299aebe95ad1788c69d67292484b506fbfa9841 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Sun, 29 Mar 2026 08:43:52 +0800
Subject: [PATCH] perf: skip redundant series divide repartitions

---
Review notes (text between `---` and the first `diff --git` line is not part
of the commit message and is skipped when the patch is applied):

* The line structure of this patch had been collapsed; it is restored here.
  The `+` marker that was missing in front of the `"bar"` column of `data_2`
  is re-added. Indentation follows rustfmt defaults and should be checked
  against the tree before applying.
* The single-partition fast path now returns `Distribution::SinglePartition`
  instead of `Distribution::UnspecifiedDistribution`. A one-partition input
  already satisfies it, so no hash shuffle is inserted, and it also keeps the
  optimizer from adding a round-robin repartition above the input, which
  would split a series across partitions. NOTE(review): this assumes
  `SeriesDivideExec` does not override `benefits_from_input_partitioning`;
  confirm against the full file.
* The fast path is taken only for exactly one partition (`== 1`). An input
  that reports zero partitions falls through to the hash requirement, as it
  did before this patch.
* Added-line counts are unchanged (14 + 39 + 32 = 85), so the hunk headers
  and the diffstat below still match. The commit id and the `index` blob ids
  are those of the originally submitted revision.

 .../src/extension_plan/series_divide.rs | 85 +++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs
index 4a0b32f4e3..acf5f4377d 100644
--- a/src/promql/src/extension_plan/series_divide.rs
+++ b/src/promql/src/extension_plan/series_divide.rs
@@ -397,6 +397,20 @@ impl ExecutionPlan for SeriesDivideExec {
         if self.tag_columns.is_empty() {
             return vec![Distribution::SinglePartition];
         }
+
+        // A one-partition input already holds every series in one ordered stream, so no shuffle
+        // is needed. Ask for `SinglePartition` (already satisfied) rather than "unspecified",
+        // which could let the optimizer add a round-robin repartition that splits series.
+        if self
+            .input
+            .properties()
+            .output_partitioning()
+            .partition_count()
+            == 1
+        {
+            return vec![Distribution::SinglePartition];
+        }
+
         let schema = self.input.schema();
         vec![Distribution::HashPartitioned(
             self.tag_columns
@@ -740,6 +754,45 @@ mod test {
         ))
     }
 
+    fn prepare_multi_partition_test_data() -> DataSourceExec {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("host", DataType::Utf8, true),
+            Field::new("path", DataType::Utf8, true),
+            Field::new(
+                "time_index",
+                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
+                false,
+            ),
+        ]));
+
+        let data_1 = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(vec!["000", "000"])) as _,
+                Arc::new(StringArray::from(vec!["foo", "foo"])) as _,
+                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
+                    vec![1000, 2000],
+                )) as _,
+            ],
+        )
+        .unwrap();
+        let data_2 = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(vec!["001", "001"])) as _,
+                Arc::new(StringArray::from(vec!["bar", "bar"])) as _,
+                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(
+                    vec![3000, 4000],
+                )) as _,
+            ],
+        )
+        .unwrap();
+
+        DataSourceExec::new(Arc::new(
+            MemorySourceConfig::try_new(&[vec![data_1], vec![data_2]], schema, None).unwrap(),
+        ))
+    }
+
     #[test]
     fn pruning_should_keep_tags_and_time_index_columns_for_exec() {
         let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
@@ -760,6 +813,38 @@ mod test {
         assert_eq!(required.as_slice(), &[0, 1, 2]);
     }
 
+    #[test]
+    fn single_partition_input_skips_repartition_requirement() {
+        let divide_exec = SeriesDivideExec {
+            tag_columns: vec!["host".to_string()],
+            time_index_column: "time_index".to_string(),
+            input: Arc::new(prepare_test_data()),
+            metric: ExecutionPlanMetricsSet::new(),
+        };
+
+        let requirement = divide_exec.required_input_distribution();
+        assert!(matches!(
+            requirement.as_slice(),
+            [Distribution::SinglePartition]
+        ));
+    }
+
+    #[test]
+    fn multi_partition_input_still_requires_hash_partitioning() {
+        let divide_exec = SeriesDivideExec {
+            tag_columns: vec!["host".to_string()],
+            time_index_column: "time_index".to_string(),
+            input: Arc::new(prepare_multi_partition_test_data()),
+            metric: ExecutionPlanMetricsSet::new(),
+        };
+
+        let requirement = divide_exec.required_input_distribution();
+        assert!(matches!(
+            requirement.as_slice(),
+            [Distribution::HashPartitioned(_)]
+        ));
+    }
+
     #[tokio::test]
     async fn overall_data() {
         let memory_exec = Arc::new(prepare_test_data());