mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 00:10:38 +00:00
fix: no active datanode when frontend start (#1533)
* fix: no active datanode when frontend start * chore: add log when can not get stat_val
This commit is contained in:
@@ -193,6 +193,15 @@ pub struct StatKey {
|
||||
pub node_id: u64,
|
||||
}
|
||||
|
||||
impl From<&LeaseKey> for StatKey {
|
||||
fn from(lease_key: &LeaseKey) -> Self {
|
||||
StatKey {
|
||||
cluster_id: lease_key.cluster_id,
|
||||
node_id: lease_key.node_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<StatKey> for Vec<u8> {
|
||||
fn from(value: StatKey) -> Self {
|
||||
format!("{}-{}-{}", DN_STAT_PREFIX, value.cluster_id, value.node_id).into_bytes()
|
||||
@@ -395,4 +404,17 @@ mod tests {
|
||||
let region_num = stat_val.region_num().unwrap();
|
||||
assert_eq!(1, region_num);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lease_key_to_stat_key() {
|
||||
let lease_key = LeaseKey {
|
||||
cluster_id: 1,
|
||||
node_id: 101,
|
||||
};
|
||||
|
||||
let stat_key: StatKey = (&lease_key).into();
|
||||
|
||||
assert_eq!(1, stat_key.cluster_id);
|
||||
assert_eq!(101, stat_key.node_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use api::v1::meta::Peer;
|
||||
use common_telemetry::warn;
|
||||
use common_time::util as time_util;
|
||||
|
||||
use crate::cluster::MetaPeerClient;
|
||||
@@ -24,6 +25,8 @@ use crate::lease;
|
||||
use crate::metasrv::Context;
|
||||
use crate::selector::{Namespace, Selector};
|
||||
|
||||
const MAX_REGION_NUMBER: u64 = u64::MAX;
|
||||
|
||||
pub struct LoadBasedSelector {
|
||||
pub meta_peer_client: MetaPeerClient,
|
||||
}
|
||||
@@ -44,6 +47,10 @@ impl Selector for LoadBasedSelector {
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
if lease_kvs.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// get stats of alive datanodes
|
||||
let stat_keys: Vec<StatKey> = lease_kvs
|
||||
.keys()
|
||||
@@ -54,17 +61,23 @@ impl Selector for LoadBasedSelector {
|
||||
.collect();
|
||||
let stat_kvs = self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;
|
||||
|
||||
// aggregate lease and stat information
|
||||
let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = stat_kvs
|
||||
let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs
|
||||
.into_iter()
|
||||
.filter_map(|(stat_key, stat_val)| {
|
||||
let lease_key = to_lease_key(&stat_key);
|
||||
match (lease_kvs.get(&lease_key), stat_val.region_num()) {
|
||||
(Some(lease_val), Some(region_num)) => {
|
||||
Some((lease_key, lease_val.clone(), region_num))
|
||||
.map(|(lease_k, lease_v)| {
|
||||
let stat_key: StatKey = (&lease_k).into();
|
||||
|
||||
let region_num = match stat_kvs
|
||||
.get(&stat_key)
|
||||
.and_then(|stat_val| stat_val.region_num())
|
||||
{
|
||||
Some(region_num) => region_num,
|
||||
None => {
|
||||
warn!("Failed to get stat_val by stat_key {:?}", stat_key);
|
||||
MAX_REGION_NUMBER
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
|
||||
(lease_k, lease_v, region_num)
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -80,27 +93,3 @@ impl Selector for LoadBasedSelector {
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
fn to_lease_key(k: &StatKey) -> LeaseKey {
|
||||
LeaseKey {
|
||||
cluster_id: k.cluster_id,
|
||||
node_id: k.node_id,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::to_lease_key;
|
||||
use crate::keys::StatKey;
|
||||
|
||||
#[test]
|
||||
fn test_to_lease_key() {
|
||||
let statkey = StatKey {
|
||||
cluster_id: 1,
|
||||
node_id: 101,
|
||||
};
|
||||
let lease_key = to_lease_key(&statkey);
|
||||
assert_eq!(1, lease_key.cluster_id);
|
||||
assert_eq!(101, lease_key.node_id);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user