-- Mirror of https://github.com/GreptimeTeam/greptimedb.git
-- (synced 2026-01-17 02:32:56 +00:00; 464 lines, 8.1 KiB, SQL)
--
-- Originating change:
--   * generic bundle trait
--   * feat: impl get/let
--   * fix: drop batch
--   * test: tumble batch
--   * feat: use batch eval flow
--   * fix: div use arrow::div not mul
--   * perf: not append batch
--   * perf: use bool mask for reduce
--   * perf: tiny opt
--   * perf: refactor slow path
--   * feat: opt if then
--   * fix: WIP
--   * perf: if then
--   * chore: use trace instead
--   * fix: reduce missing non-first batch
--   * perf: flow if then using interleave
--   * docs: add TODO
--   * perf: remove unnecessary eq
--   * chore: remove unused import
--   * fix: run_available no longer loop forever
--   * feat: blocking on high input buf
--   * chore: increase threshold
--   * chore: after rebase
--   * chore: per review
--   * chore: per review
--   * fix: allow empty values in reduce&test
--   * tests: more flow doc example tests
--   * chore: per review
--   * chore: per review

-- Case: sum() over a 1-second tumble window, checked across a restart.
CREATE TABLE numbers_input_basic (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

CREATE FLOW test_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
    sum(number)
FROM
    numbers_input_basic
GROUP BY
    tumble(ts, '1 second', '2021-07-01 00:00:00');

-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
ADMIN FLUSH_FLOW('test_numbers_basic');

-- SQLNESS ARG restart=true
INSERT INTO
    numbers_input_basic (number, ts)
VALUES
    (20, '2021-07-01 00:00:00.200'),
    (22, '2021-07-01 00:00:00.600');

ADMIN FLUSH_FLOW('test_numbers_basic');

-- Both rows above fall into the window starting at 2021-07-01 00:00:00.
SELECT
    "SUM(numbers_input_basic.number)",
    window_start,
    window_end
FROM
    out_num_cnt_basic
ORDER BY
    window_start;

ADMIN FLUSH_FLOW('test_numbers_basic');

INSERT INTO
    numbers_input_basic (number, ts)
VALUES
    (23, '2021-07-01 00:00:01.000'),
    (24, '2021-07-01 00:00:01.500');

ADMIN FLUSH_FLOW('test_numbers_basic');

-- Note that this double-quoted column is a column name generated by DataFusion,
-- **not** an aggregation expression.
-- The sink now holds one row per window; ORDER BY keeps their order stable
-- for the expected-output comparison.
SELECT
    "SUM(numbers_input_basic.number)",
    window_start,
    window_end
FROM
    out_num_cnt_basic
ORDER BY
    window_start;

DROP FLOW test_numbers_basic;

DROP TABLE numbers_input_basic;

DROP TABLE out_num_cnt_basic;

-- test distinct
CREATE TABLE distinct_basic (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
SELECT
    DISTINCT number AS dis
FROM
    distinct_basic;

-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
ADMIN FLUSH_FLOW('test_distinct_basic');

-- SQLNESS ARG restart=true
INSERT INTO
    distinct_basic (number, ts)
VALUES
    (20, '2021-07-01 00:00:00.200'),
    (22, '2021-07-01 00:00:00.600');

ADMIN FLUSH_FLOW('test_distinct_basic');

SELECT
    dis
FROM
    out_distinct_basic
ORDER BY
    dis;

ADMIN FLUSH_FLOW('test_distinct_basic');

INSERT INTO
    distinct_basic (number, ts)
VALUES
    (23, '2021-07-01 00:00:01.000'),
    (24, '2021-07-01 00:00:01.500');

ADMIN FLUSH_FLOW('test_distinct_basic');

-- `dis` is the alias given in the flow's SELECT list. Each distinct input value
-- should appear exactly once; ORDER BY keeps the multi-row output stable.
SELECT
    dis
FROM
    out_distinct_basic
ORDER BY
    dis;

DROP FLOW test_distinct_basic;

DROP TABLE distinct_basic;

DROP TABLE out_distinct_basic;

-- test interpreting INTERVAL literals in a flow query
CREATE TABLE numbers_input_basic (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

CREATE TABLE out_num_cnt_basic (
    number INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX
);

-- Only SHOW CREATE FLOW is checked in this case. No rows are inserted, so the
-- mismatch between the sink table's columns and the three INTERVAL projections
-- is never exercised here.
CREATE FLOW filter_numbers_basic SINK TO out_num_cnt_basic AS
SELECT
    INTERVAL '1 day 1 second',
    INTERVAL '1 month 1 day 1 second',
    INTERVAL '1 year 1 month'
FROM
    numbers_input_basic
WHERE
    number > 10;

SHOW CREATE FLOW filter_numbers_basic;

DROP FLOW filter_numbers_basic;

DROP TABLE out_num_cnt_basic;

DROP TABLE numbers_input_basic;

-- Case: approximate rate per 30-second date_bin window.
CREATE TABLE bytes_log (
    byte INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- event time
    TIME INDEX(ts)
);

-- TODO(discord9): remove this after auto infer table's time index is impl
CREATE TABLE approx_rate (
    rate DOUBLE,
    time_window TIMESTAMP,
    update_at TIMESTAMP,
    TIME INDEX(time_window)
);

-- rate = (max(byte) - min(byte)) spread over the 30-second window length.
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
    (max(byte) - min(byte)) / 30.0 AS rate,
    date_bin(INTERVAL '30 second', ts) AS time_window
FROM
    bytes_log
GROUP BY
    time_window;

-- Both rows fall into the window starting at 2025-01-01 00:00:00.
INSERT INTO
    bytes_log (byte, ts)
VALUES
    (101, '2025-01-01 00:00:01'),
    (300, '2025-01-01 00:00:29');

ADMIN FLUSH_FLOW('find_approx_rate');

SELECT
    rate,
    time_window
FROM
    approx_rate
ORDER BY
    time_window;

-- Both rows fall into the next window, starting at 2025-01-01 00:00:30.
INSERT INTO
    bytes_log (byte, ts)
VALUES
    (450, '2025-01-01 00:00:32'),
    (500, '2025-01-01 00:00:37');

ADMIN FLUSH_FLOW('find_approx_rate');

-- Two windows now exist; ORDER BY keeps the output stable.
SELECT
    rate,
    time_window
FROM
    approx_rate
ORDER BY
    time_window;

-- Drop the flow before the source table it reads from, matching the teardown
-- order used by every other case in this file.
DROP FLOW find_approx_rate;

DROP TABLE bytes_log;

DROP TABLE approx_rate;

-- input table
CREATE TABLE ngx_access_log (
    client STRING,
    country STRING,
    access_time TIMESTAMP TIME INDEX
);

-- create flow task to calculate the distinct country
-- NOTE(review): the trailing comma before FROM is kept as written. This query
-- relies on the SQL parser accepting a trailing comma in the projection list,
-- presumably to mirror a documentation example (see "more flow doc example
-- tests" in the commit summary). Confirm before removing it.
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
    DISTINCT country,
FROM
    ngx_access_log;

INSERT INTO
    ngx_access_log (client, country, access_time)
VALUES
    ('cli1', 'b', 0);

ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.country"
FROM
    ngx_country
ORDER BY
    "ngx_access_log.country";

-- making sure distinct is working: a second 'b' row must not add an output row
INSERT INTO
    ngx_access_log (client, country, access_time)
VALUES
    ('cli1', 'b', 1);

ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.country"
FROM
    ngx_country
ORDER BY
    "ngx_access_log.country";

-- A new country 'c' should add exactly one row; ORDER BY keeps the output stable.
INSERT INTO
    ngx_access_log (client, country, access_time)
VALUES
    ('cli1', 'c', 2);

ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.country"
FROM
    ngx_country
ORDER BY
    "ngx_access_log.country";

DROP FLOW calc_ngx_country;

DROP TABLE ngx_access_log;

DROP TABLE ngx_country;

-- Case: DISTINCT combined with GROUP BY on an hourly date_bin window.
CREATE TABLE ngx_access_log (
    client STRING,
    country STRING,
    access_time TIMESTAMP TIME INDEX
);

-- This DISTINCT is not necessary, because the projection is exactly the
-- GROUP BY keys (country, time_window). It is kept as a good test that
-- DISTINCT together with GROUP BY works in a flow.
-- NOTE(review): the trailing comma before FROM is kept as written. It relies on
-- the SQL parser accepting a trailing comma in the projection list.
CREATE FLOW calc_ngx_country SINK TO ngx_country AS
SELECT
    DISTINCT country,
    date_bin(INTERVAL '1 hour', access_time) AS time_window,
FROM
    ngx_access_log
GROUP BY
    country,
    time_window;

INSERT INTO
    ngx_access_log (client, country, access_time)
VALUES
    ('cli1', 'b', 0);

ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.country",
    time_window
FROM
    ngx_country
ORDER BY
    "ngx_access_log.country",
    time_window;

-- making sure distinct is working: same country in the same hour, so no new row
INSERT INTO
    ngx_access_log (client, country, access_time)
VALUES
    ('cli1', 'b', 1);

ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.country",
    time_window
FROM
    ngx_country
ORDER BY
    "ngx_access_log.country",
    time_window;

-- A new country in the same hour adds one row; ORDER BY keeps the output stable.
INSERT INTO
    ngx_access_log (client, country, access_time)
VALUES
    ('cli1', 'c', 2);

ADMIN FLUSH_FLOW('calc_ngx_country');

SELECT
    "ngx_access_log.country",
    time_window
FROM
    ngx_country
ORDER BY
    "ngx_access_log.country",
    time_window;

DROP FLOW calc_ngx_country;

DROP TABLE ngx_access_log;

DROP TABLE ngx_country;

-- Case: HAVING filter on an aggregate. Only groups whose max temperature
-- exceeds 100 should reach the sink table.
CREATE TABLE temp_sensor_data (
    sensor_id INT,
    loc STRING,
    temperature DOUBLE,
    ts TIMESTAMP TIME INDEX
);

CREATE TABLE temp_alerts (
    sensor_id INT,
    loc STRING,
    max_temp DOUBLE,
    ts TIMESTAMP TIME INDEX
);

-- NOTE(review): the trailing comma before FROM is kept as written. It relies on
-- the SQL parser accepting a trailing comma in the projection list.
CREATE FLOW temp_monitoring SINK TO temp_alerts AS
SELECT
    sensor_id,
    loc,
    max(temperature) AS max_temp,
FROM
    temp_sensor_data
GROUP BY
    sensor_id,
    loc
HAVING
    max_temp > 100;

INSERT INTO
    temp_sensor_data (sensor_id, loc, temperature, ts)
VALUES
    (1, 'room1', 50, 0);

ADMIN FLUSH_FLOW('temp_monitoring');

-- temp_alerts was created explicitly above, so the table itself already exists.
-- What must not exist yet is any alert row: max_temp = 50 fails HAVING max_temp > 100.
SHOW TABLES LIKE 'temp_alerts';

SELECT
    sensor_id,
    loc,
    max_temp
FROM
    temp_alerts
ORDER BY
    sensor_id,
    loc;

-- 150 exceeds the threshold, so sensor 1 / room1 should now produce an alert row.
INSERT INTO
    temp_sensor_data (sensor_id, loc, temperature, ts)
VALUES
    (1, 'room1', 150, 1);

ADMIN FLUSH_FLOW('temp_monitoring');

SHOW TABLES LIKE 'temp_alerts';

SELECT
    sensor_id,
    loc,
    max_temp
FROM
    temp_alerts
ORDER BY
    sensor_id,
    loc;

-- Sensor 2 never exceeds the threshold, so it must not appear in temp_alerts.
INSERT INTO
    temp_sensor_data (sensor_id, loc, temperature, ts)
VALUES
    (2, 'room1', 0, 2);

ADMIN FLUSH_FLOW('temp_monitoring');

SELECT
    sensor_id,
    loc,
    max_temp
FROM
    temp_alerts
ORDER BY
    sensor_id,
    loc;

DROP FLOW temp_monitoring;

DROP TABLE temp_sensor_data;

DROP TABLE temp_alerts;

-- Case: count requests per (stat, size bucket) in 1-minute windows.
CREATE TABLE ngx_access_log (
    client STRING,
    stat INT,
    size INT,
    access_time TIMESTAMP TIME INDEX
);

CREATE TABLE ngx_distribution (
    stat INT,
    bucket_size INT,
    total_logs BIGINT,
    time_window TIMESTAMP TIME INDEX,
    update_at TIMESTAMP, -- auto generated column by flow engine
    PRIMARY KEY(stat, bucket_size)
);

-- bucket_size is size truncated with precision -1, presumably to a multiple of
-- 10 (confirm against the engine's trunc semantics).
-- NOTE(review): the trailing comma before FROM is kept as written. It relies on
-- the SQL parser accepting a trailing comma in the projection list.
CREATE FLOW calc_ngx_distribution SINK TO ngx_distribution AS
SELECT
    stat,
    trunc(size, -1)::INT AS bucket_size,
    count(client) AS total_logs,
    date_bin(INTERVAL '1 minutes', access_time) AS time_window,
FROM
    ngx_access_log
GROUP BY
    stat,
    time_window,
    bucket_size;

INSERT INTO
    ngx_access_log (client, stat, size, access_time)
VALUES
    ('cli1', 200, 100, 0);

ADMIN FLUSH_FLOW('calc_ngx_distribution');

SELECT
    stat,
    bucket_size,
    total_logs,
    time_window
FROM
    ngx_distribution
ORDER BY
    stat,
    bucket_size,
    time_window;

-- All rows share one 1-minute window; several sizes collapse into the same bucket.
INSERT INTO
    ngx_access_log (client, stat, size, access_time)
VALUES
    ('cli1', 200, 200, 1),
    ('cli1', 200, 205, 1),
    ('cli1', 200, 209, 1),
    ('cli1', 200, 210, 1),
    ('cli2', 200, 300, 1);

ADMIN FLUSH_FLOW('calc_ngx_distribution');

-- Several bucket rows now exist; ORDER BY keeps the output stable.
SELECT
    stat,
    bucket_size,
    total_logs,
    time_window
FROM
    ngx_distribution
ORDER BY
    stat,
    bucket_size,
    time_window;

DROP FLOW calc_ngx_distribution;

DROP TABLE ngx_access_log;

DROP TABLE ngx_distribution;
