Prepare waiteventset.

Arseny Sher
2023-11-24 22:12:25 +03:00
parent 4e2c70b0e8
commit 3ce3ccac0d
4 changed files with 321 additions and 223 deletions
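
In short, as the diffs below show: the walproposer's SS_ACTIVE state is split into substates (SS_ACTIVE_SEND, SS_ACTIVE_READ_WAL, SS_ACTIVE_FLUSH), replacing the flushWrite flag, so a neon_walreader read that would block no longer terminates the safekeeper connection. SafekeeperStateDesiredEvents now reports desired events separately for the safekeeper and neon_walreader sockets, GetDonor additionally returns the donor's flush LSN, and walproposer_api gains an active_state_update_event_set callback that walproposer_pg uses to keep both sockets registered in the wait event set.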

View File

@@ -181,8 +181,10 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
{
if (state->rem_state == RS_NONE)
{
XLogRecPtr donor_lsn;
/* no connection yet; start one */
Safekeeper *donor = GetDonor(state->wp);
Safekeeper *donor = GetDonor(state->wp, &donor_lsn);
if (donor == NULL)
{
@@ -191,12 +193,13 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
return NEON_WALREAD_ERROR;
}
snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
elog(LOG, "establishing connection to %s to fetch WAL", state->donor_name);
elog(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
state->wp_conn = libpqwp_connect_start(donor->conninfo);
if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
{
snprintf(state->err_msg, sizeof(state->err_msg),
"failed to connect to %s:%s to fetch WAL: immediately failed with %s",
"failed to connect to %s to fetch WAL: immediately failed with %s",
state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
@@ -260,13 +263,13 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
return NEON_WALREAD_WOULDBLOCK;
case WP_EXEC_FAILED:
snprintf(state->err_msg, sizeof(state->err_msg),
"get result from %s failed: %s",
"get START_REPLICATION result from %s failed: %s",
state->donor_name, PQerrorMessage(state->wp_conn->pg_conn));
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;
default: /* can't happen */
snprintf(state->err_msg, sizeof(state->err_msg),
"get result from %s: unexpected result",
"get START_REPLICATION result from %s: unexpected result",
state->donor_name);
NeonWALReaderResetRemote(state);
return NEON_WALREAD_ERROR;

View File

@@ -75,9 +75,9 @@ static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, Safekeeper
static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_state);
static bool AsyncFlush(Safekeeper *sk);
static int CompareLsn(const void *a, const void *b);
static char *FormatSafekeeperState(SafekeeperState state);
static char *FormatSafekeeperState(SafekeeperState state, SafekeeperActiveState active_state);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static char *FormatEvents(WalProposer *wp, uint32 events);
static char *FormatEvents(uint32 events);
WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -124,7 +124,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf);
wp->safekeeper[wp->n_safekeepers].xlogreader = NULL;
wp->safekeeper[wp->n_safekeepers].flushWrite = false;
wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr;
wp->n_safekeepers += 1;
@@ -273,7 +272,7 @@ WalProposerPoll(WalProposer *wp)
wp->config->safekeeper_connection_timeout))
{
walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
sk->host, sk->port, FormatSafekeeperState(sk->state), wp->config->safekeeper_connection_timeout);
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state), wp->config->safekeeper_connection_timeout);
ShutdownConnection(sk);
}
}
@@ -308,7 +307,6 @@ ShutdownConnection(Safekeeper *sk)
{
sk->wp->api.conn_finish(sk);
sk->state = SS_OFFLINE;
sk->flushWrite = false;
sk->streamingAt = InvalidXLogRecPtr;
if (sk->voteResponse.termHistory.entries)
@@ -433,8 +431,6 @@ ReconnectSafekeepers(WalProposer *wp)
static void
AdvancePollState(Safekeeper *sk, uint32 events)
{
WalProposer *wp = sk->wp;
/*
* Sanity check. We assume further down that the operations don't block
* because the socket is ready.
@@ -486,7 +482,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
*/
case SS_VOTING:
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk->state));
sk->port, FormatSafekeeperState(sk->state, sk->active_state));
ResetConnection(sk);
return;
@@ -515,7 +511,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
*/
case SS_IDLE:
walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host,
sk->port, FormatSafekeeperState(sk->state));
sk->port, FormatSafekeeperState(sk->state, sk->active_state));
ResetConnection(sk);
return;
@@ -1188,6 +1184,7 @@ StartStreaming(Safekeeper *sk)
* once for a connection.
*/
sk->state = SS_ACTIVE;
sk->active_state = SS_ACTIVE_SEND;
sk->streamingAt = sk->startStreamingAt;
/* event set will be updated inside SendMessageToNode */
@@ -1246,9 +1243,13 @@ HandleActiveState(Safekeeper *sk, uint32 events)
{
WalProposer *wp = sk->wp;
uint32 newEvents = WL_SOCKET_READABLE;
if (events & WL_SOCKET_WRITEABLE)
/*
 * Note: we don't know which socket awoke us (sk or nwr). However, as
 * SendAppendRequests always tries to send at least one msg in
 * SS_ACTIVE_SEND, be careful not to go there if we are only waiting for an
 * sk response, otherwise it'd create a busy loop of pings.
 */
if (events & WL_SOCKET_WRITEABLE || sk->active_state == SS_ACTIVE_READ_WAL)
if (!SendAppendRequests(sk))
return;
@@ -1256,28 +1257,18 @@ HandleActiveState(Safekeeper *sk, uint32 events)
if (!RecvAppendResponses(sk))
return;
/*
* We should wait for WL_SOCKET_WRITEABLE event if we have unflushed data
* in the buffer.
*
* LSN comparison checks if we have pending unsent messages. This check
* isn't necessary now, because we always send append messages immediately
* after arrival. But it's good to have it here in case we change this
* behavior in the future.
*/
if (sk->streamingAt != wp->availableLsn || sk->flushWrite)
newEvents |= WL_SOCKET_WRITEABLE;
wp->api.update_event_set(sk, newEvents);
/* configures event set for yield, whatever the substate is */
wp->api.active_state_update_event_set(sk);
}
/*
* Send WAL messages starting from sk->streamingAt until the end or non-writable
* socket, whichever comes first. Caller should take care of updating event set.
* Even if no unsent WAL is available, at least one empty message will be sent
* as a heartbeat, if socket is ready.
* socket or neon_walreader blocks, whichever comes first; active_state is
* updated accordingly. Caller should take care of updating event set. Even if
* no unsent WAL is available, at least one empty message will be sent as a
* heartbeat, if socket is ready.
*
* Can change state if Async* functions encounter errors and reset connection.
* Resets state and kills the connections if any error on them is encountered.
* Returns false in this case, true otherwise.
*/
static bool
@@ -1285,11 +1276,11 @@ SendAppendRequests(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
XLogRecPtr endLsn;
AppendRequestHeader *req;
PGAsyncWriteResult writeResult;
bool sentAnything = false;
AppendRequestHeader *req;
if (sk->flushWrite)
if (sk->active_state == SS_ACTIVE_FLUSH)
{
if (!AsyncFlush(sk))
@@ -1300,94 +1291,99 @@ SendAppendRequests(Safekeeper *sk)
return sk->state == SS_ACTIVE;
/* Event set will be updated in the end of HandleActiveState */
sk->flushWrite = false;
sk->active_state = SS_ACTIVE_SEND;
}
while (sk->streamingAt != wp->availableLsn || !sentAnything)
{
sentAnything = true;
endLsn = sk->streamingAt;
endLsn += MAX_SEND_SIZE;
/* if we went beyond available WAL, back off */
if (endLsn > wp->availableLsn)
if (sk->active_state == SS_ACTIVE_SEND)
{
endLsn = wp->availableLsn;
sentAnything = true;
endLsn = sk->streamingAt;
endLsn += MAX_SEND_SIZE;
/* if we went beyond available WAL, back off */
if (endLsn > wp->availableLsn)
{
endLsn = wp->availableLsn;
}
req = &sk->appendRequest;
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
resetStringInfo(&sk->outbuf);
/* write AppendRequest header */
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
sk->active_state = SS_ACTIVE_READ_WAL;
}
req = &sk->appendRequest;
PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn);
walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
req->endLsn - req->beginLsn,
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->endLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port);
resetStringInfo(&sk->outbuf);
/* write AppendRequest header */
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
/* write the WAL itself */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
/* wal_read will raise error on failure */
switch (wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn))
if (sk->active_state == SS_ACTIVE_READ_WAL)
{
case NEON_WALREAD_SUCCESS:
break;
case NEON_WALREAD_WOULDBLOCK:
walprop_log(LOG, "wal reading wouldblock");
/* todo */
ShutdownConnection(sk);
return false;
case NEON_WALREAD_ERROR:
walprop_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port,
NeonWALReaderErrMsg(sk->xlogreader));
ShutdownConnection(sk);
return false;
default:
Assert(false);
}
req = &sk->appendRequest;
sk->outbuf.len += req->endLsn - req->beginLsn;
switch (wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req->endLsn - req->beginLsn))
{
case NEON_WALREAD_SUCCESS:
break;
case NEON_WALREAD_WOULDBLOCK:
return true;
case NEON_WALREAD_ERROR:
walprop_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port,
NeonWALReaderErrMsg(sk->xlogreader));
ShutdownConnection(sk);
return false;
default:
Assert(false);
}
writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len);
sk->outbuf.len += req->endLsn - req->beginLsn;
/* Mark current message as sent, whatever the result is */
sk->streamingAt = endLsn;
writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len);
switch (writeResult)
{
case PG_ASYNC_WRITE_SUCCESS:
/* Continue writing the next message */
break;
/* Mark current message as sent, whatever the result is */
sk->streamingAt = req->endLsn;
case PG_ASYNC_WRITE_TRY_FLUSH:
switch (writeResult)
{
case PG_ASYNC_WRITE_SUCCESS:
/* Continue writing the next message */
sk->active_state = SS_ACTIVE_SEND;
break;
/*
* * We still need to call PQflush some more to finish the
* job. Caller function will handle this by setting right
* event* set.
*/
sk->flushWrite = true;
return true;
case PG_ASYNC_WRITE_TRY_FLUSH:
case PG_ASYNC_WRITE_FAIL:
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
default:
Assert(false);
return false;
/*
* We still need to call PQflush some more to finish the
* job. Caller function will handle this by setting right
* event set.
*/
sk->active_state = SS_ACTIVE_FLUSH;
return true;
case PG_ASYNC_WRITE_FAIL:
walprop_log(WARNING, "failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
default:
Assert(false);
return false;
}
}
}
@@ -1397,7 +1393,7 @@ SendAppendRequests(Safekeeper *sk)
/*
* Receive and process all available feedback.
*
* Can change state if Async* functions encounter errors and reset connection.
* Resets state and kills the connection if any error on it is encountered.
* Returns false in this case, true otherwise.
*
* NB: This function can call SendMessageToNode and produce new messages.
@@ -1580,18 +1576,19 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
/*
* Return safekeeper with active connection from which WAL can be downloaded, or
* none if it doesn't exist.
* none if it doesn't exist. donor_lsn is set to the end position of the donor
* to the best of our knowledge.
*/
Safekeeper *
GetDonor(WalProposer *wp)
GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
{
XLogRecPtr donor_lsn = InvalidXLogRecPtr;
*donor_lsn = InvalidXLogRecPtr;
Safekeeper *donor = NULL;
int i;
if (wp->n_votes < wp->quorum)
{
walprop_log(WARNING, "GetDonor called before elections are winned");
walprop_log(WARNING, "GetDonor called before elections are won");
return NULL;
}
@@ -1600,10 +1597,10 @@ GetDonor(WalProposer *wp)
* about its position immediately after election before any feedbacks are
* sent.
*/
if (wp->safekeeper[wp->donor].state == SS_ACTIVE)
if (wp->safekeeper[wp->donor].state >= SS_IDLE)
{
donor = &wp->safekeeper[wp->donor];
donor_lsn = wp->propEpochStartLsn;
*donor_lsn = wp->propEpochStartLsn;
}
/*
@@ -1615,10 +1612,10 @@ GetDonor(WalProposer *wp)
{
Safekeeper *sk = &wp->safekeeper[i];
if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn >= donor_lsn)
if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > *donor_lsn)
{
donor = sk;
donor_lsn = sk->appendResponse.flushLsn;
*donor_lsn = sk->appendResponse.flushLsn;
}
}
return donor;
@@ -1729,7 +1726,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
case PG_ASYNC_READ_FAIL:
walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host,
sk->port, FormatSafekeeperState(sk->state),
sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
@@ -1769,7 +1766,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
if (tag != anymsg->tag)
{
walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk->state));
sk->port, FormatSafekeeperState(sk->state, sk->active_state));
ResetConnection(sk);
return false;
}
@@ -1840,12 +1837,13 @@ static bool
BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state)
{
WalProposer *wp = sk->wp;
uint32 events;
uint32 sk_events;
uint32 nwr_events;
if (!wp->api.conn_blocking_write(sk, msg, msg_size))
{
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
@@ -1857,9 +1855,15 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes
* If the new state will be waiting for events to happen, update the event
* set to wait for those
*/
events = SafekeeperStateDesiredEvents(success_state);
if (events)
wp->api.update_event_set(sk, events);
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
/*
 * nwr_events is relevant only during SS_ACTIVE, which doesn't use
 * BlockingWrite.
 */
Assert(!nwr_events);
if (sk_events)
wp->api.update_event_set(sk, sk_events);
return true;
}
@@ -1892,7 +1896,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta
return false;
case PG_ASYNC_WRITE_FAIL:
walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return false;
@@ -1931,7 +1935,7 @@ AsyncFlush(Safekeeper *sk)
return false;
case -1:
walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s",
sk->host, sk->port, FormatSafekeeperState(sk->state),
sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state),
wp->api.conn_error_message(sk));
ResetConnection(sk);
return false;
@@ -1961,14 +1965,14 @@ CompareLsn(const void *a, const void *b)
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
* walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state, sk->active_state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
* walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state, sk->active_state));
*/
static char *
FormatSafekeeperState(SafekeeperState state)
FormatSafekeeperState(SafekeeperState state, SafekeeperActiveState active_state)
{
char *return_val = NULL;
@@ -2000,7 +2004,18 @@ FormatSafekeeperState(SafekeeperState state)
return_val = "idle";
break;
case SS_ACTIVE:
return_val = "active";
switch (active_state)
{
case SS_ACTIVE_SEND:
return_val = "active send";
break;
case SS_ACTIVE_READ_WAL:
return_val = "active read WAL";
break;
case SS_ACTIVE_FLUSH:
return_val = "active flush";
break;
}
break;
}
@@ -2013,22 +2028,20 @@ FormatSafekeeperState(SafekeeperState state)
static void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
WalProposer *wp = sk->wp;
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
uint32 sk_events;
uint32 nwr_events;
uint32 expected;
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
/*
 * Without one more level of notify target indirection we have no way to
 * distinguish which socket woke us up, so just union the expected events.
 */
expected = sk_events | nwr_events;
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
@@ -2037,36 +2050,37 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk)
* and then an assertion that's guaranteed to fail.
*/
walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk->state));
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state, sk->active_state));
Assert(events_ok_for_state);
}
}
/* Returns the set of events a safekeeper in this state should be waiting on
/* Returns the set of events for both safekeeper (sk_events) and neon_walreader
* (nwr_events) sockets a safekeeper in this state should be waiting on.
*
* This will return WL_NO_EVENTS (= 0) for some events. */
uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
void
SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events)
{
uint32 result = WL_NO_EVENTS;
*nwr_events = 0; /* nwr_events is empty for most states */
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
switch (sk->state)
{
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
*sk_events = WL_SOCKET_READABLE;
return;
case SS_CONNECTING_WRITE:
result = WL_SOCKET_WRITEABLE;
break;
*sk_events = WL_SOCKET_WRITEABLE;
return;
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
*sk_events = WL_SOCKET_READABLE;
return;
/*
* Idle states use read-readiness as a sign that the connection
@@ -2074,32 +2088,56 @@ SafekeeperStateDesiredEvents(SafekeeperState state)
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
*sk_events = WL_SOCKET_READABLE;
return;
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
*sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
return;
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
switch (sk->active_state)
{
/*
* Everything is sent; we just wait for sk responses and
* latch.
*
* Note: this assumes we send all available WAL to
* safekeeper in one wakeup (unless it blocks). Otherwise
* we would want WL_SOCKET_WRITEABLE here to finish the
* work.
*/
case SS_ACTIVE_SEND:
*sk_events = WL_SOCKET_READABLE;
return;
/*
* Waiting for neon_walreader socket, but we still read
* responses from sk socket.
*/
case SS_ACTIVE_READ_WAL:
*sk_events = WL_SOCKET_READABLE;
*nwr_events = NeonWALReaderEvents(sk->xlogreader);
return;
/*
* Need to flush the sk socket, so ignore neon_walreader
* one and set write interest on sk.
*/
case SS_ACTIVE_FLUSH:
*sk_events = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
return;
}
return;
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
*sk_events = 0;
return;
default:
Assert(false);
break;
}
return result;
}
/* Returns a human-readable string corresponding to the event set
@@ -2110,7 +2148,7 @@ SafekeeperStateDesiredEvents(SafekeeperState state)
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
static char *
FormatEvents(WalProposer *wp, uint32 events)
FormatEvents(uint32 events)
{
static char return_str[8];

View File

@@ -98,8 +98,26 @@ typedef enum
SS_IDLE,
/*
* Waiting for neon walreader socket to receive chunk of WAL to send to
* this safekeeper.
* Active phase, when we acquired quorum and have WAL to send or feedback
* to read.
*/
SS_ACTIVE,
} SafekeeperState;
/*
* Sending WAL substates of SS_ACTIVE.
*/
typedef enum
{
/*
* We are ready to send more WAL, waiting for the latch to be set to learn
* about more WAL becoming available (or just a timeout to send a heartbeat).
*/
SS_ACTIVE_SEND,
/*
* Polling neon_walreader to receive chunk of WAL (probably remotely) to
* send to this safekeeper.
*
* Note: socket management is done completely inside walproposer_pg for
* simplicity, and thus simulation doesn't test it. Which is fine as
@@ -112,14 +130,13 @@ typedef enum
* problem is unlikely. Vice versa is also true (SS_ACTIVE doesn't handle
* walreader socket), but similarly shouldn't be a problem.
*/
SS_WAIT_REMOTE_WAL,
SS_ACTIVE_READ_WAL,
/*
* Active phase, when we acquired quorum and have WAL to send or feedback
* to read.
* Waiting for write readiness to flush the socket.
*/
SS_ACTIVE,
} SafekeeperState;
SS_ACTIVE_FLUSH,
} SafekeeperActiveState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
@@ -329,12 +346,11 @@ typedef struct Safekeeper
*/
XLogRecPtr startStreamingAt;
bool flushWrite; /* set to true if we need to call AsyncFlush,*
* to flush pending messages */
XLogRecPtr streamingAt; /* current streaming position */
AppendRequestHeader appendRequest; /* request for sending to safekeeper */
SafekeeperState state; /* safekeeper state machine state */
SafekeeperActiveState active_state; /* substate of SS_ACTIVE */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
@@ -486,6 +502,9 @@ typedef struct walproposer_api
/* Update events for an existing safekeeper connection. */
void (*update_event_set) (Safekeeper *sk, uint32 events);
/* Configure wait event set for yield in SS_ACTIVE. */
void (*active_state_update_event_set) (Safekeeper *sk);
/* Add a new safekeeper connection to the event set. */
void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);
@@ -681,8 +700,9 @@ extern void WalProposerFree(WalProposer *wp);
* WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to
* recreate set from scratch, hence the export.
*/
extern uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
extern Safekeeper *GetDonor(WalProposer *wp);
extern void SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events);
extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn);
#define WPEVENT 1337 /* special log level for walproposer internal
* events */
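
For orientation, here is an illustrative caller sketch of the two re-exported functions with their changed signatures; it mirrors the call sites elsewhere in this commit (NeonWALReadRemote and BlockingWrite) and is not itself part of the diff:

XLogRecPtr donor_lsn;
Safekeeper *donor = GetDonor(wp, &donor_lsn);   /* donor_lsn receives the donor's end position */

if (donor != NULL)
    walprop_log(LOG, "donor %s:%s, flush_lsn %X/%X",
                donor->host, donor->port, LSN_FORMAT_ARGS(donor_lsn));

uint32 sk_events;
uint32 nwr_events;

SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);  /* separate masks per socket */
if (sk_events)
    wp->api.update_event_set(sk, sk_events);    /* safekeeper socket; nwr socket handled separately */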

View File

@@ -1438,38 +1438,22 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count)
startptr,
count,
walprop_pg_get_timeline_id());
switch (res)
if (res == NEON_WALREAD_SUCCESS)
{
case NEON_WALREAD_SUCCESS:
/* don't wake up us until we send the chunk */
update_nwr_event_set(sk, 0);
/*
* If we have the socket subscribed, but walreader doesn't need
* any events, it must mean that remote connection just closed
* hoping to do next read locally. Remove the socket then.
*/
if (NeonWALReaderEvents(sk->xlogreader) == 0)
rm_safekeeper_event_set(sk, false);
return res;
case NEON_WALREAD_WOULDBLOCK:
/*
* TODO: instead of reattaching socket (and thus recreating WES)
* each time we should keep it if possible, i.e. if connection is
* already established. Note that single neon_walreader object can
* switch between local and remote reads multiple times during its
* lifetime, so careful bookkeeping is needed here.
*/
/*
 * If we have the socket subscribed, but walreader doesn't need any
 * events, it must mean that the remote connection just closed, hoping to
 * do the next read locally. Remove the socket then. It is important to do
 * so, as otherwise the next read might open another connection and we
 * won't be able to tell whether the correct socket is added to the wait
 * event set.
 */
if (NeonWALReaderEvents(sk->xlogreader) == 0)
rm_safekeeper_event_set(sk, false);
add_nwr_event_set(sk, NeonWALReaderEvents(sk->xlogreader));
return res;
case NEON_WALREAD_ERROR:
rm_safekeeper_event_set(sk, false);
return res;
default:
Assert(false);
}
return res;
}
static void
@@ -1494,6 +1478,7 @@ walprop_pg_free_event_set(WalProposer *wp)
for (int i = 0; i < wp->n_safekeepers; i++)
{
wp->safekeeper[i].eventPos = -1;
wp->safekeeper[i].nwrEventPos = -1;
}
}
@@ -1509,6 +1494,12 @@ walprop_pg_init_event_set(WalProposer *wp)
MyLatch, NULL);
AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
for (int i = 0; i < wp->n_safekeepers; i++)
{
wp->safekeeper[i].eventPos = -1;
wp->safekeeper[i].nwrEventPos = -1;
}
}
/* add safekeeper socket to wait event set */
@@ -1525,6 +1516,7 @@ add_nwr_event_set(Safekeeper *sk, uint32 events)
{
Assert(sk->nwrEventPos == -1);
sk->nwrEventPos = AddWaitEventToSet(waitEvents, events, NeonWALReaderSocket(sk->xlogreader), NULL, sk);
elog(DEBUG5, "sk %s:%s: added nwr socket events %d", sk->host, sk->port, events);
}
static void
@@ -1536,7 +1528,10 @@ walprop_pg_update_event_set(Safekeeper *sk, uint32 events)
ModifyWaitEvent(waitEvents, sk->eventPos, events, NULL);
}
/* Can be called when nwr socket doesn't exist, does nothing in this case. */
/*
* Update neon_walreader event.
* Can be called when nwr socket doesn't exist, does nothing in this case.
*/
static void
update_nwr_event_set(Safekeeper *sk, uint32 events)
{
@@ -1545,6 +1540,39 @@ update_nwr_event_set(Safekeeper *sk, uint32 events)
ModifyWaitEvent(waitEvents, sk->nwrEventPos, events, NULL);
}
static void
walprop_pg_active_state_update_event_set(Safekeeper *sk)
{
uint32 sk_events;
uint32 nwr_events;
Assert(sk->state == SS_ACTIVE);
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
/*
* If we need to wait for neon_walreader, ensure we have up to date socket
* in the wait event set.
*/
if (sk->active_state == SS_ACTIVE_READ_WAL)
{
/*
* TODO: instead of reattaching socket (and thus recreating WES) each
* time we should keep it if possible, i.e. if connection is already
* established. Note that single neon_walreader object can switch
* between local and remote reads multiple times during its lifetime,
* so careful bookkeeping is needed here.
*/
rm_safekeeper_event_set(sk, false);
add_nwr_event_set(sk, nwr_events);
}
else
{
update_nwr_event_set(sk, 0);
}
walprop_pg_update_event_set(sk, sk_events);
}
static void
walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove)
{
@@ -1559,13 +1587,16 @@ walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove)
* avoided if possible.
*
* If is_sk is true, socket of connection to safekeeper is removed; otherwise
* socket of neon wal reader.
* socket of neon_walreader.
*/
static void
rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk)
{
WalProposer *wp = to_remove->wp;
elog(DEBUG5, "sk %s:%s: removing event, is_sk %d",
to_remove->host, to_remove->port, is_sk);
/*
* Shortpath for exiting if have nothing to do. We never call this
* function with safekeeper socket not existing, but do that with neon
@@ -1590,7 +1621,6 @@ rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk)
*/
for (int i = 0; i < wp->n_safekeepers; i++)
{
uint32 desired_events = WL_NO_EVENTS;
Safekeeper *sk = &wp->safekeeper[i];
if (sk == to_remove)
@@ -1599,21 +1629,27 @@ rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk)
sk->eventPos = -1;
else
sk->nwrEventPos = -1;
continue;
}
/* If this safekeeper isn't offline, add events for it! */
/*
 * If this safekeeper isn't offline, add events for it, except for the
 * socket we were asked to remove.
 */
if (sk->state != SS_OFFLINE)
{
if (!is_sk)
uint32 sk_events;
uint32 nwr_events;
SafekeeperStateDesiredEvents(sk, &sk_events, &nwr_events);
if (sk != to_remove || !is_sk)
{
desired_events = SafekeeperStateDesiredEvents(sk->state);
/* will set sk->eventPos */
wp->api.add_safekeeper_event_set(sk, desired_events);
wp->api.add_safekeeper_event_set(sk, sk_events);
}
else if (NeonWALReaderEvents(sk->xlogreader) != 0)
else if ((sk != to_remove || is_sk) && nwr_events)
{
add_nwr_event_set(sk, NeonWALReaderEvents(sk->xlogreader));
add_nwr_event_set(sk, nwr_events);
}
}
}
@@ -1884,6 +1920,7 @@ static const walproposer_api walprop_pg = {
.wal_reader_allocate = walprop_pg_wal_reader_allocate,
.init_event_set = walprop_pg_init_event_set,
.update_event_set = walprop_pg_update_event_set,
.active_state_update_event_set = walprop_pg_active_state_update_event_set,
.add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set,
.rm_safekeeper_event_set = walprop_pg_rm_safekeeper_event_set,
.wait_event_set = walprop_pg_wait_event_set,
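
Since walproposer_api gains a new member, any other implementation of the struct (none is touched in this diff) has to provide it as well. A minimal no-op stub — hypothetical, not part of this commit — would satisfy the contract for a backend that does its socket bookkeeping elsewhere:

static void
stub_active_state_update_event_set(Safekeeper *sk)
{
    /* hypothetical stub, not in this commit: nothing to configure here */
    Assert(sk->state == SS_ACTIVE);
    (void) sk;
}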