Compare commits

...

17 Commits

Author SHA1 Message Date
Arseny Sher
c811ae0b91 convert appendresponse 2025-02-04 10:02:43 +01:00
Arseny Sher
777afbafe5 convert appendrequest 2025-01-30 13:21:40 +01:00
Arseny Sher
0d0cd16ea2 convert ProposerElected 2025-01-30 12:23:27 +01:00
Arseny Sher
217309c7ef remove propelected.timeline_start_lsn usages in sk 2025-01-30 11:29:14 +01:00
Arseny Sher
976afcee26 Convert VoteResponse, remove timeline_start_lsn. 2025-01-30 11:10:58 +01:00
Arseny Sher
20e974ecdd convert voterequest 2025-01-29 12:31:22 +01:00
Arseny Sher
eb43f65055 move greeting init v2 to PAMessageSerialize 2025-01-29 10:37:02 +01:00
Arseny Sher
96d67abd50 wp: print mconf 2025-01-28 13:42:53 +01:00
Arseny Sher
f7485c4459 convert acceptorgreeting 2025-01-23 10:26:13 +01:00
Arseny Sher
4ea7b22537 rename sk members to m for brevity 2025-01-22 13:52:30 +01:00
Arseny Sher
10a7878230 infra for v3 acceptor -> proposer msgs 2025-01-22 12:54:50 +01:00
Arseny Sher
c19a8b69f2 convert ProposerGreeting 2025-01-22 12:07:21 +01:00
Arseny Sher
8be17724d8 Add two safekeeper proto versions to test_normal_work. 2025-01-20 11:22:14 +01:00
Arseny Sher
234c3a29df Pass proto selection to safekeeper, prepare parsing v3. 2025-01-20 11:18:26 +01:00
Arseny Sher
db5513076a Add PAMessageSerialize and ProposerAcceptorGreeting v3 2025-01-15 16:29:20 +01:00
Arseny Sher
70d4e077a6 pgindent wp code 2025-01-15 16:29:20 +01:00
Arseny Sher
ae9db8975a Add START_WAL_PUSH proto_version and allow_timeline_creation options. 2025-01-15 16:29:20 +01:00
15 changed files with 1392 additions and 410 deletions

View File

@@ -38,14 +38,12 @@ impl Display for SafekeeperId {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(transparent)]
pub struct MemberSet {
pub members: Vec<SafekeeperId>,
pub m: Vec<SafekeeperId>,
}
impl MemberSet {
pub fn empty() -> Self {
MemberSet {
members: Vec::new(),
}
MemberSet { m: Vec::new() }
}
pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
@@ -53,11 +51,11 @@ impl MemberSet {
if hs.len() != members.len() {
bail!("duplicate safekeeper id in the set {:?}", members);
}
Ok(MemberSet { members })
Ok(MemberSet { m: members })
}
pub fn contains(&self, sk: &SafekeeperId) -> bool {
self.members.iter().any(|m| m.id == sk.id)
self.m.iter().any(|m| m.id == sk.id)
}
pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
@@ -67,7 +65,7 @@ impl MemberSet {
sk.id, self
));
}
self.members.push(sk);
self.m.push(sk);
Ok(())
}
}
@@ -75,11 +73,7 @@ impl MemberSet {
impl Display for MemberSet {
/// Display as a comma separated list of members.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let sks_str = self
.members
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>();
let sks_str = self.m.iter().map(|sk| sk.to_string()).collect::<Vec<_>>();
write!(f, "({})", sks_str.join(", "))
}
}

View File

@@ -215,6 +215,7 @@ impl Wrapper {
syncSafekeepers: config.sync_safekeepers,
systemId: 0,
pgTimeline: 1,
proto_version: 2,
callback_data,
};
let c_config = Box::into_raw(Box::new(c_config));

View File

@@ -51,6 +51,26 @@ HexDecodeString(uint8 *result, char *input, int nbytes)
return true;
}
/* --------------------------------
* pq_getmsgint16 - get a binary 2-byte int from a message buffer
* --------------------------------
*/
uint16
pq_getmsgint16(StringInfo msg)
{
return pq_getmsgint(msg, 2);
}
/* --------------------------------
* pq_getmsgint32 - get a binary 4-byte int from a message buffer
* --------------------------------
*/
uint32
pq_getmsgint32(StringInfo msg)
{
return pq_getmsgint(msg, 4);
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------

View File

@@ -8,6 +8,8 @@
#endif
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint16 pq_getmsgint16(StringInfo msg);
uint32 pq_getmsgint32(StringInfo msg);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);

View File

