diff --git a/pageserver/src/pg_constants.rs b/pageserver/src/pg_constants.rs index 60a455b41e..b2a54bc78b 100644 --- a/pageserver/src/pg_constants.rs +++ b/pageserver/src/pg_constants.rs @@ -38,9 +38,14 @@ pub const XLOG_XACT_HAS_INFO: u8 = 0x80; * The following flags, stored in xinfo, determine which information is * contained in commit/abort records. */ -pub const XACT_XINFO_HAS_DBINFO: u32 = 1; -pub const XACT_XINFO_HAS_SUBXACTS: u32 = 2; -pub const XACT_XINFO_HAS_RELFILENODES: u32 = 4; +pub const XACT_XINFO_HAS_DBINFO: u32 = 1u32 << 0; +pub const XACT_XINFO_HAS_SUBXACTS: u32 = 1u32 << 1; +pub const XACT_XINFO_HAS_RELFILENODES: u32 = 1u32 << 2; +pub const XACT_XINFO_HAS_INVALS: u32 = 1u32 << 3; +pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; +// pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5; +// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6; +// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; // From pg_control.h and rmgrlist.h pub const XLOG_SWITCH: u8 = 0x40; @@ -63,3 +68,5 @@ pub const XLOG_DBASE_DROP: u8 = 0x10; pub const XLOG_TBLSPC_CREATE: u8 = 0x00; pub const XLOG_TBLSPC_DROP: u8 = 0x10; + +pub const SIZEOF_XLOGRECORD: u32 = 24; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 33e16ca336..c2a396ae89 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -672,27 +672,128 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; trace!( - "XLOG_XACT_COMMIT xl_prev {:X}/{:X} xid {} updates block {}", - (xl_prev >> 32), + "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", + xl_info, (xl_prev >> 32), xl_prev & 0xffffffff, xl_xid, - blk.blkno + blk.blkno, + main_data_len ); blocks.push(blk); - //TODO parse commit record to extract subtrans entries + + //parse commit record to extract subtrans entries + // xl_xact_commit starts with time of commit + let _xact_time = buf.get_i64_le(); + + let mut xinfo = 0; + if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + xinfo = buf.get_u32_le(); + } + if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { + let _dbid = buf.get_u32_le(); + let _tsid = buf.get_u32_le(); + } + if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { + let nsubxacts = buf.get_i32_le(); + let mut prev_blkno = u32::MAX; + for _i in 0..nsubxacts { + let subxact = buf.get_u32_le(); + let blkno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; + if prev_blkno != blkno { + prev_blkno = blkno; + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.blkno = blkno; + blocks.push(blk); + } + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { + let nrels = buf.get_i32_le(); + for _i in 0..nrels { + let spcnode = buf.get_u32_le(); + let dbnode = buf.get_u32_le(); + let relnode = buf.get_u32_le(); + //TODO handle this too? + trace!( + "XLOG_XACT_COMMIT relfilenode {}/{}/{}", + spcnode, + dbnode, + relnode + ); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { + let nmsgs = buf.get_i32_le(); + for _i in 0..nmsgs { + let sizeof_shared_invalidation_message = 0; + buf.advance(sizeof_shared_invalidation_message); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { + let _xid = buf.get_u32_le(); + trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); + //TODO handle this to be able to restore pg_twophase on node start + } } else if info == pg_constants::XLOG_XACT_ABORT { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; blk.blkno = xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; trace!( - "XLOG_XACT_ABORT xl_prev {:X}/{:X} xid {} updates block {}", - (xl_prev >> 32), + "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", + xl_info, (xl_prev >> 32), xl_prev & 0xffffffff, xl_xid, - blk.blkno + blk.blkno, + main_data_len ); blocks.push(blk); - //TODO parse abort record to extract subtrans entries + //parse abort record to extract subtrans entries + // xl_xact_abort starts with time of commit + let _xact_time = buf.get_i64_le(); + + let mut xinfo = 0; + if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + xinfo = buf.get_u32_le(); + } + if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { + let _dbid = buf.get_u32_le(); + let _tsid = buf.get_u32_le(); + } + if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { + let nsubxacts = buf.get_i32_le(); + let mut prev_blkno = u32::MAX; + for _i in 0..nsubxacts { + let subxact = buf.get_u32_le(); + let blkno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; + if prev_blkno != blkno { + prev_blkno = blkno; + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.blkno = blkno; + blocks.push(blk); + } + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { + let nrels = buf.get_i32_le(); + for _i in 0..nrels { + let spcnode = buf.get_u32_le(); + let dbnode = buf.get_u32_le(); + let relnode = buf.get_u32_le(); + //TODO save these too + trace!( + "XLOG_XACT_ABORT relfilenode {}/{}/{}", + spcnode, + dbnode, + relnode + ); + } + } + if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { + let _xid = buf.get_u32_le(); + trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE"); + } } } else if xl_rmid == pg_constants::RM_DBASE_ID { let info = xl_info & !pg_constants::XLR_INFO_MASK; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 23b824bfe8..7ddc120abe 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -182,37 +182,17 @@ impl WalRedoManagerInternal { } } - fn transaction_id_set_status_bit( - &self, - xl_info: u8, - xl_xid: u32, - record: &WALRecord, - page: &mut BytesMut, - ) { - let info = xl_info & pg_constants::XLOG_XACT_OPMASK; - let mut status = 0; - if info == pg_constants::XLOG_XACT_COMMIT { - status = pg_constants::TRANSACTION_STATUS_COMMITTED; - } else if info == pg_constants::XLOG_XACT_ABORT { - status = pg_constants::TRANSACTION_STATUS_ABORTED; - } else { - trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}", - status, - record.lsn, - record.main_data_offset, record.rec.len()); - return; - } + fn transaction_id_set_status_bit(&self, xid: u32, status: u8, page: &mut BytesMut) { + trace!( + "handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort, 3-sub_commit)", + status + ); - trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {} main_data_offset {}, rec.len {}", - status, - record.lsn, - record.main_data_offset, record.rec.len()); - - let byteno: usize = ((xl_xid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) + let byteno: usize = ((xid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) / pg_constants::CLOG_XACTS_PER_BYTE) as usize; let byteptr = &mut page[byteno..byteno + 1]; - let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE) + let bshift: u8 = ((xid % pg_constants::CLOG_XACTS_PER_BYTE) * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; let mut curval = byteptr[0]; @@ -225,8 +205,8 @@ impl WalRedoManagerInternal { byteptr.copy_from_slice(&byteval); trace!( - "xl_xid {} byteno {} curval {} byteval {}", - xl_xid, + "xid {} byteno {} curval {} byteval {}", + xid, byteno, curval, byteval[0] @@ -270,16 +250,80 @@ impl WalRedoManagerInternal { buf.advance(2); // 2 bytes of padding let _xl_crc = buf.get_u32_le(); + //move to main data + // TODO probably, we should store some records in our special format + // to avoid this weird parsing on replay + let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; + if buf.remaining() > skip { + buf.advance(skip); + } + if xl_rmid == pg_constants::RM_CLOG_ID { let info = xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::CLOG_ZEROPAGE { page.clone_from_slice(zero_page_bytes); - trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {} main_data_offset {}, rec.len {}", + } + } else if xl_rmid == pg_constants::RM_XACT_ID { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + let mut status = 0; + if info == pg_constants::XLOG_XACT_COMMIT { + status = pg_constants::TRANSACTION_STATUS_COMMITTED; + self.transaction_id_set_status_bit(xl_xid, status, &mut page); + //handle subtrans + let _xact_time = buf.get_i64_le(); + let mut xinfo = 0; + if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + xinfo = buf.get_u32_le(); + if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { + let _dbid = buf.get_u32_le(); + let _tsid = buf.get_u32_le(); + } + } + + if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { + let nsubxacts = buf.get_i32_le(); + for _i in 0..nsubxacts { + let subxact = buf.get_u32_le(); + let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + // only update xids on the requested page + if tag.blknum == blkno { + status = pg_constants::TRANSACTION_STATUS_SUB_COMMITTED; + self.transaction_id_set_status_bit(subxact, status, &mut page); + } + } + } + } else if info == pg_constants::XLOG_XACT_ABORT { + status = pg_constants::TRANSACTION_STATUS_ABORTED; + self.transaction_id_set_status_bit(xl_xid, status, &mut page); + //handle subtrans + let _xact_time = buf.get_i64_le(); + let mut xinfo = 0; + if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + xinfo = buf.get_u32_le(); + if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { + let _dbid = buf.get_u32_le(); + let _tsid = buf.get_u32_le(); + } + } + + if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { + let nsubxacts = buf.get_i32_le(); + for _i in 0..nsubxacts { + let subxact = buf.get_u32_le(); + let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + // only update xids on the requested page + if tag.blknum == blkno { + status = pg_constants::TRANSACTION_STATUS_ABORTED; + self.transaction_id_set_status_bit(subxact, status, &mut page); + } + } + } + } else { + trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}", + status, record.lsn, record.main_data_offset, record.rec.len()); } - } else if xl_rmid == pg_constants::RM_XACT_ID { - self.transaction_id_set_status_bit(xl_info, xl_xid, record, &mut page); } }