feat: sql_to_flow_plan

This commit is contained in:
discord9
2024-04-25 15:06:35 +08:00
parent 8240a1ace1
commit b19febc97c
3 changed files with 49 additions and 42 deletions

View File

@@ -14,11 +14,23 @@
//! Transform Substrait into execution plan
use std::collections::HashMap;
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType as CDT;
use prost::Message;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::QueryEngine;
use session::context::QueryContext;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use crate::adapter::error::{Error, NotImplementedSnafu, TableNotFoundSnafu};
use crate::adapter::error::{
Error, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu,
NotImplementedSnafu, TableNotFoundSnafu,
};
use crate::adapter::FlowWorkerContext;
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::RelationType;
/// a simple macro to generate a not implemented error
macro_rules! not_impl_err {
@@ -44,7 +56,7 @@ mod literal;
mod plan;
use literal::{from_substrait_literal, from_substrait_type};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use substrait::substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
use substrait::substrait_proto::proto::extensions::SimpleExtensionDeclaration;
@@ -79,38 +91,33 @@ impl FunctionExtensions {
}
}
/// A context that holds the information of the dataflow
pub struct DataflowContext {
/// `id` refer to any source table in the dataflow, and `name` is the name of the table
/// which is a `Vec<String>` in substrait
id_to_name: HashMap<GlobalId, Vec<String>>,
/// see `id_to_name`
name_to_id: HashMap<Vec<String>, GlobalId>,
/// the schema of the table
schema: HashMap<GlobalId, RelationType>,
}
/// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
/// then to a substrait plan, and finally to a flow plan.
/// TODO: check if use empty `QueryContext` influence anything
pub async fn sql_to_flow_plan(
ctx: &mut FlowWorkerContext,
engine: &Arc<dyn QueryEngine>,
sql: &str,
) -> Result<TypedPlan, Error> {
let stmt =
QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).context(InvalidQueryPlanSnafu)?;
let plan = engine
.planner()
.plan(stmt, QueryContext::arc())
.await
.context(InvalidQueryPlanSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;
impl DataflowContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
pub fn table(&self, name: &Vec<String>) -> Result<(GlobalId, RelationType), Error> {
let id = self
.name_to_id
.get(name)
.copied()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
let schema = self
.schema
.get(&id)
.cloned()
.with_context(|| TableNotFoundSnafu {
name: name.join("."),
})?;
Ok((id, schema))
}
// encode then decode so to rely on the impl of conversion from logical plan to substrait plan
let bytes = DFLogicalSubstraitConvertor {}
.encode(&plan)
.context(InvalidQuerySubstraitSnafu)?;
let sub_plan = substrait::substrait_proto::proto::Plan::decode(bytes)
.map_err(|inner| InvalidQueryProstSnafu { inner }.build())?;
let flow_plan = TypedPlan::from_substrait_plan(ctx, &sub_plan)?;
Ok(flow_plan)
}
#[cfg(test)]
@@ -131,12 +138,12 @@ mod test {
use super::*;
use crate::repr::ColumnType;
pub fn create_test_ctx() -> DataflowContext {
pub fn create_test_ctx() -> FlowWorkerContext {
let gid = GlobalId::User(0);
let name = vec!["numbers".to_string()];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
DataflowContext {
FlowWorkerContext {
id_to_name: HashMap::from([(gid, name.clone())]),
name_to_id: HashMap::from([(name.clone(), gid)]),
schema: HashMap::from([(gid, schema)]),

View File

@@ -54,11 +54,11 @@ use crate::expr::{
};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::{DataflowContext, FunctionExtensions};
use crate::transform::{FlowWorkerContext, FunctionExtensions};
impl TypedExpr {
fn from_substrait_agg_grouping(
ctx: &mut DataflowContext,
ctx: &mut FlowWorkerContext,
groupings: &[Grouping],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -84,7 +84,7 @@ impl TypedExpr {
impl AggregateExpr {
fn from_substrait_agg_measures(
ctx: &mut DataflowContext,
ctx: &mut FlowWorkerContext,
measures: &[Measure],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -218,7 +218,7 @@ impl KeyValPlan {
impl TypedPlan {
/// Convert AggregateRel into Flow's TypedPlan
pub fn from_substrait_agg_rel(
ctx: &mut DataflowContext,
ctx: &mut FlowWorkerContext,
agg: &proto::AggregateRel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {

View File

@@ -23,12 +23,12 @@ use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanS
use crate::expr::{MapFilterProject, TypedExpr};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, RelationType};
use crate::transform::{DataflowContext, FunctionExtensions};
use crate::transform::{FlowWorkerContext, FunctionExtensions};
impl TypedPlan {
/// Convert Substrait Plan into Flow's TypedPlan
pub fn from_substrait_plan(
ctx: &mut DataflowContext,
ctx: &mut FlowWorkerContext,
plan: &SubPlan,
) -> Result<TypedPlan, Error> {
// Register function extension
@@ -62,7 +62,7 @@ impl TypedPlan {
/// Convert Substrait Rel into Flow's TypedPlan
/// TODO: SELECT DISTINCT(does it get compile with something else?)
pub fn from_substrait_rel(
ctx: &mut DataflowContext,
ctx: &mut FlowWorkerContext,
rel: &Rel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {