From c982540705f20e4469b6f7e4f6cfe42bbed340ed Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 20 Nov 2023 17:09:39 +0100 Subject: [PATCH] Extract libpqwalproposer again. It will be used by neon_walreader. --- pgxn/neon/libpqwalproposer.h | 96 ++++++++++++++++++++++ pgxn/neon/walproposer.c | 6 -- pgxn/neon/walproposer.h | 63 +-------------- pgxn/neon/walproposer_pg.c | 149 ++++++++++++++++++++++------------- 4 files changed, 192 insertions(+), 122 deletions(-) create mode 100644 pgxn/neon/libpqwalproposer.h diff --git a/pgxn/neon/libpqwalproposer.h b/pgxn/neon/libpqwalproposer.h new file mode 100644 index 0000000000..cd7e568a47 --- /dev/null +++ b/pgxn/neon/libpqwalproposer.h @@ -0,0 +1,96 @@ +/* + * Interface to set of libpq wrappers walproposer and neon_walreader need. + * Similar to libpqwalreceiver, but it has blocking connection establishment and + * pqexec which don't fit us. Implementation is at walproposer_pg.c. + */ +#ifndef ___LIBPQWALPROPOSER_H__ +#define ___LIBPQWALPROPOSER_H__ + +/* Re-exported and modified ExecStatusType */ +typedef enum +{ + /* We received a single CopyBoth result */ + WP_EXEC_SUCCESS_COPYBOTH, + + /* + * Any success result other than a single CopyBoth was received. The + * specifics of the result were already logged, but it may be useful to + * provide an error message indicating which safekeeper messed up. + * + * Do not expect PQerrorMessage to be appropriately set. + */ + WP_EXEC_UNEXPECTED_SUCCESS, + + /* + * No result available at this time. Wait until read-ready, then call + * again. Internally, this is returned when PQisBusy indicates that + * PQgetResult would block. + */ + WP_EXEC_NEEDS_INPUT, + /* Catch-all failure. Check PQerrorMessage. */ + WP_EXEC_FAILED, +} WalProposerExecStatusType; + +/* Possible return values from walprop_async_read */ +typedef enum +{ + /* The full read was successful. buf now points to the data */ + PG_ASYNC_READ_SUCCESS, + + /* + * The read is ongoing. Wait until the connection is read-ready, then try + * again. + */ + PG_ASYNC_READ_TRY_AGAIN, + /* Reading failed. Check PQerrorMessage(conn) */ + PG_ASYNC_READ_FAIL, +} PGAsyncReadResult; + +/* Possible return values from walprop_async_write */ +typedef enum +{ + /* The write fully completed */ + PG_ASYNC_WRITE_SUCCESS, + + /* + * The write started, but you'll need to call PQflush some more times to + * finish it off. We just tried, so it's best to wait until the connection + * is read- or write-ready to try again. + * + * If it becomes read-ready, call PQconsumeInput and flush again. If it + * becomes write-ready, just call PQflush. + */ + PG_ASYNC_WRITE_TRY_FLUSH, + /* Writing failed. Check PQerrorMessage(conn) */ + PG_ASYNC_WRITE_FAIL, +} PGAsyncWriteResult; + +/* + * This header is included by walproposer.h to define walproposer_api; if we're + * building walproposer without pg, ignore libpq part, leaving only interface + * types. + */ +#ifndef WALPROPOSER_LIB + +#include "libpq-fe.h" + +/* + * Sometimes working directly with underlying PGconn is simpler, export the + * whole thing for simplicity. + */ +typedef struct WalProposerConn +{ + PGconn *pg_conn; + bool is_nonblocking; /* whether the connection is non-blocking */ + char *recvbuf; /* last received CopyData message from + * walprop_async_read */ +} WalProposerConn; + +extern WalProposerConn *libpqwp_connect_start(char *conninfo); +extern bool libpqwp_send_query(WalProposerConn *conn, char *query); +extern WalProposerExecStatusType libpqwp_get_query_result(WalProposerConn *conn); +extern PGAsyncReadResult libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount); +extern void libpqwp_disconnect(WalProposerConn *conn); + +#endif /* WALPROPOSER_LIB */ +#endif /* ___LIBPQWALPROPOSER_H__ */ diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 97767a09ed..7dd652c232 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -316,12 +316,6 @@ ShutdownConnection(Safekeeper *sk) sk->voteResponse.termHistory.entries = NULL; sk->wp->api.rm_safekeeper_event_set(sk); - - if (sk->xlogreader) - { - NeonWALReaderFree(sk->xlogreader); - sk->xlogreader = NULL; - } } /* diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 3ad4c2faef..0c09702589 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -10,6 +10,7 @@ #include "utils/uuid.h" #include "replication/walreceiver.h" +#include "libpqwalproposer.h" #include "neon_walreader.h" #define SK_MAGIC 0xCafeCeefu @@ -24,43 +25,9 @@ */ #define WL_NO_EVENTS 0 -struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */ +struct WalProposerConn; /* Defined in libpqwalproposer.h */ typedef struct WalProposerConn WalProposerConn; -/* Possible return values from ReadPGAsync */ -typedef enum -{ - /* The full read was successful. buf now points to the data */ - PG_ASYNC_READ_SUCCESS, - - /* - * The read is ongoing. Wait until the connection is read-ready, then try - * again. - */ - PG_ASYNC_READ_TRY_AGAIN, - /* Reading failed. Check PQerrorMessage(conn) */ - PG_ASYNC_READ_FAIL, -} PGAsyncReadResult; - -/* Possible return values from WritePGAsync */ -typedef enum -{ - /* The write fully completed */ - PG_ASYNC_WRITE_SUCCESS, - - /* - * The write started, but you'll need to call PQflush some more times to - * finish it off. We just tried, so it's best to wait until the connection - * is read- or write-ready to try again. - * - * If it becomes read-ready, call PQconsumeInput and flush again. If it - * becomes write-ready, just call PQflush. - */ - PG_ASYNC_WRITE_TRY_FLUSH, - /* Writing failed. Check PQerrorMessage(conn) */ - PG_ASYNC_WRITE_FAIL, -} PGAsyncWriteResult; - /* * WAL safekeeper state, which is used to wait for some event. * @@ -405,31 +372,6 @@ typedef enum */ } WalProposerConnectPollStatusType; -/* Re-exported and modified ExecStatusType */ -typedef enum -{ - /* We received a single CopyBoth result */ - WP_EXEC_SUCCESS_COPYBOTH, - - /* - * Any success result other than a single CopyBoth was received. The - * specifics of the result were already logged, but it may be useful to - * provide an error message indicating which safekeeper messed up. - * - * Do not expect PQerrorMessage to be appropriately set. - */ - WP_EXEC_UNEXPECTED_SUCCESS, - - /* - * No result available at this time. Wait until read-ready, then call - * again. Internally, this is returned when PQisBusy indicates that - * PQgetResult would block. - */ - WP_EXEC_NEEDS_INPUT, - /* Catch-all failure. Check PQerrorMessage. */ - WP_EXEC_FAILED, -} WalProposerExecStatusType; - /* Re-exported ConnStatusType */ typedef enum { @@ -712,6 +654,7 @@ extern void WalProposerStart(WalProposer *wp); extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos); extern void WalProposerPoll(WalProposer *wp); extern void WalProposerFree(WalProposer *wp); + /* * WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to * recreate set from scratch, hence the export. diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index c562fa2cfa..df522e3758 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -43,11 +43,13 @@ #include "utils/ps_status.h" #include "utils/timestamp.h" -#include "neon.h" -#include "walproposer.h" -#include "neon_walreader.h" #include "libpq-fe.h" +#include "libpqwalproposer.h" +#include "neon.h" +#include "neon_walreader.h" +#include "walproposer.h" + #define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */ #define XLOG_HDR_START_POS 1 /* offset of start position in wal sender* * message header */ @@ -542,14 +544,6 @@ walprop_pg_load_libpqwalreceiver(void) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); } -/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */ -struct WalProposerConn -{ - PGconn *pg_conn; - bool is_nonblocking; /* whether the connection is non-blocking */ - char *recvbuf; /* last received data from walprop_async_read */ -}; - /* Helper function */ static bool ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) @@ -587,16 +581,17 @@ walprop_status(Safekeeper *sk) } } -static void -walprop_connect_start(Safekeeper *sk) +WalProposerConn * +libpqwp_connect_start(char *conninfo) { + PGconn *pg_conn; + WalProposerConn *conn; const char *keywords[3]; const char *values[3]; int n; char *password = neon_auth_token; - Assert(sk->conn == NULL); /* * Connect using the given connection string. If the NEON_AUTH_TOKEN @@ -615,7 +610,7 @@ walprop_connect_start(Safekeeper *sk) n++; } keywords[n] = "dbname"; - values[n] = sk->conninfo; + values[n] = conninfo; n++; keywords[n] = NULL; values[n] = NULL; @@ -636,11 +631,20 @@ walprop_connect_start(Safekeeper *sk) * palloc will exit on failure though, so there's not much we could do if * it *did* fail. */ - sk->conn = palloc(sizeof(WalProposerConn)); - sk->conn->pg_conn = pg_conn; - sk->conn->is_nonblocking = false; /* connections always start in - * blocking mode */ - sk->conn->recvbuf = NULL; + conn = palloc(sizeof(WalProposerConn)); + conn->pg_conn = pg_conn; + conn->is_nonblocking = false; /* connections always start in blocking + * mode */ + conn->recvbuf = NULL; + return conn; +} + +static void +walprop_connect_start(Safekeeper *sk) +{ + Assert(sk->conn == NULL); + sk->conn = libpqwp_connect_start(sk->conninfo); + } static WalProposerConnectPollStatusType @@ -684,26 +688,33 @@ walprop_connect_poll(Safekeeper *sk) return return_val; } -static bool -walprop_send_query(Safekeeper *sk, char *query) +extern bool +libpqwp_send_query(WalProposerConn *conn, char *query) { /* * We need to be in blocking mode for sending the query to run without * requiring a call to PQflush */ - if (!ensure_nonblocking_status(sk->conn, false)) + if (!ensure_nonblocking_status(conn, false)) return false; /* PQsendQuery returns 1 on success, 0 on failure */ - if (!PQsendQuery(sk->conn->pg_conn, query)) + if (!PQsendQuery(conn->pg_conn, query)) return false; return true; } -static WalProposerExecStatusType -walprop_get_query_result(Safekeeper *sk) +static bool +walprop_send_query(Safekeeper *sk, char *query) { + return libpqwp_send_query(sk->conn, query); +} + +WalProposerExecStatusType +libpqwp_get_query_result(WalProposerConn *conn) +{ + PGresult *result; WalProposerExecStatusType return_val; @@ -711,14 +722,14 @@ walprop_get_query_result(Safekeeper *sk) char *unexpected_success = NULL; /* Consume any input that we might be missing */ - if (!PQconsumeInput(sk->conn->pg_conn)) + if (!PQconsumeInput(conn->pg_conn)) return WP_EXEC_FAILED; - if (PQisBusy(sk->conn->pg_conn)) + if (PQisBusy(conn->pg_conn)) return WP_EXEC_NEEDS_INPUT; - result = PQgetResult(sk->conn->pg_conn); + result = PQgetResult(conn->pg_conn); /* * PQgetResult returns NULL only if getting the result was successful & @@ -779,6 +790,12 @@ walprop_get_query_result(Safekeeper *sk) return return_val; } +static WalProposerExecStatusType +walprop_get_query_result(Safekeeper *sk) +{ + return libpqwp_get_query_result(sk->conn); +} + static pgsocket walprop_socket(Safekeeper *sk) { @@ -791,38 +808,20 @@ walprop_flush(Safekeeper *sk) return (PQflush(sk->conn->pg_conn)); } -static void -walprop_finish(Safekeeper *sk) +PGAsyncReadResult +libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount) { - if (!sk->conn) - return; - if (sk->conn->recvbuf != NULL) - PQfreemem(sk->conn->recvbuf); - PQfinish(sk->conn->pg_conn); - pfree(sk->conn); - sk->conn = NULL; -} - -/* - * Receive a message from the safekeeper. - * - * On success, the data is placed in *buf. It is valid until the next call - * to this function. - */ -static PGAsyncReadResult -walprop_async_read(Safekeeper *sk, char **buf, int *amount) -{ int result; - if (sk->conn->recvbuf != NULL) + if (conn->recvbuf != NULL) { - PQfreemem(sk->conn->recvbuf); - sk->conn->recvbuf = NULL; + PQfreemem(conn->recvbuf); + conn->recvbuf = NULL; } /* Call PQconsumeInput so that we have the data we need */ - if (!PQconsumeInput(sk->conn->pg_conn)) + if (!PQconsumeInput(conn->pg_conn)) { *amount = 0; *buf = NULL; @@ -840,7 +839,7 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount) * sometimes be triggered by the server returning an ErrorResponse (which * also happens to have the effect that the copy is done). */ - switch (result = PQgetCopyData(sk->conn->pg_conn, &sk->conn->recvbuf, true)) + switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true)) { case 0: *amount = 0; @@ -855,7 +854,7 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount) * We can check PQgetResult to make sure that the server * failed; it'll always result in PGRES_FATAL_ERROR */ - ExecStatusType status = PQresultStatus(PQgetResult(sk->conn->pg_conn)); + ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn)); if (status != PGRES_FATAL_ERROR) elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status); @@ -876,11 +875,23 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount) default: /* Positive values indicate the size of the returned result */ *amount = result; - *buf = sk->conn->recvbuf; + *buf = conn->recvbuf; return PG_ASYNC_READ_SUCCESS; } } +/* + * Receive a message from the safekeeper. + * + * On success, the data is placed in *buf. It is valid until the next call + * to this function. + */ +static PGAsyncReadResult +walprop_async_read(Safekeeper *sk, char **buf, int *amount) +{ + return libpqwp_async_read(sk->conn, buf, amount); +} + static PGAsyncWriteResult walprop_async_write(Safekeeper *sk, void const *buf, size_t size) { @@ -963,6 +974,32 @@ walprop_blocking_write(Safekeeper *sk, void const *buf, size_t size) return true; } +void +libpqwp_disconnect(WalProposerConn *conn) +{ + if (conn->recvbuf != NULL) + PQfreemem(conn->recvbuf); + PQfinish(conn->pg_conn); + pfree(conn); +} + +static void +walprop_finish(Safekeeper *sk) +{ + if (sk->conn) + { + libpqwp_disconnect(sk->conn); + sk->conn = NULL; + } + + /* free xlogreader */ + if (sk->xlogreader) + { + NeonWALReaderFree(sk->xlogreader); + sk->xlogreader = NULL; + } +} + /* * Subscribe for new WAL and stream it in the loop to safekeepers. *