From a7aa556763ecb491a5bff8db76ae24b70f89a737 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 28 Jun 2024 21:45:22 +0800 Subject: [PATCH] feat: output multiple partition in MergeScanExec (#4227) * feat: output multiple partition in MergeScanExec Signed-off-by: Ruihang Xia * fix range manipulate Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- .../src/extension_plan/range_manipulate.rs | 8 ++++- src/query/src/dist_plan/merge_scan.rs | 23 +++++++++--- src/query/src/dist_plan/planner.rs | 1 + .../distributed/explain/join_10_tables.result | 2 -- .../explain/multi_partitions.result | 3 +- .../cases/distributed/explain/order_by.result | 2 -- .../distributed/explain/subqueries.result | 2 -- .../optimizer/filter_push_down.result | 2 +- .../common/order/order_by_exceptions.result | 6 ++-- .../cases/standalone/common/partition.result | 12 ++++--- tests/cases/standalone/common/partition.sql | 4 +++ .../cases/standalone/common/range/nest.result | 2 ++ .../common/tql-explain-analyze/analyze.result | 12 ++++--- .../common/tql-explain-analyze/explain.result | 35 +++++++++++++------ 14 files changed, 77 insertions(+), 37 deletions(-) diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index 973fa5a38e..27ba8f4f3d 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -301,6 +301,12 @@ impl ExecutionPlan for RangeManipulateExec { children: Vec>, ) -> DataFusionResult> { assert!(!children.is_empty()); + let exec_input = children[0].clone(); + let properties = PlanProperties::new( + EquivalenceProperties::new(self.output_schema.clone()), + exec_input.properties().partitioning.clone(), + exec_input.properties().execution_mode, + ); Ok(Arc::new(Self { start: self.start, end: self.end, @@ -312,7 +318,7 @@ impl ExecutionPlan for RangeManipulateExec { output_schema: self.output_schema.clone(), input: children[0].clone(), metric: self.metric.clone(), - properties: self.properties.clone(), + properties, })) } diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 93bbff1f56..3b17a531db 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -134,6 +134,7 @@ pub struct MergeScanExec { /// Metrics from sub stages sub_stage_metrics: Arc>>, query_ctx: QueryContextRef, + target_partition: usize, } impl std::fmt::Debug for MergeScanExec { @@ -154,11 +155,12 @@ impl MergeScanExec { arrow_schema: &ArrowSchema, region_query_handler: RegionQueryHandlerRef, query_ctx: QueryContextRef, + target_partition: usize, ) -> Result { let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema); let properties = PlanProperties::new( EquivalenceProperties::new(arrow_schema_without_metadata.clone()), - Partitioning::UnknownPartitioning(1), + Partitioning::UnknownPartitioning(target_partition), ExecutionMode::Bounded, ); let schema_without_metadata = @@ -174,10 +176,15 @@ impl MergeScanExec { sub_stage_metrics: Arc::default(), properties, query_ctx, + target_partition, }) } - pub fn to_stream(&self, context: Arc) -> Result { + pub fn to_stream( + &self, + context: Arc, + partition: usize, + ) -> Result { let regions = self.regions.clone(); let region_query_handler = self.region_query_handler.clone(); let metric = MergeScanMetric::new(&self.metric); @@ -189,6 +196,7 @@ impl MergeScanExec { let current_schema = self.query_ctx.current_schema().to_string(); let timezone = self.query_ctx.timezone().to_string(); let extensions = self.query_ctx.extensions(); + let target_partition = self.target_partition; let sub_sgate_metrics_moved = self.sub_stage_metrics.clone(); let plan = self.plan.clone(); @@ -198,7 +206,12 @@ impl MergeScanExec { let mut ready_timer = metric.ready_time().timer(); let mut first_consume_timer = Some(metric.first_consume_time().timer()); - for region_id in regions { + for region_id in regions + .iter() + .skip(partition) + .step_by(target_partition) + .copied() + { let request = QueryRequest { header: Some(RegionRequestHeader { tracing_context: tracing_context.to_w3c(), @@ -325,11 +338,11 @@ impl ExecutionPlan for MergeScanExec { fn execute( &self, - _partition: usize, + partition: usize, context: Arc, ) -> Result { Ok(Box::pin(DfRecordBatchStreamAdapter::new( - self.to_stream(context)?, + self.to_stream(context, partition)?, ))) } diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 7b56538da4..41227e8687 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -107,6 +107,7 @@ impl ExtensionPlanner for DistExtensionPlanner { &schema, self.region_query_handler.clone(), query_ctx, + session_state.config().target_partitions(), )?; Ok(Some(Arc::new(merge_scan_plan) as _)) } diff --git a/tests/cases/distributed/explain/join_10_tables.result b/tests/cases/distributed/explain/join_10_tables.result index 172d43aa1d..5fa0194c14 100644 --- a/tests/cases/distributed/explain/join_10_tables.result +++ b/tests/cases/distributed/explain/join_10_tables.result @@ -121,13 +121,11 @@ limit 1; |_|_RepartitionExec: partitioning=REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_FilterExec: vin@1 IS NOT NULL_| -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_FilterExec: vin@1 IS NOT NULL_| -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED diff --git a/tests/cases/distributed/explain/multi_partitions.result b/tests/cases/distributed/explain/multi_partitions.result index 660a4eee30..6bbb6abb8e 100644 --- a/tests/cases/distributed/explain/multi_partitions.result +++ b/tests/cases/distributed/explain/multi_partitions.result @@ -27,7 +27,8 @@ explain SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY hos +-+-+ | logical_plan_| Sort: demo.host ASC NULLS LAST_| |_|_MergeScan [is_placeholder=false]_| -| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[false]_| +| physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_| +|_|_SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[true]_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/distributed/explain/order_by.result b/tests/cases/distributed/explain/order_by.result index 221ac05fcd..64ddc07be9 100644 --- a/tests/cases/distributed/explain/order_by.result +++ b/tests/cases/distributed/explain/order_by.result @@ -21,7 +21,6 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1; |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED |_|_AggregateExec: mode=Partial, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[]_| -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_| +-+-+ @@ -68,7 +67,6 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b; |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_RepartitionExec: partitioning=REDACTED |_|_AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]_| -|_|_RepartitionExec: partitioning=REDACTED |_|_MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/distributed/explain/subqueries.result b/tests/cases/distributed/explain/subqueries.result index f0e41496a4..1a9ccd4e36 100644 --- a/tests/cases/distributed/explain/subqueries.result +++ b/tests/cases/distributed/explain/subqueries.result @@ -94,10 +94,8 @@ order by t.i desc; |_|_CoalescePartitionsExec_| |_|_CoalesceBatchesExec: target_batch_size=8192_| |_|_FilterExec: i@0 IS NOT NULL_| -|_|_RepartitionExec: partitioning=REDACTED |_|_ProjectionExec: expr=[i@0 as i]_| |_|_MergeScanExec: REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_ProjectionExec: expr=[]_| |_|_MergeScanExec: REDACTED |_|_| diff --git a/tests/cases/distributed/optimizer/filter_push_down.result b/tests/cases/distributed/optimizer/filter_push_down.result index fa4dab8a9a..6adff4ba76 100644 --- a/tests/cases/distributed/optimizer/filter_push_down.result +++ b/tests/cases/distributed/optimizer/filter_push_down.result @@ -259,9 +259,9 @@ EXPLAIN SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2) a1 WHER | physical_plan | CoalescePartitionsExec_| |_|_ProjectionExec: expr=[false as cond]_| |_|_CrossJoinExec_| +|_|_CoalescePartitionsExec_| |_|_ProjectionExec: expr=[]_| |_|_MergeScanExec: REDACTED -|_|_RepartitionExec: partitioning=REDACTED |_|_ProjectionExec: expr=[]_| |_|_MergeScanExec: REDACTED |_|_| diff --git a/tests/cases/standalone/common/order/order_by_exceptions.result b/tests/cases/standalone/common/order/order_by_exceptions.result index 851eaacb3e..9679c45958 100644 --- a/tests/cases/standalone/common/order/order_by_exceptions.result +++ b/tests/cases/standalone/common/order/order_by_exceptions.result @@ -81,11 +81,9 @@ EXPLAIN SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY | | AggregateExec: mode=Partial, gby=[test.a % Int64(2)@0 as test.a % Int64(2), b@1 as b], aggr=[] | | | UnionExec | | | ProjectionExec: expr=[CAST(a@0 AS Int64) % 2 as test.a % Int64(2), b@1 as b] | -| | RepartitionExec: REDACTED -| | MergeScanExec: REDACTED +| | MergeScanExec: REDACTED | | ProjectionExec: expr=[CAST(a@0 AS Int64) % 2 as test.a % Int64(2), b@1 as b] | -| | RepartitionExec: REDACTED -| | MergeScanExec: REDACTED +| | MergeScanExec: REDACTED | | | +---------------+-----------------------------------------------------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/partition.result b/tests/cases/standalone/common/partition.result index c062465ad8..3fb5f50ddb 100644 --- a/tests/cases/standalone/common/partition.result +++ b/tests/cases/standalone/common/partition.result @@ -46,15 +46,16 @@ INSERT INTO my_table VALUES Affected Rows: 8 +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM my_table; +------+---+-------------------------+ | a | b | ts | +------+---+-------------------------+ | 100 | a | 1970-01-01T00:00:00.001 | -| 200 | b | 1970-01-01T00:00:00.002 | | 1100 | c | 1970-01-01T00:00:00.003 | | 1200 | d | 1970-01-01T00:00:00.004 | +| 200 | b | 1970-01-01T00:00:00.002 | | 2000 | e | 1970-01-01T00:00:00.005 | | 2100 | f | 1970-01-01T00:00:00.006 | | 2200 | g | 1970-01-01T00:00:00.007 | @@ -65,14 +66,15 @@ DELETE FROM my_table WHERE a < 150; Affected Rows: 1 +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM my_table; +------+---+-------------------------+ | a | b | ts | +------+---+-------------------------+ -| 200 | b | 1970-01-01T00:00:00.002 | | 1100 | c | 1970-01-01T00:00:00.003 | | 1200 | d | 1970-01-01T00:00:00.004 | +| 200 | b | 1970-01-01T00:00:00.002 | | 2000 | e | 1970-01-01T00:00:00.005 | | 2100 | f | 1970-01-01T00:00:00.006 | | 2200 | g | 1970-01-01T00:00:00.007 | @@ -83,14 +85,15 @@ DELETE FROM my_table WHERE a < 2200 AND a > 1500; Affected Rows: 2 +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM my_table; +------+---+-------------------------+ | a | b | ts | +------+---+-------------------------+ -| 200 | b | 1970-01-01T00:00:00.002 | | 1100 | c | 1970-01-01T00:00:00.003 | | 1200 | d | 1970-01-01T00:00:00.004 | +| 200 | b | 1970-01-01T00:00:00.002 | | 2200 | g | 1970-01-01T00:00:00.007 | | 2400 | h | 1970-01-01T00:00:00.008 | +------+---+-------------------------+ @@ -148,15 +151,16 @@ INSERT INTO my_table VALUES Affected Rows: 8 +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM my_table; +------+---+-------------------------+ | a | b | ts | +------+---+-------------------------+ | 100 | a | 1970-01-01T00:00:00.001 | -| 200 | b | 1970-01-01T00:00:00.002 | | 1100 | c | 1970-01-01T00:00:00.003 | | 1200 | d | 1970-01-01T00:00:00.004 | +| 200 | b | 1970-01-01T00:00:00.002 | | 2000 | e | 1970-01-01T00:00:00.005 | | 2100 | f | 1970-01-01T00:00:00.006 | | 2200 | g | 1970-01-01T00:00:00.007 | diff --git a/tests/cases/standalone/common/partition.sql b/tests/cases/standalone/common/partition.sql index 569f53f078..55f3d87cf9 100644 --- a/tests/cases/standalone/common/partition.sql +++ b/tests/cases/standalone/common/partition.sql @@ -26,14 +26,17 @@ INSERT INTO my_table VALUES (2200, 'g', 7), (2400, 'h', 8); +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM my_table; DELETE FROM my_table WHERE a < 150; +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM my_table; DELETE FROM my_table WHERE a < 2200 AND a > 1500; +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM my_table; DELETE FROM my_table WHERE a < 2500; @@ -66,6 +69,7 @@ INSERT INTO my_table VALUES (2200, 'g', 7), (2400, 'h', 8); +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM my_table; DROP TABLE my_table; diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result index af9a9824f9..0beecbdf2d 100644 --- a/tests/cases/standalone/common/range/nest.result +++ b/tests/cases/standalone/common/range/nest.result @@ -58,6 +58,7 @@ EXPLAIN SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; | logical_plan_| RangeSelect: range_exprs=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host.host], time_index=ts | |_|_MergeScan [is_placeholder=false]_| | physical_plan | RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts | +|_|_CoalescePartitionsExec_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ @@ -72,6 +73,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; | stage | node | plan_| +-+-+-+ | 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED +|_|_|_CoalescePartitionsExec REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SeqScan: partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index 091ca74c49..ee989101de 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -21,7 +21,8 @@ TQL ANALYZE (0, 10, '5s') test; | 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED |_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED |_|_|_PromSeriesDivideExec: tags=["k"] REDACTED -|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED @@ -50,7 +51,8 @@ TQL ANALYZE (0, 10, '1s', '2s') test; | 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j] REDACTED |_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED |_|_|_PromSeriesDivideExec: tags=["k"] REDACTED -|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED @@ -78,7 +80,8 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp | 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED |_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED |_|_|_PromSeriesDivideExec: tags=["k"] REDACTED -|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED @@ -108,7 +111,8 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test; | 0_| 0_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j] REDACTED |_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] REDACTED |_|_|_PromSeriesDivideExec: tags=["k"] REDACTED -|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] REDACTED +|_|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST] REDACTED +|_|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] REDACTED diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index 0cfe77f1a8..966a68f9ef 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -22,8 +22,9 @@ TQL EXPLAIN (0, 10, '5s') test; | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | | | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivideExec: tags=["k"] | -| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] | -| | MergeScanExec: REDACTED +| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] | +| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] | +| | MergeScanExec: REDACTED | | | +---------------+-----------------------------------------------------------------------------------------------+ @@ -43,8 +44,9 @@ TQL EXPLAIN (0, 10, '1s', '2s') test; | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] | | | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivideExec: tags=["k"] | -| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] | -| | MergeScanExec: REDACTED +| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] | +| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] | +| | MergeScanExec: REDACTED | | | +---------------+---------------------------------------------------------------------------------------------+ @@ -63,8 +65,9 @@ TQL EXPLAIN ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp | physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | | | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | | | PromSeriesDivideExec: tags=["k"] | -| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false] | -| | MergeScanExec: REDACTED +| | SortPreservingMergeExec: [k@2 ASC NULLS LAST] | +| | SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true] | +| | MergeScanExec: REDACTED | | | +---------------+-----------------------------------------------------------------------------------------------+ @@ -151,13 +154,20 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan after aggregate_statistics_| SAME TEXT AS ABOVE_| | physical_plan after join_selection_| SAME TEXT AS ABOVE_| | physical_plan after LimitedDistinctAggregation_| SAME TEXT AS ABOVE_| -| physical_plan after EnforceDistribution_| SAME TEXT AS ABOVE_| +| physical_plan after EnforceDistribution_| OutputRequirementExec_| +|_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| +|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| +|_|_PromSeriesDivideExec: tags=["k"]_| +|_|_CoalescePartitionsExec_| +|_|_MergeScanExec: REDACTED +|_|_| | physical_plan after CombinePartialFinalAggregate_| SAME TEXT AS ABOVE_| | physical_plan after EnforceSorting_| OutputRequirementExec_| |_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivideExec: tags=["k"]_| -|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false]_| +|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_| +|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_| |_|_MergeScanExec: REDACTED |_|_| | physical_plan after OptimizeAggregateOrder_| SAME TEXT AS ABOVE_| @@ -166,7 +176,8 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan after OutputRequirements_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivideExec: tags=["k"]_| -|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false]_| +|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_| +|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_| |_|_MergeScanExec: REDACTED |_|_| | physical_plan after LimitAggregation_| SAME TEXT AS ABOVE_| @@ -176,13 +187,15 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; | physical_plan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| |_|_PromSeriesDivideExec: tags=["k"]_| -|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false]_| +|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST]_| +|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true]_| |_|_MergeScanExec: REDACTED |_|_| | physical_plan_with_stats_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j], statistics=[Rows=Inexact(0), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] | |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| |_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| -|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[false], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_SortPreservingMergeExec: [k@2 ASC NULLS LAST], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| +|_|_SortExec: expr=[k@2 ASC NULLS LAST], preserve_partitioning=[true], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| |_|_MergeScanExec: REDACTED |_|_| +-+-+