refactor: Move planner and adapters to datafusion mod

This commit is contained in:
evenyag
2022-05-07 15:27:00 +08:00
parent 6a20657591
commit ec63353364
7 changed files with 355 additions and 324 deletions

View File

@@ -1,5 +1,9 @@
mod adapter;
pub mod error;
//! Planner, QueryEngine implementations based on DataFusion.
mod catalog_adapter;
mod error;
mod plan_adapter;
mod planner;
use std::sync::Arc;
@@ -7,17 +11,19 @@ use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use snafu::{OptionExt, ResultExt};
use sql::{dialect::GenericDialect, parser::ParserContext};
pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
use crate::query_engine::{QueryContext, QueryEngineState};
use crate::{
catalog::CatalogListRef,
datafusion::adapter::PhysicalPlanAdapter,
datafusion::plan_adapter::PhysicalPlanAdapter,
datafusion::planner::{DfContextProviderAdapter, DfPlanner},
error::Result,
executor::QueryExecutor,
logical_optimizer::LogicalOptimizer,
physical_optimizer::PhysicalOptimizer,
physical_planner::PhysicalPlanner,
plan::{LogicalPlan, PhysicalPlan},
planner::{DfContextProviderAdapter, DfPlanner, Planner},
planner::Planner,
Output, QueryEngine,
};

View File

