refactor: make table scan return physical plan (#326)

* refactor: return PhysicalPlan from the Table trait's scan method, to support partitioned execution in Frontend's distributed read

* refactor: pub use necessary DataFusion types

* refactor: replace old "PhysicalPlan" and its adapters

Co-authored-by: luofucong <luofucong@greptime.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
LFC
2022-10-25 11:34:53 +08:00
committed by GitHub
parent 64dac51e83
commit 2ca667cbdf
39 changed files with 920 additions and 600 deletions

View File

@@ -2,7 +2,6 @@
mod catalog_adapter;
mod error;
pub mod plan_adapter;
mod planner;
use std::sync::Arc;
@@ -11,9 +10,14 @@ use catalog::CatalogListRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::udf::create_udf;
use common_function::scalars::FunctionRef;
use common_query::physical_plan::PhysicalPlanAdapter;
use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlan};
use common_query::{prelude::ScalarUdf, Output};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::timer;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use snafu::{OptionExt, ResultExt};
use sql::statements::statement::Statement;
use sql::{dialect::GenericDialect, parser::ParserContext};
@@ -22,14 +26,13 @@ pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
use crate::metric;
use crate::query_engine::{QueryContext, QueryEngineState};
use crate::{
datafusion::plan_adapter::PhysicalPlanAdapter,
datafusion::planner::{DfContextProviderAdapter, DfPlanner},
error::Result,
executor::QueryExecutor,
logical_optimizer::LogicalOptimizer,
physical_optimizer::PhysicalOptimizer,
physical_planner::PhysicalPlanner,
plan::{LogicalPlan, PhysicalPlan},
plan::LogicalPlan,
planner::Planner,
QueryEngine,
};
@@ -179,8 +182,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
.as_any()
.downcast_ref::<PhysicalPlanAdapter>()
.context(error::PhysicalPlanDowncastSnafu)?
.df_plan()
.clone();
.df_plan();
for optimizer in optimizers {
new_plan = optimizer
@@ -203,9 +205,24 @@ impl QueryExecutor for DatafusionQueryEngine {
let _timer = timer!(metric::METRIC_EXEC_PLAN_ELAPSED);
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => Ok(plan.execute(&ctx.state().runtime(), 0).await?),
1 => Ok(plan
.execute(0, ctx.state().runtime())
.await
.context(error::ExecutePhysicalPlanSnafu)?),
_ => {
unimplemented!();
// merge into a single partition
let plan =
CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(plan.clone())));
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
let df_stream = plan.execute(0, ctx.state().runtime()).await.context(
error::DatafusionSnafu {
msg: "Failed to execute DataFusion merge exec",
},
)?;
let stream = RecordBatchStreamAdapter::try_new(df_stream)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
Ok(Box::pin(stream))
}
}
}

View File

