diff --git a/Cargo.lock b/Cargo.lock
index b1c18ed53f..95b98d170a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4059,9 +4059,8 @@ dependencies = [
 
 [[package]]
 name = "hydroflow"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a5129724896b4c3cf12f8e5f5af2f1d94b4c5933ae911189747025c6a5ff1346"
+version = "0.6.0"
+source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
 dependencies = [
  "bincode",
  "byteorder",
@@ -4092,9 +4091,8 @@ dependencies = [
 
 [[package]]
 name = "hydroflow_datalog"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "41813c88b02f3bfa8f5962e125495aa47c8d382cf5d135b02da40af4342bc6fb"
+version = "0.6.0"
+source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
 dependencies = [
  "hydroflow_datalog_core",
  "proc-macro-crate 1.3.1",
@@ -4105,9 +4103,8 @@ dependencies = [
 
 [[package]]
 name = "hydroflow_datalog_core"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ea77a3b2f09bba3d461f9ce0dee28798d3b07dafe77fc46de4675155f5925e53"
+version = "0.6.0"
+source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
 dependencies = [
  "hydroflow_lang",
  "proc-macro-crate 1.3.1",
@@ -4121,9 +4118,8 @@ dependencies = [
 
 [[package]]
 name = "hydroflow_lang"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f3191eee8ef49b4a814e4c33a0ce0d7470b733dc6118ea744f7f15168c38803f"
+version = "0.6.0"
+source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
 dependencies = [
  "auto_impl",
  "clap 4.4.11",
@@ -4142,9 +4138,8 @@ dependencies = [
 
 [[package]]
 name = "hydroflow_macro"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9be25d2a927fe4e6afe3e204786e968e983f53f313cc561950ff1cd09ecd92fc"
+version = "0.6.0"
+source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
 dependencies = [
  "hydroflow_lang",
  "itertools 0.10.5",
@@ -4646,9 +4641,8 @@ dependencies = [
 
 [[package]]
 name = "lattices"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4f3bff82353a971b61106a49369cfc1bd8398661107eadcb5387fcd21c43cac9"
+version = "0.5.3"
+source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
 dependencies = [
  "cc-traits",
  "sealed",
@@ -7294,9 +7288,8 @@ checksum = "3b7e158a385023d209d6d5f2585c4b468f6dcb3dd5aca9b75c4f1678c05bb375"
 
 [[package]]
 name = "pusherator"
-version = "0.0.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd486cb5153e0d8fa91d3daebae48917ae299b2569cc79901922f3923dc312ef"
+version = "0.0.5"
+source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
 dependencies = [
  "either",
  "variadics",
@@ -11368,9 +11361,11 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
 
 [[package]]
 name = "variadics"
-version = "0.0.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c4500f518837578bf2d62d9c12f47ecb5b5279da689574793b7bace8138b4784"
+version = "0.0.4"
+source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94"
+dependencies = [
+ "sealed",
+]
 
 [[package]]
 name = "vcpkg"
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4500f518837578bf2d62d9c12f47ecb5b5279da689574793b7bace8138b4784" +version = "0.0.4" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +dependencies = [ + "sealed", +] [[package]] name = "vcpkg" diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index ab8a090d71..d0eed66643 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -16,7 +16,9 @@ common-telemetry.workspace = true common-time.workspace = true datatypes.workspace = true enum_dispatch = "0.3" -hydroflow = "0.5.0" +# This fork is simply for keeping our dependency in our org, and pin the version +# it is the same with upstream repo +hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", rev = "ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" } itertools.workspace = true num-traits = "0.2" serde.workspace = true diff --git a/src/flow/src/compute.rs b/src/flow/src/compute.rs new file mode 100644 index 0000000000..bb17c79dd8 --- /dev/null +++ b/src/flow/src/compute.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod render; +mod state; +mod types; diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs new file mode 100644 index 0000000000..2effabad5c --- /dev/null +++ b/src/flow/src/compute/render.rs @@ -0,0 +1,536 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! In this file, `render` means convert a static `Plan` into a Executable Dataflow +//! +//! 
+
+// There is a false positive in using `Vec<ScalarExpr>` as a key
+#[allow(clippy::mutable_key_type)]
+impl<'referred, 'df> Context<'referred, 'df> {
+    /// Interpret and execute the plan
+    ///
+    /// return the output of this plan
+    pub fn render_plan(&mut self, plan: Plan) -> Result<CollectionBundle, Error> {
+        match plan {
+            Plan::Constant { rows } => Ok(self.render_constant(rows)),
+            Plan::Get { id } => self.get_by_id(id),
+            Plan::Let { id, value, body } => self.eval_let(id, value, body),
+            Plan::Mfp { input, mfp } => self.render_mfp(input, mfp),
+            Plan::Reduce { .. } => todo!(),
+            Plan::Join { .. } => todo!(),
+            Plan::Union { .. } => todo!(),
+        }
+    }
+
+    /// render a Constant plan; emits the `rows` only once.
+    pub fn render_constant(&mut self, mut rows: Vec<DiffRow>) -> CollectionBundle {
+        let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant");
+
+        self.df
+            .add_subgraph_source("Constant", send_port, move |_ctx, send_port| {
+                if rows.is_empty() {
+                    return;
+                }
+                send_port.give(std::mem::take(&mut rows));
+            });
+
+        CollectionBundle::from_collection(Collection::from_port(recv_port))
+    }
+
+    pub fn get_by_id(&mut self, id: expr::Id) -> Result<CollectionBundle, Error> {
+        let ret = match id {
+            expr::Id::Local(local) => {
+                let bundle = self
+                    .local_scope
+                    .iter()
+                    .rev()
+                    .find_map(|scope| scope.get(&local))
+                    .with_context(|| InvalidQuerySnafu {
+                        reason: format!("Local variable {:?} not found", local),
+                    })?;
+                bundle.clone(self.df)
+            }
+            expr::Id::Global(id) => {
+                let bundle = self
+                    .input_collection
+                    .get(&id)
+                    .with_context(|| InvalidQuerySnafu {
+                        reason: format!("Collection {:?} not found", id),
+                    })?;
+                bundle.clone(self.df)
+            }
+        };
+        Ok(ret)
+    }
+
+    /// Eval the `Let` operator, useful for assigning a value to a local variable
+    pub fn eval_let(
+        &mut self,
+        id: LocalId,
+        value: Box<Plan>,
+        body: Box<Plan>,
+    ) -> Result<CollectionBundle, Error> {
+        let value = self.render_plan(*value)?;
+
+        self.local_scope.push(Default::default());
+        self.insert_local(id, value);
+        let ret = self.render_plan(*body)?;
+        Ok(ret)
+    }
+
+    /// render MapFilterProject, will only emit updates once; assume all incoming rows' sys time to be `now` and ignore each row's stated sys time
+    /// TODO(discord9): schedule the mfp operator to run when the temporal filter needs it
+    ///
+    /// `MapFilterProject` (`mfp` for short) is scheduled to run when there are enough input updates
+    /// ***or*** when a future update in its output buffer (an `Arrangement`) is supposed to emit now.
+    pub fn render_mfp(
+        &mut self,
+        input: Box<Plan>,
+        mfp: MapFilterProject,
+    ) -> Result<CollectionBundle, Error> {
+        let input = self.render_plan(*input)?;
+        // TODO(discord9): consider checking whether the plan contains a temporal filter to determine
+        // if an arrangement is needed at all, or whether that added complexity is worth it
+        let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("mfp");
+        let input_arity = mfp.input_arity;
+
+        // default to an arrangement holding only future updates, so it can stay empty if no temporal
+        // filter is applied, as the stream only sends current updates
+        let arrange = Arrangement::new();
+        let arrange_handler = ArrangeHandler::from(arrange.clone());
+        let arrange_handler_inner = ArrangeHandler::from(arrange);
+
+        // This closure captures the following variables:
+        let mfp_plan = MfpPlan::create_from(mfp).context(EvalSnafu)?;
+        let now = self.compute_state.current_time_ref();
+
+        let err_collector = self.err_collector.clone();
+
+        // TODO(discord9): better way to schedule future runs
+        let scheduler = self.compute_state.get_scheduler();
+        let scheduler_inner = scheduler.clone();
+
+        let subgraph = self.df.add_subgraph_in_out(
+            "mfp",
+            input.collection.into_inner(),
+            out_send_port,
+            move |_ctx, recv, send| {
+                // mfp only needs to passively receive updates from recvs
+                let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
+
+                mfp_subgraph(
+                    &arrange_handler_inner,
+                    data,
+                    &mfp_plan,
+                    *now.borrow(),
+                    &err_collector,
+                    &scheduler_inner,
+                    send,
+                );
+            },
+        );
+
+        // register the current subgraph in the scheduler for future scheduling
+        scheduler.set_cur_subgraph(subgraph);
+
+        let arranged = BTreeMap::from([(
+            (0..input_arity).map(ScalarExpr::Column).collect_vec(),
+            Arranged::new(arrange_handler),
+        )]);
+
+        let bundle = CollectionBundle {
+            collection: Collection::from_port(out_recv_port),
+            arranged,
+        };
+        Ok(bundle)
+    }
+}
+
+fn mfp_subgraph(
+    arrange: &ArrangeHandler,
+    input: impl IntoIterator<Item = DiffRow>,
+    mfp_plan: &MfpPlan,
+    now: repr::Timestamp,
+    err_collector: &ErrCollector,
+    scheduler: &Scheduler,
+    send: &PortCtx<SEND, Toff>,
+) {
+    let run_mfp = || {
+        let all_updates = eval_mfp_core(input, mfp_plan, now, err_collector);
+        arrange.write().apply_updates(now, all_updates)?;
+        Ok(())
+    };
+    err_collector.run(run_mfp);
+
+    // Deal with output:
+    // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
+    // 2. Output the updates.
+    // 3. Truncate all updates within that range.
+
+    let from = arrange.read().last_compaction_time().map(|n| n + 1);
+    let from = from.unwrap_or(repr::Timestamp::MIN);
+    let output_kv = arrange.read().get_updates_in_range(from..=now);
+    // the output is expected to be key -> empty val
+    let output = output_kv
+        .into_iter()
+        .map(|((key, _v), ts, diff)| (key, ts, diff))
+        .collect_vec();
+    send.give(output);
+    let run_compaction = || {
+        arrange.write().compaction_to(now)?;
+        Ok(())
+    };
+    err_collector.run(run_compaction);
+
+    // schedule the next time this operator should run
+    if let Some(i) = arrange.read().get_next_update_time(&now) {
+        scheduler.schedule_at(i)
+    }
+}
+
+/// The core of evaluating the MFP operator: given an MFP plan and an input, evaluate the operator
+/// and return the output updates **and** possibly any number of errors that occurred during the evaluation
+fn eval_mfp_core(
+    input: impl IntoIterator<Item = DiffRow>,
+    mfp_plan: &MfpPlan,
+    now: repr::Timestamp,
+    err_collector: &ErrCollector,
+) -> Vec<KeyValDiffRow> {
+    let mut all_updates = Vec::new();
+    for (mut row, _sys_time, diff) in input.into_iter() {
+        // these updates are expected to be only zero to two rows
+        let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
+        // TODO(discord9): refactor error handling
+        // an error in a single row is expected not to interrupt the whole evaluation
+        let updates = updates
+            .filter_map(|r| match r {
+                Ok((key, ts, diff)) => Some(((key, Row::empty()), ts, diff)),
+                Err((err, _ts, _diff)) => {
+                    err_collector.push_err(err);
+                    None
+                }
+            })
+            .collect_vec();
+
+        all_updates.extend(updates);
+    }
+    all_updates
+}
+
+#[cfg(test)]
+mod test {
+    use std::cell::RefCell;
+    use std::rc::Rc;
+
+    use common_time::DateTime;
+    use datatypes::data_type::ConcreteDataType;
+    use hydroflow::scheduled::graph::Hydroflow;
+    use hydroflow::scheduled::graph_ext::GraphExt;
+    use hydroflow::scheduled::handoff::VecHandoff;
+
+    use super::*;
+    use crate::expr::BinaryFunc;
+    use crate::repr::Row;
+
+    fn harness_test_ctx<'r, 'h>(
+        df: &'r mut Hydroflow<'h>,
+        state: &'r mut DataflowState,
+    ) -> Context<'r, 'h> {
+        let err_collector = state.get_err_collector();
+        Context {
+            id: GlobalId::User(0),
+            df,
+            compute_state: state,
+            input_collection: BTreeMap::new(),
+            local_scope: Default::default(),
+            err_collector,
+        }
+    }
+
+    /// test if the temporal filter works properly,
+    /// namely: if the mfp operator can schedule a delete at the correct time
+    #[test]
+    fn test_render_mfp_with_temporal() {
+        let mut df = Hydroflow::new();
+        let mut state = DataflowState::default();
+        let mut ctx = harness_test_ctx(&mut df, &mut state);
+
+        let rows = vec![
+            (Row::new(vec![1i64.into()]), 1, 1),
+            (Row::new(vec![2i64.into()]), 2, 1),
+            (Row::new(vec![3i64.into()]), 3, 1),
+        ];
+        let collection = ctx.render_constant(rows.clone());
+        ctx.insert_global(GlobalId::User(1), collection);
+        let input_plan = Plan::Get {
+            id: expr::Id::Global(GlobalId::User(1)),
+        };
+        // temporal filter: now <= col(0) < now + 4
+        let mfp = MapFilterProject::new(1)
+            .filter(vec![
+                ScalarExpr::Column(0)
+                    .call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype()))
+                    .call_binary(
+                        ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now),
+                        BinaryFunc::Gte,
+                    ),
+                ScalarExpr::Column(0)
+                    .call_binary(
+                        ScalarExpr::literal(4i64.into(), ConcreteDataType::int64_datatype()),
+                        BinaryFunc::SubInt64,
+                    )
+                    .call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype()))
+                    .call_binary(
+                        ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now),
+                        BinaryFunc::Lt,
+                    ),
+            ])
+            .unwrap();
+
+        let mut bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap();
+        let collection = bundle.collection;
+        let _arranged = bundle.arranged.pop_first().unwrap().1;
+        let output = Rc::new(RefCell::new(vec![]));
+        let output_inner = output.clone();
+        let _subgraph = ctx.df.add_subgraph_sink(
+            "test_render_constant",
+            collection.into_inner(),
+            move |_ctx, recv| {
+                let data = recv.take_inner();
+                let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
+                output_inner.borrow_mut().clear();
+                output_inner.borrow_mut().extend(res);
+            },
+        );
+        // drop ctx here to simulate the actual compile-first, run-later scenario
+        drop(ctx);
+        // expected output at the given times
+        let expected_output = BTreeMap::from([
+            (
+                0, // time
+                vec![
+                    (Row::new(vec![1i64.into()]), 0, 1),
+                    (Row::new(vec![2i64.into()]), 0, 1),
+                    (Row::new(vec![3i64.into()]), 0, 1),
+                ],
+            ),
+            (
+                2, // time
+                vec![(Row::new(vec![1i64.into()]), 2, -1)],
+            ),
+            (
+                3, // time
+                vec![(Row::new(vec![2i64.into()]), 3, -1)],
+            ),
+            (
+                4, // time
+                vec![(Row::new(vec![3i64.into()]), 4, -1)],
+            ),
+        ]);
+
+        for now in 0i64..5 {
+            state.set_current_ts(now);
+            state.run_available_with_schedule(&mut df);
+            assert!(state.get_err_collector().inner.borrow().is_empty());
+            if let Some(expected) = expected_output.get(&now) {
+                assert_eq!(*output.borrow(), *expected);
+            } else {
+                assert_eq!(*output.borrow(), vec![]);
+            };
+            output.borrow_mut().clear();
+        }
+    }
+
+    /// test if the mfp operator without a temporal filter works properly,
+    /// that is, it filters the rows correctly
+    #[test]
+    fn test_render_mfp() {
+        let mut df = Hydroflow::new();
+        let mut state = DataflowState::default();
+        let mut ctx = harness_test_ctx(&mut df, &mut state);
+
+        let rows = vec![
+            (Row::new(vec![1.into()]), 1, 1),
+            (Row::new(vec![2.into()]), 2, 1),
+            (Row::new(vec![3.into()]), 3, 1),
+        ];
+        let collection = ctx.render_constant(rows.clone());
+        ctx.insert_global(GlobalId::User(1), collection);
+        let input_plan = Plan::Get {
+            id: expr::Id::Global(GlobalId::User(1)),
+        };
+        // filter: col(0) > 1
+        let mfp = MapFilterProject::new(1)
+            .filter(vec![ScalarExpr::Column(0).call_binary(
+                ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()),
+                BinaryFunc::Gt,
+            )])
+            .unwrap();
+        let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap();
+        let collection = bundle.collection.clone(ctx.df);
+
+        ctx.df.add_subgraph_sink(
+            "test_render_constant",
+            collection.into_inner(),
+            move |_ctx, recv| {
+                let data = recv.take_inner();
+                let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec();
+                assert_eq!(
+                    res,
+                    vec![
+                        (Row::new(vec![2.into()]), 0, 1),
+                        (Row::new(vec![3.into()]), 0, 1),
+                    ]
+                )
+            },
+        );
+        drop(ctx);
+
+        df.run_available();
+    }
+
+    /// test if the constant operator works properly,
+    /// that is, it emits only once, not multiple times
+    #[test]
+    fn test_render_constant() {
+        let mut df = Hydroflow::new();
+        let mut state = DataflowState::default();
+        let mut ctx = harness_test_ctx(&mut df, &mut state);
+
+        let rows = vec![
+            (Row::empty(), 1, 1),
+            (Row::empty(), 2, 1),
+            (Row::empty(), 3, 1),
+        ];
+        let collection = ctx.render_constant(rows.clone());
+        let collection = collection.collection.clone(ctx.df);
+        let cnt = Rc::new(RefCell::new(0));
+        let cnt_inner = cnt.clone();
+        ctx.df.add_subgraph_sink(
+            "test_render_constant",
+            collection.into_inner(),
+            move |_ctx, recv| {
+                let data = recv.take_inner();
+                *cnt_inner.borrow_mut() += data.iter().map(|v| v.len()).sum::<usize>();
+            },
+        );
+        ctx.df.run_available();
+        assert_eq!(*cnt.borrow(), 3);
+        ctx.df.run_available();
+        assert_eq!(*cnt.borrow(), 3);
+    }
+
+    /// a simple example to show how to use source and sink
+    #[test]
+    fn example_source_sink() {
+        let mut df = Hydroflow::new();
+        let (send_port, recv_port) = df.make_edge::<_, VecHandoff<usize>>("test_handoff");
+        df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| {
+            for i in 0..10 {
+                send.give(vec![i]);
+            }
+        });
+
+        let sum = Rc::new(RefCell::new(0));
+        let sum_move = sum.clone();
+        let sink = df.add_subgraph_sink("test_handoff_sink", recv_port, move |_ctx, recv| {
+            let data = recv.take_inner();
+            *sum_move.borrow_mut() += data.iter().sum::<usize>();
+        });
+
+        df.run_available();
+        assert_eq!(sum.borrow().to_owned(), 45);
+        df.schedule_subgraph(sink);
+        df.run_available();
+
+        assert_eq!(sum.borrow().to_owned(), 45);
+    }
+}
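A note on `test_render_mfp_with_temporal` above: the filter `now <= col(0) < now + 4` is expressed purely as comparisons against `now` (the upper bound becomes `col(0) - 4 < now`), so a row is visible exactly while `now` lies in `(col(0) - 4, col(0)]`. That algebra is what produces the expected output table: each row is inserted when the dataflow starts and retracted at `col(0) + 1`. A minimal self-contained check of the arithmetic (plain `i64`s stand in for the casted datetime values):

```rust
/// The test's temporal filter: `now <= col0 && col0 - 4 < now`,
/// i.e. the row is visible exactly while `now` is in `(col0 - 4, col0]`.
fn visible(col0: i64, now: i64) -> bool {
    now <= col0 && col0 - 4 < now
}

fn main() {
    // Row `1` is still visible at now = 0 and 1 ...
    assert!(visible(1, 0) && visible(1, 1));
    // ... and first becomes invisible at now = 2: the retraction time in the expected output.
    assert!(!visible(1, 2));
    // Rows `2` and `3` are likewise retracted at times 3 and 4.
    assert!(visible(2, 2) && !visible(2, 3));
    assert!(visible(3, 3) && !visible(3, 4));
}
```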
diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs
new file mode 100644
index 0000000000..af107185f6
--- /dev/null
+++ b/src/flow/src/compute/state.rs
@@ -0,0 +1,106 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::cell::RefCell;
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
+use std::rc::Rc;
+
+use hydroflow::scheduled::graph::Hydroflow;
+use hydroflow::scheduled::SubgraphId;
+
+use crate::compute::types::ErrCollector;
+use crate::repr::{self, Timestamp};
+
+/// input/output of a dataflow
+/// One `DataflowState` manages the input/output/schedule of one `Hydroflow`
+#[derive(Default)]
+pub struct DataflowState {
+    /// it is important to use a deque here to maintain the order of subgraphs
+    /// TODO(discord9): consider dedup? Not strictly necessary, since hydroflow itself also dedups when scheduling
+    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
+    /// Frontier (in sys time) before which updates should not be emitted.
+    ///
+    /// We *must* apply it to sinks, to ensure correct outputs.
+    /// We *should* apply it to sources and imported shared state, because it improves performance.
+    /// It is also the current time used by temporal filters to produce the correct current results.
+    as_of: Rc<RefCell<Timestamp>>,
+    /// error collector local to this `DataflowState`,
+    /// useful for distinguishing errors from different `Hydroflow`s
+    err_collector: ErrCollector,
+}
+
+impl DataflowState {
+    /// schedule all subgraphs that need to run with time <= `as_of` and run_available()
+    ///
+    /// return true if any subgraph actually executed
+    pub fn run_available_with_schedule(&mut self, df: &mut Hydroflow) -> bool {
+        // first, split the keys <= as_of off into another map
+        let mut before = self
+            .schedule_subgraph
+            .borrow_mut()
+            .split_off(&(*self.as_of.borrow() + 1));
+        std::mem::swap(&mut before, &mut self.schedule_subgraph.borrow_mut());
+        for (_, v) in before {
+            for subgraph in v {
+                df.schedule_subgraph(subgraph);
+            }
+        }
+        df.run_available()
+    }
+
+    pub fn get_scheduler(&self) -> Scheduler {
+        Scheduler {
+            schedule_subgraph: self.schedule_subgraph.clone(),
+            cur_subgraph: Rc::new(RefCell::new(None)),
+        }
+    }
+
+    /// return a handle to the current time, which will update whenever `as_of` is updated,
+    ///
+    /// so it can keep track of the current time even in a closure that is called later
+    pub fn current_time_ref(&self) -> Rc<RefCell<Timestamp>> {
+        self.as_of.clone()
+    }
+
+    pub fn current_ts(&self) -> Timestamp {
+        *self.as_of.borrow()
+    }
+
+    pub fn set_current_ts(&mut self, ts: Timestamp) {
+        self.as_of.replace(ts);
+    }
+
+    pub fn get_err_collector(&self) -> ErrCollector {
+        self.err_collector.clone()
+    }
+}
+
+#[derive(Clone)]
+pub struct Scheduler {
+    schedule_subgraph: Rc<RefCell<BTreeMap<Timestamp, VecDeque<SubgraphId>>>>,
+    cur_subgraph: Rc<RefCell<Option<SubgraphId>>>,
+}
+
+impl Scheduler {
+    pub fn schedule_at(&self, next_run_time: Timestamp) {
+        let mut schedule_subgraph = self.schedule_subgraph.borrow_mut();
+        let subgraph = self.cur_subgraph.borrow();
+        let subgraph = subgraph.as_ref().expect("Set SubgraphId before schedule");
+        let subgraph_queue = schedule_subgraph.entry(next_run_time).or_default();
+        subgraph_queue.push_back(*subgraph);
+    }
+
+    pub fn set_cur_subgraph(&self, subgraph: SubgraphId) {
+        self.cur_subgraph.replace(Some(subgraph));
+    }
+}
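`run_available_with_schedule` above drains every queued subgraph with time `<= as_of` via `split_off` at `as_of + 1` plus a swap: `split_off` returns the future half (keys `>= as_of + 1`), so after the swap the map keeps only future entries and the local variable holds everything that is due. A std-only sketch of the same drain pattern (the `usize` ids here are illustrative stand-ins for `SubgraphId`):

```rust
use std::collections::{BTreeMap, VecDeque};

/// Remove and return all entries scheduled at or before `now`,
/// leaving only strictly-future entries in `schedule`.
fn drain_due(
    schedule: &mut BTreeMap<i64, VecDeque<usize>>,
    now: i64,
) -> BTreeMap<i64, VecDeque<usize>> {
    // `split_off` keeps keys < now + 1 in `schedule` and returns keys >= now + 1 ...
    let mut due = schedule.split_off(&(now + 1));
    // ... so after the swap, `schedule` holds the future half and `due` the due half.
    std::mem::swap(&mut due, schedule);
    due
}

fn main() {
    let mut schedule = BTreeMap::from([
        (1, VecDeque::from([10])),
        (2, VecDeque::from([11, 12])),
        (5, VecDeque::from([13])),
    ]);
    let due = drain_due(&mut schedule, 2);
    assert_eq!(due.keys().copied().collect::<Vec<_>>(), vec![1, 2]);
    assert_eq!(schedule.keys().copied().collect::<Vec<_>>(), vec![5]);
}
```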
diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs
new file mode 100644
index 0000000000..35c3e175c8
--- /dev/null
+++ b/src/flow/src/compute/types.rs
@@ -0,0 +1,162 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::cell::RefCell;
+use std::collections::{BTreeMap, VecDeque};
+use std::rc::Rc;
+use std::sync::Arc;
+
+use hydroflow::scheduled::graph::Hydroflow;
+use hydroflow::scheduled::handoff::TeeingHandoff;
+use hydroflow::scheduled::port::RecvPort;
+use hydroflow::scheduled::SubgraphId;
+use tokio::sync::RwLock;
+
+use crate::compute::render::Context;
+use crate::expr::{EvalError, ScalarExpr};
+use crate::repr::DiffRow;
+use crate::utils::{ArrangeHandler, Arrangement};
+
+pub type Toff = TeeingHandoff<DiffRow>;
+
+/// A collection represents a stream of updates received from a handoff.
+pub struct Collection {
+    /// a stream of updates received from this port
+    stream: RecvPort<TeeingHandoff<DiffRow>>,
+}
+
+impl Collection {
+    pub fn from_port(port: RecvPort<TeeingHandoff<DiffRow>>) -> Self {
+        Collection { stream: port }
+    }
+
+    /// clone a collection; requires a mutable reference to the hydroflow instance
+    ///
+    /// Note: this needs to be the same hydroflow instance that this collection was created from
+    pub fn clone(&self, df: &mut Hydroflow) -> Self {
+        Collection {
+            stream: self.stream.tee(df),
+        }
+    }
+
+    pub fn into_inner(self) -> RecvPort<TeeingHandoff<DiffRow>> {
+        self.stream
+    }
+}
+
+/// Arranged is a wrapper around `ArrangeHandler` that maintains a list of readers and a writer
+pub struct Arranged {
+    pub arrangement: ArrangeHandler,
+    pub writer: Rc<RefCell<Option<SubgraphId>>>,
+    /// maintain a list of readers for the arrangement for ease of scheduling
+    pub readers: Rc<RefCell<Vec<SubgraphId>>>,
+}
+
+impl Arranged {
+    pub fn new(arr: ArrangeHandler) -> Self {
+        Self {
+            arrangement: arr,
+            writer: Default::default(),
+            readers: Default::default(),
+        }
+    }
+
+    /// Copy its future-only updates; internally `Rc`-ed, so it is cheap to copy
+    pub fn try_copy_future(&self) -> Option<Self> {
+        self.arrangement
+            .clone_future_only()
+            .map(|arrangement| Arranged {
+                arrangement,
+                readers: self.readers.clone(),
+                writer: self.writer.clone(),
+            })
+    }
+
+    /// Copy the full arrangement, including the future and the current updates.
+    ///
+    /// Internally `Rc`-ed, so it is cheap to copy
+    pub fn try_copy_full(&self) -> Option<Self> {
+        self.arrangement
+            .clone_full_arrange()
+            .map(|arrangement| Arranged {
+                arrangement,
+                readers: self.readers.clone(),
+                writer: self.writer.clone(),
+            })
+    }
+
+    pub fn add_reader(&self, id: SubgraphId) {
+        self.readers.borrow_mut().push(id)
+    }
+}
+
+/// A bundle of the various ways a collection can be represented.
+///
+/// This type maintains the invariant that it contains at least one (possibly both) valid
+/// source of data: either a collection or at least one arrangement. This is for the convenience
+/// of reading the data from the collection.
+pub struct CollectionBundle {
+    /// This is useful for passively reading the new updates from the collection
+    pub collection: Collection,
+    /// the key `Vec<ScalarExpr>` indicates how the keys (also [`Row`]s) used in the `Arranged`
+    /// are extracted from the collection's [`Row`]s, so it is the "index" of the arrangement
+    ///
+    /// The `Arranged` is the actual data source; it can be used to read the data from the
+    /// collection using the key indicated by the `Vec<ScalarExpr>`
+    pub arranged: BTreeMap<Vec<ScalarExpr>, Arranged>,
+}
+
+impl CollectionBundle {
+    pub fn from_collection(collection: Collection) -> Self {
+        Self {
+            collection,
+            arranged: BTreeMap::default(),
+        }
+    }
+
+    pub fn clone(&self, df: &mut Hydroflow) -> Self {
+        Self {
+            collection: self.collection.clone(df),
+            arranged: self
+                .arranged
+                .iter()
+                .map(|(k, v)| (k.clone(), v.try_copy_future().unwrap()))
+                .collect(),
+        }
+    }
+}
+
+/// A thread-local error collector, used to collect errors during the evaluation of the plan
+///
+/// usually only the first error matters, but all of them are stored just in case
+///
+/// Uses a `VecDeque` to preserve the order of errors
+/// when running the dataflow continuously and the errors are needed in order
+#[derive(Default, Clone)]
+pub struct ErrCollector {
+    pub inner: Rc<RefCell<VecDeque<EvalError>>>,
+}
+
+impl ErrCollector {
+    pub fn push_err(&self, err: EvalError) {
+        self.inner.borrow_mut().push_back(err)
+    }
+
+    pub fn run<F>(&self, f: F)
+    where
+        F: FnOnce() -> Result<(), EvalError>,
+    {
+        if let Err(e) = f() {
+            self.push_err(e)
+        }
+    }
+}
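`ErrCollector::run` above turns fallible closures into fire-and-forget steps: errors are queued instead of propagated, so one bad row or a failed compaction does not unwind the whole dataflow, and the order of errors is preserved for later inspection. A minimal std-only model of that pattern (using `String` where the crate stores `EvalError`):

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

#[derive(Default, Clone)]
struct ErrCollector {
    inner: Rc<RefCell<VecDeque<String>>>,
}

impl ErrCollector {
    /// Run `f`, queueing its error (if any) instead of propagating it.
    fn run<F: FnOnce() -> Result<(), String>>(&self, f: F) {
        if let Err(e) = f() {
            self.inner.borrow_mut().push_back(e);
        }
    }
}

fn main() {
    let errs = ErrCollector::default();
    errs.run(|| Ok(()));
    errs.run(|| Err("division by zero in row 3".to_string()));
    errs.run(|| Ok(())); // later steps still run after an earlier failure
    assert_eq!(errs.inner.borrow().len(), 1);
    assert_eq!(
        errs.inner.borrow().front().map(String::as_str),
        Some("division by zero in row 3")
    );
}
```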
diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs
index 85a127f09a..b7f0ba698a 100644
--- a/src/flow/src/expr/func.rs
+++ b/src/flow/src/expr/func.rs
@@ -26,7 +26,7 @@ use crate::expr::error::{
     TypeMismatchSnafu,
 };
 use crate::expr::{InvalidArgumentSnafu, ScalarExpr};
-use crate::repr::Row;
+use crate::repr::{value_to_internal_ts, Row};
 
 /// UnmaterializableFunc is a function that can't be eval independently,
 /// and require special handling
@@ -80,13 +80,17 @@ impl UnaryFunc {
                 }
             }
             Self::StepTimestamp => {
+                let ty = arg.data_type();
                 if let Value::DateTime(datetime) = arg {
                     let datetime = DateTime::from(datetime.val() + 1);
                     Ok(Value::from(datetime))
+                } else if let Ok(v) = value_to_internal_ts(arg) {
+                    let datetime = DateTime::from(v + 1);
+                    Ok(Value::from(datetime))
                 } else {
                     TypeMismatchSnafu {
                         expected: ConcreteDataType::datetime_datatype(),
-                        actual: arg.data_type(),
+                        actual: ty,
                     }
                     .fail()?
                 }
diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs
index 8d28533d8c..91fa686428 100644
--- a/src/flow/src/expr/relation/func.rs
+++ b/src/flow/src/expr/relation/func.rs
@@ -97,7 +97,7 @@ impl AggregateFunc {
 
     /// Eval value, diff with accumulator
     ///
-    /// Expect self to be accumulable aggregate functio, i.e. sum/count
+    /// Expect self to be an accumulable aggregate function, i.e. sum/count
     ///
     /// TODO(discord9): deal with overflow&better accumulator
     pub fn eval_diff_accumulable(
diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs
index 99867d7c77..1e232b68e1 100644
--- a/src/flow/src/lib.rs
+++ b/src/flow/src/lib.rs
@@ -16,6 +16,7 @@
 #![allow(unused_imports)] // allow unused for now because it should be use later
 
 mod adapter;
+mod compute;
 mod expr;
 mod plan;
 mod repr;
diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs
index 77b8ffb1f6..837d7f207a 100644
--- a/src/flow/src/plan.rs
+++ b/src/flow/src/plan.rs
@@ -42,14 +42,22 @@ pub enum Plan {
     /// Get CDC data from an source, be it external reference to an existing source or an internal
     /// reference to a `Let` identifier
     Get { id: Id },
-    /// Create a temporary collection from given `value``, and make this bind only available
+    /// Create a temporary collection from the given `value`, and make this binding only available
     /// in scope of `body`
+    ///
+    /// Similar to this rust code snippet:
+    /// ```rust, ignore
+    /// {
+    ///     let id = value;
+    ///     body
+    /// }
+    /// ```
     Let {
         id: LocalId,
         value: Box<Plan>,
         body: Box<Plan>,
     },
-    /// Map, Filter, and Project operators.
+    /// Map, Filter, and Project operators, chained together.
     Mfp {
         /// The input collection.
         input: Box<Plan>,
diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs
index 8d714df39a..91ad6c1c38 100644
--- a/src/flow/src/repr.rs
+++ b/src/flow/src/repr.rs
@@ -51,6 +51,8 @@ pub type DiffRow = (Row, Timestamp, Diff);
 pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
 
 /// Convert a value that is or can be converted to Datetime to internal timestamp
+///
+/// supported types are: `Date`, `DateTime`, `TimeStamp`, `i64`
 pub fn value_to_internal_ts(value: Value) -> Result<Timestamp, EvalError> {
     let is_supported_time_type = |arg: &Value| {
         let ty = arg.data_type();
diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs
index 01c6539d2d..c2dc15dad4 100644
--- a/src/flow/src/utils.rs
+++ b/src/flow/src/utils.rs
@@ -207,7 +207,7 @@ impl Arrangement {
     }
 
     /// get the last compaction time
-    pub fn get_compaction(&self) -> Option<Timestamp> {
+    pub fn last_compaction_time(&self) -> Option<Timestamp> {
         self.last_compaction_time
     }
 
@@ -253,7 +253,7 @@ impl Arrangement {
     /// advance time to `now` and consolidate all older(`now` included) updates to the first key
     ///
     /// return the maximum expire time(already expire by how much time) of all updates if any keys is already expired
-    pub fn set_compaction(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
+    pub fn compaction_to(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
         let mut max_late_by: Option<Duration> = None;
 
         let should_compact = self.split_lte(&now);
@@ -546,7 +546,7 @@ mod test {
             vec![((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 1, 1)]
         );
         assert_eq!(arr.spine.len(), 3);
-        arr.set_compaction(1).unwrap();
+        arr.compaction_to(1).unwrap();
         assert_eq!(arr.spine.len(), 3);
     }
 
@@ -555,7 +555,7 @@
     {
         let mut arr = arr.write();
         assert_eq!(arr.spine.len(), 3);
-        arr.set_compaction(2).unwrap();
+        arr.compaction_to(2).unwrap();
         assert_eq!(arr.spine.len(), 2);
     }
 }
@@ -605,7 +605,7 @@
             ]
         );
         assert_eq!(arr.spine.len(), 3);
-        arr.set_compaction(1).unwrap();
+        arr.compaction_to(1).unwrap();
         assert_eq!(arr.spine.len(), 3);
     }
 {
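For reference, the renamed arrangement API above (`last_compaction_time` and `compaction_to`, together with the `get_updates_in_range` and `get_next_update_time` calls made by `mfp_subgraph`) amounts to a read-emit-compact cycle over a time-keyed spine. A self-contained sketch of that cycle, assuming a plain `BTreeMap` spine; `Spine` here is an illustrative stand-in, not the crate's `Arrangement`, and the real `compaction_to` consolidates older updates into the compaction time rather than dropping them:

```rust
use std::collections::BTreeMap;

struct Spine {
    /// updates keyed by timestamp; each value is a batch of (row, diff) pairs
    updates: BTreeMap<i64, Vec<(String, i64)>>,
    last_compaction: Option<i64>,
}

impl Spine {
    /// Read all updates in (last_compaction, now], i.e. everything not yet emitted.
    fn updates_up_to(&self, now: i64) -> Vec<(String, i64, i64)> {
        let from = self.last_compaction.map(|t| t + 1).unwrap_or(i64::MIN);
        self.updates
            .range(from..=now)
            .flat_map(|(ts, batch)| batch.iter().map(|(row, diff)| (row.clone(), *ts, *diff)))
            .collect()
    }

    /// Forget everything at or before `now` and remember the new compaction frontier.
    fn compaction_to(&mut self, now: i64) {
        self.updates = self.updates.split_off(&(now + 1));
        self.last_compaction = Some(now);
    }

    /// The earliest future update, used to schedule this operator's next run.
    fn get_next_update_time(&self, now: i64) -> Option<i64> {
        self.updates.range(now + 1..).next().map(|(ts, _)| *ts)
    }
}

fn main() {
    let mut spine = Spine {
        updates: BTreeMap::from([
            (1, vec![("a".to_string(), 1)]),
            (3, vec![("a".to_string(), -1)]), // future retraction, e.g. from a temporal filter
        ]),
        last_compaction: None,
    };
    assert_eq!(spine.updates_up_to(1), vec![("a".to_string(), 1, 1)]);
    spine.compaction_to(1);
    // after compaction only the future retraction remains; re-run the operator at t = 3
    assert_eq!(spine.get_next_update_time(1), Some(3));
}
```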