From dfd4b104935f6f47755a5d03ee93784e3f753cc8 Mon Sep 17 00:00:00 2001 From: "Lei, Huang" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 8 Nov 2022 19:23:02 +0800 Subject: [PATCH] feat: add shutdown mechanism for HeartbeatTask (#424) --- src/datanode/src/heartbeat.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 229742472f..ba9f6e8366 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -13,24 +13,33 @@ use crate::error::{MetaClientInitSnafu, Result}; pub struct HeartbeatTask { node_id: u64, server_addr: String, - started: Arc, + running: Arc, meta_client: MetaClient, interval: u64, } +impl Drop for HeartbeatTask { + fn drop(&mut self) { + self.running.store(false, Ordering::Release); + } +} + impl HeartbeatTask { /// Create a new heartbeat task instance. pub fn new(node_id: u64, server_addr: String, meta_client: MetaClient) -> Self { Self { node_id, server_addr, - started: Arc::new(AtomicBool::new(false)), + running: Arc::new(AtomicBool::new(false)), meta_client, interval: 5_000, // default interval is set to 5 secs } } - pub async fn create_streams(meta_client: &MetaClient) -> Result { + pub async fn create_streams( + meta_client: &MetaClient, + running: Arc, + ) -> Result { let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?; common_runtime::spawn_bg(async move { while let Some(res) = match rx.message().await { @@ -41,6 +50,9 @@ impl HeartbeatTask { } } { Self::handle_response(res).await; + if !running.load(Ordering::Acquire) { + info!("Heartbeat task shutdown"); + } } info!("Heartbeat handling loop exit.") }); @@ -53,8 +65,8 @@ impl HeartbeatTask { /// Start heartbeat task, spawn background task. pub async fn start(&self) -> Result<()> { - let started = self.started.clone(); - if started + let running = self.running.clone(); + if running .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_err() { @@ -66,9 +78,9 @@ impl HeartbeatTask { let server_addr = self.server_addr.clone(); let meta_client = self.meta_client.clone(); - let mut tx = Self::create_streams(&meta_client).await?; + let mut tx = Self::create_streams(&meta_client, running.clone()).await?; common_runtime::spawn_bg(async move { - while started.load(Ordering::Acquire) { + while running.load(Ordering::Acquire) { let req = HeartbeatRequest { peer: Some(Peer { id: node_id, @@ -78,7 +90,7 @@ impl HeartbeatTask { }; if let Err(e) = tx.send(req).await { error!("Failed to send heartbeat to metasrv, error: {:?}", e); - match Self::create_streams(&meta_client).await { + match Self::create_streams(&meta_client, running.clone()).await { Ok(new_tx) => { info!("Reconnected to metasrv"); tx = new_tx;