diff --git a/Cargo.lock b/Cargo.lock index 2af5ac466e..a1ca0da6a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -805,7 +805,10 @@ dependencies = [ "datafusion", "datanode", "datatypes", + "prost 0.9.0", "snafu", + "substrait 0.1.0", + "substrait 0.2.0", "tokio", "tonic", "tracing", @@ -1418,6 +1421,7 @@ dependencies = [ "sql", "storage", "store-api", + "substrait 0.1.0", "table", "table-engine", "tempdir", diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 417a1b205e..163b872a63 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -27,6 +27,7 @@ message ObjectExpr { message SelectExpr { oneof expr { string sql = 1; + bytes logical_plan = 2; PhysicalPlan physical_plan = 15; } } diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 82bb407f9c..620037e4a8 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -13,13 +13,25 @@ common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-time = { path = "../common/time" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ + "simd", +] } datatypes = { path = "../datatypes" } snafu = { version = "0.7", features = ["backtraces"] } tonic = "0.8" [dev-dependencies] datanode = { path = "../datanode" } +substrait = { path = "../common/substrait" } tokio = { version = "1.0", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[dev-dependencies.substrait_proto] +package = "substrait" +version = "0.2" + +# TODO(ruihang): upgrade to 0.11 once substrait-rs supports it. +[dev-dependencies.prost_09] +package = "prost" +version = "0.9" diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs new file mode 100644 index 0000000000..6b0f8233cc --- /dev/null +++ b/src/client/examples/logical.rs @@ -0,0 +1,96 @@ +use api::v1::{ColumnDataType, ColumnDef, CreateExpr}; +use client::{admin::Admin, Client, Database}; +use prost_09::Message; +use substrait_proto::protobuf::{ + plan_rel::RelType as PlanRelType, + read_rel::{NamedTable, ReadType}, + rel::RelType, + PlanRel, ReadRel, Rel, +}; +use tracing::{event, Level}; + +fn main() { + tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) + .unwrap(); + + run(); +} + +#[tokio::main] +async fn run() { + let client = Client::connect("http://127.0.0.1:3001").await.unwrap(); + + let create_table_expr = CreateExpr { + catalog_name: Some("greptime".to_string()), + schema_name: Some("public".to_string()), + table_name: "test_logical_dist_exec".to_string(), + desc: None, + column_defs: vec![ + ColumnDef { + name: "timestamp".to_string(), + datatype: ColumnDataType::Timestamp as i32, + is_nullable: false, + default_constraint: None, + }, + ColumnDef { + name: "key".to_string(), + datatype: ColumnDataType::Uint64 as i32, + is_nullable: false, + default_constraint: None, + }, + ColumnDef { + name: "value".to_string(), + datatype: ColumnDataType::Uint64 as i32, + is_nullable: false, + default_constraint: None, + }, + ], + time_index: "timestamp".to_string(), + primary_keys: vec!["key".to_string()], + create_if_not_exists: false, + table_options: Default::default(), + }; + + let admin = Admin::new("create table", client.clone()); + let result = admin.create(create_table_expr).await.unwrap(); + event!(Level::INFO, "create table result: {:#?}", result); + + let logical = mock_logical_plan(); + event!(Level::INFO, "plan size: {:#?}", logical.len()); + let db = Database::new("greptime", client); + let result = db.logical_plan(logical).await.unwrap(); + + event!(Level::INFO, "result: {:#?}", result); +} + +fn mock_logical_plan() -> Vec { + let catalog_name = "greptime".to_string(); + let schema_name = "public".to_string(); + let table_name = "test_logical_dist_exec".to_string(); + + let named_table = NamedTable { + names: vec![catalog_name, schema_name, table_name], + advanced_extension: None, + }; + let read_type = ReadType::NamedTable(named_table); + + let read_rel = ReadRel { + common: None, + base_schema: None, + filter: None, + projection: None, + advanced_extension: None, + read_type: Some(read_type), + }; + + let mut buf = vec![]; + let rel = Rel { + rel_type: Some(RelType::Read(Box::new(read_rel))), + }; + let plan_rel = PlanRel { + rel_type: Some(PlanRelType::Rel(rel)), + }; + plan_rel.encode(&mut buf).unwrap(); + + buf +} diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 6a26131da9..2a1d8dc76e 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -107,6 +107,13 @@ impl Database { self.do_select(select_expr).await } + pub async fn logical_plan(&self, logical_plan: Vec) -> Result { + let select_expr = SelectExpr { + expr: Some(select_expr::Expr::LogicalPlan(logical_plan)), + }; + self.do_select(select_expr).await + } + async fn do_select(&self, select_expr: SelectExpr) -> Result { let header = ExprHeader { version: PROTOCOL_VERSION, diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index d9881603d4..92ba98a923 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -47,9 +47,12 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor { fn encode(&self, plan: Self::Plan) -> Result { let rel = self.convert_plan(plan)?; + let plan_rel = PlanRel { + rel_type: Some(PlanRelType::Rel(rel)), + }; let mut buf = BytesMut::new(); - rel.encode(&mut buf).context(EncodeRelSnafu)?; + plan_rel.encode(&mut buf).context(EncodeRelSnafu)?; Ok(buf.freeze()) } @@ -182,35 +185,35 @@ impl DFLogicalSubstraitConvertor { } .fail()?, LogicalPlan::Filter(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical Filter", } .fail()?, LogicalPlan::Window(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical Window", } .fail()?, LogicalPlan::Aggregate(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical Aggregate", } .fail()?, LogicalPlan::Sort(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical Sort", } .fail()?, LogicalPlan::Join(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical Join", } .fail()?, LogicalPlan::CrossJoin(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical CrossJoin", } .fail()?, LogicalPlan::Repartition(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical Repartition", } .fail()?, LogicalPlan::Union(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical Union", } .fail()?, LogicalPlan::TableScan(table_scan) => { @@ -220,11 +223,11 @@ impl DFLogicalSubstraitConvertor { }) } LogicalPlan::EmptyRelation(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical EmptyRelation", } .fail()?, LogicalPlan::Limit(_) => UnsupportedPlanSnafu { - name: "DataFusion Logical Projection", + name: "DataFusion Logical Limit", } .fail()?, LogicalPlan::CreateExternalTable(_) @@ -321,8 +324,8 @@ mod test { async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) { let convertor = DFLogicalSubstraitConvertor::new(catalog); - let rel = convertor.convert_plan(plan.clone()).unwrap(); - let tripped_plan = convertor.convert_rel(rel).unwrap(); + let proto = convertor.encode(plan.clone()).unwrap(); + let tripped_plan = convertor.decode(proto).unwrap(); assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan)); } diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs index d5576daaa9..3808ce9ec6 100644 --- a/src/common/substrait/src/lib.rs +++ b/src/common/substrait/src/lib.rs @@ -1,5 +1,5 @@ mod df_logical; -mod error; +pub mod error; use bytes::{Buf, Bytes}; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index d9112adef1..e6ae94049a 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -5,9 +5,7 @@ edition = "2021" [features] default = ["python"] -python = [ - "dep:script" -] +python = ["dep:script"] [dependencies] api = { path = "../api" } @@ -23,7 +21,9 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ + "simd", +] } datatypes = { path = "../datatypes" } futures = "0.3" hyper = { version = "0.14", features = ["full"] } @@ -39,6 +39,7 @@ snafu = { version = "0.7", features = ["backtraces"] } sql = { path = "../sql" } storage = { path = "../storage" } store-api = { path = "../store-api" } +substrait = { path = "../common/substrait" } table = { path = "../table" } table-engine = { path = "../table-engine", features = ["test"] } tokio = { version = "1.18", features = ["full"] } @@ -46,22 +47,41 @@ tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" tower = { version = "0.4", features = ["full"] } tower-http = { version = "0.3", features = ["full"] } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies.arrow] package = "arrow2" version = "0.10" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] +features = [ + "io_csv", + "io_json", + "io_parquet", + "io_parquet_compression", + "io_ipc", + "ahash", + "compute", + "serde_types", +] [dev-dependencies] axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } client = { path = "../client" } common-query = { path = "../common/query" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [ + "simd", +] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } tempdir = "0.3" [dev-dependencies.arrow] package = "arrow2" version = "0.10" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] +features = [ + "io_csv", + "io_json", + "io_parquet", + "io_parquet_compression", + "io_ipc", + "ahash", + "compute", + "serde_types", +] diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 01c5cd1e39..a0b7be3784 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -16,6 +16,12 @@ pub enum Error { source: query::error::Error, }, + #[snafu(display("Failed to decode logical plan, source: {}", source))] + DecodeLogicalPlan { + #[snafu(backtrace)] + source: substrait::error::Error, + }, + #[snafu(display("Failed to execute physical plan, source: {}", source))] ExecutePhysicalPlan { #[snafu(backtrace)] @@ -269,6 +275,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::ExecuteSql { source } => source.status_code(), + Error::DecodeLogicalPlan { source } => source.status_code(), Error::ExecutePhysicalPlan { source } => source.status_code(), Error::NewCatalog { source } => source.status_code(), diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 250df8504f..5b736f7345 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -7,11 +7,16 @@ use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::status_code::StatusCode; use common_query::Output; use common_telemetry::logging::{debug, info}; +use query::plan::LogicalPlan; use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler}; use snafu::prelude::*; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::requests::AddColumnRequest; -use crate::error::{self, InsertSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu}; +use crate::error::{ + self, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, Result, TableNotFoundSnafu, + UnsupportedExprSnafu, +}; use crate::instance::Instance; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; use crate::server::grpc::insert::{self, insertion_expr_to_request}; @@ -155,6 +160,7 @@ impl Instance { let expr = select_expr.expr; match expr { Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await, + Some(select_expr::Expr::LogicalPlan(plan)) => self.execute_logical(plan).await, Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => { self.physical_planner .execute(PhysicalPlanner::parse(plan)?, original_ql) @@ -166,6 +172,18 @@ impl Instance { .fail(), } } + + async fn execute_logical(&self, plan_bytes: Vec) -> Result { + let logical_plan_converter = DFLogicalSubstraitConvertor::new(self.catalog_manager.clone()); + let logical_plan = logical_plan_converter + .decode(plan_bytes.as_slice()) + .context(DecodeLogicalPlanSnafu)?; + + self.query_engine + .execute(&LogicalPlan::DfPlan(logical_plan)) + .await + .context(ExecuteSqlSnafu) + } } #[async_trait]