fix: add concurrency control for catalog manager (#619)

This commit is contained in:
Lei, HUANG
2022-11-24 11:10:33 +08:00
committed by GitHub
parent 8be0f05570
commit 4038dd4067
14 changed files with 257 additions and 51 deletions

View File

@@ -42,6 +42,6 @@ log-store = { path = "../log-store" }
object-store = { path = "../object-store" }
opendal = "0.20"
storage = { path = "../storage" }
mito = { path = "../mito" }
mito = { path = "../mito", features = ["test"] }
tempdir = "0.3"
tokio = { version = "1.0", features = ["full"] }

View File

@@ -94,7 +94,7 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Table {} already exists", table))]
#[snafu(display("Table `{}` already exists", table))]
TableExists { table: String, backtrace: Backtrace },
#[snafu(display("Schema {} already exists", schema))]

View File

@@ -15,6 +15,7 @@
#![feature(assert_matches)]
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use common_telemetry::info;
@@ -128,6 +129,18 @@ pub struct RegisterTableRequest {
pub table: TableRef,
}
impl Debug for RegisterTableRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RegisterTableRequest")
.field("catalog", &self.catalog)
.field("schema", &self.schema)
.field("table_name", &self.table_name)
.field("table_id", &self.table_id)
.field("table", &self.table.table_info())
.finish()
}
}
#[derive(Clone)]
pub struct DeregisterTableRequest {
pub catalog: String,

View File

@@ -21,7 +21,7 @@ use common_catalog::consts::{
SYSTEM_CATALOG_NAME, SYSTEM_CATALOG_TABLE_NAME,
};
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::info;
use common_telemetry::{error, info};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::{BinaryVector, UInt8Vector};
use futures_util::lock::Mutex;
@@ -57,6 +57,7 @@ pub struct LocalCatalogManager {
engine: TableEngineRef,
next_table_id: AtomicU32,
init_lock: Mutex<bool>,
register_lock: Mutex<()>,
system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
}
@@ -76,6 +77,7 @@ impl LocalCatalogManager {
engine,
next_table_id: AtomicU32::new(MIN_USER_TABLE_ID),
init_lock: Mutex::new(false),
register_lock: Mutex::new(()),
system_table_requests: Mutex::new(Vec::default()),
})
}
@@ -332,24 +334,40 @@ impl CatalogManager for LocalCatalogManager {
schema_info: format!("{}.{}", catalog_name, schema_name),
})?;
if schema.table_exist(&request.table_name)? {
return TableExistsSnafu {
table: format_full_table_name(catalog_name, schema_name, &request.table_name),
{
let _lock = self.register_lock.lock().await;
if let Some(existing) = schema.table(&request.table_name)? {
if existing.table_info().ident.table_id != request.table_id {
error!(
"Unexpected table register request: {:?}, existing: {:?}",
request,
existing.table_info()
);
return TableExistsSnafu {
table: format_full_table_name(
catalog_name,
schema_name,
&request.table_name,
),
}
.fail();
}
// Try to register table with same table id, just ignore.
Ok(false)
} else {
// table does not exist
self.system
.register_table(
catalog_name.clone(),
schema_name.clone(),
request.table_name.clone(),
request.table_id,
)
.await?;
schema.register_table(request.table_name, request.table)?;
Ok(true)
}
.fail();
}
self.system
.register_table(
catalog_name.clone(),
schema_name.clone(),
request.table_name.clone(),
request.table_id,
)
.await?;
schema.register_table(request.table_name, request.table)?;
Ok(true)
}
async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
@@ -374,17 +392,21 @@ impl CatalogManager for LocalCatalogManager {
.catalogs
.catalog(catalog_name)?
.context(CatalogNotFoundSnafu { catalog_name })?;
if catalog.schema(schema_name)?.is_some() {
return SchemaExistsSnafu {
schema: schema_name,
}
.fail();
{
let _lock = self.register_lock.lock().await;
ensure!(
catalog.schema(schema_name)?.is_none(),
SchemaExistsSnafu {
schema: schema_name,
}
);
self.system
.register_schema(request.catalog, schema_name.clone())
.await?;
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
Ok(true)
}
self.system
.register_schema(request.catalog, schema_name.clone())
.await?;
catalog.register_schema(request.schema, Arc::new(MemorySchemaProvider::new()))?;
Ok(true)
}
async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {

View File

@@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_telemetry::error;
use snafu::OptionExt;
use table::metadata::TableId;
use table::table::TableIdProvider;
@@ -270,11 +271,21 @@ impl SchemaProvider for MemorySchemaProvider {
}
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
if self.table_exist(name.as_str())? {
return TableExistsSnafu { table: name }.fail()?;
}
let mut tables = self.tables.write().unwrap();
Ok(tables.insert(name, table))
if let Some(existing) = tables.get(name.as_str()) {
// if table with the same name but different table id exists, then it's a fatal bug
if existing.table_info().ident.table_id != table.table_info().ident.table_id {
error!(
"Unexpected table register: {:?}, existing: {:?}",
table.table_info(),
existing.table_info()
);
return TableExistsSnafu { table: name }.fail()?;
}
Ok(Some(existing.clone()))
} else {
Ok(tables.insert(name, table))
}
}
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
@@ -334,7 +345,7 @@ mod tests {
.unwrap()
.is_none());
assert!(provider.table_exist(table_name).unwrap());
let other_table = NumbersTable::default();
let other_table = NumbersTable::new(12);
let result = provider.register_table(table_name.to_string(), Arc::new(other_table));
let err = result.err().unwrap();
assert!(err.backtrace_opt().is_some());

View File

@@ -154,8 +154,8 @@ impl RemoteCatalogManager {
}
let table_key = TableGlobalKey::parse(&String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
let table_value = TableGlobalValue::parse(&String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
let table_value =
TableGlobalValue::from_bytes(&v).context(InvalidCatalogValueSnafu)?;
info!(
"Found catalog table entry, key: {}, value: {:?}",

View File

@@ -0,0 +1,132 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use catalog::local::LocalCatalogManager;
use catalog::{CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::{error, info};
use mito::config::EngineConfig;
use table::table::numbers::NumbersTable;
use table::TableRef;
use tokio::sync::Mutex;
async fn create_local_catalog_manager() -> Result<LocalCatalogManager, catalog::error::Error> {
let (_dir, object_store) =
mito::table::test_util::new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(mito::table::test_util::MockMitoEngine::new(
EngineConfig::default(),
mito::table::test_util::MockEngine::default(),
object_store,
));
let catalog_manager = LocalCatalogManager::try_new(mock_engine).await.unwrap();
catalog_manager.start().await?;
Ok(catalog_manager)
}
#[tokio::test]
async fn test_duplicate_register() {
let catalog_manager = create_local_catalog_manager().await.unwrap();
let request = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "test_table".to_string(),
table_id: 42,
table: Arc::new(NumbersTable::new(42)),
};
assert!(catalog_manager
.register_table(request.clone())
.await
.unwrap());
// register table with same table id will succeed with 0 as return val.
assert!(!catalog_manager.register_table(request).await.unwrap());
let err = catalog_manager
.register_table(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "test_table".to_string(),
table_id: 43,
table: Arc::new(NumbersTable::new(43)),
})
.await
.unwrap_err();
assert!(
err.to_string()
.contains("Table `greptime.public.test_table` already exists"),
"Actual error message: {}",
err
);
}
#[test]
fn test_concurrent_register() {
common_telemetry::init_default_ut_logging();
let rt = Arc::new(tokio::runtime::Builder::new_multi_thread().build().unwrap());
let catalog_manager =
Arc::new(rt.block_on(async { create_local_catalog_manager().await.unwrap() }));
let succeed: Arc<Mutex<Option<TableRef>>> = Arc::new(Mutex::new(None));
let mut handles = Vec::with_capacity(8);
for i in 0..8 {
let catalog = catalog_manager.clone();
let succeed = succeed.clone();
let handle = rt.spawn(async move {
let table_id = 42 + i;
let table = Arc::new(NumbersTable::new(table_id));
let req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "test_table".to_string(),
table_id,
table: table.clone(),
};
match catalog.register_table(req).await {
Ok(res) => {
if res {
let mut succeed = succeed.lock().await;
info!("Successfully registered table: {}", table_id);
*succeed = Some(table);
}
}
Err(_) => {
error!("Failed to register table {}", table_id);
}
}
});
handles.push(handle);
}
rt.block_on(async move {
for handle in handles {
handle.await.unwrap();
}
let guard = succeed.lock().await;
let table = guard.as_ref().unwrap();
let table_registered = catalog_manager
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "test_table")
.unwrap()
.unwrap();
assert_eq!(
table_registered.table_info().ident.table_id,
table.table_info().ident.table_id
);
});
}
}

View File

@@ -261,6 +261,10 @@ macro_rules! define_catalog_value {
.context(DeserializeCatalogEntryValueSnafu { raw: s.as_ref() })
}
pub fn from_bytes(bytes: impl AsRef<[u8]>) -> Result<Self, Error> {
Self::parse(&String::from_utf8_lossy(bytes.as_ref()))
}
pub fn as_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(serde_json::to_string(self)
.context(SerializeCatalogEntryValueSnafu)?

View File

@@ -84,7 +84,6 @@ impl SqlHandler {
// determine catalog and schema from the very beginning
let table_name = req.table_name.clone();
let table_id = req.id;
let table = self
.table_engine
.create_table(&ctx, req)
@@ -97,7 +96,7 @@ impl SqlHandler {
catalog: table.table_info().catalog_name.clone(),
schema: table.table_info().schema_name.clone(),
table_name: table_name.clone(),
table_id,
table_id: table.table_info().ident.table_id,
table,
};

View File

@@ -278,8 +278,7 @@ impl SchemaProvider for FrontendSchemaProvider {
}
Some(r) => r,
};
let val = TableGlobalValue::parse(String::from_utf8_lossy(&res.1))
.context(InvalidCatalogValueSnafu)?;
let val = TableGlobalValue::from_bytes(&res.1).context(InvalidCatalogValueSnafu)?;
let table = Arc::new(DistTable::new(
table_name,

View File

@@ -436,6 +436,9 @@ pub enum Error {
#[snafu(display("Failed to find leaders when altering table, table: {}", table))]
LeaderNotFound { table: String, backtrace: Backtrace },
#[snafu(display("Table already exists: `{}`", table))]
TableAlreadyExist { table: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -525,6 +528,7 @@ impl ErrorExt for Error {
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
Error::AlterExprToRequest { source, .. } => source.status_code(),
Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable,
Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
}
}

View File

@@ -23,7 +23,7 @@ use client::admin::{admin_result_to_output, Admin};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_telemetry::{debug, info};
use common_telemetry::{debug, error, info};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use meta_client::client::MetaClient;
@@ -239,17 +239,32 @@ impl DistInstance {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
}
.to_string();
let value = create_table_global_value(create_table, table_route)?
.as_bytes()
.context(error::CatalogEntrySerdeSnafu)?;
self.catalog_manager
if let Err(existing) = self
.catalog_manager
.backend()
.set(key.to_string().as_bytes(), &value)
.compare_and_set(key.as_bytes(), &[], &value)
.await
.context(error::CatalogSnafu)
.context(CatalogSnafu)?
{
let existing_bytes = existing.unwrap(); //this unwrap is safe since we compare with empty bytes and failed
let existing_value =
TableGlobalValue::from_bytes(&existing_bytes).context(CatalogEntrySerdeSnafu)?;
if existing_value.table_info.ident.table_id != create_table.table_id.unwrap() {
error!(
"Table with name {} already exists, value in catalog: {:?}",
key, existing_bytes
);
return error::TableAlreadyExistSnafu { table: key }.fail();
}
}
Ok(())
}
}

View File

@@ -216,8 +216,7 @@ async fn get_table_global_value(
let tv = get_from_store(kv_store, tg_key).await?;
match tv {
Some(tv) => {
let tv = TableGlobalValue::parse(&String::from_utf8_lossy(&tv))
.context(error::InvalidCatalogValueSnafu)?;
let tv = TableGlobalValue::from_bytes(&tv).context(error::InvalidCatalogValueSnafu)?;
Ok(Some(tv))
}
None => Ok(None),

View File

@@ -27,24 +27,26 @@ use futures::task::{Context, Poll};
use futures::Stream;
use crate::error::Result;
use crate::metadata::{TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType};
use crate::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType};
use crate::table::scan::SimpleTableScan;
use crate::table::{Expr, Table};
/// numbers table for test
#[derive(Debug, Clone)]
pub struct NumbersTable {
table_id: TableId,
schema: SchemaRef,
}
impl Default for NumbersTable {
fn default() -> Self {
impl NumbersTable {
pub fn new(table_id: TableId) -> Self {
let column_schemas = vec![ColumnSchema::new(
"number",
ConcreteDataType::uint32_datatype(),
false,
)];
Self {
table_id,
schema: Arc::new(
SchemaBuilder::try_from_columns(column_schemas)
.unwrap()
@@ -55,6 +57,12 @@ impl Default for NumbersTable {
}
}
impl Default for NumbersTable {
fn default() -> Self {
NumbersTable::new(1)
}
}
#[async_trait::async_trait]
impl Table for NumbersTable {
fn as_any(&self) -> &dyn Any {
@@ -68,7 +76,7 @@ impl Table for NumbersTable {
fn table_info(&self) -> TableInfoRef {
Arc::new(
TableInfoBuilder::default()
.table_id(1)
.table_id(self.table_id)
.name("numbers")
.catalog_name("greptime")
.schema_name("public")