refactor(flow): split render.rs and other minor changes (#3750)

* refactor: split render.rs

* chore: per review

* chore: per review

* chore: docs explain `apply_updates` behavior

* chore: per review
This commit is contained in:
discord9
2024-04-22 17:48:09 +08:00
committed by GitHub
parent d0b2a11f2b
commit d4b814f698
15 changed files with 717 additions and 355 deletions

26
Cargo.lock generated
View File

@@ -4150,8 +4150,8 @@ dependencies = [
[[package]]
name = "hydroflow"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
version = "0.6.2"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320"
dependencies = [
"bincode",
"byteorder",
@@ -4183,7 +4183,7 @@ dependencies = [
[[package]]
name = "hydroflow_datalog"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320"
dependencies = [
"hydroflow_datalog_core",
"proc-macro-crate 1.3.1",
@@ -4194,8 +4194,8 @@ dependencies = [
[[package]]
name = "hydroflow_datalog_core"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
version = "0.6.1"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320"
dependencies = [
"hydroflow_lang",
"proc-macro-crate 1.3.1",
@@ -4209,8 +4209,8 @@ dependencies = [
[[package]]
name = "hydroflow_lang"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
version = "0.6.2"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320"
dependencies = [
"auto_impl",
"clap 4.5.4",
@@ -4230,7 +4230,7 @@ dependencies = [
[[package]]
name = "hydroflow_macro"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320"
dependencies = [
"hydroflow_lang",
"itertools 0.10.5",
@@ -4723,8 +4723,8 @@ dependencies = [
[[package]]
name = "lattices"
version = "0.5.3"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
version = "0.5.4"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320"
dependencies = [
"cc-traits",
"sealed",
@@ -7376,7 +7376,7 @@ checksum = "3b7e158a385023d209d6d5f2585c4b468f6dcb3dd5aca9b75c4f1678c05bb375"
[[package]]
name = "pusherator"
version = "0.0.5"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320"
dependencies = [
"either",
"variadics",
@@ -11148,7 +11148,7 @@ version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if 1.0.0",
"cfg-if 0.1.10",
"rand",
"static_assertions",
]
@@ -11562,7 +11562,7 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "variadics"
version = "0.0.4"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320"
dependencies = [
"sealed",
]

View File

@@ -14,14 +14,14 @@ common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
# This fork is simply for keeping our dependency in our org, and pin the version
# it is the same with upstream repo
datafusion-common.workspace = true
datafusion-expr.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", rev = "ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" }
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
itertools.workspace = true
num-traits = "0.2"
serde.workspace = true

View File

@@ -18,25 +18,32 @@
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::ops::Range;
use std::rc::Rc;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use hydroflow::futures::SinkExt;
use hydroflow::lattices::cc_traits::Get;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use super::state::Scheduler;
use crate::adapter::error::{Error, EvalSnafu, InvalidQuerySnafu};
use crate::adapter::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu};
use crate::compute::state::DataflowState;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::{
self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr,
};
use crate::plan::Plan;
use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, Arrangement};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
mod map;
/// The Context for build a Operator with id of `GlobalId`
pub struct Context<'referred, 'df> {
@@ -86,8 +93,6 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}
// There is a false positive in using `Vec<ScalarExpr>` as key
#[allow(clippy::mutable_key_type)]
impl<'referred, 'df> Context<'referred, 'df> {
/// Interpret and execute plan
///
@@ -97,26 +102,62 @@ impl<'referred, 'df> Context<'referred, 'df> {
Plan::Constant { rows } => Ok(self.render_constant(rows)),
Plan::Get { id } => self.get_by_id(id),
Plan::Let { id, value, body } => self.eval_let(id, value, body),
Plan::Mfp { input, mfp } => {
self.render_map_filter_project_into_executable_dataflow(input, mfp)
Plan::Mfp { input, mfp } => self.render_mfp(input, mfp),
Plan::Reduce {
input: _,
key_val_plan: _,
reduce_plan: _,
} => NotImplementedSnafu {
reason: "Reduce is still WIP".to_string(),
}
Plan::Reduce { .. } => todo!(),
Plan::Join { .. } => todo!(),
Plan::Union { .. } => todo!(),
.fail(),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP".to_string(),
}
.fail(),
Plan::Union { .. } => NotImplementedSnafu {
reason: "Union is still WIP".to_string(),
}
.fail(),
}
}
/// render Constant, will only emit the `rows` once.
pub fn render_constant(&mut self, mut rows: Vec<DiffRow>) -> CollectionBundle {
/// render Constant, take all rows that have a timestamp not greater than the current time
///
/// Always assume input is sorted by timestamp
pub fn render_constant(&mut self, rows: Vec<DiffRow>) -> CollectionBundle {
let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
let mut per_time: BTreeMap<repr::Timestamp, Vec<DiffRow>> = rows
.into_iter()
.group_by(|(_row, ts, _diff)| *ts)
.into_iter()
.map(|(k, v)| (k, v.into_iter().collect_vec()))
.collect();
let now = self.compute_state.current_time_ref();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
self.df
.add_subgraph_source("Constant", send_port, move |_ctx, send_port| {
if rows.is_empty() {
return;
}
send_port.give(std::mem::take(&mut rows));
});
let subgraph_id =
self.df
.add_subgraph_source("Constant", send_port, move |_ctx, send_port| {
// find the first timestamp that is greater than now
// use filter_map
let mut after = per_time.split_off(&(*now.borrow() + 1));
// swap
std::mem::swap(&mut per_time, &mut after);
let not_great_than_now = after;
not_great_than_now.into_iter().for_each(|(_ts, rows)| {
send_port.give(rows);
});
// schedule the next run
if let Some(next_run_time) = per_time.keys().next().copied() {
scheduler_inner.schedule_at(next_run_time);
}
});
scheduler.set_cur_subgraph(subgraph_id);
CollectionBundle::from_collection(Collection::from_port(recv_port))
}
@@ -161,144 +202,14 @@ impl<'referred, 'df> Context<'referred, 'df> {
let ret = self.render_plan(*body)?;
Ok(ret)
}
/// render MapFilterProject, will only emit the `rows` once. Assume all incoming row's sys time being `now`` and ignore the row's stated sys time
/// TODO(discord9): schedule mfp operator to run when temporal filter need
///
/// `MapFilterProject`(`mfp` for short) is scheduled to run when there is enough amount of input updates
/// ***or*** when a future update in it's output buffer(a `Arrangement`) is supposed to emit now.
pub fn render_map_filter_project_into_executable_dataflow(
&mut self,
input: Box<Plan>,
mfp: MapFilterProject,
) -> Result<CollectionBundle, Error> {
let input = self.render_plan(*input)?;
// TODO(discord9): consider if check if contain temporal to determine if
// need arrange or not, or does this added complexity worth it
let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("mfp");
let input_arity = mfp.input_arity;
// default to have a arrange with only future updates, so it can be empty if no temporal filter is applied
// as stream only sends current updates and etc.
let arrange = Arrangement::new();
let arrange_handler = ArrangeHandler::from(arrange.clone());
let arrange_handler_inner = ArrangeHandler::from(arrange);
// This closure capture following variables:
let mfp_plan = MfpPlan::create_from(mfp)?;
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let subgraph = self.df.add_subgraph_in_out(
"mfp",
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
// mfp only need to passively receive updates from recvs
let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
mfp_subgraph(
&arrange_handler_inner,
data,
&mfp_plan,
*now.borrow(),
&err_collector,
&scheduler_inner,
send,
);
},
);
// register current subgraph in scheduler for future scheduling
scheduler.set_cur_subgraph(subgraph);
let arranged = BTreeMap::from([(
(0..input_arity).map(ScalarExpr::Column).collect_vec(),
Arranged::new(arrange_handler),
)]);
let bundle = CollectionBundle {
collection: Collection::from_port(out_recv_port),
arranged,
};
Ok(bundle)
}
}
fn mfp_subgraph(
arrange: &ArrangeHandler,
input: impl IntoIterator<Item = DiffRow>,
mfp_plan: &MfpPlan,
/// The Common argument for all `Subgraph` in the render process
struct SubgraphArg<'a> {
now: repr::Timestamp,
err_collector: &ErrCollector,
scheduler: &Scheduler,
send: &PortCtx<SEND, Toff>,
) {
let run_mfp = || {
let all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
arrange.write().apply_updates(now, all_updates)?;
Ok(())
};
err_collector.run(run_mfp);
// Deal with output:
// 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
// 2. Output the updates.
// 3. Truncate all updates within that range.
let from = arrange.read().last_compaction_time().map(|n| n + 1);
let from = from.unwrap_or(repr::Timestamp::MIN);
let output_kv = arrange.read().get_updates_in_range(from..=now);
// the output is expected to be key -> empty val
let output = output_kv
.into_iter()
.map(|((key, _v), ts, diff)| (key, ts, diff))
.collect_vec();
send.give(output);
let run_compaction = || {
arrange.write().compaction_to(now)?;
Ok(())
};
err_collector.run(run_compaction);
// schedule the next time this operator should run
if let Some(i) = arrange.read().get_next_update_time(&now) {
scheduler.schedule_at(i)
}
}
/// The core of evaluating MFP operator, given a MFP and a input, evaluate the MFP operator,
/// return the output updates **And** possibly any number of errors that occurred during the evaluation
fn eval_mfp_core(
input: impl IntoIterator<Item = DiffRow>,
mfp_plan: &MfpPlan,
now: repr::Timestamp,
err_collector: &ErrCollector,
) -> Vec<KeyValDiffRow> {
let mut all_updates = Vec::new();
for (mut row, _sys_time, diff) in input.into_iter() {
// this updates is expected to be only zero to two rows
let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
// TODO(discord9): refactor error handling
// Expect error in a single row to not interrupt the whole evaluation
let updates = updates
.filter_map(|r| match r {
Ok((key, ts, diff)) => Some(((key, Row::empty()), ts, diff)),
Err((err, _ts, _diff)) => {
err_collector.push_err(err);
None
}
})
.collect_vec();
all_updates.extend(updates);
}
all_updates
err_collector: &'a ErrCollector,
scheduler: &'a Scheduler,
send: &'a PortCtx<SEND, Toff>,
}
#[cfg(test)]
@@ -316,64 +227,30 @@ mod test {
use crate::expr::BinaryFunc;
use crate::repr::Row;
fn harness_test_ctx<'r, 'h>(
df: &'r mut Hydroflow<'h>,
state: &'r mut DataflowState,
) -> Context<'r, 'h> {
let err_collector = state.get_err_collector();
Context {
id: GlobalId::User(0),
df,
compute_state: state,
input_collection: BTreeMap::new(),
local_scope: Default::default(),
err_collector,
pub fn run_and_check(
state: &mut DataflowState,
df: &mut Hydroflow,
time_range: Range<i64>,
expected: BTreeMap<i64, Vec<DiffRow>>,
output: Rc<RefCell<Vec<DiffRow>>>,
) {
for now in time_range {
state.set_current_ts(now);
state.run_available_with_schedule(df);
assert!(state.get_err_collector().inner.borrow().is_empty());
if let Some(expected) = expected.get(&now) {
assert_eq!(*output.borrow(), *expected, "at ts={}", now);
} else {
assert_eq!(*output.borrow(), vec![], "at ts={}", now);
};
output.borrow_mut().clear();
}
}
/// test if temporal filter works properly
/// namely: if mfp operator can schedule a delete at the correct time
#[test]
fn test_render_mfp_with_temporal() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
let rows = vec![
(Row::new(vec![1i64.into()]), 1, 1),
(Row::new(vec![2i64.into()]), 2, 1),
(Row::new(vec![3i64.into()]), 3, 1),
];
let collection = ctx.render_constant(rows);
ctx.insert_global(GlobalId::User(1), collection);
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
// temporal filter: now <= col(0) < now + 4
let mfp = MapFilterProject::new(1)
.filter(vec![
ScalarExpr::Column(0)
.call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype()))
.call_binary(
ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now),
BinaryFunc::Gte,
),
ScalarExpr::Column(0)
.call_binary(
ScalarExpr::literal(4i64.into(), ConcreteDataType::int64_datatype()),
BinaryFunc::SubInt64,
)
.call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype()))
.call_binary(
ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now),
BinaryFunc::Lt,
),
])
.unwrap();
let mut bundle = ctx
.render_map_filter_project_into_executable_dataflow(Box::new(input_plan), mfp)
.unwrap();
pub fn get_output_handle(
ctx: &mut Context,
mut bundle: CollectionBundle,
) -> Rc<RefCell<Vec<DiffRow>>> {
let collection = bundle.collection;
let _arranged = bundle.arranged.pop_first().unwrap().1;
let output = Rc::new(RefCell::new(vec![]));
@@ -388,93 +265,22 @@ mod test {
output_inner.borrow_mut().extend(res);
},
);
// drop ctx here to simulate actual process of compile first, run later scenario
drop(ctx);
// expected output at given time
let expected_output = BTreeMap::from([
(
0, // time
vec![
(Row::new(vec![1i64.into()]), 0, 1),
(Row::new(vec![2i64.into()]), 0, 1),
(Row::new(vec![3i64.into()]), 0, 1),
],
),
(
2, // time
vec![(Row::new(vec![1i64.into()]), 2, -1)],
),
(
3, // time
vec![(Row::new(vec![2i64.into()]), 3, -1)],
),
(
4, // time
vec![(Row::new(vec![3i64.into()]), 4, -1)],
),
]);
for now in 0i64..5 {
state.set_current_ts(now);
state.run_available_with_schedule(&mut df);
assert!(state.get_err_collector().inner.borrow().is_empty());
if let Some(expected) = expected_output.get(&now) {
assert_eq!(*output.borrow(), *expected);
} else {
assert_eq!(*output.borrow(), vec![]);
};
output.borrow_mut().clear();
}
output
}
/// test if mfp operator without temporal filter works properly
/// that is it filter the rows correctly
#[test]
fn test_render_mfp() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
let rows = vec![
(Row::new(vec![1.into()]), 1, 1),
(Row::new(vec![2.into()]), 2, 1),
(Row::new(vec![3.into()]), 3, 1),
];
let collection = ctx.render_constant(rows);
ctx.insert_global(GlobalId::User(1), collection);
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
// filter: col(0)>1
let mfp = MapFilterProject::new(1)
.filter(vec![ScalarExpr::Column(0).call_binary(
ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
BinaryFunc::Gt,
)])
.unwrap();
let bundle = ctx
.render_map_filter_project_into_executable_dataflow(Box::new(input_plan), mfp)
.unwrap();
let collection = bundle.collection.clone(ctx.df);
ctx.df.add_subgraph_sink(
"test_render_constant",
collection.into_inner(),
move |_ctx, recv| {
let data = recv.take_inner();
let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
assert_eq!(
res,
vec![
(Row::new(vec![2.into()]), 0, 1),
(Row::new(vec![3.into()]), 0, 1),
]
)
},
);
drop(ctx);
df.run_available();
pub fn harness_test_ctx<'r, 'h>(
df: &'r mut Hydroflow<'h>,
state: &'r mut DataflowState,
) -> Context<'r, 'h> {
let err_collector = state.get_err_collector();
Context {
id: GlobalId::User(0),
df,
compute_state: state,
input_collection: BTreeMap::new(),
local_scope: Default::default(),
err_collector,
}
}
/// test if constant operator works properly
@@ -494,7 +300,7 @@ mod test {
let collection = collection.collection.clone(ctx.df);
let cnt = Rc::new(RefCell::new(0));
let cnt_inner = cnt.clone();
ctx.df.add_subgraph_sink(
let res_subgraph_id = ctx.df.add_subgraph_sink(
"test_render_constant",
collection.into_inner(),
move |_ctx, recv| {
@@ -502,9 +308,16 @@ mod test {
*cnt_inner.borrow_mut() += data.iter().map(|v| v.len()).sum::<usize>();
},
);
ctx.compute_state.set_current_ts(2);
ctx.compute_state.run_available_with_schedule(ctx.df);
assert_eq!(*cnt.borrow(), 2);
ctx.compute_state.set_current_ts(3);
ctx.compute_state.run_available_with_schedule(ctx.df);
// to get output
ctx.df.schedule_subgraph(res_subgraph_id);
ctx.df.run_available();
assert_eq!(*cnt.borrow(), 3);
ctx.df.run_available();
assert_eq!(*cnt.borrow(), 3);
}
@@ -533,4 +346,33 @@ mod test {
assert_eq!(sum.borrow().to_owned(), 45);
}
#[test]
fn test_tee_auto_schedule() {
use hydroflow::scheduled::handoff::TeeingHandoff as Toff;
let mut df = Hydroflow::new();
let (send_port, recv_port) = df.make_edge::<_, Toff<i32>>("test_handoff");
let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
for i in 0..10 {
send.give(vec![i]);
}
});
let teed_recv_port = recv_port.tee(&mut df);
let sum = Rc::new(RefCell::new(0));
let sum_move = sum.clone();
let _sink = df.add_subgraph_sink("test_handoff_sink", teed_recv_port, move |_ctx, recv| {
let data = recv.take_inner();
*sum_move.borrow_mut() += data.iter().flat_map(|i| i.iter()).sum::<i32>();
});
drop(recv_port);
df.run_available();
assert_eq!(sum.borrow().to_owned(), 45);
df.schedule_subgraph(source);
df.run_available();
assert_eq!(sum.borrow().to_owned(), 90);
}
}

View File

@@ -0,0 +1,294 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::OptionExt;
use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr};
use crate::plan::Plan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::ArrangeHandler;
impl<'referred, 'df> Context<'referred, 'df> {
/// render MapFilterProject, will only emit the `rows` once. Assume all incoming row's sys time being `now`` and ignore the row's stated sys time
/// TODO(discord9): schedule mfp operator to run when temporal filter need
///
/// `MapFilterProject`(`mfp` for short) is scheduled to run when there is enough amount of input updates
/// ***or*** when a future update in it's output buffer(a `Arrangement`) is supposed to emit now.
// There is a false positive in using `Vec<ScalarExpr>` as key due to `Value` have `bytes` variant
#[allow(clippy::mutable_key_type)]
pub fn render_mfp(
&mut self,
input: Box<Plan>,
mfp: MapFilterProject,
) -> Result<CollectionBundle, Error> {
let input = self.render_plan(*input)?;
// TODO(discord9): consider if check if contain temporal to determine if
// need arrange or not, or does this added complexity worth it
let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("mfp");
let output_arity = mfp.output_arity();
// default to have a arrange with only future updates, so it can be empty if no temporal filter is applied
// as stream only sends current updates and etc.
let arrange_handler = self.compute_state.new_arrange(None);
let arrange_handler_inner =
arrange_handler
.clone_future_only()
.with_context(|| PlanSnafu {
reason: "No write is expected at this point",
})?;
// This closure capture following variables:
let mfp_plan = MfpPlan::create_from(mfp)?;
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let subgraph = self.df.add_subgraph_in_out(
"mfp",
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
// mfp only need to passively receive updates from recvs
let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
mfp_subgraph(
&arrange_handler_inner,
data,
&mfp_plan,
*now.borrow(),
&err_collector,
&scheduler_inner,
send,
);
},
);
// register current subgraph in scheduler for future scheduling
scheduler.set_cur_subgraph(subgraph);
let arranged = BTreeMap::from([(
(0..output_arity).map(ScalarExpr::Column).collect_vec(),
Arranged::new(arrange_handler),
)]);
let bundle = CollectionBundle {
collection: Collection::from_port(out_recv_port),
arranged,
};
Ok(bundle)
}
}
fn mfp_subgraph(
arrange: &ArrangeHandler,
input: impl IntoIterator<Item = DiffRow>,
mfp_plan: &MfpPlan,
now: repr::Timestamp,
err_collector: &ErrCollector,
scheduler: &Scheduler,
send: &PortCtx<SEND, Toff>,
) {
let run_mfp = || {
let all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
arrange.write().apply_updates(now, all_updates)?;
Ok(())
};
err_collector.run(run_mfp);
// Deal with output:
// 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
// 2. Output the updates.
// 3. Truncate all updates within that range.
let from = arrange.read().last_compaction_time().map(|n| n + 1);
let from = from.unwrap_or(repr::Timestamp::MIN);
let output_kv = arrange.read().get_updates_in_range(from..=now);
// the output is expected to be key -> empty val
let output = output_kv
.into_iter()
.map(|((key, _v), ts, diff)| (key, ts, diff))
.collect_vec();
send.give(output);
let run_compaction = || {
arrange.write().compaction_to(now)?;
Ok(())
};
err_collector.run(run_compaction);
// schedule next time this subgraph should run
scheduler.schedule_for_arrange(&arrange.read(), now);
}
/// The core of evaluating MFP operator, given a MFP and a input, evaluate the MFP operator,
/// return the output updates **And** possibly any number of errors that occurred during the evaluation
fn eval_mfp_core(
input: impl IntoIterator<Item = DiffRow>,
mfp_plan: &MfpPlan,
now: repr::Timestamp,
err_collector: &ErrCollector,
) -> Vec<KeyValDiffRow> {
let mut all_updates = Vec::new();
for (mut row, _sys_time, diff) in input.into_iter() {
// this updates is expected to be only zero to two rows
let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
// TODO(discord9): refactor error handling
// Expect error in a single row to not interrupt the whole evaluation
let updates = updates
.filter_map(|r| match r {
Ok((key, ts, diff)) => Some(((key, Row::empty()), ts, diff)),
Err((err, _ts, _diff)) => {
err_collector.push_err(err);
None
}
})
.collect_vec();
all_updates.extend(updates);
}
all_updates
}
#[cfg(test)]
mod test {
use std::cell::RefCell;
use std::rc::Rc;
use datatypes::data_type::ConcreteDataType;
use hydroflow::scheduled::graph::Hydroflow;
use super::*;
use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check};
use crate::compute::state::DataflowState;
use crate::expr::{self, BinaryFunc, GlobalId};
/// test if temporal filter works properly
/// namely: if mfp operator can schedule a delete at the correct time
#[test]
fn test_render_mfp_with_temporal() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
let rows = vec![
(Row::new(vec![1i64.into()]), 0, 1),
(Row::new(vec![2i64.into()]), 0, 1),
(Row::new(vec![3i64.into()]), 0, 1),
];
let collection = ctx.render_constant(rows.clone());
ctx.insert_global(GlobalId::User(1), collection);
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
// temporal filter: now <= col(0) < now + 4
let mfp = MapFilterProject::new(1)
.filter(vec![
ScalarExpr::Column(0)
.call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype()))
.call_binary(
ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now),
BinaryFunc::Gte,
),
ScalarExpr::Column(0)
.call_binary(
ScalarExpr::literal(4i64.into(), ConcreteDataType::int64_datatype()),
BinaryFunc::SubInt64,
)
.call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype()))
.call_binary(
ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now),
BinaryFunc::Lt,
),
])
.unwrap();
let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap();
let output = get_output_handle(&mut ctx, bundle);
// drop ctx here to simulate actual process of compile first, run later scenario
drop(ctx);
// expected output at given time
let expected_output = BTreeMap::from([
(
0, // time
vec![
(Row::new(vec![1i64.into()]), 0, 1),
(Row::new(vec![2i64.into()]), 0, 1),
(Row::new(vec![3i64.into()]), 0, 1),
],
),
(
2, // time
vec![(Row::new(vec![1i64.into()]), 2, -1)],
),
(
3, // time
vec![(Row::new(vec![2i64.into()]), 3, -1)],
),
(
4, // time
vec![(Row::new(vec![3i64.into()]), 4, -1)],
),
]);
run_and_check(&mut state, &mut df, 0..5, expected_output, output);
}
/// test if mfp operator without temporal filter works properly
/// that is it filter the rows correctly
#[test]
fn test_render_mfp() {
let mut df = Hydroflow::new();
let mut state = DataflowState::default();
let mut ctx = harness_test_ctx(&mut df, &mut state);
let rows = vec![
(Row::new(vec![1.into()]), 1, 1),
(Row::new(vec![2.into()]), 2, 1),
(Row::new(vec![3.into()]), 3, 1),
];
let collection = ctx.render_constant(rows.clone());
ctx.insert_global(GlobalId::User(1), collection);
let input_plan = Plan::Get {
id: expr::Id::Global(GlobalId::User(1)),
};
// filter: col(0)>1
let mfp = MapFilterProject::new(1)
.filter(vec![ScalarExpr::Column(0).call_binary(
ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
BinaryFunc::Gt,
)])
.unwrap();
let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap();
let output = get_output_handle(&mut ctx, bundle);
drop(ctx);
let expected = BTreeMap::from([
(2, vec![(Row::new(vec![2.into()]), 2, 1)]),
(3, vec![(Row::new(vec![3.into()]), 3, 1)]),
]);
run_and_check(&mut state, &mut df, 1..5, expected, output);
}
}

