feat: impl grpc physical plan (#212)

* chore: rename "convert.rs" to "serde.rs"

* proto definition

* impl "projection"

* add mock_input_exec for test

* impl physical plan execution
This commit is contained in:
fys
2022-08-31 21:43:50 +08:00
committed by GitHub
parent ba93aa83f2
commit db55c69117
29 changed files with 734 additions and 75 deletions

14
Cargo.lock generated
View File

@@ -695,6 +695,8 @@ version = "0.1.0"
dependencies = [
"api",
"common-error",
"common-grpc",
"datafusion",
"snafu",
"tokio",
"tonic 0.8.0",
@@ -775,6 +777,17 @@ dependencies = [
"statrs",
]
[[package]]
name = "common-grpc"
version = "0.1.0"
dependencies = [
"api",
"arrow2",
"async-trait",
"datafusion",
"snafu",
]
[[package]]
name = "common-query"
version = "0.1.0"
@@ -1263,6 +1276,7 @@ dependencies = [
"client",
"common-base",
"common-error",
"common-grpc",
"common-query",
"common-recordbatch",
"common-runtime",

View File

@@ -6,6 +6,7 @@ members = [
"src/common/base",
"src/common/error",
"src/common/function",
"src/common/grpc",
"src/common/query",
"src/common/recordbatch",
"src/common/runtime",

View File

@@ -4,6 +4,7 @@ fn main() {
&[
"greptime/v1/insert.proto",
"greptime/v1/select.proto",
"greptime/v1/physical_plan.proto",
"greptime/v1/greptime.proto",
],
&["."],

View File

@@ -25,14 +25,19 @@ message ExprHeader {
uint32 version = 1;
}
// TODO(fys): Only support sql now, and
// will support promql etc in the future
// TODO(fys): Only support sql now, and will support promql etc in the future
message SelectExpr {
oneof expr {
string sql = 1;
PhysicalPlan physical_plan = 15;
}
}
message PhysicalPlan {
bytes original_ql = 1;
bytes plan = 2;
}
message InsertExpr {
string table_name = 1;
repeated bytes values = 2;

View File

@@ -0,0 +1,33 @@
syntax = "proto3";
package greptime.v1.codec;
message PhysicalPlanNode {
oneof PhysicalPlanType {
ProjectionExecNode projection = 1;
MockInputExecNode mock = 99;
// TODO(fys): impl other physical plan node
}
}
message ProjectionExecNode {
PhysicalPlanNode input = 1;
repeated PhysicalExprNode expr = 2;
repeated string expr_name = 3;
}
message PhysicalExprNode {
oneof ExprType {
PhysicalColumn column = 1;
// TODO(fys): impl other physical expr node
}
}
message PhysicalColumn {
string name = 1;
uint64 index = 2;
}
message MockInputExecNode {
string name = 1;
}

View File

@@ -1,2 +1,4 @@
pub mod convert;
pub mod serde;
pub mod v1;
pub use prost::DecodeError;

View File

@@ -1,38 +1,34 @@
pub use prost::DecodeError;
use prost::Message;
use crate::v1::codec::{InsertBatch, SelectResult};
use crate::v1::codec::{InsertBatch, PhysicalPlanNode, SelectResult};
impl From<InsertBatch> for Vec<u8> {
fn from(insert: InsertBatch) -> Self {
insert.encode_to_vec()
}
macro_rules! impl_convert_with_bytes {
($data_type: ty) => {
impl From<$data_type> for Vec<u8> {
fn from(entity: $data_type) -> Self {
entity.encode_to_vec()
}
}
impl TryFrom<&[u8]> for $data_type {
type Error = DecodeError;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
<$data_type>::decode(value.as_ref())
}
}
};
}
impl TryFrom<Vec<u8>> for InsertBatch {
type Error = DecodeError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
InsertBatch::decode(value.as_ref())
}
}
impl From<SelectResult> for Vec<u8> {
fn from(result: SelectResult) -> Self {
result.encode_to_vec()
}
}
impl TryFrom<Vec<u8>> for SelectResult {
type Error = DecodeError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
SelectResult::decode(value.as_ref())
}
}
impl_convert_with_bytes!(InsertBatch);
impl_convert_with_bytes!(SelectResult);
impl_convert_with_bytes!(PhysicalPlanNode);
#[cfg(test)]
mod tests {
use std::ops::Deref;
use crate::v1::codec::*;
use crate::v1::column;
use crate::v1::Column;
@@ -44,7 +40,7 @@ mod tests {
let insert_batch = mock_insert_batch();
let bytes: Vec<u8> = insert_batch.into();
let insert: InsertBatch = bytes.try_into().unwrap();
let insert: InsertBatch = bytes.deref().try_into().unwrap();
assert_eq!(8, insert.row_count);
assert_eq!(1, insert.columns.len());
@@ -70,7 +66,7 @@ mod tests {
bytes[0] = 0b1;
bytes[1] = 0b1;
let insert: InsertBatch = bytes.try_into().unwrap();
let insert: InsertBatch = bytes.deref().try_into().unwrap();
assert_eq!(8, insert.row_count);
assert_eq!(1, insert.columns.len());
@@ -90,7 +86,7 @@ mod tests {
let select_result = mock_select_result();
let bytes: Vec<u8> = select_result.into();
let result: SelectResult = bytes.try_into().unwrap();
let result: SelectResult = bytes.deref().try_into().unwrap();
assert_eq!(8, result.row_count);
assert_eq!(1, result.columns.len());
@@ -116,7 +112,7 @@ mod tests {
bytes[0] = 0b1;
bytes[1] = 0b1;
let result: SelectResult = bytes.try_into().unwrap();
let result: SelectResult = bytes.deref().try_into().unwrap();
assert_eq!(8, result.row_count);
assert_eq!(1, result.columns.len());

View File

@@ -8,6 +8,8 @@ edition = "2021"
[dependencies]
api = { path = "../api" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
snafu = { version = "0.7", features = ["backtraces"] }
tonic = "0.8"

View File

@@ -0,0 +1,37 @@
use std::sync::Arc;
use client::{Client, Database};
use common_grpc::MockExecution;
use datafusion::physical_plan::{
expressions::Column, projection::ProjectionExec, ExecutionPlan, PhysicalExpr,
};
use tracing::{event, Level};
fn main() {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish())
.unwrap();
run();
}
#[tokio::main]
async fn run() {
let client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let db = Database::new("greptime", client);
let physical = mock_physical_plan();
let result = db.physical_plan(physical, None).await;
event!(Level::INFO, "result: {:#?}", result);
}
fn mock_physical_plan() -> Arc<dyn ExecutionPlan> {
let id_expr = Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
let age_expr = Arc::new(Column::new("age", 2)) as Arc<dyn PhysicalExpr>;
let expr = vec![(id_expr, "id".to_string()), (age_expr, "age".to_string())];
let input =
Arc::new(MockExecution::new("mock_input_exec".to_string())) as Arc<dyn ExecutionPlan>;
let projection = ProjectionExec::try_new(expr, input).unwrap();
Arc::new(projection)
}

View File

@@ -1,5 +1,4 @@
use api::v1::{select_expr, SelectExpr};
use client::{Client, Database};
use client::{Client, Database, Select};
use tracing::{event, Level};
fn main() {
@@ -14,10 +13,8 @@ async fn run() {
let client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let db = Database::new("greptime", client);
let select_expr = SelectExpr {
expr: Some(select_expr::Expr::Sql("select * from demo".to_string())),
};
let result = db.select(select_expr).await.unwrap();
let sql = Select::Sql("select * from demo".to_string());
let result = db.select(sql).await.unwrap();
event!(Level::INFO, "result: {:#?}", result);
}

View File

@@ -1,14 +1,22 @@
use std::ops::Deref;
use std::sync::Arc;
use api::v1::codec::SelectResult as GrpcSelectResult;
use api::v1::{
object_expr, object_result, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, SelectExpr,
object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan,
SelectExpr,
};
use common_error::status_code::StatusCode;
use common_grpc::AsExcutionPlan;
use common_grpc::DefaultAsPlanImpl;
use datafusion::physical_plan::ExecutionPlan;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{self, MissingResultSnafu};
use crate::{
error::DatanodeSnafu, error::DecodeSelectSnafu, error::MissingHeaderSnafu, Client, Result,
error::DatanodeSnafu, error::DecodeSelectSnafu, error::EncodePhysicalSnafu,
error::MissingHeaderSnafu, Client, Result,
};
pub const PROTOCOL_VERSION: u32 = 1;
@@ -51,7 +59,34 @@ impl Database {
Ok(())
}
pub async fn select(&self, select_expr: SelectExpr) -> Result<ObjectResult> {
pub async fn select(&self, expr: Select) -> Result<ObjectResult> {
let select_expr = match expr {
Select::Sql(sql) => SelectExpr {
expr: Some(select_expr::Expr::Sql(sql)),
},
};
self.do_select(select_expr).await
}
pub async fn physical_plan(
&self,
physical: Arc<dyn ExecutionPlan>,
original_ql: Option<String>,
) -> Result<ObjectResult> {
let plan = DefaultAsPlanImpl::try_from_physical_plan(physical.clone())
.context(EncodePhysicalSnafu { physical })?
.bytes;
let original_ql = original_ql.unwrap_or_default();
let select_expr = SelectExpr {
expr: Some(select_expr::Expr::PhysicalPlan(PhysicalPlan {
original_ql: original_ql.into_bytes(),
plan,
})),
};
self.do_select(select_expr).await
}
async fn do_select(&self, select_expr: SelectExpr) -> Result<ObjectResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
@@ -81,7 +116,11 @@ impl Database {
let result = match obj_result {
object_result::Result::Select(select) => {
let result = select.raw_data.try_into().context(DecodeSelectSnafu)?;
let result = select
.raw_data
.deref()
.try_into()
.context(DecodeSelectSnafu)?;
ObjectResult::Select(result)
}
object_result::Result::Mutate(mutate) => ObjectResult::Mutate(mutate),
@@ -125,3 +164,7 @@ pub enum ObjectResult {
Select(GrpcSelectResult),
Mutate(GrpcMutateResult),
}
pub enum Select {
Sql(String),
}

View File

@@ -1,5 +1,8 @@
use api::convert::DecodeError;
use std::sync::Arc;
use api::serde::DecodeError;
use common_error::prelude::*;
use datafusion::physical_plan::ExecutionPlan;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -32,6 +35,13 @@ pub enum Error {
#[snafu(display("Error occurred on the data node, code: {}, msg: {}", code, msg))]
Datanode { code: u32, msg: String },
#[snafu(display("Failed to encode physical plan: {:?}, source: {}", physical, source))]
EncodePhysical {
physical: Arc<dyn ExecutionPlan>,
#[snafu(backtrace)]
source: common_grpc::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -4,6 +4,6 @@ mod error;
pub use self::{
client::Client,
database::{Database, ObjectResult},
database::{Database, ObjectResult, Select},
error::{Error, Result},
};

View File

@@ -0,0 +1,15 @@
[package]
name = "common-grpc"
version = "0.1.0"
edition = "2021"
[dependencies]
api = { path = "../../api" }
async-trait = "0.1"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
snafu = { version = "0.7", features = ["backtraces"] }
[dependencies.arrow]
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]

View File

@@ -0,0 +1,34 @@
use api::DecodeError;
use datafusion::error::DataFusionError;
use snafu::{Backtrace, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Unexpected empty physical plan type: {}", name))]
EmptyPhysicalPlan { name: String, backtrace: Backtrace },
#[snafu(display("Unexpected empty physical expr: {}", name))]
EmptyPhysicalExpr { name: String, backtrace: Backtrace },
#[snafu(display("Unsupported datafusion execution plan: {}", name))]
UnsupportedDfPlan { name: String, backtrace: Backtrace },
#[snafu(display("Unsupported datafusion physical expr: {}", name))]
UnsupportedDfExpr { name: String, backtrace: Backtrace },
#[snafu(display("Missing required field in protobuf, field: {}", field))]
MissingField { field: String, backtrace: Backtrace },
#[snafu(display("Failed to new datafusion projection exec, source: {}", source))]
NewProjection {
source: DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Failed to decode physical plan node, source: {}", source))]
DecodePhysicalPlanNode {
source: DecodeError,
backtrace: Backtrace,
},
}

View File

@@ -0,0 +1,8 @@
pub mod error;
pub mod physical;
pub use error::Error;
pub use physical::{
plan::{DefaultAsPlanImpl, MockExecution},
AsExcutionPlan,
};

View File

@@ -0,0 +1,18 @@
mod expr;
pub mod plan;
use std::{result::Result, sync::Arc};
use datafusion::physical_plan::ExecutionPlan;
pub type ExecutionPlanRef = Arc<dyn ExecutionPlan>;
pub trait AsExcutionPlan {
type Error: std::error::Error;
fn try_into_physical_plan(&self) -> Result<ExecutionPlanRef, Self::Error>;
fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result<Self, Self::Error>
where
Self: Sized;
}

View File

@@ -0,0 +1,82 @@
use std::{result::Result, sync::Arc};
use api::v1::codec;
use datafusion::physical_plan::{expressions::Column as DfColumn, PhysicalExpr as DfPhysicalExpr};
use snafu::OptionExt;
use crate::error::{EmptyPhysicalExprSnafu, Error, UnsupportedDfExprSnafu};
// grpc -> datafusion (physical expr)
pub(crate) fn parse_grpc_physical_expr(
proto: &codec::PhysicalExprNode,
) -> Result<Arc<dyn DfPhysicalExpr>, Error> {
let expr_type = proto.expr_type.as_ref().context(EmptyPhysicalExprSnafu {
name: format!("{:?}", proto),
})?;
// TODO(fys): impl other physical expr
let pexpr: Arc<dyn DfPhysicalExpr> = match expr_type {
codec::physical_expr_node::ExprType::Column(c) => {
let pcol = DfColumn::new(&c.name, c.index as usize);
Arc::new(pcol)
}
};
Ok(pexpr)
}
// datafusion -> grpc (physical expr)
pub(crate) fn parse_df_physical_expr(
df_expr: Arc<dyn DfPhysicalExpr>,
) -> Result<codec::PhysicalExprNode, Error> {
let expr = df_expr.as_any();
// TODO(fys): impl other physical expr
if let Some(expr) = expr.downcast_ref::<DfColumn>() {
Ok(codec::PhysicalExprNode {
expr_type: Some(codec::physical_expr_node::ExprType::Column(
codec::PhysicalColumn {
name: expr.name().to_string(),
index: expr.index() as u64,
},
)),
})
} else {
UnsupportedDfExprSnafu {
name: df_expr.to_string(),
}
.fail()?
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::codec::{physical_expr_node::ExprType::Column, PhysicalColumn, PhysicalExprNode};
use datafusion::physical_plan::{expressions::Column as DfColumn, PhysicalExpr};
use crate::physical::expr::{parse_df_physical_expr, parse_grpc_physical_expr};
#[test]
fn test_column_convert() {
// mock df_column_expr
let df_column = DfColumn::new("name", 11);
let df_column_clone = df_column.clone();
let df_expr = Arc::new(df_column) as Arc<dyn PhysicalExpr>;
// mock grpc_column_expr
let grpc_expr = PhysicalExprNode {
expr_type: Some(Column(PhysicalColumn {
name: "name".to_owned(),
index: 11,
})),
};
let result = parse_df_physical_expr(df_expr).unwrap();
assert_eq!(grpc_expr, result);
let result = parse_grpc_physical_expr(&grpc_expr).unwrap();
let df_column = result.as_any().downcast_ref::<DfColumn>().unwrap();
assert_eq!(df_column_clone, df_column.to_owned());
}
}

View File

@@ -0,0 +1,269 @@
use std::{ops::Deref, result::Result, sync::Arc};
use api::v1::codec::{
physical_plan_node::PhysicalPlanType, MockInputExecNode, PhysicalPlanNode, ProjectionExecNode,
};
use arrow::{
array::{PrimitiveArray, Utf8Array},
datatypes::{DataType, Field, Schema},
};
use async_trait::async_trait;
use datafusion::{
execution::runtime_env::RuntimeEnv,
field_util::SchemaExt,
physical_plan::{
memory::MemoryStream, projection::ProjectionExec, ExecutionPlan, PhysicalExpr,
SendableRecordBatchStream, Statistics,
},
record_batch::RecordBatch,
};
use snafu::{OptionExt, ResultExt};
use crate::error::{
DecodePhysicalPlanNodeSnafu, EmptyPhysicalPlanSnafu, Error, MissingFieldSnafu,
NewProjectionSnafu, UnsupportedDfPlanSnafu,
};
use crate::physical::{expr, AsExcutionPlan, ExecutionPlanRef};
pub struct DefaultAsPlanImpl {
pub bytes: Vec<u8>,
}
impl AsExcutionPlan for DefaultAsPlanImpl {
type Error = Error;
// Vec<u8> -> PhysicalPlanNode -> ExecutionPlanRef
fn try_into_physical_plan(&self) -> Result<ExecutionPlanRef, Self::Error> {
let physicalplan_node: PhysicalPlanNode = self
.bytes
.deref()
.try_into()
.context(DecodePhysicalPlanNodeSnafu)?;
physicalplan_node.try_into_physical_plan()
}
// ExecutionPlanRef -> PhysicalPlanNode -> Vec<u8>
fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result<Self, Self::Error>
where
Self: Sized,
{
let bytes: Vec<u8> = PhysicalPlanNode::try_from_physical_plan(plan)?.into();
Ok(DefaultAsPlanImpl { bytes })
}
}
impl AsExcutionPlan for PhysicalPlanNode {
type Error = Error;
fn try_into_physical_plan(&self) -> Result<ExecutionPlanRef, Self::Error> {
let plan = self
.physical_plan_type
.as_ref()
.context(EmptyPhysicalPlanSnafu {
name: format!("{:?}", self),
})?;
// TODO(fys): impl other physical plan type
match plan {
PhysicalPlanType::Projection(projection) => {
let input = if let Some(input) = &projection.input {
input.as_ref().try_into_physical_plan()?
} else {
MissingFieldSnafu { field: "input" }.fail()?
};
let exprs = projection
.expr
.iter()
.zip(projection.expr_name.iter())
.map(|(expr, name)| {
Ok((expr::parse_grpc_physical_expr(expr)?, name.to_string()))
})
.collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>, Error>>()?;
let projection =
ProjectionExec::try_new(exprs, input).context(NewProjectionSnafu)?;
Ok(Arc::new(projection))
}
PhysicalPlanType::Mock(mock) => Ok(Arc::new(MockExecution {
name: mock.name.to_string(),
})),
}
}
fn try_from_physical_plan(plan: ExecutionPlanRef) -> Result<Self, Self::Error>
where
Self: Sized,
{
let plan = plan.as_any();
if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
let input = PhysicalPlanNode::try_from_physical_plan(exec.input().to_owned())?;
let expr = exec
.expr()
.iter()
.map(|expr| expr::parse_df_physical_expr(expr.0.clone()))
.collect::<Result<Vec<_>, Error>>()?;
let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
Ok(PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
ProjectionExecNode {
input: Some(Box::new(input)),
expr,
expr_name,
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<MockExecution>() {
Ok(PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Mock(MockInputExecNode {
name: exec.name.clone(),
})),
})
} else {
UnsupportedDfPlanSnafu {
name: format!("{:?}", plan),
}
.fail()?
}
}
}
// TODO(fys): use "test" feature to enable it
#[derive(Debug)]
pub struct MockExecution {
name: String,
}
impl MockExecution {
pub fn new(name: String) -> Self {
Self { name }
}
}
#[async_trait]
impl ExecutionPlan for MockExecution {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> arrow::datatypes::SchemaRef {
let field1 = Field::new("id", DataType::UInt32, false);
let field2 = Field::new("name", DataType::LargeUtf8, false);
let field3 = Field::new("age", DataType::UInt32, false);
Arc::new(arrow::datatypes::Schema::new(vec![field1, field2, field3]))
}
fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
unimplemented!()
}
fn output_ordering(
&self,
) -> Option<&[datafusion::physical_plan::expressions::PhysicalSortExpr]> {
unimplemented!()
}
fn children(&self) -> Vec<ExecutionPlanRef> {
unimplemented!()
}
fn with_new_children(
&self,
_children: Vec<ExecutionPlanRef>,
) -> datafusion::error::Result<ExecutionPlanRef> {
unimplemented!()
}
async fn execute(
&self,
_partition: usize,
_runtime: Arc<RuntimeEnv>,
) -> datafusion::error::Result<SendableRecordBatchStream> {
let id_array = Arc::new(PrimitiveArray::from_slice([1u32, 2, 3, 4, 5]));
let name_array = Arc::new(Utf8Array::<i64>::from_slice([
"zhangsan", "lisi", "wangwu", "Tony", "Mike",
]));
let age_array = Arc::new(PrimitiveArray::from_slice([25u32, 28, 27, 35, 25]));
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::UInt32, false),
Field::new("name", DataType::LargeUtf8, false),
Field::new("age", DataType::UInt32, false),
]));
let record_batch =
RecordBatch::try_new(schema, vec![id_array, name_array, age_array]).unwrap();
let data: Vec<RecordBatch> = vec![record_batch];
let projection = Some(vec![0, 1, 2]);
let stream = MemoryStream::try_new(data, self.schema(), projection).unwrap();
Ok(Box::pin(stream))
}
fn statistics(&self) -> Statistics {
todo!()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::codec::PhysicalPlanNode;
use datafusion::physical_plan::{expressions::Column, projection::ProjectionExec};
use crate::physical::{
plan::{DefaultAsPlanImpl, MockExecution},
{AsExcutionPlan, ExecutionPlanRef},
};
#[test]
fn test_convert_df_projection_with_bytes() {
let projection_exec = mock_df_projection();
let bytes = DefaultAsPlanImpl::try_from_physical_plan(projection_exec).unwrap();
let exec = bytes.try_into_physical_plan().unwrap();
verify_df_porjection(exec);
}
#[test]
fn test_convert_df_with_grpc_projection() {
let projection_exec = mock_df_projection();
let projection_node = PhysicalPlanNode::try_from_physical_plan(projection_exec).unwrap();
let exec = projection_node.try_into_physical_plan().unwrap();
verify_df_porjection(exec);
}
fn mock_df_projection() -> Arc<ProjectionExec> {
let mock_input = Arc::new(MockExecution {
name: "mock_input".to_string(),
});
let column1 = Arc::new(Column::new("id", 0));
let column2 = Arc::new(Column::new("name", 1));
Arc::new(
ProjectionExec::try_new(
vec![(column1, "id".to_string()), (column2, "name".to_string())],
mock_input,
)
.unwrap(),
)
}
fn verify_df_porjection(exec: ExecutionPlanRef) {
let projection_exec = exec.as_any().downcast_ref::<ProjectionExec>().unwrap();
let mock_input = projection_exec
.input()
.as_any()
.downcast_ref::<MockExecution>()
.unwrap();
assert_eq!("mock_input", mock_input.name);
assert_eq!(2, projection_exec.expr().len());
assert_eq!("id", projection_exec.expr()[0].1);
assert_eq!("name", projection_exec.expr()[1].1);
}
}

View File

@@ -18,10 +18,12 @@ axum-macros = "0.2"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datatypes = { path = "../datatypes" }
hyper = { version = "0.14", features = ["full"] }
log-store = { path = "../log-store" }

View File

@@ -1,6 +1,6 @@
use std::any::Any;
use api::convert::DecodeError;
use api::serde::DecodeError;
use common_error::ext::BoxedError;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
@@ -11,26 +11,32 @@ use table::error::Error as TableError;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Fail to execute sql, source: {}", source))]
#[snafu(display("Failed to execute sql, source: {}", source))]
ExecuteSql {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Fail to create catalog list, source: {}", source))]
#[snafu(display("Failed to execute physical plan, source: {}", source))]
ExecutePhysicalPlan {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Failed to create catalog list, source: {}", source))]
NewCatalog {
#[snafu(backtrace)]
source: catalog::error::Error,
},
#[snafu(display("Fail to create table: {}, {}", table_name, source))]
#[snafu(display("Failed to create table: {}, source: {}", table_name, source))]
CreateTable {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},
#[snafu(display("Fail to get table: {}, {}", table_name, source))]
#[snafu(display("Failed to get table: {}, source: {}", table_name, source))]
GetTable {
table_name: String,
#[snafu(backtrace)]
@@ -53,7 +59,7 @@ pub enum Error {
))]
ColumnValuesNumberMismatch { columns: usize, values: usize },
#[snafu(display("Fail to parse value: {}, {}", msg, backtrace))]
#[snafu(display("Failed to parse value: {}, {}", msg, backtrace))]
ParseSqlValue { msg: String, backtrace: Backtrace },
#[snafu(display(
@@ -68,7 +74,7 @@ pub enum Error {
actual: ConcreteDataType,
},
#[snafu(display("Fail to insert value to table: {}, {}", table_name, source))]
#[snafu(display("Failed to insert value to table: {}, source: {}", table_name, source))]
Insert {
table_name: String,
source: TableError,
@@ -77,7 +83,7 @@ pub enum Error {
#[snafu(display("Illegal insert data"))]
IllegalInsertData,
#[snafu(display("Fail to convert bytes to insert batch, {}", source))]
#[snafu(display("Failed to convert bytes to insert batch, source: {}", source))]
DecodeInsert { source: DecodeError },
#[snafu(display("Failed to start server, source: {}", source))]
@@ -86,19 +92,19 @@ pub enum Error {
source: servers::error::Error,
},
#[snafu(display("Fail to parse address {}, source: {}", addr, source))]
#[snafu(display("Failed to parse address {}, source: {}", addr, source))]
ParseAddr {
addr: String,
source: std::net::AddrParseError,
},
#[snafu(display("Fail to bind address {}, source: {}", addr, source))]
#[snafu(display("Failed to bind address {}, source: {}", addr, source))]
TcpBind {
addr: String,
source: std::io::Error,
},
#[snafu(display("Fail to start gRPC server, source: {}", source))]
#[snafu(display("Failed to start gRPC server, source: {}", source))]
StartGrpc { source: tonic::transport::Error },
#[snafu(display("Failed to create directory {}, source: {}", dir, source))]
@@ -132,12 +138,18 @@ pub enum Error {
#[snafu(display("Invalid CREATE TABLE sql statement, cause: {}", msg))]
InvalidCreateTableSql { msg: String, backtrace: Backtrace },
#[snafu(display("Failed to create schema when creating table: {}", source))]
#[snafu(display("Failed to create schema when creating table, source: {}", source))]
CreateSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to convert datafusion schema, source: {}", source))]
ConvertSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("SQL data type not supported yet: {:?}", t))]
SqlTypeNotSupported {
t: sql::ast::DataType,
@@ -156,11 +168,17 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to insert into system catalog table: {}", source))]
#[snafu(display("Failed to insert into system catalog table, source: {}", source))]
InsertSystemCatalog {
#[snafu(backtrace)]
source: catalog::error::Error,
},
#[snafu(display("Failed to decode as physical plan, source: {}", source))]
IntoPhysicalPlan {
#[snafu(backtrace)]
source: common_grpc::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -169,10 +187,12 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ExecuteSql { source } => source.status_code(),
Error::ExecutePhysicalPlan { source } => source.status_code(),
Error::NewCatalog { source } => source.status_code(),
Error::CreateTable { source, .. } => source.status_code(),
Error::GetTable { source, .. } => source.status_code(),
Error::Insert { source, .. } => source.status_code(),
Error::ConvertSchema { source, .. } => source.status_code(),
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::ColumnValuesNumberMismatch { .. }
@@ -193,6 +213,7 @@ impl ErrorExt for Error {
| Error::CreateDir { .. }
| Error::InsertSystemCatalog { .. }
| Error::Conversion { .. }
| Error::IntoPhysicalPlan { .. }
| Error::UnsupportedExpr { .. } => StatusCode::Internal,
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),

View File

@@ -20,10 +20,12 @@ use table_engine::engine::MitoEngine;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, ExecuteSqlSnafu, InsertSnafu, NewCatalogSnafu, Result, TableNotFoundSnafu,
UnsupportedExprSnafu,
};
use crate::metric;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::insert::insertion_expr_to_request;
use crate::server::grpc::plan::PhysicalPlanner;
use crate::server::grpc::select::to_object_result;
use crate::sql::{SqlHandler, SqlRequest};
@@ -36,6 +38,7 @@ pub struct Instance {
sql_handler: SqlHandler<DefaultEngine>,
// Catalog list
catalog_manager: CatalogManagerRef,
physical_planner: PhysicalPlanner,
}
pub type InstanceRef = Arc<Instance>;
@@ -63,9 +66,10 @@ impl Instance {
let query_engine = factory.query_engine().clone();
Ok(Self {
query_engine,
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
})
}
@@ -167,12 +171,23 @@ impl Instance {
}
async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult {
match select_expr.expr {
Some(select_expr::Expr::Sql(sql)) => {
let result = self.execute_sql(&sql).await;
to_object_result(result).await
let result = self.do_handle_select(select_expr).await;
to_object_result(result).await
}
async fn do_handle_select(&self, select_expr: SelectExpr) -> Result<Output> {
let expr = select_expr.expr;
match expr {
Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await,
Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => {
self.physical_planner
.execute(PhysicalPlanner::parse(plan)?, original_ql)
.await
}
None => ObjectResult::default(),
_ => UnsupportedExprSnafu {
name: format!("{:?}", expr),
}
.fail(),
}
}

View File

@@ -1,3 +1,4 @@
pub(crate) mod handler;
pub mod insert;
pub mod select;
pub(crate) mod insert;
pub(crate) mod plan;
pub(crate) mod select;

View File

@@ -1,5 +1,6 @@
use std::{
collections::{hash_map::Entry, HashMap},
ops::Deref,
sync::Arc,
};
@@ -68,7 +69,7 @@ fn insert_batches(bytes_vec: Vec<Vec<u8>>) -> Result<Vec<InsertBatch>> {
let mut insert_batches = Vec::with_capacity(bytes_vec.len());
for bytes in bytes_vec {
insert_batches.push(bytes.try_into().context(DecodeInsertSnafu)?);
insert_batches.push(bytes.deref().try_into().context(DecodeInsertSnafu)?);
}
Ok(insert_batches)
}

View File

@@ -0,0 +1,44 @@
use std::sync::Arc;
use common_grpc::AsExcutionPlan;
use common_grpc::DefaultAsPlanImpl;
use datatypes::schema::Schema;
use query::PhysicalPlanAdapter;
use query::{plan::PhysicalPlan, Output, QueryEngineRef};
use snafu::ResultExt;
use crate::error::Result;
use crate::error::{ConvertSchemaSnafu, ExecutePhysicalPlanSnafu, IntoPhysicalPlanSnafu};
pub type PhysicalPlanRef = Arc<dyn PhysicalPlan>;
pub struct PhysicalPlanner {
query_engine: QueryEngineRef,
}
impl PhysicalPlanner {
pub fn new(query_engine: QueryEngineRef) -> Self {
Self { query_engine }
}
pub fn parse(bytes: Vec<u8>) -> Result<PhysicalPlanRef> {
let physical_plan = DefaultAsPlanImpl { bytes }
.try_into_physical_plan()
.context(IntoPhysicalPlanSnafu)?;
let schema: Arc<Schema> = Arc::new(
physical_plan
.schema()
.try_into()
.context(ConvertSchemaSnafu)?,
);
Ok(Arc::new(PhysicalPlanAdapter::new(schema, physical_plan)))
}
pub async fn execute(&self, plan: PhysicalPlanRef, _original_ql: Vec<u8>) -> Result<Output> {
self.query_engine
.execute_physical(&plan)
.await
.context(ExecutePhysicalPlanSnafu)
}
}

View File

@@ -2,7 +2,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::{codec::InsertBatch, column, select_expr, Column, SelectExpr};
use api::v1::{codec::InsertBatch, column, Column};
use client::{Client, Database, ObjectResult};
use servers::grpc::GrpcServer;
use servers::server::Server;
@@ -86,10 +86,10 @@ async fn test_insert_and_select() {
assert!(result.is_ok());
// select
let select_expr = SelectExpr {
expr: Some(select_expr::Expr::Sql("select * from demo".to_string())),
};
let result = db.select(select_expr).await.unwrap();
let result = db
.select(client::Select::Sql("select * from demo".to_string()))
.await
.unwrap();
assert!(matches!(result, ObjectResult::Select(_)));
match result {
ObjectResult::Select(select_result) => {

View File

@@ -2,7 +2,7 @@
mod catalog_adapter;
mod error;
mod plan_adapter;
pub mod plan_adapter;
mod planner;
use std::sync::Arc;
@@ -84,6 +84,11 @@ impl QueryEngine for DatafusionQueryEngine {
))
}
async fn execute_physical(&self, plan: &Arc<dyn PhysicalPlan>) -> Result<Output> {
let ctx = QueryContext::new(self.state.clone());
Ok(Output::RecordBatch(self.execute_stream(&ctx, plan).await?))
}
fn register_udf(&self, udf: ScalarUdf) {
self.state.register_udf(udf);
}

View File

@@ -11,6 +11,7 @@ pub mod plan;
pub mod planner;
pub mod query_engine;
pub use crate::datafusion::plan_adapter::PhysicalPlanAdapter;
pub use crate::query_engine::{
Output, QueryContext, QueryEngine, QueryEngineFactory, QueryEngineRef,
};

View File

@@ -12,7 +12,7 @@ use sql::statements::statement::Statement;
use crate::datafusion::DatafusionQueryEngine;
use crate::error::Result;
use crate::plan::LogicalPlan;
use crate::plan::{LogicalPlan, PhysicalPlan};
pub use crate::query_engine::context::QueryContext;
pub use crate::query_engine::state::QueryEngineState;
@@ -34,6 +34,8 @@ pub trait QueryEngine: Send + Sync {
async fn execute(&self, plan: &LogicalPlan) -> Result<Output>;
async fn execute_physical(&self, plan: &Arc<dyn PhysicalPlan>) -> Result<Output>;
fn register_udf(&self, udf: ScalarUdf);
fn register_aggregate_function(&self, func: AggregateFunctionMetaRef);