Merge branch 'main' into transform-count-min-max

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-11-01 17:45:18 +08:00
16 changed files with 472 additions and 227 deletions

View File

@@ -101,6 +101,7 @@ impl WindowedSortPhysicalRule {
} else {
Arc::new(PartSortExec::new(
first_sort_expr.clone(),
sort_exec.fetch(),
scanner_info.partition_ranges.clone(),
sort_exec.input().clone(),
))

View File

@@ -47,6 +47,7 @@ use crate::downcast_ts_array;
pub struct PartSortExec {
/// Physical sort expression (that is, sort by timestamp)
expression: PhysicalSortExpr,
limit: Option<usize>,
input: Arc<dyn ExecutionPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
@@ -57,6 +58,7 @@ pub struct PartSortExec {
impl PartSortExec {
pub fn new(
expression: PhysicalSortExpr,
limit: Option<usize>,
partition_ranges: Vec<Vec<PartitionRange>>,
input: Arc<dyn ExecutionPlan>,
) -> Self {
@@ -69,6 +71,7 @@ impl PartSortExec {
Self {
expression,
limit,
input,
metrics,
partition_ranges,
@@ -95,6 +98,7 @@ impl PartSortExec {
let df_stream = Box::pin(PartSortStream::new(
context,
self,
self.limit,
input_stream,
self.partition_ranges[partition].clone(),
partition,
@@ -106,7 +110,16 @@ impl PartSortExec {
impl DisplayAs for PartSortExec {
    /// Writes the one-line plan description of this node.
    ///
    /// The output always contains the sort expression and `num_ranges`, which is
    /// the length of the outer `partition_ranges` vector. ` limit=<n>` is appended
    /// only when `self.limit` is `Some`.
    ///
    /// The requested display format (`_t`) is ignored, so every format renders
    /// the same text.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying formatter.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Header is written exactly once; the optional suffix follows on the same line.
        write!(
            f,
            "PartSortExec: expr={} num_ranges={}",
            self.expression,
            self.partition_ranges.len(),
        )?;
        if let Some(limit) = self.limit {
            write!(f, " limit={}", limit)?;
        }
        Ok(())
    }
}
@@ -138,6 +151,7 @@ impl ExecutionPlan for PartSortExec {
};
Ok(Arc::new(Self::new(
self.expression.clone(),
self.limit,
self.partition_ranges.clone(),
new_input.clone(),
)))
@@ -170,6 +184,7 @@ struct PartSortStream {
reservation: MemoryReservation,
buffer: Vec<DfRecordBatch>,
expression: PhysicalSortExpr,
limit: Option<usize>,
produced: usize,
input: DfSendableRecordBatchStream,
input_complete: bool,
@@ -185,6 +200,7 @@ impl PartSortStream {
fn new(
context: Arc<TaskContext>,
sort: &PartSortExec,
limit: Option<usize>,
input: DfSendableRecordBatchStream,
partition_ranges: Vec<PartitionRange>,
partition: usize,
@@ -194,6 +210,7 @@ impl PartSortStream {
.register(&context.runtime_env().memory_pool),
buffer: Vec::new(),
expression: sort.expression.clone(),
limit,
produced: 0,
input,
input_complete: false,
@@ -294,7 +311,7 @@ impl PartSortStream {
)
})?;
let indices = sort_to_indices(&sort_column, opt, None).map_err(|e| {
let indices = sort_to_indices(&sort_column, opt, self.limit).map_err(|e| {
DataFusionError::ArrowError(
e,
Some(format!("Fail to sort to indices at {}", location!())),
@@ -674,6 +691,7 @@ mod test {
expr: Arc::new(Column::new("ts", 0)),
options: opt,
},
None,
vec![ranges],
Arc::new(mock_input),
);

View File

@@ -169,7 +169,16 @@ impl WindowedSortExec {
impl DisplayAs for WindowedSortExec {
    /// Writes the one-line plan description of this node.
    ///
    /// The output always contains the sort expression and `num_ranges`
    /// (`self.ranges.len()`). ` fetch=<n>` is appended only when `self.fetch`
    /// is `Some`.
    ///
    /// The requested display format (`_t`) is ignored, so every format renders
    /// the same text.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying formatter.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Header is written exactly once; the optional suffix follows on the same line.
        write!(
            f,
            "WindowedSortExec: expr={} num_ranges={}",
            self.expression,
            self.ranges.len()
        )?;
        if let Some(fetch) = self.fetch {
            write!(f, " fetch={}", fetch)?;
        }
        Ok(())
    }
}