@@ -0,0 +1,222 @@
//! Catalog adapter between datafusion and greptime query engine.
use std::any::Any;
use std::sync::Arc;
use datafusion::catalog::{
catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider},
schema::SchemaProvider as DfSchemaProvider,
};
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::runtime_env::RuntimeEnv;
use snafu::ResultExt;
use table::{
table::adapter::{DfTableProviderAdapter, TableAdapter},
Table,
};
use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider};
use crate::datafusion::error;
use crate::error::Result;
/// Wraps a greptime catalog list so that it can be used as DataFusion's
/// `CatalogList`.
pub struct DfCatalogListAdapter {
/// DataFusion runtime environment; cloned into the adapters this type creates.
runtime: Arc<RuntimeEnv>,
/// Greptime catalog list that every call is forwarded to.
catalog_list: CatalogListRef,
}
impl DfCatalogListAdapter {
pub fn new(runtime: Arc<RuntimeEnv>, catalog_list: CatalogListRef) -> DfCatalogListAdapter {
DfCatalogListAdapter {
runtime,
catalog_list,
}
}
}
impl DfCatalogList for DfCatalogListAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Registers a DataFusion catalog with the wrapped greptime catalog list.
    ///
    /// The incoming provider is wrapped in a `CatalogProviderAdapter` before
    /// being handed to the greptime list; whatever provider the list returns
    /// is wrapped back into a `DfCatalogProviderAdapter`.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn DfCatalogProvider>,
    ) -> Option<Arc<dyn DfCatalogProvider>> {
        let wrapped = Arc::new(CatalogProviderAdapter {
            df_cataglog_provider: catalog,
            runtime: self.runtime.clone(),
        });
        let returned = self.catalog_list.register_catalog(name, wrapped)?;
        Some(Arc::new(DfCatalogProviderAdapter {
            catalog_provider: returned,
            runtime: self.runtime.clone(),
        }))
    }

    /// Forwards to the wrapped catalog list.
    fn catalog_names(&self) -> Vec<String> {
        self.catalog_list.catalog_names()
    }

    /// Looks up `name` in the wrapped catalog list and, when found, exposes
    /// the greptime provider as a DataFusion catalog provider.
    fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
        let found = self.catalog_list.catalog(name)?;
        Some(Arc::new(DfCatalogProviderAdapter {
            catalog_provider: found,
            runtime: self.runtime.clone(),
        }))
    }
}
/// Datafusion's CatalogProvider -> greptime CatalogProvider
///
/// Lets a DataFusion catalog provider be used where a greptime
/// `CatalogProvider` is expected.
struct CatalogProviderAdapter {
/// The wrapped DataFusion catalog provider.
// NOTE(review): the field name is misspelled ("cataglog"); renaming it
// requires updating every site that constructs this struct.
df_cataglog_provider: Arc<dyn DfCatalogProvider>,
/// DataFusion runtime environment, cloned into the schema adapters created from this catalog.
runtime: Arc<RuntimeEnv>,
}
impl CatalogProvider for CatalogProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Forwards to the wrapped DataFusion catalog provider.
    fn schema_names(&self) -> Vec<String> {
        self.df_cataglog_provider.schema_names()
    }

    /// Looks up the schema in the wrapped DataFusion catalog and, when found,
    /// exposes it as a greptime `SchemaProvider`.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let df_schema_provider = self.df_cataglog_provider.schema(name)?;
        Some(Arc::new(SchemaProviderAdapter {
            df_schema_provider,
            runtime: self.runtime.clone(),
        }))
    }
}
/// Greptime CatalogProvider -> datafusion's CatalogProvider
///
/// Lets a greptime catalog provider be used where DataFusion expects its own
/// `CatalogProvider`.
struct DfCatalogProviderAdapter {
/// The wrapped greptime catalog provider.
catalog_provider: Arc<dyn CatalogProvider>,
/// DataFusion runtime environment, cloned into the schema adapters created from this catalog.
runtime: Arc<RuntimeEnv>,
}
impl DfCatalogProvider for DfCatalogProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Forwards to the wrapped greptime catalog provider.
    fn schema_names(&self) -> Vec<String> {
        self.catalog_provider.schema_names()
    }

    /// Looks up the schema in the wrapped greptime catalog and, when found,
    /// exposes it as a DataFusion schema provider.
    fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
        let schema_provider = self.catalog_provider.schema(name)?;
        Some(Arc::new(DfSchemaProviderAdapter {
            schema_provider,
            runtime: self.runtime.clone(),
        }))
    }
}
/// Greptime SchemaProvider -> datafusion SchemaProvider
///
/// Lets a greptime schema provider be used where DataFusion expects its own
/// `SchemaProvider`.
struct DfSchemaProviderAdapter {
/// The wrapped greptime schema provider.
schema_provider: Arc<dyn SchemaProvider>,
/// DataFusion runtime environment, passed to each `TableAdapter` built on registration.
runtime: Arc<RuntimeEnv>,
}
impl DfSchemaProvider for DfSchemaProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Forwards to the wrapped greptime schema provider.
    fn table_names(&self) -> Vec<String> {
        self.schema_provider.table_names()
    }

    /// Looks up a greptime table and, when found, exposes it as a DataFusion
    /// table provider.
    fn table(&self, name: &str) -> Option<Arc<dyn DfTableProvider>> {
        let found = self.schema_provider.table(name)?;
        Some(Arc::new(DfTableProviderAdapter::new(found)))
    }

    /// Wraps the DataFusion table in a `TableAdapter`, registers it with the
    /// greptime schema provider and converts any table the provider hands
    /// back into a DataFusion table provider.
    ///
    /// Errors from the greptime provider are propagated with `?`.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn DfTableProvider>,
    ) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
        let wrapped = Arc::new(TableAdapter::new(table, self.runtime.clone()));
        let returned = self.schema_provider.register_table(name, wrapped)?;
        Ok(returned.map(|t| Arc::new(DfTableProviderAdapter::new(t)) as _))
    }

    /// Deregisters `name` from the greptime schema provider and converts any
    /// table the provider hands back into a DataFusion table provider.
    fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
        let returned = self.schema_provider.deregister_table(name)?;
        Ok(returned.map(|t| Arc::new(DfTableProviderAdapter::new(t)) as _))
    }

    /// Forwards to the wrapped greptime schema provider.
    fn table_exist(&self, name: &str) -> bool {
        self.schema_provider.table_exist(name)
    }
}
/// Datafusion SchemaProvider -> greptime SchemaProvider
///
/// Lets a DataFusion schema provider be used where a greptime
/// `SchemaProvider` is expected.
struct SchemaProviderAdapter {
/// The wrapped DataFusion schema provider.
df_schema_provider: Arc<dyn DfSchemaProvider>,
/// DataFusion runtime environment, passed to each `TableAdapter` this adapter creates.
runtime: Arc<RuntimeEnv>,
}
impl SchemaProvider for SchemaProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Retrieves the list of available table names in this schema.
    fn table_names(&self) -> Vec<String> {
        self.df_schema_provider.table_names()
    }

    /// Looks up a DataFusion table provider and, when found, exposes it as a
    /// greptime `Table` through a `TableAdapter`.
    fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
        let provider = self.df_schema_provider.table(name)?;
        Some(Arc::new(TableAdapter::new(provider, self.runtime.clone())))
    }

    /// Wraps the greptime table in a `DfTableProviderAdapter`, registers it
    /// with the DataFusion schema provider and converts any provider handed
    /// back into a greptime `Table`.
    ///
    /// A DataFusion failure is reported through `error::DatafusionSnafu`.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn Table>,
    ) -> Result<Option<Arc<dyn Table>>> {
        let df_table = Arc::new(DfTableProviderAdapter::new(table));
        let returned = self
            .df_schema_provider
            .register_table(name, df_table)
            .context(error::DatafusionSnafu {
                msg: "Fail to register table to datafusion",
            })?;
        Ok(returned.map(|t| Arc::new(TableAdapter::new(t, self.runtime.clone())) as _))
    }

    /// Deregisters `name` from the DataFusion schema provider and converts
    /// any provider handed back into a greptime `Table`.
    ///
    /// A DataFusion failure is reported through `error::DatafusionSnafu`.
    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn Table>>> {
        let returned = self
            .df_schema_provider
            .deregister_table(name)
            .context(error::DatafusionSnafu {
                msg: "Fail to deregister table from datafusion",
            })?;
        Ok(returned.map(|t| Arc::new(TableAdapter::new(t, self.runtime.clone())) as _))
    }

    /// Forwards to the wrapped DataFusion schema provider.
    fn table_exist(&self, name: &str) -> bool {
        self.df_schema_provider.table_exist(name)
    }
}

