perf: rm coalesce batch when target_batch_size > fetch limit (#5658)

* fix: rm coalesce > limit

* fix: only rm one&test: sqlness
This commit is contained in:
discord9
2025-03-07 10:45:07 +08:00
committed by GitHub
parent 408dd55a2f
commit b35eefcf45
3 changed files with 30 additions and 5 deletions

View File

@@ -80,6 +80,9 @@ impl WindowedSortPhysicalRule {
let preserve_partitioning = sort_exec.preserve_partitioning();
let sort_input = remove_repartition(sort_exec.input().clone())?.data;
let sort_input =
remove_coalesce_batches_exec(sort_input, sort_exec.fetch())?.data;
// Gets scanner info from the input without repartition before filter.
let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else {
return Ok(Transformed::no(plan));
@@ -237,6 +240,33 @@ fn remove_repartition(
})
}
/// Remove `CoalesceBatchesExec` if the limit is less than the batch size.
///
/// so that if limit is too small we can avoid need to scan for more rows than necessary
fn remove_coalesce_batches_exec(
plan: Arc<dyn ExecutionPlan>,
fetch: Option<usize>,
) -> DataFusionResult<Transformed<Arc<dyn ExecutionPlan>>> {
let Some(fetch) = fetch else {
return Ok(Transformed::no(plan));
};
// Avoid removing multiple coalesce batches
let mut is_done = false;
plan.transform_down(|plan| {
if let Some(coalesce_batches_exec) = plan.as_any().downcast_ref::<CoalesceBatchesExec>() {
let target_batch_size = coalesce_batches_exec.target_batch_size();
if fetch < target_batch_size && !is_done {
is_done = true;
return Ok(Transformed::yes(coalesce_batches_exec.input().clone()));
}
}
Ok(Transformed::no(plan))
})
}
/// Resolves alias of the time index column.
///
/// i.e if a is time index, alias= {a:b, b:c}, then result should be {a, b}(not {a, c}) because projection is not transitive

View File

@@ -300,14 +300,12 @@ explain analyze select tag from t where num > 6 order by ts desc limit 2;
| 1_| 0_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=1 fetch=2 REDACTED
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=1 limit=2 REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
| 1_| 1_|_SortPreservingMergeExec: [ts@1 DESC], fetch=2 REDACTED
|_|_|_WindowedSortExec: expr=ts@1 DESC num_ranges=1 fetch=2 REDACTED
|_|_|_PartSortExec: expr=ts@1 DESC num_ranges=1 limit=2 REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: num@2 > 6, projection=[tag@0, ts@1] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|

View File

@@ -132,7 +132,6 @@ EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t LIMIT 4;
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST], fetch=4 REDACTED
|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=4 fetch=4 REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: i@0 > 2 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
@@ -166,7 +165,6 @@ EXPLAIN ANALYZE SELECT * FROM test where i > 2 ORDER BY t DESC LIMIT 4;
| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=4 fetch=4 REDACTED
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=4 limit=4 REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: i@0 > 2 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
@@ -200,7 +198,6 @@ EXPLAIN ANALYZE SELECT * FROM test where t > 8 ORDER BY t DESC LIMIT 4;
| 1_| 0_|_SortPreservingMergeExec: [t@1 DESC], fetch=4 REDACTED
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=4 REDACTED
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=4 REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: t@1 > 8 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_|