From d87e822169912698a12e58faec4caf4b629ad547 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Mon, 24 Jul 2023 21:15:35 +0000 Subject: [PATCH] Return LSN from sync safekeepers --- Cargo.lock | 1 + libs/walproposer/Cargo.toml | 1 + libs/walproposer/libpqwalproposer.c | 24 ++-- libs/walproposer/rust_bindings.h | 2 + libs/walproposer/src/sim.rs | 26 +++- libs/walproposer/src/simtest/wp_sk.rs | 186 ++++++++++++++++++-------- pgxn/neon/walproposer.c | 135 ++++++++++--------- pgxn/neon/walproposer.h | 21 +++ pgxn/neon/walproposer_utils.c | 12 +- safekeeper/src/simlib/node_os.rs | 21 +-- safekeeper/src/simlib/world.rs | 28 +++- 11 files changed, 303 insertions(+), 154 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67a5533220..6124c7fb1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4650,6 +4650,7 @@ dependencies = [ "env_logger", "hex", "hyper", + "libc", "log", "memoffset 0.8.0", "once_cell", diff --git a/libs/walproposer/Cargo.toml b/libs/walproposer/Cargo.toml index 688932e85d..cb7b84bc11 100644 --- a/libs/walproposer/Cargo.toml +++ b/libs/walproposer/Cargo.toml @@ -14,6 +14,7 @@ crc32c.workspace = true hex.workspace = true once_cell.workspace = true log.workspace = true +libc.workspace = true memoffset.workspace = true thiserror.workspace = true serde.workspace = true diff --git a/libs/walproposer/libpqwalproposer.c b/libs/walproposer/libpqwalproposer.c index 0c61c53573..c45fb38336 100644 --- a/libs/walproposer/libpqwalproposer.c +++ b/libs/walproposer/libpqwalproposer.c @@ -13,7 +13,7 @@ struct WalProposerConn static bool ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) { - elog(INFO, "not implemented"); + walprop_log(INFO, "not implemented"); return false; } @@ -21,14 +21,14 @@ ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) char * walprop_error_message(WalProposerConn *conn) { - elog(INFO, "not implemented"); + walprop_log(INFO, "not implemented"); return NULL; } WalProposerConnStatusType walprop_status(WalProposerConn *conn) { - elog(INFO, "not implemented: walprop_status"); + walprop_log(INFO, "not implemented: walprop_status"); return WP_CONNECTION_OK; } @@ -37,7 +37,7 @@ walprop_connect_start(char *conninfo) { WalProposerConn *conn; - elog(INFO, "walprop_connect_start: %s", conninfo); + walprop_log(INFO, "walprop_connect_start: %s", conninfo); const char *connstr_prefix = "host=node port="; Assert(strncmp(conninfo, connstr_prefix, strlen(connstr_prefix)) == 0); @@ -52,21 +52,21 @@ walprop_connect_start(char *conninfo) WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn) { - elog(INFO, "not implemented: walprop_connect_poll"); + walprop_log(INFO, "not implemented: walprop_connect_poll"); return WP_CONN_POLLING_OK; } bool walprop_send_query(WalProposerConn *conn, char *query) { - elog(INFO, "not implemented: walprop_send_query"); + walprop_log(INFO, "not implemented: walprop_send_query"); return true; } WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn) { - elog(INFO, "not implemented: walprop_get_query_result"); + walprop_log(INFO, "not implemented: walprop_get_query_result"); return WP_EXEC_SUCCESS_COPYBOTH; } @@ -79,14 +79,14 @@ walprop_socket(WalProposerConn *conn) int walprop_flush(WalProposerConn *conn) { - elog(INFO, "not implemented"); + walprop_log(INFO, "not implemented"); return 0; } void walprop_finish(WalProposerConn *conn) { - elog(INFO, "not implemented"); + walprop_log(INFO, "not implemented"); } /* @@ -104,7 +104,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) msg = sim_msg_get_bytes(&len); *buf = msg; *amount = len; - elog(INFO, "walprop_async_read: %d", len); + walprop_log(INFO, "walprop_async_read: %d", len); return PG_ASYNC_READ_SUCCESS; } @@ -112,7 +112,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) { - elog(INFO, "not implemented"); + walprop_log(INFO, "not implemented"); return PG_ASYNC_WRITE_FAIL; } @@ -123,7 +123,7 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) { - elog(INFO, "not implemented: walprop_blocking_write"); + walprop_log(INFO, "not implemented: walprop_blocking_write"); sim_msg_set_bytes(buf, size); sim_tcp_send(conn->tcp); return true; diff --git a/libs/walproposer/rust_bindings.h b/libs/walproposer/rust_bindings.h index d1014ffbae..e65a7bbaed 100644 --- a/libs/walproposer/rust_bindings.h +++ b/libs/walproposer/rust_bindings.h @@ -57,6 +57,8 @@ struct Event sim_epoll_rcv(int64_t timeout); int64_t sim_now(void); +void sim_exit(int32_t code, const uint8_t *msg); + /** * Get tag of the current message. */ diff --git a/libs/walproposer/src/sim.rs b/libs/walproposer/src/sim.rs index 5ef28b3e7d..1b58238189 100644 --- a/libs/walproposer/src/sim.rs +++ b/libs/walproposer/src/sim.rs @@ -1,8 +1,11 @@ -use std::{cell::RefCell, collections::HashMap}; - use safekeeper::simlib::{network::TCP, node_os::NodeOs, world::NodeEvent}; +use std::{ + cell::RefCell, + collections::HashMap, + ffi::{CStr, CString}, +}; -use crate::sim_proto::{AnyMessageTag, Event, EventTag, MESSAGE_BUF, anymessage_tag}; +use crate::sim_proto::{anymessage_tag, AnyMessageTag, Event, EventTag, MESSAGE_BUF}; thread_local! { static CURRENT_NODE_OS: RefCell> = RefCell::new(None); @@ -115,3 +118,20 @@ pub extern "C" fn sim_epoll_rcv(timeout: i64) -> Event { pub extern "C" fn sim_now() -> i64 { os().now() as i64 } + +#[no_mangle] +pub extern "C" fn sim_exit(code: i32, msg: *const u8) { + let msg = unsafe { CStr::from_ptr(msg as *const i8) }; + let msg = msg.to_string_lossy().into_owned(); + println!("sim_exit({}, {:?})", code, msg); + os().set_result(code, msg); + + // I tried to make use of pthread_exit, but it doesn't work. + // https://github.com/rust-lang/unsafe-code-guidelines/issues/211 + // unsafe { libc::pthread_exit(std::ptr::null_mut()) }; + + // https://doc.rust-lang.org/nomicon/unwinding.html + // Everyone on the internet saying this is UB, but it works for me, + // so I'm going to use it for now. + panic!("sim_exit() called from C code") +} diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index 5506493997..3d67266bb8 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -1,69 +1,137 @@ -use std::{sync::Arc, ffi::CString}; +use std::{ffi::CString, str::FromStr, sync::Arc}; -use safekeeper::simlib::{network::{Delay, NetworkOptions}, world::World}; -use utils::{id::TenantTimelineId, logging}; +use safekeeper::simlib::{ + network::{Delay, NetworkOptions}, + world::Node, + world::World, +}; +use utils::{id::TenantTimelineId, logging, lsn::Lsn}; -use crate::{simtest::safekeeper::run_server, c_context, bindings::{WalProposerRust, wal_acceptors_list, wal_acceptor_reconnect_timeout, wal_acceptor_connection_timeout, neon_tenant_walproposer, neon_timeline_walproposer}}; +use crate::{ + bindings::{ + neon_tenant_walproposer, neon_timeline_walproposer, wal_acceptor_connection_timeout, + wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, + }, + c_context, + simtest::safekeeper::run_server, +}; -#[test] -fn run_walproposer_safekeeper_test() { - logging::init(logging::LogFormat::Plain).unwrap(); +struct TestConfig { + network: Arc, + timeout: u64, +} - let delay = Delay { - min: 1, - max: 5, - fail_prob: 0.0, - }; - - let network = NetworkOptions { - timeout: Some(1000), - connect_delay: delay.clone(), - send_delay: delay.clone(), - }; - let seed = 1337; - - let network = Arc::new(network); - let world = Arc::new(World::new(seed, network, c_context())); - world.register_world(); - - let client_node = world.new_node(); - - let servers = [world.new_node(), world.new_node(), world.new_node()]; - let server_ids = [servers[0].id, servers[1].id, servers[2].id]; - let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(","); - - println!("server ids: {:?}", safekeepers_guc); - let ttid = TenantTimelineId::generate(); - - // start the client thread - client_node.launch(move |_| { - let list = CString::new(safekeepers_guc).unwrap(); - - unsafe { - wal_acceptors_list = list.into_raw(); - wal_acceptor_reconnect_timeout = 1000; - wal_acceptor_connection_timeout = 5000; - neon_tenant_walproposer = CString::new(ttid.tenant_id.to_string()).unwrap().into_raw(); - neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()).unwrap().into_raw(); - WalProposerRust(); +impl TestConfig { + fn new() -> Self { + Self { + network: Arc::new(NetworkOptions { + timeout: Some(1000), + connect_delay: Delay { + min: 1, + max: 5, + fail_prob: 0.0, + }, + send_delay: Delay { + min: 1, + max: 5, + fail_prob: 0.0, + }, + }), + timeout: 1_000 * 10, } - // TODO: run sync-safekeepers - }); - - // start server threads - for ptr in servers.iter() { - let server = ptr.clone(); - let id = server.id; - server.launch(move |os| { - let res = run_server(os); - println!("server {} finished: {:?}", id, res); - }); } - world.await_all(); - let time_limit = 1_000_0; + fn start(&self, seed: u64) -> Test { + let world = Arc::new(World::new(seed, self.network.clone(), c_context())); + world.register_world(); - while world.step() && world.now() < time_limit {} + let servers = [world.new_node(), world.new_node(), world.new_node()]; + let server_ids = [servers[0].id, servers[1].id, servers[2].id]; + let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(","); + let ttid = TenantTimelineId::generate(); - // TODO: verify sync_safekeepers LSN + // start the server threads + for ptr in servers.iter() { + let server = ptr.clone(); + let id = server.id; + server.launch(move |os| { + let res = run_server(os); + println!("server {} finished: {:?}", id, res); + }); + } + + // wait init for all servers + world.await_all(); + + Test { + world, + servers, + server_ids, + safekeepers_guc, + ttid, + timeout: self.timeout, + } + } +} + +struct Test { + world: Arc, + servers: [Arc; 3], + server_ids: [u32; 3], + safekeepers_guc: String, + ttid: TenantTimelineId, + timeout: u64, +} + +impl Test { + fn sync_safekeepers(&self) -> anyhow::Result { + let client_node = self.world.new_node(); + + // start the client thread + let guc = self.safekeepers_guc.clone(); + let ttid = self.ttid.clone(); + client_node.launch(move |_| { + let list = CString::new(guc).unwrap(); + + unsafe { + wal_acceptors_list = list.into_raw(); + wal_acceptor_reconnect_timeout = 1000; + wal_acceptor_connection_timeout = 5000; + neon_tenant_walproposer = + CString::new(ttid.tenant_id.to_string()).unwrap().into_raw(); + neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()) + .unwrap() + .into_raw(); + WalProposerRust(); + } + }); + + self.world.await_all(); + + // poll until exit or timeout + let time_limit = self.timeout; + while self.world.step() && self.world.now() < time_limit && !client_node.is_finished() {} + + if !client_node.is_finished() { + anyhow::bail!("timeout or idle stuck"); + } + + let res = client_node.result.lock().clone(); + if res.0 != 0 { + anyhow::bail!("non-zero exitcode: {:?}", res); + } + let lsn = Lsn::from_str(&res.1)?; + Ok(lsn) + } +} + +#[test] +fn sync_empty_safekeepers() { + logging::init(logging::LogFormat::Plain).unwrap(); + + let config = TestConfig::new(); + let test = config.start(1337); + + let lsn = test.sync_safekeepers().unwrap(); + assert_eq!(lsn, Lsn(0)); } diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 5842c1716a..98988c8239 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -323,7 +323,7 @@ nwp_shmem_startup_hook(void) void WalProposerRust() { - elog(LOG, "WalProposerRust"); + walprop_log(LOG, "WalProposerRust"); WalProposerSync(0, NULL); } @@ -395,7 +395,7 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events) Event event = sim_epoll_rcv(timeout); if (event.tag == Closed) { // TODO: shutdown connection? - // elog(LOG, "connection closed"); + // walprop_log(LOG, "connection closed"); // ShutdownConnection(sk); return 0; } else if (event.tag == Message) { @@ -409,7 +409,7 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events) return 1; } } - elog(FATAL, "unknown tcp connection"); + walprop_log(FATAL, "unknown tcp connection"); } else if (event.tag == Timeout) { return 0; } else { @@ -497,7 +497,7 @@ WalProposerPoll(void) if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now, wal_acceptor_connection_timeout)) { - elog(WARNING, "failed to connect to node '%s:%s' in '%s' state: exceeded connection timeout %dms", + walprop_log(WARNING, "failed to connect to node '%s:%s' in '%s' state: exceeded connection timeout %dms", sk->host, sk->port, FormatSafekeeperState(sk->state), wal_acceptor_connection_timeout); ShutdownConnection(sk); } @@ -541,7 +541,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) #ifndef SIMLIB load_file("libpqwalreceiver", false); if (WalReceiverFunctions == NULL) - elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + walprop_log(ERROR, "libpqwalreceiver didn't initialize correctly"); #endif for (host = wal_acceptors_list; host != NULL && *host != '\0'; host = sep) @@ -549,7 +549,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) port = strchr(host, ':'); if (port == NULL) { - elog(FATAL, "port is not specified"); + walprop_log(FATAL, "port is not specified"); } *port++ = '\0'; sep = strchr(port, ','); @@ -557,7 +557,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) *sep++ = '\0'; if (n_safekeepers + 1 >= MAX_SAFEKEEPERS) { - elog(FATAL, "Too many safekeepers"); + walprop_log(FATAL, "Too many safekeepers"); } safekeeper[n_safekeepers].host = host; safekeeper[n_safekeepers].port = port; @@ -580,13 +580,13 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) } if (written > MAXCONNINFO || written < 0) - elog(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); + walprop_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); } initStringInfo(&safekeeper[n_safekeepers].outbuf); safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL); if (safekeeper[n_safekeepers].xlogreader == NULL) - elog(FATAL, "Failed to allocate xlog reader"); + walprop_log(FATAL, "Failed to allocate xlog reader"); safekeeper[n_safekeepers].flushWrite = false; safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr; safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr; @@ -594,7 +594,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) } if (n_safekeepers < 1) { - elog(FATAL, "Safekeepers addresses are not specified"); + walprop_log(FATAL, "Safekeepers addresses are not specified"); } quorum = n_safekeepers / 2 + 1; @@ -605,15 +605,15 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) pg_strong_random(&greetRequest.proposerId, sizeof(greetRequest.proposerId)); greetRequest.systemId = systemId; if (!neon_timeline_walproposer) - elog(FATAL, "neon.timeline_id is not provided"); + walprop_log(FATAL, "neon.timeline_id is not provided"); if (*neon_timeline_walproposer != '\0' && !HexDecodeString(greetRequest.timeline_id, neon_timeline_walproposer, 16)) - elog(FATAL, "Could not parse neon.timeline_id, %s", neon_timeline_walproposer); + walprop_log(FATAL, "Could not parse neon.timeline_id, %s", neon_timeline_walproposer); if (!neon_tenant_walproposer) - elog(FATAL, "neon.tenant_id is not provided"); + walprop_log(FATAL, "neon.tenant_id is not provided"); if (*neon_tenant_walproposer != '\0' && !HexDecodeString(greetRequest.tenant_id, neon_tenant_walproposer, 16)) - elog(FATAL, "Could not parse neon.tenant_id, %s", neon_tenant_walproposer); + walprop_log(FATAL, "Could not parse neon.tenant_id, %s", neon_tenant_walproposer); #if PG_VERSION_NUM >= 150000 /* FIXME don't use hardcoded timeline id */ @@ -653,7 +653,7 @@ static void InitEventSet(void) { if (waitEvents) - elog(FATAL, "double-initialization of event set"); + walprop_log(FATAL, "double-initialization of event set"); waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + n_safekeepers); AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET, @@ -723,19 +723,19 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove) static void InitEventSet(void) { - elog(DEBUG5, "InitEventSet"); + walprop_log(DEBUG5, "InitEventSet"); } static void UpdateEventSet(Safekeeper *sk, uint32 events) { - elog(DEBUG5, "UpdateEventSet"); + walprop_log(DEBUG5, "UpdateEventSet"); } static void HackyRemoveWalProposerEvent(Safekeeper *to_remove) { - elog(DEBUG5, "HackyRemoveWalProposerEvent"); + walprop_log(DEBUG5, "HackyRemoveWalProposerEvent"); } #endif @@ -783,7 +783,7 @@ ResetConnection(Safekeeper *sk) * PGconn structure" */ if (!sk->conn) - elog(FATAL, "failed to allocate new PGconn object"); + walprop_log(FATAL, "failed to allocate new PGconn object"); /* * PQconnectStart won't actually start connecting until we run @@ -801,7 +801,7 @@ ResetConnection(Safekeeper *sk) * * https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS */ - elog(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s", + walprop_log(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s", sk->host, sk->port, walprop_error_message(sk->conn)); /* @@ -826,7 +826,7 @@ ResetConnection(Safekeeper *sk) * (see libpqrcv_connect, defined in * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c) */ - elog(LOG, "connecting with node %s:%s", sk->host, sk->port); + walprop_log(LOG, "connecting with node %s:%s", sk->host, sk->port); sk->state = SS_CONNECTING_WRITE; sk->latestMsgReceivedAt = GetCurrentTimestamp(); @@ -900,7 +900,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) * ResetConnection */ case SS_OFFLINE: - elog(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline", + walprop_log(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline", sk->host, sk->port); break; /* actually unreachable, but prevents * -Wimplicit-fallthrough */ @@ -936,7 +936,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) * requests. */ case SS_VOTING: - elog(WARNING, "EOF from node %s:%s in %s state", sk->host, + walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port, FormatSafekeeperState(sk->state)); ResetConnection(sk); return; @@ -965,7 +965,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) * Idle state for waiting votes from quorum. */ case SS_IDLE: - elog(WARNING, "EOF from node %s:%s in %s state", sk->host, + walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port, FormatSafekeeperState(sk->state)); ResetConnection(sk); return; @@ -990,7 +990,7 @@ HandleConnectionEvent(Safekeeper *sk) switch (result) { case WP_CONN_POLLING_OK: - elog(LOG, "connected with node %s:%s", sk->host, + walprop_log(LOG, "connected with node %s:%s", sk->host, sk->port); sk->latestMsgReceivedAt = GetCurrentTimestamp(); /* @@ -1013,7 +1013,7 @@ HandleConnectionEvent(Safekeeper *sk) break; case WP_CONN_POLLING_FAILED: - elog(WARNING, "failed to connect to node '%s:%s': %s", + walprop_log(WARNING, "failed to connect to node '%s:%s': %s", sk->host, sk->port, walprop_error_message(sk->conn)); /* @@ -1050,7 +1050,7 @@ SendStartWALPush(Safekeeper *sk) { if (!walprop_send_query(sk->conn, "START_WAL_PUSH")) { - elog(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", + walprop_log(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", sk->host, sk->port, walprop_error_message(sk->conn)); ShutdownConnection(sk); return; @@ -1085,7 +1085,7 @@ RecvStartWALPushResult(Safekeeper *sk) break; case WP_EXEC_FAILED: - elog(WARNING, "Failed to send query to safekeeper %s:%s: %s", + walprop_log(WARNING, "Failed to send query to safekeeper %s:%s: %s", sk->host, sk->port, walprop_error_message(sk->conn)); ShutdownConnection(sk); return; @@ -1096,7 +1096,7 @@ RecvStartWALPushResult(Safekeeper *sk) * wrong" */ case WP_EXEC_UNEXPECTED_SUCCESS: - elog(WARNING, "Received bad response from safekeeper %s:%s query execution", + walprop_log(WARNING, "Received bad response from safekeeper %s:%s query execution", sk->host, sk->port); ShutdownConnection(sk); return; @@ -1143,7 +1143,7 @@ RecvAcceptorGreeting(Safekeeper *sk) if (n_connected == quorum) { propTerm++; - elog(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, quorum, propTerm); + walprop_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, quorum, propTerm); voteRequest = (VoteRequest) { @@ -1156,7 +1156,7 @@ RecvAcceptorGreeting(Safekeeper *sk) else if (sk->greetResponse.term > propTerm) { /* Another compute with higher term is running. */ - elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", + walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", sk->host, sk->port, sk->greetResponse.term, propTerm); } @@ -1196,7 +1196,7 @@ static void SendVoteRequest(Safekeeper *sk) { /* We have quorum for voting, send our vote request */ - elog(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, voteRequest.term); + walprop_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, voteRequest.term); /* On failure, logging & resetting is handled */ if (!BlockingWrite(sk, &voteRequest, sizeof(voteRequest), SS_WAIT_VERDICT)) return; @@ -1211,7 +1211,7 @@ RecvVoteResponse(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->voteResponse)) return; - elog(LOG, + walprop_log(LOG, "got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X", sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory), LSN_FORMAT_ARGS(sk->voteResponse.flushLsn), @@ -1226,7 +1226,7 @@ RecvVoteResponse(Safekeeper *sk) if ((!sk->voteResponse.voteGiven) && (sk->voteResponse.term > propTerm || n_votes < quorum)) { - elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", + walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", sk->host, sk->port, sk->voteResponse.term, propTerm); } @@ -1271,18 +1271,24 @@ HandleElectedProposer(void) */ if (truncateLsn < propEpochStartLsn) { - elog(LOG, + walprop_log(LOG, "start recovery because truncateLsn=%X/%X is not " "equal to epochStartLsn=%X/%X", LSN_FORMAT_ARGS(truncateLsn), LSN_FORMAT_ARGS(propEpochStartLsn)); /* Perform recovery */ if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn)) - elog(FATAL, "Failed to recover state"); + walprop_log(FATAL, "Failed to recover state"); } else if (syncSafekeepers) { - // elog(FATAL, "synced %X/%X", LSN_FORMAT_ARGS(propEpochStartLsn)); + #ifdef SIMLIB + char lsn_str[8 + 1 + 8 + 1]; + + snprintf(lsn_str, sizeof(lsn_str), "%X/%X", LSN_FORMAT_ARGS(propEpochStartLsn)); + sim_exit(0, lsn_str); + #endif + /* Sync is not needed: just exit */ fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(propEpochStartLsn)); exit(0); @@ -1385,7 +1391,7 @@ DetermineEpochStartLsn(void) if (timelineStartLsn != InvalidXLogRecPtr && timelineStartLsn != safekeeper[i].voteResponse.timelineStartLsn) { - elog(WARNING, + walprop_log(WARNING, "inconsistent timelineStartLsn: current %X/%X, received %X/%X", LSN_FORMAT_ARGS(timelineStartLsn), LSN_FORMAT_ARGS(safekeeper[i].voteResponse.timelineStartLsn)); @@ -1406,7 +1412,7 @@ DetermineEpochStartLsn(void) { timelineStartLsn = GetRedoStartLsn(); } - elog(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(propEpochStartLsn)); + walprop_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(propEpochStartLsn)); } /* @@ -1433,7 +1439,7 @@ DetermineEpochStartLsn(void) propTermHistory.entries[propTermHistory.n_entries - 1].term = propTerm; propTermHistory.entries[propTermHistory.n_entries - 1].lsn = propEpochStartLsn; - elog(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", + walprop_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", quorum, propTerm, LSN_FORMAT_ARGS(propEpochStartLsn), @@ -1463,7 +1469,7 @@ DetermineEpochStartLsn(void) if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term == walprop_shared->mineLastElectedTerm))) { - elog(PANIC, + walprop_log(PANIC, "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X", LSN_FORMAT_ARGS(propEpochStartLsn), LSN_FORMAT_ARGS(GetRedoStartLsn())); @@ -1492,7 +1498,7 @@ WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRec err))); return false; } - elog(LOG, + walprop_log(LOG, "start recovery from %s:%s starting from %X/%08X till %X/%08X timeline " "%d", safekeeper[donor].host, safekeeper[donor].port, (uint32) (startpos >> 32), @@ -1626,7 +1632,7 @@ SendProposerElected(Safekeeper *sk) */ sk->startStreamingAt = truncateLsn; - elog(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X", + walprop_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X", sk->host, sk->port, LSN_FORMAT_ARGS(propTermHistory.entries[0].lsn), LSN_FORMAT_ARGS(sk->startStreamingAt)); } @@ -1661,7 +1667,7 @@ SendProposerElected(Safekeeper *sk) msg.timelineStartLsn = timelineStartLsn; lastCommonTerm = i >= 0 ? propTermHistory.entries[i].term : 0; - elog(LOG, + walprop_log(LOG, "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X", sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn)); @@ -1691,7 +1697,7 @@ WalProposerStartStreaming(XLogRecPtr startpos) { StartReplicationCmd cmd; - elog(LOG, "WAL proposer starts streaming at %X/%X", + walprop_log(LOG, "WAL proposer starts streaming at %X/%X", LSN_FORMAT_ARGS(startpos)); cmd.slotname = WAL_PROPOSER_SLOT_NAME; cmd.timeline = greetRequest.timeline; @@ -1893,7 +1899,7 @@ SendAppendRequests(Safekeeper *sk) return true; case PG_ASYNC_WRITE_FAIL: - elog(WARNING, "Failed to send to node %s:%s in %s state: %s", + walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), walprop_error_message(sk->conn)); ShutdownConnection(sk); @@ -1942,7 +1948,7 @@ RecvAppendResponses(Safekeeper *sk) if (sk->appendResponse.term > propTerm) { /* Another compute with higher term is running. */ - elog(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "", + walprop_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "", sk->host, sk->port, sk->appendResponse.term, propTerm); } @@ -1988,7 +1994,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->currentClusterSize = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu", + walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu", rf->currentClusterSize); } else if (strcmp(key, "ps_writelsn") == 0) @@ -1996,7 +2002,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->ps_writelsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X", + walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X", LSN_FORMAT_ARGS(rf->ps_writelsn)); } else if (strcmp(key, "ps_flushlsn") == 0) @@ -2004,7 +2010,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->ps_flushlsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X", + walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X", LSN_FORMAT_ARGS(rf->ps_flushlsn)); } else if (strcmp(key, "ps_applylsn") == 0) @@ -2012,7 +2018,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->ps_applylsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X", + walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X", LSN_FORMAT_ARGS(rf->ps_applylsn)); } else if (strcmp(key, "ps_replytime") == 0) @@ -2025,7 +2031,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * /* Copy because timestamptz_to_str returns a static buffer */ replyTimeStr = pstrdup(timestamptz_to_str(rf->ps_replytime)); - elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s", + walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s", rf->ps_replytime, replyTimeStr); pfree(replyTimeStr); @@ -2040,7 +2046,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback * * Skip unknown keys to support backward compatibile protocol * changes */ - elog(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len); + walprop_log(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len); pq_getmsgbytes(reply_message, len); }; } @@ -2191,7 +2197,7 @@ GetLatestNeonFeedback(ReplicationFeedback * rf) rf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_applylsn; rf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.rf.ps_replytime; - elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu," + walprop_log(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu," " ps_writelsn %X/%X, ps_flushlsn %X/%X, ps_applylsn %X/%X, ps_replytime %lu", rf->currentClusterSize, LSN_FORMAT_ARGS(rf->ps_writelsn), @@ -2311,6 +2317,13 @@ HandleSafekeeperResponse(void) } if (n_synced >= quorum) { + #ifdef SIMLIB + char lsn_str[8 + 1 + 8 + 1]; + + snprintf(lsn_str, sizeof(lsn_str), "%X/%X", LSN_FORMAT_ARGS(propEpochStartLsn)); + sim_exit(0, lsn_str); + #endif + /* All safekeepers synced! */ fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(propEpochStartLsn)); exit(0); @@ -2335,7 +2348,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) return false; case PG_ASYNC_READ_FAIL: - elog(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host, + walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), walprop_error_message(sk->conn)); ShutdownConnection(sk); @@ -2373,7 +2386,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) tag = pq_getmsgint64_le(&s); if (tag != anymsg->tag) { - elog(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, + walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, sk->port, FormatSafekeeperState(sk->state)); ResetConnection(sk); return false; @@ -2448,7 +2461,7 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes if (!walprop_blocking_write(sk->conn, msg, msg_size)) { - elog(WARNING, "Failed to send to node %s:%s in %s state: %s", + walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), walprop_error_message(sk->conn)); ShutdownConnection(sk); @@ -2493,7 +2506,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta UpdateEventSet(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); return false; case PG_ASYNC_WRITE_FAIL: - elog(WARNING, "Failed to send to node %s:%s in %s state: %s", + walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), walprop_error_message(sk->conn)); ShutdownConnection(sk); @@ -2530,7 +2543,7 @@ AsyncFlush(Safekeeper *sk) /* Nothing to do; try again when the socket's ready */ return false; case -1: - elog(WARNING, "Failed to flush write to node %s:%s in %s state: %s", + walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), walprop_error_message(sk->conn)); ResetConnection(sk); @@ -2558,7 +2571,7 @@ backpressure_lag_impl(void) replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr); #define MB ((XLogRecPtr)1024 * 1024) - elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X", + walprop_log(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X", LSN_FORMAT_ARGS(myFlushLsn), LSN_FORMAT_ARGS(writePtr), LSN_FORMAT_ARGS(flushPtr), @@ -2606,7 +2619,7 @@ backpressure_throttling_impl(void) /* Suspend writers until replicas catch up */ set_ps_display("backpressure throttling"); - elog(DEBUG2, "backpressure throttling: lag %lu", lag); + walprop_log(DEBUG2, "backpressure throttling: lag %lu", lag); start = GetCurrentTimestamp(); pg_usleep(BACK_PRESSURE_DELAY); stop = GetCurrentTimestamp(); diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index c18612a8ba..ddf3163455 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -10,6 +10,24 @@ #include "utils/uuid.h" #include "replication/walreceiver.h" +#define WALPROPOSER_TAG "[WALPROPOSER] " + +#ifdef SIMLIB +#define walprop_log(tag, fmt, ...) do { \ + ereport((tag > WARNING ? WARNING : tag), \ + (errmsg(fmt, ##__VA_ARGS__), \ + errhidestmt(true), errhidecontext(true), internalerrposition(0))); \ + if (tag > WARNING) \ + sim_exit(tag, "walprop_log error macros"); \ +} while (0) + +#define exit(code) sim_exit(code, "exit()") +#else +#define walprop_log(tag, fmt, ...) ereport(tag, \ + (errmsg(WALPROPOSER_TAG fmt, ##__VA_ARGS__), \ + errhidestmt(true), errhidecontext(true), internalerrposition(0))) +#endif + #define SK_MAGIC 0xCafeCeefu #define SK_PROTOCOL_VERSION 2 @@ -33,6 +51,9 @@ extern int wal_acceptor_reconnect_timeout; extern int wal_acceptor_connection_timeout; extern bool am_wal_proposer; +/* If true, we're exiting. */ +extern bool walproposer_exited; + struct WalProposerConn; /* Defined in libpqwalproposer */ typedef struct WalProposerConn WalProposerConn; diff --git a/pgxn/neon/walproposer_utils.c b/pgxn/neon/walproposer_utils.c index e1dcaa081d..78f5e3fd1c 100644 --- a/pgxn/neon/walproposer_utils.c +++ b/pgxn/neon/walproposer_utils.c @@ -121,11 +121,11 @@ CompareLsn(const void *a, const void *b) * * The strings are intended to be used as a prefix to "state", e.g.: * - * elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state)); + * walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state)); * * If this sort of phrasing doesn't fit the message, instead use something like: * - * elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state)); + * walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state)); */ char * FormatSafekeeperState(SafekeeperState state) @@ -192,10 +192,10 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk) if (!events_ok_for_state) { /* - * To give a descriptive message in the case of failure, we use elog + * To give a descriptive message in the case of failure, we use walprop_log * and then an assertion that's guaranteed to fail. */ - elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", + walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state)); Assert(events_ok_for_state); } @@ -298,7 +298,7 @@ FormatEvents(uint32 events) if (events & (~all_flags)) { - elog(WARNING, "Event formatting found unexpected component %d", + walprop_log(WARNING, "Event formatting found unexpected component %d", events & (~all_flags)); return_str[6] = '*'; return_str[7] = '\0'; @@ -1111,7 +1111,7 @@ XLogSendPhysical(void) WalSndCaughtUp = true; - elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", + walprop_log(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", LSN_FORMAT_ARGS(sendTimeLineValidUpto), LSN_FORMAT_ARGS(sentPtr)); return; diff --git a/safekeeper/src/simlib/node_os.rs b/safekeeper/src/simlib/node_os.rs index 3ed4e61137..20d42b7ffa 100644 --- a/safekeeper/src/simlib/node_os.rs +++ b/safekeeper/src/simlib/node_os.rs @@ -56,7 +56,7 @@ impl NodeOs { } break event; }; - + if let Some(event) = ready_event { // return event if it's ready return Some(event); @@ -68,17 +68,13 @@ impl NodeOs { } // or wait for timeout - + let rand_nonce = self.random(u64::MAX); if timeout > 0 { - self.world - .schedule( - timeout as u64, - SendMessageEvent::new( - epoll.clone(), - NodeEvent::WakeTimeout(rand_nonce), - ), - ); + self.world.schedule( + timeout as u64, + SendMessageEvent::new(epoll.clone(), NodeEvent::WakeTimeout(rand_nonce)), + ); } loop { @@ -108,4 +104,9 @@ impl NodeOs { pub fn random(&self, max: u64) -> u64 { self.internal.rng.lock().gen_range(0..max) } + + /// Set the result for the current node. + pub fn set_result(&self, code: i32, result: String) { + *self.internal.result.lock() = (code, result); + } } diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index 5e1a17ad87..5d105cba97 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -2,6 +2,7 @@ use rand::{rngs::StdRng, Rng, SeedableRng}; use std::{ cell::RefCell, ops::DerefMut, + panic::AssertUnwindSafe, sync::{atomic::AtomicU64, Arc}, }; @@ -45,7 +46,11 @@ pub struct World { } impl World { - pub fn new(seed: u64, network_options: Arc, nodes_init: Option>) -> World { + pub fn new( + seed: u64, + network_options: Arc, + nodes_init: Option>, + ) -> World { World { nodes: Mutex::new(Vec::new()), unconditional_parking: Mutex::new(Vec::new()), @@ -231,6 +236,8 @@ pub struct Node { world: Arc, join_handle: Mutex>>, pub rng: Mutex, + /// Every node can set a result string, which can be read by the test. + pub result: Mutex<(i32, String)>, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -252,6 +259,7 @@ impl Node { world, join_handle: Mutex::new(None), rng: Mutex::new(rng), + result: Mutex::new((-1, String::new())), } } @@ -285,8 +293,17 @@ impl Node { nodes_init(NodeOs::new(world.clone(), node.clone())); } - // TODO: recover from panic (update state, log the error) - f(NodeOs::new(world, node.clone())); + let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + f(NodeOs::new(world, node.clone())); + })); + match res { + Ok(_) => { + println!("Node {} finished successfully", node.id); + } + Err(e) => { + println!("Node {} finished with panic: {:?}", node.id, e); + } + } let mut status = node.status.lock(); *status = NodeStatus::Finished; @@ -350,6 +367,11 @@ impl Node { pub fn is_node_thread() -> bool { CURRENT_NODE.with(|current_node| current_node.borrow().is_some()) } + + pub fn is_finished(&self) -> bool { + let status = self.status.lock(); + *status == NodeStatus::Finished + } } /// Network events and timers.