View File

@@ -22,7 +22,7 @@ pub enum InnerError {
ParseSql { source: sql::errors::ParserError },
#[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
Planner {
PlanSql {
sql: String,
source: DataFusionError,
backtrace: Backtrace,
@@ -35,7 +35,7 @@ impl ErrorExt for InnerError {
match self {
ParseSql { source, .. } => source.status_code(),
Datafusion { .. } | PhysicalPlanDowncast { .. } | Planner { .. } => {
Datafusion { .. } | PhysicalPlanDowncast { .. } | PlanSql { .. } => {
StatusCode::Internal
}
}

View File

@@ -0,0 +1,113 @@
use std::sync::Arc;
use arrow::datatypes::DataType;
use datafusion::catalog::TableReference;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use snafu::ResultExt;
use sql::statements::query::Query;
use sql::statements::statement::Statement;
use table::table::adapter::DfTableProviderAdapter;
use crate::{
catalog::{self, CatalogListRef},
datafusion::error,
error::Result,
plan::LogicalPlan,
planner::Planner,
};
/// SQL planner that delegates to DataFusion's `SqlToRel`.
///
/// `S` is the `ContextProvider` that `SqlToRel` is parameterized over; it is
/// borrowed for the lifetime `'a`.
pub struct DfPlanner<'a, S: ContextProvider> {
/// The underlying DataFusion SQL-to-relation planner.
sql_to_rel: SqlToRel<'a, S>,
}
impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
    /// Creates a DataFusion planner instance that borrows `schema_provider`.
    pub fn new(schema_provider: &'a S) -> Self {
        Self {
            sql_to_rel: SqlToRel::new(schema_provider),
        }
    }

    /// Converts QUERY statement to logical plan.
    ///
    /// The query is rendered back to a string first so that the SQL text can
    /// be attached to the `PlanSql` error if DataFusion fails to plan it.
    pub fn query_to_plan(&self, query: Box<Query>) -> Result<LogicalPlan> {
        // todo(hl): original SQL should be provided as an argument
        let sql = query.inner.to_string();
        let df_plan = self
            .sql_to_rel
            .query_to_plan(query.inner)
            .context(error::PlanSqlSnafu { sql })?;

        Ok(LogicalPlan::DfPlan(df_plan))
    }
}
impl<'a, S: ContextProvider + Send + Sync> Planner for DfPlanner<'a, S> {
    /// Converts statement to logical plan using datafusion planner.
    ///
    /// Only `Statement::Query` is handled.
    ///
    /// # Panics
    ///
    /// Panics (via `todo!`) for `Statement::ShowDatabases` and
    /// `Statement::Insert`, which are not implemented yet.
    fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
        match statement {
            Statement::Query(query) => self.query_to_plan(query),
            Statement::ShowDatabases(_) => todo!("Currently not supported"),
            Statement::Insert(_) => todo!(),
        }
    }
}
/// Implements DataFusion's `ContextProvider` on top of a borrowed greptime
/// catalog list.
pub(crate) struct DfContextProviderAdapter<'a> {
/// Catalog list used to resolve table references.
catalog_list: &'a CatalogListRef,
}
impl<'a> DfContextProviderAdapter<'a> {
pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self {
Self { catalog_list }
}
}
impl<'a> ContextProvider for DfContextProviderAdapter<'a> {
    /// Resolves a table reference against the catalog list.
    ///
    /// Missing catalog or schema parts fall back to
    /// `catalog::DEFAULT_CATALOG_NAME` and `catalog::DEFAULT_SCHEMA_NAME`.
    /// Returns `None` as soon as the catalog, schema or table is not found.
    fn get_table_provider(&self, name: TableReference) -> Option<Arc<dyn TableProvider>> {
        let (catalog_name, schema_name, table_name) = match name {
            TableReference::Bare { table } => (
                catalog::DEFAULT_CATALOG_NAME,
                catalog::DEFAULT_SCHEMA_NAME,
                table,
            ),
            TableReference::Partial { schema, table } => {
                (catalog::DEFAULT_CATALOG_NAME, schema, table)
            }
            TableReference::Full {
                catalog,
                schema,
                table,
            } => (catalog, schema, table),
        };

        let catalog_provider = self.catalog_list.catalog(catalog_name)?;
        let schema_provider = catalog_provider.schema(schema_name)?;
        let table = schema_provider.table(table_name)?;
        Some(Arc::new(DfTableProviderAdapter::new(table)))
    }

    /// Scalar UDF lookup is not implemented yet; always returns `None`.
    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
        // TODO(dennis)
        None
    }

    /// Aggregate UDF lookup is not implemented yet; always returns `None`.
    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
        // TODO(dennis)
        None
    }

    /// Variable type lookup is not implemented yet; always returns `None`.
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
        // TODO(dennis)
        None
    }
}

