Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-05 21:02:58 +00:00
fix: flow compare null values (#5234)
* fix: flow compare null values
* fix: fix again, check type before compare
* chore: rm comment
* fix: handle null
* chore: typo
* docs: update comment
* refactor: per review
* tests: more sqlness
* tests: sqlness not show create table
@@ -16,6 +16,7 @@ use std::collections::{BTreeMap, BTreeSet};
 use std::ops::Range;
 use std::sync::Arc;

+use arrow::array::new_null_array;
 use common_telemetry::trace;
 use datatypes::data_type::ConcreteDataType;
 use datatypes::prelude::DataType;
@@ -398,20 +399,54 @@ fn reduce_batch_subgraph(
         }
     }

-    // TODO: here reduce numbers of eq to minimal by keeping slicing key/val batch
+    let key_data_types = output_type
+        .column_types
+        .iter()
+        .map(|t| t.scalar_type.clone())
+        .collect_vec();
+
+    // TODO(discord9): here reduce numbers of eq to minimal by keeping slicing key/val batch
     for key_row in distinct_keys {
         let key_scalar_value = {
             let mut key_scalar_value = Vec::with_capacity(key_row.len());
-            for key in key_row.iter() {
+            for (key_idx, key) in key_row.iter().enumerate() {
                 let v =
                     key.try_to_scalar_value(&key.data_type())
                         .context(DataTypeSnafu {
                             msg: "can't convert key values to datafusion value",
                         })?;
-                let arrow_value =
+
+                let key_data_type = key_data_types.get(key_idx).context(InternalSnafu {
+                    reason: format!(
+                        "Key index out of bound, expected at most {} but got {}",
+                        output_type.column_types.len(),
+                        key_idx
+                    ),
+                })?;
+
+                // if the incoming value's datatype is null, it needs to be handled specially, see below
+                if key_data_type.as_arrow_type() != v.data_type()
+                    && !v.data_type().is_null()
+                {
+                    crate::expr::error::InternalSnafu {
+                        reason: format!(
+                            "Key data type mismatch, expected {:?} but got {:?}",
+                            key_data_type.as_arrow_type(),
+                            v.data_type()
+                        ),
+                    }
+                    .fail()?
+                }
+
+                // handle single null key
+                let arrow_value = if v.data_type().is_null() {
+                    let ret = new_null_array(&arrow::datatypes::DataType::Null, 1);
+                    arrow::array::Scalar::new(ret)
+                } else {
                     v.to_scalar().context(crate::expr::error::DatafusionSnafu {
                         context: "can't convert key values to arrow value",
-                    })?;
+                    })?
+                };
                 key_scalar_value.push(arrow_value);
             }
             key_scalar_value
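The per-key conversion above can be read as a small standalone sketch (editor's illustration, not part of the patch; it assumes the `datafusion_common::ScalarValue` and arrow APIs the flow crate already depends on, and the helper name `key_to_datum` is made up): a non-null key goes through the usual `ScalarValue::to_scalar` path, while a NULL key is wrapped in a one-element null array so the comparison step in the next hunk can recognize it and switch kernels.

    // Hypothetical helper mirroring the per-key conversion in the hunk above.
    // Non-null keys become typed one-element arrow scalars; a NULL key is backed
    // by a NullArray so callers can detect it via `data_type().is_null()`.
    use arrow::array::{new_null_array, ArrayRef, Scalar};
    use arrow::datatypes::DataType;
    use datafusion_common::{Result, ScalarValue};

    fn key_to_datum(v: &ScalarValue) -> Result<Scalar<ArrayRef>> {
        if v.data_type().is_null() {
            // A NULL key: `eq` against it would never match, so downstream code
            // branches to the `is_null` kernel instead (see the next hunk).
            Ok(Scalar::new(new_null_array(&DataType::Null, 1)))
        } else {
            // A regular key: ScalarValue -> one-element arrow scalar.
            v.to_scalar()
        }
    }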
@@ -423,7 +458,19 @@ fn reduce_batch_subgraph(
                 .zip(key_batch.batch().iter())
                 .map(|(key, col)| {
                     // TODO(discord9): this takes half of the cpu! And this is redundant amount of `eq`!
-                    arrow::compute::kernels::cmp::eq(&key, &col.to_arrow_array().as_ref() as _)
+
+                    // note that if the lhs is a null, we still need to get all rows that are null! But we can't use `eq` since
+                    // it will return null if the input has nulls, so we need to use `is_null` instead
+                    if arrow::array::Datum::get(&key).0.data_type().is_null() {
+                        arrow::compute::kernels::boolean::is_null(
+                            col.to_arrow_array().as_ref() as _
+                        )
+                    } else {
+                        arrow::compute::kernels::cmp::eq(
+                            &key,
+                            &col.to_arrow_array().as_ref() as _,
+                        )
+                    }
                 })
                 .try_collect::<_, Vec<_>, _>()
                 .context(ArrowSnafu {
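The kernel switch above is the heart of the fix. A minimal standalone sketch of the arrow-rs behaviour involved (editor's illustration, not from the patch; it uses a typed one-element null scalar rather than the NullArray built above): `cmp::eq` follows SQL semantics, so comparing against NULL yields NULL rather than true and filtering on the result silently drops the NULL-keyed rows, whereas `boolean::is_null` selects exactly those rows.

    // Standalone demonstration of why `eq` cannot look up NULL keys and the
    // `is_null` kernel is needed instead (assumed arrow-rs API as used above).
    use arrow::array::{Int64Array, Scalar};
    use arrow::compute::kernels::{boolean, cmp};
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // A key column containing one NULL row.
        let col = Int64Array::from(vec![Some(1), None, Some(3)]);
        // A "NULL key", here as a typed one-element null scalar.
        let null_key = Scalar::new(Int64Array::from(vec![None::<i64>]));

        // `eq` propagates NULLs: every result slot is NULL, nothing is `true`.
        let by_eq = cmp::eq(&col, &null_key)?;
        assert_eq!(by_eq.true_count(), 0);

        // `is_null` is what actually selects the NULL-keyed row.
        let by_is_null = boolean::is_null(&col)?;
        assert_eq!(by_is_null.true_count(), 1);
        Ok(())
    }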
tests/cases/standalone/common/flow/flow_null.result (new file, 231 lines)
@@ -0,0 +1,231 @@
-- test null handling in flow
-- test null handling in value part of key-value pair
CREATE TABLE requests (
    service_name STRING,
    service_ip STRING,
    val INT,
    ts TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE TABLE sum_val_in_reqs (
    sum_val INT64,
    ts TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE FLOW requests_long_term SINK TO sum_val_in_reqs AS
SELECT
    sum(val) as sum_val,
    date_bin(INTERVAL '30 seconds', ts) as time_window,
FROM
    requests
GROUP BY
    time_window;

Affected Rows: 0

INSERT INTO
    requests
VALUES
    (NULL, "10.0.0.1", NULL, "2024-10-18 19:00:00"),
    ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
    (NULL, "10.0.0.1", NULL, "2024-10-18 19:00:30"),
    ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
    (NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"),
    (NULL, "10.0.0.2", NULL, "2024-10-18 19:01:01"),
    ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
    ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");

Affected Rows: 8

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');

+----------------------------------------+
| ADMIN FLUSH_FLOW('requests_long_term') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+

SELECT
    *
FROM
    sum_val_in_reqs;

+---------+---------------------+
| sum_val | ts                  |
+---------+---------------------+
| 100     | 2024-10-18T19:00:00 |
| 200     | 2024-10-18T19:00:30 |
| 300     | 2024-10-18T19:01:00 |
| 600     | 2024-10-18T19:01:30 |
+---------+---------------------+

-- Test if the FLOWS table works, but don't care about the result since it varies between runs
SELECT
    count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
    INFORMATION_SCHEMA.FLOWS;

+--------------+
| active_flows |
+--------------+
| 1            |
+--------------+

DROP FLOW requests_long_term;

Affected Rows: 0

DROP TABLE sum_val_in_reqs;

Affected Rows: 0

DROP TABLE requests;

Affected Rows: 0

-- test null handling in key part of key-value pair
CREATE TABLE ngx_access_log (
    client STRING,
    country STRING,
    access_time TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
    client,
    country as 'country',
    count(1) as country_count,
    date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
    ngx_access_log
GROUP BY
    client,
    country,
    time_window;

Affected Rows: 0

INSERT INTO
    ngx_access_log
VALUES
    ("cli1", null, 0),
    ("cli1", null, 0),
    ("cli2", null, 0),
    ("cli2", null, 1),
    ("cli1", "b", 0),
    ("cli1", "c", 0);

Affected Rows: 6

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+

SELECT
    "ngx_access_log.client",
    "ngx_access_log.country",
    country_count,
    time_window
FROM
    ngx_country;

+-----------------------+------------------------+---------------+---------------------+
| ngx_access_log.client | ngx_access_log.country | country_count | time_window         |
+-----------------------+------------------------+---------------+---------------------+
| cli1                  |                        | 2             | 1970-01-01T00:00:00 |
| cli1                  | b                      | 1             | 1970-01-01T00:00:00 |
| cli1                  | c                      | 1             | 1970-01-01T00:00:00 |
| cli2                  |                        | 2             | 1970-01-01T00:00:00 |
+-----------------------+------------------------+---------------+---------------------+

-- making sure distinct is working
INSERT INTO
    ngx_access_log
VALUES
    ("cli1", "b", 1);

Affected Rows: 1

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+

SELECT
    "ngx_access_log.client",
    "ngx_access_log.country",
    country_count,
    time_window
FROM
    ngx_country;

+-----------------------+------------------------+---------------+---------------------+
| ngx_access_log.client | ngx_access_log.country | country_count | time_window         |
+-----------------------+------------------------+---------------+---------------------+
| cli1                  |                        | 2             | 1970-01-01T00:00:00 |
| cli1                  | b                      | 2             | 1970-01-01T00:00:00 |
| cli1                  | c                      | 1             | 1970-01-01T00:00:00 |
| cli2                  |                        | 2             | 1970-01-01T00:00:00 |
+-----------------------+------------------------+---------------+---------------------+

INSERT INTO
    ngx_access_log
VALUES
    ("cli1", "c", 2);

Affected Rows: 1

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+

SELECT
    "ngx_access_log.client",
    "ngx_access_log.country",
    country_count,
    time_window
FROM
    ngx_country;

+-----------------------+------------------------+---------------+---------------------+
| ngx_access_log.client | ngx_access_log.country | country_count | time_window         |
+-----------------------+------------------------+---------------+---------------------+
| cli1                  |                        | 2             | 1970-01-01T00:00:00 |
| cli1                  | b                      | 2             | 1970-01-01T00:00:00 |
| cli1                  | c                      | 2             | 1970-01-01T00:00:00 |
| cli2                  |                        | 2             | 1970-01-01T00:00:00 |
+-----------------------+------------------------+---------------+---------------------+

DROP FLOW calc_ngx_country;

Affected Rows: 0

DROP TABLE ngx_access_log;

Affected Rows: 0

DROP TABLE ngx_country;

Affected Rows: 0

tests/cases/standalone/common/flow/flow_null.sql (new file, 135 lines)
@@ -0,0 +1,135 @@
-- test null handling in flow

-- test null handling in value part of key-value pair
CREATE TABLE requests (
    service_name STRING,
    service_ip STRING,
    val INT,
    ts TIMESTAMP TIME INDEX
);

CREATE TABLE sum_val_in_reqs (
    sum_val INT64,
    ts TIMESTAMP TIME INDEX
);

CREATE FLOW requests_long_term SINK TO sum_val_in_reqs AS
SELECT
    sum(val) as sum_val,
    date_bin(INTERVAL '30 seconds', ts) as time_window,
FROM
    requests
GROUP BY
    time_window;

INSERT INTO
    requests
VALUES
    (NULL, "10.0.0.1", NULL, "2024-10-18 19:00:00"),
    ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
    (NULL, "10.0.0.1", NULL, "2024-10-18 19:00:30"),
    ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
    (NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"),
    (NULL, "10.0.0.2", NULL, "2024-10-18 19:01:01"),
    ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
    ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');

SELECT
    *
FROM
    sum_val_in_reqs;

-- Test if the FLOWS table works, but don't care about the result since it varies between runs
SELECT
    count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
    INFORMATION_SCHEMA.FLOWS;

DROP FLOW requests_long_term;

DROP TABLE sum_val_in_reqs;

DROP TABLE requests;

-- test null handling in key part of key-value pair
CREATE TABLE ngx_access_log (
    client STRING,
    country STRING,
    access_time TIMESTAMP TIME INDEX
);

CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
    client,
    country as 'country',
    count(1) as country_count,
    date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
    ngx_access_log
GROUP BY
    client,
    country,
    time_window;

INSERT INTO
    ngx_access_log
VALUES
    ("cli1", null, 0),
    ("cli1", null, 0),
    ("cli2", null, 0),
    ("cli2", null, 1),
    ("cli1", "b", 0),
    ("cli1", "c", 0);

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.client",
    "ngx_access_log.country",
    country_count,
    time_window
FROM
    ngx_country;

-- making sure distinct is working
INSERT INTO
    ngx_access_log
VALUES
    ("cli1", "b", 1);

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.client",
    "ngx_access_log.country",
    country_count,
    time_window
FROM
    ngx_country;

INSERT INTO
    ngx_access_log
VALUES
    ("cli1", "c", 2);

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.client",
    "ngx_access_log.country",
    country_count,
    time_window
FROM
    ngx_country;

DROP FLOW calc_ngx_country;

DROP TABLE ngx_access_log;

DROP TABLE ngx_country;