fix: fix RegionAliveKeeper does not find the table after restarting (#2249)

This commit is contained in:
Weny Xu
2023-08-25 11:05:17 +08:00
committed by GitHub
parent 48348aa364
commit b13d932e4e

View File

@@ -85,6 +85,7 @@ impl RemoteCatalogManager {
let engine_manager = self.engine_manager.clone();
let memory_catalog_manager = self.memory_catalog_manager.clone();
let table_metadata_manager = self.table_metadata_manager.clone();
let region_alive_keepers = self.region_alive_keepers.clone();
common_runtime::spawn_bg(async move {
let table_id = datanode_table_value.table_id;
if let Err(e) = open_and_register_table(
@@ -92,6 +93,7 @@ impl RemoteCatalogManager {
datanode_table_value,
memory_catalog_manager,
table_metadata_manager,
region_alive_keepers,
)
.await
{
@@ -116,6 +118,7 @@ async fn open_and_register_table(
datanode_table_value: DatanodeTableValue,
memory_catalog_manager: Arc<MemoryCatalogManager>,
table_metadata_manager: TableMetadataManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Result<()> {
let context = EngineContext {};
@@ -192,7 +195,8 @@ async fn open_and_register_table(
table_id,
table,
};
let registered = memory_catalog_manager.register_table_sync(request)?;
let registered =
register_table(&memory_catalog_manager, &region_alive_keepers, request).await?;
ensure!(
registered,
TableExistsSnafu {
@@ -203,6 +207,32 @@ async fn open_and_register_table(
Ok(())
}
async fn register_table(
memory_catalog_manager: &Arc<MemoryCatalogManager>,
region_alive_keepers: &Arc<RegionAliveKeepers>,
request: RegisterTableRequest,
) -> Result<bool> {
let table = request.table.clone();
let registered = memory_catalog_manager.register_table_sync(request)?;
if registered {
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id: table_info.table_id(),
engine: table_info.meta.engine.clone(),
};
region_alive_keepers
.register_table(table_ident, table)
.await?;
}
Ok(registered)
}
#[async_trait]
impl CatalogManager for RemoteCatalogManager {
async fn start(&self) -> Result<()> {
@@ -221,25 +251,12 @@ impl CatalogManager for RemoteCatalogManager {
}
async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
let table = request.table.clone();
let registered = self.memory_catalog_manager.register_table_sync(request)?;
if registered {
let table_info = table.table_info();
let table_ident = TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id: table_info.table_id(),
engine: table_info.meta.engine.clone(),
};
self.region_alive_keepers
.register_table(table_ident, table)
.await?;
}
Ok(registered)
register_table(
&self.memory_catalog_manager,
&self.region_alive_keepers,
request,
)
.await
}
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {