Move walproposer state into struct (#5364)

This patch extracts all postgres-dependent functions in a separate
`walproposer_api` functions struct. It helps to compile walproposer as
static library without compiling all other postgres server code. This is
useful to allow calling walproposer C code from Rust, or linking this
library with anything else.

All global variables containing walproposer state were extracted to a
separate `WalProposer` struct. This makes it possible to run several
walproposers in the same process, in separate threads.

There were no logic changes and PR mostly consists of shuffling
functions between several files. We have a good test coverage for
walproposer code and I've seen no issues with tests while I was
refactoring it, so I don't expect any issues after merge.

ref https://github.com/neondatabase/neon/issues/547

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
This commit is contained in:
Arthur Petukhovsky
2023-10-05 20:48:01 +03:00
committed by GitHub
parent 522aaca718
commit 8b15252f98
11 changed files with 2701 additions and 2356 deletions

View File

@@ -7,12 +7,12 @@ OBJS = \
extension_server.o \
file_cache.o \
libpagestore.o \
libpqwalproposer.o \
neon.o \
neon_utils.o \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \
walproposer_utils.o \
walproposer_pg.o \
control_plane_connector.o
PG_CPPFLAGS = -I$(libpq_srcdir)

View File

@@ -30,7 +30,7 @@
#include "neon.h"
#include "walproposer.h"
#include "walproposer_utils.h"
#include "neon_utils.h"
#define PageStoreTrace DEBUG5

View File

@@ -1,424 +0,0 @@
#include "postgres.h"
#include "libpq-fe.h"
#include "neon.h"
#include "walproposer.h"
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
struct WalProposerConn
{
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from
* walprop_async_read */
};
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
/* If we're already correctly blocking or nonblocking, all good */
if (is_nonblocking == conn->is_nonblocking)
return true;
/* Otherwise, set it appropriately */
if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1)
return false;
conn->is_nonblocking = is_nonblocking;
return true;
}
/* Exported function definitions */
char *
walprop_error_message(WalProposerConn *conn)
{
return PQerrorMessage(conn->pg_conn);
}
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
switch (PQstatus(conn->pg_conn))
{
case CONNECTION_OK:
return WP_CONNECTION_OK;
case CONNECTION_BAD:
return WP_CONNECTION_BAD;
default:
return WP_CONNECTION_IN_PROGRESS;
}
}
WalProposerConn *
walprop_connect_start(char *conninfo, char *password)
{
WalProposerConn *conn;
PGconn *pg_conn;
const char *keywords[3];
const char *values[3];
int n;
/*
* Connect using the given connection string. If the
* NEON_AUTH_TOKEN environment variable was set, use that as
* the password.
*
* The connection options are parsed in the order they're given, so
* when we set the password before the connection string, the
* connection string can override the password from the env variable.
* Seems useful, although we don't currently use that capability
* anywhere.
*/
n = 0;
if (password)
{
keywords[n] = "password";
values[n] = password;
n++;
}
keywords[n] = "dbname";
values[n] = conninfo;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pg_conn = PQconnectStartParams(keywords, values, 1);
/*
* Allocation of a PQconn can fail, and will return NULL. We want to fully
* replicate the behavior of PQconnectStart here.
*/
if (!pg_conn)
return NULL;
/*
* And in theory this allocation can fail as well, but it's incredibly
* unlikely if we just successfully allocated a PGconn.
*
* palloc will exit on failure though, so there's not much we could do if
* it *did* fail.
*/
conn = palloc(sizeof(WalProposerConn));
conn->pg_conn = pg_conn;
conn->is_nonblocking = false; /* connections always start in blocking
* mode */
conn->recvbuf = NULL;
return conn;
}
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
WalProposerConnectPollStatusType return_val;
switch (PQconnectPoll(conn->pg_conn))
{
case PGRES_POLLING_FAILED:
return_val = WP_CONN_POLLING_FAILED;
break;
case PGRES_POLLING_READING:
return_val = WP_CONN_POLLING_READING;
break;
case PGRES_POLLING_WRITING:
return_val = WP_CONN_POLLING_WRITING;
break;
case PGRES_POLLING_OK:
return_val = WP_CONN_POLLING_OK;
break;
/*
* There's a comment at its source about this constant being
* unused. We'll expect it's never returned.
*/
case PGRES_POLLING_ACTIVE:
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
/*
* This return is never actually reached, but it's here to make
* the compiler happy
*/
return WP_CONN_POLLING_FAILED;
default:
Assert(false);
return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */
}
return return_val;
}
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
/*
* We need to be in blocking mode for sending the query to run without
* requiring a call to PQflush
*/
if (!ensure_nonblocking_status(conn, false))
return false;
/* PQsendQuery returns 1 on success, 0 on failure */
if (!PQsendQuery(conn->pg_conn, query))
return false;
return true;
}
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
PGresult *result;
WalProposerExecStatusType return_val;
/* Marker variable if we need to log an unexpected success result */
char *unexpected_success = NULL;
/* Consume any input that we might be missing */
if (!PQconsumeInput(conn->pg_conn))
return WP_EXEC_FAILED;
if (PQisBusy(conn->pg_conn))
return WP_EXEC_NEEDS_INPUT;
result = PQgetResult(conn->pg_conn);
/*
* PQgetResult returns NULL only if getting the result was successful &
* there's no more of the result to get.
*/
if (!result)
{
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
return WP_EXEC_UNEXPECTED_SUCCESS;
}
/* Helper macro to reduce boilerplate */
#define UNEXPECTED_SUCCESS(msg) \
return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
unexpected_success = msg; \
break;
switch (PQresultStatus(result))
{
/* "true" success case */
case PGRES_COPY_BOTH:
return_val = WP_EXEC_SUCCESS_COPYBOTH;
break;
/* Unexpected success case */
case PGRES_EMPTY_QUERY:
UNEXPECTED_SUCCESS("empty query return");
case PGRES_COMMAND_OK:
UNEXPECTED_SUCCESS("data-less command end");
case PGRES_TUPLES_OK:
UNEXPECTED_SUCCESS("tuples return");
case PGRES_COPY_OUT:
UNEXPECTED_SUCCESS("'Copy Out' response");
case PGRES_COPY_IN:
UNEXPECTED_SUCCESS("'Copy In' response");
case PGRES_SINGLE_TUPLE:
UNEXPECTED_SUCCESS("single tuple return");
case PGRES_PIPELINE_SYNC:
UNEXPECTED_SUCCESS("pipeline sync point");
/* Failure cases */
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_PIPELINE_ABORTED:
return_val = WP_EXEC_FAILED;
break;
default:
Assert(false);
return_val = WP_EXEC_FAILED; /* keep the compiler quiet */
}
if (unexpected_success)
elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
return return_val;
}
pgsocket
walprop_socket(WalProposerConn *conn)
{
return PQsocket(conn->pg_conn);
}
int
walprop_flush(WalProposerConn *conn)
{
return (PQflush(conn->pg_conn));
}
void
walprop_finish(WalProposerConn *conn)
{
if (conn->recvbuf != NULL)
PQfreemem(conn->recvbuf);
PQfinish(conn->pg_conn);
pfree(conn);
}
/*
* Receive a message from the safekeeper.
*
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*/
PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
int result;
if (conn->recvbuf != NULL)
{
PQfreemem(conn->recvbuf);
conn->recvbuf = NULL;
}
/* Call PQconsumeInput so that we have the data we need */
if (!PQconsumeInput(conn->pg_conn))
{
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
/*
* The docs for PQgetCopyData list the return values as: 0 if the copy is
* still in progress, but no "complete row" is available -1 if the copy is
* done -2 if an error occurred (> 0) if it was successful; that value is
* the amount transferred.
*
* The protocol we use between walproposer and safekeeper means that we
* *usually* wouldn't expect to see that the copy is done, but this can
* sometimes be triggered by the server returning an ErrorResponse (which
* also happens to have the effect that the copy is done).
*/
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
{
case 0:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_TRY_AGAIN;
case -1:
{
/*
* If we get -1, it's probably because of a server error; the
* safekeeper won't normally send a CopyDone message.
*
* We can check PQgetResult to make sure that the server
* failed; it'll always result in PGRES_FATAL_ERROR
*/
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
/*
* If there was actually an error, it'll be properly reported
* by calls to PQerrorMessage -- we don't have to do anything
* else
*/
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
case -2:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
default:
/* Positive values indicate the size of the returned result */
*amount = result;
*buf = conn->recvbuf;
return PG_ASYNC_READ_SUCCESS;
}
}
PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
/* If we aren't in non-blocking mode, switch to it. */
if (!ensure_nonblocking_status(conn, true))
return PG_ASYNC_WRITE_FAIL;
/*
* The docs for PQputcopyData list the return values as: 1 if the data was
* queued, 0 if it was not queued because of full buffers, or -1 if an
* error occurred
*/
result = PQputCopyData(conn->pg_conn, buf, size);
/*
* We won't get a result of zero because walproposer always empties the
* connection's buffers before sending more
*/
Assert(result != 0);
switch (result)
{
case 1:
/* good -- continue */
break;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQputCopyData", result);
}
/*
* After queueing the data, we still need to flush to get it to send. This
* might take multiple tries, but we don't want to wait around until it's
* done.
*
* PQflush has the following returns (directly quoting the docs): 0 if
* sucessful, 1 if it was unable to send all the data in the send queue
* yet -1 if it failed for some reason
*/
switch (result = PQflush(conn->pg_conn))
{
case 0:
return PG_ASYNC_WRITE_SUCCESS;
case 1:
return PG_ASYNC_WRITE_TRY_FLUSH;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQflush", result);
}
}
/*
* This function is very similar to walprop_async_write. For more
* information, refer to the comments there.
*/
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
/* If we are in non-blocking mode, switch out of it. */
if (!ensure_nonblocking_status(conn, false))
return false;
if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
return false;
Assert(result == 1);
/* Because the connection is non-blocking, flushing returns 0 or -1 */
if ((result = PQflush(conn->pg_conn)) == -1)
return false;
Assert(result == 0);
return true;
}

