mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 04:50:38 +00:00
pageserver: refactor xlog record
This is an odd one. It requires the current checkpoint value to decide what to do. That can't trivially be moved to the SK. It's possible with protocol change, but deferring decision for now. Hence, send the raw record and let the pageserver figure it out.
This commit is contained in:
@@ -206,6 +206,12 @@ struct RelmapRecord {
|
||||
buf: Bytes,
|
||||
}
|
||||
|
||||
struct XlogRecord {
|
||||
info: u8,
|
||||
lsn: Lsn,
|
||||
buf: Bytes,
|
||||
}
|
||||
|
||||
impl WalIngest {
|
||||
pub async fn new(
|
||||
timeline: &Timeline,
|
||||
@@ -384,96 +390,19 @@ impl WalIngest {
|
||||
self.ingest_relmap_record(relmap_record, modification, ctx)
|
||||
.await?;
|
||||
}
|
||||
// This is an odd duck. It needs to go to all shards.
|
||||
// Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
|
||||
// in WalIngest::new), we have to send the whole DecodedWalRecord::record to
|
||||
// the pageserver and decode it there.
|
||||
//
|
||||
// Alternatively, one can make the checkpoint part of the subscription protocol
|
||||
// to the pageserver. This should work fine, but can be done at a later point.
|
||||
pg_constants::RM_XLOG_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
|
||||
if info == pg_constants::XLOG_PARAMETER_CHANGE {
|
||||
if let CheckPoint::V17(cp) = &mut self.checkpoint {
|
||||
let rec = v17::XlParameterChange::decode(&mut buf);
|
||||
cp.wal_level = rec.wal_level;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_END_OF_RECOVERY {
|
||||
if let CheckPoint::V17(cp) = &mut self.checkpoint {
|
||||
let rec = v17::XlEndOfRecovery::decode(&mut buf);
|
||||
cp.wal_level = rec.wal_level;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
}
|
||||
|
||||
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
|
||||
if info == pg_constants::XLOG_NEXTOID {
|
||||
let next_oid = buf.get_u32_le();
|
||||
if cp.nextOid != next_oid {
|
||||
cp.nextOid = next_oid;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|
||||
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
|
||||
buf.copy_to_slice(&mut checkpoint_bytes);
|
||||
let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
|
||||
xlog_checkpoint.oldestXid,
|
||||
cp.oldestXid
|
||||
);
|
||||
if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
|
||||
cp.oldestXid = xlog_checkpoint.oldestXid;
|
||||
}
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
|
||||
xlog_checkpoint.oldestActiveXid,
|
||||
cp.oldestActiveXid
|
||||
);
|
||||
|
||||
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
|
||||
// because at shutdown, all in-progress transactions will implicitly
|
||||
// end. Postgres startup code knows that, and allows hot standby to start
|
||||
// immediately from a shutdown checkpoint.
|
||||
//
|
||||
// In Neon, Postgres hot standby startup always behaves as if starting from
|
||||
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
|
||||
// instead of overwriting self.checkpoint.oldestActiveXid with
|
||||
// InvalidTransactionid from the checkpoint WAL record, update it to a
|
||||
// proper value, knowing that there are no in-progress transactions at this
|
||||
// point, except for prepared transactions.
|
||||
//
|
||||
// See also the neon code changes in the InitWalRecovery() function.
|
||||
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
|
||||
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let oldest_active_xid = if pg_version >= 17 {
|
||||
let mut oldest_active_full_xid = cp.nextXid.value;
|
||||
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
|
||||
if xid < oldest_active_full_xid {
|
||||
oldest_active_full_xid = xid;
|
||||
}
|
||||
}
|
||||
oldest_active_full_xid as u32
|
||||
} else {
|
||||
let mut oldest_active_xid = cp.nextXid.value as u32;
|
||||
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
|
||||
let narrow_xid = xid as u32;
|
||||
if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
|
||||
oldest_active_xid = narrow_xid;
|
||||
}
|
||||
}
|
||||
oldest_active_xid
|
||||
};
|
||||
cp.oldestActiveXid = oldest_active_xid;
|
||||
} else {
|
||||
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
}
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
// have some trace of the checkpoint records in the layer files at the same
|
||||
// LSNs.
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
});
|
||||
let xlog_record = Self::decode_xlog_record(&mut buf, &decoded, lsn, pg_version)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
self.ingest_xlog_record(xlog_record, modification, ctx)
|
||||
.await?;
|
||||
}
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
@@ -2211,6 +2140,120 @@ impl WalIngest {
|
||||
}))
|
||||
}
|
||||
|
||||
async fn ingest_xlog_record(
|
||||
&mut self,
|
||||
record: XlogRecord,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
let XlogRecord { info, lsn, mut buf } = record;
|
||||
let pg_version = modification.tline.pg_version;
|
||||
|
||||
if info == pg_constants::XLOG_PARAMETER_CHANGE {
|
||||
if let CheckPoint::V17(cp) = &mut self.checkpoint {
|
||||
let rec = v17::XlParameterChange::decode(&mut buf);
|
||||
cp.wal_level = rec.wal_level;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_END_OF_RECOVERY {
|
||||
if let CheckPoint::V17(cp) = &mut self.checkpoint {
|
||||
let rec = v17::XlEndOfRecovery::decode(&mut buf);
|
||||
cp.wal_level = rec.wal_level;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
}
|
||||
|
||||
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
|
||||
if info == pg_constants::XLOG_NEXTOID {
|
||||
let next_oid = buf.get_u32_le();
|
||||
if cp.nextOid != next_oid {
|
||||
cp.nextOid = next_oid;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|
||||
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
|
||||
buf.copy_to_slice(&mut checkpoint_bytes);
|
||||
let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
|
||||
xlog_checkpoint.oldestXid,
|
||||
cp.oldestXid
|
||||
);
|
||||
if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
|
||||
cp.oldestXid = xlog_checkpoint.oldestXid;
|
||||
}
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
|
||||
xlog_checkpoint.oldestActiveXid,
|
||||
cp.oldestActiveXid
|
||||
);
|
||||
|
||||
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
|
||||
// because at shutdown, all in-progress transactions will implicitly
|
||||
// end. Postgres startup code knows that, and allows hot standby to start
|
||||
// immediately from a shutdown checkpoint.
|
||||
//
|
||||
// In Neon, Postgres hot standby startup always behaves as if starting from
|
||||
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
|
||||
// instead of overwriting self.checkpoint.oldestActiveXid with
|
||||
// InvalidTransactionid from the checkpoint WAL record, update it to a
|
||||
// proper value, knowing that there are no in-progress transactions at this
|
||||
// point, except for prepared transactions.
|
||||
//
|
||||
// See also the neon code changes in the InitWalRecovery() function.
|
||||
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
|
||||
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let oldest_active_xid = if pg_version >= 17 {
|
||||
let mut oldest_active_full_xid = cp.nextXid.value;
|
||||
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
|
||||
if xid < oldest_active_full_xid {
|
||||
oldest_active_full_xid = xid;
|
||||
}
|
||||
}
|
||||
oldest_active_full_xid as u32
|
||||
} else {
|
||||
let mut oldest_active_xid = cp.nextXid.value as u32;
|
||||
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
|
||||
let narrow_xid = xid as u32;
|
||||
if (narrow_xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
|
||||
oldest_active_xid = narrow_xid;
|
||||
}
|
||||
}
|
||||
oldest_active_xid
|
||||
};
|
||||
cp.oldestActiveXid = oldest_active_xid;
|
||||
} else {
|
||||
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
}
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
// have some trace of the checkpoint records in the layer files at the same
|
||||
// LSNs.
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn decode_xlog_record(
|
||||
buf: &mut Bytes,
|
||||
decoded: &DecodedWALRecord,
|
||||
lsn: Lsn,
|
||||
_pg_version: u32,
|
||||
) -> anyhow::Result<Option<XlogRecord>> {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
Ok(Some(XlogRecord {
|
||||
info,
|
||||
lsn,
|
||||
buf: buf.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn put_rel_creation(
|
||||
&mut self,
|
||||
modification: &mut DatadirModification<'_>,
|
||||
|
||||
Reference in New Issue
Block a user