From adc374c9a3ff35d02d7312f4bc73aff6aeaf5d2a Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 3 Dec 2024 18:57:04 +0800 Subject: [PATCH] feat: only split in non-compaction seq scan --- src/mito2/src/compaction.rs | 5 +- src/mito2/src/read/range.rs | 193 +++++++++----------- src/mito2/src/read/scan_region.rs | 66 +------ src/mito2/src/read/seq_scan.rs | 45 ++--- src/query/src/optimizer/parallelize_scan.rs | 37 ++-- 5 files changed, 122 insertions(+), 224 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 44aa03a67d..873b1dced1 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -589,10 +589,7 @@ impl<'a> CompactionSstReaderBuilder<'a> { scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?); } - SeqScan::new(scan_input) - .with_compaction() - .build_reader() - .await + SeqScan::new(scan_input, true).build_reader().await } } diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index c32c0223ba..3a4f888b59 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -32,7 +32,7 @@ use crate::sst::parquet::format::parquet_row_group_time_range; use crate::sst::parquet::reader::ReaderMetrics; use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; -pub(crate) const ALL_ROW_GROUPS: i64 = -1; +const ALL_ROW_GROUPS: i64 = -1; /// Index to access a row group. #[derive(Debug, Clone, Copy, PartialEq)] @@ -81,25 +81,18 @@ impl RangeMeta { } /// Creates a list of ranges from the `input` for seq scan. - pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec { + /// If `compaction` is true, it doesn't split the ranges. 
+ pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec { let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len()); Self::push_seq_mem_ranges(&input.memtables, &mut ranges); Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges); let ranges = group_ranges_for_seq_scan(ranges); - // common_telemetry::info!( - // "DEBUG_SCAN: seq_scan_ranges before split, num_ranges: {}", - // ranges.len(), - // ); - // maybe_split_ranges_for_seq_scan(ranges) - // let ranges = - // maybe_split_ranges_for_seq_scan_groups(ranges, input.memtables.len(), &input.files); - // common_telemetry::info!( - // "DEBUG_SCAN: seq_scan_ranges after split, num_ranges: {}, ranges: {:?}", - // ranges.len(), - // ranges, - // ); - ranges + if compaction { + // We don't split ranges in compaction. + return ranges; + } + maybe_split_ranges_for_seq_scan(ranges, input.memtables.len(), &input.files) } /// Creates a list of ranges from the `input` for unordered scan. @@ -117,13 +110,13 @@ impl RangeMeta { } /// Returns true if the time range of given `meta` overlaps with the time range of this meta. - pub(crate) fn overlaps(&self, meta: &RangeMeta) -> bool { + fn overlaps(&self, meta: &RangeMeta) -> bool { overlaps(&self.time_range, &meta.time_range) } /// Merges given `meta` to this meta. /// It assumes that the time ranges overlap and they don't have the same file or memtable index. - pub(crate) fn merge(&mut self, mut other: RangeMeta) { + fn merge(&mut self, mut other: RangeMeta) { debug_assert!(self.overlaps(&other)); debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx))); debug_assert!(self @@ -142,44 +135,16 @@ impl RangeMeta { /// Returns true if we can split the range into multiple smaller ranges and /// still preserve the order for [SeqScan]. - pub(crate) fn can_split_preserve_order(&self) -> bool { - // Only one source and multiple row groups. 
- if self.indices.len() == 1 { - if self.row_group_indices.len() > 1 { - return true; - } else if self.row_group_indices.len() == 1 - && self.row_group_indices[0].row_group_index == ALL_ROW_GROUPS - { - return true; - } - } - false - // fixme(yingwen): Remove this. - // self.indices.len() == 1 && self.row_group_indices.len() > 1 + fn can_split_preserve_order(&self) -> bool { + self.indices.len() == 1 } /// Splits the range if it can preserve the order. - pub(crate) fn maybe_split(self, output: &mut Vec) { - if self.can_split_preserve_order() { - output.reserve(self.row_group_indices.len()); - let num_rows = self.num_rows / self.row_group_indices.len(); - // Splits by row group. - for index in self.row_group_indices { - output.push(RangeMeta { - time_range: self.time_range, - indices: self.indices.clone(), - row_group_indices: smallvec![index], - num_rows, - }); - } - } else { - output.push(self); - } - } - - /// Splits the range if it can preserve the order. - pub(crate) fn maybe_split_groups(self, num_row_groups: u64, output: &mut Vec) { - if self.can_split_preserve_order() && num_row_groups > 0 { + /// Only a range with exactly one file or memtable is split. `num_row_groups` should be + /// the number of row groups in the file or memtable. + fn maybe_split(self, num_row_groups: u64, output: &mut Vec) { + // Merged ranges with several sources also reach here, so this is checked, not asserted. + if self.can_split_preserve_order() && num_row_groups > 1 { output.reserve(self.row_group_indices.len()); let num_rows = self.num_rows / num_row_groups as usize; // Splits by row group. @@ -317,31 +282,6 @@ impl RangeMeta { }], num_rows: file.meta_ref().num_rows as usize, }); - // if file.meta_ref().num_row_groups > 0 { - // // All row groups share the same time range. 
- // let row_group_indices = (0..file.meta_ref().num_row_groups) - // .map(|row_group_index| RowGroupIndex { - // index: file_index, - // row_group_index: row_group_index as i64, - // }) - // .collect(); - // ranges.push(RangeMeta { - // time_range: file.time_range(), - // indices: smallvec![file_index], - // row_group_indices, - // num_rows: file.meta_ref().num_rows as usize, - // }); - // } else { - // ranges.push(RangeMeta { - // time_range: file.time_range(), - // indices: smallvec![file_index], - // row_group_indices: smallvec![RowGroupIndex { - // index: file_index, - // row_group_index: ALL_ROW_GROUPS, - // }], - // num_rows: file.meta_ref().num_rows as usize, - // }); - // } } } } @@ -386,18 +326,7 @@ fn group_ranges_for_seq_scan(mut ranges: Vec) -> Vec { /// Splits the range into multiple smaller ranges. /// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()]. -fn maybe_split_ranges_for_seq_scan(ranges: Vec) -> Vec { - let mut new_ranges = Vec::with_capacity(ranges.len()); - for range in ranges { - range.maybe_split(&mut new_ranges); - } - - new_ranges -} - -/// Splits the range into multiple smaller ranges. -/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()]. -fn maybe_split_ranges_for_seq_scan_groups( +fn maybe_split_ranges_for_seq_scan( ranges: Vec, num_memtables: usize, files: &[FileHandle], @@ -405,11 +334,13 @@ fn maybe_split_ranges_for_seq_scan_groups( let mut new_ranges = Vec::with_capacity(ranges.len()); for range in ranges { if range.indices[0] < num_memtables { - range.maybe_split(&mut new_ranges); + // We don't split memtables. + new_ranges.push(range); } else { + // Get meta for this file. 
let index = range.indices[0] - num_memtables; let file = &files[index]; - range.maybe_split_groups(file.meta_ref().num_row_groups, &mut new_ranges); + range.maybe_split(file.meta_ref().num_row_groups, &mut new_ranges); } } @@ -532,11 +463,6 @@ impl RangeBuilderList { Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges), None => { let builder = input.prune_file(file_index, reader_metrics).await?; - // common_telemetry::info!( - // "DEBUG_SCAN: build file ranges, file_index: {}, build_cost: {:?}", - // file_index, - // reader_metrics.build_cost - // ); builder.build_ranges(index.row_group_index, &mut ranges); self.set_file_builder(file_index, Arc::new(builder)); } @@ -581,6 +507,8 @@ mod tests { use common_time::Timestamp; use super::*; + use crate::sst::file::{FileId, FileMeta}; + use crate::test_util::new_noop_file_purger; type Output = (Vec, i64, i64); @@ -736,7 +664,7 @@ mod tests { assert!(range.can_split_preserve_order()); let mut output = Vec::new(); - range.maybe_split(&mut output); + range.maybe_split(2, &mut output); assert_eq!( output, @@ -783,26 +711,60 @@ mod tests { assert!(!range.can_split_preserve_order()); let mut output = Vec::new(); - range.maybe_split(&mut output); + range.maybe_split(2, &mut output); assert_eq!(1, output.len()); } + fn new_file_handle( + file_id: FileId, + start_ts: i64, + end_ts: i64, + num_row_groups: u64, + ) -> FileHandle { + let file_purger = new_noop_file_purger(); + FileHandle::new( + FileMeta { + region_id: 0.into(), + file_id, + time_range: ( + Timestamp::new_second(start_ts), + Timestamp::new_second(end_ts), + ), + level: 0, + file_size: 0, + available_indexes: Default::default(), + index_file_size: 0, + num_rows: 0, + num_row_groups, + }, + file_purger, + ) + } + #[test] fn test_maybe_split_ranges() { + let files = vec![ + new_file_handle(FileId::random(), 1000, 2000, 2), + new_file_handle(FileId::random(), 3000, 4000, 1), + new_file_handle(FileId::random(), 3000, 4000, 1), + ]; let ranges = vec![ 
+ RangeMeta { + time_range: (Timestamp::new_second(0), Timestamp::new_second(500)), + indices: smallvec![0], + row_group_indices: smallvec![RowGroupIndex { + index: 0, + row_group_index: 0, + },], + num_rows: 4, + }, RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), indices: smallvec![1], - row_group_indices: smallvec![ - RowGroupIndex { - index: 1, - row_group_index: 0 - }, - RowGroupIndex { - index: 1, - row_group_index: 1 - } - ], + row_group_indices: smallvec![RowGroupIndex { + index: 1, + row_group_index: ALL_ROW_GROUPS, + },], num_rows: 4, }, RangeMeta { @@ -811,20 +773,29 @@ mod tests { row_group_indices: smallvec![ RowGroupIndex { index: 2, - row_group_index: 0 + row_group_index: ALL_ROW_GROUPS, }, RowGroupIndex { index: 3, - row_group_index: 0 + row_group_index: ALL_ROW_GROUPS, } ], num_rows: 5, }, ]; - let output = maybe_split_ranges_for_seq_scan(ranges); + let output = maybe_split_ranges_for_seq_scan(ranges, 1, &files); assert_eq!( output, vec![ + RangeMeta { + time_range: (Timestamp::new_second(0), Timestamp::new_second(500)), + indices: smallvec![0], + row_group_indices: smallvec![RowGroupIndex { + index: 0, + row_group_index: 0 + },], + num_rows: 4, + }, RangeMeta { time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)), indices: smallvec![1], diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 3a42ec42c9..2624f29547 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -236,7 +236,7 @@ impl ScanRegion { /// Scan sequentially. pub(crate) fn seq_scan(self) -> Result { let input = self.scan_input(true)?; - Ok(SeqScan::new(input)) + Ok(SeqScan::new(input, false)) } /// Unordered scan. @@ -248,7 +248,7 @@ impl ScanRegion { #[cfg(test)] pub(crate) fn scan_without_filter_deleted(self) -> Result { let input = self.scan_input(false)?; - Ok(SeqScan::new(input)) + Ok(SeqScan::new(input, false)) } /// Creates a scan input. 
@@ -621,6 +621,10 @@ impl ScanInput { sources: Vec, semaphore: Arc, ) -> Result> { + if sources.len() <= 1 { + return Ok(sources); + } + debug_assert!(self.parallelism.parallelism > 1); // Spawn a task for each source. let sources = sources @@ -635,55 +639,6 @@ impl ScanInput { Ok(sources) } - /// Scans sources in parallel. - /// - /// # Panics if the input doesn't allow parallel scan. - pub(crate) fn create_parallel_sources_no_semaphore( - &self, - sources: Vec, - ) -> Result> { - // Spawn a task for each source. - let sources = sources - .into_iter() - .map(|source| { - let (sender, receiver) = mpsc::channel(self.parallelism.channel_size); - self.spawn_scan_task_no_semaphore(source, sender); - let stream = Box::pin(ReceiverStream::new(receiver)); - Source::Stream(stream) - }) - .collect(); - Ok(sources) - } - - /// Scans the input source in another task and sends batches to the sender. - pub(crate) fn spawn_scan_task_no_semaphore( - &self, - mut input: Source, - sender: mpsc::Sender>, - ) { - common_runtime::spawn_global(async move { - loop { - // We release the permit before sending result to avoid the task waiting on - // the channel with the permit held. - let maybe_batch = { - // // Safety: We never close the semaphore. - // let _permit = semaphore.acquire().await.unwrap(); - input.next_batch().await - }; - match maybe_batch { - Ok(Some(batch)) => { - let _ = sender.send(Ok(batch)).await; - } - Ok(None) => break, - Err(e) => { - let _ = sender.send(Err(e)).await; - break; - } - } - } - }); - } - /// Prunes a memtable to scan and returns the builder to build readers. pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder { let memtable = &self.memtables[mem_index]; @@ -810,14 +765,9 @@ pub(crate) struct StreamContext { impl StreamContext { /// Creates a new [StreamContext] for [SeqScan]. 
- pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self { + pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self { let query_start = input.query_start.unwrap_or_else(Instant::now); - let ranges = RangeMeta::seq_scan_ranges(&input); - // common_telemetry::info!( - // "DEBUG_SCAN: seq_scan_ctx, num_ranges: {}, ranges: {:?}", - // ranges.len(), - // ranges - // ); + let ranges = RangeMeta::seq_scan_ranges(&input, compaction); READ_SST_COUNT.observe(input.num_files() as f64); Self { diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index d9407ec247..6c12be2704 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -36,7 +36,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; use crate::read::last_row::LastRowReader; use crate::read::merge::MergeReaderBuilder; -use crate::read::range::{RangeBuilderList, RowGroupIndex, ALL_ROW_GROUPS}; +use crate::read::range::RangeBuilderList; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics}; use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source}; @@ -51,39 +51,27 @@ pub struct SeqScan { properties: ScannerProperties, /// Context of streams. stream_ctx: Arc, - /// Semaphore to control scan parallelism of files. - /// Streams created by the scanner share the same semaphore. - semaphore: Arc, /// The scanner is used for compaction. compaction: bool, } impl SeqScan { - /// Creates a new [SeqScan]. - pub(crate) fn new(input: ScanInput) -> Self { - // TODO(yingwen): Set permits according to partition num. But we need to support file - // level parallelism. - let parallelism = input.parallelism.parallelism.max(1); + /// Creates a new [SeqScan] with the given input and compaction flag. + /// If `compaction` is true, the scanner will not attempt to split ranges. 
+ pub(crate) fn new(input: ScanInput, compaction: bool) -> Self { let mut properties = ScannerProperties::default() .with_append_mode(input.append_mode) .with_total_rows(input.total_rows()); - let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input)); + let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction)); properties.partitions = vec![stream_ctx.partition_ranges()]; Self { properties, stream_ctx, - semaphore: Arc::new(Semaphore::new(parallelism)), - compaction: false, + compaction, } } - /// Sets the scanner to be used for compaction. - pub(crate) fn with_compaction(mut self) -> Self { - self.compaction = true; - self - } - /// Builds a stream for the query. /// /// The returned stream is not partitioned and will contains all the data. If want @@ -115,7 +103,6 @@ impl SeqScan { let reader = Self::build_all_merge_reader( &self.stream_ctx, partition_ranges, - self.semaphore.clone(), self.compaction, &part_metrics, ) @@ -127,7 +114,6 @@ impl SeqScan { async fn build_all_merge_reader( stream_ctx: &Arc, partition_ranges: &[PartitionRange], - semaphore: Arc, compaction: bool, part_metrics: &PartitionMetrics, ) -> Result { @@ -146,21 +132,21 @@ impl SeqScan { &mut sources, ); } - Self::build_reader_from_sources(stream_ctx, sources, semaphore).await + Self::build_reader_from_sources(stream_ctx, sources, None).await } #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn build_reader_from_sources( stream_ctx: &StreamContext, mut sources: Vec, - semaphore: Arc, + semaphore: Option>, ) -> Result { - if stream_ctx.input.parallelism.parallelism > 1 { + if let Some(semaphore) = semaphore.as_ref() { // Read sources in parallel. We always spawn a task so we can control the parallelism // by the semaphore. 
sources = stream_ctx .input - .create_parallel_sources_no_semaphore(sources)?; + .create_parallel_sources(sources, semaphore.clone())?; } let mut builder = MergeReaderBuilder::from_sources(sources); @@ -207,7 +193,8 @@ impl SeqScan { } let stream_ctx = self.stream_ctx.clone(); - let semaphore = self.semaphore.clone(); + // FIXME(yingwen): Get target partition from prepare. + let semaphore = Arc::new(Semaphore::new(self.properties.partitions.len())); let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.compaction; let distinguish_range = self.properties.distinguish_partition_range(); @@ -230,7 +217,6 @@ impl SeqScan { stream_ctx.input.num_files(), )); // Scans each part. - // common_telemetry::info!("DEBUG_SCAN: scan partition range, partition: {}, num_ranges: {}", partition, partition_ranges.len()); for part_range in partition_ranges { let mut sources = Vec::new(); build_sources( @@ -242,9 +228,8 @@ impl SeqScan { &mut sources, ); - // common_telemetry::info!("DEBUG_SCAN: scan part range, partition: {}, num_sources: {}, part_range: {:?}", partition, sources.len(), part_range); let mut reader = - Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone()) + Self::build_reader_from_sources(&stream_ctx, sources, Some(semaphore.clone())) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -332,12 +317,6 @@ impl RegionScanner for SeqScan { ranges: Vec>, distinguish_partition_range: bool, ) -> Result<(), BoxedError> { - let num_partitions = ranges.len(); - let available_permits = self.semaphore.available_permits(); - if available_permits < num_partitions { - self.semaphore - .add_permits(num_partitions - available_permits); - } self.properties.partitions = ranges; self.properties.distinguish_partition_range = distinguish_partition_range; Ok(()) diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index ec5d813eac..0151957424 100644 --- 
a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -251,14 +251,13 @@ mod test { // assign 4 ranges to 5 partitions. Only 4 partitions are returned. let expected_partition_num = 5; - let mut result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num); - result.sort_by_key(|ranges| ranges[0].identifier); + let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num); let expected = vec![ vec![PartitionRange { - start: Timestamp::new(0, TimeUnit::Second), - end: Timestamp::new(10, TimeUnit::Second), - num_rows: 100, - identifier: 1, + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + num_rows: 250, + identifier: 4, }], vec![PartitionRange { start: Timestamp::new(10, TimeUnit::Second), @@ -266,18 +265,20 @@ mod test { num_rows: 200, identifier: 2, }], - vec![PartitionRange { - start: Timestamp::new(20, TimeUnit::Second), - end: Timestamp::new(30, TimeUnit::Second), - num_rows: 150, - identifier: 3, - }], - vec![PartitionRange { - start: Timestamp::new(30, TimeUnit::Second), - end: Timestamp::new(40, TimeUnit::Second), - num_rows: 250, - identifier: 4, - }], + vec![ + PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + num_rows: 150, + identifier: 3, + }, + PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + num_rows: 100, + identifier: 1, + }, + ], ]; assert_eq!(result, expected);