diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs
index 043c37b2be..d54dfa4b9f 100644
--- a/src/flow/src/expr.rs
+++ b/src/flow/src/expr.rs
@@ -24,5 +24,6 @@ mod scalar;
 pub(crate) use error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu};
 pub(crate) use func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc};
 pub(crate) use id::{GlobalId, Id, LocalId};
+pub(crate) use linear::{MapFilterProject, MfpPlan, SafeMfpPlan};
 pub(crate) use relation::{AggregateExpr, AggregateFunc};
 pub(crate) use scalar::ScalarExpr;
diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs
index 331b883996..d4a0ef5eda 100644
--- a/src/flow/src/expr/linear.rs
+++ b/src/flow/src/expr/linear.rs
@@ -45,7 +45,7 @@ use crate::repr::{self, value_to_internal_ts, Diff, Row};
 /// expressions in `self.expressions`, even though this is not something
 /// we can directly evaluate. The plan creation methods will defensively
 /// ensure that the right thing happens.
-#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
 pub struct MapFilterProject {
     /// A sequence of expressions that should be appended to the row.
     ///
@@ -415,7 +415,7 @@ impl MapFilterProject {
 }
 
 /// A wrapper type which indicates it is safe to simply evaluate all expressions.
-#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
 pub struct SafeMfpPlan {
     pub(crate) mfp: MapFilterProject,
 }
diff --git a/src/flow/src/expr/relation.rs b/src/flow/src/expr/relation.rs
index 520c858534..db82c75425 100644
--- a/src/flow/src/expr/relation.rs
+++ b/src/flow/src/expr/relation.rs
@@ -21,7 +21,7 @@ mod accum;
 mod func;
 
 /// Describes an aggregation expression.
-#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
 pub struct AggregateExpr {
     /// Names the aggregation function.
     pub func: AggregateFunc,
diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs
index c144f8ab50..ef7419888e 100644
--- a/src/flow/src/lib.rs
+++ b/src/flow/src/lib.rs
@@ -17,4 +17,5 @@
 // allow unused for now because it should be use later
 mod adapter;
 mod expr;
+mod plan;
 mod repr;
diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs
new file mode 100644
index 0000000000..77b8ffb1f6
--- /dev/null
+++ b/src/flow/src/plan.rs
@@ -0,0 +1,100 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! This module contains the basic definitions of a dataflow plan
+//! that can be translated to hydro dataflow
+
+mod join;
+mod reduce;
+
+use serde::{Deserialize, Serialize};
+
+pub(crate) use self::reduce::{AccumulablePlan, KeyValPlan, ReducePlan};
+use crate::expr::{
+    AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
+};
+use crate::plan::join::JoinPlan;
+use crate::repr::{DiffRow, RelationType};
+
+/// A [`Plan`] together with the type of the relation it outputs.
+#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
+pub struct TypedPlan {
+    /// output type of the relation
+    pub typ: RelationType,
+    /// the plan that produces the relation
+    pub plan: Plan,
+}
+
+/// TODO(discord9): support `TableFunc` (by defining a `FlatMap` that maps 1 to n)
+#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
+pub enum Plan {
+    /// A constant collection of rows.
+    Constant { rows: Vec<DiffRow> },
+    /// Get CDC data from a source, be it an external reference to an existing source or an internal
+    /// reference to a `Let` identifier
+    Get { id: Id },
+    /// Create a temporary collection from the given `value`, and make this binding available only
+    /// in the scope of `body`
+    Let {
+        id: LocalId,
+        value: Box<Plan>,
+        body: Box<Plan>,
+    },
+    /// Map, Filter, and Project operators.
+    Mfp {
+        /// The input collection.
+        input: Box<Plan>,
+        /// Linear operator to apply to each record.
+        mfp: MapFilterProject,
+    },
+    /// Reduce operator, aggregation by key assembled from `KeyValPlan`
+    Reduce {
+        /// The input collection.
+        input: Box<Plan>,
+        /// A plan for changing input records into key, value pairs.
+        key_val_plan: KeyValPlan,
+        /// A plan for performing the reduce.
+        ///
+        /// The implementation of reduction has several different strategies based
+        /// on the properties of the reduction, and the input itself.
+        reduce_plan: ReducePlan,
+    },
+    /// A multiway relational equijoin, with fused map, filter, and projection.
+    ///
+    /// This stage performs a multiway join among `inputs`, using the equality
+    /// constraints expressed in `plan`. The plan also describes the implementation
+    /// strategy we will use, and any pushed down per-record work.
+    Join {
+        /// An ordered list of inputs that will be joined.
+        inputs: Vec<Plan>,
+        /// Detailed information about the implementation of the join.
+        ///
+        /// This includes information about the implementation strategy, but also
+        /// any map, filter, project work that we might follow the join with, but
+        /// potentially pushed down into the implementation of the join.
+        plan: JoinPlan,
+    },
+    /// Adds the contents of the input collections.
+    ///
+    /// Importantly, this is *multiset* union, so the multiplicities of records will
+    /// add. This is in contrast to *set* union, where the multiplicities would be
+    /// capped at one. A set union can be formed with `Union` followed by `Reduce`
+    /// implementing the "distinct" operator.
+    Union {
+        /// The input collections
+        inputs: Vec<Plan>,
+        /// Whether to consolidate the output, e.g., cancel negated records.
+        consolidate_output: bool,
+    },
+}
diff --git a/src/flow/src/plan/join.rs b/src/flow/src/plan/join.rs
new file mode 100644
index 0000000000..13bb95f511
--- /dev/null
+++ b/src/flow/src/plan/join.rs
@@ -0,0 +1,79 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+
+use crate::expr::ScalarExpr;
+use crate::plan::SafeMfpPlan;
+
+/// TODO(discord9): consider implementing more join strategies
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
+pub enum JoinPlan {
+    /// A linear join, described by a [`LinearJoinPlan`].
+    Linear(LinearJoinPlan),
+}
+
+/// Determines whether a given row should stay in the output, and applies a map-filter-project before outputting the row
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
+pub struct JoinFilter {
+    /// Each element of the outer vector is a group of expressions that must all evaluate to the same value;
+    /// if not, the row will be filtered out. Useful for equi-joins (joins based on equality of some columns)
+    pub ready_equivalences: Vec<Vec<ScalarExpr>>,
+    /// A map-filter-project to apply before outputting the row
+    pub before: SafeMfpPlan,
+}
+
+/// A plan for the execution of a linear join.
+///
+/// A linear join is a sequence of stages, each of which introduces
+/// a new collection. Each stage is represented by a [`LinearStagePlan`].
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
+pub struct LinearJoinPlan {
+    /// The source relation from which we start the join.
+    pub source_relation: usize,
+    /// The arrangement to use for the source relation, if any
+    pub source_key: Option<Vec<ScalarExpr>>,
+    /// An initial closure to apply before any stages.
+    ///
+    /// Values of `None` indicate the identity closure.
+    pub initial_closure: Option<JoinFilter>,
+    /// A *sequence* of stages to apply one after the other.
+    pub stage_plans: Vec<LinearStagePlan>,
+    /// A concluding filter to apply after the last stage.
+    ///
+    /// Values of `None` indicate the identity closure.
+    pub final_closure: Option<JoinFilter>,
+}
+
+/// A plan for the execution of one stage of a linear join.
+///
+/// Each stage is a binary join between the current accumulated
+/// join results, and a new collection. The former is referred to
+/// as the "stream" and the latter the "lookup".
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
+pub struct LinearStagePlan {
+    /// The index of the relation into which we will look up.
+    pub lookup_relation: usize,
+    /// The key expressions to use for the stream relation.
+    pub stream_key: Vec<ScalarExpr>,
+    /// Columns to retain from the stream relation.
+    /// These columns are those that are not redundant with `stream_key`,
+    /// and cannot be read out of the key component of an arrangement.
+    pub stream_thinning: Vec<usize>,
+    /// The key expressions to use for the lookup relation.
+    pub lookup_key: Vec<ScalarExpr>,
+    /// The closure to apply to the concatenation of the key columns,
+    /// the stream value columns, and the lookup value columns.
+    pub closure: JoinFilter,
+}
diff --git a/src/flow/src/plan/reduce.rs b/src/flow/src/plan/reduce.rs
new file mode 100644
index 0000000000..52dd3a509d
--- /dev/null
+++ b/src/flow/src/plan/reduce.rs
@@ -0,0 +1,51 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use serde::{Deserialize, Serialize};
+
+use crate::expr::{AggregateExpr, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr};
+
+/// A plan for changing input records into key, value pairs (used by `Plan::Reduce`).
+#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
+pub struct KeyValPlan {
+    pub key_plan: SafeMfpPlan,
+    pub val_plan: SafeMfpPlan,
+}
+
+/// TODO(discord9): define and implement hierarchical aggregates (for min/max with support for deletion),
+/// basic aggregates (for other aggregate functions) and mixed aggregates
+#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
+pub enum ReducePlan {
+    /// Plan for not computing any aggregations, just determining the set of
+    /// distinct keys.
+    Distinct,
+    /// Plan for computing only accumulable aggregations.
+    /// Including simple functions like `sum`, `count`, `min`/`max` (without deletion)
+    Accumulable(AccumulablePlan),
+}
+
+/// Accumulable plan for the execution of a reduction.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
+pub struct AccumulablePlan {
+    /// All of the aggregations we were asked to compute, stored
+    /// in order.
+    pub full_aggrs: Vec<AggregateExpr>,
+    /// All of the non-distinct accumulable aggregates.
+    /// Each element represents:
+    /// (index of aggr output, index of value among inputs, aggr expr)
+    /// These will all be rendered together in one dataflow fragment.
+    pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
+    /// Same as above but for all of the `DISTINCT` accumulable aggregations.
+    pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
+}