Move libpqwalproposer to walproposer_pg

This commit is contained in:
Arthur Petukhovsky
2023-09-25 11:26:25 +00:00
parent fbaca131ca
commit 0e1ff5db4c
5 changed files with 529 additions and 550 deletions

View File

@@ -7,7 +7,6 @@ OBJS = \
extension_server.o \
file_cache.o \
libpagestore.o \
libpqwalproposer.o \
neon.o \
pagestore_smgr.o \
relsize_cache.o \

View File

@@ -1,424 +0,0 @@
#include "postgres.h"
#include "libpq-fe.h"
#include "neon.h"
#include "walproposer.h"
/*
 * Header in walproposer.h -- Wrapper struct to abstract away the libpq
 * connection.
 *
 * recvbuf is released with PQfreemem by walprop_async_read (before each new
 * read) and by walprop_finish.
 */
struct WalProposerConn
{
PGconn *pg_conn; /* underlying libpq connection */
bool is_nonblocking; /* whether the connection is non-blocking; updated by ensure_nonblocking_status */
char *recvbuf; /* last received data from
* walprop_async_read */
};
/*
 * Put the libpq connection into the requested blocking mode.
 *
 * Returns true when the connection already is in, or was successfully
 * switched to, the requested mode; false if PQsetnonblocking reported an
 * error.
 */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
	/* Only talk to libpq when the cached mode differs from the request */
	if (conn->is_nonblocking != is_nonblocking)
	{
		if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1)
			return false;

		/* Remember the new mode so later calls can short-circuit */
		conn->is_nonblocking = is_nonblocking;
	}

	return true;
}
/* Exported function definitions */
/* Return libpq's current error message for this connection. */
char *
walprop_error_message(WalProposerConn *conn)
{
	PGconn	   *pg_conn = conn->pg_conn;

	return PQerrorMessage(pg_conn);
}
/*
 * Translate the libpq connection status into the walproposer enum.
 *
 * Every status other than CONNECTION_OK and CONNECTION_BAD is reported as
 * WP_CONNECTION_IN_PROGRESS.
 */
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
	ConnStatusType status = PQstatus(conn->pg_conn);

	if (status == CONNECTION_OK)
		return WP_CONNECTION_OK;
	if (status == CONNECTION_BAD)
		return WP_CONNECTION_BAD;

	return WP_CONNECTION_IN_PROGRESS;
}
/*
 * Start a connection attempt using PQconnectStartParams.
 *
 * conninfo is passed as the "dbname" parameter with dbname expansion
 * enabled, so it may be a full connection string.  If password is not NULL
 * it is passed as a separate "password" parameter.
 *
 * Returns NULL if libpq could not allocate a PGconn; otherwise a palloc'd
 * wrapper that starts in blocking mode with no receive buffer.
 */
WalProposerConn *
walprop_connect_start(char *conninfo, char *password)
{
	const char *keywords[3];
	const char *values[3];
	int			nparams = 0;
	PGconn	   *pg_conn;
	WalProposerConn *conn;

	/*
	 * Connection options are parsed in the order they're given.  The
	 * password (taken from NEON_AUTH_TOKEN, per the original note) goes
	 * first, so that the connection string is still able to override it.
	 * We don't currently rely on that capability anywhere.
	 */
	if (password != NULL)
	{
		keywords[nparams] = "password";
		values[nparams] = password;
		nparams++;
	}

	keywords[nparams] = "dbname";
	values[nparams] = conninfo;
	nparams++;

	/* Both arrays must be NULL-terminated */
	keywords[nparams] = NULL;
	values[nparams] = NULL;

	/*
	 * Allocation of a PGconn can fail, in which case NULL comes back.  Mirror
	 * PQconnectStart and hand that NULL to the caller.
	 */
	pg_conn = PQconnectStartParams(keywords, values, 1);
	if (pg_conn == NULL)
		return NULL;

	/*
	 * palloc exits on failure rather than returning NULL, so there is no
	 * error path to handle for the wrapper allocation.
	 */
	conn = palloc(sizeof(WalProposerConn));
	conn->recvbuf = NULL;
	conn->is_nonblocking = false;	/* connections always start blocking */
	conn->pg_conn = pg_conn;

	return conn;
}
/*
 * Advance the connection attempt with PQconnectPoll and translate its result
 * into the walproposer polling enum.
 *
 * PGRES_POLLING_ACTIVE is not expected and raises FATAL.  Any other unknown
 * value trips an assertion and is reported as WP_CONN_POLLING_FAILED.
 */
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
	PostgresPollingStatusType poll_status = PQconnectPoll(conn->pg_conn);

	switch (poll_status)
	{
		case PGRES_POLLING_OK:
			return WP_CONN_POLLING_OK;
		case PGRES_POLLING_READING:
			return WP_CONN_POLLING_READING;
		case PGRES_POLLING_WRITING:
			return WP_CONN_POLLING_WRITING;
		case PGRES_POLLING_FAILED:
			return WP_CONN_POLLING_FAILED;

			/*
			 * libpq's own source comments describe this constant as unused,
			 * so treat seeing it as a fatal surprise.
			 */
		case PGRES_POLLING_ACTIVE:
			elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
			/* not reached; present only to satisfy the compiler */
			return WP_CONN_POLLING_FAILED;

		default:
			break;
	}

	Assert(false);
	return WP_CONN_POLLING_FAILED;	/* keep the compiler quiet */
}
/*
 * Send a query over the connection in blocking mode.
 *
 * Returns true if the query was dispatched; false if switching to blocking
 * mode or PQsendQuery failed.
 */
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
	/*
	 * Blocking mode lets the query go out without a follow-up call to
	 * PQflush.
	 */
	if (!ensure_nonblocking_status(conn, false))
		return false;

	/* PQsendQuery returns 1 on success, 0 on failure */
	return PQsendQuery(conn->pg_conn, query) != 0;
}
/*
 * Fetch the result of a previously sent query.
 *
 * Returns:
 *   WP_EXEC_NEEDS_INPUT        libpq needs more input before a result exists
 *   WP_EXEC_SUCCESS_COPYBOTH   the server entered CopyBoth mode
 *   WP_EXEC_UNEXPECTED_SUCCESS any other non-error outcome (a WARNING is logged)
 *   WP_EXEC_FAILED             consuming input failed or an error result arrived
 *
 * The PGresult obtained from libpq is released with PQclear before
 * returning; the error text stays available through PQerrorMessage because
 * it is stored on the connection, not on the result.
 */
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
	PGresult   *result;
	WalProposerExecStatusType return_val;

	/* Marker variable if we need to log an unexpected success result */
	char	   *unexpected_success = NULL;

	/* Consume any input that we might be missing */
	if (!PQconsumeInput(conn->pg_conn))
		return WP_EXEC_FAILED;

	if (PQisBusy(conn->pg_conn))
		return WP_EXEC_NEEDS_INPUT;

	result = PQgetResult(conn->pg_conn);

	/*
	 * PQgetResult returns NULL only if getting the result was successful &
	 * there's no more of the result to get.
	 */
	if (!result)
	{
		elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
		return WP_EXEC_UNEXPECTED_SUCCESS;
	}

	/* Helper macro to reduce boilerplate */
