From 4fdb6d2f21513e20474f00932d6872d2c85b1cc2 Mon Sep 17 00:00:00 2001
From: LFC
Date: Wed, 12 Jul 2023 17:33:33 +0800
Subject: [PATCH] refactor: remote catalog uses memory (#1926)

* refactor: remote catalog uses memory
* rebase develop
* fix: resolve PR comments
---
 Cargo.lock                                    |   1 +
 src/catalog/Cargo.toml                        |   4 +-
 src/catalog/src/local/manager.rs              |   2 -
 src/catalog/src/local/memory.rs               |  53 +-
 src/catalog/src/remote/manager.rs             | 653 ++++++------------
 src/catalog/src/remote/region_alive_keeper.rs |   4 +
 src/catalog/tests/remote_catalog_tests.rs     |   2 +
 src/cmd/Cargo.toml                            |   1 +
 src/cmd/src/cli/repl.rs                       |   4 +-
 src/common/meta/src/error.rs                  |   2 +-
 src/common/meta/src/key/table_name.rs         |  26 +-
 src/datanode/src/instance.rs                  |   5 +-
 src/frontend/Cargo.toml                       |   2 +-
 src/frontend/src/catalog.rs                   |   5 +
 src/frontend/src/instance.rs                  |   4 +
 src/frontend/src/instance/distributed.rs      |   5 +
 .../src/instance/distributed/inserter.rs      |   3 +
 src/frontend/src/table.rs                     |   6 +-
 src/meta-client/src/client.rs                 |  15 +-
 src/meta-srv/Cargo.toml                       |   4 +-
 src/meta-srv/src/ddl.rs                       |   7 +
 src/meta-srv/src/error.rs                     |   7 +
 .../src/handler/persist_stats_handler.rs      |   8 +-
 .../src/handler/region_lease_handler.rs       |  16 +-
 .../src/handler/response_header_handler.rs    |   7 +-
 src/meta-srv/src/metasrv.rs                   |   8 +
 src/meta-srv/src/metasrv/builder.rs           |  11 +-
 src/meta-srv/src/procedure/region_failover.rs |  14 +-
 src/meta-srv/src/service/admin.rs             |   2 +
 src/meta-srv/src/service/admin/meta.rs        |   3 +
 src/meta-srv/src/service/store/kv.rs          | 120 +++-
 src/meta-srv/src/test_util.rs                 |   5 +-
 src/query/Cargo.toml                          |   2 +-
 src/script/src/manager.rs                     |   1 -
 src/servers/Cargo.toml                        |   2 +-
 src/storage/Cargo.toml                        |   2 +-
 src/table/src/metadata.rs                     |   6 +
 tests-integration/src/cluster.rs              |  22 +-
 tests-integration/src/test_util.rs            |   3 +-
 tests-integration/src/tests.rs                |   9 +-
 tests-integration/tests/region_failover.rs    |   1 +
 41 files changed, 529 insertions(+), 528 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a701c12c3d..592a083000 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1595,6 +1595,7 @@ dependencies = [
  "client",
  "common-base",
  "common-error",
+ "common-meta",
  "common-query",
  "common-recordbatch",
  "common-telemetry",
diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml
index 11c6c676e5..1e82fb8991 100644
--- a/src/catalog/Cargo.toml
+++ b/src/catalog/Cargo.toml
@@ -29,12 +29,12 @@ datatypes = { path = "../datatypes" }
 futures = "0.3"
 futures-util.workspace = true
 key-lock = "0.1"
-lazy_static = "1.4"
+lazy_static.workspace = true
 meta-client = { path = "../meta-client" }
 metrics.workspace = true
 moka = { version = "0.11", features = ["future"] }
 parking_lot = "0.12"
-regex = "1.6"
+regex.workspace = true
 serde = "1.0"
 serde_json = "1.0"
 session = { path = "../session" }
diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs
index 39f0323274..ac9e946f36 100644
--- a/src/catalog/src/local/manager.rs
+++ b/src/catalog/src/local/manager.rs
@@ -525,8 +525,6 @@ impl CatalogManager for LocalCatalogManager {
     }

     async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
-        self.check_state().await?;
-
         let catalog_name = request.create_table_request.catalog_name.clone();
         let schema_name = request.create_table_request.schema_name.clone();

diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs
index 0d9bf7e686..507346302f 100644
--- a/src/catalog/src/local/memory.rs
+++ b/src/catalog/src/local/memory.rs
@@ -75,15 +75,7 @@ impl CatalogManager for MemoryCatalogManager {
     }

     async fn
register_table(&self, request: RegisterTableRequest) -> Result { - let catalog = request.catalog.clone(); - let schema = request.schema.clone(); - let result = self.register_table_sync(request); - increment_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - 1.0, - &[crate::metrics::db_label(&catalog, &schema)], - ); - result + self.register_table_sync(request) } async fn rename_table(&self, request: RenameTableRequest) -> Result { @@ -143,11 +135,7 @@ impl CatalogManager for MemoryCatalogManager { } async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { - let registered = self.register_schema_sync(request)?; - if registered { - increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0); - } - Ok(registered) + self.register_schema_sync(request) } async fn deregister_schema(&self, request: DeregisterSchemaRequest) -> Result { @@ -263,11 +251,7 @@ impl CatalogManager for MemoryCatalogManager { } async fn register_catalog(&self, name: String) -> Result { - let registered = self.register_catalog_sync(name)?; - if registered { - increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0); - } - Ok(registered) + self.register_catalog_sync(name) } fn as_any(&self) -> &dyn Any { @@ -291,7 +275,15 @@ impl MemoryCatalogManager { pub fn register_catalog_sync(&self, name: String) -> Result { let mut catalogs = self.catalogs.write().unwrap(); - Ok(catalogs.insert(name, HashMap::new()).is_some()) + + match catalogs.entry(name) { + Entry::Vacant(e) => { + e.insert(HashMap::new()); + increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0); + Ok(true) + } + Entry::Occupied(_) => Ok(false), + } } pub fn register_schema_sync(&self, request: RegisterSchemaRequest) -> Result { @@ -301,11 +293,15 @@ impl MemoryCatalogManager { .with_context(|| CatalogNotFoundSnafu { catalog_name: &request.catalog, })?; - if catalog.contains_key(&request.schema) { - return Ok(false); + + match catalog.entry(request.schema) { + Entry::Vacant(e) => { + e.insert(HashMap::new()); + increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0); + Ok(true) + } + Entry::Occupied(_) => Ok(false), } - let _ = catalog.insert(request.schema, HashMap::new()); - Ok(true) } pub fn register_table_sync(&self, request: RegisterTableRequest) -> Result { @@ -327,8 +323,13 @@ impl MemoryCatalogManager { } .fail(); } - - Ok(schema.insert(request.table_name, request.table).is_none()) + schema.insert(request.table_name, request.table); + increment_gauge!( + crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, + 1.0, + &[crate::metrics::db_label(&request.catalog, &request.schema)], + ); + Ok(true) } #[cfg(any(test, feature = "testing"))] diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 1c522a322f..46de3929b2 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -13,34 +13,34 @@ // limitations under the License. 
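// Illustrative sketch, not part of the patch: the hunks below rework RemoteCatalogManager so
// that catalog/schema/table bookkeeping is delegated to an embedded MemoryCatalogManager, while
// the remote KV backend is mainly consulted for existence checks, with catalogs and schemas
// registered locally on demand. A condensed, synchronous rendering of that memory-first lookup
// shape, using hypothetical stand-in types rather than the actual GreptimeDB APIs:

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Stand-in for the remote KV backend; only an existence check is modeled here.
trait RemoteBackend: Send + Sync {
    fn contains(&self, key: &str) -> bool;
}

/// Stand-in for MemoryCatalogManager: the set of catalog names known locally.
#[derive(Default)]
struct MemoryCatalogs(Mutex<HashSet<String>>);

impl MemoryCatalogs {
    fn exists(&self, name: &str) -> bool {
        self.0.lock().unwrap().contains(name)
    }
    fn register(&self, name: &str) -> bool {
        self.0.lock().unwrap().insert(name.to_string())
    }
}

struct RemoteCatalogManagerSketch {
    memory: Arc<MemoryCatalogs>,
    backend: Arc<dyn RemoteBackend>,
}

impl RemoteCatalogManagerSketch {
    /// Memory first; on a miss, ask the remote backend and, if the catalog exists remotely,
    /// register it locally so later lookups never leave memory.
    fn catalog_exist(&self, catalog: &str) -> bool {
        if self.memory.exists(catalog) {
            return true;
        }
        let found_remotely = self.backend.contains(catalog);
        if found_remotely {
            let _ = self.memory.register(catalog);
        }
        found_remotely
    }
}

// The real `catalog_exist`, `schema_exist`, and `table_exist` in the diff below follow this shape.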
use std::any::Any; -use std::collections::HashSet; use std::sync::Arc; use async_trait::async_trait; -use common_catalog::consts::{MAX_SYS_TABLE_ID, MITO_ENGINE}; +use common_catalog::consts::MITO_ENGINE; use common_meta::ident::TableIdent; +use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::rpc::store::{PutRequest, RangeRequest}; use common_meta::rpc::KeyValue; use common_telemetry::{debug, error, info, warn}; -use metrics::{decrement_gauge, increment_gauge}; -use snafu::ResultExt; +use metrics::increment_gauge; +use snafu::{ensure, OptionExt, ResultExt}; use table::engine::manager::TableEngineManagerRef; use table::engine::{EngineContext, TableReference}; -use table::requests::{CreateTableRequest, OpenTableRequest}; +use table::requests::OpenTableRequest; use table::TableRef; use tokio::sync::Mutex; use crate::error::{ - CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, - ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu, - TableMetadataManagerSnafu, UnimplementedSnafu, + InvalidCatalogValueSnafu, OpenTableSnafu, ParallelOpenTableSnafu, Result, + TableEngineNotFoundSnafu, TableExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, + UnimplementedSnafu, }; use crate::helper::{ - build_catalog_prefix, build_schema_prefix, build_table_global_prefix, - build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, - TableGlobalValue, TableRegionalKey, TableRegionalValue, CATALOG_KEY_PREFIX, + build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey, + TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, }; +use crate::local::MemoryCatalogManager; use crate::remote::region_alive_keeper::RegionAliveKeepers; use crate::{ handle_system_table_request, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, @@ -54,6 +54,8 @@ pub struct RemoteCatalogManager { engine_manager: TableEngineManagerRef, system_table_requests: Mutex>, region_alive_keepers: Arc, + memory_catalog_manager: Arc, + table_metadata_manager: TableMetadataManagerRef, } impl RemoteCatalogManager { @@ -62,6 +64,7 @@ impl RemoteCatalogManager { node_id: u64, backend: KvBackendRef, region_alive_keepers: Arc, + table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { engine_manager, @@ -69,6 +72,8 @@ impl RemoteCatalogManager { backend, system_table_requests: Default::default(), region_alive_keepers, + memory_catalog_manager: Arc::new(MemoryCatalogManager::default()), + table_metadata_manager, } } @@ -120,40 +125,6 @@ impl RemoteCatalogManager { Ok(()) } - pub async fn create_catalog_and_schema( - &self, - catalog_name: &str, - schema_name: &str, - ) -> Result<()> { - let schema_key = SchemaKey { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - } - .to_string(); - let req = PutRequest::new() - .with_key(schema_key.as_bytes()) - .with_value(SchemaValue.as_bytes().context(InvalidCatalogValueSnafu)?); - self.backend - .put(req) - .await - .context(TableMetadataManagerSnafu)?; - info!("Created schema '{schema_key}'"); - - let catalog_key = CatalogKey { - catalog_name: catalog_name.to_string(), - } - .to_string(); - let req = PutRequest::new() - .with_key(catalog_key.as_bytes()) - .with_value(CatalogValue.as_bytes().context(InvalidCatalogValueSnafu)?); - self.backend - .put(req) - .await - .context(TableMetadataManagerSnafu)?; - info!("Created catalog '{catalog_key}"); - Ok(()) - } - 
fn build_schema_key(&self, catalog_name: String, schema_name: String) -> SchemaKey { SchemaKey { catalog_name, @@ -170,74 +141,39 @@ impl RemoteCatalogManager { engine_manager: TableEngineManagerRef, catalog_name: String, schema_name: String, - ) -> Result { + ) -> Result<()> { info!("initializing tables in {}.{}", catalog_name, schema_name); let kvs = iter_remote_tables(node_id, &backend, &catalog_name, &schema_name).await?; - let table_num = kvs.len(); let joins = kvs .into_iter() - .map(|(table_key, table_value)| { + .map(|(_, table_value)| { let engine_manager = engine_manager.clone(); - let backend = backend.clone(); + let memory_catalog_manager = self.memory_catalog_manager.clone(); + let table_metadata_manager = self.table_metadata_manager.clone(); + let table_id = table_value.table_id(); common_runtime::spawn_bg(async move { - match open_or_create_table(node_id, engine_manager, &table_key, &table_value) - .await + if let Err(e) = open_and_register_table( + node_id, + engine_manager, + &table_value, + memory_catalog_manager, + table_metadata_manager, + ) + .await { - Ok(table_ref) => { - let table_info = table_ref.table_info(); - let table_name = &table_info.name; - info!("Registered table {}", table_name); - Ok(Some(table_info.ident.table_id)) - } - Err(err) => { - warn!( - "Node id: {}, failed to open table: {}, source: {}", - node_id, table_key, err - ); - debug!( - "Node id: {}, TableGlobalKey: {}, value: {:?},", - node_id, table_key, table_value - ); - print_regional_key_debug_info(node_id, backend, &table_key).await; - - Ok(None) - } + // Note that we don't return error here if table opened failed. This is because + // we don't want those broken tables to impede the startup of Datanode. + // However, this could be changed in the future. + error!(e; "Failed to open or register table, id = {table_id}") } }) }) .collect::>(); - let opened_table_ids = futures::future::try_join_all(joins) + futures::future::try_join_all(joins) .await - .context(ParallelOpenTableSnafu)? - .into_iter() - .collect::>>()? - .into_iter() - .flatten() - .collect::>(); - - let opened = opened_table_ids.len(); - - let max_table_id = opened_table_ids - .into_iter() - .max() - .unwrap_or(MAX_SYS_TABLE_ID); - - increment_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - table_num as f64, - &[crate::metrics::db_label(&catalog_name, &schema_name)], - ); - info!( - "initialized tables in {}.{}, total: {}, opened: {}, failed: {}", - catalog_name, - schema_name, - table_num, - opened, - table_num - opened - ); - - Ok(max_table_id) + .context(ParallelOpenTableSnafu)?; + Ok(()) } /// Initiates all schemas inside the catalog by fetching data from metasrv. @@ -275,19 +211,7 @@ impl RemoteCatalogManager { )); } - let mut max_table_id = MAX_SYS_TABLE_ID; - if let Some(found_max_table_id) = futures::future::try_join_all(joins) - .await? - .into_iter() - .max() - { - max_table_id = max_table_id.max(found_max_table_id); - info!( - "Catalog name: {}, max table id allocated: {}", - catalog_name, max_table_id - ); - } - + futures::future::try_join_all(joins).await?; Ok(()) } @@ -402,24 +326,6 @@ impl RemoteCatalogManager { node_id: self.node_id, } } - - async fn check_catalog_schema_exist( - &self, - catalog_name: &str, - schema_name: &str, - ) -> Result<()> { - if !self.catalog_exist(catalog_name).await? { - return CatalogNotFoundSnafu { catalog_name }.fail()?; - } - if !self.schema_exist(catalog_name, schema_name).await? 
{ - return SchemaNotFoundSnafu { - catalog: catalog_name, - schema: schema_name, - } - .fail()?; - } - Ok(()) - } } async fn iter_remote_schemas<'a>( @@ -495,59 +401,14 @@ async fn iter_remote_tables<'a>( Ok(tables) } -async fn print_regional_key_debug_info( - node_id: u64, - backend: KvBackendRef, - table_key: &TableGlobalKey, -) { - let regional_key = TableRegionalKey { - catalog_name: table_key.catalog_name.clone(), - schema_name: table_key.schema_name.clone(), - table_name: table_key.table_name.clone(), - node_id, - } - .to_string(); - - match backend.get(regional_key.as_bytes()).await { - Ok(Some(KeyValue { - key: _, - value: values_bytes, - })) => { - debug!( - "Node id: {}, TableRegionalKey: {}, value: {},", - node_id, - table_key, - String::from_utf8_lossy(&values_bytes), - ); - } - Ok(None) => { - debug!( - "Node id: {}, TableRegionalKey: {}, value: None", - node_id, table_key, - ); - } - Err(err) => { - debug!( - "Node id: {}, failed to fetch TableRegionalKey: {}, source: {}", - node_id, regional_key, err - ); - } - } -} - -async fn open_or_create_table( +async fn open_and_register_table( node_id: u64, engine_manager: TableEngineManagerRef, - table_key: &TableGlobalKey, table_value: &TableGlobalValue, -) -> Result { + memory_catalog_manager: Arc, + _table_metadata_manager: TableMetadataManagerRef, +) -> Result<()> { let context = EngineContext {}; - let TableGlobalKey { - catalog_name, - schema_name, - table_name, - .. - } = table_key; let table_id = table_value.table_id(); @@ -557,6 +418,10 @@ async fn open_or_create_table( .. } = table_value; + let catalog_name = table_info.catalog_name.clone(); + let schema_name = table_info.schema_name.clone(); + let table_name = table_info.name.clone(); + // unwrap safety: checked in yielding this table when `iter_remote_tables` let region_numbers = regions_id_map.get(&node_id).unwrap(); @@ -573,51 +438,42 @@ async fn open_or_create_table( .context(TableEngineNotFoundSnafu { engine_name: &table_info.meta.engine, })?; - match engine + + let table_ident = TableIdent { + catalog: catalog_name, + schema: schema_name, + table: table_name, + table_id, + engine: table_info.meta.engine.clone(), + }; + + let table = engine .open_table(&context, request) .await .with_context(|_| OpenTableSnafu { - table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"), - })? { - Some(table) => { - info!( - "Table opened: {}.{}.{}", - catalog_name, schema_name, table_name - ); - Ok(table) - } - None => { - info!( - "Try create table: {}.{}.{}", - catalog_name, schema_name, table_name - ); + table_info: table_ident.to_string(), + })? 
+ .with_context(|| TableNotFoundSnafu { + table_info: table_ident.to_string(), + })?; + info!("Successfully opened table, {table_ident}"); - let meta = &table_info.meta; - let req = CreateTableRequest { - id: table_id, - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: table_name.clone(), - desc: None, - schema: meta.schema.clone(), - region_numbers: region_numbers.clone(), - primary_key_indices: meta.primary_key_indices.clone(), - create_if_not_exists: true, - table_options: meta.options.clone(), - engine: engine.name().to_string(), - }; - - engine - .create_table(&context, req) - .await - .context(CreateTableSnafu { - table_info: format!( - "{}.{}.{}, id:{}", - &catalog_name, &schema_name, &table_name, table_id - ), - }) + let request = RegisterTableRequest { + catalog: table_ident.catalog.clone(), + schema: table_ident.schema.clone(), + table_name: table_ident.table.clone(), + table_id, + table, + }; + let registered = memory_catalog_manager.register_table_sync(request)?; + ensure!( + registered, + TableExistsSnafu { + table: table_ident.to_string(), } - } + ); + info!("Successfully registered table, {table_ident}"); + Ok(()) } #[async_trait] @@ -638,92 +494,70 @@ impl CatalogManager for RemoteCatalogManager { } async fn register_table(&self, request: RegisterTableRequest) -> Result { - let catalog_name = request.catalog; - let schema_name = request.schema; - self.check_catalog_schema_exist(&catalog_name, &schema_name) - .await?; + let catalog = request.catalog.clone(); + let schema = request.schema.clone(); + let table_name = request.table_name.clone(); + let table = request.table.clone(); - let _ = self - .register_table( - catalog_name.clone(), - schema_name.clone(), - request.table_name, - request.table.clone(), - ) - .await?; + let registered = self.memory_catalog_manager.register_table_sync(request)?; - let table_info = request.table.table_info(); - let table_ident = TableIdent { - catalog: table_info.catalog_name.clone(), - schema: table_info.schema_name.clone(), - table: table_info.name.clone(), - table_id: table_info.ident.table_id, - engine: table_info.meta.engine.clone(), - }; - self.region_alive_keepers - .register_table(table_ident, request.table) - .await?; + if registered { + self.register_table(catalog, schema, table_name, table.clone()) + .await?; - increment_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - 1.0, - &[crate::metrics::db_label(&catalog_name, &schema_name)], - ); - Ok(true) + let table_info = table.table_info(); + let table_ident = TableIdent { + catalog: table_info.catalog_name.clone(), + schema: table_info.schema_name.clone(), + table: table_info.name.clone(), + table_id: table_info.table_id(), + engine: table_info.meta.engine.clone(), + }; + self.region_alive_keepers + .register_table(table_ident, table) + .await?; + } + + Ok(registered) } async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> { - let catalog_name = request.catalog; - let schema_name = request.schema; - let table_name = request.table_name; - self.check_catalog_schema_exist(&catalog_name, &schema_name) - .await?; + let Some(table) = self + .memory_catalog_manager + .table(&request.catalog, &request.schema, &request.table_name) + .await? 
else { return Ok(()) }; - let result = self - .deregister_table( - catalog_name.clone(), - schema_name.clone(), - table_name.clone(), - ) - .await?; - decrement_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - 1.0, - &[crate::metrics::db_label(&catalog_name, &schema_name)], - ); + self.deregister_table( + request.catalog.clone(), + request.schema.clone(), + request.table_name.clone(), + ) + .await?; - if let Some(table) = result.as_ref() { - let table_info = table.table_info(); - let table_ident: TableIdent = TableIdent { - catalog: catalog_name, - schema: schema_name, - table: table_name, - table_id: table_info.ident.table_id, - engine: table_info.meta.engine.clone(), - }; - let _ = self - .region_alive_keepers - .deregister_table(&table_ident) - .await; + let table_info = table.table_info(); + let table_ident = TableIdent { + catalog: request.catalog.clone(), + schema: request.schema.clone(), + table: request.table_name.clone(), + table_id: table_info.ident.table_id, + engine: table_info.meta.engine.clone(), + }; + if let Some(keeper) = self + .region_alive_keepers + .deregister_table(&table_ident) + .await + { + warn!( + "Table {} is deregistered from region alive keepers", + keeper.table_ident(), + ); } - Ok(()) + self.memory_catalog_manager.deregister_table(request).await } async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { - let catalog_name = request.catalog; - let schema_name = request.schema; - let key = self.build_schema_key(catalog_name, schema_name).to_string(); - let req = PutRequest::new() - .with_key(key.as_bytes()) - .with_value(SchemaValue.as_bytes().context(InvalidCatalogValueSnafu)?); - self.backend - .put(req) - .await - .context(TableMetadataManagerSnafu)?; - - increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0); - Ok(true) + self.memory_catalog_manager.register_schema_sync(request) } async fn deregister_schema(&self, _request: DeregisterSchemaRequest) -> Result { @@ -734,42 +568,7 @@ impl CatalogManager for RemoteCatalogManager { } async fn rename_table(&self, request: RenameTableRequest) -> Result { - let catalog_name = request.catalog.clone(); - let schema_name = request.schema.clone(); - self.check_catalog_schema_exist(&catalog_name, &schema_name) - .await?; - - let old_table_key = TableRegionalKey { - catalog_name: request.catalog.clone(), - schema_name: request.schema.clone(), - table_name: request.table_name.clone(), - node_id: self.node_id, - } - .to_string(); - let Some(KeyValue{ key: _, value }) = self.backend - .get(old_table_key.as_bytes()) - .await - .context(TableMetadataManagerSnafu)? else { - return Ok(false) - }; - let new_table_key = TableRegionalKey { - catalog_name: request.catalog.clone(), - schema_name: request.schema.clone(), - table_name: request.new_table_name, - node_id: self.node_id, - }; - let req = PutRequest::new() - .with_key(new_table_key.to_string().as_bytes()) - .with_value(value); - self.backend - .put(req) - .await - .context(TableMetadataManagerSnafu)?; - self.backend - .delete(old_table_key.to_string().as_bytes(), false) - .await - .context(TableMetadataManagerSnafu)?; - Ok(true) + self.memory_catalog_manager.rename_table(request).await } async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { @@ -787,15 +586,45 @@ impl CatalogManager for RemoteCatalogManager { } async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { + if !self.catalog_exist(catalog).await? 
{ + return Ok(false); + } + + if self + .memory_catalog_manager + .schema_exist(catalog, schema) + .await? + { + return Ok(true); + } + let key = self .build_schema_key(catalog.to_string(), schema.to_string()) .to_string(); - Ok(self + let remote_schema_exists = self .backend .get(key.as_bytes()) .await .context(TableMetadataManagerSnafu)? - .is_some()) + .is_some(); + + // Create schema locally if remote schema exists. Since local schema is managed by memory + // catalog manager, creating a local schema is relatively cheap (just a HashMap). + // Besides, if this method ("schema_exist) is called, it's very likely that someone wants to + // create a table in this schema. We should create the schema now. + if remote_schema_exists + && self + .memory_catalog_manager + .register_schema(RegisterSchemaRequest { + catalog: catalog.to_string(), + schema: schema.to_string(), + }) + .await? + { + info!("register schema '{catalog}/{schema}' on demand"); + } + + Ok(remote_schema_exists) } async fn table( @@ -804,161 +633,73 @@ impl CatalogManager for RemoteCatalogManager { schema_name: &str, table_name: &str, ) -> Result> { - self.check_catalog_schema_exist(catalog_name, schema_name) - .await?; - - let key = self - .build_regional_table_key( - catalog_name.to_string(), - schema_name.to_string(), - table_name.to_string(), - ) - .to_string(); - let table_opt = self - .backend - .get(key.as_bytes()) + self.memory_catalog_manager + .table(catalog_name, schema_name, table_name) .await - .context(TableMetadataManagerSnafu)? - .map(|KeyValue { key: _, value: v }| { - let TableRegionalValue { - table_id, - engine_name, - .. - } = TableRegionalValue::parse(String::from_utf8_lossy(&v)) - .context(InvalidCatalogValueSnafu)?; - - let Some(table_id) = table_id else { - warn!("Cannot find table id for {key}, the value has an old format"); - return Ok(None); - }; - let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE); - let engine = self - .engine_manager - .engine(engine_name) - .context(TableEngineNotFoundSnafu { engine_name })?; - let reference = TableReference { - catalog: catalog_name, - schema: schema_name, - table: table_name, - }; - let table = engine - .get_table(&EngineContext {}, table_id) - .with_context(|_| OpenTableSnafu { - table_info: reference.to_string(), - })?; - Ok(table) - }) - .transpose()? - .flatten(); - - Ok(table_opt) } async fn catalog_exist(&self, catalog: &str) -> Result { + if self.memory_catalog_manager.catalog_exist(catalog).await? { + return Ok(true); + } + let key = CatalogKey { catalog_name: catalog.to_string(), }; - Ok(self + + let remote_catalog_exists = self .backend .get(key.to_string().as_bytes()) .await .context(TableMetadataManagerSnafu)? - .is_some()) + .is_some(); + + // Create catalog locally if remote catalog exists. Since local catalog is managed by memory + // catalog manager, creating a local catalog is relatively cheap (just a HashMap). + // Besides, if this method ("catalog_exist) is called, it's very likely that someone wants to + // create a table in this catalog. We should create the catalog now. + if remote_catalog_exists + && self + .memory_catalog_manager + .register_catalog(catalog.to_string()) + .await? 
+ { + info!("register catalog '{catalog}' on demand"); + } + + Ok(remote_catalog_exists) } async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result { - self.check_catalog_schema_exist(catalog, schema).await?; - - let key = TableRegionalKey { - catalog_name: catalog.to_string(), - schema_name: schema.to_string(), - table_name: table.to_string(), - node_id: self.node_id, + if !self.catalog_exist(catalog).await? { + return Ok(false); } - .to_string(); - Ok(self - .backend - .get(key.as_bytes()) + if !self.schema_exist(catalog, schema).await? { + return Ok(false); + } + + self.memory_catalog_manager + .table_exist(catalog, schema, table) .await - .context(TableMetadataManagerSnafu)? - .is_some()) } async fn catalog_names(&self) -> Result> { - let req = RangeRequest::new().with_prefix(CATALOG_KEY_PREFIX.as_bytes()); - let kvs = self - .backend - .range(req) - .await - .context(TableMetadataManagerSnafu)? - .kvs; - let mut catalogs = HashSet::new(); - - for catalog in kvs { - let catalog_key = String::from_utf8_lossy(catalog.key()); - if let Ok(key) = CatalogKey::parse(&catalog_key) { - let _ = catalogs.insert(key.catalog_name); - } - } - - Ok(catalogs.into_iter().collect()) + self.memory_catalog_manager.catalog_names().await } async fn schema_names(&self, catalog_name: &str) -> Result> { - let req = RangeRequest::new().with_prefix(build_schema_prefix(catalog_name).as_bytes()); - let kvs = self - .backend - .range(req) - .await - .context(TableMetadataManagerSnafu)? - .kvs; - let mut schemas = HashSet::new(); - - for schema in kvs { - let schema_key = String::from_utf8_lossy(schema.key()); - if let Ok(key) = SchemaKey::parse(&schema_key) { - let _ = schemas.insert(key.schema_name); - } - } - Ok(schemas.into_iter().collect()) + self.memory_catalog_manager.schema_names(catalog_name).await } async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result> { - self.check_catalog_schema_exist(catalog_name, schema_name) - .await?; - - let req = RangeRequest::new() - .with_prefix(build_table_regional_prefix(catalog_name, schema_name).as_bytes()); - let kvs = self - .backend - .range(req) + self.memory_catalog_manager + .table_names(catalog_name, schema_name) .await - .context(TableMetadataManagerSnafu)? 
- .kvs; - let mut tables = HashSet::new(); - - for table in kvs { - let table_key = String::from_utf8_lossy(table.key()); - if let Ok(key) = TableRegionalKey::parse(&table_key) { - let _ = tables.insert(key.table_name); - } - } - Ok(tables.into_iter().collect()) } async fn register_catalog(&self, name: String) -> Result { - let key = CatalogKey { catalog_name: name }.to_string(); - // TODO(hl): use compare_and_swap to prevent concurrent update - let req = PutRequest::new() - .with_key(key.as_bytes()) - .with_value(CatalogValue.as_bytes().context(InvalidCatalogValueSnafu)?); - self.backend - .put(req) - .await - .context(TableMetadataManagerSnafu)?; - increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0); - Ok(false) + self.memory_catalog_manager.register_catalog_sync(name) } fn as_any(&self) -> &dyn Any { diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index 0f8de141c4..c9106b2866 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -312,6 +312,10 @@ impl RegionAliveKeeper { } deadline } + + pub fn table_ident(&self) -> &TableIdent { + &self.table_ident + } } #[derive(Debug)] diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index f43011b60e..a37c4f0a64 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -29,6 +29,7 @@ mod tests { use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::ident::TableIdent; + use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackend; use common_meta::rpc::store::{CompareAndPutRequest, PutRequest, RangeRequest}; @@ -155,6 +156,7 @@ mod tests { node_id, cached_backend.clone(), region_alive_keepers.clone(), + Arc::new(TableMetadataManager::new(cached_backend)), ); catalog_manager.start().await.unwrap(); diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index b31a443450..94eed1841b 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -22,6 +22,7 @@ client = { path = "../client" } common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } +common-meta = { path = "../common/meta" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry", features = [ "deadlock_detection", diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index e5c4cb5911..892bcc84e0 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -21,6 +21,7 @@ use client::client_manager::DatanodeClients; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_error::prelude::ErrorExt; +use common_meta::key::TableMetadataManager; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; @@ -263,9 +264,10 @@ async fn create_query_engine(meta_addr: &str) -> Result { let catalog_list = Arc::new(FrontendCatalogManager::new( cached_meta_backend.clone(), - cached_meta_backend, + cached_meta_backend.clone(), partition_manager, datanode_clients, + Arc::new(TableMetadataManager::new(cached_meta_backend)), )); let plugins: Arc = Default::default(); let state = Arc::new(QueryEngineState::new( diff --git 
a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index b1a7cbd5d5..382cd72b82 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -70,7 +70,7 @@ pub enum Error { #[snafu(display("Get null from cache, key: {}", key))] CacheNotGet { key: String, location: Location }, - #[snafu(display("Failed to request MetaSrv, source: {}", source))] + #[snafu(display("{source}"))] MetaSrv { source: BoxedError, location: Location, diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index c8026d1761..4c1f1eda8b 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -24,6 +24,7 @@ use crate::key::{to_removed_key, TableMetaKey}; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{CompareAndPutRequest, MoveValueRequest, RangeRequest}; +use crate::table_name::TableName; #[derive(Debug)] pub struct TableNameKey<'a> { @@ -77,6 +78,16 @@ impl TableMetaKey for TableNameKey<'_> { } } +impl<'a> From<&'a TableName> for TableNameKey<'a> { + fn from(value: &'a TableName) -> Self { + Self { + catalog: &value.catalog_name, + schema: &value.schema_name, + table: &value.table_name, + } + } +} + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] pub struct TableNameValue { table_id: TableId, @@ -207,9 +218,18 @@ mod tests { test_err(b"__table_name/x/000_invalid_schema/z"); test_err(b"__table_name/x/y/000_invalid_table"); - let table_name = - TableNameKey::strip_table_name(b"__table_name/my_catalog/my_schema/my_table").unwrap(); - assert_eq!(table_name, "my_table"); + fn test_ok(table_name: &str) { + assert_eq!( + table_name, + TableNameKey::strip_table_name( + format!("__table_name/my_catalog/my_schema/{}", table_name).as_bytes() + ) + .unwrap() + ); + } + test_ok("my_table"); + test_ok("cpu:metrics"); + test_ok(":cpu:metrics"); } #[test] diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 78b452cd07..ceb6ec8543 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -28,6 +28,7 @@ use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; +use common_meta::key::TableMetadataManager; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::store::state_store::ObjectStateStore; use common_procedure::ProcedureManagerRef; @@ -253,8 +254,9 @@ impl Instance { let catalog_manager = Arc::new(RemoteCatalogManager::new( engine_manager.clone(), opts.node_id.context(MissingNodeIdSnafu)?, - kv_backend, + kv_backend.clone(), region_alive_keepers.clone(), + Arc::new(TableMetadataManager::new(kv_backend)), )); ( @@ -265,7 +267,6 @@ impl Instance { } }; - catalog_manager.start().await.context(CatalogSnafu)?; let factory = QueryEngineFactory::new_with_plugins( catalog_manager.clone(), false, diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 58eea9db4e..610603bae3 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -50,7 +50,7 @@ openmetrics-parser = "0.4" partition = { path = "../partition" } prost.workspace = true query = { path = "../query" } -regex = "1.6" +regex.workspace = true script = { path = "../script", features = ["python"], optional = true } serde = "1.0" serde_json = "1.0" diff --git a/src/frontend/src/catalog.rs 
b/src/frontend/src/catalog.rs index 328346a3db..94f24c255b 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -34,6 +34,7 @@ use catalog::{ use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME}; use common_error::prelude::BoxedError; +use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::rpc::store::RangeRequest; use common_meta::rpc::KeyValue; @@ -54,6 +55,7 @@ pub struct FrontendCatalogManager { backend_cache_invalidator: KvCacheInvalidatorRef, partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, + table_metadata_manager: TableMetadataManagerRef, // TODO(LFC): Remove this field. // DistInstance in FrontendCatalogManager is only used for creating distributed script table now. @@ -68,12 +70,14 @@ impl FrontendCatalogManager { backend_cache_invalidator: KvCacheInvalidatorRef, partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, + table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { backend, backend_cache_invalidator, partition_manager, datanode_clients, + table_metadata_manager, dist_instance: None, } } @@ -415,6 +419,7 @@ impl CatalogManager for FrontendCatalogManager { TableName::new(catalog, schema, table_name), table_info, Arc::new(self.clone()), + self.table_metadata_manager.clone(), )); Ok(Some(table)) } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 87d069ef1d..c0a63f3f6e 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -39,6 +39,7 @@ use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; +use common_meta::key::TableMetadataManager; use common_query::Output; use common_telemetry::logging::{debug, info}; use common_telemetry::timer; @@ -149,17 +150,20 @@ impl Instance { let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); + let table_metadata_manager = Arc::new(TableMetadataManager::new(meta_backend.clone())); let mut catalog_manager = FrontendCatalogManager::new( meta_backend.clone(), meta_backend.clone(), partition_manager.clone(), datanode_clients.clone(), + table_metadata_manager.clone(), ); let dist_instance = DistInstance::new( meta_client.clone(), Arc::new(catalog_manager.clone()), datanode_clients.clone(), + table_metadata_manager.clone(), ); let dist_instance = Arc::new(dist_instance); diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index d91776710f..a6b62d54ad 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -34,6 +34,7 @@ use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::prelude::BoxedError; +use common_meta::key::TableMetadataManagerRef; use common_meta::peer::Peer; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition as MetaPartition, RouteRequest}; @@ -84,6 +85,7 @@ pub struct DistInstance { meta_client: Arc, catalog_manager: Arc, datanode_clients: Arc, + table_metadata_manager: TableMetadataManagerRef, } impl DistInstance { @@ -91,11 +93,13 @@ 
impl DistInstance { meta_client: Arc, catalog_manager: Arc, datanode_clients: Arc, + table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { meta_client, catalog_manager, datanode_clients, + table_metadata_manager, } } @@ -131,6 +135,7 @@ impl DistInstance { table_name.clone(), table_info, self.catalog_manager.clone(), + self.table_metadata_manager.clone(), )); let request = RegisterTableRequest { diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs index e59887b511..f41b9c9fc9 100644 --- a/src/frontend/src/instance/distributed/inserter.rs +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -181,6 +181,7 @@ mod tests { }; use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackend, KvBackendRef}; use common_meta::rpc::store::PutRequest; @@ -262,6 +263,7 @@ mod tests { async fn test_split_inserts() { let backend = prepare_mocked_backend().await; + let table_metadata_manager = Arc::new(TableMetadataManager::new(backend.clone())); let table_name = "one_column_partitioning_table"; create_testing_table(&backend, table_name).await; @@ -270,6 +272,7 @@ mod tests { Arc::new(MockKvCacheInvalidator::default()), create_partition_rule_manager().await, Arc::new(DatanodeClients::default()), + table_metadata_manager, )); let inserter = DistInserter::new( diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index b1fa833dca..5233910eb0 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use catalog::helper::{TableGlobalKey, TableGlobalValue}; use client::Database; use common_error::prelude::BoxedError; -use common_meta::key::TableRouteKey; +use common_meta::key::{TableMetadataManagerRef, TableRouteKey}; use common_meta::rpc::store::{MoveValueRequest, PutRequest}; use common_meta::table_name::TableName; use common_query::error::Result as QueryResult; @@ -72,6 +72,8 @@ pub struct DistTable { table_name: TableName, table_info: TableInfoRef, catalog_manager: Arc, + #[allow(unused)] + table_metadata_manager: TableMetadataManagerRef, } #[async_trait] @@ -242,11 +244,13 @@ impl DistTable { table_name: TableName, table_info: TableInfoRef, catalog_manager: Arc, + table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { table_name, table_info, catalog_manager, + table_metadata_manager, } } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index c04031589a..2768336cff 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -537,16 +537,16 @@ mod tests { assert!(matches!(res.err(), Some(error::Error::NotStarted { .. 
}))); } - fn new_table_info() -> RawTableInfo { + fn new_table_info(table_name: &TableName) -> RawTableInfo { RawTableInfo { ident: TableIdent { table_id: 0, version: 0, }, - name: "t".to_string(), + name: table_name.table_name.clone(), desc: None, - catalog_name: "c".to_string(), - schema_name: "s".to_string(), + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), meta: RawTableMeta { schema: RawSchema { column_schemas: vec![ColumnSchema::new( @@ -579,8 +579,9 @@ mod tests { .build(); meta_client.start(urls).await.unwrap(); - let table_info = new_table_info(); - let req = CreateRequest::new(TableName::new("c", "s", "t"), &table_info); + let table_name = TableName::new("c", "s", "t"); + let table_info = new_table_info(&table_name); + let req = CreateRequest::new(table_name, &table_info); let res = meta_client.create_route(req).await; assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } @@ -674,7 +675,7 @@ mod tests { value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()], }; let table_name = TableName::new("test_catalog", "test_schema", "test_table"); - let table_info = new_table_info(); + let table_info = new_table_info(&table_name); let req = CreateRequest::new(table_name.clone(), &table_info) .add_partition(p1) .add_partition(p2); diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 87e692e364..ef1289d5b0 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -29,13 +29,13 @@ etcd-client.workspace = true futures.workspace = true h2 = "0.3" http-body = "0.4" -lazy_static = "1.4" +lazy_static.workspace = true metrics.workspace = true once_cell = "1.17" parking_lot = "0.12" prost.workspace = true rand.workspace = true -regex = "1.6" +regex.workspace = true serde = "1.0" serde_json = "1.0" snafu.workspace = true diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs index e7e07b66ba..eeb8b89648 100644 --- a/src/meta-srv/src/ddl.rs +++ b/src/meta-srv/src/ddl.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use client::client_manager::DatanodeClients; +use common_meta::key::TableMetadataManagerRef; use common_meta::rpc::ddl::{CreateTableTask, DropTableTask}; use common_meta::rpc::router::TableRoute; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; @@ -34,6 +35,7 @@ pub struct DdlManager { datanode_clients: Arc, mailbox: MailboxRef, server_addr: String, + table_metadata_manager: TableMetadataManagerRef, } #[derive(Clone)] @@ -42,6 +44,8 @@ pub(crate) struct DdlContext { pub(crate) datanode_clients: Arc, pub(crate) mailbox: MailboxRef, pub(crate) server_addr: String, + #[allow(unused)] + pub(crate) table_metadata_manager: TableMetadataManagerRef, } impl DdlManager { @@ -51,6 +55,7 @@ impl DdlManager { datanode_clients: Arc, mailbox: MailboxRef, server_addr: String, + table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { procedure_manager, @@ -58,6 +63,7 @@ impl DdlManager { datanode_clients, mailbox, server_addr, + table_metadata_manager, } } @@ -67,6 +73,7 @@ impl DdlManager { datanode_clients: self.datanode_clients.clone(), mailbox: self.mailbox.clone(), server_addr: self.server_addr.clone(), + table_metadata_manager: self.table_metadata_manager.clone(), } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 98c3b52c9e..008e6a023f 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -422,6 +422,12 @@ pub enum Error { source: BoxedError, location: Location, }, + + #[snafu(display("Table metadata manager error: {}", 
source))] + TableMetadataManager { + source: common_meta::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -512,6 +518,7 @@ impl ErrorExt for Error { Error::TableRouteConversion { source, .. } | Error::ConvertProtoData { source, .. } + | Error::TableMetadataManager { source, .. } | Error::ConvertEtcdTxnObject { source, .. } => source.status_code(), Error::Other { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 28893f0397..db8d73362a 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -142,12 +142,15 @@ mod tests { use std::sync::atomic::AtomicBool; use std::sync::Arc; + use common_meta::key::TableMetadataManager; + use super::*; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pushers}; use crate::keys::StatKey; use crate::sequence::Sequence; use crate::service::store::cached_kv::LeaderCachedKvStore; + use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; #[tokio::test] @@ -168,13 +171,16 @@ mod tests { let ctx = Context { server_addr: "127.0.0.1:0000".to_string(), in_memory, - kv_store, + kv_store: kv_store.clone(), leader_cached_kv_store, meta_peer_client, mailbox, election: None, skip_all: Arc::new(AtomicBool::new(false)), is_infancy: false, + table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))), }; let handler = PersistStatsHandler::default(); diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index e7f86285ff..0d0997e2d8 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; use catalog::helper::TableGlobalKey; use common_meta::ident::TableIdent; +use common_meta::key::TableMetadataManagerRef; use common_meta::ClusterId; use store_api::storage::{RegionId, RegionNumber}; @@ -37,16 +38,20 @@ pub(crate) const REGION_LEASE_SECONDS: u64 = 20; pub(crate) struct RegionLeaseHandler { kv_store: KvStoreRef, region_failover_manager: Option>, + #[allow(unused)] + table_metadata_manager: TableMetadataManagerRef, } impl RegionLeaseHandler { pub(crate) fn new( kv_store: KvStoreRef, region_failover_manager: Option>, + table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { kv_store, region_failover_manager, + table_metadata_manager, } } @@ -149,10 +154,12 @@ impl HeartbeatHandler for RegionLeaseHandler { #[cfg(test)] mod test { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_meta::key::TableMetadataManager; use super::*; use crate::handler::node_stat::{RegionStat, Stat}; use crate::metasrv::builder::MetaSrvBuilder; + use crate::service::store::kv::KvBackendAdapter; use crate::test_util; #[tokio::test] @@ -165,6 +172,9 @@ mod test { .clone(); let table_name = "my_table"; + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))); let _ = table_routes::tests::prepare_table_global_value(&kv_store, table_name).await; let table_ident = TableIdent { @@ -184,7 +194,11 @@ mod test { region_number: 1, }); - let handler = RegionLeaseHandler::new(kv_store, Some(region_failover_manager)); + let handler = RegionLeaseHandler::new( + kv_store, + 
Some(region_failover_manager), + table_metadata_manager, + ); let req = HeartbeatRequest { duration_since_epoch: 1234, diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 1210e6436c..25e3ff5bd6 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -51,12 +51,14 @@ mod tests { use std::sync::Arc; use api::v1::meta::{HeartbeatResponse, RequestHeader}; + use common_meta::key::TableMetadataManager; use super::*; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{Context, HeartbeatMailbox, Pushers}; use crate::sequence::Sequence; use crate::service::store::cached_kv::LeaderCachedKvStore; + use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; #[tokio::test] @@ -77,13 +79,16 @@ mod tests { let mut ctx = Context { server_addr: "127.0.0.1:0000".to_string(), in_memory, - kv_store, + kv_store: kv_store.clone(), leader_cached_kv_store, meta_peer_client, mailbox, election: None, skip_all: Arc::new(AtomicBool::new(false)), is_infancy: false, + table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))), }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 76ce1e36f6..0811525c50 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use api::v1::meta::Peer; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_meta::key::TableMetadataManagerRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; @@ -90,6 +91,7 @@ pub struct Context { pub election: Option, pub skip_all: Arc, pub is_infancy: bool, + pub table_metadata_manager: TableMetadataManagerRef, } impl Context { @@ -145,6 +147,7 @@ pub struct MetaSrv { metadata_service: MetadataServiceRef, mailbox: MailboxRef, ddl_manager: DdlManagerRef, + table_metadata_manager: TableMetadataManagerRef, } impl MetaSrv { @@ -294,6 +297,10 @@ impl MetaSrv { &self.procedure_manager } + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + #[inline] pub fn new_ctx(&self) -> Context { let server_addr = self.options().server_addr.clone(); @@ -314,6 +321,7 @@ impl MetaSrv { election, skip_all, is_infancy: false, + table_metadata_manager: self.table_metadata_manager.clone(), } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index df20c11f59..c7991780b5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -16,6 +16,7 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use client::client_manager::DatanodeClients; +use common_meta::key::TableMetadataManager; use common_procedure::local::{LocalManager, ManagerConfig}; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; @@ -39,7 +40,7 @@ use crate::procedure::state_store::MetaStateStore; use crate::selector::lease_based::LeaseBasedSelector; use crate::sequence::Sequence; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; -use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; +use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; use crate::service::store::memory::MemStore; // TODO(fys): try use derive_builder macro @@ -173,6 +174,10 @@ impl 
MetaSrvBuilder { .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); + let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( + kv_store.clone(), + ))); + // TODO(weny): considers to modify the default config of procedure manager let ddl_manager = Arc::new(DdlManager::new( procedure_manager.clone(), @@ -180,6 +185,7 @@ impl MetaSrvBuilder { datanode_clients.unwrap_or_else(|| Arc::new(DatanodeClients::default())), mailbox.clone(), options.server_addr.clone(), + table_metadata_manager.clone(), )); let _ = ddl_manager.try_start(); @@ -204,6 +210,7 @@ impl MetaSrvBuilder { table: None, }, lock.clone(), + table_metadata_manager.clone(), )); Some( @@ -217,6 +224,7 @@ impl MetaSrvBuilder { region_failover_handler .as_ref() .map(|x| x.region_failover_manager().clone()), + table_metadata_manager.clone(), ); let group = HeartbeatHandlerGroup::new(pushers); @@ -254,6 +262,7 @@ impl MetaSrvBuilder { metadata_service, mailbox, ddl_manager, + table_metadata_manager, }) } } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 167dfe170b..175dfc6a76 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -27,6 +27,7 @@ use std::time::Duration; use async_trait::async_trait; use catalog::helper::TableGlobalKey; use common_meta::ident::TableIdent; +use common_meta::key::TableMetadataManagerRef; use common_meta::{ClusterId, RegionIdent}; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -74,6 +75,7 @@ pub(crate) struct RegionFailoverManager { selector_ctx: SelectorContext, dist_lock: DistLockRef, running_procedures: Arc>>, + table_metadata_manager: TableMetadataManagerRef, } struct FailoverProcedureGuard { @@ -94,6 +96,7 @@ impl RegionFailoverManager { selector: SelectorRef, selector_ctx: SelectorContext, dist_lock: DistLockRef, + table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { mailbox, @@ -102,6 +105,7 @@ impl RegionFailoverManager { selector_ctx, dist_lock, running_procedures: Arc::new(RwLock::new(HashSet::new())), + table_metadata_manager, } } @@ -111,6 +115,7 @@ impl RegionFailoverManager { selector: self.selector.clone(), selector_ctx: self.selector_ctx.clone(), dist_lock: self.dist_lock.clone(), + table_metadata_manager: self.table_metadata_manager.clone(), } } @@ -228,6 +233,7 @@ pub struct RegionFailoverContext { pub selector: SelectorRef, pub selector_ctx: SelectorContext, pub dist_lock: DistLockRef, + pub table_metadata_manager: TableMetadataManagerRef, } /// The state machine of region failover procedure. Driven by the call to `next`. 
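// Illustrative sketch, not part of the patch: the MetaSrvBuilder and RegionFailoverManager
// changes in the surrounding hunks share one wiring pattern — wrap the meta-srv KvStoreRef into
// a KvBackendRef once via KvBackendAdapter::wrap, build a single TableMetadataManager on top of
// it, then clone that Arc into each consumer (DdlManager, RegionLeaseHandler,
// RegionFailoverManager, the heartbeat Context). A minimal stand-alone rendering of that
// pattern with placeholder traits, not the real meta-srv types:

use std::sync::Arc;

trait KvStore: Send + Sync {}
trait KvBackend: Send + Sync {}

/// Adapter owning the store and exposing it through the backend trait.
struct KvBackendAdapterSketch(Arc<dyn KvStore>);
impl KvBackend for KvBackendAdapterSketch {}

impl KvBackendAdapterSketch {
    fn wrap(store: Arc<dyn KvStore>) -> Arc<dyn KvBackend> {
        Arc::new(Self(store))
    }
}

/// Stand-in for TableMetadataManager, built once from the adapted backend.
struct TableMetadataManagerSketch {
    _backend: Arc<dyn KvBackend>,
}

struct MemStoreSketch;
impl KvStore for MemStoreSketch {}

fn main() {
    let kv_store: Arc<dyn KvStore> = Arc::new(MemStoreSketch);
    // Wrap once...
    let table_metadata_manager = Arc::new(TableMetadataManagerSketch {
        _backend: KvBackendAdapterSketch::wrap(kv_store.clone()),
    });
    // ...then hand a clone of the Arc to each component that needs table metadata.
    let _for_ddl_manager = table_metadata_manager.clone();
    let _for_region_failover = table_metadata_manager.clone();
}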
@@ -374,6 +380,7 @@ mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; + use common_meta::key::TableMetadataManager; use common_meta::DatanodeId; use common_procedure::BoxedProcedure; use rand::prelude::SliceRandom; @@ -386,6 +393,7 @@ mod tests { use crate::selector::{Namespace, Selector}; use crate::sequence::Sequence; use crate::service::mailbox::Channel; + use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; use crate::service::store::memory::MemStore; use crate::table_routes; @@ -468,7 +476,7 @@ mod tests { } pub async fn build(self) -> TestingEnv { - let kv_store = Arc::new(MemStore::new()) as _; + let kv_store: KvStoreRef = Arc::new(MemStore::new()); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) .in_memory(Arc::new(MemStore::new())) @@ -478,6 +486,9 @@ mod tests { .unwrap(); let table = "my_table"; + let table_metadata_manager = Arc::new(TableMetadataManager::new( + KvBackendAdapter::wrap(kv_store.clone()), + )); let (_, table_global_value) = table_routes::tests::prepare_table_global_value(&kv_store, table).await; @@ -524,6 +535,7 @@ mod tests { selector, selector_ctx, dist_lock: Arc::new(MemLock::default()), + table_metadata_manager, }, pushers, heartbeat_receivers, diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index c9efebb798..3859bb6b71 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -65,6 +65,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { "/tables", meta::TablesHandler { kv_store: meta_srv.kv_store(), + table_metadata_manager: meta_srv.table_metadata_manager().clone(), }, ); @@ -72,6 +73,7 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { "/table", meta::TableHandler { kv_store: meta_srv.kv_store(), + table_metadata_manager: meta_srv.table_metadata_manager().clone(), }, ); diff --git a/src/meta-srv/src/service/admin/meta.rs b/src/meta-srv/src/service/admin/meta.rs index 6bcb6c97d5..3a8c5c9e7c 100644 --- a/src/meta-srv/src/service/admin/meta.rs +++ b/src/meta-srv/src/service/admin/meta.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use catalog::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, TABLE_GLOBAL_KEY_PREFIX, }; +use common_meta::key::TableMetadataManagerRef; use common_meta::rpc::store::{RangeRequest, RangeResponse}; use common_meta::util; use snafu::{OptionExt, ResultExt}; @@ -37,10 +38,12 @@ pub struct SchemasHandler { pub struct TablesHandler { pub kv_store: KvStoreRef, + pub table_metadata_manager: TableMetadataManagerRef, } pub struct TableHandler { pub kv_store: KvStoreRef, + pub table_metadata_manager: TableMetadataManagerRef, } #[async_trait::async_trait] diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index febbf3a7fa..28b8485740 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -12,9 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::any::Any;
 use std::sync::Arc;
 
-use common_meta::kv_backend::KvBackend;
+use async_trait::async_trait;
+use common_error::ext::BoxedError;
+use common_meta::error::MetaSrvSnafu;
+use common_meta::kv_backend::txn::{Txn, TxnResponse};
+use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
+use common_meta::rpc::store::{
+    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
+    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
+    DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
+    RangeRequest, RangeResponse,
+};
+use snafu::ResultExt;
 
 use crate::error::Error;
 
@@ -24,3 +36,109 @@ pub type ResettableKvStoreRef = Arc<dyn ResettableKvStore>;
 pub trait ResettableKvStore: KvBackend {
     fn reset(&self);
 }
+
+/// An adaptor to bridge [KvStoreRef] and [KvBackendRef].
+pub struct KvBackendAdapter(KvStoreRef);
+
+impl KvBackendAdapter {
+    pub fn wrap(kv_store: KvStoreRef) -> KvBackendRef {
+        Arc::new(Self(kv_store))
+    }
+}
+
+#[async_trait]
+impl TxnService for KvBackendAdapter {
+    type Error = common_meta::error::Error;
+
+    async fn txn(&self, txn: Txn) -> Result<TxnResponse, Self::Error> {
+        self.0
+            .txn(txn)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+}
+
+#[async_trait]
+impl KvBackend for KvBackendAdapter {
+    fn name(&self) -> &str {
+        self.0.name()
+    }
+
+    async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
+        self.0
+            .range(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+
+    async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
+        self.0
+            .put(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+
+    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
+        self.0
+            .batch_put(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+
+    async fn compare_and_put(
+        &self,
+        req: CompareAndPutRequest,
+    ) -> Result<CompareAndPutResponse, Self::Error> {
+        self.0
+            .compare_and_put(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+
+    async fn delete_range(
+        &self,
+        req: DeleteRangeRequest,
+    ) -> Result<DeleteRangeResponse, Self::Error> {
+        self.0
+            .delete_range(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+
+    async fn batch_delete(
+        &self,
+        req: BatchDeleteRequest,
+    ) -> Result<BatchDeleteResponse, Self::Error> {
+        self.0
+            .batch_delete(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+
+    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
+        self.0
+            .batch_get(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+
+    async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse, Self::Error> {
+        self.0
+            .move_value(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(MetaSrvSnafu)
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self.0.as_any()
+    }
+}
diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs
index fbb2433ae0..2af0b48023 100644
--- a/src/meta-srv/src/test_util.rs
+++ b/src/meta-srv/src/test_util.rs
@@ -14,6 +14,7 @@
 
 use std::sync::Arc;
 
+use common_meta::key::TableMetadataManager;
 use common_procedure::local::{LocalManager, ManagerConfig};
 
 use crate::cluster::MetaPeerClientBuilder;
@@ -24,6 +25,7 @@ use crate::procedure::region_failover::RegionFailoverManager;
 use crate::procedure::state_store::MetaStateStore;
 use crate::selector::lease_based::LeaseBasedSelector;
 use crate::sequence::Sequence;
+use crate::service::store::kv::KvBackendAdapter;
 use crate::service::store::memory::MemStore;
 
 pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
@@ -49,7 +51,7 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
     let selector_ctx = SelectorContext {
         datanode_lease_secs: 10,
         server_addr: "127.0.0.1:3002".to_string(),
-        kv_store,
+        kv_store: kv_store.clone(),
         meta_peer_client,
         catalog: None,
         schema: None,
@@ -62,5 +64,6 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
         selector,
         selector_ctx,
         Arc::new(MemLock::default()),
+        Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store))),
     ))
 }
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index 36f06ebc64..a330f21f0c 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -40,7 +40,7 @@ once_cell = "1.10"
 partition = { path = "../partition" }
 promql = { path = "../promql" }
 promql-parser = "0.1.1"
-regex = "1.6"
+regex.workspace = true
 serde.workspace = true
 serde_json = "1.0"
 session = { path = "../session" }
diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs
index abc16ce1d7..f202ae60c6 100644
--- a/src/script/src/manager.rs
+++ b/src/script/src/manager.rs
@@ -170,7 +170,6 @@ mod tests {
             .await
             .unwrap(),
         );
-        catalog_manager.start().await.unwrap();
 
         let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
         let query_engine = factory.query_engine();
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index e076940aa4..42a14d99a2 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -62,7 +62,7 @@ promql-parser = "0.1.1"
 prost.workspace = true
 query = { path = "../query" }
 rand.workspace = true
-regex = "1.6"
+regex.workspace = true
 rustls = "0.21"
 rustls-pemfile = "1.0"
 rust-embed = { version = "6.6", features = ["debug-embed"] }
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index 47814c45a7..fa5b9f5180 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -28,7 +28,7 @@ datafusion.workspace = true
 futures.workspace = true
 futures-util.workspace = true
 itertools.workspace = true
-lazy_static = "1.4"
+lazy_static.workspace = true
 metrics.workspace = true
 object-store = { path = "../object-store" }
 parquet = { workspace = true, features = ["async"] }
diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs
index 5abdab5ad1..f41c79ebb2 100644
--- a/src/table/src/metadata.rs
+++ b/src/table/src/metadata.rs
@@ -463,6 +463,12 @@ pub struct TableInfo {
 
 pub type TableInfoRef = Arc<TableInfo>;
 
+impl TableInfo {
+    pub fn table_id(&self) -> TableId {
+        self.ident.table_id
+    }
+}
+
 impl TableInfoBuilder {
     pub fn new<S: Into<String>>(name: S, meta: TableMeta) -> Self {
         Self {
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index 649dff7294..713926b790 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use api::v1::meta::Role;
-use catalog::remote::RemoteCatalogManager;
 use client::client_manager::DatanodeClients;
 use client::Client;
 use common_base::Plugins;
@@ -33,6 +32,7 @@ use frontend::frontend::FrontendOptions;
 use frontend::instance::{FrontendInstance, Instance as FeInstance};
 use meta_client::client::MetaClientBuilder;
 use meta_srv::cluster::MetaPeerClientRef;
+use meta_srv::metadata_service::{DefaultMetadataService, MetadataService};
 use meta_srv::metasrv::{MetaSrv, MetaSrvOptions};
 use meta_srv::mocks::MockInfo;
 use meta_srv::service::store::kv::KvStoreRef;
@@ -128,7 +128,16 @@ impl GreptimeDbClusterBuilder {
             ..Default::default()
         };
 
-        meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await
+        let mock =
+            meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await;
+
+        let metadata_service = DefaultMetadataService::new(mock.meta_srv.kv_store());
+        metadata_service
+            .create_schema("another_catalog", "another_schema", true)
+            .await
+            .unwrap();
+
+        mock
     }
 
     async fn build_datanodes(
@@ -208,15 +217,6 @@ impl GreptimeDbClusterBuilder {
         if let Some(heartbeat) = heartbeat.as_ref() {
             heartbeat.start().await.unwrap();
         }
-        // create another catalog and schema for testing
-        instance
-            .catalog_manager()
-            .as_any()
-            .downcast_ref::<RemoteCatalogManager>()
-            .unwrap()
-            .create_catalog_and_schema("another_catalog", "another_schema")
-            .await
-            .unwrap();
 
         (instance, heartbeat)
     }
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 535322ef70..5d56ddb472 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -369,6 +369,8 @@ pub async fn create_test_table(
 pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
     let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
     let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
+    instance.start().await.unwrap();
+
     create_test_table(
         instance.catalog_manager(),
         instance.sql_handler(),
@@ -380,7 +382,6 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
     let frontend_instance = FeInstance::try_new_standalone(instance.clone())
         .await
         .unwrap();
-    instance.start().await.unwrap();
     if let Some(heartbeat) = heartbeat {
         heartbeat.start().await.unwrap();
     }
diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs
index cb465afecf..26f0ae7a82 100644
--- a/tests-integration/src/tests.rs
+++ b/tests-integration/src/tests.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use catalog::RegisterSchemaRequest;
+use common_meta::key::TableMetadataManagerRef;
 use common_test_util::temp_dir::TempDir;
 use datanode::instance::Instance as DatanodeInstance;
 use frontend::instance::Instance;
@@ -49,6 +50,11 @@ impl MockDistributedInstance {
     pub fn datanodes(&self) -> &HashMap>> {
         &self.0.datanode_instances
     }
+
+    #[allow(unused)]
+    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
+        self.0.meta_srv.table_metadata_manager()
+    }
 }
 
 pub struct MockStandaloneInstance {
@@ -73,6 +79,8 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
         .await
         .unwrap();
 
+    dn_instance.start().await.unwrap();
+
     assert!(dn_instance
         .catalog_manager()
         .register_catalog("another_catalog".to_string())
@@ -88,7 +96,6 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
         .await
         .is_ok());
 
-    dn_instance.start().await.unwrap();
     if let Some(heartbeat) = heartbeat {
         heartbeat.start().await.unwrap();
     };
diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs
index f764c19e17..b9952d1263 100644
--- a/tests-integration/tests/region_failover.rs
+++ b/tests-integration/tests/region_failover.rs
@@ -344,6 +344,7 @@ async fn run_region_failover_procedure(
                 table: None,
             },
             dist_lock: meta_srv.lock().clone(),
+            table_metadata_manager: meta_srv.table_metadata_manager().clone(),
         },
     );
     let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
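
Reviewer note: the wiring introduced above repeats one pattern. A single TableMetadataManager is built on top of the meta-srv KV store through KvBackendAdapter, and clones of the resulting Arc are handed to the DDL manager, the region failover manager, the heartbeat handlers, and the admin handlers. The sketch below is an illustration inside the meta-srv crate that only uses constructors visible in this diff; the build_table_metadata_manager helper name is invented for the example and is not part of the patch.

use std::sync::Arc;

use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};

use crate::service::store::kv::{KvBackendAdapter, KvStoreRef};
use crate::service::store::memory::MemStore;

// Invented helper (illustration only): build the shared manager once and clone
// the Arc wherever a TableMetadataManagerRef parameter appears in the patch.
fn build_table_metadata_manager(kv_store: KvStoreRef) -> TableMetadataManagerRef {
    // KvBackendAdapter bridges the meta-srv KvStoreRef to the common-meta
    // KvBackendRef, so table metadata lives in the same store meta-srv already uses.
    Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store)))
}

fn example_wiring() {
    let kv_store: KvStoreRef = Arc::new(MemStore::new());
    let table_metadata_manager = build_table_metadata_manager(kv_store);
    // Clones of this value are what the hunks above pass to DdlManager::new,
    // RegionFailoverManager::new, the region lease handler, and the admin handlers.
    drop(table_metadata_manager);
}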
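
The KvBackendAdapter added in src/meta-srv/src/service/store/kv.rs is a plain wrap-and-delegate newtype: every call is forwarded to the inner KvStoreRef, and the store error is boxed and converted into common_meta::error::Error at the crate boundary, so common-meta never needs to know about meta-srv's own error type. A generic sketch of the same shape, with made-up Store and Backend traits rather than the real GreptimeDB ones, in case the pattern is easier to see without the domain types:

use std::sync::Arc;

// Made-up traits, standing in for the patch's KvStore and KvBackend.
trait Store {
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, String>;
}

trait Backend {
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, BackendError>;
}

// Stand-in for the boxed-and-contextualized error the real adapter produces.
#[derive(Debug)]
struct BackendError {
    source: String,
}

// Newtype adapter: owns the store and re-exposes it through the other trait.
struct Adapter(Arc<dyn Store + Send + Sync>);

impl Backend for Adapter {
    fn get(&self, key: &str) -> Result<Option<Vec<u8>>, BackendError> {
        // Delegate the call, then map the error type at the boundary, mirroring
        // `.map_err(BoxedError::new).context(MetaSrvSnafu)` in the hunks above.
        self.0.get(key).map_err(|source| BackendError { source })
    }
}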
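
The new TableInfo::table_id accessor in src/table/src/metadata.rs is only a shorthand for info.ident.table_id. A minimal usage sketch; the print_table_id helper is invented for illustration and not part of the patch:

use table::metadata::TableInfoRef;

// Invented helper, for illustration only.
fn print_table_id(info: &TableInfoRef) {
    // Same value as `info.ident.table_id`, without reaching through `ident`.
    println!("table id = {}", info.table_id());
}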