From 2798d266f5aa7d7b0dc86b91ad464a7ea06e6e14 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 25 Jul 2023 15:35:36 +0800 Subject: [PATCH] feat: render plan partially writen --- Cargo.lock | 24 +-- src/flow/Cargo.toml | 4 +- src/flow/src/compute/context.rs | 104 ++++++++++++- src/flow/src/compute/mod.rs | 3 + src/flow/src/compute/plan/join/linear_join.rs | 2 +- src/flow/src/compute/plan/join/mod.rs | 2 +- src/flow/src/compute/plan/mod.rs | 13 +- src/flow/src/compute/plan/reduce.rs | 4 +- src/flow/src/compute/render/mod.rs | 146 ++++++++++++++++++ src/flow/src/compute/typedefs.rs | 2 +- src/flow/src/expr/func.rs | 2 +- src/flow/src/expr/linear.rs | 39 +++++ src/flow/src/expr/mod.rs | 2 +- src/flow/src/lib.rs | 1 - src/flow/src/render/mod.rs | 2 - src/flow/src/repr/mod.rs | 8 +- src/flow/src/storage/errors.rs | 6 + 17 files changed, 327 insertions(+), 37 deletions(-) create mode 100644 src/flow/src/compute/render/mod.rs delete mode 100644 src/flow/src/render/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 79e59faa9c..0fe83f0260 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3281,6 +3281,18 @@ dependencies = [ "spin 0.9.8", ] +[[package]] +name = "flow" +version = "0.1.0" +dependencies = [ + "datafusion-expr", + "datafusion-substrait", + "datatypes", + "differential-dataflow", + "serde", + "timely", +] + [[package]] name = "fnv" version = "1.0.7" @@ -9585,18 +9597,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" -[[package]] -name = "stream" -version = "0.1.0" -dependencies = [ - "datafusion-expr", - "datafusion-substrait", - "datatypes", - "differential-dataflow", - "serde", - "timely", -] - [[package]] name = "streaming-stats" version = "0.2.3" diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index c4cf4de9f3..76d0bc152f 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "stream" +name = "flow" version = "0.1.0" edition = "2021" @@ -12,7 +12,7 @@ edition = "2021" # differential-dataflow = "0.12.0" timely = "0.12.0" differential-dataflow = "0.12.0" -# TODO: fork later for fixed version git dependency +# TODO(discord9): fork later for fixed version git dependency # timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false } # differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow", rev ="99fa67db"} datafusion-expr.workspace = true diff --git a/src/flow/src/compute/context.rs b/src/flow/src/compute/context.rs index 0fe4f02f78..e75e0ff63e 100644 --- a/src/flow/src/compute/context.rs +++ b/src/flow/src/compute/context.rs @@ -9,10 +9,11 @@ use timely::dataflow::{Scope, ScopeParent}; use timely::progress::timestamp::Refines; use timely::progress::{Antichain, Timestamp}; +use crate::compute::render::RenderTimestamp; use crate::compute::typedefs::{TraceErrHandle, TraceRowHandle}; -use crate::expr::{GlobalId, Id, ScalarExpr}; +use crate::expr::{GlobalId, Id, MapFilterProject, ScalarExpr}; use crate::repr; -use crate::repr::Diff; +use crate::repr::{Diff, Row}; use crate::storage::errors::DataflowError; // Local type definition to avoid the horror in signatures. @@ -83,14 +84,63 @@ impl Context where S::Timestamp: Lattice + Refines, { - /// TODO" DataflowDesc & Plan & etc. + /// TODO(discord9)" DataflowDesc & Plan & etc. pub fn for_dataflow_in(scope: S) -> Self { let dataflow_id = scope.addr()[0]; - // TODO(discord9): get since_frontier and until_frontier from dataflow_desc + // TODO(discord9)=: get since_frontier and until_frontier from dataflow_desc todo!() } } +impl Context +where + T: Timestamp + Lattice, + S::Timestamp: Lattice + Refines, +{ + /// Insert a collection bundle by an identifier. + /// + /// This is expected to be used to install external collections (sources, indexes, other views), + /// as well as for `Let` bindings of local collections. + pub fn insert_id( + &mut self, + id: Id, + collection: CollectionBundle, + ) -> Option> { + self.bindings.insert(id, collection) + } + + /// Remove a collection bundle by an identifier. + /// + /// The primary use of this method is uninstalling `Let` bindings. + pub fn remove_id(&mut self, id: Id) -> Option> { + self.bindings.remove(&id) + } + /// Melds a collection bundle to whatever exists. + #[allow(clippy::map_entry)] + pub fn update_id(&mut self, id: Id, collection: CollectionBundle) { + if !self.bindings.contains_key(&id) { + self.bindings.insert(id, collection); + } else { + let binding = self + .bindings + .get_mut(&id) + .expect("Binding verified to exist"); + if collection.collection.is_some() { + binding.collection = collection.collection; + } + for (key, flavor) in collection.arranged.into_iter() { + binding.arranged.insert(key, flavor); + } + } + } + /// Look up a collection bundle by an identifier. + pub fn lookup_id(&self, id: Id) -> Option> { + self.bindings.get(&id).cloned() + } +} + +type ResultCollection = (Collection, Collection); + #[derive(Clone)] pub struct CollectionBundle where @@ -99,10 +149,52 @@ where S::Timestamp: Lattice + Refines, V: Data, { - pub(crate) collection: Collection, - /// TODO: impl: 1. ScalarExpr(Could be from substrait), 2. Arrangement + pub(crate) collection: Option>, + /// TODO(discord9): impl: 1. ScalarExpr(Could be from substrait), 2. Arrangement pub(crate) arranged: BTreeMap, ArrangementFlavor>, } +impl CollectionBundle +where + T: Timestamp + Lattice, + S::Timestamp: Lattice + Refines, +{ + /// Construct a new collection bundle from update streams. + pub fn from_collections( + oks: Collection, + errs: Collection, + ) -> Self { + Self { + collection: Some((oks, errs)), + arranged: BTreeMap::default(), + } + } +} + +impl CollectionBundle +where + T: timely::progress::Timestamp + Lattice, + S: Scope, + S::Timestamp: Refines + Lattice + RenderTimestamp, +{ + /// Presents `self` as a stream of updates, having been subjected to `mfp`. + /// + /// This operator is able to apply the logic of `mfp` early, which can substantially + /// reduce the amount of data produced when `mfp` is non-trivial. + /// + /// The `key_val` argument, when present, indicates that a specific arrangement should + /// be used, and if, in addition, the `val` component is present, + /// that we can seek to the supplied row. + pub fn as_collection_core( + &self, + mut mfp: MapFilterProject, + key_val: Option<(Vec, Option)>, + until: Antichain, + ) -> (Collection, Collection) { + mfp.optimize(); + todo!() + } +} + #[derive(Clone)] pub enum ArrangementFlavored {} diff --git a/src/flow/src/compute/mod.rs b/src/flow/src/compute/mod.rs index 4741ae6d79..90b3c79b1e 100644 --- a/src/flow/src/compute/mod.rs +++ b/src/flow/src/compute/mod.rs @@ -1,5 +1,8 @@ //! for generate dataflow from logical plan and computing the dataflow mod context; mod plan; +mod render; mod typedefs; mod types; + +pub use context::Context; diff --git a/src/flow/src/compute/plan/join/linear_join.rs b/src/flow/src/compute/plan/join/linear_join.rs index 6af015e0ed..d117eed5e8 100644 --- a/src/flow/src/compute/plan/join/linear_join.rs +++ b/src/flow/src/compute/plan/join/linear_join.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -/// TODO: impl Join +/// TODO(discord9): impl Join /// A plan for the execution of a linear join. /// /// A linear join is a sequence of stages, each of which introduces diff --git a/src/flow/src/compute/plan/join/mod.rs b/src/flow/src/compute/plan/join/mod.rs index dc064bf584..64260f5e81 100644 --- a/src/flow/src/compute/plan/join/mod.rs +++ b/src/flow/src/compute/plan/join/mod.rs @@ -4,7 +4,7 @@ mod linear_join; pub use delta_join::DeltaJoinPlan; pub use linear_join::LinearJoinPlan; -/// TODO(discord9): impl Join +/// TODO(discord9)(discord9): impl Join /// A complete enumeration of possible join plans to render. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum JoinPlan { diff --git a/src/flow/src/compute/plan/mod.rs b/src/flow/src/compute/plan/mod.rs index d0bdd1b192..755afc7915 100644 --- a/src/flow/src/compute/plan/mod.rs +++ b/src/flow/src/compute/plan/mod.rs @@ -84,7 +84,7 @@ impl AvailableCollections { /// Rendering Plan /// -/// TODO: see if we ever need to support recursive plans +/// TODO(discord9): see if we ever need to support recursive plans pub enum Plan { /// A collection containing a pre-determined collection. Constant { @@ -187,5 +187,12 @@ pub enum Plan { }, } -/// TODO: impl GetPlan -pub enum GetPlan {} +/// TODO(discord9): impl GetPlan +pub enum GetPlan { + /// Simply pass input arrangements on to the next stage. + PassArrangements, + /// Using the supplied key, optionally seek the row, and apply the MFP. + Arrangement(Vec, Option, MapFilterProject), + /// Scan the input collection (unarranged) and apply the MFP. + Collection(MapFilterProject), +} diff --git a/src/flow/src/compute/plan/reduce.rs b/src/flow/src/compute/plan/reduce.rs index 9600c8e905..c261936229 100644 --- a/src/flow/src/compute/plan/reduce.rs +++ b/src/flow/src/compute/plan/reduce.rs @@ -11,7 +11,7 @@ pub struct KeyValPlan { pub val_plan: MapFilterProject, } -/// TODO(discord9): Reduce Plan +/// TODO(discord9)(discord9): Reduce Plan /// A `ReducePlan` provides a concise description for how we will /// execute a given reduce expression. /// @@ -64,7 +64,7 @@ pub struct AccumulablePlan { pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>, } -// TODO: others +// TODO(discord9): others #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum HierarchicalPlan {} diff --git a/src/flow/src/compute/render/mod.rs b/src/flow/src/compute/render/mod.rs new file mode 100644 index 0000000000..7021234fe2 --- /dev/null +++ b/src/flow/src/compute/render/mod.rs @@ -0,0 +1,146 @@ +//! for building the flow graph from PLAN +//! this is basically the last step before actually running the flow graph + +use differential_dataflow::lattice::Lattice; +use differential_dataflow::AsCollection; +use timely::dataflow::operators::capture::Extract; +use timely::dataflow::operators::{Capture, ToStream}; +use timely::dataflow::Scope; +use timely::progress::timestamp::Refines; +use timely::progress::Timestamp; + +use crate::compute::context::CollectionBundle; +use crate::compute::plan::Plan; +use crate::compute::Context; +use crate::expr::Id; +use crate::repr::{self, Row}; +use crate::storage::errors::DataflowError; + +pub trait RenderTimestamp: Timestamp + Lattice + Refines {} + +impl Context +where + S: Scope, + S::Timestamp: RenderTimestamp, +{ + /// Renders a plan to a differential dataflow, producing the collection of results. + /// + /// The return type reflects the uncertainty about the data representation, perhaps + /// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches. + pub fn render_plan(&mut self, plan: Plan) -> CollectionBundle { + match plan { + Plan::Constant { rows } => { + let (rows, errs) = match rows { + Ok(rows) => (rows, Vec::new()), + Err(err) => (Vec::new(), vec![err]), + }; + let since_frontier = self.since_frontier.clone(); + let until = self.until_frontier.clone(); + let ok_collection = rows + .into_iter() + .filter_map(move |(row, mut time, diff)| { + time.advance_by(since_frontier.borrow()); + if !until.less_equal(&time) { + Some(( + row, + >::to_inner(time), + diff, + )) + } else { + None + } + }) + .to_stream(&mut self.scope) + .as_collection(); + let mut error_time: repr::Timestamp = Timestamp::minimum(); + error_time.advance_by(self.since_frontier.borrow()); + let err_collection = errs + .into_iter() + .map(move |e| { + ( + DataflowError::from(e), + >::to_inner(error_time), + 1, + ) + }) + .to_stream(&mut self.scope) + .as_collection(); + CollectionBundle::from_collections(ok_collection, err_collection) + } + Plan::Get { id, keys, plan } => { + // Recover the collection from `self` and then apply `mfp` to it. + // If `mfp` happens to be trivial, we can just return the collection. + let mut collection = self + .lookup_id(id) + .unwrap_or_else(|| panic!("Get({:?}) not found at render time", id)); + match plan { + crate::compute::plan::GetPlan::PassArrangements => { + // Assert that each of `keys` are present in `collection`. + if !keys + .arranged + .iter() + .all(|(key, _, _)| collection.arranged.contains_key(key)) + { + let not_included: Vec<_> = keys + .arranged + .iter() + .filter(|(key, _, _)| !collection.arranged.contains_key(key)) + .map(|(key, _, _)| key) + .collect(); + panic!( + "Those keys {:?} is not included in collections keys:{:?}", + not_included, + collection.arranged.keys().cloned().collect::>() + ); + } + assert!(keys.raw <= collection.collection.is_some()); + // Retain only those keys we want to import. + collection.arranged.retain(|key, _val| { + keys.arranged.iter().any(|(key2, _, _)| key2 == key) + }); + collection + } + crate::compute::plan::GetPlan::Arrangement(key, row, mfp) => { + let (oks, errs) = collection.as_collection_core( + mfp, + Some((key, row)), + self.until_frontier.clone(), + ); + CollectionBundle::from_collections(oks, errs) + } + crate::compute::plan::GetPlan::Collection(mfp) => { + let (oks, errs) = + collection.as_collection_core(mfp, None, self.until_frontier.clone()); + CollectionBundle::from_collections(oks, errs) + } + } + } + Plan::Let { id, value, body } => { + // Render `value` and bind it to `id`. Complain if this shadows an id. + let value = self.render_plan(*value); + let prebound = self.insert_id(Id::Local(id), value); + assert!(prebound.is_none()); + + let body = self.render_plan(*body); + self.remove_id(Id::Local(id)); + body + } + Plan::Mfp { + input, + mfp, + input_key_val, + } => { + let input = self.render_plan(*input); + // If `mfp` is non-trivial, we should apply it and produce a collection. + if mfp.is_identity() { + input + } else { + let (oks, errs) = + input.as_collection_core(mfp, input_key_val, self.until_frontier.clone()); + CollectionBundle::from_collections(oks, errs) + } + } + _ => todo!(), + } + } +} diff --git a/src/flow/src/compute/typedefs.rs b/src/flow/src/compute/typedefs.rs index 886e1de91a..046962f4e7 100644 --- a/src/flow/src/compute/typedefs.rs +++ b/src/flow/src/compute/typedefs.rs @@ -4,7 +4,7 @@ use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpin use crate::repr::{Diff, Row, Timestamp}; use crate::storage::errors::DataflowError; -// TODO: consider use ColValSpine for columnation storage +// TODO(discord9): consider use ColValSpine for columnation storage /// T: Time, R: Diff, O: Offset pub type RowSpine = OrdValSpine; diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index c985741e9b..8c482fdd80 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -// TODO: more function & eval +// TODO(discord9): more function & eval use crate::repr::Row; /// Stateless functions #[derive(Debug, Clone)] diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 1486ec7fc2..9633416214 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -42,3 +42,42 @@ pub struct MapFilterProject { /// columns in the output. pub input_arity: usize, } + +impl MapFilterProject { + pub fn optimize(&mut self) { + // TODO(discord9): optimize later + } + + /// True if the operator describes the identity transformation. + pub fn is_identity(&self) -> bool { + self.expressions.is_empty() + && self.predicates.is_empty() + && self.projection.len() == self.input_arity + && self.projection.iter().enumerate().all(|(i, p)| i == *p) + } +} + +/// A wrapper type which indicates it is safe to simply evaluate all expressions. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct SafeMfpPlan { + pub(crate) mfp: MapFilterProject, +} + +/// Predicates partitioned into temporal and non-temporal. +/// +/// Temporal predicates require some recognition to determine their +/// structure, and it is best to do that once and re-use the results. +/// +/// There are restrictions on the temporal predicates we currently support. +/// They must directly constrain `MzNow` from below or above, +/// by expressions that do not themselves contain `MzNow`. +/// Conjunctions of such constraints are also ok. +#[derive(Clone, Debug, PartialEq)] +pub struct MfpPlan { + /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`. + pub(crate) mfp: SafeMfpPlan, + /// Expressions that when evaluated lower-bound `MzNow`. + pub(crate) lower_bounds: Vec, + /// Expressions that when evaluated upper-bound `MzNow`. + pub(crate) upper_bounds: Vec, +} diff --git a/src/flow/src/expr/mod.rs b/src/flow/src/expr/mod.rs index 85b21753b1..56f04b14f2 100644 --- a/src/flow/src/expr/mod.rs +++ b/src/flow/src/expr/mod.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use crate::storage::errors::DataflowError; -#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord)] pub enum ScalarExpr { /// A column of the input row Column(usize), diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 0c8a4c5b6b..6c7c436396 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -1,6 +1,5 @@ mod adapter; mod compute; mod expr; -mod render; mod repr; mod storage; diff --git a/src/flow/src/render/mod.rs b/src/flow/src/render/mod.rs deleted file mode 100644 index a9457011a0..0000000000 --- a/src/flow/src/render/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -//! for building the flow graph from PLAN -//! this is basically the last step before actually running the flow graph diff --git a/src/flow/src/repr/mod.rs b/src/flow/src/repr/mod.rs index 435476e1e4..8a63626425 100644 --- a/src/flow/src/repr/mod.rs +++ b/src/flow/src/repr/mod.rs @@ -9,11 +9,11 @@ pub type Diff = i64; /// A row is a vector of values. /// -#[derive(Clone)] +/// TODO(discord9): use a more efficient representation +///i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\] + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct Row { - /// TODO: use a more efficient representation - /// - /// i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\] inner: Vec, } diff --git a/src/flow/src/storage/errors.rs b/src/flow/src/storage/errors.rs index e395f2eae4..71dee275d9 100644 --- a/src/flow/src/storage/errors.rs +++ b/src/flow/src/storage/errors.rs @@ -5,6 +5,12 @@ pub enum DataflowError { EvalError(EvalError), } +impl From for DataflowError { + fn from(e: EvalError) -> Self { + DataflowError::EvalError(e) + } +} + #[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] pub enum EvalError { DivisionByZero,