CREATE TABLE numbers_input_basic ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), TIME INDEX(ts) ); Affected Rows: 0 CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS SELECT sum(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window FROM numbers_input_basic GROUP BY time_window; Affected Rows: 0 SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ | Table | Create Table | +-------------------+--------------------------------------------------+ | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | | | "sum(numbers_input_basic.number)" BIGINT NULL, | | | "time_window" TIMESTAMP(9) NOT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("time_window") | | | ) | | | | | | ENGINE=mito | | | | +-------------------+--------------------------------------------------+ -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ | ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ | Table | Create Table | +-------------------+--------------------------------------------------+ | out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | | | "sum(numbers_input_basic.number)" BIGINT NULL, | | | "time_window" TIMESTAMP(9) NOT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("time_window") | | | ) | | | | | | ENGINE=mito | | | | +-------------------+--------------------------------------------------+ -- SQLNESS ARG restart=true SELECT 1; +----------+ | Int64(1) | +----------+ | 1 | 
+----------+ -- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ | ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ SELECT "sum(numbers_input_basic.number)", time_window FROM out_num_cnt_basic; +---------------------------------+---------------------+ | sum(numbers_input_basic.number) | time_window | +---------------------------------+---------------------+ | 42 | 2021-07-01T00:00:00 | +---------------------------------+---------------------+ -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ | ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ INSERT INTO numbers_input_basic VALUES (23, "2021-07-01 00:00:01.000"), (24, "2021-07-01 00:00:01.500"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ | ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ -- note that this quote-unquote column is a column-name, **not** an aggregation expr, generated by datafusion SELECT "sum(numbers_input_basic.number)", time_window FROM out_num_cnt_basic; +---------------------------------+---------------------+ | sum(numbers_input_basic.number) | time_window | +---------------------------------+---------------------+ | 42 | 2021-07-01T00:00:00 | | 47 | 2021-07-01T00:00:01 | 
+---------------------------------+---------------------+ DROP FLOW test_numbers_basic; Affected Rows: 0 DROP TABLE numbers_input_basic; Affected Rows: 0 DROP TABLE out_num_cnt_basic; Affected Rows: 0 -- test count(*) rewrite CREATE TABLE input_basic ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), TIME INDEX(ts) ); Affected Rows: 0 CREATE FLOW test_wildcard_basic SiNk TO out_basic AS SELECT COUNT(*) as wildcard FROM input_basic; Affected Rows: 0 SHOW CREATE TABLE out_basic; +-----------+---------------------------------------------+ | Table | Create Table | +-----------+---------------------------------------------+ | out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | | | "wildcard" BIGINT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("__ts_placeholder") | | | ) | | | | | | ENGINE=mito | | | | +-----------+---------------------------------------------+ DROP FLOW test_wildcard_basic; Affected Rows: 0 CREATE FLOW test_wildcard_basic sink TO out_basic AS SELECT COUNT(*) as wildcard FROM input_basic; Affected Rows: 0 SHOW CREATE TABLE out_basic; +-----------+---------------------------------------------+ | Table | Create Table | +-----------+---------------------------------------------+ | out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | | | "wildcard" BIGINT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("__ts_placeholder") | | | ) | | | | | | ENGINE=mito | | | | +-----------+---------------------------------------------+ -- SQLNESS ARG restart=true SELECT 1; +----------+ | Int64(1) | +----------+ | 1 | +----------+ -- SQLNESS SLEEP 3s INSERT INTO input_basic VALUES (23, "2021-07-01 00:00:01.000"), (24, "2021-07-01 00:00:01.500"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_wildcard_basic'); 
+-----------------------------------------+ | ADMIN FLUSH_FLOW('test_wildcard_basic') | +-----------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------+ SHOW CREATE TABLE out_basic; +-----------+---------------------------------------------+ | Table | Create Table | +-----------+---------------------------------------------+ | out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | | | "wildcard" BIGINT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("__ts_placeholder") | | | ) | | | | | | ENGINE=mito | | | | +-----------+---------------------------------------------+ SELECT wildcard FROM out_basic; +----------+ | wildcard | +----------+ | 2 | +----------+ DROP FLOW test_wildcard_basic; Affected Rows: 0 DROP TABLE out_basic; Affected Rows: 0 DROP TABLE input_basic; Affected Rows: 0 -- test distinct CREATE TABLE distinct_basic ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), TIME INDEX(ts) ); Affected Rows: 0 CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS SELECT DISTINCT number as dis FROM distinct_basic; Affected Rows: 0 SHOW CREATE TABLE out_distinct_basic; +--------------------+---------------------------------------------------+ | Table | Create Table | +--------------------+---------------------------------------------------+ | out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | | | "dis" INT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("__ts_placeholder"), | | | PRIMARY KEY ("dis") | | | ) | | | | | | ENGINE=mito | | | | +--------------------+---------------------------------------------------+ -- TODO(discord9): confirm if it's necessary to flush flow here? 
-- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ | ADMIN FLUSH_FLOW('test_distinct_basic') | +-----------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------+ -- SQLNESS ARG restart=true SELECT 1; +----------+ | Int64(1) | +----------+ | 1 | +----------+ -- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES (20, "2021-07-01 00:00:00.200"), (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); Affected Rows: 3 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ | ADMIN FLUSH_FLOW('test_distinct_basic') | +-----------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------+ SHOW CREATE TABLE out_distinct_basic; +--------------------+---------------------------------------------------+ | Table | Create Table | +--------------------+---------------------------------------------------+ | out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | | | "dis" INT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("__ts_placeholder"), | | | PRIMARY KEY ("dis") | | | ) | | | | | | ENGINE=mito | | | | +--------------------+---------------------------------------------------+ SELECT dis FROM out_distinct_basic; +-----+ | dis | +-----+ | 20 | | 22 | +-----+ -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ | ADMIN FLUSH_FLOW('test_distinct_basic') | +-----------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------+ INSERT INTO distinct_basic VALUES (23, 
"2021-07-01 00:00:01.000"), (24, "2021-07-01 00:00:01.500"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ | ADMIN FLUSH_FLOW('test_distinct_basic') | +-----------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------+ -- note that `dis` is the alias of the DISTINCT column in the flow query; every distinct number inserted so far should appear exactly once SELECT dis FROM out_distinct_basic; +-----+ | dis | +-----+ | 20 | | 22 | | 23 | | 24 | +-----+ DROP FLOW test_distinct_basic; Affected Rows: 0 DROP TABLE distinct_basic; Affected Rows: 0 DROP TABLE out_distinct_basic; Affected Rows: 0 CREATE TABLE bytes_log ( byte INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time TIME INDEX(ts) ); Affected Rows: 0 -- TODO(discord9): remove this after auto-inferring the table's time index is implemented CREATE TABLE approx_rate ( rate DOUBLE, time_window TIMESTAMP, update_at TIMESTAMP, TIME INDEX(time_window) ); Affected Rows: 0 CREATE FLOW find_approx_rate SINK TO approx_rate AS SELECT (max(byte) - min(byte)) / 30.0 as rate, date_bin(INTERVAL '30 second', ts) as time_window from bytes_log GROUP BY time_window; Affected Rows: 0 SHOW CREATE TABLE approx_rate; +-------------+--------------------------------------------+ | Table | Create Table | +-------------+--------------------------------------------+ | approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( | | | "rate" DOUBLE NULL, | | | "time_window" TIMESTAMP(3) NOT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("time_window") | | | ) | | | | | | ENGINE=mito | | | | +-------------+--------------------------------------------+ INSERT INTO bytes_log VALUES (NULL, '2023-01-01 00:00:01'), (300, '2023-01-01 00:00:29'); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN 
FLUSH_FLOW('find_approx_rate'); +--------------------------------------+ | ADMIN FLUSH_FLOW('find_approx_rate') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SELECT rate, time_window FROM approx_rate ORDER BY time_window ASC; +------+---------------------+ | rate | time_window | +------+---------------------+ | 0.0 | 2023-01-01T00:00:00 | +------+---------------------+ INSERT INTO bytes_log VALUES (NULL, '2022-01-01 00:00:01'), (NULL, '2022-01-01 00:00:29'); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('find_approx_rate'); +--------------------------------------+ | ADMIN FLUSH_FLOW('find_approx_rate') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SELECT rate, time_window FROM approx_rate ORDER BY time_window ASC; +------+---------------------+ | rate | time_window | +------+---------------------+ | | 2022-01-01T00:00:00 | | 0.0 | 2023-01-01T00:00:00 | +------+---------------------+ INSERT INTO bytes_log VALUES (101, '2025-01-01 00:00:01'), (300, '2025-01-01 00:00:29'); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('find_approx_rate'); +--------------------------------------+ | ADMIN FLUSH_FLOW('find_approx_rate') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SELECT rate, time_window FROM approx_rate ORDER BY time_window ASC; +-------------------+---------------------+ | rate | time_window | +-------------------+---------------------+ | | 2022-01-01T00:00:00 | | 0.0 | 2023-01-01T00:00:00 | | 6.633333333333334 | 2025-01-01T00:00:00 | +-------------------+---------------------+ INSERT INTO bytes_log VALUES (450, '2025-01-01 00:00:32'), (500, '2025-01-01 00:00:37'); Affected Rows: 2 -- SQLNESS REPLACE 
(ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('find_approx_rate'); +--------------------------------------+ | ADMIN FLUSH_FLOW('find_approx_rate') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SELECT rate, time_window FROM approx_rate ORDER BY time_window ASC; +--------------------+---------------------+ | rate | time_window | +--------------------+---------------------+ | | 2022-01-01T00:00:00 | | 0.0 | 2023-01-01T00:00:00 | | 6.633333333333334 | 2025-01-01T00:00:00 | | 1.6666666666666667 | 2025-01-01T00:00:30 | +--------------------+---------------------+ DROP TABLE bytes_log; Affected Rows: 0 DROP FLOW find_approx_rate; Affected Rows: 0 DROP TABLE approx_rate; Affected Rows: 0 -- input table CREATE TABLE ngx_access_log ( client STRING, country STRING, access_time TIMESTAMP TIME INDEX ); Affected Rows: 0 -- create flow task to calculate the distinct country CREATE FLOW calc_ngx_country SINK TO ngx_country AS SELECT DISTINCT country, FROM ngx_access_log; Affected Rows: 0 SHOW CREATE TABLE ngx_country; +-------------+---------------------------------------------+ | Table | Create Table | +-------------+---------------------------------------------+ | ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | | | "country" STRING NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("__ts_placeholder"), | | | PRIMARY KEY ("country") | | | ) | | | | | | ENGINE=mito | | | | +-------------+---------------------------------------------+ INSERT INTO ngx_access_log VALUES ("cli1", "b", 0); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ 
SHOW CREATE TABLE ngx_country; +-------------+---------------------------------------------+ | Table | Create Table | +-------------+---------------------------------------------+ | ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | | | "country" STRING NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("__ts_placeholder"), | | | PRIMARY KEY ("country") | | | ) | | | | | | ENGINE=mito | | | | +-------------+---------------------------------------------+ SELECT country FROM ngx_country; +---------+ | country | +---------+ | b | +---------+ -- making sure distinct is working INSERT INTO ngx_access_log VALUES ("cli1", "b", 1); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SELECT country FROM ngx_country; +---------+ | country | +---------+ | b | +---------+ INSERT INTO ngx_access_log VALUES ("cli1", "c", 2); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SELECT country FROM ngx_country; +---------+ | country | +---------+ | b | | c | +---------+ DROP FLOW calc_ngx_country; Affected Rows: 0 DROP TABLE ngx_access_log; Affected Rows: 0 DROP TABLE ngx_country; Affected Rows: 0 CREATE TABLE ngx_access_log ( client STRING, country STRING, access_time TIMESTAMP TIME INDEX ); Affected Rows: 0 CREATE FLOW calc_ngx_country SINK TO ngx_country AS SELECT DISTINCT country, -- this distinct is not necessary, but it's a good test to see if it works 
date_bin(INTERVAL '1 hour', access_time) as time_window, FROM ngx_access_log GROUP BY country, time_window; Affected Rows: 0 SHOW CREATE TABLE ngx_country; +-------------+--------------------------------------------+ | Table | Create Table | +-------------+--------------------------------------------+ | ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | | | "country" STRING NULL, | | | "time_window" TIMESTAMP(3) NOT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("time_window"), | | | PRIMARY KEY ("country") | | | ) | | | | | | ENGINE=mito | | | | +-------------+--------------------------------------------+ INSERT INTO ngx_access_log VALUES ("cli1", "b", 0); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SHOW CREATE TABLE ngx_country; +-------------+--------------------------------------------+ | Table | Create Table | +-------------+--------------------------------------------+ | ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | | | "country" STRING NULL, | | | "time_window" TIMESTAMP(3) NOT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("time_window"), | | | PRIMARY KEY ("country") | | | ) | | | | | | ENGINE=mito | | | | +-------------+--------------------------------------------+ SELECT country, time_window FROM ngx_country; +---------+---------------------+ | country | time_window | +---------+---------------------+ | b | 1970-01-01T00:00:00 | +---------+---------------------+ -- making sure distinct is working INSERT INTO ngx_access_log VALUES ("cli1", "b", 1); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); 
+--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SELECT country, time_window FROM ngx_country; +---------+---------------------+ | country | time_window | +---------+---------------------+ | b | 1970-01-01T00:00:00 | +---------+---------------------+ INSERT INTO ngx_access_log VALUES ("cli1", "c", 2); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_country'); +--------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_country') | +--------------------------------------+ | FLOW_FLUSHED | +--------------------------------------+ SELECT country, time_window FROM ngx_country; +---------+---------------------+ | country | time_window | +---------+---------------------+ | b | 1970-01-01T00:00:00 | | c | 1970-01-01T00:00:00 | +---------+---------------------+ DROP FLOW calc_ngx_country; Affected Rows: 0 DROP TABLE ngx_access_log; Affected Rows: 0 DROP TABLE ngx_country; Affected Rows: 0 CREATE TABLE temp_sensor_data ( sensor_id INT, loc STRING, temperature DOUBLE, ts TIMESTAMP TIME INDEX )WITH( append_mode = 'true' ); Affected Rows: 0 CREATE TABLE temp_alerts ( sensor_id INT, loc STRING, max_temp DOUBLE, event_ts TIMESTAMP TIME INDEX, update_at TIMESTAMP ); Affected Rows: 0 CREATE FLOW temp_monitoring SINK TO temp_alerts AS SELECT sensor_id, loc, max(temperature) as max_temp, max(ts) as event_ts FROM temp_sensor_data GROUP BY sensor_id, loc HAVING max_temp > 100; Affected Rows: 0 SHOW CREATE TABLE temp_alerts; +-------------+--------------------------------------------+ | Table | Create Table | +-------------+--------------------------------------------+ | temp_alerts | CREATE TABLE IF NOT EXISTS "temp_alerts" ( | | | "sensor_id" INT NULL, | | | "loc" STRING NULL, | | | "max_temp" DOUBLE NULL, | | | "event_ts" TIMESTAMP(3) NOT NULL, | | | 
"update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("event_ts") | | | ) | | | | | | ENGINE=mito | | | | +-------------+--------------------------------------------+ INSERT INTO temp_sensor_data VALUES (1, "room1", 50, 0); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); +-------------------------------------+ | ADMIN FLUSH_FLOW('temp_monitoring') | +-------------------------------------+ | FLOW_FLUSHED | +-------------------------------------+ -- The sink table already exists (it was created explicitly above), even though no row has exceeded the HAVING threshold yet SHOW TABLES LIKE 'temp_alerts'; +------------------+ | Tables_in_public | +------------------+ | temp_alerts | +------------------+ INSERT INTO temp_sensor_data VALUES (1, "room1", 150, 1); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); +-------------------------------------+ | ADMIN FLUSH_FLOW('temp_monitoring') | +-------------------------------------+ | FLOW_FLUSHED | +-------------------------------------+ SHOW TABLES LIKE 'temp_alerts'; +------------------+ | Tables_in_public | +------------------+ | temp_alerts | +------------------+ SELECT sensor_id, loc, max_temp, event_ts FROM temp_alerts; +-----------+-------+----------+-------------------------+ | sensor_id | loc | max_temp | event_ts | +-----------+-------+----------+-------------------------+ | 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 | +-----------+-------+----------+-------------------------+ INSERT INTO temp_sensor_data VALUES (2, "room1", 0, 2); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('temp_monitoring'); +-------------------------------------+ | ADMIN FLUSH_FLOW('temp_monitoring') | +-------------------------------------+ | FLOW_FLUSHED | +-------------------------------------+ SELECT sensor_id, loc, max_temp, event_ts FROM 
temp_alerts; +-----------+-------+----------+-------------------------+ | sensor_id | loc | max_temp | event_ts | +-----------+-------+----------+-------------------------+ | 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 | +-----------+-------+----------+-------------------------+ DROP FLOW temp_monitoring; Affected Rows: 0 DROP TABLE temp_sensor_data; Affected Rows: 0 DROP TABLE temp_alerts; Affected Rows: 0 CREATE TABLE ngx_access_log ( client STRING, stat INT, size INT, access_time TIMESTAMP TIME INDEX )WITH( append_mode = 'true' ); Affected Rows: 0 CREATE TABLE ngx_distribution ( stat INT, bucket_size INT, total_logs BIGINT, time_window TIMESTAMP TIME INDEX, -- auto generated column by flow engine update_at TIMESTAMP, PRIMARY KEY(stat, bucket_size) ); Affected Rows: 0 CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS SELECT stat, trunc(size, -1) :: INT as bucket_size, count(client) AS total_logs, date_bin(INTERVAL '1 minutes', access_time) as time_window, FROM ngx_access_log GROUP BY stat, time_window, bucket_size; Affected Rows: 0 SHOW CREATE TABLE ngx_distribution; +------------------+-------------------------------------------------+ | Table | Create Table | +------------------+-------------------------------------------------+ | ngx_distribution | CREATE TABLE IF NOT EXISTS "ngx_distribution" ( | | | "stat" INT NULL, | | | "bucket_size" INT NULL, | | | "total_logs" BIGINT NULL, | | | "time_window" TIMESTAMP(3) NOT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("time_window"), | | | PRIMARY KEY ("stat", "bucket_size") | | | ) | | | | | | ENGINE=mito | | | | +------------------+-------------------------------------------------+ INSERT INTO ngx_access_log VALUES ("cli1", 200, 100, 0); Affected Rows: 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_distribution'); +-------------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_distribution') | 
+-------------------------------------------+ | FLOW_FLUSHED | +-------------------------------------------+ SELECT stat, bucket_size, total_logs, time_window FROM ngx_distribution; +------+-------------+------------+---------------------+ | stat | bucket_size | total_logs | time_window | +------+-------------+------------+---------------------+ | 200 | 100 | 1 | 1970-01-01T00:00:00 | +------+-------------+------------+---------------------+ INSERT INTO ngx_access_log VALUES ("cli1", 200, 200, 1), ("cli1", 200, 205, 1), ("cli1", 200, 209, 1), ("cli1", 200, 210, 1), ("cli2", 200, 300, 1); Affected Rows: 5 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_ngx_distribution'); +-------------------------------------------+ | ADMIN FLUSH_FLOW('calc_ngx_distribution') | +-------------------------------------------+ | FLOW_FLUSHED | +-------------------------------------------+ SELECT stat, bucket_size, total_logs, time_window FROM ngx_distribution; +------+-------------+------------+---------------------+ | stat | bucket_size | total_logs | time_window | +------+-------------+------------+---------------------+ | 200 | 100 | 1 | 1970-01-01T00:00:00 | | 200 | 200 | 1 | 1970-01-01T00:00:00 | | 200 | 210 | 3 | 1970-01-01T00:00:00 | | 200 | 300 | 1 | 1970-01-01T00:00:00 | +------+-------------+------------+---------------------+ DROP FLOW calc_ngx_distribution; Affected Rows: 0 DROP TABLE ngx_access_log; Affected Rows: 0 DROP TABLE ngx_distribution; Affected Rows: 0 CREATE TABLE requests ( service_name STRING, service_ip STRING, val INT, ts TIMESTAMP TIME INDEX )WITH( append_mode = 'true' ); Affected Rows: 0 CREATE TABLE requests_without_ip ( service_name STRING, val INT, ts TIMESTAMP TIME INDEX, PRIMARY KEY(service_name) ); Affected Rows: 0 CREATE FLOW requests_long_term SINK TO requests_without_ip AS SELECT service_name, val, ts FROM requests; Affected Rows: 0 SHOW CREATE TABLE requests_without_ip; 
+---------------------+----------------------------------------------------+ | Table | Create Table | +---------------------+----------------------------------------------------+ | requests_without_ip | CREATE TABLE IF NOT EXISTS "requests_without_ip" ( | | | "service_name" STRING NULL, | | | "val" INT NULL, | | | "ts" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("ts"), | | | PRIMARY KEY ("service_name") | | | ) | | | | | | ENGINE=mito | | | | +---------------------+----------------------------------------------------+ INSERT INTO requests VALUES (NULL, "10.0.0.1", 100, "2024-10-18 19:00:00"), ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"), (NULL, "10.0.0.1", 200, "2024-10-18 19:00:30"), ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"), (NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"), (NULL, "10.0.0.2", 100, "2024-10-18 19:01:01"), ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"), ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31"); Affected Rows: 8 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('requests_long_term'); +----------------------------------------+ | ADMIN FLUSH_FLOW('requests_long_term') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ SELECT * FROM requests_without_ip ORDER BY ts ASC; +--------------+-----+---------------------+ | service_name | val | ts | +--------------+-----+---------------------+ | | 100 | 2024-10-18T19:00:00 | | svc1 | 100 | 2024-10-18T19:00:00 | | | 200 | 2024-10-18T19:00:30 | | svc1 | 200 | 2024-10-18T19:00:30 | | | 300 | 2024-10-18T19:01:00 | | | 100 | 2024-10-18T19:01:01 | | svc1 | 400 | 2024-10-18T19:01:30 | | svc1 | 200 | 2024-10-18T19:01:31 | +--------------+-----+---------------------+ -- Test if the FLOWS table works; we don't care about the exact result since it varies between runs SELECT count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows, FROM INFORMATION_SCHEMA.FLOWS; +--------------+ | active_flows | 
+--------------+ | 1 | +--------------+ INSERT INTO requests VALUES (null, "10.0.0.1", 100, "2024-10-19 19:00:00"), (null, "10.0.0.2", 100, "2024-10-19 19:00:00"), (null, "10.0.0.1", 200, "2024-10-19 19:00:30"), (null, "10.0.0.2", 200, "2024-10-19 19:00:30"), (null, "10.0.0.1", 300, "2024-10-19 19:01:00"), (null, "10.0.0.2", 100, "2024-10-19 19:01:01"), (null, "10.0.0.1", 400, "2024-10-19 19:01:30"), (null, "10.0.0.2", 200, "2024-10-19 19:01:31"); Affected Rows: 8 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('requests_long_term'); +----------------------------------------+ | ADMIN FLUSH_FLOW('requests_long_term') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ SELECT * FROM requests_without_ip ORDER BY ts ASC; +--------------+-----+---------------------+ | service_name | val | ts | +--------------+-----+---------------------+ | | 100 | 2024-10-18T19:00:00 | | svc1 | 100 | 2024-10-18T19:00:00 | | | 200 | 2024-10-18T19:00:30 | | svc1 | 200 | 2024-10-18T19:00:30 | | | 300 | 2024-10-18T19:01:00 | | | 100 | 2024-10-18T19:01:01 | | svc1 | 400 | 2024-10-18T19:01:30 | | svc1 | 200 | 2024-10-18T19:01:31 | | | 100 | 2024-10-19T19:00:00 | | | 200 | 2024-10-19T19:00:30 | | | 300 | 2024-10-19T19:01:00 | | | 100 | 2024-10-19T19:01:01 | | | 400 | 2024-10-19T19:01:30 | | | 200 | 2024-10-19T19:01:31 | +--------------+-----+---------------------+ INSERT INTO requests VALUES ("svc2", "10.0.0.1", 100, "2024-10-18 19:00:00"), ("svc2", "10.0.0.2", 100, "2024-10-18 19:00:00"), ("svc2", "10.0.0.1", 200, "2024-10-18 19:00:30"), ("svc2", "10.0.0.2", 200, "2024-10-18 19:00:30"), ("svc2", "10.0.0.1", 300, "2024-10-18 19:01:00"), ("svc2", "10.0.0.2", 100, "2024-10-18 19:01:01"), ("svc2", "10.0.0.1", 400, "2024-10-18 19:01:30"), ("svc2", "10.0.0.2", 200, "2024-10-18 19:01:31"); Affected Rows: 8 -- SQLNESS REPLACE 
(ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('requests_long_term'); +----------------------------------------+ | ADMIN FLUSH_FLOW('requests_long_term') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ SELECT * FROM requests_without_ip ORDER BY ts ASC; +--------------+-----+---------------------+ | service_name | val | ts | +--------------+-----+---------------------+ | | 100 | 2024-10-18T19:00:00 | | svc1 | 100 | 2024-10-18T19:00:00 | | svc2 | 100 | 2024-10-18T19:00:00 | | | 200 | 2024-10-18T19:00:30 | | svc1 | 200 | 2024-10-18T19:00:30 | | svc2 | 200 | 2024-10-18T19:00:30 | | | 300 | 2024-10-18T19:01:00 | | svc2 | 300 | 2024-10-18T19:01:00 | | | 100 | 2024-10-18T19:01:01 | | svc2 | 100 | 2024-10-18T19:01:01 | | svc1 | 400 | 2024-10-18T19:01:30 | | svc2 | 400 | 2024-10-18T19:01:30 | | svc1 | 200 | 2024-10-18T19:01:31 | | svc2 | 200 | 2024-10-18T19:01:31 | | | 100 | 2024-10-19T19:00:00 | | | 200 | 2024-10-19T19:00:30 | | | 300 | 2024-10-19T19:01:00 | | | 100 | 2024-10-19T19:01:01 | | | 400 | 2024-10-19T19:01:30 | | | 200 | 2024-10-19T19:01:31 | +--------------+-----+---------------------+ DROP FLOW requests_long_term; Affected Rows: 0 DROP TABLE requests_without_ip; Affected Rows: 0 DROP TABLE requests; Affected Rows: 0 CREATE TABLE android_log ( `log` STRING, ts TIMESTAMP(9), TIME INDEX(ts) )WITH( append_mode = 'true' ); Affected Rows: 0 CREATE TABLE android_log_abnormal ( crash BIGINT NULL, fatal BIGINT NULL, backtrace BIGINT NULL, anr BIGINT NULL, time_window TIMESTAMP(9) TIME INDEX, update_at TIMESTAMP, ); Affected Rows: 0 CREATE FLOW calc_android_log_abnormal SINK TO android_log_abnormal AS SELECT sum(case when `log` LIKE '%am_crash%' then 1 else 0 end) as crash, sum(case when `log` LIKE '%FATAL EXCEPTION%' then 1 else 0 end) as fatal, sum(case when `log` LIKE '%backtrace%' then 1 else 0 end) as backtrace, sum(case when `log` LIKE '%am_anr%' then 1 else 0 
end) as anr, date_bin(INTERVAL '5 minutes', ts) as time_window, FROM android_log GROUP BY time_window; Affected Rows: 0 SHOW CREATE TABLE android_log_abnormal; +----------------------+-----------------------------------------------------+ | Table | Create Table | +----------------------+-----------------------------------------------------+ | android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | | | "crash" BIGINT NULL, | | | "fatal" BIGINT NULL, | | | "backtrace" BIGINT NULL, | | | "anr" BIGINT NULL, | | | "time_window" TIMESTAMP(9) NOT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("time_window") | | | ) | | | | | | ENGINE=mito | | | | +----------------------+-----------------------------------------------------+ INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_android_log_abnormal'); +-----------------------------------------------+ | ADMIN FLUSH_FLOW('calc_android_log_abnormal') | +-----------------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------------+ SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; +-------+-------+-----------+-----+ | crash | fatal | backtrace | anr | +-------+-------+-----------+-----+ | 1 | 0 | 1 | 0 | +-------+-------+-----------+-----+ INSERT INTO android_log values ("FATAL EXCEPTION", "2021-07-01 00:01:01.000"), ("mamam_anraaaa", "2021-07-01 00:01:01.000"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_android_log_abnormal'); +-----------------------------------------------+ | ADMIN FLUSH_FLOW('calc_android_log_abnormal') | +-----------------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------------+ SELECT 
crash, fatal, backtrace, anr FROM android_log_abnormal; +-------+-------+-----------+-----+ | crash | fatal | backtrace | anr | +-------+-------+-----------+-----+ | 1 | 1 | 1 | 1 | +-------+-------+-----------+-----+ DROP FLOW calc_android_log_abnormal; Affected Rows: 0 DROP TABLE android_log_abnormal; Affected Rows: 0 DROP TABLE android_log; Affected Rows: 0 CREATE TABLE android_log ( `log` STRING, ts TIMESTAMP(9), TIME INDEX(ts) )WITH( append_mode = 'true' ); Affected Rows: 0 CREATE TABLE android_log_abnormal ( crash BIGINT NULL, fatal BIGINT NULL, backtrace BIGINT NULL, anr BIGINT NULL, time_window TIMESTAMP(9) TIME INDEX, update_at TIMESTAMP, ); Affected Rows: 0 CREATE FLOW calc_android_log_abnormal SINK TO android_log_abnormal AS SELECT sum(case when regexp_like(`log`, '.*am_crash.*') then 1 else 0 end) as crash, sum(case when regexp_like(`log`, '.*FATAL EXCEPTION.*') then 1 else 0 end) as fatal, sum(case when regexp_like(`log`, '.*backtrace.*') then 1 else 0 end) as backtrace, sum(case when regexp_like(`log`, '.*am_anr.*') then 1 else 0 end) as anr, date_bin(INTERVAL '5 minutes', ts) as time_window, FROM android_log GROUP BY time_window; Affected Rows: 0 SHOW CREATE TABLE android_log_abnormal; +----------------------+-----------------------------------------------------+ | Table | Create Table | +----------------------+-----------------------------------------------------+ | android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | | | "crash" BIGINT NULL, | | | "fatal" BIGINT NULL, | | | "backtrace" BIGINT NULL, | | | "anr" BIGINT NULL, | | | "time_window" TIMESTAMP(9) NOT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | TIME INDEX ("time_window") | | | ) | | | | | | ENGINE=mito | | | | +----------------------+-----------------------------------------------------+ INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); Affected Rows: 2 -- SQLNESS REPLACE 
(ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_android_log_abnormal'); +-----------------------------------------------+ | ADMIN FLUSH_FLOW('calc_android_log_abnormal') | +-----------------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------------+ SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; +-------+-------+-----------+-----+ | crash | fatal | backtrace | anr | +-------+-------+-----------+-----+ | 1 | 0 | 1 | 0 | +-------+-------+-----------+-----+ INSERT INTO android_log values ("FATAL EXCEPTION", "2021-07-01 00:01:01.000"), ("mamam_anraaaa", "2021-07-01 00:01:01.000"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_android_log_abnormal'); +-----------------------------------------------+ | ADMIN FLUSH_FLOW('calc_android_log_abnormal') | +-----------------------------------------------+ | FLOW_FLUSHED | +-----------------------------------------------+ SELECT crash, fatal, backtrace, anr FROM android_log_abnormal; +-------+-------+-----------+-----+ | crash | fatal | backtrace | anr | +-------+-------+-----------+-----+ | 1 | 1 | 1 | 1 | +-------+-------+-----------+-----+ DROP FLOW calc_android_log_abnormal; Affected Rows: 0 DROP TABLE android_log_abnormal; Affected Rows: 0 DROP TABLE android_log; Affected Rows: 0 CREATE TABLE numbers_input_basic ( number INT, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(number), TIME INDEX(ts) ); Affected Rows: 0 CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS SELECT sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num FROM numbers_input_basic; Affected Rows: 0 SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ | Table | Create Table | +-------------------+--------------------------------------------------+ | 
out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | | | "avg_after_filter_num" BIGINT NULL, | | | "update_at" TIMESTAMP(3) NULL, | | | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("__ts_placeholder") | | | ) | | | | | | ENGINE=mito | | | | +-------------------+--------------------------------------------------+ -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ | ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ -- SQLNESS ARG restart=true SELECT 1; +----------+ | Int64(1) | +----------+ | 1 | +----------+ -- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ | ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ SELECT avg_after_filter_num FROM out_num_cnt_basic; +----------------------+ | avg_after_filter_num | +----------------------+ | 1 | +----------------------+ INSERT INTO numbers_input_basic VALUES (10, "2021-07-01 00:00:00.200"), (23, "2021-07-01 00:00:00.600"); Affected Rows: 2 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ | ADMIN FLUSH_FLOW('test_numbers_basic') | +----------------------------------------+ | FLOW_FLUSHED | +----------------------------------------+ DROP FLOW test_numbers_basic; Affected Rows: 0 
DROP TABLE numbers_input_basic; Affected Rows: 0 DROP TABLE out_num_cnt_basic; Affected Rows: 0 CREATE TABLE `live_connection_log` ( `device_model` STRING NULL, `connect_protocol` INT NULL, `connect_mode` INT NULL, `connect_retry_times` DOUBLE NULL, `connect_result` INT NULL, `first_frame_time` DOUBLE NULL, `record_time` TIMESTAMP TIME INDEX, `iot_online` INT NULL, PRIMARY KEY (`device_model`,`connect_protocol`), ); Affected Rows: 0 CREATE TABLE `live_connection_statistics_detail` ( `device_model` STRING NULL, `connect_protocol` INT NULL, `connect_mode` INT NULL, `avg_connect_retry_times` DOUBLE NULL, `total_connect_result_ok` INT64 NULL, `total_connect_result_fail` INT64 NULL, `total_connect` INT64 NULL, `conection_rate` DOUBLE NULL, `avg_first_frame_time` DOUBLE NULL, `max_first_frame_time` DOUBLE NULL, `ok_conection_rate` DOUBLE NULL, `record_time_window` TIMESTAMP TIME INDEX, `update_at` TIMESTAMP, PRIMARY KEY (`device_model`,`connect_protocol`), ); Affected Rows: 0 CREATE FLOW live_connection_aggregation_detail SINK TO live_connection_statistics_detail AS SELECT device_model, connect_protocol, connect_mode, avg(connect_retry_times) as avg_connect_retry_times, sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok, sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail, count(connect_result) as total_connect, sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate, avg(first_frame_time) as avg_first_frame_time, max(first_frame_time) as max_first_frame_time, sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate, date_bin(INTERVAL '1 minutes', record_time) as record_time_window, FROM live_connection_log WHERE iot_online = 1 GROUP BY device_model, connect_protocol, connect_mode, record_time_window; Affected Rows: 0 INSERT INTO live_connection_log VALUES ("STM51", 1, 1, 0.5, 1, 0.1, 0, 1); Affected Rows: 1 -- SQLNESS 
REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('live_connection_aggregation_detail'); +--------------------------------------------------------+ | ADMIN FLUSH_FLOW('live_connection_aggregation_detail') | +--------------------------------------------------------+ | FLOW_FLUSHED | +--------------------------------------------------------+ SELECT device_model, connect_protocol, connect_mode, avg_connect_retry_times, total_connect_result_ok, total_connect_result_fail, total_connect, conection_rate, avg_first_frame_time, max_first_frame_time, ok_conection_rate, record_time_window FROM live_connection_statistics_detail; +--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ | device_model | connect_protocol | connect_mode | avg_connect_retry_times | total_connect_result_ok | total_connect_result_fail | total_connect | conection_rate | avg_first_frame_time | max_first_frame_time | ok_conection_rate | record_time_window | +--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ | STM51 | 1 | 1 | 0.5 | 1 | 0 | 1 | 1.0 | 0.1 | 0.1 | 1.0 | 1970-01-01T00:00:00 | +--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+ -- Create a flow with same source and sink table CREATE FLOW same_source_and_sink_table SINK TO live_connection_log AS SELECT * FROM live_connection_log; Error: 1001(Unsupported), Unsupported operation Creating flow with source and 
sink table being the same: greptime.public.live_connection_log DROP FLOW live_connection_aggregation_detail; Affected Rows: 0 DROP TABLE live_connection_log; Affected Rows: 0 DROP TABLE live_connection_statistics_detail; Affected Rows: 0