This commit is contained in:
Arthur Petukhovsky
2023-03-09 14:51:29 +02:00
parent dd4c8fb568
commit f2fb9f6be9
6 changed files with 129 additions and 45 deletions

View File

@@ -2,8 +2,12 @@ use super::{node_os::NodeOs, world::NodeId, proto::AnyMessage};
/// Copy all data from array to the remote node.
pub fn run_client(os: NodeOs, data: &[u32], dst: NodeId) {
println!("started client");
let sock = os.open_tcp(dst);
for num in data {
sock.send(AnyMessage::Just32(num.clone()));
}
println!("sent all data and finished client");
}

View File

@@ -23,9 +23,12 @@ impl DiskLog {
}
pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
println!("started server");
let epoll = os.network_epoll();
loop {
let event = epoll.recv();
println!("got event: {:?}", event);
match event {
NetworkEvent::Message(msg) => {
match msg {

View File

@@ -1,6 +1,6 @@
/// All possible flavours of messages.
/// Grouped by the receiver node.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum AnyMessage {
Just32(u32),
}

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, thread::sleep_ms, time::Duration};
use super::{world::World, client::run_client, disklog::run_server, disk::SharedStorage};
@@ -22,11 +22,13 @@ fn start_simulation() {
run_server(os, Box::new(server_storage))
});
world.debug_print_state();
world.await_all();
world.debug_print_state();
while world.step() {
println!("made a step!");
std::thread::sleep(Duration::from_millis(200));
world.debug_print_state();
}
}

View File

