From 877ce6e8934ba4957475b3847328e045ac9a22b9 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 6 Aug 2025 14:25:32 +0800 Subject: [PATCH] chore: add methods to catalog manager (#6656) * chore/optimize-catalog: ### Add `table_id` Method to `CatalogManager` - **Files Modified**: - `src/catalog/src/kvbackend/manager.rs` - `src/catalog/src/lib.rs` - **Key Changes**: - Introduced a new asynchronous method `table_id` in the `CatalogManager` trait to retrieve the table ID based on catalog, schema, and table name. - Implemented the `table_id` method in `KvBackendCatalogManager` to fetch the table ID from the system catalog or cache, with a fallback to `pg_catalog` for Postgres channels. Signed-off-by: Lei, HUANG * chore/optimize-catalog: ### Add `table_info_by_id` Method to Catalog Managers - **`manager.rs`**: Introduced the `table_info_by_id` method in `KvBackendCatalogManager` to retrieve table information by table ID using the `TableInfoCacheRef`. - **`lib.rs`**: Updated the `CatalogManager` trait to include the new `table_info_by_id` method. - **`memory/manager.rs`**: Implemented the `table_info_by_id` method in `MemoryCatalogManager` to fetch table information by table ID from in-memory catalogs. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/catalog/src/kvbackend/manager.rs | 62 +++++++++++++++++++++++++++- src/catalog/src/lib.rs | 19 ++++++++- src/catalog/src/memory/manager.rs | 14 ++++++- 3 files changed, 91 insertions(+), 4 deletions(-) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 58a3ab0adb..df4489a519 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -23,7 +23,8 @@ use common_catalog::consts::{ }; use common_error::ext::BoxedError; use common_meta::cache::{ - LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef, + LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef, + ViewInfoCacheRef, }; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::flow::FlowMetadataManager; @@ -41,7 +42,7 @@ use session::context::{Channel, QueryContext}; use snafu::prelude::*; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use table::dist_table::DistTable; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfoRef}; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::table_name::TableName; use table::TableRef; @@ -325,6 +326,63 @@ impl CatalogManager for KvBackendCatalogManager { Ok(None) } + async fn table_id( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + query_ctx: Option<&QueryContext>, + ) -> Result> { + let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel()); + if let Some(table) = + self.system_catalog + .table(catalog_name, schema_name, table_name, query_ctx) + { + return Ok(Some(table.table_info().table_id())); + } + + let table_cache: TableNameCacheRef = + self.cache_registry.get().context(CacheNotFoundSnafu { + name: "table_name_cache", + })?; + + let table = table_cache + .get_by_ref(&TableName { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + }) + .await + .context(GetTableCacheSnafu)?; + + if let Some(table) = table { + return Ok(Some(table)); + } + + if channel == Channel::Postgres { + // falldown to pg_catalog + if let Some(table) = + self.system_catalog + .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx) + { + return Ok(Some(table.table_info().table_id())); + } + } + + Ok(None) + } + + async fn table_info_by_id(&self, table_id: TableId) -> Result> { + let table_info_cache: TableInfoCacheRef = + self.cache_registry.get().context(CacheNotFoundSnafu { + name: "table_info_cache", + })?; + table_info_cache + .get_by_ref(&table_id) + .await + .context(GetTableCacheSnafu) + } + async fn tables_by_ids( &self, catalog: &str, diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 01d91d80b1..b664c08b60 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -25,7 +25,7 @@ use common_catalog::consts::{INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME}; use futures::future::BoxFuture; use futures_util::stream::BoxStream; use session::context::QueryContext; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfoRef}; use table::TableRef; use crate::error::Result; @@ -89,6 +89,23 @@ pub trait CatalogManager: Send + Sync { query_ctx: Option<&QueryContext>, ) -> Result>; + /// Returns the table id of provided table ident. + async fn table_id( + &self, + catalog: &str, + schema: &str, + table_name: &str, + query_ctx: Option<&QueryContext>, + ) -> Result> { + Ok(self + .table(catalog, schema, table_name, query_ctx) + .await? + .map(|t| t.table_info().ident.table_id)) + } + + /// Returns the table of provided id. + async fn table_info_by_id(&self, table_id: TableId) -> Result>; + /// Returns the tables by table ids. async fn tables_by_ids( &self, diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index b58d2b1e5f..a377542af6 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -28,7 +28,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use futures_util::stream::BoxStream; use session::context::QueryContext; use snafu::OptionExt; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfoRef}; use table::TableRef; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; @@ -144,6 +144,18 @@ impl CatalogManager for MemoryCatalogManager { Ok(result) } + async fn table_info_by_id(&self, table_id: TableId) -> Result> { + Ok(self + .catalogs + .read() + .unwrap() + .iter() + .flat_map(|(_, schema_entries)| schema_entries.values()) + .flat_map(|tables| tables.values()) + .find(|t| t.table_info().ident.table_id == table_id) + .map(|t| t.table_info())) + } + async fn tables_by_ids( &self, catalog: &str,