feat: broadcast with mailbox (#1661)

feat: broad with mailbox
This commit is contained in:
JeremyHi
2023-05-29 15:11:50 +08:00
committed by GitHub
parent 78fab08b51
commit 8b9b5a0d3a
2 changed files with 68 additions and 1 deletions

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
@@ -41,7 +42,10 @@ use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstruction
use crate::metasrv::Context;
use crate::metrics::METRIC_META_HEARTBEAT_CONNECTION_NUM;
use crate::sequence::Sequence;
use crate::service::mailbox::{Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId};
use crate::service::mailbox::{
BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
};
mod check_leader_handler;
mod collect_stats_handler;
pub(crate) mod failure_handler;
@@ -130,6 +134,30 @@ impl Pushers {
.await
}
async fn broadcast(
&self,
range: Range<String>,
mailbox_message: &MailboxMessage,
) -> Result<()> {
let pushers = self.0.read().await;
let pushers = pushers
.range(range)
.map(|(_, value)| value)
.collect::<Vec<_>>();
for pusher in pushers {
let mut mailbox_message = mailbox_message.clone();
mailbox_message.id = 0; // one-way message
pusher
.push(HeartbeatResponse {
header: Some(pusher.header()),
mailbox_message: Some(mailbox_message),
})
.await?;
}
Ok(())
}
pub(crate) async fn insert(&self, pusher_id: String, pusher: Pusher) -> Option<Pusher> {
self.0.write().await.insert(pusher_id, pusher)
}
@@ -321,6 +349,10 @@ impl Mailbox for HeartbeatMailbox {
Ok(MailboxReceiver::new(message_id, rx))
}
async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()> {
self.pushers.broadcast(ch.pusher_range(), msg).await
}
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
debug!("Received mailbox message {maybe_msg:?}");

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -41,6 +42,26 @@ impl Channel {
}
}
pub enum BroadcastChannel {
Datanode,
Frontend,
}
impl BroadcastChannel {
pub(crate) fn pusher_range(&self) -> Range<String> {
match self {
BroadcastChannel::Datanode => Range {
start: format!("{}-", Role::Datanode as i32),
end: format!("{}-", Role::Frontend as i32),
},
BroadcastChannel::Frontend => Range {
start: format!("{}-", Role::Frontend as i32),
end: format!("{}-", Role::Frontend as i32 + 1),
},
}
}
}
pub struct MailboxReceiver {
message_id: MessageId,
rx: oneshot::Receiver<Result<MailboxMessage>>,
@@ -81,6 +102,8 @@ pub trait Mailbox: Send + Sync {
timeout: Duration,
) -> Result<MailboxReceiver>;
async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
}
@@ -93,4 +116,16 @@ mod tests {
assert_eq!(Channel::Datanode(42).pusher_id(), "0-42");
assert_eq!(Channel::Frontend(42).pusher_id(), "1-42");
}
#[test]
fn test_channel_pusher_range() {
assert_eq!(
BroadcastChannel::Datanode.pusher_range(),
("0-".to_string().."1-".to_string())
);
assert_eq!(
BroadcastChannel::Frontend.pusher_range(),
("1-".to_string().."2-".to_string())
);
}
}