postgres_ffi: make WalGenerator generic over record generator (#9614)

## Problem

Benchmarks need more control over the WAL generated by `WalGenerator`.
In particular, they need to vary the size of logical messages.

## Summary of changes

* Make `WalGenerator` generic over `RecordGenerator`, which constructs
WAL records.
* Add `LogicalMessageGenerator` which emits logical messages, with a
configurable payload.
* Minor tweaks and code reorganization.

There are no changes to the core logic or emitted WAL.
This commit is contained in:
Erik Grinaker
2024-11-07 11:38:39 +01:00
committed by GitHub
parent e1d0b73824
commit d6aa26a533
3 changed files with 156 additions and 102 deletions

View File

@@ -1,10 +1,10 @@
use std::ffi::CStr;
use std::ffi::{CStr, CString};
use bytes::{Bytes, BytesMut};
use crc32c::crc32c_append;
use utils::lsn::Lsn;
use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
use super::bindings::{RmgrId, XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC};
use super::xlog_utils::{
XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE,
XLP_FIRST_IS_CONTRECORD,
@@ -16,11 +16,65 @@ use crate::pg_constants::{
};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
/// Generates binary WAL records for use in tests and benchmarks. Currently only generates logical
/// messages (effectively noops) with a fixed payload. It is used as an iterator which yields
/// encoded bytes for a single WAL record, including internal page headers if it spans pages.
/// Concatenating the bytes will yield a complete, well-formed WAL, which can be chunked at segment
/// boundaries if desired. Not optimized for performance.
/// A WAL record payload. Will be prefixed by an XLogRecord header when encoded.
pub struct Record {
pub rmid: RmgrId,
pub info: u8,
pub data: Bytes,
}
impl Record {
/// Encodes the WAL record including an XLogRecord header. prev_lsn is the start position of
/// the previous record in the WAL -- this is ignored by the Safekeeper, but not Postgres.
pub fn encode(&self, prev_lsn: Lsn) -> Bytes {
// Prefix data with block ID and length.
let data_header = Bytes::from(match self.data.len() {
0 => vec![],
1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, self.data.len() as u8],
256.. => {
let len_bytes = (self.data.len() as u32).to_le_bytes();
[&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
}
});
// Construct the WAL record header.
let mut header = XLogRecord {
xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + self.data.len()) as u32,
xl_xid: 0,
xl_prev: prev_lsn.into(),
xl_info: self.info,
xl_rmid: self.rmid,
__bindgen_padding_0: [0; 2],
xl_crc: 0, // see below
};
// Compute the CRC checksum for the data, and the header up to the CRC field.
let mut crc = 0;
crc = crc32c_append(crc, &data_header);
crc = crc32c_append(crc, &self.data);
crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
header.xl_crc = crc;
// Encode the final header and record.
let header = header.encode().unwrap();
[header, data_header, self.data.clone()].concat().into()
}
}
/// Generates WAL record payloads.
///
/// TODO: currently only provides LogicalMessageGenerator for trivial noop messages. Add a generator
/// that creates a table and inserts rows.
pub trait RecordGenerator: Iterator<Item = Record> {}
impl<I: Iterator<Item = Record>> RecordGenerator for I {}
/// Generates binary WAL for use in tests and benchmarks. The provided record generator constructs
/// the WAL records. It is used as an iterator which yields encoded bytes for a single WAL record,
/// including internal page headers if it spans pages. Concatenating the bytes will yield a
/// complete, well-formed WAL, which can be chunked at segment boundaries if desired. Not optimized
/// for performance.
///
/// The WAL format is version-dependant (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this
/// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`).
@@ -31,10 +85,10 @@ use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
/// | Segment 1 | Segment 2 | Segment 3 |
/// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 |
/// | R1 | R2 |R3| R4 | R5 | R6 | R7 | R8 |
///
/// TODO: support generating actual tables and rows.
#[derive(Default)]
pub struct WalGenerator {
pub struct WalGenerator<R: RecordGenerator> {
/// Generates record payloads for the WAL.
pub record_generator: R,
/// Current LSN to append the next record at.
///
/// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should
@@ -46,73 +100,35 @@ pub struct WalGenerator {
pub prev_lsn: Lsn,
}
impl WalGenerator {
// For now, hardcode the message payload.
// TODO: support specifying the payload size.
const PREFIX: &CStr = c"prefix";
const MESSAGE: &[u8] = b"message";
// Hardcode the sys, timeline, and DB IDs. We can make them configurable if we care about them.
impl<R: RecordGenerator> WalGenerator<R> {
// Hardcode the sys and timeline ID. We can make them configurable if we care about them.
const SYS_ID: u64 = 0;
const TIMELINE_ID: u32 = 1;
const DB_ID: u32 = 0;
/// Creates a new WAL generator, which emits logical message records (noops).
pub fn new() -> Self {
Self::default()
/// Creates a new WAL generator with the given record generator.
pub fn new(record_generator: R) -> WalGenerator<R> {
Self {
record_generator,
lsn: Lsn(0),
prev_lsn: Lsn(0),
}
}
/// Encodes a logical message (basically a noop), with the given prefix and message.
pub(crate) fn encode_logical_message(prefix: &CStr, message: &[u8]) -> Bytes {
let prefix = prefix.to_bytes_with_nul();
let header = XlLogicalMessage {
db_id: Self::DB_ID,
transactional: 0,
prefix_size: prefix.len() as u64,
message_size: message.len() as u64,
};
[&header.encode(), prefix, message].concat().into()
/// Appends a record with an arbitrary payload at the current LSN, then increments the LSN.
/// Returns the WAL bytes for the record, including page headers and padding, and the start LSN.
fn append_record(&mut self, record: Record) -> (Lsn, Bytes) {
let record = record.encode(self.prev_lsn);
let record = Self::insert_pages(record, self.lsn);
let record = Self::pad_record(record, self.lsn);
let lsn = self.lsn;
self.prev_lsn = self.lsn;
self.lsn += record.len() as u64;
(lsn, record)
}
/// Encode a WAL record with the given payload data (e.g. a logical message).
pub(crate) fn encode_record(data: Bytes, rmid: u8, info: u8, prev_lsn: Lsn) -> Bytes {
// Prefix data with block ID and length.
let data_header = Bytes::from(match data.len() {
0 => vec![],
1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, data.len() as u8],
256.. => {
let len_bytes = (data.len() as u32).to_le_bytes();
[&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat()
}
});
// Construct the WAL record header.
let mut header = XLogRecord {
xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + data.len()) as u32,
xl_xid: 0,
xl_prev: prev_lsn.into(),
xl_info: info,
xl_rmid: rmid,
__bindgen_padding_0: [0; 2],
xl_crc: 0, // see below
};
// Compute the CRC checksum for the data, and the header up to the CRC field.
let mut crc = 0;
crc = crc32c_append(crc, &data_header);
crc = crc32c_append(crc, &data);
crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]);
header.xl_crc = crc;
// Encode the final header and record.
let header = header.encode().unwrap();
[header, data_header, data].concat().into()
}
/// Injects page headers on 8KB page boundaries. Takes the current LSN position where the record
/// Inserts page headers on 8KB page boundaries. Takes the current LSN position where the record
/// is to be appended.
fn encode_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
fn insert_pages(record: Bytes, mut lsn: Lsn) -> Bytes {
// Fast path: record fits in current page, and the page already has a header.
if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 {
return record;
@@ -173,31 +189,71 @@ impl WalGenerator {
}
[record, Bytes::from(vec![0; padding])].concat().into()
}
/// Generates a record with an arbitrary payload at the current LSN, then increments the LSN.
pub fn generate_record(&mut self, data: Bytes, rmid: u8, info: u8) -> Bytes {
let record = Self::encode_record(data, rmid, info, self.prev_lsn);
let record = Self::encode_pages(record, self.lsn);
let record = Self::pad_record(record, self.lsn);
self.prev_lsn = self.lsn;
self.lsn += record.len() as u64;
record
}
/// Generates a logical message at the current LSN. Can be used to construct arbitrary messages.
pub fn generate_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> Bytes {
let data = Self::encode_logical_message(prefix, message);
self.generate_record(data, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE)
}
}
/// Generate WAL records as an iterator.
impl Iterator for WalGenerator {
/// Generates WAL records as an iterator.
impl<R: RecordGenerator> Iterator for WalGenerator<R> {
type Item = (Lsn, Bytes);
fn next(&mut self) -> Option<Self::Item> {
let lsn = self.lsn;
let record = self.generate_logical_message(Self::PREFIX, Self::MESSAGE);
Some((lsn, record))
let record = self.record_generator.next()?;
Some(self.append_record(record))
}
}
/// Generates logical message records (effectively noops) with a fixed message.
pub struct LogicalMessageGenerator {
prefix: CString,
message: Vec<u8>,
}
impl LogicalMessageGenerator {
const DB_ID: u32 = 0; // hardcoded for now
const RM_ID: RmgrId = RM_LOGICALMSG_ID;
const INFO: u8 = XLOG_LOGICAL_MESSAGE;
/// Creates a new LogicalMessageGenerator.
pub fn new(prefix: &CStr, message: &[u8]) -> Self {
Self {
prefix: prefix.to_owned(),
message: message.to_owned(),
}
}
/// Encodes a logical message.
fn encode(prefix: &CStr, message: &[u8]) -> Bytes {
let prefix = prefix.to_bytes_with_nul();
let header = XlLogicalMessage {
db_id: Self::DB_ID,
transactional: 0,
prefix_size: prefix.len() as u64,
message_size: message.len() as u64,
};
[&header.encode(), prefix, message].concat().into()
}
}
impl Iterator for LogicalMessageGenerator {
type Item = Record;
fn next(&mut self) -> Option<Self::Item> {
Some(Record {
rmid: Self::RM_ID,
info: Self::INFO,
data: Self::encode(&self.prefix, &self.message),
})
}
}
impl WalGenerator<LogicalMessageGenerator> {
/// Convenience method for appending a WAL record with an arbitrary logical message at the
/// current WAL LSN position. Returns the start LSN and resulting WAL bytes.
pub fn append_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> (Lsn, Bytes) {
let record = Record {
rmid: LogicalMessageGenerator::RM_ID,
info: LogicalMessageGenerator::INFO,
data: LogicalMessageGenerator::encode(prefix, message),
};
self.append_record(record)
}
}

View File

@@ -12,9 +12,9 @@ use super::bindings::{
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
};
use super::wal_generator::WalGenerator;
use super::wal_generator::LogicalMessageGenerator;
use super::PG_MAJORVERSION;
use crate::pg_constants::{self, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE};
use crate::pg_constants;
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
@@ -493,12 +493,10 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Bytes {
// This function can take untrusted input, so discard any NUL bytes in the prefix string.
let prefix = CString::new(prefix.replace('\0', "")).expect("no NULs");
let message = message.as_bytes();
WalGenerator::encode_record(
WalGenerator::encode_logical_message(&prefix, message),
RM_LOGICALMSG_ID,
XLOG_LOGICAL_MESSAGE,
Lsn(0),
)
LogicalMessageGenerator::new(&prefix, message)
.next()
.unwrap()
.encode(Lsn(0))
}
#[cfg(test)]

View File

@@ -1,7 +1,7 @@
use std::{ffi::CStr, sync::Arc};
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::v16::wal_generator::WalGenerator;
use postgres_ffi::v16::wal_generator::{LogicalMessageGenerator, WalGenerator};
use utils::lsn::Lsn;
use super::block_storage::BlockStorage;
@@ -18,7 +18,7 @@ impl DiskWalProposer {
internal_available_lsn: Lsn(0),
prev_lsn: Lsn(0),
disk: BlockStorage::new(),
wal_generator: WalGenerator::new(),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
}),
})
}
@@ -36,7 +36,7 @@ pub struct State {
// actual WAL storage
disk: BlockStorage,
// WAL record generator
wal_generator: WalGenerator,
wal_generator: WalGenerator<LogicalMessageGenerator>,
}
impl State {
@@ -64,7 +64,7 @@ impl State {
/// Inserts a logical record in the WAL at the current LSN.
pub fn insert_logical_message(&mut self, prefix: &CStr, msg: &[u8]) {
let record = self.wal_generator.generate_logical_message(prefix, msg);
let (_, record) = self.wal_generator.append_logical_message(prefix, msg);
self.disk.write(self.internal_available_lsn.into(), &record);
self.prev_lsn = self.internal_available_lsn;
self.internal_available_lsn += record.len() as u64;