mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
fix: missing datanode id on keep lease (#2415)
This commit is contained in:
@@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, RequestHeader, Role};
|
||||
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role};
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::heartbeat::handler::{
|
||||
@@ -170,14 +170,9 @@ impl HeartbeatTask {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let req_header = Some(RequestHeader {
|
||||
member_id: node_id,
|
||||
role: Role::Datanode as i32,
|
||||
..Default::default()
|
||||
});
|
||||
let self_peer = Some(Peer {
|
||||
id: node_id,
|
||||
addr: addr.clone(),
|
||||
..Default::default()
|
||||
});
|
||||
let epoch = self.region_alive_keeper.epoch();
|
||||
|
||||
@@ -222,7 +217,6 @@ impl HeartbeatTask {
|
||||
match outgoing_message_to_mailbox_message(message) {
|
||||
Ok(message) => {
|
||||
let req = HeartbeatRequest {
|
||||
header: req_header.clone(),
|
||||
peer: self_peer.clone(),
|
||||
mailbox_message: Some(message),
|
||||
..Default::default()
|
||||
@@ -243,7 +237,6 @@ impl HeartbeatTask {
|
||||
let now = Instant::now();
|
||||
let duration_since_epoch = (now - epoch).as_millis() as u64;
|
||||
let req = HeartbeatRequest {
|
||||
header: req_header.clone(),
|
||||
peer: self_peer.clone(),
|
||||
region_stats,
|
||||
duration_since_epoch,
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, RequestHeader, Role};
|
||||
use api::v1::meta::HeartbeatRequest;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
|
||||
};
|
||||
@@ -109,11 +109,6 @@ impl HeartbeatTask {
|
||||
) {
|
||||
let report_interval = self.report_interval;
|
||||
|
||||
let req_header = Some(RequestHeader {
|
||||
role: Role::Frontend as i32,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
tokio::pin!(sleep);
|
||||
@@ -125,7 +120,6 @@ impl HeartbeatTask {
|
||||
match outgoing_message_to_mailbox_message(message) {
|
||||
Ok(message) => {
|
||||
let req = HeartbeatRequest {
|
||||
header: req_header.clone(),
|
||||
mailbox_message: Some(message),
|
||||
..Default::default()
|
||||
};
|
||||
@@ -144,7 +138,6 @@ impl HeartbeatTask {
|
||||
_ = &mut sleep => {
|
||||
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
|
||||
let req = HeartbeatRequest {
|
||||
header: req_header.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
Some(req)
|
||||
|
||||
@@ -242,7 +242,8 @@ impl Instance {
|
||||
let channel_manager = ChannelManager::with_config(channel_config);
|
||||
let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config);
|
||||
|
||||
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Frontend)
|
||||
let cluster_id = 0; // TODO(jeremy): read from config
|
||||
let mut meta_client = MetaClientBuilder::new(cluster_id, 0, Role::Frontend)
|
||||
.enable_router()
|
||||
.enable_store()
|
||||
.enable_heartbeat()
|
||||
|
||||
@@ -37,12 +37,15 @@ impl HeartbeatHandler for KeepLeaseHandler {
|
||||
_acc: &mut HeartbeatAccumulator,
|
||||
) -> Result<()> {
|
||||
let HeartbeatRequest { header, peer, .. } = req;
|
||||
let Some(header) = &header else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some(peer) = &peer else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let key = LeaseKey {
|
||||
cluster_id: header.as_ref().map_or(0, |h| h.cluster_id),
|
||||
cluster_id: header.cluster_id,
|
||||
node_id: peer.id,
|
||||
};
|
||||
let value = LeaseValue {
|
||||
|
||||
@@ -104,7 +104,7 @@ impl TryFrom<HeartbeatRequest> for Stat {
|
||||
timestamp_millis: time_util::current_time_millis(),
|
||||
cluster_id: header.cluster_id,
|
||||
// datanode id
|
||||
id: header.member_id,
|
||||
id: peer.id,
|
||||
// datanode address
|
||||
addr: peer.addr,
|
||||
rcus: region_stats.iter().map(|s| s.rcus).sum(),
|
||||
|
||||
@@ -99,8 +99,8 @@ impl Selector for LoadBasedSelector {
|
||||
|
||||
Ok(tuples
|
||||
.into_iter()
|
||||
.map(|(stat_key, lease_val, _)| Peer {
|
||||
id: stat_key.node_id,
|
||||
.map(|(lease_key, lease_val, _)| Peer {
|
||||
id: lease_key.node_id,
|
||||
addr: lease_val.node_addr,
|
||||
})
|
||||
.collect())
|
||||
|
||||
Reference in New Issue
Block a user