View File

@@ -1,111 +1,8 @@
use std::sync::Arc;
use arrow::datatypes::DataType;
use datafusion::catalog::TableReference;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use snafu::ResultExt;
use sql::statements::query::Query;
use sql::statements::statement::Statement;
use table::table::adapter::DfTableProviderAdapter;
use crate::{
catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME},
datafusion::error,
error::Result,
plan::LogicalPlan,
};
use crate::{error::Result, plan::LogicalPlan};
/// SQL logical planner.
///
/// Implementations turn a parsed SQL `Statement` into a `LogicalPlan`.
pub trait Planner: Send + Sync {
/// Converts `statement` into a logical plan, or returns an error.
fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan>;
}
/// SQL planner that delegates to DataFusion's `SqlToRel`.
pub struct DfPlanner<'a, S: ContextProvider> {
/// The underlying DataFusion SQL-to-relation planner.
sql_to_rel: SqlToRel<'a, S>,
}
impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
/// Creates a DataFusion planner instance
pub fn new(schema_provider: &'a S) -> Self {
let rel = SqlToRel::new(schema_provider);
Self { sql_to_rel: rel }
}
/// Converts QUERY statement to logical plan.
///
/// The query is rendered back to a string so the SQL text can be attached
/// to the planner error if DataFusion fails to plan it.
pub fn query_to_plan(&self, query: Box<Query>) -> Result<LogicalPlan> {
// todo(hl): original SQL should be provided as an argument
let sql = query.inner.to_string();
let result = self
.sql_to_rel
.query_to_plan(query.inner)
// FIXME(yingwen): Move DfPlanner to datafusion mod.
.context(error::PlannerSnafu { sql })?;
Ok(LogicalPlan::DfPlan(result))
}
}
impl<'a, S> Planner for DfPlanner<'a, S>
where
S: ContextProvider + Send + Sync,
{
/// Converts statement to logical plan using datafusion planner
///
/// Only `Statement::Query` is handled; the `ShowDatabases` and `Insert`
/// arms panic via `todo!`.
fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
match statement {
Statement::ShowDatabases(_) => {
todo!("Currently not supported")
}
Statement::Query(qb) => self.query_to_plan(qb),
Statement::Insert(_) => {
todo!()
}
}
}
}
/// Implements DataFusion's `ContextProvider` on top of a borrowed greptime
/// catalog list.
pub(crate) struct DfContextProviderAdapter<'a> {
/// Catalog list used to resolve table references.
catalog_list: &'a CatalogListRef,
}
impl<'a> DfContextProviderAdapter<'a> {
/// Creates a context provider that resolves tables through `catalog_list`.
pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self {
Self { catalog_list }
}
}
impl<'a> ContextProvider for DfContextProviderAdapter<'a> {
/// Resolves a table reference against the catalog list.
///
/// Missing catalog or schema parts fall back to `DEFAULT_CATALOG_NAME` and
/// `DEFAULT_SCHEMA_NAME`. Returns `None` if the catalog, schema or table is
/// not found.
fn get_table_provider(&self, name: TableReference) -> Option<Arc<dyn TableProvider>> {
// Normalize every reference shape to a (catalog, schema, table) triple.
let (catalog, schema, table) = match name {
TableReference::Bare { table } => (DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table),
TableReference::Partial { schema, table } => (DEFAULT_CATALOG_NAME, schema, table),
TableReference::Full {
catalog,
schema,
table,
} => (catalog, schema, table),
};
self.catalog_list
.catalog(catalog)
.and_then(|catalog_provider| catalog_provider.schema(schema))
.and_then(|schema_provider| schema_provider.table(table))
.map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _)
}
/// Scalar UDF lookup is not implemented yet; always returns `None`.
fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
// TODO(dennis)
None
}
/// Aggregate UDF lookup is not implemented yet; always returns `None`.
fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
// TODO(dennis)
None
}
/// Variable type lookup is not implemented yet; always returns `None`.
fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
// TODO(dennis)
None
}
}

