diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs index 97614c7019..8a5209f85c 100644 --- a/libs/walproposer/build.rs +++ b/libs/walproposer/build.rs @@ -116,8 +116,10 @@ fn main() -> anyhow::Result<()> { .allowlist_var("neon_timeline_walproposer") .allowlist_var("neon_tenant_walproposer") .allowlist_var("syncSafekeepers") + .allowlist_var("sim_redo_start_lsn") .clang_arg(format!("-I{inc_server_path}")) .clang_arg(format!("-I{inc_pgxn_path}")) + .clang_arg(format!("-DSIMLIB")) // Finish the builder and generate the bindings. .generate() // Unwrap the Result and panic on failure. diff --git a/libs/walproposer/libpqwalproposer.c b/libs/walproposer/libpqwalproposer.c index c45fb38336..742b36f627 100644 --- a/libs/walproposer/libpqwalproposer.c +++ b/libs/walproposer/libpqwalproposer.c @@ -3,6 +3,10 @@ #include "walproposer.h" #include "rust_bindings.h" +// declared extern in walproposer.h; defined here +uint64 sim_redo_start_lsn; +XLogRecPtr sim_latest_available_lsn; + /* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */ struct WalProposerConn { @@ -100,6 +104,18 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) { uintptr_t len; char *msg; + Event event; + + event = sim_epoll_peek(0); + if (event.tcp != conn->tcp || event.tag != Message || event.any_message != Bytes) + return PG_ASYNC_READ_TRY_AGAIN; + + event = sim_epoll_rcv(0); + + walprop_log(INFO, "walprop_async_read, T: %d, tcp: %d, tag: %d", (int) event.tag, (int) event.tcp, (int) event.any_message); + Assert(event.tcp == conn->tcp); + Assert(event.tag == Message); + Assert(event.any_message == Bytes); msg = sim_msg_get_bytes(&len); *buf = msg; @@ -112,8 +128,10 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) { - walprop_log(INFO, "not implemented"); - return PG_ASYNC_WRITE_FAIL; + walprop_log(INFO, "walprop_async_write"); + sim_msg_set_bytes(buf,
size); + sim_tcp_send(conn->tcp); + return PG_ASYNC_WRITE_SUCCESS; } /* @@ -123,8 +141,29 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) { - walprop_log(INFO, "not implemented: walprop_blocking_write"); + walprop_log(INFO, "walprop_blocking_write"); sim_msg_set_bytes(buf, size); sim_tcp_send(conn->tcp); return true; } + +void +sim_start_replication(XLogRecPtr startptr) +{ + walprop_log(INFO, "sim_start_replication: %X/%X", LSN_FORMAT_ARGS(startptr)); + sim_latest_available_lsn = startptr; + + for (;;) + { + XLogRecPtr endptr = sim_latest_available_lsn; + + Assert(startptr <= endptr); + if (endptr > startptr) + { + WalProposerBroadcast(startptr, endptr); + startptr = endptr; + } + + WalProposerPoll(); + } +} diff --git a/libs/walproposer/rust_bindings.h b/libs/walproposer/rust_bindings.h index e65a7bbaed..7032624594 100644 --- a/libs/walproposer/rust_bindings.h +++ b/libs/walproposer/rust_bindings.h @@ -12,6 +12,7 @@ enum AnyMessageTag { Just32, ReplCell, Bytes, + LSN, }; typedef uint8_t AnyMessageTag; @@ -23,6 +24,7 @@ enum EventTag { Accept, Closed, Message, + Internal, }; typedef uint8_t EventTag; @@ -55,6 +57,8 @@ void sim_tcp_send(int64_t tcp); struct Event sim_epoll_rcv(int64_t timeout); +struct Event sim_epoll_peek(int64_t timeout); + int64_t sim_now(void); void sim_exit(int32_t code, const uint8_t *msg); @@ -69,6 +73,11 @@ AnyMessageTag sim_msg_tag(void); */ void sim_msg_get_just_u32(uint32_t *val); +/** + * Read AnyMessage::LSN message. + */ +void sim_msg_get_lsn(uint64_t *val); + /** * Write AnyMessage::ReplCell message. 
*/ diff --git a/libs/walproposer/src/sim.rs b/libs/walproposer/src/sim.rs index 9b9f1cb935..3cc08f6a44 100644 --- a/libs/walproposer/src/sim.rs +++ b/libs/walproposer/src/sim.rs @@ -126,6 +126,51 @@ pub extern "C" fn sim_epoll_rcv(timeout: i64) -> Event { } } +#[no_mangle] +pub extern "C" fn sim_epoll_peek(timeout: i64) -> Event { + let event = os().epoll_peek(timeout); + let event = if let Some(event) = event { + event + } else { + return Event { + tag: EventTag::Timeout, + tcp: 0, + any_message: AnyMessageTag::None, + }; + }; + + match event { + NodeEvent::Accept(tcp) => Event { + tag: EventTag::Accept, + tcp: tcp_save(tcp), + any_message: AnyMessageTag::None, + }, + NodeEvent::Closed(tcp) => Event { + tag: EventTag::Closed, + tcp: tcp_save(tcp), + any_message: AnyMessageTag::None, + }, + NodeEvent::Message((message, tcp)) => { + Event { + tag: EventTag::Message, + tcp: tcp_save(tcp), + any_message: anymessage_tag(&message), + } + } + NodeEvent::Internal(message) => { + Event { + tag: EventTag::Internal, + tcp: 0, + any_message: anymessage_tag(&message), + } + } + NodeEvent::WakeTimeout(_) => { + // can't happen + unreachable!() + } + } +} + #[no_mangle] pub extern "C" fn sim_now() -> i64 { os().now() as i64 diff --git a/libs/walproposer/src/sim_proto.rs b/libs/walproposer/src/sim_proto.rs index f1616c0384..c7fad99cce 100644 --- a/libs/walproposer/src/sim_proto.rs +++ b/libs/walproposer/src/sim_proto.rs @@ -8,6 +8,7 @@ pub(crate) fn anymessage_tag(msg: &AnyMessage) -> AnyMessageTag { AnyMessage::Just32(_) => AnyMessageTag::Just32, AnyMessage::ReplCell(_) => AnyMessageTag::ReplCell, AnyMessage::Bytes(_) => AnyMessageTag::Bytes, + AnyMessage::LSN(_) => AnyMessageTag::LSN, } } @@ -32,6 +33,17 @@ pub extern "C" fn sim_msg_get_just_u32(val: &mut u32) { }); } +#[no_mangle] +/// Read AnyMessage::LSN message. 
+pub extern "C" fn sim_msg_get_lsn(val: &mut u64) { + MESSAGE_BUF.with(|cell| match &*cell.borrow() { + AnyMessage::LSN(v) => { + *val = *v; + } + _ => panic!("expected LSN message"), + }); +} + #[no_mangle] /// Write AnyMessage::ReplCell message. pub extern "C" fn sim_msg_set_repl_cell(value: u32, client_id: u32, seqno: u32) { @@ -98,4 +110,5 @@ pub enum AnyMessageTag { Just32, ReplCell, Bytes, + LSN, } diff --git a/libs/walproposer/src/simtest/safekeeper.rs b/libs/walproposer/src/simtest/safekeeper.rs index 54a11a61fd..24a6ae0543 100644 --- a/libs/walproposer/src/simtest/safekeeper.rs +++ b/libs/walproposer/src/simtest/safekeeper.rs @@ -7,7 +7,7 @@ use std::{collections::HashMap, path::PathBuf, time::Duration}; use bytes::BytesMut; use hyper::Uri; use log::info; -use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper}, timeline::{TimelineError}, SafeKeeperConf}; +use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper, AcceptorProposerMessage}, timeline::{TimelineError}, SafeKeeperConf}; use utils::{id::{TenantTimelineId, NodeId}, lsn::Lsn}; use anyhow::{Result, bail}; @@ -81,6 +81,7 @@ pub fn run_server(os: NodeOs) -> Result<()> { println!("conn {:?} was closed, dropping msg {:?}", tcp, msg); } } + NodeEvent::Internal(_) => {} NodeEvent::Closed(_) => {} NodeEvent::WakeTimeout(_) => {} } @@ -203,10 +204,20 @@ impl ConnState { /// Make safekeeper process a message and send a reply to the TCP fn process_sk_msg(&mut self, msg: &ProposerAcceptorMessage) -> Result<()> { let shared_state = self.tli.as_mut().unwrap(); - let reply = shared_state.sk.process_msg(msg)?; - if let Some(reply) = reply { + let mut reply = shared_state.sk.process_msg(msg)?; + if let Some(reply) = &mut reply { + // // 
if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn + // if let AcceptorProposerMessage::AppendResponse(ref mut resp) = reply { + // // TODO: + // } + + println!("sending reply: {:?}", reply); + let mut buf = BytesMut::with_capacity(128); reply.serialize(&mut buf)?; + + println!("sending reply len={}: {}", buf.len(), hex::encode(&buf)); + self.tcp.send(AnyMessage::Bytes(buf.into())); } Ok(()) diff --git a/libs/walproposer/src/simtest/wp_sk.rs b/libs/walproposer/src/simtest/wp_sk.rs index 7dbbfcf8ca..a1724e2f78 100644 --- a/libs/walproposer/src/simtest/wp_sk.rs +++ b/libs/walproposer/src/simtest/wp_sk.rs @@ -10,7 +10,7 @@ use utils::{id::TenantTimelineId, logging, lsn::Lsn}; use crate::{ bindings::{ neon_tenant_walproposer, neon_timeline_walproposer, wal_acceptor_connection_timeout, - wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, + wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, sim_redo_start_lsn, }, c_context, simtest::safekeeper::run_server, @@ -130,6 +130,13 @@ impl Test { fn launch_walproposer(&self, lsn: Lsn) -> WalProposer { let client_node = self.world.new_node(); + let lsn = if lsn.0 == 0 { + // usual LSN after basebackup + Lsn(21623024) + } else { + lsn + }; + // start the client thread let guc = self.safekeepers_guc.clone(); let ttid = self.ttid.clone(); @@ -138,9 +145,8 @@ impl Test { unsafe { WalProposerCleanup(); - - // TODO: set LSN to a variable + sim_redo_start_lsn = lsn.0; syncSafekeepers = false; wal_acceptors_list = list.into_raw(); wal_acceptor_reconnect_timeout = 1000; @@ -160,6 +166,11 @@ impl Test { node: client_node, } } + + fn poll_for_duration(&self, duration: u64) { + let time_limit = std::cmp::min(self.world.now() + duration, self.timeout); + while self.world.step() && self.world.now() < time_limit {} + } } struct WalProposer { @@ -202,7 +213,9 @@ fn run_walproposer_generate_wal() { 
println!("Sucessfully synced empty safekeepers at 0/0"); let wp = test.launch_walproposer(lsn); - let rec1 = wp.gen_wal_record(); + // let rec1 = wp.gen_wal_record(); + + test.poll_for_duration(3000); // TODO: -} \ No newline at end of file +} diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index ad5807d548..defb9bd0a8 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -340,6 +340,7 @@ void WalProposerCleanup() n_connected = 0; last_reconnect_attempt = 0; + walprop_shared = palloc(WalproposerShmemSize()); if (walprop_shared != NULL) { memset(walprop_shared, 0, WalproposerShmemSize()); @@ -459,14 +460,15 @@ WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos) int SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events) { - Event event = sim_epoll_rcv(timeout); + Event event = sim_epoll_peek(timeout); if (event.tag == Closed) { + sim_epoll_rcv(0); // TODO: shutdown connection? // walprop_log(LOG, "connection closed"); // ShutdownConnection(sk); return 0; - } else if (event.tag == Message) { - Assert(event.any_message == Bytes); + } else if (event.tag == Message && event.any_message == Bytes) { + // !!! 
code must read the message for (int i = 0; i < n_safekeepers; i++) { if (safekeeper[i].conn && ((int64_t) walprop_socket(safekeeper[i].conn)) == event.tcp) { *occurred_events = (WaitEvent) { @@ -477,12 +479,18 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events) } } walprop_log(FATAL, "unknown tcp connection"); + } else if (event.tag == Message && event.any_message == LSN) { + sim_epoll_rcv(0); + sim_msg_get_lsn(&sim_latest_available_lsn); + *occurred_events = (WaitEvent) { + .events = WL_LATCH_SET, + }; + return 1; } else if (event.tag == Timeout) { return 0; } else { Assert(false); } - // TODO: handle notification about new LSN available } #endif @@ -2446,7 +2454,12 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) if (!(AsyncRead(sk, &buf, &buf_size))) return false; - /* parse it */ + for (int i = 0; i < buf_size; i++) { + fprintf(stderr, "%02x", buf[i]); + } + fprintf(stderr, "\n"); + + /* parse it */ s.data = buf; s.len = buf_size; s.cursor = 0; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 29c07eeac2..701a3d784b 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -28,6 +28,12 @@ errhidestmt(true), errhidecontext(true), internalerrposition(0))) #endif +#ifdef SIMLIB +extern uint64 sim_redo_start_lsn; +#define GetRedoStartLsn() sim_redo_start_lsn +extern XLogRecPtr sim_latest_available_lsn; +#endif + #define SK_MAGIC 0xCafeCeefu #define SK_PROTOCOL_VERSION 2 diff --git a/pgxn/neon/walproposer_utils.c b/pgxn/neon/walproposer_utils.c index 78f5e3fd1c..be59cba7f8 100644 --- a/pgxn/neon/walproposer_utils.c +++ b/pgxn/neon/walproposer_utils.c @@ -505,6 +505,8 @@ XLogWalPropClose(XLogRecPtr recptr) /* START of cloned functions from walsender.c */ +void sim_start_replication(XLogRecPtr startpoint); + /* * Handle START_REPLICATION command. 
* @@ -517,6 +519,11 @@ StartProposerReplication(StartReplicationCmd *cmd) XLogRecPtr FlushPtr; TimeLineID currTLI; +#ifdef SIMLIB + sim_start_replication(cmd->startpoint); + return; +#endif + #if PG_VERSION_NUM < 150000 if (ThisTimeLineID == 0) ereport(ERROR, diff --git a/safekeeper/src/simlib/chan.rs b/safekeeper/src/simlib/chan.rs index dfa7c6ee52..21505d0dbe 100644 --- a/safekeeper/src/simlib/chan.rs +++ b/safekeeper/src/simlib/chan.rs @@ -45,9 +45,29 @@ impl Chan { } } + /// Same as `recv`, but doesn't take the message from the queue. + pub fn peek(&self) -> T { + // interrupt the receiver to prevent consuming everything at once + Park::yield_thread(); + + let mut queue = self.shared.queue.lock(); + loop { + if let Some(t) = queue.front().cloned() { + return t; + } + self.shared.condvar.wait(&mut queue); + } + } + /// Get a message from the front of the queue, or return `None` if the queue is empty. pub fn try_recv(&self) -> Option { let mut queue = self.shared.queue.lock(); queue.pop_front() } + + /// Clone a message from the front of the queue, or return `None` if the queue is empty. + pub fn try_peek(&self) -> Option { + let queue = self.shared.queue.lock(); + queue.front().cloned() + } } diff --git a/safekeeper/src/simlib/node_os.rs b/safekeeper/src/simlib/node_os.rs index 20d42b7ffa..2aac7984fe 100644 --- a/safekeeper/src/simlib/node_os.rs +++ b/safekeeper/src/simlib/node_os.rs @@ -90,6 +90,55 @@ impl NodeOs { } } + /// Same as epoll_recv, but does not remove the event from the queue. 
+ pub fn epoll_peek(&self, timeout: i64) -> Option { + let epoll = self.epoll(); + + let ready_event = loop { + let event = epoll.try_peek(); + if let Some(NodeEvent::WakeTimeout(_)) = event { + assert!(epoll.try_recv().is_some()); + continue; + } + break event; + }; + + if let Some(event) = ready_event { + // return event if it's ready + return Some(event); + } + + if timeout == 0 { + // poll, return immediately + return None; + } + + // or wait for timeout + + let rand_nonce = self.random(u64::MAX); + if timeout > 0 { + self.world.schedule( + timeout as u64, + SendMessageEvent::new(epoll.clone(), NodeEvent::WakeTimeout(rand_nonce)), + ); + } + + loop { + match epoll.peek() { + NodeEvent::WakeTimeout(nonce) if nonce == rand_nonce => { + assert!(epoll.try_recv().is_some()); + return None; + } + NodeEvent::WakeTimeout(_) => { + assert!(epoll.try_recv().is_some()); + } + event => { + return Some(event); + } + } + } + } + /// Sleep for a given number of milliseconds. /// Currently matches the global virtual time, TODO may be good to /// introduce a separate clocks for each node. diff --git a/safekeeper/src/simlib/proto.rs b/safekeeper/src/simlib/proto.rs index 1b2222e3ed..850b7da17b 100644 --- a/safekeeper/src/simlib/proto.rs +++ b/safekeeper/src/simlib/proto.rs @@ -1,8 +1,11 @@ +use std::fmt::Debug; + use bytes::Bytes; +use utils::lsn::Lsn; /// All possible flavours of messages. /// Grouped by the receiver node. -#[derive(Clone, Debug)] +#[derive(Clone)] pub enum AnyMessage { /// Not used, empty placeholder. 
None, @@ -11,6 +14,20 @@ pub enum AnyMessage { Just32(u32), ReplCell(ReplCell), Bytes(Bytes), + LSN(u64), +} + +impl Debug for AnyMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AnyMessage::None => write!(f, "None"), + AnyMessage::InternalConnect => write!(f, "InternalConnect"), + AnyMessage::Just32(v) => write!(f, "Just32({})", v), + AnyMessage::ReplCell(v) => write!(f, "ReplCell({:?})", v), + AnyMessage::Bytes(v) => write!(f, "Bytes({})", hex::encode(v)), + AnyMessage::LSN(v) => write!(f, "LSN({})", Lsn(*v)), + } + } } #[derive(Clone, Debug)]