From b74fb7be4900c8a4f7fd8f25bd29ffa0b527a105 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 3 Dec 2024 15:04:43 +0800 Subject: [PATCH] feat: balance partition rows --- src/query/src/optimizer/parallelize_scan.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index 395c66abf0..46b016982f 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -108,6 +108,7 @@ impl ParallelizeScan { Ok(result) } + // TODO(yingwen): Update comment. /// Distribute [`PartitionRange`]s to each partition. /// /// Currently we use a simple round-robin strategy to assign ranges to partitions. @@ -118,11 +119,24 @@ impl ParallelizeScan { mut ranges: Vec, expected_partition_num: usize, ) -> Vec> { - let actual_partition_num = expected_partition_num.min(ranges.len()).max(1); - let mut partition_ranges = vec![vec![]; actual_partition_num]; + if ranges.is_empty() { + // Returns a single partition with no range. + return vec![vec![]]; + } // Sort ranges by number of rows in descending order. ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows)); + // Get the max row number of the ranges. Note that the number of rows may be 0 if statistics are not available. + let max_rows = ranges[0].num_rows; + let total_rows = ranges.iter().map(|range| range.num_rows).sum::(); + // Computes the partition num by the max row number. This eliminates the unbalance of the partitions. + let balanced_partition_num = if max_rows > 0 { + total_rows / max_rows + } else { + ranges.len() + }; + let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1); + let mut partition_ranges = vec![vec![]; actual_partition_num]; #[derive(Eq, PartialEq)] struct HeapNode {