From 55a4cf64a1310ac5f38099e2e0589ef1ec547f09 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 4 Jan 2022 11:24:24 +0200 Subject: [PATCH] Refactor WAL record handling. Introduce the concept of a "ZenithWalRecord", which can be a Postgres WAL record that is replayed with the Postgres WAL redo process, or a built-in type that is handled entirely by pageserver code. Replace the special code to replay Postgres XACT commit/abort records with new Zenith WAL records. A separate zenith WAL record is created for each modified CLOG page. This allows removing the 'main_data_offset' field from stored PostgreSQL WAL records, which saves some memory and some disk space in delta layers. Introduce zenith WAL records for updating bits in the visibility map. Previously, when e.g. a heap insert cleared the VM bit, we duplicated the heap insert WAL record for the affected VM page. That was very wasteful. The heap WAL record could be massive, containing a full page image in the worst case. This addresses github issue #941. 
--- pageserver/src/layered_repository.rs | 10 +- .../src/layered_repository/delta_layer.rs | 8 +- .../src/layered_repository/inmemory_layer.rs | 11 +- .../src/layered_repository/storage_layer.rs | 6 +- pageserver/src/repository.rs | 48 +- pageserver/src/walingest.rs | 405 ++++++++++---- pageserver/src/walrecord.rs | 111 +--- pageserver/src/walredo.rs | 503 ++++++++++-------- postgres_ffi/build.rs | 1 + postgres_ffi/pg_control_ffi.h | 1 + postgres_ffi/src/lib.rs | 2 + postgres_ffi/src/pg_constants.rs | 41 +- 12 files changed, 672 insertions(+), 475 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index ad0f086332..c5ccb5a40f 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -38,7 +38,7 @@ use crate::relish::*; use crate::remote_storage::{schedule_timeline_checkpoint_upload, schedule_timeline_download}; use crate::repository::{ BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncState, - TimelineWriter, WALRecord, + TimelineWriter, ZenithWalRecord, }; use crate::tenant_mgr; use crate::walreceiver; @@ -1981,7 +1981,7 @@ impl LayeredTimeline { assert!(data.page_img.is_none()); if let Some((first_rec_lsn, first_rec)) = data.records.first() { assert!(&cached_lsn < first_rec_lsn); - assert!(!first_rec.will_init); + assert!(!first_rec.will_init()); } data.page_img = Some(cached_img); break; @@ -2028,7 +2028,7 @@ impl LayeredTimeline { // // If we don't have a base image, then the oldest WAL record better initialize // the page - if data.page_img.is_none() && !data.records.first().unwrap().1.will_init { + if data.page_img.is_none() && !data.records.first().unwrap().1.will_init() { // FIXME: this ought to be an error? 
warn!( "Base image for page {}/{} at {} not found, but got {} WAL records", @@ -2125,8 +2125,8 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> { &self, lsn: Lsn, rel: RelishTag, - rel_blknum: BlockNumber, - rec: WALRecord, + rel_blknum: u32, + rec: ZenithWalRecord, ) -> Result<()> { if !rel.is_blocky() && rel_blknum != 0 { bail!( diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 0a3b291b15..32393b23af 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -257,7 +257,7 @@ impl Layer for DeltaLayer { break; } PageVersion::Wal(rec) => { - let will_init = rec.will_init; + let will_init = rec.will_init(); reconstruct_data.records.push((*pv_lsn, rec)); if will_init { // This WAL record initializes the page, so no need to go further back @@ -373,12 +373,12 @@ impl Layer for DeltaLayer { write!(&mut desc, " img {} bytes", img.len())?; } PageVersion::Wal(rec) => { - let wal_desc = walrecord::describe_wal_record(&rec.rec); + let wal_desc = walrecord::describe_wal_record(&rec); write!( &mut desc, " rec {} bytes will_init: {} {}", - rec.rec.len(), - rec.will_init, + blob_range.size, + rec.will_init(), wal_desc )?; } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 68f98735c2..6242f0a361 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -15,7 +15,7 @@ use crate::layered_repository::storage_layer::{ }; use crate::layered_repository::LayeredTimeline; use crate::layered_repository::ZERO_PAGE; -use crate::repository::WALRecord; +use crate::repository::ZenithWalRecord; use crate::{ZTenantId, ZTimelineId}; use anyhow::{ensure, Result}; use bytes::Bytes; @@ -187,7 +187,7 @@ impl Layer for InMemoryLayer { } PageVersion::Wal(rec) => { reconstruct_data.records.push((*entry_lsn, rec.clone())); - if 
rec.will_init { + if rec.will_init() { // This WAL record initializes the page, so no need to go further back need_image = false; break; @@ -369,7 +369,12 @@ impl InMemoryLayer { // Write operations /// Remember new page version, as a WAL record over previous version - pub fn put_wal_record(&self, lsn: Lsn, blknum: SegmentBlk, rec: WALRecord) -> Result { + pub fn put_wal_record( + &self, + lsn: Lsn, + blknum: SegmentBlk, + rec: ZenithWalRecord, + ) -> Result { self.put_page_version(blknum, lsn, PageVersion::Wal(rec)) } diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index dab650d2ab..99fdaa6845 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -3,7 +3,7 @@ //! use crate::relish::RelishTag; -use crate::repository::{BlockNumber, WALRecord}; +use crate::repository::{BlockNumber, ZenithWalRecord}; use crate::{ZTenantId, ZTimelineId}; use anyhow::Result; use bytes::Bytes; @@ -67,7 +67,7 @@ impl SegmentTag { #[derive(Debug, Clone, Serialize, Deserialize)] pub enum PageVersion { Page(Bytes), - Wal(WALRecord), + Wal(ZenithWalRecord), } /// @@ -78,7 +78,7 @@ pub enum PageVersion { /// 'records' contains the records to apply over the base image. 
/// pub struct PageReconstructData { - pub records: Vec<(Lsn, WALRecord)>, + pub records: Vec<(Lsn, ZenithWalRecord)>, pub page_img: Option, } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 4bed6449ed..8ddc2b4952 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,7 +1,9 @@ use crate::relish::*; +use crate::walrecord::MultiXactMember; use crate::CheckpointConfig; use anyhow::Result; use bytes::Bytes; +use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::ops::{AddAssign, Deref}; @@ -263,7 +265,7 @@ pub trait TimelineWriter: Deref { lsn: Lsn, tag: RelishTag, blknum: BlockNumber, - rec: WALRecord, + rec: ZenithWalRecord, ) -> Result<()>; /// Like put_wal_record, but with ready-made image of the page. @@ -288,14 +290,42 @@ pub trait TimelineWriter: Deref { fn advance_last_record_lsn(&self, lsn: Lsn); } +/// Each update to a page is represented by a ZenithWalRecord. It can be a wrapper +/// around a PostgreSQL WAL record, or a custom zenith-specific "record". #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct WALRecord { - pub will_init: bool, - pub rec: Bytes, - // Remember the offset of main_data in rec, - // so that we don't have to parse the record again. - // If record has no main_data, this offset equals rec.len(). - pub main_data_offset: u32, +pub enum ZenithWalRecord { + /// Native PostgreSQL WAL record + Postgres { will_init: bool, rec: Bytes }, + + /// Set bits in heap visibility map. 
(heap blkno, flag bits to clear) + ClearVisibilityMapFlags { heap_blkno: u32, flags: u8 }, + /// Mark transaction IDs as committed on a CLOG page + ClogSetCommitted { xids: Vec }, + /// Mark transaction IDs as aborted on a CLOG page + ClogSetAborted { xids: Vec }, + /// Extend multixact offsets SLRU + MultixactOffsetCreate { + mid: MultiXactId, + moff: MultiXactOffset, + }, + /// Extend multixact members SLRU. + MultixactMembersCreate { + moff: MultiXactOffset, + members: Vec, + }, +} + +impl ZenithWalRecord { + /// Does replaying this WAL record initialize the page from scratch, or does + /// it need to be applied over the previous image of the page? + pub fn will_init(&self) -> bool { + match self { + ZenithWalRecord::Postgres { will_init, rec: _ } => *will_init, + + // None of the special zenith record types currently initialize the page + _ => false, + } + } } #[cfg(test)] @@ -378,7 +408,7 @@ pub mod repo_harness { blknum: BlockNumber, lsn: Lsn, base_img: Option, - records: Vec<(Lsn, WALRecord)>, + records: Vec<(Lsn, ZenithWalRecord)>, ) -> Result { let s = format!( "redo for {} blk {} to get to {}, with {} and {} records", diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 0d2b04ef80..19228ae508 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -34,12 +34,10 @@ use crate::repository::*; use crate::walrecord::*; use postgres_ffi::nonrelfile_utils::mx_offset_to_member_segment; use postgres_ffi::xlog_utils::*; +use postgres_ffi::TransactionId; use postgres_ffi::{pg_constants, CheckPoint}; use zenith_utils::lsn::Lsn; -const MAX_MBR_BLKNO: u32 = - pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); pub struct WalIngest { @@ -74,66 +72,23 @@ impl WalIngest { recdata: Bytes, lsn: Lsn, ) -> Result<()> { - let decoded = decode_wal_record(recdata.clone()); + let mut decoded = decode_wal_record(recdata); + let mut buf = 
decoded.record.clone(); + buf.advance(decoded.main_data_offset); assert!(!self.checkpoint_modified); if self.checkpoint.update_next_xid(decoded.xl_xid) { self.checkpoint_modified = true; } - // Iterate through all the blocks that the record modifies, and - // "put" a separate copy of the record for each block. - for blk in decoded.blocks.iter() { - let tag = RelishTag::Relation(RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, - }); - - // - // Instead of storing full-page-image WAL record, - // it is better to store extracted image: we can skip wal-redo - // in this case. Also some FPI records may contain multiple (up to 32) pages, - // so them have to be copied multiple times. - // - if blk.apply_image - && blk.has_image - && decoded.xl_rmid == pg_constants::RM_XLOG_ID - && (decoded.xl_info == pg_constants::XLOG_FPI - || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) - // compression of WAL is not yet supported: fall back to storing the original WAL record - && (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 - { - // Extract page image from FPI record - let img_len = blk.bimg_len as usize; - let img_offs = blk.bimg_offset as usize; - let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); - image.extend_from_slice(&recdata[img_offs..img_offs + img_len]); - - if blk.hole_length != 0 { - let tail = image.split_off(blk.hole_offset as usize); - image.resize(image.len() + blk.hole_length as usize, 0u8); - image.unsplit(tail); - } - image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); - image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); - assert_eq!(image.len(), pg_constants::BLCKSZ as usize); - timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?; - } else { - let rec = WALRecord { - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - 
timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; - } + // Heap AM records need some special handling, because they modify VM pages + // without registering them with the standard mechanism. + if decoded.xl_rmid == pg_constants::RM_HEAP_ID + || decoded.xl_rmid == pg_constants::RM_HEAP2_ID + { + self.ingest_heapam_record(&mut buf, timeline, lsn, &mut decoded)?; } - - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - - // Handle a few special record types + // Handle other special record types if decoded.xl_rmid == pg_constants::RM_SMGR_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE @@ -202,13 +157,23 @@ impl WalIngest { if info == pg_constants::XLOG_XACT_COMMIT || info == pg_constants::XLOG_XACT_ABORT { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record(timeline, lsn, &parsed_xact, &decoded)?; + self.ingest_xact_record( + timeline, + lsn, + &parsed_xact, + info == pg_constants::XLOG_XACT_COMMIT, + )?; } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED || info == pg_constants::XLOG_XACT_ABORT_PREPARED { let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); - self.ingest_xact_record(timeline, lsn, &parsed_xact, &decoded)?; + self.ingest_xact_record( + timeline, + lsn, + &parsed_xact, + info == pg_constants::XLOG_XACT_COMMIT_PREPARED, + )?; // Remove twophase file. 
see RemoveTwoPhaseFile() in postgres code trace!( "Drop twophaseFile for xid {} parsed_xact.xid {} here at {}", @@ -223,9 +188,6 @@ impl WalIngest { lsn, )?; } else if info == pg_constants::XLOG_XACT_PREPARE { - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - timeline.put_page_image( RelishTag::TwoPhase { xid: decoded.xl_xid, @@ -266,7 +228,7 @@ impl WalIngest { )?; } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); - self.ingest_multixact_create_record(timeline, lsn, &xlrec, &decoded)?; + self.ingest_multixact_create_record(timeline, lsn, &xlrec)?; } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); self.ingest_multixact_truncate_record(timeline, lsn, &xlrec)?; @@ -286,8 +248,6 @@ impl WalIngest { || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN { let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT]; - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); buf.copy_to_slice(&mut checkpoint_bytes); let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes).unwrap(); trace!( @@ -307,6 +267,12 @@ impl WalIngest { } } + // Iterate through all the blocks that the record modifies, and + // "put" a separate copy of the record for each block. 
+ for blk in decoded.blocks.iter() { + self.ingest_decoded_block(timeline, lsn, &decoded, blk)?; + } + // If checkpoint data was updated, store the new version in the repository if self.checkpoint_modified { let new_checkpoint_bytes = self.checkpoint.encode(); @@ -322,6 +288,195 @@ impl WalIngest { Ok(()) } + fn ingest_decoded_block( + &mut self, + timeline: &dyn TimelineWriter, + lsn: Lsn, + decoded: &DecodedWALRecord, + blk: &DecodedBkpBlock, + ) -> Result<()> { + let tag = RelishTag::Relation(RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, + }); + + // + // Instead of storing full-page-image WAL record, + // it is better to store extracted image: we can skip wal-redo + // in this case. Also some FPI records may contain multiple (up to 32) pages, + // so them have to be copied multiple times. + // + if blk.apply_image + && blk.has_image + && decoded.xl_rmid == pg_constants::RM_XLOG_ID + && (decoded.xl_info == pg_constants::XLOG_FPI + || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) + // compression of WAL is not yet supported: fall back to storing the original WAL record + && (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 + { + // Extract page image from FPI record + let img_len = blk.bimg_len as usize; + let img_offs = blk.bimg_offset as usize; + let mut image = BytesMut::with_capacity(pg_constants::BLCKSZ as usize); + image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); + + if blk.hole_length != 0 { + let tail = image.split_off(blk.hole_offset as usize); + image.resize(image.len() + blk.hole_length as usize, 0u8); + image.unsplit(tail); + } + image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); + image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); + assert_eq!(image.len(), pg_constants::BLCKSZ as usize); + timeline.put_page_image(tag, blk.blkno, lsn, image.freeze())?; + } else { + let rec = ZenithWalRecord::Postgres { + 
will_init: blk.will_init || blk.apply_image, + rec: decoded.record.clone(), + }; + timeline.put_wal_record(lsn, tag, blk.blkno, rec)?; + } + Ok(()) + } + + fn ingest_heapam_record( + &mut self, + buf: &mut Bytes, + timeline: &dyn TimelineWriter, + lsn: Lsn, + decoded: &mut DecodedWALRecord, + ) -> Result<()> { + // Handle VM bit updates that are implicitly part of heap records. + if decoded.xl_rmid == pg_constants::RM_HEAP_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + if info == pg_constants::XLOG_HEAP_INSERT { + let xlrec = XlHeapInsert::decode(buf); + assert_eq!(0, buf.remaining()); + if (xlrec.flags + & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED + | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) + != 0 + { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }), + decoded.blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[0].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + } else if info == pg_constants::XLOG_HEAP_DELETE { + let xlrec = XlHeapDelete::decode(buf); + assert_eq!(0, buf.remaining()); + if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }), + decoded.blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[0].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + } else if info == pg_constants::XLOG_HEAP_UPDATE + || info == pg_constants::XLOG_HEAP_HOT_UPDATE + { + let xlrec = 
XlHeapUpdate::decode(buf); + // the size of tuple data is inferred from the size of the record. + // we can't validate the remaining number of bytes without parsing + // the tuple data. + if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }), + decoded.blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[0].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 + && decoded.blocks.len() > 1 + { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[1].rnode_spcnode, + dbnode: decoded.blocks[1].rnode_dbnode, + relnode: decoded.blocks[1].rnode_relnode, + }), + decoded.blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[1].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + } + } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { + let xlrec = XlHeapMultiInsert::decode(buf); + + let offset_array_len = if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { + // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set + 0 + } else { + std::mem::size_of::() * xlrec.ntuples as usize + }; + assert_eq!(offset_array_len, buf.remaining()); + + // FIXME: why also ALL_FROZEN_SET? 
+ if (xlrec.flags + & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED + | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) + != 0 + { + timeline.put_wal_record( + lsn, + RelishTag::Relation(RelTag { + forknum: pg_constants::VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }), + decoded.blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32, + ZenithWalRecord::ClearVisibilityMapFlags { + heap_blkno: decoded.blocks[0].blkno, + flags: pg_constants::VISIBILITYMAP_VALID_BITS, + }, + )?; + } + } + } + + // FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED? + + Ok(()) + } + /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. fn ingest_xlog_dbase_create( &mut self, @@ -476,34 +631,20 @@ impl WalIngest { timeline: &dyn TimelineWriter, lsn: Lsn, parsed: &XlXactParsedRecord, - decoded: &DecodedWALRecord, + is_commit: bool, ) -> Result<()> { - // Record update of CLOG page + // Record update of CLOG pages let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; - - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - let rec = WALRecord { - will_init: false, - rec: decoded.record.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - timeline.put_wal_record( - lsn, - RelishTag::Slru { - slru: SlruKind::Clog, - segno, - }, - rpageno, - rec.clone(), - )?; + let mut segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let mut rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + let mut page_xids: Vec = vec![parsed.xid]; for subxact in &parsed.subxacts { let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; if subxact_pageno != pageno { - pageno = subxact_pageno; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + // This subxact goes to different page. 
Write the record + // for all the XIDs on the previous page, and continue + // accumulating XIDs on this new page. timeline.put_wal_record( lsn, RelishTag::Slru { @@ -511,10 +652,33 @@ impl WalIngest { segno, }, rpageno, - rec.clone(), + if is_commit { + ZenithWalRecord::ClogSetCommitted { xids: page_xids } + } else { + ZenithWalRecord::ClogSetAborted { xids: page_xids } + }, )?; + page_xids = Vec::new(); } + pageno = subxact_pageno; + segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + page_xids.push(*subxact); } + timeline.put_wal_record( + lsn, + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + }, + rpageno, + if is_commit { + ZenithWalRecord::ClogSetCommitted { xids: page_xids } + } else { + ZenithWalRecord::ClogSetAborted { xids: page_xids } + }, + )?; + for xnode in &parsed.xnodes { for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { let rel = RelTag { @@ -596,16 +760,12 @@ impl WalIngest { timeline: &dyn TimelineWriter, lsn: Lsn, xlrec: &XlMultiXactCreate, - decoded: &DecodedWALRecord, ) -> Result<()> { - let rec = WALRecord { - will_init: false, - rec: decoded.record.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; + // Create WAL record for updating the multixact-offsets page let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_wal_record( lsn, RelishTag::Slru { @@ -613,40 +773,51 @@ impl WalIngest { segno, }, rpageno, - rec.clone(), + ZenithWalRecord::MultixactOffsetCreate { + mid: xlrec.mid, + moff: xlrec.moff, + }, )?; - let first_mbr_pageno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let last_mbr_pageno = - (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - // The members SLRU can, in contrast to the offsets one, be filled to 
almost - // the full range at once. So we need to handle wraparound. - let mut pageno = first_mbr_pageno; + // Create WAL records for the update of each affected multixact-members page + let mut members = xlrec.members.iter(); + let mut offset = xlrec.moff; loop { - // Update members page - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + + // How many members fit on this page? + let page_remain = pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32 + - offset % pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + + let mut this_page_members: Vec = Vec::new(); + for _ in 0..page_remain { + if let Some(m) = members.next() { + this_page_members.push(m.clone()); + } else { + break; + } + } + if this_page_members.is_empty() { + // all done + break; + } + let n_this_page = this_page_members.len(); + timeline.put_wal_record( lsn, RelishTag::Slru { slru: SlruKind::MultiXactMembers, - segno, + segno: pageno / pg_constants::SLRU_PAGES_PER_SEGMENT, + }, + pageno % pg_constants::SLRU_PAGES_PER_SEGMENT, + ZenithWalRecord::MultixactMembersCreate { + moff: offset, + members: this_page_members, }, - rpageno, - rec.clone(), )?; - if pageno == last_mbr_pageno { - // last block inclusive - break; - } - - // handle wraparound - if pageno == MAX_MBR_BLKNO { - pageno = 0; - } else { - pageno += 1; - } + // Note: The multixact members can wrap around, even within one WAL record. 
+ offset = offset.wrapping_add(n_this_page as u32); } if xlrec.mid >= self.checkpoint.nextMulti { self.checkpoint.nextMulti = xlrec.mid + 1; diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 26dbbf04ed..378a015d4a 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -7,8 +7,11 @@ use postgres_ffi::xlog_utils::{TimestampTz, XLOG_SIZE_OF_XLOG_RECORD}; use postgres_ffi::XLogRecord; use postgres_ffi::{BlockNumber, OffsetNumber}; use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; +use serde::{Deserialize, Serialize}; use tracing::*; +use crate::repository::ZenithWalRecord; + /// DecodedBkpBlock represents per-page data contained in a WAL record. #[derive(Default)] pub struct DecodedBkpBlock { @@ -351,7 +354,7 @@ impl XlClogTruncate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct MultiXactMember { pub xid: TransactionId, pub status: MultiXactStatus, @@ -676,97 +679,6 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { assert_eq!(buf.remaining(), main_data_len as usize); } - // 5. Handle a few special record types that modify blocks without registering - // them with the standard mechanism. 
- if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK; - let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; - if info == pg_constants::XLOG_HEAP_INSERT { - let xlrec = XlHeapInsert::decode(&mut buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags - & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED - | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) - != 0 - { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); - } - } else if info == pg_constants::XLOG_HEAP_DELETE { - let xlrec = XlHeapDelete::decode(&mut buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); - } - } else if info == pg_constants::XLOG_HEAP_UPDATE - || info == pg_constants::XLOG_HEAP_HOT_UPDATE - { - let xlrec = XlHeapUpdate::decode(&mut buf); - // the size of tuple data is inferred from the size of the record. - // we can't validate the remaining number of bytes without parsing - // the tuple data. 
- if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); - } - if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 - && blocks.len() > 1 - { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blocks[1].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; - blk.rnode_spcnode = blocks[1].rnode_spcnode; - blk.rnode_dbnode = blocks[1].rnode_dbnode; - blk.rnode_relnode = blocks[1].rnode_relnode; - blocks.push(blk); - } - } - } else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_HEAP_OPMASK; - if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { - let xlrec = XlHeapMultiInsert::decode(&mut buf); - - let offset_array_len = if xlogrec.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { - // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set - 0 - } else { - std::mem::size_of::() * xlrec.ntuples as usize - }; - assert_eq!(offset_array_len, buf.remaining()); - - if (xlrec.flags - & (pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED - | pg_constants::XLH_INSERT_ALL_FROZEN_SET)) - != 0 - { - let mut blk = DecodedBkpBlock::new(); - let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; - blk.forknum = pg_constants::VISIBILITYMAP_FORKNUM; - blk.blkno = blkno; - blk.rnode_spcnode = blocks[0].rnode_spcnode; - blk.rnode_dbnode = blocks[0].rnode_dbnode; - blk.rnode_relnode = blocks[0].rnode_relnode; - blocks.push(blk); - } - } - } - DecodedWALRecord { xl_xid: xlogrec.xl_xid, xl_info: xlogrec.xl_info, @@ -781,7 +693,20 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { /// Build a human-readable string to describe a WAL record /// /// For 
debugging purposes -pub fn describe_wal_record(record: &Bytes) -> String { +pub fn describe_wal_record(rec: &ZenithWalRecord) -> String { + match rec { + ZenithWalRecord::Postgres { will_init, rec } => { + format!( + "will_init: {}, {}", + will_init, + describe_postgres_wal_record(rec) + ) + } + _ => format!("{:?}", rec), + } +} + +fn describe_postgres_wal_record(record: &Bytes) -> String { // TODO: It would be nice to use the PostgreSQL rmgrdesc infrastructure for this. // Maybe use the postgres wal redo process, the same used for replaying WAL records? // Or could we compile the rmgrdesc routines into the dump_layer_file() binary directly, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index a8e25d3478..5583f1e651 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -19,7 +19,7 @@ //! process, he cannot escape out of it. //! use byteorder::{ByteOrder, LittleEndian}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; use nix::poll::*; @@ -43,15 +43,12 @@ use zenith_utils::zid::ZTenantId; use crate::config::PageServerConf; use crate::relish::*; -use crate::repository::WALRecord; -use crate::walrecord::XlMultiXactCreate; -use crate::walrecord::XlXactParsedRecord; +use crate::repository::ZenithWalRecord; use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_bitshift; use postgres_ffi::nonrelfile_utils::mx_offset_to_flags_offset; use postgres_ffi::nonrelfile_utils::mx_offset_to_member_offset; use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; -use postgres_ffi::XLogRecord; /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. 
@@ -82,7 +79,7 @@ pub trait WalRedoManager: Send + Sync { blknum: u32, lsn: Lsn, base_img: Option, - records: Vec<(Lsn, WALRecord)>, + records: Vec<(Lsn, ZenithWalRecord)>, ) -> Result; } @@ -99,7 +96,7 @@ impl crate::walredo::WalRedoManager for DummyRedoManager { _blknum: u32, _lsn: Lsn, _base_img: Option, - _records: Vec<(Lsn, WALRecord)>, + _records: Vec<(Lsn, ZenithWalRecord)>, ) -> Result { Err(WalRedoError::InvalidState) } @@ -142,23 +139,43 @@ pub struct PostgresRedoManager { process: Mutex>, } -#[derive(Debug)] -struct WalRedoRequest { - rel: RelishTag, - blknum: u32, - lsn: Lsn, - - base_img: Option, - records: Vec<(Lsn, WALRecord)>, -} - -impl WalRedoRequest { - // Can this request be served by zenith redo funcitons - // or we need to pass it to wal-redo postgres process? - fn can_apply_in_zenith(&self) -> bool { - !matches!(self.rel, RelishTag::Relation(_)) +/// Can this request be served by zenith redo functions +/// or we need to pass it to wal-redo postgres process? +fn can_apply_in_zenith(rec: &ZenithWalRecord) -> bool { + // Currently, we don't have bespoke Rust code to replay any + // Postgres WAL records. But everything else is handled in zenith.
+ #[allow(clippy::match_like_matches_macro)] + match rec { + ZenithWalRecord::Postgres { + will_init: _, + rec: _, + } => false, + _ => true, } } + +fn check_forknum(rel: &RelishTag, expected_forknum: u8) -> bool { + if let RelishTag::Relation(RelTag { + forknum, + spcnode: _, + dbnode: _, + relnode: _, + }) = rel + { + *forknum == expected_forknum + } else { + false + } +} + +fn check_slru_segno(rel: &RelishTag, expected_slru: SlruKind, expected_segno: u32) -> bool { + if let RelishTag::Slru { slru, segno } = rel { + *slru == expected_slru && *segno == expected_segno + } else { + false + } +} + /// An error happened in WAL redo #[derive(Debug, thiserror::Error)] pub enum WalRedoError { @@ -187,53 +204,37 @@ impl WalRedoManager for PostgresRedoManager { blknum: u32, lsn: Lsn, base_img: Option, - records: Vec<(Lsn, WALRecord)>, + records: Vec<(Lsn, ZenithWalRecord)>, ) -> Result { - let start_time; - let end_time; - - let request = WalRedoRequest { - rel, - blknum, - lsn, - base_img, - records, - }; - - start_time = Instant::now(); - let result; - - if request.can_apply_in_zenith() { - result = self.handle_apply_request_zenith(&request); - - end_time = Instant::now(); - WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64()); - } else { - let mut process_guard = self.process.lock().unwrap(); - let lock_time = Instant::now(); - - // launch the WAL redo process on first use - if process_guard.is_none() { - let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?; - *process_guard = Some(p); - } - let process = process_guard.as_mut().unwrap(); - - result = self.handle_apply_request_postgres(process, &request); - - WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); - end_time = Instant::now(); - WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64()); - - // If something went wrong, don't try to reuse the process. Kill it, and - // next request will launch a new one. 
- if result.is_err() { - let process = process_guard.take().unwrap(); - process.kill(); - } + if records.is_empty() { + error!("invalid WAL redo request with no records"); + return Err(WalRedoError::InvalidRequest); } - result + let mut img: Option = base_img; + let mut batch_zenith = can_apply_in_zenith(&records[0].1); + let mut batch_start = 0; + for i in 1..records.len() { + let rec_zenith = can_apply_in_zenith(&records[i].1); + + if rec_zenith != batch_zenith { + let result = if batch_zenith { + self.apply_batch_zenith(rel, blknum, lsn, img, &records[batch_start..i]) + } else { + self.apply_batch_postgres(rel, blknum, lsn, img, &records[batch_start..i]) + }; + img = Some(result?); + + batch_zenith = rec_zenith; + batch_start = i; + } + } + // last batch + if batch_zenith { + self.apply_batch_zenith(rel, blknum, lsn, img, &records[batch_start..]) + } else { + self.apply_batch_postgres(rel, blknum, lsn, img, &records[batch_start..]) + } } } @@ -253,203 +254,229 @@ impl PostgresRedoManager { /// /// Process one request for WAL redo using wal-redo postgres /// - fn handle_apply_request_postgres( + fn apply_batch_postgres( &self, - process: &mut PostgresRedoProcess, - request: &WalRedoRequest, + rel: RelishTag, + blknum: u32, + lsn: Lsn, + base_img: Option, + records: &[(Lsn, ZenithWalRecord)], ) -> Result { - let blknum = request.blknum; - let lsn = request.lsn; - let base_img = request.base_img.clone(); - let records = &request.records; - let nrecords = records.len(); - - let start = Instant::now(); + let start_time = Instant::now(); let apply_result: Result; - if let RelishTag::Relation(rel) = request.rel { + let mut process_guard = self.process.lock().unwrap(); + let lock_time = Instant::now(); + + // launch the WAL redo process on first use + if process_guard.is_none() { + let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?; + *process_guard = Some(p); + } + let process = process_guard.as_mut().unwrap(); + + 
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); + + let result = if let RelishTag::Relation(rel) = rel { // Relational WAL records are applied using wal-redo-postgres let buf_tag = BufferTag { rel, blknum }; apply_result = process.apply_wal_records(buf_tag, base_img, records); - let duration = start.elapsed(); - - debug!( - "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}", - nrecords, - duration.as_micros(), - lsn - ); - apply_result.map_err(WalRedoError::IoError) } else { + error!("unexpected non-relation relish: {:?}", rel); Err(WalRedoError::InvalidRequest) + }; + + let end_time = Instant::now(); + let duration = end_time.duration_since(lock_time); + WAL_REDO_TIME.observe(duration.as_secs_f64()); + debug!( + "postgres applied {} WAL records in {} us to reconstruct page image at LSN {}", + records.len(), + duration.as_micros(), + lsn + ); + + // If something went wrong, don't try to reuse the process. Kill it, and + // next request will launch a new one. + if result.is_err() { + let process = process_guard.take().unwrap(); + process.kill(); } + result } /// - /// Process one request for WAL redo using custom zenith code + /// Process a batch of WAL records using bespoke Zenith code. /// - fn handle_apply_request_zenith(&self, request: &WalRedoRequest) -> Result { - let rel = request.rel; - let blknum = request.blknum; - let lsn = request.lsn; - let base_img = request.base_img.clone(); - let records = &request.records; + fn apply_batch_zenith( + &self, + rel: RelishTag, + blknum: u32, + lsn: Lsn, + base_img: Option, + records: &[(Lsn, ZenithWalRecord)], + ) -> Result { + let start_time = Instant::now(); - let nrecords = records.len(); - - let start = Instant::now(); - - let apply_result: Result; - - // Non-relational WAL records are handled here, with custom code that has the - // same effects as the corresponding Postgres WAL redo function.
- const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; let mut page = BytesMut::new(); if let Some(fpi) = base_img { // If full-page image is provided, then use it... page.extend_from_slice(&fpi[..]); } else { - // otherwise initialize page with zeros - page.extend_from_slice(&ZERO_PAGE); - } - // Apply all collected WAL records - for (_lsn, record) in records { - let mut buf = record.rec.clone(); - - WAL_REDO_RECORD_COUNTER.inc(); - - // 1. Parse XLogRecord struct - // FIXME: refactor to avoid code duplication. - let xlogrec = XLogRecord::from_bytes(&mut buf); - - //move to main data - // TODO probably, we should store some records in our special format - // to avoid this weird parsing on replay - let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; - if buf.remaining() > skip { - buf.advance(skip); - } - - if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { - // Transaction manager stuff - let rec_segno = match rel { - RelishTag::Slru { slru, segno } => { - assert!( - slru == SlruKind::Clog, - "Not valid XACT relish tag {:?}", - rel - ); - segno - } - _ => panic!("Not valid XACT relish tag {:?}", rel), - }; - let parsed_xact = - XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info); - if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT - || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED - { - // Iterate the main XID, followed by all the subxids. 
- for xid in std::iter::once(&parsed_xact.xid).chain(parsed_xact.subxacts.iter()) - { - let pageno = *xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - // only update xids on the requested page - if rec_segno == segno && blknum == rpageno { - transaction_id_set_status( - *xid, - pg_constants::TRANSACTION_STATUS_COMMITTED, - &mut page, - ); - } - } - } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT - || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED - { - // Iterate the main XID, followed by all the subxids. - for xid in std::iter::once(&parsed_xact.xid).chain(parsed_xact.subxacts.iter()) - { - let pageno = *xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - // only update xids on the requested page - if rec_segno == segno && blknum == rpageno { - transaction_id_set_status( - *xid, - pg_constants::TRANSACTION_STATUS_ABORTED, - &mut page, - ); - } - } - } - } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { - // Multixact operations - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - if let RelishTag::Slru { - slru, - segno: rec_segno, - } = rel - { - if slru == SlruKind::MultiXactMembers { - for i in 0..xlrec.nmembers { - let offset = xlrec.moff + i; - let pageno = - offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - if segno == rec_segno && rpageno == blknum { - // update only target block - let memberoff = mx_offset_to_member_offset(offset); - let flagsoff = mx_offset_to_flags_offset(offset); - let bshift = 
mx_offset_to_flags_bitshift(offset); - let mut flagsval = - LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); - flagsval &= - !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) - << bshift); - flagsval |= xlrec.members[i as usize].status << bshift; - LittleEndian::write_u32( - &mut page[flagsoff..flagsoff + 4], - flagsval, - ); - LittleEndian::write_u32( - &mut page[memberoff..memberoff + 4], - xlrec.members[i as usize].xid, - ); - } - } - } else { - // Multixact offsets SLRU - let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 - * 4) as usize; - LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); - } - } else { - panic!(); - } - } else { - panic!(); - } - } + // All the current WAL record types that we can handle require a base image. + error!("invalid zenith WAL redo request with no base image"); + return Err(WalRedoError::InvalidRequest); } - apply_result = Ok::(page.freeze()); - - let duration = start.elapsed(); + // Apply all the WAL records in the batch + for (record_lsn, record) in records.iter() { + self.apply_record_zenith(rel, blknum, &mut page, *record_lsn, record)?; + } + // Success! + let end_time = Instant::now(); + let duration = end_time.duration_since(start_time); + WAL_REDO_TIME.observe(duration.as_secs_f64()); debug!( "zenith applied {} WAL records in {} ms to reconstruct page image at LSN {}", - nrecords, - duration.as_millis(), + records.len(), + duration.as_micros(), lsn ); - apply_result.map_err(WalRedoError::IoError) + Ok(page.freeze()) + } + + fn apply_record_zenith( + &self, + rel: RelishTag, + blknum: u32, + page: &mut BytesMut, + _record_lsn: Lsn, + record: &ZenithWalRecord, + ) -> Result<(), WalRedoError> { + match record { + ZenithWalRecord::Postgres { + will_init: _, + rec: _, + } => panic!("tried to pass postgres wal record to zenith WAL redo"), + ZenithWalRecord::ClearVisibilityMapFlags { heap_blkno, flags } => { + // Calculate the VM block and offset that corresponds to the heap block. 
+ let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(*heap_blkno); + let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(*heap_blkno); + let map_offset = pg_constants::HEAPBLK_TO_OFFSET(*heap_blkno); + + // Check that we're modifying the correct VM block. + assert!( + check_forknum(&rel, pg_constants::VISIBILITYMAP_FORKNUM), + "ClearVisibilityMapFlags record on unexpected rel {:?}", + rel + ); + assert!(map_block == blknum); + + // equivalent to PageGetContents(page) + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + + let mask: u8 = flags << map_offset; + map[map_byte as usize] &= !mask; + } + // Non-relational WAL records are handled here, with custom code that has the + // same effects as the corresponding Postgres WAL redo function. + ZenithWalRecord::ClogSetCommitted { xids } => { + for &xid in xids { + let pageno = xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + // Check that we're modifying the correct CLOG block. + assert!( + check_slru_segno(&rel, SlruKind::Clog, expected_segno), + "ClogSetCommitted record for XID {} with unexpected rel {:?}", + xid, + rel + ); + assert!(blknum == expected_blknum); + + transaction_id_set_status( + xid, + pg_constants::TRANSACTION_STATUS_COMMITTED, + page, + ); + } + } + ZenithWalRecord::ClogSetAborted { xids } => { + for &xid in xids { + let pageno = xid as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + // Check that we're modifying the correct CLOG block. 
+ assert!( + check_slru_segno(&rel, SlruKind::Clog, expected_segno), + "ClogSetCommitted record for XID {} with unexpected rel {:?}", + xid, + rel + ); + assert!(blknum == expected_blknum); + + transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page); + } + } + ZenithWalRecord::MultixactOffsetCreate { mid, moff } => { + // Compute the block and offset to modify. + // See RecordNewMultiXact in PostgreSQL sources. + let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let offset = (entryno * 4) as usize; + + // Check that we're modifying the correct multixact-offsets block. + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + assert!( + check_slru_segno(&rel, SlruKind::MultiXactOffsets, expected_segno), + "MultiXactOffsetsCreate record for multi-xid {} with unexpected rel {:?}", + mid, + rel + ); + assert!(blknum == expected_blknum); + + LittleEndian::write_u32(&mut page[offset..offset + 4], *moff); + } + ZenithWalRecord::MultixactMembersCreate { moff, members } => { + for (i, member) in members.iter().enumerate() { + let offset = moff + i as u32; + + // Compute the block and offset to modify. + // See RecordNewMultiXact in PostgreSQL sources. + let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + + // Check that we're modifying the correct multixact-members block. 
+ let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + assert!( + check_slru_segno(&rel, SlruKind::MultiXactMembers, expected_segno), + "MultiXactMembersCreate record at offset {} with unexpected rel {:?}", + moff, + rel + ); + assert!(blknum == expected_blknum); + + let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); + flagsval |= member.status << bshift; + LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval); + LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid); + } + } + } + + Ok(()) } } @@ -556,7 +583,7 @@ impl PostgresRedoProcess { &mut self, tag: BufferTag, base_img: Option, - records: &[(Lsn, WALRecord)], + records: &[(Lsn, ZenithWalRecord)], ) -> Result { // Serialize all the messages to send the WAL redo process first. // @@ -569,7 +596,15 @@ impl PostgresRedoProcess { build_push_page_msg(tag, &img, &mut writebuf); } for (lsn, rec) in records.iter() { - build_apply_record_msg(*lsn, &rec.rec, &mut writebuf); + if let ZenithWalRecord::Postgres { + will_init: _, + rec: postgres_rec, + } = rec + { + build_apply_record_msg(*lsn, postgres_rec, &mut writebuf); + } else { + panic!("tried to pass zenith wal record to postgres WAL redo"); + } } build_get_page_msg(tag, &mut writebuf); WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); diff --git a/postgres_ffi/build.rs b/postgres_ffi/build.rs index ac738c7b0f..3b4b37f9ee 100644 --- a/postgres_ffi/build.rs +++ b/postgres_ffi/build.rs @@ -75,6 +75,7 @@ fn main() { .allowlist_var("XLOG_PAGE_MAGIC") .allowlist_var("PG_CONTROL_FILE_SIZE") .allowlist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC") + .allowlist_type("PageHeaderData") .allowlist_type("DBState") // Because structs are used for serialization, tell bindgen to emit // explicit padding fields. 
diff --git a/postgres_ffi/pg_control_ffi.h b/postgres_ffi/pg_control_ffi.h index 9877c0de6c..9da3f09b84 100644 --- a/postgres_ffi/pg_control_ffi.h +++ b/postgres_ffi/pg_control_ffi.h @@ -9,5 +9,6 @@ #include "access/xlog_internal.h" #include "storage/block.h" +#include "storage/bufpage.h" #include "storage/off.h" #include "access/multixact.h" diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index d5a4468dc7..923fbe4d5a 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -1,6 +1,8 @@ #![allow(non_upper_case_globals)] #![allow(non_camel_case_types)] #![allow(non_snake_case)] +// bindgen creates some unsafe code with no doc comments. +#![allow(clippy::missing_safety_doc)] // suppress warnings on rust 1.53 due to bindgen unit tests. // https://github.com/rust-lang/rust-bindgen/issues/1651 #![allow(deref_nullptr)] diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 5558b280f0..76f837cefc 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -7,6 +7,8 @@ //! comments on them. //! 
+use crate::PageHeaderData; + // // From pg_tablespace_d.h // @@ -31,6 +33,14 @@ pub const SMGR_TRUNCATE_FSM: u32 = 0x0004; pub const BLCKSZ: u16 = 8192; pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); +// +// From bufpage.h +// + +// Assumes 8 byte alignment +const SIZEOF_PAGE_HEADER_DATA: usize = std::mem::size_of::(); +pub const MAXALIGN_SIZE_OF_PAGE_HEADER_DATA: usize = (SIZEOF_PAGE_HEADER_DATA + 7) & !7; + // // constants from clog.h // @@ -39,13 +49,6 @@ pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE; pub const CLOG_BITS_PER_XACT: u8 = 2; pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1; -// -// Constants from visbilitymap.h -// -pub const SIZE_OF_PAGE_HEADER: u16 = 24; -pub const BITS_PER_HEAPBLOCK: u16 = 2; -pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; - pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; @@ -53,6 +56,30 @@ pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; pub const CLOG_ZEROPAGE: u8 = 0x00; pub const CLOG_TRUNCATE: u8 = 0x10; +// +// Constants from visibilitymap.h, visibilitymapdefs.h and visibilitymap.c +// +pub const SIZE_OF_PAGE_HEADER: u16 = 24; +pub const BITS_PER_BYTE: u16 = 8; +pub const HEAPBLOCKS_PER_PAGE: u32 = + (BLCKSZ - SIZE_OF_PAGE_HEADER) as u32 * 8 / BITS_PER_HEAPBLOCK as u32; +pub const HEAPBLOCKS_PER_BYTE: u16 = BITS_PER_BYTE / BITS_PER_HEAPBLOCK; + +pub const fn HEAPBLK_TO_MAPBLOCK(x: u32) -> u32 { + x / HEAPBLOCKS_PER_PAGE +} +pub const fn HEAPBLK_TO_MAPBYTE(x: u32) -> u32 { + (x % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE as u32 +} +pub const fn HEAPBLK_TO_OFFSET(x: u32) -> u32 { + (x % HEAPBLOCKS_PER_BYTE as u32) * BITS_PER_HEAPBLOCK as u32 +} + +pub const BITS_PER_HEAPBLOCK: u16 = 2; +pub const VISIBILITYMAP_ALL_VISIBLE: u8 = 0x01; +pub const VISIBILITYMAP_ALL_FROZEN: u8 = 0x02; +pub const 
VISIBILITYMAP_VALID_BITS: u8 = 0x03; + // From xact.h pub const XLOG_XACT_COMMIT: u8 = 0x00; pub const XLOG_XACT_PREPARE: u8 = 0x10;