Files
greptimedb/tests/cases/standalone/common/flow/flow_rebuild.result
LFC b2074e3863 chore: upgrade DataFusion family, again (#7578)
* chore: upgrade DataFusion family

Signed-off-by: luofucong <luofc@foxmail.com>

* chore: switch to released version of datafusion-pg-catalog

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Co-authored-by: Ning Sun <sunning@greptime.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
2026-03-03 07:36:39 +00:00

782 lines
15 KiB
Plaintext

-- Scenario: basic flow lifecycle on an append-mode input table.
-- Create the input table and a COUNT(*) flow, insert two rows, flush the flow,
-- check the sink table, then drop the input table, the sink table and the flow.
CREATE TABLE input_basic (
"number" INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
)WITH(
append_mode = 'true'
);
Affected Rows: 0
-- The flow writes COUNT(*) of input_basic into the sink table out_basic.
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- The REPLACE directive below rewrites the numeric FLUSH_FLOW result to FLOW_FLUSHED
-- (presumably because that number is not stable between runs -- confirm).
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- Two rows were inserted, so the sink is expected to hold 2.
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
-- Clean up so the next scenario can reuse the same object names.
DROP TABLE input_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
DROP FLOW test_wildcard_basic;
Affected Rows: 0
-- Scenario: rebuild the input table and the flow in different orders.
-- Step 1: create the input table (no append_mode option this time) and the flow,
-- insert two rows, flush, and check the sink.
CREATE TABLE input_basic (
"number" INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
-- Drop only the input table; the flow and its sink table are left in place.
DROP TABLE input_basic;
Affected Rows: 0
-- Step 2: recreate input_basic with the same definition while the flow created
-- before the drop still exists, then insert and flush again.
CREATE TABLE input_basic (
"number" INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- This is expected to be the same as above ("2"), since the new `input_basic` table
-- has a different table id and is therefore a different table.
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
-- Drop the flow that was created against the old input table.
DROP FLOW test_wildcard_basic;
Affected Rows: 0
-- Step 3: recreate the flow so that it uses the new table id.
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
-- Rows 23 and 24 repeat the values already inserted into the recreated table; 25 is new.
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- Flow batching mode: the sink value matches count(*) of input_basic checked right after.
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 3 |
+----------+
SELECT count(*) FROM input_basic;
+----------+
| count(*) |
+----------+
| 3 |
+----------+
-- Clean up before the next scenario.
DROP TABLE input_basic;
Affected Rows: 0
DROP FLOW test_wildcard_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
-- Scenario: drop and recreate the flow and its sink table while keeping the input table.
-- Step 1: create the table and the flow, insert two rows, flush, check the sink,
-- then drop the flow and the sink table only.
CREATE TABLE input_basic (
"number" INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
-- input_basic is intentionally not dropped here.
DROP FLOW test_wildcard_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
-- Step 2: recreate the flow (and, through it, the sink table) on the input table
-- that was kept from the previous step.
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
-- Rows 23 and 24 repeat the values already present in input_basic; 25 is new.
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- The recreated flow is expected to report 3.
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 3 |
+----------+
-- Test again, this time with a database restart.
-- First drop the objects left over from the previous scenario.
DROP TABLE input_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
DROP FLOW test_wildcard_basic;
Affected Rows: 0
CREATE TABLE input_basic (
"number" INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
-- The restart=true argument is attached to this trivial query; it asks the test
-- runner to restart the database (see the scenario comment above).
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- Wait 3s before inserting after the restart.
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- give the flownode a few seconds to rebuild the flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- The flow is expected to still work after the restart and report 2.
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
-- Clean up before the next scenario.
DROP TABLE input_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
DROP FLOW test_wildcard_basic;
Affected Rows: 0
-- Scenario: rebuild the input table and the flow in different orders, with a
-- database restart inserted between the steps.
-- Step 1: create the table and the flow, restart, insert two rows, flush, check.
CREATE TABLE input_basic (
"number" INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
-- restart=true is attached to this trivial query.
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- give the flownode a few seconds to rebuild the flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
-- Drop only the input table; the flow and its sink table are left in place.
DROP TABLE input_basic;
Affected Rows: 0
-- Step 2: recreate input_basic while the flow created before the drop still exists,
-- restart, then insert three rows and flush.
CREATE TABLE input_basic (
"number" INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
-- restart=true is attached to this trivial query.
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(26, "2021-07-01 00:00:02.000");
Affected Rows: 3
-- give the flownode a few seconds to rebuild the flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- This is expected to be the same as above ("2"), since the new `input_basic` table
-- has a different table id and is therefore a different table.
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
-- Drop the flow that was created against the old input table.
DROP FLOW test_wildcard_basic;
Affected Rows: 0
-- Step 3: recreate the flow so that it uses the new table id.
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
-- Restart, then give the flownode time to rebuild the flow: the only wait in this
-- step is the 3s SLEEP attached to the INSERT below.
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- 4 is expected in flow batching mode: it matches count(*) of input_basic below.
-- Rows 23 and 24 were inserted twice with identical timestamps; 25 and 26 once each.
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 4 |
+----------+
SELECT count(*) FROM input_basic;
+----------+
| count(*) |
+----------+
| 4 |
+----------+
-- Clean up before the next scenario.
DROP TABLE input_basic;
Affected Rows: 0
DROP FLOW test_wildcard_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
-- Scenario: drop and recreate the flow and its sink table while keeping the input
-- table, with a database restart inserted between the steps.
-- Step 1: create the table and the flow, restart, insert two rows, flush, check,
-- then drop the flow and the sink table only.
CREATE TABLE input_basic (
"number" INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(number),
TIME INDEX(ts)
);
Affected Rows: 0
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
-- restart=true is attached to this trivial query.
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500");
Affected Rows: 2
-- give the flownode a few seconds to rebuild the flow
-- SQLNESS SLEEP 3s
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 2 |
+----------+
-- input_basic is intentionally not dropped here.
DROP FLOW test_wildcard_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
-- Step 2: recreate the flow (and, through it, the sink table) on the input table
-- that was kept from the previous step, then restart.
CREATE FLOW test_wildcard_basic sink TO out_basic AS
SELECT
COUNT(*) as wildcard
FROM
input_basic;
Affected Rows: 0
-- restart=true is attached to this trivial query.
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- SQLNESS SLEEP 3s
INSERT INTO
input_basic
VALUES
(23, "2021-07-01 00:00:01.000"),
(24, "2021-07-01 00:00:01.500"),
(25, "2021-07-01 00:00:01.700");
Affected Rows: 3
-- give the flownode time to rebuild the flow
-- NOTE(review): no SLEEP directive follows this comment; the only wait in this step
-- is the 3s SLEEP attached to the INSERT above -- confirm this is intended.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_wildcard_basic');
+-----------------------------------------+
| ADMIN FLUSH_FLOW('test_wildcard_basic') |
+-----------------------------------------+
| FLOW_FLUSHED |
+-----------------------------------------+
-- The recreated flow is expected to report 3.
SELECT wildcard FROM out_basic;
+----------+
| wildcard |
+----------+
| 3 |
+----------+
-- Clean up before the next scenario.
DROP FLOW test_wildcard_basic;
Affected Rows: 0
DROP TABLE input_basic;
Affected Rows: 0
DROP TABLE out_basic;
Affected Rows: 0
-- Scenario: check that a flow works as expected inside a different schema (database).
-- Create the database and switch to it before creating any table or flow.
CREATE DATABASE jsdp_log;
Affected Rows: 1
USE jsdp_log;
Affected Rows: 0
-- Source table: append-mode log rows with a nanosecond-precision time index.
CREATE TABLE IF NOT EXISTS `api_log` (
`time` TIMESTAMP(9) NOT NULL,
`key` STRING NULL SKIPPING INDEX WITH(granularity = '1024', type = 'BLOOM'),
`status_code` TINYINT NULL,
`method` STRING NULL,
`path` STRING NULL,
`raw_query` STRING NULL,
`user_agent` STRING NULL,
`client_ip` STRING NULL,
`duration` INT NULL,
`count` INT NULL,
TIME INDEX (`time`)
) ENGINE=mito WITH(
append_mode = 'true'
);
Affected Rows: 0
-- Sink table, created explicitly (unlike out_basic in the earlier scenarios).
CREATE TABLE IF NOT EXISTS `api_stats` (
`time` TIMESTAMP(0) NOT NULL,
`key` STRING NULL,
`qpm` BIGINT NULL,
`rpm` BIGINT NULL,
TIME INDEX (`time`),
PRIMARY KEY (`key`)
) ENGINE=mito;
Affected Rows: 0
-- The flow groups api_log rows by minute and key and emits count(*) and sum(`count`).
-- NOTE(review): the flow's column names differ from the sink's; presumably they are
-- mapped by position to time/key/qpm/rpm, which the SELECT output below is consistent with.
CREATE FLOW IF NOT EXISTS api_stats_flow
SINK TO api_stats AS
SELECT date_trunc('minute', `time`::TimestampSecond) AS `time1`, `key`, count(*), sum(`count`)
FROM api_log
GROUP BY `time1`, `key`;
Affected Rows: 0
-- First log row, key '1'.
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (0::TimestampSecond, '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');
+------------------------------------+
| ADMIN FLUSH_FLOW('api_stats_flow') |
+------------------------------------+
| FLOW_FLUSHED |
+------------------------------------+
-- A single aggregated row for key '1' is expected.
SELECT * FROM api_stats;
+---------------------+-----+-----+-----+
| time | key | qpm | rpm |
+---------------------+-----+-----+-----+
| 1970-01-01T00:00:00 | 1 | 1 | 1 |
+---------------------+-----+-----+-----+
-- Restart the database; restart=true is attached to this trivial query.
-- SQLNESS ARG restart=true
SELECT 1;
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
-- Wait 5s after the restart, then insert a second log row, this time with key '2'.
-- SQLNESS SLEEP 5s
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (0::TimestampSecond, '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
Affected Rows: 1
-- wait more time so the flownode has time to recover flows
-- NOTE(review): no SLEEP directive is attached to the flush itself; the waits are the
-- 5s SLEEP before the INSERT above and the 5s SLEEP before the SELECT that follows.
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');
+------------------------------------+
| ADMIN FLUSH_FLOW('api_stats_flow') |
+------------------------------------+
| FLOW_FLUSHED |
+------------------------------------+
-- Wait 5s, then read the sink table. Both keys are expected after the restart.
-- The result has two rows, so order by the primary key to keep the expected
-- output independent of scan order.
-- SQLNESS SLEEP 5s
SELECT * FROM api_stats ORDER BY `key`;
+---------------------+-----+-----+-----+
| time | key | qpm | rpm |
+---------------------+-----+-----+-----+
| 1970-01-01T00:00:00 | 1 | 1 | 1 |
| 1970-01-01T00:00:00 | 2 | 1 | 1 |
+---------------------+-----+-----+-----+
-- Clean up: drop the flow and both tables, switch back to the public schema,
-- then drop the database created for this scenario.
DROP FLOW api_stats_flow;
Affected Rows: 0
DROP TABLE api_log;
Affected Rows: 0
DROP TABLE api_stats;
Affected Rows: 0
-- Leave jsdp_log before dropping it.
USE public;
Affected Rows: 0
DROP DATABASE jsdp_log;
Affected Rows: 0