feat: migrate substrait to datafusion official implementation (#1238)

* some test cases will fail

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* revert version changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update substrait-proto version

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix compile

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update df again

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/common/substrait/Cargo.toml

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* revert COPY FROM / COPY TO sqlness to standalone only

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
Ruihang Xia
2023-04-17 15:54:35 +08:00
committed by GitHub
parent 0937ccdb61
commit dc85a4b5bb
14 changed files with 1099 additions and 253 deletions

1206
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -65,6 +65,7 @@ datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74a778ca6016a853a3c3add3fa8c6f12f4fe4561" }
futures = "0.3"
futures-util = "0.3"
parquet = "37.0"

View File

@@ -109,10 +109,16 @@ impl CatalogProvider for CatalogProviderAdapter {
}
///Greptime CatalogProvider -> datafusion's CatalogProvider
struct DfCatalogProviderAdapter {
pub struct DfCatalogProviderAdapter {
catalog_provider: CatalogProviderRef,
}
impl DfCatalogProviderAdapter {
pub fn new(catalog_provider: CatalogProviderRef) -> Self {
Self { catalog_provider }
}
}
impl DfCatalogProvider for DfCatalogProviderAdapter {
fn as_any(&self) -> &dyn Any {
self

View File

@@ -14,16 +14,18 @@ common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
datafusion.workspace = true
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datatypes = { path = "../../datatypes" }
futures = "0.3"
prost.workspace = true
session = { path = "../../session" }
snafu.workspace = true
table = { path = "../../table" }
query = { path = "../../query" }
[dependencies.substrait_proto]
package = "substrait"
version = "0.4"
version = "0.7"
[dev-dependencies]
datatypes = { path = "../../datatypes" }

View File

@@ -49,10 +49,10 @@ use crate::error::{
use crate::schema::{from_schema, to_schema};
use crate::SubstraitPlan;
pub struct DFLogicalSubstraitConvertor;
pub struct DFLogicalSubstraitConvertorDeprecated;
#[async_trait]
impl SubstraitPlan for DFLogicalSubstraitConvertor {
impl SubstraitPlan for DFLogicalSubstraitConvertorDeprecated {
type Error = Error;
type Plan = LogicalPlan;
@@ -76,7 +76,7 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
}
}
impl DFLogicalSubstraitConvertor {
impl DFLogicalSubstraitConvertorDeprecated {
async fn convert_plan(
&self,
mut plan: Plan,
@@ -197,6 +197,14 @@ impl DFLogicalSubstraitConvertor {
name: "Cross Relation",
}
.fail()?,
RelType::HashJoin(_) => UnsupportedPlanSnafu {
name: "Cross Relation",
}
.fail()?,
RelType::MergeJoin(_) => UnsupportedPlanSnafu {
name: "Cross Relation",
}
.fail()?,
};
Ok(logical_plan)
@@ -311,7 +319,7 @@ impl DFLogicalSubstraitConvertor {
}
}
impl DFLogicalSubstraitConvertor {
impl DFLogicalSubstraitConvertorDeprecated {
fn logical_plan_to_rel(
&self,
ctx: &mut ConvertorContext,
@@ -585,7 +593,7 @@ mod test {
}
async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) {
let convertor = DFLogicalSubstraitConvertor;
let convertor = DFLogicalSubstraitConvertorDeprecated;
let proto = convertor.encode(plan.clone()).unwrap();
let tripped_plan = convertor.decode(proto, catalog).await.unwrap();

View File

@@ -0,0 +1,72 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use catalog::datafusion::catalog_adapter::DfCatalogProviderAdapter;
use catalog::CatalogManagerRef;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use datafusion::prelude::SessionContext;
use datafusion_expr::LogicalPlan;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
use prost::Message;
use snafu::ResultExt;
use substrait_proto::proto::Plan;
use crate::error::{DecodeDfPlanSnafu, DecodeRelSnafu, EncodeDfPlanSnafu, EncodeRelSnafu, Error};
use crate::SubstraitPlan;
pub struct DFLogicalSubstraitConvertor;
#[async_trait]
impl SubstraitPlan for DFLogicalSubstraitConvertor {
type Error = Error;
type Plan = LogicalPlan;
async fn decode<B: Buf + Send>(
&self,
message: B,
catalog_manager: CatalogManagerRef,
) -> Result<Self::Plan, Self::Error> {
let mut context = SessionContext::new();
context.register_catalog(
DEFAULT_CATALOG_NAME,
Arc::new(DfCatalogProviderAdapter::new(
catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.unwrap(),
)),
);
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
let df_plan = from_substrait_plan(&mut context, &plan)
.await
.context(DecodeDfPlanSnafu)?;
Ok(df_plan)
}
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error> {
let mut buf = BytesMut::new();
let substrait_plan = to_substrait_plan(&plan).context(EncodeDfPlanSnafu)?;
substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;
Ok(buf.freeze())
}
}

View File

@@ -109,6 +109,18 @@ pub enum Error {
#[snafu(backtrace)]
source: catalog::error::Error,
},
#[snafu(display("Failed to encode DataFusion plan, source: {}", source))]
EncodeDfPlan {
source: datafusion::error::DataFusionError,
location: Location,
},
#[snafu(display("Failed to decode DataFusion plan, source: {}", source))]
DecodeDfPlan {
source: datafusion::error::DataFusionError,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -129,7 +141,10 @@ impl ErrorExt for Error {
| Error::InvalidParameters { .. }
| Error::TableNotFound { .. }
| Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
Error::DFInternal { .. } | Error::Internal { .. } => StatusCode::Internal,
Error::DFInternal { .. }
| Error::Internal { .. }
| Error::EncodeDfPlan { .. }
| Error::DecodeDfPlan { .. } => StatusCode::Internal,
Error::ConvertDfSchema { source } => source.status_code(),
Error::ResolveTable { source, .. } => source.status_code(),
}

View File

@@ -18,6 +18,7 @@
mod context;
mod df_expr;
mod df_logical;
mod df_substrait;
pub mod error;
mod schema;
mod types;
@@ -26,7 +27,7 @@ use async_trait::async_trait;
use bytes::{Buf, Bytes};
use catalog::CatalogManagerRef;
pub use crate::df_logical::DFLogicalSubstraitConvertor;
pub use crate::df_substrait::DFLogicalSubstraitConvertor;
#[async_trait]
pub trait SubstraitPlan {

View File

@@ -770,7 +770,8 @@ mod test {
options: SortOptions::default(),
}],
Arc::new(merge),
);
)
.with_fetch(None);
assert_eq!(sort.output_partitioning().partition_count(), 1);
let session_ctx = SessionContext::new();

View File

@@ -75,18 +75,22 @@ impl QueryContext {
pub fn set_current_schema(&self, schema: &str) {
let last = self.current_schema.swap(Arc::new(schema.to_string()));
debug!(
"set new session default schema: {:?}, swap old: {:?}",
schema, last
)
if schema != last.as_str() {
debug!(
"set new session default schema: {:?}, swap old: {:?}",
schema, last
)
}
}
pub fn set_current_catalog(&self, catalog: &str) {
let last = self.current_catalog.swap(Arc::new(catalog.to_string()));
debug!(
"set new session default catalog: {:?}, swap old: {:?}",
catalog, last
)
if catalog != last.as_str() {
debug!(
"set new session default catalog: {:?}, swap old: {:?}",
catalog, last
)
}
}
}