refactor: drop support of physical plan query interface (#714)

* refactor: drop support of physical plan query interface

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: collapse server/grpc sub-module

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: remove unused errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-12-06 19:23:32 +08:00
committed by GitHub
parent 8959dbcef8
commit 90c832b33d
17 changed files with 301 additions and 1022 deletions

View File

@@ -21,7 +21,6 @@ fn main() {
.compile(
&[
"greptime/v1/select.proto",
"greptime/v1/physical_plan.proto",
"greptime/v1/greptime.proto",
"greptime/v1/meta/common.proto",
"greptime/v1/meta/heartbeat.proto",

View File

@@ -29,15 +29,9 @@ message SelectExpr {
oneof expr {
string sql = 1;
bytes logical_plan = 2;
PhysicalPlan physical_plan = 15;
}
}
message PhysicalPlan {
bytes original_ql = 1;
bytes plan = 2;
}
message InsertExpr {
string schema_name = 1;
string table_name = 2;

View File

@@ -1,33 +0,0 @@
syntax = "proto3";
package greptime.v1.codec;
message PhysicalPlanNode {
oneof PhysicalPlanType {
ProjectionExecNode projection = 1;
MockInputExecNode mock = 99;
// TODO(fys): impl other physical plan node
}
}
message ProjectionExecNode {
PhysicalPlanNode input = 1;
repeated PhysicalExprNode expr = 2;
repeated string expr_name = 3;
}
message PhysicalExprNode {
oneof ExprType {
PhysicalColumn column = 1;
// TODO(fys): impl other physical expr node
}
}
message PhysicalColumn {
string name = 1;
uint64 index = 2;
}
message MockInputExecNode {
string name = 1;
}

View File

@@ -15,7 +15,7 @@
pub use prost::DecodeError;
use prost::Message;
use crate::v1::codec::{PhysicalPlanNode, SelectResult};
use crate::v1::codec::SelectResult;
use crate::v1::meta::TableRouteValue;
macro_rules! impl_convert_with_bytes {
@@ -37,7 +37,6 @@ macro_rules! impl_convert_with_bytes {
}
impl_convert_with_bytes!(SelectResult);
impl_convert_with_bytes!(PhysicalPlanNode);
impl_convert_with_bytes!(TableRouteValue);
#[cfg(test)]

View File

@@ -1,51 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use client::{Client, Database};
use common_grpc::MockExecution;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};
use tracing::{event, Level};
fn main() {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish())
.unwrap();
run();
}
#[tokio::main]
async fn run() {
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let db = Database::new("greptime", client);
let physical = mock_physical_plan();
let result = db.physical_plan(physical, None).await;
event!(Level::INFO, "result: {:#?}", result);
}
fn mock_physical_plan() -> Arc<dyn ExecutionPlan> {
let id_expr = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
let age_expr = Arc::new(Column::new("age", 2)) as Arc<dyn PhysicalExpr>;
let expr = vec![(id_expr, "id".to_string()), (age_expr, "age".to_string())];
let input =
Arc::new(MockExecution::new("mock_input_exec".to_string())) as Arc<dyn ExecutionPlan>;
let projection = ProjectionExec::try_new(expr, input).unwrap();
Arc::new(projection)
}

View File

@@ -18,22 +18,17 @@ use api::v1::codec::SelectResult as GrpcSelectResult;
use api::v1::column::SemanticType;
use api::v1::{
object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan,
SelectExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, SelectExpr,
};
use common_error::status_code::StatusCode;
use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl};
use common_grpc_expr::column_to_vector;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::physical_plan::ExecutionPlan;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
ColumnToVectorSnafu, ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu, EncodePhysicalSnafu,
};
use crate::error::{ColumnToVectorSnafu, ConvertSchemaSnafu, DatanodeSnafu, DecodeSelectSnafu};
use crate::{error, Client, Result};
pub const PROTOCOL_VERSION: u32 = 1;
@@ -94,24 +89,6 @@ impl Database {
self.do_select(select_expr).await
}
pub async fn physical_plan(
&self,
physical: Arc<dyn ExecutionPlan>,
original_ql: Option<String>,
) -> Result<ObjectResult> {
let plan = DefaultAsPlanImpl::try_from_physical_plan(physical.clone())
.context(EncodePhysicalSnafu { physical })?
.bytes;
let original_ql = original_ql.unwrap_or_default();
let select_expr = SelectExpr {
expr: Some(select_expr::Expr::PhysicalPlan(PhysicalPlan {
original_ql: original_ql.into_bytes(),
plan,
})),
};
self.do_select(select_expr).await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<ObjectResult> {
let select_expr = SelectExpr {
expr: Some(select_expr::Expr::LogicalPlan(logical_plan)),

View File

@@ -14,9 +14,7 @@
use std::any::Any;
use api::DecodeError;
use common_error::prelude::{ErrorExt, StatusCode};
use datafusion::error::DataFusionError;
use snafu::{Backtrace, ErrorCompat, Snafu};
pub type Result<T> = std::result::Result<T, Error>;
@@ -24,33 +22,9 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Unexpected empty physical plan type: {}", name))]
EmptyPhysicalPlan { name: String, backtrace: Backtrace },
#[snafu(display("Unexpected empty physical expr: {}", name))]
EmptyPhysicalExpr { name: String, backtrace: Backtrace },
#[snafu(display("Unsupported datafusion execution plan: {}", name))]
UnsupportedDfPlan { name: String, backtrace: Backtrace },
#[snafu(display("Unsupported datafusion physical expr: {}", name))]
UnsupportedDfExpr { name: String, backtrace: Backtrace },
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Failed to new datafusion projection exec, source: {}", source))]
NewProjection {
source: DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Failed to decode physical plan node, source: {}", source))]
DecodePhysicalPlanNode {
source: DecodeError,
backtrace: Backtrace,
},
#[snafu(display(
"Write type mismatch, column name: {}, expected: {}, actual: {}",
column_name,
@@ -89,17 +63,8 @@ pub enum Error {
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::EmptyPhysicalPlan { .. }
| Error::EmptyPhysicalExpr { .. }
| Error::MissingField { .. }
| Error::TypeMismatch { .. } => StatusCode::InvalidArguments,
Error::UnsupportedDfPlan { .. } | Error::UnsupportedDfExpr { .. } => {
StatusCode::Unsupported
}
Error::NewProjection { .. }
| Error::DecodePhysicalPlanNode { .. }
| Error::CreateChannel { .. }
| Error::Conversion { .. } => StatusCode::Internal,
Error::MissingField { .. } | Error::TypeMismatch { .. } => StatusCode::InvalidArguments,
Error::CreateChannel { .. } | Error::Conversion { .. } => StatusCode::Internal,
Error::CollectRecordBatches { source } => source.status_code(),
Error::ColumnDataType { source } => source.status_code(),
}
@@ -126,50 +91,6 @@ mod tests {
None
}
#[test]
fn test_empty_physical_plan_error() {
let e = throw_none_option()
.context(EmptyPhysicalPlanSnafu { name: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_empty_physical_expr_error() {
let e = throw_none_option()
.context(EmptyPhysicalExprSnafu { name: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_unsupported_df_plan_error() {
let e = throw_none_option()
.context(UnsupportedDfPlanSnafu { name: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Unsupported);
}
#[test]
fn test_unsupported_df_expr_error() {
let e = throw_none_option()
.context(UnsupportedDfExprSnafu { name: "test" })
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Unsupported);
}
#[test]
fn test_missing_field_error() {
let e = throw_none_option()
@@ -181,33 +102,6 @@ mod tests {
assert_eq!(e.status_code(), StatusCode::InvalidArguments);
}
#[test]
fn test_new_projection_error() {
fn throw_df_error() -> StdResult<DataFusionError> {
Err(DataFusionError::NotImplemented("".to_string()))
}
let e = throw_df_error().context(NewProjectionSnafu).err().unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_decode_physical_plan_node_error() {
fn throw_decode_error() -> StdResult<DecodeError> {
Err(DecodeError::new("test"))
}
let e = throw_decode_error()
.context(DecodePhysicalPlanNodeSnafu)
.err()
.unwrap();
assert!(e.backtrace_opt().is_some());
assert_eq!(e.status_code(), StatusCode::Internal);
}
#[test]
fn test_type_mismatch_error() {
let e = throw_none_option()

View File

@@ -14,10 +14,7 @@
pub mod channel_manager;
pub mod error;
pub mod physical;
pub mod select;
pub mod writer;
pub use error::Error;
pub use physical::plan::{DefaultAsPlanImpl, MockExecution};
pub use physical::AsExecutionPlan;

View File

@@ -1,33 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod expr;
pub mod plan;
use std::result::Result;
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
pub type ExecutionPlanRef = Arc<dyn ExecutionPlan>;
pub trait AsExecutionPlan {
type Error: std::error::Error;
fn try_into_physical_plan(&self) -> Result<ExecutionPlanRef, Self::Error>;
fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result<Self, Self::Error>
where
Self: Sized;
}

View File

@@ -1,100 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::result::Result;
use std::sync::Arc;
use api::v1::codec;
use datafusion::physical_plan::expressions::Column as DfColumn;
use datafusion::physical_plan::PhysicalExpr as DfPhysicalExpr;
use snafu::OptionExt;
use crate::error::{EmptyPhysicalExprSnafu, Error, UnsupportedDfExprSnafu};
// grpc -> datafusion (physical expr)
pub(crate) fn parse_grpc_physical_expr(
proto: &codec::PhysicalExprNode,
) -> Result<Arc<dyn DfPhysicalExpr>, Error> {
let expr_type = proto.expr_type.as_ref().context(EmptyPhysicalExprSnafu {
name: format!("{:?}", proto),
})?;
// TODO(fys): impl other physical expr
let pexpr: Arc<dyn DfPhysicalExpr> = match expr_type {
codec::physical_expr_node::ExprType::Column(c) => {
let pcol = DfColumn::new(&c.name, c.index as usize);
Arc::new(pcol)
}
};
Ok(pexpr)
}
// datafusion -> grpc (physical expr)
pub(crate) fn parse_df_physical_expr(
df_expr: Arc<dyn DfPhysicalExpr>,
) -> Result<codec::PhysicalExprNode, Error> {
let expr = df_expr.as_any();
// TODO(fys): impl other physical expr
if let Some(expr) = expr.downcast_ref::<DfColumn>() {
Ok(codec::PhysicalExprNode {
expr_type: Some(codec::physical_expr_node::ExprType::Column(
codec::PhysicalColumn {
name: expr.name().to_string(),
index: expr.index() as u64,
},
)),
})
} else {
UnsupportedDfExprSnafu {
name: df_expr.to_string(),
}
.fail()?
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::codec::physical_expr_node::ExprType::Column;
use api::v1::codec::{PhysicalColumn, PhysicalExprNode};
use datafusion::physical_plan::expressions::Column as DfColumn;
use datafusion::physical_plan::PhysicalExpr;
use crate::physical::expr::{parse_df_physical_expr, parse_grpc_physical_expr};
#[test]
fn test_column_convert() {
// mock df_column_expr
let df_column = DfColumn::new("name", 11);
let df_column_clone = df_column.clone();
let df_expr = Arc::new(df_column) as Arc<dyn PhysicalExpr>;
// mock grpc_column_expr
let grpc_expr = PhysicalExprNode {
expr_type: Some(Column(PhysicalColumn {
name: "name".to_owned(),
index: 11,
})),
};
let result = parse_df_physical_expr(df_expr).unwrap();
assert_eq!(grpc_expr, result);
let result = parse_grpc_physical_expr(&grpc_expr).unwrap();
let df_column = result.as_any().downcast_ref::<DfColumn>().unwrap();
assert_eq!(df_column_clone, df_column.to_owned());
}
}

View File

@@ -1,280 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use std::result::Result;
use std::sync::Arc;
use api::v1::codec::physical_plan_node::PhysicalPlanType;
use api::v1::codec::{MockInputExecNode, PhysicalPlanNode, ProjectionExecNode};
use async_trait::async_trait;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::field_util::SchemaExt;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{
ExecutionPlan, PhysicalExpr, SendableRecordBatchStream, Statistics,
};
use datafusion::record_batch::RecordBatch;
use datatypes::arrow::array::{PrimitiveArray, Utf8Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use snafu::{OptionExt, ResultExt};
use crate::error::{
DecodePhysicalPlanNodeSnafu, EmptyPhysicalPlanSnafu, Error, MissingFieldSnafu,
NewProjectionSnafu, UnsupportedDfPlanSnafu,
};
use crate::physical::{expr, AsExecutionPlan, ExecutionPlanRef};
pub struct DefaultAsPlanImpl {
pub bytes: Vec<u8>,
}
impl AsExecutionPlan for DefaultAsPlanImpl {
type Error = Error;
// Vec<u8> -> PhysicalPlanNode -> ExecutionPlanRef
fn try_into_physical_plan(&self) -> Result<ExecutionPlanRef, Self::Error> {
let physicalplan_node: PhysicalPlanNode = self
.bytes
.deref()
.try_into()
.context(DecodePhysicalPlanNodeSnafu)?;
physicalplan_node.try_into_physical_plan()
}
// ExecutionPlanRef -> PhysicalPlanNode -> Vec<u8>
fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result<Self, Self::Error>
where
Self: Sized,
{
let bytes: Vec<u8> = PhysicalPlanNode::try_from_physical_plan(plan)?.into();
Ok(DefaultAsPlanImpl { bytes })
}
}
impl AsExecutionPlan for PhysicalPlanNode {
type Error = Error;
fn try_into_physical_plan(&self) -> Result<ExecutionPlanRef, Self::Error> {
let plan = self
.physical_plan_type
.as_ref()
.context(EmptyPhysicalPlanSnafu {
name: format!("{:?}", self),
})?;
// TODO(fys): impl other physical plan type
match plan {
PhysicalPlanType::Projection(projection) => {
let input = if let Some(input) = &projection.input {
input.as_ref().try_into_physical_plan()?
} else {
MissingFieldSnafu { field: "input" }.fail()?
};
let exprs = projection
.expr
.iter()
.zip(projection.expr_name.iter())
.map(|(expr, name)| {
Ok((expr::parse_grpc_physical_expr(expr)?, name.to_string()))
})
.collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>, Error>>()?;
let projection =
ProjectionExec::try_new(exprs, input).context(NewProjectionSnafu)?;
Ok(Arc::new(projection))
}
PhysicalPlanType::Mock(mock) => Ok(Arc::new(MockExecution {
name: mock.name.to_string(),
})),
}
}
fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result<Self, Self::Error>
where
Self: Sized,
{
let plan = plan.as_any();
if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
let input = PhysicalPlanNode::try_from_physical_plan(exec.input().to_owned())?;
let expr = exec
.expr()
.iter()
.map(|expr| expr::parse_df_physical_expr(expr.0.clone()))
.collect::<Result<Vec<_>, Error>>()?;
let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
Ok(PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
ProjectionExecNode {
input: Some(Box::new(input)),
expr,
expr_name,
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<MockExecution>() {
Ok(PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Mock(MockInputExecNode {
name: exec.name.clone(),
})),
})
} else {
UnsupportedDfPlanSnafu {
name: format!("{:?}", plan),
}
.fail()?
}
}
}
// TODO(fys): use "test" feature to enable it
#[derive(Debug)]
pub struct MockExecution {
name: String,
}
impl MockExecution {
pub fn new(name: String) -> Self {
Self { name }
}
}
#[async_trait]
impl ExecutionPlan for MockExecution {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
let field1 = Field::new("id", DataType::UInt32, false);
let field2 = Field::new("name", DataType::Utf8, false);
let field3 = Field::new("age", DataType::UInt32, false);
Arc::new(Schema::new(vec![field1, field2, field3]))
}
fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
unimplemented!()
}
fn output_ordering(
&self,
) -> Option<&[datafusion::physical_plan::expressions::PhysicalSortExpr]> {
unimplemented!()
}
fn children(&self) -> Vec<ExecutionPlanRef> {
unimplemented!()
}
fn with_new_children(
&self,
_children: Vec<ExecutionPlanRef>,
) -> datafusion::error::Result<ExecutionPlanRef> {
unimplemented!()
}
async fn execute(
&self,
_partition: usize,
_runtime: Arc<RuntimeEnv>,
) -> datafusion::error::Result<SendableRecordBatchStream> {
let id_array = Arc::new(PrimitiveArray::from_slice([1u32, 2, 3, 4, 5]));
let name_array = Arc::new(Utf8Array::<i32>::from_slice([
"zhangsan", "lisi", "wangwu", "Tony", "Mike",
]));
let age_array = Arc::new(PrimitiveArray::from_slice([25u32, 28, 27, 35, 25]));
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt32, false),
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::UInt32, false),
]));
let record_batch =
RecordBatch::try_new(schema, vec![id_array, name_array, age_array]).unwrap();
let data: Vec<RecordBatch> = vec![record_batch];
let projection = Some(vec![0, 1, 2]);
let stream = MemoryStream::try_new(data, self.schema(), projection).unwrap();
Ok(Box::pin(stream))
}
fn statistics(&self) -> Statistics {
todo!()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::codec::PhysicalPlanNode;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::projection::ProjectionExec;
use crate::physical::plan::{DefaultAsPlanImpl, MockExecution};
use crate::physical::{AsExecutionPlan, ExecutionPlanRef};
#[test]
fn test_convert_df_projection_with_bytes() {
let projection_exec = mock_df_projection();
let bytes = DefaultAsPlanImpl::try_from_physical_plan(projection_exec).unwrap();
let exec = bytes.try_into_physical_plan().unwrap();
verify_df_projection(exec);
}
#[test]
fn test_convert_df_with_grpc_projection() {
let projection_exec = mock_df_projection();
let projection_node = PhysicalPlanNode::try_from_physical_plan(projection_exec).unwrap();
let exec = projection_node.try_into_physical_plan().unwrap();
verify_df_projection(exec);
}
fn mock_df_projection() -> Arc<ProjectionExec> {
let mock_input = Arc::new(MockExecution {
name: "mock_input".to_string(),
});
let column1 = Arc::new(Column::new("id", 0));
let column2 = Arc::new(Column::new("name", 1));
Arc::new(
ProjectionExec::try_new(
vec![(column1, "id".to_string()), (column2, "name".to_string())],
mock_input,
)
.unwrap(),
)
}
fn verify_df_projection(exec: ExecutionPlanRef) {
let projection_exec = exec.as_any().downcast_ref::<ProjectionExec>().unwrap();
let mock_input = projection_exec
.input()
.as_any()
.downcast_ref::<MockExecution>()
.unwrap();
assert_eq!("mock_input", mock_input.name);
assert_eq!(2, projection_exec.expr().len());
assert_eq!("id", projection_exec.expr()[0].1);
assert_eq!("name", projection_exec.expr()[1].1);
}
}

View File

@@ -45,7 +45,6 @@ use crate::error::{
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
use crate::server::grpc::plan::PhysicalPlanner;
use crate::sql::SqlHandler;
mod grpc;
@@ -59,7 +58,6 @@ pub struct Instance {
pub(crate) query_engine: QueryEngineRef,
pub(crate) sql_handler: SqlHandler,
pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) physical_planner: PhysicalPlanner,
pub(crate) script_executor: ScriptExecutor,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
#[allow(unused)]
@@ -159,7 +157,6 @@ impl Instance {
query_engine.clone(),
),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
meta_client,
heartbeat_task,

View File

@@ -39,7 +39,6 @@ use crate::error::{
UnsupportedExprSnafu,
};
use crate::instance::Instance;
use crate::server::grpc::plan::PhysicalPlanner;
impl Instance {
pub async fn execute_grpc_insert(
@@ -117,11 +116,6 @@ impl Instance {
self.execute_sql(&sql, Arc::new(QueryContext::new())).await
}
Some(select_expr::Expr::LogicalPlan(plan)) => self.execute_logical(plan).await,
Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => {
self.physical_planner
.execute(PhysicalPlanner::parse(plan)?, original_ql)
.await
}
_ => UnsupportedExprSnafu {
name: format!("{:?}", expr),
}

View File

@@ -31,7 +31,6 @@ use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{create_local_file_log_store, new_object_store, DefaultEngine, Instance};
use crate::script::ScriptExecutor;
use crate::server::grpc::plan::PhysicalPlanner;
use crate::sql::SqlHandler;
impl Instance {
@@ -63,7 +62,6 @@ impl Instance {
catalog_manager.clone(),
query_engine.clone(),
);
let physical_planner = PhysicalPlanner::new(query_engine.clone());
let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone())
.await
.unwrap();
@@ -79,7 +77,6 @@ impl Instance {
query_engine,
sql_handler,
catalog_manager,
physical_planner,
script_executor,
meta_client,
heartbeat_task,
@@ -133,7 +130,6 @@ impl Instance {
query_engine.clone(),
),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
table_id_provider: Some(Arc::new(LocalTableIdProvider::default())),
meta_client: Some(meta_client),

View File

@@ -12,5 +12,299 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod ddl;
pub(crate) mod plan;
use std::sync::Arc;
use api::result::AdminResultBuilder;
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::{error, info};
use futures::TryFutureExt;
use session::context::QueryContext;
use snafu::prelude::*;
use table::requests::DropTableRequest;
use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu};
use crate::instance::Instance;
use crate::sql::SqlRequest;
impl Instance {
/// Handle gRPC create table requests.
pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
// Respect CreateExpr's table id and region ids if present, or allocate table id
// from local table id provider and set region id to 0.
let table_id = if let Some(table_id) = expr.table_id {
info!(
"Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
expr.catalog_name, expr.schema_name, expr.table_name, table_id
);
table_id
} else {
match self.table_id_provider.as_ref() {
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Internal as u32)
.err_msg("Table id provider absent in standalone mode".to_string())
.build();
}
Some(table_id_provider) => {
match table_id_provider
.next_table_id()
.await
.context(BumpTableIdSnafu)
{
Ok(table_id) => {
info!(
"Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}",
&expr.catalog_name, &expr.schema_name, expr.table_name, table_id
);
table_id
}
Err(e) => {
error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name);
return AdminResultBuilder::default()
.status_code(e.status_code() as u32)
.err_msg(e.to_string())
.build();
}
}
}
}
};
let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu);
let result = futures::future::ready(request)
.and_then(|request| {
self.sql_handler().execute(
SqlRequest::CreateTable(request),
Arc::new(QueryContext::new()),
)
})
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
// Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug.
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult {
let request = match alter_expr_to_request(expr)
.context(AlterExprToRequestSnafu)
.transpose()
{
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(0, 0)
.build()
}
Some(req) => req,
};
let result = futures::future::ready(request)
.and_then(|request| {
self.sql_handler()
.execute(SqlRequest::Alter(request), Arc::new(QueryContext::new()))
})
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult {
let req = DropTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
};
let result = self
.sql_handler()
.execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new()))
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as _, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::{ColumnDataType, ColumnDef};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc_expr::create_table_schema;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_create_expr_to_request() {
common_telemetry::init_default_ut_logging();
let expr = testing_create_expr();
let request = create_expr_to_request(1024, expr).unwrap();
assert_eq!(request.id, MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
assert_eq!(request.desc, Some("blabla".to_string()));
assert_eq!(request.schema, expected_table_schema());
assert_eq!(request.primary_key_indices, vec![1, 0]);
assert!(request.create_if_not_exists);
let mut expr = testing_create_expr();
expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()];
let result = create_expr_to_request(1025, expr);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"),
"{}",
err_msg
);
}
#[test]
fn test_create_table_schema() {
let mut expr = testing_create_expr();
let schema = create_table_schema(&expr).unwrap();
assert_eq!(schema, expected_table_schema());
expr.time_index = "not-exist-column".to_string();
let result = create_table_schema(&expr);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Missing timestamp column"),
"actual: {}",
err_msg
);
}
#[test]
fn test_create_column_schema() {
let column_def = ColumnDef {
name: "a".to_string(),
datatype: 1024,
is_nullable: true,
default_constraint: None,
};
let result = column_def.try_as_column_schema();
assert!(matches!(
result.unwrap_err(),
api::error::Error::UnknownColumnDataType { .. }
));
let column_def = ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: None,
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable());
let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value"));
let column_def = ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Some(default_constraint.clone().try_into().unwrap()),
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable());
assert_eq!(
default_constraint,
*column_schema.default_constraint().unwrap()
);
}
fn testing_create_expr() -> CreateExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::Timestamp as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float32 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
table_name: "my-metrics".to_string(),
desc: Some("blabla".to_string()),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
region_ids: vec![0],
}
}
fn expected_table_schema() -> SchemaRef {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false)
.with_time_index(true),
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
];
Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.build()
.unwrap(),
)
}
}

