feat: remove scan_parallelism

evenyag
2024-12-03 22:06:35 +08:00
parent a00a766869
commit c3af2b1f90
8 changed files with 69 additions and 77 deletions
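After this change, scan parallelism is no longer part of MitoConfig; each scan requests its own parallelism through the scanner's target partitions, and only parallel_scan_channel_size remains configurable. A minimal sketch of the new pattern, assembled from the test diffs below (env, region_id, parallelism, and channel_size are assumed to come from the usual test setup):

// Illustrative only: mirrors the updated parallel scan test in this commit.
let engine = env
    .open_engine(MitoConfig {
        // scan_parallelism is gone; only the channel size is configured here.
        parallel_scan_channel_size: channel_size,
        ..Default::default()
    })
    .await;
// Ask the scanner for the desired number of partitions instead of relying on config.
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
// Internally this prepares the region scanner with
// PrepareRequest::default().with_target_partitions(parallelism).
scanner.set_target_partitions(parallelism);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();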


@@ -69,7 +69,6 @@ fn test_load_datanode_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
@@ -205,7 +204,6 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
scan_parallelism: 0,
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),


@@ -30,7 +30,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
@@ -104,11 +104,6 @@ pub struct MitoConfig {
// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
/// Parallelism to scan a region (default: 1/4 of cpu cores).
/// - 0: using the default value (1/4 of cpu cores).
/// - 1: scan in current thread.
/// - n: scan in parallelism n.
pub scan_parallelism: usize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
@@ -153,7 +148,6 @@ impl Default for MitoConfig {
experimental_write_cache_size: ReadableSize::gb(1),
experimental_write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
index: IndexConfig::default(),
@@ -226,11 +220,6 @@ impl MitoConfig {
);
}
// Use default value if `scan_parallelism` is 0.
if self.scan_parallelism == 0 {
self.scan_parallelism = divide_num_cpus(4);
}
if self.parallel_scan_channel_size < 1 {
self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
warn!(


@@ -90,7 +90,7 @@ use crate::error::{
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallelism, ScanRegion, Scanner};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
@@ -171,19 +171,9 @@ impl MitoEngine {
self.scan_region(region_id, request)?.scanner()
}
/// Returns a region scanner to scan the region for `request`.
fn region_scanner(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef> {
let scanner = self.scanner(region_id, request)?;
scanner.region_scanner()
}
/// Scans a region.
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
self.inner.handle_query(region_id, request)
self.inner.scan_region(region_id, request)
}
/// Edit region's metadata by [RegionEdit] directly. Use with care.
@@ -423,7 +413,7 @@ impl EngineInner {
}
/// Handles the scan `request` and returns a [ScanRegion].
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.
let region = self
@@ -433,14 +423,10 @@ impl EngineInner {
let version = region.version();
// Get cache.
let cache_manager = self.workers.cache_manager();
let scan_parallelism = ScanParallelism {
parallelism: self.config.scan_parallelism,
channel_size: self.config.parallel_scan_channel_size,
};
let scan_region =
ScanRegion::new(version, region.access_layer.clone(), request, cache_manager)
.with_parallelism(scan_parallelism)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_start_time(query_start);
@@ -538,7 +524,9 @@ impl RegionEngine for MitoEngine {
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
self.region_scanner(region_id, request)
self.scan_region(region_id, request)
.map_err(BoxedError::new)?
.region_scanner()
.map_err(BoxedError::new)
}


@@ -92,7 +92,6 @@ async fn test_append_mode_compaction() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
scan_parallelism: 2,
..Default::default()
})
.await;
@@ -176,19 +175,19 @@ async fn test_append_mode_compaction() {
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
// Scans in parallel.
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Reopens engine with parallelism 1.
// Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
scan_parallelism: 1,
..Default::default()
},
)


@@ -92,7 +92,6 @@ async fn test_merge_mode_compaction() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
scan_parallelism: 2,
..Default::default()
})
.await;
@@ -190,19 +189,19 @@ async fn test_merge_mode_compaction() {
| a | | 13.0 | 1970-01-01T00:00:03 |
+-------+---------+---------+---------------------+";
// Scans in parallel.
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Reopens engine with parallelism 1.
// Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
scan_parallelism: 1,
..Default::default()
},
)


@@ -37,7 +37,6 @@ async fn scan_in_parallel(
) {
let engine = env
.open_engine(MitoConfig {
scan_parallelism: parallelism,
parallel_scan_channel_size: channel_size,
..Default::default()
})
@@ -57,7 +56,9 @@ async fn scan_in_parallel(
.unwrap();
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let mut scanner = engine.scanner(region_id, request).unwrap();
scanner.set_target_partitions(parallelism);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+


@@ -33,6 +33,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
@@ -68,15 +69,6 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
}
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)),
}
}
}
#[cfg(test)]
@@ -104,6 +96,17 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
}
}
/// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
use store_api::region_engine::{PrepareRequest, RegionScanner};
let request = PrepareRequest::default().with_target_partitions(target_partitions);
match self {
Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
}
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -165,8 +168,8 @@ pub(crate) struct ScanRegion {
request: ScanRequest,
/// Cache.
cache_manager: CacheManagerRef,
/// Parallelism to scan.
parallelism: ScanParallelism,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Whether to ignore fulltext index.
@@ -188,17 +191,20 @@ impl ScanRegion {
access_layer,
request,
cache_manager,
parallelism: ScanParallelism::default(),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
ignore_inverted_index: false,
ignore_fulltext_index: false,
start_time: None,
}
}
/// Sets parallelism.
/// Sets parallel scan task channel size.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self {
self.parallelism = parallelism;
pub(crate) fn with_parallel_scan_channel_size(
mut self,
parallel_scan_channel_size: usize,
) -> Self {
self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
@@ -224,7 +230,7 @@ impl ScanRegion {
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
if self.version.options.append_mode && self.request.series_row_selector.is_none() {
if self.use_unordered_scan() {
// If the table is append-only and there is no series row selector, we use unordered scan for queries.
// We still use seq scan in compaction.
self.unordered_scan().map(Scanner::Unordered)
@@ -233,6 +239,16 @@ impl ScanRegion {
}
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
if self.use_unordered_scan() {
self.unordered_scan().map(|scanner| Box::new(scanner) as _)
} else {
self.seq_scan().map(|scanner| Box::new(scanner) as _)
}
}
/// Scan sequentially.
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input(true)?;
@@ -251,6 +267,13 @@ impl ScanRegion {
Ok(SeqScan::new(input, false))
}
/// Returns true if the region can use unordered scan for current request.
fn use_unordered_scan(&self) -> bool {
// If the table is append-only and there is no series row selector, we use unordered scan for queries.
// We still use seq scan in compaction.
self.version.options.append_mode && self.request.series_row_selector.is_none()
}
/// Creates a scan input.
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
let time_range = self.build_time_range_predicate();
@@ -314,7 +337,7 @@ impl ScanRegion {
.with_cache(self.cache_manager)
.with_inverted_index_applier(inverted_index_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_parallelism(self.parallelism)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted)
@@ -428,15 +451,6 @@ impl ScanRegion {
}
}
/// Config for parallel scan.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct ScanParallelism {
/// Number of tasks expect to spawn to read data.
pub(crate) parallelism: usize,
/// Channel size to send batches. Only takes effect when the parallelism > 1.
pub(crate) channel_size: usize,
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
@@ -466,8 +480,8 @@ pub(crate) struct ScanInput {
pub(crate) cache_manager: CacheManagerRef,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Parallelism to scan data.
pub(crate) parallelism: ScanParallelism,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
pub(crate) parallel_scan_channel_size: usize,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
@@ -496,7 +510,7 @@ impl ScanInput {
files: Vec::new(),
cache_manager: CacheManagerRef::default(),
ignore_file_not_found: false,
parallelism: ScanParallelism::default(),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
inverted_index_applier: None,
fulltext_index_applier: None,
query_start: None,
@@ -549,10 +563,13 @@ impl ScanInput {
self
}
/// Sets scan parallelism.
/// Sets scan task channel size.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self {
self.parallelism = parallelism;
pub(crate) fn with_parallel_scan_channel_size(
mut self,
parallel_scan_channel_size: usize,
) -> Self {
self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
@@ -629,7 +646,7 @@ impl ScanInput {
let sources = sources
.into_iter()
.map(|source| {
let (sender, receiver) = mpsc::channel(self.parallelism.channel_size);
let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
self.spawn_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Source::Stream(stream)


@@ -197,7 +197,8 @@ impl SeqScan {
let stream_ctx = self.stream_ctx.clone();
let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() {
// We can use addtional tasks to read the data. This semaphore is partition level.
// We can use additional tasks to read the data if we have more target partitions than actual partitions.
// This semaphore is partition level.
// We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
// of tasks usually won't exceed the target partitions by much, as compaction can reduce the number of
// files in a part range.