Fix WAL page header

This commit is contained in:
Arthur Petukhovsky
2023-09-16 20:50:10 +00:00
parent eb2886b401
commit 10ad3ae4eb
2 changed files with 165 additions and 3 deletions

View File

@@ -3,6 +3,7 @@
#include "walproposer.h"
#include "rust_bindings.h"
#include "replication/message.h"
#include "access/xlog_internal.h"
// defined in walproposer.h
uint64 sim_redo_start_lsn;
@@ -169,6 +170,91 @@ sim_start_replication(XLogRecPtr startptr)
}
}
#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
static int UsableBytesInSegment =
(DEFAULT_XLOG_SEG_SIZE / XLOG_BLCKSZ * UsableBytesInPage) -
(SizeOfXLogLongPHD - SizeOfXLogShortPHD);
/*
* Converts a "usable byte position" to XLogRecPtr. A usable byte position
* is the position starting from the beginning of WAL, excluding all WAL
* page headers.
*/
static XLogRecPtr
XLogBytePosToRecPtr(uint64 bytepos)
{
uint64 fullsegs;
uint64 fullpages;
uint64 bytesleft;
uint32 seg_offset;
XLogRecPtr result;
fullsegs = bytepos / UsableBytesInSegment;
bytesleft = bytepos % UsableBytesInSegment;
if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD)
{
/* fits on first page of segment */
seg_offset = bytesleft + SizeOfXLogLongPHD;
}
else
{
/* account for the first page on segment with long header */
seg_offset = XLOG_BLCKSZ;
bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD;
fullpages = bytesleft / UsableBytesInPage;
bytesleft = bytesleft % UsableBytesInPage;
seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD;
}
XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, wal_segment_size, result);
return result;
}
/*
* Convert an XLogRecPtr to a "usable byte position".
*/
static uint64
XLogRecPtrToBytePos(XLogRecPtr ptr)
{
uint64 fullsegs;
uint32 fullpages;
uint32 offset;
uint64 result;
XLByteToSeg(ptr, fullsegs, wal_segment_size);
fullpages = (XLogSegmentOffset(ptr, wal_segment_size)) / XLOG_BLCKSZ;
offset = ptr % XLOG_BLCKSZ;
if (fullpages == 0)
{
result = fullsegs * UsableBytesInSegment;
if (offset > 0)
{
Assert(offset >= SizeOfXLogLongPHD);
result += offset - SizeOfXLogLongPHD;
}
}
else
{
result = fullsegs * UsableBytesInSegment +
(XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */
(fullpages - 1) * UsableBytesInPage; /* full pages */
if (offset > 0)
{
Assert(offset >= SizeOfXLogShortPHD);
result += offset - SizeOfXLogShortPHD;
}
}
return result;
}
#define max_rdatas 16
void InitMyInsert();
@@ -231,6 +317,8 @@ MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags)
int size;
XLogRecPtr StartPos;
XLogRecPtr EndPos;
uint64 startbytepos;
uint64 endbytepos;
/*
* Note: this function can be called multiple times for the same record.
@@ -305,11 +393,17 @@ MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags)
/* All (non xlog-switch) records should contain data. */
Assert(size > SizeOfXLogRecord);
startbytepos = XLogRecPtrToBytePos(CurrBytePos);
endbytepos = startbytepos + size;
// Get the position.
StartPos = CurrBytePos;
EndPos = StartPos + size;
StartPos = XLogBytePosToRecPtr(startbytepos);
EndPos = XLogBytePosToRecPtr(startbytepos + size);
rechdr->xl_prev = PrevBytePos;
Assert(XLogRecPtrToBytePos(StartPos) == startbytepos);
Assert(XLogRecPtrToBytePos(EndPos) == endbytepos);
// Update global pointers.
CurrBytePos = EndPos;
PrevBytePos = StartPos;
@@ -329,25 +423,92 @@ MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags)
return EndPos;
}
#define INSERT_FREESPACE(endptr) \
(((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ))
static void
MyCopyXLogRecordToWAL(int write_len, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos)
{
XLogRecPtr CurrPos;
int written;
int freespace;
// Write hdr_rdt and `num_rdatas` other datas.
CurrPos = StartPos;
freespace = INSERT_FREESPACE(CurrPos);
written = 0;
Assert(freespace >= sizeof(uint32));
while (rdata != NULL)
{
char *rdata_data = rdata->data;
int rdata_len = rdata->len;
// Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
while (rdata_len >= freespace)
{
char header_buf[SizeOfXLogLongPHD];
XLogPageHeader NewPage = (XLogPageHeader) header_buf;
Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0);
XLogWalPropWrite(rdata_data, freespace, CurrPos);
rdata_data += freespace;
rdata_len -= freespace;
written += freespace;
CurrPos += freespace;
// Init new page
MemSet(header_buf, 0, SizeOfXLogLongPHD);
/*
* Fill the new page's header
*/
NewPage->xlp_magic = XLOG_PAGE_MAGIC;
/* NewPage->xlp_info = 0; */ /* done by memset */
NewPage->xlp_tli = 1;
NewPage->xlp_pageaddr = CurrPos;
/* NewPage->xlp_rem_len = 0; */ /* done by memset */
NewPage->xlp_info |= XLP_BKP_REMOVABLE;
/*
* If first page of an XLOG segment file, make it a long header.
*/
if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
{
XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
NewLongPage->xlp_sysid = 0;
NewLongPage->xlp_seg_size = wal_segment_size;
NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
NewPage->xlp_info |= XLP_LONG_HEADER;
}
NewPage->xlp_rem_len = write_len - written;
if (NewPage->xlp_rem_len > 0) {
NewPage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
}
/* skip over the page header */
if (XLogSegmentOffset(CurrPos, wal_segment_size) == 0)
{
XLogWalPropWrite(header_buf, SizeOfXLogLongPHD, CurrPos);
CurrPos += SizeOfXLogLongPHD;
}
else
{
XLogWalPropWrite(header_buf, SizeOfXLogShortPHD, CurrPos);
CurrPos += SizeOfXLogShortPHD;
}
freespace = INSERT_FREESPACE(CurrPos);
}
Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
XLogWalPropWrite(rdata_data, rdata_len, CurrPos);
CurrPos += rdata_len;
written += rdata_len;
freespace -= rdata_len;
rdata = rdata->next;
}

View File

@@ -158,6 +158,7 @@ fn test_simple_schedule() -> anyhow::Result<()> {
#[test]
fn test_many_tx() -> anyhow::Result<()> {
enable_debug();
let clock = init_logger();
let mut config = TestConfig::new(Some(clock));
let test = config.start(1337);