Test sync safekeepers

This commit is contained in:
Arthur Petukhovsky
2023-06-03 19:11:28 +00:00
parent 909d7fadb8
commit aed14f52d5
19 changed files with 400 additions and 88 deletions

1
Cargo.lock generated
View File

@@ -4649,6 +4649,7 @@ dependencies = [
"crc32c",
"env_logger",
"hex",
"hyper",
"log",
"memoffset 0.8.0",
"once_cell",

View File

@@ -20,6 +20,7 @@ serde.workspace = true
utils.workspace = true
safekeeper.workspace = true
postgres_ffi.workspace = true
hyper.workspace = true
workspace_hack.workspace = true

View File

@@ -4,15 +4,18 @@
* from. If you need to expose a new struct to Rust code, add the
* header here, and whitelist the struct in the build.rs file.
*/
// #include "c.h"
// #include "walproposer.h"
#include "c.h"
#include "walproposer.h"
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
// Calc a sum of two numbers. Used to test Rust->C function calls.
int TestFunc(int a, int b);
// Run a client for simple simlib test.
void RunClientC(uint32_t serverId);
void WalProposerRust();

View File

@@ -1,4 +1,5 @@
use std::{env, path::PathBuf};
use std::{env, path::PathBuf, process::Command};
use anyhow::{anyhow, Context};
use bindgen::CargoCallbacks;
extern crate bindgen;
@@ -33,12 +34,15 @@ fn main() -> anyhow::Result<()> {
// disable fPIE
println!("cargo:rustc-link-arg=-no-pie");
if !std::process::Command::new("./build.sh")
// print output of build.sh
let output = std::process::Command::new("./build.sh")
.output()
.expect("could not spawn `clang`")
.status
.success()
{
.expect("could not spawn `clang`");
println!("stdout: {}", String::from_utf8(output.stdout).unwrap());
println!("stderr: {}", String::from_utf8(output.stderr).unwrap());
if !output.status.success() {
// Panic if the command was not successful.
panic!("could not compile object file");
}
@@ -46,49 +50,49 @@ fn main() -> anyhow::Result<()> {
// // Finding the location of C headers for the Postgres server:
// // - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `<project_root>/pg_install`
// // - if there's a `bin/pg_config` file use it for getting include server, otherwise use `<project_root>/pg_install/{PG_MAJORVERSION}/include/postgresql/server`
// let pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") {
// postgres_install_dir.into()
// } else {
// PathBuf::from("pg_install")
// };
let pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") {
postgres_install_dir.into()
} else {
PathBuf::from("pg_install")
};
// let pg_version = "v15";
// let mut pg_install_dir_versioned = pg_install_dir.join(pg_version);
// if pg_install_dir_versioned.is_relative() {
// let cwd = env::current_dir().context("Failed to get current_dir")?;
// pg_install_dir_versioned = cwd.join("..").join("..").join(pg_install_dir_versioned);
// }
let pg_version = "v15";
let mut pg_install_dir_versioned = pg_install_dir.join(pg_version);
if pg_install_dir_versioned.is_relative() {
let cwd = env::current_dir().context("Failed to get current_dir")?;
pg_install_dir_versioned = cwd.join("..").join("..").join(pg_install_dir_versioned);
}
// let pg_config_bin = pg_install_dir_versioned
// .join(pg_version)
// .join("bin")
// .join("pg_config");
// let inc_server_path: String = if pg_config_bin.exists() {
// let output = Command::new(pg_config_bin)
// .arg("--includedir-server")
// .output()
// .context("failed to execute `pg_config --includedir-server`")?;
let pg_config_bin = pg_install_dir_versioned
.join(pg_version)
.join("bin")
.join("pg_config");
let inc_server_path: String = if pg_config_bin.exists() {
let output = Command::new(pg_config_bin)
.arg("--includedir-server")
.output()
.context("failed to execute `pg_config --includedir-server`")?;
// if !output.status.success() {
// panic!("`pg_config --includedir-server` failed")
// }
if !output.status.success() {
panic!("`pg_config --includedir-server` failed")
}
// String::from_utf8(output.stdout)
// .context("pg_config output is not UTF-8")?
// .trim_end()
// .into()
// } else {
// let server_path = pg_install_dir_versioned
// .join("include")
// .join("postgresql")
// .join("server")
// .into_os_string();
// server_path
// .into_string()
// .map_err(|s| anyhow!("Bad postgres server path {s:?}"))?
// };
String::from_utf8(output.stdout)
.context("pg_config output is not UTF-8")?
.trim_end()
.into()
} else {
let server_path = pg_install_dir_versioned
.join("include")
.join("postgresql")
.join("server")
.into_os_string();
server_path
.into_string()
.map_err(|s| anyhow!("Bad postgres server path {s:?}"))?
};
// let inc_pgxn_path = "/Users/arthur/zen/zenith/pgxn/neon";
let inc_pgxn_path = "/home/admin/simulator/pgxn/neon";
// The bindgen::Builder is the main entry point
// to bindgen, and lets you build up options for
@@ -104,8 +108,14 @@ fn main() -> anyhow::Result<()> {
.allowlist_function("RunClientC")
.allowlist_function("WalProposerRust")
.allowlist_function("MyContextInit")
// .clang_arg(format!("-I{inc_server_path}"))
// .clang_arg(format!("-I{inc_pgxn_path}"))
.allowlist_var("wal_acceptors_list")
.allowlist_var("wal_acceptor_reconnect_timeout")
.allowlist_var("wal_acceptor_connection_timeout")
.allowlist_var("am_wal_proposer")
.allowlist_var("neon_timeline_walproposer")
.allowlist_var("neon_tenant_walproposer")
.clang_arg(format!("-I{inc_server_path}"))
.clang_arg(format!("-I{inc_pgxn_path}"))
// Finish the builder and generate the bindings.
.generate()
// Unwrap the Result and panic on failure.

View File

@@ -1,9 +1,11 @@
#!/bin/bash
set -e
cd /home/admin/simulator/libs/walproposer
# TODO: rewrite to Makefile
make -C ../../ neon-pg-ext-walproposer -s
make -C ../.. neon-pg-ext-walproposer
# make -C ../../pg_install/build/v15/src/backend postgres-lib -s
cp ../../pg_install/build/v15/src/backend/libpostgres.a .
cp ../../pg_install/build/v15/src/common/libpgcommon_srv.a .

View File

@@ -1,10 +1,12 @@
#include "postgres.h"
#include "neon.h"
#include "walproposer.h"
#include "rust_bindings.h"
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
struct WalProposerConn
{
int64_t tcp;
};
/* Helper function */
@@ -26,43 +28,52 @@ walprop_error_message(WalProposerConn *conn)
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
elog(INFO, "not implemented");
elog(INFO, "not implemented: walprop_status");
return WP_CONNECTION_OK;
}
WalProposerConn *
walprop_connect_start(char *conninfo)
{
elog(INFO, "not implemented");
return NULL;
WalProposerConn *conn;
elog(INFO, "walprop_connect_start: %s", conninfo);
const char *connstr_prefix = "host=node port=";
Assert(strncmp(conninfo, connstr_prefix, strlen(connstr_prefix)) == 0);
int nodeId = atoi(conninfo + strlen(connstr_prefix));
conn = palloc(sizeof(WalProposerConn));
conn->tcp = sim_open_tcp(nodeId);
return conn;
}
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
elog(INFO, "not implemented");
elog(INFO, "not implemented: walprop_connect_poll");
return WP_CONN_POLLING_OK;
}
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
elog(INFO, "not implemented");
return false;
elog(INFO, "not implemented: walprop_send_query");
return true;
}
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
elog(INFO, "not implemented");
elog(INFO, "not implemented: walprop_get_query_result");
return WP_EXEC_SUCCESS_COPYBOTH;
}
pgsocket
walprop_socket(WalProposerConn *conn)
{
elog(INFO, "not implemented");
return 0;
return (pgsocket) conn->tcp;
}
int
@@ -87,8 +98,15 @@ walprop_finish(WalProposerConn *conn)
PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
elog(INFO, "not implemented");
return PG_ASYNC_READ_FAIL;
uintptr_t len;
char *msg;
msg = sim_msg_get_bytes(&len);
*buf = msg;
*amount = len;
elog(INFO, "walprop_async_read: %d", len);
return PG_ASYNC_READ_SUCCESS;
}
PGAsyncWriteResult
@@ -105,6 +123,8 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
elog(INFO, "not implemented");
return false;
elog(INFO, "not implemented: walprop_blocking_write");
sim_msg_set_bytes(buf, size);
sim_tcp_send(conn->tcp);
return true;
}

