refactor: move group rollback ownership to parent repartition (#7967)

* refactor(meta-srv): move group rollback ownership to parent repartition procedure

- Parent procedure now owns partial rollback based on failed/unknown subprocedures
- rollback order: group metadata first, then allocated-region cleanup
- original_target_routes captured during build-plan, persisted in RepartitionPlanEntry
- rollback_group_metadata_routes moved to utils as parent-owned helper
- Group subprocedure no longer supports rollback (rollback_supported = false)
- Removed UpdateMetadata::RollbackStaging from group state machine
- Deleted redundant group rollback tests and helpers

BREAKING CHANGE: group Procedure no longer handles rollback; parent procedure
is responsible for crash recovery and selecting which plans to roll back.

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-04-16 17:37:16 +08:00
committed by GitHub
parent 525e88bce4
commit 037fccec2c
9 changed files with 841 additions and 763 deletions

View File

@@ -59,11 +59,13 @@ use crate::error::{self, Result};
use crate::procedure::repartition::collect::ProcedureMeta;
use crate::procedure::repartition::deallocate_region::DeallocateRegion;
use crate::procedure::repartition::group::{
Context as RepartitionGroupContext, RepartitionGroupProcedure,
Context as RepartitionGroupContext, RepartitionGroupProcedure, region_routes,
};
use crate::procedure::repartition::plan::RepartitionPlanEntry;
use crate::procedure::repartition::repartition_start::RepartitionStart;
use crate::procedure::repartition::utils::get_datanode_table_value;
use crate::procedure::repartition::utils::{
get_datanode_table_value, rollback_group_metadata_routes,
};
use crate::service::mailbox::MailboxRef;
#[cfg(test)]
@@ -76,11 +78,17 @@ pub struct PersistentContext {
pub table_name: String,
pub table_id: TableId,
pub plans: Vec<RepartitionPlanEntry>,
/// Records failed sub-procedures for metadata rollback.
#[serde(default)]
/// Records failed sub-procedures for parent rollback selection.
///
/// The parent repartition procedure uses these entries to decide which plans
/// require group-metadata restoration and allocated-region cleanup.
pub failed_procedures: Vec<ProcedureMeta>,
#[serde(default)]
/// Records unknown sub-procedures for metadata rollback.
/// Records unknown sub-procedures for parent rollback selection.
///
/// Unknown procedures are treated the same as failed ones when selecting the
/// plan subset that must be rolled back by the parent procedure.
pub unknown_procedures: Vec<ProcedureMeta>,
/// The timeout for repartition operations.
#[serde(with = "humantime_serde", default = "default_timeout")]
@@ -506,6 +514,23 @@ impl RepartitionProcedure {
|| self.state.as_any().is::<collect::Collect>()
}
fn rollback_plan_indices(&self) -> HashSet<usize> {
self.context
.persistent_ctx
.failed_procedures
.iter()
.chain(self.context.persistent_ctx.unknown_procedures.iter())
.map(|procedure_meta| procedure_meta.plan_index)
.collect()
}
/// Returns allocated region ids that parent rollback should remove.
///
/// Rollback uses an "after AllocateRegion" semantic:
/// - in `AllocateRegion` and `Dispatch`, all allocated regions belong to the
/// current repartition attempt and must be cleaned up.
/// - in `Collect`, only the plans referenced by failed or unknown
/// sub-procedures should be rolled back.
fn rollback_allocated_region_ids(&self) -> HashSet<store_api::storage::RegionId> {
if self.state.as_any().is::<allocate_region::AllocateRegion>()
|| self.state.as_any().is::<dispatch::Dispatch>()
@@ -519,13 +544,9 @@ impl RepartitionProcedure {
.collect();
}
self.context
.persistent_ctx
.failed_procedures
.iter()
.chain(self.context.persistent_ctx.unknown_procedures.iter())
.flat_map(|procedure_meta| {
let plan_index = procedure_meta.plan_index;
self.rollback_plan_indices()
.into_iter()
.flat_map(|plan_index| {
self.context.persistent_ctx.plans[plan_index]
.allocated_region_ids
.iter()
@@ -534,15 +555,35 @@ impl RepartitionProcedure {
.collect()
}
fn filter_allocated_region_routes(
region_routes: &[RegionRoute],
allocated_region_ids: &HashSet<store_api::storage::RegionId>,
) -> Vec<RegionRoute> {
region_routes
.iter()
.filter(|route| !allocated_region_ids.contains(&route.region.id))
.cloned()
.collect()
/// Restores group-level staging metadata for failed/unknown plans.
///
/// The helper mutates `region_routes` in memory.
async fn rollback_group_metadata_for_selected_plans(
&mut self,
region_routes: &mut [RegionRoute],
) -> Result<()> {
let rollback_plan_indices = self.rollback_plan_indices();
if rollback_plan_indices.is_empty() {
return Ok(());
}
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for plan_index in rollback_plan_indices {
let plan = &self.context.persistent_ctx.plans[plan_index];
rollback_group_metadata_routes(
plan.group_id,
&plan.source_regions,
&plan.original_target_routes,
&plan.allocated_region_ids,
&plan.pending_deallocate_region_ids,
&mut region_routes_map,
)?;
}
Ok(())
}
async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
@@ -552,17 +593,17 @@ impl RepartitionProcedure {
let table_id = self.context.persistent_ctx.table_id;
let allocated_region_ids = self.rollback_allocated_region_ids();
if allocated_region_ids.is_empty() {
return Ok(());
}
let table_lock = TableLock::Write(table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
let table_route_value = self.context.get_table_route_value().await?;
let current_region_routes = table_route_value.region_routes().unwrap();
let original_region_routes = region_routes(table_id, table_route_value.get_inner_ref())?;
let mut current_region_routes = original_region_routes.clone();
self.rollback_group_metadata_for_selected_plans(&mut current_region_routes)
.await?;
let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
table_id,
current_region_routes,
&current_region_routes,
&allocated_region_ids,
);
if !allocated_region_routes.is_empty() {
@@ -587,9 +628,9 @@ impl RepartitionProcedure {
}
let new_region_routes =
Self::filter_allocated_region_routes(current_region_routes, &allocated_region_ids);
DeallocateRegion::generate_region_routes(&current_region_routes, &allocated_region_ids);
if new_region_routes.len() != current_region_routes.len() {
if new_region_routes != *original_region_routes {
self.context
.update_table_route(&table_route_value, new_region_routes, HashMap::new())
.await
@@ -809,6 +850,7 @@ mod tests {
use crate::procedure::repartition::collect::Collect;
use crate::procedure::repartition::deallocate_region::DeallocateRegion;
use crate::procedure::repartition::dispatch::Dispatch;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::repartition_end::RepartitionEnd;
use crate::procedure::repartition::test_util::{
@@ -837,9 +879,52 @@ mod tests {
allocated_region_ids: vec![RegionId::new(table_id, 3)],
pending_deallocate_region_ids: vec![],
transition_map: vec![vec![0, 1]],
original_target_routes: vec![],
}
}
fn with_rollback_metadata(
mut plan: RepartitionPlanEntry,
original_target_routes: Vec<RegionRoute>,
) -> RepartitionPlanEntry {
plan.original_target_routes = original_target_routes;
plan
}
fn apply_group_staging(
plan: &RepartitionPlanEntry,
current_region_routes: &[RegionRoute],
) -> Vec<RegionRoute> {
UpdateMetadata::apply_staging_region_routes(
plan.group_id,
&plan.source_regions,
&plan.target_regions,
&plan.pending_deallocate_region_ids,
current_region_routes,
)
.unwrap()
}
fn exit_group_staging(
plan: &RepartitionPlanEntry,
current_region_routes: &[RegionRoute],
) -> Vec<RegionRoute> {
UpdateMetadata::exit_staging_region_routes(
plan.group_id,
&plan.source_regions,
&plan.target_regions,
current_region_routes,
)
.unwrap()
}
fn region_route_by_id(region_routes: &[RegionRoute], region_id: RegionId) -> &RegionRoute {
region_routes
.iter()
.find(|route| route.region.id == region_id)
.unwrap()
}
fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
RepartitionProcedure { state, context }
}
@@ -870,10 +955,8 @@ mod tests {
];
let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]);
let new_region_routes = RepartitionProcedure::filter_allocated_region_routes(
&region_routes,
&allocated_region_ids,
);
let new_region_routes =
DeallocateRegion::generate_region_routes(&region_routes, &allocated_region_ids);
assert_eq!(new_region_routes.len(), 1);
assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
@@ -939,7 +1022,16 @@ mod tests {
table_id,
None,
);
persistent_ctx.plans = vec![test_plan(table_id)];
persistent_ctx.plans = vec![with_rollback_metadata(
test_plan(table_id),
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
],
)];
persistent_ctx.failed_procedures = vec![ProcedureMeta {
plan_index: 0,
group_id: Uuid::new_v4(),
@@ -1050,6 +1142,16 @@ mod tests {
None,
);
let failed_plan = test_plan(table_id);
let failed_plan = with_rollback_metadata(
failed_plan,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
],
);
let succeeded_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
@@ -1069,6 +1171,13 @@ mod tests {
allocated_region_ids: vec![RegionId::new(table_id, 4)],
pending_deallocate_region_ids: vec![],
transition_map: vec![vec![0]],
original_target_routes: vec![
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 4), ""),
],
};
persistent_ctx.plans = vec![failed_plan, succeeded_plan];
persistent_ctx.failed_procedures = vec![ProcedureMeta {
@@ -1100,6 +1209,212 @@ mod tests {
assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4));
}
#[tokio::test]
async fn test_repartition_rollback_from_collect_restores_failed_group_metadata_only() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_ctx = env.ddl_context(node_manager);
let original_region_routes = vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
test_region_route(RegionId::new(table_id, 4), ""),
];
let failed_plan = with_rollback_metadata(
test_plan(table_id),
vec![
original_region_routes[0].clone(),
original_region_routes[2].clone(),
],
);
let succeeded_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
}],
target_regions: vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 150),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 4),
partition_expr: range_expr("x", 150, 200),
},
],
allocated_region_ids: vec![RegionId::new(table_id, 4)],
pending_deallocate_region_ids: vec![],
transition_map: vec![vec![0, 1]],
original_target_routes: vec![
original_region_routes[1].clone(),
original_region_routes[3].clone(),
],
};
let current_region_routes = apply_group_staging(&failed_plan, &original_region_routes);
let current_region_routes = apply_group_staging(&succeeded_plan, &current_region_routes);
let current_region_routes = exit_group_staging(&succeeded_plan, &current_region_routes);
env.create_physical_table_metadata_with_wal_options(
table_id,
current_region_routes,
test_region_wal_options(&[1, 2, 3, 4]),
)
.await;
let mut persistent_ctx = PersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
persistent_ctx.plans = vec![failed_plan, succeeded_plan.clone()];
persistent_ctx.failed_procedures = vec![ProcedureMeta {
plan_index: 0,
group_id: persistent_ctx.plans[0].group_id,
procedure_id: ProcedureId::random(),
}];
let context = Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let mut procedure = RepartitionProcedure {
state: Box::new(Collect::new(vec![])),
context,
};
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
assert_eq!(
current_parent_region_routes(&procedure.context).await,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 150).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 4),
partition_expr: range_expr("x", 150, 200).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
]
);
}
#[tokio::test]
async fn test_repartition_rollback_from_collect_restores_unknown_group_metadata() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_ctx = env.ddl_context(node_manager);
let original_region_routes = vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
];
let plan = with_rollback_metadata(
test_plan(table_id),
vec![
original_region_routes[0].clone(),
original_region_routes[2].clone(),
],
);
let staged_region_routes = apply_group_staging(&plan, &original_region_routes);
assert_eq!(
region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
.region
.partition_expr(),
range_expr("x", 0, 50).as_json_str().unwrap()
);
assert!(
region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
.is_leader_staging()
);
assert_eq!(
region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
.region
.partition_expr(),
range_expr("x", 50, 100).as_json_str().unwrap()
);
assert!(
region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
.is_leader_staging()
);
env.create_physical_table_metadata_with_wal_options(
table_id,
staged_region_routes,
test_region_wal_options(&[1, 2, 3]),
)
.await;
let mut persistent_ctx = PersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
persistent_ctx.plans = vec![plan.clone()];
persistent_ctx.unknown_procedures = vec![ProcedureMeta {
plan_index: 0,
group_id: plan.group_id,
procedure_id: ProcedureId::random(),
}];
let context = Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let mut procedure = RepartitionProcedure {
state: Box::new(Collect::new(vec![])),
context,
};
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
assert_eq!(
current_parent_region_routes(&procedure.context).await,
vec![
original_region_routes[0].clone(),
original_region_routes[1].clone()
]
);
}
#[tokio::test]
async fn test_repartition_rollback_is_idempotent() {
let env = TestingEnv::new();
@@ -1129,7 +1444,16 @@ mod tests {
table_id,
None,
);
persistent_ctx.plans = vec![test_plan(table_id)];
persistent_ctx.plans = vec![with_rollback_metadata(
test_plan(table_id),
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
],
)];
persistent_ctx.failed_procedures = vec![ProcedureMeta {
plan_index: 0,
group_id: Uuid::new_v4(),
@@ -1164,6 +1488,147 @@ mod tests {
assert_eq!(once[1].region.id, RegionId::new(table_id, 2));
}
#[tokio::test]
async fn test_repartition_rollback_from_collect_restores_failed_merge_group_metadata_only() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_ctx = env.ddl_context(node_manager);
let original_region_routes = vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 3),
&range_expr("x", 200, 300).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 4), ""),
];
let failed_merge_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
},
],
target_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 200),
}],
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![RegionId::new(table_id, 2)],
transition_map: vec![vec![0], vec![0]],
original_target_routes: vec![original_region_routes[0].clone()],
};
let succeeded_split_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 200, 300),
}],
target_regions: vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 200, 250),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 4),
partition_expr: range_expr("x", 250, 300),
},
],
allocated_region_ids: vec![RegionId::new(table_id, 4)],
pending_deallocate_region_ids: vec![],
transition_map: vec![vec![0, 1]],
original_target_routes: vec![
original_region_routes[2].clone(),
original_region_routes[3].clone(),
],
};
let current_region_routes =
apply_group_staging(&failed_merge_plan, &original_region_routes);
let current_region_routes =
apply_group_staging(&succeeded_split_plan, &current_region_routes);
let staged_region_routes =
exit_group_staging(&succeeded_split_plan, &current_region_routes);
env.create_physical_table_metadata_with_wal_options(
table_id,
staged_region_routes,
test_region_wal_options(&[1, 2, 3, 4]),
)
.await;
let mut persistent_ctx = PersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
persistent_ctx.plans = vec![failed_merge_plan, succeeded_split_plan.clone()];
persistent_ctx.failed_procedures = vec![ProcedureMeta {
plan_index: 0,
group_id: persistent_ctx.plans[0].group_id,
procedure_id: ProcedureId::random(),
}];
let context = Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let mut procedure = RepartitionProcedure {
state: Box::new(Collect::new(vec![])),
context,
};
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let region_routes = current_parent_region_routes(&procedure.context).await;
assert_eq!(
region_routes,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 200, 250).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 4),
partition_expr: range_expr("x", 250, 300).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
]
);
}
#[tokio::test]
async fn test_repartition_procedure_flow_split_failed_and_full_rollback() {
let env = TestingEnv::new();

View File

@@ -25,8 +25,8 @@ use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{debug, info};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::ResultExt;
use store_api::storage::{RegionNumber, TableId};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber, TableId};
use table::metadata::TableInfo;
use table::table_reference::TableReference;
use tokio::time::Instant;
@@ -104,7 +104,8 @@ impl BuildPlan {
table_id,
&mut next_region_number,
&self.plan_entries,
);
table_route_value.region_routes().unwrap(),
)?;
let plan_count = repartition_plan_entries.len();
let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
info!(
@@ -258,25 +259,64 @@ impl AllocateRegion {
/// Converts allocation plan entries to repartition plan entries.
///
/// This method takes the allocation plan entries and converts them to repartition plan entries,
/// updating `next_region_number` for each newly allocated region.
/// This method converts allocation intents into concrete repartition plans,
/// updates `next_region_number` for newly allocated regions, and captures
/// each plan's `original_target_routes` from the current table-route view.
///
/// This also persists each plan's pre-staging target routes for rollback.
fn convert_to_repartition_plans(
table_id: TableId,
next_region_number: &mut RegionNumber,
plan_entries: &[AllocationPlanEntry],
) -> Vec<RepartitionPlanEntry> {
current_region_routes: &[RegionRoute],
) -> Result<Vec<RepartitionPlanEntry>> {
let region_routes_map = current_region_routes
.iter()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
plan_entries
.iter()
.map(|plan_entry| {
convert_allocation_plan_to_repartition_plan(
let mut plan = convert_allocation_plan_to_repartition_plan(
table_id,
next_region_number,
plan_entry,
)
);
Self::capture_plan_original_target_routes(&mut plan, &region_routes_map)?;
Ok(plan)
})
.collect()
}
fn capture_plan_original_target_routes(
plan: &mut RepartitionPlanEntry,
region_routes_map: &HashMap<RegionId, &RegionRoute>,
) -> Result<()> {
// Persist the pre-staging target-route view on the parent plan.
// Newly allocated targets are skipped because rollback deletes their
// route metadata rather than restoring an original target route.
let mut original_target_routes = Vec::with_capacity(plan.target_regions.len());
for target in &plan.target_regions {
if plan.allocated_region_ids.contains(&target.region_id) {
// This target region is to be allocated, so it doesn't exist in current routes.
continue;
}
let route = region_routes_map.get(&target.region_id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id: plan.group_id,
region_id: target.region_id,
},
)?;
{
original_target_routes.push((*route).clone());
}
}
plan.original_target_routes = original_target_routes;
Ok(())
}
/// Collects all regions that need to be allocated from the repartition plan entries.
fn collect_allocate_regions(
repartition_plan_entries: &[RepartitionPlanEntry],
@@ -357,6 +397,8 @@ impl AllocateRegion {
#[cfg(test)]
mod tests {
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
@@ -405,6 +447,20 @@ mod tests {
}
}
fn create_current_region_routes(table_id: TableId, region_numbers: &[u32]) -> Vec<RegionRoute> {
region_numbers
.iter()
.map(|region_number| RegionRoute {
region: Region {
id: RegionId::new(table_id, *region_number),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
})
.collect()
}
#[test]
fn test_convert_to_repartition_plans_no_allocation() {
let table_id = 1024;
@@ -421,7 +477,9 @@ mod tests {
table_id,
&mut next_region_number,
&plan_entries,
);
&create_current_region_routes(table_id, &[1, 2]),
)
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].target_regions.len(), 2);
@@ -446,7 +504,9 @@ mod tests {
table_id,
&mut next_region_number,
&plan_entries,
);
&create_current_region_routes(table_id, &[1, 2]),
)
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].target_regions.len(), 4);
@@ -479,7 +539,9 @@ mod tests {
table_id,
&mut next_region_number,
&plan_entries,
);
&create_current_region_routes(table_id, &[1, 2, 3, 4]),
)
.unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result[0].allocated_region_ids.len(), 1);
@@ -504,7 +566,9 @@ mod tests {
table_id,
&mut next_region_number,
&plan_entries,
);
&create_current_region_routes(table_id, &[1, 2, 3, 4]),
)
.unwrap();
let count = AllocateRegion::count_regions_to_allocate(&repartition_plans);
assert_eq!(count, 2);
@@ -524,7 +588,9 @@ mod tests {
table_id,
&mut next_region_number,
&plan_entries,
);
&create_current_region_routes(table_id, &[1, 2]),
)
.unwrap();
let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans);
assert_eq!(allocate_regions.len(), 2);