View File

@@ -1,310 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::result::AdminResultBuilder;
use api::v1::{AdminResult, AlterExpr, CreateExpr, DropTableExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::{error, info};
use futures::TryFutureExt;
use session::context::QueryContext;
use snafu::prelude::*;
use table::requests::DropTableRequest;
use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu};
use crate::instance::Instance;
use crate::sql::SqlRequest;
impl Instance {
/// Handle gRPC create table requests.
pub(crate) async fn handle_create(&self, expr: CreateExpr) -> AdminResult {
// Respect CreateExpr's table id and region ids if present, or allocate table id
// from local table id provider and set region id to 0.
let table_id = if let Some(table_id) = expr.table_id {
info!(
"Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
expr.catalog_name, expr.schema_name, expr.table_name, table_id
);
table_id
} else {
match self.table_id_provider.as_ref() {
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Internal as u32)
.err_msg("Table id provider absent in standalone mode".to_string())
.build();
}
Some(table_id_provider) => {
match table_id_provider
.next_table_id()
.await
.context(BumpTableIdSnafu)
{
Ok(table_id) => {
info!(
"Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}",
&expr.catalog_name, &expr.schema_name, expr.table_name, table_id
);
table_id
}
Err(e) => {
error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name);
return AdminResultBuilder::default()
.status_code(e.status_code() as u32)
.err_msg(e.to_string())
.build();
}
}
}
}
};
let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu);
let result = futures::future::ready(request)
.and_then(|request| {
self.sql_handler().execute(
SqlRequest::CreateTable(request),
Arc::new(QueryContext::new()),
)
})
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
// Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug.
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult {
let request = match alter_expr_to_request(expr)
.context(AlterExprToRequestSnafu)
.transpose()
{
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(0, 0)
.build()
}
Some(req) => req,
};
let result = futures::future::ready(request)
.and_then(|request| {
self.sql_handler()
.execute(SqlRequest::Alter(request), Arc::new(QueryContext::new()))
})
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult {
let req = DropTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
};
let result = self
.sql_handler()
.execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new()))
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as _, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::{ColumnDataType, ColumnDef};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_grpc_expr::create_table_schema;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder, SchemaRef};
use datatypes::value::Value;
use super::*;
#[tokio::test(flavor = "multi_thread")]
async fn test_create_expr_to_request() {
common_telemetry::init_default_ut_logging();
let expr = testing_create_expr();
let request = create_expr_to_request(1024, expr).unwrap();
assert_eq!(request.id, MIN_USER_TABLE_ID);
assert_eq!(request.catalog_name, "greptime".to_string());
assert_eq!(request.schema_name, "public".to_string());
assert_eq!(request.table_name, "my-metrics");
assert_eq!(request.desc, Some("blabla".to_string()));
assert_eq!(request.schema, expected_table_schema());
assert_eq!(request.primary_key_indices, vec![1, 0]);
assert!(request.create_if_not_exists);
let mut expr = testing_create_expr();
expr.primary_keys = vec!["host".to_string(), "not-exist-column".to_string()];
let result = create_expr_to_request(1025, expr);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Column `not-exist-column` not found in table `my-metrics`"),
"{}",
err_msg
);
}
#[test]
fn test_create_table_schema() {
let mut expr = testing_create_expr();
let schema = create_table_schema(&expr).unwrap();
assert_eq!(schema, expected_table_schema());
expr.time_index = "not-exist-column".to_string();
let result = create_table_schema(&expr);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Missing timestamp column"),
"actual: {}",
err_msg
);
}
#[test]
fn test_create_column_schema() {
let column_def = ColumnDef {
name: "a".to_string(),
datatype: 1024,
is_nullable: true,
default_constraint: None,
};
let result = column_def.try_as_column_schema();
assert!(matches!(
result.unwrap_err(),
api::error::Error::UnknownColumnDataType { .. }
));
let column_def = ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: None,
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable());
let default_constraint = ColumnDefaultConstraint::Value(Value::from("default value"));
let column_def = ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: Some(default_constraint.clone().try_into().unwrap()),
};
let column_schema = column_def.try_as_column_schema().unwrap();
assert_eq!(column_schema.name, "a");
assert_eq!(column_schema.data_type, ConcreteDataType::string_datatype());
assert!(column_schema.is_nullable());
assert_eq!(
default_constraint,
*column_schema.default_constraint().unwrap()
);
}
fn testing_create_expr() -> CreateExpr {
let column_defs = vec![
ColumnDef {
name: "host".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::Timestamp as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "cpu".to_string(),
datatype: ColumnDataType::Float32 as i32,
is_nullable: true,
default_constraint: None,
},
ColumnDef {
name: "memory".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: true,
default_constraint: None,
},
];
CreateExpr {
catalog_name: None,
schema_name: None,
table_name: "my-metrics".to_string(),
desc: Some("blabla".to_string()),
column_defs,
time_index: "ts".to_string(),
primary_keys: vec!["ts".to_string(), "host".to_string()],
create_if_not_exists: true,
table_options: Default::default(),
table_id: Some(MIN_USER_TABLE_ID),
region_ids: vec![0],
}
}
fn expected_table_schema() -> SchemaRef {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("ts", ConcreteDataType::timestamp_millis_datatype(), false)
.with_time_index(true),
ColumnSchema::new("cpu", ConcreteDataType::float32_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
];
Arc::new(
SchemaBuilder::try_from(column_schemas)
.unwrap()
.build()
.unwrap(),
)
}
}

