Add basic support for network delays

This commit is contained in:
Arthur Petukhovsky
2023-05-24 20:28:53 +03:00
parent 5e0550a620
commit 87c9edac7c
9 changed files with 182 additions and 51 deletions

View File

@@ -2,7 +2,7 @@ pub mod chan;
pub mod node_os;
pub mod proto;
pub mod sync;
pub mod tcp;
pub mod network;
pub mod time;
pub mod wait_group;
pub mod world;

View File

@@ -0,0 +1,127 @@
use std::{sync::Arc, collections::VecDeque, fmt::{Debug, self}, ops::DerefMut};
use super::{
proto::AnyMessage,
world::{Node, NodeEvent, World}, time::NetworkEvent, sync::Mutex,
};
// 0 - from node(0) to node(1)
// 1 - from node(1) to node(0)
type MessageDirection = u8;
/// Virtual connection between two nodes.
/// Node 0 is the creator of the connection.
pub struct VirtualConnection {
/// Connection id, used for logging and debugging.
pub connection_id: u64,
pub world: Arc<World>,
pub nodes: [Arc<Node>; 2],
state: Mutex<ConnectionState>,
}
struct ConnectionState {
buffers: [NetworkBuffer; 2],
}
impl VirtualConnection {
pub fn new(id: u64, world: Arc<World>, src: Arc<Node>, dst: Arc<Node>) -> Arc<Self> {
let conn = Arc::new(Self {
connection_id: id,
world,
nodes: [src.clone(), dst.clone()],
state: Mutex::new(ConnectionState {
buffers: [NetworkBuffer::new(), NetworkBuffer::new()],
}),
});
// TODO: add connection to the dst node
// conn.nodes[1].network_chan().send(NodeEvent::Connection(conn.clone()));
conn
}
/// Transmit some of the messages from the buffer to the nodes.
pub fn process(self: &Arc<Self>) {
let now = self.world.now();
let mut state = self.state.lock();
for direction in 0..2 {
self.process_direction(state.deref_mut(), now, direction as MessageDirection, &self.nodes[direction], &self.nodes[direction^1]);
}
}
/// Process messages in the buffer in the given direction.
fn process_direction(self: &Arc<Self>, state: &mut ConnectionState, now: u64, direction: MessageDirection, from_node: &Arc<Node>, to_node: &Arc<Node>) {
let buffer = &mut state.buffers[direction as usize];
while !buffer.buf.is_empty() && buffer.buf.front().unwrap().0 <= now {
let msg = buffer.buf.pop_front().unwrap().1;
let callback = TCP::new(self.clone(), direction^1);
println!("NET(time={}): {:?} delivered, {}=>{}", now, msg, from_node.id, to_node.id);
to_node.network_chan().send(NodeEvent::Message((msg, callback)));
}
}
/// Send a message to the buffer.
pub fn send(self: &Arc<Self>, direction: MessageDirection, msg: AnyMessage) {
let now = self.world.now();
let mut state = self.state.lock();
let buffer = &mut state.buffers[direction as usize];
// Send a message 1ms into the future.
buffer.buf.push_back((now+1, msg));
self.world.schedule(1, self.as_event());
// TODO: more involved logic, random delays, connection breaks, etc.
}
/// Get an event suitable for scheduling.
fn as_event(self: &Arc<Self>) -> Box<NetworkEvent> {
Box::new(NetworkEvent(self.clone()))
}
}
struct NetworkBuffer {
// Messages paired with time of delivery
buf: VecDeque<(u64, AnyMessage)>,
}
impl NetworkBuffer {
fn new() -> Self {
Self {
buf: VecDeque::new(),
}
}
}
/// Simplistic simulation of a bidirectional network stream without reordering (TCP).
/// There are almost no errors, writes are always successful (but may end up in void).
/// Reads are implemented as a messages in a shared queue, refer to [`NodeOs::network_epoll`]
/// for details.
///
/// TCP struct is just a one side of a connection. To create a connection, use [`NodeOs::open_tcp`].
#[derive(Clone)]
pub struct TCP {
conn: Arc<VirtualConnection>,
dir: MessageDirection,
}
impl Debug for TCP {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TCP #{} {{{}=>{}}}", self.conn.connection_id, self.conn.nodes[self.dir as usize].id, self.conn.nodes[1 - self.dir as usize].id)
}
}
impl TCP {
pub fn new(conn: Arc<VirtualConnection>, dir: MessageDirection) -> TCP {
TCP { conn, dir }
}
/// Send a message to the other side. It's guaranteed that it will not arrive
/// before the arrival of all messages sent earlier.
pub fn send(&self, msg: AnyMessage) {
self.conn.send(self.dir, msg);
}
}

