diff --git a/src/flow/src/util/mod.rs b/src/flow/src/util/mod.rs
new file mode 100644
index 0000000000..2ac02b7630
--- /dev/null
+++ b/src/flow/src/util/mod.rs
@@ -0,0 +1,5 @@
+//! Utilities extending differential dataflow to deal with errors, etc.
+mod operator;
+mod reduce;
+pub use operator::CollectionExt;
+pub use reduce::ReduceExt;
diff --git a/src/flow/src/util/operator.rs b/src/flow/src/util/operator.rs
new file mode 100644
index 0000000000..e506f72f94
--- /dev/null
+++ b/src/flow/src/util/operator.rs
@@ -0,0 +1,215 @@
+use differential_dataflow::difference::{Multiply, Semigroup};
+use differential_dataflow::lattice::Lattice;
+use differential_dataflow::operators::arrange::Arrange;
+use differential_dataflow::trace::{Batch, Trace, TraceReader};
+use differential_dataflow::{AsCollection, Collection};
+use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
+use timely::dataflow::channels::pushers::Tee;
+use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
+use timely::dataflow::operators::generic::operator::{self, Operator};
+use timely::dataflow::operators::generic::{InputHandle, OperatorInfo, OutputHandle};
+use timely::dataflow::operators::Capability;
+use timely::dataflow::{Scope, Stream};
+use timely::{Data, ExchangeData};
+
+pub trait StreamExt<G, D1>
+where
+    D1: Data,
+    G: Scope,
+{
+    /// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
+    /// but the logic function can handle failures.
+    ///
+    /// Creates a new dataflow operator that partitions its input stream by a
+    /// parallelization strategy `pact` and repeatedly invokes `logic`, the
+    /// function returned by the function passed as `constructor`. The `logic`
+    /// function can read from the input stream and write to either of two output
+    /// streams, where the first output stream represents successful
+    /// computations and the second output stream represents failed
+    /// computations.
+    fn unary_fallible<D2, E, B, P>(
+        &self,
+        pact: P,
+        name: &str,
+        constructor: B,
+    ) -> (Stream<G, D2>, Stream<G, E>)
+    where
+        D2: Data,
+        E: Data,
+        B: FnOnce(
+            Capability<G::Timestamp>,
+            OperatorInfo,
+        ) -> Box<
+            dyn FnMut(
+                    &mut InputHandle<G::Timestamp, D1, P::Puller>,
+                    &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
+                    &mut OutputHandle<G::Timestamp, E, Tee<G::Timestamp, E>>,
+                ) + 'static,
+        >,
+        P: ParallelizationContract<G::Timestamp, D1>;
+
+    /// Like [`timely::dataflow::operators::map::Map::flat_map`], but `logic`
+    /// is allowed to fail. The first returned stream will contain the
+    /// successful applications of `logic`, while the second returned stream
+    /// will contain the failed applications.
+    fn flat_map_fallible<D2, E, I, L>(&self, name: &str, logic: L) -> (Stream<G, D2>, Stream<G, E>)
+    where
+        D2: Data,
+        E: Data,
+        I: IntoIterator<Item = Result<D2, E>>,
+        L: FnMut(D1) -> I + 'static;
+}
+
+/// Extension methods for differential [`Collection`]s.
+pub trait CollectionExt<G, D1, R>
+where
+    G: Scope,
+    R: Semigroup,
+{
+    /// Creates a new empty collection in `scope`.
+    fn empty(scope: &G) -> Collection<G, D1, R>;
+
+    /// Like [`Collection::map`], but `logic` is allowed to fail. The first
+    /// returned collection will contain successful applications of `logic`,
+    /// while the second returned collection will contain the failed
+    /// applications.
+    fn map_fallible<D2, E, L>(
+        &self,
+        name: &str,
+        mut logic: L,
+    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
+    where
+        D2: Data,
+        E: Data,
+        L: FnMut(D1) -> Result<D2, E> + 'static,
+    {
+        self.flat_map_fallible(name, move |record| Some(logic(record)))
+    }
+
+    /// Like [`Collection::flat_map`], but `logic` is allowed to fail. The first
+    /// returned collection will contain the successful applications of `logic`,
+    /// while the second returned collection will contain the failed
+    /// applications.
+    fn flat_map_fallible<D2, E, I, L>(
+        &self,
+        name: &str,
+        logic: L,
+    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
+    where
+        D2: Data,
+        E: Data,
+        I: IntoIterator<Item = Result<D2, E>>,
+        L: FnMut(D1) -> I + 'static;
+}
+
+impl<G, D1> StreamExt<G, D1> for Stream<G, D1>
+where
+    D1: Data,
+    G: Scope,
+{
+    fn unary_fallible<D2, E, B, P>(
+        &self,
+        pact: P,
+        name: &str,
+        constructor: B,
+    ) -> (Stream<G, D2>, Stream<G, E>)
+    where
+        D2: Data,
+        E: Data,
+        B: FnOnce(
+            Capability<G::Timestamp>,
+            OperatorInfo,
+        ) -> Box<
+            dyn FnMut(
+                    &mut InputHandle<G::Timestamp, D1, P::Puller>,
+                    &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
+                    &mut OutputHandle<G::Timestamp, E, Tee<G::Timestamp, E>>,
+                ) + 'static,
+        >,
+        P: ParallelizationContract<G::Timestamp, D1>,
+    {
+        let mut builder = OperatorBuilderRc::new(name.into(), self.scope());
+        builder.set_notify(false);
+
+        let operator_info = builder.operator_info();
+
+        let mut input = builder.new_input(self, pact);
+        let (mut ok_output, ok_stream) = builder.new_output();
+        let (mut err_output, err_stream) = builder.new_output();
+
+        builder.build(move |mut capabilities| {
+            // `capabilities` should be a single-element vector.
+            let capability = capabilities.pop().unwrap();
+            let mut logic = constructor(capability, operator_info);
+            move |_frontiers| {
+                let mut ok_output_handle = ok_output.activate();
+                let mut err_output_handle = err_output.activate();
+                logic(&mut input, &mut ok_output_handle, &mut err_output_handle);
+            }
+        });
+
+        (ok_stream, err_stream)
+    }
+
+    #[allow(clippy::redundant_closure)]
+    fn flat_map_fallible<D2, E, I, L>(
+        &self,
+        name: &str,
+        mut logic: L,
+    ) -> (Stream<G, D2>, Stream<G, E>)
+    where
+        D2: Data,
+        E: Data,
+        I: IntoIterator<Item = Result<D2, E>>,
+        L: FnMut(D1) -> I + 'static,
+    {
+        let mut storage = Vec::new();
+        self.unary_fallible(Pipeline, name, move |_, _| {
+            Box::new(move |input, ok_output, err_output| {
+                input.for_each(|time, data| {
+                    let mut ok_session = ok_output.session(&time);
+                    let mut err_session = err_output.session(&time);
+                    data.swap(&mut storage);
+                    for r in storage.drain(..).flat_map(|d1| logic(d1)) {
+                        match r {
+                            Ok(d2) => ok_session.give(d2),
+                            Err(e) => err_session.give(e),
+                        }
+                    }
+                })
+            })
+        })
+    }
+}
+
+impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
+where
+    G: Scope,
+    G::Timestamp: Data,
+    D1: Data,
+    R: Semigroup,
+{
+    fn empty(scope: &G) -> Collection<G, D1, R> {
+        operator::empty(scope).as_collection()
+    }
+
+    fn flat_map_fallible<D2, E, I, L>(
+        &self,
+        name: &str,
+        mut logic: L,
+    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
+    where
+        D2: Data,
+        E: Data,
+        I: IntoIterator<Item = Result<D2, E>>,
+        L: FnMut(D1) -> I + 'static,
+    {
+        let (ok_stream, err_stream) = self.inner.flat_map_fallible(name, move |(d1, t, r)| {
+            logic(d1).into_iter().map(move |res| match res {
+                Ok(d2) => Ok((d2, t.clone(), r.clone())),
+                Err(e) => Err((e, t.clone(), r.clone())),
+            })
+        });
+        (ok_stream.as_collection(), err_stream.as_collection())
+    }
+}
diff --git a/src/flow/src/util/reduce.rs b/src/flow/src/util/reduce.rs
new file mode 100644
index 0000000000..c9e981acce
--- /dev/null
+++ b/src/flow/src/util/reduce.rs
@@ -0,0 +1,68 @@
+use differential_dataflow::difference::{Abelian, Semigroup};
+use differential_dataflow::lattice::Lattice;
+use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
+use differential_dataflow::operators::reduce::ReduceCore;
+use differential_dataflow::trace::{Batch, Trace, TraceReader};
+use differential_dataflow::Data;
+use timely::dataflow::Scope;
+
+/// Extension trait for `ReduceCore`, currently providing a reduction based
+/// on an operator-pair approach.
+pub trait ReduceExt<G: Scope, K: Data, V: Data, R: Semigroup>
+where
+    G::Timestamp: Lattice + Ord,
+{
+    /// This method produces a reduction pair based on the same input arrangement. Each reduction
+    /// in the pair operates with its own logic and the two output arrangements from the reductions
+    /// are produced as a result. The method is useful for reductions that need to present different
+    /// output views on the same input data. An example is producing an error-free reduction output
+    /// along with a separate error output indicating when the error-free output is valid.
+    fn reduce_pair<L1, T1, L2, T2>(
+        &self,
+        name1: &str,
+        name2: &str,
+        logic1: L1,
+        logic2: L2,
+    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
+    where
+        T1: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
+        T1::Val: Data,
+        T1::R: Abelian,
+        T1::Batch: Batch<K, T1::Val, G::Timestamp, T1::R>,
+        L1: FnMut(&K, &[(&V, R)], &mut Vec<(T1::Val, T1::R)>) + 'static,
+        T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
+        T2::Val: Data,
+        T2::R: Abelian,
+        T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
+        L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static;
+}
+
+impl<G: Scope, K: Data, V: Data, Tr, R: Semigroup> ReduceExt<G, K, V, R> for Arranged<G, Tr>
+where
+    G::Timestamp: Lattice + Ord,
+    Tr: TraceReader<Key = K, Val = V, Time = G::Timestamp, R = R> + Clone + 'static,
+{
+    fn reduce_pair<L1, T1, L2, T2>(
+        &self,
+        name1: &str,
+        name2: &str,
+        logic1: L1,
+        logic2: L2,
+    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
+    where
+        T1: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
+        T1::Val: Data,
+        T1::R: Abelian,
+        T1::Batch: Batch<K, T1::Val, G::Timestamp, T1::R>,
+        L1: FnMut(&K, &[(&V, R)], &mut Vec<(T1::Val, T1::R)>) + 'static,
+        T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
+        T2::Val: Data,
+        T2::R: Abelian,
+        T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
+        L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static,
+    {
+        let arranged1 = self.reduce_abelian::<_, T1>(name1, logic1);
+        let arranged2 = self.reduce_abelian::<_, T2>(name2, logic2);
+        (arranged1, arranged2)
+    }
+}