feat: build_accumulable

This commit is contained in:
Discord9
2023-08-04 16:33:28 +08:00
parent 045c8079e6
commit 2cf7d6d569
12 changed files with 995 additions and 39 deletions

4
Cargo.lock generated
View File

@@ -2860,7 +2860,7 @@ checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]]
name = "differential-dataflow"
version = "0.12.0"
source = "git+https://github.com/TimelyDataflow/differential-dataflow?rev=99fa67db#99fa67db2b92d2ee938c6ffef0912908de3ef288"
source = "git+https://github.com/TimelyDataflow/differential-dataflow#2b9ac68aab9a1bf3fc3e4c12fcabea9c9d1ecc6a"
dependencies = [
"abomonation",
"abomonation_derive",
@@ -3292,6 +3292,7 @@ dependencies = [
name = "flow"
version = "0.1.0"
dependencies = [
"common-telemetry",
"datafusion-expr",
"datafusion-substrait",
"datatypes",
@@ -10261,6 +10262,7 @@ source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2
dependencies = [
"abomonation",
"abomonation_derive",
"bincode",
"crossbeam-channel",
"getopts",
"serde",

View File

@@ -13,9 +13,13 @@ edition = "2021"
# timely = "0.12.0"
# differential-dataflow = "0.12.0"
# TODO(discord9): fork later for fixed version git dependency
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow", rev ="99fa67db"}
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = [
"bincode",
] }
differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } #, rev = "99fa67db" }
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
serde = { version = "1.0", features = ["derive"] }
datatypes = { path = "../datatypes" }
common-telemetry = { path = "../common/telemetry" }

View File

@@ -281,6 +281,10 @@ where
type ResultCollection<S, V> = (Collection<S, V, Diff>, Collection<S, DataflowError, Diff>);
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<S, V, T = repr::Timestamp>
where
@@ -323,6 +327,18 @@ where
}
}
/// Inserts arrangements by the columns on which they are keyed.
pub fn from_columns<I: IntoIterator<Item = usize>>(
columns: I,
arrangements: ArrangementFlavor<S, V, T>,
) -> Self {
let mut keys = Vec::new();
for column in columns {
keys.push(ScalarExpr::Column(column));
}
Self::from_expressions(keys, arrangements)
}
/// The scope containing the collection bundle.
pub fn scope(&self) -> S {
if let Some((oks, _errs)) = &self.collection {
@@ -630,6 +646,13 @@ where
batch: C::Storage,
}
/// Handle specialized to `Vec`-based container.
type PendingOutputHandle<'a, C, I> = OutputHandle<
'a,
<C as Cursor>::Time,
<I as IntoIterator>::Item,
timely::dataflow::channels::pushers::Tee<<C as Cursor>::Time, <I as IntoIterator>::Item>,
>;
impl<C: Cursor> PendingWork<C>
where
C::Key: PartialEq,
@@ -649,12 +672,7 @@ where
key: &Option<C::Key>,
logic: &mut L,
fuel: &mut usize,
output: &mut OutputHandle<
'_,
C::Time,
I::Item,
timely::dataflow::channels::pushers::Tee<C::Time, I::Item>,
>,
output: &mut PendingOutputHandle<'_, C, I>,
) where
I: IntoIterator,
I::Item: Data,

View File

@@ -4,7 +4,7 @@ mod reduce;
use std::collections::BTreeMap;
use join::JoinPlan;
pub(crate) use reduce::{convert_indexes_to_skips, KeyValPlan, ReducePlan};
pub(crate) use reduce::{convert_indexes_to_skips, AccumulablePlan, KeyValPlan, ReducePlan};
use serde::{Deserialize, Serialize};
use crate::expr::{Id, LocalId, MapFilterProject, ScalarExpr, TableFunc};

View File

