Arrangement&types

This commit is contained in:
Discord9
2023-07-20 15:18:39 +08:00
parent d702b6e5c4
commit 47f41371d0
16 changed files with 277 additions and 163 deletions

9
Cargo.lock generated
View File

@@ -2852,7 +2852,8 @@ checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]]
name = "differential-dataflow"
version = "0.12.0"
source = "git+https://github.com/TimelyDataflow/differential-dataflow.git#99fa67db2b92d2ee938c6ffef0912908de3ef288"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cecb0345111032cfd995a1e9c1b79387a0e6bf6690be5d8dd12a58f4861bc6d9"
dependencies = [
"abomonation",
"abomonation_derive",
@@ -9588,9 +9589,11 @@ checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
name = "stream"
version = "0.1.0"
dependencies = [
"abomonation",
"abomonation_derive",
"datafusion-expr",
"datafusion-substrait",
"datatypes",
"differential-dataflow",
"serde",
"timely",
]

View File

@@ -6,11 +6,16 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
timely = {version = "0.12", features=["bincode"]}
differential-dataflow = "0.12"
abomonation = "0.7.3"
abomonation_derive = "0.5"
# use version from crates.io for now to prevent version slewing
# disable default-features which include `abomonaion` which we don't need for IPC
# timely = {version = "0.12.0", default-features = false, features = ["bincode"]}
# differential-dataflow = "0.12.0"
timely = "0.12.0"
differential-dataflow = "0.12.0"
# TODO: fork later for fixed version git dependency
# timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
# differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow", rev ="99fa67db"}
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
serde = {version = "1.0", features = ["derive"]}
datatypes = { path = "../datatypes" }
serde = { version = "1.0", features = ["derive"] }
datatypes = { path = "../datatypes" }

View File

@@ -1 +1,3 @@
//! for getting data from source and sending results to sink
//! and communicating with other parts of the database
//! also commands storage and computation layer

View File

@@ -1,15 +1,59 @@
use std::marker::PhantomData;
use std::collections::BTreeMap;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::Collection;
use serde::{Deserialize, Serialize};
use timely::dataflow::Scope;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::wrappers::enter::TraceEnter;
use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
use differential_dataflow::{Collection, Data};
use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::Data;
use crate::Diff;
pub struct Context<S, T>
use crate::compute::typedefs::{TraceErrHandle, TraceRowHandle};
use crate::expr::{GlobalId, Id, ScalarExpr};
use crate::repr;
use crate::repr::Diff;
use crate::storage::errors::DataflowError;
// Local type definition to avoid the horror in signatures.
pub(crate) type KeyArrangement<S, K, V> =
Arranged<S, TraceRowHandle<K, V, <S as ScopeParent>::Timestamp, Diff>>;
pub(crate) type Arrangement<S, V> = KeyArrangement<S, V, V>;
pub(crate) type ErrArrangement<S> =
Arranged<S, TraceErrHandle<DataflowError, <S as ScopeParent>::Timestamp, Diff>>;
pub(crate) type ArrangementImport<S, V, T> = Arranged<
S,
TraceEnter<TraceFrontier<TraceRowHandle<V, V, T, Diff>>, <S as ScopeParent>::Timestamp>,
>;
pub(crate) type ErrArrangementImport<S, T> = Arranged<
S,
TraceEnter<
TraceFrontier<TraceErrHandle<DataflowError, T, Diff>>,
<S as ScopeParent>::Timestamp,
>,
>;
/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, V: Data, T = repr::Timestamp>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
{
/// A dataflow-local arrangement.
Local(Arrangement<S, V>, ErrArrangement<S>),
/// An imported trace from outside the dataflow.
///
/// The `GlobalId` identifier exists so that exports of this same trace
/// can refer back to and depend on the original instance.
Trace(
GlobalId,
ArrangementImport<S, V, T>,
ErrArrangementImport<S, T>,
),
}
pub struct Context<S, V: Data, T = repr::Timestamp>
where
T: Timestamp + Lattice,
S: Scope,
@@ -30,26 +74,35 @@ where
pub since_frontier: Antichain<T>,
/// Frontier after which updates should not be emitted.
/// Used to limit the amount of work done when appropriate.
pub upper_frontier: Antichain<T>,
pub until_frontier: Antichain<T>,
/// Bindings of identifiers to collections.
pub bindings: BTreeMap<Id, CollectionBundle<S, V, T>>,
}
impl<S: Scope, V: Data> Context<S, V>
where
S::Timestamp: Lattice + Refines<repr::Timestamp>,
{
/// TODO" DataflowDesc & Plan & etc.
pub fn for_dataflow_in(scope: S) -> Self {
let dataflow_id = scope.addr()[0];
// TODO(discord9): get since_frontier and until_frontier from dataflow_desc
todo!()
}
}
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, V: Data, T>
pub struct CollectionBundle<S, V, T = repr::Timestamp>
where
T: Timestamp + Lattice,
S: Scope,
S::Timestamp: Lattice + Refines<T>,
V: Data,
{
pub(crate) collection: Collection<S, V, Diff>,
/// TODO: impl arranged in memory
pub(crate) arranged: PhantomData<T>,
/// TODO: impl: 1. ScalarExpr(Could be from substrait), 2. Arrangement
pub(crate) arranged: BTreeMap<Vec<ScalarExpr>, ArrangementFlavor<S, V, T>>,
}
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum DataflowError {
EvalError(EvalError),
}
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum EvalError {
DivisionByZero,
}
#[derive(Clone)]
pub enum ArrangementFlavored {}

