Generate WAL in tests

This commit is contained in:
Arthur Petukhovsky
2023-08-03 16:58:41 +00:00
parent cb6a8d3fe3
commit 7f36028fab
9 changed files with 249 additions and 9 deletions

2
.gitignore vendored
View File

@@ -18,3 +18,5 @@ test_output/
*.o
*.so
*.Po
tmp

View File

@@ -24,3 +24,5 @@ void WalProposerCleanup();
// Initialize global variables before calling any Postgres C code.
void MyContextInit();
XLogRecPtr MyInsertRecord();

View File

@@ -109,6 +109,7 @@ fn main() -> anyhow::Result<()> {
.allowlist_function("WalProposerRust")
.allowlist_function("MyContextInit")
.allowlist_function("WalProposerCleanup")
.allowlist_function("MyInsertRecord")
.allowlist_var("wal_acceptors_list")
.allowlist_var("wal_acceptor_reconnect_timeout")
.allowlist_var("wal_acceptor_connection_timeout")

View File

@@ -6,7 +6,7 @@ cd /home/admin/simulator/libs/walproposer
# TODO: rewrite to Makefile
make -C ../.. neon-pg-ext-walproposer
# make -C ../../pg_install/build/v15/src/backend postgres-lib -s
make -C ../../pg_install/build/v15/src/backend postgres-lib -s
cp ../../pg_install/build/v15/src/backend/libpostgres.a .
cp ../../pg_install/build/v15/src/common/libpgcommon_srv.a .
cp ../../pg_install/build/v15/src/port/libpgport_srv.a .

View File

@@ -2,6 +2,7 @@
#include "neon.h"
#include "walproposer.h"
#include "rust_bindings.h"
#include "replication/message.h"
// defined in walproposer.h
uint64 sim_redo_start_lsn;
@@ -167,3 +168,214 @@ sim_start_replication(XLogRecPtr startptr)
WalProposerPoll();
}
}
#define max_rdatas 16
void InitMyInsert();
static void MyBeginInsert();
static void MyRegisterData(char *data, int len);
static XLogRecPtr MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags);
static void MyCopyXLogRecordToWAL(int write_len, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos);
/*
* An array of XLogRecData structs, to hold registered data.
*/
static XLogRecData rdatas[max_rdatas];
static int num_rdatas; /* entries currently used */
static uint32 mainrdata_len; /* total # of bytes in chain */
static XLogRecData hdr_rdt;
static char hdr_scratch[16000];
static XLogRecPtr CurrBytePos;
static XLogRecPtr PrevBytePos;
void InitMyInsert()
{
CurrBytePos = sim_redo_start_lsn;
PrevBytePos = InvalidXLogRecPtr;
}
static void MyBeginInsert()
{
num_rdatas = 0;
mainrdata_len = 0;
}
static void MyRegisterData(char *data, int len)
{
XLogRecData *rdata;
if (num_rdatas >= max_rdatas)
walprop_log(ERROR, "too much WAL data");
rdata = &rdatas[num_rdatas++];
rdata->data = data;
rdata->len = len;
rdata->next = NULL;
if (num_rdatas > 1) {
rdatas[num_rdatas - 2].next = rdata;
}
mainrdata_len += len;
}
static XLogRecPtr
MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags)
{
XLogRecData *rdt;
uint32 total_len = 0;
int block_id;
pg_crc32c rdata_crc;
XLogRecord *rechdr;
char *scratch = hdr_scratch;
int size;
XLogRecPtr StartPos;
XLogRecPtr EndPos;
/*
* Note: this function can be called multiple times for the same record.
* All the modifications we do to the rdata chains below must handle that.
*/
/* The record begins with the fixed-size header */
rechdr = (XLogRecord *) scratch;
scratch += SizeOfXLogRecord;
hdr_rdt.data = hdr_scratch;
if (num_rdatas > 0)
{
hdr_rdt.next = &rdatas[0];
}
else
{
hdr_rdt.next = NULL;
}
/* followed by main data, if any */
if (mainrdata_len > 0)
{
if (mainrdata_len > 255)
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
memcpy(scratch, &mainrdata_len, sizeof(uint32));
scratch += sizeof(uint32);
}
else
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
*(scratch++) = (uint8) mainrdata_len;
}
total_len += mainrdata_len;
}
hdr_rdt.len = (scratch - hdr_scratch);
total_len += hdr_rdt.len;
/*
* Calculate CRC of the data
*
* Note that the record header isn't added into the CRC initially since we
* don't know the prev-link yet. Thus, the CRC will represent the CRC of
* the whole record in the order: rdata, then backup blocks, then record
* header.
*/
INIT_CRC32C(rdata_crc);
COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
for (size_t i = 0; i < num_rdatas; i++)
{
rdt = &rdatas[i];
COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
}
/*
* Fill in the fields in the record header. Prev-link is filled in later,
* once we know where in the WAL the record will be inserted. The CRC does
* not include the record header yet.
*/
rechdr->xl_xid = 0;
rechdr->xl_tot_len = total_len;
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
rechdr->xl_crc = rdata_crc;
size = MAXALIGN(rechdr->xl_tot_len);
/* All (non xlog-switch) records should contain data. */
Assert(size > SizeOfXLogRecord);
// Get the position.
StartPos = CurrBytePos;
EndPos = StartPos + size;
rechdr->xl_prev = PrevBytePos;
// Update global pointers.
CurrBytePos = EndPos;
PrevBytePos = StartPos;
/*
* Now that xl_prev has been filled in, calculate CRC of the record
* header.
*/
rdata_crc = rechdr->xl_crc;
COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(rdata_crc);
rechdr->xl_crc = rdata_crc;
// Now write it to disk.
MyCopyXLogRecordToWAL(rechdr->xl_tot_len, &hdr_rdt, StartPos, EndPos);
return EndPos;
}
static void
MyCopyXLogRecordToWAL(int write_len, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos)
{
XLogRecPtr CurrPos;
int written;
// Write hdr_rdt and `num_rdatas` other datas.
CurrPos = StartPos;
while (rdata != NULL)
{
char *rdata_data = rdata->data;
int rdata_len = rdata->len;
// Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0);
XLogWalPropWrite(rdata_data, rdata_len, CurrPos);
CurrPos += rdata_len;
written += rdata_len;
rdata = rdata->next;
}
Assert(written == write_len);
CurrPos = MAXALIGN64(CurrPos);
Assert(CurrPos == EndPos);
}
XLogRecPtr MyInsertRecord()
{
const char *prefix = "prefix";
const char *message = "message";
size_t size = 7;
bool transactional = false;
xl_logical_message xlrec;
xlrec.dbId = 0;
xlrec.transactional = transactional;
/* trailing zero is critical; see logicalmsg_desc */
xlrec.prefix_size = strlen(prefix) + 1;
xlrec.message_size = size;
MyBeginInsert();
MyRegisterData((char *) &xlrec, SizeOfLogicalMessage);
MyRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
MyRegisterData(unconstify(char *, message), size);
return MyFinishInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLOG_INCLUDE_ORIGIN);
}

