fix: fix datanode cannot start while failing to open tables (#1601)

This commit is contained in:
Weny Xu
2023-05-17 21:56:13 +09:00
committed by GitHub
parent 57c02af55b
commit 68dfea0cfd

View File

@@ -343,6 +343,43 @@ async fn iter_remote_tables<'a>(
}))
}
async fn print_regional_key_debug_info(
node_id: u64,
backend: KvBackendRef,
table_key: &TableGlobalKey,
) {
let regional_key = TableRegionalKey {
catalog_name: table_key.catalog_name.clone(),
schema_name: table_key.schema_name.clone(),
table_name: table_key.table_name.clone(),
node_id,
}
.to_string();
match backend.get(regional_key.as_bytes()).await {
Ok(Some(Kv(_, values_bytes))) => {
debug!(
"Node id: {}, TableRegionalKey: {}, value: {},",
node_id,
table_key,
String::from_utf8_lossy(&values_bytes),
);
}
Ok(None) => {
debug!(
"Node id: {}, TableRegionalKey: {}, value: None",
node_id, table_key,
);
}
Err(err) => {
debug!(
"Node id: {}, failed to fetch TableRegionalKey: {}, source: {}",
node_id, regional_key, err
);
}
}
}
/// Initiates all tables inside the catalog by fetching data from metasrv.
/// Return maximum table id in the schema.
async fn initiate_tables(
@@ -363,23 +400,47 @@ async fn initiate_tables(
.map(|(table_key, table_value)| {
let engine_manager = engine_manager.clone();
let schema = schema.clone();
let backend = backend.clone();
common_runtime::spawn_bg(async move {
let table_ref =
open_or_create_table(node_id, engine_manager, &table_key, &table_value).await?;
let table_info = table_ref.table_info();
let table_name = &table_info.name;
schema.register_table(table_name.clone(), table_ref).await?;
info!("Registered table {}", table_name);
Ok(table_info.ident.table_id)
match open_or_create_table(node_id, engine_manager, &table_key, &table_value).await
{
Ok(table_ref) => {
let table_info = table_ref.table_info();
let table_name = &table_info.name;
schema.register_table(table_name.clone(), table_ref).await?;
info!("Registered table {}", table_name);
Ok(Some(table_info.ident.table_id))
}
Err(err) => {
warn!(
"Node id: {}, failed to open table: {}, source: {}",
node_id, table_key, err
);
debug!(
"Node id: {}, TableGlobalKey: {}, value: {:?},",
node_id, table_key, table_value
);
print_regional_key_debug_info(node_id, backend, &table_key).await;
Ok(None)
}
}
})
})
.collect::<Vec<_>>();
let max_table_id = futures::future::try_join_all(joins)
let opened_table_ids = futures::future::try_join_all(joins)
.await
.context(ParallelOpenTableSnafu)?
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect::<Vec<_>>();
let opened = opened_table_ids.len();
let max_table_id = opened_table_ids
.into_iter()
.max()
.unwrap_or(MAX_SYS_TABLE_ID);
@@ -390,8 +451,12 @@ async fn initiate_tables(
&[crate::metrics::db_label(catalog_name, schema_name)],
);
info!(
"initialized tables in {}.{}, total: {}",
catalog_name, schema_name, table_num
"initialized tables in {}.{}, total: {}, opened: {}, failed: {}",
catalog_name,
schema_name,
table_num,
opened,
table_num - opened
);
Ok(max_table_id)