diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index e686526726..0092a90276 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -218,6 +218,7 @@ impl HeartbeatTask { if let Some(message) = message { Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report) } else { + warn!("Sender has been dropped, exiting the heartbeat loop"); // Receives None that means Sender was dropped, we need to break the current loop break } @@ -259,7 +260,11 @@ impl HeartbeatTask { error!(e; "Error while handling heartbeat response"); } } - Ok(None) => break, + Ok(None) => { + warn!("Heartbeat response stream closed"); + capture_self.start_with_retry(retry_interval).await; + break; + } Err(e) => { error!(e; "Occur error while reading heartbeat response"); capture_self.start_with_retry(retry_interval).await; diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index add975385e..c3768e766a 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{ }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, warn}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; @@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT}; pub struct HeartbeatTask { peer_addr: String, meta_client: Arc, - report_interval: u64, - retry_interval: u64, + report_interval: Duration, + retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, start_time_ms: u64, } @@ -65,8 +65,8 @@ impl HeartbeatTask { addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)) }, meta_client, - report_interval: heartbeat_opts.interval.as_millis() as u64, - retry_interval: heartbeat_opts.retry_interval.as_millis() as u64, + report_interval: heartbeat_opts.interval, + retry_interval: heartbeat_opts.retry_interval, resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, } @@ -110,13 +110,15 @@ impl HeartbeatTask { HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc(); } } - Ok(None) => break, + Ok(None) => { + warn!("Heartbeat response stream closed"); + capture_self.start_with_retry(retry_interval).await; + break; + } Err(e) => { HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc(); error!(e; "Occur error while reading heartbeat response"); - capture_self - .start_with_retry(Duration::from_millis(retry_interval)) - .await; + capture_self.start_with_retry(retry_interval).await; break; } @@ -184,12 +186,13 @@ impl HeartbeatTask { if let Some(message) = message { Self::new_heartbeat_request(&heartbeat_request, Some(message)) } else { + warn!("Sender has been dropped, exiting the heartbeat loop"); // Receives None that means Sender was dropped, we need to break the current loop break } } _ = &mut sleep => { - sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval)); + sleep.as_mut().reset(Instant::now() + report_interval); Self::new_heartbeat_request(&heartbeat_request, None) } }; diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index fd9232ecd7..d1a3a0e636 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -27,10 +27,9 @@ use snafu::OptionExt; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Streaming}; +use tonic::{Request, Response, Status, Streaming}; -use crate::error; -use crate::error::Result; +use crate::error::{self, Result}; use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId}; use crate::metasrv::{Context, Metasrv}; use crate::metrics::METRIC_META_HEARTBEAT_RECV; @@ -109,6 +108,12 @@ impl heartbeat_server::Heartbeat for Metasrv { if is_not_leader { warn!("Quit because it is no longer the leader"); + let _ = tx + .send(Err(Status::aborted(format!( + "The requested metasrv node is not leader, node addr: {}", + ctx.server_addr + )))) + .await; break; } }