diff --git a/Cargo.lock b/Cargo.lock index 43de72832e..bbaa37e4c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4150,8 +4150,8 @@ dependencies = [ [[package]] name = "hydroflow" -version = "0.6.0" -source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +version = "0.6.2" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320" dependencies = [ "bincode", "byteorder", @@ -4183,7 +4183,7 @@ dependencies = [ [[package]] name = "hydroflow_datalog" version = "0.6.0" -source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320" dependencies = [ "hydroflow_datalog_core", "proc-macro-crate 1.3.1", @@ -4194,8 +4194,8 @@ dependencies = [ [[package]] name = "hydroflow_datalog_core" -version = "0.6.0" -source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +version = "0.6.1" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320" dependencies = [ "hydroflow_lang", "proc-macro-crate 1.3.1", @@ -4209,8 +4209,8 @@ dependencies = [ [[package]] name = "hydroflow_lang" -version = "0.6.0" -source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +version = "0.6.2" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320" dependencies = [ "auto_impl", "clap 4.5.4", @@ -4230,7 +4230,7 @@ dependencies = [ [[package]] name = "hydroflow_macro" version = "0.6.0" -source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320" dependencies = [ "hydroflow_lang", "itertools 0.10.5", @@ -4723,8 +4723,8 @@ dependencies = [ [[package]] name = "lattices" -version = "0.5.3" -source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +version = "0.5.4" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320" dependencies = [ "cc-traits", "sealed", @@ -7376,7 +7376,7 @@ checksum = "3b7e158a385023d209d6d5f2585c4b468f6dcb3dd5aca9b75c4f1678c05bb375" [[package]] name = "pusherator" version = "0.0.5" -source = "git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320" dependencies = [ "either", "variadics", @@ -11148,7 +11148,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 0.1.10", "rand", "static_assertions", ] @@ -11562,7 +11562,7 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "variadics" version = "0.0.4" -source = 
"git+https://github.com/GreptimeTeam/hydroflow.git?rev=ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94#ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" +source = "git+https://github.com/GreptimeTeam/hydroflow.git?branch=main#b072ee026f97f8537165e1fb247101e0ab2fb320" dependencies = [ "sealed", ] diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 4e9f2e0002..28be2e4dae 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -14,14 +14,14 @@ common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true common-time.workspace = true +datafusion-common.workspace = true +datafusion-expr.workspace = true datafusion-substrait.workspace = true datatypes.workspace = true enum_dispatch = "0.3" # This fork is simply for keeping our dependency in our org, and pin the version # it is the same with upstream repo -datafusion-common.workspace = true -datafusion-expr.workspace = true -hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", rev = "ba2df44efd42b7c4d37ebefbf82e77c6f1d4cb94" } +hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" } itertools.workspace = true num-traits = "0.2" serde.workspace = true diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index f2b02e219a..fe3ab10d54 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -18,25 +18,32 @@ use std::cell::RefCell; use std::collections::{BTreeMap, VecDeque}; +use std::ops::Range; use std::rc::Rc; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::{ListValue, Value}; +use hydroflow::futures::SinkExt; use hydroflow::lattices::cc_traits::Get; use hydroflow::scheduled::graph::Hydroflow; use hydroflow::scheduled::graph_ext::GraphExt; use hydroflow::scheduled::port::{PortCtx, SEND}; use itertools::Itertools; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use super::state::Scheduler; -use crate::adapter::error::{Error, EvalSnafu, InvalidQuerySnafu}; +use crate::adapter::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu}; use crate::compute::state::DataflowState; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; +use crate::expr::error::{DataTypeSnafu, InternalSnafu}; use crate::expr::{ self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr, }; -use crate::plan::Plan; +use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; -use crate::utils::{ArrangeHandler, Arrangement}; +use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement}; + +mod map; /// The Context for build a Operator with id of `GlobalId` pub struct Context<'referred, 'df> { @@ -86,8 +93,6 @@ impl<'referred, 'df> Context<'referred, 'df> { } } -// There is a false positive in using `Vec` as key -#[allow(clippy::mutable_key_type)] impl<'referred, 'df> Context<'referred, 'df> { /// Interpret and execute plan /// @@ -97,26 +102,62 @@ impl<'referred, 'df> Context<'referred, 'df> { Plan::Constant { rows } => Ok(self.render_constant(rows)), Plan::Get { id } => self.get_by_id(id), Plan::Let { id, value, body } => self.eval_let(id, value, body), - Plan::Mfp { input, mfp } => { - self.render_map_filter_project_into_executable_dataflow(input, mfp) + Plan::Mfp { input, mfp } => self.render_mfp(input, mfp), + Plan::Reduce { + input: _, + key_val_plan: _, + reduce_plan: _, + } => NotImplementedSnafu { + reason: "Reduce is still 
WIP".to_string(), } - Plan::Reduce { .. } => todo!(), - Plan::Join { .. } => todo!(), - Plan::Union { .. } => todo!(), + .fail(), + Plan::Join { .. } => NotImplementedSnafu { + reason: "Join is still WIP".to_string(), + } + .fail(), + Plan::Union { .. } => NotImplementedSnafu { + reason: "Union is still WIP".to_string(), + } + .fail(), } } - /// render Constant, will only emit the `rows` once. - pub fn render_constant(&mut self, mut rows: Vec) -> CollectionBundle { + /// render Constant, take all rows that have a timestamp not greater than the current time + /// + /// Always assume input is sorted by timestamp + pub fn render_constant(&mut self, rows: Vec) -> CollectionBundle { let (send_port, recv_port) = self.df.make_edge::<_, Toff>("constant"); + let mut per_time: BTreeMap> = rows + .into_iter() + .group_by(|(_row, ts, _diff)| *ts) + .into_iter() + .map(|(k, v)| (k, v.into_iter().collect_vec())) + .collect(); + let now = self.compute_state.current_time_ref(); + // TODO(discord9): better way to schedule future run + let scheduler = self.compute_state.get_scheduler(); + let scheduler_inner = scheduler.clone(); - self.df - .add_subgraph_source("Constant", send_port, move |_ctx, send_port| { - if rows.is_empty() { - return; - } - send_port.give(std::mem::take(&mut rows)); - }); + let subgraph_id = + self.df + .add_subgraph_source("Constant", send_port, move |_ctx, send_port| { + // find the first timestamp that is greater than now + // use filter_map + + let mut after = per_time.split_off(&(*now.borrow() + 1)); + // swap + std::mem::swap(&mut per_time, &mut after); + let not_great_than_now = after; + + not_great_than_now.into_iter().for_each(|(_ts, rows)| { + send_port.give(rows); + }); + // schedule the next run + if let Some(next_run_time) = per_time.keys().next().copied() { + scheduler_inner.schedule_at(next_run_time); + } + }); + scheduler.set_cur_subgraph(subgraph_id); CollectionBundle::from_collection(Collection::from_port(recv_port)) } @@ -161,144 +202,14 @@ impl<'referred, 'df> Context<'referred, 'df> { let ret = self.render_plan(*body)?; Ok(ret) } - - /// render MapFilterProject, will only emit the `rows` once. Assume all incoming row's sys time being `now`` and ignore the row's stated sys time - /// TODO(discord9): schedule mfp operator to run when temporal filter need - /// - /// `MapFilterProject`(`mfp` for short) is scheduled to run when there is enough amount of input updates - /// ***or*** when a future update in it's output buffer(a `Arrangement`) is supposed to emit now. - pub fn render_map_filter_project_into_executable_dataflow( - &mut self, - input: Box, - mfp: MapFilterProject, - ) -> Result { - let input = self.render_plan(*input)?; - // TODO(discord9): consider if check if contain temporal to determine if - // need arrange or not, or does this added complexity worth it - let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("mfp"); - let input_arity = mfp.input_arity; - - // default to have a arrange with only future updates, so it can be empty if no temporal filter is applied - // as stream only sends current updates and etc. 
- let arrange = Arrangement::new(); - let arrange_handler = ArrangeHandler::from(arrange.clone()); - let arrange_handler_inner = ArrangeHandler::from(arrange); - - // This closure capture following variables: - let mfp_plan = MfpPlan::create_from(mfp)?; - let now = self.compute_state.current_time_ref(); - - let err_collector = self.err_collector.clone(); - - // TODO(discord9): better way to schedule future run - let scheduler = self.compute_state.get_scheduler(); - let scheduler_inner = scheduler.clone(); - - let subgraph = self.df.add_subgraph_in_out( - "mfp", - input.collection.into_inner(), - out_send_port, - move |_ctx, recv, send| { - // mfp only need to passively receive updates from recvs - let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter()); - - mfp_subgraph( - &arrange_handler_inner, - data, - &mfp_plan, - *now.borrow(), - &err_collector, - &scheduler_inner, - send, - ); - }, - ); - - // register current subgraph in scheduler for future scheduling - scheduler.set_cur_subgraph(subgraph); - - let arranged = BTreeMap::from([( - (0..input_arity).map(ScalarExpr::Column).collect_vec(), - Arranged::new(arrange_handler), - )]); - - let bundle = CollectionBundle { - collection: Collection::from_port(out_recv_port), - arranged, - }; - Ok(bundle) - } } -fn mfp_subgraph( - arrange: &ArrangeHandler, - input: impl IntoIterator, - mfp_plan: &MfpPlan, +/// The Common argument for all `Subgraph` in the render process +struct SubgraphArg<'a> { now: repr::Timestamp, - err_collector: &ErrCollector, - scheduler: &Scheduler, - send: &PortCtx, -) { - let run_mfp = || { - let all_updates = eval_mfp_core(input, mfp_plan, now, err_collector); - arrange.write().apply_updates(now, all_updates)?; - Ok(()) - }; - err_collector.run(run_mfp); - - // Deal with output: - // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time. - // 2. Output the updates. - // 3. Truncate all updates within that range. 
- - let from = arrange.read().last_compaction_time().map(|n| n + 1); - let from = from.unwrap_or(repr::Timestamp::MIN); - let output_kv = arrange.read().get_updates_in_range(from..=now); - // the output is expected to be key -> empty val - let output = output_kv - .into_iter() - .map(|((key, _v), ts, diff)| (key, ts, diff)) - .collect_vec(); - send.give(output); - let run_compaction = || { - arrange.write().compaction_to(now)?; - Ok(()) - }; - err_collector.run(run_compaction); - - // schedule the next time this operator should run - if let Some(i) = arrange.read().get_next_update_time(&now) { - scheduler.schedule_at(i) - } -} - -/// The core of evaluating MFP operator, given a MFP and a input, evaluate the MFP operator, -/// return the output updates **And** possibly any number of errors that occurred during the evaluation -fn eval_mfp_core( - input: impl IntoIterator, - mfp_plan: &MfpPlan, - now: repr::Timestamp, - err_collector: &ErrCollector, -) -> Vec { - let mut all_updates = Vec::new(); - for (mut row, _sys_time, diff) in input.into_iter() { - // this updates is expected to be only zero to two rows - let updates = mfp_plan.evaluate::(&mut row.inner, now, diff); - // TODO(discord9): refactor error handling - // Expect error in a single row to not interrupt the whole evaluation - let updates = updates - .filter_map(|r| match r { - Ok((key, ts, diff)) => Some(((key, Row::empty()), ts, diff)), - Err((err, _ts, _diff)) => { - err_collector.push_err(err); - None - } - }) - .collect_vec(); - - all_updates.extend(updates); - } - all_updates + err_collector: &'a ErrCollector, + scheduler: &'a Scheduler, + send: &'a PortCtx, } #[cfg(test)] @@ -316,64 +227,30 @@ mod test { use crate::expr::BinaryFunc; use crate::repr::Row; - fn harness_test_ctx<'r, 'h>( - df: &'r mut Hydroflow<'h>, - state: &'r mut DataflowState, - ) -> Context<'r, 'h> { - let err_collector = state.get_err_collector(); - Context { - id: GlobalId::User(0), - df, - compute_state: state, - input_collection: BTreeMap::new(), - local_scope: Default::default(), - err_collector, + pub fn run_and_check( + state: &mut DataflowState, + df: &mut Hydroflow, + time_range: Range, + expected: BTreeMap>, + output: Rc>>, + ) { + for now in time_range { + state.set_current_ts(now); + state.run_available_with_schedule(df); + assert!(state.get_err_collector().inner.borrow().is_empty()); + if let Some(expected) = expected.get(&now) { + assert_eq!(*output.borrow(), *expected, "at ts={}", now); + } else { + assert_eq!(*output.borrow(), vec![], "at ts={}", now); + }; + output.borrow_mut().clear(); } } - /// test if temporal filter works properly - /// namely: if mfp operator can schedule a delete at the correct time - #[test] - fn test_render_mfp_with_temporal() { - let mut df = Hydroflow::new(); - let mut state = DataflowState::default(); - let mut ctx = harness_test_ctx(&mut df, &mut state); - - let rows = vec![ - (Row::new(vec![1i64.into()]), 1, 1), - (Row::new(vec![2i64.into()]), 2, 1), - (Row::new(vec![3i64.into()]), 3, 1), - ]; - let collection = ctx.render_constant(rows); - ctx.insert_global(GlobalId::User(1), collection); - let input_plan = Plan::Get { - id: expr::Id::Global(GlobalId::User(1)), - }; - // temporal filter: now <= col(0) < now + 4 - let mfp = MapFilterProject::new(1) - .filter(vec![ - ScalarExpr::Column(0) - .call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype())) - .call_binary( - ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now), - BinaryFunc::Gte, - ), - ScalarExpr::Column(0) - .call_binary( - 
ScalarExpr::literal(4i64.into(), ConcreteDataType::int64_datatype()), - BinaryFunc::SubInt64, - ) - .call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype())) - .call_binary( - ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now), - BinaryFunc::Lt, - ), - ]) - .unwrap(); - - let mut bundle = ctx - .render_map_filter_project_into_executable_dataflow(Box::new(input_plan), mfp) - .unwrap(); + pub fn get_output_handle( + ctx: &mut Context, + mut bundle: CollectionBundle, + ) -> Rc>> { let collection = bundle.collection; let _arranged = bundle.arranged.pop_first().unwrap().1; let output = Rc::new(RefCell::new(vec![])); @@ -388,93 +265,22 @@ mod test { output_inner.borrow_mut().extend(res); }, ); - // drop ctx here to simulate actual process of compile first, run later scenario - drop(ctx); - // expected output at given time - let expected_output = BTreeMap::from([ - ( - 0, // time - vec![ - (Row::new(vec![1i64.into()]), 0, 1), - (Row::new(vec![2i64.into()]), 0, 1), - (Row::new(vec![3i64.into()]), 0, 1), - ], - ), - ( - 2, // time - vec![(Row::new(vec![1i64.into()]), 2, -1)], - ), - ( - 3, // time - vec![(Row::new(vec![2i64.into()]), 3, -1)], - ), - ( - 4, // time - vec![(Row::new(vec![3i64.into()]), 4, -1)], - ), - ]); - - for now in 0i64..5 { - state.set_current_ts(now); - state.run_available_with_schedule(&mut df); - assert!(state.get_err_collector().inner.borrow().is_empty()); - if let Some(expected) = expected_output.get(&now) { - assert_eq!(*output.borrow(), *expected); - } else { - assert_eq!(*output.borrow(), vec![]); - }; - output.borrow_mut().clear(); - } + output } - /// test if mfp operator without temporal filter works properly - /// that is it filter the rows correctly - #[test] - fn test_render_mfp() { - let mut df = Hydroflow::new(); - let mut state = DataflowState::default(); - let mut ctx = harness_test_ctx(&mut df, &mut state); - - let rows = vec![ - (Row::new(vec![1.into()]), 1, 1), - (Row::new(vec![2.into()]), 2, 1), - (Row::new(vec![3.into()]), 3, 1), - ]; - let collection = ctx.render_constant(rows); - ctx.insert_global(GlobalId::User(1), collection); - let input_plan = Plan::Get { - id: expr::Id::Global(GlobalId::User(1)), - }; - // filter: col(0)>1 - let mfp = MapFilterProject::new(1) - .filter(vec![ScalarExpr::Column(0).call_binary( - ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()), - BinaryFunc::Gt, - )]) - .unwrap(); - let bundle = ctx - .render_map_filter_project_into_executable_dataflow(Box::new(input_plan), mfp) - .unwrap(); - let collection = bundle.collection.clone(ctx.df); - - ctx.df.add_subgraph_sink( - "test_render_constant", - collection.into_inner(), - move |_ctx, recv| { - let data = recv.take_inner(); - let res = data.into_iter().flat_map(|v| v.into_iter()).collect_vec(); - assert_eq!( - res, - vec![ - (Row::new(vec![2.into()]), 0, 1), - (Row::new(vec![3.into()]), 0, 1), - ] - ) - }, - ); - drop(ctx); - - df.run_available(); + pub fn harness_test_ctx<'r, 'h>( + df: &'r mut Hydroflow<'h>, + state: &'r mut DataflowState, + ) -> Context<'r, 'h> { + let err_collector = state.get_err_collector(); + Context { + id: GlobalId::User(0), + df, + compute_state: state, + input_collection: BTreeMap::new(), + local_scope: Default::default(), + err_collector, + } } /// test if constant operator works properly @@ -494,7 +300,7 @@ mod test { let collection = collection.collection.clone(ctx.df); let cnt = Rc::new(RefCell::new(0)); let cnt_inner = cnt.clone(); - ctx.df.add_subgraph_sink( + let res_subgraph_id = 
ctx.df.add_subgraph_sink( "test_render_constant", collection.into_inner(), move |_ctx, recv| { @@ -502,9 +308,16 @@ mod test { *cnt_inner.borrow_mut() += data.iter().map(|v| v.len()).sum::(); }, ); + ctx.compute_state.set_current_ts(2); + ctx.compute_state.run_available_with_schedule(ctx.df); + assert_eq!(*cnt.borrow(), 2); + + ctx.compute_state.set_current_ts(3); + ctx.compute_state.run_available_with_schedule(ctx.df); + // to get output + ctx.df.schedule_subgraph(res_subgraph_id); ctx.df.run_available(); - assert_eq!(*cnt.borrow(), 3); - ctx.df.run_available(); + assert_eq!(*cnt.borrow(), 3); } @@ -533,4 +346,33 @@ mod test { assert_eq!(sum.borrow().to_owned(), 45); } + + #[test] + fn test_tee_auto_schedule() { + use hydroflow::scheduled::handoff::TeeingHandoff as Toff; + let mut df = Hydroflow::new(); + let (send_port, recv_port) = df.make_edge::<_, Toff>("test_handoff"); + let source = df.add_subgraph_source("test_handoff_source", send_port, move |_ctx, send| { + for i in 0..10 { + send.give(vec![i]); + } + }); + let teed_recv_port = recv_port.tee(&mut df); + + let sum = Rc::new(RefCell::new(0)); + let sum_move = sum.clone(); + let _sink = df.add_subgraph_sink("test_handoff_sink", teed_recv_port, move |_ctx, recv| { + let data = recv.take_inner(); + *sum_move.borrow_mut() += data.iter().flat_map(|i| i.iter()).sum::(); + }); + drop(recv_port); + + df.run_available(); + assert_eq!(sum.borrow().to_owned(), 45); + + df.schedule_subgraph(source); + df.run_available(); + + assert_eq!(sum.borrow().to_owned(), 90); + } } diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs new file mode 100644 index 0000000000..a0e29f15a6 --- /dev/null +++ b/src/flow/src/compute/render/map.rs @@ -0,0 +1,294 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use hydroflow::scheduled::graph_ext::GraphExt; +use hydroflow::scheduled::port::{PortCtx, SEND}; +use itertools::Itertools; +use snafu::OptionExt; + +use crate::adapter::error::{Error, PlanSnafu}; +use crate::compute::render::Context; +use crate::compute::state::Scheduler; +use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; +use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr}; +use crate::plan::Plan; +use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; +use crate::utils::ArrangeHandler; + +impl<'referred, 'df> Context<'referred, 'df> { + /// render MapFilterProject, will only emit the `rows` once. Assume all incoming row's sys time being `now`` and ignore the row's stated sys time + /// TODO(discord9): schedule mfp operator to run when temporal filter need + /// + /// `MapFilterProject`(`mfp` for short) is scheduled to run when there is enough amount of input updates + /// ***or*** when a future update in it's output buffer(a `Arrangement`) is supposed to emit now. 
+ // There is a false positive in using `Vec` as key due to `Value` have `bytes` variant + #[allow(clippy::mutable_key_type)] + pub fn render_mfp( + &mut self, + input: Box, + mfp: MapFilterProject, + ) -> Result { + let input = self.render_plan(*input)?; + // TODO(discord9): consider if check if contain temporal to determine if + // need arrange or not, or does this added complexity worth it + let (out_send_port, out_recv_port) = self.df.make_edge::<_, Toff>("mfp"); + + let output_arity = mfp.output_arity(); + + // default to have a arrange with only future updates, so it can be empty if no temporal filter is applied + // as stream only sends current updates and etc. + let arrange_handler = self.compute_state.new_arrange(None); + let arrange_handler_inner = + arrange_handler + .clone_future_only() + .with_context(|| PlanSnafu { + reason: "No write is expected at this point", + })?; + + // This closure capture following variables: + let mfp_plan = MfpPlan::create_from(mfp)?; + let now = self.compute_state.current_time_ref(); + + let err_collector = self.err_collector.clone(); + + // TODO(discord9): better way to schedule future run + let scheduler = self.compute_state.get_scheduler(); + let scheduler_inner = scheduler.clone(); + + let subgraph = self.df.add_subgraph_in_out( + "mfp", + input.collection.into_inner(), + out_send_port, + move |_ctx, recv, send| { + // mfp only need to passively receive updates from recvs + let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter()); + + mfp_subgraph( + &arrange_handler_inner, + data, + &mfp_plan, + *now.borrow(), + &err_collector, + &scheduler_inner, + send, + ); + }, + ); + + // register current subgraph in scheduler for future scheduling + scheduler.set_cur_subgraph(subgraph); + + let arranged = BTreeMap::from([( + (0..output_arity).map(ScalarExpr::Column).collect_vec(), + Arranged::new(arrange_handler), + )]); + + let bundle = CollectionBundle { + collection: Collection::from_port(out_recv_port), + arranged, + }; + Ok(bundle) + } +} + +fn mfp_subgraph( + arrange: &ArrangeHandler, + input: impl IntoIterator, + mfp_plan: &MfpPlan, + now: repr::Timestamp, + err_collector: &ErrCollector, + scheduler: &Scheduler, + send: &PortCtx, +) { + let run_mfp = || { + let all_updates = eval_mfp_core(input, mfp_plan, now, err_collector); + arrange.write().apply_updates(now, all_updates)?; + Ok(()) + }; + err_collector.run(run_mfp); + + // Deal with output: + // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time. + // 2. Output the updates. + // 3. Truncate all updates within that range. 
+ + let from = arrange.read().last_compaction_time().map(|n| n + 1); + let from = from.unwrap_or(repr::Timestamp::MIN); + let output_kv = arrange.read().get_updates_in_range(from..=now); + // the output is expected to be key -> empty val + let output = output_kv + .into_iter() + .map(|((key, _v), ts, diff)| (key, ts, diff)) + .collect_vec(); + send.give(output); + let run_compaction = || { + arrange.write().compaction_to(now)?; + Ok(()) + }; + err_collector.run(run_compaction); + + // schedule next time this subgraph should run + scheduler.schedule_for_arrange(&arrange.read(), now); +} + +/// The core of evaluating MFP operator, given a MFP and a input, evaluate the MFP operator, +/// return the output updates **And** possibly any number of errors that occurred during the evaluation +fn eval_mfp_core( + input: impl IntoIterator, + mfp_plan: &MfpPlan, + now: repr::Timestamp, + err_collector: &ErrCollector, +) -> Vec { + let mut all_updates = Vec::new(); + for (mut row, _sys_time, diff) in input.into_iter() { + // this updates is expected to be only zero to two rows + let updates = mfp_plan.evaluate::(&mut row.inner, now, diff); + // TODO(discord9): refactor error handling + // Expect error in a single row to not interrupt the whole evaluation + let updates = updates + .filter_map(|r| match r { + Ok((key, ts, diff)) => Some(((key, Row::empty()), ts, diff)), + Err((err, _ts, _diff)) => { + err_collector.push_err(err); + None + } + }) + .collect_vec(); + + all_updates.extend(updates); + } + all_updates +} + +#[cfg(test)] +mod test { + use std::cell::RefCell; + use std::rc::Rc; + + use datatypes::data_type::ConcreteDataType; + use hydroflow::scheduled::graph::Hydroflow; + + use super::*; + use crate::compute::render::test::{get_output_handle, harness_test_ctx, run_and_check}; + use crate::compute::state::DataflowState; + use crate::expr::{self, BinaryFunc, GlobalId}; + + /// test if temporal filter works properly + /// namely: if mfp operator can schedule a delete at the correct time + #[test] + fn test_render_mfp_with_temporal() { + let mut df = Hydroflow::new(); + let mut state = DataflowState::default(); + let mut ctx = harness_test_ctx(&mut df, &mut state); + + let rows = vec![ + (Row::new(vec![1i64.into()]), 0, 1), + (Row::new(vec![2i64.into()]), 0, 1), + (Row::new(vec![3i64.into()]), 0, 1), + ]; + let collection = ctx.render_constant(rows.clone()); + ctx.insert_global(GlobalId::User(1), collection); + let input_plan = Plan::Get { + id: expr::Id::Global(GlobalId::User(1)), + }; + // temporal filter: now <= col(0) < now + 4 + let mfp = MapFilterProject::new(1) + .filter(vec![ + ScalarExpr::Column(0) + .call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype())) + .call_binary( + ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now), + BinaryFunc::Gte, + ), + ScalarExpr::Column(0) + .call_binary( + ScalarExpr::literal(4i64.into(), ConcreteDataType::int64_datatype()), + BinaryFunc::SubInt64, + ) + .call_unary(expr::UnaryFunc::Cast(ConcreteDataType::datetime_datatype())) + .call_binary( + ScalarExpr::CallUnmaterializable(expr::UnmaterializableFunc::Now), + BinaryFunc::Lt, + ), + ]) + .unwrap(); + + let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap(); + let output = get_output_handle(&mut ctx, bundle); + // drop ctx here to simulate actual process of compile first, run later scenario + drop(ctx); + // expected output at given time + let expected_output = BTreeMap::from([ + ( + 0, // time + vec![ + (Row::new(vec![1i64.into()]), 0, 1), + 
(Row::new(vec![2i64.into()]), 0, 1), + (Row::new(vec![3i64.into()]), 0, 1), + ], + ), + ( + 2, // time + vec![(Row::new(vec![1i64.into()]), 2, -1)], + ), + ( + 3, // time + vec![(Row::new(vec![2i64.into()]), 3, -1)], + ), + ( + 4, // time + vec![(Row::new(vec![3i64.into()]), 4, -1)], + ), + ]); + run_and_check(&mut state, &mut df, 0..5, expected_output, output); + } + + /// test if mfp operator without temporal filter works properly + /// that is it filter the rows correctly + #[test] + fn test_render_mfp() { + let mut df = Hydroflow::new(); + let mut state = DataflowState::default(); + let mut ctx = harness_test_ctx(&mut df, &mut state); + + let rows = vec![ + (Row::new(vec![1.into()]), 1, 1), + (Row::new(vec![2.into()]), 2, 1), + (Row::new(vec![3.into()]), 3, 1), + ]; + let collection = ctx.render_constant(rows.clone()); + ctx.insert_global(GlobalId::User(1), collection); + let input_plan = Plan::Get { + id: expr::Id::Global(GlobalId::User(1)), + }; + // filter: col(0)>1 + let mfp = MapFilterProject::new(1) + .filter(vec![ScalarExpr::Column(0).call_binary( + ScalarExpr::literal(1.into(), ConcreteDataType::int32_datatype()), + BinaryFunc::Gt, + )]) + .unwrap(); + let bundle = ctx.render_mfp(Box::new(input_plan), mfp).unwrap(); + + let output = get_output_handle(&mut ctx, bundle); + drop(ctx); + let expected = BTreeMap::from([ + (2, vec![(Row::new(vec![2.into()]), 2, 1)]), + (3, vec![(Row::new(vec![3.into()]), 3, 1)]), + ]); + run_and_check(&mut state, &mut df, 1..5, expected, output); + } +} diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs index af107185f6..c062aae480 100644 --- a/src/flow/src/compute/state.rs +++ b/src/flow/src/compute/state.rs @@ -21,6 +21,7 @@ use hydroflow::scheduled::SubgraphId; use crate::compute::types::ErrCollector; use crate::repr::{self, Timestamp}; +use crate::utils::{ArrangeHandler, Arrangement}; /// input/output of a dataflow /// One `ComputeState` manage the input/output/schedule of one `Hydroflow` @@ -38,9 +39,26 @@ pub struct DataflowState { /// error collector local to this `ComputeState`, /// useful for distinguishing errors from different `Hydroflow` err_collector: ErrCollector, + /// save all used arrange in this dataflow, since usually there is no delete operation + /// we can just keep track of all used arrange and schedule subgraph when they need to be updated + arrange_used: Vec, } impl DataflowState { + pub fn new_arrange(&mut self, name: Option>) -> ArrangeHandler { + let arrange = name + .map(Arrangement::new_with_name) + .unwrap_or_else(Arrangement::new); + + let arr = ArrangeHandler::from(arrange); + // mark this arrange as used in this dataflow + self.arrange_used.push( + arr.clone_future_only() + .expect("No write happening at this point"), + ); + arr + } + /// schedule all subgraph that need to run with time <= `as_of` and run_available() /// /// return true if any subgraph actually executed @@ -85,8 +103,9 @@ impl DataflowState { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Scheduler { + // this scheduler is shared with `DataflowState`, so it can schedule subgraph schedule_subgraph: Rc>>>, cur_subgraph: Rc>>, } @@ -100,6 +119,12 @@ impl Scheduler { subgraph_queue.push_back(*subgraph); } + pub fn schedule_for_arrange(&self, arrange: &Arrangement, now: Timestamp) { + if let Some(i) = arrange.get_next_update_time(&now) { + self.schedule_at(i) + } + } + pub fn set_cur_subgraph(&self, subgraph: SubgraphId) { self.cur_subgraph.replace(Some(subgraph)); } diff --git a/src/flow/src/compute/types.rs 
b/src/flow/src/compute/types.rs index 35c3e175c8..e14b584a82 100644 --- a/src/flow/src/compute/types.rs +++ b/src/flow/src/compute/types.rs @@ -28,7 +28,7 @@ use crate::expr::{EvalError, ScalarExpr}; use crate::repr::DiffRow; use crate::utils::{ArrangeHandler, Arrangement}; -pub type Toff = TeeingHandoff; +pub type Toff = TeeingHandoff; /// A collection, represent a collections of data that is received from a handoff. pub struct Collection { @@ -107,12 +107,17 @@ impl Arranged { /// of reading the data from the collection. pub struct CollectionBundle { /// This is useful for passively reading the new updates from the collection + /// + /// Invariant: the timestamp of the updates should always not greater than now, since future updates should be stored in the arrangement pub collection: Collection, /// the key [`ScalarExpr`] indicate how the keys(also a [`Row`]) used in Arranged is extract from collection's [`Row`] /// So it is the "index" of the arrangement /// /// The `Arranged` is the actual data source, it can be used to read the data from the collection by /// using the key indicated by the `Vec` + /// There is a false positive in using `Vec` as key due to `ScalarExpr::Literal` + /// contain a `Value` which have `bytes` variant + #[allow(clippy::mutable_key_type)] pub arranged: BTreeMap, Arranged>, } @@ -151,12 +156,16 @@ impl ErrCollector { self.inner.borrow_mut().push_back(err) } - pub fn run(&self, f: F) + pub fn run(&self, f: F) -> Option where - F: FnOnce() -> Result<(), EvalError>, + F: FnOnce() -> Result, { - if let Err(e) = f() { - self.push_err(e) + match f() { + Ok(r) => Some(r), + Err(e) => { + self.push_err(e); + None + } } } } diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 5c4480e749..bcdcfe36be 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -52,6 +52,13 @@ pub enum EvalError { location: Location, }, + #[snafu(display("{msg}"))] + DataType { + msg: String, + source: datatypes::Error, + location: Location, + }, + #[snafu(display("Invalid argument: {reason}"))] InvalidArgument { reason: String, location: Location }, diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 2070e46c84..2c49e4efbf 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -89,6 +89,11 @@ impl MapFilterProject { } } + /// The number of columns expected in the output row. + pub fn output_arity(&self) -> usize { + self.projection.len() + } + /// Given two mfps, return an mfp that applies one /// followed by the other. /// Note that the arguments are in the opposite order diff --git a/src/flow/src/expr/relation/accum.rs b/src/flow/src/expr/relation/accum.rs index 37e0f51130..c9affae760 100644 --- a/src/flow/src/expr/relation/accum.rs +++ b/src/flow/src/expr/relation/accum.rs @@ -18,7 +18,9 @@ //! So the overhead is acceptable. //! //! Currently support sum, count, any, all and min/max(with one caveat that min/max can't support delete with aggregate). +//! TODO: think of better ways to not ser/de every time a accum needed to be updated, since it's in a tight loop +use std::any::type_name; use std::fmt::Display; use common_decimal::Decimal128; @@ -39,6 +41,7 @@ use crate::repr::Diff; #[enum_dispatch] pub trait Accumulator: Sized { fn into_state(self) -> Vec; + fn update( &mut self, aggr_fn: &AggregateFunc, @@ -68,6 +71,21 @@ pub struct Bool { falses: Diff, } +impl Bool { + /// Expect two `Diff` type values, one for `true` and one for `false`. 
+ pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError> + where + I: Iterator<Item = Value>, + { + Ok(Self { + trues: Diff::try_from(iter.next().ok_or_else(fail_accum::<Bool>)?) + .map_err(err_try_from_val)?, + falses: Diff::try_from(iter.next().ok_or_else(fail_accum::<Bool>)?) + .map_err(err_try_from_val)?, + }) + } +} + impl TryFrom<Vec<Value>> for Bool { type Error = EvalError; @@ -78,13 +96,9 @@ impl TryFrom<Vec<Value>> for Bool { reason: "Bool Accumulator state should have 2 values", } ); - let mut iter = state.into_iter(); - Ok(Self { - trues: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?, - falses: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?, - }) + Self::try_from_iter(&mut iter) } }
@@ -157,6 +171,24 @@ pub struct SimpleNumber { non_nulls: Diff, } +impl SimpleNumber { + /// Expect one `Decimal128` value and one `Diff` value. + /// The `Decimal128` type is used to store the sum of all non-NULL values. + /// The `Diff` type is used to count the number of non-NULL values. + pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError> + where + I: Iterator<Item = Value>, + { + Ok(Self { + accum: Decimal128::try_from(iter.next().ok_or_else(fail_accum::<SimpleNumber>)?) + .map_err(err_try_from_val)? + .val(), + non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<SimpleNumber>)?) + .map_err(err_try_from_val)?, + }) + } +} + impl TryFrom<Vec<Value>> for SimpleNumber { type Error = EvalError; @@ -168,13 +200,7 @@ impl TryFrom<Vec<Value>> for SimpleNumber { } ); let mut iter = state.into_iter(); - - Ok(Self { - accum: Decimal128::try_from(iter.next().unwrap()) - .map_err(err_try_from_val)? - .val(), - non_nulls: Diff::try_from(iter.next().unwrap()).map_err(err_try_from_val)?, - }) + Self::try_from_iter(&mut iter) } }
@@ -272,6 +298,34 @@ pub struct Float { non_nulls: Diff, } +impl Float { + /// Expect the first value to be `OrderedF64` and the remaining four values to be `Diff`. + pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError> + where + I: Iterator<Item = Value>, + { + let mut ret = Self { + accum: OrderedF64::try_from(iter.next().ok_or_else(fail_accum::<Float>)?) + .map_err(err_try_from_val)?, + pos_infs: Diff::try_from(iter.next().ok_or_else(fail_accum::<Float>)?) + .map_err(err_try_from_val)?, + neg_infs: Diff::try_from(iter.next().ok_or_else(fail_accum::<Float>)?) + .map_err(err_try_from_val)?, + nans: Diff::try_from(iter.next().ok_or_else(fail_accum::<Float>)?) + .map_err(err_try_from_val)?, + non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<Float>)?) + .map_err(err_try_from_val)?, + }; + + // This prevents the counter-intuitive behavior of a sum over no values yielding a non-zero result + if ret.non_nulls == 0 { + ret.accum = OrderedFloat::from(0.0); + } + + Ok(ret) + } +} + impl TryFrom<Vec<Value>> for Float { type Error = EvalError; @@ -385,6 +439,26 @@ pub struct OrdValue { non_nulls: Diff, } +impl OrdValue { + pub fn try_from_iter<I>(iter: &mut I) -> Result<Self, EvalError> + where + I: Iterator<Item = Value>, + { + Ok(Self { + val: { + let v = iter.next().ok_or_else(fail_accum::<OrdValue>)?; + if v == Value::Null { + None + } else { + Some(v) + } + }, + non_nulls: Diff::try_from(iter.next().ok_or_else(fail_accum::<OrdValue>)?) + .map_err(err_try_from_val)?, + }) + } +} + impl TryFrom<Vec<Value>> for OrdValue { type Error = EvalError; @@ -593,6 +667,37 @@ impl Accum { }) } + pub fn try_from_iter( + aggr_fn: &AggregateFunc, + iter: &mut impl Iterator<Item = Value>, + ) -> Result<Self, EvalError> { + match aggr_fn { + AggregateFunc::Any + | AggregateFunc::All + | AggregateFunc::MaxBool + | AggregateFunc::MinBool => Ok(Self::from(Bool::try_from_iter(iter)?)), + AggregateFunc::SumInt16 + | AggregateFunc::SumInt32 + | AggregateFunc::SumInt64 + | AggregateFunc::SumUInt16 + | AggregateFunc::SumUInt32 + | AggregateFunc::SumUInt64 => Ok(Self::from(SimpleNumber::try_from_iter(iter)?)), + AggregateFunc::SumFloat32 | AggregateFunc::SumFloat64 => { + Ok(Self::from(Float::try_from_iter(iter)?)) + } + f if f.is_max() || f.is_min() || matches!(f, AggregateFunc::Count) => { + Ok(Self::from(OrdValue::try_from_iter(iter)?)) + } + f => Err(InternalSnafu { + reason: format!( + "Accumulator does not support this aggregation function: {:?}", + f + ), + } + .build()), + } + } + /// try to convert a vector of value into given aggregate function's accumulator pub fn try_into_accum(aggr_fn: &AggregateFunc, state: Vec<Value>) -> Result<Self, EvalError> { match aggr_fn { @@ -623,6 +728,16 @@ impl Accum { } } +fn fail_accum<T>() -> EvalError { + InternalSnafu { + reason: format!( + "list of values exhausted before an accum of type {} can be built from it", + type_name::<T>() + ), + } + .build() +} + fn err_try_from_val<T: Display>(reason: T) -> EvalError { TryFromValueSnafu { msg: reason.to_string(), @@ -775,7 +890,9 @@ mod test { let mut acc = Accum::new_accum(&aggr_fn)?; acc.update_batch(&aggr_fn, input.clone())?; let row = acc.into_state(); - let acc = Accum::try_into_accum(&aggr_fn, row)?; + let acc = Accum::try_into_accum(&aggr_fn, row.clone())?; + let alter_acc = Accum::try_from_iter(&aggr_fn, &mut row.into_iter())?; + assert_eq!(acc, alter_acc); Ok(acc) }; let acc = match create_and_insert() {
diff --git a/src/flow/src/expr/relation/func.rs index 17751423aa..bcee991d64 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -112,18 +112,21 @@ impl AggregateFunc { /// Expect self to be accumulable aggregate function, i.e. sum/count /// /// TODO(discord9): deal with overflow&better accumulator - pub fn eval_diff_accumulable<I>( + pub fn eval_diff_accumulable<A, I>( &self, - accum: Vec<Value>, + accum: A, value_diffs: I, ) -> Result<(Value, Vec<Value>), EvalError> where + A: IntoIterator<Item = Value>, I: IntoIterator<Item = (Value, Diff)>, { - let mut accum = if accum.is_empty() { + let mut accum = accum.into_iter().peekable(); + + let mut accum = if accum.peek().is_none() { Accum::new_accum(self)? } else { - Accum::try_into_accum(self, accum)? + Accum::try_from_iter(self, &mut accum)? }; accum.update_batch(self, value_diffs)?; let res = accum.eval(self)?;
diff --git a/src/flow/src/plan.rs index 51a73d81a4..5e5723e44d 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -21,12 +21,12 @@ mod reduce; use datatypes::arrow::ipc::Map; use serde::{Deserialize, Serialize}; -pub(crate) use self::reduce::{AccumulablePlan, KeyValPlan, ReducePlan}; use crate::adapter::error::Error; use crate::expr::{ AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr, }; use crate::plan::join::JoinPlan; +pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan}; use crate::repr::{ColumnType, DiffRow, RelationType}; /// A plan for a dataflow component. But with type to indicate the output type of the relation.
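// Reviewer aside (illustrative sketch, not part of this patch): the hunks above switch the
// accumulator state from a concrete `Vec<Value>` to any `IntoIterator<Item = Value>`, with
// `Accum::try_from_iter` consuming the iterator directly. A minimal usage sketch of that path,
// assuming `AggregateFunc::SumInt64`, the `crate::expr` re-exports visible in the imports above,
// and that `value_diffs` items are `(Value, Diff)` pairs (the generic bounds are truncated in
// the extracted hunk, so the exact item type is an assumption):
use datatypes::value::Value;

use crate::expr::{AggregateFunc, EvalError};

fn accumulate_two_batches() -> Result<(), EvalError> {
    let aggr = AggregateFunc::SumInt64;
    // first batch: no prior state, so an empty state iterator yields a fresh accumulator
    let (sum, state) =
        aggr.eval_diff_accumulable(Vec::<Value>::new(), vec![(Value::from(1i64), 1)])?;
    // second batch: the state row returned by the previous call is replayed as an iterator,
    // which now goes through `Accum::try_from_iter` instead of `try_into_accum`
    let (_new_sum, _new_state) =
        aggr.eval_diff_accumulable(state, vec![(Value::from(4i64), 1)])?;
    let _ = sum;
    Ok(())
}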
diff --git a/src/flow/src/plan/reduce.rs b/src/flow/src/plan/reduce.rs index 09dc44b37f..85c84a42f3 100644 --- a/src/flow/src/plan/reduce.rs +++ b/src/flow/src/plan/reduce.rs @@ -47,7 +47,33 @@ pub struct AccumulablePlan { /// Each element represents: /// (index of aggr output, index of value among inputs, aggr expr) /// These will all be rendered together in one dataflow fragment. - pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>, - /// Same as above but for all of the `DISTINCT` accumulable aggregations. - pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>, + /// + /// Invariant: the output index is the index of the aggregation in `full_aggrs` + /// which means output index is always smaller than the length of `full_aggrs` + pub simple_aggrs: Vec<AggrWithIndex>, + /// Same as `simple_aggrs` but for all of the `DISTINCT` accumulable aggregations. + pub distinct_aggrs: Vec<AggrWithIndex>, +} + +/// Invariant: the output index is the index of the aggregation in `full_aggrs` +/// which means output index is always smaller than the length of `full_aggrs` +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] +pub struct AggrWithIndex { + /// aggregation expression + pub expr: AggregateExpr, + /// index of aggr input among input row + pub input_idx: usize, + /// index of aggr output among output row + pub output_idx: usize, +} + +impl AggrWithIndex { + /// Create a new `AggrWithIndex` + pub fn new(expr: AggregateExpr, input_idx: usize, output_idx: usize) -> Self { + Self { + expr, + input_idx, + output_idx, + } + } } diff --git a/src/flow/src/repr.rs index 4c50a77392..d21da39579 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -160,7 +160,7 @@ impl Row { self.inner.iter() } - /// eturns the number of elements in the row, also known as its 'length'. + /// Returns the number of elements in the row, also known as its 'length'.
pub fn len(&self) -> usize { self.inner.len() } diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 7a320dddc8..be4a42a9b5 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -52,7 +52,7 @@ use crate::expr::{ AggregateExpr, AggregateFunc, BinaryFunc, GlobalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr, UnaryFunc, UnmaterializableFunc, VariadicFunc, }; -use crate::plan::{AccumulablePlan, KeyValPlan, Plan, ReducePlan, TypedPlan}; +use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::{DataflowContext, FunctionExtensions}; @@ -265,9 +265,17 @@ impl TypedPlan { reason: "Expect aggregate argument to be transformed into a column at this point", })?; if aggr_expr.distinct { - distinct_aggrs.push((output_column, input_column, aggr_expr.clone())); + distinct_aggrs.push(AggrWithIndex::new( + aggr_expr.clone(), + input_column, + output_column, + )); } else { - simple_aggrs.push((output_column, input_column, aggr_expr.clone())); + simple_aggrs.push(AggrWithIndex::new( + aggr_expr.clone(), + input_column, + output_column, + )); } } let accum_plan = AccumulablePlan { @@ -327,7 +335,7 @@ mod test { }, reduce_plan: ReducePlan::Accumulable(AccumulablePlan { full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![(0, 0, aggr_expr.clone())], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], distinct_aggrs: vec![], }), }), @@ -379,7 +387,7 @@ mod test { }, reduce_plan: ReducePlan::Accumulable(AccumulablePlan { full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![(0, 0, aggr_expr.clone())], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], distinct_aggrs: vec![], }), }), @@ -430,7 +438,7 @@ mod test { }, reduce_plan: ReducePlan::Accumulable(AccumulablePlan { full_aggrs: vec![aggr_expr.clone()], - simple_aggrs: vec![(0, 0, aggr_expr.clone())], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], distinct_aggrs: vec![], }), }), diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index f9e10ec7e3..6a56d8c5d8 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -131,6 +131,8 @@ pub struct Arrangement { /// TODO: batch size balancing? 
spine: Spine, /// if set to false, will not update current value of the arrangement, useful for case like `map -> arrange -> reduce` + /// + /// in which `arrange` operator only need future updates, and don't need to keep all updates full_arrangement: bool, /// flag to mark that this arrangement haven't been written to, so that it can be cloned and shared is_written: bool, @@ -138,6 +140,7 @@ pub struct Arrangement { expire_state: Option, /// the time that the last compaction happened, also know as current time last_compaction_time: Option, + name: Vec, } impl Arrangement { @@ -149,10 +152,22 @@ impl Arrangement { is_written: false, expire_state: None, last_compaction_time: None, + name: vec![], } } - /// apply updates into spine, all updates should have timestamps that are larger than spine's first key + pub fn new_with_name(name: Vec) -> Self { + Self { + spine: Default::default(), + full_arrangement: false, + is_written: false, + expire_state: None, + last_compaction_time: None, + name, + } + } + + /// apply updates into spine, with no respect of whether the updates are in futures, past, or now /// /// return the maximum expire time(already expire by how much time) of all updates if any keys is already expired pub fn apply_updates( @@ -453,6 +468,11 @@ fn compact_diff_row(old_row: Option, new_row: &DiffRow) -> Option = tokio::sync::RwLockReadGuard<'a, Arrangement>; +/// Simply a type alias for WriteGuard of Arrangement +pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>; + /// A handler to the inner Arrangement, can be cloned and shared, useful for query it's inner state #[derive(Debug)] pub struct ArrangeHandler { @@ -494,16 +514,22 @@ impl ArrangeHandler { /// /// it's a cheap operation, since it's `Arc-ed` and only clone the `Arc` pub fn clone_full_arrange(&self) -> Option { - if self.read().is_written { - return None; + { + let zelf = self.read(); + if !zelf.full_arrangement && zelf.is_written { + return None; + } } - let mut arr = self.write(); - arr.full_arrangement = true; - drop(arr); + + self.write().full_arrangement = true; Some(Self { inner: self.inner.clone(), }) } + + pub fn set_full_arrangement(&self, full: bool) { + self.write().full_arrangement = full; + } } #[cfg(test)] @@ -566,7 +592,7 @@ mod test { } let arr2 = arr.clone_full_arrange(); - assert!(arr2.is_none()); + assert!(arr2.is_some()); { let mut arr = arr.write(); assert_eq!(arr.spine.len(), 3);
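// Reviewer aside (illustrative sketch, not part of this patch): the utils.rs changes above
// relax `clone_full_arrange` so an arrangement already flagged as full stays cloneable even
// after it has been written to, while `DataflowState::new_arrange` hands out future-only
// clones for operators. A minimal sharing sketch under those semantics; the arrangement name
// type is assumed to be `Vec<String>` here, since the generic argument is truncated in the
// extracted hunk:
use crate::compute::state::DataflowState;

fn share_arrangement(state: &mut DataflowState) {
    // name type assumed; adjust to whatever `Arrangement::new_with_name` actually takes
    let arrange = state.new_arrange(Some(vec!["mfp_output".to_string()]));
    // the operator that applies updates only needs the future-updates view
    let for_writer = arrange.clone_future_only().expect("nothing written yet");
    // mark the arrangement as full so reads stay possible even after writes happen
    arrange.set_full_arrangement(true);
    let for_reader = arrange
        .clone_full_arrange()
        .expect("full arrangement stays cloneable");
    let _ = (for_writer, for_reader);
}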