From 0e1ff5db4c9be717bc726f9af070f4298e89a0ec Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Mon, 25 Sep 2023 11:26:25 +0000 Subject: [PATCH] Move libpqwalproposer to walproposer_pg --- pgxn/neon/Makefile | 1 - pgxn/neon/libpqwalproposer.c | 424 -------------------------------- pgxn/neon/walproposer.c | 84 +++---- pgxn/neon/walproposer.h | 108 +++----- pgxn/neon/walproposer_pg.c | 462 ++++++++++++++++++++++++++++++++++- 5 files changed, 529 insertions(+), 550 deletions(-) delete mode 100644 pgxn/neon/libpqwalproposer.c diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 126d12ca77..cf6d37c27c 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -7,7 +7,6 @@ OBJS = \ extension_server.o \ file_cache.o \ libpagestore.o \ - libpqwalproposer.o \ neon.o \ pagestore_smgr.o \ relsize_cache.o \ diff --git a/pgxn/neon/libpqwalproposer.c b/pgxn/neon/libpqwalproposer.c deleted file mode 100644 index ce9a1475d3..0000000000 --- a/pgxn/neon/libpqwalproposer.c +++ /dev/null @@ -1,424 +0,0 @@ -#include "postgres.h" - -#include "libpq-fe.h" -#include "neon.h" -#include "walproposer.h" - -/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */ -struct WalProposerConn -{ - PGconn *pg_conn; - bool is_nonblocking; /* whether the connection is non-blocking */ - char *recvbuf; /* last received data from - * walprop_async_read */ -}; - -/* Helper function */ -static bool -ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) -{ - /* If we're already correctly blocking or nonblocking, all good */ - if (is_nonblocking == conn->is_nonblocking) - return true; - - /* Otherwise, set it appropriately */ - if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1) - return false; - - conn->is_nonblocking = is_nonblocking; - return true; -} - -/* Exported function definitions */ -char * -walprop_error_message(WalProposerConn *conn) -{ - return PQerrorMessage(conn->pg_conn); -} - -WalProposerConnStatusType 
-walprop_status(WalProposerConn *conn) -{ - switch (PQstatus(conn->pg_conn)) - { - case CONNECTION_OK: - return WP_CONNECTION_OK; - case CONNECTION_BAD: - return WP_CONNECTION_BAD; - default: - return WP_CONNECTION_IN_PROGRESS; - } -} - -WalProposerConn * -walprop_connect_start(char *conninfo, char *password) -{ - WalProposerConn *conn; - PGconn *pg_conn; - const char *keywords[3]; - const char *values[3]; - int n; - - /* - * Connect using the given connection string. If the - * NEON_AUTH_TOKEN environment variable was set, use that as - * the password. - * - * The connection options are parsed in the order they're given, so - * when we set the password before the connection string, the - * connection string can override the password from the env variable. - * Seems useful, although we don't currently use that capability - * anywhere. - */ - n = 0; - if (password) - { - keywords[n] = "password"; - values[n] = password; - n++; - } - keywords[n] = "dbname"; - values[n] = conninfo; - n++; - keywords[n] = NULL; - values[n] = NULL; - n++; - pg_conn = PQconnectStartParams(keywords, values, 1); - - /* - * Allocation of a PQconn can fail, and will return NULL. We want to fully - * replicate the behavior of PQconnectStart here. - */ - if (!pg_conn) - return NULL; - - /* - * And in theory this allocation can fail as well, but it's incredibly - * unlikely if we just successfully allocated a PGconn. - * - * palloc will exit on failure though, so there's not much we could do if - * it *did* fail. 
- */ - conn = palloc(sizeof(WalProposerConn)); - conn->pg_conn = pg_conn; - conn->is_nonblocking = false; /* connections always start in blocking - * mode */ - conn->recvbuf = NULL; - return conn; -} - -WalProposerConnectPollStatusType -walprop_connect_poll(WalProposerConn *conn) -{ - WalProposerConnectPollStatusType return_val; - - switch (PQconnectPoll(conn->pg_conn)) - { - case PGRES_POLLING_FAILED: - return_val = WP_CONN_POLLING_FAILED; - break; - case PGRES_POLLING_READING: - return_val = WP_CONN_POLLING_READING; - break; - case PGRES_POLLING_WRITING: - return_val = WP_CONN_POLLING_WRITING; - break; - case PGRES_POLLING_OK: - return_val = WP_CONN_POLLING_OK; - break; - - /* - * There's a comment at its source about this constant being - * unused. We'll expect it's never returned. - */ - case PGRES_POLLING_ACTIVE: - elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll"); - - /* - * This return is never actually reached, but it's here to make - * the compiler happy - */ - return WP_CONN_POLLING_FAILED; - - default: - Assert(false); - return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */ - } - - return return_val; -} - -bool -walprop_send_query(WalProposerConn *conn, char *query) -{ - /* - * We need to be in blocking mode for sending the query to run without - * requiring a call to PQflush - */ - if (!ensure_nonblocking_status(conn, false)) - return false; - - /* PQsendQuery returns 1 on success, 0 on failure */ - if (!PQsendQuery(conn->pg_conn, query)) - return false; - - return true; -} - -WalProposerExecStatusType -walprop_get_query_result(WalProposerConn *conn) -{ - PGresult *result; - WalProposerExecStatusType return_val; - - /* Marker variable if we need to log an unexpected success result */ - char *unexpected_success = NULL; - - /* Consume any input that we might be missing */ - if (!PQconsumeInput(conn->pg_conn)) - return WP_EXEC_FAILED; - - if (PQisBusy(conn->pg_conn)) - return WP_EXEC_NEEDS_INPUT; - - - result = 
PQgetResult(conn->pg_conn); - - /* - * PQgetResult returns NULL only if getting the result was successful & - * there's no more of the result to get. - */ - if (!result) - { - elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results"); - return WP_EXEC_UNEXPECTED_SUCCESS; - } - - /* Helper macro to reduce boilerplate */ -#define UNEXPECTED_SUCCESS(msg) \ - return_val = WP_EXEC_UNEXPECTED_SUCCESS; \ - unexpected_success = msg; \ - break; - - - switch (PQresultStatus(result)) - { - /* "true" success case */ - case PGRES_COPY_BOTH: - return_val = WP_EXEC_SUCCESS_COPYBOTH; - break; - - /* Unexpected success case */ - case PGRES_EMPTY_QUERY: - UNEXPECTED_SUCCESS("empty query return"); - case PGRES_COMMAND_OK: - UNEXPECTED_SUCCESS("data-less command end"); - case PGRES_TUPLES_OK: - UNEXPECTED_SUCCESS("tuples return"); - case PGRES_COPY_OUT: - UNEXPECTED_SUCCESS("'Copy Out' response"); - case PGRES_COPY_IN: - UNEXPECTED_SUCCESS("'Copy In' response"); - case PGRES_SINGLE_TUPLE: - UNEXPECTED_SUCCESS("single tuple return"); - case PGRES_PIPELINE_SYNC: - UNEXPECTED_SUCCESS("pipeline sync point"); - - /* Failure cases */ - case PGRES_BAD_RESPONSE: - case PGRES_NONFATAL_ERROR: - case PGRES_FATAL_ERROR: - case PGRES_PIPELINE_ABORTED: - return_val = WP_EXEC_FAILED; - break; - - default: - Assert(false); - return_val = WP_EXEC_FAILED; /* keep the compiler quiet */ - } - - if (unexpected_success) - elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success); - - return return_val; -} - -pgsocket -walprop_socket(WalProposerConn *conn) -{ - return PQsocket(conn->pg_conn); -} - -int -walprop_flush(WalProposerConn *conn) -{ - return (PQflush(conn->pg_conn)); -} - -void -walprop_finish(WalProposerConn *conn) -{ - if (conn->recvbuf != NULL) - PQfreemem(conn->recvbuf); - PQfinish(conn->pg_conn); - pfree(conn); -} - -/* - * Receive a message from the safekeeper. - * - * On success, the data is placed in *buf. 
It is valid until the next call - * to this function. - */ -PGAsyncReadResult -walprop_async_read(WalProposerConn *conn, char **buf, int *amount) -{ - int result; - - if (conn->recvbuf != NULL) - { - PQfreemem(conn->recvbuf); - conn->recvbuf = NULL; - } - - /* Call PQconsumeInput so that we have the data we need */ - if (!PQconsumeInput(conn->pg_conn)) - { - *amount = 0; - *buf = NULL; - return PG_ASYNC_READ_FAIL; - } - - /* - * The docs for PQgetCopyData list the return values as: 0 if the copy is - * still in progress, but no "complete row" is available -1 if the copy is - * done -2 if an error occurred (> 0) if it was successful; that value is - * the amount transferred. - * - * The protocol we use between walproposer and safekeeper means that we - * *usually* wouldn't expect to see that the copy is done, but this can - * sometimes be triggered by the server returning an ErrorResponse (which - * also happens to have the effect that the copy is done). - */ - switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true)) - { - case 0: - *amount = 0; - *buf = NULL; - return PG_ASYNC_READ_TRY_AGAIN; - case -1: - { - /* - * If we get -1, it's probably because of a server error; the - * safekeeper won't normally send a CopyDone message. 
- * - * We can check PQgetResult to make sure that the server - * failed; it'll always result in PGRES_FATAL_ERROR - */ - ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn)); - - if (status != PGRES_FATAL_ERROR) - elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status); - - /* - * If there was actually an error, it'll be properly reported - * by calls to PQerrorMessage -- we don't have to do anything - * else - */ - *amount = 0; - *buf = NULL; - return PG_ASYNC_READ_FAIL; - } - case -2: - *amount = 0; - *buf = NULL; - return PG_ASYNC_READ_FAIL; - default: - /* Positive values indicate the size of the returned result */ - *amount = result; - *buf = conn->recvbuf; - return PG_ASYNC_READ_SUCCESS; - } -} - -PGAsyncWriteResult -walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) -{ - int result; - - /* If we aren't in non-blocking mode, switch to it. */ - if (!ensure_nonblocking_status(conn, true)) - return PG_ASYNC_WRITE_FAIL; - - /* - * The docs for PQputcopyData list the return values as: 1 if the data was - * queued, 0 if it was not queued because of full buffers, or -1 if an - * error occurred - */ - result = PQputCopyData(conn->pg_conn, buf, size); - - /* - * We won't get a result of zero because walproposer always empties the - * connection's buffers before sending more - */ - Assert(result != 0); - - switch (result) - { - case 1: - /* good -- continue */ - break; - case -1: - return PG_ASYNC_WRITE_FAIL; - default: - elog(FATAL, "invalid return %d from PQputCopyData", result); - } - - /* - * After queueing the data, we still need to flush to get it to send. This - * might take multiple tries, but we don't want to wait around until it's - * done. 
- * - * PQflush has the following returns (directly quoting the docs): 0 if - * sucessful, 1 if it was unable to send all the data in the send queue - * yet -1 if it failed for some reason - */ - switch (result = PQflush(conn->pg_conn)) - { - case 0: - return PG_ASYNC_WRITE_SUCCESS; - case 1: - return PG_ASYNC_WRITE_TRY_FLUSH; - case -1: - return PG_ASYNC_WRITE_FAIL; - default: - elog(FATAL, "invalid return %d from PQflush", result); - } -} - -/* - * This function is very similar to walprop_async_write. For more - * information, refer to the comments there. - */ -bool -walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) -{ - int result; - - /* If we are in non-blocking mode, switch out of it. */ - if (!ensure_nonblocking_status(conn, false)) - return false; - - if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1) - return false; - - Assert(result == 1); - - /* Because the connection is non-blocking, flushing returns 0 or -1 */ - - if ((result = PQflush(conn->pg_conn)) == -1) - return false; - - Assert(result == 0); - return true; -} diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index a4a8d0c8c4..3cfc9ecc67 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -174,7 +174,7 @@ WalProposerMain(Datum main_arg) WalProposerInit(walprop_pg.get_flush_rec_ptr(), systemId); - last_reconnect_attempt = GetCurrentTimestamp(); + last_reconnect_attempt = walprop_pg.get_current_timestamp(); walprop_pg.init_walsender(); @@ -208,7 +208,7 @@ WalProposerPoll(void) bool late_cv_trigger = false; WaitEvent event = {0}; int rc = 0; - TimestampTz now = GetCurrentTimestamp(); + TimestampTz now = walprop_pg.get_current_timestamp(); long timeout = TimeToReconnect(now); #if PG_MAJORVERSION_NUM >= 16 @@ -276,7 +276,7 @@ WalProposerPoll(void) } } - now = GetCurrentTimestamp(); + now = walprop_pg.get_current_timestamp(); if (rc == 0 || TimeToReconnect(now) <= 0) /* timeout expired: poll state */ { TimestampTz now; @@ -293,7 +293,7 
@@ WalProposerPoll(void) /* * Abandon connection attempts which take too long. */ - now = GetCurrentTimestamp(); + now = walprop_pg.get_current_timestamp(); for (int i = 0; i < n_safekeepers; i++) { Safekeeper *sk = &safekeeper[i]; @@ -317,9 +317,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) char *sep; char *port; - load_file("libpqwalreceiver", false); - if (WalReceiverFunctions == NULL) - elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + walprop_pg.load_libpqwalreceiver(); for (host = wal_acceptors_list; host != NULL && *host != '\0'; host = sep) { @@ -384,12 +382,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) !HexDecodeString(greetRequest.tenant_id, neon_tenant, 16)) elog(FATAL, "Could not parse neon.tenant_id, %s", neon_tenant); -#if PG_VERSION_NUM >= 150000 - /* FIXME don't use hardcoded timeline id */ - greetRequest.timeline = 1; -#else - greetRequest.timeline = ThisTimeLineID; -#endif + greetRequest.timeline = walprop_pg.get_timeline_id(); greetRequest.walSegSize = wal_segment_size; InitEventSet(); @@ -482,7 +475,7 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove) if (sk->conn != NULL) { desired_events = SafekeeperStateDesiredEvents(sk->state); - sk->eventPos = AddWaitEventToSet(waitEvents, desired_events, walprop_socket(sk->conn), NULL, sk); + sk->eventPos = AddWaitEventToSet(waitEvents, desired_events, walprop_pg.conn_socket(sk->conn), NULL, sk); } } } @@ -492,7 +485,7 @@ static void ShutdownConnection(Safekeeper *sk) { if (sk->conn) - walprop_finish(sk->conn); + walprop_pg.conn_finish(sk->conn); sk->conn = NULL; sk->state = SS_OFFLINE; sk->flushWrite = false; @@ -524,7 +517,7 @@ ResetConnection(Safekeeper *sk) /* * Try to establish new connection */ - sk->conn = walprop_connect_start((char *) &sk->conninfo, neon_auth_token); + sk->conn = walprop_pg.conn_connect_start((char *) &sk->conninfo, neon_auth_token); /* * "If the result is null, then libpq has been unable to allocate a new @@ -538,7 +531,7 @@ 
ResetConnection(Safekeeper *sk) * PQconnectPoll. Before we do that though, we need to check that it * didn't immediately fail. */ - if (walprop_status(sk->conn) == WP_CONNECTION_BAD) + if (walprop_pg.conn_status(sk->conn) == WP_CONNECTION_BAD) { /*--- * According to libpq docs: @@ -550,13 +543,13 @@ ResetConnection(Safekeeper *sk) * https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS */ elog(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s", - sk->host, sk->port, walprop_error_message(sk->conn)); + sk->host, sk->port, walprop_pg.conn_error_message(sk->conn)); /* * Even though the connection failed, we still need to clean up the * object */ - walprop_finish(sk->conn); + walprop_pg.conn_finish(sk->conn); sk->conn = NULL; return; } @@ -577,9 +570,9 @@ ResetConnection(Safekeeper *sk) elog(LOG, "connecting with node %s:%s", sk->host, sk->port); sk->state = SS_CONNECTING_WRITE; - sk->latestMsgReceivedAt = GetCurrentTimestamp(); + sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp(); - sock = walprop_socket(sk->conn); + sock = walprop_pg.conn_socket(sk->conn); sk->eventPos = AddWaitEventToSet(waitEvents, WL_SOCKET_WRITEABLE, sock, NULL, sk); return; } @@ -609,7 +602,7 @@ TimeToReconnect(TimestampTz now) static void ReconnectSafekeepers(void) { - TimestampTz now = GetCurrentTimestamp(); + TimestampTz now = walprop_pg.get_current_timestamp(); if (TimeToReconnect(now) == 0) { @@ -725,7 +718,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) static void HandleConnectionEvent(Safekeeper *sk) { - WalProposerConnectPollStatusType result = walprop_connect_poll(sk->conn); + WalProposerConnectPollStatusType result = walprop_pg.conn_connect_poll(sk->conn); /* The new set of events we'll wait on, after updating */ uint32 new_events = WL_NO_EVENTS; @@ -735,7 +728,7 @@ HandleConnectionEvent(Safekeeper *sk) case WP_CONN_POLLING_OK: elog(LOG, "connected with node %s:%s", sk->host, sk->port); - sk->latestMsgReceivedAt = 
GetCurrentTimestamp(); + sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp(); /* * We have to pick some event to update event set. We'll * eventually need the socket to be readable, so we go with that. @@ -757,7 +750,7 @@ HandleConnectionEvent(Safekeeper *sk) case WP_CONN_POLLING_FAILED: elog(WARNING, "failed to connect to node '%s:%s': %s", - sk->host, sk->port, walprop_error_message(sk->conn)); + sk->host, sk->port, walprop_pg.conn_error_message(sk->conn)); /* * If connecting failed, we don't want to restart the connection @@ -774,7 +767,7 @@ HandleConnectionEvent(Safekeeper *sk) * old event and re-register an event on the new socket. */ HackyRemoveWalProposerEvent(sk); - sk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_socket(sk->conn), NULL, sk); + sk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_pg.conn_socket(sk->conn), NULL, sk); /* If we successfully connected, send START_WAL_PUSH query */ if (result == WP_CONN_POLLING_OK) @@ -789,10 +782,10 @@ HandleConnectionEvent(Safekeeper *sk) static void SendStartWALPush(Safekeeper *sk) { - if (!walprop_send_query(sk->conn, "START_WAL_PUSH")) + if (!walprop_pg.conn_send_query(sk->conn, "START_WAL_PUSH")) { elog(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", - sk->host, sk->port, walprop_error_message(sk->conn)); + sk->host, sk->port, walprop_pg.conn_error_message(sk->conn)); ShutdownConnection(sk); return; } @@ -803,7 +796,7 @@ SendStartWALPush(Safekeeper *sk) static void RecvStartWALPushResult(Safekeeper *sk) { - switch (walprop_get_query_result(sk->conn)) + switch (walprop_pg.conn_get_query_result(sk->conn)) { /* * Successful result, move on to starting the handshake @@ -827,7 +820,7 @@ RecvStartWALPushResult(Safekeeper *sk) case WP_EXEC_FAILED: elog(WARNING, "Failed to send query to safekeeper %s:%s: %s", - sk->host, sk->port, walprop_error_message(sk->conn)); + sk->host, sk->port, walprop_pg.conn_error_message(sk->conn)); ShutdownConnection(sk); 
return; @@ -1611,19 +1604,14 @@ SendAppendRequests(Safekeeper *sk) &sk->outbuf.data[sk->outbuf.len], req->beginLsn, req->endLsn - req->beginLsn, -#if PG_VERSION_NUM >= 150000 - /* FIXME don't use hardcoded timeline_id here */ - 1, -#else - ThisTimeLineID, -#endif + walprop_pg.get_timeline_id(), &errinfo)) { WALReadRaiseError(&errinfo); } sk->outbuf.len += req->endLsn - req->beginLsn; - writeResult = walprop_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len); + writeResult = walprop_pg.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len); /* Mark current message as sent, whatever the result is */ sk->streamingAt = endLsn; @@ -1647,7 +1635,7 @@ SendAppendRequests(Safekeeper *sk) case PG_ASYNC_WRITE_FAIL: elog(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_error_message(sk->conn)); + walprop_pg.conn_error_message(sk->conn)); ShutdownConnection(sk); return false; default: @@ -1951,7 +1939,7 @@ HandleSafekeeperResponse(void) * pageserver. 
*/ quorumFeedback.rf.disk_consistent_lsn, - GetCurrentTimestamp(), false); + walprop_pg.get_current_timestamp(), false); } CombineHotStanbyFeedbacks(&hsFeedback); @@ -2050,7 +2038,7 @@ HandleSafekeeperResponse(void) static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size) { - switch (walprop_async_read(sk->conn, buf, buf_size)) + switch (walprop_pg.conn_async_read(sk->conn, buf, buf_size)) { case PG_ASYNC_READ_SUCCESS: return true; @@ -2062,7 +2050,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) case PG_ASYNC_READ_FAIL: elog(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_error_message(sk->conn)); + walprop_pg.conn_error_message(sk->conn)); ShutdownConnection(sk); return false; } @@ -2103,7 +2091,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg) ResetConnection(sk); return false; } - sk->latestMsgReceivedAt = GetCurrentTimestamp(); + sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp(); switch (tag) { case 'g': @@ -2171,11 +2159,11 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes { uint32 events; - if (!walprop_blocking_write(sk->conn, msg, msg_size)) + if (!walprop_pg.conn_blocking_write(sk->conn, msg, msg_size)) { elog(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_error_message(sk->conn)); + walprop_pg.conn_error_message(sk->conn)); ShutdownConnection(sk); return false; } @@ -2203,7 +2191,7 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state) { - switch (walprop_async_write(sk->conn, msg, msg_size)) + switch (walprop_pg.conn_async_write(sk->conn, msg, msg_size)) { case PG_ASYNC_WRITE_SUCCESS: return true; @@ -2220,7 +2208,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta 
case PG_ASYNC_WRITE_FAIL: elog(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_error_message(sk->conn)); + walprop_pg.conn_error_message(sk->conn)); ShutdownConnection(sk); return false; default: @@ -2246,7 +2234,7 @@ AsyncFlush(Safekeeper *sk) * 1 if unable to send everything yet [call PQflush again] * -1 if it failed [emit an error] */ - switch (walprop_flush(sk->conn)) + switch (walprop_pg.conn_flush(sk->conn)) { case 0: /* flush is done */ @@ -2257,7 +2245,7 @@ AsyncFlush(Safekeeper *sk) case -1: elog(WARNING, "Failed to flush write to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - walprop_error_message(sk->conn)); + walprop_pg.conn_error_message(sk->conn)); ResetConnection(sk); return false; default: diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 93935f7452..430bde66d1 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -298,23 +298,6 @@ typedef struct WalproposerShmemState pg_atomic_uint64 backpressureThrottlingTime; } WalproposerShmemState; -/* - * Collection of hooks for walproposer, to call postgres functions, - * read WAL and send it over the network. 
- */ -typedef struct walproposer_api -{ - WalproposerShmemState * (*get_shmem_state) (void); - void (*replication_feedback_set) (PageserverFeedback * rf); - void (*start_streaming) (XLogRecPtr startpos, TimeLineID timeline); - void (*init_walsender) (void); - void (*init_standalone_sync_safekeepers) (void); - uint64 (*init_bgworker) (void); - XLogRecPtr (*get_flush_rec_ptr) (void); -} walproposer_api; - -extern const walproposer_api walprop_pg; - /* * Report safekeeper state to proposer */ @@ -461,65 +444,38 @@ typedef enum WP_CONNECTION_IN_PROGRESS, } WalProposerConnStatusType; -/* Re-exported PQerrorMessage */ -extern char *walprop_error_message(WalProposerConn *conn); - -/* Re-exported PQstatus */ -extern WalProposerConnStatusType walprop_status(WalProposerConn *conn); - -/* Re-exported PQconnectStart */ -extern WalProposerConn * walprop_connect_start(char *conninfo, char *password); - -/* Re-exported PQconectPoll */ -extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn); - -/* Blocking wrapper around PQsendQuery */ -extern bool walprop_send_query(WalProposerConn *conn, char *query); - -/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */ -extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn); - -/* Re-exported PQsocket */ -extern pgsocket walprop_socket(WalProposerConn *conn); - -/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */ -extern int walprop_flush(WalProposerConn *conn); - -/* Re-exported PQfinish */ -extern void walprop_finish(WalProposerConn *conn); - -/* - * Ergonomic wrapper around PGgetCopyData - * - * Reads a CopyData block from a safekeeper, setting *amount to the number - * of bytes returned. - * - * This function is allowed to assume certain properties specific to the - * protocol with the safekeepers, so it should not be used as-is for any - * other purpose. 
- * - * Note: If possible, using is generally preferred, because it - * performs a bit of extra checking work that's always required and is normally - * somewhat verbose. - */ -extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount); - -/* - * Ergonomic wrapper around PQputCopyData + PQflush - * - * Starts to write a CopyData block to a safekeeper. - * - * For information on the meaning of return codes, refer to PGAsyncWriteResult. - */ -extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size); - -/* - * Blocking equivalent to walprop_async_write_fn - * - * Returns 'true' if successful, 'false' on failure. - */ -extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size); - extern uint64 BackpressureThrottlingTime(void); +/* + * Collection of hooks for walproposer, to call postgres functions, + * read WAL and send it over the network. + */ +typedef struct walproposer_api +{ + WalproposerShmemState * (*get_shmem_state) (void); + void (*replication_feedback_set) (PageserverFeedback * rf); + void (*start_streaming) (XLogRecPtr startpos, TimeLineID timeline); + void (*init_walsender) (void); + void (*init_standalone_sync_safekeepers) (void); + uint64 (*init_bgworker) (void); + XLogRecPtr (*get_flush_rec_ptr) (void); + TimestampTz (*get_current_timestamp) (void); + TimeLineID (*get_timeline_id) (void); + void (*load_libpqwalreceiver) (void); + char * (*conn_error_message) (WalProposerConn * conn); + WalProposerConnStatusType (*conn_status) (WalProposerConn * conn); + WalProposerConn * (*conn_connect_start) (char *conninfo, char *password); + WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn * conn); + bool (*conn_send_query) (WalProposerConn * conn, char * query); + WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn * conn); + pgsocket (*conn_socket) (WalProposerConn * conn); + int (*conn_flush) (WalProposerConn * conn); + void 
(*conn_finish) (WalProposerConn * conn); + PGAsyncReadResult (*conn_async_read) (WalProposerConn * conn, char **buf, int *amount); + PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size); + bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size); +} walproposer_api; + +extern const walproposer_api walprop_pg; + #endif /* __NEON_WALPROPOSER_H__ */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 5eeb8c2d53..9b22e31a71 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -40,6 +40,7 @@ #include "neon.h" #include "walproposer.h" #include "walproposer_utils.h" +#include "libpq-fe.h" char *wal_acceptors_list = ""; int wal_acceptor_reconnect_timeout = 1000; @@ -437,6 +438,450 @@ walprop_pg_get_flush_rec_ptr(void) #endif } +static TimestampTz +walprop_pg_get_current_timestamp(void) +{ + return GetCurrentTimestamp(); +} + +static TimeLineID +walprop_pg_get_timeline_id(void) +{ +#if PG_VERSION_NUM >= 150000 + /* FIXME don't use hardcoded timeline id */ + return 1; +#else + return ThisTimeLineID; +#endif +} + +static void +walprop_pg_load_libpqwalreceiver(void) +{ + load_file("libpqwalreceiver", false); + if (WalReceiverFunctions == NULL) + elog(ERROR, "libpqwalreceiver didn't initialize correctly"); +} + +/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */ +struct WalProposerConn +{ + PGconn *pg_conn; + bool is_nonblocking; /* whether the connection is non-blocking */ + char *recvbuf; /* last received data from + * walprop_async_read */ +}; + +/* Helper function */ +static bool +ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) +{ + /* If we're already correctly blocking or nonblocking, all good */ + if (is_nonblocking == conn->is_nonblocking) + return true; + + /* Otherwise, set it appropriately */ + if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1) + return false; + + conn->is_nonblocking = 
is_nonblocking; + return true; +} + +/* Exported function definitions */ +static char * +walprop_error_message(WalProposerConn *conn) +{ + return PQerrorMessage(conn->pg_conn); +} + +static WalProposerConnStatusType +walprop_status(WalProposerConn *conn) +{ + switch (PQstatus(conn->pg_conn)) + { + case CONNECTION_OK: + return WP_CONNECTION_OK; + case CONNECTION_BAD: + return WP_CONNECTION_BAD; + default: + return WP_CONNECTION_IN_PROGRESS; + } +} + +static WalProposerConn * +walprop_connect_start(char *conninfo, char *password) +{ + WalProposerConn *conn; + PGconn *pg_conn; + const char *keywords[3]; + const char *values[3]; + int n; + + /* + * Connect using the given connection string. If the + * NEON_AUTH_TOKEN environment variable was set, use that as + * the password. + * + * The connection options are parsed in the order they're given, so + * when we set the password before the connection string, the + * connection string can override the password from the env variable. + * Seems useful, although we don't currently use that capability + * anywhere. + */ + n = 0; + if (password) + { + keywords[n] = "password"; + values[n] = password; + n++; + } + keywords[n] = "dbname"; + values[n] = conninfo; + n++; + keywords[n] = NULL; + values[n] = NULL; + n++; + pg_conn = PQconnectStartParams(keywords, values, 1); + + /* + * Allocation of a PQconn can fail, and will return NULL. We want to fully + * replicate the behavior of PQconnectStart here. + */ + if (!pg_conn) + return NULL; + + /* + * And in theory this allocation can fail as well, but it's incredibly + * unlikely if we just successfully allocated a PGconn. + * + * palloc will exit on failure though, so there's not much we could do if + * it *did* fail. 
+ */ + conn = palloc(sizeof(WalProposerConn)); + conn->pg_conn = pg_conn; + conn->is_nonblocking = false; /* connections always start in blocking + * mode */ + conn->recvbuf = NULL; + return conn; +} + +static WalProposerConnectPollStatusType +walprop_connect_poll(WalProposerConn *conn) +{ + WalProposerConnectPollStatusType return_val; + + switch (PQconnectPoll(conn->pg_conn)) + { + case PGRES_POLLING_FAILED: + return_val = WP_CONN_POLLING_FAILED; + break; + case PGRES_POLLING_READING: + return_val = WP_CONN_POLLING_READING; + break; + case PGRES_POLLING_WRITING: + return_val = WP_CONN_POLLING_WRITING; + break; + case PGRES_POLLING_OK: + return_val = WP_CONN_POLLING_OK; + break; + + /* + * The definition of this constant carries a comment saying it is + * unused, so we expect that it is never returned. + */ + case PGRES_POLLING_ACTIVE: + elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll"); + + /* + * This return is never actually reached, because elog(FATAL) does + * not return; it's only here to make the compiler happy. + */ + return WP_CONN_POLLING_FAILED; + + default: + Assert(false); + return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */ + } + + return return_val; +} + +static bool +walprop_send_query(WalProposerConn *conn, char *query) +{ + /* + * We need to be in blocking mode so that sending the query completes + * without requiring a separate call to PQflush. + */ + if (!ensure_nonblocking_status(conn, false)) + return false; + + /* PQsendQuery returns 1 on success, 0 on failure */ + if (!PQsendQuery(conn->pg_conn, query)) + return false; + + return true; +} + +static WalProposerExecStatusType +walprop_get_query_result(WalProposerConn *conn) +{ + PGresult *result; + WalProposerExecStatusType return_val; + + /* Description to log if we get an unexpected success result, else NULL */ + char *unexpected_success = NULL; + + /* Consume any pending input so that PQisBusy below sees current state */ + if (!PQconsumeInput(conn->pg_conn)) + return WP_EXEC_FAILED; + + if (PQisBusy(conn->pg_conn)) + return WP_EXEC_NEEDS_INPUT; +
+ + result = PQgetResult(conn->pg_conn); + + /* + * PQgetResult returns NULL only if getting the result was successful and + * there's no more of the result to get. NOTE(review): a non-NULL result is never passed to PQclear in this function -- confirm whether it should be. + */ + if (!result) + { + elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results"); + return WP_EXEC_UNEXPECTED_SUCCESS; + } + + /* Helper macro to reduce boilerplate: records the status and message, then breaks out of the switch */ +#define UNEXPECTED_SUCCESS(msg) \ + return_val = WP_EXEC_UNEXPECTED_SUCCESS; \ + unexpected_success = msg; \ + break; + + + switch (PQresultStatus(result)) + { + /* The only "true" success case */ + case PGRES_COPY_BOTH: + return_val = WP_EXEC_SUCCESS_COPYBOTH; + break; + + /* Unexpected success cases */ + case PGRES_EMPTY_QUERY: + UNEXPECTED_SUCCESS("empty query return"); + case PGRES_COMMAND_OK: + UNEXPECTED_SUCCESS("data-less command end"); + case PGRES_TUPLES_OK: + UNEXPECTED_SUCCESS("tuples return"); + case PGRES_COPY_OUT: + UNEXPECTED_SUCCESS("'Copy Out' response"); + case PGRES_COPY_IN: + UNEXPECTED_SUCCESS("'Copy In' response"); + case PGRES_SINGLE_TUPLE: + UNEXPECTED_SUCCESS("single tuple return"); + case PGRES_PIPELINE_SYNC: + UNEXPECTED_SUCCESS("pipeline sync point"); + + /* Failure cases */ + case PGRES_BAD_RESPONSE: + case PGRES_NONFATAL_ERROR: + case PGRES_FATAL_ERROR: + case PGRES_PIPELINE_ABORTED: + return_val = WP_EXEC_FAILED; + break; + + default: + Assert(false); + return_val = WP_EXEC_FAILED; /* keep the compiler quiet */ + } + + if (unexpected_success) + elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success); + + return return_val; +} + +static pgsocket +walprop_socket(WalProposerConn *conn) +{ + return PQsocket(conn->pg_conn); +} + +static int +walprop_flush(WalProposerConn *conn) +{ + return (PQflush(conn->pg_conn)); +} + +static void +walprop_finish(WalProposerConn *conn) +{ + if (conn->recvbuf != NULL) + PQfreemem(conn->recvbuf); + PQfinish(conn->pg_conn); + pfree(conn); +} + +/* + * Receive a message from the safekeeper. + * + * On success, the data is placed in *buf and its length in *amount.
The data is valid until the next call + * to this function or to walprop_finish, both of which free the buffer. + */ +static PGAsyncReadResult +walprop_async_read(WalProposerConn *conn, char **buf, int *amount) +{ + int result; + + if (conn->recvbuf != NULL) + { + PQfreemem(conn->recvbuf); + conn->recvbuf = NULL; + } + + /* Call PQconsumeInput so that we have the data we need */ + if (!PQconsumeInput(conn->pg_conn)) + { + *amount = 0; + *buf = NULL; + return PG_ASYNC_READ_FAIL; + } + + /* + * The docs for PQgetCopyData list the return values as: 0 if the copy is + * still in progress, but no "complete row" is available; -1 if the copy is + * done; -2 if an error occurred; (> 0) if it was successful, in which case that value is + * the amount transferred. + * + * The protocol we use between walproposer and safekeeper means that we + * *usually* wouldn't expect to see that the copy is done, but this can + * sometimes be triggered by the server returning an ErrorResponse (which + * also happens to have the effect that the copy is done). + */ + switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true)) + { + case 0: + *amount = 0; + *buf = NULL; + return PG_ASYNC_READ_TRY_AGAIN; + case -1: + { + /* + * If we get -1, it's probably because of a server error; the + * safekeeper won't normally send a CopyDone message.
+ * + * We check PQgetResult to make sure that the server + * failed; we expect it to always yield PGRES_FATAL_ERROR. NOTE(review): the PGresult returned here is never passed to PQclear -- confirm whether it should be. + */ + ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn)); + + if (status != PGRES_FATAL_ERROR) + elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status); + + /* + * If there was actually an error, it'll be properly reported + * by calls to PQerrorMessage -- we don't have to do anything + * else + */ + *amount = 0; + *buf = NULL; + return PG_ASYNC_READ_FAIL; + } + case -2: + *amount = 0; + *buf = NULL; + return PG_ASYNC_READ_FAIL; + default: + /* Positive values indicate the size of the returned result */ + *amount = result; + *buf = conn->recvbuf; + return PG_ASYNC_READ_SUCCESS; + } +} + +static PGAsyncWriteResult +walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) +{ + int result; + + /* If we aren't in non-blocking mode, switch to it. */ + if (!ensure_nonblocking_status(conn, true)) + return PG_ASYNC_WRITE_FAIL; + + /* + * The docs for PQputCopyData list the return values as: 1 if the data was + * queued, 0 if it was not queued because of full buffers, or -1 if an + * error occurred. + */ + result = PQputCopyData(conn->pg_conn, buf, size); + + /* + * We won't get a result of zero because walproposer always empties the + * connection's buffers before sending more + */ + Assert(result != 0); + + switch (result) + { + case 1: + /* good -- continue */ + break; + case -1: + return PG_ASYNC_WRITE_FAIL; + default: + elog(FATAL, "invalid return %d from PQputCopyData", result); + } + + /* + * After queueing the data, we still need to flush to get it to send. This + * might take multiple tries, but we don't want to wait around until it's + * done.
+ * + * PQflush has the following returns (per the libpq docs): 0 if + * successful, 1 if it was unable to send all the data in the send queue + * yet, -1 if it failed for some reason. + */ + switch (result = PQflush(conn->pg_conn)) + { + case 0: + return PG_ASYNC_WRITE_SUCCESS; + case 1: + return PG_ASYNC_WRITE_TRY_FLUSH; + case -1: + return PG_ASYNC_WRITE_FAIL; + default: + elog(FATAL, "invalid return %d from PQflush", result); + } +} + +/* + * This function is very similar to walprop_async_write. For more + * information, refer to the comments there. + */ +static bool +walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) +{ + int result; + + /* If we are in non-blocking mode, switch out of it. */ + if (!ensure_nonblocking_status(conn, false)) + return false; + + if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1) + return false; + + Assert(result == 1); + + /* Because the connection is blocking, flushing returns 0 or -1 (never 1) */ + + if ((result = PQflush(conn->pg_conn)) == -1) + return false; + + Assert(result == 0); + return true; +} + /* * Temporary globally exported walproposer API for postgres.
*/ @@ -447,5 +892,20 @@ const walproposer_api walprop_pg = { .init_walsender = walprop_pg_init_walsender, .init_standalone_sync_safekeepers = walprop_pg_init_standalone_sync_safekeepers, .init_bgworker = walprop_pg_init_bgworker, - .get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr + .get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr, + .get_current_timestamp = walprop_pg_get_current_timestamp, + .get_timeline_id = walprop_pg_get_timeline_id, + .load_libpqwalreceiver = walprop_pg_load_libpqwalreceiver, + .conn_error_message = walprop_error_message, + .conn_status = walprop_status, + .conn_connect_start = walprop_connect_start, + .conn_connect_poll = walprop_connect_poll, + .conn_send_query = walprop_send_query, + .conn_get_query_result = walprop_get_query_result, + .conn_socket = walprop_socket, + .conn_flush = walprop_flush, + .conn_finish = walprop_finish, + .conn_async_read = walprop_async_read, + .conn_async_write = walprop_async_write, + .conn_blocking_write = walprop_blocking_write, };