Compare commits

...

16 Commits

Author SHA1 Message Date
Discord9
f995204060 test: more reduce tests 2023-09-06 16:38:51 +08:00
Discord9
93561291e4 support more binary function 2023-09-06 16:38:51 +08:00
Discord9
9f59d68391 eval func 2023-09-06 16:37:49 +08:00
Discord9
51083b12bd reduce_bucketed 2023-09-06 16:37:49 +08:00
Discord9
c80165c377 test: simple render 2023-09-06 16:37:49 +08:00
Discord9
76d8709774 sink&source 2023-09-06 16:37:49 +08:00
Discord9
2cf7d6d569 feat: build_accumulable 2023-09-06 16:37:49 +08:00
Discord9
045c8079e6 feat: flow util func 2023-09-06 16:37:49 +08:00
Discord9
54f2f6495f mfp & reduce partially 2023-09-06 16:37:49 +08:00
Discord9
2798d266f5 feat: render plan partially writen 2023-09-06 16:37:49 +08:00
Discord9
824d03a642 working on reduce 2023-09-06 16:36:41 +08:00
Discord9
47f41371d0 Arrangement&types 2023-09-06 16:36:41 +08:00
Discord9
d702b6e5c4 use newer DD 2023-09-06 16:36:41 +08:00
Discord9
13c02f3f92 basic skeleton 2023-09-06 16:36:41 +08:00
Discord9
b52eb2313e renamed as greptime-flow 2023-09-06 16:36:41 +08:00
Discord9
d422bc8401 basic demo 2023-09-06 16:36:41 +08:00
37 changed files with 5250 additions and 0 deletions

116
Cargo.lock generated
View File

@@ -8,6 +8,23 @@ version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "abomonation"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e72913c99b1f927aa7bd59a41518fdd9995f63ffc8760f211609e0241c4fb2"
[[package]]
name = "abomonation_derive"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e50e2a046af56a864c62d97b7153fda72c596e646be1b0c7963736821f6e1efa"
dependencies = [
"proc-macro2",
"quote",
"synstructure",
]
[[package]] [[package]]
name = "addr2line" name = "addr2line"
version = "0.20.0" version = "0.20.0"
@@ -1627,6 +1644,14 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "columnation"
version = "0.1.0"
source = "git+https://github.com/frankmcsherry/columnation#eb8e20c10e748dcbfe6266be8e24e14422d3de0f"
dependencies = [
"paste",
]
[[package]] [[package]]
name = "comfy-table" name = "comfy-table"
version = "7.0.1" version = "7.0.1"
@@ -2832,6 +2857,19 @@ version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]]
name = "differential-dataflow"
version = "0.12.0"
source = "git+https://github.com/TimelyDataflow/differential-dataflow#2b9ac68aab9a1bf3fc3e4c12fcabea9c9d1ecc6a"
dependencies = [
"abomonation",
"abomonation_derive",
"fnv",
"serde",
"serde_derive",
"timely",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.7" version = "0.10.7"
@@ -3250,6 +3288,19 @@ dependencies = [
"spin 0.9.8", "spin 0.9.8",
] ]
[[package]]
name = "flow"
version = "0.1.0"
dependencies = [
"common-telemetry",
"datafusion-expr",
"datafusion-substrait",
"datatypes",
"differential-dataflow",
"serde",
"timely",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@@ -9791,6 +9842,18 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "synstructure"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"unicode-xid",
]
[[package]] [[package]]
name = "system-configuration" name = "system-configuration"
version = "0.5.1" version = "0.5.1"
@@ -10169,6 +10232,59 @@ dependencies = [
"time-core", "time-core",
] ]
[[package]]
name = "timely"
version = "0.12.0"
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
dependencies = [
"abomonation",
"abomonation_derive",
"crossbeam-channel",
"futures-util",
"getopts",
"serde",
"serde_derive",
"timely_bytes",
"timely_communication",
"timely_container",
"timely_logging",
]
[[package]]
name = "timely_bytes"
version = "0.12.0"
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
[[package]]
name = "timely_communication"
version = "0.12.0"
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
dependencies = [
"abomonation",
"abomonation_derive",
"bincode",
"crossbeam-channel",
"getopts",
"serde",
"serde_derive",
"timely_bytes",
"timely_logging",
]
[[package]]
name = "timely_container"
version = "0.12.0"
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
dependencies = [
"columnation",
"serde",
]
[[package]]
name = "timely_logging"
version = "0.12.0"
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
[[package]] [[package]]
name = "timsort" name = "timsort"
version = "0.1.2" version = "0.1.2"

View File

@@ -46,6 +46,7 @@ members = [
"src/sql", "src/sql",
"src/storage", "src/storage",
"src/store-api", "src/store-api",
"src/flow",
"src/table", "src/table",
"src/table-procedure", "src/table-procedure",
"tests-integration", "tests-integration",

25
src/flow/Cargo.toml Normal file
View File

@@ -0,0 +1,25 @@
[package]
name = "flow"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# use version from crates.io for now to prevent version slewing
# disable default-features which include `abomonaion` which we don't need for IPC
# timely = {version = "0.12.0", default-features = false, features = ["bincode"]}
# differential-dataflow = "0.12.0"
# timely = "0.12.0"
# differential-dataflow = "0.12.0"
# TODO(discord9): fork later for fixed version git dependency
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = [
"bincode",
] }
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } #, rev = "99fa67db" }
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
serde = { version = "1.0", features = ["derive"] }
datatypes = { path = "../datatypes" }
common-telemetry = { path = "../common/telemetry" }

View File

@@ -0,0 +1,3 @@
//! for getting data from source and sending results to sink
//! and communicating with other parts of the database
//! also commands storage and computation layer

View File

@@ -0,0 +1,22 @@
use std::collections::BTreeMap;
use crate::expr::GlobalId;
/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
/// done between data ingress and egress.
pub struct ComputeState {
/// State kept for each installed compute collection.
///
/// Each collection has exactly one frontier.
/// How the frontier is communicated depends on the collection type:
/// * Frontiers of indexes are equal to the frontier of their corresponding traces in the
/// `TraceManager`.
/// * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
/// * Subscribes report their frontiers through the `subscribe_response_buffer`.
pub collections: BTreeMap<GlobalId, CollectionState>,
}
/// State maintained for a compute collection.
pub struct CollectionState {}

View File

