feat: render plan partially writen

This commit is contained in:
Discord9
2023-07-25 15:35:36 +08:00
parent 824d03a642
commit 2798d266f5
17 changed files with 327 additions and 37 deletions

24
Cargo.lock generated
View File

@@ -3281,6 +3281,18 @@ dependencies = [
"spin 0.9.8",
]
[[package]]
name = "flow"
version = "0.1.0"
dependencies = [
"datafusion-expr",
"datafusion-substrait",
"datatypes",
"differential-dataflow",
"serde",
"timely",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -9585,18 +9597,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "stream"
version = "0.1.0"
dependencies = [
"datafusion-expr",
"datafusion-substrait",
"datatypes",
"differential-dataflow",
"serde",
"timely",
]
[[package]]
name = "streaming-stats"
version = "0.2.3"

View File

@@ -1,5 +1,5 @@
[package]
name = "stream"
name = "flow"
version = "0.1.0"
edition = "2021"
@@ -12,7 +12,7 @@ edition = "2021"
# differential-dataflow = "0.12.0"
timely = "0.12.0"
differential-dataflow = "0.12.0"
# TODO: fork later for fixed version git dependency
# TODO(discord9): fork later for fixed version git dependency
# timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
# differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow", rev ="99fa67db"}
datafusion-expr.workspace = true

View File

@@ -9,10 +9,11 @@ use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use crate::compute::render::RenderTimestamp;
use crate::compute::typedefs::{TraceErrHandle, TraceRowHandle};
use crate::expr::{GlobalId, Id, ScalarExpr};
use crate::expr::{GlobalId, Id, MapFilterProject, ScalarExpr};
use crate::repr;
use crate::repr::Diff;
use crate::repr::{Diff, Row};
use crate::storage::errors::DataflowError;
// Local type definition to avoid the horror in signatures.
@@ -83,14 +84,63 @@ impl<S: Scope, V: Data> Context<S, V>
where
S::Timestamp: Lattice + Refines<repr::Timestamp>,
{
/// TODO" DataflowDesc & Plan & etc.
/// TODO(discord9)" DataflowDesc & Plan & etc.
pub fn for_dataflow_in(scope: S) -> Self {
let dataflow_id = scope.addr()[0];
// TODO(discord9): get since_frontier and until_frontier from dataflow_desc
// TODO(discord9)=: get since_frontier and until_frontier from dataflow_desc
todo!()
}
}
impl<S: Scope, V: Data, T: Lattice> Context<S, V, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// Insert a collection bundle by an identifier.
///
/// This is expected to be used to install external collections (sources, indexes, other views),
/// as well as for `Let` bindings of local collections.
pub fn insert_id(
&mut self,
id: Id,
collection: CollectionBundle<S, V, T>,
) -> Option<CollectionBundle<S, V, T>> {
self.bindings.insert(id, collection)
}
/// Remove a collection bundle by an identifier.
///
/// The primary use of this method is uninstalling `Let` bindings.
pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, V, T>> {
self.bindings.remove(&id)
}
/// Melds a collection bundle to whatever exists.
#[allow(clippy::map_entry)]
pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, V, T>) {
if !self.bindings.contains_key(&id) {
self.bindings.insert(id, collection);
} else {
let binding = self
.bindings
.get_mut(&id)
.expect("Binding verified to exist");
if collection.collection.is_some() {
binding.collection = collection.collection;
}
for (key, flavor) in collection.arranged.into_iter() {
binding.arranged.insert(key, flavor);
}
}
}
/// Look up a collection bundle by an identifier.
pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, V, T>> {
self.bindings.get(&id).cloned()
}
}
type ResultCollection<S, V> = (Collection<S, V, Diff>, Collection<S, DataflowError, Diff>);
#[derive(Clone)]
pub struct CollectionBundle<S, V, T = repr::Timestamp>
where
@@ -99,10 +149,52 @@ where
S::Timestamp: Lattice + Refines<T>,
V: Data,
{
pub(crate) collection: Collection<S, V, Diff>,
/// TODO: impl: 1. ScalarExpr(Could be from substrait), 2. Arrangement
pub(crate) collection: Option<ResultCollection<S, V>>,
/// TODO(discord9): impl: 1. ScalarExpr(Could be from substrait), 2. Arrangement
pub(crate) arranged: BTreeMap<Vec<ScalarExpr>, ArrangementFlavor<S, V, T>>,
}
impl<S: Scope, V: Data, T: Lattice> CollectionBundle<S, V, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// Construct a new collection bundle from update streams.
pub fn from_collections(
oks: Collection<S, V, Diff>,
errs: Collection<S, DataflowError, Diff>,
) -> Self {
Self {
collection: Some((oks, errs)),
arranged: BTreeMap::default(),
}
}
}
impl<S, T> CollectionBundle<S, repr::Row, T>
where
T: timely::progress::Timestamp + Lattice,
S: Scope,
S::Timestamp: Refines<T> + Lattice + RenderTimestamp,
{
/// Presents `self` as a stream of updates, having been subjected to `mfp`.
///
/// This operator is able to apply the logic of `mfp` early, which can substantially
/// reduce the amount of data produced when `mfp` is non-trivial.
///
/// The `key_val` argument, when present, indicates that a specific arrangement should
/// be used, and if, in addition, the `val` component is present,
/// that we can seek to the supplied row.
pub fn as_collection_core(
&self,
mut mfp: MapFilterProject,
key_val: Option<(Vec<ScalarExpr>, Option<Row>)>,
until: Antichain<repr::Timestamp>,
) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
mfp.optimize();
todo!()
}
}
#[derive(Clone)]
pub enum ArrangementFlavored {}