View File

@@ -21,6 +21,7 @@ use hydroflow::scheduled::SubgraphId;
use crate::compute::types::ErrCollector;
use crate::repr::{self, Timestamp};
use crate::utils::{ArrangeHandler, Arrangement};
/// input/output of a dataflow
/// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
@@ -38,9 +39,26 @@ pub struct DataflowState {
/// error collector local to this `ComputeState`,
/// useful for distinguishing errors from different `Hydroflow`
err_collector: ErrCollector,
/// save all used arrange in this dataflow, since usually there is no delete operation
/// we can just keep track of all used arrange and schedule subgraph when they need to be updated
arrange_used: Vec<ArrangeHandler>,
}
impl DataflowState {
pub fn new_arrange(&mut self, name: Option<Vec<String>>) -> ArrangeHandler {
let arrange = name
.map(Arrangement::new_with_name)
.unwrap_or_else(Arrangement::new);
let arr = ArrangeHandler::from(arrange);
// mark this arrange as used in this dataflow
self.arrange_used.push(
arr.clone_future_only()
.expect("No write happening at this point"),
);
arr
}
/// schedule all subgraph that need to run with time <= `as_of` and run_available()
///
/// return true if any subgraph actually executed
@@ -85,8 +103,9 @@ impl DataflowState {
}
}
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct Scheduler {
// this scheduler is shared with `DataflowState`, so it can schedule subgraph
schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
}
@@ -100,6 +119,12 @@ impl Scheduler {
subgraph_queue.push_back(*subgraph);
}
pub fn schedule_for_arrange(&self, arrange: &Arrangement, now: Timestamp) {
if let Some(i) = arrange.get_next_update_time(&now) {
self.schedule_at(i)
}
}
pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
self.cur_subgraph.replace(Some(subgraph));
}

