Abstract away waitEvents handling

This commit is contained in:
Arthur Petukhovsky
2023-09-25 15:58:38 +00:00
parent 227eb21303
commit 029623bfe7
3 changed files with 158 additions and 100 deletions

View File

@@ -84,7 +84,6 @@ static XLogRecPtr lastSentCommitLsn; /* last commitLsn broadcast to*
* safekeepers */
static ProposerGreeting greetRequest;
static VoteRequest voteRequest; /* Vote request for safekeeper */
static WaitEventSet *waitEvents;
static AppendResponse quorumFeedback;
/*
* Minimal LSN which may be needed for recovery of some safekeeper,
@@ -114,8 +113,6 @@ static void WalProposerRegister(void);
static void WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId);
static void WalProposerStart(void);
static void WalProposerLoop(void);
static void InitEventSet(void);
static void UpdateEventSet(Safekeeper *sk, uint32 events);
static void HackyRemoveWalProposerEvent(Safekeeper *to_remove);
static void ShutdownConnection(Safekeeper *sk);
static void ResetConnection(Safekeeper *sk);
@@ -209,53 +206,25 @@ WalProposerPoll(void)
while (true)
{
Safekeeper *sk = NULL;
bool wait_timeout = false;
bool late_cv_trigger = false;
WaitEvent event = {0};
int rc = 0;
uint32 events = 0;
TimestampTz now = walprop_pg.get_current_timestamp();
long timeout = TimeToReconnect(now);
#if PG_MAJORVERSION_NUM >= 16
if (WalSndCtl != NULL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
#endif
/*
* Wait for a wait event to happen, or timeout:
* - Safekeeper socket can become available for READ or WRITE
* - Our latch got set, because
* * PG15-: We got woken up by a process triggering the WalSender
* * PG16+: WalSndCtl->wal_flush_cv was triggered
*/
rc = WaitEventSetWait(waitEvents, timeout,
&event, 1, WAIT_EVENT_WAL_SENDER_MAIN);
#if PG_MAJORVERSION_NUM >= 16
if (WalSndCtl != NULL)
late_cv_trigger = ConditionVariableCancelSleep();
#endif
/*
* If wait is terminated by latch set (walsenders' latch is set on
* each wal flush), then exit loop. (no need for pm death check due to
* WL_EXIT_ON_PM_DEATH)
*/
if ((rc == 1 && event.events & WL_LATCH_SET) || late_cv_trigger)
{
/* Reset our latch */
ResetLatch(MyLatch);
rc = walprop_pg.wait_event_set(timeout, &sk, &events);
/* Exit loop if latch is set (we got new WAL) */
if ((rc == 1 && events & WL_LATCH_SET))
break;
}
/*
* If the event contains something that one of our safekeeper states
* was waiting for, we'll advance its state.
*/
if (rc == 1 && (event.events & (WL_SOCKET_MASK)))
if (rc == 1 && (events & WL_SOCKET_MASK))
{
sk = (Safekeeper *) event.user_data;
AdvancePollState(sk, event.events);
Assert(sk != NULL);
AdvancePollState(sk, events);
}
/*
@@ -266,8 +235,6 @@ WalProposerPoll(void)
if (rc == 0) /* timeout expired */
{
wait_timeout = true;
/*
* Ensure flushrecptr is set to a recent value. This fixes a case
* where we've not been notified of new WAL records when we were
@@ -356,7 +323,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
}
initStringInfo(&safekeeper[n_safekeepers].outbuf);
safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
safekeeper[n_safekeepers].xlogreader = walprop_pg.wal_reader_allocate();
if (safekeeper[n_safekeepers].xlogreader == NULL)
elog(FATAL, "Failed to allocate xlog reader");
safekeeper[n_safekeepers].flushWrite = false;
@@ -390,7 +357,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
greetRequest.timeline = walprop_pg.get_timeline_id();
greetRequest.walSegSize = wal_segment_size;
InitEventSet();
walprop_pg.init_event_set(n_safekeepers);
}
static void
@@ -413,37 +380,6 @@ WalProposerLoop(void)
WalProposerPoll();
}
/* Initializes the internal event set, provided that it is currently null */
static void
InitEventSet(void)
{
if (waitEvents)
elog(FATAL, "double-initialization of event set");
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + n_safekeepers);
AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
}
/*
* Updates the events we're already waiting on for the safekeeper, setting it to
* the provided `events`
*
* This function is called any time the safekeeper's state switches to one where
* it has to wait to continue. This includes the full body of AdvancePollState
* and calls to IO helper functions.
*/
static void
UpdateEventSet(Safekeeper *sk, uint32 events)
{
/* eventPos = -1 when we don't have an event */
Assert(sk->eventPos != -1);
ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL);
}
/*
* Hack: provides a way to remove the event corresponding to an individual walproposer from the set.
*
@@ -453,13 +389,9 @@ static void
HackyRemoveWalProposerEvent(Safekeeper *to_remove)
{
/* Remove the existing event set */
if (waitEvents)
{
FreeWaitEventSet(waitEvents);
waitEvents = NULL;
}
walprop_pg.free_event_set();
/* Re-initialize it without adding any safekeeper events */
InitEventSet();
walprop_pg.init_event_set(n_safekeepers);
/*
* loop through the existing safekeepers. If they aren't the one we're
@@ -480,7 +412,8 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove)
if (sk->conn != NULL)
{
desired_events = SafekeeperStateDesiredEvents(sk->state);
sk->eventPos = AddWaitEventToSet(waitEvents, desired_events, walprop_pg.conn_socket(sk->conn), NULL, sk);
/* will set sk->eventPos */
walprop_pg.add_safekeeper_event_set(sk, desired_events);
}
}
}
@@ -512,8 +445,6 @@ ShutdownConnection(Safekeeper *sk)
static void
ResetConnection(Safekeeper *sk)
{
pgsocket sock; /* socket of the new connection */
if (sk->state != SS_OFFLINE)
{
ShutdownConnection(sk);
@@ -577,8 +508,7 @@ ResetConnection(Safekeeper *sk)
sk->state = SS_CONNECTING_WRITE;
sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp();
sock = walprop_pg.conn_socket(sk->conn);
sk->eventPos = AddWaitEventToSet(waitEvents, WL_SOCKET_WRITEABLE, sock, NULL, sk);
walprop_pg.add_safekeeper_event_set(sk, WL_SOCKET_WRITEABLE);
return;
}
@@ -772,7 +702,7 @@ HandleConnectionEvent(Safekeeper *sk)
* old event and re-register an event on the new socket.
*/
HackyRemoveWalProposerEvent(sk);
sk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_pg.conn_socket(sk->conn), NULL, sk);
walprop_pg.add_safekeeper_event_set(sk, new_events);
/* If we successfully connected, send START_WAL_PUSH query */
if (result == WP_CONN_POLLING_OK)
@@ -795,7 +725,7 @@ SendStartWALPush(Safekeeper *sk)
return;
}
sk->state = SS_WAIT_EXEC_RESULT;
UpdateEventSet(sk, WL_SOCKET_READABLE);
walprop_pg.update_event_set(sk, WL_SOCKET_READABLE);
}
static void
@@ -919,7 +849,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
* SS_VOTING is an idle state; read-ready indicates the connection
* closed.
*/
UpdateEventSet(sk, WL_SOCKET_READABLE);
walprop_pg.update_event_set(sk, WL_SOCKET_READABLE);
}
else
{
@@ -992,8 +922,8 @@ RecvVoteResponse(Safekeeper *sk)
else
{
sk->state = SS_IDLE;
UpdateEventSet(sk, WL_SOCKET_READABLE); /* Idle states wait for
* read-ready */
/* Idle state waits for read-ready events */
walprop_pg.update_event_set(sk, WL_SOCKET_READABLE);
HandleElectedProposer();
}
@@ -1434,7 +1364,7 @@ HandleActiveState(Safekeeper *sk, uint32 events)
if (sk->streamingAt != availableLsn || sk->flushWrite)
newEvents |= WL_SOCKET_WRITEABLE;
UpdateEventSet(sk, newEvents);
walprop_pg.update_event_set(sk, newEvents);
}
/*
@@ -1452,7 +1382,6 @@ SendAppendRequests(Safekeeper *sk)
XLogRecPtr endLsn;
AppendRequestHeader *req;
PGAsyncWriteResult writeResult;
WALReadError errinfo;
bool sentAnything = false;
if (sk->flushWrite)
@@ -1500,15 +1429,11 @@ SendAppendRequests(Safekeeper *sk)
/* write the WAL itself */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
if (!WALRead(sk->xlogreader,
/* will raise error on failure */
walprop_pg.wal_read(sk->xlogreader,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn,
walprop_pg.get_timeline_id(),
&errinfo))
{
WALReadRaiseError(&errinfo);
}
req->endLsn - req->beginLsn);
sk->outbuf.len += req->endLsn - req->beginLsn;
writeResult = walprop_pg.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len);
@@ -2076,7 +2001,7 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
*/
events = SafekeeperStateDesiredEvents(success_state);
if (events)
UpdateEventSet(sk, events);
walprop_pg.update_event_set(sk, events);
return true;
}
@@ -2103,7 +2028,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
* this function
*/
sk->state = flush_state;
UpdateEventSet(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
walprop_pg.update_event_set(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);
return false;
case PG_ASYNC_WRITE_FAIL:
elog(WARNING, "Failed to send to node %s:%s in %s state: %s",

View File

@@ -474,6 +474,13 @@ typedef struct walproposer_api
PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size);
bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size);
bool (*recovery_download) (Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);
XLogReaderState * (*wal_reader_allocate) (void);
void (*free_event_set) (void);
void (*init_event_set) (int n_safekeepers);
void (*update_event_set) (Safekeeper * sk, uint32 events);
void (*add_safekeeper_event_set) (Safekeeper * sk, uint32 events);
int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);
} walproposer_api;
extern const walproposer_api walprop_pg;

