//
// This file contains common utilities for dealing with PostgreSQL WAL files and
// LSNs.
//
// Many of these functions have been copied from PostgreSQL and rewritten in
// Rust. That's why they don't follow the usual Rust naming conventions: they
// are named the same as the corresponding PostgreSQL functions instead.
//

use crate::pg_constants;
use crate::CheckPoint;
use crate::FullTransactionId;
use crate::XLogLongPageHeaderData;
use crate::XLogPageHeaderData;
use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC;

use anyhow::{bail, ensure};
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{Buf, Bytes};
use crc32c::*;
use log::*;
use std::cmp::max;
use std::cmp::min;
use std::fs::{self, File};
use std::io::prelude::*;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use utils::bin_ser::DeserializeError;
use utils::bin_ser::SerializeError;
use utils::const_assert;
use utils::lsn::Lsn;

pub const XLOG_FNAME_LEN: usize = 24;
pub const XLOG_BLCKSZ: usize = 8192;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;

pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
#[allow(clippy::identity_op)]
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;

// The PG timeline is always 1; changing it doesn't have useful meaning in Zenith.
pub const PG_TLI: u32 = 1;

pub type XLogRecPtr = u64;
pub type TimeLineID = u32;
pub type TimestampTz = i64;
pub type XLogSegNo = u64;

/// Interval of checkpointing the metadata file. We must store the metadata file to
/// enforce the predicate that checkpoint.nextXid is larger than any XID in WAL.
/// But flushing the checkpoint file for each transaction seems to be too expensive,
/// so XID_CHECKPOINT_INTERVAL is used to forward-align nextXid, performing a
/// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
/// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
/// in order to let the CLOG_TRUNCATE mechanism correctly extend the CLOG.
const XID_CHECKPOINT_INTERVAL: u32 = 1024;

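// The forward alignment mentioned above uses the usual power-of-two rounding
// trick, (x + N - 1) & !(N - 1); see CheckPoint::update_next_xid below. A small
// illustrative test (a sketch added here, not part of the original code):
#[cfg(test)]
mod xid_alignment_example {
    use super::XID_CHECKPOINT_INTERVAL;

    #[test]
    fn forward_alignment_rounds_up_to_interval() {
        let align =
            |x: u32| x.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
        assert_eq!(align(1), 1024); // everything in 1..=1024 maps to 1024
        assert_eq!(align(1024), 1024);
        assert_eq!(align(1025), 2048);
    }
}
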
#[allow(non_snake_case)]
pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
    (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
}

#[allow(non_snake_case)]
pub fn XLogSegNoOffsetToRecPtr(
    segno: XLogSegNo,
    offset: u32,
    wal_segsz_bytes: usize,
) -> XLogRecPtr {
    segno * (wal_segsz_bytes as u64) + (offset as u64)
}

#[allow(non_snake_case)]
pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String {
    format!(
        "{:>08X}{:>08X}{:>08X}",
        tli,
        logSegNo / XLogSegmentsPerXLogId(wal_segsz_bytes),
        logSegNo % XLogSegmentsPerXLogId(wal_segsz_bytes)
    )
}

#[allow(non_snake_case)]
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
    let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
    let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
    let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
    (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
}

#[allow(non_snake_case)]
pub fn IsXLogFileName(fname: &str) -> bool {
    fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
}

#[allow(non_snake_case)]
pub fn IsPartialXLogFileName(fname: &str) -> bool {
    fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
}

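// Illustrative test (a sketch added here, not part of the original code):
// XLogFileName and XLogFromFileName are inverses for well-formed names, and
// XLogSegNoOffsetToRecPtr maps a (segno, offset) pair onto the LSN line.
#[cfg(test)]
mod xlog_fname_examples {
    use super::*;

    #[test]
    fn filename_roundtrip() {
        let wal_seg_size = 16 * 1024 * 1024;
        // With 16 MiB segments there are 0x100 segments per "xlog id".
        assert_eq!(XLogSegmentsPerXLogId(wal_seg_size), 0x100);
        // Segment 0x10F on timeline 1 splits into xlog id 1, segment 0x0F.
        let fname = XLogFileName(PG_TLI, 0x10F, wal_seg_size);
        assert_eq!(fname, "00000001000000010000000F");
        assert!(IsXLogFileName(&fname));
        assert_eq!(XLogFromFileName(&fname, wal_seg_size), (0x10F, PG_TLI));
        // A segment's start LSN is simply segno * wal_seg_size.
        assert_eq!(
            XLogSegNoOffsetToRecPtr(0x10F, 0, wal_seg_size),
            0x10F * 16 * 1024 * 1024
        );
    }
}
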
/// If the LSN points to the beginning of a page, shift it past the page header
/// to the first record; otherwise align it on an 8-byte boundary (as required
/// for WAL records).
pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
    if lsn.0 % XLOG_BLCKSZ as u64 == 0 {
        let hdr_size = if lsn.0 % seg_sz as u64 == 0 {
            XLOG_SIZE_OF_XLOG_LONG_PHD
        } else {
            XLOG_SIZE_OF_XLOG_SHORT_PHD
        };
        lsn + hdr_size as u64
    } else {
        lsn.align()
    }
}

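// Illustrative test (a sketch added here, not part of the original code; it
// assumes Lsn::align() rounds up to the next 8-byte boundary):
#[cfg(test)]
mod normalize_lsn_examples {
    use super::*;

    #[test]
    fn normalize_lsn_cases() {
        let seg_sz = 16 * 1024 * 1024;
        // At a segment boundary the long page header is skipped...
        assert_eq!(
            normalize_lsn(Lsn(seg_sz as u64), seg_sz),
            Lsn((seg_sz + XLOG_SIZE_OF_XLOG_LONG_PHD) as u64)
        );
        // ...and at any other page boundary, the short one.
        assert_eq!(
            normalize_lsn(Lsn(XLOG_BLCKSZ as u64), seg_sz),
            Lsn((XLOG_BLCKSZ + XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64)
        );
        // A mid-page LSN is merely aligned to the 8-byte record boundary.
        assert_eq!(normalize_lsn(Lsn(0x2001), seg_sz), Lsn(0x2008));
    }
}
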
pub fn get_current_timestamp() -> TimestampTz {
    to_pg_timestamp(SystemTime::now())
}

pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz {
    const UNIX_EPOCH_JDATE: u64 = 2440588; /* == date2j(1970, 1, 1) */
    const POSTGRES_EPOCH_JDATE: u64 = 2451545; /* == date2j(2000, 1, 1) */
    const SECS_PER_DAY: u64 = 86400;
    const USECS_PER_SEC: u64 = 1000000;
    match time.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(n) => {
            ((n.as_secs() - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY))
                * USECS_PER_SEC
                + n.subsec_micros() as u64) as i64
        }
        Err(_) => panic!("SystemTime before UNIX EPOCH!"),
    }
}

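// Illustrative test (a sketch added here, not part of the original code):
// the PostgreSQL epoch is 2000-01-01, which is 10957 days (946684800 seconds)
// after the Unix epoch, so that instant maps to timestamp 0.
#[cfg(test)]
mod pg_timestamp_examples {
    use super::*;
    use std::time::Duration;

    #[test]
    fn postgres_epoch_maps_to_zero() {
        let pg_epoch = SystemTime::UNIX_EPOCH + Duration::from_secs(946_684_800);
        assert_eq!(to_pg_timestamp(pg_epoch), 0);
        // Timestamps count microseconds, so one second later is 1_000_000.
        assert_eq!(
            to_pg_timestamp(pg_epoch + Duration::from_secs(1)),
            1_000_000
        );
    }
}
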
/// Return the offset of the last valid record in the segment segno, starting
/// to look at start_offset. Returns start_offset if no records are found.
fn find_end_of_wal_segment(
    data_dir: &Path,
    segno: XLogSegNo,
    tli: TimeLineID,
    wal_seg_size: usize,
    start_offset: usize, // start reading at this point
) -> anyhow::Result<u32> {
    // step back to the beginning of the page to read it in...
    let mut offs: usize = start_offset - start_offset % XLOG_BLCKSZ;
    let mut skipping_first_contrecord: bool = false;
    let mut contlen: usize = 0;
    let mut xl_crc: u32 = 0;
    let mut crc: u32 = 0;
    let mut rec_offs: usize = 0;
    let mut buf = [0u8; XLOG_BLCKSZ];
    let file_name = XLogFileName(tli, segno, wal_seg_size);
    let mut last_valid_rec_pos: usize = start_offset; // assume a new record begins at the given start_offset
    let mut file = File::open(data_dir.join(file_name.clone() + ".partial"))?;
    file.seek(SeekFrom::Start(offs as u64))?;
    // xl_crc is the last field in XLogRecord, will not be read into rec_hdr
    const_assert!(XLOG_RECORD_CRC_OFFS + 4 == XLOG_SIZE_OF_XLOG_RECORD);
    let mut rec_hdr = [0u8; XLOG_RECORD_CRC_OFFS];

    trace!("find_end_of_wal_segment(data_dir={}, segno={}, tli={}, wal_seg_size={}, start_offset=0x{:x})", data_dir.display(), segno, tli, wal_seg_size, start_offset);
    while offs < wal_seg_size {
        // we are at the beginning of the page; read it in
        if offs % XLOG_BLCKSZ == 0 {
            trace!("offs=0x{:x}: new page", offs);
            let bytes_read = file.read(&mut buf)?;
            if bytes_read != buf.len() {
                bail!(
                    "failed to read {} bytes from {} at {}",
                    XLOG_BLCKSZ,
                    file_name,
                    offs
                );
            }

            let xlp_magic = LittleEndian::read_u16(&buf[0..2]);
            let xlp_info = LittleEndian::read_u16(&buf[2..4]);
            let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
            trace!(
                " xlp_magic=0x{:x}, xlp_info=0x{:x}, xlp_rem_len={}",
                xlp_magic,
                xlp_info,
                xlp_rem_len
            );
            // this is expected in current usage when valid WAL starts after the page header
            if xlp_magic != XLOG_PAGE_MAGIC as u16 {
                trace!(
                    " invalid WAL file {}.partial magic {} at {:?}",
                    file_name,
                    xlp_magic,
                    Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)),
                );
            }
            if offs == 0 {
                offs += XLOG_SIZE_OF_XLOG_LONG_PHD;
                if (xlp_info & XLP_FIRST_IS_CONTRECORD) != 0 {
                    trace!(" first record is contrecord");
                    skipping_first_contrecord = true;
                    contlen = xlp_rem_len as usize;
                    if offs < start_offset {
                        // Pre-condition failed: the beginning of the segment is unexpectedly corrupted.
                        ensure!(start_offset - offs >= contlen,
                                "start_offset is in the middle of the first record (which happens to be a contrecord), \
                                expected to be on a record boundary. Is the beginning of the segment corrupted?");
                        contlen = 0;
                        // keep skipping_first_contrecord to avoid counting the contrecord as valid, we did not check it.
                    }
                } else {
                    trace!(" first record is not contrecord");
                }
            } else {
                offs += XLOG_SIZE_OF_XLOG_SHORT_PHD;
            }
            // ... and step forward again if asked
            trace!(" skipped header to 0x{:x}", offs);
            offs = max(offs, start_offset);
        // beginning of the next record
        } else if contlen == 0 {
            let page_offs = offs % XLOG_BLCKSZ;
            let xl_tot_len = LittleEndian::read_u32(&buf[page_offs..page_offs + 4]) as usize;
            trace!("offs=0x{:x}: new record, xl_tot_len={}", offs, xl_tot_len);
            if xl_tot_len == 0 {
                info!(
                    "find_end_of_wal_segment reached zeros at {:?}, last record ends at {:?}",
                    Lsn(XLogSegNoOffsetToRecPtr(segno, offs as u32, wal_seg_size)),
                    Lsn(XLogSegNoOffsetToRecPtr(
                        segno,
                        last_valid_rec_pos as u32,
                        wal_seg_size
                    ))
                );
                break; // zeros, reached the end
            }
            if skipping_first_contrecord {
                skipping_first_contrecord = false;
                trace!(" the first contrecord has just been completed");
            } else {
                trace!(
                    " updating last_valid_rec_pos: 0x{:x} --> 0x{:x}",
                    last_valid_rec_pos,
                    offs
                );
                last_valid_rec_pos = offs;
            }
            offs += 4;
            rec_offs = 4;
            contlen = xl_tot_len - 4;
            trace!(
                " reading rec_hdr[0..4] <-- [0x{:x}; 0x{:x})",
                page_offs,
                page_offs + 4
            );
            rec_hdr[0..4].copy_from_slice(&buf[page_offs..page_offs + 4]);
        } else {
            // we're continuing a record, possibly from the previous page.
            let page_offs = offs % XLOG_BLCKSZ;
            let pageleft = XLOG_BLCKSZ - page_offs;

            // read the rest of the record, or as much as fits on this page.
            let n = min(contlen, pageleft);
            trace!(
                "offs=0x{:x}, record continuation, pageleft={}, contlen={}",
                offs,
                pageleft,
                contlen
            );
            // fill the rec_hdr header up to (but not including) the xl_crc field
            trace!(
                " rec_offs={}, XLOG_RECORD_CRC_OFFS={}, XLOG_SIZE_OF_XLOG_RECORD={}",
                rec_offs,
                XLOG_RECORD_CRC_OFFS,
                XLOG_SIZE_OF_XLOG_RECORD
            );
            if rec_offs < XLOG_RECORD_CRC_OFFS {
                let len = min(XLOG_RECORD_CRC_OFFS - rec_offs, n);
                trace!(
                    " reading rec_hdr[{}..{}] <-- [0x{:x}; 0x{:x})",
                    rec_offs,
                    rec_offs + len,
                    page_offs,
                    page_offs + len
                );
                rec_hdr[rec_offs..rec_offs + len].copy_from_slice(&buf[page_offs..page_offs + len]);
            }
            if rec_offs <= XLOG_RECORD_CRC_OFFS && rec_offs + n >= XLOG_SIZE_OF_XLOG_RECORD {
                let crc_offs = page_offs - rec_offs + XLOG_RECORD_CRC_OFFS;
                // All records are aligned on an 8-byte boundary, so their 8-byte frames
                // cannot be split between pages. As xl_crc is the last field,
                // its content is always on the same page.
                const_assert!(XLOG_RECORD_CRC_OFFS % 8 == 4);
                // We should always start reading aligned records even in incorrect WALs, so if
                // the condition is false it is likely a bug. However, it is localized somewhere
                // in this function, hence we do not crash and just report failure instead.
                ensure!(crc_offs % 8 == 4, "Record is not aligned properly (bug?)");
                xl_crc = LittleEndian::read_u32(&buf[crc_offs..crc_offs + 4]);
                trace!(
                    " reading xl_crc: [0x{:x}; 0x{:x}) = 0x{:x}",
                    crc_offs,
                    crc_offs + 4,
                    xl_crc
                );
                crc = crc32c_append(0, &buf[crc_offs + 4..page_offs + n]);
                trace!(
                    " initializing crc: [0x{:x}; 0x{:x}); crc = 0x{:x}",
                    crc_offs + 4,
                    page_offs + n,
                    crc
                );
            } else if rec_offs > XLOG_RECORD_CRC_OFFS {
                // As all records are 8-byte aligned, the header is already fully read and `crc` is initialized in the branch above.
                ensure!(rec_offs >= XLOG_SIZE_OF_XLOG_RECORD);
                let old_crc = crc;
                crc = crc32c_append(crc, &buf[page_offs..page_offs + n]);
                trace!(
                    " appending to crc: [0x{:x}; 0x{:x}); 0x{:x} --> 0x{:x}",
                    page_offs,
                    page_offs + n,
                    old_crc,
                    crc
                );
            } else {
                // Correct because of the way conditions are written above.
                assert!(rec_offs + n < XLOG_SIZE_OF_XLOG_RECORD);
                // If `skipping_first_contrecord == true`, we may be reading from the middle of a record
                // which started in the previous segment. Hence there is no point in validating the header.
                if !skipping_first_contrecord && rec_offs + n > XLOG_RECORD_CRC_OFFS {
                    info!(
                        "Curiously corrupted WAL: a record stops inside the header; \
                        offs=0x{:x}, record continuation, pageleft={}, contlen={}",
                        offs, pageleft, contlen
                    );
                    break;
                }
                // Do nothing: we are still reading the header. It is accounted for in the CRC at the end of the record.
            }
            rec_offs += n;
            offs += n;
            contlen -= n;

            if contlen == 0 {
                trace!(" record completed at 0x{:x}", offs);
                crc = crc32c_append(crc, &rec_hdr);
                offs = (offs + 7) & !7; // pad on an 8-byte boundary
                trace!(
                    " padded offs to 0x{:x}, crc is {:x}, expected crc is {:x}",
                    offs,
                    crc,
                    xl_crc
                );
                if skipping_first_contrecord {
                    // do nothing, the flag will be cleared on the next iteration when we read a new record
                    trace!(" the first contrecord has just been completed");
                } else if crc == xl_crc {
                    // record is valid, advance the result to its end (with
                    // alignment to the next record taken into account)
                    trace!(
                        " updating last_valid_rec_pos: 0x{:x} --> 0x{:x}",
                        last_valid_rec_pos,
                        offs
                    );
                    last_valid_rec_pos = offs;
                } else {
                    info!(
                        "CRC mismatch {} vs {} at {}",
                        crc, xl_crc, last_valid_rec_pos
                    );
                    break;
                }
            }
        }
    }
    trace!("last_valid_rec_pos=0x{:x}", last_valid_rec_pos);
    Ok(last_valid_rec_pos as u32)
}

///
/// Scan a directory that contains PostgreSQL WAL files, for the end of WAL.
/// If precise, returns the end LSN (the next insertion point, basically);
/// otherwise, the start of the last segment.
/// Returns (0, 0) if there is no WAL.
///
pub fn find_end_of_wal(
    data_dir: &Path,
    wal_seg_size: usize,
    precise: bool,
    start_lsn: Lsn, // start reading WAL at this point or later
) -> anyhow::Result<(XLogRecPtr, TimeLineID)> {
    let mut high_segno: XLogSegNo = 0;
    let mut high_tli: TimeLineID = 0;
    let mut high_ispartial = false;

    for entry in fs::read_dir(data_dir)?.flatten() {
        let ispartial: bool;
        let entry_name = entry.file_name();
        let fname = entry_name.to_str().unwrap();
        /*
         * Check if the filename looks like an xlog file, or a .partial file.
         */
        if IsXLogFileName(fname) {
            ispartial = false;
        } else if IsPartialXLogFileName(fname) {
            ispartial = true;
        } else {
            continue;
        }
        let (segno, tli) = XLogFromFileName(fname, wal_seg_size);
        if !ispartial && entry.metadata()?.len() != wal_seg_size as u64 {
            continue;
        }
        if segno > high_segno
            || (segno == high_segno && tli > high_tli)
            || (segno == high_segno && tli == high_tli && high_ispartial && !ispartial)
        {
            high_segno = segno;
            high_tli = tli;
            high_ispartial = ispartial;
        }
    }
    if high_segno > 0 {
        let mut high_offs = 0;
        /*
         * Move the starting pointer to the start of the next segment, if the
         * highest one we saw was completed.
         */
        if !high_ispartial {
            high_segno += 1;
        } else if precise {
            /* otherwise locate the last record in the last partial segment */
            if start_lsn.segment_number(wal_seg_size) > high_segno {
                bail!(
                    "provided start_lsn {:?} is beyond highest segno {:?} available",
                    start_lsn,
                    high_segno,
                );
            }
            let start_offset = if start_lsn.segment_number(wal_seg_size) == high_segno {
                start_lsn.segment_offset(wal_seg_size)
            } else {
                0
            };
            high_offs = find_end_of_wal_segment(
                data_dir,
                high_segno,
                high_tli,
                wal_seg_size,
                start_offset,
            )?;
        }
        let high_ptr = XLogSegNoOffsetToRecPtr(high_segno, high_offs, wal_seg_size);
        return Ok((high_ptr, high_tli));
    }
    Ok((0, 0))
}

pub fn main() {
    let data_dir = PathBuf::from(".");
    let wal_seg_size = 16 * 1024 * 1024;
    let (wal_end, tli) = find_end_of_wal(&data_dir, wal_seg_size, true, Lsn(0)).unwrap();
    println!(
        "wal_end={:>08X}{:>08X}, tli={}",
        (wal_end >> 32) as u32,
        wal_end as u32,
        tli
    );
}

impl XLogRecord {
    pub fn from_slice(buf: &[u8]) -> Result<XLogRecord, DeserializeError> {
        use utils::bin_ser::LeSer;
        XLogRecord::des(buf)
    }

    pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogRecord, DeserializeError> {
        use utils::bin_ser::LeSer;
        XLogRecord::des_from(&mut buf.reader())
    }

    pub fn encode(&self) -> Result<Bytes, SerializeError> {
        use utils::bin_ser::LeSer;
        Ok(self.ser()?.into())
    }

    // Is this record an XLOG_SWITCH record? Those need some special processing.
    pub fn is_xlog_switch_record(&self) -> bool {
        self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
    }
}

impl XLogPageHeaderData {
    pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogPageHeaderData, DeserializeError> {
        use utils::bin_ser::LeSer;
        XLogPageHeaderData::des_from(&mut buf.reader())
    }
}

impl XLogLongPageHeaderData {
    pub fn from_bytes<B: Buf>(buf: &mut B) -> Result<XLogLongPageHeaderData, DeserializeError> {
        use utils::bin_ser::LeSer;
        XLogLongPageHeaderData::des_from(&mut buf.reader())
    }

    pub fn encode(&self) -> Result<Bytes, SerializeError> {
        use utils::bin_ser::LeSer;
        self.ser().map(|b| b.into())
    }
}

pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();

impl CheckPoint {
    pub fn encode(&self) -> Result<Bytes, SerializeError> {
        use utils::bin_ser::LeSer;
        Ok(self.ser()?.into())
    }

    pub fn decode(buf: &[u8]) -> Result<CheckPoint, DeserializeError> {
        use utils::bin_ser::LeSer;
        CheckPoint::des(buf)
    }

    /// Update next XID based on the provided new_xid and the stored epoch.
    /// Next XID should be greater than new_xid. This handles 32-bit
    /// XID wraparound correctly.
    ///
    /// Returns 'true' if the XID was updated.
    pub fn update_next_xid(&mut self, xid: u32) -> bool {
        // nextXid should be greater than any XID in WAL, so increment the provided XID and check for wraparound.
        let mut new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
        // To reduce the number of metadata checkpoints, we forward-align the XID to XID_CHECKPOINT_INTERVAL.
        // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE.
        new_xid =
            new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
        let full_xid = self.nextXid.value;
        let old_xid = full_xid as u32;
        if new_xid.wrapping_sub(old_xid) as i32 > 0 {
            let mut epoch = full_xid >> 32;
            if new_xid < old_xid {
                // wrap-around
                epoch += 1;
            }
            let nextXid = (epoch << 32) | new_xid as u64;

            if nextXid != self.nextXid.value {
                self.nextXid = FullTransactionId { value: nextXid };
                return true;
            }
        }
        false
    }
}

//
// Generate a new, empty WAL segment.
// We need this segment to start a compute node.
//
pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
    let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);

    let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, pg_constants::WAL_SEGMENT_SIZE);
    let hdr = XLogLongPageHeaderData {
        std: {
            XLogPageHeaderData {
                xlp_magic: XLOG_PAGE_MAGIC as u16,
                xlp_info: pg_constants::XLP_LONG_HEADER,
                xlp_tli: PG_TLI,
                xlp_pageaddr: pageaddr,
                xlp_rem_len: 0,
                ..Default::default() // Put 0 in padding fields.
            }
        },
        xlp_sysid: system_id,
        xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
        xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
    };

    let hdr_bytes = hdr.encode()?;
    seg_buf.extend_from_slice(&hdr_bytes);

    // zero out the rest of the file
    seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
    Ok(seg_buf.freeze())
}

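// Illustrative test (a sketch added here, not part of the original code;
// it assumes XLogLongPageHeaderData::from_bytes above tolerates the trailing
// zero padding when reading from the front of the segment): a generated
// segment is exactly one segment long and its long page header decodes back
// with the values we put in.
#[cfg(test)]
mod generate_wal_segment_examples {
    use super::*;

    #[test]
    fn generated_segment_header_roundtrip() {
        let sysid = 0x1234_5678_9ABC_DEF0u64;
        let seg = generate_wal_segment(3, sysid).unwrap();
        assert_eq!(seg.len(), pg_constants::WAL_SEGMENT_SIZE);
        let mut buf = seg;
        let hdr = XLogLongPageHeaderData::from_bytes(&mut buf).unwrap();
        assert_eq!(hdr.std.xlp_magic, XLOG_PAGE_MAGIC as u16);
        assert_eq!(hdr.std.xlp_pageaddr, 3 * pg_constants::WAL_SEGMENT_SIZE as u64);
        assert_eq!(hdr.xlp_sysid, sysid);
        assert_eq!(hdr.xlp_seg_size, pg_constants::WAL_SEGMENT_SIZE as u32);
    }
}
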
#[cfg(test)]
mod tests {
    use super::*;
    use regex::Regex;
    use std::{env, str::FromStr};

    fn init_logging() {
        let _ = env_logger::Builder::from_env(
            env_logger::Env::default()
                .default_filter_or("wal_generate=info,postgres_ffi::xlog_utils=trace"),
        )
        .is_test(true)
        .try_init();
    }

    fn test_end_of_wal(
        test_name: &str,
        generate_wal: impl Fn(&mut postgres::Client) -> anyhow::Result<postgres::types::PgLsn>,
        expected_end_of_wal_non_partial: Lsn,
        last_segment: &str,
    ) {
        use wal_generate::*;
        // 1. Generate some WAL
        let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("..")
            .join("..");
        let cfg = Conf {
            pg_distrib_dir: top_path.join("tmp_install"),
            datadir: top_path.join(format!("test_output/{}", test_name)),
        };
        if cfg.datadir.exists() {
            fs::remove_dir_all(&cfg.datadir).unwrap();
        }
        cfg.initdb().unwrap();
        let mut srv = cfg.start_server().unwrap();
        let expected_wal_end: Lsn =
            u64::from(generate_wal(&mut srv.connect_with_timeout().unwrap()).unwrap()).into();
        srv.kill();

        // 2. Pick WAL generated by initdb
        let wal_dir = cfg.datadir.join("pg_wal");
        let wal_seg_size = 16 * 1024 * 1024;

        // 3. Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
        let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
        let wal_end = Lsn(wal_end);
        info!(
            "find_end_of_wal returned (wal_end={}, tli={})",
            wal_end, tli
        );
        assert_eq!(wal_end, expected_end_of_wal_non_partial);

        // 4. Get the actual end of WAL by pg_waldump
        let waldump_output = cfg
            .pg_waldump("000000010000000000000001", last_segment)
            .unwrap()
            .stderr;
        let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
        let caps = match Regex::new(r"invalid record length at (.+):")
            .unwrap()
            .captures(waldump_output)
        {
            Some(caps) => caps,
            None => {
                error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
                panic!();
            }
        };
        let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
        info!(
            "waldump erred on {}, expected wal end at {}",
            waldump_wal_end, expected_wal_end
        );
        assert_eq!(waldump_wal_end, expected_wal_end);

        // 5. Rename the file to .partial to actually find the last valid LSN
        fs::rename(
            wal_dir.join(last_segment),
            wal_dir.join(format!("{}.partial", last_segment)),
        )
        .unwrap();
        let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
        let wal_end = Lsn(wal_end);
        info!(
            "find_end_of_wal returned (wal_end={}, tli={})",
            wal_end, tli
        );
        assert_eq!(wal_end, waldump_wal_end);
    }

    #[test]
    pub fn test_find_end_of_wal_simple() {
        init_logging();
        test_end_of_wal(
            "test_find_end_of_wal_simple",
            wal_generate::generate_simple,
            "0/2000000".parse::<Lsn>().unwrap(),
            "000000010000000000000001",
        );
    }

    #[test]
    pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
        init_logging();
        test_end_of_wal(
            "test_find_end_of_wal_crossing_segment_followed_by_small_one",
            wal_generate::generate_wal_record_crossing_segment_followed_by_small_one,
            "0/3000000".parse::<Lsn>().unwrap(),
            "000000010000000000000002",
        );
    }

    #[test]
    #[ignore = "not yet fixed, needs correct parsing of pre-last segments"] // TODO
    pub fn test_find_end_of_wal_last_crossing_segment() {
        init_logging();
        test_end_of_wal(
            "test_find_end_of_wal_last_crossing_segment",
            wal_generate::generate_last_wal_record_crossing_segment,
            "0/3000000".parse::<Lsn>().unwrap(),
            "000000010000000000000002",
        );
    }

    /// Check the math in update_next_xid
    ///
    /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
    /// currently 1024.
    #[test]
    pub fn test_update_next_xid() {
        let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
        let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();

        checkpoint.nextXid = FullTransactionId { value: 10 };
        assert_eq!(checkpoint.nextXid.value, 10);

        // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
        // boundary
        checkpoint.update_next_xid(100);
        assert_eq!(checkpoint.nextXid.value, 1024);

        // No change
        checkpoint.update_next_xid(500);
        assert_eq!(checkpoint.nextXid.value, 1024);
        checkpoint.update_next_xid(1023);
        assert_eq!(checkpoint.nextXid.value, 1024);

        // nextXid must end up strictly greater than the highest XID seen so
        // far. So when we pass 1024, nextXid gets bumped up to the next
        // XID_CHECKPOINT_INTERVAL boundary.
        checkpoint.update_next_xid(1024);
        assert_eq!(checkpoint.nextXid.value, 2048);
    }
}