From c62c67cf181c774a405eaf0e7245403a1e1f7685 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Mon, 5 Feb 2024 15:49:34 +0800 Subject: [PATCH] feat: Basic Definitions for Expression&Functions for Dataflow (#3267) * added expression&func * fix: EvalError derive&imports * chore: add header * feat: variadic func * chore: minor adjust * feat: accum * feat: use accum for eval func * feat: montonic min/max as accumulative * feat: support min/max Date&DateTime * chore: fix compile error&add test(WIP) * test: sum, count, min, max * feat: remove trait impl for EvalError * chore: remove all impl retain only type definitions * refactor: nest datatypes errors * fix: remove `.build()` * fix: not derive Clone * docs: add comment for types * feat: more func&remove `CurrentDatabase` --- Cargo.lock | 1 + src/flow/Cargo.toml | 1 + src/flow/src/expr.rs | 10 ++- src/flow/src/expr/error.rs | 2 +- src/flow/src/expr/func.rs | 103 ++++++++++++++++++++++++++++ src/flow/src/expr/linear.rs | 96 ++++++++++++++++++++++++++ src/flow/src/expr/relation.rs | 1 + src/flow/src/expr/relation/accum.rs | 71 +++++++++++++++++++ src/flow/src/expr/relation/func.rs | 85 +++++++++++++++++++++++ src/flow/src/expr/scalar.rs | 61 ++++++++++++++++ 10 files changed, 428 insertions(+), 3 deletions(-) create mode 100644 src/flow/src/expr/func.rs create mode 100644 src/flow/src/expr/linear.rs create mode 100644 src/flow/src/expr/relation/accum.rs create mode 100644 src/flow/src/expr/relation/func.rs create mode 100644 src/flow/src/expr/scalar.rs diff --git a/Cargo.lock b/Cargo.lock index 65c9740f0b..d2a0ced8bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3397,6 +3397,7 @@ version = "0.6.0" dependencies = [ "api", "bimap", + "common-decimal", "common-error", "common-macro", "common-meta", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index d500b2351a..f20aa5d07e 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] api.workspace = true bimap = "0.6.3" +common-decimal.workspace = true common-error.workspace = true common-macro.workspace = true common-meta.workspace = true diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index cc83ffc713..043c37b2be 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -15,8 +15,14 @@ //! for declare Expression in dataflow, including map, reduce, id and join(TODO!) etc. pub(crate) mod error; +mod func; mod id; +mod linear; +mod relation; +mod scalar; +pub(crate) use error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu}; +pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc}; pub(crate) use id::{GlobalId, Id, LocalId}; - -pub(crate) use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu}; +pub(crate) use relation::{AggregateExpr, AggregateFunc}; +pub(crate) use scalar::ScalarExpr; diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 8c459200c8..0fd58ba1cf 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -29,7 +29,7 @@ use snafu::{Location, Snafu}; #[stack_trace_debug] pub enum EvalError { #[snafu(display("Division by zero"))] - DivisionByZero, + DivisionByZero { location: Location }, #[snafu(display("Type mismatch: expected {expected}, actual {actual}"))] TypeMismatch { diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs new file mode 100644 index 0000000000..eed43f65a7 --- /dev/null +++ b/src/flow/src/expr/func.rs @@ -0,0 +1,103 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_time::DateTime; +use datatypes::data_type::ConcreteDataType; +use datatypes::types::cast; +use datatypes::types::cast::CastOption; +use datatypes::value::Value; +use hydroflow::bincode::Error; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use super::ScalarExpr; +use crate::expr::error::CastValueSnafu; +use crate::expr::InvalidArgumentSnafu; +// TODO(discord9): more function & eval +use crate::{ + expr::error::{EvalError, TryFromValueSnafu, TypeMismatchSnafu}, + repr::Row, +}; + +/// UnmaterializableFunc is a function that can't be eval independently, +/// and require special handling +#[derive(Ord, PartialOrd, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub enum UnmaterializableFunc { + Now, + CurrentSchema, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] +pub enum UnaryFunc { + Not, + IsNull, + IsTrue, + IsFalse, + StepTimestamp, + Cast(ConcreteDataType), +} +/// TODO(discord9): support more binary functions for more types +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] +pub enum BinaryFunc { + Eq, + NotEq, + Lt, + Lte, + Gt, + Gte, + AddInt16, + AddInt32, + AddInt64, + AddUInt16, + AddUInt32, + AddUInt64, + AddFloat32, + AddFloat64, + SubInt16, + SubInt32, + SubInt64, + SubUInt16, + SubUInt32, + SubUInt64, + SubFloat32, + SubFloat64, + MulInt16, + MulInt32, + MulInt64, + MulUInt16, + MulUInt32, + MulUInt64, + MulFloat32, + MulFloat64, + DivInt16, + DivInt32, + DivInt64, + DivUInt16, + DivUInt32, + DivUInt64, + DivFloat32, + DivFloat64, + ModInt16, + ModInt32, + ModInt64, + ModUInt16, + ModUInt32, + ModUInt64, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] +pub enum VariadicFunc { + And, + Or, +} diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs new file mode 100644 index 0000000000..eddb98c49e --- /dev/null +++ b/src/flow/src/expr/linear.rs @@ -0,0 +1,96 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, BTreeSet}; + +use datatypes::value::Value; +use serde::{Deserialize, Serialize}; + +use crate::expr::error::EvalError; +use crate::expr::{Id, LocalId, ScalarExpr}; +use crate::repr::{self, value_to_internal_ts, Diff, Row}; + +/// A compound operator that can be applied row-by-row. +/// +/// This operator integrates the map, filter, and project operators. +/// It applies a sequences of map expressions, which are allowed to +/// refer to previous expressions, interleaved with predicates which +/// must be satisfied for an output to be produced. If all predicates +/// evaluate to `Datum::True` the data at the identified columns are +/// collected and produced as output in a packed `Row`. +/// +/// This operator is a "builder" and its contents may contain expressions +/// that are not yet executable. For example, it may contain temporal +/// expressions in `self.expressions`, even though this is not something +/// we can directly evaluate. The plan creation methods will defensively +/// ensure that the right thing happens. +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct MapFilterProject { + /// A sequence of expressions that should be appended to the row. + /// + /// Many of these expressions may not be produced in the output, + /// and may only be present as common subexpressions. + pub expressions: Vec, + /// Expressions that must evaluate to `Datum::True` for the output + /// row to be produced. + /// + /// Each entry is prepended with a column identifier indicating + /// the column *before* which the predicate should first be applied. + /// Most commonly this would be one plus the largest column identifier + /// in the predicate's support, but it could be larger to implement + /// guarded evaluation of predicates. + /// + /// This list should be sorted by the first field. + pub predicates: Vec<(usize, ScalarExpr)>, + /// A sequence of column identifiers whose data form the output row. + pub projection: Vec, + /// The expected number of input columns. + /// + /// This is needed to ensure correct identification of newly formed + /// columns in the output. + pub input_arity: usize, +} + +/// A wrapper type which indicates it is safe to simply evaluate all expressions. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct SafeMfpPlan { + pub(crate) mfp: MapFilterProject, +} + +impl std::ops::Deref for SafeMfpPlan { + type Target = MapFilterProject; + fn deref(&self) -> &Self::Target { + &self.mfp + } +} + +/// Predicates partitioned into temporal and non-temporal. +/// +/// Temporal predicates require some recognition to determine their +/// structure, and it is best to do that once and re-use the results. +/// +/// There are restrictions on the temporal predicates we currently support. +/// They must directly constrain `MzNow` from below or above, +/// by expressions that do not themselves contain `MzNow`. +/// Conjunctions of such constraints are also ok. +#[derive(Clone, Debug, PartialEq)] +pub struct MfpPlan { + /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`. + pub(crate) mfp: SafeMfpPlan, + /// TODO(discord9): impl temporal filter later + /// Expressions that when evaluated lower-bound `MzNow`. + pub(crate) lower_bounds: Vec, + /// Expressions that when evaluated upper-bound `MzNow`. + pub(crate) upper_bounds: Vec, +} diff --git a/src/flow/src/expr/relation.rs b/src/flow/src/expr/relation.rs index 9e6072e789..520c858534 100644 --- a/src/flow/src/expr/relation.rs +++ b/src/flow/src/expr/relation.rs @@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize}; use crate::expr::ScalarExpr; +mod accum; mod func; /// Describes an aggregation expression. diff --git a/src/flow/src/expr/relation/accum.rs b/src/flow/src/expr/relation/accum.rs new file mode 100644 index 0000000000..e2b136e849 --- /dev/null +++ b/src/flow/src/expr/relation/accum.rs @@ -0,0 +1,71 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Accumulators for aggregate functions that's is accumulatable. i.e. sum/count +//! +//! Currently support sum, count, any, all + +use std::fmt::Display; + +use common_decimal::Decimal128; +use common_time::{Date, DateTime}; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::{OrderedF32, OrderedF64, OrderedFloat, Value}; +use hydroflow::futures::stream::Concat; +use serde::{Deserialize, Serialize}; + +use crate::expr::error::{InternalSnafu, TryFromValueSnafu, TypeMismatchSnafu}; +use crate::expr::{AggregateFunc, EvalError}; +use crate::repr::Diff; + +/// Accumulates values for the various types of accumulable aggregations. +/// +/// We assume that there are not more than 2^32 elements for the aggregation. +/// Thus we can perform a summation over i32 in an i64 accumulator +/// and not worry about exceeding its bounds. +/// +/// The float accumulator performs accumulation with tolerance for floating point error. +/// +/// TODO(discord9): check for overflowing +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub enum Accum { + /// Accumulates boolean values. + Bool { + /// The number of `true` values observed. + trues: Diff, + /// The number of `false` values observed. + falses: Diff, + }, + /// Accumulates simple numeric values. + SimpleNumber { + /// The accumulation of all non-NULL values observed. + accum: i128, + /// The number of non-NULL values observed. + non_nulls: Diff, + }, + /// Accumulates float values. + Float { + /// Accumulates non-special float values, i.e. not NaN, +inf, -inf. + /// accum will be set to zero if `non_nulls` is zero. + accum: OrderedF64, + /// Counts +inf + pos_infs: Diff, + /// Counts -inf + neg_infs: Diff, + /// Counts NaNs + nans: Diff, + /// Counts non-NULL values + non_nulls: Diff, + }, +} diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs new file mode 100644 index 0000000000..4c82281736 --- /dev/null +++ b/src/flow/src/expr/relation/func.rs @@ -0,0 +1,85 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::type_name; + +use common_time::{Date, DateTime}; +use datatypes::prelude::ConcreteDataType; +use datatypes::value::{OrderedF32, OrderedF64, Value}; +use serde::{Deserialize, Serialize}; + +use crate::expr::error::{EvalError, TryFromValueSnafu, TypeMismatchSnafu}; +use crate::expr::relation::accum::Accum; +use crate::repr::Diff; + +/// Aggregate functions that can be applied to a group of rows. +/// +/// `Mean` function is deliberately not included as it can be computed from `Sum` and `Count`, whose state can be better managed. +/// +/// type of the input and output of the aggregate function: +/// +/// `sum(i*)->i64, sum(u*)->u64` +/// +/// `count()->i64` +/// +/// `min/max(T)->T` +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] +pub enum AggregateFunc { + MaxInt16, + MaxInt32, + MaxInt64, + MaxUInt16, + MaxUInt32, + MaxUInt64, + MaxFloat32, + MaxFloat64, + MaxBool, + MaxString, + MaxDate, + MaxDateTime, + MaxTimestamp, + MaxTime, + MaxDuration, + MaxInterval, + + MinInt16, + MinInt32, + MinInt64, + MinUInt16, + MinUInt32, + MinUInt64, + MinFloat32, + MinFloat64, + MinBool, + MinString, + MinDate, + MinDateTime, + MinTimestamp, + MinTime, + MinDuration, + MinInterval, + + SumInt16, + SumInt32, + SumInt64, + SumUInt16, + SumUInt32, + SumUInt64, + SumFloat32, + SumFloat64, + + Count, + Any, + All, +} diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs new file mode 100644 index 0000000000..3c1d745a86 --- /dev/null +++ b/src/flow/src/expr/scalar.rs @@ -0,0 +1,61 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, BTreeSet}; + +use datatypes::prelude::ConcreteDataType; +use datatypes::value::Value; +use serde::{Deserialize, Serialize}; + +use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu}; +use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc}; + +/// A scalar expression, which can be evaluated to a value. +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ScalarExpr { + /// A column of the input row + Column(usize), + /// A literal value. + /// Extra type info to know original type even when it is null + Literal(Value, ConcreteDataType), + /// A call to an unmaterializable function. + /// + /// These functions cannot be evaluated by `ScalarExpr::eval`. They must + /// be transformed away by a higher layer. + CallUnmaterializable(UnmaterializableFunc), + CallUnary { + func: UnaryFunc, + expr: Box, + }, + CallBinary { + func: BinaryFunc, + expr1: Box, + expr2: Box, + }, + CallVariadic { + func: VariadicFunc, + exprs: Vec, + }, + /// Conditionally evaluated expressions. + /// + /// It is important that `then` and `els` only be evaluated if + /// `cond` is true or not, respectively. This is the only way + /// users can guard execution (other logical operator do not + /// short-circuit) and we need to preserve that. + If { + cond: Box, + then: Box, + els: Box, + }, +}