From 029623bfe7b8a14824c766ac173819678575b6ef Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Mon, 25 Sep 2023 15:58:38 +0000 Subject: [PATCH] Abstract away waitEvents handling --- pgxn/neon/walproposer.c | 125 ++++++++---------------------------- pgxn/neon/walproposer.h | 7 +++ pgxn/neon/walproposer_pg.c | 126 +++++++++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+), 100 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 89b37d25fb..0e79d07c76 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -84,7 +84,6 @@ static XLogRecPtr lastSentCommitLsn; /* last commitLsn broadcast to* * safekeepers */ static ProposerGreeting greetRequest; static VoteRequest voteRequest; /* Vote request for safekeeper */ -static WaitEventSet *waitEvents; static AppendResponse quorumFeedback; /* * Minimal LSN which may be needed for recovery of some safekeeper, @@ -114,8 +113,6 @@ static void WalProposerRegister(void); static void WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId); static void WalProposerStart(void); static void WalProposerLoop(void); -static void InitEventSet(void); -static void UpdateEventSet(Safekeeper *sk, uint32 events); static void HackyRemoveWalProposerEvent(Safekeeper *to_remove); static void ShutdownConnection(Safekeeper *sk); static void ResetConnection(Safekeeper *sk); @@ -209,53 +206,25 @@ WalProposerPoll(void) while (true) { Safekeeper *sk = NULL; - bool wait_timeout = false; - bool late_cv_trigger = false; - WaitEvent event = {0}; int rc = 0; + uint32 events = 0; TimestampTz now = walprop_pg.get_current_timestamp(); long timeout = TimeToReconnect(now); -#if PG_MAJORVERSION_NUM >= 16 - if (WalSndCtl != NULL) - ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); -#endif - - /* - * Wait for a wait event to happen, or timeout: - * - Safekeeper socket can become available for READ or WRITE - * - Our latch got set, because - * * PG15-: We got woken up by a process triggering the 
WalSender - * * PG16+: WalSndCtl->wal_flush_cv was triggered - */ - rc = WaitEventSetWait(waitEvents, timeout, - &event, 1, WAIT_EVENT_WAL_SENDER_MAIN); -#if PG_MAJORVERSION_NUM >= 16 - if (WalSndCtl != NULL) - late_cv_trigger = ConditionVariableCancelSleep(); -#endif - - /* - * If wait is terminated by latch set (walsenders' latch is set on - * each wal flush), then exit loop. (no need for pm death check due to - * WL_EXIT_ON_PM_DEATH) - */ - if ((rc == 1 && event.events & WL_LATCH_SET) || late_cv_trigger) - { - /* Reset our latch */ - ResetLatch(MyLatch); + rc = walprop_pg.wait_event_set(timeout, &sk, &events); + /* Exit loop if latch is set (we got new WAL) */ + if ((rc == 1 && events & WL_LATCH_SET)) break; - } /* * If the event contains something that one of our safekeeper states * was waiting for, we'll advance its state. */ - if (rc == 1 && (event.events & (WL_SOCKET_MASK))) + if (rc == 1 && (events & WL_SOCKET_MASK)) { - sk = (Safekeeper *) event.user_data; - AdvancePollState(sk, event.events); + Assert(sk != NULL); + AdvancePollState(sk, events); } /* @@ -266,8 +235,6 @@ WalProposerPoll(void) if (rc == 0) /* timeout expired */ { - wait_timeout = true; - /* * Ensure flushrecptr is set to a recent value. 
This fixes a case * where we've not been notified of new WAL records when we were @@ -356,7 +323,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) } initStringInfo(&safekeeper[n_safekeepers].outbuf); - safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL); + safekeeper[n_safekeepers].xlogreader = walprop_pg.wal_reader_allocate(); if (safekeeper[n_safekeepers].xlogreader == NULL) elog(FATAL, "Failed to allocate xlog reader"); safekeeper[n_safekeepers].flushWrite = false; @@ -390,7 +357,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) greetRequest.timeline = walprop_pg.get_timeline_id(); greetRequest.walSegSize = wal_segment_size; - InitEventSet(); + walprop_pg.init_event_set(n_safekeepers); } static void @@ -413,37 +380,6 @@ WalProposerLoop(void) WalProposerPoll(); } -/* Initializes the internal event set, provided that it is currently null */ -static void -InitEventSet(void) -{ - if (waitEvents) - elog(FATAL, "double-initialization of event set"); - - waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + n_safekeepers); - AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); -} - -/* - * Updates the events we're already waiting on for the safekeeper, setting it to - * the provided `events` - * - * This function is called any time the safekeeper's state switches to one where - * it has to wait to continue. This includes the full body of AdvancePollState - * and calls to IO helper functions. - */ -static void -UpdateEventSet(Safekeeper *sk, uint32 events) -{ - /* eventPos = -1 when we don't have an event */ - Assert(sk->eventPos != -1); - - ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL); -} - /* * Hack: provides a way to remove the event corresponding to an individual walproposer from the set. 
* @@ -453,13 +389,9 @@ static void HackyRemoveWalProposerEvent(Safekeeper *to_remove) { /* Remove the existing event set */ - if (waitEvents) - { - FreeWaitEventSet(waitEvents); - waitEvents = NULL; - } + walprop_pg.free_event_set(); /* Re-initialize it without adding any safekeeper events */ - InitEventSet(); + walprop_pg.init_event_set(n_safekeepers); /* * loop through the existing safekeepers. If they aren't the one we're @@ -480,7 +412,8 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove) if (sk->conn != NULL) { desired_events = SafekeeperStateDesiredEvents(sk->state); - sk->eventPos = AddWaitEventToSet(waitEvents, desired_events, walprop_pg.conn_socket(sk->conn), NULL, sk); + /* will set sk->eventPos */ + walprop_pg.add_safekeeper_event_set(sk, desired_events); } } } @@ -512,8 +445,6 @@ ShutdownConnection(Safekeeper *sk) static void ResetConnection(Safekeeper *sk) { - pgsocket sock; /* socket of the new connection */ - if (sk->state != SS_OFFLINE) { ShutdownConnection(sk); @@ -577,8 +508,7 @@ ResetConnection(Safekeeper *sk) sk->state = SS_CONNECTING_WRITE; sk->latestMsgReceivedAt = walprop_pg.get_current_timestamp(); - sock = walprop_pg.conn_socket(sk->conn); - sk->eventPos = AddWaitEventToSet(waitEvents, WL_SOCKET_WRITEABLE, sock, NULL, sk); + walprop_pg.add_safekeeper_event_set(sk, WL_SOCKET_WRITEABLE); return; } @@ -772,7 +702,7 @@ HandleConnectionEvent(Safekeeper *sk) * old event and re-register an event on the new socket. 
*/ HackyRemoveWalProposerEvent(sk); - sk->eventPos = AddWaitEventToSet(waitEvents, new_events, walprop_pg.conn_socket(sk->conn), NULL, sk); + walprop_pg.add_safekeeper_event_set(sk, new_events); /* If we successfully connected, send START_WAL_PUSH query */ if (result == WP_CONN_POLLING_OK) @@ -795,7 +725,7 @@ SendStartWALPush(Safekeeper *sk) return; } sk->state = SS_WAIT_EXEC_RESULT; - UpdateEventSet(sk, WL_SOCKET_READABLE); + walprop_pg.update_event_set(sk, WL_SOCKET_READABLE); } static void @@ -919,7 +849,7 @@ RecvAcceptorGreeting(Safekeeper *sk) * SS_VOTING is an idle state; read-ready indicates the connection * closed. */ - UpdateEventSet(sk, WL_SOCKET_READABLE); + walprop_pg.update_event_set(sk, WL_SOCKET_READABLE); } else { @@ -992,8 +922,8 @@ RecvVoteResponse(Safekeeper *sk) else { sk->state = SS_IDLE; - UpdateEventSet(sk, WL_SOCKET_READABLE); /* Idle states wait for - * read-ready */ + /* Idle state waits for read-ready events */ + walprop_pg.update_event_set(sk, WL_SOCKET_READABLE); HandleElectedProposer(); } @@ -1434,7 +1364,7 @@ HandleActiveState(Safekeeper *sk, uint32 events) if (sk->streamingAt != availableLsn || sk->flushWrite) newEvents |= WL_SOCKET_WRITEABLE; - UpdateEventSet(sk, newEvents); + walprop_pg.update_event_set(sk, newEvents); } /* @@ -1452,7 +1382,6 @@ SendAppendRequests(Safekeeper *sk) XLogRecPtr endLsn; AppendRequestHeader *req; PGAsyncWriteResult writeResult; - WALReadError errinfo; bool sentAnything = false; if (sk->flushWrite) @@ -1500,15 +1429,11 @@ SendAppendRequests(Safekeeper *sk) /* write the WAL itself */ enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); - if (!WALRead(sk->xlogreader, + /* will raise error on failure */ + walprop_pg.wal_read(sk->xlogreader, &sk->outbuf.data[sk->outbuf.len], req->beginLsn, - req->endLsn - req->beginLsn, - walprop_pg.get_timeline_id(), - &errinfo)) - { - WALReadRaiseError(&errinfo); - } + req->endLsn - req->beginLsn); sk->outbuf.len += req->endLsn - req->beginLsn; writeResult = 
walprop_pg.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len); @@ -2076,7 +2001,7 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes */ events = SafekeeperStateDesiredEvents(success_state); if (events) - UpdateEventSet(sk, events); + walprop_pg.update_event_set(sk, events); return true; } @@ -2103,7 +2028,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta * this function */ sk->state = flush_state; - UpdateEventSet(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); + walprop_pg.update_event_set(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); return false; case PG_ASYNC_WRITE_FAIL: elog(WARNING, "Failed to send to node %s:%s in %s state: %s", diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index f2d7925082..c2b08af3e7 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -474,6 +474,13 @@ typedef struct walproposer_api PGAsyncWriteResult (*conn_async_write) (WalProposerConn * conn, void const *buf, size_t size); bool (*conn_blocking_write) (WalProposerConn * conn, void const *buf, size_t size); bool (*recovery_download) (Safekeeper * sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos); + void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count); + XLogReaderState * (*wal_reader_allocate) (void); + void (*free_event_set) (void); + void (*init_event_set) (int n_safekeepers); + void (*update_event_set) (Safekeeper * sk, uint32 events); + void (*add_safekeeper_event_set) (Safekeeper * sk, uint32 events); + int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events); } walproposer_api; extern const walproposer_api walprop_pg; diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index b5096c97c3..13f8160ce0 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1313,6 +1313,125 @@ XLogWalPropClose(XLogRecPtr recptr) walpropFile = -1; } +static void 
+walprop_pg_wal_read(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count) +{ + WALReadError errinfo; + + if (!WALRead(state, + buf, + startptr, + count, + walprop_pg_get_timeline_id(), + &errinfo)) + { + WALReadRaiseError(&errinfo); + } +} + +static XLogReaderState * +walprop_pg_wal_reader_allocate(void) +{ + return XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL); +} + +static WaitEventSet *waitEvents; + +static void +walprop_pg_free_event_set(void) +{ + if (waitEvents) + { + FreeWaitEventSet(waitEvents); + waitEvents = NULL; + } +} + +static void +walprop_pg_init_event_set(int n_safekeepers) +{ + if (waitEvents) + elog(FATAL, "double-initialization of event set"); + + waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + n_safekeepers); + AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET, + MyLatch, NULL); + AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); +} + +static void +walprop_pg_update_event_set(Safekeeper * sk, uint32 events) +{ + /* eventPos = -1 when we don't have an event */ + Assert(sk->eventPos != -1); + + ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL); +} + +static void +walprop_pg_add_safekeeper_event_set(Safekeeper * sk, uint32 events) +{ + sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk->conn), NULL, sk); +} + +static int +walprop_pg_wait_event_set(long timeout, Safekeeper **sk, uint32 *events) +{ + WaitEvent event = {0}; + int rc = 0; + bool late_cv_trigger = false; + + *sk = NULL; + *events = 0; + +#if PG_MAJORVERSION_NUM >= 16 + if (WalSndCtl != NULL) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); +#endif + + /* + * Wait for a wait event to happen, or timeout: + * - Safekeeper socket can become available for READ or WRITE + * - Our latch got set, because + * * PG15-: We got woken up by a process triggering the WalSender + * * PG16+: WalSndCtl->wal_flush_cv 
was triggered + */ + rc = WaitEventSetWait(waitEvents, timeout, + &event, 1, WAIT_EVENT_WAL_SENDER_MAIN); +#if PG_MAJORVERSION_NUM >= 16 + if (WalSndCtl != NULL) + late_cv_trigger = ConditionVariableCancelSleep(); +#endif + + /* + * If the wait was terminated by our latch being set (walsenders' latch is set on + * each WAL flush), reset it and report WL_LATCH_SET to the caller. (No need for a pm death check due to WL_EXIT_ON_PM_DEATH.) + */ + if ((rc == 1 && event.events & WL_LATCH_SET) || late_cv_trigger) + { + /* Reset our latch */ + ResetLatch(MyLatch); + *events = WL_LATCH_SET; + return 1; + } + + /* + * If the event contains something about the socket, it means we got + * an event from a safekeeper socket. + */ + if (rc == 1 && (event.events & (WL_SOCKET_MASK))) + { + *sk = (Safekeeper *) event.user_data; + *events = event.events; + return 1; + } + + /* XXX: Can we have non-timeout event here? */ + *events = event.events; + return rc; +} + /* * Temporary globally exported walproposer API for postgres. */ @@ -1340,4 +1459,11 @@ const walproposer_api walprop_pg = { .conn_async_write = walprop_async_write, .conn_blocking_write = walprop_blocking_write, .recovery_download = WalProposerRecovery, + .wal_read = walprop_pg_wal_read, + .wal_reader_allocate = walprop_pg_wal_reader_allocate, + .free_event_set = walprop_pg_free_event_set, + .init_event_set = walprop_pg_init_event_set, + .update_event_set = walprop_pg_update_event_set, + .add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set, + .wait_event_set = walprop_pg_wait_event_set, };