@@ -12,7 +12,6 @@ use datafusion::catalog::{
};
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::runtime_env::RuntimeEnv;
use snafu::ResultExt;
use table::{
table::adapter::{DfTableProviderAdapter, TableAdapter},
@@ -22,16 +21,12 @@ use table::{
use crate::datafusion::error;
pub struct DfCatalogListAdapter {
runtime: Arc<RuntimeEnv>,
catalog_list: CatalogListRef,
}
impl DfCatalogListAdapter {
pub fn new(runtime: Arc<RuntimeEnv>, catalog_list: CatalogListRef) -> DfCatalogListAdapter {
DfCatalogListAdapter {
runtime,
catalog_list,
}
pub fn new(catalog_list: CatalogListRef) -> DfCatalogListAdapter {
DfCatalogListAdapter { catalog_list }
}
}
@@ -47,16 +42,10 @@ impl DfCatalogList for DfCatalogListAdapter {
) -> Option<Arc<dyn DfCatalogProvider>> {
let catalog_adapter = Arc::new(CatalogProviderAdapter {
df_catalog_provider: catalog,
runtime: self.runtime.clone(),
});
self.catalog_list
.register_catalog(name, catalog_adapter)
.map(|catalog_provider| {
Arc::new(DfCatalogProviderAdapter {
catalog_provider,
runtime: self.runtime.clone(),
}) as _
})
.map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _)
}
fn catalog_names(&self) -> Vec<String> {
@@ -64,19 +53,15 @@ impl DfCatalogList for DfCatalogListAdapter {
}
fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
self.catalog_list.catalog(name).map(|catalog_provider| {
Arc::new(DfCatalogProviderAdapter {
catalog_provider,
runtime: self.runtime.clone(),
}) as _
})
self.catalog_list
.catalog(name)
.map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _)
}
}
/// Datafusion's CatalogProvider -> greptime CatalogProvider
struct CatalogProviderAdapter {
df_catalog_provider: Arc<dyn DfCatalogProvider>,
runtime: Arc<RuntimeEnv>,
}
impl CatalogProvider for CatalogProviderAdapter {
@@ -99,19 +84,13 @@ impl CatalogProvider for CatalogProviderAdapter {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.df_catalog_provider
.schema(name)
.map(|df_schema_provider| {
Arc::new(SchemaProviderAdapter {
df_schema_provider,
runtime: self.runtime.clone(),
}) as _
})
.map(|df_schema_provider| Arc::new(SchemaProviderAdapter { df_schema_provider }) as _)
}
}
/// Greptime CatalogProvider -> datafusion's CatalogProvider
struct DfCatalogProviderAdapter {
catalog_provider: CatalogProviderRef,
runtime: Arc<RuntimeEnv>,
}
impl DfCatalogProvider for DfCatalogProviderAdapter {
@@ -124,19 +103,15 @@ impl DfCatalogProvider for DfCatalogProviderAdapter {
}
fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
self.catalog_provider.schema(name).map(|schema_provider| {
Arc::new(DfSchemaProviderAdapter {
schema_provider,
runtime: self.runtime.clone(),
}) as _
})
self.catalog_provider
.schema(name)
.map(|schema_provider| Arc::new(DfSchemaProviderAdapter { schema_provider }) as _)
}
}
/// Greptime SchemaProvider -> datafusion SchemaProvider
struct DfSchemaProviderAdapter {
schema_provider: Arc<dyn SchemaProvider>,
runtime: Arc<RuntimeEnv>,
}
impl DfSchemaProvider for DfSchemaProviderAdapter {
@@ -159,7 +134,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
name: String,
table: Arc<dyn DfTableProvider>,
) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
let table = Arc::new(TableAdapter::new(table, self.runtime.clone())?);
let table = Arc::new(TableAdapter::new(table)?);
match self.schema_provider.register_table(name, table)? {
Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
None => Ok(None),
@@ -181,7 +156,6 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
/// Datafusion SchemaProvider -> greptime SchemaProvider
struct SchemaProviderAdapter {
df_schema_provider: Arc<dyn DfSchemaProvider>,
runtime: Arc<RuntimeEnv>,
}
impl SchemaProvider for SchemaProviderAdapter {
@@ -203,8 +177,8 @@ impl SchemaProvider for SchemaProviderAdapter {
Some(adapter) => adapter.table(),
None => {
// TODO(yingwen): Avoid panic here.
let adapter = TableAdapter::new(table_provider, self.runtime.clone())
.expect("convert datafusion table");
let adapter =
TableAdapter::new(table_provider).expect("convert datafusion table");
Arc::new(adapter) as _
}
}
@@ -233,8 +207,7 @@ impl SchemaProvider for SchemaProviderAdapter {
msg: "Fail to deregister table from datafusion",
})?
.map(|table| {
let adapter = TableAdapter::new(table, self.runtime.clone())
.context(error::TableSchemaMismatchSnafu)?;
let adapter = TableAdapter::new(table).context(error::TableSchemaMismatchSnafu)?;
Ok(Arc::new(adapter) as _)
})
.transpose()
@@ -259,7 +232,6 @@ mod tests {
df_catalog_provider: Arc::new(
datafusion::catalog::catalog::MemoryCatalogProvider::new(),
),
runtime: Arc::new(RuntimeEnv::default()),
};
adapter.register_schema(
@@ -271,7 +243,6 @@ mod tests {
#[test]
pub fn test_register_table() {
let adapter = DfSchemaProviderAdapter {
runtime: Arc::new(RuntimeEnv::default()),
schema_provider: Arc::new(MemorySchemaProvider::new()),
};
@@ -288,9 +259,7 @@ mod tests {
#[test]
pub fn test_register_catalog() {
let rt = Arc::new(RuntimeEnv::default());
let catalog_list = DfCatalogListAdapter {
runtime: rt.clone(),
catalog_list: new_memory_catalog_list().unwrap(),
};
assert!(catalog_list
@@ -298,7 +267,6 @@ mod tests {
"test_catalog".to_string(),
Arc::new(DfCatalogProviderAdapter {
catalog_provider: Arc::new(MemoryCatalogProvider::new()),
runtime: rt,
}),
)
.is_none());

View File

@@ -44,6 +44,21 @@ pub enum InnerError {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display(
"Failed to convert DataFusion's recordbatch stream, source: {}",
source
))]
ConvertDfRecordBatchStream {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to execute physical plan, source: {}", source))]
ExecutePhysicalPlan {
#[snafu(backtrace)]
source: common_query::error::Error,
},
}
impl ErrorExt for InnerError {
@@ -59,6 +74,8 @@ impl ErrorExt for InnerError {
}
ParseSql { source, .. } => source.status_code(),
PlanSql { .. } => StatusCode::PlanQuery,
ConvertDfRecordBatchStream { source } => source.status_code(),
ExecutePhysicalPlan { source } => source.status_code(),
}
}

View File

@@ -1,215 +0,0 @@
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::{
error::Result as DfResult,
physical_plan::{
expressions::PhysicalSortExpr, ExecutionPlan, Partitioning as DfPartitioning,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
use crate::datafusion::error;
use crate::error::Result;
use crate::executor::Runtime;
use crate::plan::{Partitioning, PhysicalPlan};
/// Adapter that exposes a DataFusion [`ExecutionPlan`] through greptime's
/// [`PhysicalPlan`] trait.
#[derive(Debug)]
pub struct PhysicalPlanAdapter {
    /// The wrapped DataFusion execution plan.
    plan: Arc<dyn ExecutionPlan>,
    /// Schema handed back by this adapter's `schema()` implementation.
    schema: SchemaRef,
}

impl PhysicalPlanAdapter {
    /// Wraps `plan`, using `schema` as the schema this adapter reports.
    pub fn new(schema: SchemaRef, plan: Arc<dyn ExecutionPlan>) -> Self {
        PhysicalPlanAdapter { plan, schema }
    }

    /// Borrows the wrapped DataFusion plan.
    #[inline]
    pub fn df_plan(&self) -> &Arc<dyn ExecutionPlan> {
        let Self { plan, .. } = self;
        plan
    }
}
#[async_trait::async_trait]
impl PhysicalPlan for PhysicalPlanAdapter {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
//FIXME(dennis)
Partitioning::UnknownPartitioning(1)
}
fn children(&self) -> Vec<Arc<dyn PhysicalPlan>> {
let mut plans: Vec<Arc<dyn PhysicalPlan>> = vec![];
for p in self.plan.children() {
let plan = PhysicalPlanAdapter::new(self.schema.clone(), p);
plans.push(Arc::new(plan));
}
plans
}
fn with_new_children(
&self,
children: Vec<Arc<dyn PhysicalPlan>>,
) -> Result<Arc<dyn PhysicalPlan>> {
let mut df_children: Vec<Arc<dyn ExecutionPlan>> = Vec::with_capacity(children.len());
for plan in children {
let p = Arc::new(ExecutionPlanAdapter {
plan,
schema: self.schema.clone(),
});
df_children.push(p);
}
let plan = self
.plan
.with_new_children(df_children)
.context(error::DatafusionSnafu {
msg: "Fail to add children to plan",
})?;
Ok(Arc::new(PhysicalPlanAdapter::new(
self.schema.clone(),
plan,
)))
}
async fn execute(
&self,
runtime: &Runtime,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let df_stream =
self.plan
.execute(partition, runtime.into())
.await
.context(error::DatafusionSnafu {
msg: "Fail to execute physical plan",
})?;
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(df_stream)
.context(error::TableSchemaMismatchSnafu)?,
))
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Adapter that exposes a greptime [`PhysicalPlan`] through DataFusion's
/// [`ExecutionPlan`] trait.
#[derive(Debug)]
struct ExecutionPlanAdapter {
    /// The wrapped greptime physical plan.
    plan: Arc<dyn PhysicalPlan>,
    /// Schema whose arrow form is reported to DataFusion.
    schema: SchemaRef,
}

#[async_trait::async_trait]
impl ExecutionPlan for ExecutionPlanAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the arrow schema backing the stored greptime schema.
    fn schema(&self) -> DfSchemaRef {
        self.schema.arrow_schema().clone()
    }

    /// Always reports a single unknown partition.
    fn output_partitioning(&self) -> DfPartitioning {
        // FIXME(dennis)
        DfPartitioning::UnknownPartitioning(1)
    }

    /// Reports no output ordering.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        // FIXME(dennis)
        None
    }

    /// Reports no children.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        // TODO(dennis)
        Vec::new()
    }

    /// Rebuilds the wrapped greptime plan with `children` and wraps the result.
    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        // Convert each DataFusion plan into a greptime plan so the wrapped plan
        // can accept it.
        let gt_children = children
            .into_iter()
            .map(|child| {
                Arc::new(PhysicalPlanAdapter::new(self.schema.clone(), child))
                    as Arc<dyn PhysicalPlan>
            })
            .collect::<Vec<_>>();

        self.plan
            .with_new_children(gt_children)
            .map(|plan| {
                Arc::new(ExecutionPlanAdapter {
                    schema: self.schema.clone(),
                    plan,
                }) as Arc<dyn ExecutionPlan>
            })
            .map_err(Into::into)
    }

    /// Executes `partition` of the wrapped plan and adapts the stream for DataFusion.
    async fn execute(
        &self,
        partition: usize,
        runtime: Arc<RuntimeEnv>,
    ) -> DfResult<DfSendableRecordBatchStream> {
        let stream = match self.plan.execute(&runtime.into(), partition).await {
            Ok(stream) => stream,
            Err(e) => return Err(e.into()),
        };
        Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
    }

    /// Reports default (empty) statistics.
    fn statistics(&self) -> Statistics {
        //TODO(dennis)
        Statistics::default()
    }
}
#[cfg(test)]
mod tests {
    use arrow::datatypes::Field;
    use datafusion::physical_plan::empty::EmptyExec;
    use datafusion_common::field_util::SchemaExt;
    use datatypes::schema::Schema;

    use super::*;

    /// Wraps an `EmptyExec` in both adapters and checks that the wrapped plan
    /// and the schema are preserved.
    #[test]
    fn test_physical_plan_adapter() {
        let name_field = Field::new("name", arrow::datatypes::DataType::Utf8, true);
        let arrow_schema = arrow::datatypes::Schema::new(vec![name_field]);
        let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());

        let empty_exec = Arc::new(EmptyExec::new(true, Arc::new(arrow_schema)));
        let physical_plan = PhysicalPlanAdapter::new(schema.clone(), empty_exec);

        // The adapter must still hold the original DataFusion plan.
        let wrapped = physical_plan.plan.as_any().downcast_ref::<EmptyExec>();
        assert!(wrapped.is_some());

        let execution_plan_adapter = ExecutionPlanAdapter {
            plan: Arc::new(physical_plan),
            schema: schema.clone(),
        };
        assert_eq!(schema, execution_plan_adapter.schema);
    }
}

