Files
discord9 d3cc1b1888 feat(flow): stabilize eval interval scheduling (#8360)
* feat(flow): stabilize eval interval scheduling

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): satisfy eval schedule clippy

Signed-off-by: discord9 <discord9@163.com>

* test(flow): trim eval schedule coverage

Signed-off-by: discord9 <discord9@163.com>

* test(flow): cover stable eval scheduling

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): reserve scheduled runtime hint

Signed-off-by: discord9 <discord9@163.com>

* test(flow): trim sqlness result eof

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): harden eval schedule edges

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): address scheduled flow review

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): clean scheduled config handling

Signed-off-by: discord9 <discord9@163.com>

* test(flow): add eval interval compat case

Signed-off-by: discord9 <discord9@163.com>

* test(flow): cover show create flow in compat

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): drop scheduled time from flow context

Signed-off-by: discord9 <discord9@163.com>

* test(flow): assert scheduled now binding

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-06-29 09:55:27 +00:00

120 lines
6.6 KiB
Plaintext

-- Confirm that both flows are listed in information_schema.flows and that each
-- one reports its own input table as its source. ORDER BY flow_name keeps the
-- row order of the recorded output deterministic.
SELECT flow_name, source_table_names
FROM information_schema.flows
WHERE flow_name IN ('compat_sql_eval_flow', 'compat_tql_eval_flow')
ORDER BY flow_name;
+----------------------+----------------------------------------------+
| flow_name | source_table_names |
+----------------------+----------------------------------------------+
| compat_sql_eval_flow | greptime.flow_eval_interval.compat_sql_input |
| compat_tql_eval_flow | greptime.flow_eval_interval.compat_tql_input |
+----------------------+----------------------------------------------+
-- Dump the stored definition of the SQL-based flow. The recorded output shows
-- the EVAL INTERVAL '1 s' clause and a query that buckets on
-- date_trunc('second', now()) rather than on a column of the input table.
SHOW CREATE FLOW compat_sql_eval_flow;
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
| compat_sql_eval_flow | CREATE FLOW IF NOT EXISTS compat_sql_eval_flow |
| | SINK TO flow_eval_interval.compat_sql_eval_sink |
| | EVAL INTERVAL '1 s' |
| | AS SELECT date_trunc('second', now()) AS ts, count(v) AS value_count FROM compat_sql_input GROUP BY date_trunc('second', now()) |
+----------------------+---------------------------------------------------------------------------------------------------------------------------------+
-- Dump the stored definition of the TQL-based flow. It carries the same
-- EVAL INTERVAL '1 s' clause; its TQL EVAL range is (now() - '1m', now()) with
-- a '5s' step.
SHOW CREATE FLOW compat_tql_eval_flow;
+----------------------+-------------------------------------------------------------------------------------------------+
| Flow | Create Flow |
+----------------------+-------------------------------------------------------------------------------------------------+
| compat_tql_eval_flow | CREATE FLOW IF NOT EXISTS compat_tql_eval_flow |
| | SINK TO flow_eval_interval.compat_tql_eval_sink |
| | EVAL INTERVAL '1 s' |
| | AS TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", compat_tql_input) |
+----------------------+-------------------------------------------------------------------------------------------------+
-- Schema of the SQL flow's sink table. The table comment in the output reads
-- 'Auto created table by flow engine'. Besides the flow's two output columns
-- (ts, value_count) the table has an extra nullable "update_at" column.
SHOW CREATE TABLE compat_sql_eval_sink;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| compat_sql_eval_sink | CREATE TABLE IF NOT EXISTS "compat_sql_eval_sink" ( |
| | "ts" TIMESTAMP(9) NOT NULL, |
| | "value_count" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------------------+-----------------------------------------------------+
-- Schema of the TQL flow's sink table, carrying the same auto-created table
-- comment. Here "status_code" is the primary key and there is no "update_at"
-- column.
SHOW CREATE TABLE compat_tql_eval_sink;
+----------------------+-----------------------------------------------------+
| Table | Create Table |
+----------------------+-----------------------------------------------------+
| compat_tql_eval_sink | CREATE TABLE IF NOT EXISTS "compat_tql_eval_sink" ( |
| | "count(compat_tql_input.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------------------+-----------------------------------------------------+
-- Seed the SQL flow's source table with two rows at fixed timestamps.
-- NOTE(review): no column list is given, so the values bind by position to the
-- table's declared column order (schema not shown in this file) -- confirm
-- against the CREATE TABLE in the setup step.
INSERT INTO compat_sql_input VALUES
('2026-06-25 00:00:00', 'a', 1.0),
('2026-06-25 00:00:01', 'b', 2.0);
Affected Rows: 2
-- Seed the TQL flow's source table with three rows timestamped 17s, 13s and 7s
-- before now(), i.e. inside the (now() - '1m', now()) range the TQL flow
-- evaluates. The last value of each row (200 / 401 / 500) is presumably the
-- status code counted by count_values -- verify against the table definition.
INSERT INTO compat_tql_input VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
(now() - '13s'::interval, 'host2', 'idc1', 401),
(now() - '7s'::interval, 'host3', 'idc2', 500);
Affected Rows: 3
-- Force each flow to run once via ADMIN FLUSH_FLOW. The SQLNESS REPLACE
-- directive in front of each call rewrites the numeric result cell ([0-9]+) to
-- the fixed token FLOW_FLUSHED, so the recorded output does not depend on the
-- number the flush returns.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('compat_sql_eval_flow');
+------------------------------------------+
| ADMIN FLUSH_FLOW('compat_sql_eval_flow') |
+------------------------------------------+
| FLOW_FLUSHED |
+------------------------------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('compat_tql_eval_flow');
+------------------------------------------+
| ADMIN FLUSH_FLOW('compat_tql_eval_flow') |
+------------------------------------------+
| FLOW_FLUSHED |
+------------------------------------------+
-- Assert only that the SQL flow wrote at least one row to its sink. The check
-- is reduced to a boolean presumably because the sink's ts values come from
-- now() in the flow query, so exact rows would differ from run to run.
SELECT count(*) > 0 AS sql_flow_executed
FROM compat_sql_eval_sink;
+-------------------+
| sql_flow_executed |
+-------------------+
| true |
+-------------------+
-- Same existence-only check for the TQL flow's sink; its evaluation range is
-- also anchored on now().
SELECT count(*) > 0 AS tql_flow_executed
FROM compat_tql_eval_sink;
+-------------------+
| tql_flow_executed |
+-------------------+
| true |
+-------------------+