feat(flow): render map&related tests (#3581)

* feat: render map&related tests

* chore: license header

* chore: update Cargo.lock&remove unused

* refactor: rename ComputeState to DataflowState

* chore: use org fork

* chore: fix typos

* chore: per review

* chore: more explain to use `VecDeque` in err collector

* chore: typos

* chore: more comment on `Plan::Let`

* chore: typos

* refactor mfp rendering

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: update `now` in closure

* feat: use insert_local

* chore: remove unused

* chore: per review

* chore: fmt comment

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
discord9
2024-04-08 19:36:07 +08:00
committed by GitHub
parent 32b9639d7c
commit 28fd0dc276
12 changed files with 867 additions and 35 deletions

43
Cargo.lock generated
View File

@@ -4059,9 +4059,8 @@ dependencies = [
[[package]]
name = "hydroflow"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5129724896b4c3cf12f8e5f5af2f1d94b4c5933ae911189747025c6a5ff1346"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
dependencies = [
"bincode",
"byteorder",
@@ -4092,9 +4091,8 @@ dependencies = [
[[package]]
name = "hydroflow_datalog"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41813c88b02f3bfa8f5962e125495aa47c8d382cf5d135b02da40af4342bc6fb"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
dependencies = [
"hydroflow_datalog_core",
"proc-macro-crate 1.3.1",
@@ -4105,9 +4103,8 @@ dependencies = [
[[package]]
name = "hydroflow_datalog_core"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea77a3b2f09bba3d461f9ce0dee28798d3b07dafe77fc46de4675155f5925e53"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
dependencies = [
"hydroflow_lang",
"proc-macro-crate 1.3.1",
@@ -4121,9 +4118,8 @@ dependencies = [
[[package]]
name = "hydroflow_lang"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3191eee8ef49b4a814e4c33a0ce0d7470b733dc6118ea744f7f15168c38803f"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
dependencies = [
"auto_impl",
"clap 4.4.11",
@@ -4142,9 +4138,8 @@ dependencies = [
[[package]]
name = "hydroflow_macro"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9be25d2a927fe4e6afe3e204786e968e983f53f313cc561950ff1cd09ecd92fc"
version = "0.6.0"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
dependencies = [
"hydroflow_lang",
"itertools 0.10.5",
@@ -4646,9 +4641,8 @@ dependencies = [
[[package]]
name = "lattices"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f3bff82353a971b61106a49369cfc1bd8398661107eadcb5387fcd21c43cac9"
version = "0.5.3"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
dependencies = [
"cc-traits",
"sealed",
@@ -7294,9 +7288,8 @@ checksum = "3b7e158a385023d209d6d5f2585c4b468f6dcb3dd5aca9b75c4f1678c05bb375"
[[package]]
name = "pusherator"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd486cb5153e0d8fa91d3daebae48917ae299b2569cc79901922f3923dc312ef"
version = "0.0.5"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
dependencies = [
"either",
"variadics",
@@ -11368,9 +11361,11 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "variadics"
version = "0.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4500f518837578bf2d62d9c12f47ecb5b5279da689574793b7bace8138b4784"
version = "0.0.4"
source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
dependencies = [
"sealed",
]
[[package]]
name = "vcpkg"

View File

@@ -16,7 +16,9 @@ common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
enum_dispatch = "0.3"
hydroflow = "0.5.0"
# This fork only exists to keep the dependency inside our org and to pin the version;
# it is identical to the upstream repo
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", rev = "ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" }
itertools.workspace = true
num-traits = "0.2"
serde.workspace = true

17
src/flow/src/compute.rs Normal file
View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod render;
mod state;
mod types;

View File

@@ -0,0 +1,536 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! In this file, `render` means converting a static `Plan` into an executable dataflow.
//!
//! The [`Context`] is the environment for the render process; it contains all the information needed to render a plan.
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use hydroflow::lattices::cc_traits::Get;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::port::{PortCtx, SEND};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use super::state::Scheduler;
use crate::adapter::error::{Error, EvalSnafu, InvalidQuerySnafu};
use crate::compute::state::DataflowState;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::expr::{
self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr,
};
use crate::plan::Plan;
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, Arrangement};
/// The context for building an operator identified by a `GlobalId`.
pub struct Context<'referred, 'df> {
/// id of the operator this context is building
pub id: GlobalId,
/// the `Hydroflow` graph that edges and subgraphs are added to while rendering
pub df: &'referred mut Hydroflow<'df>,
/// dataflow-wide state; rendering reads the current-time handle and the scheduler from it
pub compute_state: &'referred mut DataflowState,
/// a list of all collections being used in the operator
pub input_collection: BTreeMap<GlobalId, CollectionBundle>,
/// used by `Get`/`Let` Plan for getting/setting local variables
///
/// Each element is one scope; lookups search from the last scope to the first one.
///
/// TODO(discord9): consider if use Vec<(LocalId, CollectionBundle)> instead
local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
/// Collect all errors in this operator's evaluation
err_collector: ErrCollector,
}
impl<'referred, 'df> Drop for Context<'referred, 'df> {
fn drop(&mut self) {
for bundle in std::mem::take(&mut self.input_collection)
.into_values()
.chain(
std::mem::take(&mut self.local_scope)
.into_iter()
.flat_map(|v| v.into_iter())
.map(|(_k, v)| v),
)
{
bundle.collection.into_inner().drop(self.df);
drop(bundle.arranged);
}
// The automatically generated "drop glue" which recursively calls the destructors of all the fields (including the now empty `input_collection`)
}
}
impl<'referred, 'df> Context<'referred, 'df> {
    /// Register `collection` as the input collection known under the global `id`.
    pub fn insert_global(&mut self, id: GlobalId, collection: CollectionBundle) {
        self.input_collection.insert(id, collection);
    }

    /// Bind `collection` to the local variable `id` in the last (innermost) scope.
    ///
    /// When no scope exists yet, a first scope holding just this binding is created.
    pub fn insert_local(&mut self, id: LocalId, collection: CollectionBundle) {
        match self.local_scope.last_mut() {
            Some(innermost) => {
                innermost.insert(id, collection);
            }
            None => self.local_scope.push(BTreeMap::from([(id, collection)])),
        }
    }
}
// There is a false positive in using `Vec<ScalarExpr>` as key
#[allow(clippy::mutable_key_type)]
impl<'referred, 'df> Context<'referred, 'df> {
/// Render `plan` into the dataflow by dispatching on its variant.
///
/// Returns the output bundle of the rendered plan. `Reduce`, `Join` and `Union` are not
/// implemented yet and panic via `todo!()`.
pub fn render_plan(&mut self, plan: Plan) -> Result<CollectionBundle, Error> {
    match plan {
        Plan::Constant { rows } => {
            let bundle = self.render_constant(rows);
            Ok(bundle)
        }
        Plan::Get { id } => self.get_by_id(id),
        Plan::Let { id, value, body } => self.eval_let(id, value, body),
        Plan::Mfp { input, mfp } => {
            self.render_map_filter_project_into_executable_dataflow(input, mfp)
        }
        Plan::Reduce { .. } | Plan::Join { .. } | Plan::Union { .. } => todo!(),
    }
}
/// Render a constant: a source subgraph that hands `rows` to its output port.
///
/// The rows are moved out on the first non-empty run, so they are emitted only once.
pub fn render_constant(&mut self, mut rows: Vec<DiffRow>) -> CollectionBundle {
    let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");

    self.df
        .add_subgraph_source("Constant", send_port, move |_ctx, send_port| {
            // `rows` is left empty after the first emission, so later runs send nothing
            if !rows.is_empty() {
                send_port.give(std::mem::take(&mut rows));
            }
        });

    let collection = Collection::from_port(recv_port);
    CollectionBundle::from_collection(collection)
}
/// Look up the collection bound to `id` and return a copy of it.
///
/// Local ids are searched from the last scope to the first one, global ids are looked up in
/// `input_collection`.
///
/// # Errors
/// Returns an `InvalidQuery` error when nothing is bound to `id`.
pub fn get_by_id(&mut self, id: expr::Id) -> Result<CollectionBundle, Error> {
    let found = match id {
        expr::Id::Local(local) => self
            .local_scope
            .iter()
            .rev()
            .find_map(|scope| scope.get(&local))
            .with_context(|| InvalidQuerySnafu {
                reason: format!("Local variable {:?} not found", local),
            })?,
        expr::Id::Global(id) => {
            self.input_collection
                .get(&id)
                .with_context(|| InvalidQuerySnafu {
                    reason: format!("Collection {:?} not found", id),
                })?
        }
    };
    // copying a bundle needs the hydroflow instance it was created from
    Ok(found.clone(self.df))
}
/// Eval `Let` operator, useful for assigning a value to a local variable
pub fn eval_let(
&mut self,
id: LocalId,
value: Box<Plan>,
body: Box<Plan>,
) -> Result<CollectionBundle, Error> {
let value = self.render_plan(*value)?;
self.local_scope.push(Default::default());
self.insert_local(id, value);
let ret = self.render_plan(*body)?;
Ok(ret)
}
/// Render a `MapFilterProject`(`mfp` for short): render `input`, then add a subgraph named "mfp"
/// that passes every received update to `mfp_subgraph`.
///
/// Assume all incoming row's sys time being `now` and ignore the row's stated sys time
/// TODO(discord9): schedule mfp operator to run when temporal filter need
///
/// `MapFilterProject` is scheduled to run when there is enough amount of input updates
/// ***or*** when a future update in its output buffer(an `Arrangement`) is supposed to emit now.
///
/// Returns a bundle made of the output collection and one arrangement keyed by all input
/// columns (`0..input_arity`).
///
/// # Errors
/// Fails when rendering `input` fails or when `MfpPlan::create_from` rejects `mfp`.
pub fn render_map_filter_project_into_executable_dataflow(
&mut self,
input: Box<Plan>,
mfp: MapFilterProject,
) -> Result<CollectionBundle, Error> {
let input = self.render_plan(*input)?;
// TODO(discord9): consider if check if contain temporal to determine if
// need arrange or not, or does this added complexity worth it
let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("mfp");
let input_arity = mfp.input_arity;
// default to have a arrange with only future updates, so it can be empty if no temporal filter is applied
// as stream only sends current updates and etc.
// NOTE(review): `arrange_handler` wraps `arrange.clone()` while `arrange_handler_inner` wraps
// `arrange` itself; whether both handlers observe the same data depends on `Arrangement::clone`,
// which is not visible here - confirm the handler exposed in `arranged` sees the subgraph's writes.
let arrange = Arrangement::new();
let arrange_handler = ArrangeHandler::from(arrange.clone());
let arrange_handler_inner = ArrangeHandler::from(arrange);
// This closure capture following variables:
let mfp_plan = MfpPlan::create_from(mfp).context(EvalSnafu)?;
// shared handle to the current time, read again on every run of the subgraph
let now = self.compute_state.current_time_ref();
let err_collector = self.err_collector.clone();
// TODO(discord9): better way to schedule future run
let scheduler = self.compute_state.get_scheduler();
let scheduler_inner = scheduler.clone();
let subgraph = self.df.add_subgraph_in_out(
"mfp",
input.collection.into_inner(),
out_send_port,
move |_ctx, recv, send| {
// mfp only need to passively receive updates from recvs
let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
mfp_subgraph(
&arrange_handler_inner,
data,
&mfp_plan,
*now.borrow(),
&err_collector,
&scheduler_inner,
send,
);
},
);
// register current subgraph in scheduler for future scheduling
// (`scheduler_inner` is a clone of `scheduler`, and `Scheduler` keeps the id behind an `Rc`,
// so the closure above sees the id set here)
scheduler.set_cur_subgraph(subgraph);
let arranged = BTreeMap::from([(
(0..input_arity).map(ScalarExpr::Column).collect_vec(),
Arranged::new(arrange_handler),
)]);
let bundle = CollectionBundle {
collection: Collection::from_port(out_recv_port),
arranged,
};
Ok(bundle)
}
}
/// One run of the mfp operator.
///
/// Evaluates `mfp_plan` on `input` at time `now`, stores the resulting updates in `arrange`,
/// sends the updates that became visible since the last compaction to `send`, compacts the
/// arrangement up to `now` and, if the arrangement reports a next update time, schedules the
/// next run through `scheduler`. Errors are pushed into `err_collector` instead of being
/// returned.
fn mfp_subgraph(
    arrange: &ArrangeHandler,
    input: impl IntoIterator<Item = DiffRow>,
    mfp_plan: &MfpPlan,
    now: repr::Timestamp,
    err_collector: &ErrCollector,
    scheduler: &Scheduler,
    send: &PortCtx<SEND, Toff>,
) {
    // evaluate the input and write the result into the arrangement
    err_collector.run(|| {
        let evaluated = eval_mfp_core(input, mfp_plan, now, err_collector);
        arrange.write().apply_updates(now, evaluated)?;
        Ok(())
    });

    // Deal with output:
    // 1. read everything emitted after the last compaction time up to (and including) `now`
    // 2. send it downstream
    // 3. compact that range away
    let window_start = match arrange.read().last_compaction_time() {
        Some(last_compacted) => last_compacted + 1,
        None => repr::Timestamp::MIN,
    };
    let visible = arrange.read().get_updates_in_range(window_start..=now);
    // downstream only expects the key, the value part is dropped
    let emitted = visible
        .into_iter()
        .map(|((key, _val), ts, diff)| (key, ts, diff))
        .collect::<Vec<_>>();
    send.give(emitted);

    err_collector.run(|| {
        arrange.write().compaction_to(now)?;
        Ok(())
    });

    // schedule the next time this operator should run
    if let Some(next_run) = arrange.read().get_next_update_time(&now) {
        scheduler.schedule_at(next_run)
    }
}
/// Evaluate `mfp_plan` on every row of `input` at time `now`.
///
/// Successful results are returned as key/value updates with an empty value row. A failing
/// result does not stop the evaluation: its error is pushed into `err_collector` and the
/// remaining results and rows are still processed. The sys time stated on the input rows is
/// ignored.
fn eval_mfp_core(
    input: impl IntoIterator<Item = DiffRow>,
    mfp_plan: &MfpPlan,
    now: repr::Timestamp,
    err_collector: &ErrCollector,
) -> Vec<KeyValDiffRow> {
    let mut collected = Vec::new();
    for (mut row, _sys_time, diff) in input {
        // a single row is expected to produce only zero to two updates
        // TODO(discord9): refactor error handling
        for result in mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff) {
            match result {
                Ok((key, ts, diff)) => collected.push(((key, Row::empty()), ts, diff)),
                Err((err, _ts, _diff)) => err_collector.push_err(err),
            }
        }
    }
    collected
}
#[cfg(test)]
mod test {
use std::cell::RefCell;
use std::rc::Rc;
use common_time::DateTime;
use datatypes::data_type::ConcreteDataType;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::graph_ext::GraphExt;
use hydroflow::scheduled::handoff::VecHandoff;
use super::*;
use crate::expr::BinaryFunc;
use crate::repr::Row;
/// Build a `Context` for tests on top of `df` and `state`, with no inputs and no local scopes.
fn harness_test_ctx<'r, 'h>(
    df: &'r mut Hydroflow<'h>,
    state: &'r mut DataflowState,
) -> Context<'r, 'h> {
    Context {
        id: GlobalId::User(0),
        df,
        // fetched before `state` is moved into `compute_state`
        err_collector: state.get_err_collector(),
        compute_state: state,
        input_collection: BTreeMap::new(),
        local_scope: Vec::new(),
    }
}
/// Checks the temporal filter: every row is inserted at time 0 and the mfp operator schedules
/// its own deletion of each row at the time the row stops matching the filter.
#[test]
fn test_render_mfp_with_temporal() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    // a row with a single i64 column
    let row = |v: i64| Row::new(vec![v.into()]);

    let rows = vec![(row(1), 1, 1), (row(2), 2, 1), (row(3), 3, 1)];
    let collection = ctx.render_constant(rows.clone());
    ctx.insert_global(GlobalId::User(1), collection);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    };

    // temporal filter: now <= col(0) < now + 4
    let now_expr = || ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now);
    let to_datetime = || expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype());
    let lower_bound = ScalarExpr::Column(0)
        .call_unary(to_datetime())
        .call_binary(now_expr(), BinaryFunc::Gte);
    let upper_bound = ScalarExpr::Column(0)
        .call_binary(
            ScalarExpr::literal(4i64.into(), ConcreteDataType::int64_datatype()),
            BinaryFunc::SubInt64,
        )
        .call_unary(to_datetime())
        .call_binary(now_expr(), BinaryFunc::Lt);
    let mfp = MapFilterProject::new(1)
        .filter(vec![lower_bound, upper_bound])
        .unwrap();

    let mut bundle = ctx
        .render_map_filter_project_into_executable_dataflow(Box::new(input_plan), mfp)
        .unwrap();
    let collection = bundle.collection;
    // kept alive until the end of the test
    let _arranged = bundle.arranged.pop_first().unwrap().1;

    // the sink stores what it received in its latest run
    let output = Rc::new(RefCell::new(vec![]));
    let output_inner = output.clone();
    let _subgraph = ctx.df.add_subgraph_sink(
        "test_render_constant",
        collection.into_inner(),
        move |_ctx, recv| {
            let received = recv
                .take_inner()
                .into_iter()
                .flat_map(|v| v.into_iter())
                .collect_vec();
            *output_inner.borrow_mut() = received;
        },
    );
    // drop ctx here to simulate actual process of compile first, run later scenario
    drop(ctx);

    // expected output keyed by the time at which it is emitted
    let expected_output = BTreeMap::from([
        (0, vec![(row(1), 0, 1), (row(2), 0, 1), (row(3), 0, 1)]),
        (2, vec![(row(1), 2, -1)]),
        (3, vec![(row(2), 3, -1)]),
        (4, vec![(row(3), 4, -1)]),
    ]);

    for now in 0i64..5 {
        state.set_current_ts(now);
        state.run_available_with_schedule(&mut df);
        assert!(state.get_err_collector().inner.borrow().is_empty());

        // nothing is expected at times that are missing from the map
        let expected = expected_output.get(&now).cloned().unwrap_or_default();
        assert_eq!(*output.borrow(), expected);
        output.borrow_mut().clear();
    }
}
/// test if mfp operator without temporal filter works properly
/// that is it filter the rows correctly
///
/// The sink only records what it receives; the assertion runs after the dataflow has run, so
/// the test also fails when the sink is never invoked or receives nothing.
#[test]
fn test_render_mfp() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    let rows = vec![
        (Row::new(vec![1.into()]), 1, 1),
        (Row::new(vec![2.into()]), 2, 1),
        (Row::new(vec![3.into()]), 3, 1),
    ];
    let collection = ctx.render_constant(rows.clone());
    ctx.insert_global(GlobalId::User(1), collection);
    let input_plan = Plan::Get {
        id: expr::Id::Global(GlobalId::User(1)),
    };
    // filter: col(0)>1
    let mfp = MapFilterProject::new(1)
        .filter(vec![ScalarExpr::Column(0).call_binary(
            ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
            BinaryFunc::Gt,
        )])
        .unwrap();
    let bundle = ctx
        .render_map_filter_project_into_executable_dataflow(Box::new(input_plan), mfp)
        .unwrap();
    let collection = bundle.collection.clone(ctx.df);

    // everything the sink receives, across all of its runs
    let output = Rc::new(RefCell::new(vec![]));
    let output_inner = output.clone();
    ctx.df.add_subgraph_sink(
        "test_render_constant",
        collection.into_inner(),
        move |_ctx, recv| {
            let data = recv.take_inner();
            let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
            output_inner.borrow_mut().extend(res);
        },
    );
    drop(ctx);

    df.run_available();

    assert_eq!(
        *output.borrow(),
        vec![
            (Row::new(vec![2.into()]), 0, 1),
            (Row::new(vec![3.into()]), 0, 1),
        ]
    );
}
/// Checks that the constant operator emits its rows exactly once: running the dataflow a
/// second time must not deliver the rows again.
#[test]
fn test_render_constant() {
    let mut df = Hydroflow::new();
    let mut state = DataflowState::default();
    let mut ctx = harness_test_ctx(&mut df, &mut state);

    let rows = vec![
        (Row::empty(), 1, 1),
        (Row::empty(), 2, 1),
        (Row::empty(), 3, 1),
    ];
    let bundle = ctx.render_constant(rows.clone());
    let collection = bundle.collection.clone(ctx.df);

    // total number of rows seen by the sink
    let received = Rc::new(RefCell::new(0));
    let received_inner = Rc::clone(&received);
    ctx.df.add_subgraph_sink(
        "test_render_constant",
        collection.into_inner(),
        move |_ctx, recv| {
            let batches = recv.take_inner();
            let row_count = batches.iter().map(|batch| batch.len()).sum::<usize>();
            *received_inner.borrow_mut() += row_count;
        },
    );

    ctx.df.run_available();
    assert_eq!(*received.borrow(), 3);

    // a second run must not emit the rows again
    ctx.df.run_available();
    assert_eq!(*received.borrow(), 3);
}
/// Minimal source -> sink example: the source gives the numbers 0..10 one by one, the sink adds
/// up everything it receives.
#[test]
fn example_source_sink() {
    let mut df = Hydroflow::new();
    let (send_port, recv_port) = df.make_edge::<_, VecHandoff<i32>>("test_handoff");

    df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
        for i in 0..10 {
            send.give(vec![i]);
        }
    });

    let total = Rc::new(RefCell::new(0));
    let total_inner = Rc::clone(&total);
    let sink = df.add_subgraph_sink("test_handoff_sink", recv_port, move |_ctx, recv| {
        let received = recv.take_inner();
        let batch_sum = received.iter().sum::<i32>();
        *total_inner.borrow_mut() += batch_sum;
    });

    df.run_available();
    assert_eq!(*total.borrow(), 45);

    // scheduling the sink again must not change the sum
    df.schedule_subgraph(sink);
    df.run_available();
    assert_eq!(*total.borrow(), 45);
}
}

View File

@@ -0,0 +1,106 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::rc::Rc;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::SubgraphId;
use crate::compute::types::ErrCollector;
use crate::repr::{self, Timestamp};
/// input/output of a dataflow
/// One `DataflowState` manage the input/output/schedule of one `Hydroflow`
#[derive(Default)]
pub struct DataflowState {
/// subgraphs waiting to be scheduled, keyed by the timestamp at which they should run
///
/// it is important to use a deque to maintain the order of subgraph here
/// TODO(discord9): consider dedup? Also not necessary for hydroflow itself also do dedup when schedule
schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
/// Frontier (in sys time) before which updates should not be emitted.
///
/// We *must* apply it to sinks, to ensure correct outputs.
/// We *should* apply it to sources and imported shared state, because it improves performance.
/// Which means it's also the current time in temporal filter to get current correct result
as_of: Rc<RefCell<Timestamp>>,
/// error collector local to this `DataflowState`,
/// useful for distinguishing errors from different `Hydroflow`
err_collector: ErrCollector,
}
impl DataflowState {
    /// Schedule every subgraph whose scheduled time is <= `as_of`, then call `run_available()`.
    ///
    /// Entries scheduled for a later time stay in the schedule.
    ///
    /// return true if any subgraph actually executed
    pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
        let due = {
            let mut pending = self.schedule_subgraph.borrow_mut();
            // `split_off` keeps the keys <= as_of in place and returns the later ones,
            // so put the later ones back and take the due ones out
            let later = pending.split_off(&(*self.as_of.borrow() + 1));
            std::mem::replace(&mut *pending, later)
        };
        for subgraph in due.into_values().flatten() {
            df.schedule_subgraph(subgraph);
        }
        df.run_available()
    }

    /// Create a `Scheduler` that shares this state's schedule and has no current subgraph yet.
    pub fn get_scheduler(&self) -> Scheduler {
        let schedule_subgraph = Rc::clone(&self.schedule_subgraph);
        let cur_subgraph = Rc::new(RefCell::new(None));
        Scheduler {
            schedule_subgraph,
            cur_subgraph,
        }
    }

    /// return a handle to the current time, will update when `as_of` is updated
    ///
    /// so it can keep track of the current time even in a closure that is called later
    pub fn current_time_ref(&self) -> Rc<RefCell<Timestamp>> {
        Rc::clone(&self.as_of)
    }

    /// The current time (`as_of`) by value.
    pub fn current_ts(&self) -> Timestamp {
        let now = self.as_of.borrow();
        *now
    }

    /// Set the current time (`as_of`) to `ts`; visible through every handle from
    /// `current_time_ref`.
    pub fn set_current_ts(&mut self, ts: Timestamp) {
        *self.as_of.borrow_mut() = ts;
    }

    /// A handle to the error collector of this state (the errors are shared, not copied).
    pub fn get_err_collector(&self) -> ErrCollector {
        self.err_collector.clone()
    }
}
/// A handle used to schedule one subgraph to run at a later timestamp.
///
/// Both fields are reference counted, so clones of a `Scheduler` share the schedule and the
/// current subgraph id.
#[derive(Clone)]
pub struct Scheduler {
/// pending runs, keyed by the timestamp at which the subgraphs should run
schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
/// the subgraph that `schedule_at` schedules, `None` until `set_cur_subgraph` is called
cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
}
impl Scheduler {
    /// Queue the current subgraph to run at `next_run_time`.
    ///
    /// # Panics
    /// Panics if no subgraph has been set with `set_cur_subgraph` yet.
    pub fn schedule_at(&self, next_run_time: Timestamp) {
        let mut pending = self.schedule_subgraph.borrow_mut();
        let target = (*self.cur_subgraph.borrow()).expect("Set SubgraphId before schedule");
        pending
            .entry(next_run_time)
            .or_default()
            .push_back(target);
    }

    /// Set the subgraph that later `schedule_at` calls will schedule.
    pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
        *self.cur_subgraph.borrow_mut() = Some(subgraph);
    }
}

View File

@@ -0,0 +1,162 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::rc::Rc;
use std::sync::Arc;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::scheduled::handoff::TeeingHandoff;
use hydroflow::scheduled::port::RecvPort;
use hydroflow::scheduled::SubgraphId;
use tokio::sync::RwLock;
use crate::compute::render::Context;
use crate::expr::{EvalError, ScalarExpr};
use crate::repr::DiffRow;
use crate::utils::{ArrangeHandler, Arrangement};
/// The handoff type used for edges that carry `DiffRow` updates: a `TeeingHandoff` of `DiffRow`
pub type Toff = TeeingHandoff<DiffRow>;
/// A collection, representing a stream of data that is received from a handoff.
pub struct Collection<T: 'static> {
/// the receiving port of a `TeeingHandoff`; updates of this collection are received from it
stream: RecvPort<TeeingHandoff<T>>,
}
impl<T: 'static + Clone> Collection<T> {
pub fn from_port(port: RecvPort<TeeingHandoff<T>>) -> Self {
Collection { stream: port }
}
/// clone a collection, require a mutable reference to the hydroflow instance
///
/// Note: need to be the same hydroflow instance that this collection is created from
pub fn clone(&self, df: &mut Hydroflow) -> Self {
Collection {
stream: self.stream.tee(df),
}
}
pub fn into_inner(self) -> RecvPort<TeeingHandoff<T>> {
self.stream
}
}
/// Arranged is a wrapper around `ArrangeHandler` that maintains a list of readers and a writer
pub struct Arranged {
/// handle to the wrapped arrangement
pub arrangement: ArrangeHandler,
/// subgraph id of the writer, `None` when created by `Arranged::new`
/// (presumably the subgraph that writes to the arrangement - nothing in this file sets it)
pub writer: Rc<RefCell<Option<SubgraphId>>>,
/// maintain a list of readers for the arrangement for the ease of scheduling
pub readers: Rc<RefCell<Vec<SubgraphId>>>,
}
impl Arranged {
pub fn new(arr: ArrangeHandler) -> Self {
Self {
arrangement: arr,
writer: Default::default(),
readers: Default::default(),
}
}
/// Copy it's future only updates, internally `Rc-ed` so it's cheap to copy
pub fn try_copy_future(&self) -> Option<Self> {
self.arrangement
.clone_future_only()
.map(|arrangement| Arranged {
arrangement,
readers: self.readers.clone(),
writer: self.writer.clone(),
})
}
/// Copy the full arrangement, including the future and the current updates.
///
/// Internally `Rc-ed` so it's cheap to copy
pub fn try_copy_full(&self) -> Option<Self> {
self.arrangement
.clone_full_arrange()
.map(|arrangement| Arranged {
arrangement,
readers: self.readers.clone(),
writer: self.writer.clone(),
})
}
pub fn add_reader(&self, id: SubgraphId) {
self.readers.borrow_mut().push(id)
}
}
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it contains at least one valid source of data:
/// a collection, one or more arrangements, or both. This is for convenience
/// of reading the data from the collection.
pub struct CollectionBundle {
/// This is useful for passively reading the new updates from the collection
pub collection: Collection<DiffRow>,
/// the key [`ScalarExpr`]s indicate how the keys(also a [`Row`]) used in `Arranged` are extracted from the collection's [`Row`]
/// So it is the "index" of the arrangement
///
/// The `Arranged` is the actual data source, it can be used to read the data from the collection by
/// using the key indicated by the `Vec<ScalarExpr>`
pub arranged: BTreeMap<Vec<ScalarExpr>, Arranged>,
}
impl CollectionBundle {
    /// Create a bundle that only has a collection and no arrangement.
    pub fn from_collection(collection: Collection<DiffRow>) -> Self {
        Self {
            collection,
            arranged: BTreeMap::new(),
        }
    }

    /// Copy the bundle: the collection is tee-ed through `df` and every arrangement is copied
    /// with `try_copy_future`.
    ///
    /// # Panics
    /// Panics if one of the arrangements cannot be copied (`try_copy_future` returns `None`).
    pub fn clone(&self, df: &mut Hydroflow) -> Self {
        let collection = self.collection.clone(df);

        let mut arranged = BTreeMap::new();
        for (key, arrangement) in &self.arranged {
            arranged.insert(key.clone(), arrangement.try_copy_future().unwrap());
        }

        Self {
            collection,
            arranged,
        }
    }
}
/// A thread local error collector, used to collect errors during the evaluation of the plan
///
/// usually only the first error matters, but store all of them just in case
///
/// Using a `VecDeque` to preserve the order of errors
/// when running dataflow continuously and need errors in order
///
/// Cloning is cheap: the error queue is behind an `Rc`, so all clones share the same queue
#[derive(Default, Clone)]
pub struct ErrCollector {
/// the collected errors, in the order they were pushed
pub inner: Rc<RefCell<VecDeque<EvalError>>>,
}
impl ErrCollector {
    /// Append `err` to the end of the collected errors.
    pub fn push_err(&self, err: EvalError) {
        let mut errors = self.inner.borrow_mut();
        errors.push_back(err)
    }

    /// Run `f` and collect its error, if it returns one.
    ///
    /// The queue is not borrowed while `f` runs, so `f` may push errors itself.
    pub fn run<F>(&self, f: F)
    where
        F: FnOnce() -> Result<(), EvalError>,
    {
        match f() {
            Ok(()) => {}
            Err(err) => self.push_err(err),
        }
    }
}

View File

@@ -26,7 +26,7 @@ use crate::expr::error::{
TypeMismatchSnafu,
};
use crate::expr::{InvalidArgumentSnafu, ScalarExpr};
use crate::repr::Row;
use crate::repr::{value_to_internal_ts, Row};
/// UnmaterializableFunc is a function that can't be eval independently,
/// and require special handling
@@ -80,13 +80,17 @@ impl UnaryFunc {
}
}
Self::StepTimestamp => {
let ty = arg.data_type();
if let Value::DateTime(datetime) = arg {
let datetime = DateTime::from(datetime.val() + 1);
Ok(Value::from(datetime))
} else if let Ok(v) = value_to_internal_ts(arg) {
let datetime = DateTime::from(v + 1);
Ok(Value::from(datetime))
} else {
TypeMismatchSnafu {
expected: ConcreteDataType::datetime_datatype(),
actual: arg.data_type(),
actual: ty,
}
.fail()?
}

View File

@@ -97,7 +97,7 @@ impl AggregateFunc {
/// Eval value, diff with accumulator
///
/// Expect self to be accumulable aggregate functio, i.e. sum/count
/// Expect self to be accumulable aggregate function, i.e. sum/count
///
/// TODO(discord9): deal with overflow&better accumulator
pub fn eval_diff_accumulable<I>(

View File

@@ -16,6 +16,7 @@
#![allow(unused_imports)]
// allow unused for now because it should be use later
mod adapter;
mod compute;
mod expr;
mod plan;
mod repr;

View File

@@ -42,14 +42,21 @@ pub enum Plan {
/// Get CDC data from an source, be it external reference to an existing source or an internal
/// reference to a `Let` identifier
Get { id: Id },
/// Create a temporary collection from given `value``, and make this bind only available
/// Create a temporary collection from given `value`, and make this bind only available
/// in scope of `body`
///
/// Similar to this rust code snippet:
/// ```rust, ignore
/// {
/// let id = value;
/// body
/// }
Let {
id: LocalId,
value: Box<Plan>,
body: Box<Plan>,
},
/// Map, Filter, and Project operators.
/// Map, Filter, and Project operators. Chained together.
Mfp {
/// The input collection.
input: Box<Plan>,

View File

@@ -51,6 +51,8 @@ pub type DiffRow = (Row, Timestamp, Diff);
pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// Convert a value that is or can be converted to Datetime to internal timestamp
///
/// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`
pub fn value_to_internal_ts(value: Value) -> Result<Timestamp, EvalError> {
let is_supported_time_type = |arg: &Value| {
let ty = arg.data_type();

View File

@@ -207,7 +207,7 @@ impl Arrangement {
}
/// get the last compaction time
pub fn get_compaction(&self) -> Option<Timestamp> {
pub fn last_compaction_time(&self) -> Option<Timestamp> {
self.last_compaction_time
}
@@ -253,7 +253,7 @@ impl Arrangement {
/// advance time to `now` and consolidate all older(`now` included) updates to the first key
///
/// return the maximum expire time(already expire by how much time) of all updates if any keys is already expired
pub fn set_compaction(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
pub fn compaction_to(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
let mut max_late_by: Option<Duration> = None;
let should_compact = self.split_lte(&now);
@@ -546,7 +546,7 @@ mod test {
vec![((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 1, 1)]
);
assert_eq!(arr.spine.len(), 3);
arr.set_compaction(1).unwrap();
arr.compaction_to(1).unwrap();
assert_eq!(arr.spine.len(), 3);
}
@@ -555,7 +555,7 @@ mod test {
{
let mut arr = arr.write();
assert_eq!(arr.spine.len(), 3);
arr.set_compaction(2).unwrap();
arr.compaction_to(2).unwrap();
assert_eq!(arr.spine.len(), 2);
}
}
@@ -605,7 +605,7 @@ mod test {
]
);
assert_eq!(arr.spine.len(), 3);
arr.set_compaction(1).unwrap();
arr.compaction_to(1).unwrap();
assert_eq!(arr.spine.len(), 3);
}
{