mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
This PR contains the first version of [FoundationDB-like](https://www.youtube.com/watch?v=4fFDFbi3toc) simulation testing for safekeeper and walproposer. ### desim This is a core "framework" for running deterministic simulation. It operates on threads, allowing synchronous code (like walproposer) to be tested. `libs/desim/src/executor.rs` contains an implementation of deterministic thread execution. This is achieved by blocking all threads and, each time, allowing only a single thread to make an execution step. All of the executor's threads are blocked using the `yield_me(after_ms)` function. This function is called when a thread wants to sleep or wait for an external notification (like blocking on a channel until it has a ready message). `libs/desim/src/chan.rs` contains an implementation of a channel (a basic sync primitive). It has unlimited capacity, and any thread can push or read messages to/from it. `libs/desim/src/network.rs` has a very naive implementation of a network (only reliable TCP-like connections are supported for now) that can have arbitrary delays for each packet and failure injection that breaks connections with some probability. `libs/desim/src/world.rs` ties everything together, providing a concept of virtual nodes that can have network connections between them. ### walproposer_sim Has everything needed to run walproposer and safekeepers in a simulation. `safekeeper.rs` reimplements all the necessary parts of `receive_wal.rs`, `send_wal.rs` and `timelines_global_map.rs`. `walproposer_api.rs` implements all walproposer callbacks on top of the simulation library. `simulation.rs` defines a schedule – a set of events like `restart <sk>` or `write_wal` that should happen at time `<ts>`. It also has code to spawn walproposer/safekeeper threads and provide config to them. ### tests `simple_test.rs` has tests that just start walproposer and 3 safekeepers together in a simulation and check that they do not crash right away. 
`misc_test.rs` has tests checking more advanced simulation cases, like crashing or restarting threads, testing memory deallocation, etc. `random_test.rs` is the main test; it checks thousands of random seeds (schedules) for correctness. It roughly corresponds to running a real Python integration test in an environment with a very unstable network and CPU, but in a deterministic way (each seed results in the same execution log) and much, much faster. Closes #547 --------- Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
188 lines
5.8 KiB
Rust
188 lines
5.8 KiB
Rust
use desim::proto::SimEvent;
|
|
use tracing::debug;
|
|
|
|
/// Role of a simulated node, recorded from its `started;…` log event.
///
/// The default is [`NodeKind::Unknown`], meaning no `started` event has been
/// seen for the node yet.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
enum NodeKind {
    /// No `started` event has been seen for this node yet.
    #[default]
    Unknown,
    /// Node announced itself with `started;safekeeper`.
    Safekeeper,
    /// Node announced itself with `started;walproposer;…`.
    WalProposer,
}
|
|
|
|
/// Simulation state of walproposer/safekeeper, derived from the simulation logs.
|
|
#[derive(Clone, Debug, Default)]
|
|
struct NodeInfo {
|
|
kind: NodeKind,
|
|
|
|
// walproposer
|
|
is_sync: bool,
|
|
term: u64,
|
|
epoch_lsn: u64,
|
|
|
|
// safekeeper
|
|
commit_lsn: u64,
|
|
flush_lsn: u64,
|
|
}
|
|
|
|
impl NodeInfo {
|
|
fn init_kind(&mut self, kind: NodeKind) {
|
|
if self.kind == NodeKind::Unknown {
|
|
self.kind = kind;
|
|
} else {
|
|
assert!(self.kind == kind);
|
|
}
|
|
}
|
|
|
|
fn started(&mut self, data: &str) {
|
|
let mut parts = data.split(';');
|
|
assert!(parts.next().unwrap() == "started");
|
|
match parts.next().unwrap() {
|
|
"safekeeper" => {
|
|
self.init_kind(NodeKind::Safekeeper);
|
|
}
|
|
"walproposer" => {
|
|
self.init_kind(NodeKind::WalProposer);
|
|
let is_sync: u8 = parts.next().unwrap().parse().unwrap();
|
|
self.is_sync = is_sync != 0;
|
|
}
|
|
_ => unreachable!(),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Global state of the simulation, derived from the simulation logs.
|
|
#[derive(Debug, Default)]
|
|
struct GlobalState {
|
|
nodes: Vec<NodeInfo>,
|
|
commit_lsn: u64,
|
|
write_lsn: u64,
|
|
max_write_lsn: u64,
|
|
|
|
written_wal: u64,
|
|
written_records: u64,
|
|
}
|
|
|
|
impl GlobalState {
|
|
fn new() -> Self {
|
|
Default::default()
|
|
}
|
|
|
|
fn get(&mut self, id: u32) -> &mut NodeInfo {
|
|
let id = id as usize;
|
|
if id >= self.nodes.len() {
|
|
self.nodes.resize(id + 1, NodeInfo::default());
|
|
}
|
|
&mut self.nodes[id]
|
|
}
|
|
}
|
|
|
|
/// Try to find inconsistencies in the simulation log.
|
|
pub fn validate_events(events: Vec<SimEvent>) {
|
|
const INITDB_LSN: u64 = 21623024;
|
|
|
|
let hook = std::panic::take_hook();
|
|
scopeguard::defer_on_success! {
|
|
std::panic::set_hook(hook);
|
|
};
|
|
|
|
let mut state = GlobalState::new();
|
|
state.max_write_lsn = INITDB_LSN;
|
|
|
|
for event in events {
|
|
debug!("{:?}", event);
|
|
|
|
let node = state.get(event.node);
|
|
if event.data.starts_with("started;") {
|
|
node.started(&event.data);
|
|
continue;
|
|
}
|
|
assert!(node.kind != NodeKind::Unknown);
|
|
|
|
// drop reference to unlock state
|
|
let mut node = node.clone();
|
|
|
|
let mut parts = event.data.split(';');
|
|
match node.kind {
|
|
NodeKind::Safekeeper => match parts.next().unwrap() {
|
|
"tli_loaded" => {
|
|
let flush_lsn: u64 = parts.next().unwrap().parse().unwrap();
|
|
let commit_lsn: u64 = parts.next().unwrap().parse().unwrap();
|
|
node.flush_lsn = flush_lsn;
|
|
node.commit_lsn = commit_lsn;
|
|
}
|
|
_ => unreachable!(),
|
|
},
|
|
NodeKind::WalProposer => {
|
|
match parts.next().unwrap() {
|
|
"prop_elected" => {
|
|
let prop_lsn: u64 = parts.next().unwrap().parse().unwrap();
|
|
let prop_term: u64 = parts.next().unwrap().parse().unwrap();
|
|
let prev_lsn: u64 = parts.next().unwrap().parse().unwrap();
|
|
let prev_term: u64 = parts.next().unwrap().parse().unwrap();
|
|
|
|
assert!(prop_lsn >= prev_lsn);
|
|
assert!(prop_term >= prev_term);
|
|
|
|
assert!(prop_lsn >= state.commit_lsn);
|
|
|
|
if prop_lsn > state.write_lsn {
|
|
assert!(prop_lsn <= state.max_write_lsn);
|
|
debug!(
|
|
"moving write_lsn up from {} to {}",
|
|
state.write_lsn, prop_lsn
|
|
);
|
|
state.write_lsn = prop_lsn;
|
|
}
|
|
if prop_lsn < state.write_lsn {
|
|
debug!(
|
|
"moving write_lsn down from {} to {}",
|
|
state.write_lsn, prop_lsn
|
|
);
|
|
state.write_lsn = prop_lsn;
|
|
}
|
|
|
|
node.epoch_lsn = prop_lsn;
|
|
node.term = prop_term;
|
|
}
|
|
"write_wal" => {
|
|
assert!(!node.is_sync);
|
|
let start_lsn: u64 = parts.next().unwrap().parse().unwrap();
|
|
let end_lsn: u64 = parts.next().unwrap().parse().unwrap();
|
|
let cnt: u64 = parts.next().unwrap().parse().unwrap();
|
|
|
|
let size = end_lsn - start_lsn;
|
|
state.written_wal += size;
|
|
state.written_records += cnt;
|
|
|
|
// TODO: If we allow writing WAL before winning the election
|
|
|
|
assert!(start_lsn >= state.commit_lsn);
|
|
assert!(end_lsn >= start_lsn);
|
|
// assert!(start_lsn == state.write_lsn);
|
|
state.write_lsn = end_lsn;
|
|
|
|
if end_lsn > state.max_write_lsn {
|
|
state.max_write_lsn = end_lsn;
|
|
}
|
|
}
|
|
"commit_lsn" => {
|
|
let lsn: u64 = parts.next().unwrap().parse().unwrap();
|
|
assert!(lsn >= state.commit_lsn);
|
|
state.commit_lsn = lsn;
|
|
}
|
|
_ => unreachable!(),
|
|
}
|
|
}
|
|
_ => unreachable!(),
|
|
}
|
|
|
|
// update the node in the state struct
|
|
*state.get(event.node) = node;
|
|
}
|
|
}
|