From 95d0c650ecbac28e106d643266649247b5f0dfed Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 9 Apr 2025 10:39:04 +0800 Subject: [PATCH] feat: pushdown select distinct in some cases (#5847) * feat: pushdown select distinct * test: add sqlness test * test: fix analyzer test --- src/query/src/dist_plan/analyzer.rs | 8 +----- src/query/src/dist_plan/commutativity.rs | 8 +++++- .../cases/distributed/explain/order_by.result | 18 +++---------- .../common/aggregate/distinct.result | 26 +++++++++++++++++++ .../standalone/common/aggregate/distinct.sql | 9 +++++++ 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index b0796a635b..6ebc078eea 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -489,13 +489,7 @@ mod test { let config = ConfigOptions::default(); let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); - let expected = [ - "Sort: t.number ASC NULLS LAST", - " Distinct:", - " Projection: t.number", - " MergeScan [is_placeholder=false]", - ] - .join("\n"); + let expected = ["Projection: t.number", " MergeScan [is_placeholder=false]"].join("\n"); assert_eq!(expected, result.to_string()); } diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 194e2d3b9d..5b3cb0f2db 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -96,7 +96,13 @@ impl Categorizer { LogicalPlan::Extension(extension) => { Self::check_extension_plan(extension.node.as_ref() as _) } - LogicalPlan::Distinct(_) => Commutativity::Unimplemented, + LogicalPlan::Distinct(_) => { + if partition_cols.is_empty() { + Commutativity::Commutative + } else { + Commutativity::Unimplemented + } + } LogicalPlan::Unnest(_) => Commutativity::Commutative, LogicalPlan::Statement(_) => Commutativity::Unsupported, LogicalPlan::Values(_) => Commutativity::Unsupported, diff --git a/tests/cases/distributed/explain/order_by.result b/tests/cases/distributed/explain/order_by.result index ba819d5994..9f0f5002b1 100644 --- a/tests/cases/distributed/explain/order_by.result +++ b/tests/cases/distributed/explain/order_by.result @@ -12,13 +12,8 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1; +-+-+ | plan_type_| plan_| +-+-+ -| logical_plan_| Sort: integers.i % Int64(2) ASC NULLS LAST_| -|_|_Aggregate: groupBy=[[integers.i % Int64(2)]], aggr=[[]]_| -|_|_MergeScan [is_placeholder=false]_| -| physical_plan | SortPreservingMergeExec: [integers.i % Int64(2)@0 ASC NULLS LAST]_| -|_|_SortExec: expr=[integers.i % Int64(2)@0 ASC NULLS LAST], preserve_partitioning=[true]_| -|_|_AggregateExec: mode=SinglePartitioned, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[] | -|_|_MergeScanExec: REDACTED +| logical_plan_| MergeScan [is_placeholder=false]_| +| physical_plan | MergeScanExec: REDACTED |_|_| +-+-+ @@ -55,13 +50,8 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b; +-+-+ | plan_type_| plan_| +-+-+ -| logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_| -|_|_Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]_| -|_|_MergeScan [is_placeholder=false]_| -| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]_| -|_|_SortExec: expr=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], preserve_partitioning=[true] | -|_|_AggregateExec: mode=SinglePartitioned, gby=[a@0 as a, b@1 as b], aggr=[]_| -|_|_MergeScanExec: REDACTED +| logical_plan_| MergeScan [is_placeholder=false]_| +| physical_plan | MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/standalone/common/aggregate/distinct.result b/tests/cases/standalone/common/aggregate/distinct.result index 36d76bd4da..4130d26f71 100644 --- a/tests/cases/standalone/common/aggregate/distinct.result +++ b/tests/cases/standalone/common/aggregate/distinct.result @@ -78,6 +78,32 @@ SELECT DISTINCT ON (a) * FROM test ORDER BY a, t DESC; | 13 | 22 | 1970-01-01T00:00:00.002 | +----+----+-------------------------+ +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT DISTINCT a FROM test ORDER BY a; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_SortPreservingMergeExec: [a@0 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED +|_|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[] REDACTED +|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[] REDACTED +|_|_|_RepartitionExec: partitioning=REDACTED +|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED +|_|_|_| +|_|_| Total rows: 2_| ++-+-+-+ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/aggregate/distinct.sql b/tests/cases/standalone/common/aggregate/distinct.sql index 24b0697dab..8fe8cec395 100644 --- a/tests/cases/standalone/common/aggregate/distinct.sql +++ b/tests/cases/standalone/common/aggregate/distinct.sql @@ -18,4 +18,13 @@ SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test; SELECT DISTINCT ON (a) * FROM test ORDER BY a, t DESC; +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT DISTINCT a FROM test ORDER BY a; + DROP TABLE test;