From 424e400c80df87bc86de87f332ba646c02e166af Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 3 Jul 2026 14:31:40 +0800 Subject: [PATCH] test: add flow scheduled now compat case (#8400) Signed-off-by: discord9 --- .../flow_scheduled_now_dist_plan/case.toml | 8 ++ .../flow_scheduled_now_dist_plan/setup.sql | 19 +++++ .../verify.result | 81 +++++++++++++++++++ .../flow_scheduled_now_dist_plan/verify.sql | 34 ++++++++ 4 files changed, 142 insertions(+) create mode 100644 tests/compatibility/cases/flow_scheduled_now_dist_plan/case.toml create mode 100644 tests/compatibility/cases/flow_scheduled_now_dist_plan/setup.sql create mode 100644 tests/compatibility/cases/flow_scheduled_now_dist_plan/verify.result create mode 100644 tests/compatibility/cases/flow_scheduled_now_dist_plan/verify.sql diff --git a/tests/compatibility/cases/flow_scheduled_now_dist_plan/case.toml b/tests/compatibility/cases/flow_scheduled_now_dist_plan/case.toml new file mode 100644 index 0000000000..cfab71575b --- /dev/null +++ b/tests/compatibility/cases/flow_scheduled_now_dist_plan/case.toml @@ -0,0 +1,8 @@ +name = "flow_scheduled_now_dist_plan" +reason = "Verify flow metadata containing now()/current_timestamp() created by old binaries binds scheduled logical time after upgrade." +introduced_by = "PR #8389" +topologies = ["distributed"] +from_range = ["<=v1.1.1"] +to_range = [">v1.1.1"] +features = ["flow", "table"] +owner = "flow" diff --git a/tests/compatibility/cases/flow_scheduled_now_dist_plan/setup.sql b/tests/compatibility/cases/flow_scheduled_now_dist_plan/setup.sql new file mode 100644 index 0000000000..d4d6c713ec --- /dev/null +++ b/tests/compatibility/cases/flow_scheduled_now_dist_plan/setup.sql @@ -0,0 +1,19 @@ +CREATE TABLE compat_flow_now_input ( + ts TIMESTAMP(3) TIME INDEX, + v DOUBLE, + PRIMARY KEY(v) +); + +CREATE FLOW compat_flow_scheduled_now +SINK TO compat_flow_now_sink +EVAL INTERVAL '1s' +AS +SELECT + date_bin(INTERVAL '1 second', ts) AS window_start, + count(v) AS value_count, + now() AS create_time, + current_timestamp() AS cur_ts +FROM compat_flow_now_input +WHERE ts >= date_trunc('second', now()) - INTERVAL '1 second' + AND ts < date_trunc('second', current_timestamp()) +GROUP BY date_bin(INTERVAL '1 second', ts); diff --git a/tests/compatibility/cases/flow_scheduled_now_dist_plan/verify.result b/tests/compatibility/cases/flow_scheduled_now_dist_plan/verify.result new file mode 100644 index 0000000000..0a30135b64 --- /dev/null +++ b/tests/compatibility/cases/flow_scheduled_now_dist_plan/verify.result @@ -0,0 +1,81 @@ +SELECT flow_name, source_table_names +FROM information_schema.flows +WHERE flow_name = 'compat_flow_scheduled_now'; + ++---------------------------+-------------------------------------------------------------+ +| flow_name | source_table_names | ++---------------------------+-------------------------------------------------------------+ +| compat_flow_scheduled_now | greptime.flow_scheduled_now_dist_plan.compat_flow_now_input | ++---------------------------+-------------------------------------------------------------+ + +SHOW CREATE FLOW compat_flow_scheduled_now; + ++---------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Flow | Create Flow | ++---------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| compat_flow_scheduled_now | CREATE FLOW IF NOT EXISTS compat_flow_scheduled_now | +| | SINK TO flow_scheduled_now_dist_plan.compat_flow_now_sink | +| | EVAL INTERVAL '1 s' | +| | AS SELECT date_bin(INTERVAL '1 second', ts) AS window_start, count(v) AS value_count, now() AS create_time, current_timestamp() AS cur_ts FROM compat_flow_now_input WHERE ts >= date_trunc('second', now()) - INTERVAL '1 second' AND ts < date_trunc('second', current_timestamp()) GROUP BY date_bin(INTERVAL '1 second', ts) | ++---------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +SHOW CREATE TABLE compat_flow_now_sink; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| compat_flow_now_sink | CREATE TABLE IF NOT EXISTS "compat_flow_now_sink" ( | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "value_count" BIGINT NULL, | +| | "create_time" TIMESTAMP(9) NULL, | +| | "cur_ts" TIMESTAMP(9) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++----------------------+-----------------------------------------------------+ + +INSERT INTO compat_flow_now_input VALUES + (date_trunc('second', now()) - INTERVAL '1 second', 0.0), + (date_trunc('second', now()), 1.0), + (date_trunc('second', now()) + INTERVAL '1 second', 2.0), + (date_trunc('second', now()) + INTERVAL '2 seconds', 3.0), + (date_trunc('second', now()) + INTERVAL '3 seconds', 4.0), + (date_trunc('second', now()) + INTERVAL '4 seconds', 5.0), + (date_trunc('second', now()) + INTERVAL '5 seconds', 6.0), + (date_trunc('second', now()) + INTERVAL '6 seconds', 7.0), + (date_trunc('second', now()) + INTERVAL '7 seconds', 8.0); + +Affected Rows: 9 + +-- SQLNESS SLEEP 4s +SELECT + count(DISTINCT window_start) >= 1 AS has_selected_window, + min(value_count) AS min_value_count, + max(value_count) AS max_value_count, + bool_and(create_time = date_trunc('second', create_time)) AS all_create_time_at_second_boundary, + bool_and(cur_ts = create_time) AS cur_ts_equals_create_time, + bool_and(window_start = create_time - INTERVAL '1 second') AS all_windows_match_scheduled_previous_second +FROM compat_flow_now_sink +WHERE value_count > 0; + ++---------------------+-----------------+-----------------+------------------------------------+---------------------------+---------------------------------------------+ +| has_selected_window | min_value_count | max_value_count | all_create_time_at_second_boundary | cur_ts_equals_create_time | all_windows_match_scheduled_previous_second | ++---------------------+-----------------+-----------------+------------------------------------+---------------------------+---------------------------------------------+ +| true | 1 | 1 | true | true | true | ++---------------------+-----------------+-----------------+------------------------------------+---------------------------+---------------------------------------------+ + +SELECT + bool_and(value_count = 1) AS all_value_count_equals_one +FROM compat_flow_now_sink +WHERE value_count > 0; + ++----------------------------+ +| all_value_count_equals_one | ++----------------------------+ +| true | ++----------------------------+ diff --git a/tests/compatibility/cases/flow_scheduled_now_dist_plan/verify.sql b/tests/compatibility/cases/flow_scheduled_now_dist_plan/verify.sql new file mode 100644 index 0000000000..9325681bc4 --- /dev/null +++ b/tests/compatibility/cases/flow_scheduled_now_dist_plan/verify.sql @@ -0,0 +1,34 @@ +SELECT flow_name, source_table_names +FROM information_schema.flows +WHERE flow_name = 'compat_flow_scheduled_now'; + +SHOW CREATE FLOW compat_flow_scheduled_now; + +SHOW CREATE TABLE compat_flow_now_sink; + +INSERT INTO compat_flow_now_input VALUES + (date_trunc('second', now()) - INTERVAL '1 second', 0.0), + (date_trunc('second', now()), 1.0), + (date_trunc('second', now()) + INTERVAL '1 second', 2.0), + (date_trunc('second', now()) + INTERVAL '2 seconds', 3.0), + (date_trunc('second', now()) + INTERVAL '3 seconds', 4.0), + (date_trunc('second', now()) + INTERVAL '4 seconds', 5.0), + (date_trunc('second', now()) + INTERVAL '5 seconds', 6.0), + (date_trunc('second', now()) + INTERVAL '6 seconds', 7.0), + (date_trunc('second', now()) + INTERVAL '7 seconds', 8.0); + +-- SQLNESS SLEEP 4s +SELECT + count(DISTINCT window_start) >= 1 AS has_selected_window, + min(value_count) AS min_value_count, + max(value_count) AS max_value_count, + bool_and(create_time = date_trunc('second', create_time)) AS all_create_time_at_second_boundary, + bool_and(cur_ts = create_time) AS cur_ts_equals_create_time, + bool_and(window_start = create_time - INTERVAL '1 second') AS all_windows_match_scheduled_previous_second +FROM compat_flow_now_sink +WHERE value_count > 0; + +SELECT + bool_and(value_count = 1) AS all_value_count_equals_one +FROM compat_flow_now_sink +WHERE value_count > 0;