mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 16:30:39 +00:00
refactor: add the active_frontends() in PeerLookupService (#6504)
Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
@@ -19,11 +19,17 @@ pub use api::v1::meta::Peer;
|
||||
use crate::error::Error;
|
||||
use crate::{DatanodeId, FlownodeId};
|
||||
|
||||
/// can query peer given a node id
|
||||
/// PeerLookupService is a service that can lookup peers.
|
||||
#[async_trait::async_trait]
|
||||
pub trait PeerLookupService {
|
||||
/// Returns the datanode with the given id. It may return inactive peers.
|
||||
async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>, Error>;
|
||||
|
||||
/// Returns the flownode with the given id. It may return inactive peers.
|
||||
async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>, Error>;
|
||||
|
||||
/// Returns all currently active frontend nodes that have reported a heartbeat within the most recent heartbeat interval from the in-memory backend.
|
||||
async fn active_frontends(&self) -> Result<Vec<Peer>, Error>;
|
||||
}
|
||||
|
||||
pub type PeerLookupServiceRef = Arc<dyn PeerLookupService + Send + Sync>;
|
||||
|
||||
@@ -213,6 +213,10 @@ impl PeerLookupService for NoopPeerLookupService {
|
||||
async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
|
||||
Ok(Some(Peer::empty(id)))
|
||||
}
|
||||
|
||||
async fn active_frontends(&self) -> Result<Vec<Peer>> {
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a kafka topic pool for testing.
|
||||
|
||||
@@ -20,8 +20,11 @@ use std::task::{Context, Poll};
|
||||
|
||||
use api::v1::meta::heartbeat_request::NodeWorkloads;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
|
||||
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
|
||||
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
|
||||
use common_meta::peer::{Peer, PeerLookupService};
|
||||
use common_meta::rpc::store::RangeRequest;
|
||||
use common_meta::{util, DatanodeId, FlownodeId};
|
||||
use common_time::util as time_util;
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
@@ -31,10 +34,19 @@ use crate::cluster::MetaPeerClientRef;
|
||||
use crate::error::{Error, KvBackendSnafu, Result};
|
||||
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
|
||||
|
||||
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
|
||||
move |v: &LeaseValue| {
|
||||
((time_util::current_time_millis() - v.timestamp_millis) as u64)
|
||||
< lease_secs.saturating_mul(1000)
|
||||
enum Value<'a> {
|
||||
LeaseValue(&'a LeaseValue),
|
||||
NodeInfo(&'a NodeInfo),
|
||||
}
|
||||
|
||||
fn build_lease_filter(lease_secs: u64) -> impl Fn(Value) -> bool {
|
||||
move |value: Value| {
|
||||
let active_time = match value {
|
||||
Value::LeaseValue(lease_value) => lease_value.timestamp_millis,
|
||||
Value::NodeInfo(node_info) => node_info.last_activity_ts,
|
||||
};
|
||||
|
||||
((time_util::current_time_millis() - active_time) as u64) < lease_secs.saturating_mul(1000)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,7 +103,7 @@ pub async fn lookup_datanode_peer(
|
||||
return Ok(None);
|
||||
};
|
||||
let lease_value: LeaseValue = kv.value.try_into()?;
|
||||
let is_alive = lease_filter(&lease_value);
|
||||
let is_alive = lease_filter(Value::LeaseValue(&lease_value));
|
||||
if is_alive {
|
||||
Ok(Some(Peer {
|
||||
id: lease_key.node_id,
|
||||
@@ -155,7 +167,7 @@ where
|
||||
let condition = this.condition;
|
||||
let key_prefix = std::mem::take(&mut this.key_prefix);
|
||||
let fut = filter(key_prefix, this.meta_peer_client, move |v| {
|
||||
lease_filter(v) && condition.unwrap_or(|_| true)(v)
|
||||
lease_filter(Value::LeaseValue(v)) && condition.unwrap_or(|_| true)(v)
|
||||
});
|
||||
|
||||
this.inner_future = Some(Box::pin(fut));
|
||||
@@ -192,7 +204,7 @@ pub async fn lookup_flownode_peer(
|
||||
};
|
||||
let lease_value: LeaseValue = kv.value.try_into()?;
|
||||
|
||||
let is_alive = lease_filter(&lease_value);
|
||||
let is_alive = lease_filter(Value::LeaseValue(&lease_value));
|
||||
if is_alive {
|
||||
Ok(Some(Peer {
|
||||
id: lease_key.node_id,
|
||||
@@ -203,6 +215,29 @@ pub async fn lookup_flownode_peer(
|
||||
}
|
||||
}
|
||||
|
||||
/// Lookup all alive frontends from the memory backend, only return if it's alive under given `lease_secs`.
|
||||
pub async fn lookup_frontends(
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
lease_secs: u64,
|
||||
) -> Result<Vec<Peer>> {
|
||||
let range_request =
|
||||
RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend));
|
||||
|
||||
let response = meta_peer_client.range(range_request).await?;
|
||||
let lease_filter = build_lease_filter(lease_secs);
|
||||
|
||||
let mut peers = Vec::with_capacity(response.kvs.len());
|
||||
for kv in response.kvs {
|
||||
let node_info = NodeInfo::try_from(kv.value).context(KvBackendSnafu)?;
|
||||
let is_alive = lease_filter(Value::NodeInfo(&node_info));
|
||||
if is_alive {
|
||||
peers.push(node_info.peer);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(peers)
|
||||
}
|
||||
|
||||
/// Find all alive flownodes
|
||||
pub fn alive_flownodes(
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
@@ -264,25 +299,42 @@ impl PeerLookupService for MetaPeerLookupService {
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
|
||||
lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn active_frontends(&self) -> common_meta::error::Result<Vec<Peer>> {
|
||||
// Get the active frontends within the last heartbeat interval.
|
||||
lookup_frontends(
|
||||
&self.meta_peer_client,
|
||||
// TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
|
||||
FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::meta::heartbeat_request::NodeWorkloads;
|
||||
use api::v1::meta::DatanodeWorkloads;
|
||||
use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
|
||||
use common_meta::kv_backend::ResettableKvBackendRef;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::PutRequest;
|
||||
use common_time::util::current_time_millis;
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
|
||||
use crate::key::{DatanodeLeaseKey, LeaseValue};
|
||||
use crate::lease::{alive_datanodes, is_datanode_accept_ingest_workload};
|
||||
use crate::lease::{
|
||||
alive_datanodes, is_datanode_accept_ingest_workload, lookup_frontends, ClusterRole,
|
||||
};
|
||||
use crate::test_util::create_meta_peer_client;
|
||||
|
||||
async fn put_lease_value(
|
||||
@@ -391,4 +443,60 @@ mod tests {
|
||||
assert_eq!(leases.len(), 1);
|
||||
assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 }));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_frontends() {
|
||||
let client = create_meta_peer_client();
|
||||
let in_memory = client.memory_backend();
|
||||
let lease_secs = 10;
|
||||
|
||||
let active_frontend_node = NodeInfo {
|
||||
peer: Peer {
|
||||
id: 0,
|
||||
addr: "127.0.0.1:20201".to_string(),
|
||||
},
|
||||
last_activity_ts: current_time_millis(),
|
||||
status: NodeStatus::Frontend(FrontendStatus {}),
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: current_time_millis() as u64,
|
||||
};
|
||||
|
||||
let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);
|
||||
|
||||
in_memory
|
||||
.put(PutRequest {
|
||||
key: format!("{}{}", key_prefix, "0").into(),
|
||||
value: active_frontend_node.try_into().unwrap(),
|
||||
prev_kv: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let inactive_frontend_node = NodeInfo {
|
||||
peer: Peer {
|
||||
id: 1,
|
||||
addr: "127.0.0.1:20201".to_string(),
|
||||
},
|
||||
last_activity_ts: current_time_millis() - 20 * 1000,
|
||||
status: NodeStatus::Frontend(FrontendStatus {}),
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: current_time_millis() as u64,
|
||||
};
|
||||
|
||||
in_memory
|
||||
.put(PutRequest {
|
||||
key: format!("{}{}", key_prefix, "1").into(),
|
||||
value: inactive_frontend_node.try_into().unwrap(),
|
||||
prev_kv: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let peers = lookup_frontends(&client, lease_secs as u64).await.unwrap();
|
||||
|
||||
assert_eq!(peers.len(), 1);
|
||||
assert_eq!(peers[0].id, 0);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user