CREATE TABLE numbers_input_basic (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

Affected Rows: 0

CREATE FLOW test_numbers_basic
SINK TO out_num_cnt_basic
AS
SELECT
    sum(number)
FROM
    numbers_input_basic
GROUP BY
    tumble(ts, '1 second', '2021-07-01 00:00:00');

Affected Rows: 0

-- TODO(discord9): confirm whether it's necessary to flush the flow here,
-- because the flush_flow result is at most 1
admin flush_flow('test_numbers_basic');

+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
+----------------------------------------+
| 0                                      |
+----------------------------------------+

-- SQLNESS ARG restart=true
INSERT INTO numbers_input_basic
VALUES
    (20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

Affected Rows: 2

admin flush_flow('test_numbers_basic');

+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
+----------------------------------------+
| 1                                      |
+----------------------------------------+

SELECT
    "SUM(numbers_input_basic.number)",
    window_start,
    window_end
FROM
    out_num_cnt_basic;

+---------------------------------+---------------------+---------------------+
| SUM(numbers_input_basic.number) | window_start        | window_end          |
+---------------------------------+---------------------+---------------------+
| 42                              | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+---------------------------------+---------------------+---------------------+

admin flush_flow('test_numbers_basic');

+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
+----------------------------------------+
| 0                                      |
+----------------------------------------+

INSERT INTO numbers_input_basic
VALUES
    (23, "2021-07-01 00:00:01.000"),
    (24, "2021-07-01 00:00:01.500");

Affected Rows: 2

admin flush_flow('test_numbers_basic');

+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
+----------------------------------------+
| 1                                      |
+----------------------------------------+

-- note that this quoted column is a column name generated by DataFusion,
-- **not** an aggregation expression
SELECT
    "SUM(numbers_input_basic.number)",
    window_start,
    window_end
FROM
    out_num_cnt_basic;

+---------------------------------+---------------------+---------------------+
| SUM(numbers_input_basic.number) | window_start        | window_end          |
+---------------------------------+---------------------+---------------------+
| 42                              | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47                              | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+---------------------------------+---------------------+---------------------+

DROP FLOW test_numbers_basic;

Affected Rows: 0

DROP TABLE numbers_input_basic;

Affected Rows: 0

DROP TABLE out_num_cnt_basic;

Affected Rows: 0
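-- Worked example for the windows above: the first two rows, at 00:00:00.200 and
-- 00:00:00.600, both land in the 1-second tumble window [00:00:00, 00:00:01)
-- aligned to the '2021-07-01 00:00:00' origin, so the flow emits 20 + 22 = 42;
-- the later rows at 00:00:01.000 and 00:00:01.500 land in [00:00:01, 00:00:02),
-- giving 23 + 24 = 47. Assuming the usual three-argument DataFusion date_bin, a
-- roughly equivalent ad-hoc query (a sketch, not part of this recording) is:
--
--   SELECT
--       sum(number),
--       date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00'::TIMESTAMP) AS window_start
--   FROM numbers_input_basic
--   GROUP BY window_start;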
-- test distinct
CREATE TABLE distinct_basic (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

Affected Rows: 0

CREATE FLOW test_distinct_basic
SINK TO out_distinct_basic
AS
SELECT
    DISTINCT number as dis
FROM
    distinct_basic;

Affected Rows: 0

-- TODO(discord9): confirm whether it's necessary to flush the flow here,
-- because the flush_flow result is at most 1
admin flush_flow('test_distinct_basic');

+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
+-----------------------------------------+
| 0                                       |
+-----------------------------------------+

-- SQLNESS ARG restart=true
INSERT INTO distinct_basic
VALUES
    (20, "2021-07-01 00:00:00.200"),
    (20, "2021-07-01 00:00:00.200"),
    (22, "2021-07-01 00:00:00.600");

Affected Rows: 3

admin flush_flow('test_distinct_basic');

+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
+-----------------------------------------+
| 1                                       |
+-----------------------------------------+

SELECT
    dis
FROM
    out_distinct_basic;

+-----+
| dis |
+-----+
| 20  |
| 22  |
+-----+

admin flush_flow('test_distinct_basic');

+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
+-----------------------------------------+
| 0                                       |
+-----------------------------------------+

INSERT INTO distinct_basic
VALUES
    (23, "2021-07-01 00:00:01.000"),
    (24, "2021-07-01 00:00:01.500");

Affected Rows: 2

admin flush_flow('test_distinct_basic');

+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
+-----------------------------------------+
| 1                                       |
+-----------------------------------------+

SELECT
    dis
FROM
    out_distinct_basic;

+-----+
| dis |
+-----+
| 20  |
| 22  |
| 23  |
| 24  |
+-----+

DROP FLOW test_distinct_basic;

Affected Rows: 0

DROP TABLE distinct_basic;

Affected Rows: 0

DROP TABLE out_distinct_basic;

Affected Rows: 0

-- test interval interpretation
CREATE TABLE numbers_input_basic (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

Affected Rows: 0

create table out_num_cnt_basic (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE FLOW filter_numbers_basic
SINK TO out_num_cnt_basic
AS
SELECT
    INTERVAL '1 day 1 second',
    INTERVAL '1 month 1 day 1 second',
    INTERVAL '1 year 1 month'
FROM
    numbers_input_basic
where
    number > 10;

Affected Rows: 0

SHOW CREATE FLOW filter_numbers_basic;

+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| Flow                 | Create Flow                                                                                                                                  |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| filter_numbers_basic | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_basic                                                                                    |
|                      | SINK TO out_num_cnt_basic                                                                                                                    |
|                      | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+

drop flow filter_numbers_basic;

Affected Rows: 0

drop table out_num_cnt_basic;

Affected Rows: 0

drop table numbers_input_basic;

Affected Rows: 0
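-- For reference: SHOW CREATE FLOW above prints the INTERVAL literals back
-- verbatim in the stored query text. Assuming the usual Arrow/DataFusion
-- month-day-nanosecond interval encoding, '1 month 1 day 1 second' parses to
-- (months = 1, days = 1, nanoseconds = 1_000_000_000); a literal can be
-- inspected directly with a plain query (a sketch, not part of this recording):
--
--   SELECT INTERVAL '1 month 1 day 1 second' AS parsed_interval;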
CREATE TABLE bytes_log (
    byte INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time
    TIME INDEX(ts)
);

Affected Rows: 0

-- TODO(discord9): remove this after auto-inferring the table's time index is implemented
CREATE TABLE approx_rate (
    rate DOUBLE,
    time_window TIMESTAMP,
    update_at TIMESTAMP,
    TIME INDEX(time_window)
);

Affected Rows: 0

CREATE FLOW find_approx_rate
SINK TO approx_rate
AS
SELECT
    (max(byte) - min(byte)) / 30.0 as rate,
    date_bin(INTERVAL '30 second', ts) as time_window
from
    bytes_log
GROUP BY
    time_window;

Affected Rows: 0

INSERT INTO bytes_log
VALUES
    (101, '2025-01-01 00:00:01'),
    (300, '2025-01-01 00:00:29');

Affected Rows: 2

admin flush_flow('find_approx_rate');

+--------------------------------------+
| ADMIN flush_flow('find_approx_rate') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+

SELECT
    rate,
    time_window
FROM
    approx_rate;

+-------------------+---------------------+
| rate              | time_window         |
+-------------------+---------------------+
| 6.633333333333334 | 2025-01-01T00:00:00 |
+-------------------+---------------------+

INSERT INTO bytes_log
VALUES
    (450, '2025-01-01 00:00:32'),
    (500, '2025-01-01 00:00:37');

Affected Rows: 2

admin flush_flow('find_approx_rate');

+--------------------------------------+
| ADMIN flush_flow('find_approx_rate') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+

SELECT
    rate,
    time_window
FROM
    approx_rate;

+--------------------+---------------------+
| rate               | time_window         |
+--------------------+---------------------+
| 6.633333333333334  | 2025-01-01T00:00:00 |
| 1.6666666666666667 | 2025-01-01T00:00:30 |
+--------------------+---------------------+

DROP TABLE bytes_log;

Affected Rows: 0

DROP FLOW find_approx_rate;

Affected Rows: 0

DROP TABLE approx_rate;

Affected Rows: 0
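-- Worked example for the rate flow above: the first window [00:00:00, 00:00:30)
-- holds bytes 101 and 300, so rate = (300 - 101) / 30.0 = 6.6333...; the second
-- window [00:00:30, 00:01:00) holds 450 and 500, so rate = (500 - 450) / 30.0 =
-- 1.6667. With the two-argument date_bin, both '00:00:01' and '00:00:29' bin to
-- 2025-01-01T00:00:00, while '00:00:32' and '00:00:37' bin to
-- 2025-01-01T00:00:30, matching the recorded time_window values.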
-- input table
CREATE TABLE ngx_access_log (
    client STRING,
    country STRING,
    access_time TIMESTAMP TIME INDEX
);

Affected Rows: 0

-- create a flow task to calculate the distinct countries
CREATE FLOW calc_ngx_country
SINK TO ngx_country
AS
SELECT
    DISTINCT country,
FROM
    ngx_access_log;

Affected Rows: 0

INSERT INTO ngx_access_log
VALUES
    ("cli1", "b", 0);

Affected Rows: 1

ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+

SELECT
    "ngx_access_log.country"
FROM
    ngx_country;

+------------------------+
| ngx_access_log.country |
+------------------------+
| b                      |
+------------------------+

-- making sure distinct is working
INSERT INTO ngx_access_log
VALUES
    ("cli1", "b", 1);

Affected Rows: 1

ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+

SELECT
    "ngx_access_log.country"
FROM
    ngx_country;

+------------------------+
| ngx_access_log.country |
+------------------------+
| b                      |
+------------------------+

INSERT INTO ngx_access_log
VALUES
    ("cli1", "c", 2);

Affected Rows: 1

ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+

SELECT
    "ngx_access_log.country"
FROM
    ngx_country;

+------------------------+
| ngx_access_log.country |
+------------------------+
| b                      |
| c                      |
+------------------------+

DROP FLOW calc_ngx_country;

Affected Rows: 0

DROP TABLE ngx_access_log;

Affected Rows: 0

DROP TABLE ngx_country;

Affected Rows: 0

CREATE TABLE ngx_access_log (
    client STRING,
    country STRING,
    access_time TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE FLOW calc_ngx_country
SINK TO ngx_country
AS
SELECT
    DISTINCT country, -- this distinct is not necessary, but it's a good test to see if it works
    date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
    ngx_access_log
GROUP BY
    country,
    time_window;

Affected Rows: 0

INSERT INTO ngx_access_log
VALUES
    ("cli1", "b", 0);

Affected Rows: 1

ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+

SELECT
    "ngx_access_log.country",
    time_window
FROM
    ngx_country;

+------------------------+---------------------+
| ngx_access_log.country | time_window         |
+------------------------+---------------------+
| b                      | 1970-01-01T00:00:00 |
+------------------------+---------------------+

-- making sure distinct is working
INSERT INTO ngx_access_log
VALUES
    ("cli1", "b", 1);

Affected Rows: 1

ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+

SELECT
    "ngx_access_log.country",
    time_window
FROM
    ngx_country;

+------------------------+---------------------+
| ngx_access_log.country | time_window         |
+------------------------+---------------------+
| b                      | 1970-01-01T00:00:00 |
+------------------------+---------------------+

INSERT INTO ngx_access_log
VALUES
    ("cli1", "c", 2);

Affected Rows: 1

ADMIN FLUSH_FLOW('calc_ngx_country');

+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+

SELECT
    "ngx_access_log.country",
    time_window
FROM
    ngx_country;

+------------------------+---------------------+
| ngx_access_log.country | time_window         |
+------------------------+---------------------+
| b                      | 1970-01-01T00:00:00 |
| c                      | 1970-01-01T00:00:00 |
+------------------------+---------------------+

DROP FLOW calc_ngx_country;

Affected Rows: 0

DROP TABLE ngx_access_log;

Affected Rows: 0

DROP TABLE ngx_country;

Affected Rows: 0
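-- Worked example for the flow above: the access_time values 0, 1, and 2
-- (milliseconds since the epoch) all fall into the same 1-hour bin,
-- 1970-01-01T00:00:00, so the duplicate ("b", window) pair from the second
-- insert is absorbed by DISTINCT/GROUP BY and only ("c", window) adds a row.
-- A roughly equivalent ad-hoc aggregation (a sketch, not executed in this
-- recording) would be:
--
--   SELECT country, date_bin(INTERVAL '1 hour', access_time) AS time_window
--   FROM ngx_access_log
--   GROUP BY country, time_window;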
CREATE TABLE temp_sensor_data (
    sensor_id INT,
    loc STRING,
    temperature DOUBLE,
    ts TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE TABLE temp_alerts (
    sensor_id INT,
    loc STRING,
    max_temp DOUBLE,
    ts TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE FLOW temp_monitoring
SINK TO temp_alerts
AS
SELECT
    sensor_id,
    loc,
    max(temperature) as max_temp,
FROM
    temp_sensor_data
GROUP BY
    sensor_id,
    loc
HAVING
    max_temp > 100;

Affected Rows: 0

INSERT INTO temp_sensor_data
VALUES
    (1, "room1", 50, 0);

Affected Rows: 1

ADMIN FLUSH_FLOW('temp_monitoring');

+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1                                   |
+-------------------------------------+

-- the sink table already exists, even though no row has passed the HAVING filter yet
SHOW TABLES LIKE 'temp_alerts';

+-------------+
| Tables      |
+-------------+
| temp_alerts |
+-------------+

INSERT INTO temp_sensor_data
VALUES
    (1, "room1", 150, 1);

Affected Rows: 1

ADMIN FLUSH_FLOW('temp_monitoring');

+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1                                   |
+-------------------------------------+

SHOW TABLES LIKE 'temp_alerts';

+-------------+
| Tables      |
+-------------+
| temp_alerts |
+-------------+

SELECT
    sensor_id,
    loc,
    max_temp
FROM
    temp_alerts;

+-----------+-------+----------+
| sensor_id | loc   | max_temp |
+-----------+-------+----------+
| 1         | room1 | 150.0    |
+-----------+-------+----------+

INSERT INTO temp_sensor_data
VALUES
    (2, "room1", 0, 2);

Affected Rows: 1

ADMIN FLUSH_FLOW('temp_monitoring');

+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1                                   |
+-------------------------------------+

SELECT
    sensor_id,
    loc,
    max_temp
FROM
    temp_alerts;

+-----------+-------+----------+
| sensor_id | loc   | max_temp |
+-----------+-------+----------+
| 1         | room1 | 150.0    |
+-----------+-------+----------+

DROP FLOW temp_monitoring;

Affected Rows: 0

DROP TABLE temp_sensor_data;

Affected Rows: 0

DROP TABLE temp_alerts;

Affected Rows: 0

CREATE TABLE ngx_access_log (
    client STRING,
    stat INT,
    size INT,
    access_time TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE TABLE ngx_distribution (
    stat INT,
    bucket_size INT,
    total_logs BIGINT,
    time_window TIMESTAMP TIME INDEX,
    -- column auto-generated by the flow engine
    update_at TIMESTAMP,
    PRIMARY KEY(stat, bucket_size)
);

Affected Rows: 0

CREATE FLOW calc_ngx_distribution
SINK TO ngx_distribution
AS
SELECT
    stat,
    trunc(size, -1) :: INT as bucket_size,
    count(client) AS total_logs,
    date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
    ngx_access_log
GROUP BY
    stat,
    time_window,
    bucket_size;

Affected Rows: 0

INSERT INTO ngx_access_log
VALUES
    ("cli1", 200, 100, 0);

Affected Rows: 1

ADMIN FLUSH_FLOW('calc_ngx_distribution');

+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| 1                                         |
+-------------------------------------------+

SELECT
    stat,
    bucket_size,
    total_logs,
    time_window
FROM
    ngx_distribution;

+------+-------------+------------+---------------------+
| stat | bucket_size | total_logs | time_window         |
+------+-------------+------------+---------------------+
| 200  | 100         | 1          | 1970-01-01T00:00:00 |
+------+-------------+------------+---------------------+

INSERT INTO ngx_access_log
VALUES
    ("cli1", 200, 200, 1),
    ("cli1", 200, 205, 1),
    ("cli1", 200, 209, 1),
    ("cli1", 200, 210, 1),
    ("cli2", 200, 300, 1);

Affected Rows: 5

ADMIN FLUSH_FLOW('calc_ngx_distribution');

+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| 1                                         |
+-------------------------------------------+

SELECT
    stat,
    bucket_size,
    total_logs,
    time_window
FROM
    ngx_distribution;

+------+-------------+------------+---------------------+
| stat | bucket_size | total_logs | time_window         |
+------+-------------+------------+---------------------+
| 200  | 100         | 1          | 1970-01-01T00:00:00 |
| 200  | 200         | 1          | 1970-01-01T00:00:00 |
| 200  | 210         | 3          | 1970-01-01T00:00:00 |
| 200  | 300         | 1          | 1970-01-01T00:00:00 |
+------+-------------+------------+---------------------+

DROP FLOW calc_ngx_distribution;

Affected Rows: 0

DROP TABLE ngx_access_log;

Affected Rows: 0

DROP TABLE ngx_distribution;

Affected Rows: 0
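-- Worked example for the distribution flow above: per the recorded output, the
-- five sizes from the second insert bucket as 200 -> 200 and 205, 209, 210 ->
-- 210 (with 300 -> 300 from cli2), which is why bucket 210 carries
-- total_logs = 3 while buckets 200 and 300 carry 1 each; the size-100 row from
-- the first flush stays in bucket 100. All six rows share the 1-minute bin
-- 1970-01-01T00:00:00 since every access_time is within the first minute of
-- the epoch.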
CREATE TABLE requests (
    service_name STRING,
    service_ip STRING,
    val INT,
    ts TIMESTAMP TIME INDEX
);

Affected Rows: 0

CREATE TABLE requests_without_ip (
    service_name STRING,
    val INT,
    ts TIMESTAMP TIME INDEX,
);

Affected Rows: 0

CREATE FLOW requests_long_term
SINK TO requests_without_ip
AS
SELECT
    service_name,
    val,
    ts
FROM
    requests;

Affected Rows: 0

INSERT INTO requests
VALUES
    ("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"),
    ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
    ("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"),
    ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
    ("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"),
    ("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"),
    ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
    ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");

Affected Rows: 8

admin flush_flow('requests_long_term');

+----------------------------------------+
| ADMIN flush_flow('requests_long_term') |
+----------------------------------------+
| 1                                      |
+----------------------------------------+

SELECT * FROM requests_without_ip;

+--------------+-----+---------------------+
| service_name | val | ts                  |
+--------------+-----+---------------------+
| svc1         | 100 | 2024-10-18T19:00:00 |
| svc1         | 200 | 2024-10-18T19:00:30 |
| svc1         | 300 | 2024-10-18T19:01:00 |
| svc1         | 100 | 2024-10-18T19:01:01 |
| svc1         | 400 | 2024-10-18T19:01:30 |
| svc1         | 200 | 2024-10-18T19:01:31 |
+--------------+-----+---------------------+

DROP FLOW requests_long_term;

Affected Rows: 0

DROP TABLE requests_without_ip;

Affected Rows: 0

DROP TABLE requests;

Affected Rows: 0
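-- Worked example for the projection flow above: dropping service_ip makes the
-- two rows that share a timestamp identical, e.g. ("svc1", 100, 19:00:00) comes
-- from both 10.0.0.1 and 10.0.0.2, so the 8 inserted rows collapse into the 6
-- sink rows shown. A query that yields the same six rows here (a sketch, not
-- executed as part of this recording) is:
--
--   SELECT DISTINCT service_name, val, ts FROM requests;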