diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index c90ea89b90..aad883c51a 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -177,6 +177,7 @@ impl SeqScan { part_metrics, range_builder_list.clone(), &mut sources, + None, ) .await?; } @@ -210,6 +211,7 @@ impl SeqScan { part_metrics, range_builder_list.clone(), &mut sources, + None, ) .await?; } @@ -378,6 +380,7 @@ impl SeqScan { let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.stream_ctx.input.compaction; let distinguish_range = self.properties.distinguish_partition_range; + let file_scan_semaphore = if compaction { None } else { semaphore.clone() }; let stream = try_stream! { part_metrics.on_first_poll(); @@ -399,6 +402,7 @@ impl SeqScan { &part_metrics, range_builder_list.clone(), &mut sources, + file_scan_semaphore.clone(), ).await?; let mut metrics = ScannerMetrics::default(); @@ -475,6 +479,7 @@ impl SeqScan { let semaphore = self.new_semaphore(); let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.stream_ctx.input.compaction; + let file_scan_semaphore = if compaction { None } else { semaphore.clone() }; let stream = try_stream! { part_metrics.on_first_poll(); @@ -493,6 +498,7 @@ impl SeqScan { &part_metrics, range_builder_list.clone(), &mut sources, + file_scan_semaphore.clone(), ).await?; let mut metrics = ScannerMetrics::default(); @@ -682,6 +688,7 @@ pub(crate) async fn build_sources( part_metrics: &PartitionMetrics, range_builder_list: Arc, sources: &mut Vec, + semaphore: Option>, ) -> Result<()> { // Gets range meta. let range_meta = &stream_ctx.ranges[part_range.identifier]; @@ -699,35 +706,78 @@ pub(crate) async fn build_sources( } } - sources.reserve(range_meta.row_group_indices.len()); - for index in &range_meta.row_group_indices { - let stream = if stream_ctx.is_mem_range_index(*index) { + let read_type = if compaction { + "compaction" + } else { + "seq_scan_files" + }; + let num_indices = range_meta.row_group_indices.len(); + if num_indices == 0 { + return Ok(()); + } + + sources.reserve(num_indices); + let mut ordered_sources = Vec::with_capacity(num_indices); + ordered_sources.resize_with(num_indices, || None); + let mut file_scan_tasks = Vec::new(); + + for (position, index) in range_meta.row_group_indices.iter().enumerate() { + if stream_ctx.is_mem_range_index(*index) { let stream = scan_mem_ranges( stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range, ); - Box::pin(stream) as _ + ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _)); } else if stream_ctx.is_file_range_index(*index) { - let read_type = if compaction { - "compaction" + if let Some(semaphore_ref) = semaphore.as_ref() { + // run in parallel, controlled by semaphore + let stream_ctx = stream_ctx.clone(); + let part_metrics = part_metrics.clone(); + let range_builder_list = range_builder_list.clone(); + let semaphore = Arc::clone(semaphore_ref); + let row_group_index = *index; + file_scan_tasks.push(async move { + let _permit = semaphore.acquire().await.unwrap(); + let stream = scan_file_ranges( + stream_ctx, + part_metrics, + row_group_index, + read_type, + range_builder_list, + ) + .await?; + Ok((position, Source::Stream(Box::pin(stream) as _))) + }); } else { - "seq_scan_files" - }; - let stream = scan_file_ranges( - stream_ctx.clone(), - part_metrics.clone(), - *index, - read_type, - range_builder_list.clone(), - ) - .await?; - Box::pin(stream) as _ + // no semaphore, run sequentially + let stream = scan_file_ranges( + stream_ctx.clone(), + part_metrics.clone(), + *index, + read_type, + range_builder_list.clone(), + ) + .await?; + ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _)); + } } else { - scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await? - }; - sources.push(Source::Stream(stream)); + let stream = + scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?; + ordered_sources[position] = Some(Source::Stream(stream)); + } + } + + if !file_scan_tasks.is_empty() { + let results = futures::future::try_join_all(file_scan_tasks).await?; + for (position, source) in results { + ordered_sources[position] = Some(source); + } + } + + for source in ordered_sources.into_iter().flatten() { + sources.push(source); } Ok(()) } @@ -740,6 +790,7 @@ pub(crate) async fn build_flat_sources( part_metrics: &PartitionMetrics, range_builder_list: Arc, sources: &mut Vec, + semaphore: Option>, ) -> Result<()> { // Gets range meta. let range_meta = &stream_ctx.ranges[part_range.identifier]; @@ -757,29 +808,72 @@ pub(crate) async fn build_flat_sources( } } - sources.reserve(range_meta.row_group_indices.len()); - for index in &range_meta.row_group_indices { - let stream = if stream_ctx.is_mem_range_index(*index) { + let read_type = if compaction { + "compaction" + } else { + "seq_scan_files" + }; + let num_indices = range_meta.row_group_indices.len(); + if num_indices == 0 { + return Ok(()); + } + + sources.reserve(num_indices); + let mut ordered_sources = Vec::with_capacity(num_indices); + ordered_sources.resize_with(num_indices, || None); + let mut file_scan_tasks = Vec::new(); + + for (position, index) in range_meta.row_group_indices.iter().enumerate() { + if stream_ctx.is_mem_range_index(*index) { let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index); - Box::pin(stream) as _ + ordered_sources[position] = Some(Box::pin(stream) as _); } else if stream_ctx.is_file_range_index(*index) { - let read_type = if compaction { - "compaction" + if let Some(semaphore_ref) = semaphore.as_ref() { + // run in parallel, controlled by semaphore + let stream_ctx = stream_ctx.clone(); + let part_metrics = part_metrics.clone(); + let range_builder_list = range_builder_list.clone(); + let semaphore = Arc::clone(semaphore_ref); + let row_group_index = *index; + file_scan_tasks.push(async move { + let _permit = semaphore.acquire().await.unwrap(); + let stream = scan_flat_file_ranges( + stream_ctx, + part_metrics, + row_group_index, + read_type, + range_builder_list, + ) + .await?; + Ok((position, Box::pin(stream) as _)) + }); } else { - "seq_scan_files" - }; - let stream = scan_flat_file_ranges( - stream_ctx.clone(), - part_metrics.clone(), - *index, - read_type, - range_builder_list.clone(), - ) - .await?; - Box::pin(stream) as _ + // no semaphore, run sequentially + let stream = scan_flat_file_ranges( + stream_ctx.clone(), + part_metrics.clone(), + *index, + read_type, + range_builder_list.clone(), + ) + .await?; + ordered_sources[position] = Some(Box::pin(stream) as _); + } } else { - scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await? - }; + let stream = + scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?; + ordered_sources[position] = Some(stream); + } + } + + if !file_scan_tasks.is_empty() { + let results = futures::future::try_join_all(file_scan_tasks).await?; + for (position, stream) in results { + ordered_sources[position] = Some(stream); + } + } + + for stream in ordered_sources.into_iter().flatten() { sources.push(stream); } Ok(()) diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index a99e3c46bb..fe48df9e79 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -423,6 +423,7 @@ impl SeriesDistributor { &part_metrics, range_builder_list.clone(), &mut sources, + self.semaphore.clone(), ) .await?; } @@ -507,6 +508,7 @@ impl SeriesDistributor { &part_metrics, range_builder_list.clone(), &mut sources, + self.semaphore.clone(), ) .await?; }