@@ -70,6 +70,7 @@ static bool SendAppendRequests(Safekeeper *sk);
static bool RecvAppendResponses(Safekeeper *sk);
static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp);
static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp);
static void PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version);
static void HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk);
static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size);
static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg);
@@ -81,6 +82,8 @@ static char *FormatSafekeeperState(Safekeeper *sk);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static char *FormatEvents(WalProposer *wp, uint32 events);
static void UpdateDonorShmem(WalProposer *wp);
static char *MembershipConfigurationToString(MembershipConfiguration *mconf);
static void MembershipConfigurationFree(MembershipConfiguration *mconf);
WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -137,25 +140,21 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
wp->quorum = wp->n_safekeepers / 2 + 1;
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
/* Fill the greeting package */
wp->greetRequest.tag = 'g';
wp->greetRequest.protocolVersion = SK_PROTOCOL_VERSION;
wp->greetRequest.pgVersion = PG_VERSION_NUM;
wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId));
wp->greetRequest.systemId = wp->config->systemId;
if (!wp->config->neon_timeline)
wp_log(FATAL, "neon.timeline_id is not provided");
if (*wp->config->neon_timeline != '\0' &&
!HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline);
wp->greetRequest.pam.tag = 'g';
if (!wp->config->neon_tenant)
wp_log(FATAL, "neon.tenant_id is not provided");
if (*wp->config->neon_tenant != '\0' &&
!HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16))
wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant);
wp->greetRequest.timeline = wp->config->pgTimeline;
wp->greetRequest.walSegSize = wp->config->wal_segment_size;
wp->greetRequest.tenant_id = wp->config->neon_tenant;
if (!wp->config->neon_timeline)
wp_log(FATAL, "neon.timeline_id is not provided");
wp->greetRequest.timeline_id = wp->config->neon_timeline;
wp->greetRequest.pg_version = PG_VERSION_NUM;
wp->greetRequest.system_id = wp->config->systemId;
wp->greetRequest.wal_seg_size = wp->config->wal_segment_size;
wp->api.init_event_set(wp);
@@ -165,12 +164,14 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
void
WalProposerFree(WalProposer *wp)
{
MembershipConfigurationFree(&wp->mconf);
for (int i = 0; i < wp->n_safekeepers; i++)
{
Safekeeper *sk = &wp->safekeeper[i];
Assert(sk->outbuf.data != NULL);
pfree(sk->outbuf.data);
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
@@ -308,6 +309,7 @@ ShutdownConnection(Safekeeper *sk)
sk->state = SS_OFFLINE;
sk->streamingAt = InvalidXLogRecPtr;
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
@@ -598,11 +600,14 @@ static void
SendStartWALPush(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
#define CMD_LEN 512
char cmd[CMD_LEN];
if (!wp->api.conn_send_query(sk, "START_WAL_PUSH"))
snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version);
if (!wp->api.conn_send_query(sk, cmd))
{
wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to send %s query to safekeeper %s:%s: %s",
cmd, sk->host, sk->port, wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return;
}
@@ -658,23 +663,33 @@ RecvStartWALPushResult(Safekeeper *sk)
/*
* Start handshake: first of all send information about the
* safekeeper. After sending, we wait on SS_HANDSHAKE_RECV for
* walproposer. After sending, we wait on SS_HANDSHAKE_RECV for
* a response to finish the handshake.
*/
static void
SendProposerGreeting(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
char *mconf_toml = MembershipConfigurationToString(&wp->greetRequest.mconf);
wp_log(LOG, "sending ProposerGreeting to safekeeper %s:%s with mconf = %s", sk->host, sk->port, mconf_toml);
pfree(mconf_toml);
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->greetRequest,
&sk->outbuf, wp->config->proto_version);
/*
* On failure, logging & resetting the connection is handled. We just need
* to handle the control flow.
*/
BlockingWrite(sk, &sk->wp->greetRequest, sizeof(sk->wp->greetRequest), SS_HANDSHAKE_RECV);
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
}
static void
RecvAcceptorGreeting(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
char *mconf_toml;
/*
* If our reading doesn't immediately succeed, any necessary error
@@ -685,7 +700,10 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse))
return;
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, term=" INT64_FORMAT, sk->host, sk->port, sk->greetResponse.term);
mconf_toml = MembershipConfigurationToString(&sk->greetResponse.mconf);
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, node_id = %lu, mconf = %s, term=" UINT64_FORMAT,
sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term);
pfree(mconf_toml);
/* Protocol is all good, move to voting. */
sk->state = SS_VOTING;
@@ -707,12 +725,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
wp->propTerm++;
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp->voteRequest = (VoteRequest)
{
.tag = 'v',
.term = wp->propTerm
};
memcpy(wp->voteRequest.proposerId.data, wp->greetRequest.proposerId.data, UUID_LEN);
wp->voteRequest.pam.tag = 'v';
wp->voteRequest.generation = wp->mconf.generation;
wp->voteRequest.term = wp->propTerm;
}
}
else if (sk->greetResponse.term > wp->propTerm)
@@ -759,12 +774,14 @@ SendVoteRequest(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
/* We have quorum for voting, send our vote request */
wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT))
return;
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->voteRequest,
&sk->outbuf, wp->config->proto_version);
/* We have quorum for voting, send our vote request */
wp_log(LOG, "requesting vote from %s:%s for generation %u term " UINT64_FORMAT, sk->host, sk->port,
wp->voteRequest.generation, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_WAIT_VERDICT);
/* If successful, wait for read-ready with SS_WAIT_VERDICT */
}
@@ -778,11 +795,12 @@ RecvVoteResponse(Safekeeper *sk)
return;
wp_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
"got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term,
sk->voteResponse.voteGiven,
GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn));
/*
* In case of acceptor rejecting our vote, bail out, but only if either it
@@ -847,9 +865,9 @@ HandleElectedProposer(WalProposer *wp)
* otherwise we must be sync-safekeepers and we have nothing to do then.
*
* Proceeding is not only pointless but harmful, because we'd give
* safekeepers term history starting with 0/0. These hacks will go away once
* we disable implicit timeline creation on safekeepers and create it with
* non zero LSN from the start.
* safekeepers term history starting with 0/0. These hacks will go away
* once we disable implicit timeline creation on safekeepers and create it
* with non zero LSN from the start.
*/
if (wp->propEpochStartLsn == InvalidXLogRecPtr)
{
@@ -942,7 +960,6 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->propEpochStartLsn = InvalidXLogRecPtr;
wp->donorEpoch = 0;
wp->truncateLsn = InvalidXLogRecPtr;
wp->timelineStartLsn = InvalidXLogRecPtr;
for (int i = 0; i < wp->n_safekeepers; i++)
{
@@ -959,20 +976,6 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->donor = i;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr)
{
/* timelineStartLsn should be the same everywhere or unknown */
if (wp->timelineStartLsn != InvalidXLogRecPtr &&
wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
{
wp_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(wp->timelineStartLsn),
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
}
wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
}
}
}
@@ -995,22 +998,11 @@ DetermineEpochStartLsn(WalProposer *wp)
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
{
wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp);
if (wp->timelineStartLsn == InvalidXLogRecPtr)
{
wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp);
}
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
}
pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
/*
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so
* it should never be zero at this point, if we know timelineStartLsn.
*
* timelineStartLsn can be zero only on the first syncSafekeepers run.
*/
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
(wp->config->syncSafekeepers && wp->truncateLsn == wp->timelineStartLsn));
Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers);
/*
* We will be generating WAL since propEpochStartLsn, so we should set
@@ -1052,10 +1044,11 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
* However, allow to proceed if last_log_term on the node which gave
* the highest vote (i.e. point where we are going to start writing)
* actually had been won by me; plain restart of walproposer not
* intervened by concurrent compute which wrote WAL is ok.
* However, allow to proceed if last_log_term on the node which
* gave the highest vote (i.e. point where we are going to start
* writing) actually had been won by me; plain restart of
* walproposer not intervened by concurrent compute which wrote
* WAL is ok.
*
* This avoids compute crash after manual term_bump.
*/
@@ -1125,14 +1118,8 @@ SendProposerElected(Safekeeper *sk)
{
/* safekeeper is empty or no common point, start from the beginning */
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
/*
* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
* is created manually (test_s3_wal_replay)
*/
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), wp->propTermHistory.n_entries);
}
else
{
@@ -1157,29 +1144,19 @@ SendProposerElected(Safekeeper *sk)
Assert(sk->startStreamingAt <= wp->availableLsn);
msg.tag = 'e';
msg.apm.tag = 'e';
msg.generation = wp->mconf.generation;
msg.term = wp->propTerm;
msg.startStreamingAt = sk->startStreamingAt;
msg.termHistory = &wp->propTermHistory;
msg.timelineStartLsn = wp->timelineStartLsn;
lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0;
wp_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
resetStringInfo(&sk->outbuf);
pq_sendint64_le(&sk->outbuf, msg.tag);
pq_sendint64_le(&sk->outbuf, msg.term);
pq_sendint64_le(&sk->outbuf, msg.startStreamingAt);
pq_sendint32_le(&sk->outbuf, msg.termHistory->n_entries);
for (int i = 0; i < msg.termHistory->n_entries; i++)
{
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].term);
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].lsn);
}
pq_sendint64_le(&sk->outbuf, msg.timelineStartLsn);
"sending elected msg to node " UINT64_FORMAT " generation=%u term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s",
sk->greetResponse.nodeId, msg.generation, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt),
lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port);
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version);
if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH))
return;
@@ -1245,14 +1222,13 @@ static void
PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn)
{
Assert(endLsn >= beginLsn);
req->tag = 'a';
req->apm.tag = 'a';
req->generation = wp->mconf.generation;
req->term = wp->propTerm;
req->epochStartLsn = wp->propEpochStartLsn;
req->beginLsn = beginLsn;
req->endLsn = endLsn;
req->commitLsn = wp->commitLsn;
req->truncateLsn = wp->truncateLsn;
req->proposerId = wp->greetRequest.proposerId;
}
/*
@@ -1353,7 +1329,8 @@ SendAppendRequests(Safekeeper *sk)
resetStringInfo(&sk->outbuf);
/* write AppendRequest header */
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
PAMessageSerialize(wp, (ProposerAcceptorMessage *) req, &sk->outbuf, wp->config->proto_version);
/* prepare for reading WAL into the outbuf */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
sk->active_state = SS_ACTIVE_READ_WAL;
}
@@ -1366,14 +1343,17 @@ SendAppendRequests(Safekeeper *sk)
req = &sk->appendRequest;
req_len = req->endLsn - req->beginLsn;
/* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */
/*
* We send zero sized AppenRequests as heartbeats; don't wal_read
* for these.
*/
if (req_len > 0)
{
switch (wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req_len,
&errmsg))
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req_len,
&errmsg))
{
case NEON_WALREAD_SUCCESS:
break;
@@ -1381,7 +1361,7 @@ SendAppendRequests(Safekeeper *sk)
return true;
case NEON_WALREAD_ERROR:
wp_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port, errmsg);
sk->host, sk->port, errmsg);
ShutdownConnection(sk);
return false;
default:
@@ -1469,11 +1449,11 @@ RecvAppendResponses(Safekeeper *sk)
* Term has changed to higher one, probably another compute is
* running. If this is the case we could PANIC as well because
* likely it inserted some data and our basebackup is unsuitable
* anymore. However, we also bump term manually (term_bump endpoint)
* on safekeepers for migration purposes, in this case we do want
* compute to stay alive. So restart walproposer with FATAL instead
* of panicking; if basebackup is spoiled next election will notice
* this.
* anymore. However, we also bump term manually (term_bump
* endpoint) on safekeepers for migration purposes, in this case
* we do want compute to stay alive. So restart walproposer with
* FATAL instead of panicking; if basebackup is spoiled next
* election will notice this.
*/
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
@@ -1749,6 +1729,208 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
}
}
/* Serialize MembershipConfiguration into buf. */
static void
MembershipConfigurationSerialize(MembershipConfiguration *mconf, StringInfo buf)
{
uint32 i;
pq_sendint32(buf, mconf->generation);
pq_sendint32(buf, mconf->members.len);
for (i = 0; i < mconf->members.len; i++)
{
pq_sendint64(buf, mconf->members.m[i].node_id);
pq_send_ascii_string(buf, mconf->members.m[i].host);
pq_sendint16(buf, mconf->members.m[i].port);
}
/*
* There is no special mark for absent new_members; zero members in
* invalid, so zero len means absent.
*/
pq_sendint32(buf, mconf->new_members.len);
for (i = 0; i < mconf->new_members.len; i++)
{
pq_sendint64(buf, mconf->new_members.m[i].node_id);
pq_send_ascii_string(buf, mconf->new_members.m[i].host);
pq_sendint16(buf, mconf->new_members.m[i].port);
}
}
/* Serialize proposer -> acceptor message into buf using specified version */
static void
PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version)
{
/* both version are supported currently until we fully migrate to 3 */
Assert(proto_version == 3 || proto_version == 2);
resetStringInfo(buf);
if (proto_version == 3)
{
/*
* v2 sends structs for some messages as is, so commonly send tag only
* for v3
*/
pq_sendint8(buf, msg->tag);
switch (msg->tag)
{
case 'g':
{
ProposerGreeting *m = (ProposerGreeting *) msg;
pq_send_ascii_string(buf, m->tenant_id);
pq_send_ascii_string(buf, m->timeline_id);
MembershipConfigurationSerialize(&m->mconf, buf);
pq_sendint32(buf, m->pg_version);
pq_sendint64(buf, m->system_id);
pq_sendint32(buf, m->wal_seg_size);
break;
}
case 'v':
{
VoteRequest *m = (VoteRequest *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
break;
}
case 'e':
{
ProposerElected *m = (ProposerElected *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
pq_sendint64(buf, m->startStreamingAt);
pq_sendint32(buf, m->termHistory->n_entries);
for (uint32 i = 0; i < m->termHistory->n_entries; i++)
{
pq_sendint64(buf, m->termHistory->entries[i].term);
pq_sendint64(buf, m->termHistory->entries[i].lsn);
}
break;
}
case 'a':
{
/*
* Note: this serializes only AppendRequestHeader, caller
* is expected to append WAL data later.
*/
AppendRequestHeader *m = (AppendRequestHeader *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
pq_sendint64(buf, m->beginLsn);
pq_sendint64(buf, m->endLsn);
pq_sendint64(buf, m->commitLsn);
pq_sendint64(buf, m->truncateLsn);
break;
}
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
return;
}
if (proto_version == 2)
{
switch (msg->tag)
{
case 'g':
{
/* v2 sent struct as is */
ProposerGreeting *m = (ProposerGreeting *) msg;
ProposerGreetingV2 greetRequestV2;
/* Fill also v2 struct. */
greetRequestV2.tag = 'g';
greetRequestV2.protocolVersion = proto_version;
greetRequestV2.pgVersion = m->pg_version;
/*
* v3 removed this field because it's easier to pass as
* libq or START_WAL_PUSH options
*/
memset(&greetRequestV2.proposerId, 0, sizeof(greetRequestV2.proposerId));
greetRequestV2.systemId = wp->config->systemId;
if (*m->timeline_id != '\0' &&
!HexDecodeString(greetRequestV2.timeline_id, m->timeline_id, 16))
wp_log(FATAL, "could not parse neon.timeline_id, %s", m->timeline_id);
if (*m->tenant_id != '\0' &&
!HexDecodeString(greetRequestV2.tenant_id, m->tenant_id, 16))
wp_log(FATAL, "could not parse neon.tenant_id, %s", m->tenant_id);
greetRequestV2.timeline = wp->config->pgTimeline;
greetRequestV2.walSegSize = wp->config->wal_segment_size;
pq_sendbytes(buf, (char *) &greetRequestV2, sizeof(greetRequestV2));
break;
}
case 'v':
{
/* v2 sent struct as is */
VoteRequest *m = (VoteRequest *) msg;
VoteRequestV2 voteRequestV2;
voteRequestV2.tag = m->pam.tag;
voteRequestV2.term = m->term;
/* removed field */
memset(&voteRequestV2.proposerId, 0, sizeof(voteRequestV2.proposerId));
pq_sendbytes(buf, (char *) &voteRequestV2, sizeof(voteRequestV2));
break;
}
case 'e':
{
ProposerElected *m = (ProposerElected *) msg;
pq_sendint64_le(buf, m->apm.tag);
pq_sendint64_le(buf, m->term);
pq_sendint64_le(buf, m->startStreamingAt);
pq_sendint32_le(buf, m->termHistory->n_entries);
for (int i = 0; i < m->termHistory->n_entries; i++)
{
pq_sendint64_le(buf, m->termHistory->entries[i].term);
pq_sendint64_le(buf, m->termHistory->entries[i].lsn);
}
pq_sendint64_le(buf, 0); /* removed timeline_start_lsn */
break;
}
case 'a':
/*
* Note: this serializes only AppendRequestHeader, caller is
* expected to append WAL data later.
*/
{
/* v2 sent struct as is */
AppendRequestHeader *m = (AppendRequestHeader *) msg;
AppendRequestHeaderV2 appendRequestHeaderV2;
appendRequestHeaderV2.tag = m->apm.tag;
appendRequestHeaderV2.term = m->term;
appendRequestHeaderV2.epochStartLsn = 0; /* removed field */
appendRequestHeaderV2.beginLsn = m->beginLsn;
appendRequestHeaderV2.endLsn = m->endLsn;
appendRequestHeaderV2.commitLsn = m->commitLsn;
appendRequestHeaderV2.truncateLsn = m->truncateLsn;
/* removed field */
memset(&appendRequestHeaderV2.proposerId, 0, sizeof(appendRequestHeaderV2.proposerId));
pq_sendbytes(buf, (char *) &appendRequestHeaderV2, sizeof(appendRequestHeaderV2));
break;
}
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
return;
}
wp_log(FATAL, "unexpected proto_version %d", proto_version);
}
/*
* Try to read CopyData message from i'th safekeeper, resetting connection on
* failure.
@@ -1778,6 +1960,37 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
return false;
}
/* Deserialize membership configuration from buf to mconf. */
static void
MembershipConfigurationDeserialize(MembershipConfiguration *mconf, StringInfo buf)
{
uint32 i;
mconf->generation = pq_getmsgint32(buf);
mconf->members.len = pq_getmsgint32(buf);
mconf->members.m = palloc0(sizeof(SafekeeperId) * mconf->members.len);
for (i = 0; i < mconf->members.len; i++)
{
const char *buf_host;
mconf->members.m[i].node_id = pq_getmsgint64(buf);
buf_host = pq_getmsgrawstring(buf);
strlcpy(mconf->members.m[i].host, buf_host, sizeof(mconf->members.m[i].host));
mconf->members.m[i].port = pq_getmsgint16(buf);
}
mconf->new_members.len = pq_getmsgint32(buf);
mconf->new_members.m = palloc0(sizeof(SafekeeperId) * mconf->new_members.len);
for (i = 0; i < mconf->new_members.len; i++)
{
const char *buf_host;
mconf->new_members.m[i].node_id = pq_getmsgint64(buf);
buf_host = pq_getmsgrawstring(buf);
strlcpy(mconf->new_members.m[i].host, buf_host, sizeof(mconf->new_members.m[i].host));
mconf->new_members.m[i].port = pq_getmsgint16(buf);
}
}
/*
* Read next message with known type into provided struct, by reading a CopyData
* block from the safekeeper's postgres connection, returning whether the read
@@ -1786,6 +1999,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
* If the read needs more polling, we return 'false' and keep the state
* unmodified, waiting until it becomes read-ready to try again. If it fully
* failed, a warning is emitted and the connection is reset.
*
* Note: it pallocs if needed, i.e. for AcceptorGreeting and VoteResponse fields.
*/
static bool
AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
@@ -1794,82 +2009,153 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
char *buf;
int buf_size;
uint64 tag;
uint8 tag;
StringInfoData s;
if (!(AsyncRead(sk, &buf, &buf_size)))
return false;
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
/* parse it */
s.data = buf;
s.len = buf_size;
s.maxlen = buf_size;
s.cursor = 0;
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
if (wp->config->proto_version == 3)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
switch (tag)
{
case 'g':
{
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->nodeId = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->voteGiven = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->truncateLsn = pq_getmsgint64_le(&s);
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (int i = 0; i < msg->termHistory.n_entries; i++)
tag = pq_getmsgbyte(&s);
if (tag != anymsg->tag)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
switch (tag)
{
case 'g':
{
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->nodeId = pq_getmsgint64(&s);
MembershipConfigurationDeserialize(&msg->mconf, &s);
msg->term = pq_getmsgint64(&s);
pq_getmsgend(&s);
return true;
}
msg->timelineStartLsn = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->generation = pq_getmsgint32(&s);
msg->term = pq_getmsgint64(&s);
msg->voteGiven = pq_getmsgbyte(&s);
msg->flushLsn = pq_getmsgint64(&s);
msg->truncateLsn = pq_getmsgint64(&s);
msg->termHistory.n_entries = pq_getmsgint32(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (uint32 i = 0; i < msg->termHistory.n_entries; i++)
{
msg->termHistory.entries[i].term = pq_getmsgint64(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64(&s);
}
pq_getmsgend(&s);
return true;
}
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->commitLsn = pq_getmsgint64_le(&s);
msg->hs.ts = pq_getmsgint64_le(&s);
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
Assert(false);
return false;
}
msg->generation = pq_getmsgint32(&s);
msg->term = pq_getmsgint64(&s);
msg->flushLsn = pq_getmsgint64(&s);
msg->commitLsn = pq_getmsgint64(&s);
msg->hs.ts = pq_getmsgint64(&s);
msg->hs.xmin.value = pq_getmsgint64(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
return false;
}
}
}
else if (wp->config->proto_version == 2)
{
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
switch (tag)
{
case 'g':
{
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->nodeId = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->voteGiven = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->truncateLsn = pq_getmsgint64_le(&s);
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (int i = 0; i < msg->termHistory.n_entries; i++)
{
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
}
pq_getmsgint64_le(&s); /* timelineStartLsn */
pq_getmsgend(&s);
return true;
}
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->commitLsn = pq_getmsgint64_le(&s);
msg->hs.ts = pq_getmsgint64_le(&s);
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
return false;
}
}
}
wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version);
}
/*
@@ -2245,3 +2531,45 @@ FormatEvents(WalProposer *wp, uint32 events)
return (char *) &return_str;
}
/* Dump mconf as toml for observability / debugging. Result is palloc'ed. */
static char *
MembershipConfigurationToString(MembershipConfiguration *mconf)
{
StringInfoData s;
uint32 i;
initStringInfo(&s);
appendStringInfo(&s, "{gen = %u", mconf->generation);
appendStringInfoString(&s, ", members = [");
for (i = 0; i < mconf->members.len; i++)
{
if (i > 0)
appendStringInfoString(&s, ", ");
appendStringInfo(&s, "{node_id = %lu", mconf->members.m[i].node_id);
appendStringInfo(&s, ", host = %s", mconf->members.m[i].host);
appendStringInfo(&s, ", port = %u }", mconf->members.m[i].port);
}
appendStringInfo(&s, "], new_members = [");
for (i = 0; i < mconf->new_members.len; i++)
{
if (i > 0)
appendStringInfoString(&s, ", ");
appendStringInfo(&s, "{node_id = %lu", mconf->new_members.m[i].node_id);
appendStringInfo(&s, ", host = %s", mconf->new_members.m[i].host);
appendStringInfo(&s, ", port = %u }", mconf->new_members.m[i].port);
}
appendStringInfoString(&s, "]}");
return s.data;
}
static void
MembershipConfigurationFree(MembershipConfiguration *mconf)
{
if (mconf->members.m)
pfree(mconf->members.m);
mconf->members.m = NULL;
if (mconf->new_members.m)
pfree(mconf->new_members.m);
mconf->new_members.m = NULL;
}

View File

@@ -12,9 +12,6 @@
#include "neon_walreader.h"
#include "pagestore_client.h"
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
* message */
@@ -143,12 +140,71 @@ typedef uint64 term_t;
/* neon storage node id */
typedef uint64 NNodeId;
/*
* Number uniquely identifying safekeeper membership configuration.
* This and following structs pair ones in membership.rs.
*/
typedef uint32 Generation;
typedef struct SafekeeperId
{
NNodeId node_id;
char host[MAXCONNINFO];
uint16 port;
} SafekeeperId;
/* Set of safekeepers. */
typedef struct MemberSet
{
uint32 len; /* number of members */
SafekeeperId *m; /* ids themselves */
} MemberSet;
/* Timeline safekeeper membership configuration. */
typedef struct MembershipConfiguration
{
Generation generation;
MemberSet members;
/* Has 0 n_members in non joint conf. */
MemberSet new_members;
} MembershipConfiguration;
/*
* Proposer <-> Acceptor messaging.
*/
typedef struct ProposerAcceptorMessage
{
uint8 tag;
} ProposerAcceptorMessage;
/* Initial Proposer -> Acceptor message */
typedef struct ProposerGreeting
{
ProposerAcceptorMessage pam; /* message tag */
/*
* tenant/timeline ids as C strings with standard hex notation for ease of
* printing. In principle they are not strictly needed as ttid is also
* passed as libpq options.
*/
char *tenant_id;
char *timeline_id;
/* Full conf is carried to allow safekeeper switch */
MembershipConfiguration mconf;
/*
* pg_version and wal_seg_size are used for timeline creation until we
* fully migrate to doing externally. systemId is only used as a sanity
* cross check.
*/
uint32 pg_version; /* in PG_VERSION_NUM format */
uint64 system_id; /* Postgres system identifier. */
uint32 wal_seg_size;
} ProposerGreeting;
/* protocol v2 variant, kept while wp supports it */
typedef struct ProposerGreetingV2
{
uint64 tag; /* message tag */
uint32 protocolVersion; /* proposer-safekeeper protocol version */
@@ -159,32 +215,42 @@ typedef struct ProposerGreeting
uint8 tenant_id[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
} ProposerGreetingV2;
typedef struct AcceptorProposerMessage
{
uint64 tag;
uint8 tag;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
* Acceptor -> Proposer initial response: the highest term acceptor voted for,
* its node id and configuration.
*/
typedef struct AcceptorGreeting
{
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
MembershipConfiguration mconf;
term_t term;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
*/
typedef struct VoteRequest
{
ProposerAcceptorMessage pam; /* message tag */
Generation generation; /* membership conf generation */
term_t term;
} VoteRequest;
/* protocol v2 variant, kept while wp supports it */
typedef struct VoteRequestV2
{
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
} VoteRequestV2;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
@@ -203,8 +269,15 @@ typedef struct TermHistory
typedef struct VoteResponse
{
AcceptorProposerMessage apm;
/*
* Membership conf generation. It's not strictly required because on
* mismatch safekeeper is expected to ERROR the connection, but let's
* sanity check it.
*/
Generation generation;
term_t term;
uint64 voteGiven;
uint8 voteGiven;
/*
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
@@ -214,7 +287,6 @@ typedef struct VoteResponse
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for*
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
/*
@@ -223,20 +295,37 @@ typedef struct VoteResponse
*/
typedef struct ProposerElected
{
uint64 tag;
AcceptorProposerMessage apm;
Generation generation; /* membership conf generation */
term_t term;
/* proposer will send since this point */
XLogRecPtr startStreamingAt;
/* history of term switches up to this proposer */
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
*/
typedef struct AppendRequestHeader
{
AcceptorProposerMessage apm;
Generation generation; /* membership conf generation */
term_t term; /* term of the proposer */
XLogRecPtr beginLsn; /* start position of message in WAL */
XLogRecPtr endLsn; /* end position of message in WAL */
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
/*
* minimal LSN which may be needed for recovery of some safekeeper (end
* lsn + 1 of last chunk streamed to everyone)
*/
XLogRecPtr truncateLsn;
/* in the AppendRequest message, WAL data follows */
} AppendRequestHeader;
/* protocol v2 variant, kept while wp supports it */
typedef struct AppendRequestHeaderV2
{
uint64 tag;
term_t term; /* term of the proposer */
@@ -256,7 +345,8 @@ typedef struct AppendRequestHeader
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
/* in the AppendRequest message, WAL data follows */
} AppendRequestHeaderV2;
/*
* Hot standby feedback received from replica
@@ -309,6 +399,13 @@ typedef struct AppendResponse
{
AcceptorProposerMessage apm;
/*
* Membership conf generation. It's not strictly required because on
* mismatch safekeeper is expected to ERROR the connection, but let's
* sanity check it.
*/
Generation generation;
/*
* Current term of the safekeeper; if it is higher than proposer's, the
* compute is out of date.
@@ -644,6 +741,8 @@ typedef struct WalProposerConfig
/* Will be passed to safekeepers in greet request. */
TimeLineID pgTimeline;
int proto_version;
#ifdef WALPROPOSER_LIB
void *callback_data;
#endif
@@ -656,11 +755,14 @@ typedef struct WalProposerConfig
typedef struct WalProposer
{
WalProposerConfig *config;
int n_safekeepers;
/* Current walproposer membership configuration */
MembershipConfiguration mconf;
/* (n_safekeepers / 2) + 1 */
int quorum;
/* Number of occupied slots in safekeepers[] */
int n_safekeepers;
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
@@ -670,6 +772,7 @@ typedef struct WalProposer
XLogRecPtr commitLsn;
ProposerGreeting greetRequest;
ProposerGreetingV2 greetRequestV2;
/* Vote request for safekeeper */
VoteRequest voteRequest;

View File

@@ -155,6 +155,16 @@ pq_getmsgend(StringInfo msg)
ExceptionalCondition("invalid msg format", __FILE__, __LINE__);
}
/* --------------------------------
* pq_sendbytes - append raw data to a StringInfo buffer
* --------------------------------
*/
void
pq_sendbytes(StringInfo buf, const void *data, int datalen)
{
/* use variant that maintains a trailing null-byte, out of caution */
appendBinaryStringInfo(buf, data, datalen);
}
/*
* Produce a C-string representation of a TimestampTz.

View File

@@ -59,9 +59,11 @@
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
/* GUCs */
char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
int safekeeper_proto_version = 2;
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
@@ -126,6 +128,7 @@ init_walprop_config(bool syncSafekeepers)
else
walprop_config.systemId = 0;
walprop_config.pgTimeline = walprop_pg_get_timeline_id();
walprop_config.proto_version = safekeeper_proto_version;
}
/*
@@ -219,25 +222,37 @@ nwp_register_gucs(void)
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable(
"neon.safekeeper_proto_version",
"Version of compute <-> safekeeper protocol.",
"Used while migrating from 2 to 3.",
&safekeeper_proto_version,
2, 0, INT_MAX,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
}
static int
split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
{
int n_safekeepers = 0;
char *curr_sk = safekeepers_list;
int n_safekeepers = 0;
char *curr_sk = safekeepers_list;
for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma)
{
if (++n_safekeepers >= MAX_SAFEKEEPERS) {
if (++n_safekeepers >= MAX_SAFEKEEPERS)
{
wpg_log(FATAL, "too many safekeepers");
}
coma = strchr(coma, ',');
safekeepers[n_safekeepers-1] = curr_sk;
safekeepers[n_safekeepers - 1] = curr_sk;
if (coma != NULL) {
if (coma != NULL)
{
*coma++ = '\0';
}
}
@@ -252,10 +267,10 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
static bool
safekeepers_cmp(char *old, char *new)
{
char *safekeepers_old[MAX_SAFEKEEPERS];
char *safekeepers_new[MAX_SAFEKEEPERS];
int len_old = 0;
int len_new = 0;
char *safekeepers_old[MAX_SAFEKEEPERS];
char *safekeepers_new[MAX_SAFEKEEPERS];
int len_old = 0;
int len_new = 0;
len_old = split_safekeepers_list(old, safekeepers_old);
len_new = split_safekeepers_list(new, safekeepers_new);
@@ -292,7 +307,8 @@ assign_neon_safekeepers(const char *newval, void *extra)
if (!am_walproposer)
return;
if (!newval) {
if (!newval)
{
/* should never happen */
wpg_log(FATAL, "neon.safekeepers is empty");
}
@@ -301,11 +317,11 @@ assign_neon_safekeepers(const char *newval, void *extra)
newval_copy = pstrdup(newval);
oldval = pstrdup(wal_acceptors_list);
/*
/*
* TODO: restarting through FATAL is stupid and introduces 1s delay before
* next bgw start. We should refactor walproposer to allow graceful exit and
* thus remove this delay.
* XXX: If you change anything here, sync with test_safekeepers_reconfigure_reorder.
* next bgw start. We should refactor walproposer to allow graceful exit
* and thus remove this delay. XXX: If you change anything here, sync with
* test_safekeepers_reconfigure_reorder.
*/
if (!safekeepers_cmp(oldval, newval_copy))
{
@@ -454,7 +470,8 @@ backpressure_throttling_impl(void)
memcpy(new_status, old_status, len);
snprintf(new_status + len, 64, "backpressure throttling: lag %lu", lag);
set_ps_display(new_status);
new_status[len] = '\0'; /* truncate off " backpressure ..." to later reset the ps */
new_status[len] = '\0'; /* truncate off " backpressure ..." to later
* reset the ps */
elog(DEBUG2, "backpressure throttling: lag %lu", lag);
start = GetCurrentTimestamp();
@@ -621,7 +638,7 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos)
wpg_log(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
cmd.timeline = wp->greetRequest.timeline;
cmd.timeline = wp->config->pgTimeline;
cmd.startpoint = startpos;
StartProposerReplication(wp, &cmd);
}
@@ -1963,10 +1980,11 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
FullTransactionId xmin = hsFeedback.xmin;
FullTransactionId catalog_xmin = hsFeedback.catalog_xmin;
FullTransactionId next_xid = ReadNextFullTransactionId();
/*
* Page server is updating nextXid in checkpoint each 1024 transactions,
* so feedback xmin can be actually larger then nextXid and
* function TransactionIdInRecentPast return false in this case,
* Page server is updating nextXid in checkpoint each 1024
* transactions, so feedback xmin can be actually larger then nextXid
* and function TransactionIdInRecentPast return false in this case,
* preventing update of slot's xmin.
*/
if (FullTransactionIdPrecedes(next_xid, xmin))

View File

@@ -52,16 +52,70 @@ pub struct SafekeeperPostgresHandler {
/// Parsed Postgres command.
enum SafekeeperPostgresCommand {
StartWalPush,
StartReplication { start_lsn: Lsn, term: Option<Term> },
StartWalPush {
proto_version: u32,
// Eventually timelines will be always created explicitly by storcon.
// This option allows legacy behaviour for compute to do that until we
// fully migrate.
allow_timeline_creation: bool,
},
StartReplication {
start_lsn: Lsn,
term: Option<Term>,
},
IdentifySystem,
TimelineStatus,
JSONCtrl { cmd: AppendLogicalMessage },
JSONCtrl {
cmd: AppendLogicalMessage,
},
}
fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
if cmd.starts_with("START_WAL_PUSH") {
Ok(SafekeeperPostgresCommand::StartWalPush)
// Allow additional options in postgres START_REPLICATION style like
// START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false').
// Parsing here is very naive and breaks in case of commas or
// whitespaces in values, but enough for our purposes.
let re = Regex::new(r"START_WAL_PUSH(\s+?\((.*)\))?").unwrap();
let caps = re
.captures(cmd)
.context(format!("failed to parse START_WAL_PUSH command {}", cmd))?;
// capture () content
let options = caps.get(2).map(|m| m.as_str()).unwrap_or("");
// default values
let mut proto_version = 2;
let mut allow_timeline_creation = true;
for kvstr in options.split(",") {
if kvstr.is_empty() {
continue;
}
let mut kvit = kvstr.split_whitespace();
let key = kvit.next().context(format!(
"failed to parse key in kv {} in command {}",
kvstr, cmd
))?;
let value = kvit.next().context(format!(
"failed to parse value in kv {} in command {}",
kvstr, cmd
))?;
let value_trimmed = value.trim_matches('\'');
if key == "proto_version" {
proto_version = value_trimmed.parse::<u32>().context(format!(
"failed to parse proto_version value {} in command {}",
value, cmd
))?;
}
if key == "allow_timeline_creation" {
allow_timeline_creation = value_trimmed.parse::<bool>().context(format!(
"failed to parse allow_timeline_creation value {} in command {}",
value, cmd
))?;
}
}
Ok(SafekeeperPostgresCommand::StartWalPush {
proto_version,
allow_timeline_creation,
})
} else if cmd.starts_with("START_REPLICATION") {
let re = Regex::new(
// We follow postgres START_REPLICATION LOGICAL options to pass term.
@@ -95,7 +149,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
match cmd {
SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
SafekeeperPostgresCommand::StartWalPush { .. } => "START_WAL_PUSH",
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS",
SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
@@ -293,8 +347,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
SafekeeperPostgresCommand::StartWalPush {
proto_version,
allow_timeline_creation,
} => {
self.handle_start_wal_push(pgb, proto_version, allow_timeline_creation)
.instrument(info_span!("WAL receiver"))
.await
}
@@ -467,3 +524,39 @@ impl SafekeeperPostgresHandler {
}
}
}
#[cfg(test)]
mod tests {
use super::SafekeeperPostgresCommand;
/// Test parsing of START_WAL_PUSH command
#[test]
fn test_start_wal_push_parse() {
let cmd = "START_WAL_PUSH";
let parsed = super::parse_cmd(cmd).expect("failed to parse");
match parsed {
SafekeeperPostgresCommand::StartWalPush {
proto_version,
allow_timeline_creation,
} => {
assert_eq!(proto_version, 2);
assert!(allow_timeline_creation);
}
_ => panic!("unexpected command"),
}
let cmd =
"START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false', unknown 'hoho')";
let parsed = super::parse_cmd(cmd).expect("failed to parse");
match parsed {
SafekeeperPostgresCommand::StartWalPush {
proto_version,
allow_timeline_creation,
} => {
assert_eq!(proto_version, 3);
assert!(!allow_timeline_creation);
}
_ => panic!("unexpected command"),
}
}
}

View File

@@ -8,7 +8,7 @@
use anyhow::Context;
use postgres_backend::QueryError;
use safekeeper_api::membership::Configuration;
use safekeeper_api::membership::{Configuration, INVALID_GENERATION};
use safekeeper_api::{ServerInfo, Term};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -133,10 +133,10 @@ async fn send_proposer_elected(
let history = TermHistory(history_entries);
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
generation: INVALID_GENERATION,
term,
start_streaming_at: lsn,
term_history: history,
timeline_start_lsn: lsn,
});
tli.process_msg(&proposer_elected_request).await?;
@@ -170,13 +170,12 @@ pub async fn append_logical_message(
let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
h: AppendRequestHeader {
generation: INVALID_GENERATION,
term: msg.term,
term_start_lsn: begin_lsn,
begin_lsn,
end_lsn,
commit_lsn,
truncate_lsn: msg.truncate_lsn,
proposer_uuid: [0u8; 16],
},
wal_data,
});

View File

@@ -200,9 +200,14 @@ impl SafekeeperPostgresHandler {
pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
proto_version: u32,
allow_timeline_creation: bool,
) -> Result<(), QueryError> {
let mut tli: Option<WalResidentTimeline> = None;
if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await {
if let Err(end) = self
.handle_start_wal_push_guts(pgb, &mut tli, proto_version, allow_timeline_creation)
.await
{
// Log the result and probably send it to the client, closing the stream.
let handle_end_fut = pgb.handle_copy_stream_end(end);
// If we managed to create the timeline, augment logging with current LSNs etc.
@@ -222,6 +227,8 @@ impl SafekeeperPostgresHandler {
&mut self,
pgb: &mut PostgresBackend<IO>,
tli: &mut Option<WalResidentTimeline>,
proto_version: u32,
allow_timeline_creation: bool,
) -> Result<(), CopyStreamHandlerEnd> {
// The `tli` parameter is only used for passing _out_ a timeline, one should
// not have been passed in.
@@ -250,12 +257,17 @@ impl SafekeeperPostgresHandler {
conn_id: self.conn_id,
pgb_reader: &mut pgb_reader,
peer_addr,
proto_version,
acceptor_handle: &mut acceptor_handle,
global_timelines: self.global_timelines.clone(),
};
// Read first message and create timeline if needed.
let res = network_reader.read_first_message().await;
// Read first message and create timeline if needed and allowed. This
// won't be when timelines will be always created by storcon and
// allow_timeline_creation becomes false.
let res = network_reader
.read_first_message(allow_timeline_creation)
.await;
let network_res = if let Ok((timeline, next_msg)) = res {
let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
@@ -269,7 +281,7 @@ impl SafekeeperPostgresHandler {
tokio::select! {
// todo: add read|write .context to these errors
r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
r = network_write(pgb, reply_rx, pageserver_feedback_rx, proto_version) => r,
_ = timeline_cancel.cancelled() => {
return Err(CopyStreamHandlerEnd::Cancelled);
}
@@ -313,6 +325,7 @@ struct NetworkReader<'a, IO> {
conn_id: ConnectionId,
pgb_reader: &'a mut PostgresBackendReader<IO>,
peer_addr: SocketAddr,
proto_version: u32,
// WalAcceptor is spawned when we learn server info from walproposer and
// create timeline; handle is put here.
acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
@@ -322,31 +335,37 @@ struct NetworkReader<'a, IO> {
impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
async fn read_first_message(
&mut self,
allow_timeline_creation: bool,
) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
// Receive information about server to create timeline, if not yet.
let next_msg = read_message(self.pgb_reader).await?;
let next_msg = read_message(self.pgb_reader, self.proto_version).await?;
let tli = match next_msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with walproposer {} sysid {} timeline {}",
self.peer_addr, greeting.system_id, greeting.tli,
"start handshake with walproposer {} sysid {}",
self.peer_addr, greeting.system_id,
);
let server_info = ServerInfo {
pg_version: greeting.pg_version,
system_id: greeting.system_id,
wal_seg_size: greeting.wal_seg_size,
};
let tli = self
.global_timelines
.create(
self.ttid,
Configuration::empty(),
server_info,
Lsn::INVALID,
Lsn::INVALID,
)
.await
.context("create timeline")?;
let tli = if allow_timeline_creation {
self.global_timelines
.create(
self.ttid,
Configuration::empty(),
server_info,
Lsn::INVALID,
Lsn::INVALID,
)
.await
.context("create timeline")?
} else {
self.global_timelines
.get(self.ttid)
.context("get timeline")?
};
tli.wal_residence_guard().await?
}
_ => {
@@ -375,7 +394,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
));
// Forward all messages to WalAcceptor
read_network_loop(self.pgb_reader, msg_tx, next_msg).await
read_network_loop(self.pgb_reader, msg_tx, next_msg, self.proto_version).await
}
}
@@ -383,9 +402,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
/// TODO: Return Ok(None) on graceful termination.
async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_reader: &mut PostgresBackendReader<IO>,
proto_version: u32,
) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
let copy_data = pgb_reader.read_copy_message().await?;
let msg = ProposerAcceptorMessage::parse(copy_data)?;
let msg = ProposerAcceptorMessage::parse(copy_data, proto_version)?;
Ok(msg)
}
@@ -393,6 +413,7 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_reader: &mut PostgresBackendReader<IO>,
msg_tx: Sender<ProposerAcceptorMessage>,
mut next_msg: ProposerAcceptorMessage,
proto_version: u32,
) -> Result<(), CopyStreamHandlerEnd> {
/// Threshold for logging slow WalAcceptor sends.
const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
@@ -425,7 +446,7 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);
next_msg = read_message(pgb_reader).await?;
next_msg = read_message(pgb_reader, proto_version).await?;
}
}
@@ -438,6 +459,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_writer: &mut PostgresBackend<IO>,
mut reply_rx: Receiver<AcceptorProposerMessage>,
mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
proto_version: u32,
) -> Result<(), CopyStreamHandlerEnd> {
let mut buf = BytesMut::with_capacity(128);
@@ -475,7 +497,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
};
buf.clear();
msg.serialize(&mut buf)?;
msg.serialize(&mut buf, proto_version)?;
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
}
}

