fix: prevent stale physical table route during procedure retries (#6825)

fix(meta): prevent stale physical table route during procedure retries

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-26 20:14:06 +08:00
committed by GitHub
parent 3d1a4b56a4
commit 6782bcddfa
2 changed files with 38 additions and 10 deletions

View File

@@ -50,6 +50,8 @@ use crate::rpc::router::RegionRoute;
pub struct AlterLogicalTablesProcedure {
pub context: DdlContext,
pub data: AlterTablesData,
/// Physical table route cache.
pub physical_table_route: Option<PhysicalTableRouteValue>,
}
/// Builds the validator from the [`AlterTablesData`].
@@ -93,16 +95,20 @@ impl AlterLogicalTablesProcedure {
table_info_values: vec![],
physical_table_id,
physical_table_info: None,
physical_table_route: None,
physical_columns: vec![],
table_cache_keys_to_invalidate: vec![],
},
physical_table_route: None,
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
Ok(Self {
context,
data,
physical_table_route: None,
})
}
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
@@ -141,21 +147,24 @@ impl AlterLogicalTablesProcedure {
// Updates the procedure state.
retain_unskipped(&mut self.data.tasks, &skip_alter);
self.data.physical_table_info = Some(physical_table_info);
self.data.physical_table_route = Some(physical_table_route);
self.data.table_info_values = table_info_values;
debug_assert_eq!(self.data.tasks.len(), self.data.table_info_values.len());
self.physical_table_route = Some(physical_table_route);
self.data.state = AlterTablesState::SubmitAlterRegionRequests;
Ok(Status::executing(true))
}
pub(crate) async fn on_submit_alter_region_requests(&mut self) -> Result<Status> {
// Safety: we have checked the state in on_prepare
let physical_table_route = &self.data.physical_table_route.as_ref().unwrap();
self.fetch_physical_table_route_if_non_exist().await?;
// Safety: fetched in `fetch_physical_table_route_if_non_exist`.
let region_routes = &self.physical_table_route.as_ref().unwrap().region_routes;
let executor = build_executor_from_alter_expr(&self.data);
let mut results = executor
.on_alter_regions(
&self.context.node_manager,
&physical_table_route.region_routes,
// Avoid double-borrowing self by extracting the region_routes first
region_routes,
)
.await?;
@@ -166,7 +175,7 @@ impl AlterLogicalTablesProcedure {
} else {
warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
self.submit_sync_region_requests(results, region_routes)
.await;
self.data.state = AlterTablesState::UpdateMetadata;
Ok(Status::executing(true))
@@ -232,6 +241,21 @@ impl AlterLogicalTablesProcedure {
.await?;
Ok(Status::done())
}
/// Fetches the physical table route if it is not already fetched.
async fn fetch_physical_table_route_if_non_exist(&mut self) -> Result<()> {
if self.physical_table_route.is_none() {
let (_, physical_table_route) = self
.context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(self.data.physical_table_id)
.await?;
self.physical_table_route = Some(physical_table_route);
}
Ok(())
}
}
#[async_trait]
@@ -261,6 +285,10 @@ impl Procedure for AlterLogicalTablesProcedure {
AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
}
.inspect_err(|_| {
// Reset the physical table route cache.
self.physical_table_route = None;
})
.map_err(map_to_procedure_error)
}
@@ -298,7 +326,6 @@ pub struct AlterTablesData {
/// Physical table info
physical_table_id: TableId,
physical_table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
physical_table_route: Option<PhysicalTableRouteValue>,
physical_columns: Vec<ColumnMetadata>,
table_cache_keys_to_invalidate: Vec<CacheIdent>,
}
@@ -311,7 +338,6 @@ impl AlterTablesData {
self.table_info_values.clear();
self.physical_table_id = 0;
self.physical_table_info = None;
self.physical_table_route = None;
self.physical_columns.clear();
}
}

View File

@@ -28,9 +28,11 @@ use crate::rpc::router::region_distribution;
impl AlterLogicalTablesProcedure {
pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> {
self.fetch_physical_table_route_if_non_exist().await?;
// Safety: must exist.
let physical_table_info = self.data.physical_table_info.as_ref().unwrap();
let physical_table_route = self.data.physical_table_route.as_ref().unwrap();
// Safety: fetched in `fetch_physical_table_route_if_non_exist`.
let physical_table_route = self.physical_table_route.as_ref().unwrap();
let region_distribution = region_distribution(&physical_table_route.region_routes);
// Updates physical table's metadata.