Files
greptimedb/tests/cases/standalone/common/flow/flow_null.result
discord9 a0900f5b90 feat(flow): use batching mode&fix sqlness (#5903)
* feat: use flow batching engine

broken: try using logical plan

fix: use dummy catalog for logical plan

fix: insert plan exec&sqlness grpc addr

feat: use frontend instance in flownode in standalone

feat: flow type in metasrv&fix: flush flow out of sync& column name alias

tests: sqlness update

tests: sqlness flow rebuild udpate

chore: per review

refactor: keep chnl mgr

refactor: use catalog mgr for get table

tests: use valid sql

fix: add more check

refactor: put flow type determine to frontend

* chore: update proto

* chore: update proto to main branch

* fix: add locks for create/drop flow&docs: update docs

* feat: flush_flow flush all ranges now

* test: add align time window test

* docs: explain `nodeid` use in check task

* refactor: AddAutoColumnRewriter check for Projection

* refactor: per review

* fix: query without time window also clean dirty time window

* chore: better logging

* chore: add comments per review

* refactor: per review

* chore: per review

* chore: per review rename args

* refactor: per review partially

* chore: update docs

* chore: use better error variant

* chore: better error variant

* refactor: rename FlowWorkerManager to FlowStreamingEngine

* rename again

* refactor: per review

* chore: rebase after #5963 merged

* refactor: rename all flow_worker_manager occurs

* docs: rm resolved TODO
2025-04-23 15:12:16 +00:00

339 lines
8.0 KiB
Plaintext

-- test null handling in flow
-- test null handling in value part of key-value pair
CREATE TABLE requests (
service_name STRING,
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE sum_val_in_reqs (
sum_val INT64,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE FLOW requests_long_term SINK TO sum_val_in_reqs AS
SELECT
sum(val) as sum_val,
date_bin(INTERVAL '30 seconds', ts) as time_window,
FROM
requests
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO
requests
VALUES
(NULL, "10.0.0.1", NULL, "2024-10-18 19:00:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
(NULL, "10.0.0.1", NULL, "2024-10-18 19:00:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
(NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"),
(NULL, "10.0.0.2", NULL, "2024-10-18 19:01:01"),
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");
Affected Rows: 8
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');
+----------------------------------------+
| ADMIN FLUSH_FLOW('requests_long_term') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
*
FROM
sum_val_in_reqs;
+---------+---------------------+
| sum_val | ts |
+---------+---------------------+
| 100 | 2024-10-18T19:00:00 |
| 200 | 2024-10-18T19:00:30 |
| 300 | 2024-10-18T19:01:00 |
| 600 | 2024-10-18T19:01:30 |
+---------+---------------------+
-- Test if FLOWS table works, but don't care about the result since it vary from runs
SELECT
count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
INFORMATION_SCHEMA.FLOWS;
+--------------+
| active_flows |
+--------------+
| 1 |
+--------------+
DROP FLOW requests_long_term;
Affected Rows: 0
DROP TABLE sum_val_in_reqs;
Affected Rows: 0
DROP TABLE requests;
Affected Rows: 0
-- test null handling in key part of key-value pair
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
client,
country as 'country',
count(1) as country_count,
date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
client,
country,
time_window;
Affected Rows: 0
INSERT INTO
ngx_access_log
VALUES
("cli1", null, 0),
("cli1", null, 0),
("cli2", null, 0),
("cli2", null, 1),
("cli1", "b", 0),
("cli1", "c", 0);
Affected Rows: 6
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
client,
country,
country_count,
time_window
FROM
ngx_country;
+--------+---------+---------------+---------------------+
| client | country | country_count | time_window |
+--------+---------+---------------+---------------------+
| cli1 | | 2 | 1970-01-01T00:00:00 |
| cli1 | b | 1 | 1970-01-01T00:00:00 |
| cli1 | c | 1 | 1970-01-01T00:00:00 |
| cli2 | | 2 | 1970-01-01T00:00:00 |
+--------+---------+---------------+---------------------+
-- making sure distinct is working
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
client,
country,
country_count,
time_window
FROM
ngx_country;
+--------+---------+---------------+---------------------+
| client | country | country_count | time_window |
+--------+---------+---------------+---------------------+
| cli1 | | 2 | 1970-01-01T00:00:00 |
| cli1 | b | 2 | 1970-01-01T00:00:00 |
| cli1 | c | 1 | 1970-01-01T00:00:00 |
| cli2 | | 2 | 1970-01-01T00:00:00 |
+--------+---------+---------------+---------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", "c", 2);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
client,
country,
country_count,
time_window
FROM
ngx_country;
+--------+---------+---------------+---------------------+
| client | country | country_count | time_window |
+--------+---------+---------------+---------------------+
| cli1 | | 2 | 1970-01-01T00:00:00 |
| cli1 | b | 2 | 1970-01-01T00:00:00 |
| cli1 | c | 2 | 1970-01-01T00:00:00 |
| cli2 | | 2 | 1970-01-01T00:00:00 |
+--------+---------+---------------+---------------------+
DROP FLOW calc_ngx_country;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_country;
Affected Rows: 0
-- test nullable pk with no default value
CREATE TABLE nullable_pk (
pid INT NULL,
client STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (pid)
) WITH (
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE out_nullable_pk (
pid INT NULL,
client STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (pid, client)
);
Affected Rows: 0
CREATE FLOW calc_nullable_pk SINK TO out_nullable_pk AS
SELECT
pid,
client,
ts
FROM
nullable_pk;
Affected Rows: 0
INSERT INTO
nullable_pk
VALUES
(1, "name1", "2024-10-18 19:00:00"),
(2, "name2", "2024-10-18 19:00:00"),
(3, "name3", "2024-10-18 19:00:00");
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_nullable_pk');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_nullable_pk') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT * FROM out_nullable_pk;
+-----+--------+---------------------+
| pid | client | ts |
+-----+--------+---------------------+
| 1 | name1 | 2024-10-18T19:00:00 |
| 2 | name2 | 2024-10-18T19:00:00 |
| 3 | name3 | 2024-10-18T19:00:00 |
+-----+--------+---------------------+
-- pk is nullable
INSERT INTO
nullable_pk (client, ts)
VALUES
("name1", "2024-10-18 19:00:00"),
("name2", "2024-10-18 19:00:00"),
("name3", "2024-10-18 19:00:00");
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_nullable_pk');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_nullable_pk') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT * FROM out_nullable_pk;
+-----+--------+---------------------+
| pid | client | ts |
+-----+--------+---------------------+
| | name1 | 2024-10-18T19:00:00 |
| | name2 | 2024-10-18T19:00:00 |
| | name3 | 2024-10-18T19:00:00 |
| 1 | name1 | 2024-10-18T19:00:00 |
| 2 | name2 | 2024-10-18T19:00:00 |
| 3 | name3 | 2024-10-18T19:00:00 |
+-----+--------+---------------------+
DROP FLOW calc_nullable_pk;
Affected Rows: 0
DROP TABLE nullable_pk;
Affected Rows: 0
DROP TABLE out_nullable_pk;
Affected Rows: 0