feat: tables stream with CatalogManager (#3180)

* feat: add tables for CatalogManager

* feat: replace table with tables

* Update src/catalog/src/information_schema/columns.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/catalog/src/information_schema/columns.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/catalog/src/information_schema/tables.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/catalog/src/information_schema/tables.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* feat: tables for MemoryCatalogManager

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
JeremyHi
2024-01-17 10:31:15 +08:00
committed by GitHub
parent 7a1b856dfb
commit c6c4ea5e64
5 changed files with 206 additions and 113 deletions

View File

@@ -31,6 +31,7 @@ use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
@@ -183,37 +184,29 @@ impl InformationSchemaColumnsBuilder {
continue;
}
for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};
while let Some(table) = stream.try_next().await? {
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table_name,
semantic_type,
column,
);
}
} else {
unreachable!();
for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};
self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table.table_info().name,
semantic_type,
column,
);
}
}
}

View File

@@ -27,6 +27,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
@@ -166,27 +167,19 @@ impl InformationSchemaTablesBuilder {
continue;
}
for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_name,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
} else {
unreachable!();
}
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_info.name,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
}
}

View File

@@ -16,17 +16,20 @@ use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};
use async_stream::try_stream;
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::TryStreamExt;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::dist_table::DistTable;
@@ -58,19 +61,27 @@ pub struct KvBackendCatalogManager {
system_catalog: SystemCatalog,
}
fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
let table_info = table_info_value
.table_info
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?;
Ok(DistTable::table(Arc::new(table_info)))
}
#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.await
}
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_id(ctx, table_id)
.await
}
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.await
}
}
impl KvBackendCatalogManager {
@@ -101,6 +112,10 @@ impl KvBackendCatalogManager {
#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
fn as_any(&self) -> &dyn Any {
self
}
async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
let stream = self
.table_metadata_manager
@@ -219,17 +234,56 @@ impl CatalogManager for KvBackendCatalogManager {
else {
return Ok(None);
};
let table_info = Arc::new(
table_info_value
.table_info
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
);
Ok(Some(DistTable::table(table_info)))
make_table(table_info_value).map(Some)
}
fn as_any(&self) -> &dyn Any {
self
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, CatalogResult<TableRef>> {
let sys_tables = try_stream!({
// System tables
let sys_table_names = self.system_catalog.table_names(schema);
for table_name in sys_table_names {
if let Some(table) = self.system_catalog.table(catalog, schema, &table_name) {
yield table;
}
}
});
let table_id_stream = self
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema)
.await
.map_ok(|(_, v)| v.table_id());
const BATCH_SIZE: usize = 128;
let user_tables = try_stream!({
// Split table ids into chunks
let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);
while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(BoxedError::new)
.context(ListTablesSnafu { catalog, schema })?;
let table_info_values = self
.table_metadata_manager
.table_info_manager()
.batch_get(&table_ids)
.await
.context(TableMetadataManagerSnafu)?;
for table_info_value in table_info_values.into_values() {
yield make_table(table_info_value)?;
}
}
});
Box::pin(sys_tables.chain(user_tables))
}
}

View File

@@ -20,6 +20,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use table::TableRef;
@@ -56,6 +57,13 @@ pub trait CatalogManager: Send + Sync {
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>>;
/// Returns all tables with a stream by catalog and schema.
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>>;
}
pub type CatalogManagerRef = Arc<dyn CatalogManager>;

View File

@@ -17,10 +17,12 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock, Weak};
use async_stream::{stream, try_stream};
use common_catalog::build_db_string;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
};
use futures_util::stream::BoxStream;
use snafu::OptionExt;
use table::TableRef;
@@ -39,10 +41,64 @@ pub struct MemoryCatalogManager {
#[async_trait::async_trait]
impl CatalogManager for MemoryCatalogManager {
fn as_any(&self) -> &dyn Any {
self
}
async fn catalog_names(&self) -> Result<Vec<String>> {
Ok(self.catalogs.read().unwrap().keys().cloned().collect())
}
async fn schema_names(&self, catalog: &str) -> Result<Vec<String>> {
Ok(self
.catalogs
.read()
.unwrap()
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.keys()
.cloned()
.collect())
}
async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>> {
Ok(self
.catalogs
.read()
.unwrap()
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.get(schema)
.with_context(|| SchemaNotFoundSnafu { catalog, schema })?
.keys()
.cloned()
.collect())
}
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
self.catalog_exist_sync(catalog)
}
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
self.schema_exist_sync(catalog, schema)
}
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.get(schema)
.with_context(|| SchemaNotFoundSnafu { catalog, schema })?
.contains_key(table))
}
async fn table(
&self,
catalog: &str,
@@ -61,57 +117,35 @@ impl CatalogManager for MemoryCatalogManager {
Ok(result)
}
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
self.catalog_exist_sync(catalog)
}
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.get(schema)
.with_context(|| SchemaNotFoundSnafu { catalog, schema })?
.contains_key(table))
}
async fn catalog_names(&self) -> Result<Vec<String>> {
Ok(self.catalogs.read().unwrap().keys().cloned().collect())
}
let Some(schemas) = catalogs.get(catalog) else {
return Box::pin(stream!({
yield CatalogNotFoundSnafu {
catalog_name: catalog,
}
.fail();
}));
};
async fn schema_names(&self, catalog_name: &str) -> Result<Vec<String>> {
Ok(self
.catalogs
.read()
.unwrap()
.get(catalog_name)
.with_context(|| CatalogNotFoundSnafu { catalog_name })?
.keys()
.cloned()
.collect())
}
let Some(tables) = schemas.get(schema) else {
return Box::pin(stream!({
yield SchemaNotFoundSnafu { catalog, schema }.fail();
}));
};
async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result<Vec<String>> {
Ok(self
.catalogs
.read()
.unwrap()
.get(catalog_name)
.with_context(|| CatalogNotFoundSnafu { catalog_name })?
.get(schema_name)
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?
.keys()
.cloned()
.collect())
}
let tables = tables.values().cloned().collect::<Vec<_>>();
fn as_any(&self) -> &dyn Any {
self
return Box::pin(try_stream!({
for table in tables {
yield table;
}
}));
}
}
@@ -307,6 +341,7 @@ pub fn new_memory_catalog_manager() -> Result<Arc<MemoryCatalogManager>> {
#[cfg(test)]
mod tests {
use common_catalog::consts::*;
use futures_util::TryStreamExt;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use super::*;
@@ -331,8 +366,18 @@ mod tests {
NUMBERS_TABLE_NAME,
)
.await
.unwrap()
.unwrap();
let _ = table.unwrap();
let stream = catalog_list
.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.await;
let tables = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(tables.len(), 1);
assert_eq!(
table.table_info().table_id(),
tables[0].table_info().table_id()
);
assert!(catalog_list
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "not_exists")
.await