View File

@@ -4,7 +4,7 @@ use rand::Rng;
use super::{
chan::Chan,
tcp::Tcp,
network::TCP,
time::SendMessageEvent,
world::{Node, NodeEvent, NodeId, World},
};
@@ -27,7 +27,7 @@ impl NodeOs {
/// Returns a writable pipe. All incoming messages should be polled
/// with [`network_epoll`]. Always successful.
pub fn open_tcp(&self, dst: NodeId) -> Tcp {
pub fn open_tcp(&self, dst: NodeId) -> TCP {
self.world.open_tcp(&self.internal, dst)
}

View File

@@ -1,28 +0,0 @@
use std::sync::Arc;
use super::{
proto::AnyMessage,
world::{Node, NodeEvent},
};
/// Simplistic simulation of a bidirectional network stream without reordering (TCP).
/// There are almost no errors, writes are always successful (but may end up in void).
/// Reads are implemented as a messages in a shared queue, refer to [`NodeOs::network_epoll`]
/// for details.
pub struct Tcp {
// TODO: replace with internal TCP buffer, to add random delays and close support
dst: Arc<Node>,
}
impl Tcp {
pub fn new(dst: Arc<Node>) -> Tcp {
Tcp { dst }
}
/// Send a message to the other side. It's guaranteed that it will not arrive
/// before the arrival of all messages sent earlier.
pub fn send(&self, msg: AnyMessage) {
// TODO: send to the internal TCP buffer
self.dst.network_chan().send(NodeEvent::Message(msg));
}
}

View File

@@ -1,6 +1,6 @@
use std::{cmp::Ordering, collections::BinaryHeap, fmt::Debug};
use std::{cmp::Ordering, collections::BinaryHeap, fmt::Debug, sync::Arc};
use super::chan::Chan;
use super::{chan::Chan, network::VirtualConnection};
pub struct Timing {
/// Current world's time.
@@ -20,6 +20,11 @@ impl Timing {
}
}
/// Return the current world's time.
pub fn now(&self) -> u64 {
self.current_time
}
/// Tick-tock the global clock. Return the event ready to be processed
/// or move the clock forward and then return the event.
pub fn step(&mut self) -> Option<Pending> {
@@ -30,7 +35,7 @@ impl Timing {
if !self.is_event_ready() {
let next_time = self.timers.peek().unwrap().time;
println!("Advancing time from {} to {}", self.current_time, next_time);
println!("CLK(time={})", next_time);
self.current_time = next_time;
assert!(self.is_event_ready());
}
@@ -114,8 +119,27 @@ impl<T: Debug + Clone> Event for SendMessageEvent<T> {
impl<T: Debug + Clone> Debug for SendMessageEvent<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// TODO: add more context about receiver channel
f.debug_struct("SendMessageEvent")
.field("msg", &self.msg)
.finish()
}
}
pub struct NetworkEvent(pub Arc<VirtualConnection>);
impl Event for NetworkEvent {
fn process(&self) {
self.0.process();
}
}
impl Debug for NetworkEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Network")
.field("conn", &self.0.connection_id)
.field("node[0]", &self.0.nodes[0].id)
.field("node[1]", &self.0.nodes[1].id)
.finish()
}
}

View File

@@ -1,12 +1,12 @@
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{cell::RefCell, ops::DerefMut, sync::Arc};
use std::{cell::RefCell, ops::DerefMut, sync::{Arc, atomic::AtomicU64}};
use super::{
chan::Chan,
node_os::NodeOs,
proto::AnyMessage,
sync::{Mutex, Park},
tcp::Tcp,
network::{TCP, VirtualConnection},
time::{Event, Timing},
wait_group::WaitGroup,
};
@@ -29,6 +29,9 @@ pub struct World {
/// Timers and stuff.
timing: Mutex<Timing>,
/// Network connection counter.
connection_counter: AtomicU64,
}
impl World {
@@ -39,6 +42,7 @@ impl World {
wait_group: WaitGroup::new(),
rng: Mutex::new(StdRng::seed_from_u64(1337)),
timing: Mutex::new(Timing::new()),
connection_counter: AtomicU64::new(0),
}
}
@@ -78,11 +82,15 @@ impl World {
}
/// Returns a writable end of a TCP connection, to send src->dst messages.
pub fn open_tcp(&self, _src: &Arc<Node>, dst: NodeId) -> Tcp {
pub fn open_tcp(self: &Arc<World>, src: &Arc<Node>, dst: NodeId) -> TCP {
// TODO: replace unwrap() with /dev/null socket.
let dst = self.get_node(dst).unwrap();
Tcp::new(dst)
let id = self.connection_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let conn = VirtualConnection::new(id, self.clone(), src.clone(), dst);
// MessageDirection(0) is src->dst
TCP::new(conn, 0)
}
/// Blocks the current thread until all nodes will park or finish.
@@ -110,7 +118,7 @@ impl World {
// First try to wake up unconditional thread.
let to_resume = self.thread_to_unpark();
if let Some(park) = to_resume {
println!("Waking up park at node {:?}", park.node_id());
// println!("Waking up park at node {:?}", park.node_id());
// Wake up the chosen thread. To do that:
// 1. Increment the counter of running threads.
@@ -129,8 +137,8 @@ impl World {
// This way all code running in simulation is considered to be
// instant in terms of "virtual time", and time is advanced only
// when code is waiting for external events.
let mut timing = self.timing.lock();
if let Some(event) = timing.step() {
let time_event = self.timing.lock().step();
if let Some(event) = time_event {
println!("Processing event: {:?}", event.event);
event.process();
@@ -167,6 +175,12 @@ impl World {
timing.schedule_future(ms, e);
}
/// Get current time.
pub fn now(&self) -> u64 {
let timing = self.timing.lock();
timing.now()
}
/// Get the current world, panics if called from outside of a world thread.
pub fn current() -> Arc<World> {
CURRENT_WORLD.with(|world| {
@@ -317,6 +331,6 @@ impl Node {
#[derive(Clone, Debug)]
pub enum NodeEvent {
Accept,
Message(AnyMessage),
Message((AnyMessage, TCP)),
// TODO: close?
}

View File

@@ -13,9 +13,8 @@ pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
let sock = os.open_tcp(dst);
for num in data {
os.sleep(os.random(10000));
println!("start send data from client");
println!("sending data: {:?}", num.clone());
sock.send(AnyMessage::ReplCell(num.clone()));
println!("finish send data from client");
}
println!("sent all data and finished client");

View File

@@ -28,14 +28,9 @@ fn start_simulation() {
let server_storage = shared_storage.clone();
server_node.launch(move |os| run_server(os, Box::new(server_storage)));
world.debug_print_state();
world.await_all();
world.debug_print_state();
while world.step() {
println!("made a step!");
world.debug_print_state();
}
while world.step() {}
let disk_data = shared_storage.state.lock().data.clone();
assert!(verify_data(&disk_data, &u32_data[..]));

View File

@@ -33,7 +33,7 @@ pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
let event = epoll.recv();
println!("got event: {:?}", event);
match event {
NodeEvent::Message(msg) => match msg {
NodeEvent::Message((msg, _)) => match msg {
AnyMessage::ReplCell(num) => {
storage.write(num.value);
}