From bbe4f3979034dd9bd6b526cc303684e8ee223367 Mon Sep 17 00:00:00 2001 From: Max Sharnoff Date: Thu, 16 Sep 2021 17:04:35 +0100 Subject: [PATCH] walkeeper: Add parsing check for hot standby tag (#597) --- walkeeper/src/replication.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/walkeeper/src/replication.rs b/walkeeper/src/replication.rs index 46bcebbd45..4ec5f16ba9 100644 --- a/walkeeper/src/replication.rs +++ b/walkeeper/src/replication.rs @@ -25,6 +25,10 @@ use zenith_utils::sock_split::ReadStream; pub const END_REPLICATION_MARKER: Lsn = Lsn::MAX; +// See: https://www.postgresql.org/docs/13/protocol-replication.html +const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; +const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; + type FullTransactionId = u64; /// Hot standby feedback received from replica @@ -81,11 +85,22 @@ impl ReplicationConn { ) -> Result<()> { // Wait for replica's feedback. while let Some(msg) = FeMessage::read(&mut stream_in)? { - match msg { + match &msg { FeMessage::CopyData(m) => { - let feedback = HotStandbyFeedback::des(&m) - .context("failed to deserialize HotStandbyFeedback")?; - subscriber.add_hs_feedback(feedback); + // There's two possible data messages that the client is supposed to send here: + // `HotStandbyFeedback` and `StandbyStatusUpdate`. We only handle hot standby + // feedback. + + match m.first().cloned() { + Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => { + // Note: deserializing is on m[1..] because we skip the tag byte. + let feedback = HotStandbyFeedback::des(&m[1..]) + .context("failed to deserialize HotStandbyFeedback")?; + subscriber.add_hs_feedback(feedback); + } + Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => (), + _ => warn!("unexpected message {:?}", msg), + } } FeMessage::Sync => {} FeMessage::CopyFail => return Err(anyhow!("Copy failed")),