feat: heartbeat request with header (#2412)

* feat: heartbeat request with header

* chore: frontend send heartbeat with a longer interval
This commit is contained in:
JeremyHi
2023-09-16 17:56:41 +08:00
committed by GitHub
parent 0a692aafb0
commit 342a6d071f
7 changed files with 58 additions and 20 deletions

View File

@@ -347,7 +347,7 @@ impl Default for DatanodeOptions {
storage: StorageConfig::default(),
region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
heartbeat: HeartbeatOptions::datanode_default(),
enable_telemetry: true,
}
}

View File

@@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role};
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, RequestHeader, Role};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
@@ -169,7 +169,17 @@ impl HeartbeatTask {
)
.await?;
let req_header = Some(RequestHeader {
member_id: node_id,
role: Role::Datanode as i32,
..Default::default()
});
let self_peer = Some(Peer {
addr: addr.clone(),
..Default::default()
});
let epoch = self.region_alive_keeper.epoch();
common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
@@ -184,9 +194,9 @@ impl HeartbeatTask {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let peer = Some(Peer { id: node_id, addr: addr.clone() });
let req = HeartbeatRequest {
peer,
header: req_header.clone(),
peer: self_peer.clone(),
mailbox_message: Some(message),
..Default::default()
};
@@ -202,12 +212,12 @@ impl HeartbeatTask {
}
}
_ = &mut sleep => {
let peer = Some(Peer { id: node_id, addr: addr.clone() });
let region_stats = Self::load_region_stats(&region_server_clone).await;
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let req = HeartbeatRequest {
peer,
header: req_header.clone(),
peer: self_peer.clone(),
region_stats,
duration_since_epoch,
node_epoch,

View File

@@ -48,7 +48,7 @@ impl Default for FrontendOptions {
Self {
mode: Mode::Standalone,
node_id: None,
heartbeat: HeartbeatOptions::default(),
heartbeat: HeartbeatOptions::frontend_default(),
http_options: HttpOptions::default(),
grpc_options: GrpcOptions::default(),
mysql_options: MysqlOptions::default(),

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::{HeartbeatRequest, RequestHeader, Role};
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
@@ -44,13 +44,13 @@ pub struct HeartbeatTask {
impl HeartbeatTask {
pub fn new(
meta_client: Arc<MetaClient>,
heartbeat: HeartbeatOptions,
heartbeat_opts: HeartbeatOptions,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
meta_client,
report_interval: heartbeat.interval_millis,
retry_interval: heartbeat.retry_interval_millis,
report_interval: heartbeat_opts.interval_millis,
retry_interval: heartbeat_opts.retry_interval_millis,
resp_handler_executor,
}
}
@@ -109,7 +109,12 @@ impl HeartbeatTask {
) {
let report_interval = self.report_interval;
let _handle = common_runtime::spawn_bg(async move {
let req_header = Some(RequestHeader {
role: Role::Frontend as i32,
..Default::default()
});
common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
@@ -120,6 +125,7 @@ impl HeartbeatTask {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
header: req_header.clone(),
mailbox_message: Some(message),
..Default::default()
};
@@ -137,7 +143,11 @@ impl HeartbeatTask {
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
Some(HeartbeatRequest::default())
let req = HeartbeatRequest {
header: req_header.clone(),
..Default::default()
};
Some(req)
}
};

View File

@@ -103,7 +103,9 @@ impl TryFrom<HeartbeatRequest> for Stat {
Ok(Self {
timestamp_millis: time_util::current_time_millis(),
cluster_id: header.cluster_id,
id: peer.id,
// datanode id
id: header.member_id,
// datanode address
addr: peer.addr,
rcus: region_stats.iter().map(|s| s.rcus).sum(),
wcus: region_stats.iter().map(|s| s.wcus).sum(),

View File

@@ -22,8 +22,8 @@ pub struct ResponseHeaderHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for ResponseHeaderHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
fn is_acceptable(&self, _role: Role) -> bool {
true
}
async fn handle(

View File

@@ -14,6 +14,8 @@
use serde::{Deserialize, Serialize};
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 5000;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HeartbeatOptions {
@@ -21,11 +23,25 @@ pub struct HeartbeatOptions {
pub retry_interval_millis: u64,
}
impl Default for HeartbeatOptions {
fn default() -> Self {
impl HeartbeatOptions {
pub fn datanode_default() -> Self {
Default::default()
}
pub fn frontend_default() -> Self {
Self {
interval_millis: 5000,
retry_interval_millis: 5000,
// Frontend can send heartbeat with a longer interval.
interval_millis: HEARTBEAT_INTERVAL_MILLIS * 10,
retry_interval_millis: HEARTBEAT_INTERVAL_MILLIS,
}
}
}
impl Default for HeartbeatOptions {
fn default() -> Self {
Self {
interval_millis: HEARTBEAT_INTERVAL_MILLIS,
retry_interval_millis: HEARTBEAT_INTERVAL_MILLIS,
}
}
}