From 4038dd406700e14b08fbbe753b504f808151bce5 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 24 Nov 2022 11:10:33 +0800 Subject: [PATCH] fix: add concurrency control for catalog manager (#619) --- src/catalog/Cargo.toml | 2 +- src/catalog/src/error.rs | 2 +- src/catalog/src/lib.rs | 13 +++ src/catalog/src/local/manager.rs | 76 ++++++++----- src/catalog/src/local/memory.rs | 21 +++- src/catalog/src/remote/manager.rs | 4 +- src/catalog/tests/local_catalog_tests.rs | 132 +++++++++++++++++++++++ src/common/catalog/src/helper.rs | 4 + src/datanode/src/sql/create.rs | 3 +- src/frontend/src/catalog.rs | 3 +- src/frontend/src/error.rs | 4 + src/frontend/src/instance/distributed.rs | 25 ++++- src/meta-srv/src/service/router.rs | 3 +- src/table/src/table/numbers.rs | 16 ++- 14 files changed, 257 insertions(+), 51 deletions(-) create mode 100644 src/catalog/tests/local_catalog_tests.rs diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index ad52271a4e..dddf607078 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -42,6 +42,6 @@ log-store = { path = "../log-store" } object-store = { path = "../object-store" } opendal = "0.20" storage = { path = "../storage" } -mito = { path = "../mito" } +mito = { path = "../mito", features = ["test"] } tempdir = "0.3" tokio = { version = "1.0", features = ["full"] } diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index a5467f6f1a..05e6944cd5 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -94,7 +94,7 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Table {} already exists", table))] + #[snafu(display("Table `{}` already exists", table))] TableExists { table: String, backtrace: Backtrace }, #[snafu(display("Schema {} already exists", schema))] diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 9f4f5908ad..1490d6a67b 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -15,6 +15,7 @@ #![feature(assert_matches)] use std::any::Any; +use std::fmt::{Debug, Formatter}; use std::sync::Arc; use common_telemetry::info; @@ -128,6 +129,18 @@ pub struct RegisterTableRequest { pub table: TableRef, } +impl Debug for RegisterTableRequest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RegisterTableRequest") + .field("catalog", &self.catalog) + .field("schema", &self.schema) + .field("table_name", &self.table_name) + .field("table_id", &self.table_id) + .field("table", &self.table.table_info()) + .finish() + } +} + #[derive(Clone)] pub struct DeregisterTableRequest { pub catalog: String, diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 34f9d94488..d09411cbaa 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -21,7 +21,7 @@ use common_catalog::consts::{ SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME, }; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; -use common_telemetry::info; +use common_telemetry::{error, info}; use datatypes::prelude::ScalarVector; use datatypes::vectors::{BinaryVector, UInt8Vector}; use futures_util::lock::Mutex; @@ -57,6 +57,7 @@ pub struct LocalCatalogManager { engine: TableEngineRef, next_table_id: AtomicU32, init_lock: Mutex, + register_lock: Mutex<()>, system_table_requests: Mutex>, } @@ -76,6 +77,7 @@ impl LocalCatalogManager { engine, next_table_id: AtomicU32::new(MIN_USER_TABLE_ID), init_lock: Mutex::new(false), + register_lock: Mutex::new(()), system_table_requests: Mutex::new(Vec::default()), }) } @@ -332,24 +334,40 @@ impl CatalogManager for LocalCatalogManager { schema_info: format!("{}.{}", catalog_name, schema_name), })?; - if schema.table_exist(&request.table_name)? { - return TableExistsSnafu { - table: format_full_table_name(catalog_name, schema_name, &request.table_name), + { + let _lock = self.register_lock.lock().await; + if let Some(existing) = schema.table(&request.table_name)? { + if existing.table_info().ident.table_id != request.table_id { + error!( + "Unexpected table register request: {:?}, existing: {:?}", + request, + existing.table_info() + ); + return TableExistsSnafu { + table: format_full_table_name( + catalog_name, + schema_name, + &request.table_name, + ), + } + .fail(); + } + // Try to register table with same table id, just ignore. + Ok(false) + } else { + // table does not exist + self.system + .register_table( + catalog_name.clone(), + schema_name.clone(), + request.table_name.clone(), + request.table_id, + ) + .await?; + schema.register_table(request.table_name, request.table)?; + Ok(true) } - .fail(); } - - self.system - .register_table( - catalog_name.clone(), - schema_name.clone(), - request.table_name.clone(), - request.table_id, - ) - .await?; - - schema.register_table(request.table_name, request.table)?; - Ok(true) } async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result { @@ -374,17 +392,21 @@ impl CatalogManager for LocalCatalogManager { .catalogs .catalog(catalog_name)? .context(CatalogNotFoundSnafu { catalog_name })?; - if catalog.schema(schema_name)?.is_some() { - return SchemaExistsSnafu { - schema: schema_name, - } - .fail(); + + { + let _lock = self.register_lock.lock().await; + ensure!( + catalog.schema(schema_name)?.is_none(), + SchemaExistsSnafu { + schema: schema_name, + } + ); + self.system + .register_schema(request.catalog, schema_name.clone()) + .await?; + catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?; + Ok(true) } - self.system - .register_schema(request.catalog, schema_name.clone()) - .await?; - catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?; - Ok(true) } async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 8018b7ba44..a870c84aa3 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; use common_catalog::consts::MIN_USER_TABLE_ID; +use common_telemetry::error; use snafu::OptionExt; use table::metadata::TableId; use table::table::TableIdProvider; @@ -270,11 +271,21 @@ impl SchemaProvider for MemorySchemaProvider { } fn register_table(&self, name: String, table: TableRef) -> Result> { - if self.table_exist(name.as_str())? { - return TableExistsSnafu { table: name }.fail()?; - } let mut tables = self.tables.write().unwrap(); - Ok(tables.insert(name, table)) + if let Some(existing) = tables.get(name.as_str()) { + // if table with the same name but different table id exists, then it's a fatal bug + if existing.table_info().ident.table_id != table.table_info().ident.table_id { + error!( + "Unexpected table register: {:?}, existing: {:?}", + table.table_info(), + existing.table_info() + ); + return TableExistsSnafu { table: name }.fail()?; + } + Ok(Some(existing.clone())) + } else { + Ok(tables.insert(name, table)) + } } fn deregister_table(&self, name: &str) -> Result> { @@ -334,7 +345,7 @@ mod tests { .unwrap() .is_none()); assert!(provider.table_exist(table_name).unwrap()); - let other_table = NumbersTable::default(); + let other_table = NumbersTable::new(12); let result = provider.register_table(table_name.to_string(), Arc::new(other_table)); let err = result.err().unwrap(); assert!(err.backtrace_opt().is_some()); diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 19ffa8e192..ba7c09f6c0 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -154,8 +154,8 @@ impl RemoteCatalogManager { } let table_key = TableGlobalKey::parse(&String::from_utf8_lossy(&k)) .context(InvalidCatalogValueSnafu)?; - let table_value = TableGlobalValue::parse(&String::from_utf8_lossy(&v)) - .context(InvalidCatalogValueSnafu)?; + let table_value = + TableGlobalValue::from_bytes(&v).context(InvalidCatalogValueSnafu)?; info!( "Found catalog table entry, key: {}, value: {:?}", diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs new file mode 100644 index 0000000000..2e57754077 --- /dev/null +++ b/src/catalog/tests/local_catalog_tests.rs @@ -0,0 +1,132 @@ +// Copyright 2022 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use catalog::local::LocalCatalogManager; + use catalog::{CatalogManager, RegisterTableRequest}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_telemetry::{error, info}; + use mito::config::EngineConfig; + use table::table::numbers::NumbersTable; + use table::TableRef; + use tokio::sync::Mutex; + + async fn create_local_catalog_manager() -> Result { + let (_dir, object_store) = + mito::table::test_util::new_test_object_store("setup_mock_engine_and_table").await; + let mock_engine = Arc::new(mito::table::test_util::MockMitoEngine::new( + EngineConfig::default(), + mito::table::test_util::MockEngine::default(), + object_store, + )); + let catalog_manager = LocalCatalogManager::try_new(mock_engine).await.unwrap(); + catalog_manager.start().await?; + Ok(catalog_manager) + } + + #[tokio::test] + async fn test_duplicate_register() { + let catalog_manager = create_local_catalog_manager().await.unwrap(); + let request = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "test_table".to_string(), + table_id: 42, + table: Arc::new(NumbersTable::new(42)), + }; + assert!(catalog_manager + .register_table(request.clone()) + .await + .unwrap()); + + // register table with same table id will succeed with 0 as return val. + assert!(!catalog_manager.register_table(request).await.unwrap()); + + let err = catalog_manager + .register_table(RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "test_table".to_string(), + table_id: 43, + table: Arc::new(NumbersTable::new(43)), + }) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("Table `greptime.public.test_table` already exists"), + "Actual error message: {}", + err + ); + } + + #[test] + fn test_concurrent_register() { + common_telemetry::init_default_ut_logging(); + let rt = Arc::new(tokio::runtime::Builder::new_multi_thread().build().unwrap()); + let catalog_manager = + Arc::new(rt.block_on(async { create_local_catalog_manager().await.unwrap() })); + + let succeed: Arc>> = Arc::new(Mutex::new(None)); + + let mut handles = Vec::with_capacity(8); + for i in 0..8 { + let catalog = catalog_manager.clone(); + let succeed = succeed.clone(); + let handle = rt.spawn(async move { + let table_id = 42 + i; + let table = Arc::new(NumbersTable::new(table_id)); + let req = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "test_table".to_string(), + table_id, + table: table.clone(), + }; + match catalog.register_table(req).await { + Ok(res) => { + if res { + let mut succeed = succeed.lock().await; + info!("Successfully registered table: {}", table_id); + *succeed = Some(table); + } + } + Err(_) => { + error!("Failed to register table {}", table_id); + } + } + }); + handles.push(handle); + } + + rt.block_on(async move { + for handle in handles { + handle.await.unwrap(); + } + let guard = succeed.lock().await; + let table = guard.as_ref().unwrap(); + let table_registered = catalog_manager + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "test_table") + .unwrap() + .unwrap(); + assert_eq!( + table_registered.table_info().ident.table_id, + table.table_info().ident.table_id + ); + }); + } +} diff --git a/src/common/catalog/src/helper.rs b/src/common/catalog/src/helper.rs index ccfe362969..3b7f20d959 100644 --- a/src/common/catalog/src/helper.rs +++ b/src/common/catalog/src/helper.rs @@ -261,6 +261,10 @@ macro_rules! define_catalog_value { .context(DeserializeCatalogEntryValueSnafu { raw: s.as_ref() }) } + pub fn from_bytes(bytes: impl AsRef<[u8]>) -> Result { + Self::parse(&String::from_utf8_lossy(bytes.as_ref())) + } + pub fn as_bytes(&self) -> Result, Error> { Ok(serde_json::to_string(self) .context(SerializeCatalogEntryValueSnafu)? diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index c134832063..c82484ec88 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -84,7 +84,6 @@ impl SqlHandler { // determine catalog and schema from the very beginning let table_name = req.table_name.clone(); - let table_id = req.id; let table = self .table_engine .create_table(&ctx, req) @@ -97,7 +96,7 @@ impl SqlHandler { catalog: table.table_info().catalog_name.clone(), schema: table.table_info().schema_name.clone(), table_name: table_name.clone(), - table_id, + table_id: table.table_info().ident.table_id, table, }; diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index e2fcbd0cc2..86356db08c 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -278,8 +278,7 @@ impl SchemaProvider for FrontendSchemaProvider { } Some(r) => r, }; - let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1)) - .context(InvalidCatalogValueSnafu)?; + let val = TableGlobalValue::from_bytes(&res.1).context(InvalidCatalogValueSnafu)?; let table = Arc::new(DistTable::new( table_name, diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 2e56517db8..5be2009877 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -436,6 +436,9 @@ pub enum Error { #[snafu(display("Failed to find leaders when altering table, table: {}", table))] LeaderNotFound { table: String, backtrace: Backtrace }, + + #[snafu(display("Table already exists: `{}`", table))] + TableAlreadyExist { table: String, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -525,6 +528,7 @@ impl ErrorExt for Error { Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments, Error::AlterExprToRequest { source, .. } => source.status_code(), Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable, + Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists, } } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 0611f2bb4a..4098f040f2 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -23,7 +23,7 @@ use client::admin::{admin_result_to_output, Admin}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; use common_query::Output; -use common_telemetry::{debug, info}; +use common_telemetry::{debug, error, info}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::RawSchema; use meta_client::client::MetaClient; @@ -239,17 +239,32 @@ impl DistInstance { catalog_name: table_name.catalog_name.clone(), schema_name: table_name.schema_name.clone(), table_name: table_name.table_name.clone(), - }; + } + .to_string(); let value = create_table_global_value(create_table, table_route)? .as_bytes() .context(error::CatalogEntrySerdeSnafu)?; - self.catalog_manager + if let Err(existing) = self + .catalog_manager .backend() - .set(key.to_string().as_bytes(), &value) + .compare_and_set(key.as_bytes(), &[], &value) .await - .context(error::CatalogSnafu) + .context(CatalogSnafu)? + { + let existing_bytes = existing.unwrap(); //this unwrap is safe since we compare with empty bytes and failed + let existing_value = + TableGlobalValue::from_bytes(&existing_bytes).context(CatalogEntrySerdeSnafu)?; + if existing_value.table_info.ident.table_id != create_table.table_id.unwrap() { + error!( + "Table with name {} already exists, value in catalog: {:?}", + key, existing_bytes + ); + return error::TableAlreadyExistSnafu { table: key }.fail(); + } + } + Ok(()) } } diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 11226fca1a..ba924e61d2 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -216,8 +216,7 @@ async fn get_table_global_value( let tv = get_from_store(kv_store, tg_key).await?; match tv { Some(tv) => { - let tv = TableGlobalValue::parse(&String::from_utf8_lossy(&tv)) - .context(error::InvalidCatalogValueSnafu)?; + let tv = TableGlobalValue::from_bytes(&tv).context(error::InvalidCatalogValueSnafu)?; Ok(Some(tv)) } None => Ok(None), diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 26eab9a8bf..db33769c31 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -27,24 +27,26 @@ use futures::task::{Context, Poll}; use futures::Stream; use crate::error::Result; -use crate::metadata::{TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; +use crate::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType}; use crate::table::scan::SimpleTableScan; use crate::table::{Expr, Table}; /// numbers table for test #[derive(Debug, Clone)] pub struct NumbersTable { + table_id: TableId, schema: SchemaRef, } -impl Default for NumbersTable { - fn default() -> Self { +impl NumbersTable { + pub fn new(table_id: TableId) -> Self { let column_schemas = vec![ColumnSchema::new( "number", ConcreteDataType::uint32_datatype(), false, )]; Self { + table_id, schema: Arc::new( SchemaBuilder::try_from_columns(column_schemas) .unwrap() @@ -55,6 +57,12 @@ impl Default for NumbersTable { } } +impl Default for NumbersTable { + fn default() -> Self { + NumbersTable::new(1) + } +} + #[async_trait::async_trait] impl Table for NumbersTable { fn as_any(&self) -> &dyn Any { @@ -68,7 +76,7 @@ impl Table for NumbersTable { fn table_info(&self) -> TableInfoRef { Arc::new( TableInfoBuilder::default() - .table_id(1) + .table_id(self.table_id) .name("numbers") .catalog_name("greptime") .schema_name("public")