From db55c69117cfac72d95b96689b3ea33edfae47de Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Wed, 31 Aug 2022 21:43:50 +0800 Subject: [PATCH] feat: impl grpc physical plan (#212) * chore: rename "convert.rs" to "serde.rs" * proto definition * impl "projection" * add mock_input_exec for test * impl physical plan execution --- Cargo.lock | 14 ++ Cargo.toml | 1 + src/api/build.rs | 1 + src/api/greptime/v1/database.proto | 9 +- src/api/greptime/v1/physical_plan.proto | 33 +++ src/api/src/lib.rs | 4 +- src/api/src/{convert.rs => serde.rs} | 56 +++-- src/client/Cargo.toml | 2 + src/client/examples/physical.rs | 37 ++++ src/client/examples/select.rs | 9 +- src/client/src/database.rs | 53 ++++- src/client/src/error.rs | 12 +- src/client/src/lib.rs | 2 +- src/common/grpc/Cargo.toml | 15 ++ src/common/grpc/src/error.rs | 34 +++ src/common/grpc/src/lib.rs | 8 + src/common/grpc/src/physical.rs | 18 ++ src/common/grpc/src/physical/expr.rs | 82 ++++++++ src/common/grpc/src/physical/plan.rs | 269 ++++++++++++++++++++++++ src/datanode/Cargo.toml | 2 + src/datanode/src/error.rs | 47 +++-- src/datanode/src/instance.rs | 27 ++- src/datanode/src/server/grpc.rs | 5 +- src/datanode/src/server/grpc/insert.rs | 3 +- src/datanode/src/server/grpc/plan.rs | 44 ++++ src/datanode/src/tests/grpc_test.rs | 10 +- src/query/src/datafusion.rs | 7 +- src/query/src/lib.rs | 1 + src/query/src/query_engine.rs | 4 +- 29 files changed, 734 insertions(+), 75 deletions(-) create mode 100644 src/api/greptime/v1/physical_plan.proto rename src/api/src/{convert.rs => serde.rs} (79%) create mode 100644 src/client/examples/physical.rs create mode 100644 src/common/grpc/Cargo.toml create mode 100644 src/common/grpc/src/error.rs create mode 100644 src/common/grpc/src/lib.rs create mode 100644 src/common/grpc/src/physical.rs create mode 100644 src/common/grpc/src/physical/expr.rs create mode 100644 src/common/grpc/src/physical/plan.rs create mode 100644 src/datanode/src/server/grpc/plan.rs diff --git a/Cargo.lock b/Cargo.lock index 678291cea5..db49f1d960 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -695,6 +695,8 @@ version = "0.1.0" dependencies = [ "api", "common-error", + "common-grpc", + "datafusion", "snafu", "tokio", "tonic 0.8.0", @@ -775,6 +777,17 @@ dependencies = [ "statrs", ] +[[package]] +name = "common-grpc" +version = "0.1.0" +dependencies = [ + "api", + "arrow2", + "async-trait", + "datafusion", + "snafu", +] + [[package]] name = "common-query" version = "0.1.0" @@ -1263,6 +1276,7 @@ dependencies = [ "client", "common-base", "common-error", + "common-grpc", "common-query", "common-recordbatch", "common-runtime", diff --git a/Cargo.toml b/Cargo.toml index 7dd14b8809..4c9782eec0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "src/common/base", "src/common/error", "src/common/function", + "src/common/grpc", "src/common/query", "src/common/recordbatch", "src/common/runtime", diff --git a/src/api/build.rs b/src/api/build.rs index 2cb96d7444..412a3fdcbd 100644 --- a/src/api/build.rs +++ b/src/api/build.rs @@ -4,6 +4,7 @@ fn main() { &[ "greptime/v1/insert.proto", "greptime/v1/select.proto", + "greptime/v1/physical_plan.proto", "greptime/v1/greptime.proto", ], &["."], diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index b69fec9c92..6d59ef60d1 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -25,14 +25,19 @@ message ExprHeader { uint32 version = 1; } -// TODO(fys): Only support sql now, and -// will support promql etc in the future +// TODO(fys): Only support sql now, and will support promql etc in the future message SelectExpr { oneof expr { string sql = 1; + PhysicalPlan physical_plan = 15; } } +message PhysicalPlan { + bytes original_ql = 1; + bytes plan = 2; +} + message InsertExpr { string table_name = 1; repeated bytes values = 2; diff --git a/src/api/greptime/v1/physical_plan.proto b/src/api/greptime/v1/physical_plan.proto new file mode 100644 index 0000000000..58444a5af4 --- /dev/null +++ b/src/api/greptime/v1/physical_plan.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; + +package greptime.v1.codec; + +message PhysicalPlanNode { + oneof PhysicalPlanType { + ProjectionExecNode projection = 1; + MockInputExecNode mock = 99; + // TODO(fys): impl other physical plan node + } +} + +message ProjectionExecNode { + PhysicalPlanNode input = 1; + repeated PhysicalExprNode expr = 2; + repeated string expr_name = 3; +} + +message PhysicalExprNode { + oneof ExprType { + PhysicalColumn column = 1; + // TODO(fys): impl other physical expr node + } +} + +message PhysicalColumn { + string name = 1; + uint64 index = 2; +} + +message MockInputExecNode { + string name = 1; +} diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index cc4fb22275..ef132c15fe 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -1,2 +1,4 @@ -pub mod convert; +pub mod serde; pub mod v1; + +pub use prost::DecodeError; diff --git a/src/api/src/convert.rs b/src/api/src/serde.rs similarity index 79% rename from src/api/src/convert.rs rename to src/api/src/serde.rs index 4d3a1fd3f7..1f8e540ed1 100644 --- a/src/api/src/convert.rs +++ b/src/api/src/serde.rs @@ -1,38 +1,34 @@ pub use prost::DecodeError; use prost::Message; -use crate::v1::codec::{InsertBatch, SelectResult}; +use crate::v1::codec::{InsertBatch, PhysicalPlanNode, SelectResult}; -impl From for Vec { - fn from(insert: InsertBatch) -> Self { - insert.encode_to_vec() - } +macro_rules! impl_convert_with_bytes { + ($data_type: ty) => { + impl From<$data_type> for Vec { + fn from(entity: $data_type) -> Self { + entity.encode_to_vec() + } + } + + impl TryFrom<&[u8]> for $data_type { + type Error = DecodeError; + + fn try_from(value: &[u8]) -> Result { + <$data_type>::decode(value.as_ref()) + } + } + }; } -impl TryFrom> for InsertBatch { - type Error = DecodeError; - - fn try_from(value: Vec) -> Result { - InsertBatch::decode(value.as_ref()) - } -} - -impl From for Vec { - fn from(result: SelectResult) -> Self { - result.encode_to_vec() - } -} - -impl TryFrom> for SelectResult { - type Error = DecodeError; - - fn try_from(value: Vec) -> Result { - SelectResult::decode(value.as_ref()) - } -} +impl_convert_with_bytes!(InsertBatch); +impl_convert_with_bytes!(SelectResult); +impl_convert_with_bytes!(PhysicalPlanNode); #[cfg(test)] mod tests { + use std::ops::Deref; + use crate::v1::codec::*; use crate::v1::column; use crate::v1::Column; @@ -44,7 +40,7 @@ mod tests { let insert_batch = mock_insert_batch(); let bytes: Vec = insert_batch.into(); - let insert: InsertBatch = bytes.try_into().unwrap(); + let insert: InsertBatch = bytes.deref().try_into().unwrap(); assert_eq!(8, insert.row_count); assert_eq!(1, insert.columns.len()); @@ -70,7 +66,7 @@ mod tests { bytes[0] = 0b1; bytes[1] = 0b1; - let insert: InsertBatch = bytes.try_into().unwrap(); + let insert: InsertBatch = bytes.deref().try_into().unwrap(); assert_eq!(8, insert.row_count); assert_eq!(1, insert.columns.len()); @@ -90,7 +86,7 @@ mod tests { let select_result = mock_select_result(); let bytes: Vec = select_result.into(); - let result: SelectResult = bytes.try_into().unwrap(); + let result: SelectResult = bytes.deref().try_into().unwrap(); assert_eq!(8, result.row_count); assert_eq!(1, result.columns.len()); @@ -116,7 +112,7 @@ mod tests { bytes[0] = 0b1; bytes[1] = 0b1; - let result: SelectResult = bytes.try_into().unwrap(); + let result: SelectResult = bytes.deref().try_into().unwrap(); assert_eq!(8, result.row_count); assert_eq!(1, result.columns.len()); diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 908a447091..b5f328f710 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -8,6 +8,8 @@ edition = "2021" [dependencies] api = { path = "../api" } common-error = { path = "../common/error" } +common-grpc = { path = "../common/grpc" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } snafu = { version = "0.7", features = ["backtraces"] } tonic = "0.8" diff --git a/src/client/examples/physical.rs b/src/client/examples/physical.rs new file mode 100644 index 0000000000..44b1e22f27 --- /dev/null +++ b/src/client/examples/physical.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use client::{Client, Database}; +use common_grpc::MockExecution; +use datafusion::physical_plan::{ + expressions::Column, projection::ProjectionExec, ExecutionPlan, PhysicalExpr, +}; +use tracing::{event, Level}; + +fn main() { + tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) + .unwrap(); + + run(); +} + +#[tokio::main] +async fn run() { + let client = Client::connect("http://127.0.0.1:3001").await.unwrap(); + let db = Database::new("greptime", client); + + let physical = mock_physical_plan(); + let result = db.physical_plan(physical, None).await; + + event!(Level::INFO, "result: {:#?}", result); +} + +fn mock_physical_plan() -> Arc { + let id_expr = Arc::new(Column::new("id", 0)) as Arc; + let age_expr = Arc::new(Column::new("age", 2)) as Arc; + let expr = vec![(id_expr, "id".to_string()), (age_expr, "age".to_string())]; + + let input = + Arc::new(MockExecution::new("mock_input_exec".to_string())) as Arc; + let projection = ProjectionExec::try_new(expr, input).unwrap(); + Arc::new(projection) +} diff --git a/src/client/examples/select.rs b/src/client/examples/select.rs index ac335fe686..442360bf37 100644 --- a/src/client/examples/select.rs +++ b/src/client/examples/select.rs @@ -1,5 +1,4 @@ -use api::v1::{select_expr, SelectExpr}; -use client::{Client, Database}; +use client::{Client, Database, Select}; use tracing::{event, Level}; fn main() { @@ -14,10 +13,8 @@ async fn run() { let client = Client::connect("http://127.0.0.1:3001").await.unwrap(); let db = Database::new("greptime", client); - let select_expr = SelectExpr { - expr: Some(select_expr::Expr::Sql("select * from demo".to_string())), - }; - let result = db.select(select_expr).await.unwrap(); + let sql = Select::Sql("select * from demo".to_string()); + let result = db.select(sql).await.unwrap(); event!(Level::INFO, "result: {:#?}", result); } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index f670b1faf1..009ca7ca1d 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -1,14 +1,22 @@ +use std::ops::Deref; +use std::sync::Arc; + use api::v1::codec::SelectResult as GrpcSelectResult; use api::v1::{ - object_expr, object_result, DatabaseRequest, ExprHeader, InsertExpr, - MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, SelectExpr, + object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr, + MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan, + SelectExpr, }; use common_error::status_code::StatusCode; +use common_grpc::AsExcutionPlan; +use common_grpc::DefaultAsPlanImpl; +use datafusion::physical_plan::ExecutionPlan; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{self, MissingResultSnafu}; use crate::{ - error::DatanodeSnafu, error::DecodeSelectSnafu, error::MissingHeaderSnafu, Client, Result, + error::DatanodeSnafu, error::DecodeSelectSnafu, error::EncodePhysicalSnafu, + error::MissingHeaderSnafu, Client, Result, }; pub const PROTOCOL_VERSION: u32 = 1; @@ -51,7 +59,34 @@ impl Database { Ok(()) } - pub async fn select(&self, select_expr: SelectExpr) -> Result { + pub async fn select(&self, expr: Select) -> Result { + let select_expr = match expr { + Select::Sql(sql) => SelectExpr { + expr: Some(select_expr::Expr::Sql(sql)), + }, + }; + self.do_select(select_expr).await + } + + pub async fn physical_plan( + &self, + physical: Arc, + original_ql: Option, + ) -> Result { + let plan = DefaultAsPlanImpl::try_from_physical_plan(physical.clone()) + .context(EncodePhysicalSnafu { physical })? + .bytes; + let original_ql = original_ql.unwrap_or_default(); + let select_expr = SelectExpr { + expr: Some(select_expr::Expr::PhysicalPlan(PhysicalPlan { + original_ql: original_ql.into_bytes(), + plan, + })), + }; + self.do_select(select_expr).await + } + + async fn do_select(&self, select_expr: SelectExpr) -> Result { let header = ExprHeader { version: PROTOCOL_VERSION, }; @@ -81,7 +116,11 @@ impl Database { let result = match obj_result { object_result::Result::Select(select) => { - let result = select.raw_data.try_into().context(DecodeSelectSnafu)?; + let result = select + .raw_data + .deref() + .try_into() + .context(DecodeSelectSnafu)?; ObjectResult::Select(result) } object_result::Result::Mutate(mutate) => ObjectResult::Mutate(mutate), @@ -125,3 +164,7 @@ pub enum ObjectResult { Select(GrpcSelectResult), Mutate(GrpcMutateResult), } + +pub enum Select { + Sql(String), +} diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 95a0e74aff..e588ff844b 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -1,5 +1,8 @@ -use api::convert::DecodeError; +use std::sync::Arc; + +use api::serde::DecodeError; use common_error::prelude::*; +use datafusion::physical_plan::ExecutionPlan; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -32,6 +35,13 @@ pub enum Error { #[snafu(display("Error occurred on the data node, code: {}, msg: {}", code, msg))] Datanode { code: u32, msg: String }, + + #[snafu(display("Failed to encode physical plan: {:?}, source: {}", physical, source))] + EncodePhysical { + physical: Arc, + #[snafu(backtrace)] + source: common_grpc::Error, + }, } pub type Result = std::result::Result; diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index b054dddc44..a28abe64b7 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -4,6 +4,6 @@ mod error; pub use self::{ client::Client, - database::{Database, ObjectResult}, + database::{Database, ObjectResult, Select}, error::{Error, Result}, }; diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml new file mode 100644 index 0000000000..c9d49817f7 --- /dev/null +++ b/src/common/grpc/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "common-grpc" +version = "0.1.0" +edition = "2021" + +[dependencies] +api = { path = "../../api" } +async-trait = "0.1" +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } +snafu = { version = "0.7", features = ["backtraces"] } + +[dependencies.arrow] +package = "arrow2" +version = "0.10" +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs new file mode 100644 index 0000000000..992167e27e --- /dev/null +++ b/src/common/grpc/src/error.rs @@ -0,0 +1,34 @@ +use api::DecodeError; +use datafusion::error::DataFusionError; +use snafu::{Backtrace, Snafu}; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Unexpected empty physical plan type: {}", name))] + EmptyPhysicalPlan { name: String, backtrace: Backtrace }, + + #[snafu(display("Unexpected empty physical expr: {}", name))] + EmptyPhysicalExpr { name: String, backtrace: Backtrace }, + + #[snafu(display("Unsupported datafusion execution plan: {}", name))] + UnsupportedDfPlan { name: String, backtrace: Backtrace }, + + #[snafu(display("Unsupported datafusion physical expr: {}", name))] + UnsupportedDfExpr { name: String, backtrace: Backtrace }, + + #[snafu(display("Missing required field in protobuf, field: {}", field))] + MissingField { field: String, backtrace: Backtrace }, + + #[snafu(display("Failed to new datafusion projection exec, source: {}", source))] + NewProjection { + source: DataFusionError, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to decode physical plan node, source: {}", source))] + DecodePhysicalPlanNode { + source: DecodeError, + backtrace: Backtrace, + }, +} diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs new file mode 100644 index 0000000000..202f244b9d --- /dev/null +++ b/src/common/grpc/src/lib.rs @@ -0,0 +1,8 @@ +pub mod error; +pub mod physical; + +pub use error::Error; +pub use physical::{ + plan::{DefaultAsPlanImpl, MockExecution}, + AsExcutionPlan, +}; diff --git a/src/common/grpc/src/physical.rs b/src/common/grpc/src/physical.rs new file mode 100644 index 0000000000..2745cadc32 --- /dev/null +++ b/src/common/grpc/src/physical.rs @@ -0,0 +1,18 @@ +mod expr; +pub mod plan; + +use std::{result::Result, sync::Arc}; + +use datafusion::physical_plan::ExecutionPlan; + +pub type ExecutionPlanRef = Arc; + +pub trait AsExcutionPlan { + type Error: std::error::Error; + + fn try_into_physical_plan(&self) -> Result; + + fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result + where + Self: Sized; +} diff --git a/src/common/grpc/src/physical/expr.rs b/src/common/grpc/src/physical/expr.rs new file mode 100644 index 0000000000..79e75ff6e2 --- /dev/null +++ b/src/common/grpc/src/physical/expr.rs @@ -0,0 +1,82 @@ +use std::{result::Result, sync::Arc}; + +use api::v1::codec; +use datafusion::physical_plan::{expressions::Column as DfColumn, PhysicalExpr as DfPhysicalExpr}; +use snafu::OptionExt; + +use crate::error::{EmptyPhysicalExprSnafu, Error, UnsupportedDfExprSnafu}; + +// grpc -> datafusion (physical expr) +pub(crate) fn parse_grpc_physical_expr( + proto: &codec::PhysicalExprNode, +) -> Result, Error> { + let expr_type = proto.expr_type.as_ref().context(EmptyPhysicalExprSnafu { + name: format!("{:?}", proto), + })?; + + // TODO(fys): impl other physical expr + let pexpr: Arc = match expr_type { + codec::physical_expr_node::ExprType::Column(c) => { + let pcol = DfColumn::new(&c.name, c.index as usize); + Arc::new(pcol) + } + }; + Ok(pexpr) +} + +// datafusion -> grpc (physical expr) +pub(crate) fn parse_df_physical_expr( + df_expr: Arc, +) -> Result { + let expr = df_expr.as_any(); + + // TODO(fys): impl other physical expr + if let Some(expr) = expr.downcast_ref::() { + Ok(codec::PhysicalExprNode { + expr_type: Some(codec::physical_expr_node::ExprType::Column( + codec::PhysicalColumn { + name: expr.name().to_string(), + index: expr.index() as u64, + }, + )), + }) + } else { + UnsupportedDfExprSnafu { + name: df_expr.to_string(), + } + .fail()? + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::codec::{physical_expr_node::ExprType::Column, PhysicalColumn, PhysicalExprNode}; + use datafusion::physical_plan::{expressions::Column as DfColumn, PhysicalExpr}; + + use crate::physical::expr::{parse_df_physical_expr, parse_grpc_physical_expr}; + + #[test] + fn test_column_convert() { + // mock df_column_expr + let df_column = DfColumn::new("name", 11); + let df_column_clone = df_column.clone(); + let df_expr = Arc::new(df_column) as Arc; + + // mock grpc_column_expr + let grpc_expr = PhysicalExprNode { + expr_type: Some(Column(PhysicalColumn { + name: "name".to_owned(), + index: 11, + })), + }; + + let result = parse_df_physical_expr(df_expr).unwrap(); + assert_eq!(grpc_expr, result); + + let result = parse_grpc_physical_expr(&grpc_expr).unwrap(); + let df_column = result.as_any().downcast_ref::().unwrap(); + assert_eq!(df_column_clone, df_column.to_owned()); + } +} diff --git a/src/common/grpc/src/physical/plan.rs b/src/common/grpc/src/physical/plan.rs new file mode 100644 index 0000000000..9b5a9079f1 --- /dev/null +++ b/src/common/grpc/src/physical/plan.rs @@ -0,0 +1,269 @@ +use std::{ops::Deref, result::Result, sync::Arc}; + +use api::v1::codec::{ + physical_plan_node::PhysicalPlanType, MockInputExecNode, PhysicalPlanNode, ProjectionExecNode, +}; +use arrow::{ + array::{PrimitiveArray, Utf8Array}, + datatypes::{DataType, Field, Schema}, +}; +use async_trait::async_trait; +use datafusion::{ + execution::runtime_env::RuntimeEnv, + field_util::SchemaExt, + physical_plan::{ + memory::MemoryStream, projection::ProjectionExec, ExecutionPlan, PhysicalExpr, + SendableRecordBatchStream, Statistics, + }, + record_batch::RecordBatch, +}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{ + DecodePhysicalPlanNodeSnafu, EmptyPhysicalPlanSnafu, Error, MissingFieldSnafu, + NewProjectionSnafu, UnsupportedDfPlanSnafu, +}; +use crate::physical::{expr, AsExcutionPlan, ExecutionPlanRef}; + +pub struct DefaultAsPlanImpl { + pub bytes: Vec, +} + +impl AsExcutionPlan for DefaultAsPlanImpl { + type Error = Error; + + // Vec -> PhysicalPlanNode -> ExecutionPlanRef + fn try_into_physical_plan(&self) -> Result { + let physicalplan_node: PhysicalPlanNode = self + .bytes + .deref() + .try_into() + .context(DecodePhysicalPlanNodeSnafu)?; + physicalplan_node.try_into_physical_plan() + } + + // ExecutionPlanRef -> PhysicalPlanNode -> Vec + fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result + where + Self: Sized, + { + let bytes: Vec = PhysicalPlanNode::try_from_physical_plan(plan)?.into(); + Ok(DefaultAsPlanImpl { bytes }) + } +} + +impl AsExcutionPlan for PhysicalPlanNode { + type Error = Error; + + fn try_into_physical_plan(&self) -> Result { + let plan = self + .physical_plan_type + .as_ref() + .context(EmptyPhysicalPlanSnafu { + name: format!("{:?}", self), + })?; + + // TODO(fys): impl other physical plan type + match plan { + PhysicalPlanType::Projection(projection) => { + let input = if let Some(input) = &projection.input { + input.as_ref().try_into_physical_plan()? + } else { + MissingFieldSnafu { field: "input" }.fail()? + }; + let exprs = projection + .expr + .iter() + .zip(projection.expr_name.iter()) + .map(|(expr, name)| { + Ok((expr::parse_grpc_physical_expr(expr)?, name.to_string())) + }) + .collect::, String)>, Error>>()?; + + let projection = + ProjectionExec::try_new(exprs, input).context(NewProjectionSnafu)?; + + Ok(Arc::new(projection)) + } + PhysicalPlanType::Mock(mock) => Ok(Arc::new(MockExecution { + name: mock.name.to_string(), + })), + } + } + + fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result + where + Self: Sized, + { + let plan = plan.as_any(); + + if let Some(exec) = plan.downcast_ref::() { + let input = PhysicalPlanNode::try_from_physical_plan(exec.input().to_owned())?; + + let expr = exec + .expr() + .iter() + .map(|expr| expr::parse_df_physical_expr(expr.0.clone())) + .collect::, Error>>()?; + + let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect(); + + Ok(PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::Projection(Box::new( + ProjectionExecNode { + input: Some(Box::new(input)), + expr, + expr_name, + }, + ))), + }) + } else if let Some(exec) = plan.downcast_ref::() { + Ok(PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::Mock(MockInputExecNode { + name: exec.name.clone(), + })), + }) + } else { + UnsupportedDfPlanSnafu { + name: format!("{:?}", plan), + } + .fail()? + } + } +} + +// TODO(fys): use "test" feature to enable it +#[derive(Debug)] +pub struct MockExecution { + name: String, +} + +impl MockExecution { + pub fn new(name: String) -> Self { + Self { name } + } +} + +#[async_trait] +impl ExecutionPlan for MockExecution { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + let field1 = Field::new("id", DataType::UInt32, false); + let field2 = Field::new("name", DataType::LargeUtf8, false); + let field3 = Field::new("age", DataType::UInt32, false); + Arc::new(arrow::datatypes::Schema::new(vec![field1, field2, field3])) + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + unimplemented!() + } + + fn output_ordering( + &self, + ) -> Option<&[datafusion::physical_plan::expressions::PhysicalSortExpr]> { + unimplemented!() + } + + fn children(&self) -> Vec { + unimplemented!() + } + + fn with_new_children( + &self, + _children: Vec, + ) -> datafusion::error::Result { + unimplemented!() + } + + async fn execute( + &self, + _partition: usize, + _runtime: Arc, + ) -> datafusion::error::Result { + let id_array = Arc::new(PrimitiveArray::from_slice([1u32, 2, 3, 4, 5])); + let name_array = Arc::new(Utf8Array::::from_slice([ + "zhangsan", "lisi", "wangwu", "Tony", "Mike", + ])); + let age_array = Arc::new(PrimitiveArray::from_slice([25u32, 28, 27, 35, 25])); + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new("name", DataType::LargeUtf8, false), + Field::new("age", DataType::UInt32, false), + ])); + let record_batch = + RecordBatch::try_new(schema, vec![id_array, name_array, age_array]).unwrap(); + let data: Vec = vec![record_batch]; + let projection = Some(vec![0, 1, 2]); + let stream = MemoryStream::try_new(data, self.schema(), projection).unwrap(); + Ok(Box::pin(stream)) + } + + fn statistics(&self) -> Statistics { + todo!() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::codec::PhysicalPlanNode; + use datafusion::physical_plan::{expressions::Column, projection::ProjectionExec}; + + use crate::physical::{ + plan::{DefaultAsPlanImpl, MockExecution}, + {AsExcutionPlan, ExecutionPlanRef}, + }; + + #[test] + fn test_convert_df_projection_with_bytes() { + let projection_exec = mock_df_projection(); + + let bytes = DefaultAsPlanImpl::try_from_physical_plan(projection_exec).unwrap(); + let exec = bytes.try_into_physical_plan().unwrap(); + + verify_df_porjection(exec); + } + + #[test] + fn test_convert_df_with_grpc_projection() { + let projection_exec = mock_df_projection(); + + let projection_node = PhysicalPlanNode::try_from_physical_plan(projection_exec).unwrap(); + let exec = projection_node.try_into_physical_plan().unwrap(); + + verify_df_porjection(exec); + } + + fn mock_df_projection() -> Arc { + let mock_input = Arc::new(MockExecution { + name: "mock_input".to_string(), + }); + let column1 = Arc::new(Column::new("id", 0)); + let column2 = Arc::new(Column::new("name", 1)); + Arc::new( + ProjectionExec::try_new( + vec![(column1, "id".to_string()), (column2, "name".to_string())], + mock_input, + ) + .unwrap(), + ) + } + + fn verify_df_porjection(exec: ExecutionPlanRef) { + let projection_exec = exec.as_any().downcast_ref::().unwrap(); + let mock_input = projection_exec + .input() + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!("mock_input", mock_input.name); + assert_eq!(2, projection_exec.expr().len()); + assert_eq!("id", projection_exec.expr()[0].1); + assert_eq!("name", projection_exec.expr()[1].1); + } +} diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index b3e85e3a01..e03447f782 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -18,10 +18,12 @@ axum-macros = "0.2" catalog = { path = "../catalog" } common-base = { path = "../common/base" } common-error = { path = "../common/error" } +common-grpc = { path = "../common/grpc" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } datatypes = { path = "../datatypes" } hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 003b54f403..0ef2930969 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,6 +1,6 @@ use std::any::Any; -use api::convert::DecodeError; +use api::serde::DecodeError; use common_error::ext::BoxedError; use common_error::prelude::*; use datatypes::prelude::ConcreteDataType; @@ -11,26 +11,32 @@ use table::error::Error as TableError; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Fail to execute sql, source: {}", source))] + #[snafu(display("Failed to execute sql, source: {}", source))] ExecuteSql { #[snafu(backtrace)] source: query::error::Error, }, - #[snafu(display("Fail to create catalog list, source: {}", source))] + #[snafu(display("Failed to execute physical plan, source: {}", source))] + ExecutePhysicalPlan { + #[snafu(backtrace)] + source: query::error::Error, + }, + + #[snafu(display("Failed to create catalog list, source: {}", source))] NewCatalog { #[snafu(backtrace)] source: catalog::error::Error, }, - #[snafu(display("Fail to create table: {}, {}", table_name, source))] + #[snafu(display("Failed to create table: {}, source: {}", table_name, source))] CreateTable { table_name: String, #[snafu(backtrace)] source: TableError, }, - #[snafu(display("Fail to get table: {}, {}", table_name, source))] + #[snafu(display("Failed to get table: {}, source: {}", table_name, source))] GetTable { table_name: String, #[snafu(backtrace)] @@ -53,7 +59,7 @@ pub enum Error { ))] ColumnValuesNumberMismatch { columns: usize, values: usize }, - #[snafu(display("Fail to parse value: {}, {}", msg, backtrace))] + #[snafu(display("Failed to parse value: {}, {}", msg, backtrace))] ParseSqlValue { msg: String, backtrace: Backtrace }, #[snafu(display( @@ -68,7 +74,7 @@ pub enum Error { actual: ConcreteDataType, }, - #[snafu(display("Fail to insert value to table: {}, {}", table_name, source))] + #[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))] Insert { table_name: String, source: TableError, @@ -77,7 +83,7 @@ pub enum Error { #[snafu(display("Illegal insert data"))] IllegalInsertData, - #[snafu(display("Fail to convert bytes to insert batch, {}", source))] + #[snafu(display("Failed to convert bytes to insert batch, source: {}", source))] DecodeInsert { source: DecodeError }, #[snafu(display("Failed to start server, source: {}", source))] @@ -86,19 +92,19 @@ pub enum Error { source: servers::error::Error, }, - #[snafu(display("Fail to parse address {}, source: {}", addr, source))] + #[snafu(display("Failed to parse address {}, source: {}", addr, source))] ParseAddr { addr: String, source: std::net::AddrParseError, }, - #[snafu(display("Fail to bind address {}, source: {}", addr, source))] + #[snafu(display("Failed to bind address {}, source: {}", addr, source))] TcpBind { addr: String, source: std::io::Error, }, - #[snafu(display("Fail to start gRPC server, source: {}", source))] + #[snafu(display("Failed to start gRPC server, source: {}", source))] StartGrpc { source: tonic::transport::Error }, #[snafu(display("Failed to create directory {}, source: {}", dir, source))] @@ -132,12 +138,18 @@ pub enum Error { #[snafu(display("Invalid CREATE TABLE sql statement, cause: {}", msg))] InvalidCreateTableSql { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to create schema when creating table: {}", source))] + #[snafu(display("Failed to create schema when creating table, source: {}", source))] CreateSchema { #[snafu(backtrace)] source: datatypes::error::Error, }, + #[snafu(display("Failed to convert datafusion schema, source: {}", source))] + ConvertSchema { + #[snafu(backtrace)] + source: datatypes::error::Error, + }, + #[snafu(display("SQL data type not supported yet: {:?}", t))] SqlTypeNotSupported { t: sql::ast::DataType, @@ -156,11 +168,17 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to insert into system catalog table: {}", source))] + #[snafu(display("Failed to insert into system catalog table, source: {}", source))] InsertSystemCatalog { #[snafu(backtrace)] source: catalog::error::Error, }, + + #[snafu(display("Failed to decode as physical plan, source: {}", source))] + IntoPhysicalPlan { + #[snafu(backtrace)] + source: common_grpc::Error, + }, } pub type Result = std::result::Result; @@ -169,10 +187,12 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::ExecuteSql { source } => source.status_code(), + Error::ExecutePhysicalPlan { source } => source.status_code(), Error::NewCatalog { source } => source.status_code(), Error::CreateTable { source, .. } => source.status_code(), Error::GetTable { source, .. } => source.status_code(), Error::Insert { source, .. } => source.status_code(), + Error::ConvertSchema { source, .. } => source.status_code(), Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, Error::ColumnValuesNumberMismatch { .. } @@ -193,6 +213,7 @@ impl ErrorExt for Error { | Error::CreateDir { .. } | Error::InsertSystemCatalog { .. } | Error::Conversion { .. } + | Error::IntoPhysicalPlan { .. } | Error::UnsupportedExpr { .. } => StatusCode::Internal, Error::InitBackend { .. } => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index a5f4e4ed75..f2963584d2 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -20,10 +20,12 @@ use table_engine::engine::MitoEngine; use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; use crate::error::{ self, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu, + UnsupportedExprSnafu, }; use crate::metric; use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder}; use crate::server::grpc::insert::insertion_expr_to_request; +use crate::server::grpc::plan::PhysicalPlanner; use crate::server::grpc::select::to_object_result; use crate::sql::{SqlHandler, SqlRequest}; @@ -36,6 +38,7 @@ pub struct Instance { sql_handler: SqlHandler, // Catalog list catalog_manager: CatalogManagerRef, + physical_planner: PhysicalPlanner, } pub type InstanceRef = Arc; @@ -63,9 +66,10 @@ impl Instance { let query_engine = factory.query_engine().clone(); Ok(Self { - query_engine, + query_engine: query_engine.clone(), sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()), catalog_manager, + physical_planner: PhysicalPlanner::new(query_engine), }) } @@ -167,12 +171,23 @@ impl Instance { } async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult { - match select_expr.expr { - Some(select_expr::Expr::Sql(sql)) => { - let result = self.execute_sql(&sql).await; - to_object_result(result).await + let result = self.do_handle_select(select_expr).await; + to_object_result(result).await + } + + async fn do_handle_select(&self, select_expr: SelectExpr) -> Result { + let expr = select_expr.expr; + match expr { + Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await, + Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => { + self.physical_planner + .execute(PhysicalPlanner::parse(plan)?, original_ql) + .await } - None => ObjectResult::default(), + _ => UnsupportedExprSnafu { + name: format!("{:?}", expr), + } + .fail(), } } diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 285516957a..bb48357e44 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -1,3 +1,4 @@ pub(crate) mod handler; -pub mod insert; -pub mod select; +pub(crate) mod insert; +pub(crate) mod plan; +pub(crate) mod select; diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index 854899580f..3067f5ec40 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -1,5 +1,6 @@ use std::{ collections::{hash_map::Entry, HashMap}, + ops::Deref, sync::Arc, }; @@ -68,7 +69,7 @@ fn insert_batches(bytes_vec: Vec>) -> Result> { let mut insert_batches = Vec::with_capacity(bytes_vec.len()); for bytes in bytes_vec { - insert_batches.push(bytes.try_into().context(DecodeInsertSnafu)?); + insert_batches.push(bytes.deref().try_into().context(DecodeInsertSnafu)?); } Ok(insert_batches) } diff --git a/src/datanode/src/server/grpc/plan.rs b/src/datanode/src/server/grpc/plan.rs new file mode 100644 index 0000000000..2c6713c7b7 --- /dev/null +++ b/src/datanode/src/server/grpc/plan.rs @@ -0,0 +1,44 @@ +use std::sync::Arc; + +use common_grpc::AsExcutionPlan; +use common_grpc::DefaultAsPlanImpl; +use datatypes::schema::Schema; +use query::PhysicalPlanAdapter; +use query::{plan::PhysicalPlan, Output, QueryEngineRef}; +use snafu::ResultExt; + +use crate::error::Result; +use crate::error::{ConvertSchemaSnafu, ExecutePhysicalPlanSnafu, IntoPhysicalPlanSnafu}; + +pub type PhysicalPlanRef = Arc; + +pub struct PhysicalPlanner { + query_engine: QueryEngineRef, +} + +impl PhysicalPlanner { + pub fn new(query_engine: QueryEngineRef) -> Self { + Self { query_engine } + } + + pub fn parse(bytes: Vec) -> Result { + let physical_plan = DefaultAsPlanImpl { bytes } + .try_into_physical_plan() + .context(IntoPhysicalPlanSnafu)?; + + let schema: Arc = Arc::new( + physical_plan + .schema() + .try_into() + .context(ConvertSchemaSnafu)?, + ); + Ok(Arc::new(PhysicalPlanAdapter::new(schema, physical_plan))) + } + + pub async fn execute(&self, plan: PhysicalPlanRef, _original_ql: Vec) -> Result { + self.query_engine + .execute_physical(&plan) + .await + .context(ExecutePhysicalPlanSnafu) + } +} diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 83c9ca6193..1d3d180c09 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use api::v1::{codec::InsertBatch, column, select_expr, Column, SelectExpr}; +use api::v1::{codec::InsertBatch, column, Column}; use client::{Client, Database, ObjectResult}; use servers::grpc::GrpcServer; use servers::server::Server; @@ -86,10 +86,10 @@ async fn test_insert_and_select() { assert!(result.is_ok()); // select - let select_expr = SelectExpr { - expr: Some(select_expr::Expr::Sql("select * from demo".to_string())), - }; - let result = db.select(select_expr).await.unwrap(); + let result = db + .select(client::Select::Sql("select * from demo".to_string())) + .await + .unwrap(); assert!(matches!(result, ObjectResult::Select(_))); match result { ObjectResult::Select(select_result) => { diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index dda290af72..6e4248bed9 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -2,7 +2,7 @@ mod catalog_adapter; mod error; -mod plan_adapter; +pub mod plan_adapter; mod planner; use std::sync::Arc; @@ -84,6 +84,11 @@ impl QueryEngine for DatafusionQueryEngine { )) } + async fn execute_physical(&self, plan: &Arc) -> Result { + let ctx = QueryContext::new(self.state.clone()); + Ok(Output::RecordBatch(self.execute_stream(&ctx, plan).await?)) + } + fn register_udf(&self, udf: ScalarUdf) { self.state.register_udf(udf); } diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 5af13ae1b3..e7598f4bdc 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -11,6 +11,7 @@ pub mod plan; pub mod planner; pub mod query_engine; +pub use crate::datafusion::plan_adapter::PhysicalPlanAdapter; pub use crate::query_engine::{ Output, QueryContext, QueryEngine, QueryEngineFactory, QueryEngineRef, }; diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 7d0e0dd0a8..56acb830f0 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -12,7 +12,7 @@ use sql::statements::statement::Statement; use crate::datafusion::DatafusionQueryEngine; use crate::error::Result; -use crate::plan::LogicalPlan; +use crate::plan::{LogicalPlan, PhysicalPlan}; pub use crate::query_engine::context::QueryContext; pub use crate::query_engine::state::QueryEngineState; @@ -34,6 +34,8 @@ pub trait QueryEngine: Send + Sync { async fn execute(&self, plan: &LogicalPlan) -> Result; + async fn execute_physical(&self, plan: &Arc) -> Result; + fn register_udf(&self, udf: ScalarUdf); fn register_aggregate_function(&self, func: AggregateFunctionMetaRef);