View File

@@ -1,9 +1,9 @@
use std::sync::Arc;
use common_query::physical_plan::PhysicalPlan;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::runtime_env::RuntimeEnv;
use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext};
use crate::{error::Result, query_engine::QueryContext};
/// Executor to run a [PhysicalPlan].
#[async_trait::async_trait]
@@ -14,27 +14,3 @@ pub trait QueryExecutor {
plan: &Arc<dyn PhysicalPlan>,
) -> Result<SendableRecordBatchStream>;
}
/// Execution runtime environment: a thin wrapper around a shared DataFusion
/// [`RuntimeEnv`] handle.
#[derive(Clone, Default)]
pub struct Runtime {
    /// The shared DataFusion runtime environment.
    runtime: Arc<RuntimeEnv>,
}

/// Wraps a shared runtime environment handle.
impl From<Arc<RuntimeEnv>> for Runtime {
    fn from(runtime: Arc<RuntimeEnv>) -> Self {
        Self { runtime }
    }
}

/// Unwraps the shared runtime environment handle, consuming the wrapper.
impl From<Runtime> for Arc<RuntimeEnv> {
    fn from(r: Runtime) -> Self {
        let Runtime { runtime } = r;
        runtime
    }
}

/// Clones the shared handle out of a borrowed wrapper.
impl From<&Runtime> for Arc<RuntimeEnv> {
    fn from(r: &Runtime) -> Self {
        Arc::clone(&r.runtime)
    }
}

