From 145f8eb5a7b9f6a415cdec7ca893bdd84ac9fe4f Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Tue, 18 Apr 2023 21:36:29 +0800 Subject: [PATCH] refactor: parallelize open table (#1392) * refactor: change open_table to parallel on datanode startup * chore: try move out register schema table * chore: change mito engine to key lock * chore: minor change * chore: minor change * chore: update error definition * chore: remove rwlock on tables * chore: try parallel register table on schema provider * chore: add rt log * chore: add region open rt log * chore: add actual open region rt log * chore: add recover rt log * chore: divide to three part rt log * chore: remove debug log * chore: add replay rt log * chore: update cargo lock * chore: remove debug log * chore: revert unused change * chore: update err msg Co-authored-by: dennis zhuang * chore: fix cr issue Co-authored-by: dennis zhuang * chore: fix cr issue * chore: fix cr issue --------- Co-authored-by: dennis zhuang --- Cargo.lock | 12 ++ src/catalog/Cargo.toml | 1 + src/catalog/src/error.rs | 7 +- src/catalog/src/remote/manager.rs | 210 +++++++++++++----------- src/mito/Cargo.toml | 2 + src/mito/src/engine.rs | 88 +++++----- src/mito/src/engine/procedure/alter.rs | 13 +- src/mito/src/engine/procedure/create.rs | 7 +- src/mito/src/engine/procedure/drop.rs | 12 +- 9 files changed, 201 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eccb2c2992..29fadeedb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1188,6 +1188,7 @@ dependencies = [ "datatypes", "futures", "futures-util", + "key-lock", "lazy_static", "log-store", "meta-client", @@ -4285,6 +4286,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "key-lock" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "792fd2ded02f283f704cda2c8127e256a012beb48e87c9f09bf459e3db051873" +dependencies = [ + "tokio", +] + [[package]] name = "lalrpop" version = "0.19.9" @@ -4917,10 +4927,12 @@ dependencies = [ "common-telemetry", "common-test-util", "common-time", + "dashmap", "datafusion", "datafusion-common", "datatypes", "futures", + "key-lock", "log-store", "object-store", "serde", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 2bc1000187..50d70788a2 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -24,6 +24,7 @@ datafusion.workspace = true datatypes = { path = "../datatypes" } futures = "0.3" futures-util.workspace = true +key-lock = "0.1" lazy_static = "1.4" meta-client = { path = "../meta-client" } parking_lot = "0.12" diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 36c1e9de39..6e65b57166 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -20,6 +20,7 @@ use common_error::prelude::{Snafu, StatusCode}; use datafusion::error::DataFusionError; use datatypes::prelude::ConcreteDataType; use snafu::Location; +use tokio::task::JoinError; use crate::DeregisterTableRequest; @@ -127,6 +128,9 @@ pub enum Error { source: table::error::Error, }, + #[snafu(display("Failed to open table in parallel, source: {}", source))] + ParallelOpenTable { source: JoinError }, + #[snafu(display("Table not found while opening table, table info: {}", table_info))] TableNotFound { table_info: String, @@ -261,7 +265,8 @@ impl ErrorExt for Error { | Error::IllegalManagerState { .. } | Error::CatalogNotFound { .. } | Error::InvalidEntryType { .. } - | Error::InvalidSystemTableDef { .. 
} => StatusCode::Unexpected, + | Error::InvalidSystemTableDef { .. } + | Error::ParallelOpenTable { .. } => StatusCode::Unexpected, Error::SystemCatalog { .. } | Error::EmptyValue { .. } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 85325a7136..291507c234 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -26,7 +26,8 @@ use common_catalog::consts::{ use common_telemetry::{debug, error, info}; use dashmap::DashMap; use futures::Stream; -use futures_util::StreamExt; +use futures_util::{StreamExt, TryStreamExt}; +use key_lock::KeyLock; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt}; use table::engine::manager::TableEngineManagerRef; @@ -37,8 +38,9 @@ use table::TableRef; use tokio::sync::Mutex; use crate::error::{ - CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, Result, - SchemaNotFoundSnafu, TableEngineNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu, + CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, OpenTableSnafu, + ParallelOpenTableSnafu, Result, SchemaNotFoundSnafu, TableEngineNotFoundSnafu, + TableExistsSnafu, UnimplementedSnafu, }; use crate::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, @@ -254,15 +256,31 @@ impl RemoteCatalogManager { ) -> Result<()> { info!("initializing tables in {}.{}", catalog_name, schema_name); let mut table_num = 0; - let mut tables = self.iter_remote_tables(catalog_name, schema_name).await; - while let Some(r) = tables.next().await { - let (table_key, table_value) = r?; - let table_ref = self.open_or_create_table(&table_key, &table_value).await?; - schema.register_table(table_key.table_name.to_string(), table_ref)?; - info!("Registered table {}", &table_key.table_name); - max_table_id = max_table_id.max(table_value.table_id()); + let tables = self.iter_remote_tables(catalog_name, schema_name).await; + + let kvs = tables.try_collect::>().await?; + let node_id = self.node_id; + let joins = kvs + .into_iter() + .map(|(table_key, table_value)| { + let engine_manager = self.engine_manager.clone(); + common_runtime::spawn_bg(async move { + open_or_create_table(node_id, engine_manager, &table_key, &table_value).await + }) + }) + .collect::>(); + let vec = futures::future::join_all(joins).await; + for res in vec { + let table_ref = res.context(ParallelOpenTableSnafu)??; + let table_info = table_ref.table_info(); + let table_name = &table_info.name; + let table_id = table_info.ident.table_id; + schema.register_table(table_name.clone(), table_ref)?; + info!("Registered table {}", table_name); + max_table_id = max_table_id.max(table_id); table_num += 1; } + info!( "initialized tables in {}.{}, total: {}", catalog_name, schema_name, table_num @@ -310,87 +328,88 @@ impl RemoteCatalogManager { info!("Created catalog '{catalog_key}"); Ok(catalog_provider) } +} - async fn open_or_create_table( - &self, - table_key: &TableGlobalKey, - table_value: &TableGlobalValue, - ) -> Result { - let context = EngineContext {}; - let TableGlobalKey { - catalog_name, - schema_name, - table_name, - .. - } = table_key; +async fn open_or_create_table( + node_id: u64, + engine_manager: TableEngineManagerRef, + table_key: &TableGlobalKey, + table_value: &TableGlobalValue, +) -> Result { + let context = EngineContext {}; + let TableGlobalKey { + catalog_name, + schema_name, + table_name, + .. 
+ } = table_key; - let table_id = table_value.table_id(); + let table_id = table_value.table_id(); - let TableGlobalValue { - table_info, - regions_id_map, - .. - } = table_value; + let TableGlobalValue { + table_info, + regions_id_map, + .. + } = table_value; - // unwrap safety: checked in yielding this table when `iter_remote_tables` - let region_numbers = regions_id_map.get(&self.node_id).unwrap(); + // unwrap safety: checked in yielding this table when `iter_remote_tables` + let region_numbers = regions_id_map.get(&node_id).unwrap(); - let request = OpenTableRequest { - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: table_name.clone(), - table_id, - }; - let engine = self - .engine_manager + let request = OpenTableRequest { + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + table_id, + }; + let engine = + engine_manager .engine(&table_info.meta.engine) .context(TableEngineNotFoundSnafu { engine_name: &table_info.meta.engine, })?; - match engine - .open_table(&context, request) - .await - .with_context(|_| OpenTableSnafu { - table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"), - })? { - Some(table) => { - info!( - "Table opened: {}.{}.{}", - catalog_name, schema_name, table_name - ); - Ok(table) - } - None => { - info!( - "Try create table: {}.{}.{}", - catalog_name, schema_name, table_name - ); + match engine + .open_table(&context, request) + .await + .with_context(|_| OpenTableSnafu { + table_info: format!("{catalog_name}.{schema_name}.{table_name}, id:{table_id}"), + })? { + Some(table) => { + info!( + "Table opened: {}.{}.{}", + catalog_name, schema_name, table_name + ); + Ok(table) + } + None => { + info!( + "Try create table: {}.{}.{}", + catalog_name, schema_name, table_name + ); - let meta = &table_info.meta; - let req = CreateTableRequest { - id: table_id, - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: table_name.clone(), - desc: None, - schema: meta.schema.clone(), - region_numbers: region_numbers.clone(), - primary_key_indices: meta.primary_key_indices.clone(), - create_if_not_exists: true, - table_options: meta.options.clone(), - engine: engine.name().to_string(), - }; + let meta = &table_info.meta; + let req = CreateTableRequest { + id: table_id, + catalog_name: catalog_name.clone(), + schema_name: schema_name.clone(), + table_name: table_name.clone(), + desc: None, + schema: meta.schema.clone(), + region_numbers: region_numbers.clone(), + primary_key_indices: meta.primary_key_indices.clone(), + create_if_not_exists: true, + table_options: meta.options.clone(), + engine: engine.name().to_string(), + }; - engine - .create_table(&context, req) - .await - .context(CreateTableSnafu { - table_info: format!( - "{}.{}.{}, id:{}", - &catalog_name, &schema_name, &table_name, table_id - ), - }) - } + engine + .create_table(&context, req) + .await + .context(CreateTableSnafu { + table_info: format!( + "{}.{}.{}, id:{}", + &catalog_name, &schema_name, &table_name, table_id + ), + }) } } } @@ -737,8 +756,8 @@ pub struct RemoteSchemaProvider { schema_name: String, node_id: u64, backend: KvBackendRef, - tables: Arc>>, - mutex: Arc>, + tables: Arc>>, + mutex: Arc>, } impl RemoteSchemaProvider { @@ -775,11 +794,16 @@ impl SchemaProvider for RemoteSchemaProvider { } fn table_names(&self) -> Result> { - Ok(self.tables.load().keys().cloned().collect::>()) + Ok(self + .tables + .load() + .iter() + .map(|en| en.key().clone()) + 
.collect::>()) } async fn table(&self, name: &str) -> Result> { - Ok(self.tables.load().get(name).cloned()) + Ok(self.tables.load().get(name).map(|en| en.value().clone())) } fn register_table(&self, name: String, table: TableRef) -> Result> { @@ -796,7 +820,7 @@ impl SchemaProvider for RemoteSchemaProvider { let prev = std::thread::spawn(move || { common_runtime::block_on_read(async move { - let _guard = mutex.lock().await; + let _guard = mutex.lock(table_key.clone()).await; backend .set( table_key.as_bytes(), @@ -808,11 +832,8 @@ impl SchemaProvider for RemoteSchemaProvider { table_key, table_value ); - let prev_tables = tables.load(); - let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1); - new_tables.clone_from(&prev_tables); - let prev = new_tables.insert(name, table); - tables.store(Arc::new(new_tables)); + let tables = tables.load(); + let prev = tables.insert(name, table); Ok(prev) }) }) @@ -836,18 +857,15 @@ impl SchemaProvider for RemoteSchemaProvider { let tables = self.tables.clone(); let prev = std::thread::spawn(move || { common_runtime::block_on_read(async move { - let _guard = mutex.lock().await; + let _guard = mutex.lock(table_key.clone()).await; backend.delete(table_key.as_bytes()).await?; debug!( "Successfully deleted catalog table entry, key: {}", table_key ); - let prev_tables = tables.load(); - let mut new_tables = HashMap::with_capacity(prev_tables.len() + 1); - new_tables.clone_from(&prev_tables); - let prev = new_tables.remove(&table_name); - tables.store(Arc::new(new_tables)); + let tables = tables.load(); + let prev = tables.remove(&table_name).map(|en| en.1); Ok(prev) }) }) diff --git a/src/mito/Cargo.toml b/src/mito/Cargo.toml index 29285b4385..6d40e2dce5 100644 --- a/src/mito/Cargo.toml +++ b/src/mito/Cargo.toml @@ -21,10 +21,12 @@ common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +dashmap = "5.4" datafusion.workspace = true datafusion-common.workspace = true datatypes = { path = "../datatypes" } futures.workspace = true +key-lock = "0.1" log-store = { path = "../log-store" } object-store = { path = "../object-store" } serde = { version = "1.0", features = ["derive"] } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 9084df5e7f..0fd90a32cb 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -17,7 +17,7 @@ mod procedure; mod tests; use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use async_trait::async_trait; pub use common_catalog::consts::MITO_ENGINE; @@ -26,7 +26,9 @@ use common_error::ext::BoxedError; use common_procedure::{BoxedProcedure, ProcedureManager}; use common_telemetry::tracing::log::info; use common_telemetry::{debug, logging}; +use dashmap::DashMap; use datatypes::schema::Schema; +use key_lock::KeyLock; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ @@ -38,14 +40,12 @@ use table::engine::{ region_id, region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference, }; -use table::error::TableOperationSnafu; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}; use table::requests::{ AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, }; use table::table::{AlterContext, TableRef}; use table::{error as table_error, Result as TableResult, Table}; -use tokio::sync::Mutex; use 
crate::config::EngineConfig; use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable}; @@ -208,12 +208,12 @@ pub(crate) struct MitoEngineInner { /// All tables opened by the engine. Map key is formatted [TableReference]. /// /// Writing to `tables` should also hold the `table_mutex`. - tables: RwLock>>>, + tables: DashMap>>, object_store: ObjectStore, storage_engine: S, /// Table mutex is used to protect the operations such as creating/opening/closing /// a table, to avoid things like opening the same table simultaneously. - table_mutex: Mutex<()>, + table_mutex: Arc>, } fn build_row_key_desc( @@ -392,7 +392,7 @@ impl MitoEngineInner { let table_dir = table_dir(catalog_name, schema_name, table_id); let mut regions = HashMap::with_capacity(request.region_numbers.len()); - let _lock = self.table_mutex.lock().await; + let _lock = self.table_mutex.lock(table_ref.to_string()).await; // Checks again, read lock should be enough since we are guarded by the mutex. if let Some(table) = self.get_table(&table_ref) { return if request.create_if_not_exists { @@ -481,10 +481,8 @@ impl MitoEngineInner { table_id ); - self.tables - .write() - .unwrap() - .insert(table_ref.to_string(), table.clone()); + // already locked + self.tables.insert(table_ref.to_string(), table.clone()); Ok(table) } @@ -510,7 +508,9 @@ impl MitoEngineInner { // Acquires the mutex before opening a new table. let table = { - let _lock = self.table_mutex.lock().await; + let table_name_key = table_ref.to_string(); + let _lock = self.table_mutex.lock(table_name_key.clone()).await; + // Checks again, read lock should be enough since we are guarded by the mutex. if let Some(table) = self.get_table(&table_ref) { return Ok(Some(table)); @@ -523,7 +523,7 @@ impl MitoEngineInner { let Some((manifest, table_info)) = self .recover_table_manifest_and_info(table_name, &table_dir) .await.map_err(BoxedError::new) - .context(TableOperationSnafu)? else { return Ok(None) }; + .context(table_error::TableOperationSnafu)? else { return Ok(None) }; let opts = OpenOptions { parent_dir: table_dir.to_string(), @@ -564,10 +564,8 @@ impl MitoEngineInner { let table = Arc::new(MitoTable::new(table_info, regions, manifest)); - self.tables - .write() - .unwrap() - .insert(table_ref.to_string(), table.clone()); + // already locked + self.tables.insert(table_ref.to_string(), table.clone()); Some(table as _) }; @@ -598,20 +596,15 @@ impl MitoEngineInner { fn get_table(&self, table_ref: &TableReference) -> Option { self.tables - .read() - .unwrap() .get(&table_ref.to_string()) - .cloned() - .map(|table| table as _) + .map(|en| en.value().clone() as _) } /// Returns the [MitoTable]. 
fn get_mito_table(&self, table_ref: &TableReference) -> Option>> { self.tables - .read() - .unwrap() .get(&table_ref.to_string()) - .cloned() + .map(|en| en.value().clone()) } async fn alter_table(&self, _ctx: &EngineContext, req: AlterTableRequest) -> Result { @@ -650,10 +643,14 @@ impl MitoEngineInner { .context(error::AlterTableSnafu { table_name })?; if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { - let mut tables = self.tables.write().unwrap(); - tables.remove(&table_ref.to_string()); + let removed = { + let _lock = self.table_mutex.lock(table_ref.to_string()).await; + self.tables.remove(&table_ref.to_string()) + }; + ensure!(removed.is_some(), error::TableNotFoundSnafu { table_name }); table_ref.table = new_table_name.as_str(); - tables.insert(table_ref.to_string(), table.clone()); + let _lock = self.table_mutex.lock(table_ref.to_string()).await; + self.tables.insert(table_ref.to_string(), table.clone()); } Ok(table) } @@ -666,35 +663,42 @@ impl MitoEngineInner { table: &req.table_name, }; // todo(ruihang): reclaim persisted data - Ok(self - .tables - .write() - .unwrap() - .remove(&table_reference.to_string()) - .is_some()) + let _lock = self.table_mutex.lock(table_reference.to_string()).await; + Ok(self.tables.remove(&table_reference.to_string()).is_some()) } async fn close(&self) -> TableResult<()> { - let _lock = self.table_mutex.lock().await; - - let tables = self.tables.write().unwrap().clone(); - - futures::future::try_join_all(tables.values().map(|t| t.close())) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; + futures::future::try_join_all( + self.tables + .iter() + .map(|item| close_table(self.table_mutex.clone(), item.value().clone())), + ) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; Ok(()) } } +async fn close_table(lock: Arc>, table: TableRef) -> TableResult<()> { + let info = table.table_info(); + let table_ref = TableReference { + catalog: &info.catalog_name, + schema: &info.schema_name, + table: &info.name, + }; + let _lock = lock.lock(table_ref.to_string()).await; + table.close().await +} + impl MitoEngineInner { fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { Self { - tables: RwLock::new(HashMap::default()), + tables: DashMap::new(), storage_engine, object_store, - table_mutex: Mutex::new(()), + table_mutex: Arc::new(KeyLock::new()), } } } diff --git a/src/mito/src/engine/procedure/alter.rs b/src/mito/src/engine/procedure/alter.rs index 1527d8c11b..fddbd8cb0c 100644 --- a/src/mito/src/engine/procedure/alter.rs +++ b/src/mito/src/engine/procedure/alter.rs @@ -265,10 +265,17 @@ impl AlterMitoTable { // Rename key in tables map. 
         if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
             let mut table_ref = self.data.table_ref();
-            let mut tables = self.engine_inner.tables.write().unwrap();
-            tables.remove(&table_ref.to_string());
+            let removed = {
+                let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
+                self.engine_inner.tables.remove(&table_ref.to_string())
+            };
+            ensure!(removed.is_some(), TableNotFoundSnafu { table_name });
+
             table_ref.table = new_table_name.as_str();
-            tables.insert(table_ref.to_string(), self.table.clone());
+            let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
+            self.engine_inner
+                .tables
+                .insert(table_ref.to_string(), self.table.clone());
         }
 
         Ok(Status::Done)
diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs
index 31eda68a51..9c655f964a 100644
--- a/src/mito/src/engine/procedure/create.rs
+++ b/src/mito/src/engine/procedure/create.rs
@@ -258,10 +258,9 @@ impl<S: StorageEngine> CreateMitoTable<S> {
         {
             let table = Arc::new(MitoTable::new(table_info, self.regions.clone(), manifest));
+            let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
             self.engine_inner
                 .tables
-                .write()
-                .unwrap()
                 .insert(table_ref.to_string(), table);
             return Ok(Status::Done);
         }
@@ -269,10 +268,10 @@ impl<S: StorageEngine> CreateMitoTable<S> {
         // We need to persist the table manifest and create the table.
         let table = self.write_manifest_and_create_table(&table_dir).await?;
         let table = Arc::new(table);
+
+        let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
         self.engine_inner
             .tables
-            .write()
-            .unwrap()
             .insert(table_ref.to_string(), table);
 
         Ok(Status::Done)
diff --git a/src/mito/src/engine/procedure/drop.rs b/src/mito/src/engine/procedure/drop.rs
index 2f92ce0098..d0e678516c 100644
--- a/src/mito/src/engine/procedure/drop.rs
+++ b/src/mito/src/engine/procedure/drop.rs
@@ -139,11 +139,13 @@ impl<S: StorageEngine> DropMitoTable<S> {
     async fn on_close_regions(&mut self) -> Result<Status> {
         // Remove the table from the engine to avoid further access from users.
         let table_ref = self.data.table_ref();
-        self.engine_inner
-            .tables
-            .write()
-            .unwrap()
-            .remove(&table_ref.to_string());
+
+        let _lock = self
+            .engine_inner
+            .table_mutex
+            .lock(table_ref.to_string())
+            .await;
+        self.engine_inner.tables.remove(&table_ref.to_string());
 
         // Close the table to close all regions. Closing a region is idempotent.
         self.table.close().await.map_err(Error::from_error_ext)?;
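
A minimal sketch (not part of the patch) of the fan-out/join pattern that the new
initiate_tables uses: each table is opened in its own spawned task and the results are
joined afterwards, so one slow region recovery no longer serializes datanode startup.
Plain tokio::spawn stands in for common_runtime::spawn_bg, and open_one_table with a
String error type is a hypothetical stand-in for open_or_create_table and the snafu
errors used above.

use tokio::task::JoinHandle;

// Hypothetical stand-in for `open_or_create_table`: pretend the table opens.
async fn open_one_table(name: String) -> Result<String, String> {
    Ok(format!("opened {name}"))
}

// Open every table concurrently. The double `?` mirrors
// `res.context(ParallelOpenTableSnafu)??` in the patch: the first error is the
// JoinError (the task panicked or was cancelled), the second is the open error itself.
async fn open_all(names: Vec<String>) -> Result<Vec<String>, String> {
    let handles: Vec<JoinHandle<Result<String, String>>> = names
        .into_iter()
        .map(|name| tokio::spawn(open_one_table(name)))
        .collect();

    let mut tables = Vec::with_capacity(handles.len());
    for handle in handles {
        let table = handle.await.map_err(|e| e.to_string())??;
        tables.push(table);
    }
    Ok(tables)
}

#[tokio::main]
async fn main() {
    let tables = open_all(vec!["cpu".to_string(), "memory".to_string()])
        .await
        .expect("open tables");
    println!("{tables:?}");
}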
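
The other half of the change replaces the engine's single global mutex plus
RwLock-wrapped HashMap with a DashMap guarded by a per-key KeyLock, so only operations
on the same table name are serialized. A rough sketch of that pattern, assuming the
dashmap and key-lock crates as the patch pulls them in; Engine, OpenedTable and
open_table are simplified stand-ins for MitoEngineInner, MitoTable and the real open
path.

use std::sync::Arc;

use dashmap::DashMap;
use key_lock::KeyLock;

#[derive(Clone, Debug)]
struct OpenedTable {
    name: String,
}

struct Engine {
    // Concurrent map: lookups on unrelated tables never contend.
    tables: DashMap<String, OpenedTable>,
    // One async lock per table name instead of a single global mutex.
    table_mutex: Arc<KeyLock<String>>,
}

impl Engine {
    fn new() -> Self {
        Self {
            tables: DashMap::new(),
            table_mutex: Arc::new(KeyLock::new()),
        }
    }

    async fn open_table(&self, name: &str) -> OpenedTable {
        // Serializes concurrent opens of the *same* table only.
        let _guard = self.table_mutex.lock(name.to_string()).await;
        if let Some(table) = self.tables.get(name) {
            // Check again under the lock, as the engine does.
            return table.value().clone();
        }
        // ...recover the manifest and open regions here...
        let table = OpenedTable {
            name: name.to_string(),
        };
        self.tables.insert(name.to_string(), table.clone());
        table
    }
}

#[tokio::main]
async fn main() {
    let engine = Engine::new();
    // Different table names proceed in parallel; the same name is serialized.
    let (a, b) = tokio::join!(
        engine.open_table("greptime.public.cpu"),
        engine.open_table("greptime.public.memory")
    );
    println!("opened {} and {}", a.name, b.name);
}

Keeping the double-check inside the per-key critical section preserves the old
create-if-not-exists semantics while letting different tables open concurrently.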