mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
Extract libpqwalproposer again.
It will be used by neon_walreader.
This commit is contained in:
96
pgxn/neon/libpqwalproposer.h
Normal file
96
pgxn/neon/libpqwalproposer.h
Normal file
@@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Interface to set of libpq wrappers walproposer and neon_walreader need.
|
||||
* Similar to libpqwalreceiver, but it has blocking connection establishment and
|
||||
* pqexec which don't fit us. Implementation is at walproposer_pg.c.
|
||||
*/
|
||||
#ifndef ___LIBPQWALPROPOSER_H__
|
||||
#define ___LIBPQWALPROPOSER_H__
|
||||
|
||||
/* Re-exported and modified ExecStatusType */
|
||||
typedef enum
|
||||
{
|
||||
/* We received a single CopyBoth result */
|
||||
WP_EXEC_SUCCESS_COPYBOTH,
|
||||
|
||||
/*
|
||||
* Any success result other than a single CopyBoth was received. The
|
||||
* specifics of the result were already logged, but it may be useful to
|
||||
* provide an error message indicating which safekeeper messed up.
|
||||
*
|
||||
* Do not expect PQerrorMessage to be appropriately set.
|
||||
*/
|
||||
WP_EXEC_UNEXPECTED_SUCCESS,
|
||||
|
||||
/*
|
||||
* No result available at this time. Wait until read-ready, then call
|
||||
* again. Internally, this is returned when PQisBusy indicates that
|
||||
* PQgetResult would block.
|
||||
*/
|
||||
WP_EXEC_NEEDS_INPUT,
|
||||
/* Catch-all failure. Check PQerrorMessage. */
|
||||
WP_EXEC_FAILED,
|
||||
} WalProposerExecStatusType;
|
||||
|
||||
/* Possible return values from walprop_async_read */
|
||||
typedef enum
|
||||
{
|
||||
/* The full read was successful. buf now points to the data */
|
||||
PG_ASYNC_READ_SUCCESS,
|
||||
|
||||
/*
|
||||
* The read is ongoing. Wait until the connection is read-ready, then try
|
||||
* again.
|
||||
*/
|
||||
PG_ASYNC_READ_TRY_AGAIN,
|
||||
/* Reading failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_READ_FAIL,
|
||||
} PGAsyncReadResult;
|
||||
|
||||
/* Possible return values from walprop_async_write */
|
||||
typedef enum
|
||||
{
|
||||
/* The write fully completed */
|
||||
PG_ASYNC_WRITE_SUCCESS,
|
||||
|
||||
/*
|
||||
* The write started, but you'll need to call PQflush some more times to
|
||||
* finish it off. We just tried, so it's best to wait until the connection
|
||||
* is read- or write-ready to try again.
|
||||
*
|
||||
* If it becomes read-ready, call PQconsumeInput and flush again. If it
|
||||
* becomes write-ready, just call PQflush.
|
||||
*/
|
||||
PG_ASYNC_WRITE_TRY_FLUSH,
|
||||
/* Writing failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_WRITE_FAIL,
|
||||
} PGAsyncWriteResult;
|
||||
|
||||
/*
|
||||
* This header is included by walproposer.h to define walproposer_api; if we're
|
||||
* building walproposer without pg, ignore libpq part, leaving only interface
|
||||
* types.
|
||||
*/
|
||||
#ifndef WALPROPOSER_LIB
|
||||
|
||||
#include "libpq-fe.h"
|
||||
|
||||
/*
|
||||
* Sometimes working directly with underlying PGconn is simpler, export the
|
||||
* whole thing for simplicity.
|
||||
*/
|
||||
typedef struct WalProposerConn
|
||||
{
|
||||
PGconn *pg_conn;
|
||||
bool is_nonblocking; /* whether the connection is non-blocking */
|
||||
char *recvbuf; /* last received CopyData message from
|
||||
* walprop_async_read */
|
||||
} WalProposerConn;
|
||||
|
||||
extern WalProposerConn *libpqwp_connect_start(char *conninfo);
|
||||
extern bool libpqwp_send_query(WalProposerConn *conn, char *query);
|
||||
extern WalProposerExecStatusType libpqwp_get_query_result(WalProposerConn *conn);
|
||||
extern PGAsyncReadResult libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount);
|
||||
extern void libpqwp_disconnect(WalProposerConn *conn);
|
||||
|
||||
#endif /* WALPROPOSER_LIB */
|
||||
#endif /* ___LIBPQWALPROPOSER_H__ */
|
||||
@@ -316,12 +316,6 @@ ShutdownConnection(Safekeeper *sk)
|
||||
sk->voteResponse.termHistory.entries = NULL;
|
||||
|
||||
sk->wp->api.rm_safekeeper_event_set(sk);
|
||||
|
||||
if (sk->xlogreader)
|
||||
{
|
||||
NeonWALReaderFree(sk->xlogreader);
|
||||
sk->xlogreader = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "utils/uuid.h"
|
||||
#include "replication/walreceiver.h"
|
||||
|
||||
#include "libpqwalproposer.h"
|
||||
#include "neon_walreader.h"
|
||||
|
||||
#define SK_MAGIC 0xCafeCeefu
|
||||
@@ -24,43 +25,9 @@
|
||||
*/
|
||||
#define WL_NO_EVENTS 0
|
||||
|
||||
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
|
||||
struct WalProposerConn; /* Defined in libpqwalproposer.h */
|
||||
typedef struct WalProposerConn WalProposerConn;
|
||||
|
||||
/* Possible return values from ReadPGAsync */
|
||||
typedef enum
|
||||
{
|
||||
/* The full read was successful. buf now points to the data */
|
||||
PG_ASYNC_READ_SUCCESS,
|
||||
|
||||
/*
|
||||
* The read is ongoing. Wait until the connection is read-ready, then try
|
||||
* again.
|
||||
*/
|
||||
PG_ASYNC_READ_TRY_AGAIN,
|
||||
/* Reading failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_READ_FAIL,
|
||||
} PGAsyncReadResult;
|
||||
|
||||
/* Possible return values from WritePGAsync */
|
||||
typedef enum
|
||||
{
|
||||
/* The write fully completed */
|
||||
PG_ASYNC_WRITE_SUCCESS,
|
||||
|
||||
/*
|
||||
* The write started, but you'll need to call PQflush some more times to
|
||||
* finish it off. We just tried, so it's best to wait until the connection
|
||||
* is read- or write-ready to try again.
|
||||
*
|
||||
* If it becomes read-ready, call PQconsumeInput and flush again. If it
|
||||
* becomes write-ready, just call PQflush.
|
||||
*/
|
||||
PG_ASYNC_WRITE_TRY_FLUSH,
|
||||
/* Writing failed. Check PQerrorMessage(conn) */
|
||||
PG_ASYNC_WRITE_FAIL,
|
||||
} PGAsyncWriteResult;
|
||||
|
||||
/*
|
||||
* WAL safekeeper state, which is used to wait for some event.
|
||||
*
|
||||
@@ -405,31 +372,6 @@ typedef enum
|
||||
*/
|
||||
} WalProposerConnectPollStatusType;
|
||||
|
||||
/* Re-exported and modified ExecStatusType */
|
||||
typedef enum
|
||||
{
|
||||
/* We received a single CopyBoth result */
|
||||
WP_EXEC_SUCCESS_COPYBOTH,
|
||||
|
||||
/*
|
||||
* Any success result other than a single CopyBoth was received. The
|
||||
* specifics of the result were already logged, but it may be useful to
|
||||
* provide an error message indicating which safekeeper messed up.
|
||||
*
|
||||
* Do not expect PQerrorMessage to be appropriately set.
|
||||
*/
|
||||
WP_EXEC_UNEXPECTED_SUCCESS,
|
||||
|
||||
/*
|
||||
* No result available at this time. Wait until read-ready, then call
|
||||
* again. Internally, this is returned when PQisBusy indicates that
|
||||
* PQgetResult would block.
|
||||
*/
|
||||
WP_EXEC_NEEDS_INPUT,
|
||||
/* Catch-all failure. Check PQerrorMessage. */
|
||||
WP_EXEC_FAILED,
|
||||
} WalProposerExecStatusType;
|
||||
|
||||
/* Re-exported ConnStatusType */
|
||||
typedef enum
|
||||
{
|
||||
@@ -712,6 +654,7 @@ extern void WalProposerStart(WalProposer *wp);
|
||||
extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos);
|
||||
extern void WalProposerPoll(WalProposer *wp);
|
||||
extern void WalProposerFree(WalProposer *wp);
|
||||
|
||||
/*
|
||||
* WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to
|
||||
* recreate set from scratch, hence the export.
|
||||
|
||||
@@ -43,11 +43,13 @@
|
||||
#include "utils/ps_status.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
#include "neon.h"
|
||||
#include "walproposer.h"
|
||||
#include "neon_walreader.h"
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "libpqwalproposer.h"
|
||||
#include "neon.h"
|
||||
#include "neon_walreader.h"
|
||||
#include "walproposer.h"
|
||||
|
||||
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
|
||||
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
|
||||
* message header */
|
||||
@@ -542,14 +544,6 @@ walprop_pg_load_libpqwalreceiver(void)
|
||||
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
|
||||
}
|
||||
|
||||
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
|
||||
struct WalProposerConn
|
||||
{
|
||||
PGconn *pg_conn;
|
||||
bool is_nonblocking; /* whether the connection is non-blocking */
|
||||
char *recvbuf; /* last received data from walprop_async_read */
|
||||
};
|
||||
|
||||
/* Helper function */
|
||||
static bool
|
||||
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
|
||||
@@ -587,16 +581,17 @@ walprop_status(Safekeeper *sk)
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_connect_start(Safekeeper *sk)
|
||||
WalProposerConn *
|
||||
libpqwp_connect_start(char *conninfo)
|
||||
{
|
||||
|
||||
PGconn *pg_conn;
|
||||
WalProposerConn *conn;
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n;
|
||||
char *password = neon_auth_token;
|
||||
|
||||
Assert(sk->conn == NULL);
|
||||
|
||||
/*
|
||||
* Connect using the given connection string. If the NEON_AUTH_TOKEN
|
||||
@@ -615,7 +610,7 @@ walprop_connect_start(Safekeeper *sk)
|
||||
n++;
|
||||
}
|
||||
keywords[n] = "dbname";
|
||||
values[n] = sk->conninfo;
|
||||
values[n] = conninfo;
|
||||
n++;
|
||||
keywords[n] = NULL;
|
||||
values[n] = NULL;
|
||||
@@ -636,11 +631,20 @@ walprop_connect_start(Safekeeper *sk)
|
||||
* palloc will exit on failure though, so there's not much we could do if
|
||||
* it *did* fail.
|
||||
*/
|
||||
sk->conn = palloc(sizeof(WalProposerConn));
|
||||
sk->conn->pg_conn = pg_conn;
|
||||
sk->conn->is_nonblocking = false; /* connections always start in
|
||||
* blocking mode */
|
||||
sk->conn->recvbuf = NULL;
|
||||
conn = palloc(sizeof(WalProposerConn));
|
||||
conn->pg_conn = pg_conn;
|
||||
conn->is_nonblocking = false; /* connections always start in blocking
|
||||
* mode */
|
||||
conn->recvbuf = NULL;
|
||||
return conn;
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_connect_start(Safekeeper *sk)
|
||||
{
|
||||
Assert(sk->conn == NULL);
|
||||
sk->conn = libpqwp_connect_start(sk->conninfo);
|
||||
|
||||
}
|
||||
|
||||
static WalProposerConnectPollStatusType
|
||||
@@ -684,26 +688,33 @@ walprop_connect_poll(Safekeeper *sk)
|
||||
return return_val;
|
||||
}
|
||||
|
||||
static bool
|
||||
walprop_send_query(Safekeeper *sk, char *query)
|
||||
extern bool
|
||||
libpqwp_send_query(WalProposerConn *conn, char *query)
|
||||
{
|
||||
/*
|
||||
* We need to be in blocking mode for sending the query to run without
|
||||
* requiring a call to PQflush
|
||||
*/
|
||||
if (!ensure_nonblocking_status(sk->conn, false))
|
||||
if (!ensure_nonblocking_status(conn, false))
|
||||
return false;
|
||||
|
||||
/* PQsendQuery returns 1 on success, 0 on failure */
|
||||
if (!PQsendQuery(sk->conn->pg_conn, query))
|
||||
if (!PQsendQuery(conn->pg_conn, query))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static WalProposerExecStatusType
|
||||
walprop_get_query_result(Safekeeper *sk)
|
||||
static bool
|
||||
walprop_send_query(Safekeeper *sk, char *query)
|
||||
{
|
||||
return libpqwp_send_query(sk->conn, query);
|
||||
}
|
||||
|
||||
WalProposerExecStatusType
|
||||
libpqwp_get_query_result(WalProposerConn *conn)
|
||||
{
|
||||
|
||||
PGresult *result;
|
||||
WalProposerExecStatusType return_val;
|
||||
|
||||
@@ -711,14 +722,14 @@ walprop_get_query_result(Safekeeper *sk)
|
||||
char *unexpected_success = NULL;
|
||||
|
||||
/* Consume any input that we might be missing */
|
||||
if (!PQconsumeInput(sk->conn->pg_conn))
|
||||
if (!PQconsumeInput(conn->pg_conn))
|
||||
return WP_EXEC_FAILED;
|
||||
|
||||
if (PQisBusy(sk->conn->pg_conn))
|
||||
if (PQisBusy(conn->pg_conn))
|
||||
return WP_EXEC_NEEDS_INPUT;
|
||||
|
||||
|
||||
result = PQgetResult(sk->conn->pg_conn);
|
||||
result = PQgetResult(conn->pg_conn);
|
||||
|
||||
/*
|
||||
* PQgetResult returns NULL only if getting the result was successful &
|
||||
@@ -779,6 +790,12 @@ walprop_get_query_result(Safekeeper *sk)
|
||||
return return_val;
|
||||
}
|
||||
|
||||
static WalProposerExecStatusType
|
||||
walprop_get_query_result(Safekeeper *sk)
|
||||
{
|
||||
return libpqwp_get_query_result(sk->conn);
|
||||
}
|
||||
|
||||
static pgsocket
|
||||
walprop_socket(Safekeeper *sk)
|
||||
{
|
||||
@@ -791,38 +808,20 @@ walprop_flush(Safekeeper *sk)
|
||||
return (PQflush(sk->conn->pg_conn));
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_finish(Safekeeper *sk)
|
||||
PGAsyncReadResult
|
||||
libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount)
|
||||
{
|
||||
if (!sk->conn)
|
||||
return;
|
||||
|
||||
if (sk->conn->recvbuf != NULL)
|
||||
PQfreemem(sk->conn->recvbuf);
|
||||
PQfinish(sk->conn->pg_conn);
|
||||
pfree(sk->conn);
|
||||
sk->conn = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive a message from the safekeeper.
|
||||
*
|
||||
* On success, the data is placed in *buf. It is valid until the next call
|
||||
* to this function.
|
||||
*/
|
||||
static PGAsyncReadResult
|
||||
walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
{
|
||||
int result;
|
||||
|
||||
if (sk->conn->recvbuf != NULL)
|
||||
if (conn->recvbuf != NULL)
|
||||
{
|
||||
PQfreemem(sk->conn->recvbuf);
|
||||
sk->conn->recvbuf = NULL;
|
||||
PQfreemem(conn->recvbuf);
|
||||
conn->recvbuf = NULL;
|
||||
}
|
||||
|
||||
/* Call PQconsumeInput so that we have the data we need */
|
||||
if (!PQconsumeInput(sk->conn->pg_conn))
|
||||
if (!PQconsumeInput(conn->pg_conn))
|
||||
{
|
||||
*amount = 0;
|
||||
*buf = NULL;
|
||||
@@ -840,7 +839,7 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
* sometimes be triggered by the server returning an ErrorResponse (which
|
||||
* also happens to have the effect that the copy is done).
|
||||
*/
|
||||
switch (result = PQgetCopyData(sk->conn->pg_conn, &sk->conn->recvbuf, true))
|
||||
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
|
||||
{
|
||||
case 0:
|
||||
*amount = 0;
|
||||
@@ -855,7 +854,7 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
* We can check PQgetResult to make sure that the server
|
||||
* failed; it'll always result in PGRES_FATAL_ERROR
|
||||
*/
|
||||
ExecStatusType status = PQresultStatus(PQgetResult(sk->conn->pg_conn));
|
||||
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
|
||||
|
||||
if (status != PGRES_FATAL_ERROR)
|
||||
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
|
||||
@@ -876,11 +875,23 @@ walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
default:
|
||||
/* Positive values indicate the size of the returned result */
|
||||
*amount = result;
|
||||
*buf = sk->conn->recvbuf;
|
||||
*buf = conn->recvbuf;
|
||||
return PG_ASYNC_READ_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive a message from the safekeeper.
|
||||
*
|
||||
* On success, the data is placed in *buf. It is valid until the next call
|
||||
* to this function.
|
||||
*/
|
||||
static PGAsyncReadResult
|
||||
walprop_async_read(Safekeeper *sk, char **buf, int *amount)
|
||||
{
|
||||
return libpqwp_async_read(sk->conn, buf, amount);
|
||||
}
|
||||
|
||||
static PGAsyncWriteResult
|
||||
walprop_async_write(Safekeeper *sk, void const *buf, size_t size)
|
||||
{
|
||||
@@ -963,6 +974,32 @@ walprop_blocking_write(Safekeeper *sk, void const *buf, size_t size)
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
libpqwp_disconnect(WalProposerConn *conn)
|
||||
{
|
||||
if (conn->recvbuf != NULL)
|
||||
PQfreemem(conn->recvbuf);
|
||||
PQfinish(conn->pg_conn);
|
||||
pfree(conn);
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_finish(Safekeeper *sk)
|
||||
{
|
||||
if (sk->conn)
|
||||
{
|
||||
libpqwp_disconnect(sk->conn);
|
||||
sk->conn = NULL;
|
||||
}
|
||||
|
||||
/* free xlogreader */
|
||||
if (sk->xlogreader)
|
||||
{
|
||||
NeonWALReaderFree(sk->xlogreader);
|
||||
sk->xlogreader = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Subscribe for new WAL and stream it in the loop to safekeepers.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user