-- Recorded transcript for partitioned metric-engine tables: every SQL statement
-- below is followed by the output captured for it.
--
-- Scenario, in statement order:
--   1. `metric_engine_partition` is created as the physical table
--      (physical_metric_table = "true"), partitioned on
--      (host, one_partition_key, another_partition_key) with three rules that
--      only reference `host`.
--   2. `logical_table_1` declares `partition on columns (host) ()` with an empty
--      rule list and is accepted.
--   3. `invalid_logical_partition` declares four `host` rules and is rejected
--      with error 1004 (InvalidArguments): the message lists the logical and the
--      physical partition exprs, which differ.
--   4. `logical_table_2` repeats the physical table's three rules and is
--      accepted. Its SHOW CREATE TABLE output also lists the two extra physical
--      partition columns. GROUP BY host yields a physical plan with a single
--      AggregateExec (mode=SinglePartitioned) over MergeScanExec, while
--      GROUP BY ts yields a Partial + FinalPartitioned AggregateExec pair.
--   5. `logical_table_3` adds tag columns `a` and `z` and declares no partition
--      clause; SHOW CREATE TABLE still reports the physical partition rules.
--   6. `logical_table_4` has no tag columns at all; `select *` shows blank
--      host / one_partition_key / another_partition_key cells for its rows.
--   7. All four logical tables and then the physical table are dropped.
--
-- NOTE(review): the `-- SQLNESS REPLACE (pattern) replacement` lines in front of
-- each EXPLAIN presumably instruct the test runner to rewrite the recorded plan
-- text (the plans below contain `_` and `REDACTED` where such rewrites would
-- apply) -- confirm against the runner's documentation before editing them.
create table metric_engine_partition ( ts timestamp time index, host string, cpu double, `one_partition_key` string, `another_partition_key` string, primary key(host, `one_partition_key`, `another_partition_key`) ) partition on columns (host, `one_partition_key`, `another_partition_key`) ( host <= 'host1', host > 'host1' and host <= 'host2', host > 'host2' ) engine = metric with ( physical_metric_table = "true", ); Affected Rows: 0 select count(*) from metric_engine_partition; +----------+ | count(*) | +----------+ | 0 | +----------+ create table logical_table_1 ( ts timestamp time index, host string primary key, cpu double, ) partition on columns (host) () engine = metric with ( on_physical_table = "metric_engine_partition", ); Affected Rows: 0 create table invalid_logical_partition ( ts timestamp time index, host string primary key, cpu double, ) partition on columns (host) ( host <= 'host1', host > 'host1' and host <= 'host2', host > 'host2' and host <= 'host3', host > 'host3' ) engine = metric with ( on_physical_table = "metric_engine_partition", ); Error: 1004(InvalidArguments), Invalid partition rule: logical table partition rule must match the corresponding physical table's logical table partition exprs: [PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host1"))) }, PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host3"))) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host1"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host2"))) }) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host2"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host3"))) }) }] physical table partition exprs: [PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host1"))) }, 
PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host2"))) }, PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("host"), op: Gt, rhs: Value(String(StringBytes("host1"))) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("host"), op: LtEq, rhs: Value(String(StringBytes("host2"))) }) }] create table logical_table_2 ( ts timestamp time index, host string primary key, cpu double, ) partition on columns (host) ( host <= 'host1', host > 'host1' and host <= 'host2', host > 'host2' ) engine = metric with ( on_physical_table = "metric_engine_partition", ); Affected Rows: 0 insert into logical_table_2(ts, host, cpu) values ('2023-01-01 00:00:00', 'host1', 1.0), ('2023-01-01 00:00:01', 'host2', 2.0), ('2023-01-01 00:00:02', 'host3', 3.0); Affected Rows: 3 show create table logical_table_2; +-----------------+-------------------------------------------------------------------------------+ | Table | Create Table | +-----------------+-------------------------------------------------------------------------------+ | logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( | | | "another_partition_key" STRING NULL, | | | "cpu" DOUBLE NULL, | | | "host" STRING NULL, | | | "one_partition_key" STRING NULL, | | | "ts" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("ts"), | | | PRIMARY KEY ("another_partition_key", "host", "one_partition_key") | | | ) | | | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( | | | host <= 'host1', | | | host > 'host1' AND host <= 'host2', | | | host > 'host2' | | | ) | | | ENGINE=metric | | | WITH( | | | on_physical_table = 'metric_engine_partition' | | | ) | +-----------------+-------------------------------------------------------------------------------+ select count(*) from logical_table_2; +----------+ | count(*) | +----------+ | 3 | +----------+ -- check if part col aggr push down works with only subset of phy part cols select host, count(*) from logical_table_2 GROUP BY host ORDER BY 
host; +-------+----------+ | host | count(*) | +-------+----------+ | host1 | 1 | | host2 | 1 | | host3 | 1 | +-------+----------+ -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ -- SQLNESS REPLACE (peers.*) REDACTED -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED EXPLAIN select host, count(*) from logical_table_2 GROUP BY host ORDER BY host; +-+-+ | plan_type_| plan_| +-+-+ | logical_plan_| Sort: logical_table_2.host ASC NULLS LAST_| |_|_Projection: logical_table_2.host, count(Int64(1)) AS count(*)_| |_|_Aggregate: groupBy=[[logical_table_2.host]], aggr=[[__count_merge(__count_state(logical_table_2.ts)) AS count(Int64(1))]] | |_|_MergeScan [is_placeholder=false, remote_input=[_| |_| Aggregate: groupBy=[[logical_table_2.host]], aggr=[[__count_state(logical_table_2.ts)]]_| |_|_TableScan: logical_table_2_| |_| ]]_| | physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_| |_|_SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[true]_| |_|_ProjectionExec: expr=[host@0 as host, count(Int64(1))@1 as count(*)]_| |_|_AggregateExec: mode=SinglePartitioned, gby=[host@0 as host], aggr=[count(Int64(1))]_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ -- check if step aggr push down works with non-part col select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts; +---------------------+----------+ | ts | count(*) | +---------------------+----------+ | 2023-01-01T00:00:00 | 1 | | 2023-01-01T00:00:01 | 1 | | 2023-01-01T00:00:02 | 1 | +---------------------+----------+ -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ -- SQLNESS REPLACE (peers.*) REDACTED -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED EXPLAIN select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts; +-+-+ | plan_type_| plan_| +-+-+ | logical_plan_| Sort: logical_table_2.ts ASC NULLS LAST_| |_|_Projection: logical_table_2.ts, count(Int64(1)) AS count(*)_| 
|_|_Aggregate: groupBy=[[logical_table_2.ts]], aggr=[[__count_merge(__count_state(logical_table_2.ts)) AS count(Int64(1))]] | |_|_MergeScan [is_placeholder=false, remote_input=[_| |_| Aggregate: groupBy=[[logical_table_2.ts]], aggr=[[__count_state(logical_table_2.ts)]]_| |_|_TableScan: logical_table_2_| |_| ]]_| | physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST]_| |_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]_| |_|_ProjectionExec: expr=[ts@0 as ts, count(Int64(1))@1 as count(*)]_| |_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(Int64(1))]_| |_|_RepartitionExec: REDACTED |_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(Int64(1))]_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ create table logical_table_3 ( ts timestamp time index, a string, z string, cpu double, primary key(a, z) -- trigger a physical table change with smaller and bigger column ids ) engine = metric with ( on_physical_table = "metric_engine_partition", ); Affected Rows: 0 show create table logical_table_3; +-----------------+--------------------------------------------------------------------------------+ | Table | Create Table | +-----------------+--------------------------------------------------------------------------------+ | logical_table_3 | CREATE TABLE IF NOT EXISTS "logical_table_3" ( | | | "a" STRING NULL, | | | "another_partition_key" STRING NULL, | | | "cpu" DOUBLE NULL, | | | "host" STRING NULL, | | | "one_partition_key" STRING NULL, | | | "ts" TIMESTAMP(3) NOT NULL, | | | "z" STRING NULL, | | | TIME INDEX ("ts"), | | | PRIMARY KEY ("a", "another_partition_key", "host", "one_partition_key", "z") | | | ) | | | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( | | | host <= 'host1', | | | host > 'host1' AND host <= 'host2', | | | host > 'host2' | | | ) | | | ENGINE=metric | | | WITH( | | | on_physical_table = 'metric_engine_partition' | | | ) | 
+-----------------+--------------------------------------------------------------------------------+ insert into logical_table_3(ts, a, z, cpu) values ('2023-01-01 00:00:00', 'a1', 'z1', 1.0), ('2023-01-01 00:00:01', 'a2', 'z2', 2.0), ('2023-01-01 00:00:02', 'a3', 'z3', 3.0); Affected Rows: 3 select count(*) from logical_table_3; +----------+ | count(*) | +----------+ | 3 | +----------+ -- check if step aggr push down works with non-part col select a, count(*) from logical_table_3 GROUP BY a ORDER BY a; +----+----------+ | a | count(*) | +----+----------+ | a1 | 1 | | a2 | 1 | | a3 | 1 | +----+----------+ -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ -- SQLNESS REPLACE (peers.*) REDACTED -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED EXPLAIN select a, count(*) from logical_table_3 GROUP BY a ORDER BY a; +-+-+ | plan_type_| plan_| +-+-+ | logical_plan_| Sort: logical_table_3.a ASC NULLS LAST_| |_|_Projection: logical_table_3.a, count(Int64(1)) AS count(*)_| |_|_Aggregate: groupBy=[[logical_table_3.a]], aggr=[[__count_merge(__count_state(logical_table_3.ts)) AS count(Int64(1))]] | |_|_MergeScan [is_placeholder=false, remote_input=[_| |_| Aggregate: groupBy=[[logical_table_3.a]], aggr=[[__count_state(logical_table_3.ts)]]_| |_|_TableScan: logical_table_3_| |_| ]]_| | physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST]_| |_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]_| |_|_ProjectionExec: expr=[a@0 as a, count(Int64(1))@1 as count(*)]_| |_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))]_| |_|_RepartitionExec: REDACTED |_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))]_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ -- create a logical table without partition columns on physical table create table logical_table_4 ( ts timestamp time index, cpu double, ) engine = metric with ( on_physical_table = "metric_engine_partition", ); 
Affected Rows: 0 show create table logical_table_4; +-----------------+-------------------------------------------------------------------------------+ | Table | Create Table | +-----------------+-------------------------------------------------------------------------------+ | logical_table_4 | CREATE TABLE IF NOT EXISTS "logical_table_4" ( | | | "another_partition_key" STRING NULL, | | | "cpu" DOUBLE NULL, | | | "host" STRING NULL, | | | "one_partition_key" STRING NULL, | | | "ts" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("ts"), | | | PRIMARY KEY ("another_partition_key", "host", "one_partition_key") | | | ) | | | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( | | | host <= 'host1', | | | host > 'host1' AND host <= 'host2', | | | host > 'host2' | | | ) | | | ENGINE=metric | | | WITH( | | | on_physical_table = 'metric_engine_partition' | | | ) | +-----------------+-------------------------------------------------------------------------------+ insert into logical_table_4(ts, cpu) values ('2023-01-01 00:00:00', 1.0), ('2023-01-01 00:00:01', 2.0), ('2023-01-01 00:00:02', 3.0); Affected Rows: 3 -- this should only return one row select count(*) from logical_table_4; +----------+ | count(*) | +----------+ | 3 | +----------+ -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ -- SQLNESS REPLACE (peers.*) REDACTED -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED EXPLAIN select count(*) from logical_table_4; +-+-+ | plan_type_| plan_| +-+-+ | logical_plan_| Projection: count(Int64(1)) AS count(*)_| |_|_Aggregate: groupBy=[[]], aggr=[[__count_merge(__count_state(logical_table_4.ts)) AS count(Int64(1))]] | |_|_MergeScan [is_placeholder=false, remote_input=[_| |_| Aggregate: groupBy=[[]], aggr=[[__count_state(logical_table_4.ts)]]_| |_|_TableScan: logical_table_4_| |_| ]]_| | physical_plan | ProjectionExec: expr=[count(Int64(1))@0 as count(*)]_| |_|_AggregateExec: mode=Final, gby=[], 
aggr=[count(Int64(1))]_| |_|_CoalescePartitionsExec_| |_|_AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ -- check if step aggr push down works with non-part col select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts; +---------------------+----------+ | ts | count(*) | +---------------------+----------+ | 2023-01-01T00:00:00 | 1 | | 2023-01-01T00:00:01 | 1 | | 2023-01-01T00:00:02 | 1 | +---------------------+----------+ -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ -- SQLNESS REPLACE (peers.*) REDACTED -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED EXPLAIN select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts; +-+-+ | plan_type_| plan_| +-+-+ | logical_plan_| Sort: logical_table_4.ts ASC NULLS LAST_| |_|_Projection: logical_table_4.ts, count(Int64(1)) AS count(*)_| |_|_Aggregate: groupBy=[[logical_table_4.ts]], aggr=[[__count_merge(__count_state(logical_table_4.ts)) AS count(Int64(1))]] | |_|_MergeScan [is_placeholder=false, remote_input=[_| |_| Aggregate: groupBy=[[logical_table_4.ts]], aggr=[[__count_state(logical_table_4.ts)]]_| |_|_TableScan: logical_table_4_| |_| ]]_| | physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST]_| |_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]_| |_|_ProjectionExec: expr=[ts@0 as ts, count(Int64(1))@1 as count(*)]_| |_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(Int64(1))]_| |_|_RepartitionExec: REDACTED |_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(Int64(1))]_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ select * from logical_table_4; +-----------------------+-----+------+-------------------+---------------------+ | another_partition_key | cpu | host | one_partition_key | ts | +-----------------------+-----+------+-------------------+---------------------+ | | 1.0 | | | 2023-01-01T00:00:00 | | | 2.0 | | | 
2023-01-01T00:00:01 | | | 3.0 | | | 2023-01-01T00:00:02 | +-----------------------+-----+------+-------------------+---------------------+ -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ -- SQLNESS REPLACE (peers.*) REDACTED -- SQLNESS REPLACE (metrics.*) REDACTED -- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED EXPLAIN select * from logical_table_4; +-+-+ | plan_type_| plan_| +-+-+ | logical_plan_| MergeScan [is_placeholder=false, remote_input=[_| |_| Projection: logical_table_4.another_partition_key, logical_table_4.cpu, logical_table_4.host, logical_table_4.one_partition_key, logical_table_4.ts | |_|_TableScan: logical_table_4_| |_| ]]_| | physical_plan | CooperativeExec_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ drop table logical_table_1; Affected Rows: 0 drop table logical_table_2; Affected Rows: 0 drop table logical_table_3; Affected Rows: 0 drop table logical_table_4; Affected Rows: 0 drop table metric_engine_partition; Affected Rows: 0