mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 07:20:41 +00:00
fix(meta): renew operating region leases from keeper roles
Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -20,7 +20,7 @@ use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
use common_meta::region_keeper::MemoryRegionKeeperRef;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_telemetry::warn;
|
||||
use common_telemetry::{info, warn};
|
||||
use snafu::ResultExt;
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
@@ -63,15 +63,9 @@ fn renew_region_lease_via_region_route(
|
||||
if let Some(leader) = ®ion_route.leader_peer
|
||||
&& leader.id == datanode_id
|
||||
{
|
||||
let region_role = if region_route.is_leader_staging() {
|
||||
RegionRole::StagingLeader
|
||||
} else if region_route.is_leader_downgrading() {
|
||||
RegionRole::DowngradingLeader
|
||||
} else {
|
||||
RegionRole::Leader
|
||||
};
|
||||
|
||||
return Some((region_id, region_role));
|
||||
return region_route
|
||||
.leader_region_role()
|
||||
.map(|region_role| (region_id, region_role));
|
||||
}
|
||||
|
||||
// If it's a follower region on this datanode.
|
||||
@@ -154,13 +148,21 @@ impl RegionLeaseKeeper {
|
||||
fn renew_region_lease(
|
||||
&self,
|
||||
table_metadata: &HashMap<TableId, TableRouteValue>,
|
||||
operating_regions: &HashSet<RegionId>,
|
||||
operating_regions: &HashMap<RegionId, RegionRole>,
|
||||
datanode_id: DatanodeId,
|
||||
region_id: RegionId,
|
||||
role: RegionRole,
|
||||
reported_role: RegionRole,
|
||||
) -> Option<RegionLeaseInfo> {
|
||||
if operating_regions.contains(®ion_id) {
|
||||
let region_lease_info = RegionLeaseInfo::operating(region_id, role);
|
||||
// `operating_regions` is filtered by the current datanode in `collect_metadata`,
|
||||
// so looking up by `region_id` is sufficient here.
|
||||
if let Some(role) = operating_regions.get(®ion_id) {
|
||||
let region_lease_info = RegionLeaseInfo::operating(region_id, *role);
|
||||
if *role != reported_role {
|
||||
info!(
|
||||
"The region {} on datanode {} is operating with role {:?}, but reported as {:?}",
|
||||
region_id, datanode_id, role, reported_role
|
||||
);
|
||||
}
|
||||
return Some(region_lease_info);
|
||||
}
|
||||
|
||||
@@ -187,11 +189,14 @@ impl RegionLeaseKeeper {
|
||||
&self,
|
||||
datanode_id: DatanodeId,
|
||||
mut region_ids: HashSet<RegionId>,
|
||||
) -> Result<(HashMap<TableId, TableRouteValue>, HashSet<RegionId>)> {
|
||||
) -> Result<(
|
||||
HashMap<TableId, TableRouteValue>,
|
||||
HashMap<RegionId, RegionRole>,
|
||||
)> {
|
||||
// Filters out operating region first, improves the cache hit rate(reduce expensive remote fetches).
|
||||
let operating_regions = self
|
||||
.memory_region_keeper
|
||||
.extract_operating_regions(datanode_id, &mut region_ids);
|
||||
.extract_operating_region_roles(datanode_id, &mut region_ids);
|
||||
let table_ids = region_ids
|
||||
.into_iter()
|
||||
.map(|region_id| region_id.table_id())
|
||||
@@ -224,13 +229,13 @@ impl RegionLeaseKeeper {
|
||||
let mut renewed = HashMap::new();
|
||||
let mut non_exists = HashSet::new();
|
||||
|
||||
for &(region, role) in regions {
|
||||
for &(region, reported_role) in regions {
|
||||
match self.renew_region_lease(
|
||||
&table_metadata,
|
||||
&operating_regions,
|
||||
datanode_id,
|
||||
region,
|
||||
role,
|
||||
reported_role,
|
||||
) {
|
||||
Some(region_lease_info) => {
|
||||
renewed.insert(region_lease_info.region_id, region_lease_info);
|
||||
@@ -399,8 +404,10 @@ mod tests {
|
||||
metadata.keys().cloned().collect::<Vec<_>>(),
|
||||
vec![region_id.table_id()]
|
||||
);
|
||||
assert!(regions.contains(&opening_region_id));
|
||||
assert_eq!(regions.len(), 1);
|
||||
assert_eq!(
|
||||
regions,
|
||||
HashMap::from([(opening_region_id, RegionRole::Leader)])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -490,12 +497,12 @@ mod tests {
|
||||
|
||||
// The opening region on the datanode.
|
||||
// NOTES: The procedure lock will ensure only one opening leader.
|
||||
for role in [RegionRole::Leader, RegionRole::Follower] {
|
||||
for reported_role in [RegionRole::Leader, RegionRole::Follower] {
|
||||
let RenewRegionLeasesResponse {
|
||||
non_exists,
|
||||
renewed,
|
||||
} = keeper
|
||||
.renew_region_leases(leader_peer_id, &[(opening_region_id, role)])
|
||||
.renew_region_leases(leader_peer_id, &[(opening_region_id, reported_role)])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -504,7 +511,7 @@ mod tests {
|
||||
renewed,
|
||||
HashMap::from([(
|
||||
opening_region_id,
|
||||
RegionLeaseInfo::operating(opening_region_id, role)
|
||||
RegionLeaseInfo::operating(opening_region_id, RegionRole::Leader)
|
||||
)])
|
||||
);
|
||||
}
|
||||
@@ -680,14 +687,14 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_renew_region_leases_operating_region_preserves_reported_role() {
|
||||
async fn test_renew_region_leases_operating_region_uses_keeper_role() {
|
||||
let keeper = new_test_keeper();
|
||||
let datanode_id = 1024;
|
||||
let region_id = RegionId::new(2048, 1);
|
||||
|
||||
let _guard = keeper
|
||||
.memory_region_keeper
|
||||
.register_with_role(datanode_id, region_id, RegionRole::Leader)
|
||||
.register_with_role(datanode_id, region_id, RegionRole::DowngradingLeader)
|
||||
.unwrap();
|
||||
|
||||
let RenewRegionLeasesResponse {
|
||||
@@ -703,7 +710,7 @@ mod tests {
|
||||
renewed,
|
||||
HashMap::from([(
|
||||
region_id,
|
||||
RegionLeaseInfo::operating(region_id, RegionRole::StagingLeader)
|
||||
RegionLeaseInfo::operating(region_id, RegionRole::DowngradingLeader)
|
||||
)])
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user