diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 22d5e3c399..bfc7b5e9e5 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -206,6 +206,12 @@ struct RelmapRecord { buf: Bytes, } +struct XlogRecord { + info: u8, + lsn: Lsn, + buf: Bytes, +} + impl WalIngest { pub async fn new( timeline: &Timeline, @@ -384,96 +390,19 @@ impl WalIngest { self.ingest_relmap_record(relmap_record, modification, ctx) .await?; } + // This is an odd duck. It needs to go to all shards. + // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY + // in WalIngest::new), we have to send the whole DecodedWalRecord::record to + // the pageserver and decode it there. + // + // Alternatively, one can make the checkpoint part of the subscription protocol + // to the pageserver. This should work fine, but can be done at a later point. pg_constants::RM_XLOG_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_PARAMETER_CHANGE { - if let CheckPoint::V17(cp) = &mut self.checkpoint { - let rec = v17::XlParameterChange::decode(&mut buf); - cp.wal_level = rec.wal_level; - self.checkpoint_modified = true; - } - } else if info == pg_constants::XLOG_END_OF_RECOVERY { - if let CheckPoint::V17(cp) = &mut self.checkpoint { - let rec = v17::XlEndOfRecovery::decode(&mut buf); - cp.wal_level = rec.wal_level; - self.checkpoint_modified = true; - } - } - - enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { - if info == pg_constants::XLOG_NEXTOID { - let next_oid = buf.get_u32_le(); - if cp.nextOid != next_oid { - cp.nextOid = next_oid; - self.checkpoint_modified = true; - } - } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE - || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN - { - let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT]; - buf.copy_to_slice(&mut checkpoint_bytes); - let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?; - trace!( - "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", - xlog_checkpoint.oldestXid, - cp.oldestXid - ); - if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 { - cp.oldestXid = xlog_checkpoint.oldestXid; - } - trace!( - "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}", - xlog_checkpoint.oldestActiveXid, - cp.oldestActiveXid - ); - - // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`, - // because at shutdown, all in-progress transactions will implicitly - // end. Postgres startup code knows that, and allows hot standby to start - // immediately from a shutdown checkpoint. - // - // In Neon, Postgres hot standby startup always behaves as if starting from - // an online checkpoint. It needs a valid `oldestActiveXid` value, so - // instead of overwriting self.checkpoint.oldestActiveXid with - // InvalidTransactionid from the checkpoint WAL record, update it to a - // proper value, knowing that there are no in-progress transactions at this - // point, except for prepared transactions. - // - // See also the neon code changes in the InitWalRecovery() function. - if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID - && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN - { - let oldest_active_xid = if pg_version >= 17 { - let mut oldest_active_full_xid = cp.nextXid.value; - for xid in modification.tline.list_twophase_files(lsn, ctx).await? { - if xid < oldest_active_full_xid { - oldest_active_full_xid = xid; - } - } - oldest_active_full_xid as u32 - } else { - let mut oldest_active_xid = cp.nextXid.value as u32; - for xid in modification.tline.list_twophase_files(lsn, ctx).await? { - let narrow_xid = xid as u32; - if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 { - oldest_active_xid = narrow_xid; - } - } - oldest_active_xid - }; - cp.oldestActiveXid = oldest_active_xid; - } else { - cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid; - } - - // Write a new checkpoint key-value pair on every checkpoint record, even - // if nothing really changed. Not strictly required, but it seems nice to - // have some trace of the checkpoint records in the layer files at the same - // LSNs. - self.checkpoint_modified = true; - } - }); + let xlog_record = Self::decode_xlog_record(&mut buf, &decoded, lsn, pg_version) + .unwrap() + .unwrap(); + self.ingest_xlog_record(xlog_record, modification, ctx) + .await?; } pg_constants::RM_LOGICALMSG_ID => { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; @@ -2211,6 +2140,120 @@ impl WalIngest { })) } + async fn ingest_xlog_record( + &mut self, + record: XlogRecord, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let XlogRecord { info, lsn, mut buf } = record; + let pg_version = modification.tline.pg_version; + + if info == pg_constants::XLOG_PARAMETER_CHANGE { + if let CheckPoint::V17(cp) = &mut self.checkpoint { + let rec = v17::XlParameterChange::decode(&mut buf); + cp.wal_level = rec.wal_level; + self.checkpoint_modified = true; + } + } else if info == pg_constants::XLOG_END_OF_RECOVERY { + if let CheckPoint::V17(cp) = &mut self.checkpoint { + let rec = v17::XlEndOfRecovery::decode(&mut buf); + cp.wal_level = rec.wal_level; + self.checkpoint_modified = true; + } + } + + enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, { + if info == pg_constants::XLOG_NEXTOID { + let next_oid = buf.get_u32_le(); + if cp.nextOid != next_oid { + cp.nextOid = next_oid; + self.checkpoint_modified = true; + } + } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE + || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT]; + buf.copy_to_slice(&mut checkpoint_bytes); + let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?; + trace!( + "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", + xlog_checkpoint.oldestXid, + cp.oldestXid + ); + if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 { + cp.oldestXid = xlog_checkpoint.oldestXid; + } + trace!( + "xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}", + xlog_checkpoint.oldestActiveXid, + cp.oldestActiveXid + ); + + // A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`, + // because at shutdown, all in-progress transactions will implicitly + // end. Postgres startup code knows that, and allows hot standby to start + // immediately from a shutdown checkpoint. + // + // In Neon, Postgres hot standby startup always behaves as if starting from + // an online checkpoint. It needs a valid `oldestActiveXid` value, so + // instead of overwriting self.checkpoint.oldestActiveXid with + // InvalidTransactionid from the checkpoint WAL record, update it to a + // proper value, knowing that there are no in-progress transactions at this + // point, except for prepared transactions. + // + // See also the neon code changes in the InitWalRecovery() function. + if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID + && info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let oldest_active_xid = if pg_version >= 17 { + let mut oldest_active_full_xid = cp.nextXid.value; + for xid in modification.tline.list_twophase_files(lsn, ctx).await? { + if xid < oldest_active_full_xid { + oldest_active_full_xid = xid; + } + } + oldest_active_full_xid as u32 + } else { + let mut oldest_active_xid = cp.nextXid.value as u32; + for xid in modification.tline.list_twophase_files(lsn, ctx).await? { + let narrow_xid = xid as u32; + if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 { + oldest_active_xid = narrow_xid; + } + } + oldest_active_xid + }; + cp.oldestActiveXid = oldest_active_xid; + } else { + cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid; + } + + // Write a new checkpoint key-value pair on every checkpoint record, even + // if nothing really changed. Not strictly required, but it seems nice to + // have some trace of the checkpoint records in the layer files at the same + // LSNs. + self.checkpoint_modified = true; + } + }); + + Ok(()) + } + + fn decode_xlog_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + lsn: Lsn, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + Ok(Some(XlogRecord { + info, + lsn, + buf: buf.clone(), + })) + } + async fn put_rel_creation( &mut self, modification: &mut DatadirModification<'_>,