feat: use parallel reader accroding to source num

This commit is contained in:
evenyag
2023-11-29 20:35:09 +08:00
parent 81d1b31d65
commit 3c95b3bd44

View File

@@ -134,8 +134,9 @@ impl SeqScan {
/// Builds a stream for the query.
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
let start = Instant::now();
let use_parallel = self.use_parallel_reader();
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut reader = if self.parallelism > 1 {
let mut reader = if use_parallel {
self.build_parallel_reader().await?
} else {
self.build_reader().await?
@@ -155,7 +156,10 @@ impl SeqScan {
yield batch;
}
debug!("Seq scan finished, region_id: {:?}, metrics: {:?}", mapper.metadata().region_id, metrics);
debug!(
"Seq scan finished, region_id: {:?}, metrics: {:?}, use_parallel: {}",
mapper.metadata().region_id, metrics, use_parallel
);
// Update metrics.
READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64());
};
@@ -211,8 +215,6 @@ impl SeqScan {
/// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel.
async fn build_parallel_reader(&self) -> Result<BoxedBatchReader> {
debug!("Build parallel reader, parallelism: {}", self.parallelism);
assert!(self.parallelism > 1);
let semaphore = Arc::new(Semaphore::new(self.parallelism));
@@ -261,6 +263,11 @@ impl SeqScan {
Ok(Box::new(builder.build().await?))
}
/// Returns whether to use a parallel reader.
fn use_parallel_reader(&self) -> bool {
self.parallelism > 1 && (self.files.len() + self.memtables.len()) > 1
}
/// Scan the input source in another task.
fn scan_source_in_background(mut input: Source, semaphore: Arc<Semaphore>) -> BoxedBatchStream {
let (sender, receiver) = mpsc::channel(64);