mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-03 13:50:40 +00:00
* fix(flow): harden incremental read correctness Signed-off-by: discord9 <discord9@163.com> * fix(flow): propagate dirty window options Signed-off-by: discord9 <discord9@163.com> * test: more Signed-off-by: discord9 <discord9@163.com> * chore: test config api Signed-off-by: discord9 <discord9@163.com> * refactor: split gen Signed-off-by: discord9 <discord9@163.com> * chore: per review Signed-off-by: discord9 <discord9@163.com> * fix: allowlist key Signed-off-by: discord9 <discord9@163.com> --------- Signed-off-by: discord9 <discord9@163.com>
150 lines
4.6 KiB
Plaintext
150 lines
4.6 KiB
Plaintext
-- Negative case. Incremental aggregate reads only support append-only source tables because
|
|
-- update/upsert sources need old-value compensation. This table sets no append_mode option.
|
|
CREATE TABLE incremental_non_append_input (
|
|
host_id INT,
|
|
n INT,
|
|
ts TIMESTAMP TIME INDEX,
|
|
PRIMARY KEY(host_id)
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
-- Expected to be rejected: the flow enables experimental_enable_incremental_read while its
-- source table was created without append_mode = 'true' (see the error recorded after it).
CREATE FLOW incremental_non_append_flow SINK TO incremental_non_append_sink
|
|
WITH (experimental_enable_incremental_read = 'true')
|
|
AS
|
|
SELECT
|
|
sum(n) AS total,
|
|
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
|
|
FROM
|
|
incremental_non_append_input
|
|
GROUP BY
|
|
time_window;
|
|
|
|
Error: 3001(EngineExecuteQuery), Unsupported: Flow incremental read requires append-only source table, but source table `greptime.public.incremental_non_append_input` is not append-only. Consider setting append_mode='true' on the source table or disabling experimental_enable_incremental_read
|
|
|
|
-- Clean up the negative case. Only the source table is dropped here: the CREATE FLOW
-- statement errored, and this file drops no flow or sink table for this case.
DROP TABLE incremental_non_append_input;
|
|
|
|
Affected Rows: 0
|
|
|
|
-- Positive case: same columns and primary key as the non-append table, but created
-- with append_mode = 'true' so it can serve as an incremental-read flow source.
CREATE TABLE incremental_aggr_input (
|
|
host_id INT,
|
|
n INT,
|
|
ts TIMESTAMP TIME INDEX,
|
|
PRIMARY KEY(host_id)
|
|
) WITH (
|
|
append_mode = 'true'
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
-- Incremental-read flow writing sum/min/max of n per 1-minute window (bins anchored at
-- 2024-01-01 00:00:00) from incremental_aggr_input into incremental_aggr_sink.
CREATE FLOW incremental_aggr_flow SINK TO incremental_aggr_sink
|
|
WITH (experimental_enable_incremental_read = 'true')
|
|
AS
|
|
SELECT
|
|
sum(n) AS total,
|
|
min(n) AS min_n,
|
|
max(n) AS max_n,
|
|
date_bin(INTERVAL '1 minute', ts, '2024-01-01 00:00:00') AS time_window
|
|
FROM
|
|
incremental_aggr_input
|
|
GROUP BY
|
|
time_window;
|
|
|
|
Affected Rows: 0
|
|
|
|
-- Seed two rows (n = 10 and n = 20) that both fall into the 00:00:00 one-minute window.
-- Values are positional: host_id, n, ts.
INSERT INTO incremental_aggr_input VALUES
|
|
(1, 10, '2024-01-01 00:00:00'),
|
|
(2, 20, '2024-01-01 00:00:30');
|
|
|
|
Affected Rows: 2
|
|
|
|
-- First flush of the flow. The REPLACE directive below rewrites the numeric result cell
-- to FLOW_FLUSHED, so the recorded output does not depend on the number FLUSH_FLOW returns.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('incremental_aggr_flow');
|
|
|
|
+-------------------------------------------+
|
|
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
|
|
+-------------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-------------------------------------------+
|
|
|
|
-- Both seed rows share one window, so one sink row is expected: total 30, min 10, max 20.
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
|
|
|
|
+-------+-------+-------+---------------------+
|
|
| total | min_n | max_n | time_window |
|
|
+-------+-------+-------+---------------------+
|
|
| 30 | 10 | 20 | 2024-01-01T00:00:00 |
|
|
+-------+-------+-------+---------------------+
|
|
|
|
-- Move already checkpointed source rows into SST. The next incremental run
|
|
-- must still read only the memtable delta and must not merge these old SST
|
|
-- rows again; the later expected total of 100 (30 already in the sink + 30 + 40) checks this.
|
|
ADMIN FLUSH_TABLE('incremental_aggr_input');
|
|
|
|
+---------------------------------------------+
|
|
| ADMIN FLUSH_TABLE('incremental_aggr_input') |
|
|
+---------------------------------------------+
|
|
| 0 |
|
|
+---------------------------------------------+
|
|
|
|
-- Insert more rows (n = 30 and n = 40) into the same 00:00:00 time window. An incremental-safe
|
|
-- flow should merge the delta aggregate with the existing sink aggregate state.
|
|
INSERT INTO incremental_aggr_input VALUES
|
|
(3, 30, '2024-01-01 00:00:15'),
|
|
(4, 40, '2024-01-01 00:00:45');
|
|
|
|
Affected Rows: 2
|
|
|
|
-- Second flush, issued after the table flush and the same-window insert. The REPLACE
-- directive masks the numeric result cell as FLOW_FLUSHED.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('incremental_aggr_flow');
|
|
|
|
+-------------------------------------------+
|
|
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
|
|
+-------------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-------------------------------------------+
|
|
|
|
-- Still a single row for the 00:00:00 window, now covering all four inserted rows:
-- total 10 + 20 + 30 + 40 = 100, min 10, max 40.
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
|
|
|
|
+-------+-------+-------+---------------------+
|
|
| total | min_n | max_n | time_window |
|
|
+-------+-------+-------+---------------------+
|
|
| 100 | 10 | 40 | 2024-01-01T00:00:00 |
|
|
+-------+-------+-------+---------------------+
|
|
|
|
-- Insert a row into a new time window (00:01:00 starts the next 1-minute bin) to cover
-- append of a new aggregate key.
|
|
INSERT INTO incremental_aggr_input VALUES
|
|
(5, 50, '2024-01-01 00:01:00');
|
|
|
|
Affected Rows: 1
|
|
|
|
-- Third flush, issued after the insert into the new window. The REPLACE directive masks
-- the numeric result cell as FLOW_FLUSHED.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('incremental_aggr_flow');
|
|
|
|
+-------------------------------------------+
|
|
| ADMIN FLUSH_FLOW('incremental_aggr_flow') |
|
|
+-------------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-------------------------------------------+
|
|
|
|
-- The 00:00:00 row is expected unchanged (100 / 10 / 40), plus a new row for the
-- 00:01:00 window whose total, min and max are all 50.
SELECT total, min_n, max_n, time_window FROM incremental_aggr_sink ORDER BY time_window;
|
|
|
|
+-------+-------+-------+---------------------+
|
|
| total | min_n | max_n | time_window |
|
|
+-------+-------+-------+---------------------+
|
|
| 100 | 10 | 40 | 2024-01-01T00:00:00 |
|
|
| 50 | 50 | 50 | 2024-01-01T00:01:00 |
|
|
+-------+-------+-------+---------------------+
|
|
|
|
-- Clean up the positive case: drop the flow first, then its source table and sink table.
DROP FLOW incremental_aggr_flow;
|
|
|
|
Affected Rows: 0
|
|
|
|
DROP TABLE incremental_aggr_input;
|
|
|
|
Affected Rows: 0
|
|
|
|
DROP TABLE incremental_aggr_sink;
|
|
|
|
Affected Rows: 0
|
|
|