working on reduce

This commit is contained in:
Discord9
2023-07-20 18:11:02 +08:00
parent 47f41371d0
commit 824d03a642
10 changed files with 302 additions and 6 deletions

View File

@@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};
/// A delta query is implemented by a set of paths, one for each input.
///
/// Each delta query path responds to its input changes by repeated lookups
/// in arrangements for other join inputs. These lookups require specific
/// instructions about which expressions to use as keys. Along the way,
/// various closures are applied to filter and project as early as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DeltaJoinPlan {}

View File

@@ -0,0 +1,9 @@
use serde::{Deserialize, Serialize};
/// TODO: impl Join
/// A plan for the execution of a linear join.
///
/// A linear join is a sequence of stages, each of which introduces
/// a new collection. Each stage is represented by a [LinearStagePlan].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct LinearJoinPlan {}

View File

@@ -0,0 +1,15 @@
use serde::{Deserialize, Serialize};
mod delta_join;
mod linear_join;
pub use delta_join::DeltaJoinPlan;
pub use linear_join::LinearJoinPlan;
/// TODO(discord9): impl Join
/// A complete enumeration of possible join plans to render.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum JoinPlan {
/// A join implemented by a linear join.
Linear(LinearJoinPlan),
/// A join implemented by a delta join.
Delta(DeltaJoinPlan),
}

View File

@@ -1,9 +1,14 @@
mod join;
mod reduce;
use std::collections::BTreeMap;
use join::JoinPlan;
use reduce::{KeyValPlan, ReducePlan};
use serde::{Deserialize, Serialize};
use crate::expr::{Id, ScalarExpr};
use crate::repr::{self, Row};
use crate::expr::{Id, LocalId, MapFilterProject, ScalarExpr, TableFunc};
use crate::repr::{self, Diff, Row};
use crate::storage::errors::EvalError;
/// The forms in which an operator's output is available;
@@ -77,7 +82,110 @@ impl AvailableCollections {
}
}
pub enum Plan {
Constant { rows: Result<Vec<Row>, EvalError> },
Get { id: Id, keys: AvailableCollections },
/// Rendering Plan
///
/// TODO: see if we ever need to support recursive plans
pub enum Plan<T = repr::Timestamp> {
/// A collection containing a pre-determined collection.
Constant {
rows: Result<Vec<(Row, T, Diff)>, EvalError>,
},
/// A reference to a bound collection.
///
/// This is commonly either an external reference to an existing source or
/// maintained arrangement, or an internal reference to a `Let` identifier.
Get {
id: Id,
keys: AvailableCollections,
plan: GetPlan,
},
/// Binds `value` to `id`, and then results in `body` with that binding.
///
/// This stage has the effect of sharing `value` across multiple possible
/// uses in `body`, and is the only mechanism we have for sharing collection
/// information across parts of a dataflow.
///
/// The binding is not available outside of `body`.
Let {
/// The local identifier to be used, available to `body` as `Id::Local(id)`.
id: LocalId,
/// The collection that should be bound to `id`.
value: Box<Plan<T>>,
/// The collection that results, which is allowed to contain `Get` stages
/// that reference `Id::Local(id)`.
body: Box<Plan<T>>,
},
/// Map, Filter, and Project operators.
///
/// This stage contains work that we would ideally like to fuse to other plan
/// stages, but for practical reasons cannot. For example: reduce, threshold,
/// and topk stages are not able to absorb this operator.
Mfp {
/// The input collection.
input: Box<Plan<T>>,
/// Linear operator to apply to each record.
mfp: MapFilterProject,
/// Whether the input is from an arrangement, and if so,
/// whether we can seek to a specific value therein
input_key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
},
/// A variable number of output records for each input record.
///
/// This stage is a bit of a catch-all for logic that does not easily fit in
/// map stages. This includes table valued functions, but also functions of
/// multiple arguments, and functions that modify the sign of updates.
///
/// This stage allows a `MapFilterProject` operator to be fused to its output,
/// and this can be very important as otherwise the output of `func` is just
/// appended to the input record, for as many outputs as it has. This has the
/// unpleasant default behavior of repeating potentially large records that
/// are being unpacked, producing quadratic output in those cases. Instead,
/// in these cases use a `mfp` member that projects away these large fields.
FlatMap {
/// The input collection.
input: Box<Plan<T>>,
/// The variable-record emitting function.
func: TableFunc,
/// Expressions that for each row prepare the arguments to `func`.
exprs: Vec<ScalarExpr>,
/// Linear operator to apply to each record produced by `func`.
mfp: MapFilterProject,
/// The particular arrangement of the input we expect to use,
/// if any
input_key: Option<Vec<ScalarExpr>>,
},
/// A multiway relational equijoin, with fused map, filter, and projection.
///
/// This stage performs a multiway join among `inputs`, using the equality
/// constraints expressed in `plan`. The plan also describes the implementation
/// strategy we will use, and any pushed down per-record work.
Join {
/// An ordered list of inputs that will be joined.
inputs: Vec<Plan<T>>,
/// Detailed information about the implementation of the join.
///
/// This includes information about the implementation strategy, but also
/// any map, filter, project work that we might follow the join with, but
/// potentially pushed down into the implementation of the join.
plan: JoinPlan,
},
/// Aggregation by key.
Reduce {
/// The input collection.
input: Box<Plan<T>>,
/// A plan for changing input records into key, value pairs.
key_val_plan: KeyValPlan,
/// A plan for performing the reduce.
///
/// The implementation of reduction has several different strategies based
/// on the properties of the reduction, and the input itself. Please check
/// out the documentation for this type for more detail.
plan: ReducePlan,
/// The particular arrangement of the input we expect to use,
/// if any
input_key: Option<Vec<ScalarExpr>>,
},
}
/// TODO: impl GetPlan
pub enum GetPlan {}

View File

@@ -0,0 +1,76 @@
use serde::{Deserialize, Serialize};
use crate::expr::{AggregateExpr, MapFilterProject};
/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct KeyValPlan {
/// Extracts the columns used as the key.
pub key_plan: MapFilterProject,
/// Extracts the columns used to feed the aggregations.
pub val_plan: MapFilterProject,
}
/// TODO(discord9): Reduce Plan
/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case its just a `Distinct` and otherwise
/// it's composed of a combination of accumulable, hierarchical and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// be as simple (and compiler verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ReducePlan {
/// Plan for not computing any aggregations, just determining the set of
/// distinct keys.
Distinct,
/// Plan for computing only accumulable aggregations.
Accumulable(AccumulablePlan),
/// Plan for computing only hierarchical aggregations.
Hierarchical(HierarchicalPlan),
/// Plan for computing only basic aggregations.
Basic(BasicPlan),
/// Plan for computing a mix of different kinds of aggregations.
/// We need to do extra work here to reassemble results back in the
/// requested order.
Collation(CollationPlan),
}
/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values. We need
/// to apply a distinct operator to those before we
/// combine them with everything else.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct AccumulablePlan {
/// All of the aggregations we were asked to compute, stored
/// in order.
pub full_aggrs: Vec<AggregateExpr>,
/// All of the non-distinct accumulable aggregates.
/// Each element represents:
/// (index of the aggregation among accumulable aggregations,
/// index of the datum among inputs, aggregation expr)
/// These will all be rendered together in one dataflow fragment.
pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
/// Same as above but for all of the `DISTINCT` accumulable aggregations.
pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}
// TODO: others
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum HierarchicalPlan {}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum BasicPlan {}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CollationPlan {}

