mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
Handle visbility map updates in WAL redo
This commit is contained in:
@@ -1,12 +1,11 @@
|
||||
use crate::ZTimelineId;
|
||||
use log::*;
|
||||
use postgres_ffi::FilePathError;
|
||||
use regex::Regex;
|
||||
use std::fmt;
|
||||
use std::io::Write;
|
||||
use tar::Builder;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::ZTimelineId;
|
||||
|
||||
pub fn send_snapshot_tarball(
|
||||
write: &mut dyn Write,
|
||||
timelineid: ZTimelineId,
|
||||
@@ -85,45 +84,7 @@ pub fn send_snapshot_tarball(
|
||||
// <oid>.<segment number>
|
||||
// <oid>_<fork name>.<segment number>
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FilePathError {
|
||||
msg: String,
|
||||
}
|
||||
|
||||
impl FilePathError {
|
||||
fn new(msg: &str) -> FilePathError {
|
||||
FilePathError {
|
||||
msg: msg.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<core::num::ParseIntError> for FilePathError {
|
||||
fn from(e: core::num::ParseIntError) -> Self {
|
||||
return FilePathError {
|
||||
msg: format!("invalid filename: {}", e),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for FilePathError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "invalid filename")
|
||||
}
|
||||
}
|
||||
|
||||
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
|
||||
match forkname {
|
||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
||||
None => Ok(0),
|
||||
Some("fsm") => Ok(1),
|
||||
Some("vm") => Ok(2),
|
||||
Some("init") => Ok(3),
|
||||
Some(_) => Err(FilePathError::new("invalid forkname")),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_filename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
||||
fn parse_filename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
|
||||
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
|
||||
|
||||
let caps = re
|
||||
@@ -134,7 +95,7 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
||||
let relnode = u32::from_str_radix(relnode_str, 10)?;
|
||||
|
||||
let forkname = caps.name("forkname").map(|f| f.as_str());
|
||||
let forknum = forkname_to_forknum(forkname)?;
|
||||
let forknum = postgres_ffi::forkname_to_forknum(forkname)?;
|
||||
|
||||
let segno_match = caps.name("segno");
|
||||
let segno = if segno_match.is_none() {
|
||||
|
||||
@@ -12,10 +12,8 @@
|
||||
|
||||
use log::*;
|
||||
use regex::Regex;
|
||||
use std::fmt;
|
||||
|
||||
use std::cmp::max;
|
||||
use std::error::Error;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
@@ -35,6 +33,7 @@ use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
use postgres_ffi::FilePathError;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
// From pg_tablespace_d.h
|
||||
@@ -210,8 +209,7 @@ fn restore_relfile(
|
||||
let mut file = File::open(path)?;
|
||||
let mut buf: [u8; 8192] = [0u8; 8192];
|
||||
|
||||
// FIXME: use constants (BLCKSZ)
|
||||
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / 8192);
|
||||
let mut blknum: u32 = segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
|
||||
loop {
|
||||
let r = file.read_exact(&mut buf);
|
||||
match r {
|
||||
@@ -221,7 +219,7 @@ fn restore_relfile(
|
||||
spcnode: spcoid,
|
||||
dbnode: dboid,
|
||||
relnode,
|
||||
forknum: forknum as u8,
|
||||
forknum,
|
||||
},
|
||||
blknum,
|
||||
};
|
||||
@@ -257,7 +255,7 @@ fn restore_nonrelfile(
|
||||
pcache: &PageCache,
|
||||
_timeline: ZTimelineId,
|
||||
snapshot: &str,
|
||||
forknum: u32,
|
||||
forknum: u8,
|
||||
path: &Path,
|
||||
) -> Result<()> {
|
||||
let lsn = Lsn::from_hex(snapshot)?;
|
||||
@@ -268,7 +266,6 @@ fn restore_nonrelfile(
|
||||
let mut buf: [u8; 8192] = [0u8; 8192];
|
||||
let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
|
||||
|
||||
// FIXME: use constants (BLCKSZ)
|
||||
let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
loop {
|
||||
let r = file.read_exact(&mut buf);
|
||||
@@ -279,7 +276,7 @@ fn restore_nonrelfile(
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
forknum: forknum as u8,
|
||||
forknum,
|
||||
},
|
||||
blknum,
|
||||
};
|
||||
@@ -411,55 +408,12 @@ fn restore_wal(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct FilePathError {
|
||||
msg: String,
|
||||
}
|
||||
|
||||
impl Error for FilePathError {
|
||||
fn description(&self) -> &str {
|
||||
&self.msg
|
||||
}
|
||||
}
|
||||
impl FilePathError {
|
||||
fn new(msg: &str) -> FilePathError {
|
||||
FilePathError {
|
||||
msg: msg.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<core::num::ParseIntError> for FilePathError {
|
||||
fn from(e: core::num::ParseIntError) -> Self {
|
||||
return FilePathError {
|
||||
msg: format!("invalid filename: {}", e),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for FilePathError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "invalid filename")
|
||||
}
|
||||
}
|
||||
|
||||
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
|
||||
match forkname {
|
||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
||||
None => Ok(0),
|
||||
Some("fsm") => Ok(1),
|
||||
Some("vm") => Ok(2),
|
||||
Some("init") => Ok(3),
|
||||
Some(_) => Err(FilePathError::new("invalid forkname")),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ParsedBaseImageFileName {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
pub relnode: u32,
|
||||
pub forknum: u32,
|
||||
pub forknum: u8,
|
||||
pub segno: u32,
|
||||
|
||||
pub lsn: u64,
|
||||
@@ -471,7 +425,7 @@ struct ParsedBaseImageFileName {
|
||||
// <oid>.<segment number>
|
||||
// <oid>_<fork name>.<segment number>
|
||||
|
||||
fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
||||
fn parse_relfilename(fname: &str) -> Result<(u32, u8, u32), FilePathError> {
|
||||
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
|
||||
|
||||
let caps = re
|
||||
@@ -482,7 +436,7 @@ fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> {
|
||||
let relnode = u32::from_str_radix(relnode_str, 10)?;
|
||||
|
||||
let forkname = caps.name("forkname").map(|f| f.as_str());
|
||||
let forknum = forkname_to_forknum(forkname)?;
|
||||
let forknum = postgres_ffi::forkname_to_forknum(forkname)?;
|
||||
|
||||
let segno_match = caps.name("segno");
|
||||
let segno = if segno_match.is_none() {
|
||||
|
||||
@@ -133,50 +133,12 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> {
|
||||
const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||
const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FilePathError {
|
||||
msg: String,
|
||||
}
|
||||
|
||||
impl FilePathError {
|
||||
fn new(msg: &str) -> FilePathError {
|
||||
FilePathError {
|
||||
msg: msg.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<core::num::ParseIntError> for FilePathError {
|
||||
fn from(e: core::num::ParseIntError) -> Self {
|
||||
return FilePathError {
|
||||
msg: format!("invalid filename: {}", e),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for FilePathError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "invalid filename")
|
||||
}
|
||||
}
|
||||
|
||||
fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
|
||||
match forkname {
|
||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
||||
None => Ok(0),
|
||||
Some("fsm") => Ok(1),
|
||||
Some("vm") => Ok(2),
|
||||
Some("init") => Ok(3),
|
||||
Some(_) => Err(FilePathError::new("invalid forkname")),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ParsedBaseImageFileName {
|
||||
pub spcnode: u32,
|
||||
pub dbnode: u32,
|
||||
pub relnode: u32,
|
||||
pub forknum: u32,
|
||||
pub forknum: u8,
|
||||
pub segno: u32,
|
||||
|
||||
pub lsn: u64,
|
||||
@@ -188,7 +150,7 @@ struct ParsedBaseImageFileName {
|
||||
// <oid>.<segment number>
|
||||
// <oid>_<fork name>.<segment number>
|
||||
|
||||
fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
|
||||
fn parse_filename(fname: &str) -> Result<(u32, u8, u32, u64), FilePathError> {
|
||||
let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?_(?P<lsnhi>[[:xdigit:]]{8})(?P<lsnlo>[[:xdigit:]]{8})$").unwrap();
|
||||
|
||||
let caps = re
|
||||
@@ -294,8 +256,7 @@ async fn slurp_base_file(
|
||||
|
||||
let mut bytes = BytesMut::from(data.as_slice()).freeze();
|
||||
|
||||
// FIXME: use constants (BLCKSZ)
|
||||
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192);
|
||||
let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / pg_constants::BLCKSZ as u32);
|
||||
|
||||
let pcache = page_cache::get_pagecache(conf, sys_id);
|
||||
|
||||
@@ -305,7 +266,7 @@ async fn slurp_base_file(
|
||||
spcnode: parsed.spcnode,
|
||||
dbnode: parsed.dbnode,
|
||||
relnode: parsed.relnode,
|
||||
forknum: parsed.forknum as u8,
|
||||
forknum: parsed.forknum,
|
||||
},
|
||||
blknum,
|
||||
};
|
||||
|
||||
@@ -307,10 +307,9 @@ pub struct DecodedWALRecord {
|
||||
}
|
||||
|
||||
pub type Oid = u32;
|
||||
pub type TransactionId = u32;
|
||||
pub type BlockNumber = u32;
|
||||
|
||||
pub const MAIN_FORKNUM: u8 = 0;
|
||||
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
||||
pub type OffsetNumber = u16;
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@@ -366,6 +365,82 @@ impl XlCreateDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlHeapInsert {
|
||||
pub offnum: OffsetNumber,
|
||||
pub flags: u8,
|
||||
}
|
||||
|
||||
impl XlHeapInsert {
|
||||
pub fn decode(buf: &mut Bytes) -> XlHeapInsert {
|
||||
XlHeapInsert {
|
||||
offnum: buf.get_u16_le(),
|
||||
flags: buf.get_u8(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlHeapMultiInsert {
|
||||
pub flags: u8,
|
||||
pub ntuples: u16,
|
||||
}
|
||||
|
||||
impl XlHeapMultiInsert {
|
||||
pub fn decode(buf: &mut Bytes) -> XlHeapMultiInsert {
|
||||
XlHeapMultiInsert {
|
||||
flags: buf.get_u8(),
|
||||
ntuples: buf.get_u16_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlHeapDelete {
|
||||
pub xmax: TransactionId,
|
||||
pub offnum: OffsetNumber,
|
||||
pub infobits_set: u8,
|
||||
pub flags: u8,
|
||||
}
|
||||
|
||||
impl XlHeapDelete {
|
||||
pub fn decode(buf: &mut Bytes) -> XlHeapDelete {
|
||||
XlHeapDelete {
|
||||
xmax: buf.get_u32_le(),
|
||||
offnum: buf.get_u16_le(),
|
||||
infobits_set: buf.get_u8(),
|
||||
flags: buf.get_u8(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlHeapUpdate {
|
||||
pub old_xmax: TransactionId,
|
||||
pub old_offnum: OffsetNumber,
|
||||
pub old_infobits_set: u8,
|
||||
pub flags: u8,
|
||||
pub new_xmax: TransactionId,
|
||||
pub new_offnum: OffsetNumber,
|
||||
}
|
||||
|
||||
impl XlHeapUpdate {
|
||||
pub fn decode(buf: &mut Bytes) -> XlHeapUpdate {
|
||||
XlHeapUpdate {
|
||||
old_xmax: buf.get_u32_le(),
|
||||
old_offnum: buf.get_u16_le(),
|
||||
old_infobits_set: buf.get_u8(),
|
||||
flags: buf.get_u8(),
|
||||
new_xmax: buf.get_u32_le(),
|
||||
new_offnum: buf.get_u16_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Routines to decode a WAL record and figure out which blocks are modified
|
||||
//
|
||||
@@ -617,7 +692,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
//5. Handle special CLOG and XACT records
|
||||
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = buf.get_i32_le() as u32;
|
||||
blk.will_init = true;
|
||||
trace!("RM_CLOG_ID updates block {}", blk.blkno);
|
||||
@@ -626,7 +701,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
@@ -659,7 +734,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
if prev_blkno != blkno {
|
||||
prev_blkno = blkno;
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blocks.push(blk);
|
||||
}
|
||||
@@ -694,7 +769,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
trace!(
|
||||
"XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
|
||||
@@ -726,7 +801,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
if prev_blkno != blkno {
|
||||
prev_blkno = blkno;
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM as u8;
|
||||
blk.forknum = pg_constants::PG_XACT_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blocks.push(blk);
|
||||
}
|
||||
@@ -782,6 +857,79 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
} else {
|
||||
trace!("XLOG_TBLSPC_DROP is not handled yet");
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
let xlrec = XlHeapInsert::decode(&mut buf);
|
||||
if (xlrec.flags
|
||||
& (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED
|
||||
| pg_constants::XLH_INSERT_ALL_FROZEN_SET))
|
||||
!= 0
|
||||
{
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blk.rnode_spcnode = blocks[0].rnode_spcnode;
|
||||
blk.rnode_dbnode = blocks[0].rnode_dbnode;
|
||||
blk.rnode_relnode = blocks[0].rnode_relnode;
|
||||
blocks.push(blk);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_DELETE {
|
||||
let xlrec = XlHeapDelete::decode(&mut buf);
|
||||
if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blk.rnode_spcnode = blocks[0].rnode_spcnode;
|
||||
blk.rnode_dbnode = blocks[0].rnode_dbnode;
|
||||
blk.rnode_relnode = blocks[0].rnode_relnode;
|
||||
blocks.push(blk);
|
||||
}
|
||||
} else if info == pg_constants::XLOG_HEAP_UPDATE
|
||||
|| info == pg_constants::XLOG_HEAP_HOT_UPDATE
|
||||
{
|
||||
let xlrec = XlHeapUpdate::decode(&mut buf);
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blk.rnode_spcnode = blocks[0].rnode_spcnode;
|
||||
blk.rnode_dbnode = blocks[0].rnode_dbnode;
|
||||
blk.rnode_relnode = blocks[0].rnode_relnode;
|
||||
blocks.push(blk);
|
||||
}
|
||||
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0
|
||||
&& blocks.len() > 1
|
||||
{
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||
blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
|
||||
blk.rnode_spcnode = blocks[1].rnode_spcnode;
|
||||
blk.rnode_dbnode = blocks[1].rnode_dbnode;
|
||||
blk.rnode_relnode = blocks[1].rnode_relnode;
|
||||
blocks.push(blk);
|
||||
}
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
|
||||
let xlrec = XlHeapMultiInsert::decode(&mut buf);
|
||||
if (xlrec.flags
|
||||
& (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED
|
||||
| pg_constants::XLH_INSERT_ALL_FROZEN_SET))
|
||||
!= 0
|
||||
{
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
|
||||
blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM;
|
||||
blk.blkno = blkno;
|
||||
blk.rnode_spcnode = blocks[0].rnode_spcnode;
|
||||
blk.rnode_dbnode = blocks[0].rnode_dbnode;
|
||||
blk.rnode_relnode = blocks[0].rnode_relnode;
|
||||
blocks.push(blk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DecodedWALRecord {
|
||||
|
||||
@@ -246,13 +246,13 @@ fn walreceiver_main(
|
||||
== pg_constants::XLOG_SMGR_TRUNCATE
|
||||
{
|
||||
let truncate = XlSmgrTruncate::decode(&decoded);
|
||||
if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 {
|
||||
if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
|
||||
let tag = BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode: truncate.rnode.spcnode,
|
||||
dbnode: truncate.rnode.dbnode,
|
||||
relnode: truncate.rnode.relnode,
|
||||
forknum: MAIN_FORKNUM,
|
||||
forknum: pg_constants::MAIN_FORKNUM,
|
||||
},
|
||||
blknum: truncate.blkno,
|
||||
};
|
||||
|
||||
@@ -226,7 +226,7 @@ impl WalRedoManagerInternal {
|
||||
let start = Instant::now();
|
||||
|
||||
let apply_result: Result<Bytes, Error>;
|
||||
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
|
||||
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM {
|
||||
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
|
||||
let mut page = BytesMut::new();
|
||||
if let Some(fpi) = base_img {
|
||||
|
||||
@@ -7,6 +7,8 @@ pub mod pg_constants;
|
||||
pub mod xlog_utils;
|
||||
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
|
||||
// sizeof(ControlFileData)
|
||||
const SIZEOF_CONTROLDATA: usize = std::mem::size_of::<ControlFileData>();
|
||||
@@ -49,6 +51,50 @@ pub fn decode_pg_control(mut buf: Bytes) -> Result<ControlFileData, anyhow::Erro
|
||||
Ok(controlfile)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FilePathError {
|
||||
msg: String,
|
||||
}
|
||||
|
||||
impl Error for FilePathError {
|
||||
fn description(&self) -> &str {
|
||||
&self.msg
|
||||
}
|
||||
}
|
||||
|
||||
impl FilePathError {
|
||||
pub fn new(msg: &str) -> FilePathError {
|
||||
FilePathError {
|
||||
msg: msg.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<core::num::ParseIntError> for FilePathError {
|
||||
fn from(e: core::num::ParseIntError) -> Self {
|
||||
return FilePathError {
|
||||
msg: format!("invalid filename: {}", e),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for FilePathError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "invalid filename")
|
||||
}
|
||||
}
|
||||
|
||||
pub fn forkname_to_forknum(forkname: Option<&str>) -> Result<u8, FilePathError> {
|
||||
match forkname {
|
||||
// "main" is not in filenames, it's implicit if the fork name is not present
|
||||
None => Ok(pg_constants::MAIN_FORKNUM),
|
||||
Some("fsm") => Ok(pg_constants::FSM_FORKNUM),
|
||||
Some("vm") => Ok(pg_constants::VISIBILITYMAP_FORKNUM),
|
||||
Some("init") => Ok(pg_constants::INIT_FORKNUM),
|
||||
Some(_) => Err(FilePathError::new("invalid forkname")),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
|
||||
let b: [u8; SIZEOF_CONTROLDATA];
|
||||
|
||||
|
||||
@@ -2,13 +2,17 @@
|
||||
//
|
||||
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
|
||||
pub const GLOBALTABLESPACE_OID: u32 = 1664;
|
||||
//Special values for non-rel files' tags
|
||||
//TODO maybe use enum?
|
||||
pub const PG_CONTROLFILE_FORKNUM: u32 = 42;
|
||||
pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
|
||||
pub const PG_XACT_FORKNUM: u32 = 44;
|
||||
pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
|
||||
pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
|
||||
pub const MAIN_FORKNUM: u8 = 0;
|
||||
pub const FSM_FORKNUM: u8 = 1;
|
||||
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
|
||||
pub const INIT_FORKNUM: u8 = 3;
|
||||
//Special values for non-rel files' tags
|
||||
pub const PG_CONTROLFILE_FORKNUM: u8 = 42;
|
||||
pub const PG_FILENODEMAP_FORKNUM: u8 = 43;
|
||||
pub const PG_XACT_FORKNUM: u8 = 44;
|
||||
pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45;
|
||||
pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46;
|
||||
|
||||
//
|
||||
// constants from clog.h
|
||||
@@ -18,6 +22,13 @@ pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
|
||||
pub const CLOG_BITS_PER_XACT: u8 = 2;
|
||||
pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
|
||||
|
||||
//
|
||||
// Constants from visbilitymap.h
|
||||
//
|
||||
pub const SIZE_OF_PAGE_HEADER: u16 = 24;
|
||||
pub const BITS_PER_HEAPBLOCK: u16 = 2;
|
||||
pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
|
||||
|
||||
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
|
||||
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
|
||||
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
|
||||
@@ -53,13 +64,32 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
|
||||
// From pg_control.h and rmgrlist.h
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
|
||||
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
||||
|
||||
// From heapam_xlog.h
|
||||
pub const XLOG_HEAP_INSERT: u8 = 0x00;
|
||||
pub const XLOG_HEAP_DELETE: u8 = 0x10;
|
||||
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
|
||||
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
|
||||
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
|
||||
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
|
||||
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
|
||||
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||
pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||
pub const XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED: u8 = (1 << 1) as u8;
|
||||
pub const XLH_DELETE_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
|
||||
|
||||
pub const RM_XLOG_ID: u8 = 0;
|
||||
pub const RM_XACT_ID: u8 = 1;
|
||||
pub const RM_SMGR_ID: u8 = 2;
|
||||
pub const RM_CLOG_ID: u8 = 3;
|
||||
pub const RM_DBASE_ID: u8 = 4;
|
||||
pub const RM_TBLSPC_ID: u8 = 5;
|
||||
// pub const RM_MULTIXACT_ID:u8 = 6;
|
||||
pub const RM_MULTIXACT_ID: u8 = 6;
|
||||
pub const RM_RELMAP_ID: u8 = 7;
|
||||
pub const RM_STANDBY_ID: u8 = 8;
|
||||
pub const RM_HEAP2_ID: u8 = 9;
|
||||
pub const RM_HEAP_ID: u8 = 10;
|
||||
|
||||
// from xlogreader.h
|
||||
pub const XLR_INFO_MASK: u8 = 0x0F;
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: c2409039d6...87080ddc02
Reference in New Issue
Block a user