Remove unnecessary indirections of libpqwalproposer functions

In the Postgres backend, we cannot link directly with libpq (check the
pgsql-hackers arhive for all kinds of fun that ensued when we tried to
do that). Therefore, the libpq functions are used through the thin
wrapper functions in libpqwalreceiver.so, and libpqwalreceiver.so is
loaded dynamically. To hide the dynamic loading and make the calls
look like regular functions, we use macros to hide the function
pointers.

We had inherited the same indirections in libpqwalproposer, but it's
not needed since the neon extension is already a shared library that's
loaded dynamically. There's no problem calling the functions directly
there. Remove the indirections.
This commit is contained in:
Heikki Linnakangas
2022-10-18 17:00:06 +03:00
committed by Heikki Linnakangas
parent 0cd2d91b9d
commit 41550ec8bf
5 changed files with 42 additions and 147 deletions

View File

@@ -10,51 +10,12 @@ struct WalProposerConn
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from
* libpqprop_async_read */
* walprop_async_read */
};
/* Prototypes for exported functions */
static char *libpqprop_error_message(WalProposerConn * conn);
static WalProposerConnStatusType libpqprop_status(WalProposerConn * conn);
static WalProposerConn * libpqprop_connect_start(char *conninfo);
static WalProposerConnectPollStatusType libpqprop_connect_poll(WalProposerConn * conn);
static bool libpqprop_send_query(WalProposerConn * conn, char *query);
static WalProposerExecStatusType libpqprop_get_query_result(WalProposerConn * conn);
static pgsocket libpqprop_socket(WalProposerConn * conn);
static int libpqprop_flush(WalProposerConn * conn);
static void libpqprop_finish(WalProposerConn * conn);
static PGAsyncReadResult libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount);
static PGAsyncWriteResult libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size);
static bool libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size);
static WalProposerFunctionsType PQWalProposerFunctions =
{
libpqprop_error_message,
libpqprop_status,
libpqprop_connect_start,
libpqprop_connect_poll,
libpqprop_send_query,
libpqprop_get_query_result,
libpqprop_socket,
libpqprop_flush,
libpqprop_finish,
libpqprop_async_read,
libpqprop_async_write,
libpqprop_blocking_write,
};
/* Module initialization */
void
pg_init_libpqwalproposer(void)
{
if (WalProposerFunctions != NULL)
elog(ERROR, "libpqwalproposer already loaded");
WalProposerFunctions = &PQWalProposerFunctions;
}
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn * conn, bool is_nonblocking)
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
/* If we're already correctly blocking or nonblocking, all good */
if (is_nonblocking == conn->is_nonblocking)
@@ -69,14 +30,14 @@ ensure_nonblocking_status(WalProposerConn * conn, bool is_nonblocking)
}
/* Exported function definitions */
static char *
libpqprop_error_message(WalProposerConn * conn)
char *
walprop_error_message(WalProposerConn *conn)
{
return PQerrorMessage(conn->pg_conn);
}
static WalProposerConnStatusType
libpqprop_status(WalProposerConn * conn)
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
switch (PQstatus(conn->pg_conn))
{
@@ -89,8 +50,8 @@ libpqprop_status(WalProposerConn * conn)
}
}
static WalProposerConn *
libpqprop_connect_start(char *conninfo)
WalProposerConn *
walprop_connect_start(char *conninfo)
{
WalProposerConn *conn;
PGconn *pg_conn;
@@ -119,8 +80,8 @@ libpqprop_connect_start(char *conninfo)
return conn;
}
static WalProposerConnectPollStatusType
libpqprop_connect_poll(WalProposerConn * conn)
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
WalProposerConnectPollStatusType return_val;
@@ -160,8 +121,8 @@ libpqprop_connect_poll(WalProposerConn * conn)
return return_val;
}
static bool
libpqprop_send_query(WalProposerConn * conn, char *query)
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
/*
* We need to be in blocking mode for sending the query to run without
@@ -177,8 +138,8 @@ libpqprop_send_query(WalProposerConn * conn, char *query)
return true;
}
static WalProposerExecStatusType
libpqprop_get_query_result(WalProposerConn * conn)
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
PGresult *result;
WalProposerExecStatusType return_val;
@@ -255,20 +216,20 @@ libpqprop_get_query_result(WalProposerConn * conn)
return return_val;
}
static pgsocket
libpqprop_socket(WalProposerConn * conn)
pgsocket
walprop_socket(WalProposerConn *conn)
{
return PQsocket(conn->pg_conn);
}
static int
libpqprop_flush(WalProposerConn * conn)
int
walprop_flush(WalProposerConn *conn)
{
return (PQflush(conn->pg_conn));
}
static void
libpqprop_finish(WalProposerConn * conn)
void
walprop_finish(WalProposerConn *conn)
{
if (conn->recvbuf != NULL)
PQfreemem(conn->recvbuf);
@@ -282,8 +243,8 @@ libpqprop_finish(WalProposerConn * conn)
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*/
static PGAsyncReadResult
libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount)
PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
int result;
@@ -353,8 +314,8 @@ libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount)
}
}
static PGAsyncWriteResult
libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size)
PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
@@ -408,8 +369,12 @@ libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size)
}
}
static bool
libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size)
/*
* This function is very similar to walprop_async_write. For more
* information, refer to the comments there.
*/
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
@@ -417,10 +382,6 @@ libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size)
if (!ensure_nonblocking_status(conn, false))
return false;
/*
* Ths function is very similar to libpqprop_async_write. For more
* information, refer to the comments there
*/
if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
return false;

View File

