Compare commits

...

21 Commits

Author SHA1 Message Date
Arseny Sher
b0005aada4 review fix 2025-02-25 10:47:40 +01:00
Arseny Sher
748bb5d855 remove todo 2025-02-07 11:51:37 +01:00
Arseny Sher
60ed1277f0 add v3 to one more test 2025-02-04 12:08:26 +01:00
Arseny Sher
bb93f390f3 fix test_simple_sync_safekeepers 2025-02-04 12:00:46 +01:00
Arseny Sher
db97d3f140 fix unit tests 2025-02-04 11:22:35 +01:00
Arseny Sher
42059b027e convert appendresponse 2025-02-04 10:11:27 +01:00
Arseny Sher
9aad617d76 convert appendrequest 2025-02-04 10:11:27 +01:00
Arseny Sher
443228e6b7 convert ProposerElected 2025-02-04 10:11:27 +01:00
Arseny Sher
50c93da736 remove propelected.timeline_start_lsn usages in sk 2025-02-04 10:11:27 +01:00
Arseny Sher
8fb781f908 Convert VoteResponse, remove timeline_start_lsn. 2025-02-04 10:11:27 +01:00
Arseny Sher
d5065339aa convert voterequest 2025-02-04 10:11:27 +01:00
Arseny Sher
9ecc22e355 move greeting init v2 to PAMessageSerialize 2025-02-04 10:11:27 +01:00
Arseny Sher
874f4ff0d9 wp: print mconf 2025-02-04 10:11:27 +01:00
Arseny Sher
ec745e4d08 convert acceptorgreeting 2025-02-04 10:11:27 +01:00
Arseny Sher
8683462157 rename sk members to m for brevity 2025-02-04 10:11:27 +01:00
Arseny Sher
7961f969a3 infra for v3 acceptor -> proposer msgs 2025-02-04 10:11:27 +01:00
Arseny Sher
98e98ac650 convert ProposerGreeting 2025-02-04 10:11:27 +01:00
Arseny Sher
efdddc062b Add two safekeeper proto versions to test_normal_work. 2025-02-04 10:11:27 +01:00
Arseny Sher
5e9c22f1b6 Pass proto selection to safekeeper, prepare parsing v3. 2025-02-04 10:11:27 +01:00
Arseny Sher
9630b5c965 Add PAMessageSerialize and ProposerAcceptorGreeting v3 2025-02-04 10:11:27 +01:00
Arseny Sher
0c386b272a pgindent wp code 2025-02-04 10:11:27 +01:00
17 changed files with 1347 additions and 452 deletions

View File

@@ -38,14 +38,12 @@ impl Display for SafekeeperId {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(transparent)]
pub struct MemberSet {
pub members: Vec<SafekeeperId>,
pub m: Vec<SafekeeperId>,
}
impl MemberSet {
pub fn empty() -> Self {
MemberSet {
members: Vec::new(),
}
MemberSet { m: Vec::new() }
}
pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
@@ -53,11 +51,11 @@ impl MemberSet {
if hs.len() != members.len() {
bail!("duplicate safekeeper id in the set {:?}", members);
}
Ok(MemberSet { members })
Ok(MemberSet { m: members })
}
pub fn contains(&self, sk: &SafekeeperId) -> bool {
self.members.iter().any(|m| m.id == sk.id)
self.m.iter().any(|m| m.id == sk.id)
}
pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
@@ -67,7 +65,7 @@ impl MemberSet {
sk.id, self
));
}
self.members.push(sk);
self.m.push(sk);
Ok(())
}
}
@@ -75,11 +73,7 @@ impl MemberSet {
impl Display for MemberSet {
/// Display as a comma separated list of members.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let sks_str = self
.members
.iter()
.map(|m| m.to_string())
.collect::<Vec<_>>();
let sks_str = self.m.iter().map(|sk| sk.to_string()).collect::<Vec<_>>();
write!(f, "({})", sks_str.join(", "))
}
}

View File

@@ -215,6 +215,7 @@ impl Wrapper {
syncSafekeepers: config.sync_safekeepers,
systemId: 0,
pgTimeline: 1,
proto_version: 3,
callback_data,
};
let c_config = Box::into_raw(Box::new(c_config));
@@ -276,6 +277,7 @@ mod tests {
use core::panic;
use std::{
cell::Cell,
ffi::CString,
sync::{atomic::AtomicUsize, mpsc::sync_channel},
};
@@ -496,57 +498,64 @@ mod tests {
// Messages definitions are at walproposer.h
// xxx: it would be better to extract them from safekeeper crate and
// use serialization/deserialization here.
let greeting_tag = (b'g' as u64).to_ne_bytes();
let proto_version = 2_u32.to_ne_bytes();
let pg_version: [u8; 4] = PG_VERSION_NUM.to_ne_bytes();
let proposer_id = [0; 16];
let system_id = 0_u64.to_ne_bytes();
let tenant_id = ttid.tenant_id.as_arr();
let timeline_id = ttid.timeline_id.as_arr();
let pg_tli = 1_u32.to_ne_bytes();
let wal_seg_size = 16777216_u32.to_ne_bytes();
let greeting_tag = (b'g').to_be_bytes();
let tenant_id = CString::new(ttid.tenant_id.to_string())
.unwrap()
.into_bytes_with_nul();
let timeline_id = CString::new(ttid.timeline_id.to_string())
.unwrap()
.into_bytes_with_nul();
let mconf_gen = 0_u32.to_be_bytes();
let mconf_members_len = 0_u32.to_be_bytes();
let mconf_members_new_len = 0_u32.to_be_bytes();
let pg_version: [u8; 4] = PG_VERSION_NUM.to_be_bytes();
let system_id = 0_u64.to_be_bytes();
let wal_seg_size = 16777216_u32.to_be_bytes();
let proposer_greeting = [
greeting_tag.as_slice(),
proto_version.as_slice(),
pg_version.as_slice(),
proposer_id.as_slice(),
system_id.as_slice(),
tenant_id.as_slice(),
timeline_id.as_slice(),
pg_tli.as_slice(),
mconf_gen.as_slice(),
mconf_members_len.as_slice(),
mconf_members_new_len.as_slice(),
pg_version.as_slice(),
system_id.as_slice(),
wal_seg_size.as_slice(),
]
.concat();
let voting_tag = (b'v' as u64).to_ne_bytes();
let vote_request_term = 3_u64.to_ne_bytes();
let proposer_id = [0; 16];
let voting_tag = (b'v').to_be_bytes();
let vote_request_term = 3_u64.to_be_bytes();
let vote_request = [
voting_tag.as_slice(),
mconf_gen.as_slice(),
vote_request_term.as_slice(),
proposer_id.as_slice(),
]
.concat();
let acceptor_greeting_term = 2_u64.to_ne_bytes();
let acceptor_greeting_node_id = 1_u64.to_ne_bytes();
let acceptor_greeting_term = 2_u64.to_be_bytes();
let acceptor_greeting_node_id = 1_u64.to_be_bytes();
let acceptor_greeting = [
greeting_tag.as_slice(),
acceptor_greeting_term.as_slice(),
acceptor_greeting_node_id.as_slice(),
mconf_gen.as_slice(),
mconf_members_len.as_slice(),
mconf_members_new_len.as_slice(),
acceptor_greeting_term.as_slice(),
]
.concat();
let vote_response_term = 3_u64.to_ne_bytes();
let vote_given = 1_u64.to_ne_bytes();
let flush_lsn = 0x539_u64.to_ne_bytes();
let truncate_lsn = 0x539_u64.to_ne_bytes();
let th_len = 1_u32.to_ne_bytes();
let th_term = 2_u64.to_ne_bytes();
let th_lsn = 0x539_u64.to_ne_bytes();
let timeline_start_lsn = 0x539_u64.to_ne_bytes();
let vote_response_term = 3_u64.to_be_bytes();
let vote_given = 1_u8.to_be_bytes();
let flush_lsn = 0x539_u64.to_be_bytes();
let truncate_lsn = 0x539_u64.to_be_bytes();
let th_len = 1_u32.to_be_bytes();
let th_term = 2_u64.to_be_bytes();
let th_lsn = 0x539_u64.to_be_bytes();
let vote_response = [
voting_tag.as_slice(),
mconf_gen.as_slice(),
vote_response_term.as_slice(),
vote_given.as_slice(),
flush_lsn.as_slice(),
@@ -554,7 +563,6 @@ mod tests {
th_len.as_slice(),
th_term.as_slice(),
th_lsn.as_slice(),
timeline_start_lsn.as_slice(),
]
.concat();

View File

@@ -51,6 +51,26 @@ HexDecodeString(uint8 *result, char *input, int nbytes)
return true;
}
/* --------------------------------
* pq_getmsgint16 - get a binary 2-byte int from a message buffer
* --------------------------------
*/
uint16
pq_getmsgint16(StringInfo msg)
{
return pq_getmsgint(msg, 2);
}
/* --------------------------------
* pq_getmsgint32 - get a binary 4-byte int from a message buffer
* --------------------------------
*/
uint32
pq_getmsgint32(StringInfo msg)
{
return pq_getmsgint(msg, 4);
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------

View File

@@ -8,6 +8,8 @@
#endif
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint16 pq_getmsgint16(StringInfo msg);
uint32 pq_getmsgint32(StringInfo msg);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);

View File

@@ -70,6 +70,7 @@ static bool SendAppendRequests(Safekeeper *sk);
static bool RecvAppendResponses(Safekeeper *sk);
static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp);
static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp);
static void PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version);
static void HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk);
static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size);
static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg);
@@ -81,6 +82,8 @@ static char *FormatSafekeeperState(Safekeeper *sk);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static char *FormatEvents(WalProposer *wp, uint32 events);
static void UpdateDonorShmem(WalProposer *wp);
static char *MembershipConfigurationToString(MembershipConfiguration *mconf);
static void MembershipConfigurationFree(MembershipConfiguration *mconf);
WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -137,25 +140,21 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
}
wp->quorum = wp->n_safekeepers / 2 + 1;
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
/* Fill the greeting package */
wp->greetRequest.tag = 'g';
wp->greetRequest.protocolVersion = SK_PROTOCOL_VERSION;
wp->greetRequest.pgVersion = PG_VERSION_NUM;
wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId));
wp->greetRequest.systemId = wp->config->systemId;
if (!wp->config->neon_timeline)
wp_log(FATAL, "neon.timeline_id is not provided");
if (*wp->config->neon_timeline != '\0' &&
!HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline);
wp->greetRequest.pam.tag = 'g';
if (!wp->config->neon_tenant)
wp_log(FATAL, "neon.tenant_id is not provided");
if (*wp->config->neon_tenant != '\0' &&
!HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16))
wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant);
wp->greetRequest.timeline = wp->config->pgTimeline;
wp->greetRequest.walSegSize = wp->config->wal_segment_size;
wp->greetRequest.tenant_id = wp->config->neon_tenant;
if (!wp->config->neon_timeline)
wp_log(FATAL, "neon.timeline_id is not provided");
wp->greetRequest.timeline_id = wp->config->neon_timeline;
wp->greetRequest.pg_version = PG_VERSION_NUM;
wp->greetRequest.system_id = wp->config->systemId;
wp->greetRequest.wal_seg_size = wp->config->wal_segment_size;
wp->api.init_event_set(wp);
@@ -165,12 +164,14 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
void
WalProposerFree(WalProposer *wp)
{
MembershipConfigurationFree(&wp->mconf);
for (int i = 0; i < wp->n_safekeepers; i++)
{
Safekeeper *sk = &wp->safekeeper[i];
Assert(sk->outbuf.data != NULL);
pfree(sk->outbuf.data);
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
@@ -308,6 +309,7 @@ ShutdownConnection(Safekeeper *sk)
sk->state = SS_OFFLINE;
sk->streamingAt = InvalidXLogRecPtr;
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
@@ -598,11 +600,14 @@ static void
SendStartWALPush(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
#define CMD_LEN 512
char cmd[CMD_LEN];
if (!wp->api.conn_send_query(sk, "START_WAL_PUSH"))
snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version);
if (!wp->api.conn_send_query(sk, cmd))
{
wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
sk->host, sk->port, wp->api.conn_error_message(sk));
wp_log(WARNING, "failed to send %s query to safekeeper %s:%s: %s",
cmd, sk->host, sk->port, wp->api.conn_error_message(sk));
ShutdownConnection(sk);
return;
}
@@ -658,23 +663,33 @@ RecvStartWALPushResult(Safekeeper *sk)
/*
* Start handshake: first of all send information about the
* safekeeper. After sending, we wait on SS_HANDSHAKE_RECV for
* walproposer. After sending, we wait on SS_HANDSHAKE_RECV for
* a response to finish the handshake.
*/
static void
SendProposerGreeting(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
char *mconf_toml = MembershipConfigurationToString(&wp->greetRequest.mconf);
wp_log(LOG, "sending ProposerGreeting to safekeeper %s:%s with mconf = %s", sk->host, sk->port, mconf_toml);
pfree(mconf_toml);
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->greetRequest,
&sk->outbuf, wp->config->proto_version);
/*
* On failure, logging & resetting the connection is handled. We just need
* to handle the control flow.
*/
BlockingWrite(sk, &sk->wp->greetRequest, sizeof(sk->wp->greetRequest), SS_HANDSHAKE_RECV);
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
}
static void
RecvAcceptorGreeting(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
char *mconf_toml;
/*
* If our reading doesn't immediately succeed, any necessary error
@@ -685,7 +700,10 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse))
return;
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, term=" INT64_FORMAT, sk->host, sk->port, sk->greetResponse.term);
mconf_toml = MembershipConfigurationToString(&sk->greetResponse.mconf);
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, node_id = %lu, mconf = %s, term=" UINT64_FORMAT,
sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term);
pfree(mconf_toml);
/* Protocol is all good, move to voting. */
sk->state = SS_VOTING;
@@ -707,12 +725,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
wp->propTerm++;
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp->voteRequest = (VoteRequest)
{
.tag = 'v',
.term = wp->propTerm
};
memcpy(wp->voteRequest.proposerId.data, wp->greetRequest.proposerId.data, UUID_LEN);
wp->voteRequest.pam.tag = 'v';
wp->voteRequest.generation = wp->mconf.generation;
wp->voteRequest.term = wp->propTerm;
}
}
else if (sk->greetResponse.term > wp->propTerm)
@@ -759,12 +774,14 @@ SendVoteRequest(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
/* We have quorum for voting, send our vote request */
wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT))
return;
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->voteRequest,
&sk->outbuf, wp->config->proto_version);
/* We have quorum for voting, send our vote request */
wp_log(LOG, "requesting vote from %s:%s for generation %u term " UINT64_FORMAT, sk->host, sk->port,
wp->voteRequest.generation, wp->voteRequest.term);
/* On failure, logging & resetting is handled */
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_WAIT_VERDICT);
/* If successful, wait for read-ready with SS_WAIT_VERDICT */
}
@@ -778,11 +795,12 @@ RecvVoteResponse(Safekeeper *sk)
return;
wp_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
"got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term,
sk->voteResponse.voteGiven,
GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn));
/*
* In case of acceptor rejecting our vote, bail out, but only if either it
@@ -847,9 +865,9 @@ HandleElectedProposer(WalProposer *wp)
* otherwise we must be sync-safekeepers and we have nothing to do then.
*
* Proceeding is not only pointless but harmful, because we'd give
* safekeepers term history starting with 0/0. These hacks will go away once
* we disable implicit timeline creation on safekeepers and create it with
* non zero LSN from the start.
* safekeepers term history starting with 0/0. These hacks will go away
* once we disable implicit timeline creation on safekeepers and create it
* with non zero LSN from the start.
*/
if (wp->propEpochStartLsn == InvalidXLogRecPtr)
{
@@ -942,7 +960,6 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->propEpochStartLsn = InvalidXLogRecPtr;
wp->donorEpoch = 0;
wp->truncateLsn = InvalidXLogRecPtr;
wp->timelineStartLsn = InvalidXLogRecPtr;
for (int i = 0; i < wp->n_safekeepers; i++)
{
@@ -959,20 +976,6 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->donor = i;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr)
{
/* timelineStartLsn should be the same everywhere or unknown */
if (wp->timelineStartLsn != InvalidXLogRecPtr &&
wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
{
wp_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(wp->timelineStartLsn),
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
}
wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
}
}
}
@@ -995,22 +998,11 @@ DetermineEpochStartLsn(WalProposer *wp)
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
{
wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp);
if (wp->timelineStartLsn == InvalidXLogRecPtr)
{
wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp);
}
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
}
pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
/*
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so
* it should never be zero at this point, if we know timelineStartLsn.
*
* timelineStartLsn can be zero only on the first syncSafekeepers run.
*/
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
(wp->config->syncSafekeepers && wp->truncateLsn == wp->timelineStartLsn));
Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers);
/*
* We will be generating WAL since propEpochStartLsn, so we should set
@@ -1053,10 +1045,11 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
* However, allow to proceed if last_log_term on the node which gave
* the highest vote (i.e. point where we are going to start writing)
* actually had been won by me; plain restart of walproposer not
* intervened by concurrent compute which wrote WAL is ok.
* However, allow to proceed if last_log_term on the node which
* gave the highest vote (i.e. point where we are going to start
* writing) actually had been won by me; plain restart of
* walproposer not intervened by concurrent compute which wrote
* WAL is ok.
*
* This avoids compute crash after manual term_bump.
*/
@@ -1126,14 +1119,8 @@ SendProposerElected(Safekeeper *sk)
{
/* safekeeper is empty or no common point, start from the beginning */
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
/*
* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
* is created manually (test_s3_wal_replay)
*/
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), wp->propTermHistory.n_entries);
}
else
{
@@ -1158,29 +1145,19 @@ SendProposerElected(Safekeeper *sk)
Assert(sk->startStreamingAt <= wp->availableLsn);
msg.tag = 'e';
msg.apm.tag = 'e';
msg.generation = wp->mconf.generation;
msg.term = wp->propTerm;
msg.startStreamingAt = sk->startStreamingAt;
msg.termHistory = &wp->propTermHistory;
msg.timelineStartLsn = wp->timelineStartLsn;
lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0;
wp_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
resetStringInfo(&sk->outbuf);
pq_sendint64_le(&sk->outbuf, msg.tag);
pq_sendint64_le(&sk->outbuf, msg.term);
pq_sendint64_le(&sk->outbuf, msg.startStreamingAt);
pq_sendint32_le(&sk->outbuf, msg.termHistory->n_entries);
for (int i = 0; i < msg.termHistory->n_entries; i++)
{
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].term);
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].lsn);
}
pq_sendint64_le(&sk->outbuf, msg.timelineStartLsn);
"sending elected msg to node " UINT64_FORMAT " generation=%u term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s",
sk->greetResponse.nodeId, msg.generation, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt),
lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port);
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version);
if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH))
return;
@@ -1246,14 +1223,13 @@ static void
PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn)
{
Assert(endLsn >= beginLsn);
req->tag = 'a';
req->apm.tag = 'a';
req->generation = wp->mconf.generation;
req->term = wp->propTerm;
req->epochStartLsn = wp->propEpochStartLsn;
req->beginLsn = beginLsn;
req->endLsn = endLsn;
req->commitLsn = wp->commitLsn;
req->truncateLsn = wp->truncateLsn;
req->proposerId = wp->greetRequest.proposerId;
}
/*
@@ -1354,7 +1330,8 @@ SendAppendRequests(Safekeeper *sk)
resetStringInfo(&sk->outbuf);
/* write AppendRequest header */
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
PAMessageSerialize(wp, (ProposerAcceptorMessage *) req, &sk->outbuf, wp->config->proto_version);
/* prepare for reading WAL into the outbuf */
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
sk->active_state = SS_ACTIVE_READ_WAL;
}
@@ -1367,14 +1344,17 @@ SendAppendRequests(Safekeeper *sk)
req = &sk->appendRequest;
req_len = req->endLsn - req->beginLsn;
/* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */
/*
* We send zero sized AppenRequests as heartbeats; don't wal_read
* for these.
*/
if (req_len > 0)
{
switch (wp->api.wal_read(sk,
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req_len,
&errmsg))
&sk->outbuf.data[sk->outbuf.len],
req->beginLsn,
req_len,
&errmsg))
{
case NEON_WALREAD_SUCCESS:
break;
@@ -1382,7 +1362,7 @@ SendAppendRequests(Safekeeper *sk)
return true;
case NEON_WALREAD_ERROR:
wp_log(WARNING, "WAL reading for node %s:%s failed: %s",
sk->host, sk->port, errmsg);
sk->host, sk->port, errmsg);
ShutdownConnection(sk);
return false;
default:
@@ -1470,11 +1450,11 @@ RecvAppendResponses(Safekeeper *sk)
* Term has changed to higher one, probably another compute is
* running. If this is the case we could PANIC as well because
* likely it inserted some data and our basebackup is unsuitable
* anymore. However, we also bump term manually (term_bump endpoint)
* on safekeepers for migration purposes, in this case we do want
* compute to stay alive. So restart walproposer with FATAL instead
* of panicking; if basebackup is spoiled next election will notice
* this.
* anymore. However, we also bump term manually (term_bump
* endpoint) on safekeepers for migration purposes, in this case
* we do want compute to stay alive. So restart walproposer with
* FATAL instead of panicking; if basebackup is spoiled next
* election will notice this.
*/
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
@@ -1509,7 +1489,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
for (i = 0; i < nkeys; i++)
{
const char *key = pq_getmsgstring(reply_message);
const char *key = pq_getmsgrawstring(reply_message);
unsigned int value_len = pq_getmsgint(reply_message, sizeof(int32));
if (strcmp(key, "current_timeline_size") == 0)
@@ -1750,6 +1730,208 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
}
}
/* Serialize MembershipConfiguration into buf. */
static void
MembershipConfigurationSerialize(MembershipConfiguration *mconf, StringInfo buf)
{
uint32 i;
pq_sendint32(buf, mconf->generation);
pq_sendint32(buf, mconf->members.len);
for (i = 0; i < mconf->members.len; i++)
{
pq_sendint64(buf, mconf->members.m[i].node_id);
pq_send_ascii_string(buf, mconf->members.m[i].host);
pq_sendint16(buf, mconf->members.m[i].port);
}
/*
* There is no special mark for absent new_members; zero members in
* invalid, so zero len means absent.
*/
pq_sendint32(buf, mconf->new_members.len);
for (i = 0; i < mconf->new_members.len; i++)
{
pq_sendint64(buf, mconf->new_members.m[i].node_id);
pq_send_ascii_string(buf, mconf->new_members.m[i].host);
pq_sendint16(buf, mconf->new_members.m[i].port);
}
}
/* Serialize proposer -> acceptor message into buf using specified version */
static void
PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version)
{
/* both version are supported currently until we fully migrate to 3 */
Assert(proto_version == 3 || proto_version == 2);
resetStringInfo(buf);
if (proto_version == 3)
{
/*
* v2 sends structs for some messages as is, so commonly send tag only
* for v3
*/
pq_sendint8(buf, msg->tag);
switch (msg->tag)
{
case 'g':
{
ProposerGreeting *m = (ProposerGreeting *) msg;
pq_send_ascii_string(buf, m->tenant_id);
pq_send_ascii_string(buf, m->timeline_id);
MembershipConfigurationSerialize(&m->mconf, buf);
pq_sendint32(buf, m->pg_version);
pq_sendint64(buf, m->system_id);
pq_sendint32(buf, m->wal_seg_size);
break;
}
case 'v':
{
VoteRequest *m = (VoteRequest *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
break;
}
case 'e':
{
ProposerElected *m = (ProposerElected *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
pq_sendint64(buf, m->startStreamingAt);
pq_sendint32(buf, m->termHistory->n_entries);
for (uint32 i = 0; i < m->termHistory->n_entries; i++)
{
pq_sendint64(buf, m->termHistory->entries[i].term);
pq_sendint64(buf, m->termHistory->entries[i].lsn);
}
break;
}
case 'a':
{
/*
* Note: this serializes only AppendRequestHeader, caller
* is expected to append WAL data later.
*/
AppendRequestHeader *m = (AppendRequestHeader *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
pq_sendint64(buf, m->beginLsn);
pq_sendint64(buf, m->endLsn);
pq_sendint64(buf, m->commitLsn);
pq_sendint64(buf, m->truncateLsn);
break;
}
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
return;
}
if (proto_version == 2)
{
switch (msg->tag)
{
case 'g':
{
/* v2 sent struct as is */
ProposerGreeting *m = (ProposerGreeting *) msg;
ProposerGreetingV2 greetRequestV2;
/* Fill also v2 struct. */
greetRequestV2.tag = 'g';
greetRequestV2.protocolVersion = proto_version;
greetRequestV2.pgVersion = m->pg_version;
/*
* v3 removed this field because it's easier to pass as
* libq or START_WAL_PUSH options
*/
memset(&greetRequestV2.proposerId, 0, sizeof(greetRequestV2.proposerId));
greetRequestV2.systemId = wp->config->systemId;
if (*m->timeline_id != '\0' &&
!HexDecodeString(greetRequestV2.timeline_id, m->timeline_id, 16))
wp_log(FATAL, "could not parse neon.timeline_id, %s", m->timeline_id);
if (*m->tenant_id != '\0' &&
!HexDecodeString(greetRequestV2.tenant_id, m->tenant_id, 16))
wp_log(FATAL, "could not parse neon.tenant_id, %s", m->tenant_id);
greetRequestV2.timeline = wp->config->pgTimeline;
greetRequestV2.walSegSize = wp->config->wal_segment_size;
pq_sendbytes(buf, (char *) &greetRequestV2, sizeof(greetRequestV2));
break;
}
case 'v':
{
/* v2 sent struct as is */
VoteRequest *m = (VoteRequest *) msg;
VoteRequestV2 voteRequestV2;
voteRequestV2.tag = m->pam.tag;
voteRequestV2.term = m->term;
/* removed field */
memset(&voteRequestV2.proposerId, 0, sizeof(voteRequestV2.proposerId));
pq_sendbytes(buf, (char *) &voteRequestV2, sizeof(voteRequestV2));
break;
}
case 'e':
{
ProposerElected *m = (ProposerElected *) msg;
pq_sendint64_le(buf, m->apm.tag);
pq_sendint64_le(buf, m->term);
pq_sendint64_le(buf, m->startStreamingAt);
pq_sendint32_le(buf, m->termHistory->n_entries);
for (int i = 0; i < m->termHistory->n_entries; i++)
{
pq_sendint64_le(buf, m->termHistory->entries[i].term);
pq_sendint64_le(buf, m->termHistory->entries[i].lsn);
}
pq_sendint64_le(buf, 0); /* removed timeline_start_lsn */
break;
}
case 'a':
/*
* Note: this serializes only AppendRequestHeader, caller is
* expected to append WAL data later.
*/
{
/* v2 sent struct as is */
AppendRequestHeader *m = (AppendRequestHeader *) msg;
AppendRequestHeaderV2 appendRequestHeaderV2;
appendRequestHeaderV2.tag = m->apm.tag;
appendRequestHeaderV2.term = m->term;
appendRequestHeaderV2.epochStartLsn = 0; /* removed field */
appendRequestHeaderV2.beginLsn = m->beginLsn;
appendRequestHeaderV2.endLsn = m->endLsn;
appendRequestHeaderV2.commitLsn = m->commitLsn;
appendRequestHeaderV2.truncateLsn = m->truncateLsn;
/* removed field */
memset(&appendRequestHeaderV2.proposerId, 0, sizeof(appendRequestHeaderV2.proposerId));
pq_sendbytes(buf, (char *) &appendRequestHeaderV2, sizeof(appendRequestHeaderV2));
break;
}
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
return;
}
wp_log(FATAL, "unexpected proto_version %d", proto_version);
}
/*
* Try to read CopyData message from i'th safekeeper, resetting connection on
* failure.
@@ -1779,6 +1961,37 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
return false;
}
/* Deserialize membership configuration from buf to mconf. */
static void
MembershipConfigurationDeserialize(MembershipConfiguration *mconf, StringInfo buf)
{
uint32 i;
mconf->generation = pq_getmsgint32(buf);
mconf->members.len = pq_getmsgint32(buf);
mconf->members.m = palloc0(sizeof(SafekeeperId) * mconf->members.len);
for (i = 0; i < mconf->members.len; i++)
{
const char *buf_host;
mconf->members.m[i].node_id = pq_getmsgint64(buf);
buf_host = pq_getmsgrawstring(buf);
strlcpy(mconf->members.m[i].host, buf_host, sizeof(mconf->members.m[i].host));
mconf->members.m[i].port = pq_getmsgint16(buf);
}
mconf->new_members.len = pq_getmsgint32(buf);
mconf->new_members.m = palloc0(sizeof(SafekeeperId) * mconf->new_members.len);
for (i = 0; i < mconf->new_members.len; i++)
{
const char *buf_host;
mconf->new_members.m[i].node_id = pq_getmsgint64(buf);
buf_host = pq_getmsgrawstring(buf);
strlcpy(mconf->new_members.m[i].host, buf_host, sizeof(mconf->new_members.m[i].host));
mconf->new_members.m[i].port = pq_getmsgint16(buf);
}
}
/*
* Read next message with known type into provided struct, by reading a CopyData
* block from the safekeeper's postgres connection, returning whether the read
@@ -1787,6 +2000,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
* If the read needs more polling, we return 'false' and keep the state
* unmodified, waiting until it becomes read-ready to try again. If it fully
* failed, a warning is emitted and the connection is reset.
*
* Note: it pallocs if needed, i.e. for AcceptorGreeting and VoteResponse fields.
*/
static bool
AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
@@ -1795,82 +2010,154 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
char *buf;
int buf_size;
uint64 tag;
uint8 tag;
StringInfoData s;
if (!(AsyncRead(sk, &buf, &buf_size)))
return false;
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
/* parse it */
s.data = buf;
s.len = buf_size;
s.maxlen = buf_size;
s.cursor = 0;
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
if (wp->config->proto_version == 3)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
switch (tag)
{
case 'g':
{
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->nodeId = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->voteGiven = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->truncateLsn = pq_getmsgint64_le(&s);
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (int i = 0; i < msg->termHistory.n_entries; i++)
tag = pq_getmsgbyte(&s);
if (tag != anymsg->tag)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
switch (tag)
{
case 'g':
{
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->nodeId = pq_getmsgint64(&s);
MembershipConfigurationDeserialize(&msg->mconf, &s);
msg->term = pq_getmsgint64(&s);
pq_getmsgend(&s);
return true;
}
msg->timelineStartLsn = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->generation = pq_getmsgint32(&s);
msg->term = pq_getmsgint64(&s);
msg->voteGiven = pq_getmsgbyte(&s);
msg->flushLsn = pq_getmsgint64(&s);
msg->truncateLsn = pq_getmsgint64(&s);
msg->termHistory.n_entries = pq_getmsgint32(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (uint32 i = 0; i < msg->termHistory.n_entries; i++)
{
msg->termHistory.entries[i].term = pq_getmsgint64(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64(&s);
}
pq_getmsgend(&s);
return true;
}
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->commitLsn = pq_getmsgint64_le(&s);
msg->hs.ts = pq_getmsgint64_le(&s);
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
Assert(false);
return false;
}
msg->generation = pq_getmsgint32(&s);
msg->term = pq_getmsgint64(&s);
msg->flushLsn = pq_getmsgint64(&s);
msg->commitLsn = pq_getmsgint64(&s);
msg->hs.ts = pq_getmsgint64(&s);
msg->hs.xmin.value = pq_getmsgint64(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
return false;
}
}
}
else if (wp->config->proto_version == 2)
{
tag = pq_getmsgint64_le(&s);
if (tag != anymsg->tag)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
sk->port, FormatSafekeeperState(sk));
ResetConnection(sk);
return false;
}
switch (tag)
{
case 'g':
{
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->nodeId = pq_getmsgint64_le(&s);
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->voteGiven = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->truncateLsn = pq_getmsgint64_le(&s);
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (int i = 0; i < msg->termHistory.n_entries; i++)
{
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
}
pq_getmsgint64_le(&s); /* timelineStartLsn */
pq_getmsgend(&s);
return true;
}
case 'a':
{
AppendResponse *msg = (AppendResponse *) anymsg;
msg->term = pq_getmsgint64_le(&s);
msg->flushLsn = pq_getmsgint64_le(&s);
msg->commitLsn = pq_getmsgint64_le(&s);
msg->hs.ts = pq_getmsgint64_le(&s);
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
default:
{
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
return false;
}
}
}
wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version);
return false; /* keep the compiler quiet */
}
/*
@@ -2246,3 +2533,45 @@ FormatEvents(WalProposer *wp, uint32 events)
return (char *) &return_str;
}
/* Dump mconf as toml for observability / debugging. Result is palloc'ed. */
static char *
MembershipConfigurationToString(MembershipConfiguration *mconf)
{
StringInfoData s;
uint32 i;
initStringInfo(&s);
appendStringInfo(&s, "{gen = %u", mconf->generation);
appendStringInfoString(&s, ", members = [");
for (i = 0; i < mconf->members.len; i++)
{
if (i > 0)
appendStringInfoString(&s, ", ");
appendStringInfo(&s, "{node_id = %lu", mconf->members.m[i].node_id);
appendStringInfo(&s, ", host = %s", mconf->members.m[i].host);
appendStringInfo(&s, ", port = %u }", mconf->members.m[i].port);
}
appendStringInfo(&s, "], new_members = [");
for (i = 0; i < mconf->new_members.len; i++)
{
if (i > 0)
appendStringInfoString(&s, ", ");
appendStringInfo(&s, "{node_id = %lu", mconf->new_members.m[i].node_id);
appendStringInfo(&s, ", host = %s", mconf->new_members.m[i].host);
appendStringInfo(&s, ", port = %u }", mconf->new_members.m[i].port);
}
appendStringInfoString(&s, "]}");
return s.data;
}
static void
MembershipConfigurationFree(MembershipConfiguration *mconf)
{
if (mconf->members.m)
pfree(mconf->members.m);
mconf->members.m = NULL;
if (mconf->new_members.m)
pfree(mconf->new_members.m);
mconf->new_members.m = NULL;
}

View File

@@ -12,9 +12,6 @@
#include "neon_walreader.h"
#include "pagestore_client.h"
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
* message */
@@ -143,12 +140,71 @@ typedef uint64 term_t;
/* neon storage node id */
typedef uint64 NNodeId;
/*
* Number uniquely identifying safekeeper membership configuration.
* This and following structs pair ones in membership.rs.
*/
typedef uint32 Generation;
typedef struct SafekeeperId
{
NNodeId node_id;
char host[MAXCONNINFO];
uint16 port;
} SafekeeperId;
/* Set of safekeepers. */
typedef struct MemberSet
{
uint32 len; /* number of members */
SafekeeperId *m; /* ids themselves */
} MemberSet;
/* Timeline safekeeper membership configuration. */
typedef struct MembershipConfiguration
{
Generation generation;
MemberSet members;
/* Has 0 n_members in non joint conf. */
MemberSet new_members;
} MembershipConfiguration;
/*
* Proposer <-> Acceptor messaging.
*/
typedef struct ProposerAcceptorMessage
{
uint8 tag;
} ProposerAcceptorMessage;
/* Initial Proposer -> Acceptor message */
typedef struct ProposerGreeting
{
ProposerAcceptorMessage pam; /* message tag */
/*
* tenant/timeline ids as C strings with standard hex notation for ease of
* printing. In principle they are not strictly needed as ttid is also
* passed as libpq options.
*/
char *tenant_id;
char *timeline_id;
/* Full conf is carried to allow safekeeper switch */
MembershipConfiguration mconf;
/*
* pg_version and wal_seg_size are used for timeline creation until we
* fully migrate to doing externally. systemId is only used as a sanity
* cross check.
*/
uint32 pg_version; /* in PG_VERSION_NUM format */
uint64 system_id; /* Postgres system identifier. */
uint32 wal_seg_size;
} ProposerGreeting;
/* protocol v2 variant, kept while wp supports it */
typedef struct ProposerGreetingV2
{
uint64 tag; /* message tag */
uint32 protocolVersion; /* proposer-safekeeper protocol version */
@@ -159,32 +215,42 @@ typedef struct ProposerGreeting
uint8 tenant_id[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
} ProposerGreetingV2;
typedef struct AcceptorProposerMessage
{
uint64 tag;
uint8 tag;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
* Acceptor -> Proposer initial response: the highest term acceptor voted for,
* its node id and configuration.
*/
typedef struct AcceptorGreeting
{
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
MembershipConfiguration mconf;
term_t term;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
*/
typedef struct VoteRequest
{
ProposerAcceptorMessage pam; /* message tag */
Generation generation; /* membership conf generation */
term_t term;
} VoteRequest;
/* protocol v2 variant, kept while wp supports it */
typedef struct VoteRequestV2
{
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
} VoteRequestV2;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
@@ -203,8 +269,15 @@ typedef struct TermHistory
typedef struct VoteResponse
{
AcceptorProposerMessage apm;
/*
* Membership conf generation. It's not strictly required because on
* mismatch safekeeper is expected to ERROR the connection, but let's
* sanity check it.
*/
Generation generation;
term_t term;
uint64 voteGiven;
uint8 voteGiven;
/*
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
@@ -214,7 +287,6 @@ typedef struct VoteResponse
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for*
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
/*
@@ -223,20 +295,37 @@ typedef struct VoteResponse
*/
typedef struct ProposerElected
{
uint64 tag;
AcceptorProposerMessage apm;
Generation generation; /* membership conf generation */
term_t term;
/* proposer will send since this point */
XLogRecPtr startStreamingAt;
/* history of term switches up to this proposer */
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
*/
typedef struct AppendRequestHeader
{
AcceptorProposerMessage apm;
Generation generation; /* membership conf generation */
term_t term; /* term of the proposer */
XLogRecPtr beginLsn; /* start position of message in WAL */
XLogRecPtr endLsn; /* end position of message in WAL */
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
/*
* minimal LSN which may be needed for recovery of some safekeeper (end
* lsn + 1 of last chunk streamed to everyone)
*/
XLogRecPtr truncateLsn;
/* in the AppendRequest message, WAL data follows */
} AppendRequestHeader;
/* protocol v2 variant, kept while wp supports it */
typedef struct AppendRequestHeaderV2
{
uint64 tag;
term_t term; /* term of the proposer */
@@ -256,7 +345,8 @@ typedef struct AppendRequestHeader
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
/* in the AppendRequest message, WAL data follows */
} AppendRequestHeaderV2;
/*
* Hot standby feedback received from replica
@@ -309,6 +399,13 @@ typedef struct AppendResponse
{
AcceptorProposerMessage apm;
/*
* Membership conf generation. It's not strictly required because on
* mismatch safekeeper is expected to ERROR the connection, but let's
* sanity check it.
*/
Generation generation;
/*
* Current term of the safekeeper; if it is higher than proposer's, the
* compute is out of date.
@@ -644,6 +741,8 @@ typedef struct WalProposerConfig
/* Will be passed to safekeepers in greet request. */
TimeLineID pgTimeline;
int proto_version;
#ifdef WALPROPOSER_LIB
void *callback_data;
#endif
@@ -656,11 +755,14 @@ typedef struct WalProposerConfig
typedef struct WalProposer
{
WalProposerConfig *config;
int n_safekeepers;
/* Current walproposer membership configuration */
MembershipConfiguration mconf;
/* (n_safekeepers / 2) + 1 */
int quorum;
/* Number of occupied slots in safekeepers[] */
int n_safekeepers;
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
@@ -670,6 +772,7 @@ typedef struct WalProposer
XLogRecPtr commitLsn;
ProposerGreeting greetRequest;
ProposerGreetingV2 greetRequestV2;
/* Vote request for safekeeper */
VoteRequest voteRequest;

View File

@@ -117,14 +117,13 @@ pq_getmsgbytes(StringInfo msg, int datalen)
}
/* --------------------------------
* pq_getmsgstring - get a null-terminated text string (with conversion)
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
*
* May return a pointer directly into the message buffer, or a pointer
* to a palloc'd conversion result.
* Returns a pointer directly into the message buffer.
* --------------------------------
*/
const char *
pq_getmsgstring(StringInfo msg)
pq_getmsgrawstring(StringInfo msg)
{
char *str;
int slen;
@@ -155,6 +154,45 @@ pq_getmsgend(StringInfo msg)
ExceptionalCondition("invalid msg format", __FILE__, __LINE__);
}
/* --------------------------------
* pq_sendbytes - append raw data to a StringInfo buffer
* --------------------------------
*/
void
pq_sendbytes(StringInfo buf, const void *data, int datalen)
{
/* use variant that maintains a trailing null-byte, out of caution */
appendBinaryStringInfo(buf, data, datalen);
}
/* --------------------------------
* pq_send_ascii_string - append a null-terminated text string (without conversion)
*
* This function intentionally bypasses encoding conversion, instead just
* silently replacing any non-7-bit-ASCII characters with question marks.
* It is used only when we are having trouble sending an error message to
* the client with normal localization and encoding conversion. The caller
* should already have taken measures to ensure the string is just ASCII;
* the extra work here is just to make certain we don't send a badly encoded
* string to the client (which might or might not be robust about that).
*
* NB: passed text string must be null-terminated, and so is the data
* sent to the frontend.
* --------------------------------
*/
void
pq_send_ascii_string(StringInfo buf, const char *str)
{
while (*str)
{
char ch = *str++;
if (IS_HIGHBIT_SET(ch))
ch = '?';
appendStringInfoCharMacro(buf, ch);
}
appendStringInfoChar(buf, '\0');
}
/*
* Produce a C-string representation of a TimestampTz.

View File

@@ -59,9 +59,11 @@
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
/* GUCs */
char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
int safekeeper_proto_version = 2;
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
@@ -126,6 +128,7 @@ init_walprop_config(bool syncSafekeepers)
else
walprop_config.systemId = 0;
walprop_config.pgTimeline = walprop_pg_get_timeline_id();
walprop_config.proto_version = safekeeper_proto_version;
}
/*
@@ -219,25 +222,37 @@ nwp_register_gucs(void)
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomIntVariable(
"neon.safekeeper_proto_version",
"Version of compute <-> safekeeper protocol.",
"Used while migrating from 2 to 3.",
&safekeeper_proto_version,
2, 0, INT_MAX,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
}
static int
split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
{
int n_safekeepers = 0;
char *curr_sk = safekeepers_list;
int n_safekeepers = 0;
char *curr_sk = safekeepers_list;
for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma)
{
if (++n_safekeepers >= MAX_SAFEKEEPERS) {
if (++n_safekeepers >= MAX_SAFEKEEPERS)
{
wpg_log(FATAL, "too many safekeepers");
}
coma = strchr(coma, ',');
safekeepers[n_safekeepers-1] = curr_sk;
safekeepers[n_safekeepers - 1] = curr_sk;
if (coma != NULL) {
if (coma != NULL)
{
*coma++ = '\0';
}
}
@@ -252,10 +267,10 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
static bool
safekeepers_cmp(char *old, char *new)
{
char *safekeepers_old[MAX_SAFEKEEPERS];
char *safekeepers_new[MAX_SAFEKEEPERS];
int len_old = 0;
int len_new = 0;
char *safekeepers_old[MAX_SAFEKEEPERS];
char *safekeepers_new[MAX_SAFEKEEPERS];
int len_old = 0;
int len_new = 0;
len_old = split_safekeepers_list(old, safekeepers_old);
len_new = split_safekeepers_list(new, safekeepers_new);
@@ -292,7 +307,8 @@ assign_neon_safekeepers(const char *newval, void *extra)
if (!am_walproposer)
return;
if (!newval) {
if (!newval)
{
/* should never happen */
wpg_log(FATAL, "neon.safekeepers is empty");
}
@@ -301,11 +317,11 @@ assign_neon_safekeepers(const char *newval, void *extra)
newval_copy = pstrdup(newval);
oldval = pstrdup(wal_acceptors_list);
/*
/*
* TODO: restarting through FATAL is stupid and introduces 1s delay before
* next bgw start. We should refactor walproposer to allow graceful exit and
* thus remove this delay.
* XXX: If you change anything here, sync with test_safekeepers_reconfigure_reorder.
* next bgw start. We should refactor walproposer to allow graceful exit
* and thus remove this delay. XXX: If you change anything here, sync with
* test_safekeepers_reconfigure_reorder.
*/
if (!safekeepers_cmp(oldval, newval_copy))
{
@@ -454,7 +470,8 @@ backpressure_throttling_impl(void)
memcpy(new_status, old_status, len);
snprintf(new_status + len, 64, "backpressure throttling: lag %lu", lag);
set_ps_display(new_status);
new_status[len] = '\0'; /* truncate off " backpressure ..." to later reset the ps */
new_status[len] = '\0'; /* truncate off " backpressure ..." to later
* reset the ps */
elog(DEBUG2, "backpressure throttling: lag %lu", lag);
start = GetCurrentTimestamp();
@@ -621,7 +638,7 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos)
wpg_log(LOG, "WAL proposer starts streaming at %X/%X",
LSN_FORMAT_ARGS(startpos));
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
cmd.timeline = wp->greetRequest.timeline;
cmd.timeline = wp->config->pgTimeline;
cmd.startpoint = startpos;
StartProposerReplication(wp, &cmd);
}
@@ -1963,10 +1980,11 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
FullTransactionId xmin = hsFeedback.xmin;
FullTransactionId catalog_xmin = hsFeedback.catalog_xmin;
FullTransactionId next_xid = ReadNextFullTransactionId();
/*
* Page server is updating nextXid in checkpoint each 1024 transactions,
* so feedback xmin can be actually larger then nextXid and
* function TransactionIdInRecentPast return false in this case,
* Page server is updating nextXid in checkpoint each 1024
* transactions, so feedback xmin can be actually larger then nextXid
* and function TransactionIdInRecentPast return false in this case,
* preventing update of slot's xmin.
*/
if (FullTransactionIdPrecedes(next_xid, xmin))

View File

@@ -88,13 +88,12 @@ fn bench_process_msg(c: &mut Criterion) {
let (lsn, record) = walgen.next().expect("endless WAL");
ProposerAcceptorMessage::AppendRequest(AppendRequest {
h: AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
})
@@ -160,13 +159,12 @@ fn bench_wal_acceptor(c: &mut Criterion) {
.take(n)
.map(|(lsn, record)| AppendRequest {
h: AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
})
@@ -262,13 +260,12 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
runtime.block_on(async {
let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest {
h: AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
});

View File

@@ -8,7 +8,7 @@
use anyhow::Context;
use postgres_backend::QueryError;
use safekeeper_api::membership::Configuration;
use safekeeper_api::membership::{Configuration, INVALID_GENERATION};
use safekeeper_api::{ServerInfo, Term};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -133,10 +133,10 @@ async fn send_proposer_elected(
let history = TermHistory(history_entries);
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
generation: INVALID_GENERATION,
term,
start_streaming_at: lsn,
term_history: history,
timeline_start_lsn: lsn,
});
tli.process_msg(&proposer_elected_request).await?;
@@ -170,13 +170,12 @@ pub async fn append_logical_message(
let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
h: AppendRequestHeader {
generation: INVALID_GENERATION,
term: msg.term,
term_start_lsn: begin_lsn,
begin_lsn,
end_lsn,
commit_lsn,
truncate_lsn: msg.truncate_lsn,
proposer_uuid: [0u8; 16],
},
wal_data,
});

View File

@@ -281,7 +281,7 @@ impl SafekeeperPostgresHandler {
tokio::select! {
// todo: add read|write .context to these errors
r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
r = network_write(pgb, reply_rx, pageserver_feedback_rx, proto_version) => r,
_ = timeline_cancel.cancelled() => {
return Err(CopyStreamHandlerEnd::Cancelled);
}
@@ -342,8 +342,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
let tli = match next_msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with walproposer {} sysid {} timeline {}",
self.peer_addr, greeting.system_id, greeting.tli,
"start handshake with walproposer {} sysid {}",
self.peer_addr, greeting.system_id,
);
let server_info = ServerInfo {
pg_version: greeting.pg_version,
@@ -459,6 +459,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
pgb_writer: &mut PostgresBackend<IO>,
mut reply_rx: Receiver<AcceptorProposerMessage>,
mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
proto_version: u32,
) -> Result<(), CopyStreamHandlerEnd> {
let mut buf = BytesMut::with_capacity(128);
@@ -496,7 +497,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
};
buf.clear();
msg.serialize(&mut buf)?;
msg.serialize(&mut buf, proto_version)?;
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
}
}

View File

@@ -7,6 +7,7 @@ use std::{fmt, pin::pin};
use anyhow::{bail, Context};
use futures::StreamExt;
use postgres_protocol::message::backend::ReplicationMessage;
use safekeeper_api::membership::INVALID_GENERATION;
use safekeeper_api::models::{PeerInfo, TimelineStatus};
use safekeeper_api::Term;
use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -267,7 +268,10 @@ async fn recover(
);
// Now understand our term history.
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
generation: INVALID_GENERATION,
term: donor.term,
});
let vote_response = match tli
.process_msg(&vote_request)
.await
@@ -302,10 +306,10 @@ async fn recover(
// truncate WAL locally
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
generation: INVALID_GENERATION,
term: donor.term,
start_streaming_at: last_common_point.lsn,
term_history: donor_th,
timeline_start_lsn: Lsn::INVALID,
});
// Successful ProposerElected handling always returns None. If term changed,
// we'll find out that during the streaming. Note: it is expected to get
@@ -434,13 +438,12 @@ async fn network_io(
match msg {
ReplicationMessage::XLogData(xlog_data) => {
let ar_hdr = AppendRequestHeader {
generation: INVALID_GENERATION,
term: donor.term,
term_start_lsn: Lsn::INVALID, // unused
begin_lsn: Lsn(xlog_data.wal_start()),
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
truncate_lsn: Lsn::INVALID, // do not attempt to advance
proposer_uuid: [0; 16],
};
let ar = AppendRequest {
h: ar_hdr,

View File

@@ -5,6 +5,11 @@ use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
use safekeeper_api::membership;
use safekeeper_api::membership::Generation;
use safekeeper_api::membership::MemberSet;
use safekeeper_api::membership::SafekeeperId;
use safekeeper_api::membership::INVALID_GENERATION;
use safekeeper_api::models::HotStandbyFeedback;
use safekeeper_api::Term;
use serde::{Deserialize, Serialize};
@@ -12,6 +17,7 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use std::str::FromStr;
use storage_broker::proto::SafekeeperTimelineInfo;
use tracing::*;
@@ -29,7 +35,8 @@ use utils::{
lsn::Lsn,
};
pub const SK_PROTOCOL_VERSION: u32 = 2;
pub const SK_PROTO_VERSION_2: u32 = 2;
pub const SK_PROTO_VERSION_3: u32 = 3;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
@@ -56,8 +63,28 @@ impl TermHistory {
TermHistory(Vec::new())
}
// Parse TermHistory as n_entries followed by TermLsn pairs
// Parse TermHistory as n_entries followed by TermLsn pairs in network order.
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
let n_entries = bytes
.get_u32_f()
.with_context(|| "TermHistory misses len")?;
let mut res = Vec::with_capacity(n_entries as usize);
for i in 0..n_entries {
let term = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses term", i))?;
let lsn = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses lsn", i))?
.into();
res.push(TermLsn { term, lsn })
}
Ok(TermHistory(res))
}
// Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
// TODO remove once v2 protocol is fully dropped.
pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
}
@@ -197,6 +224,18 @@ impl AcceptorState {
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
pub struct ProposerGreeting {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mconf: membership::Configuration,
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub wal_seg_size: u32,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Deserialize)]
pub struct ProposerGreetingV2 {
/// proposer-acceptor protocol version
pub protocol_version: u32,
/// Postgres server version
@@ -213,27 +252,35 @@ pub struct ProposerGreeting {
/// (acceptor voted for).
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
term: u64,
node_id: NodeId,
mconf: membership::Configuration,
term: u64,
}
/// Vote request sent from proposer to safekeepers
#[derive(Debug, Deserialize)]
#[derive(Debug)]
pub struct VoteRequest {
pub generation: Generation,
pub term: Term,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Deserialize)]
pub struct VoteRequestV2 {
pub term: Term,
}
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
generation: Generation, // membership conf generation
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
vote_given: bool,
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
// proposer to choose the most advanced one.
pub flush_lsn: Lsn,
truncate_lsn: Lsn,
pub term_history: TermHistory,
timeline_start_lsn: Lsn,
}
/*
@@ -242,10 +289,10 @@ pub struct VoteResponse {
*/
#[derive(Debug)]
pub struct ProposerElected {
pub generation: Generation, // membership conf generation
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
pub timeline_start_lsn: Lsn,
}
/// Request with WAL message sent from proposer to safekeeper. Along the way it
@@ -257,6 +304,22 @@ pub struct AppendRequest {
}
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeader {
pub generation: Generation, // membership conf generation
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
/// start position of message in WAL
pub begin_lsn: Lsn,
/// end position of message in WAL
pub end_lsn: Lsn,
/// LSN committed by quorum of safekeepers
pub commit_lsn: Lsn,
/// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
pub truncate_lsn: Lsn,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Clone, Deserialize)]
pub struct AppendRequestHeaderV2 {
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
pub term: Term,
// TODO: remove this field from the protocol, it in unused -- LSN of term
@@ -277,6 +340,9 @@ pub struct AppendRequestHeader {
/// Report safekeeper state to proposer
#[derive(Debug, Serialize, Clone)]
pub struct AppendResponse {
// Membership conf generation. Not strictly required because on mismatch
// connection is reset, but let's sanity check it.
generation: Generation,
// Current term of the safekeeper; if it is higher than proposer's, the
// compute is out of date.
pub term: Term,
@@ -293,8 +359,9 @@ pub struct AppendResponse {
}
impl AppendResponse {
fn term_only(term: Term) -> AppendResponse {
fn term_only(generation: Generation, term: Term) -> AppendResponse {
AppendResponse {
generation,
term,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
@@ -315,72 +382,316 @@ pub enum ProposerAcceptorMessage {
FlushWAL,
}
impl ProposerAcceptorMessage {
/// Parse proposer message.
pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
if proto_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
proto_version,
SK_PROTOCOL_VERSION
);
/// Augment Bytes with fallible get_uN where N is number of bytes methods.
/// All reads are in network (big endian) order.
trait BytesF {
fn get_u8_f(&mut self) -> Result<u8>;
fn get_u16_f(&mut self) -> Result<u16>;
fn get_u32_f(&mut self) -> Result<u32>;
fn get_u64_f(&mut self) -> Result<u64>;
}
impl BytesF for Bytes {
fn get_u8_f(&mut self) -> Result<u8> {
if self.is_empty() {
bail!("no bytes left, expected 1");
}
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
let msg = ProposerGreeting::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::Greeting(msg))
}
'v' => {
let msg = VoteRequest::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::VoteRequest(msg))
}
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
if msg_bytes.remaining() < 8 {
bail!("ProposerElected message is not complete");
}
let timeline_start_lsn = msg_bytes.get_u64_le().into();
let msg = ProposerElected {
term,
start_streaming_at,
timeline_start_lsn,
term_history,
Ok(self.get_u8())
}
fn get_u16_f(&mut self) -> Result<u16> {
if self.remaining() < 2 {
bail!("no bytes left, expected 2");
}
Ok(self.get_u16())
}
fn get_u32_f(&mut self) -> Result<u32> {
if self.remaining() < 4 {
bail!("only {} bytes left, expected 4", self.remaining());
}
Ok(self.get_u32())
}
fn get_u64_f(&mut self) -> Result<u64> {
if self.remaining() < 8 {
bail!("only {} bytes left, expected 8", self.remaining());
}
Ok(self.get_u64())
}
}
impl ProposerAcceptorMessage {
/// Read cstring from Bytes.
fn get_cstr(buf: &mut Bytes) -> Result<String> {
let pos = buf
.iter()
.position(|x| *x == 0)
.ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
let result = buf.split_to(pos);
buf.advance(1); // drop the null terminator
match std::str::from_utf8(&result) {
Ok(s) => Ok(s.to_string()),
Err(e) => bail!("invalid utf8 in cstring: {}", e),
}
}
/// Read membership::Configuration from Bytes.
fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
let generation = buf.get_u32_f().with_context(|| "reading generation")?;
let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
// Main member set must have at least someone in valid configuration.
// Empty conf is allowed until we fully migrate.
if generation != INVALID_GENERATION && members_len == 0 {
bail!("empty members_len");
}
let mut members = MemberSet::empty();
for i in 0..members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading member {} node_id", i))?;
let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading member {} port", i))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
pg_port,
};
members.add(sk)?;
}
let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
// Non joint conf.
if new_members_len == 0 {
Ok(membership::Configuration {
generation,
members,
new_members: None,
})
} else {
let mut new_members = MemberSet::empty();
for i in 0..new_members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading new member {} node_id", i))?;
let host = Self::get_cstr(buf)
.with_context(|| format!("reading new member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading new member {} port", i))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
pg_port,
};
Ok(ProposerAcceptorMessage::Elected(msg))
new_members.add(sk)?;
}
'a' => {
// read header followed by wal data
let hdr = AppendRequestHeader::des_from(&mut stream)?;
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
Ok(membership::Configuration {
generation,
members,
new_members: Some(new_members),
})
}
}
/// Parse proposer message.
pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
if proto_version == SK_PROTO_VERSION_3 {
if msg_bytes.is_empty() {
bail!("ProposerAcceptorMessage is not complete: missing tag");
}
let tag = msg_bytes.get_u8_f().with_context(|| {
"ProposerAcceptorMessage is not complete: missing tag".to_string()
})? as char;
match tag {
'g' => {
let tenant_id_str =
Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
let tenant_id = TenantId::from_str(&tenant_id_str)?;
let timeline_id_str =
Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
let timeline_id = TimelineId::from_str(&timeline_id_str)?;
let mconf = Self::get_mconf(&mut msg_bytes)?;
let pg_version = msg_bytes
.get_u32_f()
.with_context(|| "reading pg_version")?;
let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
let wal_seg_size = msg_bytes
.get_u32_f()
.with_context(|| "reading wal_seg_size")?;
let g = ProposerGreeting {
tenant_id,
timeline_id,
mconf,
pg_version,
system_id,
wal_seg_size,
};
Ok(ProposerAcceptorMessage::Greeting(g))
}
'v' => {
let generation = msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?;
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let v = VoteRequest { generation, term };
Ok(ProposerAcceptorMessage::VoteRequest(v))
}
'e' => {
let generation = msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?;
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let start_streaming_at: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading start_streaming_at")?
.into();
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
let msg = ProposerElected {
generation,
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => {
let generation = msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?;
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let begin_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading begin_lsn")?
.into();
let end_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading end_lsn")?
.into();
let commit_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading commit_lsn")?
.into();
let truncate_lsn: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading truncate_lsn")?
.into();
let hdr = AppendRequestHeader {
generation,
term,
begin_lsn,
end_lsn,
commit_lsn,
truncate_lsn,
};
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
}
if msg_bytes.remaining() < rec_size {
bail!(
"reading WAL: only {} bytes left, wanted {}",
msg_bytes.remaining(),
rec_size
);
}
let wal_data = msg_bytes.copy_to_bytes(rec_size);
let msg = AppendRequest { h: hdr, wal_data };
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest { h: hdr, wal_data };
Ok(ProposerAcceptorMessage::AppendRequest(msg))
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
} else if proto_version == SK_PROTO_VERSION_2 {
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
let g = ProposerGreeting {
tenant_id: msgv2.tenant_id,
timeline_id: msgv2.timeline_id,
mconf: membership::Configuration {
generation: INVALID_GENERATION,
members: MemberSet::empty(),
new_members: None,
},
pg_version: msgv2.pg_version,
system_id: msgv2.system_id,
wal_seg_size: msgv2.wal_seg_size,
};
Ok(ProposerAcceptorMessage::Greeting(g))
}
'v' => {
let msg = VoteRequestV2::des_from(&mut stream)?;
let v = VoteRequest {
generation: INVALID_GENERATION,
term: msg.term,
};
Ok(ProposerAcceptorMessage::VoteRequest(v))
}
'e' => {
let mut msg_bytes = stream.into_inner();
if msg_bytes.remaining() < 16 {
bail!("ProposerElected message is not complete");
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
if msg_bytes.remaining() < 8 {
bail!("ProposerElected message is not complete");
}
let _timeline_start_lsn = msg_bytes.get_u64_le();
let msg = ProposerElected {
generation: INVALID_GENERATION,
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
'a' => {
// read header followed by wal data
let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
let hdr = AppendRequestHeader {
generation: INVALID_GENERATION,
term: hdrv2.term,
begin_lsn: hdrv2.begin_lsn,
end_lsn: hdrv2.end_lsn,
commit_lsn: hdrv2.commit_lsn,
truncate_lsn: hdrv2.truncate_lsn,
};
let rec_size = hdr
.end_lsn
.checked_sub(hdr.begin_lsn)
.context("begin_lsn > end_lsn in AppendRequest")?
.0 as usize;
if rec_size > MAX_SEND_SIZE {
bail!(
"AppendRequest is longer than MAX_SEND_SIZE ({})",
MAX_SEND_SIZE
);
}
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
stream.read_exact(&mut wal_data_vec)?;
let wal_data = Bytes::from(wal_data_vec);
let msg = AppendRequest { h: hdr, wal_data };
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
} else {
bail!("unsupported protocol version {}", proto_version);
}
}
@@ -394,36 +705,21 @@ impl ProposerAcceptorMessage {
// We explicitly list all fields, to draw attention here when new fields are added.
let mut size = BASE_SIZE;
size += match self {
Self::Greeting(ProposerGreeting {
protocol_version: _,
pg_version: _,
proposer_id: _,
system_id: _,
timeline_id: _,
tenant_id: _,
tli: _,
wal_seg_size: _,
}) => 0,
Self::Greeting(_) => 0,
Self::VoteRequest(VoteRequest { term: _ }) => 0,
Self::VoteRequest(_) => 0,
Self::Elected(ProposerElected {
term: _,
start_streaming_at: _,
term_history: _,
timeline_start_lsn: _,
}) => 0,
Self::Elected(_) => 0,
Self::AppendRequest(AppendRequest {
h:
AppendRequestHeader {
generation: _,
term: _,
term_start_lsn: _,
begin_lsn: _,
end_lsn: _,
commit_lsn: _,
truncate_lsn: _,
proposer_uuid: _,
},
wal_data,
}) => wal_data.len(),
@@ -431,13 +727,12 @@ impl ProposerAcceptorMessage {
Self::NoFlushAppendRequest(AppendRequest {
h:
AppendRequestHeader {
generation: _,
term: _,
term_start_lsn: _,
begin_lsn: _,
end_lsn: _,
commit_lsn: _,
truncate_lsn: _,
proposer_uuid: _,
},
wal_data,
}) => wal_data.len(),
@@ -458,45 +753,118 @@ pub enum AcceptorProposerMessage {
}
impl AcceptorProposerMessage {
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u64_le('g' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.node_id.0);
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
buf.put_u64_le(msg.timeline_start_lsn.into());
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u64_le('a' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
fn put_cstr(buf: &mut BytesMut, s: &str) {
buf.put_slice(s.as_bytes());
buf.put_u8(0); // null terminator
}
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
/// Serialize membership::Configuration into buf.
fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
buf.put_u32(mconf.generation);
buf.put_u32(mconf.members.m.len() as u32);
for sk in &mconf.members.m {
buf.put_u64(sk.id.0);
Self::put_cstr(buf, &sk.host);
buf.put_u16(sk.pg_port);
}
if let Some(ref new_members) = mconf.new_members {
buf.put_u32(new_members.m.len() as u32);
for sk in &new_members.m {
buf.put_u64(sk.id.0);
Self::put_cstr(buf, &sk.host);
buf.put_u16(sk.pg_port);
}
} else {
buf.put_u32(0);
}
}
Ok(())
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
if proto_version == SK_PROTO_VERSION_3 {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u8(b'g');
buf.put_u64(msg.node_id.0);
Self::serialize_mconf(buf, &msg.mconf);
buf.put_u64(msg.term)
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u8(b'v');
buf.put_u32(msg.generation);
buf.put_u64(msg.term);
buf.put_u8(msg.vote_given as u8);
buf.put_u64(msg.flush_lsn.into());
buf.put_u64(msg.truncate_lsn.into());
buf.put_u32(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64(e.term);
buf.put_u64(e.lsn.into());
}
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u8(b'a');
buf.put_u32(msg.generation);
buf.put_u64(msg.term);
buf.put_u64(msg.flush_lsn.into());
buf.put_u64(msg.commit_lsn.into());
buf.put_i64(msg.hs_feedback.ts);
buf.put_u64(msg.hs_feedback.xmin);
buf.put_u64(msg.hs_feedback.catalog_xmin);
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
}
Ok(())
// TODO remove 3 after converting all msgs
} else if proto_version == SK_PROTO_VERSION_2 {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u64_le('g' as u64);
// v2 didn't have mconf and fields were reordered
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.node_id.0);
}
AcceptorProposerMessage::VoteResponse(msg) => {
// v2 didn't have generation, had u64 vote_given and timeline_start_lsn
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given as u64);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
// removed timeline_start_lsn
buf.put_u64_le(0);
}
AcceptorProposerMessage::AppendResponse(msg) => {
// v2 didn't have generation
buf.put_u64_le('a' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.commit_lsn.into());
buf.put_i64_le(msg.hs_feedback.ts);
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
// if it is not present.
if let Some(ref msg) = msg.pageserver_feedback {
msg.serialize(buf);
}
}
}
Ok(())
} else {
bail!("unsupported protocol version {}", proto_version);
}
}
}
@@ -593,14 +961,6 @@ where
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
// Check protocol compatibility
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
msg.protocol_version,
SK_PROTOCOL_VERSION
);
}
/* Postgres major version mismatch is treated as fatal error
* because safekeepers parse WAL headers and the format
* may change between versions.
@@ -655,15 +1015,16 @@ where
self.state.finish_change(&state).await?;
}
info!(
"processed greeting from walproposer {}, sending term {:?}",
msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
self.state.acceptor_state.term
);
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.state.acceptor_state.term,
let apg = AcceptorGreeting {
node_id: self.node_id,
})))
mconf: self.state.mconf.clone(),
term: self.state.acceptor_state.term,
};
info!(
"processed greeting {:?} from walproposer, sending {:?}",
msg, apg
);
Ok(Some(AcceptorProposerMessage::Greeting(apg)))
}
/// Give vote for the given term, if we haven't done that previously.
@@ -684,12 +1045,12 @@ where
self.wal_store.flush_wal().await?;
// initialize with refusal
let mut resp = VoteResponse {
generation: self.state.mconf.generation,
term: self.state.acceptor_state.term,
vote_given: false as u64,
vote_given: false,
flush_lsn: self.flush_lsn(),
truncate_lsn: self.state.inmem.peer_horizon_lsn,
term_history: self.get_term_history(),
timeline_start_lsn: self.state.timeline_start_lsn,
};
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.start_change();
@@ -698,15 +1059,16 @@ where
self.state.finish_change(&state).await?;
resp.term = self.state.acceptor_state.term;
resp.vote_given = true as u64;
resp.vote_given = true;
}
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
info!("processed {:?}: sending {:?}", msg, &resp);
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
let ar = AppendResponse {
generation: self.state.mconf.generation,
term: self.state.acceptor_state.term,
flush_lsn: self.flush_lsn(),
commit_lsn: self.state.commit_lsn,
@@ -805,18 +1167,22 @@ where
// Here we learn initial LSN for the first time, set fields
// interested in that.
if state.timeline_start_lsn == Lsn(0) {
// Remember point where WAL begins globally.
state.timeline_start_lsn = msg.timeline_start_lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
if let Some(start_lsn) = msg.term_history.0.first() {
if state.timeline_start_lsn == Lsn(0) {
// Remember point where WAL begins globally. In the future it
// will be intialized immediately on timeline creation.
state.timeline_start_lsn = start_lsn.lsn;
info!(
"setting timeline_start_lsn to {:?}",
state.timeline_start_lsn
);
}
}
if state.peer_horizon_lsn == Lsn(0) {
// Update peer_horizon_lsn as soon as we know where timeline starts.
// It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
state.peer_horizon_lsn = msg.timeline_start_lsn;
state.peer_horizon_lsn = state.timeline_start_lsn;
}
if state.local_start_lsn == Lsn(0) {
state.local_start_lsn = msg.start_streaming_at;
@@ -896,7 +1262,10 @@ where
// If our term is higher, immediately refuse the message.
if self.state.acceptor_state.term > msg.h.term {
let resp = AppendResponse::term_only(self.state.acceptor_state.term);
let resp = AppendResponse::term_only(
self.state.mconf.generation,
self.state.acceptor_state.term,
);
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
}
@@ -924,10 +1293,8 @@ where
);
}
// Now we know that we are in the same term as the proposer,
// processing the message.
self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
// Now we know that we are in the same term as the proposer, process the
// message.
// do the job
if !msg.wal_data.is_empty() {
@@ -1097,10 +1464,13 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
generation: 0,
term: 1,
});
let mut vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
r => panic!("unexpected response: {:?}", r),
}
@@ -1115,7 +1485,7 @@ mod tests {
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
r => panic!("unexpected response: {:?}", r),
}
}
@@ -1130,13 +1500,12 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
let mut ar_hdr = AppendRequestHeader {
generation: 0,
term: 2,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
};
let mut append_request = AppendRequest {
h: ar_hdr.clone(),
@@ -1144,6 +1513,7 @@ mod tests {
};
let pem = ProposerElected {
generation: 0,
term: 2,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![
@@ -1156,7 +1526,6 @@ mod tests {
lsn: Lsn(3),
},
]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
@@ -1191,26 +1560,25 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
let pem = ProposerElected {
generation: 0,
term: 1,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermLsn {
term: 1,
lsn: Lsn(1),
}]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap();
let ar_hdr = AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
};
let append_request = AppendRequest {
h: ar_hdr.clone(),

View File

@@ -73,10 +73,10 @@ impl Env {
// Emulate an initial election.
safekeeper
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
generation: 0,
term: 1,
start_streaming_at: start_lsn,
term_history: TermHistory(vec![(1, start_lsn).into()]),
timeline_start_lsn: start_lsn,
}))
.await?;
@@ -142,13 +142,12 @@ impl Env {
let req = AppendRequest {
h: AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: start_lsn,
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: lsn,
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
};

View File

@@ -15,9 +15,7 @@ use desim::{
};
use http::Uri;
use safekeeper::{
safekeeper::{
ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION,
},
safekeeper::{ProposerAcceptorMessage, SafeKeeper, SK_PROTO_VERSION_3, UNKNOWN_SERVER_VERSION},
state::{TimelinePersistentState, TimelineState},
timeline::TimelineError,
wal_storage::Storage,
@@ -287,7 +285,7 @@ impl ConnState {
bail!("finished processing START_REPLICATION")
}
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?;
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTO_VERSION_3)?;
debug!("got msg: {:?}", msg);
self.process(msg, global)
} else {
@@ -403,7 +401,7 @@ impl ConnState {
// TODO: if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
let mut buf = BytesMut::with_capacity(128);
reply.serialize(&mut buf)?;
reply.serialize(&mut buf, SK_PROTO_VERSION_3)?;
self.tcp.send(AnyMessage::Bytes(buf.into()));
}

View File

@@ -6,9 +6,14 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.pageserver.http import PageserverHttpClient
def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
def check_tenant(
env: NeonEnv, pageserver_http: PageserverHttpClient, safekeeper_proto_version: int
):
tenant_id, timeline_id = env.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
config_lines = [
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
]
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=config_lines)
# we rely upon autocommit after each statement
res_1 = endpoint.safe_psql_many(
queries=[
@@ -33,7 +38,14 @@ def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
@pytest.mark.parametrize("num_timelines,num_safekeepers", [(3, 1)])
def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int):
# Test both proto versions until we fully migrate.
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
def test_normal_work(
neon_env_builder: NeonEnvBuilder,
num_timelines: int,
num_safekeepers: int,
safekeeper_proto_version: int,
):
"""
Basic test:
* create new tenant with a timeline
@@ -52,4 +64,4 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s
pageserver_http = env.pageserver.http_client()
for _ in range(num_timelines):
check_tenant(env, pageserver_http)
check_tenant(env, pageserver_http, safekeeper_proto_version)

View File

@@ -538,13 +538,16 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_recovery_uncommitted(env))
async def run_wal_truncation(env: NeonEnv):
async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(sk1, sk2, sk3) = env.safekeepers
ep = env.endpoints.create_start("main")
config_lines = [
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
]
ep = env.endpoints.create_start("main", config_lines=config_lines)
ep.safe_psql("create table t (key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
@@ -571,6 +574,7 @@ async def run_wal_truncation(env: NeonEnv):
sk2.start()
ep = env.endpoints.create_start(
"main",
config_lines=config_lines,
)
ep.safe_psql("insert into t select generate_series(1, 200), 'payload'")
@@ -589,11 +593,13 @@ async def run_wal_truncation(env: NeonEnv):
# Simple deterministic test creating tail of WAL on safekeeper which is
# truncated when majority without this sk elects walproposer starting earlier.
def test_wal_truncation(neon_env_builder: NeonEnvBuilder):
# Test both proto versions until we fully migrate.
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
asyncio.run(run_wal_truncation(env))
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
async def run_segment_init_failure(env: NeonEnv):