eval func

This commit is contained in:
Discord9
2023-08-23 15:44:05 +08:00
parent 51083b12bd
commit 9f59d68391
4 changed files with 55 additions and 9 deletions

View File

@@ -4,7 +4,9 @@ mod reduce;
use std::collections::BTreeMap;
use join::JoinPlan;
pub(crate) use reduce::{convert_indexes_to_skips, AccumulablePlan, KeyValPlan, ReducePlan, BucketedPlan};
pub(crate) use reduce::{
convert_indexes_to_skips, AccumulablePlan, BucketedPlan, KeyValPlan, ReducePlan,
};
use serde::{Deserialize, Serialize};
use crate::expr::{Id, LocalId, MapFilterProject, ScalarExpr, TableFunc};
@@ -217,4 +219,4 @@ fn bucketing_of_expected_group_size(expected_group_size: Option<u64>) -> Vec<u64
buckets.reverse();
buckets
}
}

View File

@@ -1,12 +1,13 @@
use differential_dataflow::ExchangeData;
use std::hash::Hash;
use differential_dataflow::ExchangeData;
use crate::repr::Row;
/// Used to make possibly-validating code generic: think of this as a kind of `MaybeResult`,
/// specialized for use in compute. Validation code will only run when the error constructor is
/// Some.
pub(super) trait MaybeValidatingRow<T, E>: ExchangeData + Hash {
pub(super) trait MaybeValidatingRow<T, E>: ExchangeData + Hash {
fn ok(t: T) -> Self;
fn into_error() -> Option<fn(E) -> Self>;
}

View File

@@ -21,8 +21,8 @@ use crate::expr::Id;
use crate::repr::{self, Row};
use crate::storage::errors::DataflowError;
mod reduce;
mod error;
mod reduce;
/// Assemble the "compute" side of a dataflow, i.e. all but the sources.
///

View File

@@ -1,4 +1,5 @@
use datatypes::value::Value;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{OrderedF32, OrderedF64, Value};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
@@ -43,8 +44,50 @@ pub enum AggregateFunc {
}
impl AggregateFunc {
pub fn eval<I>(&self, values: I)->Value
where I: IntoIterator<Item = Value>{
todo!()
pub fn eval<I>(&self, values: I) -> Value
where
I: IntoIterator<Item = Value>,
{
match self {
AggregateFunc::MaxInt16 => max_value::<I, i16>(values),
AggregateFunc::MaxInt32 => max_value::<I, i32>(values),
AggregateFunc::MaxInt64 => max_value::<I, i64>(values),
AggregateFunc::MaxUInt16 => max_value::<I, u16>(values),
AggregateFunc::MaxUInt32 => max_value::<I, u32>(values),
AggregateFunc::MaxUInt64 => max_value::<I, u64>(values),
AggregateFunc::MaxFloat32 => max_value::<I, OrderedF32>(values),
AggregateFunc::MaxFloat64 => max_value::<I, OrderedF64>(values),
_ => todo!(),
}
}
}
fn max_value<I, TypedValue>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
TypedValue: TryFrom<Value> + Ord,
<TypedValue as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<Option<TypedValue>>,
{
let x: Option<TypedValue> = values
.into_iter()
.filter(|v| !v.is_null())
.map(|v| TypedValue::try_from(v).expect("unexpected type"))
.max();
x.into()
}
fn min_value<I, TypedValue>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
TypedValue: TryFrom<Value> + Ord,
<TypedValue as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<Option<TypedValue>>,
{
let x: Option<TypedValue> = values
.into_iter()
.filter(|v| !v.is_null())
.map(|v| TypedValue::try_from(v).expect("unexpected type"))
.min();
x.into()
}