@@ -0,0 +1,743 @@
use std::collections::BTreeMap;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::wrappers::enter::TraceEnter;
use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{Collection, Data};
use timely::communication::message::RefOrMut;
use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::operators::Capability;
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use super::plan::Plan;
use super::types::DataflowDescription;
use crate::compute::render::RenderTimestamp;
use crate::compute::typedefs::{TraceErrHandle, TraceRowHandle};
use crate::expr::{GlobalId, Id, MapFilterProject, ScalarExpr};
use crate::repr;
use crate::repr::{Diff, Row};
use crate::storage::errors::DataflowError;
// Local type definition to avoid the horror in signatures.
pub(crate) type KeyArrangement<S, K, V> =
Arranged<S, TraceRowHandle<K, V, <S as ScopeParent>::Timestamp, Diff>>;
pub(crate) type Arrangement<S, V> = KeyArrangement<S, V, V>;
pub(crate) type ErrArrangement<S> =
Arranged<S, TraceErrHandle<DataflowError, <S as ScopeParent>::Timestamp, Diff>>;
pub(crate) type ArrangementImport<S, V, T> = Arranged<
S,
TraceEnter<TraceFrontier<TraceRowHandle<V, V, T, Diff>>, <S as ScopeParent>::Timestamp>,
>;
pub(crate) type ErrArrangementImport<S, T> = Arranged<
S,
TraceEnter<
TraceFrontier<TraceErrHandle<DataflowError, T, Diff>>,
<S as ScopeParent>::Timestamp,
>,
>;
/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, V: Data, T = repr::Timestamp>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// A dataflow-local arrangement.
Local(Arrangement<S, V>, ErrArrangement<S>),
/// An imported trace from outside the dataflow.
///
/// The `GlobalId` identifier exists so that exports of this same trace
/// can refer back to and depend on the original instance.
Trace(
GlobalId,
ArrangementImport<S, V, T>,
ErrArrangementImport<S, T>,
),
}
impl<S: Scope, T> ArrangementFlavor<S, Row, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// Presents `self` as a stream of updates.
///
/// This method presents the contents as they are, without further computation.
/// If you have logic that could be applied to each record, consider using the
/// `flat_map` methods which allows this and can reduce the work done.
pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
match &self {
ArrangementFlavor::Local(oks, errs) => (
oks.as_collection(move |k: &Row, v: &Row| {
// type annotated because rust-analzyer can't infer the type due to being complex closures
// see https://github.com/rust-lang/rust-analyzer/issues/6338
let mut k = k.clone();
k.extend(v.clone().into_iter());
k
}),
errs.as_collection(|k, &()| k.clone()),
),
ArrangementFlavor::Trace(_, oks, errs) => (
oks.as_collection(move |k, v| {
let mut k = k.clone();
k.extend(v.clone().into_iter());
k
}),
errs.as_collection(|k, &()| k.clone()),
),
}
}
/// Constructs and applies logic to elements of `self` and returns the results.
///
/// `constructor` takes a permutation and produces the logic to apply on elements. The logic
/// conceptually receives `(&Row, &Row)` pairs in the form of a slice. Only after borrowing
/// the elements and applying the permutation the datums will be in the expected order.
///
/// If `key` is set, this is a promise that `logic` will produce no results on
/// records for which the key does not evaluate to the value. This is used to
/// leap directly to exactly those records.
pub fn flat_map<I, C, L>(
&self,
key: Option<Row>,
constructor: C,
) -> (
timely::dataflow::Stream<S, I::Item>,
Collection<S, DataflowError, Diff>,
)
where
I: IntoIterator,
I::Item: Data,
C: FnOnce() -> L,
L: for<'a, 'b> FnMut(&'a [&'b RefOrMut<'b, Row>], &'a S::Timestamp, &'a Diff) -> I
+ 'static,
{
// Set a number of tuples after which the operator should yield.
// This allows us to remain responsive even when enumerating a substantial
// arrangement, as well as provides time to accumulate our produced output.
let refuel = 1000000;
match &self {
ArrangementFlavor::Local(oks, errs) => {
let mut logic = constructor();
let oks = CollectionBundle::<S, Row, T>::flat_map_core(
oks,
key,
move |k, v, t, d| logic(&[&k, &v], t, d),
refuel,
);
let errs = errs.as_collection(|k, &()| k.clone());
(oks, errs)
}
ArrangementFlavor::Trace(_, oks, errs) => {
let mut logic = constructor();
let oks = CollectionBundle::<S, Row, T>::flat_map_core(
oks,
key,
move |k, v, t, d| logic(&[&k, &v], t, d),
refuel,
);
let errs = errs.as_collection(|k, &()| k.clone());
(oks, errs)
}
}
}
}
impl<S: Scope, V: Data, T> ArrangementFlavor<S, V, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
pub fn scope(&self) -> S {
match self {
ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
}
}
/// Brings the arrangement flavor into a region.
pub fn enter_region<'a>(
&self,
region: &Child<'a, S, S::Timestamp>,
) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, V, T> {
match self {
ArrangementFlavor::Local(oks, errs) => {
ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
}
ArrangementFlavor::Trace(gid, oks, errs) => {
ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
}
}
}
}
impl<'a, S: Scope, V: Data, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, V, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// Extracts the arrangement flavor from a region.
pub fn leave_region(&self) -> ArrangementFlavor<S, V, T> {
match self {
ArrangementFlavor::Local(oks, errs) => {
ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
}
ArrangementFlavor::Trace(gid, oks, errs) => {
ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
}
}
}
}
pub struct Context<S, V: Data, T = repr::Timestamp>
where
T: Timestamp + Lattice,
S: Scope,
S::Timestamp: Lattice + Refines<T>,
{
/// The scope within which all managed collections exist.
///
/// It is an error to add any collections not contained in this scope.
pub(crate) scope: S,
/// The debug name of the dataflow associated with this context.
pub debug_name: String,
/// The Timely ID of the dataflow associated with this context.
pub dataflow_id: usize,
/// Frontier before which updates should not be emitted.
///
/// We *must* apply it to sinks, to ensure correct outputs.
/// We *should* apply it to sources and imported traces, because it improves performance.
pub since_frontier: Antichain<T>,
/// Frontier after which updates should not be emitted.
/// Used to limit the amount of work done when appropriate.
pub until_frontier: Antichain<T>,
/// Bindings of identifiers to collections.
pub bindings: BTreeMap<Id, CollectionBundle<S, V, T>>,
}
impl<S: Scope, V: Data> Context<S, V>
where
S::Timestamp: Lattice + Refines<repr::Timestamp>,
{
/// TODO(discord9)" DataflowDesc & Plan & etc.
/// Creates a new empty Context from given dataflow
pub fn for_dataflow_in<Plan>(dataflow: &DataflowDescription<Plan, ()>, scope: S) -> Self {
let dataflow_id = scope.addr()[0];
let since_frontier = dataflow
.as_of
.clone()
.unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
// TODO(discord9)=: get since_frontier and until_frontier from dataflow_desc
Self {
scope,
debug_name: dataflow.debug_name.clone(),
dataflow_id,
since_frontier,
until_frontier: dataflow.until.clone(),
bindings: BTreeMap::new(),
}
}
}
impl<S: Scope, V: Data, T: Lattice> Context<S, V, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// Insert a collection bundle by an identifier.
///
/// This is expected to be used to install external collections (sources, indexes, other views),
/// as well as for `Let` bindings of local collections.
pub fn insert_id(
&mut self,
id: Id,
collection: CollectionBundle<S, V, T>,
) -> Option<CollectionBundle<S, V, T>> {
self.bindings.insert(id, collection)
}
/// Remove a collection bundle by an identifier.
///
/// The primary use of this method is uninstalling `Let` bindings.
pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, V, T>> {
self.bindings.remove(&id)
}
/// Melds a collection bundle to whatever exists.
#[allow(clippy::map_entry)]
pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, V, T>) {
if !self.bindings.contains_key(&id) {
self.bindings.insert(id, collection);
} else {
let binding = self
.bindings
.get_mut(&id)
.expect("Binding verified to exist");
if collection.collection.is_some() {
binding.collection = collection.collection;
}
for (key, flavor) in collection.arranged.into_iter() {
binding.arranged.insert(key, flavor);
}
}
}
/// Look up a collection bundle by an identifier.
pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, V, T>> {
self.bindings.get(&id).cloned()
}
}
type ResultCollection<S, V> = (Collection<S, V, Diff>, Collection<S, DataflowError, Diff>);
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<S, V, T = repr::Timestamp>
where
T: Timestamp + Lattice,
S: Scope,
S::Timestamp: Lattice + Refines<T>,
V: Data,
{
pub(crate) collection: Option<ResultCollection<S, V>>,
/// TODO(discord9): impl: 1. ScalarExpr(Could be from substrait), 2. Arrangement
pub(crate) arranged: BTreeMap<Vec<ScalarExpr>, ArrangementFlavor<S, V, T>>,
}
impl<S: Scope, V: Data, T: Lattice> CollectionBundle<S, V, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// Construct a new collection bundle from update streams.
pub fn from_collections(
oks: Collection<S, V, Diff>,
errs: Collection<S, DataflowError, Diff>,
) -> Self {
Self {
collection: Some((oks, errs)),
arranged: BTreeMap::default(),
}
}
/// Inserts arrangements by the expressions on which they are keyed.
pub fn from_expressions(
exprs: Vec<ScalarExpr>,
arrangements: ArrangementFlavor<S, V, T>,
) -> Self {
let mut arranged = BTreeMap::new();
arranged.insert(exprs, arrangements);
Self {
collection: None,
arranged,
}
}
/// Inserts arrangements by the columns on which they are keyed.
pub fn from_columns<I: IntoIterator<Item = usize>>(
columns: I,
arrangements: ArrangementFlavor<S, V, T>,
) -> Self {
let mut keys = Vec::new();
for column in columns {
keys.push(ScalarExpr::Column(column));
}
Self::from_expressions(keys, arrangements)
}
/// The scope containing the collection bundle.
pub fn scope(&self) -> S {
if let Some((oks, _errs)) = &self.collection {
oks.inner.scope()
} else {
self.arranged
.values()
.next()
.expect("Must contain a valid collection")
.scope()
}
}
/// Brings the collection bundle into a region.
pub fn enter_region<'a>(
&self,
region: &Child<'a, S, S::Timestamp>,
) -> CollectionBundle<Child<'a, S, S::Timestamp>, V, T> {
CollectionBundle {
collection: self
.collection
.as_ref()
.map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
arranged: self
.arranged
.iter()
.map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
.collect(),
}
}
}
impl<S, T> CollectionBundle<S, repr::Row, T>
where
T: timely::progress::Timestamp + Lattice,
S: Scope,
S::Timestamp: Refines<T> + Lattice + RenderTimestamp,
{
/// Presents `self` as a stream of updates, having been subjected to `mfp`.
///
/// This operator is able to apply the logic of `mfp` early, which can substantially
/// reduce the amount of data produced when `mfp` is non-trivial.
///
/// The `key_val` argument, when present, indicates that a specific arrangement should
/// be used, and if, in addition, the `val` component is present,
/// that we can seek to the supplied row.
pub fn as_collection_core(
&self,
mut mfp: MapFilterProject,
key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
until: Antichain<repr::Timestamp>,
) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
mfp.optimize();
let mfp_plan = mfp.into_plan().unwrap();
// If the MFP is trivial, we can just call `as_collection`.
// In the case that we weren't going to apply the `key_val` optimization,
// this path results in a slightly smaller and faster
// dataflow graph, and is intended to fix
let has_key_val = matches!(&key_val, Some((_key, Some(_val))));
if mfp_plan.is_identity() && !has_key_val {
let key = key_val.map(|(k, _v)| k);
return self.as_specific_collection(key.as_deref());
}
let (stream, errors) = self.flat_map(key_val, || {
let until = std::rc::Rc::new(until);
// this logic get executed every time a new row arrives
move |row_parts, time, diff| {
let until = std::rc::Rc::clone(&until);
let row_iters = row_parts
.iter()
.flat_map(|row| (**row).to_owned().into_iter());
let mut datums_local = Vec::new();
datums_local.extend(row_iters);
let time = time.clone();
let event_time: repr::Timestamp = *time.clone().event_time();
mfp_plan
.evaluate::<DataflowError, _>(
&mut datums_local,
event_time,
*diff,
move |time| !until.less_equal(time),
)
.map(move |x| match x {
Ok((row, event_time, diff)) => {
// Copy the whole time, and re-populate event time.
let mut time: S::Timestamp = time.clone();
*time.event_time() = event_time;
Ok((row, time, diff))
}
Err((e, event_time, diff)) => {
// Copy the whole time, and re-populate event time.
let mut time: S::Timestamp = time.clone();
*time.event_time() = event_time;
Err((e, time, diff))
}
})
}
});
use timely::dataflow::operators::ok_err::OkErr;
let (oks, errs) = stream.ok_err(|x| x);
use differential_dataflow::AsCollection;
let oks = oks.as_collection();
let errs = errs.as_collection();
(oks, errors.concat(&errs))
}
}
impl<'a, S: Scope, V: Data, T> CollectionBundle<Child<'a, S, S::Timestamp>, V, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// Extracts the collection bundle from a region.
pub fn leave_region(&self) -> CollectionBundle<S, V, T> {
CollectionBundle {
collection: self
.collection
.as_ref()
.map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
arranged: self
.arranged
.iter()
.map(|(key, bundle)| (key.clone(), bundle.leave_region()))
.collect(),
}
}
}
impl<S: Scope, T: Lattice> CollectionBundle<S, Row, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// Asserts that the arrangement for a specific key
/// (or the raw collection for no key) exists,
/// and returns the corresponding collection.
///
/// This returns the collection as-is, without
/// doing any unthinning transformation.
/// Therefore, it should be used when the appropriate transformation
/// was planned as part of a following MFP.
pub fn as_specific_collection(
&self,
key: Option<&[ScalarExpr]>,
) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
// Any operator that uses this method was told to use a particular
// collection during LIR planning, where we should have made
// sure that that collection exists.
//
// If it doesn't, we panic.
match key {
None => self
.collection
.clone()
.expect("The unarranged collection doesn't exist."),
Some(key) => self
.arranged
.get(key)
.unwrap_or_else(|| panic!("The collection arranged by {:?} doesn't exist.", key))
.as_collection(),
}
}
/// Constructs and applies logic to elements of a collection and returns the results.
///
/// `constructor` takes a permutation and produces the logic to apply on elements. The logic
/// conceptually receives `(&Row, &Row)` pairs in the form of a slice. Only after borrowing
/// the elements and applying the permutation the datums will be in the expected order.
///
/// If `key_val` is set, this is a promise that `logic` will produce no results on
/// records for which the key does not evaluate to the value. This is used when we
/// have an arrangement by that key to leap directly to exactly those records.
/// It is important that `logic` still guard against data that does not satisfy
/// this constraint, as this method does not statically know that it will have
/// that arrangement.
pub fn flat_map<I, C, L>(
&self,
key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
constructor: C,
) -> (
timely::dataflow::Stream<S, I::Item>,
Collection<S, DataflowError, Diff>,
)
where
I: IntoIterator,
I::Item: Data,
C: FnOnce() -> L,
L: for<'a, 'b> FnMut(&'a [&'b RefOrMut<'b, Row>], &'a S::Timestamp, &'a Diff) -> I
+ 'static,
{
// If `key_val` is set, we should have use the corresponding arrangement.
// If there isn't one, that implies an error in the contract between
// key-production and available arrangements.
if let Some((key, val)) = key_val {
let flavor = self
.arrangement(&key)
.expect("Should have ensured during planning that this arrangement exists.");
flavor.flat_map(val, constructor)
} else {
use timely::dataflow::operators::Map;
let (oks, errs) = self
.collection
.clone()
.expect("Invariant violated: CollectionBundle contains no collection.");
let mut logic = constructor();
(
oks.inner
.flat_map(move |(mut v, t, d)| logic(&[&RefOrMut::Mut(&mut v)], &t, &d)),
errs,
)
}
}
/// Factored out common logic for using literal keys in general traces.
///
/// This logic is sufficiently interesting that we want to write it only
/// once, and thereby avoid any skew in the two uses of the logic.
///
/// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
/// where key and value are rows.
fn flat_map_core<Tr, I, L>(
trace: &Arranged<S, Tr>,
key: Option<Row>,
mut logic: L,
refuel: usize,
) -> timely::dataflow::Stream<S, I::Item>
where
Tr: TraceReader<Key = Row, Val = Row, Time = S::Timestamp, R = repr::Diff>
+ Clone
+ 'static,
I: IntoIterator,
I::Item: Data,
L: for<'a, 'b> FnMut(
RefOrMut<'b, Row>,
RefOrMut<'b, Row>,
&'a S::Timestamp,
&'a repr::Diff,
) -> I
+ 'static,
{
let mode = if key.is_some() { "index" } else { "scan" };
let name = format!("ArrangementFlatMap({})", mode);
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
trace.stream.unary(Pipeline, &name, move |_, info| {
// Acquire an activator to reschedule the operator when it has unfinished work.
use timely::scheduling::Activator;
let activations = trace.stream.scope().activations();
let activator = Activator::new(&info.address[..], activations);
// Maintain a list of work to do, cursor to navigate and process.
let mut todo = std::collections::VecDeque::new();
move |input, output| {
// First, dequeue all batches.
input.for_each(|time, data| {
let capability = time.retain();
for batch in data.iter() {
// enqueue a capability, cursor, and batch.
todo.push_back(PendingWork::new(
capability.clone(),
batch.cursor(),
batch.clone(),
));
}
});
// Second, make progress on `todo`.
let mut fuel = refuel;
while !todo.is_empty() && fuel > 0 {
todo.front_mut()
.unwrap()
.do_work(&key, &mut logic, &mut fuel, output);
if fuel > 0 {
todo.pop_front();
}
}
// If we have not finished all work, re-activate the operator.
if !todo.is_empty() {
activator.activate();
}
}
})
}
/// Look up an arrangement by the expressions that form the key.
///
/// The result may be `None` if no such arrangement exists, or it may be one of many
/// "arrangement flavors" that represent the types of arranged data we might have.
pub fn arrangement(&self, key: &[ScalarExpr]) -> Option<ArrangementFlavor<S, Row, T>> {
self.arranged.get(key).cloned()
}
}
struct PendingWork<C>
where
C: Cursor,
C::Time: Timestamp,
{
capability: Capability<C::Time>,
cursor: C,
batch: C::Storage,
}
/// Handle specialized to `Vec`-based container.
type PendingOutputHandle<'a, C, I> = OutputHandle<
'a,
<C as Cursor>::Time,
<I as IntoIterator>::Item,
timely::dataflow::channels::pushers::Tee<<C as Cursor>::Time, <I as IntoIterator>::Item>,
>;
impl<C: Cursor> PendingWork<C>
where
C::Key: PartialEq,
C::Time: Timestamp,
{
/// Create a new bundle of pending work, from the capability, cursor, and backing storage.
fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
Self {
capability,
cursor,
batch,
}
}
/// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
fn do_work<I, L>(
&mut self,
key: &Option<C::Key>,
logic: &mut L,
fuel: &mut usize,
output: &mut PendingOutputHandle<'_, C, I>,
) where
I: IntoIterator,
I::Item: Data,
L: for<'a, 'b> FnMut(
RefOrMut<'b, C::Key>,
RefOrMut<'b, C::Val>,
&'a C::Time,
&'a C::R,
) -> I
+ 'static,
{
// Attempt to make progress on this batch.
let mut work: usize = 0;
let mut session = output.session(&self.capability);
if let Some(key) = key {
if self.cursor.get_key(&self.batch) != Some(key) {
self.cursor.seek_key(&self.batch, key);
}
if self.cursor.get_key(&self.batch) == Some(key) {
while let Some(val) = self.cursor.get_val(&self.batch) {
self.cursor.map_times(&self.batch, |time, diff| {
for datum in logic(RefOrMut::Ref(key), RefOrMut::Ref(val), time, diff) {
session.give(datum);
work += 1;
}
});
self.cursor.step_val(&self.batch);
if work >= *fuel {
*fuel = 0;
return;
}
}
}
} else {
while let Some(key) = self.cursor.get_key(&self.batch) {
while let Some(val) = self.cursor.get_val(&self.batch) {
self.cursor.map_times(&self.batch, |time, diff| {
for datum in logic(RefOrMut::Ref(key), RefOrMut::Ref(val), time, diff) {
session.give(datum);
work += 1;
}
});
self.cursor.step_val(&self.batch);
if work >= *fuel {
*fuel = 0;
return;
}
}
self.cursor.step_key(&self.batch);
}
}
*fuel -= work;
}
}

View File

@@ -0,0 +1,15 @@
//! for generate dataflow from logical plan and computing the dataflow
mod compute_state;
mod context;
mod plan;
mod render;
mod typedefs;
mod types;
pub use context::Context;
// TODO(discord9): make a simplified version of source/sink
// sink: simply get rows out of sinked collection/err collection and put it somewhere
// (R, T, D) row of course with since/until frontier to limit
// source: simply insert stuff into it

View File

@@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};
/// A delta query is implemented by a set of paths, one for each input.
///
/// Each delta query path responds to its input changes by repeated lookups
/// in arrangements for other join inputs. These lookups require specific
/// instructions about which expressions to use as keys. Along the way,
/// various closures are applied to filter and project as early as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DeltaJoinPlan {}

View File

@@ -0,0 +1,9 @@
use serde::{Deserialize, Serialize};
/// TODO(discord9): impl Join
/// A plan for the execution of a linear join.
///
/// A linear join is a sequence of stages, each of which introduces
/// a new collection. Each stage is represented by a [LinearStagePlan].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct LinearJoinPlan {}

View File

@@ -0,0 +1,15 @@
use serde::{Deserialize, Serialize};
mod delta_join;
mod linear_join;
pub use delta_join::DeltaJoinPlan;
pub use linear_join::LinearJoinPlan;
/// TODO(discord9)(discord9): impl Join
/// A complete enumeration of possible join plans to render.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum JoinPlan {
/// A join implemented by a linear join.
Linear(LinearJoinPlan),
/// A join implemented by a delta join.
Delta(DeltaJoinPlan),
}

View File

