mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-09 06:42:57 +00:00
chore: add methods to catalog manager (#6656)
* chore/optimize-catalog: ### Add `table_id` Method to `CatalogManager` - **Files Modified**: - `src/catalog/src/kvbackend/manager.rs` - `src/catalog/src/lib.rs` - **Key Changes**: - Introduced a new asynchronous method `table_id` in the `CatalogManager` trait to retrieve the table ID based on catalog, schema, and table name. - Implemented the `table_id` method in `KvBackendCatalogManager` to fetch the table ID from the system catalog or cache, with a fallback to `pg_catalog` for Postgres channels. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * chore/optimize-catalog: ### Add `table_info_by_id` Method to Catalog Managers - **`manager.rs`**: Introduced the `table_info_by_id` method in `KvBackendCatalogManager` to retrieve table information by table ID using the `TableInfoCacheRef`. - **`lib.rs`**: Updated the `CatalogManager` trait to include the new `table_info_by_id` method. - **`memory/manager.rs`**: Implemented the `table_info_by_id` method in `MemoryCatalogManager` to fetch table information by table ID from in-memory catalogs. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -23,7 +23,8 @@ use common_catalog::consts::{
|
||||
};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache::{
|
||||
LayeredCacheRegistryRef, TableRoute, TableRouteCacheRef, ViewInfoCacheRef,
|
||||
LayeredCacheRegistryRef, TableInfoCacheRef, TableNameCacheRef, TableRoute, TableRouteCacheRef,
|
||||
ViewInfoCacheRef,
|
||||
};
|
||||
use common_meta::key::catalog_name::CatalogNameKey;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
@@ -41,7 +42,7 @@ use session::context::{Channel, QueryContext};
|
||||
use snafu::prelude::*;
|
||||
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
|
||||
use table::dist_table::DistTable;
|
||||
use table::metadata::TableId;
|
||||
use table::metadata::{TableId, TableInfoRef};
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
@@ -325,6 +326,63 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn table_id(
|
||||
&self,
|
||||
catalog_name: &str,
|
||||
schema_name: &str,
|
||||
table_name: &str,
|
||||
query_ctx: Option<&QueryContext>,
|
||||
) -> Result<Option<TableId>> {
|
||||
let channel = query_ctx.map_or(Channel::Unknown, |ctx| ctx.channel());
|
||||
if let Some(table) =
|
||||
self.system_catalog
|
||||
.table(catalog_name, schema_name, table_name, query_ctx)
|
||||
{
|
||||
return Ok(Some(table.table_info().table_id()));
|
||||
}
|
||||
|
||||
let table_cache: TableNameCacheRef =
|
||||
self.cache_registry.get().context(CacheNotFoundSnafu {
|
||||
name: "table_name_cache",
|
||||
})?;
|
||||
|
||||
let table = table_cache
|
||||
.get_by_ref(&TableName {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
})
|
||||
.await
|
||||
.context(GetTableCacheSnafu)?;
|
||||
|
||||
if let Some(table) = table {
|
||||
return Ok(Some(table));
|
||||
}
|
||||
|
||||
if channel == Channel::Postgres {
|
||||
// falldown to pg_catalog
|
||||
if let Some(table) =
|
||||
self.system_catalog
|
||||
.table(catalog_name, PG_CATALOG_NAME, table_name, query_ctx)
|
||||
{
|
||||
return Ok(Some(table.table_info().table_id()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
|
||||
let table_info_cache: TableInfoCacheRef =
|
||||
self.cache_registry.get().context(CacheNotFoundSnafu {
|
||||
name: "table_info_cache",
|
||||
})?;
|
||||
table_info_cache
|
||||
.get_by_ref(&table_id)
|
||||
.await
|
||||
.context(GetTableCacheSnafu)
|
||||
}
|
||||
|
||||
async fn tables_by_ids(
|
||||
&self,
|
||||
catalog: &str,
|
||||
|
||||
@@ -25,7 +25,7 @@ use common_catalog::consts::{INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME};
|
||||
use futures::future::BoxFuture;
|
||||
use futures_util::stream::BoxStream;
|
||||
use session::context::QueryContext;
|
||||
use table::metadata::TableId;
|
||||
use table::metadata::{TableId, TableInfoRef};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::Result;
|
||||
@@ -89,6 +89,23 @@ pub trait CatalogManager: Send + Sync {
|
||||
query_ctx: Option<&QueryContext>,
|
||||
) -> Result<Option<TableRef>>;
|
||||
|
||||
/// Returns the table id of provided table ident.
|
||||
async fn table_id(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table_name: &str,
|
||||
query_ctx: Option<&QueryContext>,
|
||||
) -> Result<Option<TableId>> {
|
||||
Ok(self
|
||||
.table(catalog, schema, table_name, query_ctx)
|
||||
.await?
|
||||
.map(|t| t.table_info().ident.table_id))
|
||||
}
|
||||
|
||||
/// Returns the table of provided id.
|
||||
async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>>;
|
||||
|
||||
/// Returns the tables by table ids.
|
||||
async fn tables_by_ids(
|
||||
&self,
|
||||
|
||||
@@ -28,7 +28,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use futures_util::stream::BoxStream;
|
||||
use session::context::QueryContext;
|
||||
use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
use table::metadata::{TableId, TableInfoRef};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
|
||||
@@ -144,6 +144,18 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn table_info_by_id(&self, table_id: TableId) -> Result<Option<TableInfoRef>> {
|
||||
Ok(self
|
||||
.catalogs
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.flat_map(|(_, schema_entries)| schema_entries.values())
|
||||
.flat_map(|tables| tables.values())
|
||||
.find(|t| t.table_info().ident.table_id == table_id)
|
||||
.map(|t| t.table_info()))
|
||||
}
|
||||
|
||||
async fn tables_by_ids(
|
||||
&self,
|
||||
catalog: &str,
|
||||
|
||||
Reference in New Issue
Block a user