Run many schedules

This commit is contained in:
Arthur Petukhovsky
2023-08-24 23:42:11 +00:00
parent f8729f046d
commit 41b9750e81
7 changed files with 103 additions and 23 deletions

View File

@@ -5,6 +5,7 @@ use std::{
collections::HashMap,
ffi::{CStr, CString},
};
use tracing::trace;
use crate::sim_proto::{anymessage_tag, AnyMessageTag, Event, EventTag, MESSAGE_BUF};
@@ -209,7 +210,7 @@ pub extern "C" fn sim_now() -> i64 {
#[no_mangle]
pub extern "C" fn sim_exit(code: i32, msg: *const u8) {
debug!("sim_exit({}, {:?})", code, msg);
trace!("sim_exit({}, {:?})", code, msg);
sim_set_result(code, msg);
// I tried to make use of pthread_exit, but it doesn't work.
@@ -226,5 +227,6 @@ pub extern "C" fn sim_exit(code: i32, msg: *const u8) {
pub extern "C" fn sim_set_result(code: i32, msg: *const u8) {
let msg = unsafe { CStr::from_ptr(msg as *const i8) };
let msg = msg.to_string_lossy().into_owned();
debug!("sim_set_result({}, {:?})", code, msg);
os().set_result(code, msg);
}

View File

@@ -1,12 +1,14 @@
use std::{ffi::CString, path::Path, str::FromStr, sync::Arc};
use rand::{Rng, SeedableRng};
use safekeeper::simlib::{
network::{Delay, NetworkOptions},
proto::AnyMessage,
time::EmptyEvent,
world::World,
world::{Node, NodeEvent}, time::EmptyEvent,
world::{Node, NodeEvent},
};
use tracing::{info, error, warn, debug};
use tracing::{debug, error, info, warn};
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
@@ -16,7 +18,10 @@ use crate::{
MyInsertRecord, WalProposerCleanup, WalProposerRust,
},
c_context,
simtest::{safekeeper::run_server, log::{SimClock, init_logger}},
simtest::{
log::{init_logger, SimClock},
safekeeper::run_server,
},
};
use super::disk::Disk;
@@ -272,7 +277,6 @@ impl Test {
}
}
let mut wait_node = self.launch_sync();
// fake walproposer
let mut wp = WalProposer {
@@ -297,7 +301,10 @@ impl Test {
if sync_in_progress && wait_node.is_finished() {
let res = wait_node.result.lock().clone();
if res.0 != 0 {
anyhow::bail!("non-zero exitcode: {:?}", res);
warn!("sync non-zero exitcode: {:?}", res);
debug!("restarting walproposer");
wait_node = self.launch_sync();
continue;
}
let lsn = Lsn::from_str(&res.1)?;
debug!("sync-safekeepers finished at LSN {}", lsn);
@@ -350,7 +357,10 @@ impl Test {
if wait_node.is_finished() {
while self.world.step() && self.world.now() < next_event_time {}
} else {
while self.world.step() && self.world.now() < next_event_time && !wait_node.is_finished() {}
while self.world.step()
&& self.world.now() < next_event_time
&& !wait_node.is_finished()
{}
}
}
@@ -425,7 +435,7 @@ impl WalProposer {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum TestAction {
WriteTx(usize),
RestartSafekeeper(usize),
@@ -433,3 +443,24 @@ pub enum TestAction {
}
pub type Schedule = Vec<(u64, TestAction)>;
pub fn generate_schedule(seed: u64) -> Schedule {
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
let mut schedule = Vec::new();
let mut time = 0;
let cnt = rng.gen_range(1..100);
for _ in 0..cnt {
time += rng.gen_range(0..100);
let action = match rng.gen_range(0..3) {
0 => TestAction::WriteTx(rng.gen_range(1..10)),
1 => TestAction::RestartSafekeeper(rng.gen_range(0..3)),
2 => TestAction::RestartWalProposer,
_ => unreachable!(),
};
schedule.push((time, action));
}
schedule
}

View File

@@ -7,7 +7,7 @@ use safekeeper::simlib::{
world::World,
world::{Node, NodeEvent},
};
use tracing::info;
use tracing::{info, warn};
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
@@ -17,10 +17,17 @@ use crate::{
MyInsertRecord, WalProposerCleanup, WalProposerRust,
},
c_context,
simtest::{safekeeper::run_server, log::{SimClock, init_logger}, util::TestConfig},
simtest::{
log::{init_logger, SimClock},
safekeeper::run_server,
util::{generate_schedule, TestConfig},
},
};
use super::{disk::Disk, util::{Schedule, TestAction}};
use super::{
disk::Disk,
util::{Schedule, TestAction},
};
#[test]
fn sync_empty_safekeepers() {
@@ -118,7 +125,7 @@ fn test_simple_restart() {
}
#[test]
fn test_simple_schedule() {
fn test_simple_schedule() -> anyhow::Result<()> {
let clock = init_logger();
let mut config = TestConfig::new(Some(clock));
config.network.keepalive_timeout = Some(100);
@@ -142,13 +149,15 @@ fn test_simple_schedule() {
(1000, TestAction::WriteTx(5)),
];
test.run_schedule(&schedule).unwrap();
test.run_schedule(&schedule)?;
info!("Test finished, stopping all threads");
test.world.stop_all();
Ok(())
}
#[test]
fn test_random_schedules() {
fn test_random_schedules() -> anyhow::Result<()> {
let clock = init_logger();
let mut config = TestConfig::new(Some(clock));
config.network.keepalive_timeout = Some(100);
@@ -156,8 +165,40 @@ fn test_random_schedules() {
for i in 0..1000 {
let seed: u64 = rand::thread_rng().gen();
let test = config.start(seed);
info!("Running test with seed {}", seed);
warn!("Running test with seed {}", seed);
let schedule = generate_schedule(seed);
test.run_schedule(&schedule)?;
test.world.stop_all();
}
Ok(())
}
#[test]
fn test_one_schedule() -> anyhow::Result<()> {
let clock = init_logger();
let mut config = TestConfig::new(Some(clock));
config.network.keepalive_timeout = Some(100);
// let seed = 6762900106769428342;
// let test = config.start(seed);
// warn!("Running test with seed {}", seed);
// let schedule = generate_schedule(seed);
// info!("schedule: {:?}", schedule);
// test.run_schedule(&schedule)?;
// test.world.stop_all();
let seed = 14035854184686918762;
let test = config.start(seed);
warn!("Running test with seed {}", seed);
let schedule = generate_schedule(seed);
info!("schedule: {:?}", schedule);
test.run_schedule(&schedule).unwrap();
test.world.stop_all();
Ok(())
}

View File

@@ -347,6 +347,8 @@ void WalProposerCleanup()
SpinLockInit(&walprop_shared->mutex);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
}
XLogWalPropClose(0);
}
void InitMyInsert();
@@ -644,6 +646,9 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
{
walprop_log(FATAL, "Too many safekeepers");
}
memset(&safekeeper[n_safekeepers], 0, sizeof(Safekeeper));
safekeeper[n_safekeepers].host = host;
safekeeper[n_safekeepers].port = port;
safekeeper[n_safekeepers].state = SS_OFFLINE;

View File

@@ -486,9 +486,9 @@ XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
void
XLogWalPropClose(XLogRecPtr recptr)
{
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
// Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
if (close(walpropFile) != 0)
if (walpropFile >= 0 && close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];

View File

@@ -305,6 +305,8 @@ impl VirtualConnection {
let send_buffer = &mut state.buffers[node_idx];
send_buffer.send_closed = true;
drop(state);
// TODO: notify the other side?
self.dst_sockets[node_idx].send(NodeEvent::Closed(TCP::new(self.clone(), node_idx as u8)));

View File

@@ -1,11 +1,11 @@
use rand::{rngs::StdRng, Rng, SeedableRng};
use tracing::debug;
use std::{
cell::RefCell,
ops::DerefMut,
panic::AssertUnwindSafe,
sync::{atomic::AtomicU64, Arc},
};
use tracing::debug;
use super::{
chan::Chan,
@@ -215,11 +215,7 @@ impl World {
self.unconditional_parking.lock().len()
);
for node in self.nodes.lock().iter() {
debug!(
"node id={:?} status={:?}",
node.id,
node.status.lock()
);
debug!("node id={:?} status={:?}", node.id, node.status.lock());
}
for park in self.unconditional_parking.lock().iter() {
park.debug_print();
@@ -422,6 +418,8 @@ impl Node {
}
pub fn crash_stop(self: &Arc<Self>) {
self.world.await_all();
let status = self.status.lock().clone();
match status {
NodeStatus::NotStarted | NodeStatus::Finished | NodeStatus::Failed => return,
@@ -444,6 +442,7 @@ impl Node {
};
park.debug_print();
// self.world.debug_print_state();
// unplug old network socket, and create a new one
*self.network.lock() = Chan::new();