@@ -0,0 +1,222 @@
mod join;
mod reduce;
use std::collections::BTreeMap;
use join::JoinPlan;
pub(crate) use reduce::{
convert_indexes_to_skips, AccumulablePlan, BucketedPlan, KeyValPlan, ReducePlan,
};
use serde::{Deserialize, Serialize};
use crate::expr::{Id, LocalId, MapFilterProject, ScalarExpr, TableFunc};
use crate::repr::{self, Diff, Row};
use crate::storage::errors::EvalError;
/// The forms in which an operator's output is available;
/// it can be considered the plan-time equivalent of
/// `render::context::CollectionBundle`.
///
/// These forms are either "raw", representing an unarranged collection,
/// or "arranged", representing one that has been arranged by some key.
///
/// The raw collection, if it exists, may be consumed directly.
///
/// The arranged collections are slightly more complicated:
/// Each key here is attached to a description of how the corresponding
/// arrangement is permuted to remove value columns
/// that are redundant with key columns. Thus, the first element in each
/// tuple of `arranged` is the arrangement key; the second is the map of
/// logical output columns to columns in the key or value of the deduplicated
/// representation, and the third is a "thinning expression",
/// or list of columns to include in the value
/// when arranging.
///
/// For example, assume a 5-column collection is to be arranged by the key
/// `[Column(2), Column(0) + Column(3), Column(1)]`.
/// Then `Column(1)` and `Column(2)` in the value are redundant with the key, and
/// only columns 0, 3, and 4 need to be stored separately.
/// The thinning expression will then be `[0, 3, 4]`.
///
/// The permutation represents how to recover the
/// original values (logically `[Column(0), Column(1), Column(2), Column(3), Column(4)]`)
/// from the key and value of the arrangement, logically
/// `[Column(2), Column(0) + Column(3), Column(1), Column(0), Column(3), Column(4)]`.
/// Thus, the permutation in this case should be `{0: 3, 1: 2, 2: 0, 3: 4, 4: 5}`.
///
/// Note that this description, while true at the time of writing, is merely illustrative;
/// users of this struct should not rely on the exact strategy used for generating
/// the permutations. As long as clients apply the thinning expression
/// when creating arrangements, and permute by the hashmap when reading them,
/// the contract of the function where they are generated (`expr::permutation_for_arrangement`)
/// ensures that the correct values will be read.
#[derive(Default, Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct AvailableCollections {
/// Whether the collection exists in unarranged form.
pub raw: bool,
/// The set of arrangements of the collection, along with a
/// column permutation mapping
pub arranged: Vec<KeyWithColumnPermutation>,
}
pub type KeyWithColumnPermutation = (Vec<ScalarExpr>, BTreeMap<usize, usize>, Vec<usize>);
impl AvailableCollections {
/// Represent a collection that has no arrangements.
pub fn new_raw() -> Self {
Self {
raw: true,
arranged: vec![],
}
}
/// Represent a collection that is arranged in the
/// specified ways.
pub fn new_arranged(arranged: Vec<KeyWithColumnPermutation>) -> Self {
assert!(
!arranged.is_empty(),
"Invariant violated: at least one collection must exist"
);
Self {
raw: false,
arranged,
}
}
}
/// Rendering Plan
///
/// TODO(discord9): see if we ever need to support recursive plans
#[derive(Debug, Clone)]
pub enum Plan<T = repr::Timestamp> {
/// A collection containing a pre-determined collection.
Constant {
rows: Result<Vec<(Row, T, Diff)>, EvalError>,
},
/// A reference to a bound collection.
///
/// This is commonly either an external reference to an existing source or
/// maintained arrangement, or an internal reference to a `Let` identifier.
Get {
id: Id,
keys: AvailableCollections,
plan: GetPlan,
},
/// Binds `value` to `id`, and then results in `body` with that binding.
///
/// This stage has the effect of sharing `value` across multiple possible
/// uses in `body`, and is the only mechanism we have for sharing collection
/// information across parts of a dataflow.
///
/// The binding is not available outside of `body`.
Let {
/// The local identifier to be used, available to `body` as `Id::Local(id)`.
id: LocalId,
/// The collection that should be bound to `id`.
value: Box<Plan<T>>,
/// The collection that results, which is allowed to contain `Get` stages
/// that reference `Id::Local(id)`.
body: Box<Plan<T>>,
},
/// Map, Filter, and Project operators.
///
/// This stage contains work that we would ideally like to fuse to other plan
/// stages, but for practical reasons cannot. For example: reduce, threshold,
/// and topk stages are not able to absorb this operator.
Mfp {
/// The input collection.
input: Box<Plan<T>>,
/// Linear operator to apply to each record.
mfp: MapFilterProject,
/// Whether the input is from an arrangement, and if so,
/// whether we can seek to a specific value therein
input_key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
},
/// A variable number of output records for each input record.
///
/// This stage is a bit of a catch-all for logic that does not easily fit in
/// map stages. This includes table valued functions, but also functions of
/// multiple arguments, and functions that modify the sign of updates.
///
/// This stage allows a `MapFilterProject` operator to be fused to its output,
/// and this can be very important as otherwise the output of `func` is just
/// appended to the input record, for as many outputs as it has. This has the
/// unpleasant default behavior of repeating potentially large records that
/// are being unpacked, producing quadratic output in those cases. Instead,
/// in these cases use a `mfp` member that projects away these large fields.
FlatMap {
/// The input collection.
input: Box<Plan<T>>,
/// The variable-record emitting function.
func: TableFunc,
/// Expressions that for each row prepare the arguments to `func`.
exprs: Vec<ScalarExpr>,
/// Linear operator to apply to each record produced by `func`.
mfp: MapFilterProject,
/// The particular arrangement of the input we expect to use,
/// if any
input_key: Option<Vec<ScalarExpr>>,
},
/// A multiway relational equijoin, with fused map, filter, and projection.
///
/// This stage performs a multiway join among `inputs`, using the equality
/// constraints expressed in `plan`. The plan also describes the implementation
/// strategy we will use, and any pushed down per-record work.
Join {
/// An ordered list of inputs that will be joined.
inputs: Vec<Plan<T>>,
/// Detailed information about the implementation of the join.
///
/// This includes information about the implementation strategy, but also
/// any map, filter, project work that we might follow the join with, but
/// potentially pushed down into the implementation of the join.
plan: JoinPlan,
},
/// Aggregation by key.
Reduce {
/// The input collection.
input: Box<Plan<T>>,
/// A plan for changing input records into key, value pairs.
key_val_plan: KeyValPlan,
/// A plan for performing the reduce.
///
/// The implementation of reduction has several different strategies based
/// on the properties of the reduction, and the input itself. Please check
/// out the documentation for this type for more detail.
plan: ReducePlan,
/// The particular arrangement of the input we expect to use,
/// if any
input_key: Option<Vec<ScalarExpr>>,
},
}
/// TODO(discord9): impl GetPlan
#[derive(Debug, Clone)]
pub enum GetPlan {
/// Simply pass input arrangements on to the next stage.
PassArrangements,
/// Using the supplied key, optionally seek the row, and apply the MFP.
Arrangement(Vec<ScalarExpr>, Option<Row>, MapFilterProject),
/// Scan the input collection (unarranged) and apply the MFP.
Collection(MapFilterProject),
}
/// Returns bucket sizes, descending, suitable for hierarchical decomposition of an operator, based
/// on the expected number of rows that will have the same group key.
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
let mut buckets = vec![];
let mut current = 16;
// Plan for 4B records in the expected case if the user didn't specify a group size.
let limit = expected_group_size.unwrap_or(4_000_000_000);
// Distribute buckets in powers of 16, so that we can strike a balance between how many inputs
// each layer gets from the preceding layer, while also limiting the number of layers.
while current < limit {
buckets.push(current);
current = current.saturating_mul(16);
}
buckets.reverse();
buckets
}

View File

