Fix review issues:

- Add comments
- Handle members multixact wraparound (sketched below)
- Extract WAL segment creation (pg_resetwal) to postgres_ffi
- Fix adding prepared 2PC files to basebackup
Konstantin Knizhnik
2021-06-07 14:45:11 +03:00
parent e7587ceb81
commit abb114decd
9 changed files with 199 additions and 76 deletions
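The members multixact wraparound fix is the subtlest change here: the pg_multixact/members block space can wrap past its maximum block number back to 0, so a plain `first..=last` range misses the case where the range wraps. Below is a minimal, standalone sketch of that iteration pattern. `MAX_MBR_BLKNO` and `MULTIXACT_MEMBERS_PER_PAGE` mirror the constants used in the waldecoder changes further down; `visit_member_blocks`, `main`, and the example block numbers are illustrative only, not part of the commit.

const MULTIXACT_MEMBERS_PER_PAGE: u32 = 1636; // members per 8KB page in PostgreSQL (illustrative here)
const MAX_MBR_BLKNO: u32 = u32::MAX / MULTIXACT_MEMBERS_PER_PAGE;

// Visit every members block from `first` to `last` inclusive,
// wrapping from MAX_MBR_BLKNO back to block 0 when necessary.
fn visit_member_blocks(first: u32, last: u32, mut f: impl FnMut(u32)) {
    let mut blknum = first;
    loop {
        f(blknum);
        if blknum == last {
            break; // the range is inclusive of the last block
        }
        // handle wraparound of the block space
        blknum = if blknum == MAX_MBR_BLKNO { 0 } else { blknum + 1 };
    }
}

fn main() {
    // A range that wraps: the two highest blocks followed by blocks 0 and 1.
    let mut visited = Vec::new();
    visit_member_blocks(MAX_MBR_BLKNO - 1, 1, |b| visited.push(b));
    assert_eq!(visited, vec![MAX_MBR_BLKNO - 1, MAX_MBR_BLKNO, 0, 1]);
    println!("visited blocks: {:?}", visited);
}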


@@ -17,8 +17,8 @@ use std::time::SystemTime;
use tar::{Builder, Header};
use walkdir::WalkDir;
use crate::repository::{DatabaseTag, ObjectTag, Timeline};
use crc32c::*;
use crate::repository::{DatabaseTag, ObjectTag, SlruBufferTag, Timeline};
use postgres_ffi::nonrelfile_utils::*;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::xlog_utils::*;
use postgres_ffi::*;
@@ -99,6 +99,9 @@ impl<'a> Basebackup<'a> {
}
// Generate non-relational files.
// Iteration is in sorted order: all objects of the same type are grouped and traversed
// in ascending key order. For example, all pg_xact records precede pg_multixact records and are sorted by block number.
// This allows us to easily construct SLRU segments (32 blocks each).
for obj in self.timeline.list_nonrels(self.lsn)? {
match obj {
ObjectTag::Clog(slru) =>
@@ -114,7 +117,7 @@ impl<'a> Basebackup<'a> {
_ => {}
}
}
self.finish_slru_segment()?; // write last non-completed Slru segment (if any)
self.finish_slru_segment()?; // write last non-completed SLRU segment (if any)
self.add_pgcontrol_file()?;
self.ar.finish()?;
debug!("all tarred up!");
@@ -122,7 +125,8 @@ impl<'a> Basebackup<'a> {
}
//
// Generate SRLU segment files from repository
// Generate SLRU segment files from the repository. The path identifies the SLRU kind (pg_xact, pg_multixact/members, ...).
// Initially the path is an empty string.
//
fn add_slru_segment(
&mut self,
@@ -136,10 +140,11 @@ impl<'a> Basebackup<'a> {
assert!(img.len() == pg_constants::BLCKSZ as usize);
let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) {
// Switch to new segment: save old one
let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?;
self.ar.append(&header, &self.slru_buf[..])?;
self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE];
self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE]; // reinitialize segment buffer
}
self.slru_segno = segno;
self.slru_path = path;
@@ -151,8 +156,13 @@ impl<'a> Basebackup<'a> {
Ok(())
}
//
// We flush SLRU segments to the tarball once they are completed.
// This method is used to flush the last (possibly incomplete) segment.
//
fn finish_slru_segment(&mut self) -> anyhow::Result<()> {
if self.slru_path != "" {
// if there is an incomplete segment
let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?;
self.ar.append(&header, &self.slru_buf[..])?;
@@ -167,7 +177,7 @@ impl<'a> Basebackup<'a> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
info!("add_relmap_file {:?}", db);
let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
String::from("global/pg_filenode.map")
String::from("global/pg_filenode.map") // filenode map for global tablespace
} else {
// User defined tablespaces are not supported
assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
@@ -182,18 +192,30 @@ impl<'a> Basebackup<'a> {
Ok(())
}
// Check transaction status
fn get_tx_status(&self, xid: TransactionId) -> anyhow::Result<u8> {
let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE;
let tag = ObjectTag::Clog(SlruBufferTag { blknum });
let clog_page = self.timeline.get_page_at_lsn(tag, self.lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
Ok(status)
}
//
// Extract twophase state files
//
fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar.append(&header, &buf[..])?;
// Include two-phase state files in the tarball only for in-progress transactions
if self.get_tx_status(xid)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
buf.put_u32_le(crc);
let path = format!("pg_twophase/{:>08X}", xid);
let header = new_tar_header(&path, buf.len() as u64)?;
self.ar.append(&header, &buf[..])?;
}
Ok(())
}
@@ -209,6 +231,7 @@ impl<'a> Basebackup<'a> {
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?;
let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
// Here starts pg_resetwal inspired magic
// Generate new pg_control and WAL needed for bootstrap
let new_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE) + 1;
@@ -240,60 +263,8 @@ impl<'a> Basebackup<'a> {
);
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?;
let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);
let hdr = XLogLongPageHeaderData {
std: {
XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: pg_constants::XLP_LONG_HEADER,
xlp_tli: 1, // FIXME: always use Postgres timeline 1
xlp_pageaddr: pg_control.checkPointCopy.redo - SizeOfXLogLongPHD as u64,
xlp_rem_len: 0,
}
},
xlp_sysid: pg_control.system_identifier,
xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
};
let hdr_bytes = encode_xlog_long_phd(hdr);
seg_buf.extend_from_slice(&hdr_bytes);
let rec_hdr = XLogRecord {
xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD
+ SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT
+ SIZEOF_CHECKPOINT) as u32,
xl_xid: 0, //0 is for InvalidTransactionId
xl_prev: 0,
xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN,
xl_rmid: pg_constants::RM_XLOG_ID,
xl_crc: 0,
};
let mut rec_shord_hdr_bytes = BytesMut::new();
rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8);
let rec_bytes = encode_xlog_record(rec_hdr);
let checkpoint_bytes = encode_checkpoint(pg_control.checkPointCopy);
//calculate record checksum
let mut crc = 0;
crc = crc32c_append(crc, &rec_shord_hdr_bytes[..]);
crc = crc32c_append(crc, &checkpoint_bytes[..]);
crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
seg_buf.put_u32_le(crc);
seg_buf.extend_from_slice(&rec_shord_hdr_bytes);
seg_buf.extend_from_slice(&checkpoint_bytes);
//zero out remainig file
seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
self.ar.append(&header, &seg_buf[..])?;
let wal_seg = generate_wal_segment(&pg_control);
self.ar.append(&header, &wal_seg[..])?;
Ok(())
}
}
@@ -345,16 +316,23 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
}
}
//
// Check if it is a relational file
//
fn is_rel_file_path(path: &str) -> bool {
parse_rel_file_path(path).is_ok()
}
//
// Create new tarball entry header
//
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
let mut header = Header::new_gnu();
header.set_size(size);
header.set_path(path)?;
header.set_mode(0b110000000);
header.set_mode(0b110000000); // -rw-------
header.set_mtime(
// use current time as last modified time
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()


@@ -434,6 +434,8 @@ impl Timeline for ObjectTimeline {
Ok(())
}
/// Unlink object. This method is used for marking dropped relations
/// and removed segments of SLRUs.
fn put_unlink(&self, tag: ObjectTag, lsn: Lsn) -> Result<()> {
let key = ObjectKey {
timeline: self.timelineid,


@@ -72,9 +72,10 @@ pub trait ObjectStore: Send + Sync {
lsn: Lsn,
) -> Result<HashSet<RelTag>>;
/// Iterate through all objects
/// Iterate through object tags. If nonrel_only is set, only non-relational data is iterated.
///
/// This is used to implement GC and to prepare the tarball for new node startup
/// Returns objects in increasing key-version order.
fn list_objects<'a>(
&'a self,
timelineid: ZTimelineId,


@@ -71,7 +71,8 @@ pub trait Timeline: Send + Sync {
/// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// Unlink object
/// Unlink object. This method is used for marking dropped relations
/// and removed segments of SLRUs.
fn put_unlink(&self, tag: ObjectTag, lsn: Lsn) -> Result<()>;
/// Remember that all WAL before the given LSN has been processed.


@@ -23,6 +23,9 @@ pub type MultiXactId = TransactionId;
pub type MultiXactOffset = u32;
pub type MultiXactStatus = u32;
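// Highest block number in the pg_multixact/members SLRU; block numbers wrap back to 0 past this point.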
const MAX_MBR_BLKNO: u32 =
pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
#[allow(dead_code)]
pub struct WalStreamDecoder {
lsn: Lsn,
@@ -732,9 +735,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blk.will_drop = true;
checkpoint.oldestXid = buf.get_u32_le();
checkpoint.oldestXidDB = buf.get_u32_le();
info!(
trace!(
"RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}",
blknum, checkpoint.oldestXid, checkpoint.oldestXidDB
blknum,
checkpoint.oldestXid,
checkpoint.oldestXidDB
);
}
trace!("RM_CLOG_ID updates block {}", blknum);
@@ -956,6 +961,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 0 is expected to be relation buffer tag but it is {:?}",
blocks[0].tag
);
}
}
} else if info == pg_constants::XLOG_HEAP_DELETE {
@@ -973,6 +983,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 0 is expected to be relation buffer tag but it is {:?}",
blocks[0].tag
);
}
}
} else if info == pg_constants::XLOG_HEAP_UPDATE
@@ -992,6 +1007,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 0 is expected to be relation buffer tag but it is {:?}",
blocks[0].tag
);
}
}
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0
@@ -1009,6 +1029,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blknum: tag1.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 1 is expected to be relation buffer tag but it is {:?}",
blocks[1].tag
);
}
}
}
@@ -1033,6 +1058,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
});
blocks.push(blk);
} else {
panic!(
"Block 0 is expected to be relation buffer tag but it is {:?}",
blocks[0].tag
);
}
}
}
@@ -1062,11 +1092,26 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
(xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
for blknum in first_mbr_blkno..=last_mbr_blkno {
// The members SLRU can, in contrast to the offsets one, be filled to almost
// the full range at once. So we need to handle wraparound.
let mut blknum = first_mbr_blkno;
loop {
// Update members page
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum });
blocks.push(blk);
if blknum == last_mbr_blkno {
// last block inclusive
break;
}
// handle wraparound
if blknum == MAX_MBR_BLKNO {
blknum = 0;
} else {
blknum += 1;
}
}
if xlrec.mid >= checkpoint.nextMulti {
checkpoint.nextMulti = xlrec.mid + 1;
@@ -1094,6 +1139,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
let last_off_blkno =
xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
// Delete all the segments but the last one. The last segment can still
// contain, possibly partially, valid data.
for blknum in first_off_blkno..last_off_blkno {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum });
@@ -1104,11 +1151,22 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
let last_mbr_blkno =
xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
for blknum in first_mbr_blkno..last_mbr_blkno {
// The members SLRU can, in contrast to the offsets one, be filled to almost
// the full range at once. So we need to handle wraparound.
let mut blknum = first_mbr_blkno;
// Delete all the segments but the last one. The last segment can still
// contain, possibly partially, valid data.
while blknum != last_mbr_blkno {
let mut blk = DecodedBkpBlock::new();
blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum });
blk.will_drop = true;
blocks.push(blk);
// handle wraparound
if blknum == MAX_MBR_BLKNO {
blknum = 0;
} else {
blknum += 1;
}
}
} else {
panic!()


@@ -150,6 +150,7 @@ fn walreceiver_main(
error!("No previous WAL position");
}
// FIXME: We have to do this to handle the new segment generated by pg_resetwal at compute node startup
startpoint = Lsn::max(
startpoint,
Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)),


@@ -271,15 +271,20 @@ impl PostgresRedoManagerInternal {
let apply_result: Result<Bytes, Error>;
if let ObjectTag::RelationBuffer(buf_tag) = tag {
// Relational WAL records are applied using wal-redo-postgres
apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
} else {
// Non-relational WAL records we apply ourselves.
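// An all-zeroes 8KB page image, used to initialize pages without a base image and to apply *_ZEROPAGE records.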
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let mut page = BytesMut::new();
if let Some(fpi) = base_img {
// If full-page image is provided, then use it...
page.extend_from_slice(&fpi[..]);
} else {
// otherwise initialize page with zeros
page.extend_from_slice(&ZERO_PAGE);
}
// Apply all collected WAL records
for record in records {
let mut buf = record.rec.clone();
@@ -298,9 +303,11 @@ impl PostgresRedoManagerInternal {
if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
// The only operation we need to implement is CLOG_ZEROPAGE
page.copy_from_slice(&ZERO_PAGE);
}
} else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
// Transaction manager stuff
let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
let tag_blknum = match tag {
@@ -316,10 +323,11 @@ impl PostgresRedoManagerInternal {
{
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
if info == pg_constants::XLOG_XACT_COMMIT {
// status of 2PC transaction will be set later
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
}
//handle subtrans
let _xact_time = buf.get_i64_le();
// decode xinfo
let mut xinfo = 0;
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
@@ -329,6 +337,7 @@ impl PostgresRedoManagerInternal {
}
}
// handle subtrans
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
for _i in 0..nsubxacts {
@@ -343,6 +352,7 @@ impl PostgresRedoManagerInternal {
}
}
if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
// No need to handle dropped relations here, just skip them
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
let nrels = buf.get_i32_le();
for _i in 0..nrels {
@@ -358,6 +368,7 @@ impl PostgresRedoManagerInternal {
);
}
}
// Skip invalidations
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
let nmsgs = buf.get_i32_le();
for _i in 0..nmsgs {
@@ -365,6 +376,7 @@ impl PostgresRedoManagerInternal {
buf.advance(sizeof_shared_invalidation_message);
}
}
// Set status of 2PC transaction
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
let xid = buf.get_u32_le();
transaction_id_set_status(xid, status, &mut page);
@@ -374,10 +386,12 @@ impl PostgresRedoManagerInternal {
{
status = pg_constants::TRANSACTION_STATUS_ABORTED;
if info == pg_constants::XLOG_XACT_ABORT {
// status of 2PC transaction will be set later
transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
}
//handle subtrans
let _xact_time = buf.get_i64_le();
// decode xinfo
let mut xinfo = 0;
if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
@@ -387,6 +401,7 @@ impl PostgresRedoManagerInternal {
}
}
// handle subtrans
if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
let nsubxacts = buf.get_i32_le();
for _i in 0..nsubxacts {
@@ -400,6 +415,7 @@ impl PostgresRedoManagerInternal {
}
}
if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
// No need to handle dropped relations here, just skip them
if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
let nrels = buf.get_i32_le();
for _i in 0..nrels {
@@ -415,6 +431,7 @@ impl PostgresRedoManagerInternal {
);
}
}
// Skip invalidations
if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
let nmsgs = buf.get_i32_le();
for _i in 0..nmsgs {
@@ -422,6 +439,7 @@ impl PostgresRedoManagerInternal {
buf.advance(sizeof_shared_invalidation_message);
}
}
// Set status of 2PC transaction
assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
let xid = buf.get_u32_le();
transaction_id_set_status(xid, status, &mut page);
@@ -437,10 +455,12 @@ impl PostgresRedoManagerInternal {
record.main_data_offset, record.rec.len());
}
} else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
// Multixact operations
let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
// Just need to zero the page
page.copy_from_slice(&ZERO_PAGE);
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
let xlrec = XlMultiXactCreate::decode(&mut buf);
@@ -470,6 +490,7 @@ impl PostgresRedoManagerInternal {
}
}
} else {
// Multixact offsets SLRU
let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
* 4) as usize;
LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
@@ -478,6 +499,7 @@ impl PostgresRedoManagerInternal {
panic!();
}
} else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
// Relation map file has size 512 bytes
page.clear();
page.extend_from_slice(&buf[12..]); // skip xl_relmap_update
assert!(page.len() == 512); // size of pg_filenode.map


@@ -89,6 +89,9 @@ pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
// From multixact.h
pub const FIRST_MULTIXACT_ID: u32 = 1;
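// Largest possible 32-bit MultiXactId; used to compute the wraparound point of the members SLRU block space.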
pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF;
pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;


@@ -7,6 +7,7 @@
// have been named the same as the corresponding PostgreSQL functions instead.
//
use crate::encode_checkpoint;
use crate::pg_constants;
use crate::CheckPoint;
use crate::FullTransactionId;
@@ -17,6 +18,7 @@ use crate::XLOG_PAGE_MAGIC;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use bytes::{BufMut, BytesMut};
use crc32c::*;
use log::*;
use std::cmp::min;
@@ -383,3 +385,58 @@ impl CheckPoint {
}
}
}
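/// Generate a fresh WAL segment containing only a long page header and a single
/// XLOG_CHECKPOINT_SHUTDOWN record carrying the checkpoint from pg_control,
/// zero-padded to WAL_SEGMENT_SIZE. This is the pg_resetwal-style segment used
/// when preparing a basebackup for compute node startup.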
pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes {
let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);
let hdr = XLogLongPageHeaderData {
std: {
XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: pg_constants::XLP_LONG_HEADER,
xlp_tli: 1, // FIXME: always use Postgres timeline 1
xlp_pageaddr: pg_control.checkPointCopy.redo - SizeOfXLogLongPHD as u64,
xlp_rem_len: 0,
}
},
xlp_sysid: pg_control.system_identifier,
xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
};
let hdr_bytes = encode_xlog_long_phd(hdr);
seg_buf.extend_from_slice(&hdr_bytes);
let rec_hdr = XLogRecord {
xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD
+ SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT
+ SIZEOF_CHECKPOINT) as u32,
xl_xid: 0, //0 is for InvalidTransactionId
xl_prev: 0,
xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN,
xl_rmid: pg_constants::RM_XLOG_ID,
xl_crc: 0,
};
let mut rec_shord_hdr_bytes = BytesMut::new();
rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8);
let rec_bytes = encode_xlog_record(rec_hdr);
let checkpoint_bytes = encode_checkpoint(pg_control.checkPointCopy);
//calculate record checksum
let mut crc = 0;
crc = crc32c_append(crc, &rec_shord_hdr_bytes[..]);
crc = crc32c_append(crc, &checkpoint_bytes[..]);
crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
seg_buf.put_u32_le(crc);
seg_buf.extend_from_slice(&rec_shord_hdr_bytes);
seg_buf.extend_from_slice(&checkpoint_bytes);
// zero out the remaining part of the segment
seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
seg_buf.freeze()
}