mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
16 Commits
chore/debu
...
stream
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f995204060 | ||
|
|
93561291e4 | ||
|
|
9f59d68391 | ||
|
|
51083b12bd | ||
|
|
c80165c377 | ||
|
|
76d8709774 | ||
|
|
2cf7d6d569 | ||
|
|
045c8079e6 | ||
|
|
54f2f6495f | ||
|
|
2798d266f5 | ||
|
|
824d03a642 | ||
|
|
47f41371d0 | ||
|
|
d702b6e5c4 | ||
|
|
13c02f3f92 | ||
|
|
b52eb2313e | ||
|
|
d422bc8401 |
116
Cargo.lock
generated
116
Cargo.lock
generated
@@ -8,6 +8,23 @@ version = "0.11.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
|
||||
|
||||
[[package]]
|
||||
name = "abomonation"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "56e72913c99b1f927aa7bd59a41518fdd9995f63ffc8760f211609e0241c4fb2"
|
||||
|
||||
[[package]]
|
||||
name = "abomonation_derive"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e50e2a046af56a864c62d97b7153fda72c596e646be1b0c7963736821f6e1efa"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"synstructure",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "addr2line"
|
||||
version = "0.20.0"
|
||||
@@ -1627,6 +1644,14 @@ version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
|
||||
|
||||
[[package]]
|
||||
name = "columnation"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/frankmcsherry/columnation#eb8e20c10e748dcbfe6266be8e24e14422d3de0f"
|
||||
dependencies = [
|
||||
"paste",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "comfy-table"
|
||||
version = "7.0.1"
|
||||
@@ -2832,6 +2857,19 @@ version = "0.1.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
|
||||
|
||||
[[package]]
|
||||
name = "differential-dataflow"
|
||||
version = "0.12.0"
|
||||
source = "git+https://github.com/TimelyDataflow/differential-dataflow#2b9ac68aab9a1bf3fc3e4c12fcabea9c9d1ecc6a"
|
||||
dependencies = [
|
||||
"abomonation",
|
||||
"abomonation_derive",
|
||||
"fnv",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"timely",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.10.7"
|
||||
@@ -3250,6 +3288,19 @@ dependencies = [
|
||||
"spin 0.9.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flow"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"common-telemetry",
|
||||
"datafusion-expr",
|
||||
"datafusion-substrait",
|
||||
"datatypes",
|
||||
"differential-dataflow",
|
||||
"serde",
|
||||
"timely",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
@@ -9791,6 +9842,18 @@ version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
|
||||
|
||||
[[package]]
|
||||
name = "synstructure"
|
||||
version = "0.12.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "system-configuration"
|
||||
version = "0.5.1"
|
||||
@@ -10169,6 +10232,59 @@ dependencies = [
|
||||
"time-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "timely"
|
||||
version = "0.12.0"
|
||||
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
|
||||
dependencies = [
|
||||
"abomonation",
|
||||
"abomonation_derive",
|
||||
"crossbeam-channel",
|
||||
"futures-util",
|
||||
"getopts",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"timely_bytes",
|
||||
"timely_communication",
|
||||
"timely_container",
|
||||
"timely_logging",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "timely_bytes"
|
||||
version = "0.12.0"
|
||||
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
|
||||
|
||||
[[package]]
|
||||
name = "timely_communication"
|
||||
version = "0.12.0"
|
||||
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
|
||||
dependencies = [
|
||||
"abomonation",
|
||||
"abomonation_derive",
|
||||
"bincode",
|
||||
"crossbeam-channel",
|
||||
"getopts",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"timely_bytes",
|
||||
"timely_logging",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "timely_container"
|
||||
version = "0.12.0"
|
||||
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
|
||||
dependencies = [
|
||||
"columnation",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "timely_logging"
|
||||
version = "0.12.0"
|
||||
source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2a7450809215145f3e940234a"
|
||||
|
||||
[[package]]
|
||||
name = "timsort"
|
||||
version = "0.1.2"
|
||||
|
||||
@@ -46,6 +46,7 @@ members = [
|
||||
"src/sql",
|
||||
"src/storage",
|
||||
"src/store-api",
|
||||
"src/flow",
|
||||
"src/table",
|
||||
"src/table-procedure",
|
||||
"tests-integration",
|
||||
|
||||
25
src/flow/Cargo.toml
Normal file
25
src/flow/Cargo.toml
Normal file
@@ -0,0 +1,25 @@
|
||||
[package]
|
||||
name = "flow"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
# use version from crates.io for now to prevent version slewing
|
||||
# disable default-features which include `abomonaion` which we don't need for IPC
|
||||
# timely = {version = "0.12.0", default-features = false, features = ["bincode"]}
|
||||
# differential-dataflow = "0.12.0"
|
||||
# timely = "0.12.0"
|
||||
# differential-dataflow = "0.12.0"
|
||||
# TODO(discord9): fork later for fixed version git dependency
|
||||
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = [
|
||||
"bincode",
|
||||
] }
|
||||
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } #, rev = "99fa67db" }
|
||||
datafusion-expr.workspace = true
|
||||
datafusion-substrait.workspace = true
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
datatypes = { path = "../datatypes" }
|
||||
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
3
src/flow/src/adapter/mod.rs
Normal file
3
src/flow/src/adapter/mod.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
//! for getting data from source and sending results to sink
|
||||
//! and communicating with other parts of the database
|
||||
//! also commands storage and computation layer
|
||||
22
src/flow/src/compute/compute_state.rs
Normal file
22
src/flow/src/compute/compute_state.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use crate::expr::GlobalId;
|
||||
|
||||
/// Worker-local state that is maintained across dataflows.
|
||||
///
|
||||
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
|
||||
/// done between data ingress and egress.
|
||||
pub struct ComputeState {
|
||||
/// State kept for each installed compute collection.
|
||||
///
|
||||
/// Each collection has exactly one frontier.
|
||||
/// How the frontier is communicated depends on the collection type:
|
||||
/// * Frontiers of indexes are equal to the frontier of their corresponding traces in the
|
||||
/// `TraceManager`.
|
||||
/// * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
|
||||
/// * Subscribes report their frontiers through the `subscribe_response_buffer`.
|
||||
pub collections: BTreeMap<GlobalId, CollectionState>,
|
||||
}
|
||||
|
||||
/// State maintained for a compute collection.
|
||||
pub struct CollectionState {}
|
||||
743
src/flow/src/compute/context.rs
Normal file
743
src/flow/src/compute/context.rs
Normal file
@@ -0,0 +1,743 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use differential_dataflow::lattice::Lattice;
|
||||
use differential_dataflow::operators::arrange::Arranged;
|
||||
use differential_dataflow::trace::wrappers::enter::TraceEnter;
|
||||
use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
|
||||
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
|
||||
use differential_dataflow::{Collection, Data};
|
||||
use timely::communication::message::RefOrMut;
|
||||
use timely::dataflow::operators::generic::OutputHandle;
|
||||
use timely::dataflow::operators::Capability;
|
||||
use timely::dataflow::scopes::Child;
|
||||
use timely::dataflow::{Scope, ScopeParent};
|
||||
use timely::progress::timestamp::Refines;
|
||||
use timely::progress::{Antichain, Timestamp};
|
||||
|
||||
use super::plan::Plan;
|
||||
use super::types::DataflowDescription;
|
||||
use crate::compute::render::RenderTimestamp;
|
||||
use crate::compute::typedefs::{TraceErrHandle, TraceRowHandle};
|
||||
use crate::expr::{GlobalId, Id, MapFilterProject, ScalarExpr};
|
||||
use crate::repr;
|
||||
use crate::repr::{Diff, Row};
|
||||
use crate::storage::errors::DataflowError;
|
||||
|
||||
// Local type definition to avoid the horror in signatures.
|
||||
pub(crate) type KeyArrangement<S, K, V> =
|
||||
Arranged<S, TraceRowHandle<K, V, <S as ScopeParent>::Timestamp, Diff>>;
|
||||
pub(crate) type Arrangement<S, V> = KeyArrangement<S, V, V>;
|
||||
pub(crate) type ErrArrangement<S> =
|
||||
Arranged<S, TraceErrHandle<DataflowError, <S as ScopeParent>::Timestamp, Diff>>;
|
||||
pub(crate) type ArrangementImport<S, V, T> = Arranged<
|
||||
S,
|
||||
TraceEnter<TraceFrontier<TraceRowHandle<V, V, T, Diff>>, <S as ScopeParent>::Timestamp>,
|
||||
>;
|
||||
pub(crate) type ErrArrangementImport<S, T> = Arranged<
|
||||
S,
|
||||
TraceEnter<
|
||||
TraceFrontier<TraceErrHandle<DataflowError, T, Diff>>,
|
||||
<S as ScopeParent>::Timestamp,
|
||||
>,
|
||||
>;
|
||||
|
||||
/// Describes flavor of arrangement: local or imported trace.
|
||||
#[derive(Clone)]
|
||||
pub enum ArrangementFlavor<S: Scope, V: Data, T = repr::Timestamp>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
/// A dataflow-local arrangement.
|
||||
Local(Arrangement<S, V>, ErrArrangement<S>),
|
||||
/// An imported trace from outside the dataflow.
|
||||
///
|
||||
/// The `GlobalId` identifier exists so that exports of this same trace
|
||||
/// can refer back to and depend on the original instance.
|
||||
Trace(
|
||||
GlobalId,
|
||||
ArrangementImport<S, V, T>,
|
||||
ErrArrangementImport<S, T>,
|
||||
),
|
||||
}
|
||||
|
||||
impl<S: Scope, T> ArrangementFlavor<S, Row, T>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
/// Presents `self` as a stream of updates.
|
||||
///
|
||||
/// This method presents the contents as they are, without further computation.
|
||||
/// If you have logic that could be applied to each record, consider using the
|
||||
/// `flat_map` methods which allows this and can reduce the work done.
|
||||
pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
|
||||
match &self {
|
||||
ArrangementFlavor::Local(oks, errs) => (
|
||||
oks.as_collection(move |k: &Row, v: &Row| {
|
||||
// type annotated because rust-analzyer can't infer the type due to being complex closures
|
||||
// see https://github.com/rust-lang/rust-analyzer/issues/6338
|
||||
let mut k = k.clone();
|
||||
k.extend(v.clone().into_iter());
|
||||
k
|
||||
}),
|
||||
errs.as_collection(|k, &()| k.clone()),
|
||||
),
|
||||
ArrangementFlavor::Trace(_, oks, errs) => (
|
||||
oks.as_collection(move |k, v| {
|
||||
let mut k = k.clone();
|
||||
k.extend(v.clone().into_iter());
|
||||
k
|
||||
}),
|
||||
errs.as_collection(|k, &()| k.clone()),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs and applies logic to elements of `self` and returns the results.
|
||||
///
|
||||
/// `constructor` takes a permutation and produces the logic to apply on elements. The logic
|
||||
/// conceptually receives `(&Row, &Row)` pairs in the form of a slice. Only after borrowing
|
||||
/// the elements and applying the permutation the datums will be in the expected order.
|
||||
///
|
||||
/// If `key` is set, this is a promise that `logic` will produce no results on
|
||||
/// records for which the key does not evaluate to the value. This is used to
|
||||
/// leap directly to exactly those records.
|
||||
pub fn flat_map<I, C, L>(
|
||||
&self,
|
||||
key: Option<Row>,
|
||||
constructor: C,
|
||||
) -> (
|
||||
timely::dataflow::Stream<S, I::Item>,
|
||||
Collection<S, DataflowError, Diff>,
|
||||
)
|
||||
where
|
||||
I: IntoIterator,
|
||||
I::Item: Data,
|
||||
C: FnOnce() -> L,
|
||||
L: for<'a, 'b> FnMut(&'a [&'b RefOrMut<'b, Row>], &'a S::Timestamp, &'a Diff) -> I
|
||||
+ 'static,
|
||||
{
|
||||
// Set a number of tuples after which the operator should yield.
|
||||
// This allows us to remain responsive even when enumerating a substantial
|
||||
// arrangement, as well as provides time to accumulate our produced output.
|
||||
let refuel = 1000000;
|
||||
|
||||
match &self {
|
||||
ArrangementFlavor::Local(oks, errs) => {
|
||||
let mut logic = constructor();
|
||||
let oks = CollectionBundle::<S, Row, T>::flat_map_core(
|
||||
oks,
|
||||
key,
|
||||
move |k, v, t, d| logic(&[&k, &v], t, d),
|
||||
refuel,
|
||||
);
|
||||
let errs = errs.as_collection(|k, &()| k.clone());
|
||||
(oks, errs)
|
||||
}
|
||||
ArrangementFlavor::Trace(_, oks, errs) => {
|
||||
let mut logic = constructor();
|
||||
let oks = CollectionBundle::<S, Row, T>::flat_map_core(
|
||||
oks,
|
||||
key,
|
||||
move |k, v, t, d| logic(&[&k, &v], t, d),
|
||||
refuel,
|
||||
);
|
||||
let errs = errs.as_collection(|k, &()| k.clone());
|
||||
(oks, errs)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Scope, V: Data, T> ArrangementFlavor<S, V, T>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
pub fn scope(&self) -> S {
|
||||
match self {
|
||||
ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
|
||||
ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Brings the arrangement flavor into a region.
|
||||
pub fn enter_region<'a>(
|
||||
&self,
|
||||
region: &Child<'a, S, S::Timestamp>,
|
||||
) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, V, T> {
|
||||
match self {
|
||||
ArrangementFlavor::Local(oks, errs) => {
|
||||
ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
|
||||
}
|
||||
ArrangementFlavor::Trace(gid, oks, errs) => {
|
||||
ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S: Scope, V: Data, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, V, T>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
/// Extracts the arrangement flavor from a region.
|
||||
pub fn leave_region(&self) -> ArrangementFlavor<S, V, T> {
|
||||
match self {
|
||||
ArrangementFlavor::Local(oks, errs) => {
|
||||
ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
|
||||
}
|
||||
ArrangementFlavor::Trace(gid, oks, errs) => {
|
||||
ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Context<S, V: Data, T = repr::Timestamp>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S: Scope,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
/// The scope within which all managed collections exist.
|
||||
///
|
||||
/// It is an error to add any collections not contained in this scope.
|
||||
pub(crate) scope: S,
|
||||
/// The debug name of the dataflow associated with this context.
|
||||
pub debug_name: String,
|
||||
/// The Timely ID of the dataflow associated with this context.
|
||||
pub dataflow_id: usize,
|
||||
/// Frontier before which updates should not be emitted.
|
||||
///
|
||||
/// We *must* apply it to sinks, to ensure correct outputs.
|
||||
/// We *should* apply it to sources and imported traces, because it improves performance.
|
||||
pub since_frontier: Antichain<T>,
|
||||
/// Frontier after which updates should not be emitted.
|
||||
/// Used to limit the amount of work done when appropriate.
|
||||
pub until_frontier: Antichain<T>,
|
||||
/// Bindings of identifiers to collections.
|
||||
pub bindings: BTreeMap<Id, CollectionBundle<S, V, T>>,
|
||||
}
|
||||
|
||||
impl<S: Scope, V: Data> Context<S, V>
|
||||
where
|
||||
S::Timestamp: Lattice + Refines<repr::Timestamp>,
|
||||
{
|
||||
/// TODO(discord9)" DataflowDesc & Plan & etc.
|
||||
/// Creates a new empty Context from given dataflow
|
||||
pub fn for_dataflow_in<Plan>(dataflow: &DataflowDescription<Plan, ()>, scope: S) -> Self {
|
||||
let dataflow_id = scope.addr()[0];
|
||||
let since_frontier = dataflow
|
||||
.as_of
|
||||
.clone()
|
||||
.unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
|
||||
// TODO(discord9)=: get since_frontier and until_frontier from dataflow_desc
|
||||
Self {
|
||||
scope,
|
||||
debug_name: dataflow.debug_name.clone(),
|
||||
dataflow_id,
|
||||
since_frontier,
|
||||
until_frontier: dataflow.until.clone(),
|
||||
bindings: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Scope, V: Data, T: Lattice> Context<S, V, T>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
/// Insert a collection bundle by an identifier.
|
||||
///
|
||||
/// This is expected to be used to install external collections (sources, indexes, other views),
|
||||
/// as well as for `Let` bindings of local collections.
|
||||
pub fn insert_id(
|
||||
&mut self,
|
||||
id: Id,
|
||||
collection: CollectionBundle<S, V, T>,
|
||||
) -> Option<CollectionBundle<S, V, T>> {
|
||||
self.bindings.insert(id, collection)
|
||||
}
|
||||
|
||||
/// Remove a collection bundle by an identifier.
|
||||
///
|
||||
/// The primary use of this method is uninstalling `Let` bindings.
|
||||
pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, V, T>> {
|
||||
self.bindings.remove(&id)
|
||||
}
|
||||
/// Melds a collection bundle to whatever exists.
|
||||
#[allow(clippy::map_entry)]
|
||||
pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, V, T>) {
|
||||
if !self.bindings.contains_key(&id) {
|
||||
self.bindings.insert(id, collection);
|
||||
} else {
|
||||
let binding = self
|
||||
.bindings
|
||||
.get_mut(&id)
|
||||
.expect("Binding verified to exist");
|
||||
if collection.collection.is_some() {
|
||||
binding.collection = collection.collection;
|
||||
}
|
||||
for (key, flavor) in collection.arranged.into_iter() {
|
||||
binding.arranged.insert(key, flavor);
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Look up a collection bundle by an identifier.
|
||||
pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, V, T>> {
|
||||
self.bindings.get(&id).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
type ResultCollection<S, V> = (Collection<S, V, Diff>, Collection<S, DataflowError, Diff>);
|
||||
|
||||
/// A bundle of the various ways a collection can be represented.
|
||||
///
|
||||
/// This type maintains the invariant that it does contain at least one valid
|
||||
/// source of data, either a collection or at least one arrangement.
|
||||
#[derive(Clone)]
|
||||
pub struct CollectionBundle<S, V, T = repr::Timestamp>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S: Scope,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
V: Data,
|
||||
{
|
||||
pub(crate) collection: Option<ResultCollection<S, V>>,
|
||||
/// TODO(discord9): impl: 1. ScalarExpr(Could be from substrait), 2. Arrangement
|
||||
pub(crate) arranged: BTreeMap<Vec<ScalarExpr>, ArrangementFlavor<S, V, T>>,
|
||||
}
|
||||
|
||||
impl<S: Scope, V: Data, T: Lattice> CollectionBundle<S, V, T>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
/// Construct a new collection bundle from update streams.
|
||||
pub fn from_collections(
|
||||
oks: Collection<S, V, Diff>,
|
||||
errs: Collection<S, DataflowError, Diff>,
|
||||
) -> Self {
|
||||
Self {
|
||||
collection: Some((oks, errs)),
|
||||
arranged: BTreeMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts arrangements by the expressions on which they are keyed.
|
||||
pub fn from_expressions(
|
||||
exprs: Vec<ScalarExpr>,
|
||||
arrangements: ArrangementFlavor<S, V, T>,
|
||||
) -> Self {
|
||||
let mut arranged = BTreeMap::new();
|
||||
arranged.insert(exprs, arrangements);
|
||||
Self {
|
||||
collection: None,
|
||||
arranged,
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts arrangements by the columns on which they are keyed.
|
||||
pub fn from_columns<I: IntoIterator<Item = usize>>(
|
||||
columns: I,
|
||||
arrangements: ArrangementFlavor<S, V, T>,
|
||||
) -> Self {
|
||||
let mut keys = Vec::new();
|
||||
for column in columns {
|
||||
keys.push(ScalarExpr::Column(column));
|
||||
}
|
||||
Self::from_expressions(keys, arrangements)
|
||||
}
|
||||
|
||||
/// The scope containing the collection bundle.
|
||||
pub fn scope(&self) -> S {
|
||||
if let Some((oks, _errs)) = &self.collection {
|
||||
oks.inner.scope()
|
||||
} else {
|
||||
self.arranged
|
||||
.values()
|
||||
.next()
|
||||
.expect("Must contain a valid collection")
|
||||
.scope()
|
||||
}
|
||||
}
|
||||
|
||||
/// Brings the collection bundle into a region.
|
||||
pub fn enter_region<'a>(
|
||||
&self,
|
||||
region: &Child<'a, S, S::Timestamp>,
|
||||
) -> CollectionBundle<Child<'a, S, S::Timestamp>, V, T> {
|
||||
CollectionBundle {
|
||||
collection: self
|
||||
.collection
|
||||
.as_ref()
|
||||
.map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
|
||||
arranged: self
|
||||
.arranged
|
||||
.iter()
|
||||
.map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T> CollectionBundle<S, repr::Row, T>
|
||||
where
|
||||
T: timely::progress::Timestamp + Lattice,
|
||||
S: Scope,
|
||||
S::Timestamp: Refines<T> + Lattice + RenderTimestamp,
|
||||
{
|
||||
/// Presents `self` as a stream of updates, having been subjected to `mfp`.
|
||||
///
|
||||
/// This operator is able to apply the logic of `mfp` early, which can substantially
|
||||
/// reduce the amount of data produced when `mfp` is non-trivial.
|
||||
///
|
||||
/// The `key_val` argument, when present, indicates that a specific arrangement should
|
||||
/// be used, and if, in addition, the `val` component is present,
|
||||
/// that we can seek to the supplied row.
|
||||
pub fn as_collection_core(
|
||||
&self,
|
||||
mut mfp: MapFilterProject,
|
||||
key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
|
||||
until: Antichain<repr::Timestamp>,
|
||||
) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
|
||||
mfp.optimize();
|
||||
|
||||
let mfp_plan = mfp.into_plan().unwrap();
|
||||
|
||||
// If the MFP is trivial, we can just call `as_collection`.
|
||||
// In the case that we weren't going to apply the `key_val` optimization,
|
||||
// this path results in a slightly smaller and faster
|
||||
// dataflow graph, and is intended to fix
|
||||
let has_key_val = matches!(&key_val, Some((_key, Some(_val))));
|
||||
|
||||
if mfp_plan.is_identity() && !has_key_val {
|
||||
let key = key_val.map(|(k, _v)| k);
|
||||
return self.as_specific_collection(key.as_deref());
|
||||
}
|
||||
let (stream, errors) = self.flat_map(key_val, || {
|
||||
let until = std::rc::Rc::new(until);
|
||||
// this logic get executed every time a new row arrives
|
||||
move |row_parts, time, diff| {
|
||||
let until = std::rc::Rc::clone(&until);
|
||||
let row_iters = row_parts
|
||||
.iter()
|
||||
.flat_map(|row| (**row).to_owned().into_iter());
|
||||
let mut datums_local = Vec::new();
|
||||
datums_local.extend(row_iters);
|
||||
let time = time.clone();
|
||||
let event_time: repr::Timestamp = *time.clone().event_time();
|
||||
mfp_plan
|
||||
.evaluate::<DataflowError, _>(
|
||||
&mut datums_local,
|
||||
event_time,
|
||||
*diff,
|
||||
move |time| !until.less_equal(time),
|
||||
)
|
||||
.map(move |x| match x {
|
||||
Ok((row, event_time, diff)) => {
|
||||
// Copy the whole time, and re-populate event time.
|
||||
let mut time: S::Timestamp = time.clone();
|
||||
*time.event_time() = event_time;
|
||||
Ok((row, time, diff))
|
||||
}
|
||||
Err((e, event_time, diff)) => {
|
||||
// Copy the whole time, and re-populate event time.
|
||||
let mut time: S::Timestamp = time.clone();
|
||||
*time.event_time() = event_time;
|
||||
Err((e, time, diff))
|
||||
}
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
use timely::dataflow::operators::ok_err::OkErr;
|
||||
let (oks, errs) = stream.ok_err(|x| x);
|
||||
|
||||
use differential_dataflow::AsCollection;
|
||||
let oks = oks.as_collection();
|
||||
let errs = errs.as_collection();
|
||||
(oks, errors.concat(&errs))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S: Scope, V: Data, T> CollectionBundle<Child<'a, S, S::Timestamp>, V, T>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
/// Extracts the collection bundle from a region.
|
||||
pub fn leave_region(&self) -> CollectionBundle<S, V, T> {
|
||||
CollectionBundle {
|
||||
collection: self
|
||||
.collection
|
||||
.as_ref()
|
||||
.map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
|
||||
arranged: self
|
||||
.arranged
|
||||
.iter()
|
||||
.map(|(key, bundle)| (key.clone(), bundle.leave_region()))
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Scope, T: Lattice> CollectionBundle<S, Row, T>
|
||||
where
|
||||
T: Timestamp + Lattice,
|
||||
S::Timestamp: Lattice + Refines<T>,
|
||||
{
|
||||
/// Asserts that the arrangement for a specific key
|
||||
/// (or the raw collection for no key) exists,
|
||||
/// and returns the corresponding collection.
|
||||
///
|
||||
/// This returns the collection as-is, without
|
||||
/// doing any unthinning transformation.
|
||||
/// Therefore, it should be used when the appropriate transformation
|
||||
/// was planned as part of a following MFP.
|
||||
pub fn as_specific_collection(
|
||||
&self,
|
||||
key: Option<&[ScalarExpr]>,
|
||||
) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
|
||||
// Any operator that uses this method was told to use a particular
|
||||
// collection during LIR planning, where we should have made
|
||||
// sure that that collection exists.
|
||||
//
|
||||
// If it doesn't, we panic.
|
||||
match key {
|
||||
None => self
|
||||
.collection
|
||||
.clone()
|
||||
.expect("The unarranged collection doesn't exist."),
|
||||
Some(key) => self
|
||||
.arranged
|
||||
.get(key)
|
||||
.unwrap_or_else(|| panic!("The collection arranged by {:?} doesn't exist.", key))
|
||||
.as_collection(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs and applies logic to elements of a collection and returns the results.
|
||||
///
|
||||
/// `constructor` takes a permutation and produces the logic to apply on elements. The logic
|
||||
/// conceptually receives `(&Row, &Row)` pairs in the form of a slice. Only after borrowing
|
||||
/// the elements and applying the permutation the datums will be in the expected order.
|
||||
///
|
||||
/// If `key_val` is set, this is a promise that `logic` will produce no results on
|
||||
/// records for which the key does not evaluate to the value. This is used when we
|
||||
/// have an arrangement by that key to leap directly to exactly those records.
|
||||
/// It is important that `logic` still guard against data that does not satisfy
|
||||
/// this constraint, as this method does not statically know that it will have
|
||||
/// that arrangement.
|
||||
pub fn flat_map<I, C, L>(
|
||||
&self,
|
||||
key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
|
||||
constructor: C,
|
||||
) -> (
|
||||
timely::dataflow::Stream<S, I::Item>,
|
||||
Collection<S, DataflowError, Diff>,
|
||||
)
|
||||
where
|
||||
I: IntoIterator,
|
||||
I::Item: Data,
|
||||
C: FnOnce() -> L,
|
||||
L: for<'a, 'b> FnMut(&'a [&'b RefOrMut<'b, Row>], &'a S::Timestamp, &'a Diff) -> I
|
||||
+ 'static,
|
||||
{
|
||||
// If `key_val` is set, we should have use the corresponding arrangement.
|
||||
// If there isn't one, that implies an error in the contract between
|
||||
// key-production and available arrangements.
|
||||
if let Some((key, val)) = key_val {
|
||||
let flavor = self
|
||||
.arrangement(&key)
|
||||
.expect("Should have ensured during planning that this arrangement exists.");
|
||||
flavor.flat_map(val, constructor)
|
||||
} else {
|
||||
use timely::dataflow::operators::Map;
|
||||
let (oks, errs) = self
|
||||
.collection
|
||||
.clone()
|
||||
.expect("Invariant violated: CollectionBundle contains no collection.");
|
||||
let mut logic = constructor();
|
||||
(
|
||||
oks.inner
|
||||
.flat_map(move |(mut v, t, d)| logic(&[&RefOrMut::Mut(&mut v)], &t, &d)),
|
||||
errs,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Factored out common logic for using literal keys in general traces.
|
||||
///
|
||||
/// This logic is sufficiently interesting that we want to write it only
|
||||
/// once, and thereby avoid any skew in the two uses of the logic.
|
||||
///
|
||||
/// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
|
||||
/// where key and value are rows.
|
||||
fn flat_map_core<Tr, I, L>(
|
||||
trace: &Arranged<S, Tr>,
|
||||
key: Option<Row>,
|
||||
mut logic: L,
|
||||
refuel: usize,
|
||||
) -> timely::dataflow::Stream<S, I::Item>
|
||||
where
|
||||
Tr: TraceReader<Key = Row, Val = Row, Time = S::Timestamp, R = repr::Diff>
|
||||
+ Clone
|
||||
+ 'static,
|
||||
I: IntoIterator,
|
||||
I::Item: Data,
|
||||
L: for<'a, 'b> FnMut(
|
||||
RefOrMut<'b, Row>,
|
||||
RefOrMut<'b, Row>,
|
||||
&'a S::Timestamp,
|
||||
&'a repr::Diff,
|
||||
) -> I
|
||||
+ 'static,
|
||||
{
|
||||
let mode = if key.is_some() { "index" } else { "scan" };
|
||||
let name = format!("ArrangementFlatMap({})", mode);
|
||||
use timely::dataflow::channels::pact::Pipeline;
|
||||
use timely::dataflow::operators::Operator;
|
||||
trace.stream.unary(Pipeline, &name, move |_, info| {
|
||||
// Acquire an activator to reschedule the operator when it has unfinished work.
|
||||
use timely::scheduling::Activator;
|
||||
let activations = trace.stream.scope().activations();
|
||||
let activator = Activator::new(&info.address[..], activations);
|
||||
// Maintain a list of work to do, cursor to navigate and process.
|
||||
let mut todo = std::collections::VecDeque::new();
|
||||
move |input, output| {
|
||||
// First, dequeue all batches.
|
||||
input.for_each(|time, data| {
|
||||
let capability = time.retain();
|
||||
for batch in data.iter() {
|
||||
// enqueue a capability, cursor, and batch.
|
||||
todo.push_back(PendingWork::new(
|
||||
capability.clone(),
|
||||
batch.cursor(),
|
||||
batch.clone(),
|
||||
));
|
||||
}
|
||||
});
|
||||
|
||||
// Second, make progress on `todo`.
|
||||
let mut fuel = refuel;
|
||||
while !todo.is_empty() && fuel > 0 {
|
||||
todo.front_mut()
|
||||
.unwrap()
|
||||
.do_work(&key, &mut logic, &mut fuel, output);
|
||||
if fuel > 0 {
|
||||
todo.pop_front();
|
||||
}
|
||||
}
|
||||
// If we have not finished all work, re-activate the operator.
|
||||
if !todo.is_empty() {
|
||||
activator.activate();
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Look up an arrangement by the expressions that form the key.
|
||||
///
|
||||
/// The result may be `None` if no such arrangement exists, or it may be one of many
|
||||
/// "arrangement flavors" that represent the types of arranged data we might have.
|
||||
pub fn arrangement(&self, key: &[ScalarExpr]) -> Option<ArrangementFlavor<S, Row, T>> {
|
||||
self.arranged.get(key).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
struct PendingWork<C>
|
||||
where
|
||||
C: Cursor,
|
||||
C::Time: Timestamp,
|
||||
{
|
||||
capability: Capability<C::Time>,
|
||||
cursor: C,
|
||||
batch: C::Storage,
|
||||
}
|
||||
|
||||
/// Handle specialized to `Vec`-based container.
|
||||
type PendingOutputHandle<'a, C, I> = OutputHandle<
|
||||
'a,
|
||||
<C as Cursor>::Time,
|
||||
<I as IntoIterator>::Item,
|
||||
timely::dataflow::channels::pushers::Tee<<C as Cursor>::Time, <I as IntoIterator>::Item>,
|
||||
>;
|
||||
impl<C: Cursor> PendingWork<C>
|
||||
where
|
||||
C::Key: PartialEq,
|
||||
C::Time: Timestamp,
|
||||
{
|
||||
/// Create a new bundle of pending work, from the capability, cursor, and backing storage.
|
||||
fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
|
||||
Self {
|
||||
capability,
|
||||
cursor,
|
||||
batch,
|
||||
}
|
||||
}
|
||||
/// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
|
||||
fn do_work<I, L>(
|
||||
&mut self,
|
||||
key: &Option<C::Key>,
|
||||
logic: &mut L,
|
||||
fuel: &mut usize,
|
||||
output: &mut PendingOutputHandle<'_, C, I>,
|
||||
) where
|
||||
I: IntoIterator,
|
||||
I::Item: Data,
|
||||
L: for<'a, 'b> FnMut(
|
||||
RefOrMut<'b, C::Key>,
|
||||
RefOrMut<'b, C::Val>,
|
||||
&'a C::Time,
|
||||
&'a C::R,
|
||||
) -> I
|
||||
+ 'static,
|
||||
{
|
||||
// Attempt to make progress on this batch.
|
||||
let mut work: usize = 0;
|
||||
let mut session = output.session(&self.capability);
|
||||
if let Some(key) = key {
|
||||
if self.cursor.get_key(&self.batch) != Some(key) {
|
||||
self.cursor.seek_key(&self.batch, key);
|
||||
}
|
||||
if self.cursor.get_key(&self.batch) == Some(key) {
|
||||
while let Some(val) = self.cursor.get_val(&self.batch) {
|
||||
self.cursor.map_times(&self.batch, |time, diff| {
|
||||
for datum in logic(RefOrMut::Ref(key), RefOrMut::Ref(val), time, diff) {
|
||||
session.give(datum);
|
||||
work += 1;
|
||||
}
|
||||
});
|
||||
self.cursor.step_val(&self.batch);
|
||||
if work >= *fuel {
|
||||
*fuel = 0;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
while let Some(key) = self.cursor.get_key(&self.batch) {
|
||||
while let Some(val) = self.cursor.get_val(&self.batch) {
|
||||
self.cursor.map_times(&self.batch, |time, diff| {
|
||||
for datum in logic(RefOrMut::Ref(key), RefOrMut::Ref(val), time, diff) {
|
||||
session.give(datum);
|
||||
work += 1;
|
||||
}
|
||||
});
|
||||
self.cursor.step_val(&self.batch);
|
||||
if work >= *fuel {
|
||||
*fuel = 0;
|
||||
return;
|
||||
}
|
||||
}
|
||||
self.cursor.step_key(&self.batch);
|
||||
}
|
||||
}
|
||||
*fuel -= work;
|
||||
}
|
||||
}
|
||||
15
src/flow/src/compute/mod.rs
Normal file
15
src/flow/src/compute/mod.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
//! for generate dataflow from logical plan and computing the dataflow
|
||||
mod compute_state;
|
||||
mod context;
|
||||
mod plan;
|
||||
mod render;
|
||||
mod typedefs;
|
||||
mod types;
|
||||
|
||||
pub use context::Context;
|
||||
|
||||
// TODO(discord9): make a simplified version of source/sink
|
||||
// sink: simply get rows out of sinked collection/err collection and put it somewhere
|
||||
// (R, T, D) row of course with since/until frontier to limit
|
||||
|
||||
// source: simply insert stuff into it
|
||||
10
src/flow/src/compute/plan/join/delta_join.rs
Normal file
10
src/flow/src/compute/plan/join/delta_join.rs
Normal file
@@ -0,0 +1,10 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// A delta query is implemented by a set of paths, one for each input.
|
||||
///
|
||||
/// Each delta query path responds to its input changes by repeated lookups
|
||||
/// in arrangements for other join inputs. These lookups require specific
|
||||
/// instructions about which expressions to use as keys. Along the way,
|
||||
/// various closures are applied to filter and project as early as possible.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct DeltaJoinPlan {}
|
||||
9
src/flow/src/compute/plan/join/linear_join.rs
Normal file
9
src/flow/src/compute/plan/join/linear_join.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// TODO(discord9): impl Join
|
||||
/// A plan for the execution of a linear join.
|
||||
///
|
||||
/// A linear join is a sequence of stages, each of which introduces
|
||||
/// a new collection. Each stage is represented by a [LinearStagePlan].
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct LinearJoinPlan {}
|
||||
15
src/flow/src/compute/plan/join/mod.rs
Normal file
15
src/flow/src/compute/plan/join/mod.rs
Normal file
@@ -0,0 +1,15 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
mod delta_join;
|
||||
mod linear_join;
|
||||
pub use delta_join::DeltaJoinPlan;
|
||||
pub use linear_join::LinearJoinPlan;
|
||||
|
||||
/// TODO(discord9)(discord9): impl Join
|
||||
/// A complete enumeration of possible join plans to render.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub enum JoinPlan {
|
||||
/// A join implemented by a linear join.
|
||||
Linear(LinearJoinPlan),
|
||||
/// A join implemented by a delta join.
|
||||
Delta(DeltaJoinPlan),
|
||||
}
|
||||
222
src/flow/src/compute/plan/mod.rs
Normal file
222
src/flow/src/compute/plan/mod.rs
Normal file
@@ -0,0 +1,222 @@
|
||||
mod join;
|
||||
mod reduce;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use join::JoinPlan;
|
||||
pub(crate) use reduce::{
|
||||
convert_indexes_to_skips, AccumulablePlan, BucketedPlan, KeyValPlan, ReducePlan,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::expr::{Id, LocalId, MapFilterProject, ScalarExpr, TableFunc};
|
||||
use crate::repr::{self, Diff, Row};
|
||||
use crate::storage::errors::EvalError;
|
||||
|
||||
/// The forms in which an operator's output is available;
|
||||
/// it can be considered the plan-time equivalent of
|
||||
/// `render::context::CollectionBundle`.
|
||||
///
|
||||
/// These forms are either "raw", representing an unarranged collection,
|
||||
/// or "arranged", representing one that has been arranged by some key.
|
||||
///
|
||||
/// The raw collection, if it exists, may be consumed directly.
|
||||
///
|
||||
/// The arranged collections are slightly more complicated:
|
||||
/// Each key here is attached to a description of how the corresponding
|
||||
/// arrangement is permuted to remove value columns
|
||||
/// that are redundant with key columns. Thus, the first element in each
|
||||
/// tuple of `arranged` is the arrangement key; the second is the map of
|
||||
/// logical output columns to columns in the key or value of the deduplicated
|
||||
/// representation, and the third is a "thinning expression",
|
||||
/// or list of columns to include in the value
|
||||
/// when arranging.
|
||||
///
|
||||
/// For example, assume a 5-column collection is to be arranged by the key
|
||||
/// `[Column(2), Column(0) + Column(3), Column(1)]`.
|
||||
/// Then `Column(1)` and `Column(2)` in the value are redundant with the key, and
|
||||
/// only columns 0, 3, and 4 need to be stored separately.
|
||||
/// The thinning expression will then be `[0, 3, 4]`.
|
||||
///
|
||||
/// The permutation represents how to recover the
|
||||
/// original values (logically `[Column(0), Column(1), Column(2), Column(3), Column(4)]`)
|
||||
/// from the key and value of the arrangement, logically
|
||||
/// `[Column(2), Column(0) + Column(3), Column(1), Column(0), Column(3), Column(4)]`.
|
||||
/// Thus, the permutation in this case should be `{0: 3, 1: 2, 2: 0, 3: 4, 4: 5}`.
|
||||
///
|
||||
/// Note that this description, while true at the time of writing, is merely illustrative;
|
||||
/// users of this struct should not rely on the exact strategy used for generating
|
||||
/// the permutations. As long as clients apply the thinning expression
|
||||
/// when creating arrangements, and permute by the hashmap when reading them,
|
||||
/// the contract of the function where they are generated (`expr::permutation_for_arrangement`)
|
||||
/// ensures that the correct values will be read.
|
||||
#[derive(Default, Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct AvailableCollections {
|
||||
/// Whether the collection exists in unarranged form.
|
||||
pub raw: bool,
|
||||
/// The set of arrangements of the collection, along with a
|
||||
/// column permutation mapping
|
||||
pub arranged: Vec<KeyWithColumnPermutation>,
|
||||
}
|
||||
|
||||
pub type KeyWithColumnPermutation = (Vec<ScalarExpr>, BTreeMap<usize, usize>, Vec<usize>);
|
||||
|
||||
impl AvailableCollections {
|
||||
/// Represent a collection that has no arrangements.
|
||||
pub fn new_raw() -> Self {
|
||||
Self {
|
||||
raw: true,
|
||||
arranged: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// Represent a collection that is arranged in the
|
||||
/// specified ways.
|
||||
pub fn new_arranged(arranged: Vec<KeyWithColumnPermutation>) -> Self {
|
||||
assert!(
|
||||
!arranged.is_empty(),
|
||||
"Invariant violated: at least one collection must exist"
|
||||
);
|
||||
Self {
|
||||
raw: false,
|
||||
arranged,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Rendering Plan
|
||||
///
|
||||
/// TODO(discord9): see if we ever need to support recursive plans
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Plan<T = repr::Timestamp> {
|
||||
/// A collection containing a pre-determined collection.
|
||||
Constant {
|
||||
rows: Result<Vec<(Row, T, Diff)>, EvalError>,
|
||||
},
|
||||
/// A reference to a bound collection.
|
||||
///
|
||||
/// This is commonly either an external reference to an existing source or
|
||||
/// maintained arrangement, or an internal reference to a `Let` identifier.
|
||||
Get {
|
||||
id: Id,
|
||||
keys: AvailableCollections,
|
||||
plan: GetPlan,
|
||||
},
|
||||
/// Binds `value` to `id`, and then results in `body` with that binding.
|
||||
///
|
||||
/// This stage has the effect of sharing `value` across multiple possible
|
||||
/// uses in `body`, and is the only mechanism we have for sharing collection
|
||||
/// information across parts of a dataflow.
|
||||
///
|
||||
/// The binding is not available outside of `body`.
|
||||
Let {
|
||||
/// The local identifier to be used, available to `body` as `Id::Local(id)`.
|
||||
id: LocalId,
|
||||
/// The collection that should be bound to `id`.
|
||||
value: Box<Plan<T>>,
|
||||
/// The collection that results, which is allowed to contain `Get` stages
|
||||
/// that reference `Id::Local(id)`.
|
||||
body: Box<Plan<T>>,
|
||||
},
|
||||
/// Map, Filter, and Project operators.
|
||||
///
|
||||
/// This stage contains work that we would ideally like to fuse to other plan
|
||||
/// stages, but for practical reasons cannot. For example: reduce, threshold,
|
||||
/// and topk stages are not able to absorb this operator.
|
||||
Mfp {
|
||||
/// The input collection.
|
||||
input: Box<Plan<T>>,
|
||||
/// Linear operator to apply to each record.
|
||||
mfp: MapFilterProject,
|
||||
/// Whether the input is from an arrangement, and if so,
|
||||
/// whether we can seek to a specific value therein
|
||||
input_key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
|
||||
},
|
||||
/// A variable number of output records for each input record.
|
||||
///
|
||||
/// This stage is a bit of a catch-all for logic that does not easily fit in
|
||||
/// map stages. This includes table valued functions, but also functions of
|
||||
/// multiple arguments, and functions that modify the sign of updates.
|
||||
///
|
||||
/// This stage allows a `MapFilterProject` operator to be fused to its output,
|
||||
/// and this can be very important as otherwise the output of `func` is just
|
||||
/// appended to the input record, for as many outputs as it has. This has the
|
||||
/// unpleasant default behavior of repeating potentially large records that
|
||||
/// are being unpacked, producing quadratic output in those cases. Instead,
|
||||
/// in these cases use a `mfp` member that projects away these large fields.
|
||||
FlatMap {
|
||||
/// The input collection.
|
||||
input: Box<Plan<T>>,
|
||||
/// The variable-record emitting function.
|
||||
func: TableFunc,
|
||||
/// Expressions that for each row prepare the arguments to `func`.
|
||||
exprs: Vec<ScalarExpr>,
|
||||
/// Linear operator to apply to each record produced by `func`.
|
||||
mfp: MapFilterProject,
|
||||
/// The particular arrangement of the input we expect to use,
|
||||
/// if any
|
||||
input_key: Option<Vec<ScalarExpr>>,
|
||||
},
|
||||
/// A multiway relational equijoin, with fused map, filter, and projection.
|
||||
///
|
||||
/// This stage performs a multiway join among `inputs`, using the equality
|
||||
/// constraints expressed in `plan`. The plan also describes the implementation
|
||||
/// strategy we will use, and any pushed down per-record work.
|
||||
Join {
|
||||
/// An ordered list of inputs that will be joined.
|
||||
inputs: Vec<Plan<T>>,
|
||||
/// Detailed information about the implementation of the join.
|
||||
///
|
||||
/// This includes information about the implementation strategy, but also
|
||||
/// any map, filter, project work that we might follow the join with, but
|
||||
/// potentially pushed down into the implementation of the join.
|
||||
plan: JoinPlan,
|
||||
},
|
||||
/// Aggregation by key.
|
||||
Reduce {
|
||||
/// The input collection.
|
||||
input: Box<Plan<T>>,
|
||||
/// A plan for changing input records into key, value pairs.
|
||||
key_val_plan: KeyValPlan,
|
||||
/// A plan for performing the reduce.
|
||||
///
|
||||
/// The implementation of reduction has several different strategies based
|
||||
/// on the properties of the reduction, and the input itself. Please check
|
||||
/// out the documentation for this type for more detail.
|
||||
plan: ReducePlan,
|
||||
/// The particular arrangement of the input we expect to use,
|
||||
/// if any
|
||||
input_key: Option<Vec<ScalarExpr>>,
|
||||
},
|
||||
}
|
||||
|
||||
/// TODO(discord9): impl GetPlan
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum GetPlan {
|
||||
/// Simply pass input arrangements on to the next stage.
|
||||
PassArrangements,
|
||||
/// Using the supplied key, optionally seek the row, and apply the MFP.
|
||||
Arrangement(Vec<ScalarExpr>, Option<Row>, MapFilterProject),
|
||||
/// Scan the input collection (unarranged) and apply the MFP.
|
||||
Collection(MapFilterProject),
|
||||
}
|
||||
|
||||
/// Returns bucket sizes, descending, suitable for hierarchical decomposition of an operator, based
|
||||
/// on the expected number of rows that will have the same group key.
|
||||
fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64> {
|
||||
let mut buckets = vec![];
|
||||
let mut current = 16;
|
||||
|
||||
// Plan for 4B records in the expected case if the user didn't specify a group size.
|
||||
let limit = expected_group_size.unwrap_or(4_000_000_000);
|
||||
|
||||
// Distribute buckets in powers of 16, so that we can strike a balance between how many inputs
|
||||
// each layer gets from the preceding layer, while also limiting the number of layers.
|
||||
while current < limit {
|
||||
buckets.push(current);
|
||||
current = current.saturating_mul(16);
|
||||
}
|
||||
|
||||
buckets.reverse();
|
||||
buckets
|
||||
}
|
||||
233
src/flow/src/compute/plan/reduce.rs
Normal file
233
src/flow/src/compute/plan/reduce.rs
Normal file
@@ -0,0 +1,233 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::expr::{AggregateExpr, AggregateFunc, MapFilterProject, SafeMfpPlan};
|
||||
|
||||
/// This enum represents the three potential types of aggregations.
|
||||
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
|
||||
pub enum ReductionType {
|
||||
/// Accumulable functions can be subtracted from (are invertible), and associative.
|
||||
/// We can compute these results by moving some data to the diff field under arbitrary
|
||||
/// changes to inputs. Examples include sum or count.
|
||||
Accumulable,
|
||||
/// Hierarchical functions are associative, which means we can split up the work of
|
||||
/// computing them across subsets. Note that hierarchical reductions should also
|
||||
/// reduce the data in some way, as otherwise rendering them hierarchically is not
|
||||
/// worth it. Examples include min or max.
|
||||
Hierarchical,
|
||||
/// Basic, for lack of a better word, are functions that are neither accumulable
|
||||
/// nor hierarchical. Examples include jsonb_agg.
|
||||
Basic,
|
||||
}
|
||||
|
||||
/// Plan for extracting keys and values in preparation for a reduction.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct KeyValPlan {
|
||||
/// Extracts the columns used as the key.
|
||||
pub key_plan: SafeMfpPlan,
|
||||
/// Extracts the columns used to feed the aggregations.
|
||||
pub val_plan: SafeMfpPlan,
|
||||
}
|
||||
|
||||
/// Transforms a vector containing indexes of needed columns into one containing
|
||||
/// the "skips" an iterator over a Row would need to perform to see those values.
|
||||
///
|
||||
/// This function requires that all of the elements in `indexes` are strictly
|
||||
/// increasing.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
|
||||
/// ```
|
||||
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
|
||||
for i in 1..indexes.len() {
|
||||
assert!(
|
||||
indexes[i - 1] < indexes[i],
|
||||
"convert_indexes_to_skip needs indexes to be strictly increasing. Received: {:?}",
|
||||
indexes,
|
||||
);
|
||||
}
|
||||
|
||||
for i in (1..indexes.len()).rev() {
|
||||
indexes[i] -= indexes[i - 1];
|
||||
indexes[i] -= 1;
|
||||
}
|
||||
|
||||
indexes
|
||||
}
|
||||
|
||||
/// A `ReducePlan` provides a concise description for how we will
|
||||
/// execute a given reduce expression.
|
||||
///
|
||||
/// The provided reduce expression can have no
|
||||
/// aggregations, in which case its just a `Distinct` and otherwise
|
||||
/// it's composed of a combination of accumulable, hierarchical and
|
||||
/// basic aggregations.
|
||||
///
|
||||
/// We want to try to centralize as much decision making about the
|
||||
/// shape / general computation of the rendered dataflow graph
|
||||
/// in this plan, and then make actually rendering the graph
|
||||
/// be as simple (and compiler verifiable) as possible.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub enum ReducePlan {
|
||||
/// Plan for not computing any aggregations, just determining the set of
|
||||
/// distinct keys.
|
||||
Distinct,
|
||||
/// Plan for computing only accumulable aggregations.
|
||||
Accumulable(AccumulablePlan),
|
||||
/// Plan for computing only hierarchical aggregations.
|
||||
Hierarchical(HierarchicalPlan),
|
||||
/// Plan for computing only basic aggregations.
|
||||
Basic(BasicPlan),
|
||||
/// Plan for computing a mix of different kinds of aggregations.
|
||||
/// We need to do extra work here to reassemble results back in the
|
||||
/// requested order.
|
||||
Collation(CollationPlan),
|
||||
}
|
||||
|
||||
/// Plan for computing a set of accumulable aggregations.
|
||||
///
|
||||
/// We fuse all of the accumulable aggregations together
|
||||
/// and compute them with one dataflow fragment. We need to
|
||||
/// be careful to separate out the aggregations that
|
||||
/// apply only to the distinct set of values. We need
|
||||
/// to apply a distinct operator to those before we
|
||||
/// combine them with everything else.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct AccumulablePlan {
|
||||
/// All of the aggregations we were asked to compute, stored
|
||||
/// in order.
|
||||
pub full_aggrs: Vec<AggregateExpr>,
|
||||
/// All of the non-distinct accumulable aggregates.
|
||||
/// Each element represents:
|
||||
/// (index of the aggregation among accumulable aggregations,
|
||||
/// index of the datum among inputs, aggregation expr)
|
||||
/// These will all be rendered together in one dataflow fragment.
|
||||
pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
|
||||
/// Same as above but for all of the `DISTINCT` accumulable aggregations.
|
||||
pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
|
||||
}
|
||||
|
||||
// TODO(discord9): others
|
||||
|
||||
/// Plan for computing a set of hierarchical aggregations.
|
||||
///
|
||||
/// In the append-only setting we can render them in-place
|
||||
/// with monotonic plans, but otherwise, we need to render
|
||||
/// them with a reduction tree that splits the inputs into
|
||||
/// small, and then progressively larger, buckets
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub enum HierarchicalPlan {
|
||||
/// Plan hierarchical aggregations under monotonic inputs.
|
||||
Monotonic(MonotonicPlan),
|
||||
/// Plan for hierarchical aggregations under non-monotonic inputs.
|
||||
Bucketed(BucketedPlan),
|
||||
}
|
||||
|
||||
/// Plan for computing a set of hierarchical aggregations with a
|
||||
/// monotonic input.
|
||||
///
|
||||
/// Here, the aggregations will be rendered in place. We don't
|
||||
/// need to worry about retractions because the inputs are
|
||||
/// append only, so we can change our computation to
|
||||
/// only retain the "best" value in the diff field, instead
|
||||
/// of holding onto all values.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct MonotonicPlan {
|
||||
/// All of the aggregations we were asked to compute.
|
||||
pub aggr_funcs: Vec<AggregateFunc>,
|
||||
/// Set of "skips" or calls to `nth()` an iterator needs to do over
|
||||
/// the input to extract the relevant datums.
|
||||
pub skips: Vec<usize>,
|
||||
/// True if the input is logically but not physically monotonic,
|
||||
/// and the operator must first consolidate the inputs to remove
|
||||
/// potential negations.
|
||||
pub must_consolidate: bool,
|
||||
}
|
||||
|
||||
/// Plan for computing a set of hierarchical aggregations
|
||||
/// with non-monotonic inputs.
|
||||
///
|
||||
/// To perform hierarchical aggregations with stable runtimes
|
||||
/// under updates we'll subdivide the group key into buckets, compute
|
||||
/// the reduction in each of those subdivided buckets and then combine
|
||||
/// the results into a coarser bucket (one that represents a larger
|
||||
/// fraction of the original input) and redo the reduction in another
|
||||
/// layer. Effectively, we'll construct a min / max heap out of a series
|
||||
/// of reduce operators (each one is a separate layer).
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct BucketedPlan {
|
||||
/// All of the aggregations we were asked to compute.
|
||||
pub aggr_funcs: Vec<AggregateFunc>,
|
||||
/// Set of "skips" or calls to `nth()` an iterator needs to do over
|
||||
/// the input to extract the relevant datums.
|
||||
pub skips: Vec<usize>,
|
||||
/// The number of buckets in each layer of the reduction tree. Should
|
||||
/// be decreasing, and ideally, a power of two so that we can easily
|
||||
/// distribute values to buckets with `value.hashed() % buckets[layer]`.
|
||||
pub buckets: Vec<u64>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub enum BasicPlan {}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct CollationPlan {}
|
||||
|
||||
/// Determines whether a function can be accumulated in an update's "difference" field,
|
||||
/// and whether it can be subjected to recursive (hierarchical) aggregation.
|
||||
///
|
||||
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
|
||||
/// which can be accumulated in-place using the addition operation on the type. Aggregations
|
||||
/// that indicate they are accumulable will still need to provide an action that takes their
|
||||
/// data and introduces it as a difference, and the post-processing when the accumulated value
|
||||
/// is presented as data.
|
||||
///
|
||||
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
|
||||
/// increasingly large subsets of each key. This has the intended property that no invocation
|
||||
/// is on a significantly large set of values (and so, no incremental update needs to reform
|
||||
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
|
||||
/// input stream is append-only as then we only need to retain the "currently winning" value.
|
||||
/// Every hierarchical aggregate needs to supply a corresponding ReductionMonoid implementation.
|
||||
fn reduction_type(func: &AggregateFunc) -> ReductionType {
|
||||
match func {
|
||||
AggregateFunc::SumInt16
|
||||
| AggregateFunc::SumInt32
|
||||
| AggregateFunc::SumInt64
|
||||
| AggregateFunc::SumUInt16
|
||||
| AggregateFunc::SumUInt32
|
||||
| AggregateFunc::SumUInt64
|
||||
| AggregateFunc::SumFloat32
|
||||
| AggregateFunc::SumFloat64
|
||||
| AggregateFunc::Count
|
||||
| AggregateFunc::Any
|
||||
| AggregateFunc::All => ReductionType::Accumulable,
|
||||
AggregateFunc::MaxInt16
|
||||
| AggregateFunc::MaxInt32
|
||||
| AggregateFunc::MaxInt64
|
||||
| AggregateFunc::MaxUInt16
|
||||
| AggregateFunc::MaxUInt32
|
||||
| AggregateFunc::MaxUInt64
|
||||
| AggregateFunc::MaxFloat32
|
||||
| AggregateFunc::MaxFloat64
|
||||
| AggregateFunc::MaxBool
|
||||
| AggregateFunc::MaxString
|
||||
| AggregateFunc::MaxDate
|
||||
| AggregateFunc::MaxTimestamp
|
||||
| AggregateFunc::MaxTimestampTz
|
||||
| AggregateFunc::MinInt16
|
||||
| AggregateFunc::MinInt32
|
||||
| AggregateFunc::MinInt64
|
||||
| AggregateFunc::MinUInt16
|
||||
| AggregateFunc::MinUInt32
|
||||
| AggregateFunc::MinUInt64
|
||||
| AggregateFunc::MinFloat32
|
||||
| AggregateFunc::MinFloat64
|
||||
| AggregateFunc::MinBool
|
||||
| AggregateFunc::MinString
|
||||
| AggregateFunc::MinDate
|
||||
| AggregateFunc::MinTimestamp
|
||||
| AggregateFunc::MinTimestampTz => ReductionType::Hierarchical,
|
||||
_ => ReductionType::Basic,
|
||||
}
|
||||
}
|
||||
60
src/flow/src/compute/render/error.rs
Normal file
60
src/flow/src/compute/render/error.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use std::hash::Hash;
|
||||
|
||||
use differential_dataflow::ExchangeData;
|
||||
|
||||
use crate::repr::Row;
|
||||
|
||||
/// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`,
|
||||
/// specialized for use in compute. Validation code will only run when the error constructor is
|
||||
/// Some.
|
||||
pub(super) trait MaybeValidatingRow<T, E>: ExchangeData + Hash {
|
||||
fn ok(t: T) -> Self;
|
||||
fn into_error() -> Option<fn(E) -> Self>;
|
||||
}
|
||||
|
||||
impl<E> MaybeValidatingRow<Row, E> for Row {
|
||||
fn ok(t: Row) -> Self {
|
||||
t
|
||||
}
|
||||
|
||||
fn into_error() -> Option<fn(E) -> Self> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> MaybeValidatingRow<(), E> for () {
|
||||
fn ok(t: ()) -> Self {
|
||||
t
|
||||
}
|
||||
|
||||
fn into_error() -> Option<fn(E) -> Self> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, R> MaybeValidatingRow<Vec<R>, E> for Vec<R>
|
||||
where
|
||||
R: ExchangeData + Hash,
|
||||
{
|
||||
fn ok(t: Vec<R>) -> Self {
|
||||
t
|
||||
}
|
||||
|
||||
fn into_error() -> Option<fn(E) -> Self> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, E> MaybeValidatingRow<T, E> for Result<T, E>
|
||||
where
|
||||
T: ExchangeData + Hash,
|
||||
E: ExchangeData + Hash,
|
||||
{
|
||||
fn ok(row: T) -> Self {
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
fn into_error() -> Option<fn(E) -> Self> {
|
||||
Some(Err)
|
||||
}
|
||||
}
|
||||
626
src/flow/src/compute/render/mod.rs
Normal file
626
src/flow/src/compute/render/mod.rs
Normal file
@@ -0,0 +1,626 @@
|
||||
//! for building the flow graph from PLAN
|
||||
//! this is basically the last step before actually running the flow graph
|
||||
|
||||
use differential_dataflow::lattice::Lattice;
|
||||
use differential_dataflow::AsCollection;
|
||||
use timely::communication::Allocate;
|
||||
use timely::dataflow::operators::capture::Extract;
|
||||
use timely::dataflow::operators::{Capture, ToStream};
|
||||
use timely::dataflow::Scope;
|
||||
use timely::progress::timestamp::Refines;
|
||||
use timely::progress::Timestamp;
|
||||
use timely::worker::Worker as TimelyWorker;
|
||||
|
||||
use super::types::DataflowDescription;
|
||||
use crate::compute::compute_state::ComputeState;
|
||||
use crate::compute::context::CollectionBundle;
|
||||
use crate::compute::plan::Plan;
|
||||
use crate::compute::types::BuildDesc;
|
||||
use crate::compute::Context;
|
||||
use crate::expr::Id;
|
||||
use crate::repr::{self, Row};
|
||||
use crate::storage::errors::DataflowError;
|
||||
|
||||
mod error;
|
||||
mod reduce;
|
||||
|
||||
/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
|
||||
///
|
||||
/// This method imports sources from provided assets, and then builds the remaining
|
||||
/// dataflow using "compute-local" assets like shared arrangements, and producing
|
||||
/// both arrangements and sinks.
|
||||
pub fn build_compute_dataflow<A: Allocate>(
|
||||
timely_worker: &mut TimelyWorker<A>,
|
||||
compute_state: &mut ComputeState,
|
||||
dataflow: DataflowDescription<Plan, ()>,
|
||||
) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub trait RenderTimestamp: Timestamp + Lattice + Refines<repr::Timestamp> {
|
||||
/// The system timestamp component of the timestamp.
|
||||
///
|
||||
/// This is useful for manipulating the system time, as when delaying
|
||||
/// updates for subsequent cancellation, as with monotonic reduction.
|
||||
fn system_time(&mut self) -> &mut repr::Timestamp;
|
||||
/// Effects a system delay in terms of the timestamp summary.
|
||||
fn system_delay(delay: repr::Timestamp) -> <Self as Timestamp>::Summary;
|
||||
/// The event timestamp component of the timestamp.
|
||||
fn event_time(&mut self) -> &mut repr::Timestamp;
|
||||
/// Effects an event delay in terms of the timestamp summary.
|
||||
fn event_delay(delay: repr::Timestamp) -> <Self as Timestamp>::Summary;
|
||||
/// Steps the timestamp back so that logical compaction to the output will
|
||||
/// not conflate `self` with any historical times.
|
||||
fn step_back(&self) -> Self;
|
||||
}
|
||||
|
||||
impl RenderTimestamp for repr::Timestamp {
|
||||
fn system_time(&mut self) -> &mut repr::Timestamp {
|
||||
self
|
||||
}
|
||||
fn system_delay(delay: repr::Timestamp) -> <Self as Timestamp>::Summary {
|
||||
delay
|
||||
}
|
||||
fn event_time(&mut self) -> &mut repr::Timestamp {
|
||||
self
|
||||
}
|
||||
fn event_delay(delay: repr::Timestamp) -> <Self as Timestamp>::Summary {
|
||||
delay
|
||||
}
|
||||
fn step_back(&self) -> Self {
|
||||
self.saturating_sub(1)
|
||||
}
|
||||
}
|
||||
|
||||
// This implementation block allows child timestamps to vary from parent timestamps.
|
||||
impl<G> Context<G, Row>
|
||||
where
|
||||
G: Scope,
|
||||
G::Timestamp: RenderTimestamp,
|
||||
{
|
||||
/// render plan and insert into context with given GlobalId
|
||||
pub(crate) fn build_object(&mut self, object: BuildDesc<Plan>) {
|
||||
// First, transform the relation expression into a render plan.
|
||||
let bundle = self.render_plan(object.plan);
|
||||
self.insert_id(Id::Global(object.id), bundle);
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Context<S, Row>
|
||||
where
|
||||
S: Scope,
|
||||
S::Timestamp: RenderTimestamp,
|
||||
{
|
||||
/// Renders a plan to a differential dataflow, producing the collection of results.
|
||||
///
|
||||
/// The return type reflects the uncertainty about the data representation, perhaps
|
||||
/// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
|
||||
pub fn render_plan(&mut self, plan: Plan) -> CollectionBundle<S, Row> {
|
||||
match plan {
|
||||
Plan::Constant { rows } => {
|
||||
let (rows, errs) = match rows {
|
||||
Ok(rows) => (rows, Vec::new()),
|
||||
Err(err) => (Vec::new(), vec![err]),
|
||||
};
|
||||
let since_frontier = self.since_frontier.clone();
|
||||
let until = self.until_frontier.clone();
|
||||
let ok_collection = rows
|
||||
.into_iter()
|
||||
.filter_map(move |(row, mut time, diff)| {
|
||||
time.advance_by(since_frontier.borrow());
|
||||
if !until.less_equal(&time) {
|
||||
Some((
|
||||
row,
|
||||
<S::Timestamp as Refines<repr::Timestamp>>::to_inner(time),
|
||||
diff,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.to_stream(&mut self.scope)
|
||||
.as_collection();
|
||||
let mut error_time: repr::Timestamp = Timestamp::minimum();
|
||||
error_time.advance_by(self.since_frontier.borrow());
|
||||
let err_collection = errs
|
||||
.into_iter()
|
||||
.map(move |e| {
|
||||
(
|
||||
DataflowError::from(e),
|
||||
<S::Timestamp as Refines<repr::Timestamp>>::to_inner(error_time),
|
||||
1,
|
||||
)
|
||||
})
|
||||
.to_stream(&mut self.scope)
|
||||
.as_collection();
|
||||
CollectionBundle::from_collections(ok_collection, err_collection)
|
||||
}
|
||||
Plan::Get { id, keys, plan } => {
|
||||
// Recover the collection from `self` and then apply `mfp` to it.
|
||||
// If `mfp` happens to be trivial, we can just return the collection.
|
||||
let mut collection = self
|
||||
.lookup_id(id)
|
||||
.unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
|
||||
match plan {
|
||||
crate::compute::plan::GetPlan::PassArrangements => {
|
||||
// Assert that each of `keys` are present in `collection`.
|
||||
if !keys
|
||||
.arranged
|
||||
.iter()
|
||||
.all(|(key, _, _)| collection.arranged.contains_key(key))
|
||||
{
|
||||
let not_included: Vec<_> = keys
|
||||
.arranged
|
||||
.iter()
|
||||
.filter(|(key, _, _)| !collection.arranged.contains_key(key))
|
||||
.map(|(key, _, _)| key)
|
||||
.collect();
|
||||
panic!(
|
||||
"Those keys {:?} is not included in collections keys:{:?}",
|
||||
not_included,
|
||||
collection.arranged.keys().cloned().collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
assert!(keys.raw <= collection.collection.is_some());
|
||||
// Retain only those keys we want to import.
|
||||
collection.arranged.retain(|key, _val| {
|
||||
keys.arranged.iter().any(|(key2, _, _)| key2 == key)
|
||||
});
|
||||
collection
|
||||
}
|
||||
crate::compute::plan::GetPlan::Arrangement(key, row, mfp) => {
|
||||
let (oks, errs) = collection.as_collection_core(
|
||||
mfp,
|
||||
Some((key, row)),
|
||||
self.until_frontier.clone(),
|
||||
);
|
||||
CollectionBundle::from_collections(oks, errs)
|
||||
}
|
||||
crate::compute::plan::GetPlan::Collection(mfp) => {
|
||||
let (oks, errs) =
|
||||
collection.as_collection_core(mfp, None, self.until_frontier.clone());
|
||||
CollectionBundle::from_collections(oks, errs)
|
||||
}
|
||||
}
|
||||
}
|
||||
Plan::Let { id, value, body } => {
|
||||
// Render `value` and bind it to `id`. Complain if this shadows an id.
|
||||
let value = self.render_plan(*value);
|
||||
let prebound = self.insert_id(Id::Local(id), value);
|
||||
assert!(prebound.is_none());
|
||||
|
||||
let body = self.render_plan(*body);
|
||||
self.remove_id(Id::Local(id));
|
||||
body
|
||||
}
|
||||
Plan::Mfp {
|
||||
input,
|
||||
mfp,
|
||||
input_key_val,
|
||||
} => {
|
||||
let input = self.render_plan(*input);
|
||||
// If `mfp` is non-trivial, we should apply it and produce a collection.
|
||||
if mfp.is_identity() {
|
||||
input
|
||||
} else {
|
||||
let (oks, errs) =
|
||||
input.as_collection_core(mfp, input_key_val, self.until_frontier.clone());
|
||||
CollectionBundle::from_collections(oks, errs)
|
||||
}
|
||||
}
|
||||
Plan::Reduce {
|
||||
input,
|
||||
key_val_plan,
|
||||
plan,
|
||||
input_key,
|
||||
} => {
|
||||
let input = self.render_plan(*input);
|
||||
self.render_reduce(input, key_val_plan, plan, input_key)
|
||||
}
|
||||
_ => todo!("To be implemented"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::any::Any;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::rc::Rc;
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use differential_dataflow::input::{Input, InputSession};
|
||||
use differential_dataflow::Collection;
|
||||
use timely::dataflow::scopes::Child;
|
||||
use timely::dataflow::Stream;
|
||||
use timely::Config;
|
||||
|
||||
use super::*;
|
||||
use crate::compute::plan::{
|
||||
AccumulablePlan, AvailableCollections, GetPlan, KeyValPlan, ReducePlan,
|
||||
};
|
||||
use crate::expr::{
|
||||
AggregateExpr, BinaryFunc, GlobalId, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
|
||||
UnaryFunc,
|
||||
};
|
||||
use crate::repr::Diff;
|
||||
type OkStream<G> = Stream<G, (Row, repr::Timestamp, Diff)>;
|
||||
type ErrStream<G> = Stream<G, (DataflowError, repr::Timestamp, Diff)>;
|
||||
type OkCollection<G> = Collection<G, Row, Diff>;
|
||||
type ErrCollection<G> = Collection<G, DataflowError, Diff>;
|
||||
/// used as a token to prevent certain resources from being dropped
|
||||
type AnyToken = Rc<dyn Any>;
|
||||
struct MockSourceToken {
|
||||
handle: InputSession<repr::Timestamp, Row, Diff>,
|
||||
err_handle: InputSession<repr::Timestamp, DataflowError, Diff>,
|
||||
}
|
||||
|
||||
fn mock_input_session(input: &mut InputSession<repr::Timestamp, Row, Diff>, cnt: i64) {
|
||||
// TODO: mock a cpu usage monotonic input with timestamp
|
||||
// cpu, mem, ts
|
||||
// f32, f32, DateTime
|
||||
let schema = [
|
||||
ConcreteDataType::float32_datatype(),
|
||||
ConcreteDataType::float32_datatype(),
|
||||
ConcreteDataType::datetime_datatype(),
|
||||
];
|
||||
let arrs = (0..cnt).map(|i| (i as f32 / cnt as f32, i as f32 / cnt as f32, i));
|
||||
// need more mechanism to make timestamp also timestamp here
|
||||
for (cpu, mem, ts) in arrs {
|
||||
input.update(
|
||||
Row::pack(vec![cpu.into(), mem.into(), Value::DateTime(ts.into())]),
|
||||
1,
|
||||
);
|
||||
input.advance_to(ts as u64)
|
||||
}
|
||||
input.flush();
|
||||
}
|
||||
|
||||
// a simple test to see if the dataflow can be built and run
|
||||
fn exec_dataflow(
|
||||
input_id: Vec<Id>,
|
||||
dataflow: DataflowDescription<Plan>,
|
||||
sink_ids: Vec<GlobalId>,
|
||||
output_keys: Vec<Option<Vec<ScalarExpr>>>,
|
||||
input_mock_length: i64,
|
||||
) {
|
||||
timely::execute(Config::thread(), move |worker| {
|
||||
println!("worker: {:?}", worker.index());
|
||||
let mut input = InputSession::<repr::Timestamp, Row, Diff>::new();
|
||||
worker.dataflow_named(
|
||||
"ProofOfConcept",
|
||||
|scope: &mut Child<'_, _, repr::Timestamp>| {
|
||||
let mut test_ctx =
|
||||
Context::<_, Row, _>::for_dataflow_in(&dataflow, scope.clone());
|
||||
|
||||
let ok_collection = input.to_collection(scope);
|
||||
let (err_handle, err_collection) = scope.new_collection();
|
||||
let input_collection =
|
||||
CollectionBundle::<_, _, repr::Timestamp>::from_collections(
|
||||
ok_collection,
|
||||
err_collection,
|
||||
);
|
||||
|
||||
// TODO: generate `import_sources` from `dataflow.source_imports`
|
||||
let import_sources: Vec<_> = input_id
|
||||
.clone()
|
||||
.into_iter()
|
||||
.zip(vec![input_collection])
|
||||
.collect();
|
||||
|
||||
// import sources
|
||||
for (id, collection) in import_sources {
|
||||
test_ctx.insert_id(id, collection);
|
||||
}
|
||||
|
||||
for build_desc in &dataflow.objects_to_build {
|
||||
test_ctx.build_object(build_desc.clone());
|
||||
}
|
||||
|
||||
dbg!(test_ctx.bindings.keys());
|
||||
|
||||
// TODO: export sinks
|
||||
|
||||
for (sink, output_key) in sink_ids.iter().zip(output_keys.iter()) {
|
||||
let sink = *sink;
|
||||
println!("Inspecting sink {:?}", sink.clone());
|
||||
let inspect = test_ctx.lookup_id(Id::Global(sink)).unwrap();
|
||||
dbg!(inspect.collection.is_some());
|
||||
dbg!(inspect.arranged.keys());
|
||||
let inspect = inspect.as_specific_collection(output_key.as_deref());
|
||||
inspect
|
||||
.0
|
||||
.inspect(move |x| println!("inspect {:?} {:?}", sink.clone(), x));
|
||||
}
|
||||
},
|
||||
);
|
||||
mock_input_session(&mut input, input_mock_length);
|
||||
})
|
||||
.expect("Computation terminated abnormally");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_poc_reduce_group_by() {
|
||||
// 1. build dataflow with input collection connected
|
||||
// 2. give input
|
||||
// type annotation is needed to prevent rust-analyzer to give up type deduction
|
||||
|
||||
// simple give dataflow information
|
||||
// will be build by given dataflow information from other nodes later
|
||||
// key is the third column
|
||||
let place_holder =
|
||||
ScalarExpr::Literal(Ok(Value::Boolean(true)), ConcreteDataType::int64_datatype());
|
||||
|
||||
let count_col = |i: usize| AggregateExpr {
|
||||
func: crate::expr::AggregateFunc::Count,
|
||||
expr: ScalarExpr::Column(i),
|
||||
distinct: false,
|
||||
};
|
||||
let sum_col = |i: usize| AggregateExpr {
|
||||
func: crate::expr::AggregateFunc::SumFloat32,
|
||||
expr: ScalarExpr::Column(i),
|
||||
distinct: false,
|
||||
};
|
||||
// equal to `SELECT minute, SUM(cpu) FROM input GROUP BY ts/300 as minute;
|
||||
// cpu, mem, ts
|
||||
// --map--> cpu, mem, ts/300
|
||||
// --reduce--> ts/300, AVG(cpu), AVG(mem)
|
||||
let cast_datetime = ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::CastDatetimeToInt64,
|
||||
expr: Box::new(ScalarExpr::Column(2)),
|
||||
};
|
||||
let ts_div_5 = ScalarExpr::CallBinary {
|
||||
func: BinaryFunc::DivInt64,
|
||||
expr1: Box::new(cast_datetime),
|
||||
expr2: Box::new(ScalarExpr::Literal(
|
||||
Ok(Value::Int64(5.into())),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)),
|
||||
};
|
||||
let cast_int64_to_float32 = |i: usize| ScalarExpr::CallUnary {
|
||||
func: UnaryFunc::CastInt64ToFloat32,
|
||||
expr: Box::new(ScalarExpr::Column(i)),
|
||||
};
|
||||
let reduce_group_by_window = vec![
|
||||
// cpu, mem, ts
|
||||
// --reduce--> ts/300, SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem)
|
||||
// -- map --> ts/300, AVG(cpu), AVG(mem)
|
||||
BuildDesc {
|
||||
id: GlobalId::User(0),
|
||||
plan: Plan::Reduce {
|
||||
input: Box::new(Plan::Get {
|
||||
id: Id::Global(GlobalId::System(0)),
|
||||
keys: AvailableCollections::new_raw(),
|
||||
plan: GetPlan::Collection(
|
||||
MapFilterProject::new(3).map([ts_div_5]).project([0, 1, 3]),
|
||||
),
|
||||
}),
|
||||
key_val_plan: KeyValPlan {
|
||||
key_plan: SafeMfpPlan {
|
||||
mfp: MapFilterProject::new(3).project([2]),
|
||||
},
|
||||
val_plan: SafeMfpPlan {
|
||||
mfp: MapFilterProject::new(3).project([0, 1]),
|
||||
},
|
||||
},
|
||||
// --reduce--> ts/300(key), SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem)
|
||||
plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![sum_col(0), sum_col(1), count_col(0), count_col(1)],
|
||||
simple_aggrs: vec![
|
||||
(0, 0, sum_col(0)),
|
||||
(1, 1, sum_col(1)),
|
||||
(2, 0, count_col(0)),
|
||||
(3, 1, count_col(1)),
|
||||
],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
input_key: None,
|
||||
},
|
||||
},
|
||||
// 0 1 2 3 4
|
||||
// ts/300(key), SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem),
|
||||
// -- map --> AVG(cpu), AVG(mem), ts/300
|
||||
BuildDesc {
|
||||
id: GlobalId::User(1),
|
||||
plan: Plan::Get {
|
||||
id: Id::Global(GlobalId::User(0)),
|
||||
// not used since plan is GetPlan::Arrangement
|
||||
keys: AvailableCollections::new_raw(),
|
||||
plan: GetPlan::Arrangement(
|
||||
vec![ScalarExpr::Column(0)],
|
||||
None,
|
||||
MapFilterProject::new(5)
|
||||
.map([
|
||||
ScalarExpr::CallBinary {
|
||||
func: BinaryFunc::DivFloat32,
|
||||
expr1: Box::new(ScalarExpr::Column(1)),
|
||||
expr2: Box::new(cast_int64_to_float32(3)),
|
||||
},
|
||||
ScalarExpr::CallBinary {
|
||||
func: BinaryFunc::DivFloat32,
|
||||
expr1: Box::new(ScalarExpr::Column(2)),
|
||||
expr2: Box::new(cast_int64_to_float32(4)),
|
||||
},
|
||||
])
|
||||
.project([0, 5, 6]),
|
||||
),
|
||||
},
|
||||
},
|
||||
];
|
||||
let input_id = vec![Id::Global(GlobalId::System(0))];
|
||||
let dataflow = {
|
||||
let mut dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
|
||||
dataflow.objects_to_build = reduce_group_by_window;
|
||||
dataflow
|
||||
};
|
||||
let sink_ids = [GlobalId::User(0), GlobalId::User(1)];
|
||||
exec_dataflow(
|
||||
input_id.clone(),
|
||||
dataflow.clone(),
|
||||
sink_ids.to_vec(),
|
||||
vec![Some(vec![ScalarExpr::Column(0)]), None],
|
||||
10,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_poc_reduce_count() {
|
||||
// 1. build dataflow with input collection connected
|
||||
// 2. give input
|
||||
// type annotation is needed to prevent rust-analyzer to give up type deduction
|
||||
|
||||
// simple give dataflow information
|
||||
// will be build by given dataflow information from other nodes later
|
||||
// key is the third column
|
||||
let place_holder =
|
||||
ScalarExpr::Literal(Ok(Value::Boolean(true)), ConcreteDataType::int64_datatype());
|
||||
let key_plan = SafeMfpPlan {
|
||||
mfp: MapFilterProject::new(3)
|
||||
.map([place_holder.clone()])
|
||||
.project([3]),
|
||||
};
|
||||
let val_plan = SafeMfpPlan {
|
||||
mfp: MapFilterProject::new(3).project([0, 1, 2]),
|
||||
};
|
||||
let count = AggregateExpr {
|
||||
func: crate::expr::AggregateFunc::Count,
|
||||
expr: place_holder,
|
||||
distinct: false,
|
||||
};
|
||||
// equal to `SELECT COUNT(*) FROM input;`
|
||||
let reduce_group_by_window = vec![
|
||||
// count(true)
|
||||
BuildDesc {
|
||||
id: GlobalId::User(0),
|
||||
plan: Plan::Reduce {
|
||||
input: Box::new(Plan::Get {
|
||||
id: Id::Global(GlobalId::System(0)),
|
||||
keys: AvailableCollections::new_raw(),
|
||||
plan: GetPlan::Collection(MapFilterProject::new(3)),
|
||||
}),
|
||||
key_val_plan: KeyValPlan { key_plan, val_plan },
|
||||
plan: ReducePlan::Accumulable(AccumulablePlan {
|
||||
full_aggrs: vec![count.clone()],
|
||||
simple_aggrs: vec![(0, 0, count)],
|
||||
distinct_aggrs: vec![],
|
||||
}),
|
||||
input_key: None,
|
||||
},
|
||||
},
|
||||
// get second column
|
||||
BuildDesc {
|
||||
id: GlobalId::User(1),
|
||||
plan: Plan::Get {
|
||||
id: Id::Global(GlobalId::User(0)),
|
||||
// not used since plan is GetPlan::Arrangement
|
||||
keys: AvailableCollections::new_raw(),
|
||||
plan: GetPlan::Arrangement(
|
||||
vec![ScalarExpr::Column(0)],
|
||||
None,
|
||||
MapFilterProject::new(2).project([1]),
|
||||
),
|
||||
},
|
||||
},
|
||||
];
|
||||
let input_id = vec![Id::Global(GlobalId::System(0))];
|
||||
let dataflow = {
|
||||
let mut dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
|
||||
dataflow.objects_to_build = reduce_group_by_window;
|
||||
dataflow
|
||||
};
|
||||
let sink_ids = [GlobalId::User(1)];
|
||||
exec_dataflow(
|
||||
input_id.clone(),
|
||||
dataflow.clone(),
|
||||
sink_ids.to_vec(),
|
||||
vec![None],
|
||||
10,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_poc_reduce_distinct() {
|
||||
// 1. build dataflow with input collection connected
|
||||
// 2. give input
|
||||
// type annotation is needed to prevent rust-analyzer to give up type deduction
|
||||
|
||||
// simple give dataflow information
|
||||
// will be build by given dataflow information from other nodes later
|
||||
// window need date_trunc which is still WIP
|
||||
// key is the third column
|
||||
let key_plan = SafeMfpPlan {
|
||||
mfp: MapFilterProject::new(3).project([2]),
|
||||
};
|
||||
let val_plan = SafeMfpPlan {
|
||||
mfp: MapFilterProject::new(3).project([0, 1]),
|
||||
};
|
||||
// equal to `SELECT ts, COUNT(*) FROM input GROUP BY ts;`
|
||||
let reduce_plan = vec![BuildDesc {
|
||||
id: GlobalId::User(0),
|
||||
plan: Plan::Reduce {
|
||||
input: Box::new(Plan::Get {
|
||||
id: Id::Global(GlobalId::System(0)),
|
||||
keys: AvailableCollections::new_raw(),
|
||||
plan: GetPlan::Collection(MapFilterProject::new(3)),
|
||||
}),
|
||||
key_val_plan: KeyValPlan { key_plan, val_plan },
|
||||
plan: ReducePlan::Distinct,
|
||||
input_key: None,
|
||||
},
|
||||
}];
|
||||
let input_id = vec![Id::Global(GlobalId::System(0))];
|
||||
let dataflow = {
|
||||
let mut dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
|
||||
dataflow.objects_to_build = reduce_plan;
|
||||
dataflow
|
||||
};
|
||||
let sink_ids = [GlobalId::User(0)];
|
||||
exec_dataflow(
|
||||
input_id.clone(),
|
||||
dataflow.clone(),
|
||||
sink_ids.to_vec(),
|
||||
vec![Some(vec![ScalarExpr::Column(0)])],
|
||||
10,
|
||||
);
|
||||
}
|
||||
#[test]
|
||||
#[allow(clippy::print_stdout)]
|
||||
fn test_constant_plan_render() {
|
||||
let build_descs = vec![BuildDesc {
|
||||
id: GlobalId::User(0),
|
||||
plan: Plan::Constant {
|
||||
rows: Ok(vec![(Row::default(), 0, 1)]),
|
||||
},
|
||||
}];
|
||||
let dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
|
||||
|
||||
timely::execute_from_args(std::iter::empty::<String>(), move |worker| {
|
||||
println!("worker: {:?}", worker.index());
|
||||
let mut input = InputSession::<repr::Timestamp, Row, Diff>::new();
|
||||
worker.dataflow(|scope: &mut Child<'_, _, repr::Timestamp>| {
|
||||
let mut test_ctx = Context::<_, Row, _>::for_dataflow_in(&dataflow, scope.clone());
|
||||
for build_desc in &build_descs {
|
||||
test_ctx.build_object(build_desc.clone());
|
||||
}
|
||||
let input_collection = input.to_collection(scope);
|
||||
let err_collection = InputSession::new().to_collection(scope);
|
||||
let input_collection =
|
||||
CollectionBundle::from_collections(input_collection, err_collection);
|
||||
|
||||
// insert collection
|
||||
test_ctx.insert_id(Id::Local(LocalId(0)), input_collection);
|
||||
|
||||
let inspect = test_ctx
|
||||
.lookup_id(Id::Global(GlobalId::User(0)))
|
||||
.unwrap()
|
||||
.as_specific_collection(None);
|
||||
inspect.0.inspect(|x| println!("inspect {:?}", x));
|
||||
});
|
||||
// input.insert(Row::default());
|
||||
input.update(Row::default(), 1);
|
||||
input.advance_to(1);
|
||||
})
|
||||
.expect("Computation terminated abnormally");
|
||||
}
|
||||
}
|
||||
1001
src/flow/src/compute/render/reduce.rs
Normal file
1001
src/flow/src/compute/render/reduce.rs
Normal file
File diff suppressed because it is too large
Load Diff
20
src/flow/src/compute/typedefs.rs
Normal file
20
src/flow/src/compute/typedefs.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
use differential_dataflow::operators::arrange::TraceAgent;
|
||||
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
|
||||
|
||||
use crate::repr::{Diff, Row, Timestamp};
|
||||
use crate::storage::errors::DataflowError;
|
||||
|
||||
// TODO(discord9): consider use ColValSpine for columnation storage
|
||||
|
||||
/// T: Time, R: Diff, O: Offset
|
||||
pub type RowSpine<K, V, T, R, O = usize> = OrdValSpine<K, V, T, R, O>;
|
||||
/// T: Time, R: Diff, O: Offset
|
||||
pub type RowKeySpine<K, T, R, O = usize> = OrdKeySpine<K, T, R, O>;
|
||||
/// T: Time, R: Diff, O: Offset
|
||||
pub type ErrSpine<K, T, R, O = usize> = OrdKeySpine<K, T, R, O>;
|
||||
/// T: Time, R: Diff, O: Offset
|
||||
pub type ErrValSpine<K, T, R, O = usize> = OrdValSpine<K, DataflowError, T, R, O>;
|
||||
pub type TraceRowHandle<K, V, T, R> = TraceAgent<RowSpine<K, V, T, R>>;
|
||||
pub type TraceErrHandle<K, T, R> = TraceAgent<ErrSpine<K, T, R>>;
|
||||
pub type KeysValsHandle = TraceRowHandle<Row, Row, Timestamp, Diff>;
|
||||
pub type ErrsHandle = TraceErrHandle<DataflowError, Timestamp, Diff>;
|
||||
75
src/flow/src/compute/types/dataflow.rs
Normal file
75
src/flow/src/compute/types/dataflow.rs
Normal file
@@ -0,0 +1,75 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use timely::progress::Antichain;
|
||||
|
||||
use crate::compute::plan::Plan;
|
||||
use crate::compute::types::sinks::ComputeSinkDesc;
|
||||
use crate::compute::types::sources::SourceInstanceDesc;
|
||||
use crate::expr::{GlobalId, ScalarExpr};
|
||||
use crate::repr::{self, RelationType};
|
||||
|
||||
/// A description of a dataflow to construct and results to surface.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct DataflowDescription<P, S: 'static = (), T = repr::Timestamp> {
|
||||
/// Sources instantiations made available to the dataflow pair with monotonicity information.
|
||||
pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool)>,
|
||||
/// Indexes made available to the dataflow.
|
||||
/// (id of new index, description of index, relationtype of base source/view, monotonic)
|
||||
pub index_imports: BTreeMap<GlobalId, (IndexDesc, RelationType, bool)>,
|
||||
/// Views and indexes to be built and stored in the local context.
|
||||
/// Objects must be built in the specific order, as there may be
|
||||
/// dependencies of later objects on prior identifiers.
|
||||
pub objects_to_build: Vec<BuildDesc<P>>,
|
||||
/// Indexes to be made available to be shared with other dataflows
|
||||
/// (id of new index, description of index, relationtype of base source/view)
|
||||
pub index_exports: BTreeMap<GlobalId, (IndexDesc, RelationType)>,
|
||||
/// sinks to be created
|
||||
/// (id of new sink, description of sink)
|
||||
pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
|
||||
/// An optional frontier to which inputs should be advanced.
|
||||
///
|
||||
/// If this is set, it should override the default setting determined by
|
||||
/// the upper bound of `since` frontiers contributing to the dataflow.
|
||||
/// It is an error for this to be set to a frontier not beyond that default.
|
||||
pub as_of: Option<Antichain<T>>,
|
||||
/// Frontier beyond which the dataflow should not execute.
|
||||
/// Specifically, updates at times greater or equal to this frontier are suppressed.
|
||||
/// This is often set to `as_of + 1` to enable "batch" computations.
|
||||
pub until: Antichain<T>,
|
||||
/// Human readable name
|
||||
pub debug_name: String,
|
||||
}
|
||||
|
||||
impl<P, T> DataflowDescription<P, (), T> {
|
||||
/// Creates a new dataflow description with a human-readable name.
|
||||
pub fn new(name: String) -> Self {
|
||||
Self {
|
||||
source_imports: Default::default(),
|
||||
index_imports: Default::default(),
|
||||
objects_to_build: Vec::new(),
|
||||
index_exports: Default::default(),
|
||||
sink_exports: Default::default(),
|
||||
as_of: Default::default(),
|
||||
until: Antichain::new(),
|
||||
debug_name: name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An association of a global identifier to an expression.
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct BuildDesc<P = Plan> {
|
||||
pub id: GlobalId,
|
||||
pub plan: P,
|
||||
}
|
||||
|
||||
/// An index storing processed updates so they can be queried
|
||||
/// or reused in other computations
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
|
||||
pub struct IndexDesc {
|
||||
/// Identity of the collection the index is on.
|
||||
pub on_id: GlobalId,
|
||||
/// Expressions to be arranged, in order of decreasing primacy.
|
||||
pub key: Vec<ScalarExpr>,
|
||||
}
|
||||
8
src/flow/src/compute/types/mod.rs
Normal file
8
src/flow/src/compute/types/mod.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::expr::GlobalId;
|
||||
mod dataflow;
|
||||
mod sinks;
|
||||
mod sources;
|
||||
|
||||
pub(crate) use dataflow::{BuildDesc, DataflowDescription, IndexDesc};
|
||||
28
src/flow/src/compute/types/sinks.rs
Normal file
28
src/flow/src/compute/types/sinks.rs
Normal file
@@ -0,0 +1,28 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use timely::progress::Antichain;
|
||||
|
||||
use crate::expr::GlobalId;
|
||||
use crate::repr::{self, RelationDesc};
|
||||
|
||||
/// A sink for updates to a relational collection.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct ComputeSinkDesc<S: 'static = (), T = repr::Timestamp> {
|
||||
pub from: GlobalId,
|
||||
pub from_desc: RelationDesc,
|
||||
pub connection: ComputeSinkConnection<S>,
|
||||
pub with_snapshot: bool,
|
||||
pub up_to: Antichain<T>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub enum ComputeSinkConnection<S: 'static = ()> {
|
||||
// TODO(discord9): consider if ever needed
|
||||
Subscribe,
|
||||
Persist(PersistSinkConnection<S>),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct PersistSinkConnection<S> {
|
||||
pub value_desc: RelationDesc,
|
||||
pub storage_metadata: S,
|
||||
}
|
||||
26
src/flow/src/compute/types/sources.rs
Normal file
26
src/flow/src/compute/types/sources.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::expr::MapFilterProject;
|
||||
use crate::repr::RelationType;
|
||||
|
||||
/// A description of an instantiation of a source.
|
||||
///
|
||||
/// This includes a description of the source, but additionally any
|
||||
/// context-dependent options like the ability to apply filtering and
|
||||
/// projection to the records as they emerge.
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct SourceInstanceDesc<M> {
|
||||
/// Arguments for this instantiation of the source.
|
||||
pub arguments: SourceInstanceArguments,
|
||||
/// Additional metadata used by the storage client of a compute instance to read it.
|
||||
pub storage_metadata: M,
|
||||
/// The relation type of this source
|
||||
pub typ: RelationType,
|
||||
}
|
||||
|
||||
/// Per-source construction arguments.
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct SourceInstanceArguments {
|
||||
/// Linear operators to be applied record-by-record.
|
||||
pub operators: Option<MapFilterProject>,
|
||||
}
|
||||
224
src/flow/src/expr/func.rs
Normal file
224
src/flow/src/expr/func.rs
Normal file
@@ -0,0 +1,224 @@
|
||||
use datatypes::value::Value;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::ScalarExpr;
|
||||
// TODO(discord9): more function & eval
|
||||
use crate::{repr::Row, storage::errors::EvalError};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
|
||||
pub enum UnaryFunc {
|
||||
Not,
|
||||
IsNull,
|
||||
IsTrue,
|
||||
IsFalse,
|
||||
CastDatetimeToInt64,
|
||||
CastInt64ToFloat32,
|
||||
}
|
||||
|
||||
impl UnaryFunc {
|
||||
pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result<Value, EvalError> {
|
||||
let arg = expr.eval(values)?;
|
||||
match self {
|
||||
Self::CastDatetimeToInt64 => {
|
||||
let datetime = if let Value::DateTime(datetime) = arg {
|
||||
Ok(datetime.val())
|
||||
} else {
|
||||
Err(EvalError::TypeMismatch(format!(
|
||||
"cannot cast {:?} to datetime",
|
||||
arg
|
||||
)))
|
||||
}?;
|
||||
Ok(Value::from(datetime))
|
||||
}
|
||||
Self::CastInt64ToFloat32 => {
|
||||
let int64 = if let Value::Int64(int64) = arg {
|
||||
Ok(int64)
|
||||
} else {
|
||||
Err(EvalError::TypeMismatch(format!(
|
||||
"cannot cast {:?} to int64",
|
||||
arg
|
||||
)))
|
||||
}?;
|
||||
Ok(Value::from(int64 as f32))
|
||||
}
|
||||
_ => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO: support more binary functions for more types
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
|
||||
pub enum BinaryFunc {
|
||||
Eq,
|
||||
NotEq,
|
||||
Lt,
|
||||
Lte,
|
||||
Gt,
|
||||
Gte,
|
||||
AddInt16,
|
||||
AddInt32,
|
||||
AddInt64,
|
||||
AddUInt16,
|
||||
AddUInt32,
|
||||
AddUInt64,
|
||||
AddFloat32,
|
||||
AddFloat64,
|
||||
SubInt16,
|
||||
SubInt32,
|
||||
SubInt64,
|
||||
SubUInt16,
|
||||
SubUInt32,
|
||||
SubUInt64,
|
||||
SubFloat32,
|
||||
SubFloat64,
|
||||
MulInt16,
|
||||
MulInt32,
|
||||
MulInt64,
|
||||
MulUInt16,
|
||||
MulUInt32,
|
||||
MulUInt64,
|
||||
MulFloat32,
|
||||
MulFloat64,
|
||||
DivInt16,
|
||||
DivInt32,
|
||||
DivInt64,
|
||||
DivUInt16,
|
||||
DivUInt32,
|
||||
DivUInt64,
|
||||
DivFloat32,
|
||||
DivFloat64,
|
||||
ModInt16,
|
||||
ModInt32,
|
||||
ModInt64,
|
||||
ModUInt16,
|
||||
ModUInt32,
|
||||
ModUInt64,
|
||||
}
|
||||
|
||||
impl BinaryFunc {
|
||||
pub fn eval(
|
||||
&self,
|
||||
values: &[Value],
|
||||
expr1: &ScalarExpr,
|
||||
expr2: &ScalarExpr,
|
||||
) -> Result<Value, EvalError> {
|
||||
let left = expr1.eval(values)?;
|
||||
let right = expr2.eval(values)?;
|
||||
match self {
|
||||
Self::Eq => Ok(Value::from(left == right)),
|
||||
Self::NotEq => Ok(Value::from(left != right)),
|
||||
Self::Lt => Ok(Value::from(left < right)),
|
||||
Self::Lte => Ok(Value::from(left <= right)),
|
||||
Self::Gt => Ok(Value::from(left > right)),
|
||||
Self::Gte => Ok(Value::from(left >= right)),
|
||||
Self::AddInt16 => Ok(add::<i16>(left, right)?),
|
||||
Self::AddInt32 => Ok(add::<i32>(left, right)?),
|
||||
Self::AddInt64 => Ok(add::<i64>(left, right)?),
|
||||
Self::AddUInt16 => Ok(add::<u16>(left, right)?),
|
||||
Self::AddUInt32 => Ok(add::<u32>(left, right)?),
|
||||
Self::AddUInt64 => Ok(add::<u64>(left, right)?),
|
||||
Self::AddFloat32 => Ok(add::<f32>(left, right)?),
|
||||
Self::AddFloat64 => Ok(add::<f64>(left, right)?),
|
||||
|
||||
Self::SubInt16 => Ok(sub::<i16>(left, right)?),
|
||||
Self::SubInt32 => Ok(sub::<i32>(left, right)?),
|
||||
Self::SubInt64 => Ok(sub::<i64>(left, right)?),
|
||||
Self::SubUInt16 => Ok(sub::<u16>(left, right)?),
|
||||
Self::SubUInt32 => Ok(sub::<u32>(left, right)?),
|
||||
Self::SubUInt64 => Ok(sub::<u64>(left, right)?),
|
||||
Self::SubFloat32 => Ok(sub::<f32>(left, right)?),
|
||||
Self::SubFloat64 => Ok(sub::<f64>(left, right)?),
|
||||
|
||||
Self::MulInt16 => Ok(mul::<i16>(left, right)?),
|
||||
Self::MulInt32 => Ok(mul::<i32>(left, right)?),
|
||||
Self::MulInt64 => Ok(mul::<i64>(left, right)?),
|
||||
Self::MulUInt16 => Ok(mul::<u16>(left, right)?),
|
||||
Self::MulUInt32 => Ok(mul::<u32>(left, right)?),
|
||||
Self::MulUInt64 => Ok(mul::<u64>(left, right)?),
|
||||
Self::MulFloat32 => Ok(mul::<f32>(left, right)?),
|
||||
Self::MulFloat64 => Ok(mul::<f64>(left, right)?),
|
||||
|
||||
Self::DivInt16 => Ok(div::<i16>(left, right)?),
|
||||
Self::DivInt32 => Ok(div::<i32>(left, right)?),
|
||||
Self::DivInt64 => Ok(div::<i64>(left, right)?),
|
||||
Self::DivUInt16 => Ok(div::<u16>(left, right)?),
|
||||
Self::DivUInt32 => Ok(div::<u32>(left, right)?),
|
||||
Self::DivUInt64 => Ok(div::<u64>(left, right)?),
|
||||
Self::DivFloat32 => Ok(div::<f32>(left, right)?),
|
||||
Self::DivFloat64 => Ok(div::<f64>(left, right)?),
|
||||
|
||||
Self::ModInt16 => Ok(rem::<i16>(left, right)?),
|
||||
Self::ModInt32 => Ok(rem::<i32>(left, right)?),
|
||||
Self::ModInt64 => Ok(rem::<i64>(left, right)?),
|
||||
Self::ModUInt16 => Ok(rem::<u16>(left, right)?),
|
||||
Self::ModUInt32 => Ok(rem::<u32>(left, right)?),
|
||||
Self::ModUInt64 => Ok(rem::<u64>(left, right)?),
|
||||
|
||||
_ => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
|
||||
pub enum VariadicFunc {}
|
||||
|
||||
impl VariadicFunc {
|
||||
pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
fn add<T>(left: Value, right: Value) -> Result<Value, EvalError>
|
||||
where
|
||||
T: TryFrom<Value> + std::ops::Add<Output = T>,
|
||||
<T as TryFrom<Value>>::Error: std::fmt::Debug,
|
||||
Value: From<T>,
|
||||
{
|
||||
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
Ok(Value::from(left + right))
|
||||
}
|
||||
|
||||
fn sub<T>(left: Value, right: Value) -> Result<Value, EvalError>
|
||||
where
|
||||
T: TryFrom<Value> + std::ops::Sub<Output = T>,
|
||||
<T as TryFrom<Value>>::Error: std::fmt::Debug,
|
||||
Value: From<T>,
|
||||
{
|
||||
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
Ok(Value::from(left - right))
|
||||
}
|
||||
|
||||
fn mul<T>(left: Value, right: Value) -> Result<Value, EvalError>
|
||||
where
|
||||
T: TryFrom<Value> + std::ops::Mul<Output = T>,
|
||||
<T as TryFrom<Value>>::Error: std::fmt::Debug,
|
||||
Value: From<T>,
|
||||
{
|
||||
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
Ok(Value::from(left * right))
|
||||
}
|
||||
|
||||
fn div<T>(left: Value, right: Value) -> Result<Value, EvalError>
|
||||
where
|
||||
T: TryFrom<Value> + std::ops::Div<Output = T>,
|
||||
<T as TryFrom<Value>>::Error: std::fmt::Debug,
|
||||
Value: From<T>,
|
||||
{
|
||||
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
Ok(Value::from(left / right))
|
||||
}
|
||||
|
||||
fn rem<T>(left: Value, right: Value) -> Result<Value, EvalError>
|
||||
where
|
||||
T: TryFrom<Value> + std::ops::Rem<Output = T>,
|
||||
<T as TryFrom<Value>>::Error: std::fmt::Debug,
|
||||
Value: From<T>,
|
||||
{
|
||||
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
|
||||
Ok(Value::from(left % right))
|
||||
}
|
||||
24
src/flow/src/expr/id.rs
Normal file
24
src/flow/src/expr/id.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
|
||||
pub enum GlobalId {
|
||||
/// System namespace.
|
||||
System(u64),
|
||||
/// User namespace.
|
||||
User(u64),
|
||||
/// Transient namespace.
|
||||
Transient(u64),
|
||||
/// Dummy id for query being explained
|
||||
Explain,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
pub struct LocalId(pub(crate) u64);
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
pub enum Id {
|
||||
/// An identifier that refers to a local component of a dataflow.
|
||||
Local(LocalId),
|
||||
/// An identifier that refers to a global dataflow.
|
||||
Global(GlobalId),
|
||||
}
|
||||
381
src/flow/src/expr/linear.rs
Normal file
381
src/flow/src/expr/linear.rs
Normal file
@@ -0,0 +1,381 @@
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use datatypes::value::Value;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::expr::{Id, LocalId, ScalarExpr};
|
||||
use crate::repr::{self, Diff, Row};
|
||||
use crate::storage::errors::EvalError;
|
||||
|
||||
/// A compound operator that can be applied row-by-row.
|
||||
///
|
||||
/// This operator integrates the map, filter, and project operators.
|
||||
/// It applies a sequences of map expressions, which are allowed to
|
||||
/// refer to previous expressions, interleaved with predicates which
|
||||
/// must be satisfied for an output to be produced. If all predicates
|
||||
/// evaluate to `Datum::True` the data at the identified columns are
|
||||
/// collected and produced as output in a packed `Row`.
|
||||
///
|
||||
/// This operator is a "builder" and its contents may contain expressions
|
||||
/// that are not yet executable. For example, it may contain temporal
|
||||
/// expressions in `self.expressions`, even though this is not something
|
||||
/// we can directly evaluate. The plan creation methods will defensively
|
||||
/// ensure that the right thing happens.
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct MapFilterProject {
|
||||
/// A sequence of expressions that should be appended to the row.
|
||||
///
|
||||
/// Many of these expressions may not be produced in the output,
|
||||
/// and may only be present as common subexpressions.
|
||||
pub expressions: Vec<ScalarExpr>,
|
||||
/// Expressions that must evaluate to `Datum::True` for the output
|
||||
/// row to be produced.
|
||||
///
|
||||
/// Each entry is prepended with a column identifier indicating
|
||||
/// the column *before* which the predicate should first be applied.
|
||||
/// Most commonly this would be one plus the largest column identifier
|
||||
/// in the predicate's support, but it could be larger to implement
|
||||
/// guarded evaluation of predicates.
|
||||
///
|
||||
/// This list should be sorted by the first field.
|
||||
pub predicates: Vec<(usize, ScalarExpr)>,
|
||||
/// A sequence of column identifiers whose data form the output row.
|
||||
pub projection: Vec<usize>,
|
||||
/// The expected number of input columns.
|
||||
///
|
||||
/// This is needed to ensure correct identification of newly formed
|
||||
/// columns in the output.
|
||||
pub input_arity: usize,
|
||||
}
|
||||
|
||||
impl MapFilterProject {
|
||||
/// Create a no-op operator for an input of a supplied arity.
|
||||
pub fn new(input_arity: usize) -> Self {
|
||||
Self {
|
||||
expressions: Vec::new(),
|
||||
predicates: Vec::new(),
|
||||
projection: (0..input_arity).collect(),
|
||||
input_arity,
|
||||
}
|
||||
}
|
||||
|
||||
/// Given two mfps, return an mfp that applies one
|
||||
/// followed by the other.
|
||||
/// Note that the arguments are in the opposite order
|
||||
/// from how function composition is usually written in mathematics.
|
||||
pub fn compose(before: Self, after: Self) -> Self {
|
||||
let (m, f, p) = after.into_map_filter_project();
|
||||
before.map(m).filter(f).project(p)
|
||||
}
|
||||
|
||||
/// True if the operator describes the identity transformation.
|
||||
pub fn is_identity(&self) -> bool {
|
||||
self.expressions.is_empty()
|
||||
&& self.predicates.is_empty()
|
||||
&& self.projection.len() == self.input_arity
|
||||
&& self.projection.iter().enumerate().all(|(i, p)| i == *p)
|
||||
}
|
||||
|
||||
/// Retain only the indicated columns in the presented order.
|
||||
pub fn project<I>(mut self, columns: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = usize> + std::fmt::Debug,
|
||||
{
|
||||
self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
|
||||
self
|
||||
}
|
||||
|
||||
/// Retain only rows satisfying these predicates.
|
||||
///
|
||||
/// This method introduces predicates as eagerly as they can be evaluated,
|
||||
/// which may not be desired for predicates that may cause exceptions.
|
||||
/// If fine manipulation is required, the predicates can be added manually.
|
||||
pub fn filter<I>(mut self, predicates: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = ScalarExpr>,
|
||||
{
|
||||
for mut predicate in predicates {
|
||||
// Correct column references.
|
||||
predicate.permute(&self.projection[..]);
|
||||
|
||||
// Validate column references.
|
||||
assert!(predicate
|
||||
.support()
|
||||
.into_iter()
|
||||
.all(|c| c < self.input_arity + self.expressions.len()));
|
||||
|
||||
// Insert predicate as eagerly as it can be evaluated:
|
||||
// just after the largest column in its support is formed.
|
||||
let max_support = predicate
|
||||
.support()
|
||||
.into_iter()
|
||||
.max()
|
||||
.map(|c| c + 1)
|
||||
.unwrap_or(0);
|
||||
self.predicates.push((max_support, predicate))
|
||||
}
|
||||
// Stable sort predicates by position at which they take effect.
|
||||
// We put literal errors at the end as a stop-gap to avoid erroring
|
||||
// before we are able to evaluate any predicates that might prevent it.
|
||||
self.predicates
|
||||
.sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
|
||||
self
|
||||
}
|
||||
|
||||
/// Append the result of evaluating expressions to each row.
|
||||
pub fn map<I>(mut self, expressions: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = ScalarExpr>,
|
||||
{
|
||||
for mut expression in expressions {
|
||||
// Correct column references.
|
||||
expression.permute(&self.projection[..]);
|
||||
|
||||
// Validate column references.
|
||||
assert!(expression
|
||||
.support()
|
||||
.into_iter()
|
||||
.all(|c| c < self.input_arity + self.expressions.len()));
|
||||
|
||||
// Introduce expression and produce as output.
|
||||
self.expressions.push(expression);
|
||||
self.projection
|
||||
.push(self.input_arity + self.expressions.len() - 1);
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
/// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
|
||||
pub fn into_map_filter_project(self) -> (Vec<ScalarExpr>, Vec<ScalarExpr>, Vec<usize>) {
|
||||
let predicates = self
|
||||
.predicates
|
||||
.into_iter()
|
||||
.map(|(_pos, predicate)| predicate)
|
||||
.collect();
|
||||
(self.expressions, predicates, self.projection)
|
||||
}
|
||||
|
||||
/// As the arguments to `Map`, `Filter`, and `Project` operators.
|
||||
///
|
||||
/// In principle, this operator can be implemented as a sequence of
|
||||
/// more elemental operators, likely less efficiently.
|
||||
pub fn as_map_filter_project(&self) -> (Vec<ScalarExpr>, Vec<ScalarExpr>, Vec<usize>) {
|
||||
self.clone().into_map_filter_project()
|
||||
}
|
||||
}
|
||||
|
||||
impl MapFilterProject {
|
||||
pub fn optimize(&mut self) {
|
||||
// TODO(discord9): optimize later
|
||||
}
|
||||
|
||||
/// Convert the `MapFilterProject` into a staged evaluation plan.
|
||||
///
|
||||
/// The main behavior is extract temporal predicates, which cannot be evaluated
|
||||
/// using the standard machinery.
|
||||
pub fn into_plan(self) -> Result<MfpPlan, String> {
|
||||
MfpPlan::create_from(self)
|
||||
}
|
||||
|
||||
/// Lists input columns whose values are used in outputs.
|
||||
///
|
||||
/// It is entirely appropriate to determine the demand of an instance
|
||||
/// and then both apply a projection to the subject of the instance and
|
||||
/// `self.permute` this instance.
|
||||
pub fn demand(&self) -> BTreeSet<usize> {
|
||||
let mut demanded = BTreeSet::new();
|
||||
for (_index, pred) in self.predicates.iter() {
|
||||
demanded.extend(pred.support());
|
||||
}
|
||||
demanded.extend(self.projection.iter().cloned());
|
||||
for index in (0..self.expressions.len()).rev() {
|
||||
if demanded.contains(&(self.input_arity + index)) {
|
||||
demanded.extend(self.expressions[index].support());
|
||||
}
|
||||
}
|
||||
demanded.retain(|col| col < &self.input_arity);
|
||||
demanded
|
||||
}
|
||||
|
||||
/// Update input column references, due to an input projection or permutation.
|
||||
///
|
||||
/// The `shuffle` argument remaps expected column identifiers to new locations,
|
||||
/// with the expectation that `shuffle` describes all input columns, and so the
|
||||
/// intermediate results will be able to start at position `shuffle.len()`.
|
||||
///
|
||||
/// The supplied `shuffle` may not list columns that are not "demanded" by the
|
||||
/// instance, and so we should ensure that `self` is optimized to not reference
|
||||
/// columns that are not demanded.
|
||||
pub fn permute(&mut self, mut shuffle: BTreeMap<usize, usize>, new_input_arity: usize) {
|
||||
let (mut map, mut filter, mut project) = self.as_map_filter_project();
|
||||
for index in 0..map.len() {
|
||||
// Intermediate columns are just shifted.
|
||||
shuffle.insert(self.input_arity + index, new_input_arity + index);
|
||||
}
|
||||
for expr in map.iter_mut() {
|
||||
expr.permute_map(&shuffle);
|
||||
}
|
||||
for pred in filter.iter_mut() {
|
||||
pred.permute_map(&shuffle);
|
||||
}
|
||||
for proj in project.iter_mut() {
|
||||
assert!(shuffle[proj] < new_input_arity + map.len());
|
||||
*proj = shuffle[proj];
|
||||
}
|
||||
*self = Self::new(new_input_arity)
|
||||
.map(map)
|
||||
.filter(filter)
|
||||
.project(project)
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper type which indicates it is safe to simply evaluate all expressions.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub struct SafeMfpPlan {
|
||||
pub(crate) mfp: MapFilterProject,
|
||||
}
|
||||
|
||||
impl SafeMfpPlan {
|
||||
pub fn permute(&mut self, map: BTreeMap<usize, usize>, new_arity: usize) {
|
||||
self.mfp.permute(map, new_arity);
|
||||
}
|
||||
|
||||
/// Evaluates the linear operator on a supplied list of datums.
|
||||
///
|
||||
/// The arguments are the initial datums associated with the row,
|
||||
/// and an appropriately lifetimed arena for temporary allocations
|
||||
/// needed by scalar evaluation.
|
||||
///
|
||||
/// An `Ok` result will either be `None` if any predicate did not
|
||||
/// evaluate to `Value::Boolean(true)`, or the values of the columns listed
|
||||
/// by `self.projection` if all predicates passed. If an error
|
||||
/// occurs in the evaluation it is returned as an `Err` variant.
|
||||
/// As the evaluation exits early with failed predicates, it may
|
||||
/// miss some errors that would occur later in evaluation.
|
||||
///
|
||||
/// The `row` is not cleared first, but emptied if the function
|
||||
/// returns `Ok(Some(row)).
|
||||
#[inline(always)]
|
||||
pub fn evaluate_into(
|
||||
&self,
|
||||
values: &mut Vec<Value>,
|
||||
row_buf: &mut Row,
|
||||
) -> Result<Option<Row>, EvalError> {
|
||||
let passed_predicates = self.evaluate_inner(values)?;
|
||||
if !passed_predicates {
|
||||
Ok(None)
|
||||
} else {
|
||||
row_buf.clear();
|
||||
row_buf.extend(self.mfp.projection.iter().map(|c| values[*c].clone()));
|
||||
Ok(Some(row_buf.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
/// A version of `evaluate` which produces an iterator over `Datum`
|
||||
/// as output.
|
||||
///
|
||||
/// This version can be useful when one wants to capture the resulting
|
||||
/// datums without packing and then unpacking a row.
|
||||
#[inline(always)]
|
||||
pub fn evaluate_iter<'a>(
|
||||
&'a self,
|
||||
datums: &'a mut Vec<Value>,
|
||||
) -> Result<Option<impl Iterator<Item = Value> + 'a>, EvalError> {
|
||||
let passed_predicates = self.evaluate_inner(datums)?;
|
||||
if !passed_predicates {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(
|
||||
self.mfp.projection.iter().map(move |i| datums[*i].clone()),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Populates `datums` with `self.expressions` and tests `self.predicates`.
|
||||
///
|
||||
/// This does not apply `self.projection`, which is up to the calling method.
|
||||
pub fn evaluate_inner(&self, values: &mut Vec<Value>) -> Result<bool, EvalError> {
|
||||
let mut expression = 0;
|
||||
for (support, predicate) in self.mfp.predicates.iter() {
|
||||
while self.mfp.input_arity + expression < *support {
|
||||
values.push(self.mfp.expressions[expression].eval(&values[..])?);
|
||||
expression += 1;
|
||||
}
|
||||
if predicate.eval(&values[..])? != Value::Boolean(true) {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
while expression < self.mfp.expressions.len() {
|
||||
values.push(self.mfp.expressions[expression].eval(&values[..])?);
|
||||
expression += 1;
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for SafeMfpPlan {
|
||||
type Target = MapFilterProject;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.mfp
|
||||
}
|
||||
}
|
||||
|
||||
/// Predicates partitioned into temporal and non-temporal.
|
||||
///
|
||||
/// Temporal predicates require some recognition to determine their
|
||||
/// structure, and it is best to do that once and re-use the results.
|
||||
///
|
||||
/// There are restrictions on the temporal predicates we currently support.
|
||||
/// They must directly constrain `MzNow` from below or above,
|
||||
/// by expressions that do not themselves contain `MzNow`.
|
||||
/// Conjunctions of such constraints are also ok.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct MfpPlan {
|
||||
/// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
|
||||
pub(crate) mfp: SafeMfpPlan,
|
||||
/// TODO(discord9): impl temporal filter later
|
||||
/// Expressions that when evaluated lower-bound `MzNow`.
|
||||
pub(crate) lower_bounds: Vec<ScalarExpr>,
|
||||
/// Expressions that when evaluated upper-bound `MzNow`.
|
||||
pub(crate) upper_bounds: Vec<ScalarExpr>,
|
||||
}
|
||||
|
||||
impl MfpPlan {
|
||||
pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
|
||||
Ok(Self {
|
||||
mfp: SafeMfpPlan { mfp },
|
||||
lower_bounds: Vec::new(),
|
||||
upper_bounds: Vec::new(),
|
||||
})
|
||||
}
|
||||
pub fn evaluate<E: From<EvalError>, V: Fn(&repr::Timestamp) -> bool>(
|
||||
&self,
|
||||
values: &mut Vec<Value>,
|
||||
time: repr::Timestamp,
|
||||
diff: Diff,
|
||||
valid_time: V,
|
||||
) -> impl Iterator<Item = Result<(Row, repr::Timestamp, Diff), (E, repr::Timestamp, Diff)>>
|
||||
{
|
||||
match self.mfp.evaluate_inner(values) {
|
||||
Err(e) => {
|
||||
return Some(Err((e.into(), time, diff)))
|
||||
.into_iter()
|
||||
.chain(None.into_iter());
|
||||
}
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
return None.into_iter().chain(None.into_iter());
|
||||
}
|
||||
}
|
||||
// TODO(discord9): Temporal filter
|
||||
let ret = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone()));
|
||||
Some(Ok((ret, time, diff)))
|
||||
.into_iter()
|
||||
.chain(None.into_iter())
|
||||
}
|
||||
/// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
|
||||
pub fn is_identity(&self) -> bool {
|
||||
self.mfp.mfp.is_identity() && self.lower_bounds.is_empty() && self.upper_bounds.is_empty()
|
||||
}
|
||||
}
|
||||
207
src/flow/src/expr/mod.rs
Normal file
207
src/flow/src/expr/mod.rs
Normal file
@@ -0,0 +1,207 @@
|
||||
//! for declare dataflow description that is the last step before build dataflow
|
||||
|
||||
mod func;
|
||||
mod id;
|
||||
mod linear;
|
||||
mod relation;
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
pub use id::{GlobalId, Id, LocalId};
|
||||
pub use linear::{MapFilterProject, SafeMfpPlan};
|
||||
pub(crate) use relation::{AggregateExpr, AggregateFunc, TableFunc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub(crate) use crate::expr::func::{BinaryFunc, UnaryFunc, VariadicFunc};
|
||||
use crate::storage::errors::EvalError;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub enum ScalarExpr {
|
||||
/// A column of the input row
|
||||
Column(usize),
|
||||
/// A literal value.
|
||||
Literal(Result<Value, EvalError>, ConcreteDataType),
|
||||
CallUnary {
|
||||
func: UnaryFunc,
|
||||
expr: Box<ScalarExpr>,
|
||||
},
|
||||
CallBinary {
|
||||
func: BinaryFunc,
|
||||
expr1: Box<ScalarExpr>,
|
||||
expr2: Box<ScalarExpr>,
|
||||
},
|
||||
CallVariadic {
|
||||
func: VariadicFunc,
|
||||
exprs: Vec<ScalarExpr>,
|
||||
},
|
||||
/// Conditionally evaluated expressions.
|
||||
///
|
||||
/// It is important that `then` and `els` only be evaluated if
|
||||
/// `cond` is true or not, respectively. This is the only way
|
||||
/// users can guard execution (other logical operator do not
|
||||
/// short-circuit) and we need to preserve that.
|
||||
If {
|
||||
cond: Box<ScalarExpr>,
|
||||
then: Box<ScalarExpr>,
|
||||
els: Box<ScalarExpr>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ScalarExpr {
|
||||
pub fn eval(&self, values: &[Value]) -> Result<Value, EvalError> {
|
||||
match self {
|
||||
ScalarExpr::Column(index) => Ok(values[*index].clone()),
|
||||
ScalarExpr::Literal(row_res, _ty) => row_res.clone(),
|
||||
ScalarExpr::CallUnary { func, expr } => func.eval(values, expr),
|
||||
ScalarExpr::CallBinary { func, expr1, expr2 } => func.eval(values, expr1, expr2),
|
||||
ScalarExpr::CallVariadic { func, exprs } => func.eval(values, exprs),
|
||||
ScalarExpr::If { cond, then, els } => match cond.eval(values) {
|
||||
Ok(Value::Boolean(true)) => then.eval(values),
|
||||
Ok(Value::Boolean(false)) => els.eval(values),
|
||||
_ => Err(EvalError::InvalidArgument(
|
||||
"if condition must be boolean".to_string(),
|
||||
)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Rewrites column indices with their value in `permutation`.
|
||||
///
|
||||
/// This method is applicable even when `permutation` is not a
|
||||
/// strict permutation, and it only needs to have entries for
|
||||
/// each column referenced in `self`.
|
||||
pub fn permute(&mut self, permutation: &[usize]) {
|
||||
#[allow(deprecated)]
|
||||
self.visit_mut_post_nolimit(&mut |e| {
|
||||
if let ScalarExpr::Column(old_i) = e {
|
||||
*old_i = permutation[*old_i];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Rewrites column indices with their value in `permutation`.
|
||||
///
|
||||
/// This method is applicable even when `permutation` is not a
|
||||
/// strict permutation, and it only needs to have entries for
|
||||
/// each column referenced in `self`.
|
||||
pub fn permute_map(&mut self, permutation: &BTreeMap<usize, usize>) {
|
||||
#[allow(deprecated)]
|
||||
self.visit_mut_post_nolimit(&mut |e| {
|
||||
if let ScalarExpr::Column(old_i) = e {
|
||||
*old_i = permutation[old_i];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn support(&self) -> BTreeSet<usize> {
|
||||
let mut support = BTreeSet::new();
|
||||
#[allow(deprecated)]
|
||||
self.visit_post_nolimit(&mut |e| {
|
||||
if let ScalarExpr::Column(i) = e {
|
||||
support.insert(*i);
|
||||
}
|
||||
});
|
||||
support
|
||||
}
|
||||
|
||||
pub fn as_literal(&self) -> Option<Result<Value, &EvalError>> {
|
||||
if let ScalarExpr::Literal(lit, _column_type) = self {
|
||||
Some(lit.as_ref().map(|row| row.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_literal(&self) -> bool {
|
||||
matches!(self, ScalarExpr::Literal(_, _))
|
||||
}
|
||||
|
||||
pub fn is_literal_true(&self) -> bool {
|
||||
Some(Ok(Value::Boolean(true))) == self.as_literal()
|
||||
}
|
||||
|
||||
pub fn is_literal_false(&self) -> bool {
|
||||
Some(Ok(Value::Boolean(false))) == self.as_literal()
|
||||
}
|
||||
|
||||
pub fn is_literal_null(&self) -> bool {
|
||||
Some(Ok(Value::Null)) == self.as_literal()
|
||||
}
|
||||
|
||||
pub fn is_literal_ok(&self) -> bool {
|
||||
matches!(self, ScalarExpr::Literal(Ok(_), _typ))
|
||||
}
|
||||
|
||||
pub fn is_literal_err(&self) -> bool {
|
||||
matches!(self, ScalarExpr::Literal(Err(_), _typ))
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarExpr {
|
||||
/// visit post-order without stack call limit, but may cause stack overflow
|
||||
fn visit_post_nolimit<F>(&self, f: &mut F)
|
||||
where
|
||||
F: FnMut(&Self),
|
||||
{
|
||||
self.visit_children(|e| e.visit_post_nolimit(f));
|
||||
f(self);
|
||||
}
|
||||
|
||||
fn visit_children<F>(&self, mut f: F)
|
||||
where
|
||||
F: FnMut(&Self),
|
||||
{
|
||||
match self {
|
||||
ScalarExpr::Column(_) | ScalarExpr::Literal(_, _) => (),
|
||||
ScalarExpr::CallUnary { func, expr } => f(expr),
|
||||
ScalarExpr::CallBinary { func, expr1, expr2 } => {
|
||||
f(expr1);
|
||||
f(expr2);
|
||||
}
|
||||
ScalarExpr::CallVariadic { func, exprs } => {
|
||||
for expr in exprs {
|
||||
f(expr);
|
||||
}
|
||||
}
|
||||
ScalarExpr::If { cond, then, els } => {
|
||||
f(cond);
|
||||
f(then);
|
||||
f(els);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_mut_post_nolimit<F>(&mut self, f: &mut F)
|
||||
where
|
||||
F: FnMut(&mut Self),
|
||||
{
|
||||
self.visit_mut_children(|e: &mut Self| e.visit_mut_post_nolimit(f));
|
||||
f(self);
|
||||
}
|
||||
|
||||
fn visit_mut_children<F>(&mut self, mut f: F)
|
||||
where
|
||||
F: FnMut(&mut Self),
|
||||
{
|
||||
match self {
|
||||
ScalarExpr::Column(_) | ScalarExpr::Literal(_, _) => (),
|
||||
ScalarExpr::CallUnary { func, expr } => f(expr),
|
||||
ScalarExpr::CallBinary { func, expr1, expr2 } => {
|
||||
f(expr1);
|
||||
f(expr2);
|
||||
}
|
||||
ScalarExpr::CallVariadic { func, exprs } => {
|
||||
for expr in exprs {
|
||||
f(expr);
|
||||
}
|
||||
}
|
||||
ScalarExpr::If { cond, then, els } => {
|
||||
f(cond);
|
||||
f(then);
|
||||
f(els);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
206
src/flow/src/expr/relation/func.rs
Normal file
206
src/flow/src/expr/relation/func.rs
Normal file
@@ -0,0 +1,206 @@
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{OrderedF32, OrderedF64, Value};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
|
||||
pub enum AggregateFunc {
|
||||
MaxInt16,
|
||||
MaxInt32,
|
||||
MaxInt64,
|
||||
MaxUInt16,
|
||||
MaxUInt32,
|
||||
MaxUInt64,
|
||||
MaxFloat32,
|
||||
MaxFloat64,
|
||||
MaxBool,
|
||||
MaxString,
|
||||
MaxDate,
|
||||
MaxTimestamp,
|
||||
MaxTimestampTz,
|
||||
MinInt16,
|
||||
MinInt32,
|
||||
MinInt64,
|
||||
MinUInt16,
|
||||
MinUInt32,
|
||||
MinUInt64,
|
||||
MinFloat32,
|
||||
MinFloat64,
|
||||
MinBool,
|
||||
MinString,
|
||||
MinDate,
|
||||
MinTimestamp,
|
||||
MinTimestampTz,
|
||||
SumInt16,
|
||||
SumInt32,
|
||||
SumInt64,
|
||||
SumUInt16,
|
||||
SumUInt32,
|
||||
SumUInt64,
|
||||
SumFloat32,
|
||||
SumFloat64,
|
||||
Count,
|
||||
Any,
|
||||
All,
|
||||
}
|
||||
|
||||
impl AggregateFunc {
|
||||
pub fn eval<I>(&self, values: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
{
|
||||
// TODO: impl more functions like min/max/sumTimestamp etc.
|
||||
match self {
|
||||
AggregateFunc::MaxInt16 => max_value::<I, i16>(values),
|
||||
AggregateFunc::MaxInt32 => max_value::<I, i32>(values),
|
||||
AggregateFunc::MaxInt64 => max_value::<I, i64>(values),
|
||||
AggregateFunc::MaxUInt16 => max_value::<I, u16>(values),
|
||||
AggregateFunc::MaxUInt32 => max_value::<I, u32>(values),
|
||||
AggregateFunc::MaxUInt64 => max_value::<I, u64>(values),
|
||||
AggregateFunc::MaxFloat32 => max_value::<I, OrderedF32>(values),
|
||||
AggregateFunc::MaxFloat64 => max_value::<I, OrderedF64>(values),
|
||||
AggregateFunc::MaxBool => max_value::<I, bool>(values),
|
||||
AggregateFunc::MaxString => max_string(values),
|
||||
|
||||
AggregateFunc::MinInt16 => min_value::<I, i16>(values),
|
||||
AggregateFunc::MinInt32 => min_value::<I, i32>(values),
|
||||
AggregateFunc::MinInt64 => min_value::<I, i64>(values),
|
||||
AggregateFunc::MinUInt16 => min_value::<I, u16>(values),
|
||||
AggregateFunc::MinUInt32 => min_value::<I, u32>(values),
|
||||
AggregateFunc::MinUInt64 => min_value::<I, u16>(values),
|
||||
AggregateFunc::MinFloat32 => min_value::<I, OrderedF32>(values),
|
||||
AggregateFunc::MinFloat64 => min_value::<I, OrderedF64>(values),
|
||||
AggregateFunc::MinBool => min_value::<I, bool>(values),
|
||||
AggregateFunc::MinString => min_string(values),
|
||||
|
||||
AggregateFunc::SumInt16 => sum_value::<I, i16, i64>(values),
|
||||
AggregateFunc::SumInt32 => sum_value::<I, i32, i64>(values),
|
||||
AggregateFunc::SumInt64 => sum_value::<I, i64, i64>(values),
|
||||
AggregateFunc::SumUInt16 => sum_value::<I, u16, u64>(values),
|
||||
AggregateFunc::SumUInt32 => sum_value::<I, u32, u64>(values),
|
||||
AggregateFunc::SumUInt64 => sum_value::<I, u64, u64>(values),
|
||||
AggregateFunc::SumFloat32 => sum_value::<I, f32, f32>(values),
|
||||
AggregateFunc::SumFloat64 => sum_value::<I, f64, f64>(values),
|
||||
|
||||
AggregateFunc::Count => count(values),
|
||||
AggregateFunc::All => all(values),
|
||||
AggregateFunc::Any => any(values),
|
||||
_ => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn max_string<I>(values: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
{
|
||||
match values.into_iter().filter(|d| !d.is_null()).max_by(|a, b| {
|
||||
let a = a.as_value_ref();
|
||||
let a = a.as_string().expect("unexpected type").unwrap();
|
||||
let b = b.as_value_ref();
|
||||
let b = b.as_string().expect("unexpected type").unwrap();
|
||||
a.cmp(b)
|
||||
}) {
|
||||
Some(v) => v,
|
||||
None => Value::Null,
|
||||
}
|
||||
}
|
||||
|
||||
fn max_value<I, TypedValue>(values: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
TypedValue: TryFrom<Value> + Ord,
|
||||
<TypedValue as TryFrom<Value>>::Error: std::fmt::Debug,
|
||||
Value: From<Option<TypedValue>>,
|
||||
{
|
||||
let x: Option<TypedValue> = values
|
||||
.into_iter()
|
||||
.filter(|v| !v.is_null())
|
||||
.map(|v| TypedValue::try_from(v).expect("unexpected type"))
|
||||
.max();
|
||||
x.into()
|
||||
}
|
||||
|
||||
fn min_string<I>(values: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
{
|
||||
match values.into_iter().filter(|d| !d.is_null()).min_by(|a, b| {
|
||||
let a = a.as_value_ref();
|
||||
let a = a.as_string().expect("unexpected type").unwrap();
|
||||
let b = b.as_value_ref();
|
||||
let b = b.as_string().expect("unexpected type").unwrap();
|
||||
a.cmp(b)
|
||||
}) {
|
||||
Some(v) => v,
|
||||
None => Value::Null,
|
||||
}
|
||||
}
|
||||
|
||||
fn min_value<I, TypedValue>(values: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
TypedValue: TryFrom<Value> + Ord,
|
||||
<TypedValue as TryFrom<Value>>::Error: std::fmt::Debug,
|
||||
Value: From<Option<TypedValue>>,
|
||||
{
|
||||
let x: Option<TypedValue> = values
|
||||
.into_iter()
|
||||
.filter(|v| !v.is_null())
|
||||
.map(|v| TypedValue::try_from(v).expect("unexpected type"))
|
||||
.min();
|
||||
x.into()
|
||||
}
|
||||
|
||||
fn sum_value<I, ValueType, ResultType>(values: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
ValueType: TryFrom<Value>,
|
||||
<ValueType as TryFrom<Value>>::Error: std::fmt::Debug,
|
||||
Value: From<Option<ValueType>>,
|
||||
ResultType: From<ValueType> + std::iter::Sum + Into<Value>,
|
||||
{
|
||||
// If no row qualifies, then the result of COUNT is 0 (zero), and the result of any other aggregate function is the null value.
|
||||
let mut values = values.into_iter().filter(|v| !v.is_null()).peekable();
|
||||
if values.peek().is_none() {
|
||||
Value::Null
|
||||
} else {
|
||||
let x = values
|
||||
.map(|v| ResultType::from(ValueType::try_from(v).expect("unexpected type")))
|
||||
.sum::<ResultType>();
|
||||
x.into()
|
||||
}
|
||||
}
|
||||
|
||||
fn count<I>(values: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
{
|
||||
let x = values.into_iter().filter(|v| !v.is_null()).count() as i64;
|
||||
Value::from(x)
|
||||
}
|
||||
|
||||
fn any<I>(datums: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
{
|
||||
datums
|
||||
.into_iter()
|
||||
.fold(Value::Boolean(false), |state, next| match (state, next) {
|
||||
(Value::Boolean(true), _) | (_, Value::Boolean(true)) => Value::Boolean(true),
|
||||
(Value::Null, _) | (_, Value::Null) => Value::Null,
|
||||
_ => Value::Boolean(false),
|
||||
})
|
||||
}
|
||||
|
||||
fn all<I>(datums: I) -> Value
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
{
|
||||
datums
|
||||
.into_iter()
|
||||
.fold(Value::Boolean(true), |state, next| match (state, next) {
|
||||
(Value::Boolean(false), _) | (_, Value::Boolean(false)) => Value::Boolean(false),
|
||||
(Value::Null, _) | (_, Value::Null) => Value::Null,
|
||||
_ => Value::Boolean(true),
|
||||
})
|
||||
}
|
||||
22
src/flow/src/expr/relation/mod.rs
Normal file
22
src/flow/src/expr/relation/mod.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
pub(crate) use func::AggregateFunc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::expr::ScalarExpr;
|
||||
|
||||
mod func;
|
||||
|
||||
/// function that might emit multiple output record for one input row
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
|
||||
pub enum TableFunc {}
|
||||
|
||||
/// Describes an aggregation expression.
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
|
||||
pub struct AggregateExpr {
|
||||
/// Names the aggregation function.
|
||||
pub func: AggregateFunc,
|
||||
/// An expression which extracts from each row the input to `func`.
|
||||
pub expr: ScalarExpr,
|
||||
/// Should the aggregation be applied only to distinct results in each group.
|
||||
#[serde(default)]
|
||||
pub distinct: bool,
|
||||
}
|
||||
9
src/flow/src/lib.rs
Normal file
9
src/flow/src/lib.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
#![allow(unused)]
|
||||
#![allow(clippy::mutable_key_type)]
|
||||
|
||||
mod adapter;
|
||||
mod compute;
|
||||
mod expr;
|
||||
mod repr;
|
||||
mod storage;
|
||||
mod util;
|
||||
62
src/flow/src/repr/mod.rs
Normal file
62
src/flow/src/repr/mod.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
//! basically a wrapper around the `datatype` crate
|
||||
//! for basic Data Representation
|
||||
use std::borrow::Borrow;
|
||||
use std::slice::SliceIndex;
|
||||
|
||||
use datatypes::value::Value;
|
||||
pub(crate) use relation::{RelationDesc, RelationType};
|
||||
use serde::{Deserialize, Serialize};
|
||||
/// System-wide Record count difference type.
|
||||
pub type Diff = i64;
|
||||
|
||||
mod relation;
|
||||
mod timestamp;
|
||||
|
||||
/// A row is a vector of values.
|
||||
///
|
||||
/// TODO(discord9): use a more efficient representation
|
||||
///i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\]
|
||||
|
||||
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
|
||||
pub struct Row {
|
||||
inner: Vec<Value>,
|
||||
}
|
||||
|
||||
impl Row {
|
||||
pub fn get(&self, idx: usize) -> Option<&Value> {
|
||||
self.inner.get(idx)
|
||||
}
|
||||
pub fn clear(&mut self) {
|
||||
self.inner.clear();
|
||||
}
|
||||
pub fn packer(&mut self) -> &mut Vec<Value> {
|
||||
self.inner.clear();
|
||||
&mut self.inner
|
||||
}
|
||||
pub fn pack<I>(iter: I) -> Row
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
{
|
||||
Self {
|
||||
inner: iter.into_iter().collect(),
|
||||
}
|
||||
}
|
||||
pub fn unpack(&self) -> Vec<Value> {
|
||||
self.inner.clone()
|
||||
}
|
||||
pub fn extend<I>(&mut self, iter: I)
|
||||
where
|
||||
I: IntoIterator<Item = Value>,
|
||||
{
|
||||
self.inner.extend(iter);
|
||||
}
|
||||
pub fn into_iter(self) -> impl Iterator<Item = Value> {
|
||||
self.inner.into_iter()
|
||||
}
|
||||
pub fn iter(&self) -> impl Iterator<Item = &Value> {
|
||||
self.inner.iter()
|
||||
}
|
||||
}
|
||||
|
||||
/// System-wide default timestamp type
|
||||
pub type Timestamp = u64;
|
||||
342
src/flow/src/repr/relation.rs
Normal file
342
src/flow/src/repr/relation.rs
Normal file
@@ -0,0 +1,342 @@
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// The type of a relation.
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
|
||||
pub struct RelationType {
|
||||
/// The type for each column, in order.
|
||||
pub column_types: Vec<ColumnType>,
|
||||
/// Sets of indices that are "keys" for the collection.
|
||||
///
|
||||
/// Each element in this list is a set of column indices, each with the
|
||||
/// property that the collection contains at most one record with each
|
||||
/// distinct set of values for each column. Alternately, for a specific set
|
||||
/// of values assigned to the these columns there is at most one record.
|
||||
///
|
||||
/// A collection can contain multiple sets of keys, although it is common to
|
||||
/// have either zero or one sets of key indices.
|
||||
#[serde(default)]
|
||||
pub keys: Vec<Vec<usize>>,
|
||||
}
|
||||
|
||||
impl RelationType {
|
||||
/// Constructs a `RelationType` representing the relation with no columns and
|
||||
/// no keys.
|
||||
pub fn empty() -> Self {
|
||||
RelationType::new(vec![])
|
||||
}
|
||||
|
||||
/// Constructs a new `RelationType` from specified column types.
|
||||
///
|
||||
/// The `RelationType` will have no keys.
|
||||
pub fn new(column_types: Vec<ColumnType>) -> Self {
|
||||
RelationType {
|
||||
column_types,
|
||||
keys: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a new key for the relation.
|
||||
pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
|
||||
indices.sort_unstable();
|
||||
if !self.keys.contains(&indices) {
|
||||
self.keys.push(indices);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
|
||||
for key in keys {
|
||||
self = self.with_key(key)
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Computes the number of columns in the relation.
|
||||
pub fn arity(&self) -> usize {
|
||||
self.column_types.len()
|
||||
}
|
||||
|
||||
/// Gets the index of the columns used when creating a default index.
|
||||
pub fn default_key(&self) -> Vec<usize> {
|
||||
if let Some(key) = self.keys.first() {
|
||||
if key.is_empty() {
|
||||
(0..self.column_types.len()).collect()
|
||||
} else {
|
||||
key.clone()
|
||||
}
|
||||
} else {
|
||||
(0..self.column_types.len()).collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// True if any collection described by `self` could safely be described by `other`.
|
||||
///
|
||||
/// In practice this means checking that the scalar types match exactly, and that the
|
||||
/// nullability of `self` is at least as strict as `other`, and that all keys of `other`
|
||||
/// contain some key of `self` (as a set of key columns is less strict than any subset).
|
||||
pub fn subtypes(&self, other: &RelationType) -> bool {
|
||||
let all_keys = other.keys.iter().all(|key1| {
|
||||
self.keys
|
||||
.iter()
|
||||
.any(|key2| key1.iter().all(|k| key2.contains(k)))
|
||||
});
|
||||
if !all_keys {
|
||||
return false;
|
||||
}
|
||||
|
||||
if self.column_types.len() != other.column_types.len() {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (col1, col2) in self.column_types.iter().zip(other.column_types.iter()) {
|
||||
if col1.nullable && !col2.nullable {
|
||||
return false;
|
||||
}
|
||||
if col1.scalar_type != col2.scalar_type {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// The type of a `Value`
|
||||
///
|
||||
/// [`ColumnType`] bundles information about the scalar type of a datum (e.g.,
|
||||
/// Int32 or String) with its nullability.
|
||||
///
|
||||
/// To construct a column type, either initialize the struct directly, or
|
||||
/// use the [`ScalarType::nullable`] method.
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
|
||||
pub struct ColumnType {
|
||||
/// The underlying scalar type (e.g., Int32 or String) of this column.
|
||||
pub scalar_type: ConcreteDataType,
|
||||
/// Whether this datum can be null.`
|
||||
#[serde(default = "return_true")]
|
||||
pub nullable: bool,
|
||||
}
|
||||
|
||||
/// This method exists solely for the purpose of making ColumnType nullable by
|
||||
/// default in unit tests. The default value of a bool is false, and the only
|
||||
/// way to make an object take on any other value by default is to pass it a
|
||||
/// function that returns the desired default value. See
|
||||
/// <https://github.com/serde-rs/serde/issues/1030>
|
||||
#[inline(always)]
|
||||
fn return_true() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// A description of the shape of a relation.
|
||||
///
|
||||
/// It bundles a [`RelationType`] with the name of each column in the relation.
|
||||
/// Individual column names are optional.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// A `RelationDesc`s is typically constructed via its builder API:
|
||||
///
|
||||
/// ```
|
||||
/// use mz_repr::{ColumnType, RelationDesc, ScalarType};
|
||||
///
|
||||
/// let desc = RelationDesc::empty()
|
||||
/// .with_column("id", ScalarType::Int64.nullable(false))
|
||||
/// .with_column("price", ScalarType::Float64.nullable(true));
|
||||
/// ```
|
||||
///
|
||||
/// In more complicated cases, like when constructing a `RelationDesc` in
|
||||
/// response to user input, it may be more convenient to construct a relation
|
||||
/// type first, and imbue it with column names to form a `RelationDesc` later:
|
||||
///
|
||||
/// ```
|
||||
/// use mz_repr::RelationDesc;
|
||||
///
|
||||
/// # fn plan_query(_: &str) -> mz_repr::RelationType { mz_repr::RelationType::new(vec![]) }
|
||||
/// let relation_type = plan_query("SELECT * FROM table");
|
||||
/// let names = (0..relation_type.arity()).map(|i| match i {
|
||||
/// 0 => "first",
|
||||
/// 1 => "second",
|
||||
/// _ => "unknown",
|
||||
/// });
|
||||
/// let desc = RelationDesc::new(relation_type, names);
|
||||
/// ```
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
|
||||
pub struct RelationDesc {
|
||||
typ: RelationType,
|
||||
names: Vec<ColumnName>,
|
||||
}
|
||||
|
||||
impl RelationDesc {
|
||||
/// Constructs a new `RelationDesc` that represents the empty relation
|
||||
/// with no columns and no keys.
|
||||
pub fn empty() -> Self {
|
||||
RelationDesc {
|
||||
typ: RelationType::empty(),
|
||||
names: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs a new `RelationDesc` from a `RelationType` and an iterator
|
||||
/// over column names.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the arity of the `RelationType` is not equal to the number of
|
||||
/// items in `names`.
|
||||
pub fn new<I, N>(typ: RelationType, names: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = N>,
|
||||
N: Into<ColumnName>,
|
||||
{
|
||||
let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
|
||||
assert_eq!(typ.column_types.len(), names.len());
|
||||
RelationDesc { typ, names }
|
||||
}
|
||||
|
||||
pub fn from_names_and_types<I, T, N>(iter: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = (N, T)>,
|
||||
T: Into<ColumnType>,
|
||||
N: Into<ColumnName>,
|
||||
{
|
||||
let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
|
||||
let types = types.into_iter().map(Into::into).collect();
|
||||
let typ = RelationType::new(types);
|
||||
Self::new(typ, names)
|
||||
}
|
||||
/// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
|
||||
pub fn concat(mut self, other: Self) -> Self {
|
||||
let self_len = self.typ.column_types.len();
|
||||
self.names.extend(other.names);
|
||||
self.typ.column_types.extend(other.typ.column_types);
|
||||
for k in other.typ.keys {
|
||||
let k = k.into_iter().map(|idx| idx + self_len).collect();
|
||||
self = self.with_key(k);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
/// Appends a column with the specified name and type.
|
||||
pub fn with_column<N>(mut self, name: N, column_type: ColumnType) -> Self
|
||||
where
|
||||
N: Into<ColumnName>,
|
||||
{
|
||||
self.typ.column_types.push(column_type);
|
||||
self.names.push(name.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Adds a new key for the relation.
|
||||
pub fn with_key(mut self, indices: Vec<usize>) -> Self {
|
||||
self.typ = self.typ.with_key(indices);
|
||||
self
|
||||
}
|
||||
|
||||
/// Drops all existing keys.
|
||||
pub fn without_keys(mut self) -> Self {
|
||||
self.typ.keys.clear();
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds a new relation description with the column names replaced with
|
||||
/// new names.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the arity of the relation type does not match the number of
|
||||
/// items in `names`.
|
||||
pub fn with_names<I, N>(self, names: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = N>,
|
||||
N: Into<ColumnName>,
|
||||
{
|
||||
Self::new(self.typ, names)
|
||||
}
|
||||
|
||||
/// Computes the number of columns in the relation.
|
||||
pub fn arity(&self) -> usize {
|
||||
self.typ.arity()
|
||||
}
|
||||
|
||||
/// Returns the relation type underlying this relation description.
|
||||
pub fn typ(&self) -> &RelationType {
|
||||
&self.typ
|
||||
}
|
||||
|
||||
/// Returns an iterator over the columns in this relation.
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
|
||||
self.iter_names().zip(self.iter_types())
|
||||
}
|
||||
|
||||
/// Returns an iterator over the types of the columns in this relation.
|
||||
pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
|
||||
self.typ.column_types.iter()
|
||||
}
|
||||
|
||||
/// Returns an iterator over the names of the columns in this relation.
|
||||
pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
|
||||
self.names.iter()
|
||||
}
|
||||
|
||||
/// Finds a column by name.
|
||||
///
|
||||
/// Returns the index and type of the column named `name`. If no column with
|
||||
/// the specified name exists, returns `None`. If multiple columns have the
|
||||
/// specified name, the leftmost column is returned.
|
||||
pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
|
||||
self.iter_names()
|
||||
.position(|n| n == name)
|
||||
.map(|i| (i, &self.typ.column_types[i]))
|
||||
}
|
||||
|
||||
/// Gets the name of the `i`th column.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `i` is not a valid column index.
|
||||
pub fn get_name(&self, i: usize) -> &ColumnName {
|
||||
&self.names[i]
|
||||
}
|
||||
|
||||
/// Mutably gets the name of the `i`th column.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `i` is not a valid column index.
|
||||
pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
|
||||
&mut self.names[i]
|
||||
}
|
||||
|
||||
/// Gets the name of the `i`th column if that column name is unambiguous.
|
||||
///
|
||||
/// If at least one other column has the same name as the `i`th column,
|
||||
/// returns `None`. If the `i`th column has no name, returns `None`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `i` is not a valid column index.
|
||||
pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
|
||||
let name = &self.names[i];
|
||||
if self.iter_names().filter(|n| *n == name).count() == 1 {
|
||||
Some(name)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The name of a column in a [`RelationDesc`].
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
|
||||
pub struct ColumnName(pub(crate) String);
|
||||
|
||||
impl ColumnName {
|
||||
/// Returns this column name as a `str`.
|
||||
pub fn as_str(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the string underlying this column name.
|
||||
pub fn as_mut_str(&mut self) -> &mut String {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
1
src/flow/src/repr/timestamp.rs
Normal file
1
src/flow/src/repr/timestamp.rs
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
28
src/flow/src/storage/errors.rs
Normal file
28
src/flow/src/storage/errors.rs
Normal file
@@ -0,0 +1,28 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
// TODO(discord9): more error types
|
||||
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
|
||||
pub enum DataflowError {
|
||||
EvalError(Box<EvalError>),
|
||||
}
|
||||
|
||||
impl From<EvalError> for DataflowError {
|
||||
fn from(e: EvalError) -> Self {
|
||||
DataflowError::EvalError(Box::new(e))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
|
||||
pub enum EvalError {
|
||||
DivisionByZero,
|
||||
TypeMismatch(String),
|
||||
InvalidArgument(String),
|
||||
Internal(String),
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tell_goal() {
|
||||
use differential_dataflow::ExchangeData;
|
||||
fn a<T: ExchangeData>(_: T) {}
|
||||
a(DataflowError::from(EvalError::DivisionByZero));
|
||||
}
|
||||
4
src/flow/src/storage/mod.rs
Normal file
4
src/flow/src/storage/mod.rs
Normal file
@@ -0,0 +1,4 @@
|
||||
//! TODO: Storage Layer: wrap grpc write request for providing definite collection for streaming process, and able to send read request should random access is needed
|
||||
//! and store result of stream processing
|
||||
|
||||
pub(crate) mod errors;
|
||||
0
src/flow/src/storage/source.rs
Normal file
0
src/flow/src/storage/source.rs
Normal file
150
src/flow/src/util/buffer.rs
Normal file
150
src/flow/src/util/buffer.rs
Normal file
@@ -0,0 +1,150 @@
|
||||
use differential_dataflow::consolidation::consolidate_updates;
|
||||
use differential_dataflow::difference::Semigroup;
|
||||
use differential_dataflow::Data;
|
||||
use timely::communication::Push;
|
||||
use timely::dataflow::channels::Bundle;
|
||||
use timely::dataflow::operators::generic::OutputHandle;
|
||||
use timely::dataflow::operators::{Capability, InputCapability};
|
||||
use timely::progress::Timestamp;
|
||||
|
||||
/// A buffer that consolidates updates
|
||||
///
|
||||
/// The buffer implements a wrapper around [OutputHandle] consolidating elements pushed to it. It is
|
||||
/// backed by a capacity-limited buffer, which means that compaction only occurs within the
|
||||
/// dimensions of the buffer, i.e. the number of unique keys is less than half of the buffer's
|
||||
/// capacity.
|
||||
///
|
||||
/// A cap is retained whenever the current time changes to be able to flush on drop or when the time
|
||||
/// changes again.
|
||||
///
|
||||
/// The buffer is filled with updates until it reaches its capacity. At this point, the updates are
|
||||
/// consolidated to free up space. This process repeats until the consolidation recovered less than
|
||||
/// half of the buffer's capacity, at which point the buffer will be shipped.
|
||||
///
|
||||
/// The buffer retains a capability to send data on flush. It will flush all data once dropped, if
|
||||
/// time changes, or if the buffer capacity is reached.
|
||||
pub struct ConsolidateBuffer<'a, 'b, T, D: Data, R: Semigroup, P>
|
||||
where
|
||||
P: Push<Bundle<T, (D, T, R)>> + 'a,
|
||||
T: Data + Timestamp + 'a,
|
||||
D: 'a,
|
||||
{
|
||||
// a buffer for records, to send at self.cap
|
||||
// Invariant: Buffer only contains data if cap is Some.
|
||||
buffer: Vec<(D, T, R)>,
|
||||
output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>,
|
||||
cap: Option<Capability<T>>,
|
||||
port: usize,
|
||||
previous_len: usize,
|
||||
}
|
||||
|
||||
impl<'a, 'b, T, D: Data, R: Semigroup, P> ConsolidateBuffer<'a, 'b, T, D, R, P>
|
||||
where
|
||||
T: Data + Timestamp + 'a,
|
||||
P: Push<Bundle<T, (D, T, R)>> + 'a,
|
||||
{
|
||||
/// Create a new [ConsolidateBuffer], wrapping the provided session.
|
||||
///
|
||||
/// * `output_handle`: The output to send data to.
|
||||
/// * 'port': The output port to retain capabilities for.
|
||||
pub fn new(output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>, port: usize) -> Self {
|
||||
Self {
|
||||
output_handle,
|
||||
port,
|
||||
cap: None,
|
||||
buffer: Vec::with_capacity(::timely::container::buffer::default_capacity::<(D, T, R)>()),
|
||||
previous_len: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Provides an iterator of elements to the buffer
|
||||
pub fn give_iterator<I: Iterator<Item = (D, T, R)>>(
|
||||
&mut self,
|
||||
cap: &InputCapability<T>,
|
||||
iter: I,
|
||||
) {
|
||||
for item in iter {
|
||||
self.give(cap, item);
|
||||
}
|
||||
}
|
||||
|
||||
/// Give an element to the buffer
|
||||
pub fn give(&mut self, cap: &InputCapability<T>, data: (D, T, R)) {
|
||||
// Retain a cap for the current time, which will be used on flush.
|
||||
if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
|
||||
// Flush on capability change
|
||||
self.flush();
|
||||
// Retain capability for the specified output port.
|
||||
self.cap = Some(cap.delayed_for_output(cap.time(), self.port));
|
||||
}
|
||||
self.give_internal(data);
|
||||
}
|
||||
|
||||
/// Give an element to the buffer, using a pre-fabricated capability. Note that the capability
|
||||
/// must be valid for the associated output.
|
||||
pub fn give_at(&mut self, cap: &Capability<T>, data: (D, T, R)) {
|
||||
// Retain a cap for the current time, which will be used on flush.
|
||||
if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
|
||||
// Flush on capability change
|
||||
self.flush();
|
||||
// Retain capability.
|
||||
self.cap = Some(cap.clone());
|
||||
}
|
||||
self.give_internal(data);
|
||||
}
|
||||
|
||||
/// Give an element and possibly flush the buffer. Note that this needs to have access
|
||||
/// to a capability, which the public functions ensure.
|
||||
fn give_internal(&mut self, data: (D, T, R)) {
|
||||
self.buffer.push(data);
|
||||
|
||||
// Limit, if possible, the lifetime of the allocations for data
|
||||
// and consolidate smaller buffers if we're in the lucky case
|
||||
// of a small domain for D
|
||||
if self.buffer.len() >= 2 * self.previous_len {
|
||||
// Consolidate while the consolidation frees at least half the buffer
|
||||
consolidate_updates(&mut self.buffer);
|
||||
if self.buffer.len() > self.buffer.capacity() / 2 {
|
||||
self.flush();
|
||||
} else {
|
||||
self.previous_len = self.buffer.len();
|
||||
}
|
||||
// At this point, it is an invariant across give calls that self.previous_len
|
||||
// will be in the interval [0, self.buffer.capacity() / 2]. So, we will enter
|
||||
// this if-statement block again when self.buffer.len() == self.buffer.capacity()
|
||||
// or earlier. If consolidation is not effective to keep self.buffer.len()
|
||||
// below half capacity, then flushing when more than half-full will
|
||||
// maintain the invariant.
|
||||
}
|
||||
}
|
||||
|
||||
/// Flush the internal buffer to the underlying session
|
||||
pub fn flush(&mut self) {
|
||||
if let Some(cap) = &self.cap {
|
||||
self.output_handle.session(cap).give_vec(&mut self.buffer);
|
||||
|
||||
// Ensure that the capacity is at least equal to the default in case
|
||||
// it was reduced by give_vec. Note that we cannot rely here on give_vec
|
||||
// returning us a buffer with zero capacity.
|
||||
if self.buffer.capacity() < ::timely::container::buffer::default_capacity::<(D, T, R)>()
|
||||
{
|
||||
let to_reserve = ::timely::container::buffer::default_capacity::<(D, T, R)>()
|
||||
- self.buffer.capacity();
|
||||
self.buffer.reserve_exact(to_reserve);
|
||||
}
|
||||
self.previous_len = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'b, T, D: Data, R: Semigroup, P> Drop for ConsolidateBuffer<'a, 'b, T, D, R, P>
|
||||
where
|
||||
P: Push<Bundle<T, (D, T, R)>> + 'a,
|
||||
T: Data + Timestamp + 'a,
|
||||
D: 'a,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
self.flush();
|
||||
}
|
||||
}
|
||||
7
src/flow/src/util/mod.rs
Normal file
7
src/flow/src/util/mod.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
//! utilitys including extend differential dataflow to deal with errors and etc.
|
||||
mod buffer;
|
||||
mod operator;
|
||||
mod reduce;
|
||||
|
||||
pub use operator::CollectionExt;
|
||||
pub use reduce::ReduceExt;
|
||||
257
src/flow/src/util/operator.rs
Normal file
257
src/flow/src/util/operator.rs
Normal file
@@ -0,0 +1,257 @@
|
||||
use differential_dataflow::difference::{Multiply, Semigroup};
|
||||
use differential_dataflow::lattice::Lattice;
|
||||
use differential_dataflow::operators::arrange::Arrange;
|
||||
use differential_dataflow::trace::{Batch, Trace, TraceReader};
|
||||
use differential_dataflow::{AsCollection, Collection};
|
||||
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
|
||||
use timely::dataflow::channels::pushers::Tee;
|
||||
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
|
||||
use timely::dataflow::operators::generic::operator::{self, Operator};
|
||||
use timely::dataflow::operators::generic::{InputHandle, OperatorInfo, OutputHandle};
|
||||
use timely::dataflow::operators::Capability;
|
||||
use timely::dataflow::{Scope, Stream};
|
||||
use timely::{Data, ExchangeData};
|
||||
|
||||
use crate::util::buffer::ConsolidateBuffer;
|
||||
|
||||
pub trait StreamExt<G, D1>
|
||||
where
|
||||
D1: Data,
|
||||
G: Scope,
|
||||
{
|
||||
/// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
|
||||
/// but the logic function can handle failures.
|
||||
///
|
||||
/// Creates a new dataflow operator that partitions its input stream by a
|
||||
/// parallelization strategy `pact` and repeatedly invokes `logic`, the
|
||||
/// function returned by the function passed as `constructor`. The `logic`
|
||||
/// function can read to the input stream and write to either of two output
|
||||
/// streams, where the first output stream represents successful
|
||||
/// computations and the second output stream represents failed
|
||||
/// computations.
|
||||
fn unary_fallible<D2, E, B, P>(
|
||||
&self,
|
||||
pact: P,
|
||||
name: &str,
|
||||
constructor: B,
|
||||
) -> (Stream<G, D2>, Stream<G, E>)
|
||||
where
|
||||
D2: Data,
|
||||
E: Data,
|
||||
B: FnOnce(
|
||||
Capability<G::Timestamp>,
|
||||
OperatorInfo,
|
||||
) -> Box<
|
||||
dyn FnMut(
|
||||
&mut InputHandle<G::Timestamp, D1, P::Puller>,
|
||||
&mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
|
||||
&mut OutputHandle<G::Timestamp, E, Tee<G::Timestamp, E>>,
|
||||
) + 'static,
|
||||
>,
|
||||
P: ParallelizationContract<G::Timestamp, D1>;
|
||||
|
||||
/// Like [`timely::dataflow::operators::map::Map::flat_map`], but `logic`
|
||||
/// is allowed to fail. The first returned stream will contain the
|
||||
/// successful applications of `logic`, while the second returned stream
|
||||
/// will contain the failed applications.
|
||||
fn flat_map_fallible<D2, E, I, L>(&self, name: &str, logic: L) -> (Stream<G, D2>, Stream<G, E>)
|
||||
where
|
||||
D2: Data,
|
||||
E: Data,
|
||||
I: IntoIterator<Item = Result<D2, E>>,
|
||||
L: FnMut(D1) -> I + 'static;
|
||||
}
|
||||
|
||||
/// Extension methods for differential [`Collection`]s.
|
||||
pub trait CollectionExt<G, D1, R>
|
||||
where
|
||||
G: Scope,
|
||||
R: Semigroup,
|
||||
{
|
||||
/// Creates a new empty collection in `scope`.
|
||||
fn empty(scope: &G) -> Collection<G, D1, R>;
|
||||
|
||||
/// Like [`Collection::map`], but `logic` is allowed to fail. The first
|
||||
/// returned collection will contain successful applications of `logic`,
|
||||
/// while the second returned collection will contain the failed
|
||||
/// applications.
|
||||
fn map_fallible<D2, E, L>(
|
||||
&self,
|
||||
name: &str,
|
||||
mut logic: L,
|
||||
) -> (Collection<G, D2, R>, Collection<G, E, R>)
|
||||
where
|
||||
D2: Data,
|
||||
E: Data,
|
||||
L: FnMut(D1) -> Result<D2, E> + 'static,
|
||||
{
|
||||
self.flat_map_fallible(name, move |record| Some(logic(record)))
|
||||
}
|
||||
|
||||
/// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
|
||||
/// returned collection will contain the successful applications of `logic`,
|
||||
/// while the second returned collection will contain the failed
|
||||
/// applications.
|
||||
fn flat_map_fallible<D2, E, I, L>(
|
||||
&self,
|
||||
name: &str,
|
||||
logic: L,
|
||||
) -> (Collection<G, D2, R>, Collection<G, E, R>)
|
||||
where
|
||||
D2: Data,
|
||||
E: Data,
|
||||
I: IntoIterator<Item = Result<D2, E>>,
|
||||
L: FnMut(D1) -> I + 'static;
|
||||
|
||||
/// Replaces each record with another, with a new difference type.
|
||||
///
|
||||
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
|
||||
/// and move the data into the difference component. This will allow differential dataflow to update in-place.
|
||||
fn explode_one<D2, R2, L>(&self, logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
|
||||
where
|
||||
D2: differential_dataflow::Data,
|
||||
R2: Semigroup + Multiply<R>,
|
||||
<R2 as Multiply<R>>::Output: Data + Semigroup,
|
||||
L: FnMut(D1) -> (D2, R2) + 'static,
|
||||
G::Timestamp: Lattice;
|
||||
}
|
||||
|
||||
impl<G, D1> StreamExt<G, D1> for Stream<G, D1>
|
||||
where
|
||||
D1: Data,
|
||||
G: Scope,
|
||||
{
|
||||
fn unary_fallible<D2, E, B, P>(
|
||||
&self,
|
||||
pact: P,
|
||||
name: &str,
|
||||
constructor: B,
|
||||
) -> (Stream<G, D2>, Stream<G, E>)
|
||||
where
|
||||
D2: Data,
|
||||
E: Data,
|
||||
B: FnOnce(
|
||||
Capability<G::Timestamp>,
|
||||
OperatorInfo,
|
||||
) -> Box<
|
||||
dyn FnMut(
|
||||
&mut InputHandle<G::Timestamp, D1, P::Puller>,
|
||||
&mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
|
||||
&mut OutputHandle<G::Timestamp, E, Tee<G::Timestamp, E>>,
|
||||
) + 'static,
|
||||
>,
|
||||
P: ParallelizationContract<G::Timestamp, D1>,
|
||||
{
|
||||
let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
|
||||
builder.set_notify(false);
|
||||
|
||||
let operator_info = builder.operator_info();
|
||||
|
||||
let mut input = builder.new_input(self, pact);
|
||||
let (mut ok_output, ok_stream) = builder.new_output();
|
||||
let (mut err_output, err_stream) = builder.new_output();
|
||||
|
||||
builder.build(move |mut capabilities| {
|
||||
// `capabilities` should be a single-element vector.
|
||||
let capability = capabilities.pop().unwrap();
|
||||
let mut logic = constructor(capability, operator_info);
|
||||
move |_frontiers| {
|
||||
let mut ok_output_handle = ok_output.activate();
|
||||
let mut err_output_handle = err_output.activate();
|
||||
logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
|
||||
}
|
||||
});
|
||||
|
||||
(ok_stream, err_stream)
|
||||
}
|
||||
|
||||
#[allow(clippy::redundant_closure)]
|
||||
fn flat_map_fallible<D2, E, I, L>(
|
||||
&self,
|
||||
name: &str,
|
||||
mut logic: L,
|
||||
) -> (Stream<G, D2>, Stream<G, E>)
|
||||
where
|
||||
D2: Data,
|
||||
E: Data,
|
||||
I: IntoIterator<Item = Result<D2, E>>,
|
||||
L: FnMut(D1) -> I + 'static,
|
||||
{
|
||||
let mut storage = Vec::new();
|
||||
self.unary_fallible(Pipeline, name, move |_, _| {
|
||||
Box::new(move |input, ok_output, err_output| {
|
||||
input.for_each(|time, data| {
|
||||
let mut ok_session = ok_output.session(&time);
|
||||
let mut err_session = err_output.session(&time);
|
||||
data.swap(&mut storage);
|
||||
for r in storage.drain(..).flat_map(|d1| logic(d1)) {
|
||||
match r {
|
||||
Ok(d2) => ok_session.give(d2),
|
||||
Err(e) => err_session.give(e),
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
|
||||
where
|
||||
G: Scope,
|
||||
G::Timestamp: Data,
|
||||
D1: Data,
|
||||
R: Semigroup,
|
||||
{
|
||||
fn empty(scope: &G) -> Collection<G, D1, R> {
|
||||
operator::empty(scope).as_collection()
|
||||
}
|
||||
|
||||
fn flat_map_fallible<D2, E, I, L>(
|
||||
&self,
|
||||
name: &str,
|
||||
mut logic: L,
|
||||
) -> (Collection<G, D2, R>, Collection<G, E, R>)
|
||||
where
|
||||
D2: Data,
|
||||
E: Data,
|
||||
I: IntoIterator<Item = Result<D2, E>>,
|
||||
L: FnMut(D1) -> I + 'static,
|
||||
{
|
||||
let (ok_stream, err_stream) = self.inner.flat_map_fallible(name, move |(d1, t, r)| {
|
||||
logic(d1).into_iter().map(move |res| match res {
|
||||
Ok(d2) => Ok((d2, t.clone(), r.clone())),
|
||||
Err(e) => Err((e, t.clone(), r.clone())),
|
||||
})
|
||||
});
|
||||
(ok_stream.as_collection(), err_stream.as_collection())
|
||||
}
|
||||
|
||||
fn explode_one<D2, R2, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
|
||||
where
|
||||
D2: differential_dataflow::Data,
|
||||
R2: Semigroup + Multiply<R>,
|
||||
<R2 as Multiply<R>>::Output: Data + Semigroup,
|
||||
L: FnMut(D1) -> (D2, R2) + 'static,
|
||||
G::Timestamp: Lattice,
|
||||
{
|
||||
self.inner
|
||||
.unary(Pipeline, "ExplodeOne", move |_, _| {
|
||||
let mut buffer = Vec::new();
|
||||
move |input, output| {
|
||||
let mut out = ConsolidateBuffer::new(output, 0);
|
||||
input.for_each(|time, data| {
|
||||
data.swap(&mut buffer);
|
||||
out.give_iterator(
|
||||
&time,
|
||||
buffer.drain(..).map(|(x, t, d)| {
|
||||
let (x, d2) = logic(x);
|
||||
(x, t, d2.multiply(&d))
|
||||
}),
|
||||
);
|
||||
});
|
||||
}
|
||||
})
|
||||
.as_collection()
|
||||
}
|
||||
}
|
||||
68
src/flow/src/util/reduce.rs
Normal file
68
src/flow/src/util/reduce.rs
Normal file
@@ -0,0 +1,68 @@
|
||||
use differential_dataflow::difference::{Abelian, Semigroup};
|
||||
use differential_dataflow::lattice::Lattice;
|
||||
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
|
||||
use differential_dataflow::operators::reduce::ReduceCore;
|
||||
use differential_dataflow::trace::{Batch, Trace, TraceReader};
|
||||
use differential_dataflow::Data;
|
||||
use timely::dataflow::Scope;
|
||||
|
||||
/// Extension trait for `ReduceCore`, currently providing a reduction based
|
||||
/// on an operator-pair approach.
|
||||
pub trait ReduceExt<G: Scope, K: Data, V: Data, R: Semigroup>
|
||||
where
|
||||
G::Timestamp: Lattice + Ord,
|
||||
{
|
||||
/// This method produces a reduction pair based on the same input arrangement. Each reduction
|
||||
/// in the pair operates with its own logic and the two output arrangements from the reductions
|
||||
/// are produced as a result. The method is useful for reductions that need to present different
|
||||
/// output views on the same input data. An example is producing an error-free reduction output
|
||||
/// along with a separate error output indicating when the error-free output is valid.
|
||||
fn reduce_pair<L1, T1, L2, T2>(
|
||||
&self,
|
||||
name1: &str,
|
||||
name2: &str,
|
||||
logic1: L1,
|
||||
logic2: L2,
|
||||
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
|
||||
where
|
||||
T1: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
|
||||
T1::Val: Data,
|
||||
T1::R: Abelian,
|
||||
T1::Batch: Batch,
|
||||
L1: FnMut(&K, &[(&V, R)], &mut Vec<(T1::Val, T1::R)>) + 'static,
|
||||
T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
|
||||
T2::Val: Data,
|
||||
T2::R: Abelian,
|
||||
T2::Batch: Batch,
|
||||
L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static;
|
||||
}
|
||||
|
||||
impl<G: Scope, K: Data, V: Data, Tr, R: Semigroup> ReduceExt<G, K, V, R> for Arranged<G, Tr>
|
||||
where
|
||||
G::Timestamp: Lattice + Ord,
|
||||
Tr: TraceReader<Key = K, Val = V, Time = G::Timestamp, R = R> + Clone + 'static,
|
||||
{
|
||||
fn reduce_pair<L1, T1, L2, T2>(
|
||||
&self,
|
||||
name1: &str,
|
||||
name2: &str,
|
||||
logic1: L1,
|
||||
logic2: L2,
|
||||
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
|
||||
where
|
||||
T1: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
|
||||
T1::Val: Data,
|
||||
T1::R: Abelian,
|
||||
T1::Batch: Batch,
|
||||
L1: FnMut(&K, &[(&V, R)], &mut Vec<(T1::Val, T1::R)>) + 'static,
|
||||
T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
|
||||
T2::Val: Data,
|
||||
T2::R: Abelian,
|
||||
T2::Batch: Batch,
|
||||
L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static,
|
||||
{
|
||||
let arranged1 = self.reduce_abelian::<L1, T1>(name1, logic1);
|
||||
let arranged2 = self.reduce_abelian::<L2, T2>(name2, logic2);
|
||||
(arranged1, arranged2)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user