feat: register flow node (#4166)

* feat: rename keys.rs to key.rs

* feat: refactor datanode keys

* feat: add flownode key

* feat: keep flownode's lease info in metasrv

* feat: flow selector

* feat: impl_try_from_lease_key and impl_from_str_lease_key to simple code
This commit is contained in:
Jeremyhi
2024-06-20 11:46:19 +08:00
committed by GitHub
parent 48a0f39b19
commit 4c3d4af127
23 changed files with 523 additions and 275 deletions

View File

@@ -30,6 +30,8 @@ pub const REGION_LEASE_SECS: u64 =
/// If the node's lease has expired, the `Selector` will not select it.
pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 3;

View File

@@ -37,7 +37,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
use crate::error::{match_for_io_error, Result};
use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX};
use crate::key::{DatanodeStatKey, DatanodeStatValue};
use crate::metasrv::ElectionRef;
pub type MetaPeerClientRef = Arc<MetaPeerClient>;
@@ -154,13 +154,6 @@ impl KvBackend for MetaPeerClient {
.fail()
}
async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
error::UnsupportedSnafu {
operation: "compare and put".to_string(),
}
.fail()
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
error::UnsupportedSnafu {
operation: "delete range".to_string(),
@@ -175,6 +168,13 @@ impl KvBackend for MetaPeerClient {
.fail()
}
async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
error::UnsupportedSnafu {
operation: "compare and put".to_string(),
}
.fail()
}
async fn put_conditionally(
&self,
_key: Vec<u8>,
@@ -197,7 +197,7 @@ impl KvBackend for MetaPeerClient {
impl MetaPeerClient {
async fn get_dn_key_value(&self, keys_only: bool) -> Result<Vec<KeyValue>> {
let key = format!("{DN_STAT_PREFIX}-").into_bytes();
let key = DatanodeStatKey::prefix_key();
let range_end = util::get_prefix_end_key(&key);
let range_request = RangeRequest {
key,
@@ -209,7 +209,7 @@ impl MetaPeerClient {
}
// Get all datanode stat kvs from leader meta.
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
let kvs = self.get_dn_key_value(false).await?;
to_stat_kv_map(kvs)
}
@@ -218,12 +218,15 @@ impl MetaPeerClient {
let kvs = self.get_dn_key_value(true).await?;
kvs.into_iter()
.map(|kv| kv.key.try_into())
.collect::<Result<HashSet<StatKey>>>()
.collect::<Result<HashSet<DatanodeStatKey>>>()
.map(|hash_set| hash_set.len() as i32)
}
// Get datanode stat kvs from leader meta by input keys.
pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
pub async fn get_dn_stat_kvs(
&self,
keys: Vec<DatanodeStatKey>,
) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
let stat_keys = keys.into_iter().map(|key| key.into()).collect();
let batch_get_req = BatchGetRequest { keys: stat_keys };
@@ -313,7 +316,7 @@ impl MetaPeerClient {
}
}
fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<StatKey, StatValue>> {
fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<DatanodeStatKey, DatanodeStatValue>> {
let mut map = HashMap::with_capacity(kvs.len());
for kv in kvs {
let _ = map.insert(kv.key.try_into()?, kv.value.try_into()?);
@@ -358,11 +361,11 @@ mod tests {
use super::{check_resp_header, to_stat_kv_map, Context};
use crate::error;
use crate::handler::node_stat::Stat;
use crate::keys::{StatKey, StatValue};
use crate::key::{DatanodeStatKey, DatanodeStatValue};
#[test]
fn test_to_stat_kv_map() {
let stat_key = StatKey {
let stat_key = DatanodeStatKey {
cluster_id: 0,
node_id: 100,
};
@@ -373,7 +376,7 @@ mod tests {
addr: "127.0.0.1:3001".to_string(),
..Default::default()
};
let stat_val = StatValue { stats: vec![stat] }.try_into().unwrap();
let stat_val = DatanodeStatValue { stats: vec![stat] }.try_into().unwrap();
let kv = KeyValue {
key: stat_key.into(),

View File

@@ -274,7 +274,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid datanode lease key: {}", key))]
#[snafu(display("Invalid lease key: {}", key))]
InvalidLeaseKey {
key: String,
#[snafu(implicit)]
@@ -295,7 +295,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse datanode lease key from utf8"))]
#[snafu(display("Failed to parse lease key from utf8"))]
LeaseKeyFromUtf8 {
#[snafu(source)]
error: std::string::FromUtf8Error,
@@ -303,7 +303,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse datanode lease value from utf8"))]
#[snafu(display("Failed to parse lease value from utf8"))]
LeaseValueFromUtf8 {
#[snafu(source)]
error: std::string::FromUtf8Error,
@@ -311,7 +311,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse datanode stat key from utf8"))]
#[snafu(display("Failed to parse stat key from utf8"))]
StatKeyFromUtf8 {
#[snafu(source)]
error: std::string::FromUtf8Error,
@@ -319,7 +319,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse datanode stat value from utf8"))]
#[snafu(display("Failed to parse stat value from utf8"))]
StatValueFromUtf8 {
#[snafu(source)]
error: std::string::FromUtf8Error,

View File

@@ -55,7 +55,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
start_time_ms: info.start_time_ms,
};
save_to_mem_store(key, value, ctx).await?;
put_into_memory_store(ctx, key, value).await?;
Ok(HandleControl::Continue)
}
@@ -88,7 +88,7 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
start_time_ms: info.start_time_ms,
};
save_to_mem_store(key, value, ctx).await?;
put_into_memory_store(ctx, key, value).await?;
Ok(HandleControl::Continue)
}
@@ -138,7 +138,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
start_time_ms: info.start_time_ms,
};
save_to_mem_store(key, value, ctx).await?;
put_into_memory_store(ctx, key, value).await?;
Ok(HandleControl::Continue)
}
@@ -176,7 +176,7 @@ fn extract_base_info(
))
}
async fn save_to_mem_store(key: NodeInfoKey, value: NodeInfo, ctx: &mut Context) -> Result<()> {
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
let key = key.into();
let value = value.try_into().context(InvalidClusterInfoFormatSnafu)?;
let put_req = PutRequest {

View File

@@ -12,21 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Role};
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use common_meta::rpc::store::PutRequest;
use common_telemetry::{trace, warn};
use common_time::util as time_util;
use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::keys::{LeaseKey, LeaseValue};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
use crate::metasrv::Context;
/// Keeps [Datanode] leases
pub struct KeepLeaseHandler;
pub struct DatanodeKeepLeaseHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for KeepLeaseHandler {
impl HeartbeatHandler for DatanodeKeepLeaseHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}
@@ -45,7 +45,7 @@ impl HeartbeatHandler for KeepLeaseHandler {
return Ok(HandleControl::Continue);
};
let key = LeaseKey {
let key = DatanodeLeaseKey {
cluster_id: header.cluster_id,
node_id: peer.id,
};
@@ -54,22 +54,68 @@ impl HeartbeatHandler for KeepLeaseHandler {
node_addr: peer.addr.clone(),
};
trace!("Receive a heartbeat: {key:?}, {value:?}");
trace!("Receive a heartbeat from datanode: {key:?}, {value:?}");
let key = key.try_into()?;
let value = value.try_into()?;
let put_req = PutRequest {
key,
value,
..Default::default()
};
let res = ctx.in_memory.put(put_req).await;
if let Err(err) = res {
warn!(err; "Failed to update lease KV, peer: {peer:?}");
}
put_into_memory_store(ctx, key, value, peer).await;
Ok(HandleControl::Continue)
}
}
/// Keeps [Flownode] leases
pub struct FlownodeKeepLeaseHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for FlownodeKeepLeaseHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Flownode
}
async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let HeartbeatRequest { header, peer, .. } = req;
let Some(header) = &header else {
return Ok(HandleControl::Continue);
};
let Some(peer) = &peer else {
return Ok(HandleControl::Continue);
};
let key = FlownodeLeaseKey {
cluster_id: header.cluster_id,
node_id: peer.id,
};
let value = LeaseValue {
timestamp_millis: time_util::current_time_millis(),
node_addr: peer.addr.clone(),
};
trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");
let key = key.try_into()?;
let value = value.try_into()?;
put_into_memory_store(ctx, key, value, peer).await;
Ok(HandleControl::Continue)
}
}
async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>, peer: &Peer) {
let put_req = PutRequest {
key,
value,
..Default::default()
};
let res = ctx.in_memory.put(put_req).await;
if let Err(err) = res {
warn!(err; "Failed to update lease KV, peer: {peer:?}");
}
}

View File

@@ -22,7 +22,7 @@ use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use crate::error::{Error, InvalidHeartbeatRequestSnafu};
use crate::keys::StatKey;
use crate::key::DatanodeStatKey;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
@@ -65,8 +65,8 @@ impl Stat {
self.region_stats.is_empty()
}
pub fn stat_key(&self) -> StatKey {
StatKey {
pub fn stat_key(&self) -> DatanodeStatKey {
DatanodeStatKey {
cluster_id: self.cluster_id,
node_id: self.id,
}

View File

@@ -23,7 +23,7 @@ use snafu::ResultExt;
use crate::error::{self, Result};
use crate::handler::node_stat::Stat;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::keys::{StatKey, StatValue};
use crate::key::{DatanodeStatKey, DatanodeStatValue};
use crate::metasrv::Context;
const MAX_CACHED_STATS_PER_KEY: usize = 10;
@@ -68,7 +68,7 @@ impl EpochStats {
#[derive(Default)]
pub struct PersistStatsHandler {
stats_cache: DashMap<StatKey, EpochStats>,
stats_cache: DashMap<DatanodeStatKey, EpochStats>,
}
#[async_trait::async_trait]
@@ -121,7 +121,7 @@ impl HeartbeatHandler for PersistStatsHandler {
return Ok(HandleControl::Continue);
}
let value: Vec<u8> = StatValue {
let value: Vec<u8> = DatanodeStatValue {
stats: epoch_stats.drain_all(),
}
.try_into()?;
@@ -152,7 +152,7 @@ mod tests {
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::keys::StatKey;
use crate::key::DatanodeStatKey;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
#[tokio::test]
@@ -186,17 +186,17 @@ mod tests {
let handler = PersistStatsHandler::default();
handle_request_many_times(ctx.clone(), &handler, 1).await;
let key = StatKey {
let key = DatanodeStatKey {
cluster_id: 3,
node_id: 101,
};
let key: Vec<u8> = key.into();
let res = ctx.in_memory.get(&key).await.unwrap();
let kv = res.unwrap();
let key: StatKey = kv.key.clone().try_into().unwrap();
let key: DatanodeStatKey = kv.key.clone().try_into().unwrap();
assert_eq!(3, key.cluster_id);
assert_eq!(101, key.node_id);
let val: StatValue = kv.value.try_into().unwrap();
let val: DatanodeStatValue = kv.value.try_into().unwrap();
// first new stat must be set in kv store immediately
assert_eq!(1, val.stats.len());
assert_eq!(1, val.stats[0].region_num);
@@ -206,7 +206,7 @@ mod tests {
let key: Vec<u8> = key.into();
let res = ctx.in_memory.get(&key).await.unwrap();
let kv = res.unwrap();
let val: StatValue = kv.value.try_into().unwrap();
let val: DatanodeStatValue = kv.value.try_into().unwrap();
// refresh every 10 stats
assert_eq!(10, val.stats.len());
}

138
src/meta-srv/src/key.rs Normal file
View File

@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod datanode;
mod flownode;
use std::str::FromStr;
pub use datanode::*;
pub use flownode::*;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
macro_rules! impl_from_str_lease_key {
($key_type:ty, $pattern:expr) => {
impl FromStr for $key_type {
type Err = error::Error;
fn from_str(key: &str) -> error::Result<Self> {
let caps = $pattern
.captures(key)
.context(error::InvalidLeaseKeySnafu { key })?;
ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key });
let cluster_id = caps[1].to_string();
let node_id = caps[2].to_string();
let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid cluster_id: {cluster_id}"),
})?;
let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid node_id: {node_id}"),
})?;
Ok(Self {
cluster_id,
node_id,
})
}
}
};
}
impl_from_str_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_KEY_PATTERN);
impl_from_str_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_KEY_PATTERN);
macro_rules! impl_try_from_lease_key {
($key_type:ty, $prefix:expr) => {
impl TryFrom<Vec<u8>> for $key_type {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> error::Result<Self> {
String::from_utf8(bytes)
.context(error::LeaseKeyFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
impl TryFrom<$key_type> for Vec<u8> {
type Error = error::Error;
fn try_from(key: $key_type) -> error::Result<Self> {
Ok(format!("{}-{}-{}", $prefix, key.cluster_id, key.node_id).into_bytes())
}
}
};
}
impl_try_from_lease_key!(FlownodeLeaseKey, FLOWNODE_LEASE_PREFIX);
impl_try_from_lease_key!(DatanodeLeaseKey, DATANODE_LEASE_PREFIX);
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct LeaseValue {
// last activity
pub timestamp_millis: i64,
pub node_addr: String,
}
impl FromStr for LeaseValue {
type Err = error::Error;
fn from_str(value: &str) -> crate::Result<Self> {
serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
}
}
impl TryFrom<Vec<u8>> for LeaseValue {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> crate::Result<Self> {
String::from_utf8(bytes)
.context(error::LeaseValueFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
impl TryFrom<LeaseValue> for Vec<u8> {
type Error = error::Error;
fn try_from(value: LeaseValue) -> crate::Result<Self> {
Ok(serde_json::to_string(&value)
.context(error::SerializeToJsonSnafu {
input: format!("{value:?}"),
})?
.into_bytes())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lease_value_round_trip() {
let value = LeaseValue {
timestamp_millis: 111,
node_addr: "127.0.0.1:3002".to_string(),
};
let value_bytes: Vec<u8> = value.clone().try_into().unwrap();
let new_value: LeaseValue = value_bytes.try_into().unwrap();
assert_eq!(new_value, value);
}
}

View File

@@ -24,16 +24,16 @@ use crate::error;
use crate::error::Result;
use crate::handler::node_stat::Stat;
pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
pub(crate) const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
pub const DN_STAT_PREFIX: &str = "__meta_dnstat";
const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
lazy_static! {
static ref DATANODE_LEASE_KEY_PATTERN: Regex =
Regex::new(&format!("^{DN_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
static ref DATANODE_STAT_KEY_PATTERN: Regex =
Regex::new(&format!("^{DN_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
"^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
))
@@ -41,118 +41,49 @@ lazy_static! {
}
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct LeaseKey {
pub struct DatanodeLeaseKey {
pub cluster_id: ClusterId,
pub node_id: u64,
}
impl FromStr for LeaseKey {
type Err = error::Error;
fn from_str(key: &str) -> Result<Self> {
let caps = DATANODE_LEASE_KEY_PATTERN
.captures(key)
.context(error::InvalidLeaseKeySnafu { key })?;
ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key });
let cluster_id = caps[1].to_string();
let node_id = caps[2].to_string();
let cluster_id: u64 = cluster_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid cluster_id: {cluster_id}"),
})?;
let node_id: u64 = node_id.parse().context(error::ParseNumSnafu {
err_msg: format!("invalid node_id: {node_id}"),
})?;
Ok(Self {
cluster_id,
node_id,
})
}
}
impl TryFrom<Vec<u8>> for LeaseKey {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(error::LeaseKeyFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
impl TryFrom<LeaseKey> for Vec<u8> {
type Error = error::Error;
fn try_from(dn_key: LeaseKey) -> Result<Self> {
Ok(format!(
"{}-{}-{}",
DN_LEASE_PREFIX, dn_key.cluster_id, dn_key.node_id
)
.into_bytes())
}
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct LeaseValue {
// last activity
pub timestamp_millis: i64,
pub node_addr: String,
}
impl FromStr for LeaseValue {
type Err = error::Error;
fn from_str(value: &str) -> Result<Self> {
serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value })
}
}
impl TryFrom<Vec<u8>> for LeaseValue {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
String::from_utf8(bytes)
.context(error::LeaseValueFromUtf8Snafu {})
.map(|x| x.parse())?
}
}
impl TryFrom<LeaseValue> for Vec<u8> {
type Error = error::Error;
fn try_from(dn_value: LeaseValue) -> Result<Self> {
Ok(serde_json::to_string(&dn_value)
.context(error::SerializeToJsonSnafu {
input: format!("{dn_value:?}"),
})?
.into_bytes())
impl DatanodeLeaseKey {
pub fn prefix_key_by_cluster(cluster_id: ClusterId) -> Vec<u8> {
format!("{DATANODE_LEASE_PREFIX}-{cluster_id}-").into_bytes()
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct StatKey {
pub struct DatanodeStatKey {
pub cluster_id: ClusterId,
pub node_id: u64,
}
impl From<&LeaseKey> for StatKey {
fn from(lease_key: &LeaseKey) -> Self {
StatKey {
impl DatanodeStatKey {
pub fn prefix_key() -> Vec<u8> {
format!("{DATANODE_STAT_PREFIX}-").into_bytes()
}
}
impl From<&DatanodeLeaseKey> for DatanodeStatKey {
fn from(lease_key: &DatanodeLeaseKey) -> Self {
DatanodeStatKey {
cluster_id: lease_key.cluster_id,
node_id: lease_key.node_id,
}
}
}
impl From<StatKey> for Vec<u8> {
fn from(value: StatKey) -> Self {
format!("{}-{}-{}", DN_STAT_PREFIX, value.cluster_id, value.node_id).into_bytes()
impl From<DatanodeStatKey> for Vec<u8> {
fn from(value: DatanodeStatKey) -> Self {
format!(
"{}-{}-{}",
DATANODE_STAT_PREFIX, value.cluster_id, value.node_id
)
.into_bytes()
}
}
impl FromStr for StatKey {
impl FromStr for DatanodeStatKey {
type Err = error::Error;
fn from_str(key: &str) -> Result<Self> {
@@ -178,7 +109,7 @@ impl FromStr for StatKey {
}
}
impl TryFrom<Vec<u8>> for StatKey {
impl TryFrom<Vec<u8>> for DatanodeStatKey {
type Error = error::Error;
fn try_from(bytes: Vec<u8>) -> Result<Self> {
@@ -190,11 +121,11 @@ impl TryFrom<Vec<u8>> for StatKey {
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct StatValue {
pub struct DatanodeStatValue {
pub stats: Vec<Stat>,
}
impl StatValue {
impl DatanodeStatValue {
/// Get the latest number of regions.
pub fn region_num(&self) -> Option<u64> {
self.stats.last().map(|x| x.region_num)
@@ -206,10 +137,10 @@ impl StatValue {
}
}
impl TryFrom<StatValue> for Vec<u8> {
impl TryFrom<DatanodeStatValue> for Vec<u8> {
type Error = error::Error;
fn try_from(stats: StatValue) -> Result<Self> {
fn try_from(stats: DatanodeStatValue) -> Result<Self> {
Ok(serde_json::to_string(&stats)
.context(error::SerializeToJsonSnafu {
input: format!("{stats:?}"),
@@ -218,7 +149,7 @@ impl TryFrom<StatValue> for Vec<u8> {
}
}
impl FromStr for StatValue {
impl FromStr for DatanodeStatValue {
type Err = error::Error;
fn from_str(value: &str) -> Result<Self> {
@@ -226,7 +157,7 @@ impl FromStr for StatValue {
}
}
impl TryFrom<Vec<u8>> for StatValue {
impl TryFrom<Vec<u8>> for DatanodeStatValue {
type Error = error::Error;
fn try_from(value: Vec<u8>) -> Result<Self> {
@@ -310,13 +241,13 @@ mod tests {
#[test]
fn test_stat_key_round_trip() {
let key = StatKey {
let key = DatanodeStatKey {
cluster_id: 0,
node_id: 1,
};
let key_bytes: Vec<u8> = key.into();
let new_key: StatKey = key_bytes.try_into().unwrap();
let new_key: DatanodeStatKey = key_bytes.try_into().unwrap();
assert_eq!(0, new_key.cluster_id);
assert_eq!(1, new_key.node_id);
@@ -331,10 +262,10 @@ mod tests {
..Default::default()
};
let stat_val = StatValue { stats: vec![stat] };
let stat_val = DatanodeStatValue { stats: vec![stat] };
let bytes: Vec<u8> = stat_val.try_into().unwrap();
let stat_val: StatValue = bytes.try_into().unwrap();
let stat_val: DatanodeStatValue = bytes.try_into().unwrap();
let stats = stat_val.stats;
assert_eq!(1, stats.len());
@@ -347,37 +278,24 @@ mod tests {
#[test]
fn test_lease_key_round_trip() {
let key = LeaseKey {
let key = DatanodeLeaseKey {
cluster_id: 0,
node_id: 1,
};
let key_bytes: Vec<u8> = key.clone().try_into().unwrap();
let new_key: LeaseKey = key_bytes.try_into().unwrap();
let new_key: DatanodeLeaseKey = key_bytes.try_into().unwrap();
assert_eq!(new_key, key);
}
#[test]
fn test_lease_value_round_trip() {
let value = LeaseValue {
timestamp_millis: 111,
node_addr: "127.0.0.1:3002".to_string(),
};
let value_bytes: Vec<u8> = value.clone().try_into().unwrap();
let new_value: LeaseValue = value_bytes.try_into().unwrap();
assert_eq!(new_value, value);
}
#[test]
fn test_get_addr_from_stat_val() {
let empty = StatValue { stats: vec![] };
let empty = DatanodeStatValue { stats: vec![] };
let addr = empty.node_addr();
assert!(addr.is_none());
let stat_val = StatValue {
let stat_val = DatanodeStatValue {
stats: vec![
Stat {
addr: "1".to_string(),
@@ -399,11 +317,11 @@ mod tests {
#[test]
fn test_get_region_num_from_stat_val() {
let empty = StatValue { stats: vec![] };
let empty = DatanodeStatValue { stats: vec![] };
let region_num = empty.region_num();
assert!(region_num.is_none());
let wrong = StatValue {
let wrong = DatanodeStatValue {
stats: vec![Stat {
region_num: 0,
..Default::default()
@@ -412,7 +330,7 @@ mod tests {
let right = wrong.region_num();
assert_eq!(Some(0), right);
let stat_val = StatValue {
let stat_val = DatanodeStatValue {
stats: vec![
Stat {
region_num: 1,
@@ -434,12 +352,12 @@ mod tests {
#[test]
fn test_lease_key_to_stat_key() {
let lease_key = LeaseKey {
let lease_key = DatanodeLeaseKey {
cluster_id: 1,
node_id: 101,
};
let stat_key: StatKey = (&lease_key).into();
let stat_key: DatanodeStatKey = (&lease_key).into();
assert_eq!(1, stat_key.cluster_id);
assert_eq!(101, stat_key.node_id);

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::ClusterId;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
pub(crate) const FLOWNODE_LEASE_PREFIX: &str = "__meta_flownode_lease";
lazy_static! {
pub(crate) static ref FLOWNODE_LEASE_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOWNODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
}
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct FlownodeLeaseKey {
pub cluster_id: ClusterId,
pub node_id: u64,
}
impl FlownodeLeaseKey {
pub fn prefix_key_by_cluster(cluster_id: ClusterId) -> Vec<u8> {
format!("{FLOWNODE_LEASE_PREFIX}-{cluster_id}-").into_bytes()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_lease_key_round_trip() {
let key = FlownodeLeaseKey {
cluster_id: 0,
node_id: 1,
};
let key_bytes: Vec<u8> = key.clone().try_into().unwrap();
let new_key: FlownodeLeaseKey = key_bytes.try_into().unwrap();
assert_eq!(new_key, key);
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::HashMap;
use std::hash::Hash;
use common_meta::kv_backend::KvBackend;
use common_meta::peer::Peer;
@@ -20,12 +21,13 @@ use common_meta::{util, ClusterId};
use common_time::util as time_util;
use crate::cluster::MetaPeerClientRef;
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};
use crate::error::{Error, Result};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseKey, &LeaseValue) -> bool {
move |_: &LeaseKey, v: &LeaseValue| {
((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
move |v: &LeaseValue| {
((time_util::current_time_millis() - v.timestamp_millis) as u64)
< lease_secs.checked_mul(1000).unwrap_or(u64::MAX)
}
}
@@ -36,7 +38,7 @@ pub async fn lookup_alive_datanode_peer(
lease_secs: u64,
) -> Result<Option<Peer>> {
let lease_filter = build_lease_filter(lease_secs);
let lease_key = LeaseKey {
let lease_key = DatanodeLeaseKey {
cluster_id,
node_id: datanode_id,
};
@@ -45,7 +47,7 @@ pub async fn lookup_alive_datanode_peer(
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
if lease_filter(&lease_key, &lease_value) {
if lease_filter(&lease_value) {
Ok(Some(Peer {
id: lease_key.node_id,
addr: lease_value.node_addr,
@@ -59,23 +61,40 @@ pub async fn alive_datanodes(
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
lease_secs: u64,
) -> Result<HashMap<LeaseKey, LeaseValue>> {
let lease_filter = build_lease_filter(lease_secs);
filter_datanodes(cluster_id, meta_peer_client, lease_filter).await
) -> Result<HashMap<DatanodeLeaseKey, LeaseValue>> {
let predicate = build_lease_filter(lease_secs);
filter(
DatanodeLeaseKey::prefix_key_by_cluster(cluster_id),
meta_peer_client,
|v| predicate(v),
)
.await
}
pub async fn filter_datanodes<P>(
pub async fn alive_flownodes(
cluster_id: ClusterId,
meta_peer_client: &MetaPeerClientRef,
predicate: P,
) -> Result<HashMap<LeaseKey, LeaseValue>>
where
P: Fn(&LeaseKey, &LeaseValue) -> bool,
{
let key = get_lease_prefix(cluster_id);
let range_end = util::get_prefix_end_key(&key);
lease_secs: u64,
) -> Result<HashMap<FlownodeLeaseKey, LeaseValue>> {
let predicate = build_lease_filter(lease_secs);
filter(
FlownodeLeaseKey::prefix_key_by_cluster(cluster_id),
meta_peer_client,
|v| predicate(v),
)
.await
}
pub async fn filter<P, K>(
key: Vec<u8>,
meta_peer_client: &MetaPeerClientRef,
predicate: P,
) -> Result<HashMap<K, LeaseValue>>
where
P: Fn(&LeaseValue) -> bool,
K: Eq + Hash + TryFrom<Vec<u8>, Error = Error>,
{
let range_end = util::get_prefix_end_key(&key);
let range_req = common_meta::rpc::store::RangeRequest {
key,
range_end,
@@ -85,9 +104,9 @@ where
let kvs = meta_peer_client.range(range_req).await?.kvs;
let mut lease_kvs = HashMap::new();
for kv in kvs {
let lease_key: LeaseKey = kv.key.try_into()?;
let lease_key: K = kv.key.try_into()?;
let lease_value: LeaseValue = kv.value.try_into()?;
if !predicate(&lease_key, &lease_value) {
if !predicate(&lease_value) {
continue;
}
let _ = lease_kvs.insert(lease_key, lease_value);
@@ -95,8 +114,3 @@ where
Ok(lease_kvs)
}
#[inline]
pub fn get_lease_prefix(cluster_id: u64) -> Vec<u8> {
format!("{DN_LEASE_PREFIX}-{cluster_id}").into_bytes()
}

View File

@@ -25,7 +25,7 @@ pub mod election;
pub mod error;
mod failure_detector;
pub mod handler;
pub mod keys;
pub mod key;
pub mod lease;
pub mod lock;
pub mod metasrv;

View File

@@ -241,10 +241,17 @@ impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
}
}
#[derive(Clone, Copy)]
pub enum SelectTarget {
Datanode,
Flownode,
}
#[derive(Clone)]
pub struct SelectorContext {
pub server_addr: String,
pub datanode_lease_secs: u64,
pub flownode_lease_secs: u64,
pub kv_backend: KvBackendRef,
pub meta_peer_client: MetaPeerClientRef,
pub table_id: Option<TableId>,
@@ -314,7 +321,10 @@ pub struct Metasrv {
kv_backend: KvBackendRef,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
meta_peer_client: MetaPeerClientRef,
// The selector is used to select a target datanode.
selector: SelectorRef,
// The flow selector is used to select a target flownode.
flow_selector: SelectorRef,
handler_group: HeartbeatHandlerGroup,
election: Option<ElectionRef>,
lock: DistLockRef,
@@ -503,6 +513,10 @@ impl Metasrv {
&self.selector
}
pub fn flow_selector(&self) -> &SelectorRef {
&self.flow_selector
}
pub fn handler_group(&self) -> &HeartbeatHandlerGroup {
&self.handler_group
}

View File

@@ -38,7 +38,7 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
use super::FLOW_ID_SEQ;
use super::{SelectTarget, FLOW_ID_SEQ};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
@@ -50,7 +50,7 @@ use crate::handler::collect_cluster_info_handler::{
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
use crate::handler::keep_lease_handler::KeepLeaseHandler;
use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
use crate::handler::persist_stats_handler::PersistStatsHandler;
@@ -68,6 +68,7 @@ use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::pubsub::PublisherRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::State;
@@ -212,6 +213,7 @@ impl MetasrvBuilder {
let selector_ctx = SelectorContext {
server_addr: options.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
table_id: None,
@@ -334,7 +336,8 @@ impl MetasrvBuilder {
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
// because even if the current meta-server node is no longer the leader it can
// still help the datanode to keep lease.
group.add_handler(KeepLeaseHandler).await;
group.add_handler(DatanodeKeepLeaseHandler).await;
group.add_handler(FlownodeKeepLeaseHandler).await;
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(CollectStatsHandler).await;
@@ -367,6 +370,8 @@ impl MetasrvBuilder {
leader_cached_kv_backend,
meta_peer_client: meta_peer_client.clone(),
selector,
// TODO(jeremy): We do not allow configuring the flow selector.
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
handler_group,
election,
lock,

View File

@@ -574,6 +574,7 @@ mod tests {
});
let selector_ctx = SelectorContext {
datanode_lease_secs: 10,
flownode_lease_secs: 10,
server_addr: "127.0.0.1:3002".to_string(),
kv_backend: kv_backend.clone(),
meta_peer_client,

View File

@@ -23,7 +23,7 @@ use snafu::ResultExt;
use table::metadata::TableId;
use crate::error::{self, Result};
use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue};
use crate::key::{DatanodeLeaseKey, DatanodeStatKey, DatanodeStatValue, LeaseValue};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::common::choose_peers;
@@ -58,7 +58,7 @@ impl Default for LoadBasedSelector<RandomWeightedChoose<Peer>, RegionNumsBasedWe
impl<W, C> Selector for LoadBasedSelector<W, C>
where
W: WeightedChoose<Peer>,
C: WeightCompute<Source = HashMap<StatKey, StatValue>>,
C: WeightCompute<Source = HashMap<DatanodeStatKey, DatanodeStatValue>>,
{
type Context = SelectorContext;
type Output = Vec<Peer>;
@@ -112,9 +112,9 @@ where
}
fn filter_out_expired_datanode(
mut stat_kvs: HashMap<StatKey, StatValue>,
lease_kvs: &HashMap<LeaseKey, LeaseValue>,
) -> HashMap<StatKey, StatValue> {
mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue>,
lease_kvs: &HashMap<DatanodeLeaseKey, LeaseValue>,
) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
lease_kvs
.iter()
.filter_map(|(lease_k, _)| stat_kvs.remove_entry(&lease_k.into()))
@@ -122,9 +122,9 @@ fn filter_out_expired_datanode(
}
fn filter_out_datanode_by_table(
stat_kvs: &HashMap<StatKey, StatValue>,
stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
leader_peer_ids: &[u64],
) -> HashMap<StatKey, StatValue> {
) -> HashMap<DatanodeStatKey, DatanodeStatValue> {
stat_kvs
.iter()
.filter(|(stat_k, _)| leader_peer_ids.contains(&stat_k.node_id))
@@ -162,37 +162,37 @@ async fn get_leader_peer_ids(
mod tests {
use std::collections::HashMap;
use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue};
use crate::key::{DatanodeLeaseKey, DatanodeStatKey, DatanodeStatValue, LeaseValue};
use crate::selector::load_based::filter_out_expired_datanode;
#[test]
fn test_filter_out_expired_datanode() {
let mut stat_kvs = HashMap::new();
stat_kvs.insert(
StatKey {
DatanodeStatKey {
cluster_id: 1,
node_id: 0,
},
StatValue { stats: vec![] },
DatanodeStatValue { stats: vec![] },
);
stat_kvs.insert(
StatKey {
DatanodeStatKey {
cluster_id: 1,
node_id: 1,
},
StatValue { stats: vec![] },
DatanodeStatValue { stats: vec![] },
);
stat_kvs.insert(
StatKey {
DatanodeStatKey {
cluster_id: 1,
node_id: 2,
},
StatValue { stats: vec![] },
DatanodeStatValue { stats: vec![] },
);
let mut lease_kvs = HashMap::new();
lease_kvs.insert(
LeaseKey {
DatanodeLeaseKey {
cluster_id: 1,
node_id: 1,
},
@@ -205,7 +205,7 @@ mod tests {
let alive_stat_kvs = filter_out_expired_datanode(stat_kvs, &lease_kvs);
assert_eq!(1, alive_stat_kvs.len());
assert!(alive_stat_kvs.contains_key(&StatKey {
assert!(alive_stat_kvs.contains_key(&DatanodeStatKey {
cluster_id: 1,
node_id: 1
}));

View File

@@ -19,7 +19,7 @@ use snafu::ensure;
use crate::error::{NoEnoughAvailableDatanodeSnafu, Result};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::metasrv::{SelectTarget, SelectorContext};
use crate::selector::{Namespace, Selector, SelectorOptions};
/// Round-robin selector that returns the next peer in the list in sequence.
@@ -29,11 +29,76 @@ use crate::selector::{Namespace, Selector, SelectorOptions};
/// all datanodes. But **it's not recommended** to use this selector in serious
/// production environments because it doesn't take into account the load of
/// each datanode.
#[derive(Default)]
pub struct RoundRobinSelector {
select_target: SelectTarget,
counter: AtomicUsize,
}
impl Default for RoundRobinSelector {
fn default() -> Self {
Self {
select_target: SelectTarget::Datanode,
counter: AtomicUsize::new(0),
}
}
}
impl RoundRobinSelector {
pub fn new(select_target: SelectTarget) -> Self {
Self {
select_target,
..Default::default()
}
}
async fn get_peers(
&self,
ns: Namespace,
min_required_items: usize,
ctx: &SelectorContext,
) -> Result<Vec<Peer>> {
let mut peers = match self.select_target {
SelectTarget::Datanode => {
// 1. get alive datanodes.
let lease_kvs =
lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs)
.await?;
// 2. map into peers
lease_kvs
.into_iter()
.map(|(k, v)| Peer::new(k.node_id, v.node_addr))
.collect::<Vec<_>>()
}
SelectTarget::Flownode => {
// 1. get alive flownodes.
let lease_kvs =
lease::alive_flownodes(ns, &ctx.meta_peer_client, ctx.flownode_lease_secs)
.await?;
// 2. map into peers
lease_kvs
.into_iter()
.map(|(k, v)| Peer::new(k.node_id, v.node_addr))
.collect::<Vec<_>>()
}
};
ensure!(
!peers.is_empty(),
NoEnoughAvailableDatanodeSnafu {
required: min_required_items,
available: 0usize,
}
);
// 3. sort by node id
peers.sort_by_key(|p| p.id);
Ok(peers)
}
}
#[async_trait::async_trait]
impl Selector for RoundRobinSelector {
type Context = SelectorContext;
@@ -45,25 +110,8 @@ impl Selector for RoundRobinSelector {
ctx: &Self::Context,
opts: SelectorOptions,
) -> Result<Vec<Peer>> {
// 1. get alive datanodes.
let lease_kvs =
lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
// 2. map into peers and sort on node id
let mut peers: Vec<Peer> = lease_kvs
.into_iter()
.map(|(k, v)| Peer::new(k.node_id, v.node_addr))
.collect();
peers.sort_by_key(|p| p.id);
ensure!(
!peers.is_empty(),
NoEnoughAvailableDatanodeSnafu {
required: opts.min_required_items,
available: 0usize,
}
);
// 3. choose peers
let peers = self.get_peers(ns, opts.min_required_items, ctx).await?;
// choose peers
let mut selected = Vec::with_capacity(opts.min_required_items);
for _ in 0..opts.min_required_items {
let idx = self

View File

@@ -17,7 +17,7 @@ use std::collections::HashMap;
use common_meta::peer::Peer;
use itertools::{Itertools, MinMaxResult};
use crate::keys::{StatKey, StatValue};
use crate::key::{DatanodeStatKey, DatanodeStatValue};
use crate::selector::weighted_choose::WeightedItem;
/// The [`WeightCompute`] trait is used to compute the weight array by heartbeats.
@@ -37,9 +37,12 @@ pub trait WeightCompute: Send + Sync {
pub struct RegionNumsBasedWeightCompute;
impl WeightCompute for RegionNumsBasedWeightCompute {
type Source = HashMap<StatKey, StatValue>;
type Source = HashMap<DatanodeStatKey, DatanodeStatValue>;
fn compute(&self, stat_kvs: &HashMap<StatKey, StatValue>) -> Vec<WeightedItem<Peer>> {
fn compute(
&self,
stat_kvs: &HashMap<DatanodeStatKey, DatanodeStatValue>,
) -> Vec<WeightedItem<Peer>> {
let mut region_nums = Vec::with_capacity(stat_kvs.len());
let mut peers = Vec::with_capacity(stat_kvs.len());
@@ -98,32 +101,32 @@ mod tests {
use super::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::handler::node_stat::{RegionStat, Stat};
use crate::keys::{StatKey, StatValue};
use crate::key::{DatanodeStatKey, DatanodeStatValue};
#[test]
fn test_weight_compute() {
let mut stat_kvs: HashMap<StatKey, StatValue> = HashMap::default();
let stat_key = StatKey {
let mut stat_kvs: HashMap<DatanodeStatKey, DatanodeStatValue> = HashMap::default();
let stat_key = DatanodeStatKey {
cluster_id: 1,
node_id: 1,
};
let stat_val = StatValue {
let stat_val = DatanodeStatValue {
stats: vec![mock_stat_1()],
};
stat_kvs.insert(stat_key, stat_val);
let stat_key = StatKey {
let stat_key = DatanodeStatKey {
cluster_id: 1,
node_id: 2,
};
let stat_val = StatValue {
let stat_val = DatanodeStatValue {
stats: vec![mock_stat_2()],
};
stat_kvs.insert(stat_key, stat_val);
let stat_key = StatKey {
let stat_key = DatanodeStatKey {
cluster_id: 1,
node_id: 3,
};
let stat_val = StatValue {
let stat_val = DatanodeStatValue {
stats: vec![mock_stat_3()],
};
stat_kvs.insert(stat_key, stat_val);

View File

@@ -20,7 +20,7 @@ use tonic::codegen::http;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result};
use crate::keys::StatValue;
use crate::key::DatanodeStatValue;
use crate::service::admin::{util, HttpHandler};
#[derive(Clone)]
@@ -46,7 +46,7 @@ impl HttpHandler for HeartBeatHandler {
}
let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
let mut stat_vals: Vec<StatValue> = stat_kvs.into_values().collect();
let mut stat_vals: Vec<DatanodeStatValue> = stat_kvs.into_values().collect();
if let Some(addr) = params.get("addr") {
stat_vals = filter_by_addr(stat_vals, addr);
@@ -63,7 +63,7 @@ impl HttpHandler for HeartBeatHandler {
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct StatValues {
pub stat_vals: Vec<StatValue>,
pub stat_vals: Vec<DatanodeStatValue>,
}
impl TryFrom<StatValues> for String {
@@ -76,7 +76,7 @@ impl TryFrom<StatValues> for String {
}
}
fn filter_by_addr(stat_vals: Vec<StatValue>, addr: &str) -> Vec<StatValue> {
fn filter_by_addr(stat_vals: Vec<DatanodeStatValue>, addr: &str) -> Vec<DatanodeStatValue> {
stat_vals
.into_iter()
.filter(|stat_val| stat_val.stats.iter().any(|stat| stat.addr == addr))
@@ -86,12 +86,12 @@ fn filter_by_addr(stat_vals: Vec<StatValue>, addr: &str) -> Vec<StatValue> {
#[cfg(test)]
mod tests {
use crate::handler::node_stat::Stat;
use crate::keys::StatValue;
use crate::key::DatanodeStatValue;
use crate::service::admin::heartbeat::filter_by_addr;
#[tokio::test]
async fn test_filter_by_addr() {
let stat_value1 = StatValue {
let stat_value1 = DatanodeStatValue {
stats: vec![
Stat {
addr: "127.0.0.1:3001".to_string(),
@@ -106,7 +106,7 @@ mod tests {
],
};
let stat_value2 = StatValue {
let stat_value2 = DatanodeStatValue {
stats: vec![
Stat {
addr: "127.0.0.1:3002".to_string(),

View File

@@ -20,7 +20,7 @@ use tonic::codegen::http;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result};
use crate::keys::{LeaseKey, LeaseValue};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease;
use crate::service::admin::{util, HttpHandler};
@@ -38,8 +38,7 @@ impl HttpHandler for NodeLeaseHandler {
) -> Result<http::Response<String>> {
let cluster_id = util::extract_cluster_id(params)?;
let leases =
lease::filter_datanodes(cluster_id, &self.meta_peer_client, |_, _| true).await?;
let leases = lease::alive_datanodes(cluster_id, &self.meta_peer_client, u64::MAX).await?;
let leases = leases
.into_iter()
.map(|(k, v)| HumanLease {
@@ -59,7 +58,7 @@ impl HttpHandler for NodeLeaseHandler {
#[derive(Debug, Serialize, Deserialize)]
pub struct HumanLease {
pub name: LeaseKey,
pub name: DatanodeLeaseKey,
pub human_time: String,
pub lease: LeaseValue,
}

View File

@@ -34,7 +34,7 @@ use table::requests::TableOptions;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::keys::{LeaseKey, LeaseValue};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lock::memory::MemLock;
use crate::metasrv::SelectorContext;
use crate::procedure::region_failover::RegionFailoverManager;
@@ -71,6 +71,7 @@ pub(crate) fn create_selector_context() -> SelectorContext {
SelectorContext {
datanode_lease_secs: 10,
flownode_lease_secs: 10,
server_addr: "127.0.0.1:3002".to_string(),
kv_backend: in_memory,
meta_peer_client,
@@ -175,7 +176,7 @@ pub(crate) async fn put_datanodes(
) {
let backend = meta_peer_client.memory_backend();
for datanode in datanodes {
let lease_key = LeaseKey {
let lease_key = DatanodeLeaseKey {
cluster_id,
node_id: datanode.id,
};

View File

@@ -300,7 +300,7 @@ impl GreptimeDbClusterBuilder {
) {
for _ in 0..10 {
let alive_datanodes =
meta_srv::lease::filter_datanodes(1000, meta_peer_client, |_, _| true)
meta_srv::lease::alive_datanodes(1000, meta_peer_client, u64::MAX)
.await
.unwrap()
.len();

View File

@@ -357,6 +357,7 @@ async fn run_region_failover_procedure(
selector,
selector_ctx: SelectorContext {
datanode_lease_secs: distributed_time_constants::REGION_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::REGION_LEASE_SECS,
server_addr: metasrv.options().server_addr.clone(),
kv_backend: metasrv.kv_backend().clone(),
meta_peer_client: metasrv.meta_peer_client().clone(),