View File

@@ -267,6 +267,7 @@ mod tests {
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![RegionId::new(table_id, 1)],
transition_map: vec![],
original_target_routes: vec![],
}];
let mut state = DeallocateRegion;

View File

@@ -41,18 +41,14 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info, warn};
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;
use crate::error::{self, Result};
use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
use crate::procedure::repartition::group::remap_manifest::RemapManifest;
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::utils::get_datanode_table_value;
use crate::procedure::repartition::{self};
@@ -196,62 +192,6 @@ impl RepartitionGroupProcedure {
Ok(Self { state, context })
}
async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
if !self.should_rollback_metadata() {
return Ok(());
}
let table_lock =
common_meta::lock_key::TableLock::Write(self.context.persistent_ctx.table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
UpdateMetadata::RollbackStaging
.rollback_staging_regions(&mut self.context)
.await?;
if let Err(err) = self.context.invalidate_table_cache().await {
warn!(
err;
"Failed to broadcast the invalidate table cache message during repartition group rollback"
);
}
Ok(())
}
/// Returns whether group rollback should revert staging metadata.
///
/// This uses an "after metadata apply, before exit staging" semantic.
/// Once execution reaches `UpdateMetadata::ApplyStaging` or any later staging state,
/// rollback must restore table-route metadata back to the pre-apply view.
///
/// State flow:
/// `RepartitionStart -> SyncRegion -> UpdateMetadata::ApplyStaging -> EnterStagingRegion`
/// ` -> RemapManifest -> ApplyStagingManifest -> UpdateMetadata::ExitStaging -> RepartitionEnd`
/// ` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^`
/// ` rollback staging metadata`
///
/// Notes:
/// - `RepartitionStart` / `SyncRegion`: no-op, metadata has not been staged yet.
/// - `UpdateMetadata::ApplyStaging` / `EnterStagingRegion` / `RemapManifest` /
/// `ApplyStagingManifest` / `UpdateMetadata::RollbackStaging`: rollback-active.
/// - `UpdateMetadata::ExitStaging` / `RepartitionEnd`: excluded, because metadata has
/// already moved into the post-commit exit path.
fn should_rollback_metadata(&self) -> bool {
self.state.as_any().is::<EnterStagingRegion>()
|| self.state.as_any().is::<RemapManifest>()
|| self.state.as_any().is::<ApplyStagingManifest>()
|| self
.state
.as_any()
.downcast_ref::<UpdateMetadata>()
.is_some_and(|state| {
matches!(
state,
UpdateMetadata::ApplyStaging | UpdateMetadata::RollbackStaging
)
})
}
}
#[async_trait::async_trait]
@@ -260,10 +200,10 @@ impl Procedure for RepartitionGroupProcedure {
Self::TYPE_NAME
}
async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner(ctx)
.await
.map_err(ProcedureError::external)
async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
// The parent repartition procedure is responsible for rollback and recovery.
// Subprocedures are not recovered after metasrv restarts, so implementing rollback for them is meaningless.
Ok(())
}
#[tracing::instrument(skip_all, fields(
@@ -304,7 +244,9 @@ impl Procedure for RepartitionGroupProcedure {
}
fn rollback_supported(&self) -> bool {
true
// Parent repartition owns rollback and recovery because subprocedures are
// not relied on as durable rollback units across metasrv restarts.
false
}
fn dump(&self) -> ProcedureResult<String> {
@@ -665,149 +607,12 @@ pub(crate) trait State: Sync + Send + Debug {
mod tests {
use std::assert_matches;
use std::sync::Arc;
use std::time::Duration;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::test_util::MockKvBackendBuilder;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use partition::expr::PartitionExpr;
use store_api::storage::RegionId;
use super::{
Context, PersistentContext, RepartitionGroupProcedure, RepartitionStart, State,
region_routes,
};
use crate::error::Error;
use crate::procedure::repartition::dispatch::build_region_mapping;
use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
use crate::procedure::repartition::group::remap_manifest::RemapManifest;
use crate::procedure::repartition::group::repartition_start::RepartitionStart as GroupRepartitionStart;
use crate::procedure::repartition::group::sync_region::SyncRegion;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan;
use crate::procedure::repartition::repartition_start::RepartitionStart as ParentRepartitionStart;
use crate::procedure::repartition::test_util::{
TestingEnv, new_persistent_context, range_expr,
};
struct GroupRollbackFixture {
context: Context,
original_region_routes: Vec<RegionRoute>,
next_state: Option<Box<dyn State>>,
}
async fn new_group_rollback_fixture(
original_region_routes: Vec<RegionRoute>,
from_exprs: Vec<PartitionExpr>,
to_exprs: Vec<PartitionExpr>,
sync_region: bool,
) -> GroupRollbackFixture {
let env = TestingEnv::new();
let procedure_ctx = TestingEnv::procedure_context();
let table_id = 1024;
let mut next_region_number = 10;
env.create_physical_table_metadata(table_id, original_region_routes.clone())
.await;
let (_, physical_route) = env
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.unwrap();
let allocation_plans =
ParentRepartitionStart::build_plan(&physical_route, &from_exprs, &to_exprs).unwrap();
assert_eq!(allocation_plans.len(), 1);
let repartition_plan = plan::convert_allocation_plan_to_repartition_plan(
table_id,
&mut next_region_number,
&allocation_plans[0],
);
let region_mapping = build_region_mapping(
&repartition_plan.source_regions,
&repartition_plan.target_regions,
&repartition_plan.transition_map,
);
let persistent_context = PersistentContext::new(
repartition_plan.group_id,
table_id,
"test_catalog".to_string(),
"test_schema".to_string(),
repartition_plan.source_regions,
repartition_plan.target_regions,
region_mapping,
sync_region,
repartition_plan.allocated_region_ids,
repartition_plan.pending_deallocate_region_ids,
Duration::from_secs(120),
);
let mut context = env.create_context(persistent_context);
let (next_state, _) = GroupRepartitionStart
.next(&mut context, &procedure_ctx)
.await
.unwrap();
GroupRollbackFixture {
context,
original_region_routes,
next_state: Some(next_state),
}
}
async fn new_split_group_rollback_fixture(sync_region: bool) -> GroupRollbackFixture {
new_group_rollback_fixture(
vec![
new_region_route(RegionId::new(1024, 1), Some(range_expr("x", 0, 100))),
new_region_route(RegionId::new(1024, 2), Some(range_expr("x", 100, 200))),
new_region_route(RegionId::new(1024, 10), None),
],
vec![range_expr("x", 0, 100)],
vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
sync_region,
)
.await
}
async fn new_merge_group_rollback_fixture(sync_region: bool) -> GroupRollbackFixture {
new_group_rollback_fixture(
vec![
new_region_route(RegionId::new(1024, 1), Some(range_expr("x", 0, 100))),
new_region_route(RegionId::new(1024, 2), Some(range_expr("x", 100, 200))),
new_region_route(RegionId::new(1024, 3), Some(range_expr("x", 200, 300))),
],
vec![range_expr("x", 0, 100), range_expr("x", 100, 200)],
vec![range_expr("x", 0, 200)],
sync_region,
)
.await
}
async fn stage_metadata(context: &mut Context) {
UpdateMetadata::ApplyStaging
.apply_staging_regions(context)
.await
.unwrap();
}
fn new_region_route(region_id: RegionId, partition_expr: Option<PartitionExpr>) -> RegionRoute {
RegionRoute {
region: Region {
id: region_id,
partition_expr: partition_expr
.map(|expr| expr.as_json_str().unwrap())
.unwrap_or_default(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}
}
use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
#[tokio::test]
async fn test_get_table_route_value_not_found_error() {
@@ -856,198 +661,4 @@ mod tests {
let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
assert!(err.is_retryable());
}
#[tokio::test]
async fn test_group_rollback_supported() {
let env = TestingEnv::new();
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let procedure = RepartitionGroupProcedure {
state: Box::new(RepartitionStart),
context: env.create_context(persistent_context),
};
assert!(procedure.rollback_supported());
}
#[tokio::test]
async fn test_group_rollback_is_noop_before_apply_staging() {
let env = TestingEnv::new();
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let ctx = env.create_context(persistent_context.clone());
let mut procedure = RepartitionGroupProcedure {
state: Box::new(RepartitionStart),
context: ctx,
};
let provider = Arc::new(MockContextProvider::new(Default::default()));
let procedure_ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider,
};
procedure.rollback(&procedure_ctx).await.unwrap();
assert!(procedure.state.as_any().is::<RepartitionStart>());
assert_eq!(procedure.context.persistent_ctx, persistent_context);
}
async fn assert_noop_rollback(
fixture: GroupRollbackFixture,
state: Box<dyn State>,
assert_state: impl FnOnce(&dyn State),
) {
let original_region_routes = fixture.original_region_routes.clone();
let procedure_ctx = TestingEnv::procedure_context();
let mut procedure = RepartitionGroupProcedure {
state,
context: fixture.context,
};
procedure.rollback(&procedure_ctx).await.unwrap();
assert_state(&*procedure.state);
let table_route_value = procedure
.context
.get_table_route_value()
.await
.unwrap()
.into_inner();
let region_routes = region_routes(
procedure.context.persistent_ctx.table_id,
&table_route_value,
)
.unwrap();
assert_eq!(region_routes.clone(), original_region_routes);
}
async fn assert_metadata_rollback_restores_table_route(
mut fixture: GroupRollbackFixture,
state: Box<dyn State>,
) {
let original_region_routes = fixture.original_region_routes.clone();
let procedure_ctx = TestingEnv::procedure_context();
stage_metadata(&mut fixture.context).await;
let mut procedure = RepartitionGroupProcedure {
state,
context: fixture.context,
};
procedure.rollback(&procedure_ctx).await.unwrap();
let table_route_value = procedure
.context
.get_table_route_value()
.await
.unwrap()
.into_inner();
let region_routes = region_routes(
procedure.context.persistent_ctx.table_id,
&table_route_value,
)
.unwrap();
assert_eq!(region_routes.clone(), original_region_routes);
}
#[tokio::test]
async fn test_group_rollback_is_noop_in_sync_region() {
let mut fixture = new_split_group_rollback_fixture(true).await;
assert!(
fixture
.next_state
.as_ref()
.unwrap()
.as_any()
.is::<SyncRegion>()
);
let state = fixture.next_state.take().unwrap();
assert_noop_rollback(fixture, state, |state| {
assert!(state.as_any().is::<SyncRegion>());
})
.await;
}
#[tokio::test]
async fn test_group_rollback_is_noop_in_exit_staging() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_noop_rollback(fixture, Box::new(UpdateMetadata::ExitStaging), |state| {
assert!(state.as_any().is::<UpdateMetadata>());
assert!(matches!(
state.as_any().downcast_ref::<UpdateMetadata>(),
Some(UpdateMetadata::ExitStaging)
));
})
.await;
}
#[tokio::test]
async fn test_group_rollback_restores_split_routes_from_apply_staging() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_metadata_rollback_restores_table_route(
fixture,
Box::new(UpdateMetadata::ApplyStaging),
)
.await;
}
#[tokio::test]
async fn test_group_rollback_restores_split_routes_from_enter_staging_region() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_metadata_rollback_restores_table_route(fixture, Box::new(EnterStagingRegion)).await;
}
#[tokio::test]
async fn test_group_rollback_restores_split_routes_from_remap_manifest() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_metadata_rollback_restores_table_route(fixture, Box::new(RemapManifest)).await;
}
#[tokio::test]
async fn test_group_rollback_restores_split_routes_from_apply_staging_manifest() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_metadata_rollback_restores_table_route(fixture, Box::new(ApplyStagingManifest))
.await;
}
#[tokio::test]
async fn test_group_rollback_restores_merge_routes_and_is_idempotent() {
let mut fixture = new_merge_group_rollback_fixture(false).await;
let original_region_routes = fixture.original_region_routes.clone();
let procedure_ctx = TestingEnv::procedure_context();
stage_metadata(&mut fixture.context).await;
let mut procedure = RepartitionGroupProcedure {
state: Box::new(UpdateMetadata::ApplyStaging),
context: fixture.context,
};
procedure.rollback(&procedure_ctx).await.unwrap();
let table_route_value = procedure
.context
.get_table_route_value()
.await
.unwrap()
.into_inner();
let once = region_routes(
procedure.context.persistent_ctx.table_id,
&table_route_value,
)
.unwrap()
.clone();
procedure.rollback(&procedure_ctx).await.unwrap();
let table_route_value = procedure
.context
.get_table_route_value()
.await
.unwrap()
.into_inner();
let twice = region_routes(
procedure.context.persistent_ctx.table_id,
&table_route_value,
)
.unwrap()
.clone();
assert_eq!(once, original_region_routes);
assert_eq!(once, twice);
}
}

