From ec6335336428abec331bd624f329024f25adfbfe Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 15:27:00 +0800 Subject: [PATCH] refactor: Move planner and adapters to datafusion mod --- src/query/src/datafusion.rs | 14 +- src/query/src/datafusion/catalog_adapter.rs | 222 ++++++++++++++++++ src/query/src/datafusion/error.rs | 4 +- .../{adapter.rs => plan_adapter.rs} | 0 src/query/src/datafusion/planner.rs | 113 +++++++++ src/query/src/planner.rs | 107 +-------- src/query/src/query_engine/state.rs | 219 +---------------- 7 files changed, 355 insertions(+), 324 deletions(-) create mode 100644 src/query/src/datafusion/catalog_adapter.rs rename src/query/src/datafusion/{adapter.rs => plan_adapter.rs} (100%) create mode 100644 src/query/src/datafusion/planner.rs diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 29dd339b97..700a33534e 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -1,5 +1,9 @@ -mod adapter; -pub mod error; +//! Planner, QueryEngine implementations based on DataFusion. + +mod catalog_adapter; +mod error; +mod plan_adapter; +mod planner; use std::sync::Arc; @@ -7,17 +11,19 @@ use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use snafu::{OptionExt, ResultExt}; use sql::{dialect::GenericDialect, parser::ParserContext}; +pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; use crate::query_engine::{QueryContext, QueryEngineState}; use crate::{ catalog::CatalogListRef, - datafusion::adapter::PhysicalPlanAdapter, + datafusion::plan_adapter::PhysicalPlanAdapter, + datafusion::planner::{DfContextProviderAdapter, DfPlanner}, error::Result, executor::QueryExecutor, logical_optimizer::LogicalOptimizer, physical_optimizer::PhysicalOptimizer, physical_planner::PhysicalPlanner, plan::{LogicalPlan, PhysicalPlan}, - planner::{DfContextProviderAdapter, DfPlanner, Planner}, + planner::Planner, Output, QueryEngine, }; diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs new file mode 100644 index 0000000000..e357b28edb --- /dev/null +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -0,0 +1,222 @@ +//! Catalog adapter between datafusion and greptime query engine. + +use std::any::Any; +use std::sync::Arc; + +use datafusion::catalog::{ + catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, + schema::SchemaProvider as DfSchemaProvider, +}; +use datafusion::datasource::TableProvider as DfTableProvider; +use datafusion::error::Result as DataFusionResult; +use datafusion::execution::runtime_env::RuntimeEnv; +use snafu::ResultExt; +use table::{ + table::adapter::{DfTableProviderAdapter, TableAdapter}, + Table, +}; + +use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider}; +use crate::datafusion::error; +use crate::error::Result; + +pub struct DfCatalogListAdapter { + runtime: Arc, + catalog_list: CatalogListRef, +} + +impl DfCatalogListAdapter { + pub fn new(runtime: Arc, catalog_list: CatalogListRef) -> DfCatalogListAdapter { + DfCatalogListAdapter { + runtime, + catalog_list, + } + } +} + +impl DfCatalogList for DfCatalogListAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + let catalog_adapter = Arc::new(CatalogProviderAdapter { + df_cataglog_provider: catalog, + runtime: self.runtime.clone(), + }); + self.catalog_list + .register_catalog(name, catalog_adapter) + .map(|catalog_provider| { + Arc::new(DfCatalogProviderAdapter { + catalog_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } + + fn catalog_names(&self) -> Vec { + self.catalog_list.catalog_names() + } + + fn catalog(&self, name: &str) -> Option> { + self.catalog_list.catalog(name).map(|catalog_provider| { + Arc::new(DfCatalogProviderAdapter { + catalog_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +/// Datafusion's CatalogProvider -> greptime CatalogProvider +struct CatalogProviderAdapter { + df_cataglog_provider: Arc, + runtime: Arc, +} + +impl CatalogProvider for CatalogProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.df_cataglog_provider.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + self.df_cataglog_provider + .schema(name) + .map(|df_schema_provider| { + Arc::new(SchemaProviderAdapter { + df_schema_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +///Greptime CatalogProvider -> datafusion's CatalogProvider +struct DfCatalogProviderAdapter { + catalog_provider: Arc, + runtime: Arc, +} + +impl DfCatalogProvider for DfCatalogProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.catalog_provider.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + self.catalog_provider.schema(name).map(|schema_provider| { + Arc::new(DfSchemaProviderAdapter { + schema_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +/// Greptime SchemaProvider -> datafusion SchemaProvider +struct DfSchemaProviderAdapter { + schema_provider: Arc, + runtime: Arc, +} + +impl DfSchemaProvider for DfSchemaProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.schema_provider.table_names() + } + + fn table(&self, name: &str) -> Option> { + self.schema_provider + .table(name) + .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> DataFusionResult>> { + let table = Arc::new(TableAdapter::new(table, self.runtime.clone())); + match self.schema_provider.register_table(name, table)? { + Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + None => Ok(None), + } + } + + fn deregister_table(&self, name: &str) -> DataFusionResult>> { + match self.schema_provider.deregister_table(name)? { + Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + None => Ok(None), + } + } + + fn table_exist(&self, name: &str) -> bool { + self.schema_provider.table_exist(name) + } +} + +/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter +struct SchemaProviderAdapter { + df_schema_provider: Arc, + runtime: Arc, +} + +impl SchemaProvider for SchemaProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + /// Retrieves the list of available table names in this schema. + fn table_names(&self) -> Vec { + self.df_schema_provider.table_names() + } + + fn table(&self, name: &str) -> Option> { + self.df_schema_provider.table(name).map(|table_provider| { + Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _ + }) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>> { + let table_provider = Arc::new(DfTableProviderAdapter::new(table)); + Ok(self + .df_schema_provider + .register_table(name, table_provider) + .context(error::DatafusionSnafu { + msg: "Fail to register table to datafusion", + })? + .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) + } + + fn deregister_table(&self, name: &str) -> Result>> { + Ok(self + .df_schema_provider + .deregister_table(name) + .context(error::DatafusionSnafu { + msg: "Fail to deregister table from datafusion", + })? + .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) + } + + fn table_exist(&self, name: &str) -> bool { + self.df_schema_provider.table_exist(name) + } +} diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 1d89deacd2..ba804e5081 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -22,7 +22,7 @@ pub enum InnerError { ParseSql { source: sql::errors::ParserError }, #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] - Planner { + PlanSql { sql: String, source: DataFusionError, backtrace: Backtrace, @@ -35,7 +35,7 @@ impl ErrorExt for InnerError { match self { ParseSql { source, .. } => source.status_code(), - Datafusion { .. } | PhysicalPlanDowncast { .. } | Planner { .. } => { + Datafusion { .. } | PhysicalPlanDowncast { .. } | PlanSql { .. } => { StatusCode::Internal } } diff --git a/src/query/src/datafusion/adapter.rs b/src/query/src/datafusion/plan_adapter.rs similarity index 100% rename from src/query/src/datafusion/adapter.rs rename to src/query/src/datafusion/plan_adapter.rs diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs new file mode 100644 index 0000000000..179b1099da --- /dev/null +++ b/src/query/src/datafusion/planner.rs @@ -0,0 +1,113 @@ +use std::sync::Arc; + +use arrow::datatypes::DataType; +use datafusion::catalog::TableReference; +use datafusion::datasource::TableProvider; +use datafusion::physical_plan::udaf::AggregateUDF; +use datafusion::physical_plan::udf::ScalarUDF; +use datafusion::sql::planner::{ContextProvider, SqlToRel}; +use snafu::ResultExt; +use sql::statements::query::Query; +use sql::statements::statement::Statement; +use table::table::adapter::DfTableProviderAdapter; + +use crate::{ + catalog::{self, CatalogListRef}, + datafusion::error, + error::Result, + plan::LogicalPlan, + planner::Planner, +}; + +pub struct DfPlanner<'a, S: ContextProvider> { + sql_to_rel: SqlToRel<'a, S>, +} + +impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { + /// Creates a DataFusion planner instance + pub fn new(schema_provider: &'a S) -> Self { + let rel = SqlToRel::new(schema_provider); + Self { sql_to_rel: rel } + } + + /// Converts QUERY statement to logical plan. + pub fn query_to_plan(&self, query: Box) -> Result { + // todo(hl): original SQL should be provided as an argument + let sql = query.inner.to_string(); + let result = self + .sql_to_rel + .query_to_plan(query.inner) + .context(error::PlanSqlSnafu { sql })?; + + Ok(LogicalPlan::DfPlan(result)) + } +} + +impl<'a, S> Planner for DfPlanner<'a, S> +where + S: ContextProvider + Send + Sync, +{ + /// Converts statement to logical plan using datafusion planner + fn statement_to_plan(&self, statement: Statement) -> Result { + match statement { + Statement::ShowDatabases(_) => { + todo!("Currently not supported") + } + Statement::Query(qb) => self.query_to_plan(qb), + Statement::Insert(_) => { + todo!() + } + } + } +} + +pub(crate) struct DfContextProviderAdapter<'a> { + catalog_list: &'a CatalogListRef, +} + +impl<'a> DfContextProviderAdapter<'a> { + pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self { + Self { catalog_list } + } +} + +impl<'a> ContextProvider for DfContextProviderAdapter<'a> { + fn get_table_provider(&self, name: TableReference) -> Option> { + let (catalog, schema, table) = match name { + TableReference::Bare { table } => ( + catalog::DEFAULT_CATALOG_NAME, + catalog::DEFAULT_SCHEMA_NAME, + table, + ), + TableReference::Partial { schema, table } => { + (catalog::DEFAULT_CATALOG_NAME, schema, table) + } + TableReference::Full { + catalog, + schema, + table, + } => (catalog, schema, table), + }; + + self.catalog_list + .catalog(catalog) + .and_then(|catalog_provider| catalog_provider.schema(schema)) + .and_then(|schema_provider| schema_provider.table(table)) + .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) + } + + fn get_function_meta(&self, _name: &str) -> Option> { + // TODO(dennis) + None + } + + fn get_aggregate_meta(&self, _name: &str) -> Option> { + // TODO(dennis) + None + } + + fn get_variable_type(&self, _variable_names: &[String]) -> Option { + // TODO(dennis) + None + } +} diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index d00d51a6db..0814a03711 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -1,111 +1,8 @@ -use std::sync::Arc; - -use arrow::datatypes::DataType; -use datafusion::catalog::TableReference; -use datafusion::datasource::TableProvider; -use datafusion::physical_plan::udaf::AggregateUDF; -use datafusion::physical_plan::udf::ScalarUDF; -use datafusion::sql::planner::{ContextProvider, SqlToRel}; -use snafu::ResultExt; -use sql::statements::query::Query; use sql::statements::statement::Statement; -use table::table::adapter::DfTableProviderAdapter; -use crate::{ - catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}, - datafusion::error, - error::Result, - plan::LogicalPlan, -}; +use crate::{error::Result, plan::LogicalPlan}; +/// SQL logical planner. pub trait Planner: Send + Sync { fn statement_to_plan(&self, statement: Statement) -> Result; } - -pub struct DfPlanner<'a, S: ContextProvider> { - sql_to_rel: SqlToRel<'a, S>, -} - -impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { - /// Creates a DataFusion planner instance - pub fn new(schema_provider: &'a S) -> Self { - let rel = SqlToRel::new(schema_provider); - Self { sql_to_rel: rel } - } - - /// Converts QUERY statement to logical plan. - pub fn query_to_plan(&self, query: Box) -> Result { - // todo(hl): original SQL should be provided as an argument - let sql = query.inner.to_string(); - let result = self - .sql_to_rel - .query_to_plan(query.inner) - // FIXME(yingwen): Move DfPlanner to datafusion mod. - .context(error::PlannerSnafu { sql })?; - - Ok(LogicalPlan::DfPlan(result)) - } -} - -impl<'a, S> Planner for DfPlanner<'a, S> -where - S: ContextProvider + Send + Sync, -{ - /// Converts statement to logical plan using datafusion planner - fn statement_to_plan(&self, statement: Statement) -> Result { - match statement { - Statement::ShowDatabases(_) => { - todo!("Currently not supported") - } - Statement::Query(qb) => self.query_to_plan(qb), - Statement::Insert(_) => { - todo!() - } - } - } -} - -pub(crate) struct DfContextProviderAdapter<'a> { - catalog_list: &'a CatalogListRef, -} - -impl<'a> DfContextProviderAdapter<'a> { - pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self { - Self { catalog_list } - } -} - -impl<'a> ContextProvider for DfContextProviderAdapter<'a> { - fn get_table_provider(&self, name: TableReference) -> Option> { - let (catalog, schema, table) = match name { - TableReference::Bare { table } => (DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table), - TableReference::Partial { schema, table } => (DEFAULT_CATALOG_NAME, schema, table), - TableReference::Full { - catalog, - schema, - table, - } => (catalog, schema, table), - }; - - self.catalog_list - .catalog(catalog) - .and_then(|catalog_provider| catalog_provider.schema(schema)) - .and_then(|schema_provider| schema_provider.table(table)) - .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) - } - - fn get_function_meta(&self, _name: &str) -> Option> { - // TODO(dennis) - None - } - - fn get_aggregate_meta(&self, _name: &str) -> Option> { - // TODO(dennis) - None - } - - fn get_variable_type(&self, _variable_names: &[String]) -> Option { - // TODO(dennis) - None - } -} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f5db6dabed..f7301f6a99 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -1,24 +1,10 @@ -use std::any::Any; use std::fmt; use std::sync::Arc; -use datafusion::catalog::{ - catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, - schema::SchemaProvider as DfSchemaProvider, -}; -use datafusion::datasource::TableProvider as DfTableProvider; -use datafusion::error::Result as DataFusionResult; -use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; -use snafu::ResultExt; -use table::{ - table::adapter::{DfTableProviderAdapter, TableAdapter}, - Table, -}; -use crate::catalog::{self, schema::SchemaProvider, CatalogListRef, CatalogProvider}; -use crate::datafusion::error; -use crate::error::Result; +use crate::catalog::{self, CatalogListRef}; +use crate::datafusion::DfCatalogListAdapter; use crate::executor::Runtime; /// Query engine global state @@ -46,10 +32,10 @@ impl QueryEngineState { ); let df_context = ExecutionContext::with_config(config); - df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter { - catalog_list: catalog_list.clone(), - runtime: df_context.runtime_env(), - }); + df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter::new( + df_context.runtime_env(), + catalog_list.clone(), + )); Self { df_context, @@ -72,196 +58,3 @@ impl QueryEngineState { self.df_context.runtime_env().into() } } - -/// Adapters between datafusion and greptime query engine. -struct DfCatalogListAdapter { - runtime: Arc, - catalog_list: CatalogListRef, -} - -impl DfCatalogList for DfCatalogListAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( - &self, - name: String, - catalog: Arc, - ) -> Option> { - let catalog_adapter = Arc::new(CatalogProviderAdapter { - df_cataglog_provider: catalog, - runtime: self.runtime.clone(), - }); - self.catalog_list - .register_catalog(name, catalog_adapter) - .map(|catalog_provider| { - Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } - - fn catalog_names(&self) -> Vec { - self.catalog_list.catalog_names() - } - - fn catalog(&self, name: &str) -> Option> { - self.catalog_list.catalog(name).map(|catalog_provider| { - Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -/// Datafusion's CatalogProvider -> greptime CatalogProvider -struct CatalogProviderAdapter { - df_cataglog_provider: Arc, - runtime: Arc, -} - -impl CatalogProvider for CatalogProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - self.df_cataglog_provider.schema_names() - } - - fn schema(&self, name: &str) -> Option> { - self.df_cataglog_provider - .schema(name) - .map(|df_schema_provider| { - Arc::new(SchemaProviderAdapter { - df_schema_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -///Greptime CatalogProvider -> datafusion's CatalogProvider -struct DfCatalogProviderAdapter { - catalog_provider: Arc, - runtime: Arc, -} - -impl DfCatalogProvider for DfCatalogProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - self.catalog_provider.schema_names() - } - - fn schema(&self, name: &str) -> Option> { - self.catalog_provider.schema(name).map(|schema_provider| { - Arc::new(DfSchemaProviderAdapter { - schema_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -/// Greptime SchemaProvider -> datafusion SchemaProvider -struct DfSchemaProviderAdapter { - schema_provider: Arc, - runtime: Arc, -} - -impl DfSchemaProvider for DfSchemaProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec { - self.schema_provider.table_names() - } - - fn table(&self, name: &str) -> Option> { - self.schema_provider - .table(name) - .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) - } - - fn register_table( - &self, - name: String, - table: Arc, - ) -> DataFusionResult>> { - let table = Arc::new(TableAdapter::new(table, self.runtime.clone())); - match self.schema_provider.register_table(name, table)? { - Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - None => Ok(None), - } - } - - fn deregister_table(&self, name: &str) -> DataFusionResult>> { - match self.schema_provider.deregister_table(name)? { - Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - None => Ok(None), - } - } - - fn table_exist(&self, name: &str) -> bool { - self.schema_provider.table_exist(name) - } -} - -/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter -struct SchemaProviderAdapter { - df_schema_provider: Arc, - runtime: Arc, -} - -impl SchemaProvider for SchemaProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - /// Retrieves the list of available table names in this schema. - fn table_names(&self) -> Vec { - self.df_schema_provider.table_names() - } - - fn table(&self, name: &str) -> Option> { - self.df_schema_provider.table(name).map(|table_provider| { - Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _ - }) - } - - fn register_table( - &self, - name: String, - table: Arc, - ) -> Result>> { - let table_provider = Arc::new(DfTableProviderAdapter::new(table)); - Ok(self - .df_schema_provider - .register_table(name, table_provider) - .context(error::DatafusionSnafu { - msg: "Fail to register table to datafusion", - })? - .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) - } - - fn deregister_table(&self, name: &str) -> Result>> { - Ok(self - .df_schema_provider - .deregister_table(name) - .context(error::DatafusionSnafu { - msg: "Fail to deregister table from datafusion", - })? - .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) - } - - fn table_exist(&self, name: &str) -> bool { - self.df_schema_provider.table_exist(name) - } -}