perf: parallelize file source region (#7285)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-11-24 19:37:48 +08:00
committed by GitHub
parent d180cc8f4b
commit b32ca3ad86
6 changed files with 26 additions and 1 deletions

View File

@@ -609,6 +609,10 @@ impl SeqScan {
}
impl RegionScanner for SeqScan {
fn name(&self) -> &str {
"SeqScan"
}
fn properties(&self) -> &ScannerProperties {
&self.properties
}

View File

@@ -284,6 +284,10 @@ fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
}
impl RegionScanner for SeriesScan {
fn name(&self) -> &str {
"SeriesScan"
}
fn properties(&self) -> &ScannerProperties {
&self.properties
}

View File

@@ -399,6 +399,10 @@ impl UnorderedScan {
}
impl RegionScanner for UnorderedScan {
fn name(&self) -> &str {
"UnorderedScan"
}
fn properties(&self) -> &ScannerProperties {
&self.properties
}

View File

@@ -62,7 +62,9 @@ impl ParallelizeScan {
plan.as_any().downcast_ref::<RegionScanExec>()
{
let expected_partition_num = config.execution.target_partitions;
if region_scan_exec.is_partition_set() {
if region_scan_exec.is_partition_set()
|| region_scan_exec.scanner_type().as_str() == "SinglePartition"
{
return Ok(Transformed::no(plan));
}

View File

@@ -421,6 +421,8 @@ pub struct QueryScanContext {
/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan).
pub trait RegionScanner: Debug + DisplayAs + Send {
fn name(&self) -> &str;
/// Returns the properties of the scanner.
fn properties(&self) -> &ScannerProperties;
@@ -866,6 +868,10 @@ impl Debug for SinglePartitionScanner {
}
impl RegionScanner for SinglePartitionScanner {
fn name(&self) -> &str {
"SinglePartition"
}
fn properties(&self) -> &ScannerProperties {
&self.properties
}

View File

@@ -223,6 +223,11 @@ impl RegionScanExec {
self.is_partition_set
}
pub fn scanner_type(&self) -> String {
let scanner = self.scanner.lock().unwrap();
scanner.name().to_string()
}
/// Update the partition ranges of underlying scanner.
pub fn with_new_partitions(
&self,