diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 9ffaaba584..20b9ea18b1 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -562,6 +562,9 @@ pub enum BeMessage<'a> { options: &'a [&'a str], }, KeepAlive(WalSndKeepAlive), + /// Batch of interpreted, shard filtered WAL records, + /// ready for the pageserver to ingest + InterpretedWalRecords(InterpretedWalRecordsBody<'a>), } /// Common shorthands. @@ -672,6 +675,18 @@ pub struct WalSndKeepAlive { pub request_reply: bool, } +#[derive(Debug)] +pub struct InterpretedWalRecordsBody<'a> { + /// End of raw WAL in [`Self::data`] + pub streaming_lsn: u64, + /// Current end of WAL on the server + pub safekeeper_wal_end_lsn: u64, + /// Start LSN of the next record in PG WAL. + /// Is 0 if the portion of PG WAL did not contain any records. + pub next_record_lsn: u64, + pub data: &'a [u8], +} + pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]); // single text column @@ -996,6 +1011,18 @@ impl BeMessage<'_> { Ok(()) })? } + + BeMessage::InterpretedWalRecords(rec) => { + buf.put_u8(b'd'); // arbitrary byte + write_body(buf, |buf| { + buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol + // dependency + buf.put_u64(rec.streaming_lsn); + buf.put_u64(rec.safekeeper_wal_end_lsn); + buf.put_u64(rec.next_record_lsn); + buf.put_slice(rec.data); + }); + } } Ok(()) }