diff --git a/libs/walproposer/rust_bindings.h b/libs/walproposer/rust_bindings.h index 57e18f9dfb..700ddca607 100644 --- a/libs/walproposer/rust_bindings.h +++ b/libs/walproposer/rust_bindings.h @@ -73,6 +73,8 @@ void sim_exit(int32_t code, const uint8_t *msg); void sim_set_result(int32_t code, const uint8_t *msg); +void sim_log_event(const int8_t *msg); + /** * Get tag of the current message. */ diff --git a/libs/walproposer/src/sim.rs b/libs/walproposer/src/sim.rs index 4770a97fe9..98b10637d7 100644 --- a/libs/walproposer/src/sim.rs +++ b/libs/walproposer/src/sim.rs @@ -230,3 +230,11 @@ pub extern "C" fn sim_set_result(code: i32, msg: *const u8) { debug!("sim_set_result({}, {:?})", code, msg); os().set_result(code, msg); } + +#[no_mangle] +pub extern "C" fn sim_log_event(msg: *const i8) { + let msg = unsafe { CStr::from_ptr(msg) }; + let msg = msg.to_string_lossy().into_owned(); + debug!("sim_log_event({:?})", msg); + os().log_event(msg); +} diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index ca7b316f8a..f6a28122ab 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -156,6 +156,42 @@ fn test_simple_schedule() -> anyhow::Result<()> { Ok(()) } +#[test] +fn test_many_tx() -> anyhow::Result<()> { + let clock = init_logger(); + let mut config = TestConfig::new(Some(clock)); + let test = config.start(1337); + + let mut schedule: Schedule = vec![]; + for i in 0..100 { + schedule.push((i * 10, TestAction::WriteTx(10))); + } + + test.run_schedule(&schedule)?; + info!("Test finished, stopping all threads"); + test.world.stop_all(); + + let events = test.world.take_events(); + info!("Events: {:?}", events); + let last_commit_lsn = events + .iter() + .filter_map(|event| { + if event.data.starts_with("commit_lsn;") { + let lsn: u64 = event.data.split(';').nth(1).unwrap().parse().unwrap(); + return Some(lsn); + } + None + }) + .last() + .unwrap(); + + let initdb_lsn = 21623024; + let diff = last_commit_lsn - initdb_lsn; + info!("Last commit lsn: {}, diff: {}", last_commit_lsn, diff); + assert!(diff > 1000 * 8); + Ok(()) +} + #[test] fn test_random_schedules() -> anyhow::Result<()> { let clock = init_logger(); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index ae1587ca34..d68332555b 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1553,6 +1553,8 @@ DetermineEpochStartLsn(void) safekeeper[donor].host, safekeeper[donor].port, LSN_FORMAT_ARGS(truncateLsn)); + sim_log("prop_elected;%lu", propEpochStartLsn); + /* * Ensure the basebackup we are running (at RedoStartLsn) matches LSN * since which we are going to write according to the consensus. If not, @@ -2129,6 +2131,7 @@ RecvAppendResponses(Safekeeper *sk) minQuorumLsn = GetAcknowledgedByQuorumWALPosition(); if (minQuorumLsn > lastSentCommitLsn) { + sim_log("commit_lsn;%lu", minQuorumLsn); BroadcastAppendRequest(); lastSentCommitLsn = minQuorumLsn; } diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index dea0f06e0d..5aadc7a87d 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -23,6 +23,12 @@ } while (0) #define exit(code) sim_exit(code, "exit()") + +#define sim_log(fmt, ...) do { \ + char buf[1024]; \ + snprintf(buf, sizeof(buf), fmt, ##__VA_ARGS__); \ + sim_log_event(buf); \ +} while (0) #else #define walprop_log(tag, fmt, ...) ereport(tag, \ (errmsg(WALPROPOSER_TAG fmt, ##__VA_ARGS__), \ diff --git a/safekeeper/src/simlib/node_os.rs b/safekeeper/src/simlib/node_os.rs index 1e49ca81f1..b05704d81a 100644 --- a/safekeeper/src/simlib/node_os.rs +++ b/safekeeper/src/simlib/node_os.rs @@ -164,4 +164,8 @@ impl NodeOs { pub fn set_result(&self, code: i32, result: String) { *self.internal.result.lock() = (code, result); } + + pub fn log_event(&self, data: String) { + self.world.add_event(self.id(), data) + } } diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index f679c8410a..4472a97c12 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -44,6 +44,9 @@ pub struct World { /// Optional function to initialize nodes right after thread creation. nodes_init: Option>, + + /// Internal event log. + events: Mutex>, } impl World { @@ -61,6 +64,7 @@ impl World { connection_counter: AtomicU64::new(0), network_options, nodes_init, + events: Mutex::new(Vec::new()), } } @@ -263,6 +267,18 @@ impl World { } Some(parking.swap_remove(found?)) } + + pub fn add_event(&self, node: NodeId, data: String) { + let time = self.now(); + self.events.lock().push(SEvent { time, node, data }); + } + + pub fn take_events(&self) -> Vec { + let mut events = self.events.lock(); + let mut res = Vec::new(); + std::mem::swap(&mut res, &mut events); + res + } } thread_local! { @@ -464,3 +480,10 @@ pub enum NodeEvent { WakeTimeout(u64), // TODO: close? } + +#[derive(Debug)] +pub struct SEvent { + pub time: u64, + pub node: NodeId, + pub data: String, +}