diff --git a/Cargo.lock b/Cargo.lock index 08f4f7d7b9..a0a2f96fe5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1100,6 +1100,7 @@ dependencies = [ "common-runtime", "common-telemetry", "common-time", + "dashmap", "datafusion", "datatypes", "futures", @@ -1109,6 +1110,7 @@ dependencies = [ "meta-client", "mito", "object-store", + "parking_lot", "regex", "serde", "serde_json", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 520780fdfc..8ce18e5198 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -18,12 +18,14 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +dashmap = "5.4" datafusion.workspace = true datatypes = { path = "../datatypes" } futures = "0.3" futures-util.workspace = true lazy_static = "1.4" meta-client = { path = "../meta-client" } +parking_lot = "0.12" regex = "1.6" serde = "1.0" serde_json = "1.0" diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 604a1eec5c..1bf93633a7 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::sync::Arc; @@ -22,8 +22,10 @@ use async_stream::stream; use async_trait::async_trait; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_telemetry::{debug, error, info}; +use dashmap::DashMap; use futures::Stream; use futures_util::StreamExt; +use parking_lot::RwLock; use snafu::{OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::TableId; @@ -39,6 +41,7 @@ use crate::error::{ use crate::helper::{ build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue, + CATALOG_KEY_PREFIX, }; use crate::remote::{Kv, KvBackendRef}; use crate::{ @@ -51,10 +54,9 @@ use crate::{ pub struct RemoteCatalogManager { node_id: u64, backend: KvBackendRef, - catalogs: Arc>>, + catalogs: Arc>>, engine: TableEngineRef, system_table_requests: Mutex>, - mutex: Arc>, } impl RemoteCatalogManager { @@ -65,7 +67,6 @@ impl RemoteCatalogManager { backend, catalogs: Default::default(), system_table_requests: Default::default(), - mutex: Default::default(), } } @@ -386,7 +387,14 @@ impl CatalogManager for RemoteCatalogManager { "Initialized catalogs: {:?}", catalogs.keys().cloned().collect::>() ); - self.catalogs.store(Arc::new(catalogs)); + + { + let self_catalogs = self.catalogs.read(); + catalogs.into_iter().for_each(|(k, v)| { + self_catalogs.insert(k, v); + }); + } + info!("Max table id allocated: {}", max_table_id); let mut system_table_requests = self.system_table_requests.lock().await; @@ -504,12 +512,10 @@ impl CatalogList for RemoteCatalogManager { ) -> Result> { let key = self.build_catalog_key(&name).to_string(); let backend = self.backend.clone(); - let mutex = self.mutex.clone(); let catalogs = self.catalogs.clone(); std::thread::spawn(|| { common_runtime::block_on_write(async move { - let _guard = mutex.lock().await; backend .set( key.as_bytes(), @@ -518,11 +524,10 @@ impl CatalogList for RemoteCatalogManager { .context(InvalidCatalogValueSnafu)?, ) .await?; - let prev_catalogs = catalogs.load(); - let mut new_catalogs = HashMap::with_capacity(prev_catalogs.len() + 1); - new_catalogs.clone_from(&prev_catalogs); - let prev = new_catalogs.insert(name, catalog); - catalogs.store(Arc::new(new_catalogs)); + + let catalogs = catalogs.read(); + let prev = catalogs.insert(name, catalog.clone()); + Ok(prev) }) }) @@ -532,12 +537,65 @@ impl CatalogList for RemoteCatalogManager { /// List all catalogs from metasrv fn catalog_names(&self) -> Result> { - Ok(self.catalogs.load().keys().cloned().collect::>()) + let catalogs = self.catalogs.read(); + Ok(catalogs.iter().map(|k| k.key().to_string()).collect()) } /// Read catalog info of given name from metasrv. fn catalog(&self, name: &str) -> Result> { - Ok(self.catalogs.load().get(name).cloned()) + { + let catalogs = self.catalogs.read(); + let catalog = catalogs.get(name); + + if let Some(catalog) = catalog { + return Ok(Some(catalog.clone())); + } + } + + let catalogs = self.catalogs.write(); + + let catalog = catalogs.get(name); + if let Some(catalog) = catalog { + return Ok(Some(catalog.clone())); + } + + // It's for lack of incremental catalog syncing between datanode and meta. Here we fetch catalog + // from meta on demand. This can be removed when incremental catalog syncing is done in datanode. + + let backend = self.backend.clone(); + + let catalogs_from_meta: HashSet = std::thread::spawn(|| { + common_runtime::block_on_read(async move { + let mut stream = backend.range(CATALOG_KEY_PREFIX.as_bytes()); + let mut catalogs = HashSet::new(); + + while let Some(catalog) = stream.next().await { + if let Ok(catalog) = catalog { + let catalog_key = String::from_utf8_lossy(&catalog.0); + + if let Ok(key) = CatalogKey::parse(&catalog_key) { + catalogs.insert(key.catalog_name); + } + } + } + + catalogs + }) + }) + .join() + .unwrap(); + + catalogs.retain(|catalog_name, _| catalogs_from_meta.get(catalog_name).is_some()); + + for catalog in catalogs_from_meta { + catalogs + .entry(catalog.clone()) + .or_insert(self.new_catalog_provider(&catalog)); + } + + let catalog = catalogs.get(name); + + Ok(catalog.as_deref().cloned()) } }