mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
fix: time unit mismatch in lookup_frontends function (#6798)
* fix: lookup frontend Signed-off-by: WenyXu <wenymedia@gmail.com> * test: add tests Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
@@ -17,6 +17,7 @@ use std::future::Future;
|
||||
use std::hash::Hash;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::heartbeat_request::NodeWorkloads;
|
||||
use common_error::ext::BoxedError;
|
||||
@@ -39,14 +40,14 @@ enum Value<'a> {
|
||||
NodeInfo(&'a NodeInfo),
|
||||
}
|
||||
|
||||
fn build_lease_filter(lease_secs: u64) -> impl Fn(Value) -> bool {
|
||||
fn build_lease_filter(lease: Duration) -> impl Fn(Value) -> bool {
|
||||
move |value: Value| {
|
||||
let active_time = match value {
|
||||
Value::LeaseValue(lease_value) => lease_value.timestamp_millis,
|
||||
Value::NodeInfo(node_info) => node_info.last_activity_ts,
|
||||
};
|
||||
|
||||
((time_util::current_time_millis() - active_time) as u64) < lease_secs.saturating_mul(1000)
|
||||
((time_util::current_time_millis() - active_time) as u64) < lease.as_millis() as u64
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,9 +93,9 @@ pub async fn find_datanode_lease_value(
|
||||
pub async fn lookup_datanode_peer(
|
||||
datanode_id: DatanodeId,
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
lease_secs: u64,
|
||||
lease: Duration,
|
||||
) -> Result<Option<Peer>> {
|
||||
let lease_filter = build_lease_filter(lease_secs);
|
||||
let lease_filter = build_lease_filter(lease);
|
||||
let lease_key = DatanodeLeaseKey {
|
||||
node_id: datanode_id,
|
||||
};
|
||||
@@ -121,7 +122,7 @@ pub struct LeaseFilter<'a, K>
|
||||
where
|
||||
K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
|
||||
{
|
||||
lease_secs: u64,
|
||||
lease: Duration,
|
||||
key_prefix: Vec<u8>,
|
||||
meta_peer_client: &'a MetaPeerClientRef,
|
||||
condition: Option<fn(&LeaseValue) -> bool>,
|
||||
@@ -133,12 +134,12 @@ where
|
||||
K: Eq + Hash + TryFrom<Vec<u8>, Error = Error> + 'a,
|
||||
{
|
||||
pub fn new(
|
||||
lease_secs: u64,
|
||||
lease: Duration,
|
||||
key_prefix: Vec<u8>,
|
||||
meta_peer_client: &'a MetaPeerClientRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
lease_secs,
|
||||
lease,
|
||||
key_prefix,
|
||||
meta_peer_client,
|
||||
condition: None,
|
||||
@@ -163,7 +164,7 @@ where
|
||||
let this = self.get_mut();
|
||||
|
||||
if this.inner_future.is_none() {
|
||||
let lease_filter = build_lease_filter(this.lease_secs);
|
||||
let lease_filter = build_lease_filter(this.lease);
|
||||
let condition = this.condition;
|
||||
let key_prefix = std::mem::take(&mut this.key_prefix);
|
||||
let fut = filter(key_prefix, this.meta_peer_client, move |v| {
|
||||
@@ -183,18 +184,18 @@ where
|
||||
/// Find all alive datanodes
|
||||
pub fn alive_datanodes(
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
lease_secs: u64,
|
||||
lease: Duration,
|
||||
) -> LeaseFilter<'_, DatanodeLeaseKey> {
|
||||
LeaseFilter::new(lease_secs, DatanodeLeaseKey::prefix_key(), meta_peer_client)
|
||||
LeaseFilter::new(lease, DatanodeLeaseKey::prefix_key(), meta_peer_client)
|
||||
}
|
||||
|
||||
/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], only return if it's alive under given `lease_secs`
|
||||
pub async fn lookup_flownode_peer(
|
||||
flownode_id: FlownodeId,
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
lease_secs: u64,
|
||||
lease: Duration,
|
||||
) -> Result<Option<Peer>> {
|
||||
let lease_filter = build_lease_filter(lease_secs);
|
||||
let lease_filter = build_lease_filter(lease);
|
||||
let lease_key = FlownodeLeaseKey {
|
||||
node_id: flownode_id,
|
||||
};
|
||||
@@ -218,13 +219,13 @@ pub async fn lookup_flownode_peer(
|
||||
/// Lookup all alive frontends from the memory backend, only return if it's alive under given `lease_secs`.
|
||||
pub async fn lookup_frontends(
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
lease_secs: u64,
|
||||
lease: Duration,
|
||||
) -> Result<Vec<Peer>> {
|
||||
let range_request =
|
||||
RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend));
|
||||
|
||||
let response = meta_peer_client.range(range_request).await?;
|
||||
let lease_filter = build_lease_filter(lease_secs);
|
||||
let lease_filter = build_lease_filter(lease);
|
||||
|
||||
let mut peers = Vec::with_capacity(response.kvs.len());
|
||||
for kv in response.kvs {
|
||||
@@ -241,10 +242,10 @@ pub async fn lookup_frontends(
|
||||
/// Find all alive flownodes
|
||||
pub fn alive_flownodes(
|
||||
meta_peer_client: &MetaPeerClientRef,
|
||||
lease_secs: u64,
|
||||
lease: Duration,
|
||||
) -> LeaseFilter<'_, FlownodeLeaseKey> {
|
||||
LeaseFilter::new(
|
||||
lease_secs,
|
||||
lease,
|
||||
FlownodeLeaseKey::prefix_key_by_cluster(),
|
||||
meta_peer_client,
|
||||
)
|
||||
@@ -294,14 +295,14 @@ impl MetaPeerLookupService {
|
||||
#[async_trait::async_trait]
|
||||
impl PeerLookupService for MetaPeerLookupService {
|
||||
async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result<Option<Peer>> {
|
||||
lookup_datanode_peer(id, &self.meta_peer_client, u64::MAX)
|
||||
lookup_datanode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result<Option<Peer>> {
|
||||
lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX)
|
||||
lookup_flownode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
@@ -312,7 +313,7 @@ impl PeerLookupService for MetaPeerLookupService {
|
||||
lookup_frontends(
|
||||
&self.meta_peer_client,
|
||||
// TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
|
||||
FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
|
||||
Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
@@ -322,11 +323,14 @@ impl PeerLookupService for MetaPeerLookupService {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::heartbeat_request::NodeWorkloads;
|
||||
use api::v1::meta::DatanodeWorkloads;
|
||||
use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
|
||||
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
|
||||
use common_meta::kv_backend::ResettableKvBackendRef;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::peer::{Peer, PeerLookupService};
|
||||
use common_meta::rpc::store::PutRequest;
|
||||
use common_time::util::current_time_millis;
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
@@ -334,6 +338,7 @@ mod tests {
|
||||
use crate::key::{DatanodeLeaseKey, LeaseValue};
|
||||
use crate::lease::{
|
||||
alive_datanodes, is_datanode_accept_ingest_workload, lookup_frontends, ClusterRole,
|
||||
MetaPeerLookupService,
|
||||
};
|
||||
use crate::test_util::create_meta_peer_client;
|
||||
|
||||
@@ -380,7 +385,9 @@ mod tests {
|
||||
}),
|
||||
};
|
||||
put_lease_value(&in_memory, key.clone(), value.clone()).await;
|
||||
let leases = alive_datanodes(&client, lease_secs as u64).await.unwrap();
|
||||
let leases = alive_datanodes(&client, Duration::from_secs(lease_secs as u64))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(leases.len(), 1);
|
||||
assert_eq!(leases.get(&key), Some(&value));
|
||||
}
|
||||
@@ -436,7 +443,7 @@ mod tests {
|
||||
};
|
||||
put_lease_value(&in_memory, key, value).await;
|
||||
|
||||
let leases = alive_datanodes(&client, lease_secs as u64)
|
||||
let leases = alive_datanodes(&client, Duration::from_secs(lease_secs))
|
||||
.with_condition(is_datanode_accept_ingest_workload)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -494,9 +501,46 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let peers = lookup_frontends(&client, lease_secs as u64).await.unwrap();
|
||||
let peers = lookup_frontends(&client, Duration::from_secs(lease_secs))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(peers.len(), 1);
|
||||
assert_eq!(peers[0].id, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_no_active_frontends() {
|
||||
let client = create_meta_peer_client();
|
||||
let in_memory = client.memory_backend();
|
||||
|
||||
let last_activity_ts =
|
||||
current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000;
|
||||
let active_frontend_node = NodeInfo {
|
||||
peer: Peer {
|
||||
id: 0,
|
||||
addr: "127.0.0.1:20201".to_string(),
|
||||
},
|
||||
last_activity_ts,
|
||||
status: NodeStatus::Frontend(FrontendStatus {}),
|
||||
version: "1.0.0".to_string(),
|
||||
git_commit: "1234567890".to_string(),
|
||||
start_time_ms: last_activity_ts as u64,
|
||||
};
|
||||
|
||||
let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend);
|
||||
|
||||
in_memory
|
||||
.put(PutRequest {
|
||||
key: format!("{}{}", key_prefix, "0").into(),
|
||||
value: active_frontend_node.try_into().unwrap(),
|
||||
prev_kv: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let service = MetaPeerLookupService::new(client);
|
||||
let peers = service.active_frontends().await.unwrap();
|
||||
assert_eq!(peers.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -688,7 +688,7 @@ impl Metasrv {
|
||||
lookup_datanode_peer(
|
||||
peer_id,
|
||||
&self.meta_peer_client,
|
||||
distributed_time_constants::DATANODE_LEASE_SECS,
|
||||
Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::peer::Peer;
|
||||
|
||||
@@ -51,9 +52,12 @@ impl Selector for LeaseBasedSelector {
|
||||
|
||||
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
|
||||
// 1. get alive datanodes.
|
||||
let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
|
||||
.with_condition(lease::is_datanode_accept_ingest_workload)
|
||||
.await?;
|
||||
let lease_kvs = lease::alive_datanodes(
|
||||
&ctx.meta_peer_client,
|
||||
Duration::from_secs(ctx.datanode_lease_secs),
|
||||
)
|
||||
.with_condition(lease::is_datanode_accept_ingest_workload)
|
||||
.await?;
|
||||
|
||||
// 2. compute weight array, but the weight of each item is the same.
|
||||
let mut weight_array = lease_kvs
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
|
||||
use common_meta::key::TableMetadataManager;
|
||||
@@ -66,9 +67,12 @@ where
|
||||
|
||||
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output> {
|
||||
// 1. get alive datanodes.
|
||||
let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
|
||||
.with_condition(lease::is_datanode_accept_ingest_workload)
|
||||
.await?;
|
||||
let lease_kvs = lease::alive_datanodes(
|
||||
&ctx.meta_peer_client,
|
||||
Duration::from_secs(ctx.datanode_lease_secs),
|
||||
)
|
||||
.with_condition(lease::is_datanode_accept_ingest_workload)
|
||||
.await?;
|
||||
|
||||
// 2. get stat kvs and filter out expired datanodes.
|
||||
let stat_keys = lease_kvs.keys().map(|k| k.into()).collect();
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::peer::Peer;
|
||||
use snafu::ensure;
|
||||
@@ -61,10 +62,12 @@ impl RoundRobinSelector {
|
||||
let mut peers = match self.select_target {
|
||||
SelectTarget::Datanode => {
|
||||
// 1. get alive datanodes.
|
||||
let lease_kvs =
|
||||
lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs)
|
||||
.with_condition(lease::is_datanode_accept_ingest_workload)
|
||||
.await?;
|
||||
let lease_kvs = lease::alive_datanodes(
|
||||
&ctx.meta_peer_client,
|
||||
Duration::from_secs(ctx.datanode_lease_secs),
|
||||
)
|
||||
.with_condition(lease::is_datanode_accept_ingest_workload)
|
||||
.await?;
|
||||
|
||||
let mut exclude_peer_ids = self
|
||||
.node_excluder
|
||||
@@ -82,8 +85,11 @@ impl RoundRobinSelector {
|
||||
}
|
||||
SelectTarget::Flownode => {
|
||||
// 1. get alive flownodes.
|
||||
let lease_kvs =
|
||||
lease::alive_flownodes(&ctx.meta_peer_client, ctx.flownode_lease_secs).await?;
|
||||
let lease_kvs = lease::alive_flownodes(
|
||||
&ctx.meta_peer_client,
|
||||
Duration::from_secs(ctx.flownode_lease_secs),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// 2. map into peers
|
||||
lease_kvs
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
@@ -37,7 +38,8 @@ impl HttpHandler for NodeLeaseHandler {
|
||||
_: http::Method,
|
||||
_: &HashMap<String, String>,
|
||||
) -> Result<http::Response<String>> {
|
||||
let leases = lease::alive_datanodes(&self.meta_peer_client, u64::MAX).await?;
|
||||
let leases =
|
||||
lease::alive_datanodes(&self.meta_peer_client, Duration::from_secs(u64::MAX)).await?;
|
||||
let leases = leases
|
||||
.into_iter()
|
||||
.map(|(k, v)| HumanLease {
|
||||
|
||||
@@ -316,10 +316,11 @@ impl GreptimeDbClusterBuilder {
|
||||
expected_datanodes: usize,
|
||||
) {
|
||||
for _ in 0..100 {
|
||||
let alive_datanodes = meta_srv::lease::alive_datanodes(meta_peer_client, u64::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.len();
|
||||
let alive_datanodes =
|
||||
meta_srv::lease::alive_datanodes(meta_peer_client, Duration::from_secs(u64::MAX))
|
||||
.await
|
||||
.unwrap()
|
||||
.len();
|
||||
if alive_datanodes == expected_datanodes {
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user