From fb9978e95d0d1f1f6b6136a1973376cdf138cdfb Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 26 Apr 2023 16:36:40 +0800 Subject: [PATCH] refactor: catalog (#1454) * wip * add schema_async * remove CatalogList * remove catalog provider and schema provider * fix * fix: rename table * fix: sqlness * fix: ignore tonic error metadata * fix: table engine name * feat: rename catalog_async to catalog * respect engine name in table regional value when deregistering tables * fix: CR --- src/catalog/src/datafusion.rs | 15 - src/catalog/src/datafusion/catalog_adapter.rs | 324 ---------- src/catalog/src/helper.rs | 1 + src/catalog/src/information_schema.rs | 4 +- src/catalog/src/information_schema/tables.rs | 6 +- src/catalog/src/lib.rs | 56 +- src/catalog/src/local/manager.rs | 151 ++--- src/catalog/src/local/memory.rs | 272 +++++---- src/catalog/src/remote/manager.rs | 556 +++++++++--------- src/catalog/src/schema.rs | 10 +- src/catalog/src/table_source.rs | 29 +- src/catalog/src/tables.rs | 11 +- src/catalog/tests/remote_catalog_tests.rs | 61 +- src/cmd/Cargo.toml | 3 +- src/common/substrait/src/df_logical.rs | 133 +---- src/common/substrait/src/df_substrait.rs | 9 +- src/common/substrait/src/lib.rs | 7 +- src/datanode/src/instance.rs | 2 + src/datanode/src/instance/grpc.rs | 116 +++- src/datanode/src/sql/create.rs | 1 + src/datanode/src/sql/flush_table.rs | 3 +- src/datanode/src/tests/test_util.rs | 2 + src/frontend/src/catalog.rs | 186 +++--- src/frontend/src/instance.rs | 3 +- src/frontend/src/instance/distributed.rs | 1 + src/frontend/src/statement.rs | 9 +- src/frontend/src/statement/show.rs | 6 +- src/frontend/src/table.rs | 21 +- src/frontend/src/tests.rs | 4 +- src/meta-client/src/client/router.rs | 14 +- src/query/src/datafusion.rs | 32 +- src/query/src/datafusion/planner.rs | 2 +- src/query/src/planner.rs | 2 +- src/query/src/query_engine.rs | 10 +- src/query/src/query_engine/state.rs | 16 +- src/query/src/sql.rs | 13 +- src/query/src/tests/function.rs | 7 +- src/query/src/tests/my_sum_udaf_example.rs | 9 +- src/query/src/tests/percentile_test.rs | 7 +- src/query/src/tests/query_engine_test.rs | 7 +- src/query/src/tests/time_range_filter_test.rs | 8 +- src/script/benches/py_benchmark.rs | 7 +- src/script/src/python/engine.rs | 7 +- src/servers/src/http.rs | 6 +- src/servers/src/http/handler.rs | 4 +- src/servers/src/mysql/handler.rs | 2 +- src/servers/src/postgres/auth_handler.rs | 5 +- src/servers/src/query_handler/sql.rs | 5 +- src/servers/tests/http/influxdb_test.rs | 2 +- src/servers/tests/http/opentsdb_test.rs | 2 +- src/servers/tests/http/prometheus_test.rs | 2 +- src/servers/tests/mod.rs | 11 +- src/table-procedure/src/alter.rs | 6 +- src/table-procedure/src/create.rs | 8 +- src/table-procedure/src/drop.rs | 3 +- src/table/src/metadata.rs | 12 +- tests-integration/src/test_util.rs | 3 + .../distributed/alter/rename_table.result | 8 +- .../cases/distributed/alter/rename_table.sql | 3 + 59 files changed, 998 insertions(+), 1227 deletions(-) delete mode 100644 src/catalog/src/datafusion.rs delete mode 100644 src/catalog/src/datafusion/catalog_adapter.rs diff --git a/src/catalog/src/datafusion.rs b/src/catalog/src/datafusion.rs deleted file mode 100644 index f54699455d..0000000000 --- a/src/catalog/src/datafusion.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod catalog_adapter; diff --git a/src/catalog/src/datafusion/catalog_adapter.rs b/src/catalog/src/datafusion/catalog_adapter.rs deleted file mode 100644 index 711ffe3afa..0000000000 --- a/src/catalog/src/datafusion/catalog_adapter.rs +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Catalog adapter between datafusion and greptime query engine. - -use std::any::Any; -use std::sync::Arc; - -use async_trait::async_trait; -use common_error::prelude::BoxedError; -use datafusion::catalog::catalog::{ - CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider, -}; -use datafusion::catalog::schema::SchemaProvider as DfSchemaProvider; -use datafusion::datasource::TableProvider as DfTableProvider; -use datafusion::error::Result as DataFusionResult; -use snafu::ResultExt; -use table::table::adapter::{DfTableProviderAdapter, TableAdapter}; -use table::TableRef; - -use crate::error::{self, Result, SchemaProviderOperationSnafu}; -use crate::{ - CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef, -}; - -pub struct DfCatalogListAdapter { - catalog_list: CatalogListRef, -} - -impl DfCatalogListAdapter { - pub fn new(catalog_list: CatalogListRef) -> DfCatalogListAdapter { - DfCatalogListAdapter { catalog_list } - } -} - -impl DfCatalogList for DfCatalogListAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( - &self, - name: String, - catalog: Arc, - ) -> Option> { - let catalog_adapter = Arc::new(CatalogProviderAdapter { - df_catalog_provider: catalog, - }); - self.catalog_list - .register_catalog(name, catalog_adapter) - .expect("datafusion does not accept fallible catalog access") // TODO(hl): datafusion register catalog does not handles errors - .map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _) - } - - fn catalog_names(&self) -> Vec { - // TODO(hl): datafusion register catalog does not handles errors - self.catalog_list - .catalog_names() - .expect("datafusion does not accept fallible catalog access") - } - - fn catalog(&self, name: &str) -> Option> { - self.catalog_list - .catalog(name) - .expect("datafusion does not accept fallible catalog access") // TODO(hl): datafusion register catalog does not handles errors - .map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _) - } -} - -/// Datafusion's CatalogProvider -> greptime CatalogProvider -struct CatalogProviderAdapter { - df_catalog_provider: Arc, -} - -impl CatalogProvider for CatalogProviderAdapter { - fn 
as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Result> { - Ok(self.df_catalog_provider.schema_names()) - } - - fn register_schema( - &self, - _name: String, - _schema: SchemaProviderRef, - ) -> Result> { - todo!("register_schema is not supported in Datafusion catalog provider") - } - - fn schema(&self, name: &str) -> Result>> { - Ok(self - .df_catalog_provider - .schema(name) - .map(|df_schema_provider| Arc::new(SchemaProviderAdapter { df_schema_provider }) as _)) - } -} - -///Greptime CatalogProvider -> datafusion's CatalogProvider -pub struct DfCatalogProviderAdapter { - catalog_provider: CatalogProviderRef, -} - -impl DfCatalogProviderAdapter { - pub fn new(catalog_provider: CatalogProviderRef) -> Self { - Self { catalog_provider } - } -} - -impl DfCatalogProvider for DfCatalogProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - self.catalog_provider - .schema_names() - .expect("datafusion does not accept fallible catalog access") - } - - fn schema(&self, name: &str) -> Option> { - self.catalog_provider - .schema(name) - .expect("datafusion does not accept fallible catalog access") - .map(|schema_provider| Arc::new(DfSchemaProviderAdapter { schema_provider }) as _) - } -} - -/// Greptime SchemaProvider -> datafusion SchemaProvider -struct DfSchemaProviderAdapter { - schema_provider: Arc, -} - -#[async_trait] -impl DfSchemaProvider for DfSchemaProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec { - self.schema_provider - .table_names() - .expect("datafusion does not accept fallible catalog access") - } - - async fn table(&self, name: &str) -> Option> { - self.schema_provider - .table(name) - .await - .expect("datafusion does not accept fallible catalog access") - .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) - } - - fn register_table( - &self, - name: String, - table: Arc, - ) -> DataFusionResult>> { - let table = Arc::new(TableAdapter::new(table)?); - match self.schema_provider.register_table(name, table)? { - Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - None => Ok(None), - } - } - - fn deregister_table(&self, name: &str) -> DataFusionResult>> { - match self.schema_provider.deregister_table(name)? { - Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - None => Ok(None), - } - } - - fn table_exist(&self, name: &str) -> bool { - self.schema_provider - .table_exist(name) - .expect("datafusion does not accept fallible catalog access") - } -} - -/// Datafusion SchemaProviderAdapter -> greptime SchemaProviderAdapter -struct SchemaProviderAdapter { - df_schema_provider: Arc, -} - -#[async_trait] -impl SchemaProvider for SchemaProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - /// Retrieves the list of available table names in this schema. - fn table_names(&self) -> Result> { - Ok(self.df_schema_provider.table_names()) - } - - async fn table(&self, name: &str) -> Result> { - let table = self.df_schema_provider.table(name).await; - let table = table.map(|table_provider| { - match table_provider - .as_any() - .downcast_ref::() - { - Some(adapter) => adapter.table(), - None => { - // TODO(yingwen): Avoid panic here. 
- let adapter = - TableAdapter::new(table_provider).expect("convert datafusion table"); - Arc::new(adapter) as _ - } - } - }); - Ok(table) - } - - fn register_table(&self, name: String, table: TableRef) -> Result> { - let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone())); - Ok(self - .df_schema_provider - .register_table(name, table_provider) - .context(error::DatafusionSnafu { - msg: "Fail to register table to datafusion", - }) - .map_err(BoxedError::new) - .context(SchemaProviderOperationSnafu)? - .map(|_| table)) - } - - fn rename_table(&self, _name: &str, _new_name: String) -> Result { - todo!() - } - - fn deregister_table(&self, name: &str) -> Result> { - self.df_schema_provider - .deregister_table(name) - .context(error::DatafusionSnafu { - msg: "Fail to deregister table from datafusion", - }) - .map_err(BoxedError::new) - .context(SchemaProviderOperationSnafu)? - .map(|table| { - let adapter = TableAdapter::new(table) - .context(error::TableSchemaMismatchSnafu) - .map_err(BoxedError::new) - .context(SchemaProviderOperationSnafu)?; - Ok(Arc::new(adapter) as _) - }) - .transpose() - } - - fn table_exist(&self, name: &str) -> Result { - Ok(self.df_schema_provider.table_exist(name)) - } -} - -#[cfg(test)] -mod tests { - use table::table::numbers::NumbersTable; - - use super::*; - use crate::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; - - #[test] - #[should_panic] - pub fn test_register_schema() { - let adapter = CatalogProviderAdapter { - df_catalog_provider: Arc::new( - datafusion::catalog::catalog::MemoryCatalogProvider::new(), - ), - }; - - adapter - .register_schema( - "whatever".to_string(), - Arc::new(MemorySchemaProvider::new()), - ) - .unwrap(); - } - - #[tokio::test] - async fn test_register_table() { - let adapter = DfSchemaProviderAdapter { - schema_provider: Arc::new(MemorySchemaProvider::new()), - }; - - adapter - .register_table( - "test_table".to_string(), - Arc::new(DfTableProviderAdapter::new(Arc::new( - NumbersTable::default(), - ))), - ) - .unwrap(); - adapter.table("test_table").await.unwrap(); - } - - #[test] - pub fn test_register_catalog() { - let catalog_list = DfCatalogListAdapter { - catalog_list: new_memory_catalog_list().unwrap(), - }; - assert!(catalog_list - .register_catalog( - "test_catalog".to_string(), - Arc::new(DfCatalogProviderAdapter { - catalog_provider: Arc::new(MemoryCatalogProvider::new()), - }), - ) - .is_none()); - - catalog_list.catalog("test_catalog").unwrap(); - } -} diff --git a/src/catalog/src/helper.rs b/src/catalog/src/helper.rs index 6ee9a94dd5..83a0a84a17 100644 --- a/src/catalog/src/helper.rs +++ b/src/catalog/src/helper.rs @@ -191,6 +191,7 @@ impl TableRegionalKey { pub struct TableRegionalValue { pub version: TableVersion, pub regions_ids: Vec, + pub engine_name: Option, } pub struct CatalogKey { diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index f4d70ec6b9..fb29b84d97 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -49,7 +49,7 @@ impl SchemaProvider for InformationSchemaProvider { self } - fn table_names(&self) -> Result> { + async fn table_names(&self) -> Result> { Ok(vec![TABLES.to_string()]) } @@ -74,7 +74,7 @@ impl SchemaProvider for InformationSchemaProvider { Ok(Some(Arc::new(table))) } - fn table_exist(&self, name: &str) -> Result { + async fn table_exist(&self, name: &str) -> Result { Ok(matches!(name.to_ascii_lowercase().as_str(), TABLES)) } } diff --git 
a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs
index 17650d2d9a..2f9d200159 100644
--- a/src/catalog/src/information_schema/tables.rs
+++ b/src/catalog/src/information_schema/tables.rs
@@ -98,13 +98,13 @@ impl InformationSchemaTablesBuilder {
     async fn make_tables(&mut self) -> Result<RecordBatch> {
         let catalog_name = self.catalog_name.clone();
 
-        for schema_name in self.catalog_provider.schema_names()? {
+        for schema_name in self.catalog_provider.schema_names().await? {
             if schema_name == INFORMATION_SCHEMA_NAME {
                 continue;
             }
 
-            let Some(schema) = self.catalog_provider.schema(&schema_name)? else { continue };
-            for table_name in schema.table_names()? {
+            let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
+            for table_name in schema.table_names().await? {
                 let Some(table) = schema.table(&table_name).await? else { continue };
                 let table_info = table.table_info();
                 self.add_table(
diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs
index 41d640049d..3949b5fad5 100644
--- a/src/catalog/src/lib.rs
+++ b/src/catalog/src/lib.rs
@@ -29,7 +29,6 @@ use table::TableRef;
 use crate::error::{CreateTableSnafu, Result};
 pub use crate::schema::{SchemaProvider, SchemaProviderRef};
 
-pub mod datafusion;
 pub mod error;
 pub mod helper;
 pub(crate) mod information_schema;
@@ -40,55 +39,40 @@ pub mod system;
 pub mod table_source;
 pub mod tables;
 
-/// Represent a list of named catalogs
-pub trait CatalogList: Sync + Send {
-    /// Returns the catalog list as [`Any`](std::any::Any)
-    /// so that it can be downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// Adds a new catalog to this catalog list
-    /// If a catalog of the same name existed before, it is replaced in the list and returned.
-    fn register_catalog(
-        &self,
-        name: String,
-        catalog: CatalogProviderRef,
-    ) -> Result<Option<CatalogProviderRef>>;
-
-    /// Retrieves the list of available catalog names
-    fn catalog_names(&self) -> Result<Vec<String>>;
-
-    /// Retrieves a specific catalog by name, provided it exists.
-    fn catalog(&self, name: &str) -> Result<Option<CatalogProviderRef>>;
-}
-
 /// Represents a catalog, comprising a number of named schemas.
+#[async_trait::async_trait]
 pub trait CatalogProvider: Sync + Send {
     /// Returns the catalog provider as [`Any`](std::any::Any)
     /// so that it can be downcast to a specific implementation.
     fn as_any(&self) -> &dyn Any;
 
     /// Retrieves the list of available schema names in this catalog.
-    fn schema_names(&self) -> Result<Vec<String>>;
+    async fn schema_names(&self) -> Result<Vec<String>>;
 
     /// Registers schema to this catalog.
-    fn register_schema(
+    async fn register_schema(
         &self,
         name: String,
         schema: SchemaProviderRef,
     ) -> Result<Option<SchemaProviderRef>>;
 
     /// Retrieves a specific schema from the catalog by name, provided it exists.
-    fn schema(&self, name: &str) -> Result<Option<SchemaProviderRef>>;
+    async fn schema(&self, name: &str) -> Result<Option<SchemaProviderRef>>;
 }
 
-pub type CatalogListRef = Arc<dyn CatalogList>;
 pub type CatalogProviderRef = Arc<dyn CatalogProvider>;
 
 #[async_trait::async_trait]
-pub trait CatalogManager: CatalogList {
+pub trait CatalogManager: Send + Sync {
     /// Starts a catalog manager.
     async fn start(&self) -> Result<()>;
 
+    async fn register_catalog(
+        &self,
+        name: String,
+        catalog: CatalogProviderRef,
+    ) -> Result<Option<CatalogProviderRef>>;
+
     /// Registers a table within given catalog/schema to catalog manager,
     /// returns whether the table registered.
     async fn register_table(&self, request: RegisterTableRequest) -> Result<bool>;
@@ -108,7 +92,11 @@ pub trait CatalogManager: CatalogList {
     async fn register_system_table(&self, request: RegisterSystemTableRequest)
         -> error::Result<()>;
 
-    fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>>;
+    async fn catalog_names(&self) -> Result<Vec<String>>;
+
+    async fn catalog(&self, catalog: &str) -> Result<Option<CatalogProviderRef>>;
+
+    async fn schema(&self, catalog: &str, schema: &str) -> Result<Option<SchemaProviderRef>>;
 
     /// Returns the table by catalog, schema and table name.
     async fn table(
@@ -117,6 +105,8 @@
         schema: &str,
         table_name: &str,
     ) -> Result<Option<TableRef>>;
+
+    fn as_any(&self) -> &dyn Any;
 }
 
 pub type CatalogManagerRef = Arc<dyn CatalogManager>;
@@ -238,15 +228,15 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec Result<()> {
-        self.init_system_catalog()?;
+        self.init_system_catalog().await?;
         let system_records = self.system.information_schema.system.records().await?;
         let entries = self.collect_system_catalog_entries(system_records).await?;
         let max_table_id = self.handle_system_catalog_entries(entries).await?;
@@ -114,27 +114,27 @@ impl LocalCatalogManager {
         Ok(())
     }
 
-    fn init_system_catalog(&self) -> Result<()> {
+    async fn init_system_catalog(&self) -> Result<()> {
         let system_schema = Arc::new(MemorySchemaProvider::new());
-        system_schema.register_table(
+        system_schema.register_table_sync(
             SYSTEM_CATALOG_TABLE_NAME.to_string(),
             self.system.information_schema.system.clone(),
         )?;
         let system_catalog = Arc::new(MemoryCatalogProvider::new());
-        system_catalog.register_schema(INFORMATION_SCHEMA_NAME.to_string(), system_schema)?;
+        system_catalog.register_schema_sync(INFORMATION_SCHEMA_NAME.to_string(), system_schema)?;
         self.catalogs
-            .register_catalog(SYSTEM_CATALOG_NAME.to_string(), system_catalog)?;
+            .register_catalog_sync(SYSTEM_CATALOG_NAME.to_string(), system_catalog)?;
 
         let default_catalog = Arc::new(MemoryCatalogProvider::new());
         let default_schema = Arc::new(MemorySchemaProvider::new());
 
         // Add numbers table for test
         let table = Arc::new(NumbersTable::default());
-        default_schema.register_table("numbers".to_string(), table)?;
+        default_schema.register_table_sync("numbers".to_string(), table)?;
 
-        default_catalog.register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?;
+        default_catalog.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema)?;
         self.catalogs
-            .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog)?;
+            .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog)?;
         Ok(())
     }
@@ -213,16 +213,17 @@ impl LocalCatalogManager {
                     info!("Register catalog: {}", c.catalog_name);
                 }
                 Entry::Schema(s) => {
-                    let catalog =
-                        self.catalogs
-                            .catalog(&s.catalog_name)?
-                            .context(CatalogNotFoundSnafu {
-                                catalog_name: &s.catalog_name,
-                            })?;
-                    catalog.register_schema(
-                        s.schema_name.clone(),
-                        Arc::new(MemorySchemaProvider::new()),
-                    )?;
+                    self.catalogs
+                        .catalog(&s.catalog_name)
+                        .await?
+                        .context(CatalogNotFoundSnafu {
+                            catalog_name: &s.catalog_name,
+                        })?
+                        .register_schema(
+                            s.schema_name.clone(),
+                            Arc::new(MemorySchemaProvider::new()),
+                        )
+                        .await?;
                     info!("Registered schema: {:?}", s);
                 }
                 Entry::Table(t) => {
@@ -243,14 +244,16 @@ impl LocalCatalogManager {
     }
 
     async fn open_and_register_table(&self, t: &TableEntry) -> Result<()> {
-        let catalog = self
-            .catalogs
-            .catalog(&t.catalog_name)?
-            .context(CatalogNotFoundSnafu {
-                catalog_name: &t.catalog_name,
-            })?;
+        let catalog =
+            self.catalogs
+                .catalog(&t.catalog_name)
+                .await?
+ .context(CatalogNotFoundSnafu { + catalog_name: &t.catalog_name, + })?; let schema = catalog - .schema(&t.schema_name)? + .schema(&t.schema_name) + .await? .context(SchemaNotFoundSnafu { catalog: &t.catalog_name, schema: &t.schema_name, @@ -286,37 +289,11 @@ impl LocalCatalogManager { ), })?; - schema.register_table(t.table_name.clone(), option)?; + schema.register_table(t.table_name.clone(), option).await?; Ok(()) } } -impl CatalogList for LocalCatalogManager { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( - &self, - name: String, - catalog: CatalogProviderRef, - ) -> Result> { - self.catalogs.register_catalog(name, catalog) - } - - fn catalog_names(&self) -> Result> { - self.catalogs.catalog_names() - } - - fn catalog(&self, name: &str) -> Result> { - if name.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) { - Ok(Some(self.system.clone())) - } else { - self.catalogs.catalog(name) - } - } -} - #[async_trait::async_trait] impl TableIdProvider for LocalCatalogManager { async fn next_table_id(&self) -> table::Result { @@ -347,10 +324,12 @@ impl CatalogManager for LocalCatalogManager { let catalog = self .catalogs - .catalog(catalog_name)? + .catalog(catalog_name) + .await? .context(CatalogNotFoundSnafu { catalog_name })?; let schema = catalog - .schema(schema_name)? + .schema(schema_name) + .await? .with_context(|| SchemaNotFoundSnafu { catalog: catalog_name, schema: schema_name, @@ -388,7 +367,9 @@ impl CatalogManager for LocalCatalogManager { engine, ) .await?; - schema.register_table(request.table_name, request.table)?; + schema + .register_table(request.table_name, request.table) + .await?; Ok(true) } } @@ -409,11 +390,13 @@ impl CatalogManager for LocalCatalogManager { let catalog = self .catalogs - .catalog(catalog_name)? + .catalog(catalog_name) + .await? .context(CatalogNotFoundSnafu { catalog_name })?; let schema = catalog - .schema(schema_name)? + .schema(schema_name) + .await? .with_context(|| SchemaNotFoundSnafu { catalog: catalog_name, schema: schema_name, @@ -421,7 +404,7 @@ impl CatalogManager for LocalCatalogManager { let _lock = self.register_lock.lock().await; ensure!( - !schema.table_exist(&request.new_table_name)?, + !schema.table_exist(&request.new_table_name).await?, TableExistsSnafu { table: &request.new_table_name } @@ -447,6 +430,7 @@ impl CatalogManager for LocalCatalogManager { let renamed = schema .rename_table(&request.table_name, request.new_table_name.clone()) + .await .is_ok(); Ok(renamed) } @@ -497,13 +481,14 @@ impl CatalogManager for LocalCatalogManager { let catalog = self .catalogs - .catalog(catalog_name)? + .catalog(catalog_name) + .await? .context(CatalogNotFoundSnafu { catalog_name })?; { let _lock = self.register_lock.lock().await; ensure!( - catalog.schema(schema_name)?.is_none(), + catalog.schema(schema_name).await?.is_none(), SchemaExistsSnafu { schema: schema_name, } @@ -511,7 +496,9 @@ impl CatalogManager for LocalCatalogManager { self.system .register_schema(request.catalog, schema_name.clone()) .await?; - catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?; + catalog + .register_schema(request.schema, Arc::new(MemorySchemaProvider::new())) + .await?; Ok(true) } } @@ -530,13 +517,15 @@ impl CatalogManager for LocalCatalogManager { Ok(()) } - fn schema(&self, catalog: &str, schema: &str) -> Result> { + async fn schema(&self, catalog: &str, schema: &str) -> Result> { self.catalogs - .catalog(catalog)? + .catalog(catalog) + .await? .context(CatalogNotFoundSnafu { catalog_name: catalog, })? 
.schema(schema) + .await } async fn table( @@ -547,16 +536,42 @@ impl CatalogManager for LocalCatalogManager { ) -> Result> { let catalog = self .catalogs - .catalog(catalog_name)? + .catalog(catalog_name) + .await? .context(CatalogNotFoundSnafu { catalog_name })?; let schema = catalog - .schema(schema_name)? + .schema(schema_name) + .await? .with_context(|| SchemaNotFoundSnafu { catalog: catalog_name, schema: schema_name, })?; schema.table(table_name).await } + + async fn catalog(&self, catalog: &str) -> Result> { + if catalog.eq_ignore_ascii_case(SYSTEM_CATALOG_NAME) { + Ok(Some(self.system.clone())) + } else { + self.catalogs.catalog(catalog).await + } + } + + async fn catalog_names(&self) -> Result> { + self.catalogs.catalog_names().await + } + + async fn register_catalog( + &self, + name: String, + catalog: CatalogProviderRef, + ) -> Result> { + self.catalogs.register_catalog(name, catalog).await + } + + fn as_any(&self) -> &dyn Any { + self + } } #[cfg(test)] diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 84448eeeb8..4a87c65ee9 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -31,7 +31,7 @@ use crate::error::{ }; use crate::schema::SchemaProvider; use crate::{ - CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, + CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProviderRef, }; @@ -51,10 +51,10 @@ impl Default for MemoryCatalogManager { }; let default_catalog = Arc::new(MemoryCatalogProvider::new()); manager - .register_catalog("greptime".to_string(), default_catalog.clone()) + .register_catalog_sync("greptime".to_string(), default_catalog.clone()) .unwrap(); default_catalog - .register_schema("public".to_string(), Arc::new(MemorySchemaProvider::new())) + .register_schema_sync("public".to_string(), Arc::new(MemorySchemaProvider::new())) .unwrap(); manager } @@ -75,70 +75,70 @@ impl CatalogManager for MemoryCatalogManager { } async fn register_table(&self, request: RegisterTableRequest) -> Result { - let catalogs = self.catalogs.write().unwrap(); - let catalog = catalogs - .get(&request.catalog) + let schema = self + .catalog(&request.catalog) .context(CatalogNotFoundSnafu { catalog_name: &request.catalog, })? - .clone(); - let schema = catalog - .schema(&request.schema)? - .with_context(|| SchemaNotFoundSnafu { + .schema(&request.schema) + .await? + .context(SchemaNotFoundSnafu { catalog: &request.catalog, schema: &request.schema, })?; schema .register_table(request.table_name, request.table) + .await .map(|v| v.is_none()) } async fn rename_table(&self, request: RenameTableRequest) -> Result { - let catalogs = self.catalogs.write().unwrap(); - let catalog = catalogs - .get(&request.catalog) + let catalog = self + .catalog(&request.catalog) .context(CatalogNotFoundSnafu { catalog_name: &request.catalog, - })? - .clone(); - let schema = catalog - .schema(&request.schema)? - .with_context(|| SchemaNotFoundSnafu { - catalog: &request.catalog, - schema: &request.schema, })?; + let schema = + catalog + .schema(&request.schema) + .await? 
+ .with_context(|| SchemaNotFoundSnafu { + catalog: &request.catalog, + schema: &request.schema, + })?; Ok(schema .rename_table(&request.table_name, request.new_table_name) + .await .is_ok()) } async fn deregister_table(&self, request: DeregisterTableRequest) -> Result { - let catalogs = self.catalogs.write().unwrap(); - let catalog = catalogs - .get(&request.catalog) + let schema = self + .catalog(&request.catalog) .context(CatalogNotFoundSnafu { catalog_name: &request.catalog, })? - .clone(); - let schema = catalog - .schema(&request.schema)? + .schema(&request.schema) + .await? .with_context(|| SchemaNotFoundSnafu { catalog: &request.catalog, schema: &request.schema, })?; schema .deregister_table(&request.table_name) + .await .map(|v| v.is_some()) } async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { - let catalogs = self.catalogs.write().unwrap(); - let catalog = catalogs - .get(&request.catalog) + let catalog = self + .catalog(&request.catalog) .context(CatalogNotFoundSnafu { catalog_name: &request.catalog, })?; - catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?; + catalog + .register_schema(request.schema, Arc::new(MemorySchemaProvider::new())) + .await?; Ok(true) } @@ -147,10 +147,9 @@ impl CatalogManager for MemoryCatalogManager { Ok(()) } - fn schema(&self, catalog: &str, schema: &str) -> Result> { - let catalogs = self.catalogs.read().unwrap(); - if let Some(c) = catalogs.get(catalog) { - c.schema(schema) + async fn schema(&self, catalog: &str, schema: &str) -> Result> { + if let Some(c) = self.catalog(catalog) { + c.schema(schema).await } else { Ok(None) } @@ -162,15 +161,30 @@ impl CatalogManager for MemoryCatalogManager { schema: &str, table_name: &str, ) -> Result> { - let catalog = { - let c = self.catalogs.read().unwrap(); - let Some(c) = c.get(catalog) else { return Ok(None) }; - c.clone() - }; - match catalog.schema(schema)? { - None => Ok(None), - Some(s) => s.table(table_name).await, - } + let Some(catalog) = self + .catalog(catalog) else { return Ok(None)}; + let Some(s) = catalog.schema(schema).await? 
else { return Ok(None) }; + s.table(table_name).await + } + + async fn catalog(&self, catalog: &str) -> Result> { + Ok(self.catalogs.read().unwrap().get(catalog).cloned()) + } + + async fn catalog_names(&self) -> Result> { + Ok(self.catalogs.read().unwrap().keys().cloned().collect()) + } + + async fn register_catalog( + &self, + name: String, + catalog: CatalogProviderRef, + ) -> Result> { + self.register_catalog_sync(name, catalog) + } + + fn as_any(&self) -> &dyn Any { + self } } @@ -192,14 +206,8 @@ impl MemoryCatalogManager { } } } -} -impl CatalogList for MemoryCatalogManager { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( + pub fn register_catalog_sync( &self, name: String, catalog: CatalogProviderRef, @@ -208,14 +216,8 @@ impl CatalogList for MemoryCatalogManager { Ok(catalogs.insert(name, catalog)) } - fn catalog_names(&self) -> Result> { - let catalogs = self.catalogs.read().unwrap(); - Ok(catalogs.keys().map(|s| s.to_string()).collect()) - } - - fn catalog(&self, name: &str) -> Result> { - let catalogs = self.catalogs.read().unwrap(); - Ok(catalogs.get(name).cloned()) + fn catalog(&self, catalog_name: &str) -> Option { + self.catalogs.read().unwrap().get(catalog_name).cloned() } } @@ -237,19 +239,13 @@ impl MemoryCatalogProvider { schemas: RwLock::new(HashMap::new()), } } -} -impl CatalogProvider for MemoryCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Result> { + pub fn schema_names_sync(&self) -> Result> { let schemas = self.schemas.read().unwrap(); Ok(schemas.keys().cloned().collect()) } - fn register_schema( + pub fn register_schema_sync( &self, name: String, schema: SchemaProviderRef, @@ -262,12 +258,35 @@ impl CatalogProvider for MemoryCatalogProvider { Ok(schemas.insert(name, schema)) } - fn schema(&self, name: &str) -> Result>> { + pub fn schema_sync(&self, name: &str) -> Result>> { let schemas = self.schemas.read().unwrap(); Ok(schemas.get(name).cloned()) } } +#[async_trait::async_trait] +impl CatalogProvider for MemoryCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + async fn schema_names(&self) -> Result> { + self.schema_names_sync() + } + + async fn register_schema( + &self, + name: String, + schema: SchemaProviderRef, + ) -> Result> { + self.register_schema_sync(name, schema) + } + + async fn schema(&self, name: &str) -> Result>> { + self.schema_sync(name) + } +} + /// Simple in-memory implementation of a schema. 
pub struct MemorySchemaProvider { tables: RwLock>, @@ -280,31 +299,8 @@ impl MemorySchemaProvider { tables: RwLock::new(HashMap::new()), } } -} -impl Default for MemorySchemaProvider { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl SchemaProvider for MemorySchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Result> { - let tables = self.tables.read().unwrap(); - Ok(tables.keys().cloned().collect()) - } - - async fn table(&self, name: &str) -> Result> { - let tables = self.tables.read().unwrap(); - Ok(tables.get(name).cloned()) - } - - fn register_table(&self, name: String, table: TableRef) -> Result> { + pub fn register_table_sync(&self, name: String, table: TableRef) -> Result> { let mut tables = self.tables.write().unwrap(); if let Some(existing) = tables.get(name.as_str()) { // if table with the same name but different table id exists, then it's a fatal bug @@ -322,7 +318,7 @@ impl SchemaProvider for MemorySchemaProvider { } } - fn rename_table(&self, name: &str, new_name: String) -> Result { + pub fn rename_table_sync(&self, name: &str, new_name: String) -> Result { let mut tables = self.tables.write().unwrap(); let Some(table) = tables.remove(name) else { return TableNotFoundSnafu { @@ -340,14 +336,53 @@ impl SchemaProvider for MemorySchemaProvider { Ok(table) } - fn deregister_table(&self, name: &str) -> Result> { + pub fn table_exist_sync(&self, name: &str) -> Result { + let tables = self.tables.read().unwrap(); + Ok(tables.contains_key(name)) + } + + pub fn deregister_table_sync(&self, name: &str) -> Result> { let mut tables = self.tables.write().unwrap(); Ok(tables.remove(name)) } +} - fn table_exist(&self, name: &str) -> Result { +impl Default for MemorySchemaProvider { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl SchemaProvider for MemorySchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + async fn table_names(&self) -> Result> { let tables = self.tables.read().unwrap(); - Ok(tables.contains_key(name)) + Ok(tables.keys().cloned().collect()) + } + + async fn table(&self, name: &str) -> Result> { + let tables = self.tables.read().unwrap(); + Ok(tables.get(name).cloned()) + } + + async fn register_table(&self, name: String, table: TableRef) -> Result> { + self.register_table_sync(name, table) + } + + async fn rename_table(&self, name: &str, new_name: String) -> Result { + self.rename_table_sync(name, new_name) + } + + async fn deregister_table(&self, name: &str) -> Result> { + self.deregister_table_sync(name) + } + + async fn table_exist(&self, name: &str) -> Result { + self.table_exist_sync(name) } } @@ -368,15 +403,20 @@ mod tests { #[tokio::test] async fn test_new_memory_catalog_list() { let catalog_list = new_memory_catalog_list().unwrap(); - let default_catalog = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap().unwrap(); + let default_catalog = CatalogManager::catalog(&*catalog_list, DEFAULT_CATALOG_NAME) + .await + .unwrap() + .unwrap(); let default_schema = default_catalog .schema(DEFAULT_SCHEMA_NAME) + .await .unwrap() .unwrap(); default_schema .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .await .unwrap(); let table = default_schema.table("numbers").await.unwrap(); @@ -388,17 +428,17 @@ mod tests { async fn test_mem_provider() { let provider = MemorySchemaProvider::new(); let table_name = "numbers"; - assert!(!provider.table_exist(table_name).unwrap()); - assert!(provider.deregister_table(table_name).unwrap().is_none()); + 
assert!(!provider.table_exist_sync(table_name).unwrap()); + provider.deregister_table_sync(table_name).unwrap(); let test_table = NumbersTable::default(); // register table successfully assert!(provider - .register_table(table_name.to_string(), Arc::new(test_table)) + .register_table_sync(table_name.to_string(), Arc::new(test_table)) .unwrap() .is_none()); - assert!(provider.table_exist(table_name).unwrap()); + assert!(provider.table_exist_sync(table_name).unwrap()); let other_table = NumbersTable::new(12); - let result = provider.register_table(table_name.to_string(), Arc::new(other_table)); + let result = provider.register_table_sync(table_name.to_string(), Arc::new(other_table)); let err = result.err().unwrap(); assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } @@ -407,27 +447,27 @@ mod tests { async fn test_mem_provider_rename_table() { let provider = MemorySchemaProvider::new(); let table_name = "num"; - assert!(!provider.table_exist(table_name).unwrap()); + assert!(!provider.table_exist_sync(table_name).unwrap()); let test_table: TableRef = Arc::new(NumbersTable::default()); // register test table assert!(provider - .register_table(table_name.to_string(), test_table.clone()) + .register_table_sync(table_name.to_string(), test_table.clone()) .unwrap() .is_none()); - assert!(provider.table_exist(table_name).unwrap()); + assert!(provider.table_exist_sync(table_name).unwrap()); // rename test table let new_table_name = "numbers"; provider - .rename_table(table_name, new_table_name.to_string()) + .rename_table_sync(table_name, new_table_name.to_string()) .unwrap(); // test old table name not exist - assert!(!provider.table_exist(table_name).unwrap()); - assert!(provider.deregister_table(table_name).unwrap().is_none()); + assert!(!provider.table_exist_sync(table_name).unwrap()); + provider.deregister_table_sync(table_name).unwrap(); // test new table name exists - assert!(provider.table_exist(new_table_name).unwrap()); + assert!(provider.table_exist_sync(new_table_name).unwrap()); let registered_table = provider.table(new_table_name).await.unwrap().unwrap(); assert_eq!( registered_table.table_info().ident.table_id, @@ -435,7 +475,9 @@ mod tests { ); let other_table = Arc::new(NumbersTable::new(2)); - let result = provider.register_table(new_table_name.to_string(), other_table); + let result = provider + .register_table(new_table_name.to_string(), other_table) + .await; let err = result.err().unwrap(); assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } @@ -445,6 +487,7 @@ mod tests { let catalog = MemoryCatalogManager::default(); let schema = catalog .schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) + .await .unwrap() .unwrap(); @@ -460,7 +503,7 @@ mod tests { table, }; assert!(catalog.register_table(register_table_req).await.unwrap()); - assert!(schema.table_exist(table_name).unwrap()); + assert!(schema.table_exist(table_name).await.unwrap()); // rename table let new_table_name = "numbers_new"; @@ -472,8 +515,8 @@ mod tests { table_id, }; assert!(catalog.rename_table(rename_table_req).await.unwrap()); - assert!(!schema.table_exist(table_name).unwrap()); - assert!(schema.table_exist(new_table_name).unwrap()); + assert!(!schema.table_exist(table_name).await.unwrap()); + assert!(schema.table_exist(new_table_name).await.unwrap()); let registered_table = catalog .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) @@ -507,6 +550,7 @@ mod tests { let catalog = MemoryCatalogManager::default(); let schema = catalog .schema(DEFAULT_CATALOG_NAME, 
DEFAULT_SCHEMA_NAME) + .await .unwrap() .unwrap(); @@ -518,7 +562,7 @@ mod tests { table: Arc::new(NumbersTable::default()), }; catalog.register_table(register_table_req).await.unwrap(); - assert!(schema.table_exist("numbers").unwrap()); + assert!(schema.table_exist("numbers").await.unwrap()); let deregister_table_req = DeregisterTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), @@ -529,6 +573,6 @@ mod tests { .deregister_table(deregister_table_req) .await .unwrap(); - assert!(!schema.table_exist("numbers").unwrap()); + assert!(!schema.table_exist("numbers").await.unwrap()); } } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 910032efb9..f449032268 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -17,19 +17,17 @@ use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::sync::Arc; -use arc_swap::ArcSwap; use async_stream::stream; use async_trait::async_trait; use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, warn}; use dashmap::DashMap; use futures::Stream; use futures_util::{StreamExt, TryStreamExt}; -use key_lock::KeyLock; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt}; use table::engine::manager::TableEngineManagerRef; -use table::engine::EngineContext; +use table::engine::{EngineContext, TableReference}; use table::metadata::TableId; use table::requests::{CreateTableRequest, OpenTableRequest}; use table::TableRef; @@ -41,13 +39,13 @@ use crate::error::{ TableExistsSnafu, UnimplementedSnafu, }; use crate::helper::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, - SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, - CATALOG_KEY_PREFIX, + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, + build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, + TableGlobalValue, TableRegionalKey, TableRegionalValue, CATALOG_KEY_PREFIX, }; use crate::remote::{Kv, KvBackendRef}; use crate::{ - handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, + handle_system_table_request, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef, }; @@ -72,19 +70,12 @@ impl RemoteCatalogManager { } } - fn build_catalog_key(&self, catalog_name: impl AsRef) -> CatalogKey { - CatalogKey { - catalog_name: catalog_name.as_ref().to_string(), - } - } - fn new_catalog_provider(&self, catalog_name: &str) -> CatalogProviderRef { Arc::new(RemoteCatalogProvider { node_id: self.node_id, catalog_name: catalog_name.to_string(), backend: self.backend.clone(), - schemas: Default::default(), - mutex: Default::default(), + engine_manager: self.engine_manager.clone(), }) as _ } @@ -92,10 +83,9 @@ impl RemoteCatalogManager { Arc::new(RemoteSchemaProvider { catalog_name: catalog_name.to_string(), schema_name: schema_name.to_string(), - tables: Default::default(), node_id: self.node_id, backend: self.backend.clone(), - mutex: Default::default(), + engine_manager: self.engine_manager.clone(), }) as _ } @@ -217,10 +207,12 @@ impl RemoteCatalogManager { .. } = r?; info!("Found schema: {}.{}", catalog_name, schema_name); - let schema = match catalog.schema(&schema_name)? 
{ + let schema = match catalog.schema(&schema_name).await? { None => { let schema = self.new_schema_provider(&catalog_name, &schema_name); - catalog.register_schema(schema_name.clone(), schema.clone())?; + catalog + .register_schema(schema_name.clone(), schema.clone()) + .await?; info!("Registered schema: {}", &schema_name); schema } @@ -266,7 +258,7 @@ impl RemoteCatalogManager { let table_info = table_ref.table_info(); let table_name = &table_info.name; let table_id = table_info.ident.table_id; - schema.register_table(table_name.clone(), table_ref)?; + schema.register_table(table_name.clone(), table_ref).await?; info!("Registered table {}", table_name); max_table_id = max_table_id.max(table_id); table_num += 1; @@ -287,7 +279,9 @@ impl RemoteCatalogManager { let schema_provider = self.new_schema_provider(catalog_name, schema_name); let catalog_provider = self.new_catalog_provider(catalog_name); - catalog_provider.register_schema(schema_name.to_string(), schema_provider.clone())?; + catalog_provider + .register_schema(schema_name.to_string(), schema_provider.clone()) + .await?; let schema_key = SchemaKey { catalog_name: catalog_name.to_string(), @@ -438,56 +432,85 @@ impl CatalogManager for RemoteCatalogManager { async fn register_table(&self, request: RegisterTableRequest) -> Result { let catalog_name = request.catalog; let schema_name = request.schema; - let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu { - catalog_name: &catalog_name, - })?; - let schema_provider = - catalog_provider - .schema(&schema_name)? - .with_context(|| SchemaNotFoundSnafu { - catalog: &catalog_name, - schema: &schema_name, - })?; - if schema_provider.table_exist(&request.table_name)? { + let schema_provider = self + .catalog(&catalog_name) + .await? + .context(CatalogNotFoundSnafu { + catalog_name: &catalog_name, + })? + .schema(&schema_name) + .await? + .with_context(|| SchemaNotFoundSnafu { + catalog: &catalog_name, + schema: &schema_name, + })?; + if schema_provider.table_exist(&request.table_name).await? { return TableExistsSnafu { table: format!("{}.{}.{}", &catalog_name, &schema_name, &request.table_name), } .fail(); } - schema_provider.register_table(request.table_name, request.table)?; + schema_provider + .register_table(request.table_name, request.table) + .await?; Ok(true) } async fn deregister_table(&self, request: DeregisterTableRequest) -> Result { let catalog_name = &request.catalog; let schema_name = &request.schema; - let schema = self - .schema(catalog_name, schema_name)? + let result = self + .schema(catalog_name, schema_name) + .await? .context(SchemaNotFoundSnafu { catalog: catalog_name, schema: schema_name, - })?; - - let result = schema.deregister_table(&request.table_name)?; + })? + .deregister_table(&request.table_name) + .await?; Ok(result.is_none()) } async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { let catalog_name = request.catalog; let schema_name = request.schema; - let catalog_provider = self.catalog(&catalog_name)?.context(CatalogNotFoundSnafu { - catalog_name: &catalog_name, - })?; + let catalog_provider = + self.catalog(&catalog_name) + .await? 
+ .context(CatalogNotFoundSnafu { + catalog_name: &catalog_name, + })?; let schema_provider = self.new_schema_provider(&catalog_name, &schema_name); - catalog_provider.register_schema(schema_name, schema_provider)?; + catalog_provider + .register_schema(schema_name, schema_provider) + .await?; Ok(true) } - async fn rename_table(&self, _request: RenameTableRequest) -> Result { - UnimplementedSnafu { - operation: "rename table", + async fn rename_table(&self, request: RenameTableRequest) -> Result { + let old_table_key = TableRegionalKey { + catalog_name: request.catalog.clone(), + schema_name: request.schema.clone(), + table_name: request.table_name.clone(), + node_id: self.node_id, } - .fail() + .to_string(); + let Some(Kv(_, value_bytes)) = self.backend.get(old_table_key.as_bytes()).await? else { + return Ok(false) + }; + let new_table_key = TableRegionalKey { + catalog_name: request.catalog.clone(), + schema_name: request.schema.clone(), + table_name: request.new_table_name, + node_id: self.node_id, + }; + self.backend + .set(new_table_key.to_string().as_bytes(), &value_bytes) + .await?; + self.backend + .delete(old_table_key.to_string().as_bytes()) + .await?; + Ok(true) } async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { @@ -496,12 +519,14 @@ impl CatalogManager for RemoteCatalogManager { Ok(()) } - fn schema(&self, catalog: &str, schema: &str) -> Result> { - self.catalog(catalog)? + async fn schema(&self, catalog: &str, schema: &str) -> Result> { + self.catalog(catalog) + .await? .context(CatalogNotFoundSnafu { catalog_name: catalog, })? .schema(schema) + .await } async fn table( @@ -511,114 +536,67 @@ impl CatalogManager for RemoteCatalogManager { table_name: &str, ) -> Result> { let catalog = self - .catalog(catalog_name)? + .catalog(catalog_name) + .await? .with_context(|| CatalogNotFoundSnafu { catalog_name })?; let schema = catalog - .schema(schema_name)? + .schema(schema_name) + .await? .with_context(|| SchemaNotFoundSnafu { catalog: catalog_name, schema: schema_name, })?; schema.table(table_name).await } -} -impl CatalogList for RemoteCatalogManager { - fn as_any(&self) -> &dyn Any { - self + async fn catalog(&self, catalog: &str) -> Result> { + let key = CatalogKey { + catalog_name: catalog.to_string(), + }; + Ok(self + .backend + .get(key.to_string().as_bytes()) + .await? + .map(|_| self.new_catalog_provider(catalog))) } - fn register_catalog( - &self, - name: String, - catalog: CatalogProviderRef, - ) -> Result> { - let key = self.build_catalog_key(&name).to_string(); - let backend = self.backend.clone(); - let catalogs = self.catalogs.clone(); + async fn catalog_names(&self) -> Result> { + let mut stream = self.backend.range(CATALOG_KEY_PREFIX.as_bytes()); + let mut catalogs = HashSet::new(); - std::thread::spawn(|| { - common_runtime::block_on_write(async move { - backend - .set( - key.as_bytes(), - &CatalogValue {} - .as_bytes() - .context(InvalidCatalogValueSnafu)?, - ) - .await?; + while let Some(catalog) = stream.next().await { + if let Ok(catalog) = catalog { + let catalog_key = String::from_utf8_lossy(&catalog.0); - let catalogs = catalogs.read(); - let prev = catalogs.insert(name, catalog.clone()); - - Ok(prev) - }) - }) - .join() - .unwrap() - } - - /// List all catalogs from metasrv - fn catalog_names(&self) -> Result> { - let catalogs = self.catalogs.read(); - Ok(catalogs.iter().map(|k| k.key().to_string()).collect()) - } - - /// Read catalog info of given name from metasrv. 
- fn catalog(&self, name: &str) -> Result> { - { - let catalogs = self.catalogs.read(); - let catalog = catalogs.get(name); - - if let Some(catalog) = catalog { - return Ok(Some(catalog.clone())); + if let Ok(key) = CatalogKey::parse(&catalog_key) { + catalogs.insert(key.catalog_name); + } } } - let catalogs = self.catalogs.write(); + Ok(catalogs.into_iter().collect()) + } - let catalog = catalogs.get(name); - if let Some(catalog) = catalog { - return Ok(Some(catalog.clone())); - } + async fn register_catalog( + &self, + name: String, + _catalog: CatalogProviderRef, + ) -> Result> { + let key = CatalogKey { catalog_name: name }.to_string(); + // TODO(hl): use compare_and_swap to prevent concurrent update + self.backend + .set( + key.as_bytes(), + &CatalogValue {} + .as_bytes() + .context(InvalidCatalogValueSnafu)?, + ) + .await?; + Ok(None) + } - // It's for lack of incremental catalog syncing between datanode and meta. Here we fetch catalog - // from meta on demand. This can be removed when incremental catalog syncing is done in datanode. - - let backend = self.backend.clone(); - - let catalogs_from_meta: HashSet = std::thread::spawn(|| { - common_runtime::block_on_read(async move { - let mut stream = backend.range(CATALOG_KEY_PREFIX.as_bytes()); - let mut catalogs = HashSet::new(); - - while let Some(catalog) = stream.next().await { - if let Ok(catalog) = catalog { - let catalog_key = String::from_utf8_lossy(&catalog.0); - - if let Ok(key) = CatalogKey::parse(&catalog_key) { - catalogs.insert(key.catalog_name); - } - } - } - - catalogs - }) - }) - .join() - .unwrap(); - - catalogs.retain(|catalog_name, _| catalogs_from_meta.get(catalog_name).is_some()); - - for catalog in catalogs_from_meta { - catalogs - .entry(catalog.clone()) - .or_insert(self.new_catalog_provider(&catalog)); - } - - let catalog = catalogs.get(name); - - Ok(catalog.as_deref().cloned()) + fn as_any(&self) -> &dyn Any { + self } } @@ -626,119 +604,91 @@ pub struct RemoteCatalogProvider { node_id: u64, catalog_name: String, backend: KvBackendRef, - schemas: Arc>>, - mutex: Arc>, + engine_manager: TableEngineManagerRef, } impl RemoteCatalogProvider { - pub fn new(catalog_name: String, backend: KvBackendRef, node_id: u64) -> Self { + pub fn new( + catalog_name: String, + backend: KvBackendRef, + engine_manager: TableEngineManagerRef, + node_id: u64, + ) -> Self { Self { node_id, catalog_name, backend, - schemas: Default::default(), - mutex: Default::default(), + engine_manager, } } - pub fn refresh_schemas(&self) -> Result<()> { - let schemas = self.schemas.clone(); - let schema_prefix = build_schema_prefix(&self.catalog_name); - let catalog_name = self.catalog_name.clone(); - let mutex = self.mutex.clone(); - let backend = self.backend.clone(); - let node_id = self.node_id; - - std::thread::spawn(move || { - common_runtime::block_on_write(async move { - let _guard = mutex.lock().await; - let prev_schemas = schemas.load(); - let mut new_schemas = HashMap::with_capacity(prev_schemas.len() + 1); - new_schemas.clone_from(&prev_schemas); - - let mut remote_schemas = backend.range(schema_prefix.as_bytes()); - while let Some(r) = remote_schemas.next().await { - let Kv(k, _) = r?; - let schema_key = SchemaKey::parse(&String::from_utf8_lossy(&k)) - .context(InvalidCatalogValueSnafu)?; - if !new_schemas.contains_key(&schema_key.schema_name) { - new_schemas.insert( - schema_key.schema_name.clone(), - Arc::new(RemoteSchemaProvider::new( - catalog_name.clone(), - schema_key.schema_name, - node_id, - backend.clone(), - )), - ); - } - } 
-                schemas.store(Arc::new(new_schemas));
-                Ok(())
-            })
-        })
-        .join()
-        .unwrap()?;
-
-        Ok(())
-    }
-
     fn build_schema_key(&self, schema_name: impl AsRef<str>) -> SchemaKey {
         SchemaKey {
             catalog_name: self.catalog_name.clone(),
             schema_name: schema_name.as_ref().to_string(),
         }
     }
+
+    fn build_schema_provider(&self, schema_name: &str) -> SchemaProviderRef {
+        let provider = RemoteSchemaProvider {
+            catalog_name: self.catalog_name.clone(),
+            schema_name: schema_name.to_string(),
+            node_id: self.node_id,
+            backend: self.backend.clone(),
+            engine_manager: self.engine_manager.clone(),
+        };
+        Arc::new(provider) as Arc<_>
+    }
 }
 
+#[async_trait::async_trait]
 impl CatalogProvider for RemoteCatalogProvider {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
-    fn schema_names(&self) -> Result<Vec<String>> {
-        self.refresh_schemas()?;
-        Ok(self.schemas.load().keys().cloned().collect::<Vec<_>>())
+    async fn schema_names(&self) -> Result<Vec<String>> {
+        let schema_prefix = build_schema_prefix(&self.catalog_name);
+
+        let remote_schemas = self.backend.range(schema_prefix.as_bytes());
+        let res = remote_schemas
+            .map(|kv| {
+                let Kv(k, _) = kv?;
+                let schema_key = SchemaKey::parse(String::from_utf8_lossy(&k))
+                    .context(InvalidCatalogValueSnafu)?;
+                Ok(schema_key.schema_name)
+            })
+            .try_collect()
+            .await?;
+        Ok(res)
     }
 
-    fn register_schema(
+    async fn register_schema(
         &self,
         name: String,
         schema: SchemaProviderRef,
     ) -> Result<Option<SchemaProviderRef>> {
+        let _ = schema; // we don't care about schema provider
         let key = self.build_schema_key(&name).to_string();
-        let backend = self.backend.clone();
-        let mutex = self.mutex.clone();
-        let schemas = self.schemas.clone();
-
-        std::thread::spawn(|| {
-            common_runtime::block_on_write(async move {
-                let _guard = mutex.lock().await;
-                backend
-                    .set(
-                        key.as_bytes(),
-                        &SchemaValue {}
-                            .as_bytes()
-                            .context(InvalidCatalogValueSnafu)?,
-                    )
-                    .await?;
-
-                let prev_schemas = schemas.load();
-                let mut new_schemas = HashMap::with_capacity(prev_schemas.len() + 1);
-                new_schemas.clone_from(&prev_schemas);
-                let prev_schema = new_schemas.insert(name, schema);
-                schemas.store(Arc::new(new_schemas));
-                Ok(prev_schema)
-            })
-        })
-        .join()
-        .unwrap()
+        self.backend
+            .set(
+                key.as_bytes(),
+                &SchemaValue {}
+                    .as_bytes()
+                    .context(InvalidCatalogValueSnafu)?,
+            )
+            .await?;
+        // TODO(hl): maybe return preview schema by cas
+        Ok(None)
     }
 
-    fn schema(&self, name: &str) -> Result<Option<Arc<dyn SchemaProvider>>> {
-        // TODO(hl): We should refresh whole catalog before calling datafusion's query engine.
-        self.refresh_schemas()?;
-        Ok(self.schemas.load().get(name).cloned())
+    async fn schema(&self, name: &str) -> Result<Option<SchemaProviderRef>> {
+        let key = self.build_schema_key(name).to_string();
+        Ok(self
+            .backend
+            .get(key.as_bytes())
+            .await?
+ .map(|_| self.build_schema_provider(name))) } } @@ -747,8 +697,7 @@ pub struct RemoteSchemaProvider { schema_name: String, node_id: u64, backend: KvBackendRef, - tables: Arc>>, - mutex: Arc>, + engine_manager: TableEngineManagerRef, } impl RemoteSchemaProvider { @@ -756,6 +705,7 @@ impl RemoteSchemaProvider { catalog_name: String, schema_name: String, node_id: u64, + engine_manager: TableEngineManagerRef, backend: KvBackendRef, ) -> Self { Self { @@ -763,8 +713,7 @@ impl RemoteSchemaProvider { schema_name, node_id, backend, - tables: Default::default(), - mutex: Default::default(), + engine_manager, } } @@ -784,89 +733,132 @@ impl SchemaProvider for RemoteSchemaProvider { self } - fn table_names(&self) -> Result> { - Ok(self - .tables - .load() - .iter() - .map(|en| en.key().clone()) - .collect::>()) + async fn table_names(&self) -> Result> { + let key_prefix = build_table_regional_prefix(&self.catalog_name, &self.schema_name); + let iter = self.backend.range(key_prefix.as_bytes()); + let table_names = iter + .map(|kv| { + let Kv(key, _) = kv?; + let regional_key = TableRegionalKey::parse(String::from_utf8_lossy(&key)) + .context(InvalidCatalogValueSnafu)?; + Ok(regional_key.table_name) + }) + .try_collect() + .await?; + Ok(table_names) } async fn table(&self, name: &str) -> Result> { - Ok(self.tables.load().get(name).map(|en| en.value().clone())) + let key = self.build_regional_table_key(name).to_string(); + let table_opt = self + .backend + .get(key.as_bytes()) + .await? + .map(|Kv(_, v)| { + let TableRegionalValue { engine_name, .. } = + TableRegionalValue::parse(String::from_utf8_lossy(&v)) + .context(InvalidCatalogValueSnafu)?; + let reference = TableReference { + catalog: &self.catalog_name, + schema: &self.schema_name, + table: name, + }; + let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE); + let engine = self + .engine_manager + .engine(engine_name) + .context(TableEngineNotFoundSnafu { engine_name })?; + let table = engine + .get_table(&EngineContext {}, &reference) + .with_context(|_| OpenTableSnafu { + table_info: reference.to_string(), + })?; + Ok(table) + }) + .transpose()? 
+ .flatten(); + + Ok(table_opt) } - fn register_table(&self, name: String, table: TableRef) -> Result> { + async fn register_table(&self, name: String, table: TableRef) -> Result> { let table_info = table.table_info(); let table_version = table_info.ident.version; let table_value = TableRegionalValue { version: table_version, regions_ids: table.table_info().meta.region_numbers.clone(), + engine_name: Some(table_info.meta.engine.clone()), }; - let backend = self.backend.clone(); - let mutex = self.mutex.clone(); - let tables = self.tables.clone(); let table_key = self.build_regional_table_key(&name).to_string(); + self.backend + .set( + table_key.as_bytes(), + &table_value.as_bytes().context(InvalidCatalogValueSnafu)?, + ) + .await?; + debug!( + "Successfully set catalog table entry, key: {}, table value: {:?}", + table_key, table_value + ); - let prev = std::thread::spawn(move || { - common_runtime::block_on_read(async move { - let _guard = mutex.lock(table_key.clone()).await; - backend - .set( - table_key.as_bytes(), - &table_value.as_bytes().context(InvalidCatalogValueSnafu)?, - ) - .await?; - debug!( - "Successfully set catalog table entry, key: {}, table value: {:?}", - table_key, table_value - ); - - let tables = tables.load(); - let prev = tables.insert(name, table); - Ok(prev) - }) - }) - .join() - .unwrap(); - prev + // TODO(hl): retrieve prev table info using cas + Ok(None) } - fn rename_table(&self, _name: &str, _new_name: String) -> Result { + async fn rename_table(&self, _name: &str, _new_name: String) -> Result { UnimplementedSnafu { operation: "rename table", } .fail() } - fn deregister_table(&self, name: &str) -> Result> { - let table_name = name.to_string(); - let table_key = self.build_regional_table_key(&table_name).to_string(); - let backend = self.backend.clone(); - let mutex = self.mutex.clone(); - let tables = self.tables.clone(); - let prev = std::thread::spawn(move || { - common_runtime::block_on_read(async move { - let _guard = mutex.lock(table_key.clone()).await; - backend.delete(table_key.as_bytes()).await?; - debug!( - "Successfully deleted catalog table entry, key: {}", - table_key - ); + async fn deregister_table(&self, name: &str) -> Result> { + let table_key = self.build_regional_table_key(name).to_string(); - let tables = tables.load(); - let prev = tables.remove(&table_name).map(|en| en.1); - Ok(prev) + let engine_opt = self + .backend + .get(table_key.as_bytes()) + .await? + .map(|Kv(_, v)| { + let TableRegionalValue { engine_name, .. } = + TableRegionalValue::parse(String::from_utf8_lossy(&v)) + .context(InvalidCatalogValueSnafu)?; + Ok(engine_name) }) - }) - .join() - .unwrap(); - prev + .transpose()? + .flatten(); + + let engine_name = engine_opt.as_deref().unwrap_or_else(|| { + warn!("Cannot find table engine name for {table_key}"); + MITO_ENGINE + }); + + self.backend.delete(table_key.as_bytes()).await?; + debug!( + "Successfully deleted catalog table entry, key: {}", + table_key + ); + + let reference = TableReference { + catalog: &self.catalog_name, + schema: &self.schema_name, + table: name, + }; + // deregistering table does not necessarily mean dropping the table + let table = self + .engine_manager + .engine(engine_name) + .context(TableEngineNotFoundSnafu { engine_name })? + .get_table(&EngineContext {}, &reference) + .with_context(|_| OpenTableSnafu { + table_info: reference.to_string(), + })?; + Ok(table) } /// Checks if table exists in schema provider based on locally opened table map. 
-    fn table_exist(&self, name: &str) -> Result<bool> {
-        Ok(self.tables.load().contains_key(name))
+    async fn table_exist(&self, name: &str) -> Result<bool> {
+        let key = self.build_regional_table_key(name).to_string();
+        Ok(self.backend.get(key.as_bytes()).await?.is_some())
     }
 }
 
diff --git a/src/catalog/src/schema.rs b/src/catalog/src/schema.rs
index 9dcf329657..f9e6e2a6c2 100644
--- a/src/catalog/src/schema.rs
+++ b/src/catalog/src/schema.rs
@@ -28,14 +28,14 @@ pub trait SchemaProvider: Sync + Send {
     fn as_any(&self) -> &dyn Any;
 
     /// Retrieves the list of available table names in this schema.
-    fn table_names(&self) -> Result<Vec<String>>;
+    async fn table_names(&self) -> Result<Vec<String>>;
 
     /// Retrieves a specific table from the schema by name, provided it exists.
     async fn table(&self, name: &str) -> Result<Option<TableRef>>;
 
     /// If supported by the implementation, adds a new table to this schema.
     /// If a table of the same name existed before, it returns "Table already exists" error.
-    fn register_table(&self, name: String, _table: TableRef) -> Result<Option<TableRef>> {
+    async fn register_table(&self, name: String, _table: TableRef) -> Result<Option<TableRef>> {
         NotSupportedSnafu {
             op: format!("register_table({name}, )"),
         }
@@ -44,7 +44,7 @@
 
     /// If supported by the implementation, renames an existing table from this schema and returns it.
     /// If no table of that name exists, returns "Table not found" error.
-    fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
+    async fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
         NotSupportedSnafu {
             op: format!("rename_table({name}, {new_name})"),
         }
@@ -53,7 +53,7 @@
 
     /// If supported by the implementation, removes an existing table from this schema and returns it.
     /// If no table of that name exists, returns Ok(None).
-    fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
+    async fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
         NotSupportedSnafu {
             op: format!("deregister_table({name})"),
         }
@@ -63,7 +63,7 @@
 
     /// If supported by the implementation, checks the table exist in the schema provider or not.
     /// If no matched table in the schema provider, return false.
     /// Otherwise, return true.
-    fn table_exist(&self, name: &str) -> Result<bool>;
+    async fn table_exist(&self, name: &str) -> Result<bool>;
 }
 
 pub type SchemaProviderRef = Arc<dyn SchemaProvider>;
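With every method on `SchemaProvider` now `async`, implementors annotate the impl with `#[async_trait::async_trait]`, matching the providers elsewhere in this patch. A minimal in-memory sketch, not part of the patch (`MemSchema` is a hypothetical name, and the crate's `Result` and `table::TableRef` re-exports are assumed):

```rust
use std::any::Any;
use std::collections::HashMap;

use table::TableRef;
use tokio::sync::RwLock;

use crate::error::Result;
use crate::schema::SchemaProvider;

#[derive(Default)]
struct MemSchema {
    tables: RwLock<HashMap<String, TableRef>>,
}

#[async_trait::async_trait]
impl SchemaProvider for MemSchema {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // List the registered table names.
    async fn table_names(&self) -> Result<Vec<String>> {
        Ok(self.tables.read().await.keys().cloned().collect())
    }

    // Look a table up by name.
    async fn table(&self, name: &str) -> Result<Option<TableRef>> {
        Ok(self.tables.read().await.get(name).cloned())
    }

    // Insert a table, returning the previous one under that name, if any.
    async fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
        Ok(self.tables.write().await.insert(name, table))
    }

    async fn table_exist(&self, name: &str) -> Result<bool> {
        Ok(self.tables.read().await.contains_key(name))
    }
}
```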
diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs
index 81a0840a4c..bce848b788 100644
--- a/src/catalog/src/table_source.rs
+++ b/src/catalog/src/table_source.rs
@@ -28,10 +28,10 @@ use crate::error::{
     CatalogNotFoundSnafu, QueryAccessDeniedSnafu, Result, SchemaNotFoundSnafu, TableNotExistSnafu,
 };
 use crate::information_schema::InformationSchemaProvider;
-use crate::CatalogListRef;
+use crate::CatalogManagerRef;
 
 pub struct DfTableSourceProvider {
-    catalog_list: CatalogListRef,
+    catalog_manager: CatalogManagerRef,
     resolved_tables: HashMap<String, Arc<dyn TableSource>>,
     disallow_cross_schema_query: bool,
     default_catalog: String,
@@ -40,12 +40,12 @@ pub struct DfTableSourceProvider {
 
 impl DfTableSourceProvider {
     pub fn new(
-        catalog_list: CatalogListRef,
+        catalog_manager: CatalogManagerRef,
         disallow_cross_schema_query: bool,
         query_ctx: &QueryContext,
     ) -> Self {
         Self {
-            catalog_list,
+            catalog_manager,
             disallow_cross_schema_query,
             resolved_tables: HashMap::new(),
             default_catalog: query_ctx.current_catalog(),
@@ -104,17 +104,22 @@ impl DfTableSourceProvider {
         let schema = if schema_name != INFORMATION_SCHEMA_NAME {
             let catalog = self
-                .catalog_list
-                .catalog(catalog_name)?
+                .catalog_manager
+                .catalog(catalog_name)
+                .await?
                 .context(CatalogNotFoundSnafu { catalog_name })?;
 
-            catalog.schema(schema_name)?.context(SchemaNotFoundSnafu {
-                catalog: catalog_name,
-                schema: schema_name,
-            })?
+            catalog
+                .schema(schema_name)
+                .await?
+                .context(SchemaNotFoundSnafu {
+                    catalog: catalog_name,
+                    schema: schema_name,
+                })?
         } else {
             let catalog_provider = self
-                .catalog_list
-                .catalog(catalog_name)?
+                .catalog_manager
+                .catalog(catalog_name)
+                .await?
.context(CatalogNotFoundSnafu { catalog_name })?; Arc::new(InformationSchemaProvider::new( catalog_name.to_string(), diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 8262621309..baf8e24fa2 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -40,7 +40,7 @@ impl SchemaProvider for InformationSchema { self } - fn table_names(&self) -> Result, Error> { + async fn table_names(&self) -> Result, Error> { Ok(vec![SYSTEM_CATALOG_TABLE_NAME.to_string()]) } @@ -52,7 +52,7 @@ impl SchemaProvider for InformationSchema { } } - fn table_exist(&self, name: &str) -> Result { + async fn table_exist(&self, name: &str) -> Result { Ok(name.eq_ignore_ascii_case(SYSTEM_CATALOG_TABLE_NAME)) } } @@ -116,16 +116,17 @@ impl SystemCatalog { } } +#[async_trait::async_trait] impl CatalogProvider for SystemCatalog { fn as_any(&self) -> &dyn Any { self } - fn schema_names(&self) -> Result, Error> { + async fn schema_names(&self) -> Result, Error> { Ok(vec![INFORMATION_SCHEMA_NAME.to_string()]) } - fn register_schema( + async fn register_schema( &self, _name: String, _schema: SchemaProviderRef, @@ -133,7 +134,7 @@ impl CatalogProvider for SystemCatalog { panic!("System catalog does not support registering schema!") } - fn schema(&self, name: &str) -> Result>, Error> { + async fn schema(&self, name: &str) -> Result>, Error> { if name.eq_ignore_ascii_case(INFORMATION_SCHEMA_NAME) { Ok(Some(self.information_schema.clone())) } else { diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 8ce777a3ec..42bc41f7e0 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -26,11 +26,11 @@ mod tests { use catalog::remote::{ KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider, }; - use catalog::{CatalogList, CatalogManager, RegisterTableRequest}; + use catalog::{CatalogManager, RegisterTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use datatypes::schema::RawSchema; use futures_util::StreamExt; - use table::engine::manager::MemoryTableEngineManager; + use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; @@ -78,35 +78,47 @@ mod tests { async fn prepare_components( node_id: u64, - ) -> (KvBackendRef, TableEngineRef, Arc) { + ) -> ( + KvBackendRef, + TableEngineRef, + Arc, + TableEngineManagerRef, + ) { let backend = Arc::new(MockKvBackend::default()) as KvBackendRef; let table_engine = Arc::new(MockTableEngine::default()); let engine_manager = Arc::new(MemoryTableEngineManager::alias( MITO_ENGINE.to_string(), table_engine.clone(), )); - let catalog_manager = RemoteCatalogManager::new(engine_manager, node_id, backend.clone()); + let catalog_manager = + RemoteCatalogManager::new(engine_manager.clone(), node_id, backend.clone()); catalog_manager.start().await.unwrap(); - (backend, table_engine, Arc::new(catalog_manager)) + ( + backend, + table_engine, + Arc::new(catalog_manager), + engine_manager as Arc<_>, + ) } #[tokio::test] async fn test_remote_catalog_default() { common_telemetry::init_default_ut_logging(); let node_id = 42; - let (_, _, catalog_manager) = prepare_components(node_id).await; + let (_, _, catalog_manager, _) = prepare_components(node_id).await; assert_eq!( vec![DEFAULT_CATALOG_NAME.to_string()], - catalog_manager.catalog_names().unwrap() + 
catalog_manager.catalog_names().await.unwrap() ); let default_catalog = catalog_manager .catalog(DEFAULT_CATALOG_NAME) + .await .unwrap() .unwrap(); assert_eq!( vec![DEFAULT_SCHEMA_NAME.to_string()], - default_catalog.schema_names().unwrap() + default_catalog.schema_names().await.unwrap() ); } @@ -114,7 +126,7 @@ mod tests { async fn test_remote_catalog_register_nonexistent() { common_telemetry::init_default_ut_logging(); let node_id = 42; - let (_, table_engine, catalog_manager) = prepare_components(node_id).await; + let (_, table_engine, catalog_manager, _) = prepare_components(node_id).await; // register a new table with an nonexistent catalog let catalog_name = "nonexistent_catalog".to_string(); let schema_name = "nonexistent_schema".to_string(); @@ -159,18 +171,20 @@ mod tests { #[tokio::test] async fn test_register_table() { let node_id = 42; - let (_, table_engine, catalog_manager) = prepare_components(node_id).await; + let (_, table_engine, catalog_manager, _) = prepare_components(node_id).await; let default_catalog = catalog_manager .catalog(DEFAULT_CATALOG_NAME) + .await .unwrap() .unwrap(); assert_eq!( vec![DEFAULT_SCHEMA_NAME.to_string()], - default_catalog.schema_names().unwrap() + default_catalog.schema_names().await.unwrap() ); let default_schema = default_catalog .schema(DEFAULT_SCHEMA_NAME) + .await .unwrap() .unwrap(); @@ -208,31 +222,36 @@ mod tests { table, }; assert!(catalog_manager.register_table(reg_req).await.unwrap()); - assert_eq!(vec![table_name], default_schema.table_names().unwrap()); + assert_eq!( + vec![table_name], + default_schema.table_names().await.unwrap() + ); } #[tokio::test] async fn test_register_catalog_schema_table() { let node_id = 42; - let (backend, table_engine, catalog_manager) = prepare_components(node_id).await; + let (backend, table_engine, catalog_manager, engine_manager) = + prepare_components(node_id).await; let catalog_name = "test_catalog".to_string(); let schema_name = "nonexistent_schema".to_string(); let catalog = Arc::new(RemoteCatalogProvider::new( catalog_name.clone(), backend.clone(), + engine_manager.clone(), node_id, )); // register catalog to catalog manager - catalog_manager - .register_catalog(catalog_name.clone(), catalog) + CatalogManager::register_catalog(&*catalog_manager, catalog_name.clone(), catalog) + .await .unwrap(); assert_eq!( HashSet::::from_iter( vec![DEFAULT_CATALOG_NAME.to_string(), catalog_name.clone()].into_iter() ), - HashSet::from_iter(catalog_manager.catalog_names().unwrap().into_iter()) + HashSet::from_iter(catalog_manager.catalog_names().await.unwrap().into_iter()) ); let table_to_register = table_engine @@ -273,24 +292,32 @@ mod tests { let new_catalog = catalog_manager .catalog(&catalog_name) + .await .unwrap() .expect("catalog should exist since it's already registered"); let schema = Arc::new(RemoteSchemaProvider::new( catalog_name.clone(), schema_name.clone(), node_id, + engine_manager, backend.clone(), )); let prev = new_catalog .register_schema(schema_name.clone(), schema.clone()) + .await .expect("Register schema should not fail"); assert!(prev.is_none()); assert!(catalog_manager.register_table(reg_req).await.unwrap()); assert_eq!( HashSet::from([schema_name.clone()]), - new_catalog.schema_names().unwrap().into_iter().collect() + new_catalog + .schema_names() + .await + .unwrap() + .into_iter() + .collect() ) } } diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 9c5edc09bd..79aa744dcd 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -35,7 +35,8 @@ partition = { path = 
"../partition" } query = { path = "../query" } rustyline = "10.1" serde.workspace = true -servers = { path = "../servers" } +servers = { path = "../servers", features = ["dashboard"] } + session = { path = "../session" } snafu.workspace = true substrait = { path = "../common/substrait" } diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index c07472666d..bb8edf45ed 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -16,18 +16,18 @@ use std::sync::Arc; use async_recursion::async_recursion; use async_trait::async_trait; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, Bytes}; use catalog::table_source::DfTableSourceProvider; use catalog::CatalogManagerRef; use common_catalog::format_full_table_name; use common_telemetry::debug; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::catalog::catalog::CatalogList; use datafusion::common::{DFField, DFSchema}; use datafusion::datasource::DefaultTableSource; use datafusion::physical_plan::project_schema; use datafusion::sql::TableReference; use datafusion_expr::{Filter, LogicalPlan, TableScan}; -use prost::Message; use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; use substrait_proto::proto::expression::mask_expression::{StructItem, StructSelect}; @@ -42,9 +42,9 @@ use table::table::adapter::DfTableProviderAdapter; use crate::context::ConvertorContext; use crate::df_expr::{expression_from_df_expr, to_df_expr}; use crate::error::{ - self, DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, - InvalidParametersSnafu, MissingFieldSnafu, ResolveTableSnafu, SchemaNotMatchSnafu, - UnknownPlanSnafu, UnsupportedExprSnafu, UnsupportedPlanSnafu, + self, DFInternalSnafu, EmptyPlanSnafu, Error, InvalidParametersSnafu, MissingFieldSnafu, + ResolveTableSnafu, SchemaNotMatchSnafu, UnknownPlanSnafu, UnsupportedExprSnafu, + UnsupportedPlanSnafu, }; use crate::schema::{from_schema, to_schema}; use crate::SubstraitPlan; @@ -59,20 +59,14 @@ impl SubstraitPlan for DFLogicalSubstraitConvertorDeprecated { async fn decode( &self, - message: B, - catalog_manager: CatalogManagerRef, + _message: B, + _catalog_list: Arc, ) -> Result { - let plan = Plan::decode(message).context(DecodeRelSnafu)?; - self.convert_plan(plan, catalog_manager).await + unimplemented!() } fn encode(&self, plan: Self::Plan) -> Result { - let plan = self.convert_df_plan(plan)?; - - let mut buf = BytesMut::new(); - plan.encode(&mut buf).context(EncodeRelSnafu)?; - - Ok(buf.freeze()) + unimplemented!() } } @@ -538,112 +532,3 @@ fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> b && x.is_nullable() == y.is_nullable() }) } - -#[cfg(test)] -mod test { - use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; - use catalog::{CatalogList, CatalogProvider, RegisterTableRequest}; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; - use datafusion::common::{DFSchema, ToDFSchema}; - use datafusion_expr::TableSource; - use datatypes::schema::RawSchema; - use table::engine::manager::MemoryTableEngineManager; - use table::requests::CreateTableRequest; - use table::test_util::{EmptyTable, MockTableEngine}; - - use super::*; - use crate::schema::test::supported_types; - - const DEFAULT_TABLE_NAME: &str = "SubstraitTable"; - - async fn build_mock_catalog_manager() -> CatalogManagerRef { - let mock_table_engine = Arc::new(MockTableEngine::new()); - let 
engine_manager = Arc::new(MemoryTableEngineManager::alias( - MITO_ENGINE.to_string(), - mock_table_engine.clone(), - )); - let catalog_manager = Arc::new(LocalCatalogManager::try_new(engine_manager).await.unwrap()); - let schema_provider = Arc::new(MemorySchemaProvider::new()); - let catalog_provider = Arc::new(MemoryCatalogProvider::new()); - catalog_provider - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) - .unwrap(); - catalog_manager - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) - .unwrap(); - - catalog_manager.init().await.unwrap(); - catalog_manager - } - - fn build_create_table_request(table_name: N) -> CreateTableRequest { - CreateTableRequest { - id: 1, - catalog_name: DEFAULT_CATALOG_NAME.to_string(), - schema_name: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - desc: None, - schema: RawSchema::new(supported_types()), - region_numbers: vec![0], - primary_key_indices: vec![], - create_if_not_exists: true, - table_options: Default::default(), - engine: MITO_ENGINE.to_string(), - } - } - - async fn logical_plan_round_trip(plan: LogicalPlan, catalog: CatalogManagerRef) { - let convertor = DFLogicalSubstraitConvertorDeprecated; - - let proto = convertor.encode(plan.clone()).unwrap(); - let tripped_plan = convertor.decode(proto, catalog).await.unwrap(); - - assert_eq!(format!("{plan:?}"), format!("{tripped_plan:?}")); - } - - #[tokio::test] - async fn test_table_scan() { - let catalog_manager = build_mock_catalog_manager().await; - let table_ref = Arc::new(EmptyTable::new(build_create_table_request( - DEFAULT_TABLE_NAME, - ))); - catalog_manager - .register_table(RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: DEFAULT_TABLE_NAME.to_string(), - table_id: 1, - table: table_ref.clone(), - }) - .await - .unwrap(); - let adapter = Arc::new(DefaultTableSource::new(Arc::new( - DfTableProviderAdapter::new(table_ref), - ))); - - let projection = vec![1, 3, 5]; - let df_schema = adapter.schema().to_dfschema().unwrap(); - let projected_fields = projection - .iter() - .map(|index| df_schema.field(*index).clone()) - .collect(); - let projected_schema = - Arc::new(DFSchema::new_with_metadata(projected_fields, Default::default()).unwrap()); - - let table_name = TableReference::full( - DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, - DEFAULT_TABLE_NAME, - ); - let table_scan_plan = LogicalPlan::TableScan(TableScan { - table_name, - source: adapter, - projection: Some(projection), - projected_schema, - filters: vec![], - fetch: None, - }); - - logical_plan_round_trip(table_scan_plan, catalog_manager).await; - } -} diff --git a/src/common/substrait/src/df_substrait.rs b/src/common/substrait/src/df_substrait.rs index 808bb8def0..ceb2760acf 100644 --- a/src/common/substrait/src/df_substrait.rs +++ b/src/common/substrait/src/df_substrait.rs @@ -16,13 +16,12 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::{Buf, Bytes, BytesMut}; -use catalog::CatalogManagerRef; +use datafusion::catalog::catalog::CatalogList; use datafusion::prelude::SessionContext; use datafusion_expr::LogicalPlan; use datafusion_substrait::logical_plan::consumer::from_substrait_plan; use datafusion_substrait::logical_plan::producer::to_substrait_plan; use prost::Message; -use query::datafusion::DfCatalogListAdapter; use snafu::ResultExt; use substrait_proto::proto::Plan; @@ -40,13 +39,11 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor { async fn decode( &self, message: B, 
-        catalog_manager: CatalogManagerRef,
+        catalog_list: Arc<dyn CatalogList>,
     ) -> Result<Self::Plan, Self::Error> {
         let mut context = SessionContext::new();
-        let df_catalog_list = Arc::new(DfCatalogListAdapter::new(catalog_manager));
-        context.register_catalog_list(df_catalog_list);
-
         let plan = Plan::decode(message).context(DecodeRelSnafu)?;
+        context.register_catalog_list(catalog_list);
         let df_plan = from_substrait_plan(&mut context, &plan)
             .await
             .context(DecodeDfPlanSnafu)?;
diff --git a/src/common/substrait/src/lib.rs b/src/common/substrait/src/lib.rs
index 9fd3926e20..420ef6e39d 100644
--- a/src/common/substrait/src/lib.rs
+++ b/src/common/substrait/src/lib.rs
@@ -17,15 +17,18 @@
 mod context;
 mod df_expr;
+#[allow(unused)]
 mod df_logical;
 mod df_substrait;
 pub mod error;
 mod schema;
 mod types;
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
-use catalog::CatalogManagerRef;
+use datafusion::catalog::catalog::CatalogList;
 
 pub use crate::df_substrait::DFLogicalSubstraitConvertor;
 
@@ -38,7 +41,7 @@ pub trait SubstraitPlan {
     async fn decode<B: Buf + Send>(
         &self,
         message: B,
-        catalog_manager: CatalogManagerRef,
+        catalog_list: Arc<dyn CatalogList>,
     ) -> Result<Self::Plan, Self::Error>;
 
     fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error>;
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 78ff946445..59c37e2752 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -275,10 +275,12 @@ impl Instance {
         let schema_list = self
             .catalog_manager
             .catalog(DEFAULT_CATALOG_NAME)
+            .await
             .map_err(BoxedError::new)
             .context(ShutdownInstanceSnafu)?
             .expect("Default schema not found")
             .schema_names()
+            .await
             .map_err(BoxedError::new)
             .context(ShutdownInstanceSnafu)?;
         let flush_requests = schema_list
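The next diff teaches the datanode to decode Substrait plans against a per-request DataFusion catalog list instead of the global catalog manager. A hedged sketch of the call shape (not part of the patch; `new_dummy_catalog_list`, `DFLogicalSubstraitConvertor`, and the snafu contexts follow the hunks below, while the wrapper itself and the imports are hypothetical glue):

```rust
use std::sync::Arc;

use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};

async fn decode_for_request(
    instance: &Instance,
    plan_bytes: Vec<u8>,
    ctx: QueryContextRef,
) -> Result<LogicalPlan> {
    // `DummySchemaProvider::try_new` snapshots the table names once, so the
    // list is rebuilt for every request to stay fresh.
    let catalog_list = new_dummy_catalog_list(
        &ctx.current_catalog(),
        &ctx.current_schema(),
        instance.catalog_manager.clone(),
    )
    .await?;
    DFLogicalSubstraitConvertor
        .decode(plan_bytes.as_slice(), Arc::new(catalog_list) as Arc<_>)
        .await
        .context(DecodeLogicalPlanSnafu)
}
```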
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs
index d3d452748d..036daf2403 100644
--- a/src/datanode/src/instance/grpc.rs
+++ b/src/datanode/src/instance/grpc.rs
@@ -12,12 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
+use std::sync::Arc;
+
 use api::v1::ddl_request::Expr as DdlExpr;
 use api::v1::greptime_request::Request as GrpcRequest;
 use api::v1::query_request::Query;
 use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequest};
 use async_trait::async_trait;
+use catalog::CatalogManagerRef;
 use common_query::Output;
+use datafusion::catalog::catalog::{
+    CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
+};
+use datafusion::catalog::schema::SchemaProvider;
+use datafusion::datasource::TableProvider;
 use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
 use query::plan::LogicalPlan;
 use query::query_engine::SqlStatementExecutor;
@@ -28,11 +37,12 @@
 use sql::statements::statement::Statement;
 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
 use table::engine::TableReference;
 use table::requests::CreateDatabaseRequest;
+use table::table::adapter::DfTableProviderAdapter;
 
 use crate::error::{
-    self, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, DeleteSnafu,
-    ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, PlanStatementSnafu, Result,
-    TableNotFoundSnafu,
+    self, CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu,
+    DeleteSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, PlanStatementSnafu, Result,
+    SchemaNotFoundSnafu, TableNotFoundSnafu,
 };
 use crate::instance::Instance;
 
@@ -49,9 +59,20 @@ impl Instance {
         self.sql_handler.create_database(req, query_ctx).await
     }
 
-    pub(crate) async fn execute_logical(&self, plan_bytes: Vec<u8>) -> Result<Output> {
+    pub(crate) async fn execute_logical(
+        &self,
+        plan_bytes: Vec<u8>,
+        ctx: QueryContextRef,
+    ) -> Result<Output> {
+        let catalog_list = new_dummy_catalog_list(
+            &ctx.current_catalog(),
+            &ctx.current_schema(),
+            self.catalog_manager.clone(),
+        )
+        .await?;
+
         let logical_plan = DFLogicalSubstraitConvertor
-            .decode(plan_bytes.as_slice(), self.catalog_manager.clone())
+            .decode(plan_bytes.as_slice(), Arc::new(catalog_list) as Arc<_>)
             .await
             .context(DecodeLogicalPlanSnafu)?;
 
@@ -85,7 +106,7 @@
                 }
             }
         }
-            Query::LogicalPlan(plan) => self.execute_logical(plan).await,
+            Query::LogicalPlan(plan) => self.execute_logical(plan, ctx).await,
             Query::PromRangeQuery(promql) => {
                 let prom_query = PromQuery {
                     query: promql.query,
@@ -185,6 +206,89 @@ impl GrpcQueryHandler for Instance {
     }
 }
 
+struct DummySchemaProvider {
+    catalog: String,
+    schema: String,
+    table_names: Vec<String>,
+    catalog_manager: CatalogManagerRef,
+}
+
+impl DummySchemaProvider {
+    pub async fn try_new(
+        catalog_name: String,
+        schema_name: String,
+        catalog_manager: CatalogManagerRef,
+    ) -> Result<Self> {
+        let catalog = catalog_manager
+            .catalog(&catalog_name)
+            .await
+            .context(CatalogSnafu)?
+            .context(CatalogNotFoundSnafu {
+                name: &catalog_name,
+            })?;
+        let schema = catalog
+            .schema(&schema_name)
+            .await
+            .context(CatalogSnafu)?
+ .context(SchemaNotFoundSnafu { name: &schema_name })?; + let table_names = schema.table_names().await.context(CatalogSnafu)?; + Ok(Self { + catalog: catalog_name, + schema: schema_name, + table_names, + catalog_manager, + }) + } +} + +#[async_trait::async_trait] +impl SchemaProvider for DummySchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.table_names.clone() + } + + async fn table(&self, name: &str) -> Option> { + self.catalog_manager + .table(&self.catalog, &self.schema, name) + .await + .context(CatalogSnafu) + .ok() + .flatten() + .map(|t| Arc::new(DfTableProviderAdapter::new(t)) as Arc<_>) + } + + fn table_exist(&self, name: &str) -> bool { + self.table_names.iter().any(|t| t == name) + } +} + +async fn new_dummy_catalog_list( + catalog_name: &str, + schema_name: &str, + catalog_manager: CatalogManagerRef, +) -> Result { + let schema_provider = DummySchemaProvider::try_new( + catalog_name.to_string(), + schema_name.to_string(), + catalog_manager, + ) + .await?; + let catalog_provider = MemoryCatalogProvider::new(); + catalog_provider + .register_schema(schema_name, Arc::new(schema_provider) as Arc<_>) + .unwrap(); + let catalog_list = MemoryCatalogList::new(); + catalog_list.register_catalog( + catalog_name.to_string(), + Arc::new(catalog_provider) as Arc<_>, + ); + Ok(catalog_list) +} + #[cfg(test)] mod test { use api::v1::column::{SemanticType, Values}; diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index ef54a183ab..104a60bb5c 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -49,6 +49,7 @@ impl SqlHandler { if self .catalog_manager .schema(&catalog, &schema) + .await .context(CatalogSnafu)? .is_some() { diff --git a/src/datanode/src/sql/flush_table.rs b/src/datanode/src/sql/flush_table.rs index 9d35138815..71b7d632de 100644 --- a/src/datanode/src/sql/flush_table.rs +++ b/src/datanode/src/sql/flush_table.rs @@ -25,6 +25,7 @@ impl SqlHandler { let schema = self .catalog_manager .schema(&req.catalog_name, &req.schema_name) + .await .context(CatalogSnafu)? 
.context(DatabaseNotFoundSnafu { catalog: &req.catalog_name, @@ -35,7 +36,7 @@ impl SqlHandler { self.flush_table_inner(schema, table, req.region_number, req.wait) .await?; } else { - let all_table_names = schema.table_names().context(CatalogSnafu)?; + let all_table_names = schema.table_names().await.context(CatalogSnafu)?; futures::future::join_all(all_table_names.iter().map(|table| { self.flush_table_inner(schema.clone(), table, req.region_number, req.wait) })) diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index b447f1336e..b9dfbb4505 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -126,10 +126,12 @@ pub(crate) async fn create_test_table( let schema_provider = instance .catalog_manager .schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) + .await .unwrap() .unwrap(); schema_provider .register_table(table_name.to_string(), table) + .await .unwrap(); Ok(()) } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index a7c71a1af7..ab31162a5c 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -28,7 +28,7 @@ use catalog::helper::{ }; use catalog::remote::{Kv, KvBackendRef}; use catalog::{ - CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, + CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef, }; @@ -36,6 +36,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::BoxedError; use common_telemetry::warn; use futures::StreamExt; +use futures_util::TryStreamExt; use meta_client::rpc::TableName; use partition::manager::PartitionRuleManagerRef; use snafu::prelude::*; @@ -99,6 +100,14 @@ impl CatalogManager for FrontendCatalogManager { Ok(()) } + async fn register_catalog( + &self, + _name: String, + _catalog: CatalogProviderRef, + ) -> CatalogResult> { + unimplemented!("Frontend catalog list does not support register catalog") + } + // TODO(LFC): Handle the table caching in (de)register_table. async fn register_table(&self, _request: RegisterTableRequest) -> CatalogResult { Ok(true) @@ -216,16 +225,49 @@ impl CatalogManager for FrontendCatalogManager { } } - fn schema( + async fn catalog_names(&self) -> CatalogResult> { + let key = build_catalog_prefix(); + let mut iter = self.backend.range(key.as_bytes()); + let mut res = HashSet::new(); + while let Some(r) = iter.next().await { + let Kv(k, _) = r?; + let catalog_key = String::from_utf8_lossy(&k); + if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) { + res.insert(key.catalog_name); + } else { + warn!("invalid catalog key: {:?}", catalog_key); + } + } + Ok(res.into_iter().collect()) + } + + async fn catalog(&self, catalog: &str) -> CatalogResult> { + let key = CatalogKey { + catalog_name: catalog.to_string(), + } + .to_string(); + Ok(self.backend.get(key.as_bytes()).await?.map(|_| { + Arc::new(FrontendCatalogProvider { + catalog_name: catalog.to_string(), + backend: self.backend.clone(), + partition_manager: self.partition_manager.clone(), + datanode_clients: self.datanode_clients.clone(), + }) as Arc<_> + })) + } + + async fn schema( &self, catalog: &str, schema: &str, ) -> catalog::error::Result> { - self.catalog(catalog)? + self.catalog(catalog) + .await? .context(catalog::error::CatalogNotFoundSnafu { catalog_name: catalog, })? 
.schema(schema) + .await } async fn table( @@ -234,65 +276,16 @@ impl CatalogManager for FrontendCatalogManager { schema: &str, table_name: &str, ) -> catalog::error::Result> { - self.schema(catalog, schema)? + self.schema(catalog, schema) + .await? .context(catalog::error::SchemaNotFoundSnafu { catalog, schema })? .table(table_name) .await } -} -impl CatalogList for FrontendCatalogManager { fn as_any(&self) -> &dyn Any { self } - - fn register_catalog( - &self, - _name: String, - _catalog: CatalogProviderRef, - ) -> catalog::error::Result> { - unimplemented!("Frontend catalog list does not support register catalog") - } - - fn catalog_names(&self) -> catalog::error::Result> { - let backend = self.backend.clone(); - let res = std::thread::spawn(|| { - common_runtime::block_on_read(async move { - let key = build_catalog_prefix(); - let mut iter = backend.range(key.as_bytes()); - let mut res = HashSet::new(); - - while let Some(r) = iter.next().await { - let Kv(k, _) = r?; - - let catalog_key = String::from_utf8_lossy(&k); - if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) { - res.insert(key.catalog_name); - } else { - warn!("invalid catalog key: {:?}", catalog_key); - } - } - Ok(res.into_iter().collect()) - }) - }) - .join() - .unwrap(); - res - } - - fn catalog(&self, name: &str) -> catalog::error::Result> { - let all_catalogs = self.catalog_names()?; - if all_catalogs.contains(&name.to_string()) { - Ok(Some(Arc::new(FrontendCatalogProvider { - catalog_name: name.to_string(), - backend: self.backend.clone(), - partition_manager: self.partition_manager.clone(), - datanode_clients: self.datanode_clients.clone(), - }))) - } else { - Ok(None) - } - } } pub struct FrontendCatalogProvider { @@ -302,35 +295,26 @@ pub struct FrontendCatalogProvider { datanode_clients: Arc, } +#[async_trait::async_trait] impl CatalogProvider for FrontendCatalogProvider { fn as_any(&self) -> &dyn Any { self } - fn schema_names(&self) -> catalog::error::Result> { - let backend = self.backend.clone(); - let catalog_name = self.catalog_name.clone(); - let res = std::thread::spawn(|| { - common_runtime::block_on_read(async move { - let key = build_schema_prefix(&catalog_name); - let mut iter = backend.range(key.as_bytes()); - let mut res = HashSet::new(); - - while let Some(r) = iter.next().await { - let Kv(k, _) = r?; - let key = SchemaKey::parse(String::from_utf8_lossy(&k)) - .context(InvalidCatalogValueSnafu)?; - res.insert(key.schema_name); - } - Ok(res.into_iter().collect()) - }) - }) - .join() - .unwrap(); - res + async fn schema_names(&self) -> catalog::error::Result> { + let key = build_schema_prefix(&self.catalog_name); + let mut iter = self.backend.range(key.as_bytes()); + let mut res = HashSet::new(); + while let Some(r) = iter.next().await { + let Kv(k, _) = r?; + let key = + SchemaKey::parse(String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?; + res.insert(key.schema_name); + } + Ok(res.into_iter().collect()) } - fn register_schema( + async fn register_schema( &self, _name: String, _schema: SchemaProviderRef, @@ -338,8 +322,8 @@ impl CatalogProvider for FrontendCatalogProvider { unimplemented!("Frontend catalog provider does not support register schema") } - fn schema(&self, name: &str) -> catalog::error::Result> { - let all_schemas = self.schema_names()?; + async fn schema(&self, name: &str) -> catalog::error::Result> { + let all_schemas = self.schema_names().await?; if all_schemas.contains(&name.to_string()) { Ok(Some(Arc::new(FrontendSchemaProvider { catalog_name: 
self.catalog_name.clone(), @@ -368,32 +352,24 @@ impl SchemaProvider for FrontendSchemaProvider { self } - fn table_names(&self) -> catalog::error::Result> { - let backend = self.backend.clone(); - let catalog_name = self.catalog_name.clone(); - let schema_name = self.schema_name.clone(); - - std::thread::spawn(|| { - common_runtime::block_on_read(async move { - let mut tables = vec![]; - if catalog_name == DEFAULT_CATALOG_NAME && schema_name == DEFAULT_SCHEMA_NAME { - tables.push("numbers".to_string()); - } - - let key = build_table_global_prefix(catalog_name, schema_name); - let mut iter = backend.range(key.as_bytes()); - - while let Some(r) = iter.next().await { - let Kv(k, _) = r?; - let key = TableGlobalKey::parse(String::from_utf8_lossy(&k)) - .context(InvalidCatalogValueSnafu)?; - tables.push(key.table_name); - } - Ok(tables) + async fn table_names(&self) -> catalog::error::Result> { + let mut tables = vec![]; + if self.catalog_name == DEFAULT_CATALOG_NAME && self.schema_name == DEFAULT_SCHEMA_NAME { + tables.push("numbers".to_string()); + } + let key = build_table_global_prefix(&self.catalog_name, &self.schema_name); + let iter = self.backend.range(key.as_bytes()); + let result = iter + .map(|r| { + let Kv(k, _) = r?; + let key = TableGlobalKey::parse(String::from_utf8_lossy(&k)) + .context(InvalidCatalogValueSnafu)?; + Ok(key.table_name) }) - }) - .join() - .unwrap() + .try_collect::>() + .await?; + tables.extend(result); + Ok(tables) } async fn table(&self, name: &str) -> catalog::error::Result> { @@ -426,8 +402,8 @@ impl SchemaProvider for FrontendSchemaProvider { Ok(Some(table)) } - fn table_exist(&self, name: &str) -> catalog::error::Result { - Ok(self.table_names()?.contains(&name.to_string())) + async fn table_exist(&self, name: &str) -> catalog::error::Result { + Ok(self.table_names().await?.contains(&name.to_string())) } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 27b7363eac..2f7cd8092e 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -527,9 +527,10 @@ impl SqlQueryHandler for Instance { } } - fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { + async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { self.catalog_manager .schema(catalog, schema) + .await .map(|s| s.is_some()) .context(error::CatalogSnafu) } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 0588b1dbbb..a314d35747 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -421,6 +421,7 @@ impl DistInstance { if self .catalog_manager .schema(&catalog, &expr.database_name) + .await .context(CatalogSnafu)? 
.is_some() { diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index 6af1756bba..28f7f177b6 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -86,11 +86,11 @@ impl StatementExecutor { Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await, - Statement::Use(db) => self.handle_use(db, query_ctx), + Statement::Use(db) => self.handle_use(db, query_ctx).await, - Statement::ShowDatabases(stmt) => self.show_databases(stmt), + Statement::ShowDatabases(stmt) => self.show_databases(stmt).await, - Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx), + Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await, Statement::Copy(stmt) => { let req = to_copy_table_request(stmt, query_ctx)?; @@ -126,11 +126,12 @@ impl StatementExecutor { .context(ExecLogicalPlanSnafu) } - fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result { + async fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result { let catalog = &query_ctx.current_catalog(); ensure!( self.catalog_manager .schema(catalog, &db) + .await .context(CatalogSnafu)? .is_some(), SchemaNotFoundSnafu { schema_info: &db } diff --git a/src/frontend/src/statement/show.rs b/src/frontend/src/statement/show.rs index bfe4c9db0f..0c00a0cdae 100644 --- a/src/frontend/src/statement/show.rs +++ b/src/frontend/src/statement/show.rs @@ -21,17 +21,19 @@ use crate::error::{ExecuteStatementSnafu, Result}; use crate::statement::StatementExecutor; impl StatementExecutor { - pub(super) fn show_databases(&self, stmt: ShowDatabases) -> Result { + pub(super) async fn show_databases(&self, stmt: ShowDatabases) -> Result { query::sql::show_databases(stmt, self.catalog_manager.clone()) + .await .context(ExecuteStatementSnafu) } - pub(super) fn show_tables( + pub(super) async fn show_tables( &self, stmt: ShowTables, query_ctx: QueryContextRef, ) -> Result { query::sql::show_tables(stmt, self.catalog_manager.clone(), query_ctx) + .await .context(ExecuteStatementSnafu) } } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index a10b3a1d01..293714152b 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -42,7 +42,7 @@ use snafu::prelude::*; use store_api::storage::RegionNumber; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; -use table::requests::{AlterTableRequest, DeleteRequest, InsertRequest}; +use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest}; use table::table::AlterContext; use table::{meter_insert_request, Table}; use tokio::sync::RwLock; @@ -261,6 +261,13 @@ impl DistTable { .context(error::CatalogSnafu) } + async fn delete_table_global_value(&self, key: TableGlobalKey) -> Result<()> { + self.backend + .delete(key.to_string().as_bytes()) + .await + .context(error::CatalogSnafu) + } + async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> { let alter_expr = context .get::() @@ -297,7 +304,17 @@ impl DistTable { value.table_info = new_info.into(); - self.set_table_global_value(key, value).await + if let AlterKind::RenameTable { new_table_name } = &request.alter_kind { + let new_key = TableGlobalKey { + catalog_name: alter_expr.catalog_name.clone(), + schema_name: alter_expr.schema_name.clone(), + table_name: new_table_name.clone(), + }; + self.set_table_global_value(new_key, value).await?; + self.delete_table_global_value(key).await + } else { + self.set_table_global_value(key, 
value).await
+        }
     }
 
     /// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between
diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs
index 2fc040a41a..63be8cca2c 100644
--- a/src/frontend/src/tests.rs
+++ b/src/frontend/src/tests.rs
@@ -22,7 +22,6 @@ use std::time::Duration;
 
 use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
 use catalog::remote::{MetaKvBackend, RemoteCatalogManager};
-use catalog::CatalogProvider;
 use client::Client;
 use common_grpc::channel_manager::ChannelManager;
 use common_runtime::Builder as RuntimeBuilder;
@@ -94,7 +93,7 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
     // create another catalog and schema for testing
     let another_catalog = Arc::new(MemoryCatalogProvider::new());
     let _ = another_catalog
-        .register_schema(
+        .register_schema_sync(
             "another_schema".to_string(),
             Arc::new(MemorySchemaProvider::new()),
         )
@@ -102,6 +101,7 @@
     let _ = dn_instance
         .catalog_manager()
         .register_catalog("another_catalog".to_string(), another_catalog)
+        .await
         .unwrap();
 
     dn_instance.start().await.unwrap();
diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs
index 486edfa11f..a297203d04 100644
--- a/src/meta-client/src/client/router.rs
+++ b/src/meta-client/src/client/router.rs
@@ -18,12 +18,13 @@ use std::sync::Arc;
 
 use api::v1::meta::router_client::RouterClient;
 use api::v1::meta::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
 use common_grpc::channel_manager::ChannelManager;
-use snafu::{ensure, OptionExt, ResultExt};
+use snafu::{ensure, Location, OptionExt, ResultExt};
 use tokio::sync::RwLock;
 use tonic::transport::Channel;
 
 use crate::client::{load_balance as lb, Id};
 use crate::error;
+use crate::error::Error::TonicStatus;
 use crate::error::Result;
 
 #[derive(Clone, Debug)]
@@ -122,8 +123,15 @@ impl Inner {
     async fn delete(&self, mut req: DeleteRequest) -> Result<RouteResponse> {
         let mut client = self.random_client()?;
         req.set_header(self.id);
-        let res = client.delete(req).await.context(error::TonicStatusSnafu)?;
-
+        let res = client.delete(req).await.map_err(|mut source| {
+            // FIXME(hl): we intentionally clear the metadata field here so that the error data
+            // does not change, which would break the sqlness test. We can remove this hack as
+            // soon as either sqlness supports regex result matching or greptimedb supports
+            // renaming table routes.
+            source.metadata_mut().clear();
+            TonicStatus {
+                source,
+                location: Location::default(),
+            }
+        })?;
         Ok(res.into_inner())
     }
 
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index f17cef3634..8dfbf05faf 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -21,7 +21,6 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-pub use catalog::datafusion::catalog_adapter::DfCatalogListAdapter;
 use common_error::prelude::BoxedError;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_function::scalars::udf::create_udf;
@@ -183,19 +182,20 @@ impl DatafusionQueryEngine {
         let catalog = self
             .state
-            .catalog_list()
+            .catalog_manager()
             .catalog(catalog_name)
+            .await
             .context(CatalogSnafu)?
             .context(CatalogNotFoundSnafu {
                 catalog: catalog_name,
             })?;
-        let schema =
-            catalog
-                .schema(schema_name)
-                .context(CatalogSnafu)?
- .context(SchemaNotFoundSnafu { - schema: schema_name, - })?; + let schema = catalog + .schema(schema_name) + .await + .context(CatalogSnafu)? + .context(SchemaNotFoundSnafu { + schema: schema_name, + })?; let table = schema .table(table_name) .await @@ -377,7 +377,7 @@ mod tests { use std::sync::Arc; use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; - use catalog::{CatalogList, CatalogProvider, SchemaProvider}; + use catalog::{CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::util; @@ -390,19 +390,21 @@ mod tests { use crate::parser::QueryLanguageParser; use crate::query_engine::{QueryEngineFactory, QueryEngineRef}; - fn create_test_engine() -> QueryEngineRef { + async fn create_test_engine() -> QueryEngineRef { let catalog_list = catalog::local::new_memory_catalog_list().unwrap(); let default_schema = Arc::new(MemorySchemaProvider::new()); default_schema .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .await .unwrap(); let default_catalog = Arc::new(MemoryCatalogProvider::new()); default_catalog .register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema) + .await .unwrap(); catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog) + .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog) .unwrap(); QueryEngineFactory::new(catalog_list).query_engine() @@ -410,7 +412,7 @@ mod tests { #[tokio::test] async fn test_sql_to_plan() { - let engine = create_test_engine(); + let engine = create_test_engine().await; let sql = "select sum(number) from numbers limit 20"; let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); @@ -432,7 +434,7 @@ mod tests { #[tokio::test] async fn test_execute() { - let engine = create_test_engine(); + let engine = create_test_engine().await; let sql = "select sum(number) from numbers limit 20"; let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); @@ -470,7 +472,7 @@ mod tests { #[tokio::test] async fn test_describe() { - let engine = create_test_engine(); + let engine = create_test_engine().await; let sql = "select sum(number) from numbers limit 20"; let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 9b6fc46756..ee33aed2e7 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -55,7 +55,7 @@ impl DfContextProviderAdapter { .context(DataFusionSnafu)?; let mut table_provider = DfTableSourceProvider::new( - engine_state.catalog_list().clone(), + engine_state.catalog_manager().clone(), engine_state.disallow_cross_schema_query(), query_ctx.as_ref(), ); diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index 441a7084c9..92131be37c 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -82,7 +82,7 @@ impl DfLogicalPlanner { async fn plan_pql(&self, stmt: EvalStmt, query_ctx: QueryContextRef) -> Result { let table_provider = DfTableSourceProvider::new( - self.engine_state.catalog_list().clone(), + self.engine_state.catalog_manager().clone(), self.engine_state.disallow_cross_schema_query(), query_ctx.as_ref(), ); diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 90c711f48c..0843ec5321 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -19,7 +19,7 @@ mod state; use std::sync::Arc; use async_trait::async_trait; -use catalog::CatalogListRef; +use 
catalog::CatalogManagerRef;
 use common_base::Plugins;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY};
@@ -65,12 +65,12 @@ pub struct QueryEngineFactory {
 }
 
 impl QueryEngineFactory {
-    pub fn new(catalog_list: CatalogListRef) -> Self {
-        Self::new_with_plugins(catalog_list, Default::default())
+    pub fn new(catalog_manager: CatalogManagerRef) -> Self {
+        Self::new_with_plugins(catalog_manager, Default::default())
     }
 
-    pub fn new_with_plugins(catalog_list: CatalogListRef, plugins: Arc<Plugins>) -> Self {
-        let state = Arc::new(QueryEngineState::new(catalog_list, plugins));
+    pub fn new_with_plugins(catalog_manager: CatalogManagerRef, plugins: Arc<Plugins>) -> Self {
+        let state = Arc::new(QueryEngineState::new(catalog_manager, plugins));
         let query_engine = Arc::new(DatafusionQueryEngine::new(state));
         register_functions(&query_engine);
         Self { query_engine }
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index b15a18aba4..617620a924 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -17,11 +17,12 @@
 use std::fmt;
 use std::sync::{Arc, RwLock};
 
 use async_trait::async_trait;
-use catalog::CatalogListRef;
+use catalog::CatalogManagerRef;
 use common_base::Plugins;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_query::physical_plan::SessionContext;
 use common_query::prelude::ScalarUdf;
+use datafusion::catalog::catalog::MemoryCatalogList;
 use datafusion::error::Result as DfResult;
 use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -31,7 +32,6 @@
 use datafusion_expr::LogicalPlan as DfLogicalPlan;
 use datafusion_optimizer::analyzer::Analyzer;
 use promql::extension_plan::PromExtensionPlanner;
 
-use crate::datafusion::DfCatalogListAdapter;
 use crate::optimizer::TypeConversionRule;
 use crate::query_engine::options::QueryOptions;
 
@@ -42,7 +42,7 @@ use crate::query_engine::options::QueryOptions;
 #[derive(Clone)]
 pub struct QueryEngineState {
     df_context: SessionContext,
-    catalog_list: CatalogListRef,
+    catalog_manager: CatalogManagerRef,
     aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
     plugins: Arc<Plugins>,
 }
@@ -55,7 +55,7 @@ impl fmt::Debug for QueryEngineState {
     }
 }
 
 impl QueryEngineState {
-    pub fn new(catalog_list: CatalogListRef, plugins: Arc<Plugins>) -> Self {
+    pub fn new(catalog_list: CatalogManagerRef, plugins: Arc<Plugins>) -> Self {
         let runtime_env = Arc::new(RuntimeEnv::default());
         let session_config = SessionConfig::new().with_create_default_catalog_and_schema(false);
         // Apply the type conversion rule first.
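The net effect of the two state.rs hunks: DataFusion's session receives only a placeholder `MemoryCatalogList` (next hunk), while the real `CatalogManagerRef` is exposed through `catalog_manager()` and consumed by `DfTableSourceProvider` during planning. A hypothetical wiring sketch, not part of the patch, using only names that appear elsewhere in this diff (module paths approximated):

```rust
use std::sync::Arc;

use catalog::table_source::DfTableSourceProvider;
use query::QueryEngineFactory;
use session::context::QueryContext;

fn wiring_sketch() {
    let catalog_manager = catalog::local::new_memory_catalog_list().unwrap();
    // The SessionState inside the factory holds only the dummy catalog list;
    // table resolution happens against the catalog manager.
    let engine = QueryEngineFactory::new(catalog_manager.clone()).query_engine();

    // Per-query resolution snapshots tables up front instead of going through
    // a DataFusion CatalogList adapter.
    let query_ctx = Arc::new(QueryContext::with("greptime", "public"));
    let _provider = DfTableSourceProvider::new(catalog_manager, true, query_ctx.as_ref());
    let _ = engine;
}
```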
@@ -65,7 +65,7 @@ impl QueryEngineState { let session_state = SessionState::with_config_rt_and_catalog_list( session_config, runtime_env, - Arc::new(DfCatalogListAdapter::new(catalog_list.clone())), + Arc::new(MemoryCatalogList::default()), // pass a dummy catalog list ) .with_analyzer_rules(analyzer.rules) .with_query_planner(Arc::new(DfQueryPlanner::new())); @@ -74,7 +74,7 @@ impl QueryEngineState { Self { df_context, - catalog_list, + catalog_manager: catalog_list, aggregate_functions: Arc::new(RwLock::new(HashMap::new())), plugins, } @@ -104,8 +104,8 @@ impl QueryEngineState { } #[inline] - pub fn catalog_list(&self) -> &CatalogListRef { - &self.catalog_list + pub fn catalog_manager(&self) -> &CatalogManagerRef { + &self.catalog_manager } pub(crate) fn disallow_cross_schema_query(&self) -> bool { diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 7516baab45..858ccc915d 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -99,7 +99,10 @@ static SHOW_CREATE_TABLE_OUTPUT_SCHEMA: Lazy> = Lazy::new(|| { ])) }); -pub fn show_databases(stmt: ShowDatabases, catalog_manager: CatalogManagerRef) -> Result { +pub async fn show_databases( + stmt: ShowDatabases, + catalog_manager: CatalogManagerRef, +) -> Result { // TODO(LFC): supports WHERE ensure!( matches!(stmt.kind, ShowKind::All | ShowKind::Like(_)), @@ -110,11 +113,12 @@ pub fn show_databases(stmt: ShowDatabases, catalog_manager: CatalogManagerRef) - let catalog = catalog_manager .catalog(DEFAULT_CATALOG_NAME) + .await .context(error::CatalogSnafu)? .context(error::CatalogNotFoundSnafu { catalog: DEFAULT_CATALOG_NAME, })?; - let mut databases = catalog.schema_names().context(error::CatalogSnafu)?; + let mut databases = catalog.schema_names().await.context(error::CatalogSnafu)?; // TODO(dennis): Specify the order of the results in catalog manager API databases.sort(); @@ -134,7 +138,7 @@ pub fn show_databases(stmt: ShowDatabases, catalog_manager: CatalogManagerRef) - Ok(Output::RecordBatches(records)) } -pub fn show_tables( +pub async fn show_tables( stmt: ShowTables, catalog_manager: CatalogManagerRef, query_ctx: QueryContextRef, @@ -155,9 +159,10 @@ pub fn show_tables( // TODO(sunng87): move this function into query_ctx let schema = catalog_manager .schema(&query_ctx.current_catalog(), &schema) + .await .context(error::CatalogSnafu)? 
.context(error::SchemaNotFoundSnafu { schema })?; - let mut tables = schema.table_names().context(error::CatalogSnafu)?; + let mut tables = schema.table_names().await.context(error::CatalogSnafu)?; // TODO(dennis): Specify the order of the results in schema provider API tables.sort(); diff --git a/src/query/src/tests/function.rs b/src/query/src/tests/function.rs index a301bc11ad..91759ab150 100644 --- a/src/query/src/tests/function.rs +++ b/src/query/src/tests/function.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_recordbatch::RecordBatch; use datatypes::for_all_primitive_types; @@ -57,14 +56,14 @@ pub fn create_query_engine() -> Arc { let recordbatch = RecordBatch::new(schema, columns).unwrap(); let number_table = Arc::new(MemTable::new("numbers", recordbatch)); schema_provider - .register_table(number_table.table_name().to_string(), number_table) + .register_table_sync(number_table.table_name().to_string(), number_table) .unwrap(); catalog_provider - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) .unwrap(); catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) .unwrap(); QueryEngineFactory::new(catalog_list).query_engine() diff --git a/src/query/src/tests/my_sum_udaf_example.rs b/src/query/src/tests/my_sum_udaf_example.rs index 27e4981cc0..22104c5572 100644 --- a/src/query/src/tests/my_sum_udaf_example.rs +++ b/src/query/src/tests/my_sum_udaf_example.rs @@ -17,7 +17,6 @@ use std::marker::PhantomData; use std::sync::Arc; use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_function::scalars::aggregate::AggregateFunctionMeta; use common_function_macro::{as_aggr_func_creator, AggrFuncTypeStore}; @@ -234,12 +233,14 @@ fn new_query_engine_factory(table: MemTable) -> QueryEngineFactory { let catalog_provider = Arc::new(MemoryCatalogProvider::new()); let catalog_list = Arc::new(MemoryCatalogManager::default()); - schema_provider.register_table(table_name, table).unwrap(); + schema_provider + .register_table_sync(table_name, table) + .unwrap(); catalog_provider - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) .unwrap(); catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) .unwrap(); QueryEngineFactory::new(catalog_list) diff --git a/src/query/src/tests/percentile_test.rs b/src/query/src/tests/percentile_test.rs index eefb825d75..10fa1df574 100644 --- a/src/query/src/tests/percentile_test.rs +++ b/src/query/src/tests/percentile_test.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_recordbatch::RecordBatch; use datatypes::for_all_primitive_types; @@ -102,14 +101,14 @@ fn 
create_correctness_engine() -> Arc { RecordBatch::new(schema, columns).unwrap(), )); schema_provider - .register_table(number_table.table_name().to_string(), number_table) + .register_table_sync(number_table.table_name().to_string(), number_table) .unwrap(); catalog_provider - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + .register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) .unwrap(); catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) .unwrap(); QueryEngineFactory::new(catalog_list).query_engine() diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index e3688f6e4f..61753a314f 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_base::Plugins; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::BoxedError; @@ -110,14 +109,14 @@ fn catalog_list() -> Result> { let default_schema = Arc::new(MemorySchemaProvider::new()); default_schema - .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default())) .unwrap(); let default_catalog = Arc::new(MemoryCatalogProvider::new()); default_catalog - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema) + .register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema) .unwrap(); catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog) + .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog) .unwrap(); Ok(catalog_list) } diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index e0f5a458fc..249d32e678 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -16,7 +16,6 @@ use std::any::Any; use std::sync::Arc; use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; use common_recordbatch::RecordBatch; @@ -109,14 +108,15 @@ fn create_test_engine() -> TimeRangeTester { let catalog_list = new_memory_catalog_list().unwrap(); let default_schema = Arc::new(MemorySchemaProvider::new()); - MemorySchemaProvider::register_table(&default_schema, "m".to_string(), table.clone()).unwrap(); + MemorySchemaProvider::register_table_sync(&default_schema, "m".to_string(), table.clone()) + .unwrap(); let default_catalog = Arc::new(MemoryCatalogProvider::new()); default_catalog - .register_schema("public".to_string(), default_schema) + .register_schema_sync("public".to_string(), default_schema) .unwrap(); catalog_list - .register_catalog("greptime".to_string(), default_catalog) + .register_catalog_sync("greptime".to_string(), default_catalog) .unwrap(); let engine = QueryEngineFactory::new(catalog_list).query_engine(); diff --git a/src/script/benches/py_benchmark.rs b/src/script/benches/py_benchmark.rs index 8a02ee65c6..44c7538207 100644 --- a/src/script/benches/py_benchmark.rs +++ b/src/script/benches/py_benchmark.rs @@ -16,7 +16,6 @@ use 
std::collections::HashMap; use std::sync::Arc; use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use criterion::{black_box, criterion_group, criterion_main, Criterion}; @@ -55,14 +54,14 @@ pub(crate) fn sample_script_engine() -> PyEngine { let default_schema = Arc::new(MemorySchemaProvider::new()); default_schema - .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default())) .unwrap(); let default_catalog = Arc::new(MemoryCatalogProvider::new()); default_catalog - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema) + .register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema) .unwrap(); catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog) + .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog) .unwrap(); let factory = QueryEngineFactory::new(catalog_list); diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 25ec54c235..f5e01e9ec5 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -352,7 +352,6 @@ pub(crate) use tests::sample_script_engine; #[cfg(test)] mod tests { use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; - use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_recordbatch::util; use datatypes::prelude::ScalarVector; @@ -368,14 +367,14 @@ mod tests { let default_schema = Arc::new(MemorySchemaProvider::new()); default_schema - .register_table("numbers".to_string(), Arc::new(NumbersTable::default())) + .register_table_sync("numbers".to_string(), Arc::new(NumbersTable::default())) .unwrap(); let default_catalog = Arc::new(MemoryCatalogProvider::new()); default_catalog - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), default_schema) + .register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), default_schema) .unwrap(); catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), default_catalog) + .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), default_catalog) .unwrap(); let factory = QueryEngineFactory::new(catalog_list); diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 4e5fad7c52..31b2b75b1e 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -71,14 +71,14 @@ use crate::server::Server; /// create query context from database name information, catalog and schema are /// resolved from the name -pub(crate) fn query_context_from_db( +pub(crate) async fn query_context_from_db( query_handler: ServerSqlQueryHandlerRef, db: Option, ) -> std::result::Result, JsonResponse> { if let Some(db) = &db { let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); - match query_handler.is_valid_schema(catalog, schema) { + match query_handler.is_valid_schema(catalog, schema).await { Ok(true) => Ok(Arc::new(QueryContext::with(catalog, schema))), Ok(false) => Err(JsonResponse::with_error( format!("Database not found: {db}"), @@ -698,7 +698,7 @@ mod test { unimplemented!() } - fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result { + async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result { Ok(true) } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs 
index 0b377cf41f..1856db893d 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -52,7 +52,7 @@ pub async fn sql( let db = query_params.db.or(form_params.db); let resp = if let Some(sql) = &sql { - match super::query_context_from_db(sql_handler.clone(), db) { + match crate::http::query_context_from_db(sql_handler.clone(), db).await { Ok(query_ctx) => { JsonResponse::from_output(sql_handler.do_query(sql, query_ctx).await).await } @@ -102,7 +102,7 @@ pub async fn promql( let exec_start = Instant::now(); let db = params.db.clone(); let prom_query = params.into(); - let resp = match super::query_context_from_db(sql_handler.clone(), db) { + let resp = match super::query_context_from_db(sql_handler.clone(), db).await { Ok(query_ctx) => { JsonResponse::from_output(sql_handler.do_promql_query(&prom_query, query_ctx).await) .await diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 1c93673184..01007c195d 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -234,7 +234,7 @@ impl AsyncMysqlShim for MysqlInstanceShi async fn on_init<'a>(&'a mut self, database: &'a str, w: InitWriter<'a, W>) -> Result<()> { let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(database); ensure!( - self.query_handler.is_valid_schema(catalog, schema)?, + self.query_handler.is_valid_schema(catalog, schema).await?, error::DatabaseNotFoundSnafu { catalog, schema } ); diff --git a/src/servers/src/postgres/auth_handler.rs b/src/servers/src/postgres/auth_handler.rs index 78a309713c..7ddacefad6 100644 --- a/src/servers/src/postgres/auth_handler.rs +++ b/src/servers/src/postgres/auth_handler.rs @@ -135,7 +135,7 @@ impl StartupHandler for PostgresServerHandler { auth::save_startup_parameters_to_metadata(client, startup); // check if db is valid - match resolve_db_info(client, self.query_handler.clone())? { + match resolve_db_info(client, self.query_handler.clone()).await? { DbResolution::Resolved(catalog, schema) => { client .metadata_mut() @@ -210,7 +210,7 @@ enum DbResolution { } /// A function extracted to resolve lifetime and readability issues: -fn resolve_db_info( +async fn resolve_db_info( client: &mut C, query_handler: ServerSqlQueryHandlerRef, ) -> PgWireResult @@ -222,6 +222,7 @@ where let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); if query_handler .is_valid_schema(catalog, schema) + .await .map_err(|e| PgWireError::ApiError(Box::new(e)))? 
{ Ok(DbResolution::Resolved( diff --git a/src/servers/src/query_handler/sql.rs b/src/servers/src/query_handler/sql.rs index 76259bf157..dd62100393 100644 --- a/src/servers/src/query_handler/sql.rs +++ b/src/servers/src/query_handler/sql.rs @@ -50,7 +50,7 @@ pub trait SqlQueryHandler { query_ctx: QueryContextRef, ) -> std::result::Result, Self::Error>; - fn is_valid_schema( + async fn is_valid_schema( &self, catalog: &str, schema: &str, @@ -116,9 +116,10 @@ where .context(error::DescribeStatementSnafu) } - fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { + async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { self.0 .is_valid_schema(catalog, schema) + .await .map_err(BoxedError::new) .context(error::CheckDatabaseValiditySnafu) } diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index da5d9d9a1f..ab5c86afc7 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -87,7 +87,7 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } - fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result { + async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result { Ok(true) } } diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 8a0ed59986..399d0d7833 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -85,7 +85,7 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } - fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result { + async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result { Ok(true) } } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 69d4fb8046..a1046d59e9 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -110,7 +110,7 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } - fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result { + async fn is_valid_schema(&self, _catalog: &str, _schema: &str) -> Result { Ok(true) } } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index f73ffdd865..a01436fadd 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -19,7 +19,6 @@ use api::v1::greptime_request::{Request as GreptimeRequest, Request}; use api::v1::query_request::Query; use async_trait::async_trait; use catalog::local::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider}; -use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use datatypes::schema::Schema; @@ -106,7 +105,7 @@ impl SqlQueryHandler for DummyInstance { } } - fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { + async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { Ok(catalog == DEFAULT_CATALOG_NAME && schema == DEFAULT_SCHEMA_NAME) } } @@ -202,12 +201,14 @@ fn create_testing_instance(table: MemTable) -> DummyInstance { let schema_provider = Arc::new(MemorySchemaProvider::new()); let catalog_provider = Arc::new(MemoryCatalogProvider::new()); let catalog_list = Arc::new(MemoryCatalogManager::default()); - schema_provider.register_table(table_name, table).unwrap(); + schema_provider + .register_table_sync(table_name, table) + .unwrap(); catalog_provider - .register_schema(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) + 
.register_schema_sync(DEFAULT_SCHEMA_NAME.to_string(), schema_provider) .unwrap(); catalog_list - .register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) + .register_catalog_sync(DEFAULT_CATALOG_NAME.to_string(), catalog_provider) .unwrap(); let factory = QueryEngineFactory::new(catalog_list); diff --git a/src/table-procedure/src/alter.rs b/src/table-procedure/src/alter.rs index f63de63149..3259448592 100644 --- a/src/table-procedure/src/alter.rs +++ b/src/table-procedure/src/alter.rs @@ -138,12 +138,14 @@ impl AlterTableProcedure { let catalog = self .catalog_manager .catalog(&request.catalog_name) + .await .context(AccessCatalogSnafu)? .context(CatalogNotFoundSnafu { name: &request.catalog_name, })?; let schema = catalog .schema(&request.schema_name) + .await .context(AccessCatalogSnafu)? .context(SchemaNotFoundSnafu { name: &request.schema_name, @@ -163,6 +165,7 @@ impl AlterTableProcedure { ensure!( !schema .table_exist(new_table_name) + .await .context(AccessCatalogSnafu)?, TableExistsSnafu { name: format!( @@ -338,9 +341,10 @@ mod tests { let catalog = catalog_manager .catalog(DEFAULT_CATALOG_NAME) + .await .unwrap() .unwrap(); - let schema = catalog.schema(DEFAULT_SCHEMA_NAME).unwrap().unwrap(); + let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap(); let table = schema.table(new_table_name).await.unwrap().unwrap(); let table_info = table.table_info(); assert_eq!(new_table_name, table_info.name); diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs index 6a98485297..d531ed4370 100644 --- a/src/table-procedure/src/create.rs +++ b/src/table-procedure/src/create.rs @@ -47,7 +47,7 @@ impl Procedure for CreateTableProcedure { async fn execute(&mut self, ctx: &Context) -> Result { match self.data.state { - CreateTableState::Prepare => self.on_prepare(), + CreateTableState::Prepare => self.on_prepare().await, CreateTableState::EngineCreateTable => self.on_engine_create_table(ctx).await, CreateTableState::RegisterCatalog => self.on_register_catalog().await, } @@ -131,11 +131,12 @@ impl CreateTableProcedure { }) } - fn on_prepare(&mut self) -> Result { + async fn on_prepare(&mut self) -> Result { // Check whether catalog and schema exist. let catalog = self .catalog_manager .catalog(&self.data.request.catalog_name) + .await .context(AccessCatalogSnafu)? .with_context(|| { logging::error!( @@ -148,6 +149,7 @@ impl CreateTableProcedure { })?; catalog .schema(&self.data.request.schema_name) + .await .context(AccessCatalogSnafu)? .with_context(|| { logging::error!( @@ -224,12 +226,14 @@ impl CreateTableProcedure { let catalog = self .catalog_manager .catalog(&self.data.request.catalog_name) + .await .context(AccessCatalogSnafu)? .context(CatalogNotFoundSnafu { name: &self.data.request.catalog_name, })?; let schema = catalog .schema(&self.data.request.schema_name) + .await .context(AccessCatalogSnafu)? 
.context(SchemaNotFoundSnafu {
 name: &self.data.request.schema_name,
diff --git a/src/table-procedure/src/drop.rs b/src/table-procedure/src/drop.rs
index 5ea82d8c56..2ceac194a4 100644
--- a/src/table-procedure/src/drop.rs
+++ b/src/table-procedure/src/drop.rs
@@ -291,9 +291,10 @@ mod tests {
 let catalog = catalog_manager
 .catalog(DEFAULT_CATALOG_NAME)
+ .await
 .unwrap()
 .unwrap();
- let schema = catalog.schema(DEFAULT_SCHEMA_NAME).unwrap().unwrap();
+ let schema = catalog.schema(DEFAULT_SCHEMA_NAME).await.unwrap().unwrap();
 assert!(schema.table(table_name).await.unwrap().is_none());
 let ctx = EngineContext::default();
 assert!(!table_engine.table_exists(
diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs
index 6e30742286..5958c477df 100644
--- a/src/table/src/metadata.rs
+++ b/src/table/src/metadata.rs
@@ -89,7 +89,7 @@ pub struct TableIdent {
 pub version: TableVersion,
 }
-/// The table medatadata
+/// The table metadata
 /// Note: if you add new fields to this struct, please ensure 'new_meta_builder' function works.
 /// TODO(dennis): find a better way to ensure 'new_meta_builder' works when adding new fields.
 #[derive(Clone, Debug, Builder, PartialEq, Eq)]
@@ -172,7 +172,15 @@ impl TableMeta {
 AlterKind::AddColumns { columns } => self.add_columns(table_name, columns),
 AlterKind::DropColumns { names } => self.remove_columns(table_name, names),
 // No need to rebuild table meta when renaming tables.
- AlterKind::RenameTable { .. } => Ok(TableMetaBuilder::default()),
+ AlterKind::RenameTable { .. } => {
+ let mut meta_builder = TableMetaBuilder::default();
+ meta_builder
+ .schema(self.schema.clone())
+ .primary_key_indices(self.primary_key_indices.clone())
+ .engine(self.engine.clone())
+ .next_column_id(self.next_column_id);
+ Ok(meta_builder)
+ }
 }
 }
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 501a6a8844..ec3a2c7bc5 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -267,13 +267,16 @@ pub async fn create_test_table(
 let schema_provider = catalog_manager
 .catalog(DEFAULT_CATALOG_NAME)
+ .await
 .unwrap()
 .unwrap()
 .schema(DEFAULT_SCHEMA_NAME)
+ .await
 .unwrap()
 .unwrap();
 schema_provider
 .register_table(table_name.to_string(), table)
+ .await
 .unwrap();
 Ok(())
 }
diff --git a/tests/cases/distributed/alter/rename_table.result b/tests/cases/distributed/alter/rename_table.result
index 2599550875..bd474870b3 100644
--- a/tests/cases/distributed/alter/rename_table.result
+++ b/tests/cases/distributed/alter/rename_table.result
@@ -27,9 +27,13 @@ SELECT * from t;
 ALTER TABLE t RENAME new_table;
-Error: 1001(Unsupported), Operation rename table not implemented yet
+Affected Rows: 0
 DROP TABLE t;
-Affected Rows: 1
+Error: 4001(TableNotFound), Table not found: greptime.public.t
+
+DROP TABLE new_table;
+
+Error: 1003(Internal), status: Internal, message: "Table route not found: __meta_table_route-greptime-public-new_table-1025", details: [], metadata: MetadataMap { headers: {} }
diff --git a/tests/cases/distributed/alter/rename_table.sql b/tests/cases/distributed/alter/rename_table.sql
index a35bffb1bf..22b623fb18 100644
--- a/tests/cases/distributed/alter/rename_table.sql
+++ b/tests/cases/distributed/alter/rename_table.sql
@@ -10,3 +10,6 @@ SELECT * from t;
 ALTER TABLE t RENAME new_table;
 DROP TABLE t;
+
+-- TODO: this statement should succeed
+DROP TABLE new_table;
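--
The hunks above all converge on one call shape: catalog, schema, and table
lookups are now async and remain fallible (a Result wrapping an Option),
while test fixtures keep a blocking setup path through the new
register_table_sync / register_schema_sync / register_catalog_sync helpers.
Below is a minimal sketch of the resulting lookup chain, assuming the trait
shapes implied by this patch; the function itself is hypothetical, while
CatalogManagerRef, the method names, and the double-unwrap pattern are taken
from the test hunks.

    use catalog::CatalogManagerRef; // assumed import path, per this patch's catalog crate

    // Hypothetical helper: lists the tables in greptime.public via the async API.
    async fn list_public_tables(catalog_manager: CatalogManagerRef) -> Vec<String> {
        let catalog = catalog_manager
            .catalog("greptime") // async now; yields a Result wrapping an Option
            .await
            .unwrap()  // error layer
            .unwrap(); // "catalog not found" layer
        let schema = catalog
            .schema("public") // async as well, same Result-of-Option shape
            .await
            .unwrap()
            .unwrap();
        schema.table_names().await.unwrap() // listing table names is also async
    }

The metadata.rs hunk makes a related point explicit: RenameTable can no
longer return TableMetaBuilder::default(), because a rename must carry the
existing schema, primary key indices, engine, and next column id forward,
or the rebuilt TableMeta would silently lose them.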