feat: expose output_ordering on scan plan (#1425)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-04-20 17:58:48 +08:00
committed by GitHub
parent f2cfd8e608
commit 7152a1b79e
5 changed files with 106 additions and 4 deletions

View File

@@ -49,13 +49,18 @@ pub trait PhysicalPlan: Debug + Send + Sync {
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;
/// Returns the sort expressions describing how this plan's output is ordered,
/// as `Some(keys)`, or `None` when no ordering is reported.
///
/// The default implementation returns `None`; implementors that produce
/// sorted output can override this method to expose that ordering.
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
/// Get a list of child physical plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
fn children(&self) -> Vec<PhysicalPlanRef>;
/// Returns a new plan where all children were replaced by new plans.
/// The size of `children` must be equal to the size of `PhysicalPlan::children()`.
/// The size of `children` must be equal to the size of [`PhysicalPlan::children()`].
fn with_new_children(&self, children: Vec<PhysicalPlanRef>) -> Result<PhysicalPlanRef>;
/// Creates a RecordBatch stream.
@@ -149,7 +154,7 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
self.0.output_ordering()
}
fn children(&self) -> Vec<Arc<dyn DfPhysicalPlan>> {

View File

@@ -19,8 +19,11 @@ use std::sync::Arc;
use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream};
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
use datafusion_common::from_slice::FromSlice;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortRequirement;
use datatypes::arrow::array::UInt32Array;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
@@ -33,6 +36,8 @@ use crate::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder,
use crate::table::scan::SimpleTableScan;
use crate::table::{Expr, Table};
const NUMBER_COLUMN: &str = "number";
/// numbers table for test
#[derive(Debug, Clone)]
pub struct NumbersTable {
@@ -49,7 +54,7 @@ impl NumbersTable {
pub fn with_name(table_id: TableId, name: String) -> Self {
let column_schemas = vec![ColumnSchema::new(
"number",
NUMBER_COLUMN,
ConcreteDataType::uint32_datatype(),
false,
)];
@@ -118,7 +123,17 @@ impl Table for NumbersTable {
schema: self.schema.clone(),
already_run: false,
});
Ok(Arc::new(SimpleTableScan::new(stream)))
let output_ordering = vec![PhysicalSortRequirement::new(
Arc::new(Column::new(NUMBER_COLUMN, 0)),
Some(SortOptions {
descending: false,
nulls_first: false,
}),
)
.into()];
Ok(Arc::new(
SimpleTableScan::new(stream).with_output_ordering(output_ordering),
))
}
async fn flush(&self, _region_number: Option<RegionNumber>, _wait: Option<bool>) -> Result<()> {

View File

@@ -21,12 +21,14 @@ use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::context::TaskContext;
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::SchemaRef;
use snafu::OptionExt;
/// A scan plan that wraps a single record batch stream, together with the
/// stream's schema and an optional declared output ordering.
pub struct SimpleTableScan {
// The wrapped stream, held as an `Option` behind a `Mutex`.
// NOTE(review): presumably taken out when the plan is executed — confirm against `execute`.
stream: Mutex<Option<SendableRecordBatchStream>>,
// Schema obtained from the stream when the scan is constructed.
schema: SchemaRef,
// Sort expressions reported through `output_ordering()`; `None` unless set
// via `with_output_ordering`.
output_ordering: Option<Vec<PhysicalSortExpr>>,
}
impl Debug for SimpleTableScan {
@@ -41,11 +43,18 @@ impl Debug for SimpleTableScan {
impl SimpleTableScan {
    /// Builds a scan over `stream`.
    ///
    /// The schema is taken from the stream itself, and no output ordering is
    /// declared until [`Self::with_output_ordering`] is called.
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        Self {
            // Fields are evaluated in the order written, so the schema is read
            // before `stream` is moved into the mutex below.
            schema: stream.schema(),
            stream: Mutex::new(Some(stream)),
            output_ordering: None,
        }
    }

    /// Consumes the scan and returns it with `output_ordering` recorded as the
    /// ordering reported by this plan, replacing any previously set value.
    pub fn with_output_ordering(mut self, output_ordering: Vec<PhysicalSortExpr>) -> Self {
        self.output_ordering = Some(output_ordering);
        self
    }
}
impl PhysicalPlan for SimpleTableScan {
@@ -61,6 +70,10 @@ impl PhysicalPlan for SimpleTableScan {
Partitioning::UnknownPartitioning(1)
}
/// Returns the stored output ordering as a borrowed slice, or `None` when
/// no ordering has been set on this scan.
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
// `as_deref` turns `&Option<Vec<_>>` into `Option<&[_]>` without cloning.
self.output_ordering.as_deref()
}
fn children(&self) -> Vec<PhysicalPlanRef> {
vec![]
}

View File

@@ -0,0 +1,60 @@
explain select * from numbers;
+---------------+----------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------+
| logical_plan | TableScan: numbers projection=[number] |
| physical_plan | ExecutionPlan(PlaceHolder) |
| | |
+---------------+----------------------------------------+
explain select * from numbers order by number desc;
+---------------+------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------+
explain select * from numbers order by number asc;
+---------------+------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | TableScan: numbers projection=[number] |
| physical_plan | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------+
explain select * from numbers order by number desc limit 10;
+---------------+---------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+---------------------------------------------------+
explain select * from numbers order by number asc limit 10;
+---------------+-------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+-------------------------------------------------+

View File

@@ -0,0 +1,9 @@
explain select * from numbers;
explain select * from numbers order by number desc;
explain select * from numbers order by number asc;
explain select * from numbers order by number desc limit 10;
explain select * from numbers order by number asc limit 10;