fix: distinct input dedup&change per review

This commit is contained in:
discord9
2024-04-24 14:58:04 +08:00
parent 874d756dba
commit cce40f6a48
2 changed files with 52 additions and 34 deletions

View File

@@ -105,13 +105,10 @@ impl<'referred, 'df> Context<'referred, 'df> {
Plan::Let { id, value, body } => self.eval_let(id, value, body),
Plan::Mfp { input, mfp } => self.render_mfp(input, mfp),
Plan::Reduce {
input: _,
key_val_plan: _,
reduce_plan: _,
} => NotImplementedSnafu {
reason: "Reduce is still WIP".to_string(),
}
.fail(),
input,
key_val_plan,
reduce_plan,
} => self.render_reduce(input, key_val_plan, reduce_plan),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP".to_string(),
}

View File

@@ -33,6 +33,7 @@ use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter};
impl<'referred, 'df> Context<'referred, 'df> {
const REDUCE: &'static str = "reduce";
/// render `Plan::Reduce` into executable dataflow
// There is a false positive in using `Vec<ScalarExpr>` as key due to `Value` have `bytes` variant
#[allow(clippy::mutable_key_type)]
@@ -49,15 +50,12 @@ impl<'referred, 'df> Context<'referred, 'df> {
// the output is concat from key and val
let output_key_arity = key_val_plan.key_plan.output_arity();
// TODO: config global expire time from self
// TODO(discord9): config global expire time from self
let arrange_handler = self.compute_state.new_arrange(None);
// reduce need full arrangement to be able to query all keys
let arrange_handler_inner =
arrange_handler
.clone_full_arrange()
.with_context(|| PlanSnafu {
reason: "No write is expected at this point",
})?;
let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
reason: "No write is expected at this point",
})?;
let distinct_input = self.add_accum_distinct_input_arrange(&reduce_plan);
@@ -74,10 +72,10 @@ impl<'referred, 'df> Context<'referred, 'df> {
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("reduce");
let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>(Self::REDUCE);
let subgraph = self.df.add_subgraph_in_out(
"reduce",
Self::REDUCE,
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
@@ -117,6 +115,9 @@ impl<'referred, 'df> Context<'referred, 'df> {
/// Contrast to it name, it's for adding distinct input for
/// accumulable reduce plan with distinct input,
/// like `select COUNT(DISTINCT col) from table`
///
/// The return value is optional a list of arrangement, which is created for distinct input, and should be the
/// same length as the distinct aggregation in accumulable reduce plan
fn add_accum_distinct_input_arrange(
&mut self,
reduce_plan: &ReducePlan,
@@ -167,7 +168,7 @@ fn split_row_to_key_val(
let val = key_val_plan
.val_plan
.evaluate_into(&mut row.inner.clone(), row_buf)?
.with_context(|| InternalSnafu {
.context(InternalSnafu {
reason: "val_plan should not contain any filter predicate",
})?;
Ok(Some(((key, val), sys_time, diff)))
@@ -195,7 +196,7 @@ fn reduce_subgraph(
) {
let mut row_buf = Row::empty();
let key_val = data.into_iter().filter_map(|(row, sys_time, diff)| {
// error is collected and then ignored
// error is collected and then the row is skipped
err_collector
.run(|| split_row_to_key_val(row, sys_time, diff, key_val_plan, &mut row_buf))
.flatten()
@@ -230,16 +231,19 @@ fn reduce_subgraph(
};
}
/// return distinct rows from the input, but do not update the arrangement
/// return distinct rows(distinct by row's key) from the input, but do not update the arrangement
///
/// if the same key already exist, we only perserve the oldest value(It make sense for distinct input over key)
fn eval_distinct_core(
arrange: ArrangeReader,
kv: impl IntoIterator<Item = KeyValDiffRow>,
now: repr::Timestamp,
err_collector: &ErrCollector,
) -> Vec<KeyValDiffRow> {
// TODO: check if anything could go wrong
// FIX: in the input their are not deduplicated, so it's possible to have multiple updates for the same key
let _ = err_collector;
// note that we also need to keep track of the distinct rows inside the current input
// hence the `inner_map` to kepping track of the distinct rows
let mut inner_map = BTreeMap::new();
kv.into_iter()
.filter_map(|((key, val), ts, diff)| {
@@ -254,9 +258,17 @@ fn eval_distinct_core(
(Some(old_val), -1) if old_val.0 == val => Some(((key, val), ts, -1)),
_ => None,
};
new_key_val
.clone()
.map(|((k, v), t, d)| inner_map.insert(k.clone(), (v.clone(), t, d)));
new_key_val.clone().map(|((k, v), t, d)| {
if inner_map.contains_key(&k) {
// already exist, no need to emit this row
None
} else {
// does not exist, insert it and emit
inner_map.insert(k.clone(), (v.clone(), t, d));
Some(((k, v), t, d))
}
});
new_key_val
})
.collect_vec()
@@ -331,7 +343,7 @@ fn reduce_distinct_subgraph(
/// eval accumulable reduce plan by eval aggregate function and reduce the result
///
/// TODO: eval distinct by adding distinct input arrangement
/// TODO(discord9): eval distinct by adding distinct input arrangement
///
/// invariant: it'is assumed `kv`'s time is always <= now,
/// since it's from a Collection Bundle, where future inserts are stored in arrange
@@ -363,7 +375,9 @@ fn reduce_accum_subgraph(
let mut all_updates = Vec::with_capacity(key2vals.len());
let mut all_outputs = Vec::with_capacity(key2vals.len());
// lock the arrange for write
// lock the arrange for write for the rest of function body
// so to prevent wide race condition since we are going to update the arrangement by write after read
// TODO(discord9): consider key-based lock
let mut arrange = arrange.write();
for (key, value_diffs) in key2vals {
let col_diffs = {
@@ -388,7 +402,7 @@ fn reduce_accum_subgraph(
if let Some(res) = res {
res
} else {
// ignore for whatever reason
// errors is collected, we can just continue and send error back through `err_collector`
continue;
}
};
@@ -481,6 +495,11 @@ fn eval_simple_aggrs(
}
}
/// Accumulate the output of aggregation functions
///
/// The accum is a map from index to the accumulator of the aggregation function
///
/// The output is a map from index to the output of the aggregation function
#[derive(Debug)]
struct AccumOutput {
accum: BTreeMap<usize, Vec<Value>>,
@@ -635,18 +654,16 @@ fn from_val_to_slice_idx(
.with_context(|_| DataTypeSnafu {
msg: "Accum's first element should be a list",
})?
.with_context(|| InternalSnafu {
.context(InternalSnafu {
reason: "Accum's first element should be a list",
})?;
let ret: Vec<usize> = list
.items()
.iter()
.map(|v| {
v.as_u64()
.map(|j| j as usize)
.with_context(|| InternalSnafu {
reason: "End offset should be a list of u64",
})
v.as_u64().map(|j| j as usize).context(InternalSnafu {
reason: "End offset should be a list of u64",
})
})
.try_collect()?;
ensure!(
@@ -678,7 +695,7 @@ fn from_val_to_slice_idx(
}
// mainly for reduce's test
// TODO: add tests for accum ser/de
// TODO(discord9): add tests for accum ser/de
#[cfg(test)]
mod test {
use std::cell::RefCell;
@@ -825,6 +842,7 @@ mod test {
(Row::new(vec![1i64.into()]), 4, 1),
(Row::new(vec![2i64.into()]), 5, 1),
(Row::new(vec![3i64.into()]), 6, 1),
(Row::new(vec![1i64.into()]), 7, 1),
];
let collection = ctx.render_constant(rows.clone());
ctx.insert_global(GlobalId::User(1), collection);
@@ -869,6 +887,7 @@ mod test {
(4, vec![(Row::new(vec![6i64.into()]), 4, 1)]),
(5, vec![(Row::new(vec![6i64.into()]), 5, 1)]),
(6, vec![(Row::new(vec![6i64.into()]), 6, 1)]),
(7, vec![(Row::new(vec![6i64.into()]), 7, 1)]),
]);
run_and_check(&mut state, &mut df, 1..7, expected, output);
}
@@ -892,6 +911,7 @@ mod test {
(Row::new(vec![1i64.into()]), 4, 1),
(Row::new(vec![2i64.into()]), 5, 1),
(Row::new(vec![3i64.into()]), 6, 1),
(Row::new(vec![1i64.into()]), 7, 1),
];
let collection = ctx.render_constant(rows.clone());
ctx.insert_global(GlobalId::User(1), collection);
@@ -951,6 +971,7 @@ mod test {
(4, vec![(Row::new(vec![7i64.into(), 6i64.into()]), 4, 1)]),
(5, vec![(Row::new(vec![9i64.into(), 6i64.into()]), 5, 1)]),
(6, vec![(Row::new(vec![12i64.into(), 6i64.into()]), 6, 1)]),
(7, vec![(Row::new(vec![13i64.into(), 6i64.into()]), 7, 1)]),
]);
run_and_check(&mut state, &mut df, 1..7, expected, output);
}