View File

@@ -1,24 +1,10 @@
use std::any::Any;
use std::fmt;
use std::sync::Arc;
use datafusion::catalog::{
catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider},
schema::SchemaProvider as DfSchemaProvider,
};
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use snafu::ResultExt;
use table::{
table::adapter::{DfTableProviderAdapter, TableAdapter},
Table,
};
use crate::catalog::{self, schema::SchemaProvider, CatalogListRef, CatalogProvider};
use crate::datafusion::error;
use crate::error::Result;
use crate::catalog::{self, CatalogListRef};
use crate::datafusion::DfCatalogListAdapter;
use crate::executor::Runtime;
/// Query engine global state
@@ -46,10 +32,10 @@ impl QueryEngineState {
);
let df_context = ExecutionContext::with_config(config);
df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter {
catalog_list: catalog_list.clone(),
runtime: df_context.runtime_env(),
});
df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter::new(
df_context.runtime_env(),
catalog_list.clone(),
));
Self {
df_context,
@@ -72,196 +58,3 @@ impl QueryEngineState {
self.df_context.runtime_env().into()
}
}
/// Adapters between datafusion and greptime query engine.
///
/// `DfCatalogListAdapter` wraps a greptime catalog list so that it can be
/// used as DataFusion's `CatalogList`.
struct DfCatalogListAdapter {
/// DataFusion runtime environment; cloned into the adapters this type creates.
runtime: Arc<RuntimeEnv>,
/// Greptime catalog list that every call is forwarded to.
catalog_list: CatalogListRef,
}
impl DfCatalogList for DfCatalogListAdapter {
fn as_any(&self) -> &dyn Any {
self
}
/// Registers a DataFusion catalog with the wrapped greptime catalog list.
///
/// The incoming provider is wrapped in a `CatalogProviderAdapter`; whatever
/// provider the greptime list returns is wrapped back into a
/// `DfCatalogProviderAdapter`.
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn DfCatalogProvider>,
) -> Option<Arc<dyn DfCatalogProvider>> {
let catalog_adapter = Arc::new(CatalogProviderAdapter {
df_cataglog_provider: catalog,
runtime: self.runtime.clone(),
});
self.catalog_list
.register_catalog(name, catalog_adapter)
.map(|catalog_provider| {
Arc::new(DfCatalogProviderAdapter {
catalog_provider,
runtime: self.runtime.clone(),
}) as _
})
}
/// Forwards to the wrapped catalog list.
fn catalog_names(&self) -> Vec<String> {
self.catalog_list.catalog_names()
}
/// Looks up `name` in the wrapped catalog list and, when found, exposes the
/// greptime provider as a DataFusion catalog provider.
fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
self.catalog_list.catalog(name).map(|catalog_provider| {
Arc::new(DfCatalogProviderAdapter {
catalog_provider,
runtime: self.runtime.clone(),
}) as _
})
}
}
/// Datafusion's CatalogProvider -> greptime CatalogProvider
struct CatalogProviderAdapter {
/// The wrapped DataFusion catalog provider.
// NOTE(review): the field name is misspelled ("cataglog").
df_cataglog_provider: Arc<dyn DfCatalogProvider>,
/// DataFusion runtime environment, cloned into the schema adapters created from this catalog.
runtime: Arc<RuntimeEnv>,
}
impl CatalogProvider for CatalogProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
/// Forwards to the wrapped DataFusion catalog provider.
fn schema_names(&self) -> Vec<String> {
self.df_cataglog_provider.schema_names()
}
/// Looks up the schema in the wrapped DataFusion catalog and, when found,
/// exposes it as a greptime `SchemaProvider`.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.df_cataglog_provider
.schema(name)
.map(|df_schema_provider| {
Arc::new(SchemaProviderAdapter {
df_schema_provider,
runtime: self.runtime.clone(),
}) as _
})
}
}
/// Greptime CatalogProvider -> datafusion's CatalogProvider
struct DfCatalogProviderAdapter {
/// The wrapped greptime catalog provider.
catalog_provider: Arc<dyn CatalogProvider>,
/// DataFusion runtime environment, cloned into the schema adapters created from this catalog.
runtime: Arc<RuntimeEnv>,
}
impl DfCatalogProvider for DfCatalogProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
/// Forwards to the wrapped greptime catalog provider.
fn schema_names(&self) -> Vec<String> {
self.catalog_provider.schema_names()
}
/// Looks up the schema in the wrapped greptime catalog and, when found,
/// exposes it as a DataFusion schema provider.
fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
self.catalog_provider.schema(name).map(|schema_provider| {
Arc::new(DfSchemaProviderAdapter {
schema_provider,
runtime: self.runtime.clone(),
}) as _
})
}
}
/// Greptime SchemaProvider -> datafusion SchemaProvider
struct DfSchemaProviderAdapter {
/// The wrapped greptime schema provider.
schema_provider: Arc<dyn SchemaProvider>,
/// DataFusion runtime environment, passed to each `TableAdapter` built on registration.
runtime: Arc<RuntimeEnv>,
}
impl DfSchemaProvider for DfSchemaProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
/// Forwards to the wrapped greptime schema provider.
fn table_names(&self) -> Vec<String> {
self.schema_provider.table_names()
}
/// Looks up a greptime table and, when found, exposes it as a DataFusion
/// table provider.
fn table(&self, name: &str) -> Option<Arc<dyn DfTableProvider>> {
self.schema_provider
.table(name)
.map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _)
}
/// Wraps the DataFusion table in a `TableAdapter`, registers it with the
/// greptime schema provider and converts any table the provider hands back
/// into a DataFusion table provider. Errors are propagated with `?`.
fn register_table(
&self,
name: String,
table: Arc<dyn DfTableProvider>,
) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
let table = Arc::new(TableAdapter::new(table, self.runtime.clone()));
match self.schema_provider.register_table(name, table)? {
Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
None => Ok(None),
}
}
/// Deregisters `name` from the greptime schema provider and converts any
/// table the provider hands back into a DataFusion table provider.
fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
match self.schema_provider.deregister_table(name)? {
Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
None => Ok(None),
}
}
/// Forwards to the wrapped greptime schema provider.
fn table_exist(&self, name: &str) -> bool {
self.schema_provider.table_exist(name)
}
}
/// Datafusion SchemaProvider -> greptime SchemaProvider
struct SchemaProviderAdapter {
/// The wrapped DataFusion schema provider.
df_schema_provider: Arc<dyn DfSchemaProvider>,
/// DataFusion runtime environment, passed to each `TableAdapter` this adapter creates.
runtime: Arc<RuntimeEnv>,
}
impl SchemaProvider for SchemaProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String> {
self.df_schema_provider.table_names()
}
/// Looks up a DataFusion table provider and, when found, exposes it as a
/// greptime `Table` through a `TableAdapter`.
fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
self.df_schema_provider.table(name).map(|table_provider| {
Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _
})
}
/// Wraps the greptime table in a `DfTableProviderAdapter`, registers it with
/// the DataFusion schema provider and converts any provider handed back into
/// a greptime `Table`. A DataFusion failure is reported through
/// `error::DatafusionSnafu`.
fn register_table(
&self,
name: String,
table: Arc<dyn Table>,
) -> Result<Option<Arc<dyn Table>>> {
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
Ok(self
.df_schema_provider
.register_table(name, table_provider)
.context(error::DatafusionSnafu {
msg: "Fail to register table to datafusion",
})?
.map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)))
}
/// Deregisters `name` from the DataFusion schema provider and converts any
/// provider handed back into a greptime `Table`. A DataFusion failure is
/// reported through `error::DatafusionSnafu`.
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn Table>>> {
Ok(self
.df_schema_provider
.deregister_table(name)
.context(error::DatafusionSnafu {
msg: "Fail to deregister table from datafusion",
})?
.map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))
}
/// Forwards to the wrapped DataFusion schema provider.
fn table_exist(&self, name: &str) -> bool {
self.df_schema_provider.table_exist(name)
}
}