diff --git a/src/flow/src/compute/render/mod.rs b/src/flow/src/compute/render/mod.rs
index 62b65bf98b..6477323341 100644
--- a/src/flow/src/compute/render/mod.rs
+++ b/src/flow/src/compute/render/mod.rs
@@ -225,6 +225,7 @@ where
 #[cfg(test)]
 mod test {
     use std::any::Any;
+    use std::collections::{BTreeMap, BTreeSet};
     use std::rc::Rc;
 
     use datatypes::prelude::ConcreteDataType;
@@ -236,7 +237,13 @@ mod test {
     use timely::Config;
 
     use super::*;
-    use crate::expr::{GlobalId, LocalId};
+    use crate::compute::plan::{
+        AccumulablePlan, AvailableCollections, GetPlan, KeyValPlan, ReducePlan,
+    };
+    use crate::expr::{
+        AggregateExpr, BinaryFunc, GlobalId, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
+        UnaryFunc,
+    };
     use crate::repr::Diff;
     type OkStream = Stream;
     type ErrStream = Stream;
@@ -249,7 +256,7 @@ mod test {
         err_handle: InputSession,
     }
 
-    fn mock_input_session(input: &mut InputSession) {
+    fn mock_input_session(input: &mut InputSession, cnt: i64) {
        // TODO: mock a cpu usage monotonic input with timestamp
        // cpu, mem, ts
        // f32, f32, DateTime
@@ -258,7 +265,6 @@ mod test {
            ConcreteDataType::float32_datatype(),
            ConcreteDataType::datetime_datatype(),
        ];
-       let cnt = 50;
        let arrs = (0..cnt).map(|i| (i as f32 / cnt as f32, i as f32 / cnt as f32, i));
        // need more mechanism to make timestamp also timestamp here
        for (cpu, mem, ts) in arrs {
@@ -271,26 +277,14 @@ mod test {
         input.flush();
     }
 
-    #[test]
-    fn test_simple_poc_with_input_built() {
-        // 1. build dataflow with input collection connected
-        // 2. give input
-        // type annotation is needed to prevent rust-analyzer to give up type deduction
-
-        // simple give dataflow information
-        // will be build by given dataflow information from other nodes later
-        let dataflow = {
-            let reduce_group_by_window = vec![BuildDesc {
-                id: GlobalId::User(1),
-                plan: Plan::Constant {
-                    rows: Ok(vec![(Row::default(), 0, 1)]),
-                },
-            }];
-            let mut dataflow = DataflowDescription::<Plan>::new("test".to_string());
-            dataflow.objects_to_build = reduce_group_by_window;
-            dataflow
-        };
-
+    // a simple helper that builds the given dataflow and runs it on mock input
+    fn exec_dataflow(
+        input_id: Vec<Id>,
+        dataflow: DataflowDescription<Plan>,
+        sink_ids: Vec<GlobalId>,
+        output_keys: Vec<Option<Vec<ScalarExpr>>>,
+        input_mock_length: i64,
+    ) {
         timely::execute(Config::thread(), move |worker| {
             println!("worker: {:?}", worker.index());
             let mut input = InputSession::new();
@@ -309,7 +303,11 @@ mod test {
             );
 
             // TODO: generate `import_sources` from `dataflow.source_imports`
-            let import_sources = vec![(Id::Global(GlobalId::User(0)), input_collection)];
+            let import_sources: Vec<_> = input_id
+                .clone()
+                .into_iter()
+                .zip(vec![input_collection])
+                .collect();
 
             // import sources
             for (id, collection) in import_sources {
@@ -323,21 +321,269 @@ mod test {
             dbg!(test_ctx.bindings.keys());
 
             // TODO: export sinks
-            let sink_ids = [GlobalId::User(0)];
-            for sink in sink_ids {
-                let inspect = test_ctx
-                    .lookup_id(Id::Global(sink))
-                    .unwrap()
-                    .as_specific_collection(None);
-                inspect.0.inspect(|x| println!("{:?}", x));
+            for (sink, output_key) in sink_ids.iter().zip(output_keys.iter()) {
+                let sink = *sink;
+                println!("Inspecting sink {:?}", sink);
+                let inspect = test_ctx.lookup_id(Id::Global(sink)).unwrap();
+                dbg!(inspect.collection.is_some());
+                dbg!(inspect.arranged.keys());
+                let inspect = inspect.as_specific_collection(output_key.as_deref());
+                inspect
+                    .0
+                    .inspect(move |x| println!("inspect {:?} {:?}", sink, x));
             }
         },
         );
-        mock_input_session(&mut input);
+        mock_input_session(&mut input, input_mock_length);
     })
     .expect("Computation terminated abnormally");
 }
+
+    #[test]
+    fn test_simple_poc_reduce_group_by() {
+        // 1. build dataflow with input collection connected
+        // 2. give input
+        // type annotations are needed to keep rust-analyzer from giving up on type deduction
+
+        // simply give the dataflow information here;
+        // later it will be built from dataflow information given by other nodes
+        // key is the third column
+        let count_col = |i: usize| AggregateExpr {
+            func: crate::expr::AggregateFunc::Count,
+            expr: ScalarExpr::Column(i),
+            distinct: false,
+        };
+        let sum_col = |i: usize| AggregateExpr {
+            func: crate::expr::AggregateFunc::SumFloat32,
+            expr: ScalarExpr::Column(i),
+            distinct: false,
+        };
+        // equal to `SELECT ts/5 AS window, AVG(cpu), AVG(mem) FROM input GROUP BY ts/5;`
+        // cpu, mem, ts
+        // --map--> cpu, mem, ts/5
+        // --reduce--> ts/5, AVG(cpu), AVG(mem)
+        let cast_datetime = ScalarExpr::CallUnary {
+            func: UnaryFunc::CastDatetimeToInt64,
+            expr: Box::new(ScalarExpr::Column(2)),
+        };
+        let ts_div_5 = ScalarExpr::CallBinary {
+            func: BinaryFunc::DivInt64,
+            expr1: Box::new(cast_datetime),
+            expr2: Box::new(ScalarExpr::Literal(
+                Ok(Value::Int64(5.into())),
+                ConcreteDataType::int64_datatype(),
+            )),
+        };
+        let cast_int64_to_float32 = |i: usize| ScalarExpr::CallUnary {
+            func: UnaryFunc::CastInt64ToFloat32,
+            expr: Box::new(ScalarExpr::Column(i)),
+        };
+        let reduce_group_by_window = vec![
+            // cpu, mem, ts
+            // --reduce--> ts/5, SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem)
+            // --map--> ts/5, AVG(cpu), AVG(mem)
+            BuildDesc {
+                id: GlobalId::User(0),
+                plan: Plan::Reduce {
+                    input: Box::new(Plan::Get {
+                        id: Id::Global(GlobalId::System(0)),
+                        keys: AvailableCollections::new_raw(),
+                        plan: GetPlan::Collection(
+                            MapFilterProject::new(3).map([ts_div_5]).project([0, 1, 3]),
+                        ),
+                    }),
+                    key_val_plan: KeyValPlan {
+                        key_plan: SafeMfpPlan {
+                            mfp: MapFilterProject::new(3).project([2]),
+                        },
+                        val_plan: SafeMfpPlan {
+                            mfp: MapFilterProject::new(3).project([0, 1]),
+                        },
+                    },
+                    // --reduce--> ts/5(key), SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem)
+                    plan: ReducePlan::Accumulable(AccumulablePlan {
+                        full_aggrs: vec![sum_col(0), sum_col(1), count_col(0), count_col(1)],
+                        simple_aggrs: vec![
+                            (0, 0, sum_col(0)),
+                            (1, 1, sum_col(1)),
+                            (2, 0, count_col(0)),
+                            (3, 1, count_col(1)),
+                        ],
+                        distinct_aggrs: vec![],
+                    }),
+                    input_key: None,
+                },
+            },
+            // 0           1         2         3           4
+            // ts/5(key), SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem)
+            // --map--> ts/5, AVG(cpu), AVG(mem)
+            BuildDesc {
+                id: GlobalId::User(1),
+                plan: Plan::Get {
+                    id: Id::Global(GlobalId::User(0)),
+                    // not used since plan is GetPlan::Arrangement
+                    keys: AvailableCollections::new_raw(),
+                    plan: GetPlan::Arrangement(
+                        vec![ScalarExpr::Column(0)],
+                        None,
+                        MapFilterProject::new(5)
+                            .map([
+                                ScalarExpr::CallBinary {
+                                    func: BinaryFunc::DivFloat32,
+                                    expr1: Box::new(ScalarExpr::Column(1)),
+                                    expr2: Box::new(cast_int64_to_float32(3)),
+                                },
+                                ScalarExpr::CallBinary {
+                                    func: BinaryFunc::DivFloat32,
+                                    expr1: Box::new(ScalarExpr::Column(2)),
+                                    expr2: Box::new(cast_int64_to_float32(4)),
+                                },
+                            ])
+                            .project([0, 5, 6]),
+                    ),
+                },
+            },
+        ];
+        let input_id = vec![Id::Global(GlobalId::System(0))];
+        let dataflow = {
+            let mut dataflow = DataflowDescription::<Plan>::new("test".to_string());
+            dataflow.objects_to_build = reduce_group_by_window;
+            dataflow
+        };
+        let sink_ids = [GlobalId::User(0), GlobalId::User(1)];
+        exec_dataflow(
+            input_id.clone(),
+            dataflow.clone(),
+            sink_ids.to_vec(),
+            vec![Some(vec![ScalarExpr::Column(0)]), None],
+            10,
+        );
+    }
+
+    #[test]
+    fn test_simple_poc_reduce_count() {
+        // 1. build dataflow with input collection connected
+        // 2. give input
+        // type annotations are needed to keep rust-analyzer from giving up on type deduction
+
+        // simply give the dataflow information here;
+        // later it will be built from dataflow information given by other nodes
+        // the key is a constant placeholder, so every row falls into one group
+        let place_holder = ScalarExpr::Literal(
+            Ok(Value::Boolean(true)),
+            ConcreteDataType::boolean_datatype(),
+        );
+        let key_plan = SafeMfpPlan {
+            mfp: MapFilterProject::new(3)
+                .map([place_holder.clone()])
+                .project([3]),
+        };
+        let val_plan = SafeMfpPlan {
+            mfp: MapFilterProject::new(3).project([0, 1, 2]),
+        };
+        let count = AggregateExpr {
+            func: crate::expr::AggregateFunc::Count,
+            expr: place_holder,
+            distinct: false,
+        };
+        // equal to `SELECT COUNT(*) FROM input;`
+        let reduce_group_by_window = vec![
+            // count(true)
+            BuildDesc {
+                id: GlobalId::User(0),
+                plan: Plan::Reduce {
+                    input: Box::new(Plan::Get {
+                        id: Id::Global(GlobalId::System(0)),
+                        keys: AvailableCollections::new_raw(),
+                        plan: GetPlan::Collection(MapFilterProject::new(3)),
+                    }),
+                    key_val_plan: KeyValPlan { key_plan, val_plan },
+                    plan: ReducePlan::Accumulable(AccumulablePlan {
+                        full_aggrs: vec![count.clone()],
+                        simple_aggrs: vec![(0, 0, count)],
+                        distinct_aggrs: vec![],
+                    }),
+                    input_key: None,
+                },
+            },
+            // get the second column (the count) from the (key, count) output
+            BuildDesc {
+                id: GlobalId::User(1),
+                plan: Plan::Get {
+                    id: Id::Global(GlobalId::User(0)),
+                    // not used since plan is GetPlan::Arrangement
+                    keys: AvailableCollections::new_raw(),
+                    plan: GetPlan::Arrangement(
+                        vec![ScalarExpr::Column(0)],
+                        None,
+                        MapFilterProject::new(2).project([1]),
+                    ),
+                },
+            },
+        ];
+        let input_id = vec![Id::Global(GlobalId::System(0))];
+        let dataflow = {
+            let mut dataflow = DataflowDescription::<Plan>::new("test".to_string());
+            dataflow.objects_to_build = reduce_group_by_window;
+            dataflow
+        };
+        let sink_ids = [GlobalId::User(1)];
+        exec_dataflow(
+            input_id.clone(),
+            dataflow.clone(),
+            sink_ids.to_vec(),
+            vec![None],
+            10,
+        );
+    }
+
+    #[test]
+    fn test_simple_poc_reduce_distinct() {
+        // 1. build dataflow with input collection connected
+        // 2. give input
+        // type annotations are needed to keep rust-analyzer from giving up on type deduction
+
+        // simply give the dataflow information here;
+        // later it will be built from dataflow information given by other nodes
+        // a real time window needs date_trunc, which is still WIP
+        // key is the third column
+        let key_plan = SafeMfpPlan {
+            mfp: MapFilterProject::new(3).project([2]),
+        };
+        let val_plan = SafeMfpPlan {
+            mfp: MapFilterProject::new(3).project([0, 1]),
+        };
+        // equal to `SELECT DISTINCT ts FROM input;`
+        let reduce_plan = vec![BuildDesc {
+            id: GlobalId::User(0),
+            plan: Plan::Reduce {
+                input: Box::new(Plan::Get {
+                    id: Id::Global(GlobalId::System(0)),
+                    keys: AvailableCollections::new_raw(),
+                    plan: GetPlan::Collection(MapFilterProject::new(3)),
+                }),
+                key_val_plan: KeyValPlan { key_plan, val_plan },
+                plan: ReducePlan::Distinct,
+                input_key: None,
+            },
+        }];
+        let input_id = vec![Id::Global(GlobalId::System(0))];
+        let dataflow = {
+            let mut dataflow = DataflowDescription::<Plan>::new("test".to_string());
+            dataflow.objects_to_build = reduce_plan;
+            dataflow
+        };
+        let sink_ids = [GlobalId::User(0)];
+        exec_dataflow(
+            input_id.clone(),
+            dataflow.clone(),
+            sink_ids.to_vec(),
+            vec![Some(vec![ScalarExpr::Column(0)])],
+            10,
+        );
+    }
 
     #[test]
     #[allow(clippy::print_stdout)]
     fn test_constant_plan_render() {
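Note on the group-by test above: AVG is never computed directly. The first BuildDesc accumulates SUM and COUNT per key, and the second divides them in a map stage. With the mock input (cnt = 10, rows (i/10, i/10, i) for i in 0..10) the key ts/5 yields two groups, so once all updates are applied the expected averages are 0.2 and 0.7. A standalone sanity check of that arithmetic, in plain Rust and illustrative only (not part of the patch):

    fn main() {
        // same values mock_input_session generates for the cpu column
        let cpu: Vec<f32> = (0..10).map(|i| i as f32 / 10.0).collect();
        let avg = |xs: &[f32]| xs.iter().sum::<f32>() / xs.len() as f32;
        // key 0 covers ts in 0..5, key 1 covers ts in 5..10
        assert!((avg(&cpu[0..5]) - 0.2).abs() < 1e-6);
        assert!((avg(&cpu[5..10]) - 0.7).abs() < 1e-6);
    }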
.expect("Computation terminated abnormally"); } + + #[test] + fn test_simple_poc_reduce_group_by() { + // 1. build dataflow with input collection connected + // 2. give input + // type annotation is needed to prevent rust-analyzer to give up type deduction + + // simple give dataflow information + // will be build by given dataflow information from other nodes later + // key is the third column + let place_holder = + ScalarExpr::Literal(Ok(Value::Boolean(true)), ConcreteDataType::int64_datatype()); + + let count_col = |i: usize| AggregateExpr { + func: crate::expr::AggregateFunc::Count, + expr: ScalarExpr::Column(i), + distinct: false, + }; + let sum_col = |i: usize| AggregateExpr { + func: crate::expr::AggregateFunc::SumFloat32, + expr: ScalarExpr::Column(i), + distinct: false, + }; + // equal to `SELECT minute, SUM(cpu) FROM input GROUP BY ts/300 as minute; + // cpu, mem, ts + // --map--> cpu, mem, ts/300 + // --reduce--> ts/300, AVG(cpu), AVG(mem) + let cast_datetime = ScalarExpr::CallUnary { + func: UnaryFunc::CastDatetimeToInt64, + expr: Box::new(ScalarExpr::Column(2)), + }; + let ts_div_5 = ScalarExpr::CallBinary { + func: BinaryFunc::DivInt64, + expr1: Box::new(cast_datetime), + expr2: Box::new(ScalarExpr::Literal( + Ok(Value::Int64(5.into())), + ConcreteDataType::int64_datatype(), + )), + }; + let cast_int64_to_float32 = |i: usize| ScalarExpr::CallUnary { + func: UnaryFunc::CastInt64ToFloat32, + expr: Box::new(ScalarExpr::Column(i)), + }; + let reduce_group_by_window = vec![ + // cpu, mem, ts + // --reduce--> ts/300, SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem) + // -- map --> ts/300, AVG(cpu), AVG(mem) + BuildDesc { + id: GlobalId::User(0), + plan: Plan::Reduce { + input: Box::new(Plan::Get { + id: Id::Global(GlobalId::System(0)), + keys: AvailableCollections::new_raw(), + plan: GetPlan::Collection( + MapFilterProject::new(3).map([ts_div_5]).project([0, 1, 3]), + ), + }), + key_val_plan: KeyValPlan { + key_plan: SafeMfpPlan { + mfp: MapFilterProject::new(3).project([2]), + }, + val_plan: SafeMfpPlan { + mfp: MapFilterProject::new(3).project([0, 1]), + }, + }, + // --reduce--> ts/300(key), SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem) + plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![sum_col(0), sum_col(1), count_col(0), count_col(1)], + simple_aggrs: vec![ + (0, 0, sum_col(0)), + (1, 1, sum_col(1)), + (2, 0, count_col(0)), + (3, 1, count_col(1)), + ], + distinct_aggrs: vec![], + }), + input_key: None, + }, + }, + // 0 1 2 3 4 + // ts/300(key), SUM(cpu), SUM(mem), COUNT(cpu), COUNT(mem), + // -- map --> AVG(cpu), AVG(mem), ts/300 + BuildDesc { + id: GlobalId::User(1), + plan: Plan::Get { + id: Id::Global(GlobalId::User(0)), + // not used since plan is GetPlan::Arrangement + keys: AvailableCollections::new_raw(), + plan: GetPlan::Arrangement( + vec![ScalarExpr::Column(0)], + None, + MapFilterProject::new(5) + .map([ + ScalarExpr::CallBinary { + func: BinaryFunc::DivFloat32, + expr1: Box::new(ScalarExpr::Column(1)), + expr2: Box::new(cast_int64_to_float32(3)), + }, + ScalarExpr::CallBinary { + func: BinaryFunc::DivFloat32, + expr1: Box::new(ScalarExpr::Column(2)), + expr2: Box::new(cast_int64_to_float32(4)), + }, + ]) + .project([0, 5, 6]), + ), + }, + }, + ]; + let input_id = vec![Id::Global(GlobalId::System(0))]; + let dataflow = { + let mut dataflow = DataflowDescription::::new("test".to_string()); + dataflow.objects_to_build = reduce_group_by_window; + dataflow + }; + let sink_ids = [GlobalId::User(0), GlobalId::User(1)]; + exec_dataflow( + input_id.clone(), + 
diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs
index fc541c40cc..08c9359757 100644
--- a/src/flow/src/expr/func.rs
+++ b/src/flow/src/expr/func.rs
@@ -11,11 +11,38 @@ pub enum UnaryFunc {
     IsNull,
     IsTrue,
     IsFalse,
+    CastDatetimeToInt64,
+    CastInt64ToFloat32,
 }
 
 impl UnaryFunc {
     pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result<Value, EvalError> {
-        todo!()
+        let arg = expr.eval(values)?;
+        match self {
+            Self::CastDatetimeToInt64 => {
+                let datetime = if let Value::DateTime(datetime) = arg {
+                    Ok(datetime.val())
+                } else {
+                    Err(EvalError::TypeMismatch(format!(
+                        "expected datetime, found {:?}",
+                        arg
+                    )))
+                }?;
+                Ok(Value::from(datetime))
+            }
+            Self::CastInt64ToFloat32 => {
+                let int64 = if let Value::Int64(int64) = arg {
+                    Ok(int64)
+                } else {
+                    Err(EvalError::TypeMismatch(format!(
+                        "expected int64, found {:?}",
+                        arg
+                    )))
+                }?;
+                Ok(Value::from(int64 as f32))
+            }
+            _ => todo!(),
+        }
     }
 }
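Note on the new casts: eval first evaluates the wrapped expression against the row, then pattern-matches the resulting Value. A quick check of CastInt64ToFloat32 written against only what this patch defines; a sketch that would live in the flow crate's tests, assuming Value implements PartialEq and that ScalarExpr::Column(0) evaluates to the first value in the row:

    #[test]
    fn cast_int64_to_float32_eval() {
        // hypothetical test, not part of the patch
        let row = vec![Value::Int64(42)];
        let col0 = ScalarExpr::Column(0);
        let out = UnaryFunc::CastInt64ToFloat32.eval(&row, &col0).unwrap();
        // the patch converts via `int64 as f32` and wraps with Value::from
        assert_eq!(out, Value::from(42.0_f32));
    }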
diff --git a/src/flow/src/expr/mod.rs b/src/flow/src/expr/mod.rs
index 04868d4f80..ee3c25721b 100644
--- a/src/flow/src/expr/mod.rs
+++ b/src/flow/src/expr/mod.rs
@@ -14,7 +14,7 @@ pub use linear::{MapFilterProject, SafeMfpPlan};
 pub(crate) use relation::{AggregateExpr, AggregateFunc, TableFunc};
 use serde::{Deserialize, Serialize};
 
-use crate::expr::func::{BinaryFunc, UnaryFunc, VariadicFunc};
+pub(crate) use crate::expr::func::{BinaryFunc, UnaryFunc, VariadicFunc};
 use crate::storage::errors::EvalError;
 
 #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
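Note on the visibility change: turning the plain `use` into a pub(crate) re-export is what lets the test module's `use crate::expr::{..., BinaryFunc, ..., UnaryFunc}` import at the top of this patch resolve; presumably callers would otherwise have to name the func module directly. A sketch of the two import paths at a hypothetical call site inside the crate:

    // before: only the module path works (assuming the func module is crate-visible)
    // use crate::expr::func::{BinaryFunc, UnaryFunc};

    // after: the re-export makes the shorter path available, matching the test imports
    use crate::expr::{BinaryFunc, UnaryFunc};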