Return LSN from sync safekeepers

This commit is contained in:
Arthur Petukhovsky
2023-07-24 21:15:35 +00:00
parent 296a0cbac2
commit d87e822169
11 changed files with 303 additions and 154 deletions

1
Cargo.lock generated
View File

@@ -4650,6 +4650,7 @@ dependencies = [
"env_logger",
"hex",
"hyper",
"libc",
"log",
"memoffset 0.8.0",
"once_cell",

View File

@@ -14,6 +14,7 @@ crc32c.workspace = true
hex.workspace = true
once_cell.workspace = true
log.workspace = true
libc.workspace = true
memoffset.workspace = true
thiserror.workspace = true
serde.workspace = true

View File

@@ -13,7 +13,7 @@ struct WalProposerConn
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
elog(INFO, "not implemented");
walprop_log(INFO, "not implemented");
return false;
}
@@ -21,14 +21,14 @@ ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
char *
walprop_error_message(WalProposerConn *conn)
{
elog(INFO, "not implemented");
walprop_log(INFO, "not implemented");
return NULL;
}
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
elog(INFO, "not implemented: walprop_status");
walprop_log(INFO, "not implemented: walprop_status");
return WP_CONNECTION_OK;
}
@@ -37,7 +37,7 @@ walprop_connect_start(char *conninfo)
{
WalProposerConn *conn;
elog(INFO, "walprop_connect_start: %s", conninfo);
walprop_log(INFO, "walprop_connect_start: %s", conninfo);
const char *connstr_prefix = "host=node port=";
Assert(strncmp(conninfo, connstr_prefix, strlen(connstr_prefix)) == 0);
@@ -52,21 +52,21 @@ walprop_connect_start(char *conninfo)
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
elog(INFO, "not implemented: walprop_connect_poll");
walprop_log(INFO, "not implemented: walprop_connect_poll");
return WP_CONN_POLLING_OK;
}
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
elog(INFO, "not implemented: walprop_send_query");
walprop_log(INFO, "not implemented: walprop_send_query");
return true;
}
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
elog(INFO, "not implemented: walprop_get_query_result");
walprop_log(INFO, "not implemented: walprop_get_query_result");
return WP_EXEC_SUCCESS_COPYBOTH;
}
@@ -79,14 +79,14 @@ walprop_socket(WalProposerConn *conn)
int
walprop_flush(WalProposerConn *conn)
{
elog(INFO, "not implemented");
walprop_log(INFO, "not implemented");
return 0;
}
void
walprop_finish(WalProposerConn *conn)
{
elog(INFO, "not implemented");
walprop_log(INFO, "not implemented");
}
/*
@@ -104,7 +104,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
msg = sim_msg_get_bytes(&len);
*buf = msg;
*amount = len;
elog(INFO, "walprop_async_read: %d", len);
walprop_log(INFO, "walprop_async_read: %d", len);
return PG_ASYNC_READ_SUCCESS;
}
@@ -112,7 +112,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
elog(INFO, "not implemented");
walprop_log(INFO, "not implemented");
return PG_ASYNC_WRITE_FAIL;
}
@@ -123,7 +123,7 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
elog(INFO, "not implemented: walprop_blocking_write");
walprop_log(INFO, "not implemented: walprop_blocking_write");
sim_msg_set_bytes(buf, size);
sim_tcp_send(conn->tcp);
return true;

View File

@@ -57,6 +57,8 @@ struct Event sim_epoll_rcv(int64_t timeout);
int64_t sim_now(void);
void sim_exit(int32_t code, const uint8_t *msg);
/**
* Get tag of the current message.
*/

View File

