diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index ad0f086332..c5ccb5a40f 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -38,7 +38,7 @@ use crate::relish::*; use crate::remote_storage::{schedule_timeline_checkpoint_upload, schedule_timeline_download}; use crate::repository::{ BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncState, - TimelineWriter, WALRecord, + TimelineWriter, ZenithWalRecord, }; use crate::tenant_mgr; use crate::walreceiver; @@ -1981,7 +1981,7 @@ impl LayeredTimeline { assert!(data.page_img.is_none()); if let Some((first_rec_lsn, first_rec)) = data.records.first() { assert!(&cached_lsn < first_rec_lsn); - assert!(!first_rec.will_init); + assert!(!first_rec.will_init()); } data.page_img = Some(cached_img); break; @@ -2028,7 +2028,7 @@ impl LayeredTimeline { // // If we don't have a base image, then the oldest WAL record better initialize // the page - if data.page_img.is_none() && !data.records.first().unwrap().1.will_init { + if data.page_img.is_none() && !data.records.first().unwrap().1.will_init() { // FIXME: this ought to be an error? warn!( "Base image for page {}/{} at {} not found, but got {} WAL records", @@ -2125,8 +2125,8 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { &self, lsn: Lsn, rel: RelishTag, - rel_blknum: BlockNumber, - rec: WALRecord, + rel_blknum: u32, + rec: ZenithWalRecord, ) -> Result<()> { if !rel.is_blocky() && rel_blknum != 0 { bail!( diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 0a3b291b15..32393b23af 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -257,7 +257,7 @@ impl Layer for DeltaLayer { break; } PageVersion::Wal(rec) => { - let will_init = rec.will_init; + let will_init = rec.will_init(); reconstruct_data.records.push((*pv_lsn, rec)); if will_init { // This WAL record initializes the page, so no need to go further back @@ -373,12 +373,12 @@ impl Layer for DeltaLayer { write!(&mut desc, " img {} bytes", img.len())?; } PageVersion::Wal(rec) => { - let wal_desc = walrecord::describe_wal_record(&rec.rec); + let wal_desc = walrecord::describe_wal_record(&rec); write!( &mut desc, " rec {} bytes will_init: {} {}", - rec.rec.len(), - rec.will_init, + blob_range.size, + rec.will_init(), wal_desc )?; } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 68f98735c2..6242f0a361 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -15,7 +15,7 @@ use crate::layered_repository::storage_layer::{ }; use crate::layered_repository::LayeredTimeline; use crate::layered_repository::ZERO_PAGE; -use crate::repository::WALRecord; +use crate::repository::ZenithWalRecord; use crate::{ZTenantId, ZTimelineId}; use anyhow::{ensure, Result}; use bytes::Bytes; @@ -187,7 +187,7 @@ impl Layer for InMemoryLayer { } PageVersion::Wal(rec) => { reconstruct_data.records.push((*entry_lsn, rec.clone())); - if rec.will_init { + if rec.will_init() { // This WAL record initializes the page, so no need to go further back need_image = false; break; @@ -369,7 +369,12 @@ impl InMemoryLayer { // Write operations /// Remember new page version, as a WAL record over previous version - pub fn put_wal_record(&self, lsn: Lsn, blknum: SegmentBlk, rec: WALRecord) -> Result { + 
pub fn put_wal_record( + &self, + lsn: Lsn, + blknum: SegmentBlk, + rec: ZenithWalRecord, + ) -> Result { self.put_page_version(blknum, lsn, PageVersion::Wal(rec)) } diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index dab650d2ab..99fdaa6845 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -3,7 +3,7 @@ //! use crate::relish::RelishTag; -use crate::repository::{BlockNumber, WALRecord}; +use crate::repository::{BlockNumber, ZenithWalRecord}; use crate::{ZTenantId, ZTimelineId}; use anyhow::Result; use bytes::Bytes; @@ -67,7 +67,7 @@ impl SegmentTag { #[derive(Debug, Clone, Serialize, Deserialize)] pub enum PageVersion { Page(Bytes), - Wal(WALRecord), + Wal(ZenithWalRecord), } /// @@ -78,7 +78,7 @@ pub enum PageVersion { /// 'records' contains the records to apply over the base image. /// pub struct PageReconstructData { - pub records: Vec<(Lsn, WALRecord)>, + pub records: Vec<(Lsn, ZenithWalRecord)>, pub page_img: Option, } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 4bed6449ed..8ddc2b4952 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,7 +1,9 @@ use crate::relish::*; +use crate::walrecord::MultiXactMember; use crate::CheckpointConfig; use anyhow::Result; use bytes::Bytes; +use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::ops::{AddAssign, Deref}; @@ -263,7 +265,7 @@ pub trait TimelineWriter: Deref { lsn: Lsn, tag: RelishTag, blknum: BlockNumber, - rec: WALRecord, + rec: ZenithWalRecord, ) -> Result<()>; /// Like put_wal_record, but with ready-made image of the page. @@ -288,14 +290,42 @@ pub trait TimelineWriter: Deref { fn advance_last_record_lsn(&self, lsn: Lsn); } +/// Each update to a page is represented by a ZenithWalRecord. It can be a wrapper +/// around a PostgreSQL WAL record, or a custom zenith-specific "record". #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct WALRecord { - pub will_init: bool, - pub rec: Bytes, - // Remember the offset of main_data in rec, - // so that we don't have to parse the record again. - // If record has no main_data, this offset equals rec.len(). - pub main_data_offset: u32, +pub enum ZenithWalRecord { + /// Native PostgreSQL WAL record + Postgres { will_init: bool, rec: Bytes }, + + /// Set bits in heap visibility map. (heap blkno, flag bits to clear) + ClearVisibilityMapFlags { heap_blkno: u32, flags: u8 }, + /// Mark transaction IDs as committed on a CLOG page + ClogSetCommitted { xids: Vec }, + /// Mark transaction IDs as aborted on a CLOG page + ClogSetAborted { xids: Vec }, + /// Extend multixact offsets SLRU + MultixactOffsetCreate { + mid: MultiXactId, + moff: MultiXactOffset, + }, + /// Extend multixact members SLRU. + MultixactMembersCreate { + moff: MultiXactOffset, + members: Vec, + }, +} + +impl ZenithWalRecord { + /// Does replaying this WAL record initialize the page from scratch, or does + /// it need to be applied over the previous image of the page? 
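
// Editor's sketch (not part of the patch): how will_init() is used during page
// reconstruction. The layer code earlier in this diff walks page versions from
// newest to oldest, collects the WAL records to replay, and stops as soon as it
// sees a record that initializes the page, because older history is then
// irrelevant. A minimal, self-contained version of that loop with a stand-in
// record type (the real code uses ZenithWalRecord and PageReconstructData):

#[derive(Clone)]
struct DemoRecord {
    will_init: bool,
}

/// Given page versions ordered oldest -> newest, return the records that must
/// be replayed (oldest first) and whether a base image is still required.
fn records_to_replay(versions: &[(u64, DemoRecord)]) -> (Vec<(u64, DemoRecord)>, bool) {
    let mut records = Vec::new();
    let mut need_base_image = true;
    for (lsn, rec) in versions.iter().rev() {
        records.push((*lsn, rec.clone()));
        if rec.will_init {
            // This record rebuilds the page from scratch; no need to look further back.
            need_base_image = false;
            break;
        }
    }
    records.reverse();
    (records, need_base_image)
}
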
+ pub fn will_init(&self) -> bool { + match self { + ZenithWalRecord::Postgres { will_init, rec: _ } => *will_init, + + // None of the special zenith record types currently initialize the page + _ => false, + } + } } #[cfg(test)] @@ -378,7 +408,7 @@ pub mod repo_harness { blknum: BlockNumber, lsn: Lsn, base_img: Option, - records: Vec<(Lsn, WALRecord)>, + records: Vec<(Lsn, ZenithWalRecord)>, ) -> Result { let s = format!( "redo for {} blk {} to get to {}, with {} and {} records", diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 0d2b04ef80..19228ae508 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -34,12 +34,10 @@ use crate::repository::*; use crate::walrecord::*; use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment; use postgres_ffi::xlog_utils::*; +use postgres_ffi::TransactionId; use postgres_ffi::{pg_constants, CheckPoint}; use zenith_utils::lsn::Lsn; -const MAX_MBR_BLKNO: u32 = - pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); pub struct WalIngest { @@ -74,66 +72,23 @@ impl WalIngest { recdata: Bytes, lsn: Lsn, ) -> Result<()> { - let decoded = decode_wal_record(recdata.clone()); + let mut decoded = decode_wal_record(recdata); + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); assert!(!self.checkpoint_modified); if self.checkpoint.update_next_xid(decoded.xl_xid) { self.checkpoint_modified = true; } - // Iterate through all the blocks that the record modifies, and - // "put" a separate copy of the record for each block. - for blk in decoded.blocks.iter() { - let tag = RelishTag::Relation(RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, - }); - - // - // Instead of storing full-page-image WAL record, - // it is better to store extracted image: we can skip wal-redo - // in this case. Also some FPI records may contain multiple (up to 32) pages, - // so them have to be copied multiple times. - // - if blk.apply_image - && blk.has_image - && decoded.xl_rmid == pg_constants::RM_XLOG_ID - && (decoded.xl_info == pg_constants::XLOG_FPI - || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) - // compression of WAL is not yet supported: fall back to storing the original WAL record - && (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 - { - // Extract page image from FPI record - let img_len = blk.bimg_len as usize; - let img_offs = blk.bimg_offset as usize; - let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); - image.extend_from_slice(&recdata[img_offs..img_offs + img_len]); - - if blk.hole_length != 0 { - let tail = image.split_off(blk.hole_offset as usize); - image.resize(image.len() + blk.hole_length as usize, 0u8); - image.unsplit(tail); - } - image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); - image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); - assert_eq!(image.len(), pg_constants::BLCKSZ as usize); - timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?; - } else { - let rec = WALRecord { - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; - } + // Heap AM records need some special handling, because they modify VM pages + // without registering them with the standard mechanism. 
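
// Editor's sketch (not part of the patch): the VM updates emitted below are
// keyed on the visibility-map page that covers the modified heap block. Each
// 8 KB VM page tracks HEAPBLOCKS_PER_PAGE heap blocks (2 bits per block), so
// the record for heap block N goes to VM page N / HEAPBLOCKS_PER_PAGE of the
// VISIBILITYMAP_FORKNUM fork. Spelled out with the constant inlined:

/// Heap blocks tracked per VM page: (BLCKSZ - page header) * 8 bits / 2 bits per block.
const HEAPBLOCKS_PER_PAGE: u32 = (8192 - 24) * 8 / 2; // = 32_672

/// VM page that holds the all-visible / all-frozen bits of `heap_blkno`.
fn vm_page_for_heap_block(heap_blkno: u32) -> u32 {
    heap_blkno / HEAPBLOCKS_PER_PAGE
}

// e.g. heap blocks 0..=32_671 map to VM page 0, heap block 32_672 to VM page 1.
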
+ if decoded.xl_rmid == pg_constants::RM_HEAP_ID + || decoded.xl_rmid == pg_constants::RM_HEAP2_ID + { + self.ingest_heapam_record(&mut buf, timeline, lsn, &mut decoded)?; } - - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - - // Handle a few special record types + // Handle other special record types if decoded.xl_rmid == pg_constants::RM_SMGR_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE @@ -202,13 +157,23 @@ impl WalIngest { if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record(timeline, lsn, &parsed_xact, &decoded)?; + self.ingest_xact_record( + timeline, + lsn, + &parsed_xact, + info == pg_constants::XLOG_XACT_COMMIT, + )?; } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED || info == pg_constants::XLOG_XACT_ABORT_PREPARED { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record(timeline, lsn, &parsed_xact, &decoded)?; + self.ingest_xact_record( + timeline, + lsn, + &parsed_xact, + info == pg_constants::XLOG_XACT_COMMIT_PREPARED, + )?; // Remove twophase file. see RemoveTwoPhaseFile() in postgres code trace!( "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", @@ -223,9 +188,6 @@ impl WalIngest { lsn, )?; } else if info == pg_constants::XLOG_XACT_PREPARE { - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - timeline.put_page_image( RelishTag::TwoPhase { xid: decoded.xl_xid, @@ -266,7 +228,7 @@ impl WalIngest { )?; } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); - self.ingest_multixact_create_record(timeline, lsn, &xlrec, &decoded)?; + self.ingest_multixact_create_record(timeline, lsn, &xlrec)?; } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); self.ingest_multixact_truncate_record(timeline, lsn, &xlrec)?; @@ -286,8 +248,6 @@ impl WalIngest { || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN { let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT]; - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); buf.copy_to_slice(&mut checkpoint_bytes); let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes).unwrap(); trace!( @@ -307,6 +267,12 @@ impl WalIngest { } } + // Iterate through all the blocks that the record modifies, and + // "put" a separate copy of the record for each block. + for blk in decoded.blocks.iter() { + self.ingest_decoded_block(timeline, lsn, &decoded, blk)?; + } + // If checkpoint data was updated, store the new version in the repository if self.checkpoint_modified { let new_checkpoint_bytes = self.checkpoint.encode(); @@ -322,6 +288,195 @@ impl WalIngest { Ok(()) } + fn ingest_decoded_block( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + decoded: &DecodedWALRecord, + blk: &DecodedBkpBlock, + ) -> Result<()> { + let tag = RelishTag::Relation(RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, + }); + + // + // Instead of storing full-page-image WAL record, + // it is better to store extracted image: we can skip wal-redo + // in this case. Also some FPI records may contain multiple (up to 32) pages, + // so them have to be copied multiple times. 
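
// Editor's sketch (not part of the patch): what the "hole" handling just below
// does. A full-page image in WAL usually omits the unused gap between pd_lower
// and pd_upper (hole_offset / hole_length), so rebuilding the 8192-byte page
// means re-inserting that many zero bytes at hole_offset. The same idea with a
// plain Vec<u8> instead of BytesMut:

const BLCKSZ: usize = 8192;

fn restore_fpi(img: &[u8], hole_offset: usize, hole_length: usize) -> Vec<u8> {
    let mut page = Vec::with_capacity(BLCKSZ);
    page.extend_from_slice(&img[..hole_offset]);            // bytes before the hole
    page.extend(std::iter::repeat(0u8).take(hole_length));  // zero-filled hole
    page.extend_from_slice(&img[hole_offset..]);            // bytes after the hole
    assert_eq!(page.len(), BLCKSZ);
    page
}

// The real code additionally stamps the record's LSN into the first 8 bytes of
// the page header before storing the image.
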
+ // + if blk.apply_image + && blk.has_image + && decoded.xl_rmid == pg_constants::RM_XLOG_ID + && (decoded.xl_info == pg_constants::XLOG_FPI + || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) + // compression of WAL is not yet supported: fall back to storing the original WAL record + && (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 + { + // Extract page image from FPI record + let img_len = blk.bimg_len as usize; + let img_offs = blk.bimg_offset as usize; + let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); + image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); + + if blk.hole_length != 0 { + let tail = image.split_off(blk.hole_offset as usize); + image.resize(image.len() + blk.hole_length as usize, 0u8); + image.unsplit(tail); + } + image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); + image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); + assert_eq!(image.len(), pg_constants::BLCKSZ as usize); + timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?; + } else { + let rec = ZenithWalRecord::Postgres { + will_init: blk.will_init || blk.apply_image, + rec: decoded.record.clone(), + }; + timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; + } + Ok(()) + } + + fn ingest_heapam_record( + &mut self, + buf: &mut Bytes, + timeline: &dyn TimelineWriter, + lsn: Lsn, + decoded: &mut DecodedWALRecord, + ) -> Result<()> { + // Handle VM bit updates that are implicitly part of heap records. + if decoded.xl_rmid == pg_constants::RM_HEAP_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + if info == pg_constants::XLOG_HEAP_INSERT { + let xlrec = XlHeapInsert::decode(buf); + assert_eq!(0, buf.remaining()); + if (xlrec.flags + & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED + | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) + != 0 + { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }), + decoded.blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[0].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + } else if info == pg_constants::XLOG_HEAP_DELETE { + let xlrec = XlHeapDelete::decode(buf); + assert_eq!(0, buf.remaining()); + if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }), + decoded.blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[0].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + } else if info == pg_constants::XLOG_HEAP_UPDATE + || info == pg_constants::XLOG_HEAP_HOT_UPDATE + { + let xlrec = XlHeapUpdate::decode(buf); + // the size of tuple data is inferred from the size of the record. + // we can't validate the remaining number of bytes without parsing + // the tuple data. 
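
// Editor's sketch (not part of the patch): the "clear the VM bits for heap
// block N of this relation" put is repeated for each heap opcode in this
// function (insert, delete, both update paths, multi-insert). A hypothetical
// helper showing the common shape of those calls; the patch inlines this at
// every call site instead:

fn put_clear_vm_bits(
    timeline: &dyn TimelineWriter,
    lsn: Lsn,
    blk: &DecodedBkpBlock, // registered heap block whose VM bits were cleared
) -> Result<()> {
    timeline.put_wal_record(
        lsn,
        RelishTag::Relation(RelTag {
            forknum: pg_constants::VISIBILITYMAP_FORKNUM,
            spcnode: blk.rnode_spcnode,
            dbnode: blk.rnode_dbnode,
            relnode: blk.rnode_relnode,
        }),
        blk.blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32,
        ZenithWalRecord::ClearVisibilityMapFlags {
            heap_blkno: blk.blkno,
            flags: pg_constants::VISIBILITYMAP_VALID_BITS,
        },
    )
}
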
+ if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }), + decoded.blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[0].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 + && decoded.blocks.len() > 1 + { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[1].rnode_spcnode, + dbnode: decoded.blocks[1].rnode_dbnode, + relnode: decoded.blocks[1].rnode_relnode, + }), + decoded.blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[1].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + } + } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { + let xlrec = XlHeapMultiInsert::decode(buf); + + let offset_array_len = if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { + // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set + 0 + } else { + std::mem::size_of::() * xlrec.ntuples as usize + }; + assert_eq!(offset_array_len, buf.remaining()); + + // FIXME: why also ALL_FROZEN_SET? + if (xlrec.flags + & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED + | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) + != 0 + { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }), + decoded.blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[0].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + } + } + + // FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED? + + Ok(()) + } + /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. 
fn ingest_xlog_dbase_create( &mut self, @@ -476,34 +631,20 @@ impl WalIngest { timeline: &dyn TimelineWriter, lsn: Lsn, parsed: &XlXactParsedRecord, - decoded: &DecodedWALRecord, + is_commit: bool, ) -> Result<()> { - // Record update of CLOG page + // Record update of CLOG pages let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; - - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - let rec = WALRecord { - will_init: false, - rec: decoded.record.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - timeline.put_wal_record( - lsn, - RelishTag::Slru { - slru: SlruKind::Clog, - segno, - }, - rpageno, - rec.clone(), - )?; + let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + let mut page_xids: Vec = vec![parsed.xid]; for subxact in &parsed.subxacts { let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; if subxact_pageno != pageno { - pageno = subxact_pageno; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + // This subxact goes to different page. Write the record + // for all the XIDs on the previous page, and continue + // accumulating XIDs on this new page. timeline.put_wal_record( lsn, RelishTag::Slru { @@ -511,10 +652,33 @@ impl WalIngest { segno, }, rpageno, - rec.clone(), + if is_commit { + ZenithWalRecord::ClogSetCommitted { xids: page_xids } + } else { + ZenithWalRecord::ClogSetAborted { xids: page_xids } + }, )?; + page_xids = Vec::new(); } + pageno = subxact_pageno; + segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + page_xids.push(*subxact); } + timeline.put_wal_record( + lsn, + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + }, + rpageno, + if is_commit { + ZenithWalRecord::ClogSetCommitted { xids: page_xids } + } else { + ZenithWalRecord::ClogSetAborted { xids: page_xids } + }, + )?; + for xnode in &parsed.xnodes { for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { let rel = RelTag { @@ -596,16 +760,12 @@ impl WalIngest { timeline: &dyn TimelineWriter, lsn: Lsn, xlrec: &XlMultiXactCreate, - decoded: &DecodedWALRecord, ) -> Result<()> { - let rec = WALRecord { - will_init: false, - rec: decoded.record.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; + // Create WAL record for updating the multixact-offsets page let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_wal_record( lsn, RelishTag::Slru { @@ -613,40 +773,51 @@ impl WalIngest { segno, }, rpageno, - rec.clone(), + ZenithWalRecord::MultixactOffsetCreate { + mid: xlrec.mid, + moff: xlrec.moff, + }, )?; - let first_mbr_pageno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let last_mbr_pageno = - (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - // The members SLRU can, in contrast to the offsets one, be filled to almost - // the full range at once. So we need to handle wraparound. 
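
// Editor's sketch (not part of the patch): the new loop below walks the member
// array page by page instead of iterating raw page numbers, and advances the
// member offset with wrapping_add because the members SLRU can wrap around
// within a single XLOG_MULTIXACT_CREATE_ID record. The same chunking, written
// as a self-contained function (1636 members per page follows from PostgreSQL's
// 20-byte member groups on an 8 KB page):

const MULTIXACT_MEMBERS_PER_PAGE: u32 = 1636;

/// Split `n_members` members starting at offset `moff` into per-page chunks of
/// (starting member offset, number of members that land on that page).
fn split_members_by_page(moff: u32, n_members: u32) -> Vec<(u32, u32)> {
    let mut chunks = Vec::new();
    let mut offset = moff;
    let mut remaining = n_members;
    while remaining > 0 {
        // how many member slots are left on the page that `offset` falls on?
        let page_remain = MULTIXACT_MEMBERS_PER_PAGE - offset % MULTIXACT_MEMBERS_PER_PAGE;
        let n_this_page = remaining.min(page_remain);
        chunks.push((offset, n_this_page));
        remaining -= n_this_page;
        // member offsets may wrap around the 32-bit space, even mid-record
        offset = offset.wrapping_add(n_this_page);
    }
    chunks
}
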
- let mut pageno = first_mbr_pageno; + // Create WAL records for the update of each affected multixact-members page + let mut members = xlrec.members.iter(); + let mut offset = xlrec.moff; loop { - // Update members page - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + + // How many members fit on this page? + let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32 + - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + + let mut this_page_members: Vec = Vec::new(); + for _ in 0..page_remain { + if let Some(m) = members.next() { + this_page_members.push(m.clone()); + } else { + break; + } + } + if this_page_members.is_empty() { + // all done + break; + } + let n_this_page = this_page_members.len(); + timeline.put_wal_record( lsn, RelishTag::Slru { slru: SlruKind::MultiXactMembers, - segno, + segno: pageno / pg_constants::SLRU_PAGES_PER_SEGMENT, + }, + pageno % pg_constants::SLRU_PAGES_PER_SEGMENT, + ZenithWalRecord::MultixactMembersCreate { + moff: offset, + members: this_page_members, }, - rpageno, - rec.clone(), )?; - if pageno == last_mbr_pageno { - // last block inclusive - break; - } - - // handle wraparound - if pageno == MAX_MBR_BLKNO { - pageno = 0; - } else { - pageno += 1; - } + // Note: The multixact members can wrap around, even within one WAL record. + offset = offset.wrapping_add(n_this_page as u32); } if xlrec.mid >= self.checkpoint.nextMulti { self.checkpoint.nextMulti = xlrec.mid + 1; diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 26dbbf04ed..378a015d4a 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -7,8 +7,11 @@ use postgres_ffi::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD}; use postgres_ffi::XLogRecord; use postgres_ffi::{BlockNumber, OffsetNumber}; use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; +use serde::{Deserialize, Serialize}; use tracing::*; +use crate::repository::ZenithWalRecord; + /// DecodedBkpBlock represents per-page data contained in a WAL record. #[derive(Default)] pub struct DecodedBkpBlock { @@ -351,7 +354,7 @@ impl XlClogTruncate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct MultiXactMember { pub xid: TransactionId, pub status: MultiXactStatus, @@ -676,97 +679,6 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { assert_eq!(buf.remaining(), main_data_len as usize); } - // 5. Handle a few special record types that modify blocks without registering - // them with the standard mechanism. 
- if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK; - let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; - if info == pg_constants::XLOG_HEAP_INSERT { - let xlrec = XlHeapInsert::decode(&mut buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags - & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED - | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) - != 0 - { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); - } - } else if info == pg_constants::XLOG_HEAP_DELETE { - let xlrec = XlHeapDelete::decode(&mut buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); - } - } else if info == pg_constants::XLOG_HEAP_UPDATE - || info == pg_constants::XLOG_HEAP_HOT_UPDATE - { - let xlrec = XlHeapUpdate::decode(&mut buf); - // the size of tuple data is inferred from the size of the record. - // we can't validate the remaining number of bytes without parsing - // the tuple data. - if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); - } - if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 - && blocks.len() > 1 - { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; - blk.rnode_spcnode = blocks[1].rnode_spcnode; - blk.rnode_dbnode = blocks[1].rnode_dbnode; - blk.rnode_relnode = blocks[1].rnode_relnode; - blocks.push(blk); - } - } - } else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK; - if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { - let xlrec = XlHeapMultiInsert::decode(&mut buf); - - let offset_array_len = if xlogrec.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { - // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set - 0 - } else { - std::mem::size_of::() * xlrec.ntuples as usize - }; - assert_eq!(offset_array_len, buf.remaining()); - - if (xlrec.flags - & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED - | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) - != 0 - { - let mut blk = DecodedBkpBlock::new(); - let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); - } - } - } - DecodedWALRecord { xl_xid: xlogrec.xl_xid, xl_info: xlogrec.xl_info, @@ -781,7 +693,20 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { /// Build a human-readable string to describe a WAL record /// /// For debugging purposes -pub fn 
describe_wal_record(record: &Bytes) -> String { +pub fn describe_wal_record(rec: &ZenithWalRecord) -> String { + match rec { + ZenithWalRecord::Postgres { will_init, rec } => { + format!( + "will_init: {}, {}", + will_init, + describe_postgres_wal_record(rec) + ) + } + _ => format!("{:?}", rec), + } +} + +fn describe_postgres_wal_record(record: &Bytes) -> String { // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this. // Maybe use the postgres wal redo process, the same used for replaying WAL records? // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index a8e25d3478..5583f1e651 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -19,7 +19,7 @@ //! process, he cannot escape out of it. //! use byteorder::{ByteOrder, LittleEndian}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; use nix::poll::*; @@ -43,15 +43,12 @@ use zenith_utils::zid::ZTenantId; use crate::config::PageServerConf; use crate::relish::*; -use crate::repository::WALRecord; -use crate::walrecord::XlMultiXactCreate; -use crate::walrecord::XlXactParsedRecord; +use crate::repository::ZenithWalRecord; use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift; use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset; use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset; use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; -use postgres_ffi::XLogRecord; /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. @@ -82,7 +79,7 @@ pub trait WalRedoManager: Send + Sync { blknum: u32, lsn: Lsn, base_img: Option, - records: Vec<(Lsn, WALRecord)>, + records: Vec<(Lsn, ZenithWalRecord)>, ) -> Result; } @@ -99,7 +96,7 @@ impl crate::walredo::WalRedoManager for DummyRedoManager { _blknum: u32, _lsn: Lsn, _base_img: Option, - _records: Vec<(Lsn, WALRecord)>, + _records: Vec<(Lsn, ZenithWalRecord)>, ) -> Result { Err(WalRedoError::InvalidState) } @@ -142,23 +139,43 @@ pub struct PostgresRedoManager { process: Mutex>, } -#[derive(Debug)] -struct WalRedoRequest { - rel: RelishTag, - blknum: u32, - lsn: Lsn, - - base_img: Option, - records: Vec<(Lsn, WALRecord)>, -} - -impl WalRedoRequest { - // Can this request be served by zenith redo funcitons - // or we need to pass it to wal-redo postgres process? - fn can_apply_in_zenith(&self) -> bool { - !matches!(self.rel, RelishTag::Relation(_)) +/// Can this request be served by zenith redo funcitons +/// or we need to pass it to wal-redo postgres process? +fn can_apply_in_zenith(rec: &ZenithWalRecord) -> bool { + // Currently, we don't have bespoken Rust code to replay any + // Postgres WAL records. But everything else is handled in zenith. 
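
// Editor's sketch (not part of the patch): request_redo() below uses this
// predicate to split a page's record chain into maximal runs that the same
// engine can replay, threading the page image produced by one batch into the
// next. The partitioning step in isolation, over a stand-in engine tag:

#[derive(Clone, Copy, PartialEq)]
enum Engine {
    Zenith,
    Postgres,
}

/// Split a record chain into (engine, run length) batches of consecutive
/// records that are replayed by the same engine.
fn split_into_batches(record_engines: &[Engine]) -> Vec<(Engine, usize)> {
    let mut batches: Vec<(Engine, usize)> = Vec::new();
    for &e in record_engines {
        let extend_last = matches!(batches.last(), Some((prev, _)) if *prev == e);
        if extend_last {
            batches.last_mut().unwrap().1 += 1;
        } else {
            batches.push((e, 1));
        }
    }
    batches
}

// e.g. [Zenith, Zenith, Postgres, Zenith] becomes three batches; the image
// produced by the first zenith batch is the base image for the postgres batch,
// and so on, mirroring how `img` is carried through request_redo().
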
+ #[allow(clippy::match_like_matches_macro)] + match rec { + ZenithWalRecord::Postgres { + will_init: _, + rec: _, + } => false, + _ => true, } } + +fn check_forknum(rel: &RelishTag, expected_forknum: u8) -> bool { + if let RelishTag::Relation(RelTag { + forknum, + spcnode: _, + dbnode: _, + relnode: _, + }) = rel + { + *forknum == expected_forknum + } else { + false + } +} + +fn check_slru_segno(rel: &RelishTag, expected_slru: SlruKind, expected_segno: u32) -> bool { + if let RelishTag::Slru { slru, segno } = rel { + *slru == expected_slru && *segno == expected_segno + } else { + false + } +} + /// An error happened in WAL redo #[derive(Debug, thiserror::Error)] pub enum WalRedoError { @@ -187,53 +204,37 @@ impl WalRedoManager for PostgresRedoManager { blknum: u32, lsn: Lsn, base_img: Option, - records: Vec<(Lsn, WALRecord)>, + records: Vec<(Lsn, ZenithWalRecord)>, ) -> Result { - let start_time; - let end_time; - - let request = WalRedoRequest { - rel, - blknum, - lsn, - base_img, - records, - }; - - start_time = Instant::now(); - let result; - - if request.can_apply_in_zenith() { - result = self.handle_apply_request_zenith(&request); - - end_time = Instant::now(); - WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64()); - } else { - let mut process_guard = self.process.lock().unwrap(); - let lock_time = Instant::now(); - - // launch the WAL redo process on first use - if process_guard.is_none() { - let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?; - *process_guard = Some(p); - } - let process = process_guard.as_mut().unwrap(); - - result = self.handle_apply_request_postgres(process, &request); - - WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); - end_time = Instant::now(); - WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64()); - - // If something went wrong, don't try to reuse the process. Kill it, and - // next request will launch a new one. 
- if result.is_err() { - let process = process_guard.take().unwrap(); - process.kill(); - } + if records.is_empty() { + error!("invalid WAL redo request with no records"); + return Err(WalRedoError::InvalidRequest); } - result + let mut img: Option = base_img; + let mut batch_zenith = can_apply_in_zenith(&records[0].1); + let mut batch_start = 0; + for i in 1..records.len() { + let rec_zenith = can_apply_in_zenith(&records[i].1); + + if rec_zenith != batch_zenith { + let result = if batch_zenith { + self.apply_batch_zenith(rel, blknum, lsn, img, &records[batch_start..i]) + } else { + self.apply_batch_postgres(rel, blknum, lsn, img, &records[batch_start..i]) + }; + img = Some(result?); + + batch_zenith = rec_zenith; + batch_start = i; + } + } + // last batch + if batch_zenith { + self.apply_batch_zenith(rel, blknum, lsn, img, &records[batch_start..]) + } else { + self.apply_batch_postgres(rel, blknum, lsn, img, &records[batch_start..]) + } } } @@ -253,203 +254,229 @@ impl PostgresRedoManager { /// /// Process one request for WAL redo using wal-redo postgres /// - fn handle_apply_request_postgres( + fn apply_batch_postgres( &self, - process: &mut PostgresRedoProcess, - request: &WalRedoRequest, + rel: RelishTag, + blknum: u32, + lsn: Lsn, + base_img: Option, + records: &[(Lsn, ZenithWalRecord)], ) -> Result { - let blknum = request.blknum; - let lsn = request.lsn; - let base_img = request.base_img.clone(); - let records = &request.records; - let nrecords = records.len(); - - let start = Instant::now(); + let start_time = Instant::now(); let apply_result: Result; - if let RelishTag::Relation(rel) = request.rel { + let mut process_guard = self.process.lock().unwrap(); + let lock_time = Instant::now(); + + // launch the WAL redo process on first use + if process_guard.is_none() { + let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?; + *process_guard = Some(p); + } + let process = process_guard.as_mut().unwrap(); + + WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); + + let result = if let RelishTag::Relation(rel) = rel { // Relational WAL records are applied using wal-redo-postgres let buf_tag = BufferTag { rel, blknum }; apply_result = process.apply_wal_records(buf_tag, base_img, records); - let duration = start.elapsed(); - - debug!( - "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}", - nrecords, - duration.as_micros(), - lsn - ); - apply_result.map_err(WalRedoError::IoError) } else { + error!("unexpected non-relation relish: {:?}", rel); Err(WalRedoError::InvalidRequest) + }; + + let end_time = Instant::now(); + let duration = end_time.duration_since(lock_time); + WAL_REDO_TIME.observe(duration.as_secs_f64()); + debug!( + "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}", + records.len(), + duration.as_micros(), + lsn + ); + + // If something went wrong, don't try to reuse the process. Kill it, and + // next request will launch a new one. + if result.is_err() { + let process = process_guard.take().unwrap(); + process.kill(); } + result } /// - /// Process one request for WAL redo using custom zenith code + /// Process a batch of WAL records using bespoken Zenith code. 
/// - fn handle_apply_request_zenith(&self, request: &WalRedoRequest) -> Result { - let rel = request.rel; - let blknum = request.blknum; - let lsn = request.lsn; - let base_img = request.base_img.clone(); - let records = &request.records; + fn apply_batch_zenith( + &self, + rel: RelishTag, + blknum: u32, + lsn: Lsn, + base_img: Option, + records: &[(Lsn, ZenithWalRecord)], + ) -> Result { + let start_time = Instant::now(); - let nrecords = records.len(); - - let start = Instant::now(); - - let apply_result: Result; - - // Non-relational WAL records are handled here, with custom code that has the - // same effects as the corresponding Postgres WAL redo function. - const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; let mut page = BytesMut::new(); if let Some(fpi) = base_img { // If full-page image is provided, then use it... page.extend_from_slice(&fpi[..]); } else { - // otherwise initialize page with zeros - page.extend_from_slice(&ZERO_PAGE); - } - // Apply all collected WAL records - for (_lsn, record) in records { - let mut buf = record.rec.clone(); - - WAL_REDO_RECORD_COUNTER.inc(); - - // 1. Parse XLogRecord struct - // FIXME: refactor to avoid code duplication. - let xlogrec = XLogRecord::from_bytes(&mut buf); - - //move to main data - // TODO probably, we should store some records in our special format - // to avoid this weird parsing on replay - let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; - if buf.remaining() > skip { - buf.advance(skip); - } - - if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { - // Transaction manager stuff - let rec_segno = match rel { - RelishTag::Slru { slru, segno } => { - assert!( - slru == SlruKind::Clog, - "Not valid XACT relish tag {:?}", - rel - ); - segno - } - _ => panic!("Not valid XACT relish tag {:?}", rel), - }; - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info); - if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT - || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED - { - // Iterate the main XID, followed by all the subxids. - for xid in std::iter::once(&parsed_xact.xid).chain(parsed_xact.subxacts.iter()) - { - let pageno = *xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - // only update xids on the requested page - if rec_segno == segno && blknum == rpageno { - transaction_id_set_status( - *xid, - pg_constants::TRANSACTION_STATUS_COMMITTED, - &mut page, - ); - } - } - } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT - || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED - { - // Iterate the main XID, followed by all the subxids. 
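
// Editor's sketch (not part of the patch): the removed code around here had to
// re-derive, at redo time, which of the commit/abort XIDs fall on the CLOG page
// being reconstructed. After this patch that grouping happens once at ingest
// time: ingest_xact_record() in pageserver/src/walingest.rs batches the main
// XID and subxids per CLOG page and emits one ClogSetCommitted / ClogSetAborted
// record per page. The grouping in isolation, assuming the subxids are ordered
// so that XIDs on the same page are adjacent (as they are in the xact record):

const CLOG_XACTS_PER_PAGE: u32 = 8192 * 4; // BLCKSZ * CLOG_XACTS_PER_BYTE

/// Group the main xid plus subxids into (clog page number, xids on that page).
fn group_xids_by_clog_page(xid: u32, subxacts: &[u32]) -> Vec<(u32, Vec<u32>)> {
    let mut batches: Vec<(u32, Vec<u32>)> = Vec::new();
    for &x in std::iter::once(&xid).chain(subxacts.iter()) {
        let pageno = x / CLOG_XACTS_PER_PAGE;
        let start_new_batch = match batches.last() {
            Some((p, _)) => *p != pageno,
            None => true,
        };
        if start_new_batch {
            batches.push((pageno, Vec::new()));
        }
        batches.last_mut().unwrap().1.push(x);
    }
    batches
}
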
- for xid in std::iter::once(&parsed_xact.xid).chain(parsed_xact.subxacts.iter()) - { - let pageno = *xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - // only update xids on the requested page - if rec_segno == segno && blknum == rpageno { - transaction_id_set_status( - *xid, - pg_constants::TRANSACTION_STATUS_ABORTED, - &mut page, - ); - } - } - } - } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { - // Multixact operations - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - if let RelishTag::Slru { - slru, - segno: rec_segno, - } = rel - { - if slru == SlruKind::MultiXactMembers { - for i in 0..xlrec.nmembers { - let offset = xlrec.moff + i; - let pageno = - offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - if segno == rec_segno && rpageno == blknum { - // update only target block - let memberoff = mx_offset_to_member_offset(offset); - let flagsoff = mx_offset_to_flags_offset(offset); - let bshift = mx_offset_to_flags_bitshift(offset); - let mut flagsval = - LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); - flagsval &= - !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) - << bshift); - flagsval |= xlrec.members[i as usize].status << bshift; - LittleEndian::write_u32( - &mut page[flagsoff..flagsoff + 4], - flagsval, - ); - LittleEndian::write_u32( - &mut page[memberoff..memberoff + 4], - xlrec.members[i as usize].xid, - ); - } - } - } else { - // Multixact offsets SLRU - let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 - * 4) as usize; - LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); - } - } else { - panic!(); - } - } else { - panic!(); - } - } + // All the current WAL record types that we can handle require a base image. + error!("invalid zenith WAL redo request with no base image"); + return Err(WalRedoError::InvalidRequest); } - apply_result = Ok::(page.freeze()); - - let duration = start.elapsed(); + // Apply all the WAL records in the batch + for (record_lsn, record) in records.iter() { + self.apply_record_zenith(rel, blknum, &mut page, *record_lsn, record)?; + } + // Success! + let end_time = Instant::now(); + let duration = end_time.duration_since(start_time); + WAL_REDO_TIME.observe(duration.as_secs_f64()); debug!( "zenith applied {} WAL records in {} ms to reconstruct page image at LSN {}", - nrecords, - duration.as_millis(), + records.len(), + duration.as_micros(), lsn ); - apply_result.map_err(WalRedoError::IoError) + Ok(page.freeze()) + } + + fn apply_record_zenith( + &self, + rel: RelishTag, + blknum: u32, + page: &mut BytesMut, + _record_lsn: Lsn, + record: &ZenithWalRecord, + ) -> Result<(), WalRedoError> { + match record { + ZenithWalRecord::Postgres { + will_init: _, + rec: _, + } => panic!("tried to pass postgres wal record to zenith WAL redo"), + ZenithWalRecord::ClearVisibilityMapFlags { heap_blkno, flags } => { + // Calculate the VM block and offset that corresponds to the heap block. + let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(*heap_blkno); + let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(*heap_blkno); + let map_offset = pg_constants::HEAPBLK_TO_OFFSET(*heap_blkno); + + // Check that we're modifying the correct VM block. 
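
// Editor's note with a worked example (not part of the patch): the map
// arithmetic in this ClearVisibilityMapFlags branch, for heap_blkno = 100_000
// with 8 KB pages (32_672 heap blocks per VM page, 4 per map byte, 2 bits each):
//
//   HEAPBLK_TO_MAPBLOCK(100_000) = 100_000 / 32_672        = 3    VM page
//   HEAPBLK_TO_MAPBYTE(100_000)  = (100_000 % 32_672) / 4  = 496  byte in the map
//   HEAPBLK_TO_OFFSET(100_000)   = (100_000 % 4) * 2       = 0    bit shift
//
// so flags = VISIBILITYMAP_VALID_BITS (0b11) clears the two low-order bits of
// map byte 496 on VM page 3. The map contents start right after the MAXALIGN'ed
// page header, which is why `map` skips MAXALIGN_SIZE_OF_PAGE_HEADER_DATA bytes
// before indexing with map_byte.
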
+ assert!( + check_forknum(&rel, pg_constants::VISIBILITYMAP_FORKNUM), + "ClearVisibilityMapFlags record on unexpected rel {:?}", + rel + ); + assert!(map_block == blknum); + + // equivalent to PageGetContents(page) + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + + let mask: u8 = flags << map_offset; + map[map_byte as usize] &= !mask; + } + // Non-relational WAL records are handled here, with custom code that has the + // same effects as the corresponding Postgres WAL redo function. + ZenithWalRecord::ClogSetCommitted { xids } => { + for &xid in xids { + let pageno = xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + // Check that we're modifying the correct CLOG block. + assert!( + check_slru_segno(&rel, SlruKind::Clog, expected_segno), + "ClogSetCommitted record for XID {} with unexpected rel {:?}", + xid, + rel + ); + assert!(blknum == expected_blknum); + + transaction_id_set_status( + xid, + pg_constants::TRANSACTION_STATUS_COMMITTED, + page, + ); + } + } + ZenithWalRecord::ClogSetAborted { xids } => { + for &xid in xids { + let pageno = xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + // Check that we're modifying the correct CLOG block. + assert!( + check_slru_segno(&rel, SlruKind::Clog, expected_segno), + "ClogSetCommitted record for XID {} with unexpected rel {:?}", + xid, + rel + ); + assert!(blknum == expected_blknum); + + transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page); + } + } + ZenithWalRecord::MultixactOffsetCreate { mid, moff } => { + // Compute the block and offset to modify. + // See RecordNewMultiXact in PostgreSQL sources. + let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let offset = (entryno * 4) as usize; + + // Check that we're modifying the correct multixact-offsets block. + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + assert!( + check_slru_segno(&rel, SlruKind::MultiXactOffsets, expected_segno), + "MultiXactOffsetsCreate record for multi-xid {} with unexpected rel {:?}", + mid, + rel + ); + assert!(blknum == expected_blknum); + + LittleEndian::write_u32(&mut page[offset..offset + 4], *moff); + } + ZenithWalRecord::MultixactMembersCreate { moff, members } => { + for (i, member) in members.iter().enumerate() { + let offset = moff + i as u32; + + // Compute the block and offset to modify. + // See RecordNewMultiXact in PostgreSQL sources. + let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + + // Check that we're modifying the correct multixact-members block. 
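
// Editor's note with a worked example (not part of the patch): the layout
// behind the mx_offset_to_* helpers used in this branch. A members page stores
// groups of four members, each group being a 4-byte flags word (8 flag bits per
// member) followed by four 4-byte XIDs, i.e. 20 bytes per group, 409 groups =
// 1636 members per 8 KB page. For member offset 5000 (values follow
// PostgreSQL's multixact.c macros):
//
//   members page     = 5000 / 1636                    = 3
//   group in page    = (5000 / 4) % 409               = 23
//   flags word       starts at byte 23 * 20           = 460
//   flags bit shift  = (5000 % 4) * 8                 = 0
//   member XID slot  at byte 460 + 4 + (5000 % 4) * 4 = 464
//
// The code below patches the flags word and the XID slot at exactly these
// positions for every member carried by the record.
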
+ let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + assert!( + check_slru_segno(&rel, SlruKind::MultiXactMembers, expected_segno), + "MultiXactMembersCreate record at offset {} with unexpected rel {:?}", + moff, + rel + ); + assert!(blknum == expected_blknum); + + let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); + flagsval |= member.status << bshift; + LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval); + LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid); + } + } + } + + Ok(()) } } @@ -556,7 +583,7 @@ impl PostgresRedoProcess { &mut self, tag: BufferTag, base_img: Option, - records: &[(Lsn, WALRecord)], + records: &[(Lsn, ZenithWalRecord)], ) -> Result { // Serialize all the messages to send the WAL redo process first. // @@ -569,7 +596,15 @@ impl PostgresRedoProcess { build_push_page_msg(tag, &img, &mut writebuf); } for (lsn, rec) in records.iter() { - build_apply_record_msg(*lsn, &rec.rec, &mut writebuf); + if let ZenithWalRecord::Postgres { + will_init: _, + rec: postgres_rec, + } = rec + { + build_apply_record_msg(*lsn, postgres_rec, &mut writebuf); + } else { + panic!("tried to pass zenith wal record to postgres WAL redo"); + } } build_get_page_msg(tag, &mut writebuf); WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); diff --git a/postgres_ffi/build.rs b/postgres_ffi/build.rs index ac738c7b0f..3b4b37f9ee 100644 --- a/postgres_ffi/build.rs +++ b/postgres_ffi/build.rs @@ -75,6 +75,7 @@ fn main() { .allowlist_var("XLOG_PAGE_MAGIC") .allowlist_var("PG_CONTROL_FILE_SIZE") .allowlist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC") + .allowlist_type("PageHeaderData") .allowlist_type("DBState") // Because structs are used for serialization, tell bindgen to emit // explicit padding fields. diff --git a/postgres_ffi/pg_control_ffi.h b/postgres_ffi/pg_control_ffi.h index 9877c0de6c..9da3f09b84 100644 --- a/postgres_ffi/pg_control_ffi.h +++ b/postgres_ffi/pg_control_ffi.h @@ -9,5 +9,6 @@ #include "access/xlog_internal.h" #include "storage/block.h" +#include "storage/bufpage.h" #include "storage/off.h" #include "access/multixact.h" diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index d5a4468dc7..923fbe4d5a 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -1,6 +1,8 @@ #![allow(non_upper_case_globals)] #![allow(non_camel_case_types)] #![allow(non_snake_case)] +// bindgen creates some unsafe code with no doc comments. +#![allow(clippy::missing_safety_doc)] // suppress warnings on rust 1.53 due to bindgen unit tests. // https://github.com/rust-lang/rust-bindgen/issues/1651 #![allow(deref_nullptr)] diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 5558b280f0..76f837cefc 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -7,6 +7,8 @@ //! comments on them. //! 
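
// Editor's sketch (not part of the patch): how the CLOG constants in this file
// are used when ZenithWalRecord::ClogSetCommitted / ClogSetAborted are replayed.
// Each XID owns CLOG_BITS_PER_XACT (2) bits of its CLOG page, so setting a
// status is a byte/bit-shift computation followed by a masked update, roughly
// what transaction_id_set_status() in nonrelfile_utils does:

const CLOG_BITS_PER_XACT: u32 = 2;
const CLOG_XACTS_PER_BYTE: u32 = 4;
const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE;
const CLOG_XACT_BITMASK: u8 = (1u8 << CLOG_BITS_PER_XACT) - 1;

/// Set the 2-bit status of `xid` within its CLOG page
/// (e.g. status = TRANSACTION_STATUS_COMMITTED = 0x01).
fn set_clog_status(xid: u32, status: u8, page: &mut [u8]) {
    let byteno = ((xid % CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE) as usize;
    let bshift = ((xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT) as u8;
    page[byteno] = (page[byteno] & !(CLOG_XACT_BITMASK << bshift)) | (status << bshift);
}
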
+use crate::PageHeaderData; + // // From pg_tablespace_d.h // @@ -31,6 +33,14 @@ pub const SMGR_TRUNCATE_FSM: u32 = 0x0004; pub const BLCKSZ: u16 = 8192; pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); +// +// From bufpage.h +// + +// Assumes 8 byte alignment +const SIZEOF_PAGE_HEADER_DATA: usize = std::mem::size_of::(); +pub const MAXALIGN_SIZE_OF_PAGE_HEADER_DATA: usize = (SIZEOF_PAGE_HEADER_DATA + 7) & !7; + // // constants from clog.h // @@ -39,13 +49,6 @@ pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE; pub const CLOG_BITS_PER_XACT: u8 = 2; pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1; -// -// Constants from visbilitymap.h -// -pub const SIZE_OF_PAGE_HEADER: u16 = 24; -pub const BITS_PER_HEAPBLOCK: u16 = 2; -pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; - pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; @@ -53,6 +56,30 @@ pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; pub const CLOG_ZEROPAGE: u8 = 0x00; pub const CLOG_TRUNCATE: u8 = 0x10; +// +// Constants from visibilitymap.h, visibilitymapdefs.h and visibilitymap.c +// +pub const SIZE_OF_PAGE_HEADER: u16 = 24; +pub const BITS_PER_BYTE: u16 = 8; +pub const HEAPBLOCKS_PER_PAGE: u32 = + (BLCKSZ - SIZE_OF_PAGE_HEADER) as u32 * 8 / BITS_PER_HEAPBLOCK as u32; +pub const HEAPBLOCKS_PER_BYTE: u16 = BITS_PER_BYTE / BITS_PER_HEAPBLOCK; + +pub const fn HEAPBLK_TO_MAPBLOCK(x: u32) -> u32 { + x / HEAPBLOCKS_PER_PAGE +} +pub const fn HEAPBLK_TO_MAPBYTE(x: u32) -> u32 { + (x % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE as u32 +} +pub const fn HEAPBLK_TO_OFFSET(x: u32) -> u32 { + (x % HEAPBLOCKS_PER_BYTE as u32) * BITS_PER_HEAPBLOCK as u32 +} + +pub const BITS_PER_HEAPBLOCK: u16 = 2; +pub const VISIBILITYMAP_ALL_VISIBLE: u8 = 0x01; +pub const VISIBILITYMAP_ALL_FROZEN: u8 = 0x02; +pub const VISIBILITYMAP_VALID_BITS: u8 = 0x03; + // From xact.h pub const XLOG_XACT_COMMIT: u8 = 0x00; pub const XLOG_XACT_PREPARE: u8 = 0x10;
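
// Editor's sketch (not part of the patch): a quick sanity check of the new
// visibility-map constants and helpers above, written as a hypothetical test
// at the bottom of pg_constants.rs (values assume the standard 8 KB BLCKSZ):

#[cfg(test)]
mod vm_constants_sanity {
    use super::*;

    #[test]
    fn heapblk_mapping() {
        // (8192 - 24) bytes of map, 8 bits per byte, 2 bits per heap block
        assert_eq!(HEAPBLOCKS_PER_PAGE, 32_672);
        // four heap blocks share each map byte
        assert_eq!(HEAPBLK_TO_MAPBLOCK(32_672), 1);
        assert_eq!(HEAPBLK_TO_MAPBYTE(5), 1);
        assert_eq!(HEAPBLK_TO_OFFSET(5), 2);
        // the two VM bits of a heap block are exactly VISIBILITYMAP_VALID_BITS
        assert_eq!(
            VISIBILITYMAP_ALL_VISIBLE | VISIBILITYMAP_ALL_FROZEN,
            VISIBILITYMAP_VALID_BITS
        );
    }
}
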