View File

@@ -18,6 +18,10 @@ extern char *neon_auth_token;
extern char *neon_timeline;
extern char *neon_tenant;
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
@@ -30,4 +34,10 @@ extern void pg_init_extension_server(void);
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
extern uint64 BackpressureThrottlingTime(void);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
#endif /* NEON_H */

116
pgxn/neon/neon_utils.c Normal file
View File

@@ -0,0 +1,116 @@
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}

12
pgxn/neon/neon_utils.h Normal file
View File

@@ -0,0 +1,12 @@
#ifndef __NEON_UTILS_H__
#define __NEON_UTILS_H__
#include "postgres.h"
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
#endif /* __NEON_UTILS_H__ */

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,8 @@
#ifndef __NEON_WALPROPOSER_H__
#define __NEON_WALPROPOSER_H__
#include "access/xlogdefs.h"
#include "postgres.h"
#include "access/xlogdefs.h"
#include "port.h"
#include "access/xlog_internal.h"
#include "access/transam.h"
@@ -16,29 +16,15 @@
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
* message */
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
* message header */
#define XLOG_HDR_END_POS (1 + 8) /* offset of end position in wal sender*
* message header */
/*
* In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occurred,
* because all WL_* events are given flags equal to some (1 << i), starting from i = 0
*/
#define WL_NO_EVENTS 0
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern bool am_wal_proposer;
struct WalProposerConn; /* Defined in libpqwalproposer */
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
typedef struct WalProposerConn WalProposerConn;
struct WalMessage;
typedef struct WalMessage WalMessage;
/* Possible return values from ReadPGAsync */
typedef enum
{
@@ -52,7 +38,7 @@ typedef enum
PG_ASYNC_READ_TRY_AGAIN,
/* Reading failed. Check PQerrorMessage(conn) */
PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
} PGAsyncReadResult;
/* Possible return values from WritePGAsync */
typedef enum
@@ -71,7 +57,7 @@ typedef enum
PG_ASYNC_WRITE_TRY_FLUSH,
/* Writing failed. Check PQerrorMessage(conn) */
PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
} PGAsyncWriteResult;
/*
* WAL safekeeper state, which is used to wait for some event.
@@ -147,7 +133,7 @@ typedef enum
* to read.
*/
SS_ACTIVE,
} SafekeeperState;
} SafekeeperState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
@@ -171,12 +157,12 @@ typedef struct ProposerGreeting
uint8 tenant_id[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
} ProposerGreeting;
typedef struct AcceptorProposerMessage
{
uint64 tag;
} AcceptorProposerMessage;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
@@ -186,7 +172,7 @@ typedef struct AcceptorGreeting
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
} AcceptorGreeting;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
@@ -196,20 +182,20 @@ typedef struct VoteRequest
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
} VoteRequest;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
{
term_t term;
XLogRecPtr lsn;
} TermSwitchEntry;
} TermSwitchEntry;
typedef struct TermHistory
{
uint32 n_entries;
TermSwitchEntry *entries;
} TermHistory;
} TermHistory;
/* Vote itself, sent from safekeeper to proposer */
typedef struct VoteResponse
@@ -227,7 +213,7 @@ typedef struct VoteResponse
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
} VoteResponse;
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
@@ -243,7 +229,7 @@ typedef struct ProposerElected
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
@@ -268,7 +254,7 @@ typedef struct AppendRequestHeader
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
} AppendRequestHeader;
/*
* Hot standby feedback received from replica
@@ -278,7 +264,7 @@ typedef struct HotStandbyFeedback
TimestampTz ts;
FullTransactionId xmin;
FullTransactionId catalog_xmin;
} HotStandbyFeedback;
} HotStandbyFeedback;
typedef struct PageserverFeedback
{
@@ -289,7 +275,7 @@ typedef struct PageserverFeedback
XLogRecPtr disk_consistent_lsn;
XLogRecPtr remote_consistent_lsn;
TimestampTz replytime;
} PageserverFeedback;
} PageserverFeedback;
typedef struct WalproposerShmemState
{
@@ -297,7 +283,7 @@ typedef struct WalproposerShmemState
PageserverFeedback feedback;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
} WalproposerShmemState;
} WalproposerShmemState;
/*
* Report safekeeper state to proposer
@@ -321,17 +307,22 @@ typedef struct AppendResponse
/* and custom neon feedback. */
/* This part of the message is extensible. */
PageserverFeedback rf;
} AppendResponse;
} AppendResponse;
/* PageserverFeedback is extensible part of the message that is parsed separately */
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
struct WalProposer;
typedef struct WalProposer WalProposer;
/*
* Descriptor of safekeeper
*/
typedef struct Safekeeper
{
WalProposer *wp;
char const *host;
char const *port;
@@ -340,7 +331,7 @@ typedef struct Safekeeper
*
* May contain private information like password and should not be logged.
*/
char conninfo[MAXCONNINFO];
char conninfo[MAXCONNINFO];
/*
* postgres protocol connection to the WAL acceptor
@@ -373,27 +364,12 @@ typedef struct Safekeeper
int eventPos; /* position in wait event set. Equal to -1 if*
* no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */
} Safekeeper;
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(void);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
extern void StartProposerReplication(StartReplicationCmd *cmd);
extern Size WalproposerShmemSize(void);
extern bool WalproposerShmemInit(void);
extern void replication_feedback_set(PageserverFeedback *rf);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
/* libpqwalproposer hooks & helper type */
/* Re-exported PostgresPollingStatusType */
typedef enum
{
@@ -406,7 +382,7 @@ typedef enum
* 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
* We've removed it here to avoid clutter.
*/
} WalProposerConnectPollStatusType;
} WalProposerConnectPollStatusType;
/* Re-exported and modified ExecStatusType */
typedef enum
@@ -431,7 +407,7 @@ typedef enum
WP_EXEC_NEEDS_INPUT,
/* Catch-all failure. Check PQerrorMessage. */
WP_EXEC_FAILED,
} WalProposerExecStatusType;
} WalProposerExecStatusType;
/* Re-exported ConnStatusType */
typedef enum
@@ -445,67 +421,252 @@ typedef enum
* that extra functionality, so we collect them into a single tag here.
*/
WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
extern char *walprop_error_message(WalProposerConn *conn);
/* Re-exported PQstatus */
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);
/* Re-exported PQconnectStart */
extern WalProposerConn * walprop_connect_start(char *conninfo, char *password);
/* Re-exported PQconectPoll */
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);
/* Blocking wrapper around PQsendQuery */
extern bool walprop_send_query(WalProposerConn *conn, char *query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);
/* Re-exported PQsocket */
extern pgsocket walprop_socket(WalProposerConn *conn);
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
extern int walprop_flush(WalProposerConn *conn);
/* Re-exported PQfinish */
extern void walprop_finish(WalProposerConn *conn);
} WalProposerConnStatusType;
/*
* Ergonomic wrapper around PGgetCopyData
*
* Reads a CopyData block from a safekeeper, setting *amount to the number
* of bytes returned.
*
* This function is allowed to assume certain properties specific to the
* protocol with the safekeepers, so it should not be used as-is for any
* other purpose.
*
* Note: If possible, using <AsyncRead> is generally preferred, because it
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
* Collection of hooks for walproposer, to call postgres functions,
* read WAL and send it over the network.
*/
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);
typedef struct walproposer_api
{
/*
* Get WalproposerShmemState. This is used to store information about last
* elected term.
*/
WalproposerShmemState *(*get_shmem_state) (void);
/*
* Start receiving notifications about new WAL. This is an infinite loop
* which calls WalProposerBroadcast() and WalProposerPoll() to send the
* WAL.
*/
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos);
/* Get pointer to the latest available WAL. */
XLogRecPtr (*get_flush_rec_ptr) (void);
/* Get current time. */
TimestampTz (*get_current_timestamp) (void);
/* Get postgres timeline. */
TimeLineID (*get_timeline_id) (void);
/* Current error message, aka PQerrorMessage. */
char *(*conn_error_message) (WalProposerConn *conn);
/* Connection status, aka PQstatus. */
WalProposerConnStatusType (*conn_status) (WalProposerConn *conn);
/* Start the connection, aka PQconnectStart. */
WalProposerConn *(*conn_connect_start) (char *conninfo);
/* Poll an asynchronous connection, aka PQconnectPoll. */
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn *conn);
/* Send a blocking SQL query, aka PQsendQuery. */
bool (*conn_send_query) (WalProposerConn *conn, char *query);
/* Read the query result, aka PQgetResult. */
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn *conn);
/* Flush buffer to the network, aka PQflush. */
int (*conn_flush) (WalProposerConn *conn);
/* Close the connection, aka PQfinish. */
void (*conn_finish) (WalProposerConn *conn);
/* Try to read CopyData message, aka PQgetCopyData. */
PGAsyncReadResult (*conn_async_read) (WalProposerConn *conn, char **buf, int *amount);
/* Try to write CopyData message, aka PQputCopyData. */
PGAsyncWriteResult (*conn_async_write) (WalProposerConn *conn, void const *buf, size_t size);
/* Blocking CopyData write, aka PQputCopyData + PQflush. */
bool (*conn_blocking_write) (WalProposerConn *conn, void const *buf, size_t size);
/* Download WAL from startpos to endpos and make it available locally. */
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
/* Read WAL from disk to buf. */
void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);
/* Allocate WAL reader. */
XLogReaderState *(*wal_reader_allocate) (void);
/* Deallocate event set. */
void (*free_event_set) (void);
/* Initialize event set. */
void (*init_event_set) (int n_safekeepers);
/* Update events for an existing safekeeper connection. */
void (*update_event_set) (Safekeeper *sk, uint32 events);
/* Add a new safekeeper connection to the event set. */
void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);
/*
* Wait until some event happens: - timeout is reached - socket event for
* safekeeper connection - new WAL is available
*
* Returns 0 if timeout is reached, 1 if some event happened. Updates
* events mask to indicate events and sets sk to the safekeeper which has
* an event.
*/
int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);
/* Read random bytes. */
bool (*strong_random) (void *buf, size_t len);
/*
* Get a basebackup LSN. Used to cross-validate with the latest available
* LSN on the safekeepers.
*/
XLogRecPtr (*get_redo_start_lsn) (void);
/*
* Finish sync safekeepers with the given LSN. This function should not
* return and should exit the program.
*/
void (*finish_sync_safekeepers) (XLogRecPtr lsn);
/*
* Called after every new message from the safekeeper. Used to propagate
* backpressure feedback and to confirm WAL persistence (has been commited
* on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
/*
* Called on peer_horizon_lsn updates. Used to advance replication slot
* and to free up disk space by deleting unnecessary WAL.
*/
void (*confirm_wal_streamed) (XLogRecPtr lsn);
} walproposer_api;
/*
* Ergonomic wrapper around PQputCopyData + PQflush
*
* Starts to write a CopyData block to a safekeeper.
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
* Configuration of the WAL proposer.
*/
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);
typedef struct WalProposerConfig
{
/* hex-encoded TenantId cstr */
char *neon_tenant;
/* hex-encoded TimelineId cstr */
char *neon_timeline;
/*
* Comma-separated list of safekeepers, in the following format:
* host1:port1,host2:port2,host3:port3
*
* This cstr should be editable.
*/
char *safekeepers_list;
/*
* WalProposer reconnects to offline safekeepers once in this interval.
* Time is in milliseconds.
*/
int safekeeper_reconnect_timeout;
/*
* WalProposer terminates the connection if it doesn't receive any message
* from the safekeeper in this interval. Time is in milliseconds.
*/
int safekeeper_connection_timeout;
/*
* WAL segment size. Will be passed to safekeepers in greet request. Also
* used to detect page headers.
*/
int wal_segment_size;
/*
* If safekeeper was started in sync mode, walproposer will not subscribe
* for new WAL and will exit when quorum of safekeepers will be synced to
* the latest available LSN.
*/
bool syncSafekeepers;
/* Will be passed to safekeepers in greet request. */
uint64 systemId;
} WalProposerConfig;
/*
* Blocking equivalent to walprop_async_write_fn
*
* Returns 'true' if successful, 'false' on failure.
* WAL proposer state.
*/
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);
typedef struct WalProposer
{
WalProposerConfig *config;
int n_safekeepers;
extern uint64 BackpressureThrottlingTime(void);
/* (n_safekeepers / 2) + 1 */
int quorum;
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
/* last commitLsn broadcasted to safekeepers */
XLogRecPtr lastSentCommitLsn;
ProposerGreeting greetRequest;
/* Vote request for safekeeper */
VoteRequest voteRequest;
/*
* Minimal LSN which may be needed for recovery of some safekeeper,
* record-aligned (first record which might not yet received by someone).
*/
XLogRecPtr truncateLsn;
/*
* Term of the proposer. We want our term to be highest and unique, so we
* collect terms from safekeepers quorum, choose max and +1. After that
* our term is fixed and must not change. If we observe that some
* safekeeper has higher term, it means that we have another running
* compute, so we must stop immediately.
*/
term_t propTerm;
/* term history of the proposer */
TermHistory propTermHistory;
/* epoch start lsn of the proposer */
XLogRecPtr propEpochStartLsn;
/* Most advanced acceptor epoch */
term_t donorEpoch;
/* Most advanced acceptor */
int donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;
/*
* Timestamp of the last reconnection attempt. Related to
* config->safekeeper_reconnect_timeout
*/
TimestampTz last_reconnect_attempt;
walproposer_api api;
} WalProposer;
extern WalProposer *WalProposerCreate(WalProposerConfig *config, walproposer_api api);
extern void WalProposerStart(WalProposer *wp);
extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(WalProposer *wp);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
#endif /* __NEON_WALPROPOSER_H__ */

