refactor: inactive node manager (#2300)

refactor: use region_id instead of table&region_num in InactiveNodeManager
This commit is contained in:
JeremyHi
2023-09-01 15:10:53 +08:00
committed by Ruihang Xia
parent b234733c61
commit 4d2cae4174
6 changed files with 40 additions and 50 deletions

2
Cargo.lock generated
View File

@@ -4072,7 +4072,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ffc2bdfabea578b1d264a11b741df12395e89e87#ffc2bdfabea578b1d264a11b741df12395e89e87"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b585955dbbade25951d725251b66457fdd77143f#b585955dbbade25951d725251b66457fdd77143f"
dependencies = [
"prost",
"serde",

View File

@@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ffc2bdfabea578b1d264a11b741df12395e89e87" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b585955dbbade25951d725251b66457fdd77143f" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

View File

@@ -56,6 +56,10 @@ impl Stat {
node_id: self.id,
}
}
pub fn region_ids(&self) -> Vec<u64> {
self.region_stats.iter().map(|s| s.id).collect()
}
}
impl TryFrom<HeartbeatRequest> for Stat {

View File

@@ -56,23 +56,12 @@ impl HeartbeatHandler for RegionLeaseHandler {
.push(region_id.region_number());
});
let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory);
for (table_id, region_numbers) in table_region_leases.iter_mut() {
// TODO(jeremy): refactor this, use region_id
inactive_node_manager
.retain_active_regions(stat.cluster_id, stat.id, *table_id, region_numbers)
.await?;
}
let mut region_ids = stat.region_ids();
let region_ids = table_region_leases
.into_iter()
.filter(|(_, region_nums)| !region_nums.is_empty())
.flat_map(|(table_id, region_nums)| {
region_nums
.into_iter()
.map(move |region_num| RegionId::new(table_id, region_num).as_u64())
})
.collect();
let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory);
inactive_node_manager
.retain_active_regions(stat.cluster_id, stat.id, &mut region_ids)
.await?;
acc.region_lease = Some(RegionLease {
region_ids,

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use common_meta::rpc::store::{BatchGetRequest, PutRequest};
use common_meta::RegionIdent;
use store_api::storage::RegionNumber;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::keys::InactiveNodeKey;
@@ -32,11 +32,15 @@ impl<'a> InactiveNodeManager<'a> {
}
pub async fn register_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> {
let region_id = RegionId::new(
region_ident.table_ident.table_id,
region_ident.region_number,
)
.as_u64();
let key = InactiveNodeKey {
cluster_id: region_ident.cluster_id,
node_id: region_ident.datanode_id,
table_id: region_ident.table_ident.table_id,
region_number: region_ident.region_number,
region_id,
};
let req = PutRequest {
key: key.into(),
@@ -48,46 +52,45 @@ impl<'a> InactiveNodeManager<'a> {
}
pub async fn deregister_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> {
let region_id = RegionId::new(
region_ident.table_ident.table_id,
region_ident.region_number,
)
.as_u64();
let key: Vec<u8> = InactiveNodeKey {
cluster_id: region_ident.cluster_id,
node_id: region_ident.datanode_id,
table_id: region_ident.table_ident.table_id,
region_number: region_ident.region_number,
region_id,
}
.into();
self.store.delete(&key, false).await?;
Ok(())
}
/// The input is a list of regions from a table on a specific node. If one or more
/// regions have been set to inactive state by metasrv, the corresponding regions
/// will be removed, then return the remaining regions.
/// The input is a list of regions on a specific node. If one or more regions have been
/// set to inactive state by metasrv, the corresponding regions will be removed, then
/// return the remaining regions.
pub async fn retain_active_regions(
&self,
cluster_id: u64,
node_id: u64,
table_id: u32,
region_numbers: &mut Vec<RegionNumber>,
region_ids: &mut Vec<u64>,
) -> Result<()> {
let key_region_numbers: Vec<(Vec<u8>, RegionNumber)> = region_numbers
let key_region_ids = region_ids
.iter()
.map(|region_number| {
.map(|region_id| {
(
InactiveNodeKey {
cluster_id,
node_id,
table_id,
region_number: *region_number,
region_id: *region_id,
}
.into(),
*region_number,
*region_id,
)
})
.collect();
let keys = key_region_numbers
.iter()
.map(|(key, _)| key.clone())
.collect();
.collect::<Vec<(Vec<u8>, _)>>();
let keys = key_region_ids.iter().map(|(key, _)| key.clone()).collect();
let resp = self.store.batch_get(BatchGetRequest { keys }).await?;
let kvs = resp.kvs;
if kvs.is_empty() {
@@ -95,12 +98,12 @@ impl<'a> InactiveNodeManager<'a> {
}
let inactive_keys = kvs.into_iter().map(|kv| kv.key).collect::<HashSet<_>>();
let active_region_numbers = key_region_numbers
let active_region_ids = key_region_ids
.into_iter()
.filter(|(key, _)| !inactive_keys.contains(key))
.map(|(_, region_number)| region_number)
.map(|(_, region_id)| region_id)
.collect::<Vec<_>>();
*region_numbers = active_region_numbers;
*region_ids = active_region_ids;
Ok(())
}

View File

@@ -19,7 +19,6 @@ use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use crate::error;
use crate::error::Result;
@@ -242,19 +241,14 @@ impl TryFrom<Vec<u8>> for StatValue {
pub struct InactiveNodeKey {
pub cluster_id: u64,
pub node_id: u64,
pub table_id: u32,
pub region_number: RegionNumber,
pub region_id: u64,
}
impl From<InactiveNodeKey> for Vec<u8> {
fn from(value: InactiveNodeKey) -> Self {
format!(
"{}-{}-{}-{}-{}",
INACTIVE_NODE_PREFIX,
value.cluster_id,
value.node_id,
value.table_id,
value.region_number
"{}-{}-{}-{}",
INACTIVE_NODE_PREFIX, value.cluster_id, value.node_id, value.region_id
)
.into_bytes()
}