greptimedb/tests/cases/standalone/common/flow/flow_basic.result

CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number)
FROM
numbers_input_basic
GROUP BY
tumble(ts, '1 second', '2021-07-01 00:00:00');
Affected Rows: 0
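-- presumably, tumble(ts, '1 second', '2021-07-01 00:00:00') assigns each row to a fixed,
-- non-overlapping one-second window aligned to the given origin, so a row with
-- ts = 2021-07-01 00:00:00.200 lands in [00:00:00, 00:00:01)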
-- TODO(discord9): confirm whether it's necessary to flush the flow here,
-- since the result of flush_flow is at most 1
admin flush_flow('test_numbers_basic');
+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
+----------------------------------------+
| 0                                      |
+----------------------------------------+
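-- the restart directive below presumably restarts the test database, so the inserts
-- that follow also verify that the flow keeps working across a restart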
-- SQLNESS ARG restart=true
INSERT INTO
numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
admin flush_flow('test_numbers_basic');
+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
+----------------------------------------+
| 1                                      |
+----------------------------------------+
SELECT
"SUM(numbers_input_basic.number)",
window_start,
window_end
FROM
out_num_cnt_basic;
+---------------------------------+---------------------+---------------------+
| SUM(numbers_input_basic.number) | window_start        | window_end          |
+---------------------------------+---------------------+---------------------+
| 42                              | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
+---------------------------------+---------------------+---------------------+
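-- 20 and 22 both fall in the [00:00:00, 00:00:01) window, hence one group with sum 20 + 22 = 42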
admin flush_flow('test_numbers_basic');
+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
+----------------------------------------+
| 0                                      |
+----------------------------------------+
INSERT INTO
numbers_input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
admin flush_flow('test_numbers_basic');
+----------------------------------------+
| ADMIN flush_flow('test_numbers_basic') |
+----------------------------------------+
| 1                                      |
+----------------------------------------+
-- note that this quoted column is a column name generated by DataFusion, **not** an aggregation expr
SELECT
"SUM(numbers_input_basic.number)",
window_start,
window_end
FROM
out_num_cnt_basic;
+---------------------------------+---------------------+---------------------+
| SUM(numbers_input_basic.number) | window_start        | window_end          |
+---------------------------------+---------------------+---------------------+
| 42                              | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 |
| 47                              | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 |
+---------------------------------+---------------------+---------------------+
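-- 23 and 24 land in the next window [00:00:01, 00:00:02), giving 23 + 24 = 47;
-- the earlier window's row is unchanged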
DROP FLOW test_numbers_basic;
Affected Rows: 0
DROP TABLE numbers_input_basic;
Affected Rows: 0
DROP TABLE out_num_cnt_basic;
Affected Rows: 0
-- test distinct
CREATE TABLE distinct_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;
Affected Rows: 0
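-- the flow should maintain the distinct set incrementally: the duplicate inserts below
-- must not produce duplicate rows in the sink table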
-- TODO(discord9): confirm whether it's necessary to flush the flow here,
-- since the result of flush_flow is at most 1
admin flush_flow('test_distinct_basic');
+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
+-----------------------------------------+
| 0                                       |
+-----------------------------------------+
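-- as above, the restart presumably verifies that the flow's state survives a database restart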
-- SQLNESS ARG restart=true
INSERT INTO
distinct_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 3
admin flush_flow('test_distinct_basic');
+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
+-----------------------------------------+
| 1                                       |
+-----------------------------------------+
SELECT
dis
FROM
out_distinct_basic;
+-----+
| dis |
+-----+
| 20  |
| 22  |
+-----+
admin flush_flow('test_distinct_basic');
+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
+-----------------------------------------+
| 0                                       |
+-----------------------------------------+
INSERT INTO
distinct_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
admin flush_flow('test_distinct_basic');
+-----------------------------------------+
| ADMIN flush_flow('test_distinct_basic') |
+-----------------------------------------+
| 1                                       |
+-----------------------------------------+
-- note that this quoted column is a column name generated by DataFusion, **not** an aggregation expr
SELECT
dis
FROM
out_distinct_basic;
+-----+
| dis |
+-----+
| 20  |
| 22  |
| 23  |
| 24  |
+-----+
DROP FLOW test_distinct_basic;
Affected Rows: 0
DROP TABLE distinct_basic;
Affected Rows: 0
DROP TABLE out_distinct_basic;
Affected Rows: 0
-- test interpreting intervals
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
create table out_num_cnt_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
INTERVAL '1 day 1 second',
INTERVAL '1 month 1 day 1 second',
INTERVAL '1 year 1 month'
FROM
numbers_input_basic
where
number > 10;
Affected Rows: 0
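-- SHOW CREATE FLOW below should round-trip the three interval literals unchanged
-- in the stored flow definition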
SHOW CREATE FLOW filter_numbers_basic;
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| Flow                 | Create Flow                                                                                                                                  |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
| filter_numbers_basic | CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_basic                                                                                    |
|                      | SINK TO out_num_cnt_basic                                                                                                                    |
|                      | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 |
+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
drop flow filter_numbers_basic;
Affected Rows: 0
drop table out_num_cnt_basic;
Affected Rows: 0
drop table numbers_input_basic;
Affected Rows: 0
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
-- TODO(discord9): remove this once auto-inferring the table's time index is implemented
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
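-- date_bin(INTERVAL '30 second', ts) buckets rows into fixed 30-second windows
-- (presumably aligned to the Unix epoch by default), so (max(byte) - min(byte)) / 30.0
-- approximates the per-second byte rate within each window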
INSERT INTO
bytes_log
VALUES
(101, '2025-01-01 00:00:01'),
(300, '2025-01-01 00:00:29');
Affected Rows: 2
admin flush_flow('find_approx_rate');
+--------------------------------------+
| ADMIN flush_flow('find_approx_rate') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+-------------------+---------------------+
| rate              | time_window         |
+-------------------+---------------------+
| 6.633333333333334 | 2025-01-01T00:00:00 |
+-------------------+---------------------+
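-- for the [00:00:00, 00:00:30) window: (300 - 101) / 30.0 = 199 / 30.0 ≈ 6.6333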
INSERT INTO
bytes_log
VALUES
(450, '2025-01-01 00:00:32'),
(500, '2025-01-01 00:00:37');
Affected Rows: 2
admin flush_flow('find_approx_rate');
+--------------------------------------+
| ADMIN flush_flow('find_approx_rate') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate;
+--------------------+---------------------+
| rate               | time_window         |
+--------------------+---------------------+
| 6.633333333333334  | 2025-01-01T00:00:00 |
| 1.6666666666666667 | 2025-01-01T00:00:30 |
+--------------------+---------------------+
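-- the new rows fall in [00:00:30, 00:01:00): (500 - 450) / 30.0 ≈ 1.6667;
-- the earlier window is untouched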
DROP TABLE bytes_log;
Affected Rows: 0
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0
-- input table
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- create a flow task to compute the distinct countries
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
DISTINCT country,
FROM
ngx_access_log;
Affected Rows: 0
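-- the sink table ngx_country is presumably auto-created here, with its column named
-- "ngx_access_log.country" by the query engine; that's why the SELECTs below quote it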
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 0);
Affected Rows: 1
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+
SELECT
"ngx_access_log.country"
FROM
ngx_country;
+------------------------+
| ngx_access_log.country |
+------------------------+
| b                      |
+------------------------+
-- making sure distinct is working
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 1);
Affected Rows: 1
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+
SELECT
"ngx_access_log.country"
FROM
ngx_country;
+------------------------+
| ngx_access_log.country |
+------------------------+
| b                      |
+------------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", "c", 2);
Affected Rows: 1
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+
SELECT
"ngx_access_log.country"
FROM
ngx_country;
+------------------------+
| ngx_access_log.country |
+------------------------+
| b                      |
| c                      |
+------------------------+
DROP FLOW calc_ngx_country;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_country;
Affected Rows: 0
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
DISTINCT country,
-- this DISTINCT is not strictly necessary, but it is a good test that it works
date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
country,
time_window;
Affected Rows: 0
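-- groups by country and a 1-hour date_bin window; all test timestamps below fall within
-- the same hour, so every row should land in the 1970-01-01T00:00:00 window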
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 0);
Affected Rows: 1
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+
SELECT
"ngx_access_log.country",
time_window
FROM
ngx_country;
+------------------------+---------------------+
| ngx_access_log.country | time_window         |
+------------------------+---------------------+
| b                      | 1970-01-01T00:00:00 |
+------------------------+---------------------+
-- making sure distinct is working
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 1);
Affected Rows: 1
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+
SELECT
"ngx_access_log.country",
time_window
FROM
ngx_country;
+------------------------+---------------------+
| ngx_access_log.country | time_window         |
+------------------------+---------------------+
| b                      | 1970-01-01T00:00:00 |
+------------------------+---------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", "c", 2);
Affected Rows: 1
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| 1                                    |
+--------------------------------------+
SELECT
"ngx_access_log.country",
time_window
FROM
ngx_country;
+------------------------+---------------------+
| ngx_access_log.country | time_window         |
+------------------------+---------------------+
| b                      | 1970-01-01T00:00:00 |
| c                      | 1970-01-01T00:00:00 |
+------------------------+---------------------+
DROP FLOW calc_ngx_country;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_country;
Affected Rows: 0
CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
SELECT
sensor_id,
loc,
max(temperature) as max_temp,
FROM
temp_sensor_data
GROUP BY
sensor_id,
loc
HAVING
max_temp > 100;
Affected Rows: 0
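-- rows should reach the sink only once a group's max(temperature) exceeds 100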
INSERT INTO
temp_sensor_data
VALUES
(1, "room1", 50, 0);
Affected Rows: 1
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1                                   |
+-------------------------------------+
-- the sink table already exists at this point, even though no row has passed the HAVING filter yet
SHOW TABLES LIKE 'temp_alerts';
+-------------+
| Tables      |
+-------------+
| temp_alerts |
+-------------+
INSERT INTO
temp_sensor_data
VALUES
(1, "room1", 150, 1);
Affected Rows: 1
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1                                   |
+-------------------------------------+
SHOW TABLES LIKE 'temp_alerts';
+-------------+
| Tables      |
+-------------+
| temp_alerts |
+-------------+
SELECT
sensor_id,
loc,
max_temp
FROM
temp_alerts;
+-----------+-------+----------+
| sensor_id | loc   | max_temp |
+-----------+-------+----------+
| 1         | room1 | 150.0    |
+-----------+-------+----------+
INSERT INTO
temp_sensor_data
VALUES
(2, "room1", 0, 2);
Affected Rows: 1
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| 1                                   |
+-------------------------------------+
SELECT
sensor_id,
loc,
max_temp
FROM
temp_alerts;
+-----------+-------+----------+
| sensor_id | loc   | max_temp |
+-----------+-------+----------+
| 1         | room1 | 150.0    |
+-----------+-------+----------+
DROP FLOW temp_monitoring;
Affected Rows: 0
DROP TABLE temp_sensor_data;
Affected Rows: 0
DROP TABLE temp_alerts;
Affected Rows: 0
CREATE TABLE ngx_access_log (
client STRING,
stat INT,
size INT,
access_time TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE TABLE ngx_distribution (
stat INT,
bucket_size INT,
total_logs BIGINT,
time_window TIMESTAMP TIME INDEX,
-- column auto-generated by the flow engine
update_at TIMESTAMP,
PRIMARY KEY(stat, bucket_size)
);
Affected Rows: 0
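-- stat and bucket_size form the primary key, so repeated flushes presumably upsert
-- existing buckets instead of appending duplicate rows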
CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
SELECT
stat,
trunc(size, -1) :: INT as bucket_size,
count(client) AS total_logs,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
stat,
time_window,
bucket_size;
Affected Rows: 0
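-- trunc(size, -1)::INT buckets sizes at the tens place; per the recorded output below,
-- 205, 209, and 210 all land in bucket 210 while 200 stays in bucket 200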
INSERT INTO
ngx_access_log
VALUES
("cli1", 200, 100, 0);
Affected Rows: 1
ADMIN FLUSH_FLOW('calc_ngx_distribution');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| 1                                         |
+-------------------------------------------+
SELECT
stat,
bucket_size,
total_logs,
time_window
FROM
ngx_distribution;
+------+-------------+------------+---------------------+
| stat | bucket_size | total_logs | time_window         |
+------+-------------+------------+---------------------+
| 200  | 100         | 1          | 1970-01-01T00:00:00 |
+------+-------------+------------+---------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", 200, 200, 1),
("cli1", 200, 205, 1),
("cli1", 200, 209, 1),
("cli1", 200, 210, 1),
("cli2", 200, 300, 1);
Affected Rows: 5
ADMIN FLUSH_FLOW('calc_ngx_distribution');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| 1                                         |
+-------------------------------------------+
SELECT
stat,
bucket_size,
total_logs,
time_window
FROM
ngx_distribution;
+------+-------------+------------+---------------------+
| stat | bucket_size | total_logs | time_window         |
+------+-------------+------------+---------------------+
| 200  | 100         | 1          | 1970-01-01T00:00:00 |
| 200  | 200         | 1          | 1970-01-01T00:00:00 |
| 200  | 210         | 3          | 1970-01-01T00:00:00 |
| 200  | 300         | 1          | 1970-01-01T00:00:00 |
+------+-------------+------------+---------------------+
DROP FLOW calc_ngx_distribution;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_distribution;
Affected Rows: 0
CREATE TABLE requests (
service_name STRING,
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE TABLE requests_without_ip (
service_name STRING,
val INT,
ts TIMESTAMP TIME INDEX,
);
Affected Rows: 0
CREATE FLOW requests_long_term SINK TO requests_without_ip AS
SELECT
service_name,
val,
ts
FROM
requests;
Affected Rows: 0
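-- a plain projection flow: it forwards rows while dropping the service_ip column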
INSERT INTO
requests
VALUES
("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"),
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");
Affected Rows: 8
admin flush_flow('requests_long_term');
+----------------------------------------+
| ADMIN flush_flow('requests_long_term') |
+----------------------------------------+
| 1                                      |
+----------------------------------------+
SELECT
*
FROM
requests_without_ip;
+--------------+-----+---------------------+
| service_name | val | ts                  |
+--------------+-----+---------------------+
| svc1         | 100 | 2024-10-18T19:00:00 |
| svc1         | 200 | 2024-10-18T19:00:30 |
| svc1         | 300 | 2024-10-18T19:01:00 |
| svc1         | 100 | 2024-10-18T19:01:01 |
| svc1         | 400 | 2024-10-18T19:01:30 |
| svc1         | 200 | 2024-10-18T19:01:31 |
+--------------+-----+---------------------+
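-- 8 input rows yield 6 sink rows: after dropping service_ip, the two row pairs at
-- 19:00:00 and 19:00:30 become identical and presumably collapse under the sink
-- table's time index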
DROP FLOW requests_long_term;
Affected Rows: 0
DROP TABLE requests_without_ip;
Affected Rows: 0
DROP TABLE requests;
Affected Rows: 0