mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
This PR contains the first version of a [FoundationDB-like](https://www.youtube.com/watch?v=4fFDFbi3toc) simulation testing for safekeeper and walproposer. ### desim This is a core "framework" for running determenistic simulation. It operates on threads, allowing to test syncronous code (like walproposer). `libs/desim/src/executor.rs` contains implementation of a determenistic thread execution. This is achieved by blocking all threads, and each time allowing only a single thread to make an execution step. All executor's threads are blocked using `yield_me(after_ms)` function. This function is called when a thread wants to sleep or wait for an external notification (like blocking on a channel until it has a ready message). `libs/desim/src/chan.rs` contains implementation of a channel (basic sync primitive). It has unlimited capacity and any thread can push or read messages to/from it. `libs/desim/src/network.rs` has a very naive implementation of a network (only reliable TCP-like connections are supported for now), that can have arbitrary delays for each package and failure injections for breaking connections with some probability. `libs/desim/src/world.rs` ties everything together, to have a concept of virtual nodes that can have network connections between them. ### walproposer_sim Has everything to run walproposer and safekeepers in a simulation. `safekeeper.rs` reimplements all necesary stuff from `receive_wal.rs`, `send_wal.rs` and `timelines_global_map.rs`. `walproposer_api.rs` implements all walproposer callback to use simulation library. `simulation.rs` defines a schedule – a set of events like `restart <sk>` or `write_wal` that should happen at time `<ts>`. It also has code to spawn walproposer/safekeeper threads and provide config to them. ### tests `simple_test.rs` has tests that just start walproposer and 3 safekeepers together in a simulation, and tests that they are not crashing right away. `misc_test.rs` has tests checking more advanced simulation cases, like crashing or restarting threads, testing memory deallocation, etc. `random_test.rs` is the main test, it checks thousands of random seeds (schedules) for correctness. It roughly corresponds to running a real python integration test in an environment with very unstable network and cpu, but in a determenistic way (each seed results in the same execution log) and much much faster. Closes #547 --------- Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
58 lines
2.0 KiB
Rust
58 lines
2.0 KiB
Rust
use std::collections::HashMap;
|
|
|
|
const BLOCK_SIZE: usize = 8192;
|
|
|
|
/// A simple in-memory implementation of a block storage. Can be used to implement external
|
|
/// storage in tests.
|
|
pub struct BlockStorage {
|
|
blocks: HashMap<u64, [u8; BLOCK_SIZE]>,
|
|
}
|
|
|
|
impl Default for BlockStorage {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl BlockStorage {
|
|
pub fn new() -> Self {
|
|
BlockStorage {
|
|
blocks: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
pub fn read(&self, pos: u64, buf: &mut [u8]) {
|
|
let mut buf_offset = 0;
|
|
let mut storage_pos = pos;
|
|
while buf_offset < buf.len() {
|
|
let block_id = storage_pos / BLOCK_SIZE as u64;
|
|
let block = self.blocks.get(&block_id).unwrap_or(&[0; BLOCK_SIZE]);
|
|
let block_offset = storage_pos % BLOCK_SIZE as u64;
|
|
let block_len = BLOCK_SIZE as u64 - block_offset;
|
|
let buf_len = buf.len() - buf_offset;
|
|
let copy_len = std::cmp::min(block_len as usize, buf_len);
|
|
buf[buf_offset..buf_offset + copy_len]
|
|
.copy_from_slice(&block[block_offset as usize..block_offset as usize + copy_len]);
|
|
buf_offset += copy_len;
|
|
storage_pos += copy_len as u64;
|
|
}
|
|
}
|
|
|
|
pub fn write(&mut self, pos: u64, buf: &[u8]) {
|
|
let mut buf_offset = 0;
|
|
let mut storage_pos = pos;
|
|
while buf_offset < buf.len() {
|
|
let block_id = storage_pos / BLOCK_SIZE as u64;
|
|
let block = self.blocks.entry(block_id).or_insert([0; BLOCK_SIZE]);
|
|
let block_offset = storage_pos % BLOCK_SIZE as u64;
|
|
let block_len = BLOCK_SIZE as u64 - block_offset;
|
|
let buf_len = buf.len() - buf_offset;
|
|
let copy_len = std::cmp::min(block_len as usize, buf_len);
|
|
block[block_offset as usize..block_offset as usize + copy_len]
|
|
.copy_from_slice(&buf[buf_offset..buf_offset + copy_len]);
|
|
buf_offset += copy_len;
|
|
storage_pos += copy_len as u64
|
|
}
|
|
}
|
|
}
|