mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 05:12:54 +00:00
fix: force streaming mode for instant source table (#6031)
* fix: force streaming mode for instant source table * tests: sqlness test&refactor: get table * refactor: per review
This commit is contained in:
@@ -8,7 +8,7 @@ CREATE TABLE distinct_basic (
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- should fail
|
||||
-- should fallback to streaming mode
|
||||
-- SQLNESS REPLACE id=\d+ id=REDACTED
|
||||
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
|
||||
SELECT
|
||||
@@ -16,9 +16,151 @@ SELECT
|
||||
FROM
|
||||
distinct_basic;
|
||||
|
||||
Error: 3001(EngineExecuteQuery), Unsupported: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval
|
||||
Affected Rows: 0
|
||||
|
||||
ALTER TABLE distinct_basic SET 'ttl' = '5s';
|
||||
-- flow_options should have a flow_type:streaming
|
||||
-- since source table's ttl=instant
|
||||
SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
+---------------------+---------------------------+
|
||||
| flow_name | options |
|
||||
+---------------------+---------------------------+
|
||||
| test_distinct_basic | {"flow_type":"streaming"} |
|
||||
+---------------------+---------------------------+
|
||||
|
||||
SHOW CREATE TABLE distinct_basic;
|
||||
|
||||
+----------------+-----------------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+----------------+-----------------------------------------------------------+
|
||||
| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( |
|
||||
| | "number" INT NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("number") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | ttl = 'instant' |
|
||||
| | ) |
|
||||
+----------------+-----------------------------------------------------------+
|
||||
|
||||
SHOW CREATE TABLE out_distinct_basic;
|
||||
|
||||
+--------------------+---------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+--------------------+---------------------------------------------------+
|
||||
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
|
||||
| | "dis" INT NULL, |
|
||||
| | "update_at" TIMESTAMP(3) NULL, |
|
||||
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("__ts_placeholder"), |
|
||||
| | PRIMARY KEY ("dis") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | |
|
||||
+--------------------+---------------------------------------------------+
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(22, "2021-07-01 00:00:00.600");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
+-----------------------------------------+
|
||||
| ADMIN FLUSH_FLOW('test_distinct_basic') |
|
||||
+-----------------------------------------+
|
||||
| FLOW_FLUSHED |
|
||||
+-----------------------------------------+
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
out_distinct_basic;
|
||||
|
||||
+-----+
|
||||
| dis |
|
||||
+-----+
|
||||
| 20 |
|
||||
| 22 |
|
||||
+-----+
|
||||
|
||||
SELECT number FROM distinct_basic;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
-- SQLNESS SLEEP 6s
|
||||
ADMIN FLUSH_TABLE('distinct_basic');
|
||||
|
||||
+-------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('distinct_basic') |
|
||||
+-------------------------------------+
|
||||
| 0 |
|
||||
+-------------------------------------+
|
||||
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
(23, "2021-07-01 00:00:01.600");
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
+-----------------------------------------+
|
||||
| ADMIN FLUSH_FLOW('test_distinct_basic') |
|
||||
+-----------------------------------------+
|
||||
| FLOW_FLUSHED |
|
||||
+-----------------------------------------+
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
out_distinct_basic;
|
||||
|
||||
+-----+
|
||||
| dis |
|
||||
+-----+
|
||||
| 20 |
|
||||
| 22 |
|
||||
| 23 |
|
||||
+-----+
|
||||
|
||||
SELECT number FROM distinct_basic;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
DROP FLOW test_distinct_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE distinct_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DROP TABLE out_distinct_basic;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- test ttl = 5s
|
||||
CREATE TABLE distinct_basic (
|
||||
number INT,
|
||||
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY(number),
|
||||
TIME INDEX(ts)
|
||||
)WITH ('ttl' = '5s');
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -30,6 +172,16 @@ FROM
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
-- flow_options should have a flow_type:batching
|
||||
-- since source table's ttl=instant
|
||||
SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
+---------------------+--------------------------+
|
||||
| flow_name | options |
|
||||
+---------------------+--------------------------+
|
||||
| test_distinct_basic | {"flow_type":"batching"} |
|
||||
+---------------------+--------------------------+
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
@@ -139,41 +291,6 @@ ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
| FLOW_FLUSHED |
|
||||
+-----------------------------------------+
|
||||
|
||||
SHOW CREATE TABLE distinct_basic;
|
||||
|
||||
+----------------+-----------------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+----------------+-----------------------------------------------------------+
|
||||
| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( |
|
||||
| | "number" INT NULL, |
|
||||
| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), |
|
||||
| | TIME INDEX ("ts"), |
|
||||
| | PRIMARY KEY ("number") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | WITH( |
|
||||
| | ttl = '5s' |
|
||||
| | ) |
|
||||
+----------------+-----------------------------------------------------------+
|
||||
|
||||
SHOW CREATE TABLE out_distinct_basic;
|
||||
|
||||
+--------------------+---------------------------------------------------+
|
||||
| Table | Create Table |
|
||||
+--------------------+---------------------------------------------------+
|
||||
| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( |
|
||||
| | "dis" INT NULL, |
|
||||
| | "update_at" TIMESTAMP(3) NULL, |
|
||||
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
|
||||
| | TIME INDEX ("__ts_placeholder"), |
|
||||
| | PRIMARY KEY ("dis") |
|
||||
| | ) |
|
||||
| | |
|
||||
| | ENGINE=mito |
|
||||
| | |
|
||||
+--------------------+---------------------------------------------------+
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
|
||||
@@ -6,7 +6,7 @@ CREATE TABLE distinct_basic (
|
||||
TIME INDEX(ts)
|
||||
)WITH ('ttl' = 'instant');
|
||||
|
||||
-- should fail
|
||||
-- should fallback to streaming mode
|
||||
-- SQLNESS REPLACE id=\d+ id=REDACTED
|
||||
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
|
||||
SELECT
|
||||
@@ -14,7 +14,61 @@ SELECT
|
||||
FROM
|
||||
distinct_basic;
|
||||
|
||||
ALTER TABLE distinct_basic SET 'ttl' = '5s';
|
||||
-- flow_options should have a flow_type:streaming
|
||||
-- since source table's ttl=instant
|
||||
SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
SHOW CREATE TABLE distinct_basic;
|
||||
|
||||
SHOW CREATE TABLE out_distinct_basic;
|
||||
|
||||
-- SQLNESS SLEEP 3s
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(20, "2021-07-01 00:00:00.200"),
|
||||
(22, "2021-07-01 00:00:00.600");
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
out_distinct_basic;
|
||||
|
||||
SELECT number FROM distinct_basic;
|
||||
|
||||
-- SQLNESS SLEEP 6s
|
||||
ADMIN FLUSH_TABLE('distinct_basic');
|
||||
|
||||
INSERT INTO
|
||||
distinct_basic
|
||||
VALUES
|
||||
(23, "2021-07-01 00:00:01.600");
|
||||
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
out_distinct_basic;
|
||||
|
||||
SELECT number FROM distinct_basic;
|
||||
|
||||
DROP FLOW test_distinct_basic;
|
||||
DROP TABLE distinct_basic;
|
||||
DROP TABLE out_distinct_basic;
|
||||
|
||||
-- test ttl = 5s
|
||||
CREATE TABLE distinct_basic (
|
||||
number INT,
|
||||
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY(number),
|
||||
TIME INDEX(ts)
|
||||
)WITH ('ttl' = '5s');
|
||||
|
||||
CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS
|
||||
SELECT
|
||||
@@ -22,6 +76,10 @@ SELECT
|
||||
FROM
|
||||
distinct_basic;
|
||||
|
||||
-- flow_options should have a flow_type:batching
|
||||
-- since source table's ttl=instant
|
||||
SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
-- SQLNESS ARG restart=true
|
||||
SELECT 1;
|
||||
|
||||
@@ -58,10 +116,6 @@ VALUES
|
||||
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
||||
ADMIN FLUSH_FLOW('test_distinct_basic');
|
||||
|
||||
SHOW CREATE TABLE distinct_basic;
|
||||
|
||||
SHOW CREATE TABLE out_distinct_basic;
|
||||
|
||||
SELECT
|
||||
dis
|
||||
FROM
|
||||
|
||||
Reference in New Issue
Block a user