Files
greptimedb/tests/cases/standalone/common/flow/flow_basic.result
dennis zhuang 1f91422bae feat!: improve mysql/pg compatibility (#7315)
* feat(mysql): add SHOW WARNINGS support and return warnings for unsupported SET variables

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat(function): add MySQL IF() function and PostgreSQL description functions for connector compatibility

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: show tables for mysql

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: partitions table in information_schema and add starrocks external catalog compatibility

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* refactor: async udf

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: set warnings

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: impl pg_my_temp_schema and make description functions simple

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add test for issue 7313

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: apply suggestions

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: partition_expression and partition_description

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: test

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: unit tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: search_path only works for pg

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* feat: improve warnings processing

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* fix: warnings while writing affected rows and refactor

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: improve ShobjDescriptionFunction signature

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* refactor: array_to_boolean

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-12-01 20:41:14 +00:00

1882 lines
56 KiB
Plaintext

CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(number),
date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') as time_window
FROM
numbers_input_basic
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
"sum(numbers_input_basic.number)",
time_window
FROM
out_num_cnt_basic;
+---------------------------------+---------------------+
| sum(numbers_input_basic.number) | time_window |
+---------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 |
+---------------------------------+---------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
INSERT INTO
numbers_input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
-- note that this quote-unquote column is a column-name, **not** an aggregation expr; it was generated by datafusion
SELECT
"sum(numbers_input_basic.number)",
time_window
FROM
out_num_cnt_basic;
+---------------------------------+---------------------+
| sum(numbers_input_basic.number) | time_window |
+---------------------------------+---------------------+
| 42 | 2021-07-01T00:00:00 |
| 47 | 2021-07-01T00:00:01 |
+---------------------------------+---------------------+
DROP FLOW test_numbers_basic;
Affected Rows: 0
DROP TABLE numbers_input_basic;
Affected Rows: 0
DROP TABLE out_num_cnt_basic;
Affected Rows: 0
-- test count(*) rewrite
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_wildcard_basic SiNk TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
DROP FLOW test_wildcard_basic;
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
DROP FLOW test_wildcard_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
DROP TABLE input_basic;
Affected Rows: 0
-- test distinct
CREATE TABLE distinct_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
DISTINCT number as dis
FROM
distinct_basic;
Affected Rows: 0
SHOW CREATE TABLE out_distinct_basic;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
| | "dis" INT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("dis") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
distinct_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SHOW CREATE TABLE out_distinct_basic;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
| | "dis" INT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("dis") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+
SELECT
dis
FROM
out_distinct_basic;
+-----+
| dis |
+-----+
| 20 |
| 22 |
+-----+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
INSERT INTO
distinct_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_distinct_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_distinct_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- note that this quote-unquote column is a column-name, **not** an aggregation expr; it was generated by datafusion
SELECT
dis
FROM
out_distinct_basic;
+-----+
| dis |
+-----+
| 20 |
| 22 |
| 23 |
| 24 |
+-----+
DROP FLOW test_distinct_basic;
Affected Rows: 0
DROP TABLE distinct_basic;
Affected Rows: 0
DROP TABLE out_distinct_basic;
Affected Rows: 0
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
-- TODO(discord9): remove this after auto infer table's time index is impl
CREATE TABLE approx_rate (
rate DOUBLE,
time_window TIMESTAMP,
update_at TIMESTAMP,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE TABLE approx_rate;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( |
| | "rate" DOUBLE NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
INSERT INTO
bytes_log
VALUES
(NULL, '2023-01-01 00:00:01'),
(300, '2023-01-01 00:00:29');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate
ORDER BY time_window ASC;
+------+---------------------+
| rate | time_window |
+------+---------------------+
| 0.0 | 2023-01-01T00:00:00 |
+------+---------------------+
INSERT INTO
bytes_log
VALUES
(NULL, '2022-01-01 00:00:01'),
(NULL, '2022-01-01 00:00:29');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate
ORDER BY time_window ASC;
+------+---------------------+
| rate | time_window |
+------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
+------+---------------------+
INSERT INTO
bytes_log
VALUES
(101, '2025-01-01 00:00:01'),
(300, '2025-01-01 00:00:29');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate
ORDER BY time_window ASC;
+-------------------+---------------------+
| rate | time_window |
+-------------------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
| 6.633333333333334 | 2025-01-01T00:00:00 |
+-------------------+---------------------+
INSERT INTO
bytes_log
VALUES
(450, '2025-01-01 00:00:32'),
(500, '2025-01-01 00:00:37');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
rate,
time_window
FROM
approx_rate
ORDER BY time_window ASC;
+--------------------+---------------------+
| rate | time_window |
+--------------------+---------------------+
| | 2022-01-01T00:00:00 |
| 0.0 | 2023-01-01T00:00:00 |
| 6.633333333333334 | 2025-01-01T00:00:00 |
| 1.6666666666666667 | 2025-01-01T00:00:30 |
+--------------------+---------------------+
DROP TABLE bytes_log;
Affected Rows: 0
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0
-- input table
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- create flow task to calculate the distinct country
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
DISTINCT country,
FROM
ngx_access_log;
Affected Rows: 0
SHOW CREATE TABLE ngx_country;
+-------------+---------------------------------------------+
| Table | Create Table |
+-------------+---------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "country" STRING NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("country") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+---------------------------------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 0);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SHOW CREATE TABLE ngx_country;
+-------------+---------------------------------------------+
| Table | Create Table |
+-------------+---------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "country" STRING NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("country") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+---------------------------------------------+
SELECT
country
FROM
ngx_country;
+---------+
| country |
+---------+
| b |
+---------+
-- making sure distinct is working
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
country
FROM
ngx_country;
+---------+
| country |
+---------+
| b |
+---------+
INSERT INTO
ngx_access_log
VALUES
("cli1", "c", 2);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
country
FROM
ngx_country;
+---------+
| country |
+---------+
| b |
| c |
+---------+
DROP FLOW calc_ngx_country;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_country;
Affected Rows: 0
CREATE TABLE ngx_access_log (
client STRING,
country STRING,
access_time TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
DISTINCT country,
-- this distinct is not necessary, but it's a good test to see if it works
date_bin(INTERVAL '1 hour', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
country,
time_window;
Affected Rows: 0
SHOW CREATE TABLE ngx_country;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "country" STRING NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window"), |
| | PRIMARY KEY ("country") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 0);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SHOW CREATE TABLE ngx_country;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "country" STRING NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window"), |
| | PRIMARY KEY ("country") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
SELECT
country,
time_window
FROM
ngx_country;
+---------+---------------------+
| country | time_window |
+---------+---------------------+
| b | 1970-01-01T00:00:00 |
+---------+---------------------+
-- making sure distinct is working
INSERT INTO
ngx_access_log
VALUES
("cli1", "b", 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
country,
time_window
FROM
ngx_country;
+---------+---------------------+
| country | time_window |
+---------+---------------------+
| b | 1970-01-01T00:00:00 |
+---------+---------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", "c", 2);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_country');
+--------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_country') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT
country,
time_window
FROM
ngx_country;
+---------+---------------------+
| country | time_window |
+---------+---------------------+
| b | 1970-01-01T00:00:00 |
| c | 1970-01-01T00:00:00 |
+---------+---------------------+
DROP FLOW calc_ngx_country;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_country;
Affected Rows: 0
CREATE TABLE temp_sensor_data (
sensor_id INT,
loc STRING,
temperature DOUBLE,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE temp_alerts (
sensor_id INT,
loc STRING,
max_temp DOUBLE,
event_ts TIMESTAMP TIME INDEX,
update_at TIMESTAMP
);
Affected Rows: 0
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
SELECT
sensor_id,
loc,
max(temperature) as max_temp,
max(ts) as event_ts
FROM
temp_sensor_data
GROUP BY
sensor_id,
loc
HAVING
max_temp > 100;
Affected Rows: 0
SHOW CREATE TABLE temp_alerts;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| temp_alerts | CREATE TABLE IF NOT EXISTS "temp_alerts" ( |
| | "sensor_id" INT NULL, |
| | "loc" STRING NULL, |
| | "max_temp" DOUBLE NULL, |
| | "event_ts" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("event_ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
INSERT INTO
temp_sensor_data
VALUES
(1, "room1", 50, 0);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------+
-- The sink table already exists (created explicitly above); the flow has produced no rows yet since the HAVING threshold is not met
SHOW TABLES LIKE 'temp_alerts';
+------------------+
| Tables_in_public |
+------------------+
| temp_alerts |
+------------------+
INSERT INTO
temp_sensor_data
VALUES
(1, "room1", 150, 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------+
SHOW TABLES LIKE 'temp_alerts';
+------------------+
| Tables_in_public |
+------------------+
| temp_alerts |
+------------------+
SELECT
sensor_id,
loc,
max_temp,
event_ts
FROM
temp_alerts;
+-----------+-------+----------+-------------------------+
| sensor_id | loc | max_temp | event_ts |
+-----------+-------+----------+-------------------------+
| 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 |
+-----------+-------+----------+-------------------------+
INSERT INTO
temp_sensor_data
VALUES
(2, "room1", 0, 2);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('temp_monitoring');
+-------------------------------------+
| ADMIN FLUSH_FLOW('temp_monitoring') |
+-------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------+
SELECT
sensor_id,
loc,
max_temp,
event_ts
FROM
temp_alerts;
+-----------+-------+----------+-------------------------+
| sensor_id | loc | max_temp | event_ts |
+-----------+-------+----------+-------------------------+
| 1 | room1 | 150.0 | 1970-01-01T00:00:00.001 |
+-----------+-------+----------+-------------------------+
DROP FLOW temp_monitoring;
Affected Rows: 0
DROP TABLE temp_sensor_data;
Affected Rows: 0
DROP TABLE temp_alerts;
Affected Rows: 0
CREATE TABLE ngx_access_log (
client STRING,
stat INT,
size INT,
access_time TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE ngx_distribution (
stat INT,
bucket_size INT,
total_logs BIGINT,
time_window TIMESTAMP TIME INDEX,
-- auto generated column by flow engine
update_at TIMESTAMP,
PRIMARY KEY(stat, bucket_size)
);
Affected Rows: 0
CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
SELECT
stat,
trunc(size, -1) :: INT as bucket_size,
count(client) AS total_logs,
date_bin(INTERVAL '1 minutes', access_time) as time_window,
FROM
ngx_access_log
GROUP BY
stat,
time_window,
bucket_size;
Affected Rows: 0
SHOW CREATE TABLE ngx_distribution;
+------------------+-------------------------------------------------+
| Table | Create Table |
+------------------+-------------------------------------------------+
| ngx_distribution | CREATE TABLE IF NOT EXISTS "ngx_distribution" ( |
| | "stat" INT NULL, |
| | "bucket_size" INT NULL, |
| | "total_logs" BIGINT NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window"), |
| | PRIMARY KEY ("stat", "bucket_size") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+------------------+-------------------------------------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", 200, 100, 0);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_distribution');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
SELECT
stat,
bucket_size,
total_logs,
time_window
FROM
ngx_distribution;
+------+-------------+------------+---------------------+
| stat | bucket_size | total_logs | time_window |
+------+-------------+------------+---------------------+
| 200 | 100 | 1 | 1970-01-01T00:00:00 |
+------+-------------+------------+---------------------+
INSERT INTO
ngx_access_log
VALUES
("cli1", 200, 200, 1),
("cli1", 200, 205, 1),
("cli1", 200, 209, 1),
("cli1", 200, 210, 1),
("cli2", 200, 300, 1);
Affected Rows: 5
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_ngx_distribution');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('calc_ngx_distribution') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
SELECT
stat,
bucket_size,
total_logs,
time_window
FROM
ngx_distribution;
+------+-------------+------------+---------------------+
| stat | bucket_size | total_logs | time_window |
+------+-------------+------------+---------------------+
| 200 | 100 | 1 | 1970-01-01T00:00:00 |
| 200 | 200 | 1 | 1970-01-01T00:00:00 |
| 200 | 210 | 3 | 1970-01-01T00:00:00 |
| 200 | 300 | 1 | 1970-01-01T00:00:00 |
+------+-------------+------------+---------------------+
DROP FLOW calc_ngx_distribution;
Affected Rows: 0
DROP TABLE ngx_access_log;
Affected Rows: 0
DROP TABLE ngx_distribution;
Affected Rows: 0
CREATE TABLE requests (
service_name STRING,
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
)WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE requests_without_ip (
service_name STRING,
val INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(service_name)
);
Affected Rows: 0
CREATE FLOW requests_long_term SINK TO requests_without_ip AS
SELECT
service_name,
val,
ts
FROM
requests;
Affected Rows: 0
SHOW CREATE TABLE requests_without_ip;
+---------------------+----------------------------------------------------+
| Table | Create Table |
+---------------------+----------------------------------------------------+
| requests_without_ip | CREATE TABLE IF NOT EXISTS "requests_without_ip" ( |
| | "service_name" STRING NULL, |
| | "val" INT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("service_name") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+---------------------+----------------------------------------------------+
INSERT INTO
requests
VALUES
(NULL, "10.0.0.1", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
(NULL, "10.0.0.1", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
(NULL, "10.0.0.1", 300, "2024-10-18 19:01:00"),
(NULL, "10.0.0.2", 100, "2024-10-18 19:01:01"),
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");
Affected Rows: 8
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');
+----------------------------------------+
| ADMIN FLUSH_FLOW('requests_long_term') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
*
FROM
requests_without_ip
ORDER BY ts ASC;
+--------------+-----+---------------------+
| service_name | val | ts |
+--------------+-----+---------------------+
| | 100 | 2024-10-18T19:00:00 |
| svc1 | 100 | 2024-10-18T19:00:00 |
| | 200 | 2024-10-18T19:00:30 |
| svc1 | 200 | 2024-10-18T19:00:30 |
| | 300 | 2024-10-18T19:01:00 |
| | 100 | 2024-10-18T19:01:01 |
| svc1 | 400 | 2024-10-18T19:01:30 |
| svc1 | 200 | 2024-10-18T19:01:31 |
+--------------+-----+---------------------+
-- Test that the FLOWS table works, but don't care about the exact result since it varies between runs
SELECT
count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
INFORMATION_SCHEMA.FLOWS;
+--------------+
| active_flows |
+--------------+
| 1 |
+--------------+
INSERT INTO
requests
VALUES
(null, "10.0.0.1", 100, "2024-10-19 19:00:00"),
(null, "10.0.0.2", 100, "2024-10-19 19:00:00"),
(null, "10.0.0.1", 200, "2024-10-19 19:00:30"),
(null, "10.0.0.2", 200, "2024-10-19 19:00:30"),
(null, "10.0.0.1", 300, "2024-10-19 19:01:00"),
(null, "10.0.0.2", 100, "2024-10-19 19:01:01"),
(null, "10.0.0.1", 400, "2024-10-19 19:01:30"),
(null, "10.0.0.2", 200, "2024-10-19 19:01:31");
Affected Rows: 8
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');
+----------------------------------------+
| ADMIN FLUSH_FLOW('requests_long_term') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
*
FROM
requests_without_ip
ORDER BY ts ASC;
+--------------+-----+---------------------+
| service_name | val | ts |
+--------------+-----+---------------------+
| | 100 | 2024-10-18T19:00:00 |
| svc1 | 100 | 2024-10-18T19:00:00 |
| | 200 | 2024-10-18T19:00:30 |
| svc1 | 200 | 2024-10-18T19:00:30 |
| | 300 | 2024-10-18T19:01:00 |
| | 100 | 2024-10-18T19:01:01 |
| svc1 | 400 | 2024-10-18T19:01:30 |
| svc1 | 200 | 2024-10-18T19:01:31 |
| | 100 | 2024-10-19T19:00:00 |
| | 200 | 2024-10-19T19:00:30 |
| | 300 | 2024-10-19T19:01:00 |
| | 100 | 2024-10-19T19:01:01 |
| | 400 | 2024-10-19T19:01:30 |
| | 200 | 2024-10-19T19:01:31 |
+--------------+-----+---------------------+
INSERT INTO
requests
VALUES
("svc2", "10.0.0.1", 100, "2024-10-18 19:00:00"),
("svc2", "10.0.0.2", 100, "2024-10-18 19:00:00"),
("svc2", "10.0.0.1", 200, "2024-10-18 19:00:30"),
("svc2", "10.0.0.2", 200, "2024-10-18 19:00:30"),
("svc2", "10.0.0.1", 300, "2024-10-18 19:01:00"),
("svc2", "10.0.0.2", 100, "2024-10-18 19:01:01"),
("svc2", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc2", "10.0.0.2", 200, "2024-10-18 19:01:31");
Affected Rows: 8
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('requests_long_term');
+----------------------------------------+
| ADMIN FLUSH_FLOW('requests_long_term') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT
*
FROM
requests_without_ip
ORDER BY ts ASC;
+--------------+-----+---------------------+
| service_name | val | ts |
+--------------+-----+---------------------+
| | 100 | 2024-10-18T19:00:00 |
| svc1 | 100 | 2024-10-18T19:00:00 |
| svc2 | 100 | 2024-10-18T19:00:00 |
| | 200 | 2024-10-18T19:00:30 |
| svc1 | 200 | 2024-10-18T19:00:30 |
| svc2 | 200 | 2024-10-18T19:00:30 |
| | 300 | 2024-10-18T19:01:00 |
| svc2 | 300 | 2024-10-18T19:01:00 |
| | 100 | 2024-10-18T19:01:01 |
| svc2 | 100 | 2024-10-18T19:01:01 |
| svc1 | 400 | 2024-10-18T19:01:30 |
| svc2 | 400 | 2024-10-18T19:01:30 |
| svc1 | 200 | 2024-10-18T19:01:31 |
| svc2 | 200 | 2024-10-18T19:01:31 |
| | 100 | 2024-10-19T19:00:00 |
| | 200 | 2024-10-19T19:00:30 |
| | 300 | 2024-10-19T19:01:00 |
| | 100 | 2024-10-19T19:01:01 |
| | 400 | 2024-10-19T19:01:30 |
| | 200 | 2024-10-19T19:01:31 |
+--------------+-----+---------------------+
DROP FLOW requests_long_term;
Affected Rows: 0
DROP TABLE requests_without_ip;
Affected Rows: 0
DROP TABLE requests;
Affected Rows: 0
CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE android_log_abnormal (
crash BIGINT NULL,
fatal BIGINT NULL,
backtrace BIGINT NULL,
anr BIGINT NULL,
time_window TIMESTAMP(9) TIME INDEX,
update_at TIMESTAMP,
);
Affected Rows: 0
CREATE FLOW calc_android_log_abnormal
SINK TO android_log_abnormal
AS
SELECT
sum(case when `log` LIKE '%am_crash%' then 1 else 0 end) as crash,
sum(case when `log` LIKE '%FATAL EXCEPTION%' then 1 else 0 end) as fatal,
sum(case when `log` LIKE '%backtrace%' then 1 else 0 end) as backtrace,
sum(case when `log` LIKE '%am_anr%' then 1 else 0 end) as anr,
date_bin(INTERVAL '5 minutes', ts) as time_window,
FROM android_log
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE TABLE android_log_abnormal;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( |
| | "crash" BIGINT NULL, |
| | "fatal" BIGINT NULL, |
| | "backtrace" BIGINT NULL, |
| | "anr" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------------------+-----------------------------------------------------+
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
+-----------------------------------------------+
| ADMIN FLUSH_FLOW('calc_android_log_abnormal') |
+-----------------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------------+
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
+-------+-------+-----------+-----+
| crash | fatal | backtrace | anr |
+-------+-------+-----------+-----+
| 1 | 0 | 1 | 0 |
+-------+-------+-----------+-----+
INSERT INTO android_log values
("FATAL EXCEPTION", "2021-07-01 00:01:01.000"),
("mamam_anraaaa", "2021-07-01 00:01:01.000");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
+-----------------------------------------------+
| ADMIN FLUSH_FLOW('calc_android_log_abnormal') |
+-----------------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------------+
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
+-------+-------+-----------+-----+
| crash | fatal | backtrace | anr |
+-------+-------+-----------+-----+
| 1 | 1 | 1 | 1 |
+-------+-------+-----------+-----+
DROP FLOW calc_android_log_abnormal;
Affected Rows: 0
DROP TABLE android_log_abnormal;
Affected Rows: 0
DROP TABLE android_log;
Affected Rows: 0
CREATE TABLE android_log (
`log` STRING,
ts TIMESTAMP(9),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE android_log_abnormal (
crash BIGINT NULL,
fatal BIGINT NULL,
backtrace BIGINT NULL,
anr BIGINT NULL,
time_window TIMESTAMP(9) TIME INDEX,
update_at TIMESTAMP,
);
Affected Rows: 0
CREATE FLOW calc_android_log_abnormal
SINK TO android_log_abnormal
AS
SELECT
sum(case when regexp_like(`log`, '.*am_crash.*') then 1 else 0 end) as crash,
sum(case when regexp_like(`log`, '.*FATAL EXCEPTION.*') then 1 else 0 end) as fatal,
sum(case when regexp_like(`log`, '.*backtrace.*') then 1 else 0 end) as backtrace,
sum(case when regexp_like(`log`, '.*am_anr.*') then 1 else 0 end) as anr,
date_bin(INTERVAL '5 minutes', ts) as time_window,
FROM android_log
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE TABLE android_log_abnormal;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( |
| | "crash" BIGINT NULL, |
| | "fatal" BIGINT NULL, |
| | "backtrace" BIGINT NULL, |
| | "anr" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------------------+-----------------------------------------------------+
INSERT INTO android_log values
("am_crash", "2021-07-01 00:01:01.000"),
("asas.backtrace.ssss", "2021-07-01 00:01:01.000");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
+-----------------------------------------------+
| ADMIN FLUSH_FLOW('calc_android_log_abnormal') |
+-----------------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------------+
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
+-------+-------+-----------+-----+
| crash | fatal | backtrace | anr |
+-------+-------+-----------+-----+
| 1 | 0 | 1 | 0 |
+-------+-------+-----------+-----+
INSERT INTO android_log values
("FATAL EXCEPTION", "2021-07-01 00:01:01.000"),
("mamam_anraaaa", "2021-07-01 00:01:01.000");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_android_log_abnormal');
+-----------------------------------------------+
| ADMIN FLUSH_FLOW('calc_android_log_abnormal') |
+-----------------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------------+
SELECT crash, fatal, backtrace, anr FROM android_log_abnormal;
+-------+-------+-----------+-----+
| crash | fatal | backtrace | anr |
+-------+-------+-----------+-----+
| 1 | 1 | 1 | 1 |
+-------+-------+-----------+-----+
DROP FLOW calc_android_log_abnormal;
Affected Rows: 0
DROP TABLE android_log_abnormal;
Affected Rows: 0
DROP TABLE android_log;
Affected Rows: 0
CREATE TABLE numbers_input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
sum(case when number > 10 then 1 else 0 end)/count(number) as avg_after_filter_num
FROM
numbers_input_basic;
Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "avg_after_filter_num" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
(22, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
SELECT avg_after_filter_num FROM out_num_cnt_basic;
+----------------------+
| avg_after_filter_num |
+----------------------+
| 1 |
+----------------------+
INSERT INTO
numbers_input_basic
VALUES
(10, "2021-07-01 00:00:00.200"),
(23, "2021-07-01 00:00:00.600");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
+----------------------------------------+
| ADMIN FLUSH_FLOW('test_numbers_basic') |
+----------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------+
DROP FLOW test_numbers_basic;
Affected Rows: 0
DROP TABLE numbers_input_basic;
Affected Rows: 0
DROP TABLE out_num_cnt_basic;
Affected Rows: 0
CREATE TABLE `live_connection_log`
(
`device_model` STRING NULL,
`connect_protocol` INT NULL,
`connect_mode` INT NULL,
`connect_retry_times` DOUBLE NULL,
`connect_result` INT NULL,
`first_frame_time` DOUBLE NULL,
`record_time` TIMESTAMP TIME INDEX,
`iot_online` INT NULL,
PRIMARY KEY (`device_model`,`connect_protocol`),
);
Affected Rows: 0
CREATE TABLE `live_connection_statistics_detail`
(
`device_model` STRING NULL,
`connect_protocol` INT NULL,
`connect_mode` INT NULL,
`avg_connect_retry_times` DOUBLE NULL,
`total_connect_result_ok` INT64 NULL,
`total_connect_result_fail` INT64 NULL,
`total_connect` INT64 NULL,
`conection_rate` DOUBLE NULL,
`avg_first_frame_time` DOUBLE NULL,
`max_first_frame_time` DOUBLE NULL,
`ok_conection_rate` DOUBLE NULL,
`record_time_window` TIMESTAMP TIME INDEX,
`update_at` TIMESTAMP,
PRIMARY KEY (`device_model`,`connect_protocol`),
);
Affected Rows: 0
CREATE FLOW live_connection_aggregation_detail
SINK TO live_connection_statistics_detail
AS
SELECT
device_model,
connect_protocol,
connect_mode,
avg(connect_retry_times) as avg_connect_retry_times,
sum(case when connect_result = 1 then 1 else 0 end) as total_connect_result_ok,
sum(case when connect_result = 0 then 1 else 0 end) as total_connect_result_fail,
count(connect_result) as total_connect,
sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as conection_rate,
avg(first_frame_time) as avg_first_frame_time,
max(first_frame_time) as max_first_frame_time,
sum(case when connect_result = 1 then 1 else 0 end)::double / count(connect_result) as ok_conection_rate,
date_bin(INTERVAL '1 minutes', record_time) as record_time_window,
FROM live_connection_log
WHERE iot_online = 1
GROUP BY
device_model,
connect_protocol,
connect_mode,
record_time_window;
Affected Rows: 0
INSERT INTO
live_connection_log
VALUES
("STM51", 1, 1, 0.5, 1, 0.1, 0, 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('live_connection_aggregation_detail');
+--------------------------------------------------------+
| ADMIN FLUSH_FLOW('live_connection_aggregation_detail') |
+--------------------------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------------------------+
SELECT device_model,
connect_protocol,
connect_mode,
avg_connect_retry_times,
total_connect_result_ok,
total_connect_result_fail,
total_connect,
conection_rate,
avg_first_frame_time,
max_first_frame_time,
ok_conection_rate,
record_time_window FROM live_connection_statistics_detail;
+--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+
| device_model | connect_protocol | connect_mode | avg_connect_retry_times | total_connect_result_ok | total_connect_result_fail | total_connect | conection_rate | avg_first_frame_time | max_first_frame_time | ok_conection_rate | record_time_window |
+--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+
| STM51 | 1 | 1 | 0.5 | 1 | 0 | 1 | 1.0 | 0.1 | 0.1 | 1.0 | 1970-01-01T00:00:00 |
+--------------+------------------+--------------+-------------------------+-------------------------+---------------------------+---------------+----------------+----------------------+----------------------+-------------------+---------------------+
-- Create a flow with same source and sink table
CREATE FLOW same_source_and_sink_table SINK TO live_connection_log AS
SELECT
*
FROM
live_connection_log;
Error: 1001(Unsupported), Unsupported operation Creating flow with source and sink table being the same: greptime.public.live_connection_log
DROP FLOW live_connection_aggregation_detail;
Affected Rows: 0
DROP TABLE live_connection_log;
Affected Rows: 0
DROP TABLE live_connection_statistics_detail;
Affected Rows: 0