feat: override logical table's partition key indices (#6385)

* feat: Override logical table's partition key indices with physical table's

* chore: by comment

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
jeremyhi
2025-06-27 10:55:56 +08:00
committed by Yingwen
parent c37c4df20d
commit 80fae1c559

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, Weak};
use async_stream::try_stream;
@@ -28,7 +28,7 @@ use common_meta::cache::{
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -39,6 +39,7 @@ use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
@@ -142,6 +143,64 @@ impl KvBackendCatalogManager {
pub fn procedure_manager(&self) -> Option<ProcedureManagerRef> {
self.procedure_manager.clone()
}
// Override logical table's partition key indices with physical table's.
async fn override_logical_table_partition_key_indices(
table_route_cache: &TableRouteCacheRef,
table_info_manager: &TableInfoManager,
table: TableRef,
) -> Result<TableRef> {
// If the table is not a metric table, return the table directly.
if table.table_info().meta.engine != METRIC_ENGINE_NAME {
return Ok(table);
}
if let Some(table_route_value) = table_route_cache
.get(table.table_info().table_id())
.await
.context(TableMetadataManagerSnafu)?
&& let TableRoute::Logical(logical_route) = &*table_route_value
&& let Some(physical_table_info_value) = table_info_manager
.get(logical_route.physical_table_id())
.await
.context(TableMetadataManagerSnafu)?
{
let mut new_table_info = (*table.table_info()).clone();
// Gather all column names from the logical table
let logical_column_names: HashSet<_> = new_table_info
.meta
.schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
// Only preserve partition key indices where the corresponding columns exist in logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
.meta
.partition_key_indices
.iter()
.filter(|&&index| {
physical_table_info_value
.table_info
.meta
.schema
.column_schemas
.get(index)
.map(|physical_column| logical_column_names.contains(&physical_column.name))
.unwrap_or(false)
})
.cloned()
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(new_table);
}
Ok(table)
}
}
#[async_trait::async_trait]
@@ -268,10 +327,7 @@ impl CatalogManager for KvBackendCatalogManager {
let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_cache",
})?;
let table_route_cache: TableRouteCacheRef =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
})?;
let table = table_cache
.get_by_ref(&TableName {
catalog_name: catalog_name.to_string(),
@@ -281,55 +337,18 @@ impl CatalogManager for KvBackendCatalogManager {
.await
.context(GetTableCacheSnafu)?;
// Override logical table's partition key indices with physical table's.
if let Some(table) = &table
&& let Some(table_route_value) = table_route_cache
.get(table.table_info().table_id())
.await
.context(TableMetadataManagerSnafu)?
&& let TableRoute::Logical(logical_route) = &*table_route_value
&& let Some(physical_table_info_value) = self
.table_metadata_manager
.table_info_manager()
.get(logical_route.physical_table_id())
.await
.context(TableMetadataManagerSnafu)?
{
let mut new_table_info = (*table.table_info()).clone();
// Gather all column names from the logical table
let logical_column_names: std::collections::HashSet<_> = new_table_info
.meta
.schema
.column_schemas()
.iter()
.map(|col| &col.name)
.collect();
// Only preserve partition key indices where the corresponding columns exist in logical table
new_table_info.meta.partition_key_indices = physical_table_info_value
.table_info
.meta
.partition_key_indices
.iter()
.filter(|&&index| {
if let Some(physical_column) = physical_table_info_value
.table_info
.meta
.schema
.column_schemas
.get(index)
{
logical_column_names.contains(&physical_column.name)
} else {
false
}
})
.cloned()
.collect();
let new_table = DistTable::table(Arc::new(new_table_info));
return Ok(Some(new_table));
if let Some(table) = table {
let table_route_cache: TableRouteCacheRef =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
})?;
return Self::override_logical_table_partition_key_indices(
&table_route_cache,
self.table_metadata_manager.table_info_manager(),
table,
)
.await
.map(Some);
}
if channel == Channel::Postgres {
@@ -342,7 +361,7 @@ impl CatalogManager for KvBackendCatalogManager {
}
}
Ok(table)
Ok(None)
}
async fn tables_by_ids(
@@ -394,8 +413,20 @@ impl CatalogManager for KvBackendCatalogManager {
let catalog = catalog.to_string();
let schema = schema.to_string();
let semaphore = Arc::new(Semaphore::new(CONCURRENCY));
let table_route_cache: Result<TableRouteCacheRef> =
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_route_cache",
});
common_runtime::spawn_global(async move {
let table_route_cache = match table_route_cache {
Ok(table_route_cache) => table_route_cache,
Err(e) => {
let _ = tx.send(Err(e)).await;
return;
}
};
let table_id_stream = metadata_manager
.table_name_manager()
.tables(&catalog, &schema)
@@ -422,6 +453,7 @@ impl CatalogManager for KvBackendCatalogManager {
let metadata_manager = metadata_manager.clone();
let tx = tx.clone();
let semaphore = semaphore.clone();
let table_route_cache = table_route_cache.clone();
common_runtime::spawn_global(async move {
// we don't explicitly close the semaphore so just ignore the potential error.
let _ = semaphore.acquire().await;
@@ -439,6 +471,16 @@ impl CatalogManager for KvBackendCatalogManager {
};
for table in table_info_values.into_values().map(build_table) {
let table = if let Ok(table) = table {
Self::override_logical_table_partition_key_indices(
&table_route_cache,
metadata_manager.table_info_manager(),
table,
)
.await
} else {
table
};
if tx.send(table).await.is_err() {
return;
}