From 8f7951c5bd6e9ebf0dfdf7ff7fd78c298871b091 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 25 May 2026 15:40:03 +0800 Subject: [PATCH] fix: close heartbeat streams on metasrv leader stepdown (#8156) * fix: reregister missing heartbeat pusher Signed-off-by: WenyXu * refactor: extract heartbeat session Signed-off-by: WenyXu * test: cover heartbeat session Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/meta-srv/src/handler.rs | 26 +- src/meta-srv/src/metasrv.rs | 5 - src/meta-srv/src/service/heartbeat.rs | 657 +++++++++++++++++++++++--- src/meta-srv/src/service/mailbox.rs | 3 - 4 files changed, 587 insertions(+), 104 deletions(-) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 4b05db4e4c..92916838d8 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -278,15 +278,6 @@ impl Pushers { async fn remove(&self, pusher_id: &str) -> Option { self.0.write().await.remove(pusher_id) } - - pub(crate) async fn clear(&self) -> Vec { - let mut pushers = self.0.write().await; - let keys = pushers.keys().cloned().collect::>(); - if !keys.is_empty() { - pushers.clear(); - } - keys - } } #[derive(Clone)] @@ -322,12 +313,19 @@ impl HeartbeatHandlerGroup { /// Deregisters the heartbeat response [`Pusher`] with the given key from the group. pub async fn deregister_push(&self, pusher_id: PusherId) { - info!("Pusher unregister: {}", pusher_id); if self.pushers.remove(&pusher_id.string_key()).await.is_some() { + info!("Pusher unregister: {}", pusher_id); METRIC_META_HEARTBEAT_CONNECTION_NUM.dec(); } } + #[cfg(test)] + /// Returns whether the group contains the heartbeat response [`Pusher`] with the given key. + pub async fn contains_pusher(&self, pusher_id: &PusherId) -> bool { + let pushers = self.pushers.0.read().await; + pushers.contains_key(&pusher_id.string_key()) + } + /// Returns the [`Pushers`] of the group. pub fn pushers(&self) -> Pushers { self.pushers.clone() @@ -550,14 +548,6 @@ impl Mailbox for HeartbeatMailbox { Ok(()) } - - async fn reset(&self) { - let keys = self.pushers.clear().await; - if !keys.is_empty() { - info!("Reset mailbox, deregister pushers: {:?}", keys); - METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64); - } - } } /// The builder to build the group of heartbeat handlers. diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index df2a3a35b8..f7f5bbf77d 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -512,7 +512,6 @@ pub struct MetaStateHandler { greptimedb_telemetry_task: Arc, leader_cached_kv_backend: Arc, leadership_change_notifier: LeadershipChangeNotifier, - mailbox: MailboxRef, state: StateRef, } @@ -536,9 +535,6 @@ impl MetaStateHandler { pub async fn on_leader_stop(&self) { self.state.write().unwrap().next_state(become_follower()); - // Enforces the mailbox to clear all pushers. - // The remaining heartbeat connections will be closed by the remote peer or keep-alive detection. - self.mailbox.reset().await; self.leadership_change_notifier .notify_on_leader_stop() .await; @@ -667,7 +663,6 @@ impl Metasrv { state: self.state.clone(), leader_cached_kv_backend: leader_cached_kv_backend.clone(), leadership_change_notifier, - mailbox: self.mailbox.clone(), }; let _handle = common_runtime::spawn_global(async move { loop { diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 238ed99df2..066d156047 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -20,10 +20,12 @@ use api::v1::meta::{ AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, Peer, RequestHeader, ResponseHeader, Role, heartbeat_server, }; +use common_meta::election::LeaderChangeMessage; use common_telemetry::{debug, error, info, warn}; use futures::StreamExt; use once_cell::sync::OnceCell; use snafu::{OptionExt, ResultExt}; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; @@ -31,10 +33,282 @@ use tonic::{Request, Response, Status, Streaming}; use crate::error::{self, Result}; use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId}; -use crate::metasrv::{Context, Metasrv}; +use crate::metasrv::{Context, ElectionRef, Metasrv}; use crate::metrics::METRIC_META_HEARTBEAT_RECV; use crate::service::{GrpcResult, GrpcStream}; +type HeartbeatResponseResult = std::result::Result; + +#[async_trait::async_trait] +trait HeartbeatRequestStream { + async fn next(&mut self) -> Option>; +} + +struct TonicHeartbeatRequestStream { + inner: Streaming, +} + +impl TonicHeartbeatRequestStream { + fn new(inner: Streaming) -> Self { + Self { inner } + } +} + +#[async_trait::async_trait] +impl HeartbeatRequestStream for TonicHeartbeatRequestStream { + async fn next(&mut self) -> Option> { + self.inner.next().await + } +} + +enum LeaderStepDownEvent { + StepDown, + Closed, +} + +#[async_trait::async_trait] +trait LeaderStepDown { + async fn wait(&mut self) -> LeaderStepDownEvent; +} + +struct ElectionLeaderStepDown { + rx: tokio::sync::broadcast::Receiver, +} + +impl ElectionLeaderStepDown { + fn new(election: ElectionRef) -> Self { + Self { + rx: election.subscribe_leader_change(), + } + } +} + +#[async_trait::async_trait] +impl LeaderStepDown for ElectionLeaderStepDown { + async fn wait(&mut self) -> LeaderStepDownEvent { + loop { + match self.rx.recv().await { + Ok(LeaderChangeMessage::StepDown(_)) => return LeaderStepDownEvent::StepDown, + Ok(LeaderChangeMessage::Elected(_)) => {} + Err(RecvError::Lagged(skipped)) => { + warn!( + "Leader step-down watcher lagged, skipped {} leader change events", + skipped + ); + } + Err(RecvError::Closed) => return LeaderStepDownEvent::Closed, + } + } + } +} + +struct HeartbeatSession { + requests: R, + tx: Sender, + leader_step_down: Option, + handler_group: Arc, + ctx: Context, + sender_id: PusherId, +} + +impl HeartbeatSession +where + R: HeartbeatRequestStream, + L: LeaderStepDown, +{ + /// Initializes the heartbeat session by receiving the first request, + /// and returns `None` if the stream is closed or an error occurs. + async fn init( + mut requests: R, + tx: Sender, + leader_step_down: Option, + handler_group: Arc, + ctx: Context, + ) -> Option { + let msg = requests.next().await?; + + let req = match msg { + Ok(req) => req, + Err(err) => { + error!("Failed to receive the first heartbeat request, error: {err}"); + let _ = handle_request_stream_error(None, &tx, err).await; + return None; + } + }; + + let Some(header) = req.header.as_ref() else { + error!("Exit on malformed request: MissingRequestHeader"); + let _ = tx + .send(Err(error::MissingRequestHeaderSnafu {}.build().into())) + .await; + return None; + }; + + let sender_id = register_pusher(&handler_group, header, tx.clone()).await; + let mut session = Self { + requests, + tx, + leader_step_down, + handler_group, + ctx, + sender_id, + }; + + if session.handle_request(req, true).await { + Some(session) + } else { + session.cleanup().await; + None + } + } + + /// Runs the heartbeat session until the stream is closed or an error occurs. + async fn run(mut self) { + let mut leader_step_down = self.leader_step_down.take(); + + loop { + tokio::select! { + msg = self.requests.next() => { + let Some(msg) = msg else { + break; + }; + + if !self.handle_message(msg).await { + break; + } + } + event = wait_leader_step_down(leader_step_down.as_mut()), if leader_step_down.is_some() => { + match event { + LeaderStepDownEvent::StepDown => { + self.send_not_leader_error().await; + break; + } + LeaderStepDownEvent::Closed => { + warn!("Leader step-down watcher closed"); + self.send_election_unavailable_error().await; + break; + } + } + } + } + } + + self.cleanup().await; + } + + /// Handles the incoming message, and returns whether to continue the session. + async fn handle_message(&mut self, msg: std::result::Result) -> bool { + match msg { + Ok(req) => self.handle_request(req, false).await, + Err(err) => handle_request_stream_error(Some(self.sender_id), &self.tx, err).await, + } + } + + /// Handles the incoming heartbeat request, and returns whether to continue the session. + async fn handle_request(&mut self, req: HeartbeatRequest, is_handshake: bool) -> bool { + debug!("Receiving heartbeat request: {:?}", req); + + let sender_id = self.sender_id.to_string(); + METRIC_META_HEARTBEAT_RECV + .with_label_values(&[sender_id.as_str()]) + .inc(); + + let res = self + .handler_group + .handle(req, self.ctx.clone().with_handshake(is_handshake)) + .await + .inspect_err( + |e| warn!(e; "Failed to handle heartbeat request, sender: {}", self.sender_id), + ) + .map_err(|e| e.into()); + + let is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader()); + + debug!("Sending heartbeat response: {:?}", res); + + if self.tx.send(res).await.is_err() { + info!( + "ReceiverStream was dropped; shutting down, sender: {}", + self.sender_id + ); + return false; + } + + if is_not_leader { + warn!( + "Quit because it is no longer the leader, sender: {}", + self.sender_id + ); + self.send_not_leader_error().await; + return false; + } + + true + } + + async fn send_not_leader_error(&mut self) { + let _ = self + .tx + .send(Err(Status::aborted(format!( + "The requested metasrv node is not leader, node addr: {}", + self.ctx.server_addr + )))) + .await; + } + + async fn send_election_unavailable_error(&mut self) { + let _ = self + .tx + .send(Err(Status::unavailable(format!( + "The requested metasrv node is shutting down, node addr: {}", + self.ctx.server_addr + )))) + .await; + } + + async fn cleanup(&self) { + info!("Heartbeat stream closed, sender: {}", self.sender_id); + let _ = self.handler_group.deregister_push(self.sender_id).await; + } +} + +async fn wait_leader_step_down(leader_step_down: Option<&mut L>) -> LeaderStepDownEvent +where + L: LeaderStepDown, +{ + match leader_step_down { + Some(leader_step_down) => leader_step_down.wait().await, + None => std::future::pending().await, + } +} + +/// Handles request stream error by logging and forwarding the error to the client if possible. +/// +/// Returns `false` if the stream should be terminated. +async fn handle_request_stream_error( + sender_id: Option, + tx: &Sender, + err: Status, +) -> bool { + if let Some(io_err) = error::match_for_io_error(&err) + && io_err.kind() == ErrorKind::BrokenPipe + { + error!("Client disconnected: broken pipe, sender: {:?}", sender_id); + return false; + } + error!(err; "Error while receiving heartbeat request, sender: {:?}", sender_id); + + if tx.send(Err(err)).await.is_err() { + info!( + "Failed to forward heartbeat request stream error; response stream was dropped, sender: {:?}", + sender_id + ); + return false; + } + + true +} + #[async_trait::async_trait] impl heartbeat_server::Heartbeat for Metasrv { type HeartbeatStream = GrpcStream; @@ -43,88 +317,26 @@ impl heartbeat_server::Heartbeat for Metasrv { &self, req: Request>, ) -> GrpcResult { - let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); let handler_group = self.handler_group().context(error::UnexpectedSnafu { violated: "expected heartbeat handlers", })?; let ctx = self.new_ctx(); + let requests = TonicHeartbeatRequestStream::new(req.into_inner()); let _handle = common_runtime::spawn_global(async move { - let mut pusher_id = None; - while let Some(msg) = in_stream.next().await { - let mut is_not_leader = false; - match msg { - Ok(req) => { - debug!("Receiving heartbeat request: {:?}", req); - - let Some(header) = req.header.as_ref() else { - error!("Exit on malformed request: MissingRequestHeader"); - let _ = tx - .send(Err(error::MissingRequestHeaderSnafu {}.build().into())) - .await; - break; - }; - - let is_handshake = pusher_id.is_none(); - if is_handshake { - pusher_id = - Some(register_pusher(&handler_group, header, tx.clone()).await); - } - if let Some(k) = &pusher_id { - METRIC_META_HEARTBEAT_RECV.with_label_values(&[&k.to_string()]); - } else { - METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]); - } - - let res = handler_group - .handle(req, ctx.clone().with_handshake(is_handshake)) - .await - .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", )) - .map_err(|e| e.into()); - - is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader()); - - debug!("Sending heartbeat response: {:?}", res); - - if tx.send(res).await.is_err() { - info!("ReceiverStream was dropped; shutting down"); - break; - } - } - Err(err) => { - if let Some(io_err) = error::match_for_io_error(&err) - && io_err.kind() == ErrorKind::BrokenPipe - { - // client disconnected in unexpected way - error!("Client disconnected: broken pipe"); - break; - } - error!(err; "Sending heartbeat response error"); - - if tx.send(Err(err)).await.is_err() { - info!("ReceiverStream was dropped; shutting down"); - break; - } - } - } - - if is_not_leader { - warn!("Quit because it is no longer the leader"); - let _ = tx - .send(Err(Status::aborted(format!( - "The requested metasrv node is not leader, node addr: {}", - ctx.server_addr - )))) - .await; - break; - } - } - - info!("Heartbeat stream closed: {pusher_id:?}"); - - if let Some(pusher_id) = pusher_id { - let _ = handler_group.deregister_push(pusher_id).await; + if let Some(session) = HeartbeatSession::init( + requests, + tx, + ctx.election + .as_ref() + .map(|r| ElectionLeaderStepDown::new(r.clone())), + handler_group, + ctx, + ) + .await + { + session.run().await; } }); @@ -192,6 +404,7 @@ async fn register_pusher( #[cfg(test)] mod tests { + use std::collections::VecDeque; use std::sync::Arc; use api::v1::meta::heartbeat_server::Heartbeat; @@ -199,12 +412,300 @@ mod tests { use common_meta::kv_backend::memory::MemoryKvBackend; use common_telemetry::tracing_context::W3cTrace; use servers::grpc::GrpcOptions; - use tonic::IntoRequest; + use tokio::sync::mpsc; + use tonic::{Code, IntoRequest}; - use super::get_node_id; + use super::*; + use crate::handler::test_utils::TestEnv; use crate::metasrv::MetasrvOptions; use crate::metasrv::builder::MetasrvBuilder; + struct MockHeartbeatRequestStream { + messages: VecDeque>, + pending_when_empty: bool, + } + + impl MockHeartbeatRequestStream { + fn new(messages: Vec>) -> Self { + Self { + messages: messages.into(), + pending_when_empty: false, + } + } + + fn pending_after(messages: Vec>) -> Self { + Self { + messages: messages.into(), + pending_when_empty: true, + } + } + } + + #[async_trait::async_trait] + impl HeartbeatRequestStream for MockHeartbeatRequestStream { + async fn next(&mut self) -> Option> { + if let Some(message) = self.messages.pop_front() { + return Some(message); + } + + if self.pending_when_empty { + std::future::pending().await + } else { + None + } + } + } + + struct MockLeaderStepDown { + event: Option, + } + + impl MockLeaderStepDown { + fn new(event: LeaderStepDownEvent) -> Self { + Self { event: Some(event) } + } + } + + #[async_trait::async_trait] + impl LeaderStepDown for MockLeaderStepDown { + async fn wait(&mut self) -> LeaderStepDownEvent { + self.event.take().unwrap() + } + } + + fn heartbeat_request(role: Role, member_id: u64) -> HeartbeatRequest { + HeartbeatRequest { + header: Some(RequestHeader { + role: role.into(), + member_id, + ..Default::default() + }), + ..Default::default() + } + } + + fn sender_id(role: Role, member_id: u64) -> PusherId { + PusherId::new(role, member_id) + } + + fn test_context() -> Context { + TestEnv::new().ctx() + } + + fn test_handler_group() -> Arc { + Arc::new(HeartbeatHandlerGroup::default()) + } + + async fn init_session( + requests: MockHeartbeatRequestStream, + tx: Sender, + leader_step_down: Option, + handler_group: Arc, + ) -> Option> + where + L: LeaderStepDown, + { + HeartbeatSession::init( + requests, + tx, + leader_step_down, + handler_group, + test_context(), + ) + .await + } + + async fn recv_response( + rx: &mut mpsc::Receiver, + ) -> HeartbeatResponseResult { + rx.recv().await.unwrap() + } + + #[tokio::test] + async fn test_heartbeat_session_init_returns_none_on_empty_stream() { + let (tx, _rx) = mpsc::channel(8); + let handler_group = test_handler_group(); + let requests = MockHeartbeatRequestStream::new(vec![]); + + let session = init_session( + requests, + tx, + None::, + handler_group.clone(), + ) + .await; + + assert!(session.is_none()); + assert!( + !handler_group + .contains_pusher(&sender_id(Role::Datanode, 42)) + .await + ); + } + + #[tokio::test] + async fn test_heartbeat_session_init_forwards_first_stream_error() { + let (tx, mut rx) = mpsc::channel(8); + let handler_group = test_handler_group(); + let requests = MockHeartbeatRequestStream::new(vec![Err(Status::internal("boom"))]); + + let session = init_session(requests, tx, None::, handler_group).await; + + assert!(session.is_none()); + let status = recv_response(&mut rx).await.unwrap_err(); + assert_eq!(Code::Internal, status.code()); + assert_eq!("boom", status.message()); + } + + #[tokio::test] + async fn test_heartbeat_session_init_sends_error_on_missing_header() { + let (tx, mut rx) = mpsc::channel(8); + let handler_group = test_handler_group(); + let requests = MockHeartbeatRequestStream::new(vec![Ok(HeartbeatRequest::default())]); + + let session = init_session( + requests, + tx, + None::, + handler_group.clone(), + ) + .await; + + assert!(session.is_none()); + assert!( + !handler_group + .contains_pusher(&sender_id(Role::Datanode, 42)) + .await + ); + + let status = recv_response(&mut rx).await.unwrap_err(); + assert_eq!(Code::InvalidArgument, status.code()); + } + + #[tokio::test] + async fn test_heartbeat_session_init_registers_sender() { + let (tx, mut rx) = mpsc::channel(8); + let handler_group = test_handler_group(); + let sender_id = sender_id(Role::Datanode, 42); + let requests = + MockHeartbeatRequestStream::new(vec![Ok(heartbeat_request(Role::Datanode, 42))]); + + let session = init_session( + requests, + tx, + None::, + handler_group.clone(), + ) + .await; + + assert!(session.is_some()); + assert!(handler_group.contains_pusher(&sender_id).await); + + let response = recv_response(&mut rx).await.unwrap(); + assert!(response.heartbeat_config.is_some()); + } + + #[tokio::test] + async fn test_heartbeat_session_run_deregisters_sender_on_stream_close() { + let (tx, mut rx) = mpsc::channel(8); + let handler_group = test_handler_group(); + let sender_id = sender_id(Role::Datanode, 42); + let requests = + MockHeartbeatRequestStream::new(vec![Ok(heartbeat_request(Role::Datanode, 42))]); + let session = init_session( + requests, + tx, + None::, + handler_group.clone(), + ) + .await + .unwrap(); + let _ = recv_response(&mut rx).await.unwrap(); + + session.run().await; + + assert!(!handler_group.contains_pusher(&sender_id).await); + } + + #[tokio::test] + async fn test_heartbeat_session_run_forwards_stream_error_after_init() { + let (tx, mut rx) = mpsc::channel(8); + let handler_group = test_handler_group(); + let sender_id = sender_id(Role::Datanode, 42); + let requests = MockHeartbeatRequestStream::new(vec![ + Ok(heartbeat_request(Role::Datanode, 42)), + Err(Status::unavailable("temporary")), + ]); + let session = init_session( + requests, + tx, + None::, + handler_group.clone(), + ) + .await + .unwrap(); + let _ = recv_response(&mut rx).await.unwrap(); + + session.run().await; + + let status = recv_response(&mut rx).await.unwrap_err(); + assert_eq!(Code::Unavailable, status.code()); + assert_eq!("temporary", status.message()); + assert!(!handler_group.contains_pusher(&sender_id).await); + } + + #[tokio::test] + async fn test_heartbeat_session_leader_step_down_sends_aborted_and_deregisters() { + let (tx, mut rx) = mpsc::channel(8); + let handler_group = test_handler_group(); + let sender_id = sender_id(Role::Datanode, 42); + let requests = MockHeartbeatRequestStream::pending_after(vec![Ok(heartbeat_request( + Role::Datanode, + 42, + ))]); + let session = init_session( + requests, + tx, + Some(MockLeaderStepDown::new(LeaderStepDownEvent::StepDown)), + handler_group.clone(), + ) + .await + .unwrap(); + let _ = recv_response(&mut rx).await.unwrap(); + + session.run().await; + + let status = recv_response(&mut rx).await.unwrap_err(); + assert_eq!(Code::Aborted, status.code()); + assert!(!handler_group.contains_pusher(&sender_id).await); + } + + #[tokio::test] + async fn test_heartbeat_session_leader_watcher_closed_sends_unavailable_and_deregisters() { + let (tx, mut rx) = mpsc::channel(8); + let handler_group = test_handler_group(); + let sender_id = sender_id(Role::Datanode, 42); + let requests = MockHeartbeatRequestStream::pending_after(vec![Ok(heartbeat_request( + Role::Datanode, + 42, + ))]); + let session = init_session( + requests, + tx, + Some(MockLeaderStepDown::new(LeaderStepDownEvent::Closed)), + handler_group.clone(), + ) + .await + .unwrap(); + let _ = recv_response(&mut rx).await.unwrap(); + + session.run().await; + + let status = recv_response(&mut rx).await.unwrap_err(); + assert_eq!(Code::Unavailable, status.code()); + assert!(!handler_group.contains_pusher(&sender_id).await); + } + #[tokio::test] async fn test_ask_leader() { let kv_backend = Arc::new(MemoryKvBackend::new()); diff --git a/src/meta-srv/src/service/mailbox.rs b/src/meta-srv/src/service/mailbox.rs index 8b37eeaad5..86b631998b 100644 --- a/src/meta-srv/src/service/mailbox.rs +++ b/src/meta-srv/src/service/mailbox.rs @@ -207,9 +207,6 @@ pub trait Mailbox: Send + Sync { async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>; async fn on_recv(&self, id: MessageId, maybe_msg: Result) -> Result<()>; - - /// Reset all pushers of the mailbox. - async fn reset(&self); } #[cfg(test)]