From 420d3bc18f73addedc2211c1175c83336aa22bd3 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 24 Aug 2023 15:24:38 +0000 Subject: [PATCH] Add simulation schedule --- libs/walproposer/src/simtest/mod.rs | 1 + libs/walproposer/src/simtest/safekeeper.rs | 4 +- libs/walproposer/src/simtest/simple_client.rs | 2 +- libs/walproposer/src/simtest/util.rs | 437 ++++++++++++++++++ libs/walproposer/src/simtest/wp_sk.rs | 320 ++----------- safekeeper/src/simlib/network.rs | 6 +- safekeeper/src/simlib/sync.rs | 2 +- safekeeper/src/simlib/time.rs | 7 + safekeeper/src/simlib/world.rs | 12 +- safekeeper/src/simtest/mod.rs | 2 +- 10 files changed, 489 insertions(+), 304 deletions(-) create mode 100644 libs/walproposer/src/simtest/util.rs diff --git a/libs/walproposer/src/simtest/mod.rs b/libs/walproposer/src/simtest/mod.rs index 81b0f37ff1..bef5dfa05b 100644 --- a/libs/walproposer/src/simtest/mod.rs +++ b/libs/walproposer/src/simtest/mod.rs @@ -8,3 +8,4 @@ pub mod disk; pub mod safekeeper; pub mod storage; pub mod log; +pub mod util; diff --git a/libs/walproposer/src/simtest/safekeeper.rs b/libs/walproposer/src/simtest/safekeeper.rs index 358a357b75..e17a1d0f62 100644 --- a/libs/walproposer/src/simtest/safekeeper.rs +++ b/libs/walproposer/src/simtest/safekeeper.rs @@ -284,8 +284,8 @@ impl ConnState { match msg { ProposerAcceptorMessage::Greeting(ref greeting) => { info!( - "start handshake with walproposer {:?} sysid {} timeline {}", - self.tcp, greeting.system_id, greeting.tli, + "start handshake with walproposer {:?}", + self.tcp, ); let server_info = ServerInfo { pg_version: greeting.pg_version, diff --git a/libs/walproposer/src/simtest/simple_client.rs b/libs/walproposer/src/simtest/simple_client.rs index f4c8f9f456..3914c62f7c 100644 --- a/libs/walproposer/src/simtest/simple_client.rs +++ b/libs/walproposer/src/simtest/simple_client.rs @@ -19,7 +19,7 @@ fn run_rust_c_test() { }; let network = NetworkOptions { - timeout: Some(50), + keepalive_timeout: Some(50), connect_delay: delay.clone(), send_delay: delay.clone(), }; diff --git a/libs/walproposer/src/simtest/util.rs b/libs/walproposer/src/simtest/util.rs new file mode 100644 index 0000000000..60d9de6527 --- /dev/null +++ b/libs/walproposer/src/simtest/util.rs @@ -0,0 +1,437 @@ +use std::{ffi::CString, path::Path, str::FromStr, sync::Arc}; + +use safekeeper::simlib::{ + network::{Delay, NetworkOptions}, + proto::AnyMessage, + world::World, + world::{Node, NodeEvent}, time::EmptyEvent, +}; +use tracing::{info, error, warn}; +use utils::{id::TenantTimelineId, lsn::Lsn}; + +use crate::{ + bindings::{ + neon_tenant_walproposer, neon_timeline_walproposer, sim_redo_start_lsn, syncSafekeepers, + wal_acceptor_connection_timeout, wal_acceptor_reconnect_timeout, wal_acceptors_list, + MyInsertRecord, WalProposerCleanup, WalProposerRust, + }, + c_context, + simtest::{safekeeper::run_server, log::{SimClock, init_logger}}, +}; + +use super::disk::Disk; + +pub struct SkNode { + pub node: Arc, + pub id: u32, + pub disk: Arc, +} + +impl SkNode { + pub fn new(node: Arc) -> Self { + let disk = Arc::new(Disk::new()); + let res = Self { + id: node.id, + node, + disk, + }; + res.launch(); + res + } + + pub fn launch(&self) { + let id = self.id; + let disk = self.disk.clone(); + // start the server thread + self.node.launch(move |os| { + let res = run_server(os, disk); + println!("server {} finished: {:?}", id, res); + }); + } + + pub fn restart(&self) { + self.node.crash_stop(); + self.launch(); + } +} + +pub struct TestConfig { + pub network: NetworkOptions, + pub timeout: u64, + pub clock: Option, +} + +impl TestConfig { + pub fn new(clock: Option) -> Self { + Self { + network: NetworkOptions { + keepalive_timeout: Some(2000), + connect_delay: Delay { + min: 1, + max: 5, + fail_prob: 0.0, + }, + send_delay: Delay { + min: 1, + max: 5, + fail_prob: 0.0, + }, + }, + timeout: 1_000 * 10, + clock, + } + } + + pub fn start(&self, seed: u64) -> Test { + let world = Arc::new(World::new( + seed, + Arc::new(self.network.clone()), + c_context(), + )); + world.register_world(); + + if let Some(clock) = &self.clock { + clock.set_world(world.clone()); + } + + let servers = [ + SkNode::new(world.new_node()), + SkNode::new(world.new_node()), + SkNode::new(world.new_node()), + ]; + + let server_ids = [servers[0].id, servers[1].id, servers[2].id]; + info!("safekeepers: {:?}", server_ids); + + let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(","); + let ttid = TenantTimelineId::generate(); + + // wait init for all servers + world.await_all(); + + // clean up pgdata directory + self.init_pgdata(); + + Test { + world, + servers, + safekeepers_guc, + ttid, + timeout: self.timeout, + } + } + + pub fn init_pgdata(&self) { + let pgdata = Path::new("/home/admin/simulator/libs/walproposer/pgdata"); + if pgdata.exists() { + std::fs::remove_dir_all(pgdata).unwrap(); + } + std::fs::create_dir(pgdata).unwrap(); + + // create empty pg_wal and pg_notify subdirs + std::fs::create_dir(pgdata.join("pg_wal")).unwrap(); + std::fs::create_dir(pgdata.join("pg_notify")).unwrap(); + + // write postgresql.conf + let mut conf = std::fs::File::create(pgdata.join("postgresql.conf")).unwrap(); + let content = " +wal_log_hints=off +hot_standby=on +fsync=off +wal_level=replica +restart_after_crash=off +shared_preload_libraries=neon +neon.pageserver_connstring='' +neon.tenant_id=cc6e67313d57283bad411600fbf5c142 +neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e +synchronous_standby_names=walproposer +neon.safekeepers='node:1,node:2,node:3' +max_connections=100 +"; + + std::io::Write::write_all(&mut conf, content.as_bytes()).unwrap(); + } +} + +pub struct Test { + pub world: Arc, + pub servers: [SkNode; 3], + pub safekeepers_guc: String, + pub ttid: TenantTimelineId, + pub timeout: u64, +} + +impl Test { + fn launch_sync(&self) -> Arc { + let client_node = self.world.new_node(); + info!("sync-safekeepers started at node {}", client_node.id); + + // start the client thread + let guc = self.safekeepers_guc.clone(); + let ttid = self.ttid.clone(); + client_node.launch(move |_| { + let list = CString::new(guc).unwrap(); + + unsafe { + WalProposerCleanup(); + + syncSafekeepers = true; + wal_acceptors_list = list.into_raw(); + wal_acceptor_reconnect_timeout = 1000; + wal_acceptor_connection_timeout = 5000; + neon_tenant_walproposer = + CString::new(ttid.tenant_id.to_string()).unwrap().into_raw(); + neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()) + .unwrap() + .into_raw(); + WalProposerRust(); + } + }); + + self.world.await_all(); + + client_node + } + + pub fn sync_safekeepers(&self) -> anyhow::Result { + let client_node = self.launch_sync(); + + // poll until exit or timeout + let time_limit = self.timeout; + while self.world.step() && self.world.now() < time_limit && !client_node.is_finished() {} + + if !client_node.is_finished() { + anyhow::bail!("timeout or idle stuck"); + } + + let res = client_node.result.lock().clone(); + if res.0 != 0 { + anyhow::bail!("non-zero exitcode: {:?}", res); + } + let lsn = Lsn::from_str(&res.1)?; + Ok(lsn) + } + + pub fn launch_walproposer(&self, lsn: Lsn) -> WalProposer { + let client_node = self.world.new_node(); + + let lsn = if lsn.0 == 0 { + // usual LSN after basebackup + Lsn(21623024) + } else { + lsn + }; + + // start the client thread + let guc = self.safekeepers_guc.clone(); + let ttid = self.ttid.clone(); + client_node.launch(move |_| { + let list = CString::new(guc).unwrap(); + + unsafe { + WalProposerCleanup(); + + sim_redo_start_lsn = lsn.0; + syncSafekeepers = false; + wal_acceptors_list = list.into_raw(); + wal_acceptor_reconnect_timeout = 1000; + wal_acceptor_connection_timeout = 5000; + neon_tenant_walproposer = + CString::new(ttid.tenant_id.to_string()).unwrap().into_raw(); + neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()) + .unwrap() + .into_raw(); + WalProposerRust(); + } + }); + + self.world.await_all(); + + WalProposer { + node: client_node, + txes: Vec::new(), + last_committed_tx: 0, + commit_lsn: Lsn(0), + } + } + + pub fn poll_for_duration(&self, duration: u64) { + let time_limit = std::cmp::min(self.world.now() + duration, self.timeout); + while self.world.step() && self.world.now() < time_limit {} + } + + pub fn run_schedule(&self, schedule: &Schedule) -> anyhow::Result<()> { + { + let empty_event = Box::new(EmptyEvent); + + let now = self.world.now(); + for (time, _) in schedule { + if *time < now { + continue; + } + self.world.schedule(*time - now, empty_event.clone()) + } + } + + + let mut wait_node = self.launch_sync(); + // fake walproposer + let mut wp = WalProposer { + node: wait_node.clone(), + txes: Vec::new(), + last_committed_tx: 0, + commit_lsn: Lsn(0), + }; + let mut sync_in_progress = true; + + let mut skipped_tx = 0; + let mut started_tx = 0; + let mut finished_tx = 0; + + let mut schedule_ptr = 0; + + loop { + if !sync_in_progress { + finished_tx += wp.update(); + } + + if sync_in_progress && wait_node.is_finished() { + let res = wait_node.result.lock().clone(); + if res.0 != 0 { + anyhow::bail!("non-zero exitcode: {:?}", res); + } + let lsn = Lsn::from_str(&res.1)?; + info!("sync-safekeepers finished at LSN {}", lsn); + wp = self.launch_walproposer(lsn); + wait_node = wp.node.clone(); + info!("walproposer started at node {}", wait_node.id); + sync_in_progress = false; + } + + let now = self.world.now(); + while schedule_ptr < schedule.len() && schedule[schedule_ptr].0 <= now { + if now != schedule[schedule_ptr].0 { + warn!("skipped event {:?} at {}", schedule[schedule_ptr], now); + } + + let action = &schedule[schedule_ptr].1; + match action { + TestAction::WriteTx(size) => { + if !sync_in_progress && !wait_node.is_finished() { + for _ in 0..*size { + started_tx += 1; + wp.write_tx(); + } + info!("written {} transactions", size); + } else { + skipped_tx += size; + info!("skipped {} transactions", size); + } + } + TestAction::RestartSafekeeper(id) => { + info!("restarting safekeeper {}", id); + self.servers[*id as usize].restart(); + } + TestAction::RestartWalProposer => { + info!("restarting walproposer"); + wait_node.crash_stop(); + sync_in_progress = true; + wait_node = self.launch_sync(); + } + } + schedule_ptr += 1; + } + + if schedule_ptr == schedule.len() { + break; + } + let next_event_time = schedule[schedule_ptr].0; + info!("next event at {}, polling", next_event_time); + + // poll until the next event + if wait_node.is_finished() { + while self.world.step() && self.world.now() < next_event_time {} + } else { + while self.world.step() && self.world.now() < next_event_time && !wait_node.is_finished() {} + } + } + + info!("finished schedule"); + info!("skipped_tx: {}", skipped_tx); + info!("started_tx: {}", started_tx); + info!("finished_tx: {}", finished_tx); + + Ok(()) + } +} + +pub struct WalProposer { + pub node: Arc, + pub txes: Vec, + pub last_committed_tx: usize, + pub commit_lsn: Lsn, +} + +impl WalProposer { + pub fn write_tx(&mut self) -> usize { + let new_ptr = unsafe { MyInsertRecord() }; + + self.node + .network_chan() + .send(NodeEvent::Internal(AnyMessage::LSN(new_ptr as u64))); + + let tx_id = self.txes.len(); + self.txes.push(Lsn(new_ptr as u64)); + + tx_id + } + + /// Updates committed status. + pub fn update(&mut self) -> u64 { + let last_result = self.node.result.lock().clone(); + if last_result.0 != 1 { + // not an LSN update + return 0; + } + + let mut commited_now = 0; + + let lsn_str = last_result.1; + let lsn = Lsn::from_str(&lsn_str); + match lsn { + Ok(lsn) => { + self.commit_lsn = lsn; + info!("commit_lsn: {}", lsn); + + while self.last_committed_tx < self.txes.len() + && self.txes[self.last_committed_tx] <= lsn + { + info!( + "Tx #{} was commited at {}, last_commit_lsn={}", + self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn + ); + commited_now += 1; + self.last_committed_tx += 1; + } + } + Err(e) => { + error!("failed to parse LSN: {:?}", e); + } + } + + commited_now + } + + pub fn stop(&self) { + self.node.crash_stop(); + } +} + +#[derive(Debug)] +pub enum TestAction { + WriteTx(usize), + RestartSafekeeper(usize), + RestartWalProposer, +} + +pub type Schedule = Vec<(u64, TestAction)>; diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index 63a8ed8183..27d5304899 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -15,298 +15,10 @@ use crate::{ MyInsertRecord, WalProposerCleanup, WalProposerRust, }, c_context, - simtest::{safekeeper::run_server, log::{SimClock, init_logger}}, + simtest::{safekeeper::run_server, log::{SimClock, init_logger}, util::TestConfig}, }; -use super::disk::Disk; - -struct SkNode { - node: Arc, - id: u32, - disk: Arc, -} - -impl SkNode { - fn new(node: Arc) -> Self { - let disk = Arc::new(Disk::new()); - let res = Self { - id: node.id, - node, - disk, - }; - res.launch(); - res - } - - fn launch(&self) { - let id = self.id; - let disk = self.disk.clone(); - // start the server thread - self.node.launch(move |os| { - let res = run_server(os, disk); - println!("server {} finished: {:?}", id, res); - }); - } - - fn restart(&self) { - self.node.crash_stop(); - self.launch(); - } -} - -struct TestConfig { - network: NetworkOptions, - timeout: u64, - clock: Option, -} - -impl TestConfig { - fn new(clock: Option) -> Self { - Self { - network: NetworkOptions { - timeout: Some(2000), - connect_delay: Delay { - min: 1, - max: 5, - fail_prob: 0.0, - }, - send_delay: Delay { - min: 1, - max: 5, - fail_prob: 0.0, - }, - }, - timeout: 1_000 * 10, - clock, - } - } - - fn start(&self, seed: u64) -> Test { - let world = Arc::new(World::new( - seed, - Arc::new(self.network.clone()), - c_context(), - )); - world.register_world(); - - if let Some(clock) = &self.clock { - clock.set_world(world.clone()); - } - - let servers = [ - SkNode::new(world.new_node()), - SkNode::new(world.new_node()), - SkNode::new(world.new_node()), - ]; - - let server_ids = [servers[0].id, servers[1].id, servers[2].id]; - let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(","); - let ttid = TenantTimelineId::generate(); - - // wait init for all servers - world.await_all(); - - // clean up pgdata directory - self.init_pgdata(); - - Test { - world, - servers, - safekeepers_guc, - ttid, - timeout: self.timeout, - } - } - - fn init_pgdata(&self) { - let pgdata = Path::new("/home/admin/simulator/libs/walproposer/pgdata"); - if pgdata.exists() { - std::fs::remove_dir_all(pgdata).unwrap(); - } - std::fs::create_dir(pgdata).unwrap(); - - // create empty pg_wal and pg_notify subdirs - std::fs::create_dir(pgdata.join("pg_wal")).unwrap(); - std::fs::create_dir(pgdata.join("pg_notify")).unwrap(); - - // write postgresql.conf - let mut conf = std::fs::File::create(pgdata.join("postgresql.conf")).unwrap(); - let content = " -wal_log_hints=off -hot_standby=on -fsync=off -wal_level=replica -restart_after_crash=off -shared_preload_libraries=neon -neon.pageserver_connstring='' -neon.tenant_id=cc6e67313d57283bad411600fbf5c142 -neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e -synchronous_standby_names=walproposer -neon.safekeepers='node:1,node:2,node:3' -max_connections=100 -"; - - std::io::Write::write_all(&mut conf, content.as_bytes()).unwrap(); - } -} - -struct Test { - world: Arc, - servers: [SkNode; 3], - safekeepers_guc: String, - ttid: TenantTimelineId, - timeout: u64, -} - -impl Test { - fn sync_safekeepers(&self) -> anyhow::Result { - let client_node = self.world.new_node(); - - // start the client thread - let guc = self.safekeepers_guc.clone(); - let ttid = self.ttid.clone(); - client_node.launch(move |_| { - let list = CString::new(guc).unwrap(); - - unsafe { - WalProposerCleanup(); - - syncSafekeepers = true; - wal_acceptors_list = list.into_raw(); - wal_acceptor_reconnect_timeout = 1000; - wal_acceptor_connection_timeout = 5000; - neon_tenant_walproposer = - CString::new(ttid.tenant_id.to_string()).unwrap().into_raw(); - neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()) - .unwrap() - .into_raw(); - WalProposerRust(); - } - }); - - self.world.await_all(); - - // poll until exit or timeout - let time_limit = self.timeout; - while self.world.step() && self.world.now() < time_limit && !client_node.is_finished() {} - - if !client_node.is_finished() { - anyhow::bail!("timeout or idle stuck"); - } - - let res = client_node.result.lock().clone(); - if res.0 != 0 { - anyhow::bail!("non-zero exitcode: {:?}", res); - } - let lsn = Lsn::from_str(&res.1)?; - Ok(lsn) - } - - fn launch_walproposer(&self, lsn: Lsn) -> WalProposer { - let client_node = self.world.new_node(); - - let lsn = if lsn.0 == 0 { - // usual LSN after basebackup - Lsn(21623024) - } else { - lsn - }; - - // start the client thread - let guc = self.safekeepers_guc.clone(); - let ttid = self.ttid.clone(); - client_node.launch(move |_| { - let list = CString::new(guc).unwrap(); - - unsafe { - WalProposerCleanup(); - - sim_redo_start_lsn = lsn.0; - syncSafekeepers = false; - wal_acceptors_list = list.into_raw(); - wal_acceptor_reconnect_timeout = 1000; - wal_acceptor_connection_timeout = 5000; - neon_tenant_walproposer = - CString::new(ttid.tenant_id.to_string()).unwrap().into_raw(); - neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()) - .unwrap() - .into_raw(); - WalProposerRust(); - } - }); - - self.world.await_all(); - - WalProposer { - node: client_node, - txes: Vec::new(), - last_committed_tx: 0, - commit_lsn: Lsn(0), - } - } - - fn poll_for_duration(&self, duration: u64) { - let time_limit = std::cmp::min(self.world.now() + duration, self.timeout); - while self.world.step() && self.world.now() < time_limit {} - } -} - -struct WalProposer { - node: Arc, - txes: Vec, - last_committed_tx: usize, - commit_lsn: Lsn, -} - -impl WalProposer { - fn write_tx(&mut self) -> usize { - let new_ptr = unsafe { MyInsertRecord() }; - - self.node - .network_chan() - .send(NodeEvent::Internal(AnyMessage::LSN(new_ptr as u64))); - - let tx_id = self.txes.len(); - self.txes.push(Lsn(new_ptr as u64)); - - tx_id - } - - /// Updates committed status. - fn update(&mut self) { - let last_result = self.node.result.lock().clone(); - if last_result.0 != 1 { - // not an LSN update - return; - } - - let lsn_str = last_result.1; - let lsn = Lsn::from_str(&lsn_str); - match lsn { - Ok(lsn) => { - self.commit_lsn = lsn; - println!("commit_lsn: {}", lsn); - - while self.last_committed_tx < self.txes.len() - && self.txes[self.last_committed_tx] <= lsn - { - println!( - "Tx #{} was commited at {}, last_commit_lsn={}", - self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn - ); - self.last_committed_tx += 1; - } - } - Err(e) => { - println!("failed to parse LSN: {:?}", e); - } - } - } - - fn stop(&self) { - self.node.crash_stop(); - } -} +use super::{disk::Disk, util::{Schedule, TestAction}}; #[test] fn sync_empty_safekeepers() { @@ -402,3 +114,31 @@ fn test_simple_restart() { let lsn = test.sync_safekeepers().unwrap(); println!("Sucessfully synced safekeepers at {}", lsn); } + +#[test] +fn test_simple_schedule() { + let clock = init_logger(); + let mut config = TestConfig::new(Some(clock)); + config.network.keepalive_timeout = Some(100); + let test = config.start(1337); + + let schedule: Schedule = vec![ + (0, TestAction::RestartWalProposer), + (50, TestAction::WriteTx(5)), + (100, TestAction::RestartSafekeeper(0)), + (100, TestAction::WriteTx(5)), + (110, TestAction::RestartSafekeeper(1)), + (110, TestAction::WriteTx(5)), + (120, TestAction::RestartSafekeeper(2)), + (120, TestAction::WriteTx(5)), + (201, TestAction::RestartWalProposer), + (251, TestAction::RestartSafekeeper(0)), + (251, TestAction::RestartSafekeeper(1)), + (251, TestAction::RestartSafekeeper(2)), + (251, TestAction::WriteTx(5)), + (255, TestAction::WriteTx(5)), + (1000, TestAction::WriteTx(5)), + ]; + + test.run_schedule(&schedule).unwrap(); +} diff --git a/safekeeper/src/simlib/network.rs b/safekeeper/src/simlib/network.rs index f2aa9c2c84..b90edb3f50 100644 --- a/safekeeper/src/simlib/network.rs +++ b/safekeeper/src/simlib/network.rs @@ -54,7 +54,7 @@ impl Delay { #[derive(Clone)] pub struct NetworkOptions { /// Connection will be automatically closed after this timeout. - pub timeout: Option, + pub keepalive_timeout: Option, pub connect_delay: Delay, pub send_delay: Delay, } @@ -117,7 +117,7 @@ impl VirtualConnection { /// Notify the future about the possible timeout. fn schedule_timeout(self: &Arc) { - if let Some(timeout) = self.options.timeout { + if let Some(timeout) = self.options.keepalive_timeout { self.world.schedule(timeout, self.as_event()); } } @@ -139,7 +139,7 @@ impl VirtualConnection { // Close the one side of the connection by timeout if the node // has not received any messages for a long time. - if let Some(timeout) = self.options.timeout { + if let Some(timeout) = self.options.keepalive_timeout { let mut to_close = [false, false]; for direction in 0..2 { let node_idx = direction ^ 1; diff --git a/safekeeper/src/simlib/sync.rs b/safekeeper/src/simlib/sync.rs index d731fae202..aebff1af67 100644 --- a/safekeeper/src/simlib/sync.rs +++ b/safekeeper/src/simlib/sync.rs @@ -163,7 +163,7 @@ impl Park { } if state.panic { - panic!("thread was crashed by the simulation"); + panic!("node {} was crashed by the simulation", node.id); } // println!("PARKING ENDED: node {:?}", node.id); diff --git a/safekeeper/src/simlib/time.rs b/safekeeper/src/simlib/time.rs index 9e8c224599..6a3b5856ed 100644 --- a/safekeeper/src/simlib/time.rs +++ b/safekeeper/src/simlib/time.rs @@ -143,3 +143,10 @@ impl Debug for NetworkEvent { .finish() } } + +#[derive(Copy, Clone, Debug)] +pub struct EmptyEvent; + +impl Event for EmptyEvent { + fn process(&self) {} +} diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index e13dcbffde..dbaa2375d0 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -1,4 +1,5 @@ use rand::{rngs::StdRng, Rng, SeedableRng}; +use tracing::debug; use std::{ cell::RefCell, ops::DerefMut, @@ -71,7 +72,6 @@ impl World { /// Create a new node. pub fn new_node(self: &Arc) -> Arc { - // TODO: verify let mut nodes = self.nodes.lock(); let id = nodes.len() as NodeId; let node = Arc::new(Node::new(id, self.clone(), self.new_rng())); @@ -202,14 +202,14 @@ impl World { /// Print full world state to stdout. pub fn debug_print_state(&self) { - println!( - "[DEBUG] World state, nodes.len()={:?}, parking.len()={:?}", + debug!( + "World state, nodes.len()={:?}, parking.len()={:?}", self.nodes.lock().len(), self.unconditional_parking.lock().len() ); for node in self.nodes.lock().iter() { - println!( - "[DEBUG] node id={:?} status={:?}", + debug!( + "node id={:?} status={:?}", node.id, node.status.lock() ); @@ -424,7 +424,7 @@ impl Node { NodeStatus::Waiting | NodeStatus::Parked => {} } - println!("Node {} is crashing, status={:?}", self.id, status); + debug!("Node {} is crashing, status={:?}", self.id, status); self.world.debug_print_state(); let park = self.world.find_parked_node(self); diff --git a/safekeeper/src/simtest/mod.rs b/safekeeper/src/simtest/mod.rs index 558d6e0781..37b04219f6 100644 --- a/safekeeper/src/simtest/mod.rs +++ b/safekeeper/src/simtest/mod.rs @@ -27,7 +27,7 @@ mod tests { }; let network = NetworkOptions { - timeout: Some(50), + keepalive_timeout: Some(50), connect_delay: delay.clone(), send_delay: delay.clone(), };