From 96e35e11a6e092429015d78120f8d12dcc542077 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 30 Oct 2024 12:46:39 +0100 Subject: [PATCH] postgres_ffi: add WAL generator for tests/benchmarks (#9503) ## Problem We don't have a convenient way to generate WAL records for benchmarks and tests. ## Summary of changes Adds a WAL generator, exposed as an iterator. It currently only generates logical messages (noops), but will be extended to write actual table rows later. Some existing code for WAL generation has been replaced with this generator, to reduce duplication. --- libs/postgres_ffi/src/lib.rs | 1 + libs/postgres_ffi/src/wal_generator.rs | 203 +++++++++++++ libs/postgres_ffi/src/xlog_utils.rs | 76 +---- libs/utils/src/lsn.rs | 2 +- safekeeper/src/json_ctrl.rs | 3 +- .../tests/walproposer_sim/simulation.rs | 3 +- .../tests/walproposer_sim/walproposer_disk.rs | 270 +----------------- 7 files changed, 235 insertions(+), 323 deletions(-) create mode 100644 libs/postgres_ffi/src/wal_generator.rs diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 6b219488ac..0239b56d9c 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -36,6 +36,7 @@ macro_rules! postgres_ffi { pub mod controlfile_utils; pub mod nonrelfile_utils; pub mod wal_craft_test_export; + pub mod wal_generator; pub mod waldecoder_handler; pub mod xlog_utils; diff --git a/libs/postgres_ffi/src/wal_generator.rs b/libs/postgres_ffi/src/wal_generator.rs new file mode 100644 index 0000000000..97968c269b --- /dev/null +++ b/libs/postgres_ffi/src/wal_generator.rs @@ -0,0 +1,203 @@ +use std::ffi::CStr; + +use bytes::{Bytes, BytesMut}; +use crc32c::crc32c_append; +use utils::lsn::Lsn; + +use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC}; +use super::xlog_utils::{ + XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE, + XLP_FIRST_IS_CONTRECORD, +}; +use super::XLogRecord; +use crate::pg_constants::{ + RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG, + XLR_BLOCK_ID_DATA_SHORT, +}; +use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; + +/// Generates binary WAL records for use in tests and benchmarks. Currently only generates logical +/// messages (effectively noops) with a fixed payload. It is used as an iterator which yields +/// encoded bytes for a single WAL record, including internal page headers if it spans pages. +/// Concatenating the bytes will yield a complete, well-formed WAL, which can be chunked at segment +/// boundaries if desired. Not optimized for performance. +/// +/// The WAL format is version-dependant (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this +/// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`). +/// +/// A WAL is split into 16 MB segments. Each segment is split into 8 KB pages, with headers. +/// Records are arbitrary length, 8-byte aligned, and may span pages. The layout is e.g.: +/// +/// | Segment 1 | Segment 2 | Segment 3 | +/// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 | +/// | R1 | R2 |R3| R4 | R5 | R6 | R7 | R8 | +/// +/// TODO: support generating actual tables and rows. +#[derive(Default)] +pub struct WalGenerator { + /// Current LSN to append the next record at. + /// + /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should + /// ensure that the LSN is on a valid record boundary (i.e. we can't start appending in the + /// middle on an existing record or header, or beyond the end of the existing WAL). + pub lsn: Lsn, + /// The starting LSN of the previous record. Used in WAL record headers. The Safekeeper doesn't + /// care about this, unlike Postgres, but we include it for completeness. + pub prev_lsn: Lsn, +} + +impl WalGenerator { + // For now, hardcode the message payload. + // TODO: support specifying the payload size. + const PREFIX: &CStr = c"prefix"; + const MESSAGE: &[u8] = b"message"; + + // Hardcode the sys, timeline, and DB IDs. We can make them configurable if we care about them. + const SYS_ID: u64 = 0; + const TIMELINE_ID: u32 = 1; + const DB_ID: u32 = 0; + + /// Creates a new WAL generator, which emits logical message records (noops). + pub fn new() -> Self { + Self::default() + } + + /// Encodes a logical message (basically a noop), with the given prefix and message. + pub(crate) fn encode_logical_message(prefix: &CStr, message: &[u8]) -> Bytes { + let prefix = prefix.to_bytes_with_nul(); + let header = XlLogicalMessage { + db_id: Self::DB_ID, + transactional: 0, + prefix_size: prefix.len() as u64, + message_size: message.len() as u64, + }; + [&header.encode(), prefix, message].concat().into() + } + + /// Encode a WAL record with the given payload data (e.g. a logical message). + pub(crate) fn encode_record(data: Bytes, rmid: u8, info: u8, prev_lsn: Lsn) -> Bytes { + // Prefix data with block ID and length. + let data_header = Bytes::from(match data.len() { + 0 => vec![], + 1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, data.len() as u8], + 256.. => { + let len_bytes = (data.len() as u32).to_le_bytes(); + [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat() + } + }); + + // Construct the WAL record header. + let mut header = XLogRecord { + xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + data.len()) as u32, + xl_xid: 0, + xl_prev: prev_lsn.into(), + xl_info: info, + xl_rmid: rmid, + __bindgen_padding_0: [0; 2], + xl_crc: 0, // see below + }; + + // Compute the CRC checksum for the data, and the header up to the CRC field. + let mut crc = 0; + crc = crc32c_append(crc, &data_header); + crc = crc32c_append(crc, &data); + crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]); + header.xl_crc = crc; + + // Encode the final header and record. + let header = header.encode().unwrap(); + + [header, data_header, data].concat().into() + } + + /// Injects page headers on 8KB page boundaries. Takes the current LSN position where the record + /// is to be appended. + fn encode_pages(record: Bytes, mut lsn: Lsn) -> Bytes { + // Fast path: record fits in current page, and the page already has a header. + if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 { + return record; + } + + let mut pages = BytesMut::new(); + let mut remaining = record.clone(); // Bytes::clone() is cheap + while !remaining.is_empty() { + // At new page boundary, inject page header. + if lsn.block_offset() == 0 { + let mut page_header = XLogPageHeaderData { + xlp_magic: XLOG_PAGE_MAGIC as u16, + xlp_info: XLP_BKP_REMOVABLE, + xlp_tli: Self::TIMELINE_ID, + xlp_pageaddr: lsn.0, + xlp_rem_len: 0, + __bindgen_padding_0: [0; 4], + }; + // If the record was split across page boundaries, mark as continuation. + if remaining.len() < record.len() { + page_header.xlp_rem_len = remaining.len() as u32; + page_header.xlp_info |= XLP_FIRST_IS_CONTRECORD; + } + // At start of segment, use a long page header. + let page_header = if lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 { + page_header.xlp_info |= XLP_LONG_HEADER; + XLogLongPageHeaderData { + std: page_header, + xlp_sysid: Self::SYS_ID, + xlp_seg_size: WAL_SEGMENT_SIZE as u32, + xlp_xlog_blcksz: XLOG_BLCKSZ as u32, + } + .encode() + .unwrap() + } else { + page_header.encode().unwrap() + }; + pages.extend_from_slice(&page_header); + lsn += page_header.len() as u64; + } + + // Append the record up to the next page boundary, if any. + let page_free = lsn.remaining_in_block() as usize; + let chunk = remaining.split_to(std::cmp::min(page_free, remaining.len())); + pages.extend_from_slice(&chunk); + lsn += chunk.len() as u64; + } + pages.freeze() + } + + /// Records must be 8-byte aligned. Take an encoded record (including any injected page + /// boundaries), starting at the given LSN, and add any necessary padding at the end. + fn pad_record(record: Bytes, mut lsn: Lsn) -> Bytes { + lsn += record.len() as u64; + let padding = lsn.calc_padding(8u64) as usize; + if padding == 0 { + return record; + } + [record, Bytes::from(vec![0; padding])].concat().into() + } + + /// Generates a record with an arbitrary payload at the current LSN, then increments the LSN. + pub fn generate_record(&mut self, data: Bytes, rmid: u8, info: u8) -> Bytes { + let record = Self::encode_record(data, rmid, info, self.prev_lsn); + let record = Self::encode_pages(record, self.lsn); + let record = Self::pad_record(record, self.lsn); + self.prev_lsn = self.lsn; + self.lsn += record.len() as u64; + record + } + + /// Generates a logical message at the current LSN. Can be used to construct arbitrary messages. + pub fn generate_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> Bytes { + let data = Self::encode_logical_message(prefix, message); + self.generate_record(data, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE) + } +} + +/// Generate WAL records as an iterator. +impl Iterator for WalGenerator { + type Item = (Lsn, Bytes); + + fn next(&mut self) -> Option { + let lsn = self.lsn; + let record = self.generate_logical_message(Self::PREFIX, Self::MESSAGE); + Some((lsn, record)) + } +} diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index a636bd2a97..78a965174f 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -7,15 +7,14 @@ // have been named the same as the corresponding PostgreSQL functions instead. // -use crc32c::crc32c_append; - use super::super::waldecoder::WalStreamDecoder; use super::bindings::{ CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz, XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC, }; +use super::wal_generator::WalGenerator; use super::PG_MAJORVERSION; -use crate::pg_constants; +use crate::pg_constants::{self, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE}; use crate::PG_TLI; use crate::{uint32, uint64, Oid}; use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; @@ -26,7 +25,7 @@ use bytes::{Buf, Bytes}; use log::*; use serde::Serialize; -use std::ffi::OsStr; +use std::ffi::{CString, OsStr}; use std::fs::File; use std::io::prelude::*; use std::io::ErrorKind; @@ -39,6 +38,7 @@ use utils::bin_ser::SerializeError; use utils::lsn::Lsn; pub const XLOG_FNAME_LEN: usize = 24; +pub const XLP_BKP_REMOVABLE: u16 = 0x0004; pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001; pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8; pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2; @@ -489,64 +489,16 @@ impl XlLogicalMessage { /// Create new WAL record for non-transactional logical message. /// Used for creating artificial WAL for tests, as LogicalMessage /// record is basically no-op. -/// -/// NOTE: This leaves the xl_prev field zero. The safekeeper and -/// pageserver tolerate that, but PostgreSQL does not. -pub fn encode_logical_message(prefix: &str, message: &str) -> Vec { - let mut prefix_bytes: Vec = Vec::with_capacity(prefix.len() + 1); - prefix_bytes.write_all(prefix.as_bytes()).unwrap(); - prefix_bytes.push(0); - - let message_bytes = message.as_bytes(); - - let logical_message = XlLogicalMessage { - db_id: 0, - transactional: 0, - prefix_size: prefix_bytes.len() as u64, - message_size: message_bytes.len() as u64, - }; - - let mainrdata = logical_message.encode(); - let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); - // only short mainrdata is supported for now - assert!(mainrdata_len <= 255); - let mainrdata_len = mainrdata_len as u8; - - let mut data: Vec = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; - data.extend_from_slice(&mainrdata); - data.extend_from_slice(&prefix_bytes); - data.extend_from_slice(message_bytes); - - let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len(); - - let mut header = XLogRecord { - xl_tot_len: total_len as u32, - xl_xid: 0, - xl_prev: 0, - xl_info: 0, - xl_rmid: 21, - __bindgen_padding_0: [0u8; 2usize], - xl_crc: 0, // crc will be calculated later - }; - - let header_bytes = header.encode().expect("failed to encode header"); - let crc = crc32c_append(0, &data); - let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]); - header.xl_crc = crc; - - let mut wal: Vec = Vec::new(); - wal.extend_from_slice(&header.encode().expect("failed to encode header")); - wal.extend_from_slice(&data); - - // WAL start position must be aligned at 8 bytes, - // this will add padding for the next WAL record. - const PADDING: usize = 8; - let padding_rem = wal.len() % PADDING; - if padding_rem != 0 { - wal.resize(wal.len() + PADDING - padding_rem, 0); - } - - wal +pub fn encode_logical_message(prefix: &str, message: &str) -> Bytes { + // This function can take untrusted input, so discard any NUL bytes in the prefix string. + let prefix = CString::new(prefix.replace('\0', "")).expect("no NULs"); + let message = message.as_bytes(); + WalGenerator::encode_record( + WalGenerator::encode_logical_message(&prefix, message), + RM_LOGICALMSG_ID, + XLOG_LOGICAL_MESSAGE, + Lsn(0), + ) } #[cfg(test)] diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 3ec2c130bd..524f3604a1 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -12,7 +12,7 @@ use crate::seqwait::MonotonicCounter; pub const XLOG_BLCKSZ: u32 = 8192; /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr -#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash)] +#[derive(Clone, Copy, Default, Eq, Ord, PartialEq, PartialOrd, Hash)] pub struct Lsn(pub u64); impl Serialize for Lsn { diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 7fe924a08e..0573ea81e7 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -7,7 +7,6 @@ //! use anyhow::Context; -use bytes::Bytes; use postgres_backend::QueryError; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -176,7 +175,7 @@ pub async fn append_logical_message( truncate_lsn: msg.truncate_lsn, proposer_uuid: [0u8; 16], }, - wal_data: Bytes::from(wal_data), + wal_data, }); let response = tli.process_msg(&append_request).await?; diff --git a/safekeeper/tests/walproposer_sim/simulation.rs b/safekeeper/tests/walproposer_sim/simulation.rs index 0d7aaf517b..fabf450eef 100644 --- a/safekeeper/tests/walproposer_sim/simulation.rs +++ b/safekeeper/tests/walproposer_sim/simulation.rs @@ -151,8 +151,7 @@ impl WalProposer { for _ in 0..cnt { self.disk .lock() - .insert_logical_message("prefix", b"message") - .expect("failed to generate logical message"); + .insert_logical_message(c"prefix", b"message"); } let end_lsn = self.disk.lock().flush_rec_ptr(); diff --git a/safekeeper/tests/walproposer_sim/walproposer_disk.rs b/safekeeper/tests/walproposer_sim/walproposer_disk.rs index 123cd6bad6..f70cd65dfc 100644 --- a/safekeeper/tests/walproposer_sim/walproposer_disk.rs +++ b/safekeeper/tests/walproposer_sim/walproposer_disk.rs @@ -1,24 +1,7 @@ -use std::{ffi::CString, sync::Arc}; +use std::{ffi::CStr, sync::Arc}; -use byteorder::{LittleEndian, WriteBytesExt}; -use crc32c::crc32c_append; use parking_lot::{Mutex, MutexGuard}; -use postgres_ffi::{ - pg_constants::{ - RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG, - XLR_BLOCK_ID_DATA_SHORT, - }, - v16::{ - wal_craft_test_export::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC}, - xlog_utils::{ - XLogSegNoOffsetToRecPtr, XlLogicalMessage, XLOG_RECORD_CRC_OFFS, - XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD, - XLP_FIRST_IS_CONTRECORD, - }, - XLogRecord, - }, - WAL_SEGMENT_SIZE, XLOG_BLCKSZ, -}; +use postgres_ffi::v16::wal_generator::WalGenerator; use utils::lsn::Lsn; use super::block_storage::BlockStorage; @@ -35,6 +18,7 @@ impl DiskWalProposer { internal_available_lsn: Lsn(0), prev_lsn: Lsn(0), disk: BlockStorage::new(), + wal_generator: WalGenerator::new(), }), }) } @@ -51,6 +35,8 @@ pub struct State { prev_lsn: Lsn, // actual WAL storage disk: BlockStorage, + // WAL record generator + wal_generator: WalGenerator, } impl State { @@ -66,6 +52,9 @@ impl State { /// Update the internal available LSN to the given value. pub fn reset_to(&mut self, lsn: Lsn) { self.internal_available_lsn = lsn; + self.prev_lsn = Lsn(0); // Safekeeper doesn't care if this is omitted + self.wal_generator.lsn = self.internal_available_lsn; + self.wal_generator.prev_lsn = self.prev_lsn; } /// Get current LSN. @@ -73,242 +62,11 @@ impl State { self.internal_available_lsn } - /// Generate a new WAL record at the current LSN. - pub fn insert_logical_message(&mut self, prefix: &str, msg: &[u8]) -> anyhow::Result<()> { - let prefix_cstr = CString::new(prefix)?; - let prefix_bytes = prefix_cstr.as_bytes_with_nul(); - - let lm = XlLogicalMessage { - db_id: 0, - transactional: 0, - prefix_size: prefix_bytes.len() as ::std::os::raw::c_ulong, - message_size: msg.len() as ::std::os::raw::c_ulong, - }; - - let record_bytes = lm.encode(); - let rdatas: Vec<&[u8]> = vec![&record_bytes, prefix_bytes, msg]; - insert_wal_record(self, rdatas, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE) - } -} - -fn insert_wal_record( - state: &mut State, - rdatas: Vec<&[u8]>, - rmid: u8, - info: u8, -) -> anyhow::Result<()> { - // bytes right after the header, in the same rdata block - let mut scratch = Vec::new(); - let mainrdata_len: usize = rdatas.iter().map(|rdata| rdata.len()).sum(); - - if mainrdata_len > 0 { - if mainrdata_len > 255 { - scratch.push(XLR_BLOCK_ID_DATA_LONG); - // TODO: verify endiness - let _ = scratch.write_u32::(mainrdata_len as u32); - } else { - scratch.push(XLR_BLOCK_ID_DATA_SHORT); - scratch.push(mainrdata_len as u8); - } - } - - let total_len: u32 = (XLOG_SIZE_OF_XLOG_RECORD + scratch.len() + mainrdata_len) as u32; - let size = maxalign(total_len); - assert!(size as usize > XLOG_SIZE_OF_XLOG_RECORD); - - let start_bytepos = recptr_to_bytepos(state.internal_available_lsn); - let end_bytepos = start_bytepos + size as u64; - - let start_recptr = bytepos_to_recptr(start_bytepos); - let end_recptr = bytepos_to_recptr(end_bytepos); - - assert!(recptr_to_bytepos(start_recptr) == start_bytepos); - assert!(recptr_to_bytepos(end_recptr) == end_bytepos); - - let mut crc = crc32c_append(0, &scratch); - for rdata in &rdatas { - crc = crc32c_append(crc, rdata); - } - - let mut header = XLogRecord { - xl_tot_len: total_len, - xl_xid: 0, - xl_prev: state.prev_lsn.0, - xl_info: info, - xl_rmid: rmid, - __bindgen_padding_0: [0u8; 2usize], - xl_crc: crc, - }; - - // now we have the header and can finish the crc - let header_bytes = header.encode()?; - let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]); - header.xl_crc = crc; - - let mut header_bytes = header.encode()?.to_vec(); - assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_RECORD); - - header_bytes.extend_from_slice(&scratch); - - // finish rdatas - let mut rdatas = rdatas; - rdatas.insert(0, &header_bytes); - - write_walrecord_to_disk(state, total_len as u64, rdatas, start_recptr, end_recptr)?; - - state.internal_available_lsn = end_recptr; - state.prev_lsn = start_recptr; - Ok(()) -} - -fn write_walrecord_to_disk( - state: &mut State, - total_len: u64, - rdatas: Vec<&[u8]>, - start: Lsn, - end: Lsn, -) -> anyhow::Result<()> { - let mut curr_ptr = start; - let mut freespace = insert_freespace(curr_ptr); - let mut written: usize = 0; - - assert!(freespace >= size_of::()); - - for mut rdata in rdatas { - while rdata.len() >= freespace { - assert!( - curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD - || freespace == 0 - ); - - state.write(curr_ptr.0, &rdata[..freespace]); - rdata = &rdata[freespace..]; - written += freespace; - curr_ptr = Lsn(curr_ptr.0 + freespace as u64); - - let mut new_page = XLogPageHeaderData { - xlp_magic: XLOG_PAGE_MAGIC as u16, - xlp_info: XLP_BKP_REMOVABLE, - xlp_tli: 1, - xlp_pageaddr: curr_ptr.0, - xlp_rem_len: (total_len - written as u64) as u32, - ..Default::default() // Put 0 in padding fields. - }; - if new_page.xlp_rem_len > 0 { - new_page.xlp_info |= XLP_FIRST_IS_CONTRECORD; - } - - if curr_ptr.segment_offset(WAL_SEGMENT_SIZE) == 0 { - new_page.xlp_info |= XLP_LONG_HEADER; - let long_page = XLogLongPageHeaderData { - std: new_page, - xlp_sysid: 0, - xlp_seg_size: WAL_SEGMENT_SIZE as u32, - xlp_xlog_blcksz: XLOG_BLCKSZ as u32, - }; - let header_bytes = long_page.encode()?; - assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_LONG_PHD); - state.write(curr_ptr.0, &header_bytes); - curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64); - } else { - let header_bytes = new_page.encode()?; - assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_SHORT_PHD); - state.write(curr_ptr.0, &header_bytes); - curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64); - } - freespace = insert_freespace(curr_ptr); - } - - assert!( - curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD - || rdata.is_empty() - ); - state.write(curr_ptr.0, rdata); - curr_ptr = Lsn(curr_ptr.0 + rdata.len() as u64); - written += rdata.len(); - freespace -= rdata.len(); - } - - assert!(written == total_len as usize); - curr_ptr.0 = maxalign(curr_ptr.0); - assert!(curr_ptr == end); - Ok(()) -} - -fn maxalign(size: T) -> T -where - T: std::ops::BitAnd - + std::ops::Add - + std::ops::Not - + From, -{ - (size + T::from(7)) & !T::from(7) -} - -fn insert_freespace(ptr: Lsn) -> usize { - if ptr.block_offset() == 0 { - 0 - } else { - (XLOG_BLCKSZ as u64 - ptr.block_offset()) as usize - } -} - -const XLP_BKP_REMOVABLE: u16 = 0x0004; -const USABLE_BYTES_IN_PAGE: u64 = (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64; -const USABLE_BYTES_IN_SEGMENT: u64 = ((WAL_SEGMENT_SIZE / XLOG_BLCKSZ) as u64 - * USABLE_BYTES_IN_PAGE) - - (XLOG_SIZE_OF_XLOG_RECORD - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64; - -fn bytepos_to_recptr(bytepos: u64) -> Lsn { - let fullsegs = bytepos / USABLE_BYTES_IN_SEGMENT; - let mut bytesleft = bytepos % USABLE_BYTES_IN_SEGMENT; - - let seg_offset = if bytesleft < (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64 { - // fits on first page of segment - bytesleft + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 - } else { - // account for the first page on segment with long header - bytesleft -= (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64; - let fullpages = bytesleft / USABLE_BYTES_IN_PAGE; - bytesleft %= USABLE_BYTES_IN_PAGE; - - XLOG_BLCKSZ as u64 - + fullpages * XLOG_BLCKSZ as u64 - + bytesleft - + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 - }; - - Lsn(XLogSegNoOffsetToRecPtr( - fullsegs, - seg_offset as u32, - WAL_SEGMENT_SIZE, - )) -} - -fn recptr_to_bytepos(ptr: Lsn) -> u64 { - let fullsegs = ptr.segment_number(WAL_SEGMENT_SIZE); - let offset = ptr.segment_offset(WAL_SEGMENT_SIZE) as u64; - - let fullpages = offset / XLOG_BLCKSZ as u64; - let offset = offset % XLOG_BLCKSZ as u64; - - if fullpages == 0 { - fullsegs * USABLE_BYTES_IN_SEGMENT - + if offset > 0 { - assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64); - offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 - } else { - 0 - } - } else { - fullsegs * USABLE_BYTES_IN_SEGMENT - + (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64 - + (fullpages - 1) * USABLE_BYTES_IN_PAGE - + if offset > 0 { - assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64); - offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 - } else { - 0 - } + /// Inserts a logical record in the WAL at the current LSN. + pub fn insert_logical_message(&mut self, prefix: &CStr, msg: &[u8]) { + let record = self.wal_generator.generate_logical_message(prefix, msg); + self.disk.write(self.internal_available_lsn.into(), &record); + self.prev_lsn = self.internal_available_lsn; + self.internal_available_lsn += record.len() as u64; } }