From 4d2cae4174eda9e756b547b231f236ea23757995 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Fri, 1 Sep 2023 15:10:53 +0800 Subject: [PATCH] refactor: inactive node manager (#2300) refactor: use region_id instead of table®ion_num in InactiveNodeManager --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/meta-srv/src/handler/node_stat.rs | 4 ++ .../src/handler/region_lease_handler.rs | 21 ++------ src/meta-srv/src/inactive_node_manager.rs | 49 ++++++++++--------- src/meta-srv/src/keys.rs | 12 ++--- 6 files changed, 40 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49a7676dd0..5248a61d4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4072,7 +4072,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ffc2bdfabea578b1d264a11b741df12395e89e87#ffc2bdfabea578b1d264a11b741df12395e89e87" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b585955dbbade25951d725251b66457fdd77143f#b585955dbbade25951d725251b66457fdd77143f" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index 07e3fb6335..c09abdc771 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ffc2bdfabea578b1d264a11b741df12395e89e87" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b585955dbbade25951d725251b66457fdd77143f" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 16db9329de..82b68eadaf 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -56,6 +56,10 @@ impl Stat { node_id: self.id, } } + + pub fn region_ids(&self) -> Vec { + self.region_stats.iter().map(|s| s.id).collect() + } } impl TryFrom for Stat { diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 1177c60416..fe421f38f5 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -56,23 +56,12 @@ impl HeartbeatHandler for RegionLeaseHandler { .push(region_id.region_number()); }); - let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory); - for (table_id, region_numbers) in table_region_leases.iter_mut() { - // TODO(jeremy): refactor this, use region_id - inactive_node_manager - .retain_active_regions(stat.cluster_id, stat.id, *table_id, region_numbers) - .await?; - } + let mut region_ids = stat.region_ids(); - let region_ids = table_region_leases - .into_iter() - .filter(|(_, region_nums)| !region_nums.is_empty()) - .flat_map(|(table_id, region_nums)| { - region_nums - .into_iter() - .map(move |region_num| RegionId::new(table_id, region_num).as_u64()) - }) - .collect(); + let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory); + inactive_node_manager + .retain_active_regions(stat.cluster_id, stat.id, &mut region_ids) + .await?; acc.region_lease = Some(RegionLease { region_ids, diff --git a/src/meta-srv/src/inactive_node_manager.rs b/src/meta-srv/src/inactive_node_manager.rs index 6462a8a89c..a0e0a9a38a 100644 --- a/src/meta-srv/src/inactive_node_manager.rs +++ b/src/meta-srv/src/inactive_node_manager.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use common_meta::rpc::store::{BatchGetRequest, PutRequest}; use common_meta::RegionIdent; -use store_api::storage::RegionNumber; +use store_api::storage::RegionId; use crate::error::Result; use crate::keys::InactiveNodeKey; @@ -32,11 +32,15 @@ impl<'a> InactiveNodeManager<'a> { } pub async fn register_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> { + let region_id = RegionId::new( + region_ident.table_ident.table_id, + region_ident.region_number, + ) + .as_u64(); let key = InactiveNodeKey { cluster_id: region_ident.cluster_id, node_id: region_ident.datanode_id, - table_id: region_ident.table_ident.table_id, - region_number: region_ident.region_number, + region_id, }; let req = PutRequest { key: key.into(), @@ -48,46 +52,45 @@ impl<'a> InactiveNodeManager<'a> { } pub async fn deregister_inactive_region(&self, region_ident: &RegionIdent) -> Result<()> { + let region_id = RegionId::new( + region_ident.table_ident.table_id, + region_ident.region_number, + ) + .as_u64(); let key: Vec = InactiveNodeKey { cluster_id: region_ident.cluster_id, node_id: region_ident.datanode_id, - table_id: region_ident.table_ident.table_id, - region_number: region_ident.region_number, + region_id, } .into(); self.store.delete(&key, false).await?; Ok(()) } - /// The input is a list of regions from a table on a specific node. If one or more - /// regions have been set to inactive state by metasrv, the corresponding regions - /// will be removed, then return the remaining regions. + /// The input is a list of regions on a specific node. If one or more regions have been + /// set to inactive state by metasrv, the corresponding regions will be removed, then + /// return the remaining regions. pub async fn retain_active_regions( &self, cluster_id: u64, node_id: u64, - table_id: u32, - region_numbers: &mut Vec, + region_ids: &mut Vec, ) -> Result<()> { - let key_region_numbers: Vec<(Vec, RegionNumber)> = region_numbers + let key_region_ids = region_ids .iter() - .map(|region_number| { + .map(|region_id| { ( InactiveNodeKey { cluster_id, node_id, - table_id, - region_number: *region_number, + region_id: *region_id, } .into(), - *region_number, + *region_id, ) }) - .collect(); - let keys = key_region_numbers - .iter() - .map(|(key, _)| key.clone()) - .collect(); + .collect::, _)>>(); + let keys = key_region_ids.iter().map(|(key, _)| key.clone()).collect(); let resp = self.store.batch_get(BatchGetRequest { keys }).await?; let kvs = resp.kvs; if kvs.is_empty() { @@ -95,12 +98,12 @@ impl<'a> InactiveNodeManager<'a> { } let inactive_keys = kvs.into_iter().map(|kv| kv.key).collect::>(); - let active_region_numbers = key_region_numbers + let active_region_ids = key_region_ids .into_iter() .filter(|(key, _)| !inactive_keys.contains(key)) - .map(|(_, region_number)| region_number) + .map(|(_, region_id)| region_id) .collect::>(); - *region_numbers = active_region_numbers; + *region_ids = active_region_ids; Ok(()) } diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index d01b704b9c..bcfc498de1 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -19,7 +19,6 @@ use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::RegionNumber; use crate::error; use crate::error::Result; @@ -242,19 +241,14 @@ impl TryFrom> for StatValue { pub struct InactiveNodeKey { pub cluster_id: u64, pub node_id: u64, - pub table_id: u32, - pub region_number: RegionNumber, + pub region_id: u64, } impl From for Vec { fn from(value: InactiveNodeKey) -> Self { format!( - "{}-{}-{}-{}-{}", - INACTIVE_NODE_PREFIX, - value.cluster_id, - value.node_id, - value.table_id, - value.region_number + "{}-{}-{}-{}", + INACTIVE_NODE_PREFIX, value.cluster_id, value.node_id, value.region_id ) .into_bytes() }