diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 28bb5a833f..1288d3b061 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -125,19 +125,15 @@ impl<'referred, 'df> Context<'referred, 'df> { match reduce_plan { ReducePlan::Distinct => None, ReducePlan::Accumulable(AccumulablePlan { distinct_aggrs, .. }) => { - if distinct_aggrs.is_empty() { - None - } else { - Some( - (0..distinct_aggrs.len()) - .map(|_| { - let arr = self.compute_state.new_arrange(None); - arr.set_full_arrangement(true); - arr - }) - .collect(), - ) - } + (!distinct_aggrs.is_empty()).then(|| { + std::iter::repeat_with(|| { + let arr = self.compute_state.new_arrange(None); + arr.set_full_arrangement(true); + arr + }) + .take(distinct_aggrs.len()) + .collect() + }) } } } @@ -247,28 +243,26 @@ fn eval_distinct_core( let mut inner_map = BTreeMap::new(); kv.into_iter() .filter_map(|((key, val), ts, diff)| { - let old_val = arrange - .get(now, &key) - .or_else(|| inner_map.get(&key).cloned()); + // first check inner_map, then check the arrangement to make sure getting the newest value + let old_val = inner_map + .get(&key) + .cloned() + .or_else(|| arrange.get(now, &key)); let new_key_val = match (old_val, diff) { // a new distinct row (None, 1) => Some(((key, val), ts, diff)), - // delete old_val - (Some(old_val), -1) if old_val.0 == val => Some(((key, val), ts, -1)), + // if diff from newest value, also do update + (Some(old_val), diff) if old_val.0 == val && old_val.2 != diff => { + Some(((key, val), ts, diff)) + } _ => None, }; - new_key_val.clone().map(|((k, v), t, d)| { - if inner_map.contains_key(&k) { - // already exist, no need to emit this row - None - } else { - // does not exist, insert it and emit - inner_map.insert(k.clone(), (v.clone(), t, d)); - Some(((k, v), t, d)) - } - }); + if let Some(((k, v), t, d)) = new_key_val.clone() { + // update the inner_map, so later upadtes can be checked against it + inner_map.insert(k.clone(), (v.clone(), t, d)); + } new_key_val }) .collect_vec() @@ -347,6 +341,11 @@ fn reduce_distinct_subgraph( /// /// invariant: it'is assumed `kv`'s time is always <= now, /// since it's from a Collection Bundle, where future inserts are stored in arrange +/// +/// the data being send is just new rows that represent the new output after given input is processed +/// +/// i.e: for example before new updates comes in, the output of query `SELECT sum(number) FROM table` +/// is 10, and after new updates comes in, the output is 15, then the new row being send is (15, now, 1) fn reduce_accum_subgraph( arrange: &ArrangeHandler, distinct_input: &Option>, @@ -644,6 +643,7 @@ fn from_accums_to_offsetted_accum(new_accums: Vec>) -> Vec { .collect::>() } +/// Convert a value to a list of slice index fn from_val_to_slice_idx( value: Option, expected_len: usize, @@ -829,6 +829,79 @@ mod test { /// | name | type | /// |------|-------| /// | col | Int64 | + /// + /// this test include even more insert/delete case to cover all case for eval_distinct_core + #[test] + fn test_delete_reduce_distinct_accum() { + let mut df = Hydroflow::new(); + let mut state = DataflowState::default(); + let mut ctx = harness_test_ctx(&mut df, &mut state); + + let rows = vec![ + // same tick + (Row::new(vec![1i64.into()]), 1, 1), + (Row::new(vec![1i64.into()]), 1, -1), + // next tick + (Row::new(vec![1i64.into()]), 2, 1), + (Row::new(vec![1i64.into()]), 3, -1), + // repeat in same tick + (Row::new(vec![1i64.into()]), 4, 1), + (Row::new(vec![1i64.into()]), 4, -1), + (Row::new(vec![1i64.into()]), 4, 1), + ]; + let collection = ctx.render_constant(rows.clone()); + ctx.insert_global(GlobalId::User(1), collection); + let input_plan = Plan::Get { + id: expr::Id::Global(GlobalId::User(1)), + }; + let key_val_plan = KeyValPlan { + key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(), + val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(), + }; + + let distinct_aggrs = vec![AggrWithIndex::new( + AggregateExpr { + func: AggregateFunc::SumInt64, + expr: ScalarExpr::Column(0), + distinct: false, + }, + 0, + 0, + )]; + let accum_plan = AccumulablePlan { + full_aggrs: vec![AggregateExpr { + func: AggregateFunc::SumInt64, + expr: ScalarExpr::Column(0), + distinct: true, + }], + simple_aggrs: vec![], + distinct_aggrs, + }; + + let reduce_plan = ReducePlan::Accumulable(accum_plan); + let bundle = ctx + .render_reduce(Box::new(input_plan), key_val_plan, reduce_plan) + .unwrap(); + + let output = get_output_handle(&mut ctx, bundle); + drop(ctx); + let expected = BTreeMap::from([ + (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]), + (2, vec![(Row::new(vec![1i64.into()]), 2, 1)]), + (3, vec![(Row::new(vec![0i64.into()]), 3, 1)]), + (4, vec![(Row::new(vec![1i64.into()]), 4, 1)]), + ]); + run_and_check(&mut state, &mut df, 1..7, expected, output); + } + + /// SELECT SUM(DISTINCT col) FROM table + /// + /// table schema: + /// | name | type | + /// |------|-------| + /// | col | Int64 | + /// + /// this test include insert and delete which should cover all case for eval_distinct_core #[test] fn test_basic_reduce_distinct_accum() { let mut df = Hydroflow::new(); @@ -837,6 +910,7 @@ mod test { let rows = vec![ (Row::new(vec![1i64.into()]), 1, 1), + (Row::new(vec![1i64.into()]), 1, -1), (Row::new(vec![2i64.into()]), 2, 1), (Row::new(vec![3i64.into()]), 3, 1), (Row::new(vec![1i64.into()]), 4, 1), @@ -881,9 +955,9 @@ mod test { let output = get_output_handle(&mut ctx, bundle); drop(ctx); let expected = BTreeMap::from([ - (1, vec![(Row::new(vec![1i64.into()]), 1, 1)]), - (2, vec![(Row::new(vec![3i64.into()]), 2, 1)]), - (3, vec![(Row::new(vec![6i64.into()]), 3, 1)]), + (1, vec![(Row::new(vec![0i64.into()]), 1, 1)]), + (2, vec![(Row::new(vec![2i64.into()]), 2, 1)]), + (3, vec![(Row::new(vec![5i64.into()]), 3, 1)]), (4, vec![(Row::new(vec![6i64.into()]), 4, 1)]), (5, vec![(Row::new(vec![6i64.into()]), 5, 1)]), (6, vec![(Row::new(vec![6i64.into()]), 6, 1)]),