chore(flow): more comments&lint (#3680)

* chore: more comments&lint

* chore: per review

* chore: remove redundant dep
This commit is contained in:
discord9
2024-04-10 11:31:22 +08:00
committed by GitHub
parent 5d739932c0
commit c00c1d95ee
12 changed files with 245 additions and 56 deletions

10
Cargo.lock generated
View File

@@ -3388,22 +3388,32 @@ name = "flow"
version = "0.7.2"
dependencies = [
"api",
"catalog",
"common-catalog",
"common-decimal",
"common-error",
"common-macro",
"common-telemetry",
"common-time",
"datafusion-common",
"datafusion-expr",
"datafusion-substrait",
"datatypes",
"enum_dispatch",
"hydroflow",
"itertools 0.10.5",
"num-traits",
"prost 0.12.3",
"query",
"serde",
"serde_json",
"servers",
"session",
"smallvec",
"snafu",
"strum 0.25.0",
"substrait 0.7.2",
"table",
"tokio",
"tonic 0.10.2",
]

View File

@@ -14,10 +14,13 @@ common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
# This fork simply keeps our dependency within our org and pins the version;
# it is the same as the upstream repo
datafusion-common.workspace = true
datafusion-expr.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", rev = "ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" }
itertools.workspace = true
num-traits = "0.2"
@@ -30,4 +33,11 @@ tokio.workspace = true
tonic.workspace = true
[dev-dependencies]
catalog.workspace = true
common-catalog.workspace = true
prost.workspace = true
query.workspace = true
serde_json = "1.0"
session.workspace = true
substrait.workspace = true
table.workspace = true

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Build and Compute the dataflow
mod render;
mod state;
mod types;

View File

@@ -18,6 +18,8 @@ use std::collections::HashMap;
use std::sync::OnceLock;
use common_time::DateTime;
use datafusion_expr::Operator;
use datafusion_substrait::logical_plan::consumer::name_to_op;
use datatypes::data_type::ConcreteDataType;
use datatypes::types::cast;
use datatypes::types::cast::CastOption;
@@ -316,7 +318,7 @@ impl BinaryFunc {
ConcreteDataType::null_datatype(),
ConcreteDataType::null_datatype()
],
output: ConcreteDataType::null_datatype(),
output: ConcreteDataType::boolean_datatype(),
generic_fn: match self {
Self::Eq => GenericFn::Eq,
Self::NotEq => GenericFn::NotEq,
@@ -391,7 +393,44 @@ impl BinaryFunc {
})
}
/// Tries its best to infer types from the input types and expressions
///
/// if it can't figure out the types, an error is returned
pub(crate) fn infer_type_from(
generic: GenericFn,
arg_exprs: &[ScalarExpr],
arg_types: &[Option<ConcreteDataType>],
) -> Result<ConcreteDataType, Error> {
let ret = match (arg_types[0].as_ref(), arg_types[1].as_ref()) {
(Some(t1), Some(t2)) => {
ensure!(
t1 == t2,
InvalidQuerySnafu {
reason: format!(
"Binary function {:?} requires both arguments to have the same type",
generic
),
}
);
t1.clone()
}
(Some(t), None) | (None, Some(t)) => t.clone(),
_ => arg_exprs[0]
.as_literal()
.map(|lit| lit.data_type())
.or_else(|| arg_exprs[1].as_literal().map(|lit| lit.data_type()))
.with_context(|| InvalidQuerySnafu {
reason: format!(
"Binary function {:?} requires at least one argument with known type",
generic
),
})?,
};
Ok(ret)
}
/// Choose the appropriate specialization based on the input types.
/// Returns a specialization of the binary function and its actual input and output types (so no null type is present).
///
/// It will try its best to extract the input types from `arg_types` and `arg_exprs`;
/// if `arg_types` is not enough, it falls back to `arg_exprs` when those are literals with a known type.
@@ -399,24 +438,39 @@ impl BinaryFunc {
name: &str,
arg_exprs: &[ScalarExpr],
arg_types: &[Option<ConcreteDataType>],
) -> Result<Self, Error> {
) -> Result<(Self, Signature), Error> {
// if `name_to_op` fails, it only returns a message like `unsupported function xxx`, so wrap it in a clearer error
let op = name_to_op(name).or_else(|err| {
if let datafusion_common::DataFusionError::NotImplemented(msg) = err {
InvalidQuerySnafu {
reason: format!("Unsupported binary function: {}", msg),
}
.fail()
} else {
InvalidQuerySnafu {
reason: format!("Error when parsing binary function: {:?}", err),
}
.fail()
}
})?;
// get the first arg type and, if both are Some, make sure they are the same
let generic_fn = {
match name {
"eq" => GenericFn::Eq,
"not_eq" => GenericFn::NotEq,
"lt" => GenericFn::Lt,
"lte" => GenericFn::Lte,
"gt" => GenericFn::Gt,
"gte" => GenericFn::Gte,
"add" => GenericFn::Add,
"sub" => GenericFn::Sub,
"mul" => GenericFn::Mul,
"div" => GenericFn::Div,
"mod" => GenericFn::Mod,
match op {
Operator::Eq => GenericFn::Eq,
Operator::NotEq => GenericFn::NotEq,
Operator::Lt => GenericFn::Lt,
Operator::LtEq => GenericFn::Lte,
Operator::Gt => GenericFn::Gt,
Operator::GtEq => GenericFn::Gte,
Operator::Plus => GenericFn::Add,
Operator::Minus => GenericFn::Sub,
Operator::Multiply => GenericFn::Mul,
Operator::Divide => GenericFn::Div,
Operator::Modulo => GenericFn::Mod,
_ => {
return InvalidQuerySnafu {
reason: format!("Unknown binary function: {}", name),
reason: format!("Unsupported binary function: {}", name),
}
.fail();
}
@@ -434,43 +488,25 @@ impl BinaryFunc {
}
);
let arg_type = match (arg_types[0].as_ref(), arg_types[1].as_ref()) {
(Some(t1), Some(t2)) => {
ensure!(
t1 == t2,
InvalidQuerySnafu {
reason: format!(
"Binary function {} requires both arguments to have the same type",
name
),
}
);
Some(t1.clone())
}
(Some(t), None) | (None, Some(t)) => Some(t.clone()),
_ => arg_exprs[0]
.as_literal()
.map(|lit| lit.data_type())
.or_else(|| arg_exprs[1].as_literal().map(|lit| lit.data_type())),
let arg_type = Self::infer_type_from(generic_fn, arg_exprs, arg_types)?;
// if the type is not needed, we can erase the input type to null so the lookup
// still finds the correct function for functions that do not care about types
let query_input_type = if need_type {
arg_type.clone()
} else {
ConcreteDataType::null_datatype()
};
ensure!(
!need_type || arg_type.is_some(),
InvalidQuerySnafu {
reason: format!(
"Binary function {} requires at least one argument with known type",
name
),
}
);
let spec_fn = Self::specialization(generic_fn, query_input_type)?;
let spec_fn = Self::specialization(
let signature = Signature {
input: smallvec![arg_type.clone(), arg_type.clone()],
output: spec_fn.signature().output.clone(),
generic_fn,
arg_type
.clone()
.unwrap_or(ConcreteDataType::null_datatype()),
)?;
Ok(spec_fn)
};
Ok((spec_fn, signature))
}
/// Evaluate the function with given values and expression
@@ -756,7 +792,7 @@ fn test_binary_func_spec() {
]
)
.unwrap(),
BinaryFunc::AddInt32
(BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
);
assert_eq!(
@@ -766,7 +802,7 @@ fn test_binary_func_spec() {
&[Some(ConcreteDataType::int32_datatype()), None]
)
.unwrap(),
BinaryFunc::AddInt32
(BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
);
assert_eq!(
@@ -776,7 +812,7 @@ fn test_binary_func_spec() {
&[Some(ConcreteDataType::int32_datatype()), None]
)
.unwrap(),
BinaryFunc::AddInt32
(BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
);
assert_eq!(
@@ -786,7 +822,7 @@ fn test_binary_func_spec() {
&[Some(ConcreteDataType::int32_datatype()), None]
)
.unwrap(),
BinaryFunc::AddInt32
(BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
);
assert_eq!(
@@ -799,7 +835,31 @@ fn test_binary_func_spec() {
&[None, None]
)
.unwrap(),
BinaryFunc::AddInt32
(BinaryFunc::AddInt32, BinaryFunc::AddInt32.signature())
);
// this test case makes sure the specialization can find the actual type from the expression and fill in the signature
assert_eq!(
BinaryFunc::from_str_expr_and_type(
"equal",
&[
ScalarExpr::Literal(Value::from(1i32), ConcreteDataType::int32_datatype()),
ScalarExpr::Column(0)
],
&[None, None]
)
.unwrap(),
(
BinaryFunc::Eq,
Signature {
input: smallvec![
ConcreteDataType::int32_datatype(),
ConcreteDataType::int32_datatype()
],
output: ConcreteDataType::boolean_datatype(),
generic_fn: GenericFn::Eq
}
)
);
matches!(

View File

@@ -427,6 +427,7 @@ impl MapFilterProject {
/// A wrapper type which indicates it is safe to simply evaluate all expressions.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SafeMfpPlan {
/// the inner `MapFilterProject` that is safe to evaluate.
pub(crate) mfp: MapFilterProject,
}

View File

@@ -643,6 +643,7 @@ fn ty_eq_without_precision(left: ConcreteDataType, right: ConcreteDataType) -> b
&& matches!(right, ConcreteDataType::Interval(..))
}
#[allow(clippy::too_many_lines)]
#[cfg(test)]
mod test {
use super::*;

View File

@@ -130,6 +130,7 @@ impl AggregateFunc {
}
}
/// Generate signature for each aggregate function
macro_rules! generate_signature {
($value:ident, { $($user_arm:tt)* },
[ $(

View File

@@ -19,7 +19,8 @@
#![allow(dead_code)]
#![allow(unused_imports)]
#![warn(missing_docs)]
#[warn(clippy::missing_docs_in_private_items)]
#![warn(clippy::missing_docs_in_private_items)]
#![warn(clippy::too_many_lines)]
// allow unused for now because it should be used later
mod adapter;
mod compute;

View File

@@ -18,14 +18,16 @@
mod join;
mod reduce;
use datatypes::arrow::ipc::Map;
use serde::{Deserialize, Serialize};
pub(crate) use self::reduce::{AccumulablePlan, KeyValPlan, ReducePlan};
use crate::adapter::error::Error;
use crate::expr::{
AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
};
use crate::plan::join::JoinPlan;
use crate::repr::{DiffRow, RelationType};
use crate::repr::{ColumnType, DiffRow, RelationType};
/// A plan for a dataflow component. But with type to indicate the output type of the relation.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
@@ -36,6 +38,76 @@ pub struct TypedPlan {
pub plan: Plan,
}
impl TypedPlan {
/// directly apply an mfp (MapFilterProject) to the plan
pub fn mfp(self, mfp: MapFilterProject) -> Result<Self, Error> {
let plan = match self.plan {
Plan::Mfp {
input,
mfp: old_mfp,
} => Plan::Mfp {
input,
mfp: MapFilterProject::compose(old_mfp, mfp)?,
},
_ => Plan::Mfp {
input: Box::new(self.plan),
mfp,
},
};
Ok(TypedPlan {
typ: self.typ,
plan,
})
}
/// project the plan to the given expressions
pub fn projection(self, exprs: Vec<(ScalarExpr, ColumnType)>) -> Result<Self, Error> {
let input_arity = self.typ.column_types.len();
let output_arity = exprs.len();
let (exprs, expr_typs): (Vec<_>, Vec<_>) = exprs.into_iter().unzip();
let mfp = MapFilterProject::new(input_arity)
.map(exprs)?
.project(input_arity..input_arity + output_arity)?;
// special case for mfp to compose when the plan is already mfp
let plan = match self.plan {
Plan::Mfp {
input,
mfp: old_mfp,
} => Plan::Mfp {
input,
mfp: MapFilterProject::compose(old_mfp, mfp)?,
},
_ => Plan::Mfp {
input: Box::new(self.plan),
mfp,
},
};
let typ = RelationType::new(expr_typs);
Ok(TypedPlan { typ, plan })
}
/// Add a new filter to the plan, will filter out the records that do not satisfy the filter
pub fn filter(self, filter: (ScalarExpr, ColumnType)) -> Result<Self, Error> {
let plan = match self.plan {
Plan::Mfp {
input,
mfp: old_mfp,
} => Plan::Mfp {
input,
mfp: old_mfp.filter(vec![filter.0])?,
},
_ => Plan::Mfp {
input: Box::new(self.plan),
mfp: MapFilterProject::new(self.typ.column_types.len()).filter(vec![filter.0])?,
},
};
Ok(TypedPlan {
typ: self.typ,
plan,
})
}
}
/// TODO(discord9): support `TableFunc` by defining a `FlatMap` that maps 1 row to n rows
/// Plan describe how to transform data in dataflow
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]

View File

@@ -27,7 +27,7 @@ use datatypes::types::cast;
use datatypes::types::cast::CastOption;
use datatypes::value::Value;
use itertools::Itertools;
pub(crate) use relation::{RelationDesc, RelationType};
pub(crate) use relation::{ColumnType, RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;

View File

@@ -196,6 +196,35 @@ pub struct ColumnType {
pub nullable: bool,
}
impl ColumnType {
/// Constructs a new `ColumnType` from a scalar type and a nullability flag.
pub fn new(scalar_type: ConcreteDataType, nullable: bool) -> Self {
ColumnType {
scalar_type,
nullable,
}
}
/// Constructs a new `ColumnType` from a scalar type, with nullability set to
/// ***true***
pub fn new_nullable(scalar_type: ConcreteDataType) -> Self {
ColumnType {
scalar_type,
nullable: true,
}
}
/// Returns the scalar type of this column.
pub fn scalar_type(&self) -> &ConcreteDataType {
&self.scalar_type
}
/// Returns true if this column can be null.
pub fn nullable(&self) -> bool {
self.nullable
}
}
/// This method exists solely for the purpose of making ColumnType nullable by
/// default in unit tests. The default value of a bool is false, and the only
/// way to make an object take on any other value by default is to pass it a

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! utilities for managing state of dataflow execution
use std::collections::{BTreeMap, BTreeSet};
use std::ops::Bound;
use std::sync::Arc;