1667
pgxn/neon/walproposer_pg.c Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,659 +0,0 @@
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "walproposer_utils.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID
* corresponding the filename of walpropFile.
*/
static int walpropFile = -1;
static TimeLineID walpropFileTLI = 0;
static XLogSegNo walpropSegNo = 0;
/* START cloned file-local variables and functions from walsender.c */
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static void WalSndLoop(void);
static void XLogBroadcastWalProposer(void);
/* END cloned file-level variables and functions from walsender.c */
int
CompareLsn(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return -1;
else if (lsn1 == lsn2)
return 0;
else
return 1;
}
/* Returns a human-readable string corresonding to the SafekeeperState
*
* The string should not be freed.
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
*/
char *
FormatSafekeeperState(SafekeeperState state)
{
char *return_val = NULL;
switch (state)
{
case SS_OFFLINE:
return_val = "offline";
break;
case SS_CONNECTING_READ:
case SS_CONNECTING_WRITE:
return_val = "connecting";
break;
case SS_WAIT_EXEC_RESULT:
return_val = "receiving query result";
break;
case SS_HANDSHAKE_RECV:
return_val = "handshake (receiving)";
break;
case SS_VOTING:
return_val = "voting";
break;
case SS_WAIT_VERDICT:
return_val = "wait-for-verdict";
break;
case SS_SEND_ELECTED_FLUSH:
return_val = "send-announcement-flush";
break;
case SS_IDLE:
return_val = "idle";
break;
case SS_ACTIVE:
return_val = "active";
break;
}
Assert(return_val != NULL);
return return_val;
}
/* Asserts that the provided events are expected for given safekeeper's state */
void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
/*
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
Assert(events_ok_for_state);
}
}
/* Returns the set of events a safekeeper in this state should be waiting on
*
* This will return WL_NO_EVENTS (= 0) for some events. */
uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
{
uint32 result = WL_NO_EVENTS;
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
{
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
case SS_CONNECTING_WRITE:
result = WL_SOCKET_WRITEABLE;
break;
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
/*
* Idle states use read-readiness as a sign that the connection
* has been disconnected.
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
default:
Assert(false);
break;
}
return result;
}
/* Returns a human-readable string corresponding to the event set
*
* If the events do not correspond to something set as the `events` field of a `WaitEvent`, the
* returned string may be meaingless.
*
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
char *
FormatEvents(uint32 events)
{
static char return_str[8];
/* Helper variable to check if there's extra bits */
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
/*
* The formatting here isn't supposed to be *particularly* useful -- it's
* just to give an sense of what events have been triggered without
* needing to remember your powers of two.
*/
return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';
if (events & (~all_flags))
{
elog(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}
else
return_str[6] = '\0';
return (char *) &return_str;
}
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
static int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}
/*
* Write XLOG data to disk.
*/
void
XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int startoff;
int byteswritten;
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
XLogWalPropClose(recptr);
if (walpropFile < 0)
{
#if PG_VERSION_NUM >= 150000
/* FIXME Is it ok to use hardcoded value here? */
TimeLineID tli = 1;
#else
bool use_existent = true;
#endif
/* Create/use new log file */
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
walpropFile = XLogFileInit(walpropSegNo, tli);
walpropFileTLI = tli;
#else
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
walpropFileTLI = ThisTimeLineID;
#endif
}
/* Calculate the start offset of the received logs */
startoff = XLogSegmentOffset(recptr, wal_segment_size);
if (startoff + nbytes > wal_segment_size)
segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
/* OK to write the logs */
errno = 0;
byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno;
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
save_errno = errno;
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
xlogfname, startoff, (unsigned long) segbytes)));
}
/* Update state for write */
recptr += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
}
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop.
*/
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
{
XLogWalPropClose(recptr);
}
}
/*
* Close the current segment.
*/
void
XLogWalPropClose(XLogRecPtr recptr)
{
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
if (close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
}
walpropFile = -1;
}
/* START of cloned functions from walsender.c */
/*
* Subscribe for new WAL and stream it in the loop to safekeepers.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
*/
void
StartProposerReplication(StartReplicationCmd *cmd)
{
XLogRecPtr FlushPtr;
TimeLineID currTLI;
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
*
* NOTE: wal_level can only change at shutdown, so in most cases it is
* difficult for there to be WAL data that we can still see that was
* written at wal_level='minimal'.
*/
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a logical replication slot for physical replication")));
/*
* We don't need to verify the slot's restart_lsn here; instead we
* rely on the caller requesting the starting point to use. If the
* WAL segment doesn't exist, we'll fail later.
*/
}
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
* kept in ThisTimeLineID.
*
* Neon doesn't currently use PG Timelines, but it may in the future, so
* we keep this code around to lighten the load for when we need it.
*/
#if PG_VERSION_NUM >= 150000
FlushPtr = GetFlushRecPtr(&currTLI);
#else
FlushPtr = GetFlushRecPtr();
currTLI = ThisTimeLineID;
#endif
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
LSN_FORMAT_ARGS(cmd->startpoint),
LSN_FORMAT_ARGS(FlushPtr))));
}
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* Infinite send loop, never returns */
WalSndLoop();
WalSndSetState(WALSNDSTATE_STARTUP);
if (cmd->slotname)
ReplicationSlotRelease();
}
/*
* Main loop that waits for LSN updates and calls the walproposer.
* Synchronous replication sets latch in WalSndWakeup at walsender.c
*/
static void
WalSndLoop(void)
{
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
for (;;)
{
CHECK_FOR_INTERRUPTS();
XLogBroadcastWalProposer();
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll();
}
}
/*
* Notify walproposer about the new WAL position.
*/
static void
XLogBroadcastWalProposer(void)
{
XLogRecPtr startptr;
XLogRecPtr endptr;
/* Start from the last sent position */
startptr = sentPtr;
/*
* Streaming the current timeline on a primary.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of WALRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
endptr = GetFlushRecPtr(NULL);
#else
endptr = GetFlushRecPtr();
#endif
/*
* Record the current system time as an approximation of the time at which
* this WAL location was written for the purposes of lag tracking.
*
* In theory we could make XLogFlush() record a time in shmem whenever WAL
* is flushed and we could get that time as well as the LSN when we call
* GetFlushRecPtr() above (and likewise for the cascading standby
* equivalent), but rather than putting any new code into the hot WAL path
* it seems good enough to capture the time here. We should reach this
* after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
* may take some time, we read the WAL flush pointer and take the time
* very close to together here so that we'll get a later position if it is
* still moving.
*
* Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
* this gives us a cheap approximation for the WAL flush time for this
* LSN.
*
* Note that the LSN is not necessarily the LSN for the data contained in
* the present message; it's the end of the WAL, which might be further
* ahead. All the lag tracking machinery cares about is finding out when
* that arbitrary LSN is eventually reported as written, flushed and
* applied, so that it can measure the elapsed time.
*/
LagTrackerWrite(endptr, GetCurrentTimestamp());
/* Do we have any work to do? */
Assert(startptr <= endptr);
if (endptr <= startptr)
return;
WalProposerBroadcast(startptr, endptr);
sentPtr = endptr;
/* Update shared memory status */
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
/* Report progress of XLOG streaming in PS display */
if (update_process_title)
{
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
LSN_FORMAT_ARGS(sentPtr));
set_ps_display(activitymsg);
}
}

View File

@@ -1,19 +0,0 @@
#ifndef __NEON_WALPROPOSER_UTILS_H__
#define __NEON_WALPROPOSER_UTILS_H__
#include "walproposer.h"
int CompareLsn(const void *a, const void *b);
char *FormatSafekeeperState(SafekeeperState state);
void AssertEventsOkForState(uint32 events, Safekeeper *sk);
uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
char *FormatEvents(uint32 events);
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
void XLogWalPropClose(XLogRecPtr recptr);
#endif /* __NEON_WALPROPOSER_UTILS_H__ */