feat: open catalogs and schemas in parallel (#1527)

* feat: open catalogs and schemas in parallel

* fix: code review
gitccl
2023-05-08 10:34:30 +08:00
committed by GitHub
parent d679cfcb53
commit fbf1ddd006

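The heart of this change is a fan-out pattern: instead of walking catalogs, schemas and tables sequentially, each unit of work is handed to common_runtime::spawn_bg and the resulting handles are awaited together with futures::future::try_join_all. The following is a minimal, self-contained sketch of the same shape, using tokio::spawn as a stand-in for common_runtime::spawn_bg and String as a stand-in for the crate's snafu-based error type (both are assumptions for illustration, not the actual API):

use tokio::task::JoinHandle;

async fn open_one(id: u32) -> Result<u32, String> {
    // Stand-in for open_or_create_table: pretend table `id` was opened successfully.
    Ok(id)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // One background task per table, like the loop over `kvs` in initiate_tables below.
    let joins: Vec<JoinHandle<Result<u32, String>>> =
        (1u32..=4).map(|id| tokio::spawn(open_one(id))).collect();

    // Outer `?`: a task panicked or was cancelled (a JoinError, mapped to
    // ParallelOpenTableSnafu in the diff). Inner collect: a task ran but returned its own error.
    let ids: Vec<u32> = futures::future::try_join_all(joins)
        .await
        .map_err(|e| e.to_string())?
        .into_iter()
        .collect::<Result<Vec<_>, String>>()?;

    println!("opened tables {:?}, max id {:?}", ids, ids.iter().max());
    Ok(())
}

The same two-level error handling appears at every try_join_all call in the hunks below: join errors are wrapped in ParallelOpenTableSnafu, then the per-task Results are collected.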

@@ -29,7 +29,6 @@ use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{EngineContext, TableReference};
use table::metadata::TableId;
use table::requests::{CreateTableRequest, OpenTableRequest};
use table::TableRef;
use tokio::sync::Mutex;
@@ -80,16 +79,6 @@ impl RemoteCatalogManager {
}) as _
}
fn new_schema_provider(&self, catalog_name: &str, schema_name: &str) -> SchemaProviderRef {
Arc::new(RemoteSchemaProvider {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
node_id: self.node_id,
backend: self.backend.clone(),
engine_manager: self.engine_manager.clone(),
}) as _
}
async fn iter_remote_catalogs(
&self,
) -> Pin<Box<dyn Stream<Item = Result<CatalogKey>> + Send + '_>> {
@@ -114,186 +103,59 @@ impl RemoteCatalogManager {
}))
}
async fn iter_remote_schemas(
&self,
catalog_name: &str,
) -> Pin<Box<dyn Stream<Item = Result<SchemaKey>> + Send + '_>> {
let schema_prefix = build_schema_prefix(catalog_name);
let mut schemas = self.backend.range(schema_prefix.as_bytes());
Box::pin(stream!({
while let Some(r) = schemas.next().await {
let Kv(k, _) = r?;
if !k.starts_with(schema_prefix.as_bytes()) {
debug!("Ignoring non-schema key: {}", String::from_utf8_lossy(&k));
continue;
}
let schema_key = SchemaKey::parse(&String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
yield Ok(schema_key)
}
}))
}
/// Iterate over all table entries on metasrv
async fn iter_remote_tables(
&self,
catalog_name: &str,
schema_name: &str,
) -> Pin<Box<dyn Stream<Item = Result<(TableGlobalKey, TableGlobalValue)>> + Send + '_>> {
let table_prefix = build_table_global_prefix(catalog_name, schema_name);
let mut tables = self.backend.range(table_prefix.as_bytes());
Box::pin(stream!({
while let Some(r) = tables.next().await {
let Kv(k, v) = r?;
if !k.starts_with(table_prefix.as_bytes()) {
debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k));
continue;
}
let table_key = TableGlobalKey::parse(&String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
let table_value =
TableGlobalValue::from_bytes(&v).context(InvalidCatalogValueSnafu)?;
info!(
"Found catalog table entry, key: {}, value: {:?}",
table_key, table_value
);
// metasrv has allocated region ids to current datanode
if table_value
.regions_id_map
.get(&self.node_id)
.map(|v| !v.is_empty())
.unwrap_or(false)
{
yield Ok((table_key, table_value))
}
}
}))
}
/// Fetch catalogs/schemas/tables from remote catalog manager along with max table id allocated.
async fn initiate_catalogs(&self) -> Result<HashMap<String, CatalogProviderRef>> {
let mut res = HashMap::new();
let mut catalogs = self.iter_remote_catalogs().await;
let mut joins = Vec::new();
while let Some(r) = catalogs.next().await {
let mut max_table_id = MAX_SYS_TABLE_ID;
let CatalogKey { catalog_name, .. } = r?;
info!("Fetch catalog from metasrv: {}", catalog_name);
let catalog = res
.entry(catalog_name.clone())
.or_insert_with(|| self.new_catalog_provider(&catalog_name))
.clone();
let node_id = self.node_id;
let backend = self.backend.clone();
let engine_manager = self.engine_manager.clone();
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
self.initiate_schemas(&catalog_name, catalog, &mut max_table_id)
.await?;
info!(
"Catalog name: {}, max table id allocated: {}",
&catalog_name, max_table_id
);
joins.push(common_runtime::spawn_bg(async move {
let max_table_id =
initiate_schemas(node_id, backend, engine_manager, &catalog_name, catalog)
.await?;
info!(
"Catalog name: {}, max table id allocated: {}",
&catalog_name, max_table_id
);
Ok(())
}));
}
futures::future::try_join_all(joins)
.await
.context(ParallelOpenTableSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(res)
}
async fn initiate_schemas(
&self,
catalog_name: &str,
catalog: CatalogProviderRef,
max_table_id: &mut TableId,
) -> Result<()> {
let mut schemas = self.iter_remote_schemas(catalog_name).await;
while let Some(r) = schemas.next().await {
let SchemaKey {
catalog_name,
schema_name,
..
} = r?;
info!("Found schema: {}.{}", catalog_name, schema_name);
let schema = match catalog.schema(&schema_name).await? {
None => {
let schema = self.new_schema_provider(&catalog_name, &schema_name);
catalog
.register_schema(schema_name.clone(), schema.clone())
.await?;
info!("Registered schema: {}", &schema_name);
schema
}
Some(schema) => schema,
};
info!(
"Fetch schema from metasrv: {}.{}",
&catalog_name, &schema_name
);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT,
1.0,
&[crate::metrics::db_label(&catalog_name, &schema_name)],
);
self.initiate_tables(&catalog_name, &schema_name, schema, max_table_id)
.await?;
}
Ok(())
}
/// Initiates all tables inside a catalog by fetching data from metasrv.
async fn initiate_tables<'a>(
&'a self,
catalog_name: &'a str,
schema_name: &'a str,
schema: SchemaProviderRef,
max_table_id: &mut TableId,
) -> Result<()> {
info!("initializing tables in {}.{}", catalog_name, schema_name);
let mut table_num = 0;
let tables = self.iter_remote_tables(catalog_name, schema_name).await;
let kvs = tables.try_collect::<Vec<_>>().await?;
let node_id = self.node_id;
let joins = kvs
.into_iter()
.map(|(table_key, table_value)| {
let engine_manager = self.engine_manager.clone();
common_runtime::spawn_bg(async move {
open_or_create_table(node_id, engine_manager, &table_key, &table_value).await
})
})
.collect::<Vec<_>>();
let vec = futures::future::join_all(joins).await;
for res in vec {
let table_ref = res.context(ParallelOpenTableSnafu)??;
let table_info = table_ref.table_info();
let table_name = &table_info.name;
let table_id = table_info.ident.table_id;
schema.register_table(table_name.clone(), table_ref).await?;
info!("Registered table {}", table_name);
*max_table_id = (*max_table_id).max(table_id);
table_num += 1;
}
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(catalog_name, schema_name)],
);
info!(
"initialized tables in {}.{}, total: {}",
catalog_name, schema_name, table_num
);
Ok(())
}
pub async fn create_catalog_and_schema(
&self,
catalog_name: &str,
schema_name: &str,
) -> Result<CatalogProviderRef> {
let schema_provider = self.new_schema_provider(catalog_name, schema_name);
let schema_provider = new_schema_provider(
self.node_id,
self.backend.clone(),
self.engine_manager.clone(),
catalog_name,
schema_name,
);
let catalog_provider = self.new_catalog_provider(catalog_name);
catalog_provider
@@ -332,6 +194,213 @@ impl RemoteCatalogManager {
}
}
fn new_schema_provider(
node_id: u64,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
catalog_name: &str,
schema_name: &str,
) -> SchemaProviderRef {
Arc::new(RemoteSchemaProvider {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
node_id,
backend,
engine_manager,
}) as _
}
async fn iter_remote_schemas<'a>(
backend: &'a KvBackendRef,
catalog_name: &'a str,
) -> Pin<Box<dyn Stream<Item = Result<SchemaKey>> + Send + 'a>> {
let schema_prefix = build_schema_prefix(catalog_name);
let mut schemas = backend.range(schema_prefix.as_bytes());
Box::pin(stream!({
while let Some(r) = schemas.next().await {
let Kv(k, _) = r?;
if !k.starts_with(schema_prefix.as_bytes()) {
debug!("Ignoring non-schema key: {}", String::from_utf8_lossy(&k));
continue;
}
let schema_key =
SchemaKey::parse(&String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?;
yield Ok(schema_key)
}
}))
}
/// Initiates all schemas inside the catalog by fetching data from metasrv.
/// Returns the maximum table id in the catalog.
async fn initiate_schemas(
node_id: u64,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
catalog_name: &str,
catalog: CatalogProviderRef,
) -> Result<u32> {
let mut schemas = iter_remote_schemas(&backend, catalog_name).await;
let mut joins = Vec::new();
while let Some(r) = schemas.next().await {
let SchemaKey {
catalog_name,
schema_name,
..
} = r?;
info!("Found schema: {}.{}", catalog_name, schema_name);
let schema = match catalog.schema(&schema_name).await? {
None => {
let schema = new_schema_provider(
node_id,
backend.clone(),
engine_manager.clone(),
&catalog_name,
&schema_name,
);
catalog
.register_schema(schema_name.clone(), schema.clone())
.await?;
info!("Registered schema: {}", &schema_name);
schema
}
Some(schema) => schema,
};
info!(
"Fetch schema from metasrv: {}.{}",
&catalog_name, &schema_name
);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT,
1.0,
&[crate::metrics::db_label(&catalog_name, &schema_name)],
);
let backend = backend.clone();
let engine_manager = engine_manager.clone();
joins.push(common_runtime::spawn_bg(async move {
initiate_tables(
node_id,
backend,
engine_manager,
&catalog_name,
&schema_name,
schema,
)
.await
}));
}
let mut max_table_id = MAX_SYS_TABLE_ID;
if let Some(found_max_table_id) = futures::future::try_join_all(joins)
.await
.context(ParallelOpenTableSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.max()
{
max_table_id = max_table_id.max(found_max_table_id);
}
Ok(max_table_id)
}
/// Iterate over all table entries on metasrv
async fn iter_remote_tables<'a>(
node_id: u64,
backend: &'a KvBackendRef,
catalog_name: &'a str,
schema_name: &'a str,
) -> Pin<Box<dyn Stream<Item = Result<(TableGlobalKey, TableGlobalValue)>> + Send + 'a>> {
let table_prefix = build_table_global_prefix(catalog_name, schema_name);
let mut tables = backend.range(table_prefix.as_bytes());
Box::pin(stream!({
while let Some(r) = tables.next().await {
let Kv(k, v) = r?;
if !k.starts_with(table_prefix.as_bytes()) {
debug!("Ignoring non-table prefix: {}", String::from_utf8_lossy(&k));
continue;
}
let table_key = TableGlobalKey::parse(&String::from_utf8_lossy(&k))
.context(InvalidCatalogValueSnafu)?;
let table_value = TableGlobalValue::from_bytes(&v).context(InvalidCatalogValueSnafu)?;
info!(
"Found catalog table entry, key: {}, value: {:?}",
table_key, table_value
);
// metasrv has allocated region ids to the current datanode
if table_value
.regions_id_map
.get(&node_id)
.map(|v| !v.is_empty())
.unwrap_or(false)
{
yield Ok((table_key, table_value))
}
}
}))
}
/// Initiates all tables inside the schema by fetching data from metasrv.
/// Returns the maximum table id in the schema.
async fn initiate_tables(
node_id: u64,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
catalog_name: &str,
schema_name: &str,
schema: SchemaProviderRef,
) -> Result<u32> {
info!("initializing tables in {}.{}", catalog_name, schema_name);
let tables = iter_remote_tables(node_id, &backend, catalog_name, schema_name).await;
let kvs = tables.try_collect::<Vec<_>>().await?;
let table_num = kvs.len();
let joins = kvs
.into_iter()
.map(|(table_key, table_value)| {
let engine_manager = engine_manager.clone();
let schema = schema.clone();
common_runtime::spawn_bg(async move {
let table_ref =
open_or_create_table(node_id, engine_manager, &table_key, &table_value).await?;
let table_info = table_ref.table_info();
let table_name = &table_info.name;
schema.register_table(table_name.clone(), table_ref).await?;
info!("Registered table {}", table_name);
Ok(table_info.ident.table_id)
})
})
.collect::<Vec<_>>();
let max_table_id = futures::future::try_join_all(joins)
.await
.context(ParallelOpenTableSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.max()
.unwrap_or(MAX_SYS_TABLE_ID);
increment_gauge!(
crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
1.0,
&[crate::metrics::db_label(catalog_name, schema_name)],
);
info!(
"initialized tables in {}.{}, total: {}",
catalog_name, schema_name, table_num
);
Ok(max_table_id)
}
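Because each spawned task registers its table and hands back only the table id, the caller can reduce the joined results to a single number. A small sketch of that reduction, with an assumed MAX_SYS_TABLE_ID value chosen purely for illustration:

// Assumed constant, for illustration only; the real MAX_SYS_TABLE_ID lives in this crate.
const MAX_SYS_TABLE_ID: u32 = 1024;

fn max_allocated(opened_ids: Vec<u32>) -> u32 {
    // Mirrors `.max().unwrap_or(MAX_SYS_TABLE_ID)` above: an empty schema opened no tables,
    // so it reports the system floor instead of an arbitrary id.
    opened_ids.into_iter().max().unwrap_or(MAX_SYS_TABLE_ID)
}

fn main() {
    assert_eq!(max_allocated(vec![1030, 1025, 1027]), 1030);
    assert_eq!(max_allocated(vec![]), 1024);
}

One level up, initiate_schemas applies the same floor again via max_table_id.max(found_max_table_id), so a catalog with no schemas still reports MAX_SYS_TABLE_ID.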
async fn open_or_create_table(
node_id: u64,
engine_manager: TableEngineManagerRef,
@@ -507,7 +576,13 @@ impl CatalogManager for RemoteCatalogManager {
.context(CatalogNotFoundSnafu {
catalog_name: &catalog_name,
})?;
let schema_provider = self.new_schema_provider(&catalog_name, &schema_name);
let schema_provider = new_schema_provider(
self.node_id,
self.backend.clone(),
self.engine_manager.clone(),
&catalog_name,
&schema_name,
);
catalog_provider
.register_schema(schema_name, schema_provider)
.await?;