View File

@@ -7,6 +7,7 @@ use std::{fmt, pin::pin};
use anyhow::{bail, Context};
use futures::StreamExt;
use postgres_protocol::message::backend::ReplicationMessage;
use safekeeper_api::membership::INVALID_GENERATION;
use safekeeper_api::models::{PeerInfo, TimelineStatus};
use safekeeper_api::Term;
use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -267,7 +268,10 @@ async fn recover(
);
// Now understand our term history.
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
generation: INVALID_GENERATION,
term: donor.term,
});
let vote_response = match tli
.process_msg(&vote_request)
.await
@@ -302,10 +306,10 @@ async fn recover(
// truncate WAL locally
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
generation: INVALID_GENERATION,
term: donor.term,
start_streaming_at: last_common_point.lsn,
term_history: donor_th,
timeline_start_lsn: Lsn::INVALID,
});
// Successful ProposerElected handling always returns None. If term changed,
// we'll find out that during the streaming. Note: it is expected to get
@@ -434,13 +438,12 @@ async fn network_io(
match msg {
ReplicationMessage::XLogData(xlog_data) => {
let ar_hdr = AppendRequestHeader {
generation: INVALID_GENERATION,
term: donor.term,
term_start_lsn: Lsn::INVALID, // unused
begin_lsn: Lsn(xlog_data.wal_start()),
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
truncate_lsn: Lsn::INVALID, // do not attempt to advance
proposer_uuid: [0; 16],
};
let ar = AppendRequest {
h: ar_hdr,

View File

@@ -5,6 +5,11 @@ use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
use safekeeper_api::membership;
use safekeeper_api::membership::Generation;
use safekeeper_api::membership::MemberSet;
use safekeeper_api::membership::SafekeeperId;
use safekeeper_api::membership::INVALID_GENERATION;
use safekeeper_api::models::HotStandbyFeedback;
use safekeeper_api::Term;
use serde::{Deserialize, Serialize};
@@ -12,6 +17,7 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use std::str::FromStr;
use storage_broker::proto::SafekeeperTimelineInfo;
use tracing::*;
@@ -29,7 +35,8 @@ use utils::{
lsn::Lsn,
};
const SK_PROTOCOL_VERSION: u32 = 2;
pub const SK_PROTO_VERSION_2: u32 = 2;
pub const SK_PROTO_VERSION_3: u32 = 3;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
@@ -56,8 +63,28 @@ impl TermHistory {
TermHistory(Vec::new())
}
// Parse TermHistory as n_entries followed by TermLsn pairs
// Parse TermHistory as n_entries followed by TermLsn pairs in network order.
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
let n_entries = bytes
.get_u32_f()
.with_context(|| "TermHistory misses len")?;
let mut res = Vec::with_capacity(n_entries as usize);
for i in 0..n_entries {
let term = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses term", i))?;
let lsn = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses lsn", i))?
.into();
res.push(TermLsn { term, lsn })
}
Ok(TermHistory(res))
}
// Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
// TODO remove once v2 protocol is fully dropped.
pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
}
@@ -197,6 +224,18 @@ impl AcceptorState {
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
pub struct ProposerGreeting {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mconf: membership::Configuration,
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub wal_seg_size: u32,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Deserialize)]
pub struct ProposerGreetingV2 {
/// proposer-acceptor protocol version
pub protocol_version: u32,
/// Postgres server version
@@ -213,27 +252,35 @@ pub struct ProposerGreeting {
/// (acceptor voted for).
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
term: u64,
node_id: NodeId,
mconf: membership::Configuration,
term: u64,
}
/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
#[derive(Debug)]
pub struct VoteRequest {
pub generation: Generation,
pub term: Term,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Deserialize)]
pub struct VoteRequestV2 {
pub term: Term,
}
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
generation: Generation, // membership conf generation
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
vote_given: bool,
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
// proposer to choose the most advanced one.
pub flush_lsn: Lsn,
truncate_lsn: Lsn,
pub term_history: TermHistory,
timeline_start_lsn: Lsn,
}
/*
@@ -242,10 +289,10 @@ pub struct VoteResponse {
*/
#[derive(Debug)]
pub struct ProposerElected {
pub generation: Generation, // membership conf generation
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
pub timeline_start_lsn: Lsn,
}
/// Request with WAL message sent from proposer to safekeeper. Along the way it
@@ -257,6 +304,22 @@ pub struct AppendRequest {
}
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
pub generation: Generation, // membership conf generation
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
/// start position of message in WAL
pub begin_lsn: Lsn,
/// end position of message in WAL
pub end_lsn: Lsn,
/// LSN committed by quorum of safekeepers
pub commit_lsn: Lsn,
/// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
pub truncate_lsn: Lsn,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeaderV2 {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
// TODO: remove this field from the protocol, it in unused -- LSN of term
@@ -277,6 +340,9 @@ pub struct AppendRequestHeader {
/// Report safekeeper state to proposer
#[derive(Debug, Serialize, Clone)]
pub struct AppendResponse {
// Membership conf generation. Not strictly required because on mismatch
// connection is reset, but let's sanity check it.
generation: Generation,
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
pub term: Term,
@@ -293,8 +359,9 @@ pub struct AppendResponse {
}
impl AppendResponse {
fn term_only(term: Term) -> AppendResponse {
fn term_only(generation: Generation, term: Term) -> AppendResponse {
AppendResponse {
generation,
term,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
@@ -315,65 +382,317 @@ pub enum ProposerAcceptorMessage {
FlushWAL,
}
/// Augment Bytes with fallible get_uN where N is number of bytes methods.
/// All reads are in network (big endian) order.
trait BytesF {
fn get_u8_f(&mut self) -> Result<u8>;
fn get_u16_f(&mut self) -> Result<u16>;
fn get_u32_f(&mut self) -> Result<u32>;
fn get_u64_f(&mut self) -> Result<u64>;
}
impl BytesF for Bytes {
fn get_u8_f(&mut self) -> Result<u8> {
if self.is_empty() {
bail!("no bytes left, expected 1");
}
Ok(self.get_u8())
}
fn get_u16_f(&mut self) -> Result<u16> {
if self.is_empty() {
bail!("no bytes left, expected 2");
}
Ok(self.get_u16())
}
fn get_u32_f(&mut self) -> Result<u32> {
if self.remaining() < 4 {
bail!("only {} bytes left, expected 4", self.remaining());
}
Ok(self.get_u32())
}
fn get_u64_f(&mut self) -> Result<u64> {
if self.remaining() < 8 {
bail!("only {} bytes left, expected 8", self.remaining());
}
Ok(self.get_u64())
}
}
impl ProposerAcceptorMessage {
/// Parse proposer message.
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
let msg = ProposerGreeting::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::Greeting(msg))
}
'v' => {
let msg = VoteRequest::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::VoteRequest(msg))
}
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
if msg_bytes.remaining() < 8 {
bail!("ProposerElected message is not complete");
}
let timeline_start_lsn = msg_bytes.get_u64_le().into();
let msg = ProposerElected {
term,
start_streaming_at,
timeline_start_lsn,
term_history,
/// Read cstring from Bytes.
fn get_cstr(buf: &mut Bytes) -> Result<String> {
let pos = buf
.iter()
.position(|x| *x == 0)
.ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
let result = buf.split_to(pos);
buf.advance(1); // drop the null terminator
match std::str::from_utf8(&result) {
Ok(s) => Ok(s.to_string()),
Err(e) => bail!("invalid utf8 in cstring: {}", e),
}
}
/// Read membership::Configuration from Bytes.
fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
let generation = buf.get_u32_f().with_context(|| "reading generation")?;
let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
// Main member set must have at least someone in valid configuration.
// Empty conf is allowed until we fully migrate.
if generation != INVALID_GENERATION && members_len == 0 {
bail!("empty members_len");
}
let mut members = MemberSet::empty();
for i in 0..members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading member {} node_id", i))?;
let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading member {} port", i))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
pg_port,
};
members.add(sk)?;
}
let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
// Non joint conf.
if new_members_len == 0 {
Ok(membership::Configuration {
generation,
members,
new_members: None,
})
} else {
let mut new_members = MemberSet::empty();
for i in 0..new_members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading new member {} node_id", i))?;
let host = Self::get_cstr(buf)
.with_context(|| format!("reading new member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading new member {} port", i))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
pg_port,
};
Ok(ProposerAcceptorMessage::Elected(msg))
new_members.add(sk)?;
}
'a' => {
// read header followed by wal data
let hdr = AppendRequestHeader::des_from(&mut stream)?;
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
Ok(membership::Configuration {
generation,
members,
new_members: Some(new_members),
})
}
}
/// Parse proposer message.
pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
if proto_version == SK_PROTO_VERSION_3 {
if msg_bytes.is_empty() {
bail!("ProposerAcceptorMessage is not complete: missing tag");
}
let tag = msg_bytes.get_u8_f().with_context(|| {
"ProposerAcceptorMessage is not complete: missing tag".to_string()
})? as char;
match tag {
'g' => {
let tenant_id_str =
Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
let tenant_id = TenantId::from_str(&tenant_id_str)?;
let timeline_id_str =
Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
let timeline_id = TimelineId::from_str(&timeline_id_str)?;
let mconf = Self::get_mconf(&mut msg_bytes)?;
let pg_version = msg_bytes
.get_u32_f()
.with_context(|| "reading pg_version")?;
let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
let wal_seg_size = msg_bytes
.get_u32_f()
.with_context(|| "reading wal_seg_size")?;
let g = ProposerGreeting {
tenant_id,
timeline_id,
mconf,
pg_version,
system_id,
wal_seg_size,
};
Ok(ProposerAcceptorMessage::Greeting(g))
}
'v' => {
let generation = msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?;
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let v = VoteRequest { generation, term };
Ok(ProposerAcceptorMessage::VoteRequest(v))
}
'e' => {
let generation = msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?;
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let start_streaming_at: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading start_streaming_at")?
.into();
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
let msg = ProposerElected {
generation,
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => {
let generation = msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?;
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let begin_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading begin_lsn")?
.into();
let end_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading end_lsn")?
.into();
let commit_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading commit_lsn")?
.into();
let truncate_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading truncate_lsn")?
.into();
let hdr = AppendRequestHeader {
generation,
term,
begin_lsn,
end_lsn,
commit_lsn,
truncate_lsn,
};
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
}
if msg_bytes.remaining() < rec_size {
bail!(
"reading WAL: only {} bytes left, wanted {}",
msg_bytes.remaining(),
rec_size
);
}
let wal_data = msg_bytes.copy_to_bytes(rec_size);
let msg = AppendRequest { h: hdr, wal_data };
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest { h: hdr, wal_data };
Ok(ProposerAcceptorMessage::AppendRequest(msg))
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
// TODO remove proto_version == 3 after converting all msgs
} else if proto_version == SK_PROTO_VERSION_2 || proto_version == SK_PROTO_VERSION_3 {
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
let g = ProposerGreeting {
tenant_id: msgv2.tenant_id,
timeline_id: msgv2.timeline_id,
mconf: membership::Configuration {
generation: INVALID_GENERATION,
members: MemberSet::empty(),
new_members: None,
},
pg_version: msgv2.pg_version,
system_id: msgv2.system_id,
wal_seg_size: msgv2.wal_seg_size,
};
Ok(ProposerAcceptorMessage::Greeting(g))
}
'v' => {
let msg = VoteRequestV2::des_from(&mut stream)?;
let v = VoteRequest {
generation: INVALID_GENERATION,
term: msg.term,
};
Ok(ProposerAcceptorMessage::VoteRequest(v))
}
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
if msg_bytes.remaining() < 8 {
bail!("ProposerElected message is not complete");
}
let _timeline_start_lsn = msg_bytes.get_u64_le();
let msg = ProposerElected {
generation: INVALID_GENERATION,
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => {
// read header followed by wal data
let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
let hdr = AppendRequestHeader {
generation: INVALID_GENERATION,
term: hdrv2.term,
begin_lsn: hdrv2.begin_lsn,
end_lsn: hdrv2.end_lsn,
commit_lsn: hdrv2.commit_lsn,
truncate_lsn: hdrv2.truncate_lsn,
};
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
}
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest { h: hdr, wal_data };
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
} else {
bail!("unsupported protocol version {}", proto_version);
}
}
@@ -387,36 +706,21 @@ impl ProposerAcceptorMessage {
// We explicitly list all fields, to draw attention here when new fields are added.
let mut size = BASE_SIZE;
size += match self {
Self::Greeting(ProposerGreeting {
protocol_version: _,
pg_version: _,
proposer_id: _,
system_id: _,
timeline_id: _,
tenant_id: _,
tli: _,
wal_seg_size: _,
}) => 0,
Self::Greeting(_) => 0,
Self::VoteRequest(VoteRequest { term: _ }) => 0,
Self::VoteRequest(_) => 0,
Self::Elected(ProposerElected {
term: _,
start_streaming_at: _,
term_history: _,
timeline_start_lsn: _,
}) => 0,
Self::Elected(_) => 0,
Self::AppendRequest(AppendRequest {
h:
AppendRequestHeader {
generation: _,
term: _,
term_start_lsn: _,
begin_lsn: _,
end_lsn: _,
commit_lsn: _,
truncate_lsn: _,
proposer_uuid: _,
},
wal_data,
}) => wal_data.len(),
@@ -424,13 +728,12 @@ impl ProposerAcceptorMessage {
Self::NoFlushAppendRequest(AppendRequest {
h:
AppendRequestHeader {
generation: _,
term: _,
term_start_lsn: _,
begin_lsn: _,
end_lsn: _,
commit_lsn: _,
truncate_lsn: _,
proposer_uuid: _,
},
wal_data,
}) => wal_data.len(),
@@ -451,45 +754,118 @@ pub enum AcceptorProposerMessage {
}
impl AcceptorProposerMessage {
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u64_le('g' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.node_id.0);
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
buf.put_u64_le(msg.timeline_start_lsn.into());
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u64_le('a' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
fn put_cstr(buf: &mut BytesMut, s: &str) {
buf.put_slice(s.as_bytes());
buf.put_u8(0); // null terminator
}
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
/// Serialize membership::Configuration into buf.
fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
buf.put_u32(mconf.generation);
buf.put_u32(mconf.members.m.len() as u32);
for sk in &mconf.members.m {
buf.put_u64(sk.id.0);
Self::put_cstr(buf, &sk.host);
buf.put_u16(sk.pg_port);
}
if let Some(ref new_members) = mconf.new_members {
buf.put_u32(new_members.m.len() as u32);
for sk in &new_members.m {
buf.put_u64(sk.id.0);
Self::put_cstr(buf, &sk.host);
buf.put_u16(sk.pg_port);
}
} else {
buf.put_u32(0);
}
}
Ok(())
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
if proto_version == SK_PROTO_VERSION_3 {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u8('g' as u8);
buf.put_u64(msg.node_id.0);
Self::serialize_mconf(buf, &msg.mconf);
buf.put_u64(msg.term)
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u8('v' as u8);
buf.put_u32(msg.generation);
buf.put_u64(msg.term);
buf.put_u8(msg.vote_given as u8);
buf.put_u64(msg.flush_lsn.into());
buf.put_u64(msg.truncate_lsn.into());
buf.put_u32(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64(e.term);
buf.put_u64(e.lsn.into());
}
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u8('a' as u8);
buf.put_u32(msg.generation);
buf.put_u64(msg.term);
buf.put_u64(msg.flush_lsn.into());
buf.put_u64(msg.commit_lsn.into());
buf.put_i64(msg.hs_feedback.ts);
buf.put_u64(msg.hs_feedback.xmin);
buf.put_u64(msg.hs_feedback.catalog_xmin);
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
}
Ok(())
// TODO remove 3 after converting all msgs
} else if proto_version == SK_PROTO_VERSION_2 {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u64_le('g' as u64);
// v2 didn't have mconf and fields were reordered
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.node_id.0);
}
AcceptorProposerMessage::VoteResponse(msg) => {
// v2 didn't have generation, had u64 vote_given and timeline_start_lsn
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given as u64);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
// removed timeline_start_lsn
buf.put_u64_le(0);
}
AcceptorProposerMessage::AppendResponse(msg) => {
// v2 didn't have generation
buf.put_u64_le('a' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
}
Ok(())
} else {
bail!("unsupported protocol version {}", proto_version);
}
}
}
@@ -586,14 +962,6 @@ where
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
// Check protocol compatibility
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
msg.protocol_version,
SK_PROTOCOL_VERSION
);
}
/* Postgres major version mismatch is treated as fatal error
* because safekeepers parse WAL headers and the format
* may change between versions.
@@ -648,15 +1016,16 @@ where
self.state.finish_change(&state).await?;
}
info!(
"processed greeting from walproposer {}, sending term {:?}",
msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
self.state.acceptor_state.term
);
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.state.acceptor_state.term,
let apg = AcceptorGreeting {
node_id: self.node_id,
})))
mconf: self.state.mconf.clone(),
term: self.state.acceptor_state.term,
};
info!(
"processed greeting {:?} from walproposer, sending {:?}",
msg, apg
);
Ok(Some(AcceptorProposerMessage::Greeting(apg)))
}
/// Give vote for the given term, if we haven't done that previously.
@@ -677,12 +1046,12 @@ where
self.wal_store.flush_wal().await?;
// initialize with refusal
let mut resp = VoteResponse {
generation: self.state.mconf.generation,
term: self.state.acceptor_state.term,
vote_given: false as u64,
vote_given: false,
flush_lsn: self.flush_lsn(),
truncate_lsn: self.state.inmem.peer_horizon_lsn,
term_history: self.get_term_history(),
timeline_start_lsn: self.state.timeline_start_lsn,
};
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.start_change();
@@ -691,15 +1060,16 @@ where
self.state.finish_change(&state).await?;
resp.term = self.state.acceptor_state.term;
resp.vote_given = true as u64;
resp.vote_given = true;
}
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
info!("processed {:?}: sending {:?}", msg, &resp);
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
let ar = AppendResponse {
generation: self.state.mconf.generation,
term: self.state.acceptor_state.term,
flush_lsn: self.flush_lsn(),
commit_lsn: self.state.commit_lsn,
@@ -798,18 +1168,22 @@ where
// Here we learn initial LSN for the first time, set fields
// interested in that.
if state.timeline_start_lsn == Lsn(0) {
// Remember point where WAL begins globally.
state.timeline_start_lsn = msg.timeline_start_lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
if let Some(start_lsn) = msg.term_history.0.first() {
if state.timeline_start_lsn == Lsn(0) {
// Remember point where WAL begins globally. In the future it
// will be intialized immediately on timeline creation.
state.timeline_start_lsn = start_lsn.lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
}
}
if state.peer_horizon_lsn == Lsn(0) {
// Update peer_horizon_lsn as soon as we know where timeline starts.
// It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
state.peer_horizon_lsn = msg.timeline_start_lsn;
state.peer_horizon_lsn = state.timeline_start_lsn;
}
if state.local_start_lsn == Lsn(0) {
state.local_start_lsn = msg.start_streaming_at;
@@ -889,7 +1263,10 @@ where
// If our term is higher, immediately refuse the message.
if self.state.acceptor_state.term > msg.h.term {
let resp = AppendResponse::term_only(self.state.acceptor_state.term);
let resp = AppendResponse::term_only(
self.state.mconf.generation,
self.state.acceptor_state.term,
);
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}
@@ -917,10 +1294,8 @@ where
);
}
// Now we know that we are in the same term as the proposer,
// processing the message.
self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
// Now we know that we are in the same term as the proposer, process the
// message.
// do the job
if !msg.wal_data.is_empty() {

View File

@@ -15,7 +15,9 @@ use desim::{
};
use http::Uri;
use safekeeper::{
safekeeper::{ProposerAcceptorMessage, SafeKeeper, UNKNOWN_SERVER_VERSION},
safekeeper::{
ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION,
},
state::{TimelinePersistentState, TimelineState},
timeline::TimelineError,
wal_storage::Storage,
@@ -283,7 +285,7 @@ impl ConnState {
bail!("finished processing START_REPLICATION")
}
let msg = ProposerAcceptorMessage::parse(copy_data)?;
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?;
debug!("got msg: {:?}", msg);
self.process(msg, global)
} else {

View File

@@ -6,9 +6,14 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.pageserver.http import PageserverHttpClient
def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
def check_tenant(
env: NeonEnv, pageserver_http: PageserverHttpClient, safekeeper_proto_version: int
):
tenant_id, timeline_id = env.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
config_lines = [
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
]
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=config_lines)
# we rely upon autocommit after each statement
res_1 = endpoint.safe_psql_many(
queries=[
@@ -33,7 +38,14 @@ def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
@pytest.mark.parametrize("num_timelines,num_safekeepers", [(3, 1)])
def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int):
# Test both proto versions until we fully migrate.
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
def test_normal_work(
neon_env_builder: NeonEnvBuilder,
num_timelines: int,
num_safekeepers: int,
safekeeper_proto_version: int,
):
"""
Basic test:
* create new tenant with a timeline
@@ -52,4 +64,4 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s
pageserver_http = env.pageserver.http_client()
for _ in range(num_timelines):
check_tenant(env, pageserver_http)
check_tenant(env, pageserver_http, safekeeper_proto_version)