Files
greptimedb/tests/cases/standalone/common/flow/flow_incremental_aggr.result
discord9 28fd796f4e fix(flow): harden incremental read correctness (#8196)
* fix(flow): harden incremental read correctness

Signed-off-by: discord9 <discord9@163.com>

* fix(flow): propagate dirty window options

Signed-off-by: discord9 <discord9@163.com>

* test: more

Signed-off-by: discord9 <discord9@163.com>

* chore: test config api

Signed-off-by: discord9 <discord9@163.com>

* refactor: split gen

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* fix: allowlist key

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-06-01 02:48:00 +00:00

150 lines
4.6 KiB
Plaintext

-- Incremental aggregate reads only support append-only source tables because
-- update/upsert sources need old-value compensation.
-- This source has no append_mode option, so it is not append-only.
CREATE TABLE incremental_non_append_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
);
Affected Rows: 0
-- Expected to be rejected: the flow enables experimental_enable_incremental_read
-- on the non-append-only source created above.
CREATE FLOW incremental_non_append_flow SINK TO incremental_non_append_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
incremental_non_append_input
GROUP BY
time_window;
Error: 3001(EngineExecuteQuery), Unsupported: Flow incremental read requires append-only source table, but source table `greptime.public.incremental_non_append_input` is not append-only. Consider setting append_mode='true' on the source table or disabling experimental_enable_incremental_read
-- Only the source table is dropped here. NOTE(review): this assumes the rejected
-- CREATE FLOW left no incremental_non_append_sink table behind — confirm.
DROP TABLE incremental_non_append_input;
Affected Rows: 0
-- Same schema as the rejected case, but append_mode = 'true', so the
-- incremental-read flow below is accepted.
CREATE TABLE incremental_aggr_input (
host_id INT,
n INT,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(host_id)
) WITH (
append_mode = 'true'
);
Affected Rows: 0
-- Per-window sum/min/max of n. date_bin buckets ts into 1-minute windows
-- aligned to 2024-01-01 00:00:00, and time_window is the grouping key.
CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
sum(n) AS total,
min(n) AS min_n,
max(n) AS max_n,
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
FROM
incremental_aggr_input
GROUP BY
time_window;
Affected Rows: 0
-- Two rows in the first window [00:00:00, 00:01:00). Positional columns: (host_id, n, ts).
INSERT INTO incremental_aggr_input VALUES
(1, 10, '2024-01-01 00:00:00'),
(2, 20, '2024-01-01 00:00:30');
Affected Rows: 2
-- Flush the flow so its output is visible in the sink. The REPLACE directive
-- rewrites the number FLUSH_FLOW returns to FLOW_FLUSHED, so the expected
-- output does not depend on that value.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
-- Expected: total = 10 + 20 = 30, min_n = 10, max_n = 20 in a single window.
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 30 | 10 | 20 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- Move already checkpointed source rows into SST. The next incremental run
-- must still read only the memtable delta and must not merge these old SST
-- rows again.
ADMIN FLUSH_TABLE('incremental_aggr_input');
+---------------------------------------------+
| ADMIN FLUSH_TABLE('incremental_aggr_input') |
+---------------------------------------------+
| 0 |
+---------------------------------------------+
-- Insert more rows into the same time window. An incremental-safe flow should
-- merge the delta aggregate with the existing sink aggregate state.
INSERT INTO incremental_aggr_input VALUES
(3, 30, '2024-01-01 00:00:15'),
(4, 40, '2024-01-01 00:00:45');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
-- Expected total = 30 (existing sink row) + 30 + 40 (delta) = 100.
-- Re-reading the flushed SST rows and merging them again would give 130.
-- Writing only the delta aggregate would give 70.
-- min_n = 10 must come from the earlier rows (the delta's minimum is 30).
-- max_n = 40 comes from the delta.
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 100 | 10 | 40 | 2024-01-01T00:00:00 |
+-------+-------+-------+---------------------+
-- Insert a row into a new time window to cover append of a new aggregate key.
INSERT INTO incremental_aggr_input VALUES
(5, 50, '2024-01-01 00:01:00');
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('incremental_aggr_flow');
+-------------------------------------------+
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
+-------------------------------------------+
| FLOW_FLUSHED |
+-------------------------------------------+
-- Expected: the 00:00 window keeps its previous values (100, 10, 40).
-- The new 00:01 window contains only the single row n = 50.
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
+-------+-------+-------+---------------------+
| total | min_n | max_n | time_window |
+-------+-------+-------+---------------------+
| 100 | 10 | 40 | 2024-01-01T00:00:00 |
| 50 | 50 | 50 | 2024-01-01T00:01:00 |
+-------+-------+-------+---------------------+
-- Teardown: drop the flow first, then its source and sink tables.
DROP FLOW incremental_aggr_flow;
Affected Rows: 0
DROP TABLE incremental_aggr_input;
Affected Rows: 0
DROP TABLE incremental_aggr_sink;
Affected Rows: 0