diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index bc1b84cb04..435f933995 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -14,11 +14,23 @@ //! Transform Substrait into execution plan use std::collections::HashMap; +use std::sync::Arc; use datatypes::data_type::ConcreteDataType as CDT; +use prost::Message; +use query::parser::QueryLanguageParser; +use query::plan::LogicalPlan; +use query::QueryEngine; +use session::context::QueryContext; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; -use crate::adapter::error::{Error, NotImplementedSnafu, TableNotFoundSnafu}; +use crate::adapter::error::{ + Error, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu, + NotImplementedSnafu, TableNotFoundSnafu, +}; +use crate::adapter::FlowWorkerContext; use crate::expr::GlobalId; +use crate::plan::TypedPlan; use crate::repr::RelationType; /// a simple macro to generate a not implemented error macro_rules! not_impl_err { @@ -44,7 +56,7 @@ mod literal; mod plan; use literal::{from_substrait_literal, from_substrait_type}; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use substrait::substrait_proto::proto::extensions::simple_extension_declaration::MappingType; use substrait::substrait_proto::proto::extensions::SimpleExtensionDeclaration; @@ -79,38 +91,33 @@ impl FunctionExtensions { } } -/// A context that holds the information of the dataflow -pub struct DataflowContext { - /// `id` refer to any source table in the dataflow, and `name` is the name of the table - /// which is a `Vec` in substrait - id_to_name: HashMap>, - /// see `id_to_name` - name_to_id: HashMap, GlobalId>, - /// the schema of the table - schema: HashMap, -} +/// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan, +/// then to a substrait plan, and finally to a flow plan. +/// TODO: check if use empty `QueryContext` influence anything +pub async fn sql_to_flow_plan( + ctx: &mut FlowWorkerContext, + engine: &Arc, + sql: &str, +) -> Result { + let stmt = + QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).context(InvalidQueryPlanSnafu)?; + let plan = engine + .planner() + .plan(stmt, QueryContext::arc()) + .await + .context(InvalidQueryPlanSnafu)?; + let LogicalPlan::DfPlan(plan) = plan; -impl DataflowContext { - /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. - /// - /// Returns an error if no table has been registered with the provided names - pub fn table(&self, name: &Vec) -> Result<(GlobalId, RelationType), Error> { - let id = self - .name_to_id - .get(name) - .copied() - .with_context(|| TableNotFoundSnafu { - name: name.join("."), - })?; - let schema = self - .schema - .get(&id) - .cloned() - .with_context(|| TableNotFoundSnafu { - name: name.join("."), - })?; - Ok((id, schema)) - } + // encode then decode so to rely on the impl of conversion from logical plan to substrait plan + let bytes = DFLogicalSubstraitConvertor {} + .encode(&plan) + .context(InvalidQuerySubstraitSnafu)?; + + let sub_plan = substrait::substrait_proto::proto::Plan::decode(bytes) + .map_err(|inner| InvalidQueryProstSnafu { inner }.build())?; + let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan)?; + + Ok(flow_plan) } #[cfg(test)] @@ -131,12 +138,12 @@ mod test { use super::*; use crate::repr::ColumnType; - pub fn create_test_ctx() -> DataflowContext { + pub fn create_test_ctx() -> FlowWorkerContext { let gid = GlobalId::User(0); let name = vec!["numbers".to_string()]; let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); - DataflowContext { + FlowWorkerContext { id_to_name: HashMap::from([(gid, name.clone())]), name_to_id: HashMap::from([(name.clone(), gid)]), schema: HashMap::from([(gid, schema)]), diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index be4a42a9b5..f5d49bcae4 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -54,11 +54,11 @@ use crate::expr::{ }; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; -use crate::transform::{DataflowContext, FunctionExtensions}; +use crate::transform::{FlowWorkerContext, FunctionExtensions}; impl TypedExpr { fn from_substrait_agg_grouping( - ctx: &mut DataflowContext, + ctx: &mut FlowWorkerContext, groupings: &[Grouping], typ: &RelationType, extensions: &FunctionExtensions, @@ -84,7 +84,7 @@ impl TypedExpr { impl AggregateExpr { fn from_substrait_agg_measures( - ctx: &mut DataflowContext, + ctx: &mut FlowWorkerContext, measures: &[Measure], typ: &RelationType, extensions: &FunctionExtensions, @@ -218,7 +218,7 @@ impl KeyValPlan { impl TypedPlan { /// Convert AggregateRel into Flow's TypedPlan pub fn from_substrait_agg_rel( - ctx: &mut DataflowContext, + ctx: &mut FlowWorkerContext, agg: &proto::AggregateRel, extensions: &FunctionExtensions, ) -> Result { diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index fd73bb33d0..7290eaf183 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -23,12 +23,12 @@ use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanS use crate::expr::{MapFilterProject, TypedExpr}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, RelationType}; -use crate::transform::{DataflowContext, FunctionExtensions}; +use crate::transform::{FlowWorkerContext, FunctionExtensions}; impl TypedPlan { /// Convert Substrait Plan into Flow's TypedPlan pub fn from_substrait_plan( - ctx: &mut DataflowContext, + ctx: &mut FlowWorkerContext, plan: &SubPlan, ) -> Result { // Register function extension @@ -62,7 +62,7 @@ impl TypedPlan { /// Convert Substrait Rel into Flow's TypedPlan /// TODO: SELECT DISTINCT(does it get compile with something else?) pub fn from_substrait_rel( - ctx: &mut DataflowContext, + ctx: &mut FlowWorkerContext, rel: &Rel, extensions: &FunctionExtensions, ) -> Result {