feat: Basic Definitions for Expression&Functions for Dataflow (#3267)

* added expression&func

* fix: EvalError derive&imports

* chore: add header

* feat: variadic func

* chore: minor adjust

* feat: accum

* feat: use accum for eval func

* feat: montonic min/max as accumulative

* feat: support min/max Date&DateTime

* chore: fix compile error&add test(WIP)

* test: sum, count, min, max

* feat: remove trait impl for EvalError

* chore: remove all impl retain only type definitions

* refactor: nest datatypes errors

* fix: remove `.build()`

* fix: not derive Clone

* docs: add comment for types

* feat: more func&remove `CurrentDatabase`
This commit is contained in:
discord9
2024-02-05 15:49:34 +08:00
committed by GitHub
parent 51feec2579
commit c62c67cf18
10 changed files with 428 additions and 3 deletions

1
Cargo.lock generated
View File

@@ -3397,6 +3397,7 @@ version = "0.6.0"
dependencies = [
"api",
"bimap",
"common-decimal",
"common-error",
"common-macro",
"common-meta",

View File

@@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
api.workspace = true
bimap = "0.6.3"
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true

View File

@@ -15,8 +15,14 @@
//! for declare Expression in dataflow, including map, reduce, id and join(TODO!) etc.
pub(crate) mod error;
mod func;
mod id;
mod linear;
mod relation;
mod scalar;
pub(crate) use error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
pub(crate) use id::{GlobalId, Id, LocalId};
pub(crate) use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
pub(crate) use relation::{AggregateExpr, AggregateFunc};
pub(crate) use scalar::ScalarExpr;

View File

@@ -29,7 +29,7 @@ use snafu::{Location, Snafu};
#[stack_trace_debug]
pub enum EvalError {
#[snafu(display("Division by zero"))]
DivisionByZero,
DivisionByZero { location: Location },
#[snafu(display("Type mismatch: expected {expected}, actual {actual}"))]
TypeMismatch {

103
src/flow/src/expr/func.rs Normal file
View File

@@ -0,0 +1,103 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::DateTime;
use datatypes::data_type::ConcreteDataType;
use datatypes::types::cast;
use datatypes::types::cast::CastOption;
use datatypes::value::Value;
use hydroflow::bincode::Error;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use super::ScalarExpr;
use crate::expr::error::CastValueSnafu;
use crate::expr::InvalidArgumentSnafu;
// TODO(discord9): more function & eval
use crate::{
expr::error::{EvalError, TryFromValueSnafu, TypeMismatchSnafu},
repr::Row,
};
/// UnmaterializableFunc is a function that can't be eval independently,
/// and require special handling
#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum UnmaterializableFunc {
Now,
CurrentSchema,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum UnaryFunc {
Not,
IsNull,
IsTrue,
IsFalse,
StepTimestamp,
Cast(ConcreteDataType),
}
/// TODO(discord9): support more binary functions for more types
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum BinaryFunc {
Eq,
NotEq,
Lt,
Lte,
Gt,
Gte,
AddInt16,
AddInt32,
AddInt64,
AddUInt16,
AddUInt32,
AddUInt64,
AddFloat32,
AddFloat64,
SubInt16,
SubInt32,
SubInt64,
SubUInt16,
SubUInt32,
SubUInt64,
SubFloat32,
SubFloat64,
MulInt16,
MulInt32,
MulInt64,
MulUInt16,
MulUInt32,
MulUInt64,
MulFloat32,
MulFloat64,
DivInt16,
DivInt32,
DivInt64,
DivUInt16,
DivUInt32,
DivUInt64,
DivFloat32,
DivFloat64,
ModInt16,
ModInt32,
ModInt64,
ModUInt16,
ModUInt32,
ModUInt64,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum VariadicFunc {
And,
Or,
}

View File

@@ -0,0 +1,96 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, BTreeSet};
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use crate::expr::error::EvalError;
use crate::expr::{Id, LocalId, ScalarExpr};
use crate::repr::{self, value_to_internal_ts, Diff, Row};
/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequences of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MapFilterProject {
/// A sequence of expressions that should be appended to the row.
///
/// Many of these expressions may not be produced in the output,
/// and may only be present as common subexpressions.
pub expressions: Vec<ScalarExpr>,
/// Expressions that must evaluate to `Datum::True` for the output
/// row to be produced.
///
/// Each entry is prepended with a column identifier indicating
/// the column *before* which the predicate should first be applied.
/// Most commonly this would be one plus the largest column identifier
/// in the predicate's support, but it could be larger to implement
/// guarded evaluation of predicates.
///
/// This list should be sorted by the first field.
pub predicates: Vec<(usize, ScalarExpr)>,
/// A sequence of column identifiers whose data form the output row.
pub projection: Vec<usize>,
/// The expected number of input columns.
///
/// This is needed to ensure correct identification of newly formed
/// columns in the output.
pub input_arity: usize,
}
/// A wrapper type which indicates it is safe to simply evaluate all expressions.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SafeMfpPlan {
pub(crate) mfp: MapFilterProject,
}
impl std::ops::Deref for SafeMfpPlan {
type Target = MapFilterProject;
fn deref(&self) -> &Self::Target {
&self.mfp
}
}
/// Predicates partitioned into temporal and non-temporal.
///
/// Temporal predicates require some recognition to determine their
/// structure, and it is best to do that once and re-use the results.
///
/// There are restrictions on the temporal predicates we currently support.
/// They must directly constrain `MzNow` from below or above,
/// by expressions that do not themselves contain `MzNow`.
/// Conjunctions of such constraints are also ok.
#[derive(Clone, Debug, PartialEq)]
pub struct MfpPlan {
/// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
pub(crate) mfp: SafeMfpPlan,
/// TODO(discord9): impl temporal filter later
/// Expressions that when evaluated lower-bound `MzNow`.
pub(crate) lower_bounds: Vec<ScalarExpr>,
/// Expressions that when evaluated upper-bound `MzNow`.
pub(crate) upper_bounds: Vec<ScalarExpr>,
}

View File

@@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize};
use crate::expr::ScalarExpr;
mod accum;
mod func;
/// Describes an aggregation expression.

View File

@@ -0,0 +1,71 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Accumulators for aggregate functions that's is accumulatable. i.e. sum/count
//!
//! Currently support sum, count, any, all
use std::fmt::Display;
use common_decimal::Decimal128;
use common_time::{Date, DateTime};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::{OrderedF32, OrderedF64, OrderedFloat, Value};
use hydroflow::futures::stream::Concat;
use serde::{Deserialize, Serialize};
use crate::expr::error::{InternalSnafu, TryFromValueSnafu, TypeMismatchSnafu};
use crate::expr::{AggregateFunc, EvalError};
use crate::repr::Diff;
/// Accumulates values for the various types of accumulable aggregations.
///
/// We assume that there are not more than 2^32 elements for the aggregation.
/// Thus we can perform a summation over i32 in an i64 accumulator
/// and not worry about exceeding its bounds.
///
/// The float accumulator performs accumulation with tolerance for floating point error.
///
/// TODO(discord9): check for overflowing
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Accum {
/// Accumulates boolean values.
Bool {
/// The number of `true` values observed.
trues: Diff,
/// The number of `false` values observed.
falses: Diff,
},
/// Accumulates simple numeric values.
SimpleNumber {
/// The accumulation of all non-NULL values observed.
accum: i128,
/// The number of non-NULL values observed.
non_nulls: Diff,
},
/// Accumulates float values.
Float {
/// Accumulates non-special float values, i.e. not NaN, +inf, -inf.
/// accum will be set to zero if `non_nulls` is zero.
accum: OrderedF64,
/// Counts +inf
pos_infs: Diff,
/// Counts -inf
neg_infs: Diff,
/// Counts NaNs
nans: Diff,
/// Counts non-NULL values
non_nulls: Diff,
},
}

View File

@@ -0,0 +1,85 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::type_name;
use common_time::{Date, DateTime};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{OrderedF32, OrderedF64, Value};
use serde::{Deserialize, Serialize};
use crate::expr::error::{EvalError, TryFromValueSnafu, TypeMismatchSnafu};
use crate::expr::relation::accum::Accum;
use crate::repr::Diff;
/// Aggregate functions that can be applied to a group of rows.
///
/// `Mean` function is deliberately not included as it can be computed from `Sum` and `Count`, whose state can be better managed.
///
/// type of the input and output of the aggregate function:
///
/// `sum(i*)->i64, sum(u*)->u64`
///
/// `count()->i64`
///
/// `min/max(T)->T`
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub enum AggregateFunc {
MaxInt16,
MaxInt32,
MaxInt64,
MaxUInt16,
MaxUInt32,
MaxUInt64,
MaxFloat32,
MaxFloat64,
MaxBool,
MaxString,
MaxDate,
MaxDateTime,
MaxTimestamp,
MaxTime,
MaxDuration,
MaxInterval,
MinInt16,
MinInt32,
MinInt64,
MinUInt16,
MinUInt32,
MinUInt64,
MinFloat32,
MinFloat64,
MinBool,
MinString,
MinDate,
MinDateTime,
MinTimestamp,
MinTime,
MinDuration,
MinInterval,
SumInt16,
SumInt32,
SumInt64,
SumUInt16,
SumUInt32,
SumUInt64,
SumFloat32,
SumFloat64,
Count,
Any,
All,
}

View File

@@ -0,0 +1,61 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, BTreeSet};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
/// A scalar expression, which can be evaluated to a value.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ScalarExpr {
/// A column of the input row
Column(usize),
/// A literal value.
/// Extra type info to know original type even when it is null
Literal(Value, ConcreteDataType),
/// A call to an unmaterializable function.
///
/// These functions cannot be evaluated by `ScalarExpr::eval`. They must
/// be transformed away by a higher layer.
CallUnmaterializable(UnmaterializableFunc),
CallUnary {
func: UnaryFunc,
expr: Box<ScalarExpr>,
},
CallBinary {
func: BinaryFunc,
expr1: Box<ScalarExpr>,
expr2: Box<ScalarExpr>,
},
CallVariadic {
func: VariadicFunc,
exprs: Vec<ScalarExpr>,
},
/// Conditionally evaluated expressions.
///
/// It is important that `then` and `els` only be evaluated if
/// `cond` is true or not, respectively. This is the only way
/// users can guard execution (other logical operator do not
/// short-circuit) and we need to preserve that.
If {
cond: Box<ScalarExpr>,
then: Box<ScalarExpr>,
els: Box<ScalarExpr>,
},
}