View File

@@ -1,4 +1,5 @@
//! for generate dataflow from logical plan and computing the dataflow
mod context;
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
mod plan;
mod typedefs;
mod types;

View File

@@ -0,0 +1,83 @@
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use crate::expr::{Id, ScalarExpr};
use crate::repr::{self, Row};
use crate::storage::errors::EvalError;
/// The forms in which an operator's output is available;
/// it can be considered the plan-time equivalent of
/// `render::context::CollectionBundle`.
///
/// These forms are either "raw", representing an unarranged collection,
/// or "arranged", representing one that has been arranged by some key.
///
/// The raw collection, if it exists, may be consumed directly.
///
/// The arranged collections are slightly more complicated:
/// Each key here is attached to a description of how the corresponding
/// arrangement is permuted to remove value columns
/// that are redundant with key columns. Thus, the first element in each
/// tuple of `arranged` is the arrangement key; the second is the map of
/// logical output columns to columns in the key or value of the deduplicated
/// representation, and the third is a "thinning expression",
/// or list of columns to include in the value
/// when arranging.
///
/// For example, assume a 5-column collection is to be arranged by the key
/// `[Column(2), Column(0) + Column(3), Column(1)]`.
/// Then `Column(1)` and `Column(2)` in the value are redundant with the key, and
/// only columns 0, 3, and 4 need to be stored separately.
/// The thinning expression will then be `[0, 3, 4]`.
///
/// The permutation represents how to recover the
/// original values (logically `[Column(0), Column(1), Column(2), Column(3), Column(4)]`)
/// from the key and value of the arrangement, logically
/// `[Column(2), Column(0) + Column(3), Column(1), Column(0), Column(3), Column(4)]`.
/// Thus, the permutation in this case should be `{0: 3, 1: 2, 2: 0, 3: 4, 4: 5}`.
///
/// Note that this description, while true at the time of writing, is merely illustrative;
/// users of this struct should not rely on the exact strategy used for generating
/// the permutations. As long as clients apply the thinning expression
/// when creating arrangements, and permute by the hashmap when reading them,
/// the contract of the function where they are generated (`expr::permutation_for_arrangement`)
/// ensures that the correct values will be read.
#[derive(Default, Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct AvailableCollections {
/// Whether the collection exists in unarranged form.
pub raw: bool,
/// The set of arrangements of the collection, along with a
/// column permutation mapping
pub arranged: Vec<KeyWithColumnPermutation>,
}
pub type KeyWithColumnPermutation = (Vec<ScalarExpr>, BTreeMap<usize, usize>, Vec<usize>);
impl AvailableCollections {
/// Represent a collection that has no arrangements.
pub fn new_raw() -> Self {
Self {
raw: true,
arranged: vec![],
}
}
/// Represent a collection that is arranged in the
/// specified ways.
pub fn new_arranged(arranged: Vec<KeyWithColumnPermutation>) -> Self {
assert!(
!arranged.is_empty(),
"Invariant violated: at least one collection must exist"
);
Self {
raw: false,
arranged,
}
}
}
pub enum Plan {
Constant { rows: Result<Vec<Row>, EvalError> },
Get { id: Id, keys: AvailableCollections },
}

