test: more sqlness tests for flow (#4988)

* tests: more flow testcase

* tests(WIP): more tests

* tests: more flow tests

* test: wired regex for sqlness

* refactor: put blog&example to two files
This commit is contained in:
discord9
2024-11-14 15:40:14 +08:00
committed by GitHub
parent 1101e98651
commit 35898f0b2e
9 changed files with 1653 additions and 121 deletions

View File

@@ -19,12 +19,13 @@ Affected Rows: 0
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
admin flush_flow('test_numbers_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| 0 |
| FLOW_FLUSHED |
+----------------------------------------+
-- SQLNESS ARG restart=true
@@ -36,12 +37,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_numbers_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| 1 |
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
@@ -57,12 +59,13 @@ FROM
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+---------------------------------+---------------------+---------------------+
admin flush_flow('test_numbers_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| 0 |
| FLOW_FLUSHED |
+----------------------------------------+
INSERT INTO
@@ -73,12 +76,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_numbers_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| 1 |
| FLOW_FLUSHED |
+----------------------------------------+
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
@@ -128,12 +132,13 @@ Affected Rows: 0
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
admin flush_flow('test_distinct_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| 0 |
| FLOW_FLUSHED |
+-----------------------------------------+
-- SQLNESS ARG restart=true
@@ -146,12 +151,13 @@ VALUES
Affected Rows: 3
admin flush_flow('test_distinct_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| 1 |
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT
@@ -166,12 +172,13 @@ FROM
| 22 |
+-----+
admin flush_flow('test_distinct_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| 0 |
| FLOW_FLUSHED |
+-----------------------------------------+
INSERT INTO
@@ -182,12 +189,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_distinct_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| 1 |
| FLOW_FLUSHED |
+-----------------------------------------+
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
@@ -306,12 +314,13 @@ VALUES
Affected Rows: 2
admin flush_flow('find_approx_rate');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN flush_flow('find_approx_rate') |
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| 1 |
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
@@ -334,12 +343,13 @@ VALUES
Affected Rows: 2
admin flush_flow('find_approx_rate');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN flush_flow('find_approx_rate') |
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| 1 |
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
@@ -392,12 +402,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1 |
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
@@ -419,12 +430,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1 |
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
@@ -445,12 +457,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1 |
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
@@ -505,12 +518,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1 |
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
@@ -533,12 +547,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1 |
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
@@ -560,12 +575,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1 |
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
@@ -633,12 +649,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1 |
| FLOW_FLUSHED |
+-------------------------------------+
-- This table should not exist yet
@@ -657,12 +674,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1 |
| FLOW_FLUSHED |
+-------------------------------------+
SHOW TABLES LIKE 'temp_alerts';
@@ -693,12 +711,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1 |
| FLOW_FLUSHED |
+-------------------------------------+
SELECT
@@ -769,12 +788,13 @@ VALUES
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_distribution');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+-------------------------------------------+
SELECT
@@ -802,12 +822,13 @@ VALUES
Affected Rows: 5
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_distribution');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+-------------------------------------------+
SELECT
@@ -880,12 +901,13 @@ VALUES
Affected Rows: 8
admin flush_flow('requests_long_term');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');
+----------------------------------------+
| ADMIN flush_flow('requests_long_term') |
| ADMIN FLUSH_FLOW('requests_long_term') |
+----------------------------------------+
| 1 |
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
@@ -916,3 +938,187 @@ DROP TABLE requests;
Affected Rows: 0
CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE android_log_abnormal (
crash BIGINT NULL,
fatal BIGINT NULL,
backtrace BIGINT NULL,
anr BIGINT NULL,
time_window TIMESTAMP(9) TIME INDEX,
update_at TIMESTAMP,
);
Affected Rows: 0
CREATE FLOW calc_android_log_abnormal
SINK TO android_log_abnormal
AS
SELECT
sum(case when `log` LIKE '%am_crash%' then 1 else 0 end) as crash,
sum(case when `log` LIKE '%FATAL EXCEPTION%' then 1 else 0 end) as fatal,
sum(case when `log` LIKE '%backtrace%' then 1 else 0 end) as backtrace,
sum(case when `log` LIKE '%am_anr%' then 1 else 0 end) as anr,
date_bin(INTERVAL '5 minutes', ts) as time_window,
FROM android_log
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
+-----------------------------------------------+
| ADMIN FLUSH_FLOW('calc_android_log_abnormal') |
+-----------------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------------+
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
+-------+-------+-----------+-----+
| crash | fatal | backtrace | anr |
+-------+-------+-----------+-----+
| 1 | 0 | 1 | 0 |
+-------+-------+-----------+-----+
INSERT INTO android_log values
("FATAL EXCEPTION", "2021-07-01 00:01:01.000"),
("mamam_anraaaa", "2021-07-01 00:01:01.000");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
+-----------------------------------------------+
| ADMIN FLUSH_FLOW('calc_android_log_abnormal') |
+-----------------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------------+
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
+-------+-------+-----------+-----+
| crash | fatal | backtrace | anr |
+-------+-------+-----------+-----+
| 1 | 1 | 1 | 1 |
+-------+-------+-----------+-----+
DROP FLOW calc_android_log_abnormal;
Affected Rows: 0
DROP TABLE android_log_abnormal;
Affected Rows: 0
DROP TABLE android_log;
Affected Rows: 0
CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE android_log_abnormal (
crash BIGINT NULL,
fatal BIGINT NULL,
backtrace BIGINT NULL,
anr BIGINT NULL,
time_window TIMESTAMP(9) TIME INDEX,
update_at TIMESTAMP,
);
Affected Rows: 0
CREATE FLOW calc_android_log_abnormal
SINK TO android_log_abnormal
AS
SELECT
sum(case when regexp_like(`log`, '.*am_crash.*') then 1 else 0 end) as crash,
sum(case when regexp_like(`log`, '.*FATAL EXCEPTION.*') then 1 else 0 end) as fatal,
sum(case when regexp_like(`log`, '.*backtrace.*') then 1 else 0 end) as backtrace,
sum(case when regexp_like(`log`, '.*am_anr.*') then 1 else 0 end) as anr,
date_bin(INTERVAL '5 minutes', ts) as time_window,
FROM android_log
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
+-----------------------------------------------+
| ADMIN FLUSH_FLOW('calc_android_log_abnormal') |
+-----------------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------------+
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
+-------+-------+-----------+-----+
| crash | fatal | backtrace | anr |
+-------+-------+-----------+-----+
| 1 | 0 | 1 | 0 |
+-------+-------+-----------+-----+
INSERT INTO android_log values
("FATAL EXCEPTION", "2021-07-01 00:01:01.000"),
("mamam_anraaaa", "2021-07-01 00:01:01.000");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
+-----------------------------------------------+
| ADMIN FLUSH_FLOW('calc_android_log_abnormal') |
+-----------------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------------+
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
+-------+-------+-----------+-----+
| crash | fatal | backtrace | anr |
+-------+-------+-----------+-----+
| 1 | 1 | 1 | 1 |
+-------+-------+-----------+-----+
DROP FLOW calc_android_log_abnormal;
Affected Rows: 0
DROP TABLE android_log_abnormal;
Affected Rows: 0
DROP TABLE android_log;
Affected Rows: 0

View File

@@ -15,7 +15,8 @@ GROUP BY
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
admin flush_flow('test_numbers_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
-- SQLNESS ARG restart=true
INSERT INTO
@@ -24,7 +25,8 @@ VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
admin flush_flow('test_numbers_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
SELECT
"SUM(numbers_input_basic.number)",
@@ -33,7 +35,8 @@ SELECT
FROM
out_num_cnt_basic;
admin flush_flow('test_numbers_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
INSERT INTO
numbers_input_basic
@@ -41,7 +44,8 @@ VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
admin flush_flow('test_numbers_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT
@@ -73,7 +77,8 @@ FROM
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
admin flush_flow('test_distinct_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
-- SQLNESS ARG restart=true
INSERT INTO
@@ -83,14 +88,16 @@ VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
admin flush_flow('test_distinct_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
SELECT
dis
FROM
out_distinct_basic;
admin flush_flow('test_distinct_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
INSERT INTO
distinct_basic
@@ -98,7 +105,8 @@ VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
admin flush_flow('test_distinct_basic');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT
@@ -173,7 +181,8 @@ VALUES
(101, '2025-01-01 00:00:01'),
(300, '2025-01-01 00:00:29');
admin flush_flow('find_approx_rate');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
@@ -187,7 +196,8 @@ VALUES
(450, '2025-01-01 00:00:32'),
(500, '2025-01-01 00:00:37');
admin flush_flow('find_approx_rate');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT
rate,
@@ -220,6 +230,7 @@ INSERT INTO
VALUES
("cli1", "b", 0);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SELECT
@@ -233,6 +244,7 @@ INSERT INTO
VALUES
("cli1", "b", 1);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SELECT
@@ -245,6 +257,7 @@ INSERT INTO
VALUES
("cli1", "c", 2);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SELECT
@@ -280,6 +293,7 @@ INSERT INTO
VALUES
("cli1", "b", 0);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SELECT
@@ -294,6 +308,7 @@ INSERT INTO
VALUES
("cli1", "b", 1);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SELECT
@@ -307,6 +322,7 @@ INSERT INTO
VALUES
("cli1", "c", 2);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SELECT
@@ -353,6 +369,7 @@ INSERT INTO
VALUES
(1, "room1", 50, 0);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
-- This table should not exist yet
@@ -363,6 +380,7 @@ INSERT INTO
VALUES
(1, "room1", 150, 1);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
SHOW TABLES LIKE 'temp_alerts';
@@ -379,6 +397,7 @@ INSERT INTO
VALUES
(2, "room1", 0, 2);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
SELECT
@@ -429,6 +448,7 @@ INSERT INTO
VALUES
("cli1", 200, 100, 0);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_distribution');
SELECT
@@ -448,6 +468,7 @@ VALUES
("cli1", 200, 210, 1),
("cli2", 200, 300, 1);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_distribution');
SELECT
@@ -497,7 +518,8 @@ VALUES
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");
admin flush_flow('requests_long_term');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');
SELECT
*
@@ -508,4 +530,108 @@ DROP FLOW requests_long_term;
DROP TABLE requests_without_ip;
DROP TABLE requests;
DROP TABLE requests;
CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
);
CREATE TABLE android_log_abnormal (
crash BIGINT NULL,
fatal BIGINT NULL,
backtrace BIGINT NULL,
anr BIGINT NULL,
time_window TIMESTAMP(9) TIME INDEX,
update_at TIMESTAMP,
);
CREATE FLOW calc_android_log_abnormal
SINK TO android_log_abnormal
AS
SELECT
sum(case when `log` LIKE '%am_crash%' then 1 else 0 end) as crash,
sum(case when `log` LIKE '%FATAL EXCEPTION%' then 1 else 0 end) as fatal,
sum(case when `log` LIKE '%backtrace%' then 1 else 0 end) as backtrace,
sum(case when `log` LIKE '%am_anr%' then 1 else 0 end) as anr,
date_bin(INTERVAL '5 minutes', ts) as time_window,
FROM android_log
GROUP BY
time_window;
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
INSERT INTO android_log values
("FATAL EXCEPTION", "2021-07-01 00:01:01.000"),
("mamam_anraaaa", "2021-07-01 00:01:01.000");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
DROP FLOW calc_android_log_abnormal;
DROP TABLE android_log_abnormal;
DROP TABLE android_log;
CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
);
CREATE TABLE android_log_abnormal (
crash BIGINT NULL,
fatal BIGINT NULL,
backtrace BIGINT NULL,
anr BIGINT NULL,
time_window TIMESTAMP(9) TIME INDEX,
update_at TIMESTAMP,
);
CREATE FLOW calc_android_log_abnormal
SINK TO android_log_abnormal
AS
SELECT
sum(case when regexp_like(`log`, '.*am_crash.*') then 1 else 0 end) as crash,
sum(case when regexp_like(`log`, '.*FATAL EXCEPTION.*') then 1 else 0 end) as fatal,
sum(case when regexp_like(`log`, '.*backtrace.*') then 1 else 0 end) as backtrace,
sum(case when regexp_like(`log`, '.*am_anr.*') then 1 else 0 end) as anr,
date_bin(INTERVAL '5 minutes', ts) as time_window,
FROM android_log
GROUP BY
time_window;
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
INSERT INTO android_log values
("FATAL EXCEPTION", "2021-07-01 00:01:01.000"),
("mamam_anraaaa", "2021-07-01 00:01:01.000");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
DROP FLOW calc_android_log_abnormal;
DROP TABLE android_log_abnormal;
DROP TABLE android_log;

View File

@@ -0,0 +1,106 @@
-- blog usecase
CREATE TABLE velocity (
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
left_wheel FLOAT,
right_wheel FLOAT,
TIME INDEX(ts)
);
Affected Rows: 0
CREATE TABLE avg_speed (
avg_speed DOUBLE,
start_window TIMESTAMP TIME INDEX,
end_window TIMESTAMP,
update_at TIMESTAMP,
);
Affected Rows: 0
CREATE FLOW calc_avg_speed SINK TO avg_speed AS
SELECT
avg((left_wheel + right_wheel) / 2)
FROM
velocity
WHERE
left_wheel > 0.5
AND right_wheel > 0.5
AND left_wheel < 60
AND right_wheel < 60
GROUP BY
tumble(ts, '5 second');
Affected Rows: 0
INSERT INTO
velocity
VALUES
("2021-07-01 00:00:00.200", 0.0, 0.7),
("2021-07-01 00:00:00.200", 0.0, 61.0),
("2021-07-01 00:00:02.500", 2.0, 1.0,);
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_avg_speed');
+------------------------------------+
| ADMIN FLUSH_FLOW('calc_avg_speed') |
+------------------------------------+
| FLOW_FLUSHED |
+------------------------------------+
SELECT
avg_speed,
start_window
FROM
avg_speed;
+-----------+---------------------+
| avg_speed | start_window |
+-----------+---------------------+
| 1.5 | 2021-07-01T00:00:00 |
+-----------+---------------------+
INSERT INTO
velocity
VALUES
("2021-07-01 00:00:05.100", 5.0, 4.0),
("2021-07-01 00:00:09.600", 2.3, 2.1);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_avg_speed');
+------------------------------------+
| ADMIN FLUSH_FLOW('calc_avg_speed') |
+------------------------------------+
| FLOW_FLUSHED |
+------------------------------------+
SELECT
avg_speed,
start_window
FROM
avg_speed;
+--------------------+---------------------+
| avg_speed | start_window |
+--------------------+---------------------+
| 1.5 | 2021-07-01T00:00:00 |
| 3.3499999046325684 | 2021-07-01T00:00:05 |
+--------------------+---------------------+
DROP FLOW calc_avg_speed;
Affected Rows: 0
DROP TABLE velocity;
Affected Rows: 0
DROP TABLE avg_speed;
Affected Rows: 0

View File

@@ -0,0 +1,64 @@
-- blog usecase
CREATE TABLE velocity (
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
left_wheel FLOAT,
right_wheel FLOAT,
TIME INDEX(ts)
);
CREATE TABLE avg_speed (
avg_speed DOUBLE,
start_window TIMESTAMP TIME INDEX,
end_window TIMESTAMP,
update_at TIMESTAMP,
);
CREATE FLOW calc_avg_speed SINK TO avg_speed AS
SELECT
avg((left_wheel + right_wheel) / 2)
FROM
velocity
WHERE
left_wheel > 0.5
AND right_wheel > 0.5
AND left_wheel < 60
AND right_wheel < 60
GROUP BY
tumble(ts, '5 second');
INSERT INTO
velocity
VALUES
("2021-07-01 00:00:00.200", 0.0, 0.7),
("2021-07-01 00:00:00.200", 0.0, 61.0),
("2021-07-01 00:00:02.500", 2.0, 1.0,);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_avg_speed');
SELECT
avg_speed,
start_window
FROM
avg_speed;
INSERT INTO
velocity
VALUES
("2021-07-01 00:00:05.100", 5.0, 4.0),
("2021-07-01 00:00:09.600", 2.3, 2.1);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_avg_speed');
SELECT
avg_speed,
start_window
FROM
avg_speed;
DROP FLOW calc_avg_speed;
DROP TABLE velocity;
DROP TABLE avg_speed;

View File

@@ -15,12 +15,13 @@ SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second
Affected Rows: 0
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 0 |
| FLOW_FLUSHED |
+------------------------------------------+
INSERT INTO numbers_input_df_func
@@ -31,12 +32,13 @@ VALUES
Affected Rows: 2
-- flush flow to make sure that table is created and data is inserted
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+------------------------------------------+
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
@@ -48,12 +50,13 @@ SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM o
| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+----------------------------------------+---------------------+---------------------+
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 0 |
| FLOW_FLUSHED |
+------------------------------------------+
INSERT INTO numbers_input_df_func
@@ -63,12 +66,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+------------------------------------------+
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
@@ -110,12 +114,13 @@ SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second
Affected Rows: 0
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 0 |
| FLOW_FLUSHED |
+------------------------------------------+
INSERT INTO numbers_input_df_func
@@ -126,12 +131,13 @@ VALUES
Affected Rows: 2
-- flush flow to make sure that table is created and data is inserted
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+------------------------------------------+
SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
@@ -142,12 +148,13 @@ SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM o
| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+----------------------------------------+---------------------+---------------------+
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 0 |
| FLOW_FLUSHED |
+------------------------------------------+
INSERT INTO numbers_input_df_func
@@ -157,12 +164,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+------------------------------------------+
SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
@@ -203,12 +211,13 @@ SELECT max(number) - min(number) as maxmin, date_bin(INTERVAL '1 second', ts, '2
Affected Rows: 0
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 0 |
| FLOW_FLUSHED |
+------------------------------------------+
INSERT INTO numbers_input_df_func
@@ -218,12 +227,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+------------------------------------------+
SELECT maxmin, time_window FROM out_num_cnt_df_func;
@@ -234,12 +244,13 @@ SELECT maxmin, time_window FROM out_num_cnt_df_func;
| 2 | 2021-07-01T00:00:00 |
+--------+---------------------+
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 0 |
| FLOW_FLUSHED |
+------------------------------------------+
INSERT INTO numbers_input_df_func
@@ -249,12 +260,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+------------------------------------------+
SELECT maxmin, time_window FROM out_num_cnt_df_func;
@@ -295,12 +307,13 @@ SELECT date_trunc('second', ts) as time_window, sum(number) as sum_num FROM numb
Affected Rows: 0
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 0 |
| FLOW_FLUSHED |
+------------------------------------------+
INSERT INTO numbers_input_df_func
@@ -310,12 +323,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+------------------------------------------+
SELECT time_window, sum_num FROM out_num_cnt;
@@ -326,12 +340,13 @@ SELECT time_window, sum_num FROM out_num_cnt;
| 2021-07-01T00:00:00 | 42 |
+---------------------+---------+
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 0 |
| FLOW_FLUSHED |
+------------------------------------------+
INSERT INTO numbers_input_df_func
@@ -341,12 +356,13 @@ VALUES
Affected Rows: 2
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
+------------------------------------------+
| ADMIN flush_flow('test_numbers_df_func') |
| ADMIN FLUSH_FLOW('test_numbers_df_func') |
+------------------------------------------+
| 1 |
| FLOW_FLUSHED |
+------------------------------------------+
SELECT time_window, sum_num FROM out_num_cnt;

View File

@@ -11,7 +11,8 @@ SINK TO out_num_cnt_df_func
AS
SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
@@ -19,19 +20,22 @@ VALUES
(22, "2021-07-01 00:00:00.600");
-- flush flow to make sure that table is created and data is inserted
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
-- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion
SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
@@ -53,7 +57,8 @@ SINK TO out_num_cnt_df_func
AS
SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
@@ -61,18 +66,21 @@ VALUES
(22, "2021-07-01 00:00:00.600");
-- flush flow to make sure that table is created and data is inserted
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(-24,"2021-07-01 00:00:01.500");
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func;
@@ -93,25 +101,29 @@ SINK TO out_num_cnt_df_func
AS
SELECT max(number) - min(number) as maxmin, date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::Timestamp) as time_window FROM numbers_input_df_func GROUP BY time_window;
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT maxmin, time_window FROM out_num_cnt_df_func;
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT maxmin, time_window FROM out_num_cnt_df_func;
@@ -133,25 +145,29 @@ SINK TO out_num_cnt
AS
SELECT date_trunc('second', ts) as time_window, sum(number) as sum_num FROM numbers_input_df_func GROUP BY date_trunc('second', ts);
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT time_window, sum_num FROM out_num_cnt;
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
INSERT INTO numbers_input_df_func
VALUES
(23,"2021-07-01 00:00:01.000"),
(24,"2021-07-01 00:00:01.500");
admin flush_flow('test_numbers_df_func');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_df_func');
SELECT time_window, sum_num FROM out_num_cnt;

View File

@@ -0,0 +1,587 @@
-- user guide example
CREATE TABLE `ngx_access_log` (
`client` STRING NULL,
`ua_platform` STRING NULL,
`referer` STRING NULL,
`method` STRING NULL,
`endpoint` STRING NULL,
`trace_id` STRING NULL FULLTEXT,
`protocol` STRING NULL,
`status` SMALLINT UNSIGNED NULL,
`size` DOUBLE NULL,
`agent` STRING NULL,
`access_time` TIMESTAMP(3) NOT NULL,
TIME INDEX (`access_time`)
) WITH(append_mode = 'true');
Affected Rows: 0
CREATE TABLE `ngx_statistics` (
`status` SMALLINT UNSIGNED NULL,
`total_logs` BIGINT NULL,
`min_size` DOUBLE NULL,
`max_size` DOUBLE NULL,
`avg_size` DOUBLE NULL,
`high_size_count` BIGINT NULL,
`time_window` TIMESTAMP time index,
`update_at` TIMESTAMP NULL,
PRIMARY KEY (`status`)
);
Affected Rows: 0
CREATE FLOW ngx_aggregation SINK TO ngx_statistics COMMENT 'Aggregate statistics for ngx_access_log' AS
SELECT
status,
count(client) AS total_logs,
min(size) as min_size,
max(size) as max_size,
avg(size) as avg_size,
sum(
case
when `size` > 550 then 1
else 0
end
) as high_size_count,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
status,
time_window;
Affected Rows: 0
INSERT INTO
ngx_access_log
VALUES
(
"android",
"Android",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
200,
1000,
"agent",
"2021-07-01 00:00:01.000"
),
(
"ios",
"iOS",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
200,
500,
"agent",
"2021-07-01 00:00:30.500"
),
(
"android",
"Android",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
200,
600,
"agent",
"2021-07-01 00:01:01.000"
),
(
"ios",
"iOS",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
404,
700,
"agent",
"2021-07-01 00:01:01.500"
);
Affected Rows: 4
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('ngx_aggregation');
+-------------------------------------+
| ADMIN FLUSH_FLOW('ngx_aggregation') |
+-------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------+
SELECT
status,
total_logs,
min_size,
max_size,
avg_size,
high_size_count,
time_window
FROM
ngx_statistics;
+--------+------------+----------+----------+----------+-----------------+---------------------+
| status | total_logs | min_size | max_size | avg_size | high_size_count | time_window |
+--------+------------+----------+----------+----------+-----------------+---------------------+
| 200 | 2 | 500.0 | 1000.0 | 750.0 | 1 | 2021-07-01T00:00:00 |
| 200 | 1 | 600.0 | 600.0 | 600.0 | 1 | 2021-07-01T00:01:00 |
| 404 | 1 | 700.0 | 700.0 | 700.0 | 1 | 2021-07-01T00:01:00 |
+--------+------------+----------+----------+----------+-----------------+---------------------+
INSERT INTO
ngx_access_log
VALUES
(
"android",
"Android",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
200,
500,
"agent",
"2021-07-01 00:01:01.000"
),
(
"ios",
"iOS",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
404,
800,
"agent",
"2021-07-01 00:01:01.500"
);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('ngx_aggregation');
+-------------------------------------+
| ADMIN FLUSH_FLOW('ngx_aggregation') |
+-------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------+
SELECT
status,
total_logs,
min_size,
max_size,
avg_size,
high_size_count,
time_window
FROM
ngx_statistics;
+--------+------------+----------+----------+----------+-----------------+---------------------+
| status | total_logs | min_size | max_size | avg_size | high_size_count | time_window |
+--------+------------+----------+----------+----------+-----------------+---------------------+
| 200 | 2 | 500.0 | 1000.0 | 750.0 | 1 | 2021-07-01T00:00:00 |
| 200 | 2 | 500.0 | 600.0 | 550.0 | 1 | 2021-07-01T00:01:00 |
| 404 | 2 | 700.0 | 800.0 | 750.0 | 2 | 2021-07-01T00:01:00 |
+--------+------------+----------+----------+----------+-----------------+---------------------+
DROP FLOW ngx_aggregation;
Affected Rows: 0
DROP TABLE ngx_statistics;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
/* Usecase example */
/* Real-time analytics example */
/* input table */
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);
Affected Rows: 0
/* sink table */
CREATE TABLE ngx_country (
country STRING,
update_at TIMESTAMP,
__ts_placeholder TIMESTAMP TIME INDEX,
PRIMARY KEY(country)
);
Affected Rows: 0
/* create flow task to calculate the distinct country */
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
DISTINCT country,
FROM
ngx_access_log;
Affected Rows: 0
/* insert some data */
INSERT INTO
ngx_access_log
VALUES
("client1", "US", "2022-01-01 00:00:00"),
("client2", "US", "2022-01-01 00:00:01"),
("client3", "UK", "2022-01-01 00:00:02"),
("client4", "UK", "2022-01-01 00:00:03"),
("client5", "CN", "2022-01-01 00:00:04"),
("client6", "CN", "2022-01-01 00:00:05"),
("client7", "JP", "2022-01-01 00:00:06"),
("client8", "JP", "2022-01-01 00:00:07"),
("client9", "KR", "2022-01-01 00:00:08"),
("client10", "KR", "2022-01-01 00:00:09");
Affected Rows: 10
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
country
FROM
ngx_country;
+---------+
| country |
+---------+
| CN |
| JP |
| KR |
| UK |
| US |
+---------+
DROP FLOW calc_ngx_country;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_country;
Affected Rows: 0
/* Real-time analytics example: Time Window */
/* input table */
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);
Affected Rows: 0
/* input table create same as above */
/* sink table */
CREATE TABLE ngx_country (
country STRING,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP,
PRIMARY KEY(country)
);
Affected Rows: 0
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
DISTINCT country,
date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
country,
time_window;
Affected Rows: 0
/* insert data using the same data as above */
/* insert some data */
INSERT INTO
ngx_access_log
VALUES
("client1", "US", "2022-01-01 00:00:00"),
("client2", "US", "2022-01-01 00:00:01"),
("client3", "UK", "2022-01-01 00:00:02"),
("client4", "UK", "2022-01-01 00:00:03"),
("client5", "CN", "2022-01-01 00:00:04"),
("client6", "CN", "2022-01-01 00:00:05"),
("client7", "JP", "2022-01-01 00:00:06"),
("client8", "JP", "2022-01-01 00:00:07"),
("client9", "KR", "2022-01-01 00:00:08"),
("client10", "KR", "2022-01-01 00:00:09");
Affected Rows: 10
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
country,
time_window
FROM
ngx_country;
+---------+---------------------+
| country | time_window |
+---------+---------------------+
| CN | 2022-01-01T00:00:00 |
| JP | 2022-01-01T00:00:00 |
| KR | 2022-01-01T00:00:00 |
| UK | 2022-01-01T00:00:00 |
| US | 2022-01-01T00:00:00 |
+---------+---------------------+
DROP FLOW calc_ngx_country;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_country;
Affected Rows: 0
/* create input table */
CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
/* Real-time monitoring example */
/* create sink table */
CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
update_at TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
);
Affected Rows: 0
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
SELECT
sensor_id,
loc,
max(temperature) as max_temp,
FROM
temp_sensor_data
GROUP BY
sensor_id,
loc
HAVING
max_temp > 100;
Affected Rows: 0
INSERT INTO
temp_sensor_data
VALUES
(1, "room1", 98.5, "2022-01-01 00:00:00"),
(2, "room2", 99.5, "2022-01-01 00:00:01");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------+
SELECT
sensor_id,
loc,
max_temp
FROM
temp_alerts;
++
++
INSERT INTO
temp_sensor_data
VALUES
(1, "room1", 101.5, "2022-01-01 00:00:02"),
(2, "room2", 102.5, "2022-01-01 00:00:03");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------+
/* wait at least one second for flow to update results to sink table */
SELECT
sensor_id,
loc,
max_temp
FROM
temp_alerts;
+-----------+-------+----------+
| sensor_id | loc | max_temp |
+-----------+-------+----------+
| 1 | room1 | 101.5 |
| 2 | room2 | 102.5 |
+-----------+-------+----------+
DROP FLOW temp_monitoring;
Affected Rows: 0
DROP TABLE temp_sensor_data;
Affected Rows: 0
DROP TABLE temp_alerts;
Affected Rows: 0
/* Real-time dashboard */
/* create input table */
CREATE TABLE ngx_access_log (
client STRING,
stat INT,
size INT,
access_time TIMESTAMP TIME INDEX
);
Affected Rows: 0
/* create sink table */
CREATE TABLE ngx_distribution (
stat INT,
bucket_size INT,
total_logs BIGINT,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP,
/* auto generated column to store the last update time */
PRIMARY KEY(stat, bucket_size)
);
Affected Rows: 0
/* create flow task to calculate the distribution of packet sizes for each status code */
CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
SELECT
stat,
trunc(size, -1) :: INT as bucket_size,
count(client) AS total_logs,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
stat,
time_window,
bucket_size;
Affected Rows: 0
INSERT INTO
ngx_access_log
VALUES
("cli1", 200, 100, "2022-01-01 00:00:00"),
("cli2", 200, 104, "2022-01-01 00:00:01"),
("cli3", 200, 120, "2022-01-01 00:00:02"),
("cli4", 200, 124, "2022-01-01 00:00:03"),
("cli5", 200, 140, "2022-01-01 00:00:04"),
("cli6", 404, 144, "2022-01-01 00:00:05"),
("cli7", 404, 160, "2022-01-01 00:00:06"),
("cli8", 404, 164, "2022-01-01 00:00:07"),
("cli9", 404, 180, "2022-01-01 00:00:08"),
("cli10", 404, 184, "2022-01-01 00:00:09");
Affected Rows: 10
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_distribution');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
SELECT
stat,
bucket_size,
total_logs,
time_window
FROM
ngx_distribution;
+------+-------------+------------+---------------------+
| stat | bucket_size | total_logs | time_window |
+------+-------------+------------+---------------------+
| 200 | 100 | 2 | 2022-01-01T00:00:00 |
| 200 | 120 | 2 | 2022-01-01T00:00:00 |
| 200 | 140 | 1 | 2022-01-01T00:00:00 |
| 404 | 140 | 1 | 2022-01-01T00:00:00 |
| 404 | 160 | 2 | 2022-01-01T00:00:00 |
| 404 | 180 | 2 | 2022-01-01T00:00:00 |
+------+-------------+------------+---------------------+
DROP FLOW calc_ngx_distribution;
Affected Rows: 0
DROP TABLE ngx_distribution;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0

View File

@@ -0,0 +1,411 @@
-- user guide example
CREATE TABLE `ngx_access_log` (
`client` STRING NULL,
`ua_platform` STRING NULL,
`referer` STRING NULL,
`method` STRING NULL,
`endpoint` STRING NULL,
`trace_id` STRING NULL FULLTEXT,
`protocol` STRING NULL,
`status` SMALLINT UNSIGNED NULL,
`size` DOUBLE NULL,
`agent` STRING NULL,
`access_time` TIMESTAMP(3) NOT NULL,
TIME INDEX (`access_time`)
) WITH(append_mode = 'true');
CREATE TABLE `ngx_statistics` (
`status` SMALLINT UNSIGNED NULL,
`total_logs` BIGINT NULL,
`min_size` DOUBLE NULL,
`max_size` DOUBLE NULL,
`avg_size` DOUBLE NULL,
`high_size_count` BIGINT NULL,
`time_window` TIMESTAMP time index,
`update_at` TIMESTAMP NULL,
PRIMARY KEY (`status`)
);
CREATE FLOW ngx_aggregation SINK TO ngx_statistics COMMENT 'Aggregate statistics for ngx_access_log' AS
SELECT
status,
count(client) AS total_logs,
min(size) as min_size,
max(size) as max_size,
avg(size) as avg_size,
sum(
case
when `size` > 550 then 1
else 0
end
) as high_size_count,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
status,
time_window;
INSERT INTO
ngx_access_log
VALUES
(
"android",
"Android",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
200,
1000,
"agent",
"2021-07-01 00:00:01.000"
),
(
"ios",
"iOS",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
200,
500,
"agent",
"2021-07-01 00:00:30.500"
),
(
"android",
"Android",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
200,
600,
"agent",
"2021-07-01 00:01:01.000"
),
(
"ios",
"iOS",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
404,
700,
"agent",
"2021-07-01 00:01:01.500"
);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('ngx_aggregation');
SELECT
status,
total_logs,
min_size,
max_size,
avg_size,
high_size_count,
time_window
FROM
ngx_statistics;
INSERT INTO
ngx_access_log
VALUES
(
"android",
"Android",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
200,
500,
"agent",
"2021-07-01 00:01:01.000"
),
(
"ios",
"iOS",
"referer",
"GET",
"/api/v1",
"trace_id",
"HTTP",
404,
800,
"agent",
"2021-07-01 00:01:01.500"
);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('ngx_aggregation');
SELECT
status,
total_logs,
min_size,
max_size,
avg_size,
high_size_count,
time_window
FROM
ngx_statistics;
DROP FLOW ngx_aggregation;
DROP TABLE ngx_statistics;
DROP TABLE ngx_access_log;
/* Usecase example */
/* Real-time analytics example */
/* input table */
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);
/* sink table */
CREATE TABLE ngx_country (
country STRING,
update_at TIMESTAMP,
__ts_placeholder TIMESTAMP TIME INDEX,
PRIMARY KEY(country)
);
/* create flow task to calculate the distinct country */
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
DISTINCT country,
FROM
ngx_access_log;
/* insert some data */
INSERT INTO
ngx_access_log
VALUES
("client1", "US", "2022-01-01 00:00:00"),
("client2", "US", "2022-01-01 00:00:01"),
("client3", "UK", "2022-01-01 00:00:02"),
("client4", "UK", "2022-01-01 00:00:03"),
("client5", "CN", "2022-01-01 00:00:04"),
("client6", "CN", "2022-01-01 00:00:05"),
("client7", "JP", "2022-01-01 00:00:06"),
("client8", "JP", "2022-01-01 00:00:07"),
("client9", "KR", "2022-01-01 00:00:08"),
("client10", "KR", "2022-01-01 00:00:09");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SELECT
country
FROM
ngx_country;
DROP FLOW calc_ngx_country;
DROP TABLE ngx_access_log;
DROP TABLE ngx_country;
/* Real-time analytics example: Time Window */
/* input table */
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);
/* input table create same as above */
/* sink table */
CREATE TABLE ngx_country (
country STRING,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP,
PRIMARY KEY(country)
);
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
DISTINCT country,
date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
country,
time_window;
/* insert data using the same data as above */
/* insert some data */
INSERT INTO
ngx_access_log
VALUES
("client1", "US", "2022-01-01 00:00:00"),
("client2", "US", "2022-01-01 00:00:01"),
("client3", "UK", "2022-01-01 00:00:02"),
("client4", "UK", "2022-01-01 00:00:03"),
("client5", "CN", "2022-01-01 00:00:04"),
("client6", "CN", "2022-01-01 00:00:05"),
("client7", "JP", "2022-01-01 00:00:06"),
("client8", "JP", "2022-01-01 00:00:07"),
("client9", "KR", "2022-01-01 00:00:08"),
("client10", "KR", "2022-01-01 00:00:09");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
SELECT
country,
time_window
FROM
ngx_country;
DROP FLOW calc_ngx_country;
DROP TABLE ngx_access_log;
DROP TABLE ngx_country;
/* create input table */
CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX
);
/* Real-time monitoring example */
/* create sink table */
CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
update_at TIMESTAMP TIME INDEX,
PRIMARY KEY(sensor_id, loc)
);
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
SELECT
sensor_id,
loc,
max(temperature) as max_temp,
FROM
temp_sensor_data
GROUP BY
sensor_id,
loc
HAVING
max_temp > 100;
INSERT INTO
temp_sensor_data
VALUES
(1, "room1", 98.5, "2022-01-01 00:00:00"),
(2, "room2", 99.5, "2022-01-01 00:00:01");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
SELECT
sensor_id,
loc,
max_temp
FROM
temp_alerts;
INSERT INTO
temp_sensor_data
VALUES
(1, "room1", 101.5, "2022-01-01 00:00:02"),
(2, "room2", 102.5, "2022-01-01 00:00:03");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
/* wait at least one second for flow to update results to sink table */
SELECT
sensor_id,
loc,
max_temp
FROM
temp_alerts;
DROP FLOW temp_monitoring;
DROP TABLE temp_sensor_data;
DROP TABLE temp_alerts;
/* Real-time dashboard */
/* create input table */
CREATE TABLE ngx_access_log (
client STRING,
stat INT,
size INT,
access_time TIMESTAMP TIME INDEX
);
/* create sink table */
CREATE TABLE ngx_distribution (
stat INT,
bucket_size INT,
total_logs BIGINT,
time_window TIMESTAMP TIME INDEX,
update_at TIMESTAMP,
/* auto generated column to store the last update time */
PRIMARY KEY(stat, bucket_size)
);
/* create flow task to calculate the distribution of packet sizes for each status code */
CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
SELECT
stat,
trunc(size, -1) :: INT as bucket_size,
count(client) AS total_logs,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
stat,
time_window,
bucket_size;
INSERT INTO
ngx_access_log
VALUES
("cli1", 200, 100, "2022-01-01 00:00:00"),
("cli2", 200, 104, "2022-01-01 00:00:01"),
("cli3", 200, 120, "2022-01-01 00:00:02"),
("cli4", 200, 124, "2022-01-01 00:00:03"),
("cli5", 200, 140, "2022-01-01 00:00:04"),
("cli6", 404, 144, "2022-01-01 00:00:05"),
("cli7", 404, 160, "2022-01-01 00:00:06"),
("cli8", 404, 164, "2022-01-01 00:00:07"),
("cli9", 404, 180, "2022-01-01 00:00:08"),
("cli10", 404, 184, "2022-01-01 00:00:09");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_distribution');
SELECT
stat,
bucket_size,
total_logs,
time_window
FROM
ngx_distribution;
DROP FLOW calc_ngx_distribution;
DROP TABLE ngx_distribution;
DROP TABLE ngx_access_log;