From 8b9b5a0d3a7e2ca1f68692450d58a765ca0d7e6c Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Mon, 29 May 2023 15:11:50 +0800 Subject: [PATCH] feat: broadcast with mailbox (#1661) feat: broad with mailbox --- src/meta-srv/src/handler.rs | 34 +++++++++++++++++++++++++++- src/meta-srv/src/service/mailbox.rs | 35 +++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index ee55d51c71..39d417d21d 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; +use std::ops::Range; use std::sync::Arc; use std::time::Duration; @@ -41,7 +42,10 @@ use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstruction use crate::metasrv::Context; use crate::metrics::METRIC_META_HEARTBEAT_CONNECTION_NUM; use crate::sequence::Sequence; -use crate::service::mailbox::{Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId}; +use crate::service::mailbox::{ + BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId, +}; + mod check_leader_handler; mod collect_stats_handler; pub(crate) mod failure_handler; @@ -130,6 +134,30 @@ impl Pushers { .await } + async fn broadcast( + &self, + range: Range, + mailbox_message: &MailboxMessage, + ) -> Result<()> { + let pushers = self.0.read().await; + let pushers = pushers + .range(range) + .map(|(_, value)| value) + .collect::>(); + for pusher in pushers { + let mut mailbox_message = mailbox_message.clone(); + mailbox_message.id = 0; // one-way message + pusher + .push(HeartbeatResponse { + header: Some(pusher.header()), + mailbox_message: Some(mailbox_message), + }) + .await?; + } + + Ok(()) + } + pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option { self.0.write().await.insert(pusher_id, pusher) } @@ -321,6 +349,10 @@ impl Mailbox for HeartbeatMailbox { Ok(MailboxReceiver::new(message_id, rx)) } + async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> { + self.pushers.broadcast(ch.pusher_range(), msg).await + } + async fn on_recv(&self, id: MessageId, maybe_msg: Result) -> Result<()> { debug!("Received mailbox message {maybe_msg:?}"); diff --git a/src/meta-srv/src/service/mailbox.rs b/src/meta-srv/src/service/mailbox.rs index 538da3e552..dd0b266d1c 100644 --- a/src/meta-srv/src/service/mailbox.rs +++ b/src/meta-srv/src/service/mailbox.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -41,6 +42,26 @@ impl Channel { } } +pub enum BroadcastChannel { + Datanode, + Frontend, +} + +impl BroadcastChannel { + pub(crate) fn pusher_range(&self) -> Range { + match self { + BroadcastChannel::Datanode => Range { + start: format!("{}-", Role::Datanode as i32), + end: format!("{}-", Role::Frontend as i32), + }, + BroadcastChannel::Frontend => Range { + start: format!("{}-", Role::Frontend as i32), + end: format!("{}-", Role::Frontend as i32 + 1), + }, + } + } +} + pub struct MailboxReceiver { message_id: MessageId, rx: oneshot::Receiver>, @@ -81,6 +102,8 @@ pub trait Mailbox: Send + Sync { timeout: Duration, ) -> Result; + async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>; + async fn on_recv(&self, id: MessageId, maybe_msg: Result) -> Result<()>; } @@ -93,4 +116,16 @@ mod tests { assert_eq!(Channel::Datanode(42).pusher_id(), "0-42"); assert_eq!(Channel::Frontend(42).pusher_id(), "1-42"); } + + #[test] + fn test_channel_pusher_range() { + assert_eq!( + BroadcastChannel::Datanode.pusher_range(), + ("0-".to_string().."1-".to_string()) + ); + assert_eq!( + BroadcastChannel::Frontend.pusher_range(), + ("1-".to_string().."2-".to_string()) + ); + } }