chore: per review

This commit is contained in:
discord9
2024-04-25 11:26:05 +08:00
parent 843ef050d6
commit 533ae3378f

View File

@@ -261,7 +261,7 @@ fn eval_distinct_core(
if let Some(((k, v), t, d)) = new_key_val.clone() {
// update the inner_map, so later updates can be checked against it
inner_map.insert(k.clone(), (v.clone(), t, d));
inner_map.insert(k, (v, t, d));
}
new_key_val
})
@@ -386,15 +386,13 @@ fn reduce_accum_subgraph(
for (key, value_diffs) in key_to_vals {
let col_diffs = {
let row_len = value_diffs[0].0.len();
// split value_diffs into columns
(0..row_len)
.map(|i| {
value_diffs
.iter()
.map(|(row, diff)| (row.get(i).cloned().unwrap(), *diff))
.collect_vec()
})
.collect_vec()
let res = err_collector.run(|| get_col_diffs(value_diffs, row_len));
match res {
Some(res) => res,
// TODO(discord9): consider better error handling other than
// just skip the row and logging error
None => continue,
}
};
let (accums, _, _) = arrange.get(now, &key).unwrap_or_default();
let accums = accums.inner;
@@ -466,6 +464,27 @@ fn reduce_accum_subgraph(
send.give(all_outputs);
}
fn get_col_diffs(
value_diffs: Vec<(Row, repr::Diff)>,
row_len: usize,
) -> Result<Vec<Vec<(Value, i64)>>, EvalError> {
ensure!(
value_diffs.iter().all(|(row, _)| row.len() == row_len),
InternalSnafu {
reason: "value_diffs should have rows with equal length"
}
);
let ret = (0..row_len)
.map(|i| {
value_diffs
.iter()
.map(|(row, diff)| (row.get(i).cloned().unwrap(), *diff))
.collect_vec()
})
.collect_vec();
Ok(ret)
}
/// Eval simple aggregate functions with no distinct input
fn eval_simple_aggrs(
simple_aggrs: &Vec<AggrWithIndex>,