@@ -0,0 +1,233 @@
use serde::{Deserialize, Serialize};
use crate::expr::{AggregateExpr, AggregateFunc, MapFilterProject, SafeMfpPlan};
/// This enum represents the three potential types of aggregations.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ReductionType {
/// Accumulable functions can be subtracted from (are invertible), and associative.
/// We can compute these results by moving some data to the diff field under arbitrary
/// changes to inputs. Examples include sum or count.
Accumulable,
/// Hierarchical functions are associative, which means we can split up the work of
/// computing them across subsets. Note that hierarchical reductions should also
/// reduce the data in some way, as otherwise rendering them hierarchically is not
/// worth it. Examples include min or max.
Hierarchical,
/// Basic, for lack of a better word, are functions that are neither accumulable
/// nor hierarchical. Examples include jsonb_agg.
Basic,
}
/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct KeyValPlan {
/// Extracts the columns used as the key.
pub key_plan: SafeMfpPlan,
/// Extracts the columns used to feed the aggregations.
pub val_plan: SafeMfpPlan,
}
/// Transforms a vector containing indexes of needed columns into one containing
/// the "skips" an iterator over a Row would need to perform to see those values.
///
/// This function requires that all of the elements in `indexes` are strictly
/// increasing.
///
/// # Examples
///
/// ```
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
/// ```
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
for i in 1..indexes.len() {
assert!(
indexes[i - 1] < indexes[i],
"convert_indexes_to_skip needs indexes to be strictly increasing. Received: {:?}",
indexes,
);
}
for i in (1..indexes.len()).rev() {
indexes[i] -= indexes[i - 1];
indexes[i] -= 1;
}
indexes
}
/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case its just a `Distinct` and otherwise
/// it's composed of a combination of accumulable, hierarchical and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// be as simple (and compiler verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ReducePlan {
/// Plan for not computing any aggregations, just determining the set of
/// distinct keys.
Distinct,
/// Plan for computing only accumulable aggregations.
Accumulable(AccumulablePlan),
/// Plan for computing only hierarchical aggregations.
Hierarchical(HierarchicalPlan),
/// Plan for computing only basic aggregations.
Basic(BasicPlan),
/// Plan for computing a mix of different kinds of aggregations.
/// We need to do extra work here to reassemble results back in the
/// requested order.
Collation(CollationPlan),
}
/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values. We need
/// to apply a distinct operator to those before we
/// combine them with everything else.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct AccumulablePlan {
/// All of the aggregations we were asked to compute, stored
/// in order.
pub full_aggrs: Vec<AggregateExpr>,
/// All of the non-distinct accumulable aggregates.
/// Each element represents:
/// (index of the aggregation among accumulable aggregations,
/// index of the datum among inputs, aggregation expr)
/// These will all be rendered together in one dataflow fragment.
pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
/// Same as above but for all of the `DISTINCT` accumulable aggregations.
pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}
// TODO(discord9): others
/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum HierarchicalPlan {
/// Plan hierarchical aggregations under monotonic inputs.
Monotonic(MonotonicPlan),
/// Plan for hierarchical aggregations under non-monotonic inputs.
Bucketed(BucketedPlan),
}
/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct MonotonicPlan {
/// All of the aggregations we were asked to compute.
pub aggr_funcs: Vec<AggregateFunc>,
/// Set of "skips" or calls to `nth()` an iterator needs to do over
/// the input to extract the relevant datums.
pub skips: Vec<usize>,
/// True if the input is logically but not physically monotonic,
/// and the operator must first consolidate the inputs to remove
/// potential negations.
pub must_consolidate: bool,
}
/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct BucketedPlan {
/// All of the aggregations we were asked to compute.
pub aggr_funcs: Vec<AggregateFunc>,
/// Set of "skips" or calls to `nth()` an iterator needs to do over
/// the input to extract the relevant datums.
pub skips: Vec<usize>,
/// The number of buckets in each layer of the reduction tree. Should
/// be decreasing, and ideally, a power of two so that we can easily
/// distribute values to buckets with `value.hashed() % buckets[layer]`.
pub buckets: Vec<u64>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum BasicPlan {}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CollationPlan {}
/// Determines whether a function can be accumulated in an update's "difference" field,
/// and whether it can be subjected to recursive (hierarchical) aggregation.
///
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
/// which can be accumulated in-place using the addition operation on the type. Aggregations
/// that indicate they are accumulable will still need to provide an action that takes their
/// data and introduces it as a difference, and the post-processing when the accumulated value
/// is presented as data.
///
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
/// increasingly large subsets of each key. This has the intended property that no invocation
/// is on a significantly large set of values (and so, no incremental update needs to reform
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
/// input stream is append-only as then we only need to retain the "currently winning" value.
/// Every hierarchical aggregate needs to supply a corresponding ReductionMonoid implementation.
fn reduction_type(func: &AggregateFunc) -> ReductionType {
match func {
AggregateFunc::SumInt16
| AggregateFunc::SumInt32
| AggregateFunc::SumInt64
| AggregateFunc::SumUInt16
| AggregateFunc::SumUInt32
| AggregateFunc::SumUInt64
| AggregateFunc::SumFloat32
| AggregateFunc::SumFloat64
| AggregateFunc::Count
| AggregateFunc::Any
| AggregateFunc::All => ReductionType::Accumulable,
AggregateFunc::MaxInt16
| AggregateFunc::MaxInt32
| AggregateFunc::MaxInt64
| AggregateFunc::MaxUInt16
| AggregateFunc::MaxUInt32
| AggregateFunc::MaxUInt64
| AggregateFunc::MaxFloat32
| AggregateFunc::MaxFloat64
| AggregateFunc::MaxBool
| AggregateFunc::MaxString
| AggregateFunc::MaxDate
| AggregateFunc::MaxTimestamp
| AggregateFunc::MaxTimestampTz
| AggregateFunc::MinInt16
| AggregateFunc::MinInt32
| AggregateFunc::MinInt64
| AggregateFunc::MinUInt16
| AggregateFunc::MinUInt32
| AggregateFunc::MinUInt64
| AggregateFunc::MinFloat32
| AggregateFunc::MinFloat64
| AggregateFunc::MinBool
| AggregateFunc::MinString
| AggregateFunc::MinDate
| AggregateFunc::MinTimestamp
| AggregateFunc::MinTimestampTz => ReductionType::Hierarchical,
_ => ReductionType::Basic,
}
}

View File

@@ -0,0 +1,60 @@
use std::hash::Hash;
use differential_dataflow::ExchangeData;
use crate::repr::Row;
/// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`,
/// specialized for use in compute. Validation code will only run when the error constructor is
/// Some.
pub(super) trait MaybeValidatingRow<T, E>: ExchangeData + Hash {
fn ok(t: T) -> Self;
fn into_error() -> Option<fn(E) -> Self>;
}
impl<E> MaybeValidatingRow<Row, E> for Row {
fn ok(t: Row) -> Self {
t
}
fn into_error() -> Option<fn(E) -> Self> {
None
}
}
impl<E> MaybeValidatingRow<(), E> for () {
fn ok(t: ()) -> Self {
t
}
fn into_error() -> Option<fn(E) -> Self> {
None
}
}
impl<E, R> MaybeValidatingRow<Vec<R>, E> for Vec<R>
where
R: ExchangeData + Hash,
{
fn ok(t: Vec<R>) -> Self {
t
}
fn into_error() -> Option<fn(E) -> Self> {
None
}
}
impl<T, E> MaybeValidatingRow<T, E> for Result<T, E>
where
T: ExchangeData + Hash,
E: ExchangeData + Hash,
{
fn ok(row: T) -> Self {
Ok(row)
}
fn into_error() -> Option<fn(E) -> Self> {
Some(Err)
}
}

View File

@@ -0,0 +1,626 @@
//! for building the flow graph from PLAN
//! this is basically the last step before actually running the flow graph
use differential_dataflow::lattice::Lattice;
use differential_dataflow::AsCollection;
use timely::communication::Allocate;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::operators::{Capture, ToStream};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use timely::progress::Timestamp;
use timely::worker::Worker as TimelyWorker;
use super::types::DataflowDescription;
use crate::compute::compute_state::ComputeState;
use crate::compute::context::CollectionBundle;
use crate::compute::plan::Plan;
use crate::compute::types::BuildDesc;
use crate::compute::Context;
use crate::expr::Id;
use crate::repr::{self, Row};
use crate::storage::errors::DataflowError;
mod error;
mod reduce;
/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
///
/// This method imports sources from provided assets, and then builds the remaining
/// dataflow using "compute-local" assets like shared arrangements, and producing
/// both arrangements and sinks.
pub fn build_compute_dataflow<A: Allocate>(
timely_worker: &mut TimelyWorker<A>,
compute_state: &mut ComputeState,
dataflow: DataflowDescription<Plan, ()>,
) {
todo!()
}
pub trait RenderTimestamp: Timestamp + Lattice + Refines<repr::Timestamp> {
/// The system timestamp component of the timestamp.
///
/// This is useful for manipulating the system time, as when delaying
/// updates for subsequent cancellation, as with monotonic reduction.
fn system_time(&mut self) -> &mut repr::Timestamp;
/// Effects a system delay in terms of the timestamp summary.
fn system_delay(delay: repr::Timestamp) -> <Self as Timestamp>::Summary;
/// The event timestamp component of the timestamp.
fn event_time(&mut self) -> &mut repr::Timestamp;
/// Effects an event delay in terms of the timestamp summary.
fn event_delay(delay: repr::Timestamp) -> <Self as Timestamp>::Summary;
/// Steps the timestamp back so that logical compaction to the output will
/// not conflate `self` with any historical times.
fn step_back(&self) -> Self;
}
impl RenderTimestamp for repr::Timestamp {
fn system_time(&mut self) -> &mut repr::Timestamp {
self
}
fn system_delay(delay: repr::Timestamp) -> <Self as Timestamp>::Summary {
delay
}
fn event_time(&mut self) -> &mut repr::Timestamp {
self
}
fn event_delay(delay: repr::Timestamp) -> <Self as Timestamp>::Summary {
delay
}
fn step_back(&self) -> Self {
self.saturating_sub(1)
}
}
// This implementation block allows child timestamps to vary from parent timestamps.
impl<G> Context<G, Row>
where
G: Scope,
G::Timestamp: RenderTimestamp,
{
/// render plan and insert into context with given GlobalId
pub(crate) fn build_object(&mut self, object: BuildDesc<Plan>) {
// First, transform the relation expression into a render plan.
let bundle = self.render_plan(object.plan);
self.insert_id(Id::Global(object.id), bundle);
}
}
impl<S> Context<S, Row>
where
S: Scope,
S::Timestamp: RenderTimestamp,
{
/// Renders a plan to a differential dataflow, producing the collection of results.
///
/// The return type reflects the uncertainty about the data representation, perhaps
/// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
pub fn render_plan(&mut self, plan: Plan) -> CollectionBundle<S, Row> {
match plan {
Plan::Constant { rows } => {
let (rows, errs) = match rows {
Ok(rows) => (rows, Vec::new()),
Err(err) => (Vec::new(), vec![err]),
};
let since_frontier = self.since_frontier.clone();
let until = self.until_frontier.clone();
let ok_collection = rows
.into_iter()
.filter_map(move |(row, mut time, diff)| {
time.advance_by(since_frontier.borrow());
if !until.less_equal(&time) {
Some((
row,
<S::Timestamp as Refines<repr::Timestamp>>::to_inner(time),
diff,
))
} else {
None
}
})
.to_stream(&mut self.scope)
.as_collection();
let mut error_time: repr::Timestamp = Timestamp::minimum();
error_time.advance_by(self.since_frontier.borrow());
let err_collection = errs
.into_iter()
.map(move |e| {
(
DataflowError::from(e),
<S::Timestamp as Refines<repr::Timestamp>>::to_inner(error_time),
1,
)
})
.to_stream(&mut self.scope)
.as_collection();
CollectionBundle::from_collections(ok_collection, err_collection)
}
Plan::Get { id, keys, plan } => {
// Recover the collection from `self` and then apply `mfp` to it.
// If `mfp` happens to be trivial, we can just return the collection.
let mut collection = self
.lookup_id(id)
.unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
match plan {
crate::compute::plan::GetPlan::PassArrangements => {
// Assert that each of `keys` are present in `collection`.
if !keys
.arranged
.iter()
.all(|(key, _, _)| collection.arranged.contains_key(key))
{
let not_included: Vec<_> = keys
.arranged
.iter()
.filter(|(key, _, _)| !collection.arranged.contains_key(key))
.map(|(key, _, _)| key)
.collect();
panic!(
"Those keys {:?} is not included in collections keys:{:?}",
not_included,
collection.arranged.keys().cloned().collect::<Vec<_>>()
);
}
assert!(keys.raw <= collection.collection.is_some());
// Retain only those keys we want to import.
collection.arranged.retain(|key, _val| {
keys.arranged.iter().any(|(key2, _, _)| key2 == key)
});
collection
}
crate::compute::plan::GetPlan::Arrangement(key, row, mfp) => {
let (oks, errs) = collection.as_collection_core(
mfp,
Some((key, row)),
self.until_frontier.clone(),
);
CollectionBundle::from_collections(oks, errs)
}
crate::compute::plan::GetPlan::Collection(mfp) => {
let (oks, errs) =
collection.as_collection_core(mfp, None, self.until_frontier.clone());
CollectionBundle::from_collections(oks, errs)
}
}
}
Plan::Let { id, value, body } => {
// Render `value` and bind it to `id`. Complain if this shadows an id.
let value = self.render_plan(*value);
let prebound = self.insert_id(Id::Local(id), value);
assert!(prebound.is_none());
let body = self.render_plan(*body);
self.remove_id(Id::Local(id));
body
}
Plan::Mfp {
input,
mfp,
input_key_val,
} => {
let input = self.render_plan(*input);
// If `mfp` is non-trivial, we should apply it and produce a collection.
if mfp.is_identity() {
input
} else {
let (oks, errs) =
input.as_collection_core(mfp, input_key_val, self.until_frontier.clone());
CollectionBundle::from_collections(oks, errs)
}
}
Plan::Reduce {
input,
key_val_plan,
plan,
input_key,
} => {
let input = self.render_plan(*input);
self.render_reduce(input, key_val_plan, plan, input_key)
}
_ => todo!("To be implemented"),
}
}
}
#[cfg(test)]
mod test {
use std::any::Any;
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::Collection;
use timely::dataflow::scopes::Child;
use timely::dataflow::Stream;
use timely::Config;
use super::*;
use crate::compute::plan::{
AccumulablePlan, AvailableCollections, GetPlan, KeyValPlan, ReducePlan,
};
use crate::expr::{
AggregateExpr, BinaryFunc, GlobalId, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
UnaryFunc,
};
use crate::repr::Diff;
type OkStream<G> = Stream<G, (Row, repr::Timestamp, Diff)>;
type ErrStream<G> = Stream<G, (DataflowError, repr::Timestamp, Diff)>;
type OkCollection<G> = Collection<G, Row, Diff>;
type ErrCollection<G> = Collection<G, DataflowError, Diff>;
/// used as a token to prevent certain resources from being dropped
type AnyToken = Rc<dyn Any>;
struct MockSourceToken {
handle: InputSession<repr::Timestamp, Row, Diff>,
err_handle: InputSession<repr::Timestamp, DataflowError, Diff>,
}
fn mock_input_session(input: &mut InputSession<repr::Timestamp, Row, Diff>, cnt: i64) {
// TODO: mock a cpu usage monotonic input with timestamp
// cpu, mem, ts
// f32, f32, DateTime
let schema = [
ConcreteDataType::float32_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::datetime_datatype(),
];
let arrs = (0..cnt).map(|i| (i as f32 / cnt as f32, i as f32 / cnt as f32, i));
// need more mechanism to make timestamp also timestamp here
for (cpu, mem, ts) in arrs {
input.update(
Row::pack(vec![cpu.into(), mem.into(), Value::DateTime(ts.into())]),
1,
);
input.advance_to(ts as u64)
}
input.flush();
}
// a simple test to see if the dataflow can be built and run
fn exec_dataflow(
input_id: Vec<Id>,
dataflow: DataflowDescription<Plan>,
sink_ids: Vec<GlobalId>,
output_keys: Vec<Option<Vec<ScalarExpr>>>,
input_mock_length: i64,
) {
timely::execute(Config::thread(), move |worker| {
println!("worker: {:?}", worker.index());
let mut input = InputSession::<repr::Timestamp, Row, Diff>::new();
worker.dataflow_named(
"ProofOfConcept",
|scope: &mut Child<'_, _, repr::Timestamp>| {
let mut test_ctx =
Context::<_, Row, _>::for_dataflow_in(&dataflow, scope.clone());
let ok_collection = input.to_collection(scope);
let (err_handle, err_collection) = scope.new_collection();
let input_collection =
CollectionBundle::<_, _, repr::Timestamp>::from_collections(
ok_collection,
err_collection,
);
// TODO: generate `import_sources` from `dataflow.source_imports`
let import_sources: Vec<_> = input_id
.clone()
.into_iter()
.zip(vec![input_collection])
.collect();
// import sources
for (id, collection) in import_sources {
test_ctx.insert_id(id, collection);
}
for build_desc in &dataflow.objects_to_build {
test_ctx.build_object(build_desc.clone());
}
dbg!(test_ctx.bindings.keys());
// TODO: export sinks
for (sink, output_key) in sink_ids.iter().zip(output_keys.iter()) {
let sink = *sink;
println!("Inspecting sink {:?}", sink.clone());
let inspect = test_ctx.lookup_id(Id::Global(sink)).unwrap();
dbg!(inspect.collection.is_some());
dbg!(inspect.arranged.keys());
let inspect = inspect.as_specific_collection(output_key.as_deref());
inspect
.0
.inspect(move |x| println!("inspect {:?} {:?}", sink.clone(), x));
}
},
);
mock_input_session(&mut input, input_mock_length);
})
.expect("Computation terminated abnormally");
}
#[test]
fn test_simple_poc_reduce_group_by() {
// 1. build dataflow with input collection connected
// 2. give input
// type annotation is needed to prevent rust-analyzer to give up type deduction
// simple give dataflow information
// will be build by given dataflow information from other nodes later
// key is the third column
let place_holder =
ScalarExpr::Literal(Ok(Value::Boolean(true)), ConcreteDataType::int64_datatype());
let count_col = |i: usize| AggregateExpr {
func: crate::expr::AggregateFunc::Count,
expr: ScalarExpr::Column(i),
distinct: false,
};
let sum_col = |i: usize| AggregateExpr {
func: crate::expr::AggregateFunc::SumFloat32,
expr: ScalarExpr::Column(i),
distinct: false,
};
// equal to `SELECT minute, SUM(cpu) FROM input GROUP BY ts/300 as minute;
// cpu, mem, ts
// --map--> cpu, mem, ts/300
// --reduce--> ts/300, AVG(cpu), AVG(mem)
let cast_datetime = ScalarExpr::CallUnary {
func: UnaryFunc::CastDatetimeToInt64,
expr: Box::new(ScalarExpr::Column(2)),
};
let ts_div_5 = ScalarExpr::CallBinary {
func: BinaryFunc::DivInt64,
expr1: Box::new(cast_datetime),
expr2: Box::new(ScalarExpr::Literal(
Ok(Value::Int64(5.into())),
ConcreteDataType::int64_datatype(),
)),
};
let cast_int64_to_float32 = |i: usize| ScalarExpr::CallUnary {
func: UnaryFunc::CastInt64ToFloat32,
expr: Box::new(ScalarExpr::Column(i)),
};
let reduce_group_by_window = vec![
// cpu, mem, ts
// --reduce--> ts/300, SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem)
// -- map --> ts/300, AVG(cpu), AVG(mem)
BuildDesc {
id: GlobalId::User(0),
plan: Plan::Reduce {
input: Box::new(Plan::Get {
id: Id::Global(GlobalId::System(0)),
keys: AvailableCollections::new_raw(),
plan: GetPlan::Collection(
MapFilterProject::new(3).map([ts_div_5]).project([0, 1, 3]),
),
}),
key_val_plan: KeyValPlan {
key_plan: SafeMfpPlan {
mfp: MapFilterProject::new(3).project([2]),
},
val_plan: SafeMfpPlan {
mfp: MapFilterProject::new(3).project([0, 1]),
},
},
// --reduce--> ts/300(key), SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem)
plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![sum_col(0), sum_col(1), count_col(0), count_col(1)],
simple_aggrs: vec![
(0, 0, sum_col(0)),
(1, 1, sum_col(1)),
(2, 0, count_col(0)),
(3, 1, count_col(1)),
],
distinct_aggrs: vec![],
}),
input_key: None,
},
},
// 0 1 2 3 4
// ts/300(key), SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem),
// -- map --> AVG(cpu), AVG(mem), ts/300
BuildDesc {
id: GlobalId::User(1),
plan: Plan::Get {
id: Id::Global(GlobalId::User(0)),
// not used since plan is GetPlan::Arrangement
keys: AvailableCollections::new_raw(),
plan: GetPlan::Arrangement(
vec![ScalarExpr::Column(0)],
None,
MapFilterProject::new(5)
.map([
ScalarExpr::CallBinary {
func: BinaryFunc::DivFloat32,
expr1: Box::new(ScalarExpr::Column(1)),
expr2: Box::new(cast_int64_to_float32(3)),
},
ScalarExpr::CallBinary {
func: BinaryFunc::DivFloat32,
expr1: Box::new(ScalarExpr::Column(2)),
expr2: Box::new(cast_int64_to_float32(4)),
},
])
.project([0, 5, 6]),
),
},
},
];
let input_id = vec![Id::Global(GlobalId::System(0))];
let dataflow = {
let mut dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
dataflow.objects_to_build = reduce_group_by_window;
dataflow
};
let sink_ids = [GlobalId::User(0), GlobalId::User(1)];
exec_dataflow(
input_id.clone(),
dataflow.clone(),
sink_ids.to_vec(),
vec![Some(vec![ScalarExpr::Column(0)]), None],
10,
);
}
#[test]
fn test_simple_poc_reduce_count() {
// 1. build dataflow with input collection connected
// 2. give input
// type annotation is needed to prevent rust-analyzer to give up type deduction
// simple give dataflow information
// will be build by given dataflow information from other nodes later
// key is the third column
let place_holder =
ScalarExpr::Literal(Ok(Value::Boolean(true)), ConcreteDataType::int64_datatype());
let key_plan = SafeMfpPlan {
mfp: MapFilterProject::new(3)
.map([place_holder.clone()])
.project([3]),
};
let val_plan = SafeMfpPlan {
mfp: MapFilterProject::new(3).project([0, 1, 2]),
};
let count = AggregateExpr {
func: crate::expr::AggregateFunc::Count,
expr: place_holder,
distinct: false,
};
// equal to `SELECT COUNT(*) FROM input;`
let reduce_group_by_window = vec![
// count(true)
BuildDesc {
id: GlobalId::User(0),
plan: Plan::Reduce {
input: Box::new(Plan::Get {
id: Id::Global(GlobalId::System(0)),
keys: AvailableCollections::new_raw(),
plan: GetPlan::Collection(MapFilterProject::new(3)),
}),
key_val_plan: KeyValPlan { key_plan, val_plan },
plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![count.clone()],
simple_aggrs: vec![(0, 0, count)],
distinct_aggrs: vec![],
}),
input_key: None,
},
},
// get second column
BuildDesc {
id: GlobalId::User(1),
plan: Plan::Get {
id: Id::Global(GlobalId::User(0)),
// not used since plan is GetPlan::Arrangement
keys: AvailableCollections::new_raw(),
plan: GetPlan::Arrangement(
vec![ScalarExpr::Column(0)],
None,
MapFilterProject::new(2).project([1]),
),
},
},
];
let input_id = vec![Id::Global(GlobalId::System(0))];
let dataflow = {
let mut dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
dataflow.objects_to_build = reduce_group_by_window;
dataflow
};
let sink_ids = [GlobalId::User(1)];
exec_dataflow(
input_id.clone(),
dataflow.clone(),
sink_ids.to_vec(),
vec![None],
10,
);
}
#[test]
fn test_simple_poc_reduce_distinct() {
// 1. build dataflow with input collection connected
// 2. give input
// type annotation is needed to prevent rust-analyzer to give up type deduction
// simple give dataflow information
// will be build by given dataflow information from other nodes later
// window need date_trunc which is still WIP
// key is the third column
let key_plan = SafeMfpPlan {
mfp: MapFilterProject::new(3).project([2]),
};
let val_plan = SafeMfpPlan {
mfp: MapFilterProject::new(3).project([0, 1]),
};
// equal to `SELECT ts, COUNT(*) FROM input GROUP BY ts;`
let reduce_plan = vec![BuildDesc {
id: GlobalId::User(0),
plan: Plan::Reduce {
input: Box::new(Plan::Get {
id: Id::Global(GlobalId::System(0)),
keys: AvailableCollections::new_raw(),
plan: GetPlan::Collection(MapFilterProject::new(3)),
}),
key_val_plan: KeyValPlan { key_plan, val_plan },
plan: ReducePlan::Distinct,
input_key: None,
},
}];
let input_id = vec![Id::Global(GlobalId::System(0))];
let dataflow = {
let mut dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
dataflow.objects_to_build = reduce_plan;
dataflow
};
let sink_ids = [GlobalId::User(0)];
exec_dataflow(
input_id.clone(),
dataflow.clone(),
sink_ids.to_vec(),
vec![Some(vec![ScalarExpr::Column(0)])],
10,
);
}
#[test]
#[allow(clippy::print_stdout)]
fn test_constant_plan_render() {
let build_descs = vec![BuildDesc {
id: GlobalId::User(0),
plan: Plan::Constant {
rows: Ok(vec![(Row::default(), 0, 1)]),
},
}];
let dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
timely::execute_from_args(std::iter::empty::<String>(), move |worker| {
println!("worker: {:?}", worker.index());
let mut input = InputSession::<repr::Timestamp, Row, Diff>::new();
worker.dataflow(|scope: &mut Child<'_, _, repr::Timestamp>| {
let mut test_ctx = Context::<_, Row, _>::for_dataflow_in(&dataflow, scope.clone());
for build_desc in &build_descs {
test_ctx.build_object(build_desc.clone());
}
let input_collection = input.to_collection(scope);
let err_collection = InputSession::new().to_collection(scope);
let input_collection =
CollectionBundle::from_collections(input_collection, err_collection);
// insert collection
test_ctx.insert_id(Id::Local(LocalId(0)), input_collection);
let inspect = test_ctx
.lookup_id(Id::Global(GlobalId::User(0)))
.unwrap()
.as_specific_collection(None);
inspect.0.inspect(|x| println!("inspect {:?}", x));
});
// input.insert(Row::default());
input.update(Row::default(), 1);
input.advance_to(1);
})
.expect("Computation terminated abnormally");
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,20 @@
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
use crate::repr::{Diff, Row, Timestamp};
use crate::storage::errors::DataflowError;
// TODO(discord9): consider use ColValSpine for columnation storage
/// T: Time, R: Diff, O: Offset
pub type RowSpine<K, V, T, R, O = usize> = OrdValSpine<K, V, T, R, O>;
/// T: Time, R: Diff, O: Offset
pub type RowKeySpine<K, T, R, O = usize> = OrdKeySpine<K, T, R, O>;
/// T: Time, R: Diff, O: Offset
pub type ErrSpine<K, T, R, O = usize> = OrdKeySpine<K, T, R, O>;
/// T: Time, R: Diff, O: Offset
pub type ErrValSpine<K, T, R, O = usize> = OrdValSpine<K, DataflowError, T, R, O>;
pub type TraceRowHandle<K, V, T, R> = TraceAgent<RowSpine<K, V, T, R>>;
pub type TraceErrHandle<K, T, R> = TraceAgent<ErrSpine<K, T, R>>;
pub type KeysValsHandle = TraceRowHandle<Row, Row, Timestamp, Diff>;
pub type ErrsHandle = TraceErrHandle<DataflowError, Timestamp, Diff>;

