From 5f272380a2925203333b8fcfaf0877ee56b31374 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 31 Mar 2021 20:00:02 +0300 Subject: [PATCH] Don't panic on XLOG_SWITCH records. --- src/waldecoder.rs | 35 ++++++++++++++++++++++++++++++++++- src/walreceiver.rs | 2 +- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/waldecoder.rs b/src/waldecoder.rs index da57d4401c..7f0975dd41 100644 --- a/src/waldecoder.rs +++ b/src/waldecoder.rs @@ -166,11 +166,21 @@ impl WalStreamDecoder { if self.contlen == 0 { let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()); + let recordbuf = recordbuf.freeze(); + + // XLOG_SWITCH records are special. If we see one, we need to skip + // to the next WAL segment. + if is_xlog_switch_record(&recordbuf) { + trace!("saw xlog switch record at {:X}/{:X}", + (self.lsn >> 32), self.lsn & 0xffffffff); + self.padlen = (WAL_SEGMENT_SIZE - (self.lsn % WAL_SEGMENT_SIZE)) as u32; + } + if self.lsn % 8 != 0 { self.padlen = 8 - (self.lsn % 8) as u32; } - let result = (self.lsn, recordbuf.freeze()); + let result = (self.lsn, recordbuf); return Some(result); } continue; @@ -288,6 +298,29 @@ pub struct DecodedWALRecord { pub blocks: Vec } +// From pg_control.h and rmgrlist.h +const XLOG_SWITCH:u8 = 0x40; +const RM_XLOG_ID:u8 = 0; + +// Is this record an XLOG_SWITCH record? They need some special processing, +// so we need to check for that before the rest of the parsing. +// +// FIXME: refactor this and decode_wal_record() below to avoid the duplication. +fn is_xlog_switch_record(rec: &Bytes) -> bool { + let mut buf = rec.clone(); + + // FIXME: assume little-endian here + let _xl_tot_len = buf.get_u32_le(); + let _xl_xid = buf.get_u32_le(); + let _xl_prev = buf.get_u64_le(); + let xl_info = buf.get_u8(); + let xl_rmid = buf.get_u8(); + buf.advance(2); // 2 bytes of padding + let _xl_crc = buf.get_u32_le(); + + return xl_info == XLOG_SWITCH && xl_rmid == RM_XLOG_ID; +} + // // Routines to decode a WAL record and figure out which blocks are modified // diff --git a/src/walreceiver.rs b/src/walreceiver.rs index dbe67dbd7a..9f2f2c36c4 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -100,7 +100,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { let decoded = crate::waldecoder::decode_wal_record(startlsn, recdata.clone()); // Put the WAL record to the page cache. We make a separate copy of - // it for every block it modifes. (The actual WAL record is kept in + // it for every block it modifies. (The actual WAL record is kept in // a Bytes, which uses a reference counter for the underlying buffer, // so having multiple copies of it doesn't cost that much) for blk in decoded.blocks.iter() {