feat: serialize/deserialize logical and execution plan via substrait (#317)

* fix: change Utf8Array indice type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: remove unused sub-crate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: impl for both Logical and Execution plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: move test-util subcrate into table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test: table scan logical plan round trip

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* drop support of physical plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix warnings

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename trait fns to encode/decode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* address review comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-10-24 15:29:33 +08:00
committed by GitHub
parent 6fc45e31e0
commit 8ab43b65ea
9 changed files with 590 additions and 27 deletions

112
Cargo.lock generated
View File

@@ -93,7 +93,7 @@ name = "api"
version = "0.1.0"
dependencies = [
"datatypes",
"prost",
"prost 0.11.0",
"snafu",
"tonic",
"tonic-build",
@@ -1040,8 +1040,8 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e57ff02e8ad8e06ab9731d5dc72dc23bef9200778eae1a89d555d8c42e5d4a86"
dependencies = [
"prost",
"prost-types",
"prost 0.11.0",
"prost-types 0.11.1",
"tonic",
"tracing-core",
]
@@ -1058,7 +1058,7 @@ dependencies = [
"futures",
"hdrhistogram",
"humantime",
"prost-types",
"prost-types 0.11.1",
"serde",
"serde_json",
"thread_local",
@@ -1628,7 +1628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1259da3b15ec7e54bd7203adb2c4335adb9ca1d47b56220d650e52c247e824a"
dependencies = [
"http",
"prost",
"prost 0.11.0",
"tokio",
"tokio-stream",
"tonic",
@@ -2504,10 +2504,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "logical-plans"
version = "0.1.0"
[[package]]
name = "lru"
version = "0.7.8"
@@ -3610,6 +3606,16 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prost"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
dependencies = [
"bytes",
"prost-derive 0.9.0",
]
[[package]]
name = "prost"
version = "0.11.0"
@@ -3617,7 +3623,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.11.0",
]
[[package]]
name = "prost-build"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
dependencies = [
"bytes",
"heck 0.3.3",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prost 0.9.0",
"prost-types 0.9.0",
"regex",
"tempfile",
"which",
]
[[package]]
@@ -3633,13 +3659,26 @@ dependencies = [
"log",
"multimap",
"petgraph",
"prost",
"prost-types",
"prost 0.11.0",
"prost-types 0.11.1",
"regex",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9cc1a3263e07e0bf68e96268f37665207b49560d98739662cdfaae215c720fe"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-derive"
version = "0.11.0"
@@ -3653,6 +3692,16 @@ dependencies = [
"syn",
]
[[package]]
name = "prost-types"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
dependencies = [
"bytes",
"prost 0.9.0",
]
[[package]]
name = "prost-types"
version = "0.11.1"
@@ -3660,7 +3709,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e"
dependencies = [
"bytes",
"prost",
"prost 0.11.0",
]
[[package]]
@@ -4844,7 +4893,7 @@ dependencies = [
"object-store",
"paste",
"planus",
"prost",
"prost 0.11.0",
"rand 0.8.5",
"regex",
"serde",
@@ -4986,6 +5035,35 @@ dependencies = [
"winapi",
]
[[package]]
name = "substrait"
version = "0.1.0"
dependencies = [
"bytes",
"catalog",
"common-error",
"datafusion",
"datatypes",
"futures",
"prost 0.9.0",
"snafu",
"substrait 0.2.0",
"table",
"tokio",
]
[[package]]
name = "substrait"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46079e9004f5e069eae2976d4e23ea29c4e215b1096d3d53b76b19879f346100"
dependencies = [
"glob",
"prost 0.9.0",
"prost-build 0.9.0",
"prost-types 0.9.0",
]
[[package]]
name = "subtle"
version = "2.4.1"
@@ -5412,8 +5490,8 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"prost 0.11.0",
"prost-derive 0.11.0",
"tokio",
"tokio-stream",
"tokio-util",
@@ -5432,7 +5510,7 @@ checksum = "2fbcd2800e34e743b9ae795867d5f77b535d3a3be69fd731e39145719752df8c"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-build 0.11.1",
"quote",
"syn",
]

View File

@@ -12,13 +12,13 @@ members = [
"src/common/query",
"src/common/recordbatch",
"src/common/runtime",
"src/common/substrait",
"src/common/telemetry",
"src/common/time",
"src/datanode",
"src/datatypes",
"src/frontend",
"src/log-store",
"src/logical-plans",
"src/meta-client",
"src/meta-srv",
"src/object-store",

View File

@@ -184,7 +184,7 @@ impl ExecutionPlan for MockExecution {
_runtime: Arc<RuntimeEnv>,
) -> datafusion::error::Result<SendableRecordBatchStream> {
let id_array = Arc::new(PrimitiveArray::from_slice([1u32, 2, 3, 4, 5]));
let name_array = Arc::new(Utf8Array::<i64>::from_slice([
let name_array = Arc::new(Utf8Array::<i32>::from_slice([
"zhangsan", "lisi", "wangwu", "Tony", "Mike",
]));
let age_array = Arc::new(PrimitiveArray::from_slice([25u32, 28, 27, 35, 25]));

View File

@@ -0,0 +1,25 @@
[package]
name = "substrait"
version = "0.1.0"
edition = "2021"
[dependencies]
bytes = "1.1"
catalog = { path = "../../catalog" }
common-error = { path = "../error" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
] }
futures = "0.3"
prost = "0.9"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../../table" }
[dependencies.substrait_proto]
package = "substrait"
version = "0.2"
[dev-dependencies]
datatypes = { path = "../../datatypes" }
table = { path = "../../table" }
tokio = { version = "1.0", features = ["full"] }

View File

@@ -0,0 +1,360 @@
use std::sync::Arc;
use bytes::{Buf, Bytes, BytesMut};
use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use datafusion::datasource::TableProvider;
use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema};
use prost::Message;
use snafu::ensure;
use snafu::{OptionExt, ResultExt};
use substrait_proto::protobuf::plan_rel::RelType as PlanRelType;
use substrait_proto::protobuf::read_rel::{NamedTable, ReadType};
use substrait_proto::protobuf::rel::RelType;
use substrait_proto::protobuf::PlanRel;
use substrait_proto::protobuf::ReadRel;
use substrait_proto::protobuf::Rel;
use table::table::adapter::DfTableProviderAdapter;
use crate::error::Error;
use crate::error::{
DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, InternalSnafu,
InvalidParametersSnafu, MissingFieldSnafu, TableNotFoundSnafu, UnknownPlanSnafu,
UnsupportedExprSnafu, UnsupportedPlanSnafu,
};
use crate::SubstraitPlan;
pub struct DFLogicalSubstraitConvertor {
catalog_manager: CatalogManagerRef,
}
impl SubstraitPlan for DFLogicalSubstraitConvertor {
type Error = Error;
type Plan = LogicalPlan;
fn decode<B: Buf + Send>(&self, message: B) -> Result<Self::Plan, Self::Error> {
let plan_rel = PlanRel::decode(message).context(DecodeRelSnafu)?;
let rel = match plan_rel.rel_type.context(EmptyPlanSnafu)? {
PlanRelType::Rel(rel) => rel,
PlanRelType::Root(_) => UnsupportedPlanSnafu {
name: "Root Relation",
}
.fail()?,
};
self.convert_rel(rel)
}
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error> {
let rel = self.convert_plan(plan)?;
let mut buf = BytesMut::new();
rel.encode(&mut buf).context(EncodeRelSnafu)?;
Ok(buf.freeze())
}
}
impl DFLogicalSubstraitConvertor {
pub fn new(catalog_manager: CatalogManagerRef) -> Self {
Self { catalog_manager }
}
}
impl DFLogicalSubstraitConvertor {
pub fn convert_rel(&self, rel: Rel) -> Result<LogicalPlan, Error> {
let rel_type = rel.rel_type.context(EmptyPlanSnafu)?;
let logical_plan = match rel_type {
RelType::Read(read_rel) => self.convert_read_rel(read_rel),
RelType::Filter(_filter_rel) => UnsupportedPlanSnafu {
name: "Filter Relation",
}
.fail()?,
RelType::Fetch(_fetch_rel) => UnsupportedPlanSnafu {
name: "Fetch Relation",
}
.fail()?,
RelType::Aggregate(_aggr_rel) => UnsupportedPlanSnafu {
name: "Fetch Relation",
}
.fail()?,
RelType::Sort(_sort_rel) => UnsupportedPlanSnafu {
name: "Sort Relation",
}
.fail()?,
RelType::Join(_join_rel) => UnsupportedPlanSnafu {
name: "Join Relation",
}
.fail()?,
RelType::Project(_project_rel) => UnsupportedPlanSnafu {
name: "Project Relation",
}
.fail()?,
RelType::Set(_set_rel) => UnsupportedPlanSnafu {
name: "Set Relation",
}
.fail()?,
RelType::ExtensionSingle(_ext_single_rel) => UnsupportedPlanSnafu {
name: "Extension Single Relation",
}
.fail()?,
RelType::ExtensionMulti(_ext_multi_rel) => UnsupportedPlanSnafu {
name: "Extension Multi Relation",
}
.fail()?,
RelType::ExtensionLeaf(_ext_leaf_rel) => UnsupportedPlanSnafu {
name: "Extension Leaf Relation",
}
.fail()?,
RelType::Cross(_cross_rel) => UnsupportedPlanSnafu {
name: "Cross Relation",
}
.fail()?,
}?;
Ok(logical_plan)
}
fn convert_read_rel(&self, read_rel: Box<ReadRel>) -> Result<LogicalPlan, Error> {
// Extract the catalog, schema and table name from NamedTable. Assume the first three are those names.
let read_type = read_rel.read_type.context(MissingFieldSnafu {
field: "read_type",
plan: "Read",
})?;
let (table_name, schema_name, catalog_name) = match read_type {
ReadType::NamedTable(mut named_table) => {
ensure!(
named_table.names.len() == 3,
InvalidParametersSnafu {
reason:
"NamedTable should contains three names for catalog, schema and table",
}
);
(
named_table.names.pop().unwrap(),
named_table.names.pop().unwrap(),
named_table.names.pop().unwrap(),
)
}
ReadType::VirtualTable(_) | ReadType::LocalFiles(_) | ReadType::ExtensionTable(_) => {
UnsupportedExprSnafu {
name: "Non-NamedTable Read",
}
.fail()?
}
};
// Get table handle from catalog manager
let table_ref = self
.catalog_manager
.table(Some(&catalog_name), Some(&schema_name), &table_name)
.map_err(BoxedError::new)
.context(InternalSnafu)?
.context(TableNotFoundSnafu {
name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
})?;
let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
// Get schema direct from the table.
// TODO(ruihang): Maybe need to verify the schema with the one in Substrait?
let schema = adapter
.schema()
.to_dfschema_ref()
.context(DFInternalSnafu)?;
// TODO(ruihang): Support projection, filters and limit
Ok(LogicalPlan::TableScan(TableScan {
table_name,
source: adapter,
projection: None,
projected_schema: schema,
filters: vec![],
limit: None,
}))
}
}
impl DFLogicalSubstraitConvertor {
pub fn convert_plan(&self, plan: LogicalPlan) -> Result<Rel, Error> {
match plan {
LogicalPlan::Projection(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Filter(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Window(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Aggregate(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Sort(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Join(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::CrossJoin(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Repartition(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Union(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::TableScan(table_scan) => {
let read_rel = self.convert_table_scan_plan(table_scan)?;
Ok(Rel {
rel_type: Some(RelType::Read(Box::new(read_rel))),
})
}
LogicalPlan::EmptyRelation(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Limit(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Values(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_) => InvalidParametersSnafu {
reason: format!(
"Trying to convert DDL/DML plan to substrait proto, plan: {:?}",
plan
),
}
.fail()?,
}
}
pub fn convert_table_scan_plan(&self, table_scan: TableScan) -> Result<ReadRel, Error> {
let provider = table_scan
.source
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownPlanSnafu)?;
let table_info = provider.table().table_info();
let catalog_name = table_info.catalog_name.clone();
let schema_name = table_info.schema_name.clone();
let table_name = table_info.name.clone();
let named_table = NamedTable {
names: vec![catalog_name, schema_name, table_name],
advanced_extension: None,
};
let read_type = ReadType::NamedTable(named_table);
let read_rel = ReadRel {
common: None,
base_schema: None,
filter: None,
projection: None,
advanced_extension: None,
read_type: Some(read_type),
};
Ok(read_rel)
}
}
#[cfg(test)]
mod test {
use catalog::{
memory::{MemoryCatalogProvider, MemorySchemaProvider},
CatalogList, CatalogProvider, LocalCatalogManager, RegisterTableRequest,
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use datatypes::schema::Schema;
use table::{requests::CreateTableRequest, test_util::EmptyTable, test_util::MockTableEngine};
use super::*;
const DEFAULT_TABLE_NAME: &str = "SubstraitTable";
async fn build_mock_catalog_manager() -> CatalogManagerRef {
let mock_table_engine = Arc::new(MockTableEngine::new());
let catalog_manager = Arc::new(
LocalCatalogManager::try_new(mock_table_engine)
.await
.unwrap(),
);
let schema_provider = Arc::new(MemorySchemaProvider::new());
let catalog_provider = Arc::new(MemoryCatalogProvider::new());
catalog_provider.register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider);
catalog_manager.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
catalog_manager.init().await.unwrap();
catalog_manager
}
fn build_create_table_request<N: ToString>(table_name: N) -> CreateTableRequest {
CreateTableRequest {
id: 1,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
desc: None,
schema: Arc::new(Schema::new(vec![])),
primary_key_indices: vec![],
create_if_not_exists: true,
table_options: Default::default(),
}
}
async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) {
let convertor = DFLogicalSubstraitConvertor::new(catalog);
let rel = convertor.convert_plan(plan.clone()).unwrap();
let tripped_plan = convertor.convert_rel(rel).unwrap();
assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan));
}
#[tokio::test]
async fn test_bare_table_scan() {
let catalog_manager = build_mock_catalog_manager().await;
let table_ref = Arc::new(EmptyTable::new(build_create_table_request(
DEFAULT_TABLE_NAME,
)));
catalog_manager
.register_table(RegisterTableRequest {
catalog: Some(DEFAULT_CATALOG_NAME.to_string()),
schema: Some(DEFAULT_SCHEMA_NAME.to_string()),
table_name: DEFAULT_TABLE_NAME.to_string(),
table_id: 1,
table: table_ref.clone(),
})
.await
.unwrap();
let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
let schema = adapter.schema().to_dfschema_ref().unwrap();
let table_scan_plan = LogicalPlan::TableScan(TableScan {
table_name: DEFAULT_TABLE_NAME.to_string(),
source: adapter,
projection: None,
projected_schema: schema,
filters: vec![],
limit: None,
});
logical_plan_round_trip(table_scan_plan, catalog_manager).await;
}
}

View File

@@ -0,0 +1,92 @@
use std::any::Any;
use common_error::prelude::{BoxedError, ErrorExt, StatusCode};
use datafusion::error::DataFusionError;
use prost::{DecodeError, EncodeError};
use snafu::{Backtrace, ErrorCompat, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Unsupported physical expr: {}", name))]
UnsupportedPlan { name: String, backtrace: Backtrace },
#[snafu(display("Unsupported physical plan: {}", name))]
UnsupportedExpr { name: String, backtrace: Backtrace },
#[snafu(display("Failed to decode substrait relation, source: {}", source))]
DecodeRel {
source: DecodeError,
backtrace: Backtrace,
},
#[snafu(display("Failed to encode substrait relation, source: {}", source))]
EncodeRel {
source: EncodeError,
backtrace: Backtrace,
},
#[snafu(display("Input plan is empty"))]
EmptyPlan { backtrace: Backtrace },
#[snafu(display("Input expression is empty"))]
EmptyExpr { backtrace: Backtrace },
#[snafu(display("Missing required field in protobuf, field: {}, plan: {}", field, plan))]
MissingField {
field: String,
plan: String,
backtrace: Backtrace,
},
#[snafu(display("Invalid parameters: {}", reason))]
InvalidParameters {
reason: String,
backtrace: Backtrace,
},
#[snafu(display("Internal error from DataFusion: {}", source))]
DFInternal {
source: DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Internal error: {}", source))]
Internal {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Table quering not found: {}", name))]
TableNotFound { name: String, backtrace: Backtrace },
#[snafu(display("Cannot convert plan doesn't belong to GrepTimeDB"))]
UnknownPlan { backtrace: Backtrace },
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::UnsupportedPlan { .. } | Error::UnsupportedExpr { .. } => {
StatusCode::Unsupported
}
Error::UnknownPlan { .. }
| Error::EncodeRel { .. }
| Error::DecodeRel { .. }
| Error::EmptyPlan { .. }
| Error::EmptyExpr { .. }
| Error::MissingField { .. }
| Error::InvalidParameters { .. }
| Error::TableNotFound { .. } => StatusCode::InvalidArguments,
Error::DFInternal { .. } | Error::Internal { .. } => StatusCode::Internal,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,16 @@
mod df_logical;
mod error;
use bytes::{Buf, Bytes};
pub use crate::df_logical::DFLogicalSubstraitConvertor;
pub trait SubstraitPlan {
type Error: std::error::Error;
type Plan;
fn decode<B: Buf + Send>(&self, message: B) -> Result<Self::Plan, Self::Error>;
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error>;
}

View File

@@ -1,7 +0,0 @@
[package]
name = "logical-plans"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

View File

@@ -1 +0,0 @@