#define UNEXPECTED_SUCCESS(msg) \
	return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
	unexpected_success = msg; \
	break;

	switch (PQresultStatus(result))
	{
			/* "true" success case */
		case PGRES_COPY_BOTH:
			return_val = WP_EXEC_SUCCESS_COPYBOTH;
			break;

			/* Unexpected success case */
		case PGRES_EMPTY_QUERY:
			UNEXPECTED_SUCCESS("empty query return");
		case PGRES_COMMAND_OK:
			UNEXPECTED_SUCCESS("data-less command end");
		case PGRES_TUPLES_OK:
			UNEXPECTED_SUCCESS("tuples return");
		case PGRES_COPY_OUT:
			UNEXPECTED_SUCCESS("'Copy Out' response");
		case PGRES_COPY_IN:
			UNEXPECTED_SUCCESS("'Copy In' response");
		case PGRES_SINGLE_TUPLE:
			UNEXPECTED_SUCCESS("single tuple return");
		case PGRES_PIPELINE_SYNC:
			UNEXPECTED_SUCCESS("pipeline sync point");

			/* Failure cases */
		case PGRES_BAD_RESPONSE:
		case PGRES_NONFATAL_ERROR:
		case PGRES_FATAL_ERROR:
		case PGRES_PIPELINE_ABORTED:
			return_val = WP_EXEC_FAILED;
			break;

		default:
			Assert(false);
			return_val = WP_EXEC_FAILED;	/* keep the compiler quiet */
	}

	/* We only needed the status; free the result so it doesn't leak */
	PQclear(result);

	if (unexpected_success)
		elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);

	return return_val;
}
/* Return the socket descriptor libpq reports for this connection. */
pgsocket
walprop_socket(WalProposerConn *conn)
{
	PGconn	   *pg_conn = conn->pg_conn;

	return PQsocket(pg_conn);
}
/* Flush queued output with PQflush and pass its return code through. */
int
walprop_flush(WalProposerConn *conn)
{
	int			rc = PQflush(conn->pg_conn);

	return rc;
}
/*
 * Tear down a connection: free any pending receive buffer, close the libpq
 * connection, and release the wrapper itself.  conn must not be used
 * afterwards.
 */
void
walprop_finish(WalProposerConn *conn)
{
	char	   *pending = conn->recvbuf;

	if (pending != NULL)
		PQfreemem(pending);

	PQfinish(conn->pg_conn);
	pfree(conn);
}
/*
 * Receive a message from the safekeeper.
 *
 * On success (PG_ASYNC_READ_SUCCESS), the data is placed in *buf and its
 * length in *amount.  The buffer is owned by conn and is valid until the
 * next call to this function (or until the connection is finished).
 *
 * On PG_ASYNC_READ_TRY_AGAIN and PG_ASYNC_READ_FAIL, *buf is set to NULL and
 * *amount to 0.
 */
PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
	int			result;

	/* Release the buffer handed out by the previous call, if any */
	if (conn->recvbuf != NULL)
	{
		PQfreemem(conn->recvbuf);
		conn->recvbuf = NULL;
	}

	/* Call PQconsumeInput so that we have the data we need */
	if (!PQconsumeInput(conn->pg_conn))
	{
		*amount = 0;
		*buf = NULL;
		return PG_ASYNC_READ_FAIL;
	}

	/*
	 * The docs for PQgetCopyData list the return values as:
	 *    0 if the copy is still in progress, but no "complete row" is
	 *      available
	 *   -1 if the copy is done
	 *   -2 if an error occurred
	 *  > 0 if it was successful; that value is the amount transferred.
	 *
	 * The protocol we use between walproposer and safekeeper means that we
	 * *usually* wouldn't expect to see that the copy is done, but this can
	 * sometimes be triggered by the server returning an ErrorResponse (which
	 * also happens to have the effect that the copy is done).
	 */
	switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
	{
		case 0:
			*amount = 0;
			*buf = NULL;
			return PG_ASYNC_READ_TRY_AGAIN;
		case -1:
			{
				/*
				 * If we get -1, it's probably because of a server error; the
				 * safekeeper won't normally send a CopyDone message.
				 *
				 * We can check PQgetResult to make sure that the server
				 * failed; it'll always result in PGRES_FATAL_ERROR
				 */
				PGresult   *res = PQgetResult(conn->pg_conn);
				ExecStatusType status = PQresultStatus(res);

				/*
				 * Only the status is needed.  Free the result right away so
				 * it is not leaked; PQclear accepts NULL.
				 */
				PQclear(res);

				if (status != PGRES_FATAL_ERROR)
					elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);

				/*
				 * If there was actually an error, it'll be properly reported
				 * by calls to PQerrorMessage -- we don't have to do anything
				 * else
				 */
				*amount = 0;
				*buf = NULL;
				return PG_ASYNC_READ_FAIL;
			}
		case -2:
			*amount = 0;
			*buf = NULL;
			return PG_ASYNC_READ_FAIL;
		default:
			/* Positive values indicate the size of the returned result */
			*amount = result;
			*buf = conn->recvbuf;
			return PG_ASYNC_READ_SUCCESS;
	}
}
/*
 * Queue a CopyData message in non-blocking mode and try to flush it once.
 *
 * Returns PG_ASYNC_WRITE_SUCCESS if everything was flushed,
 * PG_ASYNC_WRITE_TRY_FLUSH if data remains in libpq's send queue, and
 * PG_ASYNC_WRITE_FAIL on error.  Return codes from libpq outside the
 * documented set raise FATAL.
 */
PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
	int			rc;

	/* If we aren't in non-blocking mode, switch to it. */
	if (!ensure_nonblocking_status(conn, true))
		return PG_ASYNC_WRITE_FAIL;

	/*
	 * PQputCopyData returns 1 if the data was queued, 0 if it was not queued
	 * because of full buffers, or -1 if an error occurred.
	 */
	rc = PQputCopyData(conn->pg_conn, buf, size);

	/*
	 * Zero is not expected: walproposer always empties the connection's
	 * buffers before sending more.
	 */
	Assert(rc != 0);

	if (rc == -1)
		return PG_ASYNC_WRITE_FAIL;
	if (rc != 1)
		elog(FATAL, "invalid return %d from PQputCopyData", rc);

	/*
	 * The data is only queued at this point; flushing is what sends it.
	 * That might take multiple tries, but we don't want to wait around
	 * until it's done.
	 *
	 * PQflush returns 0 if successful, 1 if it was unable to send all the
	 * data in the send queue yet, and -1 if it failed for some reason.
	 */
	rc = PQflush(conn->pg_conn);

	if (rc == 0)
		return PG_ASYNC_WRITE_SUCCESS;
	if (rc == 1)
		return PG_ASYNC_WRITE_TRY_FLUSH;
	if (rc == -1)
		return PG_ASYNC_WRITE_FAIL;

	elog(FATAL, "invalid return %d from PQflush", rc);
}
/*
 * Blocking counterpart of walprop_async_write: queue a CopyData message and
 * flush it, with the connection in blocking mode.  See walprop_async_write
 * for details on the libpq return codes.
 *
 * Returns true on success, false on failure.
 */
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
	int			rc;

	/* If we are in non-blocking mode, switch out of it. */
	if (!ensure_nonblocking_status(conn, false))
		return false;

	rc = PQputCopyData(conn->pg_conn, buf, size);
	if (rc == -1)
		return false;
	Assert(rc == 1);

	/* The connection is in blocking mode, so PQflush returns 0 or -1 */
	rc = PQflush(conn->pg_conn);
	if (rc == -1)
		return false;
	Assert(rc == 0);

	return true;
}

View File

