use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::XLogRecord; use std::cmp::min; use std::str; use thiserror::Error; use zenith_utils::lsn::Lsn; pub type Oid = u32; pub type TransactionId = u32; pub type BlockNumber = u32; pub type OffsetNumber = u16; pub type MultiXactId = TransactionId; pub type MultiXactOffset = u32; pub type MultiXactStatus = u32; // From PostgreSQL headers #[repr(C)] #[derive(Debug)] pub struct XLogPageHeaderData { xlp_magic: u16, /* magic value for correctness checks */ xlp_info: u16, /* flag bits, see below */ xlp_tli: u32, /* TimeLineID of first record on page */ xlp_pageaddr: u64, /* XLOG address of this page */ xlp_rem_len: u32, /* total len of remaining data for record */ } // FIXME: this assumes MAXIMUM_ALIGNOF 8. There are 4 padding bytes at end #[allow(non_upper_case_globals)] const SizeOfXLogShortPHD: usize = 2 + 2 + 4 + 8 + 4 + 4; #[repr(C)] #[derive(Debug)] pub struct XLogLongPageHeaderData { std: XLogPageHeaderData, /* standard header fields */ xlp_sysid: u64, /* system identifier from pg_control */ xlp_seg_size: u32, /* just as a cross-check */ xlp_xlog_blcksz: u32, /* just as a cross-check */ } // FIXME: this assumes MAXIMUM_ALIGNOF 8. #[allow(non_upper_case_globals)] const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4; #[allow(dead_code)] pub struct WalStreamDecoder { lsn: Lsn, startlsn: Lsn, // LSN where this record starts contlen: u32, padlen: u32, inputbuf: BytesMut, recordbuf: BytesMut, } #[derive(Error, Debug, Clone)] #[error("{msg} at {lsn}")] pub struct WalDecodeError { msg: String, lsn: Lsn, } // // WalRecordStream is a Stream that returns a stream of WAL records // FIXME: This isn't a proper rust stream // impl WalStreamDecoder { pub fn new(lsn: Lsn) -> WalStreamDecoder { WalStreamDecoder { lsn, startlsn: Lsn(0), contlen: 0, padlen: 0, inputbuf: BytesMut::new(), recordbuf: BytesMut::new(), } } pub fn feed_bytes(&mut self, buf: &[u8]) { self.inputbuf.extend_from_slice(buf); } /// Attempt to decode another WAL record from the input that has been fed to the /// decoder so far. /// /// Returns one of the following: /// Ok((u64, Bytes)): a tuple containing the LSN of next record, and the record itself /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function /// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid. /// pub fn poll_decode(&mut self) -> Result, WalDecodeError> { loop { // parse and verify page boundaries as we go if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 { // parse long header if self.inputbuf.remaining() < SizeOfXLogLongPHD { return Ok(None); } let hdr = self.decode_XLogLongPageHeaderData(); if hdr.std.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { msg: "invalid xlog segment header".into(), lsn: self.lsn, }); } // TODO: verify the remaining fields in the header self.lsn += SizeOfXLogLongPHD as u64; continue; } else if self.lsn.block_offset() == 0 { if self.inputbuf.remaining() < SizeOfXLogShortPHD { return Ok(None); } let hdr = self.decode_XLogPageHeaderData(); if hdr.xlp_pageaddr != self.lsn.0 { return Err(WalDecodeError { msg: "invalid xlog page header".into(), lsn: self.lsn, }); } // TODO: verify the remaining fields in the header self.lsn += SizeOfXLogShortPHD as u64; continue; } else if self.padlen > 0 { if self.inputbuf.remaining() < self.padlen as usize { return Ok(None); } // skip padding self.inputbuf.advance(self.padlen as usize); self.lsn += self.padlen as u64; self.padlen = 0; } else if self.contlen == 0 { // need to have at least the xl_tot_len field if self.inputbuf.remaining() < 4 { return Ok(None); } // read xl_tot_len FIXME: assumes little-endian self.startlsn = self.lsn; let xl_tot_len = self.inputbuf.get_u32_le(); if xl_tot_len < SizeOfXLogRecord { return Err(WalDecodeError { msg: format!("invalid xl_tot_len {}", xl_tot_len), lsn: self.lsn, }); } self.lsn += 4; self.recordbuf.clear(); self.recordbuf.reserve(xl_tot_len as usize); self.recordbuf.put_u32_le(xl_tot_len); self.contlen = xl_tot_len - 4; continue; } else { // we're continuing a record, possibly from previous page. let pageleft = self.lsn.remaining_in_block() as u32; // read the rest of the record, or as much as fits on this page. let n = min(self.contlen, pageleft) as usize; if self.inputbuf.remaining() < n { return Ok(None); } self.recordbuf.put(self.inputbuf.split_to(n)); self.lsn += n as u64; self.contlen -= n as u32; if self.contlen == 0 { let recordbuf = std::mem::replace(&mut self.recordbuf, BytesMut::new()); let recordbuf = recordbuf.freeze(); let mut buf = recordbuf.clone(); // XLOG_SWITCH records are special. If we see one, we need to skip // to the next WAL segment. let xlogrec = XLogRecord::from_bytes(&mut buf); if xlogrec.is_xlog_switch_record() { trace!("saw xlog switch record at {}", self.lsn); self.padlen = self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32; } else { // Pad to an 8-byte boundary self.padlen = self.lsn.calc_padding(8u32) as u32; } let result = (self.lsn, recordbuf); return Ok(Some(result)); } continue; } } // check record boundaries // deal with continuation records // deal with xlog_switch records } #[allow(non_snake_case)] fn decode_XLogPageHeaderData(&mut self) -> XLogPageHeaderData { let buf = &mut self.inputbuf; // FIXME: Assume little-endian let hdr: XLogPageHeaderData = XLogPageHeaderData { xlp_magic: buf.get_u16_le(), xlp_info: buf.get_u16_le(), xlp_tli: buf.get_u32_le(), xlp_pageaddr: buf.get_u64_le(), xlp_rem_len: buf.get_u32_le(), }; // 4 bytes of padding, on 64-bit systems buf.advance(4); // FIXME: check that hdr.xlp_rem_len matches self.contlen //println!("next xlog page (xlp_rem_len: {})", hdr.xlp_rem_len); hdr } #[allow(non_snake_case)] fn decode_XLogLongPageHeaderData(&mut self) -> XLogLongPageHeaderData { let hdr: XLogLongPageHeaderData = XLogLongPageHeaderData { std: self.decode_XLogPageHeaderData(), xlp_sysid: self.inputbuf.get_u64_le(), xlp_seg_size: self.inputbuf.get_u32_le(), xlp_xlog_blcksz: self.inputbuf.get_u32_le(), }; hdr } } #[allow(dead_code)] pub struct DecodedBkpBlock { /* Is this block ref in use? */ //in_use: bool, /* Identify the block this refers to */ pub rnode_spcnode: u32, pub rnode_dbnode: u32, pub rnode_relnode: u32, // Note that we have a few special forknum values for non-rel files. pub forknum: u8, pub blkno: u32, /* copy of the fork_flags field from the XLogRecordBlockHeader */ flags: u8, /* Information on full-page image, if any */ has_image: bool, /* has image, even for consistency checking */ pub apply_image: bool, /* has image that should be restored */ pub will_init: bool, //char *bkp_image; hole_offset: u16, hole_length: u16, bimg_len: u16, bimg_info: u8, /* Buffer holding the rmgr-specific data associated with this block */ has_data: bool, data_len: u16, } impl DecodedBkpBlock { pub fn new() -> DecodedBkpBlock { DecodedBkpBlock { rnode_spcnode: 0, rnode_dbnode: 0, rnode_relnode: 0, forknum: 0, blkno: 0, flags: 0, has_image: false, apply_image: false, will_init: false, hole_offset: 0, hole_length: 0, bimg_len: 0, bimg_info: 0, has_data: false, data_len: 0, } } } #[allow(non_upper_case_globals)] const SizeOfXLogRecord: u32 = 24; pub struct DecodedWALRecord { pub xl_info: u8, pub xl_rmid: u8, pub record: Bytes, // raw XLogRecord pub blocks: Vec, pub main_data_offset: usize, } #[repr(C)] #[derive(Debug, Clone, Copy)] pub struct RelFileNode { pub spcnode: Oid, /* tablespace */ pub dbnode: Oid, /* database */ pub relnode: Oid, /* relation */ } #[repr(C)] #[derive(Debug)] pub struct XlRelmapUpdate { pub dbid: Oid, /* database ID, or 0 for shared map */ pub tsid: Oid, /* database's tablespace, or pg_global */ pub nbytes: i32, /* size of relmap data */ } impl XlRelmapUpdate { pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { XlRelmapUpdate { dbid: buf.get_u32_le(), tsid: buf.get_u32_le(), nbytes: buf.get_i32_le(), } } } #[repr(C)] #[derive(Debug)] pub struct XlSmgrTruncate { pub blkno: BlockNumber, pub rnode: RelFileNode, pub flags: u32, } impl XlSmgrTruncate { pub fn decode(decoded: &DecodedWALRecord) -> XlSmgrTruncate { let mut buf = decoded.record.clone(); buf.advance((SizeOfXLogRecord + 2) as usize); XlSmgrTruncate { blkno: buf.get_u32_le(), rnode: RelFileNode { spcnode: buf.get_u32_le(), /* tablespace */ dbnode: buf.get_u32_le(), /* database */ relnode: buf.get_u32_le(), /* relation */ }, flags: buf.get_u32_le(), } } } #[repr(C)] #[derive(Debug)] pub struct XlCreateDatabase { pub db_id: Oid, pub tablespace_id: Oid, pub src_db_id: Oid, pub src_tablespace_id: Oid, } impl XlCreateDatabase { pub fn decode(decoded: &DecodedWALRecord) -> XlCreateDatabase { let mut buf = decoded.record.clone(); buf.advance((SizeOfXLogRecord + 2) as usize); XlCreateDatabase { db_id: buf.get_u32_le(), tablespace_id: buf.get_u32_le(), src_db_id: buf.get_u32_le(), src_tablespace_id: buf.get_u32_le(), } } } #[repr(C)] #[derive(Debug)] pub struct XlHeapInsert { pub offnum: OffsetNumber, pub flags: u8, } impl XlHeapInsert { pub fn decode(buf: &mut Bytes) -> XlHeapInsert { XlHeapInsert { offnum: buf.get_u16_le(), flags: buf.get_u8(), } } } #[repr(C)] #[derive(Debug)] pub struct XlHeapMultiInsert { pub flags: u8, pub ntuples: u16, } impl XlHeapMultiInsert { pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert { XlHeapMultiInsert { flags: buf.get_u8(), ntuples: buf.get_u16_le(), } } } #[repr(C)] #[derive(Debug)] pub struct XlHeapDelete { pub xmax: TransactionId, pub offnum: OffsetNumber, pub infobits_set: u8, pub flags: u8, } impl XlHeapDelete { pub fn decode(buf: &mut Bytes) -> XlHeapDelete { XlHeapDelete { xmax: buf.get_u32_le(), offnum: buf.get_u16_le(), infobits_set: buf.get_u8(), flags: buf.get_u8(), } } } #[repr(C)] #[derive(Debug)] pub struct XlHeapUpdate { pub old_xmax: TransactionId, pub old_offnum: OffsetNumber, pub old_infobits_set: u8, pub flags: u8, pub new_xmax: TransactionId, pub new_offnum: OffsetNumber, } impl XlHeapUpdate { pub fn decode(buf: &mut Bytes) -> XlHeapUpdate { XlHeapUpdate { old_xmax: buf.get_u32_le(), old_offnum: buf.get_u16_le(), old_infobits_set: buf.get_u8(), flags: buf.get_u8(), new_xmax: buf.get_u32_le(), new_offnum: buf.get_u16_le(), } } } #[repr(C)] #[derive(Debug)] pub struct MultiXactMember { pub xid: TransactionId, pub status: MultiXactStatus, } impl MultiXactMember { pub fn decode(buf: &mut Bytes) -> MultiXactMember { MultiXactMember { xid: buf.get_u32_le(), status: buf.get_u32_le(), } } } #[repr(C)] #[derive(Debug)] pub struct XlMultiXactCreate { pub mid: MultiXactId, /* new MultiXact's ID */ pub moff: MultiXactOffset, /* its starting offset in members file */ pub nmembers: u32, /* number of member XIDs */ pub members: Vec, } impl XlMultiXactCreate { pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate { let mid = buf.get_u32_le(); let moff = buf.get_u32_le(); let nmembers = buf.get_u32_le(); let mut members = Vec::new(); for _ in 0..nmembers { members.push(MultiXactMember::decode(buf)); } XlMultiXactCreate { mid, moff, nmembers, members, } } } #[repr(C)] #[derive(Debug)] pub struct XlMultiXactTruncate { oldest_multi_db: Oid, /* to-be-truncated range of multixact offsets */ start_trunc_off: MultiXactId, /* just for completeness' sake */ end_trunc_off: MultiXactId, /* to-be-truncated range of multixact members */ start_trunc_memb: MultiXactOffset, end_trunc_memb: MultiXactOffset, } impl XlMultiXactTruncate { pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate { XlMultiXactTruncate { oldest_multi_db: buf.get_u32_le(), start_trunc_off: buf.get_u32_le(), end_trunc_off: buf.get_u32_le(), start_trunc_memb: buf.get_u32_le(), end_trunc_memb: buf.get_u32_le(), } } } // // Routines to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details // The overall layout of an XLOG record is: // Fixed-size header (XLogRecord struct) // XLogRecordBlockHeader struct // If pg_constants::BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows // If pg_constants::BKPIMAGE_HAS_HOLE and pg_constants::BKPIMAGE_IS_COMPRESSED, an // XLogRecordBlockCompressHeader struct follows. // If pg_constants::BKPBLOCK_SAME_REL is not set, a RelFileNode follows // BlockNumber follows // XLogRecordBlockHeader struct // ... // XLogRecordDataHeader[Short|Long] struct // block data // block data // ... // main data pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let mut rnode_spcnode: u32 = 0; let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; let mut got_rnode = false; let mut buf = record.clone(); // 1. Parse XLogRecord struct // FIXME: assume little-endian here let xlogrec = XLogRecord::from_bytes(&mut buf); trace!( "decode_wal_record xl_rmid = {} xl_info = {}", xlogrec.xl_rmid, xlogrec.xl_info ); let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord; if buf.remaining() != remaining as usize { //TODO error } let mut max_block_id = 0; let mut blocks_total_len: u32 = 0; let mut main_data_len = 0; let mut datatotal: u32 = 0; let mut blocks: Vec = Vec::new(); // 2. Decode the headers. // XLogRecordBlockHeaders if any, // XLogRecordDataHeader[Short|Long] while buf.remaining() > datatotal as usize { let block_id = buf.get_u8(); match block_id { pg_constants::XLR_BLOCK_ID_DATA_SHORT => { /* XLogRecordDataHeaderShort */ main_data_len = buf.get_u8() as u32; datatotal += main_data_len; } pg_constants::XLR_BLOCK_ID_DATA_LONG => { /* XLogRecordDataHeaderLong */ main_data_len = buf.get_u32_le(); datatotal += main_data_len; } pg_constants::XLR_BLOCK_ID_ORIGIN => { // RepOriginId is uint16 buf.advance(2); } pg_constants::XLR_BLOCK_ID_TOPLEVEL_XID => { // TransactionId is uint32 buf.advance(4); } 0..=pg_constants::XLR_MAX_BLOCK_ID => { /* XLogRecordBlockHeader */ let mut blk = DecodedBkpBlock::new(); let fork_flags: u8; if block_id <= max_block_id { // TODO //report_invalid_record(state, // "out-of-order block_id %u at %X/%X", // block_id, // (uint32) (state->ReadRecPtr >> 32), // (uint32) state->ReadRecPtr); // goto err; } max_block_id = block_id; fork_flags = buf.get_u8(); blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK; blk.flags = fork_flags; blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0; blk.has_data = (fork_flags & pg_constants::BKPBLOCK_HAS_DATA) != 0; blk.will_init = (fork_flags & pg_constants::BKPBLOCK_WILL_INIT) != 0; blk.data_len = buf.get_u16_le(); /* TODO cross-check that the HAS_DATA flag is set iff data_length > 0 */ datatotal += blk.data_len as u32; blocks_total_len += blk.data_len as u32; if blk.has_image { blk.bimg_len = buf.get_u16_le(); blk.hole_offset = buf.get_u16_le(); blk.bimg_info = buf.get_u8(); blk.apply_image = (blk.bimg_info & pg_constants::BKPIMAGE_APPLY) != 0; if blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED != 0 { if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 { blk.hole_length = buf.get_u16_le(); } else { blk.hole_length = 0; } } else { blk.hole_length = pg_constants::BLCKSZ - blk.bimg_len; } datatotal += blk.bimg_len as u32; blocks_total_len += blk.bimg_len as u32; /* * cross-check that hole_offset > 0, hole_length > 0 and * bimg_len < BLCKSZ if the HAS_HOLE flag is set. */ if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 && (blk.hole_offset == 0 || blk.hole_length == 0 || blk.bimg_len == pg_constants::BLCKSZ) { // TODO /* report_invalid_record(state, "pg_constants::BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, (unsigned int) blk->bimg_len, (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); goto err; */ } /* * cross-check that hole_offset == 0 and hole_length == 0 if * the HAS_HOLE flag is not set. */ if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 && (blk.hole_offset != 0 || blk.hole_length != 0) { // TODO /* report_invalid_record(state, "pg_constants::BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", (unsigned int) blk->hole_offset, (unsigned int) blk->hole_length, (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); goto err; */ } /* * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED * flag is set. */ if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0) && blk.bimg_len == pg_constants::BLCKSZ { // TODO /* report_invalid_record(state, "pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X", (unsigned int) blk->bimg_len, (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); goto err; */ } /* * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor * IS_COMPRESSED flag is set. */ if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 && blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0 && blk.bimg_len != pg_constants::BLCKSZ { // TODO /* report_invalid_record(state, "neither pg_constants::BKPIMAGE_HAS_HOLE nor pg_constants::BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X", (unsigned int) blk->data_len, (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); goto err; */ } } if fork_flags & pg_constants::BKPBLOCK_SAME_REL == 0 { rnode_spcnode = buf.get_u32_le(); rnode_dbnode = buf.get_u32_le(); rnode_relnode = buf.get_u32_le(); got_rnode = true; } else if !got_rnode { // TODO /* report_invalid_record(state, "pg_constants::BKPBLOCK_SAME_REL set but no previous rel at %X/%X", (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); goto err; */ } blk.rnode_spcnode = rnode_spcnode; blk.rnode_dbnode = rnode_dbnode; blk.rnode_relnode = rnode_relnode; blk.blkno = buf.get_u32_le(); trace!( "this record affects {}/{}/{} blk {}", rnode_spcnode, rnode_dbnode, rnode_relnode, blk.blkno ); blocks.push(blk); } _ => { // TODO: invalid block_id } } } // 3. Decode blocks. // We don't need them, so just skip blocks_total_len bytes buf.advance(blocks_total_len as usize); let main_data_offset = (xlogrec.xl_tot_len - main_data_len) as usize; // 4. Decode main_data if main_data_len > 0 { assert_eq!(buf.remaining(), main_data_len as usize); } //5. Handle special XACT records if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; if info == pg_constants::XLOG_XACT_COMMIT { //parse commit record to extract subtrans entries // xl_xact_commit starts with time of commit let _xact_time = buf.get_i64_le(); let mut xinfo = 0; if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { xinfo = buf.get_u32_le(); } if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { let _dbid = buf.get_u32_le(); let _tsid = buf.get_u32_le(); } if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { let nsubxacts = buf.get_i32_le(); for _i in 0..nsubxacts { let _subxact = buf.get_u32_le(); } } if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { let nrels = buf.get_i32_le(); for _i in 0..nrels { let spcnode = buf.get_u32_le(); let dbnode = buf.get_u32_le(); let relnode = buf.get_u32_le(); //TODO handle this too? trace!( "XLOG_XACT_COMMIT relfilenode {}/{}/{}", spcnode, dbnode, relnode ); } } if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { let nmsgs = buf.get_i32_le(); for _i in 0..nmsgs { let sizeof_shared_invalidation_message = 0; buf.advance(sizeof_shared_invalidation_message); } } if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { let _xid = buf.get_u32_le(); trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); //TODO handle this to be able to restore pg_twophase on node start } } else if info == pg_constants::XLOG_XACT_ABORT { //parse abort record to extract subtrans entries // xl_xact_abort starts with time of commit let _xact_time = buf.get_i64_le(); let mut xinfo = 0; if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { xinfo = buf.get_u32_le(); } if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { let _dbid = buf.get_u32_le(); let _tsid = buf.get_u32_le(); } if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { let nsubxacts = buf.get_i32_le(); for _i in 0..nsubxacts { let _subxact = buf.get_u32_le(); } } if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { let nrels = buf.get_i32_le(); for _i in 0..nrels { let spcnode = buf.get_u32_le(); let dbnode = buf.get_u32_le(); let relnode = buf.get_u32_le(); //TODO save these too trace!( "XLOG_XACT_ABORT relfilenode {}/{}/{}", spcnode, dbnode, relnode ); } } if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { let _xid = buf.get_u32_le(); trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE"); } } } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID { let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::XLOG_DBASE_CREATE { //buf points to main_data let db_id = buf.get_u32_le(); let tablespace_id = buf.get_u32_le(); let src_db_id = buf.get_u32_le(); let src_tablespace_id = buf.get_u32_le(); trace!( "XLOG_DBASE_CREATE tablespace_id/db_id {}/{} src_db_id {}/{}", tablespace_id, db_id, src_tablespace_id, src_db_id ); // in postgres it is implemented as copydir // we need to copy all pages in page_cache } else { trace!("XLOG_DBASE_DROP is not handled yet"); } } else if xlogrec.xl_rmid == pg_constants::RM_TBLSPC_ID { let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::XLOG_TBLSPC_CREATE { //buf points to main_data let ts_id = buf.get_u32_le(); let ts_path = str::from_utf8(&buf).unwrap(); trace!("XLOG_TBLSPC_CREATE ts_id {} ts_path {}", ts_id, ts_path); } else { trace!("XLOG_TBLSPC_DROP is not handled yet"); } } else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; if info == pg_constants::XLOG_HEAP_INSERT { let xlrec = XlHeapInsert::decode(&mut buf); if (xlrec.flags & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) != 0 { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; blk.blkno = blkno; blk.rnode_spcnode = blocks[0].rnode_spcnode; blk.rnode_dbnode = blocks[0].rnode_dbnode; blk.rnode_relnode = blocks[0].rnode_relnode; blocks.push(blk); } } else if info == pg_constants::XLOG_HEAP_DELETE { let xlrec = XlHeapDelete::decode(&mut buf); if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; blk.blkno = blkno; blk.rnode_spcnode = blocks[0].rnode_spcnode; blk.rnode_dbnode = blocks[0].rnode_dbnode; blk.rnode_relnode = blocks[0].rnode_relnode; blocks.push(blk); } } else if info == pg_constants::XLOG_HEAP_UPDATE || info == pg_constants::XLOG_HEAP_HOT_UPDATE { let xlrec = XlHeapUpdate::decode(&mut buf); if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; blk.blkno = blkno; blk.rnode_spcnode = blocks[0].rnode_spcnode; blk.rnode_dbnode = blocks[0].rnode_dbnode; blk.rnode_relnode = blocks[0].rnode_relnode; blocks.push(blk); } if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 && blocks.len() > 1 { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; blk.rnode_spcnode = blocks[1].rnode_spcnode; blk.rnode_dbnode = blocks[1].rnode_dbnode; blk.rnode_relnode = blocks[1].rnode_relnode; blocks.push(blk); } } } else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { let xlrec = XlHeapMultiInsert::decode(&mut buf); if (xlrec.flags & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) != 0 { let mut blk = DecodedBkpBlock::new(); let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; blk.blkno = blkno; blk.rnode_spcnode = blocks[0].rnode_spcnode; blk.rnode_dbnode = blocks[0].rnode_dbnode; blk.rnode_relnode = blocks[0].rnode_relnode; blocks.push(blk); } } } DecodedWALRecord { xl_info: xlogrec.xl_info, xl_rmid: xlogrec.xl_rmid, record, blocks, main_data_offset, } }