mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
Fix read/write in walproposer
This commit is contained in:
@@ -116,8 +116,10 @@ fn main() -> anyhow::Result<()> {
|
||||
.allowlist_var("neon_timeline_walproposer")
|
||||
.allowlist_var("neon_tenant_walproposer")
|
||||
.allowlist_var("syncSafekeepers")
|
||||
.allowlist_var("sim_redo_start_lsn")
|
||||
.clang_arg(format!("-I{inc_server_path}"))
|
||||
.clang_arg(format!("-I{inc_pgxn_path}"))
|
||||
.clang_arg(format!("-DSIMLIB"))
|
||||
// Finish the builder and generate the bindings.
|
||||
.generate()
|
||||
// Unwrap the Result and panic on failure.
|
||||
|
||||
@@ -3,6 +3,10 @@
|
||||
#include "walproposer.h"
|
||||
#include "rust_bindings.h"
|
||||
|
||||
// defined in walproposer.h
|
||||
uint64 sim_redo_start_lsn;
|
||||
XLogRecPtr sim_latest_available_lsn;
|
||||
|
||||
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
|
||||
struct WalProposerConn
|
||||
{
|
||||
@@ -100,6 +104,18 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
|
||||
{
|
||||
uintptr_t len;
|
||||
char *msg;
|
||||
Event event;
|
||||
|
||||
event = sim_epoll_peek(0);
|
||||
if (event.tcp != conn->tcp || event.tag != Message || event.any_message != Bytes)
|
||||
return PG_ASYNC_READ_TRY_AGAIN;
|
||||
|
||||
event = sim_epoll_rcv(0);
|
||||
|
||||
walprop_log(INFO, "walprop_async_read, T: %d, tcp: %d, tag: %d", (int) event.tag, (int) event.tcp, (int) event.any_message);
|
||||
Assert(event.tcp == conn->tcp);
|
||||
Assert(event.tag == Message);
|
||||
Assert(event.any_message == Bytes);
|
||||
|
||||
msg = sim_msg_get_bytes(&len);
|
||||
*buf = msg;
|
||||
@@ -112,8 +128,10 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
|
||||
PGAsyncWriteResult
|
||||
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
|
||||
{
|
||||
walprop_log(INFO, "not implemented");
|
||||
return PG_ASYNC_WRITE_FAIL;
|
||||
walprop_log(INFO, "walprop_async_write");
|
||||
sim_msg_set_bytes(buf, size);
|
||||
sim_tcp_send(conn->tcp);
|
||||
return PG_ASYNC_WRITE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -123,8 +141,29 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
|
||||
bool
|
||||
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
|
||||
{
|
||||
walprop_log(INFO, "not implemented: walprop_blocking_write");
|
||||
walprop_log(INFO, "walprop_blocking_write");
|
||||
sim_msg_set_bytes(buf, size);
|
||||
sim_tcp_send(conn->tcp);
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
sim_start_replication(XLogRecPtr startptr)
|
||||
{
|
||||
walprop_log(INFO, "sim_start_replication: %X/%X", LSN_FORMAT_ARGS(startptr));
|
||||
sim_latest_available_lsn = startptr;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
XLogRecPtr endptr = sim_latest_available_lsn;
|
||||
|
||||
Assert(startptr <= endptr);
|
||||
if (endptr > startptr)
|
||||
{
|
||||
WalProposerBroadcast(startptr, endptr);
|
||||
startptr = endptr;
|
||||
}
|
||||
|
||||
WalProposerPoll();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ enum AnyMessageTag {
|
||||
Just32,
|
||||
ReplCell,
|
||||
Bytes,
|
||||
LSN,
|
||||
};
|
||||
typedef uint8_t AnyMessageTag;
|
||||
|
||||
@@ -23,6 +24,7 @@ enum EventTag {
|
||||
Accept,
|
||||
Closed,
|
||||
Message,
|
||||
Internal,
|
||||
};
|
||||
typedef uint8_t EventTag;
|
||||
|
||||
@@ -55,6 +57,8 @@ void sim_tcp_send(int64_t tcp);
|
||||
|
||||
struct Event sim_epoll_rcv(int64_t timeout);
|
||||
|
||||
struct Event sim_epoll_peek(int64_t timeout);
|
||||
|
||||
int64_t sim_now(void);
|
||||
|
||||
void sim_exit(int32_t code, const uint8_t *msg);
|
||||
@@ -69,6 +73,11 @@ AnyMessageTag sim_msg_tag(void);
|
||||
*/
|
||||
void sim_msg_get_just_u32(uint32_t *val);
|
||||
|
||||
/**
|
||||
* Read AnyMessage::LSN message.
|
||||
*/
|
||||
void sim_msg_get_lsn(uint64_t *val);
|
||||
|
||||
/**
|
||||
* Write AnyMessage::ReplCell message.
|
||||
*/
|
||||
|
||||
@@ -126,6 +126,51 @@ pub extern "C" fn sim_epoll_rcv(timeout: i64) -> Event {
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn sim_epoll_peek(timeout: i64) -> Event {
|
||||
let event = os().epoll_peek(timeout);
|
||||
let event = if let Some(event) = event {
|
||||
event
|
||||
} else {
|
||||
return Event {
|
||||
tag: EventTag::Timeout,
|
||||
tcp: 0,
|
||||
any_message: AnyMessageTag::None,
|
||||
};
|
||||
};
|
||||
|
||||
match event {
|
||||
NodeEvent::Accept(tcp) => Event {
|
||||
tag: EventTag::Accept,
|
||||
tcp: tcp_save(tcp),
|
||||
any_message: AnyMessageTag::None,
|
||||
},
|
||||
NodeEvent::Closed(tcp) => Event {
|
||||
tag: EventTag::Closed,
|
||||
tcp: tcp_save(tcp),
|
||||
any_message: AnyMessageTag::None,
|
||||
},
|
||||
NodeEvent::Message((message, tcp)) => {
|
||||
Event {
|
||||
tag: EventTag::Message,
|
||||
tcp: tcp_save(tcp),
|
||||
any_message: anymessage_tag(&message),
|
||||
}
|
||||
}
|
||||
NodeEvent::Internal(message) => {
|
||||
Event {
|
||||
tag: EventTag::Internal,
|
||||
tcp: 0,
|
||||
any_message: anymessage_tag(&message),
|
||||
}
|
||||
}
|
||||
NodeEvent::WakeTimeout(_) => {
|
||||
// can't happen
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub extern "C" fn sim_now() -> i64 {
|
||||
os().now() as i64
|
||||
|
||||
@@ -8,6 +8,7 @@ pub(crate) fn anymessage_tag(msg: &AnyMessage) -> AnyMessageTag {
|
||||
AnyMessage::Just32(_) => AnyMessageTag::Just32,
|
||||
AnyMessage::ReplCell(_) => AnyMessageTag::ReplCell,
|
||||
AnyMessage::Bytes(_) => AnyMessageTag::Bytes,
|
||||
AnyMessage::LSN(_) => AnyMessageTag::LSN,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +33,17 @@ pub extern "C" fn sim_msg_get_just_u32(val: &mut u32) {
|
||||
});
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
/// Read AnyMessage::LSN message.
|
||||
pub extern "C" fn sim_msg_get_lsn(val: &mut u64) {
|
||||
MESSAGE_BUF.with(|cell| match &*cell.borrow() {
|
||||
AnyMessage::LSN(v) => {
|
||||
*val = *v;
|
||||
}
|
||||
_ => panic!("expected LSN message"),
|
||||
});
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
/// Write AnyMessage::ReplCell message.
|
||||
pub extern "C" fn sim_msg_set_repl_cell(value: u32, client_id: u32, seqno: u32) {
|
||||
@@ -98,4 +110,5 @@ pub enum AnyMessageTag {
|
||||
Just32,
|
||||
ReplCell,
|
||||
Bytes,
|
||||
LSN,
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::{collections::HashMap, path::PathBuf, time::Duration};
|
||||
use bytes::BytesMut;
|
||||
use hyper::Uri;
|
||||
use log::info;
|
||||
use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper}, timeline::{TimelineError}, SafeKeeperConf};
|
||||
use safekeeper::{simlib::{node_os::NodeOs, network::TCP, world::NodeEvent, proto::AnyMessage}, safekeeper::{ProposerAcceptorMessage, ServerInfo, SafeKeeperState, UNKNOWN_SERVER_VERSION, SafeKeeper, AcceptorProposerMessage}, timeline::{TimelineError}, SafeKeeperConf};
|
||||
use utils::{id::{TenantTimelineId, NodeId}, lsn::Lsn};
|
||||
use anyhow::{Result, bail};
|
||||
|
||||
@@ -81,6 +81,7 @@ pub fn run_server(os: NodeOs) -> Result<()> {
|
||||
println!("conn {:?} was closed, dropping msg {:?}", tcp, msg);
|
||||
}
|
||||
}
|
||||
NodeEvent::Internal(_) => {}
|
||||
NodeEvent::Closed(_) => {}
|
||||
NodeEvent::WakeTimeout(_) => {}
|
||||
}
|
||||
@@ -203,10 +204,20 @@ impl ConnState {
|
||||
/// Make safekeeper process a message and send a reply to the TCP
|
||||
fn process_sk_msg(&mut self, msg: &ProposerAcceptorMessage) -> Result<()> {
|
||||
let shared_state = self.tli.as_mut().unwrap();
|
||||
let reply = shared_state.sk.process_msg(msg)?;
|
||||
if let Some(reply) = reply {
|
||||
let mut reply = shared_state.sk.process_msg(msg)?;
|
||||
if let Some(reply) = &mut reply {
|
||||
// // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
|
||||
// if let AcceptorProposerMessage::AppendResponse(ref mut resp) = reply {
|
||||
// // TODO:
|
||||
// }
|
||||
|
||||
println!("sending reply: {:?}", reply);
|
||||
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
reply.serialize(&mut buf)?;
|
||||
|
||||
println!("sending reply len={}: {}", buf.len(), hex::encode(&buf));
|
||||
|
||||
self.tcp.send(AnyMessage::Bytes(buf.into()));
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -10,7 +10,7 @@ use utils::{id::TenantTimelineId, logging, lsn::Lsn};
|
||||
use crate::{
|
||||
bindings::{
|
||||
neon_tenant_walproposer, neon_timeline_walproposer, wal_acceptor_connection_timeout,
|
||||
wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers,
|
||||
wal_acceptor_reconnect_timeout, wal_acceptors_list, WalProposerRust, WalProposerCleanup, syncSafekeepers, sim_redo_start_lsn,
|
||||
},
|
||||
c_context,
|
||||
simtest::safekeeper::run_server,
|
||||
@@ -130,6 +130,13 @@ impl Test {
|
||||
fn launch_walproposer(&self, lsn: Lsn) -> WalProposer {
|
||||
let client_node = self.world.new_node();
|
||||
|
||||
let lsn = if lsn.0 == 0 {
|
||||
// usual LSN after basebackup
|
||||
Lsn(21623024)
|
||||
} else {
|
||||
lsn
|
||||
};
|
||||
|
||||
// start the client thread
|
||||
let guc = self.safekeepers_guc.clone();
|
||||
let ttid = self.ttid.clone();
|
||||
@@ -138,9 +145,8 @@ impl Test {
|
||||
|
||||
unsafe {
|
||||
WalProposerCleanup();
|
||||
|
||||
// TODO: set LSN to a variable
|
||||
|
||||
sim_redo_start_lsn = lsn.0;
|
||||
syncSafekeepers = false;
|
||||
wal_acceptors_list = list.into_raw();
|
||||
wal_acceptor_reconnect_timeout = 1000;
|
||||
@@ -160,6 +166,11 @@ impl Test {
|
||||
node: client_node,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_for_duration(&self, duration: u64) {
|
||||
let time_limit = std::cmp::min(self.world.now() + duration, self.timeout);
|
||||
while self.world.step() && self.world.now() < time_limit {}
|
||||
}
|
||||
}
|
||||
|
||||
struct WalProposer {
|
||||
@@ -202,7 +213,9 @@ fn run_walproposer_generate_wal() {
|
||||
println!("Sucessfully synced empty safekeepers at 0/0");
|
||||
|
||||
let wp = test.launch_walproposer(lsn);
|
||||
let rec1 = wp.gen_wal_record();
|
||||
// let rec1 = wp.gen_wal_record();
|
||||
|
||||
test.poll_for_duration(3000);
|
||||
|
||||
// TODO:
|
||||
}
|
||||
}
|
||||
|
||||
@@ -340,6 +340,7 @@ void WalProposerCleanup()
|
||||
n_connected = 0;
|
||||
last_reconnect_attempt = 0;
|
||||
|
||||
walprop_shared = palloc(WalproposerShmemSize());
|
||||
if (walprop_shared != NULL)
|
||||
{
|
||||
memset(walprop_shared, 0, WalproposerShmemSize());
|
||||
@@ -459,14 +460,15 @@ WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos)
|
||||
int
|
||||
SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events)
|
||||
{
|
||||
Event event = sim_epoll_rcv(timeout);
|
||||
Event event = sim_epoll_peek(timeout);
|
||||
if (event.tag == Closed) {
|
||||
sim_epoll_rcv(0);
|
||||
// TODO: shutdown connection?
|
||||
// walprop_log(LOG, "connection closed");
|
||||
// ShutdownConnection(sk);
|
||||
return 0;
|
||||
} else if (event.tag == Message) {
|
||||
Assert(event.any_message == Bytes);
|
||||
} else if (event.tag == Message && event.any_message == Bytes) {
|
||||
// !!! code must read the message
|
||||
for (int i = 0; i < n_safekeepers; i++) {
|
||||
if (safekeeper[i].conn && ((int64_t) walprop_socket(safekeeper[i].conn)) == event.tcp) {
|
||||
*occurred_events = (WaitEvent) {
|
||||
@@ -477,12 +479,18 @@ SimWaitEventSetWait(Safekeeper **sk, long timeout, WaitEvent *occurred_events)
|
||||
}
|
||||
}
|
||||
walprop_log(FATAL, "unknown tcp connection");
|
||||
} else if (event.tag == Message && event.any_message == LSN) {
|
||||
sim_epoll_rcv(0);
|
||||
sim_msg_get_lsn(&sim_latest_available_lsn);
|
||||
*occurred_events = (WaitEvent) {
|
||||
.events = WL_LATCH_SET,
|
||||
};
|
||||
return 1;
|
||||
} else if (event.tag == Timeout) {
|
||||
return 0;
|
||||
} else {
|
||||
Assert(false);
|
||||
}
|
||||
// TODO: handle notification about new LSN available
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -2446,7 +2454,12 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg)
|
||||
if (!(AsyncRead(sk, &buf, &buf_size)))
|
||||
return false;
|
||||
|
||||
/* parse it */
|
||||
for (int i = 0; i < buf_size; i++) {
|
||||
fprintf(stderr, "%02x", buf[i]);
|
||||
}
|
||||
fprintf(stderr, "\n");
|
||||
|
||||
/* parse it */
|
||||
s.data = buf;
|
||||
s.len = buf_size;
|
||||
s.cursor = 0;
|
||||
|
||||
@@ -28,6 +28,12 @@
|
||||
errhidestmt(true), errhidecontext(true), internalerrposition(0)))
|
||||
#endif
|
||||
|
||||
#ifdef SIMLIB
|
||||
extern uint64 sim_redo_start_lsn;
|
||||
#define GetRedoStartLsn() sim_redo_start_lsn
|
||||
extern XLogRecPtr sim_latest_available_lsn;
|
||||
#endif
|
||||
|
||||
#define SK_MAGIC 0xCafeCeefu
|
||||
#define SK_PROTOCOL_VERSION 2
|
||||
|
||||
|
||||
@@ -505,6 +505,8 @@ XLogWalPropClose(XLogRecPtr recptr)
|
||||
|
||||
/* START of cloned functions from walsender.c */
|
||||
|
||||
void sim_start_replication(XLogRecPtr startpoint);
|
||||
|
||||
/*
|
||||
* Handle START_REPLICATION command.
|
||||
*
|
||||
@@ -517,6 +519,11 @@ StartProposerReplication(StartReplicationCmd *cmd)
|
||||
XLogRecPtr FlushPtr;
|
||||
TimeLineID currTLI;
|
||||
|
||||
#ifdef SIMLIB
|
||||
sim_start_replication(cmd->startpoint);
|
||||
return;
|
||||
#endif
|
||||
|
||||
#if PG_VERSION_NUM < 150000
|
||||
if (ThisTimeLineID == 0)
|
||||
ereport(ERROR,
|
||||
|
||||
@@ -45,9 +45,29 @@ impl<T: Clone> Chan<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as `recv`, but doesn't take the message from the queue.
|
||||
pub fn peek(&self) -> T {
|
||||
// interrupt the receiver to prevent consuming everything at once
|
||||
Park::yield_thread();
|
||||
|
||||
let mut queue = self.shared.queue.lock();
|
||||
loop {
|
||||
if let Some(t) = queue.front().cloned() {
|
||||
return t;
|
||||
}
|
||||
self.shared.condvar.wait(&mut queue);
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a message from the front of the queue, or return `None` if the queue is empty.
|
||||
pub fn try_recv(&self) -> Option<T> {
|
||||
let mut queue = self.shared.queue.lock();
|
||||
queue.pop_front()
|
||||
}
|
||||
|
||||
/// Clone a message from the front of the queue, or return `None` if the queue is empty.
|
||||
pub fn try_peek(&self) -> Option<T> {
|
||||
let queue = self.shared.queue.lock();
|
||||
queue.front().cloned()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,6 +90,55 @@ impl NodeOs {
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as epoll_recv, but does not remove the event from the queue.
|
||||
pub fn epoll_peek(&self, timeout: i64) -> Option<NodeEvent> {
|
||||
let epoll = self.epoll();
|
||||
|
||||
let ready_event = loop {
|
||||
let event = epoll.try_peek();
|
||||
if let Some(NodeEvent::WakeTimeout(_)) = event {
|
||||
assert!(epoll.try_recv().is_some());
|
||||
continue;
|
||||
}
|
||||
break event;
|
||||
};
|
||||
|
||||
if let Some(event) = ready_event {
|
||||
// return event if it's ready
|
||||
return Some(event);
|
||||
}
|
||||
|
||||
if timeout == 0 {
|
||||
// poll, return immediately
|
||||
return None;
|
||||
}
|
||||
|
||||
// or wait for timeout
|
||||
|
||||
let rand_nonce = self.random(u64::MAX);
|
||||
if timeout > 0 {
|
||||
self.world.schedule(
|
||||
timeout as u64,
|
||||
SendMessageEvent::new(epoll.clone(), NodeEvent::WakeTimeout(rand_nonce)),
|
||||
);
|
||||
}
|
||||
|
||||
loop {
|
||||
match epoll.peek() {
|
||||
NodeEvent::WakeTimeout(nonce) if nonce == rand_nonce => {
|
||||
assert!(epoll.try_recv().is_some());
|
||||
return None;
|
||||
}
|
||||
NodeEvent::WakeTimeout(_) => {
|
||||
assert!(epoll.try_recv().is_some());
|
||||
}
|
||||
event => {
|
||||
return Some(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Sleep for a given number of milliseconds.
|
||||
/// Currently matches the global virtual time, TODO may be good to
|
||||
/// introduce a separate clock for each node.
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use bytes::Bytes;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// All possible flavours of messages.
|
||||
/// Grouped by the receiver node.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone)]
|
||||
pub enum AnyMessage {
|
||||
/// Not used, empty placeholder.
|
||||
None,
|
||||
@@ -11,6 +14,20 @@ pub enum AnyMessage {
|
||||
Just32(u32),
|
||||
ReplCell(ReplCell),
|
||||
Bytes(Bytes),
|
||||
LSN(u64),
|
||||
}
|
||||
|
||||
impl Debug for AnyMessage {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
AnyMessage::None => write!(f, "None"),
|
||||
AnyMessage::InternalConnect => write!(f, "InternalConnect"),
|
||||
AnyMessage::Just32(v) => write!(f, "Just32({})", v),
|
||||
AnyMessage::ReplCell(v) => write!(f, "ReplCell({:?})", v),
|
||||
AnyMessage::Bytes(v) => write!(f, "Bytes({})", hex::encode(v)),
|
||||
AnyMessage::LSN(v) => write!(f, "LSN({})", Lsn(*v)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
Reference in New Issue
Block a user