diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs
index a14d23deb5..1c01060c5e 100644
--- a/src/common/meta/src/ddl/drop_database/executor.rs
+++ b/src/common/meta/src/ddl/drop_database/executor.rs
@@ -450,7 +450,7 @@ mod tests {
 
         let roles = ddl_context
             .memory_region_keeper
-            .extract_operating_region_roles(7, &mut HashSet::from([region_id]));
+            .extract_operating_region_roles(7, &HashSet::from([region_id]));
         assert_eq!(roles.get(&region_id), Some(&RegionRole::DowngradingLeader));
     }
 
diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs
index 29dd120b9f..da91dedcaf 100644
--- a/src/common/meta/src/ddl/tests/create_table.rs
+++ b/src/common/meta/src/ddl/tests/create_table.rs
@@ -354,7 +354,7 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
     );
     let roles = ddl_context
         .memory_region_keeper
-        .extract_operating_region_roles(datanode_id, &mut HashSet::from([region_id]));
+        .extract_operating_region_roles(datanode_id, &HashSet::from([region_id]));
     assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));
 
     execute_procedure_until_done(&mut procedure).await;
diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs
index ae08d8b689..c518bb36a2 100644
--- a/src/common/meta/src/ddl/tests/drop_table.rs
+++ b/src/common/meta/src/ddl/tests/drop_table.rs
@@ -331,7 +331,7 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
     );
     let roles = ddl_context
         .memory_region_keeper
-        .extract_operating_region_roles(datanode_id, &mut HashSet::from([region_id]));
+        .extract_operating_region_roles(datanode_id, &HashSet::from([region_id]));
     assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));
 
     execute_procedure_until_done(&mut procedure).await;
