From 3ce3ccac0d670d17683f0ee545a1bd9c2a2f30de Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 24 Nov 2023 22:12:25 +0300 Subject: [PATCH] Prepare waiteventset. --- pgxn/neon/neon_walreader.c | 13 +- pgxn/neon/walproposer.c | 374 ++++++++++++++++++++----------------- pgxn/neon/walproposer.h | 42 +++-- pgxn/neon/walproposer_pg.c | 115 ++++++++---- 4 files changed, 321 insertions(+), 223 deletions(-) diff --git a/pgxn/neon/neon_walreader.c b/pgxn/neon/neon_walreader.c index 4ce0bc3ced..0f68d19a3c 100644 --- a/pgxn/neon/neon_walreader.c +++ b/pgxn/neon/neon_walreader.c @@ -181,8 +181,10 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou { if (state->rem_state == RS_NONE) { + XLogRecPtr donor_lsn; + /* no connection yet; start one */ - Safekeeper *donor = GetDonor(state->wp); + Safekeeper *donor = GetDonor(state->wp, &donor_lsn); if (donor == NULL) { @@ -191,12 +193,13 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou return NEON_WALREAD_ERROR; } snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port); - elog(LOG, "establishing connection to %s to fetch WAL", state->donor_name); + elog(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL", + state->donor_name, LSN_FORMAT_ARGS(donor_lsn)); state->wp_conn = libpqwp_connect_start(donor->conninfo); if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD) { snprintf(state->err_msg, sizeof(state->err_msg), - "failed to connect to %s:%s to fetch WAL: immediately failed with %s", + "failed to connect to %s to fetch WAL: immediately failed with %s", state->donor_name, PQerrorMessage(state->wp_conn->pg_conn)); NeonWALReaderResetRemote(state); return NEON_WALREAD_ERROR; @@ -260,13 +263,13 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou return NEON_WALREAD_WOULDBLOCK; case WP_EXEC_FAILED: snprintf(state->err_msg, sizeof(state->err_msg), - "get result from %s failed: %s", + "get START_REPLICATION result from %s failed: %s", state->donor_name, PQerrorMessage(state->wp_conn->pg_conn)); NeonWALReaderResetRemote(state); return NEON_WALREAD_ERROR; default: /* can't happen */ snprintf(state->err_msg, sizeof(state->err_msg), - "get result from %s: unexpected result", + "get START_REPLICATION result from %s: unexpected result", state->donor_name); NeonWALReaderResetRemote(state); return NEON_WALREAD_ERROR; diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 6cc3f2a840..7d8e582b6b 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -75,9 +75,9 @@ static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, Safekeeper static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state); static bool AsyncFlush(Safekeeper *sk); static int CompareLsn(const void *a, const void *b); -static char *FormatSafekeeperState(SafekeeperState state); +static char *FormatSafekeeperState(SafekeeperState state, SafekeeperActiveState active_state); static void AssertEventsOkForState(uint32 events, Safekeeper *sk); -static char *FormatEvents(WalProposer *wp, uint32 events); +static char *FormatEvents(uint32 events); WalProposer * WalProposerCreate(WalProposerConfig *config, walproposer_api api) @@ -124,7 +124,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf); wp->safekeeper[wp->n_safekeepers].xlogreader = NULL; - wp->safekeeper[wp->n_safekeepers].flushWrite = false; wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr; wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr; wp->n_safekeepers += 1; @@ -273,7 +272,7 @@ WalProposerPoll(WalProposer *wp) wp->config->safekeeper_connection_timeout)) { walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that", - sk->host, sk->port, FormatSafekeeperState(sk->state), wp->config->safekeeper_connection_timeout); + sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state), wp->config->safekeeper_connection_timeout); ShutdownConnection(sk); } } @@ -308,7 +307,6 @@ ShutdownConnection(Safekeeper *sk) { sk->wp->api.conn_finish(sk); sk->state = SS_OFFLINE; - sk->flushWrite = false; sk->streamingAt = InvalidXLogRecPtr; if (sk->voteResponse.termHistory.entries) @@ -433,8 +431,6 @@ ReconnectSafekeepers(WalProposer *wp) static void AdvancePollState(Safekeeper *sk, uint32 events) { - WalProposer *wp = sk->wp; - /* * Sanity check. We assume further down that the operations don't block * because the socket is ready. @@ -486,7 +482,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) */ case SS_VOTING: walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host, - sk->port, FormatSafekeeperState(sk->state)); + sk->port, FormatSafekeeperState(sk->state, sk->active_state)); ResetConnection(sk); return; @@ -515,7 +511,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) */ case SS_IDLE: walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host, - sk->port, FormatSafekeeperState(sk->state)); + sk->port, FormatSafekeeperState(sk->state, sk->active_state)); ResetConnection(sk); return; @@ -1188,6 +1184,7 @@ StartStreaming(Safekeeper *sk) * once for a connection. */ sk->state = SS_ACTIVE; + sk->active_state = SS_ACTIVE_SEND; sk->streamingAt = sk->startStreamingAt; /* event set will be updated inside SendMessageToNode */ @@ -1246,9 +1243,13 @@ HandleActiveState(Safekeeper *sk, uint32 events) { WalProposer *wp = sk->wp; - uint32 newEvents = WL_SOCKET_READABLE; - - if (events & WL_SOCKET_WRITEABLE) + /* + * Note: we don't known which socket awoke us (sk or nwr). However, as + * SendAppendRequests always tries to send at least one msg in + * SS_ACTIVE_SEND be careful not to go there if are only after sk + * response, otherwise it'd create busy loop of pings. + */ + if (events & WL_SOCKET_WRITEABLE || sk->active_state == SS_ACTIVE_READ_WAL) if (!SendAppendRequests(sk)) return; @@ -1256,28 +1257,18 @@ HandleActiveState(Safekeeper *sk, uint32 events) if (!RecvAppendResponses(sk)) return; - /* - * We should wait for WL_SOCKET_WRITEABLE event if we have unflushed data - * in the buffer. - * - * LSN comparison checks if we have pending unsent messages. This check - * isn't necessary now, because we always send append messages immediately - * after arrival. But it's good to have it here in case we change this - * behavior in the future. - */ - if (sk->streamingAt != wp->availableLsn || sk->flushWrite) - newEvents |= WL_SOCKET_WRITEABLE; - - wp->api.update_event_set(sk, newEvents); + /* configures event set for yield whatever is the substate */ + wp->api.active_state_update_event_set(sk); } /* * Send WAL messages starting from sk->streamingAt until the end or non-writable - * socket, whichever comes first. Caller should take care of updating event set. - * Even if no unsent WAL is available, at least one empty message will be sent - * as a heartbeat, if socket is ready. + * socket or neon_walreader blocks, whichever comes first; active_state is + * updated accordingly. Caller should take care of updating event set. Even if + * no unsent WAL is available, at least one empty message will be sent as a + * heartbeat, if socket is ready. * - * Can change state if Async* functions encounter errors and reset connection. + * Resets state and kills the connections if any error on them is encountered. * Returns false in this case, true otherwise. */ static bool @@ -1285,11 +1276,11 @@ SendAppendRequests(Safekeeper *sk) { WalProposer *wp = sk->wp; XLogRecPtr endLsn; - AppendRequestHeader *req; PGAsyncWriteResult writeResult; bool sentAnything = false; + AppendRequestHeader *req; - if (sk->flushWrite) + if (sk->active_state == SS_ACTIVE_FLUSH) { if (!AsyncFlush(sk)) @@ -1300,94 +1291,99 @@ SendAppendRequests(Safekeeper *sk) return sk->state == SS_ACTIVE; /* Event set will be updated in the end of HandleActiveState */ - sk->flushWrite = false; + sk->active_state = SS_ACTIVE_SEND; } while (sk->streamingAt != wp->availableLsn || !sentAnything) { - sentAnything = true; - - endLsn = sk->streamingAt; - endLsn += MAX_SEND_SIZE; - - /* if we went beyond available WAL, back off */ - if (endLsn > wp->availableLsn) + if (sk->active_state == SS_ACTIVE_SEND) { - endLsn = wp->availableLsn; + sentAnything = true; + + endLsn = sk->streamingAt; + endLsn += MAX_SEND_SIZE; + + /* if we went beyond available WAL, back off */ + if (endLsn > wp->availableLsn) + { + endLsn = wp->availableLsn; + } + + req = &sk->appendRequest; + PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn); + + walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", + req->endLsn - req->beginLsn, + LSN_FORMAT_ARGS(req->beginLsn), + LSN_FORMAT_ARGS(req->endLsn), + LSN_FORMAT_ARGS(req->commitLsn), + LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port); + + resetStringInfo(&sk->outbuf); + + /* write AppendRequest header */ + appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader)); + enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); + sk->active_state = SS_ACTIVE_READ_WAL; } - req = &sk->appendRequest; - PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn); - - walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", - req->endLsn - req->beginLsn, - LSN_FORMAT_ARGS(req->beginLsn), - LSN_FORMAT_ARGS(req->endLsn), - LSN_FORMAT_ARGS(req->commitLsn), - LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port); - - resetStringInfo(&sk->outbuf); - - /* write AppendRequest header */ - appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader)); - - /* write the WAL itself */ - enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); - /* wal_read will raise error on failure */ - switch (wp->api.wal_read(sk, - &sk->outbuf.data[sk->outbuf.len], - req->beginLsn, - req->endLsn - req->beginLsn)) + if (sk->active_state == SS_ACTIVE_READ_WAL) { - case NEON_WALREAD_SUCCESS: - break; - case NEON_WALREAD_WOULDBLOCK: - walprop_log(LOG, "wal reading wouldblock"); - /* todo */ - ShutdownConnection(sk); - return false; - case NEON_WALREAD_ERROR: - walprop_log(WARNING, "WAL reading for node %s:%s failed: %s", - sk->host, sk->port, - NeonWALReaderErrMsg(sk->xlogreader)); - ShutdownConnection(sk); - return false; - default: - Assert(false); - } + req = &sk->appendRequest; - sk->outbuf.len += req->endLsn - req->beginLsn; + switch (wp->api.wal_read(sk, + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req->endLsn - req->beginLsn)) + { + case NEON_WALREAD_SUCCESS: + break; + case NEON_WALREAD_WOULDBLOCK: + return true; + case NEON_WALREAD_ERROR: + walprop_log(WARNING, "WAL reading for node %s:%s failed: %s", + sk->host, sk->port, + NeonWALReaderErrMsg(sk->xlogreader)); + ShutdownConnection(sk); + return false; + default: + Assert(false); + } - writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len); + sk->outbuf.len += req->endLsn - req->beginLsn; - /* Mark current message as sent, whatever the result is */ - sk->streamingAt = endLsn; + writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len); - switch (writeResult) - { - case PG_ASYNC_WRITE_SUCCESS: - /* Continue writing the next message */ - break; + /* Mark current message as sent, whatever the result is */ + sk->streamingAt = req->endLsn; - case PG_ASYNC_WRITE_TRY_FLUSH: + switch (writeResult) + { + case PG_ASYNC_WRITE_SUCCESS: + /* Continue writing the next message */ + sk->active_state = SS_ACTIVE_SEND; + break; - /* - * * We still need to call PQflush some more to finish the - * job. Caller function will handle this by setting right - * event* set. - */ - sk->flushWrite = true; - return true; + case PG_ASYNC_WRITE_TRY_FLUSH: - case PG_ASYNC_WRITE_FAIL: - walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", - sk->host, sk->port, FormatSafekeeperState(sk->state), - wp->api.conn_error_message(sk)); - ShutdownConnection(sk); - return false; - default: - Assert(false); - return false; + /* + * We still need to call PQflush some more to finish the + * job. Caller function will handle this by setting right + * event set. + */ + sk->active_state = SS_ACTIVE_FLUSH; + return true; + + case PG_ASYNC_WRITE_FAIL: + walprop_log(WARNING, "failed to send to node %s:%s in %s state: %s", + sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state), + wp->api.conn_error_message(sk)); + ShutdownConnection(sk); + return false; + default: + Assert(false); + return false; + } } } @@ -1397,7 +1393,7 @@ SendAppendRequests(Safekeeper *sk) /* * Receive and process all available feedback. * - * Can change state if Async* functions encounter errors and reset connection. + * Resets state and kills the connection if any error on it is encountered. * Returns false in this case, true otherwise. * * NB: This function can call SendMessageToNode and produce new messages. @@ -1580,18 +1576,19 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp) /* * Return safekeeper with active connection from which WAL can be downloaded, or - * none if it doesn't exist. + * none if it doesn't exist. donor_lsn is set to end position of the donor to + * the best of our knowledge. */ Safekeeper * -GetDonor(WalProposer *wp) +GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn) { - XLogRecPtr donor_lsn = InvalidXLogRecPtr; + *donor_lsn = InvalidXLogRecPtr; Safekeeper *donor = NULL; int i; if (wp->n_votes < wp->quorum) { - walprop_log(WARNING, "GetDonor called before elections are winned"); + walprop_log(WARNING, "GetDonor called before elections are won"); return NULL; } @@ -1600,10 +1597,10 @@ GetDonor(WalProposer *wp) * about its position immediately after election before any feedbacks are * sent. */ - if (wp->safekeeper[wp->donor].state == SS_ACTIVE) + if (wp->safekeeper[wp->donor].state >= SS_IDLE) { donor = &wp->safekeeper[wp->donor]; - donor_lsn = wp->propEpochStartLsn; + *donor_lsn = wp->propEpochStartLsn; } /* @@ -1615,10 +1612,10 @@ GetDonor(WalProposer *wp) { Safekeeper *sk = &wp->safekeeper[i]; - if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn >= donor_lsn) + if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > *donor_lsn) { donor = sk; - donor_lsn = sk->appendResponse.flushLsn; + *donor_lsn = sk->appendResponse.flushLsn; } } return donor; @@ -1729,7 +1726,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) case PG_ASYNC_READ_FAIL: walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host, - sk->port, FormatSafekeeperState(sk->state), + sk->port, FormatSafekeeperState(sk->state, sk->active_state), wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; @@ -1769,7 +1766,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) if (tag != anymsg->tag) { walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, - sk->port, FormatSafekeeperState(sk->state)); + sk->port, FormatSafekeeperState(sk->state, sk->active_state)); ResetConnection(sk); return false; } @@ -1840,12 +1837,13 @@ static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state) { WalProposer *wp = sk->wp; - uint32 events; + uint32 sk_events; + uint32 nwr_events; if (!wp->api.conn_blocking_write(sk, msg, msg_size)) { walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", - sk->host, sk->port, FormatSafekeeperState(sk->state), + sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state), wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; @@ -1857,9 +1855,15 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes * If the new state will be waiting for events to happen, update the event * set to wait for those */ - events = SafekeeperStateDesiredEvents(success_state); - if (events) - wp->api.update_event_set(sk, events); + SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events); + + /* + * nwr_events is relevant only during SS_ACTIVE which doesn't user + * BlockingWrite + */ + Assert(!nwr_events); + if (sk_events) + wp->api.update_event_set(sk, sk_events); return true; } @@ -1892,7 +1896,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta return false; case PG_ASYNC_WRITE_FAIL: walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", - sk->host, sk->port, FormatSafekeeperState(sk->state), + sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state), wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; @@ -1931,7 +1935,7 @@ AsyncFlush(Safekeeper *sk) return false; case -1: walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s", - sk->host, sk->port, FormatSafekeeperState(sk->state), + sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state), wp->api.conn_error_message(sk)); ResetConnection(sk); return false; @@ -1961,14 +1965,14 @@ CompareLsn(const void *a, const void *b) * * The strings are intended to be used as a prefix to "state", e.g.: * - * walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state)); + * walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state, sk->active_state)); * * If this sort of phrasing doesn't fit the message, instead use something like: * - * walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state)); + * walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state, sk->active_state)); */ static char * -FormatSafekeeperState(SafekeeperState state) +FormatSafekeeperState(SafekeeperState state, SafekeeperActiveState active_state) { char *return_val = NULL; @@ -2000,7 +2004,18 @@ FormatSafekeeperState(SafekeeperState state) return_val = "idle"; break; case SS_ACTIVE: - return_val = "active"; + switch (active_state) + { + case SS_ACTIVE_SEND: + return_val = "active send"; + break; + case SS_ACTIVE_READ_WAL: + return_val = "active read WAL"; + break; + case SS_ACTIVE_FLUSH: + return_val = "active flush"; + break; + } break; } @@ -2013,22 +2028,20 @@ FormatSafekeeperState(SafekeeperState state) static void AssertEventsOkForState(uint32 events, Safekeeper *sk) { - WalProposer *wp = sk->wp; - uint32 expected = SafekeeperStateDesiredEvents(sk->state); - - /* - * The events are in-line with what we're expecting, under two conditions: - * (a) if we aren't expecting anything, `events` has no read- or - * write-ready component. (b) if we are expecting something, there's - * overlap (i.e. `events & expected != 0`) - */ + uint32 sk_events; + uint32 nwr_events; + uint32 expected; bool events_ok_for_state; /* long name so the `Assert` is more * clear later */ - if (expected == WL_NO_EVENTS) - events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0); - else - events_ok_for_state = ((events & expected) != 0); + SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events); + + /* + * Without one more level of notify target indirection we have no way to + * distinguish which socket woke up us, so just union expected events. + */ + expected = sk_events | nwr_events; + events_ok_for_state = ((events & expected) != 0); if (!events_ok_for_state) { @@ -2037,36 +2050,37 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk) * and then an assertion that's guaranteed to fail. */ walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", - FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk->state)); + FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state)); Assert(events_ok_for_state); } } -/* Returns the set of events a safekeeper in this state should be waiting on +/* Returns the set of events for both safekeeper (sk_events) and neon_walreader + * (nwr_events) sockets a safekeeper in this state should be waiting on. * * This will return WL_NO_EVENTS (= 0) for some events. */ -uint32 -SafekeeperStateDesiredEvents(SafekeeperState state) +void +SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events) { - uint32 result = WL_NO_EVENTS; + *nwr_events = 0; /* nwr_events is empty for most states */ /* If the state doesn't have a modifier, we can check the base state */ - switch (state) + switch (sk->state) { /* Connecting states say what they want in the name */ case SS_CONNECTING_READ: - result = WL_SOCKET_READABLE; - break; + *sk_events = WL_SOCKET_READABLE; + return; case SS_CONNECTING_WRITE: - result = WL_SOCKET_WRITEABLE; - break; + *sk_events = WL_SOCKET_WRITEABLE; + return; /* Reading states need the socket to be read-ready to continue */ case SS_WAIT_EXEC_RESULT: case SS_HANDSHAKE_RECV: case SS_WAIT_VERDICT: - result = WL_SOCKET_READABLE; - break; + *sk_events = WL_SOCKET_READABLE; + return; /* * Idle states use read-readiness as a sign that the connection @@ -2074,32 +2088,56 @@ SafekeeperStateDesiredEvents(SafekeeperState state) */ case SS_VOTING: case SS_IDLE: - result = WL_SOCKET_READABLE; - break; + *sk_events = WL_SOCKET_READABLE; + return; - /* - * Flush states require write-ready for flushing. Active state - * does both reading and writing. - * - * TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We - * should check sk->flushWrite here to set WL_SOCKET_WRITEABLE. - */ case SS_SEND_ELECTED_FLUSH: + *sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; + return; + case SS_ACTIVE: - result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; - break; + switch (sk->active_state) + { + /* + * Everything is sent; we just wait for sk responses and + * latch. + * + * Note: this assumes we send all available WAL to + * safekeeper in one wakeup (unless it blocks). Otherwise + * we would want WL_SOCKET_WRITEABLE here to finish the + * work. + */ + case SS_ACTIVE_SEND: + *sk_events = WL_SOCKET_READABLE; + return; + + /* + * Waiting for neon_walreader socket, but we still read + * responses from sk socket. + */ + case SS_ACTIVE_READ_WAL: + *sk_events = WL_SOCKET_READABLE; + *nwr_events = NeonWALReaderEvents(sk->xlogreader); + return; + + /* + * Need to flush the sk socket, so ignore neon_walreader + * one and set write interest on sk. + */ + case SS_ACTIVE_FLUSH: + *sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; + return; + } + return; /* The offline state expects no events. */ case SS_OFFLINE: - result = WL_NO_EVENTS; - break; + *sk_events = 0; + return; default: Assert(false); - break; } - - return result; } /* Returns a human-readable string corresponding to the event set @@ -2110,7 +2148,7 @@ SafekeeperStateDesiredEvents(SafekeeperState state) * The string should not be freed. It should also not be expected to remain the same between * function calls. */ static char * -FormatEvents(WalProposer *wp, uint32 events) +FormatEvents(uint32 events) { static char return_str[8]; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index f3d161a300..77154c6b57 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -98,8 +98,26 @@ typedef enum SS_IDLE, /* - * Waiting for neon walreader socket to receive chunk of WAL to send to - * this safekeeper. + * Active phase, when we acquired quorum and have WAL to send or feedback + * to read. + */ + SS_ACTIVE, +} SafekeeperState; + +/* + * Sending WAL substates of SS_ACTIVE. + */ +typedef enum +{ + /* + * We are ready to send more WAL, waiting for latch set to learn about + * more WAL becoming available (or just a timeout to send heartbeat). + */ + SS_ACTIVE_SEND, + + /* + * Polling neon_walreader to receive chunk of WAL (probably remotely) to + * send to this safekeeper. * * Note: socket management is done completely inside walproposer_pg for * simplicity, and thus simulation doesn't test it. Which is fine as @@ -112,14 +130,13 @@ typedef enum * problem is unlikely. Vice versa is also true (SS_ACTIVE doesn't handle * walreader socket), but similarly shouldn't be a problem. */ - SS_WAIT_REMOTE_WAL, + SS_ACTIVE_READ_WAL, /* - * Active phase, when we acquired quorum and have WAL to send or feedback - * to read. + * Waiting for write readiness to flush the socket. */ - SS_ACTIVE, -} SafekeeperState; + SS_ACTIVE_FLUSH, +} SafekeeperActiveState; /* Consensus logical timestamp. */ typedef uint64 term_t; @@ -329,12 +346,11 @@ typedef struct Safekeeper */ XLogRecPtr startStreamingAt; - bool flushWrite; /* set to true if we need to call AsyncFlush,* - * to flush pending messages */ XLogRecPtr streamingAt; /* current streaming position */ AppendRequestHeader appendRequest; /* request for sending to safekeeper */ SafekeeperState state; /* safekeeper state machine state */ + SafekeeperActiveState active_state; TimestampTz latestMsgReceivedAt; /* when latest msg is received */ AcceptorGreeting greetResponse; /* acceptor greeting */ VoteResponse voteResponse; /* the vote */ @@ -486,6 +502,9 @@ typedef struct walproposer_api /* Update events for an existing safekeeper connection. */ void (*update_event_set) (Safekeeper *sk, uint32 events); + /* Configure wait event set for yield in SS_ACTIVE. */ + void (*active_state_update_event_set) (Safekeeper *sk); + /* Add a new safekeeper connection to the event set. */ void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events); @@ -681,8 +700,9 @@ extern void WalProposerFree(WalProposer *wp); * WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to * recreate set from scratch, hence the export. */ -extern uint32 SafekeeperStateDesiredEvents(SafekeeperState state); -extern Safekeeper *GetDonor(WalProposer *wp); +extern void SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events); +extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn); + #define WPEVENT 1337 /* special log level for walproposer internal * events */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index a48c5a776b..8368f48187 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1438,38 +1438,22 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count) startptr, count, walprop_pg_get_timeline_id()); - switch (res) + + if (res == NEON_WALREAD_SUCCESS) { - case NEON_WALREAD_SUCCESS: - /* don't wake up us until we send the chunk */ - update_nwr_event_set(sk, 0); - - /* - * If we have the socket subscribed, but walreader doesn't need - * any events, it must mean that remote connection just closed - * hoping to do next read locally. Remove the socket then. - */ - if (NeonWALReaderEvents(sk->xlogreader) == 0) - rm_safekeeper_event_set(sk, false); - return res; - case NEON_WALREAD_WOULDBLOCK: - - /* - * TODO: instead of reattaching socket (and thus recreating WES) - * each time we should keep it if possible, i.e. if connection is - * already established. Note that single neon_walreader object can - * switch between local and remote reads multiple times during its - * lifetime, so careful bookkeeping is needed here. - */ + /* + * If we have the socket subscribed, but walreader doesn't need any + * events, it must mean that remote connection just closed hoping to + * do next read locally. Remove the socket then. It is important to do + * as otherwise next read might open another connection and we won't + * be able to distinguish whether we have correct socket added in wait + * event set. + */ + if (NeonWALReaderEvents(sk->xlogreader) == 0) rm_safekeeper_event_set(sk, false); - add_nwr_event_set(sk, NeonWALReaderEvents(sk->xlogreader)); - return res; - case NEON_WALREAD_ERROR: - rm_safekeeper_event_set(sk, false); - return res; - default: - Assert(false); } + + return res; } static void @@ -1494,6 +1478,7 @@ walprop_pg_free_event_set(WalProposer *wp) for (int i = 0; i < wp->n_safekeepers; i++) { wp->safekeeper[i].eventPos = -1; + wp->safekeeper[i].nwrEventPos = -1; } } @@ -1509,6 +1494,12 @@ walprop_pg_init_event_set(WalProposer *wp) MyLatch, NULL); AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL); + + for (int i = 0; i < wp->n_safekeepers; i++) + { + wp->safekeeper[i].eventPos = -1; + wp->safekeeper[i].nwrEventPos = -1; + } } /* add safekeeper socket to wait event set */ @@ -1525,6 +1516,7 @@ add_nwr_event_set(Safekeeper *sk, uint32 events) { Assert(sk->nwrEventPos == -1); sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk); + elog(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events); } static void @@ -1536,7 +1528,10 @@ walprop_pg_update_event_set(Safekeeper *sk, uint32 events) ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL); } -/* Can be called when nwr socket doesn't exist, does nothing in this case. */ +/* + * Update neon_walreader event. + * Can be called when nwr socket doesn't exist, does nothing in this case. + */ static void update_nwr_event_set(Safekeeper *sk, uint32 events) { @@ -1545,6 +1540,39 @@ update_nwr_event_set(Safekeeper *sk, uint32 events) ModifyWaitEvent(waitEvents, sk->nwrEventPos, events, NULL); } + +static void +walprop_pg_active_state_update_event_set(Safekeeper *sk) +{ + uint32 sk_events; + uint32 nwr_events; + + Assert(sk->state == SS_ACTIVE); + SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events); + + /* + * If we need to wait for neon_walreader, ensure we have up to date socket + * in the wait event set. + */ + if (sk->active_state == SS_ACTIVE_READ_WAL) + { + /* + * TODO: instead of reattaching socket (and thus recreating WES) each + * time we should keep it if possible, i.e. if connection is already + * established. Note that single neon_walreader object can switch + * between local and remote reads multiple times during its lifetime, + * so careful bookkeeping is needed here. + */ + rm_safekeeper_event_set(sk, false); + add_nwr_event_set(sk, nwr_events); + } + else + { + update_nwr_event_set(sk, 0); + } + walprop_pg_update_event_set(sk, sk_events); +} + static void walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove) { @@ -1559,13 +1587,16 @@ walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove) * avoided if possible. * * If is_sk is true, socket of connection to safekeeper is removed; otherwise - * socket of neon wal reader. + * socket of neon_walreader. */ static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk) { WalProposer *wp = to_remove->wp; + elog(DEBUG5, "sk %s:%s: removing event, is_sk %d", + to_remove->host, to_remove->port, is_sk); + /* * Shortpath for exiting if have nothing to do. We never call this * function with safekeeper socket not existing, but do that with neon @@ -1590,7 +1621,6 @@ rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk) */ for (int i = 0; i < wp->n_safekeepers; i++) { - uint32 desired_events = WL_NO_EVENTS; Safekeeper *sk = &wp->safekeeper[i]; if (sk == to_remove) @@ -1599,21 +1629,27 @@ rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk) sk->eventPos = -1; else sk->nwrEventPos = -1; - continue; } - /* If this safekeeper isn't offline, add events for it! */ + /* + * If this safekeeper isn't offline, add events for it, except for the + * event requested to remove. + */ if (sk->state != SS_OFFLINE) { - if (!is_sk) + uint32 sk_events; + uint32 nwr_events; + + SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events); + + if (sk != to_remove || !is_sk) { - desired_events = SafekeeperStateDesiredEvents(sk->state); /* will set sk->eventPos */ - wp->api.add_safekeeper_event_set(sk, desired_events); + wp->api.add_safekeeper_event_set(sk, sk_events); } - else if (NeonWALReaderEvents(sk->xlogreader) != 0) + else if ((sk != to_remove || is_sk) && nwr_events) { - add_nwr_event_set(sk, NeonWALReaderEvents(sk->xlogreader)); + add_nwr_event_set(sk, nwr_events); } } } @@ -1884,6 +1920,7 @@ static const walproposer_api walprop_pg = { .wal_reader_allocate = walprop_pg_wal_reader_allocate, .init_event_set = walprop_pg_init_event_set, .update_event_set = walprop_pg_update_event_set, + .active_state_update_event_set = walprop_pg_active_state_update_event_set, .add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set, .rm_safekeeper_event_set = walprop_pg_rm_safekeeper_event_set, .wait_event_set = walprop_pg_wait_event_set,