diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index 2b2b84fff8..040a4d8169 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -30,6 +30,8 @@ pub const REGION_LEASE_SECS: u64 = /// If the node's lease has expired, the `Selector` will not select it. pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS; +pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS; + /// The lease seconds of metasrv leader. pub const META_LEASE_SECS: u64 = 3; diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 8f03784863..ebad9e3e6b 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -37,7 +37,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error; use crate::error::{match_for_io_error, Result}; -use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX}; +use crate::key::{DatanodeStatKey, DatanodeStatValue}; use crate::metasrv::ElectionRef; pub type MetaPeerClientRef = Arc; @@ -154,13 +154,6 @@ impl KvBackend for MetaPeerClient { .fail() } - async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result { - error::UnsupportedSnafu { - operation: "compare and put".to_string(), - } - .fail() - } - async fn delete_range(&self, _req: DeleteRangeRequest) -> Result { error::UnsupportedSnafu { operation: "delete range".to_string(), @@ -175,6 +168,13 @@ impl KvBackend for MetaPeerClient { .fail() } + async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result { + error::UnsupportedSnafu { + operation: "compare and put".to_string(), + } + .fail() + } + async fn put_conditionally( &self, _key: Vec, @@ -197,7 +197,7 @@ impl KvBackend for MetaPeerClient { impl MetaPeerClient { async fn get_dn_key_value(&self, keys_only: bool) -> Result> { - let key = format!("{DN_STAT_PREFIX}-").into_bytes(); + let key = DatanodeStatKey::prefix_key(); let range_end = util::get_prefix_end_key(&key); let range_request = RangeRequest { key, @@ -209,7 +209,7 @@ impl MetaPeerClient { } // Get all datanode stat kvs from leader meta. - pub async fn get_all_dn_stat_kvs(&self) -> Result> { + pub async fn get_all_dn_stat_kvs(&self) -> Result> { let kvs = self.get_dn_key_value(false).await?; to_stat_kv_map(kvs) } @@ -218,12 +218,15 @@ impl MetaPeerClient { let kvs = self.get_dn_key_value(true).await?; kvs.into_iter() .map(|kv| kv.key.try_into()) - .collect::>>() + .collect::>>() .map(|hash_set| hash_set.len() as i32) } // Get datanode stat kvs from leader meta by input keys. 
- pub async fn get_dn_stat_kvs(&self, keys: Vec) -> Result> { + pub async fn get_dn_stat_kvs( + &self, + keys: Vec, + ) -> Result> { let stat_keys = keys.into_iter().map(|key| key.into()).collect(); let batch_get_req = BatchGetRequest { keys: stat_keys }; @@ -313,7 +316,7 @@ impl MetaPeerClient { } } -fn to_stat_kv_map(kvs: Vec) -> Result> { +fn to_stat_kv_map(kvs: Vec) -> Result> { let mut map = HashMap::with_capacity(kvs.len()); for kv in kvs { let _ = map.insert(kv.key.try_into()?, kv.value.try_into()?); @@ -358,11 +361,11 @@ mod tests { use super::{check_resp_header, to_stat_kv_map, Context}; use crate::error; use crate::handler::node_stat::Stat; - use crate::keys::{StatKey, StatValue}; + use crate::key::{DatanodeStatKey, DatanodeStatValue}; #[test] fn test_to_stat_kv_map() { - let stat_key = StatKey { + let stat_key = DatanodeStatKey { cluster_id: 0, node_id: 100, }; @@ -373,7 +376,7 @@ mod tests { addr: "127.0.0.1:3001".to_string(), ..Default::default() }; - let stat_val = StatValue { stats: vec![stat] }.try_into().unwrap(); + let stat_val = DatanodeStatValue { stats: vec![stat] }.try_into().unwrap(); let kv = KeyValue { key: stat_key.into(), diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 4490d84044..00946fefba 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -274,7 +274,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid datanode lease key: {}", key))] + #[snafu(display("Invalid lease key: {}", key))] InvalidLeaseKey { key: String, #[snafu(implicit)] @@ -295,7 +295,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse datanode lease key from utf8"))] + #[snafu(display("Failed to parse lease key from utf8"))] LeaseKeyFromUtf8 { #[snafu(source)] error: std::string::FromUtf8Error, @@ -303,7 +303,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse datanode lease value from utf8"))] + #[snafu(display("Failed to parse lease value from utf8"))] LeaseValueFromUtf8 { #[snafu(source)] error: std::string::FromUtf8Error, @@ -311,7 +311,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse datanode stat key from utf8"))] + #[snafu(display("Failed to parse stat key from utf8"))] StatKeyFromUtf8 { #[snafu(source)] error: std::string::FromUtf8Error, @@ -319,7 +319,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse datanode stat value from utf8"))] + #[snafu(display("Failed to parse stat value from utf8"))] StatValueFromUtf8 { #[snafu(source)] error: std::string::FromUtf8Error, diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index f42e84acd4..0d9e04d5da 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -55,7 +55,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler { start_time_ms: info.start_time_ms, }; - save_to_mem_store(key, value, ctx).await?; + put_into_memory_store(ctx, key, value).await?; Ok(HandleControl::Continue) } @@ -88,7 +88,7 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler { start_time_ms: info.start_time_ms, }; - save_to_mem_store(key, value, ctx).await?; + put_into_memory_store(ctx, key, value).await?; Ok(HandleControl::Continue) } @@ -138,7 +138,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { start_time_ms: info.start_time_ms, }; - save_to_mem_store(key, value, ctx).await?; + 
put_into_memory_store(ctx, key, value).await?; Ok(HandleControl::Continue) } @@ -176,7 +176,7 @@ fn extract_base_info( )) } -async fn save_to_mem_store(key: NodeInfoKey, value: NodeInfo, ctx: &mut Context) -> Result<()> { +async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> { let key = key.into(); let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?; let put_req = PutRequest { diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs index 82f2aad206..76669cd76b 100644 --- a/src/meta-srv/src/handler/keep_lease_handler.rs +++ b/src/meta-srv/src/handler/keep_lease_handler.rs @@ -12,21 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{HeartbeatRequest, Role}; +use api::v1::meta::{HeartbeatRequest, Peer, Role}; use common_meta::rpc::store::PutRequest; use common_telemetry::{trace, warn}; use common_time::util as time_util; use crate::error::Result; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; -use crate::keys::{LeaseKey, LeaseValue}; +use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue}; use crate::metasrv::Context; /// Keeps [Datanode] leases -pub struct KeepLeaseHandler; +pub struct DatanodeKeepLeaseHandler; #[async_trait::async_trait] -impl HeartbeatHandler for KeepLeaseHandler { +impl HeartbeatHandler for DatanodeKeepLeaseHandler { fn is_acceptable(&self, role: Role) -> bool { role == Role::Datanode } @@ -45,7 +45,7 @@ impl HeartbeatHandler for KeepLeaseHandler { return Ok(HandleControl::Continue); }; - let key = LeaseKey { + let key = DatanodeLeaseKey { cluster_id: header.cluster_id, node_id: peer.id, }; @@ -54,22 +54,68 @@ impl HeartbeatHandler for KeepLeaseHandler { node_addr: peer.addr.clone(), }; - trace!("Receive a heartbeat: {key:?}, {value:?}"); + trace!("Receive a heartbeat from datanode: {key:?}, {value:?}"); let key = key.try_into()?; let value = value.try_into()?; - let put_req = PutRequest { - key, - value, - ..Default::default() - }; - - let res = ctx.in_memory.put(put_req).await; - - if let Err(err) = res { - warn!(err; "Failed to update lease KV, peer: {peer:?}"); - } + put_into_memory_store(ctx, key, value, peer).await; Ok(HandleControl::Continue) } } + +/// Keeps [Flownode] leases +pub struct FlownodeKeepLeaseHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for FlownodeKeepLeaseHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Flownode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + ctx: &mut Context, + _acc: &mut HeartbeatAccumulator, + ) -> Result { + let HeartbeatRequest { header, peer, .. 
} = req; + let Some(header) = &header else { + return Ok(HandleControl::Continue); + }; + let Some(peer) = &peer else { + return Ok(HandleControl::Continue); + }; + + let key = FlownodeLeaseKey { + cluster_id: header.cluster_id, + node_id: peer.id, + }; + let value = LeaseValue { + timestamp_millis: time_util::current_time_millis(), + node_addr: peer.addr.clone(), + }; + + trace!("Receive a heartbeat from flownode: {key:?}, {value:?}"); + + let key = key.try_into()?; + let value = value.try_into()?; + put_into_memory_store(ctx, key, value, peer).await; + + Ok(HandleControl::Continue) + } +} + +async fn put_into_memory_store(ctx: &mut Context, key: Vec, value: Vec, peer: &Peer) { + let put_req = PutRequest { + key, + value, + ..Default::default() + }; + + let res = ctx.in_memory.put(put_req).await; + + if let Err(err) = res { + warn!(err; "Failed to update lease KV, peer: {peer:?}"); + } +} diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index e8f3f4d67f..b7fe55a0f4 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -22,7 +22,7 @@ use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use crate::error::{Error, InvalidHeartbeatRequestSnafu}; -use crate::keys::StatKey; +use crate::key::DatanodeStatKey; #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Stat { @@ -65,8 +65,8 @@ impl Stat { self.region_stats.is_empty() } - pub fn stat_key(&self) -> StatKey { - StatKey { + pub fn stat_key(&self) -> DatanodeStatKey { + DatanodeStatKey { cluster_id: self.cluster_id, node_id: self.id, } diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 41b9b39228..faa16c6741 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -23,7 +23,7 @@ use snafu::ResultExt; use crate::error::{self, Result}; use crate::handler::node_stat::Stat; use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; -use crate::keys::{StatKey, StatValue}; +use crate::key::{DatanodeStatKey, DatanodeStatValue}; use crate::metasrv::Context; const MAX_CACHED_STATS_PER_KEY: usize = 10; @@ -68,7 +68,7 @@ impl EpochStats { #[derive(Default)] pub struct PersistStatsHandler { - stats_cache: DashMap, + stats_cache: DashMap, } #[async_trait::async_trait] @@ -121,7 +121,7 @@ impl HeartbeatHandler for PersistStatsHandler { return Ok(HandleControl::Continue); } - let value: Vec = StatValue { + let value: Vec = DatanodeStatValue { stats: epoch_stats.drain_all(), } .try_into()?; @@ -152,7 +152,7 @@ mod tests { use super::*; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pushers}; - use crate::keys::StatKey; + use crate::key::DatanodeStatKey; use crate::service::store::cached_kv::LeaderCachedKvBackend; #[tokio::test] @@ -186,17 +186,17 @@ mod tests { let handler = PersistStatsHandler::default(); handle_request_many_times(ctx.clone(), &handler, 1).await; - let key = StatKey { + let key = DatanodeStatKey { cluster_id: 3, node_id: 101, }; let key: Vec = key.into(); let res = ctx.in_memory.get(&key).await.unwrap(); let kv = res.unwrap(); - let key: StatKey = kv.key.clone().try_into().unwrap(); + let key: DatanodeStatKey = kv.key.clone().try_into().unwrap(); assert_eq!(3, key.cluster_id); assert_eq!(101, key.node_id); - let val: StatValue = kv.value.try_into().unwrap(); + let val: DatanodeStatValue = kv.value.try_into().unwrap(); // first new stat 
must be set in kv store immediately assert_eq!(1, val.stats.len()); assert_eq!(1, val.stats[0].region_num); @@ -206,7 +206,7 @@ mod tests { let key: Vec = key.into(); let res = ctx.in_memory.get(&key).await.unwrap(); let kv = res.unwrap(); - let val: StatValue = kv.value.try_into().unwrap(); + let val: DatanodeStatValue = kv.value.try_into().unwrap(); // refresh every 10 stats assert_eq!(10, val.stats.len()); } diff --git a/src/meta-srv/src/key.rs b/src/meta-srv/src/key.rs new file mode 100644 index 0000000000..243efe69c8 --- /dev/null +++ b/src/meta-srv/src/key.rs @@ -0,0 +1,138 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod datanode; +mod flownode; + +use std::str::FromStr; + +pub use datanode::*; +pub use flownode::*; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error; + +macro_rules! impl_from_str_lease_key { + ($key_type:ty, $pattern:expr) => { + impl FromStr for $key_type { + type Err = error::Error; + + fn from_str(key: &str) -> error::Result { + let caps = $pattern + .captures(key) + .context(error::InvalidLeaseKeySnafu { key })?; + + ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key }); + + let cluster_id = caps[1].to_string(); + let node_id = caps[2].to_string(); + let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu { + err_msg: format!("invalid cluster_id: {cluster_id}"), + })?; + let node_id: u64 = node_id.parse().context(error::ParseNumSnafu { + err_msg: format!("invalid node_id: {node_id}"), + })?; + + Ok(Self { + cluster_id, + node_id, + }) + } + } + }; +} + +impl_from_str_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_KEY_PATTERN); +impl_from_str_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_KEY_PATTERN); + +macro_rules! impl_try_from_lease_key { + ($key_type:ty, $prefix:expr) => { + impl TryFrom> for $key_type { + type Error = error::Error; + + fn try_from(bytes: Vec) -> error::Result { + String::from_utf8(bytes) + .context(error::LeaseKeyFromUtf8Snafu {}) + .map(|x| x.parse())? + } + } + + impl TryFrom<$key_type> for Vec { + type Error = error::Error; + + fn try_from(key: $key_type) -> error::Result { + Ok(format!("{}-{}-{}", $prefix, key.cluster_id, key.node_id).into_bytes()) + } + } + }; +} + +impl_try_from_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_PREFIX); +impl_try_from_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_PREFIX); + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct LeaseValue { + // last activity + pub timestamp_millis: i64, + pub node_addr: String, +} + +impl FromStr for LeaseValue { + type Err = error::Error; + + fn from_str(value: &str) -> crate::Result { + serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value }) + } +} + +impl TryFrom> for LeaseValue { + type Error = error::Error; + + fn try_from(bytes: Vec) -> crate::Result { + String::from_utf8(bytes) + .context(error::LeaseValueFromUtf8Snafu {}) + .map(|x| x.parse())? 
+ } +} + +impl TryFrom for Vec { + type Error = error::Error; + + fn try_from(value: LeaseValue) -> crate::Result { + Ok(serde_json::to_string(&value) + .context(error::SerializeToJsonSnafu { + input: format!("{value:?}"), + })? + .into_bytes()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lease_value_round_trip() { + let value = LeaseValue { + timestamp_millis: 111, + node_addr: "127.0.0.1:3002".to_string(), + }; + + let value_bytes: Vec = value.clone().try_into().unwrap(); + let new_value: LeaseValue = value_bytes.try_into().unwrap(); + + assert_eq!(new_value, value); + } +} diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/key/datanode.rs similarity index 67% rename from src/meta-srv/src/keys.rs rename to src/meta-srv/src/key/datanode.rs index 250f736c45..4fe55685b2 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/key/datanode.rs @@ -24,16 +24,16 @@ use crate::error; use crate::error::Result; use crate::handler::node_stat::Stat; -pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease"; -pub(crate) const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region"; +pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease"; +const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region"; -pub const DN_STAT_PREFIX: &str = "__meta_dnstat"; +const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat"; lazy_static! { - static ref DATANODE_LEASE_KEY_PATTERN: Regex = - Regex::new(&format!("^{DN_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); + pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex = + Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); static ref DATANODE_STAT_KEY_PATTERN: Regex = - Regex::new(&format!("^{DN_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); + Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!( "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$" )) @@ -41,118 +41,49 @@ lazy_static! { } #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] -pub struct LeaseKey { +pub struct DatanodeLeaseKey { pub cluster_id: ClusterId, pub node_id: u64, } -impl FromStr for LeaseKey { - type Err = error::Error; - - fn from_str(key: &str) -> Result { - let caps = DATANODE_LEASE_KEY_PATTERN - .captures(key) - .context(error::InvalidLeaseKeySnafu { key })?; - - ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key }); - - let cluster_id = caps[1].to_string(); - let node_id = caps[2].to_string(); - let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu { - err_msg: format!("invalid cluster_id: {cluster_id}"), - })?; - let node_id: u64 = node_id.parse().context(error::ParseNumSnafu { - err_msg: format!("invalid node_id: {node_id}"), - })?; - - Ok(Self { - cluster_id, - node_id, - }) - } -} - -impl TryFrom> for LeaseKey { - type Error = error::Error; - - fn try_from(bytes: Vec) -> Result { - String::from_utf8(bytes) - .context(error::LeaseKeyFromUtf8Snafu {}) - .map(|x| x.parse())? 
- } -} - -impl TryFrom for Vec { - type Error = error::Error; - - fn try_from(dn_key: LeaseKey) -> Result { - Ok(format!( - "{}-{}-{}", - DN_LEASE_PREFIX, dn_key.cluster_id, dn_key.node_id - ) - .into_bytes()) - } -} - -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct LeaseValue { - // last activity - pub timestamp_millis: i64, - pub node_addr: String, -} - -impl FromStr for LeaseValue { - type Err = error::Error; - - fn from_str(value: &str) -> Result { - serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value }) - } -} - -impl TryFrom> for LeaseValue { - type Error = error::Error; - - fn try_from(bytes: Vec) -> Result { - String::from_utf8(bytes) - .context(error::LeaseValueFromUtf8Snafu {}) - .map(|x| x.parse())? - } -} - -impl TryFrom for Vec { - type Error = error::Error; - - fn try_from(dn_value: LeaseValue) -> Result { - Ok(serde_json::to_string(&dn_value) - .context(error::SerializeToJsonSnafu { - input: format!("{dn_value:?}"), - })? - .into_bytes()) +impl DatanodeLeaseKey { + pub fn prefix_key_by_cluster(cluster_id: ClusterId) -> Vec { + format!("{DATANODE_LEASE_PREFIX}-{cluster_id}-").into_bytes() } } #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -pub struct StatKey { +pub struct DatanodeStatKey { pub cluster_id: ClusterId, pub node_id: u64, } -impl From<&LeaseKey> for StatKey { - fn from(lease_key: &LeaseKey) -> Self { - StatKey { +impl DatanodeStatKey { + pub fn prefix_key() -> Vec { + format!("{DATANODE_STAT_PREFIX}-").into_bytes() + } +} + +impl From<&DatanodeLeaseKey> for DatanodeStatKey { + fn from(lease_key: &DatanodeLeaseKey) -> Self { + DatanodeStatKey { cluster_id: lease_key.cluster_id, node_id: lease_key.node_id, } } } -impl From for Vec { - fn from(value: StatKey) -> Self { - format!("{}-{}-{}", DN_STAT_PREFIX, value.cluster_id, value.node_id).into_bytes() +impl From for Vec { + fn from(value: DatanodeStatKey) -> Self { + format!( + "{}-{}-{}", + DATANODE_STAT_PREFIX, value.cluster_id, value.node_id + ) + .into_bytes() } } -impl FromStr for StatKey { +impl FromStr for DatanodeStatKey { type Err = error::Error; fn from_str(key: &str) -> Result { @@ -178,7 +109,7 @@ impl FromStr for StatKey { } } -impl TryFrom> for StatKey { +impl TryFrom> for DatanodeStatKey { type Error = error::Error; fn try_from(bytes: Vec) -> Result { @@ -190,11 +121,11 @@ impl TryFrom> for StatKey { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(transparent)] -pub struct StatValue { +pub struct DatanodeStatValue { pub stats: Vec, } -impl StatValue { +impl DatanodeStatValue { /// Get the latest number of regions. 
pub fn region_num(&self) -> Option { self.stats.last().map(|x| x.region_num) @@ -206,10 +137,10 @@ impl StatValue { } } -impl TryFrom for Vec { +impl TryFrom for Vec { type Error = error::Error; - fn try_from(stats: StatValue) -> Result { + fn try_from(stats: DatanodeStatValue) -> Result { Ok(serde_json::to_string(&stats) .context(error::SerializeToJsonSnafu { input: format!("{stats:?}"), @@ -218,7 +149,7 @@ impl TryFrom for Vec { } } -impl FromStr for StatValue { +impl FromStr for DatanodeStatValue { type Err = error::Error; fn from_str(value: &str) -> Result { @@ -226,7 +157,7 @@ impl FromStr for StatValue { } } -impl TryFrom> for StatValue { +impl TryFrom> for DatanodeStatValue { type Error = error::Error; fn try_from(value: Vec) -> Result { @@ -310,13 +241,13 @@ mod tests { #[test] fn test_stat_key_round_trip() { - let key = StatKey { + let key = DatanodeStatKey { cluster_id: 0, node_id: 1, }; let key_bytes: Vec = key.into(); - let new_key: StatKey = key_bytes.try_into().unwrap(); + let new_key: DatanodeStatKey = key_bytes.try_into().unwrap(); assert_eq!(0, new_key.cluster_id); assert_eq!(1, new_key.node_id); @@ -331,10 +262,10 @@ mod tests { ..Default::default() }; - let stat_val = StatValue { stats: vec![stat] }; + let stat_val = DatanodeStatValue { stats: vec![stat] }; let bytes: Vec = stat_val.try_into().unwrap(); - let stat_val: StatValue = bytes.try_into().unwrap(); + let stat_val: DatanodeStatValue = bytes.try_into().unwrap(); let stats = stat_val.stats; assert_eq!(1, stats.len()); @@ -347,37 +278,24 @@ mod tests { #[test] fn test_lease_key_round_trip() { - let key = LeaseKey { + let key = DatanodeLeaseKey { cluster_id: 0, node_id: 1, }; let key_bytes: Vec = key.clone().try_into().unwrap(); - let new_key: LeaseKey = key_bytes.try_into().unwrap(); + let new_key: DatanodeLeaseKey = key_bytes.try_into().unwrap(); assert_eq!(new_key, key); } - #[test] - fn test_lease_value_round_trip() { - let value = LeaseValue { - timestamp_millis: 111, - node_addr: "127.0.0.1:3002".to_string(), - }; - - let value_bytes: Vec = value.clone().try_into().unwrap(); - let new_value: LeaseValue = value_bytes.try_into().unwrap(); - - assert_eq!(new_value, value); - } - #[test] fn test_get_addr_from_stat_val() { - let empty = StatValue { stats: vec![] }; + let empty = DatanodeStatValue { stats: vec![] }; let addr = empty.node_addr(); assert!(addr.is_none()); - let stat_val = StatValue { + let stat_val = DatanodeStatValue { stats: vec![ Stat { addr: "1".to_string(), @@ -399,11 +317,11 @@ mod tests { #[test] fn test_get_region_num_from_stat_val() { - let empty = StatValue { stats: vec![] }; + let empty = DatanodeStatValue { stats: vec![] }; let region_num = empty.region_num(); assert!(region_num.is_none()); - let wrong = StatValue { + let wrong = DatanodeStatValue { stats: vec![Stat { region_num: 0, ..Default::default() @@ -412,7 +330,7 @@ mod tests { let right = wrong.region_num(); assert_eq!(Some(0), right); - let stat_val = StatValue { + let stat_val = DatanodeStatValue { stats: vec![ Stat { region_num: 1, @@ -434,12 +352,12 @@ mod tests { #[test] fn test_lease_key_to_stat_key() { - let lease_key = LeaseKey { + let lease_key = DatanodeLeaseKey { cluster_id: 1, node_id: 101, }; - let stat_key: StatKey = (&lease_key).into(); + let stat_key: DatanodeStatKey = (&lease_key).into(); assert_eq!(1, stat_key.cluster_id); assert_eq!(101, stat_key.node_id); diff --git a/src/meta-srv/src/key/flownode.rs b/src/meta-srv/src/key/flownode.rs new file mode 100644 index 0000000000..acb36cbf75 --- /dev/null +++ 
b/src/meta-srv/src/key/flownode.rs @@ -0,0 +1,55 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::ClusterId; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; + +pub(crate) const FLOWNODE_LEASE_PREFIX: &str = "__meta_flownode_lease"; + +lazy_static! { + pub(crate) static ref FLOWNODE_LEASE_KEY_PATTERN: Regex = + Regex::new(&format!("^{FLOWNODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap(); +} + +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +pub struct FlownodeLeaseKey { + pub cluster_id: ClusterId, + pub node_id: u64, +} + +impl FlownodeLeaseKey { + pub fn prefix_key_by_cluster(cluster_id: ClusterId) -> Vec { + format!("{FLOWNODE_LEASE_PREFIX}-{cluster_id}-").into_bytes() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lease_key_round_trip() { + let key = FlownodeLeaseKey { + cluster_id: 0, + node_id: 1, + }; + + let key_bytes: Vec = key.clone().try_into().unwrap(); + let new_key: FlownodeLeaseKey = key_bytes.try_into().unwrap(); + + assert_eq!(new_key, key); + } +} diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index 0a091e4a68..70ebb10c90 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::collections::HashMap; +use std::hash::Hash; use common_meta::kv_backend::KvBackend; use common_meta::peer::Peer; @@ -20,12 +21,13 @@ use common_meta::{util, ClusterId}; use common_time::util as time_util; use crate::cluster::MetaPeerClientRef; -use crate::error::Result; -use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX}; +use crate::error::{Error, Result}; +use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue}; -fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseKey, &LeaseValue) -> bool { - move |_: &LeaseKey, v: &LeaseValue| { - ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000 +fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool { + move |v: &LeaseValue| { + ((time_util::current_time_millis() - v.timestamp_millis) as u64) + < lease_secs.checked_mul(1000).unwrap_or(u64::MAX) } } @@ -36,7 +38,7 @@ pub async fn lookup_alive_datanode_peer( lease_secs: u64, ) -> Result> { let lease_filter = build_lease_filter(lease_secs); - let lease_key = LeaseKey { + let lease_key = DatanodeLeaseKey { cluster_id, node_id: datanode_id, }; @@ -45,7 +47,7 @@ pub async fn lookup_alive_datanode_peer( return Ok(None); }; let lease_value: LeaseValue = kv.value.try_into()?; - if lease_filter(&lease_key, &lease_value) { + if lease_filter(&lease_value) { Ok(Some(Peer { id: lease_key.node_id, addr: lease_value.node_addr, @@ -59,23 +61,40 @@ pub async fn alive_datanodes( cluster_id: ClusterId, meta_peer_client: &MetaPeerClientRef, lease_secs: u64, -) -> Result> { - let lease_filter = build_lease_filter(lease_secs); - - filter_datanodes(cluster_id, meta_peer_client, lease_filter).await +) -> Result> { + let predicate = build_lease_filter(lease_secs); + filter( + DatanodeLeaseKey::prefix_key_by_cluster(cluster_id), + meta_peer_client, + |v| predicate(v), + ) + .await } -pub async fn filter_datanodes
<P>
( +pub async fn alive_flownodes( cluster_id: ClusterId, meta_peer_client: &MetaPeerClientRef, - predicate: P, -) -> Result> -where - P: Fn(&LeaseKey, &LeaseValue) -> bool, -{ - let key = get_lease_prefix(cluster_id); - let range_end = util::get_prefix_end_key(&key); + lease_secs: u64, +) -> Result> { + let predicate = build_lease_filter(lease_secs); + filter( + FlownodeLeaseKey::prefix_key_by_cluster(cluster_id), + meta_peer_client, + |v| predicate(v), + ) + .await +} +pub async fn filter( + key: Vec, + meta_peer_client: &MetaPeerClientRef, + predicate: P, +) -> Result> +where + P: Fn(&LeaseValue) -> bool, + K: Eq + Hash + TryFrom, Error = Error>, +{ + let range_end = util::get_prefix_end_key(&key); let range_req = common_meta::rpc::store::RangeRequest { key, range_end, @@ -85,9 +104,9 @@ where let kvs = meta_peer_client.range(range_req).await?.kvs; let mut lease_kvs = HashMap::new(); for kv in kvs { - let lease_key: LeaseKey = kv.key.try_into()?; + let lease_key: K = kv.key.try_into()?; let lease_value: LeaseValue = kv.value.try_into()?; - if !predicate(&lease_key, &lease_value) { + if !predicate(&lease_value) { continue; } let _ = lease_kvs.insert(lease_key, lease_value); @@ -95,8 +114,3 @@ where Ok(lease_kvs) } - -#[inline] -pub fn get_lease_prefix(cluster_id: u64) -> Vec { - format!("{DN_LEASE_PREFIX}-{cluster_id}").into_bytes() -} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 6515ade81e..2994b12403 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -25,7 +25,7 @@ pub mod election; pub mod error; mod failure_detector; pub mod handler; -pub mod keys; +pub mod key; pub mod lease; pub mod lock; pub mod metasrv; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 9e7032b6a8..853ac3e504 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -241,10 +241,17 @@ impl From for api::v1::meta::MetasrvNodeInfo { } } +#[derive(Clone, Copy)] +pub enum SelectTarget { + Datanode, + Flownode, +} + #[derive(Clone)] pub struct SelectorContext { pub server_addr: String, pub datanode_lease_secs: u64, + pub flownode_lease_secs: u64, pub kv_backend: KvBackendRef, pub meta_peer_client: MetaPeerClientRef, pub table_id: Option, @@ -314,7 +321,10 @@ pub struct Metasrv { kv_backend: KvBackendRef, leader_cached_kv_backend: Arc, meta_peer_client: MetaPeerClientRef, + // The selector is used to select a target datanode. selector: SelectorRef, + // The flow selector is used to select a target flownode. 
+ flow_selector: SelectorRef, handler_group: HeartbeatHandlerGroup, election: Option, lock: DistLockRef, @@ -503,6 +513,10 @@ impl Metasrv { &self.selector } + pub fn flow_selector(&self) -> &SelectorRef { + &self.flow_selector + } + pub fn handler_group(&self) -> &HeartbeatHandlerGroup { &self.handler_group } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ddf8908773..6f516751d5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -38,7 +38,7 @@ use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; -use super::FLOW_ID_SEQ; +use super::{SelectTarget, FLOW_ID_SEQ}; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::error::{self, Result}; @@ -50,7 +50,7 @@ use crate::handler::collect_cluster_info_handler::{ use crate::handler::collect_stats_handler::CollectStatsHandler; use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler; -use crate::handler::keep_lease_handler::KeepLeaseHandler; +use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler}; use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::on_leader_start_handler::OnLeaderStartHandler; use crate::handler::persist_stats_handler::PersistStatsHandler; @@ -68,6 +68,7 @@ use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; use crate::pubsub::PublisherRef; use crate::selector::lease_based::LeaseBasedSelector; +use crate::selector::round_robin::RoundRobinSelector; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::LeaderCachedKvBackend; use crate::state::State; @@ -212,6 +213,7 @@ impl MetasrvBuilder { let selector_ctx = SelectorContext { server_addr: options.server_addr.clone(), datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS, + flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS, kv_backend: kv_backend.clone(), meta_peer_client: meta_peer_client.clone(), table_id: None, @@ -334,7 +336,8 @@ impl MetasrvBuilder { // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, // because even if the current meta-server node is no longer the leader it can // still help the datanode to keep lease. - group.add_handler(KeepLeaseHandler).await; + group.add_handler(DatanodeKeepLeaseHandler).await; + group.add_handler(FlownodeKeepLeaseHandler).await; group.add_handler(CheckLeaderHandler).await; group.add_handler(OnLeaderStartHandler).await; group.add_handler(CollectStatsHandler).await; @@ -367,6 +370,8 @@ impl MetasrvBuilder { leader_cached_kv_backend, meta_peer_client: meta_peer_client.clone(), selector, + // TODO(jeremy): We do not allow configuring the flow selector. 
+ flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)), handler_group, election, lock, diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 9ee017ad15..8e0e1424a6 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -574,6 +574,7 @@ mod tests { }); let selector_ctx = SelectorContext { datanode_lease_secs: 10, + flownode_lease_secs: 10, server_addr: "127.0.0.1:3002".to_string(), kv_backend: kv_backend.clone(), meta_peer_client, diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index de9a293446..12dea5854b 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -23,7 +23,7 @@ use snafu::ResultExt; use table::metadata::TableId; use crate::error::{self, Result}; -use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue}; +use crate::key::{DatanodeLeaseKey, DatanodeStatKey, DatanodeStatValue, LeaseValue}; use crate::lease; use crate::metasrv::SelectorContext; use crate::selector::common::choose_peers; @@ -58,7 +58,7 @@ impl Default for LoadBasedSelector, RegionNumsBasedWe impl Selector for LoadBasedSelector where W: WeightedChoose, - C: WeightCompute>, + C: WeightCompute>, { type Context = SelectorContext; type Output = Vec; @@ -112,9 +112,9 @@ where } fn filter_out_expired_datanode( - mut stat_kvs: HashMap, - lease_kvs: &HashMap, -) -> HashMap { + mut stat_kvs: HashMap, + lease_kvs: &HashMap, +) -> HashMap { lease_kvs .iter() .filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into())) @@ -122,9 +122,9 @@ fn filter_out_expired_datanode( } fn filter_out_datanode_by_table( - stat_kvs: &HashMap, + stat_kvs: &HashMap, leader_peer_ids: &[u64], -) -> HashMap { +) -> HashMap { stat_kvs .iter() .filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id)) @@ -162,37 +162,37 @@ async fn get_leader_peer_ids( mod tests { use std::collections::HashMap; - use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue}; + use crate::key::{DatanodeLeaseKey, DatanodeStatKey, DatanodeStatValue, LeaseValue}; use crate::selector::load_based::filter_out_expired_datanode; #[test] fn test_filter_out_expired_datanode() { let mut stat_kvs = HashMap::new(); stat_kvs.insert( - StatKey { + DatanodeStatKey { cluster_id: 1, node_id: 0, }, - StatValue { stats: vec![] }, + DatanodeStatValue { stats: vec![] }, ); stat_kvs.insert( - StatKey { + DatanodeStatKey { cluster_id: 1, node_id: 1, }, - StatValue { stats: vec![] }, + DatanodeStatValue { stats: vec![] }, ); stat_kvs.insert( - StatKey { + DatanodeStatKey { cluster_id: 1, node_id: 2, }, - StatValue { stats: vec![] }, + DatanodeStatValue { stats: vec![] }, ); let mut lease_kvs = HashMap::new(); lease_kvs.insert( - LeaseKey { + DatanodeLeaseKey { cluster_id: 1, node_id: 1, }, @@ -205,7 +205,7 @@ mod tests { let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs); assert_eq!(1, alive_stat_kvs.len()); - assert!(alive_stat_kvs.contains_key(&StatKey { + assert!(alive_stat_kvs.contains_key(&DatanodeStatKey { cluster_id: 1, node_id: 1 })); diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs index 4355837fc7..dba8b556a3 100644 --- a/src/meta-srv/src/selector/round_robin.rs +++ b/src/meta-srv/src/selector/round_robin.rs @@ -19,7 +19,7 @@ use snafu::ensure; use crate::error::{NoEnoughAvailableDatanodeSnafu, Result}; use crate::lease; -use crate::metasrv::SelectorContext; +use 
crate::metasrv::{SelectTarget, SelectorContext}; use crate::selector::{Namespace, Selector, SelectorOptions}; /// Round-robin selector that returns the next peer in the list in sequence. @@ -29,11 +29,76 @@ use crate::selector::{Namespace, Selector, SelectorOptions}; /// all datanodes. But **it's not recommended** to use this selector in serious /// production environments because it doesn't take into account the load of /// each datanode. -#[derive(Default)] pub struct RoundRobinSelector { + select_target: SelectTarget, counter: AtomicUsize, } +impl Default for RoundRobinSelector { + fn default() -> Self { + Self { + select_target: SelectTarget::Datanode, + counter: AtomicUsize::new(0), + } + } +} + +impl RoundRobinSelector { + pub fn new(select_target: SelectTarget) -> Self { + Self { + select_target, + ..Default::default() + } + } + + async fn get_peers( + &self, + ns: Namespace, + min_required_items: usize, + ctx: &SelectorContext, + ) -> Result> { + let mut peers = match self.select_target { + SelectTarget::Datanode => { + // 1. get alive datanodes. + let lease_kvs = + lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs) + .await?; + + // 2. map into peers + lease_kvs + .into_iter() + .map(|(k, v)| Peer::new(k.node_id, v.node_addr)) + .collect::>() + } + SelectTarget::Flownode => { + // 1. get alive flownodes. + let lease_kvs = + lease::alive_flownodes(ns, &ctx.meta_peer_client, ctx.flownode_lease_secs) + .await?; + + // 2. map into peers + lease_kvs + .into_iter() + .map(|(k, v)| Peer::new(k.node_id, v.node_addr)) + .collect::>() + } + }; + + ensure!( + !peers.is_empty(), + NoEnoughAvailableDatanodeSnafu { + required: min_required_items, + available: 0usize, + } + ); + + // 3. sort by node id + peers.sort_by_key(|p| p.id); + + Ok(peers) + } +} + #[async_trait::async_trait] impl Selector for RoundRobinSelector { type Context = SelectorContext; @@ -45,25 +110,8 @@ impl Selector for RoundRobinSelector { ctx: &Self::Context, opts: SelectorOptions, ) -> Result> { - // 1. get alive datanodes. - let lease_kvs = - lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?; - - // 2. map into peers and sort on node id - let mut peers: Vec = lease_kvs - .into_iter() - .map(|(k, v)| Peer::new(k.node_id, v.node_addr)) - .collect(); - peers.sort_by_key(|p| p.id); - ensure!( - !peers.is_empty(), - NoEnoughAvailableDatanodeSnafu { - required: opts.min_required_items, - available: 0usize, - } - ); - - // 3. choose peers + let peers = self.get_peers(ns, opts.min_required_items, ctx).await?; + // choose peers let mut selected = Vec::with_capacity(opts.min_required_items); for _ in 0..opts.min_required_items { let idx = self diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index b1a75c6ad9..a87a1b3b7f 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use common_meta::peer::Peer; use itertools::{Itertools, MinMaxResult}; -use crate::keys::{StatKey, StatValue}; +use crate::key::{DatanodeStatKey, DatanodeStatValue}; use crate::selector::weighted_choose::WeightedItem; /// The [`WeightCompute`] trait is used to compute the weight array by heartbeats. 
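The `RoundRobinSelector` hunks above add `SelectTarget` and a `get_peers` helper, but the hunk that advances the counter is truncated. Below is a minimal, self-contained sketch of the rotation the selector relies on, assuming the usual `fetch_add` + modulo wrap over the id-sorted peer list; `Peer` and `pick_round_robin` are local stand-ins for illustration, not the crate's actual types or API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Local stand-in for `common_meta::peer::Peer`, kept here so the sketch compiles on its own.
#[derive(Debug, Clone)]
struct Peer {
    id: u64,
    addr: String,
}

/// Picks `n` peers by walking a shared counter over a list already sorted by node id,
/// mirroring (as an assumption) the `fetch_add` + modulo rotation of `RoundRobinSelector`.
fn pick_round_robin(counter: &AtomicUsize, peers: &[Peer], n: usize) -> Vec<Peer> {
    assert!(!peers.is_empty(), "caller must ensure at least one alive peer");
    (0..n)
        .map(|_| {
            let idx = counter.fetch_add(1, Ordering::Relaxed) % peers.len();
            peers[idx].clone()
        })
        .collect()
}

fn main() {
    let peers = vec![
        Peer { id: 1, addr: "10.0.0.1:4001".into() },
        Peer { id: 2, addr: "10.0.0.2:4001".into() },
    ];
    let counter = AtomicUsize::new(0);

    // Two consecutive single-peer selections alternate between the two nodes.
    assert_eq!(pick_round_robin(&counter, &peers, 1)[0].id, 1);
    assert_eq!(pick_round_robin(&counter, &peers, 1)[0].id, 2);
}
```

Because the counter lives on the selector and is only ever incremented, successive create-flow or create-table requests spread across all alive flownodes or datanodes, which is the behavior the doc comment on `RoundRobinSelector` describes.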
@@ -37,9 +37,12 @@ pub trait WeightCompute: Send + Sync { pub struct RegionNumsBasedWeightCompute; impl WeightCompute for RegionNumsBasedWeightCompute { - type Source = HashMap; + type Source = HashMap; - fn compute(&self, stat_kvs: &HashMap) -> Vec> { + fn compute( + &self, + stat_kvs: &HashMap, + ) -> Vec> { let mut region_nums = Vec::with_capacity(stat_kvs.len()); let mut peers = Vec::with_capacity(stat_kvs.len()); @@ -98,32 +101,32 @@ mod tests { use super::{RegionNumsBasedWeightCompute, WeightCompute}; use crate::handler::node_stat::{RegionStat, Stat}; - use crate::keys::{StatKey, StatValue}; + use crate::key::{DatanodeStatKey, DatanodeStatValue}; #[test] fn test_weight_compute() { - let mut stat_kvs: HashMap = HashMap::default(); - let stat_key = StatKey { + let mut stat_kvs: HashMap = HashMap::default(); + let stat_key = DatanodeStatKey { cluster_id: 1, node_id: 1, }; - let stat_val = StatValue { + let stat_val = DatanodeStatValue { stats: vec![mock_stat_1()], }; stat_kvs.insert(stat_key, stat_val); - let stat_key = StatKey { + let stat_key = DatanodeStatKey { cluster_id: 1, node_id: 2, }; - let stat_val = StatValue { + let stat_val = DatanodeStatValue { stats: vec![mock_stat_2()], }; stat_kvs.insert(stat_key, stat_val); - let stat_key = StatKey { + let stat_key = DatanodeStatKey { cluster_id: 1, node_id: 3, }; - let stat_val = StatValue { + let stat_val = DatanodeStatValue { stats: vec![mock_stat_3()], }; stat_kvs.insert(stat_key, stat_val); diff --git a/src/meta-srv/src/service/admin/heartbeat.rs b/src/meta-srv/src/service/admin/heartbeat.rs index 618d6ef6ef..aa4b7cc96d 100644 --- a/src/meta-srv/src/service/admin/heartbeat.rs +++ b/src/meta-srv/src/service/admin/heartbeat.rs @@ -20,7 +20,7 @@ use tonic::codegen::http; use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result}; -use crate::keys::StatValue; +use crate::key::DatanodeStatValue; use crate::service::admin::{util, HttpHandler}; #[derive(Clone)] @@ -46,7 +46,7 @@ impl HttpHandler for HeartBeatHandler { } let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?; - let mut stat_vals: Vec = stat_kvs.into_values().collect(); + let mut stat_vals: Vec = stat_kvs.into_values().collect(); if let Some(addr) = params.get("addr") { stat_vals = filter_by_addr(stat_vals, addr); @@ -63,7 +63,7 @@ impl HttpHandler for HeartBeatHandler { #[derive(Debug, Serialize, Deserialize)] #[serde(transparent)] pub struct StatValues { - pub stat_vals: Vec, + pub stat_vals: Vec, } impl TryFrom for String { @@ -76,7 +76,7 @@ impl TryFrom for String { } } -fn filter_by_addr(stat_vals: Vec, addr: &str) -> Vec { +fn filter_by_addr(stat_vals: Vec, addr: &str) -> Vec { stat_vals .into_iter() .filter(|stat_val| stat_val.stats.iter().any(|stat| stat.addr == addr)) @@ -86,12 +86,12 @@ fn filter_by_addr(stat_vals: Vec, addr: &str) -> Vec { #[cfg(test)] mod tests { use crate::handler::node_stat::Stat; - use crate::keys::StatValue; + use crate::key::DatanodeStatValue; use crate::service::admin::heartbeat::filter_by_addr; #[tokio::test] async fn test_filter_by_addr() { - let stat_value1 = StatValue { + let stat_value1 = DatanodeStatValue { stats: vec![ Stat { addr: "127.0.0.1:3001".to_string(), @@ -106,7 +106,7 @@ mod tests { ], }; - let stat_value2 = StatValue { + let stat_value2 = DatanodeStatValue { stats: vec![ Stat { addr: "127.0.0.1:3002".to_string(), diff --git a/src/meta-srv/src/service/admin/node_lease.rs b/src/meta-srv/src/service/admin/node_lease.rs index a0f60ced56..a3736d1818 100644 --- 
a/src/meta-srv/src/service/admin/node_lease.rs +++ b/src/meta-srv/src/service/admin/node_lease.rs @@ -20,7 +20,7 @@ use tonic::codegen::http; use crate::cluster::MetaPeerClientRef; use crate::error::{self, Result}; -use crate::keys::{LeaseKey, LeaseValue}; +use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::lease; use crate::service::admin::{util, HttpHandler}; @@ -38,8 +38,7 @@ impl HttpHandler for NodeLeaseHandler { ) -> Result> { let cluster_id = util::extract_cluster_id(params)?; - let leases = - lease::filter_datanodes(cluster_id, &self.meta_peer_client, |_, _| true).await?; + let leases = lease::alive_datanodes(cluster_id, &self.meta_peer_client, u64::MAX).await?; let leases = leases .into_iter() .map(|(k, v)| HumanLease { @@ -59,7 +58,7 @@ impl HttpHandler for NodeLeaseHandler { #[derive(Debug, Serialize, Deserialize)] pub struct HumanLease { - pub name: LeaseKey, + pub name: DatanodeLeaseKey, pub human_time: String, pub lease: LeaseValue, } diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 0c9ae03f1f..1553236406 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -34,7 +34,7 @@ use table::requests::TableOptions; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::handler::{HeartbeatMailbox, Pushers}; -use crate::keys::{LeaseKey, LeaseValue}; +use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::lock::memory::MemLock; use crate::metasrv::SelectorContext; use crate::procedure::region_failover::RegionFailoverManager; @@ -71,6 +71,7 @@ pub(crate) fn create_selector_context() -> SelectorContext { SelectorContext { datanode_lease_secs: 10, + flownode_lease_secs: 10, server_addr: "127.0.0.1:3002".to_string(), kv_backend: in_memory, meta_peer_client, @@ -175,7 +176,7 @@ pub(crate) async fn put_datanodes( ) { let backend = meta_peer_client.memory_backend(); for datanode in datanodes { - let lease_key = LeaseKey { + let lease_key = DatanodeLeaseKey { cluster_id, node_id: datanode.id, }; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 7c0bb2f1d0..c524e05523 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -300,7 +300,7 @@ impl GreptimeDbClusterBuilder { ) { for _ in 0..10 { let alive_datanodes = - meta_srv::lease::filter_datanodes(1000, meta_peer_client, |_, _| true) + meta_srv::lease::alive_datanodes(1000, meta_peer_client, u64::MAX) .await .unwrap() .len(); diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index f2430a0d48..103b9481f8 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -357,6 +357,7 @@ async fn run_region_failover_procedure( selector, selector_ctx: SelectorContext { datanode_lease_secs: distributed_time_constants::REGION_LEASE_SECS, + flownode_lease_secs: distributed_time_constants::REGION_LEASE_SECS, server_addr: metasrv.options().server_addr.clone(), kv_backend: metasrv.kv_backend().clone(), meta_peer_client: metasrv.meta_peer_client().clone(),
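The `flownode_lease_secs` field threaded through `SelectorContext` in the hunks above feeds the shared lease predicate in `src/meta-srv/src/lease.rs`. The sketch below is a simplified, standalone version of that predicate: the real `build_lease_filter` takes a `&LeaseValue` and calls `common_time::util::current_time_millis()`, which are replaced here by plain `i64` millisecond arguments so the example runs on its own. The `checked_mul` guard (taken verbatim from the patch) is what lets callers such as the admin `NodeLeaseHandler` and the integration tests pass `u64::MAX` to mean "no expiry" now that `filter_datanodes` with an always-true closure is gone.

```rust
/// Simplified form of the lease predicate in `src/meta-srv/src/lease.rs`: a node counts as
/// alive when its last heartbeat is younger than `lease_secs`.
fn build_lease_filter(lease_secs: u64) -> impl Fn(i64, i64) -> bool {
    move |now_millis: i64, last_heartbeat_millis: i64| {
        ((now_millis - last_heartbeat_millis) as u64)
            // `checked_mul` keeps `u64::MAX` from overflowing into a tiny lease window.
            < lease_secs.checked_mul(1000).unwrap_or(u64::MAX)
    }
}

fn main() {
    let now = 1_700_000_000_000_i64;

    // A node that reported 5s ago is alive under a 10s lease window...
    assert!(build_lease_filter(10)(now, now - 5_000));
    // ...but not under a 3s window.
    assert!(!build_lease_filter(3)(now, now - 5_000));
    // `u64::MAX` seconds accepts every recorded heartbeat, matching how
    // `alive_datanodes(cluster_id, client, u64::MAX)` now lists all lease keys.
    assert!(build_lease_filter(u64::MAX)(now, now - 999_999_999));
}
```

Since the same predicate is used for both `alive_datanodes` and `alive_flownodes`, dropping the unused `&LeaseKey` parameter lets the generic `filter` function deserialize either `DatanodeLeaseKey` or `FlownodeLeaseKey` without duplicating the liveness check.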