feat(flow): flow streaming mode in list expr support (#6229)

* feat: flow streaming in list support

* chore: per review

* chore: per review

* fix: expr correct type
This commit is contained in:
discord9
2025-06-04 16:05:20 +08:00
committed by GitHub
parent 744a754246
commit cbafb6e00b
4 changed files with 297 additions and 3 deletions

View File

@@ -15,9 +15,14 @@
//! Scalar expressions.
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use arrow::array::{make_array, ArrayData, ArrayRef};
use arrow::array::{make_array, ArrayData, ArrayRef, BooleanArray};
use arrow::buffer::BooleanBuffer;
use arrow::compute::or_kleene;
use common_error::ext::BoxedError;
use datafusion::physical_expr_common::datum::compare_with_eq;
use datafusion_common::DataFusionError;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::value::Value;
use datatypes::vectors::{BooleanVector, Helper, VectorRef};
@@ -92,6 +97,10 @@ pub enum ScalarExpr {
then: Box<ScalarExpr>,
els: Box<ScalarExpr>,
},
InList {
expr: Box<ScalarExpr>,
list: Vec<ScalarExpr>,
},
}
impl ScalarExpr {
@@ -137,6 +146,7 @@ impl ScalarExpr {
.context(crate::error::ExternalSnafu)?;
Ok(ColumnType::new_nullable(typ))
}
ScalarExpr::InList { expr, .. } => expr.typ(context),
}
}
}
@@ -222,9 +232,57 @@ impl ScalarExpr {
exprs,
} => df_scalar_fn.eval_batch(batch, exprs),
ScalarExpr::If { cond, then, els } => Self::eval_if_then(batch, cond, then, els),
ScalarExpr::InList { expr, list } => Self::eval_in_list(batch, expr, list),
}
}
fn eval_in_list(
batch: &Batch,
expr: &ScalarExpr,
list: &[ScalarExpr],
) -> Result<VectorRef, EvalError> {
let eval_list = list
.iter()
.map(|e| e.eval_batch(batch))
.collect::<Result<Vec<_>, _>>()?;
let eval_expr = expr.eval_batch(batch)?;
ensure!(
eval_list
.iter()
.all(|v| v.data_type() == eval_expr.data_type()),
TypeMismatchSnafu {
expected: eval_expr.data_type(),
actual: eval_list
.iter()
.find(|v| v.data_type() != eval_expr.data_type())
.map(|v| v.data_type())
.unwrap(),
}
);
let lhs = eval_expr.to_arrow_array();
let found = eval_list
.iter()
.map(|v| v.to_arrow_array())
.try_fold(
BooleanArray::new(BooleanBuffer::new_unset(batch.row_count()), None),
|result, in_list_elem| -> Result<BooleanArray, DataFusionError> {
let rhs = compare_with_eq(&lhs, &in_list_elem, false)?;
Ok(or_kleene(&result, &rhs)?)
},
)
.with_context(|_| crate::expr::error::DatafusionSnafu {
context: "Failed to compare eval_expr with eval_list",
})?;
let res = BooleanVector::from(found);
Ok(Arc::new(res))
}
/// NOTE: this if-then eval impl assumes all given exprs are pure and will not change the state of the world,
/// since it evaluates both the then and else branches and filters the result
fn eval_if_then(
@@ -337,6 +395,15 @@ impl ScalarExpr {
df_scalar_fn,
exprs,
} => df_scalar_fn.eval(values, exprs),
ScalarExpr::InList { expr, list } => {
let eval_expr = expr.eval(values)?;
let eval_list = list
.iter()
.map(|v| v.eval(values))
.collect::<Result<Vec<_>, _>>()?;
let found = eval_list.iter().any(|item| *item == eval_expr);
Ok(Value::Boolean(found))
}
}
}
@@ -514,6 +581,13 @@ impl ScalarExpr {
}
Ok(())
}
ScalarExpr::InList { expr, list } => {
f(expr)?;
for item in list {
f(item)?;
}
Ok(())
}
}
}
@@ -558,6 +632,13 @@ impl ScalarExpr {
}
Ok(())
}
ScalarExpr::InList { expr, list } => {
f(expr)?;
for item in list {
f(item)?;
}
Ok(())
}
}
}
}

View File

