test: simple render

This commit is contained in:
Discord9
2023-08-16 14:16:53 +08:00
parent 76d8709774
commit c80165c377
8 changed files with 111 additions and 18 deletions

View File

@@ -0,0 +1,22 @@
use std::collections::BTreeMap;
use crate::expr::GlobalId;
/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
/// done between data ingress and egress.
pub struct ComputeState {
/// State kept for each installed compute collection.
///
/// Each collection has exactly one frontier.
/// How the frontier is communicated depends on the collection type:
/// * Frontiers of indexes are equal to the frontier of their corresponding traces in the
/// `TraceManager`.
/// * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
/// * Subscribes report their frontiers through the `subscribe_response_buffer`.
// NOTE(review): `CollectionState` currently has no fields, so the
// `sink_write_frontier` / `subscribe_response_buffer` mechanisms described
// above are not yet implemented here — confirm the docs track the
// eventual implementation.
pub collections: BTreeMap<GlobalId, CollectionState>,
}
/// State maintained for a compute collection.
///
/// Currently a placeholder with no fields; it is expected to eventually hold
/// per-collection tracking state (e.g. frontiers) — TODO confirm.
pub struct CollectionState {}

View File

@@ -14,6 +14,8 @@ use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use super::plan::Plan;
use super::types::DataflowDescription;
use crate::compute::render::RenderTimestamp;
use crate::compute::typedefs::{TraceErrHandle, TraceRowHandle};
use crate::expr::{GlobalId, Id, MapFilterProject, ScalarExpr};
@@ -225,10 +227,22 @@ where
S::Timestamp: Lattice + Refines<repr::Timestamp>,
{
/// TODO(discord9)" DataflowDesc & Plan & etc.
pub fn for_dataflow_in(scope: S) -> Self {
/// Creates a new empty Context from given dataflow
pub fn for_dataflow_in<Plan>(dataflow: &DataflowDescription<Plan, ()>, scope: S) -> Self {
let dataflow_id = scope.addr()[0];
let since_frontier = dataflow
.as_of
.clone()
.unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
// TODO(discord9)=: get since_frontier and until_frontier from dataflow_desc
todo!()
Self {
scope,
debug_name: dataflow.debug_name.clone(),
dataflow_id,
since_frontier,
until_frontier: dataflow.until.clone(),
bindings: BTreeMap::new(),
}
}
}

View File

@@ -1,4 +1,5 @@
//! For generating a dataflow from a logical plan and computing over that dataflow.
mod compute_state;
mod context;
mod plan;
mod render;

View File

@@ -85,6 +85,7 @@ impl AvailableCollections {
/// Rendering Plan
///
/// TODO(discord9): see if we ever need to support recursive plans
#[derive(Debug, Clone)]
pub enum Plan<T = repr::Timestamp> {
/// A collection containing a pre-determined collection.
Constant {
@@ -188,6 +189,7 @@ pub enum Plan<T = repr::Timestamp> {
}
/// TODO(discord9): impl GetPlan
#[derive(Debug, Clone)]
pub enum GetPlan {
/// Simply pass input arrangements on to the next stage.
PassArrangements,

View File

@@ -3,12 +3,16 @@
use differential_dataflow::lattice::Lattice;
use differential_dataflow::AsCollection;
use timely::communication::Allocate;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::operators::{Capture, ToStream};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use timely::progress::Timestamp;
use timely::worker::Worker as TimelyWorker;
use super::types::DataflowDescription;
use crate::compute::compute_state::ComputeState;
use crate::compute::context::CollectionBundle;
use crate::compute::plan::Plan;
use crate::compute::types::BuildDesc;
@@ -19,6 +23,19 @@ use crate::storage::errors::DataflowError;
mod reduce;
/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
///
/// This method imports sources from provided assets, and then builds the remaining
/// dataflow using "compute-local" assets like shared arrangements, and producing
/// both arrangements and sinks.
pub fn build_compute_dataflow<A: Allocate>(
timely_worker: &mut TimelyWorker<A>,
compute_state: &mut ComputeState,
dataflow: DataflowDescription<Plan, ()>,
) {
// Stub: intended to import sources, build objects, and export
// arrangements/sinks as described above; not yet implemented.
todo!()
}
pub trait RenderTimestamp: Timestamp + Lattice + Refines<repr::Timestamp> {
/// The system timestamp component of the timestamp.
///
@@ -60,6 +77,7 @@ where
G: Scope,
G::Timestamp: RenderTimestamp,
{
/// render plan and insert into context with given GlobalId
pub(crate) fn build_object(&mut self, object: BuildDesc<Plan>) {
// First, transform the relation expression into a render plan.
let bundle = self.render_plan(object.plan);
@@ -79,6 +97,7 @@ where
pub fn render_plan(&mut self, plan: Plan) -> CollectionBundle<S, Row> {
match plan {
Plan::Constant { rows } => {
dbg!(&rows);
let (rows, errs) = match rows {
Ok(rows) => (rows, Vec::new()),
Err(err) => (Vec::new(), vec![err]),
@@ -88,6 +107,7 @@ where
let ok_collection = rows
.into_iter()
.filter_map(move |(row, mut time, diff)| {
dbg!(&row);
time.advance_by(since_frontier.borrow());
if !until.less_equal(&time) {
Some((
@@ -198,7 +218,7 @@ where
let input = self.render_plan(*input);
self.render_reduce(input, key_val_plan, plan, input_key)
}
_ => todo!(),
_ => todo!("To be implemented"),
}
}
}
@@ -209,25 +229,45 @@ mod test {
use timely::dataflow::scopes::Child;
use super::*;
use crate::expr::LocalId;
use crate::expr::{GlobalId, LocalId};
use crate::repr::Diff;
#[test]
fn test_simple_plan_render() {
timely::execute_from_args(std::env::args(), move |worker| {
#[allow(clippy::print_stdout)]
fn test_constant_plan_render() {
let build_descs = vec![BuildDesc {
id: GlobalId::User(0),
plan: Plan::Constant {
rows: Ok(vec![(Row::default(), 0, 1)]),
},
}];
let dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
timely::execute_from_args(std::iter::empty::<String>(), move |worker| {
println!("worker: {:?}", worker.index());
let mut input = InputSession::<repr::Timestamp, Row, Diff>::new();
worker.dataflow(|scope: &mut Child<'_, _, repr::Timestamp>| {
let mut test_ctx = Context::<_, Row, _>::for_dataflow_in(scope.clone());
let plan = Plan::Constant {
rows: Ok(vec![(Row::default(), 0, 1)]),
};
let mut test_ctx = Context::<_, Row, _>::for_dataflow_in(&dataflow, scope.clone());
for build_desc in &build_descs {
test_ctx.build_object(build_desc.clone());
}
let input_collection = input.to_collection(scope);
let err_collection = InputSession::new().to_collection(scope);
let input_collection =
CollectionBundle::from_collections(input_collection, err_collection);
// insert collection
test_ctx.insert_id(Id::Local(LocalId(0)), input_collection);
let bundle = test_ctx.render_plan(plan);
let inspect = test_ctx
.lookup_id(Id::Global(GlobalId::User(0)))
.unwrap()
.as_specific_collection(None);
inspect.0.inspect(|x| println!("inspect {:?}", x));
});
});
// input.insert(Row::default());
input.update(Row::default(), 1);
input.advance_to(1);
})
.expect("Computation terminated abnormally");
}
}

View File

@@ -41,6 +41,22 @@ pub struct DataflowDescription<P, S: 'static = (), T = repr::Timestamp> {
pub debug_name: String,
}
impl<P, T> DataflowDescription<P, (), T> {
    /// Builds an empty dataflow description labeled with a human-readable name.
    ///
    /// All import/export maps start empty, there are no objects to build,
    /// `as_of` is unset, and `until` is the empty antichain.
    pub fn new(name: String) -> Self {
        Self {
            debug_name: name,
            source_imports: Default::default(),
            index_imports: Default::default(),
            objects_to_build: Vec::new(),
            index_exports: Default::default(),
            sink_exports: Default::default(),
            as_of: Default::default(),
            until: Antichain::new(),
        }
    }
}
/// An association of a global identifier to an expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P = Plan> {

View File

@@ -5,9 +5,4 @@ mod dataflow;
mod sinks;
mod sources;
/// An association of a global identifier to an expression.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
pub id: GlobalId,
pub plan: P,
}
pub(crate) use dataflow::{BuildDesc, DataflowDescription, IndexDesc};

View File

@@ -1 +1,4 @@
//! TODO: Storage layer: wraps gRPC write requests to provide definite collections for stream processing, can send read requests when random access is needed,
//! and stores the results of stream processing.
pub(crate) mod errors;