From ccf42a9d973929d9371f253301ff86850dde380f Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 27 Feb 2025 11:58:21 +0800 Subject: [PATCH] fix: flow heartbeat retry (#5600) * fix: flow heartbeat retry * fix?: not sure if fixed * chore: per review --- src/flow/src/heartbeat.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 9cef02eac1..45786a4d80 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -103,6 +103,11 @@ impl HeartbeatTask { warn!("Heartbeat task started multiple times"); return Ok(()); } + + self.create_streams().await + } + + async fn create_streams(&self) -> Result<(), Error> { info!("Start to establish the heartbeat connection to metasrv."); let (req_sender, resp_stream) = self .meta_client @@ -231,6 +236,8 @@ impl HeartbeatTask { // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong latest_report = query_flow_state(&query_stat_size, report_interval / 2).await; } + + info!("flownode heartbeat task stopped."); }); } @@ -274,7 +281,7 @@ impl HeartbeatTask { info!("Try to re-establish the heartbeat connection to metasrv."); - if self.start().await.is_ok() { + if self.create_streams().await.is_ok() { break; } }