feat: flow util func

This commit is contained in:
Discord9
2023-08-02 17:02:21 +08:00
parent 54f2f6495f
commit 045c8079e6
3 changed files with 288 additions and 0 deletions

5
src/flow/src/util/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
//! Utilities that extend differential dataflow, e.g. to deal with errors.
mod operator;
mod reduce;
pub use operator::CollectionExt;
pub use reduce::ReduceExt;

View File

@@ -0,0 +1,215 @@
use differential_dataflow::difference::{Multiply, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::{Batch, Trace, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandle, OperatorInfo, OutputHandle};
use timely::dataflow::operators::Capability;
use timely::dataflow::{Scope, Stream};
use timely::{Data, ExchangeData};
/// Extension methods for timely [`Stream`]s whose operator logic may fail.
///
/// Every method returns a pair of streams: the first carries the successful
/// results and the second carries the errors.
pub trait StreamExt<G, D1>
where
D1: Data,
G: Scope,
{
/// Like `timely::dataflow::operators::generic::operator::Operator::unary`,
/// but the logic function can handle failures.
///
/// Creates a new dataflow operator that partitions its input stream by a
/// parallelization strategy `pact` and repeatedly invokes `logic`, the
/// function returned by the function passed as `constructor`. The `logic`
/// function can read from the input stream and write to either of two output
/// streams, where the first output stream represents successful
/// computations and the second output stream represents failed
/// computations.
///
/// # Parameters
///
/// * `pact` - the parallelization contract applied to the input stream.
/// * `name` - the name handed to the operator builder for the new operator.
/// * `constructor` - called once with a [`Capability`] and the operator's
///   [`OperatorInfo`]; it returns the boxed `logic` closure, which receives
///   the input handle, the "ok" output handle and the "err" output handle,
///   in that order.
///
/// # Returns
///
/// `(ok_stream, err_stream)`.
fn unary_fallible<D2, E, B, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> (Stream<G, D2>, Stream<G, E>)
where
D2: Data,
E: Data,
B: FnOnce(
Capability<G::Timestamp>,
OperatorInfo,
) -> Box<
dyn FnMut(
&mut InputHandle<G::Timestamp, D1, P::Puller>,
&mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
&mut OutputHandle<G::Timestamp, E, Tee<G::Timestamp, E>>,
) + 'static,
>,
P: ParallelizationContract<G::Timestamp, D1>;
/// Like [`timely::dataflow::operators::map::Map::flat_map`], but `logic`
/// is allowed to fail. The first returned stream will contain the
/// successful applications of `logic`, while the second returned stream
/// will contain the failed applications.
///
/// `logic` is applied to each input record and returns an iterable of
/// `Result`s: every `Ok` item goes to the first stream and every `Err`
/// item goes to the second. `name` is used as the operator name.
fn flat_map_fallible<D2, E, I, L>(&self, name: &str, logic: L) -> (Stream<G, D2>, Stream<G, E>)
where
D2: Data,
E: Data,
I: IntoIterator<Item = Result<D2, E>>,
L: FnMut(D1) -> I + 'static;
}
/// Fallible-operator extensions for differential [`Collection`]s.
///
/// The fallible methods split their results into two collections: one for
/// the records `logic` produced successfully and one for the errors.
pub trait CollectionExt<G, D1, R>
where
    G: Scope,
    R: Semigroup,
{
    /// Builds a collection in `scope` that holds no records.
    fn empty(scope: &G) -> Collection<G, D1, R>;

    /// Fallible counterpart of [`Collection::map`].
    ///
    /// `logic` is applied to each record. Records for which it returns `Ok`
    /// end up in the first returned collection; records for which it returns
    /// `Err` end up in the second. `name` is forwarded to
    /// [`CollectionExt::flat_map_fallible`].
    fn map_fallible<D2, E, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
    where
        D2: Data,
        E: Data,
        L: FnMut(D1) -> Result<D2, E> + 'static,
    {
        // A map is a flat-map whose iterator yields exactly one item per record.
        let mut apply = logic;
        self.flat_map_fallible(name, move |datum| std::iter::once(apply(datum)))
    }

    /// Fallible counterpart of [`Collection::flat_map`].
    ///
    /// `logic` turns each record into an iterable of `Result`s. The `Ok`
    /// items form the first returned collection and the `Err` items form the
    /// second.
    fn flat_map_fallible<D2, E, I, L>(
        &self,
        name: &str,
        logic: L,
    ) -> (Collection<G, D2, R>, Collection<G, E, R>)
    where
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static;
}
impl<G, D1> StreamExt<G, D1> for Stream<G, D1>
where
    D1: Data,
    G: Scope,
{
    /// Builds a one-input, two-output operator named `name` on this stream's
    /// scope and drives it with the closure returned by `constructor`.
    ///
    /// The first output is the "ok" stream and the second is the "err"
    /// stream; they are returned in that order.
    ///
    /// # Panics
    ///
    /// Panics if the builder hands over an empty capability list.
    fn unary_fallible<D2, E, B, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> (Stream<G, D2>, Stream<G, E>)
    where
        D2: Data,
        E: Data,
        B: FnOnce(
            Capability<G::Timestamp>,
            OperatorInfo,
        ) -> Box<
            dyn FnMut(
                    &mut InputHandle<G::Timestamp, D1, P::Puller>,
                    &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
                    &mut OutputHandle<G::Timestamp, E, Tee<G::Timestamp, E>>,
                ) + 'static,
        >,
        P: ParallelizationContract<G::Timestamp, D1>,
    {
        let mut op_builder = OperatorBuilderRc::new(name.to_owned(), self.scope());
        op_builder.set_notify(false);
        let info = op_builder.operator_info();
        let mut input_handle = op_builder.new_input(self, pact);
        // Registration order matters: successes first, failures second.
        let (mut ok_wrapper, ok_stream) = op_builder.new_output();
        let (mut err_wrapper, err_stream) = op_builder.new_output();
        op_builder.build(move |mut initial_caps| {
            // Only the last capability in the list is kept and passed on.
            // NOTE(review): this was written assuming a single-element list,
            // yet two outputs are registered above; confirm against the
            // timely builder docs how many capabilities are supplied here.
            let capability = initial_caps.pop().unwrap();
            let mut logic = constructor(capability, info);
            move |_frontiers| {
                let mut ok_handle = ok_wrapper.activate();
                let mut err_handle = err_wrapper.activate();
                logic(&mut input_handle, &mut ok_handle, &mut err_handle);
            }
        });
        (ok_stream, err_stream)
    }

    /// Applies `logic` to every input record using the `Pipeline` contract,
    /// sending each `Ok` item to the first returned stream and each `Err`
    /// item to the second. Output is emitted at the time of the input batch
    /// that produced it.
    fn flat_map_fallible<D2, E, I, L>(
        &self,
        name: &str,
        mut logic: L,
    ) -> (Stream<G, D2>, Stream<G, E>)
    where
        D2: Data,
        E: Data,
        I: IntoIterator<Item = Result<D2, E>>,
        L: FnMut(D1) -> I + 'static,
    {
        // Scratch vector reused across batches; each batch is swapped into it.
        let mut buffer = Vec::new();
        self.unary_fallible(Pipeline, name, move |_, _| {
            Box::new(move |input, ok_output, err_output| {
                input.for_each(|time, data| {
                    let mut oks = ok_output.session(&time);
                    let mut errs = err_output.session(&time);
                    data.swap(&mut buffer);
                    for datum in buffer.drain(..) {
                        for outcome in logic(datum) {
                            match outcome {
                                Ok(value) => oks.give(value),
                                Err(error) => errs.give(error),
                            }
                        }
                    }
                })
            })
        })
    }
}
impl<G, D1, R> CollectionExt<G, D1, R> for Collection<G, D1, R>
where
G: Scope,
G::Timestamp: Data,
D1: Data,
R: Semigroup,
{
fn empty(scope: &G) -> Collection<G, D1, R> {
operator::empty(scope).as_collection()
}
fn flat_map_fallible<D2, E, I, L>(
&self,
name: &str,
mut logic: L,
) -> (Collection<G, D2, R>, Collection<G, E, R>)
where
D2: Data,
E: Data,
I: IntoIterator<Item = Result<D2, E>>,
L: FnMut(D1) -> I + 'static,
{
let (ok_stream, err_stream) = self.inner.flat_map_fallible(name, move |(d1, t, r)| {
logic(d1).into_iter().map(move |res| match res {
Ok(d2) => Ok((d2, t.clone(), r.clone())),
Err(e) => Err((e, t.clone(), r.clone())),
})
});
(ok_stream.as_collection(), err_stream.as_collection())
}
}

View File

@@ -0,0 +1,68 @@
use differential_dataflow::difference::{Abelian, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::{Batch, Trace, TraceReader};
use differential_dataflow::Data;
use timely::dataflow::Scope;
/// Extension trait for `ReduceCore`, currently providing a reduction based
/// on an operator-pair approach.
pub trait ReduceExt<G: Scope, K: Data, V: Data, R: Semigroup>
where
G::Timestamp: Lattice + Ord,
{
/// This method produces a reduction pair based on the same input arrangement. Each reduction
/// in the pair operates with its own logic and the two output arrangements from the reductions
/// are produced as a result. The method is useful for reductions that need to present different
/// output views on the same input data. An example is producing an error-free reduction output
/// along with a separate error output indicating when the error-free output is valid.
///
/// # Parameters
///
/// * `name1`, `logic1` - name and reduction logic for the first output, whose trace type is `T1`.
/// * `name2`, `logic2` - name and reduction logic for the second output, whose trace type is `T2`.
///
/// Each logic closure receives a key, a slice of `(&value, diff)` pairs for that key, and an
/// output vector to fill with `(value, diff)` pairs of the corresponding trace type.
///
/// # Returns
///
/// `(first, second)`: the arrangement built from `name1`/`logic1` followed by the one built
/// from `name2`/`logic2`.
fn reduce_pair<L1, T1, L2, T2>(
&self,
name1: &str,
name2: &str,
logic1: L1,
logic2: L2,
) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
where
T1: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
T1::Val: Data,
T1::R: Abelian,
T1::Batch: Batch,
L1: FnMut(&K, &[(&V, R)], &mut Vec<(T1::Val, T1::R)>) + 'static,
T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
T2::Val: Data,
T2::R: Abelian,
T2::Batch: Batch,
L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static;
}
impl<G: Scope, K: Data, V: Data, Tr, R: Semigroup> ReduceExt<G, K, V, R> for Arranged<G, Tr>
where
    G::Timestamp: Lattice + Ord,
    Tr: TraceReader<Key = K, Val = V, Time = G::Timestamp, R = R> + Clone + 'static,
{
    /// Issues two `reduce_abelian` calls against this same arrangement, the
    /// first with `name1`/`logic1` and the second with `name2`/`logic2`, and
    /// returns both resulting arrangements in that order.
    fn reduce_pair<L1, T1, L2, T2>(
        &self,
        name1: &str,
        name2: &str,
        logic1: L1,
        logic2: L2,
    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
    where
        T1: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
        T1::Val: Data,
        T1::R: Abelian,
        T1::Batch: Batch,
        L1: FnMut(&K, &[(&V, R)], &mut Vec<(T1::Val, T1::R)>) + 'static,
        T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
        T2::Val: Data,
        T2::R: Abelian,
        T2::Batch: Batch,
        L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static,
    {
        // Tuple fields are evaluated left to right, so the first reduction is
        // still constructed before the second.
        (
            self.reduce_abelian::<L1, T1>(name1, logic1),
            self.reduce_abelian::<L2, T2>(name2, logic2),
        )
    }
}