From 90c832b33ddfda7a2fafaa80bdb33d9a3dcb4687 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 6 Dec 2022 19:23:32 +0800 Subject: [PATCH] refactor: drop support of physical plan query interface (#714) * refactor: drop support of physical plan query interface Signed-off-by: Ruihang Xia * refactor: collapse server/grpc sub-module Signed-off-by: Ruihang Xia * refactor: remove unused errors Signed-off-by: Ruihang Xia Signed-off-by: Ruihang Xia --- src/api/build.rs | 1 - src/api/greptime/v1/database.proto | 6 - src/api/greptime/v1/physical_plan.proto | 33 --- src/api/src/serde.rs | 3 +- src/client/examples/physical.rs | 51 ---- src/client/src/database.rs | 27 +-- src/common/grpc/src/error.rs | 110 +-------- src/common/grpc/src/lib.rs | 3 - src/common/grpc/src/physical.rs | 33 --- src/common/grpc/src/physical/expr.rs | 100 -------- src/common/grpc/src/physical/plan.rs | 280 --------------------- src/datanode/src/instance.rs | 3 - src/datanode/src/instance/grpc.rs | 6 - src/datanode/src/mock.rs | 4 - src/datanode/src/server/grpc.rs | 298 ++++++++++++++++++++++- src/datanode/src/server/grpc/ddl.rs | 310 ------------------------ src/datanode/src/server/grpc/plan.rs | 55 ----- 17 files changed, 301 insertions(+), 1022 deletions(-) delete mode 100644 src/api/greptime/v1/physical_plan.proto delete mode 100644 src/client/examples/physical.rs delete mode 100644 src/common/grpc/src/physical.rs delete mode 100644 src/common/grpc/src/physical/expr.rs delete mode 100644 src/common/grpc/src/physical/plan.rs delete mode 100644 src/datanode/src/server/grpc/ddl.rs delete mode 100644 src/datanode/src/server/grpc/plan.rs diff --git a/src/api/build.rs b/src/api/build.rs index f3ff5f6600..3a3008c481 100644 --- a/src/api/build.rs +++ b/src/api/build.rs @@ -21,7 +21,6 @@ fn main() { .compile( &[ "greptime/v1/select.proto", - "greptime/v1/physical_plan.proto", "greptime/v1/greptime.proto", "greptime/v1/meta/common.proto", "greptime/v1/meta/heartbeat.proto", diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 1cd6a6ee3e..6b65f6386e 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -29,15 +29,9 @@ message SelectExpr { oneof expr { string sql = 1; bytes logical_plan = 2; - PhysicalPlan physical_plan = 15; } } -message PhysicalPlan { - bytes original_ql = 1; - bytes plan = 2; -} - message InsertExpr { string schema_name = 1; string table_name = 2; diff --git a/src/api/greptime/v1/physical_plan.proto b/src/api/greptime/v1/physical_plan.proto deleted file mode 100644 index 58444a5af4..0000000000 --- a/src/api/greptime/v1/physical_plan.proto +++ /dev/null @@ -1,33 +0,0 @@ -syntax = "proto3"; - -package greptime.v1.codec; - -message PhysicalPlanNode { - oneof PhysicalPlanType { - ProjectionExecNode projection = 1; - MockInputExecNode mock = 99; - // TODO(fys): impl other physical plan node - } -} - -message ProjectionExecNode { - PhysicalPlanNode input = 1; - repeated PhysicalExprNode expr = 2; - repeated string expr_name = 3; -} - -message PhysicalExprNode { - oneof ExprType { - PhysicalColumn column = 1; - // TODO(fys): impl other physical expr node - } -} - -message PhysicalColumn { - string name = 1; - uint64 index = 2; -} - -message MockInputExecNode { - string name = 1; -} diff --git a/src/api/src/serde.rs b/src/api/src/serde.rs index 1523bfbcfe..18dd19b5fa 100644 --- a/src/api/src/serde.rs +++ b/src/api/src/serde.rs @@ -15,7 +15,7 @@ pub use prost::DecodeError; use prost::Message; -use crate::v1::codec::{PhysicalPlanNode, 
SelectResult}; +use crate::v1::codec::SelectResult; use crate::v1::meta::TableRouteValue; macro_rules! impl_convert_with_bytes { @@ -37,7 +37,6 @@ macro_rules! impl_convert_with_bytes { } impl_convert_with_bytes!(SelectResult); -impl_convert_with_bytes!(PhysicalPlanNode); impl_convert_with_bytes!(TableRouteValue); #[cfg(test)] diff --git a/src/client/examples/physical.rs b/src/client/examples/physical.rs deleted file mode 100644 index 4f60fc7c43..0000000000 --- a/src/client/examples/physical.rs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use client::{Client, Database}; -use common_grpc::MockExecution; -use datafusion::physical_plan::expressions::Column; -use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr}; -use tracing::{event, Level}; - -fn main() { - tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) - .unwrap(); - - run(); -} - -#[tokio::main] -async fn run() { - let client = Client::with_urls(vec!["127.0.0.1:3001"]); - let db = Database::new("greptime", client); - - let physical = mock_physical_plan(); - let result = db.physical_plan(physical, None).await; - - event!(Level::INFO, "result: {:#?}", result); -} - -fn mock_physical_plan() -> Arc { - let id_expr = Arc::new(Column::new("id", 0)) as Arc; - let age_expr = Arc::new(Column::new("age", 2)) as Arc; - let expr = vec![(id_expr, "id".to_string()), (age_expr, "age".to_string())]; - - let input = - Arc::new(MockExecution::new("mock_input_exec".to_string())) as Arc; - let projection = ProjectionExec::try_new(expr, input).unwrap(); - Arc::new(projection) -} diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 3228a74cf8..54ab889bf5 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -18,22 +18,17 @@ use api::v1::codec::SelectResult as GrpcSelectResult; use api::v1::column::SemanticType; use api::v1::{ object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr, - MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan, - SelectExpr, + MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, SelectExpr, }; use common_error::status_code::StatusCode; -use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl}; use common_grpc_expr::column_to_vector; use common_query::Output; use common_recordbatch::{RecordBatch, RecordBatches}; -use datafusion::physical_plan::ExecutionPlan; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{ - ColumnToVectorSnafu, ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu, -}; +use crate::error::{ColumnToVectorSnafu, ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu}; use crate::{error, Client, Result}; pub const PROTOCOL_VERSION: u32 = 1; @@ -94,24 +89,6 @@ impl Database { 
self.do_select(select_expr).await } - pub async fn physical_plan( - &self, - physical: Arc, - original_ql: Option, - ) -> Result { - let plan = DefaultAsPlanImpl::try_from_physical_plan(physical.clone()) - .context(EncodePhysicalSnafu { physical })? - .bytes; - let original_ql = original_ql.unwrap_or_default(); - let select_expr = SelectExpr { - expr: Some(select_expr::Expr::PhysicalPlan(PhysicalPlan { - original_ql: original_ql.into_bytes(), - plan, - })), - }; - self.do_select(select_expr).await - } - pub async fn logical_plan(&self, logical_plan: Vec) -> Result { let select_expr = SelectExpr { expr: Some(select_expr::Expr::LogicalPlan(logical_plan)), diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs index bfc326e597..05c1b37d56 100644 --- a/src/common/grpc/src/error.rs +++ b/src/common/grpc/src/error.rs @@ -14,9 +14,7 @@ use std::any::Any; -use api::DecodeError; use common_error::prelude::{ErrorExt, StatusCode}; -use datafusion::error::DataFusionError; use snafu::{Backtrace, ErrorCompat, Snafu}; pub type Result = std::result::Result; @@ -24,33 +22,9 @@ pub type Result = std::result::Result; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Unexpected empty physical plan type: {}", name))] - EmptyPhysicalPlan { name: String, backtrace: Backtrace }, - - #[snafu(display("Unexpected empty physical expr: {}", name))] - EmptyPhysicalExpr { name: String, backtrace: Backtrace }, - - #[snafu(display("Unsupported datafusion execution plan: {}", name))] - UnsupportedDfPlan { name: String, backtrace: Backtrace }, - - #[snafu(display("Unsupported datafusion physical expr: {}", name))] - UnsupportedDfExpr { name: String, backtrace: Backtrace }, - #[snafu(display("Missing required field in protobuf, field: {}", field))] MissingField { field: String, backtrace: Backtrace }, - #[snafu(display("Failed to new datafusion projection exec, source: {}", source))] - NewProjection { - source: DataFusionError, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to decode physical plan node, source: {}", source))] - DecodePhysicalPlanNode { - source: DecodeError, - backtrace: Backtrace, - }, - #[snafu(display( "Write type mismatch, column name: {}, expected: {}, actual: {}", column_name, @@ -89,17 +63,8 @@ pub enum Error { impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::EmptyPhysicalPlan { .. } - | Error::EmptyPhysicalExpr { .. } - | Error::MissingField { .. } - | Error::TypeMismatch { .. } => StatusCode::InvalidArguments, - Error::UnsupportedDfPlan { .. } | Error::UnsupportedDfExpr { .. } => { - StatusCode::Unsupported - } - Error::NewProjection { .. } - | Error::DecodePhysicalPlanNode { .. } - | Error::CreateChannel { .. } - | Error::Conversion { .. } => StatusCode::Internal, + Error::MissingField { .. } | Error::TypeMismatch { .. } => StatusCode::InvalidArguments, + Error::CreateChannel { .. } | Error::Conversion { .. 
} => StatusCode::Internal, Error::CollectRecordBatches { source } => source.status_code(), Error::ColumnDataType { source } => source.status_code(), } @@ -126,50 +91,6 @@ mod tests { None } - #[test] - fn test_empty_physical_plan_error() { - let e = throw_none_option() - .context(EmptyPhysicalPlanSnafu { name: "test" }) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::InvalidArguments); - } - - #[test] - fn test_empty_physical_expr_error() { - let e = throw_none_option() - .context(EmptyPhysicalExprSnafu { name: "test" }) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::InvalidArguments); - } - - #[test] - fn test_unsupported_df_plan_error() { - let e = throw_none_option() - .context(UnsupportedDfPlanSnafu { name: "test" }) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::Unsupported); - } - - #[test] - fn test_unsupported_df_expr_error() { - let e = throw_none_option() - .context(UnsupportedDfExprSnafu { name: "test" }) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::Unsupported); - } - #[test] fn test_missing_field_error() { let e = throw_none_option() @@ -181,33 +102,6 @@ mod tests { assert_eq!(e.status_code(), StatusCode::InvalidArguments); } - #[test] - fn test_new_projection_error() { - fn throw_df_error() -> StdResult { - Err(DataFusionError::NotImplemented("".to_string())) - } - - let e = throw_df_error().context(NewProjectionSnafu).err().unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::Internal); - } - - #[test] - fn test_decode_physical_plan_node_error() { - fn throw_decode_error() -> StdResult { - Err(DecodeError::new("test")) - } - - let e = throw_decode_error() - .context(DecodePhysicalPlanNodeSnafu) - .err() - .unwrap(); - - assert!(e.backtrace_opt().is_some()); - assert_eq!(e.status_code(), StatusCode::Internal); - } - #[test] fn test_type_mismatch_error() { let e = throw_none_option() diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs index a3d95e0805..9ea0b06cae 100644 --- a/src/common/grpc/src/lib.rs +++ b/src/common/grpc/src/lib.rs @@ -14,10 +14,7 @@ pub mod channel_manager; pub mod error; -pub mod physical; pub mod select; pub mod writer; pub use error::Error; -pub use physical::plan::{DefaultAsPlanImpl, MockExecution}; -pub use physical::AsExecutionPlan; diff --git a/src/common/grpc/src/physical.rs b/src/common/grpc/src/physical.rs deleted file mode 100644 index 40ce20bef6..0000000000 --- a/src/common/grpc/src/physical.rs +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -mod expr; -pub mod plan; - -use std::result::Result; -use std::sync::Arc; - -use datafusion::physical_plan::ExecutionPlan; - -pub type ExecutionPlanRef = Arc; - -pub trait AsExecutionPlan { - type Error: std::error::Error; - - fn try_into_physical_plan(&self) -> Result; - - fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result - where - Self: Sized; -} diff --git a/src/common/grpc/src/physical/expr.rs b/src/common/grpc/src/physical/expr.rs deleted file mode 100644 index f0186d192e..0000000000 --- a/src/common/grpc/src/physical/expr.rs +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::result::Result; -use std::sync::Arc; - -use api::v1::codec; -use datafusion::physical_plan::expressions::Column as DfColumn; -use datafusion::physical_plan::PhysicalExpr as DfPhysicalExpr; -use snafu::OptionExt; - -use crate::error::{EmptyPhysicalExprSnafu, Error, UnsupportedDfExprSnafu}; - -// grpc -> datafusion (physical expr) -pub(crate) fn parse_grpc_physical_expr( - proto: &codec::PhysicalExprNode, -) -> Result, Error> { - let expr_type = proto.expr_type.as_ref().context(EmptyPhysicalExprSnafu { - name: format!("{:?}", proto), - })?; - - // TODO(fys): impl other physical expr - let pexpr: Arc = match expr_type { - codec::physical_expr_node::ExprType::Column(c) => { - let pcol = DfColumn::new(&c.name, c.index as usize); - Arc::new(pcol) - } - }; - Ok(pexpr) -} - -// datafusion -> grpc (physical expr) -pub(crate) fn parse_df_physical_expr( - df_expr: Arc, -) -> Result { - let expr = df_expr.as_any(); - - // TODO(fys): impl other physical expr - if let Some(expr) = expr.downcast_ref::() { - Ok(codec::PhysicalExprNode { - expr_type: Some(codec::physical_expr_node::ExprType::Column( - codec::PhysicalColumn { - name: expr.name().to_string(), - index: expr.index() as u64, - }, - )), - }) - } else { - UnsupportedDfExprSnafu { - name: df_expr.to_string(), - } - .fail()? 
- } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::v1::codec::physical_expr_node::ExprType::Column; - use api::v1::codec::{PhysicalColumn, PhysicalExprNode}; - use datafusion::physical_plan::expressions::Column as DfColumn; - use datafusion::physical_plan::PhysicalExpr; - - use crate::physical::expr::{parse_df_physical_expr, parse_grpc_physical_expr}; - - #[test] - fn test_column_convert() { - // mock df_column_expr - let df_column = DfColumn::new("name", 11); - let df_column_clone = df_column.clone(); - let df_expr = Arc::new(df_column) as Arc; - - // mock grpc_column_expr - let grpc_expr = PhysicalExprNode { - expr_type: Some(Column(PhysicalColumn { - name: "name".to_owned(), - index: 11, - })), - }; - - let result = parse_df_physical_expr(df_expr).unwrap(); - assert_eq!(grpc_expr, result); - - let result = parse_grpc_physical_expr(&grpc_expr).unwrap(); - let df_column = result.as_any().downcast_ref::().unwrap(); - assert_eq!(df_column_clone, df_column.to_owned()); - } -} diff --git a/src/common/grpc/src/physical/plan.rs b/src/common/grpc/src/physical/plan.rs deleted file mode 100644 index 798f31b452..0000000000 --- a/src/common/grpc/src/physical/plan.rs +++ /dev/null @@ -1,280 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::ops::Deref; -use std::result::Result; -use std::sync::Arc; - -use api::v1::codec::physical_plan_node::PhysicalPlanType; -use api::v1::codec::{MockInputExecNode, PhysicalPlanNode, ProjectionExecNode}; -use async_trait::async_trait; -use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::field_util::SchemaExt; -use datafusion::physical_plan::memory::MemoryStream; -use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::{ - ExecutionPlan, PhysicalExpr, SendableRecordBatchStream, Statistics, -}; -use datafusion::record_batch::RecordBatch; -use datatypes::arrow::array::{PrimitiveArray, Utf8Array}; -use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use snafu::{OptionExt, ResultExt}; - -use crate::error::{ - DecodePhysicalPlanNodeSnafu, EmptyPhysicalPlanSnafu, Error, MissingFieldSnafu, - NewProjectionSnafu, UnsupportedDfPlanSnafu, -}; -use crate::physical::{expr, AsExecutionPlan, ExecutionPlanRef}; - -pub struct DefaultAsPlanImpl { - pub bytes: Vec, -} - -impl AsExecutionPlan for DefaultAsPlanImpl { - type Error = Error; - - // Vec -> PhysicalPlanNode -> ExecutionPlanRef - fn try_into_physical_plan(&self) -> Result { - let physicalplan_node: PhysicalPlanNode = self - .bytes - .deref() - .try_into() - .context(DecodePhysicalPlanNodeSnafu)?; - physicalplan_node.try_into_physical_plan() - } - - // ExecutionPlanRef -> PhysicalPlanNode -> Vec - fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result - where - Self: Sized, - { - let bytes: Vec = PhysicalPlanNode::try_from_physical_plan(plan)?.into(); - Ok(DefaultAsPlanImpl { bytes }) - } -} - -impl AsExecutionPlan for PhysicalPlanNode { - type Error = Error; - - fn try_into_physical_plan(&self) -> Result { - let plan = self - .physical_plan_type - .as_ref() - .context(EmptyPhysicalPlanSnafu { - name: format!("{:?}", self), - })?; - - // TODO(fys): impl other physical plan type - match plan { - PhysicalPlanType::Projection(projection) => { - let input = if let Some(input) = &projection.input { - input.as_ref().try_into_physical_plan()? - } else { - MissingFieldSnafu { field: "input" }.fail()? - }; - let exprs = projection - .expr - .iter() - .zip(projection.expr_name.iter()) - .map(|(expr, name)| { - Ok((expr::parse_grpc_physical_expr(expr)?, name.to_string())) - }) - .collect::, String)>, Error>>()?; - - let projection = - ProjectionExec::try_new(exprs, input).context(NewProjectionSnafu)?; - - Ok(Arc::new(projection)) - } - PhysicalPlanType::Mock(mock) => Ok(Arc::new(MockExecution { - name: mock.name.to_string(), - })), - } - } - - fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result - where - Self: Sized, - { - let plan = plan.as_any(); - - if let Some(exec) = plan.downcast_ref::() { - let input = PhysicalPlanNode::try_from_physical_plan(exec.input().to_owned())?; - - let expr = exec - .expr() - .iter() - .map(|expr| expr::parse_df_physical_expr(expr.0.clone())) - .collect::, Error>>()?; - - let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect(); - - Ok(PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::Projection(Box::new( - ProjectionExecNode { - input: Some(Box::new(input)), - expr, - expr_name, - }, - ))), - }) - } else if let Some(exec) = plan.downcast_ref::() { - Ok(PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::Mock(MockInputExecNode { - name: exec.name.clone(), - })), - }) - } else { - UnsupportedDfPlanSnafu { - name: format!("{:?}", plan), - } - .fail()? 
- } - } -} - -// TODO(fys): use "test" feature to enable it -#[derive(Debug)] -pub struct MockExecution { - name: String, -} - -impl MockExecution { - pub fn new(name: String) -> Self { - Self { name } - } -} - -#[async_trait] -impl ExecutionPlan for MockExecution { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn schema(&self) -> SchemaRef { - let field1 = Field::new("id", DataType::UInt32, false); - let field2 = Field::new("name", DataType::Utf8, false); - let field3 = Field::new("age", DataType::UInt32, false); - Arc::new(Schema::new(vec![field1, field2, field3])) - } - - fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { - unimplemented!() - } - - fn output_ordering( - &self, - ) -> Option<&[datafusion::physical_plan::expressions::PhysicalSortExpr]> { - unimplemented!() - } - - fn children(&self) -> Vec { - unimplemented!() - } - - fn with_new_children( - &self, - _children: Vec, - ) -> datafusion::error::Result { - unimplemented!() - } - - async fn execute( - &self, - _partition: usize, - _runtime: Arc, - ) -> datafusion::error::Result { - let id_array = Arc::new(PrimitiveArray::from_slice([1u32, 2, 3, 4, 5])); - let name_array = Arc::new(Utf8Array::::from_slice([ - "zhangsan", "lisi", "wangwu", "Tony", "Mike", - ])); - let age_array = Arc::new(PrimitiveArray::from_slice([25u32, 28, 27, 35, 25])); - let schema = Arc::new(Schema::new(vec![ - Field::new("id", DataType::UInt32, false), - Field::new("name", DataType::Utf8, false), - Field::new("age", DataType::UInt32, false), - ])); - let record_batch = - RecordBatch::try_new(schema, vec![id_array, name_array, age_array]).unwrap(); - let data: Vec = vec![record_batch]; - let projection = Some(vec![0, 1, 2]); - let stream = MemoryStream::try_new(data, self.schema(), projection).unwrap(); - Ok(Box::pin(stream)) - } - - fn statistics(&self) -> Statistics { - todo!() - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::v1::codec::PhysicalPlanNode; - use datafusion::physical_plan::expressions::Column; - use datafusion::physical_plan::projection::ProjectionExec; - - use crate::physical::plan::{DefaultAsPlanImpl, MockExecution}; - use crate::physical::{AsExecutionPlan, ExecutionPlanRef}; - - #[test] - fn test_convert_df_projection_with_bytes() { - let projection_exec = mock_df_projection(); - - let bytes = DefaultAsPlanImpl::try_from_physical_plan(projection_exec).unwrap(); - let exec = bytes.try_into_physical_plan().unwrap(); - - verify_df_projection(exec); - } - - #[test] - fn test_convert_df_with_grpc_projection() { - let projection_exec = mock_df_projection(); - - let projection_node = PhysicalPlanNode::try_from_physical_plan(projection_exec).unwrap(); - let exec = projection_node.try_into_physical_plan().unwrap(); - - verify_df_projection(exec); - } - - fn mock_df_projection() -> Arc { - let mock_input = Arc::new(MockExecution { - name: "mock_input".to_string(), - }); - let column1 = Arc::new(Column::new("id", 0)); - let column2 = Arc::new(Column::new("name", 1)); - Arc::new( - ProjectionExec::try_new( - vec![(column1, "id".to_string()), (column2, "name".to_string())], - mock_input, - ) - .unwrap(), - ) - } - - fn verify_df_projection(exec: ExecutionPlanRef) { - let projection_exec = exec.as_any().downcast_ref::().unwrap(); - let mock_input = projection_exec - .input() - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("mock_input", mock_input.name); - assert_eq!(2, projection_exec.expr().len()); - assert_eq!("id", projection_exec.expr()[0].1); - assert_eq!("name", 
projection_exec.expr()[1].1); - } -} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 27cd13e12e..caaf51b42d 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -45,7 +45,6 @@ use crate::error::{ }; use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; -use crate::server::grpc::plan::PhysicalPlanner; use crate::sql::SqlHandler; mod grpc; @@ -59,7 +58,6 @@ pub struct Instance { pub(crate) query_engine: QueryEngineRef, pub(crate) sql_handler: SqlHandler, pub(crate) catalog_manager: CatalogManagerRef, - pub(crate) physical_planner: PhysicalPlanner, pub(crate) script_executor: ScriptExecutor, pub(crate) table_id_provider: Option, #[allow(unused)] @@ -159,7 +157,6 @@ impl Instance { query_engine.clone(), ), catalog_manager, - physical_planner: PhysicalPlanner::new(query_engine), script_executor, meta_client, heartbeat_task, diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 41998c1590..3f6d531633 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -39,7 +39,6 @@ use crate::error::{ UnsupportedExprSnafu, }; use crate::instance::Instance; -use crate::server::grpc::plan::PhysicalPlanner; impl Instance { pub async fn execute_grpc_insert( @@ -117,11 +116,6 @@ impl Instance { self.execute_sql(&sql, Arc::new(QueryContext::new())).await } Some(select_expr::Expr::LogicalPlan(plan)) => self.execute_logical(plan).await, - Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => { - self.physical_planner - .execute(PhysicalPlanner::parse(plan)?, original_ql) - .await - } _ => UnsupportedExprSnafu { name: format!("{:?}", expr), } diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 73b758cc13..8282169025 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -31,7 +31,6 @@ use crate::error::Result; use crate::heartbeat::HeartbeatTask; use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance}; use crate::script::ScriptExecutor; -use crate::server::grpc::plan::PhysicalPlanner; use crate::sql::SqlHandler; impl Instance { @@ -63,7 +62,6 @@ impl Instance { catalog_manager.clone(), query_engine.clone(), ); - let physical_planner = PhysicalPlanner::new(query_engine.clone()); let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()) .await .unwrap(); @@ -79,7 +77,6 @@ impl Instance { query_engine, sql_handler, catalog_manager, - physical_planner, script_executor, meta_client, heartbeat_task, @@ -133,7 +130,6 @@ impl Instance { query_engine.clone(), ), catalog_manager, - physical_planner: PhysicalPlanner::new(query_engine), script_executor, table_id_provider: Some(Arc::new(LocalTableIdProvider::default())), meta_client: Some(meta_client), diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 1a75f4c571..26108eb020 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -12,5 +12,299 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-mod ddl; -pub(crate) mod plan; +use std::sync::Arc; + +use api::result::AdminResultBuilder; +use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr}; +use common_error::prelude::{ErrorExt, StatusCode}; +use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; +use common_query::Output; +use common_telemetry::{error, info}; +use futures::TryFutureExt; +use session::context::QueryContext; +use snafu::prelude::*; +use table::requests::DropTableRequest; + +use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu}; +use crate::instance::Instance; +use crate::sql::SqlRequest; + +impl Instance { + /// Handle gRPC create table requests. + pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult { + // Respect CreateExpr's table id and region ids if present, or allocate table id + // from local table id provider and set region id to 0. + let table_id = if let Some(table_id) = expr.table_id { + info!( + "Creating table {:?}.{:?}.{:?} with table id from frontend: {}", + expr.catalog_name, expr.schema_name, expr.table_name, table_id + ); + table_id + } else { + match self.table_id_provider.as_ref() { + None => { + return AdminResultBuilder::default() + .status_code(StatusCode::Internal as u32) + .err_msg("Table id provider absent in standalone mode".to_string()) + .build(); + } + Some(table_id_provider) => { + match table_id_provider + .next_table_id() + .await + .context(BumpTableIdSnafu) + { + Ok(table_id) => { + info!( + "Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}", + &expr.catalog_name, &expr.schema_name, expr.table_name, table_id + ); + table_id + } + Err(e) => { + error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name); + return AdminResultBuilder::default() + .status_code(e.status_code() as u32) + .err_msg(e.to_string()) + .build(); + } + } + } + } + }; + + let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu); + let result = futures::future::ready(request) + .and_then(|request| { + self.sql_handler().execute( + SqlRequest::CreateTable(request), + Arc::new(QueryContext::new()), + ) + }) + .await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + // Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug. 
+ Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } + + pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult { + let request = match alter_expr_to_request(expr) + .context(AlterExprToRequestSnafu) + .transpose() + { + None => { + return AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(0, 0) + .build() + } + Some(req) => req, + }; + + let result = futures::future::ready(request) + .and_then(|request| { + self.sql_handler() + .execute(SqlRequest::Alter(request), Arc::new(QueryContext::new())) + }) + .await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as u32, 0) + .build(), + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } + + pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult { + let req = DropTableRequest { + catalog_name: expr.catalog_name, + schema_name: expr.schema_name, + table_name: expr.table_name, + }; + let result = self + .sql_handler() + .execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new())) + .await; + match result { + Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() + .status_code(StatusCode::Success as u32) + .mutate_result(rows as _, 0) + .build(), + Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), + Err(err) => AdminResultBuilder::default() + .status_code(err.status_code() as u32) + .err_msg(err.to_string()) + .build(), + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::{ColumnDataType, ColumnDef}; + use common_catalog::consts::MIN_USER_TABLE_ID; + use common_grpc_expr::create_table_schema; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef}; + use datatypes::value::Value; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn test_create_expr_to_request() { + common_telemetry::init_default_ut_logging(); + let expr = testing_create_expr(); + let request = create_expr_to_request(1024, expr).unwrap(); + assert_eq!(request.id, MIN_USER_TABLE_ID); + assert_eq!(request.catalog_name, "greptime".to_string()); + assert_eq!(request.schema_name, "public".to_string()); + assert_eq!(request.table_name, "my-metrics"); + assert_eq!(request.desc, Some("blabla".to_string())); + assert_eq!(request.schema, expected_table_schema()); + assert_eq!(request.primary_key_indices, vec![1, 0]); + assert!(request.create_if_not_exists); + + let mut expr = testing_create_expr(); + expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; + let result = create_expr_to_request(1025, expr); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"), + "{}", + err_msg + ); + } + + #[test] + fn test_create_table_schema() { + let mut expr = testing_create_expr(); + let schema = create_table_schema(&expr).unwrap(); + assert_eq!(schema, expected_table_schema()); + + expr.time_index = "not-exist-column".to_string(); + let result = create_table_schema(&expr); + let err_msg = result.unwrap_err().to_string(); + assert!( + 
err_msg.contains("Missing timestamp column"), + "actual: {}", + err_msg + ); + } + + #[test] + + fn test_create_column_schema() { + let column_def = ColumnDef { + name: "a".to_string(), + datatype: 1024, + is_nullable: true, + default_constraint: None, + }; + let result = column_def.try_as_column_schema(); + assert!(matches!( + result.unwrap_err(), + api::error::Error::UnknownColumnDataType { .. } + )); + + let column_def = ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: None, + }; + let column_schema = column_def.try_as_column_schema().unwrap(); + assert_eq!(column_schema.name, "a"); + assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); + assert!(column_schema.is_nullable()); + + let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value")); + let column_def = ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: Some(default_constraint.clone().try_into().unwrap()), + }; + let column_schema = column_def.try_as_column_schema().unwrap(); + assert_eq!(column_schema.name, "a"); + assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); + assert!(column_schema.is_nullable()); + assert_eq!( + default_constraint, + *column_schema.default_constraint().unwrap() + ); + } + + fn testing_create_expr() -> CreateExpr { + let column_defs = vec![ + ColumnDef { + name: "host".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: false, + default_constraint: None, + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::Timestamp as i32, + is_nullable: false, + default_constraint: None, + }, + ColumnDef { + name: "cpu".to_string(), + datatype: ColumnDataType::Float32 as i32, + is_nullable: true, + default_constraint: None, + }, + ColumnDef { + name: "memory".to_string(), + datatype: ColumnDataType::Float64 as i32, + is_nullable: true, + default_constraint: None, + }, + ]; + CreateExpr { + catalog_name: None, + schema_name: None, + table_name: "my-metrics".to_string(), + desc: Some("blabla".to_string()), + column_defs, + time_index: "ts".to_string(), + primary_keys: vec!["ts".to_string(), "host".to_string()], + create_if_not_exists: true, + table_options: Default::default(), + table_id: Some(MIN_USER_TABLE_ID), + region_ids: vec![0], + } + } + + fn expected_table_schema() -> SchemaRef { + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false) + .with_time_index(true), + ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ]; + Arc::new( + SchemaBuilder::try_from(column_schemas) + .unwrap() + .build() + .unwrap(), + ) + } +} diff --git a/src/datanode/src/server/grpc/ddl.rs b/src/datanode/src/server/grpc/ddl.rs deleted file mode 100644 index 26108eb020..0000000000 --- a/src/datanode/src/server/grpc/ddl.rs +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use api::result::AdminResultBuilder; -use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr}; -use common_error::prelude::{ErrorExt, StatusCode}; -use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; -use common_query::Output; -use common_telemetry::{error, info}; -use futures::TryFutureExt; -use session::context::QueryContext; -use snafu::prelude::*; -use table::requests::DropTableRequest; - -use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu}; -use crate::instance::Instance; -use crate::sql::SqlRequest; - -impl Instance { - /// Handle gRPC create table requests. - pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult { - // Respect CreateExpr's table id and region ids if present, or allocate table id - // from local table id provider and set region id to 0. - let table_id = if let Some(table_id) = expr.table_id { - info!( - "Creating table {:?}.{:?}.{:?} with table id from frontend: {}", - expr.catalog_name, expr.schema_name, expr.table_name, table_id - ); - table_id - } else { - match self.table_id_provider.as_ref() { - None => { - return AdminResultBuilder::default() - .status_code(StatusCode::Internal as u32) - .err_msg("Table id provider absent in standalone mode".to_string()) - .build(); - } - Some(table_id_provider) => { - match table_id_provider - .next_table_id() - .await - .context(BumpTableIdSnafu) - { - Ok(table_id) => { - info!( - "Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}", - &expr.catalog_name, &expr.schema_name, expr.table_name, table_id - ); - table_id - } - Err(e) => { - error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name); - return AdminResultBuilder::default() - .status_code(e.status_code() as u32) - .err_msg(e.to_string()) - .build(); - } - } - } - } - }; - - let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu); - let result = futures::future::ready(request) - .and_then(|request| { - self.sql_handler().execute( - SqlRequest::CreateTable(request), - Arc::new(QueryContext::new()), - ) - }) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - // Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug. 
- Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } - } - - pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult { - let request = match alter_expr_to_request(expr) - .context(AlterExprToRequestSnafu) - .transpose() - { - None => { - return AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(0, 0) - .build() - } - Some(req) => req, - }; - - let result = futures::future::ready(request) - .and_then(|request| { - self.sql_handler() - .execute(SqlRequest::Alter(request), Arc::new(QueryContext::new())) - }) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } - } - - pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult { - let req = DropTableRequest { - catalog_name: expr.catalog_name, - schema_name: expr.schema_name, - table_name: expr.table_name, - }; - let result = self - .sql_handler() - .execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new())) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as _, 0) - .build(), - Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use api::v1::{ColumnDataType, ColumnDef}; - use common_catalog::consts::MIN_USER_TABLE_ID; - use common_grpc_expr::create_table_schema; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef}; - use datatypes::value::Value; - - use super::*; - - #[tokio::test(flavor = "multi_thread")] - async fn test_create_expr_to_request() { - common_telemetry::init_default_ut_logging(); - let expr = testing_create_expr(); - let request = create_expr_to_request(1024, expr).unwrap(); - assert_eq!(request.id, MIN_USER_TABLE_ID); - assert_eq!(request.catalog_name, "greptime".to_string()); - assert_eq!(request.schema_name, "public".to_string()); - assert_eq!(request.table_name, "my-metrics"); - assert_eq!(request.desc, Some("blabla".to_string())); - assert_eq!(request.schema, expected_table_schema()); - assert_eq!(request.primary_key_indices, vec![1, 0]); - assert!(request.create_if_not_exists); - - let mut expr = testing_create_expr(); - expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()]; - let result = create_expr_to_request(1025, expr); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"), - "{}", - err_msg - ); - } - - #[test] - fn test_create_table_schema() { - let mut expr = testing_create_expr(); - let schema = create_table_schema(&expr).unwrap(); - assert_eq!(schema, expected_table_schema()); - - expr.time_index = "not-exist-column".to_string(); - let result = create_table_schema(&expr); - let err_msg = result.unwrap_err().to_string(); - assert!( - 
err_msg.contains("Missing timestamp column"), - "actual: {}", - err_msg - ); - } - - #[test] - - fn test_create_column_schema() { - let column_def = ColumnDef { - name: "a".to_string(), - datatype: 1024, - is_nullable: true, - default_constraint: None, - }; - let result = column_def.try_as_column_schema(); - assert!(matches!( - result.unwrap_err(), - api::error::Error::UnknownColumnDataType { .. } - )); - - let column_def = ColumnDef { - name: "a".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: None, - }; - let column_schema = column_def.try_as_column_schema().unwrap(); - assert_eq!(column_schema.name, "a"); - assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); - assert!(column_schema.is_nullable()); - - let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value")); - let column_def = ColumnDef { - name: "a".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: Some(default_constraint.clone().try_into().unwrap()), - }; - let column_schema = column_def.try_as_column_schema().unwrap(); - assert_eq!(column_schema.name, "a"); - assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype()); - assert!(column_schema.is_nullable()); - assert_eq!( - default_constraint, - *column_schema.default_constraint().unwrap() - ); - } - - fn testing_create_expr() -> CreateExpr { - let column_defs = vec![ - ColumnDef { - name: "host".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: false, - default_constraint: None, - }, - ColumnDef { - name: "ts".to_string(), - datatype: ColumnDataType::Timestamp as i32, - is_nullable: false, - default_constraint: None, - }, - ColumnDef { - name: "cpu".to_string(), - datatype: ColumnDataType::Float32 as i32, - is_nullable: true, - default_constraint: None, - }, - ColumnDef { - name: "memory".to_string(), - datatype: ColumnDataType::Float64 as i32, - is_nullable: true, - default_constraint: None, - }, - ]; - CreateExpr { - catalog_name: None, - schema_name: None, - table_name: "my-metrics".to_string(), - desc: Some("blabla".to_string()), - column_defs, - time_index: "ts".to_string(), - primary_keys: vec!["ts".to_string(), "host".to_string()], - create_if_not_exists: true, - table_options: Default::default(), - table_id: Some(MIN_USER_TABLE_ID), - region_ids: vec![0], - } - } - - fn expected_table_schema() -> SchemaRef { - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false) - .with_time_index(true), - ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ]; - Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .unwrap(), - ) - } -} diff --git a/src/datanode/src/server/grpc/plan.rs b/src/datanode/src/server/grpc/plan.rs deleted file mode 100644 index 5c228852f2..0000000000 --- a/src/datanode/src/server/grpc/plan.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl}; -use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef}; -use common_query::Output; -use datatypes::schema::Schema; -use query::QueryEngineRef; -use snafu::ResultExt; - -use crate::error::{ConvertSchemaSnafu, ExecutePhysicalPlanSnafu, IntoPhysicalPlanSnafu, Result}; - -pub struct PhysicalPlanner { - query_engine: QueryEngineRef, -} - -impl PhysicalPlanner { - pub fn new(query_engine: QueryEngineRef) -> Self { - Self { query_engine } - } - - pub fn parse(bytes: Vec) -> Result { - let physical_plan = DefaultAsPlanImpl { bytes } - .try_into_physical_plan() - .context(IntoPhysicalPlanSnafu)?; - - let schema: Arc = Arc::new( - physical_plan - .schema() - .try_into() - .context(ConvertSchemaSnafu)?, - ); - Ok(Arc::new(PhysicalPlanAdapter::new(schema, physical_plan))) - } - - pub async fn execute(&self, plan: PhysicalPlanRef, _original_ql: Vec) -> Result { - self.query_engine - .execute_physical(&plan) - .await - .context(ExecutePhysicalPlanSnafu) - } -}
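With the physical-plan variant removed, `SelectExpr` (greptime/v1/database.proto) carries only the `sql` and `logical_plan` cases, and the datanode dispatches them to `Instance::execute_sql` and `Instance::execute_logical` respectively. Below is a minimal sketch of the two surviving client-side constructions, assuming the prost-generated `api::v1::{select_expr, SelectExpr}` types from the proto above; `my_table` and the plan bytes are illustrative placeholders, not part of this patch.

use api::v1::{select_expr, SelectExpr};

// Plain SQL text: routed to `Instance::execute_sql` on the datanode.
fn sql_select_expr() -> SelectExpr {
    SelectExpr {
        expr: Some(select_expr::Expr::Sql("SELECT * FROM my_table".to_string())),
    }
}

// An already-encoded logical plan: routed to `Instance::execute_logical`.
fn logical_plan_select_expr(encoded_plan: Vec<u8>) -> SelectExpr {
    SelectExpr {
        expr: Some(select_expr::Expr::LogicalPlan(encoded_plan)),
    }
}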