@@ -111,8 +111,63 @@ pub struct AccumulablePlan {
// TODO(discord9): others
/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum HierarchicalPlan {}
pub enum HierarchicalPlan {
/// Plan hierarchical aggregations under monotonic inputs.
Monotonic(MonotonicPlan),
/// Plan for hierarchical aggregations under non-monotonic inputs.
Bucketed(BucketedPlan),
}
/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct MonotonicPlan {
/// All of the aggregations we were asked to compute.
pub aggr_funcs: Vec<AggregateFunc>,
/// Set of "skips" or calls to `nth()` an iterator needs to do over
/// the input to extract the relevant datums.
pub skips: Vec<usize>,
/// True if the input is logically but not physically monotonic,
/// and the operator must first consolidate the inputs to remove
/// potential negations.
pub must_consolidate: bool,
}
/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct BucketedPlan {
/// All of the aggregations we were asked to compute.
pub aggr_funcs: Vec<AggregateFunc>,
/// Set of "skips" or calls to `nth()` an iterator needs to do over
/// the input to extract the relevant datums.
pub skips: Vec<usize>,
/// The number of buckets in each layer of the reduction tree. Should
/// be decreasing, and ideally, a power of two so that we can easily
/// distribute values to buckets with `value.hashed() % buckets[layer]`.
pub buckets: Vec<u64>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum BasicPlan {}

View File

@@ -1,18 +1,26 @@
use std::collections::BTreeMap;
use common_telemetry::logging;
use datatypes::value::Value;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::difference::{Multiply, Semigroup};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arrange, Arranged};
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::Collection;
use timely::dataflow::Scope;
use serde::{Deserialize, Serialize};
use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::Refines;
use timely::progress::Timestamp;
use crate::compute::context::CollectionBundle;
use crate::compute::plan::{convert_indexes_to_skips, KeyValPlan, ReducePlan};
use crate::compute::context::{Arrangement, ArrangementFlavor, CollectionBundle};
use crate::compute::plan::{convert_indexes_to_skips, AccumulablePlan, KeyValPlan, ReducePlan};
use crate::compute::typedefs::{ErrValSpine, RowKeySpine, RowSpine};
use crate::compute::Context;
use crate::expr::ScalarExpr;
use crate::expr::{AggregateFunc, ScalarExpr};
use crate::repr::{Diff, Row};
use crate::storage::errors::DataflowError;
use crate::storage::errors::{DataflowError, EvalError};
use crate::util::{CollectionExt, ReduceExt};
impl<G, T> Context<G, Row, T>
where
@@ -22,6 +30,7 @@ where
{
/// Renders a `Plan::Reduce` using various non-obvious techniques to
/// minimize worst-case incremental update times and memory footprint.
#[allow(clippy::type_complexity)]
pub fn render_reduce(
&mut self,
input: CollectionBundle<G, Row, T>,
@@ -71,11 +80,7 @@ where
// Evaluate the key expressions.
let key = match key_plan.evaluate_into(&mut datums_local, &mut row_mfp) {
Err(e) => {
return Some((
Err(DataflowError::from(e)),
time.clone(),
diff.clone(),
))
return Some((Err(DataflowError::from(e)), time.clone(), *diff))
}
Ok(key) => key.expect("Row expected as no predicate was used"),
};
@@ -84,34 +89,29 @@ where
datums_local.truncate(skips.len());
let val = match val_plan.evaluate_iter(&mut datums_local) {
Err(e) => {
return Some((
Err(DataflowError::from(e)),
time.clone(),
diff.clone(),
))
return Some((Err(DataflowError::from(e)), time.clone(), *diff))
}
Ok(val) => val.expect("Row expected as no predicate was used"),
};
row_buf.clear();
row_buf.extend(val);
let row = row_buf.clone();
Some((Ok((key, row)), time.clone(), diff.clone()))
Some((Ok((key, row)), time.clone(), *diff))
}
});
/*
// TODO(discord9): deal with errors by adding utility methods to split dataflow
// TODO(discord9): find out how to do `consolidate_stream` without Abonomation
// Demux out the potential errors from key and value selector evaluation.
let (ok) = key_val_input
.as_collection();
let (ok, mut err) = key_val_input
.as_collection()
// .consolidate_stream()
// .flat_map_fallible("OkErrDemux", Some);
let err = err_input;
// err = err.concat(&err_input);
.flat_map_fallible("OkErrDemux", Some);
err = err.concat(&err_input);
// Render the reduce plan
self.render_reduce_plan(reduce_plan, ok, err, key_arity)
.leave_region()*/
todo!()
.leave_region()
})
}
@@ -130,6 +130,666 @@ where
where
S: Scope<Timestamp = G::Timestamp>,
{
let mut errors = Vec::default();
let arrangement = self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity);
CollectionBundle::from_columns(
0..key_arity,
ArrangementFlavor::Local(
arrangement,
err_input
.concatenate(errors)
.arrange_named("Arrange bundle err"),
),
)
}
fn render_reduce_plan_inner<S>(
&self,
plan: ReducePlan,
collection: Collection<S, (Row, Row), Diff>,
errors: &mut Vec<Collection<S, DataflowError, Diff>>,
key_arity: usize,
) -> Arrangement<S, Row>
where
S: Scope<Timestamp = G::Timestamp>,
{
let arrangement: Arrangement<S, Row> = match plan {
// If we have no aggregations or just a single type of reduction, we
// can go ahead and render them directly.
ReducePlan::Distinct => {
let (arranged_output, errs) = self.build_distinct(collection);
errors.push(errs);
arranged_output
}
ReducePlan::Accumulable(expr) => {
let (arranged_output, errs) = self.build_accumulable(collection, expr);
errors.push(errs);
arranged_output
}
// TODO(discord9): impl Distinct&Accumulate first
_ => todo!(),
};
todo!()
}
/// Build the dataflow to compute the set of distinct keys.
fn build_distinct<S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
) -> (Arrangement<S, Row>, Collection<S, DataflowError, Diff>)
where
S: Scope<Timestamp = G::Timestamp>,
{
let (output, errors) = collection
.arrange_named::<RowSpine<_, _, _, _>>("Arranged DistinctBy")
.reduce_pair::<_, RowSpine<_, _, _, _>, _, ErrValSpine<_, _, _>>(
"DistinctBy",
"DistinctByErrorCheck",
|_key: &Row, _input: &[(&Row, i64)], output: &mut Vec<(Row, i64)>| {
// We're pushing an empty row here because the key is implicitly added by the
// arrangement, and the permutation logic takes care of using the key part of the
// output.
output.push((Row::default(), 1));
},
move |key, input: &[(_, Diff)], output| {
for (_, count) in input.iter() {
if count.is_positive() {
continue;
}
let message = "Non-positive multiplicity in DistinctBy";
output.push((EvalError::Internal(message.to_string()).into(), 1));
return;
}
},
);
(
output,
errors.as_collection(|_k: &Row, v: &DataflowError| v.clone()),
)
}
/// Build the dataflow to compute and arrange multiple accumulable aggregations.
///
/// The incoming values are moved to the update's "difference" field, at which point
/// they can be accumulated in place. The `count` operator promotes the accumulated
/// values to data, at which point a final map applies operator-specific logic to
/// yield the final aggregate.
fn build_accumulable<S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
AccumulablePlan {
full_aggrs,
simple_aggrs,
distinct_aggrs,
}: AccumulablePlan,
) -> (Arrangement<S, Row>, Collection<S, DataflowError, Diff>)
where
S: Scope<Timestamp = G::Timestamp>,
{
// we must have called this function with something to reduce
if full_aggrs.is_empty() || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() {
panic!(
"Incorrect numbers of aggregates in accummulable reduction rendering: {}",
&format!(
"full_aggrs={}, simple_aggrs={}, distinct_aggrs={}",
full_aggrs.len(),
simple_aggrs.len(),
distinct_aggrs.len(),
),
);
}
// Some of the aggregations may have the `distinct` bit set, which means that they'll
// need to be extracted from `collection` and be subjected to `distinct` with `key`.
// Other aggregations can be directly moved in to the `diff` field.
//
// In each case, the resulting collection should have `data` shaped as `(key, ())`
// and a `diff` that is a vector with length `3 * aggrs.len()`. The three values are
// generally the count, and then two aggregation-specific values. The size could be
// reduced if we want to specialize for the aggregations.
let float_scale = f64::from(1 << 24);
// Instantiate a default vector for diffs with the correct types at each
// position.
let zero_diffs: (Vec<_>, Diff) = (
full_aggrs
.iter()
.map(|f| match f.func {
AggregateFunc::Any | AggregateFunc::All => Accum::Bool {
trues: 0,
falses: 0,
},
AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float {
accum: 0,
pos_infs: 0,
neg_infs: 0,
nans: 0,
non_nulls: 0,
},
_ => Accum::SimpleNumber {
accum: 0,
non_nulls: 0,
},
})
.collect(),
0,
);
let mut to_aggregate = Vec::new();
if !simple_aggrs.is_empty() {
// First, collect all non-distinct aggregations in one pass.
let easy_cases = collection.explode_one({
let zero_diffs = zero_diffs.clone();
move |(key, row)| {
let mut diffs = zero_diffs.clone();
for (accumulable_index, datum_index, aggr) in simple_aggrs.iter() {
// Try to unpack only the datums we need
let datum = row.get(*datum_index).unwrap().clone();
diffs.0[*accumulable_index] =
Accum::value_to_accumulator(datum.clone(), &aggr.func);
diffs.1 = 1;
}
((key, ()), diffs)
}
});
to_aggregate.push(easy_cases);
}
// Next, collect all aggregations that require distinctness.
for (accumulable_index, datum_index, aggr) in distinct_aggrs.into_iter() {
let mut row_buf = Row::default();
let collection = {
let arranged: Arranged<S, _> = collection
.map(move |(key, row)| {
let value = row.get(datum_index).unwrap();
row_buf.packer().push(value.clone());
(key, row_buf.clone())
})
.map(|k| (k, ()))
.arrange_named::<RowKeySpine<(Row, Row), <G as ScopeParent>::Timestamp, Diff>>(
"Arranged Accumulable",
);
// note `arranged` for convenient of type-infer with r-a
// first distinct, then reduce
arranged
.reduce_abelian::<_, RowKeySpine<_, _, _>>(
"Reduced Accumulable",
move |_k, _s, t: &mut Vec<((), i64)>| t.push(((), 1)),
)
.as_collection(|k, _| k.clone())
.explode_one({
let zero_diffs = zero_diffs.clone();
move |(key, row)| {
let datum = row.iter().next().unwrap();
let mut diffs = zero_diffs.clone();
diffs.0[accumulable_index] =
Accum::value_to_accumulator(datum.clone(), &aggr.func);
diffs.1 = 1;
((key, ()), diffs)
}
})
};
to_aggregate.push(collection);
}
// now concatenate, if necessary, multiple aggregations
let collection = if to_aggregate.len() == 1 {
to_aggregate.remove(0)
} else {
differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate)
};
// reduce is done, convert accumulators to values
let err_full_aggrs = full_aggrs.clone();
let (arranged_output, arranged_errs) = collection
.arrange_named::<RowKeySpine<_, _, (Vec<Accum>, Diff)>>("ArrangeAccumulable")
.reduce_pair::<_, RowSpine<_, _, _, _>, _, ErrValSpine<_, _, _>>(
"ReduceAccumulable",
"AccumulableErrorCheck",
{
let mut row_buf = Row::default();
move |_key: &Row,
input: &[(&(), (Vec<Accum>, Diff))],
output: &mut Vec<(Row, i64)>| {
let (ref accums, total) = input[0].1;
let mut row_packer = row_buf.packer();
for (aggr, accum) in full_aggrs.iter().zip(accums) {
// The finished value depends on the aggregation function in a variety of ways.
// For all aggregates but count, if only null values were
// accumulated, then the output is null.
let value = if total > 0
&& accum.is_zero()
&& aggr.func != AggregateFunc::Count
{
Value::Null
} else {
accum.accum_to_value(&aggr.func, total)
};
row_packer.push(value);
}
output.push((row_buf.clone(), 1));
}
},
move |key, input, output| {
let (ref accums, total) = input[0].1;
for (aggr, accum) in err_full_aggrs.iter().zip(accums) {
// We first test here if inputs without net-positive records are present,
// producing an error to the logs and to the query output if that is the case.
if total == 0 && !accum.is_zero() {
logging::error!(
"Net-zero records with non-zero accumulation in ReduceAccumulable: aggr={:?}, accum={:?}", aggr, &accum
);
let message = format!(
"Invalid data in source, saw net-zero records for key {key:?} \
with non-zero accumulation in accumulable aggregate"
);
output.push((EvalError::Internal(message).into(), 1));
}
match (&aggr.func, &accum) {
(AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
| (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. })
| (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
if accum.is_negative() {
logging::error!(
"Invalid negative unsigned aggregation in ReduceAccumulable aggr={aggr:?}, accum={accum:?}",
);
let message = format!(
"Invalid data in source, saw negative accumulation with \
unsigned type for key {key:?}"
);
output.push((EvalError::Internal(message).into(), 1));
}
}
_ => (), // no more errors to check for at this point!
}
}
},
);
(
arranged_output,
arranged_errs.as_collection(|_key, error| error.clone()),
)
}
}
/// Accumulates values for the various types of accumulable aggregations.
///
/// We assume that there are not more than 2^32 elements for the aggregation.
/// Thus we can perform a summation over i32 in an i64 accumulator
/// and not worry about exceeding its bounds.
///
/// The float accumulator performs accumulation in fixed point arithmetic. The fixed
/// point representation has less precision than a double. It is entirely possible
/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
/// to preserve group guarantees.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum Accum {
/// Accumulates boolean values.
Bool {
/// The number of `true` values observed.
trues: Diff,
/// The number of `false` values observed.
falses: Diff,
},
/// Accumulates simple numeric values.
SimpleNumber {
/// The accumulation of all non-NULL values observed.
accum: i128,
/// The number of non-NULL values observed.
non_nulls: Diff,
},
/// Accumulates float values.
Float {
/// Accumulates non-special float values, mapped to a fixed precision i128 domain to
/// preserve associativity and commutativity
accum: i128,
/// Counts +inf
pos_infs: Diff,
/// Counts -inf
neg_infs: Diff,
/// Counts NaNs
nans: Diff,
/// Counts non-NULL values
non_nulls: Diff,
},
}
impl Accum {
/// For storing floating number in fixed point representation, we need to scale
const FLOAT_SCALE: f64 = 16777216.0;
/// Initialize a accumulator from a datum.
fn value_to_accumulator(datum: Value, aggr: &AggregateFunc) -> Self {
match aggr {
AggregateFunc::Count => Accum::SimpleNumber {
accum: 0, // unused for AggregateFunc::Count
non_nulls: if datum.is_null() { 0 } else { 1 },
},
AggregateFunc::Any | AggregateFunc::All => match datum {
Value::Boolean(true) => Accum::Bool {
trues: 1,
falses: 0,
},
Value::Null => Accum::Bool {
trues: 0,
falses: 0,
},
Value::Boolean(false) => Accum::Bool {
trues: 0,
falses: 1,
},
x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"),
},
AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
let n = match datum {
Value::Float32(n) => f64::from(*n),
Value::Float64(n) => *n,
Value::Null => 0f64,
x => panic!("Invalid argument to AggregateFunc::{aggr:?}: {x:?}"),
};
let nans = Diff::from(n.is_nan());
let pos_infs = Diff::from(n == f64::INFINITY);
let neg_infs = Diff::from(n == f64::NEG_INFINITY);
let non_nulls = Diff::from(datum != Value::Null);
// Map the floating point value onto a fixed precision domain
// All special values should map to zero, since they are tracked separately
let accum = if nans > 0 || pos_infs > 0 || neg_infs > 0 {
0
} else {
// This operation will truncate to i128::MAX if out of range.
// TODO(benesch): rewrite to avoid `as`.
#[allow(clippy::as_conversions)]
{
(n * Self::FLOAT_SCALE) as i128
}
};
Accum::Float {
accum,
pos_infs,
neg_infs,
nans,
non_nulls,
}
}
_ => {
// Other accumulations need to disentangle the accumulable
// value from its NULL-ness, which is not quite as easily
// accumulated.
match datum {
Value::Int16(i) => Accum::SimpleNumber {
accum: i128::from(i),
non_nulls: 1,
},
Value::Int32(i) => Accum::SimpleNumber {
accum: i128::from(i),
non_nulls: 1,
},
Value::Int64(i) => Accum::SimpleNumber {
accum: i128::from(i),
non_nulls: 1,
},
Value::UInt16(u) => Accum::SimpleNumber {
accum: i128::from(u),
non_nulls: 1,
},
Value::UInt32(u) => Accum::SimpleNumber {
accum: i128::from(u),
non_nulls: 1,
},
Value::UInt64(u) => Accum::SimpleNumber {
accum: i128::from(u),
non_nulls: 1,
},
Value::Timestamp(t) => Accum::SimpleNumber {
accum: i128::from(t.value()),
non_nulls: 1,
},
Value::Null => Accum::SimpleNumber {
accum: 0,
non_nulls: 0,
},
x => panic!("Accumulating non-integer or unsupported data: {x:?}"),
}
}
}
}
fn accum_to_value(&self, func: &AggregateFunc, total: i64) -> Value {
match (func, &self) {
(AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => {
Value::Int64(*non_nulls)
}
(AggregateFunc::All, Accum::Bool { falses, trues }) => {
// If any false, else if all true, else must be no false and some nulls.
if *falses > 0 {
Value::Boolean(false)
} else if *trues == total {
Value::Boolean(true)
} else {
Value::Null
}
}
(AggregateFunc::Any, Accum::Bool { falses, trues }) => {
// If any true, else if all false, else must be no true and some nulls.
if *trues > 0 {
Value::Boolean(true)
} else if *falses == total {
Value::Boolean(false)
} else {
Value::Null
}
}
(AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. })
| (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => {
// This conversion is safe, as long as we have less than 2^32
// summands.
// TODO(benesch): are we guaranteed to have less than 2^32 summands?
// If so, rewrite to avoid `as`.
#[allow(clippy::as_conversions)]
Value::Int64(*accum as i64)
}
(AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => {
Value::from(*accum as i64)
}
(AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. })
| (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => {
if !accum.is_negative() {
// Our semantics of overflow are not clearly articulated.
// We adopt an unsigned
// wrapping behavior to match what we do above for signed types.
// TODO: remove potentially dangerous usage of `as`.
#[allow(clippy::as_conversions)]
Value::UInt64(*accum as u64)
} else {
// Note that we return a value here, but an error in the other
// operator of the reduce_pair. Therefore, we expect that this
// value will never be exposed as an output.
Value::Null
}
}
(AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => {
if !accum.is_negative() {
Value::UInt64(*accum as u64)
} else {
// Note that we return a value here, but an error in the other
// operator of the reduce_pair. Therefore, we expect that this
// value will never be exposed as an output.
Value::Null
}
}
(
AggregateFunc::SumFloat32,
Accum::Float {
accum,
pos_infs,
neg_infs,
nans,
non_nulls: _,
},
) => {
if *nans > 0 || (*pos_infs > 0 && *neg_infs > 0) {
// NaNs are NaNs and cases where we've seen a
// mixture of positive and negative infinities.
Value::from(f32::NAN)
} else if *pos_infs > 0 {
Value::from(f32::INFINITY)
} else if *neg_infs > 0 {
Value::from(f32::NEG_INFINITY)
} else {
// TODO: remove potentially dangerous usage of `as`.
#[allow(clippy::as_conversions)]
{
Value::from(((*accum as f64) / Self::FLOAT_SCALE) as f32)
}
}
}
(
AggregateFunc::SumFloat64,
Accum::Float {
accum,
pos_infs,
neg_infs,
nans,
non_nulls: _,
},
) => {
if *nans > 0 || (*pos_infs > 0 && *neg_infs > 0) {
// NaNs are NaNs and cases where we've seen a
// mixture of positive and negative infinities.
Value::from(f64::NAN)
} else if *pos_infs > 0 {
Value::from(f64::INFINITY)
} else if *neg_infs > 0 {
Value::from(f64::NEG_INFINITY)
} else {
// TODO(benesch): remove potentially dangerous usage of `as`.
#[allow(clippy::as_conversions)]
{
Value::from((*accum as f64) / Self::FLOAT_SCALE)
}
}
}
_ => panic!(
"Unexpected accumulation (aggr={:?}, accum={:?})",
func, &self
),
}
}
}
impl Semigroup for Accum {
fn is_zero(&self) -> bool {
match self {
Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(),
Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(),
Accum::Float {
accum,
pos_infs,
neg_infs,
nans,
non_nulls,
} => {
accum.is_zero()
&& pos_infs.is_zero()
&& neg_infs.is_zero()
&& nans.is_zero()
&& non_nulls.is_zero()
}
}
}
fn plus_equals(&mut self, other: &Accum) {
match (&mut *self, other) {
(
Accum::Bool { trues, falses },
Accum::Bool {
trues: other_trues,
falses: other_falses,
},
) => {
*trues += other_trues;
*falses += other_falses;
}
(
Accum::SimpleNumber { accum, non_nulls },
Accum::SimpleNumber {
accum: other_accum,
non_nulls: other_non_nulls,
},
) => {
*accum += other_accum;
*non_nulls += other_non_nulls;
}
(
Accum::Float {
accum,
pos_infs,
neg_infs,
nans,
non_nulls,
},
Accum::Float {
accum: other_accum,
pos_infs: other_pos_infs,
neg_infs: other_neg_infs,
nans: other_nans,
non_nulls: other_non_nulls,
},
) => {
*accum = accum.checked_add(*other_accum).unwrap_or_else(|| {
logging::warn!("Float accumulator overflow. Incorrect results possible");
accum.wrapping_add(*other_accum)
});
*pos_infs += other_pos_infs;
*neg_infs += other_neg_infs;
*nans += other_nans;
*non_nulls += other_non_nulls;
}
(l, r) => unreachable!(
"Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}"
),
}
}
}
impl Multiply<Diff> for Accum {
type Output = Accum;
fn multiply(self, factor: &Diff) -> Accum {
let factor = *factor;
match self {
Accum::Bool { trues, falses } => Accum::Bool {
trues: trues * factor,
falses: falses * factor,
},
Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber {
accum: accum * i128::from(factor),
non_nulls: non_nulls * factor,
},
Accum::Float {
accum,
pos_infs,
neg_infs,
nans,
non_nulls,
} => Accum::Float {
accum: accum.checked_mul(i128::from(factor)).unwrap_or_else(|| {
logging::warn!("Float accumulator overflow. Incorrect results possible");
accum.wrapping_mul(i128::from(factor))
}),
pos_infs: pos_infs * factor,
neg_infs: neg_infs * factor,
nans: nans * factor,
non_nulls: non_nulls * factor,
},
}
}
}

View File

@@ -1,7 +1,9 @@
#![allow(unused)]
#![allow(clippy::mutable_key_type)]
mod adapter;
mod compute;
mod expr;
mod repr;
mod storage;
mod util;

View File

@@ -1,8 +1,10 @@
//! basically a wrapper around the `datatype` crate
//! for basic Data Representation
use std::borrow::Borrow;
use std::slice::SliceIndex;
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
/// System-wide Record count difference type.
pub type Diff = i64;
@@ -12,15 +14,22 @@ pub type Diff = i64;
/// TODO(discord9): use a more efficient representation
///i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
pub struct Row {
inner: Vec<Value>,
}
impl Row {
pub fn get(&self, idx: usize) -> Option<&Value> {
self.inner.get(idx)
}
pub fn clear(&mut self) {
self.inner.clear();
}
pub fn packer(&mut self) -> &mut Vec<Value> {
self.inner.clear();
&mut self.inner
}
pub fn pack<I>(iter: I) -> Row
where
I: IntoIterator<Item = Value>,
@@ -41,6 +50,9 @@ impl Row {
pub fn into_iter(self) -> impl Iterator<Item = Value> {
self.inner.into_iter()
}
pub fn iter(&self) -> impl Iterator<Item = &Value> {
self.inner.iter()
}
}
/// System-wide default timestamp type

View File

@@ -1,13 +1,14 @@
use serde::{Deserialize, Serialize};
// TODO(discord9): more error types
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum DataflowError {
EvalError(EvalError),
EvalError(Box<EvalError>),
}
impl From<EvalError> for DataflowError {
fn from(e: EvalError) -> Self {
DataflowError::EvalError(e)
DataflowError::EvalError(Box::new(e))
}
}
@@ -15,4 +16,12 @@ impl From<EvalError> for DataflowError {
pub enum EvalError {
DivisionByZero,
InvalidArgument(String),
Internal(String),
}
#[test]
fn tell_goal() {
use differential_dataflow::ExchangeData;
fn a<T: ExchangeData>(_: T) {}
a(DataflowError::from(EvalError::DivisionByZero));
}

150
src/flow/src/util/buffer.rs Normal file
View File

@@ -0,0 +1,150 @@
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::Data;
use timely::communication::Push;
use timely::dataflow::channels::Bundle;
use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::operators::{Capability, InputCapability};
use timely::progress::Timestamp;
/// A buffer that consolidates updates
///
/// The buffer implements a wrapper around [OutputHandle] consolidating elements pushed to it. It is
/// backed by a capacity-limited buffer, which means that compaction only occurs within the
/// dimensions of the buffer, i.e. the number of unique keys is less than half of the buffer's
/// capacity.
///
/// A cap is retained whenever the current time changes to be able to flush on drop or when the time
/// changes again.
///
/// The buffer is filled with updates until it reaches its capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovered less than
/// half of the buffer's capacity, at which point the buffer will be shipped.
///
/// The buffer retains a capability to send data on flush. It will flush all data once dropped, if
/// time changes, or if the buffer capacity is reached.
pub struct ConsolidateBuffer<'a, 'b, T, D: Data, R: Semigroup, P>
where
P: Push<Bundle<T, (D, T, R)>> + 'a,
T: Data + Timestamp + 'a,
D: 'a,
{
// a buffer for records, to send at self.cap
// Invariant: Buffer only contains data if cap is Some.
buffer: Vec<(D, T, R)>,
output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>,
cap: Option<Capability<T>>,
port: usize,
previous_len: usize,
}
impl<'a, 'b, T, D: Data, R: Semigroup, P> ConsolidateBuffer<'a, 'b, T, D, R, P>
where
T: Data + Timestamp + 'a,
P: Push<Bundle<T, (D, T, R)>> + 'a,
{
/// Create a new [ConsolidateBuffer], wrapping the provided session.
///
/// * `output_handle`: The output to send data to.
/// * 'port': The output port to retain capabilities for.
pub fn new(output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>, port: usize) -> Self {
Self {
output_handle,
port,
cap: None,
buffer: Vec::with_capacity(::timely::container::buffer::default_capacity::<(D, T, R)>()),
previous_len: 0,
}
}
#[inline]
/// Provides an iterator of elements to the buffer
pub fn give_iterator<I: Iterator<Item = (D, T, R)>>(
&mut self,
cap: &InputCapability<T>,
iter: I,
) {
for item in iter {
self.give(cap, item);
}
}
/// Give an element to the buffer
pub fn give(&mut self, cap: &InputCapability<T>, data: (D, T, R)) {
// Retain a cap for the current time, which will be used on flush.
if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
// Flush on capability change
self.flush();
// Retain capability for the specified output port.
self.cap = Some(cap.delayed_for_output(cap.time(), self.port));
}
self.give_internal(data);
}
/// Give an element to the buffer, using a pre-fabricated capability. Note that the capability
/// must be valid for the associated output.
pub fn give_at(&mut self, cap: &Capability<T>, data: (D, T, R)) {
// Retain a cap for the current time, which will be used on flush.
if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
// Flush on capability change
self.flush();
// Retain capability.
self.cap = Some(cap.clone());
}
self.give_internal(data);
}
/// Give an element and possibly flush the buffer. Note that this needs to have access
/// to a capability, which the public functions ensure.
fn give_internal(&mut self, data: (D, T, R)) {
self.buffer.push(data);
// Limit, if possible, the lifetime of the allocations for data
// and consolidate smaller buffers if we're in the lucky case
// of a small domain for D
if self.buffer.len() >= 2 * self.previous_len {
// Consolidate while the consolidation frees at least half the buffer
consolidate_updates(&mut self.buffer);
if self.buffer.len() > self.buffer.capacity() / 2 {
self.flush();
} else {
self.previous_len = self.buffer.len();
}
// At this point, it is an invariant across give calls that self.previous_len
// will be in the interval [0, self.buffer.capacity() / 2]. So, we will enter
// this if-statement block again when self.buffer.len() == self.buffer.capacity()
// or earlier. If consolidation is not effective to keep self.buffer.len()
// below half capacity, then flushing when more than half-full will
// maintain the invariant.
}
}
/// Flush the internal buffer to the underlying session
pub fn flush(&mut self) {
if let Some(cap) = &self.cap {
self.output_handle.session(cap).give_vec(&mut self.buffer);
// Ensure that the capacity is at least equal to the default in case
// it was reduced by give_vec. Note that we cannot rely here on give_vec
// returning us a buffer with zero capacity.
if self.buffer.capacity() < ::timely::container::buffer::default_capacity::<(D, T, R)>()
{
let to_reserve = ::timely::container::buffer::default_capacity::<(D, T, R)>()
- self.buffer.capacity();
self.buffer.reserve_exact(to_reserve);
}
self.previous_len = 0;
}
}
}
impl<'a, 'b, T, D: Data, R: Semigroup, P> Drop for ConsolidateBuffer<'a, 'b, T, D, R, P>
where
P: Push<Bundle<T, (D, T, R)>> + 'a,
T: Data + Timestamp + 'a,
D: 'a,
{
fn drop(&mut self) {
self.flush();
}
}

View File

@@ -1,5 +1,7 @@
//! utilitys including extend differential dataflow to deal with errors and etc.
mod buffer;
mod operator;
mod reduce;
pub use operator::CollectionExt;
pub use reduce::ReduceExt;

View File

@@ -12,6 +12,8 @@ use timely::dataflow::operators::Capability;
use timely::dataflow::{Scope, Stream};
use timely::{Data, ExchangeData};
use crate::util::buffer::ConsolidateBuffer;
pub trait StreamExt<G, D1>
where
D1: Data,
@@ -100,6 +102,18 @@ where
E: Data,
I: IntoIterator<Item = Result<D2, E>>,
L: FnMut(D1) -> I + 'static;
/// Replaces each record with another, with a new difference type.
///
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
/// and move the data into the difference component. This will allow differential dataflow to update in-place.
fn explode_one<D2, R2, L>(&self, logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
where
D2: differential_dataflow::Data,
R2: Semigroup + Multiply<R>,
<R2 as Multiply<R>>::Output: Data + Semigroup,
L: FnMut(D1) -> (D2, R2) + 'static,
G::Timestamp: Lattice;
}
impl<G, D1> StreamExt<G, D1> for Stream<G, D1>
@@ -212,4 +226,32 @@ where
});
(ok_stream.as_collection(), err_stream.as_collection())
}
fn explode_one<D2, R2, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
where
D2: differential_dataflow::Data,
R2: Semigroup + Multiply<R>,
<R2 as Multiply<R>>::Output: Data + Semigroup,
L: FnMut(D1) -> (D2, R2) + 'static,
G::Timestamp: Lattice,
{
self.inner
.unary(Pipeline, "ExplodeOne", move |_, _| {
let mut buffer = Vec::new();
move |input, output| {
let mut out = ConsolidateBuffer::new(output, 0);
input.for_each(|time, data| {
data.swap(&mut buffer);
out.give_iterator(
&time,
buffer.drain(..).map(|(x, t, d)| {
let (x, d2) = logic(x);
(x, t, d2.multiply(&d))
}),
);
});
}
})
.as_collection()
}
}