View File

@@ -9,3 +9,4 @@ neon.tenant_id=cc6e67313d57283bad411600fbf5c142
neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e
synchronous_standby_names=walproposer
neon.safekeepers='node:1,node:2,node:3'
max_connections=100

View File

@@ -2,15 +2,15 @@ use std::{ffi::CString, str::FromStr, sync::Arc};
use safekeeper::simlib::{
network::{Delay, NetworkOptions},
world::Node,
world::World,
world::{Node, NodeEvent},
world::World, proto::AnyMessage,
};
use utils::{id::TenantTimelineId, logging, lsn::Lsn};
use crate::{
bindings::{
neon_tenant_walproposer, neon_timeline_walproposer, wal_acceptor_connection_timeout,
wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, sim_redo_start_lsn,
wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, sim_redo_start_lsn, MyInsertRecord,
},
c_context,
simtest::safekeeper::run_server,
@@ -179,9 +179,13 @@ struct WalProposer {
impl WalProposer {
fn gen_wal_record(&self) -> Lsn {
// TODO:
let new_ptr = unsafe { MyInsertRecord() };
panic!("implement me")
self.node.network_chan().send(NodeEvent::Internal(
AnyMessage::LSN(new_ptr as u64),
));
return Lsn(new_ptr as u64);
}
}
@@ -216,7 +220,10 @@ fn run_walproposer_generate_wal() {
let wp = test.launch_walproposer(lsn);
// let rec1 = wp.gen_wal_record();
test.poll_for_duration(3000);
test.poll_for_duration(30);
// TODO:
for i in 0..100 {
wp.gen_wal_record();
test.poll_for_duration(5);
}
}

View File

@@ -6,6 +6,8 @@
#include "postgres.h"
#include "utils/memutils.h"
#include "utils/guc.h"
#include "miscadmin.h"
#include "common/pg_prng.h"
// From src/backend/main/main.c
const char *progname = "fakepostgres";
@@ -65,6 +67,7 @@ void MyContextInit() {
if (!initializedMemoryContext) {
initializedMemoryContext = true;
MemoryContextInit();
pg_prng_seed(&pg_global_prng_state, 0);
setenv("PGDATA", "/home/admin/simulator/libs/walproposer/pgdata", 1);
@@ -78,6 +81,13 @@ void MyContextInit() {
exit(1);
log_min_messages = DEBUG5;
InitializeMaxBackends();
ChangeToDataDir();
CreateSharedMemoryAndSemaphores();
SetInstallXLogFileSegmentActive();
// CreateAuxProcessResourceOwner();
// StartupXLOG();
}
pthread_mutex_unlock(&lock);
}

View File

@@ -349,11 +349,16 @@ void WalProposerCleanup()
}
}
void InitMyInsert();
void WalProposerRust()
{
struct stat stat_buf;
walprop_log(LOG, "WalProposerRust");
InitMyInsert();
#if PG_VERSION_NUM < 150000
ThisTimeLineID = 1;
#endif
@@ -479,7 +484,7 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events)
}
}
walprop_log(FATAL, "unknown tcp connection");
} else if (event.tag == Message && event.any_message == LSN) {
} else if (event.tag == Internal && event.any_message == LSN) {
sim_epoll_rcv(0);
sim_msg_get_lsn(&sim_latest_available_lsn);
*occurred_events = (WaitEvent) {