mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-04 13:00:38 +00:00
* feat(flow): stabilize eval interval scheduling Signed-off-by: discord9 <discord9@163.com> * fix(flow): satisfy eval schedule clippy Signed-off-by: discord9 <discord9@163.com> * test(flow): trim eval schedule coverage Signed-off-by: discord9 <discord9@163.com> * test(flow): cover stable eval scheduling Signed-off-by: discord9 <discord9@163.com> * fix(flow): reserve scheduled runtime hint Signed-off-by: discord9 <discord9@163.com> * test(flow): trim sqlness result eof Signed-off-by: discord9 <discord9@163.com> * fix(flow): harden eval schedule edges Signed-off-by: discord9 <discord9@163.com> * fix(flow): address scheduled flow review Signed-off-by: discord9 <discord9@163.com> * fix(flow): clean scheduled config handling Signed-off-by: discord9 <discord9@163.com> * test(flow): add eval interval compat case Signed-off-by: discord9 <discord9@163.com> * test(flow): cover show create flow in compat Signed-off-by: discord9 <discord9@163.com> * fix(flow): drop scheduled time from flow context Signed-off-by: discord9 <discord9@163.com> * test(flow): assert scheduled now binding Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
120 lines
6.6 KiB
Plaintext
120 lines
6.6 KiB
Plaintext
SELECT flow_name, source_table_names
|
|
FROM information_schema.flows
|
|
WHERE flow_name IN ('compat_sql_eval_flow', 'compat_tql_eval_flow')
|
|
ORDER BY flow_name;
|
|
|
|
+----------------------+----------------------------------------------+
|
|
| flow_name | source_table_names |
|
|
+----------------------+----------------------------------------------+
|
|
| compat_sql_eval_flow | greptime.flow_eval_interval.compat_sql_input |
|
|
| compat_tql_eval_flow | greptime.flow_eval_interval.compat_tql_input |
|
|
+----------------------+----------------------------------------------+
|
|
|
|
SHOW CREATE FLOW compat_sql_eval_flow;
|
|
|
|
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
|
|
| Flow | Create Flow |
|
|
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
|
|
| compat_sql_eval_flow | CREATE FLOW IF NOT EXISTS compat_sql_eval_flow |
|
|
| | SINK TO flow_eval_interval.compat_sql_eval_sink |
|
|
| | EVAL INTERVAL '1 s' |
|
|
| | AS SELECT date_trunc('second', now()) AS ts, count(v) AS value_count FROM compat_sql_input GROUP BY date_trunc('second', now()) |
|
|
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
|
|
|
|
SHOW CREATE FLOW compat_tql_eval_flow;
|
|
|
|
+----------------------+-------------------------------------------------------------------------------------------------+
|
|
| Flow | Create Flow |
|
|
+----------------------+-------------------------------------------------------------------------------------------------+
|
|
| compat_tql_eval_flow | CREATE FLOW IF NOT EXISTS compat_tql_eval_flow |
|
|
| | SINK TO flow_eval_interval.compat_tql_eval_sink |
|
|
| | EVAL INTERVAL '1 s' |
|
|
| | AS TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", compat_tql_input) |
|
|
+----------------------+-------------------------------------------------------------------------------------------------+
|
|
|
|
SHOW CREATE TABLE compat_sql_eval_sink;
|
|
|
|
+----------------------+-----------------------------------------------------+
|
|
| Table | Create Table |
|
|
+----------------------+-----------------------------------------------------+
|
|
| compat_sql_eval_sink | CREATE TABLE IF NOT EXISTS "compat_sql_eval_sink" ( |
|
|
| | "ts" TIMESTAMP(9) NOT NULL, |
|
|
| | "value_count" BIGINT NULL, |
|
|
| | "update_at" TIMESTAMP(3) NULL, |
|
|
| | TIME INDEX ("ts") |
|
|
| | ) |
|
|
| | |
|
|
| | ENGINE=mito |
|
|
| | WITH( |
|
|
| | 'comment' = 'Auto created table by flow engine' |
|
|
| | ) |
|
|
+----------------------+-----------------------------------------------------+
|
|
|
|
SHOW CREATE TABLE compat_tql_eval_sink;
|
|
|
|
+----------------------+-----------------------------------------------------+
|
|
| Table | Create Table |
|
|
+----------------------+-----------------------------------------------------+
|
|
| compat_tql_eval_sink | CREATE TABLE IF NOT EXISTS "compat_tql_eval_sink" ( |
|
|
| | "count(compat_tql_input.val)" DOUBLE NULL, |
|
|
| | "ts" TIMESTAMP(3) NOT NULL, |
|
|
| | "status_code" STRING NULL, |
|
|
| | TIME INDEX ("ts"), |
|
|
| | PRIMARY KEY ("status_code") |
|
|
| | ) |
|
|
| | |
|
|
| | ENGINE=mito |
|
|
| | WITH( |
|
|
| | 'comment' = 'Auto created table by flow engine' |
|
|
| | ) |
|
|
+----------------------+-----------------------------------------------------+
|
|
|
|
INSERT INTO compat_sql_input VALUES
|
|
('2026-06-25 00:00:00', 'a', 1.0),
|
|
('2026-06-25 00:00:01', 'b', 2.0);
|
|
|
|
Affected Rows: 2
|
|
|
|
INSERT INTO compat_tql_input VALUES
|
|
(now() - '17s'::interval, 'host1', 'idc1', 200),
|
|
(now() - '13s'::interval, 'host2', 'idc1', 401),
|
|
(now() - '7s'::interval, 'host3', 'idc2', 500);
|
|
|
|
Affected Rows: 3
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('compat_sql_eval_flow');
|
|
|
|
+------------------------------------------+
|
|
| ADMIN FLUSH_FLOW('compat_sql_eval_flow') |
|
|
+------------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+------------------------------------------+
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('compat_tql_eval_flow');
|
|
|
|
+------------------------------------------+
|
|
| ADMIN FLUSH_FLOW('compat_tql_eval_flow') |
|
|
+------------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+------------------------------------------+
|
|
|
|
SELECT count(*) > 0 AS sql_flow_executed
|
|
FROM compat_sql_eval_sink;
|
|
|
|
+-------------------+
|
|
| sql_flow_executed |
|
|
+-------------------+
|
|
| true |
|
|
+-------------------+
|
|
|
|
SELECT count(*) > 0 AS tql_flow_executed
|
|
FROM compat_tql_eval_sink;
|
|
|
|
+-------------------+
|
|
| tql_flow_executed |
|
|
+-------------------+
|
|
| true |
|
|
+-------------------+
|