@@ -1,5 +1,7 @@
use std::{sync::Arc, backtrace::Backtrace, io::{self, Write}};
use super::world::{Node, NodeId};
pub type Mutex<T> = parking_lot::Mutex<T>;
/// More deterministic condvar.
@@ -20,7 +22,7 @@ impl Condvar {
/// Blocks the current thread until this condition variable receives a notification.
pub fn wait<'a, T>(&self, guard: &mut parking_lot::MutexGuard<'a, T>) {
let park = Park::new();
let park = Park::new(false);
// add the waiter to the list
self.waiters.lock().waiters.push(park.clone());
@@ -33,22 +35,29 @@ impl Condvar {
/// Wakes up all blocked threads on this condvar.
pub fn notify_all(&self) {
// TODO: wake up in random order, yield to the scheduler
// TODO: check that it's waked up in random order and yield to the scheduler
let mut state = self.waiters.lock();
for waiter in state.waiters.drain(..) {
waiter.wake();
}
drop(state);
// yield the current thread to the scheduler
Park::yield_thread();
}
/// Wakes up one blocked thread on this condvar.
pub fn notify_one(&self) {
// TODO: wake up random thread, yield to the scheduler
// TODO: wake up random thread
let mut state = self.waiters.lock();
if let Some(waiter) = state.waiters.pop() {
waiter.wake();
}
// yield the current thread to the scheduler
Park::yield_thread();
}
}
@@ -59,61 +68,108 @@ pub struct Park {
}
struct ParkState {
/// False means that thread cannot continue without external signal,
/// i.e. waiting for some event to happen.
can_continue: bool,
/// False means that thread is unconditionally parked and waiting for
/// world simulation to wake it up. True means that the parking is
/// finished and the thread can continue.
finished: bool,
dbg_signal: u8,
node_id: Option<NodeId>,
backtrace: Option<Backtrace>,
}
impl Park {
pub fn new() -> Arc<Park> {
pub fn new(can_continue: bool) -> Arc<Park> {
Arc::new(
Park {
lock: Mutex::new(ParkState {
can_continue,
finished: false,
dbg_signal: 0,
node_id: None,
backtrace: None,
}),
cvar: parking_lot::Condvar::new(),
}
)
}
/// Should be called once by the waiting thread. Blocks the thread until wake() is called.
pub fn park(&self) {
// TODO: update state of the current thread in the world struct
/// Should be called once by the waiting thread. Blocks the thread until wake() is called,
/// and until the thread is woken up by the world simulation.
pub fn park(self: &Arc<Self>) {
let node = Node::current();
// start parking
let mut state = self.lock.lock();
state.node_id = Some(node.id);
state.backtrace = Some(Backtrace::capture());
println!("PARKING STARTED: node {:?}", node.id);
parking_lot::MutexGuard::unlocked(&mut state, || {
node.internal_parking_start();
});
scopeguard::defer! {
node.internal_parking_end();
};
// wait for condition
while !state.can_continue {
self.cvar.wait(&mut state);
}
println!("PARKING MIDDLE: node {:?}", node.id);
// condition is met, wait for world simulation to wake us up
node.internal_parking_middle(self.clone());
while !state.finished {
self.cvar.wait(&mut state);
// check if debug info was requested
if state.dbg_signal != 0 {
let bt = Backtrace::capture();
println!("DEBUG: thread {:?} is parked at {:?}", std::thread::current().id(), bt);
// TODO: fix bad ordering of output
io::stdout().flush().unwrap();
state.dbg_signal = 0;
// trigger a notification to wake up the caller thread
self.cvar.notify_all();
}
}
println!("PARKING ENDED: node {:?}", node.id);
// finish parking
}
/// Will wake up the thread that is currently parked.
/// Will wake up the thread that is currently conditionally parked.
pub fn wake(&self) {
let mut state = self.lock.lock();
if state.can_continue {
println!("WARN wake() called on a thread that is already waked, node {:?}", state.node_id);
return;
}
state.can_continue = true;
self.cvar.notify_all();
}
/// Will wake up the thread that is currently unconditionally parked.
pub fn internal_world_wake(&self) {
let mut state = self.lock.lock();
if state.finished {
println!("WARN internal_world_wake() called on a thread that is already waked, node {:?}", state.node_id);
return;
}
state.finished = true;
self.cvar.notify_all();
}
/// Send a signal to the thread that is currently parked to print debug info.
/// Print debug info about the parked thread.
pub fn debug_print(&self) {
let mut state = self.lock.lock();
state.dbg_signal = 1;
self.cvar.notify_all();
println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace);
}
while !state.dbg_signal == 0 && !state.finished {
self.cvar.wait(&mut state);
}
/// It feels that this function can cause deadlocks.
pub fn node_id(&self) -> Option<NodeId> {
let state = self.lock.lock();
state.node_id
}
/// Yield the current thread to the world simulation.
pub fn yield_thread() {
let park = Park::new(true);
park.park();
}
}

View File

@@ -1,4 +1,4 @@
use std::sync::{atomic::AtomicI32, Arc};
use std::{sync::{atomic::AtomicI32, Arc}, cell::RefCell};
use rand::{rngs::StdRng, SeedableRng, Rng};
use super::{tcp::Tcp, sync::{Mutex, Park}, chan::Chan, proto::AnyMessage, node_os::NodeOs, wait_group::WaitGroup};
@@ -77,8 +77,10 @@ impl World {
let park = parking.swap_remove(chosen_one);
drop(parking);
println!("Waking up park at node {:?}", park.node_id());
// wake up the chosen thread
park.wake();
park.internal_world_wake();
// to have a clean state after each step, wait for all threads to finish
self.await_all();
@@ -98,6 +100,10 @@ impl World {
}
}
thread_local! {
pub static CURRENT_NODE: RefCell<Option<Arc<Node>>> = RefCell::new(None);
}
/// Internal node state.
pub struct Node {
pub id: NodeId,
@@ -111,6 +117,7 @@ pub struct Node {
pub enum NodeStatus {
NotStarted,
Running,
Waiting,
Parked,
Finished,
Failed,
@@ -133,6 +140,10 @@ impl Node {
let world = self.world.clone();
world.wait_group.add(1);
let join_handle = std::thread::spawn(move || {
CURRENT_NODE.with(|current_node| {
*current_node.borrow_mut() = Some(node.clone());
});
let wg = world.wait_group.clone();
scopeguard::defer! {
wg.done();
@@ -146,7 +157,8 @@ impl Node {
*status = NodeStatus::Running;
drop(status);
node.park_me();
// block on the world simulation
Park::yield_thread();
// TODO: recover from panic (update state, log the error)
f(NodeOs::new(world, node.clone()));
@@ -162,26 +174,33 @@ impl Node {
self.network.clone()
}
/// Park the node current thread until world simulation will decide to continue.
pub fn park_me(&self) {
// TODO: try to rewrite this function
let park = Park::new();
let mut parking = self.world.unconditional_parking.lock();
parking.push(park.clone());
drop(parking);
// decrease the running threads counter, because current thread is parked
pub fn internal_parking_start(&self) {
// node started parking (waiting for condition)
*self.status.lock() = NodeStatus::Waiting;
self.world.wait_group.done();
// and increase it once it will wake up
scopeguard::defer!(self.world.wait_group.add(1));
}
pub fn internal_parking_middle(&self, park: Arc<Park>) {
// this park entered the unconditional_parking state
*self.status.lock() = NodeStatus::Parked;
park.park();
self.world.unconditional_parking.lock().push(park);
}
pub fn internal_parking_end(&self) {
// node finished parking, increase the running threads counter
self.world.wait_group.add(1);
*self.status.lock() = NodeStatus::Running;
}
/// Get the current node, panics if called from outside of a node thread.
pub fn current() -> Arc<Node> {
CURRENT_NODE.with(|current_node| {
current_node.borrow().clone().unwrap()
})
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum NetworkEvent {
Accept,
Message(AnyMessage),