feat(meta): add pusher deregister signal to mailbox receiver (#6072)

This commit is contained in:
Weny Xu
2025-05-13 16:04:43 +08:00
committed by GitHub
parent ca1641d1c4
commit 29540b55ee
8 changed files with 197 additions and 33 deletions

View File

@@ -26,6 +26,7 @@ use tonic::codegen::http;
use crate::metasrv::SelectTarget;
use crate::pubsub::Message;
use crate::service::mailbox::Channel;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -591,6 +592,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Mailbox channel closed: {channel}"))]
MailboxChannelClosed {
channel: Channel,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Missing request header"))]
MissingRequestHeader {
#[snafu(implicit)]
@@ -894,6 +902,7 @@ impl ErrorExt for Error {
| Error::MailboxClosed { .. }
| Error::MailboxTimeout { .. }
| Error::MailboxReceiver { .. }
| Error::MailboxChannelClosed { .. }
| Error::RetryLater { .. }
| Error::RetryLaterWithSource { .. }
| Error::StartGrpc { .. }

View File

@@ -50,7 +50,7 @@ use response_header_handler::ResponseHeaderHandler;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use tokio::sync::{oneshot, Notify, RwLock};
use tokio::sync::{oneshot, watch, Notify, RwLock};
use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
use crate::handler::flow_state_handler::FlowStateHandler;
@@ -148,20 +148,42 @@ impl PusherId {
}
}
/// The receiver of the deregister signal.
pub type DeregisterSignalReceiver = watch::Receiver<bool>;
/// The pusher of the heartbeat response.
pub struct Pusher {
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
// The sender of the deregister signal.
// default is false, means the pusher is not deregistered.
// when the pusher is deregistered, the sender will be notified.
deregister_signal_sender: watch::Sender<bool>,
deregister_signal_receiver: DeregisterSignalReceiver,
res_header: ResponseHeader,
}
impl Drop for Pusher {
fn drop(&mut self) {
// Ignore the error here.
// if all the receivers have been dropped, means no body cares the deregister signal.
let _ = self.deregister_signal_sender.send(true);
}
}
impl Pusher {
pub fn new(sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>) -> Self {
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
..Default::default()
};
Self { sender, res_header }
let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false);
Self {
sender,
deregister_signal_sender,
deregister_signal_receiver,
res_header,
}
}
#[inline]
@@ -185,19 +207,26 @@ impl Pusher {
pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);
impl Pushers {
async fn push(&self, pusher_id: PusherId, mailbox_message: MailboxMessage) -> Result<()> {
async fn push(
&self,
pusher_id: PusherId,
mailbox_message: MailboxMessage,
) -> Result<DeregisterSignalReceiver> {
let pusher_id = pusher_id.string_key();
let pushers = self.0.read().await;
let pusher = pushers
.get(&pusher_id)
.context(error::PusherNotFoundSnafu { pusher_id })?;
pusher
.push(HeartbeatResponse {
header: Some(pusher.header()),
mailbox_message: Some(mailbox_message),
..Default::default()
})
.await
.await?;
Ok(pusher.deregister_signal_receiver.clone())
}
async fn broadcast(
@@ -447,9 +476,14 @@ impl Mailbox for HeartbeatMailbox {
self.timeouts.insert(message_id, deadline);
self.timeout_notify.notify_one();
self.pushers.push(pusher_id, msg).await?;
let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?;
Ok(MailboxReceiver::new(message_id, rx, *ch))
Ok(MailboxReceiver::new(
message_id,
rx,
deregister_signal_receiver,
*ch,
))
}
async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> {
@@ -809,7 +843,7 @@ mod tests {
mailbox.on_recv(id, Ok(resp_msg)).await.unwrap();
let recv_msg = receiver.await.unwrap().unwrap();
let recv_msg = receiver.await.unwrap();
assert_eq!(recv_msg.id, id);
assert_eq!(recv_msg.timestamp_millis, 456);
assert_eq!(recv_msg.subject, "resp-test".to_string());
@@ -818,7 +852,7 @@ mod tests {
#[tokio::test]
async fn test_mailbox_timeout() {
let (_, receiver) = push_msg_via_mailbox().await;
let res = receiver.await.unwrap();
let res = receiver.await;
assert!(res.is_err());
}
@@ -1151,4 +1185,14 @@ mod tests {
.unwrap_err();
assert_matches!(err, error::Error::HandlerNotFound { .. });
}
#[tokio::test]
async fn test_pusher_drop() {
let (tx, _rx) = mpsc::channel(1);
let pusher = Pusher::new(tx);
let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone();
drop(pusher);
deregister_signal_tx.changed().await.unwrap();
}
}

View File

@@ -113,7 +113,7 @@ impl CloseDowngradedRegion {
.send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
.await?;
match receiver.await? {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(

View File

@@ -164,7 +164,7 @@ impl DowngradeLeaderRegion {
let now = Instant::now();
let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
match receiver.await? {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(

View File

@@ -107,7 +107,7 @@ impl PreFlushRegion {
let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
match result {
Ok(receiver) => match receiver.await? {
Ok(receiver) => match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(

View File

@@ -146,7 +146,7 @@ impl OpenCandidateRegion {
.send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
.await?;
match receiver.await? {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(

View File

@@ -133,7 +133,7 @@ impl UpgradeCandidateRegion {
let ch = Channel::Datanode(candidate.id);
let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
match receiver.await? {
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
let InstructionReply::UpgradeRegion(UpgradeRegionReply {

View File

@@ -20,11 +20,11 @@ use std::task::{Context, Poll};
use std::time::Duration;
use api::v1::meta::{MailboxMessage, Role};
use futures::Future;
use futures::{Future, FutureExt};
use tokio::sync::oneshot;
use crate::error::{self, Result};
use crate::handler::PusherId;
use crate::handler::{DeregisterSignalReceiver, PusherId};
pub type MailboxRef = Arc<dyn Mailbox>;
@@ -87,43 +87,109 @@ impl BroadcastChannel {
}
}
pub struct MailboxReceiver {
message_id: MessageId,
rx: oneshot::Receiver<Result<MailboxMessage>>,
ch: Channel,
/// The mailbox receiver
pub enum MailboxReceiver {
Init {
message_id: MessageId,
ch: Channel,
/// The [`MailboxMessage`] receiver
rx: Option<oneshot::Receiver<Result<MailboxMessage>>>,
/// The pusher deregister signal receiver
pusher_deregister_signal_receiver: Option<DeregisterSignalReceiver>,
},
Polling {
message_id: MessageId,
ch: Channel,
inner_future: Pin<Box<dyn Future<Output = Result<MailboxMessage>> + Send + 'static>>,
},
}
impl MailboxReceiver {
pub fn new(
message_id: MessageId,
rx: oneshot::Receiver<Result<MailboxMessage>>,
pusher_deregister_signal_receiver: DeregisterSignalReceiver,
ch: Channel,
) -> Self {
Self { message_id, rx, ch }
Self::Init {
message_id,
rx: Some(rx),
pusher_deregister_signal_receiver: Some(pusher_deregister_signal_receiver),
ch,
}
}
/// Get the message id of the mailbox receiver
pub fn message_id(&self) -> MessageId {
self.message_id
match self {
MailboxReceiver::Init { message_id, .. } => *message_id,
MailboxReceiver::Polling { message_id, .. } => *message_id,
}
}
/// Get the channel of the mailbox receiver
pub fn channel(&self) -> Channel {
self.ch
match self {
MailboxReceiver::Init { ch, .. } => *ch,
MailboxReceiver::Polling { ch, .. } => *ch,
}
}
async fn wait_for_message(
rx: oneshot::Receiver<Result<MailboxMessage>>,
mut pusher_deregister_signal_receiver: DeregisterSignalReceiver,
channel: Channel,
message_id: MessageId,
) -> Result<MailboxMessage> {
tokio::select! {
res = rx => {
res.map_err(|e| error::MailboxReceiverSnafu {
id: message_id,
err_msg: e.to_string(),
}.build())?
}
_ = pusher_deregister_signal_receiver.changed() => {
Err(error::MailboxChannelClosedSnafu {
channel,
}.build())
}
}
}
}
impl Future for MailboxReceiver {
type Output = Result<Result<MailboxMessage>>;
type Output = Result<MailboxMessage>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.rx).poll(cx).map(|r| {
r.map_err(|e| {
error::MailboxReceiverSnafu {
id: self.message_id,
err_msg: e.to_string(),
}
.build()
})
})
match &mut *self {
MailboxReceiver::Init {
message_id,
ch,
rx,
pusher_deregister_signal_receiver,
} => {
let polling = MailboxReceiver::Polling {
message_id: *message_id,
ch: *ch,
inner_future: Self::wait_for_message(
rx.take().expect("rx already taken"),
pusher_deregister_signal_receiver
.take()
.expect("pusher_deregister_signal_receiver already taken"),
*ch,
*message_id,
)
.boxed(),
};
*self = polling;
self.poll(cx)
}
MailboxReceiver::Polling { inner_future, .. } => {
let result = futures::ready!(inner_future.as_mut().poll(cx));
Poll::Ready(result)
}
}
}
}
@@ -145,6 +211,11 @@ pub trait Mailbox: Send + Sync {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_time::util::current_time_millis;
use tokio::sync::watch;
use super::*;
#[test]
@@ -162,4 +233,44 @@ mod tests {
("2-".to_string().."3-".to_string())
);
}
#[tokio::test]
async fn test_mailbox_receiver() {
let (tx, rx) = oneshot::channel();
let (_deregister_signal_tx, deregister_signal_rx) = watch::channel(false);
let receiver = MailboxReceiver::new(1, rx, deregister_signal_rx, Channel::Datanode(1));
let timestamp_millis = current_time_millis();
tokio::spawn(async move {
tx.send(Ok(MailboxMessage {
id: 1,
subject: "test-subject".to_string(),
from: "test-from".to_string(),
to: "test-to".to_string(),
timestamp_millis,
payload: None,
}))
});
let result = receiver.await.unwrap();
assert_eq!(result.id, 1);
assert_eq!(result.subject, "test-subject");
assert_eq!(result.from, "test-from");
assert_eq!(result.to, "test-to");
}
#[tokio::test]
async fn test_mailbox_receiver_deregister_signal() {
let (_tx, rx) = oneshot::channel();
let (deregister_signal_tx, deregister_signal_rx) = watch::channel(false);
let receiver = MailboxReceiver::new(1, rx, deregister_signal_rx, Channel::Datanode(1));
// Sends the deregister signal
tokio::spawn(async move {
let _ = deregister_signal_tx.send(true);
});
let err = receiver.await.unwrap_err();
assert_matches!(err, error::Error::MailboxChannelClosed { .. });
}
}