Files
greptimedb/tests/cases/standalone/common/flow/flow_rebuild.sql
discord9 a7da9af5de feat: use flow batching engine
broken: try using logical plan

fix: use dummy catalog for logical plan

fix: insert plan exec&sqlness grpc addr

feat: use frontend instance in flownode in standalone

feat: flow type in metasrv&fix: flush flow out of sync& column name alias

tests: sqlness update

tests: sqlness flow rebuild update

chore: per review

refactor: keep chnl mgr

refactor: use catalog mgr for get table

tests: use valid sql

fix: add more check

refactor: put flow type determine to frontend
2025-04-22 17:09:42 +08:00

353 lines
7.6 KiB
SQL

-- Case: basic flow lifecycle, no restart.
-- Creates an input table and a flow that sinks COUNT(*) of it into out_basic,
-- inserts two rows, flushes the flow, reads the sink, then drops everything.
-- Note: this is the only CREATE TABLE in this file that sets append_mode.
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
-- out_basic is never created explicitly in this file; presumably the flow
-- creates its sink table itself (TODO confirm)
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
-- both columns are supplied, so the CURRENT_TIMESTAMP default of ts is not used
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- the REPLACE directive below rewrites the numeric cell printed by
-- ADMIN FLUSH_FLOW into the fixed token FLOW_FLUSHED
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SELECT wildcard FROM out_basic;
-- cleanup: the input table is dropped before the flow that reads from it
DROP TABLE input_basic;
DROP TABLE out_basic;
DROP FLOW test_wildcard_basic;
-- combination of different order of rebuild input table/flow
-- Case: no restart. The input table is dropped and recreated while the flow
-- still exists, then the flow itself is dropped and recreated.
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- the REPLACE directive rewrites the numeric cell printed by
-- ADMIN FLUSH_FLOW into the fixed token FLOW_FLUSHED
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SELECT wildcard FROM out_basic;
-- step 2: rebuild only the input table; the flow and out_basic are kept
DROP TABLE input_basic;
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
-- this is expected to be the same as above ("2"), since the new `input_basic`
-- table has a different table id and is therefore a different table
SELECT wildcard FROM out_basic;
-- step 3: rebuild the flow on top of the recreated input table
DROP FLOW test_wildcard_basic;
-- recreate the flow so that it uses the new table id
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
-- rows 23 and 24 repeat the (number, ts) pairs inserted in step 2; 25 is new
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
-- the flow runs in batching mode; the input row count is queried right after
-- the sink, presumably so the two values can be compared
SELECT wildcard FROM out_basic;
SELECT count(*) FROM input_basic;
-- cleanup
DROP TABLE input_basic;
DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
-- Case: no restart. The flow and its sink table are dropped and recreated
-- while the input table (and the rows already in it) is kept.
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- the REPLACE directive rewrites the numeric cell printed by
-- ADMIN FLUSH_FLOW into the fixed token FLOW_FLUSHED
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SELECT wildcard FROM out_basic;
-- step 2: rebuild the flow and the sink table; input_basic is left untouched
DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
-- rows 23 and 24 repeat the (number, ts) pairs inserted above; 25 is new
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SELECT wildcard FROM out_basic;
-- cleanup
DROP TABLE input_basic;
DROP TABLE out_basic;
DROP FLOW test_wildcard_basic;
-- test again, this time with db restart
-- Case: basic flow lifecycle with a restart between creating the flow and
-- inserting data.
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
-- `SELECT 1` is a no-op query that only carries the restart directive;
-- presumably the runner restarts the database around it (TODO confirm)
-- SQLNESS ARG restart=true
SELECT 1;
-- the SLEEP directive is attached to the INSERT that follows it
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- the REPLACE directive rewrites the numeric cell printed by
-- ADMIN FLUSH_FLOW into the fixed token FLOW_FLUSHED
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SELECT wildcard FROM out_basic;
-- cleanup
DROP TABLE input_basic;
DROP TABLE out_basic;
DROP FLOW test_wildcard_basic;
-- combination of different order of rebuild input table/flow
-- Case: with restarts. The input table is dropped and recreated while the
-- flow still exists, then the flow is dropped and recreated; a restart
-- directive follows each (re)creation step.
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
-- `SELECT 1` is a no-op query that only carries the restart directive;
-- presumably the runner restarts the database around it (TODO confirm)
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- the REPLACE directive rewrites the numeric cell printed by
-- ADMIN FLUSH_FLOW into the fixed token FLOW_FLUSHED
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SELECT wildcard FROM out_basic;
-- step 2: rebuild only the input table; the flow and out_basic are kept
DROP TABLE input_basic;
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
-- SQLNESS ARG restart=true
SELECT 1;
-- three rows this time (23, 24, 26), unlike the two-row insert in step 1
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(26, "2021-07-01 00:00:02.000");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
-- this is expected to be the same as above ("2"), since the new `input_basic`
-- table has a different table id and is therefore a different table
SELECT wildcard FROM out_basic;
-- step 3: rebuild the flow on top of the recreated input table
DROP FLOW test_wildcard_basic;
-- recreate the flow so that it uses the new table id
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
-- give flownode a second to rebuild flow
-- NOTE(review): the wait itself is the SLEEP attached to the INSERT below;
-- no SLEEP precedes the final flush in this step, unlike steps 1 and 2
-- SQLNESS ARG restart=true
SELECT 1;
-- rows 23 and 24 repeat the (number, ts) pairs inserted in step 2; 25 is new
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
-- 4 is also expected here, since the flow runs in batching mode
SELECT wildcard FROM out_basic;
SELECT count(*) FROM input_basic;
-- cleanup
DROP TABLE input_basic;
DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
-- Case: with restarts. The flow and its sink table are dropped and recreated
-- while the input table (and the rows already in it) is kept.
CREATE TABLE input_basic (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
-- `SELECT 1` is a no-op query that only carries the restart directive;
-- presumably the runner restarts the database around it (TODO confirm)
-- SQLNESS ARG restart=true
SELECT 1;
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
-- give flownode a second to rebuild flow
-- SQLNESS SLEEP 3s
-- the REPLACE directive rewrites the numeric cell printed by
-- ADMIN FLUSH_FLOW into the fixed token FLOW_FLUSHED
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SELECT wildcard FROM out_basic;
-- step 2: rebuild the flow and the sink table; input_basic is left untouched
DROP FLOW test_wildcard_basic;
DROP TABLE out_basic;
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
-- SQLNESS ARG restart=true
SELECT 1;
-- rows 23 and 24 repeat the (number, ts) pairs inserted above; 25 is new
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
-- give flownode a second to rebuild flow
-- NOTE(review): no SLEEP directive follows the comment above, unlike the
-- first flush in this case; confirm whether a wait was intended here
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
SELECT wildcard FROM out_basic;
-- cleanup
DROP FLOW test_wildcard_basic;
DROP TABLE input_basic;
DROP TABLE out_basic;