diff --git a/pgxn/neon/libpqwalproposer.c b/pgxn/neon/libpqwalproposer.c index 1f739f3722..6b1e6a8bcc 100644 --- a/pgxn/neon/libpqwalproposer.c +++ b/pgxn/neon/libpqwalproposer.c @@ -10,51 +10,12 @@ struct WalProposerConn PGconn *pg_conn; bool is_nonblocking; /* whether the connection is non-blocking */ char *recvbuf; /* last received data from - * libpqprop_async_read */ + * walprop_async_read */ }; -/* Prototypes for exported functions */ -static char *libpqprop_error_message(WalProposerConn * conn); -static WalProposerConnStatusType libpqprop_status(WalProposerConn * conn); -static WalProposerConn * libpqprop_connect_start(char *conninfo); -static WalProposerConnectPollStatusType libpqprop_connect_poll(WalProposerConn * conn); -static bool libpqprop_send_query(WalProposerConn * conn, char *query); -static WalProposerExecStatusType libpqprop_get_query_result(WalProposerConn * conn); -static pgsocket libpqprop_socket(WalProposerConn * conn); -static int libpqprop_flush(WalProposerConn * conn); -static void libpqprop_finish(WalProposerConn * conn); -static PGAsyncReadResult libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount); -static PGAsyncWriteResult libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size); -static bool libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size); - -static WalProposerFunctionsType PQWalProposerFunctions = -{ - libpqprop_error_message, - libpqprop_status, - libpqprop_connect_start, - libpqprop_connect_poll, - libpqprop_send_query, - libpqprop_get_query_result, - libpqprop_socket, - libpqprop_flush, - libpqprop_finish, - libpqprop_async_read, - libpqprop_async_write, - libpqprop_blocking_write, -}; - -/* Module initialization */ -void -pg_init_libpqwalproposer(void) -{ - if (WalProposerFunctions != NULL) - elog(ERROR, "libpqwalproposer already loaded"); - WalProposerFunctions = &PQWalProposerFunctions; -} - /* Helper function */ static bool -ensure_nonblocking_status(WalProposerConn * conn, bool is_nonblocking) +ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) { /* If we're already correctly blocking or nonblocking, all good */ if (is_nonblocking == conn->is_nonblocking) @@ -69,14 +30,14 @@ ensure_nonblocking_status(WalProposerConn * conn, bool is_nonblocking) } /* Exported function definitions */ -static char * -libpqprop_error_message(WalProposerConn * conn) +char * +walprop_error_message(WalProposerConn *conn) { return PQerrorMessage(conn->pg_conn); } -static WalProposerConnStatusType -libpqprop_status(WalProposerConn * conn) +WalProposerConnStatusType +walprop_status(WalProposerConn *conn) { switch (PQstatus(conn->pg_conn)) { @@ -89,8 +50,8 @@ libpqprop_status(WalProposerConn * conn) } } -static WalProposerConn * -libpqprop_connect_start(char *conninfo) +WalProposerConn * +walprop_connect_start(char *conninfo) { WalProposerConn *conn; PGconn *pg_conn; @@ -119,8 +80,8 @@ libpqprop_connect_start(char *conninfo) return conn; } -static WalProposerConnectPollStatusType -libpqprop_connect_poll(WalProposerConn * conn) +WalProposerConnectPollStatusType +walprop_connect_poll(WalProposerConn *conn) { WalProposerConnectPollStatusType return_val; @@ -160,8 +121,8 @@ libpqprop_connect_poll(WalProposerConn * conn) return return_val; } -static bool -libpqprop_send_query(WalProposerConn * conn, char *query) +bool +walprop_send_query(WalProposerConn *conn, char *query) { /* * We need to be in blocking mode for sending the query to run without @@ -177,8 +138,8 @@ libpqprop_send_query(WalProposerConn * conn, char *query) return true; } -static WalProposerExecStatusType -libpqprop_get_query_result(WalProposerConn * conn) +WalProposerExecStatusType +walprop_get_query_result(WalProposerConn *conn) { PGresult *result; WalProposerExecStatusType return_val; @@ -255,20 +216,20 @@ libpqprop_get_query_result(WalProposerConn * conn) return return_val; } -static pgsocket -libpqprop_socket(WalProposerConn * conn) +pgsocket +walprop_socket(WalProposerConn *conn) { return PQsocket(conn->pg_conn); } -static int -libpqprop_flush(WalProposerConn * conn) +int +walprop_flush(WalProposerConn *conn) { return (PQflush(conn->pg_conn)); } -static void -libpqprop_finish(WalProposerConn * conn) +void +walprop_finish(WalProposerConn *conn) { if (conn->recvbuf != NULL) PQfreemem(conn->recvbuf); @@ -282,8 +243,8 @@ libpqprop_finish(WalProposerConn * conn) * On success, the data is placed in *buf. It is valid until the next call * to this function. */ -static PGAsyncReadResult -libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount) +PGAsyncReadResult +walprop_async_read(WalProposerConn *conn, char **buf, int *amount) { int result; @@ -353,8 +314,8 @@ libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount) } } -static PGAsyncWriteResult -libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size) +PGAsyncWriteResult +walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) { int result; @@ -408,8 +369,12 @@ libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size) } } -static bool -libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size) +/* + * This function is very similar to walprop_async_write. For more + * information, refer to the comments there. + */ +bool +walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) { int result; @@ -417,10 +382,6 @@ libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size) if (!ensure_nonblocking_status(conn, false)) return false; - /* - * Ths function is very similar to libpqprop_async_write. For more - * information, refer to the comments there - */ if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1) return false; diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 2a2a163ee8..5c98902554 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -32,7 +32,6 @@ void _PG_init(void) { pg_init_libpagestore(); - pg_init_libpqwalproposer(); pg_init_walproposer(); EmitWarningsOnPlaceholders("neon"); diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index dad9c1b508..6b9ba372fb 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -13,7 +13,6 @@ #define NEON_H extern void pg_init_libpagestore(void); -extern void pg_init_libpqwalproposer(void); extern void pg_init_walproposer(void); #endif /* NEON_H */ diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index ff37be2de1..29290fa736 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -79,9 +79,6 @@ bool am_wal_proposer; char *neon_timeline_walproposer = NULL; char *neon_tenant_walproposer = NULL; -/* Declared in walproposer.h, defined here, initialized in libpqwalproposer.c */ -WalProposerFunctionsType *WalProposerFunctions = NULL; - #define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot" static int n_safekeepers = 0; @@ -438,10 +435,6 @@ WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId) char *sep; char *port; - /* Load the libpq-specific functions */ - if (WalProposerFunctions == NULL) - elog(ERROR, "libpqwalproposer didn't initialize correctly"); - load_file("libpqwalreceiver", false); if (WalReceiverFunctions == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 051c7c02a6..e237947441 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -446,31 +446,31 @@ typedef enum } WalProposerConnStatusType; /* Re-exported PQerrorMessage */ -typedef char *(*walprop_error_message_fn) (WalProposerConn * conn); +extern char *walprop_error_message(WalProposerConn *conn); /* Re-exported PQstatus */ -typedef WalProposerConnStatusType(*walprop_status_fn) (WalProposerConn * conn); +extern WalProposerConnStatusType walprop_status(WalProposerConn *conn); /* Re-exported PQconnectStart */ -typedef WalProposerConn * (*walprop_connect_start_fn) (char *conninfo); +extern WalProposerConn * walprop_connect_start(char *conninfo); /* Re-exported PQconectPoll */ -typedef WalProposerConnectPollStatusType(*walprop_connect_poll_fn) (WalProposerConn * conn); +extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn); /* Blocking wrapper around PQsendQuery */ -typedef bool (*walprop_send_query_fn) (WalProposerConn * conn, char *query); +extern bool walprop_send_query(WalProposerConn *conn, char *query); /* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */ -typedef WalProposerExecStatusType(*walprop_get_query_result_fn) (WalProposerConn * conn); +extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn); /* Re-exported PQsocket */ -typedef pgsocket (*walprop_socket_fn) (WalProposerConn * conn); +extern pgsocket walprop_socket(WalProposerConn *conn); /* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */ -typedef int (*walprop_flush_fn) (WalProposerConn * conn); +extern int walprop_flush(WalProposerConn *conn); /* Re-exported PQfinish */ -typedef void (*walprop_finish_fn) (WalProposerConn * conn); +extern void walprop_finish(WalProposerConn *conn); /* * Ergonomic wrapper around PGgetCopyData @@ -486,9 +486,7 @@ typedef void (*walprop_finish_fn) (WalProposerConn * conn); * performs a bit of extra checking work that's always required and is normally * somewhat verbose. */ -typedef PGAsyncReadResult(*walprop_async_read_fn) (WalProposerConn * conn, - char **buf, - int *amount); +extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount); /* * Ergonomic wrapper around PQputCopyData + PQflush @@ -497,69 +495,14 @@ typedef PGAsyncReadResult(*walprop_async_read_fn) (WalProposerConn * conn, * * For information on the meaning of return codes, refer to PGAsyncWriteResult. */ -typedef PGAsyncWriteResult(*walprop_async_write_fn) (WalProposerConn * conn, - void const *buf, - size_t size); +extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size); /* * Blocking equivalent to walprop_async_write_fn * * Returns 'true' if successful, 'false' on failure. */ -typedef bool (*walprop_blocking_write_fn) (WalProposerConn * conn, void const *buf, size_t size); - -/* All libpqwalproposer exported functions collected together. */ -typedef struct WalProposerFunctionsType -{ - walprop_error_message_fn walprop_error_message; - walprop_status_fn walprop_status; - walprop_connect_start_fn walprop_connect_start; - walprop_connect_poll_fn walprop_connect_poll; - walprop_send_query_fn walprop_send_query; - walprop_get_query_result_fn walprop_get_query_result; - walprop_socket_fn walprop_socket; - walprop_flush_fn walprop_flush; - walprop_finish_fn walprop_finish; - walprop_async_read_fn walprop_async_read; - walprop_async_write_fn walprop_async_write; - walprop_blocking_write_fn walprop_blocking_write; -} WalProposerFunctionsType; - -/* Allow the above functions to be "called" with normal syntax */ -#define walprop_error_message(conn) \ - WalProposerFunctions->walprop_error_message(conn) -#define walprop_status(conn) \ - WalProposerFunctions->walprop_status(conn) -#define walprop_connect_start(conninfo) \ - WalProposerFunctions->walprop_connect_start(conninfo) -#define walprop_connect_poll(conn) \ - WalProposerFunctions->walprop_connect_poll(conn) -#define walprop_send_query(conn, query) \ - WalProposerFunctions->walprop_send_query(conn, query) -#define walprop_get_query_result(conn) \ - WalProposerFunctions->walprop_get_query_result(conn) -#define walprop_set_nonblocking(conn, arg) \ - WalProposerFunctions->walprop_set_nonblocking(conn, arg) -#define walprop_socket(conn) \ - WalProposerFunctions->walprop_socket(conn) -#define walprop_flush(conn) \ - WalProposerFunctions->walprop_flush(conn) -#define walprop_finish(conn) \ - WalProposerFunctions->walprop_finish(conn) -#define walprop_async_read(conn, buf, amount) \ - WalProposerFunctions->walprop_async_read(conn, buf, amount) -#define walprop_async_write(conn, buf, size) \ - WalProposerFunctions->walprop_async_write(conn, buf, size) -#define walprop_blocking_write(conn, buf, size) \ - WalProposerFunctions->walprop_blocking_write(conn, buf, size) - -/* - * The runtime location of the libpqwalproposer functions. - * - * This pointer is set by the initializer in libpqwalproposer, so that we - * can use it later. - */ -extern PGDLLIMPORT WalProposerFunctionsType * WalProposerFunctions; +extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size); extern uint64 BackpressureThrottlingTime(void);