feat: scan region by parallelism in config

This commit is contained in:
evenyag
2023-11-29 17:58:50 +08:00
parent 1d44c4e236
commit cb7ed6ad58
3 changed files with 23 additions and 2 deletions

View File

@@ -68,6 +68,8 @@ pub struct MitoConfig {
// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
/// Scan parallelism.
pub scan_parallelism: usize,
}
impl Default for MitoConfig {
@@ -86,6 +88,7 @@ impl Default for MitoConfig {
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
sst_write_buffer_size: ReadableSize::mb(8),
scan_parallelism: 0,
}
}
}

View File

@@ -115,6 +115,8 @@ impl MitoEngine {
struct EngineInner {
/// Region workers group.
workers: WorkerGroup,
/// Parallelism to scan data.
scan_parallelism: usize,
}
impl EngineInner {
@@ -124,8 +126,10 @@ impl EngineInner {
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
) -> EngineInner {
let scan_parallelism = config.scan_parallelism;
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store_manager),
scan_parallelism,
}
}
@@ -173,7 +177,8 @@ impl EngineInner {
region.access_layer.clone(),
request,
Some(cache_manager),
);
)
.parallelism(self.scan_parallelism);
scan_region.scanner()
}
@@ -299,6 +304,7 @@ impl MitoEngine {
listener: Option<crate::engine::listener::EventListenerRef>,
) -> MitoEngine {
config.sanitize();
let scan_parallelism = config.scan_parallelism;
MitoEngine {
inner: Arc::new(EngineInner {
@@ -309,6 +315,7 @@ impl MitoEngine {
write_buffer_manager,
listener,
),
scan_parallelism,
}),
}
}

View File

@@ -115,6 +115,8 @@ pub(crate) struct ScanRegion {
request: ScanRequest,
/// Cache.
cache_manager: Option<CacheManagerRef>,
/// Parallelism to scan.
parallelism: usize,
}
impl ScanRegion {
@@ -130,9 +132,17 @@ impl ScanRegion {
access_layer,
request,
cache_manager,
parallelism: 0,
}
}
/// Sets parallelism.
#[must_use]
pub(crate) fn parallelism(mut self, parallelism: usize) -> Self {
self.parallelism = parallelism;
self
}
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
self.seq_scan().map(Scanner::Seq)
@@ -196,7 +206,8 @@ impl ScanRegion {
.with_predicate(Some(predicate))
.with_memtables(memtables)
.with_files(files)
.with_cache(self.cache_manager);
.with_cache(self.cache_manager)
.with_parallelism(self.parallelism);
Ok(seq_scan)
}