@@ -1,8 +1,11 @@
use std::{cell::RefCell, collections::HashMap};
use safekeeper::simlib::{network::TCP, node_os::NodeOs, world::NodeEvent};
use std::{
cell::RefCell,
collections::HashMap,
ffi::{CStr, CString},
};
use crate::sim_proto::{AnyMessageTag, Event, EventTag, MESSAGE_BUF, anymessage_tag};
use crate::sim_proto::{anymessage_tag, AnyMessageTag, Event, EventTag, MESSAGE_BUF};
thread_local! {
static CURRENT_NODE_OS: RefCell<Option<NodeOs>> = RefCell::new(None);
@@ -115,3 +118,20 @@ pub extern "C" fn sim_epoll_rcv(timeout: i64) -> Event {
pub extern "C" fn sim_now() -> i64 {
os().now() as i64
}
/// Terminates the calling simulated node on behalf of C code.
///
/// `code` is the exit status and `msg` is a NUL-terminated C string describing
/// the outcome (may be null, which is recorded as an empty message). Both are
/// stored through `os().set_result(..)` before the function panics to stop
/// executing the caller; this function never returns normally.
#[no_mangle]
pub extern "C" fn sim_exit(code: i32, msg: *const u8) {
    let msg = if msg.is_null() {
        // Be lenient with C callers that have nothing to report.
        String::new()
    } else {
        // SAFETY: `msg` is non-null and the C caller is required to pass a
        // NUL-terminated string that stays valid for the duration of the call.
        // `cast()` targets the platform's `c_char`, which is `i8` on some
        // targets and `u8` on others, so no signedness is hard-coded here.
        unsafe { CStr::from_ptr(msg.cast()) }
            .to_string_lossy()
            .into_owned()
    };

    println!("sim_exit({}, {:?})", code, msg);
    os().set_result(code, msg);

    // Using `libc::pthread_exit(std::ptr::null_mut())` here was tried and does
    // not work, see:
    //   https://github.com/rust-lang/unsafe-code-guidelines/issues/211
    //   https://doc.rust-lang.org/nomicon/unwinding.html
    //
    // NOTE(review): unwinding out of an `extern "C"` function is widely
    // described as undefined behaviour (and newer toolchains may abort
    // instead of unwinding). It works in practice for now; confirm whether
    // the `"C-unwind"` ABI should be used once the toolchain allows it.
    panic!("sim_exit() called from C code")
}

View File

@@ -1,69 +1,137 @@
use std::{sync::Arc, ffi::CString};
use std::{ffi::CString, str::FromStr, sync::Arc};
use safekeeper::simlib::{network::{Delay, NetworkOptions}, world::World};
use utils::{id::TenantTimelineId, logging};
use safekeeper::simlib::{
network::{Delay, NetworkOptions},
world::Node,
world::World,
};
use utils::{id::TenantTimelineId, logging, lsn::Lsn};
use crate::{simtest::safekeeper::run_server, c_context, bindings::{WalProposerRust, wal_acceptors_list, wal_acceptor_reconnect_timeout, wal_acceptor_connection_timeout, neon_tenant_walproposer, neon_timeline_walproposer}};
use crate::{
bindings::{
neon_tenant_walproposer, neon_timeline_walproposer, wal_acceptor_connection_timeout,
wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust,
},
c_context,
simtest::safekeeper::run_server,
};
#[test]
fn run_walproposer_safekeeper_test() {
logging::init(logging::LogFormat::Plain).unwrap();
/// Parameters shared by simulation tests: how the simulated network behaves
/// and how long a test is allowed to run.
struct TestConfig {
    // Network options handed to `World::new` when a test is started.
    network: Arc<NetworkOptions>,
    // Limit that is copied into `Test::timeout`; compared against the
    // simulated clock (`World::now`), not wall-clock time.
    timeout: u64,
}
let delay = Delay {
min: 1,
max: 5,
fail_prob: 0.0,
};
let network = NetworkOptions {
timeout: Some(1000),
connect_delay: delay.clone(),
send_delay: delay.clone(),
};
let seed = 1337;
let network = Arc::new(network);
let world = Arc::new(World::new(seed, network, c_context()));
world.register_world();
let client_node = world.new_node();
let servers = [world.new_node(), world.new_node(), world.new_node()];
let server_ids = [servers[0].id, servers[1].id, servers[2].id];
let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(",");
println!("server ids: {:?}", safekeepers_guc);
let ttid = TenantTimelineId::generate();
// start the client thread
client_node.launch(move |_| {
let list = CString::new(safekeepers_guc).unwrap();
unsafe {
wal_acceptors_list = list.into_raw();
wal_acceptor_reconnect_timeout = 1000;
wal_acceptor_connection_timeout = 5000;
neon_tenant_walproposer = CString::new(ttid.tenant_id.to_string()).unwrap().into_raw();
neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()).unwrap().into_raw();
WalProposerRust();
impl TestConfig {
fn new() -> Self {
Self {
network: Arc::new(NetworkOptions {
timeout: Some(1000),
connect_delay: Delay {
min: 1,
max: 5,
fail_prob: 0.0,
},
send_delay: Delay {
min: 1,
max: 5,
fail_prob: 0.0,
},
}),
timeout: 1_000 * 10,
}
// TODO: run sync-safekeepers
});
// start server threads
for ptr in servers.iter() {
let server = ptr.clone();
let id = server.id;
server.launch(move |os| {
let res = run_server(os);
println!("server {} finished: {:?}", id, res);
});
}
world.await_all();
let time_limit = 1_000_0;
fn start(&self, seed: u64) -> Test {
let world = Arc::new(World::new(seed, self.network.clone(), c_context()));
world.register_world();
while world.step() && world.now() < time_limit {}
let servers = [world.new_node(), world.new_node(), world.new_node()];
let server_ids = [servers[0].id, servers[1].id, servers[2].id];
let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(",");
let ttid = TenantTimelineId::generate();
// TODO: verify sync_safekeepers LSN
// start the server threads
for ptr in servers.iter() {
let server = ptr.clone();
let id = server.id;
server.launch(move |os| {
let res = run_server(os);
println!("server {} finished: {:?}", id, res);
});
}
// wait init for all servers
world.await_all();
Test {
world,
servers,
server_ids,
safekeepers_guc,
ttid,
timeout: self.timeout,
}
}
}
/// A started simulation: the world plus the three safekeeper nodes launched
/// in it, as produced by `TestConfig::start`.
struct Test {
    // Simulated world that owns every node of this test.
    world: Arc<World>,
    // The three nodes running `run_server`.
    servers: [Arc<Node>; 3],
    // Ids of `servers`, in the same order.
    server_ids: [u32; 3],
    // Comma-separated "node:<id>" list passed to walproposer as its
    // safekeepers list.
    safekeepers_guc: String,
    // Tenant/timeline pair generated for this test.
    ttid: TenantTimelineId,
    // Upper bound for `World::now()` while polling in `sync_safekeepers`.
    timeout: u64,
}
impl Test {
    /// Runs walproposer in sync-safekeepers mode on a fresh node and returns
    /// the LSN it reports.
    ///
    /// Fails if the client node does not finish before the simulation goes
    /// idle or the time limit is reached, if it exits with a non-zero code,
    /// or if its result message cannot be parsed as an `Lsn`.
    fn sync_safekeepers(&self) -> anyhow::Result<Lsn> {
        let client_node = self.world.new_node();

        // Values moved into the client thread.
        let safekeepers = self.safekeepers_guc.clone();
        let tenant_timeline = self.ttid.clone();

        // Launch walproposer on the client node; the C side reads its
        // configuration from these globals.
        client_node.launch(move |_| {
            let acceptors = CString::new(safekeepers).unwrap();
            unsafe {
                wal_acceptors_list = acceptors.into_raw();
                wal_acceptor_reconnect_timeout = 1000;
                wal_acceptor_connection_timeout = 5000;
                neon_tenant_walproposer = CString::new(tenant_timeline.tenant_id.to_string())
                    .unwrap()
                    .into_raw();
                neon_timeline_walproposer = CString::new(tenant_timeline.timeline_id.to_string())
                    .unwrap()
                    .into_raw();
                WalProposerRust();
            }
        });

        self.world.await_all();

        // Drive the simulation until the client exits, nothing is left to
        // step, or the time limit is hit.
        let deadline = self.timeout;
        loop {
            if !self.world.step() {
                break;
            }
            if self.world.now() >= deadline {
                break;
            }
            if client_node.is_finished() {
                break;
            }
        }

        anyhow::ensure!(client_node.is_finished(), "timeout or idle stuck");

        let outcome = client_node.result.lock().clone();
        anyhow::ensure!(outcome.0 == 0, "non-zero exitcode: {:?}", outcome);

        Ok(Lsn::from_str(&outcome.1)?)
    }
}
/// Syncing three freshly started (empty) safekeepers must report LSN 0.
#[test]
fn sync_empty_safekeepers() {
    logging::init(logging::LogFormat::Plain).unwrap();

    let seed = 1337;
    let test = TestConfig::new().start(seed);

    let synced_lsn = test.sync_safekeepers().unwrap();
    assert_eq!(synced_lsn, Lsn(0));
}

View File

@@ -323,7 +323,7 @@ nwp_shmem_startup_hook(void)
void WalProposerRust()
{
elog(LOG, "WalProposerRust");
walprop_log(LOG, "WalProposerRust");
WalProposerSync(0, NULL);
}
@@ -395,7 +395,7 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events)
Event event = sim_epoll_rcv(timeout);
if (event.tag == Closed) {
// TODO: shutdown connection?
// elog(LOG, "connection closed");
// walprop_log(LOG, "connection closed");
// ShutdownConnection(sk);
return 0;
} else if (event.tag == Message) {
@@ -409,7 +409,7 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events)
return 1;
}
}
elog(FATAL, "unknown tcp connection");
walprop_log(FATAL, "unknown tcp connection");
} else if (event.tag == Timeout) {
return 0;
} else {
@@ -497,7 +497,7 @@ WalProposerPoll(void)
if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now,
wal_acceptor_connection_timeout))
{
elog(WARNING, "failed to connect to node '%s:%s' in '%s' state: exceeded connection timeout %dms",
walprop_log(WARNING, "failed to connect to node '%s:%s' in '%s' state: exceeded connection timeout %dms",
sk->host, sk->port, FormatSafekeeperState(sk->state), wal_acceptor_connection_timeout);
ShutdownConnection(sk);
}
@@ -541,7 +541,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
#ifndef SIMLIB
load_file("libpqwalreceiver", false);
if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
walprop_log(ERROR, "libpqwalreceiver didn't initialize correctly");
#endif
for (host = wal_acceptors_list; host != NULL && *host != '\0'; host = sep)
@@ -549,7 +549,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
port = strchr(host, ':');
if (port == NULL)
{
elog(FATAL, "port is not specified");
walprop_log(FATAL, "port is not specified");
}
*port++ = '\0';
sep = strchr(port, ',');
@@ -557,7 +557,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
*sep++ = '\0';
if (n_safekeepers + 1 >= MAX_SAFEKEEPERS)
{
elog(FATAL, "Too many safekeepers");
walprop_log(FATAL, "Too many safekeepers");
}
safekeeper[n_safekeepers].host = host;
safekeeper[n_safekeepers].port = port;
@@ -580,13 +580,13 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
}
if (written > MAXCONNINFO || written < 0)
elog(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
walprop_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
}
initStringInfo(&safekeeper[n_safekeepers].outbuf);
safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
if (safekeeper[n_safekeepers].xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
walprop_log(FATAL, "Failed to allocate xlog reader");
safekeeper[n_safekeepers].flushWrite = false;
safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr;
@@ -594,7 +594,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
}
if (n_safekeepers < 1)
{
elog(FATAL, "Safekeepers addresses are not specified");
walprop_log(FATAL, "Safekeepers addresses are not specified");
}
quorum = n_safekeepers / 2 + 1;
@@ -605,15 +605,15 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
pg_strong_random(&greetRequest.proposerId, sizeof(greetRequest.proposerId));
greetRequest.systemId = systemId;
if (!neon_timeline_walproposer)
elog(FATAL, "neon.timeline_id is not provided");
walprop_log(FATAL, "neon.timeline_id is not provided");
if (*neon_timeline_walproposer != '\0' &&
!HexDecodeString(greetRequest.timeline_id, neon_timeline_walproposer, 16))
elog(FATAL, "Could not parse neon.timeline_id, %s", neon_timeline_walproposer);
walprop_log(FATAL, "Could not parse neon.timeline_id, %s", neon_timeline_walproposer);
if (!neon_tenant_walproposer)
elog(FATAL, "neon.tenant_id is not provided");
walprop_log(FATAL, "neon.tenant_id is not provided");
if (*neon_tenant_walproposer != '\0' &&
!HexDecodeString(greetRequest.tenant_id, neon_tenant_walproposer, 16))
elog(FATAL, "Could not parse neon.tenant_id, %s", neon_tenant_walproposer);
walprop_log(FATAL, "Could not parse neon.tenant_id, %s", neon_tenant_walproposer);
#if PG_VERSION_NUM >= 150000
/* FIXME don't use hardcoded timeline id */
@@ -653,7 +653,7 @@ static void
InitEventSet(void)
{
if (waitEvents)
elog(FATAL, "double-initialization of event set");
walprop_log(FATAL, "double-initialization of event set");
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + n_safekeepers);
AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET,
@@ -723,19 +723,19 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove)
static void
InitEventSet(void)
{
elog(DEBUG5, "InitEventSet");
walprop_log(DEBUG5, "InitEventSet");
}
static void
UpdateEventSet(Safekeeper *sk, uint32 events)
{
elog(DEBUG5, "UpdateEventSet");
walprop_log(DEBUG5, "UpdateEventSet");
}
static void
HackyRemoveWalProposerEvent(Safekeeper *to_remove)
{
elog(DEBUG5, "HackyRemoveWalProposerEvent");
walprop_log(DEBUG5, "HackyRemoveWalProposerEvent");
}
#endif
@@ -783,7 +783,7 @@ ResetConnection(Safekeeper *sk)
* PGconn structure"
*/
if (!sk->conn)
elog(FATAL, "failed to allocate new PGconn object");
walprop_log(FATAL, "failed to allocate new PGconn object");
/*
* PQconnectStart won't actually start connecting until we run
@@ -801,7 +801,7 @@ ResetConnection(Safekeeper *sk)
*
* https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS
*/
elog(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s",
walprop_log(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s",
sk->host, sk->port, walprop_error_message(sk->conn));
/*
@@ -826,7 +826,7 @@ ResetConnection(Safekeeper *sk)
* (see libpqrcv_connect, defined in
* src/backend/replication/libpqwalreceiver/libpqwalreceiver.c)
*/
elog(LOG, "connecting with node %s:%s", sk->host, sk->port);
walprop_log(LOG, "connecting with node %s:%s", sk->host, sk->port);
sk->state = SS_CONNECTING_WRITE;
sk->latestMsgReceivedAt = GetCurrentTimestamp();
@@ -900,7 +900,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* ResetConnection
*/
case SS_OFFLINE:
elog(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline",
walprop_log(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline",
sk->host, sk->port);
break; /* actually unreachable, but prevents
* -Wimplicit-fallthrough */
@@ -936,7 +936,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* requests.
*/
case SS_VOTING:
elog(WARNING, "EOF from node %s:%s in %s state", sk->host,
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk->state));
ResetConnection(sk);
return;
@@ -965,7 +965,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
* Idle state for waiting votes from quorum.
*/
case SS_IDLE:
elog(WARNING, "EOF from node %s:%s in %s state", sk->host,
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk->state));
ResetConnection(sk);
return;
@@ -990,7 +990,7 @@ HandleConnectionEvent(Safekeeper *sk)
switch (result)
{
case WP_CONN_POLLING_OK:
elog(LOG, "connected with node %s:%s", sk->host,
walprop_log(LOG, "connected with node %s:%s", sk->host,
sk->port);
sk->latestMsgReceivedAt = GetCurrentTimestamp();
/*
@@ -1013,7 +1013,7 @@ HandleConnectionEvent(Safekeeper *sk)
break;
case WP_CONN_POLLING_FAILED:
elog(WARNING, "failed to connect to node '%s:%s': %s",
walprop_log(WARNING, "failed to connect to node '%s:%s': %s",
sk->host, sk->port, walprop_error_message(sk->conn));
/*
@@ -1050,7 +1050,7 @@ SendStartWALPush(Safekeeper *sk)
{
if (!walprop_send_query(sk->conn, "START_WAL_PUSH"))
{
elog(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
walprop_log(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, walprop_error_message(sk->conn));
ShutdownConnection(sk);
return;
@@ -1085,7 +1085,7 @@ RecvStartWALPushResult(Safekeeper *sk)
break;
case WP_EXEC_FAILED:
elog(WARNING, "Failed to send query to safekeeper %s:%s: %s",
walprop_log(WARNING, "Failed to send query to safekeeper %s:%s: %s",
sk->host, sk->port, walprop_error_message(sk->conn));
ShutdownConnection(sk);
return;
@@ -1096,7 +1096,7 @@ RecvStartWALPushResult(Safekeeper *sk)
* wrong"
*/
case WP_EXEC_UNEXPECTED_SUCCESS:
elog(WARNING, "Received bad response from safekeeper %s:%s query execution",
walprop_log(WARNING, "Received bad response from safekeeper %s:%s query execution",
sk->host, sk->port);
ShutdownConnection(sk);
return;
@@ -1143,7 +1143,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (n_connected == quorum)
{
propTerm++;
elog(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, quorum, propTerm);
walprop_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, quorum, propTerm);
voteRequest = (VoteRequest)
{
@@ -1156,7 +1156,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
else if (sk->greetResponse.term > propTerm)
{
/* Another compute with higher term is running. */
elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->greetResponse.term, propTerm);
}
@@ -1196,7 +1196,7 @@ static void
SendVoteRequest(Safekeeper *sk)
{
/* We have quorum for voting, send our vote request */
elog(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, voteRequest.term);
walprop_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, voteRequest.term);
/* On failure, logging & resetting is handled */
if (!BlockingWrite(sk, &voteRequest, sizeof(voteRequest), SS_WAIT_VERDICT))
return;
@@ -1211,7 +1211,7 @@ RecvVoteResponse(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->voteResponse))
return;
elog(LOG,
walprop_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
@@ -1226,7 +1226,7 @@ RecvVoteResponse(Safekeeper *sk)
if ((!sk->voteResponse.voteGiven) &&
(sk->voteResponse.term > propTerm || n_votes < quorum))
{
elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
sk->host, sk->port,
sk->voteResponse.term, propTerm);
}
@@ -1271,18 +1271,24 @@ HandleElectedProposer(void)
*/
if (truncateLsn < propEpochStartLsn)
{
elog(LOG,
walprop_log(LOG,
"start recovery because truncateLsn=%X/%X is not "
"equal to epochStartLsn=%X/%X",
LSN_FORMAT_ARGS(truncateLsn),
LSN_FORMAT_ARGS(propEpochStartLsn));
/* Perform recovery */
if (!WalProposerRecovery(donor, greetRequest.timeline, truncateLsn, propEpochStartLsn))
elog(FATAL, "Failed to recover state");
walprop_log(FATAL, "Failed to recover state");
}
else if (syncSafekeepers)
{
// elog(FATAL, "synced %X/%X", LSN_FORMAT_ARGS(propEpochStartLsn));
#ifdef SIMLIB
char lsn_str[8 + 1 + 8 + 1];
snprintf(lsn_str, sizeof(lsn_str), "%X/%X", LSN_FORMAT_ARGS(propEpochStartLsn));
sim_exit(0, lsn_str);
#endif
/* Sync is not needed: just exit */
fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(propEpochStartLsn));
exit(0);
@@ -1385,7 +1391,7 @@ DetermineEpochStartLsn(void)
if (timelineStartLsn != InvalidXLogRecPtr &&
timelineStartLsn != safekeeper[i].voteResponse.timelineStartLsn)
{
elog(WARNING,
walprop_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(timelineStartLsn),
LSN_FORMAT_ARGS(safekeeper[i].voteResponse.timelineStartLsn));
@@ -1406,7 +1412,7 @@ DetermineEpochStartLsn(void)
{
timelineStartLsn = GetRedoStartLsn();
}
elog(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(propEpochStartLsn));
walprop_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(propEpochStartLsn));
}
/*
@@ -1433,7 +1439,7 @@ DetermineEpochStartLsn(void)
propTermHistory.entries[propTermHistory.n_entries - 1].term = propTerm;
propTermHistory.entries[propTermHistory.n_entries - 1].lsn = propEpochStartLsn;
elog(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
walprop_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
quorum,
propTerm,
LSN_FORMAT_ARGS(propEpochStartLsn),
@@ -1463,7 +1469,7 @@ DetermineEpochStartLsn(void)
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
walprop_shared->mineLastElectedTerm)))
{
elog(PANIC,
walprop_log(PANIC,
"collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X",
LSN_FORMAT_ARGS(propEpochStartLsn),
LSN_FORMAT_ARGS(GetRedoStartLsn()));
@@ -1492,7 +1498,7 @@ WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRec
err)));
return false;
}
elog(LOG,
walprop_log(LOG,
"start recovery from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
safekeeper[donor].host, safekeeper[donor].port, (uint32) (startpos >> 32),
@@ -1626,7 +1632,7 @@ SendProposerElected(Safekeeper *sk)
*/
sk->startStreamingAt = truncateLsn;
elog(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X",
walprop_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X",
sk->host, sk->port, LSN_FORMAT_ARGS(propTermHistory.entries[0].lsn),
LSN_FORMAT_ARGS(sk->startStreamingAt));
}
@@ -1661,7 +1667,7 @@ SendProposerElected(Safekeeper *sk)
msg.timelineStartLsn = timelineStartLsn;
lastCommonTerm = i >= 0 ? propTermHistory.entries[i].term : 0;
elog(LOG,
walprop_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
@@ -1691,7 +1697,7 @@ WalProposerStartStreaming(XLogRecPtr startpos)
{
StartReplicationCmd cmd;
elog(LOG, "WAL proposer starts streaming at %X/%X",
walprop_log(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
cmd.timeline = greetRequest.timeline;
@@ -1893,7 +1899,7 @@ SendAppendRequests(Safekeeper *sk)
return true;
case PG_ASYNC_WRITE_FAIL:
elog(WARNING, "Failed to send to node %s:%s in %s state: %s",
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
ShutdownConnection(sk);
@@ -1942,7 +1948,7 @@ RecvAppendResponses(Safekeeper *sk)
if (sk->appendResponse.term > propTerm)
{
/* Another compute with higher term is running. */
elog(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "",
walprop_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "",
sk->host, sk->port,
sk->appendResponse.term, propTerm);
}
@@ -1988,7 +1994,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->currentClusterSize = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu",
walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu",
rf->currentClusterSize);
}
else if (strcmp(key, "ps_writelsn") == 0)
@@ -1996,7 +2002,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->ps_writelsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X",
walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_writelsn));
}
else if (strcmp(key, "ps_flushlsn") == 0)
@@ -2004,7 +2010,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->ps_flushlsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X",
walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_flushlsn));
}
else if (strcmp(key, "ps_applylsn") == 0)
@@ -2012,7 +2018,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->ps_applylsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X",
walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_applylsn));
}
else if (strcmp(key, "ps_replytime") == 0)
@@ -2025,7 +2031,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *
/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(rf->ps_replytime));
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s",
walprop_log(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s",
rf->ps_replytime, replyTimeStr);
pfree(replyTimeStr);
@@ -2040,7 +2046,7 @@ ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *
* Skip unknown keys to support backward compatibile protocol
* changes
*/
elog(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len);
walprop_log(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len);
pq_getmsgbytes(reply_message, len);
};
}
@@ -2191,7 +2197,7 @@ GetLatestNeonFeedback(ReplicationFeedback * rf)
rf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_applylsn;
rf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.rf.ps_replytime;
elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
walprop_log(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
" ps_writelsn %X/%X, ps_flushlsn %X/%X, ps_applylsn %X/%X, ps_replytime %lu",
rf->currentClusterSize,
LSN_FORMAT_ARGS(rf->ps_writelsn),
@@ -2311,6 +2317,13 @@ HandleSafekeeperResponse(void)
}
if (n_synced >= quorum)
{
#ifdef SIMLIB
char lsn_str[8 + 1 + 8 + 1];
snprintf(lsn_str, sizeof(lsn_str), "%X/%X", LSN_FORMAT_ARGS(propEpochStartLsn));
sim_exit(0, lsn_str);
#endif
/* All safekeepers synced! */
fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(propEpochStartLsn));
exit(0);
@@ -2335,7 +2348,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
return false;
case PG_ASYNC_READ_FAIL:
elog(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
ShutdownConnection(sk);
@@ -2373,7 +2386,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg)
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
{
elog(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk->state));
ResetConnection(sk);
return false;
@@ -2448,7 +2461,7 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
if (!walprop_blocking_write(sk->conn, msg, msg_size))
{
elog(WARNING, "Failed to send to node %s:%s in %s state: %s",
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
ShutdownConnection(sk);
@@ -2493,7 +2506,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
UpdateEventSet(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
return false;
case PG_ASYNC_WRITE_FAIL:
elog(WARNING, "Failed to send to node %s:%s in %s state: %s",
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
ShutdownConnection(sk);
@@ -2530,7 +2543,7 @@ AsyncFlush(Safekeeper *sk)
/* Nothing to do; try again when the socket's ready */
return false;
case -1:
elog(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
ResetConnection(sk);
@@ -2558,7 +2571,7 @@ backpressure_lag_impl(void)
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
#define MB ((XLogRecPtr)1024 * 1024)
elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X",
walprop_log(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X",
LSN_FORMAT_ARGS(myFlushLsn),
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
@@ -2606,7 +2619,7 @@ backpressure_throttling_impl(void)
/* Suspend writers until replicas catch up */
set_ps_display("backpressure throttling");
elog(DEBUG2, "backpressure throttling: lag %lu", lag);
walprop_log(DEBUG2, "backpressure throttling: lag %lu", lag);
start = GetCurrentTimestamp();
pg_usleep(BACK_PRESSURE_DELAY);
stop = GetCurrentTimestamp();

View File

@@ -10,6 +10,24 @@
#include "utils/uuid.h"
#include "replication/walreceiver.h"
#define WALPROPOSER_TAG "[WALPROPOSER] "
/*
 * walprop_log(tag, fmt, ...) -- logging wrapper used throughout walproposer.
 *
 * Simulation build (SIMLIB): the message is reported at no more than WARNING
 * level, and any level above WARNING then ends the simulated node through
 * sim_exit() with the level as exit code. exit() is redirected to sim_exit()
 * as well.
 * NOTE(review): this variant does not prepend WALPROPOSER_TAG, unlike the
 * regular one -- confirm that is intended.
 *
 * Regular build: a plain ereport() at the requested level with
 * WALPROPOSER_TAG prepended to the format string.
 *
 * Macro parameters are parenthesized so that an arbitrary expression can be
 * passed as the level / exit code.
 */
#ifdef SIMLIB
#define walprop_log(tag, fmt, ...) do { \
		ereport(((tag) > WARNING ? WARNING : (tag)), \
				(errmsg(fmt, ##__VA_ARGS__), \
				 errhidestmt(true), errhidecontext(true), internalerrposition(0))); \
		if ((tag) > WARNING) \
			sim_exit((tag), "walprop_log error macros"); \
	} while (0)
#define exit(code) sim_exit((code), "exit()")
#else
#define walprop_log(tag, fmt, ...) ereport((tag), \
		(errmsg(WALPROPOSER_TAG fmt, ##__VA_ARGS__), \
		 errhidestmt(true), errhidecontext(true), internalerrposition(0)))
#endif
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
@@ -33,6 +51,9 @@ extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern bool am_wal_proposer;
/* If true, we're exiting. */
extern bool walproposer_exited;
struct WalProposerConn; /* Defined in libpqwalproposer */
typedef struct WalProposerConn WalProposerConn;

View File

@@ -121,11 +121,11 @@ CompareLsn(const void *a, const void *b)
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
*/
char *
FormatSafekeeperState(SafekeeperState state)
@@ -192,10 +192,10 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk)
if (!events_ok_for_state)
{
/*
* To give a descriptive message in the case of failure, we use elog
* To give a descriptive message in the case of failure, we use walprop_log
* and then an assertion that's guaranteed to fail.
*/
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
Assert(events_ok_for_state);
}
@@ -298,7 +298,7 @@ FormatEvents(uint32 events)
if (events & (~all_flags))
{
elog(WARNING, "Event formatting found unexpected component %d",
walprop_log(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
@@ -1111,7 +1111,7 @@ XLogSendPhysical(void)
WalSndCaughtUp = true;
elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
walprop_log(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
LSN_FORMAT_ARGS(sendTimeLineValidUpto),
LSN_FORMAT_ARGS(sentPtr));
return;

View File

@@ -56,7 +56,7 @@ impl NodeOs {
}
break event;
};
if let Some(event) = ready_event {
// return event if it's ready
return Some(event);
@@ -68,17 +68,13 @@ impl NodeOs {
}
// or wait for timeout
let rand_nonce = self.random(u64::MAX);
if timeout > 0 {
self.world
.schedule(
timeout as u64,
SendMessageEvent::new(
epoll.clone(),
NodeEvent::WakeTimeout(rand_nonce),
),
);
self.world.schedule(
timeout as u64,
SendMessageEvent::new(epoll.clone(), NodeEvent::WakeTimeout(rand_nonce)),
);
}
loop {
@@ -108,4 +104,9 @@ impl NodeOs {
/// Draws a value from the half-open range `0..max` using this node's RNG.
///
/// The RNG lock is held only for the duration of the draw. `max` must be
/// non-zero: `gen_range` is given the range `0..max`, which is empty when
/// `max == 0`.
pub fn random(&self, max: u64) -> u64 {
    let mut rng = self.internal.rng.lock();
    rng.gen_range(0..max)
}
/// Records `(code, result)` as this node's result, replacing whatever
/// pair was stored before.
pub fn set_result(&self, code: i32, result: String) {
    let mut slot = self.internal.result.lock();
    *slot = (code, result);
}
}

View File

@@ -2,6 +2,7 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{
cell::RefCell,
ops::DerefMut,
panic::AssertUnwindSafe,
sync::{atomic::AtomicU64, Arc},
};
@@ -45,7 +46,11 @@ pub struct World {
}
impl World {
pub fn new(seed: u64, network_options: Arc<NetworkOptions>, nodes_init: Option<Box<dyn Fn(NodeOs) + Send + Sync>>) -> World {
pub fn new(
seed: u64,
network_options: Arc<NetworkOptions>,
nodes_init: Option<Box<dyn Fn(NodeOs) + Send + Sync>>,
) -> World {
World {
nodes: Mutex::new(Vec::new()),
unconditional_parking: Mutex::new(Vec::new()),
@@ -231,6 +236,8 @@ pub struct Node {
world: Arc<World>,
join_handle: Mutex<Option<std::thread::JoinHandle<()>>>,
pub rng: Mutex<StdRng>,
/// Every node can set a result string, which can be read by the test.
pub result: Mutex<(i32, String)>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -252,6 +259,7 @@ impl Node {
world,
join_handle: Mutex::new(None),
rng: Mutex::new(rng),
result: Mutex::new((-1, String::new())),
}
}
@@ -285,8 +293,17 @@ impl Node {
nodes_init(NodeOs::new(world.clone(), node.clone()));
}
// TODO: recover from panic (update state, log the error)
f(NodeOs::new(world, node.clone()));
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
f(NodeOs::new(world, node.clone()));
}));
match res {
Ok(_) => {
println!("Node {} finished successfully", node.id);
}
Err(e) => {
println!("Node {} finished with panic: {:?}", node.id, e);
}
}
let mut status = node.status.lock();
*status = NodeStatus::Finished;
@@ -350,6 +367,11 @@ impl Node {
/// Returns `true` when `CURRENT_NODE` holds `Some(..)`.
///
/// NOTE(review): presumably that is the case exactly when the caller runs
/// on a node's thread — confirm where `CURRENT_NODE` is set.
pub fn is_node_thread() -> bool {
    CURRENT_NODE.with(|slot| {
        let current = slot.borrow();
        current.is_some()
    })
}
/// Reports whether this node's status is currently `NodeStatus::Finished`.
///
/// The status lock is taken only for the comparison.
pub fn is_finished(&self) -> bool {
    let finished = *self.status.lock() == NodeStatus::Finished;
    finished
}
}
/// Network events and timers.