diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 454188141d..c5f1111d37 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -69,7 +69,6 @@ fn test_load_datanode_example_config() { region_engine: vec![ RegionEngineConfig::Mito(MitoConfig { auto_flush_interval: Duration::from_secs(3600), - scan_parallelism: 0, experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)), ..Default::default() }), @@ -205,7 +204,6 @@ fn test_load_standalone_example_config() { RegionEngineConfig::Mito(MitoConfig { auto_flush_interval: Duration::from_secs(3600), experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)), - scan_parallelism: 0, ..Default::default() }), RegionEngineConfig::File(EngineConfig {}), diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 8cd2b08f2e..d251ea77ce 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -30,7 +30,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5); /// Default channel size for parallel scan task. -const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32; +pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32; // Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8; @@ -104,11 +104,6 @@ pub struct MitoConfig { // Other configs: /// Buffer size for SST writing. pub sst_write_buffer_size: ReadableSize, - /// Parallelism to scan a region (default: 1/4 of cpu cores). - /// - 0: using the default value (1/4 of cpu cores). - /// - 1: scan in current thread. - /// - n: scan in parallelism n. - pub scan_parallelism: usize, /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32). pub parallel_scan_channel_size: usize, /// Whether to allow stale entries read during replay. @@ -153,7 +148,6 @@ impl Default for MitoConfig { experimental_write_cache_size: ReadableSize::gb(1), experimental_write_cache_ttl: None, sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, - scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, index: IndexConfig::default(), @@ -226,11 +220,6 @@ impl MitoConfig { ); } - // Use default value if `scan_parallelism` is 0. - if self.scan_parallelism == 0 { - self.scan_parallelism = divide_num_cpus(4); - } - if self.parallel_scan_channel_size < 1 { self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE; warn!( diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c60b7c4107..a518da3253 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -90,7 +90,7 @@ use crate::error::{ }; use crate::manifest::action::RegionEdit; use crate::metrics::HANDLE_REQUEST_ELAPSED; -use crate::read::scan_region::{ScanParallelism, ScanRegion, Scanner}; +use crate::read::scan_region::{ScanRegion, Scanner}; use crate::request::{RegionEditRequest, WorkerRequest}; use crate::wal::entry_distributor::{ build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, @@ -171,19 +171,9 @@ impl MitoEngine { self.scan_region(region_id, request)?.scanner() } - /// Returns a region scanner to scan the region for `request`. - fn region_scanner( - &self, - region_id: RegionId, - request: ScanRequest, - ) -> Result { - let scanner = self.scanner(region_id, request)?; - scanner.region_scanner() - } - /// Scans a region. fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result { - self.inner.handle_query(region_id, request) + self.inner.scan_region(region_id, request) } /// Edit region's metadata by [RegionEdit] directly. Use with care. @@ -423,7 +413,7 @@ impl EngineInner { } /// Handles the scan `request` and returns a [ScanRegion]. - fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result { + fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result { let query_start = Instant::now(); // Reading a region doesn't need to go through the region worker thread. let region = self @@ -433,14 +423,10 @@ impl EngineInner { let version = region.version(); // Get cache. let cache_manager = self.workers.cache_manager(); - let scan_parallelism = ScanParallelism { - parallelism: self.config.scan_parallelism, - channel_size: self.config.parallel_scan_channel_size, - }; let scan_region = ScanRegion::new(version, region.access_layer.clone(), request, cache_manager) - .with_parallelism(scan_parallelism) + .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size) .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) .with_start_time(query_start); @@ -538,7 +524,9 @@ impl RegionEngine for MitoEngine { region_id: RegionId, request: ScanRequest, ) -> Result { - self.region_scanner(region_id, request) + self.scan_region(region_id, request) + .map_err(BoxedError::new)? + .region_scanner() .map_err(BoxedError::new) } diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index ab8515aa13..c9f61c5db3 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -92,7 +92,6 @@ async fn test_append_mode_compaction() { let mut env = TestEnv::new(); let engine = env .create_engine(MitoConfig { - scan_parallelism: 2, ..Default::default() }) .await; @@ -176,19 +175,19 @@ async fn test_append_mode_compaction() { | b | 1.0 | 1970-01-01T00:00:01 | +-------+---------+---------------------+"; // Scans in parallel. - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); assert_eq!(2, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); + scanner.set_target_partitions(2); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); - // Reopens engine with parallelism 1. + // Reopens engine. let engine = env .reopen_engine( engine, MitoConfig { - scan_parallelism: 1, ..Default::default() }, ) diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs index 08f4d05650..e74aba5655 100644 --- a/src/mito2/src/engine/merge_mode_test.rs +++ b/src/mito2/src/engine/merge_mode_test.rs @@ -92,7 +92,6 @@ async fn test_merge_mode_compaction() { let mut env = TestEnv::new(); let engine = env .create_engine(MitoConfig { - scan_parallelism: 2, ..Default::default() }) .await; @@ -190,19 +189,19 @@ async fn test_merge_mode_compaction() { | a | | 13.0 | 1970-01-01T00:00:03 | +-------+---------+---------+---------------------+"; // Scans in parallel. - let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); + let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap(); assert_eq!(1, scanner.num_files()); assert_eq!(1, scanner.num_memtables()); + scanner.set_target_partitions(2); let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"])); - // Reopens engine with parallelism 1. + // Reopens engine. let engine = env .reopen_engine( engine, MitoConfig { - scan_parallelism: 1, ..Default::default() }, ) diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index 53cc0dca8f..3d5dab3540 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -37,7 +37,6 @@ async fn scan_in_parallel( ) { let engine = env .open_engine(MitoConfig { - scan_parallelism: parallelism, parallel_scan_channel_size: channel_size, ..Default::default() }) @@ -57,7 +56,9 @@ async fn scan_in_parallel( .unwrap(); let request = ScanRequest::default(); - let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let mut scanner = engine.scanner(region_id, request).unwrap(); + scanner.set_target_partitions(parallelism); + let stream = scanner.scan().await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index f066796eaa..471cc1a8e5 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -33,6 +33,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::file_cache::FileCacheRef; use crate::cache::CacheManagerRef; +use crate::config::DEFAULT_SCAN_CHANNEL_SIZE; use crate::error::Result; use crate::memtable::MemtableRef; use crate::metrics::READ_SST_COUNT; @@ -68,15 +69,6 @@ impl Scanner { Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await, } } - - /// Returns a [RegionScanner] to scan the region. - #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] - pub(crate) fn region_scanner(self) -> Result { - match self { - Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)), - Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)), - } - } } #[cfg(test)] @@ -104,6 +96,17 @@ impl Scanner { Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(), } } + + /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner. + pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) { + use store_api::region_engine::{PrepareRequest, RegionScanner}; + + let request = PrepareRequest::default().with_target_partitions(target_partitions); + match self { + Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(), + Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(), + } + } } #[cfg_attr(doc, aquamarine::aquamarine)] @@ -165,8 +168,8 @@ pub(crate) struct ScanRegion { request: ScanRequest, /// Cache. cache_manager: CacheManagerRef, - /// Parallelism to scan. - parallelism: ScanParallelism, + /// Capacity of the channel to send data from parallel scan tasks to the main task. + parallel_scan_channel_size: usize, /// Whether to ignore inverted index. ignore_inverted_index: bool, /// Whether to ignore fulltext index. @@ -188,17 +191,20 @@ impl ScanRegion { access_layer, request, cache_manager, - parallelism: ScanParallelism::default(), + parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, ignore_inverted_index: false, ignore_fulltext_index: false, start_time: None, } } - /// Sets parallelism. + /// Sets parallel scan task channel size. #[must_use] - pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self { - self.parallelism = parallelism; + pub(crate) fn with_parallel_scan_channel_size( + mut self, + parallel_scan_channel_size: usize, + ) -> Self { + self.parallel_scan_channel_size = parallel_scan_channel_size; self } @@ -224,7 +230,7 @@ impl ScanRegion { /// Returns a [Scanner] to scan the region. pub(crate) fn scanner(self) -> Result { - if self.version.options.append_mode && self.request.series_row_selector.is_none() { + if self.use_unordered_scan() { // If table is append only and there is no series row selector, we use unordered scan in query. // We still use seq scan in compaction. self.unordered_scan().map(Scanner::Unordered) @@ -233,6 +239,16 @@ impl ScanRegion { } } + /// Returns a [RegionScanner] to scan the region. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] + pub(crate) fn region_scanner(self) -> Result { + if self.use_unordered_scan() { + self.unordered_scan().map(|scanner| Box::new(scanner) as _) + } else { + self.seq_scan().map(|scanner| Box::new(scanner) as _) + } + } + /// Scan sequentially. pub(crate) fn seq_scan(self) -> Result { let input = self.scan_input(true)?; @@ -251,6 +267,13 @@ impl ScanRegion { Ok(SeqScan::new(input, false)) } + /// Returns true if the region can use unordered scan for current request. + fn use_unordered_scan(&self) -> bool { + // If table is append only and there is no series row selector, we use unordered scan in query. + // We still use seq scan in compaction. + self.version.options.append_mode && self.request.series_row_selector.is_none() + } + /// Creates a scan input. fn scan_input(mut self, filter_deleted: bool) -> Result { let time_range = self.build_time_range_predicate(); @@ -314,7 +337,7 @@ impl ScanRegion { .with_cache(self.cache_manager) .with_inverted_index_applier(inverted_index_applier) .with_fulltext_index_applier(fulltext_index_applier) - .with_parallelism(self.parallelism) + .with_parallel_scan_channel_size(self.parallel_scan_channel_size) .with_start_time(self.start_time) .with_append_mode(self.version.options.append_mode) .with_filter_deleted(filter_deleted) @@ -428,15 +451,6 @@ impl ScanRegion { } } -/// Config for parallel scan. -#[derive(Debug, Clone, Copy, Default)] -pub(crate) struct ScanParallelism { - /// Number of tasks expect to spawn to read data. - pub(crate) parallelism: usize, - /// Channel size to send batches. Only takes effect when the parallelism > 1. - pub(crate) channel_size: usize, -} - /// Returns true if the time range of a SST `file` matches the `predicate`. fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { if predicate == &TimestampRange::min_to_max() { @@ -466,8 +480,8 @@ pub(crate) struct ScanInput { pub(crate) cache_manager: CacheManagerRef, /// Ignores file not found error. ignore_file_not_found: bool, - /// Parallelism to scan data. - pub(crate) parallelism: ScanParallelism, + /// Capacity of the channel to send data from parallel scan tasks to the main task. + pub(crate) parallel_scan_channel_size: usize, /// Index appliers. inverted_index_applier: Option, fulltext_index_applier: Option, @@ -496,7 +510,7 @@ impl ScanInput { files: Vec::new(), cache_manager: CacheManagerRef::default(), ignore_file_not_found: false, - parallelism: ScanParallelism::default(), + parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, inverted_index_applier: None, fulltext_index_applier: None, query_start: None, @@ -549,10 +563,13 @@ impl ScanInput { self } - /// Sets scan parallelism. + /// Sets scan task channel size. #[must_use] - pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self { - self.parallelism = parallelism; + pub(crate) fn with_parallel_scan_channel_size( + mut self, + parallel_scan_channel_size: usize, + ) -> Self { + self.parallel_scan_channel_size = parallel_scan_channel_size; self } @@ -629,7 +646,7 @@ impl ScanInput { let sources = sources .into_iter() .map(|source| { - let (sender, receiver) = mpsc::channel(self.parallelism.channel_size); + let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size); self.spawn_scan_task(source, semaphore.clone(), sender); let stream = Box::pin(ReceiverStream::new(receiver)); Source::Stream(stream) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 4af5f591d2..5dd80f3d52 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -197,7 +197,8 @@ impl SeqScan { let stream_ctx = self.stream_ctx.clone(); let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() { - // We can use addtional tasks to read the data. This semaphore is partition level. + // We can use addtional tasks to read the data if we have more target partitions than acutal partitions. + // This semaphore is partition level. // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of // files in a part range.