From d581688fd230af660d170d23ac2adcde5122ae72 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 23 Aug 2023 03:32:20 -0500 Subject: [PATCH] fix: dist planner has wrong behavior in table with multiple partitions (#2237) * fix: dist planner has wrong behavior in table with multiple partitions Signed-off-by: Ruihang Xia * Update tests/cases/distributed/explain/multi_partitions.sql Co-authored-by: Zhenchi --------- Signed-off-by: Ruihang Xia Co-authored-by: Zhenchi --- src/query/src/dist_plan/analyzer.rs | 11 ++++-- src/query/src/dist_plan/commutativity.rs | 2 +- tests-integration/src/instance.rs | 2 + .../explain/multi_partitions.result | 38 +++++++++++++++++++ .../distributed/explain/multi_partitions.sql | 23 +++++++++++ .../distributed/optimizer/order_by.result | 10 ++--- 6 files changed, 77 insertions(+), 9 deletions(-) create mode 100644 tests/cases/distributed/explain/multi_partitions.result create mode 100644 tests/cases/distributed/explain/multi_partitions.sql diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 450e804a36..2ace7f2ea9 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -159,6 +159,7 @@ impl ExpandState { } } +#[derive(Debug)] struct CommutativeVisitor { next_stage: Vec, // hash of the stop node @@ -325,7 +326,7 @@ mod test { let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); let expected = [ "Aggregate: groupBy=[[]], aggr=[[AVG(t.number)]]", - " TableScan: t", + " MergeScan [is_placeholder=false]", ] .join("\n"); assert_eq!(expected, format!("{:?}", result)); @@ -352,7 +353,7 @@ mod test { let expected = [ "Sort: t.number ASC NULLS LAST", " Distinct:", - " TableScan: t", + " MergeScan [is_placeholder=false]", ] .join("\n"); assert_eq!(expected, format!("{:?}", result)); @@ -374,7 +375,11 @@ mod test { let config = ConfigOptions::default(); let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); - let expected = ["Limit: skip=0, fetch=1", " TableScan: t"].join("\n"); + let expected = [ + "Limit: skip=0, fetch=1", + " MergeScan [is_placeholder=false]", + ] + .join("\n"); assert_eq!(expected, format!("{:?}", result)); } } diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 8519e53096..6765d84222 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -63,7 +63,7 @@ impl Categorizer { Commutativity::Unimplemented } LogicalPlan::Union(_) => Commutativity::Unimplemented, - LogicalPlan::TableScan(_) => Commutativity::CheckPartition, + LogicalPlan::TableScan(_) => Commutativity::Commutative, LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative, LogicalPlan::Subquery(_) => Commutativity::Unimplemented, LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented, diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 3f94697b0c..d97f4e78df 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -61,6 +61,8 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_distributed_exec_sql() { + common_telemetry::init_default_ut_logging(); + let distributed = tests::create_distributed_instance("test_distributed_exec_sql").await; let frontend = distributed.frontend(); let instance = frontend.as_ref(); diff --git a/tests/cases/distributed/explain/multi_partitions.result b/tests/cases/distributed/explain/multi_partitions.result new file mode 100644 index 0000000000..d15882b5bb --- /dev/null +++ b/tests/cases/distributed/explain/multi_partitions.result @@ -0,0 +1,38 @@ +CREATE TABLE demo( + host STRING, + ts TIMESTAMP, + cpu DOUBLE NULL, + memory DOUBLE NULL, + disk_util DOUBLE DEFAULT 9.9, + TIME INDEX (ts), + PRIMARY KEY(host) +) +PARTITION BY RANGE COLUMNS (host) ( + PARTITION r0 VALUES LESS THAN ('550-A'), + PARTITION r1 VALUES LESS THAN ('550-W'), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +); + +Affected Rows: 0 + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +explain SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host; + ++-+-+ +| plan_type_| plan_| ++-+-+ +| logical_plan_| Sort: demo.host ASC NULLS LAST_| +|_|_MergeScan [is_placeholder=false]_| +| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST]_| +|_|_MergeScanExec: peers=[REDACTED +|_|_| ++-+-+ + +drop table demo; + +Affected Rows: 1 + diff --git a/tests/cases/distributed/explain/multi_partitions.sql b/tests/cases/distributed/explain/multi_partitions.sql new file mode 100644 index 0000000000..e3fa3f6791 --- /dev/null +++ b/tests/cases/distributed/explain/multi_partitions.sql @@ -0,0 +1,23 @@ +CREATE TABLE demo( + host STRING, + ts TIMESTAMP, + cpu DOUBLE NULL, + memory DOUBLE NULL, + disk_util DOUBLE DEFAULT 9.9, + TIME INDEX (ts), + PRIMARY KEY(host) +) +PARTITION BY RANGE COLUMNS (host) ( + PARTITION r0 VALUES LESS THAN ('550-A'), + PARTITION r1 VALUES LESS THAN ('550-W'), + PARTITION r2 VALUES LESS THAN (MAXVALUE), +); + +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +explain SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host; + +drop table demo; diff --git a/tests/cases/distributed/optimizer/order_by.result b/tests/cases/distributed/optimizer/order_by.result index f847696124..c1d5b7171a 100644 --- a/tests/cases/distributed/optimizer/order_by.result +++ b/tests/cases/distributed/optimizer/order_by.result @@ -3,7 +3,7 @@ explain select * from numbers; +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | TableScan: numbers projection=[number] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | | | | +---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -14,7 +14,7 @@ explain select * from numbers order by number desc; | plan_type | plan | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | logical_plan | Sort: numbers.number DESC NULLS FIRST | -| | TableScan: numbers projection=[number] | +| | MergeScan [is_placeholder=false] | | physical_plan | SortExec: expr=[number@0 DESC] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | | | | @@ -26,7 +26,7 @@ explain select * from numbers order by number asc; | plan_type | plan | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | logical_plan | Sort: numbers.number ASC NULLS LAST | -| | TableScan: numbers projection=[number] | +| | MergeScan [is_placeholder=false] | | physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | | | | @@ -39,7 +39,7 @@ explain select * from numbers order by number desc limit 10; +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | logical_plan | Limit: skip=0, fetch=10 | | | Sort: numbers.number DESC NULLS FIRST, fetch=10 | -| | TableScan: numbers projection=[number] | +| | MergeScan [is_placeholder=false] | | physical_plan | GlobalLimitExec: skip=0, fetch=10 | | | SortExec: fetch=10, expr=[number@0 DESC] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | @@ -53,7 +53,7 @@ explain select * from numbers order by number asc limit 10; +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | logical_plan | Limit: skip=0, fetch=10 | | | Sort: numbers.number ASC NULLS LAST, fetch=10 | -| | TableScan: numbers projection=[number] | +| | MergeScan [is_placeholder=false] | | physical_plan | GlobalLimitExec: skip=0, fetch=10 | | | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |