compute <-> sk protocol v3 (#10647)

## Problem

As part of https://github.com/neondatabase/neon/issues/8614 we need to
pass membership configurations between compute and safekeepers.

## Summary of changes

Add version 3 of the protocol carrying membership configurations.
Greeting message in both sides gets full conf, and other messages
generation number only. Use protocol bump to include other accumulated
changes:
- stop packing whole structs on the wire as is;
- make the tag u8 instead of u64;
- send all ints in network order;
- drop proposer_uuid, we can pass it in START_WAL_PUSH and it wasn't
much useful anyway.
Per message changes, apart from mconf:
- ProposerGreeting: tenant / timeline id is sent now as hex cstring.
Remove proto version, it is passed outside in START_WAL_PUSH. Remove
postgres timeline, it is unused. Reorder fields a bit.
- AcceptorGreeting: reorder fields
- VoteResponse: timeline_start_lsn is removed. It can be taken from
first member of term history, and later we won't need it at all when all
timelines will be explicitly created. Vote itself is u8 instead of u64.
- ProposerElected: timeline_start_lsn is removed for the same reasons.
- AppendRequest: epoch_start_lsn removed, it is known from term history
in ProposerElected.

Both compute and sk are able to talk v2 and v3 to make rollbacks (in
case we need them) easier; neon.safekeeper_proto_version GUC sets the
client version. v2 code can be dropped later.

So far empty conf is passed everywhere, future PRs will handle them.

To test, add param to some tests choosing proto version; we want to test
both 2 and 3 until we fully migrate.

ref https://github.com/neondatabase/neon/issues/10326

---------

Co-authored-by: Arthur Petukhovsky <petuhovskiy@yandex.ru>
This commit is contained in:
Arseny Sher
2025-02-25 14:56:05 +03:00
committed by GitHub
parent 0d9a45a475
commit 758f597280
17 changed files with 1355 additions and 452 deletions

View File

@@ -68,14 +68,12 @@ impl Display for SafekeeperId {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(transparent)]
pub struct MemberSet {
pub members: Vec<SafekeeperId>,
pub m: Vec<SafekeeperId>,
}
impl MemberSet {
pub fn empty() -> Self {
MemberSet {
members: Vec::new(),
}
MemberSet { m: Vec::new() }
}
pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
@@ -83,11 +81,11 @@ impl MemberSet {
if hs.len() != members.len() {
bail!("duplicate safekeeper id in the set {:?}", members);
}
Ok(MemberSet { members })
Ok(MemberSet { m: members })
}
pub fn contains(&self, sk: &SafekeeperId) -> bool {
self.members.iter().any(|m| m.id == sk.id)
self.m.iter().any(|m| m.id == sk.id)
}
pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
@@ -97,7 +95,7 @@ impl MemberSet {
sk.id, self
));
}
self.members.push(sk);
self.m.push(sk);
Ok(())
}
}
@@ -105,11 +103,7 @@ impl MemberSet {
impl Display for MemberSet {
/// Display as a comma separated list of members.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let sks_str = self
.members
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>();
let sks_str = self.m.iter().map(|sk| sk.to_string()).collect::<Vec<_>>();
write!(f, "({})", sks_str.join(", "))
}
}

View File

