fix: specify input ordering and distribution for prom plan (#2204)

* fix: specify input ordering and distribution for prom plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-08-18 17:45:46 +08:00
committed by GitHub
parent e1ce1d86a1
commit 3150f4b22e
6 changed files with 30 additions and 13 deletions

View File

@@ -26,7 +26,7 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use datatypes::arrow::array::TimestampMillisecondArray;
@@ -166,6 +166,10 @@ impl ExecutionPlan for SeriesNormalizeExec {
self.input.schema()
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
}
fn output_partitioning(&self) -> Partitioning {
self.input.output_partitioning()
}

View File

@@ -24,7 +24,8 @@ use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -136,7 +137,19 @@ impl ExecutionPlan for SeriesDivideExec {
vec![Distribution::SinglePartition]
}
// TODO(ruihang): specify required input ordering
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
let input_schema = self.input.schema();
let exprs = self
.tag_columns
.iter()
.map(|tag| PhysicalSortRequirement {
// Safety: the tag column names is verified in the planning phase
expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
options: None,
})
.collect();
vec![Some(exprs)]
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.input.output_ordering()

View File

@@ -19,10 +19,10 @@ TQL ANALYZE (0, 10, '5s') test;
| plan_type_| plan_|
+-+-+
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED
|_|_MergeScanExec: peers=[REDACTED
|_|_|
+-+-+

View File

@@ -22,8 +22,8 @@ TQL EXPLAIN (0, 10, '5s') test;
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] |
| | MergeScanExec: peers=[REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
@@ -57,8 +57,8 @@ TQL EXPLAIN host_load1{__field__="val"};
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | PromSeriesDivideExec: tags=["collector", "host"] |
| | SortExec: expr=[collector@1 DESC NULLS LAST,host@2 DESC NULLS LAST,ts@3 DESC NULLS LAST] |
| | MergeScanExec: peers=[REDACTED
| | |
+---------------+------------------------------------------------------------------------------------------------------------------+

View File

@@ -18,10 +18,10 @@ TQL ANALYZE (0, 10, '5s') test;
| plan_type_| plan_|
+-+-+
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED
|_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] }, REDACTED
|_|_|
+-+-+

View File

@@ -21,8 +21,8 @@ TQL EXPLAIN (0, 10, '5s') test;
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+