Handle visbility map updates in WAL redo

This commit is contained in:
Konstantin Knizhnik
2021-05-12 10:08:13 +03:00
parent 372617a4f5
commit 22e7fcbf2d
8 changed files with 248 additions and 26 deletions

View File

@@ -1,4 +1,6 @@
use crate::ZTimelineId;
use log::*;
use postgres_ffi::FilePathError;
use std::io::Write;
use tar::Builder;
use walkdir::WalkDir;

View File

@@ -11,7 +11,6 @@
//
use log::*;
use std::cmp::max;
use std::fs;
use std::fs::File;
@@ -30,6 +29,7 @@ use crate::ZTimelineId;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::FilePathError;
use zenith_utils::lsn::Lsn;
///
@@ -199,8 +199,7 @@ fn restore_relfile(
let mut file = File::open(path)?;
let mut buf: [u8; 8192] = [0u8; 8192];
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / 8192);
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
loop {
let r = file.read_exact(&mut buf);
match r {
@@ -210,7 +209,7 @@ fn restore_relfile(
spcnode: spcoid,
dbnode: dboid,
relnode,
forknum: forknum as u8,
forknum,
},
blknum,
};
@@ -246,7 +245,7 @@ fn restore_nonrelfile(
timeline: &dyn Timeline,
_timelineid: ZTimelineId,
snapshot: &str,
forknum: u32,
forknum: u8,
path: &Path,
) -> Result<()> {
let lsn = Lsn::from_hex(snapshot)?;
@@ -257,7 +256,6 @@ fn restore_nonrelfile(
let mut buf: [u8; 8192] = [0u8; 8192];
let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
loop {
let r = file.read_exact(&mut buf);
@@ -268,7 +266,7 @@ fn restore_nonrelfile(
spcnode: 0,
dbnode: 0,
relnode: 0,
forknum: forknum as u8,
forknum,
},
blknum,
};

View File

@@ -134,7 +134,7 @@ struct ParsedBaseImageFileName {
pub spcnode: u32,
pub dbnode: u32,
pub relnode: u32,
pub forknum: u32,
pub forknum: u8,
pub segno: u32,
pub lsn: u64,
@@ -146,7 +146,7 @@ struct ParsedBaseImageFileName {
// <oid>.<segment number>
// <oid>_<fork name>.<segment number>
fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
fn parse_filename(fname: &str) -> Result<(u32, u8, u32, u64), FilePathError> {
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?_(?P<lsnhi>[[:xdigit:]]{8})(?P<lsnlo>[[:xdigit:]]{8})$").unwrap();
let caps = re
@@ -252,8 +252,7 @@ async fn slurp_base_file(
let mut bytes = BytesMut::from(data.as_slice()).freeze();
// FIXME: use constants (BLCKSZ)
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
let pcache = page_cache::get_pagecache(conf, sys_id);
@@ -263,7 +262,7 @@ async fn slurp_base_file(
spcnode: parsed.spcnode,
dbnode: parsed.dbnode,
relnode: parsed.relnode,
forknum: parsed.forknum as u8,
forknum: parsed.forknum,
},
blknum,
};

View File

@@ -307,7 +307,9 @@ pub struct DecodedWALRecord {
}
pub type Oid = u32;
pub type TransactionId = u32;
pub type BlockNumber = u32;
pub type OffsetNumber = u16;
#[repr(C)]
#[derive(Debug, Clone, Copy)]
@@ -363,6 +365,82 @@ impl XlCreateDatabase {
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapInsert {
pub offnum: OffsetNumber,
pub flags: u8,
}
impl XlHeapInsert {
pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
XlHeapInsert {
offnum: buf.get_u16_le(),
flags: buf.get_u8(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapMultiInsert {
pub flags: u8,
pub ntuples: u16,
}
impl XlHeapMultiInsert {
pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
XlHeapMultiInsert {
flags: buf.get_u8(),
ntuples: buf.get_u16_le(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapDelete {
pub xmax: TransactionId,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapDelete {
pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
XlHeapDelete {
xmax: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapUpdate {
pub old_xmax: TransactionId,
pub old_offnum: OffsetNumber,
pub old_infobits_set: u8,
pub flags: u8,
pub new_xmax: TransactionId,
pub new_offnum: OffsetNumber,
}
impl XlHeapUpdate {
pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
XlHeapUpdate {
old_xmax: buf.get_u32_le(),
old_offnum: buf.get_u16_le(),
old_infobits_set: buf.get_u8(),
flags: buf.get_u8(),
new_xmax: buf.get_u32_le(),
new_offnum: buf.get_u16_le(),
}
}
}
//
// Routines to decode a WAL record and figure out which blocks are modified
//
@@ -614,7 +692,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
//5. Handle special CLOG and XACT records
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = buf.get_i32_le() as u32;
blk.will_init = true;
trace!("RM_CLOG_ID updates block {}", blk.blkno);
@@ -623,7 +701,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_XACT_COMMIT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
@@ -656,7 +734,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
if prev_blkno != blkno {
prev_blkno = blkno;
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = blkno;
blocks.push(blk);
}
@@ -691,7 +769,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
} else if info == pg_constants::XLOG_XACT_ABORT {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
trace!(
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
@@ -723,7 +801,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
if prev_blkno != blkno {
prev_blkno = blkno;
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
blk.forknum = pg_constants::PG_XACT_FORKNUM;
blk.blkno = blkno;
blocks.push(blk);
}
@@ -779,6 +857,79 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
} else {
trace!("XLOG_TBLSPC_DROP is not handled yet");
}
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
if info == pg_constants::XLOG_HEAP_INSERT {
let xlrec = XlHeapInsert::decode(&mut buf);
if (xlrec.flags
& (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED
| pg_constants::XLH_INSERT_ALL_FROZEN_SET))
!= 0
{
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blkno;
blk.rnode_spcnode = blocks[0].rnode_spcnode;
blk.rnode_dbnode = blocks[0].rnode_dbnode;
blk.rnode_relnode = blocks[0].rnode_relnode;
blocks.push(blk);
}
} else if info == pg_constants::XLOG_HEAP_DELETE {
let xlrec = XlHeapDelete::decode(&mut buf);
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blkno;
blk.rnode_spcnode = blocks[0].rnode_spcnode;
blk.rnode_dbnode = blocks[0].rnode_dbnode;
blk.rnode_relnode = blocks[0].rnode_relnode;
blocks.push(blk);
}
} else if info == pg_constants::XLOG_HEAP_UPDATE
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
{
let xlrec = XlHeapUpdate::decode(&mut buf);
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blkno;
blk.rnode_spcnode = blocks[0].rnode_spcnode;
blk.rnode_dbnode = blocks[0].rnode_dbnode;
blk.rnode_relnode = blocks[0].rnode_relnode;
blocks.push(blk);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0
&& blocks.len() > 1
{
let mut blk = DecodedBkpBlock::new();
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
blk.rnode_spcnode = blocks[1].rnode_spcnode;
blk.rnode_dbnode = blocks[1].rnode_dbnode;
blk.rnode_relnode = blocks[1].rnode_relnode;
blocks.push(blk);
}
}
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID {
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
let xlrec = XlHeapMultiInsert::decode(&mut buf);
if (xlrec.flags
& (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED
| pg_constants::XLH_INSERT_ALL_FROZEN_SET))
!= 0
{
let mut blk = DecodedBkpBlock::new();
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
blk.blkno = blkno;
blk.rnode_spcnode = blocks[0].rnode_spcnode;
blk.rnode_dbnode = blocks[0].rnode_dbnode;
blk.rnode_relnode = blocks[0].rnode_relnode;
blocks.push(blk);
}
}
}
DecodedWALRecord {

View File

@@ -229,7 +229,7 @@ impl WalRedoManagerInternal {
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM {
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {

View File

@@ -8,6 +8,8 @@ pub mod relfile_utils;
pub mod xlog_utils;
use bytes::{Buf, Bytes, BytesMut};
use std::error::Error;
use std::fmt;
// sizeof(ControlFileData)
const SIZEOF_CONTROLDATA: usize = std::mem::size_of::<ControlFileData>();
@@ -50,6 +52,50 @@ pub fn decode_pg_control(mut buf: Bytes) -> Result<ControlFileData, anyhow::Erro
Ok(controlfile)
}
#[derive(Debug, Clone)]
pub struct FilePathError {
msg: String,
}
impl Error for FilePathError {
fn description(&self) -> &str {
&self.msg
}
}
impl FilePathError {
pub fn new(msg: &str) -> FilePathError {
FilePathError {
msg: msg.to_string(),
}
}
}
impl From<core::num::ParseIntError> for FilePathError {
fn from(e: core::num::ParseIntError) -> Self {
return FilePathError {
msg: format!("invalid filename: {}", e),
};
}
}
impl fmt::Display for FilePathError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "invalid filename")
}
}
pub fn forkname_to_forknum(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(pg_constants::MAIN_FORKNUM),
Some("fsm") => Ok(pg_constants::FSM_FORKNUM),
Some("vm") => Ok(pg_constants::VISIBILITYMAP_FORKNUM),
Some("init") => Ok(pg_constants::INIT_FORKNUM),
Some(_) => Err(FilePathError::new("invalid forkname")),
}
}
pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
let b: [u8; SIZEOF_CONTROLDATA];

View File

@@ -15,13 +15,13 @@ pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
// Special values for non-rel files' tags (Zenith-specific)
pub const PG_CONTROLFILE_FORKNUM: u32 = 42;
pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
pub const PG_XACT_FORKNUM: u32 = 44;
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
//Special values for non-rel files' tags
pub const PG_CONTROLFILE_FORKNUM: u8 = 42;
pub const PG_FILENODEMAP_FORKNUM: u8 = 43;
pub const PG_XACT_FORKNUM: u8 = 44;
pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45;
pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46;
// From storage_xlog.h
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
@@ -34,6 +34,13 @@ pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
pub const CLOG_BITS_PER_XACT: u8 = 2;
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
//
// Constants from visbilitymap.h
//
pub const SIZE_OF_PAGE_HEADER: u16 = 24;
pub const BITS_PER_HEAPBLOCK: u16 = 2;
pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
@@ -69,13 +76,32 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
// From pg_control.h and rmgrlist.h
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
// From heapam_xlog.h
pub const XLOG_HEAP_INSERT: u8 = 0x00;
pub const XLOG_HEAP_DELETE: u8 = 0x10;
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED: u8 = (1 << 1) as u8;
pub const XLH_DELETE_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const RM_XLOG_ID: u8 = 0;
pub const RM_XACT_ID: u8 = 1;
pub const RM_SMGR_ID: u8 = 2;
pub const RM_CLOG_ID: u8 = 3;
pub const RM_DBASE_ID: u8 = 4;
pub const RM_TBLSPC_ID: u8 = 5;
// pub const RM_MULTIXACT_ID:u8 = 6;
pub const RM_MULTIXACT_ID: u8 = 6;
pub const RM_RELMAP_ID: u8 = 7;
pub const RM_STANDBY_ID: u8 = 8;
pub const RM_HEAP2_ID: u8 = 9;
pub const RM_HEAP_ID: u8 = 10;
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;