feat: balance partition rows

This commit is contained in:
evenyag
2024-12-03 15:04:43 +08:00
parent 55e8d3549c
commit b74fb7be49

View File

@@ -108,6 +108,7 @@ impl ParallelizeScan {
Ok(result)
}
// TODO(yingwen): Update comment.
/// Distribute [`PartitionRange`]s to each partition.
///
/// Currently we use a simple round-robin strategy to assign ranges to partitions.
@@ -118,11 +119,24 @@ impl ParallelizeScan {
mut ranges: Vec<PartitionRange>,
expected_partition_num: usize,
) -> Vec<Vec<PartitionRange>> {
let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
let mut partition_ranges = vec![vec![]; actual_partition_num];
if ranges.is_empty() {
// Returns a single partition with no range.
return vec![vec![]];
}
// Sort ranges by number of rows in descending order.
ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
// Get the max row number of the ranges. Note that the number of rows may be 0 if statistics are not available.
let max_rows = ranges[0].num_rows;
let total_rows = ranges.iter().map(|range| range.num_rows).sum::<usize>();
// Computes the partition num by the max row number. This eliminates the unbalance of the partitions.
let balanced_partition_num = if max_rows > 0 {
total_rows / max_rows
} else {
ranges.len()
};
let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
let mut partition_ranges = vec![vec![]; actual_partition_num];
#[derive(Eq, PartialEq)]
struct HeapNode {