diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 7fae6a121b..9e480a5135 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -282,11 +282,16 @@ pub fn save_decoded_record( { let truncate = XlSmgrTruncate::decode(&decoded); save_xlog_smgr_truncate(timeline, lsn, &truncate)?; - } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID - && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE - { - let createdb = XlCreateDatabase::decode(&decoded); - save_xlog_dbase_create(timeline, lsn, &createdb)?; + } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { + if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE { + let createdb = XlCreateDatabase::decode(&decoded); + save_xlog_dbase_create(timeline, lsn, &createdb)?; + } else { + // TODO + trace!("XLOG_DBASE_DROP is not handled yet"); + } + } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { + trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); } // Now that this record has been handled, let the repository know that diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 2fbb0af95f..a21936880f 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -11,7 +11,6 @@ use postgres_ffi::XLogPageHeaderData; use postgres_ffi::XLogRecord; use std::cmp::min; -use std::str; use thiserror::Error; use zenith_utils::lsn::Lsn; @@ -19,9 +18,6 @@ pub type Oid = u32; pub type TransactionId = u32; pub type BlockNumber = u32; pub type OffsetNumber = u16; -pub type MultiXactId = TransactionId; -pub type MultiXactOffset = u32; -pub type MultiXactStatus = u32; #[allow(dead_code)] pub struct WalStreamDecoder { @@ -264,24 +260,6 @@ pub struct RelFileNode { pub relnode: Oid, /* relation */ } -#[repr(C)] -#[derive(Debug)] -pub struct XlRelmapUpdate { - pub dbid: Oid, /* database ID, or 0 for shared map */ - pub tsid: Oid, /* database's tablespace, or pg_global */ - pub nbytes: i32, /* size of relmap data */ -} - -impl XlRelmapUpdate { - pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { - XlRelmapUpdate { - dbid: buf.get_u32_le(), - tsid: buf.get_u32_le(), - nbytes: buf.get_i32_le(), - } - } -} - #[repr(C)] #[derive(Debug)] pub struct XlSmgrTruncate { @@ -404,74 +382,6 @@ impl XlHeapUpdate { } } -#[repr(C)] -#[derive(Debug)] -pub struct MultiXactMember { - pub xid: TransactionId, - pub status: MultiXactStatus, -} - -impl MultiXactMember { - pub fn decode(buf: &mut Bytes) -> MultiXactMember { - MultiXactMember { - xid: buf.get_u32_le(), - status: buf.get_u32_le(), - } - } -} - -#[repr(C)] -#[derive(Debug)] -pub struct XlMultiXactCreate { - pub mid: MultiXactId, /* new MultiXact's ID */ - pub moff: MultiXactOffset, /* its starting offset in members file */ - pub nmembers: u32, /* number of member XIDs */ - pub members: Vec, -} - -impl XlMultiXactCreate { - pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate { - let mid = buf.get_u32_le(); - let moff = buf.get_u32_le(); - let nmembers = buf.get_u32_le(); - let mut members = Vec::new(); - for _ in 0..nmembers { - members.push(MultiXactMember::decode(buf)); - } - XlMultiXactCreate { - mid, - moff, - nmembers, - members, - } - } -} - -#[repr(C)] -#[derive(Debug)] -pub struct XlMultiXactTruncate { - oldest_multi_db: Oid, - /* to-be-truncated range of multixact offsets */ - start_trunc_off: MultiXactId, /* just for completeness' sake */ - end_trunc_off: MultiXactId, - - /* to-be-truncated range of multixact members */ - start_trunc_memb: MultiXactOffset, - end_trunc_memb: MultiXactOffset, -} - -impl XlMultiXactTruncate { - pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate { - XlMultiXactTruncate { - oldest_multi_db: buf.get_u32_le(), - start_trunc_off: buf.get_u32_le(), - end_trunc_off: buf.get_u32_le(), - start_trunc_memb: buf.get_u32_le(), - end_trunc_memb: buf.get_u32_le(), - } - } -} - /// Main routine to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details @@ -719,125 +629,9 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { assert_eq!(buf.remaining(), main_data_len as usize); } - //5. Handle special XACT records - if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; - if info == pg_constants::XLOG_XACT_COMMIT { - //parse commit record to extract subtrans entries - // xl_xact_commit starts with time of commit - let _xact_time = buf.get_i64_le(); - - let mut xinfo = 0; - if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { - xinfo = buf.get_u32_le(); - } - if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { - let _dbid = buf.get_u32_le(); - let _tsid = buf.get_u32_le(); - } - if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { - let nsubxacts = buf.get_i32_le(); - for _i in 0..nsubxacts { - let _subxact = buf.get_u32_le(); - } - } - if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { - let nrels = buf.get_i32_le(); - for _i in 0..nrels { - let spcnode = buf.get_u32_le(); - let dbnode = buf.get_u32_le(); - let relnode = buf.get_u32_le(); - //TODO handle this too? - trace!( - "XLOG_XACT_COMMIT relfilenode {}/{}/{}", - spcnode, - dbnode, - relnode - ); - } - } - if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 { - let nmsgs = buf.get_i32_le(); - for _i in 0..nmsgs { - let sizeof_shared_invalidation_message = 0; - buf.advance(sizeof_shared_invalidation_message); - } - } - if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { - let _xid = buf.get_u32_le(); - trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); - //TODO handle this to be able to restore pg_twophase on node start - } - } else if info == pg_constants::XLOG_XACT_ABORT { - //parse abort record to extract subtrans entries - // xl_xact_abort starts with time of commit - let _xact_time = buf.get_i64_le(); - - let mut xinfo = 0; - if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { - xinfo = buf.get_u32_le(); - } - if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { - let _dbid = buf.get_u32_le(); - let _tsid = buf.get_u32_le(); - } - if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { - let nsubxacts = buf.get_i32_le(); - for _i in 0..nsubxacts { - let _subxact = buf.get_u32_le(); - } - } - if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { - let nrels = buf.get_i32_le(); - for _i in 0..nrels { - let spcnode = buf.get_u32_le(); - let dbnode = buf.get_u32_le(); - let relnode = buf.get_u32_le(); - //TODO handle this too? - trace!( - "XLOG_XACT_ABORT relfilenode {}/{}/{}", - spcnode, - dbnode, - relnode - ); - } - } - if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { - let _xid = buf.get_u32_le(); - trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE"); - } - } - } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID { - let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::XLOG_DBASE_CREATE { - //buf points to main_data - let db_id = buf.get_u32_le(); - let tablespace_id = buf.get_u32_le(); - let src_db_id = buf.get_u32_le(); - let src_tablespace_id = buf.get_u32_le(); - trace!( - "XLOG_DBASE_CREATE tablespace_id/db_id {}/{} src_db_id {}/{}", - tablespace_id, - db_id, - src_tablespace_id, - src_db_id - ); - // in postgres it is implemented as copydir - // we need to copy all pages in page_cache - } else { - trace!("XLOG_DBASE_DROP is not handled yet"); - } - } else if xlogrec.xl_rmid == pg_constants::RM_TBLSPC_ID { - let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::XLOG_TBLSPC_CREATE { - //buf points to main_data - let ts_id = buf.get_u32_le(); - let ts_path = str::from_utf8(&buf).unwrap(); - trace!("XLOG_TBLSPC_CREATE ts_id {} ts_path {}", ts_id, ts_path); - } else { - trace!("XLOG_TBLSPC_DROP is not handled yet"); - } - } else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { + // 5. Handle a few special record types that modify blocks without registering + // them with the standard mechanism. + if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; if info == pg_constants::XLOG_HEAP_INSERT { diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 94262bc438..ecd7fe3919 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -33,33 +33,11 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24; pub const BITS_PER_HEAPBLOCK: u16 = 2; pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; -pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; -pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; -pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; - -// From xact.h -pub const XLOG_XACT_COMMIT: u8 = 0x00; -pub const XLOG_XACT_PREPARE: u8 = 0x10; -pub const XLOG_XACT_ABORT: u8 = 0x20; - /* mask for filtering opcodes out of xl_info */ pub const XLOG_XACT_OPMASK: u8 = 0x70; /* does this record have a 'xinfo' field or not */ pub const XLOG_XACT_HAS_INFO: u8 = 0x80; -/* - * The following flags, stored in xinfo, determine which information is - * contained in commit/abort records. - */ -pub const XACT_XINFO_HAS_DBINFO: u32 = 1u32 << 0; -pub const XACT_XINFO_HAS_SUBXACTS: u32 = 1u32 << 1; -pub const XACT_XINFO_HAS_RELFILENODES: u32 = 1u32 << 2; -pub const XACT_XINFO_HAS_INVALS: u32 = 1u32 << 3; -pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; -// pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5; -// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6; -// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; - // From pg_control.h and rmgrlist.h pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;