fix: override logical table's partition column with physical table's (#6326)

* fix: override logical table's partition column with physical table's

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add more sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* naming

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-06-17 16:00:54 +08:00
committed by GitHub
parent 53c4fd478e
commit 16e7f7b64b
4 changed files with 102 additions and 5 deletions

View File

@@ -22,7 +22,9 @@ use common_catalog::consts::{
PG_CATALOG_NAME,
};
use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
use common_meta::cache::{
LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef,
};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
@@ -266,16 +268,68 @@ impl CatalogManager for KvBackendCatalogManager {
let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_cache",
})?;
if let Some(table) = table_cache
let table_route_cache: TableRouteCacheRef =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
})?;
let table = table_cache
.get_by_ref(&TableName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
})
.await
.context(GetTableCacheSnafu)?
.context(GetTableCacheSnafu)?;
// Override logical table's partition key indices with physical table's.
if let Some(table) = &table
&& let Some(table_route_value) = table_route_cache
.get(table.table_info().table_id())
.await
.context(TableMetadataManagerSnafu)?
&& let TableRoute::Logical(logical_route) = &*table_route_value
&& let Some(physical_table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(logical_route.physical_table_id())
.await
.context(TableMetadataManagerSnafu)?
{
return Ok(Some(table));
let mut new_table_info = (*table.table_info()).clone();
// Gather all column names from the logical table
let logical_column_names: std::collections::HashSet<_> = new_table_info
.meta
.schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
// Only preserve partition key indices where the corresponding columns exist in logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
.meta
.partition_key_indices
.iter()
.filter(|&&index| {
if let Some(physical_column) = physical_table_info_value
.table_info
.meta
.schema
.column_schemas
.get(index)
{
logical_column_names.contains(&physical_column.name)
} else {
false
}
})
.cloned()
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(Some(new_table));
}
if channel == Channel::Postgres {
@@ -288,7 +342,7 @@ impl CatalogManager for KvBackendCatalogManager {
}
}
return Ok(None);
Ok(table)
}
async fn tables_by_ids(

View File

@@ -14,6 +14,7 @@
#![feature(assert_matches)]
#![feature(try_blocks)]
#![feature(let_chains)]
use std::any::Any;
use std::fmt::{Debug, Formatter};

View File

@@ -48,6 +48,20 @@ with (
Affected Rows: 0
create table logical_table_3 (
ts timestamp time index,
a string,
z string,
cpu double,
primary key(a, z) -- trigger a physical table change with smaller and bigger column ids
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
);
Affected Rows: 0
show create table logical_table_2;
+-----------------+-------------------------------------------------+
@@ -71,10 +85,22 @@ show create table logical_table_2;
| | ) |
+-----------------+-------------------------------------------------+
select count(*) from logical_table_2;
+----------+
| count(*) |
+----------+
| 0 |
+----------+
drop table logical_table_2;
Affected Rows: 0
drop table logical_table_3;
Affected Rows: 0
drop table metric_engine_partition;
Affected Rows: 0

View File

@@ -36,8 +36,24 @@ with (
on_physical_table = "metric_engine_partition",
);
create table logical_table_3 (
ts timestamp time index,
a string,
z string,
cpu double,
primary key(a, z) -- trigger a physical table change with smaller and bigger column ids
)
engine = metric
with (
on_physical_table = "metric_engine_partition",
);
show create table logical_table_2;
select count(*) from logical_table_2;
drop table logical_table_2;
drop table logical_table_3;
drop table metric_engine_partition;