From 21a209f7ba7ef4a7db46373af30fad5becd3d686 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 7 Apr 2025 16:59:40 +0800 Subject: [PATCH] fix: skip replacing exprs of the DistinctOn node (#5823) * fix: handle distinct on specially * chore: update comment --- src/query/src/dist_plan/analyzer.rs | 6 ++--- .../src/optimizer/string_normalization.rs | 4 +-- src/query/src/optimizer/type_conversion.rs | 4 +-- src/query/src/range_select/plan_rewrite.rs | 26 +++++++++++++++++-- .../common/aggregate/distinct.result | 9 +++++++ .../standalone/common/aggregate/distinct.sql | 2 ++ 6 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index e09e2e3a1d..e52b6f9814 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -66,9 +66,9 @@ impl AnalyzerRule for DistPlannerAnalyzer { impl DistPlannerAnalyzer { fn inspect_plan_with_subquery(plan: LogicalPlan) -> DfResult> { - // Workaround for https://github.com/GreptimeTeam/greptimedb/issues/5469 - // FIXME(yingwen): Remove this once we update DataFusion. - if let LogicalPlan::Limit(_) = &plan { + // Workaround for https://github.com/GreptimeTeam/greptimedb/issues/5469 and https://github.com/GreptimeTeam/greptimedb/issues/5799 + // FIXME(yingwen): Remove the `Limit` plan once we update DataFusion. + if let LogicalPlan::Limit(_) | LogicalPlan::Distinct(_) = &plan { return Ok(Transformed::no(plan)); } diff --git a/src/query/src/optimizer/string_normalization.rs b/src/query/src/optimizer/string_normalization.rs index c62f5862ee..4277a3a5d1 100644 --- a/src/query/src/optimizer/string_normalization.rs +++ b/src/query/src/optimizer/string_normalization.rs @@ -46,7 +46,6 @@ impl AnalyzerRule for StringNormalizationRule { | LogicalPlan::Values(_) | LogicalPlan::Analyze(_) | LogicalPlan::Extension(_) - | LogicalPlan::Distinct(_) | LogicalPlan::Dml(_) | LogicalPlan::Copy(_) | LogicalPlan::RecursiveQuery(_) => { @@ -63,7 +62,8 @@ impl AnalyzerRule for StringNormalizationRule { Ok(Transformed::no(plan)) } } - LogicalPlan::Limit(_) + LogicalPlan::Distinct(_) + | LogicalPlan::Limit(_) | LogicalPlan::Explain(_) | LogicalPlan::Unnest(_) | LogicalPlan::Ddl(_) diff --git a/src/query/src/optimizer/type_conversion.rs b/src/query/src/optimizer/type_conversion.rs index 5f8a9f6283..5dcca3d95a 100644 --- a/src/query/src/optimizer/type_conversion.rs +++ b/src/query/src/optimizer/type_conversion.rs @@ -88,7 +88,6 @@ impl ExtensionAnalyzerRule for TypeConversionRule { | LogicalPlan::Sort { .. } | LogicalPlan::Union { .. } | LogicalPlan::Join { .. } - | LogicalPlan::Distinct { .. } | LogicalPlan::Values { .. } | LogicalPlan::Analyze { .. } => { let mut converter = TypeConverter { @@ -105,7 +104,8 @@ impl ExtensionAnalyzerRule for TypeConversionRule { plan.with_new_exprs(expr, inputs).map(Transformed::yes) } - LogicalPlan::Limit { .. } + LogicalPlan::Distinct { .. } + | LogicalPlan::Limit { .. } | LogicalPlan::Subquery { .. } | LogicalPlan::Explain { .. } | LogicalPlan::SubqueryAlias { .. } diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index 8da43fe8f2..85b220ced0 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -32,8 +32,8 @@ use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::WildcardOptions; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{ - Aggregate, Analyze, Cast, Explain, Expr, ExprSchemable, Extension, LogicalPlan, - LogicalPlanBuilder, Projection, + Aggregate, Analyze, Cast, Distinct, DistinctOn, Explain, Expr, ExprSchemable, Extension, + LogicalPlan, LogicalPlanBuilder, Projection, }; use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datatypes::prelude::ConcreteDataType; @@ -453,6 +453,28 @@ impl RangePlanRewriter { .context(DataFusionSnafu)? .build() } + LogicalPlan::Distinct(Distinct::On(DistinctOn { + on_expr, + select_expr, + sort_expr, + .. + })) => { + ensure!( + inputs.len() == 1, + RangeQuerySnafu { + msg: + "Illegal subplan nums when rewrite DistinctOn logical plan", + } + ); + LogicalPlanBuilder::from(inputs[0].clone()) + .distinct_on( + on_expr.clone(), + select_expr.clone(), + sort_expr.clone(), + ) + .context(DataFusionSnafu)? + .build() + } _ => plan.with_new_exprs(plan.expressions_consider_join(), inputs), } .context(DataFusionSnafu)?; diff --git a/tests/cases/standalone/common/aggregate/distinct.result b/tests/cases/standalone/common/aggregate/distinct.result index cf0aaf1ad3..36d76bd4da 100644 --- a/tests/cases/standalone/common/aggregate/distinct.result +++ b/tests/cases/standalone/common/aggregate/distinct.result @@ -69,6 +69,15 @@ SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test; | 11 | +-------------------------------------------------------------+ +SELECT DISTINCT ON (a) * FROM test ORDER BY a, t DESC; + ++----+----+-------------------------+ +| a | b | t | ++----+----+-------------------------+ +| 11 | 22 | 1970-01-01T00:00:00.004 | +| 13 | 22 | 1970-01-01T00:00:00.002 | ++----+----+-------------------------+ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/aggregate/distinct.sql b/tests/cases/standalone/common/aggregate/distinct.sql index e0e4d1fca8..24b0697dab 100644 --- a/tests/cases/standalone/common/aggregate/distinct.sql +++ b/tests/cases/standalone/common/aggregate/distinct.sql @@ -16,4 +16,6 @@ SELECT DISTINCT MAX(b) FROM test GROUP BY a; SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test; +SELECT DISTINCT ON (a) * FROM test ORDER BY a, t DESC; + DROP TABLE test;