View File

@@ -0,0 +1,75 @@
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use crate::compute::plan::Plan;
use crate::compute::types::sinks::ComputeSinkDesc;
use crate::compute::types::sources::SourceInstanceDesc;
use crate::expr::{GlobalId, ScalarExpr};
use crate::repr::{self, RelationType};
/// A description of a dataflow to construct and results to surface.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataflowDescription<P, S: 'static = (), T = repr::Timestamp> {
/// Sources instantiations made available to the dataflow pair with monotonicity information.
pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool)>,
/// Indexes made available to the dataflow.
/// (id of new index, description of index, relationtype of base source/view, monotonic)
pub index_imports: BTreeMap<GlobalId, (IndexDesc, RelationType, bool)>,
/// Views and indexes to be built and stored in the local context.
/// Objects must be built in the specific order, as there may be
/// dependencies of later objects on prior identifiers.
pub objects_to_build: Vec<BuildDesc<P>>,
/// Indexes to be made available to be shared with other dataflows
/// (id of new index, description of index, relationtype of base source/view)
pub index_exports: BTreeMap<GlobalId, (IndexDesc, RelationType)>,
/// sinks to be created
/// (id of new sink, description of sink)
pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
/// An optional frontier to which inputs should be advanced.
///
/// If this is set, it should override the default setting determined by
/// the upper bound of `since` frontiers contributing to the dataflow.
/// It is an error for this to be set to a frontier not beyond that default.
pub as_of: Option<Antichain<T>>,
/// Frontier beyond which the dataflow should not execute.
/// Specifically, updates at times greater or equal to this frontier are suppressed.
/// This is often set to `as_of + 1` to enable "batch" computations.
pub until: Antichain<T>,
/// Human readable name
pub debug_name: String,
}
impl<P, T> DataflowDescription<P, (), T> {
/// Creates a new dataflow description with a human-readable name.
pub fn new(name: String) -> Self {
Self {
source_imports: Default::default(),
index_imports: Default::default(),
objects_to_build: Vec::new(),
index_exports: Default::default(),
sink_exports: Default::default(),
as_of: Default::default(),
until: Antichain::new(),
debug_name: name,
}
}
}
/// An association of a global identifier to an expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P = Plan> {
pub id: GlobalId,
pub plan: P,
}
/// An index storing processed updates so they can be queried
/// or reused in other computations
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
/// Identity of the collection the index is on.
pub on_id: GlobalId,
/// Expressions to be arranged, in order of decreasing primacy.
pub key: Vec<ScalarExpr>,
}

View File

@@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
use crate::expr::GlobalId;
mod dataflow;
mod sinks;
mod sources;
pub(crate) use dataflow::{BuildDesc, DataflowDescription, IndexDesc};

View File

@@ -0,0 +1,28 @@
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use crate::expr::GlobalId;
use crate::repr::{self, RelationDesc};
/// A sink for updates to a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ComputeSinkDesc<S: 'static = (), T = repr::Timestamp> {
pub from: GlobalId,
pub from_desc: RelationDesc,
pub connection: ComputeSinkConnection<S>,
pub with_snapshot: bool,
pub up_to: Antichain<T>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ComputeSinkConnection<S: 'static = ()> {
// TODO(discord9): consider if ever needed
Subscribe,
Persist(PersistSinkConnection<S>),
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct PersistSinkConnection<S> {
pub value_desc: RelationDesc,
pub storage_metadata: S,
}

View File

@@ -0,0 +1,26 @@
use serde::{Deserialize, Serialize};
use crate::expr::MapFilterProject;
use crate::repr::RelationType;
/// A description of an instantiation of a source.
///
/// This includes a description of the source, but additionally any
/// context-dependent options like the ability to apply filtering and
/// projection to the records as they emerge.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SourceInstanceDesc<M> {
/// Arguments for this instantiation of the source.
pub arguments: SourceInstanceArguments,
/// Additional metadata used by the storage client of a compute instance to read it.
pub storage_metadata: M,
/// The relation type of this source
pub typ: RelationType,
}
/// Per-source construction arguments.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct SourceInstanceArguments {
/// Linear operators to be applied record-by-record.
pub operators: Option<MapFilterProject>,
}

224
src/flow/src/expr/func.rs Normal file
View File

@@ -0,0 +1,224 @@
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use super::ScalarExpr;
// TODO(discord9): more function & eval
use crate::{repr::Row, storage::errors::EvalError};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum UnaryFunc {
Not,
IsNull,
IsTrue,
IsFalse,
CastDatetimeToInt64,
CastInt64ToFloat32,
}
impl UnaryFunc {
pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result<Value, EvalError> {
let arg = expr.eval(values)?;
match self {
Self::CastDatetimeToInt64 => {
let datetime = if let Value::DateTime(datetime) = arg {
Ok(datetime.val())
} else {
Err(EvalError::TypeMismatch(format!(
"cannot cast {:?} to datetime",
arg
)))
}?;
Ok(Value::from(datetime))
}
Self::CastInt64ToFloat32 => {
let int64 = if let Value::Int64(int64) = arg {
Ok(int64)
} else {
Err(EvalError::TypeMismatch(format!(
"cannot cast {:?} to int64",
arg
)))
}?;
Ok(Value::from(int64 as f32))
}
_ => todo!(),
}
}
}
/// TODO: support more binary functions for more types
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum BinaryFunc {
Eq,
NotEq,
Lt,
Lte,
Gt,
Gte,
AddInt16,
AddInt32,
AddInt64,
AddUInt16,
AddUInt32,
AddUInt64,
AddFloat32,
AddFloat64,
SubInt16,
SubInt32,
SubInt64,
SubUInt16,
SubUInt32,
SubUInt64,
SubFloat32,
SubFloat64,
MulInt16,
MulInt32,
MulInt64,
MulUInt16,
MulUInt32,
MulUInt64,
MulFloat32,
MulFloat64,
DivInt16,
DivInt32,
DivInt64,
DivUInt16,
DivUInt32,
DivUInt64,
DivFloat32,
DivFloat64,
ModInt16,
ModInt32,
ModInt64,
ModUInt16,
ModUInt32,
ModUInt64,
}
impl BinaryFunc {
pub fn eval(
&self,
values: &[Value],
expr1: &ScalarExpr,
expr2: &ScalarExpr,
) -> Result<Value, EvalError> {
let left = expr1.eval(values)?;
let right = expr2.eval(values)?;
match self {
Self::Eq => Ok(Value::from(left == right)),
Self::NotEq => Ok(Value::from(left != right)),
Self::Lt => Ok(Value::from(left < right)),
Self::Lte => Ok(Value::from(left <= right)),
Self::Gt => Ok(Value::from(left > right)),
Self::Gte => Ok(Value::from(left >= right)),
Self::AddInt16 => Ok(add::<i16>(left, right)?),
Self::AddInt32 => Ok(add::<i32>(left, right)?),
Self::AddInt64 => Ok(add::<i64>(left, right)?),
Self::AddUInt16 => Ok(add::<u16>(left, right)?),
Self::AddUInt32 => Ok(add::<u32>(left, right)?),
Self::AddUInt64 => Ok(add::<u64>(left, right)?),
Self::AddFloat32 => Ok(add::<f32>(left, right)?),
Self::AddFloat64 => Ok(add::<f64>(left, right)?),
Self::SubInt16 => Ok(sub::<i16>(left, right)?),
Self::SubInt32 => Ok(sub::<i32>(left, right)?),
Self::SubInt64 => Ok(sub::<i64>(left, right)?),
Self::SubUInt16 => Ok(sub::<u16>(left, right)?),
Self::SubUInt32 => Ok(sub::<u32>(left, right)?),
Self::SubUInt64 => Ok(sub::<u64>(left, right)?),
Self::SubFloat32 => Ok(sub::<f32>(left, right)?),
Self::SubFloat64 => Ok(sub::<f64>(left, right)?),
Self::MulInt16 => Ok(mul::<i16>(left, right)?),
Self::MulInt32 => Ok(mul::<i32>(left, right)?),
Self::MulInt64 => Ok(mul::<i64>(left, right)?),
Self::MulUInt16 => Ok(mul::<u16>(left, right)?),
Self::MulUInt32 => Ok(mul::<u32>(left, right)?),
Self::MulUInt64 => Ok(mul::<u64>(left, right)?),
Self::MulFloat32 => Ok(mul::<f32>(left, right)?),
Self::MulFloat64 => Ok(mul::<f64>(left, right)?),
Self::DivInt16 => Ok(div::<i16>(left, right)?),
Self::DivInt32 => Ok(div::<i32>(left, right)?),
Self::DivInt64 => Ok(div::<i64>(left, right)?),
Self::DivUInt16 => Ok(div::<u16>(left, right)?),
Self::DivUInt32 => Ok(div::<u32>(left, right)?),
Self::DivUInt64 => Ok(div::<u64>(left, right)?),
Self::DivFloat32 => Ok(div::<f32>(left, right)?),
Self::DivFloat64 => Ok(div::<f64>(left, right)?),
Self::ModInt16 => Ok(rem::<i16>(left, right)?),
Self::ModInt32 => Ok(rem::<i32>(left, right)?),
Self::ModInt64 => Ok(rem::<i64>(left, right)?),
Self::ModUInt16 => Ok(rem::<u16>(left, right)?),
Self::ModUInt32 => Ok(rem::<u32>(left, right)?),
Self::ModUInt64 => Ok(rem::<u64>(left, right)?),
_ => todo!(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum VariadicFunc {}
impl VariadicFunc {
pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
todo!()
}
}
fn add<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Add<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left + right))
}
fn sub<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Sub<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left - right))
}
fn mul<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Mul<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left * right))
}
fn div<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Div<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left / right))
}
fn rem<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Rem<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left % right))
}

24
src/flow/src/expr/id.rs Normal file
View File

