feat: Substrait logical plan (#704)

* feat: use Substrait logical plan to query data from Datanode in Frontend in distributed mode

* fix: resolve PR comments

* fix: resolve PR comments

* fix: resolve PR comments

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-12-06 19:21:57 +08:00
committed by GitHub
parent 2034b40f33
commit 8959dbcef8
27 changed files with 315 additions and 179 deletions

View File

@@ -15,18 +15,19 @@
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use common_catalog::error::{
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu,
};
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize, Serializer};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::{RawTableInfo, TableId, TableVersion};
use crate::consts::{
CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX,
};
use crate::error::{
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu,
};
const CATALOG_KEY_PREFIX: &str = "__c";
const SCHEMA_KEY_PREFIX: &str = "__s";
const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg";
const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr";
const ALPHANUMERICS_NAME_PATTERN: &str = "[a-zA-Z_][a-zA-Z0-9_]*";

View File

@@ -29,6 +29,7 @@ use crate::error::{CreateTableSnafu, Result};
pub use crate::schema::{SchemaProvider, SchemaProviderRef};
pub mod error;
pub mod helper;
pub mod local;
pub mod remote;
pub mod schema;

View File

@@ -20,10 +20,6 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use async_stream::stream;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_catalog::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use common_telemetry::{debug, info};
use futures::Stream;
use futures_util::StreamExt;
@@ -39,6 +35,10 @@ use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu,
OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,

View File

@@ -22,12 +22,12 @@ mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::{
KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
};
use catalog::{CatalogList, CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use datatypes::schema::Schema;
use futures_util::StreamExt;
use table::engine::{EngineContext, TableEngineRef};

View File

@@ -14,7 +14,6 @@ regex = "1.6"
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../../table" }
[dev-dependencies]
chrono = "0.4"

View File

@@ -25,9 +25,3 @@ pub const MIN_USER_TABLE_ID: u32 = 1024;
pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0;
/// scripts table id
pub const SCRIPTS_TABLE_ID: u32 = 1;
pub(crate) const CATALOG_KEY_PREFIX: &str = "__c";
pub(crate) const SCHEMA_KEY_PREFIX: &str = "__s";
pub(crate) const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg";
pub(crate) const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr";
pub const TABLE_ID_KEY_PREFIX: &str = "__tid";

View File

@@ -14,10 +14,3 @@
pub mod consts;
pub mod error;
mod helper;
pub use helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix,
build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey,
TableGlobalValue, TableRegionalKey, TableRegionalValue,
};

View File

@@ -14,6 +14,7 @@
use std::collections::HashMap;
use datafusion::logical_plan::DFSchemaRef;
use substrait_proto::protobuf::extensions::simple_extension_declaration::{
ExtensionFunction, MappingType,
};
@@ -23,6 +24,7 @@ use substrait_proto::protobuf::extensions::SimpleExtensionDeclaration;
pub struct ConvertorContext {
scalar_fn_names: HashMap<String, u32>,
scalar_fn_map: HashMap<u32, String>,
df_schema: Option<DFSchemaRef>,
}
impl ConvertorContext {
@@ -63,4 +65,13 @@ impl ConvertorContext {
}
result
}
pub(crate) fn set_df_schema(&mut self, schema: DFSchemaRef) {
debug_assert!(self.df_schema.is_none());
self.df_schema.get_or_insert(schema);
}
pub(crate) fn df_schema(&self) -> Option<&DFSchemaRef> {
self.df_schema.as_ref()
}
}

View File

