diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 5281055b37..984faab350 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -134,8 +134,9 @@ impl SeqScan { /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { let start = Instant::now(); + let use_parallel = self.use_parallel_reader(); // Scans all memtables and SSTs. Builds a merge reader to merge results. - let mut reader = if self.parallelism > 1 { + let mut reader = if use_parallel { self.build_parallel_reader().await? } else { self.build_reader().await? @@ -155,7 +156,10 @@ impl SeqScan { yield batch; } - debug!("Seq scan finished, region_id: {:?}, metrics: {:?}", mapper.metadata().region_id, metrics); + debug!( + "Seq scan finished, region_id: {:?}, metrics: {:?}, use_parallel: {}", + mapper.metadata().region_id, metrics, use_parallel + ); // Update metrics. READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64()); }; @@ -211,8 +215,6 @@ impl SeqScan { /// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel. async fn build_parallel_reader(&self) -> Result { - debug!("Build parallel reader, parallelism: {}", self.parallelism); - assert!(self.parallelism > 1); let semaphore = Arc::new(Semaphore::new(self.parallelism)); @@ -261,6 +263,11 @@ impl SeqScan { Ok(Box::new(builder.build().await?)) } + /// Returns whether to use a parallel reader. + fn use_parallel_reader(&self) -> bool { + self.parallelism > 1 && (self.files.len() + self.memtables.len()) > 1 + } + /// Scan the input source in another task. fn scan_source_in_background(mut input: Source, semaphore: Arc) -> BoxedBatchStream { let (sender, receiver) = mpsc::channel(64);