From 35898f0b2ebdd0e8062227bb55b9e50a6c21be5d Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:40:14 +0800 Subject: [PATCH] test: more sqlness tests for flow (#4988) * tests: more flow testcase * tests(WIP): more tests * tests: more flow tests * test: wired regex for sqlness * refactor: put blog&example to two files --- src/flow/src/adapter.rs | 2 +- .../standalone/common/flow/flow_basic.result | 294 +++++++-- .../standalone/common/flow/flow_basic.sql | 150 ++++- .../standalone/common/flow/flow_blog.result | 106 ++++ .../standalone/common/flow/flow_blog.sql | 64 ++ .../common/flow/flow_call_df_func.result | 112 ++-- .../common/flow/flow_call_df_func.sql | 48 +- .../common/flow/flow_user_guide.result | 587 ++++++++++++++++++ .../common/flow/flow_user_guide.sql | 411 ++++++++++++ 9 files changed, 1653 insertions(+), 121 deletions(-) create mode 100644 tests/cases/standalone/common/flow/flow_blog.result create mode 100644 tests/cases/standalone/common/flow/flow_blog.sql create mode 100644 tests/cases/standalone/common/flow/flow_user_guide.result create mode 100644 tests/cases/standalone/common/flow/flow_user_guide.sql diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index d564e1dd09..dadc99f8ec 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -299,7 +299,7 @@ impl FlowWorkerManager { )]); } if row.len() != proto_schema.len() { - InternalSnafu { + UnexpectedSnafu { reason: format!( "Flow output row length mismatch, expect {} got {}, the columns in schema are: {:?}", proto_schema.len(), diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index 33d4ddacba..d3cc7f6680 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -19,12 +19,13 @@ Affected Rows: 0 -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -admin flush_flow('test_numbers_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ -| ADMIN flush_flow('test_numbers_basic') | +| ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ -| 0 | +| FLOW_FLUSHED | +----------------------------------------+ -- SQLNESS ARG restart=true @@ -36,12 +37,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_numbers_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ -| ADMIN flush_flow('test_numbers_basic') | +| ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ -| 1 | +| FLOW_FLUSHED | +----------------------------------------+ SELECT @@ -57,12 +59,13 @@ FROM | 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +---------------------------------+---------------------+---------------------+ -admin flush_flow('test_numbers_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ -| ADMIN flush_flow('test_numbers_basic') | +| ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ -| 0 | +| FLOW_FLUSHED | +----------------------------------------+ INSERT INTO @@ -73,12 +76,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_numbers_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ -| ADMIN flush_flow('test_numbers_basic') | +| ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ -| 1 | +| FLOW_FLUSHED | +----------------------------------------+ -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion @@ -128,12 +132,13 @@ Affected Rows: 0 -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -admin flush_flow('test_distinct_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ -| ADMIN flush_flow('test_distinct_basic') | +| ADMIN FLUSH_FLOW('test_distinct_basic') | +-----------------------------------------+ -| 0 | +| FLOW_FLUSHED | +-----------------------------------------+ -- SQLNESS ARG restart=true @@ -146,12 +151,13 @@ VALUES Affected Rows: 3 -admin flush_flow('test_distinct_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ -| ADMIN flush_flow('test_distinct_basic') | +| ADMIN FLUSH_FLOW('test_distinct_basic') | +-----------------------------------------+ -| 1 | +| FLOW_FLUSHED | +-----------------------------------------+ SELECT @@ -166,12 +172,13 @@ FROM | 22 | +-----+ -admin flush_flow('test_distinct_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ -| ADMIN flush_flow('test_distinct_basic') | +| ADMIN FLUSH_FLOW('test_distinct_basic') | +-----------------------------------------+ -| 0 | +| FLOW_FLUSHED | +-----------------------------------------+ INSERT INTO @@ -182,12 +189,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_distinct_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ -| ADMIN flush_flow('test_distinct_basic') | +| ADMIN FLUSH_FLOW('test_distinct_basic') | +-----------------------------------------+ -| 1 | +| FLOW_FLUSHED | +-----------------------------------------+ -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion @@ -306,12 +314,13 @@ VALUES Affected Rows: 2 -admin flush_flow('find_approx_rate'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); +--------------------------------------+ -| ADMIN flush_flow('find_approx_rate') | +| ADMIN FLUSH_FLOW('find_approx_rate') | +--------------------------------------+ -| 1 | +| FLOW_FLUSHED | +--------------------------------------+ SELECT @@ -334,12 +343,13 @@ VALUES Affected Rows: 2 -admin flush_flow('find_approx_rate'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); +--------------------------------------+ -| ADMIN flush_flow('find_approx_rate') | +| ADMIN FLUSH_FLOW('find_approx_rate') | +--------------------------------------+ -| 1 | +| FLOW_FLUSHED | +--------------------------------------+ SELECT @@ -392,12 +402,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ -| 1 | +| FLOW_FLUSHED | +--------------------------------------+ SELECT @@ -419,12 +430,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ -| 1 | +| FLOW_FLUSHED | +--------------------------------------+ SELECT @@ -445,12 +457,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ -| 1 | +| FLOW_FLUSHED | +--------------------------------------+ SELECT @@ -505,12 +518,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ -| 1 | +| FLOW_FLUSHED | +--------------------------------------+ SELECT @@ -533,12 +547,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ -| 1 | +| FLOW_FLUSHED | +--------------------------------------+ SELECT @@ -560,12 +575,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ -| 1 | +| FLOW_FLUSHED | +--------------------------------------+ SELECT @@ -633,12 +649,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); +-------------------------------------+ | ADMIN FLUSH_FLOW('temp_monitoring') | +-------------------------------------+ -| 1 | +| FLOW_FLUSHED | +-------------------------------------+ -- This table should not exist yet @@ -657,12 +674,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); +-------------------------------------+ | ADMIN FLUSH_FLOW('temp_monitoring') | +-------------------------------------+ -| 1 | +| FLOW_FLUSHED | +-------------------------------------+ SHOW TABLES LIKE 'temp_alerts'; @@ -693,12 +711,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); +-------------------------------------+ | ADMIN FLUSH_FLOW('temp_monitoring') | +-------------------------------------+ -| 1 | +| FLOW_FLUSHED | +-------------------------------------+ SELECT @@ -769,12 +788,13 @@ VALUES Affected Rows: 1 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_distribution'); +-------------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_distribution') | +-------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +-------------------------------------------+ SELECT @@ -802,12 +822,13 @@ VALUES Affected Rows: 5 +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_distribution'); +-------------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_distribution') | +-------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +-------------------------------------------+ SELECT @@ -880,12 +901,13 @@ VALUES Affected Rows: 8 -admin flush_flow('requests_long_term'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('requests_long_term'); +----------------------------------------+ -| ADMIN flush_flow('requests_long_term') | +| ADMIN FLUSH_FLOW('requests_long_term') | +----------------------------------------+ -| 1 | +| FLOW_FLUSHED | +----------------------------------------+ SELECT @@ -916,3 +938,187 @@ DROP TABLE requests; Affected Rows: 0 +CREATE TABLE android_log ( + `log` STRING, + ts TIMESTAMP(9), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE TABLE android_log_abnormal ( + crash BIGINT NULL, + fatal BIGINT NULL, + backtrace BIGINT NULL, + anr BIGINT NULL, + time_window TIMESTAMP(9) TIME INDEX, + update_at TIMESTAMP, +); + +Affected Rows: 0 + +CREATE FLOW calc_android_log_abnormal +SINK TO android_log_abnormal +AS +SELECT + sum(case when `log` LIKE '%am_crash%' then 1 else 0 end) as crash, + sum(case when `log` LIKE '%FATAL EXCEPTION%' then 1 else 0 end) as fatal, + sum(case when `log` LIKE '%backtrace%' then 1 else 0 end) as backtrace, + sum(case when `log` LIKE '%am_anr%' then 1 else 0 end) as anr, + date_bin(INTERVAL '5 minutes', ts) as time_window, +FROM android_log +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO android_log values +("am_crash", "2021-07-01 00:01:01.000"), +("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_android_log_abnormal'); + ++-----------------------------------------------+ +| ADMIN FLUSH_FLOW('calc_android_log_abnormal') | ++-----------------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------------+ + +SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; + ++-------+-------+-----------+-----+ +| crash | fatal | backtrace | anr | ++-------+-------+-----------+-----+ +| 1 | 0 | 1 | 0 | ++-------+-------+-----------+-----+ + +INSERT INTO android_log values +("FATAL EXCEPTION", "2021-07-01 00:01:01.000"), +("mamam_anraaaa", "2021-07-01 00:01:01.000"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_android_log_abnormal'); + ++-----------------------------------------------+ +| ADMIN FLUSH_FLOW('calc_android_log_abnormal') | ++-----------------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------------+ + +SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; + ++-------+-------+-----------+-----+ +| crash | fatal | backtrace | anr | ++-------+-------+-----------+-----+ +| 1 | 1 | 1 | 1 | ++-------+-------+-----------+-----+ + +DROP FLOW calc_android_log_abnormal; + +Affected Rows: 0 + +DROP TABLE android_log_abnormal; + +Affected Rows: 0 + +DROP TABLE android_log; + +Affected Rows: 0 + +CREATE TABLE android_log ( + `log` STRING, + ts TIMESTAMP(9), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE TABLE android_log_abnormal ( + crash BIGINT NULL, + fatal BIGINT NULL, + backtrace BIGINT NULL, + anr BIGINT NULL, + time_window TIMESTAMP(9) TIME INDEX, + update_at TIMESTAMP, +); + +Affected Rows: 0 + +CREATE FLOW calc_android_log_abnormal +SINK TO android_log_abnormal +AS +SELECT + sum(case when regexp_like(`log`, '.*am_crash.*') then 1 else 0 end) as crash, + sum(case when regexp_like(`log`, '.*FATAL EXCEPTION.*') then 1 else 0 end) as fatal, + sum(case when regexp_like(`log`, '.*backtrace.*') then 1 else 0 end) as backtrace, + sum(case when regexp_like(`log`, '.*am_anr.*') then 1 else 0 end) as anr, + date_bin(INTERVAL '5 minutes', ts) as time_window, +FROM android_log +GROUP BY + time_window; + +Affected Rows: 0 + +INSERT INTO android_log values +("am_crash", "2021-07-01 00:01:01.000"), +("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_android_log_abnormal'); + ++-----------------------------------------------+ +| ADMIN FLUSH_FLOW('calc_android_log_abnormal') | ++-----------------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------------+ + +SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; + ++-------+-------+-----------+-----+ +| crash | fatal | backtrace | anr | ++-------+-------+-----------+-----+ +| 1 | 0 | 1 | 0 | ++-------+-------+-----------+-----+ + +INSERT INTO android_log values +("FATAL EXCEPTION", "2021-07-01 00:01:01.000"), +("mamam_anraaaa", "2021-07-01 00:01:01.000"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_android_log_abnormal'); + ++-----------------------------------------------+ +| ADMIN FLUSH_FLOW('calc_android_log_abnormal') | ++-----------------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------------+ + +SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; + ++-------+-------+-----------+-----+ +| crash | fatal | backtrace | anr | ++-------+-------+-----------+-----+ +| 1 | 1 | 1 | 1 | ++-------+-------+-----------+-----+ + +DROP FLOW calc_android_log_abnormal; + +Affected Rows: 0 + +DROP TABLE android_log_abnormal; + +Affected Rows: 0 + +DROP TABLE android_log; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 179a19195d..3a1a53d0ed 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -15,7 +15,8 @@ GROUP BY -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -admin flush_flow('test_numbers_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); -- SQLNESS ARG restart=true INSERT INTO @@ -24,7 +25,8 @@ VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -admin flush_flow('test_numbers_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); SELECT "SUM(numbers_input_basic.number)", @@ -33,7 +35,8 @@ SELECT FROM out_num_cnt_basic; -admin flush_flow('test_numbers_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); INSERT INTO numbers_input_basic @@ -41,7 +44,8 @@ VALUES (23, "2021-07-01 00:00:01.000"), (24, "2021-07-01 00:00:01.500"); -admin flush_flow('test_numbers_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_basic'); -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion SELECT @@ -73,7 +77,8 @@ FROM -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -admin flush_flow('test_distinct_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); -- SQLNESS ARG restart=true INSERT INTO @@ -83,14 +88,16 @@ VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -admin flush_flow('test_distinct_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); SELECT dis FROM out_distinct_basic; -admin flush_flow('test_distinct_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); INSERT INTO distinct_basic @@ -98,7 +105,8 @@ VALUES (23, "2021-07-01 00:00:01.000"), (24, "2021-07-01 00:00:01.500"); -admin flush_flow('test_distinct_basic'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion SELECT @@ -173,7 +181,8 @@ VALUES (101, '2025-01-01 00:00:01'), (300, '2025-01-01 00:00:29'); -admin flush_flow('find_approx_rate'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); SELECT rate, @@ -187,7 +196,8 @@ VALUES (450, '2025-01-01 00:00:32'), (500, '2025-01-01 00:00:37'); -admin flush_flow('find_approx_rate'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('find_approx_rate'); SELECT rate, @@ -220,6 +230,7 @@ INSERT INTO VALUES ("cli1", "b", 0); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); SELECT @@ -233,6 +244,7 @@ INSERT INTO VALUES ("cli1", "b", 1); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); SELECT @@ -245,6 +257,7 @@ INSERT INTO VALUES ("cli1", "c", 2); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); SELECT @@ -280,6 +293,7 @@ INSERT INTO VALUES ("cli1", "b", 0); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); SELECT @@ -294,6 +308,7 @@ INSERT INTO VALUES ("cli1", "b", 1); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); SELECT @@ -307,6 +322,7 @@ INSERT INTO VALUES ("cli1", "c", 2); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); SELECT @@ -353,6 +369,7 @@ INSERT INTO VALUES (1, "room1", 50, 0); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); -- This table should not exist yet @@ -363,6 +380,7 @@ INSERT INTO VALUES (1, "room1", 150, 1); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); SHOW TABLES LIKE 'temp_alerts'; @@ -379,6 +397,7 @@ INSERT INTO VALUES (2, "room1", 0, 2); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); SELECT @@ -429,6 +448,7 @@ INSERT INTO VALUES ("cli1", 200, 100, 0); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_distribution'); SELECT @@ -448,6 +468,7 @@ VALUES ("cli1", 200, 210, 1), ("cli2", 200, 300, 1); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_distribution'); SELECT @@ -497,7 +518,8 @@ VALUES ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"), ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31"); -admin flush_flow('requests_long_term'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('requests_long_term'); SELECT * @@ -508,4 +530,108 @@ DROP FLOW requests_long_term; DROP TABLE requests_without_ip; -DROP TABLE requests; \ No newline at end of file +DROP TABLE requests; + +CREATE TABLE android_log ( + `log` STRING, + ts TIMESTAMP(9), + TIME INDEX(ts) +); + +CREATE TABLE android_log_abnormal ( + crash BIGINT NULL, + fatal BIGINT NULL, + backtrace BIGINT NULL, + anr BIGINT NULL, + time_window TIMESTAMP(9) TIME INDEX, + update_at TIMESTAMP, +); + +CREATE FLOW calc_android_log_abnormal +SINK TO android_log_abnormal +AS +SELECT + sum(case when `log` LIKE '%am_crash%' then 1 else 0 end) as crash, + sum(case when `log` LIKE '%FATAL EXCEPTION%' then 1 else 0 end) as fatal, + sum(case when `log` LIKE '%backtrace%' then 1 else 0 end) as backtrace, + sum(case when `log` LIKE '%am_anr%' then 1 else 0 end) as anr, + date_bin(INTERVAL '5 minutes', ts) as time_window, +FROM android_log +GROUP BY + time_window; + +INSERT INTO android_log values +("am_crash", "2021-07-01 00:01:01.000"), +("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_android_log_abnormal'); + +SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; + +INSERT INTO android_log values +("FATAL EXCEPTION", "2021-07-01 00:01:01.000"), +("mamam_anraaaa", "2021-07-01 00:01:01.000"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_android_log_abnormal'); + +SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; + +DROP FLOW calc_android_log_abnormal; + +DROP TABLE android_log_abnormal; + +DROP TABLE android_log; + +CREATE TABLE android_log ( + `log` STRING, + ts TIMESTAMP(9), + TIME INDEX(ts) +); + +CREATE TABLE android_log_abnormal ( + crash BIGINT NULL, + fatal BIGINT NULL, + backtrace BIGINT NULL, + anr BIGINT NULL, + time_window TIMESTAMP(9) TIME INDEX, + update_at TIMESTAMP, +); + +CREATE FLOW calc_android_log_abnormal +SINK TO android_log_abnormal +AS +SELECT + sum(case when regexp_like(`log`, '.*am_crash.*') then 1 else 0 end) as crash, + sum(case when regexp_like(`log`, '.*FATAL EXCEPTION.*') then 1 else 0 end) as fatal, + sum(case when regexp_like(`log`, '.*backtrace.*') then 1 else 0 end) as backtrace, + sum(case when regexp_like(`log`, '.*am_anr.*') then 1 else 0 end) as anr, + date_bin(INTERVAL '5 minutes', ts) as time_window, +FROM android_log +GROUP BY + time_window; + +INSERT INTO android_log values +("am_crash", "2021-07-01 00:01:01.000"), +("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_android_log_abnormal'); + +SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; + +INSERT INTO android_log values +("FATAL EXCEPTION", "2021-07-01 00:01:01.000"), +("mamam_anraaaa", "2021-07-01 00:01:01.000"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_android_log_abnormal'); + +SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; + +DROP FLOW calc_android_log_abnormal; + +DROP TABLE android_log_abnormal; + +DROP TABLE android_log; diff --git a/tests/cases/standalone/common/flow/flow_blog.result b/tests/cases/standalone/common/flow/flow_blog.result new file mode 100644 index 0000000000..3046e147c0 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_blog.result @@ -0,0 +1,106 @@ +-- blog usecase +CREATE TABLE velocity ( + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + left_wheel FLOAT, + right_wheel FLOAT, + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE TABLE avg_speed ( + avg_speed DOUBLE, + start_window TIMESTAMP TIME INDEX, + end_window TIMESTAMP, + update_at TIMESTAMP, +); + +Affected Rows: 0 + +CREATE FLOW calc_avg_speed SINK TO avg_speed AS +SELECT + avg((left_wheel + right_wheel) / 2) +FROM + velocity +WHERE + left_wheel > 0.5 + AND right_wheel > 0.5 + AND left_wheel < 60 + AND right_wheel < 60 +GROUP BY + tumble(ts, '5 second'); + +Affected Rows: 0 + +INSERT INTO + velocity +VALUES + ("2021-07-01 00:00:00.200", 0.0, 0.7), + ("2021-07-01 00:00:00.200", 0.0, 61.0), + ("2021-07-01 00:00:02.500", 2.0, 1.0,); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_avg_speed'); + ++------------------------------------+ +| ADMIN FLUSH_FLOW('calc_avg_speed') | ++------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------+ + +SELECT + avg_speed, + start_window +FROM + avg_speed; + ++-----------+---------------------+ +| avg_speed | start_window | ++-----------+---------------------+ +| 1.5 | 2021-07-01T00:00:00 | ++-----------+---------------------+ + +INSERT INTO + velocity +VALUES + ("2021-07-01 00:00:05.100", 5.0, 4.0), + ("2021-07-01 00:00:09.600", 2.3, 2.1); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_avg_speed'); + ++------------------------------------+ +| ADMIN FLUSH_FLOW('calc_avg_speed') | ++------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------+ + +SELECT + avg_speed, + start_window +FROM + avg_speed; + ++--------------------+---------------------+ +| avg_speed | start_window | ++--------------------+---------------------+ +| 1.5 | 2021-07-01T00:00:00 | +| 3.3499999046325684 | 2021-07-01T00:00:05 | ++--------------------+---------------------+ + +DROP FLOW calc_avg_speed; + +Affected Rows: 0 + +DROP TABLE velocity; + +Affected Rows: 0 + +DROP TABLE avg_speed; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_blog.sql b/tests/cases/standalone/common/flow/flow_blog.sql new file mode 100644 index 0000000000..f40614bd9a --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_blog.sql @@ -0,0 +1,64 @@ +-- blog usecase +CREATE TABLE velocity ( + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + left_wheel FLOAT, + right_wheel FLOAT, + TIME INDEX(ts) +); + +CREATE TABLE avg_speed ( + avg_speed DOUBLE, + start_window TIMESTAMP TIME INDEX, + end_window TIMESTAMP, + update_at TIMESTAMP, +); + +CREATE FLOW calc_avg_speed SINK TO avg_speed AS +SELECT + avg((left_wheel + right_wheel) / 2) +FROM + velocity +WHERE + left_wheel > 0.5 + AND right_wheel > 0.5 + AND left_wheel < 60 + AND right_wheel < 60 +GROUP BY + tumble(ts, '5 second'); + +INSERT INTO + velocity +VALUES + ("2021-07-01 00:00:00.200", 0.0, 0.7), + ("2021-07-01 00:00:00.200", 0.0, 61.0), + ("2021-07-01 00:00:02.500", 2.0, 1.0,); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_avg_speed'); + +SELECT + avg_speed, + start_window +FROM + avg_speed; + +INSERT INTO + velocity +VALUES + ("2021-07-01 00:00:05.100", 5.0, 4.0), + ("2021-07-01 00:00:09.600", 2.3, 2.1); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_avg_speed'); + +SELECT + avg_speed, + start_window +FROM + avg_speed; + +DROP FLOW calc_avg_speed; + +DROP TABLE velocity; + +DROP TABLE avg_speed; diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.result b/tests/cases/standalone/common/flow/flow_call_df_func.result index 0a8f4218bd..a2b300796b 100644 --- a/tests/cases/standalone/common/flow/flow_call_df_func.result +++ b/tests/cases/standalone/common/flow/flow_call_df_func.result @@ -15,12 +15,13 @@ SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second Affected Rows: 0 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 0 | +| FLOW_FLUSHED | +------------------------------------------+ INSERT INTO numbers_input_df_func @@ -31,12 +32,13 @@ VALUES Affected Rows: 2 -- flush flow to make sure that table is created and data is inserted -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +------------------------------------------+ -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion @@ -48,12 +50,13 @@ SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM o | 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +----------------------------------------+---------------------+---------------------+ -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 0 | +| FLOW_FLUSHED | +------------------------------------------+ INSERT INTO numbers_input_df_func @@ -63,12 +66,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +------------------------------------------+ -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion @@ -110,12 +114,13 @@ SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second Affected Rows: 0 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 0 | +| FLOW_FLUSHED | +------------------------------------------+ INSERT INTO numbers_input_df_func @@ -126,12 +131,13 @@ VALUES Affected Rows: 2 -- flush flow to make sure that table is created and data is inserted -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +------------------------------------------+ SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; @@ -142,12 +148,13 @@ SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM o | 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +----------------------------------------+---------------------+---------------------+ -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 0 | +| FLOW_FLUSHED | +------------------------------------------+ INSERT INTO numbers_input_df_func @@ -157,12 +164,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +------------------------------------------+ SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; @@ -203,12 +211,13 @@ SELECT max(number) - min(number) as maxmin, date_bin(INTERVAL '1 second', ts, '2 Affected Rows: 0 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 0 | +| FLOW_FLUSHED | +------------------------------------------+ INSERT INTO numbers_input_df_func @@ -218,12 +227,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +------------------------------------------+ SELECT maxmin, time_window FROM out_num_cnt_df_func; @@ -234,12 +244,13 @@ SELECT maxmin, time_window FROM out_num_cnt_df_func; | 2 | 2021-07-01T00:00:00 | +--------+---------------------+ -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 0 | +| FLOW_FLUSHED | +------------------------------------------+ INSERT INTO numbers_input_df_func @@ -249,12 +260,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +------------------------------------------+ SELECT maxmin, time_window FROM out_num_cnt_df_func; @@ -295,12 +307,13 @@ SELECT date_trunc('second', ts) as time_window, sum(number) as sum_num FROM numb Affected Rows: 0 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 0 | +| FLOW_FLUSHED | +------------------------------------------+ INSERT INTO numbers_input_df_func @@ -310,12 +323,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +------------------------------------------+ SELECT time_window, sum_num FROM out_num_cnt; @@ -326,12 +340,13 @@ SELECT time_window, sum_num FROM out_num_cnt; | 2021-07-01T00:00:00 | 42 | +---------------------+---------+ -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 0 | +| FLOW_FLUSHED | +------------------------------------------+ INSERT INTO numbers_input_df_func @@ -341,12 +356,13 @@ VALUES Affected Rows: 2 -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); +------------------------------------------+ -| ADMIN flush_flow('test_numbers_df_func') | +| ADMIN FLUSH_FLOW('test_numbers_df_func') | +------------------------------------------+ -| 1 | +| FLOW_FLUSHED | +------------------------------------------+ SELECT time_window, sum_num FROM out_num_cnt; diff --git a/tests/cases/standalone/common/flow/flow_call_df_func.sql b/tests/cases/standalone/common/flow/flow_call_df_func.sql index 389a0975c6..a6e0030d05 100644 --- a/tests/cases/standalone/common/flow/flow_call_df_func.sql +++ b/tests/cases/standalone/common/flow/flow_call_df_func.sql @@ -11,7 +11,8 @@ SINK TO out_num_cnt_df_func AS SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); INSERT INTO numbers_input_df_func VALUES @@ -19,19 +20,22 @@ VALUES (22, "2021-07-01 00:00:00.600"); -- flush flow to make sure that table is created and data is inserted -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (-24,"2021-07-01 00:00:01.500"); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); -- note that this quote-unquote column is a column-name, **not** a aggregation expr, generated by datafusion SELECT "SUM(abs(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; @@ -53,7 +57,8 @@ SINK TO out_num_cnt_df_func AS SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); INSERT INTO numbers_input_df_func VALUES @@ -61,18 +66,21 @@ VALUES (22, "2021-07-01 00:00:00.600"); -- flush flow to make sure that table is created and data is inserted -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (-24,"2021-07-01 00:00:01.500"); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); SELECT "abs(SUM(numbers_input_df_func.number))", window_start, window_end FROM out_num_cnt_df_func; @@ -93,25 +101,29 @@ SINK TO out_num_cnt_df_func AS SELECT max(number) - min(number) as maxmin, date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::Timestamp) as time_window FROM numbers_input_df_func GROUP BY time_window; -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); INSERT INTO numbers_input_df_func VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); SELECT maxmin, time_window FROM out_num_cnt_df_func; -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (24,"2021-07-01 00:00:01.500"); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); SELECT maxmin, time_window FROM out_num_cnt_df_func; @@ -133,25 +145,29 @@ SINK TO out_num_cnt AS SELECT date_trunc('second', ts) as time_window, sum(number) as sum_num FROM numbers_input_df_func GROUP BY date_trunc('second', ts); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); INSERT INTO numbers_input_df_func VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); SELECT time_window, sum_num FROM out_num_cnt; -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (24,"2021-07-01 00:00:01.500"); -admin flush_flow('test_numbers_df_func'); +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_numbers_df_func'); SELECT time_window, sum_num FROM out_num_cnt; diff --git a/tests/cases/standalone/common/flow/flow_user_guide.result b/tests/cases/standalone/common/flow/flow_user_guide.result new file mode 100644 index 0000000000..e0d7575dff --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_user_guide.result @@ -0,0 +1,587 @@ +-- user guide example +CREATE TABLE `ngx_access_log` ( + `client` STRING NULL, + `ua_platform` STRING NULL, + `referer` STRING NULL, + `method` STRING NULL, + `endpoint` STRING NULL, + `trace_id` STRING NULL FULLTEXT, + `protocol` STRING NULL, + `status` SMALLINT UNSIGNED NULL, + `size` DOUBLE NULL, + `agent` STRING NULL, + `access_time` TIMESTAMP(3) NOT NULL, + TIME INDEX (`access_time`) +) WITH(append_mode = 'true'); + +Affected Rows: 0 + +CREATE TABLE `ngx_statistics` ( + `status` SMALLINT UNSIGNED NULL, + `total_logs` BIGINT NULL, + `min_size` DOUBLE NULL, + `max_size` DOUBLE NULL, + `avg_size` DOUBLE NULL, + `high_size_count` BIGINT NULL, + `time_window` TIMESTAMP time index, + `update_at` TIMESTAMP NULL, + PRIMARY KEY (`status`) +); + +Affected Rows: 0 + +CREATE FLOW ngx_aggregation SINK TO ngx_statistics COMMENT 'Aggregate statistics for ngx_access_log' AS +SELECT + status, + count(client) AS total_logs, + min(size) as min_size, + max(size) as max_size, + avg(size) as avg_size, + sum( + case + when `size` > 550 then 1 + else 0 + end + ) as high_size_count, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + status, + time_window; + +Affected Rows: 0 + +INSERT INTO + ngx_access_log +VALUES + ( + "android", + "Android", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 200, + 1000, + "agent", + "2021-07-01 00:00:01.000" + ), + ( + "ios", + "iOS", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 200, + 500, + "agent", + "2021-07-01 00:00:30.500" + ), + ( + "android", + "Android", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 200, + 600, + "agent", + "2021-07-01 00:01:01.000" + ), + ( + "ios", + "iOS", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 404, + 700, + "agent", + "2021-07-01 00:01:01.500" + ); + +Affected Rows: 4 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('ngx_aggregation'); + ++-------------------------------------+ +| ADMIN FLUSH_FLOW('ngx_aggregation') | ++-------------------------------------+ +| FLOW_FLUSHED | ++-------------------------------------+ + +SELECT + status, + total_logs, + min_size, + max_size, + avg_size, + high_size_count, + time_window +FROM + ngx_statistics; + ++--------+------------+----------+----------+----------+-----------------+---------------------+ +| status | total_logs | min_size | max_size | avg_size | high_size_count | time_window | ++--------+------------+----------+----------+----------+-----------------+---------------------+ +| 200 | 2 | 500.0 | 1000.0 | 750.0 | 1 | 2021-07-01T00:00:00 | +| 200 | 1 | 600.0 | 600.0 | 600.0 | 1 | 2021-07-01T00:01:00 | +| 404 | 1 | 700.0 | 700.0 | 700.0 | 1 | 2021-07-01T00:01:00 | ++--------+------------+----------+----------+----------+-----------------+---------------------+ + +INSERT INTO + ngx_access_log +VALUES + ( + "android", + "Android", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 200, + 500, + "agent", + "2021-07-01 00:01:01.000" + ), + ( + "ios", + "iOS", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 404, + 800, + "agent", + "2021-07-01 00:01:01.500" + ); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('ngx_aggregation'); + ++-------------------------------------+ +| ADMIN FLUSH_FLOW('ngx_aggregation') | ++-------------------------------------+ +| FLOW_FLUSHED | ++-------------------------------------+ + +SELECT + status, + total_logs, + min_size, + max_size, + avg_size, + high_size_count, + time_window +FROM + ngx_statistics; + ++--------+------------+----------+----------+----------+-----------------+---------------------+ +| status | total_logs | min_size | max_size | avg_size | high_size_count | time_window | ++--------+------------+----------+----------+----------+-----------------+---------------------+ +| 200 | 2 | 500.0 | 1000.0 | 750.0 | 1 | 2021-07-01T00:00:00 | +| 200 | 2 | 500.0 | 600.0 | 550.0 | 1 | 2021-07-01T00:01:00 | +| 404 | 2 | 700.0 | 800.0 | 750.0 | 2 | 2021-07-01T00:01:00 | ++--------+------------+----------+----------+----------+-----------------+---------------------+ + +DROP FLOW ngx_aggregation; + +Affected Rows: 0 + +DROP TABLE ngx_statistics; + +Affected Rows: 0 + +DROP TABLE ngx_access_log; + +Affected Rows: 0 + +/* Usecase example */ +/* Real-time analytics example */ +/* input table */ +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + update_at TIMESTAMP, + __ts_placeholder TIMESTAMP TIME INDEX, + PRIMARY KEY(country) +); + +Affected Rows: 0 + +/* create flow task to calculate the distinct country */ +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + DISTINCT country, +FROM + ngx_access_log; + +Affected Rows: 0 + +/* insert some data */ +INSERT INTO + ngx_access_log +VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); + +Affected Rows: 10 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT + country +FROM + ngx_country; + ++---------+ +| country | ++---------+ +| CN | +| JP | +| KR | +| UK | +| US | ++---------+ + +DROP FLOW calc_ngx_country; + +Affected Rows: 0 + +DROP TABLE ngx_access_log; + +Affected Rows: 0 + +DROP TABLE ngx_country; + +Affected Rows: 0 + +/* Real-time analytics example: Time Window */ +/* input table */ +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +/* input table create same as above */ +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, + PRIMARY KEY(country) +); + +Affected Rows: 0 + +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + DISTINCT country, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + country, + time_window; + +Affected Rows: 0 + +/* insert data using the same data as above */ +/* insert some data */ +INSERT INTO + ngx_access_log +VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); + +Affected Rows: 10 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_country') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT + country, + time_window +FROM + ngx_country; + ++---------+---------------------+ +| country | time_window | ++---------+---------------------+ +| CN | 2022-01-01T00:00:00 | +| JP | 2022-01-01T00:00:00 | +| KR | 2022-01-01T00:00:00 | +| UK | 2022-01-01T00:00:00 | +| US | 2022-01-01T00:00:00 | ++---------+---------------------+ + +DROP FLOW calc_ngx_country; + +Affected Rows: 0 + +DROP TABLE ngx_access_log; + +Affected Rows: 0 + +DROP TABLE ngx_country; + +Affected Rows: 0 + +/* create input table */ +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +/* Real-time monitoring example */ +/* create sink table */ +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + update_at TIMESTAMP TIME INDEX, + PRIMARY KEY(sensor_id, loc) +); + +Affected Rows: 0 + +CREATE FLOW temp_monitoring SINK TO temp_alerts AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM + temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING + max_temp > 100; + +Affected Rows: 0 + +INSERT INTO + temp_sensor_data +VALUES + (1, "room1", 98.5, "2022-01-01 00:00:00"), + (2, "room2", 99.5, "2022-01-01 00:00:01"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('temp_monitoring'); + ++-------------------------------------+ +| ADMIN FLUSH_FLOW('temp_monitoring') | ++-------------------------------------+ +| FLOW_FLUSHED | ++-------------------------------------+ + +SELECT + sensor_id, + loc, + max_temp +FROM + temp_alerts; + +++ +++ + +INSERT INTO + temp_sensor_data +VALUES + (1, "room1", 101.5, "2022-01-01 00:00:02"), + (2, "room2", 102.5, "2022-01-01 00:00:03"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('temp_monitoring'); + ++-------------------------------------+ +| ADMIN FLUSH_FLOW('temp_monitoring') | ++-------------------------------------+ +| FLOW_FLUSHED | ++-------------------------------------+ + +/* wait at least one second for flow to update results to sink table */ +SELECT + sensor_id, + loc, + max_temp +FROM + temp_alerts; + ++-----------+-------+----------+ +| sensor_id | loc | max_temp | ++-----------+-------+----------+ +| 1 | room1 | 101.5 | +| 2 | room2 | 102.5 | ++-----------+-------+----------+ + +DROP FLOW temp_monitoring; + +Affected Rows: 0 + +DROP TABLE temp_sensor_data; + +Affected Rows: 0 + +DROP TABLE temp_alerts; + +Affected Rows: 0 + +/* Real-time dashboard */ +/* create input table */ +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +/* create sink table */ +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, + /* auto generated column to store the last update time */ + PRIMARY KEY(stat, bucket_size) +); + +Affected Rows: 0 + +/* create flow task to calculate the distribution of packet sizes for each status code */ +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1) :: INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; + +Affected Rows: 0 + +INSERT INTO + ngx_access_log +VALUES + ("cli1", 200, 100, "2022-01-01 00:00:00"), + ("cli2", 200, 104, "2022-01-01 00:00:01"), + ("cli3", 200, 120, "2022-01-01 00:00:02"), + ("cli4", 200, 124, "2022-01-01 00:00:03"), + ("cli5", 200, 140, "2022-01-01 00:00:04"), + ("cli6", 404, 144, "2022-01-01 00:00:05"), + ("cli7", 404, 160, "2022-01-01 00:00:06"), + ("cli8", 404, 164, "2022-01-01 00:00:07"), + ("cli9", 404, 180, "2022-01-01 00:00:08"), + ("cli10", 404, 184, "2022-01-01 00:00:09"); + +Affected Rows: 10 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_distribution'); + ++-------------------------------------------+ +| ADMIN FLUSH_FLOW('calc_ngx_distribution') | ++-------------------------------------------+ +| FLOW_FLUSHED | ++-------------------------------------------+ + +SELECT + stat, + bucket_size, + total_logs, + time_window +FROM + ngx_distribution; + ++------+-------------+------------+---------------------+ +| stat | bucket_size | total_logs | time_window | ++------+-------------+------------+---------------------+ +| 200 | 100 | 2 | 2022-01-01T00:00:00 | +| 200 | 120 | 2 | 2022-01-01T00:00:00 | +| 200 | 140 | 1 | 2022-01-01T00:00:00 | +| 404 | 140 | 1 | 2022-01-01T00:00:00 | +| 404 | 160 | 2 | 2022-01-01T00:00:00 | +| 404 | 180 | 2 | 2022-01-01T00:00:00 | ++------+-------------+------------+---------------------+ + +DROP FLOW calc_ngx_distribution; + +Affected Rows: 0 + +DROP TABLE ngx_distribution; + +Affected Rows: 0 + +DROP TABLE ngx_access_log; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_user_guide.sql b/tests/cases/standalone/common/flow/flow_user_guide.sql new file mode 100644 index 0000000000..285d2198dc --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_user_guide.sql @@ -0,0 +1,411 @@ +-- user guide example +CREATE TABLE `ngx_access_log` ( + `client` STRING NULL, + `ua_platform` STRING NULL, + `referer` STRING NULL, + `method` STRING NULL, + `endpoint` STRING NULL, + `trace_id` STRING NULL FULLTEXT, + `protocol` STRING NULL, + `status` SMALLINT UNSIGNED NULL, + `size` DOUBLE NULL, + `agent` STRING NULL, + `access_time` TIMESTAMP(3) NOT NULL, + TIME INDEX (`access_time`) +) WITH(append_mode = 'true'); + +CREATE TABLE `ngx_statistics` ( + `status` SMALLINT UNSIGNED NULL, + `total_logs` BIGINT NULL, + `min_size` DOUBLE NULL, + `max_size` DOUBLE NULL, + `avg_size` DOUBLE NULL, + `high_size_count` BIGINT NULL, + `time_window` TIMESTAMP time index, + `update_at` TIMESTAMP NULL, + PRIMARY KEY (`status`) +); + +CREATE FLOW ngx_aggregation SINK TO ngx_statistics COMMENT 'Aggregate statistics for ngx_access_log' AS +SELECT + status, + count(client) AS total_logs, + min(size) as min_size, + max(size) as max_size, + avg(size) as avg_size, + sum( + case + when `size` > 550 then 1 + else 0 + end + ) as high_size_count, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + status, + time_window; + +INSERT INTO + ngx_access_log +VALUES + ( + "android", + "Android", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 200, + 1000, + "agent", + "2021-07-01 00:00:01.000" + ), + ( + "ios", + "iOS", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 200, + 500, + "agent", + "2021-07-01 00:00:30.500" + ), + ( + "android", + "Android", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 200, + 600, + "agent", + "2021-07-01 00:01:01.000" + ), + ( + "ios", + "iOS", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 404, + 700, + "agent", + "2021-07-01 00:01:01.500" + ); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('ngx_aggregation'); + +SELECT + status, + total_logs, + min_size, + max_size, + avg_size, + high_size_count, + time_window +FROM + ngx_statistics; + +INSERT INTO + ngx_access_log +VALUES + ( + "android", + "Android", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 200, + 500, + "agent", + "2021-07-01 00:01:01.000" + ), + ( + "ios", + "iOS", + "referer", + "GET", + "/api/v1", + "trace_id", + "HTTP", + 404, + 800, + "agent", + "2021-07-01 00:01:01.500" + ); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('ngx_aggregation'); + +SELECT + status, + total_logs, + min_size, + max_size, + avg_size, + high_size_count, + time_window +FROM + ngx_statistics; + +DROP FLOW ngx_aggregation; + +DROP TABLE ngx_statistics; + +DROP TABLE ngx_access_log; + +/* Usecase example */ +/* Real-time analytics example */ +/* input table */ +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + update_at TIMESTAMP, + __ts_placeholder TIMESTAMP TIME INDEX, + PRIMARY KEY(country) +); + +/* create flow task to calculate the distinct country */ +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + DISTINCT country, +FROM + ngx_access_log; + +/* insert some data */ +INSERT INTO + ngx_access_log +VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + country +FROM + ngx_country; + +DROP FLOW calc_ngx_country; + +DROP TABLE ngx_access_log; + +DROP TABLE ngx_country; + +/* Real-time analytics example: Time Window */ +/* input table */ +CREATE TABLE ngx_access_log ( + client STRING, + country STRING, + access_time TIMESTAMP TIME INDEX +); + +/* input table create same as above */ +/* sink table */ +CREATE TABLE ngx_country ( + country STRING, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, + PRIMARY KEY(country) +); + +CREATE FLOW calc_ngx_country SINK TO ngx_country AS +SELECT + DISTINCT country, + date_bin(INTERVAL '1 hour', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + country, + time_window; + +/* insert data using the same data as above */ +/* insert some data */ +INSERT INTO + ngx_access_log +VALUES + ("client1", "US", "2022-01-01 00:00:00"), + ("client2", "US", "2022-01-01 00:00:01"), + ("client3", "UK", "2022-01-01 00:00:02"), + ("client4", "UK", "2022-01-01 00:00:03"), + ("client5", "CN", "2022-01-01 00:00:04"), + ("client6", "CN", "2022-01-01 00:00:05"), + ("client7", "JP", "2022-01-01 00:00:06"), + ("client8", "JP", "2022-01-01 00:00:07"), + ("client9", "KR", "2022-01-01 00:00:08"), + ("client10", "KR", "2022-01-01 00:00:09"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_country'); + +SELECT + country, + time_window +FROM + ngx_country; + +DROP FLOW calc_ngx_country; + +DROP TABLE ngx_access_log; + +DROP TABLE ngx_country; + +/* create input table */ +CREATE TABLE temp_sensor_data ( + sensor_id INT, + loc STRING, + temperature DOUBLE, + ts TIMESTAMP TIME INDEX +); + +/* Real-time monitoring example */ +/* create sink table */ +CREATE TABLE temp_alerts ( + sensor_id INT, + loc STRING, + max_temp DOUBLE, + update_at TIMESTAMP TIME INDEX, + PRIMARY KEY(sensor_id, loc) +); + +CREATE FLOW temp_monitoring SINK TO temp_alerts AS +SELECT + sensor_id, + loc, + max(temperature) as max_temp, +FROM + temp_sensor_data +GROUP BY + sensor_id, + loc +HAVING + max_temp > 100; + +INSERT INTO + temp_sensor_data +VALUES + (1, "room1", 98.5, "2022-01-01 00:00:00"), + (2, "room2", 99.5, "2022-01-01 00:00:01"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('temp_monitoring'); + +SELECT + sensor_id, + loc, + max_temp +FROM + temp_alerts; + +INSERT INTO + temp_sensor_data +VALUES + (1, "room1", 101.5, "2022-01-01 00:00:02"), + (2, "room2", 102.5, "2022-01-01 00:00:03"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('temp_monitoring'); + +/* wait at least one second for flow to update results to sink table */ +SELECT + sensor_id, + loc, + max_temp +FROM + temp_alerts; + +DROP FLOW temp_monitoring; +DROP TABLE temp_sensor_data; +DROP TABLE temp_alerts; + +/* Real-time dashboard */ +/* create input table */ +CREATE TABLE ngx_access_log ( + client STRING, + stat INT, + size INT, + access_time TIMESTAMP TIME INDEX +); + +/* create sink table */ +CREATE TABLE ngx_distribution ( + stat INT, + bucket_size INT, + total_logs BIGINT, + time_window TIMESTAMP TIME INDEX, + update_at TIMESTAMP, + /* auto generated column to store the last update time */ + PRIMARY KEY(stat, bucket_size) +); + +/* create flow task to calculate the distribution of packet sizes for each status code */ +CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS +SELECT + stat, + trunc(size, -1) :: INT as bucket_size, + count(client) AS total_logs, + date_bin(INTERVAL '1 minutes', access_time) as time_window, +FROM + ngx_access_log +GROUP BY + stat, + time_window, + bucket_size; + +INSERT INTO + ngx_access_log +VALUES + ("cli1", 200, 100, "2022-01-01 00:00:00"), + ("cli2", 200, 104, "2022-01-01 00:00:01"), + ("cli3", 200, 120, "2022-01-01 00:00:02"), + ("cli4", 200, 124, "2022-01-01 00:00:03"), + ("cli5", 200, 140, "2022-01-01 00:00:04"), + ("cli6", 404, 144, "2022-01-01 00:00:05"), + ("cli7", 404, 160, "2022-01-01 00:00:06"), + ("cli8", 404, 164, "2022-01-01 00:00:07"), + ("cli9", 404, 180, "2022-01-01 00:00:08"), + ("cli10", 404, 184, "2022-01-01 00:00:09"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_ngx_distribution'); + +SELECT + stat, + bucket_size, + total_logs, + time_window +FROM + ngx_distribution; + +DROP FLOW calc_ngx_distribution; + +DROP TABLE ngx_distribution; + +DROP TABLE ngx_access_log; \ No newline at end of file