|
|
|
|
@@ -8,6 +8,13 @@
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
use crate::pg_constants;
|
|
|
|
|
use crate::CheckPoint;
|
|
|
|
|
use crate::FullTransactionId;
|
|
|
|
|
use crate::XLogLongPageHeaderData;
|
|
|
|
|
use crate::XLogPageHeaderData;
|
|
|
|
|
use crate::XLogRecord;
|
|
|
|
|
|
|
|
|
|
use crate::XLOG_PAGE_MAGIC;
|
|
|
|
|
use byteorder::{ByteOrder, LittleEndian};
|
|
|
|
|
use bytes::{Buf, Bytes};
|
|
|
|
|
use crc32c::*;
|
|
|
|
|
@@ -21,14 +28,14 @@ use std::time::SystemTime;
|
|
|
|
|
pub const XLOG_FNAME_LEN: usize = 24;
|
|
|
|
|
pub const XLOG_BLCKSZ: usize = 8192;
|
|
|
|
|
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
|
|
|
|
|
pub const XLOG_PAGE_MAGIC: u16 = 0xD109;
|
|
|
|
|
pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8;
|
|
|
|
|
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = XLP_REM_LEN_OFFS + 4 + 4;
|
|
|
|
|
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = XLOG_SIZE_OF_XLOG_SHORT_PHD + 8 + 4 + 4;
|
|
|
|
|
pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2;
|
|
|
|
|
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = XLOG_RECORD_CRC_OFFS + 4;
|
|
|
|
|
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
|
|
|
|
|
|
|
|
|
|
pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::<XLogPageHeaderData>();
|
|
|
|
|
pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::<XLogLongPageHeaderData>();
|
|
|
|
|
pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
|
|
|
|
|
|
|
|
|
|
pub type XLogRecPtr = u64;
|
|
|
|
|
pub type TimeLineID = u32;
|
|
|
|
|
pub type TimestampTz = u64;
|
|
|
|
|
@@ -120,7 +127,7 @@ fn find_end_of_wal_segment(
|
|
|
|
|
let xlp_magic = LittleEndian::read_u16(&buf[0..2]);
|
|
|
|
|
let xlp_info = LittleEndian::read_u16(&buf[2..4]);
|
|
|
|
|
let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]);
|
|
|
|
|
if xlp_magic != XLOG_PAGE_MAGIC {
|
|
|
|
|
if xlp_magic != XLOG_PAGE_MAGIC as u16 {
|
|
|
|
|
info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@@ -257,21 +264,6 @@ pub fn main() {
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
// Xlog record parsing routines
|
|
|
|
|
// TODO move here other related code from waldecoder.rs
|
|
|
|
|
//
|
|
|
|
|
#[repr(C)]
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct XLogRecord {
|
|
|
|
|
pub xl_tot_len: u32,
|
|
|
|
|
pub xl_xid: u32,
|
|
|
|
|
pub xl_prev: u64,
|
|
|
|
|
pub xl_info: u8,
|
|
|
|
|
pub xl_rmid: u8,
|
|
|
|
|
pub xl_crc: u32,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl XLogRecord {
|
|
|
|
|
pub fn from_bytes(buf: &mut Bytes) -> XLogRecord {
|
|
|
|
|
XLogRecord {
|
|
|
|
|
@@ -287,46 +279,32 @@ impl XLogRecord {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn encode(&self) -> Bytes {
|
|
|
|
|
let b: [u8; XLOG_SIZE_OF_XLOG_RECORD];
|
|
|
|
|
b = unsafe { std::mem::transmute::<XLogRecord, [u8; XLOG_SIZE_OF_XLOG_RECORD]>(*self) };
|
|
|
|
|
Bytes::copy_from_slice(&b[..])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Is this record an XLOG_SWITCH record? They need some special processing,
|
|
|
|
|
pub fn is_xlog_switch_record(&self) -> bool {
|
|
|
|
|
self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[repr(C)]
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct XLogPageHeaderData {
|
|
|
|
|
pub xlp_magic: u16, /* magic value for correctness checks */
|
|
|
|
|
pub xlp_info: u16, /* flag bits, see below */
|
|
|
|
|
pub xlp_tli: u32, /* TimeLineID of first record on page */
|
|
|
|
|
pub xlp_pageaddr: u64, /* XLOG address of this page */
|
|
|
|
|
pub xlp_rem_len: u32, /* total len of remaining data for record */
|
|
|
|
|
padding: u32, /* Add explicit padding */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl XLogPageHeaderData {
|
|
|
|
|
pub fn from_bytes<B: Buf>(buf: &mut B) -> XLogPageHeaderData {
|
|
|
|
|
XLogPageHeaderData {
|
|
|
|
|
let hdr: XLogPageHeaderData = XLogPageHeaderData {
|
|
|
|
|
xlp_magic: buf.get_u16_le(),
|
|
|
|
|
xlp_info: buf.get_u16_le(),
|
|
|
|
|
xlp_tli: buf.get_u32_le(),
|
|
|
|
|
xlp_pageaddr: buf.get_u64_le(),
|
|
|
|
|
xlp_rem_len: buf.get_u32_le(),
|
|
|
|
|
padding: buf.get_u32_le(),
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
buf.get_u32_le(); //padding
|
|
|
|
|
hdr
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[repr(C)]
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct XLogLongPageHeaderData {
|
|
|
|
|
pub std: XLogPageHeaderData, /* standard header fields */
|
|
|
|
|
pub xlp_sysid: u64, /* system identifier from pg_control */
|
|
|
|
|
pub xlp_seg_size: u32, /* just as a cross-check */
|
|
|
|
|
pub xlp_xlog_blcksz: u32, /* just as a cross-check */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl XLogLongPageHeaderData {
|
|
|
|
|
pub fn from_bytes<B: Buf>(buf: &mut B) -> XLogLongPageHeaderData {
|
|
|
|
|
XLogLongPageHeaderData {
|
|
|
|
|
@@ -336,4 +314,72 @@ impl XLogLongPageHeaderData {
|
|
|
|
|
xlp_xlog_blcksz: buf.get_u32_le(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn encode(&self) -> Bytes {
|
|
|
|
|
let b: [u8; XLOG_SIZE_OF_XLOG_LONG_PHD];
|
|
|
|
|
b = unsafe {
|
|
|
|
|
std::mem::transmute::<XLogLongPageHeaderData, [u8; XLOG_SIZE_OF_XLOG_LONG_PHD]>(*self)
|
|
|
|
|
};
|
|
|
|
|
Bytes::copy_from_slice(&b[..])
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
|
|
|
|
|
|
|
|
|
|
impl CheckPoint {
|
|
|
|
|
pub fn new(lsn: u64, timeline: u32) -> CheckPoint {
|
|
|
|
|
CheckPoint {
|
|
|
|
|
redo: lsn,
|
|
|
|
|
ThisTimeLineID: timeline,
|
|
|
|
|
PrevTimeLineID: timeline,
|
|
|
|
|
fullPageWrites: true, // TODO: get actual value of full_page_writes
|
|
|
|
|
nextXid: FullTransactionId {
|
|
|
|
|
value: pg_constants::FIRST_NORMAL_TRANSACTION_ID as u64,
|
|
|
|
|
}, // TODO: handle epoch?
|
|
|
|
|
nextOid: pg_constants::FIRST_BOOTSTRAP_OBJECT_ID,
|
|
|
|
|
nextMulti: 1,
|
|
|
|
|
nextMultiOffset: 0,
|
|
|
|
|
oldestXid: pg_constants::FIRST_NORMAL_TRANSACTION_ID,
|
|
|
|
|
oldestXidDB: 0,
|
|
|
|
|
oldestMulti: 1,
|
|
|
|
|
oldestMultiDB: 0,
|
|
|
|
|
time: 0,
|
|
|
|
|
oldestCommitTsXid: 0,
|
|
|
|
|
newestCommitTsXid: 0,
|
|
|
|
|
oldestActiveXid: pg_constants::INVALID_TRANSACTION_ID,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn encode(&self) -> Bytes {
|
|
|
|
|
let b: [u8; SIZEOF_CHECKPOINT];
|
|
|
|
|
b = unsafe { std::mem::transmute::<CheckPoint, [u8; SIZEOF_CHECKPOINT]>(*self) };
|
|
|
|
|
Bytes::copy_from_slice(&b[..])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn decode(buf: &[u8]) -> Result<CheckPoint, anyhow::Error> {
|
|
|
|
|
let mut b = [0u8; SIZEOF_CHECKPOINT];
|
|
|
|
|
b.copy_from_slice(&buf[0..SIZEOF_CHECKPOINT]);
|
|
|
|
|
let checkpoint: CheckPoint;
|
|
|
|
|
checkpoint = unsafe { std::mem::transmute::<[u8; SIZEOF_CHECKPOINT], CheckPoint>(b) };
|
|
|
|
|
Ok(checkpoint)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update next XID based on provided new_xid and stored epoch.
|
|
|
|
|
// Next XID should be greater than new_xid.
|
|
|
|
|
// Also take in account 32-bit wrap-around.
|
|
|
|
|
pub fn update_next_xid(&mut self, xid: u32) {
|
|
|
|
|
let full_xid = self.nextXid.value;
|
|
|
|
|
let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
|
|
|
|
|
let old_xid = full_xid as u32;
|
|
|
|
|
if new_xid.wrapping_sub(old_xid) as i32 > 0 {
|
|
|
|
|
let mut epoch = full_xid >> 32;
|
|
|
|
|
if new_xid < old_xid {
|
|
|
|
|
// wrap-around
|
|
|
|
|
epoch += 1;
|
|
|
|
|
}
|
|
|
|
|
self.nextXid = FullTransactionId {
|
|
|
|
|
value: (epoch << 32) | new_xid as u64,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|