diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index c1f3ca2503..e38d6d3a95 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -148,9 +148,6 @@ impl ExecutePlan { let region_number_and_partition_exprs = AllocateRegion::prepare_region_allocation_data(&allocate_regions)?; let table_info_value = ctx.get_table_info_value().await?; - let table_route_value = ctx.get_table_route_value().await?; - // Safety: it is physical table route value. - let region_routes = table_route_value.region_routes().unwrap(); let new_allocated_region_routes = ctx .region_routes_allocator .allocate( @@ -205,6 +202,11 @@ impl ExecutePlan { // Updates the table routes. let table_lock = TableLock::Write(table_id).into(); let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; + // MUST refresh the table route value after acquiring the lock to avoid lost update. + // Otherwise, the new allocated regions might be overridden by concurrent repartition procedures. + let table_route_value = ctx.get_table_route_value().await?; + // Safety: it is physical table route value. 
+ let region_routes = table_route_value.region_routes().unwrap(); let new_region_routes = AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes); ctx.update_table_route(&table_route_value, new_region_routes, wal_options) @@ -397,14 +399,26 @@ impl AllocateRegion { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use common_meta::key::TableMetadataManagerRef; + use common_meta::key::datanode_table::DatanodeTableKey; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; + use common_meta::test_util::MockDatanodeManager; + use common_procedure::{ContextProvider, ProcedureId, ProcedureState}; + use common_procedure_test::MockContextProvider; use store_api::storage::RegionId; + use tokio::sync::watch; use uuid::Uuid; use super::*; use crate::procedure::repartition::State; - use crate::procedure::repartition::test_util::range_expr; + use crate::procedure::repartition::test_util::{ + TestingEnv, current_parent_region_routes, new_parent_context, range_expr, + test_region_wal_options, + }; fn create_region_descriptor( table_id: TableId, @@ -461,6 +475,88 @@ mod tests { .collect() } + struct ConcurrentTableRouteUpdateProvider { + inner: MockContextProvider, + table_metadata_manager: TableMetadataManagerRef, + table_id: TableId, + concurrent_region_route: RegionRoute, + region_wal_options: HashMap<u32, String>, + } + + #[async_trait::async_trait] + impl ContextProvider for ConcurrentTableRouteUpdateProvider { + async fn procedure_state( + &self, + procedure_id: ProcedureId, + ) -> common_procedure::Result<Option<ProcedureState>> { + self.inner.procedure_state(procedure_id).await + } + + async fn procedure_state_receiver( + &self, + procedure_id: ProcedureId, + ) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> { + self.inner.procedure_state_receiver(procedure_id).await + } + + async fn try_put_poison( + &self, + key: &common_procedure::PoisonKey, + procedure_id: ProcedureId, + ) -> common_procedure::Result<()> { + 
self.inner.try_put_poison(key, procedure_id).await + } + + async fn acquire_lock( + &self, + key: &common_procedure::StringKey, + ) -> common_procedure::local::DynamicKeyLockGuard { + let current_table_route_value = self + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get_with_raw_bytes(self.table_id) + .await + .unwrap() + .unwrap(); + let mut region_routes = current_table_route_value.region_routes().unwrap().clone(); + + if !region_routes + .iter() + .any(|route| route.region.id == self.concurrent_region_route.region.id) + { + region_routes.push(self.concurrent_region_route.clone()); + let datanode_id = current_table_route_value.region_routes().unwrap()[0] + .leader_peer + .as_ref() + .unwrap() + .id; + let datanode_table_value = self + .table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey::new(datanode_id, self.table_id)) + .await + .unwrap() + .unwrap(); + let region_options = &datanode_table_value.region_info.region_options; + + self.table_metadata_manager + .update_table_route( + self.table_id, + datanode_table_value.region_info.clone(), + &current_table_route_value, + region_routes, + region_options, + &self.region_wal_options, + ) + .await + .unwrap(); + } + + self.inner.acquire_lock(key).await + } + } + #[test] fn test_convert_to_repartition_plans_no_allocation() { let table_id = 1024; @@ -617,6 +713,62 @@ mod tests { assert!(!result[1].1.is_empty()); } + #[tokio::test] + async fn test_execute_plan_uses_latest_table_route_after_lock() { + let env = TestingEnv::new(); + let table_id = 1024; + let original_region_routes = create_current_region_routes(table_id, &[1]); + env.create_physical_table_metadata_for_repartition( + table_id, + original_region_routes, + test_region_wal_options(&[1]), + ) + .await; + + let node_manager = Arc::new(MockDatanodeManager::new(())); + let mut ctx = new_parent_context(&env, node_manager, table_id); + ctx.persistent_ctx.plans = vec![RepartitionPlanEntry { + group_id: 
Uuid::new_v4(), + source_regions: vec![], + target_regions: vec![create_region_descriptor(table_id, 3, "x", 0, 100)], + allocated_region_ids: vec![RegionId::new(table_id, 3)], + pending_deallocate_region_ids: vec![], + transition_map: vec![], + original_target_routes: vec![], + }]; + let concurrent_region_route = create_current_region_routes(table_id, &[2]) + .into_iter() + .next() + .unwrap(); + let procedure_ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(ConcurrentTableRouteUpdateProvider { + inner: MockContextProvider::default(), + table_metadata_manager: env.table_metadata_manager.clone(), + table_id, + concurrent_region_route, + region_wal_options: test_region_wal_options(&[1, 2]), + }), + }; + let mut state = ExecutePlan; + + state.next(&mut ctx, &procedure_ctx).await.unwrap(); + + let region_ids = current_parent_region_routes(&ctx) + .await + .into_iter() + .map(|route| route.region.id) + .collect::<Vec<_>>(); + assert_eq!( + region_ids, + vec![ + RegionId::new(table_id, 1), + RegionId::new(table_id, 2), + RegionId::new(table_id, 3), + ] + ); + } + #[test] fn test_allocate_region_state_backward_compatibility() { // Arrange