feat: build partition sources in parallel (#7243)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-11-17 19:44:48 +08:00
committed by GitHub
parent 6c93c7d299
commit 1eb8d6b76b
2 changed files with 135 additions and 39 deletions

View File

@@ -177,6 +177,7 @@ impl SeqScan {
part_metrics,
range_builder_list.clone(),
&mut sources,
None,
)
.await?;
}
@@ -210,6 +211,7 @@ impl SeqScan {
part_metrics,
range_builder_list.clone(),
&mut sources,
None,
)
.await?;
}
@@ -378,6 +380,7 @@ impl SeqScan {
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.stream_ctx.input.compaction;
let distinguish_range = self.properties.distinguish_partition_range;
let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -399,6 +402,7 @@ impl SeqScan {
&part_metrics,
range_builder_list.clone(),
&mut sources,
file_scan_semaphore.clone(),
).await?;
let mut metrics = ScannerMetrics::default();
@@ -475,6 +479,7 @@ impl SeqScan {
let semaphore = self.new_semaphore();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.stream_ctx.input.compaction;
let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -493,6 +498,7 @@ impl SeqScan {
&part_metrics,
range_builder_list.clone(),
&mut sources,
file_scan_semaphore.clone(),
).await?;
let mut metrics = ScannerMetrics::default();
@@ -682,6 +688,7 @@ pub(crate) async fn build_sources(
part_metrics: &PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
sources: &mut Vec<Source>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
@@ -699,35 +706,78 @@ pub(crate) async fn build_sources(
}
}
sources.reserve(range_meta.row_group_indices.len());
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {
let read_type = if compaction {
"compaction"
} else {
"seq_scan_files"
};
let num_indices = range_meta.row_group_indices.len();
if num_indices == 0 {
return Ok(());
}
sources.reserve(num_indices);
let mut ordered_sources = Vec::with_capacity(num_indices);
ordered_sources.resize_with(num_indices, || None);
let mut file_scan_tasks = Vec::new();
for (position, index) in range_meta.row_group_indices.iter().enumerate() {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
range_meta.time_range,
);
Box::pin(stream) as _
ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
} else if stream_ctx.is_file_range_index(*index) {
let read_type = if compaction {
"compaction"
if let Some(semaphore_ref) = semaphore.as_ref() {
// run in parallel, controlled by semaphore
let stream_ctx = stream_ctx.clone();
let part_metrics = part_metrics.clone();
let range_builder_list = range_builder_list.clone();
let semaphore = Arc::clone(semaphore_ref);
let row_group_index = *index;
file_scan_tasks.push(async move {
let _permit = semaphore.acquire().await.unwrap();
let stream = scan_file_ranges(
stream_ctx,
part_metrics,
row_group_index,
read_type,
range_builder_list,
)
.await?;
Ok((position, Source::Stream(Box::pin(stream) as _)))
});
} else {
"seq_scan_files"
};
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
range_builder_list.clone(),
)
.await?;
Box::pin(stream) as _
// no semaphore, run sequentially
let stream = scan_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
range_builder_list.clone(),
)
.await?;
ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
}
} else {
scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
};
sources.push(Source::Stream(stream));
let stream =
scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
ordered_sources[position] = Some(Source::Stream(stream));
}
}
if !file_scan_tasks.is_empty() {
let results = futures::future::try_join_all(file_scan_tasks).await?;
for (position, source) in results {
ordered_sources[position] = Some(source);
}
}
for source in ordered_sources.into_iter().flatten() {
sources.push(source);
}
Ok(())
}
@@ -740,6 +790,7 @@ pub(crate) async fn build_flat_sources(
part_metrics: &PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
sources: &mut Vec<BoxedRecordBatchStream>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
@@ -757,29 +808,72 @@ pub(crate) async fn build_flat_sources(
}
}
sources.reserve(range_meta.row_group_indices.len());
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {
let read_type = if compaction {
"compaction"
} else {
"seq_scan_files"
};
let num_indices = range_meta.row_group_indices.len();
if num_indices == 0 {
return Ok(());
}
sources.reserve(num_indices);
let mut ordered_sources = Vec::with_capacity(num_indices);
ordered_sources.resize_with(num_indices, || None);
let mut file_scan_tasks = Vec::new();
for (position, index) in range_meta.row_group_indices.iter().enumerate() {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
Box::pin(stream) as _
ordered_sources[position] = Some(Box::pin(stream) as _);
} else if stream_ctx.is_file_range_index(*index) {
let read_type = if compaction {
"compaction"
if let Some(semaphore_ref) = semaphore.as_ref() {
// run in parallel, controlled by semaphore
let stream_ctx = stream_ctx.clone();
let part_metrics = part_metrics.clone();
let range_builder_list = range_builder_list.clone();
let semaphore = Arc::clone(semaphore_ref);
let row_group_index = *index;
file_scan_tasks.push(async move {
let _permit = semaphore.acquire().await.unwrap();
let stream = scan_flat_file_ranges(
stream_ctx,
part_metrics,
row_group_index,
read_type,
range_builder_list,
)
.await?;
Ok((position, Box::pin(stream) as _))
});
} else {
"seq_scan_files"
};
let stream = scan_flat_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
range_builder_list.clone(),
)
.await?;
Box::pin(stream) as _
// no semaphore, run sequentially
let stream = scan_flat_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
range_builder_list.clone(),
)
.await?;
ordered_sources[position] = Some(Box::pin(stream) as _);
}
} else {
scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
};
let stream =
scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
ordered_sources[position] = Some(stream);
}
}
if !file_scan_tasks.is_empty() {
let results = futures::future::try_join_all(file_scan_tasks).await?;
for (position, stream) in results {
ordered_sources[position] = Some(stream);
}
}
for stream in ordered_sources.into_iter().flatten() {
sources.push(stream);
}
Ok(())

View File

@@ -423,6 +423,7 @@ impl SeriesDistributor {
&part_metrics,
range_builder_list.clone(),
&mut sources,
self.semaphore.clone(),
)
.await?;
}
@@ -507,6 +508,7 @@ impl SeriesDistributor {
&part_metrics,
range_builder_list.clone(),
&mut sources,
self.semaphore.clone(),
)
.await?;
}