diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index edb7d3dd0d..26c5267a66 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -68,6 +68,8 @@ pub struct MitoConfig { // Other configs: /// Buffer size for SST writing. pub sst_write_buffer_size: ReadableSize, + /// Scan parallelism. + pub scan_parallelism: usize, } impl Default for MitoConfig { @@ -86,6 +88,7 @@ impl Default for MitoConfig { vector_cache_size: ReadableSize::mb(512), page_cache_size: ReadableSize::mb(512), sst_write_buffer_size: ReadableSize::mb(8), + scan_parallelism: 0, } } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 558dec2a6c..e924b616c7 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -115,6 +115,8 @@ impl MitoEngine { struct EngineInner { /// Region workers group. workers: WorkerGroup, + /// Parallelism to scan data. + scan_parallelism: usize, } impl EngineInner { @@ -124,8 +126,10 @@ impl EngineInner { log_store: Arc, object_store_manager: ObjectStoreManagerRef, ) -> EngineInner { + let scan_parallelism = config.scan_parallelism; EngineInner { workers: WorkerGroup::start(config, log_store, object_store_manager), + scan_parallelism, } } @@ -173,7 +177,8 @@ impl EngineInner { region.access_layer.clone(), request, Some(cache_manager), - ); + ) + .parallelism(self.scan_parallelism); scan_region.scanner() } @@ -299,6 +304,7 @@ impl MitoEngine { listener: Option, ) -> MitoEngine { config.sanitize(); + let scan_parallelism = config.scan_parallelism; MitoEngine { inner: Arc::new(EngineInner { @@ -309,6 +315,7 @@ impl MitoEngine { write_buffer_manager, listener, ), + scan_parallelism, }), } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 2238573248..7ddb3447f2 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -115,6 +115,8 @@ pub(crate) struct ScanRegion { request: ScanRequest, /// Cache. cache_manager: Option, + /// Parallelism to scan. + parallelism: usize, } impl ScanRegion { @@ -130,9 +132,17 @@ impl ScanRegion { access_layer, request, cache_manager, + parallelism: 0, } } + /// Sets parallelism. + #[must_use] + pub(crate) fn parallelism(mut self, parallelism: usize) -> Self { + self.parallelism = parallelism; + self + } + /// Returns a [Scanner] to scan the region. pub(crate) fn scanner(self) -> Result { self.seq_scan().map(Scanner::Seq) @@ -196,7 +206,8 @@ impl ScanRegion { .with_predicate(Some(predicate)) .with_memtables(memtables) .with_files(files) - .with_cache(self.cache_manager); + .with_cache(self.cache_manager) + .with_parallelism(self.parallelism); Ok(seq_scan) }