@@ -0,0 +1,24 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum GlobalId {
/// System namespace.
System(u64),
/// User namespace.
User(u64),
/// Transient namespace.
Transient(u64),
/// Dummy id for query being explained
Explain,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LocalId(pub(crate) u64);
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum Id {
/// An identifier that refers to a local component of a dataflow.
Local(LocalId),
/// An identifier that refers to a global dataflow.
Global(GlobalId),
}

381
src/flow/src/expr/linear.rs Normal file
View File

@@ -0,0 +1,381 @@
use std::collections::{BTreeMap, BTreeSet};
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use crate::expr::{Id, LocalId, ScalarExpr};
use crate::repr::{self, Diff, Row};
use crate::storage::errors::EvalError;
/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequences of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MapFilterProject {
/// A sequence of expressions that should be appended to the row.
///
/// Many of these expressions may not be produced in the output,
/// and may only be present as common subexpressions.
pub expressions: Vec<ScalarExpr>,
/// Expressions that must evaluate to `Datum::True` for the output
/// row to be produced.
///
/// Each entry is prepended with a column identifier indicating
/// the column *before* which the predicate should first be applied.
/// Most commonly this would be one plus the largest column identifier
/// in the predicate's support, but it could be larger to implement
/// guarded evaluation of predicates.
///
/// This list should be sorted by the first field.
pub predicates: Vec<(usize, ScalarExpr)>,
/// A sequence of column identifiers whose data form the output row.
pub projection: Vec<usize>,
/// The expected number of input columns.
///
/// This is needed to ensure correct identification of newly formed
/// columns in the output.
pub input_arity: usize,
}
impl MapFilterProject {
/// Create a no-op operator for an input of a supplied arity.
pub fn new(input_arity: usize) -> Self {
Self {
expressions: Vec::new(),
predicates: Vec::new(),
projection: (0..input_arity).collect(),
input_arity,
}
}
/// Given two mfps, return an mfp that applies one
/// followed by the other.
/// Note that the arguments are in the opposite order
/// from how function composition is usually written in mathematics.
pub fn compose(before: Self, after: Self) -> Self {
let (m, f, p) = after.into_map_filter_project();
before.map(m).filter(f).project(p)
}
/// True if the operator describes the identity transformation.
pub fn is_identity(&self) -> bool {
self.expressions.is_empty()
&& self.predicates.is_empty()
&& self.projection.len() == self.input_arity
&& self.projection.iter().enumerate().all(|(i, p)| i == *p)
}
/// Retain only the indicated columns in the presented order.
pub fn project<I>(mut self, columns: I) -> Self
where
I: IntoIterator<Item = usize> + std::fmt::Debug,
{
self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
self
}
/// Retain only rows satisfying these predicates.
///
/// This method introduces predicates as eagerly as they can be evaluated,
/// which may not be desired for predicates that may cause exceptions.
/// If fine manipulation is required, the predicates can be added manually.
pub fn filter<I>(mut self, predicates: I) -> Self
where
I: IntoIterator<Item = ScalarExpr>,
{
for mut predicate in predicates {
// Correct column references.
predicate.permute(&self.projection[..]);
// Validate column references.
assert!(predicate
.support()
.into_iter()
.all(|c| c < self.input_arity + self.expressions.len()));
// Insert predicate as eagerly as it can be evaluated:
// just after the largest column in its support is formed.
let max_support = predicate
.support()
.into_iter()
.max()
.map(|c| c + 1)
.unwrap_or(0);
self.predicates.push((max_support, predicate))
}
// Stable sort predicates by position at which they take effect.
// We put literal errors at the end as a stop-gap to avoid erroring
// before we are able to evaluate any predicates that might prevent it.
self.predicates
.sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
self
}
/// Append the result of evaluating expressions to each row.
pub fn map<I>(mut self, expressions: I) -> Self
where
I: IntoIterator<Item = ScalarExpr>,
{
for mut expression in expressions {
// Correct column references.
expression.permute(&self.projection[..]);
// Validate column references.
assert!(expression
.support()
.into_iter()
.all(|c| c < self.input_arity + self.expressions.len()));
// Introduce expression and produce as output.
self.expressions.push(expression);
self.projection
.push(self.input_arity + self.expressions.len() - 1);
}
self
}
/// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
pub fn into_map_filter_project(self) -> (Vec<ScalarExpr>, Vec<ScalarExpr>, Vec<usize>) {
let predicates = self
.predicates
.into_iter()
.map(|(_pos, predicate)| predicate)
.collect();
(self.expressions, predicates, self.projection)
}
/// As the arguments to `Map`, `Filter`, and `Project` operators.
///
/// In principle, this operator can be implemented as a sequence of
/// more elemental operators, likely less efficiently.
pub fn as_map_filter_project(&self) -> (Vec<ScalarExpr>, Vec<ScalarExpr>, Vec<usize>) {
self.clone().into_map_filter_project()
}
}
impl MapFilterProject {
pub fn optimize(&mut self) {
// TODO(discord9): optimize later
}
/// Convert the `MapFilterProject` into a staged evaluation plan.
///
/// The main behavior is extract temporal predicates, which cannot be evaluated
/// using the standard machinery.
pub fn into_plan(self) -> Result<MfpPlan, String> {
MfpPlan::create_from(self)
}
/// Lists input columns whose values are used in outputs.
///
/// It is entirely appropriate to determine the demand of an instance
/// and then both apply a projection to the subject of the instance and
/// `self.permute` this instance.
pub fn demand(&self) -> BTreeSet<usize> {
let mut demanded = BTreeSet::new();
for (_index, pred) in self.predicates.iter() {
demanded.extend(pred.support());
}
demanded.extend(self.projection.iter().cloned());
for index in (0..self.expressions.len()).rev() {
if demanded.contains(&(self.input_arity + index)) {
demanded.extend(self.expressions[index].support());
}
}
demanded.retain(|col| col < &self.input_arity);
demanded
}
/// Update input column references, due to an input projection or permutation.
///
/// The `shuffle` argument remaps expected column identifiers to new locations,
/// with the expectation that `shuffle` describes all input columns, and so the
/// intermediate results will be able to start at position `shuffle.len()`.
///
/// The supplied `shuffle` may not list columns that are not "demanded" by the
/// instance, and so we should ensure that `self` is optimized to not reference
/// columns that are not demanded.
pub fn permute(&mut self, mut shuffle: BTreeMap<usize, usize>, new_input_arity: usize) {
let (mut map, mut filter, mut project) = self.as_map_filter_project();
for index in 0..map.len() {
// Intermediate columns are just shifted.
shuffle.insert(self.input_arity + index, new_input_arity + index);
}
for expr in map.iter_mut() {
expr.permute_map(&shuffle);
}
for pred in filter.iter_mut() {
pred.permute_map(&shuffle);
}
for proj in project.iter_mut() {
assert!(shuffle[proj] < new_input_arity + map.len());
*proj = shuffle[proj];
}
*self = Self::new(new_input_arity)
.map(map)
.filter(filter)
.project(project)
}
}
/// A wrapper type which indicates it is safe to simply evaluate all expressions.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SafeMfpPlan {
pub(crate) mfp: MapFilterProject,
}
impl SafeMfpPlan {
pub fn permute(&mut self, map: BTreeMap<usize, usize>, new_arity: usize) {
self.mfp.permute(map, new_arity);
}
/// Evaluates the linear operator on a supplied list of datums.
///
/// The arguments are the initial datums associated with the row,
/// and an appropriately lifetimed arena for temporary allocations
/// needed by scalar evaluation.
///
/// An `Ok` result will either be `None` if any predicate did not
/// evaluate to `Value::Boolean(true)`, or the values of the columns listed
/// by `self.projection` if all predicates passed. If an error
/// occurs in the evaluation it is returned as an `Err` variant.
/// As the evaluation exits early with failed predicates, it may
/// miss some errors that would occur later in evaluation.
///
/// The `row` is not cleared first, but emptied if the function
/// returns `Ok(Some(row)).
#[inline(always)]
pub fn evaluate_into(
&self,
values: &mut Vec<Value>,
row_buf: &mut Row,
) -> Result<Option<Row>, EvalError> {
let passed_predicates = self.evaluate_inner(values)?;
if !passed_predicates {
Ok(None)
} else {
row_buf.clear();
row_buf.extend(self.mfp.projection.iter().map(|c| values[*c].clone()));
Ok(Some(row_buf.clone()))
}
}
/// A version of `evaluate` which produces an iterator over `Datum`
/// as output.
///
/// This version can be useful when one wants to capture the resulting
/// datums without packing and then unpacking a row.
#[inline(always)]
pub fn evaluate_iter<'a>(
&'a self,
datums: &'a mut Vec<Value>,
) -> Result<Option<impl Iterator<Item = Value> + 'a>, EvalError> {
let passed_predicates = self.evaluate_inner(datums)?;
if !passed_predicates {
Ok(None)
} else {
Ok(Some(
self.mfp.projection.iter().map(move |i| datums[*i].clone()),
))
}
}
/// Populates `datums` with `self.expressions` and tests `self.predicates`.
///
/// This does not apply `self.projection`, which is up to the calling method.
pub fn evaluate_inner(&self, values: &mut Vec<Value>) -> Result<bool, EvalError> {
let mut expression = 0;
for (support, predicate) in self.mfp.predicates.iter() {
while self.mfp.input_arity + expression < *support {
values.push(self.mfp.expressions[expression].eval(&values[..])?);
expression += 1;
}
if predicate.eval(&values[..])? != Value::Boolean(true) {
return Ok(false);
}
}
while expression < self.mfp.expressions.len() {
values.push(self.mfp.expressions[expression].eval(&values[..])?);
expression += 1;
}
Ok(true)
}
}
impl std::ops::Deref for SafeMfpPlan {
type Target = MapFilterProject;
fn deref(&self) -> &Self::Target {
&self.mfp
}
}
/// Predicates partitioned into temporal and non-temporal.
///
/// Temporal predicates require some recognition to determine their
/// structure, and it is best to do that once and re-use the results.
///
/// There are restrictions on the temporal predicates we currently support.
/// They must directly constrain `MzNow` from below or above,
/// by expressions that do not themselves contain `MzNow`.
/// Conjunctions of such constraints are also ok.
#[derive(Clone, Debug, PartialEq)]
pub struct MfpPlan {
/// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
pub(crate) mfp: SafeMfpPlan,
/// TODO(discord9): impl temporal filter later
/// Expressions that when evaluated lower-bound `MzNow`.
pub(crate) lower_bounds: Vec<ScalarExpr>,
/// Expressions that when evaluated upper-bound `MzNow`.
pub(crate) upper_bounds: Vec<ScalarExpr>,
}
impl MfpPlan {
pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
Ok(Self {
mfp: SafeMfpPlan { mfp },
lower_bounds: Vec::new(),
upper_bounds: Vec::new(),
})
}
pub fn evaluate<E: From<EvalError>, V: Fn(&repr::Timestamp) -> bool>(
&self,
values: &mut Vec<Value>,
time: repr::Timestamp,
diff: Diff,
valid_time: V,
) -> impl Iterator<Item = Result<(Row, repr::Timestamp, Diff), (E, repr::Timestamp, Diff)>>
{
match self.mfp.evaluate_inner(values) {
Err(e) => {
return Some(Err((e.into(), time, diff)))
.into_iter()
.chain(None.into_iter());
}
Ok(true) => {}
Ok(false) => {
return None.into_iter().chain(None.into_iter());
}
}
// TODO(discord9): Temporal filter
let ret = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone()));
Some(Ok((ret, time, diff)))
.into_iter()
.chain(None.into_iter())
}
/// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
pub fn is_identity(&self) -> bool {
self.mfp.mfp.is_identity() && self.lower_bounds.is_empty() && self.upper_bounds.is_empty()
}
}

207
src/flow/src/expr/mod.rs Normal file
View File

@@ -0,0 +1,207 @@
//! for declare dataflow description that is the last step before build dataflow
mod func;
mod id;
mod linear;
mod relation;
use std::collections::{BTreeMap, BTreeSet};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
pub use id::{GlobalId, Id, LocalId};
pub use linear::{MapFilterProject, SafeMfpPlan};
pub(crate) use relation::{AggregateExpr, AggregateFunc, TableFunc};
use serde::{Deserialize, Serialize};
pub(crate) use crate::expr::func::{BinaryFunc, UnaryFunc, VariadicFunc};
use crate::storage::errors::EvalError;
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ScalarExpr {
/// A column of the input row
Column(usize),
/// A literal value.
Literal(Result<Value, EvalError>, ConcreteDataType),
CallUnary {
func: UnaryFunc,
expr: Box<ScalarExpr>,
},
CallBinary {
func: BinaryFunc,
expr1: Box<ScalarExpr>,
expr2: Box<ScalarExpr>,
},
CallVariadic {
func: VariadicFunc,
exprs: Vec<ScalarExpr>,
},
/// Conditionally evaluated expressions.
///
/// It is important that `then` and `els` only be evaluated if
/// `cond` is true or not, respectively. This is the only way
/// users can guard execution (other logical operator do not
/// short-circuit) and we need to preserve that.
If {
cond: Box<ScalarExpr>,
then: Box<ScalarExpr>,
els: Box<ScalarExpr>,
},
}
impl ScalarExpr {
pub fn eval(&self, values: &[Value]) -> Result<Value, EvalError> {
match self {
ScalarExpr::Column(index) => Ok(values[*index].clone()),
ScalarExpr::Literal(row_res, _ty) => row_res.clone(),
ScalarExpr::CallUnary { func, expr } => func.eval(values, expr),
ScalarExpr::CallBinary { func, expr1, expr2 } => func.eval(values, expr1, expr2),
ScalarExpr::CallVariadic { func, exprs } => func.eval(values, exprs),
ScalarExpr::If { cond, then, els } => match cond.eval(values) {
Ok(Value::Boolean(true)) => then.eval(values),
Ok(Value::Boolean(false)) => els.eval(values),
_ => Err(EvalError::InvalidArgument(
"if condition must be boolean".to_string(),
)),
},
}
}
/// Rewrites column indices with their value in `permutation`.
///
/// This method is applicable even when `permutation` is not a
/// strict permutation, and it only needs to have entries for
/// each column referenced in `self`.
pub fn permute(&mut self, permutation: &[usize]) {
#[allow(deprecated)]
self.visit_mut_post_nolimit(&mut |e| {
if let ScalarExpr::Column(old_i) = e {
*old_i = permutation[*old_i];
}
});
}
/// Rewrites column indices with their value in `permutation`.
///
/// This method is applicable even when `permutation` is not a
/// strict permutation, and it only needs to have entries for
/// each column referenced in `self`.
pub fn permute_map(&mut self, permutation: &BTreeMap<usize, usize>) {
#[allow(deprecated)]
self.visit_mut_post_nolimit(&mut |e| {
if let ScalarExpr::Column(old_i) = e {
*old_i = permutation[old_i];
}
});
}
pub fn support(&self) -> BTreeSet<usize> {
let mut support = BTreeSet::new();
#[allow(deprecated)]
self.visit_post_nolimit(&mut |e| {
if let ScalarExpr::Column(i) = e {
support.insert(*i);
}
});
support
}
pub fn as_literal(&self) -> Option<Result<Value, &EvalError>> {
if let ScalarExpr::Literal(lit, _column_type) = self {
Some(lit.as_ref().map(|row| row.clone()))
} else {
None
}
}
pub fn is_literal(&self) -> bool {
matches!(self, ScalarExpr::Literal(_, _))
}
pub fn is_literal_true(&self) -> bool {
Some(Ok(Value::Boolean(true))) == self.as_literal()
}
pub fn is_literal_false(&self) -> bool {
Some(Ok(Value::Boolean(false))) == self.as_literal()
}
pub fn is_literal_null(&self) -> bool {
Some(Ok(Value::Null)) == self.as_literal()
}
pub fn is_literal_ok(&self) -> bool {
matches!(self, ScalarExpr::Literal(Ok(_), _typ))
}
pub fn is_literal_err(&self) -> bool {
matches!(self, ScalarExpr::Literal(Err(_), _typ))
}
}
impl ScalarExpr {
/// visit post-order without stack call limit, but may cause stack overflow
fn visit_post_nolimit<F>(&self, f: &mut F)
where
F: FnMut(&Self),
{
self.visit_children(|e| e.visit_post_nolimit(f));
f(self);
}
fn visit_children<F>(&self, mut f: F)
where
F: FnMut(&Self),
{
match self {
ScalarExpr::Column(_) | ScalarExpr::Literal(_, _) => (),
ScalarExpr::CallUnary { func, expr } => f(expr),
ScalarExpr::CallBinary { func, expr1, expr2 } => {
f(expr1);
f(expr2);
}
ScalarExpr::CallVariadic { func, exprs } => {
for expr in exprs {
f(expr);
}
}
ScalarExpr::If { cond, then, els } => {
f(cond);
f(then);
f(els);
}
}
}
fn visit_mut_post_nolimit<F>(&mut self, f: &mut F)
where
F: FnMut(&mut Self),
{
self.visit_mut_children(|e: &mut Self| e.visit_mut_post_nolimit(f));
f(self);
}
fn visit_mut_children<F>(&mut self, mut f: F)
where
F: FnMut(&mut Self),
{
match self {
ScalarExpr::Column(_) | ScalarExpr::Literal(_, _) => (),
ScalarExpr::CallUnary { func, expr } => f(expr),
ScalarExpr::CallBinary { func, expr1, expr2 } => {
f(expr1);
f(expr2);
}
ScalarExpr::CallVariadic { func, exprs } => {
for expr in exprs {
f(expr);
}
}
ScalarExpr::If { cond, then, els } => {
f(cond);
f(then);
f(els);
}
}
}
}

View File

