perf: skip redundant series divide repartitions

This commit is contained in:
Ruihang Xia
2026-03-29 08:43:52 +08:00
parent 767c3b44c8
commit 3299aebe95

View File

@@ -397,6 +397,20 @@ impl ExecutionPlan for SeriesDivideExec {
if self.tag_columns.is_empty() {
return vec![Distribution::SinglePartition];
}
// If the upstream already produces a single partition, `SeriesDivide` can safely process
// the full ordered stream directly. Repartitioning a single partition into the global
// target partition count only adds shuffle/merge overhead without improving correctness.
if self
.input
.properties()
.output_partitioning()
.partition_count()
<= 1
{
return vec![Distribution::UnspecifiedDistribution];
}
let schema = self.input.schema();
vec![Distribution::HashPartitioned(
self.tag_columns
@@ -740,6 +754,45 @@ mod test {
))
}
/// Builds an in-memory source fed with two separate batch groups, one per series.
///
/// The first group holds the `("000", "foo")` rows at timestamps 1000 and 2000, the
/// second holds the `("001", "bar")` rows at timestamps 3000 and 4000. Each group is
/// handed to `MemorySourceConfig` as its own entry, so the resulting plan is expected
/// to report more than one output partition.
fn prepare_multi_partition_test_data() -> DataSourceExec {
    let schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, true),
        Field::new("path", DataType::Utf8, true),
        Field::new(
            "time_index",
            DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
    ]));

    // Every row of a batch shares the same tag values; only the timestamps vary.
    let series_batch = |host: &str, path: &str, timestamps: Vec<i64>| {
        let row_count = timestamps.len();
        RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec![host; row_count])) as _,
                Arc::new(StringArray::from(vec![path; row_count])) as _,
                Arc::new(datafusion::arrow::array::TimestampMillisecondArray::from(timestamps))
                    as _,
            ],
        )
        .unwrap()
    };

    // One inner `Vec` per batch group handed to the memory source.
    let partitions = [
        vec![series_batch("000", "foo", vec![1000, 2000])],
        vec![series_batch("001", "bar", vec![3000, 4000])],
    ];

    let source = MemorySourceConfig::try_new(&partitions, schema, None).unwrap();
    DataSourceExec::new(Arc::new(source))
}
#[test]
fn pruning_should_keep_tags_and_time_index_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
@@ -760,6 +813,38 @@ mod test {
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
/// With the input built by `prepare_test_data`, `SeriesDivideExec` must not ask the
/// planner for any particular distribution, even though tag columns are present.
#[test]
fn single_partition_input_skips_repartition_requirement() {
    // NOTE(review): relies on `prepare_test_data` exposing at most one partition.
    let source = Arc::new(prepare_test_data());
    let exec = SeriesDivideExec {
        input: source,
        tag_columns: vec![String::from("host")],
        time_index_column: String::from("time_index"),
        metric: ExecutionPlanMetricsSet::new(),
    };

    // Exactly one requirement is expected, and it must be the "unspecified" one.
    let requirement = exec.required_input_distribution();
    assert!(matches!(
        requirement.as_slice(),
        [Distribution::UnspecifiedDistribution]
    ));
}
/// With the input built by `prepare_multi_partition_test_data`, `SeriesDivideExec`
/// must still request hash partitioning from its child.
#[test]
fn multi_partition_input_still_requires_hash_partitioning() {
    let source = Arc::new(prepare_multi_partition_test_data());
    let exec = SeriesDivideExec {
        input: source,
        tag_columns: vec![String::from("host")],
        time_index_column: String::from("time_index"),
        metric: ExecutionPlanMetricsSet::new(),
    };

    // Exactly one requirement is expected; only its variant is checked, not the
    // hash expressions it carries.
    let requirement = exec.required_input_distribution();
    assert!(matches!(
        requirement.as_slice(),
        [Distribution::HashPartitioned(_)]
    ));
}
#[tokio::test]
async fn overall_data() {
let memory_exec = Arc::new(prepare_test_data());