View File

@@ -14,7 +14,6 @@
pub(crate) mod apply_staging_region;
pub(crate) mod exit_staging_region;
pub(crate) mod rollback_staging_region;
use std::any::Any;
use std::time::Instant;
@@ -34,9 +33,7 @@ use crate::procedure::repartition::group::{Context, State};
pub enum UpdateMetadata {
/// Applies the new partition expressions for staging regions.
ApplyStaging,
/// Rolls back the new partition expressions for staging regions.
RollbackStaging,
/// Exits the staging regions.
/// Exits the staging regions after the group finishes its forward path.
ExitStaging,
}
@@ -64,17 +61,6 @@ impl State for UpdateMetadata {
ctx.update_update_metadata_elapsed(timer.elapsed());
Ok((Box::new(EnterStagingRegion), Status::executing(false)))
}
UpdateMetadata::RollbackStaging => {
self.rollback_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
err;
"Failed to broadcast the invalidate table cache message during the rollback staging regions"
);
};
Ok((Box::new(RepartitionEnd), Status::executing(false)))
}
UpdateMetadata::ExitStaging => {
self.exit_staging_regions(ctx).await?;
if let Err(err) = ctx.invalidate_table_cache().await {

View File

@@ -25,7 +25,7 @@ use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
impl UpdateMetadata {
fn exit_staging_region_routes(
pub(crate) fn exit_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],

View File

@@ -1,301 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
impl UpdateMetadata {
/// Rolls back the staging regions.
///
/// Abort:
/// - Source region not found.
/// - Target region not found.
fn rollback_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
original_target_routes: &[RegionRoute],
pending_deallocate_region_ids: &[RegionId],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
let mut region_routes_map = region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
},
)?;
// Clean leader staging state for source regions.
region_route.clear_leader_staging();
if pending_deallocate_region_ids.contains(&source.region_id) {
// Clean ignore all writes state for source regions if it's pending to be deallocated,
// which means the source region is merged into the target region.
region_route.clear_ignore_all_writes();
}
}
for target in original_target_routes {
let region_route = region_routes_map.get_mut(&target.region.id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region.id,
},
)?;
// Revert the partition expression and write route policy to the original value for the target region.
region_route.region.partition_expr = target.region.partition_expr.clone();
region_route.write_route_policy = target.write_route_policy;
// Clean leader staging state for target regions.
region_route.clear_leader_staging();
}
Ok(region_routes)
}
/// Rolls back the metadata for staging regions.
///
/// Abort:
/// - Table route is not physical.
/// - Source region not found.
/// - Target region not found.
/// - Failed to update the table route.
/// - Central region datanode table value not found.
pub(crate) async fn rollback_staging_regions(&self, ctx: &mut Context) -> Result<()> {
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let current_table_route_value = ctx.get_table_route_value().await?;
let region_routes = region_routes(table_id, current_table_route_value.get_inner_ref())?;
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let new_region_routes = Self::rollback_staging_region_routes(
group_id,
&ctx.persistent_ctx.sources,
&prepare_result.target_routes,
&ctx.persistent_ctx.pending_deallocate_region_ids,
region_routes,
)?;
let source_count = prepare_result.source_routes.len();
let target_count = prepare_result.target_routes.len();
info!(
"Rollback staging regions for repartition, table_id: {}, group_id: {}, sources: {}, targets: {}",
table_id, group_id, source_count, target_count
);
if let Err(err) = ctx
.update_table_route(&current_table_route_value, new_region_routes)
.await
{
error!(err; "Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the updating metadata for repartition: {table_id}, group_id: {group_id}"
),
});
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use common_meta::peer::Peer;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::test_util::range_expr;
fn new_region_route(
region_id: RegionId,
partition_expr: &str,
leader_state: Option<LeaderState>,
ignore_all_writes: bool,
) -> RegionRoute {
let mut route = RegionRoute {
region: Region {
id: region_id,
partition_expr: partition_expr.to_string(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state,
..Default::default()
};
if ignore_all_writes {
route.set_ignore_all_writes();
}
route
}
fn original_target_routes(
region_routes: &[RegionRoute],
targets: &[RegionDescriptor],
) -> Vec<RegionRoute> {
let target_ids = targets
.iter()
.map(|target| target.region_id)
.collect::<HashSet<_>>();
region_routes
.iter()
.filter(|route| target_ids.contains(&route.region.id))
.cloned()
.collect()
}
#[test]
fn test_rollback_staging_region_routes_split_case() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let original_region_routes = vec![
new_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
None,
false,
),
new_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
None,
false,
),
new_region_route(RegionId::new(table_id, 3), "", None, false),
];
let sources = vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
}];
let targets = vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 50),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 50, 100),
},
];
let applied_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&sources,
&targets,
&[],
&original_region_routes,
)
.unwrap();
let target_routes = original_target_routes(&original_region_routes, &targets);
let new_region_routes = UpdateMetadata::rollback_staging_region_routes(
group_id,
&sources,
&target_routes,
&[],
&applied_region_routes,
)
.unwrap();
assert_eq!(new_region_routes, original_region_routes);
}
#[test]
fn test_rollback_staging_region_routes_merge_case_is_idempotent() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let original_region_routes = vec![
new_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
None,
false,
),
new_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
None,
false,
),
new_region_route(
RegionId::new(table_id, 3),
&range_expr("x", 200, 300).as_json_str().unwrap(),
None,
false,
),
];
let sources = vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
},
];
let targets = vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 200),
}];
let target_routes = original_target_routes(&original_region_routes, &targets);
let applied_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&sources,
&targets,
&[RegionId::new(table_id, 2)],
&original_region_routes,
)
.unwrap();
let once = UpdateMetadata::rollback_staging_region_routes(
group_id,
&sources,
&target_routes,
&[RegionId::new(table_id, 2)],
&applied_region_routes,
)
.unwrap();
let twice = UpdateMetadata::rollback_staging_region_routes(
group_id,
&sources,
&target_routes,
&[RegionId::new(table_id, 2)],
&once,
)
.unwrap();
assert_eq!(once, original_region_routes);
assert_eq!(once, twice);
}
}

