diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 26cba88d2b..6da9d61879 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -4,8 +4,8 @@ //! TODO: this module has nothing to do with PostgreSQL pg_basebackup. //! It could use a better name. //! -//! Stateless Postgres compute node is lauched by sending taball which contains on-relational data (multixacts, clog, filenodemaps, twophase files) -//! and generate pg_control and dummy segment of WAL. This module is responsible for creation of such tarball from snapshot directry and +//! Stateless Postgres compute node is launched by sending tarball which contains non-relational data (multixacts, clog, filenodemaps, twophase files) +//! and generate pg_control and dummy segment of WAL. This module is responsible for creation of such tarball from snapshot directory and //! data stored in object storage. //! use crate::ZTimelineId; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index cae423c77a..e519012bc6 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -289,6 +289,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; + // get_page_at_lsn_nowait returns pages with zeros when object is not found in the storage. + // nextXid can not be zero, so this check is used to detect situation when checkpoint record needs to be initialized. 
if checkpoint.nextXid.value == 0 { let pg_control_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?; @@ -505,9 +507,7 @@ pub fn save_decoded_record( let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_NEXTOID { let next_oid = buf.get_u32_le(); - if next_oid > checkpoint.nextOid { - checkpoint.nextOid = next_oid; - } + checkpoint.nextOid = next_oid; } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN { @@ -764,10 +764,13 @@ fn save_multixact_create_record( if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset { checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers; } - let max_mbr_xid = xlrec - .members - .iter() - .fold(0u32, |acc, mbr| if mbr.xid > acc { mbr.xid } else { acc }); + let max_mbr_xid = xlrec.members.iter().fold(0u32, |acc, mbr| { + if mbr.xid.wrapping_sub(acc) as i32 > 0 { + mbr.xid + } else { + acc + } + }); checkpoint.update_next_xid(max_mbr_xid); Ok(()) } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index c52631f35c..b1945b7e8c 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -190,6 +190,7 @@ fn walreceiver_main( waldecoder.feed_bytes(data); while let Some((lsn, recdata)) = waldecoder.poll_decode()? 
{ + // Save old checkpoint value to compare with it after decoding WAL record let old_checkpoint_bytes = checkpoint.encode(); let decoded = decode_wal_record(recdata.clone()); restore_local_repo::save_decoded_record( @@ -202,6 +203,7 @@ fn walreceiver_main( last_rec_lsn = lsn; let new_checkpoint_bytes = checkpoint.encode(); + // Check if checkpoint data was updated by save_decoded_record if new_checkpoint_bytes != old_checkpoint_bytes { timeline.put_page_image( ObjectTag::Checkpoint, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e5cdf10ff6..8d965f9a84 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -317,9 +317,9 @@ impl PostgresRedoManagerInternal { ObjectTag::Clog(slru) => slru.blknum, ObjectTag::TwoPhase(_) => { assert!(info == pg_constants::XLOG_XACT_PREPARE); - trace!("Apply prepare {} record", xlogrec.xl_xid); - page.clear(); - page.extend_from_slice(&buf[..]); + trace!("Apply prepare {} record", xlogrec.xl_xid); + page.clear(); + page.extend_from_slice(&buf[..]); continue; } _ => panic!("Not valid XACT object tag {:?}", tag), @@ -371,7 +371,7 @@ impl PostgresRedoManagerInternal { if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { - // Just need to ero page + // Just need to zero page page.copy_from_slice(&ZERO_PAGE); } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 272f3855d7..67e2e03469 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -390,6 +390,13 @@ impl CheckPoint { } } +// +// Generate new WAL segment with single XLOG_CHECKPOINT_SHUTDOWN record. +// We need this segment to start compute node. 
+// In order to minimize changes in Postgres core, we prefer to +// provide a WAL segment from which it can extract the checkpoint record in the standard way, +// rather than implement some alternative mechanism. +// pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes { let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize); diff --git a/vendor/postgres b/vendor/postgres index a08f50cba2..8ab674ad99 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit a08f50cba293fff045cb771f8c6bb23b89f4f7f7 +Subproject commit 8ab674ad9927095eeaf278fc67d6f8ad5e38c9cb