Add test for 1000 WAL messages

This commit is contained in:
Arthur Petukhovsky
2023-09-16 14:58:26 +00:00
parent 0dc262a84a
commit eb2886b401
7 changed files with 82 additions and 0 deletions

View File

@@ -73,6 +73,8 @@ void sim_exit(int32_t code, const uint8_t *msg);
void sim_set_result(int32_t code, const uint8_t *msg);
void sim_log_event(const int8_t *msg);
/**
* Get tag of the current message.
*/

View File

@@ -230,3 +230,11 @@ pub extern "C" fn sim_set_result(code: i32, msg: *const u8) {
debug!("sim_set_result({}, {:?})", code, msg);
os().set_result(code, msg);
}
#[no_mangle]
pub extern "C" fn sim_log_event(msg: *const i8) {
let msg = unsafe { CStr::from_ptr(msg) };
let msg = msg.to_string_lossy().into_owned();
debug!("sim_log_event({:?})", msg);
os().log_event(msg);
}

View File

@@ -156,6 +156,42 @@ fn test_simple_schedule() -> anyhow::Result<()> {
Ok(())
}
#[test]
fn test_many_tx() -> anyhow::Result<()> {
let clock = init_logger();
let mut config = TestConfig::new(Some(clock));
let test = config.start(1337);
let mut schedule: Schedule = vec![];
for i in 0..100 {
schedule.push((i * 10, TestAction::WriteTx(10)));
}
test.run_schedule(&schedule)?;
info!("Test finished, stopping all threads");
test.world.stop_all();
let events = test.world.take_events();
info!("Events: {:?}", events);
let last_commit_lsn = events
.iter()
.filter_map(|event| {
if event.data.starts_with("commit_lsn;") {
let lsn: u64 = event.data.split(';').nth(1).unwrap().parse().unwrap();
return Some(lsn);
}
None
})
.last()
.unwrap();
let initdb_lsn = 21623024;
let diff = last_commit_lsn - initdb_lsn;
info!("Last commit lsn: {}, diff: {}", last_commit_lsn, diff);
assert!(diff > 1000 * 8);
Ok(())
}
#[test]
fn test_random_schedules() -> anyhow::Result<()> {
let clock = init_logger();

View File

@@ -1553,6 +1553,8 @@ DetermineEpochStartLsn(void)
safekeeper[donor].host, safekeeper[donor].port,
LSN_FORMAT_ARGS(truncateLsn));
sim_log("prop_elected;%lu", propEpochStartLsn);
/*
* Ensure the basebackup we are running (at RedoStartLsn) matches LSN
* since which we are going to write according to the consensus. If not,
@@ -2129,6 +2131,7 @@ RecvAppendResponses(Safekeeper *sk)
minQuorumLsn = GetAcknowledgedByQuorumWALPosition();
if (minQuorumLsn > lastSentCommitLsn)
{
sim_log("commit_lsn;%lu", minQuorumLsn);
BroadcastAppendRequest();
lastSentCommitLsn = minQuorumLsn;
}

View File

@@ -23,6 +23,12 @@
} while (0)
#define exit(code) sim_exit(code, "exit()")
#define sim_log(fmt, ...) do { \
char buf[1024]; \
snprintf(buf, sizeof(buf), fmt, ##__VA_ARGS__); \
sim_log_event(buf); \
} while (0)
#else
#define walprop_log(tag, fmt, ...) ereport(tag, \
(errmsg(WALPROPOSER_TAG fmt, ##__VA_ARGS__), \

View File

@@ -164,4 +164,8 @@ impl NodeOs {
pub fn set_result(&self, code: i32, result: String) {
*self.internal.result.lock() = (code, result);
}
pub fn log_event(&self, data: String) {
self.world.add_event(self.id(), data)
}
}

View File

@@ -44,6 +44,9 @@ pub struct World {
/// Optional function to initialize nodes right after thread creation.
nodes_init: Option<Box<dyn Fn(NodeOs) + Send + Sync>>,
/// Internal event log.
events: Mutex<Vec<SEvent>>,
}
impl World {
@@ -61,6 +64,7 @@ impl World {
connection_counter: AtomicU64::new(0),
network_options,
nodes_init,
events: Mutex::new(Vec::new()),
}
}
@@ -263,6 +267,18 @@ impl World {
}
Some(parking.swap_remove(found?))
}
pub fn add_event(&self, node: NodeId, data: String) {
let time = self.now();
self.events.lock().push(SEvent { time, node, data });
}
pub fn take_events(&self) -> Vec<SEvent> {
let mut events = self.events.lock();
let mut res = Vec::new();
std::mem::swap(&mut res, &mut events);
res
}
}
thread_local! {
@@ -464,3 +480,10 @@ pub enum NodeEvent {
WakeTimeout(u64),
// TODO: close?
}
#[derive(Debug)]
pub struct SEvent {
pub time: u64,
pub node: NodeId,
pub data: String,
}