From 29540b55ee2c3fb661b893b40ce6c99e312e28a5 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 13 May 2025 16:04:43 +0800 Subject: [PATCH] feat(meta): add pusher deregister signal to mailbox receiver (#6072) --- src/meta-srv/src/error.rs | 9 ++ src/meta-srv/src/handler.rs | 62 ++++++-- .../close_downgraded_region.rs | 2 +- .../downgrade_leader_region.rs | 2 +- .../region_migration/flush_leader_region.rs | 2 +- .../region_migration/open_candidate_region.rs | 2 +- .../upgrade_candidate_region.rs | 2 +- src/meta-srv/src/service/mailbox.rs | 149 +++++++++++++++--- 8 files changed, 197 insertions(+), 33 deletions(-) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 050274f24c..f9de94fc8c 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,6 +26,7 @@ use tonic::codegen::http; use crate::metasrv::SelectTarget; use crate::pubsub::Message; +use crate::service::mailbox::Channel; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -591,6 +592,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Mailbox channel closed: {channel}"))] + MailboxChannelClosed { + channel: Channel, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Missing request header"))] MissingRequestHeader { #[snafu(implicit)] @@ -894,6 +902,7 @@ impl ErrorExt for Error { | Error::MailboxClosed { .. } | Error::MailboxTimeout { .. } | Error::MailboxReceiver { .. } + | Error::MailboxChannelClosed { .. } | Error::RetryLater { .. } | Error::RetryLaterWithSource { .. } | Error::StartGrpc { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 3cba10d7d6..4d621fa07a 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -50,7 +50,7 @@ use response_header_handler::ResponseHeaderHandler; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use tokio::sync::mpsc::Sender; -use tokio::sync::{oneshot, Notify, RwLock}; +use tokio::sync::{oneshot, watch, Notify, RwLock}; use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu}; use crate::handler::flow_state_handler::FlowStateHandler; @@ -148,20 +148,42 @@ impl PusherId { } } +/// The receiver of the deregister signal. +pub type DeregisterSignalReceiver = watch::Receiver; + /// The pusher of the heartbeat response. pub struct Pusher { sender: Sender>, + // The sender of the deregister signal. + // default is false, means the pusher is not deregistered. + // when the pusher is deregistered, the sender will be notified. + deregister_signal_sender: watch::Sender, + deregister_signal_receiver: DeregisterSignalReceiver, + res_header: ResponseHeader, } +impl Drop for Pusher { + fn drop(&mut self) { + // Ignore the error here. + // if all the receivers have been dropped, means no body cares the deregister signal. + let _ = self.deregister_signal_sender.send(true); + } +} + impl Pusher { pub fn new(sender: Sender>) -> Self { let res_header = ResponseHeader { protocol_version: PROTOCOL_VERSION, ..Default::default() }; - - Self { sender, res_header } + let (deregister_signal_sender, deregister_signal_receiver) = watch::channel(false); + Self { + sender, + deregister_signal_sender, + deregister_signal_receiver, + res_header, + } } #[inline] @@ -185,19 +207,26 @@ impl Pusher { pub struct Pushers(Arc>>); impl Pushers { - async fn push(&self, pusher_id: PusherId, mailbox_message: MailboxMessage) -> Result<()> { + async fn push( + &self, + pusher_id: PusherId, + mailbox_message: MailboxMessage, + ) -> Result { let pusher_id = pusher_id.string_key(); let pushers = self.0.read().await; let pusher = pushers .get(&pusher_id) .context(error::PusherNotFoundSnafu { pusher_id })?; + pusher .push(HeartbeatResponse { header: Some(pusher.header()), mailbox_message: Some(mailbox_message), ..Default::default() }) - .await + .await?; + + Ok(pusher.deregister_signal_receiver.clone()) } async fn broadcast( @@ -447,9 +476,14 @@ impl Mailbox for HeartbeatMailbox { self.timeouts.insert(message_id, deadline); self.timeout_notify.notify_one(); - self.pushers.push(pusher_id, msg).await?; + let deregister_signal_receiver = self.pushers.push(pusher_id, msg).await?; - Ok(MailboxReceiver::new(message_id, rx, *ch)) + Ok(MailboxReceiver::new( + message_id, + rx, + deregister_signal_receiver, + *ch, + )) } async fn send_oneway(&self, ch: &Channel, mut msg: MailboxMessage) -> Result<()> { @@ -809,7 +843,7 @@ mod tests { mailbox.on_recv(id, Ok(resp_msg)).await.unwrap(); - let recv_msg = receiver.await.unwrap().unwrap(); + let recv_msg = receiver.await.unwrap(); assert_eq!(recv_msg.id, id); assert_eq!(recv_msg.timestamp_millis, 456); assert_eq!(recv_msg.subject, "resp-test".to_string()); @@ -818,7 +852,7 @@ mod tests { #[tokio::test] async fn test_mailbox_timeout() { let (_, receiver) = push_msg_via_mailbox().await; - let res = receiver.await.unwrap(); + let res = receiver.await; assert!(res.is_err()); } @@ -1151,4 +1185,14 @@ mod tests { .unwrap_err(); assert_matches!(err, error::Error::HandlerNotFound { .. }); } + + #[tokio::test] + async fn test_pusher_drop() { + let (tx, _rx) = mpsc::channel(1); + let pusher = Pusher::new(tx); + let mut deregister_signal_tx = pusher.deregister_signal_receiver.clone(); + + drop(pusher); + deregister_signal_tx.changed().await.unwrap(); + } } diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs index bbd1e7a0c2..88f1f3a3ec 100644 --- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -113,7 +113,7 @@ impl CloseDowngradedRegion { .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT) .await?; - match receiver.await? { + match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; info!( diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 9ca3800456..887b0ef825 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -164,7 +164,7 @@ impl DowngradeLeaderRegion { let now = Instant::now(); let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?; - match receiver.await? { + match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; info!( diff --git a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs index 88fab129ad..8014c5fdca 100644 --- a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs @@ -107,7 +107,7 @@ impl PreFlushRegion { let result = ctx.mailbox.send(&ch, msg, operation_timeout).await; match result { - Ok(receiver) => match receiver.await? { + Ok(receiver) => match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; info!( diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 92be086978..7228108cb2 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -146,7 +146,7 @@ impl OpenCandidateRegion { .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT) .await?; - match receiver.await? { + match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; info!( diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index a51ee26b37..2866a0d34d 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -133,7 +133,7 @@ impl UpgradeCandidateRegion { let ch = Channel::Datanode(candidate.id); let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?; - match receiver.await? { + match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; let InstructionReply::UpgradeRegion(UpgradeRegionReply { diff --git a/src/meta-srv/src/service/mailbox.rs b/src/meta-srv/src/service/mailbox.rs index 90c96b9381..f339e5c4da 100644 --- a/src/meta-srv/src/service/mailbox.rs +++ b/src/meta-srv/src/service/mailbox.rs @@ -20,11 +20,11 @@ use std::task::{Context, Poll}; use std::time::Duration; use api::v1::meta::{MailboxMessage, Role}; -use futures::Future; +use futures::{Future, FutureExt}; use tokio::sync::oneshot; use crate::error::{self, Result}; -use crate::handler::PusherId; +use crate::handler::{DeregisterSignalReceiver, PusherId}; pub type MailboxRef = Arc; @@ -87,43 +87,109 @@ impl BroadcastChannel { } } -pub struct MailboxReceiver { - message_id: MessageId, - rx: oneshot::Receiver>, - ch: Channel, +/// The mailbox receiver +pub enum MailboxReceiver { + Init { + message_id: MessageId, + ch: Channel, + /// The [`MailboxMessage`] receiver + rx: Option>>, + /// The pusher deregister signal receiver + pusher_deregister_signal_receiver: Option, + }, + Polling { + message_id: MessageId, + ch: Channel, + inner_future: Pin> + Send + 'static>>, + }, } impl MailboxReceiver { pub fn new( message_id: MessageId, rx: oneshot::Receiver>, + pusher_deregister_signal_receiver: DeregisterSignalReceiver, ch: Channel, ) -> Self { - Self { message_id, rx, ch } + Self::Init { + message_id, + rx: Some(rx), + pusher_deregister_signal_receiver: Some(pusher_deregister_signal_receiver), + ch, + } } + /// Get the message id of the mailbox receiver pub fn message_id(&self) -> MessageId { - self.message_id + match self { + MailboxReceiver::Init { message_id, .. } => *message_id, + MailboxReceiver::Polling { message_id, .. } => *message_id, + } } + /// Get the channel of the mailbox receiver pub fn channel(&self) -> Channel { - self.ch + match self { + MailboxReceiver::Init { ch, .. } => *ch, + MailboxReceiver::Polling { ch, .. } => *ch, + } + } + + async fn wait_for_message( + rx: oneshot::Receiver>, + mut pusher_deregister_signal_receiver: DeregisterSignalReceiver, + channel: Channel, + message_id: MessageId, + ) -> Result { + tokio::select! { + res = rx => { + res.map_err(|e| error::MailboxReceiverSnafu { + id: message_id, + err_msg: e.to_string(), + }.build())? + } + _ = pusher_deregister_signal_receiver.changed() => { + Err(error::MailboxChannelClosedSnafu { + channel, + }.build()) + } + } } } impl Future for MailboxReceiver { - type Output = Result>; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.rx).poll(cx).map(|r| { - r.map_err(|e| { - error::MailboxReceiverSnafu { - id: self.message_id, - err_msg: e.to_string(), - } - .build() - }) - }) + match &mut *self { + MailboxReceiver::Init { + message_id, + ch, + rx, + pusher_deregister_signal_receiver, + } => { + let polling = MailboxReceiver::Polling { + message_id: *message_id, + ch: *ch, + inner_future: Self::wait_for_message( + rx.take().expect("rx already taken"), + pusher_deregister_signal_receiver + .take() + .expect("pusher_deregister_signal_receiver already taken"), + *ch, + *message_id, + ) + .boxed(), + }; + + *self = polling; + self.poll(cx) + } + MailboxReceiver::Polling { inner_future, .. } => { + let result = futures::ready!(inner_future.as_mut().poll(cx)); + Poll::Ready(result) + } + } } } @@ -145,6 +211,11 @@ pub trait Mailbox: Send + Sync { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; + + use common_time::util::current_time_millis; + use tokio::sync::watch; + use super::*; #[test] @@ -162,4 +233,44 @@ mod tests { ("2-".to_string().."3-".to_string()) ); } + + #[tokio::test] + async fn test_mailbox_receiver() { + let (tx, rx) = oneshot::channel(); + let (_deregister_signal_tx, deregister_signal_rx) = watch::channel(false); + let receiver = MailboxReceiver::new(1, rx, deregister_signal_rx, Channel::Datanode(1)); + + let timestamp_millis = current_time_millis(); + tokio::spawn(async move { + tx.send(Ok(MailboxMessage { + id: 1, + subject: "test-subject".to_string(), + from: "test-from".to_string(), + to: "test-to".to_string(), + timestamp_millis, + payload: None, + })) + }); + + let result = receiver.await.unwrap(); + assert_eq!(result.id, 1); + assert_eq!(result.subject, "test-subject"); + assert_eq!(result.from, "test-from"); + assert_eq!(result.to, "test-to"); + } + + #[tokio::test] + async fn test_mailbox_receiver_deregister_signal() { + let (_tx, rx) = oneshot::channel(); + let (deregister_signal_tx, deregister_signal_rx) = watch::channel(false); + let receiver = MailboxReceiver::new(1, rx, deregister_signal_rx, Channel::Datanode(1)); + + // Sends the deregister signal + tokio::spawn(async move { + let _ = deregister_signal_tx.send(true); + }); + + let err = receiver.await.unwrap_err(); + assert_matches!(err, error::Error::MailboxChannelClosed { .. }); + } }