-- Checks SHOW FLOWS / SHOW CREATE FLOW / INFORMATION_SCHEMA.FLOWS output, and
-- how CREATE FLOW behaves for each combination of
-- (flow exists, `OR REPLACE`, `IF NOT EXISTS`).
CREATE TABLE numbers_input_show (
    "number" INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

Affected Rows: 0

create table out_num_cnt_show (
    "number" INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
    PRIMARY KEY(number),
);

Affected Rows: 0

-- no flow has been created yet, so both listings are empty
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

++
++

SHOW FLOWS LIKE 'filter_numbers_show';

++
++

CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number
FROM
    numbers_input_show
where
    number > 10;

Affected Rows: 0

SHOW CREATE FLOW filter_numbers_show;

+---------------------+------------------------------------------------------------+
| Flow                | Create Flow                                                |
+---------------------+------------------------------------------------------------+
| filter_numbers_show | CREATE FLOW IF NOT EXISTS filter_numbers_show              |
|                     | SINK TO public.out_num_cnt_show                            |
|                     | AS SELECT number FROM numbers_input_show WHERE number > 10 |
+---------------------+------------------------------------------------------------+

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                            | source_table_names                 |
+---------------------+---------------+------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show              | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                            |                                    |
|                     |               | AS SELECT number FROM numbers_input_show WHERE number > 10 |                                    |
+---------------------+---------------+------------------------------------------------------------+------------------------------------+

SHOW FLOWS LIKE 'filter_numbers_show';

+---------------------+
| Flows               |
+---------------------+
| filter_numbers_show |
+---------------------+

drop flow filter_numbers_show;

Affected Rows: 0

-- after the drop the flow must disappear from both listings
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

++
++

SHOW FLOWS LIKE 'filter_numbers_show';

++
++

-- also test `CREATE OR REPLACE` and `IF NOT EXISTS`
-- (flow exists, replace, if not exists)=(false, false, false)
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number,
    ts
FROM
    numbers_input_show
where
    number > 10;

Affected Rows: 0

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                                | source_table_names                 |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show                  | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                                |                                    |
|                     |               | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 |                                    |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+

-- this one should error out
-- (flow exists, replace, if not exists)=(true, false, false)
CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number,
    ts
FROM
    numbers_input_show
where
    number > 15;

Error: 8000(FlowAlreadyExists), Flow already exists: greptime.filter_numbers_show

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                                | source_table_names                 |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show                  | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                                |                                    |
|                     |               | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 |                                    |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+

-- make sure it's not replaced in flownode
INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);

Affected Rows: 3

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

SELECT number, ts FROM out_num_cnt_show;

+--------+-------------------------+
| number | ts                      |
+--------+-------------------------+
| 15     | 1970-01-01T00:00:00.001 |
| 16     | 1970-01-01T00:00:00.002 |
+--------+-------------------------+

-- after this one, the flow SHOULD NOT be replaced
-- (flow exists, replace, if not exists)=(true, false, true)
CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number,
    ts
FROM
    numbers_input_show
where
    number > 5;

Affected Rows: 0

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                                | source_table_names                 |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show                  | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                                |                                    |
|                     |               | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 |                                    |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+

-- make sure it's not replaced in flownode
INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4);

Affected Rows: 4

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

SELECT number, ts FROM out_num_cnt_show;

+--------+-------------------------+
| number | ts                      |
+--------+-------------------------+
| 11     | 1970-01-01T00:00:00.004 |
| 15     | 1970-01-01T00:00:00.001 |
| 16     | 1970-01-01T00:00:00.002 |
+--------+-------------------------+

-- after this, the flow SHOULD be replaced
-- (flow exists, replace, if not exists)=(true, true, false)
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number,
    ts
FROM
    numbers_input_show
where
    number > 3;

Affected Rows: 0

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                               | source_table_names                 |
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show                 | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                               |                                    |
|                     |               | AS SELECT number, ts FROM numbers_input_show WHERE number > 3 |                                    |
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+

-- make sure it's replaced in flownode
INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4);

Affected Rows: 4

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

SELECT number, ts FROM out_num_cnt_show;

+--------+-------------------------+
| number | ts                      |
+--------+-------------------------+
| 4      | 1970-01-01T00:00:00.002 |
| 10     | 1970-01-01T00:00:00.003 |
| 11     | 1970-01-01T00:00:00.004 |
| 15     | 1970-01-01T00:00:00.001 |
| 16     | 1970-01-01T00:00:00.002 |
+--------+-------------------------+

-- after this, the flow SHOULD error out since having both `replace` and `if not exists`
-- (flow exists, replace, if not exists)=(true, true, true)
CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number,
    ts
FROM
    numbers_input_show
where
    number > 0;

Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE`

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                               | source_table_names                 |
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show                 | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                               |                                    |
|                     |               | AS SELECT number, ts FROM numbers_input_show WHERE number > 3 |                                    |
+---------------------+---------------+---------------------------------------------------------------+------------------------------------+

DROP FLOW filter_numbers_show;

Affected Rows: 0

-- (flow exists, replace, if not exists)=(false, true, true)
CREATE OR REPLACE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number,
    ts
FROM
    numbers_input_show
where
    number > -1;

Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EXISTS` and `OR REPLACE`

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

