diff --git a/Cargo.lock b/Cargo.lock index c2be732e37..7ad9da1fa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3388,22 +3388,32 @@ name = "flow" version = "0.7.2" dependencies = [ "api", + "catalog", + "common-catalog", "common-decimal", "common-error", "common-macro", "common-telemetry", "common-time", + "datafusion-common", + "datafusion-expr", + "datafusion-substrait", "datatypes", "enum_dispatch", "hydroflow", "itertools 0.10.5", "num-traits", + "prost 0.12.3", + "query", "serde", "serde_json", "servers", + "session", "smallvec", "snafu", "strum 0.25.0", + "substrait 0.7.2", + "table", "tokio", "tonic 0.10.2", ] diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 611d06b934..d5be926121 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -14,10 +14,13 @@ common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true common-time.workspace = true +datafusion-substrait.workspace = true datatypes.workspace = true enum_dispatch = "0.3" # This fork is simply for keeping our dependency in our org, and pin the version # it is the same with upstream repo +datafusion-common.workspace = true +datafusion-expr.workspace = true hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", rev = "ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" } itertools.workspace = true num-traits = "0.2" @@ -30,4 +33,11 @@ tokio.workspace = true tonic.workspace = true [dev-dependencies] +catalog.workspace = true +common-catalog.workspace = true +prost.workspace = true +query.workspace = true serde_json = "1.0" +session.workspace = true +substrait.workspace = true +table.workspace = true diff --git a/src/flow/src/compute.rs b/src/flow/src/compute.rs index bb17c79dd8..294716edf0 100644 --- a/src/flow/src/compute.rs +++ b/src/flow/src/compute.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! 
Build and Compute the dataflow + mod render; mod state; mod types; diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 92fc20bc9f..06b64f7c4a 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -18,6 +18,8 @@ use std::collections::HashMap; use std::sync::OnceLock; use common_time::DateTime; +use datafusion_expr::Operator; +use datafusion_substrait::logical_plan::consumer::name_to_op; use datatypes::data_type::ConcreteDataType; use datatypes::types::cast; use datatypes::types::cast::CastOption; @@ -316,7 +318,7 @@ impl BinaryFunc { ConcreteDataType::null_datatype(), ConcreteDataType::null_datatype() ], - output: ConcreteDataType::null_datatype(), + output: ConcreteDataType::boolean_datatype(), generic_fn: match self { Self::Eq => GenericFn::Eq, Self::NotEq => GenericFn::NotEq, @@ -391,7 +393,44 @@ impl BinaryFunc { }) } + /// Tries its best to infer the shared argument type from the given input types, falling back to the types of literal expressions + /// + /// Returns an error built from `InvalidQuerySnafu` if both types are known but differ, or if no type can be found; indexes the first two entries of `arg_types` (and of `arg_exprs` when neither type is known) + pub(crate) fn infer_type_from( + generic: GenericFn, + arg_exprs: &[ScalarExpr], + arg_types: &[Option], + ) -> Result { + let ret = match (arg_types[0].as_ref(), arg_types[1].as_ref()) { + (Some(t1), Some(t2)) => { + ensure!( + t1 == t2, + InvalidQuerySnafu { + reason: format!( + "Binary function {:?} requires both arguments to have the same type", + generic + ), + } + ); + t1.clone() + } + (Some(t), None) | (None, Some(t)) => t.clone(), + _ => arg_exprs[0] + .as_literal() + .map(|lit| lit.data_type()) + .or_else(|| arg_exprs[1].as_literal().map(|lit| lit.data_type())) + .with_context(|| InvalidQuerySnafu { + reason: format!( + "Binary function {:?} requires at least one argument with known type", + generic + ), + })?, + }; + Ok(ret) + } + /// choose the appropriate specialization based on the input types + /// returns a specialization of the binary function together with its actual input and output types (so no null type is present) /// /// will try it best to extract from `arg_types` and 
`arg_exprs` to get the input types /// if `arg_types` is not enough, it will try to extract from `arg_exprs` if `arg_exprs` is literal with known type @@ -399,24 +438,39 @@ impl BinaryFunc { name: &str, arg_exprs: &[ScalarExpr], arg_types: &[Option], - ) -> Result { + ) -> Result<(Self, Signature), Error> { + // this `name_to_op` if error simply return a similar message of `unsupported function xxx` so + let op = name_to_op(name).or_else(|err| { + if let datafusion_common::DataFusionError::NotImplemented(msg) = err { + InvalidQuerySnafu { + reason: format!("Unsupported binary function: {}", msg), + } + .fail() + } else { + InvalidQuerySnafu { + reason: format!("Error when parsing binary function: {:?}", err), + } + .fail() + } + })?; + // get first arg type and make sure if both is some, they are the same let generic_fn = { - match name { - "eq" => GenericFn::Eq, - "not_eq" => GenericFn::NotEq, - "lt" => GenericFn::Lt, - "lte" => GenericFn::Lte, - "gt" => GenericFn::Gt, - "gte" => GenericFn::Gte, - "add" => GenericFn::Add, - "sub" => GenericFn::Sub, - "mul" => GenericFn::Mul, - "div" => GenericFn::Div, - "mod" => GenericFn::Mod, + match op { + Operator::Eq => GenericFn::Eq, + Operator::NotEq => GenericFn::NotEq, + Operator::Lt => GenericFn::Lt, + Operator::LtEq => GenericFn::Lte, + Operator::Gt => GenericFn::Gt, + Operator::GtEq => GenericFn::Gte, + Operator::Plus => GenericFn::Add, + Operator::Minus => GenericFn::Sub, + Operator::Multiply => GenericFn::Mul, + Operator::Divide => GenericFn::Div, + Operator::Modulo => GenericFn::Mod, _ => { return InvalidQuerySnafu { - reason: format!("Unknown binary function: {}", name), + reason: format!("Unsupported binary function: {}", name), } .fail(); } @@ -434,43 +488,25 @@ impl BinaryFunc { } ); - let arg_type = match (arg_types[0].as_ref(), arg_types[1].as_ref()) { - (Some(t1), Some(t2)) => { - ensure!( - t1 == t2, - InvalidQuerySnafu { - reason: format!( - "Binary function {} requires both arguments to have the same 
type", - name - ), - } - ); - Some(t1.clone()) - } - (Some(t), None) | (None, Some(t)) => Some(t.clone()), - _ => arg_exprs[0] - .as_literal() - .map(|lit| lit.data_type()) - .or_else(|| arg_exprs[1].as_literal().map(|lit| lit.data_type())), + let arg_type = Self::infer_type_from(generic_fn, arg_exprs, arg_types)?; + + // if type is not needed, we can erase input type to null to find correct functions for + // functions that do not need type + let query_input_type = if need_type { + arg_type.clone() + } else { + ConcreteDataType::null_datatype() }; - ensure!( - !need_type || arg_type.is_some(), - InvalidQuerySnafu { - reason: format!( - "Binary function {} requires at least one argument with known type", - name - ), - } - ); + let spec_fn = Self::specialization(generic_fn, query_input_type)?; - let spec_fn = Self::specialization( + let signature = Signature { + input: smallvec![arg_type.clone(), arg_type.clone()], + output: spec_fn.signature().output.clone(), generic_fn, - arg_type - .clone() - .unwrap_or(ConcreteDataType::null_datatype()), - )?; - Ok(spec_fn) + }; + + Ok((spec_fn, signature)) } /// Evaluate the function with given values and expression @@ -756,7 +792,7 @@ fn test_binary_func_spec() { ] ) .unwrap(), - BinaryFunc::AddInt32 + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) ); assert_eq!( @@ -766,7 +802,7 @@ fn test_binary_func_spec() { &[Some(ConcreteDataType::int32_datatype()), None] ) .unwrap(), - BinaryFunc::AddInt32 + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) ); assert_eq!( @@ -776,7 +812,7 @@ fn test_binary_func_spec() { &[Some(ConcreteDataType::int32_datatype()), None] ) .unwrap(), - BinaryFunc::AddInt32 + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) ); assert_eq!( @@ -786,7 +822,7 @@ fn test_binary_func_spec() { &[Some(ConcreteDataType::int32_datatype()), None] ) .unwrap(), - BinaryFunc::AddInt32 + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) ); assert_eq!( @@ -799,7 +835,31 @@ fn 
test_binary_func_spec() { &[None, None] ) .unwrap(), - BinaryFunc::AddInt32 + (BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature()) + ); + + // this testcase make sure the specialization can find actual type from expression and fill in signature + assert_eq!( + BinaryFunc::from_str_expr_and_type( + "equal", + &[ + ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()), + ScalarExpr::Column(0) + ], + &[None, None] + ) + .unwrap(), + ( + BinaryFunc::Eq, + Signature { + input: smallvec![ + ConcreteDataType::int32_datatype(), + ConcreteDataType::int32_datatype() + ], + output: ConcreteDataType::boolean_datatype(), + generic_fn: GenericFn::Eq + } + ) ); matches!( diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 37e6d1df87..2070e46c84 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -427,6 +427,7 @@ impl MapFilterProject { /// A wrapper type which indicates it is safe to simply evaluate all expressions. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)] pub struct SafeMfpPlan { + /// the inner `MapFilterProject` that is safe to evaluate. pub(crate) mfp: MapFilterProject, } diff --git a/src/flow/src/expr/relation/accum.rs b/src/flow/src/expr/relation/accum.rs index ed898d0c4d..37e0f51130 100644 --- a/src/flow/src/expr/relation/accum.rs +++ b/src/flow/src/expr/relation/accum.rs @@ -643,6 +643,7 @@ fn ty_eq_without_precision(left: ConcreteDataType, right: ConcreteDataType) -> b && matches!(right, ConcreteDataType::Interval(..)) } +#[allow(clippy::too_many_lines)] #[cfg(test)] mod test { use super::*; diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs index a0b049b1a2..8c765024c6 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -130,6 +130,7 @@ impl AggregateFunc { } } +/// Generate signature for each aggregate function macro_rules! 
generate_signature { ($value:ident, { $($user_arm:tt)* }, [ $( diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 53c995462f..e16bcf45ec 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -19,7 +19,8 @@ #![allow(dead_code)] #![allow(unused_imports)] #![warn(missing_docs)] -#[warn(clippy::missing_docs_in_private_items)] +#![warn(clippy::missing_docs_in_private_items)] +#![warn(clippy::too_many_lines)] // allow unused for now because it should be use later mod adapter; mod compute; diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index b4ed48a1c7..2bf1301d07 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -18,14 +18,16 @@ mod join; mod reduce; +use datatypes::arrow::ipc::Map; use serde::{Deserialize, Serialize}; pub(crate) use self::reduce::{AccumulablePlan, KeyValPlan, ReducePlan}; +use crate::adapter::error::Error; use crate::expr::{ AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, }; use crate::plan::join::JoinPlan; -use crate::repr::{DiffRow, RelationType}; +use crate::repr::{ColumnType, DiffRow, RelationType}; /// A plan for a dataflow component. But with type to indicate the output type of the relation. 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] @@ -36,6 +38,76 @@ pub struct TypedPlan { pub plan: Plan, } +impl TypedPlan { + /// Directly apply an mfp to the plan: composed with the existing mfp when the plan is already a `Plan::Mfp`, otherwise wrapped around the plan; `typ` is kept unchanged + pub fn mfp(self, mfp: MapFilterProject) -> Result { + let plan = match self.plan { + Plan::Mfp { + input, + mfp: old_mfp, + } => Plan::Mfp { + input, + mfp: MapFilterProject::compose(old_mfp, mfp)?, + }, + _ => Plan::Mfp { + input: Box::new(self.plan), + mfp, + }, + }; + Ok(TypedPlan { + typ: self.typ, + plan, + }) + } + + /// Project the plan onto the given expressions; the output relation type is built from the `ColumnType` paired with each expression + pub fn projection(self, exprs: Vec<(ScalarExpr, ColumnType)>) -> Result { + let input_arity = self.typ.column_types.len(); + let output_arity = exprs.len(); + let (exprs, expr_typs): (Vec<_>, Vec<_>) = exprs.into_iter().unzip(); + let mfp = MapFilterProject::new(input_arity) + .map(exprs)? + .project(input_arity..input_arity + output_arity)?; + // special case: when the plan is already an mfp, compose the two mfps instead of nesting a new `Plan::Mfp` + let plan = match self.plan { + Plan::Mfp { + input, + mfp: old_mfp, + } => Plan::Mfp { + input, + mfp: MapFilterProject::compose(old_mfp, mfp)?, + }, + _ => Plan::Mfp { + input: Box::new(self.plan), + mfp, + }, + }; + let typ = RelationType::new(expr_typs); + Ok(TypedPlan { typ, plan }) + } + + /// Add a new filter to the plan, will filter out the records that do not satisfy the filter; only the expression half of `filter` is used (its `ColumnType` is ignored) and `typ` is kept unchanged + pub fn filter(self, filter: (ScalarExpr, ColumnType)) -> Result { + let plan = match self.plan { + Plan::Mfp { + input, + mfp: old_mfp, + } => Plan::Mfp { + input, + mfp: old_mfp.filter(vec![filter.0])?, + }, + _ => Plan::Mfp { + input: Box::new(self.plan), + mfp: MapFilterProject::new(self.typ.column_types.len()).filter(vec![filter.0])?, + }, + }; + Ok(TypedPlan { + typ: self.typ, + plan, + }) + } +} + /// TODO(discord9): support `TableFunc`(by define FlatMap that map 1 to n) /// Plan describe how to transform data in dataflow #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] diff 
--git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index 5239869c39..4c50a77392 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -27,7 +27,7 @@ use datatypes::types::cast; use datatypes::types::cast::CastOption; use datatypes::value::Value; use itertools::Itertools; -pub(crate) use relation::{RelationDesc, RelationType}; +pub(crate) use relation::{ColumnType, RelationDesc, RelationType}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index c1f7bfdc88..fa110c4564 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -196,6 +196,35 @@ pub struct ColumnType { pub nullable: bool, } +impl ColumnType { + /// Constructs a new `ColumnType` from a scalar type and a nullability flag. + pub fn new(scalar_type: ConcreteDataType, nullable: bool) -> Self { + ColumnType { + scalar_type, + nullable, + } + } + + /// Constructs a new `ColumnType` from a scalar type, with nullability set to + /// ***true*** (same result as `ColumnType::new(scalar_type, true)`). + pub fn new_nullable(scalar_type: ConcreteDataType) -> Self { + ColumnType { + scalar_type, + nullable: true, + } + } + + /// Returns a reference to the scalar type of this column. + pub fn scalar_type(&self) -> &ConcreteDataType { + &self.scalar_type + } + + /// Returns the `nullable` flag, i.e. true if this column can be null. + pub fn nullable(&self) -> bool { + self.nullable + } +} + /// This method exists solely for the purpose of making ColumnType nullable by /// default in unit tests. The default value of a bool is false, and the only /// way to make an object take on any other value by default is to pass it a diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 8623f3fada..f9e10ec7e3 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! 
utilities for managing state of dataflow execution + use std::collections::{BTreeMap, BTreeSet}; use std::ops::Bound; use std::sync::Arc;