@@ -0,0 +1,206 @@
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{OrderedF32, OrderedF64, Value};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub enum AggregateFunc {
MaxInt16,
MaxInt32,
MaxInt64,
MaxUInt16,
MaxUInt32,
MaxUInt64,
MaxFloat32,
MaxFloat64,
MaxBool,
MaxString,
MaxDate,
MaxTimestamp,
MaxTimestampTz,
MinInt16,
MinInt32,
MinInt64,
MinUInt16,
MinUInt32,
MinUInt64,
MinFloat32,
MinFloat64,
MinBool,
MinString,
MinDate,
MinTimestamp,
MinTimestampTz,
SumInt16,
SumInt32,
SumInt64,
SumUInt16,
SumUInt32,
SumUInt64,
SumFloat32,
SumFloat64,
Count,
Any,
All,
}
impl AggregateFunc {
pub fn eval<I>(&self, values: I) -> Value
where
I: IntoIterator<Item = Value>,
{
// TODO: impl more functions like min/max/sumTimestamp etc.
match self {
AggregateFunc::MaxInt16 => max_value::<I, i16>(values),
AggregateFunc::MaxInt32 => max_value::<I, i32>(values),
AggregateFunc::MaxInt64 => max_value::<I, i64>(values),
AggregateFunc::MaxUInt16 => max_value::<I, u16>(values),
AggregateFunc::MaxUInt32 => max_value::<I, u32>(values),
AggregateFunc::MaxUInt64 => max_value::<I, u64>(values),
AggregateFunc::MaxFloat32 => max_value::<I, OrderedF32>(values),
AggregateFunc::MaxFloat64 => max_value::<I, OrderedF64>(values),
AggregateFunc::MaxBool => max_value::<I, bool>(values),
AggregateFunc::MaxString => max_string(values),
AggregateFunc::MinInt16 => min_value::<I, i16>(values),
AggregateFunc::MinInt32 => min_value::<I, i32>(values),
AggregateFunc::MinInt64 => min_value::<I, i64>(values),
AggregateFunc::MinUInt16 => min_value::<I, u16>(values),
AggregateFunc::MinUInt32 => min_value::<I, u32>(values),
AggregateFunc::MinUInt64 => min_value::<I, u16>(values),
AggregateFunc::MinFloat32 => min_value::<I, OrderedF32>(values),
AggregateFunc::MinFloat64 => min_value::<I, OrderedF64>(values),
AggregateFunc::MinBool => min_value::<I, bool>(values),
AggregateFunc::MinString => min_string(values),
AggregateFunc::SumInt16 => sum_value::<I, i16, i64>(values),
AggregateFunc::SumInt32 => sum_value::<I, i32, i64>(values),
AggregateFunc::SumInt64 => sum_value::<I, i64, i64>(values),
AggregateFunc::SumUInt16 => sum_value::<I, u16, u64>(values),
AggregateFunc::SumUInt32 => sum_value::<I, u32, u64>(values),
AggregateFunc::SumUInt64 => sum_value::<I, u64, u64>(values),
AggregateFunc::SumFloat32 => sum_value::<I, f32, f32>(values),
AggregateFunc::SumFloat64 => sum_value::<I, f64, f64>(values),
AggregateFunc::Count => count(values),
AggregateFunc::All => all(values),
AggregateFunc::Any => any(values),
_ => todo!(),
}
}
}
fn max_string<I>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
{
match values.into_iter().filter(|d| !d.is_null()).max_by(|a, b| {
let a = a.as_value_ref();
let a = a.as_string().expect("unexpected type").unwrap();
let b = b.as_value_ref();
let b = b.as_string().expect("unexpected type").unwrap();
a.cmp(b)
}) {
Some(v) => v,
None => Value::Null,
}
}
fn max_value<I, TypedValue>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
TypedValue: TryFrom<Value> + Ord,
<TypedValue as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<Option<TypedValue>>,
{
let x: Option<TypedValue> = values
.into_iter()
.filter(|v| !v.is_null())
.map(|v| TypedValue::try_from(v).expect("unexpected type"))
.max();
x.into()
}
fn min_string<I>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
{
match values.into_iter().filter(|d| !d.is_null()).min_by(|a, b| {
let a = a.as_value_ref();
let a = a.as_string().expect("unexpected type").unwrap();
let b = b.as_value_ref();
let b = b.as_string().expect("unexpected type").unwrap();
a.cmp(b)
}) {
Some(v) => v,
None => Value::Null,
}
}
fn min_value<I, TypedValue>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
TypedValue: TryFrom<Value> + Ord,
<TypedValue as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<Option<TypedValue>>,
{
let x: Option<TypedValue> = values
.into_iter()
.filter(|v| !v.is_null())
.map(|v| TypedValue::try_from(v).expect("unexpected type"))
.min();
x.into()
}
fn sum_value<I, ValueType, ResultType>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
ValueType: TryFrom<Value>,
<ValueType as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<Option<ValueType>>,
ResultType: From<ValueType> + std::iter::Sum + Into<Value>,
{
// If no row qualifies, then the result of COUNT is 0 (zero), and the result of any other aggregate function is the null value.
let mut values = values.into_iter().filter(|v| !v.is_null()).peekable();
if values.peek().is_none() {
Value::Null
} else {
let x = values
.map(|v| ResultType::from(ValueType::try_from(v).expect("unexpected type")))
.sum::<ResultType>();
x.into()
}
}
fn count<I>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
{
let x = values.into_iter().filter(|v| !v.is_null()).count() as i64;
Value::from(x)
}
fn any<I>(datums: I) -> Value
where
I: IntoIterator<Item = Value>,
{
datums
.into_iter()
.fold(Value::Boolean(false), |state, next| match (state, next) {
(Value::Boolean(true), _) | (_, Value::Boolean(true)) => Value::Boolean(true),
(Value::Null, _) | (_, Value::Null) => Value::Null,
_ => Value::Boolean(false),
})
}
fn all<I>(datums: I) -> Value
where
I: IntoIterator<Item = Value>,
{
datums
.into_iter()
.fold(Value::Boolean(true), |state, next| match (state, next) {
(Value::Boolean(false), _) | (_, Value::Boolean(false)) => Value::Boolean(false),
(Value::Null, _) | (_, Value::Null) => Value::Null,
_ => Value::Boolean(true),
})
}

View File

@@ -0,0 +1,22 @@
pub(crate) use func::AggregateFunc;
use serde::{Deserialize, Serialize};
use crate::expr::ScalarExpr;
mod func;
/// function that might emit multiple output record for one input row
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub enum TableFunc {}
/// Describes an aggregation expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AggregateExpr {
/// Names the aggregation function.
pub func: AggregateFunc,
/// An expression which extracts from each row the input to `func`.
pub expr: ScalarExpr,
/// Should the aggregation be applied only to distinct results in each group.
#[serde(default)]
pub distinct: bool,
}

9
src/flow/src/lib.rs Normal file
View File

@@ -0,0 +1,9 @@
#![allow(unused)]
#![allow(clippy::mutable_key_type)]
mod adapter;
mod compute;
mod expr;
mod repr;
mod storage;
mod util;

62
src/flow/src/repr/mod.rs Normal file
View File

@@ -0,0 +1,62 @@
//! basically a wrapper around the `datatype` crate
//! for basic Data Representation
use std::borrow::Borrow;
use std::slice::SliceIndex;
use datatypes::value::Value;
pub(crate) use relation::{RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
/// System-wide Record count difference type.
pub type Diff = i64;
mod relation;
mod timestamp;
/// A row is a vector of values.
///
/// TODO(discord9): use a more efficient representation
///i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
pub struct Row {
inner: Vec<Value>,
}
impl Row {
pub fn get(&self, idx: usize) -> Option<&Value> {
self.inner.get(idx)
}
pub fn clear(&mut self) {
self.inner.clear();
}
pub fn packer(&mut self) -> &mut Vec<Value> {
self.inner.clear();
&mut self.inner
}
pub fn pack<I>(iter: I) -> Row
where
I: IntoIterator<Item = Value>,
{
Self {
inner: iter.into_iter().collect(),
}
}
pub fn unpack(&self) -> Vec<Value> {
self.inner.clone()
}
pub fn extend<I>(&mut self, iter: I)
where
I: IntoIterator<Item = Value>,
{
self.inner.extend(iter);
}
pub fn into_iter(self) -> impl Iterator<Item = Value> {
self.inner.into_iter()
}
pub fn iter(&self) -> impl Iterator<Item = &Value> {
self.inner.iter()
}
}
/// System-wide default timestamp type
pub type Timestamp = u64;

View File

@@ -0,0 +1,342 @@
use datatypes::prelude::ConcreteDataType;
use serde::{Deserialize, Serialize};
/// The type of a relation.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct RelationType {
/// The type for each column, in order.
pub column_types: Vec<ColumnType>,
/// Sets of indices that are "keys" for the collection.
///
/// Each element in this list is a set of column indices, each with the
/// property that the collection contains at most one record with each
/// distinct set of values for each column. Alternately, for a specific set
/// of values assigned to the these columns there is at most one record.
///
/// A collection can contain multiple sets of keys, although it is common to
/// have either zero or one sets of key indices.
#[serde(default)]
pub keys: Vec<Vec<usize>>,
}
impl RelationType {
/// Constructs a `RelationType` representing the relation with no columns and
/// no keys.
pub fn empty() -> Self {
RelationType::new(vec![])
}
/// Constructs a new `RelationType` from specified column types.
///
/// The `RelationType` will have no keys.
pub fn new(column_types: Vec<ColumnType>) -> Self {
RelationType {
column_types,
keys: Vec::new(),
}
}
/// Adds a new key for the relation.
pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
indices.sort_unstable();
if !self.keys.contains(&indices) {
self.keys.push(indices);
}
self
}
pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
for key in keys {
self = self.with_key(key)
}
self
}
/// Computes the number of columns in the relation.
pub fn arity(&self) -> usize {
self.column_types.len()
}
/// Gets the index of the columns used when creating a default index.
pub fn default_key(&self) -> Vec<usize> {
if let Some(key) = self.keys.first() {
if key.is_empty() {
(0..self.column_types.len()).collect()
} else {
key.clone()
}
} else {
(0..self.column_types.len()).collect()
}
}
/// True if any collection described by `self` could safely be described by `other`.
///
/// In practice this means checking that the scalar types match exactly, and that the
/// nullability of `self` is at least as strict as `other`, and that all keys of `other`
/// contain some key of `self` (as a set of key columns is less strict than any subset).
pub fn subtypes(&self, other: &RelationType) -> bool {
let all_keys = other.keys.iter().all(|key1| {
self.keys
.iter()
.any(|key2| key1.iter().all(|k| key2.contains(k)))
});
if !all_keys {
return false;
}
if self.column_types.len() != other.column_types.len() {
return false;
}
for (col1, col2) in self.column_types.iter().zip(other.column_types.iter()) {
if col1.nullable && !col2.nullable {
return false;
}
if col1.scalar_type != col2.scalar_type {
return false;
}
}
true
}
}
/// The type of a `Value`
///
/// [`ColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`ScalarType::nullable`] method.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct ColumnType {
/// The underlying scalar type (e.g., Int32 or String) of this column.
pub scalar_type: ConcreteDataType,
/// Whether this datum can be null.`
#[serde(default = "return_true")]
pub nullable: bool,
}
/// This method exists solely for the purpose of making ColumnType nullable by
/// default in unit tests. The default value of a bool is false, and the only
/// way to make an object take on any other value by default is to pass it a
/// function that returns the desired default value. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
true
}
/// A description of the shape of a relation.
///
/// It bundles a [`RelationType`] with the name of each column in the relation.
/// Individual column names are optional.
///
/// # Examples
///
/// A `RelationDesc`s is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{ColumnType, RelationDesc, ScalarType};
///
/// let desc = RelationDesc::empty()
/// .with_column("id", ScalarType::Int64.nullable(false))
/// .with_column("price", ScalarType::Float64.nullable(true));
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::RelationType { mz_repr::RelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
/// 0 => "first",
/// 1 => "second",
/// _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct RelationDesc {
typ: RelationType,
names: Vec<ColumnName>,
}
impl RelationDesc {
/// Constructs a new `RelationDesc` that represents the empty relation
/// with no columns and no keys.
pub fn empty() -> Self {
RelationDesc {
typ: RelationType::empty(),
names: vec![],
}
}
/// Constructs a new `RelationDesc` from a `RelationType` and an iterator
/// over column names.
///
/// # Panics
///
/// Panics if the arity of the `RelationType` is not equal to the number of
/// items in `names`.
pub fn new<I, N>(typ: RelationType, names: I) -> Self
where
I: IntoIterator<Item = N>,
N: Into<ColumnName>,
{
let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
assert_eq!(typ.column_types.len(), names.len());
RelationDesc { typ, names }
}
pub fn from_names_and_types<I, T, N>(iter: I) -> Self
where
I: IntoIterator<Item = (N, T)>,
T: Into<ColumnType>,
N: Into<ColumnName>,
{
let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
let types = types.into_iter().map(Into::into).collect();
let typ = RelationType::new(types);
Self::new(typ, names)
}
/// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
pub fn concat(mut self, other: Self) -> Self {
let self_len = self.typ.column_types.len();
self.names.extend(other.names);
self.typ.column_types.extend(other.typ.column_types);
for k in other.typ.keys {
let k = k.into_iter().map(|idx| idx + self_len).collect();
self = self.with_key(k);
}
self
}
/// Appends a column with the specified name and type.
pub fn with_column<N>(mut self, name: N, column_type: ColumnType) -> Self
where
N: Into<ColumnName>,
{
self.typ.column_types.push(column_type);
self.names.push(name.into());
self
}
/// Adds a new key for the relation.
pub fn with_key(mut self, indices: Vec<usize>) -> Self {
self.typ = self.typ.with_key(indices);
self
}
/// Drops all existing keys.
pub fn without_keys(mut self) -> Self {
self.typ.keys.clear();
self
}
/// Builds a new relation description with the column names replaced with
/// new names.
///
/// # Panics
///
/// Panics if the arity of the relation type does not match the number of
/// items in `names`.
pub fn with_names<I, N>(self, names: I) -> Self
where
I: IntoIterator<Item = N>,
N: Into<ColumnName>,
{
Self::new(self.typ, names)
}
/// Computes the number of columns in the relation.
pub fn arity(&self) -> usize {
self.typ.arity()
}
/// Returns the relation type underlying this relation description.
pub fn typ(&self) -> &RelationType {
&self.typ
}
/// Returns an iterator over the columns in this relation.
pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
self.iter_names().zip(self.iter_types())
}
/// Returns an iterator over the types of the columns in this relation.
pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
self.typ.column_types.iter()
}
/// Returns an iterator over the names of the columns in this relation.
pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
self.names.iter()
}
/// Finds a column by name.
///
/// Returns the index and type of the column named `name`. If no column with
/// the specified name exists, returns `None`. If multiple columns have the
/// specified name, the leftmost column is returned.
pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
self.iter_names()
.position(|n| n == name)
.map(|i| (i, &self.typ.column_types[i]))
}
/// Gets the name of the `i`th column.
///
/// # Panics
///
/// Panics if `i` is not a valid column index.
pub fn get_name(&self, i: usize) -> &ColumnName {
&self.names[i]
}
/// Mutably gets the name of the `i`th column.
///
/// # Panics
///
/// Panics if `i` is not a valid column index.
pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
&mut self.names[i]
}
/// Gets the name of the `i`th column if that column name is unambiguous.
///
/// If at least one other column has the same name as the `i`th column,
/// returns `None`. If the `i`th column has no name, returns `None`.
///
/// # Panics
///
/// Panics if `i` is not a valid column index.
pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
let name = &self.names[i];
if self.iter_names().filter(|n| *n == name).count() == 1 {
Some(name)
} else {
None
}
}
}
/// The name of a column in a [`RelationDesc`].
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct ColumnName(pub(crate) String);
impl ColumnName {
/// Returns this column name as a `str`.
pub fn as_str(&self) -> &str {
&self.0
}
/// Returns a mutable reference to the string underlying this column name.
pub fn as_mut_str(&mut self) -> &mut String {
&mut self.0
}
}

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,28 @@
use serde::{Deserialize, Serialize};
// TODO(discord9): more error types
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum DataflowError {
EvalError(Box<EvalError>),
}
impl From<EvalError> for DataflowError {
fn from(e: EvalError) -> Self {
DataflowError::EvalError(Box::new(e))
}
}
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum EvalError {
DivisionByZero,
TypeMismatch(String),
InvalidArgument(String),
Internal(String),
}
#[test]
fn tell_goal() {
use differential_dataflow::ExchangeData;
fn a<T: ExchangeData>(_: T) {}
a(DataflowError::from(EvalError::DivisionByZero));
}