View File

@@ -1,55 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_grpc::{AsExecutionPlan, DefaultAsPlanImpl};
use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef};
use common_query::Output;
use datatypes::schema::Schema;
use query::QueryEngineRef;
use snafu::ResultExt;
use crate::error::{ConvertSchemaSnafu, ExecutePhysicalPlanSnafu, IntoPhysicalPlanSnafu, Result};
pub struct PhysicalPlanner {
query_engine: QueryEngineRef,
}
impl PhysicalPlanner {
pub fn new(query_engine: QueryEngineRef) -> Self {
Self { query_engine }
}
pub fn parse(bytes: Vec<u8>) -> Result<PhysicalPlanRef> {
let physical_plan = DefaultAsPlanImpl { bytes }
.try_into_physical_plan()
.context(IntoPhysicalPlanSnafu)?;
let schema: Arc<Schema> = Arc::new(
physical_plan
.schema()
.try_into()
.context(ConvertSchemaSnafu)?,
);
Ok(Arc::new(PhysicalPlanAdapter::new(schema, physical_plan)))
}
pub async fn execute(&self, plan: PhysicalPlanRef, _original_ql: Vec<u8>) -> Result<Output> {
self.query_engine
.execute_physical(&plan)
.await
.context(ExecutePhysicalPlanSnafu)
}
}