@@ -16,7 +16,7 @@ use std::collections::VecDeque;
use std::str::FromStr;
use datafusion::logical_plan::{Column, Expr};
use datafusion_expr::{expr_fn, BuiltinScalarFunction, Operator};
use datafusion_expr::{expr_fn, lit, BuiltinScalarFunction, Operator};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
use substrait_proto::protobuf::expression::field_reference::ReferenceType as FieldReferenceType;
@@ -24,7 +24,7 @@ use substrait_proto::protobuf::expression::reference_segment::{
ReferenceType as SegReferenceType, StructField,
};
use substrait_proto::protobuf::expression::{
FieldReference, ReferenceSegment, RexType, ScalarFunction,
FieldReference, Literal, ReferenceSegment, RexType, ScalarFunction,
};
use substrait_proto::protobuf::function_argument::ArgType;
use substrait_proto::protobuf::Expression;
@@ -33,15 +33,24 @@ use crate::context::ConvertorContext;
use crate::error::{
EmptyExprSnafu, InvalidParametersSnafu, MissingFieldSnafu, Result, UnsupportedExprSnafu,
};
use crate::types::{literal_type_to_scalar_value, scalar_value_as_literal_type};
/// Convert substrait's `Expression` to DataFusion's `Expr`.
pub fn to_df_expr(ctx: &ConvertorContext, expression: Expression, schema: &Schema) -> Result<Expr> {
pub(crate) fn to_df_expr(
ctx: &ConvertorContext,
expression: Expression,
schema: &Schema,
) -> Result<Expr> {
let expr_rex_type = expression.rex_type.context(EmptyExprSnafu)?;
match expr_rex_type {
RexType::Literal(_) => UnsupportedExprSnafu {
name: "substrait Literal expression",
RexType::Literal(l) => {
let t = l.literal_type.context(MissingFieldSnafu {
field: "LiteralType",
plan: "Literal",
})?;
let v = literal_type_to_scalar_value(t)?;
Ok(lit(v))
}
.fail()?,
RexType::Selection(selection) => convert_selection_rex(*selection, schema),
RexType::ScalarFunction(scalar_fn) => convert_scalar_function(ctx, scalar_fn, schema),
RexType::WindowFunction(_)
@@ -453,10 +462,21 @@ pub fn expression_from_df_expr(
}
}
// Don't merge them with other unsupported expr arms to preserve the ordering.
Expr::ScalarVariable(..) | Expr::Literal(..) => UnsupportedExprSnafu {
Expr::ScalarVariable(..) => UnsupportedExprSnafu {
name: expr.to_string(),
}
.fail()?,
Expr::Literal(v) => {
let t = scalar_value_as_literal_type(v)?;
let l = Literal {
nullable: true,
type_variation_reference: 0,
literal_type: Some(t),
};
Expression {
rex_type: Some(RexType::Literal(l)),
}
}
Expr::BinaryExpr { left, op, right } => {
let left = expression_from_df_expr(ctx, left, schema)?;
let right = expression_from_df_expr(ctx, right, schema)?;

View File

@@ -18,7 +18,9 @@ use bytes::{Buf, Bytes, BytesMut};
use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::datasource::TableProvider;
use datafusion::logical_plan::plan::Filter;
use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema};
use datafusion::physical_plan::project_schema;
use prost::Message;
@@ -29,31 +31,33 @@ use substrait_proto::protobuf::extensions::simple_extension_declaration::Mapping
use substrait_proto::protobuf::plan_rel::RelType as PlanRelType;
use substrait_proto::protobuf::read_rel::{NamedTable, ReadType};
use substrait_proto::protobuf::rel::RelType;
use substrait_proto::protobuf::{Plan, PlanRel, ReadRel, Rel};
use substrait_proto::protobuf::{FilterRel, Plan, PlanRel, ReadRel, Rel};
use table::table::adapter::DfTableProviderAdapter;
use crate::context::ConvertorContext;
use crate::df_expr::{expression_from_df_expr, to_df_expr};
use crate::error::{
DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, InternalSnafu,
self, DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, InternalSnafu,
InvalidParametersSnafu, MissingFieldSnafu, SchemaNotMatchSnafu, TableNotFoundSnafu,
UnknownPlanSnafu, UnsupportedExprSnafu, UnsupportedPlanSnafu,
};
use crate::schema::{from_schema, to_schema};
use crate::SubstraitPlan;
pub struct DFLogicalSubstraitConvertor {
catalog_manager: CatalogManagerRef,
}
pub struct DFLogicalSubstraitConvertor;
impl SubstraitPlan for DFLogicalSubstraitConvertor {
type Error = Error;
type Plan = LogicalPlan;
fn decode<B: Buf + Send>(&self, message: B) -> Result<Self::Plan, Self::Error> {
fn decode<B: Buf + Send>(
&self,
message: B,
catalog_manager: CatalogManagerRef,
) -> Result<Self::Plan, Self::Error> {
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
self.convert_plan(plan)
self.convert_plan(plan, catalog_manager)
}
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error> {
@@ -67,13 +71,11 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
}
impl DFLogicalSubstraitConvertor {
pub fn new(catalog_manager: CatalogManagerRef) -> Self {
Self { catalog_manager }
}
}
impl DFLogicalSubstraitConvertor {
pub fn convert_plan(&self, mut plan: Plan) -> Result<LogicalPlan, Error> {
fn convert_plan(
&self,
mut plan: Plan,
catalog_manager: CatalogManagerRef,
) -> Result<LogicalPlan, Error> {
// prepare convertor context
let mut ctx = ConvertorContext::default();
for simple_ext in plan.extensions {
@@ -99,15 +101,51 @@ impl DFLogicalSubstraitConvertor {
}
.fail()?
};
self.rel_to_logical_plan(&mut ctx, Box::new(rel), catalog_manager)
}
fn rel_to_logical_plan(
&self,
ctx: &mut ConvertorContext,
rel: Box<Rel>,
catalog_manager: CatalogManagerRef,
) -> Result<LogicalPlan, Error> {
let rel_type = rel.rel_type.context(EmptyPlanSnafu)?;
// build logical plan
let logical_plan = match rel_type {
RelType::Read(read_rel) => self.convert_read_rel(&mut ctx, read_rel),
RelType::Filter(_filter_rel) => UnsupportedPlanSnafu {
name: "Filter Relation",
RelType::Read(read_rel) => self.convert_read_rel(ctx, read_rel, catalog_manager)?,
RelType::Filter(filter) => {
let FilterRel {
common: _,
input,
condition,
advanced_extension: _,
} = *filter;
let input = input.context(MissingFieldSnafu {
field: "input",
plan: "Filter",
})?;
let input = Arc::new(self.rel_to_logical_plan(ctx, input, catalog_manager)?);
let condition = condition.context(MissingFieldSnafu {
field: "condition",
plan: "Filter",
})?;
let schema = ctx.df_schema().context(InvalidParametersSnafu {
reason: "the underlying TableScan plan should have included a table schema",
})?;
let schema = schema
.clone()
.try_into()
.context(error::ConvertDfSchemaSnafu)?;
let predicate = to_df_expr(ctx, *condition, &schema)?;
LogicalPlan::Filter(Filter { predicate, input })
}
.fail()?,
RelType::Fetch(_fetch_rel) => UnsupportedPlanSnafu {
name: "Fetch Relation",
}
@@ -148,7 +186,7 @@ impl DFLogicalSubstraitConvertor {
name: "Cross Relation",
}
.fail()?,
}?;
};
Ok(logical_plan)
}
@@ -157,6 +195,7 @@ impl DFLogicalSubstraitConvertor {
&self,
ctx: &mut ConvertorContext,
read_rel: Box<ReadRel>,
catalog_manager: CatalogManagerRef,
) -> Result<LogicalPlan, Error> {
// Extract the catalog, schema and table name from NamedTable. Assume the first three are those names.
let read_type = read_rel.read_type.context(MissingFieldSnafu {
@@ -192,8 +231,7 @@ impl DFLogicalSubstraitConvertor {
.map(|mask_expr| self.convert_mask_expression(mask_expr));
// Get table handle from catalog manager
let table_ref = self
.catalog_manager
let table_ref = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.map_err(BoxedError::new)
.context(InternalSnafu)?
@@ -207,7 +245,7 @@ impl DFLogicalSubstraitConvertor {
let retrieved_schema = to_schema(read_rel.base_schema.unwrap_or_default())?;
let retrieved_arrow_schema = retrieved_schema.arrow_schema();
ensure!(
stored_schema.fields == retrieved_arrow_schema.fields,
same_schema_without_metadata(&stored_schema, retrieved_arrow_schema),
SchemaNotMatchSnafu {
substrait_schema: retrieved_arrow_schema.clone(),
storage_schema: stored_schema
@@ -227,9 +265,11 @@ impl DFLogicalSubstraitConvertor {
.to_dfschema_ref()
.context(DFInternalSnafu)?;
// TODO(ruihang): Support filters and limit
ctx.set_df_schema(projected_schema.clone());
// TODO(ruihang): Support limit
Ok(LogicalPlan::TableScan(TableScan {
table_name,
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
source: adapter,
projection,
projected_schema,
@@ -250,20 +290,42 @@ impl DFLogicalSubstraitConvertor {
}
impl DFLogicalSubstraitConvertor {
pub fn convert_df_plan(&self, plan: LogicalPlan) -> Result<Plan, Error> {
let mut ctx = ConvertorContext::default();
// TODO(ruihang): extract this translation logic into a separated function
// convert PlanRel
let rel = match plan {
fn logical_plan_to_rel(
&self,
ctx: &mut ConvertorContext,
plan: Arc<LogicalPlan>,
) -> Result<Rel, Error> {
Ok(match &*plan {
LogicalPlan::Projection(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Projection",
}
.fail()?,
LogicalPlan::Filter(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Filter",
LogicalPlan::Filter(filter) => {
let input = Some(Box::new(
self.logical_plan_to_rel(ctx, filter.input.clone())?,
));
let schema = plan
.schema()
.clone()
.try_into()
.context(error::ConvertDfSchemaSnafu)?;
let condition = Some(Box::new(expression_from_df_expr(
ctx,
&filter.predicate,
&schema,
)?));
let rel = FilterRel {
common: None,
input,
condition,
advanced_extension: None,
};
Rel {
rel_type: Some(RelType::Filter(Box::new(rel))),
}
}
.fail()?,
LogicalPlan::Window(_) => UnsupportedPlanSnafu {
name: "DataFusion Logical Window",
}
@@ -293,7 +355,7 @@ impl DFLogicalSubstraitConvertor {
}
.fail()?,
LogicalPlan::TableScan(table_scan) => {
let read_rel = self.convert_table_scan_plan(&mut ctx, table_scan)?;
let read_rel = self.convert_table_scan_plan(ctx, table_scan)?;
Rel {
rel_type: Some(RelType::Read(Box::new(read_rel))),
}
@@ -319,7 +381,13 @@ impl DFLogicalSubstraitConvertor {
),
}
.fail()?,
};
})
}
fn convert_df_plan(&self, plan: LogicalPlan) -> Result<Plan, Error> {
let mut ctx = ConvertorContext::default();
let rel = self.logical_plan_to_rel(&mut ctx, Arc::new(plan))?;
// convert extension
let extensions = ctx.generate_function_extension();
@@ -341,7 +409,7 @@ impl DFLogicalSubstraitConvertor {
pub fn convert_table_scan_plan(
&self,
ctx: &mut ConvertorContext,
table_scan: TableScan,
table_scan: &TableScan,
) -> Result<ReadRel, Error> {
let provider = table_scan
.source
@@ -363,7 +431,8 @@ impl DFLogicalSubstraitConvertor {
// assemble projection
let projection = table_scan
.projection
.map(|proj| self.convert_schema_projection(&proj));
.as_ref()
.map(|x| self.convert_schema_projection(x));
// assemble base (unprojected) schema using Table's schema.
let base_schema = from_schema(&provider.table().schema())?;
@@ -371,7 +440,8 @@ impl DFLogicalSubstraitConvertor {
// make conjunction over a list of filters and convert the result to substrait
let filter = if let Some(conjunction) = table_scan
.filters
.into_iter()
.iter()
.cloned()
.reduce(|accum, expr| accum.and(expr))
{
Some(Box::new(expression_from_df_expr(
@@ -412,6 +482,13 @@ impl DFLogicalSubstraitConvertor {
}
}
fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> bool {
lhs.fields.len() == rhs.fields.len()
&& lhs.fields.iter().zip(rhs.fields.iter()).all(|(x, y)| {
x.name == y.name && x.data_type == y.data_type && x.is_nullable == y.is_nullable
})
}
#[cfg(test)]
mod test {
use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
@@ -463,10 +540,10 @@ mod test {
}
async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) {
let convertor = DFLogicalSubstraitConvertor::new(catalog);
let convertor = DFLogicalSubstraitConvertor;
let proto = convertor.encode(plan.clone()).unwrap();
let tripped_plan = convertor.decode(proto).unwrap();
let tripped_plan = convertor.decode(proto, catalog).unwrap();
assert_eq!(format!("{:?}", plan), format!("{:?}", tripped_plan));
}
@@ -488,6 +565,7 @@ mod test {
.await
.unwrap();
let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
let projection = vec![1, 3, 5];
let df_schema = adapter.schema().to_dfschema().unwrap();
let projected_fields = projection
@@ -498,7 +576,10 @@ mod test {
Arc::new(DFSchema::new_with_metadata(projected_fields, Default::default()).unwrap());
let table_scan_plan = LogicalPlan::TableScan(TableScan {
table_name: DEFAULT_TABLE_NAME.to_string(),
table_name: format!(
"{}.{}.{}",
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME
),
source: adapter,
projection: Some(projection),
projected_schema,

View File

@@ -99,6 +99,12 @@ pub enum Error {
storage_schema: datafusion::arrow::datatypes::SchemaRef,
backtrace: Backtrace,
},
#[snafu(display("Failed to convert DataFusion schema, source: {}", source))]
ConvertDfSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -120,6 +126,7 @@ impl ErrorExt for Error {
| Error::TableNotFound { .. }
| Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
Error::DFInternal { .. } | Error::Internal { .. } => StatusCode::Internal,
Error::ConvertDfSchema { source } => source.status_code(),
}
}

View File

@@ -22,6 +22,7 @@ mod schema;
mod types;
use bytes::{Buf, Bytes};
use catalog::CatalogManagerRef;
pub use crate::df_logical::DFLogicalSubstraitConvertor;
@@ -30,7 +31,11 @@ pub trait SubstraitPlan {
type Plan;
fn decode<B: Buf + Send>(&self, message: B) -> Result<Self::Plan, Self::Error>;
fn decode<B: Buf + Send>(
&self,
message: B,
catalog_manager: CatalogManagerRef,
) -> Result<Self::Plan, Self::Error>;
fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error>;
}

View File

@@ -18,11 +18,13 @@
//! Current we only have variations on integer types. Variation 0 (system preferred) are the same with base types, which
//! are signed integer (i.e. I8 -> [i8]), and Variation 1 stands for unsigned integer (i.e. I8 -> [u8]).
use datafusion::scalar::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use substrait_proto::protobuf::expression::literal::LiteralType;
use substrait_proto::protobuf::r#type::{self as s_type, Kind, Nullability};
use substrait_proto::protobuf::Type as SType;
use substrait_proto::protobuf::{Type as SType, Type};
use crate::error::{Result, UnsupportedConcreteTypeSnafu, UnsupportedSubstraitTypeSnafu};
use crate::error::{self, Result, UnsupportedConcreteTypeSnafu, UnsupportedSubstraitTypeSnafu};
macro_rules! substrait_kind {
($desc:ident, $concrete_ty:ident) => {{
@@ -134,3 +136,67 @@ pub fn from_concrete_type(ty: ConcreteDataType, nullability: Option<bool>) -> Re
Ok(SType { kind })
}
pub(crate) fn scalar_value_as_literal_type(v: &ScalarValue) -> Result<LiteralType> {
Ok(if v.is_null() {
LiteralType::Null(Type { kind: None })
} else {
match v {
ScalarValue::Boolean(Some(v)) => LiteralType::Boolean(*v),
ScalarValue::Float32(Some(v)) => LiteralType::Fp32(*v),
ScalarValue::Float64(Some(v)) => LiteralType::Fp64(*v),
ScalarValue::Int8(Some(v)) => LiteralType::I8(*v as i32),
ScalarValue::Int16(Some(v)) => LiteralType::I16(*v as i32),
ScalarValue::Int32(Some(v)) => LiteralType::I32(*v),
ScalarValue::Int64(Some(v)) => LiteralType::I64(*v),
ScalarValue::LargeUtf8(Some(v)) => LiteralType::String(v.clone()),
ScalarValue::LargeBinary(Some(v)) => LiteralType::Binary(v.clone()),
// TODO(LFC): Implement other conversions: ScalarValue => LiteralType
_ => {
return error::UnsupportedExprSnafu {
name: format!("{:?}", v),
}
.fail()
}
}
})
}
pub(crate) fn literal_type_to_scalar_value(t: LiteralType) -> Result<ScalarValue> {
Ok(match t {
LiteralType::Null(Type { kind: Some(kind) }) => match kind {
Kind::Bool(_) => ScalarValue::Boolean(None),
Kind::I8(_) => ScalarValue::Int8(None),
Kind::I16(_) => ScalarValue::Int16(None),
Kind::I32(_) => ScalarValue::Int32(None),
Kind::I64(_) => ScalarValue::Int64(None),
Kind::Fp32(_) => ScalarValue::Float32(None),
Kind::Fp64(_) => ScalarValue::Float64(None),
Kind::String(_) => ScalarValue::LargeUtf8(None),
Kind::Binary(_) => ScalarValue::LargeBinary(None),
// TODO(LFC): Implement other conversions: Kind => ScalarValue
_ => {
return error::UnsupportedSubstraitTypeSnafu {
ty: format!("{:?}", kind),
}
.fail()
}
},
LiteralType::Boolean(v) => ScalarValue::Boolean(Some(v)),
LiteralType::I8(v) => ScalarValue::Int8(Some(v as i8)),
LiteralType::I16(v) => ScalarValue::Int16(Some(v as i16)),
LiteralType::I32(v) => ScalarValue::Int32(Some(v)),
LiteralType::I64(v) => ScalarValue::Int64(Some(v)),
LiteralType::Fp32(v) => ScalarValue::Float32(Some(v)),
LiteralType::Fp64(v) => ScalarValue::Float64(Some(v)),
LiteralType::String(v) => ScalarValue::LargeUtf8(Some(v)),
LiteralType::Binary(v) => ScalarValue::LargeBinary(Some(v)),
// TODO(LFC): Implement other conversions: LiteralType => ScalarValue
_ => {
return error::UnsupportedSubstraitTypeSnafu {
ty: format!("{:?}", t),
}
.fail()
}
})
}

View File

@@ -151,9 +151,8 @@ impl Instance {
}
async fn execute_logical(&self, plan_bytes: Vec<u8>) -> Result<Output> {
let logical_plan_converter = DFLogicalSubstraitConvertor::new(self.catalog_manager.clone());
let logical_plan = logical_plan_converter
.decode(plan_bytes.as_slice())
let logical_plan = DFLogicalSubstraitConvertor
.decode(plan_bytes.as_slice(), self.catalog_manager.clone())
.context(DecodeLogicalPlanSnafu)?;
self.query_engine

View File

@@ -20,6 +20,7 @@ use std::sync::Arc;
pub use arrow::datatypes::Metadata;
use arrow::datatypes::{Field, Schema as ArrowSchema};
use datafusion_common::DFSchemaRef;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -465,6 +466,15 @@ impl TryFrom<ArrowSchema> for Schema {
}
}
impl TryFrom<DFSchemaRef> for Schema {
type Error = Error;
fn try_from(value: DFSchemaRef) -> Result<Self> {
let s: ArrowSchema = value.as_ref().into();
s.try_into()
}
}
fn try_parse_version(metadata: &Metadata, key: &str) -> Result<u32> {
if let Some(value) = metadata.get(key) {
let version = value

View File

@@ -45,6 +45,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }
sqlparser = "0.15"
store-api = { path = "../store-api" }
substrait = { path = "../common/substrait" }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }

View File

@@ -17,13 +17,16 @@ use std::collections::HashSet;
use std::sync::Arc;
use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu};
use catalog::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue,
};
use catalog::remote::{Kv, KvBackendRef};
use catalog::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider,
SchemaProviderRef,
};
use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue};
use futures::StreamExt;
use meta_client::rpc::TableName;
use snafu::prelude::*;
@@ -130,7 +133,7 @@ impl CatalogList for FrontendCatalogManager {
let backend = self.backend.clone();
let res = std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let key = common_catalog::build_catalog_prefix();
let key = build_catalog_prefix();
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();
@@ -180,7 +183,7 @@ impl CatalogProvider for FrontendCatalogProvider {
let catalog_name = self.catalog_name.clone();
let res = std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let key = common_catalog::build_schema_prefix(&catalog_name);
let key = build_schema_prefix(&catalog_name);
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();
@@ -242,7 +245,7 @@ impl SchemaProvider for FrontendSchemaProvider {
std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let key = common_catalog::build_table_global_prefix(catalog_name, schema_name);
let key = build_table_global_prefix(catalog_name, schema_name);
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();

View File

@@ -445,6 +445,12 @@ pub enum Error {
#[snafu(display("Table already exists: `{}`", table))]
TableAlreadyExist { table: String, backtrace: Backtrace },
#[snafu(display("Failed to encode Substrait logical plan, source: {}", source))]
EncodeSubstraitLogicalPlan {
#[snafu(backtrace)]
source: substrait::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -536,6 +542,7 @@ impl ErrorExt for Error {
Error::AlterExprToRequest { source, .. } => source.status_code(),
Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable,
Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
}
}

View File

@@ -17,11 +17,11 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{AlterExpr, CreateDatabaseExpr, CreateExpr};
use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use catalog::CatalogList;
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_telemetry::{debug, error, info};
use datatypes::prelude::ConcreteDataType;

View File

@@ -818,9 +818,9 @@ mod test {
async fn new_dist_table() -> DistTable {
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("row_id", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("row_id", ConcreteDataType::int32_datatype(), false),
];
let schema = Arc::new(Schema::new(column_schemas.clone()));

View File

@@ -16,17 +16,14 @@ use std::fmt::Formatter;
use std::sync::Arc;
use api::v1::InsertExpr;
use client::{Database, ObjectResult, Select};
use client::{Database, ObjectResult};
use common_query::prelude::Expr;
use common_query::Output;
use common_recordbatch::{util, RecordBatches};
use datafusion::logical_plan::{LogicalPlan as DfLogicPlan, LogicalPlanBuilder};
use datafusion_expr::Expr as DfExpr;
use datatypes::prelude::*;
use datatypes::schema::SchemaRef;
use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use meta_client::rpc::TableName;
use query::plan::LogicalPlan;
use snafu::ResultExt;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;
@@ -56,12 +53,13 @@ impl DatanodeInstance {
pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> Result<RecordBatches> {
let logical_plan = self.build_logical_plan(&plan)?;
// TODO(LFC): Directly pass in logical plan to GRPC interface when our substrait codec supports filter.
let sql = to_sql(logical_plan)?;
let substrait_plan = DFLogicalSubstraitConvertor
.encode(logical_plan)
.context(error::EncodeSubstraitLogicalPlanSnafu)?;
let output = self
.db
.select(Select::Sql(sql))
.logical_plan(substrait_plan.to_vec())
.await
.and_then(Output::try_from)
.context(error::SelectSnafu)?;
@@ -94,14 +92,25 @@ impl DatanodeInstance {
)
.context(error::BuildDfLogicalPlanSnafu)?;
if let Some(filter) = table_scan
.filters
.iter()
.map(|x| x.df_expr())
.cloned()
.reduce(|accum, expr| accum.and(expr))
{
builder = builder
.filter(filter)
.context(error::BuildDfLogicalPlanSnafu)?;
}
if let Some(limit) = table_scan.limit {
builder = builder
.limit(limit)
.context(error::BuildDfLogicalPlanSnafu)?;
}
let plan = builder.build().context(error::BuildDfLogicalPlanSnafu)?;
Ok(LogicalPlan::DfPlan(plan))
builder.build().context(error::BuildDfLogicalPlanSnafu)
}
}
@@ -112,79 +121,3 @@ pub(crate) struct TableScanPlan {
pub filters: Vec<Expr>,
pub limit: Option<usize>,
}
fn to_sql(plan: LogicalPlan) -> Result<String> {
let LogicalPlan::DfPlan(plan) = plan;
let table_scan = match plan {
DfLogicPlan::TableScan(table_scan) => table_scan,
_ => unreachable!("unknown plan: {:?}", plan),
};
let schema: SchemaRef = Arc::new(
table_scan
.source
.schema()
.try_into()
.context(error::ConvertArrowSchemaSnafu)?,
);
let projection = table_scan
.projection
.map(|x| {
x.iter()
.map(|i| schema.column_name_by_index(*i).to_string())
.collect::<Vec<String>>()
})
.unwrap_or_else(|| {
schema
.column_schemas()
.iter()
.map(|x| x.name.clone())
.collect::<Vec<String>>()
})
.join(", ");
let mut sql = format!("select {} from {}", projection, &table_scan.table_name);
let filters = table_scan
.filters
.iter()
.map(expr_to_sql)
.collect::<Result<Vec<String>>>()?
.join(" AND ");
if !filters.is_empty() {
sql.push_str(" where ");
sql.push_str(&filters);
}
if let Some(limit) = table_scan.limit {
sql.push_str(" limit ");
sql.push_str(&limit.to_string());
}
Ok(sql)
}
fn expr_to_sql(expr: &DfExpr) -> Result<String> {
Ok(match expr {
DfExpr::BinaryExpr {
ref left,
ref right,
ref op,
} => format!(
"{} {} {}",
expr_to_sql(left.as_ref())?,
op,
expr_to_sql(right.as_ref())?
),
DfExpr::Column(c) => c.name.clone(),
DfExpr::Literal(sv) => {
let v: Value = Value::try_from(sv.clone())
.with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?;
if matches!(v.data_type(), ConcreteDataType::String(_)) {
format!("'{}'", sv)
} else {
format!("{}", sv)
}
}
_ => unimplemented!("not implemented for {:?}", expr),
})
}

View File

@@ -10,6 +10,7 @@ mock = []
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }

View File

@@ -15,7 +15,7 @@
use std::str::FromStr;
use api::v1::meta::TableName;
use common_catalog::TableGlobalKey;
use catalog::helper::TableGlobalKey;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};

View File

@@ -16,7 +16,7 @@ use api::v1::meta::{
router_server, CreateRequest, Error, PeerDict, PutRequest, RangeRequest, Region, RegionRoute,
ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute, TableRouteValue,
};
use common_catalog::{TableGlobalKey, TableGlobalValue};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};

View File

@@ -7,6 +7,7 @@ license = "Apache-2.0"
[dependencies]
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }

View File

@@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use chrono::{DateTime, Utc};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
use derive_builder::Builder;
@@ -333,9 +334,9 @@ pub struct TableInfo {
/// Comment of the table.
#[builder(default, setter(into))]
pub desc: Option<String>,
#[builder(default, setter(into))]
#[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
pub catalog_name: String,
#[builder(default, setter(into))]
#[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
pub schema_name: String,
pub meta: TableMeta,
#[builder(default = "TableType::Base")]