diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 6b219488ac..0239b56d9c 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -36,6 +36,7 @@ macro_rules! postgres_ffi { pub mod controlfile_utils; pub mod nonrelfile_utils; pub mod wal_craft_test_export; + pub mod wal_generator; pub mod waldecoder_handler; pub mod xlog_utils; diff --git a/libs/postgres_ffi/src/wal_generator.rs b/libs/postgres_ffi/src/wal_generator.rs new file mode 100644 index 0000000000..97968c269b --- /dev/null +++ b/libs/postgres_ffi/src/wal_generator.rs @@ -0,0 +1,203 @@ +use std::ffi::CStr; + +use bytes::{Bytes, BytesMut}; +use crc32c::crc32c_append; +use utils::lsn::Lsn; + +use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC}; +use super::xlog_utils::{ + XlLogicalMessage, XLOG_RECORD_CRC_OFFS, XLOG_SIZE_OF_XLOG_RECORD, XLP_BKP_REMOVABLE, + XLP_FIRST_IS_CONTRECORD, +}; +use super::XLogRecord; +use crate::pg_constants::{ + RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG, + XLR_BLOCK_ID_DATA_SHORT, +}; +use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; + +/// Generates binary WAL records for use in tests and benchmarks. Currently only generates logical +/// messages (effectively noops) with a fixed payload. It is used as an iterator which yields +/// encoded bytes for a single WAL record, including internal page headers if it spans pages. +/// Concatenating the bytes will yield a complete, well-formed WAL, which can be chunked at segment +/// boundaries if desired. Not optimized for performance. +/// +/// The WAL format is version-dependent (see e.g. `XLOG_PAGE_MAGIC`), so make sure to import this +/// for the appropriate Postgres version (e.g. `postgres_ffi::v17::wal_generator::WalGenerator`). +/// +/// A WAL is split into 16 MB segments. Each segment is split into 8 KB pages, with headers. +/// Records are arbitrary length, 8-byte aligned, and may span pages. 
The layout is e.g.: +/// +/// | Segment 1 | Segment 2 | Segment 3 | +/// | Page 1 | Page 2 | Page 3 | Page 4 | Page 5 | Page 6 | Page 7 | Page 8 | Page 9 | +/// | R1 | R2 |R3| R4 | R5 | R6 | R7 | R8 | +/// +/// TODO: support generating actual tables and rows. +#[derive(Default)] +pub struct WalGenerator { + /// Current LSN to append the next record at. + /// + /// Callers can modify this (and prev_lsn) to restart generation at a different LSN, but should + /// ensure that the LSN is on a valid record boundary (i.e. we can't start appending in the + /// middle of an existing record or header, or beyond the end of the existing WAL). + pub lsn: Lsn, + /// The starting LSN of the previous record. Used in WAL record headers. The Safekeeper doesn't + /// care about this, unlike Postgres, but we include it for completeness. + pub prev_lsn: Lsn, +} + +impl WalGenerator { + // For now, hardcode the message payload. + // TODO: support specifying the payload size. + const PREFIX: &CStr = c"prefix"; + const MESSAGE: &[u8] = b"message"; + + // Hardcode the sys, timeline, and DB IDs. We can make them configurable if we care about them. + const SYS_ID: u64 = 0; + const TIMELINE_ID: u32 = 1; + const DB_ID: u32 = 0; + + /// Creates a new WAL generator, which emits logical message records (noops). + pub fn new() -> Self { + Self::default() + } + + /// Encodes a logical message (basically a noop), with the given prefix and message. + pub(crate) fn encode_logical_message(prefix: &CStr, message: &[u8]) -> Bytes { + let prefix = prefix.to_bytes_with_nul(); + let header = XlLogicalMessage { + db_id: Self::DB_ID, + transactional: 0, + prefix_size: prefix.len() as u64, + message_size: message.len() as u64, + }; + [&header.encode(), prefix, message].concat().into() + } + + /// Encode a WAL record with the given payload data (e.g. a logical message). + pub(crate) fn encode_record(data: Bytes, rmid: u8, info: u8, prev_lsn: Lsn) -> Bytes { + // Prefix data with block ID and length. 
+ let data_header = Bytes::from(match data.len() { + 0 => vec![], + 1..=255 => vec![XLR_BLOCK_ID_DATA_SHORT, data.len() as u8], + 256.. => { + let len_bytes = (data.len() as u32).to_le_bytes(); + [&[XLR_BLOCK_ID_DATA_LONG], len_bytes.as_slice()].concat() + } + }); + + // Construct the WAL record header. + let mut header = XLogRecord { + xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + data_header.len() + data.len()) as u32, + xl_xid: 0, + xl_prev: prev_lsn.into(), + xl_info: info, + xl_rmid: rmid, + __bindgen_padding_0: [0; 2], + xl_crc: 0, // see below + }; + + // Compute the CRC checksum for the data, and the header up to the CRC field. + let mut crc = 0; + crc = crc32c_append(crc, &data_header); + crc = crc32c_append(crc, &data); + crc = crc32c_append(crc, &header.encode().unwrap()[0..XLOG_RECORD_CRC_OFFS]); + header.xl_crc = crc; + + // Encode the final header and record. + let header = header.encode().unwrap(); + + [header, data_header, data].concat().into() + } + + /// Injects page headers on 8KB page boundaries. Takes the current LSN position where the record + /// is to be appended. + fn encode_pages(record: Bytes, mut lsn: Lsn) -> Bytes { + // Fast path: record fits in current page, and the page already has a header. + if lsn.remaining_in_block() as usize >= record.len() && lsn.block_offset() > 0 { + return record; + } + + let mut pages = BytesMut::new(); + let mut remaining = record.clone(); // Bytes::clone() is cheap + while !remaining.is_empty() { + // At new page boundary, inject page header. + if lsn.block_offset() == 0 { + let mut page_header = XLogPageHeaderData { + xlp_magic: XLOG_PAGE_MAGIC as u16, + xlp_info: XLP_BKP_REMOVABLE, + xlp_tli: Self::TIMELINE_ID, + xlp_pageaddr: lsn.0, + xlp_rem_len: 0, + __bindgen_padding_0: [0; 4], + }; + // If the record was split across page boundaries, mark as continuation. 
+ if remaining.len() < record.len() { + page_header.xlp_rem_len = remaining.len() as u32; + page_header.xlp_info |= XLP_FIRST_IS_CONTRECORD; + } + // At start of segment, use a long page header. + let page_header = if lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 { + page_header.xlp_info |= XLP_LONG_HEADER; + XLogLongPageHeaderData { + std: page_header, + xlp_sysid: Self::SYS_ID, + xlp_seg_size: WAL_SEGMENT_SIZE as u32, + xlp_xlog_blcksz: XLOG_BLCKSZ as u32, + } + .encode() + .unwrap() + } else { + page_header.encode().unwrap() + }; + pages.extend_from_slice(&page_header); + lsn += page_header.len() as u64; + } + + // Append the record up to the next page boundary, if any. + let page_free = lsn.remaining_in_block() as usize; + let chunk = remaining.split_to(std::cmp::min(page_free, remaining.len())); + pages.extend_from_slice(&chunk); + lsn += chunk.len() as u64; + } + pages.freeze() + } + + /// Records must be 8-byte aligned. Take an encoded record (including any injected page + /// boundaries), starting at the given LSN, and add any necessary padding at the end. + fn pad_record(record: Bytes, mut lsn: Lsn) -> Bytes { + lsn += record.len() as u64; + let padding = lsn.calc_padding(8u64) as usize; + if padding == 0 { + return record; + } + [record, Bytes::from(vec![0; padding])].concat().into() + } + + /// Generates a record with an arbitrary payload at the current LSN, then increments the LSN. + pub fn generate_record(&mut self, data: Bytes, rmid: u8, info: u8) -> Bytes { + let record = Self::encode_record(data, rmid, info, self.prev_lsn); + let record = Self::encode_pages(record, self.lsn); + let record = Self::pad_record(record, self.lsn); + self.prev_lsn = self.lsn; + self.lsn += record.len() as u64; + record + } + + /// Generates a logical message at the current LSN. Can be used to construct arbitrary messages. 
+ pub fn generate_logical_message(&mut self, prefix: &CStr, message: &[u8]) -> Bytes { + let data = Self::encode_logical_message(prefix, message); + self.generate_record(data, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE) + } +} + +/// Generate WAL records as an iterator. +impl Iterator for WalGenerator { + type Item = (Lsn, Bytes); + + fn next(&mut self) -> Option<Self::Item> { + let lsn = self.lsn; + let record = self.generate_logical_message(Self::PREFIX, Self::MESSAGE); + Some((lsn, record)) + } +} diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index a636bd2a97..78a965174f 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -7,15 +7,14 @@ // have been named the same as the corresponding PostgreSQL functions instead. // -use crc32c::crc32c_append; - use super::super::waldecoder::WalStreamDecoder; use super::bindings::{ CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz, XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC, }; +use super::wal_generator::WalGenerator; use super::PG_MAJORVERSION; -use crate::pg_constants; +use crate::pg_constants::{self, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE}; use crate::PG_TLI; use crate::{uint32, uint64, Oid}; use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; @@ -26,7 +25,7 @@ use bytes::{Buf, Bytes}; use log::*; use serde::Serialize; -use std::ffi::OsStr; +use std::ffi::{CString, OsStr}; use std::fs::File; use std::io::prelude::*; use std::io::ErrorKind; @@ -39,6 +38,7 @@ use utils::bin_ser::SerializeError; use utils::lsn::Lsn; pub const XLOG_FNAME_LEN: usize = 24; +pub const XLP_BKP_REMOVABLE: u16 = 0x0004; pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001; pub const XLP_REM_LEN_OFFS: usize = 2 + 2 + 4 + 8; pub const XLOG_RECORD_CRC_OFFS: usize = 4 + 4 + 8 + 1 + 1 + 2; @@ -489,64 +489,16 @@ impl XlLogicalMessage { /// Create new WAL record for non-transactional logical message. 
/// Used for creating artificial WAL for tests, as LogicalMessage /// record is basically no-op. -/// -/// NOTE: This leaves the xl_prev field zero. The safekeeper and -/// pageserver tolerate that, but PostgreSQL does not. -pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> { - let mut prefix_bytes: Vec<u8> = Vec::with_capacity(prefix.len() + 1); - prefix_bytes.write_all(prefix.as_bytes()).unwrap(); - prefix_bytes.push(0); - - let message_bytes = message.as_bytes(); - - let logical_message = XlLogicalMessage { - db_id: 0, - transactional: 0, - prefix_size: prefix_bytes.len() as u64, - message_size: message_bytes.len() as u64, - }; - - let mainrdata = logical_message.encode(); - let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); - // only short mainrdata is supported for now - assert!(mainrdata_len <= 255); - let mainrdata_len = mainrdata_len as u8; - - let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; - data.extend_from_slice(&mainrdata); - data.extend_from_slice(&prefix_bytes); - data.extend_from_slice(message_bytes); - - let total_len = XLOG_SIZE_OF_XLOG_RECORD + data.len(); - - let mut header = XLogRecord { - xl_tot_len: total_len as u32, - xl_xid: 0, - xl_prev: 0, - xl_info: 0, - xl_rmid: 21, - __bindgen_padding_0: [0u8; 2usize], - xl_crc: 0, // crc will be calculated later - }; - - let header_bytes = header.encode().expect("failed to encode header"); - let crc = crc32c_append(0, &data); - let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]); - header.xl_crc = crc; - - let mut wal: Vec<u8> = Vec::new(); - wal.extend_from_slice(&header.encode().expect("failed to encode header")); - wal.extend_from_slice(&data); - - // WAL start position must be aligned at 8 bytes, - // this will add padding for the next WAL record. 
- const PADDING: usize = 8; - let padding_rem = wal.len() % PADDING; - if padding_rem != 0 { - wal.resize(wal.len() + PADDING - padding_rem, 0); - } - - wal +pub fn encode_logical_message(prefix: &str, message: &str) -> Bytes { + // This function can take untrusted input, so discard any NUL bytes in the prefix string. + let prefix = CString::new(prefix.replace('\0', "")).expect("no NULs"); + let message = message.as_bytes(); + WalGenerator::encode_record( + WalGenerator::encode_logical_message(&prefix, message), + RM_LOGICALMSG_ID, + XLOG_LOGICAL_MESSAGE, + Lsn(0), + ) } #[cfg(test)] diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 3ec2c130bd..524f3604a1 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -12,7 +12,7 @@ use crate::seqwait::MonotonicCounter; pub const XLOG_BLCKSZ: u32 = 8192; /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr -#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash)] +#[derive(Clone, Copy, Default, Eq, Ord, PartialEq, PartialOrd, Hash)] pub struct Lsn(pub u64); impl Serialize for Lsn { diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 7fe924a08e..0573ea81e7 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -7,7 +7,6 @@ //! 
use anyhow::Context; -use bytes::Bytes; use postgres_backend::QueryError; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -176,7 +175,7 @@ pub async fn append_logical_message( truncate_lsn: msg.truncate_lsn, proposer_uuid: [0u8; 16], }, - wal_data: Bytes::from(wal_data), + wal_data, }); let response = tli.process_msg(&append_request).await?; diff --git a/safekeeper/tests/walproposer_sim/simulation.rs b/safekeeper/tests/walproposer_sim/simulation.rs index 0d7aaf517b..fabf450eef 100644 --- a/safekeeper/tests/walproposer_sim/simulation.rs +++ b/safekeeper/tests/walproposer_sim/simulation.rs @@ -151,8 +151,7 @@ impl WalProposer { for _ in 0..cnt { self.disk .lock() - .insert_logical_message("prefix", b"message") - .expect("failed to generate logical message"); + .insert_logical_message(c"prefix", b"message"); } let end_lsn = self.disk.lock().flush_rec_ptr(); diff --git a/safekeeper/tests/walproposer_sim/walproposer_disk.rs b/safekeeper/tests/walproposer_sim/walproposer_disk.rs index 123cd6bad6..f70cd65dfc 100644 --- a/safekeeper/tests/walproposer_sim/walproposer_disk.rs +++ b/safekeeper/tests/walproposer_sim/walproposer_disk.rs @@ -1,24 +1,7 @@ -use std::{ffi::CString, sync::Arc}; +use std::{ffi::CStr, sync::Arc}; -use byteorder::{LittleEndian, WriteBytesExt}; -use crc32c::crc32c_append; use parking_lot::{Mutex, MutexGuard}; -use postgres_ffi::{ - pg_constants::{ - RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLP_LONG_HEADER, XLR_BLOCK_ID_DATA_LONG, - XLR_BLOCK_ID_DATA_SHORT, - }, - v16::{ - wal_craft_test_export::{XLogLongPageHeaderData, XLogPageHeaderData, XLOG_PAGE_MAGIC}, - xlog_utils::{ - XLogSegNoOffsetToRecPtr, XlLogicalMessage, XLOG_RECORD_CRC_OFFS, - XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD, - XLP_FIRST_IS_CONTRECORD, - }, - XLogRecord, - }, - WAL_SEGMENT_SIZE, XLOG_BLCKSZ, -}; +use postgres_ffi::v16::wal_generator::WalGenerator; use utils::lsn::Lsn; use super::block_storage::BlockStorage; @@ 
-35,6 +18,7 @@ impl DiskWalProposer { internal_available_lsn: Lsn(0), prev_lsn: Lsn(0), disk: BlockStorage::new(), + wal_generator: WalGenerator::new(), }), }) } @@ -51,6 +35,8 @@ pub struct State { prev_lsn: Lsn, // actual WAL storage disk: BlockStorage, + // WAL record generator + wal_generator: WalGenerator, } impl State { @@ -66,6 +52,9 @@ impl State { /// Update the internal available LSN to the given value. pub fn reset_to(&mut self, lsn: Lsn) { self.internal_available_lsn = lsn; + self.prev_lsn = Lsn(0); // Safekeeper doesn't care if this is omitted + self.wal_generator.lsn = self.internal_available_lsn; + self.wal_generator.prev_lsn = self.prev_lsn; } /// Get current LSN. @@ -73,242 +62,11 @@ impl State { self.internal_available_lsn } - /// Generate a new WAL record at the current LSN. - pub fn insert_logical_message(&mut self, prefix: &str, msg: &[u8]) -> anyhow::Result<()> { - let prefix_cstr = CString::new(prefix)?; - let prefix_bytes = prefix_cstr.as_bytes_with_nul(); - - let lm = XlLogicalMessage { - db_id: 0, - transactional: 0, - prefix_size: prefix_bytes.len() as ::std::os::raw::c_ulong, - message_size: msg.len() as ::std::os::raw::c_ulong, - }; - - let record_bytes = lm.encode(); - let rdatas: Vec<&[u8]> = vec![&record_bytes, prefix_bytes, msg]; - insert_wal_record(self, rdatas, RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE) - } -} - -fn insert_wal_record( - state: &mut State, - rdatas: Vec<&[u8]>, - rmid: u8, - info: u8, -) -> anyhow::Result<()> { - // bytes right after the header, in the same rdata block - let mut scratch = Vec::new(); - let mainrdata_len: usize = rdatas.iter().map(|rdata| rdata.len()).sum(); - - if mainrdata_len > 0 { - if mainrdata_len > 255 { - scratch.push(XLR_BLOCK_ID_DATA_LONG); - // TODO: verify endiness - let _ = scratch.write_u32::<LittleEndian>(mainrdata_len as u32); - } else { - scratch.push(XLR_BLOCK_ID_DATA_SHORT); - scratch.push(mainrdata_len as u8); - } - } - - let total_len: u32 = (XLOG_SIZE_OF_XLOG_RECORD + scratch.len() + 
mainrdata_len) as u32; - let size = maxalign(total_len); - assert!(size as usize > XLOG_SIZE_OF_XLOG_RECORD); - - let start_bytepos = recptr_to_bytepos(state.internal_available_lsn); - let end_bytepos = start_bytepos + size as u64; - - let start_recptr = bytepos_to_recptr(start_bytepos); - let end_recptr = bytepos_to_recptr(end_bytepos); - - assert!(recptr_to_bytepos(start_recptr) == start_bytepos); - assert!(recptr_to_bytepos(end_recptr) == end_bytepos); - - let mut crc = crc32c_append(0, &scratch); - for rdata in &rdatas { - crc = crc32c_append(crc, rdata); - } - - let mut header = XLogRecord { - xl_tot_len: total_len, - xl_xid: 0, - xl_prev: state.prev_lsn.0, - xl_info: info, - xl_rmid: rmid, - __bindgen_padding_0: [0u8; 2usize], - xl_crc: crc, - }; - - // now we have the header and can finish the crc - let header_bytes = header.encode()?; - let crc = crc32c_append(crc, &header_bytes[0..XLOG_RECORD_CRC_OFFS]); - header.xl_crc = crc; - - let mut header_bytes = header.encode()?.to_vec(); - assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_RECORD); - - header_bytes.extend_from_slice(&scratch); - - // finish rdatas - let mut rdatas = rdatas; - rdatas.insert(0, &header_bytes); - - write_walrecord_to_disk(state, total_len as u64, rdatas, start_recptr, end_recptr)?; - - state.internal_available_lsn = end_recptr; - state.prev_lsn = start_recptr; - Ok(()) -} - -fn write_walrecord_to_disk( - state: &mut State, - total_len: u64, - rdatas: Vec<&[u8]>, - start: Lsn, - end: Lsn, -) -> anyhow::Result<()> { - let mut curr_ptr = start; - let mut freespace = insert_freespace(curr_ptr); - let mut written: usize = 0; - - assert!(freespace >= size_of::()); - - for mut rdata in rdatas { - while rdata.len() >= freespace { - assert!( - curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD - || freespace == 0 - ); - - state.write(curr_ptr.0, &rdata[..freespace]); - rdata = &rdata[freespace..]; - written += freespace; - curr_ptr = Lsn(curr_ptr.0 + freespace as u64); - - 
let mut new_page = XLogPageHeaderData { - xlp_magic: XLOG_PAGE_MAGIC as u16, - xlp_info: XLP_BKP_REMOVABLE, - xlp_tli: 1, - xlp_pageaddr: curr_ptr.0, - xlp_rem_len: (total_len - written as u64) as u32, - ..Default::default() // Put 0 in padding fields. - }; - if new_page.xlp_rem_len > 0 { - new_page.xlp_info |= XLP_FIRST_IS_CONTRECORD; - } - - if curr_ptr.segment_offset(WAL_SEGMENT_SIZE) == 0 { - new_page.xlp_info |= XLP_LONG_HEADER; - let long_page = XLogLongPageHeaderData { - std: new_page, - xlp_sysid: 0, - xlp_seg_size: WAL_SEGMENT_SIZE as u32, - xlp_xlog_blcksz: XLOG_BLCKSZ as u32, - }; - let header_bytes = long_page.encode()?; - assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_LONG_PHD); - state.write(curr_ptr.0, &header_bytes); - curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64); - } else { - let header_bytes = new_page.encode()?; - assert!(header_bytes.len() == XLOG_SIZE_OF_XLOG_SHORT_PHD); - state.write(curr_ptr.0, &header_bytes); - curr_ptr = Lsn(curr_ptr.0 + header_bytes.len() as u64); - } - freespace = insert_freespace(curr_ptr); - } - - assert!( - curr_ptr.segment_offset(WAL_SEGMENT_SIZE) >= XLOG_SIZE_OF_XLOG_SHORT_PHD - || rdata.is_empty() - ); - state.write(curr_ptr.0, rdata); - curr_ptr = Lsn(curr_ptr.0 + rdata.len() as u64); - written += rdata.len(); - freespace -= rdata.len(); - } - - assert!(written == total_len as usize); - curr_ptr.0 = maxalign(curr_ptr.0); - assert!(curr_ptr == end); - Ok(()) -} - -fn maxalign(size: T) -> T -where - T: std::ops::BitAnd - + std::ops::Add - + std::ops::Not - + From, -{ - (size + T::from(7)) & !T::from(7) -} - -fn insert_freespace(ptr: Lsn) -> usize { - if ptr.block_offset() == 0 { - 0 - } else { - (XLOG_BLCKSZ as u64 - ptr.block_offset()) as usize - } -} - -const XLP_BKP_REMOVABLE: u16 = 0x0004; -const USABLE_BYTES_IN_PAGE: u64 = (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64; -const USABLE_BYTES_IN_SEGMENT: u64 = ((WAL_SEGMENT_SIZE / XLOG_BLCKSZ) as u64 - * USABLE_BYTES_IN_PAGE) - - 
(XLOG_SIZE_OF_XLOG_RECORD - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64; - -fn bytepos_to_recptr(bytepos: u64) -> Lsn { - let fullsegs = bytepos / USABLE_BYTES_IN_SEGMENT; - let mut bytesleft = bytepos % USABLE_BYTES_IN_SEGMENT; - - let seg_offset = if bytesleft < (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64 { - // fits on first page of segment - bytesleft + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 - } else { - // account for the first page on segment with long header - bytesleft -= (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64; - let fullpages = bytesleft / USABLE_BYTES_IN_PAGE; - bytesleft %= USABLE_BYTES_IN_PAGE; - - XLOG_BLCKSZ as u64 - + fullpages * XLOG_BLCKSZ as u64 - + bytesleft - + XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 - }; - - Lsn(XLogSegNoOffsetToRecPtr( - fullsegs, - seg_offset as u32, - WAL_SEGMENT_SIZE, - )) -} - -fn recptr_to_bytepos(ptr: Lsn) -> u64 { - let fullsegs = ptr.segment_number(WAL_SEGMENT_SIZE); - let offset = ptr.segment_offset(WAL_SEGMENT_SIZE) as u64; - - let fullpages = offset / XLOG_BLCKSZ as u64; - let offset = offset % XLOG_BLCKSZ as u64; - - if fullpages == 0 { - fullsegs * USABLE_BYTES_IN_SEGMENT - + if offset > 0 { - assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64); - offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 - } else { - 0 - } - } else { - fullsegs * USABLE_BYTES_IN_SEGMENT - + (XLOG_BLCKSZ - XLOG_SIZE_OF_XLOG_SHORT_PHD) as u64 - + (fullpages - 1) * USABLE_BYTES_IN_PAGE - + if offset > 0 { - assert!(offset >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64); - offset - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 - } else { - 0 - } + /// Inserts a logical record in the WAL at the current LSN. + pub fn insert_logical_message(&mut self, prefix: &CStr, msg: &[u8]) { + let record = self.wal_generator.generate_logical_message(prefix, msg); + self.disk.write(self.internal_available_lsn.into(), &record); + self.prev_lsn = self.internal_available_lsn; + self.internal_available_lsn += record.len() as u64; } }