refactor: parallelize open table (#1392)

* refactor: change open_table to parallel on datanode startup

* chore: try move out register schema table

* chore: change mito engine to key lock

* chore: minor change

* chore: minor change

* chore: update error definition

* chore: remove rwlock on tables

* chore: try parallel register table on schema provider

* chore: add rt log

* chore: add region open rt log

* chore: add actual open region rt log

* chore: add recover rt log

* chore: divide to three part rt log

* chore: remove debug log

* chore: add replay rt log

* chore: update cargo lock

* chore: remove debug log

* chore: revert unused change

* chore: update err msg

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* chore: fix cr issue

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* chore: fix cr issue

* chore: fix cr issue

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
This commit is contained in:
shuiyisong
2023-04-18 21:36:29 +08:00
committed by GitHub
parent de8b889701
commit 145f8eb5a7
9 changed files with 201 additions and 151 deletions

12
Cargo.lock generated
View File

@@ -1188,6 +1188,7 @@ dependencies = [
"datatypes",
"futures",
"futures-util",
"key-lock",
"lazy_static",
"log-store",
"meta-client",
@@ -4285,6 +4286,15 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "key-lock"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "792fd2ded02f283f704cda2c8127e256a012beb48e87c9f09bf459e3db051873"
dependencies = [
"tokio",
]
[[package]]
name = "lalrpop"
version = "0.19.9"
@@ -4917,10 +4927,12 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-time",
"dashmap",
"datafusion",
"datafusion-common",
"datatypes",
"futures",
"key-lock",
"log-store",
"object-store",
"serde",

View File

@@ -24,6 +24,7 @@ datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util.workspace = true
key-lock = "0.1"
lazy_static = "1.4"
meta-client = { path = "../meta-client" }
parking_lot = "0.12"

View File

@@ -20,6 +20,7 @@ use common_error::prelude::{Snafu, StatusCode};
use datafusion::error::DataFusionError;
use datatypes::prelude::ConcreteDataType;
use snafu::Location;
use tokio::task::JoinError;
use crate::DeregisterTableRequest;
@@ -127,6 +128,9 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Failed to open table in parallel, source: {}", source))]
ParallelOpenTable { source: JoinError },
#[snafu(display("Table not found while opening table, table info: {}", table_info))]
TableNotFound {
table_info: String,
@@ -261,7 +265,8 @@ impl ErrorExt for Error {
| Error::IllegalManagerState { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. }
| Error::InvalidSystemTableDef { .. } => StatusCode::Unexpected,
| Error::InvalidSystemTableDef { .. }
| Error::ParallelOpenTable { .. } => StatusCode::Unexpected,
Error::SystemCatalog { .. }
| Error::EmptyValue { .. }

View File

@@ -26,7 +26,8 @@ use common_catalog::consts::{
use common_telemetry::{debug, error, info};
use dashmap::DashMap;
use futures::Stream;
use futures_util::StreamExt;
use futures_util::{StreamExt, TryStreamExt};
use key_lock::KeyLock;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
@@ -37,8 +38,9 @@ use table::TableRef;
use tokio::sync::Mutex;
use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, Result,
SchemaNotFoundSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu,
ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu,
TableExistsSnafu, UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
@@ -254,15 +256,31 @@ impl RemoteCatalogManager {
) -> Result<()> {
info!("initializing tables in {}.{}", catalog_name, schema_name);
let mut table_num = 0;
let mut tables = self.iter_remote_tables(catalog_name, schema_name).await;
while let Some(r) = tables.next().await {
let (table_key, table_value) = r?;
let table_ref = self.open_or_create_table(&table_key, &table_value).await?;
schema.register_table(table_key.table_name.to_string(), table_ref)?;
info!("Registered table {}", &table_key.table_name);
max_table_id = max_table_id.max(table_value.table_id());
let tables = self.iter_remote_tables(catalog_name, schema_name).await;
let kvs = tables.try_collect::<Vec<_>>().await?;
let node_id = self.node_id;
let joins = kvs
.into_iter()
.map(|(table_key, table_value)| {
let engine_manager = self.engine_manager.clone();
common_runtime::spawn_bg(async move {
open_or_create_table(node_id, engine_manager, &table_key, &table_value).await
})
})
.collect::<Vec<_>>();
let vec = futures::future::join_all(joins).await;
for res in vec {
let table_ref = res.context(ParallelOpenTableSnafu)??;
let table_info = table_ref.table_info();
let table_name = &table_info.name;
let table_id = table_info.ident.table_id;
schema.register_table(table_name.clone(), table_ref)?;
info!("Registered table {}", table_name);
max_table_id = max_table_id.max(table_id);
table_num += 1;
}
info!(
"initialized tables in {}.{}, total: {}",
catalog_name, schema_name, table_num
@@ -310,87 +328,88 @@ impl RemoteCatalogManager {
// Close the quote around the catalog key so the log line is balanced.
info!("Created catalog '{catalog_key}'");
Ok(catalog_provider)
}
}
async fn open_or_create_table(
&self,
table_key: &TableGlobalKey,
table_value: &TableGlobalValue,
) -> Result<TableRef> {
let context = EngineContext {};
let TableGlobalKey {
catalog_name,
schema_name,
table_name,
..
} = table_key;
async fn open_or_create_table(
node_id: u64,
engine_manager: TableEngineManagerRef,
table_key: &TableGlobalKey,
table_value: &TableGlobalValue,
) -> Result<TableRef> {
let context = EngineContext {};
let TableGlobalKey {
catalog_name,
schema_name,
table_name,
..
} = table_key;
let table_id = table_value.table_id();
let table_id = table_value.table_id();
let TableGlobalValue {
table_info,
regions_id_map,
..
} = table_value;
let TableGlobalValue {
table_info,
regions_id_map,
..
} = table_value;
// unwrap safety: checked in yielding this table when `iter_remote_tables`
let region_numbers = regions_id_map.get(&self.node_id).unwrap();
// unwrap safety: checked in yielding this table when `iter_remote_tables`
let region_numbers = regions_id_map.get(&node_id).unwrap();
let request = OpenTableRequest {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
table_id,
};
let engine = self
.engine_manager
let request = OpenTableRequest {
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
table_id,
};
let engine =
engine_manager
.engine(&table_info.meta.engine)
.context(TableEngineNotFoundSnafu {
engine_name: &table_info.meta.engine,
})?;
match engine
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"),
})? {
Some(table) => {
info!(
"Table opened: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(table)
}
None => {
info!(
"Try create table: {}.{}.{}",
catalog_name, schema_name, table_name
);
match engine
.open_table(&context, request)
.await
.with_context(|_| OpenTableSnafu {
table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"),
})? {
Some(table) => {
info!(
"Table opened: {}.{}.{}",
catalog_name, schema_name, table_name
);
Ok(table)
}
None => {
info!(
"Try create table: {}.{}.{}",
catalog_name, schema_name, table_name
);
let meta = &table_info.meta;
let req = CreateTableRequest {
id: table_id,
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
desc: None,
schema: meta.schema.clone(),
region_numbers: region_numbers.clone(),
primary_key_indices: meta.primary_key_indices.clone(),
create_if_not_exists: true,
table_options: meta.options.clone(),
engine: engine.name().to_string(),
};
let meta = &table_info.meta;
let req = CreateTableRequest {
id: table_id,
catalog_name: catalog_name.clone(),
schema_name: schema_name.clone(),
table_name: table_name.clone(),
desc: None,
schema: meta.schema.clone(),
region_numbers: region_numbers.clone(),
primary_key_indices: meta.primary_key_indices.clone(),
create_if_not_exists: true,
table_options: meta.options.clone(),
engine: engine.name().to_string(),
};
engine
.create_table(&context, req)
.await
.context(CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id:{}",
&catalog_name, &schema_name, &table_name, table_id
),
})
}
engine
.create_table(&context, req)
.await
.context(CreateTableSnafu {
table_info: format!(
"{}.{}.{}, id:{}",
&catalog_name, &schema_name, &table_name, table_id
),
})
}
}
}
@@ -737,8 +756,8 @@ pub struct RemoteSchemaProvider {
schema_name: String,
node_id: u64,
backend: KvBackendRef,
tables: Arc<ArcSwap<HashMap<String, TableRef>>>,
mutex: Arc<Mutex<()>>,
tables: Arc<ArcSwap<DashMap<String, TableRef>>>,
mutex: Arc<KeyLock<String>>,
}
impl RemoteSchemaProvider {
@@ -775,11 +794,16 @@ impl SchemaProvider for RemoteSchemaProvider {
}
fn table_names(&self) -> Result<Vec<String>> {
Ok(self.tables.load().keys().cloned().collect::<Vec<_>>())
Ok(self
.tables
.load()
.iter()
.map(|en| en.key().clone())
.collect::<Vec<_>>())
}
async fn table(&self, name: &str) -> Result<Option<TableRef>> {
Ok(self.tables.load().get(name).cloned())
Ok(self.tables.load().get(name).map(|en| en.value().clone()))
}
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
@@ -796,7 +820,7 @@ impl SchemaProvider for RemoteSchemaProvider {
let prev = std::thread::spawn(move || {
common_runtime::block_on_read(async move {
let _guard = mutex.lock().await;
let _guard = mutex.lock(table_key.clone()).await;
backend
.set(
table_key.as_bytes(),
@@ -808,11 +832,8 @@ impl SchemaProvider for RemoteSchemaProvider {
table_key, table_value
);
let prev_tables = tables.load();
let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1);
new_tables.clone_from(&prev_tables);
let prev = new_tables.insert(name, table);
tables.store(Arc::new(new_tables));
let tables = tables.load();
let prev = tables.insert(name, table);
Ok(prev)
})
})
@@ -836,18 +857,15 @@ impl SchemaProvider for RemoteSchemaProvider {
let tables = self.tables.clone();
let prev = std::thread::spawn(move || {
common_runtime::block_on_read(async move {
let _guard = mutex.lock().await;
let _guard = mutex.lock(table_key.clone()).await;
backend.delete(table_key.as_bytes()).await?;
debug!(
"Successfully deleted catalog table entry, key: {}",
table_key
);
let prev_tables = tables.load();
let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1);
new_tables.clone_from(&prev_tables);
let prev = new_tables.remove(&table_name);
tables.store(Arc::new(new_tables));
let tables = tables.load();
let prev = tables.remove(&table_name).map(|en| en.1);
Ok(prev)
})
})

View File

@@ -21,10 +21,12 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
dashmap = "5.4"
datafusion.workspace = true
datafusion-common.workspace = true
datatypes = { path = "../datatypes" }
futures.workspace = true
key-lock = "0.1"
log-store = { path = "../log-store" }
object-store = { path = "../object-store" }
serde = { version = "1.0", features = ["derive"] }

View File

@@ -17,7 +17,7 @@ mod procedure;
mod tests;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use async_trait::async_trait;
pub use common_catalog::consts::MITO_ENGINE;
@@ -26,7 +26,9 @@ use common_error::ext::BoxedError;
use common_procedure::{BoxedProcedure, ProcedureManager};
use common_telemetry::tracing::log::info;
use common_telemetry::{debug, logging};
use dashmap::DashMap;
use datatypes::schema::Schema;
use key_lock::KeyLock;
use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{
@@ -38,14 +40,12 @@ use table::engine::{
region_id, region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure,
TableReference,
};
use table::error::TableOperationSnafu;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
};
use table::table::{AlterContext, TableRef};
use table::{error as table_error, Result as TableResult, Table};
use tokio::sync::Mutex;
use crate::config::EngineConfig;
use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable};
@@ -208,12 +208,12 @@ pub(crate) struct MitoEngineInner<S: StorageEngine> {
/// All tables opened by the engine. Map key is formatted [TableReference].
///
/// Writing to `tables` should also hold the `table_mutex`.
tables: RwLock<HashMap<String, Arc<MitoTable<S::Region>>>>,
tables: DashMap<String, Arc<MitoTable<S::Region>>>,
object_store: ObjectStore,
storage_engine: S,
/// Table mutex is used to protect the operations such as creating/opening/closing
/// a table, to avoid things like opening the same table simultaneously.
table_mutex: Mutex<()>,
table_mutex: Arc<KeyLock<String>>,
}
fn build_row_key_desc(
@@ -392,7 +392,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table_dir = table_dir(catalog_name, schema_name, table_id);
let mut regions = HashMap::with_capacity(request.region_numbers.len());
let _lock = self.table_mutex.lock().await;
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table(&table_ref) {
return if request.create_if_not_exists {
@@ -481,10 +481,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
table_id
);
self.tables
.write()
.unwrap()
.insert(table_ref.to_string(), table.clone());
// already locked
self.tables.insert(table_ref.to_string(), table.clone());
Ok(table)
}
@@ -510,7 +508,9 @@ impl<S: StorageEngine> MitoEngineInner<S> {
// Acquires the mutex before opening a new table.
let table = {
let _lock = self.table_mutex.lock().await;
let table_name_key = table_ref.to_string();
let _lock = self.table_mutex.lock(table_name_key.clone()).await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table(&table_ref) {
return Ok(Some(table));
@@ -523,7 +523,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let Some((manifest, table_info)) = self
.recover_table_manifest_and_info(table_name, &table_dir)
.await.map_err(BoxedError::new)
.context(TableOperationSnafu)? else { return Ok(None) };
.context(table_error::TableOperationSnafu)? else { return Ok(None) };
let opts = OpenOptions {
parent_dir: table_dir.to_string(),
@@ -564,10 +564,8 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table = Arc::new(MitoTable::new(table_info, regions, manifest));
self.tables
.write()
.unwrap()
.insert(table_ref.to_string(), table.clone());
// already locked
self.tables.insert(table_ref.to_string(), table.clone());
Some(table as _)
};
@@ -598,20 +596,15 @@ impl<S: StorageEngine> MitoEngineInner<S> {
fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
self.tables
.read()
.unwrap()
.get(&table_ref.to_string())
.cloned()
.map(|table| table as _)
.map(|en| en.value().clone() as _)
}
/// Returns the [MitoTable].
fn get_mito_table(&self, table_ref: &TableReference) -> Option<Arc<MitoTable<S::Region>>> {
self.tables
.read()
.unwrap()
.get(&table_ref.to_string())
.cloned()
.map(|en| en.value().clone())
}
async fn alter_table(&self, _ctx: &EngineContext, req: AlterTableRequest) -> Result<TableRef> {
@@ -650,10 +643,14 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.context(error::AlterTableSnafu { table_name })?;
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
let mut tables = self.tables.write().unwrap();
tables.remove(&table_ref.to_string());
let removed = {
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
self.tables.remove(&table_ref.to_string())
};
ensure!(removed.is_some(), error::TableNotFoundSnafu { table_name });
table_ref.table = new_table_name.as_str();
tables.insert(table_ref.to_string(), table.clone());
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
self.tables.insert(table_ref.to_string(), table.clone());
}
Ok(table)
}
@@ -666,35 +663,42 @@ impl<S: StorageEngine> MitoEngineInner<S> {
table: &req.table_name,
};
// todo(ruihang): reclaim persisted data
Ok(self
.tables
.write()
.unwrap()
.remove(&table_reference.to_string())
.is_some())
let _lock = self.table_mutex.lock(table_reference.to_string()).await;
Ok(self.tables.remove(&table_reference.to_string()).is_some())
}
async fn close(&self) -> TableResult<()> {
let _lock = self.table_mutex.lock().await;
let tables = self.tables.write().unwrap().clone();
futures::future::try_join_all(tables.values().map(|t| t.close()))
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
futures::future::try_join_all(
self.tables
.iter()
.map(|item| close_table(self.table_mutex.clone(), item.value().clone())),
)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(())
}
}
/// Closes `table` while holding the key lock entry for that table.
///
/// The lock key is the string form of the [TableReference] built from the
/// table info's catalog, schema and table name.
async fn close_table(lock: Arc<KeyLock<String>>, table: TableRef) -> TableResult<()> {
    let table_info = table.table_info();
    let lock_key = TableReference {
        catalog: &table_info.catalog_name,
        schema: &table_info.schema_name,
        table: &table_info.name,
    }
    .to_string();

    // Keep the guard alive until the close call has finished.
    let _guard = lock.lock(lock_key).await;
    table.close().await
}
impl<S: StorageEngine> MitoEngineInner<S> {
fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
Self {
tables: RwLock::new(HashMap::default()),
tables: DashMap::new(),
storage_engine,
object_store,
table_mutex: Mutex::new(()),
table_mutex: Arc::new(KeyLock::new()),
}
}
}

View File

@@ -265,10 +265,17 @@ impl<S: StorageEngine> AlterMitoTable<S> {
// Rename key in tables map.
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let mut table_ref = self.data.table_ref();
let mut tables = self.engine_inner.tables.write().unwrap();
tables.remove(&table_ref.to_string());
// `KeyLock::lock` returns a future; the guard only exists once it is awaited.
// Without `.await`, `_lock` is an unpolled future and the removal below runs
// without holding the per-table lock.
let removed = {
    let _lock = self
        .engine_inner
        .table_mutex
        .lock(table_ref.to_string())
        .await;
    self.engine_inner.tables.remove(&table_ref.to_string())
};
ensure!(removed.is_some(), TableNotFoundSnafu { table_name });
table_ref.table = new_table_name.as_str();
tables.insert(table_ref.to_string(), self.table.clone());
// Await the lock future so the insert under the new table name actually
// happens while the per-table lock is held; an un-awaited future locks nothing.
let _lock = self
    .engine_inner
    .table_mutex
    .lock(table_ref.to_string())
    .await;
self.engine_inner
    .tables
    .insert(table_ref.to_string(), self.table.clone());
}
Ok(Status::Done)

View File

@@ -258,10 +258,9 @@ impl<S: StorageEngine> CreateMitoTable<S> {
{
let table = Arc::new(MitoTable::new(table_info, self.regions.clone(), manifest));
// `KeyLock::lock` is async: await it so `_lock` is a real guard rather than
// an unpolled future that protects nothing.
let _lock = self
    .engine_inner
    .table_mutex
    .lock(table_ref.to_string())
    .await;
self.engine_inner
.tables
.write()
.unwrap()
.insert(table_ref.to_string(), table);
return Ok(Status::Done);
}
@@ -269,10 +268,10 @@ impl<S: StorageEngine> CreateMitoTable<S> {
// We need to persist the table manifest and create the table.
let table = self.write_manifest_and_create_table(&table_dir).await?;
let table = Arc::new(table);
// `KeyLock::lock` is async: await it so the table is inserted while the
// per-table lock is actually held (an un-awaited future never acquires it).
let _lock = self
    .engine_inner
    .table_mutex
    .lock(table_ref.to_string())
    .await;
self.engine_inner
.tables
.write()
.unwrap()
.insert(table_ref.to_string(), table);
Ok(Status::Done)

View File

@@ -139,11 +139,13 @@ impl<S: StorageEngine> DropMitoTable<S> {
async fn on_close_regions(&mut self) -> Result<Status> {
// Remove the table from the engine to avoid further access from users.
let table_ref = self.data.table_ref();
self.engine_inner
.tables
.write()
.unwrap()
.remove(&table_ref.to_string());
let _lock = self
.engine_inner
.table_mutex
.lock(table_ref.to_string())
.await;
self.engine_inner.tables.remove(&table_ref.to_string());
// Close the table to close all regions. Closing a region is idempotent.
self.table.close().await.map_err(Error::from_error_ext)?;