fix: handle heartbeat shutdown gracefully (#2886)

* fix: handle heartbeat shutdown gracefully

Signed-off-by: tison <wander4096@gmail.com>

* improve logging

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
tison
2023-12-08 11:59:05 +08:00
committed by GitHub
parent 58183fe72f
commit bfb4794cfa

View File

@@ -55,10 +55,12 @@ impl heartbeat_server::Heartbeat for MetaSrv {
Some(header) => header,
None => {
let err = error::MissingRequestHeaderSnafu {}.build();
tx.send(Err(err.into())).await.expect("working rx");
error!("Exit on malformed request: MissingRequestHeader");
let _ = tx.send(Err(err.into())).await;
break;
}
};
debug!("Receiving heartbeat request: {:?}", req);
if pusher_key.is_none() {
@@ -78,7 +80,10 @@ impl heartbeat_server::Heartbeat for MetaSrv {
is_not_leader = res.as_ref().map_or(false, |r| r.is_not_leader());
debug!("Sending heartbeat response: {:?}", res);
tx.send(res).await.expect("working rx");
if tx.send(res).await.is_err() {
info!("ReceiverStream was dropped; shutting down");
break;
}
}
Err(err) => {
if let Some(io_err) = error::match_for_io_error(&err) {
@@ -89,9 +94,9 @@ impl heartbeat_server::Heartbeat for MetaSrv {
}
}
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was dropped
if tx.send(Err(err)).await.is_err() {
info!("ReceiverStream was dropped; shutting down");
break;
}
}
}
@@ -101,10 +106,12 @@ impl heartbeat_server::Heartbeat for MetaSrv {
break;
}
}
info!(
"Heartbeat stream broken: {:?}",
"Heartbeat stream closed: {:?}",
pusher_key.as_ref().unwrap_or(&"unknown".to_string())
);
if let Some(key) = pusher_key {
let _ = handler_group.unregister(&key).await;
}