mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Crash safekeepers
This commit is contained in:
1
libs/walproposer/.gitignore
vendored
1
libs/walproposer/.gitignore
vendored
@@ -1,3 +1,4 @@
|
||||
*.a
|
||||
*.o
|
||||
*.tmp
|
||||
pgdata
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
wal_log_hints=off
|
||||
hot_standby=on
|
||||
fsync=off
|
||||
wal_level=replica
|
||||
restart_after_crash=off
|
||||
shared_preload_libraries=neon
|
||||
neon.pageserver_connstring=''
|
||||
neon.tenant_id=cc6e67313d57283bad411600fbf5c142
|
||||
neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e
|
||||
synchronous_standby_names=walproposer
|
||||
neon.safekeepers='node:1,node:2,node:3'
|
||||
max_connections=100
|
||||
@@ -63,6 +63,8 @@ int64_t sim_now(void);
|
||||
|
||||
void sim_exit(int32_t code, const uint8_t *msg);
|
||||
|
||||
void sim_set_result(int32_t code, const uint8_t *msg);
|
||||
|
||||
/**
|
||||
* Get tag of the current message.
|
||||
*/
|
||||
|
||||
@@ -150,20 +150,16 @@ pub extern "C" fn sim_epoll_peek(timeout: i64) -> Event {
|
||||
tcp: tcp_save(tcp),
|
||||
any_message: AnyMessageTag::None,
|
||||
},
|
||||
NodeEvent::Message((message, tcp)) => {
|
||||
Event {
|
||||
tag: EventTag::Message,
|
||||
tcp: tcp_save(tcp),
|
||||
any_message: anymessage_tag(&message),
|
||||
}
|
||||
}
|
||||
NodeEvent::Internal(message) => {
|
||||
Event {
|
||||
tag: EventTag::Internal,
|
||||
tcp: 0,
|
||||
any_message: anymessage_tag(&message),
|
||||
}
|
||||
}
|
||||
NodeEvent::Message((message, tcp)) => Event {
|
||||
tag: EventTag::Message,
|
||||
tcp: tcp_save(tcp),
|
||||
any_message: anymessage_tag(&message),
|
||||
},
|
||||
NodeEvent::Internal(message) => Event {
|
||||
tag: EventTag::Internal,
|
||||
tcp: 0,
|
||||
any_message: anymessage_tag(&message),
|
||||
},
|
||||
NodeEvent::WakeTimeout(_) => {
|
||||
// can't happen
|
||||
unreachable!()
|
||||
@@ -178,10 +174,8 @@ pub extern "C" fn sim_now() -> i64 {
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn sim_exit(code: i32, msg: *const u8) {
|
||||
let msg = unsafe { CStr::from_ptr(msg as *const i8) };
|
||||
let msg = msg.to_string_lossy().into_owned();
|
||||
println!("sim_exit({}, {:?})", code, msg);
|
||||
os().set_result(code, msg);
|
||||
sim_set_result(code, msg);
|
||||
|
||||
// I tried to make use of pthread_exit, but it doesn't work.
|
||||
// https://github.com/rust-lang/unsafe-code-guidelines/issues/211
|
||||
@@ -192,3 +186,10 @@ pub extern "C" fn sim_exit(code: i32, msg: *const u8) {
|
||||
// so I'm going to use it for now.
|
||||
panic!("sim_exit() called from C code")
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn sim_set_result(code: i32, msg: *const u8) {
|
||||
let msg = unsafe { CStr::from_ptr(msg as *const i8) };
|
||||
let msg = msg.to_string_lossy().into_owned();
|
||||
os().set_result(code, msg);
|
||||
}
|
||||
|
||||
@@ -4,14 +4,25 @@
|
||||
|
||||
use std::{collections::HashMap, path::PathBuf, time::Duration};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use bytes::BytesMut;
|
||||
use hyper::Uri;
|
||||
use log::info;
|
||||
use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper, AcceptorProposerMessage}, timeline::{TimelineError}, SafeKeeperConf};
|
||||
use utils::{id::{TenantTimelineId, NodeId}, lsn::Lsn};
|
||||
use anyhow::{Result, bail};
|
||||
use safekeeper::{
|
||||
safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
|
||||
UNKNOWN_SERVER_VERSION,
|
||||
},
|
||||
simlib::{network::TCP, node_os::NodeOs, proto::AnyMessage, world::NodeEvent},
|
||||
timeline::TimelineError,
|
||||
SafeKeeperConf,
|
||||
};
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::simtest::storage::{InMemoryState, DummyWalStore};
|
||||
use crate::simtest::storage::{DummyWalStore, InMemoryState};
|
||||
|
||||
struct ConnState {
|
||||
tcp: TCP,
|
||||
@@ -60,14 +71,17 @@ pub fn run_server(os: NodeOs) -> Result<()> {
|
||||
|
||||
match event {
|
||||
NodeEvent::Accept(tcp) => {
|
||||
conns.insert(tcp.id(), ConnState {
|
||||
tcp,
|
||||
conf: conf.clone(),
|
||||
greeting: false,
|
||||
ttid: TenantTimelineId::empty(),
|
||||
tli: None,
|
||||
flush_pending: false,
|
||||
});
|
||||
conns.insert(
|
||||
tcp.id(),
|
||||
ConnState {
|
||||
tcp,
|
||||
conf: conf.clone(),
|
||||
greeting: false,
|
||||
ttid: TenantTimelineId::empty(),
|
||||
tli: None,
|
||||
flush_pending: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
NodeEvent::Message((msg, tcp)) => {
|
||||
let conn = conns.get_mut(&tcp.id());
|
||||
@@ -104,7 +118,7 @@ impl ConnState {
|
||||
fn process_any(&mut self, any: AnyMessage) -> Result<()> {
|
||||
if let AnyMessage::Bytes(copy_data) = any {
|
||||
let msg = ProposerAcceptorMessage::parse(copy_data)?;
|
||||
println!("got msg: {:?}", msg);
|
||||
// println!("got msg: {:?}", msg);
|
||||
return self.process(msg);
|
||||
} else {
|
||||
bail!("unexpected message, expected AnyMessage::Bytes");
|
||||
@@ -120,7 +134,7 @@ impl ConnState {
|
||||
|
||||
// TODO: load state from in-memory storage
|
||||
let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
|
||||
|
||||
|
||||
if state.server.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(ttid));
|
||||
}
|
||||
@@ -142,12 +156,10 @@ impl ConnState {
|
||||
|
||||
// TODO: implement "persistent" storage for tests
|
||||
let wal_store = DummyWalStore::new();
|
||||
|
||||
|
||||
let sk = SafeKeeper::new(control_store, wal_store, self.conf.my_id)?;
|
||||
|
||||
self.tli = Some(SharedState {
|
||||
sk,
|
||||
});
|
||||
self.tli = Some(SharedState { sk });
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -155,7 +167,7 @@ impl ConnState {
|
||||
fn process(&mut self, msg: ProposerAcceptorMessage) -> Result<()> {
|
||||
if !self.greeting {
|
||||
self.greeting = true;
|
||||
|
||||
|
||||
match msg {
|
||||
ProposerAcceptorMessage::Greeting(ref greeting) => {
|
||||
info!(
|
||||
@@ -171,9 +183,7 @@ impl ConnState {
|
||||
self.create_timeline(ttid, server_info)?
|
||||
}
|
||||
_ => {
|
||||
bail!(
|
||||
"unexpected message {msg:?} instead of greeting"
|
||||
);
|
||||
bail!("unexpected message {msg:?} instead of greeting");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -181,7 +191,9 @@ impl ConnState {
|
||||
match msg {
|
||||
ProposerAcceptorMessage::AppendRequest(append_request) => {
|
||||
self.flush_pending = true;
|
||||
self.process_sk_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(append_request))?;
|
||||
self.process_sk_msg(&ProposerAcceptorMessage::NoFlushAppendRequest(
|
||||
append_request,
|
||||
))?;
|
||||
}
|
||||
other => {
|
||||
self.process_sk_msg(&other)?;
|
||||
@@ -211,12 +223,12 @@ impl ConnState {
|
||||
// // TODO:
|
||||
// }
|
||||
|
||||
println!("sending reply: {:?}", reply);
|
||||
// println!("sending reply: {:?}", reply);
|
||||
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
reply.serialize(&mut buf)?;
|
||||
|
||||
println!("sending reply len={}: {}", buf.len(), hex::encode(&buf));
|
||||
// println!("sending reply len={}: {}", buf.len(), hex::encode(&buf));
|
||||
|
||||
self.tcp.send(AnyMessage::Bytes(buf.into()));
|
||||
}
|
||||
@@ -227,7 +239,9 @@ impl ConnState {
|
||||
impl Drop for ConnState {
|
||||
fn drop(&mut self) {
|
||||
println!("dropping conn: {:?}", self.tcp);
|
||||
self.tcp.close();
|
||||
if !std::thread::panicking() {
|
||||
self.tcp.close();
|
||||
}
|
||||
// TODO: clean up non-fsynced WAL
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,21 +1,50 @@
|
||||
use std::{ffi::CString, str::FromStr, sync::Arc};
|
||||
use std::{ffi::CString, path::Path, str::FromStr, sync::Arc};
|
||||
|
||||
use safekeeper::simlib::{
|
||||
network::{Delay, NetworkOptions},
|
||||
proto::AnyMessage,
|
||||
world::World,
|
||||
world::{Node, NodeEvent},
|
||||
world::World, proto::AnyMessage,
|
||||
};
|
||||
use utils::{id::TenantTimelineId, logging, lsn::Lsn};
|
||||
|
||||
use crate::{
|
||||
bindings::{
|
||||
neon_tenant_walproposer, neon_timeline_walproposer, wal_acceptor_connection_timeout,
|
||||
wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, sim_redo_start_lsn, MyInsertRecord,
|
||||
neon_tenant_walproposer, neon_timeline_walproposer, sim_redo_start_lsn, syncSafekeepers,
|
||||
wal_acceptor_connection_timeout, wal_acceptor_reconnect_timeout, wal_acceptors_list,
|
||||
MyInsertRecord, WalProposerCleanup, WalProposerRust,
|
||||
},
|
||||
c_context,
|
||||
simtest::safekeeper::run_server,
|
||||
};
|
||||
|
||||
struct SkNode {
|
||||
node: Arc<Node>,
|
||||
id: u32,
|
||||
}
|
||||
|
||||
impl SkNode {
|
||||
fn new(node: Arc<Node>) -> Self {
|
||||
let res = Self { id: node.id, node };
|
||||
res.launch();
|
||||
res
|
||||
}
|
||||
|
||||
fn launch(&self) {
|
||||
let id = self.id;
|
||||
// start the server thread
|
||||
self.node.launch(move |os| {
|
||||
let res = run_server(os);
|
||||
println!("server {} finished: {:?}", id, res);
|
||||
});
|
||||
}
|
||||
|
||||
fn restart(&self) {
|
||||
self.node.crash_stop();
|
||||
self.launch();
|
||||
}
|
||||
}
|
||||
|
||||
struct TestConfig {
|
||||
network: NetworkOptions,
|
||||
timeout: u64,
|
||||
@@ -42,42 +71,73 @@ impl TestConfig {
|
||||
}
|
||||
|
||||
fn start(&self, seed: u64) -> Test {
|
||||
let world = Arc::new(World::new(seed, Arc::new(self.network.clone()), c_context()));
|
||||
let world = Arc::new(World::new(
|
||||
seed,
|
||||
Arc::new(self.network.clone()),
|
||||
c_context(),
|
||||
));
|
||||
world.register_world();
|
||||
|
||||
let servers = [world.new_node(), world.new_node(), world.new_node()];
|
||||
let servers = [
|
||||
SkNode::new(world.new_node()),
|
||||
SkNode::new(world.new_node()),
|
||||
SkNode::new(world.new_node()),
|
||||
];
|
||||
|
||||
let server_ids = [servers[0].id, servers[1].id, servers[2].id];
|
||||
let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(",");
|
||||
let ttid = TenantTimelineId::generate();
|
||||
|
||||
// start the server threads
|
||||
for ptr in servers.iter() {
|
||||
let server = ptr.clone();
|
||||
let id = server.id;
|
||||
server.launch(move |os| {
|
||||
let res = run_server(os);
|
||||
println!("server {} finished: {:?}", id, res);
|
||||
});
|
||||
}
|
||||
|
||||
// wait init for all servers
|
||||
world.await_all();
|
||||
|
||||
// clean up pgdata directory
|
||||
self.init_pgdata();
|
||||
|
||||
Test {
|
||||
world,
|
||||
servers,
|
||||
server_ids,
|
||||
safekeepers_guc,
|
||||
ttid,
|
||||
timeout: self.timeout,
|
||||
}
|
||||
}
|
||||
|
||||
fn init_pgdata(&self) {
|
||||
let pgdata = Path::new("/home/admin/simulator/libs/walproposer/pgdata");
|
||||
if pgdata.exists() {
|
||||
std::fs::remove_dir_all(pgdata).unwrap();
|
||||
}
|
||||
std::fs::create_dir(pgdata).unwrap();
|
||||
|
||||
// create empty pg_wal and pg_notify subdirs
|
||||
std::fs::create_dir(pgdata.join("pg_wal")).unwrap();
|
||||
std::fs::create_dir(pgdata.join("pg_notify")).unwrap();
|
||||
|
||||
// write postgresql.conf
|
||||
let mut conf = std::fs::File::create(pgdata.join("postgresql.conf")).unwrap();
|
||||
let content = "
|
||||
wal_log_hints=off
|
||||
hot_standby=on
|
||||
fsync=off
|
||||
wal_level=replica
|
||||
restart_after_crash=off
|
||||
shared_preload_libraries=neon
|
||||
neon.pageserver_connstring=''
|
||||
neon.tenant_id=cc6e67313d57283bad411600fbf5c142
|
||||
neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e
|
||||
synchronous_standby_names=walproposer
|
||||
neon.safekeepers='node:1,node:2,node:3'
|
||||
max_connections=100
|
||||
";
|
||||
|
||||
std::io::Write::write_all(&mut conf, content.as_bytes()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
struct Test {
|
||||
world: Arc<World>,
|
||||
servers: [Arc<Node>; 3],
|
||||
server_ids: [u32; 3],
|
||||
servers: [SkNode; 3],
|
||||
safekeepers_guc: String,
|
||||
ttid: TenantTimelineId,
|
||||
timeout: u64,
|
||||
@@ -162,8 +222,11 @@ impl Test {
|
||||
|
||||
self.world.await_all();
|
||||
|
||||
WalProposer {
|
||||
WalProposer {
|
||||
node: client_node,
|
||||
txes: Vec::new(),
|
||||
last_committed_tx: 0,
|
||||
commit_lsn: Lsn(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,17 +238,54 @@ impl Test {
|
||||
|
||||
struct WalProposer {
|
||||
node: Arc<Node>,
|
||||
txes: Vec<Lsn>,
|
||||
last_committed_tx: usize,
|
||||
commit_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl WalProposer {
|
||||
fn gen_wal_record(&self) -> Lsn {
|
||||
fn write_tx(&mut self) -> usize {
|
||||
let new_ptr = unsafe { MyInsertRecord() };
|
||||
|
||||
self.node.network_chan().send(NodeEvent::Internal(
|
||||
AnyMessage::LSN(new_ptr as u64),
|
||||
));
|
||||
self.node
|
||||
.network_chan()
|
||||
.send(NodeEvent::Internal(AnyMessage::LSN(new_ptr as u64)));
|
||||
|
||||
return Lsn(new_ptr as u64);
|
||||
let tx_id = self.txes.len();
|
||||
self.txes.push(Lsn(new_ptr as u64));
|
||||
|
||||
tx_id
|
||||
}
|
||||
|
||||
/// Updates committed status.
|
||||
fn update(&mut self) {
|
||||
let last_result = self.node.result.lock().clone();
|
||||
if last_result.0 != 1 {
|
||||
// not an LSN update
|
||||
return;
|
||||
}
|
||||
|
||||
let lsn_str = last_result.1;
|
||||
let lsn = Lsn::from_str(&lsn_str);
|
||||
match lsn {
|
||||
Ok(lsn) => {
|
||||
self.commit_lsn = lsn;
|
||||
println!("commit_lsn: {}", lsn);
|
||||
|
||||
while self.last_committed_tx < self.txes.len()
|
||||
&& self.txes[self.last_committed_tx] <= lsn
|
||||
{
|
||||
println!(
|
||||
"Tx #{} was commited at {}, last_commit_lsn={}",
|
||||
self.last_committed_tx, self.txes[self.last_committed_tx], self.commit_lsn
|
||||
);
|
||||
self.last_committed_tx += 1;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("failed to parse LSN: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,7 +302,7 @@ fn sync_empty_safekeepers() {
|
||||
|
||||
let lsn = test.sync_safekeepers().unwrap();
|
||||
assert_eq!(lsn, Lsn(0));
|
||||
println!("Sucessfully synced empty safekeepers at 0/0");
|
||||
println!("Sucessfully synced (again) empty safekeepers at 0/0");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -217,13 +317,43 @@ fn run_walproposer_generate_wal() {
|
||||
assert_eq!(lsn, Lsn(0));
|
||||
println!("Sucessfully synced empty safekeepers at 0/0");
|
||||
|
||||
let wp = test.launch_walproposer(lsn);
|
||||
// let rec1 = wp.gen_wal_record();
|
||||
let mut wp = test.launch_walproposer(lsn);
|
||||
|
||||
test.poll_for_duration(30);
|
||||
|
||||
for i in 0..100 {
|
||||
wp.gen_wal_record();
|
||||
wp.write_tx();
|
||||
test.poll_for_duration(5);
|
||||
wp.update();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn crash_safekeeper() {
|
||||
logging::init(logging::LogFormat::Plain).unwrap();
|
||||
|
||||
let mut config = TestConfig::new();
|
||||
// config.network.timeout = Some(250);
|
||||
let test = config.start(1337);
|
||||
|
||||
let lsn = test.sync_safekeepers().unwrap();
|
||||
assert_eq!(lsn, Lsn(0));
|
||||
println!("Sucessfully synced empty safekeepers at 0/0");
|
||||
|
||||
let mut wp = test.launch_walproposer(lsn);
|
||||
|
||||
test.poll_for_duration(30);
|
||||
wp.update();
|
||||
|
||||
wp.write_tx();
|
||||
wp.write_tx();
|
||||
wp.write_tx();
|
||||
|
||||
test.servers[0].restart();
|
||||
|
||||
test.poll_for_duration(100);
|
||||
wp.update();
|
||||
|
||||
test.poll_for_duration(1000);
|
||||
wp.update();
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ void MyContextInit() {
|
||||
if (!SelectConfigFiles(NULL, progname))
|
||||
exit(1);
|
||||
|
||||
log_min_messages = DEBUG5;
|
||||
log_min_messages = LOG;
|
||||
|
||||
InitializeMaxBackends();
|
||||
ChangeToDataDir();
|
||||
|
||||
@@ -2332,6 +2332,16 @@ HandleSafekeeperResponse(void)
|
||||
quorumFeedback.rf.ps_flushlsn,
|
||||
GetCurrentTimestamp(), false);
|
||||
#endif
|
||||
|
||||
#ifdef SIMLIB
|
||||
if (!syncSafekeepers)
|
||||
{
|
||||
char lsn_str[8 + 1 + 8 + 1];
|
||||
|
||||
snprintf(lsn_str, sizeof(lsn_str), "%X/%X", LSN_FORMAT_ARGS(quorumFeedback.flushLsn));
|
||||
sim_set_result(1, lsn_str);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
CombineHotStanbyFeedbacks(&hsFeedback);
|
||||
@@ -2466,10 +2476,10 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg)
|
||||
if (!(AsyncRead(sk, &buf, &buf_size)))
|
||||
return false;
|
||||
|
||||
for (int i = 0; i < buf_size; i++) {
|
||||
fprintf(stderr, "%02x", buf[i]);
|
||||
}
|
||||
fprintf(stderr, "\n");
|
||||
// for (int i = 0; i < buf_size; i++) {
|
||||
// fprintf(stderr, "%02x", buf[i]);
|
||||
// }
|
||||
// fprintf(stderr, "\n");
|
||||
|
||||
/* parse it */
|
||||
s.data = buf;
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::{
|
||||
use rand::{rngs::StdRng, Rng};
|
||||
|
||||
use super::{
|
||||
chan::Chan,
|
||||
proto::AnyMessage,
|
||||
sync::Mutex,
|
||||
time::NetworkEvent,
|
||||
@@ -70,6 +71,7 @@ pub struct VirtualConnection {
|
||||
pub connection_id: u64,
|
||||
pub world: Arc<World>,
|
||||
pub nodes: [Arc<Node>; 2],
|
||||
dst_sockets: [Chan<NodeEvent>; 2],
|
||||
state: Mutex<ConnectionState>,
|
||||
options: Arc<NetworkOptions>,
|
||||
}
|
||||
@@ -93,6 +95,7 @@ impl VirtualConnection {
|
||||
let conn = Arc::new(Self {
|
||||
connection_id: id,
|
||||
world,
|
||||
dst_sockets: [src.network_chan(), dst.network_chan()],
|
||||
nodes: [src, dst],
|
||||
state: Mutex::new(ConnectionState {
|
||||
buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))],
|
||||
@@ -105,7 +108,7 @@ impl VirtualConnection {
|
||||
conn.send_connect();
|
||||
|
||||
// TODO: add connection to the dst node
|
||||
// conn.nodes[1].network_chan().send(NodeEvent::Connection(conn.clone()));
|
||||
// conn.dst_sockets[1].send(NodeEvent::Connection(conn.clone()));
|
||||
|
||||
conn
|
||||
}
|
||||
@@ -128,8 +131,7 @@ impl VirtualConnection {
|
||||
state.deref_mut(),
|
||||
now,
|
||||
direction as MessageDirection,
|
||||
&self.nodes[direction],
|
||||
&self.nodes[direction ^ 1],
|
||||
&self.dst_sockets[direction ^ 1],
|
||||
);
|
||||
}
|
||||
|
||||
@@ -171,8 +173,7 @@ impl VirtualConnection {
|
||||
state: &mut ConnectionState,
|
||||
now: u64,
|
||||
direction: MessageDirection,
|
||||
from_node: &Arc<Node>,
|
||||
to_node: &Arc<Node>,
|
||||
to_socket: &Chan<NodeEvent>,
|
||||
) {
|
||||
let buffer = &mut state.buffers[direction as usize];
|
||||
if buffer.recv_closed {
|
||||
@@ -183,19 +184,17 @@ impl VirtualConnection {
|
||||
let msg = buffer.buf.pop_front().unwrap().1;
|
||||
let callback = TCP::new(self.clone(), direction ^ 1);
|
||||
|
||||
println!(
|
||||
"NET(time={}): {:?} delivered, {}=>{}",
|
||||
now, msg, from_node.id, to_node.id
|
||||
);
|
||||
// println!(
|
||||
// "NET(time={}): {:?} delivered, {}=>{}",
|
||||
// now, msg, from_node.id, to_node.id
|
||||
// );
|
||||
buffer.last_recv = Some(now);
|
||||
self.schedule_timeout();
|
||||
|
||||
if let AnyMessage::InternalConnect = msg {
|
||||
to_node.network_chan().send(NodeEvent::Accept(callback));
|
||||
to_socket.send(NodeEvent::Accept(callback));
|
||||
} else {
|
||||
to_node
|
||||
.network_chan()
|
||||
.send(NodeEvent::Message((msg, callback)));
|
||||
to_socket.send(NodeEvent::Message((msg, callback)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -299,8 +298,7 @@ impl VirtualConnection {
|
||||
send_buffer.send_closed = true;
|
||||
// TODO: notify the other side?
|
||||
|
||||
node.network_chan()
|
||||
.send(NodeEvent::Closed(TCP::new(self.clone(), node_idx as u8)));
|
||||
self.dst_sockets[node_idx].send(NodeEvent::Closed(TCP::new(self.clone(), node_idx as u8)));
|
||||
}
|
||||
|
||||
/// Get an event suitable for scheduling.
|
||||
|
||||
@@ -87,6 +87,8 @@ struct ParkState {
|
||||
/// world simulation to wake it up. True means that the parking is
|
||||
/// finished and the thread can continue.
|
||||
finished: bool,
|
||||
/// True means that the thread should wake up and panic.
|
||||
panic: bool,
|
||||
node_id: Option<NodeId>,
|
||||
backtrace: Option<Backtrace>,
|
||||
}
|
||||
@@ -97,6 +99,7 @@ impl Park {
|
||||
lock: Mutex::new(ParkState {
|
||||
can_continue,
|
||||
finished: false,
|
||||
panic: false,
|
||||
node_id: None,
|
||||
backtrace: None,
|
||||
}),
|
||||
@@ -131,7 +134,7 @@ impl Park {
|
||||
|
||||
parking_lot::MutexGuard::unlocked(&mut state, || {
|
||||
// conditional parking, decrease the running threads counter without parking
|
||||
node.internal_parking_start();
|
||||
node.internal_parking_start(self.clone());
|
||||
});
|
||||
|
||||
// wait for condition
|
||||
@@ -139,6 +142,10 @@ impl Park {
|
||||
self.cvar.wait(&mut state);
|
||||
}
|
||||
|
||||
if state.panic {
|
||||
panic!("thread was crashed by the simulation");
|
||||
}
|
||||
|
||||
// println!("CONDITION MET: node {:?}", node.id);
|
||||
// condition is met, we are now running instead of the waker thread.
|
||||
// the next thing is to park the thread in the world, then decrease
|
||||
@@ -155,6 +162,10 @@ impl Park {
|
||||
self.cvar.wait(state);
|
||||
}
|
||||
|
||||
if state.panic {
|
||||
panic!("thread was crashed by the simulation");
|
||||
}
|
||||
|
||||
// println!("PARKING ENDED: node {:?}", node.id);
|
||||
|
||||
// We are the only running thread now, we just need to update the state,
|
||||
@@ -187,11 +198,11 @@ impl Park {
|
||||
"WARN wake() called on a thread that is already waked, node {:?}",
|
||||
state.node_id
|
||||
);
|
||||
return;
|
||||
} else {
|
||||
state.can_continue = true;
|
||||
// and here we park the waiting thread
|
||||
self.cvar.notify_all();
|
||||
}
|
||||
state.can_continue = true;
|
||||
// and here we park the waiting thread
|
||||
self.cvar.notify_all();
|
||||
drop(state);
|
||||
|
||||
// and here we block the thread that called wake() by defer
|
||||
@@ -206,16 +217,16 @@ impl Park {
|
||||
/// 2. Wake up the waiting thread (it will park itself in the world)
|
||||
pub fn external_wake(&self) {
|
||||
let world = World::current();
|
||||
world.internal_parking_wake();
|
||||
|
||||
let mut state = self.lock.lock();
|
||||
if state.can_continue {
|
||||
println!(
|
||||
"WARN wake() called on a thread that is already waked, node {:?}",
|
||||
"WARN external_wake() called on a thread that is already waked, node {:?}",
|
||||
state.node_id
|
||||
);
|
||||
return;
|
||||
}
|
||||
world.internal_parking_wake();
|
||||
state.can_continue = true;
|
||||
// and here we park the waiting thread
|
||||
self.cvar.notify_all();
|
||||
@@ -236,9 +247,19 @@ impl Park {
|
||||
self.cvar.notify_all();
|
||||
}
|
||||
|
||||
/// Will wake up thread to panic instantly.
|
||||
pub fn crash_panic(&self) {
|
||||
let mut state = self.lock.lock();
|
||||
state.can_continue = true;
|
||||
state.finished = true;
|
||||
state.panic = true;
|
||||
self.cvar.notify_all();
|
||||
drop(state);
|
||||
}
|
||||
|
||||
/// Print debug info about the parked thread.
|
||||
pub fn debug_print(&self) {
|
||||
let _state = self.lock.lock();
|
||||
// let state = self.lock.lock();
|
||||
// println!("PARK: node {:?} wake1={} wake2={}", state.node_id, state.can_continue, state.finished);
|
||||
// println!("DEBUG: node {:?} wake1={} wake2={}, trace={:?}", state.node_id, state.can_continue, state.finished, state.backtrace);
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ impl Timing {
|
||||
|
||||
if !self.is_event_ready() {
|
||||
let next_time = self.timers.peek().unwrap().time;
|
||||
println!("CLK(time={})", next_time);
|
||||
// println!("CLK(time={})", next_time);
|
||||
self.current_time = next_time;
|
||||
assert!(self.is_event_ready());
|
||||
}
|
||||
|
||||
@@ -221,6 +221,20 @@ impl World {
|
||||
// waking node with condition, increase the running threads counter
|
||||
self.wait_group.add(1);
|
||||
}
|
||||
|
||||
fn find_parked_node(&self, node: &Node) -> Option<Arc<Park>> {
|
||||
let mut parking = self.unconditional_parking.lock();
|
||||
let mut found: Option<usize> = None;
|
||||
for (i, park) in parking.iter().enumerate() {
|
||||
if park.node_id() == Some(node.id) {
|
||||
if found.is_some() {
|
||||
panic!("found more than one parked thread for node {}", node.id);
|
||||
}
|
||||
found = Some(i);
|
||||
}
|
||||
}
|
||||
Some(parking.swap_remove(found?))
|
||||
}
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
@@ -231,8 +245,9 @@ thread_local! {
|
||||
/// Internal node state.
|
||||
pub struct Node {
|
||||
pub id: NodeId,
|
||||
network: Chan<NodeEvent>,
|
||||
network: Mutex<Chan<NodeEvent>>,
|
||||
status: Mutex<NodeStatus>,
|
||||
waiting_park: Mutex<Arc<Park>>,
|
||||
world: Arc<World>,
|
||||
join_handle: Mutex<Option<std::thread::JoinHandle<()>>>,
|
||||
pub rng: Mutex<StdRng>,
|
||||
@@ -254,8 +269,9 @@ impl Node {
|
||||
pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
|
||||
Node {
|
||||
id,
|
||||
network: Chan::new(),
|
||||
network: Mutex::new(Chan::new()),
|
||||
status: Mutex::new(NodeStatus::NotStarted),
|
||||
waiting_park: Mutex::new(Park::new(false)),
|
||||
world,
|
||||
join_handle: Mutex::new(None),
|
||||
rng: Mutex::new(rng),
|
||||
@@ -279,7 +295,7 @@ impl Node {
|
||||
}
|
||||
|
||||
let mut status = node.status.lock();
|
||||
if *status != NodeStatus::NotStarted {
|
||||
if *status != NodeStatus::NotStarted && *status != NodeStatus::Finished {
|
||||
// clearly a caller bug, should never happen
|
||||
panic!("node {} is already running", node.id);
|
||||
}
|
||||
@@ -307,7 +323,6 @@ impl Node {
|
||||
|
||||
let mut status = node.status.lock();
|
||||
*status = NodeStatus::Finished;
|
||||
// TODO: log the thread is finished
|
||||
});
|
||||
*self.join_handle.lock() = Some(join_handle);
|
||||
|
||||
@@ -318,16 +333,17 @@ impl Node {
|
||||
|
||||
/// Returns a channel to receive events from the network.
|
||||
pub fn network_chan(&self) -> Chan<NodeEvent> {
|
||||
self.network.clone()
|
||||
self.network.lock().clone()
|
||||
}
|
||||
|
||||
pub fn internal_parking_start(&self) {
|
||||
pub fn internal_parking_start(&self, park: Arc<Park>) {
|
||||
// Node started parking (waiting for condition), and the current thread
|
||||
// is the only one running, so we need to do:
|
||||
// 1. Change the node status to Waiting
|
||||
// 2. Decrease the running threads counter
|
||||
// 3. Block the current thread until it's woken up (outside this function)
|
||||
*self.status.lock() = NodeStatus::Waiting;
|
||||
*self.waiting_park.lock() = park;
|
||||
self.world.wait_group.done();
|
||||
}
|
||||
|
||||
@@ -372,6 +388,40 @@ impl Node {
|
||||
let status = self.status.lock();
|
||||
*status == NodeStatus::Finished
|
||||
}
|
||||
|
||||
pub fn crash_stop(self: &Arc<Self>) {
|
||||
let status = self.status.lock().clone();
|
||||
match status {
|
||||
NodeStatus::NotStarted | NodeStatus::Finished | NodeStatus::Failed => return,
|
||||
NodeStatus::Running => {
|
||||
panic!("crash unexpected node state: Running")
|
||||
}
|
||||
NodeStatus::Waiting | NodeStatus::Parked => {}
|
||||
}
|
||||
|
||||
println!("Node {} is crashing, status={:?}", self.id, status);
|
||||
self.world.debug_print_state();
|
||||
|
||||
let park = self.world.find_parked_node(self);
|
||||
|
||||
let park = if park.is_some() {
|
||||
assert!(status == NodeStatus::Parked);
|
||||
park.unwrap()
|
||||
} else {
|
||||
assert!(status == NodeStatus::Waiting);
|
||||
self.waiting_park.lock().clone()
|
||||
};
|
||||
|
||||
park.debug_print();
|
||||
|
||||
// unplug old network socket, and create a new one
|
||||
*self.network.lock() = Chan::new();
|
||||
|
||||
self.world.wait_group.add(1);
|
||||
park.crash_panic();
|
||||
// self.world.debug_print_state();
|
||||
self.world.wait_group.wait();
|
||||
}
|
||||
}
|
||||
|
||||
/// Network events and timers.
|
||||
|
||||
Reference in New Issue
Block a user