++
++

-- the rejected statement above must not have created anything, so this drop fails
DROP FLOW filter_numbers_show;

Error: 8001(FlowNotFound), Flow not found: greptime.filter_numbers_show

-- following always create since didn't exist
-- (flow exists, replace, if not exists)=(false, true, false)
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number,
    ts
FROM
    numbers_input_show
where
    number > -2;

Affected Rows: 0

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                                | source_table_names                 |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show                  | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                                |                                    |
|                     |               | AS SELECT number, ts FROM numbers_input_show WHERE number > -2 |                                    |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+

DROP FLOW filter_numbers_show;

Affected Rows: 0

-- `IF NOT EXISTS` on a missing flow: the flow is created
-- (flow exists, replace, if not exists)=(false, false, true)
CREATE FLOW IF NOT EXISTS filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number,
    ts
FROM
    numbers_input_show
where
    number > -3;

Affected Rows: 0

SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                                | source_table_names                 |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show                  | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                                |                                    |
|                     |               | AS SELECT number, ts FROM numbers_input_show WHERE number > -3 |                                    |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+

-- make sure after recover should be the same
-- SQLNESS ARG restart=true
SELECT 1;

+----------+
| Int64(1) |
+----------+
| 1        |
+----------+

-- SQLNESS SLEEP 3s
SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| flow_name           | table_catalog | flow_definition                                                | source_table_names                 |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+
| filter_numbers_show | greptime      | CREATE FLOW IF NOT EXISTS filter_numbers_show                  | greptime.public.numbers_input_show |
|                     |               | SINK TO public.out_num_cnt_show                                |                                    |
|                     |               | AS SELECT number, ts FROM numbers_input_show WHERE number > -3 |                                    |
+---------------------+---------------+----------------------------------------------------------------+------------------------------------+

SELECT * FROM out_num_cnt_show;

+--------+-------------------------+
| number | ts                      |
+--------+-------------------------+
| 4      | 1970-01-01T00:00:00.002 |
| 10     | 1970-01-01T00:00:00.003 |
| 11     | 1970-01-01T00:00:00.004 |
| 15     | 1970-01-01T00:00:00.001 |
| 16     | 1970-01-01T00:00:00.002 |
+--------+-------------------------+

-- only rows with number > -3 should reach the sink after the restart
INSERT INTO numbers_input_show VALUES(-4,0), (-3,1), (-2,2), (-1,3);

Affected Rows: 4

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

SELECT * FROM out_num_cnt_show;

+--------+-------------------------+
| number | ts                      |
+--------+-------------------------+
| -2     | 1970-01-01T00:00:00.002 |
| -1     | 1970-01-01T00:00:00.003 |
| 4      | 1970-01-01T00:00:00.002 |
| 10     | 1970-01-01T00:00:00.003 |
| 11     | 1970-01-01T00:00:00.004 |
| 15     | 1970-01-01T00:00:00.001 |
| 16     | 1970-01-01T00:00:00.002 |
+--------+-------------------------+

DROP FLOW filter_numbers_show;

Affected Rows: 0

drop table out_num_cnt_show;

Affected Rows: 0

drop table numbers_input_show;

Affected Rows: 0

-- second scenario: a rejected `CREATE OR REPLACE` must leave the existing flow in place
CREATE TABLE numbers_input_show (
    "number" INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(number),
    TIME INDEX(ts)
);

Affected Rows: 0

create table out_num_cnt_show (
    "number" INT,
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX,
    PRIMARY KEY(number),
);

Affected Rows: 0

CREATE FLOW filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number as n1
FROM
    numbers_input_show
where
    number > 10;

Affected Rows: 0

INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2);

Affected Rows: 3

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

SELECT number FROM out_num_cnt_show;

+--------+
| number |
+--------+
| 15     |
| 16     |
+--------+

-- should mismatch, hence the old flow remains
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number AS n1,
    number AS n2
FROM
    numbers_input_show
where
    number > 15;

Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)

-- should mismatch, hence the old flow remains
CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS
SELECT
    number AS n1,
    number AS n2,
    number AS n3
FROM
    numbers_input_show
where
    number > 15;

Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)

SELECT flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';

+------------------------------------------------------------------+------------------------------------+
| flow_definition                                                  | source_table_names                 |
+------------------------------------------------------------------+------------------------------------+
| CREATE FLOW IF NOT EXISTS filter_numbers_show                    | greptime.public.numbers_input_show |
| SINK TO public.out_num_cnt_show                                  |                                    |
| AS SELECT number AS n1 FROM numbers_input_show WHERE number > 10 |                                    |
+------------------------------------------------------------------+------------------------------------+

INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3);

Affected Rows: 4

-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('filter_numbers_show');

+-----------------------------------------+
| ADMIN FLUSH_FLOW('filter_numbers_show') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+

-- sink table shows new 11 since old flow remains
SELECT number FROM out_num_cnt_show;

+--------+
| number |
+--------+
| 11     |
| 15     |
| 15     |
| 16     |
| 18     |
+--------+

DROP FLOW filter_numbers_show;

Affected Rows: 0

drop table out_num_cnt_show;

Affected Rows: 0

drop table numbers_input_show;

Affected Rows: 0