support more binary function

This commit is contained in:
Discord9
2023-09-01 14:29:56 +08:00
parent 9f59d68391
commit 93561291e4
6 changed files with 385 additions and 5 deletions

View File

@@ -7,3 +7,9 @@ mod typedefs;
mod types;
pub use context::Context;
// TODO(discord9): make a simplified version of source/sink
// sink: simply get rows out of sinked collection/err collection and put it somewhere
// (R, T, D) row of course with since/until frontier to limit
// source: simply insert stuff into it

View File

@@ -98,7 +98,6 @@ where
pub fn render_plan(&mut self, plan: Plan) -> CollectionBundle<S, Row> {
match plan {
Plan::Constant { rows } => {
dbg!(&rows);
let (rows, errs) = match rows {
Ok(rows) => (rows, Vec::new()),
Err(err) => (Vec::new(), vec![err]),
@@ -108,7 +107,6 @@ where
let ok_collection = rows
.into_iter()
.filter_map(move |(row, mut time, diff)| {
dbg!(&row);
time.advance_by(since_frontier.borrow());
if !until.less_equal(&time) {
Some((
@@ -226,12 +224,120 @@ where
#[cfg(test)]
mod test {
use differential_dataflow::input::InputSession;
use std::any::Any;
use std::rc::Rc;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::Collection;
use timely::dataflow::scopes::Child;
use timely::dataflow::Stream;
use timely::Config;
use super::*;
use crate::expr::{GlobalId, LocalId};
use crate::repr::Diff;
type OkStream<G> = Stream<G, (Row, repr::Timestamp, Diff)>;
type ErrStream<G> = Stream<G, (DataflowError, repr::Timestamp, Diff)>;
type OkCollection<G> = Collection<G, Row, Diff>;
type ErrCollection<G> = Collection<G, DataflowError, Diff>;
/// used as a token to prevent certain resources from being dropped
type AnyToken = Rc<dyn Any>;
struct MockSourceToken {
handle: InputSession<repr::Timestamp, Row, Diff>,
err_handle: InputSession<repr::Timestamp, DataflowError, Diff>,
}
fn mock_input_session(input: &mut InputSession<repr::Timestamp, Row, Diff>) {
// TODO: mock a cpu usage monotonic input with timestamp
// cpu, mem, ts
// f32, f32, DateTime
let schema = [
ConcreteDataType::float32_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::datetime_datatype(),
];
let cnt = 50;
let arrs = (0..cnt).map(|i| (i as f32 / cnt as f32, i as f32 / cnt as f32, i));
// need more mechanism to make timestamp also timestamp here
for (cpu, mem, ts) in arrs {
input.update(
Row::pack(vec![cpu.into(), mem.into(), Value::DateTime(ts.into())]),
1,
);
input.advance_to(ts as u64)
}
input.flush();
}
#[test]
fn test_simple_poc_with_input_built() {
// 1. build dataflow with input collection connected
// 2. give input
// type annotation is needed to prevent rust-analyzer to give up type deduction
// simple give dataflow information
// will be build by given dataflow information from other nodes later
let dataflow = {
let reduce_group_by_window = vec![BuildDesc {
id: GlobalId::User(1),
plan: Plan::Constant {
rows: Ok(vec![(Row::default(), 0, 1)]),
},
}];
let mut dataflow = DataflowDescription::<Plan, ()>::new("test".to_string());
dataflow.objects_to_build = reduce_group_by_window;
dataflow
};
timely::execute(Config::thread(), move |worker| {
println!("worker: {:?}", worker.index());
let mut input = InputSession::<repr::Timestamp, Row, Diff>::new();
worker.dataflow_named(
"ProofOfConcept",
|scope: &mut Child<'_, _, repr::Timestamp>| {
let mut test_ctx =
Context::<_, Row, _>::for_dataflow_in(&dataflow, scope.clone());
let ok_collection = input.to_collection(scope);
let (err_handle, err_collection) = scope.new_collection();
let input_collection =
CollectionBundle::<_, _, repr::Timestamp>::from_collections(
ok_collection,
err_collection,
);
// TODO: generate `import_sources` from `dataflow.source_imports`
let import_sources = vec![(Id::Global(GlobalId::User(0)), input_collection)];
// import sources
for (id, collection) in import_sources {
test_ctx.insert_id(id, collection);
}
for build_desc in &dataflow.objects_to_build {
test_ctx.build_object(build_desc.clone());
}
dbg!(test_ctx.bindings.keys());
// TODO: export sinks
let sink_ids = [GlobalId::User(0)];
for sink in sink_ids {
let inspect = test_ctx
.lookup_id(Id::Global(sink))
.unwrap()
.as_specific_collection(None);
inspect.0.inspect(|x| println!("{:?}", x));
}
},
);
mock_input_session(&mut input);
})
.expect("Computation terminated abnormally");
}
#[test]
#[allow(clippy::print_stdout)]
fn test_constant_plan_render() {

View File

@@ -19,8 +19,54 @@ impl UnaryFunc {
}
}
/// TODO: support more binary functions for more types
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub enum BinaryFunc {}
pub enum BinaryFunc {
Eq,
NotEq,
Lt,
Lte,
Gt,
Gte,
AddInt16,
AddInt32,
AddInt64,
AddUInt16,
AddUInt32,
AddUInt64,
AddFloat32,
AddFloat64,
SubInt16,
SubInt32,
SubInt64,
SubUInt16,
SubUInt32,
SubUInt64,
SubFloat32,
SubFloat64,
MulInt16,
MulInt32,
MulInt64,
MulUInt16,
MulUInt32,
MulUInt64,
MulFloat32,
MulFloat64,
DivInt16,
DivInt32,
DivInt64,
DivUInt16,
DivUInt32,
DivUInt64,
DivFloat32,
DivFloat64,
ModInt16,
ModInt32,
ModInt64,
ModUInt16,
ModUInt32,
ModUInt64,
}
impl BinaryFunc {
pub fn eval(
@@ -29,7 +75,60 @@ impl BinaryFunc {
expr1: &ScalarExpr,
expr2: &ScalarExpr,
) -> Result<Value, EvalError> {
todo!()
let left = expr1.eval(values)?;
let right = expr2.eval(values)?;
match self {
Self::Eq => Ok(Value::from(left == right)),
Self::NotEq => Ok(Value::from(left != right)),
Self::Lt => Ok(Value::from(left < right)),
Self::Lte => Ok(Value::from(left <= right)),
Self::Gt => Ok(Value::from(left > right)),
Self::Gte => Ok(Value::from(left >= right)),
Self::AddInt16 => Ok(add::<i16>(left, right)?),
Self::AddInt32 => Ok(add::<i32>(left, right)?),
Self::AddInt64 => Ok(add::<i64>(left, right)?),
Self::AddUInt16 => Ok(add::<u16>(left, right)?),
Self::AddUInt32 => Ok(add::<u32>(left, right)?),
Self::AddUInt64 => Ok(add::<u64>(left, right)?),
Self::AddFloat32 => Ok(add::<f32>(left, right)?),
Self::AddFloat64 => Ok(add::<f64>(left, right)?),
Self::SubInt16 => Ok(sub::<i16>(left, right)?),
Self::SubInt32 => Ok(sub::<i32>(left, right)?),
Self::SubInt64 => Ok(sub::<i64>(left, right)?),
Self::SubUInt16 => Ok(sub::<u16>(left, right)?),
Self::SubUInt32 => Ok(sub::<u32>(left, right)?),
Self::SubUInt64 => Ok(sub::<u64>(left, right)?),
Self::SubFloat32 => Ok(sub::<f32>(left, right)?),
Self::SubFloat64 => Ok(sub::<f64>(left, right)?),
Self::MulInt16 => Ok(mul::<i16>(left, right)?),
Self::MulInt32 => Ok(mul::<i32>(left, right)?),
Self::MulInt64 => Ok(mul::<i64>(left, right)?),
Self::MulUInt16 => Ok(mul::<u16>(left, right)?),
Self::MulUInt32 => Ok(mul::<u32>(left, right)?),
Self::MulUInt64 => Ok(mul::<u64>(left, right)?),
Self::MulFloat32 => Ok(mul::<f32>(left, right)?),
Self::MulFloat64 => Ok(mul::<f64>(left, right)?),
Self::DivInt16 => Ok(div::<i16>(left, right)?),
Self::DivInt32 => Ok(div::<i32>(left, right)?),
Self::DivInt64 => Ok(div::<i64>(left, right)?),
Self::DivUInt16 => Ok(div::<u16>(left, right)?),
Self::DivUInt32 => Ok(div::<u32>(left, right)?),
Self::DivUInt64 => Ok(div::<u64>(left, right)?),
Self::DivFloat32 => Ok(div::<f32>(left, right)?),
Self::DivFloat64 => Ok(div::<f64>(left, right)?),
Self::ModInt16 => Ok(rem::<i16>(left, right)?),
Self::ModInt32 => Ok(rem::<i32>(left, right)?),
Self::ModInt64 => Ok(rem::<i64>(left, right)?),
Self::ModUInt16 => Ok(rem::<u16>(left, right)?),
Self::ModUInt32 => Ok(rem::<u32>(left, right)?),
Self::ModUInt64 => Ok(rem::<u64>(left, right)?),
_ => todo!(),
}
}
}
@@ -41,3 +140,58 @@ impl VariadicFunc {
todo!()
}
}
fn add<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Add<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left + right))
}
fn sub<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Sub<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left - right))
}
fn mul<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Mul<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left * right))
}
fn div<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Div<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left / right))
}
fn rem<T>(left: Value, right: Value) -> Result<Value, EvalError>
where
T: TryFrom<Value> + std::ops::Rem<Output = T>,
<T as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<T>,
{
let left = T::try_from(left).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
let right = T::try_from(right).map_err(|e| EvalError::TypeMismatch(format!("{:?}", e)))?;
Ok(Value::from(left % right))
}