View File

@@ -0,0 +1,11 @@
wal_log_hints=off
hot_standby=on
fsync=off
wal_level=replica
restart_after_crash=off
shared_preload_libraries=neon
neon.pageserver_connstring=''
neon.tenant_id=cc6e67313d57283bad411600fbf5c142
neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e
synchronous_standby_names=walproposer
neon.safekeepers='node:1,node:2,node:3'

View File

@@ -19,6 +19,7 @@ typedef uint8_t AnyMessageTag;
* List of all possible NodeEvent.
*/
enum EventTag {
Timeout,
Accept,
Closed,
Message,
@@ -52,7 +53,14 @@ int64_t sim_open_tcp(uint32_t dst);
*/
void sim_tcp_send(int64_t tcp);
struct Event sim_epoll_rcv(void);
struct Event sim_epoll_rcv(int64_t timeout);
int64_t sim_now(void);
/**
* Get tag of the current message.
*/
AnyMessageTag sim_msg_tag(void);
/**
* Read AnyMessage::Just32 message.
@@ -63,3 +71,13 @@ void sim_msg_get_just_u32(uint32_t *val);
* Write AnyMessage::ReplCell message.
*/
void sim_msg_set_repl_cell(uint32_t value, uint32_t client_id, uint32_t seqno);
/**
* Write AnyMessage::Bytes message.
*/
void sim_msg_set_bytes(const uint8_t *bytes, uintptr_t len);
/**
* Read AnyMessage::Bytes message.
*/
const uint8_t *sim_msg_get_bytes(uintptr_t *len);

View File

@@ -1,8 +1,8 @@
use std::{cell::RefCell, collections::HashMap};
use safekeeper::simlib::{network::TCP, node_os::NodeOs, proto::AnyMessage, world::NodeEvent};
use safekeeper::simlib::{network::TCP, node_os::NodeOs, world::NodeEvent};
use crate::sim_proto::{AnyMessageTag, Event, EventTag, MESSAGE_BUF};
use crate::sim_proto::{AnyMessageTag, Event, EventTag, MESSAGE_BUF, anymessage_tag};
thread_local! {
static CURRENT_NODE_OS: RefCell<Option<NodeOs>> = RefCell::new(None);
@@ -69,8 +69,18 @@ pub extern "C" fn sim_tcp_send(tcp: i64) {
}
#[no_mangle]
pub extern "C" fn sim_epoll_rcv() -> Event {
let event = os().epoll().recv();
pub extern "C" fn sim_epoll_rcv(timeout: i64) -> Event {
let event = os().epoll_recv(timeout);
let event = if let Some(event) = event {
event
} else {
return Event {
tag: EventTag::Timeout,
tcp: 0,
any_message: AnyMessageTag::None,
};
};
match event {
NodeEvent::Accept(tcp) => Event {
tag: EventTag::Accept,
@@ -91,14 +101,17 @@ pub extern "C" fn sim_epoll_rcv() -> Event {
Event {
tag: EventTag::Message,
tcp: tcp_save(tcp),
any_message: match message {
AnyMessage::None => AnyMessageTag::None,
AnyMessage::InternalConnect => AnyMessageTag::InternalConnect,
AnyMessage::Just32(_) => AnyMessageTag::Just32,
AnyMessage::ReplCell(_) => AnyMessageTag::ReplCell,
AnyMessage::Bytes(_) => AnyMessageTag::Bytes,
},
any_message: anymessage_tag(&message),
}
}
NodeEvent::WakeTimeout(_) => {
// can't happen
unreachable!()
}
}
}
#[no_mangle]
pub extern "C" fn sim_now() -> i64 {
os().now() as i64
}

View File

@@ -1,10 +1,26 @@
use safekeeper::simlib::proto::{AnyMessage, ReplCell};
use std::cell::RefCell;
pub(crate) fn anymessage_tag(msg: &AnyMessage) -> AnyMessageTag {
match msg {
AnyMessage::None => AnyMessageTag::None,
AnyMessage::InternalConnect => AnyMessageTag::InternalConnect,
AnyMessage::Just32(_) => AnyMessageTag::Just32,
AnyMessage::ReplCell(_) => AnyMessageTag::ReplCell,
AnyMessage::Bytes(_) => AnyMessageTag::Bytes,
}
}
thread_local! {
pub static MESSAGE_BUF: RefCell<AnyMessage> = RefCell::new(AnyMessage::None);
}
#[no_mangle]
/// Get tag of the current message.
pub extern "C" fn sim_msg_tag() -> AnyMessageTag {
MESSAGE_BUF.with(|cell| anymessage_tag(&*cell.borrow()))
}
#[no_mangle]
/// Read AnyMessage::Just32 message.
pub extern "C" fn sim_msg_get_just_u32(val: &mut u32) {
@@ -28,6 +44,34 @@ pub extern "C" fn sim_msg_set_repl_cell(value: u32, client_id: u32, seqno: u32)
});
}
#[no_mangle]
/// Write AnyMessage::Bytes message.
pub extern "C" fn sim_msg_set_bytes(bytes: *const u8, len: usize) {
MESSAGE_BUF.with(|cell| {
// copy bytes to a Rust Vec
let mut v = Vec::with_capacity(len);
unsafe {
v.set_len(len);
std::ptr::copy_nonoverlapping(bytes, v.as_mut_ptr(), len);
}
*cell.borrow_mut() = AnyMessage::Bytes(v.into());
});
}
#[no_mangle]
/// Read AnyMessage::Bytes message.
pub extern "C" fn sim_msg_get_bytes(len: *mut usize) -> *const u8 {
MESSAGE_BUF.with(|cell| match &*cell.borrow() {
AnyMessage::Bytes(v) => {
unsafe {
*len = v.len();
v.as_ptr()
}
}
_ => panic!("expected Bytes message"),
})
}
#[repr(C)]
/// Event returned by epoll_recv.
pub struct Event {
@@ -39,6 +83,7 @@ pub struct Event {
#[repr(u8)]
/// List of all possible NodeEvent.
pub enum EventTag {
Timeout,
Accept,
Closed,
Message,

View File

@@ -2,12 +2,13 @@
//! Gets messages from the network, passes them down to consensus module and
//! sends replies back.
use std::collections::HashMap;
use std::{collections::HashMap, path::PathBuf, time::Duration};
use bytes::BytesMut;
use hyper::Uri;
use log::info;
use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper}, timeline::{TimelineError}, SafeKeeperConf};
use utils::{id::TenantTimelineId, lsn::Lsn};
use utils::{id::{TenantTimelineId, NodeId}, lsn::Lsn};
use anyhow::{Result, bail};
use crate::simtest::storage::{InMemoryState, DummyWalStore};
@@ -28,6 +29,21 @@ struct SharedState {
pub fn run_server(os: NodeOs) -> Result<()> {
println!("started server {}", os.id());
let conf = SafeKeeperConf {
workdir: PathBuf::from("."),
my_id: NodeId(os.id() as u64),
listen_pg_addr: String::new(),
listen_http_addr: String::new(),
no_sync: false,
broker_endpoint: "/".parse::<Uri>().unwrap(),
broker_keepalive_interval: Duration::from_secs(0),
heartbeat_timeout: Duration::from_secs(0),
remote_storage: None,
max_offloader_lag_bytes: 0,
backup_runtime_threads: None,
wal_backup_enabled: false,
auth: None,
};
let mut conns: HashMap<i64, ConnState> = HashMap::new();
@@ -46,7 +62,7 @@ pub fn run_server(os: NodeOs) -> Result<()> {
NodeEvent::Accept(tcp) => {
conns.insert(tcp.id(), ConnState {
tcp,
conf: SafeKeeperConf::dummy(),
conf: conf.clone(),
greeting: false,
ttid: TenantTimelineId::empty(),
tli: None,
@@ -66,6 +82,7 @@ pub fn run_server(os: NodeOs) -> Result<()> {
}
}
NodeEvent::Closed(_) => {}
NodeEvent::WakeTimeout(_) => {}
}
// TODO: make simulator support multiple events per tick
@@ -86,6 +103,7 @@ impl ConnState {
fn process_any(&mut self, any: AnyMessage) -> Result<()> {
if let AnyMessage::Bytes(copy_data) = any {
let msg = ProposerAcceptorMessage::parse(copy_data)?;
println!("got msg: {:?}", msg);
return self.process(msg);
} else {
bail!("unexpected message, expected AnyMessage::Bytes");
@@ -157,8 +175,6 @@ impl ConnState {
);
}
}
return Ok(());
}
match msg {

View File

@@ -1,11 +1,14 @@
use std::sync::Arc;
use std::{sync::Arc, ffi::CString};
use safekeeper::simlib::{network::{Delay, NetworkOptions}, world::World};
use utils::{id::TenantTimelineId, logging};
use crate::{simtest::safekeeper::run_server, c_context};
use crate::{simtest::safekeeper::run_server, c_context, bindings::{WalProposerRust, wal_acceptors_list, wal_acceptor_reconnect_timeout, wal_acceptor_connection_timeout, neon_tenant_walproposer, neon_timeline_walproposer}};
#[test]
fn run_walproposer_safekeeper_test() {
logging::init(logging::LogFormat::Plain).unwrap();
let delay = Delay {
min: 1,
max: 5,
@@ -26,10 +29,24 @@ fn run_walproposer_safekeeper_test() {
let client_node = world.new_node();
let servers = [world.new_node(), world.new_node(), world.new_node()];
// let server_ids = [servers[0].id, servers[1].id, servers[2].id];
let server_ids = [servers[0].id, servers[1].id, servers[2].id];
let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(",");
println!("server ids: {:?}", safekeepers_guc);
let ttid = TenantTimelineId::generate();
// start the client thread
client_node.launch(move |_| {
let list = CString::new(safekeepers_guc).unwrap();
unsafe {
wal_acceptors_list = list.into_raw();
wal_acceptor_reconnect_timeout = 1000;
wal_acceptor_connection_timeout = 5000;
neon_tenant_walproposer = CString::new(ttid.tenant_id.to_string()).unwrap().into_raw();
neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()).unwrap().into_raw();
WalProposerRust();
}
// TODO: run sync-safekeepers
});
@@ -44,7 +61,7 @@ fn run_walproposer_safekeeper_test() {
}
world.await_all();
let time_limit = 1_000_000;
let time_limit = 1_000_0;
while world.step() && world.now() < time_limit {}

View File

@@ -2,8 +2,10 @@
#include "rust_bindings.h"
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include "postgres.h"
#include "utils/memutils.h"
#include "utils/guc.h"
// From src/backend/main/main.c
const char *progname = "fakepostgres";
@@ -30,7 +32,7 @@ void RunClientC(uint32_t serverId) {
sim_msg_set_repl_cell(delivered+1, clientId, delivered);
sim_tcp_send(tcp);
Event event = sim_epoll_rcv();
Event event = sim_epoll_rcv(-1);
switch (event.tag)
{
case Closed:
@@ -63,6 +65,19 @@ void MyContextInit() {
if (!initializedMemoryContext) {
initializedMemoryContext = true;
MemoryContextInit();
setenv("PGDATA", "/home/admin/simulator/libs/walproposer/pgdata", 1);
/*
* Set default values for command-line options.
*/
InitializeGUCOptions();
/* Acquire configuration parameters */
if (!SelectConfigFiles(NULL, progname))
exit(1);
log_min_messages = DEBUG5;
}
pthread_mutex_unlock(&lock);
}

View File

@@ -15,14 +15,11 @@ OBJS = \
PG_CPPFLAGS = -I$(libpq_srcdir)
PG_LIBS = $(libpq)
PG_LIBS_INTERNAL = $(libpq)
SHLIB_LINK_INTERNAL = $(libpq)
EXTENSION = neon
DATA = neon--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
# PROGRAM = boop
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

1
pgxn/neon/rust_bindings.h Symbolic link
View File

@@ -0,0 +1 @@
../../libs/walproposer/rust_bindings.h

View File

@@ -83,6 +83,12 @@ char *neon_tenant_walproposer = NULL;
char *neon_safekeeper_token_walproposer = NULL;
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
#define SIMLIB
#ifdef SIMLIB
#include "rust_bindings.h"
#define GetCurrentTimestamp() ((TimestampTz) sim_now())
#endif
static int n_safekeepers = 0;
static int quorum = 0;
@@ -319,6 +325,7 @@ nwp_shmem_startup_hook(void)
void WalProposerRust()
{
elog(LOG, "WalProposerRust");
WalProposerSync(0, NULL);
}
/*
@@ -382,6 +389,36 @@ WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos)
BroadcastAppendRequest();
}
#ifdef SIMLIB
int
SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events)
{
Event event = sim_epoll_rcv(timeout);
if (event.tag == Closed) {
// TODO: shutdown connection?
// elog(LOG, "connection closed");
// ShutdownConnection(sk);
return 0;
} else if (event.tag == Message) {
Assert(event.any_message == Bytes);
for (int i = 0; i < n_safekeepers; i++) {
if (safekeeper[i].conn && ((int64_t) walprop_socket(safekeeper[i].conn)) == event.tcp) {
*occurred_events = (WaitEvent) {
.events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
};
*sk = &safekeeper[i];
return 1;
}
}
elog(FATAL, "unknown tcp connection");
} else if (event.tag == Timeout) {
return 0;
} else {
Assert(false);
}
}
#endif
/*
* Advance the WAL proposer state machine, waiting each time for events to occur.
* Will exit only when latch is set, i.e. new WAL should be pushed from walsender
@@ -397,16 +434,25 @@ WalProposerPoll(void)
WaitEvent event;
TimestampTz now = GetCurrentTimestamp();
#ifndef SIMLIB
rc = WaitEventSetWait(waitEvents, TimeToReconnect(now),
&event, 1, WAIT_EVENT_WAL_SENDER_MAIN);
sk = (Safekeeper *) event.user_data;
#else
rc = SimWaitEventSetWait(&sk, TimeToReconnect(now), &event);
#endif
/*
* If the event contains something that one of our safekeeper states
* was waiting for, we'll advance its state.
*/
if (rc != 0 && (event.events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)))
{
AdvancePollState(sk, event.events);
#ifdef SIMLIB
// TODO: assert that code consumed incoming message
#endif
}
/*
* If the timeout expired, attempt to reconnect to any safekeepers
@@ -421,7 +467,9 @@ WalProposerPoll(void)
*/
if (rc != 0 && (event.events & WL_LATCH_SET))
{
#ifndef SIMLIB
ResetLatch(MyLatch);
#endif
break;
}
@@ -491,9 +539,11 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
char *sep;
char *port;
#ifndef SIMLIB
load_file("libpqwalreceiver", false);
if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
#endif
for (host = wal_acceptors_list; host != NULL && *host != '\0'; host = sep)
{
@@ -597,6 +647,8 @@ WalProposerLoop(void)
WalProposerPoll();
}
#ifndef SIMLIB
/* Initializes the internal event set, provided that it is currently null */
static void
InitEventSet(void)
@@ -668,6 +720,26 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove)
}
}
#else
static void
InitEventSet(void)
{
elog(DEBUG5, "InitEventSet");
}
static void
UpdateEventSet(Safekeeper *sk, uint32 events)
{
elog(DEBUG5, "UpdateEventSet");
}
static void
HackyRemoveWalProposerEvent(Safekeeper *to_remove)
{
elog(DEBUG5, "HackyRemoveWalProposerEvent");
}
#endif
/* Shuts down and cleans up the connection for a safekeeper. Sets its state to SS_OFFLINE */
static void
ShutdownConnection(Safekeeper *sk)
@@ -760,8 +832,13 @@ ResetConnection(Safekeeper *sk)
sk->state = SS_CONNECTING_WRITE;
sk->latestMsgReceivedAt = GetCurrentTimestamp();
#ifndef SIMLIB
sock = walprop_socket(sk->conn);
sk->eventPos = AddWaitEventToSet(waitEvents, WL_SOCKET_WRITEABLE, sock, NULL, sk);
#else
HandleConnectionEvent(sk);
RecvStartWALPushResult(sk);
#endif
return;
}
@@ -950,12 +1027,14 @@ HandleConnectionEvent(Safekeeper *sk)
return;
}
#ifndef SIMLIB
/*
* Because PQconnectPoll can change the socket, we have to un-register the
* old event and re-register an event on the new socket.
*/
HackyRemoveWalProposerEvent(sk);
sk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_socket(sk->conn), NULL, sk);
#endif
/* If we successfully connected, send START_WAL_PUSH query */
if (result == WP_CONN_POLLING_OK)

View File

@@ -10,6 +10,8 @@
#include "utils/uuid.h"
#include "replication/walreceiver.h"
#define SIMLIB
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
@@ -374,8 +376,11 @@ typedef struct Safekeeper
XLogRecPtr streamingAt; /* current streaming position */
AppendRequestHeader appendRequest; /* request for sending to safekeeper */
#ifndef SIMLIB
int eventPos; /* position in wait event set. Equal to -1 if*
* no event */
#endif
SafekeeperState state; /* safekeeper state machine state */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
AcceptorGreeting greetResponse; /* acceptor greeting */

View File

@@ -26,6 +26,10 @@ impl NodeOs {
self.internal.id
}
pub fn now(&self) -> u64 {
self.world.now()
}
/// Returns a writable pipe. All incoming messages should be polled
/// with [`network_epoll`]. Always successful.
pub fn open_tcp(&self, dst: NodeId) -> TCP {
@@ -37,6 +41,59 @@ impl NodeOs {
self.internal.network_chan()
}
/// Returns next event from the epoll channel with timeout.
/// Returns `None` if timeout is reached.
/// -1 wait forever.
/// 0 - poll, return immediately.
/// >0 - wait for timeout milliseconds.
pub fn epoll_recv(&self, timeout: i64) -> Option<NodeEvent> {
let epoll = self.epoll();
let ready_event = loop {
let event = epoll.try_recv();
if let Some(NodeEvent::WakeTimeout(_)) = event {
continue;
}
break event;
};
if let Some(event) = ready_event {
// return event if it's ready
return Some(event);
}
if timeout == 0 {
// poll, return immediately
return None;
}
// or wait for timeout
let rand_nonce = self.random(u64::MAX);
if timeout > 0 {
self.world
.schedule(
timeout as u64,
SendMessageEvent::new(
epoll.clone(),
NodeEvent::WakeTimeout(rand_nonce),
),
);
}
loop {
match epoll.recv() {
NodeEvent::WakeTimeout(nonce) if nonce == rand_nonce => {
return None;
}
NodeEvent::WakeTimeout(_) => {}
event => {
return Some(event);
}
}
}
}
/// Sleep for a given number of milliseconds.
/// Currently matches the global virtual time, TODO may be good to
/// introduce a separate clocks for each node.

View File

@@ -358,5 +358,6 @@ pub enum NodeEvent {
Accept(TCP),
Closed(TCP),
Message((AnyMessage, TCP)),
WakeTimeout(u64),
// TODO: close?
}