diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index a6d0f35cca..1334a80942 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -17,8 +17,8 @@ use std::time::SystemTime;
 use tar::{Builder, Header};
 use walkdir::WalkDir;
 
-use crate::repository::{DatabaseTag, ObjectTag, Timeline};
-use crc32c::*;
+use crate::repository::{DatabaseTag, ObjectTag, SlruBufferTag, Timeline};
+use postgres_ffi::nonrelfile_utils::*;
 use postgres_ffi::relfile_utils::*;
 use postgres_ffi::xlog_utils::*;
 use postgres_ffi::*;
@@ -99,6 +99,9 @@ impl<'a> Basebackup<'a> {
         }
 
         // Generate non-relational files.
+        // Iteration is in sorted order: all objects of the same type are grouped and traversed
+        // in ascending key order. For example, all pg_xact records precede pg_multixact records
+        // and are sorted by block number. This allows us to easily construct SLRU segments (32 blocks).
         for obj in self.timeline.list_nonrels(self.lsn)? {
             match obj {
                 ObjectTag::Clog(slru) =>
@@ -114,7 +117,7 @@ impl<'a> Basebackup<'a> {
                 _ => {}
             }
         }
-        self.finish_slru_segment()?; // write last non-completed Slru segment (if any)
+        self.finish_slru_segment()?; // write last non-completed SLRU segment (if any)
         self.add_pgcontrol_file()?;
         self.ar.finish()?;
         debug!("all tarred up!");
@@ -122,7 +125,8 @@ impl<'a> Basebackup<'a> {
     }
 
     //
-    // Generate SRLU segment files from repository
+    // Generate SLRU segment files from the repository. The path identifies the SLRU kind
+    // (pg_xact, pg_multixact/members, ...). Initially the path is an empty string.
     //
     fn add_slru_segment(
         &mut self,
@@ -136,10 +140,11 @@ impl<'a> Basebackup<'a> {
         assert!(img.len() == pg_constants::BLCKSZ as usize);
         let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
         if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) {
+            // Switch to a new segment: save the old one
            let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
            let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?;
            self.ar.append(&header, &self.slru_buf[..])?;
-            self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE];
+            self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE]; // reinitialize segment buffer
         }
         self.slru_segno = segno;
         self.slru_path = path;
@@ -151,8 +156,13 @@ impl<'a> Basebackup<'a> {
         Ok(())
     }
 
+    //
+    // We flush SLRU segments to the tarball once they are complete.
+    // This method flushes the last (possibly incomplete) segment.
+    //
     fn finish_slru_segment(&mut self) -> anyhow::Result<()> {
         if self.slru_path != "" {
+            // if there is an incomplete segment
             let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno);
             let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?;
             self.ar.append(&header, &self.slru_buf[..])?;
@@ -167,7 +177,7 @@ impl<'a> Basebackup<'a> {
         let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
         info!("add_relmap_file {:?}", db);
         let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
-            String::from("global/pg_filenode.map")
+            String::from("global/pg_filenode.map") // filenode map for global tablespace
         } else {
             // User defined tablespaces are not supported
             assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
@@ -182,18 +192,30 @@ impl<'a> Basebackup<'a> {
         Ok(())
     }
 
+    // Check transaction status
+    fn get_tx_status(&self, xid: TransactionId) -> anyhow::Result<u8> {
+        let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE;
+        let tag = ObjectTag::Clog(SlruBufferTag { blknum });
+        let clog_page = self.timeline.get_page_at_lsn(tag, self.lsn)?;
+        let status = transaction_id_get_status(xid, &clog_page[..]);
+        Ok(status)
+    }
+
     //
     // Extract twophase state files
     //
     fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> {
-        let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
-        let mut buf = BytesMut::new();
-        buf.extend_from_slice(&img[..]);
-        let crc = crc32c::crc32c(&img[..]);
-        buf.put_u32_le(crc);
-        let path = format!("pg_twophase/{:>08X}", xid);
-        let header = new_tar_header(&path, buf.len() as u64)?;
-        self.ar.append(&header, &buf[..])?;
+        // Include two-phase files in the tarball only for in-progress transactions
+        if self.get_tx_status(xid)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
+            let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
+            let mut buf = BytesMut::new();
+            buf.extend_from_slice(&img[..]);
+            let crc = crc32c::crc32c(&img[..]);
+            buf.put_u32_le(crc);
+            let path = format!("pg_twophase/{:>08X}", xid);
+            let header = new_tar_header(&path, buf.len() as u64)?;
+            self.ar.append(&header, &buf[..])?;
+        }
         Ok(())
     }
 
@@ -209,6 +231,7 @@ impl<'a> Basebackup<'a> {
             .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
         let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?;
         let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
+        // Here starts pg_resetwal-inspired magic
 
         // Generate new pg_control and WAL needed for bootstrap
         let new_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE) + 1;
@@ -240,60 +263,8 @@ impl<'a> Basebackup<'a> {
         );
         let wal_file_path = format!("pg_wal/{}", wal_file_name);
         let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?;
-
-        let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);
-
-        let hdr = XLogLongPageHeaderData {
-            std: {
-                XLogPageHeaderData {
-                    xlp_magic: XLOG_PAGE_MAGIC as u16,
-                    xlp_info: pg_constants::XLP_LONG_HEADER,
-                    xlp_tli: 1, // FIXME: always use Postgres timeline 1
-                    xlp_pageaddr: pg_control.checkPointCopy.redo - SizeOfXLogLongPHD as u64,
-                    xlp_rem_len: 0,
-                }
-            },
-            xlp_sysid: pg_control.system_identifier,
-            xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
-            xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
-        };
-
-        let hdr_bytes = encode_xlog_long_phd(hdr);
-        seg_buf.extend_from_slice(&hdr_bytes);
-
-        let rec_hdr = XLogRecord {
-            xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD
-                + SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT
-                + SIZEOF_CHECKPOINT) as u32,
-            xl_xid: 0, //0 is for InvalidTransactionId
-            xl_prev: 0,
-            xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN,
-            xl_rmid: pg_constants::RM_XLOG_ID,
-            xl_crc: 0,
-        };
-
-        let mut rec_shord_hdr_bytes = BytesMut::new();
-        rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
-        rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8);
-
-        let rec_bytes = encode_xlog_record(rec_hdr);
-        let checkpoint_bytes = encode_checkpoint(pg_control.checkPointCopy);
-
-        //calculate record checksum
-        let mut crc = 0;
-        crc = crc32c_append(crc, &rec_shord_hdr_bytes[..]);
-        crc = crc32c_append(crc, &checkpoint_bytes[..]);
-        crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
-
-        seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
-        seg_buf.put_u32_le(crc);
-        seg_buf.extend_from_slice(&rec_shord_hdr_bytes);
-        seg_buf.extend_from_slice(&checkpoint_bytes);
-
-        //zero out remainig file
-        seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
-
-        self.ar.append(&header, &seg_buf[..])?;
+        let wal_seg = generate_wal_segment(&pg_control);
+        self.ar.append(&header, &wal_seg[..])?;
         Ok(())
     }
 }
@@ -345,16 +316,23 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> {
     }
 }
 
+//
+// Check if it is a relational file
+//
 fn is_rel_file_path(path: &str) -> bool {
     parse_rel_file_path(path).is_ok()
 }
 
+//
+// Create a new tarball entry header
+//
 fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(size);
    header.set_path(path)?;
-    header.set_mode(0b110000000);
+    header.set_mode(0b110000000); // -rw-------
    header.set_mtime(
+        // use current time as the last modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs
index 1979063e33..b8a6751fc5 100644
--- a/pageserver/src/object_repository.rs
+++ b/pageserver/src/object_repository.rs
@@ -434,6 +434,8 @@ impl Timeline for ObjectTimeline {
         Ok(())
     }
 
+    /// Unlink object. This method is used for marking dropped relations
+    /// and removed segments of SLRUs.
     fn put_unlink(&self, tag: ObjectTag, lsn: Lsn) -> Result<()> {
         let key = ObjectKey {
             timeline: self.timelineid,
diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs
index ebd0cf7467..8d5f60b61d 100644
--- a/pageserver/src/object_store.rs
+++ b/pageserver/src/object_store.rs
@@ -72,9 +72,10 @@ pub trait ObjectStore: Send + Sync {
         lsn: Lsn,
     ) -> Result>;
 
-    /// Iterate through all objects
+    /// Iterate through object tags. If nonrel_only is set, only non-relational data is iterated.
     ///
     /// This is used to implement GC and preparing tarball for new node startup
+    /// Returns objects in increasing key-version order.
     fn list_objects<'a>(
        &'a self,
        timelineid: ZTimelineId,
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index b807740576..0850dc6065 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -71,7 +71,8 @@ pub trait Timeline: Send + Sync {
     /// Truncate relation
     fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
 
-    /// Unlink object
+    /// Unlink object. This method is used for marking dropped relations
+    /// and removed segments of SLRUs.
     fn put_unlink(&self, tag: ObjectTag, lsn: Lsn) -> Result<()>;
 
     /// Remember the all WAL before the given LSN has been processed.
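Taken together, the basebackup changes above depend on `list_nonrels()` returning objects grouped by SLRU kind and sorted by block number, so that consecutive pages can be packed into 32-block segment files. The standalone sketch below is illustrative only and is not part of the patch: the helper name, the restated constants, and the in-memory return type are assumptions; it just mirrors the buffering scheme that `add_slru_segment` and `finish_slru_segment` implement against the tar archive.

```rust
/// Illustrative sketch of SLRU segment packing (constants restated here so the
/// example is self-contained; the real values live in pg_constants).
const BLCKSZ: usize = 8192;
const SLRU_PAGES_PER_SEGMENT: u32 = 32;
const SLRU_SEG_SIZE: usize = BLCKSZ * SLRU_PAGES_PER_SEGMENT as usize;

/// Pack (slru_path, blkno, page) triples, already sorted by path and block number,
/// into (segment file name, segment image) pairs. Pages missing from the input
/// stay zeroed, just like the reinitialized `slru_buf` in the patch.
fn pack_slru_segments(pages: &[(String, u32, [u8; BLCKSZ])]) -> Vec<(String, Vec<u8>)> {
    let mut segments = Vec::new();
    let mut cur_path = String::new();
    let mut cur_segno = 0u32;
    let mut buf = vec![0u8; SLRU_SEG_SIZE];
    let mut have_segment = false;

    for (path, blkno, page) in pages {
        let segno = blkno / SLRU_PAGES_PER_SEGMENT;
        // Flush the previous segment when the SLRU kind or the segment number changes.
        if have_segment && (cur_path != *path || cur_segno != segno) {
            segments.push((format!("{}/{:>04X}", cur_path, cur_segno), buf.clone()));
            buf = vec![0u8; SLRU_SEG_SIZE]; // reinitialize the segment buffer
        }
        cur_path = path.clone();
        cur_segno = segno;
        have_segment = true;
        let off = (blkno % SLRU_PAGES_PER_SEGMENT) as usize * BLCKSZ;
        buf[off..off + BLCKSZ].copy_from_slice(page);
    }
    // Flush the last, possibly incomplete, segment (what finish_slru_segment does).
    if have_segment {
        segments.push((format!("{}/{:>04X}", cur_path, cur_segno), buf));
    }
    segments
}
```

The real code streams each completed segment straight into the tar archive with `new_tar_header` instead of collecting the images in a `Vec`, but the flush-on-change condition and the final flush of a possibly incomplete segment are the same.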
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index 07975d8872..96b355f3c0 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -23,6 +23,9 @@ pub type MultiXactId = TransactionId;
 pub type MultiXactOffset = u32;
 pub type MultiXactStatus = u32;
 
+const MAX_MBR_BLKNO: u32 =
+    pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+
 #[allow(dead_code)]
 pub struct WalStreamDecoder {
     lsn: Lsn,
@@ -732,9 +735,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                 blk.will_drop = true;
                 checkpoint.oldestXid = buf.get_u32_le();
                 checkpoint.oldestXidDB = buf.get_u32_le();
-                info!(
+                trace!(
                     "RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}",
-                    blknum, checkpoint.oldestXid, checkpoint.oldestXidDB
+                    blknum,
+                    checkpoint.oldestXid,
+                    checkpoint.oldestXidDB
                 );
             }
             trace!("RM_CLOG_ID updates block {}", blknum);
@@ -956,6 +961,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                     blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
                 });
                 blocks.push(blk);
+            } else {
+                panic!(
+                    "Block 0 is expected to be a relation buffer tag but it is {:?}",
+                    blocks[0].tag
+                );
             }
         }
     } else if info == pg_constants::XLOG_HEAP_DELETE {
@@ -973,6 +983,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                     blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
                 });
                 blocks.push(blk);
+            } else {
+                panic!(
+                    "Block 0 is expected to be a relation buffer tag but it is {:?}",
+                    blocks[0].tag
+                );
             }
         }
     } else if info == pg_constants::XLOG_HEAP_UPDATE
@@ -992,6 +1007,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                     blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
                 });
                 blocks.push(blk);
+            } else {
+                panic!(
+                    "Block 0 is expected to be a relation buffer tag but it is {:?}",
+                    blocks[0].tag
+                );
             }
         }
         if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0
@@ -1009,6 +1029,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                     blknum: tag1.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
                 });
                 blocks.push(blk);
+            } else {
+                panic!(
+                    "Block 1 is expected to be a relation buffer tag but it is {:?}",
+                    blocks[1].tag
+                );
             }
         }
     }
@@ -1033,6 +1058,11 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                     blknum: tag0.blknum / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
                 });
                 blocks.push(blk);
+            } else {
+                panic!(
+                    "Block 0 is expected to be a relation buffer tag but it is {:?}",
+                    blocks[0].tag
+                );
             }
         }
     }
@@ -1062,11 +1092,26 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
             let last_mbr_blkno =
                 (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
-            for blknum in first_mbr_blkno..=last_mbr_blkno {
+            // The members SLRU can, in contrast to the offsets one, be filled to almost
+            // the full range at once. So we need to handle wraparound.
+            let mut blknum = first_mbr_blkno;
+            loop {
                 // Update members page
                 let mut blk = DecodedBkpBlock::new();
                 blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum });
                 blocks.push(blk);
+
+                if blknum == last_mbr_blkno {
+                    // last block inclusive
+                    break;
+                }
+
+                // handle wraparound
+                if blknum == MAX_MBR_BLKNO {
+                    blknum = 0;
+                } else {
+                    blknum += 1;
+                }
             }
             if xlrec.mid >= checkpoint.nextMulti {
                 checkpoint.nextMulti = xlrec.mid + 1;
@@ -1094,6 +1139,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                 xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
             let last_off_blkno =
                 xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
+            // Delete all the segments but the last one. The last segment can still
+            // contain, possibly partially, valid data.
             for blknum in first_off_blkno..last_off_blkno {
                 let mut blk = DecodedBkpBlock::new();
                 blk.tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum });
@@ -1104,11 +1151,22 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                 xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
             let last_mbr_blkno =
                 xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
-            for blknum in first_mbr_blkno..last_mbr_blkno {
+            // The members SLRU can, in contrast to the offsets one, be filled to almost
+            // the full range at once. So we need to handle wraparound.
+            let mut blknum = first_mbr_blkno;
+            // Delete all the segments but the last one. The last segment can still
+            // contain, possibly partially, valid data.
+            while blknum != last_mbr_blkno {
                 let mut blk = DecodedBkpBlock::new();
                 blk.tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum });
                 blk.will_drop = true;
                 blocks.push(blk);
+                // handle wraparound
+                if blknum == MAX_MBR_BLKNO {
+                    blknum = 0;
+                } else {
+                    blknum += 1;
+                }
             }
         } else {
             panic!()
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index b417982967..eb0a7e45a0 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -150,6 +150,7 @@ fn walreceiver_main(
         error!("No previous WAL position");
     }
 
+    // FIXME: We have to do this to handle the new segment generated by pg_resetwal at compute node startup
     startpoint = Lsn::max(
         startpoint,
         Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)),
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 25f9514831..c972813191 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -271,15 +271,20 @@ impl PostgresRedoManagerInternal {
         let apply_result: Result;
 
         if let ObjectTag::RelationBuffer(buf_tag) = tag {
+            // Relational WAL records are applied using wal-redo-postgres
            apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
         } else {
+            // Non-relational WAL records we apply ourselves.
             const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
             let mut page = BytesMut::new();
             if let Some(fpi) = base_img {
+                // If a full-page image is provided, use it...
                 page.extend_from_slice(&fpi[..]);
             } else {
+                // otherwise initialize page with zeros
                 page.extend_from_slice(&ZERO_PAGE);
             }
+            // Apply all collected WAL records
             for record in records {
                 let mut buf = record.rec.clone();
@@ -298,9 +303,11 @@ impl PostgresRedoManagerInternal {
                 if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
                     let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
                     if info == pg_constants::CLOG_ZEROPAGE {
+                        // The only operation we need to implement is CLOG_ZEROPAGE
                         page.copy_from_slice(&ZERO_PAGE);
                     }
                 } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
+                    // Transaction manager stuff
                     let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
                     let mut status = 0;
                     let tag_blknum = match tag {
@@ -316,10 +323,11 @@ impl PostgresRedoManagerInternal {
                     {
                         status = pg_constants::TRANSACTION_STATUS_COMMITTED;
                         if info == pg_constants::XLOG_XACT_COMMIT {
+                            // status of 2PC transaction will be set later
                             transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
                         }
-                        //handle subtrans
                         let _xact_time = buf.get_i64_le();
+                        // decode xinfo
                         let mut xinfo = 0;
                         if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
                             xinfo = buf.get_u32_le();
@@ -329,6 +337,7 @@ impl PostgresRedoManagerInternal {
                             }
                         }
 
+                        // handle subtrans
                         if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
                             let nsubxacts = buf.get_i32_le();
                             for _i in 0..nsubxacts {
@@ -343,6 +352,7 @@ impl PostgresRedoManagerInternal {
                             }
                         }
                         if info == pg_constants::XLOG_XACT_COMMIT_PREPARED {
+                            // Do not need to handle dropped relations here, just need to skip them
                             if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
                                 let nrels = buf.get_i32_le();
                                 for _i in 0..nrels {
@@ -358,6 +368,7 @@ impl PostgresRedoManagerInternal {
                                     );
                                 }
                             }
+                            // Skip invalidations
                             if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
                                 let nmsgs = buf.get_i32_le();
                                 for _i in 0..nmsgs {
@@ -365,6 +376,7 @@ impl PostgresRedoManagerInternal {
                                     buf.advance(sizeof_shared_invalidation_message);
                                 }
                             }
+                            // Set status of 2PC transaction
                             assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
                             let xid = buf.get_u32_le();
                             transaction_id_set_status(xid, status, &mut page);
@@ -374,10 +386,12 @@ impl PostgresRedoManagerInternal {
                     {
                         status = pg_constants::TRANSACTION_STATUS_ABORTED;
                         if info == pg_constants::XLOG_XACT_ABORT {
+                            // status of 2PC transaction will be set later
                             transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
                         }
                         //handle subtrans
                         let _xact_time = buf.get_i64_le();
+                        // decode xinfo
                         let mut xinfo = 0;
                         if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
                             xinfo = buf.get_u32_le();
@@ -387,6 +401,7 @@ impl PostgresRedoManagerInternal {
                             }
                         }
 
+                        // handle subtrans
                        if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
                             let nsubxacts = buf.get_i32_le();
                             for _i in 0..nsubxacts {
@@ -400,6 +415,7 @@ impl PostgresRedoManagerInternal {
                             }
                         }
                         if info == pg_constants::XLOG_XACT_ABORT_PREPARED {
+                            // Do not need to handle dropped relations here, just need to skip them
                             if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
                                 let nrels = buf.get_i32_le();
                                 for _i in 0..nrels {
@@ -415,6 +431,7 @@ impl PostgresRedoManagerInternal {
                                     );
                                 }
                             }
+                            // Skip invalidations
                             if xinfo & pg_constants::XACT_XINFO_HAS_INVALS != 0 {
                                 let nmsgs = buf.get_i32_le();
                                 for _i in 0..nmsgs {
@@ -422,6 +439,7 @@ impl PostgresRedoManagerInternal {
                                     buf.advance(sizeof_shared_invalidation_message);
                                 }
                             }
+                            // Set status of 2PC transaction
                             assert!((xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE) != 0);
                             let xid = buf.get_u32_le();
                             transaction_id_set_status(xid, status, &mut page);
@@ -437,10 +455,12 @@ impl PostgresRedoManagerInternal {
                             record.main_data_offset, record.rec.len());
                     }
                 } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
+                    // Multixact operations
                     let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
                     if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
                         || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
                     {
+                        // Just need to zero the page
                         page.copy_from_slice(&ZERO_PAGE);
                     } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
                         let xlrec = XlMultiXactCreate::decode(&mut buf);
@@ -470,6 +490,7 @@ impl PostgresRedoManagerInternal {
                             }
                         }
                     } else {
+                        // Multixact offsets SLRU
                         let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
                             * 4) as usize;
                         LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
@@ -478,6 +499,7 @@ impl PostgresRedoManagerInternal {
                         panic!();
                     }
                 } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
+                    // Relation map file has size 512 bytes
                     page.clear();
                     page.extend_from_slice(&buf[12..]); // skip xl_relmap_update
                     assert!(page.len() == 512); // size of pg_filenode.map
diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs
index 61c54b05d2..a9af87eac6 100644
--- a/postgres_ffi/src/pg_constants.rs
+++ b/postgres_ffi/src/pg_constants.rs
@@ -89,6 +89,9 @@ pub const XLOG_SWITCH: u8 = 0x40;
 pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
 
 // From multixact.h
+pub const FIRST_MULTIXACT_ID: u32 = 1;
+pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF;
+
 pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
 pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
 pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs
index 8648b799d8..7876fbad20 100644
--- a/postgres_ffi/src/xlog_utils.rs
+++ b/postgres_ffi/src/xlog_utils.rs
@@ -7,6 +7,7 @@
 // have been named the same as the corresponding PostgreSQL functions instead.
 //
+use crate::encode_checkpoint;
 use crate::pg_constants;
 use crate::CheckPoint;
 use crate::FullTransactionId;
@@ -17,6 +18,7 @@ use crate::XLOG_PAGE_MAGIC;
 use byteorder::{ByteOrder, LittleEndian};
 use bytes::{Buf, Bytes};
+use bytes::{BufMut, BytesMut};
 use crc32c::*;
 use log::*;
 use std::cmp::min;
@@ -383,3 +385,58 @@ impl CheckPoint {
         }
     }
 }
+
+pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes {
+    let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize);
+
+    let hdr = XLogLongPageHeaderData {
+        std: {
+            XLogPageHeaderData {
+                xlp_magic: XLOG_PAGE_MAGIC as u16,
+                xlp_info: pg_constants::XLP_LONG_HEADER,
+                xlp_tli: 1, // FIXME: always use Postgres timeline 1
+                xlp_pageaddr: pg_control.checkPointCopy.redo - SizeOfXLogLongPHD as u64,
+                xlp_rem_len: 0,
+            }
+        },
+        xlp_sysid: pg_control.system_identifier,
+        xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32,
+        xlp_xlog_blcksz: XLOG_BLCKSZ as u32,
+    };
+
+    let hdr_bytes = encode_xlog_long_phd(hdr);
+    seg_buf.extend_from_slice(&hdr_bytes);
+
+    let rec_hdr = XLogRecord {
+        xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD
+            + SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT
+            + SIZEOF_CHECKPOINT) as u32,
+        xl_xid: 0, // 0 is for InvalidTransactionId
+        xl_prev: 0,
+        xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN,
+        xl_rmid: pg_constants::RM_XLOG_ID,
+        xl_crc: 0,
+    };
+
+    let mut rec_shord_hdr_bytes = BytesMut::new();
+    rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT);
+    rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8);
+
+    let rec_bytes = encode_xlog_record(rec_hdr);
+    let checkpoint_bytes = encode_checkpoint(pg_control.checkPointCopy);
+
+    // calculate record checksum
+    let mut crc = 0;
+    crc = crc32c_append(crc, &rec_shord_hdr_bytes[..]);
+    crc = crc32c_append(crc, &checkpoint_bytes[..]);
+    crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
+
+    seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]);
+    seg_buf.put_u32_le(crc);
+    seg_buf.extend_from_slice(&rec_shord_hdr_bytes);
+    seg_buf.extend_from_slice(&checkpoint_bytes);
+
+    // zero out the remaining part of the file
+    seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0);
+    seg_buf.freeze()
+}
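The trickiest part of the waldecoder.rs changes is the multixact members-SLRU block arithmetic, which has to wrap past the largest valid block number instead of iterating a plain ascending range. The standalone sketch below is illustrative only and is not part of the patch: the helper names are invented, and the `MULTIXACT_MEMBERS_PER_PAGE` value is an assumption mirroring PostgreSQL's member page layout (409 groups of 4 members per 8 kB page) rather than the actual `pg_constants` definition.

```rust
// Restated constants for a self-contained example; the patch takes them from pg_constants.
const MULTIXACT_MEMBERS_PER_PAGE: u32 = 1636;
const MAX_MULTIXACT_ID: u32 = 0xFFFF_FFFF;
const MAX_MBR_BLKNO: u32 = MAX_MULTIXACT_ID / MULTIXACT_MEMBERS_PER_PAGE;

/// Advance to the next members-SLRU block, wrapping past the last valid block
/// back to block 0, like the `if blknum == MAX_MBR_BLKNO` branches in the patch.
fn next_member_block(blkno: u32) -> u32 {
    if blkno == MAX_MBR_BLKNO {
        0
    } else {
        blkno + 1
    }
}

/// Collect every members-SLRU block touched by `nmembers` (>= 1) members starting
/// at offset `moff`; the last block is included, as in the `loop` that replaces the
/// old `first_mbr_blkno..=last_mbr_blkno` range.
fn member_blocks(moff: u32, nmembers: u32) -> Vec<u32> {
    let first = moff / MULTIXACT_MEMBERS_PER_PAGE;
    let last = (moff + nmembers - 1) / MULTIXACT_MEMBERS_PER_PAGE;
    let mut blocks = Vec::new();
    let mut blkno = first;
    loop {
        blocks.push(blkno);
        if blkno == last {
            break;
        }
        blkno = next_member_block(blkno);
    }
    blocks
}

fn main() {
    // A member group spanning two pages touches two consecutive blocks.
    assert_eq!(member_blocks(0, 2 * MULTIXACT_MEMBERS_PER_PAGE), vec![0, 1]);
    // The block number wraps around after the last valid members block.
    assert_eq!(next_member_block(MAX_MBR_BLKNO), 0);
    println!("members SLRU spans blocks 0..={}", MAX_MBR_BLKNO);
}
```

The offsets SLRU keeps the plain ascending ranges because, as the comment in the patch notes, only the members SLRU can be filled across almost its whole range at once.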