fix: bring back inactive_region_ids (#2783)

This commit is contained in:
Weny Xu
2023-11-21 15:24:33 +09:00
committed by GitHub
parent 20f01219e9
commit 43a7457e15
5 changed files with 22 additions and 15 deletions

View File

@@ -28,6 +28,7 @@ use common_telemetry::{debug, info, warn};
use dashmap::DashMap;
use futures::future::join_all;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Notify, RwLock};
@@ -84,7 +85,7 @@ pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub instructions: Vec<Instruction>,
pub stat: Option<Stat>,
pub inactive_region_ids: HashSet<u64>,
pub inactive_region_ids: HashSet<RegionId>,
pub region_lease: Option<RegionLease>,
}

View File

@@ -20,7 +20,6 @@ use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
use common_catalog::consts::default_engine;
use common_meta::RegionIdent;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
@@ -86,7 +85,7 @@ impl HeartbeatHandler for RegionFailureHandler {
.region_stats
.iter()
.map(|x| {
let region_id = RegionId::from(x.id);
let region_id = x.id;
RegionIdent {
cluster_id: stat.cluster_id,
datanode_id: stat.id,
@@ -109,6 +108,7 @@ impl HeartbeatHandler for RegionFailureHandler {
#[cfg(test)]
mod tests {
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
@@ -134,7 +134,7 @@ mod tests {
let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
id: RegionId::from_u64(region_id),
rcus: 0,
wcus: 0,
approximate_bytes: 0,

View File

@@ -45,7 +45,7 @@ pub struct Stat {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionStat {
/// The region_id.
pub id: u64,
pub id: RegionId,
/// The read capacity units during this period
pub rcus: i64,
/// The write capacity units during this period
@@ -75,13 +75,10 @@ impl Stat {
/// Returns a tuple array containing [RegionId] and [RegionRole].
pub fn regions(&self) -> Vec<(RegionId, RegionRole)> {
self.region_stats
.iter()
.map(|s| (RegionId::from(s.id), s.role))
.collect()
self.region_stats.iter().map(|s| (s.id, s.role)).collect()
}
pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<u64>) {
pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<RegionId>) {
if inactive_region_ids.is_empty() {
return;
}
@@ -140,7 +137,7 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
fn try_from(value: api::v1::meta::RegionStat) -> Result<Self, Self::Error> {
Ok(Self {
id: value.region_id,
id: RegionId::from_u64(value.region_id),
rcus: value.rcus,
wcus: value.wcus,
approximate_bytes: value.approximate_bytes,

View File

@@ -99,6 +99,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
let cluster_id = stat.cluster_id;
let datanode_id = stat.id;
let mut granted_regions = Vec::with_capacity(regions.len());
let mut inactive_regions = HashSet::new();
let (leaders, followers): (Vec<_>, Vec<_>) = regions
.into_iter()
@@ -122,6 +123,7 @@ impl HeartbeatHandler for RegionLeaseHandler {
&leaders,
RegionRole::Leader,
);
inactive_regions.extend(closable);
let followers = followers.into_iter().flatten().collect::<Vec<_>>();
@@ -142,7 +144,9 @@ impl HeartbeatHandler for RegionLeaseHandler {
&followers,
RegionRole::Follower,
);
inactive_regions.extend(closable);
acc.inactive_region_ids = inactive_regions;
acc.region_lease = Some(RegionLease {
regions: granted_regions
.into_iter()
@@ -184,7 +188,7 @@ mod test {
fn new_empty_region_stat(region_id: RegionId, role: RegionRole) -> RegionStat {
RegionStat {
id: region_id.as_u64(),
id: region_id,
role,
rcus: 0,
wcus: 0,
@@ -253,6 +257,7 @@ mod test {
handler.handle(&req, ctx, acc).await.unwrap();
assert_region_lease(acc, vec![GrantedRegion::new(region_id, RegionRole::Leader)]);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
let acc = &mut HeartbeatAccumulator::default();
@@ -277,6 +282,7 @@ mod test {
acc,
vec![GrantedRegion::new(region_id, RegionRole::Follower)],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
let opening_region_id = RegionId::new(table_id, region_number + 2);
let _guard = opening_region_keeper
@@ -310,6 +316,7 @@ mod test {
GrantedRegion::new(opening_region_id, RegionRole::Follower),
],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([another_region_id]));
}
#[tokio::test]
@@ -385,6 +392,7 @@ mod test {
GrantedRegion::new(another_region_id, RegionRole::Leader),
],
);
assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
}
fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {

View File

@@ -94,6 +94,7 @@ mod tests {
use api::v1::meta::Peer;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use super::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::handler::node_stat::{RegionStat, Stat};
@@ -189,7 +190,7 @@ mod tests {
addr: "127.0.0.1:3001".to_string(),
region_num: 11,
region_stats: vec![RegionStat {
id: 111,
id: RegionId::from_u64(111),
rcus: 1,
wcus: 1,
approximate_bytes: 1,
@@ -206,7 +207,7 @@ mod tests {
addr: "127.0.0.1:3002".to_string(),
region_num: 12,
region_stats: vec![RegionStat {
id: 112,
id: RegionId::from_u64(112),
rcus: 1,
wcus: 1,
approximate_bytes: 1,
@@ -223,7 +224,7 @@ mod tests {
addr: "127.0.0.1:3003".to_string(),
region_num: 13,
region_stats: vec![RegionStat {
id: 113,
id: RegionId::from_u64(113),
rcus: 1,
wcus: 1,
approximate_bytes: 1,