Use enum-typed PG versions

This makes it possible for the compiler to validate that a match
block matched all PostgreSQL versions we support.
This commit is contained in:
Matthias van de Meent
2025-06-16 11:18:31 +02:00
parent 7916aa26e0
commit bdf8ebdf65
56 changed files with 574 additions and 322 deletions

View File

@@ -7,8 +7,8 @@ use bytes::{Buf, Bytes};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{RelTag, SlruKind};
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::pg_constants;
use postgres_ffi::walrecord::*;
use postgres_ffi::{PgMajorVersion, pg_constants};
use postgres_ffi_types::forknum::VISIBILITYMAP_FORKNUM;
use utils::lsn::Lsn;
@@ -24,7 +24,7 @@ impl InterpretedWalRecord {
buf: Bytes,
shards: &[ShardIdentity],
next_record_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<HashMap<ShardIdentity, InterpretedWalRecord>> {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(buf, &mut decoded, pg_version)?;
@@ -78,7 +78,7 @@ impl MetadataRecord {
decoded: &DecodedWALRecord,
shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
next_record_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<()> {
// Note: this doesn't actually copy the bytes since
// the [`Bytes`] type implements it via a level of indirection.
@@ -193,7 +193,7 @@ impl MetadataRecord {
fn decode_heapam_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -205,7 +205,7 @@ impl MetadataRecord {
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
match pg_version {
14 => {
PgMajorVersion::PG14 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -272,7 +272,7 @@ impl MetadataRecord {
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
15 => {
PgMajorVersion::PG15 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -339,7 +339,7 @@ impl MetadataRecord {
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
16 => {
PgMajorVersion::PG16 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -406,7 +406,7 @@ impl MetadataRecord {
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
17 => {
PgMajorVersion::PG17 => {
if decoded.xl_rmid == pg_constants::RM_HEAP_ID {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
@@ -473,7 +473,6 @@ impl MetadataRecord {
anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
}
}
_ => {}
}
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
@@ -500,7 +499,7 @@ impl MetadataRecord {
fn decode_neonmgr_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
// Handle VM bit updates that are implicitly part of heap records.
@@ -514,7 +513,7 @@ impl MetadataRecord {
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match pg_version {
16 | 17 => {
PgMajorVersion::PG16 | PgMajorVersion::PG17 => {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
match info {
@@ -574,7 +573,7 @@ impl MetadataRecord {
info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info),
}
}
_ => anyhow::bail!(
PgMajorVersion::PG15 | PgMajorVersion::PG14 => anyhow::bail!(
"Neon RMGR has no known compatibility with PostgreSQL version {}",
pg_version
),
@@ -629,116 +628,121 @@ impl MetadataRecord {
fn decode_dbase_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
// TODO: Refactor this to avoid the duplication between postgres versions.
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID");
if pg_version == 14 {
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
let createdb = XlCreateDatabase::decode(buf);
tracing::debug!("XLOG_DBASE_CREATE v14");
match pg_version {
PgMajorVersion::PG14 => {
if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE {
let createdb = XlCreateDatabase::decode(buf);
tracing::debug!("XLOG_DBASE_CREATE v14");
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
return Ok(Some(record));
} else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
return Ok(Some(record));
}
}
} else if pg_version == 15 {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
PgMajorVersion::PG15 => {
if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
return Ok(Some(record));
}
}
} else if pg_version == 16 {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
PgMajorVersion::PG16 => {
if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
return Ok(Some(record));
}
}
} else if pg_version == 17 {
if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
PgMajorVersion::PG17 => {
if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG {
tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
let createdb = XlCreateDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate {
db_id: createdb.db_id,
tablespace_id: createdb.tablespace_id,
src_db_id: createdb.src_db_id,
src_tablespace_id: createdb.src_tablespace_id,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
} else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP {
let dropdb = XlDropDatabase::decode(buf);
let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop {
db_id: dropdb.db_id,
tablespace_ids: dropdb.tablespace_ids,
}));
return Ok(Some(record));
return Ok(Some(record));
}
}
}
@@ -748,12 +752,12 @@ impl MetadataRecord {
fn decode_clog_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
let pageno = if pg_version < 17 {
let pageno = if pg_version < PgMajorVersion::PG17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32
@@ -765,7 +769,7 @@ impl MetadataRecord {
ClogZeroPage { segno, rpageno },
))))
} else {
assert!(info == pg_constants::CLOG_TRUNCATE);
assert_eq!(info, pg_constants::CLOG_TRUNCATE);
let xlrec = XlClogTruncate::decode(buf, pg_version);
Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate(
@@ -838,14 +842,14 @@ impl MetadataRecord {
fn decode_multixact_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<Option<MetadataRecord>> {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
|| info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
{
let pageno = if pg_version < 17 {
let pageno = if pg_version < PgMajorVersion::PG17 {
buf.get_u32_le()
} else {
buf.get_u64_le() as u32

View File

@@ -13,7 +13,7 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
use postgres_ffi::{BLCKSZ, PgMajorVersion, page_is_new, page_set_lsn, pg_constants};
use serde::{Deserialize, Serialize};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
@@ -139,7 +139,7 @@ impl SerializedValueBatch {
decoded: DecodedWALRecord,
shard_records: &mut HashMap<ShardIdentity, InterpretedWalRecord>,
next_record_lsn: Lsn,
pg_version: u32,
pg_version: PgMajorVersion,
) -> anyhow::Result<()> {
// First determine how big the buffers need to be and allocate it up-front.
// This duplicates some of the work below, but it's empirically much faster.
@@ -267,7 +267,7 @@ impl SerializedValueBatch {
fn estimate_buffer_size(
decoded: &DecodedWALRecord,
shard: &ShardIdentity,
pg_version: u32,
pg_version: PgMajorVersion,
) -> usize {
let mut estimate: usize = 0;
@@ -303,7 +303,11 @@ impl SerializedValueBatch {
estimate
}
fn block_is_image(decoded: &DecodedWALRecord, blk: &DecodedBkpBlock, pg_version: u32) -> bool {
fn block_is_image(
decoded: &DecodedWALRecord,
blk: &DecodedBkpBlock,
pg_version: PgMajorVersion,
) -> bool {
blk.apply_image
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID