fix(meta): prefer metadata roles for region lease renewal

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
WenyXu
2026-04-16 03:47:29 +00:00
parent dc66363f89
commit 557a0d22f1
7 changed files with 158 additions and 62 deletions

View File

@@ -450,7 +450,7 @@ mod tests {
let roles = ddl_context
.memory_region_keeper
.extract_operating_region_roles(7, &mut HashSet::from([region_id]));
.extract_operating_region_roles(7, &HashSet::from([region_id]));
assert_eq!(roles.get(&region_id), Some(&RegionRole::DowngradingLeader));
}

View File

@@ -354,7 +354,7 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
);
let roles = ddl_context
.memory_region_keeper
.extract_operating_region_roles(datanode_id, &mut HashSet::from([region_id]));
.extract_operating_region_roles(datanode_id, &HashSet::from([region_id]));
assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));
execute_procedure_until_done(&mut procedure).await;

View File

@@ -331,7 +331,7 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
);
let roles = ddl_context
.memory_region_keeper
.extract_operating_region_roles(datanode_id, &mut HashSet::from([region_id]));
.extract_operating_region_roles(datanode_id, &HashSet::from([region_id]));
assert_eq!(roles.get(&region_id), Some(&RegionRole::Leader));
execute_procedure_until_done(&mut procedure).await;

View File

