Add accept, close and delays to the network

This commit is contained in:
Arthur Petukhovsky
2023-05-25 12:26:57 +03:00
parent 87c9edac7c
commit 1b8918e665
8 changed files with 354 additions and 61 deletions

View File

@@ -1,8 +1,8 @@
pub mod chan;
pub mod network;
pub mod node_os;
pub mod proto;
pub mod sync;
pub mod network;
pub mod time;
pub mod wait_group;
pub mod world;

View File

@@ -1,45 +1,121 @@
use std::{sync::Arc, collections::VecDeque, fmt::{Debug, self}, ops::DerefMut};
use std::{
collections::VecDeque,
fmt::{self, Debug},
ops::DerefMut,
sync::Arc,
};
use rand::{rngs::StdRng, Rng};
use super::{
proto::AnyMessage,
world::{Node, NodeEvent, World}, time::NetworkEvent, sync::Mutex,
sync::Mutex,
time::NetworkEvent,
world::{Node, NodeEvent, World},
};
#[derive(Clone)]
pub struct Delay {
pub min: u64,
pub max: u64,
pub fail_prob: f64, // [0; 1]
}
impl Delay {
/// No delay, no failures.
pub fn empty() -> Delay {
Delay {
min: 0,
max: 0,
fail_prob: 0.0,
}
}
/// Fixed delay.
pub fn fixed(ms: u64) -> Delay {
Delay {
min: ms,
max: ms,
fail_prob: 0.0,
}
}
/// Generate a random delay in range [min, max]. Return None if the
/// message should be dropped.
pub fn delay(&self, rng: &mut StdRng) -> Option<u64> {
if rng.gen_bool(self.fail_prob) {
return None;
}
Some(rng.gen_range(self.min..=self.max))
}
}
pub struct NetworkOptions {
/// Connection will be automatically closed after this timeout.
pub timeout: Option<u64>,
pub connect_delay: Delay,
pub send_delay: Delay,
}
// 0 - from node(0) to node(1)
// 1 - from node(1) to node(0)
type MessageDirection = u8;
/// Virtual connection between two nodes.
/// Node 0 is the creator of the connection.
/// Node 0 is the creator of the connection (client),
/// and node 1 is the acceptor (server).
pub struct VirtualConnection {
/// Connection id, used for logging and debugging.
pub connection_id: u64,
pub world: Arc<World>,
pub nodes: [Arc<Node>; 2],
state: Mutex<ConnectionState>,
options: Arc<NetworkOptions>,
}
struct ConnectionState {
buffers: [NetworkBuffer; 2],
rng: StdRng,
}
impl VirtualConnection {
pub fn new(id: u64, world: Arc<World>, src: Arc<Node>, dst: Arc<Node>) -> Arc<Self> {
pub fn new(
id: u64,
world: Arc<World>,
src: Arc<Node>,
dst: Arc<Node>,
options: Arc<NetworkOptions>,
) -> Arc<Self> {
let now = world.now();
let rng = world.new_rng();
let conn = Arc::new(Self {
connection_id: id,
world,
nodes: [src.clone(), dst.clone()],
nodes: [src, dst],
state: Mutex::new(ConnectionState {
buffers: [NetworkBuffer::new(), NetworkBuffer::new()],
buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))],
rng,
}),
options,
});
conn.schedule_timeout();
conn.send_connect();
// TODO: add connection to the dst node
// conn.nodes[1].network_chan().send(NodeEvent::Connection(conn.clone()));
conn
}
/// Notify the future about the possible timeout.
fn schedule_timeout(self: &Arc<Self>) {
if let Some(timeout) = self.options.timeout {
self.world.schedule(timeout, self.as_event());
}
}
/// Transmit some of the messages from the buffer to the nodes.
pub fn process(self: &Arc<Self>) {
let now = self.world.now();
@@ -47,20 +123,79 @@ impl VirtualConnection {
let mut state = self.state.lock();
for direction in 0..2 {
self.process_direction(state.deref_mut(), now, direction as MessageDirection, &self.nodes[direction], &self.nodes[direction^1]);
self.process_direction(
state.deref_mut(),
now,
direction as MessageDirection,
&self.nodes[direction],
&self.nodes[direction ^ 1],
);
}
// Close the one side of the connection by timeout if the node
// has not received any messages for a long time.
if let Some(timeout) = self.options.timeout {
let mut to_close = [false, false];
for direction in 0..2 {
let node_idx = direction ^ 1;
let node = &self.nodes[node_idx];
let buffer = &mut state.buffers[direction];
if buffer.recv_closed {
continue;
}
if let Some(last_recv) = buffer.last_recv {
if now - last_recv >= timeout {
println!(
"NET(time={}): connection {} timed out at node {}",
now, self.connection_id, node.id
);
to_close[node_idx] = true;
}
}
}
drop(state);
for node_idx in 0..2 {
if to_close[node_idx] {
self.close(node_idx);
}
}
}
}
/// Process messages in the buffer in the given direction.
fn process_direction(self: &Arc<Self>, state: &mut ConnectionState, now: u64, direction: MessageDirection, from_node: &Arc<Node>, to_node: &Arc<Node>) {
fn process_direction(
self: &Arc<Self>,
state: &mut ConnectionState,
now: u64,
direction: MessageDirection,
from_node: &Arc<Node>,
to_node: &Arc<Node>,
) {
let buffer = &mut state.buffers[direction as usize];
if buffer.recv_closed {
assert!(buffer.buf.is_empty());
}
while !buffer.buf.is_empty() && buffer.buf.front().unwrap().0 <= now {
let msg = buffer.buf.pop_front().unwrap().1;
let callback = TCP::new(self.clone(), direction^1);
let callback = TCP::new(self.clone(), direction ^ 1);
println!("NET(time={}): {:?} delivered, {}=>{}", now, msg, from_node.id, to_node.id);
to_node.network_chan().send(NodeEvent::Message((msg, callback)));
println!(
"NET(time={}): {:?} delivered, {}=>{}",
now, msg, from_node.id, to_node.id
);
buffer.last_recv = Some(now);
self.schedule_timeout();
if let AnyMessage::InternalConnect = msg {
to_node.network_chan().send(NodeEvent::Accept(callback));
} else {
to_node
.network_chan()
.send(NodeEvent::Message((msg, callback)));
}
}
}
@@ -68,13 +203,108 @@ impl VirtualConnection {
pub fn send(self: &Arc<Self>, direction: MessageDirection, msg: AnyMessage) {
let now = self.world.now();
let mut state = self.state.lock();
let (delay, close) = if let Some(ms) = self.options.send_delay.delay(&mut state.rng) {
(ms, false)
} else {
(0, true)
};
let buffer = &mut state.buffers[direction as usize];
if close {
buffer.send_closed = true;
return;
}
// Send a message 1ms into the future.
buffer.buf.push_back((now+1, msg));
self.world.schedule(1, self.as_event());
if buffer.send_closed {
println!(
"NET: TCP #{} dropped message {:?} (broken pipe)",
self.connection_id, msg
);
return;
}
// TODO: more involved logic, random delays, connection breaks, etc.
if close {
println!(
"NET: TCP #{} dropped message {:?} (pipe just broke)",
self.connection_id, msg
);
buffer.send_closed = true;
return;
}
if buffer.recv_closed {
println!(
"NET: TCP #{} dropped message {:?} (recv closed)",
self.connection_id, msg
);
return;
}
// Send a message into the future.
buffer.buf.push_back((now + delay, msg));
self.world.schedule(delay, self.as_event());
}
/// Send the handshake (Accept) to the server.
fn send_connect(self: &Arc<Self>) {
let now = self.world.now();
let mut state = self.state.lock();
let delay = self.options.connect_delay.delay(&mut state.rng);
let buffer = &mut state.buffers[0];
assert!(buffer.buf.is_empty());
assert!(!buffer.recv_closed);
assert!(!buffer.send_closed);
assert!(buffer.last_recv.is_none());
let delay = if let Some(ms) = delay {
ms
} else {
println!("NET: TCP #{} dropped connect", self.connection_id);
buffer.send_closed = true;
return;
};
// Send a message into the future.
buffer
.buf
.push_back((now + delay, AnyMessage::InternalConnect));
self.world.schedule(delay, self.as_event());
}
/// Close the connection. Only one side of the connection will be closed,
/// and no further messages will be delivered. The other side will not be notified.
pub fn close(self: &Arc<Self>, node_idx: usize) {
let node = &self.nodes[node_idx];
let mut state = self.state.lock();
let recv_buffer = &mut state.buffers[1 ^ node_idx];
if recv_buffer.recv_closed {
println!(
"NET: TCP #{} closed twice at node {}",
self.connection_id, node.id
);
return;
}
println!(
"NET: TCP #{} closed at node {}",
self.connection_id, node.id
);
recv_buffer.recv_closed = true;
for msg in recv_buffer.buf.drain(..) {
println!(
"NET: TCP #{} dropped message {:?} (closed)",
self.connection_id, msg
);
}
let send_buffer = &mut state.buffers[node_idx];
send_buffer.send_closed = true;
// TODO: notify the other side?
node.network_chan()
.send(NodeEvent::Closed(TCP::new(self.clone(), node_idx as u8)));
}
/// Get an event suitable for scheduling.
@@ -84,14 +314,28 @@ impl VirtualConnection {
}
struct NetworkBuffer {
// Messages paired with time of delivery
/// Messages paired with time of delivery
buf: VecDeque<(u64, AnyMessage)>,
/// True if the connection is closed on the receiving side,
/// i.e. no more messages from the buffer will be delivered.
recv_closed: bool,
/// True if the connection is closed on the sending side,
/// i.e. no more messages will be added to the buffer.
send_closed: bool,
/// Last time a message was delivered from the buffer.
/// If None, it means that the server is the receiver and
/// it has not yet aware of this connection (i.e. has not
/// received the Accept).
last_recv: Option<u64>,
}
impl NetworkBuffer {
fn new() -> Self {
fn new(last_recv: Option<u64>) -> Self {
Self {
buf: VecDeque::new(),
recv_closed: false,
send_closed: false,
last_recv,
}
}
}
@@ -100,7 +344,7 @@ impl NetworkBuffer {
/// There are almost no errors, writes are always successful (but may end up in void).
/// Reads are implemented as a messages in a shared queue, refer to [`NodeOs::network_epoll`]
/// for details.
///
///
/// TCP struct is just a one side of a connection. To create a connection, use [`NodeOs::open_tcp`].
#[derive(Clone)]
pub struct TCP {
@@ -110,7 +354,13 @@ pub struct TCP {
impl Debug for TCP {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TCP #{} {{{}=>{}}}", self.conn.connection_id, self.conn.nodes[self.dir as usize].id, self.conn.nodes[1 - self.dir as usize].id)
write!(
f,
"TCP #{} {{{}=>{}}}",
self.conn.connection_id,
self.conn.nodes[self.dir as usize].id,
self.conn.nodes[1 - self.dir as usize].id
)
}
}

View File

@@ -2,6 +2,8 @@
/// Grouped by the receiver node.
#[derive(Clone, Debug)]
pub enum AnyMessage {
/// Used internally for notifying node about new incoming connection.
InternalConnect,
Just32(u32),
ReplCell(ReplCell),
}

View File

@@ -28,7 +28,7 @@ impl Timing {
/// Tick-tock the global clock. Return the event ready to be processed
/// or move the clock forward and then return the event.
pub fn step(&mut self) -> Option<Pending> {
if self.timers.len() == 0 {
if self.timers.is_empty() {
// no future events
return None;
}
@@ -90,7 +90,7 @@ impl Ord for Pending {
impl PartialEq for Pending {
fn eq(&self, other: &Self) -> bool {
&(other.time, other.nonce) == &(self.time, self.nonce)
(other.time, other.nonce) == (self.time, self.nonce)
}
}

View File

@@ -1,12 +1,16 @@
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{cell::RefCell, ops::DerefMut, sync::{Arc, atomic::AtomicU64}};
use std::{
cell::RefCell,
ops::DerefMut,
sync::{atomic::AtomicU64, Arc},
};
use super::{
chan::Chan,
network::{NetworkOptions, VirtualConnection, TCP},
node_os::NodeOs,
proto::AnyMessage,
sync::{Mutex, Park},
network::{TCP, VirtualConnection},
time::{Event, Timing},
wait_group::WaitGroup,
};
@@ -32,17 +36,21 @@ pub struct World {
/// Network connection counter.
connection_counter: AtomicU64,
/// Network options.
network_options: Arc<NetworkOptions>,
}
impl World {
pub fn new() -> World {
pub fn new(seed: u64, network_options: Arc<NetworkOptions>) -> World {
World {
nodes: Mutex::new(Vec::new()),
unconditional_parking: Mutex::new(Vec::new()),
wait_group: WaitGroup::new(),
rng: Mutex::new(StdRng::seed_from_u64(1337)),
rng: Mutex::new(StdRng::seed_from_u64(seed)),
timing: Mutex::new(Timing::new()),
connection_counter: AtomicU64::new(0),
network_options,
}
}
@@ -86,9 +94,17 @@ impl World {
// TODO: replace unwrap() with /dev/null socket.
let dst = self.get_node(dst).unwrap();
let id = self.connection_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let conn = VirtualConnection::new(id, self.clone(), src.clone(), dst);
let id = self
.connection_counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let conn = VirtualConnection::new(
id,
self.clone(),
src.clone(),
dst,
self.network_options.clone(),
);
// MessageDirection(0) is src->dst
TCP::new(conn, 0)
}
@@ -139,7 +155,7 @@ impl World {
// when code is waiting for external events.
let time_event = self.timing.lock().step();
if let Some(event) = time_event {
println!("Processing event: {:?}", event.event);
// println!("Processing event: {:?}", event.event);
event.process();
// to have a clean state after each step, wait for all threads to finish
@@ -229,7 +245,7 @@ impl Node {
id,
network: Chan::new(),
status: Mutex::new(NodeStatus::NotStarted),
world: world.clone(),
world,
join_handle: Mutex::new(None),
rng: Mutex::new(rng),
}
@@ -330,7 +346,8 @@ impl Node {
/// Network events and timers.
#[derive(Clone, Debug)]
pub enum NodeEvent {
Accept,
Accept(TCP),
Closed(TCP),
Message((AnyMessage, TCP)),
// TODO: close?
}

View File

@@ -8,11 +8,8 @@ use crate::simlib::{
pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
println!("started client");
os.sleep(os.random(10000));
let sock = os.open_tcp(dst);
for num in data {
os.sleep(os.random(10000));
println!("sending data: {:?}", num.clone());
sock.send(AnyMessage::ReplCell(num.clone()));
}

View File

@@ -5,13 +5,34 @@ mod server;
use std::sync::Arc;
use crate::{
simlib::{proto::ReplCell, world::World},
simlib::{
network::{Delay, NetworkOptions},
proto::ReplCell,
world::World,
},
simtest::{client::run_client, disk::SharedStorage, server::run_server},
};
#[test]
fn start_simulation() {
let world = Arc::new(World::new());
fn run_test() {
let delay = Delay {
min: 1,
max: 10,
fail_prob: 0.0,
};
let network = NetworkOptions {
timeout: Some(1000),
connect_delay: delay.clone(),
send_delay: delay.clone(),
};
start_simulation(1337, network);
}
fn start_simulation(seed: u64, network: NetworkOptions) {
let network = Arc::new(network);
let world = Arc::new(World::new(seed, network));
world.register_world();
let client_node = world.new_node();

View File

@@ -1,44 +1,50 @@
use std::collections::HashMap;
use crate::simlib::{node_os::NodeOs, proto::AnyMessage, world::NodeEvent};
use super::disk::Storage;
pub struct DiskLog {
pub map: HashMap<String, u32>,
}
// pub struct DiskLog {
// pub map: HashMap<String, u32>,
// }
impl DiskLog {
pub fn new() -> Self {
Self {
map: HashMap::new(),
}
}
// impl DiskLog {
// pub fn new() -> Self {
// Self {
// map: HashMap::new(),
// }
// }
pub fn get(&self, key: &str) -> u32 {
self.map.get(key).copied().unwrap_or(0)
}
// pub fn get(&self, key: &str) -> u32 {
// self.map.get(key).copied().unwrap_or(0)
// }
pub fn set(&mut self, key: &str, value: u32) {
self.map.insert(key.to_string(), value);
}
}
// pub fn set(&mut self, key: &str, value: u32) {
// self.map.insert(key.to_string(), value);
// }
// }
pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
println!("started server");
let epoll = os.epoll();
loop {
os.sleep(os.random(10000));
let event = epoll.recv();
println!("got event: {:?}", event);
match event {
NodeEvent::Message((msg, _)) => match msg {
AnyMessage::ReplCell(num) => {
storage.write(num.value);
NodeEvent::Message((msg, tcp)) => match msg {
AnyMessage::ReplCell(cell) => {
if cell.seqno != storage.flush_pos() {
println!("got out of order data: {:?}", cell);
continue;
}
storage.write(cell.value);
storage.flush().unwrap();
tcp.send(AnyMessage::Just32(storage.flush_pos()));
}
_ => {}
},
NodeEvent::Accept(tcp) => {
tcp.send(AnyMessage::Just32(storage.flush_pos()));
}
_ => {}
}
}