Make walproposer 100% globals free

This commit is contained in:
Arthur Petukhovsky
2023-09-26 22:12:17 +00:00
parent f550c26aca
commit 3bf6ecb2a6
4 changed files with 508 additions and 431 deletions

View File

@@ -33,4 +33,7 @@ extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block
extern uint64 BackpressureThrottlingTime(void);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
#endif /* NEON_H */

File diff suppressed because it is too large Load Diff

View File

@@ -317,11 +317,16 @@ typedef struct AppendResponse
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
struct WalProposer;
typedef struct WalProposer WalProposer;
/*
* Descriptor of safekeeper
*/
typedef struct Safekeeper
{
WalProposer *wp;
char const *host;
char const *port;
@@ -369,13 +374,6 @@ typedef struct Safekeeper
AppendResponse appendResponse; /* feedback for master */
} Safekeeper;
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(void);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
/* Re-exported PostgresPollingStatusType */
typedef enum
{
@@ -436,17 +434,13 @@ typedef enum
typedef struct walproposer_api
{
WalproposerShmemState * (*get_shmem_state) (void);
void (*start_streaming) (XLogRecPtr startpos, TimeLineID timeline);
void (*init_walsender) (void);
void (*init_standalone_sync_safekeepers) (void);
uint64 (*init_bgworker) (void);
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos, TimeLineID timeline);
XLogRecPtr (*get_flush_rec_ptr) (void);
TimestampTz (*get_current_timestamp) (void);
TimeLineID (*get_timeline_id) (void);
void (*load_libpqwalreceiver) (void);
char * (*conn_error_message) (WalProposerConn * conn);
WalProposerConnStatusType (*conn_status) (WalProposerConn * conn);
WalProposerConn * (*conn_connect_start) (char *conninfo, char *password);
WalProposerConn * (*conn_connect_start) (char *conninfo);
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn * conn);
bool (*conn_send_query) (WalProposerConn * conn, char * query);
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn * conn);
@@ -467,10 +461,67 @@ typedef struct walproposer_api
bool (*strong_random) (void *buf, size_t len);
XLogRecPtr (*get_redo_start_lsn) (void);
void (*finish_sync_safekeepers) (XLogRecPtr lsn);
void (*process_safekeeper_feedback) (Safekeeper * safekeepers, int n_safekeepers, bool isSync, XLogRecPtr commitLsn);
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
void (*confirm_wal_streamed) (XLogRecPtr lsn);
} walproposer_api;
extern const walproposer_api walprop_pg;
/*
 * Settings a walproposer instance runs with. A pointer to this struct is
 * passed to WalProposerCreate(), and WalProposer carries a pointer of this
 * type in its `config` field.
 */
typedef struct WalProposerConfig {
/* filled by the postgres glue code from its neon_tenant variable */
char *neon_tenant;
/* filled by the postgres glue code from its neon_timeline variable */
char *neon_timeline;
/* filled from wal_acceptors_list; NOTE(review): exact list format is not visible here */
char *safekeepers_list;
/* filled from wal_acceptor_reconnect_timeout; presumably milliseconds — TODO confirm */
int safekeeper_reconnect_timeout;
/* filled from wal_acceptor_connection_timeout; presumably milliseconds — TODO confirm */
int safekeeper_connection_timeout;
/* filled from wal_segment_size */
int wal_segment_size;
/* true when started through the `postgres --sync-safekeepers` entry point, false for the bgworker */
bool syncSafekeepers;
/* GetSystemIdentifier() in bgworker mode; 0 in sync-safekeepers mode */
uint64 systemId;
} WalProposerConfig;
/*
 * State of one walproposer instance. Functions of the walproposer take a
 * pointer to this struct instead of relying on file-level variables.
 */