View File

@@ -14,6 +14,7 @@
use std::cmp::Ordering;
use common_meta::rpc::router::RegionRoute;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use store_api::storage::{RegionId, RegionNumber, TableId};
@@ -46,7 +47,7 @@ pub struct AllocationPlanEntry {
/// A plan entry for the dispatch phase after region allocation,
/// with concrete source and target region descriptors.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RepartitionPlanEntry {
/// The group id for this plan entry.
pub group_id: GroupId,
@@ -61,6 +62,9 @@ pub struct RepartitionPlanEntry {
/// For each `source_regions[k]`, the corresponding vector contains global
/// `target_regions` that overlap with it.
pub transition_map: Vec<Vec<usize>>,
/// Pre-staging target routes persisted for parent rollback and recovery.
#[serde(default)]
pub original_target_routes: Vec<RegionRoute>,
}
impl RepartitionPlanEntry {
@@ -138,6 +142,7 @@ pub fn convert_allocation_plan_to_repartition_plan(
allocated_region_ids,
pending_deallocate_region_ids: vec![],
transition_map: transition_map.clone(),
original_target_routes: vec![],
}
}
Ordering::Equal => {
@@ -157,6 +162,7 @@ pub fn convert_allocation_plan_to_repartition_plan(
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![],
transition_map: transition_map.clone(),
original_target_routes: vec![],
}
}
Ordering::Greater => {
@@ -184,6 +190,7 @@ pub fn convert_allocation_plan_to_repartition_plan(
allocated_region_ids: vec![],
pending_deallocate_region_ids,
transition_map: transition_map.clone(),
original_target_routes: vec![],
}
}
}

