mirror of
https://github.com/neondatabase/neon.git
This PR contains the first version of [FoundationDB-like](https://www.youtube.com/watch?v=4fFDFbi3toc) simulation testing for safekeeper and walproposer.

### desim

This is the core "framework" for running deterministic simulations. It operates on threads, which allows testing synchronous code (like walproposer).

`libs/desim/src/executor.rs` contains the implementation of deterministic thread execution. This is achieved by blocking all threads and allowing only a single thread at a time to make an execution step. All of the executor's threads block in the `yield_me(after_ms)` function, which is called when a thread wants to sleep or wait for an external notification (like blocking on a channel until it has a ready message).

`libs/desim/src/chan.rs` contains the implementation of a channel (a basic sync primitive). It has unlimited capacity, and any thread can push messages to it or read messages from it.

`libs/desim/src/network.rs` has a very naive implementation of a network (only reliable TCP-like connections are supported for now) that can apply arbitrary delays to each packet and inject failures that break connections with some probability.

`libs/desim/src/world.rs` ties everything together to provide a concept of virtual nodes that can have network connections between them.

### walproposer_sim

Has everything needed to run walproposer and safekeepers in a simulation.

`safekeeper.rs` reimplements all the necessary parts of `receive_wal.rs`, `send_wal.rs` and `timelines_global_map.rs`.

`walproposer_api.rs` implements all walproposer callbacks on top of the simulation library.

`simulation.rs` defines a schedule: a set of events like `restart <sk>` or `write_wal` that should happen at time `<ts>`. It also has code to spawn walproposer/safekeeper threads and provide config to them.

### tests

`simple_test.rs` has tests that start walproposer and 3 safekeepers together in a simulation and check that they do not crash right away.

`misc_test.rs` has tests checking more advanced simulation cases, like crashing or restarting threads, testing memory deallocation, etc.

`random_test.rs` is the main test; it checks thousands of random seeds (schedules) for correctness. It roughly corresponds to running a real Python integration test in an environment with a very unstable network and CPU, but in a deterministic way (each seed results in the same execution log) and much faster.

Closes #547

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
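The property the description relies on, "each seed results in the same execution log", can be illustrated with a much smaller model than desim itself. The sketch below is not the desim executor: it does not park real OS threads, and `SplitMix64` and `run_schedule` are hypothetical names introduced only for this illustration. It models each thread as a step counter and lets a seeded PRNG choose which single task advances at every iteration.

```rust
/// Small deterministic PRNG (SplitMix64). An assumption for this sketch:
/// it stands in for whatever random source the real simulation uses.
struct SplitMix64(u64);

impl SplitMix64 {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}

/// Runs `tasks` hypothetical tasks of `steps` steps each. Exactly one task
/// advances per iteration, chosen by the seeded PRNG.
/// Returns the execution log as (task index, step index) pairs.
fn run_schedule(seed: u64, tasks: usize, steps: usize) -> Vec<(usize, usize)> {
    let mut rng = SplitMix64(seed);
    let mut progress = vec![0usize; tasks];
    let mut log = Vec::with_capacity(tasks * steps);
    loop {
        // Tasks that still have steps left are the only candidates.
        let runnable: Vec<usize> = (0..tasks).filter(|&t| progress[t] < steps).collect();
        if runnable.is_empty() {
            break;
        }
        // The seed alone decides which task makes the next step.
        let pick = runnable[(rng.next_u64() % runnable.len() as u64) as usize];
        log.push((pick, progress[pick]));
        progress[pick] += 1;
    }
    log
}
```

Calling `run_schedule(42, 3, 4)` twice yields identical logs, which is the same replay guarantee that lets a failing seed be re-run and debugged.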
109 lines
2.6 KiB
Rust
use std::{collections::VecDeque, sync::Arc};

use parking_lot::{Mutex, MutexGuard};

use crate::executor::{self, PollSome, Waker};

/// FIFO channel with blocking send and receive. Can be cloned and shared between threads.
/// Blocking functions should be used only from threads that are managed by the executor.
pub struct Chan<T> {
    shared: Arc<State<T>>,
}

impl<T> Clone for Chan<T> {
    fn clone(&self) -> Self {
        Chan {
            shared: self.shared.clone(),
        }
    }
}

impl<T> Default for Chan<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Chan<T> {
    pub fn new() -> Chan<T> {
        Chan {
            shared: Arc::new(State {
                queue: Mutex::new(VecDeque::new()),
                waker: Waker::new(),
            }),
        }
    }

    /// Get a message from the front of the queue, block if the queue is empty.
    /// If not called from the executor thread, it can block forever.
    pub fn recv(&self) -> T {
        self.shared.recv()
    }

    /// Get a message from the front of the queue, panic if the queue is empty.
    /// Never blocks.
    pub fn must_recv(&self) -> T {
        self.shared
            .try_recv()
            .expect("message should've been ready")
    }

    /// Get a message from the front of the queue, return None if the queue is empty.
    /// Never blocks.
    pub fn try_recv(&self) -> Option<T> {
        self.shared.try_recv()
    }

    /// Send a message to the back of the queue.
    pub fn send(&self, t: T) {
        self.shared.send(t);
    }
}

struct State<T> {
    queue: Mutex<VecDeque<T>>,
    waker: Waker,
}

impl<T> State<T> {
    fn send(&self, t: T) {
        self.queue.lock().push_back(t);
        self.waker.wake_all();
    }

    fn try_recv(&self) -> Option<T> {
        let mut q = self.queue.lock();
        q.pop_front()
    }

    fn recv(&self) -> T {
        // interrupt the receiver to prevent consuming everything at once
        executor::yield_me(0);

        let mut queue = self.queue.lock();
        if let Some(t) = queue.pop_front() {
            return t;
        }
        loop {
            // register for a wakeup before re-checking the queue
            self.waker.wake_me_later();
            if let Some(t) = queue.pop_front() {
                return t;
            }
            // release the queue lock while yielding, so that senders can push
            MutexGuard::unlocked(&mut queue, || {
                executor::yield_me(-1);
            });
        }
    }
}

impl<T> PollSome for Chan<T> {
    /// Schedules a wakeup for the current thread.
    fn wake_me(&self) {
        self.shared.waker.wake_me_later();
    }

    /// Checks if chan has any pending messages.
    fn has_some(&self) -> bool {
        !self.shared.queue.lock().is_empty()
    }
}
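
The `Chan` above only works inside the simulation, because `recv` parks the calling thread through `executor::yield_me`. To show the same queue-plus-wakeup shape outside the executor, here is a minimal standalone sketch that swaps in `std::sync::Condvar` for desim's `Waker`. The names `StdChan`, `Shared`, and `demo` are hypothetical and not part of the repository, and this version is scheduled by the OS, so it gives none of the determinism desim provides.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Shared state: an unbounded FIFO queue plus a condition variable that
/// plays the role of the simulation's waker (an assumption of this sketch).
struct Shared<T> {
    queue: Mutex<VecDeque<T>>,
    ready: Condvar,
}

/// Hypothetical std-only analogue of `Chan`; clones share one queue.
pub struct StdChan<T> {
    shared: Arc<Shared<T>>,
}

impl<T> Clone for StdChan<T> {
    fn clone(&self) -> Self {
        StdChan {
            shared: Arc::clone(&self.shared),
        }
    }
}

impl<T> StdChan<T> {
    pub fn new() -> Self {
        StdChan {
            shared: Arc::new(Shared {
                queue: Mutex::new(VecDeque::new()),
                ready: Condvar::new(),
            }),
        }
    }

    /// Push to the back of the queue and wake every waiting receiver.
    pub fn send(&self, t: T) {
        self.shared.queue.lock().unwrap().push_back(t);
        self.shared.ready.notify_all();
    }

    /// Pop from the front of the queue without blocking.
    pub fn try_recv(&self) -> Option<T> {
        self.shared.queue.lock().unwrap().pop_front()
    }

    /// Pop from the front of the queue, blocking while it is empty.
    pub fn recv(&self) -> T {
        let mut queue = self.shared.queue.lock().unwrap();
        loop {
            if let Some(t) = queue.pop_front() {
                return t;
            }
            // `wait` releases the lock while blocked and re-acquires it on wakeup.
            queue = self.shared.ready.wait(queue).unwrap();
        }
    }
}

/// One producer thread sends 0, 1, 2; the caller receives them in FIFO order.
pub fn demo() -> Vec<u32> {
    let chan = StdChan::new();
    let tx = chan.clone();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i);
        }
    });
    let got = (0..3).map(|_| chan.recv()).collect();
    producer.join().unwrap();
    got
}
```

As in `State::recv`, the receiver re-checks the queue in a loop after every wakeup rather than assuming a message is present, which keeps it correct when several receivers are woken by one `send`.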