libs/pq_proto: add interpreted wal records type

This commit is contained in:
Vlad Lazar
2024-11-13 12:05:28 +01:00
parent 057138f065
commit 1f043f480a

View File

@@ -562,6 +562,9 @@ pub enum BeMessage<'a> {
options: &'a [&'a str],
},
KeepAlive(WalSndKeepAlive),
/// Batch of interpreted, shard filtered WAL records,
/// ready for the pageserver to ingest
InterpretedWalRecords(InterpretedWalRecordsBody<'a>),
}
/// Common shorthands.
@@ -672,6 +675,18 @@ pub struct WalSndKeepAlive {
pub request_reply: bool,
}
#[derive(Debug)]
pub struct InterpretedWalRecordsBody<'a> {
/// End of raw WAL in [`Self::data`]
pub streaming_lsn: u64,
/// Current end of WAL on the server
pub safekeeper_wal_end_lsn: u64,
/// Start LSN of the next record in PG WAL.
/// Is 0 if the portion of PG WAL did not contain any records.
pub next_record_lsn: u64,
pub data: &'a [u8],
}
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
// single text column
@@ -996,6 +1011,18 @@ impl BeMessage<'_> {
Ok(())
})?
}
BeMessage::InterpretedWalRecords(rec) => {
buf.put_u8(b'd'); // arbitrary byte
write_body(buf, |buf| {
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
// dependency
buf.put_u64(rec.streaming_lsn);
buf.put_u64(rec.safekeeper_wal_end_lsn);
buf.put_u64(rec.next_record_lsn);
buf.put_slice(rec.data);
});
}
}
Ok(())
}