fix: obtain system time after fetching lease values (#7223)

* fix: acquire system time inside closure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-11-14 14:53:15 +08:00
committed by GitHub
parent 7cc0439cc9
commit c1e762960a
5 changed files with 239 additions and 50 deletions
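
The core issue: the active filter previously captured `now` when it was built, before the async fetch of lease values, so a slow fetch meant liveness was judged against a stale reference point and nodes whose leases expired mid-fetch could still be reported as alive. A minimal, self-contained sketch of the ordering hazard (illustrative helper names, not the project's actual code):

use std::future::Future;
use std::time::{SystemTime, UNIX_EPOCH};

fn now_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as i64
}

// Before: `now` is fixed before the (potentially slow) await, so elapsed
// time is underestimated and near-expired leases still pass the filter.
async fn alive_before(fetch: impl Future<Output = Vec<i64>>, active_ms: i64) -> usize {
    let now = now_millis(); // read too early
    let lease_ts = fetch.await;
    lease_ts
        .iter()
        .filter(|ts| now.saturating_sub(**ts) < active_ms)
        .count()
}

// After: the clock is read once the fetch completes, which is the ordering
// this commit establishes.
async fn alive_after(fetch: impl Future<Output = Vec<i64>>, active_ms: i64) -> usize {
    let lease_ts = fetch.await;
    let now = now_millis(); // read after the await point
    lease_ts
        .iter()
        .filter(|ts| now.saturating_sub(**ts) < active_ms)
        .count()
}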


@@ -26,6 +26,7 @@ use common_meta::distributed_time_constants::{
use common_meta::error::Result;
use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
use common_meta::{DatanodeId, FlownodeId};
use common_time::util::DefaultSystemTimer;
use snafu::ResultExt;
use crate::cluster::MetaPeerClient;
@@ -35,6 +36,7 @@ use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
impl PeerDiscovery for MetaPeerClient {
async fn active_frontends(&self) -> Result<Vec<Peer>> {
utils::alive_frontends(
&DefaultSystemTimer,
self,
Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
)
@@ -47,20 +49,30 @@ impl PeerDiscovery for MetaPeerClient {
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
utils::alive_datanodes(self, Duration::from_secs(DATANODE_LEASE_SECS), filter)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
utils::alive_datanodes(
&DefaultSystemTimer,
self,
Duration::from_secs(DATANODE_LEASE_SECS),
filter,
)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn active_flownodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
utils::alive_flownodes(self, Duration::from_secs(FLOWNODE_LEASE_SECS), filter)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
utils::alive_flownodes(
&DefaultSystemTimer,
self,
Duration::from_secs(FLOWNODE_LEASE_SECS),
filter,
)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}
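
All three discovery methods now take a timer explicitly instead of reaching for the clock internally, which is what lets the tests below substitute a mock. Judging from that mock implementation, the `SystemTimer` trait from `common_time::util` is roughly this shape (a reconstruction, not the verbatim definition; the real trait may carry more items):

// Reconstructed from the MockTimer impl in the tests below.
pub trait SystemTimer {
    /// Milliseconds since the Unix epoch.
    fn current_time_millis(&self) -> i64;
    /// The current time formatted as an RFC 3339 string.
    fn current_time_rfc3339(&self) -> String;
}

/// Production implementation backed by the real system clock.
pub struct DefaultSystemTimer;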


@@ -95,20 +95,22 @@ impl LeaseValueAccessor for MetaPeerClient {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;
use api::v1::meta::DatanodeWorkloads;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads};
use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::peer::{Peer, PeerDiscovery};
use common_meta::rpc::store::PutRequest;
use common_time::util::current_time_millis;
use common_time::util::{DefaultSystemTimer, SystemTimer, current_time_millis};
use common_workload::DatanodeWorkloadType;
use crate::discovery::utils::{self, accept_ingest_workload};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
use crate::test_util::create_meta_peer_client;
async fn put_lease_value(
@@ -126,17 +128,47 @@ mod tests {
.unwrap();
}
async fn put_flownode_lease_value(
kv_backend: &ResettableKvBackendRef,
key: FlownodeLeaseKey,
value: LeaseValue,
) {
kv_backend
.put(PutRequest {
key: key.try_into().unwrap(),
value: value.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
}
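/// A deterministic test clock: `fetch_add(1)` returns the previous value and
/// advances the shared counter, so successive reads are strictly increasing
/// by 1 ms and independent of the real system time.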
struct MockTimer {
current: Arc<AtomicI64>,
}
impl SystemTimer for MockTimer {
fn current_time_millis(&self) -> i64 {
self.current.fetch_add(1, Ordering::Relaxed)
}
fn current_time_rfc3339(&self) -> String {
unimplemented!()
}
}
#[tokio::test]
async fn test_alive_datanodes() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = DefaultSystemTimer;
// put a stale lease value for node 1
let key = DatanodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: current_time_millis() - lease_secs * 2 * 1000,
timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
@@ -147,7 +179,7 @@ mod tests {
// put a fresh lease value for node 2
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
@@ -155,6 +187,37 @@ mod tests {
};
put_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_datanodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
}
#[tokio::test]
async fn test_alive_datanodes_with_timer() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = MockTimer {
current: Arc::new(AtomicI64::new(current_time_millis())),
};
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_datanodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
@@ -170,12 +233,13 @@ mod tests {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = DefaultSystemTimer;
// put a lease value for node 1 without mode info
let key = DatanodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: current_time_millis() - 20 * 1000,
timestamp_millis: timer.current_time_millis() - 20 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
@@ -186,7 +250,7 @@ mod tests {
// put a lease value for node 2 with mode info
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
@@ -197,7 +261,7 @@ mod tests {
// put a lease value for node 3 with mode info
let key = DatanodeLeaseKey { node_id: 3 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20203".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![i32::MAX],
@@ -208,7 +272,7 @@ mod tests {
// put a lease value for node 4 with mode info
let key = DatanodeLeaseKey { node_id: 4 };
let value = LeaseValue {
timestamp_millis: current_time_millis(),
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20204".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![i32::MAX],
@@ -217,6 +281,7 @@ mod tests {
put_lease_value(&in_memory, key, value).await;
let peers = utils::alive_datanodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs),
Some(accept_ingest_workload),
@@ -227,18 +292,84 @@ mod tests {
assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string())));
}
#[tokio::test]
async fn test_alive_flownodes() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = DefaultSystemTimer;
// put a stale lease value for node 1
let key = FlownodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
};
put_flownode_lease_value(&in_memory, key, value).await;
// put a fresh lease value for node 2
let key = FlownodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
};
put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_flownodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
}
#[tokio::test]
async fn test_alive_flownodes_with_timer() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = MockTimer {
current: Arc::new(AtomicI64::new(current_time_millis())),
};
let key = FlownodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
};
put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_flownodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
}
#[tokio::test]
async fn test_lookup_frontends() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = DefaultSystemTimer;
let active_frontend_node = NodeInfo {
peer: Peer {
id: 0,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts: current_time_millis(),
last_activity_ts: timer.current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
@@ -266,7 +397,7 @@ mod tests {
id: 1,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts: current_time_millis() - 20 * 1000,
last_activity_ts: timer.current_time_millis() - 20 * 1000,
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
@@ -287,9 +418,52 @@ mod tests {
.await
.unwrap();
let peers = utils::alive_frontends(client.as_ref(), Duration::from_secs(lease_secs))
let peers =
utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs))
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].id, 0);
}
#[tokio::test]
async fn test_lookup_frontends_with_timer() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = MockTimer {
current: Arc::new(AtomicI64::new(current_time_millis())),
};
let active_frontend_node = NodeInfo {
peer: Peer {
id: 0,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts: timer.current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: current_time_millis() as u64,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
};
let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "0").into(),
value: active_frontend_node.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
let peers =
utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs))
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].id, 0);
}


@@ -19,7 +19,7 @@ use common_meta::DatanodeId;
use common_meta::cluster::NodeInfo;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_time::util::{DefaultSystemTimer, SystemTimer};
use common_time::util::SystemTimer;
use common_workload::DatanodeWorkloadType;
use snafu::ResultExt;
@@ -49,16 +49,9 @@ pub trait LastActiveTs {
/// Builds a filter closure that checks whether a [`LastActiveTs`] item
/// is still within the specified active duration, relative to a `now`
/// timestamp (in milliseconds) supplied by the caller on each invocation.
///
/// Taking `now` as a parameter lets callers read the clock *after*
/// fetching lease values, so liveness is judged against the actual
/// current time rather than a reference point fixed before the fetch.
pub fn build_active_filter<T: LastActiveTs>(
timer: impl SystemTimer,
active_duration: Duration,
) -> impl Fn(&T) -> bool {
let now = timer.current_time_millis();
let active_duration = active_duration.as_millis() as u64;
move |item: &T| {
pub fn build_active_filter<T: LastActiveTs>(active_duration: Duration) -> impl Fn(i64, &T) -> bool {
move |now: i64, item: &T| {
let active_duration = active_duration.as_millis() as u64;
let elapsed = now.saturating_sub(item.last_active_ts()) as u64;
elapsed < active_duration
}
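
With `now` as a closure parameter, call sites fetch first and read the clock second, as the refactored functions below do. A minimal usage sketch, assuming an illustrative `Lease` type and that `LastActiveTs` exposes `fn last_active_ts(&self) -> i64`, as its use above suggests (the real call sites use `LeaseValue` and `NodeInfo`):

use std::time::Duration;

// Illustrative stand-in for the project's lease types.
struct Lease {
    last_active: i64,
}

impl LastActiveTs for Lease {
    fn last_active_ts(&self) -> i64 {
        self.last_active
    }
}

fn count_alive(leases: &[Lease], now: i64) -> usize {
    // `now` comes from the caller, read *after* the async fetch completes.
    let filter = build_active_filter::<Lease>(Duration::from_secs(10));
    leases.iter().filter(|lease| filter(now, lease)).count()
}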
@@ -66,18 +59,19 @@ pub fn build_active_filter<T: LastActiveTs>(
/// Returns the alive datanodes.
pub async fn alive_datanodes(
timer: &impl SystemTimer,
accessor: &impl LeaseValueAccessor,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
let active_filter = build_active_filter(active_duration);
let condition = condition.unwrap_or(|_| true);
Ok(accessor
.lease_values(LeaseValueType::Datanode)
.await?
let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?;
let now = timer.current_time_millis();
Ok(lease_values
.into_iter()
.filter_map(|(peer_id, lease_value)| {
if active_filter(&lease_value) && condition(&lease_value.workloads) {
if active_filter(now, &lease_value) && condition(&lease_value.workloads) {
Some(Peer::new(peer_id, lease_value.node_addr))
} else {
None
@@ -88,18 +82,19 @@ pub async fn alive_datanodes(
/// Returns the alive flownodes.
pub async fn alive_flownodes(
timer: &impl SystemTimer,
accessor: &impl LeaseValueAccessor,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
let active_filter = build_active_filter(active_duration);
let condition = condition.unwrap_or(|_| true);
Ok(accessor
.lease_values(LeaseValueType::Flownode)
.await?
let lease_values = accessor.lease_values(LeaseValueType::Flownode).await?;
let now = timer.current_time_millis();
Ok(lease_values
.into_iter()
.filter_map(|(peer_id, lease_value)| {
if active_filter(&lease_value) && condition(&lease_value.workloads) {
if active_filter(now, &lease_value) && condition(&lease_value.workloads) {
Some(Peer::new(peer_id, lease_value.node_addr))
} else {
None
@@ -110,16 +105,17 @@ pub async fn alive_flownodes(
/// Returns the alive frontends.
pub async fn alive_frontends(
timer: &impl SystemTimer,
lister: &impl NodeInfoAccessor,
active_duration: Duration,
) -> Result<Vec<Peer>> {
let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
Ok(lister
.node_infos(NodeInfoType::Frontend)
.await?
let active_filter = build_active_filter(active_duration);
let node_infos = lister.node_infos(NodeInfoType::Frontend).await?;
let now = timer.current_time_millis();
Ok(node_infos
.into_iter()
.filter_map(|(_, node_info)| {
if active_filter(&node_info) {
if active_filter(now, &node_info) {
Some(node_info.peer)
} else {
None
@@ -130,15 +126,18 @@ pub async fn alive_frontends(
/// Returns the alive datanode peer.
pub async fn alive_datanode(
timer: &impl SystemTimer,
lister: &impl LeaseValueAccessor,
peer_id: u64,
active_duration: Duration,
) -> Result<Option<Peer>> {
let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
let v = lister
let active_filter = build_active_filter(active_duration);
let lease_value = lister
.lease_value(LeaseValueType::Datanode, peer_id)
.await?
.filter(|(_, lease)| active_filter(lease))
.await?;
let now = timer.current_time_millis();
let v = lease_value
.filter(|(_, lease)| active_filter(now, lease))
.map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
Ok(v)


@@ -49,6 +49,7 @@ use common_procedure::options::ProcedureConfig;
use common_stat::ResourceStatRef;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_time::util::DefaultSystemTimer;
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
@@ -735,6 +736,7 @@ impl Metasrv {
/// A datanode is considered alive when it's still within the lease period.
pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
discovery::utils::alive_datanode(
&DefaultSystemTimer,
self.meta_peer_client.as_ref(),
peer_id,
Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS),


@@ -46,6 +46,7 @@ use common_runtime::Builder as RuntimeBuilder;
use common_runtime::runtime::BuilderBuild;
use common_stat::ResourceStatImpl;
use common_test_util::temp_dir::create_temp_dir;
use common_time::util::DefaultSystemTimer;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
@@ -319,6 +320,7 @@ impl GreptimeDbClusterBuilder {
) {
for _ in 0..100 {
let alive_datanodes = discovery::utils::alive_datanodes(
&DefaultSystemTimer,
meta_peer_client.as_ref(),
Duration::from_secs(u64::MAX),
None,