diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 2610da4311..456da90d89 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -30,4 +30,7 @@ extern void pg_init_extension_server(void); extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id); extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id); +extern uint64 BackpressureThrottlingTime(void); +extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn); + #endif /* NEON_H */ diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 98cf6c6806..2b2f2d8354 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -1,8 +1,8 @@ #ifndef __NEON_WALPROPOSER_H__ #define __NEON_WALPROPOSER_H__ -#include "access/xlogdefs.h" #include "postgres.h" +#include "access/xlogdefs.h" #include "port.h" #include "access/xlog_internal.h" #include "access/transam.h" @@ -16,12 +16,6 @@ #define MAX_SAFEKEEPERS 32 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL * message */ -#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */ -#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender* - * message header */ -#define XLOG_HDR_END_POS (1 + 8) /* offset of end position in wal sender* - * message header */ - /* * In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occurred, * because all WL_* events are given flags equal to some (1 << i), starting from i = 0 @@ -32,12 +26,9 @@ extern char *wal_acceptors_list; extern int wal_acceptor_reconnect_timeout; extern int wal_acceptor_connection_timeout; -struct WalProposerConn; /* Defined in libpqwalproposer */ +struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */ typedef struct WalProposerConn WalProposerConn; -struct WalMessage; -typedef struct WalMessage WalMessage; - /* Possible return values from ReadPGAsync */ typedef enum { @@ -385,11 +376,6 @@ extern void WalProposerPoll(void); extern void ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback *rf); -extern void replication_feedback_set(PageserverFeedback *rf); -extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn); - -/* libpqwalproposer hooks & helper type */ - /* Re-exported PostgresPollingStatusType */ typedef enum { @@ -443,8 +429,6 @@ typedef enum WP_CONNECTION_IN_PROGRESS, } WalProposerConnStatusType; -extern uint64 BackpressureThrottlingTime(void); - /* * Collection of hooks for walproposer, to call postgres functions, * read WAL and send it over the network. @@ -452,7 +436,6 @@ extern uint64 BackpressureThrottlingTime(void); typedef struct walproposer_api { WalproposerShmemState * (*get_shmem_state) (void); - void (*replication_feedback_set) (PageserverFeedback * rf); void (*start_streaming) (XLogRecPtr startpos, TimeLineID timeline); void (*init_walsender) (void); void (*init_standalone_sync_safekeepers) (void); diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 8995c9d6b8..f9ee86d958 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -39,14 +39,18 @@ #include "walproposer.h" #include "libpq-fe.h" +#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */ +#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender* + * message header */ + +#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot" + char *wal_acceptors_list = ""; int wal_acceptor_reconnect_timeout = 1000; int wal_acceptor_connection_timeout = 10000; static AppendResponse quorumFeedback; -#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot" - static WalproposerShmemState * walprop_shared; static void nwp_shmem_startup_hook(void); @@ -1473,7 +1477,7 @@ GetLatestNeonFeedback(PageserverFeedback * rf, Safekeeper * safekeepers, int n_s LSN_FORMAT_ARGS(rf->remote_consistent_lsn), rf->replytime); - walprop_pg.replication_feedback_set(rf); + replication_feedback_set(rf); } /* @@ -1546,7 +1550,7 @@ walprop_pg_process_safekeeper_feedback(Safekeeper * safekeepers, int n_safekeepe * pageserver. */ quorumFeedback.rf.disk_consistent_lsn, - walprop_pg.get_current_timestamp(), false); + walprop_pg_get_current_timestamp(), false); } CombineHotStanbyFeedbacks(&hsFeedback, safekeepers, n_safekeepers); @@ -1574,7 +1578,6 @@ walprop_pg_confirm_wal_streamed(XLogRecPtr lsn) */ const walproposer_api walprop_pg = { .get_shmem_state = walprop_pg_get_shmem_state, - .replication_feedback_set = replication_feedback_set, .start_streaming = walprop_pg_start_streaming, .init_walsender = walprop_pg_init_walsender, .init_standalone_sync_safekeepers = walprop_pg_init_standalone_sync_safekeepers,