mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 20:40:39 +00:00
* chore: upgrade DataFusion family Signed-off-by: luofucong <luofc@foxmail.com> * chore: switch to released version of datafusion-pg-catalog --------- Signed-off-by: luofucong <luofc@foxmail.com> Co-authored-by: Ning Sun <sunning@greptime.com> Co-authored-by: Ning Sun <sunng@protonmail.com>
455 lines
19 KiB
Plaintext
455 lines
19 KiB
Plaintext
create table metric_engine_partition (
|
|
ts timestamp time index,
|
|
host string,
|
|
cpu double,
|
|
`one_partition_key` string,
|
|
`another_partition_key` string,
|
|
primary key(host, `one_partition_key`, `another_partition_key`)
|
|
)
|
|
partition on columns (host, `one_partition_key`, `another_partition_key`) (
|
|
host <= 'host1',
|
|
host > 'host1' and host <= 'host2',
|
|
host > 'host2'
|
|
)
|
|
engine = metric
|
|
with (
|
|
physical_metric_table = "true",
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
select count(*) from metric_engine_partition;
|
|
|
|
+----------+
|
|
| count(*) |
|
|
+----------+
|
|
| 0 |
|
|
+----------+
|
|
|
|
create table logical_table_1 (
|
|
ts timestamp time index,
|
|
host string primary key,
|
|
cpu double,
|
|
)
|
|
partition on columns (host) ()
|
|
engine = metric
|
|
with (
|
|
on_physical_table = "metric_engine_partition",
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
create table invalid_logical_partition (
|
|
ts timestamp time index,
|
|
host string primary key,
|
|
cpu double,
|
|
)
|
|
partition on columns (host) (
|
|
host <= 'host1',
|
|
host > 'host1' and host <= 'host2',
|
|
host > 'host2' and host <= 'host3',
|
|
host > 'host3'
|
|
)
|
|
engine = metric
|
|
with (
|
|
on_physical_table = "metric_engine_partition",
|
|
);
|
|
|
|
Error: 1004(InvalidArguments), Invalid partition rule: logical table partition rule must match the corresponding physical table's
|
|
logical table partition exprs: [PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host1"))) }, PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host3"))) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host1"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host2"))) }) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host2"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host3"))) }) }]
|
|
physical table partition exprs: [PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host1"))) }, PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host2"))) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host1"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host2"))) }) }]
|
|
|
|
create table logical_table_2 (
|
|
ts timestamp time index,
|
|
host string primary key,
|
|
cpu double,
|
|
)
|
|
partition on columns (host) (
|
|
host <= 'host1',
|
|
host > 'host1' and host <= 'host2',
|
|
host > 'host2'
|
|
)
|
|
engine = metric
|
|
with (
|
|
on_physical_table = "metric_engine_partition",
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
insert into logical_table_2(ts, host, cpu) values
|
|
('2023-01-01 00:00:00', 'host1', 1.0),
|
|
('2023-01-01 00:00:01', 'host2', 2.0),
|
|
('2023-01-01 00:00:02', 'host3', 3.0);
|
|
|
|
Affected Rows: 3
|
|
|
|
show create table logical_table_2;
|
|
|
|
+-----------------+-------------------------------------------------------------------------------+
|
|
| Table | Create Table |
|
|
+-----------------+-------------------------------------------------------------------------------+
|
|
| logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( |
|
|
| | "another_partition_key" STRING NULL, |
|
|
| | "cpu" DOUBLE NULL, |
|
|
| | "host" STRING NULL, |
|
|
| | "one_partition_key" STRING NULL, |
|
|
| | "ts" TIMESTAMP(3) NOT NULL, |
|
|
| | TIME INDEX ("ts"), |
|
|
| | PRIMARY KEY ("another_partition_key", "host", "one_partition_key") |
|
|
| | ) |
|
|
| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( |
|
|
| | host <= 'host1', |
|
|
| | host > 'host1' AND host <= 'host2', |
|
|
| | host > 'host2' |
|
|
| | ) |
|
|
| | ENGINE=metric |
|
|
| | WITH( |
|
|
| | on_physical_table = 'metric_engine_partition' |
|
|
| | ) |
|
|
+-----------------+-------------------------------------------------------------------------------+
|
|
|
|
select count(*) from logical_table_2;
|
|
|
|
+----------+
|
|
| count(*) |
|
|
+----------+
|
|
| 3 |
|
|
+----------+
|
|
|
|
-- check if part col aggr push down works with only subset of phy part cols
|
|
select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
|
|
|
|
+-------+----------+
|
|
| host | count(*) |
|
|
+-------+----------+
|
|
| host1 | 1 |
|
|
| host2 | 1 |
|
|
| host3 | 1 |
|
|
+-------+----------+
|
|
|
|
-- SQLNESS REPLACE (-+) -
|
|
-- SQLNESS REPLACE (\s\s+) _
|
|
-- SQLNESS REPLACE (peers.*) REDACTED
|
|
-- SQLNESS REPLACE (metrics.*) REDACTED
|
|
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
|
|
EXPLAIN
|
|
select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
|
|
|
|
+-+-+
|
|
| plan_type_| plan_|
|
|
+-+-+
|
|
| logical_plan_| Sort: logical_table_2.host ASC NULLS LAST_|
|
|
|_|_Projection: logical_table_2.host, count(Int64(1)) AS count(*)_|
|
|
|_|_Aggregate: groupBy=[[logical_table_2.host]], aggr=[[__count_merge(__count_state(logical_table_2.ts)) AS count(Int64(1))]] |
|
|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
|
|_| Aggregate: groupBy=[[logical_table_2.host]], aggr=[[__count_state(logical_table_2.ts)]]_|
|
|
|_|_TableScan: logical_table_2_|
|
|
|_| ]]_|
|
|
| physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_|
|
|
|_|_SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
|
|_|_ProjectionExec: expr=[host@0 as host, count(Int64(1))@1 as count(*)]_|
|
|
|_|_AggregateExec: mode=SinglePartitioned, gby=[host@0 as host], aggr=[count(Int64(1))]_|
|
|
|_|_MergeScanExec: REDACTED
|
|
|_|_|
|
|
+-+-+
|
|
|
|
-- check if step aggr push down works with non-part col
|
|
select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
|
|
|
|
+---------------------+----------+
|
|
| ts | count(*) |
|
|
+---------------------+----------+
|
|
| 2023-01-01T00:00:00 | 1 |
|
|
| 2023-01-01T00:00:01 | 1 |
|
|
| 2023-01-01T00:00:02 | 1 |
|
|
+---------------------+----------+
|
|
|
|
-- SQLNESS REPLACE (-+) -
|
|
-- SQLNESS REPLACE (\s\s+) _
|
|
-- SQLNESS REPLACE (peers.*) REDACTED
|
|
-- SQLNESS REPLACE (metrics.*) REDACTED
|
|
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
|
|
EXPLAIN
|
|
select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
|
|
|
|
+-+-+
|
|
| plan_type_| plan_|
|
|
+-+-+
|
|
| logical_plan_| Sort: logical_table_2.ts ASC NULLS LAST_|
|
|
|_|_Projection: logical_table_2.ts, count(Int64(1)) AS count(*)_|
|
|
|_|_Aggregate: groupBy=[[logical_table_2.ts]], aggr=[[__count_merge(__count_state(logical_table_2.ts)) AS count(Int64(1))]] |
|
|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
|
|_| Aggregate: groupBy=[[logical_table_2.ts]], aggr=[[__count_state(logical_table_2.ts)]]_|
|
|
|_|_TableScan: logical_table_2_|
|
|
|_| ]]_|
|
|
| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST]_|
|
|
|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
|
|_|_ProjectionExec: expr=[ts@0 as ts, count(Int64(1))@1 as count(*)]_|
|
|
|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(Int64(1))]_|
|
|
|_|_RepartitionExec: REDACTED
|
|
|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(Int64(1))]_|
|
|
|_|_MergeScanExec: REDACTED
|
|
|_|_|
|
|
+-+-+
|
|
|
|
create table logical_table_3 (
|
|
ts timestamp time index,
|
|
a string,
|
|
z string,
|
|
cpu double,
|
|
primary key(a, z) -- trigger a physical table change with smaller and bigger column ids
|
|
)
|
|
engine = metric
|
|
with (
|
|
on_physical_table = "metric_engine_partition",
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
show create table logical_table_3;
|
|
|
|
+-----------------+--------------------------------------------------------------------------------+
|
|
| Table | Create Table |
|
|
+-----------------+--------------------------------------------------------------------------------+
|
|
| logical_table_3 | CREATE TABLE IF NOT EXISTS "logical_table_3" ( |
|
|
| | "a" STRING NULL, |
|
|
| | "another_partition_key" STRING NULL, |
|
|
| | "cpu" DOUBLE NULL, |
|
|
| | "host" STRING NULL, |
|
|
| | "one_partition_key" STRING NULL, |
|
|
| | "ts" TIMESTAMP(3) NOT NULL, |
|
|
| | "z" STRING NULL, |
|
|
| | TIME INDEX ("ts"), |
|
|
| | PRIMARY KEY ("a", "another_partition_key", "host", "one_partition_key", "z") |
|
|
| | ) |
|
|
| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( |
|
|
| | host <= 'host1', |
|
|
| | host > 'host1' AND host <= 'host2', |
|
|
| | host > 'host2' |
|
|
| | ) |
|
|
| | ENGINE=metric |
|
|
| | WITH( |
|
|
| | on_physical_table = 'metric_engine_partition' |
|
|
| | ) |
|
|
+-----------------+--------------------------------------------------------------------------------+
|
|
|
|
insert into logical_table_3(ts, a, z, cpu) values
|
|
('2023-01-01 00:00:00', 'a1', 'z1', 1.0),
|
|
('2023-01-01 00:00:01', 'a2', 'z2', 2.0),
|
|
('2023-01-01 00:00:02', 'a3', 'z3', 3.0);
|
|
|
|
Affected Rows: 3
|
|
|
|
select count(*) from logical_table_3;
|
|
|
|
+----------+
|
|
| count(*) |
|
|
+----------+
|
|
| 3 |
|
|
+----------+
|
|
|
|
-- check if step aggr push down works with non-part col
|
|
select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
|
|
|
|
+----+----------+
|
|
| a | count(*) |
|
|
+----+----------+
|
|
| a1 | 1 |
|
|
| a2 | 1 |
|
|
| a3 | 1 |
|
|
+----+----------+
|
|
|
|
-- SQLNESS REPLACE (-+) -
|
|
-- SQLNESS REPLACE (\s\s+) _
|
|
-- SQLNESS REPLACE (peers.*) REDACTED
|
|
-- SQLNESS REPLACE (metrics.*) REDACTED
|
|
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
|
|
EXPLAIN
|
|
select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
|
|
|
|
+-+-+
|
|
| plan_type_| plan_|
|
|
+-+-+
|
|
| logical_plan_| Sort: logical_table_3.a ASC NULLS LAST_|
|
|
|_|_Projection: logical_table_3.a, count(Int64(1)) AS count(*)_|
|
|
|_|_Aggregate: groupBy=[[logical_table_3.a]], aggr=[[__count_merge(__count_state(logical_table_3.ts)) AS count(Int64(1))]] |
|
|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
|
|_| Aggregate: groupBy=[[logical_table_3.a]], aggr=[[__count_state(logical_table_3.ts)]]_|
|
|
|_|_TableScan: logical_table_3_|
|
|
|_| ]]_|
|
|
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST]_|
|
|
|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
|
|_|_ProjectionExec: expr=[a@0 as a, count(Int64(1))@1 as count(*)]_|
|
|
|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))]_|
|
|
|_|_RepartitionExec: REDACTED
|
|
|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))]_|
|
|
|_|_MergeScanExec: REDACTED
|
|
|_|_|
|
|
+-+-+
|
|
|
|
-- create a logical table without partition columns on physical table
|
|
create table logical_table_4 (
|
|
ts timestamp time index,
|
|
cpu double,
|
|
)
|
|
engine = metric
|
|
with (
|
|
on_physical_table = "metric_engine_partition",
|
|
);
|
|
|
|
Affected Rows: 0
|
|
|
|
show create table logical_table_4;
|
|
|
|
+-----------------+-------------------------------------------------------------------------------+
|
|
| Table | Create Table |
|
|
+-----------------+-------------------------------------------------------------------------------+
|
|
| logical_table_4 | CREATE TABLE IF NOT EXISTS "logical_table_4" ( |
|
|
| | "another_partition_key" STRING NULL, |
|
|
| | "cpu" DOUBLE NULL, |
|
|
| | "host" STRING NULL, |
|
|
| | "one_partition_key" STRING NULL, |
|
|
| | "ts" TIMESTAMP(3) NOT NULL, |
|
|
| | TIME INDEX ("ts"), |
|
|
| | PRIMARY KEY ("another_partition_key", "host", "one_partition_key") |
|
|
| | ) |
|
|
| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( |
|
|
| | host <= 'host1', |
|
|
| | host > 'host1' AND host <= 'host2', |
|
|
| | host > 'host2' |
|
|
| | ) |
|
|
| | ENGINE=metric |
|
|
| | WITH( |
|
|
| | on_physical_table = 'metric_engine_partition' |
|
|
| | ) |
|
|
+-----------------+-------------------------------------------------------------------------------+
|
|
|
|
insert into logical_table_4(ts, cpu) values
|
|
('2023-01-01 00:00:00', 1.0),
|
|
('2023-01-01 00:00:01', 2.0),
|
|
('2023-01-01 00:00:02', 3.0);
|
|
|
|
Affected Rows: 3
|
|
|
|
-- this should only return one row
|
|
select count(*) from logical_table_4;
|
|
|
|
+----------+
|
|
| count(*) |
|
|
+----------+
|
|
| 3 |
|
|
+----------+
|
|
|
|
-- SQLNESS REPLACE (-+) -
|
|
-- SQLNESS REPLACE (\s\s+) _
|
|
-- SQLNESS REPLACE (peers.*) REDACTED
|
|
-- SQLNESS REPLACE (metrics.*) REDACTED
|
|
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
|
|
EXPLAIN select count(*) from logical_table_4;
|
|
|
|
+-+-+
|
|
| plan_type_| plan_|
|
|
+-+-+
|
|
| logical_plan_| Projection: count(Int64(1)) AS count(*)_|
|
|
|_|_Aggregate: groupBy=[[]], aggr=[[__count_merge(__count_state(logical_table_4.ts)) AS count(Int64(1))]] |
|
|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
|
|_| Aggregate: groupBy=[[]], aggr=[[__count_state(logical_table_4.ts)]]_|
|
|
|_|_TableScan: logical_table_4_|
|
|
|_| ]]_|
|
|
| physical_plan | ProjectionExec: expr=[count(Int64(1))@0 as count(*)]_|
|
|
|_|_AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]_|
|
|
|_|_CoalescePartitionsExec_|
|
|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]_|
|
|
|_|_MergeScanExec: REDACTED
|
|
|_|_|
|
|
+-+-+
|
|
|
|
-- check if step aggr push down works with non-part col
|
|
select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
|
|
|
|
+---------------------+----------+
|
|
| ts | count(*) |
|
|
+---------------------+----------+
|
|
| 2023-01-01T00:00:00 | 1 |
|
|
| 2023-01-01T00:00:01 | 1 |
|
|
| 2023-01-01T00:00:02 | 1 |
|
|
+---------------------+----------+
|
|
|
|
-- SQLNESS REPLACE (-+) -
|
|
-- SQLNESS REPLACE (\s\s+) _
|
|
-- SQLNESS REPLACE (peers.*) REDACTED
|
|
-- SQLNESS REPLACE (metrics.*) REDACTED
|
|
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
|
|
EXPLAIN
|
|
select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
|
|
|
|
+-+-+
|
|
| plan_type_| plan_|
|
|
+-+-+
|
|
| logical_plan_| Sort: logical_table_4.ts ASC NULLS LAST_|
|
|
|_|_Projection: logical_table_4.ts, count(Int64(1)) AS count(*)_|
|
|
|_|_Aggregate: groupBy=[[logical_table_4.ts]], aggr=[[__count_merge(__count_state(logical_table_4.ts)) AS count(Int64(1))]] |
|
|
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|
|
|_| Aggregate: groupBy=[[logical_table_4.ts]], aggr=[[__count_state(logical_table_4.ts)]]_|
|
|
|_|_TableScan: logical_table_4_|
|
|
|_| ]]_|
|
|
| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST]_|
|
|
|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|
|
|_|_ProjectionExec: expr=[ts@0 as ts, count(Int64(1))@1 as count(*)]_|
|
|
|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(Int64(1))]_|
|
|
|_|_RepartitionExec: REDACTED
|
|
|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(Int64(1))]_|
|
|
|_|_MergeScanExec: REDACTED
|
|
|_|_|
|
|
+-+-+
|
|
|
|
select * from logical_table_4;
|
|
|
|
+-----------------------+-----+------+-------------------+---------------------+
|
|
| another_partition_key | cpu | host | one_partition_key | ts |
|
|
+-----------------------+-----+------+-------------------+---------------------+
|
|
| | 1.0 | | | 2023-01-01T00:00:00 |
|
|
| | 2.0 | | | 2023-01-01T00:00:01 |
|
|
| | 3.0 | | | 2023-01-01T00:00:02 |
|
|
+-----------------------+-----+------+-------------------+---------------------+
|
|
|
|
-- SQLNESS REPLACE (-+) -
|
|
-- SQLNESS REPLACE (\s\s+) _
|
|
-- SQLNESS REPLACE (peers.*) REDACTED
|
|
-- SQLNESS REPLACE (metrics.*) REDACTED
|
|
-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
|
|
EXPLAIN select * from logical_table_4;
|
|
|
|
+-+-+
|
|
| plan_type_| plan_|
|
|
+-+-+
|
|
| logical_plan_| MergeScan [is_placeholder=false, remote_input=[_|
|
|
|_| Projection: logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts |
|
|
|_|_TableScan: logical_table_4_|
|
|
|_| ]]_|
|
|
| physical_plan | CooperativeExec_|
|
|
|_|_MergeScanExec: REDACTED
|
|
|_|_|
|
|
+-+-+
|
|
|
|
drop table logical_table_1;
|
|
|
|
Affected Rows: 0
|
|
|
|
drop table logical_table_2;
|
|
|
|
Affected Rows: 0
|
|
|
|
drop table logical_table_3;
|
|
|
|
Affected Rows: 0
|
|
|
|
drop table logical_table_4;
|
|
|
|
Affected Rows: 0
|
|
|
|
drop table metric_engine_partition;
|
|
|
|
Affected Rows: 0
|
|
|