Fix excessive logs

Arthur Petukhovsky
2023-08-24 17:25:44 +00:00
parent 420d3bc18f
commit f8729f046d
15 changed files with 129 additions and 101 deletions

View File

@@ -18,7 +18,7 @@ struct WalProposerConn
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
-walprop_log(INFO, "not implemented");
+walprop_log(LOG, "not implemented");
return false;
}
@@ -26,14 +26,14 @@ ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
char *
walprop_error_message(WalProposerConn *conn)
{
-walprop_log(INFO, "not implemented");
+walprop_log(LOG, "not implemented");
return NULL;
}
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
-walprop_log(INFO, "not implemented: walprop_status");
+walprop_log(LOG, "not implemented: walprop_status");
return WP_CONNECTION_OK;
}
@@ -42,7 +42,7 @@ walprop_connect_start(char *conninfo)
{
WalProposerConn *conn;
-walprop_log(INFO, "walprop_connect_start: %s", conninfo);
+walprop_log(LOG, "walprop_connect_start: %s", conninfo);
const char *connstr_prefix = "host=node port=";
Assert(strncmp(conninfo, connstr_prefix, strlen(connstr_prefix)) == 0);
@@ -57,21 +57,21 @@ walprop_connect_start(char *conninfo)
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
-walprop_log(INFO, "not implemented: walprop_connect_poll");
+walprop_log(LOG, "not implemented: walprop_connect_poll");
return WP_CONN_POLLING_OK;
}
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
-walprop_log(INFO, "not implemented: walprop_send_query");
+walprop_log(LOG, "not implemented: walprop_send_query");
return true;
}
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
-walprop_log(INFO, "not implemented: walprop_get_query_result");
+walprop_log(LOG, "not implemented: walprop_get_query_result");
return WP_EXEC_SUCCESS_COPYBOTH;
}
@@ -84,14 +84,14 @@ walprop_socket(WalProposerConn *conn)
int
walprop_flush(WalProposerConn *conn)
{
-walprop_log(INFO, "not implemented");
+walprop_log(LOG, "not implemented");
return 0;
}
void
walprop_finish(WalProposerConn *conn)
{
-walprop_log(INFO, "walprop_finish not implemented");
+walprop_log(LOG, "walprop_finish not implemented");
}
/*
@@ -113,7 +113,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
event = sim_epoll_rcv(0);
-walprop_log(INFO, "walprop_async_read, T: %d, tcp: %d, tag: %d", (int) event.tag, (int) event.tcp, (int) event.any_message);
+walprop_log(LOG, "walprop_async_read, T: %d, tcp: %d, tag: %d", (int) event.tag, (int) event.tcp, (int) event.any_message);
Assert(event.tcp == conn->tcp);
Assert(event.tag == Message);
Assert(event.any_message == Bytes);
@@ -121,7 +121,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
msg = (char*) sim_msg_get_bytes(&len);
*buf = msg;
*amount = len;
-walprop_log(INFO, "walprop_async_read: %d", (int) len);
+walprop_log(LOG, "walprop_async_read: %d", (int) len);
return PG_ASYNC_READ_SUCCESS;
}
@@ -129,7 +129,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
-walprop_log(INFO, "walprop_async_write");
+walprop_log(LOG, "walprop_async_write");
sim_msg_set_bytes(buf, size);
sim_tcp_send(conn->tcp);
return PG_ASYNC_WRITE_SUCCESS;
@@ -142,7 +142,7 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
-walprop_log(INFO, "walprop_blocking_write");
+walprop_log(LOG, "walprop_blocking_write");
sim_msg_set_bytes(buf, size);
sim_tcp_send(conn->tcp);
return true;
@@ -151,7 +151,7 @@ walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
void
sim_start_replication(XLogRecPtr startptr)
{
-walprop_log(INFO, "sim_start_replication: %X/%X", LSN_FORMAT_ARGS(startptr));
+walprop_log(LOG, "sim_start_replication: %X/%X", LSN_FORMAT_ARGS(startptr));
sim_latest_available_lsn = startptr;
for (;;)

View File

@@ -3,6 +3,7 @@
#![allow(non_snake_case)]
use safekeeper::simlib::node_os::NodeOs;
+use tracing::info;
pub mod bindings {
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
@@ -10,8 +11,8 @@ pub mod bindings {
#[no_mangle]
pub extern "C" fn rust_function(a: u32) {
println!("Hello from Rust!");
println!("a: {}", a);
info!("Hello from Rust!");
info!("a: {}", a);
}
pub mod sim;

View File

@@ -1,3 +1,4 @@
+use log::debug;
use safekeeper::simlib::{network::TCP, node_os::NodeOs, world::NodeEvent};
use std::{
cell::RefCell,
@@ -208,7 +209,7 @@ pub extern "C" fn sim_now() -> i64 {
#[no_mangle]
pub extern "C" fn sim_exit(code: i32, msg: *const u8) {
println!("sim_exit({}, {:?})", code, msg);
debug!("sim_exit({}, {:?})", code, msg);
sim_set_result(code, msg);
// I tried to make use of pthread_exit, but it doesn't work.

View File

@@ -2,6 +2,7 @@ use std::{sync::Arc, fmt};
use safekeeper::simlib::{world::World, sync::Mutex};
use tracing_subscriber::fmt::{time::FormatTime, format::Writer};
+use utils::logging;
#[derive(Clone)]
@@ -42,8 +43,12 @@ pub fn init_logger() -> SimClock {
.with_target(false)
.with_timer(clock.clone())
.with_ansi(true)
+// .with_max_level(tracing::Level::DEBUG)
.with_writer(std::io::stdout);
base_logger.init();
+// logging::replace_panic_hook_with_tracing_panic_hook().forget();
+std::panic::set_hook(Box::new(|_| {}));
clock
}

View File

@@ -16,6 +16,7 @@ use safekeeper::{
timeline::TimelineError,
SafeKeeperConf,
};
+use tracing::debug;
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
@@ -52,7 +53,7 @@ impl GlobalMap {
let mut timelines = HashMap::new();
for (&ttid, disk) in disk.timelines.lock().iter() {
info!("loading timeline {}", ttid);
debug!("loading timeline {}", ttid);
let state = disk.state.lock().clone();
if state.server.wal_seg_size == 0 {
@@ -96,7 +97,7 @@ impl GlobalMap {
bail!("timeline {} already exists", ttid);
}
info!("creating new timeline {}", ttid);
debug!("creating new timeline {}", ttid);
let commit_lsn = Lsn::INVALID;
let local_start_lsn = Lsn::INVALID;
@@ -146,7 +147,7 @@ impl GlobalMap {
}
pub fn run_server(os: NodeOs, disk: Arc<Disk>) -> Result<()> {
println!("started server {}", os.id());
debug!("started server {}", os.id());
let conf = SafeKeeperConf {
workdir: PathBuf::from("."),
my_id: NodeId(os.id() as u64),
@@ -194,11 +195,11 @@ pub fn run_server(os: NodeOs, disk: Arc<Disk>) -> Result<()> {
if let Some(conn) = conn {
let res = conn.process_any(msg, &mut global);
if res.is_err() {
println!("conn {:?} error: {:#}", tcp, res.unwrap_err());
debug!("conn {:?} error: {:#}", tcp, res.unwrap_err());
conns.remove(&tcp.id());
}
} else {
println!("conn {:?} was closed, dropping msg {:?}", tcp, msg);
debug!("conn {:?} was closed, dropping msg {:?}", tcp, msg);
}
}
NodeEvent::Internal(_) => {}
@@ -213,7 +214,7 @@ pub fn run_server(os: NodeOs, disk: Arc<Disk>) -> Result<()> {
conns.retain(|_, conn| {
let res = conn.flush(&mut global);
if res.is_err() {
println!("conn {:?} error: {:?}", conn.tcp, res);
debug!("conn {:?} error: {:?}", conn.tcp, res);
}
res.is_ok()
});
@@ -230,7 +231,7 @@ impl ConnState {
}
let msg = ProposerAcceptorMessage::parse(copy_data)?;
// println!("got msg: {:?}", msg);
debug!("got msg: {:?}", msg);
return self.process(msg, global);
} else {
bail!("unexpected message, expected AnyMessage::Bytes");
@@ -283,7 +284,7 @@ impl ConnState {
match msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
-info!(
+debug!(
"start handshake with walproposer {:?}",
self.tcp,
);
@@ -343,13 +344,9 @@ impl ConnState {
// // TODO:
// }
// println!("sending reply: {:?}", reply);
let mut buf = BytesMut::with_capacity(128);
reply.serialize(&mut buf)?;
// println!("sending reply len={}: {}", buf.len(), hex::encode(&buf));
self.tcp.send(AnyMessage::Bytes(buf.into()));
}
Ok(())
@@ -358,7 +355,7 @@ impl ConnState {
impl Drop for ConnState {
fn drop(&mut self) {
println!("dropping conn: {:?}", self.tcp);
debug!("dropping conn: {:?}", self.tcp);
if !std::thread::panicking() {
self.tcp.close();
}

View File

@@ -6,7 +6,7 @@ use safekeeper::simlib::{
world::World,
world::{Node, NodeEvent}, time::EmptyEvent,
};
-use tracing::{info, error, warn};
+use tracing::{info, error, warn, debug};
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
@@ -45,7 +45,7 @@ impl SkNode {
// start the server thread
self.node.launch(move |os| {
let res = run_server(os, disk);
println!("server {} finished: {:?}", id, res);
debug!("server {} finished: {:?}", id, res);
});
}
@@ -101,7 +101,6 @@ impl TestConfig {
];
let server_ids = [servers[0].id, servers[1].id, servers[2].id];
info!("safekeepers: {:?}", server_ids);
let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(",");
let ttid = TenantTimelineId::generate();
@@ -164,7 +163,7 @@ pub struct Test {
impl Test {
fn launch_sync(&self) -> Arc<Node> {
let client_node = self.world.new_node();
info!("sync-safekeepers started at node {}", client_node.id);
debug!("sync-safekeepers started at node {}", client_node.id);
// start the client thread
let guc = self.safekeepers_guc.clone();
@@ -301,10 +300,10 @@ impl Test {
anyhow::bail!("non-zero exitcode: {:?}", res);
}
let lsn = Lsn::from_str(&res.1)?;
info!("sync-safekeepers finished at LSN {}", lsn);
debug!("sync-safekeepers finished at LSN {}", lsn);
wp = self.launch_walproposer(lsn);
wait_node = wp.node.clone();
info!("walproposer started at node {}", wait_node.id);
debug!("walproposer started at node {}", wait_node.id);
sync_in_progress = false;
}
@@ -322,18 +321,18 @@ impl Test {
started_tx += 1;
wp.write_tx();
}
info!("written {} transactions", size);
debug!("written {} transactions", size);
} else {
skipped_tx += size;
info!("skipped {} transactions", size);
debug!("skipped {} transactions", size);
}
}
TestAction::RestartSafekeeper(id) => {
info!("restarting safekeeper {}", id);
debug!("restarting safekeeper {}", id);
self.servers[*id as usize].restart();
}
TestAction::RestartWalProposer => {
info!("restarting walproposer");
debug!("restarting walproposer");
wait_node.crash_stop();
sync_in_progress = true;
wait_node = self.launch_sync();
@@ -346,7 +345,6 @@ impl Test {
break;
}
let next_event_time = schedule[schedule_ptr].0;
info!("next event at {}, polling", next_event_time);
// poll until the next event
if wait_node.is_finished() {
@@ -356,10 +354,10 @@ impl Test {
}
}
info!("finished schedule");
info!("skipped_tx: {}", skipped_tx);
info!("started_tx: {}", started_tx);
info!("finished_tx: {}", finished_tx);
debug!("finished schedule");
debug!("skipped_tx: {}", skipped_tx);
debug!("started_tx: {}", started_tx);
debug!("finished_tx: {}", finished_tx);
Ok(())
}
@@ -401,12 +399,12 @@ impl WalProposer {
match lsn {
Ok(lsn) => {
self.commit_lsn = lsn;
info!("commit_lsn: {}", lsn);
debug!("commit_lsn: {}", lsn);
while self.last_committed_tx < self.txes.len()
&& self.txes[self.last_committed_tx] <= lsn
{
-info!(
+debug!(
"Tx #{} was commited at {}, last_commit_lsn={}",
self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn
);

View File

@@ -1,11 +1,13 @@
use std::{ffi::CString, path::Path, str::FromStr, sync::Arc};
+use rand::Rng;
use safekeeper::simlib::{
network::{Delay, NetworkOptions},
proto::AnyMessage,
world::World,
world::{Node, NodeEvent},
};
+use tracing::info;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
@@ -28,11 +30,11 @@ fn sync_empty_safekeepers() {
let lsn = test.sync_safekeepers().unwrap();
assert_eq!(lsn, Lsn(0));
println!("Sucessfully synced empty safekeepers at 0/0");
info!("Sucessfully synced empty safekeepers at 0/0");
let lsn = test.sync_safekeepers().unwrap();
assert_eq!(lsn, Lsn(0));
println!("Sucessfully synced (again) empty safekeepers at 0/0");
info!("Sucessfully synced (again) empty safekeepers at 0/0");
}
#[test]
@@ -44,7 +46,7 @@ fn run_walproposer_generate_wal() {
let lsn = test.sync_safekeepers().unwrap();
assert_eq!(lsn, Lsn(0));
println!("Sucessfully synced empty safekeepers at 0/0");
info!("Sucessfully synced empty safekeepers at 0/0");
let mut wp = test.launch_walproposer(lsn);
@@ -66,7 +68,7 @@ fn crash_safekeeper() {
let lsn = test.sync_safekeepers().unwrap();
assert_eq!(lsn, Lsn(0));
println!("Sucessfully synced empty safekeepers at 0/0");
info!("Sucessfully synced empty safekeepers at 0/0");
let mut wp = test.launch_walproposer(lsn);
@@ -95,7 +97,7 @@ fn test_simple_restart() {
let lsn = test.sync_safekeepers().unwrap();
assert_eq!(lsn, Lsn(0));
println!("Sucessfully synced empty safekeepers at 0/0");
info!("Sucessfully synced empty safekeepers at 0/0");
let mut wp = test.launch_walproposer(lsn);
@@ -112,7 +114,7 @@ fn test_simple_restart() {
drop(wp);
let lsn = test.sync_safekeepers().unwrap();
println!("Sucessfully synced safekeepers at {}", lsn);
info!("Sucessfully synced safekeepers at {}", lsn);
}
#[test]
@@ -141,4 +143,21 @@ fn test_simple_schedule() {
];
test.run_schedule(&schedule).unwrap();
info!("Test finished, stopping all threads");
test.world.stop_all();
}
+#[test]
+fn test_random_schedules() {
+let clock = init_logger();
+let mut config = TestConfig::new(Some(clock));
+config.network.keepalive_timeout = Some(100);
+for i in 0..1000 {
+let seed: u64 = rand::thread_rng().gen();
+let test = config.start(seed);
+info!("Running test with seed {}", seed);
+test.world.stop_all();
+}
+}

View File

@@ -1,3 +1,5 @@
+use tracing::info;
use crate::bindings::{TestFunc, MyContextInit};
#[test]
@@ -9,7 +11,7 @@ fn test_rust_c_calls() {
};
res
}).join().unwrap();
println!("res: {}", res);
info!("res: {}", res);
}
#[test]

View File

@@ -80,7 +80,7 @@ void MyContextInit() {
if (!SelectConfigFiles(NULL, progname))
exit(1);
-log_min_messages = LOG;
+log_min_messages = FATAL;
Log_line_prefix = "[%p] ";
InitializeMaxBackends();

View File

@@ -6,6 +6,7 @@ use std::{
};
use rand::{rngs::StdRng, Rng};
+use tracing::debug;
use super::{
chan::Chan,
@@ -151,9 +152,9 @@ impl VirtualConnection {
}
if let Some(last_recv) = buffer.last_recv {
if now - last_recv >= timeout {
-println!(
-"NET(time={}): connection {} timed out at node {}",
-now, self.connection_id, node.id
+debug!(
+"NET: connection {} timed out at node {}",
+self.connection_id, node.id
);
to_close[node_idx] = true;
}
@@ -186,9 +187,9 @@ impl VirtualConnection {
let msg = buffer.buf.pop_front().unwrap().1;
let callback = TCP::new(self.clone(), direction ^ 1);
-// println!(
-// "NET(time={}): {:?} delivered, {}=>{}",
-// now, msg, from_node.id, to_node.id
+// debug!(
+// "NET: {:?} delivered, {}=>{}",
+// msg, from_node.id, to_node.id
// );
buffer.last_recv = Some(now);
self.schedule_timeout();
@@ -214,7 +215,7 @@ impl VirtualConnection {
let buffer = &mut state.buffers[direction as usize];
if buffer.send_closed {
-println!(
+debug!(
"NET: TCP #{} dropped message {:?} (broken pipe)",
self.connection_id, msg
);
@@ -222,7 +223,7 @@ impl VirtualConnection {
}
if close {
-println!(
+debug!(
"NET: TCP #{} dropped message {:?} (pipe just broke)",
self.connection_id, msg
);
@@ -231,7 +232,7 @@ impl VirtualConnection {
}
if buffer.recv_closed {
-println!(
+debug!(
"NET: TCP #{} dropped message {:?} (recv closed)",
self.connection_id, msg
);
@@ -257,7 +258,7 @@ impl VirtualConnection {
let delay = if let Some(ms) = delay {
ms
} else {
println!("NET: TCP #{} dropped connect", self.connection_id);
debug!("NET: TCP #{} dropped connect", self.connection_id);
buffer.send_closed = true;
return;
};
@@ -283,20 +284,20 @@ impl VirtualConnection {
let mut state = self.state.lock();
let recv_buffer = &mut state.buffers[1 ^ node_idx];
if recv_buffer.recv_closed {
-println!(
+debug!(
"NET: TCP #{} closed twice at node {}",
self.connection_id, node.id
);
return;
}
-println!(
+debug!(
"NET: TCP #{} closed at node {}",
self.connection_id, node.id
);
recv_buffer.recv_closed = true;
for msg in recv_buffer.buf.drain(..) {
-println!(
+debug!(
"NET: TCP #{} dropped message {:?} (closed)",
self.connection_id, msg
);

View File

@@ -1,5 +1,7 @@
use std::{backtrace::Backtrace, sync::Arc};
+use tracing::debug;
use super::world::{Node, NodeId, World};
pub type Mutex<T> = parking_lot::Mutex<T>;
@@ -123,15 +125,12 @@ impl Park {
if state.can_continue {
// unconditional parking
// println!("YIELD PARKING: node {:?}", node.id);
parking_lot::MutexGuard::unlocked(&mut state, || {
// first put to world parking, then decrease the running threads counter
node.internal_parking_middle(self.clone());
});
} else {
// println!("AWAIT PARKING: node {:?}", node.id);
parking_lot::MutexGuard::unlocked(&mut state, || {
// conditional parking, decrease the running threads counter without parking
node.internal_parking_start(self.clone());
@@ -146,7 +145,6 @@ impl Park {
panic!("thread was crashed by the simulation");
}
// println!("CONDITION MET: node {:?}", node.id);
// condition is met, we are now running instead of the waker thread.
// the next thing is to park the thread in the world, then decrease
// the running threads counter
@@ -166,8 +164,6 @@ impl Park {
panic!("node {} was crashed by the simulation", node.id);
}
// println!("PARKING ENDED: node {:?}", node.id);
// We are the only running thread now, we just need to update the state,
// and continue the execution.
node.internal_parking_end();
@@ -178,7 +174,6 @@ impl Park {
let park = Park::new(true);
let node = Node::current();
Self::init_state(&mut park.lock.lock(), &node);
// println!("PARKING MIDDLE alt: node {:?}", node.id);
node.internal_parking_ahead(park.clone());
park
}
@@ -194,7 +189,7 @@ impl Park {
let mut state = self.lock.lock();
if state.can_continue {
-println!(
+debug!(
"WARN wake() called on a thread that is already waked, node {:?}",
state.node_id
);
@@ -220,7 +215,7 @@ impl Park {
let mut state = self.lock.lock();
if state.can_continue {
-println!(
+debug!(
"WARN external_wake() called on a thread that is already waked, node {:?}",
state.node_id
);
@@ -237,7 +232,7 @@ impl Park {
pub fn internal_world_wake(&self) {
let mut state = self.lock.lock();
if state.finished {
-println!(
+debug!(
"WARN internal_world_wake() called on a thread that is already waked, node {:?}",
state.node_id
);
@@ -260,8 +255,8 @@ impl Park {
/// Print debug info about the parked thread.
pub fn debug_print(&self) {
// let state = self.lock.lock();
// println!("PARK: node {:?} wake1={} wake2={}", state.node_id, state.can_continue, state.finished);
// println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace);
// debug!("PARK: node {:?} wake1={} wake2={}", state.node_id, state.can_continue, state.finished);
// debug!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace);
}
/// It feels that this function can cause deadlocks.

View File

@@ -35,7 +35,6 @@ impl Timing {
if !self.is_event_ready() {
let next_time = self.timers.peek().unwrap().time;
// println!("CLK(time={})", next_time);
self.current_time = next_time;
assert!(self.is_event_ready());
}

View File

@@ -98,6 +98,13 @@ impl World {
}
}
+pub fn stop_all(&self) {
+let nodes = self.nodes.lock().clone();
+for node in nodes {
+node.crash_stop();
+}
+}
/// Returns a writable end of a TCP connection, to send src->dst messages.
pub fn open_tcp(self: &Arc<World>, src: &Arc<Node>, dst: NodeId) -> TCP {
// TODO: replace unwrap() with /dev/null socket.
@@ -168,7 +175,7 @@ impl World {
// First try to wake up unconditional thread.
let to_resume = self.thread_to_unpark();
if let Some(park) = to_resume {
// println!("Waking up park at node {:?}", park.node_id());
// debug!("Waking up park at node {:?}", park.node_id());
// Wake up the chosen thread. To do that:
// 1. Increment the counter of running threads.
@@ -189,7 +196,7 @@ impl World {
// when code is waiting for external events.
let time_event = self.timing.lock().step();
if let Some(event) = time_event {
// println!("Processing event: {:?}", event.event);
// debug!("Processing event: {:?}", event.event);
event.process();
// to have a clean state after each step, wait for all threads to finish
@@ -327,22 +334,22 @@ impl Node {
*status = NodeStatus::Running;
drop(status);
-// park the current thread, [`launch`] will wait until it's parked
-Park::yield_thread();
-if let Some(nodes_init) = world.nodes_init.as_ref() {
-nodes_init(NodeOs::new(world.clone(), node.clone()));
-}
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
+// park the current thread, [`launch`] will wait until it's parked
+Park::yield_thread();
+if let Some(nodes_init) = world.nodes_init.as_ref() {
+nodes_init(NodeOs::new(world.clone(), node.clone()));
+}
f(NodeOs::new(world, node.clone()));
}));
match res {
Ok(_) => {
println!("Node {} finished successfully", node.id);
debug!("Node {} finished successfully", node.id);
}
Err(e) => {
println!("Node {} finished with panic: {:?}", node.id, e);
debug!("Node {} finished with panic: {:?}", node.id, e);
}
}
@@ -425,7 +432,6 @@ impl Node {
}
debug!("Node {} is crashing, status={:?}", self.id, status);
-self.world.debug_print_state();
let park = self.world.find_parked_node(self);

View File

@@ -1,3 +1,5 @@
+use tracing::info;
use crate::simlib::{
node_os::NodeOs,
proto::{AnyMessage, ReplCell},
@@ -6,7 +8,7 @@ use crate::simlib::{
/// Copy all data from array to the remote node.
pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
println!("started client");
info!("started client");
let epoll = os.epoll();
let mut delivered = 0;
@@ -15,7 +17,7 @@ pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
while delivered < data.len() {
let num = &data[delivered];
println!("sending data: {:?}", num.clone());
info!("sending data: {:?}", num.clone());
sock.send(AnyMessage::ReplCell(num.clone()));
// loop {
@@ -27,7 +29,7 @@ pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
}
}
NodeEvent::Closed(_) => {
println!("connection closed, reestablishing");
info!("connection closed, reestablishing");
sock = os.open_tcp(dst);
}
_ => {}
@@ -38,9 +40,9 @@ pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
let sock = os.open_tcp(dst);
for num in data {
println!("sending data: {:?}", num.clone());
info!("sending data: {:?}", num.clone());
sock.send(AnyMessage::ReplCell(num.clone()));
}
println!("sent all data and finished client");
info!("sent all data and finished client");
}

View File

@@ -1,3 +1,5 @@
+use tracing::info;
use crate::simlib::{node_os::NodeOs, proto::AnyMessage, world::NodeEvent};
use super::disk::Storage;
@@ -23,17 +25,17 @@ use super::disk::Storage;
// }
pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
println!("started server");
info!("started server");
let epoll = os.epoll();
loop {
let event = epoll.recv();
println!("got event: {:?}", event);
info!("got event: {:?}", event);
match event {
NodeEvent::Message((msg, tcp)) => match msg {
AnyMessage::ReplCell(cell) => {
if cell.seqno != storage.flush_pos() {
println!("got out of order data: {:?}", cell);
info!("got out of order data: {:?}", cell);
continue;
}
storage.write(cell.value);