feat(meta): enhance region lease handling with operating status (#6027)

* feat(meta): enhance region lease handling with operating status

* test: adjust flush metadata region test
This commit is contained in:
Weny Xu
2025-04-30 21:00:34 +08:00
committed by GitHub
parent 44e75b142d
commit 8726bf9f7a
3 changed files with 80 additions and 17 deletions

View File

@@ -19,13 +19,15 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::region_engine::GrantedRegion;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse};
use crate::region::lease_keeper::{
RegionLeaseInfo, RegionLeaseKeeperRef, RenewRegionLeasesResponse,
};
use crate::region::RegionLeaseKeeper;
pub struct RegionLeaseHandler {
@@ -40,7 +42,7 @@ pub trait CustomizedRegionLeaseRenewer: Send + Sync {
fn renew(
&self,
ctx: &mut Context,
regions: HashMap<RegionId, RegionRole>,
regions: HashMap<RegionId, RegionLeaseInfo>,
) -> Vec<GrantedRegion>;
}
@@ -98,7 +100,9 @@ impl HeartbeatHandler for RegionLeaseHandler {
} else {
renewed
.into_iter()
.map(|(region_id, region_role)| GrantedRegion::new(region_id, region_role).into())
.map(|(region_id, region_lease_info)| {
GrantedRegion::new(region_id, region_lease_info.role).into()
})
.collect::<Vec<_>>()
};

View File

@@ -38,7 +38,7 @@ pub struct RegionLeaseKeeper {
/// The result of region lease renewal,
/// contains the renewed region leases and [RegionId] of non-existing regions.
pub struct RenewRegionLeasesResponse {
pub renewed: HashMap<RegionId, RegionRole>,
pub renewed: HashMap<RegionId, RegionLeaseInfo>,
pub non_exists: HashSet<RegionId>,
}
@@ -89,6 +89,39 @@ fn renew_region_lease_via_region_route(
None
}
/// The information of region lease.
#[derive(Debug, PartialEq, Eq)]
pub struct RegionLeaseInfo {
pub region_id: RegionId,
/// Whether the region is operating.
///
/// The region is under dropping or opening / migration operation.
pub is_operating: bool,
/// The role of region.
pub role: RegionRole,
}
impl RegionLeaseInfo {
/// Creates a new [RegionLeaseInfo] with the given region id and role with operating status.
pub fn operating(region_id: RegionId, role: RegionRole) -> Self {
Self {
region_id,
is_operating: true,
role,
}
}
}
impl From<(RegionId, RegionRole)> for RegionLeaseInfo {
fn from((region_id, role): (RegionId, RegionRole)) -> Self {
Self {
region_id,
is_operating: false,
role,
}
}
}
impl RegionLeaseKeeper {
async fn collect_table_metadata(
&self,
@@ -123,14 +156,16 @@ impl RegionLeaseKeeper {
datanode_id: DatanodeId,
region_id: RegionId,
role: RegionRole,
) -> Option<(RegionId, RegionRole)> {
) -> Option<RegionLeaseInfo> {
if operating_regions.contains(&region_id) {
return Some((region_id, role));
let region_lease_info = RegionLeaseInfo::operating(region_id, role);
return Some(region_lease_info);
}
if let Some(table_route) = table_metadata.get(&region_id.table_id()) {
if let Ok(Some(region_route)) = table_route.region_route(region_id) {
return renew_region_lease_via_region_route(&region_route, datanode_id, region_id);
return renew_region_lease_via_region_route(&region_route, datanode_id, region_id)
.map(RegionLeaseInfo::from);
}
}
warn!(
@@ -187,8 +222,8 @@ impl RegionLeaseKeeper {
region,
role,
) {
Some((region, renewed_role)) => {
renewed.insert(region, renewed_role);
Some(region_lease_info) => {
renewed.insert(region_lease_info.region_id, region_lease_info);
}
None => {
non_exists.insert(region);
@@ -225,7 +260,7 @@ mod tests {
use table::metadata::RawTableInfo;
use super::{renew_region_lease_via_region_route, RegionLeaseKeeper};
use crate::region::lease_keeper::RenewRegionLeasesResponse;
use crate::region::lease_keeper::{RegionLeaseInfo, RenewRegionLeasesResponse};
fn new_test_keeper() -> RegionLeaseKeeper {
let store = Arc::new(MemoryKvBackend::new());
@@ -400,7 +435,13 @@ mod tests {
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Leader)]));
assert_eq!(
renewed,
HashMap::from([(
region_id,
RegionLeaseInfo::from((region_id, RegionRole::Leader))
)])
);
}
// The follower region on the datanode.
@@ -414,7 +455,13 @@ mod tests {
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Follower)]));
assert_eq!(
renewed,
HashMap::from([(
region_id,
RegionLeaseInfo::from((region_id, RegionRole::Follower))
)])
);
}
let opening_region_id = RegionId::new(2048, 1);
@@ -435,7 +482,13 @@ mod tests {
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(renewed, HashMap::from([(opening_region_id, role)]));
assert_eq!(
renewed,
HashMap::from([(
opening_region_id,
RegionLeaseInfo::operating(opening_region_id, role)
)])
);
}
}
@@ -515,7 +568,13 @@ mod tests {
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(renewed, HashMap::from([(region_id, RegionRole::Follower)]));
assert_eq!(
renewed,
HashMap::from([(
region_id,
RegionLeaseInfo::from((region_id, RegionRole::Follower))
)])
);
}
}
}

View File

@@ -98,7 +98,7 @@ mod tests {
let env = TestEnv::with_prefix_and_config(
"test_flush_metadata_region_task",
EngineConfig {
flush_metadata_region_interval: Duration::from_millis(100),
flush_metadata_region_interval: Duration::from_millis(10),
..Default::default()
},
)
@@ -106,7 +106,7 @@ mod tests {
env.init_metric_region().await;
let engine = env.metric();
// Wait for flush task run
tokio::time::sleep(Duration::from_millis(200)).await;
tokio::time::sleep(Duration::from_millis(500)).await;
let physical_region_id = env.default_physical_region_id();
let stat = engine.region_statistic(physical_region_id).unwrap();