typedef struct WalProposer {
/* settings of this instance; NOTE(review): presumably the pointer given to WalProposerCreate() — confirm */
WalProposerConfig *config;
/* number of entries of safekeeper[] that are in use */
int n_safekeepers;
/* NOTE(review): presumably the number of safekeepers that form a majority — confirm */
int quorum;
/* per-safekeeper descriptors; only the first n_safekeepers entries are iterated */
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
/* last commitLsn broadcast to safekeepers */
XLogRecPtr lastSentCommitLsn;
/* NOTE(review): presumably the greeting message sent to safekeepers — confirm */
ProposerGreeting greetRequest;
/* Vote request for safekeeper */
VoteRequest voteRequest;
/*
 * Minimal LSN which may be needed for recovery of some safekeeper,
 * record-aligned (first record which might not yet have been received by
 * someone).
 */
XLogRecPtr truncateLsn;
/*
 * Term of the proposer. We want our term to be highest and unique,
 * so we collect terms from safekeepers quorum, choose max and +1.
 * After that our term is fixed and must not change. If we observe
 * that some safekeeper has higher term, it means that we have another
 * running compute, so we must stop immediately.
 */
term_t propTerm;
/* term history of the proposer */
TermHistory propTermHistory;
/* epoch start lsn of the proposer */
XLogRecPtr propEpochStartLsn;
/* Most advanced acceptor epoch */
term_t donorEpoch;
/* Most advanced acceptor */
int donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* NOTE(review): presumably the count of votes collected so far — confirm */
int n_votes;
/* NOTE(review): presumably the count of currently connected safekeepers — confirm */
int n_connected;
/* the bgworker entry point sets this to the current timestamp right after WalProposerCreate() */
TimestampTz last_reconnect_attempt;
/* callback table; NOTE(review): presumably a copy of the one given to WalProposerCreate() — confirm */
walproposer_api api;
} WalProposer;
extern WalProposer *WalProposerCreate(WalProposerConfig *config, walproposer_api api);
extern void WalProposerStart(WalProposer *wp);
extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(WalProposer *wp);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
#endif /* __NEON_WALPROPOSER_H__ */

View File

