From 8959dbcef83507ccd76aaaffd2b44cab6426e68f Mon Sep 17 00:00:00 2001 From: LFC Date: Tue, 6 Dec 2022 19:21:57 +0800 Subject: [PATCH] feat: Substrait logical plan (#704) * feat: use Substrait logical plan to query data from Datanode in Frontend in distributed mode * fix: resolve PR comments * fix: resolve PR comments * fix: resolve PR comments Co-authored-by: luofucong --- Cargo.lock | 4 +- src/{common => }/catalog/src/helper.rs | 13 +- src/catalog/src/lib.rs | 1 + src/catalog/src/remote/manager.rs | 8 +- src/catalog/tests/remote_catalog_tests.rs | 2 +- src/common/catalog/Cargo.toml | 1 - src/common/catalog/src/consts.rs | 6 - src/common/catalog/src/lib.rs | 7 - src/common/substrait/src/context.rs | 11 ++ src/common/substrait/src/df_expr.rs | 34 ++++- src/common/substrait/src/df_logical.rs | 163 ++++++++++++++++------ src/common/substrait/src/error.rs | 7 + src/common/substrait/src/lib.rs | 7 +- src/common/substrait/src/types.rs | 70 +++++++++- src/datanode/src/instance/grpc.rs | 5 +- src/datatypes/src/schema.rs | 10 ++ src/frontend/Cargo.toml | 1 + src/frontend/src/catalog.rs | 11 +- src/frontend/src/error.rs | 7 + src/frontend/src/instance/distributed.rs | 2 +- src/frontend/src/table.rs | 6 +- src/frontend/src/table/scan.rs | 107 +++----------- src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/keys.rs | 2 +- src/meta-srv/src/service/router.rs | 2 +- src/table/Cargo.toml | 1 + src/table/src/metadata.rs | 5 +- 27 files changed, 315 insertions(+), 179 deletions(-) rename src/{common => }/catalog/src/helper.rs (98%) diff --git a/Cargo.lock b/Cargo.lock index abe4e0918c..33df1779b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1316,7 +1316,6 @@ dependencies = [ "serde", "serde_json", "snafu", - "table", "tempdir", "tokio", ] @@ -2458,6 +2457,7 @@ dependencies = [ "sql", "sqlparser 0.15.0", "store-api", + "substrait 0.1.0", "table", "tempdir", "tokio", @@ -3470,6 +3470,7 @@ version = "0.1.0" dependencies = [ "api", "async-trait", + "catalog", "common-base", "common-catalog", "common-error", @@ -6583,6 +6584,7 @@ version = "0.1.0" dependencies = [ "async-trait", "chrono", + "common-catalog", "common-error", "common-query", "common-recordbatch", diff --git a/src/common/catalog/src/helper.rs b/src/catalog/src/helper.rs similarity index 98% rename from src/common/catalog/src/helper.rs rename to src/catalog/src/helper.rs index dcfa08e8a7..2caf098865 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/catalog/src/helper.rs @@ -15,18 +15,19 @@ use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use common_catalog::error::{ + DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu, +}; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize, Serializer}; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::{RawTableInfo, TableId, TableVersion}; -use crate::consts::{ - CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX, -}; -use crate::error::{ - DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu, -}; +const CATALOG_KEY_PREFIX: &str = "__c"; +const SCHEMA_KEY_PREFIX: &str = "__s"; +const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg"; +const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr"; const ALPHANUMERICS_NAME_PATTERN: &str = "[a-zA-Z_][a-zA-Z0-9_]*"; diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index fc7bb42b03..d71a0c6d5b 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -29,6 +29,7 @@ use crate::error::{CreateTableSnafu, Result}; pub use crate::schema::{SchemaProvider, SchemaProviderRef}; pub mod error; +pub mod helper; pub mod local; pub mod remote; pub mod schema; diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index ba7c09f6c0..c37acdc303 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -20,10 +20,6 @@ use std::sync::Arc; use arc_swap::ArcSwap; use async_stream::stream; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; -use common_catalog::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, - SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, -}; use common_telemetry::{debug, info}; use futures::Stream; use futures_util::StreamExt; @@ -39,6 +35,10 @@ use crate::error::{ CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu, OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu, }; +use crate::helper::{ + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, + SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, +}; use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index e5d8811e71..9903b8ff85 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -22,12 +22,12 @@ mod tests { use std::collections::HashSet; use std::sync::Arc; + use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use catalog::remote::{ KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider, }; use catalog::{CatalogList, CatalogManager, RegisterTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use datatypes::schema::Schema; use futures_util::StreamExt; use table::engine::{EngineContext, TableEngineRef}; diff --git a/src/common/catalog/Cargo.toml b/src/common/catalog/Cargo.toml index 5df337479c..b18c561caa 100644 --- a/src/common/catalog/Cargo.toml +++ b/src/common/catalog/Cargo.toml @@ -14,7 +14,6 @@ regex = "1.6" serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } -table = { path = "../../table" } [dev-dependencies] chrono = "0.4" diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 775cddcb42..118c53930b 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -25,9 +25,3 @@ pub const MIN_USER_TABLE_ID: u32 = 1024; pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0; /// scripts table id pub const SCRIPTS_TABLE_ID: u32 = 1; - -pub(crate) const CATALOG_KEY_PREFIX: &str = "__c"; -pub(crate) const SCHEMA_KEY_PREFIX: &str = "__s"; -pub(crate) const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg"; -pub(crate) const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr"; -pub const TABLE_ID_KEY_PREFIX: &str = "__tid"; diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs index 30e01900b3..841420c219 100644 --- a/src/common/catalog/src/lib.rs +++ b/src/common/catalog/src/lib.rs @@ -14,10 +14,3 @@ pub mod consts; pub mod error; -mod helper; - -pub use helper::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, - build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, - TableGlobalValue, TableRegionalKey, TableRegionalValue, -}; diff --git a/src/common/substrait/src/context.rs b/src/common/substrait/src/context.rs index 893546ea48..b017e9cc9a 100644 --- a/src/common/substrait/src/context.rs +++ b/src/common/substrait/src/context.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +use datafusion::logical_plan::DFSchemaRef; use substrait_proto::protobuf::extensions::simple_extension_declaration::{ ExtensionFunction, MappingType, }; @@ -23,6 +24,7 @@ use substrait_proto::protobuf::extensions::SimpleExtensionDeclaration; pub struct ConvertorContext { scalar_fn_names: HashMap, scalar_fn_map: HashMap, + df_schema: Option, } impl ConvertorContext { @@ -63,4 +65,13 @@ impl ConvertorContext { } result } + + pub(crate) fn set_df_schema(&mut self, schema: DFSchemaRef) { + debug_assert!(self.df_schema.is_none()); + self.df_schema.get_or_insert(schema); + } + + pub(crate) fn df_schema(&self) -> Option<&DFSchemaRef> { + self.df_schema.as_ref() + } } diff --git a/src/common/substrait/src/df_expr.rs b/src/common/substrait/src/df_expr.rs index 8267fa9cc1..d924e7b085 100644 --- a/src/common/substrait/src/df_expr.rs +++ b/src/common/substrait/src/df_expr.rs @@ -16,7 +16,7 @@ use std::collections::VecDeque; use std::str::FromStr; use datafusion::logical_plan::{Column, Expr}; -use datafusion_expr::{expr_fn, BuiltinScalarFunction, Operator}; +use datafusion_expr::{expr_fn, lit, BuiltinScalarFunction, Operator}; use datatypes::schema::Schema; use snafu::{ensure, OptionExt}; use substrait_proto::protobuf::expression::field_reference::ReferenceType as FieldReferenceType; @@ -24,7 +24,7 @@ use substrait_proto::protobuf::expression::reference_segment::{ ReferenceType as SegReferenceType, StructField, }; use substrait_proto::protobuf::expression::{ - FieldReference, ReferenceSegment, RexType, ScalarFunction, + FieldReference, Literal, ReferenceSegment, RexType, ScalarFunction, }; use substrait_proto::protobuf::function_argument::ArgType; use substrait_proto::protobuf::Expression; @@ -33,15 +33,24 @@ use crate::context::ConvertorContext; use crate::error::{ EmptyExprSnafu, InvalidParametersSnafu, MissingFieldSnafu, Result, UnsupportedExprSnafu, }; +use crate::types::{literal_type_to_scalar_value, scalar_value_as_literal_type}; /// Convert substrait's `Expression` to DataFusion's `Expr`. -pub fn to_df_expr(ctx: &ConvertorContext, expression: Expression, schema: &Schema) -> Result { +pub(crate) fn to_df_expr( + ctx: &ConvertorContext, + expression: Expression, + schema: &Schema, +) -> Result { let expr_rex_type = expression.rex_type.context(EmptyExprSnafu)?; match expr_rex_type { - RexType::Literal(_) => UnsupportedExprSnafu { - name: "substrait Literal expression", + RexType::Literal(l) => { + let t = l.literal_type.context(MissingFieldSnafu { + field: "LiteralType", + plan: "Literal", + })?; + let v = literal_type_to_scalar_value(t)?; + Ok(lit(v)) } - .fail()?, RexType::Selection(selection) => convert_selection_rex(*selection, schema), RexType::ScalarFunction(scalar_fn) => convert_scalar_function(ctx, scalar_fn, schema), RexType::WindowFunction(_) @@ -453,10 +462,21 @@ pub fn expression_from_df_expr( } } // Don't merge them with other unsupported expr arms to preserve the ordering. - Expr::ScalarVariable(..) | Expr::Literal(..) => UnsupportedExprSnafu { + Expr::ScalarVariable(..) => UnsupportedExprSnafu { name: expr.to_string(), } .fail()?, + Expr::Literal(v) => { + let t = scalar_value_as_literal_type(v)?; + let l = Literal { + nullable: true, + type_variation_reference: 0, + literal_type: Some(t), + }; + Expression { + rex_type: Some(RexType::Literal(l)), + } + } Expr::BinaryExpr { left, op, right } => { let left = expression_from_df_expr(ctx, left, schema)?; let right = expression_from_df_expr(ctx, right, schema)?; diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index 8d53ef1b08..81909cf38d 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -18,7 +18,9 @@ use bytes::{Buf, Bytes, BytesMut}; use catalog::CatalogManagerRef; use common_error::prelude::BoxedError; use common_telemetry::debug; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::datasource::TableProvider; +use datafusion::logical_plan::plan::Filter; use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema}; use datafusion::physical_plan::project_schema; use prost::Message; @@ -29,31 +31,33 @@ use substrait_proto::protobuf::extensions::simple_extension_declaration::Mapping use substrait_proto::protobuf::plan_rel::RelType as PlanRelType; use substrait_proto::protobuf::read_rel::{NamedTable, ReadType}; use substrait_proto::protobuf::rel::RelType; -use substrait_proto::protobuf::{Plan, PlanRel, ReadRel, Rel}; +use substrait_proto::protobuf::{FilterRel, Plan, PlanRel, ReadRel, Rel}; use table::table::adapter::DfTableProviderAdapter; use crate::context::ConvertorContext; use crate::df_expr::{expression_from_df_expr, to_df_expr}; use crate::error::{ - DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, InternalSnafu, + self, DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, InternalSnafu, InvalidParametersSnafu, MissingFieldSnafu, SchemaNotMatchSnafu, TableNotFoundSnafu, UnknownPlanSnafu, UnsupportedExprSnafu, UnsupportedPlanSnafu, }; use crate::schema::{from_schema, to_schema}; use crate::SubstraitPlan; -pub struct DFLogicalSubstraitConvertor { - catalog_manager: CatalogManagerRef, -} +pub struct DFLogicalSubstraitConvertor; impl SubstraitPlan for DFLogicalSubstraitConvertor { type Error = Error; type Plan = LogicalPlan; - fn decode(&self, message: B) -> Result { + fn decode( + &self, + message: B, + catalog_manager: CatalogManagerRef, + ) -> Result { let plan = Plan::decode(message).context(DecodeRelSnafu)?; - self.convert_plan(plan) + self.convert_plan(plan, catalog_manager) } fn encode(&self, plan: Self::Plan) -> Result { @@ -67,13 +71,11 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor { } impl DFLogicalSubstraitConvertor { - pub fn new(catalog_manager: CatalogManagerRef) -> Self { - Self { catalog_manager } - } -} - -impl DFLogicalSubstraitConvertor { - pub fn convert_plan(&self, mut plan: Plan) -> Result { + fn convert_plan( + &self, + mut plan: Plan, + catalog_manager: CatalogManagerRef, + ) -> Result { // prepare convertor context let mut ctx = ConvertorContext::default(); for simple_ext in plan.extensions { @@ -99,15 +101,51 @@ impl DFLogicalSubstraitConvertor { } .fail()? }; + + self.rel_to_logical_plan(&mut ctx, Box::new(rel), catalog_manager) + } + + fn rel_to_logical_plan( + &self, + ctx: &mut ConvertorContext, + rel: Box, + catalog_manager: CatalogManagerRef, + ) -> Result { let rel_type = rel.rel_type.context(EmptyPlanSnafu)?; // build logical plan let logical_plan = match rel_type { - RelType::Read(read_rel) => self.convert_read_rel(&mut ctx, read_rel), - RelType::Filter(_filter_rel) => UnsupportedPlanSnafu { - name: "Filter Relation", + RelType::Read(read_rel) => self.convert_read_rel(ctx, read_rel, catalog_manager)?, + RelType::Filter(filter) => { + let FilterRel { + common: _, + input, + condition, + advanced_extension: _, + } = *filter; + + let input = input.context(MissingFieldSnafu { + field: "input", + plan: "Filter", + })?; + let input = Arc::new(self.rel_to_logical_plan(ctx, input, catalog_manager)?); + + let condition = condition.context(MissingFieldSnafu { + field: "condition", + plan: "Filter", + })?; + + let schema = ctx.df_schema().context(InvalidParametersSnafu { + reason: "the underlying TableScan plan should have included a table schema", + })?; + let schema = schema + .clone() + .try_into() + .context(error::ConvertDfSchemaSnafu)?; + let predicate = to_df_expr(ctx, *condition, &schema)?; + + LogicalPlan::Filter(Filter { predicate, input }) } - .fail()?, RelType::Fetch(_fetch_rel) => UnsupportedPlanSnafu { name: "Fetch Relation", } @@ -148,7 +186,7 @@ impl DFLogicalSubstraitConvertor { name: "Cross Relation", } .fail()?, - }?; + }; Ok(logical_plan) } @@ -157,6 +195,7 @@ impl DFLogicalSubstraitConvertor { &self, ctx: &mut ConvertorContext, read_rel: Box, + catalog_manager: CatalogManagerRef, ) -> Result { // Extract the catalog, schema and table name from NamedTable. Assume the first three are those names. let read_type = read_rel.read_type.context(MissingFieldSnafu { @@ -192,8 +231,7 @@ impl DFLogicalSubstraitConvertor { .map(|mask_expr| self.convert_mask_expression(mask_expr)); // Get table handle from catalog manager - let table_ref = self - .catalog_manager + let table_ref = catalog_manager .table(&catalog_name, &schema_name, &table_name) .map_err(BoxedError::new) .context(InternalSnafu)? @@ -207,7 +245,7 @@ impl DFLogicalSubstraitConvertor { let retrieved_schema = to_schema(read_rel.base_schema.unwrap_or_default())?; let retrieved_arrow_schema = retrieved_schema.arrow_schema(); ensure!( - stored_schema.fields == retrieved_arrow_schema.fields, + same_schema_without_metadata(&stored_schema, retrieved_arrow_schema), SchemaNotMatchSnafu { substrait_schema: retrieved_arrow_schema.clone(), storage_schema: stored_schema @@ -227,9 +265,11 @@ impl DFLogicalSubstraitConvertor { .to_dfschema_ref() .context(DFInternalSnafu)?; - // TODO(ruihang): Support filters and limit + ctx.set_df_schema(projected_schema.clone()); + + // TODO(ruihang): Support limit Ok(LogicalPlan::TableScan(TableScan { - table_name, + table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name), source: adapter, projection, projected_schema, @@ -250,20 +290,42 @@ impl DFLogicalSubstraitConvertor { } impl DFLogicalSubstraitConvertor { - pub fn convert_df_plan(&self, plan: LogicalPlan) -> Result { - let mut ctx = ConvertorContext::default(); - - // TODO(ruihang): extract this translation logic into a separated function - // convert PlanRel - let rel = match plan { + fn logical_plan_to_rel( + &self, + ctx: &mut ConvertorContext, + plan: Arc, + ) -> Result { + Ok(match &*plan { LogicalPlan::Projection(_) => UnsupportedPlanSnafu { name: "DataFusion Logical Projection", } .fail()?, - LogicalPlan::Filter(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Filter", + LogicalPlan::Filter(filter) => { + let input = Some(Box::new( + self.logical_plan_to_rel(ctx, filter.input.clone())?, + )); + + let schema = plan + .schema() + .clone() + .try_into() + .context(error::ConvertDfSchemaSnafu)?; + let condition = Some(Box::new(expression_from_df_expr( + ctx, + &filter.predicate, + &schema, + )?)); + + let rel = FilterRel { + common: None, + input, + condition, + advanced_extension: None, + }; + Rel { + rel_type: Some(RelType::Filter(Box::new(rel))), + } } - .fail()?, LogicalPlan::Window(_) => UnsupportedPlanSnafu { name: "DataFusion Logical Window", } @@ -293,7 +355,7 @@ impl DFLogicalSubstraitConvertor { } .fail()?, LogicalPlan::TableScan(table_scan) => { - let read_rel = self.convert_table_scan_plan(&mut ctx, table_scan)?; + let read_rel = self.convert_table_scan_plan(ctx, table_scan)?; Rel { rel_type: Some(RelType::Read(Box::new(read_rel))), } @@ -319,7 +381,13 @@ impl DFLogicalSubstraitConvertor { ), } .fail()?, - }; + }) + } + + fn convert_df_plan(&self, plan: LogicalPlan) -> Result { + let mut ctx = ConvertorContext::default(); + + let rel = self.logical_plan_to_rel(&mut ctx, Arc::new(plan))?; // convert extension let extensions = ctx.generate_function_extension(); @@ -341,7 +409,7 @@ impl DFLogicalSubstraitConvertor { pub fn convert_table_scan_plan( &self, ctx: &mut ConvertorContext, - table_scan: TableScan, + table_scan: &TableScan, ) -> Result { let provider = table_scan .source @@ -363,7 +431,8 @@ impl DFLogicalSubstraitConvertor { // assemble projection let projection = table_scan .projection - .map(|proj| self.convert_schema_projection(&proj)); + .as_ref() + .map(|x| self.convert_schema_projection(x)); // assemble base (unprojected) schema using Table's schema. let base_schema = from_schema(&provider.table().schema())?; @@ -371,7 +440,8 @@ impl DFLogicalSubstraitConvertor { // make conjunction over a list of filters and convert the result to substrait let filter = if let Some(conjunction) = table_scan .filters - .into_iter() + .iter() + .cloned() .reduce(|accum, expr| accum.and(expr)) { Some(Box::new(expression_from_df_expr( @@ -412,6 +482,13 @@ impl DFLogicalSubstraitConvertor { } } +fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> bool { + lhs.fields.len() == rhs.fields.len() + && lhs.fields.iter().zip(rhs.fields.iter()).all(|(x, y)| { + x.name == y.name && x.data_type == y.data_type && x.is_nullable == y.is_nullable + }) +} + #[cfg(test)] mod test { use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; @@ -463,10 +540,10 @@ mod test { } async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) { - let convertor = DFLogicalSubstraitConvertor::new(catalog); + let convertor = DFLogicalSubstraitConvertor; let proto = convertor.encode(plan.clone()).unwrap(); - let tripped_plan = convertor.decode(proto).unwrap(); + let tripped_plan = convertor.decode(proto, catalog).unwrap(); assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan)); } @@ -488,6 +565,7 @@ mod test { .await .unwrap(); let adapter = Arc::new(DfTableProviderAdapter::new(table_ref)); + let projection = vec![1, 3, 5]; let df_schema = adapter.schema().to_dfschema().unwrap(); let projected_fields = projection @@ -498,7 +576,10 @@ mod test { Arc::new(DFSchema::new_with_metadata(projected_fields, Default::default()).unwrap()); let table_scan_plan = LogicalPlan::TableScan(TableScan { - table_name: DEFAULT_TABLE_NAME.to_string(), + table_name: format!( + "{}.{}.{}", + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME + ), source: adapter, projection: Some(projection), projected_schema, diff --git a/src/common/substrait/src/error.rs b/src/common/substrait/src/error.rs index c33b3679fb..4455e9231c 100644 --- a/src/common/substrait/src/error.rs +++ b/src/common/substrait/src/error.rs @@ -99,6 +99,12 @@ pub enum Error { storage_schema: datafusion::arrow::datatypes::SchemaRef, backtrace: Backtrace, }, + + #[snafu(display("Failed to convert DataFusion schema, source: {}", source))] + ConvertDfSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -120,6 +126,7 @@ impl ErrorExt for Error { | Error::TableNotFound { .. } | Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments, Error::DFInternal { .. } | Error::Internal { .. } => StatusCode::Internal, + Error::ConvertDfSchema { source } => source.status_code(), } } diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs index c318799a3b..04c5e82771 100644 --- a/src/common/substrait/src/lib.rs +++ b/src/common/substrait/src/lib.rs @@ -22,6 +22,7 @@ mod schema; mod types; use bytes::{Buf, Bytes}; +use catalog::CatalogManagerRef; pub use crate::df_logical::DFLogicalSubstraitConvertor; @@ -30,7 +31,11 @@ pub trait SubstraitPlan { type Plan; - fn decode(&self, message: B) -> Result; + fn decode( + &self, + message: B, + catalog_manager: CatalogManagerRef, + ) -> Result; fn encode(&self, plan: Self::Plan) -> Result; } diff --git a/src/common/substrait/src/types.rs b/src/common/substrait/src/types.rs index fd4cc34fbe..d1033c7a3e 100644 --- a/src/common/substrait/src/types.rs +++ b/src/common/substrait/src/types.rs @@ -18,11 +18,13 @@ //! Current we only have variations on integer types. Variation 0 (system preferred) are the same with base types, which //! are signed integer (i.e. I8 -> [i8]), and Variation 1 stands for unsigned integer (i.e. I8 -> [u8]). +use datafusion::scalar::ScalarValue; use datatypes::prelude::ConcreteDataType; +use substrait_proto::protobuf::expression::literal::LiteralType; use substrait_proto::protobuf::r#type::{self as s_type, Kind, Nullability}; -use substrait_proto::protobuf::Type as SType; +use substrait_proto::protobuf::{Type as SType, Type}; -use crate::error::{Result, UnsupportedConcreteTypeSnafu, UnsupportedSubstraitTypeSnafu}; +use crate::error::{self, Result, UnsupportedConcreteTypeSnafu, UnsupportedSubstraitTypeSnafu}; macro_rules! substrait_kind { ($desc:ident, $concrete_ty:ident) => {{ @@ -134,3 +136,67 @@ pub fn from_concrete_type(ty: ConcreteDataType, nullability: Option) -> Re Ok(SType { kind }) } + +pub(crate) fn scalar_value_as_literal_type(v: &ScalarValue) -> Result { + Ok(if v.is_null() { + LiteralType::Null(Type { kind: None }) + } else { + match v { + ScalarValue::Boolean(Some(v)) => LiteralType::Boolean(*v), + ScalarValue::Float32(Some(v)) => LiteralType::Fp32(*v), + ScalarValue::Float64(Some(v)) => LiteralType::Fp64(*v), + ScalarValue::Int8(Some(v)) => LiteralType::I8(*v as i32), + ScalarValue::Int16(Some(v)) => LiteralType::I16(*v as i32), + ScalarValue::Int32(Some(v)) => LiteralType::I32(*v), + ScalarValue::Int64(Some(v)) => LiteralType::I64(*v), + ScalarValue::LargeUtf8(Some(v)) => LiteralType::String(v.clone()), + ScalarValue::LargeBinary(Some(v)) => LiteralType::Binary(v.clone()), + // TODO(LFC): Implement other conversions: ScalarValue => LiteralType + _ => { + return error::UnsupportedExprSnafu { + name: format!("{:?}", v), + } + .fail() + } + } + }) +} + +pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result { + Ok(match t { + LiteralType::Null(Type { kind: Some(kind) }) => match kind { + Kind::Bool(_) => ScalarValue::Boolean(None), + Kind::I8(_) => ScalarValue::Int8(None), + Kind::I16(_) => ScalarValue::Int16(None), + Kind::I32(_) => ScalarValue::Int32(None), + Kind::I64(_) => ScalarValue::Int64(None), + Kind::Fp32(_) => ScalarValue::Float32(None), + Kind::Fp64(_) => ScalarValue::Float64(None), + Kind::String(_) => ScalarValue::LargeUtf8(None), + Kind::Binary(_) => ScalarValue::LargeBinary(None), + // TODO(LFC): Implement other conversions: Kind => ScalarValue + _ => { + return error::UnsupportedSubstraitTypeSnafu { + ty: format!("{:?}", kind), + } + .fail() + } + }, + LiteralType::Boolean(v) => ScalarValue::Boolean(Some(v)), + LiteralType::I8(v) => ScalarValue::Int8(Some(v as i8)), + LiteralType::I16(v) => ScalarValue::Int16(Some(v as i16)), + LiteralType::I32(v) => ScalarValue::Int32(Some(v)), + LiteralType::I64(v) => ScalarValue::Int64(Some(v)), + LiteralType::Fp32(v) => ScalarValue::Float32(Some(v)), + LiteralType::Fp64(v) => ScalarValue::Float64(Some(v)), + LiteralType::String(v) => ScalarValue::LargeUtf8(Some(v)), + LiteralType::Binary(v) => ScalarValue::LargeBinary(Some(v)), + // TODO(LFC): Implement other conversions: LiteralType => ScalarValue + _ => { + return error::UnsupportedSubstraitTypeSnafu { + ty: format!("{:?}", t), + } + .fail() + } + }) +} diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index ddc03a6436..41998c1590 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -151,9 +151,8 @@ impl Instance { } async fn execute_logical(&self, plan_bytes: Vec) -> Result { - let logical_plan_converter = DFLogicalSubstraitConvertor::new(self.catalog_manager.clone()); - let logical_plan = logical_plan_converter - .decode(plan_bytes.as_slice()) + let logical_plan = DFLogicalSubstraitConvertor + .decode(plan_bytes.as_slice(), self.catalog_manager.clone()) .context(DecodeLogicalPlanSnafu)?; self.query_engine diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index a1792fd665..e3a5661dfd 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -20,6 +20,7 @@ use std::sync::Arc; pub use arrow::datatypes::Metadata; use arrow::datatypes::{Field, Schema as ArrowSchema}; +use datafusion_common::DFSchemaRef; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -465,6 +466,15 @@ impl TryFrom for Schema { } } +impl TryFrom for Schema { + type Error = Error; + + fn try_from(value: DFSchemaRef) -> Result { + let s: ArrowSchema = value.as_ref().into(); + s.try_into() + } +} + fn try_parse_version(metadata: &Metadata, key: &str) -> Result { if let Some(value) = metadata.get(key) { let version = value diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a269983db8..5e3eee4b94 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -45,6 +45,7 @@ snafu = { version = "0.7", features = ["backtraces"] } sql = { path = "../sql" } sqlparser = "0.15" store-api = { path = "../store-api" } +substrait = { path = "../common/substrait" } table = { path = "../table" } tokio = { version = "1.18", features = ["full"] } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 86356db08c..aea667367f 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -17,13 +17,16 @@ use std::collections::HashSet; use std::sync::Arc; use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu}; +use catalog::helper::{ + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, + TableGlobalKey, TableGlobalValue, +}; use catalog::remote::{Kv, KvBackendRef}; use catalog::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, }; -use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue}; use futures::StreamExt; use meta_client::rpc::TableName; use snafu::prelude::*; @@ -130,7 +133,7 @@ impl CatalogList for FrontendCatalogManager { let backend = self.backend.clone(); let res = std::thread::spawn(|| { common_runtime::block_on_read(async move { - let key = common_catalog::build_catalog_prefix(); + let key = build_catalog_prefix(); let mut iter = backend.range(key.as_bytes()); let mut res = HashSet::new(); @@ -180,7 +183,7 @@ impl CatalogProvider for FrontendCatalogProvider { let catalog_name = self.catalog_name.clone(); let res = std::thread::spawn(|| { common_runtime::block_on_read(async move { - let key = common_catalog::build_schema_prefix(&catalog_name); + let key = build_schema_prefix(&catalog_name); let mut iter = backend.range(key.as_bytes()); let mut res = HashSet::new(); @@ -242,7 +245,7 @@ impl SchemaProvider for FrontendSchemaProvider { std::thread::spawn(|| { common_runtime::block_on_read(async move { - let key = common_catalog::build_table_global_prefix(catalog_name, schema_name); + let key = build_table_global_prefix(catalog_name, schema_name); let mut iter = backend.range(key.as_bytes()); let mut res = HashSet::new(); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 9b6275c7bf..823ce693ce 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -445,6 +445,12 @@ pub enum Error { #[snafu(display("Table already exists: `{}`", table))] TableAlreadyExist { table: String, backtrace: Backtrace }, + + #[snafu(display("Failed to encode Substrait logical plan, source: {}", source))] + EncodeSubstraitLogicalPlan { + #[snafu(backtrace)] + source: substrait::error::Error, + }, } pub type Result = std::result::Result; @@ -536,6 +542,7 @@ impl ErrorExt for Error { Error::AlterExprToRequest { source, .. } => source.status_code(), Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable, Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists, + Error::EncodeSubstraitLogicalPlan { source } => source.status_code(), } } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index a96f817035..d32e12ee24 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -17,11 +17,11 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::{AlterExpr, CreateDatabaseExpr, CreateExpr}; +use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; use catalog::CatalogList; use chrono::DateTime; use client::admin::{admin_result_to_output, Admin}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_catalog::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; use common_query::Output; use common_telemetry::{debug, error, info}; use datatypes::prelude::ConcreteDataType; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 8f97ba12f7..36d229a245 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -818,9 +818,9 @@ mod test { async fn new_dist_table() -> DistTable { let column_schemas = vec![ - ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false), - ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("row_id", ConcreteDataType::uint32_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false), + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), false), ]; let schema = Arc::new(Schema::new(column_schemas.clone())); diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs index 1919dc0fb6..14ea9a6a93 100644 --- a/src/frontend/src/table/scan.rs +++ b/src/frontend/src/table/scan.rs @@ -16,17 +16,14 @@ use std::fmt::Formatter; use std::sync::Arc; use api::v1::InsertExpr; -use client::{Database, ObjectResult, Select}; +use client::{Database, ObjectResult}; use common_query::prelude::Expr; use common_query::Output; use common_recordbatch::{util, RecordBatches}; -use datafusion::logical_plan::{LogicalPlan as DfLogicPlan, LogicalPlanBuilder}; -use datafusion_expr::Expr as DfExpr; -use datatypes::prelude::*; -use datatypes::schema::SchemaRef; +use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder}; use meta_client::rpc::TableName; -use query::plan::LogicalPlan; use snafu::ResultExt; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::adapter::DfTableProviderAdapter; use table::TableRef; @@ -56,12 +53,13 @@ impl DatanodeInstance { pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result { let logical_plan = self.build_logical_plan(&plan)?; - // TODO(LFC): Directly pass in logical plan to GRPC interface when our substrait codec supports filter. - let sql = to_sql(logical_plan)?; + let substrait_plan = DFLogicalSubstraitConvertor + .encode(logical_plan) + .context(error::EncodeSubstraitLogicalPlanSnafu)?; let output = self .db - .select(Select::Sql(sql)) + .logical_plan(substrait_plan.to_vec()) .await .and_then(Output::try_from) .context(error::SelectSnafu)?; @@ -94,14 +92,25 @@ impl DatanodeInstance { ) .context(error::BuildDfLogicalPlanSnafu)?; + if let Some(filter) = table_scan + .filters + .iter() + .map(|x| x.df_expr()) + .cloned() + .reduce(|accum, expr| accum.and(expr)) + { + builder = builder + .filter(filter) + .context(error::BuildDfLogicalPlanSnafu)?; + } + if let Some(limit) = table_scan.limit { builder = builder .limit(limit) .context(error::BuildDfLogicalPlanSnafu)?; } - let plan = builder.build().context(error::BuildDfLogicalPlanSnafu)?; - Ok(LogicalPlan::DfPlan(plan)) + builder.build().context(error::BuildDfLogicalPlanSnafu) } } @@ -112,79 +121,3 @@ pub(crate) struct TableScanPlan { pub filters: Vec, pub limit: Option, } - -fn to_sql(plan: LogicalPlan) -> Result { - let LogicalPlan::DfPlan(plan) = plan; - let table_scan = match plan { - DfLogicPlan::TableScan(table_scan) => table_scan, - _ => unreachable!("unknown plan: {:?}", plan), - }; - - let schema: SchemaRef = Arc::new( - table_scan - .source - .schema() - .try_into() - .context(error::ConvertArrowSchemaSnafu)?, - ); - let projection = table_scan - .projection - .map(|x| { - x.iter() - .map(|i| schema.column_name_by_index(*i).to_string()) - .collect::>() - }) - .unwrap_or_else(|| { - schema - .column_schemas() - .iter() - .map(|x| x.name.clone()) - .collect::>() - }) - .join(", "); - - let mut sql = format!("select {} from {}", projection, &table_scan.table_name); - - let filters = table_scan - .filters - .iter() - .map(expr_to_sql) - .collect::>>()? - .join(" AND "); - if !filters.is_empty() { - sql.push_str(" where "); - sql.push_str(&filters); - } - - if let Some(limit) = table_scan.limit { - sql.push_str(" limit "); - sql.push_str(&limit.to_string()); - } - Ok(sql) -} - -fn expr_to_sql(expr: &DfExpr) -> Result { - Ok(match expr { - DfExpr::BinaryExpr { - ref left, - ref right, - ref op, - } => format!( - "{} {} {}", - expr_to_sql(left.as_ref())?, - op, - expr_to_sql(right.as_ref())? - ), - DfExpr::Column(c) => c.name.clone(), - DfExpr::Literal(sv) => { - let v: Value = Value::try_from(sv.clone()) - .with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?; - if matches!(v.data_type(), ConcreteDataType::String(_)) { - format!("'{}'", sv) - } else { - format!("{}", sv) - } - } - _ => unimplemented!("not implemented for {:?}", expr), - }) -} diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 738ca359fa..333cbac4d9 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -10,6 +10,7 @@ mock = [] [dependencies] api = { path = "../api" } async-trait = "0.1" +catalog = { path = "../catalog" } common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index 71a24acbd6..b7e215fec9 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -15,7 +15,7 @@ use std::str::FromStr; use api::v1::meta::TableName; -use common_catalog::TableGlobalKey; +use catalog::helper::TableGlobalKey; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index ba924e61d2..0c502be094 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -16,7 +16,7 @@ use api::v1::meta::{ router_server, CreateRequest, Error, PeerDict, PutRequest, RangeRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute, TableRouteValue, }; -use common_catalog::{TableGlobalKey, TableGlobalValue}; +use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_telemetry::warn; use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 0d0d352cd7..8e7cebb40d 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -7,6 +7,7 @@ license = "Apache-2.0" [dependencies] async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 3fb367589e..2e0f722352 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use chrono::{DateTime, Utc}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; pub use datatypes::error::{Error as ConvertError, Result as ConvertResult}; use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; use derive_builder::Builder; @@ -333,9 +334,9 @@ pub struct TableInfo { /// Comment of the table. #[builder(default, setter(into))] pub desc: Option, - #[builder(default, setter(into))] + #[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))] pub catalog_name: String, - #[builder(default, setter(into))] + #[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))] pub schema_name: String, pub meta: TableMeta, #[builder(default = "TableType::Base")]