View File

@@ -12,5 +12,4 @@ pub mod plan;
pub mod planner;
pub mod query_engine;
pub use crate::datafusion::plan_adapter::PhysicalPlanAdapter;
pub use crate::query_engine::{QueryContext, QueryEngine, QueryEngineFactory, QueryEngineRef};

View File

@@ -1,6 +1,8 @@
use std::sync::Arc;
use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext};
use common_query::physical_plan::PhysicalPlan;
use crate::{error::Result, query_engine::QueryContext};
pub trait PhysicalOptimizer {
fn optimize_physical_plan(

View File

@@ -1,7 +1,9 @@
use std::sync::Arc;
use common_query::physical_plan::PhysicalPlan;
use crate::error::Result;
use crate::plan::{LogicalPlan, PhysicalPlan};
use crate::plan::LogicalPlan;
use crate::query_engine::QueryContext;
/// Physical query planner that converts a `LogicalPlan` to an

View File

@@ -1,13 +1,6 @@
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::logical_plan::LogicalPlan as DfLogicalPlan;
use datatypes::schema::SchemaRef;
use crate::error::Result;
use crate::executor::Runtime;
/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
@@ -21,50 +14,3 @@ use crate::executor::Runtime;
pub enum LogicalPlan {
DfPlan(DfLogicalPlan),
}
/// Partitioning schemes supported by operators.
#[derive(Debug, Clone)]
pub enum Partitioning {
    /// Unknown partitioning scheme with a known number of partitions.
    UnknownPartitioning(usize),
}

impl Partitioning {
    /// Returns how many partitions this scheme describes.
    pub fn partition_count(&self) -> usize {
        // The enum has a single variant, so this pattern is irrefutable.
        let Self::UnknownPartitioning(count) = self;
        *count
    }
}
/// A node of an executable physical query plan.
///
/// Implementors report their schema and partitioning, expose their child
/// plans, and can execute a single partition to produce a record batch stream.
#[async_trait::async_trait]
pub trait PhysicalPlan: Send + Sync + Any + Debug {
/// Get the schema for this physical plan.
fn schema(&self) -> SchemaRef;
/// Specifies the output partitioning scheme of this plan.
fn output_partitioning(&self) -> Partitioning;
/// Get a list of child physical plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
fn children(&self) -> Vec<Arc<dyn PhysicalPlan>>;
/// Returns a new plan where all children were replaced by new plans.
/// The size of `children` must be equal to the size of [`PhysicalPlan::children`].
fn with_new_children(
&self,
children: Vec<Arc<dyn PhysicalPlan>>,
) -> Result<Arc<dyn PhysicalPlan>>;
/// Executes the given `partition` of this plan and returns the resulting
/// record batch stream.
async fn execute(
&self,
_runtime: &Runtime,
partition: usize,
) -> Result<SendableRecordBatchStream>;
/// Returns this plan as [`Any`] so callers can downcast to a concrete type.
fn as_any(&self) -> &dyn Any;
}

View File

@@ -6,13 +6,14 @@ use std::sync::Arc;
use catalog::CatalogList;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY};
use common_query::physical_plan::PhysicalPlan;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use sql::statements::statement::Statement;
use crate::datafusion::DatafusionQueryEngine;
use crate::error::Result;
use crate::plan::{LogicalPlan, PhysicalPlan};
use crate::plan::LogicalPlan;
pub use crate::query_engine::context::QueryContext;
pub use crate::query_engine::state::QueryEngineState;

View File

@@ -4,6 +4,7 @@ use std::sync::{Arc, RwLock};
use catalog::CatalogListRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::physical_plan::RuntimeEnv;
use common_query::prelude::ScalarUdf;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::optimizer::eliminate_limit::EliminateLimit;
@@ -15,7 +16,6 @@ use datafusion::optimizer::to_approx_perc::ToApproxPerc;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use crate::datafusion::DfCatalogListAdapter;
use crate::executor::Runtime;
use crate::optimizer::TypeConversionRule;
/// Query engine global state
@@ -58,10 +58,8 @@ impl QueryEngineState {
let df_context = ExecutionContext::with_config(config);
df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter::new(
df_context.runtime_env(),
catalog_list.clone(),
));
df_context.state.lock().catalog_list =
Arc::new(DfCatalogListAdapter::new(catalog_list.clone()));
Self {
df_context,
@@ -108,7 +106,7 @@ impl QueryEngineState {
}
#[inline]
pub(crate) fn runtime(&self) -> Runtime {
self.df_context.runtime_env().into()
pub(crate) fn runtime(&self) -> Arc<RuntimeEnv> {
self.df_context.runtime_env()
}
}