walkeeper: Add parsing check for hot standby tag (#597)

This commit is contained in:
Max Sharnoff
2021-09-16 17:04:35 +01:00
committed by GitHub
parent 7dda9f2894
commit bbe4f39790

View File

@@ -25,6 +25,10 @@ use zenith_utils::sock_split::ReadStream;
pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX;
// See: https://www.postgresql.org/docs/13/protocol-replication.html
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
type FullTransactionId = u64;
/// Hot standby feedback received from replica
@@ -81,11 +85,22 @@ impl ReplicationConn {
) -> Result<()> {
// Wait for replica's feedback.
while let Some(msg) = FeMessage::read(&mut stream_in)? {
match msg {
match &msg {
FeMessage::CopyData(m) => {
let feedback = HotStandbyFeedback::des(&m)
.context("failed to deserialize HotStandbyFeedback")?;
subscriber.add_hs_feedback(feedback);
// There's two possible data messages that the client is supposed to send here:
// `HotStandbyFeedback` and `StandbyStatusUpdate`. We only handle hot standby
// feedback.
match m.first().cloned() {
Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
// Note: deserializing is on m[1..] because we skip the tag byte.
let feedback = HotStandbyFeedback::des(&m[1..])
.context("failed to deserialize HotStandbyFeedback")?;
subscriber.add_hs_feedback(feedback);
}
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => (),
_ => warn!("unexpected message {:?}", msg),
}
}
FeMessage::Sync => {}
FeMessage::CopyFail => return Err(anyhow!("Copy failed")),