diff --git a/Cargo.lock b/Cargo.lock index cf69b45ddd..111288d3bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2860,7 +2860,7 @@ checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/TimelyDataflow/differential-dataflow?rev=99fa67db#99fa67db2b92d2ee938c6ffef0912908de3ef288" +source = "git+https://github.com/TimelyDataflow/differential-dataflow#2b9ac68aab9a1bf3fc3e4c12fcabea9c9d1ecc6a" dependencies = [ "abomonation", "abomonation_derive", @@ -3292,6 +3292,7 @@ dependencies = [ name = "flow" version = "0.1.0" dependencies = [ + "common-telemetry", "datafusion-expr", "datafusion-substrait", "datatypes", @@ -10261,6 +10262,7 @@ source = "git+https://github.com/TimelyDataflow/timely-dataflow#b990faba8ea59ec2 dependencies = [ "abomonation", "abomonation_derive", + "bincode", "crossbeam-channel", "getopts", "serde", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 0b09e02450..514228f4e0 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -13,9 +13,13 @@ edition = "2021" # timely = "0.12.0" # differential-dataflow = "0.12.0" # TODO(discord9): fork later for fixed version git dependency -timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false } -differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow", rev ="99fa67db"} +timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = [ + "bincode", +] } +differential-dataflow = { git = "https://github.com/TimelyDataflow/differential-dataflow" } #, rev = "99fa67db" } datafusion-expr.workspace = true datafusion-substrait.workspace = true serde = { version = "1.0", features = ["derive"] } datatypes = { path = "../datatypes" } + +common-telemetry = { path = "../common/telemetry" } diff --git a/src/flow/src/compute/context.rs b/src/flow/src/compute/context.rs index 18def84485..a218339f3f 100644 --- a/src/flow/src/compute/context.rs +++ b/src/flow/src/compute/context.rs @@ -281,6 +281,10 @@ where type ResultCollection = (Collection, Collection); +/// A bundle of the various ways a collection can be represented. +/// +/// This type maintains the invariant that it does contain at least one valid +/// source of data, either a collection or at least one arrangement. #[derive(Clone)] pub struct CollectionBundle where @@ -323,6 +327,18 @@ where } } + /// Inserts arrangements by the columns on which they are keyed. + pub fn from_columns>( + columns: I, + arrangements: ArrangementFlavor, + ) -> Self { + let mut keys = Vec::new(); + for column in columns { + keys.push(ScalarExpr::Column(column)); + } + Self::from_expressions(keys, arrangements) + } + /// The scope containing the collection bundle. pub fn scope(&self) -> S { if let Some((oks, _errs)) = &self.collection { @@ -630,6 +646,13 @@ where batch: C::Storage, } +/// Handle specialized to `Vec`-based container. 
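+///
+/// This alias only collapses the long `OutputHandle<..., Tee<...>>` spelling that
+/// `PendingWork::do_work` previously spelled out inline.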
+type PendingOutputHandle<'a, C, I> = OutputHandle< + 'a, + ::Time, + ::Item, + timely::dataflow::channels::pushers::Tee<::Time, ::Item>, +>; impl PendingWork where C::Key: PartialEq, @@ -649,12 +672,7 @@ where key: &Option, logic: &mut L, fuel: &mut usize, - output: &mut OutputHandle< - '_, - C::Time, - I::Item, - timely::dataflow::channels::pushers::Tee, - >, + output: &mut PendingOutputHandle<'_, C, I>, ) where I: IntoIterator, I::Item: Data, diff --git a/src/flow/src/compute/plan/mod.rs b/src/flow/src/compute/plan/mod.rs index 3c0778e8cd..cf6f914181 100644 --- a/src/flow/src/compute/plan/mod.rs +++ b/src/flow/src/compute/plan/mod.rs @@ -4,7 +4,7 @@ mod reduce; use std::collections::BTreeMap; use join::JoinPlan; -pub(crate) use reduce::{convert_indexes_to_skips, KeyValPlan, ReducePlan}; +pub(crate) use reduce::{convert_indexes_to_skips, AccumulablePlan, KeyValPlan, ReducePlan}; use serde::{Deserialize, Serialize}; use crate::expr::{Id, LocalId, MapFilterProject, ScalarExpr, TableFunc}; diff --git a/src/flow/src/compute/plan/reduce.rs b/src/flow/src/compute/plan/reduce.rs index 3703120e7c..82fbb6d498 100644 --- a/src/flow/src/compute/plan/reduce.rs +++ b/src/flow/src/compute/plan/reduce.rs @@ -111,8 +111,63 @@ pub struct AccumulablePlan { // TODO(discord9): others +/// Plan for computing a set of hierarchical aggregations. +/// +/// In the append-only setting we can render them in-place +/// with monotonic plans, but otherwise, we need to render +/// them with a reduction tree that splits the inputs into +/// small, and then progressively larger, buckets #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub enum HierarchicalPlan {} +pub enum HierarchicalPlan { + /// Plan hierarchical aggregations under monotonic inputs. + Monotonic(MonotonicPlan), + /// Plan for hierarchical aggregations under non-monotonic inputs. + Bucketed(BucketedPlan), +} + +/// Plan for computing a set of hierarchical aggregations with a +/// monotonic input. +/// +/// Here, the aggregations will be rendered in place. We don't +/// need to worry about retractions because the inputs are +/// append only, so we can change our computation to +/// only retain the "best" value in the diff field, instead +/// of holding onto all values. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct MonotonicPlan { + /// All of the aggregations we were asked to compute. + pub aggr_funcs: Vec, + /// Set of "skips" or calls to `nth()` an iterator needs to do over + /// the input to extract the relevant datums. + pub skips: Vec, + /// True if the input is logically but not physically monotonic, + /// and the operator must first consolidate the inputs to remove + /// potential negations. + pub must_consolidate: bool, +} + +/// Plan for computing a set of hierarchical aggregations +/// with non-monotonic inputs. +/// +/// To perform hierarchical aggregations with stable runtimes +/// under updates we'll subdivide the group key into buckets, compute +/// the reduction in each of those subdivided buckets and then combine +/// the results into a coarser bucket (one that represents a larger +/// fraction of the original input) and redo the reduction in another +/// layer. Effectively, we'll construct a min / max heap out of a series +/// of reduce operators (each one is a separate layer). +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct BucketedPlan { + /// All of the aggregations we were asked to compute. 
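+    /// One entry per aggregation, in the same order as `skips` below.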
+ pub aggr_funcs: Vec, + /// Set of "skips" or calls to `nth()` an iterator needs to do over + /// the input to extract the relevant datums. + pub skips: Vec, + /// The number of buckets in each layer of the reduction tree. Should + /// be decreasing, and ideally, a power of two so that we can easily + /// distribute values to buckets with `value.hashed() % buckets[layer]`. + pub buckets: Vec, +} #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum BasicPlan {} diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index ab9ad943f4..3878a11506 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -1,18 +1,26 @@ use std::collections::BTreeMap; +use common_telemetry::logging; +use datatypes::value::Value; use differential_dataflow::collection::AsCollection; +use differential_dataflow::difference::{Multiply, Semigroup}; use differential_dataflow::lattice::Lattice; +use differential_dataflow::operators::arrange::{Arrange, Arranged}; +use differential_dataflow::operators::reduce::ReduceCore; use differential_dataflow::Collection; -use timely::dataflow::Scope; +use serde::{Deserialize, Serialize}; +use timely::dataflow::{Scope, ScopeParent}; use timely::progress::timestamp::Refines; use timely::progress::Timestamp; -use crate::compute::context::CollectionBundle; -use crate::compute::plan::{convert_indexes_to_skips, KeyValPlan, ReducePlan}; +use crate::compute::context::{Arrangement, ArrangementFlavor, CollectionBundle}; +use crate::compute::plan::{convert_indexes_to_skips, AccumulablePlan, KeyValPlan, ReducePlan}; +use crate::compute::typedefs::{ErrValSpine, RowKeySpine, RowSpine}; use crate::compute::Context; -use crate::expr::ScalarExpr; +use crate::expr::{AggregateFunc, ScalarExpr}; use crate::repr::{Diff, Row}; -use crate::storage::errors::DataflowError; +use crate::storage::errors::{DataflowError, EvalError}; +use crate::util::{CollectionExt, ReduceExt}; impl Context where @@ -22,6 +30,7 @@ where { /// Renders a `Plan::Reduce` using various non-obvious techniques to /// minimize worst-case incremental update times and memory footprint. + #[allow(clippy::type_complexity)] pub fn render_reduce( &mut self, input: CollectionBundle, @@ -71,11 +80,7 @@ where // Evaluate the key expressions. let key = match key_plan.evaluate_into(&mut datums_local, &mut row_mfp) { Err(e) => { - return Some(( - Err(DataflowError::from(e)), - time.clone(), - diff.clone(), - )) + return Some((Err(DataflowError::from(e)), time.clone(), *diff)) } Ok(key) => key.expect("Row expected as no predicate was used"), }; @@ -84,34 +89,29 @@ where datums_local.truncate(skips.len()); let val = match val_plan.evaluate_iter(&mut datums_local) { Err(e) => { - return Some(( - Err(DataflowError::from(e)), - time.clone(), - diff.clone(), - )) + return Some((Err(DataflowError::from(e)), time.clone(), *diff)) } Ok(val) => val.expect("Row expected as no predicate was used"), }; row_buf.clear(); row_buf.extend(val); let row = row_buf.clone(); - Some((Ok((key, row)), time.clone(), diff.clone())) + Some((Ok((key, row)), time.clone(), *diff)) } }); - /* - // TODO(discord9): deal with errors by adding utility methods to split dataflow + + // TODO(discord9): find out how to do `consolidate_stream` without Abonomation // Demux out the potential errors from key and value selector evaluation. 
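+            // `flat_map_fallible` sends `Ok` records to `ok` and `Err` records to
+            // `err`; passing `Some` as the logic keeps each record unchanged.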
- let (ok) = key_val_input - .as_collection(); + let (ok, mut err) = key_val_input + .as_collection() // .consolidate_stream() - // .flat_map_fallible("OkErrDemux", Some); - let err = err_input; - // err = err.concat(&err_input); + .flat_map_fallible("OkErrDemux", Some); + + err = err.concat(&err_input); // Render the reduce plan self.render_reduce_plan(reduce_plan, ok, err, key_arity) - .leave_region()*/ - todo!() + .leave_region() }) } @@ -130,6 +130,666 @@ where where S: Scope, { + let mut errors = Vec::default(); + let arrangement = self.render_reduce_plan_inner(plan, collection, &mut errors, key_arity); + CollectionBundle::from_columns( + 0..key_arity, + ArrangementFlavor::Local( + arrangement, + err_input + .concatenate(errors) + .arrange_named("Arrange bundle err"), + ), + ) + } + + fn render_reduce_plan_inner( + &self, + plan: ReducePlan, + collection: Collection, + errors: &mut Vec>, + key_arity: usize, + ) -> Arrangement + where + S: Scope, + { + let arrangement: Arrangement = match plan { + // If we have no aggregations or just a single type of reduction, we + // can go ahead and render them directly. + ReducePlan::Distinct => { + let (arranged_output, errs) = self.build_distinct(collection); + errors.push(errs); + arranged_output + } + ReducePlan::Accumulable(expr) => { + let (arranged_output, errs) = self.build_accumulable(collection, expr); + errors.push(errs); + arranged_output + } + + // TODO(discord9): impl Distinct&Accumulate first + _ => todo!(), + }; todo!() } + + /// Build the dataflow to compute the set of distinct keys. + fn build_distinct( + &self, + collection: Collection, + ) -> (Arrangement, Collection) + where + S: Scope, + { + let (output, errors) = collection + .arrange_named::>("Arranged DistinctBy") + .reduce_pair::<_, RowSpine<_, _, _, _>, _, ErrValSpine<_, _, _>>( + "DistinctBy", + "DistinctByErrorCheck", + |_key: &Row, _input: &[(&Row, i64)], output: &mut Vec<(Row, i64)>| { + // We're pushing an empty row here because the key is implicitly added by the + // arrangement, and the permutation logic takes care of using the key part of the + // output. + output.push((Row::default(), 1)); + }, + move |key, input: &[(_, Diff)], output| { + for (_, count) in input.iter() { + if count.is_positive() { + continue; + } + let message = "Non-positive multiplicity in DistinctBy"; + output.push((EvalError::Internal(message.to_string()).into(), 1)); + return; + } + }, + ); + ( + output, + errors.as_collection(|_k: &Row, v: &DataflowError| v.clone()), + ) + } + + /// Build the dataflow to compute and arrange multiple accumulable aggregations. + /// + /// The incoming values are moved to the update's "difference" field, at which point + /// they can be accumulated in place. The `count` operator promotes the accumulated + /// values to data, at which point a final map applies operator-specific logic to + /// yield the final aggregate. 
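+    ///
+    /// A sketch of the idea for a plain `SUM(x)`: every input row becomes an update
+    /// `((key, ()), (accums, 1))`, where `accums[i]` holds `x` as an
+    /// `Accum::SimpleNumber`, so differential dataflow adds updates in place and the
+    /// per-key state stays constant-size.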
+ fn build_accumulable( + &self, + collection: Collection, + AccumulablePlan { + full_aggrs, + simple_aggrs, + distinct_aggrs, + }: AccumulablePlan, + ) -> (Arrangement, Collection) + where + S: Scope, + { + // we must have called this function with something to reduce + if full_aggrs.is_empty() || simple_aggrs.len() + distinct_aggrs.len() != full_aggrs.len() { + panic!( + "Incorrect numbers of aggregates in accummulable reduction rendering: {}", + &format!( + "full_aggrs={}, simple_aggrs={}, distinct_aggrs={}", + full_aggrs.len(), + simple_aggrs.len(), + distinct_aggrs.len(), + ), + ); + } + + // Some of the aggregations may have the `distinct` bit set, which means that they'll + // need to be extracted from `collection` and be subjected to `distinct` with `key`. + // Other aggregations can be directly moved in to the `diff` field. + // + // In each case, the resulting collection should have `data` shaped as `(key, ())` + // and a `diff` that is a vector with length `3 * aggrs.len()`. The three values are + // generally the count, and then two aggregation-specific values. The size could be + // reduced if we want to specialize for the aggregations. + + let float_scale = f64::from(1 << 24); + + // Instantiate a default vector for diffs with the correct types at each + // position. + let zero_diffs: (Vec<_>, Diff) = ( + full_aggrs + .iter() + .map(|f| match f.func { + AggregateFunc::Any | AggregateFunc::All => Accum::Bool { + trues: 0, + falses: 0, + }, + AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => Accum::Float { + accum: 0, + pos_infs: 0, + neg_infs: 0, + nans: 0, + non_nulls: 0, + }, + _ => Accum::SimpleNumber { + accum: 0, + non_nulls: 0, + }, + }) + .collect(), + 0, + ); + + let mut to_aggregate = Vec::new(); + if !simple_aggrs.is_empty() { + // First, collect all non-distinct aggregations in one pass. + let easy_cases = collection.explode_one({ + let zero_diffs = zero_diffs.clone(); + move |(key, row)| { + let mut diffs = zero_diffs.clone(); + + for (accumulable_index, datum_index, aggr) in simple_aggrs.iter() { + // Try to unpack only the datums we need + let datum = row.get(*datum_index).unwrap().clone(); + diffs.0[*accumulable_index] = + Accum::value_to_accumulator(datum.clone(), &aggr.func); + diffs.1 = 1; + } + ((key, ()), diffs) + } + }); + to_aggregate.push(easy_cases); + } + + // Next, collect all aggregations that require distinctness. 
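+        // For each one: narrow every row down to the single datum being aggregated,
+        // run it through a distinct-style reduction so duplicate (key, value) pairs
+        // count only once, and then move the datum into the diff field.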
+ for (accumulable_index, datum_index, aggr) in distinct_aggrs.into_iter() { + let mut row_buf = Row::default(); + let collection = { + let arranged: Arranged = collection + .map(move |(key, row)| { + let value = row.get(datum_index).unwrap(); + row_buf.packer().push(value.clone()); + (key, row_buf.clone()) + }) + .map(|k| (k, ())) + .arrange_named::::Timestamp, Diff>>( + "Arranged Accumulable", + ); + // note `arranged` for convenient of type-infer with r-a + // first distinct, then reduce + arranged + .reduce_abelian::<_, RowKeySpine<_, _, _>>( + "Reduced Accumulable", + move |_k, _s, t: &mut Vec<((), i64)>| t.push(((), 1)), + ) + .as_collection(|k, _| k.clone()) + .explode_one({ + let zero_diffs = zero_diffs.clone(); + move |(key, row)| { + let datum = row.iter().next().unwrap(); + let mut diffs = zero_diffs.clone(); + diffs.0[accumulable_index] = + Accum::value_to_accumulator(datum.clone(), &aggr.func); + diffs.1 = 1; + ((key, ()), diffs) + } + }) + }; + to_aggregate.push(collection); + } + + // now concatenate, if necessary, multiple aggregations + let collection = if to_aggregate.len() == 1 { + to_aggregate.remove(0) + } else { + differential_dataflow::collection::concatenate(&mut collection.scope(), to_aggregate) + }; + + // reduce is done, convert accumulators to values + + let err_full_aggrs = full_aggrs.clone(); + let (arranged_output, arranged_errs) = collection + .arrange_named::, Diff)>>("ArrangeAccumulable") + .reduce_pair::<_, RowSpine<_, _, _, _>, _, ErrValSpine<_, _, _>>( + "ReduceAccumulable", + "AccumulableErrorCheck", + { + let mut row_buf = Row::default(); + move |_key: &Row, + input: &[(&(), (Vec, Diff))], + output: &mut Vec<(Row, i64)>| { + let (ref accums, total) = input[0].1; + let mut row_packer = row_buf.packer(); + + for (aggr, accum) in full_aggrs.iter().zip(accums) { + // The finished value depends on the aggregation function in a variety of ways. + // For all aggregates but count, if only null values were + // accumulated, then the output is null. + let value = if total > 0 + && accum.is_zero() + && aggr.func != AggregateFunc::Count + { + Value::Null + } else { + accum.accum_to_value(&aggr.func, total) + }; + + row_packer.push(value); + } + output.push((row_buf.clone(), 1)); + } + }, + move |key, input, output| { + let (ref accums, total) = input[0].1; + for (aggr, accum) in err_full_aggrs.iter().zip(accums) { + // We first test here if inputs without net-positive records are present, + // producing an error to the logs and to the query output if that is the case. + if total == 0 && !accum.is_zero() { + logging::error!( + "Net-zero records with non-zero accumulation in ReduceAccumulable: aggr={:?}, accum={:?}", aggr, &accum + ); + let message = format!( + "Invalid data in source, saw net-zero records for key {key:?} \ + with non-zero accumulation in accumulable aggregate" + ); + output.push((EvalError::Internal(message).into(), 1)); + } + match (&aggr.func, &accum) { + (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. }) + | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) + | (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => { + if accum.is_negative() { + logging::error!( + "Invalid negative unsigned aggregation in ReduceAccumulable aggr={aggr:?}, accum={accum:?}", + ); + let message = format!( + "Invalid data in source, saw negative accumulation with \ + unsigned type for key {key:?}" + ); + output.push((EvalError::Internal(message).into(), 1)); + } + } + _ => (), // no more errors to check for at this point! 
+ } + } + }, + ); + ( + arranged_output, + arranged_errs.as_collection(|_key, error| error.clone()), + ) + } +} + +/// Accumulates values for the various types of accumulable aggregations. +/// +/// We assume that there are not more than 2^32 elements for the aggregation. +/// Thus we can perform a summation over i32 in an i64 accumulator +/// and not worry about exceeding its bounds. +/// +/// The float accumulator performs accumulation in fixed point arithmetic. The fixed +/// point representation has less precision than a double. It is entirely possible +/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic +/// to preserve group guarantees. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +enum Accum { + /// Accumulates boolean values. + Bool { + /// The number of `true` values observed. + trues: Diff, + /// The number of `false` values observed. + falses: Diff, + }, + /// Accumulates simple numeric values. + SimpleNumber { + /// The accumulation of all non-NULL values observed. + accum: i128, + /// The number of non-NULL values observed. + non_nulls: Diff, + }, + /// Accumulates float values. + Float { + /// Accumulates non-special float values, mapped to a fixed precision i128 domain to + /// preserve associativity and commutativity + accum: i128, + /// Counts +inf + pos_infs: Diff, + /// Counts -inf + neg_infs: Diff, + /// Counts NaNs + nans: Diff, + /// Counts non-NULL values + non_nulls: Diff, + }, +} + +impl Accum { + /// For storing floating number in fixed point representation, we need to scale + const FLOAT_SCALE: f64 = 16777216.0; + /// Initialize a accumulator from a datum. + fn value_to_accumulator(datum: Value, aggr: &AggregateFunc) -> Self { + match aggr { + AggregateFunc::Count => Accum::SimpleNumber { + accum: 0, // unused for AggregateFunc::Count + non_nulls: if datum.is_null() { 0 } else { 1 }, + }, + AggregateFunc::Any | AggregateFunc::All => match datum { + Value::Boolean(true) => Accum::Bool { + trues: 1, + falses: 0, + }, + Value::Null => Accum::Bool { + trues: 0, + falses: 0, + }, + Value::Boolean(false) => Accum::Bool { + trues: 0, + falses: 1, + }, + x => panic!("Invalid argument to AggregateFunc::Any: {x:?}"), + }, + AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => { + let n = match datum { + Value::Float32(n) => f64::from(*n), + Value::Float64(n) => *n, + Value::Null => 0f64, + x => panic!("Invalid argument to AggregateFunc::{aggr:?}: {x:?}"), + }; + + let nans = Diff::from(n.is_nan()); + let pos_infs = Diff::from(n == f64::INFINITY); + let neg_infs = Diff::from(n == f64::NEG_INFINITY); + let non_nulls = Diff::from(datum != Value::Null); + + // Map the floating point value onto a fixed precision domain + // All special values should map to zero, since they are tracked separately + let accum = if nans > 0 || pos_infs > 0 || neg_infs > 0 { + 0 + } else { + // This operation will truncate to i128::MAX if out of range. + // TODO(benesch): rewrite to avoid `as`. + #[allow(clippy::as_conversions)] + { + (n * Self::FLOAT_SCALE) as i128 + } + }; + + Accum::Float { + accum, + pos_infs, + neg_infs, + nans, + non_nulls, + } + } + _ => { + // Other accumulations need to disentangle the accumulable + // value from its NULL-ness, which is not quite as easily + // accumulated. 
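+                // Integer-like values widen into the i128 accumulator; a NULL
+                // contributes zero to both the sum and the non-null count.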
+ match datum { + Value::Int16(i) => Accum::SimpleNumber { + accum: i128::from(i), + non_nulls: 1, + }, + Value::Int32(i) => Accum::SimpleNumber { + accum: i128::from(i), + non_nulls: 1, + }, + Value::Int64(i) => Accum::SimpleNumber { + accum: i128::from(i), + non_nulls: 1, + }, + Value::UInt16(u) => Accum::SimpleNumber { + accum: i128::from(u), + non_nulls: 1, + }, + Value::UInt32(u) => Accum::SimpleNumber { + accum: i128::from(u), + non_nulls: 1, + }, + Value::UInt64(u) => Accum::SimpleNumber { + accum: i128::from(u), + non_nulls: 1, + }, + Value::Timestamp(t) => Accum::SimpleNumber { + accum: i128::from(t.value()), + non_nulls: 1, + }, + Value::Null => Accum::SimpleNumber { + accum: 0, + non_nulls: 0, + }, + x => panic!("Accumulating non-integer or unsupported data: {x:?}"), + } + } + } + } + + fn accum_to_value(&self, func: &AggregateFunc, total: i64) -> Value { + match (func, &self) { + (AggregateFunc::Count, Accum::SimpleNumber { non_nulls, .. }) => { + Value::Int64(*non_nulls) + } + (AggregateFunc::All, Accum::Bool { falses, trues }) => { + // If any false, else if all true, else must be no false and some nulls. + if *falses > 0 { + Value::Boolean(false) + } else if *trues == total { + Value::Boolean(true) + } else { + Value::Null + } + } + (AggregateFunc::Any, Accum::Bool { falses, trues }) => { + // If any true, else if all false, else must be no true and some nulls. + if *trues > 0 { + Value::Boolean(true) + } else if *falses == total { + Value::Boolean(false) + } else { + Value::Null + } + } + (AggregateFunc::SumInt16, Accum::SimpleNumber { accum, .. }) + | (AggregateFunc::SumInt32, Accum::SimpleNumber { accum, .. }) => { + // This conversion is safe, as long as we have less than 2^32 + // summands. + // TODO(benesch): are we guaranteed to have less than 2^32 summands? + // If so, rewrite to avoid `as`. + #[allow(clippy::as_conversions)] + Value::Int64(*accum as i64) + } + (AggregateFunc::SumInt64, Accum::SimpleNumber { accum, .. }) => { + Value::from(*accum as i64) + } + (AggregateFunc::SumUInt16, Accum::SimpleNumber { accum, .. }) + | (AggregateFunc::SumUInt32, Accum::SimpleNumber { accum, .. }) => { + if !accum.is_negative() { + // Our semantics of overflow are not clearly articulated. + // We adopt an unsigned + // wrapping behavior to match what we do above for signed types. + // TODO: remove potentially dangerous usage of `as`. + #[allow(clippy::as_conversions)] + Value::UInt64(*accum as u64) + } else { + // Note that we return a value here, but an error in the other + // operator of the reduce_pair. Therefore, we expect that this + // value will never be exposed as an output. + Value::Null + } + } + (AggregateFunc::SumUInt64, Accum::SimpleNumber { accum, .. }) => { + if !accum.is_negative() { + Value::UInt64(*accum as u64) + } else { + // Note that we return a value here, but an error in the other + // operator of the reduce_pair. Therefore, we expect that this + // value will never be exposed as an output. + Value::Null + } + } + ( + AggregateFunc::SumFloat32, + Accum::Float { + accum, + pos_infs, + neg_infs, + nans, + non_nulls: _, + }, + ) => { + if *nans > 0 || (*pos_infs > 0 && *neg_infs > 0) { + // NaNs are NaNs and cases where we've seen a + // mixture of positive and negative infinities. + Value::from(f32::NAN) + } else if *pos_infs > 0 { + Value::from(f32::INFINITY) + } else if *neg_infs > 0 { + Value::from(f32::NEG_INFINITY) + } else { + // TODO: remove potentially dangerous usage of `as`. 
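+                        // Undo the fixed-point scaling that `value_to_accumulator`
+                        // applied when the float was first accumulated.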
+ #[allow(clippy::as_conversions)] + { + Value::from(((*accum as f64) / Self::FLOAT_SCALE) as f32) + } + } + } + ( + AggregateFunc::SumFloat64, + Accum::Float { + accum, + pos_infs, + neg_infs, + nans, + non_nulls: _, + }, + ) => { + if *nans > 0 || (*pos_infs > 0 && *neg_infs > 0) { + // NaNs are NaNs and cases where we've seen a + // mixture of positive and negative infinities. + Value::from(f64::NAN) + } else if *pos_infs > 0 { + Value::from(f64::INFINITY) + } else if *neg_infs > 0 { + Value::from(f64::NEG_INFINITY) + } else { + // TODO(benesch): remove potentially dangerous usage of `as`. + #[allow(clippy::as_conversions)] + { + Value::from((*accum as f64) / Self::FLOAT_SCALE) + } + } + } + _ => panic!( + "Unexpected accumulation (aggr={:?}, accum={:?})", + func, &self + ), + } + } +} + +impl Semigroup for Accum { + fn is_zero(&self) -> bool { + match self { + Accum::Bool { trues, falses } => trues.is_zero() && falses.is_zero(), + Accum::SimpleNumber { accum, non_nulls } => accum.is_zero() && non_nulls.is_zero(), + Accum::Float { + accum, + pos_infs, + neg_infs, + nans, + non_nulls, + } => { + accum.is_zero() + && pos_infs.is_zero() + && neg_infs.is_zero() + && nans.is_zero() + && non_nulls.is_zero() + } + } + } + + fn plus_equals(&mut self, other: &Accum) { + match (&mut *self, other) { + ( + Accum::Bool { trues, falses }, + Accum::Bool { + trues: other_trues, + falses: other_falses, + }, + ) => { + *trues += other_trues; + *falses += other_falses; + } + ( + Accum::SimpleNumber { accum, non_nulls }, + Accum::SimpleNumber { + accum: other_accum, + non_nulls: other_non_nulls, + }, + ) => { + *accum += other_accum; + *non_nulls += other_non_nulls; + } + ( + Accum::Float { + accum, + pos_infs, + neg_infs, + nans, + non_nulls, + }, + Accum::Float { + accum: other_accum, + pos_infs: other_pos_infs, + neg_infs: other_neg_infs, + nans: other_nans, + non_nulls: other_non_nulls, + }, + ) => { + *accum = accum.checked_add(*other_accum).unwrap_or_else(|| { + logging::warn!("Float accumulator overflow. Incorrect results possible"); + accum.wrapping_add(*other_accum) + }); + *pos_infs += other_pos_infs; + *neg_infs += other_neg_infs; + *nans += other_nans; + *non_nulls += other_non_nulls; + } + (l, r) => unreachable!( + "Accumulator::plus_equals called with non-matching variants: {l:?} vs {r:?}" + ), + } + } +} + +impl Multiply for Accum { + type Output = Accum; + + fn multiply(self, factor: &Diff) -> Accum { + let factor = *factor; + match self { + Accum::Bool { trues, falses } => Accum::Bool { + trues: trues * factor, + falses: falses * factor, + }, + Accum::SimpleNumber { accum, non_nulls } => Accum::SimpleNumber { + accum: accum * i128::from(factor), + non_nulls: non_nulls * factor, + }, + Accum::Float { + accum, + pos_infs, + neg_infs, + nans, + non_nulls, + } => Accum::Float { + accum: accum.checked_mul(i128::from(factor)).unwrap_or_else(|| { + logging::warn!("Float accumulator overflow. 
Incorrect results possible"); + accum.wrapping_mul(i128::from(factor)) + }), + pos_infs: pos_infs * factor, + neg_infs: neg_infs * factor, + nans: nans * factor, + non_nulls: non_nulls * factor, + }, + } + } } diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index a6072b2e2c..205f38a876 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -1,7 +1,9 @@ #![allow(unused)] +#![allow(clippy::mutable_key_type)] mod adapter; mod compute; mod expr; mod repr; mod storage; +mod util; diff --git a/src/flow/src/repr/mod.rs b/src/flow/src/repr/mod.rs index feed1c3456..a29757efe0 100644 --- a/src/flow/src/repr/mod.rs +++ b/src/flow/src/repr/mod.rs @@ -1,8 +1,10 @@ //! basically a wrapper around the `datatype` crate //! for basic Data Representation use std::borrow::Borrow; +use std::slice::SliceIndex; use datatypes::value::Value; +use serde::{Deserialize, Serialize}; /// System-wide Record count difference type. pub type Diff = i64; @@ -12,15 +14,22 @@ pub type Diff = i64; /// TODO(discord9): use a more efficient representation ///i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\] -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Default)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)] pub struct Row { inner: Vec, } impl Row { + pub fn get(&self, idx: usize) -> Option<&Value> { + self.inner.get(idx) + } pub fn clear(&mut self) { self.inner.clear(); } + pub fn packer(&mut self) -> &mut Vec { + self.inner.clear(); + &mut self.inner + } pub fn pack(iter: I) -> Row where I: IntoIterator, @@ -41,6 +50,9 @@ impl Row { pub fn into_iter(self) -> impl Iterator { self.inner.into_iter() } + pub fn iter(&self) -> impl Iterator { + self.inner.iter() + } } /// System-wide default timestamp type diff --git a/src/flow/src/storage/errors.rs b/src/flow/src/storage/errors.rs index 9994f13eae..f8dd8236a7 100644 --- a/src/flow/src/storage/errors.rs +++ b/src/flow/src/storage/errors.rs @@ -1,13 +1,14 @@ use serde::{Deserialize, Serialize}; +// TODO(discord9): more error types #[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] pub enum DataflowError { - EvalError(EvalError), + EvalError(Box), } impl From for DataflowError { fn from(e: EvalError) -> Self { - DataflowError::EvalError(e) + DataflowError::EvalError(Box::new(e)) } } @@ -15,4 +16,12 @@ impl From for DataflowError { pub enum EvalError { DivisionByZero, InvalidArgument(String), + Internal(String), +} + +#[test] +fn tell_goal() { + use differential_dataflow::ExchangeData; + fn a(_: T) {} + a(DataflowError::from(EvalError::DivisionByZero)); } diff --git a/src/flow/src/util/buffer.rs b/src/flow/src/util/buffer.rs new file mode 100644 index 0000000000..191565eeb9 --- /dev/null +++ b/src/flow/src/util/buffer.rs @@ -0,0 +1,150 @@ +use differential_dataflow::consolidation::consolidate_updates; +use differential_dataflow::difference::Semigroup; +use differential_dataflow::Data; +use timely::communication::Push; +use timely::dataflow::channels::Bundle; +use timely::dataflow::operators::generic::OutputHandle; +use timely::dataflow::operators::{Capability, InputCapability}; +use timely::progress::Timestamp; + +/// A buffer that consolidates updates +/// +/// The buffer implements a wrapper around [OutputHandle] consolidating elements pushed to it. It is +/// backed by a capacity-limited buffer, which means that compaction only occurs within the +/// dimensions of the buffer, i.e. the number of unique keys is less than half of the buffer's +/// capacity. 
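+///
+/// A minimal usage sketch (assuming an operator body with an output handle
+/// `output` and an input capability `cap`; both names are illustrative):
+///
+/// ```ignore
+/// let mut out = ConsolidateBuffer::new(&mut output, 0);
+/// out.give(&cap, (data, time, 1));
+/// // any still-buffered updates are flushed when `out` is dropped
+/// ```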
+/// +/// A cap is retained whenever the current time changes to be able to flush on drop or when the time +/// changes again. +/// +/// The buffer is filled with updates until it reaches its capacity. At this point, the updates are +/// consolidated to free up space. This process repeats until the consolidation recovered less than +/// half of the buffer's capacity, at which point the buffer will be shipped. +/// +/// The buffer retains a capability to send data on flush. It will flush all data once dropped, if +/// time changes, or if the buffer capacity is reached. +pub struct ConsolidateBuffer<'a, 'b, T, D: Data, R: Semigroup, P> +where + P: Push> + 'a, + T: Data + Timestamp + 'a, + D: 'a, +{ + // a buffer for records, to send at self.cap + // Invariant: Buffer only contains data if cap is Some. + buffer: Vec<(D, T, R)>, + output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>, + cap: Option>, + port: usize, + previous_len: usize, +} + +impl<'a, 'b, T, D: Data, R: Semigroup, P> ConsolidateBuffer<'a, 'b, T, D, R, P> +where + T: Data + Timestamp + 'a, + P: Push> + 'a, +{ + /// Create a new [ConsolidateBuffer], wrapping the provided session. + /// + /// * `output_handle`: The output to send data to. + /// * 'port': The output port to retain capabilities for. + pub fn new(output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>, port: usize) -> Self { + Self { + output_handle, + port, + cap: None, + buffer: Vec::with_capacity(::timely::container::buffer::default_capacity::<(D, T, R)>()), + previous_len: 0, + } + } + + #[inline] + /// Provides an iterator of elements to the buffer + pub fn give_iterator>( + &mut self, + cap: &InputCapability, + iter: I, + ) { + for item in iter { + self.give(cap, item); + } + } + + /// Give an element to the buffer + pub fn give(&mut self, cap: &InputCapability, data: (D, T, R)) { + // Retain a cap for the current time, which will be used on flush. + if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) { + // Flush on capability change + self.flush(); + // Retain capability for the specified output port. + self.cap = Some(cap.delayed_for_output(cap.time(), self.port)); + } + self.give_internal(data); + } + + /// Give an element to the buffer, using a pre-fabricated capability. Note that the capability + /// must be valid for the associated output. + pub fn give_at(&mut self, cap: &Capability, data: (D, T, R)) { + // Retain a cap for the current time, which will be used on flush. + if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) { + // Flush on capability change + self.flush(); + // Retain capability. + self.cap = Some(cap.clone()); + } + self.give_internal(data); + } + + /// Give an element and possibly flush the buffer. Note that this needs to have access + /// to a capability, which the public functions ensure. + fn give_internal(&mut self, data: (D, T, R)) { + self.buffer.push(data); + + // Limit, if possible, the lifetime of the allocations for data + // and consolidate smaller buffers if we're in the lucky case + // of a small domain for D + if self.buffer.len() >= 2 * self.previous_len { + // Consolidate while the consolidation frees at least half the buffer + consolidate_updates(&mut self.buffer); + if self.buffer.len() > self.buffer.capacity() / 2 { + self.flush(); + } else { + self.previous_len = self.buffer.len(); + } + // At this point, it is an invariant across give calls that self.previous_len + // will be in the interval [0, self.buffer.capacity() / 2]. 
So, we will enter + // this if-statement block again when self.buffer.len() == self.buffer.capacity() + // or earlier. If consolidation is not effective to keep self.buffer.len() + // below half capacity, then flushing when more than half-full will + // maintain the invariant. + } + } + + /// Flush the internal buffer to the underlying session + pub fn flush(&mut self) { + if let Some(cap) = &self.cap { + self.output_handle.session(cap).give_vec(&mut self.buffer); + + // Ensure that the capacity is at least equal to the default in case + // it was reduced by give_vec. Note that we cannot rely here on give_vec + // returning us a buffer with zero capacity. + if self.buffer.capacity() < ::timely::container::buffer::default_capacity::<(D, T, R)>() + { + let to_reserve = ::timely::container::buffer::default_capacity::<(D, T, R)>() + - self.buffer.capacity(); + self.buffer.reserve_exact(to_reserve); + } + self.previous_len = 0; + } + } +} + +impl<'a, 'b, T, D: Data, R: Semigroup, P> Drop for ConsolidateBuffer<'a, 'b, T, D, R, P> +where + P: Push> + 'a, + T: Data + Timestamp + 'a, + D: 'a, +{ + fn drop(&mut self) { + self.flush(); + } +} diff --git a/src/flow/src/util/mod.rs b/src/flow/src/util/mod.rs index 2ac02b7630..a62a058c53 100644 --- a/src/flow/src/util/mod.rs +++ b/src/flow/src/util/mod.rs @@ -1,5 +1,7 @@ //! utilitys including extend differential dataflow to deal with errors and etc. +mod buffer; mod operator; mod reduce; + pub use operator::CollectionExt; pub use reduce::ReduceExt; diff --git a/src/flow/src/util/operator.rs b/src/flow/src/util/operator.rs index e506f72f94..3c9bc790ed 100644 --- a/src/flow/src/util/operator.rs +++ b/src/flow/src/util/operator.rs @@ -12,6 +12,8 @@ use timely::dataflow::operators::Capability; use timely::dataflow::{Scope, Stream}; use timely::{Data, ExchangeData}; +use crate::util::buffer::ConsolidateBuffer; + pub trait StreamExt where D1: Data, @@ -100,6 +102,18 @@ where E: Data, I: IntoIterator>, L: FnMut(D1) -> I + 'static; + + /// Replaces each record with another, with a new difference type. + /// + /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) + /// and move the data into the difference component. This will allow differential dataflow to update in-place. + fn explode_one(&self, logic: L) -> Collection>::Output> + where + D2: differential_dataflow::Data, + R2: Semigroup + Multiply, + >::Output: Data + Semigroup, + L: FnMut(D1) -> (D2, R2) + 'static, + G::Timestamp: Lattice; } impl StreamExt for Stream @@ -212,4 +226,32 @@ where }); (ok_stream.as_collection(), err_stream.as_collection()) } + + fn explode_one(&self, mut logic: L) -> Collection>::Output> + where + D2: differential_dataflow::Data, + R2: Semigroup + Multiply, + >::Output: Data + Semigroup, + L: FnMut(D1) -> (D2, R2) + 'static, + G::Timestamp: Lattice, + { + self.inner + .unary(Pipeline, "ExplodeOne", move |_, _| { + let mut buffer = Vec::new(); + move |input, output| { + let mut out = ConsolidateBuffer::new(output, 0); + input.for_each(|time, data| { + data.swap(&mut buffer); + out.give_iterator( + &time, + buffer.drain(..).map(|(x, t, d)| { + let (x, d2) = logic(x); + (x, t, d2.multiply(&d)) + }), + ); + }); + } + }) + .as_collection() + } }