refactor XLogRecord reading

This commit is contained in:
anastasia
2021-04-28 00:34:48 +03:00
committed by lubennikovaav
parent d311f708b6
commit ef37eb96b9
3 changed files with 67 additions and 55 deletions

View File

@@ -1,6 +1,7 @@
use crate::pg_constants;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::xlog_utils;
use std::cmp::min;
use std::str;
use thiserror::Error;
@@ -336,16 +337,8 @@ pub struct DecodedWALRecord {
fn is_xlog_switch_record(rec: &Bytes) -> bool {
let mut buf = rec.clone();
// FIXME: assume little-endian here
let _xl_tot_len = buf.get_u32_le();
let _xl_xid = buf.get_u32_le();
let _xl_prev = buf.get_u64_le();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
xl_info == pg_constants::XLOG_SWITCH && xl_rmid == pg_constants::RM_XLOG_ID
let xlogrec = xlog_utils::parse_xlog_record(&mut buf);
xlogrec.xl_info == pg_constants::XLOG_SWITCH && xlogrec.xl_rmid == pg_constants::RM_XLOG_ID
}
pub type Oid = u32;
@@ -438,21 +431,15 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
// 1. Parse XLogRecord struct
// FIXME: assume little-endian here
let xl_tot_len = buf.get_u32_le();
let xl_xid = buf.get_u32_le();
let xl_prev = buf.get_u64_le();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
let xlogrec = xlog_utils::parse_xlog_record(&mut buf);
trace!(
"decode_wal_record xl_rmid = {} xl_info = {}",
xl_rmid,
xl_info
xlogrec.xl_rmid,
xlogrec.xl_info
);
let remaining = xl_tot_len - SizeOfXLogRecord;
let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord;
if buf.remaining() != remaining as usize {
//TODO error
@@ -651,7 +638,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
// We don't need them, so just skip blocks_total_len bytes
buf.advance(blocks_total_len as usize);
let main_data_offset = (xl_tot_len - main_data_len) as usize;
let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize;
// 4. Decode main_data
if main_data_len > 0 {
@@ -659,23 +646,23 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
//5. Handle special CLOG and XACT records
if xl_rmid == pg_constants::RM_CLOG_ID {
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.blkno = buf.get_i32_le() as u32;
trace!("RM_CLOG_ID updates block {}", blk.blkno);
blocks.push(blk);
} else if xl_rmid == pg_constants::RM_XACT_ID {
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xl_info, (xl_prev >> 32),
xl_prev & 0xffffffff,
xl_xid,
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blk.blkno,
main_data_len
);
@@ -686,7 +673,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
}
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
@@ -738,12 +725,12 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
} else if info == pg_constants::XLOG_XACT_ABORT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
xl_info, (xl_prev >> 32),
xl_prev & 0xffffffff,
xl_xid,
xlogrec.xl_info, (xlogrec.xl_prev >> 32),
xlogrec.xl_prev & 0xffffffff,
xlogrec.xl_xid,
blk.blkno,
main_data_len
);
@@ -753,7 +740,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
}
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
@@ -795,8 +782,8 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
}
}
} else if xl_rmid == pg_constants::RM_DBASE_ID {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::XLOG_DBASE_CREATE {
//buf points to main_data
let db_id = buf.get_u32_le();
@@ -815,8 +802,8 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
} else {
trace!("XLOG_DBASE_DROP is not handled yet");
}
} else if xl_rmid == pg_constants::RM_TBLSPC_ID {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
} else if xlogrec.xl_rmid == pg_constants::RM_TBLSPC_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::XLOG_TBLSPC_CREATE {
//buf points to main_data
let ts_id = buf.get_u32_le();
@@ -828,8 +815,8 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
DecodedWALRecord {
xl_info,
xl_rmid,
xl_info: xlogrec.xl_info,
xl_rmid: xlogrec.xl_rmid,
record,
blocks,
main_data_offset,

View File

@@ -38,6 +38,7 @@ use crate::page_cache::BufferTag;
use crate::page_cache::WALRecord;
use crate::ZTimelineId;
use crate::{pg_constants, PageServerConf};
use postgres_ffi::xlog_utils;
static TIMEOUT: Duration = Duration::from_secs(20);
@@ -242,13 +243,7 @@ impl WalRedoManagerInternal {
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let _xl_tot_len = buf.get_u32_le();
let xl_xid = buf.get_u32_le();
let _xl_prev = buf.get_u64_le();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
let xlogrec = xlog_utils::parse_xlog_record(&mut buf);
//move to main data
// TODO probably, we should store some records in our special format
@@ -258,21 +253,21 @@ impl WalRedoManagerInternal {
buf.advance(skip);
}
if xl_rmid == pg_constants::RM_CLOG_ID {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
page.clone_from_slice(zero_page_bytes);
}
} else if xl_rmid == pg_constants::RM_XACT_ID {
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
if info == pg_constants::XLOG_XACT_COMMIT {
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
self.transaction_id_set_status_bit(xl_xid, status, &mut page);
self.transaction_id_set_status_bit(xlogrec.xl_xid, status, &mut page);
//handle subtrans
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
let _dbid = buf.get_u32_le();
@@ -294,11 +289,11 @@ impl WalRedoManagerInternal {
}
} else if info == pg_constants::XLOG_XACT_ABORT {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
self.transaction_id_set_status_bit(xl_xid, status, &mut page);
self.transaction_id_set_status_bit(xlogrec.xl_xid, status, &mut page);
//handle subtrans
let _xact_time = buf.get_i64_le();
let mut xinfo = 0;
if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
let _dbid = buf.get_u32_le();

View File

@@ -8,6 +8,7 @@
//
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use crc32c::*;
use log::*;
use std::cmp::min;
@@ -264,3 +265,32 @@ pub fn main() {
tli
);
}
//
// Xlog record parsing routines
// TODO move here other related code from waldecoder.rs
//
#[repr(C)]
#[derive(Debug)]
pub struct XLogRecord {
pub xl_tot_len: u32,
pub xl_xid: u32,
pub xl_prev: u64,
pub xl_info: u8,
pub xl_rmid: u8,
pub xl_crc: u32,
}
pub fn parse_xlog_record(buf: &mut Bytes) -> XLogRecord {
XLogRecord {
xl_tot_len: buf.get_u32_le(),
xl_xid: buf.get_u32_le(),
xl_prev: buf.get_u64_le(),
xl_info: buf.get_u8(),
xl_rmid: buf.get_u8(),
xl_crc: {
buf.advance(2);
buf.get_u32_le()
},
}
}