From 279908984d47d5bf3734f044523e1c35dde2295e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 13 Jan 2026 12:10:45 +0800 Subject: [PATCH] fix: fix topic region mapping sync and handle `region_not_found` in migration (#7552) * fix(meta): update topic region mapping during table route updates Fix a bug in `build_create_txn` where the parameter order was incorrect (`(topic, region_id)` -> `(region_id, topic)`), and add support for updating topic region mappings during repartition operations. - Add `build_update_txn` method to handle topic region mapping updates - Integrate topic region update into `update_table_route` transaction - Add WAL options merging and validation logic for repartition - Update allocate/deallocate procedures to pass WAL options - Add comprehensive tests for all scenarios This ensures topic region mappings stay in sync with table routes during repartition, preventing data inconsistencies. Signed-off-by: WenyXu * feat(meta): handle region_not_found in region migration Add support for detecting and handling regions that exist in migration tasks but are no longer present in table routes (e.g., removed after repartition). This prevents unnecessary retries and cleans up related resources. Changes: - Add `region_not_found` field to `SubmitRegionMigrationTaskResult` and `RegionMigrationAnalysis` structs - Update `analyze_region_migration_task` to detect regions missing from current table routes - Deregister failure detectors for `region_not_found` regions in supervisor - Change `table_regions()` return type from `HashMap>` to `HashMap>` for better performance - Add test cases for `region_not_found` handling This fixes the issue where migration tasks would continue retrying on regions that have been removed after repartition operations. Signed-off-by: WenyXu * fix: fix clippy Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/key.rs | 225 ++++++++- src/common/meta/src/key/topic_region.rs | 467 +++++++++++++++++- .../src/procedure/region_migration/manager.rs | 6 + .../src/procedure/region_migration/utils.rs | 25 +- src/meta-srv/src/procedure/repartition.rs | 16 +- .../procedure/repartition/allocate_region.rs | 2 +- .../repartition/deallocate_region.rs | 2 +- .../src/procedure/repartition/utils.rs | 211 +++++++- src/meta-srv/src/region/supervisor.rs | 40 ++ 9 files changed, 981 insertions(+), 13 deletions(-) diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index e3bc73aca0..4e089cd0d4 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -1280,6 +1280,11 @@ impl TableMetadataManager { region_distribution(current_table_route_value.region_routes()?); let new_region_distribution = region_distribution(&new_region_routes); + let update_topic_region_txn = self.topic_region_manager.build_update_txn( + table_id, + ®ion_info.region_wal_options, + new_region_wal_options, + )?; let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( table_id, region_info, @@ -1291,13 +1296,16 @@ impl TableMetadataManager { // Updates the table_route. let new_table_route_value = current_table_route_value.update(new_region_routes)?; - let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() .table_route_storage() .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?; - let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]); + let txn = Txn::merge_all(vec![ + update_datanode_table_txn, + update_table_route_txn, + update_topic_region_txn, + ]); let mut r = self.kv_backend.txn(txn).await?; @@ -1482,6 +1490,7 @@ mod tests { use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; + use crate::key::topic_region::TopicRegionKey; use crate::key::{ DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue, @@ -2262,6 +2271,218 @@ mod tests { ); } + #[tokio::test] + async fn test_update_table_route_with_topic_region_mapping() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let region_route = new_test_region_route(); + let region_routes = vec![region_route.clone()]; + let table_info: RawTableInfo = new_test_table_info().into(); + let table_id = table_info.ident.table_id; + let engine = table_info.meta.engine.as_str(); + let region_storage_path = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); + + // Create initial metadata with Kafka WAL options + let old_region_wal_options: HashMap = vec![ + ( + 1, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + })) + .unwrap(), + ), + ( + 2, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_2".to_string(), + })) + .unwrap(), + ), + ] + .into_iter() + .collect(); + + create_physical_table_metadata( + &table_metadata_manager, + table_info.clone(), + region_routes.clone(), + old_region_wal_options.clone(), + ) + .await + .unwrap(); + + let current_table_route_value = DeserializedValueWithBytes::from_inner( + TableRouteValue::physical(region_routes.clone()), + ); + + // Verify initial topic region mappings exist + let region_id_1 = RegionId::new(table_id, 1); + let region_id_2 = RegionId::new(table_id, 2); + let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1"); + let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_1_key.clone()) + .await + .unwrap() + .is_some() + ); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_2_key.clone()) + .await + .unwrap() + .is_some() + ); + + // Test 1: Add new region with new topic + let new_region_routes = vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), // New region + ]; + let new_region_wal_options: HashMap = vec![ + ( + 1, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), // Unchanged + })) + .unwrap(), + ), + ( + 2, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_2".to_string(), // Unchanged + })) + .unwrap(), + ), + ( + 3, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_3".to_string(), // New topic + })) + .unwrap(), + ), + ] + .into_iter() + .collect(); + let current_table_route_value_updated = DeserializedValueWithBytes::from_inner( + current_table_route_value + .inner + .update(new_region_routes.clone()) + .unwrap(), + ); + table_metadata_manager + .update_table_route( + table_id, + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.clone(), + region_options: HashMap::new(), + region_wal_options: old_region_wal_options.clone(), + }, + ¤t_table_route_value, + new_region_routes.clone(), + &HashMap::new(), + &new_region_wal_options, + ) + .await + .unwrap(); + // Verify new topic region mapping was created + let region_id_3 = RegionId::new(table_id, 3); + let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_3_key) + .await + .unwrap() + .is_some() + ); + // Test 2: Remove a region and change topic for another + let newer_region_routes = vec![ + new_region_route(1, 1), + // Region 2 removed + // Region 3 now has different topic + ]; + let newer_region_wal_options: HashMap = vec![ + ( + 1, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), // Unchanged + })) + .unwrap(), + ), + ( + 3, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_3_new".to_string(), // Changed topic + })) + .unwrap(), + ), + ] + .into_iter() + .collect(); + table_metadata_manager + .update_table_route( + table_id, + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.clone(), + region_options: HashMap::new(), + region_wal_options: new_region_wal_options.clone(), + }, + ¤t_table_route_value_updated, + newer_region_routes.clone(), + &HashMap::new(), + &newer_region_wal_options, + ) + .await + .unwrap(); + // Verify region 2 mapping was deleted + let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_2_key_new) + .await + .unwrap() + .is_none() + ); + // Verify region 3 old topic mapping was deleted + let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_3_key_old) + .await + .unwrap() + .is_none() + ); + // Verify region 3 new topic mapping was created + let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_3_key_new) + .await + .unwrap() + .is_some() + ); + // Verify region 1 mapping still exists (unchanged) + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_1_key) + .await + .unwrap() + .is_some() + ); + } + #[tokio::test] async fn test_destroy_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); diff --git a/src/common/meta/src/key/topic_region.rs b/src/common/meta/src/key/topic_region.rs index c34229cf9e..9feac10804 100644 --- a/src/common/meta/src/key/topic_region.rs +++ b/src/common/meta/src/key/topic_region.rs @@ -243,7 +243,7 @@ impl TopicRegionManager { let topic_region_mapping = self.get_topic_region_mapping(table_id, ®ion_wal_options); let topic_region_keys = topic_region_mapping .iter() - .map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id)) + .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic)) .collect::>(); let operations = topic_region_keys .into_iter() @@ -252,6 +252,55 @@ impl TopicRegionManager { Ok(Txn::new().and_then(operations)) } + /// Build a update topic region mapping transaction. + pub fn build_update_txn( + &self, + table_id: TableId, + old_region_wal_options: &HashMap, + new_region_wal_options: &HashMap, + ) -> Result { + let old_wal_options_parsed = parse_region_wal_options(old_region_wal_options)?; + let new_wal_options_parsed = parse_region_wal_options(new_region_wal_options)?; + let old_mapping = self.get_topic_region_mapping(table_id, &old_wal_options_parsed); + let new_mapping = self.get_topic_region_mapping(table_id, &new_wal_options_parsed); + + // Convert to HashMap for easier lookup: RegionId -> Topic + let old_map: HashMap = old_mapping.into_iter().collect(); + let new_map: HashMap = new_mapping.into_iter().collect(); + let mut ops = Vec::new(); + + // Check for deletes (in old but not in new, or topic changed) + for (region_id, old_topic) in &old_map { + match new_map.get(region_id) { + Some(new_topic) if *new_topic == *old_topic => { + // Same topic, do nothing (preserve checkpoint) + } + _ => { + // Removed or topic changed -> Delete old + let key = TopicRegionKey::new(*region_id, old_topic); + ops.push(TxnOp::Delete(key.to_bytes())); + } + } + } + + // Check for adds (in new but not in old, or topic changed) + for (region_id, new_topic) in &new_map { + match old_map.get(region_id) { + Some(old_topic) if *old_topic == *new_topic => { + // Same topic, already handled (do nothing) + } + _ => { + // New or topic changed -> Put new + let key = TopicRegionKey::new(*region_id, new_topic); + // Initialize with empty value (default TopicRegionValue) + ops.push(TxnOp::Put(key.to_bytes(), vec![])); + } + } + } + + Ok(Txn::new().and_then(ops)) + } + /// Returns the map of [`RegionId`] to their corresponding topic [`TopicRegionValue`]. pub async fn regions(&self, topic: &str) -> Result> { let prefix = TopicRegionKey::range_topic_key(topic); @@ -431,4 +480,420 @@ mod tests { RegionId::from_u64(4410931412992) ); } + + #[test] + fn test_build_create_txn() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + (2, WalOptions::RaftEngine), // Should be ignored + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + + let txn = manager + .build_create_txn(table_id, ®ion_wal_options) + .unwrap(); + + // Verify the transaction contains correct operations + // Should create mappings for region 0 and 1, but not region 2 (RaftEngine) + let ops = txn.req().success.clone(); + assert_eq!(ops.len(), 2); + + let keys: Vec<_> = ops + .iter() + .filter_map(|op| { + if let TxnOp::Put(key, _) = op { + TopicRegionKey::from_bytes(key).ok() + } else { + None + } + }) + .collect(); + + assert_eq!(keys.len(), 2); + let region_ids: Vec<_> = keys.iter().map(|k| k.region_id).collect(); + assert!(region_ids.contains(&RegionId::new(table_id, 0))); + assert!(region_ids.contains(&RegionId::new(table_id, 1))); + assert!(!region_ids.contains(&RegionId::new(table_id, 2))); + + // Verify topics are correct + for key in keys { + match key.region_id.region_number() { + 0 => assert_eq!(key.topic, "topic_0"), + 1 => assert_eq!(key.topic, "topic_1"), + _ => panic!("Unexpected region number"), + } + } + } + + #[test] + fn test_build_update_txn_add_new_region() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + )] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + let ops = txn.req().success.clone(); + // Should only have Put for new region 1 (region 0 unchanged) + assert_eq!(ops.len(), 1); + if let TxnOp::Put(key, _) = &ops[0] { + let topic_key = TopicRegionKey::from_bytes(key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 1)); + assert_eq!(topic_key.topic, "topic_1"); + } else { + panic!("Expected Put operation"); + } + } + + #[test] + fn test_build_update_txn_remove_region() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + )] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + let ops = txn.req().success.clone(); + // Should only have Delete for removed region 1 (region 0 unchanged) + assert_eq!(ops.len(), 1); + match &ops[0] { + TxnOp::Delete(key) => { + let topic_key = TopicRegionKey::from_bytes(key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 1)); + assert_eq!(topic_key.topic, "topic_1"); + } + TxnOp::Put(_, _) | TxnOp::Get(_) => { + panic!("Expected Delete operation"); + } + } + } + + #[test] + fn test_build_update_txn_change_topic() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + )] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0_new".to_string(), + }), + )] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + let ops = txn.req().success.clone(); + // Should have Delete for old topic and Put for new topic + assert_eq!(ops.len(), 2); + + let mut delete_found = false; + let mut put_found = false; + for op in ops { + match op { + TxnOp::Delete(key) => { + let topic_key = TopicRegionKey::from_bytes(&key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 0)); + assert_eq!(topic_key.topic, "topic_0"); + delete_found = true; + } + TxnOp::Put(key, _) => { + let topic_key = TopicRegionKey::from_bytes(&key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 0)); + assert_eq!(topic_key.topic, "topic_0_new"); + put_found = true; + } + TxnOp::Get(_) => { + // Get operations shouldn't appear in this context + panic!("Unexpected Get operation in update transaction"); + } + } + } + assert!(delete_found, "Expected Delete operation for old topic"); + assert!(put_found, "Expected Put operation for new topic"); + } + + #[test] + fn test_build_update_txn_no_change() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, ®ion_wal_options, ®ion_wal_options) + .unwrap(); + // Should have no operations when nothing changes (preserves checkpoint) + let ops = txn.req().success.clone(); + assert_eq!(ops.len(), 0); + } + + #[test] + fn test_build_update_txn_mixed_scenarios() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + ( + 2, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_2".to_string(), + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), // Unchanged + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1_new".to_string(), // Topic changed + }), + ), + // Region 2 removed + ( + 3, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_3".to_string(), // New region + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + + let ops = txn.req().success.clone(); + // Should have: + // - Delete for region 2 (removed) + // - Delete for region 1 old topic (topic changed) + // - Put for region 1 new topic (topic changed) + // - Put for region 3 (new) + // Region 0 unchanged, so no operation + assert_eq!(ops.len(), 4); + + let mut delete_ops = 0; + let mut put_ops = 0; + let mut delete_region_2 = false; + let mut delete_region_1_old = false; + let mut put_region_1_new = false; + let mut put_region_3 = false; + + for op in ops { + match op { + TxnOp::Delete(key) => { + delete_ops += 1; + let topic_key = TopicRegionKey::from_bytes(&key).unwrap(); + match topic_key.region_id.region_number() { + 1 => { + assert_eq!(topic_key.topic, "topic_1"); + delete_region_1_old = true; + } + 2 => { + assert_eq!(topic_key.topic, "topic_2"); + delete_region_2 = true; + } + _ => panic!("Unexpected delete operation for region"), + } + } + TxnOp::Put(key, _) => { + put_ops += 1; + let topic_key: TopicRegionKey<'_> = TopicRegionKey::from_bytes(&key).unwrap(); + match topic_key.region_id.region_number() { + 1 => { + assert_eq!(topic_key.topic, "topic_1_new"); + put_region_1_new = true; + } + 3 => { + assert_eq!(topic_key.topic, "topic_3"); + put_region_3 = true; + } + _ => panic!("Unexpected put operation for region"), + } + } + TxnOp::Get(_) => { + panic!("Unexpected Get operation in update transaction"); + } + } + } + + assert_eq!(delete_ops, 2); + assert_eq!(put_ops, 2); + assert!(delete_region_2, "Expected delete for removed region 2"); + assert!( + delete_region_1_old, + "Expected delete for region 1 old topic" + ); + assert!(put_region_1_new, "Expected put for region 1 new topic"); + assert!(put_region_3, "Expected put for new region 3"); + } + + #[test] + fn test_build_update_txn_with_raft_engine() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + (1, WalOptions::RaftEngine), // Should be ignored + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), // Changed from RaftEngine to Kafka + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + let ops = txn.req().success.clone(); + // Should only have Put for region 1 (new Kafka topic) + // Region 0 unchanged, so no operation + // Region 1 was RaftEngine before (not tracked), so only Put needed + assert_eq!(ops.len(), 1); + match &ops[0] { + TxnOp::Put(key, _) => { + let topic_key = TopicRegionKey::from_bytes(key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 1)); + assert_eq!(topic_key.topic, "topic_1"); + } + TxnOp::Delete(_) | TxnOp::Get(_) => { + panic!("Expected Put operation for new Kafka region"); + } + } + } } diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index fbec7ca14e..25a3d986ec 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -166,6 +166,8 @@ pub struct SubmitRegionMigrationTaskResult { pub peer_conflict: Vec, /// Regions whose table is not found. pub table_not_found: Vec, + /// Regions whose table exists but region route is not found (e.g., removed after repartition). + pub region_not_found: Vec, /// Regions still pending migration. pub migrating: Vec, /// Regions that have been submitted for migration. @@ -396,6 +398,7 @@ impl RegionMigrationManager { leader_changed, peer_conflict, mut table_not_found, + region_not_found, pending, } = analyze_region_migration_task(&task, &self.context_factory.table_metadata_manager) .await?; @@ -405,6 +408,7 @@ impl RegionMigrationManager { leader_changed, peer_conflict, table_not_found, + region_not_found, migrating: migrating_region_ids, submitted: vec![], procedure_id: None, @@ -445,6 +449,7 @@ impl RegionMigrationManager { leader_changed, peer_conflict, table_not_found, + region_not_found, migrating: migrating_region_ids, submitted: vec![], procedure_id: None, @@ -460,6 +465,7 @@ impl RegionMigrationManager { leader_changed, peer_conflict, table_not_found, + region_not_found, migrating: migrating_region_ids, submitted: submitting_region_ids, procedure_id: Some(procedure_id), diff --git a/src/meta-srv/src/procedure/region_migration/utils.rs b/src/meta-srv/src/procedure/region_migration/utils.rs index e6243ec5b0..a7c9e93871 100644 --- a/src/meta-srv/src/procedure/region_migration/utils.rs +++ b/src/meta-srv/src/procedure/region_migration/utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::time::Duration; @@ -82,13 +82,13 @@ impl RegionMigrationTaskBatch { /// Returns the table regions map. /// /// The key is the table id, the value is the region ids of the table. - pub(crate) fn table_regions(&self) -> HashMap> { + pub(crate) fn table_regions(&self) -> HashMap> { let mut table_regions = HashMap::new(); for region_id in &self.region_ids { table_regions .entry(region_id.table_id()) - .or_insert_with(Vec::new) - .push(*region_id); + .or_insert_with(HashSet::new) + .insert(*region_id); } table_regions } @@ -105,6 +105,8 @@ pub(crate) struct RegionMigrationAnalysis { pub(crate) peer_conflict: Vec, /// Regions whose table is not found. pub(crate) table_not_found: Vec, + /// Regions whose table exists but region route is not found (e.g., removed after repartition). + pub(crate) region_not_found: Vec, /// Regions still pending migration. pub(crate) pending: Vec, } @@ -209,6 +211,12 @@ pub async fn analyze_region_migration_task( err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."), } })?; + + let existing_region_ids = region_routes + .iter() + .map(|r| r.region.id) + .collect::>(); + for region_route in region_routes .iter() .filter(|r| region_ids.contains(&r.region.id)) @@ -220,6 +228,12 @@ pub async fn analyze_region_migration_task( task.to_peer.id, )?; } + + for region_id in region_ids { + if !existing_region_ids.contains(region_id) { + result.region_not_found.push(*region_id); + } + } } Ok(result) @@ -460,6 +474,8 @@ mod tests { RegionId::new(1024, 2), RegionId::new(1024, 3), RegionId::new(1024, 4), + // Region of existing table but route removed (e.g., after repartition). + RegionId::new(1024, 5), RegionId::new(1025, 1), ], from_peer: Peer::empty(1), @@ -477,6 +493,7 @@ mod tests { migrated: vec![RegionId::new(1024, 1)], leader_changed: vec![RegionId::new(1024, 2)], peer_conflict: vec![RegionId::new(1024, 3)], + region_not_found: vec![RegionId::new(1024, 5)], table_not_found: vec![RegionId::new(1025, 1)], } ); diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index c91270ecfa..f734677803 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -23,6 +23,7 @@ pub mod repartition_start; pub mod utils; use std::any::Any; +use std::collections::HashMap; use std::fmt::Debug; use common_error::ext::BoxedError; @@ -50,7 +51,7 @@ use common_telemetry::error; use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{RegionNumber, TableId}; use table::table_name::TableName; use crate::error::{self, Result}; @@ -198,6 +199,7 @@ impl Context { &self, current_table_route_value: &DeserializedValueWithBytes, new_region_routes: Vec, + new_region_wal_options: HashMap, ) -> Result<()> { let table_id = self.persistent_ctx.table_id; if new_region_routes.is_empty() { @@ -221,6 +223,16 @@ impl Context { region_wal_options, .. } = &datanode_table_value.region_info; + + // Merge and validate the new region wal options. + let validated_region_wal_options = + crate::procedure::repartition::utils::merge_and_validate_region_wal_options( + region_wal_options, + new_region_wal_options, + &new_region_routes, + table_id, + )?; + self.table_metadata_manager .update_table_route( table_id, @@ -228,7 +240,7 @@ impl Context { current_table_route_value, new_region_routes, region_options, - region_wal_options, + &validated_region_wal_options, ) .await .context(error::TableMetadataManagerSnafu) diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index fbf565b946..ceb50ba732 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -119,7 +119,7 @@ impl State for AllocateRegion { let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; let new_region_routes = Self::generate_region_routes(region_routes, &new_allocated_region_routes); - ctx.update_table_route(&table_route_value, new_region_routes) + ctx.update_table_route(&table_route_value, new_region_routes, wal_options) .await?; ctx.invalidate_table_cache().await?; diff --git a/src/meta-srv/src/procedure/repartition/deallocate_region.rs b/src/meta-srv/src/procedure/repartition/deallocate_region.rs index c369e9d5d1..48bfc31936 100644 --- a/src/meta-srv/src/procedure/repartition/deallocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/deallocate_region.rs @@ -98,7 +98,7 @@ impl State for DeallocateRegion { let region_routes = table_route_value.region_routes().unwrap(); let new_region_routes = Self::generate_region_routes(region_routes, &pending_deallocate_region_ids); - ctx.update_table_route(&table_route_value, new_region_routes) + ctx.update_table_route(&table_route_value, new_region_routes, HashMap::new()) .await?; ctx.invalidate_table_cache().await?; diff --git a/src/meta-srv/src/procedure/repartition/utils.rs b/src/meta-srv/src/procedure/repartition/utils.rs index 161c0feedd..ddeb974eef 100644 --- a/src/meta-srv/src/procedure/repartition/utils.rs +++ b/src/meta-srv/src/procedure/repartition/utils.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::{HashMap, HashSet}; + use common_error::ext::BoxedError; use common_meta::key::TableMetadataManagerRef; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use common_meta::rpc::router::RegionRoute; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::error::{self, Result}; @@ -47,3 +50,207 @@ pub async fn get_datanode_table_value( })?; Ok(datanode_table_value) } + +/// Merges and validates region WAL options for repartition. +/// +/// This function: +/// 1. Validates that new WAL options don't overwrite existing ones +/// 2. Merges existing `region_wal_options` with new `new_region_wal_options` +/// 3. Filters out WAL options for regions that are not in `new_region_routes` +/// 4. Validates that every region in `new_region_routes` has a corresponding WAL option +/// +/// # Arguments +/// * `region_wal_options` - Existing region WAL options from datanode table +/// * `new_region_wal_options` - New region WAL options to merge (should only contain newly allocated regions) +/// * `new_region_routes` - The new region routes after repartition +/// * `table_id` - Table ID for error reporting +/// +/// # Returns +/// Returns the merged and filtered WAL options, ensuring all regions have options. +/// +/// # Errors +/// Returns an error if: +/// - New WAL options try to overwrite existing ones for the same region +/// - Any region in `new_region_routes` is missing a WAL option +pub fn merge_and_validate_region_wal_options( + region_wal_options: &HashMap, + mut new_region_wal_options: HashMap, + new_region_routes: &[RegionRoute], + table_id: TableId, +) -> Result> { + // Doesn't allow overwriting existing WAL options. + for (region_number, _) in new_region_wal_options.iter() { + if region_wal_options.contains_key(region_number) { + return error::UnexpectedSnafu { + violated: format!( + "Overwriting existing WAL option for region: {}", + RegionId::new(table_id, *region_number) + ), + } + .fail(); + } + } + + new_region_wal_options.extend(region_wal_options.clone()); + + // Extract region numbers from new routes + let region_numbers: HashSet = new_region_routes + .iter() + .map(|r| r.region.id.region_number()) + .collect(); + + // Filter out WAL options for regions that are not in new_region_routes + new_region_wal_options.retain(|k, _| region_numbers.contains(k)); + + // Validate that every region has a WAL option + ensure!( + region_numbers.len() == new_region_wal_options.len(), + error::UnexpectedSnafu { + violated: format!( + "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}", + region_numbers.len(), + new_region_wal_options.len(), + table_id + ), + } + ); + + Ok(new_region_wal_options) +} + +#[cfg(test)] +mod tests { + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use common_wal::options::{KafkaWalOptions, WalOptions}; + use store_api::storage::RegionId; + + use super::*; + + /// Helper function to create a Kafka WAL option string from a topic name. + fn kafka_wal_option(topic: &str) -> String { + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + })) + .unwrap() + } + + fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute { + RegionRoute { + region: Region { + id: RegionId::from_u64(region_id), + ..Default::default() + }, + leader_peer: Some(Peer::empty(datanode_id)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + } + } + + #[test] + fn test_merge_and_validate_region_wal_options_success() { + let table_id = 1; + let existing_wal_options: HashMap = vec![ + (1, kafka_wal_option("topic_1")), + (2, kafka_wal_option("topic_2")), + ] + .into_iter() + .collect(); + let new_wal_options: HashMap = + vec![(3, kafka_wal_option("topic_3"))].into_iter().collect(); + let new_region_routes = vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), + ]; + let result = merge_and_validate_region_wal_options( + &existing_wal_options, + new_wal_options, + &new_region_routes, + table_id, + ) + .unwrap(); + + // Should have all three regions + assert_eq!(result.len(), 3); + assert!(result.contains_key(&1)); + assert!(result.contains_key(&2)); + assert!(result.contains_key(&3)); + // Existing options should be preserved + assert_eq!(result.get(&1).unwrap(), &kafka_wal_option("topic_1")); + assert_eq!(result.get(&2).unwrap(), &kafka_wal_option("topic_2")); + // New option should be present + assert_eq!(result.get(&3).unwrap(), &kafka_wal_option("topic_3")); + } + + #[test] + fn test_merge_and_validate_region_wal_options_new_overrides_existing() { + let table_id = 1; + let existing_wal_options: HashMap = + vec![(1, kafka_wal_option("topic_1_old"))] + .into_iter() + .collect(); + let new_wal_options: HashMap = + vec![(1, kafka_wal_option("topic_1_new"))] + .into_iter() + .collect(); + let new_region_routes = vec![new_region_route(1, 1)]; + merge_and_validate_region_wal_options( + &existing_wal_options, + new_wal_options, + &new_region_routes, + table_id, + ) + .unwrap_err(); + } + + #[test] + fn test_merge_and_validate_region_wal_options_filters_removed_regions() { + let table_id = 1; + let existing_wal_options: HashMap = vec![ + (1, kafka_wal_option("topic_1")), + (2, kafka_wal_option("topic_2")), + (3, kafka_wal_option("topic_3")), + ] + .into_iter() + .collect(); + let new_wal_options = HashMap::new(); + // Only regions 1 and 2 are in new routes (region 3 removed) + let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)]; + let result = merge_and_validate_region_wal_options( + &existing_wal_options, + new_wal_options, + &new_region_routes, + table_id, + ) + .unwrap(); + + // Should only have regions 1 and 2 (region 3 filtered out) + assert_eq!(result.len(), 2); + assert!(result.contains_key(&1)); + assert!(result.contains_key(&2)); + assert!(!result.contains_key(&3)); + } + + #[test] + fn test_merge_and_validate_region_wal_options_missing_option() { + let table_id = 1; + let existing_wal_options: HashMap = + vec![(1, kafka_wal_option("topic_1"))].into_iter().collect(); + let new_wal_options = HashMap::new(); + // Region 2 is in routes but has no WAL option + let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)]; + let result = merge_and_validate_region_wal_options( + &existing_wal_options, + new_wal_options, + &new_region_routes, + table_id, + ); + // Should fail validation + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("Mismatch")); + assert!(error_msg.contains(&table_id.to_string())); + } +} diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index f35d0b898f..90e08992f6 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -744,6 +744,18 @@ impl RegionSupervisor { result.migrating ); } + if !result.region_not_found.is_empty() { + let detecting_regions = result + .region_not_found + .iter() + .map(|region_id| (from_peer_id, *region_id)) + .collect::>(); + self.deregister_failure_detectors(detecting_regions).await; + info!( + "Region route not found, removed failover detectors for regions: {:?}", + result.region_not_found + ); + } if !result.table_not_found.is_empty() { let detecting_regions = result .table_not_found @@ -1222,6 +1234,34 @@ pub(crate) mod tests { assert!(supervisor.failover_counts.is_empty()); } + #[tokio::test] + async fn test_handle_submit_region_migration_task_result_region_not_found() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, _) = new_test_supervisor(); + let region_id = RegionId::new(1, 1); + let detecting_region = (1, region_id); + supervisor + .register_failure_detectors(vec![detecting_region]) + .await; + supervisor.failover_counts.insert(detecting_region, 1); + let result = SubmitRegionMigrationTaskResult { + region_not_found: vec![region_id], + ..Default::default() + }; + supervisor + .handle_submit_region_migration_task_result( + 1, + 2, + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + result, + ) + .await + .unwrap(); + assert!(!supervisor.failure_detector.contains(&detecting_region)); + assert!(supervisor.failover_counts.is_empty()); + } + #[tokio::test] async fn test_handle_submit_region_migration_task_result_leader_changed() { common_telemetry::init_default_ut_logging();