feat: add shutdown mechanism for HeartbeatTask (#424)

This commit is contained in:
Lei, Huang
2022-11-08 19:23:02 +08:00
committed by GitHub
parent dd488e8d21
commit dfd4b10493

View File

@@ -13,24 +13,33 @@ use crate::error::{MetaClientInitSnafu, Result};
pub struct HeartbeatTask {
node_id: u64,
server_addr: String,
started: Arc<AtomicBool>,
running: Arc<AtomicBool>,
meta_client: MetaClient,
interval: u64,
}
impl Drop for HeartbeatTask {
fn drop(&mut self) {
self.running.store(false, Ordering::Release);
}
}
impl HeartbeatTask {
/// Create a new heartbeat task instance.
pub fn new(node_id: u64, server_addr: String, meta_client: MetaClient) -> Self {
Self {
node_id,
server_addr,
started: Arc::new(AtomicBool::new(false)),
running: Arc::new(AtomicBool::new(false)),
meta_client,
interval: 5_000, // default interval is set to 5 secs
}
}
pub async fn create_streams(meta_client: &MetaClient) -> Result<HeartbeatSender> {
pub async fn create_streams(
meta_client: &MetaClient,
running: Arc<AtomicBool>,
) -> Result<HeartbeatSender> {
let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
common_runtime::spawn_bg(async move {
while let Some(res) = match rx.message().await {
@@ -41,6 +50,9 @@ impl HeartbeatTask {
}
} {
Self::handle_response(res).await;
if !running.load(Ordering::Acquire) {
info!("Heartbeat task shutdown");
}
}
info!("Heartbeat handling loop exit.")
});
@@ -53,8 +65,8 @@ impl HeartbeatTask {
/// Start heartbeat task, spawn background task.
pub async fn start(&self) -> Result<()> {
let started = self.started.clone();
if started
let running = self.running.clone();
if running
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
@@ -66,9 +78,9 @@ impl HeartbeatTask {
let server_addr = self.server_addr.clone();
let meta_client = self.meta_client.clone();
let mut tx = Self::create_streams(&meta_client).await?;
let mut tx = Self::create_streams(&meta_client, running.clone()).await?;
common_runtime::spawn_bg(async move {
while started.load(Ordering::Acquire) {
while running.load(Ordering::Acquire) {
let req = HeartbeatRequest {
peer: Some(Peer {
id: node_id,
@@ -78,7 +90,7 @@ impl HeartbeatTask {
};
if let Err(e) = tx.send(req).await {
error!("Failed to send heartbeat to metasrv, error: {:?}", e);
match Self::create_streams(&meta_client).await {
match Self::create_streams(&meta_client, running.clone()).await {
Ok(new_tx) => {
info!("Reconnected to metasrv");
tx = new_tx;