diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index 55be29ab18..3487fe5786 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -139,7 +139,7 @@ impl ExecutionPlan for SeriesDivideExec { fn required_input_ordering(&self) -> Vec>> { let input_schema = self.input.schema(); - let exprs = self + let exprs: Vec = self .tag_columns .iter() .map(|tag| PhysicalSortRequirement { @@ -148,7 +148,11 @@ impl ExecutionPlan for SeriesDivideExec { options: None, }) .collect(); - vec![Some(exprs)] + if !exprs.is_empty() { + vec![Some(exprs)] + } else { + vec![None] + } } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index 7807913e74..08262da196 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -142,7 +142,7 @@ impl PlanRewriter { return true; } - match Categorizer::check_plan(plan) { + match Categorizer::check_plan(plan, self.partition_cols.clone()) { Commutativity::Commutative => {} Commutativity::PartialCommutative => { if let Some(plan) = partial_commutative_transformer(plan) { @@ -161,7 +161,6 @@ impl PlanRewriter { self.stage.push(plan) } }, - Commutativity::CheckPartition | Commutativity::NonCommutative | Commutativity::Unimplemented | Commutativity::Unsupported => { @@ -351,11 +350,7 @@ mod test { let config = ConfigOptions::default(); let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); - let expected = [ - "Aggregate: groupBy=[[]], aggr=[[AVG(t.number)]]", - " MergeScan [is_placeholder=false]", - ] - .join("\n"); + let expected = "MergeScan [is_placeholder=false]"; assert_eq!(expected, format!("{:?}", result)); } @@ -402,11 +397,7 @@ mod test { let config = ConfigOptions::default(); let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap(); - let expected = [ - "Limit: skip=0, fetch=1", - " MergeScan [is_placeholder=false]", - ] - .join("\n"); + let expected = "MergeScan [is_placeholder=false]"; assert_eq!(expected, format!("{:?}", result)); } diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 6765d84222..6bfcee5a16 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; +use datafusion_expr::utils::exprlist_to_columns; use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; use promql::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, @@ -29,7 +31,6 @@ pub enum Commutativity { TransformedCommutative(Option), NonCommutative, Unimplemented, - CheckPartition, /// For unrelated plans like DDL Unsupported, } @@ -37,7 +38,9 @@ pub enum Commutativity { pub struct Categorizer {} impl Categorizer { - pub fn check_plan(plan: &LogicalPlan) -> Commutativity { + pub fn check_plan(plan: &LogicalPlan, partition_cols: Option>) -> Commutativity { + let partition_cols = partition_cols.unwrap_or_default(); + match plan { LogicalPlan::Projection(proj) => { for expr in &proj.expr { @@ -51,11 +54,23 @@ impl Categorizer { // TODO(ruihang): Change this to Commutative once Like is supported in substrait LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate), LogicalPlan::Window(_) => Commutativity::Unimplemented, - LogicalPlan::Aggregate(_) => { + LogicalPlan::Aggregate(aggr) => { + if Self::check_partition(&aggr.group_expr, &partition_cols) { + return Commutativity::Commutative; + } + // check all children exprs and uses the strictest level Commutativity::Unimplemented } - LogicalPlan::Sort(_) => Commutativity::Unimplemented, + LogicalPlan::Sort(_) => { + if partition_cols.is_empty() { + return Commutativity::Commutative; + } + + // sort plan needs to consider column priority + // We can implement a merge-sort on partial ordered data + Commutativity::Unimplemented + } LogicalPlan::Join(_) => Commutativity::NonCommutative, LogicalPlan::CrossJoin(_) => Commutativity::NonCommutative, LogicalPlan::Repartition(_) => { @@ -67,7 +82,17 @@ impl Categorizer { LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative, LogicalPlan::Subquery(_) => Commutativity::Unimplemented, LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented, - LogicalPlan::Limit(_) => Commutativity::PartialCommutative, + LogicalPlan::Limit(limit) => { + // Only execute `fetch` on remote nodes. + // wait for https://github.com/apache/arrow-datafusion/pull/7669 + if partition_cols.is_empty() && limit.fetch.is_some() { + Commutativity::Commutative + } else if limit.skip == 0 && limit.fetch.is_some() { + Commutativity::PartialCommutative + } else { + Commutativity::Unimplemented + } + } LogicalPlan::Extension(extension) => { Self::check_extension_plan(extension.node.as_ref() as _) } @@ -93,7 +118,7 @@ impl Categorizer { || name == SeriesDivide::name() || name == MergeScanLogicalPlan::name() => { - Commutativity::Commutative + Commutativity::Unimplemented } _ => Commutativity::Unsupported, } @@ -142,6 +167,26 @@ impl Categorizer { | Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented, } } + + /// Return true if the given expr and partition cols satisfied the rule. + /// In this case the plan can be treated as fully commutative. + fn check_partition(exprs: &[Expr], partition_cols: &[String]) -> bool { + let mut ref_cols = HashSet::new(); + if exprlist_to_columns(exprs, &mut ref_cols).is_err() { + return false; + } + let ref_cols = ref_cols + .into_iter() + .map(|c| c.flat_name()) + .collect::>(); + for col in partition_cols { + if !ref_cols.contains(col) { + return false; + } + } + + true + } } pub type Transformer = Arc Option>; @@ -149,3 +194,23 @@ pub type Transformer = Arc Option>; pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option { Some(plan.clone()) } + +#[cfg(test)] +mod test { + use datafusion_expr::{LogicalPlanBuilder, Sort}; + + use super::*; + + #[test] + fn sort_on_empty_partition() { + let plan = LogicalPlan::Sort(Sort { + expr: vec![], + input: Arc::new(LogicalPlanBuilder::empty(false).build().unwrap()), + fetch: None, + }); + assert!(matches!( + Categorizer::check_plan(&plan, Some(vec![])), + Commutativity::Commutative + )); + } +} diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index e2a9b9c825..cf7afa26da 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -154,6 +154,7 @@ impl MergeScanExec { let regions = self.regions.clone(); let region_query_handler = self.region_query_handler.clone(); let metric = MergeScanMetric::new(&self.metric); + let schema = Self::arrow_schema_to_schema(self.schema())?; let stream = Box::pin(stream!({ let _finish_timer = metric.finish_time().timer(); @@ -176,12 +177,14 @@ impl MergeScanExec { while let Some(batch) = stream.next().await { let batch = batch?; + // reconstruct batch using `self.schema` + // to remove metadata and correct column name + let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?; metric.record_output_batch_rows(batch.num_rows()); - yield Ok(Self::remove_metadata_from_record_batch(batch)); - if let Some(first_consume_timer) = first_consume_timer.as_mut().take() { first_consume_timer.stop(); } + yield Ok(batch); } } })); @@ -193,14 +196,6 @@ impl MergeScanExec { })) } - fn remove_metadata_from_record_batch(batch: RecordBatch) -> RecordBatch { - let arrow_schema = batch.schema.arrow_schema().as_ref(); - let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema); - let schema_without_metadata = - Self::arrow_schema_to_schema(arrow_schema_without_metadata).unwrap(); - RecordBatch::new(schema_without_metadata, batch.columns().iter().cloned()).unwrap() - } - fn arrow_schema_without_metadata(arrow_schema: &ArrowSchema) -> ArrowSchemaRef { Arc::new(ArrowSchema::new( arrow_schema diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index b8eba89558..65af7e2869 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -160,7 +160,7 @@ impl MysqlServer { if let Err(e) = Self::do_handle(stream, spawn_ref, spawn_config).await { // TODO(LFC): Write this error to client as well, in MySQL text protocol. // Looks like we have to expose opensrv-mysql's `PacketWriter`? - warn!("Internal error occurred during query exec, server actively close the channel to let client try next time: {}.", e) + warn!(e; "Internal error occurred during query exec, server actively close the channel to let client try next time") } decrement_gauge!(crate::metrics::METRIC_MYSQL_CONNECTIONS, 1.0); }); diff --git a/tests/cases/distributed/explain/order_by.result b/tests/cases/distributed/explain/order_by.result index e4ff2d139b..4d08b3e861 100644 --- a/tests/cases/distributed/explain/order_by.result +++ b/tests/cases/distributed/explain/order_by.result @@ -44,10 +44,8 @@ EXPLAIN SELECT a, b FROM test ORDER BY a, b; +-+-+ | plan_type_| plan_| +-+-+ -| logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_| -|_|_MergeScan [is_placeholder=false]_| -| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] | -|_|_MergeScanExec: REDACTED +| logical_plan_| MergeScan [is_placeholder=false]_| +| physical_plan | MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/distributed/explain/single_partition.result b/tests/cases/distributed/explain/single_partition.result index f6b670fbce..0df318ff1b 100644 --- a/tests/cases/distributed/explain/single_partition.result +++ b/tests/cases/distributed/explain/single_partition.result @@ -34,15 +34,8 @@ EXPLAIN SELECT SUM(i) FROM single_partition; +-+-+ | plan_type_| plan_| +-+-+ -| logical_plan_| Aggregate: groupBy=[[]], aggr=[[SUM(single_partition.i)]]_| -|_|_Projection: single_partition.i_| -|_|_MergeScan [is_placeholder=false]_| -| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[SUM(single_partition.i)]_| -|_|_CoalescePartitionsExec_| -|_|_AggregateExec: mode=Partial, gby=[], aggr=[SUM(single_partition.i)]_| -|_|_RepartitionExec: partitioning=REDACTED -|_|_ProjectionExec: expr=[i@0 as i]_| -|_|_MergeScanExec: REDACTED +| logical_plan_| MergeScan [is_placeholder=false]_| +| physical_plan | MergeScanExec: REDACTED |_|_| +-+-+ @@ -56,10 +49,8 @@ EXPLAIN SELECT * FROM single_partition ORDER BY i DESC; +-+-+ | plan_type_| plan_| +-+-+ -| logical_plan_| Sort: single_partition.i DESC NULLS FIRST_| -|_|_MergeScan [is_placeholder=false]_| -| physical_plan | SortExec: expr=[i@0 DESC]_| -|_|_MergeScanExec: REDACTED +| logical_plan_| MergeScan [is_placeholder=false]_| +| physical_plan | MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/distributed/explain/subqueries.result b/tests/cases/distributed/explain/subqueries.result index e1a95fa3a5..8d20b8e159 100644 --- a/tests/cases/distributed/explain/subqueries.result +++ b/tests/cases/distributed/explain/subqueries.result @@ -116,9 +116,7 @@ EXPLAIN INSERT INTO other SELECT i, 2 FROM integers WHERE i=(SELECT MAX(i) FROM | | Projection: integers.i | | | MergeScan [is_placeholder=false] | | | SubqueryAlias: __scalar_sq_1 | -| | Aggregate: groupBy=[[]], aggr=[[MAX(integers.i)]] | -| | Projection: integers.i | -| | MergeScan [is_placeholder=false] | +| | MergeScan [is_placeholder=false] | +--------------+-------------------------------------------------------------------+ drop table other; diff --git a/tests/cases/distributed/optimizer/order_by.result b/tests/cases/distributed/optimizer/order_by.result index 7c43b90db1..b49f3eba46 100644 --- a/tests/cases/distributed/optimizer/order_by.result +++ b/tests/cases/distributed/optimizer/order_by.result @@ -15,8 +15,7 @@ explain select * from numbers order by number desc; +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Sort: numbers.number DESC NULLS FIRST | -| | MergeScan [is_placeholder=false] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | SortExec: expr=[number@0 DESC] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | | | | @@ -28,8 +27,7 @@ explain select * from numbers order by number asc; +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Sort: numbers.number ASC NULLS LAST | -| | MergeScan [is_placeholder=false] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | | | | @@ -41,9 +39,7 @@ explain select * from numbers order by number desc limit 10; +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Limit: skip=0, fetch=10 | -| | Sort: numbers.number DESC NULLS FIRST, fetch=10 | -| | MergeScan [is_placeholder=false] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | GlobalLimitExec: skip=0, fetch=10 | | | SortExec: fetch=10, expr=[number@0 DESC] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | @@ -56,9 +52,7 @@ explain select * from numbers order by number asc limit 10; +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Limit: skip=0, fetch=10 | -| | Sort: numbers.number ASC NULLS LAST, fetch=10 | -| | MergeScan [is_placeholder=false] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | GlobalLimitExec: skip=0, fetch=10 | | | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | diff --git a/tests/cases/distributed/tql-explain-analyze/analyze.result b/tests/cases/distributed/tql-explain-analyze/analyze.result index 701cec66ad..1c35d346c5 100644 --- a/tests/cases/distributed/tql-explain-analyze/analyze.result +++ b/tests/cases/distributed/tql-explain-analyze/analyze.result @@ -20,9 +20,10 @@ TQL ANALYZE (0, 10, '5s') test; +-+-+ | Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED |_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED |_|_PromSeriesDivideExec: tags=["k"], REDACTED -|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED +|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED |_|_MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/distributed/tql-explain-analyze/explain.result b/tests/cases/distributed/tql-explain-analyze/explain.result index b05436a5b9..7778dab2e1 100644 --- a/tests/cases/distributed/tql-explain-analyze/explain.result +++ b/tests/cases/distributed/tql-explain-analyze/explain.result @@ -18,13 +18,13 @@ TQL EXPLAIN (0, 10, '5s') test; | logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | | | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivide: tags=["k"] | -| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST | -| | MergeScan [is_placeholder=false] | +| | MergeScan [is_placeholder=false] | | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | -| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | -| | PromSeriesDivideExec: tags=["k"] | -| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] | -| | MergeScanExec: REDACTED +| | RepartitionExec: partitioning=REDACTED +| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivideExec: tags=["k"] | +| | SortExec: expr=[k@2 ASC NULLS LAST] | +| | MergeScanExec: REDACTED | | | +---------------+-----------------------------------------------------------------------------------------------+ @@ -47,21 +47,21 @@ Affected Rows: 0 -- SQLNESS REPLACE (peers.*) REDACTED TQL EXPLAIN host_load1{__field__="val"}; -+---------------+------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+------------------------------------------------------------------------------------------------------------------+ -| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] | -| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false] | -| | PromSeriesDivide: tags=["collector", "host"] | -| | Sort: host_load1.collector DESC NULLS LAST, host_load1.host DESC NULLS LAST, host_load1.ts DESC NULLS LAST | -| | MergeScan [is_placeholder=false] | -| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] | -| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] | -| | PromSeriesDivideExec: tags=["collector", "host"] | -| | SortExec: expr=[collector@1 DESC NULLS LAST,host@2 DESC NULLS LAST,ts@3 DESC NULLS LAST] | -| | MergeScanExec: REDACTED -| | | -+---------------+------------------------------------------------------------------------------------------------------------------+ ++---------------+------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------+ +| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false] | +| | PromSeriesDivide: tags=["collector", "host"] | +| | MergeScan [is_placeholder=false] | +| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] | +| | RepartitionExec: partitioning=REDACTED +| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] | +| | PromSeriesDivideExec: tags=["collector", "host"] | +| | SortExec: expr=[collector@1 ASC NULLS LAST,host@2 ASC NULLS LAST] | +| | MergeScanExec: REDACTED +| | | ++---------------+------------------------------------------------------------------------------------------------+ DROP TABLE host_load1; diff --git a/tests/cases/standalone/common/tql/join.result b/tests/cases/standalone/common/tql/join.result index 56199732e0..689dac20ec 100644 --- a/tests/cases/standalone/common/tql/join.result +++ b/tests/cases/standalone/common/tql/join.result @@ -56,20 +56,13 @@ tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * +---------+---------------------+-----------------------------------------------------------------------------------------------------------+ | model | ts | completion.SUM(val * Float64(0.0015) / Float64(1000)) + prompt.SUM(val * Float64(0.0015) / Float64(1000)) | +---------+---------------------+-----------------------------------------------------------------------------------------------------------+ -| model-a | 1970-01-01T00:00:00 | 0.000825 | -| model-b | 1970-01-01T00:00:05 | 0.00066 | +| model-a | 1970-01-01T00:00:00 | 0.000165 | +| model-a | 1970-01-01T00:00:05 | 0.000165 | +| model-a | 1970-01-01T00:00:10 | 0.000495 | +| model-b | 1970-01-01T00:00:05 | 0.00033 | +| model-b | 1970-01-01T00:00:10 | 0.00033 | +---------+---------------------+-----------------------------------------------------------------------------------------------------------+ --- SQLNESS SORT_RESULT 3 1 -tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000); - -+---------------------+-----------------------------------------------------------------------------------------------------------+ -| ts | completion.SUM(val * Float64(0.0015) / Float64(1000)) + prompt.SUM(val * Float64(0.0015) / Float64(1000)) | -+---------------------+-----------------------------------------------------------------------------------------------------------+ -| 1970-01-01T00:00:00 | 0.000225 | -| 1970-01-01T00:00:05 | 0.00051 | -+---------------------+-----------------------------------------------------------------------------------------------------------+ - -- SQLNESS SORT_RESULT 3 1 tql eval(0, 10, '5s') sum(completion / 1000) + max(completion / 1000); diff --git a/tests/cases/standalone/common/tql/join.sql b/tests/cases/standalone/common/tql/join.sql index f67b4772a5..e9f5c8a544 100644 --- a/tests/cases/standalone/common/tql/join.sql +++ b/tests/cases/standalone/common/tql/join.sql @@ -29,9 +29,6 @@ tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) + sum(prompt * 0.0015 / 10 -- SQLNESS SORT_RESULT 3 1 tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000) by (model); --- SQLNESS SORT_RESULT 3 1 -tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000); - -- SQLNESS SORT_RESULT 3 1 tql eval(0, 10, '5s') sum(completion / 1000) + max(completion / 1000); diff --git a/tests/cases/standalone/limit/limit.result b/tests/cases/standalone/limit/limit.result index 0f58f3c0bf..0d39bee3e1 100644 --- a/tests/cases/standalone/limit/limit.result +++ b/tests/cases/standalone/limit/limit.result @@ -8,7 +8,7 @@ EXPLAIN SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) LIMIT +---------------+----------------------------------+ | plan_type | plan | +---------------+----------------------------------+ -| logical_plan | EmptyRelation | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | EmptyExec: produce_one_row=false | | | | +---------------+----------------------------------+ @@ -18,7 +18,7 @@ EXPLAIN SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) WHERE +---------------+----------------------------------+ | plan_type | plan | +---------------+----------------------------------+ -| logical_plan | EmptyRelation | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | EmptyExec: produce_one_row=false | | | | +---------------+----------------------------------+ diff --git a/tests/cases/standalone/optimizer/order_by.result b/tests/cases/standalone/optimizer/order_by.result index c1d5b7171a..129743b39d 100644 --- a/tests/cases/standalone/optimizer/order_by.result +++ b/tests/cases/standalone/optimizer/order_by.result @@ -13,8 +13,7 @@ explain select * from numbers order by number desc; +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Sort: numbers.number DESC NULLS FIRST | -| | MergeScan [is_placeholder=false] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | SortExec: expr=[number@0 DESC] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | | | | @@ -25,8 +24,7 @@ explain select * from numbers order by number asc; +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Sort: numbers.number ASC NULLS LAST | -| | MergeScan [is_placeholder=false] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | | | | @@ -37,9 +35,7 @@ explain select * from numbers order by number desc limit 10; +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Limit: skip=0, fetch=10 | -| | Sort: numbers.number DESC NULLS FIRST, fetch=10 | -| | MergeScan [is_placeholder=false] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | GlobalLimitExec: skip=0, fetch=10 | | | SortExec: fetch=10, expr=[number@0 DESC] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | @@ -51,9 +47,7 @@ explain select * from numbers order by number asc limit 10; +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Limit: skip=0, fetch=10 | -| | Sort: numbers.number ASC NULLS LAST, fetch=10 | -| | MergeScan [is_placeholder=false] | +| logical_plan | MergeScan [is_placeholder=false] | | physical_plan | GlobalLimitExec: skip=0, fetch=10 | | | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] | | | StreamScanAdapter { stream: "", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | diff --git a/tests/cases/standalone/tql-explain-analyze/analyze.result b/tests/cases/standalone/tql-explain-analyze/analyze.result index 701cec66ad..1c35d346c5 100644 --- a/tests/cases/standalone/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/tql-explain-analyze/analyze.result @@ -20,9 +20,10 @@ TQL ANALYZE (0, 10, '5s') test; +-+-+ | Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED |_|_RepartitionExec: partitioning=REDACTED +|_|_RepartitionExec: partitioning=REDACTED |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED |_|_PromSeriesDivideExec: tags=["k"], REDACTED -|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED +|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED |_|_MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/standalone/tql-explain-analyze/explain.result b/tests/cases/standalone/tql-explain-analyze/explain.result index 1de5856fc5..d0ebadd06b 100644 --- a/tests/cases/standalone/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/tql-explain-analyze/explain.result @@ -18,13 +18,13 @@ TQL EXPLAIN (0, 10, '5s') test; | logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | | | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivide: tags=["k"] | -| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST | -| | MergeScan [is_placeholder=false] | +| | MergeScan [is_placeholder=false] | | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | -| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | -| | PromSeriesDivideExec: tags=["k"] | -| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] | -| | MergeScanExec: REDACTED +| | RepartitionExec: partitioning=REDACTED +| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivideExec: tags=["k"] | +| | SortExec: expr=[k@2 ASC NULLS LAST] | +| | MergeScanExec: REDACTED | | | +---------------+-----------------------------------------------------------------------------------------------+