Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 01:12:56 +00:00
It looks deterministic now
@@ -1,6 +1,6 @@
 use std::{collections::VecDeque, sync::Arc};
 
-use super::sync::{Mutex, Condvar};
+use super::sync::{Mutex, Condvar, Park};
 
 /// FIFO channel with blocking send and receive. Can be cloned and shared between threads.
 #[derive(Clone)]
@@ -31,6 +31,9 @@ impl<T: Clone> Chan<T> {
 
     /// Get a message from the front of the queue, or block if the queue is empty.
     pub fn recv(&self) -> T {
+        // interrupt the receiver to prevent consuming everything at once
+        Park::yield_thread();
+
         let mut queue = self.shared.queue.lock();
         loop {
             if let Some(t) = queue.pop_front() {
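For context: the recv() patched above follows the usual mutex-plus-condvar blocking queue shape. A minimal standalone sketch of that shape in plain std Rust (simplified, not the crate's actual Chan, which wraps the deterministic Mutex/Condvar/Park primitives):

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

// Simplified blocking FIFO channel: send never blocks, recv blocks until a
// message is available. The real Chan additionally yields to the simulation
// scheduler before receiving.
struct Chan<T> {
    shared: Arc<(Mutex<VecDeque<T>>, Condvar)>,
}

impl<T> Clone for Chan<T> {
    fn clone(&self) -> Self {
        Chan { shared: self.shared.clone() }
    }
}

impl<T> Chan<T> {
    fn new() -> Self {
        Chan { shared: Arc::new((Mutex::new(VecDeque::new()), Condvar::new())) }
    }

    fn send(&self, t: T) {
        let (queue, cvar) = &*self.shared;
        queue.lock().unwrap().push_back(t);
        cvar.notify_one();
    }

    fn recv(&self) -> T {
        let (queue, cvar) = &*self.shared;
        let mut q = queue.lock().unwrap();
        loop {
            if let Some(t) = q.pop_front() {
                return t;
            }
            // release the lock and sleep until a sender notifies us
            q = cvar.wait(q).unwrap();
        }
    }
}

fn main() {
    let chan = Chan::new();
    let sender = chan.clone();
    std::thread::spawn(move || sender.send(42));
    assert_eq!(chan.recv(), 42);
}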
@@ -1,10 +1,13 @@
 use std::{sync::Arc, backtrace::Backtrace, io::{self, Write}};
 
+use parking_lot::MutexGuard;
+
 use super::world::{Node, NodeId};
 
 pub type Mutex<T> = parking_lot::Mutex<T>;
 
-/// More deterministic condvar.
+/// More deterministic condvar. Determinism comes from the fact that
+/// at all times there is at most one running thread.
 pub struct Condvar {
     waiters: Mutex<CondvarState>,
 }
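The new doc comment pins down where the determinism comes from: with at most one thread running at any moment, the only source of ordering is which parked thread the scheduler resumes next. A toy sketch of that idea (hypothetical Scheduler type, not from this repo):

use std::collections::VecDeque;

// With at most one running thread, the scheduling decision "who runs next"
// is the only nondeterminism left; making it a pure function of the queue
// (or of a seeded RNG) makes the whole run reproducible.
struct Scheduler {
    runnable: VecDeque<u32>, // parked thread ids, in the order they parked
}

impl Scheduler {
    fn park(&mut self, thread_id: u32) {
        self.runnable.push_back(thread_id);
    }

    // Deterministic policy: always resume the thread that parked first.
    fn pick_next(&mut self) -> Option<u32> {
        self.runnable.pop_front()
    }
}

fn main() {
    let mut s = Scheduler { runnable: VecDeque::new() };
    s.park(1);
    s.park(2);
    assert_eq!(s.pick_next(), Some(1)); // same parking order => same schedule
}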
@@ -33,32 +36,30 @@ impl Condvar {
         });
     }
 
-    /// Wakes up all blocked threads on this condvar.
+    /// Wakes up all blocked threads on this condvar, can be called only from the node thread.
     pub fn notify_all(&self) {
-        // TODO: check that it's waked up in random order and yield to the scheduler
-        let mut state = self.waiters.lock();
-        for waiter in state.waiters.drain(..) {
+        let mut waiters = self.waiters.lock().waiters.drain(..).collect::<Vec<_>>();
+        for waiter in waiters.drain(..) {
+            // block (park) the current thread, wake the other thread
             waiter.wake();
         }
-        drop(state);
-
-        // yield the current thread to the scheduler
-        Park::yield_thread();
     }
 
-    /// Wakes up one blocked thread on this condvar.
+    /// Wakes up one blocked thread on this condvar, can be called only from the node thread.
     pub fn notify_one(&self) {
-        // TODO: wake up random thread
-        let mut state = self.waiters.lock();
-        if let Some(waiter) = state.waiters.pop() {
-            waiter.wake();
-        }
-        drop(state);
+        let to_wake = self.waiters.lock().waiters.pop();
 
-        // yield the current thread to the scheduler
-        Park::yield_thread();
+        if let Some(waiter) = to_wake {
+            // block (park) the current thread, wake the other thread
+            waiter.wake();
+        } else {
+            // block (park) the current thread just in case
+            Park::yield_thread()
+        }
     }
 }
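The reworked notify_all/notify_one drain the waiter list while holding the lock and only call wake() after the guard is gone, since wake() now blocks the calling thread. A rough sketch of that drain-then-wake pattern with a stand-in Waiter type (illustrative only, not the crate's types):

use std::mem;
use std::sync::Mutex;

// Stand-in for the real Park handle; wake() may block the caller, so it must
// not be called while the waiter list is still locked.
struct Waiter(u32);

impl Waiter {
    fn wake(&self) {
        println!("waking waiter {}", self.0);
    }
}

struct Notify {
    waiters: Mutex<Vec<Waiter>>,
}

impl Notify {
    fn notify_all(&self) {
        // Take the whole list out while holding the lock...
        let waiters = mem::take(&mut *self.waiters.lock().unwrap());
        // ...then wake outside the lock, so each wake() is free to block.
        for w in waiters {
            w.wake();
        }
    }

    fn notify_one(&self) {
        // The guard is dropped at the end of this statement, before wake().
        let to_wake = self.waiters.lock().unwrap().pop();
        if let Some(w) = to_wake {
            w.wake();
        }
    }
}

fn main() {
    let n = Notify { waiters: Mutex::new(vec![Waiter(1), Waiter(2)]) };
    n.notify_one();
    n.notify_all();
}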
@@ -95,54 +96,97 @@ impl Park {
         )
     }
 
+    fn init_state(state: &mut ParkState, node: &Arc<Node>) {
+        state.node_id = Some(node.id);
+        state.backtrace = Some(Backtrace::capture());
+    }
+
     /// Should be called once by the waiting thread. Blocks the thread until wake() is called,
     /// and until the thread is woken up by the world simulation.
     pub fn park(self: &Arc<Self>) {
         let node = Node::current();
 
-        // start parking
+        // start blocking
         let mut state = self.lock.lock();
-        state.node_id = Some(node.id);
-        state.backtrace = Some(Backtrace::capture());
+        Self::init_state(&mut state, &node);
 
         println!("PARKING STARTED: node {:?}", node.id);
         if state.can_continue {
-            // unconditional parking
             println!("YIELD PARKING: node {:?}", node.id);
 
-            parking_lot::MutexGuard::unlocked(&mut state, || {
-                node.internal_parking_start();
-            });
+            parking_lot::MutexGuard::unlocked(&mut state, || {
+                // first put to world parking, then decrease the running threads counter
+                node.internal_parking_middle(self.clone());
+            });
         } else {
             println!("AWAIT PARKING: node {:?}", node.id);
 
-            scopeguard::defer! {
-                node.internal_parking_end();
-            };
             parking_lot::MutexGuard::unlocked(&mut state, || {
+                // conditional parking, decrease the running threads counter without parking
                 node.internal_parking_start();
             });
 
-            // wait for condition
-            while !state.can_continue {
-                self.cvar.wait(&mut state);
+            // wait for condition
+            while !state.can_continue {
+                self.cvar.wait(&mut state);
+            }
+
+            println!("CONDITION MET: node {:?}", node.id);
+            // condition is met, we are now running instead of the waker thread.
+            // the next thing is to park the thread in the world, then decrease
+            // the running threads counter
+            node.internal_parking_middle(self.clone());
         }
 
+        println!("PARKING MIDDLE: node {:?}", node.id);
+        self.park_wait_the_world(node, &mut state);
+    }
+
+    fn park_wait_the_world(&self, node: Arc<Node>, state: &mut parking_lot::MutexGuard<ParkState>) {
-        // condition is met, wait for world simulation to wake us up
-        node.internal_parking_middle(self.clone());
-
         while !state.finished {
-            self.cvar.wait(&mut state);
+            self.cvar.wait(state);
         }
 
         println!("PARKING ENDED: node {:?}", node.id);
 
-        // finish parking
+        // We are the only running thread now, we just need to update the state,
+        // and continue the execution.
         node.internal_parking_end();
     }
 
-    /// Will wake up the thread that is currently conditionally parked.
+    /// Hacky way to register parking before the thread is actually blocked.
+    fn park_ahead_now() -> Arc<Park> {
+        let park = Park::new(true);
+        let node = Node::current();
+        Self::init_state(&mut park.lock.lock(), &node);
+        println!("PARKING MIDDLE alt: node {:?}", node.id);
+        node.internal_parking_ahead(park.clone());
+        park
+    }
+
+    /// Will wake up the thread that is currently conditionally parked. Can be called only
+    /// from the node thread, because it will block the caller thread. What it will do:
+    /// 1. Park the thread that called wake() in the world
+    /// 2. Wake up the waiting thread (it will also park in the world)
+    /// 3. Block the thread that called wake()
    pub fn wake(&self) {
+        // parking the thread that called wake()
+        let self_park = Park::park_ahead_now();
+
        let mut state = self.lock.lock();
        if state.can_continue {
            println!("WARN wake() called on a thread that is already waked, node {:?}", state.node_id);
            return;
        }
        state.can_continue = true;
+        // and here we park the waiting thread
        self.cvar.notify_all();
        drop(state);
 
+        // and here we block the thread that called wake() by defer
+        let node = Node::current();
+        let mut state = self_park.lock.lock();
+        self_park.park_wait_the_world(node, &mut state);
    }
 
    /// Will wake up the thread that is currently unconditionally parked.
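Stripped of the world/wait-group bookkeeping, the Park handshake above is a two-phase condvar wait: first on can_continue (set by wake()), then on finished (set when the simulation schedules the thread again). A compressed sketch with std primitives, assuming none of the scheduler integration:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Default)]
struct ParkState {
    can_continue: bool, // set by wake(): the condition the thread waited for holds
    finished: bool,     // set by the scheduler: the thread may actually run again
}

struct Park {
    lock: Mutex<ParkState>,
    cvar: Condvar,
}

impl Park {
    fn new() -> Arc<Self> {
        Arc::new(Park { lock: Mutex::new(ParkState::default()), cvar: Condvar::new() })
    }

    // Waiting side: phase 1 waits for wake(), phase 2 waits for the scheduler.
    fn park(&self) {
        let mut state = self.lock.lock().unwrap();
        while !state.can_continue {
            state = self.cvar.wait(state).unwrap();
        }
        while !state.finished {
            state = self.cvar.wait(state).unwrap();
        }
    }

    fn wake(&self) {
        self.lock.lock().unwrap().can_continue = true;
        self.cvar.notify_all();
    }

    // Stand-in for the world simulation releasing the thread.
    fn world_wake(&self) {
        self.lock.lock().unwrap().finished = true;
        self.cvar.notify_all();
    }
}

fn main() {
    let park = Park::new();
    let p = park.clone();
    let t = thread::spawn(move || p.park());
    park.wake();                           // phase 1: condition is met
    thread::sleep(Duration::from_millis(10));
    park.world_wake();                     // phase 2: scheduler runs the thread
    t.join().unwrap();
}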
@@ -158,8 +202,9 @@ impl Park {
 
     /// Print debug info about the parked thread.
     pub fn debug_print(&self) {
-        let mut state = self.lock.lock();
-        println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace);
+        let state = self.lock.lock();
+        println!("PARK: node {:?} wake1={} wake2={}", state.node_id, state.can_continue, state.finished);
+        // println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace);
     }
 
     /// It feels that this function can cause deadlocks.
@@ -79,7 +79,10 @@ impl World {
 
         println!("Waking up park at node {:?}", park.node_id());
 
-        // wake up the chosen thread
+        // Wake up the chosen thread. To do that:
+        // 1. Increment the counter of running threads.
+        // 2. Send a signal to continue the thread.
         self.wait_group.add(1);
         park.internal_world_wake();
 
         // to have a clean state after each step, wait for all threads to finish
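The step loop leans on a wait-group style counter: add(1) before waking a park, done() when a thread parks again, and wait() so the scheduler only moves on once nothing is running. A tiny counter with that assumed API, built on std Mutex/Condvar (the crate's wait_group may differ):

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct WaitGroup {
    count: Mutex<i64>,
    cvar: Condvar,
}

impl WaitGroup {
    fn new() -> Arc<Self> {
        Arc::new(WaitGroup { count: Mutex::new(0), cvar: Condvar::new() })
    }

    fn add(&self, n: i64) {
        *self.count.lock().unwrap() += n;
    }

    fn done(&self) {
        let mut c = self.count.lock().unwrap();
        *c -= 1;
        if *c <= 0 {
            self.cvar.notify_all();
        }
    }

    // Blocks until every add() has been matched by a done().
    fn wait(&self) {
        let mut c = self.count.lock().unwrap();
        while *c > 0 {
            c = self.cvar.wait(c).unwrap();
        }
    }
}

fn main() {
    let wg = WaitGroup::new();
    wg.add(1); // about to wake one thread
    let wg2 = wg.clone();
    thread::spawn(move || {
        // ... the woken thread runs, then parks again ...
        wg2.done();
    });
    wg.wait(); // the step is over only once the thread has parked
}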
@@ -93,10 +96,9 @@ impl World {
         for node in self.nodes.lock().iter() {
             println!("[DEBUG] node id={:?} status={:?}", node.id, node.status.lock());
         }
-        // for park in self.unconditional_parking.lock().iter() {
-        //     println!("[DEBUG] parked thread, stacktrace:");
-        //     park.debug_print();
-        // }
+        for park in self.unconditional_parking.lock().iter() {
+            park.debug_print();
+        }
     }
 }
@@ -157,7 +159,7 @@ impl Node {
             *status = NodeStatus::Running;
             drop(status);
 
-            // block on the world simulation
+            // park the current thread, [`launch`] will wait until it's parked
             Park::yield_thread();
             // TODO: recover from panic (update state, log the error)
             f(NodeOs::new(world, node.clone()));
@@ -167,6 +169,10 @@ impl Node {
             // TODO: log the thread is finished
         });
         *self.join_handle.lock() = Some(join_handle);
+
+        // we need to wait for the thread to park, to assure that threads
+        // are parked in deterministic order
+        self.world.wait_group.wait();
     }
 
     /// Returns a channel to receive events from the network.
@@ -175,20 +181,40 @@ impl Node {
     }
 
     pub fn internal_parking_start(&self) {
-        // node started parking (waiting for condition)
+        // Node started parking (waiting for condition), and the current thread
+        // is the only one running, so we need to do:
+        // 1. Change the node status to Waiting
+        // 2. Decrease the running threads counter
+        // 3. Block the current thread until it's woken up (outside this function)
         *self.status.lock() = NodeStatus::Waiting;
         self.world.wait_group.done();
     }
 
     pub fn internal_parking_middle(&self, park: Arc<Park>) {
-        // this park entered the unconditional_parking state
+        // [`park`] entered the unconditional_parking state, and the current thread
+        // is the only one running, so we need to do:
+        // 1. Change the node status to Parked
+        // 2. Park in the world list
+        // 3. Decrease the running threads counter
+        // 4. Block the current thread until it's woken up (outside this function)
         *self.status.lock() = NodeStatus::Parked;
         self.world.unconditional_parking.lock().push(park);
         self.world.wait_group.done();
     }
 
+    pub fn internal_parking_ahead(&self, park: Arc<Park>) {
+        // [`park`] entered the unconditional_parking state, and the current thread
+        // wants to transfer control to another thread, so we need to do:
+        // 1. Change the node status to Parked
+        // 2. Park in the world list
+        // 3. Notify the other thread to continue
+        // 4. Block the current thread until it's woken up (outside this function)
+        *self.status.lock() = NodeStatus::Parked;
+        self.world.unconditional_parking.lock().push(park);
+    }
+
     pub fn internal_parking_end(&self) {
-        // node finished parking, increase the running threads counter
-        self.world.wait_group.add(1);
+        // node finished parking, now it's running again
         *self.status.lock() = NodeStatus::Running;
     }
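Taken together, the internal_parking_* hooks form a small per-node state machine: Running -> Waiting for a conditional park, Running/Waiting -> Parked when handed to the world list, and Parked -> Running once the world re-adds the thread and wakes it. A minimal sketch of those transitions, ignoring the real Park handles and locking (hypothetical NodeState, counter kept locally instead of in the shared wait group):

#[derive(Debug, Clone, Copy, PartialEq)]
enum NodeStatus {
    Running,
    Waiting,
    Parked,
}

// Simplified single-node bookkeeping; the real code keeps the counter in a
// shared wait group and the parked handles in a world-level list.
struct NodeState {
    status: NodeStatus,
    running_threads: i64,
}

impl NodeState {
    // conditional parking: waiting on a condvar, no longer counted as running
    fn parking_start(&mut self) {
        self.status = NodeStatus::Waiting;
        self.running_threads -= 1;
    }

    // unconditional parking: handed over to the world scheduler
    fn parking_middle(&mut self) {
        self.status = NodeStatus::Parked;
        self.running_threads -= 1;
    }

    // the world scheduler picks a parked node: count it as running again...
    fn world_wake(&mut self) {
        self.running_threads += 1;
    }

    // ...and once the thread resumes, it marks itself Running
    fn parking_end(&mut self) {
        self.status = NodeStatus::Running;
    }
}

fn main() {
    let mut node = NodeState { status: NodeStatus::Running, running_threads: 1 };
    node.parking_middle();
    assert_eq!(node.status, NodeStatus::Parked);
    node.world_wake();
    node.parking_end();
    assert_eq!((node.status, node.running_threads), (NodeStatus::Running, 1));
}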