diff --git a/Cargo.toml b/Cargo.toml index b8386aae7e..e44f63534d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ members = [ "src/sql", "src/storage", "src/store-api", - "src/stream", + "src/flow", "src/table", "src/table-procedure", "tests-integration", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 26b7eaf668..cf35262501 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -6,7 +6,11 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -timely = "0.12" +timely = {version = "0.12", features=["bincode"]} differential-dataflow = "0.12" abomonation = "0.7.3" -abomonation_derive = "0.5" \ No newline at end of file +abomonation_derive = "0.5" +datafusion-expr.workspace = true +datafusion-substrait.workspace = true +serde = {version = "1.0", features = ["derive"]} +datatypes = { path = "../datatypes" } \ No newline at end of file diff --git a/src/flow/src/adapter/mod.rs b/src/flow/src/adapter/mod.rs new file mode 100644 index 0000000000..05d9a4b220 --- /dev/null +++ b/src/flow/src/adapter/mod.rs @@ -0,0 +1 @@ +//! for getting data from source and sending results to sink diff --git a/src/flow/src/compute/context.rs b/src/flow/src/compute/context.rs new file mode 100644 index 0000000000..c4e93f152a --- /dev/null +++ b/src/flow/src/compute/context.rs @@ -0,0 +1,55 @@ +use std::marker::PhantomData; + +use differential_dataflow::lattice::Lattice; +use differential_dataflow::Collection; +use serde::{Deserialize, Serialize}; +use timely::dataflow::Scope; +use timely::progress::timestamp::Refines; +use timely::progress::{Antichain, Timestamp}; +use timely::Data; + +use crate::Diff; +pub struct Context +where + T: Timestamp + Lattice, + S: Scope, + S::Timestamp: Lattice + Refines, +{ + /// The scope within which all managed collections exist. + /// + /// It is an error to add any collections not contained in this scope. + pub(crate) scope: S, + /// The debug name of the dataflow associated with this context. + pub debug_name: String, + /// The Timely ID of the dataflow associated with this context. + pub dataflow_id: usize, + /// Frontier before which updates should not be emitted. + /// + /// We *must* apply it to sinks, to ensure correct outputs. + /// We *should* apply it to sources and imported traces, because it improves performance. + pub since_frontier: Antichain, + /// Frontier after which updates should not be emitted. + /// Used to limit the amount of work done when appropriate. + pub upper_frontier: Antichain, +} + +#[derive(Clone)] +pub struct CollectionBundle +where + T: Timestamp + Lattice, + S::Timestamp: Lattice + Refines, +{ + pub(crate) collection: Collection, + /// TODO: impl arranged in memory + pub(crate) arranged: PhantomData, +} + +#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] +pub enum DataflowError { + EvalError(EvalError), +} + +#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] +pub enum EvalError { + DivisionByZero, +} diff --git a/src/flow/src/compute/mod.rs b/src/flow/src/compute/mod.rs new file mode 100644 index 0000000000..bec1ec97f9 --- /dev/null +++ b/src/flow/src/compute/mod.rs @@ -0,0 +1,4 @@ +//! for generate dataflow from logical plan and computing the dataflow +mod context; +use datafusion_expr::LogicalPlan as DfLogicalPlan; +use datafusion_substrait::logical_plan::producer::to_substrait_plan; diff --git a/src/flow/src/expr/id.rs b/src/flow/src/expr/id.rs new file mode 100644 index 0000000000..7f055a793f --- /dev/null +++ b/src/flow/src/expr/id.rs @@ -0,0 +1,22 @@ +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum GlobalId { + /// System namespace. + System(u64), + /// User namespace. + User(u64), + /// Transient namespace. + Transient(u64), + /// Dummy id for query being explained + Explain, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct LocalId(pub(crate) u64); + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum Id { + /// An identifier that refers to a local component of a dataflow. + Local(LocalId), + /// An identifier that refers to a global dataflow. + Global(GlobalId), +} diff --git a/src/flow/src/expr/lir.rs b/src/flow/src/expr/lir.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/flow/src/expr/mod.rs b/src/flow/src/expr/mod.rs new file mode 100644 index 0000000000..11ad9a2ea6 --- /dev/null +++ b/src/flow/src/expr/mod.rs @@ -0,0 +1,5 @@ +//! for declare dataflow description that is the last step before build dataflow + +mod id; + +pub use id::{GlobalId, Id, LocalId}; diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index d7d68cbfc3..705cdd0b7e 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -1,13 +1,20 @@ -use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::Count; -use timely::progress::timestamp::Refines; +mod adapter; +mod compute; +mod expr; +mod repr; +mod storage; + +/// Record count difference type. +pub type Diff = i64; #[test] -fn demo() { - use abomonation_derive::Abomonation; +fn demo_multitemporal() { use differential_dataflow::input::InputSession; - use differential_dataflow::operators::Join; - #[derive(Debug, Clone, Default, Eq, PartialEq, Hash, Abomonation)] + use differential_dataflow::lattice::Lattice; + use differential_dataflow::operators::{Count, Join}; + use serde::{Deserialize, Serialize}; + use timely::progress::timestamp::Refines; + #[derive(Debug, Clone, Default, Eq, PartialEq, Hash, Deserialize, Serialize)] /// (System, Event) struct MT(usize, usize); impl Ord for MT { @@ -91,7 +98,7 @@ fn demo() { }); // Read a size for our organization from the arguments. - let size = 100; + let size = 10; // Load input (a binary tree). input.advance_to(MT(0, 0)); diff --git a/src/flow/src/repr/mod.rs b/src/flow/src/repr/mod.rs new file mode 100644 index 0000000000..c4af69dbc3 --- /dev/null +++ b/src/flow/src/repr/mod.rs @@ -0,0 +1,9 @@ +//! basically a wrapper around the `datatype` crate +//! for basic Data Representation +use datatypes::value::Value; + +/// A row is a vector of values. +/// +/// TODO: use a more efficient representation +/// i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\] +pub type Row = Vec; diff --git a/src/flow/src/storage/mod.rs b/src/flow/src/storage/mod.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/src/flow/src/storage/mod.rs @@ -0,0 +1 @@ +