@@ -32,7 +32,6 @@ void
_PG_init(void)
{
pg_init_libpagestore();
pg_init_libpqwalproposer();
pg_init_walproposer();
EmitWarningsOnPlaceholders("neon");

View File

@@ -13,7 +13,6 @@
#define NEON_H
extern void pg_init_libpagestore(void);
extern void pg_init_libpqwalproposer(void);
extern void pg_init_walproposer(void);
#endif /* NEON_H */

View File

@@ -79,9 +79,6 @@ bool am_wal_proposer;
char *neon_timeline_walproposer = NULL;
char *neon_tenant_walproposer = NULL;
/* Declared in walproposer.h, defined here, initialized in libpqwalproposer.c */
WalProposerFunctionsType *WalProposerFunctions = NULL;
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
static int n_safekeepers = 0;
@@ -438,10 +435,6 @@ WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId)
char *sep;
char *port;
/* Load the libpq-specific functions */
if (WalProposerFunctions == NULL)
elog(ERROR, "libpqwalproposer didn't initialize correctly");
load_file("libpqwalreceiver", false);
if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");

View File

@@ -446,31 +446,31 @@ typedef enum
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
typedef char *(*walprop_error_message_fn) (WalProposerConn * conn);
extern char *walprop_error_message(WalProposerConn *conn);
/* Re-exported PQstatus */
typedef WalProposerConnStatusType(*walprop_status_fn) (WalProposerConn * conn);
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);
/* Re-exported PQconnectStart */
typedef WalProposerConn * (*walprop_connect_start_fn) (char *conninfo);
extern WalProposerConn * walprop_connect_start(char *conninfo);
/* Re-exported PQconectPoll */
typedef WalProposerConnectPollStatusType(*walprop_connect_poll_fn) (WalProposerConn * conn);
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);
/* Blocking wrapper around PQsendQuery */
typedef bool (*walprop_send_query_fn) (WalProposerConn * conn, char *query);
extern bool walprop_send_query(WalProposerConn *conn, char *query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
typedef WalProposerExecStatusType(*walprop_get_query_result_fn) (WalProposerConn * conn);
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);
/* Re-exported PQsocket */
typedef pgsocket (*walprop_socket_fn) (WalProposerConn * conn);
extern pgsocket walprop_socket(WalProposerConn *conn);
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
typedef int (*walprop_flush_fn) (WalProposerConn * conn);
extern int walprop_flush(WalProposerConn *conn);
/* Re-exported PQfinish */
typedef void (*walprop_finish_fn) (WalProposerConn * conn);
extern void walprop_finish(WalProposerConn *conn);
/*
* Ergonomic wrapper around PGgetCopyData
@@ -486,9 +486,7 @@ typedef void (*walprop_finish_fn) (WalProposerConn * conn);
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
*/
typedef PGAsyncReadResult(*walprop_async_read_fn) (WalProposerConn * conn,
char **buf,
int *amount);
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);
/*
* Ergonomic wrapper around PQputCopyData + PQflush
@@ -497,69 +495,14 @@ typedef PGAsyncReadResult(*walprop_async_read_fn) (WalProposerConn * conn,
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
*/
typedef PGAsyncWriteResult(*walprop_async_write_fn) (WalProposerConn * conn,
void const *buf,
size_t size);
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);
/*
* Blocking equivalent to walprop_async_write_fn
*
* Returns 'true' if successful, 'false' on failure.
*/
typedef bool (*walprop_blocking_write_fn) (WalProposerConn * conn, void const *buf, size_t size);
/* All libpqwalproposer exported functions collected together. */
typedef struct WalProposerFunctionsType
{
walprop_error_message_fn walprop_error_message;
walprop_status_fn walprop_status;
walprop_connect_start_fn walprop_connect_start;
walprop_connect_poll_fn walprop_connect_poll;
walprop_send_query_fn walprop_send_query;
walprop_get_query_result_fn walprop_get_query_result;
walprop_socket_fn walprop_socket;
walprop_flush_fn walprop_flush;
walprop_finish_fn walprop_finish;
walprop_async_read_fn walprop_async_read;
walprop_async_write_fn walprop_async_write;
walprop_blocking_write_fn walprop_blocking_write;
} WalProposerFunctionsType;
/* Allow the above functions to be "called" with normal syntax */
#define walprop_error_message(conn) \
WalProposerFunctions->walprop_error_message(conn)
#define walprop_status(conn) \
WalProposerFunctions->walprop_status(conn)
#define walprop_connect_start(conninfo) \
WalProposerFunctions->walprop_connect_start(conninfo)
#define walprop_connect_poll(conn) \
WalProposerFunctions->walprop_connect_poll(conn)
#define walprop_send_query(conn, query) \
WalProposerFunctions->walprop_send_query(conn, query)
#define walprop_get_query_result(conn) \
WalProposerFunctions->walprop_get_query_result(conn)
#define walprop_set_nonblocking(conn, arg) \
WalProposerFunctions->walprop_set_nonblocking(conn, arg)
#define walprop_socket(conn) \
WalProposerFunctions->walprop_socket(conn)
#define walprop_flush(conn) \
WalProposerFunctions->walprop_flush(conn)
#define walprop_finish(conn) \
WalProposerFunctions->walprop_finish(conn)
#define walprop_async_read(conn, buf, amount) \
WalProposerFunctions->walprop_async_read(conn, buf, amount)
#define walprop_async_write(conn, buf, size) \
WalProposerFunctions->walprop_async_write(conn, buf, size)
#define walprop_blocking_write(conn, buf, size) \
WalProposerFunctions->walprop_blocking_write(conn, buf, size)
/*
* The runtime location of the libpqwalproposer functions.
*
* This pointer is set by the initializer in libpqwalproposer, so that we
* can use it later.
*/
extern PGDLLIMPORT WalProposerFunctionsType * WalProposerFunctions;
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);
extern uint64 BackpressureThrottlingTime(void);