diff --git a/libs/walproposer/libpqwalproposer.c b/libs/walproposer/libpqwalproposer.c index 37ab1e049a..5a50e51d6b 100644 --- a/libs/walproposer/libpqwalproposer.c +++ b/libs/walproposer/libpqwalproposer.c @@ -3,6 +3,7 @@ #include "walproposer.h" #include "rust_bindings.h" #include "replication/message.h" +#include "access/xlog_internal.h" // defined in walproposer.h uint64 sim_redo_start_lsn; @@ -169,6 +170,91 @@ sim_start_replication(XLogRecPtr startptr) } } +#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD) + +static int UsableBytesInSegment = + (DEFAULT_XLOG_SEG_SIZE / XLOG_BLCKSZ * UsableBytesInPage) - + (SizeOfXLogLongPHD - SizeOfXLogShortPHD); + +/* + * Converts a "usable byte position" to XLogRecPtr. A usable byte position + * is the position starting from the beginning of WAL, excluding all WAL + * page headers. + */ +static XLogRecPtr +XLogBytePosToRecPtr(uint64 bytepos) +{ + uint64 fullsegs; + uint64 fullpages; + uint64 bytesleft; + uint32 seg_offset; + XLogRecPtr result; + + fullsegs = bytepos / UsableBytesInSegment; + bytesleft = bytepos % UsableBytesInSegment; + + if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD) + { + /* fits on first page of segment */ + seg_offset = bytesleft + SizeOfXLogLongPHD; + } + else + { + /* account for the first page on segment with long header */ + seg_offset = XLOG_BLCKSZ; + bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD; + + fullpages = bytesleft / UsableBytesInPage; + bytesleft = bytesleft % UsableBytesInPage; + + seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD; + } + + XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, wal_segment_size, result); + + return result; +} + +/* + * Convert an XLogRecPtr to a "usable byte position". + */ +static uint64 +XLogRecPtrToBytePos(XLogRecPtr ptr) +{ + uint64 fullsegs; + uint32 fullpages; + uint32 offset; + uint64 result; + + XLByteToSeg(ptr, fullsegs, wal_segment_size); + + fullpages = (XLogSegmentOffset(ptr, wal_segment_size)) / XLOG_BLCKSZ; + offset = ptr % XLOG_BLCKSZ; + + if (fullpages == 0) + { + result = fullsegs * UsableBytesInSegment; + if (offset > 0) + { + Assert(offset >= SizeOfXLogLongPHD); + result += offset - SizeOfXLogLongPHD; + } + } + else + { + result = fullsegs * UsableBytesInSegment + + (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */ + (fullpages - 1) * UsableBytesInPage; /* full pages */ + if (offset > 0) + { + Assert(offset >= SizeOfXLogShortPHD); + result += offset - SizeOfXLogShortPHD; + } + } + + return result; +} + #define max_rdatas 16 void InitMyInsert(); @@ -231,6 +317,8 @@ MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags) int size; XLogRecPtr StartPos; XLogRecPtr EndPos; + uint64 startbytepos; + uint64 endbytepos; /* * Note: this function can be called multiple times for the same record. @@ -305,11 +393,17 @@ MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags) /* All (non xlog-switch) records should contain data. */ Assert(size > SizeOfXLogRecord); + startbytepos = XLogRecPtrToBytePos(CurrBytePos); + endbytepos = startbytepos + size; + // Get the position. - StartPos = CurrBytePos; - EndPos = StartPos + size; + StartPos = XLogBytePosToRecPtr(startbytepos); + EndPos = XLogBytePosToRecPtr(startbytepos + size); rechdr->xl_prev = PrevBytePos; + Assert(XLogRecPtrToBytePos(StartPos) == startbytepos); + Assert(XLogRecPtrToBytePos(EndPos) == endbytepos); + // Update global pointers. CurrBytePos = EndPos; PrevBytePos = StartPos; @@ -329,25 +423,92 @@ MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags) return EndPos; } +#define INSERT_FREESPACE(endptr) \ + (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ)) + static void MyCopyXLogRecordToWAL(int write_len, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos) { XLogRecPtr CurrPos; int written; + int freespace; // Write hdr_rdt and `num_rdatas` other datas. CurrPos = StartPos; + freespace = INSERT_FREESPACE(CurrPos); + written = 0; + + Assert(freespace >= sizeof(uint32)); while (rdata != NULL) { char *rdata_data = rdata->data; int rdata_len = rdata->len; - // Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0); + while (rdata_len >= freespace) + { + char header_buf[SizeOfXLogLongPHD]; + XLogPageHeader NewPage = (XLogPageHeader) header_buf; + + Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0); + XLogWalPropWrite(rdata_data, freespace, CurrPos); + rdata_data += freespace; + rdata_len -= freespace; + written += freespace; + CurrPos += freespace; + // Init new page + MemSet(header_buf, 0, SizeOfXLogLongPHD); + + /* + * Fill the new page's header + */ + NewPage->xlp_magic = XLOG_PAGE_MAGIC; + + /* NewPage->xlp_info = 0; */ /* done by memset */ + NewPage->xlp_tli = 1; + NewPage->xlp_pageaddr = CurrPos; + + /* NewPage->xlp_rem_len = 0; */ /* done by memset */ + NewPage->xlp_info |= XLP_BKP_REMOVABLE; + + /* + * If first page of an XLOG segment file, make it a long header. + */ + if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0) + { + XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; + + NewLongPage->xlp_sysid = 0; + NewLongPage->xlp_seg_size = wal_segment_size; + NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ; + NewPage->xlp_info |= XLP_LONG_HEADER; + } + + NewPage->xlp_rem_len = write_len - written; + if (NewPage->xlp_rem_len > 0) { + NewPage->xlp_info |= XLP_FIRST_IS_CONTRECORD; + } + + /* skip over the page header */ + if (XLogSegmentOffset(CurrPos, wal_segment_size) == 0) + { + XLogWalPropWrite(header_buf, SizeOfXLogLongPHD, CurrPos); + CurrPos += SizeOfXLogLongPHD; + } + else + { + XLogWalPropWrite(header_buf, SizeOfXLogShortPHD, CurrPos); + CurrPos += SizeOfXLogShortPHD; + } + freespace = INSERT_FREESPACE(CurrPos); + } + + Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0); XLogWalPropWrite(rdata_data, rdata_len, CurrPos); CurrPos += rdata_len; written += rdata_len; + freespace -= rdata_len; rdata = rdata->next; } diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index f6a28122ab..c263b5bd45 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -158,6 +158,7 @@ fn test_simple_schedule() -> anyhow::Result<()> { #[test] fn test_many_tx() -> anyhow::Result<()> { + enable_debug(); let clock = init_logger(); let mut config = TestConfig::new(Some(clock)); let test = config.start(1337);