From 55e8d3549cb57eb00281532f6ed9d1ade03ac5d3 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 26 Nov 2024 20:17:32 +0800 Subject: [PATCH] feat: assign partition ranges by rows --- src/mito2/src/read/range.rs | 138 ++++++++++++++++---- src/mito2/src/read/scan_region.rs | 54 ++++++++ src/mito2/src/read/scan_util.rs | 1 + src/mito2/src/read/seq_scan.rs | 12 +- src/query/src/optimizer/parallelize_scan.rs | 128 +++++++++++++++--- 5 files changed, 287 insertions(+), 46 deletions(-) diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 1944d171dd..c32c0223ba 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -32,7 +32,7 @@ use crate::sst::parquet::format::parquet_row_group_time_range; use crate::sst::parquet::reader::ReaderMetrics; use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; -const ALL_ROW_GROUPS: i64 = -1; +pub(crate) const ALL_ROW_GROUPS: i64 = -1; /// Index to access a row group. #[derive(Debug, Clone, Copy, PartialEq)] @@ -52,7 +52,7 @@ pub(crate) struct RangeMeta { /// The time range of the range. pub(crate) time_range: FileTimeRange, /// Indices to memtables or files. - indices: SmallVec<[usize; 2]>, + pub(crate) indices: SmallVec<[usize; 2]>, /// Indices to memtable/file row groups that this range scans. pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>, /// Estimated number of rows in the range. This can be 0 if the statistics are not available. @@ -87,7 +87,19 @@ impl RangeMeta { Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges); let ranges = group_ranges_for_seq_scan(ranges); - maybe_split_ranges_for_seq_scan(ranges) + // common_telemetry::info!( + // "DEBUG_SCAN: seq_scan_ranges before split, num_ranges: {}", + // ranges.len(), + // ); + // maybe_split_ranges_for_seq_scan(ranges) + // let ranges = + // maybe_split_ranges_for_seq_scan_groups(ranges, input.memtables.len(), &input.files); + // common_telemetry::info!( + // "DEBUG_SCAN: seq_scan_ranges after split, num_ranges: {}, ranges: {:?}", + // ranges.len(), + // ranges, + // ); + ranges } /// Creates a list of ranges from the `input` for unordered scan. @@ -132,7 +144,18 @@ impl RangeMeta { /// still preserve the order for [SeqScan]. pub(crate) fn can_split_preserve_order(&self) -> bool { // Only one source and multiple row groups. - self.indices.len() == 1 && self.row_group_indices.len() > 1 + if self.indices.len() == 1 { + if self.row_group_indices.len() > 1 { + return true; + } else if self.row_group_indices.len() == 1 + && self.row_group_indices[0].row_group_index == ALL_ROW_GROUPS + { + return true; + } + } + false + // fixme(yingwen): Remove this. + // self.indices.len() == 1 && self.row_group_indices.len() > 1 } /// Splits the range if it can preserve the order. @@ -154,6 +177,28 @@ impl RangeMeta { } } + /// Splits the range if it can preserve the order. + pub(crate) fn maybe_split_groups(self, num_row_groups: u64, output: &mut Vec) { + if self.can_split_preserve_order() && num_row_groups > 0 { + output.reserve(self.row_group_indices.len()); + let num_rows = self.num_rows / num_row_groups as usize; + // Splits by row group. + for row_group_index in 0..num_row_groups { + output.push(RangeMeta { + time_range: self.time_range, + indices: self.indices.clone(), + row_group_indices: smallvec![RowGroupIndex { + index: self.indices[0], + row_group_index: row_group_index as i64, + }], + num_rows, + }); + } + } else { + output.push(self); + } + } + fn push_unordered_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec) { // For append mode, we can parallelize reading memtables. for (memtable_index, memtable) in memtables.iter().enumerate() { @@ -263,31 +308,40 @@ impl RangeMeta { // For non append-only mode, each range only contains one file. for (i, file) in files.iter().enumerate() { let file_index = num_memtables + i; - if file.meta_ref().num_row_groups > 0 { - // All row groups share the same time range. - let row_group_indices = (0..file.meta_ref().num_row_groups) - .map(|row_group_index| RowGroupIndex { - index: file_index, - row_group_index: row_group_index as i64, - }) - .collect(); - ranges.push(RangeMeta { - time_range: file.time_range(), - indices: smallvec![file_index], - row_group_indices, - num_rows: file.meta_ref().num_rows as usize, - }); - } else { - ranges.push(RangeMeta { - time_range: file.time_range(), - indices: smallvec![file_index], - row_group_indices: smallvec![RowGroupIndex { - index: file_index, - row_group_index: ALL_ROW_GROUPS, - }], - num_rows: file.meta_ref().num_rows as usize, - }); - } + ranges.push(RangeMeta { + time_range: file.time_range(), + indices: smallvec![file_index], + row_group_indices: smallvec![RowGroupIndex { + index: file_index, + row_group_index: ALL_ROW_GROUPS, + }], + num_rows: file.meta_ref().num_rows as usize, + }); + // if file.meta_ref().num_row_groups > 0 { + // // All row groups share the same time range. + // let row_group_indices = (0..file.meta_ref().num_row_groups) + // .map(|row_group_index| RowGroupIndex { + // index: file_index, + // row_group_index: row_group_index as i64, + // }) + // .collect(); + // ranges.push(RangeMeta { + // time_range: file.time_range(), + // indices: smallvec![file_index], + // row_group_indices, + // num_rows: file.meta_ref().num_rows as usize, + // }); + // } else { + // ranges.push(RangeMeta { + // time_range: file.time_range(), + // indices: smallvec![file_index], + // row_group_indices: smallvec![RowGroupIndex { + // index: file_index, + // row_group_index: ALL_ROW_GROUPS, + // }], + // num_rows: file.meta_ref().num_rows as usize, + // }); + // } } } } @@ -341,6 +395,27 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec) -> Vec { new_ranges } +/// Splits the range into multiple smaller ranges. +/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()]. +fn maybe_split_ranges_for_seq_scan_groups( + ranges: Vec, + num_memtables: usize, + files: &[FileHandle], +) -> Vec { + let mut new_ranges = Vec::with_capacity(ranges.len()); + for range in ranges { + if range.indices[0] < num_memtables { + range.maybe_split(&mut new_ranges); + } else { + let index = range.indices[0] - num_memtables; + let file = &files[index]; + range.maybe_split_groups(file.meta_ref().num_row_groups, &mut new_ranges); + } + } + + new_ranges +} + /// Builder to create file ranges. #[derive(Default)] pub(crate) struct FileRangeBuilder { @@ -457,6 +532,11 @@ impl RangeBuilderList { Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges), None => { let builder = input.prune_file(file_index, reader_metrics).await?; + // common_telemetry::info!( + // "DEBUG_SCAN: build file ranges, file_index: {}, build_cost: {:?}", + // file_index, + // reader_metrics.build_cost + // ); builder.build_ranges(index.row_group_index, &mut ranges); self.set_file_builder(file_index, Arc::new(builder)); } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 7da80806f2..3a42ec42c9 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -635,6 +635,55 @@ impl ScanInput { Ok(sources) } + /// Scans sources in parallel. + /// + /// # Panics if the input doesn't allow parallel scan. + pub(crate) fn create_parallel_sources_no_semaphore( + &self, + sources: Vec, + ) -> Result> { + // Spawn a task for each source. + let sources = sources + .into_iter() + .map(|source| { + let (sender, receiver) = mpsc::channel(self.parallelism.channel_size); + self.spawn_scan_task_no_semaphore(source, sender); + let stream = Box::pin(ReceiverStream::new(receiver)); + Source::Stream(stream) + }) + .collect(); + Ok(sources) + } + + /// Scans the input source in another task and sends batches to the sender. + pub(crate) fn spawn_scan_task_no_semaphore( + &self, + mut input: Source, + sender: mpsc::Sender>, + ) { + common_runtime::spawn_global(async move { + loop { + // We release the permit before sending result to avoid the task waiting on + // the channel with the permit held. + let maybe_batch = { + // // Safety: We never close the semaphore. + // let _permit = semaphore.acquire().await.unwrap(); + input.next_batch().await + }; + match maybe_batch { + Ok(Some(batch)) => { + let _ = sender.send(Ok(batch)).await; + } + Ok(None) => break, + Err(e) => { + let _ = sender.send(Err(e)).await; + break; + } + } + } + }); + } + /// Prunes a memtable to scan and returns the builder to build readers. pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder { let memtable = &self.memtables[mem_index]; @@ -764,6 +813,11 @@ impl StreamContext { pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::seq_scan_ranges(&input); + // common_telemetry::info!( + // "DEBUG_SCAN: seq_scan_ctx, num_ranges: {}, ranges: {:?}", + // ranges.len(), + // ranges + // ); READ_SST_COUNT.observe(input.num_files() as f64); Self { diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index df790d191a..79c2ad4478 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -172,6 +172,7 @@ pub(crate) fn scan_file_ranges( let build_reader_start = Instant::now(); let reader = range.reader(stream_ctx.input.series_row_selector).await?; let build_cost = build_reader_start.elapsed(); + // common_telemetry::info!("DEBUG_SCAN: scan file ranges, build reader, file_id: {}, index: {:?}, build_cost: {:?}", range.file_handle().file_id(), index, build_cost); part_metrics.inc_build_reader_cost(build_cost); let compat_batch = range.compat_batch(); let mut source = Source::PruneReader(reader); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 9498078ddb..d9407ec247 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -36,7 +36,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; use crate::read::last_row::LastRowReader; use crate::read::merge::MergeReaderBuilder; -use crate::read::range::RangeBuilderList; +use crate::read::range::{RangeBuilderList, RowGroupIndex, ALL_ROW_GROUPS}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics}; use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source}; @@ -160,7 +160,7 @@ impl SeqScan { // by the semaphore. sources = stream_ctx .input - .create_parallel_sources(sources, semaphore.clone())?; + .create_parallel_sources_no_semaphore(sources)?; } let mut builder = MergeReaderBuilder::from_sources(sources); @@ -230,6 +230,7 @@ impl SeqScan { stream_ctx.input.num_files(), )); // Scans each part. + // common_telemetry::info!("DEBUG_SCAN: scan partition range, partition: {}, num_ranges: {}", partition, partition_ranges.len()); for part_range in partition_ranges { let mut sources = Vec::new(); build_sources( @@ -241,6 +242,7 @@ impl SeqScan { &mut sources, ); + // common_telemetry::info!("DEBUG_SCAN: scan part range, partition: {}, num_sources: {}, part_range: {:?}", partition, sources.len(), part_range); let mut reader = Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone()) .await @@ -330,6 +332,12 @@ impl RegionScanner for SeqScan { ranges: Vec>, distinguish_partition_range: bool, ) -> Result<(), BoxedError> { + let num_partitions = ranges.len(); + let available_permits = self.semaphore.available_permits(); + if available_permits < num_partitions { + self.semaphore + .add_permits(num_partitions - available_permits); + } self.properties.partitions = ranges; self.properties.distinguish_partition_range = distinguish_partition_range; Ok(()) diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index 02cd04df87..395c66abf0 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BinaryHeap; use std::sync::Arc; use common_telemetry::debug; @@ -114,16 +115,48 @@ impl ParallelizeScan { /// if the number of ranges is smaller than `expected_partition_num`. But this will /// return at least one partition. fn assign_partition_range( - ranges: Vec, + mut ranges: Vec, expected_partition_num: usize, ) -> Vec> { let actual_partition_num = expected_partition_num.min(ranges.len()).max(1); let mut partition_ranges = vec![vec![]; actual_partition_num]; - // round-robin assignment - for (i, range) in ranges.into_iter().enumerate() { - let partition_idx = i % expected_partition_num; + // Sort ranges by number of rows in descending order. + ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows)); + + #[derive(Eq, PartialEq)] + struct HeapNode { + num_rows: usize, + partition_idx: usize, + } + + impl Ord for HeapNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Reverse for min-heap. + self.num_rows.cmp(&other.num_rows).reverse() + } + } + + impl PartialOrd for HeapNode { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + let mut part_heap = + BinaryHeap::from_iter((0..actual_partition_num).map(|partition_idx| HeapNode { + num_rows: 0, + partition_idx, + })); + + // Assigns the range to the partition with the smallest number of rows. + for range in ranges { + // Safety: actual_partition_num always > 0. + let mut node = part_heap.pop().unwrap(); + let partition_idx = node.partition_idx; + node.num_rows += range.num_rows; partition_ranges[partition_idx].push(range); + part_heap.push(node); } partition_ranges @@ -172,18 +205,18 @@ mod test { ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num); let expected = vec![ vec![ + PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + num_rows: 250, + identifier: 4, + }, PartitionRange { start: Timestamp::new(0, TimeUnit::Second), end: Timestamp::new(10, TimeUnit::Second), num_rows: 100, identifier: 1, }, - PartitionRange { - start: Timestamp::new(20, TimeUnit::Second), - end: Timestamp::new(30, TimeUnit::Second), - num_rows: 150, - identifier: 3, - }, ], vec![ PartitionRange { @@ -193,10 +226,10 @@ mod test { identifier: 2, }, PartitionRange { - start: Timestamp::new(30, TimeUnit::Second), - end: Timestamp::new(40, TimeUnit::Second), - num_rows: 250, - identifier: 4, + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + num_rows: 150, + identifier: 3, }, ], ]; @@ -204,7 +237,8 @@ mod test { // assign 4 ranges to 5 partitions. Only 4 partitions are returned. let expected_partition_num = 5; - let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num); + let mut result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num); + result.sort_by_key(|ranges| ranges[0].identifier); let expected = vec![ vec![PartitionRange { start: Timestamp::new(0, TimeUnit::Second), @@ -237,4 +271,68 @@ mod test { let result = ParallelizeScan::assign_partition_range(vec![], 5); assert_eq!(result.len(), 1); } + + #[test] + fn test_assign_unbalance_partition_range() { + let ranges = vec![ + PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + num_rows: 100, + identifier: 1, + }, + PartitionRange { + start: Timestamp::new(10, TimeUnit::Second), + end: Timestamp::new(20, TimeUnit::Second), + num_rows: 200, + identifier: 2, + }, + PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + num_rows: 150, + identifier: 3, + }, + PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + num_rows: 2500, + identifier: 4, + }, + ]; + + // assign to 2 partitions + let expected_partition_num = 2; + let result = + ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num); + let expected = vec![ + vec![PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + num_rows: 2500, + identifier: 4, + }], + vec![ + PartitionRange { + start: Timestamp::new(10, TimeUnit::Second), + end: Timestamp::new(20, TimeUnit::Second), + num_rows: 200, + identifier: 2, + }, + PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + num_rows: 150, + identifier: 3, + }, + PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + num_rows: 100, + identifier: 1, + }, + ], + ]; + assert_eq!(result, expected); + } }