diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs
index df4489a519..4008fe38f9 100644
--- a/src/catalog/src/kvbackend/manager.rs
+++ b/src/catalog/src/kvbackend/manager.rs
@@ -44,6 +44,7 @@ use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
 use table::dist_table::DistTable;
 use table::metadata::{TableId, TableInfoRef};
 use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
+use table::table::PartitionRules;
 use table::table_name::TableName;
 use table::TableRef;
 use tokio::sync::Semaphore;
@@ -132,6 +133,8 @@ impl KvBackendCatalogManager {
         {
             let mut new_table_info = (*table.table_info()).clone();

+            let mut phy_part_cols_not_in_logical_table = vec![];
+
             // Remap partition key indices from physical table to logical table
             new_table_info.meta.partition_key_indices = physical_table_info_value
                 .table_info
@@ -148,15 +151,30 @@
                     .get(physical_index)
                     .and_then(|physical_column| {
                         // Find the corresponding index in the logical table schema
-                        new_table_info
+                        let idx = new_table_info
                             .meta
                             .schema
-                            .column_index_by_name(physical_column.name.as_str())
+                            .column_index_by_name(physical_column.name.as_str());
+                        if idx.is_none() {
+                            // This physical partition column has no counterpart in the logical table.
+                            phy_part_cols_not_in_logical_table
+                                .push(physical_column.name.clone());
+                        }
+
+                        idx
                     })
                 })
                 .collect();

-            let new_table = DistTable::table(Arc::new(new_table_info));
+            let partition_rules = if !phy_part_cols_not_in_logical_table.is_empty() {
+                Some(PartitionRules {
+                    extra_phy_cols_not_in_logical_table: phy_part_cols_not_in_logical_table,
+                })
+            } else {
+                None
+            };
+
+            let new_table = DistTable::table_partitioned(Arc::new(new_table_info), partition_rules);
             return Ok(new_table);
         }
diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs
index d59ba9ac76..af290e4ade 100644
--- a/src/query/src/dist_plan/analyzer.rs
+++ b/src/query/src/dist_plan/analyzer.rs
@@ -482,14 +482,32 @@ impl PlanRewriter {
                 .as_any()
                 .downcast_ref::<DfTableProviderAdapter>()
             {
-                if provider.table().table_type() == TableType::Base {
-                    let info = provider.table().table_info();
+                let table = provider.table();
+                if table.table_type() == TableType::Base {
+                    let info = table.table_info();
                     let partition_key_indices = info.meta.partition_key_indices.clone();
                     let schema = info.meta.schema.clone();
-                    let partition_cols = partition_key_indices
-                        .into_iter()
-                        .map(|index| schema.column_name_by_index(index).to_string())
-                        .collect::<Vec<String>>();
+                    let mut partition_cols = partition_key_indices
+                        .into_iter()
+                        .map(|index| schema.column_name_by_index(index).to_string())
+                        .collect::<Vec<String>>();
+
+                    let partition_rules = table.partition_rules();
+                    let exist_phy_part_cols_not_in_logical_table = partition_rules
+                        .map(|r| !r.extra_phy_cols_not_in_logical_table.is_empty())
+                        .unwrap_or(false);
+
+                    if exist_phy_part_cols_not_in_logical_table && partition_cols.is_empty() {
+                        // The physical table has partition columns that are absent from this
+                        // logical table, and the logical table exposes no partition column of
+                        // its own. Push a placeholder so the partition columns the optimizer
+                        // sees are non-empty, which distinguishes this table from a truly
+                        // non-partitioned one and blocks optimizations that are only valid for
+                        // the latter. If partition_cols is already non-empty, the visible
+                        // subset suffices, behaving as if the hidden columns were always null.
+                        partition_cols
.push("__OTHER_PHYSICAL_PART_COLS_PLACEHOLDER__".to_string()); + } self.partition_cols = Some(partition_cols); } } diff --git a/src/table/src/dist_table.rs b/src/table/src/dist_table.rs index da3de28933..deced5d5dc 100644 --- a/src/table/src/dist_table.rs +++ b/src/table/src/dist_table.rs @@ -21,6 +21,7 @@ use store_api::storage::ScanRequest; use crate::error::UnsupportedSnafu; use crate::metadata::{FilterPushDownType, TableInfoRef}; +use crate::table::PartitionRules; use crate::{Table, TableRef}; #[derive(Clone)] @@ -32,6 +33,20 @@ impl DistTable { let table = Table::new(table_info, FilterPushDownType::Inexact, data_source); Arc::new(table) } + + pub fn table_partitioned( + table_info: TableInfoRef, + partition_rule: Option, + ) -> TableRef { + let data_source = Arc::new(DummyDataSource); + let table = Table::new_partitioned( + table_info, + FilterPushDownType::Inexact, + data_source, + partition_rule, + ); + Arc::new(table) + } } pub struct DummyDataSource; diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 885d9aacf2..1fbfcb9dee 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -52,6 +52,16 @@ lazy_static! { }; } +/// Defines partition rules for a table. +/// TODO(discord9): add the entire partition exprs rules here later +pub struct PartitionRules { + /// The physical partition columns that are not in the logical table. + /// only used in kvbackend manager to store the physical partition columns that are not in the logical table. + /// This is used to avoid the partition columns in the physical table that are not in the logical table + /// to prevent certain optimizations, if table is not a logical table, this should be empty + pub extra_phy_cols_not_in_logical_table: Vec, +} + pub type TableRef = Arc; /// Table handle. 
@@ -61,6 +71,7 @@ pub struct Table {
     data_source: DataSourceRef,
     /// Columns default [`Expr`]
     column_defaults: HashMap<String, Expr>,
+    partition_rules: Option<PartitionRules>,
 }

 impl Table {
@@ -74,6 +85,22 @@
             table_info,
             filter_pushdown,
             data_source,
+            partition_rules: None,
+        }
+    }
+
+    pub fn new_partitioned(
+        table_info: TableInfoRef,
+        filter_pushdown: FilterPushDownType,
+        data_source: DataSourceRef,
+        partition_rules: Option<PartitionRules>,
+    ) -> Self {
+        Self {
+            column_defaults: collect_column_defaults(table_info.meta.schema.column_schemas()),
+            table_info,
+            filter_pushdown,
+            data_source,
+            partition_rules,
         }
     }

@@ -101,6 +128,10 @@
         self.table_info.table_type
     }

+    pub fn partition_rules(&self) -> Option<&PartitionRules> {
+        self.partition_rules.as_ref()
+    }
+
     pub async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
         self.data_source
             .get_stream(request)
diff --git a/tests/cases/standalone/common/create/metric_engine_partition.result b/tests/cases/standalone/common/create/metric_engine_partition.result
index 028a788230..8dda7887e7 100644
--- a/tests/cases/standalone/common/create/metric_engine_partition.result
+++ b/tests/cases/standalone/common/create/metric_engine_partition.result
@@ -51,19 +51,12 @@ with (

 Affected Rows: 0

-create table logical_table_3 (
-    ts timestamp time index,
-    a string,
-    z string,
-    cpu double,
-    primary key(a, z) -- trigger a physical table change with smaller and bigger column ids
-)
-engine = metric
-with (
-    on_physical_table = "metric_engine_partition",
-);
+insert into logical_table_2(ts, host, cpu) values
+('2023-01-01 00:00:00', 'host1', 1.0),
+('2023-01-01 00:00:01', 'host2', 2.0),
+('2023-01-01 00:00:02', 'host3', 3.0);

-Affected Rows: 0
+Affected Rows: 3

 show create table logical_table_2;

@@ -93,9 +86,273 @@ select count(*) from logical_table_2;
 +----------+
 | count(*) |
 +----------+
-| 0        |
+| 3        |
 +----------+

+-- check if part col aggr push down works with only subset of phy part cols
+select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
+
++-------+----------+
+| host  | count(*) |
++-------+----------+
+| host1 | 1        |
+| host2 | 1        |
+| host3 | 1        |
++-------+----------+
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN
+select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
+
++-+-+
+| plan_type_| plan_|
++-+-+
+| logical_plan_| MergeSort: logical_table_2.host ASC NULLS LAST_|
+|_|_MergeScan [is_placeholder=false, remote_input=[_|
+|_| Sort: logical_table_2.host ASC NULLS LAST_|
+|_|_Projection: logical_table_2.host, count(*)_|
+|_|_Aggregate: groupBy=[[logical_table_2.host]], aggr=[[count(logical_table_2.ts) AS count(*)]]_|
+|_|_TableScan: logical_table_2_|
+|_| ]]_|
+| physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_|
+|_|_MergeScanExec: REDACTED
+|_|_|
++-+-+
+
+-- check if step aggr push down works with non-part col
+select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
+
++---------------------+----------+
+| ts                  | count(*) |
++---------------------+----------+
+| 2023-01-01T00:00:00 | 1        |
+| 2023-01-01T00:00:01 | 1        |
+| 2023-01-01T00:00:02 | 1        |
++---------------------+----------+
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN
+select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
+
++-+-+
+| plan_type_| plan_|
++-+-+
+| logical_plan_| Sort: logical_table_2.ts ASC NULLS LAST_|
+|_|_Aggregate: groupBy=[[logical_table_2.ts]], aggr=[[__count_merge(__count_state(logical_table_2.ts)) AS count(*)]] |
+|_|_MergeScan [is_placeholder=false, remote_input=[_|
+|_| Aggregate: groupBy=[[logical_table_2.ts]], aggr=[[__count_state(logical_table_2.ts)]]_|
+|_|_TableScan: logical_table_2_|
+|_| ]]_|
+| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST]_|
+|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]_|
+|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(*)]_|
+|_|_CoalesceBatchesExec: target_batch_size=8192_|
+|_|_RepartitionExec: REDACTED
+|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(*)]_|
+|_|_MergeScanExec: REDACTED
+|_|_|
++-+-+
+
+create table logical_table_3 (
+    ts timestamp time index,
+    a string,
+    z string,
+    cpu double,
+    primary key(a, z) -- trigger a physical table change with smaller and bigger column ids
+)
+engine = metric
+with (
+    on_physical_table = "metric_engine_partition",
+);
+
+Affected Rows: 0
+
+show create table logical_table_3;
+
++-----------------+-------------------------------------------------+
+| Table           | Create Table                                    |
++-----------------+-------------------------------------------------+
+| logical_table_3 | CREATE TABLE IF NOT EXISTS "logical_table_3" (  |
+|                 |   "a" STRING NULL,                              |
+|                 |   "cpu" DOUBLE NULL,                            |
+|                 |   "ts" TIMESTAMP(3) NOT NULL,                   |
+|                 |   "z" STRING NULL,                              |
+|                 |   TIME INDEX ("ts"),                            |
+|                 |   PRIMARY KEY ("a", "z")                        |
+|                 | )                                               |
+|                 |                                                 |
+|                 | ENGINE=metric                                   |
+|                 | WITH(                                           |
+|                 |   on_physical_table = 'metric_engine_partition' |
+|                 | )                                               |
++-----------------+-------------------------------------------------+
+
+insert into logical_table_3(ts, a, z, cpu) values
+('2023-01-01 00:00:00', 'a1', 'z1', 1.0),
+('2023-01-01 00:00:01', 'a2', 'z2', 2.0),
+('2023-01-01 00:00:02', 'a3', 'z3', 3.0);
+
+Affected Rows: 3
+
+select count(*) from logical_table_3;
+
++----------+
+| count(*) |
++----------+
+| 3        |
++----------+
+
+-- check if step aggr push down works with non-part col
+select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
+
++----+----------+
+| a  | count(*) |
++----+----------+
+| a1 | 1        |
+| a2 | 1        |
+| a3 | 1        |
++----+----------+
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN
+select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
+
++-+-+
+| plan_type_| plan_|
++-+-+
+| logical_plan_| Sort: logical_table_3.a ASC NULLS LAST_|
+|_|_Aggregate: groupBy=[[logical_table_3.a]], aggr=[[__count_merge(__count_state(logical_table_3.ts)) AS count(*)]] |
+|_|_MergeScan [is_placeholder=false, remote_input=[_|
+|_| Aggregate: groupBy=[[logical_table_3.a]], aggr=[[__count_state(logical_table_3.ts)]]_|
+|_|_TableScan: logical_table_3_|
+|_| ]]_|
+| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST]_|
+|_|_SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]_|
+|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)]_|
+|_|_CoalesceBatchesExec: target_batch_size=8192_|
+|_|_RepartitionExec: REDACTED
+|_|_AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)]_|
+|_|_MergeScanExec: REDACTED
+|_|_|
++-+-+
+
+-- create a logical table without partition columns on physical table
+create table logical_table_4 (
+    ts timestamp time index,
+    cpu double,
+)
+engine = metric
+with (
+    on_physical_table = "metric_engine_partition",
+);
+
+Affected Rows: 0
+
+show create table logical_table_4;
+
++-----------------+-------------------------------------------------+
+| Table           | Create Table                                    |
++-----------------+-------------------------------------------------+
+| logical_table_4 | CREATE TABLE IF NOT EXISTS "logical_table_4" (  |
+|                 |   "cpu" DOUBLE NULL,                            |
+|                 |   "ts" TIMESTAMP(3) NOT NULL,                   |
+|                 |   TIME INDEX ("ts")                             |
+|                 | )                                               |
+|                 |                                                 |
+|                 | ENGINE=metric                                   |
+|                 | WITH(                                           |
+|                 |   on_physical_table = 'metric_engine_partition' |
+|                 | )                                               |
++-----------------+-------------------------------------------------+
+
+insert into logical_table_4(ts, cpu) values
+('2023-01-01 00:00:00', 1.0),
+('2023-01-01 00:00:01', 2.0),
+('2023-01-01 00:00:02', 3.0);
+
+Affected Rows: 3
+
+-- this should only return one row
+select count(*) from logical_table_4;
+
++----------+
+| count(*) |
++----------+
+| 3        |
++----------+
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN select count(*) from logical_table_4;
+
++-+-+
+| plan_type_| plan_|
++-+-+
+| logical_plan_| Aggregate: groupBy=[[]], aggr=[[__count_merge(__count_state(logical_table_4.ts)) AS count(*)]]_|
+|_|_MergeScan [is_placeholder=false, remote_input=[_|
+|_| Aggregate: groupBy=[[]], aggr=[[__count_state(logical_table_4.ts)]]_|
+|_|_TableScan: logical_table_4_|
+|_| ]]_|
+| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[count(*)]_|
+|_|_CoalescePartitionsExec_|
+|_|_AggregateExec: mode=Partial, gby=[], aggr=[count(*)]_|
+|_|_MergeScanExec: REDACTED
+|_|_|
++-+-+
+
+-- check if step aggr push down works with non-part col
+select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
+
++---------------------+----------+
+| ts                  | count(*) |
++---------------------+----------+
+| 2023-01-01T00:00:00 | 1        |
+| 2023-01-01T00:00:01 | 1        |
+| 2023-01-01T00:00:02 | 1        |
++---------------------+----------+
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN
+select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
+
++-+-+
+| plan_type_| plan_|
++-+-+
+| logical_plan_| Sort: logical_table_4.ts ASC NULLS LAST_|
+|_|_Aggregate: groupBy=[[logical_table_4.ts]], aggr=[[__count_merge(__count_state(logical_table_4.ts)) AS count(*)]] |
+|_|_MergeScan [is_placeholder=false, remote_input=[_|
+|_| Aggregate: groupBy=[[logical_table_4.ts]], aggr=[[__count_state(logical_table_4.ts)]]_|
+|_|_TableScan: logical_table_4_|
+|_| ]]_|
+| physical_plan | SortPreservingMergeExec: [ts@0 ASC NULLS LAST]_|
+|_|_SortExec: expr=[ts@0 ASC NULLS LAST], preserve_partitioning=[true]_|
+|_|_AggregateExec: mode=FinalPartitioned, gby=[ts@0 as ts], aggr=[count(*)]_|
+|_|_CoalesceBatchesExec: target_batch_size=8192_|
+|_|_RepartitionExec: REDACTED
+|_|_AggregateExec: mode=Partial, gby=[ts@0 as ts], aggr=[count(*)]_|
+|_|_MergeScanExec: REDACTED
+|_|_|
++-+-+
+
 drop table logical_table_2;

 Affected Rows: 0

@@ -104,6 +361,10 @@ drop table logical_table_3;

 Affected Rows: 0

+drop table logical_table_4;
+
+Affected Rows: 0
+
 drop table metric_engine_partition;

 Affected Rows: 0
diff --git a/tests/cases/standalone/common/create/metric_engine_partition.sql b/tests/cases/standalone/common/create/metric_engine_partition.sql
index d068c88465..d967533a28 100644
--- a/tests/cases/standalone/common/create/metric_engine_partition.sql
+++ b/tests/cases/standalone/common/create/metric_engine_partition.sql
@@ -39,6 +39,37 @@ with (
     on_physical_table = "metric_engine_partition",
 );

+insert into logical_table_2(ts, host, cpu) values
+('2023-01-01 00:00:00', 'host1', 1.0),
+('2023-01-01 00:00:01', 'host2', 2.0),
+('2023-01-01 00:00:02', 'host3', 3.0);
+
+show create table logical_table_2;
+
+select count(*) from logical_table_2;
+
+-- check if part col aggr push down works with only subset of phy part cols
+select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN
+select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
+
+-- check if step aggr push down works with non-part col
+select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN
+select ts, count(*) from logical_table_2 GROUP BY ts ORDER BY ts;
+
 create table logical_table_3 (
     ts timestamp time index,
     a string,
@@ -51,12 +82,68 @@ with (
     on_physical_table = "metric_engine_partition",
 );

-show create table logical_table_2;
+show create table logical_table_3;

-select count(*) from logical_table_2;
+insert into logical_table_3(ts, a, z, cpu) values
+('2023-01-01 00:00:00', 'a1', 'z1', 1.0),
+('2023-01-01 00:00:01', 'a2', 'z2', 2.0),
+('2023-01-01 00:00:02', 'a3', 'z3', 3.0);
+
+select count(*) from logical_table_3;
+
+-- check if step aggr push down works with non-part col
+select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN
+select a, count(*) from logical_table_3 GROUP BY a ORDER BY a;
+
+-- create a logical table without partition columns on physical table
+create table logical_table_4 (
+    ts timestamp time index,
+    cpu double,
+)
+engine = metric
+with (
+    on_physical_table = "metric_engine_partition",
+);
+
+show create table logical_table_4;
+
+insert into logical_table_4(ts, cpu) values
+('2023-01-01 00:00:00', 1.0),
+('2023-01-01 00:00:01', 2.0),
+('2023-01-01 00:00:02', 3.0);
+
+-- this should only return one row
+select count(*) from logical_table_4;
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN select count(*) from logical_table_4;
+
+-- check if step aggr push down works with non-part col
+select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;
+
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RepartitionExec:.*) RepartitionExec: REDACTED
+EXPLAIN
+select ts, count(*) from logical_table_4 GROUP BY ts ORDER BY ts;

 drop table logical_table_2;

 drop table logical_table_3;

+drop table logical_table_4;
+
 drop table metric_engine_partition;
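
Note (illustrative commentary, not part of the patch): the sketch below is a self-contained toy model of the placeholder decision the PlanRewriter change above makes. The struct is a local stand-in for table::table::PartitionRules and the function name effective_partition_cols is invented for the example; only the decision logic mirrors the diff.

// Minimal sketch, assuming a local mirror of `PartitionRules`.
struct PartitionRules {
    extra_phy_cols_not_in_logical_table: Vec<String>,
}

const PLACEHOLDER: &str = "__OTHER_PHYSICAL_PART_COLS_PLACEHOLDER__";

/// Returns the partition columns the distributed planner should see.
fn effective_partition_cols(
    mut partition_cols: Vec<String>,
    rules: Option<&PartitionRules>,
) -> Vec<String> {
    let has_hidden_phy_part_cols = rules
        .map(|r| !r.extra_phy_cols_not_in_logical_table.is_empty())
        .unwrap_or(false);
    // The placeholder is added only when the logical table exposes no
    // partition column at all; a non-empty subset already marks the table
    // as partitioned.
    if has_hidden_phy_part_cols && partition_cols.is_empty() {
        partition_cols.push(PLACEHOLDER.to_string());
    }
    partition_cols
}

fn main() {
    let rules = PartitionRules {
        extra_phy_cols_not_in_logical_table: vec!["host".to_string()],
    };
    // No physical partition column is visible (the logical_table_4 case):
    // the placeholder keeps the table looking partitioned.
    assert_eq!(
        effective_partition_cols(vec![], Some(&rules)),
        vec![PLACEHOLDER.to_string()]
    );
    // A visible subset of partition columns: left unchanged, the subset
    // suffices and the hidden columns behave as if always null.
    assert_eq!(
        effective_partition_cols(vec!["host".to_string()], Some(&rules)),
        vec!["host".to_string()]
    );
    // A genuinely non-partitioned table has no rules and stays empty, so the
    // two cases remain distinguishable to the optimizer.
    assert!(effective_partition_cols(vec![], None).is_empty());
}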