diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index e3bc73aca0..4e089cd0d4 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -1280,6 +1280,11 @@ impl TableMetadataManager { region_distribution(current_table_route_value.region_routes()?); let new_region_distribution = region_distribution(&new_region_routes); + let update_topic_region_txn = self.topic_region_manager.build_update_txn( + table_id, + ®ion_info.region_wal_options, + new_region_wal_options, + )?; let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( table_id, region_info, @@ -1291,13 +1296,16 @@ impl TableMetadataManager { // Updates the table_route. let new_table_route_value = current_table_route_value.update(new_region_routes)?; - let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() .table_route_storage() .build_update_txn(table_id, current_table_route_value, &new_table_route_value)?; - let txn = Txn::merge_all(vec![update_datanode_table_txn, update_table_route_txn]); + let txn = Txn::merge_all(vec![ + update_datanode_table_txn, + update_table_route_txn, + update_topic_region_txn, + ]); let mut r = self.kv_backend.txn(txn).await?; @@ -1482,6 +1490,7 @@ mod tests { use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; + use crate::key::topic_region::TopicRegionKey; use crate::key::{ DeserializedValueWithBytes, RegionDistribution, RegionRoleSet, TOPIC_REGION_PREFIX, TableMetadataManager, ViewInfoValue, @@ -2262,6 +2271,218 @@ mod tests { ); } + #[tokio::test] + async fn test_update_table_route_with_topic_region_mapping() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let region_route = new_test_region_route(); + let region_routes = vec![region_route.clone()]; + let table_info: RawTableInfo = new_test_table_info().into(); + let table_id = table_info.ident.table_id; + let engine = table_info.meta.engine.as_str(); + let region_storage_path = + region_storage_path(&table_info.catalog_name, &table_info.schema_name); + + // Create initial metadata with Kafka WAL options + let old_region_wal_options: HashMap = vec![ + ( + 1, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + })) + .unwrap(), + ), + ( + 2, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_2".to_string(), + })) + .unwrap(), + ), + ] + .into_iter() + .collect(); + + create_physical_table_metadata( + &table_metadata_manager, + table_info.clone(), + region_routes.clone(), + old_region_wal_options.clone(), + ) + .await + .unwrap(); + + let current_table_route_value = DeserializedValueWithBytes::from_inner( + TableRouteValue::physical(region_routes.clone()), + ); + + // Verify initial topic region mappings exist + let region_id_1 = RegionId::new(table_id, 1); + let region_id_2 = RegionId::new(table_id, 2); + let topic_1_key = TopicRegionKey::new(region_id_1, "topic_1"); + let topic_2_key = TopicRegionKey::new(region_id_2, "topic_2"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_1_key.clone()) + .await + .unwrap() + .is_some() + ); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_2_key.clone()) + .await + .unwrap() + .is_some() + ); + + // Test 1: Add new region with new topic + let new_region_routes = vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), // New region + ]; + let new_region_wal_options: HashMap = vec![ + ( + 1, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), // Unchanged + })) + .unwrap(), + ), + ( + 2, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_2".to_string(), // Unchanged + })) + .unwrap(), + ), + ( + 3, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_3".to_string(), // New topic + })) + .unwrap(), + ), + ] + .into_iter() + .collect(); + let current_table_route_value_updated = DeserializedValueWithBytes::from_inner( + current_table_route_value + .inner + .update(new_region_routes.clone()) + .unwrap(), + ); + table_metadata_manager + .update_table_route( + table_id, + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.clone(), + region_options: HashMap::new(), + region_wal_options: old_region_wal_options.clone(), + }, + ¤t_table_route_value, + new_region_routes.clone(), + &HashMap::new(), + &new_region_wal_options, + ) + .await + .unwrap(); + // Verify new topic region mapping was created + let region_id_3 = RegionId::new(table_id, 3); + let topic_3_key = TopicRegionKey::new(region_id_3, "topic_3"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_3_key) + .await + .unwrap() + .is_some() + ); + // Test 2: Remove a region and change topic for another + let newer_region_routes = vec![ + new_region_route(1, 1), + // Region 2 removed + // Region 3 now has different topic + ]; + let newer_region_wal_options: HashMap = vec![ + ( + 1, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), // Unchanged + })) + .unwrap(), + ), + ( + 3, + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: "topic_3_new".to_string(), // Changed topic + })) + .unwrap(), + ), + ] + .into_iter() + .collect(); + table_metadata_manager + .update_table_route( + table_id, + RegionInfo { + engine: engine.to_string(), + region_storage_path: region_storage_path.clone(), + region_options: HashMap::new(), + region_wal_options: new_region_wal_options.clone(), + }, + ¤t_table_route_value_updated, + newer_region_routes.clone(), + &HashMap::new(), + &newer_region_wal_options, + ) + .await + .unwrap(); + // Verify region 2 mapping was deleted + let topic_2_key_new = TopicRegionKey::new(region_id_2, "topic_2"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_2_key_new) + .await + .unwrap() + .is_none() + ); + // Verify region 3 old topic mapping was deleted + let topic_3_key_old = TopicRegionKey::new(region_id_3, "topic_3"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_3_key_old) + .await + .unwrap() + .is_none() + ); + // Verify region 3 new topic mapping was created + let topic_3_key_new = TopicRegionKey::new(region_id_3, "topic_3_new"); + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_3_key_new) + .await + .unwrap() + .is_some() + ); + // Verify region 1 mapping still exists (unchanged) + assert!( + table_metadata_manager + .topic_region_manager + .get(topic_1_key) + .await + .unwrap() + .is_some() + ); + } + #[tokio::test] async fn test_destroy_table_metadata() { let mem_kv = Arc::new(MemoryKvBackend::default()); diff --git a/src/common/meta/src/key/topic_region.rs b/src/common/meta/src/key/topic_region.rs index c34229cf9e..9feac10804 100644 --- a/src/common/meta/src/key/topic_region.rs +++ b/src/common/meta/src/key/topic_region.rs @@ -243,7 +243,7 @@ impl TopicRegionManager { let topic_region_mapping = self.get_topic_region_mapping(table_id, ®ion_wal_options); let topic_region_keys = topic_region_mapping .iter() - .map(|(topic, region_id)| TopicRegionKey::new(*topic, region_id)) + .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic)) .collect::>(); let operations = topic_region_keys .into_iter() @@ -252,6 +252,55 @@ impl TopicRegionManager { Ok(Txn::new().and_then(operations)) } + /// Build a update topic region mapping transaction. + pub fn build_update_txn( + &self, + table_id: TableId, + old_region_wal_options: &HashMap, + new_region_wal_options: &HashMap, + ) -> Result { + let old_wal_options_parsed = parse_region_wal_options(old_region_wal_options)?; + let new_wal_options_parsed = parse_region_wal_options(new_region_wal_options)?; + let old_mapping = self.get_topic_region_mapping(table_id, &old_wal_options_parsed); + let new_mapping = self.get_topic_region_mapping(table_id, &new_wal_options_parsed); + + // Convert to HashMap for easier lookup: RegionId -> Topic + let old_map: HashMap = old_mapping.into_iter().collect(); + let new_map: HashMap = new_mapping.into_iter().collect(); + let mut ops = Vec::new(); + + // Check for deletes (in old but not in new, or topic changed) + for (region_id, old_topic) in &old_map { + match new_map.get(region_id) { + Some(new_topic) if *new_topic == *old_topic => { + // Same topic, do nothing (preserve checkpoint) + } + _ => { + // Removed or topic changed -> Delete old + let key = TopicRegionKey::new(*region_id, old_topic); + ops.push(TxnOp::Delete(key.to_bytes())); + } + } + } + + // Check for adds (in new but not in old, or topic changed) + for (region_id, new_topic) in &new_map { + match old_map.get(region_id) { + Some(old_topic) if *old_topic == *new_topic => { + // Same topic, already handled (do nothing) + } + _ => { + // New or topic changed -> Put new + let key = TopicRegionKey::new(*region_id, new_topic); + // Initialize with empty value (default TopicRegionValue) + ops.push(TxnOp::Put(key.to_bytes(), vec![])); + } + } + } + + Ok(Txn::new().and_then(ops)) + } + /// Returns the map of [`RegionId`] to their corresponding topic [`TopicRegionValue`]. pub async fn regions(&self, topic: &str) -> Result> { let prefix = TopicRegionKey::range_topic_key(topic); @@ -431,4 +480,420 @@ mod tests { RegionId::from_u64(4410931412992) ); } + + #[test] + fn test_build_create_txn() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + (2, WalOptions::RaftEngine), // Should be ignored + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + + let txn = manager + .build_create_txn(table_id, ®ion_wal_options) + .unwrap(); + + // Verify the transaction contains correct operations + // Should create mappings for region 0 and 1, but not region 2 (RaftEngine) + let ops = txn.req().success.clone(); + assert_eq!(ops.len(), 2); + + let keys: Vec<_> = ops + .iter() + .filter_map(|op| { + if let TxnOp::Put(key, _) = op { + TopicRegionKey::from_bytes(key).ok() + } else { + None + } + }) + .collect(); + + assert_eq!(keys.len(), 2); + let region_ids: Vec<_> = keys.iter().map(|k| k.region_id).collect(); + assert!(region_ids.contains(&RegionId::new(table_id, 0))); + assert!(region_ids.contains(&RegionId::new(table_id, 1))); + assert!(!region_ids.contains(&RegionId::new(table_id, 2))); + + // Verify topics are correct + for key in keys { + match key.region_id.region_number() { + 0 => assert_eq!(key.topic, "topic_0"), + 1 => assert_eq!(key.topic, "topic_1"), + _ => panic!("Unexpected region number"), + } + } + } + + #[test] + fn test_build_update_txn_add_new_region() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + )] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + let ops = txn.req().success.clone(); + // Should only have Put for new region 1 (region 0 unchanged) + assert_eq!(ops.len(), 1); + if let TxnOp::Put(key, _) = &ops[0] { + let topic_key = TopicRegionKey::from_bytes(key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 1)); + assert_eq!(topic_key.topic, "topic_1"); + } else { + panic!("Expected Put operation"); + } + } + + #[test] + fn test_build_update_txn_remove_region() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + )] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + let ops = txn.req().success.clone(); + // Should only have Delete for removed region 1 (region 0 unchanged) + assert_eq!(ops.len(), 1); + match &ops[0] { + TxnOp::Delete(key) => { + let topic_key = TopicRegionKey::from_bytes(key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 1)); + assert_eq!(topic_key.topic, "topic_1"); + } + TxnOp::Put(_, _) | TxnOp::Get(_) => { + panic!("Expected Delete operation"); + } + } + } + + #[test] + fn test_build_update_txn_change_topic() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + )] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0_new".to_string(), + }), + )] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + let ops = txn.req().success.clone(); + // Should have Delete for old topic and Put for new topic + assert_eq!(ops.len(), 2); + + let mut delete_found = false; + let mut put_found = false; + for op in ops { + match op { + TxnOp::Delete(key) => { + let topic_key = TopicRegionKey::from_bytes(&key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 0)); + assert_eq!(topic_key.topic, "topic_0"); + delete_found = true; + } + TxnOp::Put(key, _) => { + let topic_key = TopicRegionKey::from_bytes(&key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 0)); + assert_eq!(topic_key.topic, "topic_0_new"); + put_found = true; + } + TxnOp::Get(_) => { + // Get operations shouldn't appear in this context + panic!("Unexpected Get operation in update transaction"); + } + } + } + assert!(delete_found, "Expected Delete operation for old topic"); + assert!(put_found, "Expected Put operation for new topic"); + } + + #[test] + fn test_build_update_txn_no_change() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, ®ion_wal_options, ®ion_wal_options) + .unwrap(); + // Should have no operations when nothing changes (preserves checkpoint) + let ops = txn.req().success.clone(); + assert_eq!(ops.len(), 0); + } + + #[test] + fn test_build_update_txn_mixed_scenarios() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), + }), + ), + ( + 2, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_2".to_string(), + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), // Unchanged + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1_new".to_string(), // Topic changed + }), + ), + // Region 2 removed + ( + 3, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_3".to_string(), // New region + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + + let ops = txn.req().success.clone(); + // Should have: + // - Delete for region 2 (removed) + // - Delete for region 1 old topic (topic changed) + // - Put for region 1 new topic (topic changed) + // - Put for region 3 (new) + // Region 0 unchanged, so no operation + assert_eq!(ops.len(), 4); + + let mut delete_ops = 0; + let mut put_ops = 0; + let mut delete_region_2 = false; + let mut delete_region_1_old = false; + let mut put_region_1_new = false; + let mut put_region_3 = false; + + for op in ops { + match op { + TxnOp::Delete(key) => { + delete_ops += 1; + let topic_key = TopicRegionKey::from_bytes(&key).unwrap(); + match topic_key.region_id.region_number() { + 1 => { + assert_eq!(topic_key.topic, "topic_1"); + delete_region_1_old = true; + } + 2 => { + assert_eq!(topic_key.topic, "topic_2"); + delete_region_2 = true; + } + _ => panic!("Unexpected delete operation for region"), + } + } + TxnOp::Put(key, _) => { + put_ops += 1; + let topic_key: TopicRegionKey<'_> = TopicRegionKey::from_bytes(&key).unwrap(); + match topic_key.region_id.region_number() { + 1 => { + assert_eq!(topic_key.topic, "topic_1_new"); + put_region_1_new = true; + } + 3 => { + assert_eq!(topic_key.topic, "topic_3"); + put_region_3 = true; + } + _ => panic!("Unexpected put operation for region"), + } + } + TxnOp::Get(_) => { + panic!("Unexpected Get operation in update transaction"); + } + } + } + + assert_eq!(delete_ops, 2); + assert_eq!(put_ops, 2); + assert!(delete_region_2, "Expected delete for removed region 2"); + assert!( + delete_region_1_old, + "Expected delete for region 1 old topic" + ); + assert!(put_region_1_new, "Expected put for region 1 new topic"); + assert!(put_region_3, "Expected put for new region 3"); + } + + #[test] + fn test_build_update_txn_with_raft_engine() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let manager = TopicRegionManager::new(kv_backend.clone()); + let table_id = 1; + let old_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + (1, WalOptions::RaftEngine), // Should be ignored + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let new_region_wal_options = vec![ + ( + 0, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_0".to_string(), + }), + ), + ( + 1, + WalOptions::Kafka(KafkaWalOptions { + topic: "topic_1".to_string(), // Changed from RaftEngine to Kafka + }), + ), + ] + .into_iter() + .map(|(num, opts)| (num, serde_json::to_string(&opts).unwrap())) + .collect::>(); + let txn = manager + .build_update_txn(table_id, &old_region_wal_options, &new_region_wal_options) + .unwrap(); + let ops = txn.req().success.clone(); + // Should only have Put for region 1 (new Kafka topic) + // Region 0 unchanged, so no operation + // Region 1 was RaftEngine before (not tracked), so only Put needed + assert_eq!(ops.len(), 1); + match &ops[0] { + TxnOp::Put(key, _) => { + let topic_key = TopicRegionKey::from_bytes(key).unwrap(); + assert_eq!(topic_key.region_id, RegionId::new(table_id, 1)); + assert_eq!(topic_key.topic, "topic_1"); + } + TxnOp::Delete(_) | TxnOp::Get(_) => { + panic!("Expected Put operation for new Kafka region"); + } + } + } } diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index fbec7ca14e..25a3d986ec 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -166,6 +166,8 @@ pub struct SubmitRegionMigrationTaskResult { pub peer_conflict: Vec, /// Regions whose table is not found. pub table_not_found: Vec, + /// Regions whose table exists but region route is not found (e.g., removed after repartition). + pub region_not_found: Vec, /// Regions still pending migration. pub migrating: Vec, /// Regions that have been submitted for migration. @@ -396,6 +398,7 @@ impl RegionMigrationManager { leader_changed, peer_conflict, mut table_not_found, + region_not_found, pending, } = analyze_region_migration_task(&task, &self.context_factory.table_metadata_manager) .await?; @@ -405,6 +408,7 @@ impl RegionMigrationManager { leader_changed, peer_conflict, table_not_found, + region_not_found, migrating: migrating_region_ids, submitted: vec![], procedure_id: None, @@ -445,6 +449,7 @@ impl RegionMigrationManager { leader_changed, peer_conflict, table_not_found, + region_not_found, migrating: migrating_region_ids, submitted: vec![], procedure_id: None, @@ -460,6 +465,7 @@ impl RegionMigrationManager { leader_changed, peer_conflict, table_not_found, + region_not_found, migrating: migrating_region_ids, submitted: submitting_region_ids, procedure_id: Some(procedure_id), diff --git a/src/meta-srv/src/procedure/region_migration/utils.rs b/src/meta-srv/src/procedure/region_migration/utils.rs index e6243ec5b0..a7c9e93871 100644 --- a/src/meta-srv/src/procedure/region_migration/utils.rs +++ b/src/meta-srv/src/procedure/region_migration/utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::time::Duration; @@ -82,13 +82,13 @@ impl RegionMigrationTaskBatch { /// Returns the table regions map. /// /// The key is the table id, the value is the region ids of the table. - pub(crate) fn table_regions(&self) -> HashMap> { + pub(crate) fn table_regions(&self) -> HashMap> { let mut table_regions = HashMap::new(); for region_id in &self.region_ids { table_regions .entry(region_id.table_id()) - .or_insert_with(Vec::new) - .push(*region_id); + .or_insert_with(HashSet::new) + .insert(*region_id); } table_regions } @@ -105,6 +105,8 @@ pub(crate) struct RegionMigrationAnalysis { pub(crate) peer_conflict: Vec, /// Regions whose table is not found. pub(crate) table_not_found: Vec, + /// Regions whose table exists but region route is not found (e.g., removed after repartition). + pub(crate) region_not_found: Vec, /// Regions still pending migration. pub(crate) pending: Vec, } @@ -209,6 +211,12 @@ pub async fn analyze_region_migration_task( err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."), } })?; + + let existing_region_ids = region_routes + .iter() + .map(|r| r.region.id) + .collect::>(); + for region_route in region_routes .iter() .filter(|r| region_ids.contains(&r.region.id)) @@ -220,6 +228,12 @@ pub async fn analyze_region_migration_task( task.to_peer.id, )?; } + + for region_id in region_ids { + if !existing_region_ids.contains(region_id) { + result.region_not_found.push(*region_id); + } + } } Ok(result) @@ -460,6 +474,8 @@ mod tests { RegionId::new(1024, 2), RegionId::new(1024, 3), RegionId::new(1024, 4), + // Region of existing table but route removed (e.g., after repartition). + RegionId::new(1024, 5), RegionId::new(1025, 1), ], from_peer: Peer::empty(1), @@ -477,6 +493,7 @@ mod tests { migrated: vec![RegionId::new(1024, 1)], leader_changed: vec![RegionId::new(1024, 2)], peer_conflict: vec![RegionId::new(1024, 3)], + region_not_found: vec![RegionId::new(1024, 5)], table_not_found: vec![RegionId::new(1025, 1)], } ); diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs index c91270ecfa..f734677803 100644 --- a/src/meta-srv/src/procedure/repartition.rs +++ b/src/meta-srv/src/procedure/repartition.rs @@ -23,6 +23,7 @@ pub mod repartition_start; pub mod utils; use std::any::Any; +use std::collections::HashMap; use std::fmt::Debug; use common_error::ext::BoxedError; @@ -50,7 +51,7 @@ use common_telemetry::error; use partition::expr::PartitionExpr; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use store_api::storage::{RegionNumber, TableId}; use table::table_name::TableName; use crate::error::{self, Result}; @@ -198,6 +199,7 @@ impl Context { &self, current_table_route_value: &DeserializedValueWithBytes, new_region_routes: Vec, + new_region_wal_options: HashMap, ) -> Result<()> { let table_id = self.persistent_ctx.table_id; if new_region_routes.is_empty() { @@ -221,6 +223,16 @@ impl Context { region_wal_options, .. } = &datanode_table_value.region_info; + + // Merge and validate the new region wal options. + let validated_region_wal_options = + crate::procedure::repartition::utils::merge_and_validate_region_wal_options( + region_wal_options, + new_region_wal_options, + &new_region_routes, + table_id, + )?; + self.table_metadata_manager .update_table_route( table_id, @@ -228,7 +240,7 @@ impl Context { current_table_route_value, new_region_routes, region_options, - region_wal_options, + &validated_region_wal_options, ) .await .context(error::TableMetadataManagerSnafu) diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index fbf565b946..ceb50ba732 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -119,7 +119,7 @@ impl State for AllocateRegion { let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; let new_region_routes = Self::generate_region_routes(region_routes, &new_allocated_region_routes); - ctx.update_table_route(&table_route_value, new_region_routes) + ctx.update_table_route(&table_route_value, new_region_routes, wal_options) .await?; ctx.invalidate_table_cache().await?; diff --git a/src/meta-srv/src/procedure/repartition/deallocate_region.rs b/src/meta-srv/src/procedure/repartition/deallocate_region.rs index c369e9d5d1..48bfc31936 100644 --- a/src/meta-srv/src/procedure/repartition/deallocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/deallocate_region.rs @@ -98,7 +98,7 @@ impl State for DeallocateRegion { let region_routes = table_route_value.region_routes().unwrap(); let new_region_routes = Self::generate_region_routes(region_routes, &pending_deallocate_region_ids); - ctx.update_table_route(&table_route_value, new_region_routes) + ctx.update_table_route(&table_route_value, new_region_routes, HashMap::new()) .await?; ctx.invalidate_table_cache().await?; diff --git a/src/meta-srv/src/procedure/repartition/utils.rs b/src/meta-srv/src/procedure/repartition/utils.rs index 161c0feedd..ddeb974eef 100644 --- a/src/meta-srv/src/procedure/repartition/utils.rs +++ b/src/meta-srv/src/procedure/repartition/utils.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::{HashMap, HashSet}; + use common_error::ext::BoxedError; use common_meta::key::TableMetadataManagerRef; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::TableId; +use common_meta::rpc::router::RegionRoute; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::error::{self, Result}; @@ -47,3 +50,207 @@ pub async fn get_datanode_table_value( })?; Ok(datanode_table_value) } + +/// Merges and validates region WAL options for repartition. +/// +/// This function: +/// 1. Validates that new WAL options don't overwrite existing ones +/// 2. Merges existing `region_wal_options` with new `new_region_wal_options` +/// 3. Filters out WAL options for regions that are not in `new_region_routes` +/// 4. Validates that every region in `new_region_routes` has a corresponding WAL option +/// +/// # Arguments +/// * `region_wal_options` - Existing region WAL options from datanode table +/// * `new_region_wal_options` - New region WAL options to merge (should only contain newly allocated regions) +/// * `new_region_routes` - The new region routes after repartition +/// * `table_id` - Table ID for error reporting +/// +/// # Returns +/// Returns the merged and filtered WAL options, ensuring all regions have options. +/// +/// # Errors +/// Returns an error if: +/// - New WAL options try to overwrite existing ones for the same region +/// - Any region in `new_region_routes` is missing a WAL option +pub fn merge_and_validate_region_wal_options( + region_wal_options: &HashMap, + mut new_region_wal_options: HashMap, + new_region_routes: &[RegionRoute], + table_id: TableId, +) -> Result> { + // Doesn't allow overwriting existing WAL options. + for (region_number, _) in new_region_wal_options.iter() { + if region_wal_options.contains_key(region_number) { + return error::UnexpectedSnafu { + violated: format!( + "Overwriting existing WAL option for region: {}", + RegionId::new(table_id, *region_number) + ), + } + .fail(); + } + } + + new_region_wal_options.extend(region_wal_options.clone()); + + // Extract region numbers from new routes + let region_numbers: HashSet = new_region_routes + .iter() + .map(|r| r.region.id.region_number()) + .collect(); + + // Filter out WAL options for regions that are not in new_region_routes + new_region_wal_options.retain(|k, _| region_numbers.contains(k)); + + // Validate that every region has a WAL option + ensure!( + region_numbers.len() == new_region_wal_options.len(), + error::UnexpectedSnafu { + violated: format!( + "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}", + region_numbers.len(), + new_region_wal_options.len(), + table_id + ), + } + ); + + Ok(new_region_wal_options) +} + +#[cfg(test)] +mod tests { + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use common_wal::options::{KafkaWalOptions, WalOptions}; + use store_api::storage::RegionId; + + use super::*; + + /// Helper function to create a Kafka WAL option string from a topic name. + fn kafka_wal_option(topic: &str) -> String { + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: topic.to_string(), + })) + .unwrap() + } + + fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute { + RegionRoute { + region: Region { + id: RegionId::from_u64(region_id), + ..Default::default() + }, + leader_peer: Some(Peer::empty(datanode_id)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + } + } + + #[test] + fn test_merge_and_validate_region_wal_options_success() { + let table_id = 1; + let existing_wal_options: HashMap = vec![ + (1, kafka_wal_option("topic_1")), + (2, kafka_wal_option("topic_2")), + ] + .into_iter() + .collect(); + let new_wal_options: HashMap = + vec![(3, kafka_wal_option("topic_3"))].into_iter().collect(); + let new_region_routes = vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), + ]; + let result = merge_and_validate_region_wal_options( + &existing_wal_options, + new_wal_options, + &new_region_routes, + table_id, + ) + .unwrap(); + + // Should have all three regions + assert_eq!(result.len(), 3); + assert!(result.contains_key(&1)); + assert!(result.contains_key(&2)); + assert!(result.contains_key(&3)); + // Existing options should be preserved + assert_eq!(result.get(&1).unwrap(), &kafka_wal_option("topic_1")); + assert_eq!(result.get(&2).unwrap(), &kafka_wal_option("topic_2")); + // New option should be present + assert_eq!(result.get(&3).unwrap(), &kafka_wal_option("topic_3")); + } + + #[test] + fn test_merge_and_validate_region_wal_options_new_overrides_existing() { + let table_id = 1; + let existing_wal_options: HashMap = + vec![(1, kafka_wal_option("topic_1_old"))] + .into_iter() + .collect(); + let new_wal_options: HashMap = + vec![(1, kafka_wal_option("topic_1_new"))] + .into_iter() + .collect(); + let new_region_routes = vec![new_region_route(1, 1)]; + merge_and_validate_region_wal_options( + &existing_wal_options, + new_wal_options, + &new_region_routes, + table_id, + ) + .unwrap_err(); + } + + #[test] + fn test_merge_and_validate_region_wal_options_filters_removed_regions() { + let table_id = 1; + let existing_wal_options: HashMap = vec![ + (1, kafka_wal_option("topic_1")), + (2, kafka_wal_option("topic_2")), + (3, kafka_wal_option("topic_3")), + ] + .into_iter() + .collect(); + let new_wal_options = HashMap::new(); + // Only regions 1 and 2 are in new routes (region 3 removed) + let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)]; + let result = merge_and_validate_region_wal_options( + &existing_wal_options, + new_wal_options, + &new_region_routes, + table_id, + ) + .unwrap(); + + // Should only have regions 1 and 2 (region 3 filtered out) + assert_eq!(result.len(), 2); + assert!(result.contains_key(&1)); + assert!(result.contains_key(&2)); + assert!(!result.contains_key(&3)); + } + + #[test] + fn test_merge_and_validate_region_wal_options_missing_option() { + let table_id = 1; + let existing_wal_options: HashMap = + vec![(1, kafka_wal_option("topic_1"))].into_iter().collect(); + let new_wal_options = HashMap::new(); + // Region 2 is in routes but has no WAL option + let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)]; + let result = merge_and_validate_region_wal_options( + &existing_wal_options, + new_wal_options, + &new_region_routes, + table_id, + ); + // Should fail validation + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("Mismatch")); + assert!(error_msg.contains(&table_id.to_string())); + } +} diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index f35d0b898f..90e08992f6 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -744,6 +744,18 @@ impl RegionSupervisor { result.migrating ); } + if !result.region_not_found.is_empty() { + let detecting_regions = result + .region_not_found + .iter() + .map(|region_id| (from_peer_id, *region_id)) + .collect::>(); + self.deregister_failure_detectors(detecting_regions).await; + info!( + "Region route not found, removed failover detectors for regions: {:?}", + result.region_not_found + ); + } if !result.table_not_found.is_empty() { let detecting_regions = result .table_not_found @@ -1222,6 +1234,34 @@ pub(crate) mod tests { assert!(supervisor.failover_counts.is_empty()); } + #[tokio::test] + async fn test_handle_submit_region_migration_task_result_region_not_found() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, _) = new_test_supervisor(); + let region_id = RegionId::new(1, 1); + let detecting_region = (1, region_id); + supervisor + .register_failure_detectors(vec![detecting_region]) + .await; + supervisor.failover_counts.insert(detecting_region, 1); + let result = SubmitRegionMigrationTaskResult { + region_not_found: vec![region_id], + ..Default::default() + }; + supervisor + .handle_submit_region_migration_task_result( + 1, + 2, + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + result, + ) + .await + .unwrap(); + assert!(!supervisor.failure_detector.contains(&detecting_region)); + assert!(supervisor.failover_counts.is_empty()); + } + #[tokio::test] async fn test_handle_submit_region_migration_task_result_leader_changed() { common_telemetry::init_default_ut_logging();