From 1783e4c5cbeed078b9851590c0cf1f5e0af93d43 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 18 Jul 2023 16:03:20 +0900 Subject: [PATCH] refactor: move DatanodeAlterTable after InvalidateTableCache (#1978) * refactor: move AlterDatanode after InvalidateTableCache * fix: acquire table key in region failover procedure --- src/frontend/src/instance/distributed.rs | 10 ----- src/meta-srv/src/procedure/alter_table.rs | 42 ++++++++++--------- src/meta-srv/src/procedure/region_failover.rs | 15 +++---- 3 files changed, 28 insertions(+), 39 deletions(-) diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 4b83432bcd..63c2fa33f3 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -514,16 +514,6 @@ impl DistInstance { .await .context(error::RequestMetaSnafu)?; - let table_info = table.table_info(); - self.catalog_manager - .invalidate_table( - &table_info.catalog_name, - &table_info.schema_name, - &table_info.name, - table_info.table_id(), - ) - .await; - Ok(Output::AffectedRows(0)) } diff --git a/src/meta-srv/src/procedure/alter_table.rs b/src/meta-srv/src/procedure/alter_table.rs index 9aed748941..c253db1846 100644 --- a/src/meta-srv/src/procedure/alter_table.rs +++ b/src/meta-srv/src/procedure/alter_table.rs @@ -30,7 +30,7 @@ use common_procedure::{ use common_telemetry::debug; use futures::future::join_all; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableInfo}; use table::requests::{AlterKind, AlterTableRequest}; @@ -75,12 +75,13 @@ impl AlterTableProcedure { /// Alters table on datanode. async fn on_datanode_alter_table(&mut self) -> Result { - let AlterTableState::DatanodeAlterTable(ref table_route) = self.data.state else { - return error::UnexpectedSnafu { - violated: "expected DatanodeAlterTable", - } - .fail(); - }; + let table_route = self + .data + .table_route + .as_ref() + .context(error::UnexpectedSnafu { + violated: "expected table_route", + })?; let table_ref = self.data.table_ref(); @@ -107,9 +108,7 @@ impl AlterTableProcedure { .map(|e| e.context(error::JoinSnafu).flatten()) .collect::>>()?; - self.data.state = AlterTableState::Broadcast; - - Ok(Status::executing(true)) + Ok(Status::Done) } /// Update table metadata for rename table operation. @@ -247,7 +246,8 @@ impl AlterTableProcedure { .on_update_metadata_for_rename(new_table_name, new_info) .await?; - self.data.state = AlterTableState::DatanodeAlterTable(table_route); + self.data.state = AlterTableState::InvalidateTableCache; + self.data.table_route = Some(table_route); return Ok(Status::executing(true)); } @@ -264,7 +264,8 @@ impl AlterTableProcedure { if table_global_value.table_info == new_raw_info { debug!("table: {} metadata already updated", table_ref.to_string()); - self.data.state = AlterTableState::DatanodeAlterTable(table_route); + self.data.state = AlterTableState::InvalidateTableCache; + self.data.table_route = Some(table_route); return Ok(Status::executing(true)); } @@ -308,7 +309,8 @@ impl AlterTableProcedure { debug!("table: {} metadata updated", table_ref.to_string()); - self.data.state = AlterTableState::DatanodeAlterTable(table_route); + self.data.state = AlterTableState::InvalidateTableCache; + self.data.table_route = Some(table_route); Ok(Status::executing(true)) } else { @@ -347,8 +349,8 @@ impl AlterTableProcedure { .mailbox .broadcast(&BroadcastChannel::Frontend, msg) .await?; - - Ok(Status::Done) + self.data.state = AlterTableState::DatanodeAlterTable; + Ok(Status::executing(true)) } fn lock_key_inner(&self) -> Vec { @@ -390,8 +392,8 @@ impl Procedure for AlterTableProcedure { match self.data.state { AlterTableState::UpdateMetadata => self.on_update_metadata().await, - AlterTableState::DatanodeAlterTable(_) => self.on_datanode_alter_table().await, - AlterTableState::Broadcast => self.on_broadcast().await, + AlterTableState::InvalidateTableCache => self.on_broadcast().await, + AlterTableState::DatanodeAlterTable => self.on_datanode_alter_table().await, } .map_err(error_handler) } @@ -411,10 +413,10 @@ impl Procedure for AlterTableProcedure { enum AlterTableState { /// Updates table metadata. UpdateMetadata, - /// Datanode alters the table. - DatanodeAlterTable(TableRoute), /// Broadcasts the invalidating table cache instruction. - Broadcast, + InvalidateTableCache, + /// Datanode alters the table. + DatanodeAlterTable, } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 1fd02e24c9..3c1b849692 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -357,16 +357,13 @@ impl Procedure for RegionFailoverProcedure { fn lock_key(&self) -> LockKey { let region_ident = &self.node.failed_region; - let key = format!( - "{}/region-{}", - common_catalog::format_full_table_name( - ®ion_ident.table_ident.catalog, - ®ion_ident.table_ident.schema, - ®ion_ident.table_ident.table - ), - region_ident.region_number + let table_key = common_catalog::format_full_table_name( + ®ion_ident.table_ident.catalog, + ®ion_ident.table_ident.schema, + ®ion_ident.table_ident.table, ); - LockKey::single(key) + let region_key = format!("{}/region-{}", table_key, region_ident.region_number); + LockKey::new(vec![table_key, region_key]) } }