@@ -174,7 +174,7 @@ WalProposerMain(Datum main_arg)
WalProposerInit(walprop_pg.get_flush_rec_ptr(), systemId);
last_reconnect_attempt = GetCurrentTimestamp();
last_reconnect_attempt = walprop_pg.get_current_timestamp();
walprop_pg.init_walsender();
@@ -208,7 +208,7 @@ WalProposerPoll(void)
bool late_cv_trigger = false;
WaitEvent event = {0};
int rc = 0;
TimestampTz now = GetCurrentTimestamp();
TimestampTz now = walprop_pg.get_current_timestamp();
long timeout = TimeToReconnect(now);
#if PG_MAJORVERSION_NUM >= 16
@@ -276,7 +276,7 @@ WalProposerPoll(void)
}
}
now = GetCurrentTimestamp();
now = walprop_pg.get_current_timestamp();
if (rc == 0 || TimeToReconnect(now) <= 0) /* timeout expired: poll state */
{
TimestampTz now;
@@ -293,7 +293,7 @@ WalProposerPoll(void)
/*
* Abandon connection attempts which take too long.
*/
now = GetCurrentTimestamp();
now = walprop_pg.get_current_timestamp();
for (int i = 0; i < n_safekeepers; i++)
{
Safekeeper *sk = &safekeeper[i];
@@ -317,9 +317,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
char *sep;
char *port;
load_file("libpqwalreceiver", false);
if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
walprop_pg.load_libpqwalreceiver();
for (host = wal_acceptors_list; host != NULL && *host != '\0'; host = sep)
{
@@ -384,12 +382,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
!HexDecodeString(greetRequest.tenant_id, neon_tenant, 16))
elog(FATAL, "Could not parse neon.tenant_id, %s", neon_tenant);
#if PG_VERSION_NUM >= 150000
/* FIXME don't use hardcoded timeline id */
greetRequest.timeline = 1;
#else
greetRequest.timeline = ThisTimeLineID;
#endif
greetRequest.timeline = walprop_pg.get_timeline_id();
greetRequest.walSegSize = wal_segment_size;
InitEventSet();
@@ -482,7 +475,7 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove)
if (sk->conn != NULL)
{
desired_events = SafekeeperStateDesiredEvents(sk->state);
sk->eventPos = AddWaitEventToSet(waitEvents, desired_events, walprop_socket(sk->conn), NULL, sk);
sk->eventPos = AddWaitEventToSet(waitEvents, desired_events, walprop_pg.conn_socket(sk->conn), NULL, sk);
}
}
}
@@ -492,7 +485,7 @@ static void
ShutdownConnection(Safekeeper *sk)
{
if (sk->conn)
walprop_finish(sk->conn);
walprop_pg.conn_finish(sk->conn);
sk->conn = NULL;
sk->state = SS_OFFLINE;
sk->flushWrite = false;
@@ -524,7 +517,7 @@ ResetConnection(Safekeeper *sk)
/*
* Try to establish new connection
*/
sk->conn = walprop_connect_start((char *) &sk->conninfo, neon_auth_token);
sk->conn = walprop_pg.conn_connect_start((char *) &sk->conninfo, neon_auth_token);
/*
* "If the result is null, then libpq has been unable to allocate a new
@@ -538,7 +531,7 @@ ResetConnection(Safekeeper *sk)
* PQconnectPoll. Before we do that though, we need to check that it
* didn't immediately fail.
*/
if (walprop_status(sk->conn) == WP_CONNECTION_BAD)
if (walprop_pg.conn_status(sk->conn) == WP_CONNECTION_BAD)
{
/*---
* According to libpq docs:
@@ -550,13 +543,13 @@ ResetConnection(Safekeeper *sk)
* https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS
*/
elog(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s",
sk->host, sk->port, walprop_error_message(sk->conn));
sk->host, sk->port, walprop_pg.conn_error_message(sk->conn));
/*
* Even though the connection failed, we still need to clean up the
* object
*/
walprop_finish(sk->conn);
walprop_pg.conn_finish(sk->conn);
sk->conn = NULL;
return;
}
@@ -577,9 +570,9 @@ ResetConnection(Safekeeper *sk)
elog(LOG, "connecting with node %s:%s", sk->host, sk->port);
sk->state = SS_CONNECTING_WRITE;
sk->latestMsgReceivedAt = GetCurrentTimestamp();
sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp();
sock = walprop_socket(sk->conn);
sock = walprop_pg.conn_socket(sk->conn);
sk->eventPos = AddWaitEventToSet(waitEvents, WL_SOCKET_WRITEABLE, sock, NULL, sk);
return;
}
@@ -609,7 +602,7 @@ TimeToReconnect(TimestampTz now)
static void
ReconnectSafekeepers(void)
{
TimestampTz now = GetCurrentTimestamp();
TimestampTz now = walprop_pg.get_current_timestamp();
if (TimeToReconnect(now) == 0)
{
@@ -725,7 +718,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
static void
HandleConnectionEvent(Safekeeper *sk)
{
WalProposerConnectPollStatusType result = walprop_connect_poll(sk->conn);
WalProposerConnectPollStatusType result = walprop_pg.conn_connect_poll(sk->conn);
/* The new set of events we'll wait on, after updating */
uint32 new_events = WL_NO_EVENTS;
@@ -735,7 +728,7 @@ HandleConnectionEvent(Safekeeper *sk)
case WP_CONN_POLLING_OK:
elog(LOG, "connected with node %s:%s", sk->host,
sk->port);
sk->latestMsgReceivedAt = GetCurrentTimestamp();
sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp();
/*
* We have to pick some event to update event set. We'll
* eventually need the socket to be readable, so we go with that.
@@ -757,7 +750,7 @@ HandleConnectionEvent(Safekeeper *sk)
case WP_CONN_POLLING_FAILED:
elog(WARNING, "failed to connect to node '%s:%s': %s",
sk->host, sk->port, walprop_error_message(sk->conn));
sk->host, sk->port, walprop_pg.conn_error_message(sk->conn));
/*
* If connecting failed, we don't want to restart the connection
@@ -774,7 +767,7 @@ HandleConnectionEvent(Safekeeper *sk)
* old event and re-register an event on the new socket.
*/
HackyRemoveWalProposerEvent(sk);
sk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_socket(sk->conn), NULL, sk);
sk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_pg.conn_socket(sk->conn), NULL, sk);
/* If we successfully connected, send START_WAL_PUSH query */
if (result == WP_CONN_POLLING_OK)
@@ -789,10 +782,10 @@ HandleConnectionEvent(Safekeeper *sk)
static void
SendStartWALPush(Safekeeper *sk)
{
if (!walprop_send_query(sk->conn, "START_WAL_PUSH"))
if (!walprop_pg.conn_send_query(sk->conn, "START_WAL_PUSH"))
{
elog(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, walprop_error_message(sk->conn));
sk->host, sk->port, walprop_pg.conn_error_message(sk->conn));
ShutdownConnection(sk);
return;
}
@@ -803,7 +796,7 @@ SendStartWALPush(Safekeeper *sk)
static void
RecvStartWALPushResult(Safekeeper *sk)
{
switch (walprop_get_query_result(sk->conn))
switch (walprop_pg.conn_get_query_result(sk->conn))
{
/*
* Successful result, move on to starting the handshake
@@ -827,7 +820,7 @@ RecvStartWALPushResult(Safekeeper *sk)
case WP_EXEC_FAILED:
elog(WARNING, "Failed to send query to safekeeper %s:%s: %s",
sk->host, sk->port, walprop_error_message(sk->conn));
sk->host, sk->port, walprop_pg.conn_error_message(sk->conn));
ShutdownConnection(sk);
return;
@@ -1611,19 +1604,14 @@ SendAppendRequests(Safekeeper *sk)
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn,
#if PG_VERSION_NUM >= 150000
/* FIXME don't use hardcoded timeline_id here */
1,
#else
ThisTimeLineID,
#endif
walprop_pg.get_timeline_id(),
&errinfo))
{
WALReadRaiseError(&errinfo);
}
sk->outbuf.len += req->endLsn - req->beginLsn;
writeResult = walprop_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len);
writeResult = walprop_pg.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len);
/* Mark current message as sent, whatever the result is */
sk->streamingAt = endLsn;
@@ -1647,7 +1635,7 @@ SendAppendRequests(Safekeeper *sk)
case PG_ASYNC_WRITE_FAIL:
elog(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
walprop_pg.conn_error_message(sk->conn));
ShutdownConnection(sk);
return false;
default:
@@ -1951,7 +1939,7 @@ HandleSafekeeperResponse(void)
* pageserver.
*/
quorumFeedback.rf.disk_consistent_lsn,
GetCurrentTimestamp(), false);
walprop_pg.get_current_timestamp(), false);
}
CombineHotStanbyFeedbacks(&hsFeedback);
@@ -2050,7 +2038,7 @@ HandleSafekeeperResponse(void)
static bool
AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
{
switch (walprop_async_read(sk->conn, buf, buf_size))
switch (walprop_pg.conn_async_read(sk->conn, buf, buf_size))
{
case PG_ASYNC_READ_SUCCESS:
return true;
@@ -2062,7 +2050,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
case PG_ASYNC_READ_FAIL:
elog(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
walprop_pg.conn_error_message(sk->conn));
ShutdownConnection(sk);
return false;
}
@@ -2103,7 +2091,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg)
ResetConnection(sk);
return false;
}
sk->latestMsgReceivedAt = GetCurrentTimestamp();
sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp();
switch (tag)
{
case 'g':
@@ -2171,11 +2159,11 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
{
uint32 events;
if (!walprop_blocking_write(sk->conn, msg, msg_size))
if (!walprop_pg.conn_blocking_write(sk->conn, msg, msg_size))
{
elog(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
walprop_pg.conn_error_message(sk->conn));
ShutdownConnection(sk);
return false;
}
@@ -2203,7 +2191,7 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
static bool
AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state)
{
switch (walprop_async_write(sk->conn, msg, msg_size))
switch (walprop_pg.conn_async_write(sk->conn, msg, msg_size))
{
case PG_ASYNC_WRITE_SUCCESS:
return true;
@@ -2220,7 +2208,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
case PG_ASYNC_WRITE_FAIL:
elog(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
walprop_pg.conn_error_message(sk->conn));
ShutdownConnection(sk);
return false;
default:
@@ -2246,7 +2234,7 @@ AsyncFlush(Safekeeper *sk)
* 1 if unable to send everything yet [call PQflush again]
* -1 if it failed [emit an error]
*/
switch (walprop_flush(sk->conn))
switch (walprop_pg.conn_flush(sk->conn))
{
case 0:
/* flush is done */
@@ -2257,7 +2245,7 @@ AsyncFlush(Safekeeper *sk)
case -1:
elog(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
walprop_error_message(sk->conn));
walprop_pg.conn_error_message(sk->conn));
ResetConnection(sk);
return false;
default:

View File

@@ -298,23 +298,6 @@ typedef struct WalproposerShmemState
pg_atomic_uint64 backpressureThrottlingTime;
} WalproposerShmemState;
/*
* Collection of hooks for walproposer, to call postgres functions,
* read WAL and send it over the network.
*/
typedef struct walproposer_api
{
/* Returns a pointer to the WalproposerShmemState structure. */
WalproposerShmemState * (*get_shmem_state) (void);
/* NOTE(review): presumably hands pageserver feedback over to postgres -- confirm against the implementation. */
void (*replication_feedback_set) (PageserverFeedback * rf);
/* NOTE(review): presumably starts WAL streaming from startpos on the given timeline -- confirm. */
void (*start_streaming) (XLogRecPtr startpos, TimeLineID timeline);
/* Called by WalProposerMain() after WalProposerInit(). */
void (*init_walsender) (void);
void (*init_standalone_sync_safekeepers) (void);
/* NOTE(review): the uint64 result is presumably the system identifier -- confirm. */
uint64 (*init_bgworker) (void);
/* Its result is passed to WalProposerInit() as flushRecPtr. */
XLogRecPtr (*get_flush_rec_ptr) (void);
} walproposer_api;
extern const walproposer_api walprop_pg;
/*
* Report safekeeper state to proposer
*/
@@ -461,65 +444,38 @@ typedef enum
WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
extern char *walprop_error_message(WalProposerConn *conn);
/* Re-exported PQstatus */
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);
/* Re-exported PQconnectStart */
extern WalProposerConn * walprop_connect_start(char *conninfo, char *password);
/* Re-exported PQconnectPoll */
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);
/* Blocking wrapper around PQsendQuery */
extern bool walprop_send_query(WalProposerConn *conn, char *query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);
/* Re-exported PQsocket */
extern pgsocket walprop_socket(WalProposerConn *conn);
/* Re-exported PQflush */
extern int walprop_flush(WalProposerConn *conn);
/* Re-exported PQfinish */
extern void walprop_finish(WalProposerConn *conn);
/*
* Ergonomic wrapper around PQgetCopyData
*
* Reads a CopyData block from a safekeeper, setting *amount to the number
* of bytes returned.
*
* This function is allowed to assume certain properties specific to the
* protocol with the safekeepers, so it should not be used as-is for any
* other purpose.
*
* Note: If possible, using <AsyncRead> is generally preferred, because it
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
*/
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);
/*
* Ergonomic wrapper around PQputCopyData + PQflush
*
* Starts to write a CopyData block to a safekeeper.
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
*/
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);
/*
* Blocking equivalent to walprop_async_write
*
* Returns 'true' if successful, 'false' on failure.
*/
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);
extern uint64 BackpressureThrottlingTime(void);
/*
* Collection of hooks for walproposer, to call postgres functions,
* read WAL and send it over the network.
*/
typedef struct walproposer_api
{
/* Returns a pointer to the WalproposerShmemState structure. */
WalproposerShmemState * (*get_shmem_state) (void);
/* NOTE(review): presumably hands pageserver feedback over to postgres -- confirm against the implementation. */
void (*replication_feedback_set) (PageserverFeedback * rf);
/* NOTE(review): presumably starts WAL streaming from startpos on the given timeline -- confirm. */
void (*start_streaming) (XLogRecPtr startpos, TimeLineID timeline);
/* Called by WalProposerMain() after WalProposerInit(). */
void (*init_walsender) (void);
void (*init_standalone_sync_safekeepers) (void);
/* NOTE(review): the uint64 result is presumably the system identifier -- confirm. */
uint64 (*init_bgworker) (void);
/* Its result is passed to WalProposerInit() as flushRecPtr. */
XLogRecPtr (*get_flush_rec_ptr) (void);
/* Current time; walproposer calls this instead of GetCurrentTimestamp() directly. */
TimestampTz (*get_current_timestamp) (void);
/* Timeline id used for the greeting request and when reading WAL to send. */
TimeLineID (*get_timeline_id) (void);
/* Called from WalProposerInit() before the safekeeper list is parsed. */
void (*load_libpqwalreceiver) (void);
/*
 * Connection hooks. walproposer calls these in place of the former
 * walprop_* libpq wrappers; the descriptions mirror those wrappers.
 * NOTE(review): confirm against the walprop_pg initializer.
 */
/* PQerrorMessage equivalent */
char * (*conn_error_message) (WalProposerConn * conn);
/* PQstatus equivalent */
WalProposerConnStatusType (*conn_status) (WalProposerConn * conn);
/* PQconnectStart equivalent; the result may be NULL if libpq could not allocate a connection */
WalProposerConn * (*conn_connect_start) (char *conninfo, char *password);
/* PQconnectPoll equivalent */
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn * conn);
/* Blocking wrapper around PQsendQuery */
bool (*conn_send_query) (WalProposerConn * conn, char * query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn * conn);
/* PQsocket equivalent */
pgsocket (*conn_socket) (WalProposerConn * conn);
/* PQflush equivalent: 0 when done, 1 if more flushing is needed, -1 on failure */
int (*conn_flush) (WalProposerConn * conn);
/* PQfinish equivalent; the connection object must not be used afterwards */
void (*conn_finish) (WalProposerConn * conn);
/* Reads a CopyData block from a safekeeper, setting *amount to the number of bytes returned */
PGAsyncReadResult (*conn_async_read) (WalProposerConn * conn, char **buf, int *amount);
/* Starts to write a CopyData block to a safekeeper; see PGAsyncWriteResult for the return codes */
PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size);
/* Blocking equivalent of conn_async_write; returns true on success, false on failure */
bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size);
} walproposer_api;
extern const walproposer_api walprop_pg;
#endif /* __NEON_WALPROPOSER_H__ */

