diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index bde323afe7..2c5e83b29e 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::any::Any; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashSet}; use std::sync::{Arc, Weak}; use async_stream::try_stream; @@ -28,7 +28,7 @@ use common_meta::cache::{ use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::table_info::TableInfoValue; +use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::TableNameKey; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; @@ -39,6 +39,7 @@ use moka::sync::Cache; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use session::context::{Channel, QueryContext}; use snafu::prelude::*; +use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use table::dist_table::DistTable; use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; @@ -142,6 +143,64 @@ impl KvBackendCatalogManager { pub fn procedure_manager(&self) -> Option { self.procedure_manager.clone() } + + // Override logical table's partition key indices with physical table's. + async fn override_logical_table_partition_key_indices( + table_route_cache: &TableRouteCacheRef, + table_info_manager: &TableInfoManager, + table: TableRef, + ) -> Result { + // If the table is not a metric table, return the table directly. + if table.table_info().meta.engine != METRIC_ENGINE_NAME { + return Ok(table); + } + + if let Some(table_route_value) = table_route_cache + .get(table.table_info().table_id()) + .await + .context(TableMetadataManagerSnafu)? + && let TableRoute::Logical(logical_route) = &*table_route_value + && let Some(physical_table_info_value) = table_info_manager + .get(logical_route.physical_table_id()) + .await + .context(TableMetadataManagerSnafu)? + { + let mut new_table_info = (*table.table_info()).clone(); + // Gather all column names from the logical table + let logical_column_names: HashSet<_> = new_table_info + .meta + .schema + .column_schemas() + .iter() + .map(|col| &col.name) + .collect(); + + // Only preserve partition key indices where the corresponding columns exist in logical table + new_table_info.meta.partition_key_indices = physical_table_info_value + .table_info + .meta + .partition_key_indices + .iter() + .filter(|&&index| { + physical_table_info_value + .table_info + .meta + .schema + .column_schemas + .get(index) + .map(|physical_column| logical_column_names.contains(&physical_column.name)) + .unwrap_or(false) + }) + .cloned() + .collect(); + + let new_table = DistTable::table(Arc::new(new_table_info)); + + return Ok(new_table); + } + + Ok(table) + } } #[async_trait::async_trait] @@ -268,10 +327,7 @@ impl CatalogManager for KvBackendCatalogManager { let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu { name: "table_cache", })?; - let table_route_cache: TableRouteCacheRef = - self.cache_registry.get().context(CacheNotFoundSnafu { - name: "table_route_cache", - })?; + let table = table_cache .get_by_ref(&TableName { catalog_name: catalog_name.to_string(), @@ -281,55 +337,18 @@ impl CatalogManager for KvBackendCatalogManager { .await .context(GetTableCacheSnafu)?; - // Override logical table's partition key indices with physical table's. - if let Some(table) = &table - && let Some(table_route_value) = table_route_cache - .get(table.table_info().table_id()) - .await - .context(TableMetadataManagerSnafu)? - && let TableRoute::Logical(logical_route) = &*table_route_value - && let Some(physical_table_info_value) = self - .table_metadata_manager - .table_info_manager() - .get(logical_route.physical_table_id()) - .await - .context(TableMetadataManagerSnafu)? - { - let mut new_table_info = (*table.table_info()).clone(); - // Gather all column names from the logical table - let logical_column_names: std::collections::HashSet<_> = new_table_info - .meta - .schema - .column_schemas() - .iter() - .map(|col| &col.name) - .collect(); - - // Only preserve partition key indices where the corresponding columns exist in logical table - new_table_info.meta.partition_key_indices = physical_table_info_value - .table_info - .meta - .partition_key_indices - .iter() - .filter(|&&index| { - if let Some(physical_column) = physical_table_info_value - .table_info - .meta - .schema - .column_schemas - .get(index) - { - logical_column_names.contains(&physical_column.name) - } else { - false - } - }) - .cloned() - .collect(); - - let new_table = DistTable::table(Arc::new(new_table_info)); - - return Ok(Some(new_table)); + if let Some(table) = table { + let table_route_cache: TableRouteCacheRef = + self.cache_registry.get().context(CacheNotFoundSnafu { + name: "table_route_cache", + })?; + return Self::override_logical_table_partition_key_indices( + &table_route_cache, + self.table_metadata_manager.table_info_manager(), + table, + ) + .await + .map(Some); } if channel == Channel::Postgres { @@ -342,7 +361,7 @@ impl CatalogManager for KvBackendCatalogManager { } } - Ok(table) + Ok(None) } async fn tables_by_ids( @@ -394,8 +413,20 @@ impl CatalogManager for KvBackendCatalogManager { let catalog = catalog.to_string(); let schema = schema.to_string(); let semaphore = Arc::new(Semaphore::new(CONCURRENCY)); + let table_route_cache: Result = + self.cache_registry.get().context(CacheNotFoundSnafu { + name: "table_route_cache", + }); common_runtime::spawn_global(async move { + let table_route_cache = match table_route_cache { + Ok(table_route_cache) => table_route_cache, + Err(e) => { + let _ = tx.send(Err(e)).await; + return; + } + }; + let table_id_stream = metadata_manager .table_name_manager() .tables(&catalog, &schema) @@ -422,6 +453,7 @@ impl CatalogManager for KvBackendCatalogManager { let metadata_manager = metadata_manager.clone(); let tx = tx.clone(); let semaphore = semaphore.clone(); + let table_route_cache = table_route_cache.clone(); common_runtime::spawn_global(async move { // we don't explicitly close the semaphore so just ignore the potential error. let _ = semaphore.acquire().await; @@ -439,6 +471,16 @@ impl CatalogManager for KvBackendCatalogManager { }; for table in table_info_values.into_values().map(build_table) { + let table = if let Ok(table) = table { + Self::override_logical_table_partition_key_indices( + &table_route_cache, + metadata_manager.table_info_manager(), + table, + ) + .await + } else { + table + }; if tx.send(table).await.is_err() { return; }