View File

@@ -0,0 +1,4 @@
//! TODO: Storage Layer: wrap grpc write request for providing definite collection for streaming process, and able to send read request should random access is needed
//! and store result of stream processing
pub(crate) mod errors;

View File

150
src/flow/src/util/buffer.rs Normal file
View File

@@ -0,0 +1,150 @@
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::Data;
use timely::communication::Push;
use timely::dataflow::channels::Bundle;
use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::operators::{Capability, InputCapability};
use timely::progress::Timestamp;
/// A buffer that consolidates updates
///
/// The buffer implements a wrapper around [OutputHandle] consolidating elements pushed to it. It is
/// backed by a capacity-limited buffer, which means that compaction only occurs within the
/// dimensions of the buffer, i.e. the number of unique keys is less than half of the buffer's
/// capacity.
///
/// A cap is retained whenever the current time changes to be able to flush on drop or when the time
/// changes again.
///
/// The buffer is filled with updates until it reaches its capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovered less than
/// half of the buffer's capacity, at which point the buffer will be shipped.
///
/// The buffer retains a capability to send data on flush. It will flush all data once dropped, if
/// time changes, or if the buffer capacity is reached.
pub struct ConsolidateBuffer<'a, 'b, T, D: Data, R: Semigroup, P>
where
P: Push<Bundle<T, (D, T, R)>> + 'a,
T: Data + Timestamp + 'a,
D: 'a,
{
// a buffer for records, to send at self.cap
// Invariant: Buffer only contains data if cap is Some.
buffer: Vec<(D, T, R)>,
output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>,
cap: Option<Capability<T>>,
port: usize,
previous_len: usize,
}
impl<'a, 'b, T, D: Data, R: Semigroup, P> ConsolidateBuffer<'a, 'b, T, D, R, P>
where
T: Data + Timestamp + 'a,
P: Push<Bundle<T, (D, T, R)>> + 'a,
{
/// Create a new [ConsolidateBuffer], wrapping the provided session.
///
/// * `output_handle`: The output to send data to.
/// * 'port': The output port to retain capabilities for.
pub fn new(output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>, port: usize) -> Self {
Self {
output_handle,
port,
cap: None,
buffer: Vec::with_capacity(::timely::container::buffer::default_capacity::<(D, T, R)>()),
previous_len: 0,
}
}
#[inline]
/// Provides an iterator of elements to the buffer
pub fn give_iterator<I: Iterator<Item = (D, T, R)>>(
&mut self,
cap: &InputCapability<T>,
iter: I,
) {
for item in iter {
self.give(cap, item);
}
}
/// Give an element to the buffer
pub fn give(&mut self, cap: &InputCapability<T>, data: (D, T, R)) {
// Retain a cap for the current time, which will be used on flush.
if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
// Flush on capability change
self.flush();
// Retain capability for the specified output port.
self.cap = Some(cap.delayed_for_output(cap.time(), self.port));
}
self.give_internal(data);
}
/// Give an element to the buffer, using a pre-fabricated capability. Note that the capability
/// must be valid for the associated output.
pub fn give_at(&mut self, cap: &Capability<T>, data: (D, T, R)) {
// Retain a cap for the current time, which will be used on flush.
if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
// Flush on capability change
self.flush();
// Retain capability.
self.cap = Some(cap.clone());
}
self.give_internal(data);
}
/// Give an element and possibly flush the buffer. Note that this needs to have access
/// to a capability, which the public functions ensure.
fn give_internal(&mut self, data: (D, T, R)) {
self.buffer.push(data);
// Limit, if possible, the lifetime of the allocations for data
// and consolidate smaller buffers if we're in the lucky case
// of a small domain for D
if self.buffer.len() >= 2 * self.previous_len {
// Consolidate while the consolidation frees at least half the buffer
consolidate_updates(&mut self.buffer);
if self.buffer.len() > self.buffer.capacity() / 2 {
self.flush();
} else {
self.previous_len = self.buffer.len();
}
// At this point, it is an invariant across give calls that self.previous_len
// will be in the interval [0, self.buffer.capacity() / 2]. So, we will enter
// this if-statement block again when self.buffer.len() == self.buffer.capacity()
// or earlier. If consolidation is not effective to keep self.buffer.len()
// below half capacity, then flushing when more than half-full will
// maintain the invariant.
}
}
/// Flush the internal buffer to the underlying session
pub fn flush(&mut self) {
if let Some(cap) = &self.cap {
self.output_handle.session(cap).give_vec(&mut self.buffer);
// Ensure that the capacity is at least equal to the default in case
// it was reduced by give_vec. Note that we cannot rely here on give_vec
// returning us a buffer with zero capacity.
if self.buffer.capacity() < ::timely::container::buffer::default_capacity::<(D, T, R)>()
{
let to_reserve = ::timely::container::buffer::default_capacity::<(D, T, R)>()
- self.buffer.capacity();
self.buffer.reserve_exact(to_reserve);
}
self.previous_len = 0;
}
}
}
impl<'a, 'b, T, D: Data, R: Semigroup, P> Drop for ConsolidateBuffer<'a, 'b, T, D, R, P>
where
P: Push<Bundle<T, (D, T, R)>> + 'a,
T: Data + Timestamp + 'a,
D: 'a,
{
fn drop(&mut self) {
self.flush();
}
}

7
src/flow/src/util/mod.rs Normal file
View File

@@ -0,0 +1,7 @@
//! utilitys including extend differential dataflow to deal with errors and etc.
mod buffer;
mod operator;
mod reduce;
pub use operator::CollectionExt;
pub use reduce::ReduceExt;

View File

@@ -0,0 +1,257 @@
use differential_dataflow::difference::{Multiply, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::{Batch, Trace, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandle, OperatorInfo, OutputHandle};
use timely::dataflow::operators::Capability;
use timely::dataflow::{Scope, Stream};
use timely::{Data, ExchangeData};
use crate::util::buffer::ConsolidateBuffer;
pub trait StreamExt<G, D1>
where
D1: Data,
G: Scope,
{
/// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
/// but the logic function can handle failures.
///
/// Creates a new dataflow operator that partitions its input stream by a
/// parallelization strategy `pact` and repeatedly invokes `logic`, the
/// function returned by the function passed as `constructor`. The `logic`
/// function can read to the input stream and write to either of two output
/// streams, where the first output stream represents successful
/// computations and the second output stream represents failed
/// computations.
fn unary_fallible<D2, E, B, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> (Stream<G, D2>, Stream<G, E>)
where
D2: Data,
E: Data,
B: FnOnce(
Capability<G::Timestamp>,
OperatorInfo,
) -> Box<
dyn FnMut(
&mut InputHandle<G::Timestamp, D1, P::Puller>,
&mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
&mut OutputHandle<G::Timestamp, E, Tee<G::Timestamp, E>>,
) + 'static,
>,
P: ParallelizationContract<G::Timestamp, D1>;
/// Like [`timely::dataflow::operators::map::Map::flat_map`], but `logic`
/// is allowed to fail. The first returned stream will contain the
/// successful applications of `logic`, while the second returned stream
/// will contain the failed applications.
fn flat_map_fallible<D2, E, I, L>(&self, name: &str, logic: L) -> (Stream<G, D2>, Stream<G, E>)
where
D2: Data,
E: Data,
I: IntoIterator<Item = Result<D2, E>>,
L: FnMut(D1) -> I + 'static;
}
/// Extension methods for differential [`Collection`]s.
pub trait CollectionExt<G, D1, R>
where
G: Scope,
R: Semigroup,
{
/// Creates a new empty collection in `scope`.
fn empty(scope: &G) -> Collection<G, D1, R>;
/// Like [`Collection::map`], but `logic` is allowed to fail. The first
/// returned collection will contain successful applications of `logic`,
/// while the second returned collection will contain the failed
/// applications.
fn map_fallible<D2, E, L>(
&self,
name: &str,
mut logic: L,
) -> (Collection<G, D2, R>, Collection<G, E, R>)
where
D2: Data,
E: Data,
L: FnMut(D1) -> Result<D2, E> + 'static,
{
self.flat_map_fallible(name, move |record| Some(logic(record)))
}
/// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
/// returned collection will contain the successful applications of `logic`,
/// while the second returned collection will contain the failed
/// applications.
fn flat_map_fallible<D2, E, I, L>(
&self,
name: &str,
logic: L,
) -> (Collection<G, D2, R>, Collection<G, E, R>)
where
D2: Data,
E: Data,
I: IntoIterator<Item = Result<D2, E>>,
L: FnMut(D1) -> I + 'static;
/// Replaces each record with another, with a new difference type.
///
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
/// and move the data into the difference component. This will allow differential dataflow to update in-place.
fn explode_one<D2, R2, L>(&self, logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
where
D2: differential_dataflow::Data,
R2: Semigroup + Multiply<R>,
<R2 as Multiply<R>>::Output: Data + Semigroup,
L: FnMut(D1) -> (D2, R2) + 'static,
G::Timestamp: Lattice;
}
impl<G, D1> StreamExt<G, D1> for Stream<G, D1>
where
D1: Data,
G: Scope,
{
fn unary_fallible<D2, E, B, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> (Stream<G, D2>, Stream<G, E>)
where
D2: Data,
E: Data,
B: FnOnce(
Capability<G::Timestamp>,
OperatorInfo,
) -> Box<
dyn FnMut(
&mut InputHandle<G::Timestamp, D1, P::Puller>,
&mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
&mut OutputHandle<G::Timestamp, E, Tee<G::Timestamp, E>>,
) + 'static,
>,
P: ParallelizationContract<G::Timestamp, D1>,
{
let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
builder.set_notify(false);
let operator_info = builder.operator_info();
let mut input = builder.new_input(self, pact);
let (mut ok_output, ok_stream) = builder.new_output();
let (mut err_output, err_stream) = builder.new_output();
builder.build(move |mut capabilities| {
// `capabilities` should be a single-element vector.
let capability = capabilities.pop().unwrap();
let mut logic = constructor(capability, operator_info);
move |_frontiers| {
let mut ok_output_handle = ok_output.activate();
let mut err_output_handle = err_output.activate();
logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
}
});
(ok_stream, err_stream)
}
#[allow(clippy::redundant_closure)]
fn flat_map_fallible<D2, E, I, L>(
&self,
name: &str,
mut logic: L,
) -> (Stream<G, D2>, Stream<G, E>)
where
D2: Data,
E: Data,
I: IntoIterator<Item = Result<D2, E>>,
L: FnMut(D1) -> I + 'static,
{
let mut storage = Vec::new();
self.unary_fallible(Pipeline, name, move |_, _| {
Box::new(move |input, ok_output, err_output| {
input.for_each(|time, data| {
let mut ok_session = ok_output.session(&time);
let mut err_session = err_output.session(&time);
data.swap(&mut storage);
for r in storage.drain(..).flat_map(|d1| logic(d1)) {
match r {
Ok(d2) => ok_session.give(d2),
Err(e) => err_session.give(e),
}
}
})
})
})
}
}
impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
where
G: Scope,
G::Timestamp: Data,
D1: Data,
R: Semigroup,
{
fn empty(scope: &G) -> Collection<G, D1, R> {
operator::empty(scope).as_collection()
}
fn flat_map_fallible<D2, E, I, L>(
&self,
name: &str,
mut logic: L,
) -> (Collection<G, D2, R>, Collection<G, E, R>)
where
D2: Data,
E: Data,
I: IntoIterator<Item = Result<D2, E>>,
L: FnMut(D1) -> I + 'static,
{
let (ok_stream, err_stream) = self.inner.flat_map_fallible(name, move |(d1, t, r)| {
logic(d1).into_iter().map(move |res| match res {
Ok(d2) => Ok((d2, t.clone(), r.clone())),
Err(e) => Err((e, t.clone(), r.clone())),
})
});
(ok_stream.as_collection(), err_stream.as_collection())
}
fn explode_one<D2, R2, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
where
D2: differential_dataflow::Data,
R2: Semigroup + Multiply<R>,
<R2 as Multiply<R>>::Output: Data + Semigroup,
L: FnMut(D1) -> (D2, R2) + 'static,
G::Timestamp: Lattice,
{
self.inner
.unary(Pipeline, "ExplodeOne", move |_, _| {
let mut buffer = Vec::new();
move |input, output| {
let mut out = ConsolidateBuffer::new(output, 0);
input.for_each(|time, data| {
data.swap(&mut buffer);
out.give_iterator(
&time,
buffer.drain(..).map(|(x, t, d)| {
let (x, d2) = logic(x);
(x, t, d2.multiply(&d))
}),
);
});
}
})
.as_collection()
}
}

View File

@@ -0,0 +1,68 @@
use differential_dataflow::difference::{Abelian, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::{Batch, Trace, TraceReader};
use differential_dataflow::Data;
use timely::dataflow::Scope;
/// Extension trait for `ReduceCore`, currently providing a reduction based
/// on an operator-pair approach.
pub trait ReduceExt<G: Scope, K: Data, V: Data, R: Semigroup>
where
G::Timestamp: Lattice + Ord,
{
/// This method produces a reduction pair based on the same input arrangement. Each reduction
/// in the pair operates with its own logic and the two output arrangements from the reductions
/// are produced as a result. The method is useful for reductions that need to present different
/// output views on the same input data. An example is producing an error-free reduction output
/// along with a separate error output indicating when the error-free output is valid.
fn reduce_pair<L1, T1, L2, T2>(
&self,
name1: &str,
name2: &str,
logic1: L1,
logic2: L2,
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
where
T1: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
T1::Val: Data,
T1::R: Abelian,
T1::Batch: Batch,
L1: FnMut(&K, &[(&V, R)], &mut Vec<(T1::Val, T1::R)>) + 'static,
T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
T2::Val: Data,
T2::R: Abelian,
T2::Batch: Batch,
L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static;
}
impl<G: Scope, K: Data, V: Data, Tr, R: Semigroup> ReduceExt<G, K, V, R> for Arranged<G, Tr>
where
G::Timestamp: Lattice + Ord,
Tr: TraceReader<Key = K, Val = V, Time = G::Timestamp, R = R> + Clone + 'static,
{
fn reduce_pair<L1, T1, L2, T2>(
&self,
name1: &str,
name2: &str,
logic1: L1,
logic2: L2,
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
where
T1: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
T1::Val: Data,
T1::R: Abelian,
T1::Batch: Batch,
L1: FnMut(&K, &[(&V, R)], &mut Vec<(T1::Val, T1::R)>) + 'static,
T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
T2::Val: Data,
T2::R: Abelian,
T2::Batch: Batch,
L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static,
{
let arranged1 = self.reduce_abelian::<L1, T1>(name1, logic1);
let arranged2 = self.reduce_abelian::<L2, T2>(name2, logic2);
(arranged1, arranged2)
}
}