From 533ae3378f40ece09ff40bf1ab7b4b55e662ee83 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 25 Apr 2024 11:26:05 +0800 Subject: [PATCH] chore: per review --- src/flow/src/compute/render/reduce.rs | 39 ++++++++++++++++++++------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index fe8f64560b..ae4e6c8219 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -261,7 +261,7 @@ fn eval_distinct_core( if let Some(((k, v), t, d)) = new_key_val.clone() { // update the inner_map, so later updates can be checked against it - inner_map.insert(k.clone(), (v.clone(), t, d)); + inner_map.insert(k, (v, t, d)); } new_key_val }) @@ -386,15 +386,13 @@ fn reduce_accum_subgraph( for (key, value_diffs) in key_to_vals { let col_diffs = { let row_len = value_diffs[0].0.len(); - // split value_diffs into columns - (0..row_len) - .map(|i| { - value_diffs - .iter() - .map(|(row, diff)| (row.get(i).cloned().unwrap(), *diff)) - .collect_vec() - }) - .collect_vec() + let res = err_collector.run(|| get_col_diffs(value_diffs, row_len)); + match res { + Some(res) => res, + // TODO(discord9): consider better error handling other than + // just skip the row and logging error + None => continue, + } }; let (accums, _, _) = arrange.get(now, &key).unwrap_or_default(); let accums = accums.inner; @@ -466,6 +464,27 @@ fn reduce_accum_subgraph( send.give(all_outputs); } +fn get_col_diffs( + value_diffs: Vec<(Row, repr::Diff)>, + row_len: usize, +) -> Result>, EvalError> { + ensure!( + value_diffs.iter().all(|(row, _)| row.len() == row_len), + InternalSnafu { + reason: "value_diffs should have rows with equal length" + } + ); + let ret = (0..row_len) + .map(|i| { + value_diffs + .iter() + .map(|(row, diff)| (row.get(i).cloned().unwrap(), *diff)) + .collect_vec() + }) + .collect_vec(); + Ok(ret) +} + /// Eval simple aggregate functions with no distinct input fn eval_simple_aggrs( simple_aggrs: &Vec,