diff --git a/src/flow/src/compute/plan/mod.rs b/src/flow/src/compute/plan/mod.rs index 9e42000891..41705504a0 100644 --- a/src/flow/src/compute/plan/mod.rs +++ b/src/flow/src/compute/plan/mod.rs @@ -4,7 +4,9 @@ mod reduce; use std::collections::BTreeMap; use join::JoinPlan; -pub(crate) use reduce::{convert_indexes_to_skips, AccumulablePlan, KeyValPlan, ReducePlan, BucketedPlan}; +pub(crate) use reduce::{ + convert_indexes_to_skips, AccumulablePlan, BucketedPlan, KeyValPlan, ReducePlan, +}; use serde::{Deserialize, Serialize}; use crate::expr::{Id, LocalId, MapFilterProject, ScalarExpr, TableFunc}; @@ -217,4 +219,4 @@ fn bucketing_of_expected_group_size(expected_group_size: Option) -> Vec: ExchangeData + Hash { +pub(super) trait MaybeValidatingRow: ExchangeData + Hash { fn ok(t: T) -> Self; fn into_error() -> Option Self>; } diff --git a/src/flow/src/compute/render/mod.rs b/src/flow/src/compute/render/mod.rs index a4ddfa6fa8..6e379626cf 100644 --- a/src/flow/src/compute/render/mod.rs +++ b/src/flow/src/compute/render/mod.rs @@ -21,8 +21,8 @@ use crate::expr::Id; use crate::repr::{self, Row}; use crate::storage::errors::DataflowError; -mod reduce; mod error; +mod reduce; /// Assemble the "compute" side of a dataflow, i.e. all but the sources. /// diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs index 074b0e6e61..039c857dbe 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -1,4 +1,5 @@ -use datatypes::value::Value; +use datatypes::prelude::ConcreteDataType; +use datatypes::value::{OrderedF32, OrderedF64, Value}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] @@ -43,8 +44,50 @@ pub enum AggregateFunc { } impl AggregateFunc { - pub fn eval(&self, values: I)->Value - where I: IntoIterator{ - todo!() + pub fn eval(&self, values: I) -> Value + where + I: IntoIterator, + { + match self { + AggregateFunc::MaxInt16 => max_value::(values), + AggregateFunc::MaxInt32 => max_value::(values), + AggregateFunc::MaxInt64 => max_value::(values), + AggregateFunc::MaxUInt16 => max_value::(values), + AggregateFunc::MaxUInt32 => max_value::(values), + AggregateFunc::MaxUInt64 => max_value::(values), + AggregateFunc::MaxFloat32 => max_value::(values), + AggregateFunc::MaxFloat64 => max_value::(values), + _ => todo!(), + } } } + +fn max_value(values: I) -> Value +where + I: IntoIterator, + TypedValue: TryFrom + Ord, + >::Error: std::fmt::Debug, + Value: From>, +{ + let x: Option = values + .into_iter() + .filter(|v| !v.is_null()) + .map(|v| TypedValue::try_from(v).expect("unexpected type")) + .max(); + x.into() +} + +fn min_value(values: I) -> Value +where + I: IntoIterator, + TypedValue: TryFrom + Ord, + >::Error: std::fmt::Debug, + Value: From>, +{ + let x: Option = values + .into_iter() + .filter(|v| !v.is_null()) + .map(|v| TypedValue::try_from(v).expect("unexpected type")) + .min(); + x.into() +}