diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index b3fcaae62f..4505101ea6 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -24,7 +24,7 @@ pub struct Key { /// When working with large numbers of Keys in-memory, it is more efficient to handle them as i128 than as /// a struct of fields. -#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd)] +#[derive(Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] pub struct CompactKey(i128); /// The storage key size. diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 010a9c2932..09d1fae221 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -24,7 +24,7 @@ use postgres_ffi::Oid; // FIXME: should move 'forknum' as last field to keep this consistent with Postgres. // Then we could replace the custom Ord and PartialOrd implementations below with // deriving them. This will require changes in walredoproc.c. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] pub struct RelTag { pub forknum: u8, pub spcnode: Oid, diff --git a/libs/postgres_ffi/src/walrecord.rs b/libs/postgres_ffi/src/walrecord.rs index dedbaef64d..b32106632a 100644 --- a/libs/postgres_ffi/src/walrecord.rs +++ b/libs/postgres_ffi/src/walrecord.rs @@ -16,7 +16,7 @@ use utils::bin_ser::DeserializeError; use utils::lsn::Lsn; #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlMultiXactCreate { pub mid: MultiXactId, /* new MultiXact's ID */ @@ -46,7 +46,7 @@ impl XlMultiXactCreate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlMultiXactTruncate { pub oldest_multi_db: Oid, /* to-be-truncated range of multixact offsets */ @@ -72,7 +72,7 @@ impl XlMultiXactTruncate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlRelmapUpdate { pub dbid: Oid, /* database ID, or 0 for shared map */ pub tsid: Oid, /* database's tablespace, or pg_global */ @@ -90,7 +90,7 @@ impl XlRelmapUpdate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlReploriginDrop { pub node_id: RepOriginId, } @@ -104,7 +104,7 @@ impl XlReploriginDrop { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlReploriginSet { pub remote_lsn: Lsn, pub node_id: RepOriginId, @@ -120,7 +120,7 @@ impl XlReploriginSet { } #[repr(C)] -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct RelFileNode { pub spcnode: Oid, /* tablespace */ pub dbnode: Oid, /* database */ @@ -911,7 +911,7 @@ impl XlSmgrCreate { } #[repr(C)] -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlSmgrTruncate { pub blkno: BlockNumber, pub rnode: RelFileNode, @@ -984,7 +984,7 @@ impl XlDropDatabase { /// xl_xact_parsed_abort structs in PostgreSQL, but we use the same /// struct for commits and aborts. /// -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct XlXactParsedRecord { pub xid: TransactionId, pub info: u8, diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 5d90eeb69c..88371fe51e 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -32,16 +32,19 @@ use postgres_ffi::walrecord::{ XlSmgrTruncate, XlXactParsedRecord, }; use postgres_ffi::{Oid, TransactionId}; +use serde::{Deserialize, Serialize}; use utils::lsn::Lsn; use crate::serialized_batch::SerializedValueBatch; +#[derive(Serialize, Deserialize)] pub enum FlushUncommittedRecords { Yes, No, } /// An interpreted Postgres WAL record, ready to be handled by the pageserver +#[derive(Serialize, Deserialize)] pub struct InterpretedWalRecord { /// Optional metadata record - may cause writes to metadata keys /// in the storage engine @@ -62,6 +65,7 @@ pub struct InterpretedWalRecord { /// The interpreted part of the Postgres WAL record which requires metadata /// writes to the underlying storage engine. +#[derive(Serialize, Deserialize)] pub enum MetadataRecord { Heapam(HeapamRecord), Neonrmgr(NeonrmgrRecord), @@ -77,10 +81,12 @@ pub enum MetadataRecord { Replorigin(ReploriginRecord), } +#[derive(Serialize, Deserialize)] pub enum HeapamRecord { ClearVmBits(ClearVmBits), } +#[derive(Serialize, Deserialize)] pub struct ClearVmBits { pub new_heap_blkno: Option, pub old_heap_blkno: Option, @@ -88,24 +94,29 @@ pub struct ClearVmBits { pub flags: u8, } +#[derive(Serialize, Deserialize)] pub enum NeonrmgrRecord { ClearVmBits(ClearVmBits), } +#[derive(Serialize, Deserialize)] pub enum SmgrRecord { Create(SmgrCreate), Truncate(XlSmgrTruncate), } +#[derive(Serialize, Deserialize)] pub struct SmgrCreate { pub rel: RelTag, } +#[derive(Serialize, Deserialize)] pub enum DbaseRecord { Create(DbaseCreate), Drop(DbaseDrop), } +#[derive(Serialize, Deserialize)] pub struct DbaseCreate { pub db_id: Oid, pub tablespace_id: Oid, @@ -113,27 +124,32 @@ pub struct DbaseCreate { pub src_tablespace_id: Oid, } +#[derive(Serialize, Deserialize)] pub struct DbaseDrop { pub db_id: Oid, pub tablespace_ids: Vec, } +#[derive(Serialize, Deserialize)] pub enum ClogRecord { ZeroPage(ClogZeroPage), Truncate(ClogTruncate), } +#[derive(Serialize, Deserialize)] pub struct ClogZeroPage { pub segno: u32, pub rpageno: u32, } +#[derive(Serialize, Deserialize)] pub struct ClogTruncate { pub pageno: u32, pub oldest_xid: TransactionId, pub oldest_xid_db: Oid, } +#[derive(Serialize, Deserialize)] pub enum XactRecord { Commit(XactCommon), Abort(XactCommon), @@ -142,6 +158,7 @@ pub enum XactRecord { Prepare(XactPrepare), } +#[derive(Serialize, Deserialize)] pub struct XactCommon { pub parsed: XlXactParsedRecord, pub origin_id: u16, @@ -150,61 +167,73 @@ pub struct XactCommon { pub lsn: Lsn, } +#[derive(Serialize, Deserialize)] pub struct XactPrepare { pub xl_xid: TransactionId, pub data: Bytes, } +#[derive(Serialize, Deserialize)] pub enum MultiXactRecord { ZeroPage(MultiXactZeroPage), Create(XlMultiXactCreate), Truncate(XlMultiXactTruncate), } +#[derive(Serialize, Deserialize)] pub struct MultiXactZeroPage { pub slru_kind: SlruKind, pub segno: u32, pub rpageno: u32, } +#[derive(Serialize, Deserialize)] pub enum RelmapRecord { Update(RelmapUpdate), } +#[derive(Serialize, Deserialize)] pub struct RelmapUpdate { pub update: XlRelmapUpdate, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum XlogRecord { Raw(RawXlogRecord), } +#[derive(Serialize, Deserialize)] pub struct RawXlogRecord { pub info: u8, pub lsn: Lsn, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum LogicalMessageRecord { Put(PutLogicalMessage), #[cfg(feature = "testing")] Failpoint, } +#[derive(Serialize, Deserialize)] pub struct PutLogicalMessage { pub path: String, pub buf: Bytes, } +#[derive(Serialize, Deserialize)] pub enum StandbyRecord { RunningXacts(StandbyRunningXacts), } +#[derive(Serialize, Deserialize)] pub struct StandbyRunningXacts { pub oldest_running_xid: TransactionId, } +#[derive(Serialize, Deserialize)] pub enum ReploriginRecord { Set(XlReploriginSet), Drop(XlReploriginDrop), diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index 8f33291023..632603cc8b 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -16,6 +16,7 @@ use pageserver_api::shard::ShardIdentity; use pageserver_api::{key::CompactKey, value::Value}; use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord}; use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ}; +use serde::{Deserialize, Serialize}; use utils::bin_ser::BeSer; use utils::lsn::Lsn; @@ -29,6 +30,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); /// relation sizes. In the case of "observed" values, we only need to know /// the key and LSN, so two types of metadata are supported to save on network /// bandwidth. +#[derive(Serialize, Deserialize)] pub enum ValueMeta { Serialized(SerializedValueMeta), Observed(ObservedValueMeta), @@ -75,6 +77,7 @@ impl PartialEq for OrderedValueMeta { impl Eq for OrderedValueMeta {} /// Metadata for a [`Value`] serialized into the batch. +#[derive(Serialize, Deserialize)] pub struct SerializedValueMeta { pub key: CompactKey, pub lsn: Lsn, @@ -86,12 +89,14 @@ pub struct SerializedValueMeta { } /// Metadata for a [`Value`] observed by the batch +#[derive(Serialize, Deserialize)] pub struct ObservedValueMeta { pub key: CompactKey, pub lsn: Lsn, } /// Batch of serialized [`Value`]s. +#[derive(Serialize, Deserialize)] pub struct SerializedValueBatch { /// [`Value`]s serialized in EphemeralFile's native format, /// ready for disk write by the pageserver