View File

@@ -28,7 +28,7 @@ use crate::expr::{EvalError, ScalarExpr};
use crate::repr::DiffRow;
use crate::utils::{ArrangeHandler, Arrangement};
pub type Toff = TeeingHandoff<DiffRow>;
pub type Toff<T = DiffRow> = TeeingHandoff<T>;
/// A collection, represent a collections of data that is received from a handoff.
pub struct Collection<T: 'static> {
@@ -107,12 +107,17 @@ impl Arranged {
/// of reading the data from the collection.
pub struct CollectionBundle {
/// This is useful for passively reading the new updates from the collection
///
/// Invariant: the timestamp of the updates should always not greater than now, since future updates should be stored in the arrangement
pub collection: Collection<DiffRow>,
/// the key [`ScalarExpr`] indicate how the keys(also a [`Row`]) used in Arranged is extract from collection's [`Row`]
/// So it is the "index" of the arrangement
///
/// The `Arranged` is the actual data source, it can be used to read the data from the collection by
/// using the key indicated by the `Vec<ScalarExpr>`
/// There is a false positive in using `Vec<ScalarExpr>` as key due to `ScalarExpr::Literal`
/// contain a `Value` which have `bytes` variant
#[allow(clippy::mutable_key_type)]
pub arranged: BTreeMap<Vec<ScalarExpr>, Arranged>,
}
@@ -151,12 +156,16 @@ impl ErrCollector {
self.inner.borrow_mut().push_back(err)
}
pub fn run<F>(&self, f: F)
pub fn run<F, R>(&self, f: F) -> Option<R>
where
F: FnOnce() -> Result<(), EvalError>,
F: FnOnce() -> Result<R, EvalError>,
{
if let Err(e) = f() {
self.push_err(e)
match f() {
Ok(r) => Some(r),
Err(e) => {
self.push_err(e);
None
}
}
}
}

View File

@@ -52,6 +52,13 @@ pub enum EvalError {
location: Location,
},
#[snafu(display("{msg}"))]
DataType {
msg: String,
source: datatypes::Error,
location: Location,
},
#[snafu(display("Invalid argument: {reason}"))]
InvalidArgument { reason: String, location: Location },

View File

@@ -89,6 +89,11 @@ impl MapFilterProject {
}
}
/// The number of columns expected in the output row.
pub fn output_arity(&self) -> usize {
self.projection.len()
}
/// Given two mfps, return an mfp that applies one
/// followed by the other.
/// Note that the arguments are in the opposite order

View File

@@ -18,7 +18,9 @@
//! So the overhead is acceptable.
//!
//! Currently support sum, count, any, all and min/max(with one caveat that min/max can't support delete with aggregate).
//! TODO: think of better ways to not ser/de every time a accum needed to be updated, since it's in a tight loop
use std::any::type_name;
use std::fmt::Display;
use common_decimal::Decimal128;
@@ -39,6 +41,7 @@ use crate::repr::Diff;
#[enum_dispatch]
pub trait Accumulator: Sized {
fn into_state(self) -> Vec<Value>;
fn update(
&mut self,
aggr_fn: &AggregateFunc,
@@ -68,6 +71,21 @@ pub struct Bool {
falses: Diff,
}
impl Bool {
/// Expect two `Diff` type values, one for `true` and one for `false`.
pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError>
where
I: Iterator<Item = Value>,
{
Ok(Self {
trues: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
falses: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
})
}
}
impl TryFrom<Vec<Value>> for Bool {
type Error = EvalError;
@@ -78,13 +96,9 @@ impl TryFrom<Vec<Value>> for Bool {
reason: "Bool Accumulator state should have 2 values",
}
);
let mut iter = state.into_iter();
Ok(Self {
trues: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
falses: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
})
Self::try_from_iter(&mut iter)
}
}
@@ -157,6 +171,24 @@ pub struct SimpleNumber {
non_nulls: Diff,
}
impl SimpleNumber {
/// Expect one `Decimal128` and one `Diff` type values.
/// The `Decimal128` type is used to store the sum of all non-NULL values.
/// The `Diff` type is used to count the number of non-NULL values.
pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError>
where
I: Iterator<Item = Value>,
{
Ok(Self {
accum: Decimal128::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?
.val(),
non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
})
}
}
impl TryFrom<Vec<Value>> for SimpleNumber {
type Error = EvalError;
@@ -168,13 +200,7 @@ impl TryFrom<Vec<Value>> for SimpleNumber {
}
);
let mut iter = state.into_iter();
Ok(Self {
accum: Decimal128::try_from(iter.next().unwrap())
.map_err(err_try_from_val)?
.val(),
non_nulls: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?,
})
Self::try_from_iter(&mut iter)
}
}
@@ -272,6 +298,34 @@ pub struct Float {
non_nulls: Diff,
}
impl Float {
/// Expect first value to be `OrderedF64` and the rest four values to be `Diff` type values.
pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError>
where
I: Iterator<Item = Value>,
{
let mut ret = Self {
accum: OrderedF64::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
pos_infs: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
neg_infs: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
nans: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
};
// This prevent counter-intuitive behavior of summing over no values having non-zero results
if ret.non_nulls == 0 {
ret.accum = OrderedFloat::from(0.0);
}
Ok(ret)
}
}
impl TryFrom<Vec<Value>> for Float {
type Error = EvalError;
@@ -385,6 +439,26 @@ pub struct OrdValue {
non_nulls: Diff,
}
impl OrdValue {
pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError>
where
I: Iterator<Item = Value>,
{
Ok(Self {
val: {
let v = iter.next().ok_or_else(fail_accum::<Self>)?;
if v == Value::Null {
None
} else {
Some(v)
}
},
non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<Self>)?)
.map_err(err_try_from_val)?,
})
}
}
impl TryFrom<Vec<Value>> for OrdValue {
type Error = EvalError;
@@ -593,6 +667,37 @@ impl Accum {
})
}
pub fn try_from_iter(
aggr_fn: &AggregateFunc,
iter: &mut impl Iterator<Item = Value>,
) -> Result<Self, EvalError> {
match aggr_fn {
AggregateFunc::Any
| AggregateFunc::All
| AggregateFunc::MaxBool
| AggregateFunc::MinBool => Ok(Self::from(Bool::try_from_iter(iter)?)),
AggregateFunc::SumInt16
| AggregateFunc::SumInt32
| AggregateFunc::SumInt64
| AggregateFunc::SumUInt16
| AggregateFunc::SumUInt32
| AggregateFunc::SumUInt64 => Ok(Self::from(SimpleNumber::try_from_iter(iter)?)),
AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => {
Ok(Self::from(Float::try_from_iter(iter)?))
}
f if f.is_max() || f.is_min() || matches!(f, AggregateFunc::Count) => {
Ok(Self::from(OrdValue::try_from_iter(iter)?))
}
f => Err(InternalSnafu {
reason: format!(
"Accumulator does not support this aggregation function: {:?}",
f
),
}
.build()),
}
}
/// try to convert a vector of value into given aggregate function's accumulator
pub fn try_into_accum(aggr_fn: &AggregateFunc, state: Vec<Value>) -> Result<Self, EvalError> {
match aggr_fn {
@@ -623,6 +728,16 @@ impl Accum {
}
}
fn fail_accum<T>() -> EvalError {
InternalSnafu {
reason: format!(
"list of values exhausted before a accum of type {} can be build from it",
type_name::<T>()
),
}
.build()
}
fn err_try_from_val<T: Display>(reason: T) -> EvalError {
TryFromValueSnafu {
msg: reason.to_string(),
@@ -775,7 +890,9 @@ mod test {
let mut acc = Accum::new_accum(&aggr_fn)?;
acc.update_batch(&aggr_fn, input.clone())?;
let row = acc.into_state();
let acc = Accum::try_into_accum(&aggr_fn, row)?;
let acc = Accum::try_into_accum(&aggr_fn, row.clone())?;
let alter_acc = Accum::try_from_iter(&aggr_fn, &mut row.into_iter())?;
assert_eq!(acc, alter_acc);
Ok(acc)
};
let acc = match create_and_insert() {

View File

@@ -112,18 +112,21 @@ impl AggregateFunc {
/// Expect self to be accumulable aggregate function, i.e. sum/count
///
/// TODO(discord9): deal with overflow&better accumulator
pub fn eval_diff_accumulable<I>(
pub fn eval_diff_accumulable<A, I>(
&self,
accum: Vec<Value>,
accum: A,
value_diffs: I,
) -> Result<(Value, Vec<Value>), EvalError>
where
A: IntoIterator<Item = Value>,
I: IntoIterator<Item = (Value, Diff)>,
{
let mut accum = if accum.is_empty() {
let mut accum = accum.into_iter().peekable();
let mut accum = if accum.peek().is_none() {
Accum::new_accum(self)?
} else {
Accum::try_into_accum(self, accum)?
Accum::try_from_iter(self, &mut accum)?
};
accum.update_batch(self, value_diffs)?;
let res = accum.eval(self)?;

View File

@@ -21,12 +21,12 @@ mod reduce;
use datatypes::arrow::ipc::Map;
use serde::{Deserialize, Serialize};
pub(crate) use self::reduce::{AccumulablePlan, KeyValPlan, ReducePlan};
use crate::adapter::error::Error;
use crate::expr::{
AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr,
};
use crate::plan::join::JoinPlan;
pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
use crate::repr::{ColumnType, DiffRow, RelationType};
/// A plan for a dataflow component. But with type to indicate the output type of the relation.

View File

@@ -47,7 +47,33 @@ pub struct AccumulablePlan {
/// Each element represents:
/// (index of aggr output, index of value among inputs, aggr expr)
/// These will all be rendered together in one dataflow fragment.
pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
/// Same as above but for all of the `DISTINCT` accumulable aggregations.
pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
///
/// Invariant: the output index is the index of the aggregation in `full_aggrs`
/// which means output index is always smaller than the length of `full_aggrs`
pub simple_aggrs: Vec<AggrWithIndex>,
/// Same as `simple_aggrs` but for all of the `DISTINCT` accumulable aggregations.
pub distinct_aggrs: Vec<AggrWithIndex>,
}
/// Invariant: the output index is the index of the aggregation in `full_aggrs`
/// which means output index is always smaller than the length of `full_aggrs`
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
pub struct AggrWithIndex {
/// aggregation expression
pub expr: AggregateExpr,
/// index of aggr input among input row
pub input_idx: usize,
/// index of aggr output among output row
pub output_idx: usize,
}
impl AggrWithIndex {
/// Create a new `AggrWithIndex`
pub fn new(expr: AggregateExpr, input_idx: usize, output_idx: usize) -> Self {
Self {
expr,
input_idx,
output_idx,
}
}
}

View File

@@ -160,7 +160,7 @@ impl Row {
self.inner.iter()
}
/// eturns the number of elements in the row, also known as its 'length'.
/// Returns the number of elements in the row, also known as its 'length'.
pub fn len(&self) -> usize {
self.inner.len()
}

View File

@@ -52,7 +52,7 @@ use crate::expr::{
AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
TypedExpr, UnaryFunc, UnmaterializableFunc, VariadicFunc,
};
use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::{DataflowContext, FunctionExtensions};
@@ -265,9 +265,17 @@ impl TypedPlan {
reason: "Expect aggregate argument to be transformed into a column at this point",
})?;
if aggr_expr.distinct {
distinct_aggrs.push((output_column, input_column, aggr_expr.clone()));
distinct_aggrs.push(AggrWithIndex::new(
aggr_expr.clone(),
input_column,
output_column,
));
} else {
simple_aggrs.push((output_column, input_column, aggr_expr.clone()));
simple_aggrs.push(AggrWithIndex::new(
aggr_expr.clone(),
input_column,
output_column,
));
}
}
let accum_plan = AccumulablePlan {
@@ -327,7 +335,7 @@ mod test {
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![(0, 0, aggr_expr.clone())],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}),
@@ -379,7 +387,7 @@ mod test {
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![(0, 0, aggr_expr.clone())],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}),
@@ -430,7 +438,7 @@ mod test {
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![aggr_expr.clone()],
simple_aggrs: vec![(0, 0, aggr_expr.clone())],
simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)],
distinct_aggrs: vec![],
}),
}),

