diff --git a/safekeeper/src/sim/chan.rs b/safekeeper/src/sim/chan.rs index 47304dba5c..da4828e478 100644 --- a/safekeeper/src/sim/chan.rs +++ b/safekeeper/src/sim/chan.rs @@ -1,6 +1,6 @@ use std::{collections::VecDeque, sync::Arc}; -use super::sync::{Mutex, Condvar}; +use super::sync::{Mutex, Condvar, Park}; /// FIFO channel with blocking send and receive. Can be cloned and shared between threads. #[derive(Clone)] @@ -31,6 +31,9 @@ impl Chan { /// Get a message from the front of the queue, or block if the queue is empty. pub fn recv(&self) -> T { + // interrupt the receiver to prevent consuming everything at once + Park::yield_thread(); + let mut queue = self.shared.queue.lock(); loop { if let Some(t) = queue.pop_front() { diff --git a/safekeeper/src/sim/sync.rs b/safekeeper/src/sim/sync.rs index d49e0d6957..ec0733a961 100644 --- a/safekeeper/src/sim/sync.rs +++ b/safekeeper/src/sim/sync.rs @@ -1,10 +1,13 @@ use std::{sync::Arc, backtrace::Backtrace, io::{self, Write}}; +use parking_lot::MutexGuard; + use super::world::{Node, NodeId}; pub type Mutex = parking_lot::Mutex; -/// More deterministic condvar. +/// More deterministic condvar. Determenism comes from the fact that +/// at all times there is at most one running thread. pub struct Condvar { waiters: Mutex, } @@ -33,32 +36,30 @@ impl Condvar { }); } - /// Wakes up all blocked threads on this condvar. + /// Wakes up all blocked threads on this condvar, can be called only from the node thread. pub fn notify_all(&self) { // TODO: check that it's waked up in random order and yield to the scheduler - let mut state = self.waiters.lock(); - for waiter in state.waiters.drain(..) { + let mut waiters = self.waiters.lock().waiters.drain(..).collect::>(); + for waiter in waiters.drain(..) { + // block (park) the current thread, wake the other thread waiter.wake(); } - drop(state); - - // yield the current thread to the scheduler - Park::yield_thread(); } - /// Wakes up one blocked thread on this condvar. + /// Wakes up one blocked thread on this condvar, can be called only from the node thread. pub fn notify_one(&self) { // TODO: wake up random thread - let mut state = self.waiters.lock(); - if let Some(waiter) = state.waiters.pop() { - waiter.wake(); - } - drop(state); + let to_wake = self.waiters.lock().waiters.pop(); - // yield the current thread to the scheduler - Park::yield_thread(); + if let Some(waiter) = to_wake { + // block (park) the current thread, wake the other thread + waiter.wake(); + } else { + // block (park) the current thread just in case + Park::yield_thread() + } } } @@ -95,54 +96,97 @@ impl Park { ) } + fn init_state(state: &mut ParkState, node: &Arc) { + state.node_id = Some(node.id); + state.backtrace = Some(Backtrace::capture()); + } + /// Should be called once by the waiting thread. Blocks the thread until wake() is called, /// and until the thread is woken up by the world simulation. pub fn park(self: &Arc) { let node = Node::current(); - // start parking + // start blocking let mut state = self.lock.lock(); - state.node_id = Some(node.id); - state.backtrace = Some(Backtrace::capture()); + Self::init_state(&mut state, &node); - println!("PARKING STARTED: node {:?}", node.id); + if state.can_continue { + // unconditional parking + println!("YIELD PARKING: node {:?}", node.id); - parking_lot::MutexGuard::unlocked(&mut state, || { - node.internal_parking_start(); - }); + parking_lot::MutexGuard::unlocked(&mut state, || { + // first put to world parking, then decrease the running threads counter + node.internal_parking_middle(self.clone()); + }); + } else { + println!("AWAIT PARKING: node {:?}", node.id); - scopeguard::defer! { - node.internal_parking_end(); - }; + parking_lot::MutexGuard::unlocked(&mut state, || { + // conditional parking, decrease the running threads counter without parking + node.internal_parking_start(); + }); - // wait for condition - while !state.can_continue { - self.cvar.wait(&mut state); + // wait for condition + while !state.can_continue { + self.cvar.wait(&mut state); + } + + println!("CONDITION MET: node {:?}", node.id); + // condition is met, we are now running instead of the waker thread. + // the next thing is to park the thread in the world, then decrease + // the running threads counter + node.internal_parking_middle(self.clone()); } - println!("PARKING MIDDLE: node {:?}", node.id); + self.park_wait_the_world(node, &mut state); + } + fn park_wait_the_world(&self, node: Arc, state: &mut parking_lot::MutexGuard) { // condition is met, wait for world simulation to wake us up - node.internal_parking_middle(self.clone()); - while !state.finished { - self.cvar.wait(&mut state); + self.cvar.wait(state); } println!("PARKING ENDED: node {:?}", node.id); - // finish parking + // We are the only running thread now, we just need to update the state, + // and continue the execution. + node.internal_parking_end(); } - /// Will wake up the thread that is currently conditionally parked. + /// Hacky way to register parking before the thread is actually blocked. + fn park_ahead_now() -> Arc { + let park = Park::new(true); + let node = Node::current(); + Self::init_state(&mut park.lock.lock(), &node); + println!("PARKING MIDDLE alt: node {:?}", node.id); + node.internal_parking_ahead(park.clone()); + park + } + + /// Will wake up the thread that is currently conditionally parked. Can be called only + /// from the node thread, because it will block the caller thread. What it will do: + /// 1. Park the thread that called wake() in the world + /// 2. Wake up the waiting thread (it will also park in the world) + /// 3. Block the thread that called wake() pub fn wake(&self) { + // parking the thread that called wake() + let self_park = Park::park_ahead_now(); + let mut state = self.lock.lock(); if state.can_continue { println!("WARN wake() called on a thread that is already waked, node {:?}", state.node_id); return; } state.can_continue = true; + // and here we park the waiting thread self.cvar.notify_all(); + drop(state); + + // and here we block the thread that called wake() by defer + let node = Node::current(); + let mut state = self_park.lock.lock(); + self_park.park_wait_the_world(node, &mut state); } /// Will wake up the thread that is currently unconditionally parked. @@ -158,8 +202,9 @@ impl Park { /// Print debug info about the parked thread. pub fn debug_print(&self) { - let mut state = self.lock.lock(); - println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace); + let state = self.lock.lock(); + println!("PARK: node {:?} wake1={} wake2={}", state.node_id, state.can_continue, state.finished); + // println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace); } /// It feels that this function can cause deadlocks. diff --git a/safekeeper/src/sim/world.rs b/safekeeper/src/sim/world.rs index 78bb1a9ac0..6e2a596d0f 100644 --- a/safekeeper/src/sim/world.rs +++ b/safekeeper/src/sim/world.rs @@ -79,7 +79,10 @@ impl World { println!("Waking up park at node {:?}", park.node_id()); - // wake up the chosen thread + // Wake up the chosen thread. To do that: + // 1. Increment the counter of running threads. + // 2. Send a singal to continue the thread. + self.wait_group.add(1); park.internal_world_wake(); // to have a clean state after each step, wait for all threads to finish @@ -93,10 +96,9 @@ impl World { for node in self.nodes.lock().iter() { println!("[DEBUG] node id={:?} status={:?}", node.id, node.status.lock()); } - // for park in self.unconditional_parking.lock().iter() { - // println!("[DEBUG] parked thread, stacktrace:"); - // park.debug_print(); - // } + for park in self.unconditional_parking.lock().iter() { + park.debug_print(); + } } } @@ -157,7 +159,7 @@ impl Node { *status = NodeStatus::Running; drop(status); - // block on the world simulation + // park the current thread, [`launch`] will wait until it's parked Park::yield_thread(); // TODO: recover from panic (update state, log the error) f(NodeOs::new(world, node.clone())); @@ -167,6 +169,10 @@ impl Node { // TODO: log the thread is finished }); *self.join_handle.lock() = Some(join_handle); + + // we need to wait for the thread to park, to assure that threads + // are parked in deterministic order + self.world.wait_group.wait(); } /// Returns a channel to receive events from the network. @@ -175,20 +181,40 @@ impl Node { } pub fn internal_parking_start(&self) { - // node started parking (waiting for condition) + // Node started parking (waiting for condition), and the current thread + // is the only one running, so we need to do: + // 1. Change the node status to Waiting + // 2. Decrease the running threads counter + // 3. Block the current thread until it's woken up (outside this function) *self.status.lock() = NodeStatus::Waiting; self.world.wait_group.done(); } pub fn internal_parking_middle(&self, park: Arc) { - // this park entered the unconditional_parking state + // [`park`] entered the unconditional_parking state, and the current thread + // is the only one running, so we need to do: + // 1. Change the node status to Parked + // 2. Park in the world list + // 3. Decrease the running threads counter + // 4. Block the current thread until it's woken up (outside this function) + *self.status.lock() = NodeStatus::Parked; + self.world.unconditional_parking.lock().push(park); + self.world.wait_group.done(); + } + + pub fn internal_parking_ahead(&self, park: Arc) { + // [`park`] entered the unconditional_parking state, and the current thread + // wants to transfer control to another thread, so we need to do: + // 1. Change the node status to Parked + // 2. Park in the world list + // 3. Notify the other thread to continue + // 4. Block the current thread until it's woken up (outside this function) *self.status.lock() = NodeStatus::Parked; self.world.unconditional_parking.lock().push(park); } pub fn internal_parking_end(&self) { - // node finished parking, increase the running threads counter - self.world.wait_group.add(1); + // node finished parking, now it's running again *self.status.lock() = NodeStatus::Running; }