View File

@@ -22,6 +22,8 @@ use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::{self, Result};
use crate::procedure::repartition::group::GroupId;
use crate::procedure::repartition::plan::RegionDescriptor;
/// Returns the `datanode_table_value`
///
@@ -118,14 +120,79 @@ pub fn merge_and_validate_region_wal_options(
Ok(new_region_wal_options)
}
/// Restores group staging metadata in-place for parent repartition rollback.
///
/// This helper lives in repartition utilities instead of the group subprocedure
/// because parent repartition owns crash recovery and rollback selection.
///
/// The function mutates `region_routes` in place to avoid rebuilding the route
/// vector for each selected plan. It restores:
/// - source-region leader staging flags,
/// - merge-source `ignore_all_writes` markers for pending-deallocate sources,
/// - target-region partition expressions,
/// - target-region write-route policies,
/// - target-region leader staging flags.
///
/// `original_target_routes` contains only pre-existing target routes.
/// Newly allocated targets are removed by parent rollback instead of being
/// restored here.
pub fn rollback_group_metadata_routes(
group_id: GroupId,
source_regions: &[RegionDescriptor],
original_target_routes: &[RegionRoute],
allocated_region_ids: &[RegionId],
pending_deallocate_region_ids: &[RegionId],
region_routes_map: &mut HashMap<RegionId, &mut RegionRoute>,
) -> Result<()> {
for source in source_regions {
let region_route = region_routes_map.get_mut(&source.region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
},
)?;
region_route.clear_leader_staging();
if pending_deallocate_region_ids.contains(&source.region_id) {
region_route.clear_ignore_all_writes();
}
}
for target in original_target_routes {
let Some(region_route) = region_routes_map.get_mut(&target.region.id) else {
// Ignores newly allocated region routes that do not exist in the current region routes.
// They may have already been deleted (to ensure idempotency).
if allocated_region_ids.contains(&target.region.id) {
continue;
}
return error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region.id,
}
.fail();
};
region_route.region.partition_expr = target.region.partition_expr.clone();
region_route.write_route_policy = target.write_route_policy;
region_route.clear_leader_staging();
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_wal::options::{KafkaWalOptions, WalOptions};
use store_api::storage::RegionId;
use uuid::Uuid;
use super::*;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::test_util::range_expr;
/// Helper function to create a Kafka WAL option string from a topic name.
fn kafka_wal_option(topic: &str) -> String {
@@ -149,6 +216,45 @@ mod tests {
}
}
fn new_staged_region_route(
region_id: RegionId,
partition_expr: &str,
leader_state: Option<LeaderState>,
ignore_all_writes: bool,
) -> RegionRoute {
let mut route = RegionRoute {
region: Region {
id: region_id,
partition_expr: partition_expr.to_string(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state,
..Default::default()
};
if ignore_all_writes {
route.set_ignore_all_writes();
}
route
}
fn original_target_routes(
region_routes: &[RegionRoute],
targets: &[RegionDescriptor],
) -> Vec<RegionRoute> {
let target_ids = targets
.iter()
.map(|target| target.region_id)
.collect::<HashSet<_>>();
region_routes
.iter()
.filter(|route| target_ids.contains(&route.region.id))
.cloned()
.collect()
}
#[test]
fn test_merge_and_validate_region_wal_options_success() {
let table_id = 1;
@@ -254,4 +360,141 @@ mod tests {
assert!(error_msg.contains("Mismatch"));
assert!(error_msg.contains(&table_id.to_string()));
}
#[test]
fn test_rollback_group_metadata_routes_split_case() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let original_region_routes = vec![
new_staged_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
None,
false,
),
new_staged_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
None,
false,
),
new_staged_region_route(RegionId::new(table_id, 3), "", None, false),
];
let sources = vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
}];
let targets = vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 50),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 50, 100),
},
];
let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&sources,
&targets,
&[],
&original_region_routes,
)
.unwrap();
let target_routes = original_target_routes(&original_region_routes, &targets);
rollback_group_metadata_routes(
group_id,
&sources,
&target_routes,
&[],
&[],
&mut applied_region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect(),
)
.unwrap();
assert_eq!(applied_region_routes, original_region_routes);
}
#[test]
fn test_rollback_group_metadata_routes_merge_case_is_idempotent() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let original_region_routes = vec![
new_staged_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
None,
false,
),
new_staged_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
None,
false,
),
new_staged_region_route(
RegionId::new(table_id, 3),
&range_expr("x", 200, 300).as_json_str().unwrap(),
None,
false,
),
];
let sources = vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
},
];
let targets = vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 200),
}];
let target_routes = original_target_routes(&original_region_routes, &targets);
let mut once = UpdateMetadata::apply_staging_region_routes(
group_id,
&sources,
&targets,
&[RegionId::new(table_id, 2)],
&original_region_routes,
)
.unwrap();
rollback_group_metadata_routes(
group_id,
&sources,
&target_routes,
&[],
&[RegionId::new(table_id, 2)],
&mut once
.iter_mut()
.map(|route| (route.region.id, route))
.collect(),
)
.unwrap();
let mut twice = once.clone();
rollback_group_metadata_routes(
group_id,
&sources,
&target_routes,
&[],
&[RegionId::new(table_id, 2)],
&mut twice
.iter_mut()
.map(|route| (route.region.id, route))
.collect(),
)
.unwrap();
assert_eq!(once, original_region_routes);
assert_eq!(once, twice);
}
}