Add os.sleep and os.random

This commit is contained in:
Arthur Petukhovsky
2023-05-24 15:51:30 +03:00
parent 06f493f525
commit 5e0550a620
9 changed files with 160 additions and 39 deletions

View File

@@ -24,12 +24,14 @@ impl<T: Clone> Chan<T> {
}
/// Append a message to the end of the queue.
/// Can be called from any thread.
pub fn send(&self, t: T) {
self.shared.queue.lock().push_back(t);
self.shared.condvar.notify_one();
}
/// Get a message from the front of the queue, or block if the queue is empty.
/// Can be called only from the node thread.
pub fn recv(&self) -> T {
// interrupt the receiver to prevent consuming everything at once
Park::yield_thread();

View File

@@ -1,8 +1,11 @@
use std::sync::Arc;
use rand::Rng;
use super::{
chan::Chan,
tcp::Tcp,
time::SendMessageEvent,
world::{Node, NodeEvent, NodeId, World},
};
@@ -32,4 +35,19 @@ impl NodeOs {
pub fn epoll(&self) -> Chan<NodeEvent> {
self.internal.network_chan()
}
/// Sleep for a given number of milliseconds.
/// Currently matches the global virtual time, TODO may be good to
/// introduce a separate clocks for each node.
pub fn sleep(&self, ms: u64) {
let chan: Chan<()> = Chan::new();
self.world
.schedule(ms, SendMessageEvent::new(chan.clone(), ()));
chan.recv();
}
/// Generate a random number in range [0, max).
pub fn random(&self, max: u64) -> u64 {
self.internal.rng.lock().gen_range(0..max)
}
}

View File

@@ -1,11 +1,6 @@
use std::{
backtrace::Backtrace,
sync::Arc,
};
use std::{backtrace::Backtrace, sync::Arc};
use super::world::{Node, NodeId};
use super::world::{Node, NodeId, World};
pub type Mutex<T> = parking_lot::Mutex<T>;
@@ -52,18 +47,28 @@ impl Condvar {
}
}
/// Wakes up one blocked thread on this condvar, can be called only from the node thread.
/// Wakes up one blocked thread on this condvar. Usually can be called only from the node thread,
/// because we have a global running threads counter and we transfer it from the current thread
/// to the woken up thread. But we have a HACK here to allow calling it from the world thread.
pub fn notify_one(&self) {
// TODO: wake up random thread
let to_wake = self.waiters.lock().waiters.pop();
if let Some(waiter) = to_wake {
// block (park) the current thread, wake the other thread
waiter.wake();
if Node::is_node_thread() {
if let Some(waiter) = to_wake {
// block (park) the current thread, wake the other thread
waiter.wake();
} else {
// block (park) the current thread just in case
Park::yield_thread()
}
} else {
// block (park) the current thread just in case
Park::yield_thread()
// HACK: custom notify_one implementation for the world thread
if let Some(waiter) = to_wake {
// block (park) the current thread, wake the other thread
waiter.external_wake();
}
}
}
}
@@ -195,6 +200,28 @@ impl Park {
self_park.park_wait_the_world(node, &mut state);
}
/// Will wake up the thread that is currently conditionally parked. Can be called only
/// from the world threads. What it will do:
/// 1. Increase the running threads counter
/// 2. Wake up the waiting thread (it will park itself in the world)
pub fn external_wake(&self) {
let world = World::current();
world.internal_parking_wake();
let mut state = self.lock.lock();
if state.can_continue {
println!(
"WARN wake() called on a thread that is already waked, node {:?}",
state.node_id
);
return;
}
state.can_continue = true;
// and here we park the waiting thread
self.cvar.notify_all();
drop(state);
}
/// Will wake up the thread that is currently unconditionally parked.
pub fn internal_world_wake(&self) {
let mut state = self.lock.lock();

View File

@@ -1,8 +1,4 @@
use std::{
sync::{Arc},
};
use std::sync::Arc;
use super::{
proto::AnyMessage,

View File

@@ -1,6 +1,6 @@
use std::{cmp::Ordering, collections::BinaryHeap};
use std::{cmp::Ordering, collections::BinaryHeap, fmt::Debug};
use super::chan::Chan;
pub struct Timing {
/// Current world's time.
@@ -39,10 +39,14 @@ impl Timing {
}
/// TODO: write docs
pub fn schedule(&mut self, time: u64, event: Event) {
pub fn schedule_future(&mut self, ms: u64, event: Box<dyn Event + Send + Sync>) {
self.nonce += 1;
let nonce = self.nonce;
self.timers.push(Pending { time, nonce, event })
self.timers.push(Pending {
time: self.current_time + ms,
nonce,
event,
})
}
/// Return true if there is a ready event.
@@ -56,7 +60,7 @@ impl Timing {
pub struct Pending {
pub time: u64,
pub nonce: u32,
pub event: Event,
pub event: Box<dyn Event + Send + Sync>,
}
impl Pending {
@@ -87,11 +91,31 @@ impl PartialEq for Pending {
impl Eq for Pending {}
#[derive(Debug)]
pub enum Event {}
pub trait Event: Debug {
fn process(&self);
}
impl Event {
fn process(&self) {
// TODO:
pub struct SendMessageEvent<T: Debug + Clone> {
chan: Chan<T>,
msg: T,
}
impl<T: Debug + Clone> SendMessageEvent<T> {
pub fn new(chan: Chan<T>, msg: T) -> Box<SendMessageEvent<T>> {
Box::new(SendMessageEvent { chan, msg })
}
}
impl<T: Debug + Clone> Event for SendMessageEvent<T> {
fn process(&self) {
self.chan.send(self.msg.clone());
}
}
impl<T: Debug + Clone> Debug for SendMessageEvent<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SendMessageEvent")
.field("msg", &self.msg)
.finish()
}
}

View File

@@ -1,8 +1,5 @@
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{
cell::RefCell,
sync::{Arc},
};
use std::{cell::RefCell, ops::DerefMut, sync::Arc};
use super::{
chan::Chan,
@@ -10,7 +7,7 @@ use super::{
proto::AnyMessage,
sync::{Mutex, Park},
tcp::Tcp,
time::Timing,
time::{Event, Timing},
wait_group::WaitGroup,
};
@@ -45,16 +42,30 @@ impl World {
}
}
/// Create a new random number generator.
pub fn new_rng(&self) -> StdRng {
let mut rng = self.rng.lock();
StdRng::from_rng(rng.deref_mut()).unwrap()
}
/// Create a new node.
pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
// TODO: verify
let mut nodes = self.nodes.lock();
let id = nodes.len() as NodeId;
let node = Arc::new(Node::new(id, self.clone()));
let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
nodes.push(node.clone());
node
}
/// Register world for the current thread. This is required before calling
/// step().
pub fn register_world(self: &Arc<Self>) {
CURRENT_WORLD.with(|world| {
*world.borrow_mut() = Some(self.clone());
});
}
/// Get an internal node state by id.
pub fn get_node(&self, id: NodeId) -> Option<Arc<Node>> {
let nodes = self.nodes.lock();
@@ -122,6 +133,7 @@ impl World {
if let Some(event) = timing.step() {
println!("Processing event: {:?}", event.event);
event.process();
// to have a clean state after each step, wait for all threads to finish
self.await_all();
return true;
@@ -148,10 +160,33 @@ impl World {
park.debug_print();
}
}
/// Schedule an event to be processed after `ms` milliseconds of global time.
pub fn schedule(&self, ms: u64, e: Box<dyn Event + Send + Sync>) {
let mut timing = self.timing.lock();
timing.schedule_future(ms, e);
}
/// Get the current world, panics if called from outside of a world thread.
pub fn current() -> Arc<World> {
CURRENT_WORLD.with(|world| {
world
.borrow()
.as_ref()
.expect("World::current() called from outside of a world thread")
.clone()
})
}
pub fn internal_parking_wake(&self) {
// waking node with condition, increase the running threads counter
self.wait_group.add(1);
}
}
thread_local! {
pub static CURRENT_NODE: RefCell<Option<Arc<Node>>> = RefCell::new(None);
pub static CURRENT_WORLD: RefCell<Option<Arc<World>>> = RefCell::new(None);
}
/// Internal node state.
@@ -161,6 +196,7 @@ pub struct Node {
status: Mutex<NodeStatus>,
world: Arc<World>,
join_handle: Mutex<Option<std::thread::JoinHandle<()>>>,
pub rng: Mutex<StdRng>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -174,13 +210,14 @@ pub enum NodeStatus {
}
impl Node {
pub fn new(id: NodeId, world: Arc<World>) -> Node {
pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
Node {
id,
network: Chan::new(),
status: Mutex::new(NodeStatus::NotStarted),
world: world.clone(),
join_handle: Mutex::new(None),
rng: Mutex::new(rng),
}
}
@@ -270,6 +307,10 @@ impl Node {
pub fn current() -> Arc<Node> {
CURRENT_NODE.with(|current_node| current_node.borrow().clone().unwrap())
}
pub fn is_node_thread() -> bool {
CURRENT_NODE.with(|current_node| current_node.borrow().is_some())
}
}
/// Network events and timers.

View File

@@ -1,11 +1,18 @@
use crate::simlib::{node_os::NodeOs, proto::{ReplCell, AnyMessage}, world::NodeId};
use crate::simlib::{
node_os::NodeOs,
proto::{AnyMessage, ReplCell},
world::NodeId,
};
/// Copy all data from array to the remote node.
pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
println!("started client");
os.sleep(os.random(10000));
let sock = os.open_tcp(dst);
for num in data {
os.sleep(os.random(10000));
println!("start send data from client");
sock.send(AnyMessage::ReplCell(num.clone()));
println!("finish send data from client");

View File

@@ -2,13 +2,18 @@ mod client;
mod disk;
mod server;
use std::{sync::Arc};
use std::sync::Arc;
use crate::{simlib::{world::World, proto::ReplCell}, simtest::{client::run_client, disk::SharedStorage, server::run_server}};
use crate::{
simlib::{proto::ReplCell, world::World},
simtest::{client::run_client, disk::SharedStorage, server::run_server},
};
#[test]
fn start_simulation() {
let world = Arc::new(World::new());
world.register_world();
let client_node = world.new_node();
let server_node = world.new_node();
let server_id = server_node.id;

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap;
use crate::simlib::{node_os::NodeOs, world::NodeEvent, proto::AnyMessage};
use crate::simlib::{node_os::NodeOs, proto::AnyMessage, world::NodeEvent};
use super::disk::Storage;
@@ -29,6 +29,7 @@ pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
let epoll = os.epoll();
loop {
os.sleep(os.random(10000));
let event = epoll.recv();
println!("got event: {:?}", event);
match event {