View File

@@ -40,6 +40,7 @@
#include "neon.h"
#include "walproposer.h"
#include "walproposer_utils.h"
#include "libpq-fe.h"
char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
@@ -437,6 +438,450 @@ walprop_pg_get_flush_rec_ptr(void)
#endif
}
/* Return the current time as reported by GetCurrentTimestamp(). */
static TimestampTz
walprop_pg_get_current_timestamp(void)
{
	TimestampTz now = GetCurrentTimestamp();

	return now;
}
/*
 * Return the timeline id walproposer should use.
 *
 * On PostgreSQL 15 and later this is hardcoded to 1; on older versions it is
 * ThisTimeLineID.
 */
static TimeLineID
walprop_pg_get_timeline_id(void)
{
	TimeLineID	tli;

#if PG_VERSION_NUM >= 150000
	/* FIXME don't use hardcoded timeline id */
	tli = 1;
#else
	tli = ThisTimeLineID;
#endif

	return tli;
}
/*
 * Load the libpqwalreceiver library and raise ERROR if doing so did not set
 * WalReceiverFunctions.
 */
static void
walprop_pg_load_libpqwalreceiver(void)
{
	load_file("libpqwalreceiver", false);

	if (!WalReceiverFunctions)
		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
}
/*
 * Header in walproposer.h -- Wrapper struct to abstract away the libpq
 * connection.
 *
 * recvbuf is released with PQfreemem by walprop_async_read (before each new
 * read) and by walprop_finish.
 */
struct WalProposerConn
{
PGconn *pg_conn; /* underlying libpq connection */
bool is_nonblocking; /* whether the connection is non-blocking; updated by ensure_nonblocking_status */
char *recvbuf; /* last received data from
* walprop_async_read */
};
/*
 * Put the libpq connection into the requested blocking mode.
 *
 * Returns true when the connection already is in, or was successfully
 * switched to, the requested mode; false if PQsetnonblocking reported an
 * error.
 */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
	/* Only talk to libpq when the cached mode differs from the request */
	if (conn->is_nonblocking != is_nonblocking)
	{
		if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1)
			return false;

		/* Remember the new mode so later calls can short-circuit */
		conn->is_nonblocking = is_nonblocking;
	}

	return true;
}
/* libpq connection wrapper definitions (static: not exported from this file) */
/* Return libpq's current error message for this connection. */
static char *
walprop_error_message(WalProposerConn *conn)
{
	PGconn	   *pg_conn = conn->pg_conn;

	return PQerrorMessage(pg_conn);
}
/*
 * Translate the libpq connection status into the walproposer enum.
 *
 * Every status other than CONNECTION_OK and CONNECTION_BAD is reported as
 * WP_CONNECTION_IN_PROGRESS.
 */
static WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
	ConnStatusType status = PQstatus(conn->pg_conn);

	if (status == CONNECTION_OK)
		return WP_CONNECTION_OK;
	if (status == CONNECTION_BAD)
		return WP_CONNECTION_BAD;

	return WP_CONNECTION_IN_PROGRESS;
}
/*
 * Start a connection attempt using PQconnectStartParams.
 *
 * conninfo is passed as the "dbname" parameter with dbname expansion
 * enabled, so it may be a full connection string.  If password is not NULL
 * it is passed as a separate "password" parameter.
 *
 * Returns NULL if libpq could not allocate a PGconn; otherwise a palloc'd
 * wrapper that starts in blocking mode with no receive buffer.
 */
static WalProposerConn *
walprop_connect_start(char *conninfo, char *password)
{
	const char *keywords[3];
	const char *values[3];
	int			nparams = 0;
	PGconn	   *pg_conn;
	WalProposerConn *conn;

	/*
	 * Connection options are parsed in the order they're given.  The
	 * password (taken from NEON_AUTH_TOKEN, per the original note) goes
	 * first, so that the connection string is still able to override it.
	 * We don't currently rely on that capability anywhere.
	 */
	if (password != NULL)
	{
		keywords[nparams] = "password";
		values[nparams] = password;
		nparams++;
	}

	keywords[nparams] = "dbname";
	values[nparams] = conninfo;
	nparams++;

	/* Both arrays must be NULL-terminated */
	keywords[nparams] = NULL;
	values[nparams] = NULL;

	/*
	 * Allocation of a PGconn can fail, in which case NULL comes back.  Mirror
	 * PQconnectStart and hand that NULL to the caller.
	 */
	pg_conn = PQconnectStartParams(keywords, values, 1);
	if (pg_conn == NULL)
		return NULL;

	/*
	 * palloc exits on failure rather than returning NULL, so there is no
	 * error path to handle for the wrapper allocation.
	 */
	conn = palloc(sizeof(WalProposerConn));
	conn->recvbuf = NULL;
	conn->is_nonblocking = false;	/* connections always start blocking */
	conn->pg_conn = pg_conn;

	return conn;
}
/*
 * Advance the connection attempt with PQconnectPoll and translate its result
 * into the walproposer polling enum.
 *
 * PGRES_POLLING_ACTIVE is not expected and raises FATAL.  Any other unknown
 * value trips an assertion and is reported as WP_CONN_POLLING_FAILED.
 */
static WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
	PostgresPollingStatusType poll_status = PQconnectPoll(conn->pg_conn);

	switch (poll_status)
	{
		case PGRES_POLLING_OK:
			return WP_CONN_POLLING_OK;
		case PGRES_POLLING_READING:
			return WP_CONN_POLLING_READING;
		case PGRES_POLLING_WRITING:
			return WP_CONN_POLLING_WRITING;
		case PGRES_POLLING_FAILED:
			return WP_CONN_POLLING_FAILED;

			/*
			 * libpq's own source comments describe this constant as unused,
			 * so treat seeing it as a fatal surprise.
			 */
		case PGRES_POLLING_ACTIVE:
			elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
			/* not reached; present only to satisfy the compiler */
			return WP_CONN_POLLING_FAILED;

		default:
			break;
	}

	Assert(false);
	return WP_CONN_POLLING_FAILED;	/* keep the compiler quiet */
}
/*
 * Send a query over the connection in blocking mode.
 *
 * Returns true if the query was dispatched; false if switching to blocking
 * mode or PQsendQuery failed.
 */
static bool
walprop_send_query(WalProposerConn *conn, char *query)
{
	/*
	 * Blocking mode lets the query go out without a follow-up call to
	 * PQflush.
	 */
	if (!ensure_nonblocking_status(conn, false))
		return false;

	/* PQsendQuery returns 1 on success, 0 on failure */
	return PQsendQuery(conn->pg_conn, query) != 0;
}
/*
 * Fetch the result of a previously sent query.
 *
 * Returns:
 *   WP_EXEC_NEEDS_INPUT        libpq needs more input before a result exists
 *   WP_EXEC_SUCCESS_COPYBOTH   the server entered CopyBoth mode
 *   WP_EXEC_UNEXPECTED_SUCCESS any other non-error outcome (a WARNING is logged)
 *   WP_EXEC_FAILED             consuming input failed or an error result arrived
 *
 * The PGresult obtained from libpq is released with PQclear before
 * returning; the error text stays available through PQerrorMessage because
 * it is stored on the connection, not on the result.
 */
static WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
	PGresult   *result;
	WalProposerExecStatusType return_val;

	/* Marker variable if we need to log an unexpected success result */
	char	   *unexpected_success = NULL;

	/* Consume any input that we might be missing */
	if (!PQconsumeInput(conn->pg_conn))
		return WP_EXEC_FAILED;

	if (PQisBusy(conn->pg_conn))
		return WP_EXEC_NEEDS_INPUT;

	result = PQgetResult(conn->pg_conn);

	/*
	 * PQgetResult returns NULL only if getting the result was successful &
	 * there's no more of the result to get.
	 */
	if (!result)
	{
		elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
		return WP_EXEC_UNEXPECTED_SUCCESS;
	}

	/* Helper macro to reduce boilerplate */
#define UNEXPECTED_SUCCESS(msg) \
	return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
	unexpected_success = msg; \
	break;

	switch (PQresultStatus(result))
	{
			/* "true" success case */
		case PGRES_COPY_BOTH:
			return_val = WP_EXEC_SUCCESS_COPYBOTH;
			break;

			/* Unexpected success case */
		case PGRES_EMPTY_QUERY:
			UNEXPECTED_SUCCESS("empty query return");
		case PGRES_COMMAND_OK:
			UNEXPECTED_SUCCESS("data-less command end");
		case PGRES_TUPLES_OK:
			UNEXPECTED_SUCCESS("tuples return");
		case PGRES_COPY_OUT:
			UNEXPECTED_SUCCESS("'Copy Out' response");
		case PGRES_COPY_IN:
			UNEXPECTED_SUCCESS("'Copy In' response");
		case PGRES_SINGLE_TUPLE:
			UNEXPECTED_SUCCESS("single tuple return");
		case PGRES_PIPELINE_SYNC:
			UNEXPECTED_SUCCESS("pipeline sync point");

			/* Failure cases */
		case PGRES_BAD_RESPONSE:
		case PGRES_NONFATAL_ERROR:
		case PGRES_FATAL_ERROR:
		case PGRES_PIPELINE_ABORTED:
			return_val = WP_EXEC_FAILED;
			break;

		default:
			Assert(false);
			return_val = WP_EXEC_FAILED;	/* keep the compiler quiet */
	}

	/* We only needed the status; free the result so it doesn't leak */
	PQclear(result);

	if (unexpected_success)
		elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);

	return return_val;
}
/* Return the socket descriptor libpq reports for this connection. */
static pgsocket
walprop_socket(WalProposerConn *conn)
{
	PGconn	   *pg_conn = conn->pg_conn;

	return PQsocket(pg_conn);
}
/* Flush queued output with PQflush and pass its return code through. */
static int
walprop_flush(WalProposerConn *conn)
{
	int			rc = PQflush(conn->pg_conn);

	return rc;
}
/*
 * Tear down a connection: free any pending receive buffer, close the libpq
 * connection, and release the wrapper itself.  conn must not be used
 * afterwards.
 */
static void
walprop_finish(WalProposerConn *conn)
{
	char	   *pending = conn->recvbuf;

	if (pending != NULL)
		PQfreemem(pending);

	PQfinish(conn->pg_conn);
	pfree(conn);
}
/*
 * Receive a message from the safekeeper.
 *
 * On success (PG_ASYNC_READ_SUCCESS), the data is placed in *buf and its
 * length in *amount.  The buffer is owned by conn and is valid until the
 * next call to this function (or until the connection is finished).
 *
 * On PG_ASYNC_READ_TRY_AGAIN and PG_ASYNC_READ_FAIL, *buf is set to NULL and
 * *amount to 0.
 */
static PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
	int			result;

	/* Release the buffer handed out by the previous call, if any */
	if (conn->recvbuf != NULL)
	{
		PQfreemem(conn->recvbuf);
		conn->recvbuf = NULL;
	}

	/* Call PQconsumeInput so that we have the data we need */
	if (!PQconsumeInput(conn->pg_conn))
	{
		*amount = 0;
		*buf = NULL;
		return PG_ASYNC_READ_FAIL;
	}

	/*
	 * The docs for PQgetCopyData list the return values as:
	 *    0 if the copy is still in progress, but no "complete row" is
	 *      available
	 *   -1 if the copy is done
	 *   -2 if an error occurred
	 *  > 0 if it was successful; that value is the amount transferred.
	 *
	 * The protocol we use between walproposer and safekeeper means that we
	 * *usually* wouldn't expect to see that the copy is done, but this can
	 * sometimes be triggered by the server returning an ErrorResponse (which
	 * also happens to have the effect that the copy is done).
	 */
	switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
	{
		case 0:
			*amount = 0;
			*buf = NULL;
			return PG_ASYNC_READ_TRY_AGAIN;
		case -1:
			{
				/*
				 * If we get -1, it's probably because of a server error; the
				 * safekeeper won't normally send a CopyDone message.
				 *
				 * We can check PQgetResult to make sure that the server
				 * failed; it'll always result in PGRES_FATAL_ERROR
				 */
				PGresult   *res = PQgetResult(conn->pg_conn);
				ExecStatusType status = PQresultStatus(res);

				/*
				 * Only the status is needed.  Free the result right away so
				 * it is not leaked; PQclear accepts NULL.
				 */
				PQclear(res);

				if (status != PGRES_FATAL_ERROR)
					elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);

				/*
				 * If there was actually an error, it'll be properly reported
				 * by calls to PQerrorMessage -- we don't have to do anything
				 * else
				 */
				*amount = 0;
				*buf = NULL;
				return PG_ASYNC_READ_FAIL;
			}
		case -2:
			*amount = 0;
			*buf = NULL;
			return PG_ASYNC_READ_FAIL;
		default:
			/* Positive values indicate the size of the returned result */
			*amount = result;
			*buf = conn->recvbuf;
			return PG_ASYNC_READ_SUCCESS;
	}
}
/*
 * Queue size bytes from buf on the connection and try to flush them without
 * waiting for the flush to complete.
 *
 * Returns:
 *   PG_ASYNC_WRITE_SUCCESS   - data was queued and fully flushed
 *   PG_ASYNC_WRITE_TRY_FLUSH - data was queued, but flushing is not finished
 *   PG_ASYNC_WRITE_FAIL      - switching to non-blocking mode, queueing, or
 *                              flushing failed
 */
static PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
	int			rc;

	/* The connection has to be in non-blocking mode for an async write */
	if (!ensure_nonblocking_status(conn, true))
		return PG_ASYNC_WRITE_FAIL;

	/*
	 * PQputCopyData is documented to return 1 if the data was queued, 0 if
	 * it was not queued because of full buffers, and -1 on error.
	 */
	rc = PQputCopyData(conn->pg_conn, buf, size);

	/*
	 * Zero is not expected here: walproposer always empties the connection's
	 * buffers before sending more.
	 */
	Assert(rc != 0);

	if (rc == -1)
		return PG_ASYNC_WRITE_FAIL;
	if (rc != 1)
		elog(FATAL, "invalid return %d from PQputCopyData", rc);

	/*
	 * Queued data still has to be flushed before it is actually sent. That
	 * can take several attempts, and we don't want to wait around for it.
	 *
	 * PQflush is documented to return 0 if successful, 1 if it was unable to
	 * send all the data in the send queue yet, and -1 if it failed for some
	 * reason.
	 */
	rc = PQflush(conn->pg_conn);

	if (rc == 0)
		return PG_ASYNC_WRITE_SUCCESS;
	if (rc == 1)
		return PG_ASYNC_WRITE_TRY_FLUSH;
	if (rc == -1)
		return PG_ASYNC_WRITE_FAIL;

	elog(FATAL, "invalid return %d from PQflush", rc);
}
/*
 * Blocking counterpart of walprop_async_write: queue size bytes from buf and
 * flush them before returning. See walprop_async_write for details on the
 * libpq return values.
 *
 * Returns true once the data has been queued and flushed, false if switching
 * to blocking mode, queueing, or flushing failed.
 */
static bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
	int			rc;

	/* Leave non-blocking mode if the connection is currently in it */
	if (!ensure_nonblocking_status(conn, false))
		return false;

	rc = PQputCopyData(conn->pg_conn, buf, size);
	if (rc == -1)
		return false;
	Assert(rc == 1);

	/*
	 * The connection is in blocking mode at this point, so the flush is
	 * expected to either complete (0) or fail (-1).
	 */
	rc = PQflush(conn->pg_conn);
	if (rc == -1)
		return false;
	Assert(rc == 0);

	return true;
}
/*
* Temporary globally exported walproposer API for postgres.
*/
@@ -447,5 +892,20 @@ const walproposer_api walprop_pg = {
.init_walsender = walprop_pg_init_walsender,
.init_standalone_sync_safekeepers = walprop_pg_init_standalone_sync_safekeepers,
.init_bgworker = walprop_pg_init_bgworker,
.get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr
.get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr,
.get_current_timestamp = walprop_pg_get_current_timestamp,
.get_timeline_id = walprop_pg_get_timeline_id,
.load_libpqwalreceiver = walprop_pg_load_libpqwalreceiver,
.conn_error_message = walprop_error_message,
.conn_status = walprop_status,
.conn_connect_start = walprop_connect_start,
.conn_connect_poll = walprop_connect_poll,
.conn_send_query = walprop_send_query,
.conn_get_query_result = walprop_get_query_result,
.conn_socket = walprop_socket,
.conn_flush = walprop_flush,
.conn_finish = walprop_finish,
.conn_async_read = walprop_async_read,
.conn_async_write = walprop_async_write,
.conn_blocking_write = walprop_blocking_write,
};