diff --git a/libs/walproposer/.gitignore b/libs/walproposer/.gitignore index 8800186e5f..ca3a577a84 100644 --- a/libs/walproposer/.gitignore +++ b/libs/walproposer/.gitignore @@ -1,3 +1,4 @@ *.a *.o *.tmp +pgdata diff --git a/libs/walproposer/pgdata/postgresql.conf b/libs/walproposer/pgdata/postgresql.conf deleted file mode 100644 index ae85ca2b23..0000000000 --- a/libs/walproposer/pgdata/postgresql.conf +++ /dev/null @@ -1,12 +0,0 @@ -wal_log_hints=off -hot_standby=on -fsync=off -wal_level=replica -restart_after_crash=off -shared_preload_libraries=neon -neon.pageserver_connstring='' -neon.tenant_id=cc6e67313d57283bad411600fbf5c142 -neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e -synchronous_standby_names=walproposer -neon.safekeepers='node:1,node:2,node:3' -max_connections=100 diff --git a/libs/walproposer/rust_bindings.h b/libs/walproposer/rust_bindings.h index a9b6f44001..5a8bc0ee64 100644 --- a/libs/walproposer/rust_bindings.h +++ b/libs/walproposer/rust_bindings.h @@ -63,6 +63,8 @@ int64_t sim_now(void); void sim_exit(int32_t code, const uint8_t *msg); +void sim_set_result(int32_t code, const uint8_t *msg); + /** * Get tag of the current message. */ diff --git a/libs/walproposer/src/sim.rs b/libs/walproposer/src/sim.rs index 3cc08f6a44..2c3ee49798 100644 --- a/libs/walproposer/src/sim.rs +++ b/libs/walproposer/src/sim.rs @@ -150,20 +150,16 @@ pub extern "C" fn sim_epoll_peek(timeout: i64) -> Event { tcp: tcp_save(tcp), any_message: AnyMessageTag::None, }, - NodeEvent::Message((message, tcp)) => { - Event { - tag: EventTag::Message, - tcp: tcp_save(tcp), - any_message: anymessage_tag(&message), - } - } - NodeEvent::Internal(message) => { - Event { - tag: EventTag::Internal, - tcp: 0, - any_message: anymessage_tag(&message), - } - } + NodeEvent::Message((message, tcp)) => Event { + tag: EventTag::Message, + tcp: tcp_save(tcp), + any_message: anymessage_tag(&message), + }, + NodeEvent::Internal(message) => Event { + tag: EventTag::Internal, + tcp: 0, + any_message: anymessage_tag(&message), + }, NodeEvent::WakeTimeout(_) => { // can't happen unreachable!() @@ -178,10 +174,8 @@ pub extern "C" fn sim_now() -> i64 { #[no_mangle] pub extern "C" fn sim_exit(code: i32, msg: *const u8) { - let msg = unsafe { CStr::from_ptr(msg as *const i8) }; - let msg = msg.to_string_lossy().into_owned(); println!("sim_exit({}, {:?})", code, msg); - os().set_result(code, msg); + sim_set_result(code, msg); // I tried to make use of pthread_exit, but it doesn't work. // https://github.com/rust-lang/unsafe-code-guidelines/issues/211 @@ -192,3 +186,10 @@ pub extern "C" fn sim_exit(code: i32, msg: *const u8) { // so I'm going to use it for now. panic!("sim_exit() called from C code") } + +#[no_mangle] +pub extern "C" fn sim_set_result(code: i32, msg: *const u8) { + let msg = unsafe { CStr::from_ptr(msg as *const i8) }; + let msg = msg.to_string_lossy().into_owned(); + os().set_result(code, msg); +} diff --git a/libs/walproposer/src/simtest/safekeeper.rs b/libs/walproposer/src/simtest/safekeeper.rs index 24a6ae0543..fb7ce97af1 100644 --- a/libs/walproposer/src/simtest/safekeeper.rs +++ b/libs/walproposer/src/simtest/safekeeper.rs @@ -4,14 +4,25 @@ use std::{collections::HashMap, path::PathBuf, time::Duration}; +use anyhow::{bail, Result}; use bytes::BytesMut; use hyper::Uri; use log::info; -use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper, AcceptorProposerMessage}, timeline::{TimelineError}, SafeKeeperConf}; -use utils::{id::{TenantTimelineId, NodeId}, lsn::Lsn}; -use anyhow::{Result, bail}; +use safekeeper::{ + safekeeper::{ + AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo, + UNKNOWN_SERVER_VERSION, + }, + simlib::{network::TCP, node_os::NodeOs, proto::AnyMessage, world::NodeEvent}, + timeline::TimelineError, + SafeKeeperConf, +}; +use utils::{ + id::{NodeId, TenantTimelineId}, + lsn::Lsn, +}; -use crate::simtest::storage::{InMemoryState, DummyWalStore}; +use crate::simtest::storage::{DummyWalStore, InMemoryState}; struct ConnState { tcp: TCP, @@ -60,14 +71,17 @@ pub fn run_server(os: NodeOs) -> Result<()> { match event { NodeEvent::Accept(tcp) => { - conns.insert(tcp.id(), ConnState { - tcp, - conf: conf.clone(), - greeting: false, - ttid: TenantTimelineId::empty(), - tli: None, - flush_pending: false, - }); + conns.insert( + tcp.id(), + ConnState { + tcp, + conf: conf.clone(), + greeting: false, + ttid: TenantTimelineId::empty(), + tli: None, + flush_pending: false, + }, + ); } NodeEvent::Message((msg, tcp)) => { let conn = conns.get_mut(&tcp.id()); @@ -104,7 +118,7 @@ impl ConnState { fn process_any(&mut self, any: AnyMessage) -> Result<()> { if let AnyMessage::Bytes(copy_data) = any { let msg = ProposerAcceptorMessage::parse(copy_data)?; - println!("got msg: {:?}", msg); + // println!("got msg: {:?}", msg); return self.process(msg); } else { bail!("unexpected message, expected AnyMessage::Bytes"); @@ -120,7 +134,7 @@ impl ConnState { // TODO: load state from in-memory storage let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn); - + if state.server.wal_seg_size == 0 { bail!(TimelineError::UninitializedWalSegSize(ttid)); } @@ -142,12 +156,10 @@ impl ConnState { // TODO: implement "persistent" storage for tests let wal_store = DummyWalStore::new(); - + let sk = SafeKeeper::new(control_store, wal_store, self.conf.my_id)?; - self.tli = Some(SharedState { - sk, - }); + self.tli = Some(SharedState { sk }); Ok(()) } @@ -155,7 +167,7 @@ impl ConnState { fn process(&mut self, msg: ProposerAcceptorMessage) -> Result<()> { if !self.greeting { self.greeting = true; - + match msg { ProposerAcceptorMessage::Greeting(ref greeting) => { info!( @@ -171,9 +183,7 @@ impl ConnState { self.create_timeline(ttid, server_info)? } _ => { - bail!( - "unexpected message {msg:?} instead of greeting" - ); + bail!("unexpected message {msg:?} instead of greeting"); } } } @@ -181,7 +191,9 @@ impl ConnState { match msg { ProposerAcceptorMessage::AppendRequest(append_request) => { self.flush_pending = true; - self.process_sk_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(append_request))?; + self.process_sk_msg(&ProposerAcceptorMessage::NoFlushAppendRequest( + append_request, + ))?; } other => { self.process_sk_msg(&other)?; @@ -211,12 +223,12 @@ impl ConnState { // // TODO: // } - println!("sending reply: {:?}", reply); + // println!("sending reply: {:?}", reply); let mut buf = BytesMut::with_capacity(128); reply.serialize(&mut buf)?; - println!("sending reply len={}: {}", buf.len(), hex::encode(&buf)); + // println!("sending reply len={}: {}", buf.len(), hex::encode(&buf)); self.tcp.send(AnyMessage::Bytes(buf.into())); } @@ -227,7 +239,9 @@ impl ConnState { impl Drop for ConnState { fn drop(&mut self) { println!("dropping conn: {:?}", self.tcp); - self.tcp.close(); + if !std::thread::panicking() { + self.tcp.close(); + } // TODO: clean up non-fsynced WAL } } diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index f891194eb8..ad16d4dc32 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -1,21 +1,50 @@ -use std::{ffi::CString, str::FromStr, sync::Arc}; +use std::{ffi::CString, path::Path, str::FromStr, sync::Arc}; use safekeeper::simlib::{ network::{Delay, NetworkOptions}, + proto::AnyMessage, + world::World, world::{Node, NodeEvent}, - world::World, proto::AnyMessage, }; use utils::{id::TenantTimelineId, logging, lsn::Lsn}; use crate::{ bindings::{ - neon_tenant_walproposer, neon_timeline_walproposer, wal_acceptor_connection_timeout, - wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, sim_redo_start_lsn, MyInsertRecord, + neon_tenant_walproposer, neon_timeline_walproposer, sim_redo_start_lsn, syncSafekeepers, + wal_acceptor_connection_timeout, wal_acceptor_reconnect_timeout, wal_acceptors_list, + MyInsertRecord, WalProposerCleanup, WalProposerRust, }, c_context, simtest::safekeeper::run_server, }; +struct SkNode { + node: Arc, + id: u32, +} + +impl SkNode { + fn new(node: Arc) -> Self { + let res = Self { id: node.id, node }; + res.launch(); + res + } + + fn launch(&self) { + let id = self.id; + // start the server thread + self.node.launch(move |os| { + let res = run_server(os); + println!("server {} finished: {:?}", id, res); + }); + } + + fn restart(&self) { + self.node.crash_stop(); + self.launch(); + } +} + struct TestConfig { network: NetworkOptions, timeout: u64, @@ -42,42 +71,73 @@ impl TestConfig { } fn start(&self, seed: u64) -> Test { - let world = Arc::new(World::new(seed, Arc::new(self.network.clone()), c_context())); + let world = Arc::new(World::new( + seed, + Arc::new(self.network.clone()), + c_context(), + )); world.register_world(); - let servers = [world.new_node(), world.new_node(), world.new_node()]; + let servers = [ + SkNode::new(world.new_node()), + SkNode::new(world.new_node()), + SkNode::new(world.new_node()), + ]; + let server_ids = [servers[0].id, servers[1].id, servers[2].id]; let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(","); let ttid = TenantTimelineId::generate(); - // start the server threads - for ptr in servers.iter() { - let server = ptr.clone(); - let id = server.id; - server.launch(move |os| { - let res = run_server(os); - println!("server {} finished: {:?}", id, res); - }); - } - // wait init for all servers world.await_all(); + // clean up pgdata directory + self.init_pgdata(); + Test { world, servers, - server_ids, safekeepers_guc, ttid, timeout: self.timeout, } } + + fn init_pgdata(&self) { + let pgdata = Path::new("/home/admin/simulator/libs/walproposer/pgdata"); + if pgdata.exists() { + std::fs::remove_dir_all(pgdata).unwrap(); + } + std::fs::create_dir(pgdata).unwrap(); + + // create empty pg_wal and pg_notify subdirs + std::fs::create_dir(pgdata.join("pg_wal")).unwrap(); + std::fs::create_dir(pgdata.join("pg_notify")).unwrap(); + + // write postgresql.conf + let mut conf = std::fs::File::create(pgdata.join("postgresql.conf")).unwrap(); + let content = " +wal_log_hints=off +hot_standby=on +fsync=off +wal_level=replica +restart_after_crash=off +shared_preload_libraries=neon +neon.pageserver_connstring='' +neon.tenant_id=cc6e67313d57283bad411600fbf5c142 +neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e +synchronous_standby_names=walproposer +neon.safekeepers='node:1,node:2,node:3' +max_connections=100 +"; + + std::io::Write::write_all(&mut conf, content.as_bytes()).unwrap(); + } } struct Test { world: Arc, - servers: [Arc; 3], - server_ids: [u32; 3], + servers: [SkNode; 3], safekeepers_guc: String, ttid: TenantTimelineId, timeout: u64, @@ -162,8 +222,11 @@ impl Test { self.world.await_all(); - WalProposer { + WalProposer { node: client_node, + txes: Vec::new(), + last_committed_tx: 0, + commit_lsn: Lsn(0), } } @@ -175,17 +238,54 @@ impl Test { struct WalProposer { node: Arc, + txes: Vec, + last_committed_tx: usize, + commit_lsn: Lsn, } impl WalProposer { - fn gen_wal_record(&self) -> Lsn { + fn write_tx(&mut self) -> usize { let new_ptr = unsafe { MyInsertRecord() }; - self.node.network_chan().send(NodeEvent::Internal( - AnyMessage::LSN(new_ptr as u64), - )); + self.node + .network_chan() + .send(NodeEvent::Internal(AnyMessage::LSN(new_ptr as u64))); - return Lsn(new_ptr as u64); + let tx_id = self.txes.len(); + self.txes.push(Lsn(new_ptr as u64)); + + tx_id + } + + /// Updates committed status. + fn update(&mut self) { + let last_result = self.node.result.lock().clone(); + if last_result.0 != 1 { + // not an LSN update + return; + } + + let lsn_str = last_result.1; + let lsn = Lsn::from_str(&lsn_str); + match lsn { + Ok(lsn) => { + self.commit_lsn = lsn; + println!("commit_lsn: {}", lsn); + + while self.last_committed_tx < self.txes.len() + && self.txes[self.last_committed_tx] <= lsn + { + println!( + "Tx #{} was commited at {}, last_commit_lsn={}", + self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn + ); + self.last_committed_tx += 1; + } + } + Err(e) => { + println!("failed to parse LSN: {:?}", e); + } + } } } @@ -202,7 +302,7 @@ fn sync_empty_safekeepers() { let lsn = test.sync_safekeepers().unwrap(); assert_eq!(lsn, Lsn(0)); - println!("Sucessfully synced empty safekeepers at 0/0"); + println!("Sucessfully synced (again) empty safekeepers at 0/0"); } #[test] @@ -217,13 +317,43 @@ fn run_walproposer_generate_wal() { assert_eq!(lsn, Lsn(0)); println!("Sucessfully synced empty safekeepers at 0/0"); - let wp = test.launch_walproposer(lsn); - // let rec1 = wp.gen_wal_record(); + let mut wp = test.launch_walproposer(lsn); test.poll_for_duration(30); for i in 0..100 { - wp.gen_wal_record(); + wp.write_tx(); test.poll_for_duration(5); + wp.update(); } } + +#[test] +fn crash_safekeeper() { + logging::init(logging::LogFormat::Plain).unwrap(); + + let mut config = TestConfig::new(); + // config.network.timeout = Some(250); + let test = config.start(1337); + + let lsn = test.sync_safekeepers().unwrap(); + assert_eq!(lsn, Lsn(0)); + println!("Sucessfully synced empty safekeepers at 0/0"); + + let mut wp = test.launch_walproposer(lsn); + + test.poll_for_duration(30); + wp.update(); + + wp.write_tx(); + wp.write_tx(); + wp.write_tx(); + + test.servers[0].restart(); + + test.poll_for_duration(100); + wp.update(); + + test.poll_for_duration(1000); + wp.update(); +} diff --git a/libs/walproposer/test.c b/libs/walproposer/test.c index 3584a0b309..ab8093b97f 100644 --- a/libs/walproposer/test.c +++ b/libs/walproposer/test.c @@ -80,7 +80,7 @@ void MyContextInit() { if (!SelectConfigFiles(NULL, progname)) exit(1); - log_min_messages = DEBUG5; + log_min_messages = LOG; InitializeMaxBackends(); ChangeToDataDir(); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index a52c670f4c..6c2a9d8d91 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -2332,6 +2332,16 @@ HandleSafekeeperResponse(void) quorumFeedback.rf.ps_flushlsn, GetCurrentTimestamp(), false); #endif + + #ifdef SIMLIB + if (!syncSafekeepers) + { + char lsn_str[8 + 1 + 8 + 1]; + + snprintf(lsn_str, sizeof(lsn_str), "%X/%X", LSN_FORMAT_ARGS(quorumFeedback.flushLsn)); + sim_set_result(1, lsn_str); + } + #endif } CombineHotStanbyFeedbacks(&hsFeedback); @@ -2466,10 +2476,10 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) if (!(AsyncRead(sk, &buf, &buf_size))) return false; - for (int i = 0; i < buf_size; i++) { - fprintf(stderr, "%02x", buf[i]); - } - fprintf(stderr, "\n"); + // for (int i = 0; i < buf_size; i++) { + // fprintf(stderr, "%02x", buf[i]); + // } + // fprintf(stderr, "\n"); /* parse it */ s.data = buf; diff --git a/safekeeper/src/simlib/network.rs b/safekeeper/src/simlib/network.rs index 1bc2d0b47c..6773826df1 100644 --- a/safekeeper/src/simlib/network.rs +++ b/safekeeper/src/simlib/network.rs @@ -8,6 +8,7 @@ use std::{ use rand::{rngs::StdRng, Rng}; use super::{ + chan::Chan, proto::AnyMessage, sync::Mutex, time::NetworkEvent, @@ -70,6 +71,7 @@ pub struct VirtualConnection { pub connection_id: u64, pub world: Arc, pub nodes: [Arc; 2], + dst_sockets: [Chan; 2], state: Mutex, options: Arc, } @@ -93,6 +95,7 @@ impl VirtualConnection { let conn = Arc::new(Self { connection_id: id, world, + dst_sockets: [src.network_chan(), dst.network_chan()], nodes: [src, dst], state: Mutex::new(ConnectionState { buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))], @@ -105,7 +108,7 @@ impl VirtualConnection { conn.send_connect(); // TODO: add connection to the dst node - // conn.nodes[1].network_chan().send(NodeEvent::Connection(conn.clone())); + // conn.dst_sockets[1].send(NodeEvent::Connection(conn.clone())); conn } @@ -128,8 +131,7 @@ impl VirtualConnection { state.deref_mut(), now, direction as MessageDirection, - &self.nodes[direction], - &self.nodes[direction ^ 1], + &self.dst_sockets[direction ^ 1], ); } @@ -171,8 +173,7 @@ impl VirtualConnection { state: &mut ConnectionState, now: u64, direction: MessageDirection, - from_node: &Arc, - to_node: &Arc, + to_socket: &Chan, ) { let buffer = &mut state.buffers[direction as usize]; if buffer.recv_closed { @@ -183,19 +184,17 @@ impl VirtualConnection { let msg = buffer.buf.pop_front().unwrap().1; let callback = TCP::new(self.clone(), direction ^ 1); - println!( - "NET(time={}): {:?} delivered, {}=>{}", - now, msg, from_node.id, to_node.id - ); + // println!( + // "NET(time={}): {:?} delivered, {}=>{}", + // now, msg, from_node.id, to_node.id + // ); buffer.last_recv = Some(now); self.schedule_timeout(); if let AnyMessage::InternalConnect = msg { - to_node.network_chan().send(NodeEvent::Accept(callback)); + to_socket.send(NodeEvent::Accept(callback)); } else { - to_node - .network_chan() - .send(NodeEvent::Message((msg, callback))); + to_socket.send(NodeEvent::Message((msg, callback))); } } } @@ -299,8 +298,7 @@ impl VirtualConnection { send_buffer.send_closed = true; // TODO: notify the other side? - node.network_chan() - .send(NodeEvent::Closed(TCP::new(self.clone(), node_idx as u8))); + self.dst_sockets[node_idx].send(NodeEvent::Closed(TCP::new(self.clone(), node_idx as u8))); } /// Get an event suitable for scheduling. diff --git a/safekeeper/src/simlib/sync.rs b/safekeeper/src/simlib/sync.rs index 7e019fb00a..d731fae202 100644 --- a/safekeeper/src/simlib/sync.rs +++ b/safekeeper/src/simlib/sync.rs @@ -87,6 +87,8 @@ struct ParkState { /// world simulation to wake it up. True means that the parking is /// finished and the thread can continue. finished: bool, + /// True means that the thread should wake up and panic. + panic: bool, node_id: Option, backtrace: Option, } @@ -97,6 +99,7 @@ impl Park { lock: Mutex::new(ParkState { can_continue, finished: false, + panic: false, node_id: None, backtrace: None, }), @@ -131,7 +134,7 @@ impl Park { parking_lot::MutexGuard::unlocked(&mut state, || { // conditional parking, decrease the running threads counter without parking - node.internal_parking_start(); + node.internal_parking_start(self.clone()); }); // wait for condition @@ -139,6 +142,10 @@ impl Park { self.cvar.wait(&mut state); } + if state.panic { + panic!("thread was crashed by the simulation"); + } + // println!("CONDITION MET: node {:?}", node.id); // condition is met, we are now running instead of the waker thread. // the next thing is to park the thread in the world, then decrease @@ -155,6 +162,10 @@ impl Park { self.cvar.wait(state); } + if state.panic { + panic!("thread was crashed by the simulation"); + } + // println!("PARKING ENDED: node {:?}", node.id); // We are the only running thread now, we just need to update the state, @@ -187,11 +198,11 @@ impl Park { "WARN wake() called on a thread that is already waked, node {:?}", state.node_id ); - return; + } else { + state.can_continue = true; + // and here we park the waiting thread + self.cvar.notify_all(); } - state.can_continue = true; - // and here we park the waiting thread - self.cvar.notify_all(); drop(state); // and here we block the thread that called wake() by defer @@ -206,16 +217,16 @@ impl Park { /// 2. Wake up the waiting thread (it will park itself in the world) pub fn external_wake(&self) { let world = World::current(); - world.internal_parking_wake(); let mut state = self.lock.lock(); if state.can_continue { println!( - "WARN wake() called on a thread that is already waked, node {:?}", + "WARN external_wake() called on a thread that is already waked, node {:?}", state.node_id ); return; } + world.internal_parking_wake(); state.can_continue = true; // and here we park the waiting thread self.cvar.notify_all(); @@ -236,9 +247,19 @@ impl Park { self.cvar.notify_all(); } + /// Will wake up thread to panic instantly. + pub fn crash_panic(&self) { + let mut state = self.lock.lock(); + state.can_continue = true; + state.finished = true; + state.panic = true; + self.cvar.notify_all(); + drop(state); + } + /// Print debug info about the parked thread. pub fn debug_print(&self) { - let _state = self.lock.lock(); + // let state = self.lock.lock(); // println!("PARK: node {:?} wake1={} wake2={}", state.node_id, state.can_continue, state.finished); // println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace); } diff --git a/safekeeper/src/simlib/time.rs b/safekeeper/src/simlib/time.rs index 6713704b43..9e8c224599 100644 --- a/safekeeper/src/simlib/time.rs +++ b/safekeeper/src/simlib/time.rs @@ -35,7 +35,7 @@ impl Timing { if !self.is_event_ready() { let next_time = self.timers.peek().unwrap().time; - println!("CLK(time={})", next_time); + // println!("CLK(time={})", next_time); self.current_time = next_time; assert!(self.is_event_ready()); } diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index e7792d91ee..bb5bca45ab 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -221,6 +221,20 @@ impl World { // waking node with condition, increase the running threads counter self.wait_group.add(1); } + + fn find_parked_node(&self, node: &Node) -> Option> { + let mut parking = self.unconditional_parking.lock(); + let mut found: Option = None; + for (i, park) in parking.iter().enumerate() { + if park.node_id() == Some(node.id) { + if found.is_some() { + panic!("found more than one parked thread for node {}", node.id); + } + found = Some(i); + } + } + Some(parking.swap_remove(found?)) + } } thread_local! { @@ -231,8 +245,9 @@ thread_local! { /// Internal node state. pub struct Node { pub id: NodeId, - network: Chan, + network: Mutex>, status: Mutex, + waiting_park: Mutex>, world: Arc, join_handle: Mutex>>, pub rng: Mutex, @@ -254,8 +269,9 @@ impl Node { pub fn new(id: NodeId, world: Arc, rng: StdRng) -> Node { Node { id, - network: Chan::new(), + network: Mutex::new(Chan::new()), status: Mutex::new(NodeStatus::NotStarted), + waiting_park: Mutex::new(Park::new(false)), world, join_handle: Mutex::new(None), rng: Mutex::new(rng), @@ -279,7 +295,7 @@ impl Node { } let mut status = node.status.lock(); - if *status != NodeStatus::NotStarted { + if *status != NodeStatus::NotStarted && *status != NodeStatus::Finished { // clearly a caller bug, should never happen panic!("node {} is already running", node.id); } @@ -307,7 +323,6 @@ impl Node { let mut status = node.status.lock(); *status = NodeStatus::Finished; - // TODO: log the thread is finished }); *self.join_handle.lock() = Some(join_handle); @@ -318,16 +333,17 @@ impl Node { /// Returns a channel to receive events from the network. pub fn network_chan(&self) -> Chan { - self.network.clone() + self.network.lock().clone() } - pub fn internal_parking_start(&self) { + pub fn internal_parking_start(&self, park: Arc) { // Node started parking (waiting for condition), and the current thread // is the only one running, so we need to do: // 1. Change the node status to Waiting // 2. Decrease the running threads counter // 3. Block the current thread until it's woken up (outside this function) *self.status.lock() = NodeStatus::Waiting; + *self.waiting_park.lock() = park; self.world.wait_group.done(); } @@ -372,6 +388,40 @@ impl Node { let status = self.status.lock(); *status == NodeStatus::Finished } + + pub fn crash_stop(self: &Arc) { + let status = self.status.lock().clone(); + match status { + NodeStatus::NotStarted | NodeStatus::Finished | NodeStatus::Failed => return, + NodeStatus::Running => { + panic!("crash unexpected node state: Running") + } + NodeStatus::Waiting | NodeStatus::Parked => {} + } + + println!("Node {} is crashing, status={:?}", self.id, status); + self.world.debug_print_state(); + + let park = self.world.find_parked_node(self); + + let park = if park.is_some() { + assert!(status == NodeStatus::Parked); + park.unwrap() + } else { + assert!(status == NodeStatus::Waiting); + self.waiting_park.lock().clone() + }; + + park.debug_print(); + + // unplug old network socket, and create a new one + *self.network.lock() = Chan::new(); + + self.world.wait_group.add(1); + park.crash_panic(); + // self.world.debug_print_state(); + self.world.wait_group.wait(); + } } /// Network events and timers.