mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-09 06:42:57 +00:00
refactor: move DatanodeAlterTable after InvalidateTableCache (#1978)
* refactor: move AlterDatanode after InvalidateTableCache * fix: acquire table key in region failover procedure
This commit is contained in:
@@ -514,16 +514,6 @@ impl DistInstance {
|
||||
.await
|
||||
.context(error::RequestMetaSnafu)?;
|
||||
|
||||
let table_info = table.table_info();
|
||||
self.catalog_manager
|
||||
.invalidate_table(
|
||||
&table_info.catalog_name,
|
||||
&table_info.schema_name,
|
||||
&table_info.name,
|
||||
table_info.table_id(),
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Output::AffectedRows(0))
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ use common_procedure::{
|
||||
use common_telemetry::debug;
|
||||
use futures::future::join_all;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::engine::TableReference;
|
||||
use table::metadata::{RawTableInfo, TableInfo};
|
||||
use table::requests::{AlterKind, AlterTableRequest};
|
||||
@@ -75,12 +75,13 @@ impl AlterTableProcedure {
|
||||
|
||||
/// Alters table on datanode.
|
||||
async fn on_datanode_alter_table(&mut self) -> Result<Status> {
|
||||
let AlterTableState::DatanodeAlterTable(ref table_route) = self.data.state else {
|
||||
return error::UnexpectedSnafu {
|
||||
violated: "expected DatanodeAlterTable",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let table_route = self
|
||||
.data
|
||||
.table_route
|
||||
.as_ref()
|
||||
.context(error::UnexpectedSnafu {
|
||||
violated: "expected table_route",
|
||||
})?;
|
||||
|
||||
let table_ref = self.data.table_ref();
|
||||
|
||||
@@ -107,9 +108,7 @@ impl AlterTableProcedure {
|
||||
.map(|e| e.context(error::JoinSnafu).flatten())
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
self.data.state = AlterTableState::Broadcast;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
Ok(Status::Done)
|
||||
}
|
||||
|
||||
/// Update table metadata for rename table operation.
|
||||
@@ -247,7 +246,8 @@ impl AlterTableProcedure {
|
||||
.on_update_metadata_for_rename(new_table_name, new_info)
|
||||
.await?;
|
||||
|
||||
self.data.state = AlterTableState::DatanodeAlterTable(table_route);
|
||||
self.data.state = AlterTableState::InvalidateTableCache;
|
||||
self.data.table_route = Some(table_route);
|
||||
return Ok(Status::executing(true));
|
||||
}
|
||||
|
||||
@@ -264,7 +264,8 @@ impl AlterTableProcedure {
|
||||
if table_global_value.table_info == new_raw_info {
|
||||
debug!("table: {} metadata already updated", table_ref.to_string());
|
||||
|
||||
self.data.state = AlterTableState::DatanodeAlterTable(table_route);
|
||||
self.data.state = AlterTableState::InvalidateTableCache;
|
||||
self.data.table_route = Some(table_route);
|
||||
return Ok(Status::executing(true));
|
||||
}
|
||||
|
||||
@@ -308,7 +309,8 @@ impl AlterTableProcedure {
|
||||
|
||||
debug!("table: {} metadata updated", table_ref.to_string());
|
||||
|
||||
self.data.state = AlterTableState::DatanodeAlterTable(table_route);
|
||||
self.data.state = AlterTableState::InvalidateTableCache;
|
||||
self.data.table_route = Some(table_route);
|
||||
|
||||
Ok(Status::executing(true))
|
||||
} else {
|
||||
@@ -347,8 +349,8 @@ impl AlterTableProcedure {
|
||||
.mailbox
|
||||
.broadcast(&BroadcastChannel::Frontend, msg)
|
||||
.await?;
|
||||
|
||||
Ok(Status::Done)
|
||||
self.data.state = AlterTableState::DatanodeAlterTable;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
fn lock_key_inner(&self) -> Vec<String> {
|
||||
@@ -390,8 +392,8 @@ impl Procedure for AlterTableProcedure {
|
||||
|
||||
match self.data.state {
|
||||
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
|
||||
AlterTableState::DatanodeAlterTable(_) => self.on_datanode_alter_table().await,
|
||||
AlterTableState::Broadcast => self.on_broadcast().await,
|
||||
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
|
||||
AlterTableState::DatanodeAlterTable => self.on_datanode_alter_table().await,
|
||||
}
|
||||
.map_err(error_handler)
|
||||
}
|
||||
@@ -411,10 +413,10 @@ impl Procedure for AlterTableProcedure {
|
||||
enum AlterTableState {
|
||||
/// Updates table metadata.
|
||||
UpdateMetadata,
|
||||
/// Datanode alters the table.
|
||||
DatanodeAlterTable(TableRoute),
|
||||
/// Broadcasts the invalidating table cache instruction.
|
||||
Broadcast,
|
||||
InvalidateTableCache,
|
||||
/// Datanode alters the table.
|
||||
DatanodeAlterTable,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
||||
@@ -357,16 +357,13 @@ impl Procedure for RegionFailoverProcedure {
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
let region_ident = &self.node.failed_region;
|
||||
let key = format!(
|
||||
"{}/region-{}",
|
||||
common_catalog::format_full_table_name(
|
||||
®ion_ident.table_ident.catalog,
|
||||
®ion_ident.table_ident.schema,
|
||||
®ion_ident.table_ident.table
|
||||
),
|
||||
region_ident.region_number
|
||||
let table_key = common_catalog::format_full_table_name(
|
||||
®ion_ident.table_ident.catalog,
|
||||
®ion_ident.table_ident.schema,
|
||||
®ion_ident.table_ident.table,
|
||||
);
|
||||
LockKey::single(key)
|
||||
let region_key = format!("{}/region-{}", table_key, region_ident.region_number);
|
||||
LockKey::new(vec![table_key, region_key])
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user