View File

@@ -131,6 +131,8 @@ pub struct Arrangement {
/// TODO: batch size balancing?
spine: Spine,
/// if set to false, will not update current value of the arrangement, useful for case like `map -> arrange -> reduce`
///
/// in which `arrange` operator only need future updates, and don't need to keep all updates
full_arrangement: bool,
/// flag to mark that this arrangement haven't been written to, so that it can be cloned and shared
is_written: bool,
@@ -138,6 +140,7 @@ pub struct Arrangement {
expire_state: Option<KeyExpiryManager>,
/// the time that the last compaction happened, also know as current time
last_compaction_time: Option<Timestamp>,
name: Vec<String>,
}
impl Arrangement {
@@ -149,10 +152,22 @@ impl Arrangement {
is_written: false,
expire_state: None,
last_compaction_time: None,
name: vec![],
}
}
/// apply updates into spine, all updates should have timestamps that are larger than spine's first key
pub fn new_with_name(name: Vec<String>) -> Self {
Self {
spine: Default::default(),
full_arrangement: false,
is_written: false,
expire_state: None,
last_compaction_time: None,
name,
}
}
/// apply updates into spine, with no respect of whether the updates are in futures, past, or now
///
/// return the maximum expire time(already expire by how much time) of all updates if any keys is already expired
pub fn apply_updates(
@@ -453,6 +468,11 @@ fn compact_diff_row(old_row: Option<DiffRow>, new_row: &DiffRow) -> Option<DiffR
}
}
/// Simply a type alias for ReadGuard of Arrangement
pub type ArrangeReader<'a> = tokio::sync::RwLockReadGuard<'a, Arrangement>;
/// Simply a type alias for WriteGuard of Arrangement
pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>;
/// A handler to the inner Arrangement, can be cloned and shared, useful for query it's inner state
#[derive(Debug)]
pub struct ArrangeHandler {
@@ -494,16 +514,22 @@ impl ArrangeHandler {
///
/// it's a cheap operation, since it's `Arc-ed` and only clone the `Arc`
pub fn clone_full_arrange(&self) -> Option<Self> {
if self.read().is_written {
return None;
{
let zelf = self.read();
if !zelf.full_arrangement && zelf.is_written {
return None;
}
}
let mut arr = self.write();
arr.full_arrangement = true;
drop(arr);
self.write().full_arrangement = true;
Some(Self {
inner: self.inner.clone(),
})
}
pub fn set_full_arrangement(&self, full: bool) {
self.write().full_arrangement = full;
}
}
#[cfg(test)]
@@ -566,7 +592,7 @@ mod test {
}
let arr2 = arr.clone_full_arrange();
assert!(arr2.is_none());
assert!(arr2.is_some());
{
let mut arr = arr.write();
assert_eq!(arr.spine.len(), 3);