@@ -476,11 +476,27 @@ impl TypedExpr {
let substrait_expr = s.value.as_ref().with_context(|| InvalidQuerySnafu {
reason: "SingularOrList expression without value",
})?;
let typed_expr =
TypedExpr::from_substrait_rex(substrait_expr, input_schema, extensions).await?;
// When options are present, build an `InList` expr from the evaluated options
if !s.options.is_empty() {
return not_impl_err!("In list expression is not supported");
let mut list = Vec::with_capacity(s.options.len());
for opt in s.options.iter() {
let opt_expr =
TypedExpr::from_substrait_rex(opt, input_schema, extensions).await?;
list.push(opt_expr.expr);
}
let in_list_expr = ScalarExpr::InList {
expr: Box::new(typed_expr.expr),
list,
};
Ok(TypedExpr::new(
in_list_expr,
ColumnType::new_nullable(CDT::boolean_datatype()),
))
} else {
Ok(typed_expr)
}
TypedExpr::from_substrait_rex(substrait_expr, input_schema, extensions).await
}
Some(RexType::Selection(field_ref)) => match &field_ref.reference_type {
Some(DirectReference(direct)) => match &direct.reference_type.as_ref() {

View File

@@ -0,0 +1,133 @@
CREATE TABLE base (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE TABLE sink (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE FLOW filter_out
SINK TO sink
AS
SELECT desc_str, ts FROM base
WHERE desc_str IN ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j');
Affected Rows: 0
SELECT options FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'filter_out';
+---------------------------+
| options |
+---------------------------+
| {"flow_type":"streaming"} |
+---------------------------+
INSERT INTO base VALUES
('a', '2023-01-01 00:00:00'),
('j', '2023-01-01 00:00:09'),
('l', '2023-01-01 00:00:08');
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_out');
+--------------------------------+
| ADMIN FLUSH_FLOW('filter_out') |
+--------------------------------+
| FLOW_FLUSHED |
+--------------------------------+
SELECT * FROM sink ORDER BY ts;
+----------+---------------------+
| desc_str | ts |
+----------+---------------------+
| a | 2023-01-01T00:00:00 |
| j | 2023-01-01T00:00:09 |
+----------+---------------------+
DROP FLOW filter_out;
Affected Rows: 0
DROP TABLE base;
Affected Rows: 0
DROP TABLE sink;
Affected Rows: 0
CREATE TABLE base (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE TABLE sink (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE FLOW filter_out
SINK TO sink
AS
SELECT desc_str, ts FROM base
WHERE desc_str NOT IN ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j');
Affected Rows: 0
SELECT options FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'filter_out';
+---------------------------+
| options |
+---------------------------+
| {"flow_type":"streaming"} |
+---------------------------+
INSERT INTO base VALUES
('a', '2023-01-01 00:00:00'),
('j', '2023-01-01 00:00:09'),
('l', '2023-01-01 00:00:08');
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_out');
+--------------------------------+
| ADMIN FLUSH_FLOW('filter_out') |
+--------------------------------+
| FLOW_FLUSHED |
+--------------------------------+
SELECT * FROM sink ORDER BY ts;
+----------+---------------------+
| desc_str | ts |
+----------+---------------------+
| l | 2023-01-01T00:00:08 |
+----------+---------------------+
DROP FLOW filter_out;
Affected Rows: 0
DROP TABLE base;
Affected Rows: 0
DROP TABLE sink;
Affected Rows: 0

View File

@@ -0,0 +1,64 @@
CREATE TABLE base (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
CREATE TABLE sink (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
CREATE FLOW filter_out
SINK TO sink
AS
SELECT desc_str, ts FROM base
WHERE desc_str IN ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j');
SELECT options FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'filter_out';
INSERT INTO base VALUES
('a', '2023-01-01 00:00:00'),
('j', '2023-01-01 00:00:09'),
('l', '2023-01-01 00:00:08');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_out');
SELECT * FROM sink ORDER BY ts;
DROP FLOW filter_out;
DROP TABLE base;
DROP TABLE sink;
CREATE TABLE base (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
CREATE TABLE sink (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
CREATE FLOW filter_out
SINK TO sink
AS
SELECT desc_str, ts FROM base
WHERE desc_str NOT IN ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j');
SELECT options FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'filter_out';
INSERT INTO base VALUES
('a', '2023-01-01 00:00:00'),
('j', '2023-01-01 00:00:09'),
('l', '2023-01-01 00:00:08');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_out');
SELECT * FROM sink ORDER BY ts;
DROP FLOW filter_out;
DROP TABLE base;
DROP TABLE sink;