mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
feat: get tables by ids in catalog manager (#5645)
feat: get tabels by ids in catalog manager Co-authored-by: jeremy <jeremy@greptime.local>
This commit is contained in:
@@ -38,6 +38,7 @@ use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
|
||||
use session::context::{Channel, QueryContext};
|
||||
use snafu::prelude::*;
|
||||
use table::dist_table::DistTable;
|
||||
use table::metadata::TableId;
|
||||
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
@@ -286,6 +287,28 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
async fn tables_by_ids(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table_ids: &[TableId],
|
||||
) -> Result<Vec<TableRef>> {
|
||||
let table_info_values = self
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.batch_get(table_ids)
|
||||
.await
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
let tables = table_info_values
|
||||
.into_values()
|
||||
.filter(|t| t.table_info.catalog_name == catalog && t.table_info.schema_name == schema)
|
||||
.map(build_table)
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
Ok(tables)
|
||||
}
|
||||
|
||||
fn tables<'a>(
|
||||
&'a self,
|
||||
catalog: &'a str,
|
||||
|
||||
@@ -87,6 +87,14 @@ pub trait CatalogManager: Send + Sync {
|
||||
query_ctx: Option<&QueryContext>,
|
||||
) -> Result<Option<TableRef>>;
|
||||
|
||||
/// Returns the tables by table ids.
|
||||
async fn tables_by_ids(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table_ids: &[TableId],
|
||||
) -> Result<Vec<TableRef>>;
|
||||
|
||||
/// Returns all tables with a stream by catalog and schema.
|
||||
fn tables<'a>(
|
||||
&'a self,
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::{Arc, RwLock, Weak};
|
||||
|
||||
use async_stream::{stream, try_stream};
|
||||
@@ -28,6 +28,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use futures_util::stream::BoxStream;
|
||||
use session::context::QueryContext;
|
||||
use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
|
||||
@@ -143,6 +144,33 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn tables_by_ids(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table_ids: &[TableId],
|
||||
) -> Result<Vec<TableRef>> {
|
||||
let catalogs = self.catalogs.read().unwrap();
|
||||
|
||||
let schemas = catalogs.get(catalog).context(CatalogNotFoundSnafu {
|
||||
catalog_name: catalog,
|
||||
})?;
|
||||
|
||||
let tables = schemas
|
||||
.get(schema)
|
||||
.context(SchemaNotFoundSnafu { catalog, schema })?;
|
||||
|
||||
let filter_ids: HashSet<_> = table_ids.iter().collect();
|
||||
// It is very inefficient, but we do not need to optimize it since it will not be called in `MemoryCatalogManager`.
|
||||
let tables = tables
|
||||
.values()
|
||||
.filter(|t| filter_ids.contains(&t.table_info().table_id()))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(tables)
|
||||
}
|
||||
|
||||
fn tables<'a>(
|
||||
&'a self,
|
||||
catalog: &'a str,
|
||||
|
||||
Reference in New Issue
Block a user