diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 29cd8109de..33e16ca336 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -6,8 +6,6 @@ use std::str; use thiserror::Error; use zenith_utils::lsn::Lsn; -const XLOG_BLCKSZ: u32 = 8192; - // FIXME: this is configurable in PostgreSQL, 16 MB is the default const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; @@ -111,10 +109,7 @@ impl WalStreamDecoder { self.lsn += SizeOfXLogLongPHD as u64; continue; - } else if self.lsn.0 % (XLOG_BLCKSZ as u64) == 0 { - // FIXME: make this a member of Lsn, but what should it be called? - // parse page header - + } else if self.lsn.block_offset() == 0 { if self.inputbuf.remaining() < SizeOfXLogShortPHD { return Ok(None); } @@ -165,8 +160,7 @@ impl WalStreamDecoder { continue; } else { // we're continuing a record, possibly from previous page. - // FIXME: Should any of this math be captured into Lsn or a related type? - let pageleft: u32 = XLOG_BLCKSZ - (self.lsn.0 % (XLOG_BLCKSZ as u64)) as u32; + let pageleft = self.lsn.remaining_in_block() as u32; // read the rest of the record, or as much as fits on this page. let n = min(self.contlen, pageleft) as usize; @@ -188,12 +182,10 @@ impl WalStreamDecoder { // to the next WAL segment. if is_xlog_switch_record(&recordbuf) { trace!("saw xlog switch record at {}", self.lsn); - self.padlen = (WAL_SEGMENT_SIZE - (self.lsn.0 % WAL_SEGMENT_SIZE)) as u32; - } - - // FIXME: what does this code do? - if self.lsn.0 % 8 != 0 { - self.padlen = 8 - (self.lsn.0 % 8) as u32; + self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32; + } else { + // Pad to an 8-byte boundary + self.padlen = self.lsn.calc_padding(8u32) as u32; } let result = (self.lsn, recordbuf); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 18d7aa842e..5ef5f1cf02 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -166,10 +166,7 @@ async fn walreceiver_main( // FIXME: It probably would be better to always start streaming from the beginning // of the page, or the segment, so that we could check the page/segment headers // too. Just for the sake of paranoia. - // FIXME: should any of this logic move inside the Lsn type? - if startpoint.0 % 8 != 0 { - startpoint += 8 - (startpoint.0 % 8); - } + startpoint += startpoint.calc_padding(8u32); } debug!( "last_valid_lsn {} starting replication from {} for timeline {}, server is at {}...", diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index e16fb678a3..e4fb3dc018 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -6,6 +6,9 @@ use std::path::Path; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; +/// Transaction log block size in bytes +pub const XLOG_BLCKSZ: u32 = 8192; + /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr #[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] pub struct Lsn(pub u64); @@ -54,6 +57,32 @@ impl Lsn { pub fn segment_number(self, seg_sz: u64) -> u64 { self.0 / seg_sz } + + /// Compute the offset into a block + pub fn block_offset(self) -> u64 { + const BLCKSZ: u64 = XLOG_BLCKSZ as u64; + self.0 % BLCKSZ + } + + /// Compute the bytes remaining in this block + /// + /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`. + pub fn remaining_in_block(self) -> u64 { + const BLCKSZ: u64 = XLOG_BLCKSZ as u64; + BLCKSZ - (self.0 % BLCKSZ) + } + + /// Compute the bytes remaining to fill a chunk of some size + /// + /// If the LSN is already at the chunk boundary, it will return 0. + pub fn calc_padding>(self, sz: T) -> u64 { + let sz: u64 = sz.into(); + // By using wrapping_sub, we can subtract first and then mod second. + // If it's done the other way around, then we would return a full + // chunk size if we're already at the chunk boundary. + // (Regular subtraction will panic on overflow in debug builds.) + (sz.wrapping_sub(self.0)) % sz + } } impl From for Lsn {