diff --git a/Cargo.lock b/Cargo.lock index 3bf73ee964..67a5533220 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4649,6 +4649,7 @@ dependencies = [ "crc32c", "env_logger", "hex", + "hyper", "log", "memoffset 0.8.0", "once_cell", diff --git a/libs/walproposer/Cargo.toml b/libs/walproposer/Cargo.toml index 9d675bf339..688932e85d 100644 --- a/libs/walproposer/Cargo.toml +++ b/libs/walproposer/Cargo.toml @@ -20,6 +20,7 @@ serde.workspace = true utils.workspace = true safekeeper.workspace = true postgres_ffi.workspace = true +hyper.workspace = true workspace_hack.workspace = true diff --git a/libs/walproposer/bindgen_deps.h b/libs/walproposer/bindgen_deps.h index 2023d4f8d0..ab9365c8e8 100644 --- a/libs/walproposer/bindgen_deps.h +++ b/libs/walproposer/bindgen_deps.h @@ -4,15 +4,18 @@ * from. If you need to expose a new struct to Rust code, add the * header here, and whitelist the struct in the build.rs file. */ -// #include "c.h" -// #include "walproposer.h" +#include "c.h" +#include "walproposer.h" #include #include #include #include +// Calc a sum of two numbers. Used to test Rust->C function calls. int TestFunc(int a, int b); + +// Run a client for simple simlib test. void RunClientC(uint32_t serverId); void WalProposerRust(); diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs index 36189659a5..627cbd3bed 100644 --- a/libs/walproposer/build.rs +++ b/libs/walproposer/build.rs @@ -1,4 +1,5 @@ -use std::{env, path::PathBuf}; +use std::{env, path::PathBuf, process::Command}; +use anyhow::{anyhow, Context}; use bindgen::CargoCallbacks; extern crate bindgen; @@ -33,12 +34,15 @@ fn main() -> anyhow::Result<()> { // disable fPIE println!("cargo:rustc-link-arg=-no-pie"); - if !std::process::Command::new("./build.sh") + // print output of build.sh + let output = std::process::Command::new("./build.sh") .output() - .expect("could not spawn `clang`") - .status - .success() - { + .expect("could not spawn `clang`"); + + println!("stdout: {}", String::from_utf8(output.stdout).unwrap()); + println!("stderr: {}", String::from_utf8(output.stderr).unwrap()); + + if !output.status.success() { // Panic if the command was not successful. panic!("could not compile object file"); } @@ -46,49 +50,49 @@ fn main() -> anyhow::Result<()> { // // Finding the location of C headers for the Postgres server: // // - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `/pg_install` // // - if there's a `bin/pg_config` file use it for getting include server, otherwise use `/pg_install/{PG_MAJORVERSION}/include/postgresql/server` - // let pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") { - // postgres_install_dir.into() - // } else { - // PathBuf::from("pg_install") - // }; + let pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") { + postgres_install_dir.into() + } else { + PathBuf::from("pg_install") + }; - // let pg_version = "v15"; - // let mut pg_install_dir_versioned = pg_install_dir.join(pg_version); - // if pg_install_dir_versioned.is_relative() { - // let cwd = env::current_dir().context("Failed to get current_dir")?; - // pg_install_dir_versioned = cwd.join("..").join("..").join(pg_install_dir_versioned); - // } + let pg_version = "v15"; + let mut pg_install_dir_versioned = pg_install_dir.join(pg_version); + if pg_install_dir_versioned.is_relative() { + let cwd = env::current_dir().context("Failed to get current_dir")?; + pg_install_dir_versioned = cwd.join("..").join("..").join(pg_install_dir_versioned); + } - // let pg_config_bin = pg_install_dir_versioned - // .join(pg_version) - // .join("bin") - // .join("pg_config"); - // let inc_server_path: String = if pg_config_bin.exists() { - // let output = Command::new(pg_config_bin) - // .arg("--includedir-server") - // .output() - // .context("failed to execute `pg_config --includedir-server`")?; + let pg_config_bin = pg_install_dir_versioned + .join(pg_version) + .join("bin") + .join("pg_config"); + let inc_server_path: String = if pg_config_bin.exists() { + let output = Command::new(pg_config_bin) + .arg("--includedir-server") + .output() + .context("failed to execute `pg_config --includedir-server`")?; - // if !output.status.success() { - // panic!("`pg_config --includedir-server` failed") - // } + if !output.status.success() { + panic!("`pg_config --includedir-server` failed") + } - // String::from_utf8(output.stdout) - // .context("pg_config output is not UTF-8")? - // .trim_end() - // .into() - // } else { - // let server_path = pg_install_dir_versioned - // .join("include") - // .join("postgresql") - // .join("server") - // .into_os_string(); - // server_path - // .into_string() - // .map_err(|s| anyhow!("Bad postgres server path {s:?}"))? - // }; + String::from_utf8(output.stdout) + .context("pg_config output is not UTF-8")? + .trim_end() + .into() + } else { + let server_path = pg_install_dir_versioned + .join("include") + .join("postgresql") + .join("server") + .into_os_string(); + server_path + .into_string() + .map_err(|s| anyhow!("Bad postgres server path {s:?}"))? + }; - // let inc_pgxn_path = "/Users/arthur/zen/zenith/pgxn/neon"; + let inc_pgxn_path = "/home/admin/simulator/pgxn/neon"; // The bindgen::Builder is the main entry point // to bindgen, and lets you build up options for @@ -104,8 +108,14 @@ fn main() -> anyhow::Result<()> { .allowlist_function("RunClientC") .allowlist_function("WalProposerRust") .allowlist_function("MyContextInit") - // .clang_arg(format!("-I{inc_server_path}")) - // .clang_arg(format!("-I{inc_pgxn_path}")) + .allowlist_var("wal_acceptors_list") + .allowlist_var("wal_acceptor_reconnect_timeout") + .allowlist_var("wal_acceptor_connection_timeout") + .allowlist_var("am_wal_proposer") + .allowlist_var("neon_timeline_walproposer") + .allowlist_var("neon_tenant_walproposer") + .clang_arg(format!("-I{inc_server_path}")) + .clang_arg(format!("-I{inc_pgxn_path}")) // Finish the builder and generate the bindings. .generate() // Unwrap the Result and panic on failure. diff --git a/libs/walproposer/build.sh b/libs/walproposer/build.sh index f440faa337..08b2f29583 100755 --- a/libs/walproposer/build.sh +++ b/libs/walproposer/build.sh @@ -1,9 +1,11 @@ #!/bin/bash set -e +cd /home/admin/simulator/libs/walproposer + # TODO: rewrite to Makefile -make -C ../../ neon-pg-ext-walproposer -s +make -C ../.. neon-pg-ext-walproposer # make -C ../../pg_install/build/v15/src/backend postgres-lib -s cp ../../pg_install/build/v15/src/backend/libpostgres.a . cp ../../pg_install/build/v15/src/common/libpgcommon_srv.a . diff --git a/libs/walproposer/libpqwalproposer.c b/libs/walproposer/libpqwalproposer.c index 4c0f0f49fa..0c61c53573 100644 --- a/libs/walproposer/libpqwalproposer.c +++ b/libs/walproposer/libpqwalproposer.c @@ -1,10 +1,12 @@ #include "postgres.h" #include "neon.h" #include "walproposer.h" +#include "rust_bindings.h" /* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */ struct WalProposerConn { + int64_t tcp; }; /* Helper function */ @@ -26,43 +28,52 @@ walprop_error_message(WalProposerConn *conn) WalProposerConnStatusType walprop_status(WalProposerConn *conn) { - elog(INFO, "not implemented"); + elog(INFO, "not implemented: walprop_status"); return WP_CONNECTION_OK; } WalProposerConn * walprop_connect_start(char *conninfo) { - elog(INFO, "not implemented"); - return NULL; + WalProposerConn *conn; + + elog(INFO, "walprop_connect_start: %s", conninfo); + + const char *connstr_prefix = "host=node port="; + Assert(strncmp(conninfo, connstr_prefix, strlen(connstr_prefix)) == 0); + + int nodeId = atoi(conninfo + strlen(connstr_prefix)); + + conn = palloc(sizeof(WalProposerConn)); + conn->tcp = sim_open_tcp(nodeId); + return conn; } WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn) { - elog(INFO, "not implemented"); + elog(INFO, "not implemented: walprop_connect_poll"); return WP_CONN_POLLING_OK; } bool walprop_send_query(WalProposerConn *conn, char *query) { - elog(INFO, "not implemented"); - return false; + elog(INFO, "not implemented: walprop_send_query"); + return true; } WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn) { - elog(INFO, "not implemented"); + elog(INFO, "not implemented: walprop_get_query_result"); return WP_EXEC_SUCCESS_COPYBOTH; } pgsocket walprop_socket(WalProposerConn *conn) { - elog(INFO, "not implemented"); - return 0; + return (pgsocket) conn->tcp; } int @@ -87,8 +98,15 @@ walprop_finish(WalProposerConn *conn) PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount) { - elog(INFO, "not implemented"); - return PG_ASYNC_READ_FAIL; + uintptr_t len; + char *msg; + + msg = sim_msg_get_bytes(&len); + *buf = msg; + *amount = len; + elog(INFO, "walprop_async_read: %d", len); + + return PG_ASYNC_READ_SUCCESS; } PGAsyncWriteResult @@ -105,6 +123,8 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) { - elog(INFO, "not implemented"); - return false; + elog(INFO, "not implemented: walprop_blocking_write"); + sim_msg_set_bytes(buf, size); + sim_tcp_send(conn->tcp); + return true; } diff --git a/libs/walproposer/pgdata/postgresql.conf b/libs/walproposer/pgdata/postgresql.conf new file mode 100644 index 0000000000..36a62d5982 --- /dev/null +++ b/libs/walproposer/pgdata/postgresql.conf @@ -0,0 +1,11 @@ +wal_log_hints=off +hot_standby=on +fsync=off +wal_level=replica +restart_after_crash=off +shared_preload_libraries=neon +neon.pageserver_connstring='' +neon.tenant_id=cc6e67313d57283bad411600fbf5c142 +neon.timeline_id=de6fa815c1e45aa61491c3d34c4eb33e +synchronous_standby_names=walproposer +neon.safekeepers='node:1,node:2,node:3' diff --git a/libs/walproposer/rust_bindings.h b/libs/walproposer/rust_bindings.h index e258bdebe7..d1014ffbae 100644 --- a/libs/walproposer/rust_bindings.h +++ b/libs/walproposer/rust_bindings.h @@ -19,6 +19,7 @@ typedef uint8_t AnyMessageTag; * List of all possible NodeEvent. */ enum EventTag { + Timeout, Accept, Closed, Message, @@ -52,7 +53,14 @@ int64_t sim_open_tcp(uint32_t dst); */ void sim_tcp_send(int64_t tcp); -struct Event sim_epoll_rcv(void); +struct Event sim_epoll_rcv(int64_t timeout); + +int64_t sim_now(void); + +/** + * Get tag of the current message. + */ +AnyMessageTag sim_msg_tag(void); /** * Read AnyMessage::Just32 message. @@ -63,3 +71,13 @@ void sim_msg_get_just_u32(uint32_t *val); * Write AnyMessage::ReplCell message. */ void sim_msg_set_repl_cell(uint32_t value, uint32_t client_id, uint32_t seqno); + +/** + * Write AnyMessage::Bytes message. + */ +void sim_msg_set_bytes(const uint8_t *bytes, uintptr_t len); + +/** + * Read AnyMessage::Bytes message. + */ +const uint8_t *sim_msg_get_bytes(uintptr_t *len); diff --git a/libs/walproposer/src/sim.rs b/libs/walproposer/src/sim.rs index 1cbf5e6474..5ef28b3e7d 100644 --- a/libs/walproposer/src/sim.rs +++ b/libs/walproposer/src/sim.rs @@ -1,8 +1,8 @@ use std::{cell::RefCell, collections::HashMap}; -use safekeeper::simlib::{network::TCP, node_os::NodeOs, proto::AnyMessage, world::NodeEvent}; +use safekeeper::simlib::{network::TCP, node_os::NodeOs, world::NodeEvent}; -use crate::sim_proto::{AnyMessageTag, Event, EventTag, MESSAGE_BUF}; +use crate::sim_proto::{AnyMessageTag, Event, EventTag, MESSAGE_BUF, anymessage_tag}; thread_local! { static CURRENT_NODE_OS: RefCell> = RefCell::new(None); @@ -69,8 +69,18 @@ pub extern "C" fn sim_tcp_send(tcp: i64) { } #[no_mangle] -pub extern "C" fn sim_epoll_rcv() -> Event { - let event = os().epoll().recv(); +pub extern "C" fn sim_epoll_rcv(timeout: i64) -> Event { + let event = os().epoll_recv(timeout); + let event = if let Some(event) = event { + event + } else { + return Event { + tag: EventTag::Timeout, + tcp: 0, + any_message: AnyMessageTag::None, + }; + }; + match event { NodeEvent::Accept(tcp) => Event { tag: EventTag::Accept, @@ -91,14 +101,17 @@ pub extern "C" fn sim_epoll_rcv() -> Event { Event { tag: EventTag::Message, tcp: tcp_save(tcp), - any_message: match message { - AnyMessage::None => AnyMessageTag::None, - AnyMessage::InternalConnect => AnyMessageTag::InternalConnect, - AnyMessage::Just32(_) => AnyMessageTag::Just32, - AnyMessage::ReplCell(_) => AnyMessageTag::ReplCell, - AnyMessage::Bytes(_) => AnyMessageTag::Bytes, - }, + any_message: anymessage_tag(&message), } } + NodeEvent::WakeTimeout(_) => { + // can't happen + unreachable!() + } } } + +#[no_mangle] +pub extern "C" fn sim_now() -> i64 { + os().now() as i64 +} diff --git a/libs/walproposer/src/sim_proto.rs b/libs/walproposer/src/sim_proto.rs index dcd9b43b1d..9fccfd9b3b 100644 --- a/libs/walproposer/src/sim_proto.rs +++ b/libs/walproposer/src/sim_proto.rs @@ -1,10 +1,26 @@ use safekeeper::simlib::proto::{AnyMessage, ReplCell}; use std::cell::RefCell; +pub(crate) fn anymessage_tag(msg: &AnyMessage) -> AnyMessageTag { + match msg { + AnyMessage::None => AnyMessageTag::None, + AnyMessage::InternalConnect => AnyMessageTag::InternalConnect, + AnyMessage::Just32(_) => AnyMessageTag::Just32, + AnyMessage::ReplCell(_) => AnyMessageTag::ReplCell, + AnyMessage::Bytes(_) => AnyMessageTag::Bytes, + } +} + thread_local! { pub static MESSAGE_BUF: RefCell = RefCell::new(AnyMessage::None); } +#[no_mangle] +/// Get tag of the current message. +pub extern "C" fn sim_msg_tag() -> AnyMessageTag { + MESSAGE_BUF.with(|cell| anymessage_tag(&*cell.borrow())) +} + #[no_mangle] /// Read AnyMessage::Just32 message. pub extern "C" fn sim_msg_get_just_u32(val: &mut u32) { @@ -28,6 +44,34 @@ pub extern "C" fn sim_msg_set_repl_cell(value: u32, client_id: u32, seqno: u32) }); } +#[no_mangle] +/// Write AnyMessage::Bytes message. +pub extern "C" fn sim_msg_set_bytes(bytes: *const u8, len: usize) { + MESSAGE_BUF.with(|cell| { + // copy bytes to a Rust Vec + let mut v = Vec::with_capacity(len); + unsafe { + v.set_len(len); + std::ptr::copy_nonoverlapping(bytes, v.as_mut_ptr(), len); + } + *cell.borrow_mut() = AnyMessage::Bytes(v.into()); + }); +} + +#[no_mangle] +/// Read AnyMessage::Bytes message. +pub extern "C" fn sim_msg_get_bytes(len: *mut usize) -> *const u8 { + MESSAGE_BUF.with(|cell| match &*cell.borrow() { + AnyMessage::Bytes(v) => { + unsafe { + *len = v.len(); + v.as_ptr() + } + } + _ => panic!("expected Bytes message"), + }) +} + #[repr(C)] /// Event returned by epoll_recv. pub struct Event { @@ -39,6 +83,7 @@ pub struct Event { #[repr(u8)] /// List of all possible NodeEvent. pub enum EventTag { + Timeout, Accept, Closed, Message, diff --git a/libs/walproposer/src/simtest/safekeeper.rs b/libs/walproposer/src/simtest/safekeeper.rs index ac095248ea..54a11a61fd 100644 --- a/libs/walproposer/src/simtest/safekeeper.rs +++ b/libs/walproposer/src/simtest/safekeeper.rs @@ -2,12 +2,13 @@ //! Gets messages from the network, passes them down to consensus module and //! sends replies back. -use std::collections::HashMap; +use std::{collections::HashMap, path::PathBuf, time::Duration}; use bytes::BytesMut; +use hyper::Uri; use log::info; use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper}, timeline::{TimelineError}, SafeKeeperConf}; -use utils::{id::TenantTimelineId, lsn::Lsn}; +use utils::{id::{TenantTimelineId, NodeId}, lsn::Lsn}; use anyhow::{Result, bail}; use crate::simtest::storage::{InMemoryState, DummyWalStore}; @@ -28,6 +29,21 @@ struct SharedState { pub fn run_server(os: NodeOs) -> Result<()> { println!("started server {}", os.id()); + let conf = SafeKeeperConf { + workdir: PathBuf::from("."), + my_id: NodeId(os.id() as u64), + listen_pg_addr: String::new(), + listen_http_addr: String::new(), + no_sync: false, + broker_endpoint: "/".parse::().unwrap(), + broker_keepalive_interval: Duration::from_secs(0), + heartbeat_timeout: Duration::from_secs(0), + remote_storage: None, + max_offloader_lag_bytes: 0, + backup_runtime_threads: None, + wal_backup_enabled: false, + auth: None, + }; let mut conns: HashMap = HashMap::new(); @@ -46,7 +62,7 @@ pub fn run_server(os: NodeOs) -> Result<()> { NodeEvent::Accept(tcp) => { conns.insert(tcp.id(), ConnState { tcp, - conf: SafeKeeperConf::dummy(), + conf: conf.clone(), greeting: false, ttid: TenantTimelineId::empty(), tli: None, @@ -66,6 +82,7 @@ pub fn run_server(os: NodeOs) -> Result<()> { } } NodeEvent::Closed(_) => {} + NodeEvent::WakeTimeout(_) => {} } // TODO: make simulator support multiple events per tick @@ -86,6 +103,7 @@ impl ConnState { fn process_any(&mut self, any: AnyMessage) -> Result<()> { if let AnyMessage::Bytes(copy_data) = any { let msg = ProposerAcceptorMessage::parse(copy_data)?; + println!("got msg: {:?}", msg); return self.process(msg); } else { bail!("unexpected message, expected AnyMessage::Bytes"); @@ -157,8 +175,6 @@ impl ConnState { ); } } - - return Ok(()); } match msg { diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index e25c8de141..5506493997 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -1,11 +1,14 @@ -use std::sync::Arc; +use std::{sync::Arc, ffi::CString}; use safekeeper::simlib::{network::{Delay, NetworkOptions}, world::World}; +use utils::{id::TenantTimelineId, logging}; -use crate::{simtest::safekeeper::run_server, c_context}; +use crate::{simtest::safekeeper::run_server, c_context, bindings::{WalProposerRust, wal_acceptors_list, wal_acceptor_reconnect_timeout, wal_acceptor_connection_timeout, neon_tenant_walproposer, neon_timeline_walproposer}}; #[test] fn run_walproposer_safekeeper_test() { + logging::init(logging::LogFormat::Plain).unwrap(); + let delay = Delay { min: 1, max: 5, @@ -26,10 +29,24 @@ fn run_walproposer_safekeeper_test() { let client_node = world.new_node(); let servers = [world.new_node(), world.new_node(), world.new_node()]; - // let server_ids = [servers[0].id, servers[1].id, servers[2].id]; + let server_ids = [servers[0].id, servers[1].id, servers[2].id]; + let safekeepers_guc = server_ids.map(|id| format!("node:{}", id)).join(","); + + println!("server ids: {:?}", safekeepers_guc); + let ttid = TenantTimelineId::generate(); // start the client thread client_node.launch(move |_| { + let list = CString::new(safekeepers_guc).unwrap(); + + unsafe { + wal_acceptors_list = list.into_raw(); + wal_acceptor_reconnect_timeout = 1000; + wal_acceptor_connection_timeout = 5000; + neon_tenant_walproposer = CString::new(ttid.tenant_id.to_string()).unwrap().into_raw(); + neon_timeline_walproposer = CString::new(ttid.timeline_id.to_string()).unwrap().into_raw(); + WalProposerRust(); + } // TODO: run sync-safekeepers }); @@ -44,7 +61,7 @@ fn run_walproposer_safekeeper_test() { } world.await_all(); - let time_limit = 1_000_000; + let time_limit = 1_000_0; while world.step() && world.now() < time_limit {} diff --git a/libs/walproposer/test.c b/libs/walproposer/test.c index 680743facd..eea11119bf 100644 --- a/libs/walproposer/test.c +++ b/libs/walproposer/test.c @@ -2,8 +2,10 @@ #include "rust_bindings.h" #include #include +#include #include "postgres.h" #include "utils/memutils.h" +#include "utils/guc.h" // From src/backend/main/main.c const char *progname = "fakepostgres"; @@ -30,7 +32,7 @@ void RunClientC(uint32_t serverId) { sim_msg_set_repl_cell(delivered+1, clientId, delivered); sim_tcp_send(tcp); - Event event = sim_epoll_rcv(); + Event event = sim_epoll_rcv(-1); switch (event.tag) { case Closed: @@ -63,6 +65,19 @@ void MyContextInit() { if (!initializedMemoryContext) { initializedMemoryContext = true; MemoryContextInit(); + + setenv("PGDATA", "/home/admin/simulator/libs/walproposer/pgdata", 1); + + /* + * Set default values for command-line options. + */ + InitializeGUCOptions(); + + /* Acquire configuration parameters */ + if (!SelectConfigFiles(NULL, progname)) + exit(1); + + log_min_messages = DEBUG5; } pthread_mutex_unlock(&lock); } diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 61a96e3925..b8d3e35b81 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -15,14 +15,11 @@ OBJS = \ PG_CPPFLAGS = -I$(libpq_srcdir) PG_LIBS = $(libpq) -PG_LIBS_INTERNAL = $(libpq) -SHLIB_LINK_INTERNAL = $(libpq) + EXTENSION = neon DATA = neon--1.0.sql PGFILEDESC = "neon - cloud storage for PostgreSQL" -# PROGRAM = boop - PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) include $(PGXS) diff --git a/pgxn/neon/rust_bindings.h b/pgxn/neon/rust_bindings.h new file mode 120000 index 0000000000..3ccf941e67 --- /dev/null +++ b/pgxn/neon/rust_bindings.h @@ -0,0 +1 @@ +../../libs/walproposer/rust_bindings.h \ No newline at end of file diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 1b34e5291d..81a2b51a2a 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -83,6 +83,12 @@ char *neon_tenant_walproposer = NULL; char *neon_safekeeper_token_walproposer = NULL; #define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot" +#define SIMLIB + +#ifdef SIMLIB +#include "rust_bindings.h" +#define GetCurrentTimestamp() ((TimestampTz) sim_now()) +#endif static int n_safekeepers = 0; static int quorum = 0; @@ -319,6 +325,7 @@ nwp_shmem_startup_hook(void) void WalProposerRust() { elog(LOG, "WalProposerRust"); + WalProposerSync(0, NULL); } /* @@ -382,6 +389,36 @@ WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos) BroadcastAppendRequest(); } +#ifdef SIMLIB +int +SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events) +{ + Event event = sim_epoll_rcv(timeout); + if (event.tag == Closed) { + // TODO: shutdown connection? + // elog(LOG, "connection closed"); + // ShutdownConnection(sk); + return 0; + } else if (event.tag == Message) { + Assert(event.any_message == Bytes); + for (int i = 0; i < n_safekeepers; i++) { + if (safekeeper[i].conn && ((int64_t) walprop_socket(safekeeper[i].conn)) == event.tcp) { + *occurred_events = (WaitEvent) { + .events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE, + }; + *sk = &safekeeper[i]; + return 1; + } + } + elog(FATAL, "unknown tcp connection"); + } else if (event.tag == Timeout) { + return 0; + } else { + Assert(false); + } +} +#endif + /* * Advance the WAL proposer state machine, waiting each time for events to occur. * Will exit only when latch is set, i.e. new WAL should be pushed from walsender @@ -397,16 +434,25 @@ WalProposerPoll(void) WaitEvent event; TimestampTz now = GetCurrentTimestamp(); +#ifndef SIMLIB rc = WaitEventSetWait(waitEvents, TimeToReconnect(now), &event, 1, WAIT_EVENT_WAL_SENDER_MAIN); sk = (Safekeeper *) event.user_data; +#else + rc = SimWaitEventSetWait(&sk, TimeToReconnect(now), &event); +#endif /* * If the event contains something that one of our safekeeper states * was waiting for, we'll advance its state. */ if (rc != 0 && (event.events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))) + { AdvancePollState(sk, event.events); + #ifdef SIMLIB + // TODO: assert that code consumed incoming message + #endif + } /* * If the timeout expired, attempt to reconnect to any safekeepers @@ -421,7 +467,9 @@ WalProposerPoll(void) */ if (rc != 0 && (event.events & WL_LATCH_SET)) { + #ifndef SIMLIB ResetLatch(MyLatch); + #endif break; } @@ -491,9 +539,11 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) char *sep; char *port; +#ifndef SIMLIB load_file("libpqwalreceiver", false); if (WalReceiverFunctions == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); +#endif for (host = wal_acceptors_list; host != NULL && *host != '\0'; host = sep) { @@ -597,6 +647,8 @@ WalProposerLoop(void) WalProposerPoll(); } +#ifndef SIMLIB + /* Initializes the internal event set, provided that it is currently null */ static void InitEventSet(void) @@ -668,6 +720,26 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove) } } +#else +static void +InitEventSet(void) +{ + elog(DEBUG5, "InitEventSet"); +} + +static void +UpdateEventSet(Safekeeper *sk, uint32 events) +{ + elog(DEBUG5, "UpdateEventSet"); +} + +static void +HackyRemoveWalProposerEvent(Safekeeper *to_remove) +{ + elog(DEBUG5, "HackyRemoveWalProposerEvent"); +} +#endif + /* Shuts down and cleans up the connection for a safekeeper. Sets its state to SS_OFFLINE */ static void ShutdownConnection(Safekeeper *sk) @@ -760,8 +832,13 @@ ResetConnection(Safekeeper *sk) sk->state = SS_CONNECTING_WRITE; sk->latestMsgReceivedAt = GetCurrentTimestamp(); +#ifndef SIMLIB sock = walprop_socket(sk->conn); sk->eventPos = AddWaitEventToSet(waitEvents, WL_SOCKET_WRITEABLE, sock, NULL, sk); +#else + HandleConnectionEvent(sk); + RecvStartWALPushResult(sk); +#endif return; } @@ -950,12 +1027,14 @@ HandleConnectionEvent(Safekeeper *sk) return; } +#ifndef SIMLIB /* * Because PQconnectPoll can change the socket, we have to un-register the * old event and re-register an event on the new socket. */ HackyRemoveWalProposerEvent(sk); sk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_socket(sk->conn), NULL, sk); +#endif /* If we successfully connected, send START_WAL_PUSH query */ if (result == WP_CONN_POLLING_OK) diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 1abaab2cc6..35807a4c88 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -10,6 +10,8 @@ #include "utils/uuid.h" #include "replication/walreceiver.h" +#define SIMLIB + #define SK_MAGIC 0xCafeCeefu #define SK_PROTOCOL_VERSION 2 @@ -374,8 +376,11 @@ typedef struct Safekeeper XLogRecPtr streamingAt; /* current streaming position */ AppendRequestHeader appendRequest; /* request for sending to safekeeper */ +#ifndef SIMLIB int eventPos; /* position in wait event set. Equal to -1 if* * no event */ +#endif + SafekeeperState state; /* safekeeper state machine state */ TimestampTz latestMsgReceivedAt; /* when latest msg is received */ AcceptorGreeting greetResponse; /* acceptor greeting */ diff --git a/safekeeper/src/simlib/node_os.rs b/safekeeper/src/simlib/node_os.rs index b582aa0c7e..3ed4e61137 100644 --- a/safekeeper/src/simlib/node_os.rs +++ b/safekeeper/src/simlib/node_os.rs @@ -26,6 +26,10 @@ impl NodeOs { self.internal.id } + pub fn now(&self) -> u64 { + self.world.now() + } + /// Returns a writable pipe. All incoming messages should be polled /// with [`network_epoll`]. Always successful. pub fn open_tcp(&self, dst: NodeId) -> TCP { @@ -37,6 +41,59 @@ impl NodeOs { self.internal.network_chan() } + /// Returns next event from the epoll channel with timeout. + /// Returns `None` if timeout is reached. + /// -1 – wait forever. + /// 0 - poll, return immediately. + /// >0 - wait for timeout milliseconds. + pub fn epoll_recv(&self, timeout: i64) -> Option { + let epoll = self.epoll(); + + let ready_event = loop { + let event = epoll.try_recv(); + if let Some(NodeEvent::WakeTimeout(_)) = event { + continue; + } + break event; + }; + + if let Some(event) = ready_event { + // return event if it's ready + return Some(event); + } + + if timeout == 0 { + // poll, return immediately + return None; + } + + // or wait for timeout + + let rand_nonce = self.random(u64::MAX); + if timeout > 0 { + self.world + .schedule( + timeout as u64, + SendMessageEvent::new( + epoll.clone(), + NodeEvent::WakeTimeout(rand_nonce), + ), + ); + } + + loop { + match epoll.recv() { + NodeEvent::WakeTimeout(nonce) if nonce == rand_nonce => { + return None; + } + NodeEvent::WakeTimeout(_) => {} + event => { + return Some(event); + } + } + } + } + /// Sleep for a given number of milliseconds. /// Currently matches the global virtual time, TODO may be good to /// introduce a separate clocks for each node. diff --git a/safekeeper/src/simlib/world.rs b/safekeeper/src/simlib/world.rs index 322121c08f..5e1a17ad87 100644 --- a/safekeeper/src/simlib/world.rs +++ b/safekeeper/src/simlib/world.rs @@ -358,5 +358,6 @@ pub enum NodeEvent { Accept(TCP), Closed(TCP), Message((AnyMessage, TCP)), + WakeTimeout(u64), // TODO: close? }