From 22e7fcbf2dba6c34c80849f5d3e9af67cec0d609 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 12 May 2021 10:08:13 +0300 Subject: [PATCH] Handle visbility map updates in WAL redo --- pageserver/src/basebackup.rs | 2 + pageserver/src/restore_local_repo.rs | 12 +- pageserver/src/restore_s3.rs | 9 +- pageserver/src/waldecoder.rs | 161 ++++++++++++++++++++++++++- pageserver/src/walredo.rs | 2 +- postgres_ffi/src/lib.rs | 46 ++++++++ postgres_ffi/src/pg_constants.rs | 40 +++++-- vendor/postgres | 2 +- 8 files changed, 248 insertions(+), 26 deletions(-) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 3b626f91df..426e413521 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -1,4 +1,6 @@ +use crate::ZTimelineId; use log::*; +use postgres_ffi::FilePathError; use std::io::Write; use tar::Builder; use walkdir::WalkDir; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index c0978adb20..5646929998 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -11,7 +11,6 @@ // use log::*; - use std::cmp::max; use std::fs; use std::fs::File; @@ -30,6 +29,7 @@ use crate::ZTimelineId; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; +use postgres_ffi::FilePathError; use zenith_utils::lsn::Lsn; /// @@ -199,8 +199,7 @@ fn restore_relfile( let mut file = File::open(path)?; let mut buf: [u8; 8192] = [0u8; 8192]; - // FIXME: use constants (BLCKSZ) - let mut blknum: u32 = segno * (1024 * 1024 * 1024 / 8192); + let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32); loop { let r = file.read_exact(&mut buf); match r { @@ -210,7 +209,7 @@ fn restore_relfile( spcnode: spcoid, dbnode: dboid, relnode, - forknum: forknum as u8, + forknum, }, blknum, }; @@ -246,7 +245,7 @@ fn restore_nonrelfile( timeline: &dyn Timeline, _timelineid: ZTimelineId, snapshot: &str, - forknum: u32, + forknum: u8, path: &Path, ) -> Result<()> { let lsn = Lsn::from_hex(snapshot)?; @@ -257,7 +256,6 @@ fn restore_nonrelfile( let mut buf: [u8; 8192] = [0u8; 8192]; let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?; - // FIXME: use constants (BLCKSZ) let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; loop { let r = file.read_exact(&mut buf); @@ -268,7 +266,7 @@ fn restore_nonrelfile( spcnode: 0, dbnode: 0, relnode: 0, - forknum: forknum as u8, + forknum, }, blknum, }; diff --git a/pageserver/src/restore_s3.rs b/pageserver/src/restore_s3.rs index 2996d7650c..46cfaa8a89 100644 --- a/pageserver/src/restore_s3.rs +++ b/pageserver/src/restore_s3.rs @@ -134,7 +134,7 @@ struct ParsedBaseImageFileName { pub spcnode: u32, pub dbnode: u32, pub relnode: u32, - pub forknum: u32, + pub forknum: u8, pub segno: u32, pub lsn: u64, @@ -146,7 +146,7 @@ struct ParsedBaseImageFileName { // . // _. -fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> { +fn parse_filename(fname: &str) -> Result<(u32, u8, u32, u64), FilePathError> { let re = Regex::new(r"^(?P\d+)(_(?P[a-z]+))?(\.(?P\d+))?_(?P[[:xdigit:]]{8})(?P[[:xdigit:]]{8})$").unwrap(); let caps = re @@ -252,8 +252,7 @@ async fn slurp_base_file( let mut bytes = BytesMut::from(data.as_slice()).freeze(); - // FIXME: use constants (BLCKSZ) - let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192); + let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32); let pcache = page_cache::get_pagecache(conf, sys_id); @@ -263,7 +262,7 @@ async fn slurp_base_file( spcnode: parsed.spcnode, dbnode: parsed.dbnode, relnode: parsed.relnode, - forknum: parsed.forknum as u8, + forknum: parsed.forknum, }, blknum, }; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index e7b3ba43bd..cb0c3ce7de 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -307,7 +307,9 @@ pub struct DecodedWALRecord { } pub type Oid = u32; +pub type TransactionId = u32; pub type BlockNumber = u32; +pub type OffsetNumber = u16; #[repr(C)] #[derive(Debug, Clone, Copy)] @@ -363,6 +365,82 @@ impl XlCreateDatabase { } } +#[repr(C)] +#[derive(Debug)] +pub struct XlHeapInsert { + pub offnum: OffsetNumber, + pub flags: u8, +} + +impl XlHeapInsert { + pub fn decode(buf: &mut Bytes) -> XlHeapInsert { + XlHeapInsert { + offnum: buf.get_u16_le(), + flags: buf.get_u8(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlHeapMultiInsert { + pub flags: u8, + pub ntuples: u16, +} + +impl XlHeapMultiInsert { + pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert { + XlHeapMultiInsert { + flags: buf.get_u8(), + ntuples: buf.get_u16_le(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlHeapDelete { + pub xmax: TransactionId, + pub offnum: OffsetNumber, + pub infobits_set: u8, + pub flags: u8, +} + +impl XlHeapDelete { + pub fn decode(buf: &mut Bytes) -> XlHeapDelete { + XlHeapDelete { + xmax: buf.get_u32_le(), + offnum: buf.get_u16_le(), + infobits_set: buf.get_u8(), + flags: buf.get_u8(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlHeapUpdate { + pub old_xmax: TransactionId, + pub old_offnum: OffsetNumber, + pub old_infobits_set: u8, + pub flags: u8, + pub new_xmax: TransactionId, + pub new_offnum: OffsetNumber, +} + +impl XlHeapUpdate { + pub fn decode(buf: &mut Bytes) -> XlHeapUpdate { + XlHeapUpdate { + old_xmax: buf.get_u32_le(), + old_offnum: buf.get_u16_le(), + old_infobits_set: buf.get_u8(), + flags: buf.get_u8(), + new_xmax: buf.get_u32_le(), + new_offnum: buf.get_u16_le(), + } + } +} + // // Routines to decode a WAL record and figure out which blocks are modified // @@ -614,7 +692,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { //5. Handle special CLOG and XACT records if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.forknum = pg_constants::PG_XACT_FORKNUM; blk.blkno = buf.get_i32_le() as u32; blk.will_init = true; trace!("RM_CLOG_ID updates block {}", blk.blkno); @@ -623,7 +701,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; if info == pg_constants::XLOG_XACT_COMMIT { let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.forknum = pg_constants::PG_XACT_FORKNUM; blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; trace!( "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", @@ -656,7 +734,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { if prev_blkno != blkno { prev_blkno = blkno; let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.forknum = pg_constants::PG_XACT_FORKNUM; blk.blkno = blkno; blocks.push(blk); } @@ -691,7 +769,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } } else if info == pg_constants::XLOG_XACT_ABORT { let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.forknum = pg_constants::PG_XACT_FORKNUM; blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; trace!( "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", @@ -723,7 +801,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { if prev_blkno != blkno { prev_blkno = blkno; let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; + blk.forknum = pg_constants::PG_XACT_FORKNUM; blk.blkno = blkno; blocks.push(blk); } @@ -779,6 +857,79 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } else { trace!("XLOG_TBLSPC_DROP is not handled yet"); } + } else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; + if info == pg_constants::XLOG_HEAP_INSERT { + let xlrec = XlHeapInsert::decode(&mut buf); + if (xlrec.flags + & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED + | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) + != 0 + { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; + blk.blkno = blkno; + blk.rnode_spcnode = blocks[0].rnode_spcnode; + blk.rnode_dbnode = blocks[0].rnode_dbnode; + blk.rnode_relnode = blocks[0].rnode_relnode; + blocks.push(blk); + } + } else if info == pg_constants::XLOG_HEAP_DELETE { + let xlrec = XlHeapDelete::decode(&mut buf); + if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; + blk.blkno = blkno; + blk.rnode_spcnode = blocks[0].rnode_spcnode; + blk.rnode_dbnode = blocks[0].rnode_dbnode; + blk.rnode_relnode = blocks[0].rnode_relnode; + blocks.push(blk); + } + } else if info == pg_constants::XLOG_HEAP_UPDATE + || info == pg_constants::XLOG_HEAP_HOT_UPDATE + { + let xlrec = XlHeapUpdate::decode(&mut buf); + if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; + blk.blkno = blkno; + blk.rnode_spcnode = blocks[0].rnode_spcnode; + blk.rnode_dbnode = blocks[0].rnode_dbnode; + blk.rnode_relnode = blocks[0].rnode_relnode; + blocks.push(blk); + } + if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 + && blocks.len() > 1 + { + let mut blk = DecodedBkpBlock::new(); + blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; + blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; + blk.rnode_spcnode = blocks[1].rnode_spcnode; + blk.rnode_dbnode = blocks[1].rnode_dbnode; + blk.rnode_relnode = blocks[1].rnode_relnode; + blocks.push(blk); + } + } + } else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID { + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { + let xlrec = XlHeapMultiInsert::decode(&mut buf); + if (xlrec.flags + & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED + | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) + != 0 + { + let mut blk = DecodedBkpBlock::new(); + let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; + blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; + blk.blkno = blkno; + blk.rnode_spcnode = blocks[0].rnode_spcnode; + blk.rnode_dbnode = blocks[0].rnode_dbnode; + blk.rnode_relnode = blocks[0].rnode_relnode; + blocks.push(blk); + } + } } DecodedWALRecord { diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 3aedd16faf..cf878c0a57 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -229,7 +229,7 @@ impl WalRedoManagerInternal { let start = Instant::now(); let apply_result: Result; - if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 { + if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM { const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; let mut page = BytesMut::new(); if let Some(fpi) = base_img { diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index a1a12a901d..67e8e4b1b2 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -8,6 +8,8 @@ pub mod relfile_utils; pub mod xlog_utils; use bytes::{Buf, Bytes, BytesMut}; +use std::error::Error; +use std::fmt; // sizeof(ControlFileData) const SIZEOF_CONTROLDATA: usize = std::mem::size_of::(); @@ -50,6 +52,50 @@ pub fn decode_pg_control(mut buf: Bytes) -> Result &str { + &self.msg + } +} + +impl FilePathError { + pub fn new(msg: &str) -> FilePathError { + FilePathError { + msg: msg.to_string(), + } + } +} + +impl From for FilePathError { + fn from(e: core::num::ParseIntError) -> Self { + return FilePathError { + msg: format!("invalid filename: {}", e), + }; + } +} + +impl fmt::Display for FilePathError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "invalid filename") + } +} + +pub fn forkname_to_forknum(forkname: Option<&str>) -> Result { + match forkname { + // "main" is not in filenames, it's implicit if the fork name is not present + None => Ok(pg_constants::MAIN_FORKNUM), + Some("fsm") => Ok(pg_constants::FSM_FORKNUM), + Some("vm") => Ok(pg_constants::VISIBILITYMAP_FORKNUM), + Some("init") => Ok(pg_constants::INIT_FORKNUM), + Some(_) => Err(FilePathError::new("invalid forkname")), + } +} + pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes { let b: [u8; SIZEOF_CONTROLDATA]; diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index ba923799e2..5adb09ab9c 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -15,13 +15,13 @@ pub const MAIN_FORKNUM: u8 = 0; pub const FSM_FORKNUM: u8 = 1; pub const VISIBILITYMAP_FORKNUM: u8 = 2; pub const INIT_FORKNUM: u8 = 3; - // Special values for non-rel files' tags (Zenith-specific) -pub const PG_CONTROLFILE_FORKNUM: u32 = 42; -pub const PG_FILENODEMAP_FORKNUM: u32 = 43; -pub const PG_XACT_FORKNUM: u32 = 44; -pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45; -pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46; +//Special values for non-rel files' tags +pub const PG_CONTROLFILE_FORKNUM: u8 = 42; +pub const PG_FILENODEMAP_FORKNUM: u8 = 43; +pub const PG_XACT_FORKNUM: u8 = 44; +pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45; +pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46; // From storage_xlog.h pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; @@ -34,6 +34,13 @@ pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE; pub const CLOG_BITS_PER_XACT: u8 = 2; pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1; +// +// Constants from visbilitymap.h +// +pub const SIZE_OF_PAGE_HEADER: u16 = 24; +pub const BITS_PER_HEAPBLOCK: u16 = 2; +pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; + pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; @@ -69,13 +76,32 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; // From pg_control.h and rmgrlist.h pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; +pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; + +// From heapam_xlog.h +pub const XLOG_HEAP_INSERT: u8 = 0x00; +pub const XLOG_HEAP_DELETE: u8 = 0x10; +pub const XLOG_HEAP_UPDATE: u8 = 0x20; +pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40; +pub const XLOG_HEAP2_VISIBLE: u8 = 0x40; +pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50; +pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8; +pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8; +pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8; +pub const XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED: u8 = (1 << 1) as u8; +pub const XLH_DELETE_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8; + pub const RM_XLOG_ID: u8 = 0; pub const RM_XACT_ID: u8 = 1; pub const RM_SMGR_ID: u8 = 2; pub const RM_CLOG_ID: u8 = 3; pub const RM_DBASE_ID: u8 = 4; pub const RM_TBLSPC_ID: u8 = 5; -// pub const RM_MULTIXACT_ID:u8 = 6; +pub const RM_MULTIXACT_ID: u8 = 6; +pub const RM_RELMAP_ID: u8 = 7; +pub const RM_STANDBY_ID: u8 = 8; +pub const RM_HEAP2_ID: u8 = 9; +pub const RM_HEAP_ID: u8 = 10; // from xlogreader.h pub const XLR_INFO_MASK: u8 = 0x0F; diff --git a/vendor/postgres b/vendor/postgres index 98cdb40a5e..87080ddc02 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 98cdb40a5ecfe6169be0695679efda83c2bd43c3 +Subproject commit 87080ddc02ad34881c4ccb5d45d903b6f86a6fa6