mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 12:52:57 +00:00
* feat: use flow batching engine broken: try using logical plan fix: use dummy catalog for logical plan fix: insert plan exec&sqlness grpc addr feat: use frontend instance in flownode in standalone feat: flow type in metasrv&fix: flush flow out of sync& column name alias tests: sqlness update tests: sqlness flow rebuild udpate chore: per review refactor: keep chnl mgr refactor: use catalog mgr for get table tests: use valid sql fix: add more check refactor: put flow type determine to frontend * chore: update proto * chore: update proto to main branch * fix: add locks for create/drop flow&docs: update docs * feat: flush_flow flush all ranges now * test: add align time window test * docs: explain `nodeid` use in check task * refactor: AddAutoColumnRewriter check for Projection * refactor: per review * fix: query without time window also clean dirty time window * chore: better logging * chore: add comments per review * refactor: per review * chore: per review * chore: per review rename args * refactor: per review partially * chore: update docs * chore: use better error variant * chore: better error variant * refactor: rename FlowWorkerManager to FlowStreamingEngine * rename again * refactor: per review * chore: rebase after #5963 merged * refactor: rename all flow_worker_manager occurs * docs: rm resolved TODO
339 lines
8.0 KiB
Plaintext
339 lines
8.0 KiB
Plaintext
-- test null handling in flow
|
|
-- test null handling in value part of key-value pair
|
|
CREATE TABLE requests (
|
|
service_name STRING,
|
|
service_ip STRING,
|
|
val INT,
|
|
ts TIMESTAMP TIME INDEX
|
|
)WITH(
|
|
append_mode = 'true'
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
CREATE TABLE sum_val_in_reqs (
|
|
sum_val INT64,
|
|
ts TIMESTAMP TIME INDEX
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
CREATE FLOW requests_long_term SINK TO sum_val_in_reqs AS
|
|
SELECT
|
|
sum(val) as sum_val,
|
|
date_bin(INTERVAL '30 seconds', ts) as time_window,
|
|
FROM
|
|
requests
|
|
GROUP BY
|
|
time_window;
|
|
|
|
Affected Rows: 0
|
|
|
|
INSERT INTO
|
|
requests
|
|
VALUES
|
|
(NULL, "10.0.0.1", NULL, "2024-10-18 19:00:00"),
|
|
("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
|
|
(NULL, "10.0.0.1", NULL, "2024-10-18 19:00:30"),
|
|
("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
|
|
(NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"),
|
|
(NULL, "10.0.0.2", NULL, "2024-10-18 19:01:01"),
|
|
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
|
|
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");
|
|
|
|
Affected Rows: 8
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('requests_long_term');
|
|
|
|
+----------------------------------------+
|
|
| ADMIN FLUSH_FLOW('requests_long_term') |
|
|
+----------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+----------------------------------------+
|
|
|
|
SELECT
|
|
*
|
|
FROM
|
|
sum_val_in_reqs;
|
|
|
|
+---------+---------------------+
|
|
| sum_val | ts |
|
|
+---------+---------------------+
|
|
| 100 | 2024-10-18T19:00:00 |
|
|
| 200 | 2024-10-18T19:00:30 |
|
|
| 300 | 2024-10-18T19:01:00 |
|
|
| 600 | 2024-10-18T19:01:30 |
|
|
+---------+---------------------+
|
|
|
|
-- Test if FLOWS table works, but don't care about the result since it vary from runs
|
|
SELECT
|
|
count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
|
|
FROM
|
|
INFORMATION_SCHEMA.FLOWS;
|
|
|
|
+--------------+
|
|
| active_flows |
|
|
+--------------+
|
|
| 1 |
|
|
+--------------+
|
|
|
|
DROP FLOW requests_long_term;
|
|
|
|
Affected Rows: 0
|
|
|
|
DROP TABLE sum_val_in_reqs;
|
|
|
|
Affected Rows: 0
|
|
|
|
DROP TABLE requests;
|
|
|
|
Affected Rows: 0
|
|
|
|
-- test null handling in key part of key-value pair
|
|
CREATE TABLE ngx_access_log (
|
|
client STRING,
|
|
country STRING,
|
|
access_time TIMESTAMP TIME INDEX
|
|
)WITH(
|
|
append_mode = 'true'
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
|
|
SELECT
|
|
client,
|
|
country as 'country',
|
|
count(1) as country_count,
|
|
date_bin(INTERVAL '1 hour', access_time) as time_window,
|
|
FROM
|
|
ngx_access_log
|
|
GROUP BY
|
|
client,
|
|
country,
|
|
time_window;
|
|
|
|
Affected Rows: 0
|
|
|
|
INSERT INTO
|
|
ngx_access_log
|
|
VALUES
|
|
("cli1", null, 0),
|
|
("cli1", null, 0),
|
|
("cli2", null, 0),
|
|
("cli2", null, 1),
|
|
("cli1", "b", 0),
|
|
("cli1", "c", 0);
|
|
|
|
Affected Rows: 6
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('calc_ngx_country');
|
|
|
|
+--------------------------------------+
|
|
| ADMIN FLUSH_FLOW('calc_ngx_country') |
|
|
+--------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+--------------------------------------+
|
|
|
|
SELECT
|
|
client,
|
|
country,
|
|
country_count,
|
|
time_window
|
|
FROM
|
|
ngx_country;
|
|
|
|
+--------+---------+---------------+---------------------+
|
|
| client | country | country_count | time_window |
|
|
+--------+---------+---------------+---------------------+
|
|
| cli1 | | 2 | 1970-01-01T00:00:00 |
|
|
| cli1 | b | 1 | 1970-01-01T00:00:00 |
|
|
| cli1 | c | 1 | 1970-01-01T00:00:00 |
|
|
| cli2 | | 2 | 1970-01-01T00:00:00 |
|
|
+--------+---------+---------------+---------------------+
|
|
|
|
-- making sure distinct is working
|
|
INSERT INTO
|
|
ngx_access_log
|
|
VALUES
|
|
("cli1", "b", 1);
|
|
|
|
Affected Rows: 1
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('calc_ngx_country');
|
|
|
|
+--------------------------------------+
|
|
| ADMIN FLUSH_FLOW('calc_ngx_country') |
|
|
+--------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+--------------------------------------+
|
|
|
|
SELECT
|
|
client,
|
|
country,
|
|
country_count,
|
|
time_window
|
|
FROM
|
|
ngx_country;
|
|
|
|
+--------+---------+---------------+---------------------+
|
|
| client | country | country_count | time_window |
|
|
+--------+---------+---------------+---------------------+
|
|
| cli1 | | 2 | 1970-01-01T00:00:00 |
|
|
| cli1 | b | 2 | 1970-01-01T00:00:00 |
|
|
| cli1 | c | 1 | 1970-01-01T00:00:00 |
|
|
| cli2 | | 2 | 1970-01-01T00:00:00 |
|
|
+--------+---------+---------------+---------------------+
|
|
|
|
INSERT INTO
|
|
ngx_access_log
|
|
VALUES
|
|
("cli1", "c", 2);
|
|
|
|
Affected Rows: 1
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('calc_ngx_country');
|
|
|
|
+--------------------------------------+
|
|
| ADMIN FLUSH_FLOW('calc_ngx_country') |
|
|
+--------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+--------------------------------------+
|
|
|
|
SELECT
|
|
client,
|
|
country,
|
|
country_count,
|
|
time_window
|
|
FROM
|
|
ngx_country;
|
|
|
|
+--------+---------+---------------+---------------------+
|
|
| client | country | country_count | time_window |
|
|
+--------+---------+---------------+---------------------+
|
|
| cli1 | | 2 | 1970-01-01T00:00:00 |
|
|
| cli1 | b | 2 | 1970-01-01T00:00:00 |
|
|
| cli1 | c | 2 | 1970-01-01T00:00:00 |
|
|
| cli2 | | 2 | 1970-01-01T00:00:00 |
|
|
+--------+---------+---------------+---------------------+
|
|
|
|
DROP FLOW calc_ngx_country;
|
|
|
|
Affected Rows: 0
|
|
|
|
DROP TABLE ngx_access_log;
|
|
|
|
Affected Rows: 0
|
|
|
|
DROP TABLE ngx_country;
|
|
|
|
Affected Rows: 0
|
|
|
|
-- test nullable pk with no default value
|
|
CREATE TABLE nullable_pk (
|
|
pid INT NULL,
|
|
client STRING,
|
|
ts TIMESTAMP TIME INDEX,
|
|
PRIMARY KEY (pid)
|
|
) WITH (
|
|
append_mode = 'true'
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
CREATE TABLE out_nullable_pk (
|
|
pid INT NULL,
|
|
client STRING,
|
|
ts TIMESTAMP TIME INDEX,
|
|
PRIMARY KEY (pid, client)
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
CREATE FLOW calc_nullable_pk SINK TO out_nullable_pk AS
|
|
SELECT
|
|
pid,
|
|
client,
|
|
ts
|
|
FROM
|
|
nullable_pk;
|
|
|
|
Affected Rows: 0
|
|
|
|
INSERT INTO
|
|
nullable_pk
|
|
VALUES
|
|
(1, "name1", "2024-10-18 19:00:00"),
|
|
(2, "name2", "2024-10-18 19:00:00"),
|
|
(3, "name3", "2024-10-18 19:00:00");
|
|
|
|
Affected Rows: 3
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('calc_nullable_pk');
|
|
|
|
+--------------------------------------+
|
|
| ADMIN FLUSH_FLOW('calc_nullable_pk') |
|
|
+--------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+--------------------------------------+
|
|
|
|
SELECT * FROM out_nullable_pk;
|
|
|
|
+-----+--------+---------------------+
|
|
| pid | client | ts |
|
|
+-----+--------+---------------------+
|
|
| 1 | name1 | 2024-10-18T19:00:00 |
|
|
| 2 | name2 | 2024-10-18T19:00:00 |
|
|
| 3 | name3 | 2024-10-18T19:00:00 |
|
|
+-----+--------+---------------------+
|
|
|
|
-- pk is nullable
|
|
INSERT INTO
|
|
nullable_pk (client, ts)
|
|
VALUES
|
|
("name1", "2024-10-18 19:00:00"),
|
|
("name2", "2024-10-18 19:00:00"),
|
|
("name3", "2024-10-18 19:00:00");
|
|
|
|
Affected Rows: 3
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('calc_nullable_pk');
|
|
|
|
+--------------------------------------+
|
|
| ADMIN FLUSH_FLOW('calc_nullable_pk') |
|
|
+--------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+--------------------------------------+
|
|
|
|
SELECT * FROM out_nullable_pk;
|
|
|
|
+-----+--------+---------------------+
|
|
| pid | client | ts |
|
|
+-----+--------+---------------------+
|
|
| | name1 | 2024-10-18T19:00:00 |
|
|
| | name2 | 2024-10-18T19:00:00 |
|
|
| | name3 | 2024-10-18T19:00:00 |
|
|
| 1 | name1 | 2024-10-18T19:00:00 |
|
|
| 2 | name2 | 2024-10-18T19:00:00 |
|
|
| 3 | name3 | 2024-10-18T19:00:00 |
|
|
+-----+--------+---------------------+
|
|
|
|
DROP FLOW calc_nullable_pk;
|
|
|
|
Affected Rows: 0
|
|
|
|
DROP TABLE nullable_pk;
|
|
|
|
Affected Rows: 0
|
|
|
|
DROP TABLE out_nullable_pk;
|
|
|
|
Affected Rows: 0
|
|
|