mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-07-03 12:30:40 +00:00
test: add flow scheduled now compat case (#8400)
Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
name = "flow_scheduled_now_dist_plan"
|
||||
reason = "Verify flow metadata containing now()/current_timestamp() created by old binaries binds scheduled logical time after upgrade."
|
||||
introduced_by = "PR #8389"
|
||||
topologies = ["distributed"]
|
||||
from_range = ["<=v1.1.1"]
|
||||
to_range = [">v1.1.1"]
|
||||
features = ["flow", "table"]
|
||||
owner = "flow"
|
||||
@@ -0,0 +1,19 @@
|
||||
CREATE TABLE compat_flow_now_input (
|
||||
ts TIMESTAMP(3) TIME INDEX,
|
||||
v DOUBLE,
|
||||
PRIMARY KEY(v)
|
||||
);
|
||||
|
||||
CREATE FLOW compat_flow_scheduled_now
|
||||
SINK TO compat_flow_now_sink
|
||||
EVAL INTERVAL '1s'
|
||||
AS
|
||||
SELECT
|
||||
date_bin(INTERVAL '1 second', ts) AS window_start,
|
||||
count(v) AS value_count,
|
||||
now() AS create_time,
|
||||
current_timestamp() AS cur_ts
|
||||
FROM compat_flow_now_input
|
||||
WHERE ts >= date_trunc('second', now()) - INTERVAL '1 second'
|
||||
AND ts < date_trunc('second', current_timestamp())
|
||||
GROUP BY date_bin(INTERVAL '1 second', ts);
|
||||
@@ -0,0 +1,81 @@
|
||||
SELECT flow_name, source_table_names
|
||||
FROM information_schema.flows
|
||||
WHERE flow_name = 'compat_flow_scheduled_now';
|
||||
|
||||
+---------------------------+-------------------------------------------------------------+
|
||||
| flow_name | source_table_names |
|
||||
+---------------------------+-------------------------------------------------------------+
|
||||
| compat_flow_scheduled_now | greptime.flow_scheduled_now_dist_plan.compat_flow_now_input |
|
||||
+---------------------------+-------------------------------------------------------------+
|
||||
|
||||
SHOW CREATE FLOW compat_flow_scheduled_now;
|
||||
|
||||
+---------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| Flow | Create Flow |
|
||||
+---------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| compat_flow_scheduled_now | CREATE FLOW IF NOT EXISTS compat_flow_scheduled_now |
|
||||
| | SINK TO flow_scheduled_now_dist_plan.compat_flow_now_sink |
|
||||
| | EVAL INTERVAL '1 s' |
|
||||
| | AS SELECT date_bin(INTERVAL '1 second', ts) AS window_start, count(v) AS value_count, now() AS create_time, current_timestamp() AS cur_ts FROM compat_flow_now_input WHERE ts >= date_trunc('second', now()) - INTERVAL '1 second' AND ts < date_trunc('second', current_timestamp()) GROUP BY date_bin(INTERVAL '1 second', ts) |
|
||||
+---------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
SHOW CREATE TABLE compat_flow_now_sink;
|
||||
|
||||
+----------------------+-----------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+----------------------+-----------------------------------------------------+
|
||||
| compat_flow_now_sink | CREATE TABLE IF NOT EXISTS "compat_flow_now_sink" ( |
|
||||
| | "window_start" TIMESTAMP(3) NOT NULL, |
|
||||
| | "value_count" BIGINT NULL, |
|
||||
| | "create_time" TIMESTAMP(9) NULL, |
|
||||
| | "cur_ts" TIMESTAMP(9) NULL, |
|
||||
| | "update_at" TIMESTAMP(3) NULL, |
|
||||
| | TIME INDEX ("window_start") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | 'comment' = 'Auto created table by flow engine' |
|
||||
| | ) |
|
||||
+----------------------+-----------------------------------------------------+
|
||||
|
||||
INSERT INTO compat_flow_now_input VALUES
|
||||
(date_trunc('second', now()) - INTERVAL '1 second', 0.0),
|
||||
(date_trunc('second', now()), 1.0),
|
||||
(date_trunc('second', now()) + INTERVAL '1 second', 2.0),
|
||||
(date_trunc('second', now()) + INTERVAL '2 seconds', 3.0),
|
||||
(date_trunc('second', now()) + INTERVAL '3 seconds', 4.0),
|
||||
(date_trunc('second', now()) + INTERVAL '4 seconds', 5.0),
|
||||
(date_trunc('second', now()) + INTERVAL '5 seconds', 6.0),
|
||||
(date_trunc('second', now()) + INTERVAL '6 seconds', 7.0),
|
||||
(date_trunc('second', now()) + INTERVAL '7 seconds', 8.0);
|
||||
|
||||
Affected Rows: 9
|
||||
|
||||
-- SQLNESS SLEEP 4s
|
||||
SELECT
|
||||
count(DISTINCT window_start) >= 1 AS has_selected_window,
|
||||
min(value_count) AS min_value_count,
|
||||
max(value_count) AS max_value_count,
|
||||
bool_and(create_time = date_trunc('second', create_time)) AS all_create_time_at_second_boundary,
|
||||
bool_and(cur_ts = create_time) AS cur_ts_equals_create_time,
|
||||
bool_and(window_start = create_time - INTERVAL '1 second') AS all_windows_match_scheduled_previous_second
|
||||
FROM compat_flow_now_sink
|
||||
WHERE value_count > 0;
|
||||
|
||||
+---------------------+-----------------+-----------------+------------------------------------+---------------------------+---------------------------------------------+
|
||||
| has_selected_window | min_value_count | max_value_count | all_create_time_at_second_boundary | cur_ts_equals_create_time | all_windows_match_scheduled_previous_second |
|
||||
+---------------------+-----------------+-----------------+------------------------------------+---------------------------+---------------------------------------------+
|
||||
| true | 1 | 1 | true | true | true |
|
||||
+---------------------+-----------------+-----------------+------------------------------------+---------------------------+---------------------------------------------+
|
||||
|
||||
SELECT
|
||||
bool_and(value_count = 1) AS all_value_count_equals_one
|
||||
FROM compat_flow_now_sink
|
||||
WHERE value_count > 0;
|
||||
|
||||
+----------------------------+
|
||||
| all_value_count_equals_one |
|
||||
+----------------------------+
|
||||
| true |
|
||||
+----------------------------+
|
||||
@@ -0,0 +1,34 @@
|
||||
SELECT flow_name, source_table_names
|
||||
FROM information_schema.flows
|
||||
WHERE flow_name = 'compat_flow_scheduled_now';
|
||||
|
||||
SHOW CREATE FLOW compat_flow_scheduled_now;
|
||||
|
||||
SHOW CREATE TABLE compat_flow_now_sink;
|
||||
|
||||
INSERT INTO compat_flow_now_input VALUES
|
||||
(date_trunc('second', now()) - INTERVAL '1 second', 0.0),
|
||||
(date_trunc('second', now()), 1.0),
|
||||
(date_trunc('second', now()) + INTERVAL '1 second', 2.0),
|
||||
(date_trunc('second', now()) + INTERVAL '2 seconds', 3.0),
|
||||
(date_trunc('second', now()) + INTERVAL '3 seconds', 4.0),
|
||||
(date_trunc('second', now()) + INTERVAL '4 seconds', 5.0),
|
||||
(date_trunc('second', now()) + INTERVAL '5 seconds', 6.0),
|
||||
(date_trunc('second', now()) + INTERVAL '6 seconds', 7.0),
|
||||
(date_trunc('second', now()) + INTERVAL '7 seconds', 8.0);
|
||||
|
||||
-- SQLNESS SLEEP 4s
|
||||
SELECT
|
||||
count(DISTINCT window_start) >= 1 AS has_selected_window,
|
||||
min(value_count) AS min_value_count,
|
||||
max(value_count) AS max_value_count,
|
||||
bool_and(create_time = date_trunc('second', create_time)) AS all_create_time_at_second_boundary,
|
||||
bool_and(cur_ts = create_time) AS cur_ts_equals_create_time,
|
||||
bool_and(window_start = create_time - INTERVAL '1 second') AS all_windows_match_scheduled_previous_second
|
||||
FROM compat_flow_now_sink
|
||||
WHERE value_count > 0;
|
||||
|
||||
SELECT
|
||||
bool_and(value_count = 1) AS all_value_count_equals_one
|
||||
FROM compat_flow_now_sink
|
||||
WHERE value_count > 0;
|
||||
Reference in New Issue
Block a user