diff --git a/src/common/meta/src/region_keeper.rs b/src/common/meta/src/region_keeper.rs
index 77db68450c..aca0b95838 100644
--- a/src/common/meta/src/region_keeper.rs
+++ b/src/common/meta/src/region_keeper.rs
@@ -92,16 +92,15 @@ impl MemoryRegionKeeper {
     pub fn extract_operating_region_roles(
         &self,
         datanode_id: DatanodeId,
-        region_ids: &mut HashSet<RegionId>,
+        region_ids: &HashSet<RegionId>,
     ) -> HashMap<RegionId, RegionRole> {
         let inner = self.inner.read().unwrap();
         region_ids
-            .extract_if(|region_id| inner.contains_key(&(datanode_id, *region_id)))
-            .map(|region_id| {
-                let role = *inner
-                    .get(&(datanode_id, region_id))
-                    .expect("operating region role must exist");
-                (region_id, role)
+            .iter()
+            .filter_map(|region_id| {
+                inner
+                    .get(&(datanode_id, *region_id))
+                    .map(|role| (*region_id, *role))
             })
             .collect()
     }
@@ -149,25 +148,24 @@ mod tests {
             .register_with_role(1, RegionId::from_u64(2), RegionRole::Follower)
             .unwrap();
 
-        let mut regions = HashSet::from([
+        let regions = HashSet::from([
             RegionId::from_u64(1),
             RegionId::from_u64(2),
             RegionId::from_u64(3),
         ]);
-        let output = keeper.extract_operating_region_roles(1, &mut regions);
+        let output = keeper.extract_operating_region_roles(1, &regions);
         assert_eq!(output.len(), 2);
         assert!(output.contains_key(&RegionId::from_u64(1)));
         assert!(output.contains_key(&RegionId::from_u64(2)));
-        assert_eq!(regions, HashSet::from([RegionId::from_u64(3)]));
         assert_eq!(keeper.len(), 2);
 
-        let mut regions = HashSet::from([
+        let regions = HashSet::from([
             RegionId::from_u64(1),
             RegionId::from_u64(2),
             RegionId::from_u64(3),
         ]);
-        let output = keeper.extract_operating_region_roles(1, &mut regions);
+        let output = keeper.extract_operating_region_roles(1, &regions);
         assert_eq!(
             output,
             HashMap::from([
@@ -175,7 +173,6 @@ mod tests {
                 (RegionId::from_u64(2), RegionRole::Follower),
             ])
         );
-        assert_eq!(regions, HashSet::from([RegionId::from_u64(3)]));
         assert_eq!(keeper.len(), 2);
 
         drop(guard);
diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs
index 5c2bf521d0..519b12e081 100644
--- a/src/common/meta/src/rpc/router.rs
+++ b/src/common/meta/src/rpc/router.rs
@@ -107,15 +107,11 @@ pub fn operating_leader_region_roles(
     region_routes
         .iter()
        .filter_map(|route| {
-            route.leader_region_role().map(|role| {
-                (
-                    route.region.id,
-                    route.leader_peer.as_ref().unwrap().id,
-                    role,
-                )
-            })
+            let role = route.leader_region_role()?;
+            let leader = route.leader_peer.as_ref()?;
+            Some((route.region.id, leader.id, role))
         })
-        .collect::<Vec<_>>()
+        .collect()
 }
 
 /// Returns the HashMap<[RegionNumber], &[Peer]>;
diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs
index 7ee9137f5a..f314a40080 100644
--- a/src/meta-srv/src/procedure/repartition.rs
+++ b/src/meta-srv/src/procedure/repartition.rs
@@ -1028,11 +1028,11 @@ mod tests {
         let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap();
 
         let leader_roles =
-            keeper.extract_operating_region_roles(1, &mut HashSet::from([RegionId::new(1024, 1)]));
+            keeper.extract_operating_region_roles(1, &HashSet::from([RegionId::new(1024, 1)]));
         let staging_roles =
-            keeper.extract_operating_region_roles(2, &mut HashSet::from([RegionId::new(1024, 2)]));
+            keeper.extract_operating_region_roles(2, &HashSet::from([RegionId::new(1024, 2)]));
         let downgrading_roles =
-            keeper.extract_operating_region_roles(3, &mut HashSet::from([RegionId::new(1024, 3)]));
+            keeper.extract_operating_region_roles(3, &HashSet::from([RegionId::new(1024, 3)]));
 
         assert_eq!(
             leader_roles.get(&RegionId::new(1024, 1)),
diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs
index a80ff0f566..d4a899d002 100644
--- a/src/meta-srv/src/region/lease_keeper.rs
+++ b/src/meta-srv/src/region/lease_keeper.rs
@@ -77,11 +77,28 @@ fn renew_region_lease_via_region_route(
         return Some((region_id, RegionRole::Follower));
     }
 
-    warn!(
-        "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region_routes: {:?}",
-        region_route
-    );
-    // The region doesn't belong to this datanode.
+    None
+}
+
+fn renew_region_lease_via_operating_regions(
+    operating_regions: &HashMap<RegionId, RegionRole>,
+    datanode_id: DatanodeId,
+    region_id: RegionId,
+    reported_role: RegionRole,
+) -> Option<RegionLeaseInfo> {
+    // `operating_regions` is filtered by the current datanode in `collect_metadata`,
+    // so looking up by `region_id` is sufficient here.
+    if let Some(role) = operating_regions.get(&region_id) {
+        let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
+        if *role != reported_role {
+            info!(
+                "The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
+                region_id, datanode_id, role, reported_role
+            );
+        }
+        return Some(region_lease_info);
+    }
+
     None
 }
 
@@ -143,7 +160,7 @@ impl RegionLeaseKeeper {
     }
 
     /// Returns [None] if:
-    /// - The region doesn't belong to the datanode.
+    /// - The region doesn't belong to the datanode in metadata or operating regions.
     /// - The region belongs to a logical table.
     fn renew_region_lease(
         &self,
@@ -153,50 +170,41 @@
         region_id: RegionId,
         reported_role: RegionRole,
     ) -> Option<RegionLeaseInfo> {
-        // `operating_regions` is filtered by the current datanode in `collect_metadata`,
-        // so looking up by `region_id` is sufficient here.
-        if let Some(role) = operating_regions.get(&region_id) {
-            let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
-            if *role != reported_role {
-                info!(
-                    "The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
-                    region_id, datanode_id, role, reported_role
-                );
-            }
+        // First try to renew via the region route.
+        if let Some(table_route) = table_metadata.get(&region_id.table_id())
+            && let Ok(Some(region_route)) = table_route.region_route(region_id)
+            && let Some(region_lease) =
+                renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
+        {
+            return Some(RegionLeaseInfo::from(region_lease));
+        }
+        // Then try to renew via operating regions, which covers opening regions that have no region route in metadata yet.
+        if let Some(region_lease_info) = renew_region_lease_via_operating_regions(
+            operating_regions,
+            datanode_id,
+            region_id,
+            reported_role,
+        ) {
             return Some(region_lease_info);
         }
 
-        if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
-            if let Ok(Some(region_route)) = table_route.region_route(region_id) {
-                return renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
-                    .map(RegionLeaseInfo::from);
-            } else {
-                warn!(
-                    "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region route is not found in table({})",
-                    region_id.table_id()
-                );
-            }
-        } else {
-            warn!(
-                "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, table({}) is not found",
-                region_id.table_id()
-            );
-        }
+        warn!(
+            "Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, no matching metadata or operating region found",
+        );
         None
     }
 
     async fn collect_metadata(
         &self,
         datanode_id: DatanodeId,
-        mut region_ids: HashSet<RegionId>,
+        region_ids: HashSet<RegionId>,
     ) -> Result<(
         HashMap<RegionId, RegionRole>,
         HashMap<TableId, TableRouteValue>,
     )> {
-        // Filters out operating region first, improves the cache hit rate(reduce expensive remote fetches).
         let operating_regions = self
             .memory_region_keeper
-            .extract_operating_region_roles(datanode_id, &mut region_ids);
+            .extract_operating_region_roles(datanode_id, &region_ids);
         let table_ids = region_ids
             .into_iter()
             .map(|region_id| region_id.table_id())
@@ -686,6 +694,101 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_renew_region_leases_metadata_role_beats_keeper_role() {
+        let table_id = 2048;
+        let table_info: TableInfo = new_test_table_info(table_id);
+
+        let datanode_id = 1024;
+        let region_id = RegionId::new(table_id, 1);
+        let region_route = RegionRouteBuilder::default()
+            .region(Region::new_test(region_id))
+            .leader_peer(Peer::empty(datanode_id))
+            .build()
+            .unwrap();
+
+        let keeper = new_test_keeper();
+        let table_metadata_manager = keeper.table_metadata_manager();
+        table_metadata_manager
+            .create_table_metadata(
+                table_info,
+                TableRouteValue::physical(vec![region_route]),
+                HashMap::default(),
+            )
+            .await
+            .unwrap();
+
+        let _guard = keeper
+            .memory_region_keeper
+            .register_with_role(datanode_id, region_id, RegionRole::Follower)
+            .unwrap();
+
+        let RenewRegionLeasesResponse {
+            non_exists,
+            renewed,
+        } = keeper
+            .renew_region_leases(datanode_id, &[(region_id, RegionRole::Follower)])
+            .await
+            .unwrap();
+
+        assert!(non_exists.is_empty());
+        assert_eq!(
+            renewed,
+            HashMap::from([(
+                region_id,
+                RegionLeaseInfo::from((region_id, RegionRole::Leader))
+            )])
+        );
+    }
+
+    #[tokio::test]
+    async fn test_renew_region_leases_missing_route_falls_back_to_keeper_role() {
+        let table_id = 2048;
+        let table_info: TableInfo = new_test_table_info(table_id);
+
+        let datanode_id = 1024;
+        let region_id = RegionId::new(table_id, 1);
+        let another_region_id = RegionId::new(table_id, 2);
+        let region_route = RegionRouteBuilder::default()
+            .region(Region::new_test(another_region_id))
+            .leader_peer(Peer::empty(datanode_id))
+            .build()
+            .unwrap();
+
+        let keeper = new_test_keeper();
+        let table_metadata_manager = keeper.table_metadata_manager();
+        table_metadata_manager
+            .create_table_metadata(
+                table_info,
+                TableRouteValue::physical(vec![region_route]),
+                HashMap::default(),
+            )
+            .await
+            .unwrap();
+
+        let _guard = keeper
+            .memory_region_keeper
+            .register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
+            .unwrap();
+
+        let RenewRegionLeasesResponse {
+            non_exists,
+            renewed,
+        } = keeper
+            .renew_region_leases(datanode_id, &[(region_id, RegionRole::StagingLeader)])
+            .await
+            .unwrap();
+
+        assert!(non_exists.is_empty());
+        assert_eq!(
+            renewed,
+            HashMap::from([(
+                region_id,
+                RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
+            )])
+        );
+    }
+
     #[tokio::test]
     async fn test_renew_region_leases_operating_region_uses_keeper_role() {
         let keeper = new_test_keeper();