@@ -50,8 +50,10 @@ int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
static AppendResponse quorumFeedback;
static WalproposerShmemState * walprop_shared;
static WalProposerConfig walprop_config;
static const walproposer_api walprop_pg;
static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
@@ -60,6 +62,12 @@ static uint64 backpressure_lag_impl(void);
static bool backpressure_throttling_impl(void);
static void walprop_register_bgworker(void);
static void walprop_pg_init_standalone_sync_safekeepers(void);
static void walprop_pg_init_walsender(void);
static void walprop_pg_init_bgworker(void);
static TimestampTz walprop_pg_get_current_timestamp(void);
static void walprop_pg_load_libpqwalreceiver(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback;
static shmem_startup_hook_type prev_shmem_startup_hook_type;
#if PG_VERSION_NUM >= 150000
@@ -69,13 +77,65 @@ static void walproposer_shmem_request(void);
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static void StartProposerReplication(StartReplicationCmd *cmd);
static void WalSndLoop(void);
static void XLogBroadcastWalProposer(void);
static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
static void WalSndLoop(WalProposer *wp);
static void XLogBroadcastWalProposer(WalProposer *wp);
static void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalPropClose(XLogRecPtr recptr);
/*
 * Populate the file-level walprop_config from the current values of the
 * matching settings variables.
 *
 * syncSafekeepers is stored as given. systemId is read from
 * GetSystemIdentifier() in regular mode and is 0 in sync-safekeepers mode.
 */
static void
init_walprop_config(bool syncSafekeepers)
{
	WalProposerConfig *cfg = &walprop_config;

	/* identity of the timeline we work on */
	cfg->neon_tenant = neon_tenant;
	cfg->neon_timeline = neon_timeline;

	/* safekeeper connection settings */
	cfg->safekeepers_list = wal_acceptors_list;
	cfg->safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
	cfg->safekeeper_connection_timeout = wal_acceptor_connection_timeout;

	cfg->wal_segment_size = wal_segment_size;

	/* mode flag, and the system identifier that depends on it */
	cfg->syncSafekeepers = syncSafekeepers;
	cfg->systemId = syncSafekeepers ? 0 : GetSystemIdentifier();
}
/*
 * Entry point for `postgres --sync-safekeepers`.
 *
 * Fills walprop_config in sync-safekeepers mode, runs
 * walprop_pg_init_standalone_sync_safekeepers() and
 * walprop_pg_load_libpqwalreceiver(), then creates a walproposer and
 * starts it. argc and argv are not used.
 */
PGDLLEXPORT void
WalProposerSync(int argc, char *argv[])
{
	/* sync mode: init_walprop_config() leaves systemId at 0 */
	init_walprop_config(true);

	walprop_pg_init_standalone_sync_safekeepers();
	walprop_pg_load_libpqwalreceiver();

	/* create the instance and hand it straight to the start routine */
	WalProposerStart(WalProposerCreate(&walprop_config, walprop_pg));
}
/*
 * WAL proposer bgworker entry point.
 *
 * Fills walprop_config in regular (non-sync) mode, runs the bgworker
 * initialization, creates a walproposer and starts it. main_arg is not used.
 */
PGDLLEXPORT void
WalProposerMain(Datum main_arg)
{
WalProposer *wp;
/* regular mode: init_walprop_config() takes systemId from GetSystemIdentifier() */
init_walprop_config(false);
walprop_pg_init_bgworker();
walprop_pg_load_libpqwalreceiver();
wp = WalProposerCreate(&walprop_config, walprop_pg);
/* only this entry point seeds the reconnect timestamp; WalProposerSync() does not */
wp->last_reconnect_attempt = walprop_pg_get_current_timestamp();
/* NOTE(review): walsender setup runs after creation but before start — presumably order-sensitive, confirm before moving */
walprop_pg_init_walsender();
WalProposerStart(wp);
}
/*
* Initialize GUCs, bgworker, shmem and backpressure.
*/
@@ -341,7 +401,7 @@ replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRe
* Start walsender streaming replication
*/
static void
walprop_pg_start_streaming(XLogRecPtr startpos, TimeLineID timeline)
walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos, TimeLineID timeline)
{
StartReplicationCmd cmd;
@@ -350,7 +410,7 @@ walprop_pg_start_streaming(XLogRecPtr startpos, TimeLineID timeline)
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
cmd.timeline = timeline;
cmd.startpoint = startpos;
StartProposerReplication(&cmd);
StartProposerReplication(wp, &cmd);
}
static void
@@ -413,7 +473,7 @@ walprop_pg_init_standalone_sync_safekeepers(void)
BackgroundWorkerUnblockSignals();
}
static uint64
static void
walprop_pg_init_bgworker(void)
{
#if PG_VERSION_NUM >= 150000
@@ -436,8 +496,6 @@ walprop_pg_init_bgworker(void)
#else
GetXLogReplayRecPtr(&ThisTimeLineID);
#endif
return GetSystemIdentifier();
}
static XLogRecPtr
@@ -522,13 +580,14 @@ walprop_status(WalProposerConn *conn)
}
static WalProposerConn *
walprop_connect_start(char *conninfo, char *password)
walprop_connect_start(char *conninfo)
{
WalProposerConn *conn;
PGconn *pg_conn;
const char *keywords[3];
const char *values[3];
int n;
char *password = neon_auth_token;
/*
* Connect using the given connection string. If the
@@ -901,7 +960,7 @@ walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
* to the main loop.
*/
static void
StartProposerReplication(StartReplicationCmd *cmd)
StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
{
XLogRecPtr FlushPtr;
TimeLineID currTLI;
@@ -986,7 +1045,7 @@ StartProposerReplication(StartReplicationCmd *cmd)
SyncRepInitConfig();
/* Infinite send loop, never returns */
WalSndLoop();
WalSndLoop(wp);
WalSndSetState(WALSNDSTATE_STARTUP);
@@ -999,7 +1058,7 @@ StartProposerReplication(StartReplicationCmd *cmd)
* Synchronous replication sets latch in WalSndWakeup at walsender.c
*/
static void
WalSndLoop(void)
WalSndLoop(WalProposer *wp)
{
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
@@ -1008,11 +1067,11 @@ WalSndLoop(void)
{
CHECK_FOR_INTERRUPTS();
XLogBroadcastWalProposer();
XLogBroadcastWalProposer(wp);
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll();
WalProposerPoll(wp);
}
}
@@ -1020,7 +1079,7 @@ WalSndLoop(void)
* Notify walproposer about the new WAL position.
*/
static void
XLogBroadcastWalProposer(void)
XLogBroadcastWalProposer(WalProposer *wp)
{
XLogRecPtr startptr;
XLogRecPtr endptr;
@@ -1075,7 +1134,7 @@ XLogBroadcastWalProposer(void)
if (endptr <= startptr)
return;
WalProposerBroadcast(startptr, endptr);
WalProposerBroadcast(wp, startptr, endptr);
sentPtr = endptr;
/* Update shared memory status */
@@ -1449,25 +1508,25 @@ walprop_pg_finish_sync_safekeepers(XLogRecPtr lsn)
* Get PageserverFeedback fields from the most advanced safekeeper
*/
static void
GetLatestNeonFeedback(PageserverFeedback * rf, Safekeeper * safekeepers, int n_safekeepers)
GetLatestNeonFeedback(PageserverFeedback * rf, WalProposer *wp)
{
int latest_safekeeper = 0;
XLogRecPtr last_received_lsn = InvalidXLogRecPtr;
for (int i = 0; i < n_safekeepers; i++)
for (int i = 0; i < wp->n_safekeepers; i++)
{
if (safekeepers[i].appendResponse.rf.last_received_lsn > last_received_lsn)
if (wp->safekeeper[i].appendResponse.rf.last_received_lsn > last_received_lsn)
{
latest_safekeeper = i;
last_received_lsn = safekeepers[i].appendResponse.rf.last_received_lsn;
last_received_lsn = wp->safekeeper[i].appendResponse.rf.last_received_lsn;
}
}
rf->currentClusterSize = safekeepers[latest_safekeeper].appendResponse.rf.currentClusterSize;
rf->last_received_lsn = safekeepers[latest_safekeeper].appendResponse.rf.last_received_lsn;
rf->disk_consistent_lsn = safekeepers[latest_safekeeper].appendResponse.rf.disk_consistent_lsn;
rf->remote_consistent_lsn = safekeepers[latest_safekeeper].appendResponse.rf.remote_consistent_lsn;
rf->replytime = safekeepers[latest_safekeeper].appendResponse.rf.replytime;
rf->currentClusterSize = wp->safekeeper[latest_safekeeper].appendResponse.rf.currentClusterSize;
rf->last_received_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.last_received_lsn;
rf->disk_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.disk_consistent_lsn;
rf->remote_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn;
rf->replytime = wp->safekeeper[latest_safekeeper].appendResponse.rf.replytime;
elog(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
" last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu",
@@ -1484,17 +1543,17 @@ GetLatestNeonFeedback(PageserverFeedback * rf, Safekeeper * safekeepers, int n_s
* Combine hot standby feedbacks from all safekeepers.
*/
static void
CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, Safekeeper * safekeepers, int n_safekeepers)
CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, WalProposer *wp)
{
hs->ts = 0;
hs->xmin.value = ~0; /* largest unsigned value */
hs->catalog_xmin.value = ~0; /* largest unsigned value */
for (int i = 0; i < n_safekeepers; i++)
for (int i = 0; i < wp->n_safekeepers; i++)
{
if (safekeepers[i].appendResponse.hs.ts != 0)
if (wp->safekeeper[i].appendResponse.hs.ts != 0)
{
HotStandbyFeedback *skhs = &safekeepers[i].appendResponse.hs;
HotStandbyFeedback *skhs = &wp->safekeeper[i].appendResponse.hs;
if (FullTransactionIdIsNormal(skhs->xmin)
&& FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
{
@@ -1517,17 +1576,17 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs, Safekeeper * safekeepers, int
}
static void
walprop_pg_process_safekeeper_feedback(Safekeeper * safekeepers, int n_safekeepers, bool isSync, XLogRecPtr commitLsn)
walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
{
HotStandbyFeedback hsFeedback;
XLogRecPtr diskConsistentLsn;
diskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn;
if (!isSync)
if (!wp->config->syncSafekeepers)
{
/* Get PageserverFeedback fields from the most advanced safekeeper */
GetLatestNeonFeedback(&quorumFeedback.rf, safekeepers, n_safekeepers);
GetLatestNeonFeedback(&quorumFeedback.rf, wp);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
}
@@ -1538,7 +1597,7 @@ walprop_pg_process_safekeeper_feedback(Safekeeper * safekeepers, int n_safekeepe
quorumFeedback.flushLsn = commitLsn;
/* advance the replication slot */
if (!isSync)
if (!wp->config->syncSafekeepers)
ProcessStandbyReply(
/* write_lsn - This is what durably stored in WAL service. */
quorumFeedback.flushLsn,
@@ -1553,11 +1612,11 @@ walprop_pg_process_safekeeper_feedback(Safekeeper * safekeepers, int n_safekeepe
walprop_pg_get_current_timestamp(), false);
}
CombineHotStanbyFeedbacks(&hsFeedback, safekeepers, n_safekeepers);
CombineHotStanbyFeedbacks(&hsFeedback, wp);
if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &quorumFeedback.hs, sizeof hsFeedback) != 0)
{
quorumFeedback.hs = hsFeedback;
if (!isSync)
if (!wp->config->syncSafekeepers)
ProcessStandbyHSFeedback(hsFeedback.ts,
XidFromFullTransactionId(hsFeedback.xmin),
EpochFromFullTransactionId(hsFeedback.xmin),
@@ -1576,16 +1635,12 @@ walprop_pg_confirm_wal_streamed(XLogRecPtr lsn)
/*
* Temporary walproposer API implementation for postgres. The table is
* file-local (static) and is handed to WalProposerCreate() by the entry points.
*/
const walproposer_api walprop_pg = {
static const walproposer_api walprop_pg = {
.get_shmem_state = walprop_pg_get_shmem_state,
.start_streaming = walprop_pg_start_streaming,
.init_walsender = walprop_pg_init_walsender,
.init_standalone_sync_safekeepers = walprop_pg_init_standalone_sync_safekeepers,
.init_bgworker = walprop_pg_init_bgworker,
.get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr,
.get_current_timestamp = walprop_pg_get_current_timestamp,
.get_timeline_id = walprop_pg_get_timeline_id,
.load_libpqwalreceiver = walprop_pg_load_libpqwalreceiver,
.conn_error_message = walprop_error_message,
.conn_status = walprop_status,
.conn_connect_start = walprop_connect_start,