View File

@@ -48,6 +48,7 @@ impl AggregateFunc {
where
I: IntoIterator<Item = Value>,
{
// TODO: impl more functions like min/max/sumTimestamp etc.
match self {
AggregateFunc::MaxInt16 => max_value::<I, i16>(values),
AggregateFunc::MaxInt32 => max_value::<I, i32>(values),
@@ -57,11 +58,53 @@ impl AggregateFunc {
AggregateFunc::MaxUInt64 => max_value::<I, u64>(values),
AggregateFunc::MaxFloat32 => max_value::<I, OrderedF32>(values),
AggregateFunc::MaxFloat64 => max_value::<I, OrderedF64>(values),
AggregateFunc::MaxBool => max_value::<I, bool>(values),
AggregateFunc::MaxString => max_string(values),
AggregateFunc::MinInt16 => min_value::<I, i16>(values),
AggregateFunc::MinInt32 => min_value::<I, i32>(values),
AggregateFunc::MinInt64 => min_value::<I, i64>(values),
AggregateFunc::MinUInt16 => min_value::<I, u16>(values),
AggregateFunc::MinUInt32 => min_value::<I, u32>(values),
AggregateFunc::MinUInt64 => min_value::<I, u16>(values),
AggregateFunc::MinFloat32 => min_value::<I, OrderedF32>(values),
AggregateFunc::MinFloat64 => min_value::<I, OrderedF64>(values),
AggregateFunc::MinBool => min_value::<I, bool>(values),
AggregateFunc::MinString => min_string(values),
AggregateFunc::SumInt16 => sum_value::<I, i16, i64>(values),
AggregateFunc::SumInt32 => sum_value::<I, i32, i64>(values),
AggregateFunc::SumInt64 => sum_value::<I, i64, i64>(values),
AggregateFunc::SumUInt16 => sum_value::<I, u16, u64>(values),
AggregateFunc::SumUInt32 => sum_value::<I, u32, u64>(values),
AggregateFunc::SumUInt64 => sum_value::<I, u64, u64>(values),
AggregateFunc::SumFloat32 => sum_value::<I, f32, f32>(values),
AggregateFunc::SumFloat64 => sum_value::<I, f64, f64>(values),
AggregateFunc::Count => count(values),
AggregateFunc::All => all(values),
AggregateFunc::Any => any(values),
_ => todo!(),
}
}
}
fn max_string<I>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
{
match values.into_iter().filter(|d| !d.is_null()).max_by(|a, b| {
let a = a.as_value_ref();
let a = a.as_string().expect("unexpected type").unwrap();
let b = b.as_value_ref();
let b = b.as_string().expect("unexpected type").unwrap();
a.cmp(b)
}) {
Some(v) => v,
None => Value::Null,
}
}
fn max_value<I, TypedValue>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
@@ -77,6 +120,22 @@ where
x.into()
}
fn min_string<I>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
{
match values.into_iter().filter(|d| !d.is_null()).min_by(|a, b| {
let a = a.as_value_ref();
let a = a.as_string().expect("unexpected type").unwrap();
let b = b.as_value_ref();
let b = b.as_string().expect("unexpected type").unwrap();
a.cmp(b)
}) {
Some(v) => v,
None => Value::Null,
}
}
fn min_value<I, TypedValue>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
@@ -91,3 +150,57 @@ where
.min();
x.into()
}
fn sum_value<I, ValueType, ResultType>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
ValueType: TryFrom<Value>,
<ValueType as TryFrom<Value>>::Error: std::fmt::Debug,
Value: From<Option<ValueType>>,
ResultType: From<ValueType> + std::iter::Sum + Into<Value>,
{
// If no row qualifies, then the result of COUNT is 0 (zero), and the result of any other aggregate function is the null value.
let mut values = values.into_iter().filter(|v| !v.is_null()).peekable();
if values.peek().is_none() {
Value::Null
} else {
let x = values
.map(|v| ResultType::from(ValueType::try_from(v).expect("unexpected type")))
.sum::<ResultType>();
x.into()
}
}
fn count<I>(values: I) -> Value
where
I: IntoIterator<Item = Value>,
{
let x = values.into_iter().filter(|v| !v.is_null()).count() as i64;
Value::from(x)
}
fn any<I>(datums: I) -> Value
where
I: IntoIterator<Item = Value>,
{
datums
.into_iter()
.fold(Value::Boolean(false), |state, next| match (state, next) {
(Value::Boolean(true), _) | (_, Value::Boolean(true)) => Value::Boolean(true),
(Value::Null, _) | (_, Value::Null) => Value::Null,
_ => Value::Boolean(false),
})
}
fn all<I>(datums: I) -> Value
where
I: IntoIterator<Item = Value>,
{
datums
.into_iter()
.fold(Value::Boolean(true), |state, next| match (state, next) {
(Value::Boolean(false), _) | (_, Value::Boolean(false)) => Value::Boolean(false),
(Value::Null, _) | (_, Value::Null) => Value::Null,
_ => Value::Boolean(true),
})
}

View File

@@ -15,6 +15,7 @@ impl From<EvalError> for DataflowError {
#[derive(Ord, PartialOrd, Clone, Debug, Eq, Deserialize, Serialize, PartialEq, Hash)]
pub enum EvalError {
DivisionByZero,
TypeMismatch(String),
InvalidArgument(String),
Internal(String),
}

View File