mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 12:30:38 +00:00
* chore: upgrade DataFusion family Signed-off-by: luofucong <luofc@foxmail.com> * chore: switch to released version of datafusion-pg-catalog --------- Signed-off-by: luofucong <luofc@foxmail.com> Co-authored-by: Ning Sun <sunning@greptime.com> Co-authored-by: Ning Sun <sunng@protonmail.com>
453 lines
21 KiB
Plaintext
453 lines
21 KiB
Plaintext
CREATE TABLE numbers_input_show (
|
|
"number" INT,
|
|
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY(number),
|
|
TIME INDEX(ts)
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
create table out_num_cnt_show (
|
|
"number" INT,
|
|
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
|
|
PRIMARY KEY(number),
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
++
|
|
++
|
|
|
|
SHOW FLOWS LIKE 'filter_numbers_show';
|
|
|
|
++
|
|
++
|
|
|
|
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number FROM numbers_input_show where number > 10;
|
|
|
|
Affected Rows: 0
|
|
|
|
SHOW CREATE FLOW filter_numbers_show;
|
|
|
|
+---------------------+------------------------------------------------------------+
|
|
| Flow | Create Flow |
|
|
+---------------------+------------------------------------------------------------+
|
|
| filter_numbers_show | CREATE FLOW IF NOT EXISTS filter_numbers_show |
|
|
| | SINK TO public.out_num_cnt_show |
|
|
| | AS SELECT number FROM numbers_input_show WHERE number > 10 |
|
|
+---------------------+------------------------------------------------------------+
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number FROM numbers_input_show WHERE number > 10 | |
|
|
+---------------------+---------------+------------------------------------------------------------+------------------------------------+
|
|
|
|
SHOW FLOWS LIKE 'filter_numbers_show';
|
|
|
|
+---------------------+
|
|
| Flows |
|
|
+---------------------+
|
|
| filter_numbers_show |
|
|
+---------------------+
|
|
|
|
drop flow filter_numbers_show;
|
|
|
|
Affected Rows: 0
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
++
|
|
++
|
|
|
|
SHOW FLOWS LIKE 'filter_numbers_show';
|
|
|
|
++
|
|
++
|
|
|
|
-- also test `CREATE OR REPLACE` and `IF NOT EXISTS`
|
|
-- (flow exists, replace, if not exists)=(false, false, false)
|
|
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 10;
|
|
|
|
Affected Rows: 0
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 | |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
|
|
-- this one should error out
|
|
-- (flow exists, replace, if not exists)=(true, false, false)
|
|
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 15;
|
|
|
|
Error: 8000(FlowAlreadyExists), Flow already exists: greptime.filter_numbers_show
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 | |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
|
|
-- make sure it's not replaced in flownode
|
|
INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);
|
|
|
|
Affected Rows: 3
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('filter_numbers_show');
|
|
|
|
+-----------------------------------------+
|
|
| ADMIN FLUSH_FLOW('filter_numbers_show') |
|
|
+-----------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-----------------------------------------+
|
|
|
|
-- explicit ORDER BY keeps the expected output deterministic
SELECT number, ts FROM out_num_cnt_show ORDER BY number, ts;
|
|
|
|
+--------+-------------------------+
|
|
| number | ts |
|
|
+--------+-------------------------+
|
|
| 15 | 1970-01-01T00:00:00.001 |
|
|
| 16 | 1970-01-01T00:00:00.002 |
|
|
+--------+-------------------------+
|
|
|
|
-- after this one, the flow SHOULD NOT be replaced
|
|
-- (flow exists, replace, if not exists)=(true, false, true)
|
|
CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 5;
|
|
|
|
Affected Rows: 0
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 | |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
|
|
-- make sure it's not replaced in flownode
|
|
INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4);
|
|
|
|
Affected Rows: 4
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('filter_numbers_show');
|
|
|
|
+-----------------------------------------+
|
|
| ADMIN FLUSH_FLOW('filter_numbers_show') |
|
|
+-----------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-----------------------------------------+
|
|
|
|
-- explicit ORDER BY keeps the expected output deterministic
SELECT number, ts FROM out_num_cnt_show ORDER BY number, ts;
|
|
|
|
+--------+-------------------------+
|
|
| number | ts |
|
|
+--------+-------------------------+
|
|
| 11 | 1970-01-01T00:00:00.004 |
|
|
| 15 | 1970-01-01T00:00:00.001 |
|
|
| 16 | 1970-01-01T00:00:00.002 |
|
|
+--------+-------------------------+
|
|
|
|
-- after this, the flow SHOULD be replaced
|
|
-- (flow exists, replace, if not exists)=(true, true, false)
|
|
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 3;
|
|
|
|
Affected Rows: 0
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 3 | |
|
|
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
|
|
|
|
-- make sure it's replaced in flownode
|
|
INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4);
|
|
|
|
Affected Rows: 4
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('filter_numbers_show');
|
|
|
|
+-----------------------------------------+
|
|
| ADMIN FLUSH_FLOW('filter_numbers_show') |
|
|
+-----------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-----------------------------------------+
|
|
|
|
-- explicit ORDER BY keeps the expected output deterministic
SELECT number, ts FROM out_num_cnt_show ORDER BY number, ts;
|
|
|
|
+--------+-------------------------+
|
|
| number | ts |
|
|
+--------+-------------------------+
|
|
| 4 | 1970-01-01T00:00:00.002 |
|
|
| 10 | 1970-01-01T00:00:00.003 |
|
|
| 11 | 1970-01-01T00:00:00.004 |
|
|
| 15 | 1970-01-01T00:00:00.001 |
|
|
| 16 | 1970-01-01T00:00:00.002 |
|
|
+--------+-------------------------+
|
|
|
|
-- this one SHOULD error out, since it specifies both `replace` and `if not exists`
|
|
-- (flow exists, replace, if not exists)=(true, true, true)
|
|
CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > 0;
|
|
|
|
Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE`
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 3 | |
|
|
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
|
|
|
|
DROP FLOW filter_numbers_show;
|
|
|
|
Affected Rows: 0
|
|
|
|
-- (flow exists, replace, if not exists)=(false, true, true)
|
|
CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -1;
|
|
|
|
Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE`
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
++
|
|
++
|
|
|
|
DROP FLOW filter_numbers_show;
|
|
|
|
Error: 8001(FlowNotFound), Flow not found: greptime.filter_numbers_show
|
|
|
|
-- the following always creates the flow, since it doesn't exist yet
|
|
-- (flow exists, replace, if not exists)=(false, true, false)
|
|
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -2;
|
|
|
|
Affected Rows: 0
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number, ts FROM numbers_input_show WHERE number > -2 | |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
|
|
DROP FLOW filter_numbers_show;
|
|
|
|
Affected Rows: 0
|
|
|
|
-- (flow exists, replace, if not exists)=(false, false, true)
-- the flow was dropped above, so `IF NOT EXISTS` has to create it
|
|
CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS SELECT number, ts FROM numbers_input_show where number > -3;
|
|
|
|
Affected Rows: 0
|
|
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number, ts FROM numbers_input_show WHERE number > -3 | |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
|
|
-- make sure the flow is still the same after recovery
|
|
-- SQLNESS ARG restart=true
|
|
SELECT 1;
|
|
|
|
+----------+
|
|
| Int64(1) |
|
|
+----------+
|
|
| 1 |
|
|
+----------+
|
|
|
|
-- SQLNESS SLEEP 3s
|
|
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| flow_name | table_catalog | flow_definition | source_table_names |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| | | SINK TO public.out_num_cnt_show | |
|
|
| | | AS SELECT number, ts FROM numbers_input_show WHERE number > -3 | |
|
|
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
|
|
|
|
-- explicit column list and ORDER BY keep the expected output stable
SELECT number, ts FROM out_num_cnt_show ORDER BY number, ts;
|
|
|
|
+--------+-------------------------+
|
|
| number | ts |
|
|
+--------+-------------------------+
|
|
| 4 | 1970-01-01T00:00:00.002 |
|
|
| 10 | 1970-01-01T00:00:00.003 |
|
|
| 11 | 1970-01-01T00:00:00.004 |
|
|
| 15 | 1970-01-01T00:00:00.001 |
|
|
| 16 | 1970-01-01T00:00:00.002 |
|
|
+--------+-------------------------+
|
|
|
|
INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3);
|
|
|
|
Affected Rows: 4
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('filter_numbers_show');
|
|
|
|
+-----------------------------------------+
|
|
| ADMIN FLUSH_FLOW('filter_numbers_show') |
|
|
+-----------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-----------------------------------------+
|
|
|
|
-- explicit column list and ORDER BY keep the expected output stable
SELECT number, ts FROM out_num_cnt_show ORDER BY number, ts;
|
|
|
|
+--------+-------------------------+
|
|
| number | ts |
|
|
+--------+-------------------------+
|
|
| -2 | 1970-01-01T00:00:00.002 |
|
|
| -1 | 1970-01-01T00:00:00.003 |
|
|
| 4 | 1970-01-01T00:00:00.002 |
|
|
| 10 | 1970-01-01T00:00:00.003 |
|
|
| 11 | 1970-01-01T00:00:00.004 |
|
|
| 15 | 1970-01-01T00:00:00.001 |
|
|
| 16 | 1970-01-01T00:00:00.002 |
|
|
+--------+-------------------------+
|
|
|
|
DROP FLOW filter_numbers_show;
|
|
|
|
Affected Rows: 0
|
|
|
|
drop table out_num_cnt_show;
|
|
|
|
Affected Rows: 0
|
|
|
|
drop table numbers_input_show;
|
|
|
|
Affected Rows: 0
|
|
|
|
CREATE TABLE numbers_input_show (
|
|
"number" INT,
|
|
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY(number),
|
|
TIME INDEX(ts)
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
create table out_num_cnt_show (
|
|
"number" INT,
|
|
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
|
|
PRIMARY KEY(number),
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number as n1 FROM numbers_input_show where number > 10;
|
|
|
|
Affected Rows: 0
|
|
|
|
INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);
|
|
|
|
Affected Rows: 3
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('filter_numbers_show');
|
|
|
|
+-----------------------------------------+
|
|
| ADMIN FLUSH_FLOW('filter_numbers_show') |
|
|
+-----------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-----------------------------------------+
|
|
|
|
-- explicit ORDER BY keeps the expected output deterministic
SELECT number FROM out_num_cnt_show ORDER BY number;
|
|
|
|
+--------+
|
|
| number |
|
|
+--------+
|
|
| 15 |
|
|
| 16 |
|
|
+--------+
|
|
|
|
-- should mismatch, hence the old flow remains
|
|
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
|
|
|
|
Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
|
|
|
|
-- should mismatch, hence the old flow remains
|
|
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
|
|
|
|
Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
|
|
|
|
SELECT flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
|
|
|
|
+------------------------------------------------------------------+------------------------------------+
|
|
| flow_definition | source_table_names |
|
|
+------------------------------------------------------------------+------------------------------------+
|
|
| CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show |
|
|
| SINK TO public.out_num_cnt_show | |
|
|
| AS SELECT number AS n1 FROM numbers_input_show WHERE number > 10 | |
|
|
+------------------------------------------------------------------+------------------------------------+
|
|
|
|
INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3);
|
|
|
|
Affected Rows: 4
|
|
|
|
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
|
|
ADMIN FLUSH_FLOW('filter_numbers_show');
|
|
|
|
+-----------------------------------------+
|
|
| ADMIN FLUSH_FLOW('filter_numbers_show') |
|
|
+-----------------------------------------+
|
|
| FLOW_FLUSHED |
|
|
+-----------------------------------------+
|
|
|
|
-- sink table shows new 11 since old flow remains
|
|
-- explicit ORDER BY keeps the expected output deterministic
SELECT number FROM out_num_cnt_show ORDER BY number;
|
|
|
|
+--------+
|
|
| number |
|
|
+--------+
|
|
| 11 |
|
|
| 15 |
|
|
| 15 |
|
|
| 16 |
|
|
| 18 |
|
|
+--------+
|
|
|
|
DROP FLOW filter_numbers_show;
|
|
|
|
Affected Rows: 0
|
|
|
|
drop table out_num_cnt_show;
|
|
|
|
Affected Rows: 0
|
|
|
|
drop table numbers_input_show;
|
|
|
|
Affected Rows: 0
|
|
|