From 47f41371d00162c7cbbe6ae4d741dc22b270f1ba Mon Sep 17 00:00:00 2001 From: Discord9 Date: Thu, 20 Jul 2023 15:18:39 +0800 Subject: [PATCH] Arrangement&types --- Cargo.lock | 9 ++- src/flow/Cargo.toml | 17 ++-- src/flow/src/adapter/mod.rs | 2 + src/flow/src/compute/context.rs | 93 ++++++++++++++++----- src/flow/src/compute/mod.rs | 5 +- src/flow/src/compute/plan/mod.rs | 83 +++++++++++++++++++ src/flow/src/compute/typedefs.rs | 20 +++++ src/flow/src/compute/types/mod.rs | 1 + src/flow/src/expr/func.rs | 16 ++++ src/flow/src/expr/lir.rs | 0 src/flow/src/expr/mod.rs | 18 +++++ src/flow/src/lib.rs | 129 +----------------------------- src/flow/src/render/mod.rs | 2 + src/flow/src/repr/mod.rs | 32 +++++++- src/flow/src/storage/errors.rs | 11 +++ src/flow/src/storage/mod.rs | 2 +- 16 files changed, 277 insertions(+), 163 deletions(-) create mode 100644 src/flow/src/compute/plan/mod.rs create mode 100644 src/flow/src/compute/typedefs.rs create mode 100644 src/flow/src/compute/types/mod.rs create mode 100644 src/flow/src/expr/func.rs delete mode 100644 src/flow/src/expr/lir.rs create mode 100644 src/flow/src/render/mod.rs create mode 100644 src/flow/src/storage/errors.rs diff --git a/Cargo.lock b/Cargo.lock index a4eb226493..79e59faa9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2852,7 +2852,8 @@ checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/TimelyDataflow/differential-dataflow.git#99fa67db2b92d2ee938c6ffef0912908de3ef288" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecb0345111032cfd995a1e9c1b79387a0e6bf6690be5d8dd12a58f4861bc6d9" dependencies = [ "abomonation", "abomonation_derive", @@ -9588,9 +9589,11 @@ checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" name = "stream" version = "0.1.0" dependencies = [ - "abomonation", - "abomonation_derive", + "datafusion-expr", + "datafusion-substrait", + "datatypes", "differential-dataflow", + "serde", "timely", ] diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index cf35262501..c4cf4de9f3 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -6,11 +6,16 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -timely = {version = "0.12", features=["bincode"]} -differential-dataflow = "0.12" -abomonation = "0.7.3" -abomonation_derive = "0.5" +# use version from crates.io for now to prevent version slewing +# disable default-features which include `abomonaion` which we don't need for IPC +# timely = {version = "0.12.0", default-features = false, features = ["bincode"]} +# differential-dataflow = "0.12.0" +timely = "0.12.0" +differential-dataflow = "0.12.0" +# TODO: fork later for fixed version git dependency +# timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false } +# differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow", rev ="99fa67db"} datafusion-expr.workspace = true datafusion-substrait.workspace = true -serde = {version = "1.0", features = ["derive"]} -datatypes = { path = "../datatypes" } \ No newline at end of file +serde = { version = "1.0", features = ["derive"] } +datatypes = { path = "../datatypes" } diff --git a/src/flow/src/adapter/mod.rs b/src/flow/src/adapter/mod.rs index 05d9a4b220..e9a3aa3352 100644 --- a/src/flow/src/adapter/mod.rs +++ b/src/flow/src/adapter/mod.rs @@ -1 +1,3 @@ //! for getting data from source and sending results to sink +//! and communicating with other parts of the database +//! also commands storage and computation layer diff --git a/src/flow/src/compute/context.rs b/src/flow/src/compute/context.rs index c4e93f152a..0fe4f02f78 100644 --- a/src/flow/src/compute/context.rs +++ b/src/flow/src/compute/context.rs @@ -1,15 +1,59 @@ -use std::marker::PhantomData; +use std::collections::BTreeMap; use differential_dataflow::lattice::Lattice; -use differential_dataflow::Collection; -use serde::{Deserialize, Serialize}; -use timely::dataflow::Scope; +use differential_dataflow::operators::arrange::Arranged; +use differential_dataflow::trace::wrappers::enter::TraceEnter; +use differential_dataflow::trace::wrappers::frontier::TraceFrontier; +use differential_dataflow::{Collection, Data}; +use timely::dataflow::{Scope, ScopeParent}; use timely::progress::timestamp::Refines; use timely::progress::{Antichain, Timestamp}; -use timely::Data; -use crate::Diff; -pub struct Context +use crate::compute::typedefs::{TraceErrHandle, TraceRowHandle}; +use crate::expr::{GlobalId, Id, ScalarExpr}; +use crate::repr; +use crate::repr::Diff; +use crate::storage::errors::DataflowError; + +// Local type definition to avoid the horror in signatures. +pub(crate) type KeyArrangement = + Arranged::Timestamp, Diff>>; +pub(crate) type Arrangement = KeyArrangement; +pub(crate) type ErrArrangement = + Arranged::Timestamp, Diff>>; +pub(crate) type ArrangementImport = Arranged< + S, + TraceEnter>, ::Timestamp>, +>; +pub(crate) type ErrArrangementImport = Arranged< + S, + TraceEnter< + TraceFrontier>, + ::Timestamp, + >, +>; + +/// Describes flavor of arrangement: local or imported trace. +#[derive(Clone)] +pub enum ArrangementFlavor +where + T: Timestamp + Lattice, + S::Timestamp: Lattice + Refines, +{ + /// A dataflow-local arrangement. + Local(Arrangement, ErrArrangement), + /// An imported trace from outside the dataflow. + /// + /// The `GlobalId` identifier exists so that exports of this same trace + /// can refer back to and depend on the original instance. + Trace( + GlobalId, + ArrangementImport, + ErrArrangementImport, + ), +} + +pub struct Context where T: Timestamp + Lattice, S: Scope, @@ -30,26 +74,35 @@ where pub since_frontier: Antichain, /// Frontier after which updates should not be emitted. /// Used to limit the amount of work done when appropriate. - pub upper_frontier: Antichain, + pub until_frontier: Antichain, + /// Bindings of identifiers to collections. + pub bindings: BTreeMap>, +} + +impl Context +where + S::Timestamp: Lattice + Refines, +{ + /// TODO" DataflowDesc & Plan & etc. + pub fn for_dataflow_in(scope: S) -> Self { + let dataflow_id = scope.addr()[0]; + // TODO(discord9): get since_frontier and until_frontier from dataflow_desc + todo!() + } } #[derive(Clone)] -pub struct CollectionBundle +pub struct CollectionBundle where T: Timestamp + Lattice, + S: Scope, S::Timestamp: Lattice + Refines, + V: Data, { pub(crate) collection: Collection, - /// TODO: impl arranged in memory - pub(crate) arranged: PhantomData, + /// TODO: impl: 1. ScalarExpr(Could be from substrait), 2. Arrangement + pub(crate) arranged: BTreeMap, ArrangementFlavor>, } -#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] -pub enum DataflowError { - EvalError(EvalError), -} - -#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] -pub enum EvalError { - DivisionByZero, -} +#[derive(Clone)] +pub enum ArrangementFlavored {} diff --git a/src/flow/src/compute/mod.rs b/src/flow/src/compute/mod.rs index bec1ec97f9..4741ae6d79 100644 --- a/src/flow/src/compute/mod.rs +++ b/src/flow/src/compute/mod.rs @@ -1,4 +1,5 @@ //! for generate dataflow from logical plan and computing the dataflow mod context; -use datafusion_expr::LogicalPlan as DfLogicalPlan; -use datafusion_substrait::logical_plan::producer::to_substrait_plan; +mod plan; +mod typedefs; +mod types; diff --git a/src/flow/src/compute/plan/mod.rs b/src/flow/src/compute/plan/mod.rs new file mode 100644 index 0000000000..80ba77f64b --- /dev/null +++ b/src/flow/src/compute/plan/mod.rs @@ -0,0 +1,83 @@ +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +use crate::expr::{Id, ScalarExpr}; +use crate::repr::{self, Row}; +use crate::storage::errors::EvalError; + +/// The forms in which an operator's output is available; +/// it can be considered the plan-time equivalent of +/// `render::context::CollectionBundle`. +/// +/// These forms are either "raw", representing an unarranged collection, +/// or "arranged", representing one that has been arranged by some key. +/// +/// The raw collection, if it exists, may be consumed directly. +/// +/// The arranged collections are slightly more complicated: +/// Each key here is attached to a description of how the corresponding +/// arrangement is permuted to remove value columns +/// that are redundant with key columns. Thus, the first element in each +/// tuple of `arranged` is the arrangement key; the second is the map of +/// logical output columns to columns in the key or value of the deduplicated +/// representation, and the third is a "thinning expression", +/// or list of columns to include in the value +/// when arranging. +/// +/// For example, assume a 5-column collection is to be arranged by the key +/// `[Column(2), Column(0) + Column(3), Column(1)]`. +/// Then `Column(1)` and `Column(2)` in the value are redundant with the key, and +/// only columns 0, 3, and 4 need to be stored separately. +/// The thinning expression will then be `[0, 3, 4]`. +/// +/// The permutation represents how to recover the +/// original values (logically `[Column(0), Column(1), Column(2), Column(3), Column(4)]`) +/// from the key and value of the arrangement, logically +/// `[Column(2), Column(0) + Column(3), Column(1), Column(0), Column(3), Column(4)]`. +/// Thus, the permutation in this case should be `{0: 3, 1: 2, 2: 0, 3: 4, 4: 5}`. +/// +/// Note that this description, while true at the time of writing, is merely illustrative; +/// users of this struct should not rely on the exact strategy used for generating +/// the permutations. As long as clients apply the thinning expression +/// when creating arrangements, and permute by the hashmap when reading them, +/// the contract of the function where they are generated (`expr::permutation_for_arrangement`) +/// ensures that the correct values will be read. +#[derive(Default, Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct AvailableCollections { + /// Whether the collection exists in unarranged form. + pub raw: bool, + /// The set of arrangements of the collection, along with a + /// column permutation mapping + pub arranged: Vec, +} + +pub type KeyWithColumnPermutation = (Vec, BTreeMap, Vec); + +impl AvailableCollections { + /// Represent a collection that has no arrangements. + pub fn new_raw() -> Self { + Self { + raw: true, + arranged: vec![], + } + } + + /// Represent a collection that is arranged in the + /// specified ways. + pub fn new_arranged(arranged: Vec) -> Self { + assert!( + !arranged.is_empty(), + "Invariant violated: at least one collection must exist" + ); + Self { + raw: false, + arranged, + } + } +} + +pub enum Plan { + Constant { rows: Result, EvalError> }, + Get { id: Id, keys: AvailableCollections }, +} diff --git a/src/flow/src/compute/typedefs.rs b/src/flow/src/compute/typedefs.rs new file mode 100644 index 0000000000..886e1de91a --- /dev/null +++ b/src/flow/src/compute/typedefs.rs @@ -0,0 +1,20 @@ +use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine}; + +use crate::repr::{Diff, Row, Timestamp}; +use crate::storage::errors::DataflowError; + +// TODO: consider use ColValSpine for columnation storage + +/// T: Time, R: Diff, O: Offset +pub type RowSpine = OrdValSpine; +/// T: Time, R: Diff, O: Offset +pub type RowKeySpine = OrdKeySpine; +/// T: Time, R: Diff, O: Offset +pub type ErrSpine = OrdKeySpine; +/// T: Time, R: Diff, O: Offset +pub type ErrValSpine = OrdValSpine; +pub type TraceRowHandle = TraceAgent>; +pub type TraceErrHandle = TraceAgent>; +pub type KeysValsHandle = TraceRowHandle; +pub type ErrsHandle = TraceErrHandle; diff --git a/src/flow/src/compute/types/mod.rs b/src/flow/src/compute/types/mod.rs new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/src/flow/src/compute/types/mod.rs @@ -0,0 +1 @@ + diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs new file mode 100644 index 0000000000..2324a5ee43 --- /dev/null +++ b/src/flow/src/expr/func.rs @@ -0,0 +1,16 @@ +use crate::repr::Row; + +/// Stateless functions +#[derive(Debug, Clone)] +pub enum Func { + BuiltIn(BuiltInFunc), + Custom(fn(Row) -> Row), +} + +#[derive(Debug, Clone)] +pub enum BuiltInFunc { + Not, + IsNull, + IsTrue, + IsFalse, +} diff --git a/src/flow/src/expr/lir.rs b/src/flow/src/expr/lir.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/flow/src/expr/mod.rs b/src/flow/src/expr/mod.rs index 11ad9a2ea6..fd051cc458 100644 --- a/src/flow/src/expr/mod.rs +++ b/src/flow/src/expr/mod.rs @@ -1,5 +1,23 @@ //! for declare dataflow description that is the last step before build dataflow +mod func; mod id; +use datatypes::prelude::ConcreteDataType; +use datatypes::value::Value; pub use id::{GlobalId, Id, LocalId}; +use serde::{Deserialize, Serialize}; + +use crate::storage::errors::DataflowError; + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub enum ScalarExpr { + /// A column of the input row + Column(usize), + /// A literal value. + Literal(Result, ConcreteDataType), + CallFunc { + func: String, + exprs: Vec, + }, +} diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 705cdd0b7e..0c8a4c5b6b 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -1,133 +1,6 @@ mod adapter; mod compute; mod expr; +mod render; mod repr; mod storage; - -/// Record count difference type. -pub type Diff = i64; - -#[test] -fn demo_multitemporal() { - use differential_dataflow::input::InputSession; - use differential_dataflow::lattice::Lattice; - use differential_dataflow::operators::{Count, Join}; - use serde::{Deserialize, Serialize}; - use timely::progress::timestamp::Refines; - #[derive(Debug, Clone, Default, Eq, PartialEq, Hash, Deserialize, Serialize)] - /// (System, Event) - struct MT(usize, usize); - impl Ord for MT { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.0.cmp(&other.0).then(self.1.cmp(&other.1)) - } - } - impl PartialOrd for MT { - fn partial_cmp(&self, other: &Self) -> Option { - self.cmp(other).into() - } - } - impl timely::PartialOrder for MT { - fn less_equal(&self, other: &Self) -> bool { - self.0 <= other.0 && self.1 <= other.1 - } - } - impl timely::order::TotalOrder for MT {} - impl timely::progress::Timestamp for MT { - type Summary = MT; - fn minimum() -> Self { - Self(0, 0) - } - } - impl timely::progress::PathSummary for MT { - fn results_in(&self, src: &MT) -> Option { - self.0 - .results_in(&src.0) - .and_then(|x| self.1.results_in(&src.1).map(|y| MT(x, y))) - //.and_then(|x| self.1.results_in(&src.1).map(|y| MT(x, y))) - } - - fn followed_by(&self, other: &Self) -> Option { - self.0 - .followed_by(&other.0) - .and_then(|x| self.1.followed_by(&other.1).map(|y| MT(x, y))) - } - } - impl Refines<()> for MT { - fn to_inner(other: ()) -> Self { - Self(0, 0) - } - - fn to_outer(self) -> () { - todo!() - } - - fn summarize( - path: ::Summary, - ) -> <() as timely::progress::Timestamp>::Summary { - todo!() - } - } - impl Lattice for MT { - fn join(&self, other: &Self) -> Self { - Self(self.0.max(other.0), self.1.max(other.1)) - } - - fn meet(&self, other: &Self) -> Self { - Self(self.0.min(other.0), self.1.min(other.1)) - } - } - - // define a new timely dataflow computation. - timely::execute_from_args(["w2".to_string()].into_iter(), move |worker| { - // create an input collection of data. - let mut input = InputSession::new(); - - // define a new computation. - let probe = worker.dataflow(|scope| { - // create a new collection from our input. - let manages = input.to_collection(scope); - - // if (m2, m1) and (m1, p), then output (m1, (m2, p)) - manages - .map(|(m2, m1)| (m1, m2)) - .join(&manages) - .count() - //.inspect(|x| println!("{:?}", x)) - .probe() - }); - - // Read a size for our organization from the arguments. - let size = 10; - - // Load input (a binary tree). - input.advance_to(MT(0, 0)); - let mut person = worker.index(); - while person < size { - input.insert((person / 2, person)); - person += worker.peers(); - } - - // wait for data loading. - input.advance_to(MT(0, 0)); - input.flush(); - while probe.less_than(input.time()) { - worker.step(); - } - println!("{:?}\tdata loaded", worker.timer().elapsed()); - - let mut person = 1 + worker.index(); - while person < size { - input.remove((person / 2, person)); - input.insert((person / 3, person)); - input.advance_to(MT(0, person)); - input.flush(); - while probe.less_than(&input.time()) { - worker.step(); - } - println!("{:?}\tstep {} complete", worker.timer().elapsed(), person); - person += worker.peers(); - } - }) - .expect("Computation terminated abnormally"); -} diff --git a/src/flow/src/render/mod.rs b/src/flow/src/render/mod.rs new file mode 100644 index 0000000000..a9457011a0 --- /dev/null +++ b/src/flow/src/render/mod.rs @@ -0,0 +1,2 @@ +//! for building the flow graph from PLAN +//! this is basically the last step before actually running the flow graph diff --git a/src/flow/src/repr/mod.rs b/src/flow/src/repr/mod.rs index c4af69dbc3..435476e1e4 100644 --- a/src/flow/src/repr/mod.rs +++ b/src/flow/src/repr/mod.rs @@ -1,9 +1,35 @@ //! basically a wrapper around the `datatype` crate //! for basic Data Representation +use std::borrow::Borrow; + use datatypes::value::Value; +/// System-wide Record count difference type. +pub type Diff = i64; + /// A row is a vector of values. /// -/// TODO: use a more efficient representation -/// i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\] -pub type Row = Vec; +#[derive(Clone)] +pub struct Row { + /// TODO: use a more efficient representation + /// + /// i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\] + inner: Vec, +} + +impl Row { + pub fn pack(iter: I) -> Row + where + I: IntoIterator, + { + Self { + inner: iter.into_iter().collect(), + } + } + pub fn unpack(&self) -> Vec { + self.inner.clone() + } +} + +/// System-wide default timestamp type +pub type Timestamp = u64; diff --git a/src/flow/src/storage/errors.rs b/src/flow/src/storage/errors.rs new file mode 100644 index 0000000000..e395f2eae4 --- /dev/null +++ b/src/flow/src/storage/errors.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] +pub enum DataflowError { + EvalError(EvalError), +} + +#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] +pub enum EvalError { + DivisionByZero, +} diff --git a/src/flow/src/storage/mod.rs b/src/flow/src/storage/mod.rs index 8b13789179..e7779cba38 100644 --- a/src/flow/src/storage/mod.rs +++ b/src/flow/src/storage/mod.rs @@ -1 +1 @@ - +pub(crate) mod errors;