From b35eefcf45ef07f8e9f2d6163f1b1a4f4c3ea55a Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 7 Mar 2025 10:45:07 +0800 Subject: [PATCH] perf: rm coalesce batch when target_batch_size > fetch limit (#5658) * fix: rm coalesce > limit * fix: only rm one&test: sqlness --- src/query/src/optimizer/windowed_sort.rs | 30 +++++++++++++++++++ .../standalone/common/order/order_by.result | 2 -- .../common/order/windowed_sort.result | 3 -- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index d67ea6bd94..3ef6296247 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -80,6 +80,9 @@ impl WindowedSortPhysicalRule { let preserve_partitioning = sort_exec.preserve_partitioning(); let sort_input = remove_repartition(sort_exec.input().clone())?.data; + let sort_input = + remove_coalesce_batches_exec(sort_input, sort_exec.fetch())?.data; + // Gets scanner info from the input without repartition before filter. let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else { return Ok(Transformed::no(plan)); @@ -237,6 +240,33 @@ fn remove_repartition( }) } +/// Remove `CoalesceBatchesExec` if the limit is less than the batch size. +/// +/// so that if limit is too small we can avoid need to scan for more rows than necessary +fn remove_coalesce_batches_exec( + plan: Arc, + fetch: Option, +) -> DataFusionResult>> { + let Some(fetch) = fetch else { + return Ok(Transformed::no(plan)); + }; + + // Avoid removing multiple coalesce batches + let mut is_done = false; + + plan.transform_down(|plan| { + if let Some(coalesce_batches_exec) = plan.as_any().downcast_ref::() { + let target_batch_size = coalesce_batches_exec.target_batch_size(); + if fetch < target_batch_size && !is_done { + is_done = true; + return Ok(Transformed::yes(coalesce_batches_exec.input().clone())); + } + } + + Ok(Transformed::no(plan)) + }) +} + /// Resolves alias of the time index column. /// /// i.e if a is time index, alias= {a:b, b:c}, then result should be {a, b}(not {a, c}) because projection is not transitive diff --git a/tests/cases/standalone/common/order/order_by.result b/tests/cases/standalone/common/order/order_by.result index 5535d40d6d..b052f8ffd9 100644 --- a/tests/cases/standalone/common/order/order_by.result +++ b/tests/cases/standalone/common/order/order_by.result @@ -300,14 +300,12 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2; | 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED |_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=1 fetch=2 REDACTED |_|_|_PartSortExec: expr=ts@1 DESC num_ranges=1 limit=2 REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| | 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED |_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=1 fetch=2 REDACTED |_|_|_PartSortExec: expr=ts@1 DESC num_ranges=1 limit=2 REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED |_|_|_| diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index 10613d2f41..38fa690c4c 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -132,7 +132,6 @@ EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t LIMIT 4; |_|_|_| | 1_| 0_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST], fetch=4 REDACTED |_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=4 fetch=4 REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: i@0 > 2 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED |_|_|_| @@ -166,7 +165,6 @@ EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4; | 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED |_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=4 fetch=4 REDACTED |_|_|_PartSortExec: expr=t@1 DESC num_ranges=4 limit=4 REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: i@0 > 2 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED |_|_|_| @@ -200,7 +198,6 @@ EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4; | 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED |_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=4 REDACTED |_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=4 REDACTED -|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: t@1 > 8 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_|