feat: pushdown aggr, limit and sort plan (#2495)

* check partition for aggr plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle empty partition rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove CheckPartition option

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update some valid sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* opt-out promql plan and update sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix limit

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix insert select subquery

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update unit test result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/dist_plan/analyzer.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-28 14:35:45 +08:00
committed by GitHub
parent 52ac093110
commit 10ecc30817
17 changed files with 139 additions and 117 deletions

View File

@@ -139,7 +139,7 @@ impl ExecutionPlan for SeriesDivideExec {
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
let input_schema = self.input.schema();
let exprs = self
let exprs: Vec<PhysicalSortRequirement> = self
.tag_columns
.iter()
.map(|tag| PhysicalSortRequirement {
@@ -148,7 +148,11 @@ impl ExecutionPlan for SeriesDivideExec {
options: None,
})
.collect();
vec![Some(exprs)]
if !exprs.is_empty() {
vec![Some(exprs)]
} else {
vec![None]
}
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {

View File

@@ -142,7 +142,7 @@ impl PlanRewriter {
return true;
}
match Categorizer::check_plan(plan) {
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
@@ -161,7 +161,6 @@ impl PlanRewriter {
self.stage.push(plan)
}
},
Commutativity::CheckPartition
| Commutativity::NonCommutative
| Commutativity::Unimplemented
| Commutativity::Unsupported => {
@@ -351,11 +350,7 @@ mod test {
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Aggregate: groupBy=[[]], aggr=[[AVG(t.number)]]",
" MergeScan [is_placeholder=false]",
]
.join("\n");
let expected = "MergeScan [is_placeholder=false]";
assert_eq!(expected, format!("{:?}", result));
}
@@ -402,11 +397,7 @@ mod test {
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Limit: skip=0, fetch=1",
" MergeScan [is_placeholder=false]",
]
.join("\n");
let expected = "MergeScan [is_placeholder=false]";
assert_eq!(expected, format!("{:?}", result));
}

View File

@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use datafusion_expr::utils::exprlist_to_columns;
use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
@@ -29,7 +31,6 @@ pub enum Commutativity {
TransformedCommutative(Option<Transformer>),
NonCommutative,
Unimplemented,
CheckPartition,
/// For unrelated plans like DDL
Unsupported,
}
@@ -37,7 +38,9 @@ pub enum Commutativity {
pub struct Categorizer {}
impl Categorizer {
pub fn check_plan(plan: &LogicalPlan) -> Commutativity {
pub fn check_plan(plan: &LogicalPlan, partition_cols: Option<Vec<String>>) -> Commutativity {
let partition_cols = partition_cols.unwrap_or_default();
match plan {
LogicalPlan::Projection(proj) => {
for expr in &proj.expr {
@@ -51,11 +54,23 @@ impl Categorizer {
// TODO(ruihang): Change this to Commutative once Like is supported in substrait
LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
LogicalPlan::Window(_) => Commutativity::Unimplemented,
LogicalPlan::Aggregate(_) => {
LogicalPlan::Aggregate(aggr) => {
if Self::check_partition(&aggr.group_expr, &partition_cols) {
return Commutativity::Commutative;
}
// check all children exprs and uses the strictest level
Commutativity::Unimplemented
}
LogicalPlan::Sort(_) => Commutativity::Unimplemented,
LogicalPlan::Sort(_) => {
if partition_cols.is_empty() {
return Commutativity::Commutative;
}
// sort plan needs to consider column priority
// We can implement a merge-sort on partial ordered data
Commutativity::Unimplemented
}
LogicalPlan::Join(_) => Commutativity::NonCommutative,
LogicalPlan::CrossJoin(_) => Commutativity::NonCommutative,
LogicalPlan::Repartition(_) => {
@@ -67,7 +82,17 @@ impl Categorizer {
LogicalPlan::EmptyRelation(_) => Commutativity::NonCommutative,
LogicalPlan::Subquery(_) => Commutativity::Unimplemented,
LogicalPlan::SubqueryAlias(_) => Commutativity::Unimplemented,
LogicalPlan::Limit(_) => Commutativity::PartialCommutative,
LogicalPlan::Limit(limit) => {
// Only execute `fetch` on remote nodes.
// wait for https://github.com/apache/arrow-datafusion/pull/7669
if partition_cols.is_empty() && limit.fetch.is_some() {
Commutativity::Commutative
} else if limit.skip == 0 && limit.fetch.is_some() {
Commutativity::PartialCommutative
} else {
Commutativity::Unimplemented
}
}
LogicalPlan::Extension(extension) => {
Self::check_extension_plan(extension.node.as_ref() as _)
}
@@ -93,7 +118,7 @@ impl Categorizer {
|| name == SeriesDivide::name()
|| name == MergeScanLogicalPlan::name() =>
{
Commutativity::Commutative
Commutativity::Unimplemented
}
_ => Commutativity::Unsupported,
}
@@ -142,6 +167,26 @@ impl Categorizer {
| Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,
}
}
/// Return true if the given expr and partition cols satisfied the rule.
/// In this case the plan can be treated as fully commutative.
fn check_partition(exprs: &[Expr], partition_cols: &[String]) -> bool {
let mut ref_cols = HashSet::new();
if exprlist_to_columns(exprs, &mut ref_cols).is_err() {
return false;
}
let ref_cols = ref_cols
.into_iter()
.map(|c| c.flat_name())
.collect::<HashSet<_>>();
for col in partition_cols {
if !ref_cols.contains(col) {
return false;
}
}
true
}
}
pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;
@@ -149,3 +194,23 @@ pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;
pub fn partial_commutative_transformer(plan: &LogicalPlan) -> Option<LogicalPlan> {
Some(plan.clone())
}
#[cfg(test)]
mod test {
use datafusion_expr::{LogicalPlanBuilder, Sort};
use super::*;
#[test]
fn sort_on_empty_partition() {
let plan = LogicalPlan::Sort(Sort {
expr: vec![],
input: Arc::new(LogicalPlanBuilder::empty(false).build().unwrap()),
fetch: None,
});
assert!(matches!(
Categorizer::check_plan(&plan, Some(vec![])),
Commutativity::Commutative
));
}
}

View File

@@ -154,6 +154,7 @@ impl MergeScanExec {
let regions = self.regions.clone();
let region_query_handler = self.region_query_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
let schema = Self::arrow_schema_to_schema(self.schema())?;
let stream = Box::pin(stream!({
let _finish_timer = metric.finish_time().timer();
@@ -176,12 +177,14 @@ impl MergeScanExec {
while let Some(batch) = stream.next().await {
let batch = batch?;
// reconstruct batch using `self.schema`
// to remove metadata and correct column name
let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
metric.record_output_batch_rows(batch.num_rows());
yield Ok(Self::remove_metadata_from_record_batch(batch));
if let Some(first_consume_timer) = first_consume_timer.as_mut().take() {
first_consume_timer.stop();
}
yield Ok(batch);
}
}
}));
@@ -193,14 +196,6 @@ impl MergeScanExec {
}))
}
fn remove_metadata_from_record_batch(batch: RecordBatch) -> RecordBatch {
let arrow_schema = batch.schema.arrow_schema().as_ref();
let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
let schema_without_metadata =
Self::arrow_schema_to_schema(arrow_schema_without_metadata).unwrap();
RecordBatch::new(schema_without_metadata, batch.columns().iter().cloned()).unwrap()
}
fn arrow_schema_without_metadata(arrow_schema: &ArrowSchema) -> ArrowSchemaRef {
Arc::new(ArrowSchema::new(
arrow_schema

View File

@@ -160,7 +160,7 @@ impl MysqlServer {
if let Err(e) = Self::do_handle(stream, spawn_ref, spawn_config).await {
// TODO(LFC): Write this error to client as well, in MySQL text protocol.
// Looks like we have to expose opensrv-mysql's `PacketWriter`?
warn!("Internal error occurred during query exec, server actively close the channel to let client try next time: {}.", e)
warn!(e; "Internal error occurred during query exec, server actively close the channel to let client try next time")
}
decrement_gauge!(crate::metrics::METRIC_MYSQL_CONNECTIONS, 1.0);
});

View File

@@ -44,10 +44,8 @@ EXPLAIN SELECT a, b FROM test ORDER BY a, b;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] |
|_|_MergeScanExec: REDACTED
| logical_plan_| MergeScan [is_placeholder=false]_|
| physical_plan | MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -34,15 +34,8 @@ EXPLAIN SELECT SUM(i) FROM single_partition;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Aggregate: groupBy=[[]], aggr=[[SUM(single_partition.i)]]_|
|_|_Projection: single_partition.i_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[SUM(single_partition.i)]_|
|_|_CoalescePartitionsExec_|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[SUM(single_partition.i)]_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_ProjectionExec: expr=[i@0 as i]_|
|_|_MergeScanExec: REDACTED
| logical_plan_| MergeScan [is_placeholder=false]_|
| physical_plan | MergeScanExec: REDACTED
|_|_|
+-+-+
@@ -56,10 +49,8 @@ EXPLAIN SELECT * FROM single_partition ORDER BY i DESC;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| Sort: single_partition.i DESC NULLS FIRST_|
|_|_MergeScan [is_placeholder=false]_|
| physical_plan | SortExec: expr=[i@0 DESC]_|
|_|_MergeScanExec: REDACTED
| logical_plan_| MergeScan [is_placeholder=false]_|
| physical_plan | MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -116,9 +116,7 @@ EXPLAIN INSERT INTO other SELECT i, 2 FROM integers WHERE i=(SELECT MAX(i) FROM
| | Projection: integers.i |
| | MergeScan [is_placeholder=false] |
| | SubqueryAlias: __scalar_sq_1 |
| | Aggregate: groupBy=[[]], aggr=[[MAX(integers.i)]] |
| | Projection: integers.i |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
+--------------+-------------------------------------------------------------------+
drop table other;

View File

@@ -15,8 +15,7 @@ explain select * from numbers order by number desc;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
@@ -28,8 +27,7 @@ explain select * from numbers order by number asc;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
@@ -41,9 +39,7 @@ explain select * from numbers order by number desc limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
@@ -56,9 +52,7 @@ explain select * from numbers order by number asc limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |

View File

@@ -20,9 +20,10 @@ TQL ANALYZE (0, 10, '5s') test;
+-+-+
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED
|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -18,13 +18,13 @@ TQL EXPLAIN (0, 10, '5s') test;
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] |
| | MergeScanExec: REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
@@ -47,21 +47,21 @@ Affected Rows: 0
-- SQLNESS REPLACE (peers.*) REDACTED
TQL EXPLAIN host_load1{__field__="val"};
+---------------+------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivide: tags=["collector", "host"] |
| | Sort: host_load1.collector DESC NULLS LAST, host_load1.host DESC NULLS LAST, host_load1.ts DESC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | SortExec: expr=[collector@1 DESC NULLS LAST,host@2 DESC NULLS LAST,ts@3 DESC NULLS LAST] |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivide: tags=["collector", "host"] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | SortExec: expr=[collector@1 ASC NULLS LAST,host@2 ASC NULLS LAST] |
| | MergeScanExec: REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------+
DROP TABLE host_load1;

View File

@@ -56,20 +56,13 @@ tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt *
+---------+---------------------+-----------------------------------------------------------------------------------------------------------+
| model | ts | completion.SUM(val * Float64(0.0015) / Float64(1000)) + prompt.SUM(val * Float64(0.0015) / Float64(1000)) |
+---------+---------------------+-----------------------------------------------------------------------------------------------------------+
| model-a | 1970-01-01T00:00:00 | 0.000825 |
| model-b | 1970-01-01T00:00:05 | 0.00066 |
| model-a | 1970-01-01T00:00:00 | 0.000165 |
| model-a | 1970-01-01T00:00:05 | 0.000165 |
| model-a | 1970-01-01T00:00:10 | 0.000495 |
| model-b | 1970-01-01T00:00:05 | 0.00033 |
| model-b | 1970-01-01T00:00:10 | 0.00033 |
+---------+---------------------+-----------------------------------------------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000);
+---------------------+-----------------------------------------------------------------------------------------------------------+
| ts | completion.SUM(val * Float64(0.0015) / Float64(1000)) + prompt.SUM(val * Float64(0.0015) / Float64(1000)) |
+---------------------+-----------------------------------------------------------------------------------------------------------+
| 1970-01-01T00:00:00 | 0.000225 |
| 1970-01-01T00:00:05 | 0.00051 |
+---------------------+-----------------------------------------------------------------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
tql eval(0, 10, '5s') sum(completion / 1000) + max(completion / 1000);

View File

@@ -29,9 +29,6 @@ tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) + sum(prompt * 0.0015 / 10
-- SQLNESS SORT_RESULT 3 1
tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000) by (model);
-- SQLNESS SORT_RESULT 3 1
tql eval(0, 10, '5s') sum(completion * 0.0015 / 1000) by (model) + sum(prompt * 0.0015 / 1000);
-- SQLNESS SORT_RESULT 3 1
tql eval(0, 10, '5s') sum(completion / 1000) + max(completion / 1000);

View File

@@ -8,7 +8,7 @@ EXPLAIN SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) LIMIT
+---------------+----------------------------------+
| plan_type | plan |
+---------------+----------------------------------+
| logical_plan | EmptyRelation |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | EmptyExec: produce_one_row=false |
| | |
+---------------+----------------------------------+
@@ -18,7 +18,7 @@ EXPLAIN SELECT * FROM (SELECT SUM(number) FROM numbers LIMIT 100000000000) WHERE
+---------------+----------------------------------+
| plan_type | plan |
+---------------+----------------------------------+
| logical_plan | EmptyRelation |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | EmptyExec: produce_one_row=false |
| | |
+---------------+----------------------------------+

View File

@@ -13,8 +13,7 @@ explain select * from numbers order by number desc;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
@@ -25,8 +24,7 @@ explain select * from numbers order by number asc;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
@@ -37,9 +35,7 @@ explain select * from numbers order by number desc limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
@@ -51,9 +47,7 @@ explain select * from numbers order by number asc limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| logical_plan | MergeScan [is_placeholder=false] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |

View File

@@ -20,9 +20,10 @@ TQL ANALYZE (0, 10, '5s') test;
+-+-+
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED
|_|_SortExec: expr=[k@2 ASC NULLS LAST], REDACTED
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+

View File

@@ -18,13 +18,13 @@ TQL EXPLAIN (0, 10, '5s') test;
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] |
| | MergeScanExec: REDACTED
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+