@@ -92,16 +92,15 @@ impl MemoryRegionKeeper {
pub fn extract_operating_region_roles(
&self,
datanode_id: DatanodeId,
region_ids: &mut HashSet<RegionId>,
region_ids: &HashSet<RegionId>,
) -> HashMap<RegionId, RegionRole> {
let inner = self.inner.read().unwrap();
region_ids
.extract_if(|region_id| inner.contains_key(&(datanode_id, *region_id)))
.map(|region_id| {
let role = *inner
.get(&(datanode_id, region_id))
.expect("operating region role must exist");
(region_id, role)
.iter()
.filter_map(|region_id| {
inner
.get(&(datanode_id, *region_id))
.map(|role| (*region_id, *role))
})
.collect()
}
@@ -149,25 +148,24 @@ mod tests {
.register_with_role(1, RegionId::from_u64(2), RegionRole::Follower)
.unwrap();
let mut regions = HashSet::from([
let regions = HashSet::from([
RegionId::from_u64(1),
RegionId::from_u64(2),
RegionId::from_u64(3),
]);
let output = keeper.extract_operating_region_roles(1, &mut regions);
let output = keeper.extract_operating_region_roles(1, &regions);
assert_eq!(output.len(), 2);
assert!(output.contains_key(&RegionId::from_u64(1)));
assert!(output.contains_key(&RegionId::from_u64(2)));
assert_eq!(regions, HashSet::from([RegionId::from_u64(3)]));
assert_eq!(keeper.len(), 2);
let mut regions = HashSet::from([
let regions = HashSet::from([
RegionId::from_u64(1),
RegionId::from_u64(2),
RegionId::from_u64(3),
]);
let output = keeper.extract_operating_region_roles(1, &mut regions);
let output = keeper.extract_operating_region_roles(1, &regions);
assert_eq!(
output,
HashMap::from([
@@ -175,7 +173,6 @@ mod tests {
(RegionId::from_u64(2), RegionRole::Follower),
])
);
assert_eq!(regions, HashSet::from([RegionId::from_u64(3)]));
assert_eq!(keeper.len(), 2);
drop(guard);

View File

@@ -107,15 +107,11 @@ pub fn operating_leader_region_roles(
region_routes
.iter()
.filter_map(|route| {
route.leader_region_role().map(|role| {
(
route.region.id,
route.leader_peer.as_ref().unwrap().id,
role,
)
})
let role = route.leader_region_role()?;
let leader = route.leader_peer.as_ref()?;
Some((route.region.id, leader.id, role))
})
.collect::<Vec<_>>()
.collect()
}
/// Returns the HashMap<[RegionNumber], &[Peer]>;

View File

@@ -1028,11 +1028,11 @@ mod tests {
let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap();
let leader_roles =
keeper.extract_operating_region_roles(1, &mut HashSet::from([RegionId::new(1024, 1)]));
keeper.extract_operating_region_roles(1, &HashSet::from([RegionId::new(1024, 1)]));
let staging_roles =
keeper.extract_operating_region_roles(2, &mut HashSet::from([RegionId::new(1024, 2)]));
keeper.extract_operating_region_roles(2, &HashSet::from([RegionId::new(1024, 2)]));
let downgrading_roles =
keeper.extract_operating_region_roles(3, &mut HashSet::from([RegionId::new(1024, 3)]));
keeper.extract_operating_region_roles(3, &HashSet::from([RegionId::new(1024, 3)]));
assert_eq!(
leader_roles.get(&RegionId::new(1024, 1)),

View File

@@ -77,11 +77,28 @@ fn renew_region_lease_via_region_route(
return Some((region_id, RegionRole::Follower));
}
warn!(
"Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region_routes: {:?}",
region_route
);
// The region doesn't belong to this datanode.
None
}
fn renew_region_lease_via_operating_regions(
operating_regions: &HashMap<RegionId, RegionRole>,
datanode_id: DatanodeId,
region_id: RegionId,
reported_role: RegionRole,
) -> Option<RegionLeaseInfo> {
// `operating_regions` is filtered by the current datanode in `collect_metadata`,
// so looking up by `region_id` is sufficient here.
if let Some(role) = operating_regions.get(&region_id) {
let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
if *role != reported_role {
info!(
"The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
region_id, datanode_id, role, reported_role
);
}
return Some(region_lease_info);
}
None
}
@@ -143,7 +160,7 @@ impl RegionLeaseKeeper {
}
/// Returns [None] if:
/// - The region doesn't belong to the datanode.
/// - The region doesn't belong to the datanode in metadata or operating regions.
/// - The region belongs to a logical table.
fn renew_region_lease(
&self,
@@ -153,50 +170,41 @@ impl RegionLeaseKeeper {
region_id: RegionId,
reported_role: RegionRole,
) -> Option<RegionLeaseInfo> {
// `operating_regions` is filtered by the current datanode in `collect_metadata`,
// so looking up by `region_id` is sufficient here.
if let Some(role) = operating_regions.get(&region_id) {
let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
if *role != reported_role {
info!(
"The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
region_id, datanode_id, role, reported_role
);
}
// First try to renew via region route
if let Some(table_route) = table_metadata.get(&region_id.table_id())
&& let Ok(Some(region_route)) = table_route.region_route(region_id)
&& let Some(region_lease) =
renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
{
return Some(RegionLeaseInfo::from(region_lease));
}
// Then try to renew via operating regions, which covers the opening region without region route in metadata.
if let Some(region_lease_info) = renew_region_lease_via_operating_regions(
operating_regions,
datanode_id,
region_id,
reported_role,
) {
return Some(region_lease_info);
}
if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
if let Ok(Some(region_route)) = table_route.region_route(region_id) {
return renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
.map(RegionLeaseInfo::from);
} else {
warn!(
"Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, region route is not found in table({})",
region_id.table_id()
);
}
} else {
warn!(
"Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, table({}) is not found",
region_id.table_id()
);
}
warn!(
"Denied to renew region lease for datanode: {datanode_id}, region_id: {region_id}, no matching metadata or operating region found",
);
None
}
async fn collect_metadata(
&self,
datanode_id: DatanodeId,
mut region_ids: HashSet<RegionId>,
region_ids: HashSet<RegionId>,
) -> Result<(
HashMap<TableId, TableRouteValue>,
HashMap<RegionId, RegionRole>,
)> {
// Filters out operating region first, improves the cache hit rate(reduce expensive remote fetches).
let operating_regions = self
.memory_region_keeper
.extract_operating_region_roles(datanode_id, &mut region_ids);
.extract_operating_region_roles(datanode_id, &region_ids);
let table_ids = region_ids
.into_iter()
.map(|region_id| region_id.table_id())
@@ -686,6 +694,101 @@ mod tests {
);
}
#[tokio::test]
async fn test_renew_region_leases_metadata_role_beats_keeper_role() {
let table_id = 2048;
let table_info: TableInfo = new_test_table_info(table_id);
let datanode_id = 1024;
let region_id = RegionId::new(table_id, 1);
let region_route = RegionRouteBuilder::default()
.region(Region::new_test(region_id))
.leader_peer(Peer::empty(datanode_id))
.build()
.unwrap();
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();
let _guard = keeper
.memory_region_keeper
.register_with_role(datanode_id, region_id, RegionRole::Follower)
.unwrap();
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(datanode_id, &[(region_id, RegionRole::Follower)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(
renewed,
HashMap::from([(
region_id,
RegionLeaseInfo::from((region_id, RegionRole::Leader))
)])
);
}
#[tokio::test]
async fn test_renew_region_leases_missing_route_falls_back_to_keeper_role() {
let table_id = 2048;
let table_info: TableInfo = new_test_table_info(table_id);
let datanode_id = 1024;
let region_id = RegionId::new(table_id, 1);
let another_region_id = RegionId::new(table_id, 2);
let region_route = RegionRouteBuilder::default()
.region(Region::new_test(another_region_id))
.leader_peer(Peer::empty(datanode_id))
.build()
.unwrap();
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();
let _guard = keeper
.memory_region_keeper
.register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
.unwrap();
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(datanode_id, &[(region_id, RegionRole::StagingLeader)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(
renewed,
HashMap::from([(
region_id,
RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
)])
);
}
#[tokio::test]
async fn test_renew_region_leases_operating_region_uses_keeper_role() {
let keeper = new_test_keeper();