diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs
index 28977ef4d5..968e8a6d37 100644
--- a/src/promql/src/extension_plan/normalize.rs
+++ b/src/promql/src/extension_plan/normalize.rs
@@ -26,7 +26,7 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
 use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream,
 };
 use datatypes::arrow::array::TimestampMillisecondArray;
@@ -166,6 +166,10 @@ impl ExecutionPlan for SeriesNormalizeExec {
         self.input.schema()
     }
 
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
+    }
+
     fn output_partitioning(&self) -> Partitioning {
         self.input.output_partitioning()
     }
diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs
index 502d08ce0b..55be29ab18 100644
--- a/src/promql/src/extension_plan/series_divide.rs
+++ b/src/promql/src/extension_plan/series_divide.rs
@@ -24,7 +24,8 @@ use datafusion::common::{DFSchema, DFSchemaRef};
 use datafusion::error::Result as DataFusionResult;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
+use datafusion::physical_plan::expressions::Column as ColumnExpr;
 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -136,7 +137,19 @@ impl ExecutionPlan for SeriesDivideExec {
         vec![Distribution::SinglePartition]
     }
 
-    // TODO(ruihang): specify required input ordering
+    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+        let input_schema = self.input.schema();
+        let exprs = self
+            .tag_columns
+            .iter()
+            .map(|tag| PhysicalSortRequirement {
+                // Safety: the tag column names are verified in the planning phase
+                expr: Arc::new(ColumnExpr::new_with_schema(tag, &input_schema).unwrap()),
+                options: None,
+            })
+            .collect();
+        vec![Some(exprs)]
+    }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         self.input.output_ordering()
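Note: the standalone sketch below is illustrative only and is not part of the patch. It isolates how the sort requirement declared in required_input_ordering above is assembled, using a hypothetical helper name (tag_sort_requirements) and the schema of the test table from the cases below (i: Float64 value, j: millisecond timestamp time index, k: Utf8 tag). It assumes the same DataFusion release as this change, where PhysicalSortRequirement exposes public expr/options fields.

    // Illustrative sketch, not part of the patch.
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::physical_expr::PhysicalSortRequirement;
    use datafusion::physical_plan::expressions::Column;

    // Hypothetical helper mirroring the body of SeriesDivideExec::required_input_ordering.
    fn tag_sort_requirements(tags: &[&str], schema: &Schema) -> Vec<PhysicalSortRequirement> {
        tags.iter()
            .map(|tag| PhysicalSortRequirement {
                // options: None fixes only the sort key, not the direction or null
                // placement, so any existing ordering on the tag columns can satisfy it.
                expr: Arc::new(Column::new_with_schema(tag, schema).unwrap()),
                options: None,
            })
            .collect()
    }

    fn main() {
        // Schema of the `test` table used by the cases below: i (value), j (time index), k (tag).
        let schema = Schema::new(vec![
            Field::new("i", DataType::Float64, true),
            Field::new("j", DataType::Timestamp(TimeUnit::Millisecond, None), false),
            Field::new("k", DataType::Utf8, true),
        ]);
        let reqs = tag_sort_requirements(&["k"], &schema);
        // One requirement per tag column; the exec wraps this as vec![Some(reqs)],
        // one entry per input child.
        assert_eq!(reqs.len(), 1);
    }

Because options is None, only the sort key is constrained; the SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] that appears under PromSeriesDivideExec in the updated plans below satisfies the requirement on k as a prefix, whatever sort direction the input already provides.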
diff --git a/tests/cases/distributed/tql-explain-analyze/analyze.result b/tests/cases/distributed/tql-explain-analyze/analyze.result
index c75c9b00f4..c8e8a785f9 100644
--- a/tests/cases/distributed/tql-explain-analyze/analyze.result
+++ b/tests/cases/distributed/tql-explain-analyze/analyze.result
@@ -19,10 +19,10 @@ TQL ANALYZE (0, 10, '5s') test;
 | plan_type_| plan_|
 +-+-+
 | Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
+|_|_RepartitionExec: partitioning=REDACTED
 |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
-|_|_RepartitionExec: partitioning=REDACTED
-|_|_RepartitionExec: partitioning=REDACTED
 |_|_PromSeriesDivideExec: tags=["k"], REDACTED
+|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED
 |_|_MergeScanExec: peers=[REDACTED
 |_|_|
 +-+-+
diff --git a/tests/cases/distributed/tql-explain-analyze/explain.result b/tests/cases/distributed/tql-explain-analyze/explain.result
index 5f924aafde..9c6bf36e3f 100644
--- a/tests/cases/distributed/tql-explain-analyze/explain.result
+++ b/tests/cases/distributed/tql-explain-analyze/explain.result
@@ -22,8 +22,8 @@ TQL EXPLAIN (0, 10, '5s') test;
 | | MergeScan [is_placeholder=false] |
 | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
 | | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
-| | RepartitionExec: partitioning=REDACTED
-| | PromSeriesDivideExec: tags=["k"] |
+| | PromSeriesDivideExec: tags=["k"] |
+| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] |
 | | MergeScanExec: peers=[REDACTED
 | | |
 +---------------+-----------------------------------------------------------------------------------------------+
@@ -57,8 +57,8 @@ TQL EXPLAIN host_load1{__field__="val"};
 | | MergeScan [is_placeholder=false] |
 | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] |
 | | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] |
-| | RepartitionExec: partitioning=REDACTED
-| | PromSeriesDivideExec: tags=["collector", "host"] |
+| | PromSeriesDivideExec: tags=["collector", "host"] |
+| | SortExec: expr=[collector@1 DESC NULLS LAST,host@2 DESC NULLS LAST,ts@3 DESC NULLS LAST] |
 | | MergeScanExec: peers=[REDACTED
 | | |
 +---------------+------------------------------------------------------------------------------------------------------------------+
diff --git a/tests/cases/standalone/tql-explain-analyze/analyze.result b/tests/cases/standalone/tql-explain-analyze/analyze.result
index 38d7b58777..46e4b3b4e7 100644
--- a/tests/cases/standalone/tql-explain-analyze/analyze.result
+++ b/tests/cases/standalone/tql-explain-analyze/analyze.result
@@ -18,10 +18,10 @@ TQL ANALYZE (0, 10, '5s') test;
 | plan_type_| plan_|
 +-+-+
 | Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
+|_|_RepartitionExec: partitioning=REDACTED
 |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
-|_|_RepartitionExec: partitioning=REDACTED
-|_|_RepartitionExec: partitioning=REDACTED
 |_|_PromSeriesDivideExec: tags=["k"], REDACTED
+|_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED
 |_|_StreamScanAdapter { stream: "", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] }, REDACTED
 |_|_|
 +-+-+
diff --git a/tests/cases/standalone/tql-explain-analyze/explain.result b/tests/cases/standalone/tql-explain-analyze/explain.result
index 4c00bbeef1..ae1b4bb936 100644
--- a/tests/cases/standalone/tql-explain-analyze/explain.result
+++ b/tests/cases/standalone/tql-explain-analyze/explain.result
@@ -21,8 +21,8 @@ TQL EXPLAIN (0, 10, '5s') test;
 | | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
 | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
 | | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
-| | RepartitionExec: partitioning=REDACTED
-| | PromSeriesDivideExec: tags=["k"] |
+| | PromSeriesDivideExec: tags=["k"] |
+| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] |
 | | StreamScanAdapter { stream: "", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
 | | |
 +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
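Note: one more illustrative sketch, not part of the patch, to help read the SortExec lines above. DataFusion prints a physical column reference as name@index, so k@2 and j@1 point at positions in the scanned schema shown by StreamScanAdapter (i at 0, j at 1, k at 2; likewise collector@1, host@2, ts@3 for host_load1). The hypothetical standalone check below assumes the same DataFusion release as this change.

    // Illustrative check, not part of the patch.
    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::physical_plan::expressions::Column;

    fn main() {
        // The `test` schema printed by StreamScanAdapter above: i at index 0, j at 1, k at 2.
        let schema = Schema::new(vec![
            Field::new("i", DataType::Float64, true),
            Field::new("j", DataType::Timestamp(TimeUnit::Millisecond, None), false),
            Field::new("k", DataType::Utf8, true),
        ]);
        let k = Column::new_with_schema("k", &schema).unwrap();
        let j = Column::new_with_schema("j", &schema).unwrap();
        // Column's Display impl is "name@index", matching the SortExec expressions.
        assert_eq!(k.to_string(), "k@2");
        assert_eq!(j.to_string(), "j@1");
    }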