diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index f486d35bdd..bde323afe7 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -22,7 +22,9 @@ use common_catalog::consts::{ PG_CATALOG_NAME, }; use common_error::ext::BoxedError; -use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef}; +use common_meta::cache::{ + LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef, +}; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::schema_name::SchemaNameKey; @@ -266,16 +268,68 @@ impl CatalogManager for KvBackendCatalogManager { let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu { name: "table_cache", })?; - if let Some(table) = table_cache + let table_route_cache: TableRouteCacheRef = + self.cache_registry.get().context(CacheNotFoundSnafu { + name: "table_route_cache", + })?; + let table = table_cache .get_by_ref(&TableName { catalog_name: catalog_name.to_string(), schema_name: schema_name.to_string(), table_name: table_name.to_string(), }) .await - .context(GetTableCacheSnafu)? + .context(GetTableCacheSnafu)?; + + // Override logical table's partition key indices with physical table's. + if let Some(table) = &table + && let Some(table_route_value) = table_route_cache + .get(table.table_info().table_id()) + .await + .context(TableMetadataManagerSnafu)? + && let TableRoute::Logical(logical_route) = &*table_route_value + && let Some(physical_table_info_value) = self + .table_metadata_manager + .table_info_manager() + .get(logical_route.physical_table_id()) + .await + .context(TableMetadataManagerSnafu)? { - return Ok(Some(table)); + let mut new_table_info = (*table.table_info()).clone(); + // Gather all column names from the logical table + let logical_column_names: std::collections::HashSet<_> = new_table_info + .meta + .schema + .column_schemas() + .iter() + .map(|col| &col.name) + .collect(); + + // Only preserve partition key indices where the corresponding columns exist in logical table + new_table_info.meta.partition_key_indices = physical_table_info_value + .table_info + .meta + .partition_key_indices + .iter() + .filter(|&&index| { + if let Some(physical_column) = physical_table_info_value + .table_info + .meta + .schema + .column_schemas + .get(index) + { + logical_column_names.contains(&physical_column.name) + } else { + false + } + }) + .cloned() + .collect(); + + let new_table = DistTable::table(Arc::new(new_table_info)); + + return Ok(Some(new_table)); } if channel == Channel::Postgres { @@ -288,7 +342,7 @@ impl CatalogManager for KvBackendCatalogManager { } } - return Ok(None); + Ok(table) } async fn tables_by_ids( diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 0e92601eb6..01d91d80b1 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -14,6 +14,7 @@ #![feature(assert_matches)] #![feature(try_blocks)] +#![feature(let_chains)] use std::any::Any; use std::fmt::{Debug, Formatter}; diff --git a/tests/cases/standalone/common/create/metric_engine_partition.result b/tests/cases/standalone/common/create/metric_engine_partition.result index 4e0674a497..a9740e6e24 100644 --- a/tests/cases/standalone/common/create/metric_engine_partition.result +++ b/tests/cases/standalone/common/create/metric_engine_partition.result @@ -48,6 +48,20 @@ with ( Affected Rows: 0 +create table logical_table_3 ( + ts timestamp time index, + a string, + z string, + cpu double, + primary key(a, z) -- trigger a physical table change with smaller and bigger column ids +) +engine = metric +with ( + on_physical_table = "metric_engine_partition", +); + +Affected Rows: 0 + show create table logical_table_2; +-----------------+-------------------------------------------------+ @@ -71,10 +85,22 @@ show create table logical_table_2; | | ) | +-----------------+-------------------------------------------------+ +select count(*) from logical_table_2; + ++----------+ +| count(*) | ++----------+ +| 0 | ++----------+ + drop table logical_table_2; Affected Rows: 0 +drop table logical_table_3; + +Affected Rows: 0 + drop table metric_engine_partition; Affected Rows: 0 diff --git a/tests/cases/standalone/common/create/metric_engine_partition.sql b/tests/cases/standalone/common/create/metric_engine_partition.sql index d224fd8c7d..28d5e45157 100644 --- a/tests/cases/standalone/common/create/metric_engine_partition.sql +++ b/tests/cases/standalone/common/create/metric_engine_partition.sql @@ -36,8 +36,24 @@ with ( on_physical_table = "metric_engine_partition", ); +create table logical_table_3 ( + ts timestamp time index, + a string, + z string, + cpu double, + primary key(a, z) -- trigger a physical table change with smaller and bigger column ids +) +engine = metric +with ( + on_physical_table = "metric_engine_partition", +); + show create table logical_table_2; +select count(*) from logical_table_2; + drop table logical_table_2; +drop table logical_table_3; + drop table metric_engine_partition;