diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index ca20805d37..82e1c8f876 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -38,6 +38,7 @@ use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use session::context::{Channel, QueryContext}; use snafu::prelude::*; use table::dist_table::DistTable; +use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::table_name::TableName; use table::TableRef; @@ -286,6 +287,28 @@ impl CatalogManager for KvBackendCatalogManager { return Ok(None); } + async fn tables_by_ids( + &self, + catalog: &str, + schema: &str, + table_ids: &[TableId], + ) -> Result> { + let table_info_values = self + .table_metadata_manager + .table_info_manager() + .batch_get(table_ids) + .await + .context(TableMetadataManagerSnafu)?; + + let tables = table_info_values + .into_values() + .filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema) + .map(build_table) + .collect::>>()?; + + Ok(tables) + } + fn tables<'a>( &'a self, catalog: &'a str, diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 729ea58724..34884f1355 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -87,6 +87,14 @@ pub trait CatalogManager: Send + Sync { query_ctx: Option<&QueryContext>, ) -> Result>; + /// Returns the tables by table ids. + async fn tables_by_ids( + &self, + catalog: &str, + schema: &str, + table_ids: &[TableId], + ) -> Result>; + /// Returns all tables with a stream by catalog and schema. fn tables<'a>( &'a self, diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index 62ff863c46..9b53a20e3d 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -14,7 +14,7 @@ use std::any::Any; use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock, Weak}; use async_stream::{stream, try_stream}; @@ -28,6 +28,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use futures_util::stream::BoxStream; use session::context::QueryContext; use snafu::OptionExt; +use table::metadata::TableId; use table::TableRef; use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; @@ -143,6 +144,33 @@ impl CatalogManager for MemoryCatalogManager { Ok(result) } + async fn tables_by_ids( + &self, + catalog: &str, + schema: &str, + table_ids: &[TableId], + ) -> Result> { + let catalogs = self.catalogs.read().unwrap(); + + let schemas = catalogs.get(catalog).context(CatalogNotFoundSnafu { + catalog_name: catalog, + })?; + + let tables = schemas + .get(schema) + .context(SchemaNotFoundSnafu { catalog, schema })?; + + let filter_ids: HashSet<_> = table_ids.iter().collect(); + // It is very inefficient, but we do not need to optimize it since it will not be called in `MemoryCatalogManager`. + let tables = tables + .values() + .filter(|t| filter_ids.contains(&t.table_info().table_id())) + .cloned() + .collect::>(); + + Ok(tables) + } + fn tables<'a>( &'a self, catalog: &'a str,