diff --git a/.gitignore b/.gitignore index f1afdee599..e23c926834 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ test_output/ *.o *.so *.Po + +tmp diff --git a/libs/walproposer/bindgen_deps.h b/libs/walproposer/bindgen_deps.h index e53cc2c0d7..009298d93e 100644 --- a/libs/walproposer/bindgen_deps.h +++ b/libs/walproposer/bindgen_deps.h @@ -24,3 +24,5 @@ void WalProposerCleanup(); // Initialize global variables before calling any Postgres C code. void MyContextInit(); + +XLogRecPtr MyInsertRecord(); diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs index 8a5209f85c..8b8dcf04fd 100644 --- a/libs/walproposer/build.rs +++ b/libs/walproposer/build.rs @@ -109,6 +109,7 @@ fn main() -> anyhow::Result<()> { .allowlist_function("WalProposerRust") .allowlist_function("MyContextInit") .allowlist_function("WalProposerCleanup") + .allowlist_function("MyInsertRecord") .allowlist_var("wal_acceptors_list") .allowlist_var("wal_acceptor_reconnect_timeout") .allowlist_var("wal_acceptor_connection_timeout") diff --git a/libs/walproposer/build.sh b/libs/walproposer/build.sh index 08b2f29583..9833a2c812 100755 --- a/libs/walproposer/build.sh +++ b/libs/walproposer/build.sh @@ -6,7 +6,7 @@ cd /home/admin/simulator/libs/walproposer # TODO: rewrite to Makefile make -C ../.. neon-pg-ext-walproposer -# make -C ../../pg_install/build/v15/src/backend postgres-lib -s +make -C ../../pg_install/build/v15/src/backend postgres-lib -s cp ../../pg_install/build/v15/src/backend/libpostgres.a . cp ../../pg_install/build/v15/src/common/libpgcommon_srv.a . cp ../../pg_install/build/v15/src/port/libpgport_srv.a . diff --git a/libs/walproposer/libpqwalproposer.c b/libs/walproposer/libpqwalproposer.c index 602b62e850..2084f66754 100644 --- a/libs/walproposer/libpqwalproposer.c +++ b/libs/walproposer/libpqwalproposer.c @@ -2,6 +2,7 @@ #include "neon.h" #include "walproposer.h" #include "rust_bindings.h" +#include "replication/message.h" // defined in walproposer.h uint64 sim_redo_start_lsn; @@ -167,3 +168,214 @@ sim_start_replication(XLogRecPtr startptr) WalProposerPoll(); } } + +#define max_rdatas 16 + +void InitMyInsert(); +static void MyBeginInsert(); +static void MyRegisterData(char *data, int len); +static XLogRecPtr MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags); +static void MyCopyXLogRecordToWAL(int write_len, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos); + +/* + * An array of XLogRecData structs, to hold registered data. + */ +static XLogRecData rdatas[max_rdatas]; +static int num_rdatas; /* entries currently used */ +static uint32 mainrdata_len; /* total # of bytes in chain */ +static XLogRecData hdr_rdt; +static char hdr_scratch[16000]; +static XLogRecPtr CurrBytePos; +static XLogRecPtr PrevBytePos; + +void InitMyInsert() +{ + CurrBytePos = sim_redo_start_lsn; + PrevBytePos = InvalidXLogRecPtr; +} + +static void MyBeginInsert() +{ + num_rdatas = 0; + mainrdata_len = 0; +} + +static void MyRegisterData(char *data, int len) +{ + XLogRecData *rdata; + + if (num_rdatas >= max_rdatas) + walprop_log(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + rdata->next = NULL; + + if (num_rdatas > 1) { + rdatas[num_rdatas - 2].next = rdata; + } + + mainrdata_len += len; +} + +static XLogRecPtr +MyFinishInsert(RmgrId rmid, uint8 info, uint8 flags) +{ + XLogRecData *rdt; + uint32 total_len = 0; + int block_id; + pg_crc32c rdata_crc; + XLogRecord *rechdr; + char *scratch = hdr_scratch; + int size; + XLogRecPtr StartPos; + XLogRecPtr EndPos; + + /* + * Note: this function can be called multiple times for the same record. + * All the modifications we do to the rdata chains below must handle that. + */ + + /* The record begins with the fixed-size header */ + rechdr = (XLogRecord *) scratch; + scratch += SizeOfXLogRecord; + + hdr_rdt.data = hdr_scratch; + + if (num_rdatas > 0) + { + hdr_rdt.next = &rdatas[0]; + } + else + { + hdr_rdt.next = NULL; + } + + /* followed by main data, if any */ + if (mainrdata_len > 0) + { + if (mainrdata_len > 255) + { + *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG; + memcpy(scratch, &mainrdata_len, sizeof(uint32)); + scratch += sizeof(uint32); + } + else + { + *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT; + *(scratch++) = (uint8) mainrdata_len; + } + total_len += mainrdata_len; + } + + hdr_rdt.len = (scratch - hdr_scratch); + total_len += hdr_rdt.len; + + /* + * Calculate CRC of the data + * + * Note that the record header isn't added into the CRC initially since we + * don't know the prev-link yet. Thus, the CRC will represent the CRC of + * the whole record in the order: rdata, then backup blocks, then record + * header. + */ + INIT_CRC32C(rdata_crc); + COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); + for (size_t i = 0; i < num_rdatas; i++) + { + rdt = &rdatas[i]; + COMP_CRC32C(rdata_crc, rdt->data, rdt->len); + } + + /* + * Fill in the fields in the record header. Prev-link is filled in later, + * once we know where in the WAL the record will be inserted. The CRC does + * not include the record header yet. + */ + rechdr->xl_xid = 0; + rechdr->xl_tot_len = total_len; + rechdr->xl_info = info; + rechdr->xl_rmid = rmid; + rechdr->xl_prev = InvalidXLogRecPtr; + rechdr->xl_crc = rdata_crc; + + size = MAXALIGN(rechdr->xl_tot_len); + + /* All (non xlog-switch) records should contain data. */ + Assert(size > SizeOfXLogRecord); + + // Get the position. + StartPos = CurrBytePos; + EndPos = StartPos + size; + rechdr->xl_prev = PrevBytePos; + + // Update global pointers. + CurrBytePos = EndPos; + PrevBytePos = StartPos; + + /* + * Now that xl_prev has been filled in, calculate CRC of the record + * header. + */ + rdata_crc = rechdr->xl_crc; + COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc)); + FIN_CRC32C(rdata_crc); + rechdr->xl_crc = rdata_crc; + + // Now write it to disk. + MyCopyXLogRecordToWAL(rechdr->xl_tot_len, &hdr_rdt, StartPos, EndPos); + + return EndPos; +} + +static void +MyCopyXLogRecordToWAL(int write_len, XLogRecData *rdata, XLogRecPtr StartPos, XLogRecPtr EndPos) +{ + XLogRecPtr CurrPos; + int written; + + // Write hdr_rdt and `num_rdatas` other datas. + CurrPos = StartPos; + + while (rdata != NULL) + { + char *rdata_data = rdata->data; + int rdata_len = rdata->len; + + // Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0); + + XLogWalPropWrite(rdata_data, rdata_len, CurrPos); + CurrPos += rdata_len; + written += rdata_len; + + rdata = rdata->next; + } + + Assert(written == write_len); + CurrPos = MAXALIGN64(CurrPos); + Assert(CurrPos == EndPos); +} + +XLogRecPtr MyInsertRecord() +{ + const char *prefix = "prefix"; + const char *message = "message"; + size_t size = 7; + bool transactional = false; + + xl_logical_message xlrec; + + xlrec.dbId = 0; + xlrec.transactional = transactional; + /* trailing zero is critical; see logicalmsg_desc */ + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + + MyBeginInsert(); + MyRegisterData((char *) &xlrec, SizeOfLogicalMessage); + MyRegisterData(unconstify(char *, prefix), xlrec.prefix_size); + MyRegisterData(unconstify(char *, message), size); + + return MyFinishInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE, XLOG_INCLUDE_ORIGIN); +} diff --git a/libs/walproposer/pgdata/postgresql.conf b/libs/walproposer/pgdata/postgresql.conf index 36a62d5982..ae85ca2b23 100644 --- a/libs/walproposer/pgdata/postgresql.conf +++ b/libs/walproposer/pgdata/postgresql.conf @@ -9,3 +9,4 @@ neon.tenant_id=cc6e67313d57283bad411600fbf5c142 neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e synchronous_standby_names=walproposer neon.safekeepers='node:1,node:2,node:3' +max_connections=100 diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index 954ba0aa49..f891194eb8 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -2,15 +2,15 @@ use std::{ffi::CString, str::FromStr, sync::Arc}; use safekeeper::simlib::{ network::{Delay, NetworkOptions}, - world::Node, - world::World, + world::{Node, NodeEvent}, + world::World, proto::AnyMessage, }; use utils::{id::TenantTimelineId, logging, lsn::Lsn}; use crate::{ bindings::{ neon_tenant_walproposer, neon_timeline_walproposer, wal_acceptor_connection_timeout, - wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, sim_redo_start_lsn, + wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, sim_redo_start_lsn, MyInsertRecord, }, c_context, simtest::safekeeper::run_server, @@ -179,9 +179,13 @@ struct WalProposer { impl WalProposer { fn gen_wal_record(&self) -> Lsn { - // TODO: + let new_ptr = unsafe { MyInsertRecord() }; - panic!("implement me") + self.node.network_chan().send(NodeEvent::Internal( + AnyMessage::LSN(new_ptr as u64), + )); + + return Lsn(new_ptr as u64); } } @@ -216,7 +220,10 @@ fn run_walproposer_generate_wal() { let wp = test.launch_walproposer(lsn); // let rec1 = wp.gen_wal_record(); - test.poll_for_duration(3000); + test.poll_for_duration(30); - // TODO: + for i in 0..100 { + wp.gen_wal_record(); + test.poll_for_duration(5); + } } diff --git a/libs/walproposer/test.c b/libs/walproposer/test.c index eea11119bf..3584a0b309 100644 --- a/libs/walproposer/test.c +++ b/libs/walproposer/test.c @@ -6,6 +6,8 @@ #include "postgres.h" #include "utils/memutils.h" #include "utils/guc.h" +#include "miscadmin.h" +#include "common/pg_prng.h" // From src/backend/main/main.c const char *progname = "fakepostgres"; @@ -65,6 +67,7 @@ void MyContextInit() { if (!initializedMemoryContext) { initializedMemoryContext = true; MemoryContextInit(); + pg_prng_seed(&pg_global_prng_state, 0); setenv("PGDATA", "/home/admin/simulator/libs/walproposer/pgdata", 1); @@ -78,6 +81,13 @@ void MyContextInit() { exit(1); log_min_messages = DEBUG5; + + InitializeMaxBackends(); + ChangeToDataDir(); + CreateSharedMemoryAndSemaphores(); + SetInstallXLogFileSegmentActive(); + // CreateAuxProcessResourceOwner(); + // StartupXLOG(); } pthread_mutex_unlock(&lock); } diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7b20dbc5c2..a52c670f4c 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -349,11 +349,16 @@ void WalProposerCleanup() } } +void InitMyInsert(); + void WalProposerRust() { struct stat stat_buf; walprop_log(LOG, "WalProposerRust"); + + InitMyInsert(); + #if PG_VERSION_NUM < 150000 ThisTimeLineID = 1; #endif @@ -479,7 +484,7 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events) } } walprop_log(FATAL, "unknown tcp connection"); - } else if (event.tag == Message && event.any_message == LSN) { + } else if (event.tag == Internal && event.any_message == LSN) { sim_epoll_rcv(0); sim_msg_get_lsn(&sim_latest_available_lsn); *occurred_events = (WaitEvent) {