fix: eval_distinct_core&per review

This commit is contained in:
discord9
2024-04-24 19:18:38 +08:00
parent e8eb3f6756
commit ea117e1b4c

View File

@@ -125,19 +125,15 @@ impl<'referred, 'df> Context<'referred, 'df> {
match reduce_plan {
ReducePlan::Distinct => None,
ReducePlan::Accumulable(AccumulablePlan { distinct_aggrs, .. }) => {
if distinct_aggrs.is_empty() {
None
} else {
Some(
(0..distinct_aggrs.len())
.map(|_| {
let arr = self.compute_state.new_arrange(None);
arr.set_full_arrangement(true);
arr
})
.collect(),
)
}
(!distinct_aggrs.is_empty()).then(|| {
std::iter::repeat_with(|| {
let arr = self.compute_state.new_arrange(None);
arr.set_full_arrangement(true);
arr
})
.take(distinct_aggrs.len())
.collect()
})
}
}
}
@@ -247,28 +243,26 @@ fn eval_distinct_core(
let mut inner_map = BTreeMap::new();
kv.into_iter()
.filter_map(|((key, val), ts, diff)| {
let old_val = arrange
.get(now, &key)
.or_else(|| inner_map.get(&key).cloned());
// first check inner_map, then fall back to the arrangement, to make sure we get the newest value
let old_val = inner_map
.get(&key)
.cloned()
.or_else(|| arrange.get(now, &key));
let new_key_val = match (old_val, diff) {
// a new distinct row
(None, 1) => Some(((key, val), ts, diff)),
// delete old_val
(Some(old_val), -1) if old_val.0 == val => Some(((key, val), ts, -1)),
// same value but a diff different from the newest entry's diff: also emit an update
(Some(old_val), diff) if old_val.0 == val && old_val.2 != diff => {
Some(((key, val), ts, diff))
}
_ => None,
};
new_key_val.clone().map(|((k, v), t, d)| {
if inner_map.contains_key(&k) {
// already exist, no need to emit this row
None
} else {
// does not exist, insert it and emit
inner_map.insert(k.clone(), (v.clone(), t, d));
Some(((k, v), t, d))
}
});
if let Some(((k, v), t, d)) = new_key_val.clone() {
// update the inner_map, so later updates can be checked against it
inner_map.insert(k.clone(), (v.clone(), t, d));
}
new_key_val
})
.collect_vec()
@@ -347,6 +341,11 @@ fn reduce_distinct_subgraph(
///
/// invariant: it is assumed that `kv`'s time is always <= now,
/// since it's from a Collection Bundle, where future inserts are stored in arrange
///
/// the data being sent is just the new rows that represent the new output after the given input is processed
///
/// e.g. if before new updates come in the output of the query `SELECT sum(number) FROM table`
/// is 10, and after new updates come in the output is 15, then the new row being sent is (15, now, 1)
fn reduce_accum_subgraph(
arrange: &ArrangeHandler,
distinct_input: &Option<Vec<ArrangeHandler>>,
@@ -644,6 +643,7 @@ fn from_accums_to_offsetted_accum(new_accums: Vec<Vec<Value>>) -> Vec<Value> {
.collect::<Vec<_>>()
}
/// Convert a value to a list of slice index
fn from_val_to_slice_idx(
value: Option<Value>,
expected_len: usize,
@@ -829,6 +829,79 @@ mod test {
/// | name | type |
/// |------|-------|
/// | col | Int64 |
///
/// this test includes even more insert/delete cases to cover all cases for `eval_distinct_core`
#[test]
fn test_delete_reduce_distinct_accum() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // Shorthand for a single-column Int64 row.
    let int_row = |v: i64| Row::new(vec![v.into()]);
    // Shorthand for `SUM(col 0)`, with or without DISTINCT.
    let sum_col0 = |distinct: bool| AggregateExpr {
        func: AggregateFunc::SumInt64,
        expr: ScalarExpr::Column(0),
        distinct,
    };

    // Each update is `(row, timestamp, diff)`.
    let updates = vec![
        // insert and delete of the same value within one tick
        (int_row(1), 1, 1),
        (int_row(1), 1, -1),
        // insert in one tick, delete in a later tick
        (int_row(1), 2, 1),
        (int_row(1), 3, -1),
        // insert, delete and re-insert within one tick
        (int_row(1), 4, 1),
        (int_row(1), 4, -1),
        (int_row(1), 4, 1),
    ];

    // Register the constant updates as the global input read by the plan.
    let source_id = GlobalId::User(1);
    let collection = ctx.render_constant(updates.clone());
    ctx.insert_global(source_id, collection);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    };

    // Empty key projection; the only input column is projected as the value.
    let key_val_plan = KeyValPlan {
        key_plan: MapFilterProject::new(1).project([]).unwrap().into_safe(),
        val_plan: MapFilterProject::new(1).project([0]).unwrap().into_safe(),
    };

    let reduce_plan = ReducePlan::Accumulable(AccumulablePlan {
        full_aggrs: vec![sum_col0(true)],
        simple_aggrs: vec![],
        distinct_aggrs: vec![AggrWithIndex::new(sum_col0(false), 0, 0)],
    });

    let bundle = ctx
        .render_reduce(Box::new(input_plan), key_val_plan, reduce_plan)
        .unwrap();
    let output = get_output_handle(&mut ctx, bundle);
    // Release the context before the dataflow is run.
    drop(ctx);

    // Expected output rows per tick: the sum drops back to 0 whenever the value is deleted.
    let expected = BTreeMap::from([
        (1, vec![(int_row(0), 1, 1)]),
        (2, vec![(int_row(1), 2, 1)]),
        (3, vec![(int_row(0), 3, 1)]),
        (4, vec![(int_row(1), 4, 1)]),
    ]);
    run_and_check(&mut state, &mut df, 1..7, expected, output);
}
/// SELECT SUM(DISTINCT col) FROM table
///
/// table schema:
/// | name | type |
/// |------|-------|
/// | col | Int64 |
///
/// this test includes inserts and deletes, which should cover all cases for `eval_distinct_core`
#[test]
fn test_basic_reduce_distinct_accum() {
let mut df = Hydroflow::new();
@@ -837,6 +910,7 @@ mod test {
let rows = vec![
(Row::new(vec![1i64.into()]), 1, 1),
(Row::new(vec![1i64.into()]), 1, -1),
(Row::new(vec![2i64.into()]), 2, 1),
(Row::new(vec![3i64.into()]), 3, 1),
(Row::new(vec![1i64.into()]), 4, 1),
@@ -881,9 +955,9 @@ mod test {
let output = get_output_handle(&mut ctx, bundle);
drop(ctx);
let expected = BTreeMap::from([
(1, vec![(Row::new(vec![1i64.into()]), 1, 1)]),
(2, vec![(Row::new(vec![3i64.into()]), 2, 1)]),
(3, vec![(Row::new(vec![6i64.into()]), 3, 1)]),
(1, vec![(Row::new(vec![0i64.into()]), 1, 1)]),
(2, vec![(Row::new(vec![2i64.into()]), 2, 1)]),
(3, vec![(Row::new(vec![5i64.into()]), 3, 1)]),
(4, vec![(Row::new(vec![6i64.into()]), 4, 1)]),
(5, vec![(Row::new(vec![6i64.into()]), 5, 1)]),
(6, vec![(Row::new(vec![6i64.into()]), 6, 1)]),