diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index a300052eec..d0fdc1fdb9 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -105,13 +105,10 @@ impl<'referred, 'df> Context<'referred, 'df> { Plan::Let { id, value, body } => self.eval_let(id, value, body), Plan::Mfp { input, mfp } => self.render_mfp(input, mfp), Plan::Reduce { - input: _, - key_val_plan: _, - reduce_plan: _, - } => NotImplementedSnafu { - reason: "Reduce is still WIP".to_string(), - } - .fail(), + input, + key_val_plan, + reduce_plan, + } => self.render_reduce(input, key_val_plan, reduce_plan), Plan::Join { .. } => NotImplementedSnafu { reason: "Join is still WIP".to_string(), } diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index d9a67fff69..9f7358edf0 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -33,6 +33,7 @@ use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter}; impl<'referred, 'df> Context<'referred, 'df> { + const REDUCE: &'static str = "reduce"; /// render `Plan::Reduce` into executable dataflow // There is a false positive in using `Vec` as key due to `Value` have `bytes` variant #[allow(clippy::mutable_key_type)] @@ -49,15 +50,12 @@ impl<'referred, 'df> Context<'referred, 'df> { // the output is concat from key and val let output_key_arity = key_val_plan.key_plan.output_arity(); - // TODO: config global expire time from self + // TODO(discord9): config global expire time from self let arrange_handler = self.compute_state.new_arrange(None); // reduce need full arrangement to be able to query all keys - let arrange_handler_inner = - arrange_handler - .clone_full_arrange() - .with_context(|| PlanSnafu { - reason: "No write is expected at this point", - })?; + let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu { + reason: "No write is expected at this point", + })?; let distinct_input = self.add_accum_distinct_input_arrange(&reduce_plan); @@ -74,10 +72,10 @@ impl<'referred, 'df> Context<'referred, 'df> { let scheduler = self.compute_state.get_scheduler(); let scheduler_inner = scheduler.clone(); - let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("reduce"); + let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE); let subgraph = self.df.add_subgraph_in_out( - "reduce", + Self::REDUCE, input.collection.into_inner(), out_send_port, move |_ctx, recv, send| { @@ -117,6 +115,9 @@ impl<'referred, 'df> Context<'referred, 'df> { /// Contrast to it name, it's for adding distinct input for /// accumulable reduce plan with distinct input, /// like `select COUNT(DISTINCT col) from table` + /// + /// The return value is optional a list of arrangement, which is created for distinct input, and should be the + /// same length as the distinct aggregation in accumulable reduce plan fn add_accum_distinct_input_arrange( &mut self, reduce_plan: &ReducePlan, @@ -167,7 +168,7 @@ fn split_row_to_key_val( let val = key_val_plan .val_plan .evaluate_into(&mut row.inner.clone(), row_buf)? - .with_context(|| InternalSnafu { + .context(InternalSnafu { reason: "val_plan should not contain any filter predicate", })?; Ok(Some(((key, val), sys_time, diff))) @@ -195,7 +196,7 @@ fn reduce_subgraph( ) { let mut row_buf = Row::empty(); let key_val = data.into_iter().filter_map(|(row, sys_time, diff)| { - // error is collected and then ignored + // error is collected and then the row is skipped err_collector .run(|| split_row_to_key_val(row, sys_time, diff, key_val_plan, &mut row_buf)) .flatten() @@ -230,16 +231,19 @@ fn reduce_subgraph( }; } -/// return distinct rows from the input, but do not update the arrangement +/// return distinct rows(distinct by row's key) from the input, but do not update the arrangement +/// +/// if the same key already exist, we only perserve the oldest value(It make sense for distinct input over key) fn eval_distinct_core( arrange: ArrangeReader, kv: impl IntoIterator, now: repr::Timestamp, err_collector: &ErrCollector, ) -> Vec { - // TODO: check if anything could go wrong - // FIX: in the input their are not deduplicated, so it's possible to have multiple updates for the same key let _ = err_collector; + + // note that we also need to keep track of the distinct rows inside the current input + // hence the `inner_map` to kepping track of the distinct rows let mut inner_map = BTreeMap::new(); kv.into_iter() .filter_map(|((key, val), ts, diff)| { @@ -254,9 +258,17 @@ fn eval_distinct_core( (Some(old_val), -1) if old_val.0 == val => Some(((key, val), ts, -1)), _ => None, }; - new_key_val - .clone() - .map(|((k, v), t, d)| inner_map.insert(k.clone(), (v.clone(), t, d))); + + new_key_val.clone().map(|((k, v), t, d)| { + if inner_map.contains_key(&k) { + // already exist, no need to emit this row + None + } else { + // does not exist, insert it and emit + inner_map.insert(k.clone(), (v.clone(), t, d)); + Some(((k, v), t, d)) + } + }); new_key_val }) .collect_vec() @@ -331,7 +343,7 @@ fn reduce_distinct_subgraph( /// eval accumulable reduce plan by eval aggregate function and reduce the result /// -/// TODO: eval distinct by adding distinct input arrangement +/// TODO(discord9): eval distinct by adding distinct input arrangement /// /// invariant: it'is assumed `kv`'s time is always <= now, /// since it's from a Collection Bundle, where future inserts are stored in arrange @@ -363,7 +375,9 @@ fn reduce_accum_subgraph( let mut all_updates = Vec::with_capacity(key2vals.len()); let mut all_outputs = Vec::with_capacity(key2vals.len()); - // lock the arrange for write + // lock the arrange for write for the rest of function body + // so to prevent wide race condition since we are going to update the arrangement by write after read + // TODO(discord9): consider key-based lock let mut arrange = arrange.write(); for (key, value_diffs) in key2vals { let col_diffs = { @@ -388,7 +402,7 @@ fn reduce_accum_subgraph( if let Some(res) = res { res } else { - // ignore for whatever reason + // errors is collected, we can just continue and send error back through `err_collector` continue; } }; @@ -481,6 +495,11 @@ fn eval_simple_aggrs( } } +/// Accumulate the output of aggregation functions +/// +/// The accum is a map from index to the accumulator of the aggregation function +/// +/// The output is a map from index to the output of the aggregation function #[derive(Debug)] struct AccumOutput { accum: BTreeMap>, @@ -635,18 +654,16 @@ fn from_val_to_slice_idx( .with_context(|_| DataTypeSnafu { msg: "Accum's first element should be a list", })? - .with_context(|| InternalSnafu { + .context(InternalSnafu { reason: "Accum's first element should be a list", })?; let ret: Vec = list .items() .iter() .map(|v| { - v.as_u64() - .map(|j| j as usize) - .with_context(|| InternalSnafu { - reason: "End offset should be a list of u64", - }) + v.as_u64().map(|j| j as usize).context(InternalSnafu { + reason: "End offset should be a list of u64", + }) }) .try_collect()?; ensure!( @@ -678,7 +695,7 @@ fn from_val_to_slice_idx( } // mainly for reduce's test -// TODO: add tests for accum ser/de +// TODO(discord9): add tests for accum ser/de #[cfg(test)] mod test { use std::cell::RefCell; @@ -825,6 +842,7 @@ mod test { (Row::new(vec![1i64.into()]), 4, 1), (Row::new(vec![2i64.into()]), 5, 1), (Row::new(vec![3i64.into()]), 6, 1), + (Row::new(vec![1i64.into()]), 7, 1), ]; let collection = ctx.render_constant(rows.clone()); ctx.insert_global(GlobalId::User(1), collection); @@ -869,6 +887,7 @@ mod test { (4, vec![(Row::new(vec![6i64.into()]), 4, 1)]), (5, vec![(Row::new(vec![6i64.into()]), 5, 1)]), (6, vec![(Row::new(vec![6i64.into()]), 6, 1)]), + (7, vec![(Row::new(vec![6i64.into()]), 7, 1)]), ]); run_and_check(&mut state, &mut df, 1..7, expected, output); } @@ -892,6 +911,7 @@ mod test { (Row::new(vec![1i64.into()]), 4, 1), (Row::new(vec![2i64.into()]), 5, 1), (Row::new(vec![3i64.into()]), 6, 1), + (Row::new(vec![1i64.into()]), 7, 1), ]; let collection = ctx.render_constant(rows.clone()); ctx.insert_global(GlobalId::User(1), collection); @@ -951,6 +971,7 @@ mod test { (4, vec![(Row::new(vec![7i64.into(), 6i64.into()]), 4, 1)]), (5, vec![(Row::new(vec![9i64.into(), 6i64.into()]), 5, 1)]), (6, vec![(Row::new(vec![12i64.into(), 6i64.into()]), 6, 1)]), + (7, vec![(Row::new(vec![13i64.into(), 6i64.into()]), 7, 1)]), ]); run_and_check(&mut state, &mut df, 1..7, expected, output); }