perf(mito): scan SSTs and memtables in parallel (#2852)

* feat: seq scan support parallelism

* feat: scan region by parallelism in config

* feat: enlarge channel size

* chore: parallel builder logs

* feat: use parallel reader according to source num

* chore: 128 channel size

* feat: add fetch cost metrics

* feat: add channel size to config

* feat: builder cost

* feat: logs

* feat: compiler error

* feat: fetch cost

* feat: convert cost

* chore: Revert "feat: logs"

This reverts commit 01e0df2c3a.

* chore: fix compiler errors

* feat: reduce channel size to 32

* chore: use workspace tokio-stream

* test: test scan in parallel

* chore: comment typos

* refactor: build all sources first

* test: test 0 parallelism

* feat: use parallel scan by default

* docs: update config example

* feat: log parallelism

* refactor: keep config in engine inner

* refactor: rename parallelism method

* docs: update docs

* test: fix config api test

* docs: update parallel scan comment

* feat: 0 for default parallelism
Yingwen
2023-12-11 14:43:17 +08:00
committed by GitHub
parent 178018143d
commit 6a57f4975e
21 changed files with 398 additions and 51 deletions

Cargo.lock generated
View File

@@ -4898,6 +4898,7 @@ dependencies = [
"strum 0.25.0",
"table",
"tokio",
"tokio-stream",
"tokio-util",
"uuid",
]

View File

@@ -127,6 +127,7 @@ sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
tokio = { version = "1.28", features = ["full"] }
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.7"
tonic = { version = "0.10", features = ["tls"] }
@@ -168,7 +169,6 @@ frontend = { path = "src/frontend" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
mito = { path = "src/mito" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }

View File

@@ -89,6 +89,13 @@ vector_cache_size = "512MB"
page_cache_size = "512MB"
# Buffer size for SST writing.
sst_write_buffer_size = "8MB"
# Parallelism to scan a region (default: 1/4 of cpu cores).
# - 0: use the default value (1/4 of cpu cores).
# - 1: scan in the current thread.
# - n: scan with parallelism n.
scan_parallelism = 0
# Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
parallel_scan_channel_size = 32
# Log options, see `standalone.example.toml`
# [logging]

View File

@@ -157,6 +157,13 @@ vector_cache_size = "512MB"
page_cache_size = "512MB"
# Buffer size for SST writing.
sst_write_buffer_size = "8MB"
# Parallelism to scan a region (default: 1/4 of cpu cores).
# - 0: use the default value (1/4 of cpu cores).
# - 1: scan in the current thread.
# - n: scan with parallelism n.
scan_parallelism = 0
# Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
parallel_scan_channel_size = 32
# Log options
# [logging]

View File

@@ -35,7 +35,7 @@ prost.workspace = true
rand.workspace = true
session.workspace = true
snafu.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio.workspace = true
tonic.workspace = true

View File

@@ -62,7 +62,7 @@ sql.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio.workspace = true
toml.workspace = true
tonic.workspace = true

View File

@@ -20,7 +20,7 @@ serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
table.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio.workspace = true
tonic.workspace = true

View File

@@ -49,7 +49,7 @@ snafu.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio.workspace = true
toml.workspace = true
tonic.workspace = true

View File

@@ -58,6 +58,7 @@ snafu.workspace = true
store-api.workspace = true
strum.workspace = true
table.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tokio.workspace = true
uuid.workspace = true

View File

@@ -24,13 +24,16 @@ use serde::{Deserialize, Serialize};
const DEFAULT_MAX_BG_JOB: usize = 4;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan tasks.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct MitoConfig {
// Worker configs:
/// Number of region workers (default 1).
/// Number of region workers (default: 1/2 of cpu cores).
/// Set to 0 to use the default value.
pub num_workers: usize,
/// Request channel size of each worker (default 128).
pub worker_channel_size: usize,
@@ -68,12 +71,19 @@ pub struct MitoConfig {
// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
/// Parallelism to scan a region (default: 1/4 of cpu cores).
/// - 0: use the default value (1/4 of cpu cores).
/// - 1: scan in the current thread.
/// - n: scan with parallelism n.
pub scan_parallelism: usize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
}
impl Default for MitoConfig {
fn default() -> Self {
MitoConfig {
num_workers: num_cpus::get() / 2,
num_workers: divide_num_cpus(2),
worker_channel_size: 128,
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
@@ -86,6 +96,8 @@ impl Default for MitoConfig {
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
sst_write_buffer_size: ReadableSize::mb(8),
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
}
}
}
@@ -93,16 +105,9 @@ impl Default for MitoConfig {
impl MitoConfig {
/// Sanitize incorrect configurations.
pub(crate) fn sanitize(&mut self) {
// Sanitize worker num.
let num_workers_before = self.num_workers;
// Use default value if `num_workers` is 0.
if self.num_workers == 0 {
self.num_workers = (num_cpus::get() / 2).max(1);
}
if num_workers_before != self.num_workers {
warn!(
"Sanitize worker num {} to {}",
num_workers_before, self.num_workers
);
self.num_workers = divide_num_cpus(2);
}
// Sanitize channel size.
@@ -131,5 +136,27 @@ impl MitoConfig {
self.sst_write_buffer_size
);
}
// Use default value if `scan_parallelism` is 0.
if self.scan_parallelism == 0 {
self.scan_parallelism = divide_num_cpus(4);
}
if self.parallel_scan_channel_size < 1 {
self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
warn!(
"Sanitize scan channel size to {}",
self.parallel_scan_channel_size
);
}
}
}
/// Divides the number of cpus by a non-zero `divisor` and returns at least 1.
fn divide_num_cpus(divisor: usize) -> usize {
debug_assert!(divisor > 0);
let cores = num_cpus::get();
debug_assert!(cores > 0);
(cores + divisor - 1) / divisor
}
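
The helper rounds up, so a `scan_parallelism` of 0 always resolves to at least one scan task even on a single-core machine. A minimal, self-contained sketch of that rounding, with hard-coded core counts standing in for `num_cpus::get()` (illustrative only, not part of the crate):

// Illustrative copy of the ceiling division performed by `divide_num_cpus`.
fn divide_cores(cores: usize, divisor: usize) -> usize {
    debug_assert!(cores > 0 && divisor > 0);
    (cores + divisor - 1) / divisor
}

fn main() {
    // scan_parallelism = 0 is sanitized to 1/4 of the cores, rounded up,
    // so it never drops below 1.
    assert_eq!(divide_cores(16, 4), 4);
    assert_eq!(divide_cores(6, 4), 2);
    assert_eq!(divide_cores(1, 4), 1);
    // num_workers = 0 is sanitized to 1/2 of the cores the same way.
    assert_eq!(divide_cores(1, 2), 1);
}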

View File

@@ -33,6 +33,8 @@ pub mod listener;
#[cfg(test)]
mod open_test;
#[cfg(test)]
mod parallel_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
@@ -40,6 +42,7 @@ mod prune_test;
mod set_readonly_test;
#[cfg(test)]
mod truncate_test;
use std::sync::Arc;
use async_trait::async_trait;
@@ -56,7 +59,7 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::error::{RecvSnafu, RegionNotFoundSnafu, Result};
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner};
use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::worker::WorkerGroup;
@@ -114,6 +117,8 @@ impl MitoEngine {
struct EngineInner {
/// Region workers group.
workers: WorkerGroup,
/// Config of the engine.
config: Arc<MitoConfig>,
}
impl EngineInner {
@@ -123,8 +128,10 @@ impl EngineInner {
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
) -> EngineInner {
let config = Arc::new(config);
EngineInner {
workers: WorkerGroup::start(config, log_store, object_store_manager),
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager),
config,
}
}
@@ -171,12 +178,18 @@ impl EngineInner {
let version = region.version();
// Get cache.
let cache_manager = self.workers.cache_manager();
let scan_parallelism = ScanParallism {
parallelism: self.config.scan_parallelism,
channel_size: self.config.parallel_scan_channel_size,
};
let scan_region = ScanRegion::new(
version,
region.access_layer.clone(),
request,
Some(cache_manager),
);
)
.with_parallelism(scan_parallelism);
scan_region.scanner()
}
@@ -303,15 +316,17 @@ impl MitoEngine {
) -> MitoEngine {
config.sanitize();
let config = Arc::new(config);
MitoEngine {
inner: Arc::new(EngineInner {
workers: WorkerGroup::start_for_test(
config,
config.clone(),
log_store,
object_store_manager,
write_buffer_manager,
listener,
),
config,
}),
}
}

View File

@@ -0,0 +1,133 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Tests for parallel scan.
use std::collections::HashMap;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::{
build_delete_rows_for_key, build_rows_for_key, delete_rows, delete_rows_schema, flush_region,
put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
async fn scan_in_parallel(
env: &mut TestEnv,
region_id: RegionId,
region_dir: &str,
parallelism: usize,
channel_size: usize,
) {
let engine = env
.open_engine(MitoConfig {
scan_parallelism: parallelism,
parallel_scan_channel_size: channel_size,
..Default::default()
})
.await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: region_dir.to_string(),
options: HashMap::default(),
}),
)
.await
.unwrap();
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
| b | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_parallel_scan() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();
let column_schemas = rows_schema(&request);
let delete_schema = delete_rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
// SST0
flush_region(&engine, region_id, None).await;
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
// SST1
flush_region(&engine, region_id, None).await;
// Delete (a, 2)
let rows = Rows {
schema: delete_schema.clone(),
rows: build_delete_rows_for_key("a", 2, 3),
};
delete_rows(&engine, region_id, rows).await;
// SST2
flush_region(&engine, region_id, None).await;
// Delete (b, 0), (b, 1)
let rows = Rows {
schema: delete_schema,
rows: build_delete_rows_for_key("b", 0, 2),
};
delete_rows(&engine, region_id, rows).await;
engine.stop().await.unwrap();
scan_in_parallel(&mut env, region_id, &region_dir, 0, 1).await;
scan_in_parallel(&mut env, region_id, &region_dir, 1, 1).await;
scan_in_parallel(&mut env, region_id, &region_dir, 2, 1).await;
scan_in_parallel(&mut env, region_id, &region_dir, 2, 8).await;
scan_in_parallel(&mut env, region_id, &region_dir, 4, 8).await;
scan_in_parallel(&mut env, region_id, &region_dir, 8, 2).await;
}

View File

@@ -39,6 +39,8 @@ use datatypes::vectors::{
TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
Vector, VectorRef,
};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber};
@@ -668,6 +670,8 @@ pub enum Source {
Reader(BoxedBatchReader),
/// Source from a [BoxedBatchIterator].
Iter(BoxedBatchIterator),
/// Source from a [BoxedBatchStream].
Stream(BoxedBatchStream),
}
impl Source {
@@ -676,6 +680,7 @@ impl Source {
match self {
Source::Reader(reader) => reader.next_batch().await,
Source::Iter(iter) => iter.next().transpose(),
Source::Stream(stream) => stream.try_next().await,
}
}
}
@@ -698,6 +703,9 @@ pub trait BatchReader: Send {
/// Pointer to [BatchReader].
pub type BoxedBatchReader = Box<dyn BatchReader>;
/// Pointer to a stream that yields [Batch].
pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
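
`BoxedBatchStream` is a futures `BoxStream` over `Result<Batch>`, which is what lets the new `Source::Stream` variant hold any type-erased async stream, including the receiver streams produced by the parallel scan tasks. A standalone sketch of the same alias pattern with a plain `u64` payload instead of `Batch` (the names below are illustrative, not part of the crate):

use futures::executor::block_on;
use futures::stream::{self, BoxStream, StreamExt};

// Stand-in for `BoxedBatchStream`, carrying `u64` instead of `Result<Batch>`.
type BoxedNumberStream = BoxStream<'static, u64>;

fn numbers() -> BoxedNumberStream {
    // Any `Send + 'static` stream can be boxed behind the alias.
    stream::iter(vec![1_u64, 2, 3]).boxed()
}

fn main() {
    block_on(async {
        let mut s = numbers();
        while let Some(n) = s.next().await {
            println!("{n}");
        }
    });
}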

View File

@@ -329,6 +329,11 @@ impl MergeReaderBuilder {
MergeReaderBuilder::default()
}
/// Creates a builder from sources.
pub fn from_sources(sources: Vec<Source>) -> MergeReaderBuilder {
MergeReaderBuilder { sources }
}
/// Pushes a batch reader to sources.
pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self {
self.sources.push(Source::Reader(reader));
@@ -365,6 +370,8 @@ struct Metrics {
num_output_rows: usize,
/// Number of deleted rows.
num_deleted_rows: usize,
/// Cost to fetch batches from sources.
fetch_cost: Duration,
}
/// A `Node` represents an individual input data source to be merged.
@@ -383,7 +390,9 @@ impl Node {
/// It tries to fetch one batch from the `source`.
async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
let current_batch = source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
Ok(Node {
@@ -420,8 +429,10 @@ impl Node {
/// Panics if the node has reached EOF.
async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result<Batch> {
let current = self.current_batch.take().unwrap();
let start = Instant::now();
// Ensures batch is not empty.
self.current_batch = self.source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += self
.current_batch
.as_ref()
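
The merge reader now wraps every `next_batch` await with an `Instant`, so the accumulated `fetch_cost` shows up in the scan metrics. A small sketch of accumulating a `Duration` across await points, with `tokio::time::sleep` standing in for the source fetch (names are illustrative):

use std::time::{Duration, Instant};

#[derive(Debug, Default)]
struct Metrics {
    fetch_cost: Duration,
}

// Stands in for `source.next_batch().await` in the merge reader.
async fn fetch_batch(metrics: &mut Metrics) -> Option<u64> {
    let start = Instant::now();
    tokio::time::sleep(Duration::from_millis(5)).await;
    metrics.fetch_cost += start.elapsed();
    Some(42)
}

#[tokio::main]
async fn main() {
    let mut metrics = Metrics::default();
    for _ in 0..3 {
        let _ = fetch_batch(&mut metrics).await;
    }
    println!("{:?}", metrics);
}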

View File

@@ -115,6 +115,8 @@ pub(crate) struct ScanRegion {
request: ScanRequest,
/// Cache.
cache_manager: Option<CacheManagerRef>,
/// Parallelism to scan.
parallelism: ScanParallism,
}
impl ScanRegion {
@@ -130,9 +132,17 @@ impl ScanRegion {
access_layer,
request,
cache_manager,
parallelism: ScanParallism::default(),
}
}
/// Sets parallelism.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: ScanParallism) -> Self {
self.parallelism = parallelism;
self
}
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
self.seq_scan().map(Scanner::Seq)
@@ -196,7 +206,8 @@ impl ScanRegion {
.with_predicate(Some(predicate))
.with_memtables(memtables)
.with_files(files)
.with_cache(self.cache_manager);
.with_cache(self.cache_manager)
.with_parallelism(self.parallelism);
Ok(seq_scan)
}
@@ -215,6 +226,22 @@ impl ScanRegion {
}
}
/// Config for parallel scan.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct ScanParallism {
/// Expected number of tasks to spawn to read data.
pub(crate) parallelism: usize,
/// Channel size to send batches. Only takes effect when the parallelism > 1.
pub(crate) channel_size: usize,
}
impl ScanParallism {
/// Returns true if we allow parallel scan.
pub(crate) fn allow_parallel_scan(&self) -> bool {
self.parallelism > 1
}
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
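
At this level, parallelism 0 and 1 both mean a sequential scan; `allow_parallel_scan` is only true for 2 or more (config sanitization has normally already mapped 0 to 1/4 of the cores before the value reaches this struct). A tiny mirror of that check, copied here for illustration rather than taken from the crate:

// Illustrative copy of the parallel-scan gate, not the crate's own type.
#[derive(Debug, Clone, Copy, Default)]
struct ScanParallelism {
    parallelism: usize,
    channel_size: usize,
}

impl ScanParallelism {
    fn allow_parallel_scan(&self) -> bool {
        self.parallelism > 1
    }
}

fn main() {
    for parallelism in [0, 1, 2, 8] {
        let cfg = ScanParallelism { parallelism, channel_size: 32 };
        println!(
            "parallelism={} channel_size={} parallel={}",
            cfg.parallelism,
            cfg.channel_size,
            cfg.allow_parallel_scan()
        );
    }
}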

View File

@@ -25,6 +25,8 @@ use common_telemetry::{debug, error};
use common_time::range::TimestampRange;
use snafu::ResultExt;
use table::predicate::Predicate;
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::{CacheManager, CacheManagerRef};
@@ -34,7 +36,8 @@ use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::compat::{self, CompatReader};
use crate::read::merge::MergeReaderBuilder;
use crate::read::projection::ProjectionMapper;
use crate::read::{BatchReader, BoxedBatchReader};
use crate::read::scan_region::ScanParallism;
use crate::read::{BatchReader, BoxedBatchReader, BoxedBatchStream, Source};
use crate::sst::file::FileHandle;
/// Scans a region and returns rows in a sorted sequence.
@@ -57,6 +60,8 @@ pub struct SeqScan {
cache_manager: Option<CacheManagerRef>,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Parallelism to scan data.
parallelism: ScanParallism,
}
impl SeqScan {
@@ -72,6 +77,7 @@ impl SeqScan {
files: Vec::new(),
cache_manager: None,
ignore_file_not_found: false,
parallelism: ScanParallism::default(),
}
}
@@ -117,18 +123,32 @@ impl SeqScan {
self
}
/// Sets scan parallelism.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: ScanParallism) -> Self {
self.parallelism = parallelism;
self
}
/// Builds a stream for the query.
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
let start = Instant::now();
let mut metrics = Metrics::default();
let use_parallel = self.use_parallel_reader();
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut reader = self.build_reader().await?;
let mut metrics = Metrics {
scan_cost: start.elapsed(),
let mut reader = if use_parallel {
self.build_parallel_reader().await?
} else {
self.build_reader().await?
};
let elapsed = start.elapsed();
metrics.build_reader_cost = elapsed;
metrics.scan_cost = elapsed;
// Creates a stream to poll the batch reader and convert batch into record batch.
let mapper = self.mapper.clone();
let cache_manager = self.cache_manager.clone();
let parallelism = self.parallelism.parallelism;
let stream = try_stream! {
let cache = cache_manager.as_ref().map(|cache| cache.as_ref());
while let Some(batch) =
@@ -137,7 +157,10 @@ impl SeqScan {
yield batch;
}
debug!("Seq scan finished, region_id: {:?}, metrics: {:?}", mapper.metadata().region_id, metrics);
debug!(
"Seq scan finished, region_id: {:?}, metrics: {:?}, use_parallel: {}, parallelism: {}",
mapper.metadata().region_id, metrics, use_parallel, parallelism,
);
// Update metrics.
READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64());
};
@@ -152,10 +175,35 @@ impl SeqScan {
/// Builds a [BoxedBatchReader] from sequential scan.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut builder = MergeReaderBuilder::new();
let sources = self.build_sources().await?;
let mut builder = MergeReaderBuilder::from_sources(sources);
Ok(Box::new(builder.build().await?))
}
/// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel.
async fn build_parallel_reader(&self) -> Result<BoxedBatchReader> {
assert!(self.parallelism.allow_parallel_scan());
// Scans all memtables and SSTs.
let sources = self.build_sources().await?;
let semaphore = Arc::new(Semaphore::new(self.parallelism.parallelism));
// Spawn a task for each source.
let sources = sources
.into_iter()
.map(|source| {
let stream = self.spawn_scan_task(source, semaphore.clone());
Source::Stream(stream)
})
.collect();
let mut builder = MergeReaderBuilder::from_sources(sources);
Ok(Box::new(builder.build().await?))
}
/// Builds and returns sources to read.
async fn build_sources(&self) -> Result<Vec<Source>> {
let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len());
for mem in &self.memtables {
let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone());
builder.push_batch_iter(iter);
sources.push(Source::Iter(iter));
}
for file in &self.files {
let maybe_reader = self
@@ -179,16 +227,50 @@ impl SeqScan {
}
};
if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) {
builder.push_batch_reader(Box::new(reader));
sources.push(Source::Reader(Box::new(reader)));
} else {
// They have different schema. We need to adapt the batch first so the
// mapper can convert the it.
// mapper can convert it.
let compat_reader =
CompatReader::new(&self.mapper, reader.metadata().clone(), reader)?;
builder.push_batch_reader(Box::new(compat_reader));
sources.push(Source::Reader(Box::new(compat_reader)));
}
}
Ok(Box::new(builder.build().await?))
Ok(sources)
}
/// Returns whether to use a parallel reader.
fn use_parallel_reader(&self) -> bool {
self.parallelism.allow_parallel_scan() && (self.files.len() + self.memtables.len()) > 1
}
/// Scans the input source in another task.
fn spawn_scan_task(&self, mut input: Source, semaphore: Arc<Semaphore>) -> BoxedBatchStream {
let (sender, receiver) = mpsc::channel(self.parallelism.channel_size);
tokio::spawn(async move {
loop {
// We release the permit before sending the result to avoid the task waiting on
// the channel while holding the permit.
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
input.next_batch().await
};
match maybe_batch {
Ok(Some(batch)) => {
let _ = sender.send(Ok(batch)).await;
}
Ok(None) => break,
Err(e) => {
let _ = sender.send(Err(e)).await;
break;
}
}
}
});
Box::pin(ReceiverStream::new(receiver))
}
/// Fetch a batch from the reader and convert it into a record batch.
@@ -211,7 +293,9 @@ impl SeqScan {
return Ok(None);
};
let convert_start = Instant::now();
let record_batch = mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
metrics.scan_cost += start.elapsed();
Ok(Some(record_batch))
@@ -221,8 +305,12 @@ impl SeqScan {
/// Metrics for [SeqScan].
#[derive(Debug, Default)]
struct Metrics {
/// Duration to build the reader.
build_reader_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration to convert batches.
convert_cost: Duration,
}
#[cfg(test)]
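
The parallel reader fans each `Source` out to its own tokio task: a shared `Semaphore` caps how many sources fetch at the same time, the permit is dropped before the batch is sent so a slow consumer never pins a permit, and each bounded mpsc channel (`parallel_scan_channel_size`, default 32) is wrapped in a `ReceiverStream` that feeds the merge reader. A self-contained sketch of that producer pattern, using `u64` items instead of `Batch` (`spawn_producer` and the item values are illustrative):

use std::sync::Arc;

use tokio::sync::{mpsc, Semaphore};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

// Spawns a task that drains `items`, limited by the shared semaphore, and
// returns the receiving end as a stream (mirrors `spawn_scan_task`).
fn spawn_producer(
    mut items: Vec<u64>,
    semaphore: Arc<Semaphore>,
    channel_size: usize,
) -> ReceiverStream<u64> {
    let (sender, receiver) = mpsc::channel(channel_size);
    tokio::spawn(async move {
        while !items.is_empty() {
            // Hold the permit only while "fetching", not while sending.
            let item = {
                let _permit = semaphore.acquire().await.unwrap();
                items.remove(0)
            };
            if sender.send(item).await.is_err() {
                break; // receiver dropped
            }
        }
    });
    ReceiverStream::new(receiver)
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2)); // parallelism = 2
    let streams: Vec<_> = (0..4)
        .map(|i| spawn_producer(vec![i, i + 10], semaphore.clone(), 32))
        .collect();
    for mut stream in streams {
        while let Some(item) = stream.next().await {
            println!("got {item}");
        }
    }
}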

View File

@@ -196,6 +196,15 @@ impl TestEnv {
)
}
/// Opens the engine.
pub async fn open_engine(&mut self, config: MitoConfig) -> MitoEngine {
MitoEngine::new(
config,
self.logstore.clone().unwrap(),
self.object_store_manager.clone().unwrap(),
)
}
/// Only initializes the object store manager, returns the default object store.
pub fn init_object_store_manager(&mut self) -> ObjectStore {
self.object_store_manager = Some(Arc::new(self.create_object_store_manager()));
@@ -206,7 +215,11 @@ impl TestEnv {
pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
WorkerGroup::start(config, Arc::new(log_store), Arc::new(object_store_manager))
WorkerGroup::start(
Arc::new(config),
Arc::new(log_store),
Arc::new(object_store_manager),
)
}
async fn create_log_and_object_store_manager(

View File

@@ -111,11 +111,10 @@ impl WorkerGroup {
///
/// The number of workers should be power of two.
pub(crate) fn start<S: LogStore>(
config: MitoConfig,
config: Arc<MitoConfig>,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
) -> WorkerGroup {
let config = Arc::new(config);
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
));
@@ -205,13 +204,12 @@ impl WorkerGroup {
///
/// The number of workers should be power of two.
pub(crate) fn start_for_test<S: LogStore>(
config: MitoConfig,
config: Arc<MitoConfig>,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerGroup {
let config = Arc::new(config);
let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,

View File

@@ -72,4 +72,4 @@ stats-cli = "3.0"
store-api.workspace = true
streaming-stats = "0.2"
table = { workspace = true, features = ["testing"] }
tokio-stream = "0.1"
tokio-stream.workspace = true

View File

@@ -86,7 +86,7 @@ sql.workspace = true
strum.workspace = true
table.workspace = true
tokio-rustls = "0.24"
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { workspace = true, features = ["net"] }
tokio.workspace = true
tonic-reflection = "0.10"
tonic.workspace = true

View File

@@ -717,7 +717,6 @@ providers = []
[[datanode.region_engine]]
[datanode.region_engine.mito]
num_workers = {}
worker_channel_size = 128
worker_request_batch_size = 64
manifest_checkpoint_distance = 10
@@ -730,6 +729,7 @@ sst_meta_cache_size = "128MiB"
vector_cache_size = "512MiB"
page_cache_size = "512MiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
[[datanode.region_engine]]
@@ -741,27 +741,38 @@ enable_otlp_tracing = false
[logging]
enable_otlp_tracing = false"#,
store_type,
num_cpus::get() / 2
);
let body_text = drop_lines_with_inconsistent_results(res_get.text().await);
assert_eq!(body_text, expected_toml_str);
}
fn drop_lines_with_inconsistent_results(input: String) -> String {
let inconsistent_results = [
"dir =",
"data_home =",
"bucket =",
"root =",
"endpoint =",
"region =",
"cache_path =",
"cache_capacity =",
"sas_token =",
"scope =",
"num_workers =",
"scan_parallelism =",
];
input
.lines()
.filter(|line| {
// ignores
!line.trim().starts_with("dir =")
&& !line.trim().starts_with("data_home =")
&& !line.trim().starts_with("bucket =")
&& !line.trim().starts_with("root =")
&& !line.trim().starts_with("endpoint =")
&& !line.trim().starts_with("region =")
&& !line.trim().starts_with("cache_path =")
&& !line.trim().starts_with("cache_capacity =")
&& !line.trim().starts_with("sas_token =")
&& !line.trim().starts_with("scope =")
let line = line.trim();
for prefix in inconsistent_results {
if line.starts_with(prefix) {
return false;
}
}
true
})
.collect::<Vec<&str>>()
.join(