View File

@@ -0,0 +1,20 @@
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
use crate::repr::{Diff, Row, Timestamp};
use crate::storage::errors::DataflowError;
// TODO: consider use ColValSpine for columnation storage
/// T: Time, R: Diff, O: Offset
pub type RowSpine<K, V, T, R, O = usize> = OrdValSpine<K, V, T, R, O>;
/// T: Time, R: Diff, O: Offset
pub type RowKeySpine<K, T, R, O = usize> = OrdKeySpine<K, T, R, O>;
/// T: Time, R: Diff, O: Offset
pub type ErrSpine<K, T, R, O = usize> = OrdKeySpine<K, T, R, O>;
/// T: Time, R: Diff, O: Offset
pub type ErrValSpine<K, T, R, O = usize> = OrdValSpine<K, DataflowError, T, R, O>;
pub type TraceRowHandle<K, V, T, R> = TraceAgent<RowSpine<K, V, T, R>>;
pub type TraceErrHandle<K, T, R> = TraceAgent<ErrSpine<K, T, R>>;
pub type KeysValsHandle = TraceRowHandle<Row, Row, Timestamp, Diff>;
pub type ErrsHandle = TraceErrHandle<DataflowError, Timestamp, Diff>;

View File

@@ -0,0 +1 @@

16
src/flow/src/expr/func.rs Normal file
View File

@@ -0,0 +1,16 @@
use crate::repr::Row;
/// Stateless functions
#[derive(Debug, Clone)]
pub enum Func {
BuiltIn(BuiltInFunc),
Custom(fn(Row) -> Row),
}
#[derive(Debug, Clone)]
pub enum BuiltInFunc {
Not,
IsNull,
IsTrue,
IsFalse,
}

View File

@@ -1,5 +1,23 @@
//! for declare dataflow description that is the last step before build dataflow
mod func;
mod id;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
pub use id::{GlobalId, Id, LocalId};
use serde::{Deserialize, Serialize};
use crate::storage::errors::DataflowError;
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub enum ScalarExpr {
/// A column of the input row
Column(usize),
/// A literal value.
Literal(Result<Value, DataflowError>, ConcreteDataType),
CallFunc {
func: String,
exprs: Vec<ScalarExpr>,
},
}

View File

