diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs
index 7b31e5a5df..9c75a86ffa 100644
--- a/src/flow/src/expr/scalar.rs
+++ b/src/flow/src/expr/scalar.rs
@@ -15,9 +15,14 @@
 //! Scalar expressions.
 
 use std::collections::{BTreeMap, BTreeSet};
+use std::sync::Arc;
 
-use arrow::array::{make_array, ArrayData, ArrayRef};
+use arrow::array::{make_array, ArrayData, ArrayRef, BooleanArray};
+use arrow::buffer::BooleanBuffer;
+use arrow::compute::or_kleene;
 use common_error::ext::BoxedError;
+use datafusion::physical_expr_common::datum::compare_with_eq;
+use datafusion_common::DataFusionError;
 use datatypes::prelude::{ConcreteDataType, DataType};
 use datatypes::value::Value;
 use datatypes::vectors::{BooleanVector, Helper, VectorRef};
@@ -92,6 +97,10 @@ pub enum ScalarExpr {
         then: Box<ScalarExpr>,
         els: Box<ScalarExpr>,
     },
+    InList {
+        expr: Box<ScalarExpr>,
+        list: Vec<ScalarExpr>,
+    },
 }
 
 impl ScalarExpr {
@@ -137,6 +146,7 @@
                     .context(crate::error::ExternalSnafu)?;
                 Ok(ColumnType::new_nullable(typ))
             }
+            ScalarExpr::InList { expr, .. } => expr.typ(context),
         }
     }
 }
@@ -222,9 +232,57 @@
                 exprs,
             } => df_scalar_fn.eval_batch(batch, exprs),
             ScalarExpr::If { cond, then, els } => Self::eval_if_then(batch, cond, then, els),
+            ScalarExpr::InList { expr, list } => Self::eval_in_list(batch, expr, list),
         }
     }
 
+    fn eval_in_list(
+        batch: &Batch,
+        expr: &ScalarExpr,
+        list: &[ScalarExpr],
+    ) -> Result<VectorRef, EvalError> {
+        let eval_list = list
+            .iter()
+            .map(|e| e.eval_batch(batch))
+            .collect::<Result<Vec<_>, _>>()?;
+        let eval_expr = expr.eval_batch(batch)?;
+
+        ensure!(
+            eval_list
+                .iter()
+                .all(|v| v.data_type() == eval_expr.data_type()),
+            TypeMismatchSnafu {
+                expected: eval_expr.data_type(),
+                actual: eval_list
+                    .iter()
+                    .find(|v| v.data_type() != eval_expr.data_type())
+                    .map(|v| v.data_type())
+                    .unwrap(),
+            }
+        );
+
+        let lhs = eval_expr.to_arrow_array();
+
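+        // Fold the IN list into `(expr = e1) OR (expr = e2) OR ...`, starting from an
+        // all-false accumulator; Kleene OR keeps SQL's three-valued NULL semantics.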
+        let found = eval_list
+            .iter()
+            .map(|v| v.to_arrow_array())
+            .try_fold(
+                BooleanArray::new(BooleanBuffer::new_unset(batch.row_count()), None),
+                |result, in_list_elem| -> Result<BooleanArray, DataFusionError> {
+                    let rhs = compare_with_eq(&lhs, &in_list_elem, false)?;
+
+                    Ok(or_kleene(&result, &rhs)?)
+                },
+            )
+            .with_context(|_| crate::expr::error::DatafusionSnafu {
+                context: "Failed to compare eval_expr with eval_list",
+            })?;
+
+        let res = BooleanVector::from(found);
+
+        Ok(Arc::new(res))
+    }
+
     /// NOTE: this if then eval impl assume all given expr are pure, and will not change the state of the world
     /// since it will evaluate both then and else branch and filter the result
     fn eval_if_then(
@@ -337,6 +395,15 @@
                 df_scalar_fn,
                 exprs,
             } => df_scalar_fn.eval(values, exprs),
+            ScalarExpr::InList { expr, list } => {
+                let eval_expr = expr.eval(values)?;
+                let eval_list = list
+                    .iter()
+                    .map(|v| v.eval(values))
+                    .collect::<Result<Vec<_>, _>>()?;
+                let found = eval_list.iter().any(|item| *item == eval_expr);
+                Ok(Value::Boolean(found))
+            }
         }
     }
 
@@ -514,6 +581,13 @@
                 }
                 Ok(())
             }
+            ScalarExpr::InList { expr, list } => {
+                f(expr)?;
+                for item in list {
+                    f(item)?;
+                }
+                Ok(())
+            }
         }
     }
 
@@ -558,6 +632,13 @@
                 }
                 Ok(())
             }
+            ScalarExpr::InList { expr, list } => {
+                f(expr)?;
+                for item in list {
+                    f(item)?;
+                }
+                Ok(())
+            }
         }
     }
 }
diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs
index fba7b7717e..229c28b541 100644
--- a/src/flow/src/transform/expr.rs
+++ b/src/flow/src/transform/expr.rs
@@ -476,11 +476,27 @@
                 let substrait_expr = s.value.as_ref().with_context(|| InvalidQuerySnafu {
                     reason: "SingularOrList expression without value",
                 })?;
+                let typed_expr =
+                    TypedExpr::from_substrait_rex(substrait_expr, input_schema, extensions).await?;
                 // Note that we didn't impl support to in list expr
                 if !s.options.is_empty() {
-                    return not_impl_err!("In list expression is not supported");
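+                    // Lower the Substrait SingularOrList options into a ScalarExpr::InList,
+                    // which evaluates to a nullable boolean column.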
+                    let mut list = Vec::with_capacity(s.options.len());
+                    for opt in s.options.iter() {
+                        let opt_expr =
+                            TypedExpr::from_substrait_rex(opt, input_schema, extensions).await?;
+                        list.push(opt_expr.expr);
+                    }
+                    let in_list_expr = ScalarExpr::InList {
+                        expr: Box::new(typed_expr.expr),
+                        list,
+                    };
+                    Ok(TypedExpr::new(
+                        in_list_expr,
+                        ColumnType::new_nullable(CDT::boolean_datatype()),
+                    ))
+                } else {
+                    Ok(typed_expr)
                 }
-                TypedExpr::from_substrait_rex(substrait_expr, input_schema, extensions).await
             }
             Some(RexType::Selection(field_ref)) => match &field_ref.reference_type {
                 Some(DirectReference(direct)) => match &direct.reference_type.as_ref() {
diff --git a/tests/cases/standalone/common/flow/flow_no_aggr.result b/tests/cases/standalone/common/flow/flow_no_aggr.result
new file mode 100644
index 0000000000..94301ffb7f
--- /dev/null
+++ b/tests/cases/standalone/common/flow/flow_no_aggr.result
@@ -0,0 +1,133 @@
+CREATE TABLE base (
+    desc_str STRING,
+    ts TIMESTAMP TIME INDEX
+);
+
+Affected Rows: 0
+
+CREATE TABLE sink (
+    desc_str STRING,
+    ts TIMESTAMP TIME INDEX
+);
+
+Affected Rows: 0
+
+CREATE FLOW filter_out
+SINK TO sink
+AS
+SELECT desc_str, ts FROM base
+WHERE desc_str IN ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j');
+
+Affected Rows: 0
+
+SELECT options FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'filter_out';
+
++---------------------------+
+| options                   |
++---------------------------+
+| {"flow_type":"streaming"} |
++---------------------------+
+
+INSERT INTO base VALUES
+('a', '2023-01-01 00:00:00'),
+('j', '2023-01-01 00:00:09'),
+('l', '2023-01-01 00:00:08');
+
+Affected Rows: 3
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('filter_out');
+
++--------------------------------+
+| ADMIN FLUSH_FLOW('filter_out') |
++--------------------------------+
+|  FLOW_FLUSHED |
++--------------------------------+
+
+SELECT * FROM sink ORDER BY ts;
+
++----------+---------------------+
+| desc_str | ts                  |
++----------+---------------------+
+| a        | 2023-01-01T00:00:00 |
+| j        | 2023-01-01T00:00:09 |
++----------+---------------------+
+
+DROP FLOW filter_out;
+
+Affected Rows: 0
+
+DROP TABLE base;
+
+Affected Rows: 0
+
+DROP TABLE sink;
+
+Affected Rows: 0
+
+CREATE TABLE base (
+    desc_str STRING,
+    ts TIMESTAMP TIME INDEX
+);
+
+Affected Rows: 0
+
+CREATE TABLE sink (
+    desc_str STRING,
+    ts TIMESTAMP TIME INDEX
+);
+
+Affected Rows: 0
+
+CREATE FLOW filter_out
+SINK TO sink
+AS
+SELECT desc_str, ts FROM base
+WHERE desc_str NOT IN ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j');
+
+Affected Rows: 0
+
+SELECT options FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'filter_out';
+
++---------------------------+
+| options                   |
++---------------------------+
+| {"flow_type":"streaming"} |
++---------------------------+
+
+INSERT INTO base VALUES
+('a', '2023-01-01 00:00:00'),
+('j', '2023-01-01 00:00:09'),
+('l', '2023-01-01 00:00:08');
+
+Affected Rows: 3
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('filter_out');
+
++--------------------------------+
+| ADMIN FLUSH_FLOW('filter_out') |
++--------------------------------+
+|  FLOW_FLUSHED |
++--------------------------------+
+
+SELECT * FROM sink ORDER BY ts;
+
++----------+---------------------+
+| desc_str | ts                  |
++----------+---------------------+
+| l        | 2023-01-01T00:00:08 |
++----------+---------------------+
+
+DROP FLOW filter_out;
+
+Affected Rows: 0
+
+DROP TABLE base;
+
+Affected Rows: 0
+
+DROP TABLE sink;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/flow/flow_no_aggr.sql b/tests/cases/standalone/common/flow/flow_no_aggr.sql
new file mode 100644
index 0000000000..bb0ed65f78
--- /dev/null
+++ b/tests/cases/standalone/common/flow/flow_no_aggr.sql
@@ -0,0 +1,64 @@
+CREATE TABLE base (
+    desc_str STRING,
+    ts TIMESTAMP TIME INDEX
+);
+
+CREATE TABLE sink (
+    desc_str STRING,
+    ts TIMESTAMP TIME INDEX
+);
+
+CREATE FLOW filter_out
+SINK TO sink
+AS
+SELECT desc_str, ts FROM base
+WHERE desc_str IN ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j');
+
+SELECT options FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'filter_out';
+
+INSERT INTO base VALUES
+('a', '2023-01-01 00:00:00'),
+('j', '2023-01-01 00:00:09'),
+('l', '2023-01-01 00:00:08');
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('filter_out');
+
+SELECT * FROM sink ORDER BY ts;
+
+DROP FLOW filter_out;
+DROP TABLE base;
+DROP TABLE sink;
+
+CREATE TABLE base (
+    desc_str STRING,
+    ts TIMESTAMP TIME INDEX
+);
+
+CREATE TABLE sink (
+    desc_str STRING,
+    ts TIMESTAMP TIME INDEX
+);
+
+CREATE FLOW filter_out
+SINK TO sink
+AS
+SELECT desc_str, ts FROM base
+WHERE desc_str NOT IN ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j');
+
+SELECT options FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'filter_out';
+
+INSERT INTO base VALUES
+('a', '2023-01-01 00:00:00'),
+('j', '2023-01-01 00:00:09'),
+('l', '2023-01-01 00:00:08');
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('filter_out');
+
+SELECT * FROM sink ORDER BY ts;
+
+DROP FLOW filter_out;
+DROP TABLE base;
+DROP TABLE sink;
+