From a00a766869587784736f5b6615334594b8f2ce14 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 3 Dec 2024 21:33:02 +0800 Subject: [PATCH] feat: scanner prepare by request --- src/mito2/src/read/scan_region.rs | 1 - src/mito2/src/read/seq_scan.rs | 42 +++++----- src/mito2/src/read/unordered_scan.rs | 13 +--- src/query/src/optimizer/parallelize_scan.rs | 2 +- src/store-api/src/region_engine.rs | 85 +++++++++++++++------ src/table/src/table/scan.rs | 15 ++-- 6 files changed, 100 insertions(+), 58 deletions(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 025a4333bb..f066796eaa 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -625,7 +625,6 @@ impl ScanInput { return Ok(sources); } - debug_assert!(self.parallelism.parallelism > 1); // Spawn a task for each source. let sources = sources .into_iter() diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 28fff84fdc..4af5f591d2 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -28,7 +28,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; +use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties}; use store_api::storage::TimeSeriesRowSelector; use tokio::sync::Semaphore; @@ -135,7 +135,8 @@ impl SeqScan { Self::build_reader_from_sources(stream_ctx, sources, None).await } - // TODO(yingwen): Only merge and dedup when num source > 1. + /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel + /// if possible. 
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn build_reader_from_sources( stream_ctx: &StreamContext, @@ -143,11 +144,12 @@ impl SeqScan { semaphore: Option>, ) -> Result { if let Some(semaphore) = semaphore.as_ref() { - // Read sources in parallel. We always spawn a task so we can control the parallelism - // by the semaphore. - sources = stream_ctx - .input - .create_parallel_sources(sources, semaphore.clone())?; + // Read sources in parallel. + if sources.len() > 1 { + sources = stream_ctx + .input + .create_parallel_sources(sources, semaphore.clone())?; + } } let mut builder = MergeReaderBuilder::from_sources(sources); @@ -194,11 +196,20 @@ impl SeqScan { } let stream_ctx = self.stream_ctx.clone(); - // FIXME(yingwen): 1. Get target partition from prepare. 2. Get parallelism from prepare. - let semaphore = Arc::new(Semaphore::new(self.properties.partitions.len())); + let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() { + // We can use additional tasks to read the data. This semaphore is partition level. + // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency + // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of + // files in a part range. 
+ Some(Arc::new(Semaphore::new( + self.properties.target_partitions() - self.properties.num_partitions() + 1, + ))) + } else { + Some(Arc::new(Semaphore::new(1))) + }; let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.compaction; - let distinguish_range = self.properties.distinguish_partition_range(); + let distinguish_range = self.properties.distinguish_partition_range; let part_metrics = PartitionMetrics::new( self.stream_ctx.input.mapper.metadata().region_id, partition, @@ -230,7 +241,7 @@ impl SeqScan { ); let mut reader = - Self::build_reader_from_sources(&stream_ctx, sources, Some(semaphore.clone())) + Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone()) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -313,13 +324,8 @@ impl RegionScanner for SeqScan { self.scan_partition_impl(partition) } - fn prepare( - &mut self, - ranges: Vec>, - distinguish_partition_range: bool, - ) -> Result<(), BoxedError> { - self.properties.partitions = ranges; - self.properties.distinguish_partition_range = distinguish_partition_range; + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + self.properties.prepare(request); Ok(()) } diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index c1ee34b08e..97db9b8659 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -27,7 +27,7 @@ use datatypes::schema::SchemaRef; use futures::{Stream, StreamExt}; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; +use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties}; use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::range::RangeBuilderList; @@ -144,7 +144,7 @@ impl UnorderedScan { ); let stream_ctx = self.stream_ctx.clone(); let part_ranges = 
self.properties.partitions[partition].clone(); - let distinguish_range = self.properties.distinguish_partition_range(); + let distinguish_range = self.properties.distinguish_partition_range; let stream = try_stream! { part_metrics.on_first_poll(); @@ -231,13 +231,8 @@ impl RegionScanner for UnorderedScan { self.stream_ctx.input.mapper.output_schema() } - fn prepare( - &mut self, - ranges: Vec>, - distinguish_partition_range: bool, - ) -> Result<(), BoxedError> { - self.properties.partitions = ranges; - self.properties.distinguish_partition_range = distinguish_partition_range; + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + self.properties.prepare(request); Ok(()) } diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index 0151957424..296cd6c46f 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -94,7 +94,7 @@ impl ParallelizeScan { // update the partition ranges let new_exec = region_scan_exec - .with_new_partitions(partition_ranges) + .with_new_partitions(partition_ranges, expected_partition_num) .map_err(|e| DataFusionError::External(e.into_inner()))?; return Ok(Transformed::yes(Arc::new(new_exec))); } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 8dd706395d..c9b0ac53db 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -206,16 +206,13 @@ pub struct ScannerProperties { /// Whether to yield an empty batch to distinguish partition ranges. pub distinguish_partition_range: bool, + + /// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions. + target_partitions: usize, } impl ScannerProperties { - /// Initialize partitions with given parallelism for scanner. 
- pub fn with_parallelism(mut self, parallelism: usize) -> Self { - self.partitions = vec![vec![]; parallelism]; - self - } - - /// Set append mode for scanner. + /// Sets append mode for scanner. pub fn with_append_mode(mut self, append_mode: bool) -> Self { self.append_mode = append_mode; self @@ -234,9 +231,24 @@ impl ScannerProperties { append_mode, total_rows, distinguish_partition_range: false, + target_partitions: 0, } } + /// Updates the properties with the given [PrepareRequest]. + pub fn prepare(&mut self, request: PrepareRequest) { + if let Some(ranges) = request.ranges { + self.partitions = ranges; + } + if let Some(distinguish_partition_range) = request.distinguish_partition_range { + self.distinguish_partition_range = distinguish_partition_range; + } + if let Some(target_partitions) = request.target_partitions { + self.target_partitions = target_partitions; + } + } + + /// Returns the number of actual partitions. pub fn num_partitions(&self) -> usize { self.partitions.len() } @@ -249,8 +261,44 @@ impl ScannerProperties { self.total_rows } - pub fn distinguish_partition_range(&self) -> bool { - self.distinguish_partition_range + /// Returns the target partitions of the scanner. If it is not set, returns the number of partitions. + pub fn target_partitions(&self) -> usize { + if self.target_partitions == 0 { + self.num_partitions() + } else { + self.target_partitions + } + } +} + +/// Request to override the scanner properties. +#[derive(Default)] +pub struct PrepareRequest { + /// Assigned partition ranges. + pub ranges: Option>>, + /// Distinguishes partition range by empty batches. + pub distinguish_partition_range: Option, + /// The expected number of target partitions. + pub target_partitions: Option, +} + +impl PrepareRequest { + /// Sets the ranges. + pub fn with_ranges(mut self, ranges: Vec>) -> Self { + self.ranges = Some(ranges); + self + } + + /// Sets the distinguish partition range flag. 
+ pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self { + self.distinguish_partition_range = Some(distinguish_partition_range); + self + } + + /// Sets the target partitions. + pub fn with_target_partitions(mut self, target_partitions: usize) -> Self { + self.target_partitions = Some(target_partitions); + self } } @@ -271,11 +319,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send { /// Prepares the scanner with the given partition ranges. /// /// This method is for the planner to adjust the scanner's behavior based on the partition ranges. - fn prepare( - &mut self, - ranges: Vec>, - distinguish_partition_range: bool, - ) -> Result<(), BoxedError>; + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>; /// Scans the partition and returns a stream of record batches. /// @@ -431,9 +475,7 @@ impl SinglePartitionScanner { Self { stream: Mutex::new(Some(stream)), schema, - properties: ScannerProperties::default() - .with_parallelism(1) - .with_append_mode(append_mode), + properties: ScannerProperties::default().with_append_mode(append_mode), metadata, } } @@ -454,13 +496,8 @@ impl RegionScanner for SinglePartitionScanner { self.schema.clone() } - fn prepare( - &mut self, - ranges: Vec>, - distinguish_partition_range: bool, - ) -> Result<(), BoxedError> { - self.properties.partitions = ranges; - self.properties.distinguish_partition_range = distinguish_partition_range; + fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { + self.properties.prepare(request); Ok(()) } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 0eac7c0c35..e4b47fa4fb 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -35,7 +35,7 @@ use datafusion_common::{ColumnStatistics, DataFusionError, Statistics}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr}; use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef; use 
futures::{Stream, StreamExt}; -use store_api::region_engine::{PartitionRange, RegionScannerRef}; +use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef}; use crate::table::metrics::StreamMetrics; @@ -112,6 +112,7 @@ impl RegionScanExec { pub fn with_new_partitions( &self, partitions: Vec>, + target_partitions: usize, ) -> Result { if self.is_partition_set { warn!("Setting partition ranges more than once for RegionScanExec"); @@ -123,8 +124,11 @@ impl RegionScanExec { { let mut scanner = self.scanner.lock().unwrap(); - let distinguish_partition_range = scanner.properties().distinguish_partition_range(); - scanner.prepare(partitions, distinguish_partition_range)?; + scanner.prepare( + PrepareRequest::default() + .with_ranges(partitions) + .with_target_partitions(target_partitions), + )?; } Ok(Self { @@ -141,9 +145,10 @@ impl RegionScanExec { pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) { let mut scanner = self.scanner.lock().unwrap(); - let partition_ranges = scanner.properties().partitions.clone(); // set distinguish_partition_range won't fail - let _ = scanner.prepare(partition_ranges, distinguish_partition_range); + let _ = scanner.prepare( + PrepareRequest::default().with_distinguish_partition_range(distinguish_partition_range), + ); } pub fn time_index(&self) -> String {