From 5e0550a6206f7aaeabf5d45eb1500e45d4612da6 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 24 May 2023 15:51:30 +0300 Subject: [PATCH] Add os.sleep and os.random --- safekeeper/src/simlib/chan.rs | 2 ++ safekeeper/src/simlib/node_os.rs | 18 +++++++++++ safekeeper/src/simlib/sync.rs | 53 ++++++++++++++++++++++-------- safekeeper/src/simlib/tcp.rs | 6 +--- safekeeper/src/simlib/time.rs | 44 +++++++++++++++++++------ safekeeper/src/simlib/world.rs | 55 ++++++++++++++++++++++++++++---- safekeeper/src/simtest/client.rs | 9 +++++- safekeeper/src/simtest/mod.rs | 9 ++++-- safekeeper/src/simtest/server.rs | 3 +- 9 files changed, 160 insertions(+), 39 deletions(-) diff --git a/safekeeper/src/simlib/chan.rs b/safekeeper/src/simlib/chan.rs index 14a424306a..e27cb93d37 100644 --- a/safekeeper/src/simlib/chan.rs +++ b/safekeeper/src/simlib/chan.rs @@ -24,12 +24,14 @@ impl Chan { } /// Append a message to the end of the queue. + /// Can be called from any thread. pub fn send(&self, t: T) { self.shared.queue.lock().push_back(t); self.shared.condvar.notify_one(); } /// Get a message from the front of the queue, or block if the queue is empty. + /// Can be called only from the node thread. pub fn recv(&self) -> T { // interrupt the receiver to prevent consuming everything at once Park::yield_thread(); diff --git a/safekeeper/src/simlib/node_os.rs b/safekeeper/src/simlib/node_os.rs index 6341e4a769..c80a6a1438 100644 --- a/safekeeper/src/simlib/node_os.rs +++ b/safekeeper/src/simlib/node_os.rs @@ -1,8 +1,11 @@ use std::sync::Arc; +use rand::Rng; + use super::{ chan::Chan, tcp::Tcp, + time::SendMessageEvent, world::{Node, NodeEvent, NodeId, World}, }; @@ -32,4 +35,19 @@ impl NodeOs { pub fn epoll(&self) -> Chan { self.internal.network_chan() } + + /// Sleep for a given number of milliseconds. + /// Currently matches the global virtual time, TODO may be good to + /// introduce a separate clocks for each node. + pub fn sleep(&self, ms: u64) { + let chan: Chan<()> = Chan::new(); + self.world + .schedule(ms, SendMessageEvent::new(chan.clone(), ())); + chan.recv(); + } + + /// Generate a random number in range [0, max). + pub fn random(&self, max: u64) -> u64 { + self.internal.rng.lock().gen_range(0..max) + } } diff --git a/safekeeper/src/simlib/sync.rs b/safekeeper/src/simlib/sync.rs index a854f4096d..7e019fb00a 100644 --- a/safekeeper/src/simlib/sync.rs +++ b/safekeeper/src/simlib/sync.rs @@ -1,11 +1,6 @@ -use std::{ - backtrace::Backtrace, - sync::Arc, -}; +use std::{backtrace::Backtrace, sync::Arc}; - - -use super::world::{Node, NodeId}; +use super::world::{Node, NodeId, World}; pub type Mutex = parking_lot::Mutex; @@ -52,18 +47,28 @@ impl Condvar { } } - /// Wakes up one blocked thread on this condvar, can be called only from the node thread. + /// Wakes up one blocked thread on this condvar. Usually can be called only from the node thread, + /// because we have a global running threads counter and we transfer it from the current thread + /// to the woken up thread. But we have a HACK here to allow calling it from the world thread. pub fn notify_one(&self) { // TODO: wake up random thread let to_wake = self.waiters.lock().waiters.pop(); - if let Some(waiter) = to_wake { - // block (park) the current thread, wake the other thread - waiter.wake(); + if Node::is_node_thread() { + if let Some(waiter) = to_wake { + // block (park) the current thread, wake the other thread + waiter.wake(); + } else { + // block (park) the current thread just in case + Park::yield_thread() + } } else { - // block (park) the current thread just in case - Park::yield_thread() + // HACK: custom notify_one implementation for the world thread + if let Some(waiter) = to_wake { + // block (park) the current thread, wake the other thread + waiter.external_wake(); + } } } } @@ -195,6 +200,28 @@ impl Park { self_park.park_wait_the_world(node, &mut state); } + /// Will wake up the thread that is currently conditionally parked. Can be called only + /// from the world threads. What it will do: + /// 1. Increase the running threads counter + /// 2. Wake up the waiting thread (it will park itself in the world) + pub fn external_wake(&self) { + let world = World::current(); + world.internal_parking_wake(); + + let mut state = self.lock.lock(); + if state.can_continue { + println!( + "WARN wake() called on a thread that is already waked, node {:?}", + state.node_id + ); + return; + } + state.can_continue = true; + // and here we park the waiting thread + self.cvar.notify_all(); + drop(state); + } + /// Will wake up the thread that is currently unconditionally parked. pub fn internal_world_wake(&self) { let mut state = self.lock.lock(); diff --git a/safekeeper/src/simlib/tcp.rs b/safekeeper/src/simlib/tcp.rs index de6349447b..ac6fe07a3c 100644 --- a/safekeeper/src/simlib/tcp.rs +++ b/safekeeper/src/simlib/tcp.rs @@ -1,8 +1,4 @@ -use std::{ - sync::{Arc}, -}; - - +use std::sync::Arc; use super::{ proto::AnyMessage, diff --git a/safekeeper/src/simlib/time.rs b/safekeeper/src/simlib/time.rs index 3fa5b3ec6c..3842241092 100644 --- a/safekeeper/src/simlib/time.rs +++ b/safekeeper/src/simlib/time.rs @@ -1,6 +1,6 @@ -use std::{cmp::Ordering, collections::BinaryHeap}; - +use std::{cmp::Ordering, collections::BinaryHeap, fmt::Debug}; +use super::chan::Chan; pub struct Timing { /// Current world's time. @@ -39,10 +39,14 @@ impl Timing { } /// TODO: write docs - pub fn schedule(&mut self, time: u64, event: Event) { + pub fn schedule_future(&mut self, ms: u64, event: Box) { self.nonce += 1; let nonce = self.nonce; - self.timers.push(Pending { time, nonce, event }) + self.timers.push(Pending { + time: self.current_time + ms, + nonce, + event, + }) } /// Return true if there is a ready event. @@ -56,7 +60,7 @@ impl Timing { pub struct Pending { pub time: u64, pub nonce: u32, - pub event: Event, + pub event: Box, } impl Pending { @@ -87,11 +91,31 @@ impl PartialEq for Pending { impl Eq for Pending {} -#[derive(Debug)] -pub enum Event {} +pub trait Event: Debug { + fn process(&self); +} -impl Event { - fn process(&self) { - // TODO: +pub struct SendMessageEvent { + chan: Chan, + msg: T, +} + +impl SendMessageEvent { + pub fn new(chan: Chan, msg: T) -> Box> { + Box::new(SendMessageEvent { chan, msg }) + } +} + +impl Event for SendMessageEvent { + fn process(&self) { + self.chan.send(self.msg.clone()); + } +} + +impl Debug for SendMessageEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SendMessageEvent") + .field("msg", &self.msg) + .finish() } } diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index 88b952277f..7703282fc6 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -1,8 +1,5 @@ use rand::{rngs::StdRng, Rng, SeedableRng}; -use std::{ - cell::RefCell, - sync::{Arc}, -}; +use std::{cell::RefCell, ops::DerefMut, sync::Arc}; use super::{ chan::Chan, @@ -10,7 +7,7 @@ use super::{ proto::AnyMessage, sync::{Mutex, Park}, tcp::Tcp, - time::Timing, + time::{Event, Timing}, wait_group::WaitGroup, }; @@ -45,16 +42,30 @@ impl World { } } + /// Create a new random number generator. + pub fn new_rng(&self) -> StdRng { + let mut rng = self.rng.lock(); + StdRng::from_rng(rng.deref_mut()).unwrap() + } + /// Create a new node. pub fn new_node(self: &Arc) -> Arc { // TODO: verify let mut nodes = self.nodes.lock(); let id = nodes.len() as NodeId; - let node = Arc::new(Node::new(id, self.clone())); + let node = Arc::new(Node::new(id, self.clone(), self.new_rng())); nodes.push(node.clone()); node } + /// Register world for the current thread. This is required before calling + /// step(). + pub fn register_world(self: &Arc) { + CURRENT_WORLD.with(|world| { + *world.borrow_mut() = Some(self.clone()); + }); + } + /// Get an internal node state by id. pub fn get_node(&self, id: NodeId) -> Option> { let nodes = self.nodes.lock(); @@ -122,6 +133,7 @@ impl World { if let Some(event) = timing.step() { println!("Processing event: {:?}", event.event); event.process(); + // to have a clean state after each step, wait for all threads to finish self.await_all(); return true; @@ -148,10 +160,33 @@ impl World { park.debug_print(); } } + + /// Schedule an event to be processed after `ms` milliseconds of global time. + pub fn schedule(&self, ms: u64, e: Box) { + let mut timing = self.timing.lock(); + timing.schedule_future(ms, e); + } + + /// Get the current world, panics if called from outside of a world thread. + pub fn current() -> Arc { + CURRENT_WORLD.with(|world| { + world + .borrow() + .as_ref() + .expect("World::current() called from outside of a world thread") + .clone() + }) + } + + pub fn internal_parking_wake(&self) { + // waking node with condition, increase the running threads counter + self.wait_group.add(1); + } } thread_local! { pub static CURRENT_NODE: RefCell>> = RefCell::new(None); + pub static CURRENT_WORLD: RefCell>> = RefCell::new(None); } /// Internal node state. @@ -161,6 +196,7 @@ pub struct Node { status: Mutex, world: Arc, join_handle: Mutex>>, + pub rng: Mutex, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -174,13 +210,14 @@ pub enum NodeStatus { } impl Node { - pub fn new(id: NodeId, world: Arc) -> Node { + pub fn new(id: NodeId, world: Arc, rng: StdRng) -> Node { Node { id, network: Chan::new(), status: Mutex::new(NodeStatus::NotStarted), world: world.clone(), join_handle: Mutex::new(None), + rng: Mutex::new(rng), } } @@ -270,6 +307,10 @@ impl Node { pub fn current() -> Arc { CURRENT_NODE.with(|current_node| current_node.borrow().clone().unwrap()) } + + pub fn is_node_thread() -> bool { + CURRENT_NODE.with(|current_node| current_node.borrow().is_some()) + } } /// Network events and timers. diff --git a/safekeeper/src/simtest/client.rs b/safekeeper/src/simtest/client.rs index 4a647cc96d..84d0022e99 100644 --- a/safekeeper/src/simtest/client.rs +++ b/safekeeper/src/simtest/client.rs @@ -1,11 +1,18 @@ -use crate::simlib::{node_os::NodeOs, proto::{ReplCell, AnyMessage}, world::NodeId}; +use crate::simlib::{ + node_os::NodeOs, + proto::{AnyMessage, ReplCell}, + world::NodeId, +}; /// Copy all data from array to the remote node. pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) { println!("started client"); + os.sleep(os.random(10000)); + let sock = os.open_tcp(dst); for num in data { + os.sleep(os.random(10000)); println!("start send data from client"); sock.send(AnyMessage::ReplCell(num.clone())); println!("finish send data from client"); diff --git a/safekeeper/src/simtest/mod.rs b/safekeeper/src/simtest/mod.rs index e6ca7ec6b2..f73c0594f4 100644 --- a/safekeeper/src/simtest/mod.rs +++ b/safekeeper/src/simtest/mod.rs @@ -2,13 +2,18 @@ mod client; mod disk; mod server; -use std::{sync::Arc}; +use std::sync::Arc; -use crate::{simlib::{world::World, proto::ReplCell}, simtest::{client::run_client, disk::SharedStorage, server::run_server}}; +use crate::{ + simlib::{proto::ReplCell, world::World}, + simtest::{client::run_client, disk::SharedStorage, server::run_server}, +}; #[test] fn start_simulation() { let world = Arc::new(World::new()); + world.register_world(); + let client_node = world.new_node(); let server_node = world.new_node(); let server_id = server_node.id; diff --git a/safekeeper/src/simtest/server.rs b/safekeeper/src/simtest/server.rs index 4111a90a01..30fc75f58a 100644 --- a/safekeeper/src/simtest/server.rs +++ b/safekeeper/src/simtest/server.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::simlib::{node_os::NodeOs, world::NodeEvent, proto::AnyMessage}; +use crate::simlib::{node_os::NodeOs, proto::AnyMessage, world::NodeEvent}; use super::disk::Storage; @@ -29,6 +29,7 @@ pub fn run_server(os: NodeOs, mut storage: Box>) { let epoll = os.epoll(); loop { + os.sleep(os.random(10000)); let event = epoll.recv(); println!("got event: {:?}", event); match event {