basic skeleton

This commit is contained in:
Discord9
2023-07-18 15:46:36 +08:00
parent b52eb2313e
commit 13c02f3f92
11 changed files with 119 additions and 11 deletions

View File

@@ -46,7 +46,7 @@ members = [
"src/sql",
"src/storage",
"src/store-api",
"src/stream",
"src/flow",
"src/table",
"src/table-procedure",
"tests-integration",

View File

@@ -6,7 +6,11 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
timely = "0.12"
timely = {version = "0.12", features=["bincode"]}
differential-dataflow = "0.12"
abomonation = "0.7.3"
abomonation_derive = "0.5"
abomonation_derive = "0.5"
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
serde = {version = "1.0", features = ["derive"]}
datatypes = { path = "../datatypes" }

View File

@@ -0,0 +1 @@
//! Getting data from sources and sending results to sinks.

View File

@@ -0,0 +1,55 @@
use std::marker::PhantomData;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::Collection;
use serde::{Deserialize, Serialize};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::Data;
use crate::Diff;
/// Shared state used while building the dataflow fragments of one dataflow.
///
/// `S` is the timely scope the collections live in, and `T` is an outer
/// timestamp that the scope's own timestamp refines.
/// NOTE(review): only construction-time metadata and frontiers are stored
/// here so far — confirm intended collection management once sources/sinks
/// are implemented.
pub struct Context<S, T>
where
    T: Timestamp + Lattice,
    S: Scope,
    S::Timestamp: Lattice + Refines<T>,
{
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    pub since_frontier: Antichain<T>,
    /// Frontier after which updates should not be emitted.
    /// Used to limit the amount of work done when appropriate.
    pub upper_frontier: Antichain<T>,
}
/// A collection of `V` values together with its (future) arranged forms.
///
/// Currently only the raw, unarranged `collection` is held; the arrangement
/// side is a placeholder (`PhantomData<T>`) until in-memory arrangements are
/// implemented (see TODO below).
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, V: Data, T>
where
    T: Timestamp + Lattice,
    S::Timestamp: Lattice + Refines<T>,
{
    /// The unarranged stream of `(data, time, diff)` updates.
    pub(crate) collection: Collection<S, V, Diff>,
    /// TODO: impl arranged in memory
    pub(crate) arranged: PhantomData<T>,
}
/// Errors that can occur while running a dataflow.
///
/// Derives `Serialize`/`Deserialize` plus ordering and hashing so error
/// values can themselves flow through (and be exchanged by) the dataflow.
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum DataflowError {
    /// An error raised while evaluating an expression.
    EvalError(EvalError),
}
/// Errors produced during expression evaluation.
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum EvalError {
    /// Attempted division by zero.
    DivisionByZero,
}

View File

@@ -0,0 +1,4 @@
//! Generating a dataflow from a logical plan and computing that dataflow.
mod context;
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;

22
src/flow/src/expr/id.rs Normal file
View File

@@ -0,0 +1,22 @@
/// A global identifier, partitioned into namespaces so that ids minted for
/// different purposes (system, user, transient) can never collide.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum GlobalId {
    /// System namespace.
    System(u64),
    /// User namespace.
    User(u64),
    /// Transient namespace.
    Transient(u64),
    /// Dummy id for query being explained
    Explain,
}
/// An identifier that is only meaningful within a single dataflow
/// (newtype over `u64`; constructed crate-internally).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LocalId(pub(crate) u64);
/// Either a local or a global identifier, for contexts that accept both.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum Id {
    /// An identifier that refers to a local component of a dataflow.
    Local(LocalId),
    /// An identifier that refers to a global dataflow.
    Global(GlobalId),
}

0
src/flow/src/expr/lir.rs Normal file
View File

5
src/flow/src/expr/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
//! Declaring the dataflow description — the last step before building the dataflow.
mod id;
pub use id::{GlobalId, Id, LocalId};

View File

@@ -1,13 +1,20 @@
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::Count;
use timely::progress::timestamp::Refines;
mod adapter;
mod compute;
mod expr;
mod repr;
mod storage;
/// Record count difference type.
///
/// Used as the multiplicity ("diff") parameter of differential dataflow
/// collections: by convention a positive value adds copies of a record and a
/// negative value removes them.
pub type Diff = i64;
#[test]
fn demo() {
use abomonation_derive::Abomonation;
fn demo_multitemporal() {
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::Join;
#[derive(Debug, Clone, Default, Eq, PartialEq, Hash, Abomonation)]
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::{Count, Join};
use serde::{Deserialize, Serialize};
use timely::progress::timestamp::Refines;
#[derive(Debug, Clone, Default, Eq, PartialEq, Hash, Deserialize, Serialize)]
/// (System, Event)
struct MT(usize, usize);
impl Ord for MT {
@@ -91,7 +98,7 @@ fn demo() {
});
// Read a size for our organization from the arguments.
let size = 100;
let size = 10;
// Load input (a binary tree).
input.advance_to(MT(0, 0));

9
src/flow/src/repr/mod.rs Normal file
View File

@@ -0,0 +1,9 @@
//! basically a wrapper around the `datatypes` crate
//! for basic Data Representation
use datatypes::value::Value;
/// A row is a vector of values.
///
/// TODO: use a more efficient representation,
/// i.e. something more compact such as raw `u8` bytes of \[tag0, value0, tag1, value1, ...\]
pub type Row = Vec<Value>;

View File

@@ -0,0 +1 @@