mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
Remove a bunch of dead code
Some of these were related to handling various WAL records that are not
related to any relations, like pg_multixact updates. These should have
been removed in the revert commit 6a9c036ac1, but I missed them.
Also, we didn't anything with commit/abort records. We will start
parsing commit/abort records in the next commit, but seems better to
add that from clean slate.
Reviewed-by: Konstantin Knizhnik
This commit is contained in:
@@ -282,11 +282,16 @@ pub fn save_decoded_record(
|
||||
{
|
||||
let truncate = XlSmgrTruncate::decode(&decoded);
|
||||
save_xlog_smgr_truncate(timeline, lsn, &truncate)?;
|
||||
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
|
||||
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE
|
||||
{
|
||||
let createdb = XlCreateDatabase::decode(&decoded);
|
||||
save_xlog_dbase_create(timeline, lsn, &createdb)?;
|
||||
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
|
||||
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE {
|
||||
let createdb = XlCreateDatabase::decode(&decoded);
|
||||
save_xlog_dbase_create(timeline, lsn, &createdb)?;
|
||||
} else {
|
||||
// TODO
|
||||
trace!("XLOG_DBASE_DROP is not handled yet");
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
|
||||
trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
|
||||
}
|
||||
|
||||
// Now that this record has been handled, let the repository know that
|
||||
|
||||
@@ -11,7 +11,6 @@ use postgres_ffi::XLogPageHeaderData;
|
||||
use postgres_ffi::XLogRecord;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::str;
|
||||
use thiserror::Error;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
@@ -19,9 +18,6 @@ pub type Oid = u32;
|
||||
pub type TransactionId = u32;
|
||||
pub type BlockNumber = u32;
|
||||
pub type OffsetNumber = u16;
|
||||
pub type MultiXactId = TransactionId;
|
||||
pub type MultiXactOffset = u32;
|
||||
pub type MultiXactStatus = u32;
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct WalStreamDecoder {
|
||||
@@ -264,24 +260,6 @@ pub struct RelFileNode {
|
||||
pub relnode: Oid, /* relation */
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlRelmapUpdate {
|
||||
pub dbid: Oid, /* database ID, or 0 for shared map */
|
||||
pub tsid: Oid, /* database's tablespace, or pg_global */
|
||||
pub nbytes: i32, /* size of relmap data */
|
||||
}
|
||||
|
||||
impl XlRelmapUpdate {
|
||||
pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate {
|
||||
XlRelmapUpdate {
|
||||
dbid: buf.get_u32_le(),
|
||||
tsid: buf.get_u32_le(),
|
||||
nbytes: buf.get_i32_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlSmgrTruncate {
|
||||
@@ -404,74 +382,6 @@ impl XlHeapUpdate {
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct MultiXactMember {
|
||||
pub xid: TransactionId,
|
||||
pub status: MultiXactStatus,
|
||||
}
|
||||
|
||||
impl MultiXactMember {
|
||||
pub fn decode(buf: &mut Bytes) -> MultiXactMember {
|
||||
MultiXactMember {
|
||||
xid: buf.get_u32_le(),
|
||||
status: buf.get_u32_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlMultiXactCreate {
|
||||
pub mid: MultiXactId, /* new MultiXact's ID */
|
||||
pub moff: MultiXactOffset, /* its starting offset in members file */
|
||||
pub nmembers: u32, /* number of member XIDs */
|
||||
pub members: Vec<MultiXactMember>,
|
||||
}
|
||||
|
||||
impl XlMultiXactCreate {
|
||||
pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate {
|
||||
let mid = buf.get_u32_le();
|
||||
let moff = buf.get_u32_le();
|
||||
let nmembers = buf.get_u32_le();
|
||||
let mut members = Vec::new();
|
||||
for _ in 0..nmembers {
|
||||
members.push(MultiXactMember::decode(buf));
|
||||
}
|
||||
XlMultiXactCreate {
|
||||
mid,
|
||||
moff,
|
||||
nmembers,
|
||||
members,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Debug)]
|
||||
pub struct XlMultiXactTruncate {
|
||||
oldest_multi_db: Oid,
|
||||
/* to-be-truncated range of multixact offsets */
|
||||
start_trunc_off: MultiXactId, /* just for completeness' sake */
|
||||
end_trunc_off: MultiXactId,
|
||||
|
||||
/* to-be-truncated range of multixact members */
|
||||
start_trunc_memb: MultiXactOffset,
|
||||
end_trunc_memb: MultiXactOffset,
|
||||
}
|
||||
|
||||
impl XlMultiXactTruncate {
|
||||
pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate {
|
||||
XlMultiXactTruncate {
|
||||
oldest_multi_db: buf.get_u32_le(),
|
||||
start_trunc_off: buf.get_u32_le(),
|
||||
end_trunc_off: buf.get_u32_le(),
|
||||
start_trunc_memb: buf.get_u32_le(),
|
||||
end_trunc_memb: buf.get_u32_le(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Main routine to decode a WAL record and figure out which blocks are modified
|
||||
//
|
||||
// See xlogrecord.h for details
|
||||
@@ -719,125 +629,9 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
assert_eq!(buf.remaining(), main_data_len as usize);
|
||||
}
|
||||
|
||||
//5. Handle special XACT records
|
||||
if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
if info == pg_constants::XLOG_XACT_COMMIT {
|
||||
//parse commit record to extract subtrans entries
|
||||
// xl_xact_commit starts with time of commit
|
||||
let _xact_time = buf.get_i64_le();
|
||||
|
||||
let mut xinfo = 0;
|
||||
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
|
||||
xinfo = buf.get_u32_le();
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
|
||||
let _dbid = buf.get_u32_le();
|
||||
let _tsid = buf.get_u32_le();
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
|
||||
let nsubxacts = buf.get_i32_le();
|
||||
for _i in 0..nsubxacts {
|
||||
let _subxact = buf.get_u32_le();
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
|
||||
let nrels = buf.get_i32_le();
|
||||
for _i in 0..nrels {
|
||||
let spcnode = buf.get_u32_le();
|
||||
let dbnode = buf.get_u32_le();
|
||||
let relnode = buf.get_u32_le();
|
||||
//TODO handle this too?
|
||||
trace!(
|
||||
"XLOG_XACT_COMMIT relfilenode {}/{}/{}",
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode
|
||||
);
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
|
||||
let nmsgs = buf.get_i32_le();
|
||||
for _i in 0..nmsgs {
|
||||
let sizeof_shared_invalidation_message = 0;
|
||||
buf.advance(sizeof_shared_invalidation_message);
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
||||
let _xid = buf.get_u32_le();
|
||||
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
|
||||
//TODO handle this to be able to restore pg_twophase on node start
|
||||
}
|
||||
} else if info == pg_constants::XLOG_XACT_ABORT {
|
||||
//parse abort record to extract subtrans entries
|
||||
// xl_xact_abort starts with time of commit
|
||||
let _xact_time = buf.get_i64_le();
|
||||
|
||||
let mut xinfo = 0;
|
||||
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
|
||||
xinfo = buf.get_u32_le();
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
|
||||
let _dbid = buf.get_u32_le();
|
||||
let _tsid = buf.get_u32_le();
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
|
||||
let nsubxacts = buf.get_i32_le();
|
||||
for _i in 0..nsubxacts {
|
||||
let _subxact = buf.get_u32_le();
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
|
||||
let nrels = buf.get_i32_le();
|
||||
for _i in 0..nrels {
|
||||
let spcnode = buf.get_u32_le();
|
||||
let dbnode = buf.get_u32_le();
|
||||
let relnode = buf.get_u32_le();
|
||||
//TODO handle this too?
|
||||
trace!(
|
||||
"XLOG_XACT_ABORT relfilenode {}/{}/{}",
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode
|
||||
);
|
||||
}
|
||||
}
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 {
|
||||
let _xid = buf.get_u32_le();
|
||||
trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
|
||||
}
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
|
||||
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_DBASE_CREATE {
|
||||
//buf points to main_data
|
||||
let db_id = buf.get_u32_le();
|
||||
let tablespace_id = buf.get_u32_le();
|
||||
let src_db_id = buf.get_u32_le();
|
||||
let src_tablespace_id = buf.get_u32_le();
|
||||
trace!(
|
||||
"XLOG_DBASE_CREATE tablespace_id/db_id {}/{} src_db_id {}/{}",
|
||||
tablespace_id,
|
||||
db_id,
|
||||
src_tablespace_id,
|
||||
src_db_id
|
||||
);
|
||||
// in postgres it is implemented as copydir
|
||||
// we need to copy all pages in page_cache
|
||||
} else {
|
||||
trace!("XLOG_DBASE_DROP is not handled yet");
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_TBLSPC_ID {
|
||||
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_TBLSPC_CREATE {
|
||||
//buf points to main_data
|
||||
let ts_id = buf.get_u32_le();
|
||||
let ts_path = str::from_utf8(&buf).unwrap();
|
||||
trace!("XLOG_TBLSPC_CREATE ts_id {} ts_path {}", ts_id, ts_path);
|
||||
} else {
|
||||
trace!("XLOG_TBLSPC_DROP is not handled yet");
|
||||
}
|
||||
} else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
// 5. Handle a few special record types that modify blocks without registering
|
||||
// them with the standard mechanism.
|
||||
if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID {
|
||||
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
|
||||
if info == pg_constants::XLOG_HEAP_INSERT {
|
||||
|
||||
@@ -33,33 +33,11 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24;
|
||||
pub const BITS_PER_HEAPBLOCK: u16 = 2;
|
||||
pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
|
||||
|
||||
pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
|
||||
pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
|
||||
pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
|
||||
|
||||
// From xact.h
|
||||
pub const XLOG_XACT_COMMIT: u8 = 0x00;
|
||||
pub const XLOG_XACT_PREPARE: u8 = 0x10;
|
||||
pub const XLOG_XACT_ABORT: u8 = 0x20;
|
||||
|
||||
/* mask for filtering opcodes out of xl_info */
|
||||
pub const XLOG_XACT_OPMASK: u8 = 0x70;
|
||||
/* does this record have a 'xinfo' field or not */
|
||||
pub const XLOG_XACT_HAS_INFO: u8 = 0x80;
|
||||
|
||||
/*
|
||||
* The following flags, stored in xinfo, determine which information is
|
||||
* contained in commit/abort records.
|
||||
*/
|
||||
pub const XACT_XINFO_HAS_DBINFO: u32 = 1u32 << 0;
|
||||
pub const XACT_XINFO_HAS_SUBXACTS: u32 = 1u32 << 1;
|
||||
pub const XACT_XINFO_HAS_RELFILENODES: u32 = 1u32 << 2;
|
||||
pub const XACT_XINFO_HAS_INVALS: u32 = 1u32 << 3;
|
||||
pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
|
||||
// pub const XACT_XINFO_HAS_ORIGIN: u32 = 1u32 << 5;
|
||||
// pub const XACT_XINFO_HAS_AE_LOCKS: u32 = 1u32 << 6;
|
||||
// pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
|
||||
|
||||
// From pg_control.h and rmgrlist.h
|
||||
pub const XLOG_SWITCH: u8 = 0x40;
|
||||
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
|
||||
|
||||
Reference in New Issue
Block a user