From fbf1ddd006320cff3da19e6fbf53b0ffc4f75031 Mon Sep 17 00:00:00 2001 From: gitccl <60637740+gitccl@users.noreply.github.com> Date: Mon, 8 May 2023 10:34:30 +0800 Subject: [PATCH] feat: open catalogs and schemas in parallel (#1527) * feat: open catalogs and schemas in parallel * fix: code review --- src/catalog/src/remote/manager.rs | 411 ++++++++++++++++++------------ 1 file changed, 243 insertions(+), 168 deletions(-) diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index fe5d75a6b5..36eb5a2a82 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -29,7 +29,6 @@ use parking_lot::RwLock; use snafu::{OptionExt, ResultExt}; use table::engine::manager::TableEngineManagerRef; use table::engine::{EngineContext, TableReference}; -use table::metadata::TableId; use table::requests::{CreateTableRequest, OpenTableRequest}; use table::TableRef; use tokio::sync::Mutex; @@ -80,16 +79,6 @@ impl RemoteCatalogManager { }) as _ } - fn new_schema_provider(&self, catalog_name: &str, schema_name: &str) -> SchemaProviderRef { - Arc::new(RemoteSchemaProvider { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - node_id: self.node_id, - backend: self.backend.clone(), - engine_manager: self.engine_manager.clone(), - }) as _ - } - async fn iter_remote_catalogs( &self, ) -> Pin> + Send + '_>> { @@ -114,186 +103,59 @@ impl RemoteCatalogManager { })) } - async fn iter_remote_schemas( - &self, - catalog_name: &str, - ) -> Pin> + Send + '_>> { - let schema_prefix = build_schema_prefix(catalog_name); - let mut schemas = self.backend.range(schema_prefix.as_bytes()); - - Box::pin(stream!({ - while let Some(r) = schemas.next().await { - let Kv(k, _) = r?; - if !k.starts_with(schema_prefix.as_bytes()) { - debug!("Ignoring non-schema key: {}", String::from_utf8_lossy(&k)); - continue; - } - - let schema_key = SchemaKey::parse(&String::from_utf8_lossy(&k)) - .context(InvalidCatalogValueSnafu)?; - 
yield Ok(schema_key) - } - })) - } - - /// Iterate over all table entries on metasrv - async fn iter_remote_tables( - &self, - catalog_name: &str, - schema_name: &str, - ) -> Pin> + Send + '_>> { - let table_prefix = build_table_global_prefix(catalog_name, schema_name); - let mut tables = self.backend.range(table_prefix.as_bytes()); - Box::pin(stream!({ - while let Some(r) = tables.next().await { - let Kv(k, v) = r?; - if !k.starts_with(table_prefix.as_bytes()) { - debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k)); - continue; - } - let table_key = TableGlobalKey::parse(&String::from_utf8_lossy(&k)) - .context(InvalidCatalogValueSnafu)?; - let table_value = - TableGlobalValue::from_bytes(&v).context(InvalidCatalogValueSnafu)?; - - info!( - "Found catalog table entry, key: {}, value: {:?}", - table_key, table_value - ); - // metasrv has allocated region ids to current datanode - if table_value - .regions_id_map - .get(&self.node_id) - .map(|v| !v.is_empty()) - .unwrap_or(false) - { - yield Ok((table_key, table_value)) - } - } - })) - } - /// Fetch catalogs/schemas/tables from remote catalog manager along with max table id allocated. async fn initiate_catalogs(&self) -> Result> { let mut res = HashMap::new(); let mut catalogs = self.iter_remote_catalogs().await; + let mut joins = Vec::new(); while let Some(r) = catalogs.next().await { - let mut max_table_id = MAX_SYS_TABLE_ID; let CatalogKey { catalog_name, .. 
} = r?; info!("Fetch catalog from metasrv: {}", catalog_name); let catalog = res .entry(catalog_name.clone()) .or_insert_with(|| self.new_catalog_provider(&catalog_name)) .clone(); + + let node_id = self.node_id; + let backend = self.backend.clone(); + let engine_manager = self.engine_manager.clone(); + increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0); - self.initiate_schemas(&catalog_name, catalog, &mut max_table_id) - .await?; - - info!( - "Catalog name: {}, max table id allocated: {}", - &catalog_name, max_table_id - ); + joins.push(common_runtime::spawn_bg(async move { + let max_table_id = + initiate_schemas(node_id, backend, engine_manager, &catalog_name, catalog) + .await?; + info!( + "Catalog name: {}, max table id allocated: {}", + &catalog_name, max_table_id + ); + Ok(()) + })); } + futures::future::try_join_all(joins) + .await + .context(ParallelOpenTableSnafu)? + .into_iter() + .collect::>>()?; + Ok(res) } - async fn initiate_schemas( - &self, - catalog_name: &str, - catalog: CatalogProviderRef, - max_table_id: &mut TableId, - ) -> Result<()> { - let mut schemas = self.iter_remote_schemas(catalog_name).await; - while let Some(r) = schemas.next().await { - let SchemaKey { - catalog_name, - schema_name, - .. - } = r?; - info!("Found schema: {}.{}", catalog_name, schema_name); - let schema = match catalog.schema(&schema_name).await? 
{ - None => { - let schema = self.new_schema_provider(&catalog_name, &schema_name); - catalog - .register_schema(schema_name.clone(), schema.clone()) - .await?; - info!("Registered schema: {}", &schema_name); - schema - } - Some(schema) => schema, - }; - - info!( - "Fetch schema from metasrv: {}.{}", - &catalog_name, &schema_name - ); - increment_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, - 1.0, - &[crate::metrics::db_label(&catalog_name, &schema_name)], - ); - self.initiate_tables(&catalog_name, &schema_name, schema, max_table_id) - .await?; - } - Ok(()) - } - - /// Initiates all tables inside a catalog by fetching data from metasrv. - async fn initiate_tables<'a>( - &'a self, - catalog_name: &'a str, - schema_name: &'a str, - schema: SchemaProviderRef, - max_table_id: &mut TableId, - ) -> Result<()> { - info!("initializing tables in {}.{}", catalog_name, schema_name); - let mut table_num = 0; - let tables = self.iter_remote_tables(catalog_name, schema_name).await; - - let kvs = tables.try_collect::>().await?; - let node_id = self.node_id; - let joins = kvs - .into_iter() - .map(|(table_key, table_value)| { - let engine_manager = self.engine_manager.clone(); - common_runtime::spawn_bg(async move { - open_or_create_table(node_id, engine_manager, &table_key, &table_value).await - }) - }) - .collect::>(); - let vec = futures::future::join_all(joins).await; - for res in vec { - let table_ref = res.context(ParallelOpenTableSnafu)??; - let table_info = table_ref.table_info(); - let table_name = &table_info.name; - let table_id = table_info.ident.table_id; - schema.register_table(table_name.clone(), table_ref).await?; - info!("Registered table {}", table_name); - *max_table_id = (*max_table_id).max(table_id); - table_num += 1; - } - - increment_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - 1.0, - &[crate::metrics::db_label(catalog_name, schema_name)], - ); - info!( - "initialized tables in {}.{}, total: {}", - catalog_name, 
schema_name, table_num - ); - Ok(()) - } - pub async fn create_catalog_and_schema( &self, catalog_name: &str, schema_name: &str, ) -> Result { - let schema_provider = self.new_schema_provider(catalog_name, schema_name); + let schema_provider = new_schema_provider( + self.node_id, + self.backend.clone(), + self.engine_manager.clone(), + catalog_name, + schema_name, + ); let catalog_provider = self.new_catalog_provider(catalog_name); catalog_provider @@ -332,6 +194,213 @@ impl RemoteCatalogManager { } } +fn new_schema_provider( + node_id: u64, + backend: KvBackendRef, + engine_manager: TableEngineManagerRef, + catalog_name: &str, + schema_name: &str, +) -> SchemaProviderRef { + Arc::new(RemoteSchemaProvider { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + node_id, + backend, + engine_manager, + }) as _ +} + +async fn iter_remote_schemas<'a>( + backend: &'a KvBackendRef, + catalog_name: &'a str, +) -> Pin> + Send + 'a>> { + let schema_prefix = build_schema_prefix(catalog_name); + let mut schemas = backend.range(schema_prefix.as_bytes()); + + Box::pin(stream!({ + while let Some(r) = schemas.next().await { + let Kv(k, _) = r?; + if !k.starts_with(schema_prefix.as_bytes()) { + debug!("Ignoring non-schema key: {}", String::from_utf8_lossy(&k)); + continue; + } + + let schema_key = + SchemaKey::parse(&String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?; + yield Ok(schema_key) + } + })) +} + +/// Initiates all schemas inside the catalog by fetching data from metasrv. +/// Return maximum table id in the catalog. +async fn initiate_schemas( + node_id: u64, + backend: KvBackendRef, + engine_manager: TableEngineManagerRef, + catalog_name: &str, + catalog: CatalogProviderRef, +) -> Result { + let mut schemas = iter_remote_schemas(&backend, catalog_name).await; + let mut joins = Vec::new(); + while let Some(r) = schemas.next().await { + let SchemaKey { + catalog_name, + schema_name, + .. 
+ } = r?; + + info!("Found schema: {}.{}", catalog_name, schema_name); + let schema = match catalog.schema(&schema_name).await? { + None => { + let schema = new_schema_provider( + node_id, + backend.clone(), + engine_manager.clone(), + &catalog_name, + &schema_name, + ); + catalog + .register_schema(schema_name.clone(), schema.clone()) + .await?; + info!("Registered schema: {}", &schema_name); + schema + } + Some(schema) => schema, + }; + + info!( + "Fetch schema from metasrv: {}.{}", + &catalog_name, &schema_name + ); + increment_gauge!( + crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, + 1.0, + &[crate::metrics::db_label(&catalog_name, &schema_name)], + ); + + let backend = backend.clone(); + let engine_manager = engine_manager.clone(); + + joins.push(common_runtime::spawn_bg(async move { + initiate_tables( + node_id, + backend, + engine_manager, + &catalog_name, + &schema_name, + schema, + ) + .await + })); + } + + let mut max_table_id = MAX_SYS_TABLE_ID; + if let Some(found_max_table_id) = futures::future::try_join_all(joins) + .await + .context(ParallelOpenTableSnafu)? + .into_iter() + .collect::>>()? 
+ .into_iter() + .max() + { + max_table_id = max_table_id.max(found_max_table_id); + } + + Ok(max_table_id) +} + +/// Iterate over all table entries on metasrv +async fn iter_remote_tables<'a>( + node_id: u64, + backend: &'a KvBackendRef, + catalog_name: &'a str, + schema_name: &'a str, +) -> Pin> + Send + 'a>> { + let table_prefix = build_table_global_prefix(catalog_name, schema_name); + let mut tables = backend.range(table_prefix.as_bytes()); + Box::pin(stream!({ + while let Some(r) = tables.next().await { + let Kv(k, v) = r?; + if !k.starts_with(table_prefix.as_bytes()) { + debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k)); + continue; + } + let table_key = TableGlobalKey::parse(&String::from_utf8_lossy(&k)) + .context(InvalidCatalogValueSnafu)?; + let table_value = TableGlobalValue::from_bytes(&v).context(InvalidCatalogValueSnafu)?; + + info!( + "Found catalog table entry, key: {}, value: {:?}", + table_key, table_value + ); + // metasrv has allocated region ids to current datanode + if table_value + .regions_id_map + .get(&node_id) + .map(|v| !v.is_empty()) + .unwrap_or(false) + { + yield Ok((table_key, table_value)) + } + } + })) +} + +/// Initiates all tables inside the schema by fetching data from metasrv. +/// Return maximum table id in the schema. 
+async fn initiate_tables( + node_id: u64, + backend: KvBackendRef, + engine_manager: TableEngineManagerRef, + catalog_name: &str, + schema_name: &str, + schema: SchemaProviderRef, +) -> Result { + info!("initializing tables in {}.{}", catalog_name, schema_name); + let tables = iter_remote_tables(node_id, &backend, catalog_name, schema_name).await; + + let kvs = tables.try_collect::>().await?; + let table_num = kvs.len(); + let joins = kvs + .into_iter() + .map(|(table_key, table_value)| { + let engine_manager = engine_manager.clone(); + let schema = schema.clone(); + common_runtime::spawn_bg(async move { + let table_ref = + open_or_create_table(node_id, engine_manager, &table_key, &table_value).await?; + let table_info = table_ref.table_info(); + let table_name = &table_info.name; + schema.register_table(table_name.clone(), table_ref).await?; + info!("Registered table {}", table_name); + Ok(table_info.ident.table_id) + }) + }) + .collect::>(); + + let max_table_id = futures::future::try_join_all(joins) + .await + .context(ParallelOpenTableSnafu)? + .into_iter() + .collect::>>()? + .into_iter() + .max() + .unwrap_or(MAX_SYS_TABLE_ID); + + increment_gauge!( + crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, + 1.0, + &[crate::metrics::db_label(catalog_name, schema_name)], + ); + info!( + "initialized tables in {}.{}, total: {}", + catalog_name, schema_name, table_num + ); + + Ok(max_table_id) +} + async fn open_or_create_table( node_id: u64, engine_manager: TableEngineManagerRef, @@ -507,7 +576,13 @@ impl CatalogManager for RemoteCatalogManager { .context(CatalogNotFoundSnafu { catalog_name: &catalog_name, })?; - let schema_provider = self.new_schema_provider(&catalog_name, &schema_name); + let schema_provider = new_schema_provider( + self.node_id, + self.backend.clone(), + self.engine_manager.clone(), + &catalog_name, + &schema_name, + ); catalog_provider .register_schema(schema_name, schema_provider) .await?;