From 7152a1b79e8b038324a1979babcedb397cc1190a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 20 Apr 2023 17:58:48 +0800 Subject: [PATCH] feat: expose output_ordering on scan plan (#1425) Signed-off-by: Ruihang Xia --- src/common/query/src/physical_plan.rs | 9 ++- src/table/src/table/numbers.rs | 19 +++++- src/table/src/table/scan.rs | 13 ++++ .../common/optimizer/order_by.result | 60 +++++++++++++++++++ .../standalone/common/optimizer/order_by.sql | 9 +++ 5 files changed, 106 insertions(+), 4 deletions(-) create mode 100644 tests/cases/standalone/common/optimizer/order_by.result create mode 100644 tests/cases/standalone/common/optimizer/order_by.sql diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index 51d902efb6..c949cdf414 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -49,13 +49,18 @@ pub trait PhysicalPlan: Debug + Send + Sync { /// Specifies the output partitioning scheme of this plan fn output_partitioning(&self) -> Partitioning; + /// returns `Some(keys)` that describes how the output was sorted. + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + /// Get a list of child physical plans that provide the input for this plan. The returned list /// will be empty for leaf nodes, will contain a single value for unary nodes, or two /// values for binary nodes (such as joins). fn children(&self) -> Vec; /// Returns a new plan where all children were replaced by new plans. - /// The size of `children` must be equal to the size of `PhysicalPlan::children()`. + /// The size of `children` must be equal to the size of [`PhysicalPlan::children()`]. fn with_new_children(&self, children: Vec) -> Result; /// Creates an RecordBatch stream. @@ -149,7 +154,7 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter { } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None + self.0.output_ordering() } fn children(&self) -> Vec> { diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index b814601078..3047346409 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -19,8 +19,11 @@ use std::sync::Arc; use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::Result as RecordBatchResult; use common_recordbatch::{RecordBatch, RecordBatchStream}; +use datafusion::arrow::compute::SortOptions; use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch; use datafusion_common::from_slice::FromSlice; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::PhysicalSortRequirement; use datatypes::arrow::array::UInt32Array; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; @@ -33,6 +36,8 @@ use crate::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, use crate::table::scan::SimpleTableScan; use crate::table::{Expr, Table}; +const NUMBER_COLUMN: &str = "number"; + /// numbers table for test #[derive(Debug, Clone)] pub struct NumbersTable { @@ -49,7 +54,7 @@ impl NumbersTable { pub fn with_name(table_id: TableId, name: String) -> Self { let column_schemas = vec![ColumnSchema::new( - "number", + NUMBER_COLUMN, ConcreteDataType::uint32_datatype(), false, )]; @@ -118,7 +123,17 @@ impl Table for NumbersTable { schema: self.schema.clone(), already_run: false, }); - Ok(Arc::new(SimpleTableScan::new(stream))) + let output_ordering = vec![PhysicalSortRequirement::new( + Arc::new(Column::new(NUMBER_COLUMN, 0)), + Some(SortOptions { + descending: false, + nulls_first: false, + }), + ) + .into()]; + Ok(Arc::new( + SimpleTableScan::new(stream).with_output_ordering(output_ordering), + )) } async fn flush(&self, _region_number: Option, _wait: Option) -> Result<()> { diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 61c1fd721a..01acd5854a 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -21,12 +21,14 @@ use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; use common_recordbatch::SendableRecordBatchStream; use datafusion::execution::context::TaskContext; +use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::SchemaRef; use snafu::OptionExt; pub struct SimpleTableScan { stream: Mutex>, schema: SchemaRef, + output_ordering: Option>, } impl Debug for SimpleTableScan { @@ -41,11 +43,18 @@ impl Debug for SimpleTableScan { impl SimpleTableScan { pub fn new(stream: SendableRecordBatchStream) -> Self { let schema = stream.schema(); + Self { stream: Mutex::new(Some(stream)), schema, + output_ordering: None, } } + + pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { + self.output_ordering = Some(output_ordering); + self + } } impl PhysicalPlan for SimpleTableScan { @@ -61,6 +70,10 @@ impl PhysicalPlan for SimpleTableScan { Partitioning::UnknownPartitioning(1) } + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.output_ordering.as_deref() + } + fn children(&self) -> Vec { vec![] } diff --git a/tests/cases/standalone/common/optimizer/order_by.result b/tests/cases/standalone/common/optimizer/order_by.result new file mode 100644 index 0000000000..d661c83068 --- /dev/null +++ b/tests/cases/standalone/common/optimizer/order_by.result @@ -0,0 +1,60 @@ +explain select * from numbers; + ++---------------+----------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------+ +| logical_plan | TableScan: numbers projection=[number] | +| physical_plan | ExecutionPlan(PlaceHolder) | +| | | ++---------------+----------------------------------------+ + +explain select * from numbers order by number desc; + ++---------------+------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------+ +| logical_plan | Sort: numbers.number DESC NULLS FIRST | +| | TableScan: numbers projection=[number] | +| physical_plan | SortExec: expr=[number@0 DESC] | +| | ExecutionPlan(PlaceHolder) | +| | | ++---------------+------------------------------------------+ + +explain select * from numbers order by number asc; + ++---------------+------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------+ +| logical_plan | Sort: numbers.number ASC NULLS LAST | +| | TableScan: numbers projection=[number] | +| physical_plan | ExecutionPlan(PlaceHolder) | +| | | ++---------------+------------------------------------------+ + +explain select * from numbers order by number desc limit 10; + ++---------------+---------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------+ +| logical_plan | Limit: skip=0, fetch=10 | +| | Sort: numbers.number DESC NULLS FIRST, fetch=10 | +| | TableScan: numbers projection=[number] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: fetch=10, expr=[number@0 DESC] | +| | ExecutionPlan(PlaceHolder) | +| | | ++---------------+---------------------------------------------------+ + +explain select * from numbers order by number asc limit 10; + ++---------------+-------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------+ +| logical_plan | Limit: skip=0, fetch=10 | +| | Sort: numbers.number ASC NULLS LAST, fetch=10 | +| | TableScan: numbers projection=[number] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | ExecutionPlan(PlaceHolder) | +| | | ++---------------+-------------------------------------------------+ + diff --git a/tests/cases/standalone/common/optimizer/order_by.sql b/tests/cases/standalone/common/optimizer/order_by.sql new file mode 100644 index 0000000000..eb8bf6e5f0 --- /dev/null +++ b/tests/cases/standalone/common/optimizer/order_by.sql @@ -0,0 +1,9 @@ +explain select * from numbers; + +explain select * from numbers order by number desc; + +explain select * from numbers order by number asc; + +explain select * from numbers order by number desc limit 10; + +explain select * from numbers order by number asc limit 10;