diff --git a/safekeeper/src/simlib/mod.rs b/safekeeper/src/simlib/mod.rs index 844b5d8c45..e65edd4b70 100644 --- a/safekeeper/src/simlib/mod.rs +++ b/safekeeper/src/simlib/mod.rs @@ -1,8 +1,8 @@ pub mod chan; +pub mod network; pub mod node_os; pub mod proto; pub mod sync; -pub mod network; pub mod time; pub mod wait_group; pub mod world; diff --git a/safekeeper/src/simlib/network.rs b/safekeeper/src/simlib/network.rs index 8b4aee44b4..4172deb522 100644 --- a/safekeeper/src/simlib/network.rs +++ b/safekeeper/src/simlib/network.rs @@ -1,45 +1,121 @@ -use std::{sync::Arc, collections::VecDeque, fmt::{Debug, self}, ops::DerefMut}; +use std::{ + collections::VecDeque, + fmt::{self, Debug}, + ops::DerefMut, + sync::Arc, +}; + +use rand::{rngs::StdRng, Rng}; use super::{ proto::AnyMessage, - world::{Node, NodeEvent, World}, time::NetworkEvent, sync::Mutex, + sync::Mutex, + time::NetworkEvent, + world::{Node, NodeEvent, World}, }; +#[derive(Clone)] +pub struct Delay { + pub min: u64, + pub max: u64, + pub fail_prob: f64, // [0; 1] +} + +impl Delay { + /// No delay, no failures. + pub fn empty() -> Delay { + Delay { + min: 0, + max: 0, + fail_prob: 0.0, + } + } + + /// Fixed delay. + pub fn fixed(ms: u64) -> Delay { + Delay { + min: ms, + max: ms, + fail_prob: 0.0, + } + } + + /// Generate a random delay in range [min, max]. Return None if the + /// message should be dropped. + pub fn delay(&self, rng: &mut StdRng) -> Option { + if rng.gen_bool(self.fail_prob) { + return None; + } + Some(rng.gen_range(self.min..=self.max)) + } +} + +pub struct NetworkOptions { + /// Connection will be automatically closed after this timeout. + pub timeout: Option, + pub connect_delay: Delay, + pub send_delay: Delay, +} + // 0 - from node(0) to node(1) // 1 - from node(1) to node(0) type MessageDirection = u8; /// Virtual connection between two nodes. -/// Node 0 is the creator of the connection. +/// Node 0 is the creator of the connection (client), +/// and node 1 is the acceptor (server). pub struct VirtualConnection { /// Connection id, used for logging and debugging. pub connection_id: u64, pub world: Arc, pub nodes: [Arc; 2], state: Mutex, + options: Arc, } struct ConnectionState { buffers: [NetworkBuffer; 2], + rng: StdRng, } impl VirtualConnection { - pub fn new(id: u64, world: Arc, src: Arc, dst: Arc) -> Arc { + pub fn new( + id: u64, + world: Arc, + src: Arc, + dst: Arc, + options: Arc, + ) -> Arc { + let now = world.now(); + let rng = world.new_rng(); + let conn = Arc::new(Self { connection_id: id, world, - nodes: [src.clone(), dst.clone()], + nodes: [src, dst], state: Mutex::new(ConnectionState { - buffers: [NetworkBuffer::new(), NetworkBuffer::new()], + buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))], + rng, }), + options, }); + conn.schedule_timeout(); + conn.send_connect(); + // TODO: add connection to the dst node // conn.nodes[1].network_chan().send(NodeEvent::Connection(conn.clone())); conn } + /// Notify the future about the possible timeout. + fn schedule_timeout(self: &Arc) { + if let Some(timeout) = self.options.timeout { + self.world.schedule(timeout, self.as_event()); + } + } + /// Transmit some of the messages from the buffer to the nodes. pub fn process(self: &Arc) { let now = self.world.now(); @@ -47,20 +123,79 @@ impl VirtualConnection { let mut state = self.state.lock(); for direction in 0..2 { - self.process_direction(state.deref_mut(), now, direction as MessageDirection, &self.nodes[direction], &self.nodes[direction^1]); + self.process_direction( + state.deref_mut(), + now, + direction as MessageDirection, + &self.nodes[direction], + &self.nodes[direction ^ 1], + ); + } + + // Close the one side of the connection by timeout if the node + // has not received any messages for a long time. + if let Some(timeout) = self.options.timeout { + let mut to_close = [false, false]; + for direction in 0..2 { + let node_idx = direction ^ 1; + let node = &self.nodes[node_idx]; + + let buffer = &mut state.buffers[direction]; + if buffer.recv_closed { + continue; + } + if let Some(last_recv) = buffer.last_recv { + if now - last_recv >= timeout { + println!( + "NET(time={}): connection {} timed out at node {}", + now, self.connection_id, node.id + ); + to_close[node_idx] = true; + } + } + } + drop(state); + + for node_idx in 0..2 { + if to_close[node_idx] { + self.close(node_idx); + } + } } } /// Process messages in the buffer in the given direction. - fn process_direction(self: &Arc, state: &mut ConnectionState, now: u64, direction: MessageDirection, from_node: &Arc, to_node: &Arc) { + fn process_direction( + self: &Arc, + state: &mut ConnectionState, + now: u64, + direction: MessageDirection, + from_node: &Arc, + to_node: &Arc, + ) { let buffer = &mut state.buffers[direction as usize]; - + if buffer.recv_closed { + assert!(buffer.buf.is_empty()); + } + while !buffer.buf.is_empty() && buffer.buf.front().unwrap().0 <= now { let msg = buffer.buf.pop_front().unwrap().1; - let callback = TCP::new(self.clone(), direction^1); + let callback = TCP::new(self.clone(), direction ^ 1); - println!("NET(time={}): {:?} delivered, {}=>{}", now, msg, from_node.id, to_node.id); - to_node.network_chan().send(NodeEvent::Message((msg, callback))); + println!( + "NET(time={}): {:?} delivered, {}=>{}", + now, msg, from_node.id, to_node.id + ); + buffer.last_recv = Some(now); + self.schedule_timeout(); + + if let AnyMessage::InternalConnect = msg { + to_node.network_chan().send(NodeEvent::Accept(callback)); + } else { + to_node + .network_chan() + .send(NodeEvent::Message((msg, callback))); + } } } @@ -68,13 +203,108 @@ impl VirtualConnection { pub fn send(self: &Arc, direction: MessageDirection, msg: AnyMessage) { let now = self.world.now(); let mut state = self.state.lock(); + + let (delay, close) = if let Some(ms) = self.options.send_delay.delay(&mut state.rng) { + (ms, false) + } else { + (0, true) + }; + let buffer = &mut state.buffers[direction as usize]; + if close { + buffer.send_closed = true; + return; + } - // Send a message 1ms into the future. - buffer.buf.push_back((now+1, msg)); - self.world.schedule(1, self.as_event()); + if buffer.send_closed { + println!( + "NET: TCP #{} dropped message {:?} (broken pipe)", + self.connection_id, msg + ); + return; + } - // TODO: more involved logic, random delays, connection breaks, etc. + if close { + println!( + "NET: TCP #{} dropped message {:?} (pipe just broke)", + self.connection_id, msg + ); + buffer.send_closed = true; + return; + } + + if buffer.recv_closed { + println!( + "NET: TCP #{} dropped message {:?} (recv closed)", + self.connection_id, msg + ); + return; + } + + // Send a message into the future. + buffer.buf.push_back((now + delay, msg)); + self.world.schedule(delay, self.as_event()); + } + + /// Send the handshake (Accept) to the server. + fn send_connect(self: &Arc) { + let now = self.world.now(); + let mut state = self.state.lock(); + let delay = self.options.connect_delay.delay(&mut state.rng); + let buffer = &mut state.buffers[0]; + assert!(buffer.buf.is_empty()); + assert!(!buffer.recv_closed); + assert!(!buffer.send_closed); + assert!(buffer.last_recv.is_none()); + + let delay = if let Some(ms) = delay { + ms + } else { + println!("NET: TCP #{} dropped connect", self.connection_id); + buffer.send_closed = true; + return; + }; + + // Send a message into the future. + buffer + .buf + .push_back((now + delay, AnyMessage::InternalConnect)); + self.world.schedule(delay, self.as_event()); + } + + /// Close the connection. Only one side of the connection will be closed, + /// and no further messages will be delivered. The other side will not be notified. + pub fn close(self: &Arc, node_idx: usize) { + let node = &self.nodes[node_idx]; + + let mut state = self.state.lock(); + let recv_buffer = &mut state.buffers[1 ^ node_idx]; + if recv_buffer.recv_closed { + println!( + "NET: TCP #{} closed twice at node {}", + self.connection_id, node.id + ); + return; + } + + println!( + "NET: TCP #{} closed at node {}", + self.connection_id, node.id + ); + recv_buffer.recv_closed = true; + for msg in recv_buffer.buf.drain(..) { + println!( + "NET: TCP #{} dropped message {:?} (closed)", + self.connection_id, msg + ); + } + + let send_buffer = &mut state.buffers[node_idx]; + send_buffer.send_closed = true; + // TODO: notify the other side? + + node.network_chan() + .send(NodeEvent::Closed(TCP::new(self.clone(), node_idx as u8))); } /// Get an event suitable for scheduling. @@ -84,14 +314,28 @@ impl VirtualConnection { } struct NetworkBuffer { - // Messages paired with time of delivery + /// Messages paired with time of delivery buf: VecDeque<(u64, AnyMessage)>, + /// True if the connection is closed on the receiving side, + /// i.e. no more messages from the buffer will be delivered. + recv_closed: bool, + /// True if the connection is closed on the sending side, + /// i.e. no more messages will be added to the buffer. + send_closed: bool, + /// Last time a message was delivered from the buffer. + /// If None, it means that the server is the receiver and + /// it has not yet aware of this connection (i.e. has not + /// received the Accept). + last_recv: Option, } impl NetworkBuffer { - fn new() -> Self { + fn new(last_recv: Option) -> Self { Self { buf: VecDeque::new(), + recv_closed: false, + send_closed: false, + last_recv, } } } @@ -100,7 +344,7 @@ impl NetworkBuffer { /// There are almost no errors, writes are always successful (but may end up in void). /// Reads are implemented as a messages in a shared queue, refer to [`NodeOs::network_epoll`] /// for details. -/// +/// /// TCP struct is just a one side of a connection. To create a connection, use [`NodeOs::open_tcp`]. #[derive(Clone)] pub struct TCP { @@ -110,7 +354,13 @@ pub struct TCP { impl Debug for TCP { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "TCP #{} {{{}=>{}}}", self.conn.connection_id, self.conn.nodes[self.dir as usize].id, self.conn.nodes[1 - self.dir as usize].id) + write!( + f, + "TCP #{} {{{}=>{}}}", + self.conn.connection_id, + self.conn.nodes[self.dir as usize].id, + self.conn.nodes[1 - self.dir as usize].id + ) } } diff --git a/safekeeper/src/simlib/proto.rs b/safekeeper/src/simlib/proto.rs index ef865f9126..b3d05641b8 100644 --- a/safekeeper/src/simlib/proto.rs +++ b/safekeeper/src/simlib/proto.rs @@ -2,6 +2,8 @@ /// Grouped by the receiver node. #[derive(Clone, Debug)] pub enum AnyMessage { + /// Used internally for notifying node about new incoming connection. + InternalConnect, Just32(u32), ReplCell(ReplCell), } diff --git a/safekeeper/src/simlib/time.rs b/safekeeper/src/simlib/time.rs index e92115f014..6713704b43 100644 --- a/safekeeper/src/simlib/time.rs +++ b/safekeeper/src/simlib/time.rs @@ -28,7 +28,7 @@ impl Timing { /// Tick-tock the global clock. Return the event ready to be processed /// or move the clock forward and then return the event. pub fn step(&mut self) -> Option { - if self.timers.len() == 0 { + if self.timers.is_empty() { // no future events return None; } @@ -90,7 +90,7 @@ impl Ord for Pending { impl PartialEq for Pending { fn eq(&self, other: &Self) -> bool { - &(other.time, other.nonce) == &(self.time, self.nonce) + (other.time, other.nonce) == (self.time, self.nonce) } } diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index 65a9b72600..b81de8b896 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -1,12 +1,16 @@ use rand::{rngs::StdRng, Rng, SeedableRng}; -use std::{cell::RefCell, ops::DerefMut, sync::{Arc, atomic::AtomicU64}}; +use std::{ + cell::RefCell, + ops::DerefMut, + sync::{atomic::AtomicU64, Arc}, +}; use super::{ chan::Chan, + network::{NetworkOptions, VirtualConnection, TCP}, node_os::NodeOs, proto::AnyMessage, sync::{Mutex, Park}, - network::{TCP, VirtualConnection}, time::{Event, Timing}, wait_group::WaitGroup, }; @@ -32,17 +36,21 @@ pub struct World { /// Network connection counter. connection_counter: AtomicU64, + + /// Network options. + network_options: Arc, } impl World { - pub fn new() -> World { + pub fn new(seed: u64, network_options: Arc) -> World { World { nodes: Mutex::new(Vec::new()), unconditional_parking: Mutex::new(Vec::new()), wait_group: WaitGroup::new(), - rng: Mutex::new(StdRng::seed_from_u64(1337)), + rng: Mutex::new(StdRng::seed_from_u64(seed)), timing: Mutex::new(Timing::new()), connection_counter: AtomicU64::new(0), + network_options, } } @@ -86,9 +94,17 @@ impl World { // TODO: replace unwrap() with /dev/null socket. let dst = self.get_node(dst).unwrap(); - let id = self.connection_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let conn = VirtualConnection::new(id, self.clone(), src.clone(), dst); - + let id = self + .connection_counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let conn = VirtualConnection::new( + id, + self.clone(), + src.clone(), + dst, + self.network_options.clone(), + ); + // MessageDirection(0) is src->dst TCP::new(conn, 0) } @@ -139,7 +155,7 @@ impl World { // when code is waiting for external events. let time_event = self.timing.lock().step(); if let Some(event) = time_event { - println!("Processing event: {:?}", event.event); + // println!("Processing event: {:?}", event.event); event.process(); // to have a clean state after each step, wait for all threads to finish @@ -229,7 +245,7 @@ impl Node { id, network: Chan::new(), status: Mutex::new(NodeStatus::NotStarted), - world: world.clone(), + world, join_handle: Mutex::new(None), rng: Mutex::new(rng), } @@ -330,7 +346,8 @@ impl Node { /// Network events and timers. #[derive(Clone, Debug)] pub enum NodeEvent { - Accept, + Accept(TCP), + Closed(TCP), Message((AnyMessage, TCP)), // TODO: close? } diff --git a/safekeeper/src/simtest/client.rs b/safekeeper/src/simtest/client.rs index c484050b03..223b843008 100644 --- a/safekeeper/src/simtest/client.rs +++ b/safekeeper/src/simtest/client.rs @@ -8,11 +8,8 @@ use crate::simlib::{ pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) { println!("started client"); - os.sleep(os.random(10000)); - let sock = os.open_tcp(dst); for num in data { - os.sleep(os.random(10000)); println!("sending data: {:?}", num.clone()); sock.send(AnyMessage::ReplCell(num.clone())); } diff --git a/safekeeper/src/simtest/mod.rs b/safekeeper/src/simtest/mod.rs index 5f4a8d8c2e..0c54973411 100644 --- a/safekeeper/src/simtest/mod.rs +++ b/safekeeper/src/simtest/mod.rs @@ -5,13 +5,34 @@ mod server; use std::sync::Arc; use crate::{ - simlib::{proto::ReplCell, world::World}, + simlib::{ + network::{Delay, NetworkOptions}, + proto::ReplCell, + world::World, + }, simtest::{client::run_client, disk::SharedStorage, server::run_server}, }; #[test] -fn start_simulation() { - let world = Arc::new(World::new()); +fn run_test() { + let delay = Delay { + min: 1, + max: 10, + fail_prob: 0.0, + }; + + let network = NetworkOptions { + timeout: Some(1000), + connect_delay: delay.clone(), + send_delay: delay.clone(), + }; + + start_simulation(1337, network); +} + +fn start_simulation(seed: u64, network: NetworkOptions) { + let network = Arc::new(network); + let world = Arc::new(World::new(seed, network)); world.register_world(); let client_node = world.new_node(); diff --git a/safekeeper/src/simtest/server.rs b/safekeeper/src/simtest/server.rs index 666e63f74e..d55afec545 100644 --- a/safekeeper/src/simtest/server.rs +++ b/safekeeper/src/simtest/server.rs @@ -1,44 +1,50 @@ -use std::collections::HashMap; - use crate::simlib::{node_os::NodeOs, proto::AnyMessage, world::NodeEvent}; use super::disk::Storage; -pub struct DiskLog { - pub map: HashMap, -} +// pub struct DiskLog { +// pub map: HashMap, +// } -impl DiskLog { - pub fn new() -> Self { - Self { - map: HashMap::new(), - } - } +// impl DiskLog { +// pub fn new() -> Self { +// Self { +// map: HashMap::new(), +// } +// } - pub fn get(&self, key: &str) -> u32 { - self.map.get(key).copied().unwrap_or(0) - } +// pub fn get(&self, key: &str) -> u32 { +// self.map.get(key).copied().unwrap_or(0) +// } - pub fn set(&mut self, key: &str, value: u32) { - self.map.insert(key.to_string(), value); - } -} +// pub fn set(&mut self, key: &str, value: u32) { +// self.map.insert(key.to_string(), value); +// } +// } pub fn run_server(os: NodeOs, mut storage: Box>) { println!("started server"); let epoll = os.epoll(); loop { - os.sleep(os.random(10000)); let event = epoll.recv(); println!("got event: {:?}", event); match event { - NodeEvent::Message((msg, _)) => match msg { - AnyMessage::ReplCell(num) => { - storage.write(num.value); + NodeEvent::Message((msg, tcp)) => match msg { + AnyMessage::ReplCell(cell) => { + if cell.seqno != storage.flush_pos() { + println!("got out of order data: {:?}", cell); + continue; + } + storage.write(cell.value); + storage.flush().unwrap(); + tcp.send(AnyMessage::Just32(storage.flush_pos())); } _ => {} }, + NodeEvent::Accept(tcp) => { + tcp.send(AnyMessage::Just32(storage.flush_pos())); + } _ => {} } }