View File

@@ -1,5 +1,8 @@
//! for generate dataflow from logical plan and computing the dataflow
mod context;
mod plan;
mod render;
mod typedefs;
mod types;
pub use context::Context;

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
/// TODO: impl Join
/// TODO(discord9): impl Join
/// A plan for the execution of a linear join.
///
/// A linear join is a sequence of stages, each of which introduces

View File

@@ -4,7 +4,7 @@ mod linear_join;
pub use delta_join::DeltaJoinPlan;
pub use linear_join::LinearJoinPlan;
/// TODO(discord9): impl Join
/// TODO(discord9)(discord9): impl Join
/// A complete enumeration of possible join plans to render.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum JoinPlan {

View File

@@ -84,7 +84,7 @@ impl AvailableCollections {
/// Rendering Plan
///
/// TODO: see if we ever need to support recursive plans
/// TODO(discord9): see if we ever need to support recursive plans
pub enum Plan<T = repr::Timestamp> {
/// A collection containing a pre-determined collection.
Constant {
@@ -187,5 +187,12 @@ pub enum Plan<T = repr::Timestamp> {
},
}
/// TODO: impl GetPlan
pub enum GetPlan {}
/// TODO(discord9): impl GetPlan
pub enum GetPlan {
/// Simply pass input arrangements on to the next stage.
PassArrangements,
/// Using the supplied key, optionally seek the row, and apply the MFP.
Arrangement(Vec<ScalarExpr>, Option<Row>, MapFilterProject),
/// Scan the input collection (unarranged) and apply the MFP.
Collection(MapFilterProject),
}

View File

@@ -11,7 +11,7 @@ pub struct KeyValPlan {
pub val_plan: MapFilterProject,
}
/// TODO(discord9): Reduce Plan
/// TODO(discord9)(discord9): Reduce Plan
/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
@@ -64,7 +64,7 @@ pub struct AccumulablePlan {
pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}
// TODO: others
// TODO(discord9): others
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum HierarchicalPlan {}

View File

@@ -0,0 +1,146 @@
//! for building the flow graph from PLAN
//! this is basically the last step before actually running the flow graph
use differential_dataflow::lattice::Lattice;
use differential_dataflow::AsCollection;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::operators::{Capture, ToStream};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use timely::progress::Timestamp;
use crate::compute::context::CollectionBundle;
use crate::compute::plan::Plan;
use crate::compute::Context;
use crate::expr::Id;
use crate::repr::{self, Row};
use crate::storage::errors::DataflowError;
pub trait RenderTimestamp: Timestamp + Lattice + Refines<repr::Timestamp> {}
impl<S> Context<S, Row>
where
S: Scope,
S::Timestamp: RenderTimestamp,
{
/// Renders a plan to a differential dataflow, producing the collection of results.
///
/// The return type reflects the uncertainty about the data representation, perhaps
/// as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
pub fn render_plan(&mut self, plan: Plan) -> CollectionBundle<S, Row> {
match plan {
Plan::Constant { rows } => {
let (rows, errs) = match rows {
Ok(rows) => (rows, Vec::new()),
Err(err) => (Vec::new(), vec![err]),
};
let since_frontier = self.since_frontier.clone();
let until = self.until_frontier.clone();
let ok_collection = rows
.into_iter()
.filter_map(move |(row, mut time, diff)| {
time.advance_by(since_frontier.borrow());
if !until.less_equal(&time) {
Some((
row,
<S::Timestamp as Refines<repr::Timestamp>>::to_inner(time),
diff,
))
} else {
None
}
})
.to_stream(&mut self.scope)
.as_collection();
let mut error_time: repr::Timestamp = Timestamp::minimum();
error_time.advance_by(self.since_frontier.borrow());
let err_collection = errs
.into_iter()
.map(move |e| {
(
DataflowError::from(e),
<S::Timestamp as Refines<repr::Timestamp>>::to_inner(error_time),
1,
)
})
.to_stream(&mut self.scope)
.as_collection();
CollectionBundle::from_collections(ok_collection, err_collection)
}
Plan::Get { id, keys, plan } => {
// Recover the collection from `self` and then apply `mfp` to it.
// If `mfp` happens to be trivial, we can just return the collection.
let mut collection = self
.lookup_id(id)
.unwrap_or_else(|| panic!("Get({:?}) not found at render time", id));
match plan {
crate::compute::plan::GetPlan::PassArrangements => {
// Assert that each of `keys` are present in `collection`.
if !keys
.arranged
.iter()
.all(|(key, _, _)| collection.arranged.contains_key(key))
{
let not_included: Vec<_> = keys
.arranged
.iter()
.filter(|(key, _, _)| !collection.arranged.contains_key(key))
.map(|(key, _, _)| key)
.collect();
panic!(
"Those keys {:?} is not included in collections keys:{:?}",
not_included,
collection.arranged.keys().cloned().collect::<Vec<_>>()
);
}
assert!(keys.raw <= collection.collection.is_some());
// Retain only those keys we want to import.
collection.arranged.retain(|key, _val| {
keys.arranged.iter().any(|(key2, _, _)| key2 == key)
});
collection
}
crate::compute::plan::GetPlan::Arrangement(key, row, mfp) => {
let (oks, errs) = collection.as_collection_core(
mfp,
Some((key, row)),
self.until_frontier.clone(),
);
CollectionBundle::from_collections(oks, errs)
}
crate::compute::plan::GetPlan::Collection(mfp) => {
let (oks, errs) =
collection.as_collection_core(mfp, None, self.until_frontier.clone());
CollectionBundle::from_collections(oks, errs)
}
}
}
Plan::Let { id, value, body } => {
// Render `value` and bind it to `id`. Complain if this shadows an id.
let value = self.render_plan(*value);
let prebound = self.insert_id(Id::Local(id), value);
assert!(prebound.is_none());
let body = self.render_plan(*body);
self.remove_id(Id::Local(id));
body
}
Plan::Mfp {
input,
mfp,
input_key_val,
} => {
let input = self.render_plan(*input);
// If `mfp` is non-trivial, we should apply it and produce a collection.
if mfp.is_identity() {
input
} else {
let (oks, errs) =
input.as_collection_core(mfp, input_key_val, self.until_frontier.clone());
CollectionBundle::from_collections(oks, errs)
}
}
_ => todo!(),
}
}
}

View File

@@ -4,7 +4,7 @@ use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpin
use crate::repr::{Diff, Row, Timestamp};
use crate::storage::errors::DataflowError;
// TODO: consider use ColValSpine for columnation storage
// TODO(discord9): consider use ColValSpine for columnation storage
/// T: Time, R: Diff, O: Offset
pub type RowSpine<K, V, T, R, O = usize> = OrdValSpine<K, V, T, R, O>;

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
// TODO: more function & eval
// TODO(discord9): more function & eval
use crate::repr::Row;
/// Stateless functions
#[derive(Debug, Clone)]

View File

@@ -42,3 +42,42 @@ pub struct MapFilterProject {
/// columns in the output.
pub input_arity: usize,
}
impl MapFilterProject {
pub fn optimize(&mut self) {
// TODO(discord9): optimize later
}
/// True if the operator describes the identity transformation.
pub fn is_identity(&self) -> bool {
self.expressions.is_empty()
&& self.predicates.is_empty()
&& self.projection.len() == self.input_arity
&& self.projection.iter().enumerate().all(|(i, p)| i == *p)
}
}
/// A wrapper type which indicates it is safe to simply evaluate all expressions.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SafeMfpPlan {
pub(crate) mfp: MapFilterProject,
}
/// Predicates partitioned into temporal and non-temporal.
///
/// Temporal predicates require some recognition to determine their
/// structure, and it is best to do that once and re-use the results.
///
/// There are restrictions on the temporal predicates we currently support.
/// They must directly constrain `MzNow` from below or above,
/// by expressions that do not themselves contain `MzNow`.
/// Conjunctions of such constraints are also ok.
#[derive(Clone, Debug, PartialEq)]
pub struct MfpPlan {
/// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
pub(crate) mfp: SafeMfpPlan,
/// Expressions that when evaluated lower-bound `MzNow`.
pub(crate) lower_bounds: Vec<ScalarExpr>,
/// Expressions that when evaluated upper-bound `MzNow`.
pub(crate) upper_bounds: Vec<ScalarExpr>,
}

View File

@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use crate::storage::errors::DataflowError;
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum ScalarExpr {
/// A column of the input row
Column(usize),

View File

@@ -1,6 +1,5 @@
mod adapter;
mod compute;
mod expr;
mod render;
mod repr;
mod storage;

View File

@@ -1,2 +0,0 @@
//! for building the flow graph from PLAN
//! this is basically the last step before actually running the flow graph

View File

@@ -9,11 +9,11 @@ pub type Diff = i64;
/// A row is a vector of values.
///
#[derive(Clone)]
/// TODO(discord9): use a more efficient representation
///i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Row {
/// TODO: use a more efficient representation
///
/// i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\]
inner: Vec<Value>,
}

View File

@@ -5,6 +5,12 @@ pub enum DataflowError {
EvalError(EvalError),
}
impl From<EvalError> for DataflowError {
fn from(e: EvalError) -> Self {
DataflowError::EvalError(e)
}
}
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum EvalError {
DivisionByZero,