diff --git a/safekeeper/src/sim/client.rs b/safekeeper/src/sim/client.rs index bc613f7904..bf67df94cc 100644 --- a/safekeeper/src/sim/client.rs +++ b/safekeeper/src/sim/client.rs @@ -2,8 +2,12 @@ use super::{node_os::NodeOs, world::NodeId, proto::AnyMessage}; /// Copy all data from array to the remote node. pub fn run_client(os: NodeOs, data: &[u32], dst: NodeId) { + println!("started client"); + let sock = os.open_tcp(dst); for num in data { sock.send(AnyMessage::Just32(num.clone())); } + + println!("sent all data and finished client"); } diff --git a/safekeeper/src/sim/disklog.rs b/safekeeper/src/sim/disklog.rs index 8d12851dcc..e718431811 100644 --- a/safekeeper/src/sim/disklog.rs +++ b/safekeeper/src/sim/disklog.rs @@ -23,9 +23,12 @@ impl DiskLog { } pub fn run_server(os: NodeOs, mut storage: Box>) { + println!("started server"); + let epoll = os.network_epoll(); loop { let event = epoll.recv(); + println!("got event: {:?}", event); match event { NetworkEvent::Message(msg) => { match msg { diff --git a/safekeeper/src/sim/proto.rs b/safekeeper/src/sim/proto.rs index 237cb582cc..4637b5f725 100644 --- a/safekeeper/src/sim/proto.rs +++ b/safekeeper/src/sim/proto.rs @@ -1,6 +1,6 @@ /// All possible flavours of messages. /// Grouped by the receiver node. -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum AnyMessage { Just32(u32), } diff --git a/safekeeper/src/sim/start_test.rs b/safekeeper/src/sim/start_test.rs index f5a7f87080..dfa8a0d7c3 100644 --- a/safekeeper/src/sim/start_test.rs +++ b/safekeeper/src/sim/start_test.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, thread::sleep_ms, time::Duration}; use super::{world::World, client::run_client, disklog::run_server, disk::SharedStorage}; @@ -22,11 +22,13 @@ fn start_simulation() { run_server(os, Box::new(server_storage)) }); + world.debug_print_state(); world.await_all(); world.debug_print_state(); while world.step() { println!("made a step!"); + std::thread::sleep(Duration::from_millis(200)); world.debug_print_state(); } } diff --git a/safekeeper/src/sim/sync.rs b/safekeeper/src/sim/sync.rs index 6a25492869..680835c7dc 100644 --- a/safekeeper/src/sim/sync.rs +++ b/safekeeper/src/sim/sync.rs @@ -1,5 +1,7 @@ use std::{sync::Arc, backtrace::Backtrace, io::{self, Write}}; +use super::world::{Node, NodeId}; + pub type Mutex = parking_lot::Mutex; /// More deterministic condvar. @@ -20,7 +22,7 @@ impl Condvar { /// Blocks the current thread until this condition variable receives a notification. pub fn wait<'a, T>(&self, guard: &mut parking_lot::MutexGuard<'a, T>) { - let park = Park::new(); + let park = Park::new(false); // add the waiter to the list self.waiters.lock().waiters.push(park.clone()); @@ -33,22 +35,29 @@ impl Condvar { /// Wakes up all blocked threads on this condvar. pub fn notify_all(&self) { - // TODO: wake up in random order, yield to the scheduler + // TODO: check that it's waked up in random order and yield to the scheduler let mut state = self.waiters.lock(); for waiter in state.waiters.drain(..) { waiter.wake(); } + drop(state); + + // yield the current thread to the scheduler + Park::yield_thread(); } /// Wakes up one blocked thread on this condvar. pub fn notify_one(&self) { - // TODO: wake up random thread, yield to the scheduler + // TODO: wake up random thread let mut state = self.waiters.lock(); if let Some(waiter) = state.waiters.pop() { waiter.wake(); } + + // yield the current thread to the scheduler + Park::yield_thread(); } } @@ -59,61 +68,108 @@ pub struct Park { } struct ParkState { + /// False means that thread cannot continue without external signal, + /// i.e. waiting for some event to happen. + can_continue: bool, + /// False means that thread is unconditionally parked and waiting for + /// world simulation to wake it up. True means that the parking is + /// finished and the thread can continue. finished: bool, - dbg_signal: u8, + node_id: Option, + backtrace: Option, } impl Park { - pub fn new() -> Arc { + pub fn new(can_continue: bool) -> Arc { Arc::new( Park { lock: Mutex::new(ParkState { + can_continue, finished: false, - dbg_signal: 0, + node_id: None, + backtrace: None, }), cvar: parking_lot::Condvar::new(), } ) } - /// Should be called once by the waiting thread. Blocks the thread until wake() is called. - pub fn park(&self) { - // TODO: update state of the current thread in the world struct + /// Should be called once by the waiting thread. Blocks the thread until wake() is called, + /// and until the thread is woken up by the world simulation. + pub fn park(self: &Arc) { + let node = Node::current(); // start parking let mut state = self.lock.lock(); + state.node_id = Some(node.id); + state.backtrace = Some(Backtrace::capture()); + + println!("PARKING STARTED: node {:?}", node.id); + + parking_lot::MutexGuard::unlocked(&mut state, || { + node.internal_parking_start(); + }); + + scopeguard::defer! { + node.internal_parking_end(); + }; + + // wait for condition + while !state.can_continue { + self.cvar.wait(&mut state); + } + + println!("PARKING MIDDLE: node {:?}", node.id); + + // condition is met, wait for world simulation to wake us up + node.internal_parking_middle(self.clone()); + while !state.finished { self.cvar.wait(&mut state); - - // check if debug info was requested - if state.dbg_signal != 0 { - let bt = Backtrace::capture(); - println!("DEBUG: thread {:?} is parked at {:?}", std::thread::current().id(), bt); - // TODO: fix bad ordering of output - io::stdout().flush().unwrap(); - state.dbg_signal = 0; - // trigger a notification to wake up the caller thread - self.cvar.notify_all(); - } } + + println!("PARKING ENDED: node {:?}", node.id); + // finish parking } - /// Will wake up the thread that is currently parked. + /// Will wake up the thread that is currently conditionally parked. pub fn wake(&self) { let mut state = self.lock.lock(); + if state.can_continue { + println!("WARN wake() called on a thread that is already waked, node {:?}", state.node_id); + return; + } + state.can_continue = true; + self.cvar.notify_all(); + } + + /// Will wake up the thread that is currently unconditionally parked. + pub fn internal_world_wake(&self) { + let mut state = self.lock.lock(); + if state.finished { + println!("WARN internal_world_wake() called on a thread that is already waked, node {:?}", state.node_id); + return; + } state.finished = true; self.cvar.notify_all(); } - /// Send a signal to the thread that is currently parked to print debug info. + /// Print debug info about the parked thread. pub fn debug_print(&self) { let mut state = self.lock.lock(); - state.dbg_signal = 1; - self.cvar.notify_all(); + println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace); + } - while !state.dbg_signal == 0 && !state.finished { - self.cvar.wait(&mut state); - } + /// It feels that this function can cause deadlocks. + pub fn node_id(&self) -> Option { + let state = self.lock.lock(); + state.node_id + } + + /// Yield the current thread to the world simulation. + pub fn yield_thread() { + let park = Park::new(true); + park.park(); } } diff --git a/safekeeper/src/sim/world.rs b/safekeeper/src/sim/world.rs index c62032cd3f..78bb1a9ac0 100644 --- a/safekeeper/src/sim/world.rs +++ b/safekeeper/src/sim/world.rs @@ -1,4 +1,4 @@ -use std::sync::{atomic::AtomicI32, Arc}; +use std::{sync::{atomic::AtomicI32, Arc}, cell::RefCell}; use rand::{rngs::StdRng, SeedableRng, Rng}; use super::{tcp::Tcp, sync::{Mutex, Park}, chan::Chan, proto::AnyMessage, node_os::NodeOs, wait_group::WaitGroup}; @@ -77,8 +77,10 @@ impl World { let park = parking.swap_remove(chosen_one); drop(parking); + println!("Waking up park at node {:?}", park.node_id()); + // wake up the chosen thread - park.wake(); + park.internal_world_wake(); // to have a clean state after each step, wait for all threads to finish self.await_all(); @@ -98,6 +100,10 @@ impl World { } } +thread_local! { + pub static CURRENT_NODE: RefCell>> = RefCell::new(None); +} + /// Internal node state. pub struct Node { pub id: NodeId, @@ -111,6 +117,7 @@ pub struct Node { pub enum NodeStatus { NotStarted, Running, + Waiting, Parked, Finished, Failed, @@ -133,6 +140,10 @@ impl Node { let world = self.world.clone(); world.wait_group.add(1); let join_handle = std::thread::spawn(move || { + CURRENT_NODE.with(|current_node| { + *current_node.borrow_mut() = Some(node.clone()); + }); + let wg = world.wait_group.clone(); scopeguard::defer! { wg.done(); @@ -146,7 +157,8 @@ impl Node { *status = NodeStatus::Running; drop(status); - node.park_me(); + // block on the world simulation + Park::yield_thread(); // TODO: recover from panic (update state, log the error) f(NodeOs::new(world, node.clone())); @@ -162,26 +174,33 @@ impl Node { self.network.clone() } - /// Park the node current thread until world simulation will decide to continue. - pub fn park_me(&self) { - // TODO: try to rewrite this function - let park = Park::new(); - let mut parking = self.world.unconditional_parking.lock(); - parking.push(park.clone()); - drop(parking); - - // decrease the running threads counter, because current thread is parked + pub fn internal_parking_start(&self) { + // node started parking (waiting for condition) + *self.status.lock() = NodeStatus::Waiting; self.world.wait_group.done(); - // and increase it once it will wake up - scopeguard::defer!(self.world.wait_group.add(1)); + } + pub fn internal_parking_middle(&self, park: Arc) { + // this park entered the unconditional_parking state *self.status.lock() = NodeStatus::Parked; - park.park(); + self.world.unconditional_parking.lock().push(park); + } + + pub fn internal_parking_end(&self) { + // node finished parking, increase the running threads counter + self.world.wait_group.add(1); *self.status.lock() = NodeStatus::Running; } + + /// Get the current node, panics if called from outside of a node thread. + pub fn current() -> Arc { + CURRENT_NODE.with(|current_node| { + current_node.borrow().clone().unwrap() + }) + } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum NetworkEvent { Accept, Message(AnyMessage),