From dfe68a7e0bde8afdae20186788cfdb2320638ca7 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 9 Oct 2023 11:49:48 +0900 Subject: [PATCH] refactor: check push result out of loop (#2511) * refactor: check push result out of loop * chore: apply suggestions from CR --- src/meta-srv/src/handler.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 513bc8e1c6..d2c2f0dfdd 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -26,6 +26,7 @@ use common_meta::instruction::{Instruction, InstructionReply}; use common_meta::sequence::Sequence; use common_telemetry::{debug, info, timer, warn}; use dashmap::DashMap; +use futures::future::join_all; use metrics::{decrement_gauge, increment_gauge}; use snafu::{OptionExt, ResultExt}; use tokio::sync::mpsc::Sender; @@ -149,18 +150,25 @@ impl Pushers { .range(range) .map(|(_, value)| value) .collect::>(); + let mut results = Vec::with_capacity(pushers.len()); + for pusher in pushers { let mut mailbox_message = mailbox_message.clone(); mailbox_message.id = 0; // one-way message - pusher - .push(HeartbeatResponse { - header: Some(pusher.header()), - mailbox_message: Some(mailbox_message), - ..Default::default() - }) - .await?; + + results.push(pusher.push(HeartbeatResponse { + header: Some(pusher.header()), + mailbox_message: Some(mailbox_message), + ..Default::default() + })) } + // Checks the error out of the loop. + let _ = join_all(results) + .await + .into_iter() + .collect::>>()?; + Ok(()) }