View File

@@ -1313,6 +1313,125 @@ XLogWalPropClose(XLogRecPtr recptr)
walpropFile = -1;
}
static void
walprop_pg_wal_read(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count)
{
WALReadError errinfo;
if (!WALRead(state,
buf,
startptr,
count,
walprop_pg_get_timeline_id(),
&errinfo))
{
WALReadRaiseError(&errinfo);
}
}
static XLogReaderState *
walprop_pg_wal_reader_allocate(void)
{
return XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL);
}
static WaitEventSet *waitEvents;
static void
walprop_pg_free_event_set(void)
{
if (waitEvents)
{
FreeWaitEventSet(waitEvents);
waitEvents = NULL;
}
}
static void
walprop_pg_init_event_set(int n_safekeepers)
{
if (waitEvents)
elog(FATAL, "double-initialization of event set");
waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + n_safekeepers);
AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
}
static void
walprop_pg_update_event_set(Safekeeper * sk, uint32 events)
{
/* eventPos = -1 when we don't have an event */
Assert(sk->eventPos != -1);
ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL);
}
static void
walprop_pg_add_safekeeper_event_set(Safekeeper * sk, uint32 events)
{
sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk->conn), NULL, sk);
}
static int
walprop_pg_wait_event_set(long timeout, Safekeeper **sk, uint32 *events)
{
WaitEvent event = {0};
int rc = 0;
bool late_cv_trigger = false;
*sk = NULL;
*events = 0;
#if PG_MAJORVERSION_NUM >= 16
if (WalSndCtl != NULL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
#endif
/*
* Wait for a wait event to happen, or timeout:
* - Safekeeper socket can become available for READ or WRITE
* - Our latch got set, because
* * PG15-: We got woken up by a process triggering the WalSender
* * PG16+: WalSndCtl->wal_flush_cv was triggered
*/
rc = WaitEventSetWait(waitEvents, timeout,
&event, 1, WAIT_EVENT_WAL_SENDER_MAIN);
#if PG_MAJORVERSION_NUM >= 16
if (WalSndCtl != NULL)
late_cv_trigger = ConditionVariableCancelSleep();
#endif
/*
* If wait is terminated by latch set (walsenders' latch is set on
* each wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
*/
if ((rc == 1 && event.events & WL_LATCH_SET) || late_cv_trigger)
{
/* Reset our latch */
ResetLatch(MyLatch);
*events = WL_LATCH_SET;
return 1;
}
/*
* If the event contains something about the socket, it means we got
* an event from a safekeeper socket.
*/
if (rc == 1 && (event.events & (WL_SOCKET_MASK)))
{
*sk = (Safekeeper *) event.user_data;
*events = event.events;
return 1;
}
/* XXX: Can we have non-timeout event here? */
*events = event.events;
return rc;
}
/*
* Temporary globally exported walproposer API for postgres.
*/
@@ -1340,4 +1459,11 @@ const walproposer_api walprop_pg = {
.conn_async_write = walprop_async_write,
.conn_blocking_write = walprop_blocking_write,
.recovery_download = WalProposerRecovery,
.wal_read = walprop_pg_wal_read,
.wal_reader_allocate = walprop_pg_wal_reader_allocate,
.free_event_set = walprop_pg_free_event_set,
.init_event_set = walprop_pg_init_event_set,
.update_event_set = walprop_pg_update_event_set,
.add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set,
.wait_event_set = walprop_pg_wait_event_set,
};