Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-15 08:44:33 +08:00
parent ffe1030d2d
commit 024ca1af79

View File

@@ -120,9 +120,13 @@ impl ReduceAggregateRepartition {
}
fn can_reduce_repartition(repartition_exec: &RepartitionExec) -> bool {
let has_direct_promql_input =
Self::has_direct_promql_partial_input(repartition_exec.input());
if Self::contains_promql_exec_deep(repartition_exec.input()) {
let has_direct_promql_input = matches!(
repartition_exec.input().as_any().downcast_ref::<AggregateExec>(),
Some(partial_agg)
if partial_agg.mode() == &AggregateMode::Partial
&& Self::contains_promql_exec(partial_agg.input(), true)
);
if Self::contains_promql_exec(repartition_exec.input(), false) {
return has_direct_promql_input;
}
@@ -143,45 +147,23 @@ impl ReduceAggregateRepartition {
coarsening_satisfaction.is_subset()
}
fn has_direct_promql_partial_input(plan: &Arc<dyn ExecutionPlan>) -> bool {
let Some(partial_agg) = plan.as_any().downcast_ref::<AggregateExec>() else {
return false;
};
partial_agg.mode() == &AggregateMode::Partial
&& Self::contains_promql_vector_exec(partial_agg.input())
}
fn contains_promql_vector_exec(plan: &Arc<dyn ExecutionPlan>) -> bool {
if Self::is_promql_vector_exec(plan) {
fn contains_promql_exec(plan: &Arc<dyn ExecutionPlan>, stop_at_aggregate: bool) -> bool {
let plan_any = plan.as_any();
if plan_any.is::<SeriesDivideExec>()
|| plan_any.is::<SeriesNormalizeExec>()
|| plan_any.is::<RangeManipulateExec>()
|| plan_any.is::<InstantManipulateExec>()
{
return true;
}
if plan.as_any().is::<AggregateExec>() {
if stop_at_aggregate && plan_any.is::<AggregateExec>() {
return false;
}
plan.children()
.into_iter()
.any(Self::contains_promql_vector_exec)
}
fn contains_promql_exec_deep(plan: &Arc<dyn ExecutionPlan>) -> bool {
if Self::is_promql_vector_exec(plan) {
return true;
}
plan.children()
.into_iter()
.any(Self::contains_promql_exec_deep)
}
fn is_promql_vector_exec(plan: &Arc<dyn ExecutionPlan>) -> bool {
let plan = plan.as_any();
plan.is::<SeriesDivideExec>()
|| plan.is::<SeriesNormalizeExec>()
|| plan.is::<RangeManipulateExec>()
|| plan.is::<InstantManipulateExec>()
.any(|child| Self::contains_promql_exec(child, stop_at_aggregate))
}
}
@@ -205,15 +187,12 @@ mod tests {
use super::ReduceAggregateRepartition;
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
fn input_exec() -> Arc<dyn ExecutionPlan> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]))
}
fn input_exec(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
]));
let config = MemorySourceConfig::try_new(&[vec![]], schema, None).unwrap();
DataSourceExec::from_data_source(config)
}
@@ -241,13 +220,6 @@ mod tests {
)?))
}
fn round_robin_repartition(input: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(RepartitionExec::try_new(
input,
Partitioning::RoundRobinBatch(8),
)?))
}
fn aggregate(
mode: AggregateMode,
input: Arc<dyn ExecutionPlan>,
@@ -284,7 +256,7 @@ mod tests {
#[test]
fn rewrites_final_partitioned_subset_repartition() -> Result<()> {
let raw = input_exec(schema());
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?;
let partial = aggregate(
AggregateMode::Partial,
@@ -315,7 +287,7 @@ mod tests {
#[test]
fn rewrites_single_partitioned_subset_repartition() -> Result<()> {
let raw = input_exec(schema());
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?;
let final_repartition = repartition(finer.clone(), &["a"], &finer.schema())?;
let final_agg = aggregate(
@@ -338,7 +310,7 @@ mod tests {
#[test]
fn keeps_equal_partitioning_keys() -> Result<()> {
let raw = input_exec(schema());
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?;
let partial = aggregate(
AggregateMode::Partial,
@@ -369,7 +341,7 @@ mod tests {
#[test]
fn rewrites_when_finer_key_order_differs() -> Result<()> {
let raw = input_exec(schema());
let raw = input_exec();
let finer = repartition(raw.clone(), &["c", "a", "b"], &raw.schema())?;
let partial = aggregate(
AggregateMode::Partial,
@@ -400,7 +372,7 @@ mod tests {
#[test]
fn rewrites_when_repartition_satisfies_group_by_with_subset_keys() -> Result<()> {
let raw = input_exec(schema());
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?;
let final_repartition = repartition(finer.clone(), &["a"], &finer.schema())?;
let final_agg = aggregate(
@@ -423,7 +395,7 @@ mod tests {
#[test]
fn keeps_non_hash_repartition_child() -> Result<()> {
let raw = input_exec(schema());
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b"], &raw.schema())?;
let partial = aggregate(
AggregateMode::Partial,
@@ -432,7 +404,10 @@ mod tests {
raw.schema(),
vec![],
)?;
let final_repartition = round_robin_repartition(partial.clone())?;
let final_repartition = Arc::new(RepartitionExec::try_new(
partial.clone(),
Partitioning::RoundRobinBatch(8),
)?);
let final_agg = aggregate(
AggregateMode::FinalPartitioned,
final_repartition,
@@ -454,7 +429,7 @@ mod tests {
#[test]
fn rewrites_subset_partitioning_through_projection() -> Result<()> {
let raw = input_exec(schema());
let raw = input_exec();
let finer = repartition(raw.clone(), &["a", "b", "c"], &raw.schema())?;
let projected = project_with_aliases(finer, &[("a", "x"), ("b", "y"), ("c", "z")])?;
let final_repartition = repartition(projected.clone(), &["x", "y"], &projected.schema())?;
@@ -477,7 +452,7 @@ mod tests {
#[test]
fn keeps_non_subset_repartition() -> Result<()> {
let raw = input_exec(schema());
let raw = input_exec();
let coarser = repartition(raw.clone(), &["a"], &raw.schema())?;
let final_repartition = repartition(coarser.clone(), &["a", "b"], &coarser.schema())?;
let final_agg = aggregate(