@@ -215,6 +215,7 @@ impl Wrapper {
syncSafekeepers: config.sync_safekeepers,
systemId: 0,
pgTimeline: 1,
proto_version: 3,
callback_data,
};
let c_config = Box::into_raw(Box::new(c_config));
@@ -276,6 +277,7 @@ mod tests {
use core::panic;
use std::{
cell::Cell,
ffi::CString,
sync::{atomic::AtomicUsize, mpsc::sync_channel},
};
@@ -496,57 +498,64 @@ mod tests {
// Messages definitions are at walproposer.h
// xxx: it would be better to extract them from safekeeper crate and
// use serialization/deserialization here.
let greeting_tag = (b'g' as u64).to_ne_bytes();
let proto_version = 2_u32.to_ne_bytes();
let pg_version: [u8; 4] = PG_VERSION_NUM.to_ne_bytes();
let proposer_id = [0; 16];
let system_id = 0_u64.to_ne_bytes();
let tenant_id = ttid.tenant_id.as_arr();
let timeline_id = ttid.timeline_id.as_arr();
let pg_tli = 1_u32.to_ne_bytes();
let wal_seg_size = 16777216_u32.to_ne_bytes();
let greeting_tag = (b'g').to_be_bytes();
let tenant_id = CString::new(ttid.tenant_id.to_string())
.unwrap()
.into_bytes_with_nul();
let timeline_id = CString::new(ttid.timeline_id.to_string())
.unwrap()
.into_bytes_with_nul();
let mconf_gen = 0_u32.to_be_bytes();
let mconf_members_len = 0_u32.to_be_bytes();
let mconf_members_new_len = 0_u32.to_be_bytes();
let pg_version: [u8; 4] = PG_VERSION_NUM.to_be_bytes();
let system_id = 0_u64.to_be_bytes();
let wal_seg_size = 16777216_u32.to_be_bytes();
let proposer_greeting = [
greeting_tag.as_slice(),
proto_version.as_slice(),
pg_version.as_slice(),
proposer_id.as_slice(),
system_id.as_slice(),
tenant_id.as_slice(),
timeline_id.as_slice(),
pg_tli.as_slice(),
mconf_gen.as_slice(),
mconf_members_len.as_slice(),
mconf_members_new_len.as_slice(),
pg_version.as_slice(),
system_id.as_slice(),
wal_seg_size.as_slice(),
]
.concat();
let voting_tag = (b'v' as u64).to_ne_bytes();
let vote_request_term = 3_u64.to_ne_bytes();
let proposer_id = [0; 16];
let voting_tag = (b'v').to_be_bytes();
let vote_request_term = 3_u64.to_be_bytes();
let vote_request = [
voting_tag.as_slice(),
mconf_gen.as_slice(),
vote_request_term.as_slice(),
proposer_id.as_slice(),
]
.concat();
let acceptor_greeting_term = 2_u64.to_ne_bytes();
let acceptor_greeting_node_id = 1_u64.to_ne_bytes();
let acceptor_greeting_term = 2_u64.to_be_bytes();
let acceptor_greeting_node_id = 1_u64.to_be_bytes();
let acceptor_greeting = [
greeting_tag.as_slice(),
acceptor_greeting_term.as_slice(),
acceptor_greeting_node_id.as_slice(),
mconf_gen.as_slice(),
mconf_members_len.as_slice(),
mconf_members_new_len.as_slice(),
acceptor_greeting_term.as_slice(),
]
.concat();
let vote_response_term = 3_u64.to_ne_bytes();
let vote_given = 1_u64.to_ne_bytes();
let flush_lsn = 0x539_u64.to_ne_bytes();
let truncate_lsn = 0x539_u64.to_ne_bytes();
let th_len = 1_u32.to_ne_bytes();
let th_term = 2_u64.to_ne_bytes();
let th_lsn = 0x539_u64.to_ne_bytes();
let timeline_start_lsn = 0x539_u64.to_ne_bytes();
let vote_response_term = 3_u64.to_be_bytes();
let vote_given = 1_u8.to_be_bytes();
let flush_lsn = 0x539_u64.to_be_bytes();
let truncate_lsn = 0x539_u64.to_be_bytes();
let th_len = 1_u32.to_be_bytes();
let th_term = 2_u64.to_be_bytes();
let th_lsn = 0x539_u64.to_be_bytes();
let vote_response = [
voting_tag.as_slice(),
mconf_gen.as_slice(),
vote_response_term.as_slice(),
vote_given.as_slice(),
flush_lsn.as_slice(),
@@ -554,7 +563,6 @@ mod tests {
th_len.as_slice(),
th_term.as_slice(),
th_lsn.as_slice(),
timeline_start_lsn.as_slice(),
]
.concat();

View File

@@ -51,6 +51,26 @@ HexDecodeString(uint8 *result, char *input, int nbytes)
return true;
}
/* --------------------------------
* pq_getmsgint16 - get a binary 2-byte int from a message buffer
* --------------------------------
*/
uint16
pq_getmsgint16(StringInfo msg)
{
return pq_getmsgint(msg, 2);
}
/* --------------------------------
* pq_getmsgint32 - get a binary 4-byte int from a message buffer
* --------------------------------
*/
uint32
pq_getmsgint32(StringInfo msg)
{
return pq_getmsgint(msg, 4);
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------

View File

@@ -8,6 +8,8 @@
#endif
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint16 pq_getmsgint16(StringInfo msg);
uint32 pq_getmsgint32(StringInfo msg);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);

View File

@@ -70,6 +70,7 @@ static bool SendAppendRequests(Safekeeper *sk);
static bool RecvAppendResponses(Safekeeper *sk);
static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp);
static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp);
static void PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version);
static void HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk);
static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size);
static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg);
@@ -81,6 +82,8 @@ static char *FormatSafekeeperState(Safekeeper *sk);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static char *FormatEvents(WalProposer *wp, uint32 events);
static void UpdateDonorShmem(WalProposer *wp);
static char *MembershipConfigurationToString(MembershipConfiguration *mconf);
static void MembershipConfigurationFree(MembershipConfiguration *mconf);
WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -137,25 +140,21 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
wp->quorum = wp->n_safekeepers / 2 + 1;
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
/* Fill the greeting package */
wp->greetRequest.tag = 'g';
wp->greetRequest.protocolVersion = SK_PROTOCOL_VERSION;
wp->greetRequest.pgVersion = PG_VERSION_NUM;
wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId));
wp->greetRequest.systemId = wp->config->systemId;
if (!wp->config->neon_timeline)
wp_log(FATAL, "neon.timeline_id is not provided");
if (*wp->config->neon_timeline != '\0' &&
!HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline);
wp->greetRequest.pam.tag = 'g';
if (!wp->config->neon_tenant)
wp_log(FATAL, "neon.tenant_id is not provided");
if (*wp->config->neon_tenant != '\0' &&
!HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16))
wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant);
wp->greetRequest.timeline = wp->config->pgTimeline;
wp->greetRequest.walSegSize = wp->config->wal_segment_size;
wp->greetRequest.tenant_id = wp->config->neon_tenant;
if (!wp->config->neon_timeline)
wp_log(FATAL, "neon.timeline_id is not provided");
wp->greetRequest.timeline_id = wp->config->neon_timeline;
wp->greetRequest.pg_version = PG_VERSION_NUM;
wp->greetRequest.system_id = wp->config->systemId;
wp->greetRequest.wal_seg_size = wp->config->wal_segment_size;
wp->api.init_event_set(wp);
@@ -165,12 +164,14 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
void
WalProposerFree(WalProposer *wp)
{
MembershipConfigurationFree(&wp->mconf);
for (int i = 0; i < wp->n_safekeepers; i++)
{
Safekeeper *sk = &wp->safekeeper[i];
Assert(sk->outbuf.data != NULL);
pfree(sk->outbuf.data);
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
@@ -308,6 +309,7 @@ ShutdownConnection(Safekeeper *sk)
sk->state = SS_OFFLINE;
sk->streamingAt = InvalidXLogRecPtr;
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
@@ -598,11 +600,14 @@ static void
SendStartWALPush(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
#define CMD_LEN 512
char cmd[CMD_LEN];
if (!wp->api.conn_send_query(sk, "START_WAL_PUSH"))
snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version);
if (!wp->api.conn_send_query(sk, cmd))
{
wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to send '%s' query to safekeeper %s:%s: %s",
cmd, sk->host, sk->port, wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return;
}
@@ -658,23 +663,33 @@ RecvStartWALPushResult(Safekeeper *sk)
/*
* Start handshake: first of all send information about the
* safekeeper. After sending, we wait on SS_HANDSHAKE_RECV for
* walproposer. After sending, we wait on SS_HANDSHAKE_RECV for
* a response to finish the handshake.
*/
static void
SendProposerGreeting(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
char *mconf_toml = MembershipConfigurationToString(&wp->greetRequest.mconf);
wp_log(LOG, "sending ProposerGreeting to safekeeper %s:%s with mconf = %s", sk->host, sk->port, mconf_toml);
pfree(mconf_toml);
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->greetRequest,
&sk->outbuf, wp->config->proto_version);
/*
* On failure, logging & resetting the connection is handled. We just need
* to handle the control flow.
*/
BlockingWrite(sk, &sk->wp->greetRequest, sizeof(sk->wp->greetRequest), SS_HANDSHAKE_RECV);
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
}
static void
RecvAcceptorGreeting(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
char *mconf_toml;
/*
* If our reading doesn't immediately succeed, any necessary error
@@ -685,7 +700,10 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse))
return;
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, term=" INT64_FORMAT, sk->host, sk->port, sk->greetResponse.term);
mconf_toml = MembershipConfigurationToString(&sk->greetResponse.mconf);
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, node_id = %lu, mconf = %s, term=" UINT64_FORMAT,
sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term);
pfree(mconf_toml);
/* Protocol is all good, move to voting. */
sk->state = SS_VOTING;
@@ -707,12 +725,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
wp->propTerm++;
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp->voteRequest = (VoteRequest)
{
.tag = 'v',
.term = wp->propTerm
};
memcpy(wp->voteRequest.proposerId.data, wp->greetRequest.proposerId.data, UUID_LEN);
wp->voteRequest.pam.tag = 'v';
wp->voteRequest.generation = wp->mconf.generation;
wp->voteRequest.term = wp->propTerm;
}
}
else if (sk->greetResponse.term > wp->propTerm)
@@ -759,12 +774,14 @@ SendVoteRequest(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
/* We have quorum for voting, send our vote request */
wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT))
return;
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->voteRequest,
&sk->outbuf, wp->config->proto_version);
/* We have quorum for voting, send our vote request */
wp_log(LOG, "requesting vote from %s:%s for generation %u term " UINT64_FORMAT, sk->host, sk->port,
wp->voteRequest.generation, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_WAIT_VERDICT);
/* If successful, wait for read-ready with SS_WAIT_VERDICT */
}
@@ -778,11 +795,12 @@ RecvVoteResponse(Safekeeper *sk)
return;
wp_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
"got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term,
sk->voteResponse.voteGiven,
GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn));
/*
* In case of acceptor rejecting our vote, bail out, but only if either it
@@ -847,9 +865,9 @@ HandleElectedProposer(WalProposer *wp)
* otherwise we must be sync-safekeepers and we have nothing to do then.
*
* Proceeding is not only pointless but harmful, because we'd give
* safekeepers term history starting with 0/0. These hacks will go away once
* we disable implicit timeline creation on safekeepers and create it with
* non zero LSN from the start.
* safekeepers term history starting with 0/0. These hacks will go away
* once we disable implicit timeline creation on safekeepers and create it
* with non zero LSN from the start.
*/
if (wp->propEpochStartLsn == InvalidXLogRecPtr)
{
@@ -942,7 +960,6 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->propEpochStartLsn = InvalidXLogRecPtr;
wp->donorEpoch = 0;
wp->truncateLsn = InvalidXLogRecPtr;
wp->timelineStartLsn = InvalidXLogRecPtr;
for (int i = 0; i < wp->n_safekeepers; i++)
{
@@ -959,20 +976,6 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->donor = i;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr)
{
/* timelineStartLsn should be the same everywhere or unknown */
if (wp->timelineStartLsn != InvalidXLogRecPtr &&
wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
{
wp_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(wp->timelineStartLsn),
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
}
wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
}
}
}
@@ -995,22 +998,11 @@ DetermineEpochStartLsn(WalProposer *wp)
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
{
wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp);
if (wp->timelineStartLsn == InvalidXLogRecPtr)
{
wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp);
}
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
}
pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
/*
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so
* it should never be zero at this point, if we know timelineStartLsn.
*
* timelineStartLsn can be zero only on the first syncSafekeepers run.
*/
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
(wp->config->syncSafekeepers && wp->truncateLsn == wp->timelineStartLsn));
Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers);
/*
* We will be generating WAL since propEpochStartLsn, so we should set
@@ -1053,10 +1045,11 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
* However, allow to proceed if last_log_term on the node which gave
* the highest vote (i.e. point where we are going to start writing)
* actually had been won by me; plain restart of walproposer not
* intervened by concurrent compute which wrote WAL is ok.
* However, allow to proceed if last_log_term on the node which
* gave the highest vote (i.e. point where we are going to start
* writing) actually had been won by me; plain restart of
* walproposer not intervened by concurrent compute which wrote
* WAL is ok.
*
* This avoids compute crash after manual term_bump.
*/
@@ -1126,14 +1119,8 @@ SendProposerElected(Safekeeper *sk)
{
/* safekeeper is empty or no common point, start from the beginning */
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
/*
* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
* is created manually (test_s3_wal_replay)
*/
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), wp->propTermHistory.n_entries);
}
else
{
@@ -1158,29 +1145,19 @@ SendProposerElected(Safekeeper *sk)
Assert(sk->startStreamingAt <= wp->availableLsn);
msg.tag = 'e';
msg.apm.tag = 'e';
msg.generation = wp->mconf.generation;
msg.term = wp->propTerm;
msg.startStreamingAt = sk->startStreamingAt;
msg.termHistory = &wp->propTermHistory;
msg.timelineStartLsn = wp->timelineStartLsn;
lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0;
wp_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
resetStringInfo(&sk->outbuf);
pq_sendint64_le(&sk->outbuf, msg.tag);
pq_sendint64_le(&sk->outbuf, msg.term);
pq_sendint64_le(&sk->outbuf, msg.startStreamingAt);
pq_sendint32_le(&sk->outbuf, msg.termHistory->n_entries);
for (int i = 0; i < msg.termHistory->n_entries; i++)
{
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].term);
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].lsn);
}
pq_sendint64_le(&sk->outbuf, msg.timelineStartLsn);
"sending elected msg to node " UINT64_FORMAT " generation=%u term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s",
sk->greetResponse.nodeId, msg.generation, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt),
lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port);
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version);
if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH))
return;
@@ -1246,14 +1223,13 @@ static void
PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn)
{
Assert(endLsn >= beginLsn);
req->tag = 'a';
req->apm.tag = 'a';
req->generation = wp->mconf.generation;
req->term = wp->propTerm;
req->epochStartLsn = wp->propEpochStartLsn;
req->beginLsn = beginLsn;
req->endLsn = endLsn;
req->commitLsn = wp->commitLsn;
req->truncateLsn = wp->truncateLsn;
req->proposerId = wp->greetRequest.proposerId;
}
/*
@@ -1354,7 +1330,8 @@ SendAppendRequests(Safekeeper *sk)
resetStringInfo(&sk->outbuf);
/* write AppendRequest header */
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
PAMessageSerialize(wp, (ProposerAcceptorMessage *) req, &sk->outbuf, wp->config->proto_version);
/* prepare for reading WAL into the outbuf */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
sk->active_state = SS_ACTIVE_READ_WAL;
}
@@ -1367,14 +1344,17 @@ SendAppendRequests(Safekeeper *sk)
req = &sk->appendRequest;
req_len = req->endLsn - req->beginLsn;
/* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */
/*
* We send zero sized AppenRequests as heartbeats; don't wal_read
* for these.
*/
if (req_len > 0)
{
switch (wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req_len,
&errmsg))
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req_len,
&errmsg))
{
case NEON_WALREAD_SUCCESS:
break;
@@ -1382,7 +1362,7 @@ SendAppendRequests(Safekeeper *sk)
return true;
case NEON_WALREAD_ERROR:
wp_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port, errmsg);
sk->host, sk->port, errmsg);
ShutdownConnection(sk);
return false;
default:
@@ -1470,11 +1450,11 @@ RecvAppendResponses(Safekeeper *sk)
* Term has changed to higher one, probably another compute is
* running. If this is the case we could PANIC as well because
* likely it inserted some data and our basebackup is unsuitable
* anymore. However, we also bump term manually (term_bump endpoint)
* on safekeepers for migration purposes, in this case we do want
* compute to stay alive. So restart walproposer with FATAL instead
* of panicking; if basebackup is spoiled next election will notice
* this.
* anymore. However, we also bump term manually (term_bump
* endpoint) on safekeepers for migration purposes, in this case
* we do want compute to stay alive. So restart walproposer with
* FATAL instead of panicking; if basebackup is spoiled next
* election will notice this.
*/
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
@@ -1509,7 +1489,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
for (i = 0; i < nkeys; i++)
{
const char *key = pq_getmsgstring(reply_message);
const char *key = pq_getmsgrawstring(reply_message);
unsigned int value_len = pq_getmsgint(reply_message, sizeof(int32));
if (strcmp(key, "current_timeline_size") == 0)
@@ -1750,6 +1730,208 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
}
}
/* Serialize MembershipConfiguration into buf. */
static void
MembershipConfigurationSerialize(MembershipConfiguration *mconf, StringInfo buf)
{
uint32 i;
pq_sendint32(buf, mconf->generation);
pq_sendint32(buf, mconf->members.len);
for (i = 0; i < mconf->members.len; i++)
{
pq_sendint64(buf, mconf->members.m[i].node_id);
pq_send_ascii_string(buf, mconf->members.m[i].host);
pq_sendint16(buf, mconf->members.m[i].port);
}
/*
* There is no special mark for absent new_members; zero members in
* invalid, so zero len means absent.
*/
pq_sendint32(buf, mconf->new_members.len);
for (i = 0; i < mconf->new_members.len; i++)
{
pq_sendint64(buf, mconf->new_members.m[i].node_id);
pq_send_ascii_string(buf, mconf->new_members.m[i].host);
pq_sendint16(buf, mconf->new_members.m[i].port);
}
}
/* Serialize proposer -> acceptor message into buf using specified version */
static void
PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version)
{
/* both version are supported currently until we fully migrate to 3 */
Assert(proto_version == 3 || proto_version == 2);
resetStringInfo(buf);
if (proto_version == 3)
{
/*
* v2 sends structs for some messages as is, so commonly send tag only
* for v3
*/
pq_sendint8(buf, msg->tag);
switch (msg->tag)
{
case 'g':
{
ProposerGreeting *m = (ProposerGreeting *) msg;
pq_send_ascii_string(buf, m->tenant_id);
pq_send_ascii_string(buf, m->timeline_id);
MembershipConfigurationSerialize(&m->mconf, buf);
pq_sendint32(buf, m->pg_version);
pq_sendint64(buf, m->system_id);
pq_sendint32(buf, m->wal_seg_size);
break;
}
case 'v':
{
VoteRequest *m = (VoteRequest *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
break;
}
case 'e':
{
ProposerElected *m = (ProposerElected *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
pq_sendint64(buf, m->startStreamingAt);
pq_sendint32(buf, m->termHistory->n_entries);
for (uint32 i = 0; i < m->termHistory->n_entries; i++)
{
pq_sendint64(buf, m->termHistory->entries[i].term);
pq_sendint64(buf, m->termHistory->entries[i].lsn);
}
break;
}
case 'a':
{
/*
* Note: this serializes only AppendRequestHeader, caller
* is expected to append WAL data later.
*/
AppendRequestHeader *m = (AppendRequestHeader *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
pq_sendint64(buf, m->beginLsn);
pq_sendint64(buf, m->endLsn);
pq_sendint64(buf, m->commitLsn);
pq_sendint64(buf, m->truncateLsn);
break;
}
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
return;
}
if (proto_version == 2)
{
switch (msg->tag)
{
case 'g':
{
/* v2 sent struct as is */
ProposerGreeting *m = (ProposerGreeting *) msg;
ProposerGreetingV2 greetRequestV2;
/* Fill also v2 struct. */
greetRequestV2.tag = 'g';
greetRequestV2.protocolVersion = proto_version;
greetRequestV2.pgVersion = m->pg_version;
/*
* v3 removed this field because it's easier to pass as
* libq or START_WAL_PUSH options
*/
memset(&greetRequestV2.proposerId, 0, sizeof(greetRequestV2.proposerId));
greetRequestV2.systemId = wp->config->systemId;
if (*m->timeline_id != '\0' &&
!HexDecodeString(greetRequestV2.timeline_id, m->timeline_id, 16))
wp_log(FATAL, "could not parse neon.timeline_id, %s", m->timeline_id);
if (*m->tenant_id != '\0' &&
!HexDecodeString(greetRequestV2.tenant_id, m->tenant_id, 16))
wp_log(FATAL, "could not parse neon.tenant_id, %s", m->tenant_id);
greetRequestV2.timeline = wp->config->pgTimeline;
greetRequestV2.walSegSize = wp->config->wal_segment_size;
pq_sendbytes(buf, (char *) &greetRequestV2, sizeof(greetRequestV2));
break;
}
case 'v':
{
/* v2 sent struct as is */
VoteRequest *m = (VoteRequest *) msg;
VoteRequestV2 voteRequestV2;
voteRequestV2.tag = m->pam.tag;
voteRequestV2.term = m->term;
/* removed field */
memset(&voteRequestV2.proposerId, 0, sizeof(voteRequestV2.proposerId));
pq_sendbytes(buf, (char *) &voteRequestV2, sizeof(voteRequestV2));
break;
}
case 'e':
{
ProposerElected *m = (ProposerElected *) msg;
pq_sendint64_le(buf, m->apm.tag);
pq_sendint64_le(buf, m->term);
pq_sendint64_le(buf, m->startStreamingAt);
pq_sendint32_le(buf, m->termHistory->n_entries);
for (int i = 0; i < m->termHistory->n_entries; i++)
{
pq_sendint64_le(buf, m->termHistory->entries[i].term);
pq_sendint64_le(buf, m->termHistory->entries[i].lsn);
}
pq_sendint64_le(buf, 0); /* removed timeline_start_lsn */
break;
}
case 'a':
/*
* Note: this serializes only AppendRequestHeader, caller is
* expected to append WAL data later.
*/
{
/* v2 sent struct as is */
AppendRequestHeader *m = (AppendRequestHeader *) msg;
AppendRequestHeaderV2 appendRequestHeaderV2;
appendRequestHeaderV2.tag = m->apm.tag;
appendRequestHeaderV2.term = m->term;
appendRequestHeaderV2.epochStartLsn = 0; /* removed field */
appendRequestHeaderV2.beginLsn = m->beginLsn;
appendRequestHeaderV2.endLsn = m->endLsn;
appendRequestHeaderV2.commitLsn = m->commitLsn;
appendRequestHeaderV2.truncateLsn = m->truncateLsn;
/* removed field */
memset(&appendRequestHeaderV2.proposerId, 0, sizeof(appendRequestHeaderV2.proposerId));
pq_sendbytes(buf, (char *) &appendRequestHeaderV2, sizeof(appendRequestHeaderV2));
break;
}
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
return;
}
wp_log(FATAL, "unexpected proto_version %d", proto_version);
}
/*
* Try to read CopyData message from i'th safekeeper, resetting connection on
* failure.
@@ -1779,6 +1961,37 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
return false;
}
/* Deserialize membership configuration from buf to mconf. */
static void
MembershipConfigurationDeserialize(MembershipConfiguration *mconf, StringInfo buf)
{
uint32 i;
mconf->generation = pq_getmsgint32(buf);
mconf->members.len = pq_getmsgint32(buf);
mconf->members.m = palloc0(sizeof(SafekeeperId) * mconf->members.len);
for (i = 0; i < mconf->members.len; i++)
{
const char *buf_host;
mconf->members.m[i].node_id = pq_getmsgint64(buf);
buf_host = pq_getmsgrawstring(buf);
strlcpy(mconf->members.m[i].host, buf_host, sizeof(mconf->members.m[i].host));
mconf->members.m[i].port = pq_getmsgint16(buf);
}
mconf->new_members.len = pq_getmsgint32(buf);
mconf->new_members.m = palloc0(sizeof(SafekeeperId) * mconf->new_members.len);
for (i = 0; i < mconf->new_members.len; i++)
{
const char *buf_host;
mconf->new_members.m[i].node_id = pq_getmsgint64(buf);
buf_host = pq_getmsgrawstring(buf);
strlcpy(mconf->new_members.m[i].host, buf_host, sizeof(mconf->new_members.m[i].host));
mconf->new_members.m[i].port = pq_getmsgint16(buf);
}
}
/*
* Read next message with known type into provided struct, by reading a CopyData
* block from the safekeeper's postgres connection, returning whether the read
@@ -1787,6 +2000,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
* If the read needs more polling, we return 'false' and keep the state
* unmodified, waiting until it becomes read-ready to try again. If it fully
* failed, a warning is emitted and the connection is reset.
*
* Note: it pallocs if needed, i.e. for AcceptorGreeting and VoteResponse fields.
*/
static bool
AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
@@ -1795,82 +2010,154 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
char *buf;
int buf_size;
uint64 tag;
uint8 tag;
StringInfoData s;
if (!(AsyncRead(sk, &buf, &buf_size)))
return false;
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
/* parse it */
s.data = buf;
s.len = buf_size;
s.maxlen = buf_size;
s.cursor = 0;
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
if (wp->config->proto_version == 3)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
switch (tag)
{
case 'g':
{
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->nodeId = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->voteGiven = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->truncateLsn = pq_getmsgint64_le(&s);
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (int i = 0; i < msg->termHistory.n_entries; i++)
tag = pq_getmsgbyte(&s);
if (tag != anymsg->tag)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
switch (tag)
{
case 'g':
{
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->nodeId = pq_getmsgint64(&s);
MembershipConfigurationDeserialize(&msg->mconf, &s);
msg->term = pq_getmsgint64(&s);
pq_getmsgend(&s);
return true;
}
msg->timelineStartLsn = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->generation = pq_getmsgint32(&s);
msg->term = pq_getmsgint64(&s);
msg->voteGiven = pq_getmsgbyte(&s);
msg->flushLsn = pq_getmsgint64(&s);
msg->truncateLsn = pq_getmsgint64(&s);
msg->termHistory.n_entries = pq_getmsgint32(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (uint32 i = 0; i < msg->termHistory.n_entries; i++)
{
msg->termHistory.entries[i].term = pq_getmsgint64(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64(&s);
}
pq_getmsgend(&s);
return true;
}
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->commitLsn = pq_getmsgint64_le(&s);
msg->hs.ts = pq_getmsgint64_le(&s);
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
Assert(false);
return false;
}
msg->generation = pq_getmsgint32(&s);
msg->term = pq_getmsgint64(&s);
msg->flushLsn = pq_getmsgint64(&s);
msg->commitLsn = pq_getmsgint64(&s);
msg->hs.ts = pq_getmsgint64(&s);
msg->hs.xmin.value = pq_getmsgint64(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
return false;
}
}
}
else if (wp->config->proto_version == 2)
{
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
switch (tag)
{
case 'g':
{
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->nodeId = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->voteGiven = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->truncateLsn = pq_getmsgint64_le(&s);
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (int i = 0; i < msg->termHistory.n_entries; i++)
{
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
}
pq_getmsgint64_le(&s); /* timelineStartLsn */
pq_getmsgend(&s);
return true;
}
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->commitLsn = pq_getmsgint64_le(&s);
msg->hs.ts = pq_getmsgint64_le(&s);
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
return false;
}
}
}
wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version);
return false; /* keep the compiler quiet */
}
/*
@@ -2246,3 +2533,45 @@ FormatEvents(WalProposer *wp, uint32 events)
return (char *) &return_str;
}
/* Dump mconf as toml for observability / debugging. Result is palloc'ed. */
static char *
MembershipConfigurationToString(MembershipConfiguration *mconf)
{
StringInfoData s;
uint32 i;
initStringInfo(&s);
appendStringInfo(&s, "{gen = %u", mconf->generation);
appendStringInfoString(&s, ", members = [");
for (i = 0; i < mconf->members.len; i++)
{
if (i > 0)
appendStringInfoString(&s, ", ");
appendStringInfo(&s, "{node_id = %lu", mconf->members.m[i].node_id);
appendStringInfo(&s, ", host = %s", mconf->members.m[i].host);
appendStringInfo(&s, ", port = %u }", mconf->members.m[i].port);
}
appendStringInfo(&s, "], new_members = [");
for (i = 0; i < mconf->new_members.len; i++)
{
if (i > 0)
appendStringInfoString(&s, ", ");
appendStringInfo(&s, "{node_id = %lu", mconf->new_members.m[i].node_id);
appendStringInfo(&s, ", host = %s", mconf->new_members.m[i].host);
appendStringInfo(&s, ", port = %u }", mconf->new_members.m[i].port);
}
appendStringInfoString(&s, "]}");
return s.data;
}
static void
MembershipConfigurationFree(MembershipConfiguration *mconf)
{
if (mconf->members.m)
pfree(mconf->members.m);
mconf->members.m = NULL;
if (mconf->new_members.m)
pfree(mconf->new_members.m);
mconf->new_members.m = NULL;
}

View File

@@ -12,9 +12,6 @@
#include "neon_walreader.h"
#include "pagestore_client.h"
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
* message */
@@ -143,12 +140,71 @@ typedef uint64 term_t;
/* neon storage node id */
typedef uint64 NNodeId;
/*
* Number uniquely identifying safekeeper membership configuration.
* This and following structs pair ones in membership.rs.
*/
typedef uint32 Generation;
typedef struct SafekeeperId
{
NNodeId node_id;
char host[MAXCONNINFO];
uint16 port;
} SafekeeperId;
/* Set of safekeepers. */
typedef struct MemberSet
{
uint32 len; /* number of members */
SafekeeperId *m; /* ids themselves */
} MemberSet;
/* Timeline safekeeper membership configuration. */
typedef struct MembershipConfiguration
{
Generation generation;
MemberSet members;
/* Has 0 n_members in non joint conf. */
MemberSet new_members;
} MembershipConfiguration;
/*
* Proposer <-> Acceptor messaging.
*/
typedef struct ProposerAcceptorMessage
{
uint8 tag;
} ProposerAcceptorMessage;
/* Initial Proposer -> Acceptor message */
typedef struct ProposerGreeting
{
ProposerAcceptorMessage pam; /* message tag */
/*
* tenant/timeline ids as C strings with standard hex notation for ease of
* printing. In principle they are not strictly needed as ttid is also
* passed as libpq options.
*/
char *tenant_id;
char *timeline_id;
/* Full conf is carried to allow safekeeper switch */
MembershipConfiguration mconf;
/*
* pg_version and wal_seg_size are used for timeline creation until we
* fully migrate to doing externally. systemId is only used as a sanity
* cross check.
*/
uint32 pg_version; /* in PG_VERSION_NUM format */
uint64 system_id; /* Postgres system identifier. */
uint32 wal_seg_size;
} ProposerGreeting;
/* protocol v2 variant, kept while wp supports it */
typedef struct ProposerGreetingV2
{
uint64 tag; /* message tag */
uint32 protocolVersion; /* proposer-safekeeper protocol version */
@@ -159,32 +215,42 @@ typedef struct ProposerGreeting
uint8 tenant_id[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
} ProposerGreetingV2;
typedef struct AcceptorProposerMessage
{
uint64 tag;
uint8 tag;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
* Acceptor -> Proposer initial response: the highest term acceptor voted for,
* its node id and configuration.
*/
typedef struct AcceptorGreeting
{
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
MembershipConfiguration mconf;
term_t term;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
*/
typedef struct VoteRequest
{
ProposerAcceptorMessage pam; /* message tag */
Generation generation; /* membership conf generation */
term_t term;
} VoteRequest;
/* protocol v2 variant, kept while wp supports it */
typedef struct VoteRequestV2
{
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
} VoteRequestV2;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
@@ -203,8 +269,15 @@ typedef struct TermHistory
typedef struct VoteResponse
{
AcceptorProposerMessage apm;
/*
* Membership conf generation. It's not strictly required because on
* mismatch safekeeper is expected to ERROR the connection, but let's
* sanity check it.
*/
Generation generation;
term_t term;
uint64 voteGiven;
uint8 voteGiven;
/*
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
@@ -214,7 +287,6 @@ typedef struct VoteResponse
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for*
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
/*
@@ -223,20 +295,37 @@ typedef struct VoteResponse
*/
typedef struct ProposerElected
{
uint64 tag;
AcceptorProposerMessage apm;
Generation generation; /* membership conf generation */
term_t term;
/* proposer will send since this point */
XLogRecPtr startStreamingAt;
/* history of term switches up to this proposer */
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
*/
typedef struct AppendRequestHeader
{
AcceptorProposerMessage apm;
Generation generation; /* membership conf generation */
term_t term; /* term of the proposer */
XLogRecPtr beginLsn; /* start position of message in WAL */
XLogRecPtr endLsn; /* end position of message in WAL */
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
/*
* minimal LSN which may be needed for recovery of some safekeeper (end
* lsn + 1 of last chunk streamed to everyone)
*/
XLogRecPtr truncateLsn;
/* in the AppendRequest message, WAL data follows */
} AppendRequestHeader;
/* protocol v2 variant, kept while wp supports it */
typedef struct AppendRequestHeaderV2
{
uint64 tag;
term_t term; /* term of the proposer */
@@ -256,7 +345,8 @@ typedef struct AppendRequestHeader
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
/* in the AppendRequest message, WAL data follows */
} AppendRequestHeaderV2;
/*
* Hot standby feedback received from replica
@@ -309,6 +399,13 @@ typedef struct AppendResponse
{
AcceptorProposerMessage apm;
/*
* Membership conf generation. It's not strictly required because on
* mismatch safekeeper is expected to ERROR the connection, but let's
* sanity check it.
*/
Generation generation;
/*
* Current term of the safekeeper; if it is higher than proposer's, the
* compute is out of date.
@@ -644,6 +741,8 @@ typedef struct WalProposerConfig
/* Will be passed to safekeepers in greet request. */
TimeLineID pgTimeline;
int proto_version;
#ifdef WALPROPOSER_LIB
void *callback_data;
#endif
@@ -656,11 +755,14 @@ typedef struct WalProposerConfig
typedef struct WalProposer
{
WalProposerConfig *config;
int n_safekeepers;
/* Current walproposer membership configuration */
MembershipConfiguration mconf;
/* (n_safekeepers / 2) + 1 */
int quorum;
/* Number of occupied slots in safekeepers[] */
int n_safekeepers;
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
@@ -670,6 +772,7 @@ typedef struct WalProposer
XLogRecPtr commitLsn;
ProposerGreeting greetRequest;
ProposerGreetingV2 greetRequestV2;
/* Vote request for safekeeper */
VoteRequest voteRequest;

View File

@@ -117,14 +117,13 @@ pq_getmsgbytes(StringInfo msg, int datalen)
}
/* --------------------------------
* pq_getmsgstring - get a null-terminated text string (with conversion)
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
*
* May return a pointer directly into the message buffer, or a pointer
* to a palloc'd conversion result.
* Returns a pointer directly into the message buffer.
* --------------------------------
*/
const char *
pq_getmsgstring(StringInfo msg)
pq_getmsgrawstring(StringInfo msg)
{
char *str;
int slen;
@@ -155,6 +154,45 @@ pq_getmsgend(StringInfo msg)
ExceptionalCondition("invalid msg format", __FILE__, __LINE__);
}
/* --------------------------------
* pq_sendbytes - append raw data to a StringInfo buffer
* --------------------------------
*/
void
pq_sendbytes(StringInfo buf, const void *data, int datalen)
{
/* use variant that maintains a trailing null-byte, out of caution */
appendBinaryStringInfo(buf, data, datalen);
}
/* --------------------------------
* pq_send_ascii_string - append a null-terminated text string (without conversion)
*
* This function intentionally bypasses encoding conversion, instead just
* silently replacing any non-7-bit-ASCII characters with question marks.
* It is used only when we are having trouble sending an error message to
* the client with normal localization and encoding conversion. The caller
* should already have taken measures to ensure the string is just ASCII;
* the extra work here is just to make certain we don't send a badly encoded
* string to the client (which might or might not be robust about that).
*
* NB: passed text string must be null-terminated, and so is the data
* sent to the frontend.
* --------------------------------
*/
void
pq_send_ascii_string(StringInfo buf, const char *str)
{
while (*str)
{
char ch = *str++;
if (IS_HIGHBIT_SET(ch))
ch = '?';
appendStringInfoCharMacro(buf, ch);
}
appendStringInfoChar(buf, '\0');
}
/*
* Produce a C-string representation of a TimestampTz.

View File

@@ -59,9 +59,11 @@
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
/* GUCs */
char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
int safekeeper_proto_version = 2;
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
@@ -126,6 +128,7 @@ init_walprop_config(bool syncSafekeepers)
else
walprop_config.systemId = 0;
walprop_config.pgTimeline = walprop_pg_get_timeline_id();
walprop_config.proto_version = safekeeper_proto_version;
}
/*
@@ -219,25 +222,37 @@ nwp_register_gucs(void)
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable(
"neon.safekeeper_proto_version",
"Version of compute <-> safekeeper protocol.",
"Used while migrating from 2 to 3.",
&safekeeper_proto_version,
2, 0, INT_MAX,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
}
static int
split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
{
int n_safekeepers = 0;
char *curr_sk = safekeepers_list;
int n_safekeepers = 0;
char *curr_sk = safekeepers_list;
for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma)
{
if (++n_safekeepers >= MAX_SAFEKEEPERS) {
if (++n_safekeepers >= MAX_SAFEKEEPERS)
{
wpg_log(FATAL, "too many safekeepers");
}
coma = strchr(coma, ',');
safekeepers[n_safekeepers-1] = curr_sk;
safekeepers[n_safekeepers - 1] = curr_sk;
if (coma != NULL) {
if (coma != NULL)
{
*coma++ = '\0';
}
}
@@ -252,10 +267,10 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
static bool
safekeepers_cmp(char *old, char *new)
{
char *safekeepers_old[MAX_SAFEKEEPERS];
char *safekeepers_new[MAX_SAFEKEEPERS];
int len_old = 0;
int len_new = 0;
char *safekeepers_old[MAX_SAFEKEEPERS];
char *safekeepers_new[MAX_SAFEKEEPERS];
int len_old = 0;
int len_new = 0;
len_old = split_safekeepers_list(old, safekeepers_old);
len_new = split_safekeepers_list(new, safekeepers_new);
@@ -292,7 +307,8 @@ assign_neon_safekeepers(const char *newval, void *extra)
if (!am_walproposer)
return;
if (!newval) {
if (!newval)
{
/* should never happen */
wpg_log(FATAL, "neon.safekeepers is empty");
}
@@ -301,11 +317,11 @@ assign_neon_safekeepers(const char *newval, void *extra)
newval_copy = pstrdup(newval);
oldval = pstrdup(wal_acceptors_list);
/*
/*
* TODO: restarting through FATAL is stupid and introduces 1s delay before
* next bgw start. We should refactor walproposer to allow graceful exit and
* thus remove this delay.
* XXX: If you change anything here, sync with test_safekeepers_reconfigure_reorder.
* next bgw start. We should refactor walproposer to allow graceful exit
* and thus remove this delay. XXX: If you change anything here, sync with
* test_safekeepers_reconfigure_reorder.
*/
if (!safekeepers_cmp(oldval, newval_copy))
{
@@ -454,7 +470,8 @@ backpressure_throttling_impl(void)
memcpy(new_status, old_status, len);
snprintf(new_status + len, 64, "backpressure throttling: lag %lu", lag);
set_ps_display(new_status);
new_status[len] = '\0'; /* truncate off " backpressure ..." to later reset the ps */
new_status[len] = '\0'; /* truncate off " backpressure ..." to later
* reset the ps */
elog(DEBUG2, "backpressure throttling: lag %lu", lag);
start = GetCurrentTimestamp();
@@ -621,7 +638,7 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos)
wpg_log(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
cmd.timeline = wp->greetRequest.timeline;
cmd.timeline = wp->config->pgTimeline;
cmd.startpoint = startpos;
StartProposerReplication(wp, &cmd);
}
@@ -1963,10 +1980,11 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
FullTransactionId xmin = hsFeedback.xmin;
FullTransactionId catalog_xmin = hsFeedback.catalog_xmin;
FullTransactionId next_xid = ReadNextFullTransactionId();
/*
* Page server is updating nextXid in checkpoint each 1024 transactions,
* so feedback xmin can be actually larger then nextXid and
* function TransactionIdInRecentPast return false in this case,
* Page server is updating nextXid in checkpoint each 1024
* transactions, so feedback xmin can be actually larger then nextXid
* and function TransactionIdInRecentPast return false in this case,
* preventing update of slot's xmin.
*/
if (FullTransactionIdPrecedes(next_xid, xmin))

View File

@@ -13,6 +13,7 @@ use safekeeper::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
};
use safekeeper::test_utils::Env;
use safekeeper_api::membership::SafekeeperGeneration as Generation;
use tokio::io::AsyncWriteExt as _;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
@@ -88,13 +89,12 @@ fn bench_process_msg(c: &mut Criterion) {
let (lsn, record) = walgen.next().expect("endless WAL");
ProposerAcceptorMessage::AppendRequest(AppendRequest {
h: AppendRequestHeader {
generation: Generation::new(0),
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
})
@@ -160,13 +160,12 @@ fn bench_wal_acceptor(c: &mut Criterion) {
.take(n)
.map(|(lsn, record)| AppendRequest {
h: AppendRequestHeader {
generation: Generation::new(0),
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
})
@@ -262,13 +261,12 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
runtime.block_on(async {
let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest {
h: AppendRequestHeader {
generation: Generation::new(0),
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
});

View File

@@ -8,7 +8,7 @@
use anyhow::Context;
use postgres_backend::QueryError;
use safekeeper_api::membership::Configuration;
use safekeeper_api::membership::{Configuration, INVALID_GENERATION};
use safekeeper_api::{ServerInfo, Term};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -133,10 +133,10 @@ async fn send_proposer_elected(
let history = TermHistory(history_entries);
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
generation: INVALID_GENERATION,
term,
start_streaming_at: lsn,
term_history: history,
timeline_start_lsn: lsn,
});
tli.process_msg(&proposer_elected_request).await?;
@@ -170,13 +170,12 @@ pub async fn append_logical_message(
let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
h: AppendRequestHeader {
generation: INVALID_GENERATION,
term: msg.term,
term_start_lsn: begin_lsn,
begin_lsn,
end_lsn,
commit_lsn,
truncate_lsn: msg.truncate_lsn,
proposer_uuid: [0u8; 16],
},
wal_data,
});

View File

@@ -281,7 +281,7 @@ impl SafekeeperPostgresHandler {
tokio::select! {
// todo: add read|write .context to these errors
r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
r = network_write(pgb, reply_rx, pageserver_feedback_rx, proto_version) => r,
_ = timeline_cancel.cancelled() => {
return Err(CopyStreamHandlerEnd::Cancelled);
}
@@ -342,8 +342,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
let tli = match next_msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with walproposer {} sysid {} timeline {}",
self.peer_addr, greeting.system_id, greeting.tli,
"start handshake with walproposer {} sysid {}",
self.peer_addr, greeting.system_id,
);
let server_info = ServerInfo {
pg_version: greeting.pg_version,
@@ -459,6 +459,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_writer: &mut PostgresBackend<IO>,
mut reply_rx: Receiver<AcceptorProposerMessage>,
mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
proto_version: u32,
) -> Result<(), CopyStreamHandlerEnd> {
let mut buf = BytesMut::with_capacity(128);
@@ -496,7 +497,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
};
buf.clear();
msg.serialize(&mut buf)?;
msg.serialize(&mut buf, proto_version)?;
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
}
}

View File

@@ -7,6 +7,7 @@ use std::{fmt, pin::pin};
use anyhow::{bail, Context};
use futures::StreamExt;
use postgres_protocol::message::backend::ReplicationMessage;
use safekeeper_api::membership::INVALID_GENERATION;
use safekeeper_api::models::{PeerInfo, TimelineStatus};
use safekeeper_api::Term;
use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -267,7 +268,10 @@ async fn recover(
);
// Now understand our term history.
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
generation: INVALID_GENERATION,
term: donor.term,
});
let vote_response = match tli
.process_msg(&vote_request)
.await
@@ -302,10 +306,10 @@ async fn recover(
// truncate WAL locally
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
generation: INVALID_GENERATION,
term: donor.term,
start_streaming_at: last_common_point.lsn,
term_history: donor_th,
timeline_start_lsn: Lsn::INVALID,
});
// Successful ProposerElected handling always returns None. If term changed,
// we'll find out that during the streaming. Note: it is expected to get
@@ -437,13 +441,12 @@ async fn network_io(
match msg {
ReplicationMessage::XLogData(xlog_data) => {
let ar_hdr = AppendRequestHeader {
generation: INVALID_GENERATION,
term: donor.term,
term_start_lsn: Lsn::INVALID, // unused
begin_lsn: Lsn(xlog_data.wal_start()),
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
truncate_lsn: Lsn::INVALID, // do not attempt to advance
proposer_uuid: [0; 16],
};
let ar = AppendRequest {
h: ar_hdr,

View File

@@ -5,6 +5,11 @@ use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
use safekeeper_api::membership;
use safekeeper_api::membership::MemberSet;
use safekeeper_api::membership::SafekeeperGeneration as Generation;
use safekeeper_api::membership::SafekeeperId;
use safekeeper_api::membership::INVALID_GENERATION;
use safekeeper_api::models::HotStandbyFeedback;
use safekeeper_api::Term;
use serde::{Deserialize, Serialize};
@@ -12,6 +17,7 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use std::str::FromStr;
use storage_broker::proto::SafekeeperTimelineInfo;
use tracing::*;
@@ -29,7 +35,8 @@ use utils::{
lsn::Lsn,
};
pub const SK_PROTOCOL_VERSION: u32 = 2;
pub const SK_PROTO_VERSION_2: u32 = 2;
pub const SK_PROTO_VERSION_3: u32 = 3;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
@@ -56,8 +63,28 @@ impl TermHistory {
TermHistory(Vec::new())
}
// Parse TermHistory as n_entries followed by TermLsn pairs
// Parse TermHistory as n_entries followed by TermLsn pairs in network order.
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
let n_entries = bytes
.get_u32_f()
.with_context(|| "TermHistory misses len")?;
let mut res = Vec::with_capacity(n_entries as usize);
for i in 0..n_entries {
let term = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses term", i))?;
let lsn = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses lsn", i))?
.into();
res.push(TermLsn { term, lsn })
}
Ok(TermHistory(res))
}
// Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
// TODO remove once v2 protocol is fully dropped.
pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
}
@@ -197,6 +224,18 @@ impl AcceptorState {
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
pub struct ProposerGreeting {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mconf: membership::Configuration,
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub wal_seg_size: u32,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Deserialize)]
pub struct ProposerGreetingV2 {
/// proposer-acceptor protocol version
pub protocol_version: u32,
/// Postgres server version
@@ -213,27 +252,35 @@ pub struct ProposerGreeting {
/// (acceptor voted for).
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
term: u64,
node_id: NodeId,
mconf: membership::Configuration,
term: u64,
}
/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
#[derive(Debug)]
pub struct VoteRequest {
pub generation: Generation,
pub term: Term,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Deserialize)]
pub struct VoteRequestV2 {
pub term: Term,
}
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
generation: Generation, // membership conf generation
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
vote_given: bool,
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
// proposer to choose the most advanced one.
pub flush_lsn: Lsn,
truncate_lsn: Lsn,
pub term_history: TermHistory,
timeline_start_lsn: Lsn,
}
/*
@@ -242,10 +289,10 @@ pub struct VoteResponse {
*/
#[derive(Debug)]
pub struct ProposerElected {
pub generation: Generation, // membership conf generation
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
pub timeline_start_lsn: Lsn,
}
/// Request with WAL message sent from proposer to safekeeper. Along the way it
@@ -257,6 +304,22 @@ pub struct AppendRequest {
}
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
pub generation: Generation, // membership conf generation
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
/// start position of message in WAL
pub begin_lsn: Lsn,
/// end position of message in WAL
pub end_lsn: Lsn,
/// LSN committed by quorum of safekeepers
pub commit_lsn: Lsn,
/// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
pub truncate_lsn: Lsn,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeaderV2 {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
// TODO: remove this field from the protocol, it in unused -- LSN of term
@@ -277,6 +340,9 @@ pub struct AppendRequestHeader {
/// Report safekeeper state to proposer
#[derive(Debug, Serialize, Clone)]
pub struct AppendResponse {
// Membership conf generation. Not strictly required because on mismatch
// connection is reset, but let's sanity check it.
generation: Generation,
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
pub term: Term,
@@ -293,8 +359,9 @@ pub struct AppendResponse {
}
impl AppendResponse {
fn term_only(term: Term) -> AppendResponse {
fn term_only(generation: Generation, term: Term) -> AppendResponse {
AppendResponse {
generation,
term,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
@@ -315,72 +382,322 @@ pub enum ProposerAcceptorMessage {
FlushWAL,
}
impl ProposerAcceptorMessage {
/// Parse proposer message.
pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
if proto_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
proto_version,
SK_PROTOCOL_VERSION
);
/// Augment Bytes with fallible get_uN where N is number of bytes methods.
/// All reads are in network (big endian) order.
trait BytesF {
fn get_u8_f(&mut self) -> Result<u8>;
fn get_u16_f(&mut self) -> Result<u16>;
fn get_u32_f(&mut self) -> Result<u32>;
fn get_u64_f(&mut self) -> Result<u64>;
}
impl BytesF for Bytes {
fn get_u8_f(&mut self) -> Result<u8> {
if self.is_empty() {
bail!("no bytes left, expected 1");
}
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
let msg = ProposerGreeting::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::Greeting(msg))
}
'v' => {
let msg = VoteRequest::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::VoteRequest(msg))
}
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
if msg_bytes.remaining() < 8 {
bail!("ProposerElected message is not complete");
}
let timeline_start_lsn = msg_bytes.get_u64_le().into();
let msg = ProposerElected {
term,
start_streaming_at,
timeline_start_lsn,
term_history,
Ok(self.get_u8())
}
fn get_u16_f(&mut self) -> Result<u16> {
if self.remaining() < 2 {
bail!("no bytes left, expected 2");
}
Ok(self.get_u16())
}
fn get_u32_f(&mut self) -> Result<u32> {
if self.remaining() < 4 {
bail!("only {} bytes left, expected 4", self.remaining());
}
Ok(self.get_u32())
}
fn get_u64_f(&mut self) -> Result<u64> {
if self.remaining() < 8 {
bail!("only {} bytes left, expected 8", self.remaining());
}
Ok(self.get_u64())
}
}
impl ProposerAcceptorMessage {
/// Read cstring from Bytes.
fn get_cstr(buf: &mut Bytes) -> Result<String> {
let pos = buf
.iter()
.position(|x| *x == 0)
.ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
let result = buf.split_to(pos);
buf.advance(1); // drop the null terminator
match std::str::from_utf8(&result) {
Ok(s) => Ok(s.to_string()),
Err(e) => bail!("invalid utf8 in cstring: {}", e),
}
}
/// Read membership::Configuration from Bytes.
fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?);
let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
// Main member set must have at least someone in valid configuration.
// Empty conf is allowed until we fully migrate.
if generation != INVALID_GENERATION && members_len == 0 {
bail!("empty members_len");
}
let mut members = MemberSet::empty();
for i in 0..members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading member {} node_id", i))?;
let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading member {} port", i))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
pg_port,
};
members.add(sk)?;
}
let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
// Non joint conf.
if new_members_len == 0 {
Ok(membership::Configuration {
generation,
members,
new_members: None,
})
} else {
let mut new_members = MemberSet::empty();
for i in 0..new_members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading new member {} node_id", i))?;
let host = Self::get_cstr(buf)
.with_context(|| format!("reading new member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading new member {} port", i))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
pg_port,
};
Ok(ProposerAcceptorMessage::Elected(msg))
new_members.add(sk)?;
}
'a' => {
// read header followed by wal data
let hdr = AppendRequestHeader::des_from(&mut stream)?;
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
Ok(membership::Configuration {
generation,
members,
new_members: Some(new_members),
})
}
}
/// Parse proposer message.
pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
if proto_version == SK_PROTO_VERSION_3 {
if msg_bytes.is_empty() {
bail!("ProposerAcceptorMessage is not complete: missing tag");
}
let tag = msg_bytes.get_u8_f().with_context(|| {
"ProposerAcceptorMessage is not complete: missing tag".to_string()
})? as char;
match tag {
'g' => {
let tenant_id_str =
Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
let tenant_id = TenantId::from_str(&tenant_id_str)?;
let timeline_id_str =
Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
let timeline_id = TimelineId::from_str(&timeline_id_str)?;
let mconf = Self::get_mconf(&mut msg_bytes)?;
let pg_version = msg_bytes
.get_u32_f()
.with_context(|| "reading pg_version")?;
let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
let wal_seg_size = msg_bytes
.get_u32_f()
.with_context(|| "reading wal_seg_size")?;
let g = ProposerGreeting {
tenant_id,
timeline_id,
mconf,
pg_version,
system_id,
wal_seg_size,
};
Ok(ProposerAcceptorMessage::Greeting(g))
}
'v' => {
let generation = Generation::new(
msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?,
);
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let v = VoteRequest { generation, term };
Ok(ProposerAcceptorMessage::VoteRequest(v))
}
'e' => {
let generation = Generation::new(
msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?,
);
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let start_streaming_at: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading start_streaming_at")?
.into();
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
let msg = ProposerElected {
generation,
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => {
let generation = Generation::new(
msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?,
);
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let begin_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading begin_lsn")?
.into();
let end_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading end_lsn")?
.into();
let commit_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading commit_lsn")?
.into();
let truncate_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading truncate_lsn")?
.into();
let hdr = AppendRequestHeader {
generation,
term,
begin_lsn,
end_lsn,
commit_lsn,
truncate_lsn,
};
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
}
if msg_bytes.remaining() < rec_size {
bail!(
"reading WAL: only {} bytes left, wanted {}",
msg_bytes.remaining(),
rec_size
);
}
let wal_data = msg_bytes.copy_to_bytes(rec_size);
let msg = AppendRequest { h: hdr, wal_data };
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest { h: hdr, wal_data };
Ok(ProposerAcceptorMessage::AppendRequest(msg))
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
} else if proto_version == SK_PROTO_VERSION_2 {
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
let g = ProposerGreeting {
tenant_id: msgv2.tenant_id,
timeline_id: msgv2.timeline_id,
mconf: membership::Configuration {
generation: INVALID_GENERATION,
members: MemberSet::empty(),
new_members: None,
},
pg_version: msgv2.pg_version,
system_id: msgv2.system_id,
wal_seg_size: msgv2.wal_seg_size,
};
Ok(ProposerAcceptorMessage::Greeting(g))
}
'v' => {
let msg = VoteRequestV2::des_from(&mut stream)?;
let v = VoteRequest {
generation: INVALID_GENERATION,
term: msg.term,
};
Ok(ProposerAcceptorMessage::VoteRequest(v))
}
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
if msg_bytes.remaining() < 8 {
bail!("ProposerElected message is not complete");
}
let _timeline_start_lsn = msg_bytes.get_u64_le();
let msg = ProposerElected {
generation: INVALID_GENERATION,
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => {
// read header followed by wal data
let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
let hdr = AppendRequestHeader {
generation: INVALID_GENERATION,
term: hdrv2.term,
begin_lsn: hdrv2.begin_lsn,
end_lsn: hdrv2.end_lsn,
commit_lsn: hdrv2.commit_lsn,
truncate_lsn: hdrv2.truncate_lsn,
};
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
}
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest { h: hdr, wal_data };
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
} else {
bail!("unsupported protocol version {}", proto_version);
}
}
@@ -394,36 +711,21 @@ impl ProposerAcceptorMessage {
// We explicitly list all fields, to draw attention here when new fields are added.
let mut size = BASE_SIZE;
size += match self {
Self::Greeting(ProposerGreeting {
protocol_version: _,
pg_version: _,
proposer_id: _,
system_id: _,
timeline_id: _,
tenant_id: _,
tli: _,
wal_seg_size: _,
}) => 0,
Self::Greeting(_) => 0,
Self::VoteRequest(VoteRequest { term: _ }) => 0,
Self::VoteRequest(_) => 0,
Self::Elected(ProposerElected {
term: _,
start_streaming_at: _,
term_history: _,
timeline_start_lsn: _,
}) => 0,
Self::Elected(_) => 0,
Self::AppendRequest(AppendRequest {
h:
AppendRequestHeader {
generation: _,
term: _,
term_start_lsn: _,
begin_lsn: _,
end_lsn: _,
commit_lsn: _,
truncate_lsn: _,
proposer_uuid: _,
},
wal_data,
}) => wal_data.len(),
@@ -431,13 +733,12 @@ impl ProposerAcceptorMessage {
Self::NoFlushAppendRequest(AppendRequest {
h:
AppendRequestHeader {
generation: _,
term: _,
term_start_lsn: _,
begin_lsn: _,
end_lsn: _,
commit_lsn: _,
truncate_lsn: _,
proposer_uuid: _,
},
wal_data,
}) => wal_data.len(),
@@ -458,45 +759,118 @@ pub enum AcceptorProposerMessage {
}
impl AcceptorProposerMessage {
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u64_le('g' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.node_id.0);
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
buf.put_u64_le(msg.timeline_start_lsn.into());
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u64_le('a' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
fn put_cstr(buf: &mut BytesMut, s: &str) {
buf.put_slice(s.as_bytes());
buf.put_u8(0); // null terminator
}
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
/// Serialize membership::Configuration into buf.
fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
buf.put_u32(mconf.generation.into_inner());
buf.put_u32(mconf.members.m.len() as u32);
for sk in &mconf.members.m {
buf.put_u64(sk.id.0);
Self::put_cstr(buf, &sk.host);
buf.put_u16(sk.pg_port);
}
if let Some(ref new_members) = mconf.new_members {
buf.put_u32(new_members.m.len() as u32);
for sk in &new_members.m {
buf.put_u64(sk.id.0);
Self::put_cstr(buf, &sk.host);
buf.put_u16(sk.pg_port);
}
} else {
buf.put_u32(0);
}
}
Ok(())
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
if proto_version == SK_PROTO_VERSION_3 {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u8(b'g');
buf.put_u64(msg.node_id.0);
Self::serialize_mconf(buf, &msg.mconf);
buf.put_u64(msg.term)
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u8(b'v');
buf.put_u32(msg.generation.into_inner());
buf.put_u64(msg.term);
buf.put_u8(msg.vote_given as u8);
buf.put_u64(msg.flush_lsn.into());
buf.put_u64(msg.truncate_lsn.into());
buf.put_u32(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64(e.term);
buf.put_u64(e.lsn.into());
}
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u8(b'a');
buf.put_u32(msg.generation.into_inner());
buf.put_u64(msg.term);
buf.put_u64(msg.flush_lsn.into());
buf.put_u64(msg.commit_lsn.into());
buf.put_i64(msg.hs_feedback.ts);
buf.put_u64(msg.hs_feedback.xmin);
buf.put_u64(msg.hs_feedback.catalog_xmin);
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
}
Ok(())
// TODO remove 3 after converting all msgs
} else if proto_version == SK_PROTO_VERSION_2 {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u64_le('g' as u64);
// v2 didn't have mconf and fields were reordered
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.node_id.0);
}
AcceptorProposerMessage::VoteResponse(msg) => {
// v2 didn't have generation, had u64 vote_given and timeline_start_lsn
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given as u64);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
// removed timeline_start_lsn
buf.put_u64_le(0);
}
AcceptorProposerMessage::AppendResponse(msg) => {
// v2 didn't have generation
buf.put_u64_le('a' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
}
Ok(())
} else {
bail!("unsupported protocol version {}", proto_version);
}
}
}
@@ -593,14 +967,6 @@ where
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
// Check protocol compatibility
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
msg.protocol_version,
SK_PROTOCOL_VERSION
);
}
/* Postgres major version mismatch is treated as fatal error
* because safekeepers parse WAL headers and the format
* may change between versions.
@@ -655,15 +1021,16 @@ where
self.state.finish_change(&state).await?;
}
info!(
"processed greeting from walproposer {}, sending term {:?}",
msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
self.state.acceptor_state.term
);
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.state.acceptor_state.term,
let apg = AcceptorGreeting {
node_id: self.node_id,
})))
mconf: self.state.mconf.clone(),
term: self.state.acceptor_state.term,
};
info!(
"processed greeting {:?} from walproposer, sending {:?}",
msg, apg
);
Ok(Some(AcceptorProposerMessage::Greeting(apg)))
}
/// Give vote for the given term, if we haven't done that previously.
@@ -684,12 +1051,12 @@ where
self.wal_store.flush_wal().await?;
// initialize with refusal
let mut resp = VoteResponse {
generation: self.state.mconf.generation,
term: self.state.acceptor_state.term,
vote_given: false as u64,
vote_given: false,
flush_lsn: self.flush_lsn(),
truncate_lsn: self.state.inmem.peer_horizon_lsn,
term_history: self.get_term_history(),
timeline_start_lsn: self.state.timeline_start_lsn,
};
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.start_change();
@@ -698,15 +1065,16 @@ where
self.state.finish_change(&state).await?;
resp.term = self.state.acceptor_state.term;
resp.vote_given = true as u64;
resp.vote_given = true;
}
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
info!("processed {:?}: sending {:?}", msg, &resp);
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
let ar = AppendResponse {
generation: self.state.mconf.generation,
term: self.state.acceptor_state.term,
flush_lsn: self.flush_lsn(),
commit_lsn: self.state.commit_lsn,
@@ -805,18 +1173,22 @@ where
// Here we learn initial LSN for the first time, set fields
// interested in that.
if state.timeline_start_lsn == Lsn(0) {
// Remember point where WAL begins globally.
state.timeline_start_lsn = msg.timeline_start_lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
if let Some(start_lsn) = msg.term_history.0.first() {
if state.timeline_start_lsn == Lsn(0) {
// Remember point where WAL begins globally. In the future it
// will be intialized immediately on timeline creation.
state.timeline_start_lsn = start_lsn.lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
}
}
if state.peer_horizon_lsn == Lsn(0) {
// Update peer_horizon_lsn as soon as we know where timeline starts.
// It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
state.peer_horizon_lsn = msg.timeline_start_lsn;
state.peer_horizon_lsn = state.timeline_start_lsn;
}
if state.local_start_lsn == Lsn(0) {
state.local_start_lsn = msg.start_streaming_at;
@@ -896,7 +1268,10 @@ where
// If our term is higher, immediately refuse the message.
if self.state.acceptor_state.term > msg.h.term {
let resp = AppendResponse::term_only(self.state.acceptor_state.term);
let resp = AppendResponse::term_only(
self.state.mconf.generation,
self.state.acceptor_state.term,
);
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}
@@ -924,10 +1299,8 @@ where
);
}
// Now we know that we are in the same term as the proposer,
// processing the message.
self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
// Now we know that we are in the same term as the proposer, process the
// message.
// do the job
if !msg.wal_data.is_empty() {
@@ -1097,10 +1470,13 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
generation: Generation::new(0),
term: 1,
});
let mut vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
r => panic!("unexpected response: {:?}", r),
}
@@ -1115,7 +1491,7 @@ mod tests {
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
r => panic!("unexpected response: {:?}", r),
}
}
@@ -1130,13 +1506,12 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
let mut ar_hdr = AppendRequestHeader {
generation: Generation::new(0),
term: 2,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
};
let mut append_request = AppendRequest {
h: ar_hdr.clone(),
@@ -1144,6 +1519,7 @@ mod tests {
};
let pem = ProposerElected {
generation: Generation::new(0),
term: 2,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![
@@ -1156,7 +1532,6 @@ mod tests {
lsn: Lsn(3),
},
]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
@@ -1191,26 +1566,25 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
let pem = ProposerElected {
generation: Generation::new(0),
term: 1,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermLsn {
term: 1,
lsn: Lsn(1),
}]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap();
let ar_hdr = AppendRequestHeader {
generation: Generation::new(0),
term: 1,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
};
let append_request = AppendRequest {
h: ar_hdr.clone(),

View File

@@ -14,6 +14,7 @@ use crate::wal_backup::remote_timeline_path;
use crate::{control_file, receive_wal, wal_storage, SafeKeeperConf};
use camino_tempfile::Utf8TempDir;
use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
use safekeeper_api::membership::SafekeeperGeneration as Generation;
use tokio::fs::create_dir_all;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
@@ -73,10 +74,10 @@ impl Env {
// Emulate an initial election.
safekeeper
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
generation: Generation::new(0),
term: 1,
start_streaming_at: start_lsn,
term_history: TermHistory(vec![(1, start_lsn).into()]),
timeline_start_lsn: start_lsn,
}))
.await?;
@@ -146,13 +147,12 @@ impl Env {
let req = AppendRequest {
h: AppendRequestHeader {
generation: Generation::new(0),
term: 1,
term_start_lsn: start_lsn,
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: lsn,
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
};

View File

@@ -15,9 +15,7 @@ use desim::{
};
use http::Uri;
use safekeeper::{
safekeeper::{
ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION,
},
safekeeper::{ProposerAcceptorMessage, SafeKeeper, SK_PROTO_VERSION_3, UNKNOWN_SERVER_VERSION},
state::{TimelinePersistentState, TimelineState},
timeline::TimelineError,
wal_storage::Storage,
@@ -287,7 +285,7 @@ impl ConnState {
bail!("finished processing START_REPLICATION")
}
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?;
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTO_VERSION_3)?;
debug!("got msg: {:?}", msg);
self.process(msg, global)
} else {
@@ -403,7 +401,7 @@ impl ConnState {
// TODO: if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
let mut buf = BytesMut::with_capacity(128);
reply.serialize(&mut buf)?;
reply.serialize(&mut buf, SK_PROTO_VERSION_3)?;
self.tcp.send(AnyMessage::Bytes(buf.into()));
}

View File

@@ -6,9 +6,14 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.pageserver.http import PageserverHttpClient
def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
def check_tenant(
env: NeonEnv, pageserver_http: PageserverHttpClient, safekeeper_proto_version: int
):
tenant_id, timeline_id = env.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
config_lines = [
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
]
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=config_lines)
# we rely upon autocommit after each statement
res_1 = endpoint.safe_psql_many(
queries=[
@@ -33,7 +38,14 @@ def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
@pytest.mark.parametrize("num_timelines,num_safekeepers", [(3, 1)])
def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int):
# Test both proto versions until we fully migrate.
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
def test_normal_work(
neon_env_builder: NeonEnvBuilder,
num_timelines: int,
num_safekeepers: int,
safekeeper_proto_version: int,
):
"""
Basic test:
* create new tenant with a timeline
@@ -52,4 +64,4 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s
pageserver_http = env.pageserver.http_client()
for _ in range(num_timelines):
check_tenant(env, pageserver_http)
check_tenant(env, pageserver_http, safekeeper_proto_version)

View File

@@ -539,13 +539,16 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_recovery_uncommitted(env))
async def run_wal_truncation(env: NeonEnv):
async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(sk1, sk2, sk3) = env.safekeepers
ep = env.endpoints.create_start("main")
config_lines = [
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
]
ep = env.endpoints.create_start("main", config_lines=config_lines)
ep.safe_psql("create table t (key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
@@ -572,6 +575,7 @@ async def run_wal_truncation(env: NeonEnv):
sk2.start()
ep = env.endpoints.create_start(
"main",
config_lines=config_lines,
)
ep.safe_psql("insert into t select generate_series(1, 200), 'payload'")
@@ -590,11 +594,13 @@ async def run_wal_truncation(env: NeonEnv):
# Simple deterministic test creating tail of WAL on safekeeper which is
# truncated when majority without this sk elects walproposer starting earlier.
def test_wal_truncation(neon_env_builder: NeonEnvBuilder):
# Test both proto versions until we fully migrate.
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
asyncio.run(run_wal_truncation(env))
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
async def run_segment_init_failure(env: NeonEnv):