diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 0bf7c4dea4..42ed61d4f5 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -20,6 +20,7 @@ use std::sync::{Arc, Mutex}; use bytes::BytesMut; use common_error::ext::BoxedError; use common_recordbatch::DfRecordBatch; +use common_telemetry::debug; use datafusion_physical_expr::PhysicalExpr; use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; @@ -211,7 +212,6 @@ impl DfScalarFunction { pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result { // first eval exprs to construct values to feed to datafusion let values: Vec<_> = Self::eval_args(values, exprs)?; - if values.is_empty() { return InvalidArgumentSnafu { reason: "values is empty".to_string(), @@ -242,6 +242,7 @@ impl DfScalarFunction { } .build() })?; + let res = self.fn_impl.evaluate(&rb).map_err(|err| { EvalDatafusionSnafu { raw: err, @@ -296,6 +297,7 @@ impl RawDfScalarFn { .context(DecodeRelSnafu) .map_err(BoxedError::new) .context(crate::error::ExternalSnafu)?; + debug!("Decoded scalar function: {:?}", f); let input_schema = &self.input_schema; let extensions = &self.extensions; diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 524ab9b546..b2784b08bc 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -16,6 +16,7 @@ use std::sync::Arc; +use common_telemetry::debug; use datafusion_physical_expr::PhysicalExpr; use datatypes::data_type::ConcreteDataType as CDT; use snafu::{OptionExt, ResultExt}; @@ -127,11 +128,25 @@ pub(crate) fn proto_col(i: usize) -> substrait_proto::proto::FunctionArgument { } } +fn is_proto_literal(arg: &substrait_proto::proto::FunctionArgument) -> bool { + use substrait_proto::proto::expression; + matches!( + arg.arg_type.as_ref().unwrap(), + ArgType::Value(Expression { + rex_type: Some(expression::RexType::Literal(_)), + }) + ) +} + /// rewrite ScalarFunction's arguments to Columns 
0..n so nested exprs are still handled by us instead of datafusion +/// +/// Specifically, if an argument is a literal, the replacement will not happen fn rewrite_scalar_function(f: &ScalarFunction) -> ScalarFunction { let mut f_rewrite = f.clone(); for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() { - *raw_expr = proto_col(idx); + if !is_proto_literal(raw_expr) { + *raw_expr = proto_col(idx) + } } f_rewrite } @@ -144,9 +159,9 @@ impl TypedExpr { ) -> Result { let (arg_exprs, arg_types): (Vec<_>, Vec<_>) = arg_exprs_typed.into_iter().map(|e| (e.expr, e.typ)).unzip(); - + debug!("Before rewrite: {:?}", f); let f_rewrite = rewrite_scalar_function(f); - + debug!("After rewrite: {:?}", f_rewrite); let input_schema = RelationType::new(arg_types).into_unnamed(); let raw_fn = RawDfScalarFn::from_proto(&f_rewrite, input_schema.clone(), extensions.clone())?; diff --git a/tests/cases/standalone/flow/basic.result b/tests/cases/standalone/flow/basic.result index c9a3c7714a..1d480e2f22 100644 --- a/tests/cases/standalone/flow/basic.result +++ b/tests/cases/standalone/flow/basic.result @@ -59,3 +59,64 @@ DROP TABLE out_num_cnt; Affected Rows: 0 +CREATE TABLE numbers_input ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers +SINK TO out_num_cnt +AS +SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts); + +Affected Rows: 0 + +INSERT INTO numbers_input +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 3s +SELECT col_0, col_1 FROM out_num_cnt; + ++---------------------+-------+ +| col_0 | col_1 | ++---------------------+-------+ +| 2021-07-01T00:00:00 | 42 | ++---------------------+-------+ + +INSERT INTO numbers_input +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, col_1 FROM 
out_num_cnt; + ++---------------------+-------+ +| col_0 | col_1 | ++---------------------+-------+ +| 2021-07-01T00:00:00 | 42 | +| 2021-07-01T00:00:01 | 47 | ++---------------------+-------+ + +DROP FLOW test_numbers; + +Affected Rows: 0 + +DROP TABLE numbers_input; + +Affected Rows: 0 + +DROP TABLE out_num_cnt; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow/basic.sql b/tests/cases/standalone/flow/basic.sql index 9043875dd1..8c0c5d038e 100644 --- a/tests/cases/standalone/flow/basic.sql +++ b/tests/cases/standalone/flow/basic.sql @@ -29,3 +29,35 @@ SELECT col_0, window_start, window_end FROM out_num_cnt; DROP FLOW test_numbers; DROP TABLE numbers_input; DROP TABLE out_num_cnt; + +CREATE TABLE numbers_input ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers +SINK TO out_num_cnt +AS +SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts); + +INSERT INTO numbers_input +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS SLEEP 3s +SELECT col_0, col_1 FROM out_num_cnt; + +INSERT INTO numbers_input +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +-- SQLNESS SLEEP 2s +SELECT col_0, col_1 FROM out_num_cnt; + +DROP FLOW test_numbers; +DROP TABLE numbers_input; +DROP TABLE out_num_cnt;