diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 9365c8e1e8..3d3993d454 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -94,6 +94,8 @@ impl WindowedSortPhysicalRule { && scanner_info .time_index .contains(input_schema.field(column_expr.index()).name()) + && sort_exec.fetch().is_none() + // skip if there is a limit, as dyn filter along is good enough in this case { } else { return Ok(Transformed::no(plan)); diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index e12479cc5a..2c537304c8 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -237,14 +237,16 @@ impl ExecutionPlan for PartSortExec { } else { internal_err!("No children found")? }; - // create a new dynamic filter when with_new_children, as the old filter is bound to the old input and cannot be reused - let new = Self::try_new( - self.expression.clone(), - self.limit, - self.partition_ranges.clone(), - new_input.clone(), - )?; - Ok(Arc::new(new)) + let mut new_exec = self.as_ref().clone(); + new_exec.input = new_input.clone(); + let properties = new_exec.input.properties(); + new_exec.properties = Arc::new(PlanProperties::new( + new_exec.input.equivalence_properties().clone(), + new_exec.input.output_partitioning().clone(), + properties.emission_type, + properties.boundedness, + )); + Ok(Arc::new(new_exec)) } fn execute( diff --git a/tests/cases/distributed/explain/order_by.result b/tests/cases/distributed/explain/order_by.result index 362849afea..6ce8b4e170 100644 --- a/tests/cases/distributed/explain/order_by.result +++ b/tests/cases/distributed/explain/order_by.result @@ -126,8 +126,7 @@ EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5; |_|_|_| | 1_| 0_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts] REDACTED |_|_|_SortPreservingMergeExec: [test_pk.t__temp__0@2 DESC], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@1 DESC], preserve_partitioning=[true], filter=[t@1 IS NULL OR t@1 > 2] REDACTED |_|_|_ProjectionExec: expr=[i@0 as i, t@1 as t, t@1 as test_pk.t__temp__0] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| @@ -150,8 +149,7 @@ EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMI |_|_|_| | 1_| 0_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts] REDACTED |_|_|_SortPreservingMergeExec: [t@1 DESC], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@1 DESC], preserve_partitioning=[true], filter=[t@1 IS NULL OR t@1 > 2] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| |_|_| Total rows: 5_| diff --git a/tests/cases/standalone/common/order/order_by.result b/tests/cases/standalone/common/order/order_by.result index 13ac8caebe..9733778906 100644 --- a/tests/cases/standalone/common/order/order_by.result +++ b/tests/cases/standalone/common/order/order_by.result @@ -299,14 +299,12 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED -|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED fetch=2 REDACTED -|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED limit=2 REDACTED +|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true], filter=[ts@1 IS NULL OR ts@1 > 6000] REDACTED |_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| | 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED -|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=REDACTED fetch=2 REDACTED -|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=REDACTED limit=2 REDACTED +|_|_|_SortExec: TopK(fetch=2), expr=[ts@1 DESC], preserve_partitioning=[true] REDACTED |_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index 4e550bf311..f85aa8c04e 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -70,7 +70,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -103,8 +103,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@1 DESC], preserve_partitioning=[true] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -137,7 +136,7 @@ EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t LIMIT 4; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST], fetch=4 REDACTED -|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=REDACTED fetch=4 REDACTED +|_|_|_SortExec: TopK(fetch=4), expr=[t@1 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_FilterExec: i@0 > 2 REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED |_|_|_| @@ -171,8 +170,7 @@ EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED -|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=4 REDACTED -|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=4 REDACTED +|_|_|_SortExec: TopK(fetch=4), expr=[t@1 DESC], preserve_partitioning=[true] REDACTED |_|_|_FilterExec: i@0 > 2 REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED |_|_|_| @@ -206,8 +204,7 @@ EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED -|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=REDACTED fetch=4 REDACTED -|_|_|_PartSortExec: expr=t@1 DESC num_ranges=REDACTED limit=4 REDACTED +|_|_|_SortExec: TopK(fetch=4), expr=[t@1 DESC], preserve_partitioning=[true] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":2, "mem_ranges":1, "files":1, "file_ranges":1} REDACTED |_|_|_| |_|_| Total rows: 4_| @@ -289,8 +286,7 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -312,8 +308,7 @@ EXPLAIN ANALYZE VERBOSE SELECT * FROM test_pk ORDER BY t LIMIT 5; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":4, "mem_ranges":1, "REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -346,8 +341,7 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@2 DESC], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@2 DESC num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@2 DESC], preserve_partitioning=[true] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -381,8 +375,7 @@ EXPLAIN ANALYZE SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":4, "mem_ranges":1, "files":3, "file_ranges":3} REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -404,8 +397,7 @@ EXPLAIN ANALYZE VERBOSE SELECT * FROM test_pk where pk > 7 ORDER BY t LIMIT 5; |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[t@2 ASC NULLS LAST], preserve_partitioning=[true] REDACTED |_|_|_SeqScan: region=REDACTED, {"partition_count":{"count":4, "mem_ranges":1, "REDACTED |_|_|_| |_|_| Total rows: 5_| diff --git a/tests/cases/standalone/optimizer/order_by.result b/tests/cases/standalone/optimizer/order_by.result index 8bc4c14816..06b06ae442 100644 --- a/tests/cases/standalone/optimizer/order_by.result +++ b/tests/cases/standalone/optimizer/order_by.result @@ -142,8 +142,7 @@ EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY t DESC LIMIT 5; |_|_|_| | 1_| 0_|_ProjectionExec: expr=[i@0 as i, alias_ts@1 as alias_ts] REDACTED |_|_|_SortPreservingMergeExec: [t@2 DESC], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=alias_ts@1 DESC num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=alias_ts@1 DESC num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[alias_ts@1 DESC], preserve_partitioning=[true], filter=[alias_ts@1 IS NULL OR alias_ts@1 > 2] REDACTED |_|_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts, t@1 as t] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_| @@ -165,8 +164,7 @@ EXPLAIN ANALYZE SELECT i, t AS alias_ts FROM test_pk ORDER BY alias_ts DESC LIMI |_|_|_MergeScanExec: REDACTED |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [alias_ts@1 DESC], fetch=5 REDACTED -|_|_|_WindowedSortExec: expr=alias_ts@1 DESC num_ranges=REDACTED fetch=5 REDACTED -|_|_|_PartSortExec: expr=alias_ts@1 DESC num_ranges=REDACTED limit=5 REDACTED +|_|_|_SortExec: TopK(fetch=5), expr=[alias_ts@1 DESC], preserve_partitioning=[true], filter=[alias_ts@1 IS NULL OR alias_ts@1 > 2] REDACTED |_|_|_ProjectionExec: expr=[i@0 as i, t@1 as alias_ts] REDACTED |_|_|_SeqScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0} REDACTED |_|_|_|