diff --git a/libs/walproposer/libpqwalproposer.c b/libs/walproposer/libpqwalproposer.c index 7f802216c5..d5c0ff76c2 100644 --- a/libs/walproposer/libpqwalproposer.c +++ b/libs/walproposer/libpqwalproposer.c @@ -18,7 +18,7 @@ struct WalProposerConn static bool ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) { - walprop_log(INFO, "not implemented"); + walprop_log(LOG, "not implemented"); return false; } @@ -26,14 +26,14 @@ ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) char * walprop_error_message(WalProposerConn *conn) { - walprop_log(INFO, "not implemented"); + walprop_log(LOG, "not implemented"); return NULL; } WalProposerConnStatusType walprop_status(WalProposerConn *conn) { - walprop_log(INFO, "not implemented: walprop_status"); + walprop_log(LOG, "not implemented: walprop_status"); return WP_CONNECTION_OK; } @@ -42,7 +42,7 @@ walprop_connect_start(char *conninfo) { WalProposerConn *conn; - walprop_log(INFO, "walprop_connect_start: %s", conninfo); + walprop_log(LOG, "walprop_connect_start: %s", conninfo); const char *connstr_prefix = "host=node port="; Assert(strncmp(conninfo, connstr_prefix, strlen(connstr_prefix)) == 0); @@ -57,21 +57,21 @@ walprop_connect_start(char *conninfo) WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn) { - walprop_log(INFO, "not implemented: walprop_connect_poll"); + walprop_log(LOG, "not implemented: walprop_connect_poll"); return WP_CONN_POLLING_OK; } bool walprop_send_query(WalProposerConn *conn, char *query) { - walprop_log(INFO, "not implemented: walprop_send_query"); + walprop_log(LOG, "not implemented: walprop_send_query"); return true; } WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn) { - walprop_log(INFO, "not implemented: walprop_get_query_result"); + walprop_log(LOG, "not implemented: walprop_get_query_result"); return WP_EXEC_SUCCESS_COPYBOTH; } @@ -84,14 +84,14 @@ walprop_socket(WalProposerConn *conn) int walprop_flush(WalProposerConn *conn) { - walprop_log(INFO, "not implemented"); + walprop_log(LOG, "not implemented"); return 0; } void walprop_finish(WalProposerConn *conn) { - walprop_log(INFO, "walprop_finish not implemented"); + walprop_log(LOG, "walprop_finish not implemented"); } /* @@ -113,7 +113,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) event = sim_epoll_rcv(0); - walprop_log(INFO, "walprop_async_read, T: %d, tcp: %d, tag: %d", (int) event.tag, (int) event.tcp, (int) event.any_message); + walprop_log(LOG, "walprop_async_read, T: %d, tcp: %d, tag: %d", (int) event.tag, (int) event.tcp, (int) event.any_message); Assert(event.tcp == conn->tcp); Assert(event.tag == Message); Assert(event.any_message == Bytes); @@ -121,7 +121,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) msg = (char*) sim_msg_get_bytes(&len); *buf = msg; *amount = len; - walprop_log(INFO, "walprop_async_read: %d", (int) len); + walprop_log(LOG, "walprop_async_read: %d", (int) len); return PG_ASYNC_READ_SUCCESS; } @@ -129,7 +129,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) { - walprop_log(INFO, "walprop_async_write"); + walprop_log(LOG, "walprop_async_write"); sim_msg_set_bytes(buf, size); sim_tcp_send(conn->tcp); return PG_ASYNC_WRITE_SUCCESS; @@ -142,7 +142,7 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) { - walprop_log(INFO, "walprop_blocking_write"); + walprop_log(LOG, "walprop_blocking_write"); sim_msg_set_bytes(buf, size); sim_tcp_send(conn->tcp); return true; @@ -151,7 +151,7 @@ walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) void sim_start_replication(XLogRecPtr startptr) { - walprop_log(INFO, "sim_start_replication: %X/%X", LSN_FORMAT_ARGS(startptr)); + walprop_log(LOG, "sim_start_replication: %X/%X", LSN_FORMAT_ARGS(startptr)); sim_latest_available_lsn = startptr; for (;;) diff --git a/libs/walproposer/src/lib.rs b/libs/walproposer/src/lib.rs index a27c58fd1b..d982c0f661 100644 --- a/libs/walproposer/src/lib.rs +++ b/libs/walproposer/src/lib.rs @@ -3,6 +3,7 @@ #![allow(non_snake_case)] use safekeeper::simlib::node_os::NodeOs; +use tracing::info; pub mod bindings { include!(concat!(env!("OUT_DIR"), "/bindings.rs")); @@ -10,8 +11,8 @@ pub mod bindings { #[no_mangle] pub extern "C" fn rust_function(a: u32) { - println!("Hello from Rust!"); - println!("a: {}", a); + info!("Hello from Rust!"); + info!("a: {}", a); } pub mod sim; diff --git a/libs/walproposer/src/sim.rs b/libs/walproposer/src/sim.rs index 4fc5a521dc..c2537f2883 100644 --- a/libs/walproposer/src/sim.rs +++ b/libs/walproposer/src/sim.rs @@ -1,3 +1,4 @@ +use log::debug; use safekeeper::simlib::{network::TCP, node_os::NodeOs, world::NodeEvent}; use std::{ cell::RefCell, @@ -208,7 +209,7 @@ pub extern "C" fn sim_now() -> i64 { #[no_mangle] pub extern "C" fn sim_exit(code: i32, msg: *const u8) { - println!("sim_exit({}, {:?})", code, msg); + debug!("sim_exit({}, {:?})", code, msg); sim_set_result(code, msg); // I tried to make use of pthread_exit, but it doesn't work. diff --git a/libs/walproposer/src/simtest/log.rs b/libs/walproposer/src/simtest/log.rs index f04e56f968..812154ee6c 100644 --- a/libs/walproposer/src/simtest/log.rs +++ b/libs/walproposer/src/simtest/log.rs @@ -2,6 +2,7 @@ use std::{sync::Arc, fmt}; use safekeeper::simlib::{world::World, sync::Mutex}; use tracing_subscriber::fmt::{time::FormatTime, format::Writer}; +use utils::logging; #[derive(Clone)] @@ -42,8 +43,12 @@ pub fn init_logger() -> SimClock { .with_target(false) .with_timer(clock.clone()) .with_ansi(true) + // .with_max_level(tracing::Level::DEBUG) .with_writer(std::io::stdout); base_logger.init(); + // logging::replace_panic_hook_with_tracing_panic_hook().forget(); + std::panic::set_hook(Box::new(|_| {})); + clock } diff --git a/libs/walproposer/src/simtest/safekeeper.rs b/libs/walproposer/src/simtest/safekeeper.rs index e17a1d0f62..5a106274a3 100644 --- a/libs/walproposer/src/simtest/safekeeper.rs +++ b/libs/walproposer/src/simtest/safekeeper.rs @@ -16,6 +16,7 @@ use safekeeper::{ timeline::TimelineError, SafeKeeperConf, }; +use tracing::debug; use utils::{ id::{NodeId, TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, @@ -52,7 +53,7 @@ impl GlobalMap { let mut timelines = HashMap::new(); for (&ttid, disk) in disk.timelines.lock().iter() { - info!("loading timeline {}", ttid); + debug!("loading timeline {}", ttid); let state = disk.state.lock().clone(); if state.server.wal_seg_size == 0 { @@ -96,7 +97,7 @@ impl GlobalMap { bail!("timeline {} already exists", ttid); } - info!("creating new timeline {}", ttid); + debug!("creating new timeline {}", ttid); let commit_lsn = Lsn::INVALID; let local_start_lsn = Lsn::INVALID; @@ -146,7 +147,7 @@ impl GlobalMap { } pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { - println!("started server {}", os.id()); + debug!("started server {}", os.id()); let conf = SafeKeeperConf { workdir: PathBuf::from("."), my_id: NodeId(os.id() as u64), @@ -194,11 +195,11 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { if let Some(conn) = conn { let res = conn.process_any(msg, &mut global); if res.is_err() { - println!("conn {:?} error: {:#}", tcp, res.unwrap_err()); + debug!("conn {:?} error: {:#}", tcp, res.unwrap_err()); conns.remove(&tcp.id()); } } else { - println!("conn {:?} was closed, dropping msg {:?}", tcp, msg); + debug!("conn {:?} was closed, dropping msg {:?}", tcp, msg); } } NodeEvent::Internal(_) => {} @@ -213,7 +214,7 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> { conns.retain(|_, conn| { let res = conn.flush(&mut global); if res.is_err() { - println!("conn {:?} error: {:?}", conn.tcp, res); + debug!("conn {:?} error: {:?}", conn.tcp, res); } res.is_ok() }); @@ -230,7 +231,7 @@ impl ConnState { } let msg = ProposerAcceptorMessage::parse(copy_data)?; - // println!("got msg: {:?}", msg); + debug!("got msg: {:?}", msg); return self.process(msg, global); } else { bail!("unexpected message, expected AnyMessage::Bytes"); @@ -283,7 +284,7 @@ impl ConnState { match msg { ProposerAcceptorMessage::Greeting(ref greeting) => { - info!( + debug!( "start handshake with walproposer {:?}", self.tcp, ); @@ -343,13 +344,9 @@ impl ConnState { // // TODO: // } - // println!("sending reply: {:?}", reply); - let mut buf = BytesMut::with_capacity(128); reply.serialize(&mut buf)?; - // println!("sending reply len={}: {}", buf.len(), hex::encode(&buf)); - self.tcp.send(AnyMessage::Bytes(buf.into())); } Ok(()) @@ -358,7 +355,7 @@ impl ConnState { impl Drop for ConnState { fn drop(&mut self) { - println!("dropping conn: {:?}", self.tcp); + debug!("dropping conn: {:?}", self.tcp); if !std::thread::panicking() { self.tcp.close(); } diff --git a/libs/walproposer/src/simtest/util.rs b/libs/walproposer/src/simtest/util.rs index 60d9de6527..57df69596b 100644 --- a/libs/walproposer/src/simtest/util.rs +++ b/libs/walproposer/src/simtest/util.rs @@ -6,7 +6,7 @@ use safekeeper::simlib::{ world::World, world::{Node, NodeEvent}, time::EmptyEvent, }; -use tracing::{info, error, warn}; +use tracing::{info, error, warn, debug}; use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::{ @@ -45,7 +45,7 @@ impl SkNode { // start the server thread self.node.launch(move |os| { let res = run_server(os, disk); - println!("server {} finished: {:?}", id, res); + debug!("server {} finished: {:?}", id, res); }); } @@ -101,7 +101,6 @@ impl TestConfig { ]; let server_ids = [servers[0].id, servers[1].id, servers[2].id]; - info!("safekeepers: {:?}", server_ids); let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(","); let ttid = TenantTimelineId::generate(); @@ -164,7 +163,7 @@ pub struct Test { impl Test { fn launch_sync(&self) -> Arc { let client_node = self.world.new_node(); - info!("sync-safekeepers started at node {}", client_node.id); + debug!("sync-safekeepers started at node {}", client_node.id); // start the client thread let guc = self.safekeepers_guc.clone(); @@ -301,10 +300,10 @@ impl Test { anyhow::bail!("non-zero exitcode: {:?}", res); } let lsn = Lsn::from_str(&res.1)?; - info!("sync-safekeepers finished at LSN {}", lsn); + debug!("sync-safekeepers finished at LSN {}", lsn); wp = self.launch_walproposer(lsn); wait_node = wp.node.clone(); - info!("walproposer started at node {}", wait_node.id); + debug!("walproposer started at node {}", wait_node.id); sync_in_progress = false; } @@ -322,18 +321,18 @@ impl Test { started_tx += 1; wp.write_tx(); } - info!("written {} transactions", size); + debug!("written {} transactions", size); } else { skipped_tx += size; - info!("skipped {} transactions", size); + debug!("skipped {} transactions", size); } } TestAction::RestartSafekeeper(id) => { - info!("restarting safekeeper {}", id); + debug!("restarting safekeeper {}", id); self.servers[*id as usize].restart(); } TestAction::RestartWalProposer => { - info!("restarting walproposer"); + debug!("restarting walproposer"); wait_node.crash_stop(); sync_in_progress = true; wait_node = self.launch_sync(); @@ -346,7 +345,6 @@ impl Test { break; } let next_event_time = schedule[schedule_ptr].0; - info!("next event at {}, polling", next_event_time); // poll until the next event if wait_node.is_finished() { @@ -356,10 +354,10 @@ impl Test { } } - info!("finished schedule"); - info!("skipped_tx: {}", skipped_tx); - info!("started_tx: {}", started_tx); - info!("finished_tx: {}", finished_tx); + debug!("finished schedule"); + debug!("skipped_tx: {}", skipped_tx); + debug!("started_tx: {}", started_tx); + debug!("finished_tx: {}", finished_tx); Ok(()) } @@ -401,12 +399,12 @@ impl WalProposer { match lsn { Ok(lsn) => { self.commit_lsn = lsn; - info!("commit_lsn: {}", lsn); + debug!("commit_lsn: {}", lsn); while self.last_committed_tx < self.txes.len() && self.txes[self.last_committed_tx] <= lsn { - info!( + debug!( "Tx #{} was commited at {}, last_commit_lsn={}", self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn ); diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index 27d5304899..088c2943b9 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -1,11 +1,13 @@ use std::{ffi::CString, path::Path, str::FromStr, sync::Arc}; +use rand::Rng; use safekeeper::simlib::{ network::{Delay, NetworkOptions}, proto::AnyMessage, world::World, world::{Node, NodeEvent}, }; +use tracing::info; use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::{ @@ -28,11 +30,11 @@ fn sync_empty_safekeepers() { let lsn = test.sync_safekeepers().unwrap(); assert_eq!(lsn, Lsn(0)); - println!("Sucessfully synced empty safekeepers at 0/0"); + info!("Sucessfully synced empty safekeepers at 0/0"); let lsn = test.sync_safekeepers().unwrap(); assert_eq!(lsn, Lsn(0)); - println!("Sucessfully synced (again) empty safekeepers at 0/0"); + info!("Sucessfully synced (again) empty safekeepers at 0/0"); } #[test] @@ -44,7 +46,7 @@ fn run_walproposer_generate_wal() { let lsn = test.sync_safekeepers().unwrap(); assert_eq!(lsn, Lsn(0)); - println!("Sucessfully synced empty safekeepers at 0/0"); + info!("Sucessfully synced empty safekeepers at 0/0"); let mut wp = test.launch_walproposer(lsn); @@ -66,7 +68,7 @@ fn crash_safekeeper() { let lsn = test.sync_safekeepers().unwrap(); assert_eq!(lsn, Lsn(0)); - println!("Sucessfully synced empty safekeepers at 0/0"); + info!("Sucessfully synced empty safekeepers at 0/0"); let mut wp = test.launch_walproposer(lsn); @@ -95,7 +97,7 @@ fn test_simple_restart() { let lsn = test.sync_safekeepers().unwrap(); assert_eq!(lsn, Lsn(0)); - println!("Sucessfully synced empty safekeepers at 0/0"); + info!("Sucessfully synced empty safekeepers at 0/0"); let mut wp = test.launch_walproposer(lsn); @@ -112,7 +114,7 @@ fn test_simple_restart() { drop(wp); let lsn = test.sync_safekeepers().unwrap(); - println!("Sucessfully synced safekeepers at {}", lsn); + info!("Sucessfully synced safekeepers at {}", lsn); } #[test] @@ -141,4 +143,21 @@ fn test_simple_schedule() { ]; test.run_schedule(&schedule).unwrap(); + info!("Test finished, stopping all threads"); + test.world.stop_all(); +} + +#[test] +fn test_random_schedules() { + let clock = init_logger(); + let mut config = TestConfig::new(Some(clock)); + config.network.keepalive_timeout = Some(100); + + for i in 0..1000 { + let seed: u64 = rand::thread_rng().gen(); + let test = config.start(seed); + info!("Running test with seed {}", seed); + + test.world.stop_all(); + } } diff --git a/libs/walproposer/src/test.rs b/libs/walproposer/src/test.rs index 3723f13da6..9f5760711b 100644 --- a/libs/walproposer/src/test.rs +++ b/libs/walproposer/src/test.rs @@ -1,3 +1,5 @@ +use tracing::info; + use crate::bindings::{TestFunc, MyContextInit}; #[test] @@ -9,7 +11,7 @@ fn test_rust_c_calls() { }; res }).join().unwrap(); - println!("res: {}", res); + info!("res: {}", res); } #[test] diff --git a/libs/walproposer/test.c b/libs/walproposer/test.c index a32ff2124a..dcda54552e 100644 --- a/libs/walproposer/test.c +++ b/libs/walproposer/test.c @@ -80,7 +80,7 @@ void MyContextInit() { if (!SelectConfigFiles(NULL, progname)) exit(1); - log_min_messages = LOG; + log_min_messages = FATAL; Log_line_prefix = "[%p] "; InitializeMaxBackends(); diff --git a/safekeeper/src/simlib/network.rs b/safekeeper/src/simlib/network.rs index b90edb3f50..a14df40a61 100644 --- a/safekeeper/src/simlib/network.rs +++ b/safekeeper/src/simlib/network.rs @@ -6,6 +6,7 @@ use std::{ }; use rand::{rngs::StdRng, Rng}; +use tracing::debug; use super::{ chan::Chan, @@ -151,9 +152,9 @@ impl VirtualConnection { } if let Some(last_recv) = buffer.last_recv { if now - last_recv >= timeout { - println!( - "NET(time={}): connection {} timed out at node {}", - now, self.connection_id, node.id + debug!( + "NET: connection {} timed out at node {}", + self.connection_id, node.id ); to_close[node_idx] = true; } @@ -186,9 +187,9 @@ impl VirtualConnection { let msg = buffer.buf.pop_front().unwrap().1; let callback = TCP::new(self.clone(), direction ^ 1); - // println!( - // "NET(time={}): {:?} delivered, {}=>{}", - // now, msg, from_node.id, to_node.id + // debug!( + // "NET: {:?} delivered, {}=>{}", + // msg, from_node.id, to_node.id // ); buffer.last_recv = Some(now); self.schedule_timeout(); @@ -214,7 +215,7 @@ impl VirtualConnection { let buffer = &mut state.buffers[direction as usize]; if buffer.send_closed { - println!( + debug!( "NET: TCP #{} dropped message {:?} (broken pipe)", self.connection_id, msg ); @@ -222,7 +223,7 @@ impl VirtualConnection { } if close { - println!( + debug!( "NET: TCP #{} dropped message {:?} (pipe just broke)", self.connection_id, msg ); @@ -231,7 +232,7 @@ impl VirtualConnection { } if buffer.recv_closed { - println!( + debug!( "NET: TCP #{} dropped message {:?} (recv closed)", self.connection_id, msg ); @@ -257,7 +258,7 @@ impl VirtualConnection { let delay = if let Some(ms) = delay { ms } else { - println!("NET: TCP #{} dropped connect", self.connection_id); + debug!("NET: TCP #{} dropped connect", self.connection_id); buffer.send_closed = true; return; }; @@ -283,20 +284,20 @@ impl VirtualConnection { let mut state = self.state.lock(); let recv_buffer = &mut state.buffers[1 ^ node_idx]; if recv_buffer.recv_closed { - println!( + debug!( "NET: TCP #{} closed twice at node {}", self.connection_id, node.id ); return; } - println!( + debug!( "NET: TCP #{} closed at node {}", self.connection_id, node.id ); recv_buffer.recv_closed = true; for msg in recv_buffer.buf.drain(..) { - println!( + debug!( "NET: TCP #{} dropped message {:?} (closed)", self.connection_id, msg ); diff --git a/safekeeper/src/simlib/sync.rs b/safekeeper/src/simlib/sync.rs index aebff1af67..e455cf3136 100644 --- a/safekeeper/src/simlib/sync.rs +++ b/safekeeper/src/simlib/sync.rs @@ -1,5 +1,7 @@ use std::{backtrace::Backtrace, sync::Arc}; +use tracing::debug; + use super::world::{Node, NodeId, World}; pub type Mutex = parking_lot::Mutex; @@ -123,15 +125,12 @@ impl Park { if state.can_continue { // unconditional parking - // println!("YIELD PARKING: node {:?}", node.id); parking_lot::MutexGuard::unlocked(&mut state, || { // first put to world parking, then decrease the running threads counter node.internal_parking_middle(self.clone()); }); } else { - // println!("AWAIT PARKING: node {:?}", node.id); - parking_lot::MutexGuard::unlocked(&mut state, || { // conditional parking, decrease the running threads counter without parking node.internal_parking_start(self.clone()); @@ -146,7 +145,6 @@ impl Park { panic!("thread was crashed by the simulation"); } - // println!("CONDITION MET: node {:?}", node.id); // condition is met, we are now running instead of the waker thread. // the next thing is to park the thread in the world, then decrease // the running threads counter @@ -166,8 +164,6 @@ impl Park { panic!("node {} was crashed by the simulation", node.id); } - // println!("PARKING ENDED: node {:?}", node.id); - // We are the only running thread now, we just need to update the state, // and continue the execution. node.internal_parking_end(); @@ -178,7 +174,6 @@ impl Park { let park = Park::new(true); let node = Node::current(); Self::init_state(&mut park.lock.lock(), &node); - // println!("PARKING MIDDLE alt: node {:?}", node.id); node.internal_parking_ahead(park.clone()); park } @@ -194,7 +189,7 @@ impl Park { let mut state = self.lock.lock(); if state.can_continue { - println!( + debug!( "WARN wake() called on a thread that is already waked, node {:?}", state.node_id ); @@ -220,7 +215,7 @@ impl Park { let mut state = self.lock.lock(); if state.can_continue { - println!( + debug!( "WARN external_wake() called on a thread that is already waked, node {:?}", state.node_id ); @@ -237,7 +232,7 @@ impl Park { pub fn internal_world_wake(&self) { let mut state = self.lock.lock(); if state.finished { - println!( + debug!( "WARN internal_world_wake() called on a thread that is already waked, node {:?}", state.node_id ); @@ -260,8 +255,8 @@ impl Park { /// Print debug info about the parked thread. pub fn debug_print(&self) { // let state = self.lock.lock(); - // println!("PARK: node {:?} wake1={} wake2={}", state.node_id, state.can_continue, state.finished); - // println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace); + // debug!("PARK: node {:?} wake1={} wake2={}", state.node_id, state.can_continue, state.finished); + // debug!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace); } /// It feels that this function can cause deadlocks. diff --git a/safekeeper/src/simlib/time.rs b/safekeeper/src/simlib/time.rs index 6a3b5856ed..a4127580bd 100644 --- a/safekeeper/src/simlib/time.rs +++ b/safekeeper/src/simlib/time.rs @@ -35,7 +35,6 @@ impl Timing { if !self.is_event_ready() { let next_time = self.timers.peek().unwrap().time; - // println!("CLK(time={})", next_time); self.current_time = next_time; assert!(self.is_event_ready()); } diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index dbaa2375d0..30d6583dfe 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -98,6 +98,13 @@ impl World { } } + pub fn stop_all(&self) { + let nodes = self.nodes.lock().clone(); + for node in nodes { + node.crash_stop(); + } + } + /// Returns a writable end of a TCP connection, to send src->dst messages. pub fn open_tcp(self: &Arc, src: &Arc, dst: NodeId) -> TCP { // TODO: replace unwrap() with /dev/null socket. @@ -168,7 +175,7 @@ impl World { // First try to wake up unconditional thread. let to_resume = self.thread_to_unpark(); if let Some(park) = to_resume { - // println!("Waking up park at node {:?}", park.node_id()); + // debug!("Waking up park at node {:?}", park.node_id()); // Wake up the chosen thread. To do that: // 1. Increment the counter of running threads. @@ -189,7 +196,7 @@ impl World { // when code is waiting for external events. let time_event = self.timing.lock().step(); if let Some(event) = time_event { - // println!("Processing event: {:?}", event.event); + // debug!("Processing event: {:?}", event.event); event.process(); // to have a clean state after each step, wait for all threads to finish @@ -327,22 +334,22 @@ impl Node { *status = NodeStatus::Running; drop(status); - // park the current thread, [`launch`] will wait until it's parked - Park::yield_thread(); - - if let Some(nodes_init) = world.nodes_init.as_ref() { - nodes_init(NodeOs::new(world.clone(), node.clone())); - } - let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + // park the current thread, [`launch`] will wait until it's parked + Park::yield_thread(); + + if let Some(nodes_init) = world.nodes_init.as_ref() { + nodes_init(NodeOs::new(world.clone(), node.clone())); + } + f(NodeOs::new(world, node.clone())); })); match res { Ok(_) => { - println!("Node {} finished successfully", node.id); + debug!("Node {} finished successfully", node.id); } Err(e) => { - println!("Node {} finished with panic: {:?}", node.id, e); + debug!("Node {} finished with panic: {:?}", node.id, e); } } @@ -425,7 +432,6 @@ impl Node { } debug!("Node {} is crashing, status={:?}", self.id, status); - self.world.debug_print_state(); let park = self.world.find_parked_node(self); diff --git a/safekeeper/src/simtest/client.rs b/safekeeper/src/simtest/client.rs index d6f977b0e1..962c456e3b 100644 --- a/safekeeper/src/simtest/client.rs +++ b/safekeeper/src/simtest/client.rs @@ -1,3 +1,5 @@ +use tracing::info; + use crate::simlib::{ node_os::NodeOs, proto::{AnyMessage, ReplCell}, @@ -6,7 +8,7 @@ use crate::simlib::{ /// Copy all data from array to the remote node. pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) { - println!("started client"); + info!("started client"); let epoll = os.epoll(); let mut delivered = 0; @@ -15,7 +17,7 @@ pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) { while delivered < data.len() { let num = &data[delivered]; - println!("sending data: {:?}", num.clone()); + info!("sending data: {:?}", num.clone()); sock.send(AnyMessage::ReplCell(num.clone())); // loop { @@ -27,7 +29,7 @@ pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) { } } NodeEvent::Closed(_) => { - println!("connection closed, reestablishing"); + info!("connection closed, reestablishing"); sock = os.open_tcp(dst); } _ => {} @@ -38,9 +40,9 @@ pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) { let sock = os.open_tcp(dst); for num in data { - println!("sending data: {:?}", num.clone()); + info!("sending data: {:?}", num.clone()); sock.send(AnyMessage::ReplCell(num.clone())); } - println!("sent all data and finished client"); + info!("sent all data and finished client"); } diff --git a/safekeeper/src/simtest/server.rs b/safekeeper/src/simtest/server.rs index d55afec545..0ab662e04d 100644 --- a/safekeeper/src/simtest/server.rs +++ b/safekeeper/src/simtest/server.rs @@ -1,3 +1,5 @@ +use tracing::info; + use crate::simlib::{node_os::NodeOs, proto::AnyMessage, world::NodeEvent}; use super::disk::Storage; @@ -23,17 +25,17 @@ use super::disk::Storage; // } pub fn run_server(os: NodeOs, mut storage: Box>) { - println!("started server"); + info!("started server"); let epoll = os.epoll(); loop { let event = epoll.recv(); - println!("got event: {:?}", event); + info!("got event: {:?}", event); match event { NodeEvent::Message((msg, tcp)) => match msg { AnyMessage::ReplCell(cell) => { if cell.seqno != storage.flush_pos() { - println!("got out of order data: {:?}", cell); + info!("got out of order data: {:?}", cell); continue; } storage.write(cell.value);