From 0969574d486dc06c31dda9aba7d8d42555a918ee Mon Sep 17 00:00:00 2001 From: anastasia Date: Tue, 8 Jun 2021 17:57:05 +0300 Subject: [PATCH] Use bindgen for various xlog structures and checkpoint. Implement encode/decode methods for them. Some methods are unused now. This is a preparatory commit for nonrel_wal --- pageserver/src/waldecoder.rs | 4 + postgres_ffi/build.rs | 7 ++ postgres_ffi/pg_control_ffi.h | 2 + postgres_ffi/src/pg_constants.rs | 6 ++ postgres_ffi/src/xlog_utils.rs | 134 +++++++++++++++++++++---------- 5 files changed, 109 insertions(+), 44 deletions(-) diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 3d589b89c0..2fbb0af95f 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -6,6 +6,10 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::*; +use postgres_ffi::XLogLongPageHeaderData; +use postgres_ffi::XLogPageHeaderData; +use postgres_ffi::XLogRecord; + use std::cmp::min; use std::str; use thiserror::Error; diff --git a/postgres_ffi/build.rs b/postgres_ffi/build.rs index 7207ca67bd..a29f2bbb6d 100644 --- a/postgres_ffi/build.rs +++ b/postgres_ffi/build.rs @@ -24,7 +24,14 @@ fn main() { // These are the types and constants that we want to generate bindings for // .whitelist_type("ControlFileData") + .whitelist_type("CheckPoint") + .whitelist_type("FullTransactionId") + .whitelist_type("XLogRecord") + .whitelist_type("XLogPageHeaderData") + .whitelist_type("XLogLongPageHeaderData") + .whitelist_var("XLOG_PAGE_MAGIC") .whitelist_var("PG_CONTROL_FILE_SIZE") + .whitelist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC") .whitelist_type("DBState") // // Path the server include dir. It is in tmp_install/include/server, if you did diff --git a/postgres_ffi/pg_control_ffi.h b/postgres_ffi/pg_control_ffi.h index d6f13be0d0..790accf7f0 100644 --- a/postgres_ffi/pg_control_ffi.h +++ b/postgres_ffi/pg_control_ffi.h @@ -6,3 +6,5 @@ */ #include "c.h" #include "catalog/pg_control.h" +#include "access/xlog_internal.h" + diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 760348318d..d8223d352f 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -131,5 +131,11 @@ pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */ pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */ +/* From transam.h */ +pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3; +pub const INVALID_TRANSACTION_ID: u32 = 0; +pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000; +pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; + /* FIXME: pageserver should request wal_seg_size from compute node */ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 5d1ed5b456..aabbf8628b 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -8,6 +8,13 @@ // use crate::pg_constants; +use crate::CheckPoint; +use crate::FullTransactionId; +use crate::XLogLongPageHeaderData; +use crate::XLogPageHeaderData; +use crate::XLogRecord; + +use crate::XLOG_PAGE_MAGIC; use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, Bytes}; use crc32c::*; @@ -21,14 +28,14 @@ use std::time::SystemTime; pub const XLOG_FNAME_LEN: usize = 24; pub const XLOG_BLCKSZ: usize = 8192; pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001; -pub const XLOG_PAGE_MAGIC: u16 = 0xD109; pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8; -pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = XLP_REM_LEN_OFFS + 4 + 4; -pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = XLOG_SIZE_OF_XLOG_SHORT_PHD + 8 + 4 + 4; pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2; -pub const XLOG_SIZE_OF_XLOG_RECORD: usize = XLOG_RECORD_CRC_OFFS + 4; pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; +pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::(); +pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::(); +pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::(); + pub type XLogRecPtr = u64; pub type TimeLineID = u32; pub type TimestampTz = u64; @@ -120,7 +127,7 @@ fn find_end_of_wal_segment( let xlp_magic = LittleEndian::read_u16(&buf[0..2]); let xlp_info = LittleEndian::read_u16(&buf[2..4]); let xlp_rem_len = LittleEndian::read_u32(&buf[XLP_REM_LEN_OFFS..XLP_REM_LEN_OFFS + 4]); - if xlp_magic != XLOG_PAGE_MAGIC { + if xlp_magic != XLOG_PAGE_MAGIC as u16 { info!("Invalid WAL file {}.partial magic {}", file_name, xlp_magic); break; } @@ -257,21 +264,6 @@ pub fn main() { ); } -// -// Xlog record parsing routines -// TODO move here other related code from waldecoder.rs -// -#[repr(C)] -#[derive(Debug)] -pub struct XLogRecord { - pub xl_tot_len: u32, - pub xl_xid: u32, - pub xl_prev: u64, - pub xl_info: u8, - pub xl_rmid: u8, - pub xl_crc: u32, -} - impl XLogRecord { pub fn from_bytes(buf: &mut Bytes) -> XLogRecord { XLogRecord { @@ -287,46 +279,32 @@ impl XLogRecord { } } + pub fn encode(&self) -> Bytes { + let b: [u8; XLOG_SIZE_OF_XLOG_RECORD]; + b = unsafe { std::mem::transmute::(*self) }; + Bytes::copy_from_slice(&b[..]) + } + // Is this record an XLOG_SWITCH record? They need some special processing, pub fn is_xlog_switch_record(&self) -> bool { self.xl_info == pg_constants::XLOG_SWITCH && self.xl_rmid == pg_constants::RM_XLOG_ID } } -#[repr(C)] -#[derive(Debug)] -pub struct XLogPageHeaderData { - pub xlp_magic: u16, /* magic value for correctness checks */ - pub xlp_info: u16, /* flag bits, see below */ - pub xlp_tli: u32, /* TimeLineID of first record on page */ - pub xlp_pageaddr: u64, /* XLOG address of this page */ - pub xlp_rem_len: u32, /* total len of remaining data for record */ - padding: u32, /* Add explicit padding */ -} - impl XLogPageHeaderData { pub fn from_bytes(buf: &mut B) -> XLogPageHeaderData { - XLogPageHeaderData { + let hdr: XLogPageHeaderData = XLogPageHeaderData { xlp_magic: buf.get_u16_le(), xlp_info: buf.get_u16_le(), xlp_tli: buf.get_u32_le(), xlp_pageaddr: buf.get_u64_le(), xlp_rem_len: buf.get_u32_le(), - padding: buf.get_u32_le(), - } + }; + buf.get_u32_le(); //padding + hdr } } - -#[repr(C)] -#[derive(Debug)] -pub struct XLogLongPageHeaderData { - pub std: XLogPageHeaderData, /* standard header fields */ - pub xlp_sysid: u64, /* system identifier from pg_control */ - pub xlp_seg_size: u32, /* just as a cross-check */ - pub xlp_xlog_blcksz: u32, /* just as a cross-check */ -} - impl XLogLongPageHeaderData { pub fn from_bytes(buf: &mut B) -> XLogLongPageHeaderData { XLogLongPageHeaderData { @@ -336,4 +314,72 @@ impl XLogLongPageHeaderData { xlp_xlog_blcksz: buf.get_u32_le(), } } + + pub fn encode(&self) -> Bytes { + let b: [u8; XLOG_SIZE_OF_XLOG_LONG_PHD]; + b = unsafe { + std::mem::transmute::(*self) + }; + Bytes::copy_from_slice(&b[..]) + } +} + +pub const SIZEOF_CHECKPOINT: usize = std::mem::size_of::(); + +impl CheckPoint { + pub fn new(lsn: u64, timeline: u32) -> CheckPoint { + CheckPoint { + redo: lsn, + ThisTimeLineID: timeline, + PrevTimeLineID: timeline, + fullPageWrites: true, // TODO: get actual value of full_page_writes + nextXid: FullTransactionId { + value: pg_constants::FIRST_NORMAL_TRANSACTION_ID as u64, + }, // TODO: handle epoch? + nextOid: pg_constants::FIRST_BOOTSTRAP_OBJECT_ID, + nextMulti: 1, + nextMultiOffset: 0, + oldestXid: pg_constants::FIRST_NORMAL_TRANSACTION_ID, + oldestXidDB: 0, + oldestMulti: 1, + oldestMultiDB: 0, + time: 0, + oldestCommitTsXid: 0, + newestCommitTsXid: 0, + oldestActiveXid: pg_constants::INVALID_TRANSACTION_ID, + } + } + + pub fn encode(&self) -> Bytes { + let b: [u8; SIZEOF_CHECKPOINT]; + b = unsafe { std::mem::transmute::(*self) }; + Bytes::copy_from_slice(&b[..]) + } + + pub fn decode(buf: &[u8]) -> Result { + let mut b = [0u8; SIZEOF_CHECKPOINT]; + b.copy_from_slice(&buf[0..SIZEOF_CHECKPOINT]); + let checkpoint: CheckPoint; + checkpoint = unsafe { std::mem::transmute::<[u8; SIZEOF_CHECKPOINT], CheckPoint>(b) }; + Ok(checkpoint) + } + + // Update next XID based on provided new_xid and stored epoch. + // Next XID should be greater than new_xid. + // Also take in account 32-bit wrap-around. + pub fn update_next_xid(&mut self, xid: u32) { + let full_xid = self.nextXid.value; + let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID); + let old_xid = full_xid as u32; + if new_xid.wrapping_sub(old_xid) as i32 > 0 { + let mut epoch = full_xid >> 32; + if new_xid < old_xid { + // wrap-around + epoch += 1; + } + self.nextXid = FullTransactionId { + value: (epoch << 32) | new_xid as u64, + }; + } + } }