From 2a6c830ca74596645951bc5634f4fe454cf2a1c5 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 29 Aug 2023 11:43:43 +0800 Subject: [PATCH] refactor(table): remove Table impl for system (#2270) * refactor(table): remove Table impl for system Signed-off-by: Zhenchi * fix: format & import Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/catalog/src/local/manager.rs | 2 +- src/catalog/src/system.rs | 150 +++++++++++++++++-------------- src/catalog/src/tables.rs | 34 ++----- 3 files changed, 90 insertions(+), 96 deletions(-) diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index b5945ca758..eab49873c0 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -136,7 +136,7 @@ impl LocalCatalogManager { schema: INFORMATION_SCHEMA_NAME.to_string(), table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), table_id: SYSTEM_CATALOG_TABLE_ID, - table: self.system.information_schema.system.clone(), + table: self.system.information_schema.system.as_table_ref(), }; self.catalogs.register_table(register_table_req).await?; diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index fc4b43745e..c4c9d654ea 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; use std::collections::HashMap; use std::sync::Arc; @@ -21,24 +20,23 @@ use common_catalog::consts::{ SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_ID, SYSTEM_CATALOG_TABLE_NAME, }; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::debug; +use common_telemetry::{debug, warn}; use common_time::util; use datatypes::prelude::{ConcreteDataType, ScalarVector, VectorRef}; -use datatypes::schema::{ColumnSchema, RawSchema, SchemaRef}; +use datatypes::schema::{ColumnSchema, RawSchema}; use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::ScanRequest; use table::engine::{EngineContext, TableEngineRef}; -use table::metadata::{TableId, TableInfoRef, TableType}; -use table::requests::{ - CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest, TableOptions, -}; -use table::{Result as TableResult, Table, TableRef}; +use table::metadata::TableId; +use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest, TableOptions}; +use table::TableRef; use crate::error::{ - self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, - OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu, + self, CreateSystemCatalogSnafu, DeregisterTableSnafu, EmptyValueSnafu, Error, + InsertCatalogRecordSnafu, InvalidEntryTypeSnafu, InvalidKeySnafu, OpenSystemCatalogSnafu, + Result, ValueDeserializeSnafu, }; use crate::DeregisterTableRequest; @@ -48,42 +46,6 @@ pub const VALUE_INDEX: usize = 3; pub struct SystemCatalogTable(TableRef); -#[async_trait::async_trait] -impl Table for SystemCatalogTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.0.schema() - } - - async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { - self.0.scan_to_stream(request).await - } - - /// Insert values into table. - async fn insert(&self, request: InsertRequest) -> TableResult { - self.0.insert(request).await - } - - fn table_info(&self) -> TableInfoRef { - self.0.table_info() - } - - fn table_type(&self) -> TableType { - self.0.table_type() - } - - async fn delete(&self, request: DeleteRequest) -> TableResult { - self.0.delete(request).await - } - - fn statistics(&self) -> Option { - self.0.statistics() - } -} - impl SystemCatalogTable { pub async fn new(engine: TableEngineRef) -> Result { let request = OpenTableRequest { @@ -126,6 +88,54 @@ impl SystemCatalogTable { } } + pub async fn register_table( + &self, + catalog: String, + schema: String, + table_name: String, + table_id: TableId, + engine: String, + ) -> Result { + let insert_request = + build_table_insert_request(catalog, schema, table_name, table_id, engine); + self.0 + .insert(insert_request) + .await + .context(InsertCatalogRecordSnafu) + } + + pub(crate) async fn deregister_table( + &self, + request: &DeregisterTableRequest, + table_id: TableId, + ) -> Result<()> { + let deletion_request = build_table_deletion_request(request, table_id); + self.0 + .insert(deletion_request) + .await + .map(|x| { + if x != 1 { + let table = common_catalog::format_full_table_name( + &request.catalog, + &request.schema, + &request.table_name + ); + warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}"); + } + }) + .with_context(|_| DeregisterTableSnafu { + request: request.clone(), + }) + } + + pub async fn register_schema(&self, catalog: String, schema: String) -> Result { + let insert_request = build_schema_insert_request(catalog, schema); + self.0 + .insert(insert_request) + .await + .context(InsertCatalogRecordSnafu) + } + /// Create a stream of all entries inside system catalog table pub async fn records(&self) -> Result { let full_projection = None; @@ -137,11 +147,16 @@ impl SystemCatalogTable { limit: None, }; let stream = self + .0 .scan_to_stream(scan_req) .await .context(error::SystemCatalogTableScanSnafu)?; Ok(stream) } + + pub fn as_table_ref(&self) -> TableRef { + self.0.clone() + } } /// Build system catalog table schema. @@ -541,14 +556,14 @@ mod tests { async fn test_system_table_type() { let (_dir, table_engine) = prepare_table_engine().await; let system_table = SystemCatalogTable::new(table_engine).await.unwrap(); - assert_eq!(Base, system_table.table_type()); + assert_eq!(Base, system_table.as_table_ref().table_type()); } #[tokio::test] async fn test_system_table_info() { let (_dir, table_engine) = prepare_table_engine().await; let system_table = SystemCatalogTable::new(table_engine).await.unwrap(); - let info = system_table.table_info(); + let info = system_table.as_table_ref().table_info(); assert_eq!(TableType::Base, info.table_type); assert_eq!(SYSTEM_CATALOG_TABLE_NAME, info.name); assert_eq!(SYSTEM_CATALOG_TABLE_ID, info.ident.table_id); @@ -561,14 +576,16 @@ mod tests { let (_, table_engine) = prepare_table_engine().await; let catalog_table = SystemCatalogTable::new(table_engine).await.unwrap(); - let table_insertion = build_table_insert_request( - DEFAULT_CATALOG_NAME.to_string(), - DEFAULT_SCHEMA_NAME.to_string(), - "my_table".to_string(), - 1, - MITO_ENGINE.to_string(), - ); - let result = catalog_table.insert(table_insertion).await.unwrap(); + let result = catalog_table + .register_table( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + "my_table".to_string(), + 1, + MITO_ENGINE.to_string(), + ) + .await + .unwrap(); assert_eq!(result, 1); let records = catalog_table.records().await.unwrap(); @@ -598,16 +615,17 @@ mod tests { }); assert_eq!(entry, expected); - let table_deletion = build_table_deletion_request( - &DeregisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: "my_table".to_string(), - }, - 1, - ); - let result = catalog_table.insert(table_deletion).await.unwrap(); - assert_eq!(result, 1); + catalog_table + .deregister_table( + &DeregisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "my_table".to_string(), + }, + 1, + ) + .await + .unwrap(); let records = catalog_table.records().await.unwrap(); let batches = RecordBatches::try_collect(records).await.unwrap().take(); diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 4ba085aed1..7efa1fba06 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -16,16 +16,9 @@ use std::sync::Arc; -use common_telemetry::logging; -use snafu::ResultExt; use table::metadata::TableId; -use table::Table; -use crate::error::{self, InsertCatalogRecordSnafu, Result as CatalogResult}; -use crate::system::{ - build_schema_insert_request, build_table_deletion_request, build_table_insert_request, - SystemCatalogTable, -}; +use crate::system::SystemCatalogTable; use crate::DeregisterTableRequest; pub struct InformationSchema { @@ -54,36 +47,21 @@ impl SystemCatalog { table_id: TableId, engine: String, ) -> crate::error::Result { - let request = build_table_insert_request(catalog, schema, table_name, table_id, engine); self.information_schema .system - .insert(request) + .register_table(catalog, schema, table_name, table_id, engine) .await - .context(InsertCatalogRecordSnafu) } pub(crate) async fn deregister_table( &self, request: &DeregisterTableRequest, table_id: TableId, - ) -> CatalogResult<()> { + ) -> crate::error::Result<()> { self.information_schema .system - .insert(build_table_deletion_request(request, table_id)) + .deregister_table(request, table_id) .await - .map(|x| { - if x != 1 { - let table = common_catalog::format_full_table_name( - &request.catalog, - &request.schema, - &request.table_name - ); - logging::warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}"); - } - }) - .with_context(|_| error::DeregisterTableSnafu { - request: request.clone(), - }) } pub async fn register_schema( @@ -91,11 +69,9 @@ impl SystemCatalog { catalog: String, schema: String, ) -> crate::error::Result { - let request = build_schema_insert_request(catalog, schema); self.information_schema .system - .insert(request) + .register_schema(catalog, schema) .await - .context(InsertCatalogRecordSnafu) } }