diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 58a3ab0adb..df4489a519 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -23,7 +23,8 @@ use common_catalog::consts::{ }; use common_error::ext::BoxedError; use common_meta::cache::{ - LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef, + LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef, + ViewInfoCacheRef, }; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::flow::FlowMetadataManager; @@ -41,7 +42,7 @@ use session::context::{Channel, QueryContext}; use snafu::prelude::*; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use table::dist_table::DistTable; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfoRef}; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::table_name::TableName; use table::TableRef; @@ -325,6 +326,63 @@ impl CatalogManager for KvBackendCatalogManager { Ok(None) } + async fn table_id( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + query_ctx: Option<&QueryContext>, + ) -> Result> { + let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel()); + if let Some(table) = + self.system_catalog + .table(catalog_name, schema_name, table_name, query_ctx) + { + return Ok(Some(table.table_info().table_id())); + } + + let table_cache: TableNameCacheRef = + self.cache_registry.get().context(CacheNotFoundSnafu { + name: "table_name_cache", + })?; + + let table = table_cache + .get_by_ref(&TableName { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + }) + .await + .context(GetTableCacheSnafu)?; + + if let Some(table) = table { + return Ok(Some(table)); + } + + if channel == Channel::Postgres { + // falldown to pg_catalog + if let Some(table) = + self.system_catalog + .table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx) + { + return Ok(Some(table.table_info().table_id())); + } + } + + Ok(None) + } + + async fn table_info_by_id(&self, table_id: TableId) -> Result> { + let table_info_cache: TableInfoCacheRef = + self.cache_registry.get().context(CacheNotFoundSnafu { + name: "table_info_cache", + })?; + table_info_cache + .get_by_ref(&table_id) + .await + .context(GetTableCacheSnafu) + } + async fn tables_by_ids( &self, catalog: &str, diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 01d91d80b1..b664c08b60 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -25,7 +25,7 @@ use common_catalog::consts::{INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME}; use futures::future::BoxFuture; use futures_util::stream::BoxStream; use session::context::QueryContext; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfoRef}; use table::TableRef; use crate::error::Result; @@ -89,6 +89,23 @@ pub trait CatalogManager: Send + Sync { query_ctx: Option<&QueryContext>, ) -> Result>; + /// Returns the table id of provided table ident. + async fn table_id( + &self, + catalog: &str, + schema: &str, + table_name: &str, + query_ctx: Option<&QueryContext>, + ) -> Result> { + Ok(self + .table(catalog, schema, table_name, query_ctx) + .await? + .map(|t| t.table_info().ident.table_id)) + } + + /// Returns the table of provided id. + async fn table_info_by_id(&self, table_id: TableId) -> Result>; + /// Returns the tables by table ids. async fn tables_by_ids( &self, diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index b58d2b1e5f..a377542af6 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -28,7 +28,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use futures_util::stream::BoxStream; use session::context::QueryContext; use snafu::OptionExt; -use table::metadata::TableId; +use table::metadata::{TableId, TableInfoRef}; use table::TableRef; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; @@ -144,6 +144,18 @@ impl CatalogManager for MemoryCatalogManager { Ok(result) } + async fn table_info_by_id(&self, table_id: TableId) -> Result> { + Ok(self + .catalogs + .read() + .unwrap() + .iter() + .flat_map(|(_, schema_entries)| schema_entries.values()) + .flat_map(|tables| tables.values()) + .find(|t| t.table_info().ident.table_id == table_id) + .map(|t| t.table_info())) + } + async fn tables_by_ids( &self, catalog: &str,