From 6436432a7776143145194f334387ef99efd8d708 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 25 May 2023 12:53:20 +0300 Subject: [PATCH] Showcase network failures --- safekeeper/src/simlib/network.rs | 6 +----- safekeeper/src/simtest/client.rs | 30 +++++++++++++++++++++++++++++- safekeeper/src/simtest/mod.rs | 10 ++++++---- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/safekeeper/src/simlib/network.rs b/safekeeper/src/simlib/network.rs index 4172deb522..e69ce98593 100644 --- a/safekeeper/src/simlib/network.rs +++ b/safekeeper/src/simlib/network.rs @@ -50,6 +50,7 @@ impl Delay { } } +#[derive(Clone)] pub struct NetworkOptions { /// Connection will be automatically closed after this timeout. pub timeout: Option, @@ -211,11 +212,6 @@ impl VirtualConnection { }; let buffer = &mut state.buffers[direction as usize]; - if close { - buffer.send_closed = true; - return; - } - if buffer.send_closed { println!( "NET: TCP #{} dropped message {:?} (broken pipe)", diff --git a/safekeeper/src/simtest/client.rs b/safekeeper/src/simtest/client.rs index 223b843008..d6f977b0e1 100644 --- a/safekeeper/src/simtest/client.rs +++ b/safekeeper/src/simtest/client.rs @@ -1,13 +1,41 @@ use crate::simlib::{ node_os::NodeOs, proto::{AnyMessage, ReplCell}, - world::NodeId, + world::{NodeEvent, NodeId}, }; /// Copy all data from array to the remote node. pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) { println!("started client"); + let epoll = os.epoll(); + let mut delivered = 0; + + let mut sock = os.open_tcp(dst); + + while delivered < data.len() { + let num = &data[delivered]; + println!("sending data: {:?}", num.clone()); + sock.send(AnyMessage::ReplCell(num.clone())); + + // loop { + let event = epoll.recv(); + match event { + NodeEvent::Message((AnyMessage::Just32(flush_pos), _)) => { + if flush_pos == 1 + delivered as u32 { + delivered += 1; + } + } + NodeEvent::Closed(_) => { + println!("connection closed, reestablishing"); + sock = os.open_tcp(dst); + } + _ => {} + } + + // } + } + let sock = os.open_tcp(dst); for num in data { println!("sending data: {:?}", num.clone()); diff --git a/safekeeper/src/simtest/mod.rs b/safekeeper/src/simtest/mod.rs index 0c54973411..57ab33a034 100644 --- a/safekeeper/src/simtest/mod.rs +++ b/safekeeper/src/simtest/mod.rs @@ -18,7 +18,7 @@ fn run_test() { let delay = Delay { min: 1, max: 10, - fail_prob: 0.0, + fail_prob: 0.4, }; let network = NetworkOptions { @@ -27,10 +27,12 @@ fn run_test() { send_delay: delay.clone(), }; - start_simulation(1337, network); + for seed in 0..2000 { + start_simulation(seed, network.clone(), 1_000_000); + } } -fn start_simulation(seed: u64, network: NetworkOptions) { +fn start_simulation(seed: u64, network: NetworkOptions, time_limit: u64) { let network = Arc::new(network); let world = Arc::new(World::new(seed, network)); world.register_world(); @@ -51,7 +53,7 @@ fn start_simulation(seed: u64, network: NetworkOptions) { world.await_all(); - while world.step() {} + while world.step() && world.now() < time_limit {} let disk_data = shared_storage.state.lock().data.clone(); assert!(verify_data(&disk_data, &u32_data[..]));