From 037fccec2cf6ac9a1da1cb54fcf28f95850d4f7a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 16 Apr 2026 17:37:16 +0800 Subject: [PATCH] refactor: move group rollback ownership to parent repartition (#7967) * refactor(meta-srv): move group rollback ownership to parent repartition procedure - Parent procedure now owns partial rollback based on failed/unknown subprocedures - rollback order: group metadata first, then allocated-region cleanup - original_target_routes captured during build-plan, persisted in RepartitionPlanEntry - rollback_group_metadata_routes moved to utils as parent-owned helper - Group subprocedure no longer supports rollback (rollback_supported = false) - Removed UpdateMetadata::RollbackStaging from group state machine - Deleted redundant group rollback tests and helpers BREAKING CHANGE: group Procedure no longer handles rollback; parent procedure is responsible for crash recovery and selecting which plans to roll back. Signed-off-by: WenyXu * chore: update comments Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/meta-srv/src/procedure/repartition.rs | 531 ++++++++++++++++-- .../procedure/repartition/allocate_region.rs | 92 ++- .../repartition/deallocate_region.rs | 1 + .../src/procedure/repartition/group.rs | 407 +------------- .../repartition/group/update_metadata.rs | 16 +- .../update_metadata/exit_staging_region.rs | 2 +- .../rollback_staging_region.rs | 301 ---------- .../src/procedure/repartition/plan.rs | 9 +- .../src/procedure/repartition/utils.rs | 245 +++++++- 9 files changed, 841 insertions(+), 763 deletions(-) delete mode 100644 src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index db8bfeadc5..fc1ece15e2 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -59,11 +59,13 @@ 
use crate::error::{self, Result}; use crate::procedure::repartition::collect::ProcedureMeta; use crate::procedure::repartition::deallocate_region::DeallocateRegion; use crate::procedure::repartition::group::{ - Context as RepartitionGroupContext, RepartitionGroupProcedure, + Context as RepartitionGroupContext, RepartitionGroupProcedure, region_routes, }; use crate::procedure::repartition::plan::RepartitionPlanEntry; use crate::procedure::repartition::repartition_start::RepartitionStart; -use crate::procedure::repartition::utils::get_datanode_table_value; +use crate::procedure::repartition::utils::{ + get_datanode_table_value, rollback_group_metadata_routes, +}; use crate::service::mailbox::MailboxRef; #[cfg(test)] @@ -76,11 +78,17 @@ pub struct PersistentContext { pub table_name: String, pub table_id: TableId, pub plans: Vec, - /// Records failed sub-procedures for metadata rollback. #[serde(default)] + /// Records failed sub-procedures for parent rollback selection. + /// + /// The parent repartition procedure uses these entries to decide which plans + /// require group-metadata restoration and allocated-region cleanup. pub failed_procedures: Vec, #[serde(default)] - /// Records unknown sub-procedures for metadata rollback. + /// Records unknown sub-procedures for parent rollback selection. + /// + /// Unknown procedures are treated the same as failed ones when selecting the + /// plan subset that must be rolled back by the parent procedure. pub unknown_procedures: Vec, /// The timeout for repartition operations. 
#[serde(with = "humantime_serde", default = "default_timeout")] @@ -506,6 +514,23 @@ impl RepartitionProcedure { || self.state.as_any().is::() } + fn rollback_plan_indices(&self) -> HashSet { + self.context + .persistent_ctx + .failed_procedures + .iter() + .chain(self.context.persistent_ctx.unknown_procedures.iter()) + .map(|procedure_meta| procedure_meta.plan_index) + .collect() + } + + /// Returns allocated region ids that parent rollback should remove. + /// + /// Rollback uses an "after AllocateRegion" semantic: + /// - in `AllocateRegion` and `Dispatch`, all allocated regions belong to the + /// current repartition attempt and must be cleaned up. + /// - in `Collect`, only the plans referenced by failed or unknown + /// sub-procedures should be rolled back. fn rollback_allocated_region_ids(&self) -> HashSet { if self.state.as_any().is::() || self.state.as_any().is::() @@ -519,13 +544,9 @@ impl RepartitionProcedure { .collect(); } - self.context - .persistent_ctx - .failed_procedures - .iter() - .chain(self.context.persistent_ctx.unknown_procedures.iter()) - .flat_map(|procedure_meta| { - let plan_index = procedure_meta.plan_index; + self.rollback_plan_indices() + .into_iter() + .flat_map(|plan_index| { self.context.persistent_ctx.plans[plan_index] .allocated_region_ids .iter() @@ -534,15 +555,35 @@ impl RepartitionProcedure { .collect() } - fn filter_allocated_region_routes( - region_routes: &[RegionRoute], - allocated_region_ids: &HashSet, - ) -> Vec { - region_routes - .iter() - .filter(|route| !allocated_region_ids.contains(&route.region.id)) - .cloned() - .collect() + /// Restores group-level staging metadata for failed/unknown plans. + /// + /// The helper mutates `region_routes` in memory. 
+ async fn rollback_group_metadata_for_selected_plans( + &mut self, + region_routes: &mut [RegionRoute], + ) -> Result<()> { + let rollback_plan_indices = self.rollback_plan_indices(); + if rollback_plan_indices.is_empty() { + return Ok(()); + } + + let mut region_routes_map = region_routes + .iter_mut() + .map(|route| (route.region.id, route)) + .collect::>(); + for plan_index in rollback_plan_indices { + let plan = &self.context.persistent_ctx.plans[plan_index]; + rollback_group_metadata_routes( + plan.group_id, + &plan.source_regions, + &plan.original_target_routes, + &plan.allocated_region_ids, + &plan.pending_deallocate_region_ids, + &mut region_routes_map, + )?; + } + + Ok(()) } async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> { @@ -552,17 +593,17 @@ impl RepartitionProcedure { let table_id = self.context.persistent_ctx.table_id; let allocated_region_ids = self.rollback_allocated_region_ids(); - if allocated_region_ids.is_empty() { - return Ok(()); - } let table_lock = TableLock::Write(table_id).into(); let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; let table_route_value = self.context.get_table_route_value().await?; - let current_region_routes = table_route_value.region_routes().unwrap(); + let original_region_routes = region_routes(table_id, table_route_value.get_inner_ref())?; + let mut current_region_routes = original_region_routes.clone(); + self.rollback_group_metadata_for_selected_plans(&mut current_region_routes) + .await?; let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes( table_id, - current_region_routes, + &current_region_routes, &allocated_region_ids, ); if !allocated_region_routes.is_empty() { @@ -587,9 +628,9 @@ impl RepartitionProcedure { } let new_region_routes = - Self::filter_allocated_region_routes(current_region_routes, &allocated_region_ids); + DeallocateRegion::generate_region_routes(&current_region_routes, &allocated_region_ids); - if new_region_routes.len()
!= current_region_routes.len() { + if new_region_routes != *original_region_routes { self.context .update_table_route(&table_route_value, new_region_routes, HashMap::new()) .await @@ -809,6 +850,7 @@ mod tests { use crate::procedure::repartition::collect::Collect; use crate::procedure::repartition::deallocate_region::DeallocateRegion; use crate::procedure::repartition::dispatch::Dispatch; + use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::plan::RegionDescriptor; use crate::procedure::repartition::repartition_end::RepartitionEnd; use crate::procedure::repartition::test_util::{ @@ -837,9 +879,52 @@ mod tests { allocated_region_ids: vec![RegionId::new(table_id, 3)], pending_deallocate_region_ids: vec![], transition_map: vec![vec![0, 1]], + original_target_routes: vec![], } } + fn with_rollback_metadata( + mut plan: RepartitionPlanEntry, + original_target_routes: Vec, + ) -> RepartitionPlanEntry { + plan.original_target_routes = original_target_routes; + plan + } + + fn apply_group_staging( + plan: &RepartitionPlanEntry, + current_region_routes: &[RegionRoute], + ) -> Vec { + UpdateMetadata::apply_staging_region_routes( + plan.group_id, + &plan.source_regions, + &plan.target_regions, + &plan.pending_deallocate_region_ids, + current_region_routes, + ) + .unwrap() + } + + fn exit_group_staging( + plan: &RepartitionPlanEntry, + current_region_routes: &[RegionRoute], + ) -> Vec { + UpdateMetadata::exit_staging_region_routes( + plan.group_id, + &plan.source_regions, + &plan.target_regions, + current_region_routes, + ) + .unwrap() + } + + fn region_route_by_id(region_routes: &[RegionRoute], region_id: RegionId) -> &RegionRoute { + region_routes + .iter() + .find(|route| route.region.id == region_id) + .unwrap() + } + fn test_procedure(state: Box, context: Context) -> RepartitionProcedure { RepartitionProcedure { state, context } } @@ -870,10 +955,8 @@ mod tests { ]; let allocated_region_ids = 
HashSet::from([RegionId::new(table_id, 2)]); - let new_region_routes = RepartitionProcedure::filter_allocated_region_routes( - &region_routes, - &allocated_region_ids, - ); + let new_region_routes = + DeallocateRegion::generate_region_routes(&region_routes, &allocated_region_ids); assert_eq!(new_region_routes.len(), 1); assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1)); @@ -939,7 +1022,16 @@ mod tests { table_id, None, ); - persistent_ctx.plans = vec![test_plan(table_id)]; + persistent_ctx.plans = vec![with_rollback_metadata( + test_plan(table_id), + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), ""), + ], + )]; persistent_ctx.failed_procedures = vec![ProcedureMeta { plan_index: 0, group_id: Uuid::new_v4(), @@ -1050,6 +1142,16 @@ mod tests { None, ); let failed_plan = test_plan(table_id); + let failed_plan = with_rollback_metadata( + failed_plan, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), ""), + ], + ); let succeeded_plan = RepartitionPlanEntry { group_id: Uuid::new_v4(), source_regions: vec![RegionDescriptor { @@ -1069,6 +1171,13 @@ mod tests { allocated_region_ids: vec![RegionId::new(table_id, 4)], pending_deallocate_region_ids: vec![], transition_map: vec![vec![0]], + original_target_routes: vec![ + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 4), ""), + ], }; persistent_ctx.plans = vec![failed_plan, succeeded_plan]; persistent_ctx.failed_procedures = vec![ProcedureMeta { @@ -1100,6 +1209,212 @@ mod tests { assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4)); } + #[tokio::test] + async fn test_repartition_rollback_from_collect_restores_failed_group_metadata_only() { + let env = TestingEnv::new(); +
let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_ctx = env.ddl_context(node_manager); + let original_region_routes = vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), ""), + test_region_route(RegionId::new(table_id, 4), ""), + ]; + + let failed_plan = with_rollback_metadata( + test_plan(table_id), + vec![ + original_region_routes[0].clone(), + original_region_routes[2].clone(), + ], + ); + let succeeded_plan = RepartitionPlanEntry { + group_id: Uuid::new_v4(), + source_regions: vec![RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 100, 200), + }], + target_regions: vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 100, 150), + }, + RegionDescriptor { + region_id: RegionId::new(table_id, 4), + partition_expr: range_expr("x", 150, 200), + }, + ], + allocated_region_ids: vec![RegionId::new(table_id, 4)], + pending_deallocate_region_ids: vec![], + transition_map: vec![vec![0, 1]], + original_target_routes: vec![ + original_region_routes[1].clone(), + original_region_routes[3].clone(), + ], + }; + let current_region_routes = apply_group_staging(&failed_plan, &original_region_routes); + let current_region_routes = apply_group_staging(&succeeded_plan, &current_region_routes); + let current_region_routes = exit_group_staging(&succeeded_plan, &current_region_routes); + env.create_physical_table_metadata_with_wal_options( + table_id, + current_region_routes, + test_region_wal_options(&[1, 2, 3, 4]), + ) + .await; + + let mut persistent_ctx = PersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + persistent_ctx.plans = vec![failed_plan,
succeeded_plan.clone()]; + persistent_ctx.failed_procedures = vec![ProcedureMeta { + plan_index: 0, + group_id: persistent_ctx.plans[0].group_id, + procedure_id: ProcedureId::random(), + }]; + + let context = Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let mut procedure = RepartitionProcedure { + state: Box::new(Collect::new(vec![])), + context, + }; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + assert_eq!( + current_parent_region_routes(&procedure.context).await, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + RegionRoute { + region: Region { + id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 100, 150).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(table_id, 4), + partition_expr: range_expr("x", 150, 200).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + ] + ); + } + + #[tokio::test] + async fn test_repartition_rollback_from_collect_restores_unknown_group_metadata() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_ctx = env.ddl_context(node_manager); + let original_region_routes = vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), ""), + ]; + let plan = with_rollback_metadata( + test_plan(table_id), + vec![ + original_region_routes[0].clone(), + original_region_routes[2].clone(), + ], + ); + let staged_region_routes = apply_group_staging(&plan, 
&original_region_routes); + assert_eq!( + region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1)) + .region + .partition_expr(), + range_expr("x", 0, 50).as_json_str().unwrap() + ); + assert!( + region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1)) + .is_leader_staging() + ); + assert_eq!( + region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3)) + .region + .partition_expr(), + range_expr("x", 50, 100).as_json_str().unwrap() + ); + assert!( + region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3)) + .is_leader_staging() + ); + env.create_physical_table_metadata_with_wal_options( + table_id, + staged_region_routes, + test_region_wal_options(&[1, 2, 3]), + ) + .await; + + let mut persistent_ctx = PersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + persistent_ctx.plans = vec![plan.clone()]; + persistent_ctx.unknown_procedures = vec![ProcedureMeta { + plan_index: 0, + group_id: plan.group_id, + procedure_id: ProcedureId::random(), + }]; + + let context = Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let mut procedure = RepartitionProcedure { + state: Box::new(Collect::new(vec![])), + context, + }; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + assert_eq!( + current_parent_region_routes(&procedure.context).await, + vec![ + original_region_routes[0].clone(), + original_region_routes[1].clone() + ] + ); + } + #[tokio::test] async fn test_repartition_rollback_is_idempotent() { let env = TestingEnv::new(); @@ -1129,7 +1444,16 @@ mod tests { table_id, None, ); - persistent_ctx.plans = vec![test_plan(table_id)]; + persistent_ctx.plans = vec![with_rollback_metadata( + test_plan(table_id), + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 3), 
""), + ], + )]; persistent_ctx.failed_procedures = vec![ProcedureMeta { plan_index: 0, group_id: Uuid::new_v4(), @@ -1164,6 +1488,147 @@ mod tests { assert_eq!(once[1].region.id, RegionId::new(table_id, 2)); } + #[tokio::test] + async fn test_repartition_rollback_from_collect_restores_failed_merge_group_metadata_only() { + let env = TestingEnv::new(); + let table_id = 1024; + let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler)); + let ddl_ctx = env.ddl_context(node_manager); + let original_region_routes = vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 3), + &range_expr("x", 200, 300).as_json_str().unwrap(), + ), + test_region_route(RegionId::new(table_id, 4), ""), + ]; + let failed_merge_plan = RepartitionPlanEntry { + group_id: Uuid::new_v4(), + source_regions: vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 100), + }, + RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 100, 200), + }, + ], + target_regions: vec![RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 200), + }], + allocated_region_ids: vec![], + pending_deallocate_region_ids: vec![RegionId::new(table_id, 2)], + transition_map: vec![vec![0], vec![0]], + original_target_routes: vec![original_region_routes[0].clone()], + }; + let succeeded_split_plan = RepartitionPlanEntry { + group_id: Uuid::new_v4(), + source_regions: vec![RegionDescriptor { + region_id: RegionId::new(table_id, 3), + partition_expr: range_expr("x", 200, 300), + }], + target_regions: vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 3), + partition_expr: range_expr("x", 200, 250), + }, + RegionDescriptor { + region_id: 
RegionId::new(table_id, 4), + partition_expr: range_expr("x", 250, 300), + }, + ], + allocated_region_ids: vec![RegionId::new(table_id, 4)], + pending_deallocate_region_ids: vec![], + transition_map: vec![vec![0, 1]], + original_target_routes: vec![ + original_region_routes[2].clone(), + original_region_routes[3].clone(), + ], + }; + let current_region_routes = + apply_group_staging(&failed_merge_plan, &original_region_routes); + let current_region_routes = + apply_group_staging(&succeeded_split_plan, &current_region_routes); + let staged_region_routes = + exit_group_staging(&succeeded_split_plan, &current_region_routes); + env.create_physical_table_metadata_with_wal_options( + table_id, + staged_region_routes, + test_region_wal_options(&[1, 2, 3, 4]), + ) + .await; + + let mut persistent_ctx = PersistentContext::new( + TableName::new("test_catalog", "test_schema", "test_table"), + table_id, + None, + ); + persistent_ctx.plans = vec![failed_merge_plan, succeeded_split_plan.clone()]; + persistent_ctx.failed_procedures = vec![ProcedureMeta { + plan_index: 0, + group_id: persistent_ctx.plans[0].group_id, + procedure_id: ProcedureId::random(), + }]; + + let context = Context::new( + &ddl_ctx, + env.mailbox_ctx.mailbox().clone(), + env.server_addr.clone(), + persistent_ctx, + ); + let mut procedure = RepartitionProcedure { + state: Box::new(Collect::new(vec![])), + context, + }; + + procedure + .rollback(&TestingEnv::procedure_context()) + .await + .unwrap(); + + let region_routes = current_parent_region_routes(&procedure.context).await; + assert_eq!( + region_routes, + vec![ + test_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + ), + test_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + ), + RegionRoute { + region: Region { + id: RegionId::new(table_id, 3), + partition_expr: range_expr("x", 200, 250).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer:
Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(table_id, 4), + partition_expr: range_expr("x", 250, 300).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + ] + ); + } + #[tokio::test] async fn test_repartition_procedure_flow_split_failed_and_full_rollback() { let env = TestingEnv::new(); diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index 12ffac9918..c1f3ca2503 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -25,8 +25,8 @@ use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{debug, info}; use serde::{Deserialize, Deserializer, Serialize}; -use snafu::ResultExt; -use store_api::storage::{RegionNumber, TableId}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{RegionId, RegionNumber, TableId}; use table::metadata::TableInfo; use table::table_reference::TableReference; use tokio::time::Instant; @@ -104,7 +104,8 @@ impl BuildPlan { table_id, &mut next_region_number, &self.plan_entries, - ); + table_route_value.region_routes().unwrap(), + )?; let plan_count = repartition_plan_entries.len(); let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries); info!( @@ -258,25 +259,64 @@ impl AllocateRegion { /// Converts allocation plan entries to repartition plan entries. /// - /// This method takes the allocation plan entries and converts them to repartition plan entries, - /// updating `next_region_number` for each newly allocated region. + /// This method converts allocation intents into concrete repartition plans, + /// updates `next_region_number` for newly allocated regions, and captures + /// each plan's `original_target_routes` from the current table-route view. 
+ /// + /// This also persists each plan's pre-staging target routes for rollback. fn convert_to_repartition_plans( table_id: TableId, next_region_number: &mut RegionNumber, plan_entries: &[AllocationPlanEntry], - ) -> Vec { + current_region_routes: &[RegionRoute], + ) -> Result> { + let region_routes_map = current_region_routes + .iter() + .map(|route| (route.region.id, route)) + .collect::>(); + plan_entries .iter() .map(|plan_entry| { - convert_allocation_plan_to_repartition_plan( + let mut plan = convert_allocation_plan_to_repartition_plan( table_id, next_region_number, plan_entry, - ) + ); + Self::capture_plan_original_target_routes(&mut plan, &region_routes_map)?; + Ok(plan) }) .collect() } + fn capture_plan_original_target_routes( + plan: &mut RepartitionPlanEntry, + region_routes_map: &HashMap, + ) -> Result<()> { + // Persist the pre-staging target-route view on the parent plan. + // Newly allocated targets are skipped because rollback deletes their + // route metadata rather than restoring an original target route. + let mut original_target_routes = Vec::with_capacity(plan.target_regions.len()); + for target in &plan.target_regions { + if plan.allocated_region_ids.contains(&target.region_id) { + // This target region is to be allocated, so it doesn't exist in current routes. + continue; + } + let route = region_routes_map.get(&target.region_id).context( + error::RepartitionTargetRegionMissingSnafu { + group_id: plan.group_id, + region_id: target.region_id, + }, + )?; + { + original_target_routes.push((*route).clone()); + } + } + + plan.original_target_routes = original_target_routes; + Ok(()) + } + /// Collects all regions that need to be allocated from the repartition plan entries.
fn collect_allocate_regions( repartition_plan_entries: &[RepartitionPlanEntry], @@ -357,6 +397,8 @@ impl AllocateRegion { #[cfg(test)] mod tests { + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; use store_api::storage::RegionId; use uuid::Uuid; @@ -405,6 +447,20 @@ mod tests { } } + fn create_current_region_routes(table_id: TableId, region_numbers: &[u32]) -> Vec { + region_numbers + .iter() + .map(|region_number| RegionRoute { + region: Region { + id: RegionId::new(table_id, *region_number), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }) + .collect() + } + #[test] fn test_convert_to_repartition_plans_no_allocation() { let table_id = 1024; @@ -421,7 +477,9 @@ mod tests { table_id, &mut next_region_number, &plan_entries, - ); + &create_current_region_routes(table_id, &[1, 2]), + ) + .unwrap(); assert_eq!(result.len(), 1); assert_eq!(result[0].target_regions.len(), 2); @@ -446,7 +504,9 @@ mod tests { table_id, &mut next_region_number, &plan_entries, - ); + &create_current_region_routes(table_id, &[1, 2]), + ) + .unwrap(); assert_eq!(result.len(), 1); assert_eq!(result[0].target_regions.len(), 4); @@ -479,7 +539,9 @@ mod tests { table_id, &mut next_region_number, &plan_entries, - ); + &create_current_region_routes(table_id, &[1, 2, 3, 4]), + ) + .unwrap(); assert_eq!(result.len(), 3); assert_eq!(result[0].allocated_region_ids.len(), 1); @@ -504,7 +566,9 @@ mod tests { table_id, &mut next_region_number, &plan_entries, - ); + &create_current_region_routes(table_id, &[1, 2, 3, 4]), + ) + .unwrap(); let count = AllocateRegion::count_regions_to_allocate(&repartition_plans); assert_eq!(count, 2); @@ -524,7 +588,9 @@ mod tests { table_id, &mut next_region_number, &plan_entries, - ); + &create_current_region_routes(table_id, &[1, 2]), + ) + .unwrap(); let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans); assert_eq!(allocate_regions.len(), 2); diff --git 
a/src/meta-srv/src/procedure/repartition/deallocate_region.rs b/src/meta-srv/src/procedure/repartition/deallocate_region.rs index 3f5dc5bd8e..f3102221fb 100644 --- a/src/meta-srv/src/procedure/repartition/deallocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/deallocate_region.rs @@ -267,6 +267,7 @@ mod tests { allocated_region_ids: vec![], pending_deallocate_region_ids: vec![RegionId::new(table_id, 1)], transition_map: vec![], + original_target_routes: vec![], }]; let mut state = DeallocateRegion; diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index e5a06f79a8..12374e8ada 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -41,18 +41,14 @@ use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, StringKey, UserMetadata, }; -use common_telemetry::{error, info, warn}; +use common_telemetry::{error, info}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{RegionId, TableId}; use uuid::Uuid; use crate::error::{self, Result}; -use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest; -use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion; -use crate::procedure::repartition::group::remap_manifest::RemapManifest; use crate::procedure::repartition::group::repartition_start::RepartitionStart; -use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::plan::RegionDescriptor; use crate::procedure::repartition::utils::get_datanode_table_value; use crate::procedure::repartition::{self}; @@ -196,62 +192,6 @@ impl RepartitionGroupProcedure { Ok(Self { state, context }) } - - async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> { - if !self.should_rollback_metadata() { - return 
Ok(()); - } - - let table_lock = - common_meta::lock_key::TableLock::Write(self.context.persistent_ctx.table_id).into(); - let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; - UpdateMetadata::RollbackStaging - .rollback_staging_regions(&mut self.context) - .await?; - - if let Err(err) = self.context.invalidate_table_cache().await { - warn!( - err; - "Failed to broadcast the invalidate table cache message during repartition group rollback" - ); - } - - Ok(()) - } - - /// Returns whether group rollback should revert staging metadata. - /// - /// This uses an "after metadata apply, before exit staging" semantic. - /// Once execution reaches `UpdateMetadata::ApplyStaging` or any later staging state, - /// rollback must restore table-route metadata back to the pre-apply view. - /// - /// State flow: - /// `RepartitionStart -> SyncRegion -> UpdateMetadata::ApplyStaging -> EnterStagingRegion` - /// ` -> RemapManifest -> ApplyStagingManifest -> UpdateMetadata::ExitStaging -> RepartitionEnd` - /// ` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^` - /// ` rollback staging metadata` - /// - /// Notes: - /// - `RepartitionStart` / `SyncRegion`: no-op, metadata has not been staged yet. - /// - `UpdateMetadata::ApplyStaging` / `EnterStagingRegion` / `RemapManifest` / - /// `ApplyStagingManifest` / `UpdateMetadata::RollbackStaging`: rollback-active. - /// - `UpdateMetadata::ExitStaging` / `RepartitionEnd`: excluded, because metadata has - /// already moved into the post-commit exit path. 
- fn should_rollback_metadata(&self) -> bool { - self.state.as_any().is::() - || self.state.as_any().is::() - || self.state.as_any().is::() - || self - .state - .as_any() - .downcast_ref::() - .is_some_and(|state| { - matches!( - state, - UpdateMetadata::ApplyStaging | UpdateMetadata::RollbackStaging - ) - }) - } } #[async_trait::async_trait] @@ -260,10 +200,10 @@ impl Procedure for RepartitionGroupProcedure { Self::TYPE_NAME } - async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> { - self.rollback_inner(ctx) - .await - .map_err(ProcedureError::external) + async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> { + // The parent repartition procedure is responsible for rollback and recovery. + // Subprocedures are not recovered after metasrv restarts, so implementing rollback for them is meaningless. + Ok(()) } #[tracing::instrument(skip_all, fields( @@ -304,7 +244,9 @@ impl Procedure for RepartitionGroupProcedure { } fn rollback_supported(&self) -> bool { - true + // Parent repartition owns rollback and recovery because subprocedures are + // not relied on as durable rollback units across metasrv restarts. 
+ false } fn dump(&self) -> ProcedureResult { @@ -665,149 +607,12 @@ pub(crate) trait State: Sync + Send + Debug { mod tests { use std::assert_matches; use std::sync::Arc; - use std::time::Duration; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::test_util::MockKvBackendBuilder; - use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; - use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId}; - use common_procedure_test::MockContextProvider; - use partition::expr::PartitionExpr; - use store_api::storage::RegionId; - use super::{ - Context, PersistentContext, RepartitionGroupProcedure, RepartitionStart, State, - region_routes, - }; use crate::error::Error; - use crate::procedure::repartition::dispatch::build_region_mapping; - use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest; - use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion; - use crate::procedure::repartition::group::remap_manifest::RemapManifest; - use crate::procedure::repartition::group::repartition_start::RepartitionStart as GroupRepartitionStart; - use crate::procedure::repartition::group::sync_region::SyncRegion; - use crate::procedure::repartition::group::update_metadata::UpdateMetadata; - use crate::procedure::repartition::plan; - use crate::procedure::repartition::repartition_start::RepartitionStart as ParentRepartitionStart; - use crate::procedure::repartition::test_util::{ - TestingEnv, new_persistent_context, range_expr, - }; - - struct GroupRollbackFixture { - context: Context, - original_region_routes: Vec, - next_state: Option>, - } - - async fn new_group_rollback_fixture( - original_region_routes: Vec, - from_exprs: Vec, - to_exprs: Vec, - sync_region: bool, - ) -> GroupRollbackFixture { - let env = TestingEnv::new(); - let procedure_ctx = TestingEnv::procedure_context(); - let table_id = 1024; - let mut next_region_number = 10; - - 
env.create_physical_table_metadata(table_id, original_region_routes.clone()) - .await; - - let (_, physical_route) = env - .table_metadata_manager - .table_route_manager() - .get_physical_table_route(table_id) - .await - .unwrap(); - let allocation_plans = - ParentRepartitionStart::build_plan(&physical_route, &from_exprs, &to_exprs).unwrap(); - assert_eq!(allocation_plans.len(), 1); - - let repartition_plan = plan::convert_allocation_plan_to_repartition_plan( - table_id, - &mut next_region_number, - &allocation_plans[0], - ); - let region_mapping = build_region_mapping( - &repartition_plan.source_regions, - &repartition_plan.target_regions, - &repartition_plan.transition_map, - ); - let persistent_context = PersistentContext::new( - repartition_plan.group_id, - table_id, - "test_catalog".to_string(), - "test_schema".to_string(), - repartition_plan.source_regions, - repartition_plan.target_regions, - region_mapping, - sync_region, - repartition_plan.allocated_region_ids, - repartition_plan.pending_deallocate_region_ids, - Duration::from_secs(120), - ); - let mut context = env.create_context(persistent_context); - let (next_state, _) = GroupRepartitionStart - .next(&mut context, &procedure_ctx) - .await - .unwrap(); - - GroupRollbackFixture { - context, - original_region_routes, - next_state: Some(next_state), - } - } - - async fn new_split_group_rollback_fixture(sync_region: bool) -> GroupRollbackFixture { - new_group_rollback_fixture( - vec![ - new_region_route(RegionId::new(1024, 1), Some(range_expr("x", 0, 100))), - new_region_route(RegionId::new(1024, 2), Some(range_expr("x", 100, 200))), - new_region_route(RegionId::new(1024, 10), None), - ], - vec![range_expr("x", 0, 100)], - vec![range_expr("x", 0, 50), range_expr("x", 50, 100)], - sync_region, - ) - .await - } - - async fn new_merge_group_rollback_fixture(sync_region: bool) -> GroupRollbackFixture { - new_group_rollback_fixture( - vec![ - new_region_route(RegionId::new(1024, 1), Some(range_expr("x", 0, 
100))), - new_region_route(RegionId::new(1024, 2), Some(range_expr("x", 100, 200))), - new_region_route(RegionId::new(1024, 3), Some(range_expr("x", 200, 300))), - ], - vec![range_expr("x", 0, 100), range_expr("x", 100, 200)], - vec![range_expr("x", 0, 200)], - sync_region, - ) - .await - } - - async fn stage_metadata(context: &mut Context) { - UpdateMetadata::ApplyStaging - .apply_staging_regions(context) - .await - .unwrap(); - } - - fn new_region_route(region_id: RegionId, partition_expr: Option) -> RegionRoute { - RegionRoute { - region: Region { - id: region_id, - partition_expr: partition_expr - .map(|expr| expr.as_json_str().unwrap()) - .unwrap_or_default(), - ..Default::default() - }, - leader_peer: Some(Peer::empty(1)), - ..Default::default() - } - } + use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context}; #[tokio::test] async fn test_get_table_route_value_not_found_error() { @@ -856,198 +661,4 @@ mod tests { let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err(); assert!(err.is_retryable()); } - - #[tokio::test] - async fn test_group_rollback_supported() { - let env = TestingEnv::new(); - let persistent_context = new_persistent_context(1024, vec![], vec![]); - let procedure = RepartitionGroupProcedure { - state: Box::new(RepartitionStart), - context: env.create_context(persistent_context), - }; - - assert!(procedure.rollback_supported()); - } - - #[tokio::test] - async fn test_group_rollback_is_noop_before_apply_staging() { - let env = TestingEnv::new(); - let persistent_context = new_persistent_context(1024, vec![], vec![]); - let ctx = env.create_context(persistent_context.clone()); - let mut procedure = RepartitionGroupProcedure { - state: Box::new(RepartitionStart), - context: ctx, - }; - let provider = Arc::new(MockContextProvider::new(Default::default())); - let procedure_ctx = ProcedureContext { - procedure_id: ProcedureId::random(), - provider, - }; - - procedure.rollback(&procedure_ctx).await.unwrap(); - - 
assert!(procedure.state.as_any().is::()); - assert_eq!(procedure.context.persistent_ctx, persistent_context); - } - - async fn assert_noop_rollback( - fixture: GroupRollbackFixture, - state: Box, - assert_state: impl FnOnce(&dyn State), - ) { - let original_region_routes = fixture.original_region_routes.clone(); - let procedure_ctx = TestingEnv::procedure_context(); - let mut procedure = RepartitionGroupProcedure { - state, - context: fixture.context, - }; - - procedure.rollback(&procedure_ctx).await.unwrap(); - - assert_state(&*procedure.state); - let table_route_value = procedure - .context - .get_table_route_value() - .await - .unwrap() - .into_inner(); - let region_routes = region_routes( - procedure.context.persistent_ctx.table_id, - &table_route_value, - ) - .unwrap(); - assert_eq!(region_routes.clone(), original_region_routes); - } - - async fn assert_metadata_rollback_restores_table_route( - mut fixture: GroupRollbackFixture, - state: Box, - ) { - let original_region_routes = fixture.original_region_routes.clone(); - let procedure_ctx = TestingEnv::procedure_context(); - stage_metadata(&mut fixture.context).await; - let mut procedure = RepartitionGroupProcedure { - state, - context: fixture.context, - }; - - procedure.rollback(&procedure_ctx).await.unwrap(); - - let table_route_value = procedure - .context - .get_table_route_value() - .await - .unwrap() - .into_inner(); - let region_routes = region_routes( - procedure.context.persistent_ctx.table_id, - &table_route_value, - ) - .unwrap(); - assert_eq!(region_routes.clone(), original_region_routes); - } - - #[tokio::test] - async fn test_group_rollback_is_noop_in_sync_region() { - let mut fixture = new_split_group_rollback_fixture(true).await; - assert!( - fixture - .next_state - .as_ref() - .unwrap() - .as_any() - .is::() - ); - let state = fixture.next_state.take().unwrap(); - - assert_noop_rollback(fixture, state, |state| { - assert!(state.as_any().is::()); - }) - .await; - } - - #[tokio::test] - async fn 
test_group_rollback_is_noop_in_exit_staging() { - let fixture = new_split_group_rollback_fixture(false).await; - - assert_noop_rollback(fixture, Box::new(UpdateMetadata::ExitStaging), |state| { - assert!(state.as_any().is::()); - assert!(matches!( - state.as_any().downcast_ref::(), - Some(UpdateMetadata::ExitStaging) - )); - }) - .await; - } - - #[tokio::test] - async fn test_group_rollback_restores_split_routes_from_apply_staging() { - let fixture = new_split_group_rollback_fixture(false).await; - assert_metadata_rollback_restores_table_route( - fixture, - Box::new(UpdateMetadata::ApplyStaging), - ) - .await; - } - - #[tokio::test] - async fn test_group_rollback_restores_split_routes_from_enter_staging_region() { - let fixture = new_split_group_rollback_fixture(false).await; - assert_metadata_rollback_restores_table_route(fixture, Box::new(EnterStagingRegion)).await; - } - - #[tokio::test] - async fn test_group_rollback_restores_split_routes_from_remap_manifest() { - let fixture = new_split_group_rollback_fixture(false).await; - assert_metadata_rollback_restores_table_route(fixture, Box::new(RemapManifest)).await; - } - - #[tokio::test] - async fn test_group_rollback_restores_split_routes_from_apply_staging_manifest() { - let fixture = new_split_group_rollback_fixture(false).await; - assert_metadata_rollback_restores_table_route(fixture, Box::new(ApplyStagingManifest)) - .await; - } - - #[tokio::test] - async fn test_group_rollback_restores_merge_routes_and_is_idempotent() { - let mut fixture = new_merge_group_rollback_fixture(false).await; - let original_region_routes = fixture.original_region_routes.clone(); - let procedure_ctx = TestingEnv::procedure_context(); - stage_metadata(&mut fixture.context).await; - let mut procedure = RepartitionGroupProcedure { - state: Box::new(UpdateMetadata::ApplyStaging), - context: fixture.context, - }; - - procedure.rollback(&procedure_ctx).await.unwrap(); - let table_route_value = procedure - .context - 
.get_table_route_value() - .await - .unwrap() - .into_inner(); - let once = region_routes( - procedure.context.persistent_ctx.table_id, - &table_route_value, - ) - .unwrap() - .clone(); - procedure.rollback(&procedure_ctx).await.unwrap(); - let table_route_value = procedure - .context - .get_table_route_value() - .await - .unwrap() - .into_inner(); - let twice = region_routes( - procedure.context.persistent_ctx.table_id, - &table_route_value, - ) - .unwrap() - .clone(); - - assert_eq!(once, original_region_routes); - assert_eq!(once, twice); - } } diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata.rs index cf3b111971..90fe6c5893 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata.rs @@ -14,7 +14,6 @@ pub(crate) mod apply_staging_region; pub(crate) mod exit_staging_region; -pub(crate) mod rollback_staging_region; use std::any::Any; use std::time::Instant; @@ -34,9 +33,7 @@ use crate::procedure::repartition::group::{Context, State}; pub enum UpdateMetadata { /// Applies the new partition expressions for staging regions. ApplyStaging, - /// Rolls back the new partition expressions for staging regions. - RollbackStaging, - /// Exits the staging regions. + /// Exits the staging regions after the group finishes its forward path. 
ExitStaging, } @@ -64,17 +61,6 @@ impl State for UpdateMetadata { ctx.update_update_metadata_elapsed(timer.elapsed()); Ok((Box::new(EnterStagingRegion), Status::executing(false))) } - UpdateMetadata::RollbackStaging => { - self.rollback_staging_regions(ctx).await?; - - if let Err(err) = ctx.invalidate_table_cache().await { - warn!( - err; - "Failed to broadcast the invalidate table cache message during the rollback staging regions" - ); - }; - Ok((Box::new(RepartitionEnd), Status::executing(false))) - } UpdateMetadata::ExitStaging => { self.exit_staging_regions(ctx).await?; if let Err(err) = ctx.invalidate_table_cache().await { diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs index f8e505f0ff..50864daa93 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs @@ -25,7 +25,7 @@ use crate::procedure::repartition::group::{Context, GroupId, region_routes}; use crate::procedure::repartition::plan::RegionDescriptor; impl UpdateMetadata { - fn exit_staging_region_routes( + pub(crate) fn exit_staging_region_routes( group_id: GroupId, sources: &[RegionDescriptor], targets: &[RegionDescriptor], diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs deleted file mode 100644 index 4e6bf67fc8..0000000000 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/rollback_staging_region.rs +++ /dev/null @@ -1,301 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use common_error::ext::BoxedError; -use common_meta::rpc::router::RegionRoute; -use common_telemetry::{error, info}; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; - -use crate::error::{self, Result}; -use crate::procedure::repartition::group::update_metadata::UpdateMetadata; -use crate::procedure::repartition::group::{Context, GroupId, region_routes}; -use crate::procedure::repartition::plan::RegionDescriptor; - -impl UpdateMetadata { - /// Rolls back the staging regions. - /// - /// Abort: - /// - Source region not found. - /// - Target region not found. - fn rollback_staging_region_routes( - group_id: GroupId, - sources: &[RegionDescriptor], - original_target_routes: &[RegionRoute], - pending_deallocate_region_ids: &[RegionId], - current_region_routes: &[RegionRoute], - ) -> Result> { - let mut region_routes = current_region_routes.to_vec(); - let mut region_routes_map = region_routes - .iter_mut() - .map(|route| (route.region.id, route)) - .collect::>(); - for source in sources { - let region_route = region_routes_map.get_mut(&source.region_id).context( - error::RepartitionSourceRegionMissingSnafu { - group_id, - region_id: source.region_id, - }, - )?; - // Clean leader staging state for source regions. - region_route.clear_leader_staging(); - if pending_deallocate_region_ids.contains(&source.region_id) { - // Clean ignore all writes state for source regions if it's pending to be deallocated, - // which means the source region is merged into the target region. 
- region_route.clear_ignore_all_writes(); - } - } - - for target in original_target_routes { - let region_route = region_routes_map.get_mut(&target.region.id).context( - error::RepartitionTargetRegionMissingSnafu { - group_id, - region_id: target.region.id, - }, - )?; - - // Revert the partition expression and write route policy to the original value for the target region. - region_route.region.partition_expr = target.region.partition_expr.clone(); - region_route.write_route_policy = target.write_route_policy; - - // Clean leader staging state for target regions. - region_route.clear_leader_staging(); - } - - Ok(region_routes) - } - - /// Rolls back the metadata for staging regions. - /// - /// Abort: - /// - Table route is not physical. - /// - Source region not found. - /// - Target region not found. - /// - Failed to update the table route. - /// - Central region datanode table value not found. - pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> { - let table_id = ctx.persistent_ctx.table_id; - let group_id = ctx.persistent_ctx.group_id; - let current_table_route_value = ctx.get_table_route_value().await?; - let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?; - // Safety: prepare result is set in [RepartitionStart] state. 
- let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); - let new_region_routes = Self::rollback_staging_region_routes( - group_id, - &ctx.persistent_ctx.sources, - &prepare_result.target_routes, - &ctx.persistent_ctx.pending_deallocate_region_ids, - region_routes, - )?; - - let source_count = prepare_result.source_routes.len(); - let target_count = prepare_result.target_routes.len(); - info!( - "Rollback staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}", - table_id, group_id, source_count, target_count - ); - - if let Err(err) = ctx - .update_table_route(&current_table_route_value, new_region_routes) - .await - { - error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"); - return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { - reason: format!( - "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}" - ), - }); - }; - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - - use common_meta::peer::Peer; - use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; - use store_api::storage::RegionId; - use uuid::Uuid; - - use crate::procedure::repartition::group::update_metadata::UpdateMetadata; - use crate::procedure::repartition::plan::RegionDescriptor; - use crate::procedure::repartition::test_util::range_expr; - - fn new_region_route( - region_id: RegionId, - partition_expr: &str, - leader_state: Option<LeaderState>, - ignore_all_writes: bool, - ) -> RegionRoute { - let mut route = RegionRoute { - region: Region { - id: region_id, - partition_expr: partition_expr.to_string(), - ..Default::default() - }, - leader_peer: Some(Peer::empty(1)), - leader_state, - ..Default::default() - }; - - if ignore_all_writes { - route.set_ignore_all_writes(); - } - - route - } - - fn original_target_routes( - region_routes: &[RegionRoute], - targets:
&[RegionDescriptor], - ) -> Vec { - let target_ids = targets - .iter() - .map(|target| target.region_id) - .collect::>(); - region_routes - .iter() - .filter(|route| target_ids.contains(&route.region.id)) - .cloned() - .collect() - } - - #[test] - fn test_rollback_staging_region_routes_split_case() { - let group_id = Uuid::new_v4(); - let table_id = 1024; - let original_region_routes = vec![ - new_region_route( - RegionId::new(table_id, 1), - &range_expr("x", 0, 100).as_json_str().unwrap(), - None, - false, - ), - new_region_route( - RegionId::new(table_id, 2), - &range_expr("x", 100, 200).as_json_str().unwrap(), - None, - false, - ), - new_region_route(RegionId::new(table_id, 3), "", None, false), - ]; - let sources = vec![RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100), - }]; - let targets = vec![ - RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 50), - }, - RegionDescriptor { - region_id: RegionId::new(table_id, 3), - partition_expr: range_expr("x", 50, 100), - }, - ]; - let applied_region_routes = UpdateMetadata::apply_staging_region_routes( - group_id, - &sources, - &targets, - &[], - &original_region_routes, - ) - .unwrap(); - let target_routes = original_target_routes(&original_region_routes, &targets); - let new_region_routes = UpdateMetadata::rollback_staging_region_routes( - group_id, - &sources, - &target_routes, - &[], - &applied_region_routes, - ) - .unwrap(); - - assert_eq!(new_region_routes, original_region_routes); - } - - #[test] - fn test_rollback_staging_region_routes_merge_case_is_idempotent() { - let group_id = Uuid::new_v4(); - let table_id = 1024; - let original_region_routes = vec![ - new_region_route( - RegionId::new(table_id, 1), - &range_expr("x", 0, 100).as_json_str().unwrap(), - None, - false, - ), - new_region_route( - RegionId::new(table_id, 2), - &range_expr("x", 100, 200).as_json_str().unwrap(), - None, - false, - ), - 
new_region_route( - RegionId::new(table_id, 3), - &range_expr("x", 200, 300).as_json_str().unwrap(), - None, - false, - ), - ]; - let sources = vec![ - RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 100), - }, - RegionDescriptor { - region_id: RegionId::new(table_id, 2), - partition_expr: range_expr("x", 100, 200), - }, - ]; - let targets = vec![RegionDescriptor { - region_id: RegionId::new(table_id, 1), - partition_expr: range_expr("x", 0, 200), - }]; - let target_routes = original_target_routes(&original_region_routes, &targets); - let applied_region_routes = UpdateMetadata::apply_staging_region_routes( - group_id, - &sources, - &targets, - &[RegionId::new(table_id, 2)], - &original_region_routes, - ) - .unwrap(); - - let once = UpdateMetadata::rollback_staging_region_routes( - group_id, - &sources, - &target_routes, - &[RegionId::new(table_id, 2)], - &applied_region_routes, - ) - .unwrap(); - let twice = UpdateMetadata::rollback_staging_region_routes( - group_id, - &sources, - &target_routes, - &[RegionId::new(table_id, 2)], - &once, - ) - .unwrap(); - - assert_eq!(once, original_region_routes); - assert_eq!(once, twice); - } -} diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs index 9eabec7101..063a64341b 100644 --- a/src/meta-srv/src/procedure/repartition/plan.rs +++ b/src/meta-srv/src/procedure/repartition/plan.rs @@ -14,6 +14,7 @@ use std::cmp::Ordering; +use common_meta::rpc::router::RegionRoute; use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use store_api::storage::{RegionId, RegionNumber, TableId}; @@ -46,7 +47,7 @@ pub struct AllocationPlanEntry { /// A plan entry for the dispatch phase after region allocation, /// with concrete source and target region descriptors. 
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct RepartitionPlanEntry { /// The group id for this plan entry. pub group_id: GroupId, @@ -61,6 +62,9 @@ pub struct RepartitionPlanEntry { /// For each `source_regions[k]`, the corresponding vector contains global /// `target_regions` that overlap with it. pub transition_map: Vec>, + /// Pre-staging target routes persisted for parent rollback and recovery. + #[serde(default)] + pub original_target_routes: Vec, } impl RepartitionPlanEntry { @@ -138,6 +142,7 @@ pub fn convert_allocation_plan_to_repartition_plan( allocated_region_ids, pending_deallocate_region_ids: vec![], transition_map: transition_map.clone(), + original_target_routes: vec![], } } Ordering::Equal => { @@ -157,6 +162,7 @@ pub fn convert_allocation_plan_to_repartition_plan( allocated_region_ids: vec![], pending_deallocate_region_ids: vec![], transition_map: transition_map.clone(), + original_target_routes: vec![], } } Ordering::Greater => { @@ -184,6 +190,7 @@ pub fn convert_allocation_plan_to_repartition_plan( allocated_region_ids: vec![], pending_deallocate_region_ids, transition_map: transition_map.clone(), + original_target_routes: vec![], } } } diff --git a/src/meta-srv/src/procedure/repartition/utils.rs b/src/meta-srv/src/procedure/repartition/utils.rs index 1bda85699b..6f274e9596 100644 --- a/src/meta-srv/src/procedure/repartition/utils.rs +++ b/src/meta-srv/src/procedure/repartition/utils.rs @@ -22,6 +22,8 @@ use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::error::{self, Result}; +use crate::procedure::repartition::group::GroupId; +use crate::procedure::repartition::plan::RegionDescriptor; /// Returns the `datanode_table_value` /// @@ -118,14 +120,79 @@ pub fn merge_and_validate_region_wal_options( Ok(new_region_wal_options) } +/// Restores group staging metadata in-place for parent repartition 
rollback. +/// +/// This helper lives in repartition utilities instead of the group subprocedure +/// because parent repartition owns crash recovery and rollback selection. +/// +/// The function mutates `region_routes` in place to avoid rebuilding the route +/// vector for each selected plan. It restores: +/// - source-region leader staging flags, +/// - merge-source `ignore_all_writes` markers for pending-deallocate sources, +/// - target-region partition expressions, +/// - target-region write-route policies, +/// - target-region leader staging flags. +/// +/// `original_target_routes` contains only pre-existing target routes. +/// Newly allocated targets are removed by parent rollback instead of being +/// restored here. +pub fn rollback_group_metadata_routes( + group_id: GroupId, + source_regions: &[RegionDescriptor], + original_target_routes: &[RegionRoute], + allocated_region_ids: &[RegionId], + pending_deallocate_region_ids: &[RegionId], + region_routes_map: &mut HashMap<RegionId, &mut RegionRoute>, +) -> Result<()> { + for source in source_regions { + let region_route = region_routes_map.get_mut(&source.region_id).context( + error::RepartitionSourceRegionMissingSnafu { + group_id, + region_id: source.region_id, + }, + )?; + region_route.clear_leader_staging(); + if pending_deallocate_region_ids.contains(&source.region_id) { + region_route.clear_ignore_all_writes(); + } + } + + for target in original_target_routes { + let Some(region_route) = region_routes_map.get_mut(&target.region.id) else { + // Ignores newly allocated region routes that do not exist in the current region routes. + // They may have already been deleted (to ensure idempotency).
+ if allocated_region_ids.contains(&target.region.id) { + continue; + } + + return error::RepartitionTargetRegionMissingSnafu { + group_id, + region_id: target.region.id, + } + .fail(); + }; + region_route.region.partition_expr = target.region.partition_expr.clone(); + region_route.write_route_policy = target.write_route_policy; + region_route.clear_leader_staging(); + } + + Ok(()) +} + #[cfg(test)] mod tests { + use std::collections::HashSet; + use common_meta::peer::Peer; - use common_meta::rpc::router::{Region, RegionRoute}; + use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; use common_wal::options::{KafkaWalOptions, WalOptions}; use store_api::storage::RegionId; + use uuid::Uuid; use super::*; + use crate::procedure::repartition::group::update_metadata::UpdateMetadata; + use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::test_util::range_expr; /// Helper function to create a Kafka WAL option string from a topic name. fn kafka_wal_option(topic: &str) -> String { @@ -149,6 +216,45 @@ mod tests { } } + fn new_staged_region_route( + region_id: RegionId, + partition_expr: &str, + leader_state: Option, + ignore_all_writes: bool, + ) -> RegionRoute { + let mut route = RegionRoute { + region: Region { + id: region_id, + partition_expr: partition_expr.to_string(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + leader_state, + ..Default::default() + }; + + if ignore_all_writes { + route.set_ignore_all_writes(); + } + + route + } + + fn original_target_routes( + region_routes: &[RegionRoute], + targets: &[RegionDescriptor], + ) -> Vec { + let target_ids = targets + .iter() + .map(|target| target.region_id) + .collect::>(); + region_routes + .iter() + .filter(|route| target_ids.contains(&route.region.id)) + .cloned() + .collect() + } + #[test] fn test_merge_and_validate_region_wal_options_success() { let table_id = 1; @@ -254,4 +360,141 @@ mod tests { assert!(error_msg.contains("Mismatch")); 
assert!(error_msg.contains(&table_id.to_string())); } + + #[test] + fn test_rollback_group_metadata_routes_split_case() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let original_region_routes = vec![ + new_staged_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + None, + false, + ), + new_staged_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + None, + false, + ), + new_staged_region_route(RegionId::new(table_id, 3), "", None, false), + ]; + let sources = vec![RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 100), + }]; + let targets = vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 50), + }, + RegionDescriptor { + region_id: RegionId::new(table_id, 3), + partition_expr: range_expr("x", 50, 100), + }, + ]; + let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes( + group_id, + &sources, + &targets, + &[], + &original_region_routes, + ) + .unwrap(); + let target_routes = original_target_routes(&original_region_routes, &targets); + + rollback_group_metadata_routes( + group_id, + &sources, + &target_routes, + &[], + &[], + &mut applied_region_routes + .iter_mut() + .map(|route| (route.region.id, route)) + .collect(), + ) + .unwrap(); + + assert_eq!(applied_region_routes, original_region_routes); + } + + #[test] + fn test_rollback_group_metadata_routes_merge_case_is_idempotent() { + let group_id = Uuid::new_v4(); + let table_id = 1024; + let original_region_routes = vec![ + new_staged_region_route( + RegionId::new(table_id, 1), + &range_expr("x", 0, 100).as_json_str().unwrap(), + None, + false, + ), + new_staged_region_route( + RegionId::new(table_id, 2), + &range_expr("x", 100, 200).as_json_str().unwrap(), + None, + false, + ), + new_staged_region_route( + RegionId::new(table_id, 3), + &range_expr("x", 200, 300).as_json_str().unwrap(), 
+ None, + false, + ), + ]; + let sources = vec![ + RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 100), + }, + RegionDescriptor { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 100, 200), + }, + ]; + let targets = vec![RegionDescriptor { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 200), + }]; + let target_routes = original_target_routes(&original_region_routes, &targets); + let mut once = UpdateMetadata::apply_staging_region_routes( + group_id, + &sources, + &targets, + &[RegionId::new(table_id, 2)], + &original_region_routes, + ) + .unwrap(); + + rollback_group_metadata_routes( + group_id, + &sources, + &target_routes, + &[], + &[RegionId::new(table_id, 2)], + &mut once + .iter_mut() + .map(|route| (route.region.id, route)) + .collect(), + ) + .unwrap(); + let mut twice = once.clone(); + rollback_group_metadata_routes( + group_id, + &sources, + &target_routes, + &[], + &[RegionId::new(table_id, 2)], + &mut twice + .iter_mut() + .map(|route| (route.region.id, route)) + .collect(), + ) + .unwrap(); + + assert_eq!(once, original_region_routes); + assert_eq!(once, twice); + } }