feat: support querying with logical plan in gRPC layer (#344)

* impl logical exec & example

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test on upper api

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add todo to prost dep

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sign the TODO

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-10-25 16:05:53 +08:00
committed by GitHub
parent 2ca667cbdf
commit 7fe39e9187
10 changed files with 192 additions and 24 deletions

4
Cargo.lock generated
View File

@@ -805,7 +805,10 @@ dependencies = [
"datafusion",
"datanode",
"datatypes",
"prost 0.9.0",
"snafu",
"substrait 0.1.0",
"substrait 0.2.0",
"tokio",
"tonic",
"tracing",
@@ -1418,6 +1421,7 @@ dependencies = [
"sql",
"storage",
"store-api",
"substrait 0.1.0",
"table",
"table-engine",
"tempdir",

View File

@@ -27,6 +27,7 @@ message ObjectExpr {
// Payload of a select request. At most one variant of `expr` is set.
message SelectExpr {
oneof expr {
// The query expressed as SQL text.
string sql = 1;
// The query expressed as an encoded logical plan. The datanode decodes
// these bytes with its Substrait convertor before executing the plan.
bytes logical_plan = 2;
// The query expressed as a physical plan.
PhysicalPlan physical_plan = 15;
}
}

View File

@@ -13,13 +13,25 @@ common-grpc = { path = "../common/grpc" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
datatypes = { path = "../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
tonic = "0.8"
[dev-dependencies]
datanode = { path = "../datanode" }
substrait = { path = "../common/substrait" }
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[dev-dependencies.substrait_proto]
package = "substrait"
version = "0.2"
# TODO(ruihang): upgrade to 0.11 once substrait-rs supports it.
[dev-dependencies.prost_09]
package = "prost"
version = "0.9"

View File

@@ -0,0 +1,96 @@
use api::v1::{ColumnDataType, ColumnDef, CreateExpr};
use client::{admin::Admin, Client, Database};
use prost_09::Message;
use substrait_proto::protobuf::{
plan_rel::RelType as PlanRelType,
read_rel::{NamedTable, ReadType},
rel::RelType,
PlanRel, ReadRel, Rel,
};
use tracing::{event, Level};
/// Entry point of the example: registers a global `tracing` subscriber taken
/// straight from `FmtSubscriber`'s builder, then drives the async `run` body.
///
/// Panics if the global subscriber cannot be set.
fn main() {
    let subscriber = tracing_subscriber::FmtSubscriber::builder().finish();
    tracing::subscriber::set_global_default(subscriber).unwrap();

    run();
}
#[tokio::main]
async fn run() {
let client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let create_table_expr = CreateExpr {
catalog_name: Some("greptime".to_string()),
schema_name: Some("public".to_string()),
table_name: "test_logical_dist_exec".to_string(),
desc: None,
column_defs: vec![
ColumnDef {
name: "timestamp".to_string(),
datatype: ColumnDataType::Timestamp as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "key".to_string(),
datatype: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: None,
},
ColumnDef {
name: "value".to_string(),
datatype: ColumnDataType::Uint64 as i32,
is_nullable: false,
default_constraint: None,
},
],
time_index: "timestamp".to_string(),
primary_keys: vec!["key".to_string()],
create_if_not_exists: false,
table_options: Default::default(),
};
let admin = Admin::new("create table", client.clone());
let result = admin.create(create_table_expr).await.unwrap();
event!(Level::INFO, "create table result: {:#?}", result);
let logical = mock_logical_plan();
event!(Level::INFO, "plan size: {:#?}", logical.len());
let db = Database::new("greptime", client);
let result = db.logical_plan(logical).await.unwrap();
event!(Level::INFO, "result: {:#?}", result);
}
/// Builds a minimal Substrait plan and returns its protobuf encoding.
///
/// The plan is a single `ReadRel` over the named table
/// `greptime.public.test_logical_dist_exec`, with no filter or projection,
/// wrapped in a `Rel` and then a `PlanRel`.
fn mock_logical_plan() -> Vec<u8> {
    // Fully qualified table name: catalog, schema, table.
    let names = ["greptime", "public", "test_logical_dist_exec"]
        .iter()
        .map(|part| part.to_string())
        .collect();

    let table_scan = ReadRel {
        common: None,
        base_schema: None,
        filter: None,
        projection: None,
        advanced_extension: None,
        read_type: Some(ReadType::NamedTable(NamedTable {
            names,
            advanced_extension: None,
        })),
    };

    let plan_rel = PlanRel {
        rel_type: Some(PlanRelType::Rel(Rel {
            rel_type: Some(RelType::Read(Box::new(table_scan))),
        })),
    };

    let mut encoded = Vec::new();
    plan_rel.encode(&mut encoded).unwrap();
    encoded
}

View File

@@ -107,6 +107,13 @@ impl Database {
self.do_select(select_expr).await
}
/// Issues a select request whose expression is the given logical plan.
///
/// `logical_plan` is passed through unchanged inside
/// `select_expr::Expr::LogicalPlan`; the request itself is sent by
/// `do_select`, whose result is returned as-is.
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<ObjectResult> {
    let expr = Some(select_expr::Expr::LogicalPlan(logical_plan));
    self.do_select(SelectExpr { expr }).await
}
async fn do_select(&self, select_expr: SelectExpr) -> Result<ObjectResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,

View File

@@ -47,9 +47,12 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error> {
let rel = self.convert_plan(plan)?;
let plan_rel = PlanRel {
rel_type: Some(PlanRelType::Rel(rel)),
};
let mut buf = BytesMut::new();
rel.encode(&mut buf).context(EncodeRelSnafu)?;
plan_rel.encode(&mut buf).context(EncodeRelSnafu)?;
Ok(buf.freeze())
}
@@ -182,35 +185,35 @@ impl DFLogicalSubstraitConvertor {
}
.fail()?,
LogicalPlan::Filter(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical Filter",
}
.fail()?,
LogicalPlan::Window(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical Window",
}
.fail()?,
LogicalPlan::Aggregate(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical Aggregate",
}
.fail()?,
LogicalPlan::Sort(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical Sort",
}
.fail()?,
LogicalPlan::Join(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical Join",
}
.fail()?,
LogicalPlan::CrossJoin(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical CrossJoin",
}
.fail()?,
LogicalPlan::Repartition(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical Repartition",
}
.fail()?,
LogicalPlan::Union(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical Union",
}
.fail()?,
LogicalPlan::TableScan(table_scan) => {
@@ -220,11 +223,11 @@ impl DFLogicalSubstraitConvertor {
})
}
LogicalPlan::EmptyRelation(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical EmptyRelation",
}
.fail()?,
LogicalPlan::Limit(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
name: "DataFusion Logical Limit",
}
.fail()?,
LogicalPlan::CreateExternalTable(_)
@@ -321,8 +324,8 @@ mod test {
async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) {
let convertor = DFLogicalSubstraitConvertor::new(catalog);
let rel = convertor.convert_plan(plan.clone()).unwrap();
let tripped_plan = convertor.convert_rel(rel).unwrap();
let proto = convertor.encode(plan.clone()).unwrap();
let tripped_plan = convertor.decode(proto).unwrap();
assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan));
}

View File

@@ -1,5 +1,5 @@
mod df_logical;
mod error;
pub mod error;
use bytes::{Buf, Bytes};

View File

@@ -5,9 +5,7 @@ edition = "2021"
[features]
default = ["python"]
python = [
"dep:script"
]
python = ["dep:script"]
[dependencies]
api = { path = "../api" }
@@ -23,7 +21,9 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
datatypes = { path = "../datatypes" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
@@ -39,6 +39,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }
storage = { path = "../storage" }
store-api = { path = "../store-api" }
substrait = { path = "../common/substrait" }
table = { path = "../table" }
table-engine = { path = "../table-engine", features = ["test"] }
tokio = { version = "1.18", features = ["full"] }
@@ -46,22 +47,41 @@ tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["full"] }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies.arrow]
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
features = [
"io_csv",
"io_json",
"io_parquet",
"io_parquet_compression",
"io_ipc",
"ahash",
"compute",
"serde_types",
]
[dev-dependencies]
axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" }
client = { path = "../client" }
common-query = { path = "../common/query" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
tempdir = "0.3"
[dev-dependencies.arrow]
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
features = [
"io_csv",
"io_json",
"io_parquet",
"io_parquet_compression",
"io_ipc",
"ahash",
"compute",
"serde_types",
]

View File

@@ -16,6 +16,12 @@ pub enum Error {
source: query::error::Error,
},
#[snafu(display("Failed to decode logical plan, source: {}", source))]
DecodeLogicalPlan {
#[snafu(backtrace)]
source: substrait::error::Error,
},
#[snafu(display("Failed to execute physical plan, source: {}", source))]
ExecutePhysicalPlan {
#[snafu(backtrace)]
@@ -269,6 +275,7 @@ impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ExecuteSql { source } => source.status_code(),
Error::DecodeLogicalPlan { source } => source.status_code(),
Error::ExecutePhysicalPlan { source } => source.status_code(),
Error::NewCatalog { source } => source.status_code(),

View File

@@ -7,11 +7,16 @@ use catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::status_code::StatusCode;
use common_query::Output;
use common_telemetry::logging::{debug, info};
use query::plan::LogicalPlan;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler};
use snafu::prelude::*;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::requests::AddColumnRequest;
use crate::error::{self, InsertSnafu, Result, TableNotFoundSnafu, UnsupportedExprSnafu};
use crate::error::{
self, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, Result, TableNotFoundSnafu,
UnsupportedExprSnafu,
};
use crate::instance::Instance;
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::insert::{self, insertion_expr_to_request};
@@ -155,6 +160,7 @@ impl Instance {
let expr = select_expr.expr;
match expr {
Some(select_expr::Expr::Sql(sql)) => self.execute_sql(&sql).await,
Some(select_expr::Expr::LogicalPlan(plan)) => self.execute_logical(plan).await,
Some(select_expr::Expr::PhysicalPlan(api::v1::PhysicalPlan { original_ql, plan })) => {
self.physical_planner
.execute(PhysicalPlanner::parse(plan)?, original_ql)
@@ -166,6 +172,18 @@ impl Instance {
.fail(),
}
}
/// Decodes `plan_bytes` into a DataFusion logical plan and runs it on the
/// query engine.
///
/// # Errors
///
/// A decode failure is reported with the `DecodeLogicalPlanSnafu` context;
/// a failure while executing the plan is reported with `ExecuteSqlSnafu`.
async fn execute_logical(&self, plan_bytes: Vec<u8>) -> Result<Output> {
    let convertor = DFLogicalSubstraitConvertor::new(self.catalog_manager.clone());
    let df_plan = convertor
        .decode(plan_bytes.as_slice())
        .context(DecodeLogicalPlanSnafu)?;

    let plan = LogicalPlan::DfPlan(df_plan);
    self.query_engine
        .execute(&plan)
        .await
        .context(ExecuteSqlSnafu)
}
}
#[async_trait]