View File

@@ -1,9 +1,12 @@
use crate::repr::Row;
use serde::{Deserialize, Serialize};
// TODO: more function & eval
use crate::repr::Row;
/// Stateless functions
#[derive(Debug, Clone)]
pub enum Func {
BuiltIn(BuiltInFunc),
/// still a strict Row-to-Row function
Custom(fn(Row) -> Row),
}
@@ -14,3 +17,10 @@ pub enum BuiltInFunc {
IsTrue,
IsFalse,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum AggregateFunc {
Count,
Any,
All,
}

View File

@@ -0,0 +1,44 @@
use serde::{Deserialize, Serialize};
use crate::expr::{Id, LocalId, ScalarExpr};
/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequences of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MapFilterProject {
/// A sequence of expressions that should be appended to the row.
///
/// Many of these expressions may not be produced in the output,
/// and may only be present as common subexpressions.
pub expressions: Vec<ScalarExpr>,
/// Expressions that must evaluate to `Datum::True` for the output
/// row to be produced.
///
/// Each entry is prepended with a column identifier indicating
/// the column *before* which the predicate should first be applied.
/// Most commonly this would be one plus the largest column identifier
/// in the predicate's support, but it could be larger to implement
/// guarded evaluation of predicates.
///
/// This list should be sorted by the first field.
pub predicates: Vec<(usize, ScalarExpr)>,
/// A sequence of column identifiers whose data form the output row.
pub projection: Vec<usize>,
/// The expected number of input columns.
///
/// This is needed to ensure correct identification of newly formed
/// columns in the output.
pub input_arity: usize,
}

View File

@@ -2,10 +2,14 @@
mod func;
mod id;
mod linear;
mod relation;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
pub use id::{GlobalId, Id, LocalId};
pub use linear::MapFilterProject;
pub use relation::{AggregateExpr, TableFunc};
use serde::{Deserialize, Serialize};
use crate::storage::errors::DataflowError;

View File

View File

@@ -0,0 +1,20 @@
use serde::{Deserialize, Serialize};
use crate::expr::func::AggregateFunc;
use crate::expr::ScalarExpr;
/// function that might emit multiple output record for one input row
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub enum TableFunc {}
/// Describes an aggregation expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AggregateExpr {
/// Names the aggregation function.
pub func: AggregateFunc,
/// An expression which extracts from each row the input to `func`.
pub expr: ScalarExpr,
/// Should the aggregation be applied only to distinct results in each group.
#[serde(default)]
pub distinct: bool,
}