add Lsn::block_offset, remaining_in_block, calc_padding

Replace open-coded math with member fns.
This commit is contained in:
Eric Seppanen
2021-04-25 10:57:33 -07:00
parent 1c775bdcac
commit 648755a25e
3 changed files with 36 additions and 18 deletions

View File

@@ -6,8 +6,6 @@ use std::str;
use thiserror::Error;
use zenith_utils::lsn::Lsn;
const XLOG_BLCKSZ: u32 = 8192;
// FIXME: this is configurable in PostgreSQL, 16 MB is the default
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;
@@ -111,10 +109,7 @@ impl WalStreamDecoder {
self.lsn += SizeOfXLogLongPHD as u64;
continue;
} else if self.lsn.0 % (XLOG_BLCKSZ as u64) == 0 {
// FIXME: make this a member of Lsn, but what should it be called?
// parse page header
} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < SizeOfXLogShortPHD {
return Ok(None);
}
@@ -165,8 +160,7 @@ impl WalStreamDecoder {
continue;
} else {
// we're continuing a record, possibly from previous page.
// FIXME: Should any of this math be captured into Lsn or a related type?
let pageleft: u32 = XLOG_BLCKSZ - (self.lsn.0 % (XLOG_BLCKSZ as u64)) as u32;
let pageleft = self.lsn.remaining_in_block() as u32;
// read the rest of the record, or as much as fits on this page.
let n = min(self.contlen, pageleft) as usize;
@@ -188,12 +182,10 @@ impl WalStreamDecoder {
// to the next WAL segment.
if is_xlog_switch_record(&recordbuf) {
trace!("saw xlog switch record at {}", self.lsn);
self.padlen = (WAL_SEGMENT_SIZE - (self.lsn.0 % WAL_SEGMENT_SIZE)) as u32;
}
// FIXME: what does this code do?
if self.lsn.0 % 8 != 0 {
self.padlen = 8 - (self.lsn.0 % 8) as u32;
self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32;
} else {
// Pad to an 8-byte boundary
self.padlen = self.lsn.calc_padding(8u32) as u32;
}
let result = (self.lsn, recordbuf);

View File

@@ -166,10 +166,7 @@ async fn walreceiver_main(
// FIXME: It probably would be better to always start streaming from the beginning
// of the page, or the segment, so that we could check the page/segment headers
// too. Just for the sake of paranoia.
// FIXME: should any of this logic move inside the Lsn type?
if startpoint.0 % 8 != 0 {
startpoint += 8 - (startpoint.0 % 8);
}
startpoint += startpoint.calc_padding(8u32);
}
debug!(
"last_valid_lsn {} starting replication from {} for timeline {}, server is at {}...",

View File

@@ -6,6 +6,9 @@ use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
/// Transaction log block size in bytes
pub const XLOG_BLCKSZ: u32 = 8192;
/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
#[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
pub struct Lsn(pub u64);
@@ -54,6 +57,32 @@ impl Lsn {
pub fn segment_number(self, seg_sz: u64) -> u64 {
self.0 / seg_sz
}
/// Compute the offset into a block
pub fn block_offset(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
self.0 % BLCKSZ
}
/// Compute the bytes remaining in this block
///
/// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
pub fn remaining_in_block(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
BLCKSZ - (self.0 % BLCKSZ)
}
/// Compute the bytes remaining to fill a chunk of some size
///
/// If the LSN is already at the chunk boundary, it will return 0.
pub fn calc_padding<T: Into<u64>>(self, sz: T) -> u64 {
let sz: u64 = sz.into();
// By using wrapping_sub, we can subtract first and then mod second.
// If it's done the other way around, then we would return a full
// chunk size if we're already at the chunk boundary.
// (Regular subtraction will panic on overflow in debug builds.)
(sz.wrapping_sub(self.0)) % sz
}
}
impl From<u64> for Lsn {