From 4c2955b86b7523f66fd73032960014ae68cf5571 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 22 Aug 2025 20:34:26 +0800 Subject: [PATCH] fix: time unit mismatch in `lookup_frontends` function (#6798) * fix: lookup frontend Signed-off-by: WenyXu * test: add tests Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/meta-srv/src/lease.rs | 90 +++++++++++++++----- src/meta-srv/src/metasrv.rs | 2 +- src/meta-srv/src/selector/lease_based.rs | 10 ++- src/meta-srv/src/selector/load_based.rs | 10 ++- src/meta-srv/src/selector/round_robin.rs | 18 ++-- src/meta-srv/src/service/admin/node_lease.rs | 4 +- tests-integration/src/cluster.rs | 9 +- 7 files changed, 102 insertions(+), 41 deletions(-) diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index ec6244450c..819b82fadc 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -17,6 +17,7 @@ use std::future::Future; use std::hash::Hash; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use api::v1::meta::heartbeat_request::NodeWorkloads; use common_error::ext::BoxedError; @@ -39,14 +40,14 @@ enum Value<'a> { NodeInfo(&'a NodeInfo), } -fn build_lease_filter(lease_secs: u64) -> impl Fn(Value) -> bool { +fn build_lease_filter(lease: Duration) -> impl Fn(Value) -> bool { move |value: Value| { let active_time = match value { Value::LeaseValue(lease_value) => lease_value.timestamp_millis, Value::NodeInfo(node_info) => node_info.last_activity_ts, }; - ((time_util::current_time_millis() - active_time) as u64) < lease_secs.saturating_mul(1000) + ((time_util::current_time_millis() - active_time) as u64) < lease.as_millis() as u64 } } @@ -92,9 +93,9 @@ pub async fn find_datanode_lease_value( pub async fn lookup_datanode_peer( datanode_id: DatanodeId, meta_peer_client: &MetaPeerClientRef, - lease_secs: u64, + lease: Duration, ) -> Result> { - let lease_filter = build_lease_filter(lease_secs); + let lease_filter = build_lease_filter(lease); let lease_key = DatanodeLeaseKey { node_id: datanode_id, }; @@ -121,7 +122,7 @@ pub struct LeaseFilter<'a, K> where K: Eq + Hash + TryFrom, Error = Error> + 'a, { - lease_secs: u64, + lease: Duration, key_prefix: Vec, meta_peer_client: &'a MetaPeerClientRef, condition: Option bool>, @@ -133,12 +134,12 @@ where K: Eq + Hash + TryFrom, Error = Error> + 'a, { pub fn new( - lease_secs: u64, + lease: Duration, key_prefix: Vec, meta_peer_client: &'a MetaPeerClientRef, ) -> Self { Self { - lease_secs, + lease, key_prefix, meta_peer_client, condition: None, @@ -163,7 +164,7 @@ where let this = self.get_mut(); if this.inner_future.is_none() { - let lease_filter = build_lease_filter(this.lease_secs); + let lease_filter = build_lease_filter(this.lease); let condition = this.condition; let key_prefix = std::mem::take(&mut this.key_prefix); let fut = filter(key_prefix, this.meta_peer_client, move |v| { @@ -183,18 +184,18 @@ where /// Find all alive datanodes pub fn alive_datanodes( meta_peer_client: &MetaPeerClientRef, - lease_secs: u64, + lease: Duration, ) -> LeaseFilter<'_, DatanodeLeaseKey> { - LeaseFilter::new(lease_secs, DatanodeLeaseKey::prefix_key(), meta_peer_client) + LeaseFilter::new(lease, DatanodeLeaseKey::prefix_key(), meta_peer_client) } /// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], only return if it's alive under given `lease_secs` pub async fn lookup_flownode_peer( flownode_id: FlownodeId, meta_peer_client: &MetaPeerClientRef, - lease_secs: u64, + lease: Duration, ) -> Result> { - let lease_filter = build_lease_filter(lease_secs); + let lease_filter = build_lease_filter(lease); let lease_key = FlownodeLeaseKey { node_id: flownode_id, }; @@ -218,13 +219,13 @@ pub async fn lookup_flownode_peer( /// Lookup all alive frontends from the memory backend, only return if it's alive under given `lease_secs`. pub async fn lookup_frontends( meta_peer_client: &MetaPeerClientRef, - lease_secs: u64, + lease: Duration, ) -> Result> { let range_request = RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend)); let response = meta_peer_client.range(range_request).await?; - let lease_filter = build_lease_filter(lease_secs); + let lease_filter = build_lease_filter(lease); let mut peers = Vec::with_capacity(response.kvs.len()); for kv in response.kvs { @@ -241,10 +242,10 @@ pub async fn lookup_frontends( /// Find all alive flownodes pub fn alive_flownodes( meta_peer_client: &MetaPeerClientRef, - lease_secs: u64, + lease: Duration, ) -> LeaseFilter<'_, FlownodeLeaseKey> { LeaseFilter::new( - lease_secs, + lease, FlownodeLeaseKey::prefix_key_by_cluster(), meta_peer_client, ) @@ -294,14 +295,14 @@ impl MetaPeerLookupService { #[async_trait::async_trait] impl PeerLookupService for MetaPeerLookupService { async fn datanode(&self, id: DatanodeId) -> common_meta::error::Result> { - lookup_datanode_peer(id, &self.meta_peer_client, u64::MAX) + lookup_datanode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX)) .await .map_err(BoxedError::new) .context(common_meta::error::ExternalSnafu) } async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result> { - lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX) + lookup_flownode_peer(id, &self.meta_peer_client, Duration::from_secs(u64::MAX)) .await .map_err(BoxedError::new) .context(common_meta::error::ExternalSnafu) @@ -312,7 +313,7 @@ impl PeerLookupService for MetaPeerLookupService { lookup_frontends( &self.meta_peer_client, // TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval? - FRONTEND_HEARTBEAT_INTERVAL_MILLIS, + Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS), ) .await .map_err(BoxedError::new) @@ -322,11 +323,14 @@ impl PeerLookupService for MetaPeerLookupService { #[cfg(test)] mod tests { + use std::time::Duration; + use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::DatanodeWorkloads; use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus}; + use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; use common_meta::kv_backend::ResettableKvBackendRef; - use common_meta::peer::Peer; + use common_meta::peer::{Peer, PeerLookupService}; use common_meta::rpc::store::PutRequest; use common_time::util::current_time_millis; use common_workload::DatanodeWorkloadType; @@ -334,6 +338,7 @@ mod tests { use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::lease::{ alive_datanodes, is_datanode_accept_ingest_workload, lookup_frontends, ClusterRole, + MetaPeerLookupService, }; use crate::test_util::create_meta_peer_client; @@ -380,7 +385,9 @@ mod tests { }), }; put_lease_value(&in_memory, key.clone(), value.clone()).await; - let leases = alive_datanodes(&client, lease_secs as u64).await.unwrap(); + let leases = alive_datanodes(&client, Duration::from_secs(lease_secs as u64)) + .await + .unwrap(); assert_eq!(leases.len(), 1); assert_eq!(leases.get(&key), Some(&value)); } @@ -436,7 +443,7 @@ mod tests { }; put_lease_value(&in_memory, key, value).await; - let leases = alive_datanodes(&client, lease_secs as u64) + let leases = alive_datanodes(&client, Duration::from_secs(lease_secs)) .with_condition(is_datanode_accept_ingest_workload) .await .unwrap(); @@ -494,9 +501,46 @@ mod tests { .await .unwrap(); - let peers = lookup_frontends(&client, lease_secs as u64).await.unwrap(); + let peers = lookup_frontends(&client, Duration::from_secs(lease_secs)) + .await + .unwrap(); assert_eq!(peers.len(), 1); assert_eq!(peers[0].id, 0); } + + #[tokio::test] + async fn test_no_active_frontends() { + let client = create_meta_peer_client(); + let in_memory = client.memory_backend(); + + let last_activity_ts = + current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000; + let active_frontend_node = NodeInfo { + peer: Peer { + id: 0, + addr: "127.0.0.1:20201".to_string(), + }, + last_activity_ts, + status: NodeStatus::Frontend(FrontendStatus {}), + version: "1.0.0".to_string(), + git_commit: "1234567890".to_string(), + start_time_ms: last_activity_ts as u64, + }; + + let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend); + + in_memory + .put(PutRequest { + key: format!("{}{}", key_prefix, "0").into(), + value: active_frontend_node.try_into().unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + + let service = MetaPeerLookupService::new(client); + let peers = service.active_frontends().await.unwrap(); + assert_eq!(peers.len(), 0); + } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index dc3bc6c32b..28cd2b13b9 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -688,7 +688,7 @@ impl Metasrv { lookup_datanode_peer( peer_id, &self.meta_peer_client, - distributed_time_constants::DATANODE_LEASE_SECS, + Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS), ) .await } diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index 53a0dcceb5..5ddf3881d6 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -14,6 +14,7 @@ use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; use common_meta::peer::Peer; @@ -51,9 +52,12 @@ impl Selector for LeaseBasedSelector { async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result { // 1. get alive datanodes. - let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs) - .with_condition(lease::is_datanode_accept_ingest_workload) - .await?; + let lease_kvs = lease::alive_datanodes( + &ctx.meta_peer_client, + Duration::from_secs(ctx.datanode_lease_secs), + ) + .with_condition(lease::is_datanode_accept_ingest_workload) + .await?; // 2. compute weight array, but the weight of each item is the same. let mut weight_array = lease_kvs diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 70af63f085..5863afe8ce 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -14,6 +14,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue}; use common_meta::key::TableMetadataManager; @@ -66,9 +67,12 @@ where async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result { // 1. get alive datanodes. - let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs) - .with_condition(lease::is_datanode_accept_ingest_workload) - .await?; + let lease_kvs = lease::alive_datanodes( + &ctx.meta_peer_client, + Duration::from_secs(ctx.datanode_lease_secs), + ) + .with_condition(lease::is_datanode_accept_ingest_workload) + .await?; // 2. get stat kvs and filter out expired datanodes. let stat_keys = lease_kvs.keys().map(|k| k.into()).collect(); diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs index 536892dfca..cbe9246b86 100644 --- a/src/meta-srv/src/selector/round_robin.rs +++ b/src/meta-srv/src/selector/round_robin.rs @@ -15,6 +15,7 @@ use std::collections::HashSet; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use std::time::Duration; use common_meta::peer::Peer; use snafu::ensure; @@ -61,10 +62,12 @@ impl RoundRobinSelector { let mut peers = match self.select_target { SelectTarget::Datanode => { // 1. get alive datanodes. - let lease_kvs = - lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs) - .with_condition(lease::is_datanode_accept_ingest_workload) - .await?; + let lease_kvs = lease::alive_datanodes( + &ctx.meta_peer_client, + Duration::from_secs(ctx.datanode_lease_secs), + ) + .with_condition(lease::is_datanode_accept_ingest_workload) + .await?; let mut exclude_peer_ids = self .node_excluder @@ -82,8 +85,11 @@ impl RoundRobinSelector { } SelectTarget::Flownode => { // 1. get alive flownodes. - let lease_kvs = - lease::alive_flownodes(&ctx.meta_peer_client, ctx.flownode_lease_secs).await?; + let lease_kvs = lease::alive_flownodes( + &ctx.meta_peer_client, + Duration::from_secs(ctx.flownode_lease_secs), + ) + .await?; // 2. map into peers lease_kvs diff --git a/src/meta-srv/src/service/admin/node_lease.rs b/src/meta-srv/src/service/admin/node_lease.rs index bce3591054..f3216e1644 100644 --- a/src/meta-srv/src/service/admin/node_lease.rs +++ b/src/meta-srv/src/service/admin/node_lease.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::time::Duration; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -37,7 +38,8 @@ impl HttpHandler for NodeLeaseHandler { _: http::Method, _: &HashMap, ) -> Result> { - let leases = lease::alive_datanodes(&self.meta_peer_client, u64::MAX).await?; + let leases = + lease::alive_datanodes(&self.meta_peer_client, Duration::from_secs(u64::MAX)).await?; let leases = leases .into_iter() .map(|(k, v)| HumanLease { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 8242e4022e..1c13e52e2a 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -316,10 +316,11 @@ impl GreptimeDbClusterBuilder { expected_datanodes: usize, ) { for _ in 0..100 { - let alive_datanodes = meta_srv::lease::alive_datanodes(meta_peer_client, u64::MAX) - .await - .unwrap() - .len(); + let alive_datanodes = + meta_srv::lease::alive_datanodes(meta_peer_client, Duration::from_secs(u64::MAX)) + .await + .unwrap() + .len(); if alive_datanodes == expected_datanodes { return; }