mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-09 05:30:37 +00:00
Compare commits
17 Commits
add_audit_
...
sk-members
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c811ae0b91 | ||
|
|
777afbafe5 | ||
|
|
0d0cd16ea2 | ||
|
|
217309c7ef | ||
|
|
976afcee26 | ||
|
|
20e974ecdd | ||
|
|
eb43f65055 | ||
|
|
96d67abd50 | ||
|
|
f7485c4459 | ||
|
|
4ea7b22537 | ||
|
|
10a7878230 | ||
|
|
c19a8b69f2 | ||
|
|
8be17724d8 | ||
|
|
234c3a29df | ||
|
|
db5513076a | ||
|
|
70d4e077a6 | ||
|
|
ae9db8975a |
@@ -38,14 +38,12 @@ impl Display for SafekeeperId {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(transparent)]
|
||||
pub struct MemberSet {
|
||||
pub members: Vec<SafekeeperId>,
|
||||
pub m: Vec<SafekeeperId>,
|
||||
}
|
||||
|
||||
impl MemberSet {
|
||||
pub fn empty() -> Self {
|
||||
MemberSet {
|
||||
members: Vec::new(),
|
||||
}
|
||||
MemberSet { m: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
|
||||
@@ -53,11 +51,11 @@ impl MemberSet {
|
||||
if hs.len() != members.len() {
|
||||
bail!("duplicate safekeeper id in the set {:?}", members);
|
||||
}
|
||||
Ok(MemberSet { members })
|
||||
Ok(MemberSet { m: members })
|
||||
}
|
||||
|
||||
pub fn contains(&self, sk: &SafekeeperId) -> bool {
|
||||
self.members.iter().any(|m| m.id == sk.id)
|
||||
self.m.iter().any(|m| m.id == sk.id)
|
||||
}
|
||||
|
||||
pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
|
||||
@@ -67,7 +65,7 @@ impl MemberSet {
|
||||
sk.id, self
|
||||
));
|
||||
}
|
||||
self.members.push(sk);
|
||||
self.m.push(sk);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -75,11 +73,7 @@ impl MemberSet {
|
||||
impl Display for MemberSet {
|
||||
/// Display as a comma separated list of members.
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let sks_str = self
|
||||
.members
|
||||
.iter()
|
||||
.map(|m| m.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
let sks_str = self.m.iter().map(|sk| sk.to_string()).collect::<Vec<_>>();
|
||||
write!(f, "({})", sks_str.join(", "))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,6 +215,7 @@ impl Wrapper {
|
||||
syncSafekeepers: config.sync_safekeepers,
|
||||
systemId: 0,
|
||||
pgTimeline: 1,
|
||||
proto_version: 2,
|
||||
callback_data,
|
||||
};
|
||||
let c_config = Box::into_raw(Box::new(c_config));
|
||||
|
||||
@@ -51,6 +51,26 @@ HexDecodeString(uint8 *result, char *input, int nbytes)
|
||||
return true;
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint16 - get a binary 2-byte int from a message buffer
|
||||
* --------------------------------
|
||||
*/
|
||||
uint16
|
||||
pq_getmsgint16(StringInfo msg)
|
||||
{
|
||||
return pq_getmsgint(msg, 2);
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint32 - get a binary 4-byte int from a message buffer
|
||||
* --------------------------------
|
||||
*/
|
||||
uint32
|
||||
pq_getmsgint32(StringInfo msg)
|
||||
{
|
||||
return pq_getmsgint(msg, 4);
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
|
||||
* --------------------------------
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
#endif
|
||||
|
||||
bool HexDecodeString(uint8 *result, char *input, int nbytes);
|
||||
uint16 pq_getmsgint16(StringInfo msg);
|
||||
uint32 pq_getmsgint32(StringInfo msg);
|
||||
uint32 pq_getmsgint32_le(StringInfo msg);
|
||||
uint64 pq_getmsgint64_le(StringInfo msg);
|
||||
void pq_sendint32_le(StringInfo buf, uint32 i);
|
||||
|
||||
@@ -70,6 +70,7 @@ static bool SendAppendRequests(Safekeeper *sk);
|
||||
static bool RecvAppendResponses(Safekeeper *sk);
|
||||
static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp);
|
||||
static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp);
|
||||
static void PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version);
|
||||
static void HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk);
|
||||
static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size);
|
||||
static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg);
|
||||
@@ -81,6 +82,8 @@ static char *FormatSafekeeperState(Safekeeper *sk);
|
||||
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
|
||||
static char *FormatEvents(WalProposer *wp, uint32 events);
|
||||
static void UpdateDonorShmem(WalProposer *wp);
|
||||
static char *MembershipConfigurationToString(MembershipConfiguration *mconf);
|
||||
static void MembershipConfigurationFree(MembershipConfiguration *mconf);
|
||||
|
||||
WalProposer *
|
||||
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
@@ -137,25 +140,21 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
}
|
||||
wp->quorum = wp->n_safekeepers / 2 + 1;
|
||||
|
||||
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
|
||||
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
|
||||
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
|
||||
|
||||
/* Fill the greeting package */
|
||||
wp->greetRequest.tag = 'g';
|
||||
wp->greetRequest.protocolVersion = SK_PROTOCOL_VERSION;
|
||||
wp->greetRequest.pgVersion = PG_VERSION_NUM;
|
||||
wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId));
|
||||
wp->greetRequest.systemId = wp->config->systemId;
|
||||
if (!wp->config->neon_timeline)
|
||||
wp_log(FATAL, "neon.timeline_id is not provided");
|
||||
if (*wp->config->neon_timeline != '\0' &&
|
||||
!HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
|
||||
wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline);
|
||||
wp->greetRequest.pam.tag = 'g';
|
||||
if (!wp->config->neon_tenant)
|
||||
wp_log(FATAL, "neon.tenant_id is not provided");
|
||||
if (*wp->config->neon_tenant != '\0' &&
|
||||
!HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16))
|
||||
wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant);
|
||||
|
||||
wp->greetRequest.timeline = wp->config->pgTimeline;
|
||||
wp->greetRequest.walSegSize = wp->config->wal_segment_size;
|
||||
wp->greetRequest.tenant_id = wp->config->neon_tenant;
|
||||
if (!wp->config->neon_timeline)
|
||||
wp_log(FATAL, "neon.timeline_id is not provided");
|
||||
wp->greetRequest.timeline_id = wp->config->neon_timeline;
|
||||
wp->greetRequest.pg_version = PG_VERSION_NUM;
|
||||
wp->greetRequest.system_id = wp->config->systemId;
|
||||
wp->greetRequest.wal_seg_size = wp->config->wal_segment_size;
|
||||
|
||||
wp->api.init_event_set(wp);
|
||||
|
||||
@@ -165,12 +164,14 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
void
|
||||
WalProposerFree(WalProposer *wp)
|
||||
{
|
||||
MembershipConfigurationFree(&wp->mconf);
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
Safekeeper *sk = &wp->safekeeper[i];
|
||||
|
||||
Assert(sk->outbuf.data != NULL);
|
||||
pfree(sk->outbuf.data);
|
||||
MembershipConfigurationFree(&sk->greetResponse.mconf);
|
||||
if (sk->voteResponse.termHistory.entries)
|
||||
pfree(sk->voteResponse.termHistory.entries);
|
||||
sk->voteResponse.termHistory.entries = NULL;
|
||||
@@ -308,6 +309,7 @@ ShutdownConnection(Safekeeper *sk)
|
||||
sk->state = SS_OFFLINE;
|
||||
sk->streamingAt = InvalidXLogRecPtr;
|
||||
|
||||
MembershipConfigurationFree(&sk->greetResponse.mconf);
|
||||
if (sk->voteResponse.termHistory.entries)
|
||||
pfree(sk->voteResponse.termHistory.entries);
|
||||
sk->voteResponse.termHistory.entries = NULL;
|
||||
@@ -598,11 +600,14 @@ static void
|
||||
SendStartWALPush(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
#define CMD_LEN 512
|
||||
char cmd[CMD_LEN];
|
||||
|
||||
if (!wp->api.conn_send_query(sk, "START_WAL_PUSH"))
|
||||
snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version);
|
||||
if (!wp->api.conn_send_query(sk, cmd))
|
||||
{
|
||||
wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
|
||||
sk->host, sk->port, wp->api.conn_error_message(sk));
|
||||
wp_log(WARNING, "failed to send %s query to safekeeper %s:%s: %s",
|
||||
cmd, sk->host, sk->port, wp->api.conn_error_message(sk));
|
||||
ShutdownConnection(sk);
|
||||
return;
|
||||
}
|
||||
@@ -658,23 +663,33 @@ RecvStartWALPushResult(Safekeeper *sk)
|
||||
|
||||
/*
|
||||
* Start handshake: first of all send information about the
|
||||
* safekeeper. After sending, we wait on SS_HANDSHAKE_RECV for
|
||||
* walproposer. After sending, we wait on SS_HANDSHAKE_RECV for
|
||||
* a response to finish the handshake.
|
||||
*/
|
||||
static void
|
||||
SendProposerGreeting(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
char *mconf_toml = MembershipConfigurationToString(&wp->greetRequest.mconf);
|
||||
|
||||
wp_log(LOG, "sending ProposerGreeting to safekeeper %s:%s with mconf = %s", sk->host, sk->port, mconf_toml);
|
||||
pfree(mconf_toml);
|
||||
|
||||
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->greetRequest,
|
||||
&sk->outbuf, wp->config->proto_version);
|
||||
|
||||
/*
|
||||
* On failure, logging & resetting the connection is handled. We just need
|
||||
* to handle the control flow.
|
||||
*/
|
||||
BlockingWrite(sk, &sk->wp->greetRequest, sizeof(sk->wp->greetRequest), SS_HANDSHAKE_RECV);
|
||||
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
|
||||
}
|
||||
|
||||
static void
|
||||
RecvAcceptorGreeting(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
char *mconf_toml;
|
||||
|
||||
/*
|
||||
* If our reading doesn't immediately succeed, any necessary error
|
||||
@@ -685,7 +700,10 @@ RecvAcceptorGreeting(Safekeeper *sk)
|
||||
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse))
|
||||
return;
|
||||
|
||||
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, term=" INT64_FORMAT, sk->host, sk->port, sk->greetResponse.term);
|
||||
mconf_toml = MembershipConfigurationToString(&sk->greetResponse.mconf);
|
||||
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, node_id = %lu, mconf = %s, term=" UINT64_FORMAT,
|
||||
sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term);
|
||||
pfree(mconf_toml);
|
||||
|
||||
/* Protocol is all good, move to voting. */
|
||||
sk->state = SS_VOTING;
|
||||
@@ -707,12 +725,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
|
||||
wp->propTerm++;
|
||||
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
|
||||
|
||||
wp->voteRequest = (VoteRequest)
|
||||
{
|
||||
.tag = 'v',
|
||||
.term = wp->propTerm
|
||||
};
|
||||
memcpy(wp->voteRequest.proposerId.data, wp->greetRequest.proposerId.data, UUID_LEN);
|
||||
wp->voteRequest.pam.tag = 'v';
|
||||
wp->voteRequest.generation = wp->mconf.generation;
|
||||
wp->voteRequest.term = wp->propTerm;
|
||||
}
|
||||
}
|
||||
else if (sk->greetResponse.term > wp->propTerm)
|
||||
@@ -759,12 +774,14 @@ SendVoteRequest(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
|
||||
/* We have quorum for voting, send our vote request */
|
||||
wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
|
||||
/* On failure, logging & resetting is handled */
|
||||
if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT))
|
||||
return;
|
||||
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->voteRequest,
|
||||
&sk->outbuf, wp->config->proto_version);
|
||||
|
||||
/* We have quorum for voting, send our vote request */
|
||||
wp_log(LOG, "requesting vote from %s:%s for generation %u term " UINT64_FORMAT, sk->host, sk->port,
|
||||
wp->voteRequest.generation, wp->voteRequest.term);
|
||||
/* On failure, logging & resetting is handled */
|
||||
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_WAIT_VERDICT);
|
||||
/* If successful, wait for read-ready with SS_WAIT_VERDICT */
|
||||
}
|
||||
|
||||
@@ -778,11 +795,12 @@ RecvVoteResponse(Safekeeper *sk)
|
||||
return;
|
||||
|
||||
wp_log(LOG,
|
||||
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
|
||||
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
|
||||
"got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
|
||||
sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term,
|
||||
sk->voteResponse.voteGiven,
|
||||
GetHighestTerm(&sk->voteResponse.termHistory),
|
||||
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
|
||||
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
|
||||
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
|
||||
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn));
|
||||
|
||||
/*
|
||||
* In case of acceptor rejecting our vote, bail out, but only if either it
|
||||
@@ -847,9 +865,9 @@ HandleElectedProposer(WalProposer *wp)
|
||||
* otherwise we must be sync-safekeepers and we have nothing to do then.
|
||||
*
|
||||
* Proceeding is not only pointless but harmful, because we'd give
|
||||
* safekeepers term history starting with 0/0. These hacks will go away once
|
||||
* we disable implicit timeline creation on safekeepers and create it with
|
||||
* non zero LSN from the start.
|
||||
* safekeepers term history starting with 0/0. These hacks will go away
|
||||
* once we disable implicit timeline creation on safekeepers and create it
|
||||
* with non zero LSN from the start.
|
||||
*/
|
||||
if (wp->propEpochStartLsn == InvalidXLogRecPtr)
|
||||
{
|
||||
@@ -942,7 +960,6 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
wp->propEpochStartLsn = InvalidXLogRecPtr;
|
||||
wp->donorEpoch = 0;
|
||||
wp->truncateLsn = InvalidXLogRecPtr;
|
||||
wp->timelineStartLsn = InvalidXLogRecPtr;
|
||||
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
@@ -959,20 +976,6 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
wp->donor = i;
|
||||
}
|
||||
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
|
||||
|
||||
if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr)
|
||||
{
|
||||
/* timelineStartLsn should be the same everywhere or unknown */
|
||||
if (wp->timelineStartLsn != InvalidXLogRecPtr &&
|
||||
wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
|
||||
{
|
||||
wp_log(WARNING,
|
||||
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
|
||||
LSN_FORMAT_ARGS(wp->timelineStartLsn),
|
||||
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
|
||||
}
|
||||
wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -995,22 +998,11 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
|
||||
{
|
||||
wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp);
|
||||
if (wp->timelineStartLsn == InvalidXLogRecPtr)
|
||||
{
|
||||
wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp);
|
||||
}
|
||||
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
|
||||
}
|
||||
pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
|
||||
|
||||
/*
|
||||
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so
|
||||
* it should never be zero at this point, if we know timelineStartLsn.
|
||||
*
|
||||
* timelineStartLsn can be zero only on the first syncSafekeepers run.
|
||||
*/
|
||||
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
|
||||
(wp->config->syncSafekeepers && wp->truncateLsn == wp->timelineStartLsn));
|
||||
Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers);
|
||||
|
||||
/*
|
||||
* We will be generating WAL since propEpochStartLsn, so we should set
|
||||
@@ -1052,10 +1044,11 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
|
||||
{
|
||||
/*
|
||||
* However, allow to proceed if last_log_term on the node which gave
|
||||
* the highest vote (i.e. point where we are going to start writing)
|
||||
* actually had been won by me; plain restart of walproposer not
|
||||
* intervened by concurrent compute which wrote WAL is ok.
|
||||
* However, allow to proceed if last_log_term on the node which
|
||||
* gave the highest vote (i.e. point where we are going to start
|
||||
* writing) actually had been won by me; plain restart of
|
||||
* walproposer not intervened by concurrent compute which wrote
|
||||
* WAL is ok.
|
||||
*
|
||||
* This avoids compute crash after manual term_bump.
|
||||
*/
|
||||
@@ -1125,14 +1118,8 @@ SendProposerElected(Safekeeper *sk)
|
||||
{
|
||||
/* safekeeper is empty or no common point, start from the beginning */
|
||||
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
|
||||
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
|
||||
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
|
||||
|
||||
/*
|
||||
* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
|
||||
* is created manually (test_s3_wal_replay)
|
||||
*/
|
||||
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
|
||||
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, termHistory.n_entries=%u",
|
||||
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), wp->propTermHistory.n_entries);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1157,29 +1144,19 @@ SendProposerElected(Safekeeper *sk)
|
||||
|
||||
Assert(sk->startStreamingAt <= wp->availableLsn);
|
||||
|
||||
msg.tag = 'e';
|
||||
msg.apm.tag = 'e';
|
||||
msg.generation = wp->mconf.generation;
|
||||
msg.term = wp->propTerm;
|
||||
msg.startStreamingAt = sk->startStreamingAt;
|
||||
msg.termHistory = &wp->propTermHistory;
|
||||
msg.timelineStartLsn = wp->timelineStartLsn;
|
||||
|
||||
lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0;
|
||||
wp_log(LOG,
|
||||
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
|
||||
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
|
||||
|
||||
resetStringInfo(&sk->outbuf);
|
||||
pq_sendint64_le(&sk->outbuf, msg.tag);
|
||||
pq_sendint64_le(&sk->outbuf, msg.term);
|
||||
pq_sendint64_le(&sk->outbuf, msg.startStreamingAt);
|
||||
pq_sendint32_le(&sk->outbuf, msg.termHistory->n_entries);
|
||||
for (int i = 0; i < msg.termHistory->n_entries; i++)
|
||||
{
|
||||
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].term);
|
||||
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].lsn);
|
||||
}
|
||||
pq_sendint64_le(&sk->outbuf, msg.timelineStartLsn);
|
||||
"sending elected msg to node " UINT64_FORMAT " generation=%u term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s",
|
||||
sk->greetResponse.nodeId, msg.generation, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt),
|
||||
lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port);
|
||||
|
||||
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version);
|
||||
if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH))
|
||||
return;
|
||||
|
||||
@@ -1245,14 +1222,13 @@ static void
|
||||
PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn)
|
||||
{
|
||||
Assert(endLsn >= beginLsn);
|
||||
req->tag = 'a';
|
||||
req->apm.tag = 'a';
|
||||
req->generation = wp->mconf.generation;
|
||||
req->term = wp->propTerm;
|
||||
req->epochStartLsn = wp->propEpochStartLsn;
|
||||
req->beginLsn = beginLsn;
|
||||
req->endLsn = endLsn;
|
||||
req->commitLsn = wp->commitLsn;
|
||||
req->truncateLsn = wp->truncateLsn;
|
||||
req->proposerId = wp->greetRequest.proposerId;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1353,7 +1329,8 @@ SendAppendRequests(Safekeeper *sk)
|
||||
resetStringInfo(&sk->outbuf);
|
||||
|
||||
/* write AppendRequest header */
|
||||
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
|
||||
PAMessageSerialize(wp, (ProposerAcceptorMessage *) req, &sk->outbuf, wp->config->proto_version);
|
||||
/* prepare for reading WAL into the outbuf */
|
||||
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
|
||||
sk->active_state = SS_ACTIVE_READ_WAL;
|
||||
}
|
||||
@@ -1366,14 +1343,17 @@ SendAppendRequests(Safekeeper *sk)
|
||||
req = &sk->appendRequest;
|
||||
req_len = req->endLsn - req->beginLsn;
|
||||
|
||||
/* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */
|
||||
/*
|
||||
* We send zero sized AppenRequests as heartbeats; don't wal_read
|
||||
* for these.
|
||||
*/
|
||||
if (req_len > 0)
|
||||
{
|
||||
switch (wp->api.wal_read(sk,
|
||||
&sk->outbuf.data[sk->outbuf.len],
|
||||
req->beginLsn,
|
||||
req_len,
|
||||
&errmsg))
|
||||
&sk->outbuf.data[sk->outbuf.len],
|
||||
req->beginLsn,
|
||||
req_len,
|
||||
&errmsg))
|
||||
{
|
||||
case NEON_WALREAD_SUCCESS:
|
||||
break;
|
||||
@@ -1381,7 +1361,7 @@ SendAppendRequests(Safekeeper *sk)
|
||||
return true;
|
||||
case NEON_WALREAD_ERROR:
|
||||
wp_log(WARNING, "WAL reading for node %s:%s failed: %s",
|
||||
sk->host, sk->port, errmsg);
|
||||
sk->host, sk->port, errmsg);
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
default:
|
||||
@@ -1469,11 +1449,11 @@ RecvAppendResponses(Safekeeper *sk)
|
||||
* Term has changed to higher one, probably another compute is
|
||||
* running. If this is the case we could PANIC as well because
|
||||
* likely it inserted some data and our basebackup is unsuitable
|
||||
* anymore. However, we also bump term manually (term_bump endpoint)
|
||||
* on safekeepers for migration purposes, in this case we do want
|
||||
* compute to stay alive. So restart walproposer with FATAL instead
|
||||
* of panicking; if basebackup is spoiled next election will notice
|
||||
* this.
|
||||
* anymore. However, we also bump term manually (term_bump
|
||||
* endpoint) on safekeepers for migration purposes, in this case
|
||||
* we do want compute to stay alive. So restart walproposer with
|
||||
* FATAL instead of panicking; if basebackup is spoiled next
|
||||
* election will notice this.
|
||||
*/
|
||||
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
|
||||
sk->host, sk->port,
|
||||
@@ -1749,6 +1729,208 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
|
||||
}
|
||||
}
|
||||
|
||||
/* Serialize MembershipConfiguration into buf. */
|
||||
static void
|
||||
MembershipConfigurationSerialize(MembershipConfiguration *mconf, StringInfo buf)
|
||||
{
|
||||
uint32 i;
|
||||
|
||||
pq_sendint32(buf, mconf->generation);
|
||||
|
||||
pq_sendint32(buf, mconf->members.len);
|
||||
for (i = 0; i < mconf->members.len; i++)
|
||||
{
|
||||
pq_sendint64(buf, mconf->members.m[i].node_id);
|
||||
pq_send_ascii_string(buf, mconf->members.m[i].host);
|
||||
pq_sendint16(buf, mconf->members.m[i].port);
|
||||
}
|
||||
|
||||
/*
|
||||
* There is no special mark for absent new_members; zero members in
|
||||
* invalid, so zero len means absent.
|
||||
*/
|
||||
pq_sendint32(buf, mconf->new_members.len);
|
||||
for (i = 0; i < mconf->new_members.len; i++)
|
||||
{
|
||||
pq_sendint64(buf, mconf->new_members.m[i].node_id);
|
||||
pq_send_ascii_string(buf, mconf->new_members.m[i].host);
|
||||
pq_sendint16(buf, mconf->new_members.m[i].port);
|
||||
}
|
||||
}
|
||||
|
||||
/* Serialize proposer -> acceptor message into buf using specified version */
|
||||
static void
|
||||
PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version)
|
||||
{
|
||||
/* both version are supported currently until we fully migrate to 3 */
|
||||
Assert(proto_version == 3 || proto_version == 2);
|
||||
|
||||
resetStringInfo(buf);
|
||||
|
||||
if (proto_version == 3)
|
||||
{
|
||||
/*
|
||||
* v2 sends structs for some messages as is, so commonly send tag only
|
||||
* for v3
|
||||
*/
|
||||
pq_sendint8(buf, msg->tag);
|
||||
|
||||
switch (msg->tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
ProposerGreeting *m = (ProposerGreeting *) msg;
|
||||
|
||||
pq_send_ascii_string(buf, m->tenant_id);
|
||||
pq_send_ascii_string(buf, m->timeline_id);
|
||||
MembershipConfigurationSerialize(&m->mconf, buf);
|
||||
pq_sendint32(buf, m->pg_version);
|
||||
pq_sendint64(buf, m->system_id);
|
||||
pq_sendint32(buf, m->wal_seg_size);
|
||||
break;
|
||||
}
|
||||
case 'v':
|
||||
{
|
||||
VoteRequest *m = (VoteRequest *) msg;
|
||||
|
||||
pq_sendint32(buf, m->generation);
|
||||
pq_sendint64(buf, m->term);
|
||||
break;
|
||||
|
||||
}
|
||||
case 'e':
|
||||
{
|
||||
ProposerElected *m = (ProposerElected *) msg;
|
||||
|
||||
pq_sendint32(buf, m->generation);
|
||||
pq_sendint64(buf, m->term);
|
||||
pq_sendint64(buf, m->startStreamingAt);
|
||||
pq_sendint32(buf, m->termHistory->n_entries);
|
||||
for (uint32 i = 0; i < m->termHistory->n_entries; i++)
|
||||
{
|
||||
pq_sendint64(buf, m->termHistory->entries[i].term);
|
||||
pq_sendint64(buf, m->termHistory->entries[i].lsn);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'a':
|
||||
{
|
||||
/*
|
||||
* Note: this serializes only AppendRequestHeader, caller
|
||||
* is expected to append WAL data later.
|
||||
*/
|
||||
AppendRequestHeader *m = (AppendRequestHeader *) msg;
|
||||
|
||||
pq_sendint32(buf, m->generation);
|
||||
pq_sendint64(buf, m->term);
|
||||
pq_sendint64(buf, m->beginLsn);
|
||||
pq_sendint64(buf, m->endLsn);
|
||||
pq_sendint64(buf, m->commitLsn);
|
||||
pq_sendint64(buf, m->truncateLsn);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (proto_version == 2)
|
||||
{
|
||||
switch (msg->tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
/* v2 sent struct as is */
|
||||
ProposerGreeting *m = (ProposerGreeting *) msg;
|
||||
ProposerGreetingV2 greetRequestV2;
|
||||
|
||||
/* Fill also v2 struct. */
|
||||
greetRequestV2.tag = 'g';
|
||||
greetRequestV2.protocolVersion = proto_version;
|
||||
greetRequestV2.pgVersion = m->pg_version;
|
||||
|
||||
/*
|
||||
* v3 removed this field because it's easier to pass as
|
||||
* libq or START_WAL_PUSH options
|
||||
*/
|
||||
memset(&greetRequestV2.proposerId, 0, sizeof(greetRequestV2.proposerId));
|
||||
greetRequestV2.systemId = wp->config->systemId;
|
||||
if (*m->timeline_id != '\0' &&
|
||||
!HexDecodeString(greetRequestV2.timeline_id, m->timeline_id, 16))
|
||||
wp_log(FATAL, "could not parse neon.timeline_id, %s", m->timeline_id);
|
||||
if (*m->tenant_id != '\0' &&
|
||||
!HexDecodeString(greetRequestV2.tenant_id, m->tenant_id, 16))
|
||||
wp_log(FATAL, "could not parse neon.tenant_id, %s", m->tenant_id);
|
||||
|
||||
greetRequestV2.timeline = wp->config->pgTimeline;
|
||||
greetRequestV2.walSegSize = wp->config->wal_segment_size;
|
||||
|
||||
pq_sendbytes(buf, (char *) &greetRequestV2, sizeof(greetRequestV2));
|
||||
break;
|
||||
}
|
||||
case 'v':
|
||||
{
|
||||
/* v2 sent struct as is */
|
||||
VoteRequest *m = (VoteRequest *) msg;
|
||||
VoteRequestV2 voteRequestV2;
|
||||
|
||||
voteRequestV2.tag = m->pam.tag;
|
||||
voteRequestV2.term = m->term;
|
||||
/* removed field */
|
||||
memset(&voteRequestV2.proposerId, 0, sizeof(voteRequestV2.proposerId));
|
||||
pq_sendbytes(buf, (char *) &voteRequestV2, sizeof(voteRequestV2));
|
||||
break;
|
||||
}
|
||||
case 'e':
|
||||
{
|
||||
ProposerElected *m = (ProposerElected *) msg;
|
||||
|
||||
pq_sendint64_le(buf, m->apm.tag);
|
||||
pq_sendint64_le(buf, m->term);
|
||||
pq_sendint64_le(buf, m->startStreamingAt);
|
||||
pq_sendint32_le(buf, m->termHistory->n_entries);
|
||||
for (int i = 0; i < m->termHistory->n_entries; i++)
|
||||
{
|
||||
pq_sendint64_le(buf, m->termHistory->entries[i].term);
|
||||
pq_sendint64_le(buf, m->termHistory->entries[i].lsn);
|
||||
}
|
||||
pq_sendint64_le(buf, 0); /* removed timeline_start_lsn */
|
||||
break;
|
||||
}
|
||||
case 'a':
|
||||
|
||||
/*
|
||||
* Note: this serializes only AppendRequestHeader, caller is
|
||||
* expected to append WAL data later.
|
||||
*/
|
||||
{
|
||||
/* v2 sent struct as is */
|
||||
AppendRequestHeader *m = (AppendRequestHeader *) msg;
|
||||
AppendRequestHeaderV2 appendRequestHeaderV2;
|
||||
|
||||
appendRequestHeaderV2.tag = m->apm.tag;
|
||||
appendRequestHeaderV2.term = m->term;
|
||||
appendRequestHeaderV2.epochStartLsn = 0; /* removed field */
|
||||
appendRequestHeaderV2.beginLsn = m->beginLsn;
|
||||
appendRequestHeaderV2.endLsn = m->endLsn;
|
||||
appendRequestHeaderV2.commitLsn = m->commitLsn;
|
||||
appendRequestHeaderV2.truncateLsn = m->truncateLsn;
|
||||
/* removed field */
|
||||
memset(&appendRequestHeaderV2.proposerId, 0, sizeof(appendRequestHeaderV2.proposerId));
|
||||
|
||||
pq_sendbytes(buf, (char *) &appendRequestHeaderV2, sizeof(appendRequestHeaderV2));
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
|
||||
}
|
||||
return;
|
||||
}
|
||||
wp_log(FATAL, "unexpected proto_version %d", proto_version);
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to read CopyData message from i'th safekeeper, resetting connection on
|
||||
* failure.
|
||||
@@ -1778,6 +1960,37 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Deserialize membership configuration from buf to mconf. */
|
||||
static void
|
||||
MembershipConfigurationDeserialize(MembershipConfiguration *mconf, StringInfo buf)
|
||||
{
|
||||
uint32 i;
|
||||
|
||||
mconf->generation = pq_getmsgint32(buf);
|
||||
mconf->members.len = pq_getmsgint32(buf);
|
||||
mconf->members.m = palloc0(sizeof(SafekeeperId) * mconf->members.len);
|
||||
for (i = 0; i < mconf->members.len; i++)
|
||||
{
|
||||
const char *buf_host;
|
||||
|
||||
mconf->members.m[i].node_id = pq_getmsgint64(buf);
|
||||
buf_host = pq_getmsgrawstring(buf);
|
||||
strlcpy(mconf->members.m[i].host, buf_host, sizeof(mconf->members.m[i].host));
|
||||
mconf->members.m[i].port = pq_getmsgint16(buf);
|
||||
}
|
||||
mconf->new_members.len = pq_getmsgint32(buf);
|
||||
mconf->new_members.m = palloc0(sizeof(SafekeeperId) * mconf->new_members.len);
|
||||
for (i = 0; i < mconf->new_members.len; i++)
|
||||
{
|
||||
const char *buf_host;
|
||||
|
||||
mconf->new_members.m[i].node_id = pq_getmsgint64(buf);
|
||||
buf_host = pq_getmsgrawstring(buf);
|
||||
strlcpy(mconf->new_members.m[i].host, buf_host, sizeof(mconf->new_members.m[i].host));
|
||||
mconf->new_members.m[i].port = pq_getmsgint16(buf);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read next message with known type into provided struct, by reading a CopyData
|
||||
* block from the safekeeper's postgres connection, returning whether the read
|
||||
@@ -1786,6 +1999,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
|
||||
* If the read needs more polling, we return 'false' and keep the state
|
||||
* unmodified, waiting until it becomes read-ready to try again. If it fully
|
||||
* failed, a warning is emitted and the connection is reset.
|
||||
*
|
||||
* Note: it pallocs if needed, i.e. for AcceptorGreeting and VoteResponse fields.
|
||||
*/
|
||||
static bool
|
||||
AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
|
||||
@@ -1794,82 +2009,153 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
|
||||
|
||||
char *buf;
|
||||
int buf_size;
|
||||
uint64 tag;
|
||||
uint8 tag;
|
||||
StringInfoData s;
|
||||
|
||||
if (!(AsyncRead(sk, &buf, &buf_size)))
|
||||
return false;
|
||||
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
|
||||
|
||||
/* parse it */
|
||||
s.data = buf;
|
||||
s.len = buf_size;
|
||||
s.maxlen = buf_size;
|
||||
s.cursor = 0;
|
||||
|
||||
tag = pq_getmsgint64_le(&s);
|
||||
if (tag != anymsg->tag)
|
||||
if (wp->config->proto_version == 3)
|
||||
{
|
||||
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return false;
|
||||
}
|
||||
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
|
||||
switch (tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->nodeId = pq_getmsgint64_le(&s);
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
case 'v':
|
||||
{
|
||||
VoteResponse *msg = (VoteResponse *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->voteGiven = pq_getmsgint64_le(&s);
|
||||
msg->flushLsn = pq_getmsgint64_le(&s);
|
||||
msg->truncateLsn = pq_getmsgint64_le(&s);
|
||||
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
|
||||
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
|
||||
for (int i = 0; i < msg->termHistory.n_entries; i++)
|
||||
tag = pq_getmsgbyte(&s);
|
||||
if (tag != anymsg->tag)
|
||||
{
|
||||
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return false;
|
||||
}
|
||||
switch (tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
|
||||
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
|
||||
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
|
||||
|
||||
msg->nodeId = pq_getmsgint64(&s);
|
||||
MembershipConfigurationDeserialize(&msg->mconf, &s);
|
||||
msg->term = pq_getmsgint64(&s);
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
msg->timelineStartLsn = pq_getmsgint64_le(&s);
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
case 'v':
|
||||
{
|
||||
VoteResponse *msg = (VoteResponse *) anymsg;
|
||||
|
||||
case 'a':
|
||||
{
|
||||
AppendResponse *msg = (AppendResponse *) anymsg;
|
||||
msg->generation = pq_getmsgint32(&s);
|
||||
msg->term = pq_getmsgint64(&s);
|
||||
msg->voteGiven = pq_getmsgbyte(&s);
|
||||
msg->flushLsn = pq_getmsgint64(&s);
|
||||
msg->truncateLsn = pq_getmsgint64(&s);
|
||||
msg->termHistory.n_entries = pq_getmsgint32(&s);
|
||||
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
|
||||
for (uint32 i = 0; i < msg->termHistory.n_entries; i++)
|
||||
{
|
||||
msg->termHistory.entries[i].term = pq_getmsgint64(&s);
|
||||
msg->termHistory.entries[i].lsn = pq_getmsgint64(&s);
|
||||
}
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
case 'a':
|
||||
{
|
||||
AppendResponse *msg = (AppendResponse *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->flushLsn = pq_getmsgint64_le(&s);
|
||||
msg->commitLsn = pq_getmsgint64_le(&s);
|
||||
msg->hs.ts = pq_getmsgint64_le(&s);
|
||||
msg->hs.xmin.value = pq_getmsgint64_le(&s);
|
||||
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
|
||||
if (s.len > s.cursor)
|
||||
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
|
||||
else
|
||||
msg->ps_feedback.present = false;
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
Assert(false);
|
||||
return false;
|
||||
}
|
||||
msg->generation = pq_getmsgint32(&s);
|
||||
msg->term = pq_getmsgint64(&s);
|
||||
msg->flushLsn = pq_getmsgint64(&s);
|
||||
msg->commitLsn = pq_getmsgint64(&s);
|
||||
msg->hs.ts = pq_getmsgint64(&s);
|
||||
msg->hs.xmin.value = pq_getmsgint64(&s);
|
||||
msg->hs.catalog_xmin.value = pq_getmsgint64(&s);
|
||||
if (s.len > s.cursor)
|
||||
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
|
||||
else
|
||||
msg->ps_feedback.present = false;
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
default:
|
||||
{
|
||||
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (wp->config->proto_version == 2)
|
||||
{
|
||||
tag = pq_getmsgint64_le(&s);
|
||||
if (tag != anymsg->tag)
|
||||
{
|
||||
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return false;
|
||||
}
|
||||
switch (tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->nodeId = pq_getmsgint64_le(&s);
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
case 'v':
|
||||
{
|
||||
VoteResponse *msg = (VoteResponse *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->voteGiven = pq_getmsgint64_le(&s);
|
||||
msg->flushLsn = pq_getmsgint64_le(&s);
|
||||
msg->truncateLsn = pq_getmsgint64_le(&s);
|
||||
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
|
||||
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
|
||||
for (int i = 0; i < msg->termHistory.n_entries; i++)
|
||||
{
|
||||
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
|
||||
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
|
||||
}
|
||||
pq_getmsgint64_le(&s); /* timelineStartLsn */
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
case 'a':
|
||||
{
|
||||
AppendResponse *msg = (AppendResponse *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->flushLsn = pq_getmsgint64_le(&s);
|
||||
msg->commitLsn = pq_getmsgint64_le(&s);
|
||||
msg->hs.ts = pq_getmsgint64_le(&s);
|
||||
msg->hs.xmin.value = pq_getmsgint64_le(&s);
|
||||
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
|
||||
if (s.len > s.cursor)
|
||||
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
|
||||
else
|
||||
msg->ps_feedback.present = false;
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2245,3 +2531,45 @@ FormatEvents(WalProposer *wp, uint32 events)
|
||||
|
||||
return (char *) &return_str;
|
||||
}
|
||||
|
||||
/* Dump mconf as toml for observability / debugging. Result is palloc'ed. */
|
||||
static char *
|
||||
MembershipConfigurationToString(MembershipConfiguration *mconf)
|
||||
{
|
||||
StringInfoData s;
|
||||
uint32 i;
|
||||
|
||||
initStringInfo(&s);
|
||||
appendStringInfo(&s, "{gen = %u", mconf->generation);
|
||||
appendStringInfoString(&s, ", members = [");
|
||||
for (i = 0; i < mconf->members.len; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
appendStringInfoString(&s, ", ");
|
||||
appendStringInfo(&s, "{node_id = %lu", mconf->members.m[i].node_id);
|
||||
appendStringInfo(&s, ", host = %s", mconf->members.m[i].host);
|
||||
appendStringInfo(&s, ", port = %u }", mconf->members.m[i].port);
|
||||
}
|
||||
appendStringInfo(&s, "], new_members = [");
|
||||
for (i = 0; i < mconf->new_members.len; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
appendStringInfoString(&s, ", ");
|
||||
appendStringInfo(&s, "{node_id = %lu", mconf->new_members.m[i].node_id);
|
||||
appendStringInfo(&s, ", host = %s", mconf->new_members.m[i].host);
|
||||
appendStringInfo(&s, ", port = %u }", mconf->new_members.m[i].port);
|
||||
}
|
||||
appendStringInfoString(&s, "]}");
|
||||
return s.data;
|
||||
}
|
||||
|
||||
static void
|
||||
MembershipConfigurationFree(MembershipConfiguration *mconf)
|
||||
{
|
||||
if (mconf->members.m)
|
||||
pfree(mconf->members.m);
|
||||
mconf->members.m = NULL;
|
||||
if (mconf->new_members.m)
|
||||
pfree(mconf->new_members.m);
|
||||
mconf->new_members.m = NULL;
|
||||
}
|
||||
|
||||
@@ -12,9 +12,6 @@
|
||||
#include "neon_walreader.h"
|
||||
#include "pagestore_client.h"
|
||||
|
||||
#define SK_MAGIC 0xCafeCeefu
|
||||
#define SK_PROTOCOL_VERSION 2
|
||||
|
||||
#define MAX_SAFEKEEPERS 32
|
||||
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
|
||||
* message */
|
||||
@@ -143,12 +140,71 @@ typedef uint64 term_t;
|
||||
/* neon storage node id */
|
||||
typedef uint64 NNodeId;
|
||||
|
||||
/*
|
||||
* Number uniquely identifying safekeeper membership configuration.
|
||||
* This and following structs pair ones in membership.rs.
|
||||
*/
|
||||
typedef uint32 Generation;
|
||||
|
||||
typedef struct SafekeeperId
|
||||
{
|
||||
NNodeId node_id;
|
||||
char host[MAXCONNINFO];
|
||||
uint16 port;
|
||||
} SafekeeperId;
|
||||
|
||||
/* Set of safekeepers. */
|
||||
typedef struct MemberSet
|
||||
{
|
||||
uint32 len; /* number of members */
|
||||
SafekeeperId *m; /* ids themselves */
|
||||
} MemberSet;
|
||||
|
||||
/* Timeline safekeeper membership configuration. */
|
||||
typedef struct MembershipConfiguration
|
||||
{
|
||||
Generation generation;
|
||||
MemberSet members;
|
||||
/* Has 0 n_members in non joint conf. */
|
||||
MemberSet new_members;
|
||||
} MembershipConfiguration;
|
||||
|
||||
/*
|
||||
* Proposer <-> Acceptor messaging.
|
||||
*/
|
||||
|
||||
typedef struct ProposerAcceptorMessage
|
||||
{
|
||||
uint8 tag;
|
||||
} ProposerAcceptorMessage;
|
||||
|
||||
/* Initial Proposer -> Acceptor message */
|
||||
typedef struct ProposerGreeting
|
||||
{
|
||||
ProposerAcceptorMessage pam; /* message tag */
|
||||
|
||||
/*
|
||||
* tenant/timeline ids as C strings with standard hex notation for ease of
|
||||
* printing. In principle they are not strictly needed as ttid is also
|
||||
* passed as libpq options.
|
||||
*/
|
||||
char *tenant_id;
|
||||
char *timeline_id;
|
||||
/* Full conf is carried to allow safekeeper switch */
|
||||
MembershipConfiguration mconf;
|
||||
|
||||
/*
|
||||
* pg_version and wal_seg_size are used for timeline creation until we
|
||||
* fully migrate to doing externally. systemId is only used as a sanity
|
||||
* cross check.
|
||||
*/
|
||||
uint32 pg_version; /* in PG_VERSION_NUM format */
|
||||
uint64 system_id; /* Postgres system identifier. */
|
||||
uint32 wal_seg_size;
|
||||
} ProposerGreeting;
|
||||
|
||||
/* protocol v2 variant, kept while wp supports it */
|
||||
typedef struct ProposerGreetingV2
|
||||
{
|
||||
uint64 tag; /* message tag */
|
||||
uint32 protocolVersion; /* proposer-safekeeper protocol version */
|
||||
@@ -159,32 +215,42 @@ typedef struct ProposerGreeting
|
||||
uint8 tenant_id[16];
|
||||
TimeLineID timeline;
|
||||
uint32 walSegSize;
|
||||
} ProposerGreeting;
|
||||
} ProposerGreetingV2;
|
||||
|
||||
typedef struct AcceptorProposerMessage
|
||||
{
|
||||
uint64 tag;
|
||||
uint8 tag;
|
||||
} AcceptorProposerMessage;
|
||||
|
||||
/*
|
||||
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
|
||||
* Acceptor -> Proposer initial response: the highest term acceptor voted for,
|
||||
* its node id and configuration.
|
||||
*/
|
||||
typedef struct AcceptorGreeting
|
||||
{
|
||||
AcceptorProposerMessage apm;
|
||||
term_t term;
|
||||
NNodeId nodeId;
|
||||
MembershipConfiguration mconf;
|
||||
term_t term;
|
||||
} AcceptorGreeting;
|
||||
|
||||
/*
|
||||
* Proposer -> Acceptor vote request.
|
||||
*/
|
||||
typedef struct VoteRequest
|
||||
{
|
||||
ProposerAcceptorMessage pam; /* message tag */
|
||||
Generation generation; /* membership conf generation */
|
||||
term_t term;
|
||||
} VoteRequest;
|
||||
|
||||
/* protocol v2 variant, kept while wp supports it */
|
||||
typedef struct VoteRequestV2
|
||||
{
|
||||
uint64 tag;
|
||||
term_t term;
|
||||
pg_uuid_t proposerId; /* for monitoring/debugging */
|
||||
} VoteRequest;
|
||||
} VoteRequestV2;
|
||||
|
||||
/* Element of term switching chain. */
|
||||
typedef struct TermSwitchEntry
|
||||
@@ -203,8 +269,15 @@ typedef struct TermHistory
|
||||
typedef struct VoteResponse
|
||||
{
|
||||
AcceptorProposerMessage apm;
|
||||
|
||||
/*
|
||||
* Membership conf generation. It's not strictly required because on
|
||||
* mismatch safekeeper is expected to ERROR the connection, but let's
|
||||
* sanity check it.
|
||||
*/
|
||||
Generation generation;
|
||||
term_t term;
|
||||
uint64 voteGiven;
|
||||
uint8 voteGiven;
|
||||
|
||||
/*
|
||||
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
|
||||
@@ -214,7 +287,6 @@ typedef struct VoteResponse
|
||||
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for*
|
||||
* recovery of some safekeeper */
|
||||
TermHistory termHistory;
|
||||
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
|
||||
} VoteResponse;
|
||||
|
||||
/*
|
||||
@@ -223,20 +295,37 @@ typedef struct VoteResponse
|
||||
*/
|
||||
typedef struct ProposerElected
|
||||
{
|
||||
uint64 tag;
|
||||
AcceptorProposerMessage apm;
|
||||
Generation generation; /* membership conf generation */
|
||||
term_t term;
|
||||
/* proposer will send since this point */
|
||||
XLogRecPtr startStreamingAt;
|
||||
/* history of term switches up to this proposer */
|
||||
TermHistory *termHistory;
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
} ProposerElected;
|
||||
|
||||
/*
|
||||
* Header of request with WAL message sent from proposer to safekeeper.
|
||||
*/
|
||||
typedef struct AppendRequestHeader
|
||||
{
|
||||
AcceptorProposerMessage apm;
|
||||
Generation generation; /* membership conf generation */
|
||||
term_t term; /* term of the proposer */
|
||||
XLogRecPtr beginLsn; /* start position of message in WAL */
|
||||
XLogRecPtr endLsn; /* end position of message in WAL */
|
||||
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
|
||||
|
||||
/*
|
||||
* minimal LSN which may be needed for recovery of some safekeeper (end
|
||||
* lsn + 1 of last chunk streamed to everyone)
|
||||
*/
|
||||
XLogRecPtr truncateLsn;
|
||||
/* in the AppendRequest message, WAL data follows */
|
||||
} AppendRequestHeader;
|
||||
|
||||
/* protocol v2 variant, kept while wp supports it */
|
||||
typedef struct AppendRequestHeaderV2
|
||||
{
|
||||
uint64 tag;
|
||||
term_t term; /* term of the proposer */
|
||||
@@ -256,7 +345,8 @@ typedef struct AppendRequestHeader
|
||||
*/
|
||||
XLogRecPtr truncateLsn;
|
||||
pg_uuid_t proposerId; /* for monitoring/debugging */
|
||||
} AppendRequestHeader;
|
||||
/* in the AppendRequest message, WAL data follows */
|
||||
} AppendRequestHeaderV2;
|
||||
|
||||
/*
|
||||
* Hot standby feedback received from replica
|
||||
@@ -309,6 +399,13 @@ typedef struct AppendResponse
|
||||
{
|
||||
AcceptorProposerMessage apm;
|
||||
|
||||
/*
|
||||
* Membership conf generation. It's not strictly required because on
|
||||
* mismatch safekeeper is expected to ERROR the connection, but let's
|
||||
* sanity check it.
|
||||
*/
|
||||
Generation generation;
|
||||
|
||||
/*
|
||||
* Current term of the safekeeper; if it is higher than proposer's, the
|
||||
* compute is out of date.
|
||||
@@ -644,6 +741,8 @@ typedef struct WalProposerConfig
|
||||
/* Will be passed to safekeepers in greet request. */
|
||||
TimeLineID pgTimeline;
|
||||
|
||||
int proto_version;
|
||||
|
||||
#ifdef WALPROPOSER_LIB
|
||||
void *callback_data;
|
||||
#endif
|
||||
@@ -656,11 +755,14 @@ typedef struct WalProposerConfig
|
||||
typedef struct WalProposer
|
||||
{
|
||||
WalProposerConfig *config;
|
||||
int n_safekeepers;
|
||||
/* Current walproposer membership configuration */
|
||||
MembershipConfiguration mconf;
|
||||
|
||||
/* (n_safekeepers / 2) + 1 */
|
||||
int quorum;
|
||||
|
||||
/* Number of occupied slots in safekeepers[] */
|
||||
int n_safekeepers;
|
||||
Safekeeper safekeeper[MAX_SAFEKEEPERS];
|
||||
|
||||
/* WAL has been generated up to this point */
|
||||
@@ -670,6 +772,7 @@ typedef struct WalProposer
|
||||
XLogRecPtr commitLsn;
|
||||
|
||||
ProposerGreeting greetRequest;
|
||||
ProposerGreetingV2 greetRequestV2;
|
||||
|
||||
/* Vote request for safekeeper */
|
||||
VoteRequest voteRequest;
|
||||
|
||||
@@ -155,6 +155,16 @@ pq_getmsgend(StringInfo msg)
|
||||
ExceptionalCondition("invalid msg format", __FILE__, __LINE__);
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_sendbytes - append raw data to a StringInfo buffer
|
||||
* --------------------------------
|
||||
*/
|
||||
void
|
||||
pq_sendbytes(StringInfo buf, const void *data, int datalen)
|
||||
{
|
||||
/* use variant that maintains a trailing null-byte, out of caution */
|
||||
appendBinaryStringInfo(buf, data, datalen);
|
||||
}
|
||||
|
||||
/*
|
||||
* Produce a C-string representation of a TimestampTz.
|
||||
|
||||
@@ -59,9 +59,11 @@
|
||||
|
||||
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
|
||||
|
||||
/* GUCs */
|
||||
char *wal_acceptors_list = "";
|
||||
int wal_acceptor_reconnect_timeout = 1000;
|
||||
int wal_acceptor_connection_timeout = 10000;
|
||||
int safekeeper_proto_version = 2;
|
||||
|
||||
/* Set to true in the walproposer bgw. */
|
||||
static bool am_walproposer;
|
||||
@@ -126,6 +128,7 @@ init_walprop_config(bool syncSafekeepers)
|
||||
else
|
||||
walprop_config.systemId = 0;
|
||||
walprop_config.pgTimeline = walprop_pg_get_timeline_id();
|
||||
walprop_config.proto_version = safekeeper_proto_version;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -219,25 +222,37 @@ nwp_register_gucs(void)
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MS,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.safekeeper_proto_version",
|
||||
"Version of compute <-> safekeeper protocol.",
|
||||
"Used while migrating from 2 to 3.",
|
||||
&safekeeper_proto_version,
|
||||
2, 0, INT_MAX,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
|
||||
{
|
||||
int n_safekeepers = 0;
|
||||
char *curr_sk = safekeepers_list;
|
||||
int n_safekeepers = 0;
|
||||
char *curr_sk = safekeepers_list;
|
||||
|
||||
for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma)
|
||||
{
|
||||
if (++n_safekeepers >= MAX_SAFEKEEPERS) {
|
||||
if (++n_safekeepers >= MAX_SAFEKEEPERS)
|
||||
{
|
||||
wpg_log(FATAL, "too many safekeepers");
|
||||
}
|
||||
|
||||
coma = strchr(coma, ',');
|
||||
safekeepers[n_safekeepers-1] = curr_sk;
|
||||
safekeepers[n_safekeepers - 1] = curr_sk;
|
||||
|
||||
if (coma != NULL) {
|
||||
if (coma != NULL)
|
||||
{
|
||||
*coma++ = '\0';
|
||||
}
|
||||
}
|
||||
@@ -252,10 +267,10 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
|
||||
static bool
|
||||
safekeepers_cmp(char *old, char *new)
|
||||
{
|
||||
char *safekeepers_old[MAX_SAFEKEEPERS];
|
||||
char *safekeepers_new[MAX_SAFEKEEPERS];
|
||||
int len_old = 0;
|
||||
int len_new = 0;
|
||||
char *safekeepers_old[MAX_SAFEKEEPERS];
|
||||
char *safekeepers_new[MAX_SAFEKEEPERS];
|
||||
int len_old = 0;
|
||||
int len_new = 0;
|
||||
|
||||
len_old = split_safekeepers_list(old, safekeepers_old);
|
||||
len_new = split_safekeepers_list(new, safekeepers_new);
|
||||
@@ -292,7 +307,8 @@ assign_neon_safekeepers(const char *newval, void *extra)
|
||||
if (!am_walproposer)
|
||||
return;
|
||||
|
||||
if (!newval) {
|
||||
if (!newval)
|
||||
{
|
||||
/* should never happen */
|
||||
wpg_log(FATAL, "neon.safekeepers is empty");
|
||||
}
|
||||
@@ -301,11 +317,11 @@ assign_neon_safekeepers(const char *newval, void *extra)
|
||||
newval_copy = pstrdup(newval);
|
||||
oldval = pstrdup(wal_acceptors_list);
|
||||
|
||||
/*
|
||||
/*
|
||||
* TODO: restarting through FATAL is stupid and introduces 1s delay before
|
||||
* next bgw start. We should refactor walproposer to allow graceful exit and
|
||||
* thus remove this delay.
|
||||
* XXX: If you change anything here, sync with test_safekeepers_reconfigure_reorder.
|
||||
* next bgw start. We should refactor walproposer to allow graceful exit
|
||||
* and thus remove this delay. XXX: If you change anything here, sync with
|
||||
* test_safekeepers_reconfigure_reorder.
|
||||
*/
|
||||
if (!safekeepers_cmp(oldval, newval_copy))
|
||||
{
|
||||
@@ -454,7 +470,8 @@ backpressure_throttling_impl(void)
|
||||
memcpy(new_status, old_status, len);
|
||||
snprintf(new_status + len, 64, "backpressure throttling: lag %lu", lag);
|
||||
set_ps_display(new_status);
|
||||
new_status[len] = '\0'; /* truncate off " backpressure ..." to later reset the ps */
|
||||
new_status[len] = '\0'; /* truncate off " backpressure ..." to later
|
||||
* reset the ps */
|
||||
|
||||
elog(DEBUG2, "backpressure throttling: lag %lu", lag);
|
||||
start = GetCurrentTimestamp();
|
||||
@@ -621,7 +638,7 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos)
|
||||
wpg_log(LOG, "WAL proposer starts streaming at %X/%X",
|
||||
LSN_FORMAT_ARGS(startpos));
|
||||
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
|
||||
cmd.timeline = wp->greetRequest.timeline;
|
||||
cmd.timeline = wp->config->pgTimeline;
|
||||
cmd.startpoint = startpos;
|
||||
StartProposerReplication(wp, &cmd);
|
||||
}
|
||||
@@ -1963,10 +1980,11 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||
FullTransactionId xmin = hsFeedback.xmin;
|
||||
FullTransactionId catalog_xmin = hsFeedback.catalog_xmin;
|
||||
FullTransactionId next_xid = ReadNextFullTransactionId();
|
||||
|
||||
/*
|
||||
* Page server is updating nextXid in checkpoint each 1024 transactions,
|
||||
* so feedback xmin can be actually larger then nextXid and
|
||||
* function TransactionIdInRecentPast return false in this case,
|
||||
* Page server is updating nextXid in checkpoint each 1024
|
||||
* transactions, so feedback xmin can be actually larger then nextXid
|
||||
* and function TransactionIdInRecentPast return false in this case,
|
||||
* preventing update of slot's xmin.
|
||||
*/
|
||||
if (FullTransactionIdPrecedes(next_xid, xmin))
|
||||
|
||||
@@ -52,16 +52,70 @@ pub struct SafekeeperPostgresHandler {
|
||||
|
||||
/// Parsed Postgres command.
|
||||
enum SafekeeperPostgresCommand {
|
||||
StartWalPush,
|
||||
StartReplication { start_lsn: Lsn, term: Option<Term> },
|
||||
StartWalPush {
|
||||
proto_version: u32,
|
||||
// Eventually timelines will be always created explicitly by storcon.
|
||||
// This option allows legacy behaviour for compute to do that until we
|
||||
// fully migrate.
|
||||
allow_timeline_creation: bool,
|
||||
},
|
||||
StartReplication {
|
||||
start_lsn: Lsn,
|
||||
term: Option<Term>,
|
||||
},
|
||||
IdentifySystem,
|
||||
TimelineStatus,
|
||||
JSONCtrl { cmd: AppendLogicalMessage },
|
||||
JSONCtrl {
|
||||
cmd: AppendLogicalMessage,
|
||||
},
|
||||
}
|
||||
|
||||
fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
if cmd.starts_with("START_WAL_PUSH") {
|
||||
Ok(SafekeeperPostgresCommand::StartWalPush)
|
||||
// Allow additional options in postgres START_REPLICATION style like
|
||||
// START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false').
|
||||
// Parsing here is very naive and breaks in case of commas or
|
||||
// whitespaces in values, but enough for our purposes.
|
||||
let re = Regex::new(r"START_WAL_PUSH(\s+?\((.*)\))?").unwrap();
|
||||
let caps = re
|
||||
.captures(cmd)
|
||||
.context(format!("failed to parse START_WAL_PUSH command {}", cmd))?;
|
||||
// capture () content
|
||||
let options = caps.get(2).map(|m| m.as_str()).unwrap_or("");
|
||||
// default values
|
||||
let mut proto_version = 2;
|
||||
let mut allow_timeline_creation = true;
|
||||
for kvstr in options.split(",") {
|
||||
if kvstr.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let mut kvit = kvstr.split_whitespace();
|
||||
let key = kvit.next().context(format!(
|
||||
"failed to parse key in kv {} in command {}",
|
||||
kvstr, cmd
|
||||
))?;
|
||||
let value = kvit.next().context(format!(
|
||||
"failed to parse value in kv {} in command {}",
|
||||
kvstr, cmd
|
||||
))?;
|
||||
let value_trimmed = value.trim_matches('\'');
|
||||
if key == "proto_version" {
|
||||
proto_version = value_trimmed.parse::<u32>().context(format!(
|
||||
"failed to parse proto_version value {} in command {}",
|
||||
value, cmd
|
||||
))?;
|
||||
}
|
||||
if key == "allow_timeline_creation" {
|
||||
allow_timeline_creation = value_trimmed.parse::<bool>().context(format!(
|
||||
"failed to parse allow_timeline_creation value {} in command {}",
|
||||
value, cmd
|
||||
))?;
|
||||
}
|
||||
}
|
||||
Ok(SafekeeperPostgresCommand::StartWalPush {
|
||||
proto_version,
|
||||
allow_timeline_creation,
|
||||
})
|
||||
} else if cmd.starts_with("START_REPLICATION") {
|
||||
let re = Regex::new(
|
||||
// We follow postgres START_REPLICATION LOGICAL options to pass term.
|
||||
@@ -95,7 +149,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
|
||||
fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
|
||||
SafekeeperPostgresCommand::StartWalPush { .. } => "START_WAL_PUSH",
|
||||
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
|
||||
SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS",
|
||||
SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
|
||||
@@ -293,8 +347,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
||||
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush => {
|
||||
self.handle_start_wal_push(pgb)
|
||||
SafekeeperPostgresCommand::StartWalPush {
|
||||
proto_version,
|
||||
allow_timeline_creation,
|
||||
} => {
|
||||
self.handle_start_wal_push(pgb, proto_version, allow_timeline_creation)
|
||||
.instrument(info_span!("WAL receiver"))
|
||||
.await
|
||||
}
|
||||
@@ -467,3 +524,39 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::SafekeeperPostgresCommand;
|
||||
|
||||
/// Test parsing of START_WAL_PUSH command
|
||||
#[test]
|
||||
fn test_start_wal_push_parse() {
|
||||
let cmd = "START_WAL_PUSH";
|
||||
let parsed = super::parse_cmd(cmd).expect("failed to parse");
|
||||
match parsed {
|
||||
SafekeeperPostgresCommand::StartWalPush {
|
||||
proto_version,
|
||||
allow_timeline_creation,
|
||||
} => {
|
||||
assert_eq!(proto_version, 2);
|
||||
assert!(allow_timeline_creation);
|
||||
}
|
||||
_ => panic!("unexpected command"),
|
||||
}
|
||||
|
||||
let cmd =
|
||||
"START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false', unknown 'hoho')";
|
||||
let parsed = super::parse_cmd(cmd).expect("failed to parse");
|
||||
match parsed {
|
||||
SafekeeperPostgresCommand::StartWalPush {
|
||||
proto_version,
|
||||
allow_timeline_creation,
|
||||
} => {
|
||||
assert_eq!(proto_version, 3);
|
||||
assert!(!allow_timeline_creation);
|
||||
}
|
||||
_ => panic!("unexpected command"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
use anyhow::Context;
|
||||
use postgres_backend::QueryError;
|
||||
use safekeeper_api::membership::Configuration;
|
||||
use safekeeper_api::membership::{Configuration, INVALID_GENERATION};
|
||||
use safekeeper_api::{ServerInfo, Term};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@@ -133,10 +133,10 @@ async fn send_proposer_elected(
|
||||
let history = TermHistory(history_entries);
|
||||
|
||||
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
generation: INVALID_GENERATION,
|
||||
term,
|
||||
start_streaming_at: lsn,
|
||||
term_history: history,
|
||||
timeline_start_lsn: lsn,
|
||||
});
|
||||
|
||||
tli.process_msg(&proposer_elected_request).await?;
|
||||
@@ -170,13 +170,12 @@ pub async fn append_logical_message(
|
||||
|
||||
let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
generation: INVALID_GENERATION,
|
||||
term: msg.term,
|
||||
term_start_lsn: begin_lsn,
|
||||
begin_lsn,
|
||||
end_lsn,
|
||||
commit_lsn,
|
||||
truncate_lsn: msg.truncate_lsn,
|
||||
proposer_uuid: [0u8; 16],
|
||||
},
|
||||
wal_data,
|
||||
});
|
||||
|
||||
@@ -200,9 +200,14 @@ impl SafekeeperPostgresHandler {
|
||||
pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
proto_version: u32,
|
||||
allow_timeline_creation: bool,
|
||||
) -> Result<(), QueryError> {
|
||||
let mut tli: Option<WalResidentTimeline> = None;
|
||||
if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await {
|
||||
if let Err(end) = self
|
||||
.handle_start_wal_push_guts(pgb, &mut tli, proto_version, allow_timeline_creation)
|
||||
.await
|
||||
{
|
||||
// Log the result and probably send it to the client, closing the stream.
|
||||
let handle_end_fut = pgb.handle_copy_stream_end(end);
|
||||
// If we managed to create the timeline, augment logging with current LSNs etc.
|
||||
@@ -222,6 +227,8 @@ impl SafekeeperPostgresHandler {
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
tli: &mut Option<WalResidentTimeline>,
|
||||
proto_version: u32,
|
||||
allow_timeline_creation: bool,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
// The `tli` parameter is only used for passing _out_ a timeline, one should
|
||||
// not have been passed in.
|
||||
@@ -250,12 +257,17 @@ impl SafekeeperPostgresHandler {
|
||||
conn_id: self.conn_id,
|
||||
pgb_reader: &mut pgb_reader,
|
||||
peer_addr,
|
||||
proto_version,
|
||||
acceptor_handle: &mut acceptor_handle,
|
||||
global_timelines: self.global_timelines.clone(),
|
||||
};
|
||||
|
||||
// Read first message and create timeline if needed.
|
||||
let res = network_reader.read_first_message().await;
|
||||
// Read first message and create timeline if needed and allowed. This
|
||||
// won't be when timelines will be always created by storcon and
|
||||
// allow_timeline_creation becomes false.
|
||||
let res = network_reader
|
||||
.read_first_message(allow_timeline_creation)
|
||||
.await;
|
||||
|
||||
let network_res = if let Ok((timeline, next_msg)) = res {
|
||||
let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
|
||||
@@ -269,7 +281,7 @@ impl SafekeeperPostgresHandler {
|
||||
tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
|
||||
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
|
||||
r = network_write(pgb, reply_rx, pageserver_feedback_rx, proto_version) => r,
|
||||
_ = timeline_cancel.cancelled() => {
|
||||
return Err(CopyStreamHandlerEnd::Cancelled);
|
||||
}
|
||||
@@ -313,6 +325,7 @@ struct NetworkReader<'a, IO> {
|
||||
conn_id: ConnectionId,
|
||||
pgb_reader: &'a mut PostgresBackendReader<IO>,
|
||||
peer_addr: SocketAddr,
|
||||
proto_version: u32,
|
||||
// WalAcceptor is spawned when we learn server info from walproposer and
|
||||
// create timeline; handle is put here.
|
||||
acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
|
||||
@@ -322,31 +335,37 @@ struct NetworkReader<'a, IO> {
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
|
||||
async fn read_first_message(
|
||||
&mut self,
|
||||
allow_timeline_creation: bool,
|
||||
) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
|
||||
// Receive information about server to create timeline, if not yet.
|
||||
let next_msg = read_message(self.pgb_reader).await?;
|
||||
let next_msg = read_message(self.pgb_reader, self.proto_version).await?;
|
||||
let tli = match next_msg {
|
||||
ProposerAcceptorMessage::Greeting(ref greeting) => {
|
||||
info!(
|
||||
"start handshake with walproposer {} sysid {} timeline {}",
|
||||
self.peer_addr, greeting.system_id, greeting.tli,
|
||||
"start handshake with walproposer {} sysid {}",
|
||||
self.peer_addr, greeting.system_id,
|
||||
);
|
||||
let server_info = ServerInfo {
|
||||
pg_version: greeting.pg_version,
|
||||
system_id: greeting.system_id,
|
||||
wal_seg_size: greeting.wal_seg_size,
|
||||
};
|
||||
let tli = self
|
||||
.global_timelines
|
||||
.create(
|
||||
self.ttid,
|
||||
Configuration::empty(),
|
||||
server_info,
|
||||
Lsn::INVALID,
|
||||
Lsn::INVALID,
|
||||
)
|
||||
.await
|
||||
.context("create timeline")?;
|
||||
let tli = if allow_timeline_creation {
|
||||
self.global_timelines
|
||||
.create(
|
||||
self.ttid,
|
||||
Configuration::empty(),
|
||||
server_info,
|
||||
Lsn::INVALID,
|
||||
Lsn::INVALID,
|
||||
)
|
||||
.await
|
||||
.context("create timeline")?
|
||||
} else {
|
||||
self.global_timelines
|
||||
.get(self.ttid)
|
||||
.context("get timeline")?
|
||||
};
|
||||
tli.wal_residence_guard().await?
|
||||
}
|
||||
_ => {
|
||||
@@ -375,7 +394,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
|
||||
));
|
||||
|
||||
// Forward all messages to WalAcceptor
|
||||
read_network_loop(self.pgb_reader, msg_tx, next_msg).await
|
||||
read_network_loop(self.pgb_reader, msg_tx, next_msg, self.proto_version).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -383,9 +402,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
|
||||
/// TODO: Return Ok(None) on graceful termination.
|
||||
async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
pgb_reader: &mut PostgresBackendReader<IO>,
|
||||
proto_version: u32,
|
||||
) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
|
||||
let copy_data = pgb_reader.read_copy_message().await?;
|
||||
let msg = ProposerAcceptorMessage::parse(copy_data)?;
|
||||
let msg = ProposerAcceptorMessage::parse(copy_data, proto_version)?;
|
||||
Ok(msg)
|
||||
}
|
||||
|
||||
@@ -393,6 +413,7 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
pgb_reader: &mut PostgresBackendReader<IO>,
|
||||
msg_tx: Sender<ProposerAcceptorMessage>,
|
||||
mut next_msg: ProposerAcceptorMessage,
|
||||
proto_version: u32,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
/// Threshold for logging slow WalAcceptor sends.
|
||||
const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
|
||||
@@ -425,7 +446,7 @@ async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
|
||||
WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);
|
||||
|
||||
next_msg = read_message(pgb_reader).await?;
|
||||
next_msg = read_message(pgb_reader, proto_version).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -438,6 +459,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
pgb_writer: &mut PostgresBackend<IO>,
|
||||
mut reply_rx: Receiver<AcceptorProposerMessage>,
|
||||
mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
|
||||
proto_version: u32,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
|
||||
@@ -475,7 +497,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
};
|
||||
|
||||
buf.clear();
|
||||
msg.serialize(&mut buf)?;
|
||||
msg.serialize(&mut buf, proto_version)?;
|
||||
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::{fmt, pin::pin};
|
||||
use anyhow::{bail, Context};
|
||||
use futures::StreamExt;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use safekeeper_api::membership::INVALID_GENERATION;
|
||||
use safekeeper_api::models::{PeerInfo, TimelineStatus};
|
||||
use safekeeper_api::Term;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
@@ -267,7 +268,10 @@ async fn recover(
|
||||
);
|
||||
|
||||
// Now understand our term history.
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
|
||||
generation: INVALID_GENERATION,
|
||||
term: donor.term,
|
||||
});
|
||||
let vote_response = match tli
|
||||
.process_msg(&vote_request)
|
||||
.await
|
||||
@@ -302,10 +306,10 @@ async fn recover(
|
||||
|
||||
// truncate WAL locally
|
||||
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
generation: INVALID_GENERATION,
|
||||
term: donor.term,
|
||||
start_streaming_at: last_common_point.lsn,
|
||||
term_history: donor_th,
|
||||
timeline_start_lsn: Lsn::INVALID,
|
||||
});
|
||||
// Successful ProposerElected handling always returns None. If term changed,
|
||||
// we'll find out that during the streaming. Note: it is expected to get
|
||||
@@ -434,13 +438,12 @@ async fn network_io(
|
||||
match msg {
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
let ar_hdr = AppendRequestHeader {
|
||||
generation: INVALID_GENERATION,
|
||||
term: donor.term,
|
||||
term_start_lsn: Lsn::INVALID, // unused
|
||||
begin_lsn: Lsn(xlog_data.wal_start()),
|
||||
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
|
||||
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
|
||||
truncate_lsn: Lsn::INVALID, // do not attempt to advance
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let ar = AppendRequest {
|
||||
h: ar_hdr,
|
||||
|
||||
@@ -5,6 +5,11 @@ use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
|
||||
use safekeeper_api::membership;
|
||||
use safekeeper_api::membership::Generation;
|
||||
use safekeeper_api::membership::MemberSet;
|
||||
use safekeeper_api::membership::SafekeeperId;
|
||||
use safekeeper_api::membership::INVALID_GENERATION;
|
||||
use safekeeper_api::models::HotStandbyFeedback;
|
||||
use safekeeper_api::Term;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -12,6 +17,7 @@ use std::cmp::max;
|
||||
use std::cmp::min;
|
||||
use std::fmt;
|
||||
use std::io::Read;
|
||||
use std::str::FromStr;
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
|
||||
use tracing::*;
|
||||
@@ -29,7 +35,8 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
const SK_PROTOCOL_VERSION: u32 = 2;
|
||||
pub const SK_PROTO_VERSION_2: u32 = 2;
|
||||
pub const SK_PROTO_VERSION_3: u32 = 3;
|
||||
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||
@@ -56,8 +63,28 @@ impl TermHistory {
|
||||
TermHistory(Vec::new())
|
||||
}
|
||||
|
||||
// Parse TermHistory as n_entries followed by TermLsn pairs
|
||||
// Parse TermHistory as n_entries followed by TermLsn pairs in network order.
|
||||
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
|
||||
let n_entries = bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "TermHistory misses len")?;
|
||||
let mut res = Vec::with_capacity(n_entries as usize);
|
||||
for i in 0..n_entries {
|
||||
let term = bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| format!("TermHistory pos {} misses term", i))?;
|
||||
let lsn = bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| format!("TermHistory pos {} misses lsn", i))?
|
||||
.into();
|
||||
res.push(TermLsn { term, lsn })
|
||||
}
|
||||
Ok(TermHistory(res))
|
||||
}
|
||||
|
||||
// Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
|
||||
// TODO remove once v2 protocol is fully dropped.
|
||||
pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
|
||||
if bytes.remaining() < 4 {
|
||||
bail!("TermHistory misses len");
|
||||
}
|
||||
@@ -197,6 +224,18 @@ impl AcceptorState {
|
||||
/// Initial Proposer -> Acceptor message
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ProposerGreeting {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub mconf: membership::Configuration,
|
||||
/// Postgres server version
|
||||
pub pg_version: u32,
|
||||
pub system_id: SystemId,
|
||||
pub wal_seg_size: u32,
|
||||
}
|
||||
|
||||
/// V2 of the message; exists as a struct because we (de)serialized it as is.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ProposerGreetingV2 {
|
||||
/// proposer-acceptor protocol version
|
||||
pub protocol_version: u32,
|
||||
/// Postgres server version
|
||||
@@ -213,27 +252,35 @@ pub struct ProposerGreeting {
|
||||
/// (acceptor voted for).
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct AcceptorGreeting {
|
||||
term: u64,
|
||||
node_id: NodeId,
|
||||
mconf: membership::Configuration,
|
||||
term: u64,
|
||||
}
|
||||
|
||||
/// Vote request sent from proposer to safekeepers
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct VoteRequest {
|
||||
pub generation: Generation,
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
/// V2 of the message; exists as a struct because we (de)serialized it as is.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct VoteRequestV2 {
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
/// Vote itself, sent from safekeeper to proposer
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct VoteResponse {
|
||||
generation: Generation, // membership conf generation
|
||||
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
vote_given: u64, // fixme u64 due to padding
|
||||
vote_given: bool,
|
||||
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
|
||||
// proposer to choose the most advanced one.
|
||||
pub flush_lsn: Lsn,
|
||||
truncate_lsn: Lsn,
|
||||
pub term_history: TermHistory,
|
||||
timeline_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -242,10 +289,10 @@ pub struct VoteResponse {
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
pub struct ProposerElected {
|
||||
pub generation: Generation, // membership conf generation
|
||||
pub term: Term,
|
||||
pub start_streaming_at: Lsn,
|
||||
pub term_history: TermHistory,
|
||||
pub timeline_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Request with WAL message sent from proposer to safekeeper. Along the way it
|
||||
@@ -257,6 +304,22 @@ pub struct AppendRequest {
|
||||
}
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct AppendRequestHeader {
|
||||
pub generation: Generation, // membership conf generation
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
/// start position of message in WAL
|
||||
pub begin_lsn: Lsn,
|
||||
/// end position of message in WAL
|
||||
pub end_lsn: Lsn,
|
||||
/// LSN committed by quorum of safekeepers
|
||||
pub commit_lsn: Lsn,
|
||||
/// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
|
||||
pub truncate_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// V2 of the message; exists as a struct because we (de)serialized it as is.
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct AppendRequestHeaderV2 {
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
// TODO: remove this field from the protocol, it in unused -- LSN of term
|
||||
@@ -277,6 +340,9 @@ pub struct AppendRequestHeader {
|
||||
/// Report safekeeper state to proposer
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct AppendResponse {
|
||||
// Membership conf generation. Not strictly required because on mismatch
|
||||
// connection is reset, but let's sanity check it.
|
||||
generation: Generation,
|
||||
// Current term of the safekeeper; if it is higher than proposer's, the
|
||||
// compute is out of date.
|
||||
pub term: Term,
|
||||
@@ -293,8 +359,9 @@ pub struct AppendResponse {
|
||||
}
|
||||
|
||||
impl AppendResponse {
|
||||
fn term_only(term: Term) -> AppendResponse {
|
||||
fn term_only(generation: Generation, term: Term) -> AppendResponse {
|
||||
AppendResponse {
|
||||
generation,
|
||||
term,
|
||||
flush_lsn: Lsn(0),
|
||||
commit_lsn: Lsn(0),
|
||||
@@ -315,65 +382,317 @@ pub enum ProposerAcceptorMessage {
|
||||
FlushWAL,
|
||||
}
|
||||
|
||||
/// Augment Bytes with fallible get_uN where N is number of bytes methods.
|
||||
/// All reads are in network (big endian) order.
|
||||
trait BytesF {
|
||||
fn get_u8_f(&mut self) -> Result<u8>;
|
||||
fn get_u16_f(&mut self) -> Result<u16>;
|
||||
fn get_u32_f(&mut self) -> Result<u32>;
|
||||
fn get_u64_f(&mut self) -> Result<u64>;
|
||||
}
|
||||
|
||||
impl BytesF for Bytes {
|
||||
fn get_u8_f(&mut self) -> Result<u8> {
|
||||
if self.is_empty() {
|
||||
bail!("no bytes left, expected 1");
|
||||
}
|
||||
Ok(self.get_u8())
|
||||
}
|
||||
fn get_u16_f(&mut self) -> Result<u16> {
|
||||
if self.is_empty() {
|
||||
bail!("no bytes left, expected 2");
|
||||
}
|
||||
Ok(self.get_u16())
|
||||
}
|
||||
fn get_u32_f(&mut self) -> Result<u32> {
|
||||
if self.remaining() < 4 {
|
||||
bail!("only {} bytes left, expected 4", self.remaining());
|
||||
}
|
||||
Ok(self.get_u32())
|
||||
}
|
||||
fn get_u64_f(&mut self) -> Result<u64> {
|
||||
if self.remaining() < 8 {
|
||||
bail!("only {} bytes left, expected 8", self.remaining());
|
||||
}
|
||||
Ok(self.get_u64())
|
||||
}
|
||||
}
|
||||
|
||||
impl ProposerAcceptorMessage {
|
||||
/// Parse proposer message.
|
||||
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
|
||||
// xxx using Reader is inefficient but easy to work with bincode
|
||||
let mut stream = msg_bytes.reader();
|
||||
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
|
||||
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let msg = ProposerGreeting::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::Greeting(msg))
|
||||
}
|
||||
'v' => {
|
||||
let msg = VoteRequest::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(msg))
|
||||
}
|
||||
'e' => {
|
||||
let mut msg_bytes = stream.into_inner();
|
||||
if msg_bytes.remaining() < 16 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let term = msg_bytes.get_u64_le();
|
||||
let start_streaming_at = msg_bytes.get_u64_le().into();
|
||||
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
|
||||
if msg_bytes.remaining() < 8 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let timeline_start_lsn = msg_bytes.get_u64_le().into();
|
||||
let msg = ProposerElected {
|
||||
term,
|
||||
start_streaming_at,
|
||||
timeline_start_lsn,
|
||||
term_history,
|
||||
/// Read cstring from Bytes.
|
||||
fn get_cstr(buf: &mut Bytes) -> Result<String> {
|
||||
let pos = buf
|
||||
.iter()
|
||||
.position(|x| *x == 0)
|
||||
.ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
|
||||
let result = buf.split_to(pos);
|
||||
buf.advance(1); // drop the null terminator
|
||||
match std::str::from_utf8(&result) {
|
||||
Ok(s) => Ok(s.to_string()),
|
||||
Err(e) => bail!("invalid utf8 in cstring: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Read membership::Configuration from Bytes.
|
||||
fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
|
||||
let generation = buf.get_u32_f().with_context(|| "reading generation")?;
|
||||
let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
|
||||
// Main member set must have at least someone in valid configuration.
|
||||
// Empty conf is allowed until we fully migrate.
|
||||
if generation != INVALID_GENERATION && members_len == 0 {
|
||||
bail!("empty members_len");
|
||||
}
|
||||
let mut members = MemberSet::empty();
|
||||
for i in 0..members_len {
|
||||
let id = buf
|
||||
.get_u64_f()
|
||||
.with_context(|| format!("reading member {} node_id", i))?;
|
||||
let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
|
||||
let pg_port = buf
|
||||
.get_u16_f()
|
||||
.with_context(|| format!("reading member {} port", i))?;
|
||||
let sk = SafekeeperId {
|
||||
id: NodeId(id),
|
||||
host,
|
||||
pg_port,
|
||||
};
|
||||
members.add(sk)?;
|
||||
}
|
||||
let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
|
||||
// Non joint conf.
|
||||
if new_members_len == 0 {
|
||||
Ok(membership::Configuration {
|
||||
generation,
|
||||
members,
|
||||
new_members: None,
|
||||
})
|
||||
} else {
|
||||
let mut new_members = MemberSet::empty();
|
||||
for i in 0..new_members_len {
|
||||
let id = buf
|
||||
.get_u64_f()
|
||||
.with_context(|| format!("reading new member {} node_id", i))?;
|
||||
let host = Self::get_cstr(buf)
|
||||
.with_context(|| format!("reading new member {} host", i))?;
|
||||
let pg_port = buf
|
||||
.get_u16_f()
|
||||
.with_context(|| format!("reading new member {} port", i))?;
|
||||
let sk = SafekeeperId {
|
||||
id: NodeId(id),
|
||||
host,
|
||||
pg_port,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Elected(msg))
|
||||
new_members.add(sk)?;
|
||||
}
|
||||
'a' => {
|
||||
// read header followed by wal data
|
||||
let hdr = AppendRequestHeader::des_from(&mut stream)?;
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.context("begin_lsn > end_lsn in AppendRequest")?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
Ok(membership::Configuration {
|
||||
generation,
|
||||
members,
|
||||
new_members: Some(new_members),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse proposer message.
|
||||
pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
|
||||
if proto_version == SK_PROTO_VERSION_3 {
|
||||
if msg_bytes.is_empty() {
|
||||
bail!("ProposerAcceptorMessage is not complete: missing tag");
|
||||
}
|
||||
let tag = msg_bytes.get_u8_f().with_context(|| {
|
||||
"ProposerAcceptorMessage is not complete: missing tag".to_string()
|
||||
})? as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let tenant_id_str =
|
||||
Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
|
||||
let tenant_id = TenantId::from_str(&tenant_id_str)?;
|
||||
let timeline_id_str =
|
||||
Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
|
||||
let timeline_id = TimelineId::from_str(&timeline_id_str)?;
|
||||
let mconf = Self::get_mconf(&mut msg_bytes)?;
|
||||
let pg_version = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading pg_version")?;
|
||||
let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
|
||||
let wal_seg_size = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading wal_seg_size")?;
|
||||
let g = ProposerGreeting {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
mconf,
|
||||
pg_version,
|
||||
system_id,
|
||||
wal_seg_size,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Greeting(g))
|
||||
}
|
||||
'v' => {
|
||||
let generation = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading generation")?;
|
||||
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
|
||||
let v = VoteRequest { generation, term };
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(v))
|
||||
}
|
||||
'e' => {
|
||||
let generation = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading generation")?;
|
||||
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
|
||||
let start_streaming_at: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading start_streaming_at")?
|
||||
.into();
|
||||
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
|
||||
let msg = ProposerElected {
|
||||
generation,
|
||||
term,
|
||||
start_streaming_at,
|
||||
term_history,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Elected(msg))
|
||||
}
|
||||
'a' => {
|
||||
let generation = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading generation")?;
|
||||
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
|
||||
let begin_lsn: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading begin_lsn")?
|
||||
.into();
|
||||
let end_lsn: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading end_lsn")?
|
||||
.into();
|
||||
let commit_lsn: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading commit_lsn")?
|
||||
.into();
|
||||
let truncate_lsn: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading truncate_lsn")?
|
||||
.into();
|
||||
let hdr = AppendRequestHeader {
|
||||
generation,
|
||||
term,
|
||||
begin_lsn,
|
||||
end_lsn,
|
||||
commit_lsn,
|
||||
truncate_lsn,
|
||||
};
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.context("begin_lsn > end_lsn in AppendRequest")?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
}
|
||||
if msg_bytes.remaining() < rec_size {
|
||||
bail!(
|
||||
"reading WAL: only {} bytes left, wanted {}",
|
||||
msg_bytes.remaining(),
|
||||
rec_size
|
||||
);
|
||||
}
|
||||
let wal_data = msg_bytes.copy_to_bytes(rec_size);
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
|
||||
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
|
||||
stream.read_exact(&mut wal_data_vec)?;
|
||||
let wal_data = Bytes::from(wal_data_vec);
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
// TODO remove proto_version == 3 after converting all msgs
|
||||
} else if proto_version == SK_PROTO_VERSION_2 || proto_version == SK_PROTO_VERSION_3 {
|
||||
// xxx using Reader is inefficient but easy to work with bincode
|
||||
let mut stream = msg_bytes.reader();
|
||||
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
|
||||
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
|
||||
let g = ProposerGreeting {
|
||||
tenant_id: msgv2.tenant_id,
|
||||
timeline_id: msgv2.timeline_id,
|
||||
mconf: membership::Configuration {
|
||||
generation: INVALID_GENERATION,
|
||||
members: MemberSet::empty(),
|
||||
new_members: None,
|
||||
},
|
||||
pg_version: msgv2.pg_version,
|
||||
system_id: msgv2.system_id,
|
||||
wal_seg_size: msgv2.wal_seg_size,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Greeting(g))
|
||||
}
|
||||
'v' => {
|
||||
let msg = VoteRequestV2::des_from(&mut stream)?;
|
||||
let v = VoteRequest {
|
||||
generation: INVALID_GENERATION,
|
||||
term: msg.term,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(v))
|
||||
}
|
||||
'e' => {
|
||||
let mut msg_bytes = stream.into_inner();
|
||||
if msg_bytes.remaining() < 16 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let term = msg_bytes.get_u64_le();
|
||||
let start_streaming_at = msg_bytes.get_u64_le().into();
|
||||
let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
|
||||
if msg_bytes.remaining() < 8 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let _timeline_start_lsn = msg_bytes.get_u64_le();
|
||||
let msg = ProposerElected {
|
||||
generation: INVALID_GENERATION,
|
||||
term,
|
||||
start_streaming_at,
|
||||
term_history,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Elected(msg))
|
||||
}
|
||||
'a' => {
|
||||
// read header followed by wal data
|
||||
let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
|
||||
let hdr = AppendRequestHeader {
|
||||
generation: INVALID_GENERATION,
|
||||
term: hdrv2.term,
|
||||
begin_lsn: hdrv2.begin_lsn,
|
||||
end_lsn: hdrv2.end_lsn,
|
||||
commit_lsn: hdrv2.commit_lsn,
|
||||
truncate_lsn: hdrv2.truncate_lsn,
|
||||
};
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.context("begin_lsn > end_lsn in AppendRequest")?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
}
|
||||
|
||||
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
|
||||
stream.read_exact(&mut wal_data_vec)?;
|
||||
let wal_data = Bytes::from(wal_data_vec);
|
||||
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
}
|
||||
} else {
|
||||
bail!("unsupported protocol version {}", proto_version);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -387,36 +706,21 @@ impl ProposerAcceptorMessage {
|
||||
// We explicitly list all fields, to draw attention here when new fields are added.
|
||||
let mut size = BASE_SIZE;
|
||||
size += match self {
|
||||
Self::Greeting(ProposerGreeting {
|
||||
protocol_version: _,
|
||||
pg_version: _,
|
||||
proposer_id: _,
|
||||
system_id: _,
|
||||
timeline_id: _,
|
||||
tenant_id: _,
|
||||
tli: _,
|
||||
wal_seg_size: _,
|
||||
}) => 0,
|
||||
Self::Greeting(_) => 0,
|
||||
|
||||
Self::VoteRequest(VoteRequest { term: _ }) => 0,
|
||||
Self::VoteRequest(_) => 0,
|
||||
|
||||
Self::Elected(ProposerElected {
|
||||
term: _,
|
||||
start_streaming_at: _,
|
||||
term_history: _,
|
||||
timeline_start_lsn: _,
|
||||
}) => 0,
|
||||
Self::Elected(_) => 0,
|
||||
|
||||
Self::AppendRequest(AppendRequest {
|
||||
h:
|
||||
AppendRequestHeader {
|
||||
generation: _,
|
||||
term: _,
|
||||
term_start_lsn: _,
|
||||
begin_lsn: _,
|
||||
end_lsn: _,
|
||||
commit_lsn: _,
|
||||
truncate_lsn: _,
|
||||
proposer_uuid: _,
|
||||
},
|
||||
wal_data,
|
||||
}) => wal_data.len(),
|
||||
@@ -424,13 +728,12 @@ impl ProposerAcceptorMessage {
|
||||
Self::NoFlushAppendRequest(AppendRequest {
|
||||
h:
|
||||
AppendRequestHeader {
|
||||
generation: _,
|
||||
term: _,
|
||||
term_start_lsn: _,
|
||||
begin_lsn: _,
|
||||
end_lsn: _,
|
||||
commit_lsn: _,
|
||||
truncate_lsn: _,
|
||||
proposer_uuid: _,
|
||||
},
|
||||
wal_data,
|
||||
}) => wal_data.len(),
|
||||
@@ -451,45 +754,118 @@ pub enum AcceptorProposerMessage {
|
||||
}
|
||||
|
||||
impl AcceptorProposerMessage {
|
||||
/// Serialize acceptor -> proposer message.
|
||||
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
|
||||
match self {
|
||||
AcceptorProposerMessage::Greeting(msg) => {
|
||||
buf.put_u64_le('g' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.node_id.0);
|
||||
}
|
||||
AcceptorProposerMessage::VoteResponse(msg) => {
|
||||
buf.put_u64_le('v' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.vote_given);
|
||||
buf.put_u64_le(msg.flush_lsn.into());
|
||||
buf.put_u64_le(msg.truncate_lsn.into());
|
||||
buf.put_u32_le(msg.term_history.0.len() as u32);
|
||||
for e in &msg.term_history.0 {
|
||||
buf.put_u64_le(e.term);
|
||||
buf.put_u64_le(e.lsn.into());
|
||||
}
|
||||
buf.put_u64_le(msg.timeline_start_lsn.into());
|
||||
}
|
||||
AcceptorProposerMessage::AppendResponse(msg) => {
|
||||
buf.put_u64_le('a' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.flush_lsn.into());
|
||||
buf.put_u64_le(msg.commit_lsn.into());
|
||||
buf.put_i64_le(msg.hs_feedback.ts);
|
||||
buf.put_u64_le(msg.hs_feedback.xmin);
|
||||
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
|
||||
fn put_cstr(buf: &mut BytesMut, s: &str) {
|
||||
buf.put_slice(s.as_bytes());
|
||||
buf.put_u8(0); // null terminator
|
||||
}
|
||||
|
||||
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
|
||||
// if it is not present.
|
||||
if let Some(ref msg) = msg.pageserver_feedback {
|
||||
msg.serialize(buf);
|
||||
}
|
||||
}
|
||||
/// Serialize membership::Configuration into buf.
|
||||
fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
|
||||
buf.put_u32(mconf.generation);
|
||||
buf.put_u32(mconf.members.m.len() as u32);
|
||||
for sk in &mconf.members.m {
|
||||
buf.put_u64(sk.id.0);
|
||||
Self::put_cstr(buf, &sk.host);
|
||||
buf.put_u16(sk.pg_port);
|
||||
}
|
||||
if let Some(ref new_members) = mconf.new_members {
|
||||
buf.put_u32(new_members.m.len() as u32);
|
||||
for sk in &new_members.m {
|
||||
buf.put_u64(sk.id.0);
|
||||
Self::put_cstr(buf, &sk.host);
|
||||
buf.put_u16(sk.pg_port);
|
||||
}
|
||||
} else {
|
||||
buf.put_u32(0);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
/// Serialize acceptor -> proposer message.
|
||||
pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
|
||||
if proto_version == SK_PROTO_VERSION_3 {
|
||||
match self {
|
||||
AcceptorProposerMessage::Greeting(msg) => {
|
||||
buf.put_u8('g' as u8);
|
||||
buf.put_u64(msg.node_id.0);
|
||||
Self::serialize_mconf(buf, &msg.mconf);
|
||||
buf.put_u64(msg.term)
|
||||
}
|
||||
AcceptorProposerMessage::VoteResponse(msg) => {
|
||||
buf.put_u8('v' as u8);
|
||||
buf.put_u32(msg.generation);
|
||||
buf.put_u64(msg.term);
|
||||
buf.put_u8(msg.vote_given as u8);
|
||||
buf.put_u64(msg.flush_lsn.into());
|
||||
buf.put_u64(msg.truncate_lsn.into());
|
||||
buf.put_u32(msg.term_history.0.len() as u32);
|
||||
for e in &msg.term_history.0 {
|
||||
buf.put_u64(e.term);
|
||||
buf.put_u64(e.lsn.into());
|
||||
}
|
||||
}
|
||||
AcceptorProposerMessage::AppendResponse(msg) => {
|
||||
buf.put_u8('a' as u8);
|
||||
buf.put_u32(msg.generation);
|
||||
buf.put_u64(msg.term);
|
||||
buf.put_u64(msg.flush_lsn.into());
|
||||
buf.put_u64(msg.commit_lsn.into());
|
||||
buf.put_i64(msg.hs_feedback.ts);
|
||||
buf.put_u64(msg.hs_feedback.xmin);
|
||||
buf.put_u64(msg.hs_feedback.catalog_xmin);
|
||||
|
||||
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
|
||||
// if it is not present.
|
||||
if let Some(ref msg) = msg.pageserver_feedback {
|
||||
msg.serialize(buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
// TODO remove 3 after converting all msgs
|
||||
} else if proto_version == SK_PROTO_VERSION_2 {
|
||||
match self {
|
||||
AcceptorProposerMessage::Greeting(msg) => {
|
||||
buf.put_u64_le('g' as u64);
|
||||
// v2 didn't have mconf and fields were reordered
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.node_id.0);
|
||||
}
|
||||
AcceptorProposerMessage::VoteResponse(msg) => {
|
||||
// v2 didn't have generation, had u64 vote_given and timeline_start_lsn
|
||||
buf.put_u64_le('v' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.vote_given as u64);
|
||||
buf.put_u64_le(msg.flush_lsn.into());
|
||||
buf.put_u64_le(msg.truncate_lsn.into());
|
||||
buf.put_u32_le(msg.term_history.0.len() as u32);
|
||||
for e in &msg.term_history.0 {
|
||||
buf.put_u64_le(e.term);
|
||||
buf.put_u64_le(e.lsn.into());
|
||||
}
|
||||
// removed timeline_start_lsn
|
||||
buf.put_u64_le(0);
|
||||
}
|
||||
AcceptorProposerMessage::AppendResponse(msg) => {
|
||||
// v2 didn't have generation
|
||||
buf.put_u64_le('a' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.flush_lsn.into());
|
||||
buf.put_u64_le(msg.commit_lsn.into());
|
||||
buf.put_i64_le(msg.hs_feedback.ts);
|
||||
buf.put_u64_le(msg.hs_feedback.xmin);
|
||||
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
|
||||
|
||||
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
|
||||
// if it is not present.
|
||||
if let Some(ref msg) = msg.pageserver_feedback {
|
||||
msg.serialize(buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
bail!("unsupported protocol version {}", proto_version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -586,14 +962,6 @@ where
|
||||
&mut self,
|
||||
msg: &ProposerGreeting,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
// Check protocol compatibility
|
||||
if msg.protocol_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
"incompatible protocol version {}, expected {}",
|
||||
msg.protocol_version,
|
||||
SK_PROTOCOL_VERSION
|
||||
);
|
||||
}
|
||||
/* Postgres major version mismatch is treated as fatal error
|
||||
* because safekeepers parse WAL headers and the format
|
||||
* may change between versions.
|
||||
@@ -648,15 +1016,16 @@ where
|
||||
self.state.finish_change(&state).await?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"processed greeting from walproposer {}, sending term {:?}",
|
||||
msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
|
||||
self.state.acceptor_state.term
|
||||
);
|
||||
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
|
||||
term: self.state.acceptor_state.term,
|
||||
let apg = AcceptorGreeting {
|
||||
node_id: self.node_id,
|
||||
})))
|
||||
mconf: self.state.mconf.clone(),
|
||||
term: self.state.acceptor_state.term,
|
||||
};
|
||||
info!(
|
||||
"processed greeting {:?} from walproposer, sending {:?}",
|
||||
msg, apg
|
||||
);
|
||||
Ok(Some(AcceptorProposerMessage::Greeting(apg)))
|
||||
}
|
||||
|
||||
/// Give vote for the given term, if we haven't done that previously.
|
||||
@@ -677,12 +1046,12 @@ where
|
||||
self.wal_store.flush_wal().await?;
|
||||
// initialize with refusal
|
||||
let mut resp = VoteResponse {
|
||||
generation: self.state.mconf.generation,
|
||||
term: self.state.acceptor_state.term,
|
||||
vote_given: false as u64,
|
||||
vote_given: false,
|
||||
flush_lsn: self.flush_lsn(),
|
||||
truncate_lsn: self.state.inmem.peer_horizon_lsn,
|
||||
term_history: self.get_term_history(),
|
||||
timeline_start_lsn: self.state.timeline_start_lsn,
|
||||
};
|
||||
if self.state.acceptor_state.term < msg.term {
|
||||
let mut state = self.state.start_change();
|
||||
@@ -691,15 +1060,16 @@ where
|
||||
self.state.finish_change(&state).await?;
|
||||
|
||||
resp.term = self.state.acceptor_state.term;
|
||||
resp.vote_given = true as u64;
|
||||
resp.vote_given = true;
|
||||
}
|
||||
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
|
||||
info!("processed {:?}: sending {:?}", msg, &resp);
|
||||
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
|
||||
}
|
||||
|
||||
/// Form AppendResponse from current state.
|
||||
fn append_response(&self) -> AppendResponse {
|
||||
let ar = AppendResponse {
|
||||
generation: self.state.mconf.generation,
|
||||
term: self.state.acceptor_state.term,
|
||||
flush_lsn: self.flush_lsn(),
|
||||
commit_lsn: self.state.commit_lsn,
|
||||
@@ -798,18 +1168,22 @@ where
|
||||
// Here we learn initial LSN for the first time, set fields
|
||||
// interested in that.
|
||||
|
||||
if state.timeline_start_lsn == Lsn(0) {
|
||||
// Remember point where WAL begins globally.
|
||||
state.timeline_start_lsn = msg.timeline_start_lsn;
|
||||
info!(
|
||||
"setting timeline_start_lsn to {:?}",
|
||||
state.timeline_start_lsn
|
||||
);
|
||||
if let Some(start_lsn) = msg.term_history.0.first() {
|
||||
if state.timeline_start_lsn == Lsn(0) {
|
||||
// Remember point where WAL begins globally. In the future it
|
||||
// will be intialized immediately on timeline creation.
|
||||
state.timeline_start_lsn = start_lsn.lsn;
|
||||
info!(
|
||||
"setting timeline_start_lsn to {:?}",
|
||||
state.timeline_start_lsn
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if state.peer_horizon_lsn == Lsn(0) {
|
||||
// Update peer_horizon_lsn as soon as we know where timeline starts.
|
||||
// It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
|
||||
state.peer_horizon_lsn = msg.timeline_start_lsn;
|
||||
state.peer_horizon_lsn = state.timeline_start_lsn;
|
||||
}
|
||||
if state.local_start_lsn == Lsn(0) {
|
||||
state.local_start_lsn = msg.start_streaming_at;
|
||||
@@ -889,7 +1263,10 @@ where
|
||||
|
||||
// If our term is higher, immediately refuse the message.
|
||||
if self.state.acceptor_state.term > msg.h.term {
|
||||
let resp = AppendResponse::term_only(self.state.acceptor_state.term);
|
||||
let resp = AppendResponse::term_only(
|
||||
self.state.mconf.generation,
|
||||
self.state.acceptor_state.term,
|
||||
);
|
||||
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
|
||||
}
|
||||
|
||||
@@ -917,10 +1294,8 @@ where
|
||||
);
|
||||
}
|
||||
|
||||
// Now we know that we are in the same term as the proposer,
|
||||
// processing the message.
|
||||
|
||||
self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
|
||||
// Now we know that we are in the same term as the proposer, process the
|
||||
// message.
|
||||
|
||||
// do the job
|
||||
if !msg.wal_data.is_empty() {
|
||||
|
||||
@@ -15,7 +15,9 @@ use desim::{
|
||||
};
|
||||
use http::Uri;
|
||||
use safekeeper::{
|
||||
safekeeper::{ProposerAcceptorMessage, SafeKeeper, UNKNOWN_SERVER_VERSION},
|
||||
safekeeper::{
|
||||
ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION,
|
||||
},
|
||||
state::{TimelinePersistentState, TimelineState},
|
||||
timeline::TimelineError,
|
||||
wal_storage::Storage,
|
||||
@@ -283,7 +285,7 @@ impl ConnState {
|
||||
bail!("finished processing START_REPLICATION")
|
||||
}
|
||||
|
||||
let msg = ProposerAcceptorMessage::parse(copy_data)?;
|
||||
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?;
|
||||
debug!("got msg: {:?}", msg);
|
||||
self.process(msg, global)
|
||||
} else {
|
||||
|
||||
@@ -6,9 +6,14 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
|
||||
|
||||
def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
|
||||
def check_tenant(
|
||||
env: NeonEnv, pageserver_http: PageserverHttpClient, safekeeper_proto_version: int
|
||||
):
|
||||
tenant_id, timeline_id = env.create_tenant()
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
config_lines = [
|
||||
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
|
||||
]
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=config_lines)
|
||||
# we rely upon autocommit after each statement
|
||||
res_1 = endpoint.safe_psql_many(
|
||||
queries=[
|
||||
@@ -33,7 +38,14 @@ def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
|
||||
|
||||
|
||||
@pytest.mark.parametrize("num_timelines,num_safekeepers", [(3, 1)])
|
||||
def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int):
|
||||
# Test both proto versions until we fully migrate.
|
||||
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
|
||||
def test_normal_work(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
num_timelines: int,
|
||||
num_safekeepers: int,
|
||||
safekeeper_proto_version: int,
|
||||
):
|
||||
"""
|
||||
Basic test:
|
||||
* create new tenant with a timeline
|
||||
@@ -52,4 +64,4 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
for _ in range(num_timelines):
|
||||
check_tenant(env, pageserver_http)
|
||||
check_tenant(env, pageserver_http, safekeeper_proto_version)
|
||||
|
||||
Reference in New Issue
Block a user