Arthur Petukhovsky
2023-03-09 00:51:14 +02:00
parent 9116c01614
commit dd4c8fb568
9 changed files with 295 additions and 17 deletions

Cargo.lock (generated)

@@ -1014,6 +1014,20 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52"
[[package]]
name = "crossbeam"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@@ -1048,6 +1062,16 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.14"
@@ -3311,6 +3335,7 @@ dependencies = [
"clap 4.1.4",
"const_format",
"crc32c",
"crossbeam",
"fs2",
"git-version",
"hex",
@@ -3324,9 +3349,11 @@ dependencies = [
"postgres-protocol",
"postgres_ffi",
"pq_proto",
"rand",
"regex",
"remote_storage",
"safekeeper_api",
"scopeguard",
"serde",
"serde_json",
"serde_with",


@@ -42,8 +42,11 @@ remote_storage.workspace = true
safekeeper_api.workspace = true
storage_broker.workspace = true
utils.workspace = true
scopeguard.workspace = true
workspace_hack.workspace = true
crossbeam = "0.8.2"
rand.workspace = true
[dev-dependencies]
tempfile.workspace = true


@@ -1,7 +1,61 @@
use std::sync::Arc;
use anyhow::Result;
use super::sync::Mutex;
/// Simplified storage interface used by the simulation: writes are appended
/// in memory and flush_pos() only advances after an explicit flush().
pub trait Storage<T> {
fn flush_pos(&self) -> u32;
fn flush(&mut self) -> Result<()>;
fn write(&mut self, t: T);
}
/// Thread-safe wrapper around InMemoryStorage; clones share the same state.
#[derive(Clone)]
pub struct SharedStorage<T> {
state: Arc<Mutex<InMemoryStorage<T>>>,
}
impl<T> SharedStorage<T> {
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(InMemoryStorage::new())),
}
}
}
impl<T> Storage<T> for SharedStorage<T> {
fn flush_pos(&self) -> u32 {
self.state.lock().flush_pos
}
fn flush(&mut self) -> Result<()> {
self.state.lock().flush()
}
fn write(&mut self, t: T) {
self.state.lock().write(t);
}
}
/// In-memory append-only buffer; flush_pos marks the flushed prefix.
pub struct InMemoryStorage<T> {
data: Vec<T>,
flush_pos: u32,
}
impl<T> InMemoryStorage<T> {
pub fn new() -> Self {
Self {
data: Vec::new(),
flush_pos: 0,
}
}
pub fn flush(&mut self) -> Result<()> {
self.flush_pos = self.data.len() as u32;
Ok(())
}
pub fn write(&mut self, t: T) {
self.data.push(t);
}
}
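
A minimal usage sketch, not part of this change, assuming the SharedStorage type and the Storage trait above are in scope:

fn storage_sketch() {
    // Clones of SharedStorage share one InMemoryStorage behind Arc<Mutex<..>>.
    let mut storage: SharedStorage<u32> = SharedStorage::new();
    storage.write(1);
    storage.write(2);
    assert_eq!(storage.flush_pos(), 0); // nothing counts as flushed yet
    storage.flush().unwrap();
    assert_eq!(storage.flush_pos(), 2); // both writes are now before the flush position
}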


@@ -8,3 +8,4 @@ pub mod tcp;
pub mod chan;
pub mod sync;
pub mod start_test;
pub mod wait_group;


@@ -16,10 +16,15 @@ impl NodeOs {
}
}
/// Get the node id.
pub fn id(&self) -> NodeId {
self.internal.id
}
/// Returns a writable pipe for sending messages to node `dst`. All incoming
/// messages should be polled with [`network_epoll`]. Always succeeds.
pub fn open_tcp(&self, dst: NodeId) -> Tcp {
self.world.open_tcp(self.internal, dst)
self.world.open_tcp(&self.internal, dst)
}
/// Returns a channel to receive events from the network.


@@ -1,9 +1,32 @@
use std::sync::Arc;
use super::world::World;
use super::{world::World, client::run_client, disklog::run_server, disk::SharedStorage};
#[test]
fn start_simulation() {
let world = Arc::new(World::new());
let client_node = world.new_node();
let server_node = world.new_node();
let server_id = server_node.id;
// start the client thread
let data = [1, 2, 3, 4, 5];
client_node.launch(move |os| {
run_client(os, &data, server_id)
});
// start the server thread
let shared_storage = SharedStorage::new();
let server_storage = shared_storage.clone();
server_node.launch(move |os| {
run_server(os, Box::new(server_storage))
});
world.await_all();
world.debug_print_state();
while world.step() {
println!("made a step!");
world.debug_print_state();
}
}


@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, backtrace::Backtrace, io::{self, Write}};
pub type Mutex<T> = parking_lot::Mutex<T>;
@@ -60,13 +60,17 @@ pub struct Park {
struct ParkState {
finished: bool,
dbg_signal: u8,
}
impl Park {
pub fn new() -> Arc<Park> {
Arc::new(
Park {
lock: Mutex::new(ParkState { finished: false }),
lock: Mutex::new(ParkState {
finished: false,
dbg_signal: 0,
}),
cvar: parking_lot::Condvar::new(),
}
)
@@ -80,6 +84,17 @@ impl Park {
let mut state = self.lock.lock();
while !state.finished {
self.cvar.wait(&mut state);
// check if debug info was requested
if state.dbg_signal != 0 {
let bt = Backtrace::capture();
println!("DEBUG: thread {:?} is parked at {:?}", std::thread::current().id(), bt);
// TODO: fix bad ordering of output
io::stdout().flush().unwrap();
state.dbg_signal = 0;
// trigger a notification to wake up the caller thread
self.cvar.notify_all();
}
}
// finish parking
}
@@ -90,4 +105,15 @@ impl Park {
state.finished = true;
self.cvar.notify_all();
}
/// Signals the currently parked thread to print debug info, and waits until it has done so (or finished).
pub fn debug_print(&self) {
let mut state = self.lock.lock();
state.dbg_signal = 1;
self.cvar.notify_all();
while state.dbg_signal != 0 && !state.finished {
self.cvar.wait(&mut state);
}
}
}
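
A minimal sketch of the park/wake handshake, not part of this change, assuming Park is used directly from the sync module:

fn park_wake_sketch() {
    let park = Park::new(); // Park::new() returns an Arc<Park>
    let parked = park.clone();
    let handle = std::thread::spawn(move || {
        parked.park(); // blocks until wake() sets `finished`
    });
    park.wake(); // sets finished = true and notifies the parked thread
    handle.join().unwrap();
    // debug_print() could also be called while the thread is parked to make
    // it dump a backtrace before it resumes waiting.
}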


@@ -0,0 +1,54 @@
use std::sync::{Arc, Condvar, Mutex};
/// A custom wait group for internal use; it shouldn't be used by user code.
#[derive(Clone)]
pub struct WaitGroup {
inner: Arc<Inner>,
}
/// Inner state of a `WaitGroup`.
struct Inner {
// uses the std library Condvar
cvar: Condvar,
count: Mutex<i32>,
}
impl Default for WaitGroup {
fn default() -> Self {
Self {
inner: Arc::new(Inner {
cvar: Condvar::new(),
count: Mutex::new(0),
}),
}
}
}
impl WaitGroup {
pub fn new() -> Self {
Self::default()
}
pub fn wait(&self) {
if *self.inner.count.lock().unwrap() <= 0 {
return;
}
let mut count = self.inner.count.lock().unwrap();
while *count > 0 {
count = self.inner.cvar.wait(count).unwrap();
}
}
pub fn add(&self, delta: i32) {
let mut count = self.inner.count.lock().unwrap();
*count += delta;
if *count <= 0 {
self.inner.cvar.notify_all();
}
}
pub fn done(&self) {
self.add(-1);
}
}
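
A minimal sketch of the WaitGroup contract, not part of this change: the counter is incremented before the worker starts and decremented when it is done, so wait() blocks until the count drops back to zero.

fn wait_group_sketch() {
    let wg = WaitGroup::new();
    wg.add(1); // register the worker before spawning it
    let worker_wg = wg.clone();
    std::thread::spawn(move || {
        // ... simulated work ...
        worker_wg.done(); // count drops back to 0 and wakes the waiter
    });
    wg.wait(); // returns once every registered worker has called done()
}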


@@ -1,30 +1,41 @@
use std::sync::{atomic::AtomicI32, Arc};
use rand::{rngs::StdRng, SeedableRng, Rng};
use super::{tcp::Tcp, sync::Mutex, chan::Chan, proto::AnyMessage};
use super::{tcp::Tcp, sync::{Mutex, Park}, chan::Chan, proto::AnyMessage, node_os::NodeOs, wait_group::WaitGroup};
pub type NodeId = u32;
/// Full world simulation state, shared between all nodes.
pub struct World {
nodes: Mutex<Vec<Arc<Node>>>,
/// List of parked threads, to be woken up by the world simulation.
unconditional_parking: Mutex<Vec<Arc<Park>>>,
/// Counter of running threads. It should generally not exceed 1 if you want a
/// deterministic simulation; 0 means that all threads are parked or finished.
wait_group: WaitGroup,
/// Random number generator.
rng: Mutex<StdRng>,
}
impl World {
pub fn new() -> World {
World{
nodes: Mutex::new(Vec::new()),
unconditional_parking: Mutex::new(Vec::new()),
wait_group: WaitGroup::new(),
rng: Mutex::new(StdRng::seed_from_u64(1337)),
}
}
/// Create a new node.
pub fn new_node(&self) -> Arc<Node> {
pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
// TODO: verify
let mut nodes = self.nodes.lock();
let id = nodes.len() as NodeId;
let node = Arc::new(Node{
id,
network: Chan::new(),
});
let node = Arc::new(Node::new(id, self.clone()));
nodes.push(node.clone());
node
}
@@ -41,12 +52,50 @@ impl World {
}
/// Returns a writable end of a TCP connection, to send src->dst messages.
pub fn open_tcp(&self, src: Arc<Node>, dst: NodeId) -> Tcp {
pub fn open_tcp(&self, src: &Arc<Node>, dst: NodeId) -> Tcp {
// TODO: replace unwrap() with /dev/null socket.
let dst = self.get_node(dst).unwrap();
Tcp::new(dst)
}
/// Blocks the current thread until all nodes park or finish.
pub fn await_all(&self) {
self.wait_group.wait();
}
/// Wakes one randomly chosen parked thread and waits for the world to go
/// quiet again. Returns false if there are no parked threads left.
pub fn step(&self) -> bool {
self.await_all();
let mut parking = self.unconditional_parking.lock();
if parking.is_empty() {
// nothing to do, all threads have finished
return false;
}
let chosen_one = self.rng.lock().gen_range(0..parking.len());
let park = parking.swap_remove(chosen_one);
drop(parking);
// wake up the chosen thread
park.wake();
// to have a clean state after each step, wait for all threads to park or finish
self.await_all();
return true;
}
/// Print full world state to stdout.
pub fn debug_print_state(&self) {
println!("[DEBUG] World state, nodes.len()={:?}, parking.len()={:?}", self.nodes.lock().len(), self.unconditional_parking.lock().len());
for node in self.nodes.lock().iter() {
println!("[DEBUG] node id={:?} status={:?}", node.id, node.status.lock());
}
// for park in self.unconditional_parking.lock().iter() {
// println!("[DEBUG] parked thread, stacktrace:");
// park.debug_print();
// }
}
}
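
An illustrative driver loop, not part of this change, assuming a world set up as in start_test above; because the RNG is seeded, the order in which parked threads are woken is reproducible across runs.

fn drive(world: Arc<World>) {
    world.await_all(); // wait until every node thread is parked or finished
    while world.step() { // wake one randomly chosen parked thread, then quiesce
        world.debug_print_state();
    }
}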
/// Internal node state.
@@ -55,6 +104,7 @@ pub struct Node {
network: Chan<NetworkEvent>,
status: Mutex<NodeStatus>,
world: Arc<World>,
join_handle: Mutex<Option<std::thread::JoinHandle<()>>>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -71,29 +121,64 @@ impl Node {
Node{
id,
network: Chan::new(),
status: Mutex::new(NodeStatus::NotStarted),
world: world.clone(),
join_handle: Mutex::new(None),
}
}
pub fn launch(&self, f: impl FnOnce() + Send + 'static) {
/// Sets the code to run in this node's thread.
pub fn launch(self: &Arc<Self>, f: impl FnOnce(NodeOs) + Send + 'static) {
let node = self.clone();
std::thread::spawn(move || {
let status = node.status.lock();
let world = self.world.clone();
world.wait_group.add(1);
let join_handle = std::thread::spawn(move || {
let wg = world.wait_group.clone();
scopeguard::defer! {
wg.done();
}
let mut status = node.status.lock();
if *status != NodeStatus::NotStarted {
// unhandled panic, clearly a caller bug, should never happen
// clearly a caller bug, should never happen
panic!("node {} is already running", node.id);
}
*status = NodeStatus::Running;
drop(status);
// TODO:
node.park_me();
// TODO: recover from panic (update state, log the error)
f(NodeOs::new(world, node.clone()));
let mut status = node.status.lock();
*status = NodeStatus::Finished;
// TODO: log that the thread has finished
});
*self.join_handle.lock() = Some(join_handle);
}
/// Returns a channel to receive events from the network.
pub fn network_chan(&self) -> Chan<NetworkEvent> {
self.network.clone()
}
/// Parks the current node thread until the world simulation decides to continue.
pub fn park_me(&self) {
// TODO: try to rewrite this function
let park = Park::new();
let mut parking = self.world.unconditional_parking.lock();
parking.push(park.clone());
drop(parking);
// decrease the running-threads counter, because the current thread is about to park
self.world.wait_group.done();
// and increase it again once the thread wakes up
scopeguard::defer!(self.world.wait_group.add(1));
*self.status.lock() = NodeStatus::Parked;
park.park();
*self.status.lock() = NodeStatus::Running;
}
}
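
A minimal sketch of how launch() interacts with the world's wait group, not part of this change, assuming `world: Arc<World>` already exists; launch() registers the new thread, so await_all() returns only once the node has parked or finished.

fn launch_one(world: Arc<World>) {
    let node = world.new_node();
    node.launch(move |os| {
        let _id = os.id(); // user code receives a NodeOs bound to this node
    });
    world.await_all(); // returns once the node thread has parked or finished
}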
#[derive(Clone)]