fix: call df_func with literal (#4265)

* fix: call df_func with literal

* chore: rm a dbg log that was forgotten
discord9
2024-07-05 14:21:22 +08:00
committed by GitHub
parent 9393a1c51e
commit c21e969329
4 changed files with 114 additions and 4 deletions

View File

@@ -20,6 +20,7 @@ use std::sync::{Arc, Mutex};
use bytes::BytesMut;
use common_error::ext::BoxedError;
use common_recordbatch::DfRecordBatch;
use common_telemetry::debug;
use datafusion_physical_expr::PhysicalExpr;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
@@ -211,7 +212,6 @@ impl DfScalarFunction {
pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> {
// first eval exprs to construct values to feed to datafusion
let values: Vec<_> = Self::eval_args(values, exprs)?;
if values.is_empty() {
return InvalidArgumentSnafu {
reason: "values is empty".to_string(),
@@ -242,6 +242,7 @@ impl DfScalarFunction {
}
.build()
})?;
let res = self.fn_impl.evaluate(&rb).map_err(|err| {
EvalDatafusionSnafu {
raw: err,
@@ -296,6 +297,7 @@ impl RawDfScalarFn {
.context(DecodeRelSnafu)
.map_err(BoxedError::new)
.context(crate::error::ExternalSnafu)?;
debug!("Decoded scalar function: {:?}", f);
let input_schema = &self.input_schema;
let extensions = &self.extensions;
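
As a side note, the eval path touched in this file first evaluates the argument expressions against the input row, rejects an empty argument list, and only then builds a record batch and hands it to the wrapped DataFusion expression. Below is a minimal, hypothetical sketch of that shape with toy stand-in types (the real code uses the flow crate's Value/ScalarExpr and a DataFusion PhysicalExpr over a DfRecordBatch; nothing here is the actual implementation):

// Hypothetical sketch only: mirrors the shape of DfScalarFunction::eval --
// evaluate the argument exprs first, reject an empty argument list, then
// hand the resulting row to the wrapped function (a closure stands in for
// the DataFusion PhysicalExpr).
#[derive(Clone, Debug)]
enum Value {
    Int64(i64),
    String(String),
}

enum ScalarExpr {
    Column(usize),
    Literal(Value),
}

fn eval_arg(row: &[Value], expr: &ScalarExpr) -> Result<Value, String> {
    match expr {
        ScalarExpr::Column(i) => row.get(*i).cloned().ok_or(format!("no column {i}")),
        ScalarExpr::Literal(v) => Ok(v.clone()),
    }
}

fn eval_df_like(
    row: &[Value],
    exprs: &[ScalarExpr],
    fn_impl: impl Fn(&[Value]) -> Result<Value, String>,
) -> Result<Value, String> {
    // first evaluate the argument exprs to construct the values to feed in
    let args = exprs
        .iter()
        .map(|e| eval_arg(row, e))
        .collect::<Result<Vec<_>, String>>()?;
    if args.is_empty() {
        return Err("values is empty".to_string());
    }
    fn_impl(&args)
}

fn main() {
    let row = vec![Value::Int64(20)];
    // a date_trunc-like call: a literal first argument plus a column reference
    let exprs = vec![
        ScalarExpr::Literal(Value::String("second".into())),
        ScalarExpr::Column(0),
    ];
    println!("{:?}", eval_df_like(&row, &exprs, |args| Ok(args[1].clone())));
}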

View File

@@ -16,6 +16,7 @@
use std::sync::Arc;
use common_telemetry::debug;
use datafusion_physical_expr::PhysicalExpr;
use datatypes::data_type::ConcreteDataType as CDT;
use snafu::{OptionExt, ResultExt};
@@ -127,11 +128,25 @@ pub(crate) fn proto_col(i: usize) -> substrait_proto::proto::FunctionArgument {
}
}
fn is_proto_literal(arg: &substrait_proto::proto::FunctionArgument) -> bool {
use substrait_proto::proto::expression;
matches!(
arg.arg_type.as_ref().unwrap(),
ArgType::Value(Expression {
rex_type: Some(expression::RexType::Literal(_)),
})
)
}
/// Rewrite ScalarFunction's arguments to columns 0..n so nested exprs are still handled by us instead of datafusion
///
/// Specifically, if an argument is a literal, the replacement will not happen
fn rewrite_scalar_function(f: &ScalarFunction) -> ScalarFunction {
let mut f_rewrite = f.clone();
for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() {
*raw_expr = proto_col(idx);
if !is_proto_literal(raw_expr) {
*raw_expr = proto_col(idx)
}
}
f_rewrite
}
@@ -144,9 +159,9 @@ impl TypedExpr {
) -> Result<TypedExpr, Error> {
let (arg_exprs, arg_types): (Vec<_>, Vec<_>) =
arg_exprs_typed.into_iter().map(|e| (e.expr, e.typ)).unzip();
debug!("Before rewrite: {:?}", f);
let f_rewrite = rewrite_scalar_function(f);
debug!("After rewrite: {:?}", f_rewrite);
let input_schema = RelationType::new(arg_types).into_unnamed();
let raw_fn =
RawDfScalarFn::from_proto(&f_rewrite, input_schema.clone(), extensions.clone())?;
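
For context on rewrite_scalar_function above, here is a minimal standalone sketch of the rewrite rule, using a toy Arg enum as a hypothetical stand-in for substrait_proto::proto::FunctionArgument (none of these names come from the real code): non-literal arguments are replaced by a column reference at their position, while literal arguments are kept so DataFusion still receives them as literals, e.g. the 'second' in date_trunc('second', ts).

// Hypothetical sketch only: the skip-literals rewrite with simplified types.
#[derive(Clone, Debug, PartialEq)]
enum Arg {
    Literal(String),
    Column(usize),
    Nested(&'static str),
}

fn rewrite_args(args: &[Arg]) -> Vec<Arg> {
    args.iter()
        .enumerate()
        .map(|(idx, arg)| match arg {
            // literals are kept so datafusion still sees them as literals
            Arg::Literal(_) => arg.clone(),
            // anything else becomes a column reference 0..n, later filled
            // with the already-evaluated nested expression
            _ => Arg::Column(idx),
        })
        .collect()
}

fn main() {
    // date_trunc('second', ts): the literal stays, the ts expression is
    // rewritten to a reference to column 1
    let args = vec![Arg::Literal("second".into()), Arg::Nested("ts")];
    assert_eq!(
        rewrite_args(&args),
        vec![Arg::Literal("second".into()), Arg::Column(1)]
    );
}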

View File

@@ -59,3 +59,64 @@ DROP TABLE out_num_cnt;
Affected Rows: 0
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
Affected Rows: 0
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS SLEEP 3s
SELECT col_0, col_1 FROM out_num_cnt;
+---------------------+-------+
| col_0 | col_1 |
+---------------------+-------+
| 2021-07-01T00:00:00 | 42 |
+---------------------+-------+
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS SLEEP 2s
SELECT col_0, col_1 FROM out_num_cnt;
+---------------------+-------+
| col_0 | col_1 |
+---------------------+-------+
| 2021-07-01T00:00:00 | 42 |
| 2021-07-01T00:00:01 | 47 |
+---------------------+-------+
DROP FLOW test_numbers;
Affected Rows: 0
DROP TABLE numbers_input;
Affected Rows: 0
DROP TABLE out_num_cnt;
Affected Rows: 0

View File

@@ -29,3 +29,35 @@ SELECT col_0, window_start, window_end FROM out_num_cnt;
DROP FLOW test_numbers;
DROP TABLE numbers_input;
DROP TABLE out_num_cnt;
CREATE TABLE numbers_input (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_numbers
SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts), sum(number) FROM numbers_input GROUP BY date_trunc('second', ts);
INSERT INTO numbers_input
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
-- SQLNESS SLEEP 3s
SELECT col_0, col_1 FROM out_num_cnt;
INSERT INTO numbers_input
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
-- SQLNESS SLEEP 2s
SELECT col_0, col_1 FROM out_num_cnt;
DROP FLOW test_numbers;
DROP TABLE numbers_input;
DROP TABLE out_num_cnt;