From ef37eb96b94cec0882d63d8ed5bb64afbcfc2b5f Mon Sep 17 00:00:00 2001 From: anastasia Date: Wed, 28 Apr 2021 00:34:48 +0300 Subject: [PATCH] refactor XLogRecord reading --- pageserver/src/waldecoder.rs | 67 ++++++++++++++-------------------- pageserver/src/walredo.rs | 25 +++++-------- postgres_ffi/src/xlog_utils.rs | 30 +++++++++++++++ 3 files changed, 67 insertions(+), 55 deletions(-) diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index c2a396ae89..4327d1763e 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -1,6 +1,7 @@ use crate::pg_constants; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; +use postgres_ffi::xlog_utils; use std::cmp::min; use std::str; use thiserror::Error; @@ -336,16 +337,8 @@ pub struct DecodedWALRecord { fn is_xlog_switch_record(rec: &Bytes) -> bool { let mut buf = rec.clone(); - // FIXME: assume little-endian here - let _xl_tot_len = buf.get_u32_le(); - let _xl_xid = buf.get_u32_le(); - let _xl_prev = buf.get_u64_le(); - let xl_info = buf.get_u8(); - let xl_rmid = buf.get_u8(); - buf.advance(2); // 2 bytes of padding - let _xl_crc = buf.get_u32_le(); - - xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID + let xlogrec = xlog_utils::parse_xlog_record(&mut buf); + xlogrec.xl_info == pg_constants::XLOG_SWITCH && xlogrec.xl_rmid == pg_constants::RM_XLOG_ID } pub type Oid = u32; @@ -438,21 +431,15 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { // 1. Parse XLogRecord struct // FIXME: assume little-endian here - let xl_tot_len = buf.get_u32_le(); - let xl_xid = buf.get_u32_le(); - let xl_prev = buf.get_u64_le(); - let xl_info = buf.get_u8(); - let xl_rmid = buf.get_u8(); - buf.advance(2); // 2 bytes of padding - let _xl_crc = buf.get_u32_le(); + let xlogrec = xlog_utils::parse_xlog_record(&mut buf); trace!( "decode_wal_record xl_rmid = {} xl_info = {}", - xl_rmid, - xl_info + xlogrec.xl_rmid, + xlogrec.xl_info ); - let remaining = xl_tot_len - SizeOfXLogRecord; + let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord; if buf.remaining() != remaining as usize { //TODO error @@ -651,7 +638,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { // We don't need them, so just skip blocks_total_len bytes buf.advance(blocks_total_len as usize); - let main_data_offset = (xl_tot_len - main_data_len) as usize; + let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize; // 4. Decode main_data if main_data_len > 0 { @@ -659,23 +646,23 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } //5. Handle special CLOG and XACT records - if xl_rmid == pg_constants::RM_CLOG_ID { + if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; blk.blkno = buf.get_i32_le() as u32; trace!("RM_CLOG_ID updates block {}", blk.blkno); blocks.push(blk); - } else if xl_rmid == pg_constants::RM_XACT_ID { - let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; if info == pg_constants::XLOG_XACT_COMMIT { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; - blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; trace!( "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", - xl_info, (xl_prev >> 32), - xl_prev & 0xffffffff, - xl_xid, + xlogrec.xl_info, (xlogrec.xl_prev >> 32), + xlogrec.xl_prev & 0xffffffff, + xlogrec.xl_xid, blk.blkno, main_data_len ); @@ -686,7 +673,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let _xact_time = buf.get_i64_le(); let mut xinfo = 0; - if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { xinfo = buf.get_u32_le(); } if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { @@ -738,12 +725,12 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } else if info == pg_constants::XLOG_XACT_ABORT { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; - blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; + blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; trace!( "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", - xl_info, (xl_prev >> 32), - xl_prev & 0xffffffff, - xl_xid, + xlogrec.xl_info, (xlogrec.xl_prev >> 32), + xlogrec.xl_prev & 0xffffffff, + xlogrec.xl_xid, blk.blkno, main_data_len ); @@ -753,7 +740,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let _xact_time = buf.get_i64_le(); let mut xinfo = 0; - if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { xinfo = buf.get_u32_le(); } if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { @@ -795,8 +782,8 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE"); } } - } else if xl_rmid == pg_constants::RM_DBASE_ID { - let info = xl_info & !pg_constants::XLR_INFO_MASK; + } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID { + let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::XLOG_DBASE_CREATE { //buf points to main_data let db_id = buf.get_u32_le(); @@ -815,8 +802,8 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } else { trace!("XLOG_DBASE_DROP is not handled yet"); } - } else if xl_rmid == pg_constants::RM_TBLSPC_ID { - let info = xl_info & !pg_constants::XLR_INFO_MASK; + } else if xlogrec.xl_rmid == pg_constants::RM_TBLSPC_ID { + let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::XLOG_TBLSPC_CREATE { //buf points to main_data let ts_id = buf.get_u32_le(); @@ -828,8 +815,8 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } DecodedWALRecord { - xl_info, - xl_rmid, + xl_info: xlogrec.xl_info, + xl_rmid: xlogrec.xl_rmid, record, blocks, main_data_offset, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 7ddc120abe..1df386e447 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -38,6 +38,7 @@ use crate::page_cache::BufferTag; use crate::page_cache::WALRecord; use crate::ZTimelineId; use crate::{pg_constants, PageServerConf}; +use postgres_ffi::xlog_utils; static TIMEOUT: Duration = Duration::from_secs(20); @@ -242,13 +243,7 @@ impl WalRedoManagerInternal { // 1. Parse XLogRecord struct // FIXME: refactor to avoid code duplication. - let _xl_tot_len = buf.get_u32_le(); - let xl_xid = buf.get_u32_le(); - let _xl_prev = buf.get_u64_le(); - let xl_info = buf.get_u8(); - let xl_rmid = buf.get_u8(); - buf.advance(2); // 2 bytes of padding - let _xl_crc = buf.get_u32_le(); + let xlogrec = xlog_utils::parse_xlog_record(&mut buf); //move to main data // TODO probably, we should store some records in our special format @@ -258,21 +253,21 @@ impl WalRedoManagerInternal { buf.advance(skip); } - if xl_rmid == pg_constants::RM_CLOG_ID { - let info = xl_info & !pg_constants::XLR_INFO_MASK; + if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { + let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::CLOG_ZEROPAGE { page.clone_from_slice(zero_page_bytes); } - } else if xl_rmid == pg_constants::RM_XACT_ID { - let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; let mut status = 0; if info == pg_constants::XLOG_XACT_COMMIT { status = pg_constants::TRANSACTION_STATUS_COMMITTED; - self.transaction_id_set_status_bit(xl_xid, status, &mut page); + self.transaction_id_set_status_bit(xlogrec.xl_xid, status, &mut page); //handle subtrans let _xact_time = buf.get_i64_le(); let mut xinfo = 0; - if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { xinfo = buf.get_u32_le(); if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { let _dbid = buf.get_u32_le(); @@ -294,11 +289,11 @@ impl WalRedoManagerInternal { } } else if info == pg_constants::XLOG_XACT_ABORT { status = pg_constants::TRANSACTION_STATUS_ABORTED; - self.transaction_id_set_status_bit(xl_xid, status, &mut page); + self.transaction_id_set_status_bit(xlogrec.xl_xid, status, &mut page); //handle subtrans let _xact_time = buf.get_i64_le(); let mut xinfo = 0; - if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { xinfo = buf.get_u32_le(); if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { let _dbid = buf.get_u32_le(); diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 2ead4f9718..a2ccfb64e3 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -8,6 +8,7 @@ // use byteorder::{ByteOrder, LittleEndian}; +use bytes::{Buf, Bytes}; use crc32c::*; use log::*; use std::cmp::min; @@ -264,3 +265,32 @@ pub fn main() { tli ); } + +// +// Xlog record parsing routines +// TODO move here other related code from waldecoder.rs +// +#[repr(C)] +#[derive(Debug)] +pub struct XLogRecord { + pub xl_tot_len: u32, + pub xl_xid: u32, + pub xl_prev: u64, + pub xl_info: u8, + pub xl_rmid: u8, + pub xl_crc: u32, +} + +pub fn parse_xlog_record(buf: &mut Bytes) -> XLogRecord { + XLogRecord { + xl_tot_len: buf.get_u32_le(), + xl_xid: buf.get_u32_le(), + xl_prev: buf.get_u64_le(), + xl_info: buf.get_u8(), + xl_rmid: buf.get_u8(), + xl_crc: { + buf.advance(2); + buf.get_u32_le() + }, + } +}