@@ -1,133 +1,6 @@
mod adapter;
mod compute;
mod expr;
mod render;
mod repr;
mod storage;
/// Record count difference type.
pub type Diff = i64;
#[test]
fn demo_multitemporal() {
use differential_dataflow::input::InputSession;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::{Count, Join};
use serde::{Deserialize, Serialize};
use timely::progress::timestamp::Refines;
#[derive(Debug, Clone, Default, Eq, PartialEq, Hash, Deserialize, Serialize)]
/// (System, Event)
struct MT(usize, usize);
impl Ord for MT {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.cmp(&other.0).then(self.1.cmp(&other.1))
}
}
impl PartialOrd for MT {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.cmp(other).into()
}
}
impl timely::PartialOrder for MT {
fn less_equal(&self, other: &Self) -> bool {
self.0 <= other.0 && self.1 <= other.1
}
}
impl timely::order::TotalOrder for MT {}
impl timely::progress::Timestamp for MT {
type Summary = MT;
fn minimum() -> Self {
Self(0, 0)
}
}
impl timely::progress::PathSummary<MT> for MT {
fn results_in(&self, src: &MT) -> Option<MT> {
self.0
.results_in(&src.0)
.and_then(|x| self.1.results_in(&src.1).map(|y| MT(x, y)))
//.and_then(|x| self.1.results_in(&src.1).map(|y| MT(x, y)))
}
fn followed_by(&self, other: &Self) -> Option<Self> {
self.0
.followed_by(&other.0)
.and_then(|x| self.1.followed_by(&other.1).map(|y| MT(x, y)))
}
}
impl Refines<()> for MT {
fn to_inner(other: ()) -> Self {
Self(0, 0)
}
fn to_outer(self) -> () {
todo!()
}
fn summarize(
path: <Self as timely::progress::Timestamp>::Summary,
) -> <() as timely::progress::Timestamp>::Summary {
todo!()
}
}
impl Lattice for MT {
fn join(&self, other: &Self) -> Self {
Self(self.0.max(other.0), self.1.max(other.1))
}
fn meet(&self, other: &Self) -> Self {
Self(self.0.min(other.0), self.1.min(other.1))
}
}
// define a new timely dataflow computation.
timely::execute_from_args(["w2".to_string()].into_iter(), move |worker| {
// create an input collection of data.
let mut input = InputSession::new();
// define a new computation.
let probe = worker.dataflow(|scope| {
// create a new collection from our input.
let manages = input.to_collection(scope);
// if (m2, m1) and (m1, p), then output (m1, (m2, p))
manages
.map(|(m2, m1)| (m1, m2))
.join(&manages)
.count()
//.inspect(|x| println!("{:?}", x))
.probe()
});
// Read a size for our organization from the arguments.
let size = 10;
// Load input (a binary tree).
input.advance_to(MT(0, 0));
let mut person = worker.index();
while person < size {
input.insert((person / 2, person));
person += worker.peers();
}
// wait for data loading.
input.advance_to(MT(0, 0));
input.flush();
while probe.less_than(input.time()) {
worker.step();
}
println!("{:?}\tdata loaded", worker.timer().elapsed());
let mut person = 1 + worker.index();
while person < size {
input.remove((person / 2, person));
input.insert((person / 3, person));
input.advance_to(MT(0, person));
input.flush();
while probe.less_than(&input.time()) {
worker.step();
}
println!("{:?}\tstep {} complete", worker.timer().elapsed(), person);
person += worker.peers();
}
})
.expect("Computation terminated abnormally");
}

View File

@@ -0,0 +1,2 @@
//! for building the flow graph from PLAN
//! this is basically the last step before actually running the flow graph

View File

@@ -1,9 +1,35 @@
//! basically a wrapper around the `datatype` crate
//! for basic Data Representation
use std::borrow::Borrow;
use datatypes::value::Value;
/// System-wide Record count difference type.
pub type Diff = i64;
/// A row is a vector of values.
///
/// TODO: use a more efficient representation
/// i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\]
pub type Row = Vec<Value>;
#[derive(Clone)]
pub struct Row {
/// TODO: use a more efficient representation
///
/// i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\]
inner: Vec<Value>,
}
impl Row {
pub fn pack<I>(iter: I) -> Row
where
I: IntoIterator<Item = Value>,
{
Self {
inner: iter.into_iter().collect(),
}
}
pub fn unpack(&self) -> Vec<Value> {
self.inner.clone()
}
}
/// System-wide default timestamp type
pub type Timestamp = u64;

View File

@@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum DataflowError {
EvalError(EvalError),
}
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum EvalError {
DivisionByZero,
}

View File

@@ -1 +1 @@
pub(crate) mod errors;