From c6c4ea5e64e89a983e0692e0c3ff88b32c009ddc Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Wed, 17 Jan 2024 10:31:15 +0800 Subject: [PATCH] feat: tables stream with CatalogManager (#3180) * feat: add tables for CatalogManager * feat: replace table with tables * Update src/catalog/src/information_schema/columns.rs Co-authored-by: Weny Xu * Update src/catalog/src/information_schema/columns.rs Co-authored-by: Weny Xu * Update src/catalog/src/information_schema/tables.rs Co-authored-by: Weny Xu * Update src/catalog/src/information_schema/tables.rs Co-authored-by: Weny Xu * feat: tables for MemoryCatalogManager --------- Co-authored-by: Weny Xu --- src/catalog/src/information_schema/columns.rs | 51 +++---- src/catalog/src/information_schema/tables.rs | 35 ++--- src/catalog/src/kvbackend/manager.rs | 86 +++++++++-- src/catalog/src/lib.rs | 8 + src/catalog/src/memory/manager.rs | 139 ++++++++++++------ 5 files changed, 206 insertions(+), 113 deletions(-) diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 5ca12eb73f..dbae865383 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -31,6 +31,7 @@ use datatypes::scalars::ScalarVectorBuilder; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::value::Value; use datatypes::vectors::{StringVectorBuilder, VectorRef}; +use futures::TryStreamExt; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ScanRequest, TableId}; @@ -183,37 +184,29 @@ impl InformationSchemaColumnsBuilder { continue; } - for table_name in catalog_manager - .table_names(&catalog_name, &schema_name) - .await? - { - if let Some(table) = catalog_manager - .table(&catalog_name, &schema_name, &table_name) - .await? - { - let keys = &table.table_info().meta.primary_key_indices; - let schema = table.schema(); + let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; - for (idx, column) in schema.column_schemas().iter().enumerate() { - let semantic_type = if column.is_time_index() { - SEMANTIC_TYPE_TIME_INDEX - } else if keys.contains(&idx) { - SEMANTIC_TYPE_PRIMARY_KEY - } else { - SEMANTIC_TYPE_FIELD - }; + while let Some(table) = stream.try_next().await? { + let keys = &table.table_info().meta.primary_key_indices; + let schema = table.schema(); - self.add_column( - &predicates, - &catalog_name, - &schema_name, - &table_name, - semantic_type, - column, - ); - } - } else { - unreachable!(); + for (idx, column) in schema.column_schemas().iter().enumerate() { + let semantic_type = if column.is_time_index() { + SEMANTIC_TYPE_TIME_INDEX + } else if keys.contains(&idx) { + SEMANTIC_TYPE_PRIMARY_KEY + } else { + SEMANTIC_TYPE_FIELD + }; + + self.add_column( + &predicates, + &catalog_name, + &schema_name, + &table.table_info().name, + semantic_type, + column, + ); } } } diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 5320e50277..454dbf9b88 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -27,6 +27,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::value::Value; use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; +use futures::TryStreamExt; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ScanRequest, TableId}; use table::metadata::TableType; @@ -166,27 +167,19 @@ impl InformationSchemaTablesBuilder { continue; } - for table_name in catalog_manager - .table_names(&catalog_name, &schema_name) - .await? - { - if let Some(table) = catalog_manager - .table(&catalog_name, &schema_name, &table_name) - .await? - { - let table_info = table.table_info(); - self.add_table( - &predicates, - &catalog_name, - &schema_name, - &table_name, - table.table_type(), - Some(table_info.ident.table_id), - Some(&table_info.meta.engine), - ); - } else { - unreachable!(); - } + let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; + + while let Some(table) = stream.try_next().await? { + let table_info = table.table_info(); + self.add_table( + &predicates, + &catalog_name, + &schema_name, + &table_info.name, + table.table_type(), + Some(table_info.ident.table_id), + Some(&table_info.meta.engine), + ); } } diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 224c1c0121..f6f8bbc0f8 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -16,17 +16,20 @@ use std::any::Any; use std::collections::BTreeSet; use std::sync::{Arc, Weak}; +use async_stream::try_stream; use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID}; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context}; use common_meta::error::Result as MetaResult; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; +use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_name::TableNameKey; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::table_name::TableName; -use futures_util::TryStreamExt; +use futures_util::stream::BoxStream; +use futures_util::{StreamExt, TryStreamExt}; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; use table::dist_table::DistTable; @@ -58,19 +61,27 @@ pub struct KvBackendCatalogManager { system_catalog: SystemCatalog, } +fn make_table(table_info_value: TableInfoValue) -> CatalogResult { + let table_info = table_info_value + .table_info + .try_into() + .context(catalog_err::InvalidTableInfoInCatalogSnafu)?; + Ok(DistTable::table(Arc::new(table_info))) +} + #[async_trait::async_trait] impl CacheInvalidator for KvBackendCatalogManager { - async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { - self.cache_invalidator - .invalidate_table_name(ctx, table_name) - .await - } - async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> { self.cache_invalidator .invalidate_table_id(ctx, table_id) .await } + + async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { + self.cache_invalidator + .invalidate_table_name(ctx, table_name) + .await + } } impl KvBackendCatalogManager { @@ -101,6 +112,10 @@ impl KvBackendCatalogManager { #[async_trait::async_trait] impl CatalogManager for KvBackendCatalogManager { + fn as_any(&self) -> &dyn Any { + self + } + async fn catalog_names(&self) -> CatalogResult> { let stream = self .table_metadata_manager @@ -219,17 +234,56 @@ impl CatalogManager for KvBackendCatalogManager { else { return Ok(None); }; - let table_info = Arc::new( - table_info_value - .table_info - .try_into() - .context(catalog_err::InvalidTableInfoInCatalogSnafu)?, - ); - Ok(Some(DistTable::table(table_info))) + make_table(table_info_value).map(Some) } - fn as_any(&self) -> &dyn Any { - self + async fn tables<'a>( + &'a self, + catalog: &'a str, + schema: &'a str, + ) -> BoxStream<'a, CatalogResult> { + let sys_tables = try_stream!({ + // System tables + let sys_table_names = self.system_catalog.table_names(schema); + for table_name in sys_table_names { + if let Some(table) = self.system_catalog.table(catalog, schema, &table_name) { + yield table; + } + } + }); + + let table_id_stream = self + .table_metadata_manager + .table_name_manager() + .tables(catalog, schema) + .await + .map_ok(|(_, v)| v.table_id()); + const BATCH_SIZE: usize = 128; + let user_tables = try_stream!({ + // Split table ids into chunks + let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE); + + while let Some(table_ids) = table_id_chunks.next().await { + let table_ids = table_ids + .into_iter() + .collect::, _>>() + .map_err(BoxedError::new) + .context(ListTablesSnafu { catalog, schema })?; + + let table_info_values = self + .table_metadata_manager + .table_info_manager() + .batch_get(&table_ids) + .await + .context(TableMetadataManagerSnafu)?; + + for table_info_value in table_info_values.into_values() { + yield make_table(table_info_value)?; + } + } + }); + + Box::pin(sys_tables.chain(user_tables)) } } diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 98a0bdfd13..6864748881 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -20,6 +20,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use futures::future::BoxFuture; +use futures_util::stream::BoxStream; use table::metadata::TableId; use table::requests::CreateTableRequest; use table::TableRef; @@ -56,6 +57,13 @@ pub trait CatalogManager: Send + Sync { schema: &str, table_name: &str, ) -> Result>; + + /// Returns all tables with a stream by catalog and schema. + async fn tables<'a>( + &'a self, + catalog: &'a str, + schema: &'a str, + ) -> BoxStream<'a, Result>; } pub type CatalogManagerRef = Arc; diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index dea21bb7ac..7139c82569 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -17,10 +17,12 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::{Arc, RwLock, Weak}; +use async_stream::{stream, try_stream}; use common_catalog::build_db_string; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, }; +use futures_util::stream::BoxStream; use snafu::OptionExt; use table::TableRef; @@ -39,10 +41,64 @@ pub struct MemoryCatalogManager { #[async_trait::async_trait] impl CatalogManager for MemoryCatalogManager { + fn as_any(&self) -> &dyn Any { + self + } + + async fn catalog_names(&self) -> Result> { + Ok(self.catalogs.read().unwrap().keys().cloned().collect()) + } + + async fn schema_names(&self, catalog: &str) -> Result> { + Ok(self + .catalogs + .read() + .unwrap() + .get(catalog) + .with_context(|| CatalogNotFoundSnafu { + catalog_name: catalog, + })? + .keys() + .cloned() + .collect()) + } + + async fn table_names(&self, catalog: &str, schema: &str) -> Result> { + Ok(self + .catalogs + .read() + .unwrap() + .get(catalog) + .with_context(|| CatalogNotFoundSnafu { + catalog_name: catalog, + })? + .get(schema) + .with_context(|| SchemaNotFoundSnafu { catalog, schema })? + .keys() + .cloned() + .collect()) + } + + async fn catalog_exists(&self, catalog: &str) -> Result { + self.catalog_exist_sync(catalog) + } + async fn schema_exists(&self, catalog: &str, schema: &str) -> Result { self.schema_exist_sync(catalog, schema) } + async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result { + let catalogs = self.catalogs.read().unwrap(); + Ok(catalogs + .get(catalog) + .with_context(|| CatalogNotFoundSnafu { + catalog_name: catalog, + })? + .get(schema) + .with_context(|| SchemaNotFoundSnafu { catalog, schema })? + .contains_key(table)) + } + async fn table( &self, catalog: &str, @@ -61,57 +117,35 @@ impl CatalogManager for MemoryCatalogManager { Ok(result) } - async fn catalog_exists(&self, catalog: &str) -> Result { - self.catalog_exist_sync(catalog) - } - - async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result { + async fn tables<'a>( + &'a self, + catalog: &'a str, + schema: &'a str, + ) -> BoxStream<'a, Result> { let catalogs = self.catalogs.read().unwrap(); - Ok(catalogs - .get(catalog) - .with_context(|| CatalogNotFoundSnafu { - catalog_name: catalog, - })? - .get(schema) - .with_context(|| SchemaNotFoundSnafu { catalog, schema })? - .contains_key(table)) - } - async fn catalog_names(&self) -> Result> { - Ok(self.catalogs.read().unwrap().keys().cloned().collect()) - } + let Some(schemas) = catalogs.get(catalog) else { + return Box::pin(stream!({ + yield CatalogNotFoundSnafu { + catalog_name: catalog, + } + .fail(); + })); + }; - async fn schema_names(&self, catalog_name: &str) -> Result> { - Ok(self - .catalogs - .read() - .unwrap() - .get(catalog_name) - .with_context(|| CatalogNotFoundSnafu { catalog_name })? - .keys() - .cloned() - .collect()) - } + let Some(tables) = schemas.get(schema) else { + return Box::pin(stream!({ + yield SchemaNotFoundSnafu { catalog, schema }.fail(); + })); + }; - async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result> { - Ok(self - .catalogs - .read() - .unwrap() - .get(catalog_name) - .with_context(|| CatalogNotFoundSnafu { catalog_name })? - .get(schema_name) - .with_context(|| SchemaNotFoundSnafu { - catalog: catalog_name, - schema: schema_name, - })? - .keys() - .cloned() - .collect()) - } + let tables = tables.values().cloned().collect::>(); - fn as_any(&self) -> &dyn Any { - self + return Box::pin(try_stream!({ + for table in tables { + yield table; + } + })); } } @@ -307,6 +341,7 @@ pub fn new_memory_catalog_manager() -> Result> { #[cfg(test)] mod tests { use common_catalog::consts::*; + use futures_util::TryStreamExt; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use super::*; @@ -331,8 +366,18 @@ mod tests { NUMBERS_TABLE_NAME, ) .await + .unwrap() .unwrap(); - let _ = table.unwrap(); + let stream = catalog_list + .tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) + .await; + let tables = stream.try_collect::>().await.unwrap(); + assert_eq!(tables.len(), 1); + assert_eq!( + table.table_info().table_id(), + tables[0].table_info().table_id() + ); + assert!(catalog_list .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "not_exists") .await