From 93561291e4b574ba88634a2d0d5039feea00ff63 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Fri, 1 Sep 2023 14:29:56 +0800 Subject: [PATCH] support more binary function --- src/flow/src/compute/mod.rs | 6 ++ src/flow/src/compute/render/mod.rs | 112 +++++++++++++++++++- src/flow/src/expr/func.rs | 158 ++++++++++++++++++++++++++++- src/flow/src/expr/relation/func.rs | 113 +++++++++++++++++++++ src/flow/src/storage/errors.rs | 1 + src/flow/src/storage/source.rs | 0 6 files changed, 385 insertions(+), 5 deletions(-) create mode 100644 src/flow/src/storage/source.rs diff --git a/src/flow/src/compute/mod.rs b/src/flow/src/compute/mod.rs index 2f079c7ed6..023785a70a 100644 --- a/src/flow/src/compute/mod.rs +++ b/src/flow/src/compute/mod.rs @@ -7,3 +7,9 @@ mod typedefs; mod types; pub use context::Context; + +// TODO(discord9): make a simplified version of source/sink +// sink: simply get rows out of sinked collection/err collection and put it somewhere +// (R, T, D) row of course with since/until frontier to limit + +// source: simply insert stuff into it diff --git a/src/flow/src/compute/render/mod.rs b/src/flow/src/compute/render/mod.rs index 6e379626cf..62b65bf98b 100644 --- a/src/flow/src/compute/render/mod.rs +++ b/src/flow/src/compute/render/mod.rs @@ -98,7 +98,6 @@ where pub fn render_plan(&mut self, plan: Plan) -> CollectionBundle { match plan { Plan::Constant { rows } => { - dbg!(&rows); let (rows, errs) = match rows { Ok(rows) => (rows, Vec::new()), Err(err) => (Vec::new(), vec![err]), @@ -108,7 +107,6 @@ where let ok_collection = rows .into_iter() .filter_map(move |(row, mut time, diff)| { - dbg!(&row); time.advance_by(since_frontier.borrow()); if !until.less_equal(&time) { Some(( @@ -226,12 +224,120 @@ where #[cfg(test)] mod test { - use differential_dataflow::input::InputSession; + use std::any::Any; + use std::rc::Rc; + + use datatypes::prelude::ConcreteDataType; + use datatypes::value::Value; + use differential_dataflow::input::{Input, InputSession}; + use differential_dataflow::Collection; use timely::dataflow::scopes::Child; + use timely::dataflow::Stream; + use timely::Config; use super::*; use crate::expr::{GlobalId, LocalId}; use crate::repr::Diff; + type OkStream = Stream; + type ErrStream = Stream; + type OkCollection = Collection; + type ErrCollection = Collection; + /// used as a token to prevent certain resources from being dropped + type AnyToken = Rc; + struct MockSourceToken { + handle: InputSession, + err_handle: InputSession, + } + + fn mock_input_session(input: &mut InputSession) { + // TODO: mock a cpu usage monotonic input with timestamp + // cpu, mem, ts + // f32, f32, DateTime + let schema = [ + ConcreteDataType::float32_datatype(), + ConcreteDataType::float32_datatype(), + ConcreteDataType::datetime_datatype(), + ]; + let cnt = 50; + let arrs = (0..cnt).map(|i| (i as f32 / cnt as f32, i as f32 / cnt as f32, i)); + // need more mechanism to make timestamp also timestamp here + for (cpu, mem, ts) in arrs { + input.update( + Row::pack(vec![cpu.into(), mem.into(), Value::DateTime(ts.into())]), + 1, + ); + input.advance_to(ts as u64) + } + input.flush(); + } + + #[test] + fn test_simple_poc_with_input_built() { + // 1. build dataflow with input collection connected + // 2. give input + // type annotation is needed to prevent rust-analyzer to give up type deduction + + // simple give dataflow information + // will be build by given dataflow information from other nodes later + let dataflow = { + let reduce_group_by_window = vec![BuildDesc { + id: GlobalId::User(1), + plan: Plan::Constant { + rows: Ok(vec![(Row::default(), 0, 1)]), + }, + }]; + let mut dataflow = DataflowDescription::::new("test".to_string()); + dataflow.objects_to_build = reduce_group_by_window; + dataflow + }; + + timely::execute(Config::thread(), move |worker| { + println!("worker: {:?}", worker.index()); + let mut input = InputSession::::new(); + worker.dataflow_named( + "ProofOfConcept", + |scope: &mut Child<'_, _, repr::Timestamp>| { + let mut test_ctx = + Context::<_, Row, _>::for_dataflow_in(&dataflow, scope.clone()); + + let ok_collection = input.to_collection(scope); + let (err_handle, err_collection) = scope.new_collection(); + let input_collection = + CollectionBundle::<_, _, repr::Timestamp>::from_collections( + ok_collection, + err_collection, + ); + + // TODO: generate `import_sources` from `dataflow.source_imports` + let import_sources = vec![(Id::Global(GlobalId::User(0)), input_collection)]; + + // import sources + for (id, collection) in import_sources { + test_ctx.insert_id(id, collection); + } + + for build_desc in &dataflow.objects_to_build { + test_ctx.build_object(build_desc.clone()); + } + + dbg!(test_ctx.bindings.keys()); + + // TODO: export sinks + let sink_ids = [GlobalId::User(0)]; + + for sink in sink_ids { + let inspect = test_ctx + .lookup_id(Id::Global(sink)) + .unwrap() + .as_specific_collection(None); + inspect.0.inspect(|x| println!("{:?}", x)); + } + }, + ); + mock_input_session(&mut input); + }) + .expect("Computation terminated abnormally"); + } #[test] #[allow(clippy::print_stdout)] fn test_constant_plan_render() { diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index e4f8080cbe..fc541c40cc 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -19,8 +19,54 @@ impl UnaryFunc { } } +/// TODO: support more binary functions for more types #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] -pub enum BinaryFunc {} +pub enum BinaryFunc { + Eq, + NotEq, + Lt, + Lte, + Gt, + Gte, + AddInt16, + AddInt32, + AddInt64, + AddUInt16, + AddUInt32, + AddUInt64, + AddFloat32, + AddFloat64, + SubInt16, + SubInt32, + SubInt64, + SubUInt16, + SubUInt32, + SubUInt64, + SubFloat32, + SubFloat64, + MulInt16, + MulInt32, + MulInt64, + MulUInt16, + MulUInt32, + MulUInt64, + MulFloat32, + MulFloat64, + DivInt16, + DivInt32, + DivInt64, + DivUInt16, + DivUInt32, + DivUInt64, + DivFloat32, + DivFloat64, + ModInt16, + ModInt32, + ModInt64, + ModUInt16, + ModUInt32, + ModUInt64, +} impl BinaryFunc { pub fn eval( @@ -29,7 +75,60 @@ impl BinaryFunc { expr1: &ScalarExpr, expr2: &ScalarExpr, ) -> Result { - todo!() + let left = expr1.eval(values)?; + let right = expr2.eval(values)?; + match self { + Self::Eq => Ok(Value::from(left == right)), + Self::NotEq => Ok(Value::from(left != right)), + Self::Lt => Ok(Value::from(left < right)), + Self::Lte => Ok(Value::from(left <= right)), + Self::Gt => Ok(Value::from(left > right)), + Self::Gte => Ok(Value::from(left >= right)), + Self::AddInt16 => Ok(add::(left, right)?), + Self::AddInt32 => Ok(add::(left, right)?), + Self::AddInt64 => Ok(add::(left, right)?), + Self::AddUInt16 => Ok(add::(left, right)?), + Self::AddUInt32 => Ok(add::(left, right)?), + Self::AddUInt64 => Ok(add::(left, right)?), + Self::AddFloat32 => Ok(add::(left, right)?), + Self::AddFloat64 => Ok(add::(left, right)?), + + Self::SubInt16 => Ok(sub::(left, right)?), + Self::SubInt32 => Ok(sub::(left, right)?), + Self::SubInt64 => Ok(sub::(left, right)?), + Self::SubUInt16 => Ok(sub::(left, right)?), + Self::SubUInt32 => Ok(sub::(left, right)?), + Self::SubUInt64 => Ok(sub::(left, right)?), + Self::SubFloat32 => Ok(sub::(left, right)?), + Self::SubFloat64 => Ok(sub::(left, right)?), + + Self::MulInt16 => Ok(mul::(left, right)?), + Self::MulInt32 => Ok(mul::(left, right)?), + Self::MulInt64 => Ok(mul::(left, right)?), + Self::MulUInt16 => Ok(mul::(left, right)?), + Self::MulUInt32 => Ok(mul::(left, right)?), + Self::MulUInt64 => Ok(mul::(left, right)?), + Self::MulFloat32 => Ok(mul::(left, right)?), + Self::MulFloat64 => Ok(mul::(left, right)?), + + Self::DivInt16 => Ok(div::(left, right)?), + Self::DivInt32 => Ok(div::(left, right)?), + Self::DivInt64 => Ok(div::(left, right)?), + Self::DivUInt16 => Ok(div::(left, right)?), + Self::DivUInt32 => Ok(div::(left, right)?), + Self::DivUInt64 => Ok(div::(left, right)?), + Self::DivFloat32 => Ok(div::(left, right)?), + Self::DivFloat64 => Ok(div::(left, right)?), + + Self::ModInt16 => Ok(rem::(left, right)?), + Self::ModInt32 => Ok(rem::(left, right)?), + Self::ModInt64 => Ok(rem::(left, right)?), + Self::ModUInt16 => Ok(rem::(left, right)?), + Self::ModUInt32 => Ok(rem::(left, right)?), + Self::ModUInt64 => Ok(rem::(left, right)?), + + _ => todo!(), + } } } @@ -41,3 +140,58 @@ impl VariadicFunc { todo!() } } + +fn add(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Add, + >::Error: std::fmt::Debug, + Value: From, +{ + let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + Ok(Value::from(left + right)) +} + +fn sub(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Sub, + >::Error: std::fmt::Debug, + Value: From, +{ + let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + Ok(Value::from(left - right)) +} + +fn mul(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Mul, + >::Error: std::fmt::Debug, + Value: From, +{ + let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + Ok(Value::from(left * right)) +} + +fn div(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Div, + >::Error: std::fmt::Debug, + Value: From, +{ + let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + Ok(Value::from(left / right)) +} + +fn rem(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Rem, + >::Error: std::fmt::Debug, + Value: From, +{ + let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?; + Ok(Value::from(left % right)) +} diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs index 039c857dbe..6561524a8f 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -48,6 +48,7 @@ impl AggregateFunc { where I: IntoIterator, { + // TODO: impl more functions like min/max/sumTimestamp etc. match self { AggregateFunc::MaxInt16 => max_value::(values), AggregateFunc::MaxInt32 => max_value::(values), @@ -57,11 +58,53 @@ impl AggregateFunc { AggregateFunc::MaxUInt64 => max_value::(values), AggregateFunc::MaxFloat32 => max_value::(values), AggregateFunc::MaxFloat64 => max_value::(values), + AggregateFunc::MaxBool => max_value::(values), + AggregateFunc::MaxString => max_string(values), + + AggregateFunc::MinInt16 => min_value::(values), + AggregateFunc::MinInt32 => min_value::(values), + AggregateFunc::MinInt64 => min_value::(values), + AggregateFunc::MinUInt16 => min_value::(values), + AggregateFunc::MinUInt32 => min_value::(values), + AggregateFunc::MinUInt64 => min_value::(values), + AggregateFunc::MinFloat32 => min_value::(values), + AggregateFunc::MinFloat64 => min_value::(values), + AggregateFunc::MinBool => min_value::(values), + AggregateFunc::MinString => min_string(values), + + AggregateFunc::SumInt16 => sum_value::(values), + AggregateFunc::SumInt32 => sum_value::(values), + AggregateFunc::SumInt64 => sum_value::(values), + AggregateFunc::SumUInt16 => sum_value::(values), + AggregateFunc::SumUInt32 => sum_value::(values), + AggregateFunc::SumUInt64 => sum_value::(values), + AggregateFunc::SumFloat32 => sum_value::(values), + AggregateFunc::SumFloat64 => sum_value::(values), + + AggregateFunc::Count => count(values), + AggregateFunc::All => all(values), + AggregateFunc::Any => any(values), _ => todo!(), } } } +fn max_string(values: I) -> Value +where + I: IntoIterator, +{ + match values.into_iter().filter(|d| !d.is_null()).max_by(|a, b| { + let a = a.as_value_ref(); + let a = a.as_string().expect("unexpected type").unwrap(); + let b = b.as_value_ref(); + let b = b.as_string().expect("unexpected type").unwrap(); + a.cmp(b) + }) { + Some(v) => v, + None => Value::Null, + } +} + fn max_value(values: I) -> Value where I: IntoIterator, @@ -77,6 +120,22 @@ where x.into() } +fn min_string(values: I) -> Value +where + I: IntoIterator, +{ + match values.into_iter().filter(|d| !d.is_null()).min_by(|a, b| { + let a = a.as_value_ref(); + let a = a.as_string().expect("unexpected type").unwrap(); + let b = b.as_value_ref(); + let b = b.as_string().expect("unexpected type").unwrap(); + a.cmp(b) + }) { + Some(v) => v, + None => Value::Null, + } +} + fn min_value(values: I) -> Value where I: IntoIterator, @@ -91,3 +150,57 @@ where .min(); x.into() } + +fn sum_value(values: I) -> Value +where + I: IntoIterator, + ValueType: TryFrom, + >::Error: std::fmt::Debug, + Value: From>, + ResultType: From + std::iter::Sum + Into, +{ + // If no row qualifies, then the result of COUNT is 0 (zero), and the result of any other aggregate function is the null value. + let mut values = values.into_iter().filter(|v| !v.is_null()).peekable(); + if values.peek().is_none() { + Value::Null + } else { + let x = values + .map(|v| ResultType::from(ValueType::try_from(v).expect("unexpected type"))) + .sum::(); + x.into() + } +} + +fn count(values: I) -> Value +where + I: IntoIterator, +{ + let x = values.into_iter().filter(|v| !v.is_null()).count() as i64; + Value::from(x) +} + +fn any(datums: I) -> Value +where + I: IntoIterator, +{ + datums + .into_iter() + .fold(Value::Boolean(false), |state, next| match (state, next) { + (Value::Boolean(true), _) | (_, Value::Boolean(true)) => Value::Boolean(true), + (Value::Null, _) | (_, Value::Null) => Value::Null, + _ => Value::Boolean(false), + }) +} + +fn all(datums: I) -> Value +where + I: IntoIterator, +{ + datums + .into_iter() + .fold(Value::Boolean(true), |state, next| match (state, next) { + (Value::Boolean(false), _) | (_, Value::Boolean(false)) => Value::Boolean(false), + (Value::Null, _) | (_, Value::Null) => Value::Null, + _ => Value::Boolean(true), + }) +} diff --git a/src/flow/src/storage/errors.rs b/src/flow/src/storage/errors.rs index f8dd8236a7..458e6266f5 100644 --- a/src/flow/src/storage/errors.rs +++ b/src/flow/src/storage/errors.rs @@ -15,6 +15,7 @@ impl From for DataflowError { #[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)] pub enum EvalError { DivisionByZero, + TypeMismatch(String), InvalidArgument(String), Internal(String), } diff --git a/src/flow/src/storage/source.rs b/src/flow/src/storage/source.rs new file mode 100644 index 0000000000..e69de29bb2