diff --git a/src/flow/src/compute/plan/join/delta_join.rs b/src/flow/src/compute/plan/join/delta_join.rs
new file mode 100644
index 0000000000..cf671827ff
--- /dev/null
+++ b/src/flow/src/compute/plan/join/delta_join.rs
@@ -0,0 +1,10 @@
+use serde::{Deserialize, Serialize};
+
+/// A delta query is implemented by a set of paths, one for each input.
+///
+/// Each delta query path responds to its input changes by repeated lookups
+/// in arrangements for other join inputs. These lookups require specific
+/// instructions about which expressions to use as keys. Along the way,
+/// various closures are applied to filter and project as early as possible.
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct DeltaJoinPlan {}
diff --git a/src/flow/src/compute/plan/join/linear_join.rs b/src/flow/src/compute/plan/join/linear_join.rs
new file mode 100644
index 0000000000..6af015e0ed
--- /dev/null
+++ b/src/flow/src/compute/plan/join/linear_join.rs
@@ -0,0 +1,9 @@
+use serde::{Deserialize, Serialize};
+
+/// TODO: impl Join
+/// A plan for the execution of a linear join.
+///
+/// A linear join is a sequence of stages, each of which introduces
+/// a new collection. Each stage is represented by a [LinearStagePlan].
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct LinearJoinPlan {}
diff --git a/src/flow/src/compute/plan/join/mod.rs b/src/flow/src/compute/plan/join/mod.rs
new file mode 100644
index 0000000000..dc064bf584
--- /dev/null
+++ b/src/flow/src/compute/plan/join/mod.rs
@@ -0,0 +1,15 @@
+use serde::{Deserialize, Serialize};
+mod delta_join;
+mod linear_join;
+pub use delta_join::DeltaJoinPlan;
+pub use linear_join::LinearJoinPlan;
+
+/// TODO(discord9): impl Join
+/// A complete enumeration of possible join plans to render.
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub enum JoinPlan {
+    /// A join implemented by a linear join.
+    Linear(LinearJoinPlan),
+    /// A join implemented by a delta join.
+    Delta(DeltaJoinPlan),
+}
diff --git a/src/flow/src/compute/plan/mod.rs b/src/flow/src/compute/plan/mod.rs
index 80ba77f64b..d0bdd1b192 100644
--- a/src/flow/src/compute/plan/mod.rs
+++ b/src/flow/src/compute/plan/mod.rs
@@ -1,9 +1,14 @@
+mod join;
+mod reduce;
+
 use std::collections::BTreeMap;
 
+use join::JoinPlan;
+use reduce::{KeyValPlan, ReducePlan};
 use serde::{Deserialize, Serialize};
 
-use crate::expr::{Id, ScalarExpr};
-use crate::repr::{self, Row};
+use crate::expr::{Id, LocalId, MapFilterProject, ScalarExpr, TableFunc};
+use crate::repr::{self, Diff, Row};
 use crate::storage::errors::EvalError;
 
 /// The forms in which an operator's output is available;
@@ -77,7 +82,110 @@ impl AvailableCollections {
     }
 }
 
-pub enum Plan {
-    Constant { rows: Result<Vec<(Row, repr::Timestamp, repr::Diff)>, EvalError> },
-    Get { id: Id, keys: AvailableCollections },
+/// Rendering Plan
+///
+/// TODO: see if we ever need to support recursive plans
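+///
+/// As a minimal sketch of how stages nest (here `initial_rows` and
+/// `map_filter_project` are hypothetical bindings built elsewhere; this is
+/// an illustration, not the planner's actual output):
+///
+/// ```ignore
+/// // filter/project a small constant collection
+/// let plan = Plan::Mfp {
+///     input: Box::new(Plan::Constant {
+///         rows: Ok(initial_rows),
+///     }),
+///     mfp: map_filter_project,
+///     // no arranged input to seek into
+///     input_key_val: None,
+/// };
+/// ```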
+pub enum Plan<T = repr::Timestamp> {
+    /// A collection containing a pre-determined collection.
+    Constant {
+        rows: Result<Vec<(Row, T, Diff)>, EvalError>,
+    },
+    /// A reference to a bound collection.
+    ///
+    /// This is commonly either an external reference to an existing source or
+    /// maintained arrangement, or an internal reference to a `Let` identifier.
+    Get {
+        id: Id,
+        keys: AvailableCollections,
+        plan: GetPlan,
+    },
+    /// Binds `value` to `id`, and then results in `body` with that binding.
+    ///
+    /// This stage has the effect of sharing `value` across multiple possible
+    /// uses in `body`, and is the only mechanism we have for sharing collection
+    /// information across parts of a dataflow.
+    ///
+    /// The binding is not available outside of `body`.
+    Let {
+        /// The local identifier to be used, available to `body` as `Id::Local(id)`.
+        id: LocalId,
+        /// The collection that should be bound to `id`.
+        value: Box<Plan<T>>,
+        /// The collection that results, which is allowed to contain `Get` stages
+        /// that reference `Id::Local(id)`.
+        body: Box<Plan<T>>,
+    },
+    /// Map, Filter, and Project operators.
+    ///
+    /// This stage contains work that we would ideally like to fuse to other plan
+    /// stages, but for practical reasons cannot. For example: reduce, threshold,
+    /// and topk stages are not able to absorb this operator.
+    Mfp {
+        /// The input collection.
+        input: Box<Plan<T>>,
+        /// Linear operator to apply to each record.
+        mfp: MapFilterProject,
+        /// Whether the input is from an arrangement, and if so,
+        /// whether we can seek to a specific value therein.
+        input_key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
+    },
+    /// A variable number of output records for each input record.
+    ///
+    /// This stage is a bit of a catch-all for logic that does not easily fit in
+    /// map stages. This includes table-valued functions, but also functions of
+    /// multiple arguments, and functions that modify the sign of updates.
+    ///
+    /// This stage allows a `MapFilterProject` operator to be fused to its output,
+    /// and this can be very important as otherwise the output of `func` is just
+    /// appended to the input record, for as many outputs as it has. This has the
+    /// unpleasant default behavior of repeating potentially large records that
+    /// are being unpacked, producing quadratic output in those cases. Instead,
+    /// in these cases use an `mfp` member that projects away these large fields.
+    FlatMap {
+        /// The input collection.
+        input: Box<Plan<T>>,
+        /// The variable-record emitting function.
+        func: TableFunc,
+        /// Expressions that for each row prepare the arguments to `func`.
+        exprs: Vec<ScalarExpr>,
+        /// Linear operator to apply to each record produced by `func`.
+        mfp: MapFilterProject,
+        /// The particular arrangement of the input we expect to use,
+        /// if any.
+        input_key: Option<Vec<ScalarExpr>>,
+    },
+    /// A multiway relational equijoin, with fused map, filter, and projection.
+    ///
+    /// This stage performs a multiway join among `inputs`, using the equality
+    /// constraints expressed in `plan`. The plan also describes the implementation
+    /// strategy we will use, and any pushed-down per-record work.
+    Join {
+        /// An ordered list of inputs that will be joined.
+        inputs: Vec<Plan<T>>,
+        /// Detailed information about the implementation of the join.
+        ///
+        /// This includes information about the implementation strategy, but also
+        /// any map, filter, project work that we might follow the join with, but
+        /// potentially pushed down into the implementation of the join.
+        plan: JoinPlan,
+    },
+    /// Aggregation by key.
+    Reduce {
+        /// The input collection.
+        input: Box<Plan<T>>,
+        /// A plan for changing input records into (key, value) pairs.
+        key_val_plan: KeyValPlan,
+        /// A plan for performing the reduce.
+        ///
+        /// The implementation of reduction has several different strategies based
+        /// on the properties of the reduction and of the input itself. Please check
+        /// out the documentation for this type for more detail.
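+///
+/// As an illustration (a sketch, not the planner's actual output): for a
+/// query shaped like `SELECT k, SUM(v) ... GROUP BY k` over a two-column
+/// input `(k, v)`, the key and value extractions are two trivial projections:
+///
+/// ```ignore
+/// let key_val_plan = KeyValPlan {
+///     // the grouping key is column 0
+///     key_plan: MapFilterProject {
+///         expressions: vec![],
+///         predicates: vec![],
+///         projection: vec![0],
+///         input_arity: 2,
+///     },
+///     // the aggregation input is column 1
+///     val_plan: MapFilterProject {
+///         expressions: vec![],
+///         predicates: vec![],
+///         projection: vec![1],
+///         input_arity: 2,
+///     },
+/// };
+/// ```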
+        plan: ReducePlan,
+        /// The particular arrangement of the input we expect to use,
+        /// if any.
+        input_key: Option<Vec<ScalarExpr>>,
+    },
 }
+
+/// TODO: impl GetPlan
+pub enum GetPlan {}
diff --git a/src/flow/src/compute/plan/reduce.rs b/src/flow/src/compute/plan/reduce.rs
new file mode 100644
index 0000000000..9600c8e905
--- /dev/null
+++ b/src/flow/src/compute/plan/reduce.rs
@@ -0,0 +1,76 @@
+use serde::{Deserialize, Serialize};
+
+use crate::expr::{AggregateExpr, MapFilterProject};
+
+/// Plan for extracting keys and values in preparation for a reduction.
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct KeyValPlan {
+    /// Extracts the columns used as the key.
+    pub key_plan: MapFilterProject,
+    /// Extracts the columns used to feed the aggregations.
+    pub val_plan: MapFilterProject,
+}
+
+/// TODO(discord9): Reduce Plan
+/// A `ReducePlan` provides a concise description for how we will
+/// execute a given reduce expression.
+///
+/// The provided reduce expression can have no
+/// aggregations, in which case it's just a `Distinct`, and otherwise
+/// it's composed of a combination of accumulable, hierarchical, and
+/// basic aggregations.
+///
+/// We want to try to centralize as much decision making about the
+/// shape / general computation of the rendered dataflow graph
+/// in this plan, and then make actually rendering the graph
+/// be as simple (and compiler-verifiable) as possible.
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub enum ReducePlan {
+    /// Plan for not computing any aggregations, just determining the set of
+    /// distinct keys.
+    Distinct,
+    /// Plan for computing only accumulable aggregations.
+    Accumulable(AccumulablePlan),
+    /// Plan for computing only hierarchical aggregations.
+    Hierarchical(HierarchicalPlan),
+    /// Plan for computing only basic aggregations.
+    Basic(BasicPlan),
+    /// Plan for computing a mix of different kinds of aggregations.
+    /// We need to do extra work here to reassemble results back in the
+    /// requested order.
+    Collation(CollationPlan),
+}
+
+/// Plan for computing a set of accumulable aggregations.
+///
+/// We fuse all of the accumulable aggregations together
+/// and compute them with one dataflow fragment. We need to
+/// be careful to separate out the aggregations that
+/// apply only to the distinct set of values. We need
+/// to apply a distinct operator to those before we
+/// combine them with everything else.
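+///
+/// For example (a sketch, with `count_x` and `count_distinct_x` standing in
+/// for already-built `AggregateExpr`s over datum 0 of the value row): for
+/// `COUNT(x), COUNT(DISTINCT x)` the two aggregations split as follows:
+///
+/// ```ignore
+/// let plan = AccumulablePlan {
+///     full_aggrs: vec![count_x.clone(), count_distinct_x.clone()],
+///     // (accumulable aggregation index 0, input datum index 0, expr)
+///     simple_aggrs: vec![(0, 0, count_x)],
+///     // (accumulable aggregation index 1, input datum index 0, expr)
+///     distinct_aggrs: vec![(1, 0, count_distinct_x)],
+/// };
+/// ```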
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct AccumulablePlan {
+    /// All of the aggregations we were asked to compute, stored
+    /// in order.
+    pub full_aggrs: Vec<AggregateExpr>,
+    /// All of the non-distinct accumulable aggregates.
+    /// Each element represents:
+    /// (index of the aggregation among accumulable aggregations,
+    /// index of the datum among inputs, aggregation expr).
+    /// These will all be rendered together in one dataflow fragment.
+    pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
+    /// Same as above, but for all of the `DISTINCT` accumulable aggregations.
+    pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
+}
+
+// TODO: others
+
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub enum HierarchicalPlan {}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub enum BasicPlan {}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct CollationPlan {}
diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs
index 2324a5ee43..c985741e9b 100644
--- a/src/flow/src/expr/func.rs
+++ b/src/flow/src/expr/func.rs
@@ -1,9 +1,12 @@
-use crate::repr::Row;
+use serde::{Deserialize, Serialize};
+
+// TODO: more function & eval
+use crate::repr::Row;
 
 /// Stateless functions
 #[derive(Debug, Clone)]
 pub enum Func {
     BuiltIn(BuiltInFunc),
+    /// still a strict Row-to-Row function
     Custom(fn(Row) -> Row),
 }
 
@@ -14,3 +17,10 @@ pub enum BuiltInFunc {
     IsTrue,
     IsFalse,
 }
+
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub enum AggregateFunc {
+    Count,
+    Any,
+    All,
+}
diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs
new file mode 100644
index 0000000000..1486ec7fc2
--- /dev/null
+++ b/src/flow/src/expr/linear.rs
@@ -0,0 +1,44 @@
+use serde::{Deserialize, Serialize};
+
+use crate::expr::{Id, LocalId, ScalarExpr};
+
+/// A compound operator that can be applied row-by-row.
+///
+/// This operator integrates the map, filter, and project operators.
+/// It applies a sequence of map expressions, which are allowed to
+/// refer to previous expressions, interleaved with predicates which
+/// must be satisfied for an output to be produced. If all predicates
+/// evaluate to `Datum::True`, the data at the identified columns are
+/// collected and produced as output in a packed `Row`.
+///
+/// This operator is a "builder" and its contents may contain expressions
+/// that are not yet executable. For example, it may contain temporal
+/// expressions in `self.expressions`, even though this is not something
+/// we can directly evaluate. The plan creation methods will defensively
+/// ensure that the right thing happens.
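+///
+/// As a hedged sketch (the `sum_expr` and `gt_zero_expr` bindings stand in
+/// for `ScalarExpr`s built elsewhere): an operator over a two-column input
+/// that computes `#0 + #1`, keeps rows where that sum is positive, and
+/// outputs only the sum could be written as:
+///
+/// ```ignore
+/// let mfp = MapFilterProject {
+///     // append column #2 = #0 + #1
+///     expressions: vec![sum_expr],
+///     // apply `#2 > 0` as soon as column #2 exists: 2 + 1 = 3
+///     predicates: vec![(3, gt_zero_expr)],
+///     // emit only the computed sum
+///     projection: vec![2],
+///     input_arity: 2,
+/// };
+/// ```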
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct MapFilterProject {
+    /// A sequence of expressions that should be appended to the row.
+    ///
+    /// Many of these expressions may not be produced in the output,
+    /// and may only be present as common subexpressions.
+    pub expressions: Vec<ScalarExpr>,
+    /// Expressions that must evaluate to `Datum::True` for the output
+    /// row to be produced.
+    ///
+    /// Each entry is prepended with a column identifier indicating
+    /// the column *before* which the predicate should first be applied.
+    /// Most commonly this would be one plus the largest column identifier
+    /// in the predicate's support, but it could be larger to implement
+    /// guarded evaluation of predicates.
+    ///
+    /// This list should be sorted by the first field.
+    pub predicates: Vec<(usize, ScalarExpr)>,
+    /// A sequence of column identifiers whose data form the output row.
+    pub projection: Vec<usize>,
+    /// The expected number of input columns.
+    ///
+    /// This is needed to ensure correct identification of newly formed
+    /// columns in the output.
+    pub input_arity: usize,
+}
diff --git a/src/flow/src/expr/mod.rs b/src/flow/src/expr/mod.rs
index fd051cc458..85b21753b1 100644
--- a/src/flow/src/expr/mod.rs
+++ b/src/flow/src/expr/mod.rs
@@ -2,10 +2,14 @@
 mod func;
 mod id;
+mod linear;
+mod relation;
 
 use datatypes::prelude::ConcreteDataType;
 use datatypes::value::Value;
 pub use id::{GlobalId, Id, LocalId};
+pub use linear::MapFilterProject;
+pub use relation::{AggregateExpr, TableFunc};
 use serde::{Deserialize, Serialize};
 
 use crate::storage::errors::DataflowError;
diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/flow/src/expr/relation/mod.rs b/src/flow/src/expr/relation/mod.rs
new file mode 100644
index 0000000000..0dc8cb4183
--- /dev/null
+++ b/src/flow/src/expr/relation/mod.rs
@@ -0,0 +1,20 @@
+use serde::{Deserialize, Serialize};
+
+use crate::expr::func::AggregateFunc;
+use crate::expr::ScalarExpr;
+
+/// A function that might emit multiple output records for one input row.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
+pub enum TableFunc {}
+
+/// Describes an aggregation expression.
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct AggregateExpr {
+    /// Names the aggregation function.
+    pub func: AggregateFunc,
+    /// An expression which extracts from each row the input to `func`.
+    pub expr: ScalarExpr,
+    /// Whether the aggregation should be applied only to distinct results in each group.
+    #[serde(default)]
+    pub distinct: bool,
+}
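+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // A minimal construction sketch. `ScalarExpr::Column` is assumed to be
+    // the column-reference variant of `ScalarExpr`; adjust the constructor
+    // if the actual variant differs.
+    #[test]
+    fn distinct_flag_distinguishes_aggregates() {
+        let count = AggregateExpr {
+            func: AggregateFunc::Count,
+            expr: ScalarExpr::Column(0),
+            distinct: false,
+        };
+        let count_distinct = AggregateExpr {
+            distinct: true,
+            ..count.clone()
+        };
+        // only the `distinct` flag differs between the two expressions
+        assert_ne!(count, count_distinct);
+    }
+}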