mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
21 Commits
release-pr
...
sk-proto-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0005aada4 | ||
|
|
748bb5d855 | ||
|
|
60ed1277f0 | ||
|
|
bb93f390f3 | ||
|
|
db97d3f140 | ||
|
|
42059b027e | ||
|
|
9aad617d76 | ||
|
|
443228e6b7 | ||
|
|
50c93da736 | ||
|
|
8fb781f908 | ||
|
|
d5065339aa | ||
|
|
9ecc22e355 | ||
|
|
874f4ff0d9 | ||
|
|
ec745e4d08 | ||
|
|
8683462157 | ||
|
|
7961f969a3 | ||
|
|
98e98ac650 | ||
|
|
efdddc062b | ||
|
|
5e9c22f1b6 | ||
|
|
9630b5c965 | ||
|
|
0c386b272a |
@@ -38,14 +38,12 @@ impl Display for SafekeeperId {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(transparent)]
|
||||
pub struct MemberSet {
|
||||
pub members: Vec<SafekeeperId>,
|
||||
pub m: Vec<SafekeeperId>,
|
||||
}
|
||||
|
||||
impl MemberSet {
|
||||
pub fn empty() -> Self {
|
||||
MemberSet {
|
||||
members: Vec::new(),
|
||||
}
|
||||
MemberSet { m: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn new(members: Vec<SafekeeperId>) -> anyhow::Result<Self> {
|
||||
@@ -53,11 +51,11 @@ impl MemberSet {
|
||||
if hs.len() != members.len() {
|
||||
bail!("duplicate safekeeper id in the set {:?}", members);
|
||||
}
|
||||
Ok(MemberSet { members })
|
||||
Ok(MemberSet { m: members })
|
||||
}
|
||||
|
||||
pub fn contains(&self, sk: &SafekeeperId) -> bool {
|
||||
self.members.iter().any(|m| m.id == sk.id)
|
||||
self.m.iter().any(|m| m.id == sk.id)
|
||||
}
|
||||
|
||||
pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> {
|
||||
@@ -67,7 +65,7 @@ impl MemberSet {
|
||||
sk.id, self
|
||||
));
|
||||
}
|
||||
self.members.push(sk);
|
||||
self.m.push(sk);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -75,11 +73,7 @@ impl MemberSet {
|
||||
impl Display for MemberSet {
|
||||
/// Display as a comma separated list of members.
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let sks_str = self
|
||||
.members
|
||||
.iter()
|
||||
.map(|m| m.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
let sks_str = self.m.iter().map(|sk| sk.to_string()).collect::<Vec<_>>();
|
||||
write!(f, "({})", sks_str.join(", "))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,6 +215,7 @@ impl Wrapper {
|
||||
syncSafekeepers: config.sync_safekeepers,
|
||||
systemId: 0,
|
||||
pgTimeline: 1,
|
||||
proto_version: 3,
|
||||
callback_data,
|
||||
};
|
||||
let c_config = Box::into_raw(Box::new(c_config));
|
||||
@@ -276,6 +277,7 @@ mod tests {
|
||||
use core::panic;
|
||||
use std::{
|
||||
cell::Cell,
|
||||
ffi::CString,
|
||||
sync::{atomic::AtomicUsize, mpsc::sync_channel},
|
||||
};
|
||||
|
||||
@@ -496,57 +498,64 @@ mod tests {
|
||||
// Messages definitions are at walproposer.h
|
||||
// xxx: it would be better to extract them from safekeeper crate and
|
||||
// use serialization/deserialization here.
|
||||
let greeting_tag = (b'g' as u64).to_ne_bytes();
|
||||
let proto_version = 2_u32.to_ne_bytes();
|
||||
let pg_version: [u8; 4] = PG_VERSION_NUM.to_ne_bytes();
|
||||
let proposer_id = [0; 16];
|
||||
let system_id = 0_u64.to_ne_bytes();
|
||||
let tenant_id = ttid.tenant_id.as_arr();
|
||||
let timeline_id = ttid.timeline_id.as_arr();
|
||||
let pg_tli = 1_u32.to_ne_bytes();
|
||||
let wal_seg_size = 16777216_u32.to_ne_bytes();
|
||||
let greeting_tag = (b'g').to_be_bytes();
|
||||
let tenant_id = CString::new(ttid.tenant_id.to_string())
|
||||
.unwrap()
|
||||
.into_bytes_with_nul();
|
||||
let timeline_id = CString::new(ttid.timeline_id.to_string())
|
||||
.unwrap()
|
||||
.into_bytes_with_nul();
|
||||
let mconf_gen = 0_u32.to_be_bytes();
|
||||
let mconf_members_len = 0_u32.to_be_bytes();
|
||||
let mconf_members_new_len = 0_u32.to_be_bytes();
|
||||
let pg_version: [u8; 4] = PG_VERSION_NUM.to_be_bytes();
|
||||
let system_id = 0_u64.to_be_bytes();
|
||||
let wal_seg_size = 16777216_u32.to_be_bytes();
|
||||
|
||||
let proposer_greeting = [
|
||||
greeting_tag.as_slice(),
|
||||
proto_version.as_slice(),
|
||||
pg_version.as_slice(),
|
||||
proposer_id.as_slice(),
|
||||
system_id.as_slice(),
|
||||
tenant_id.as_slice(),
|
||||
timeline_id.as_slice(),
|
||||
pg_tli.as_slice(),
|
||||
mconf_gen.as_slice(),
|
||||
mconf_members_len.as_slice(),
|
||||
mconf_members_new_len.as_slice(),
|
||||
pg_version.as_slice(),
|
||||
system_id.as_slice(),
|
||||
wal_seg_size.as_slice(),
|
||||
]
|
||||
.concat();
|
||||
|
||||
let voting_tag = (b'v' as u64).to_ne_bytes();
|
||||
let vote_request_term = 3_u64.to_ne_bytes();
|
||||
let proposer_id = [0; 16];
|
||||
let voting_tag = (b'v').to_be_bytes();
|
||||
let vote_request_term = 3_u64.to_be_bytes();
|
||||
let vote_request = [
|
||||
voting_tag.as_slice(),
|
||||
mconf_gen.as_slice(),
|
||||
vote_request_term.as_slice(),
|
||||
proposer_id.as_slice(),
|
||||
]
|
||||
.concat();
|
||||
|
||||
let acceptor_greeting_term = 2_u64.to_ne_bytes();
|
||||
let acceptor_greeting_node_id = 1_u64.to_ne_bytes();
|
||||
let acceptor_greeting_term = 2_u64.to_be_bytes();
|
||||
let acceptor_greeting_node_id = 1_u64.to_be_bytes();
|
||||
let acceptor_greeting = [
|
||||
greeting_tag.as_slice(),
|
||||
acceptor_greeting_term.as_slice(),
|
||||
acceptor_greeting_node_id.as_slice(),
|
||||
mconf_gen.as_slice(),
|
||||
mconf_members_len.as_slice(),
|
||||
mconf_members_new_len.as_slice(),
|
||||
acceptor_greeting_term.as_slice(),
|
||||
]
|
||||
.concat();
|
||||
|
||||
let vote_response_term = 3_u64.to_ne_bytes();
|
||||
let vote_given = 1_u64.to_ne_bytes();
|
||||
let flush_lsn = 0x539_u64.to_ne_bytes();
|
||||
let truncate_lsn = 0x539_u64.to_ne_bytes();
|
||||
let th_len = 1_u32.to_ne_bytes();
|
||||
let th_term = 2_u64.to_ne_bytes();
|
||||
let th_lsn = 0x539_u64.to_ne_bytes();
|
||||
let timeline_start_lsn = 0x539_u64.to_ne_bytes();
|
||||
let vote_response_term = 3_u64.to_be_bytes();
|
||||
let vote_given = 1_u8.to_be_bytes();
|
||||
let flush_lsn = 0x539_u64.to_be_bytes();
|
||||
let truncate_lsn = 0x539_u64.to_be_bytes();
|
||||
let th_len = 1_u32.to_be_bytes();
|
||||
let th_term = 2_u64.to_be_bytes();
|
||||
let th_lsn = 0x539_u64.to_be_bytes();
|
||||
let vote_response = [
|
||||
voting_tag.as_slice(),
|
||||
mconf_gen.as_slice(),
|
||||
vote_response_term.as_slice(),
|
||||
vote_given.as_slice(),
|
||||
flush_lsn.as_slice(),
|
||||
@@ -554,7 +563,6 @@ mod tests {
|
||||
th_len.as_slice(),
|
||||
th_term.as_slice(),
|
||||
th_lsn.as_slice(),
|
||||
timeline_start_lsn.as_slice(),
|
||||
]
|
||||
.concat();
|
||||
|
||||
|
||||
@@ -51,6 +51,26 @@ HexDecodeString(uint8 *result, char *input, int nbytes)
|
||||
return true;
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint16 - get a binary 2-byte int from a message buffer
|
||||
* --------------------------------
|
||||
*/
|
||||
uint16
|
||||
pq_getmsgint16(StringInfo msg)
|
||||
{
|
||||
return pq_getmsgint(msg, 2);
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint32 - get a binary 4-byte int from a message buffer
|
||||
* --------------------------------
|
||||
*/
|
||||
uint32
|
||||
pq_getmsgint32(StringInfo msg)
|
||||
{
|
||||
return pq_getmsgint(msg, 4);
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
|
||||
* --------------------------------
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
#endif
|
||||
|
||||
bool HexDecodeString(uint8 *result, char *input, int nbytes);
|
||||
uint16 pq_getmsgint16(StringInfo msg);
|
||||
uint32 pq_getmsgint32(StringInfo msg);
|
||||
uint32 pq_getmsgint32_le(StringInfo msg);
|
||||
uint64 pq_getmsgint64_le(StringInfo msg);
|
||||
void pq_sendint32_le(StringInfo buf, uint32 i);
|
||||
|
||||
@@ -70,6 +70,7 @@ static bool SendAppendRequests(Safekeeper *sk);
|
||||
static bool RecvAppendResponses(Safekeeper *sk);
|
||||
static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp);
|
||||
static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp);
|
||||
static void PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version);
|
||||
static void HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk);
|
||||
static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size);
|
||||
static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg);
|
||||
@@ -81,6 +82,8 @@ static char *FormatSafekeeperState(Safekeeper *sk);
|
||||
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
|
||||
static char *FormatEvents(WalProposer *wp, uint32 events);
|
||||
static void UpdateDonorShmem(WalProposer *wp);
|
||||
static char *MembershipConfigurationToString(MembershipConfiguration *mconf);
|
||||
static void MembershipConfigurationFree(MembershipConfiguration *mconf);
|
||||
|
||||
WalProposer *
|
||||
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
@@ -137,25 +140,21 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
}
|
||||
wp->quorum = wp->n_safekeepers / 2 + 1;
|
||||
|
||||
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
|
||||
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
|
||||
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
|
||||
|
||||
/* Fill the greeting package */
|
||||
wp->greetRequest.tag = 'g';
|
||||
wp->greetRequest.protocolVersion = SK_PROTOCOL_VERSION;
|
||||
wp->greetRequest.pgVersion = PG_VERSION_NUM;
|
||||
wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId));
|
||||
wp->greetRequest.systemId = wp->config->systemId;
|
||||
if (!wp->config->neon_timeline)
|
||||
wp_log(FATAL, "neon.timeline_id is not provided");
|
||||
if (*wp->config->neon_timeline != '\0' &&
|
||||
!HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
|
||||
wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline);
|
||||
wp->greetRequest.pam.tag = 'g';
|
||||
if (!wp->config->neon_tenant)
|
||||
wp_log(FATAL, "neon.tenant_id is not provided");
|
||||
if (*wp->config->neon_tenant != '\0' &&
|
||||
!HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16))
|
||||
wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant);
|
||||
|
||||
wp->greetRequest.timeline = wp->config->pgTimeline;
|
||||
wp->greetRequest.walSegSize = wp->config->wal_segment_size;
|
||||
wp->greetRequest.tenant_id = wp->config->neon_tenant;
|
||||
if (!wp->config->neon_timeline)
|
||||
wp_log(FATAL, "neon.timeline_id is not provided");
|
||||
wp->greetRequest.timeline_id = wp->config->neon_timeline;
|
||||
wp->greetRequest.pg_version = PG_VERSION_NUM;
|
||||
wp->greetRequest.system_id = wp->config->systemId;
|
||||
wp->greetRequest.wal_seg_size = wp->config->wal_segment_size;
|
||||
|
||||
wp->api.init_event_set(wp);
|
||||
|
||||
@@ -165,12 +164,14 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
void
|
||||
WalProposerFree(WalProposer *wp)
|
||||
{
|
||||
MembershipConfigurationFree(&wp->mconf);
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
Safekeeper *sk = &wp->safekeeper[i];
|
||||
|
||||
Assert(sk->outbuf.data != NULL);
|
||||
pfree(sk->outbuf.data);
|
||||
MembershipConfigurationFree(&sk->greetResponse.mconf);
|
||||
if (sk->voteResponse.termHistory.entries)
|
||||
pfree(sk->voteResponse.termHistory.entries);
|
||||
sk->voteResponse.termHistory.entries = NULL;
|
||||
@@ -308,6 +309,7 @@ ShutdownConnection(Safekeeper *sk)
|
||||
sk->state = SS_OFFLINE;
|
||||
sk->streamingAt = InvalidXLogRecPtr;
|
||||
|
||||
MembershipConfigurationFree(&sk->greetResponse.mconf);
|
||||
if (sk->voteResponse.termHistory.entries)
|
||||
pfree(sk->voteResponse.termHistory.entries);
|
||||
sk->voteResponse.termHistory.entries = NULL;
|
||||
@@ -598,11 +600,14 @@ static void
|
||||
SendStartWALPush(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
#define CMD_LEN 512
|
||||
char cmd[CMD_LEN];
|
||||
|
||||
if (!wp->api.conn_send_query(sk, "START_WAL_PUSH"))
|
||||
snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version);
|
||||
if (!wp->api.conn_send_query(sk, cmd))
|
||||
{
|
||||
wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s",
|
||||
sk->host, sk->port, wp->api.conn_error_message(sk));
|
||||
wp_log(WARNING, "failed to send %s query to safekeeper %s:%s: %s",
|
||||
cmd, sk->host, sk->port, wp->api.conn_error_message(sk));
|
||||
ShutdownConnection(sk);
|
||||
return;
|
||||
}
|
||||
@@ -658,23 +663,33 @@ RecvStartWALPushResult(Safekeeper *sk)
|
||||
|
||||
/*
|
||||
* Start handshake: first of all send information about the
|
||||
* safekeeper. After sending, we wait on SS_HANDSHAKE_RECV for
|
||||
* walproposer. After sending, we wait on SS_HANDSHAKE_RECV for
|
||||
* a response to finish the handshake.
|
||||
*/
|
||||
static void
|
||||
SendProposerGreeting(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
char *mconf_toml = MembershipConfigurationToString(&wp->greetRequest.mconf);
|
||||
|
||||
wp_log(LOG, "sending ProposerGreeting to safekeeper %s:%s with mconf = %s", sk->host, sk->port, mconf_toml);
|
||||
pfree(mconf_toml);
|
||||
|
||||
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->greetRequest,
|
||||
&sk->outbuf, wp->config->proto_version);
|
||||
|
||||
/*
|
||||
* On failure, logging & resetting the connection is handled. We just need
|
||||
* to handle the control flow.
|
||||
*/
|
||||
BlockingWrite(sk, &sk->wp->greetRequest, sizeof(sk->wp->greetRequest), SS_HANDSHAKE_RECV);
|
||||
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
|
||||
}
|
||||
|
||||
static void
|
||||
RecvAcceptorGreeting(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
char *mconf_toml;
|
||||
|
||||
/*
|
||||
* If our reading doesn't immediately succeed, any necessary error
|
||||
@@ -685,7 +700,10 @@ RecvAcceptorGreeting(Safekeeper *sk)
|
||||
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse))
|
||||
return;
|
||||
|
||||
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, term=" INT64_FORMAT, sk->host, sk->port, sk->greetResponse.term);
|
||||
mconf_toml = MembershipConfigurationToString(&sk->greetResponse.mconf);
|
||||
wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, node_id = %lu, mconf = %s, term=" UINT64_FORMAT,
|
||||
sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term);
|
||||
pfree(mconf_toml);
|
||||
|
||||
/* Protocol is all good, move to voting. */
|
||||
sk->state = SS_VOTING;
|
||||
@@ -707,12 +725,9 @@ RecvAcceptorGreeting(Safekeeper *sk)
|
||||
wp->propTerm++;
|
||||
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
|
||||
|
||||
wp->voteRequest = (VoteRequest)
|
||||
{
|
||||
.tag = 'v',
|
||||
.term = wp->propTerm
|
||||
};
|
||||
memcpy(wp->voteRequest.proposerId.data, wp->greetRequest.proposerId.data, UUID_LEN);
|
||||
wp->voteRequest.pam.tag = 'v';
|
||||
wp->voteRequest.generation = wp->mconf.generation;
|
||||
wp->voteRequest.term = wp->propTerm;
|
||||
}
|
||||
}
|
||||
else if (sk->greetResponse.term > wp->propTerm)
|
||||
@@ -759,12 +774,14 @@ SendVoteRequest(Safekeeper *sk)
|
||||
{
|
||||
WalProposer *wp = sk->wp;
|
||||
|
||||
/* We have quorum for voting, send our vote request */
|
||||
wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term);
|
||||
/* On failure, logging & resetting is handled */
|
||||
if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT))
|
||||
return;
|
||||
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->voteRequest,
|
||||
&sk->outbuf, wp->config->proto_version);
|
||||
|
||||
/* We have quorum for voting, send our vote request */
|
||||
wp_log(LOG, "requesting vote from %s:%s for generation %u term " UINT64_FORMAT, sk->host, sk->port,
|
||||
wp->voteRequest.generation, wp->voteRequest.term);
|
||||
/* On failure, logging & resetting is handled */
|
||||
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_WAIT_VERDICT);
|
||||
/* If successful, wait for read-ready with SS_WAIT_VERDICT */
|
||||
}
|
||||
|
||||
@@ -778,11 +795,12 @@ RecvVoteResponse(Safekeeper *sk)
|
||||
return;
|
||||
|
||||
wp_log(LOG,
|
||||
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
|
||||
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
|
||||
"got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
|
||||
sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term,
|
||||
sk->voteResponse.voteGiven,
|
||||
GetHighestTerm(&sk->voteResponse.termHistory),
|
||||
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
|
||||
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
|
||||
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
|
||||
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn));
|
||||
|
||||
/*
|
||||
* In case of acceptor rejecting our vote, bail out, but only if either it
|
||||
@@ -847,9 +865,9 @@ HandleElectedProposer(WalProposer *wp)
|
||||
* otherwise we must be sync-safekeepers and we have nothing to do then.
|
||||
*
|
||||
* Proceeding is not only pointless but harmful, because we'd give
|
||||
* safekeepers term history starting with 0/0. These hacks will go away once
|
||||
* we disable implicit timeline creation on safekeepers and create it with
|
||||
* non zero LSN from the start.
|
||||
* safekeepers term history starting with 0/0. These hacks will go away
|
||||
* once we disable implicit timeline creation on safekeepers and create it
|
||||
* with non zero LSN from the start.
|
||||
*/
|
||||
if (wp->propEpochStartLsn == InvalidXLogRecPtr)
|
||||
{
|
||||
@@ -942,7 +960,6 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
wp->propEpochStartLsn = InvalidXLogRecPtr;
|
||||
wp->donorEpoch = 0;
|
||||
wp->truncateLsn = InvalidXLogRecPtr;
|
||||
wp->timelineStartLsn = InvalidXLogRecPtr;
|
||||
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
@@ -959,20 +976,6 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
wp->donor = i;
|
||||
}
|
||||
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
|
||||
|
||||
if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr)
|
||||
{
|
||||
/* timelineStartLsn should be the same everywhere or unknown */
|
||||
if (wp->timelineStartLsn != InvalidXLogRecPtr &&
|
||||
wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
|
||||
{
|
||||
wp_log(WARNING,
|
||||
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
|
||||
LSN_FORMAT_ARGS(wp->timelineStartLsn),
|
||||
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
|
||||
}
|
||||
wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -995,22 +998,11 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
|
||||
{
|
||||
wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp);
|
||||
if (wp->timelineStartLsn == InvalidXLogRecPtr)
|
||||
{
|
||||
wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp);
|
||||
}
|
||||
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
|
||||
}
|
||||
pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
|
||||
|
||||
/*
|
||||
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so
|
||||
* it should never be zero at this point, if we know timelineStartLsn.
|
||||
*
|
||||
* timelineStartLsn can be zero only on the first syncSafekeepers run.
|
||||
*/
|
||||
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
|
||||
(wp->config->syncSafekeepers && wp->truncateLsn == wp->timelineStartLsn));
|
||||
Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers);
|
||||
|
||||
/*
|
||||
* We will be generating WAL since propEpochStartLsn, so we should set
|
||||
@@ -1053,10 +1045,11 @@ DetermineEpochStartLsn(WalProposer *wp)
|
||||
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
|
||||
{
|
||||
/*
|
||||
* However, allow to proceed if last_log_term on the node which gave
|
||||
* the highest vote (i.e. point where we are going to start writing)
|
||||
* actually had been won by me; plain restart of walproposer not
|
||||
* intervened by concurrent compute which wrote WAL is ok.
|
||||
* However, allow to proceed if last_log_term on the node which
|
||||
* gave the highest vote (i.e. point where we are going to start
|
||||
* writing) actually had been won by me; plain restart of
|
||||
* walproposer not intervened by concurrent compute which wrote
|
||||
* WAL is ok.
|
||||
*
|
||||
* This avoids compute crash after manual term_bump.
|
||||
*/
|
||||
@@ -1126,14 +1119,8 @@ SendProposerElected(Safekeeper *sk)
|
||||
{
|
||||
/* safekeeper is empty or no common point, start from the beginning */
|
||||
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
|
||||
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
|
||||
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
|
||||
|
||||
/*
|
||||
* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
|
||||
* is created manually (test_s3_wal_replay)
|
||||
*/
|
||||
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
|
||||
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, termHistory.n_entries=%u",
|
||||
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), wp->propTermHistory.n_entries);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1158,29 +1145,19 @@ SendProposerElected(Safekeeper *sk)
|
||||
|
||||
Assert(sk->startStreamingAt <= wp->availableLsn);
|
||||
|
||||
msg.tag = 'e';
|
||||
msg.apm.tag = 'e';
|
||||
msg.generation = wp->mconf.generation;
|
||||
msg.term = wp->propTerm;
|
||||
msg.startStreamingAt = sk->startStreamingAt;
|
||||
msg.termHistory = &wp->propTermHistory;
|
||||
msg.timelineStartLsn = wp->timelineStartLsn;
|
||||
|
||||
lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0;
|
||||
wp_log(LOG,
|
||||
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
|
||||
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
|
||||
|
||||
resetStringInfo(&sk->outbuf);
|
||||
pq_sendint64_le(&sk->outbuf, msg.tag);
|
||||
pq_sendint64_le(&sk->outbuf, msg.term);
|
||||
pq_sendint64_le(&sk->outbuf, msg.startStreamingAt);
|
||||
pq_sendint32_le(&sk->outbuf, msg.termHistory->n_entries);
|
||||
for (int i = 0; i < msg.termHistory->n_entries; i++)
|
||||
{
|
||||
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].term);
|
||||
pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].lsn);
|
||||
}
|
||||
pq_sendint64_le(&sk->outbuf, msg.timelineStartLsn);
|
||||
"sending elected msg to node " UINT64_FORMAT " generation=%u term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s",
|
||||
sk->greetResponse.nodeId, msg.generation, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt),
|
||||
lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port);
|
||||
|
||||
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version);
|
||||
if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH))
|
||||
return;
|
||||
|
||||
@@ -1246,14 +1223,13 @@ static void
|
||||
PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn)
|
||||
{
|
||||
Assert(endLsn >= beginLsn);
|
||||
req->tag = 'a';
|
||||
req->apm.tag = 'a';
|
||||
req->generation = wp->mconf.generation;
|
||||
req->term = wp->propTerm;
|
||||
req->epochStartLsn = wp->propEpochStartLsn;
|
||||
req->beginLsn = beginLsn;
|
||||
req->endLsn = endLsn;
|
||||
req->commitLsn = wp->commitLsn;
|
||||
req->truncateLsn = wp->truncateLsn;
|
||||
req->proposerId = wp->greetRequest.proposerId;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1354,7 +1330,8 @@ SendAppendRequests(Safekeeper *sk)
|
||||
resetStringInfo(&sk->outbuf);
|
||||
|
||||
/* write AppendRequest header */
|
||||
appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader));
|
||||
PAMessageSerialize(wp, (ProposerAcceptorMessage *) req, &sk->outbuf, wp->config->proto_version);
|
||||
/* prepare for reading WAL into the outbuf */
|
||||
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
|
||||
sk->active_state = SS_ACTIVE_READ_WAL;
|
||||
}
|
||||
@@ -1367,14 +1344,17 @@ SendAppendRequests(Safekeeper *sk)
|
||||
req = &sk->appendRequest;
|
||||
req_len = req->endLsn - req->beginLsn;
|
||||
|
||||
/* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */
|
||||
/*
|
||||
* We send zero sized AppenRequests as heartbeats; don't wal_read
|
||||
* for these.
|
||||
*/
|
||||
if (req_len > 0)
|
||||
{
|
||||
switch (wp->api.wal_read(sk,
|
||||
&sk->outbuf.data[sk->outbuf.len],
|
||||
req->beginLsn,
|
||||
req_len,
|
||||
&errmsg))
|
||||
&sk->outbuf.data[sk->outbuf.len],
|
||||
req->beginLsn,
|
||||
req_len,
|
||||
&errmsg))
|
||||
{
|
||||
case NEON_WALREAD_SUCCESS:
|
||||
break;
|
||||
@@ -1382,7 +1362,7 @@ SendAppendRequests(Safekeeper *sk)
|
||||
return true;
|
||||
case NEON_WALREAD_ERROR:
|
||||
wp_log(WARNING, "WAL reading for node %s:%s failed: %s",
|
||||
sk->host, sk->port, errmsg);
|
||||
sk->host, sk->port, errmsg);
|
||||
ShutdownConnection(sk);
|
||||
return false;
|
||||
default:
|
||||
@@ -1470,11 +1450,11 @@ RecvAppendResponses(Safekeeper *sk)
|
||||
* Term has changed to higher one, probably another compute is
|
||||
* running. If this is the case we could PANIC as well because
|
||||
* likely it inserted some data and our basebackup is unsuitable
|
||||
* anymore. However, we also bump term manually (term_bump endpoint)
|
||||
* on safekeepers for migration purposes, in this case we do want
|
||||
* compute to stay alive. So restart walproposer with FATAL instead
|
||||
* of panicking; if basebackup is spoiled next election will notice
|
||||
* this.
|
||||
* anymore. However, we also bump term manually (term_bump
|
||||
* endpoint) on safekeepers for migration purposes, in this case
|
||||
* we do want compute to stay alive. So restart walproposer with
|
||||
* FATAL instead of panicking; if basebackup is spoiled next
|
||||
* election will notice this.
|
||||
*/
|
||||
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
|
||||
sk->host, sk->port,
|
||||
@@ -1509,7 +1489,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
|
||||
|
||||
for (i = 0; i < nkeys; i++)
|
||||
{
|
||||
const char *key = pq_getmsgstring(reply_message);
|
||||
const char *key = pq_getmsgrawstring(reply_message);
|
||||
unsigned int value_len = pq_getmsgint(reply_message, sizeof(int32));
|
||||
|
||||
if (strcmp(key, "current_timeline_size") == 0)
|
||||
@@ -1750,6 +1730,208 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
|
||||
}
|
||||
}
|
||||
|
||||
/* Serialize MembershipConfiguration into buf. */
|
||||
static void
|
||||
MembershipConfigurationSerialize(MembershipConfiguration *mconf, StringInfo buf)
|
||||
{
|
||||
uint32 i;
|
||||
|
||||
pq_sendint32(buf, mconf->generation);
|
||||
|
||||
pq_sendint32(buf, mconf->members.len);
|
||||
for (i = 0; i < mconf->members.len; i++)
|
||||
{
|
||||
pq_sendint64(buf, mconf->members.m[i].node_id);
|
||||
pq_send_ascii_string(buf, mconf->members.m[i].host);
|
||||
pq_sendint16(buf, mconf->members.m[i].port);
|
||||
}
|
||||
|
||||
/*
|
||||
* There is no special mark for absent new_members; zero members in
|
||||
* invalid, so zero len means absent.
|
||||
*/
|
||||
pq_sendint32(buf, mconf->new_members.len);
|
||||
for (i = 0; i < mconf->new_members.len; i++)
|
||||
{
|
||||
pq_sendint64(buf, mconf->new_members.m[i].node_id);
|
||||
pq_send_ascii_string(buf, mconf->new_members.m[i].host);
|
||||
pq_sendint16(buf, mconf->new_members.m[i].port);
|
||||
}
|
||||
}
|
||||
|
||||
/* Serialize proposer -> acceptor message into buf using specified version */
|
||||
static void
|
||||
PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version)
|
||||
{
|
||||
/* both version are supported currently until we fully migrate to 3 */
|
||||
Assert(proto_version == 3 || proto_version == 2);
|
||||
|
||||
resetStringInfo(buf);
|
||||
|
||||
if (proto_version == 3)
|
||||
{
|
||||
/*
|
||||
* v2 sends structs for some messages as is, so commonly send tag only
|
||||
* for v3
|
||||
*/
|
||||
pq_sendint8(buf, msg->tag);
|
||||
|
||||
switch (msg->tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
ProposerGreeting *m = (ProposerGreeting *) msg;
|
||||
|
||||
pq_send_ascii_string(buf, m->tenant_id);
|
||||
pq_send_ascii_string(buf, m->timeline_id);
|
||||
MembershipConfigurationSerialize(&m->mconf, buf);
|
||||
pq_sendint32(buf, m->pg_version);
|
||||
pq_sendint64(buf, m->system_id);
|
||||
pq_sendint32(buf, m->wal_seg_size);
|
||||
break;
|
||||
}
|
||||
case 'v':
|
||||
{
|
||||
VoteRequest *m = (VoteRequest *) msg;
|
||||
|
||||
pq_sendint32(buf, m->generation);
|
||||
pq_sendint64(buf, m->term);
|
||||
break;
|
||||
|
||||
}
|
||||
case 'e':
|
||||
{
|
||||
ProposerElected *m = (ProposerElected *) msg;
|
||||
|
||||
pq_sendint32(buf, m->generation);
|
||||
pq_sendint64(buf, m->term);
|
||||
pq_sendint64(buf, m->startStreamingAt);
|
||||
pq_sendint32(buf, m->termHistory->n_entries);
|
||||
for (uint32 i = 0; i < m->termHistory->n_entries; i++)
|
||||
{
|
||||
pq_sendint64(buf, m->termHistory->entries[i].term);
|
||||
pq_sendint64(buf, m->termHistory->entries[i].lsn);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'a':
|
||||
{
|
||||
/*
|
||||
* Note: this serializes only AppendRequestHeader, caller
|
||||
* is expected to append WAL data later.
|
||||
*/
|
||||
AppendRequestHeader *m = (AppendRequestHeader *) msg;
|
||||
|
||||
pq_sendint32(buf, m->generation);
|
||||
pq_sendint64(buf, m->term);
|
||||
pq_sendint64(buf, m->beginLsn);
|
||||
pq_sendint64(buf, m->endLsn);
|
||||
pq_sendint64(buf, m->commitLsn);
|
||||
pq_sendint64(buf, m->truncateLsn);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (proto_version == 2)
|
||||
{
|
||||
switch (msg->tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
/* v2 sent struct as is */
|
||||
ProposerGreeting *m = (ProposerGreeting *) msg;
|
||||
ProposerGreetingV2 greetRequestV2;
|
||||
|
||||
/* Fill also v2 struct. */
|
||||
greetRequestV2.tag = 'g';
|
||||
greetRequestV2.protocolVersion = proto_version;
|
||||
greetRequestV2.pgVersion = m->pg_version;
|
||||
|
||||
/*
|
||||
* v3 removed this field because it's easier to pass as
|
||||
* libq or START_WAL_PUSH options
|
||||
*/
|
||||
memset(&greetRequestV2.proposerId, 0, sizeof(greetRequestV2.proposerId));
|
||||
greetRequestV2.systemId = wp->config->systemId;
|
||||
if (*m->timeline_id != '\0' &&
|
||||
!HexDecodeString(greetRequestV2.timeline_id, m->timeline_id, 16))
|
||||
wp_log(FATAL, "could not parse neon.timeline_id, %s", m->timeline_id);
|
||||
if (*m->tenant_id != '\0' &&
|
||||
!HexDecodeString(greetRequestV2.tenant_id, m->tenant_id, 16))
|
||||
wp_log(FATAL, "could not parse neon.tenant_id, %s", m->tenant_id);
|
||||
|
||||
greetRequestV2.timeline = wp->config->pgTimeline;
|
||||
greetRequestV2.walSegSize = wp->config->wal_segment_size;
|
||||
|
||||
pq_sendbytes(buf, (char *) &greetRequestV2, sizeof(greetRequestV2));
|
||||
break;
|
||||
}
|
||||
case 'v':
|
||||
{
|
||||
/* v2 sent struct as is */
|
||||
VoteRequest *m = (VoteRequest *) msg;
|
||||
VoteRequestV2 voteRequestV2;
|
||||
|
||||
voteRequestV2.tag = m->pam.tag;
|
||||
voteRequestV2.term = m->term;
|
||||
/* removed field */
|
||||
memset(&voteRequestV2.proposerId, 0, sizeof(voteRequestV2.proposerId));
|
||||
pq_sendbytes(buf, (char *) &voteRequestV2, sizeof(voteRequestV2));
|
||||
break;
|
||||
}
|
||||
case 'e':
|
||||
{
|
||||
ProposerElected *m = (ProposerElected *) msg;
|
||||
|
||||
pq_sendint64_le(buf, m->apm.tag);
|
||||
pq_sendint64_le(buf, m->term);
|
||||
pq_sendint64_le(buf, m->startStreamingAt);
|
||||
pq_sendint32_le(buf, m->termHistory->n_entries);
|
||||
for (int i = 0; i < m->termHistory->n_entries; i++)
|
||||
{
|
||||
pq_sendint64_le(buf, m->termHistory->entries[i].term);
|
||||
pq_sendint64_le(buf, m->termHistory->entries[i].lsn);
|
||||
}
|
||||
pq_sendint64_le(buf, 0); /* removed timeline_start_lsn */
|
||||
break;
|
||||
}
|
||||
case 'a':
|
||||
|
||||
/*
|
||||
* Note: this serializes only AppendRequestHeader, caller is
|
||||
* expected to append WAL data later.
|
||||
*/
|
||||
{
|
||||
/* v2 sent struct as is */
|
||||
AppendRequestHeader *m = (AppendRequestHeader *) msg;
|
||||
AppendRequestHeaderV2 appendRequestHeaderV2;
|
||||
|
||||
appendRequestHeaderV2.tag = m->apm.tag;
|
||||
appendRequestHeaderV2.term = m->term;
|
||||
appendRequestHeaderV2.epochStartLsn = 0; /* removed field */
|
||||
appendRequestHeaderV2.beginLsn = m->beginLsn;
|
||||
appendRequestHeaderV2.endLsn = m->endLsn;
|
||||
appendRequestHeaderV2.commitLsn = m->commitLsn;
|
||||
appendRequestHeaderV2.truncateLsn = m->truncateLsn;
|
||||
/* removed field */
|
||||
memset(&appendRequestHeaderV2.proposerId, 0, sizeof(appendRequestHeaderV2.proposerId));
|
||||
|
||||
pq_sendbytes(buf, (char *) &appendRequestHeaderV2, sizeof(appendRequestHeaderV2));
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
|
||||
}
|
||||
return;
|
||||
}
|
||||
wp_log(FATAL, "unexpected proto_version %d", proto_version);
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to read CopyData message from i'th safekeeper, resetting connection on
|
||||
* failure.
|
||||
@@ -1779,6 +1961,37 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Deserialize membership configuration from buf to mconf. */
|
||||
static void
|
||||
MembershipConfigurationDeserialize(MembershipConfiguration *mconf, StringInfo buf)
|
||||
{
|
||||
uint32 i;
|
||||
|
||||
mconf->generation = pq_getmsgint32(buf);
|
||||
mconf->members.len = pq_getmsgint32(buf);
|
||||
mconf->members.m = palloc0(sizeof(SafekeeperId) * mconf->members.len);
|
||||
for (i = 0; i < mconf->members.len; i++)
|
||||
{
|
||||
const char *buf_host;
|
||||
|
||||
mconf->members.m[i].node_id = pq_getmsgint64(buf);
|
||||
buf_host = pq_getmsgrawstring(buf);
|
||||
strlcpy(mconf->members.m[i].host, buf_host, sizeof(mconf->members.m[i].host));
|
||||
mconf->members.m[i].port = pq_getmsgint16(buf);
|
||||
}
|
||||
mconf->new_members.len = pq_getmsgint32(buf);
|
||||
mconf->new_members.m = palloc0(sizeof(SafekeeperId) * mconf->new_members.len);
|
||||
for (i = 0; i < mconf->new_members.len; i++)
|
||||
{
|
||||
const char *buf_host;
|
||||
|
||||
mconf->new_members.m[i].node_id = pq_getmsgint64(buf);
|
||||
buf_host = pq_getmsgrawstring(buf);
|
||||
strlcpy(mconf->new_members.m[i].host, buf_host, sizeof(mconf->new_members.m[i].host));
|
||||
mconf->new_members.m[i].port = pq_getmsgint16(buf);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read next message with known type into provided struct, by reading a CopyData
|
||||
* block from the safekeeper's postgres connection, returning whether the read
|
||||
@@ -1787,6 +2000,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
|
||||
* If the read needs more polling, we return 'false' and keep the state
|
||||
* unmodified, waiting until it becomes read-ready to try again. If it fully
|
||||
* failed, a warning is emitted and the connection is reset.
|
||||
*
|
||||
* Note: it pallocs if needed, i.e. for AcceptorGreeting and VoteResponse fields.
|
||||
*/
|
||||
static bool
|
||||
AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
|
||||
@@ -1795,82 +2010,154 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
|
||||
|
||||
char *buf;
|
||||
int buf_size;
|
||||
uint64 tag;
|
||||
uint8 tag;
|
||||
StringInfoData s;
|
||||
|
||||
if (!(AsyncRead(sk, &buf, &buf_size)))
|
||||
return false;
|
||||
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
|
||||
|
||||
/* parse it */
|
||||
s.data = buf;
|
||||
s.len = buf_size;
|
||||
s.maxlen = buf_size;
|
||||
s.cursor = 0;
|
||||
|
||||
tag = pq_getmsgint64_le(&s);
|
||||
if (tag != anymsg->tag)
|
||||
if (wp->config->proto_version == 3)
|
||||
{
|
||||
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return false;
|
||||
}
|
||||
sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp);
|
||||
switch (tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->nodeId = pq_getmsgint64_le(&s);
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
case 'v':
|
||||
{
|
||||
VoteResponse *msg = (VoteResponse *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->voteGiven = pq_getmsgint64_le(&s);
|
||||
msg->flushLsn = pq_getmsgint64_le(&s);
|
||||
msg->truncateLsn = pq_getmsgint64_le(&s);
|
||||
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
|
||||
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
|
||||
for (int i = 0; i < msg->termHistory.n_entries; i++)
|
||||
tag = pq_getmsgbyte(&s);
|
||||
if (tag != anymsg->tag)
|
||||
{
|
||||
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return false;
|
||||
}
|
||||
switch (tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
|
||||
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
|
||||
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
|
||||
|
||||
msg->nodeId = pq_getmsgint64(&s);
|
||||
MembershipConfigurationDeserialize(&msg->mconf, &s);
|
||||
msg->term = pq_getmsgint64(&s);
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
msg->timelineStartLsn = pq_getmsgint64_le(&s);
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
case 'v':
|
||||
{
|
||||
VoteResponse *msg = (VoteResponse *) anymsg;
|
||||
|
||||
case 'a':
|
||||
{
|
||||
AppendResponse *msg = (AppendResponse *) anymsg;
|
||||
msg->generation = pq_getmsgint32(&s);
|
||||
msg->term = pq_getmsgint64(&s);
|
||||
msg->voteGiven = pq_getmsgbyte(&s);
|
||||
msg->flushLsn = pq_getmsgint64(&s);
|
||||
msg->truncateLsn = pq_getmsgint64(&s);
|
||||
msg->termHistory.n_entries = pq_getmsgint32(&s);
|
||||
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
|
||||
for (uint32 i = 0; i < msg->termHistory.n_entries; i++)
|
||||
{
|
||||
msg->termHistory.entries[i].term = pq_getmsgint64(&s);
|
||||
msg->termHistory.entries[i].lsn = pq_getmsgint64(&s);
|
||||
}
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
case 'a':
|
||||
{
|
||||
AppendResponse *msg = (AppendResponse *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->flushLsn = pq_getmsgint64_le(&s);
|
||||
msg->commitLsn = pq_getmsgint64_le(&s);
|
||||
msg->hs.ts = pq_getmsgint64_le(&s);
|
||||
msg->hs.xmin.value = pq_getmsgint64_le(&s);
|
||||
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
|
||||
if (s.len > s.cursor)
|
||||
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
|
||||
else
|
||||
msg->ps_feedback.present = false;
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
Assert(false);
|
||||
return false;
|
||||
}
|
||||
msg->generation = pq_getmsgint32(&s);
|
||||
msg->term = pq_getmsgint64(&s);
|
||||
msg->flushLsn = pq_getmsgint64(&s);
|
||||
msg->commitLsn = pq_getmsgint64(&s);
|
||||
msg->hs.ts = pq_getmsgint64(&s);
|
||||
msg->hs.xmin.value = pq_getmsgint64(&s);
|
||||
msg->hs.catalog_xmin.value = pq_getmsgint64(&s);
|
||||
if (s.len > s.cursor)
|
||||
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
|
||||
else
|
||||
msg->ps_feedback.present = false;
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
default:
|
||||
{
|
||||
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (wp->config->proto_version == 2)
|
||||
{
|
||||
tag = pq_getmsgint64_le(&s);
|
||||
if (tag != anymsg->tag)
|
||||
{
|
||||
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
|
||||
sk->port, FormatSafekeeperState(sk));
|
||||
ResetConnection(sk);
|
||||
return false;
|
||||
}
|
||||
switch (tag)
|
||||
{
|
||||
case 'g':
|
||||
{
|
||||
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->nodeId = pq_getmsgint64_le(&s);
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
case 'v':
|
||||
{
|
||||
VoteResponse *msg = (VoteResponse *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->voteGiven = pq_getmsgint64_le(&s);
|
||||
msg->flushLsn = pq_getmsgint64_le(&s);
|
||||
msg->truncateLsn = pq_getmsgint64_le(&s);
|
||||
msg->termHistory.n_entries = pq_getmsgint32_le(&s);
|
||||
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
|
||||
for (int i = 0; i < msg->termHistory.n_entries; i++)
|
||||
{
|
||||
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
|
||||
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
|
||||
}
|
||||
pq_getmsgint64_le(&s); /* timelineStartLsn */
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
case 'a':
|
||||
{
|
||||
AppendResponse *msg = (AppendResponse *) anymsg;
|
||||
|
||||
msg->term = pq_getmsgint64_le(&s);
|
||||
msg->flushLsn = pq_getmsgint64_le(&s);
|
||||
msg->commitLsn = pq_getmsgint64_le(&s);
|
||||
msg->hs.ts = pq_getmsgint64_le(&s);
|
||||
msg->hs.xmin.value = pq_getmsgint64_le(&s);
|
||||
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
|
||||
if (s.len > s.cursor)
|
||||
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
|
||||
else
|
||||
msg->ps_feedback.present = false;
|
||||
pq_getmsgend(&s);
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version);
|
||||
return false; /* keep the compiler quiet */
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2246,3 +2533,45 @@ FormatEvents(WalProposer *wp, uint32 events)
|
||||
|
||||
return (char *) &return_str;
|
||||
}
|
||||
|
||||
/* Dump mconf as toml for observability / debugging. Result is palloc'ed. */
|
||||
static char *
|
||||
MembershipConfigurationToString(MembershipConfiguration *mconf)
|
||||
{
|
||||
StringInfoData s;
|
||||
uint32 i;
|
||||
|
||||
initStringInfo(&s);
|
||||
appendStringInfo(&s, "{gen = %u", mconf->generation);
|
||||
appendStringInfoString(&s, ", members = [");
|
||||
for (i = 0; i < mconf->members.len; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
appendStringInfoString(&s, ", ");
|
||||
appendStringInfo(&s, "{node_id = %lu", mconf->members.m[i].node_id);
|
||||
appendStringInfo(&s, ", host = %s", mconf->members.m[i].host);
|
||||
appendStringInfo(&s, ", port = %u }", mconf->members.m[i].port);
|
||||
}
|
||||
appendStringInfo(&s, "], new_members = [");
|
||||
for (i = 0; i < mconf->new_members.len; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
appendStringInfoString(&s, ", ");
|
||||
appendStringInfo(&s, "{node_id = %lu", mconf->new_members.m[i].node_id);
|
||||
appendStringInfo(&s, ", host = %s", mconf->new_members.m[i].host);
|
||||
appendStringInfo(&s, ", port = %u }", mconf->new_members.m[i].port);
|
||||
}
|
||||
appendStringInfoString(&s, "]}");
|
||||
return s.data;
|
||||
}
|
||||
|
||||
static void
|
||||
MembershipConfigurationFree(MembershipConfiguration *mconf)
|
||||
{
|
||||
if (mconf->members.m)
|
||||
pfree(mconf->members.m);
|
||||
mconf->members.m = NULL;
|
||||
if (mconf->new_members.m)
|
||||
pfree(mconf->new_members.m);
|
||||
mconf->new_members.m = NULL;
|
||||
}
|
||||
|
||||
@@ -12,9 +12,6 @@
|
||||
#include "neon_walreader.h"
|
||||
#include "pagestore_client.h"
|
||||
|
||||
#define SK_MAGIC 0xCafeCeefu
|
||||
#define SK_PROTOCOL_VERSION 2
|
||||
|
||||
#define MAX_SAFEKEEPERS 32
|
||||
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
|
||||
* message */
|
||||
@@ -143,12 +140,71 @@ typedef uint64 term_t;
|
||||
/* neon storage node id */
|
||||
typedef uint64 NNodeId;
|
||||
|
||||
/*
|
||||
* Number uniquely identifying safekeeper membership configuration.
|
||||
* This and following structs pair ones in membership.rs.
|
||||
*/
|
||||
typedef uint32 Generation;
|
||||
|
||||
typedef struct SafekeeperId
|
||||
{
|
||||
NNodeId node_id;
|
||||
char host[MAXCONNINFO];
|
||||
uint16 port;
|
||||
} SafekeeperId;
|
||||
|
||||
/* Set of safekeepers. */
|
||||
typedef struct MemberSet
|
||||
{
|
||||
uint32 len; /* number of members */
|
||||
SafekeeperId *m; /* ids themselves */
|
||||
} MemberSet;
|
||||
|
||||
/* Timeline safekeeper membership configuration. */
|
||||
typedef struct MembershipConfiguration
|
||||
{
|
||||
Generation generation;
|
||||
MemberSet members;
|
||||
/* Has 0 n_members in non joint conf. */
|
||||
MemberSet new_members;
|
||||
} MembershipConfiguration;
|
||||
|
||||
/*
|
||||
* Proposer <-> Acceptor messaging.
|
||||
*/
|
||||
|
||||
typedef struct ProposerAcceptorMessage
|
||||
{
|
||||
uint8 tag;
|
||||
} ProposerAcceptorMessage;
|
||||
|
||||
/* Initial Proposer -> Acceptor message */
|
||||
typedef struct ProposerGreeting
|
||||
{
|
||||
ProposerAcceptorMessage pam; /* message tag */
|
||||
|
||||
/*
|
||||
* tenant/timeline ids as C strings with standard hex notation for ease of
|
||||
* printing. In principle they are not strictly needed as ttid is also
|
||||
* passed as libpq options.
|
||||
*/
|
||||
char *tenant_id;
|
||||
char *timeline_id;
|
||||
/* Full conf is carried to allow safekeeper switch */
|
||||
MembershipConfiguration mconf;
|
||||
|
||||
/*
|
||||
* pg_version and wal_seg_size are used for timeline creation until we
|
||||
* fully migrate to doing externally. systemId is only used as a sanity
|
||||
* cross check.
|
||||
*/
|
||||
uint32 pg_version; /* in PG_VERSION_NUM format */
|
||||
uint64 system_id; /* Postgres system identifier. */
|
||||
uint32 wal_seg_size;
|
||||
} ProposerGreeting;
|
||||
|
||||
/* protocol v2 variant, kept while wp supports it */
|
||||
typedef struct ProposerGreetingV2
|
||||
{
|
||||
uint64 tag; /* message tag */
|
||||
uint32 protocolVersion; /* proposer-safekeeper protocol version */
|
||||
@@ -159,32 +215,42 @@ typedef struct ProposerGreeting
|
||||
uint8 tenant_id[16];
|
||||
TimeLineID timeline;
|
||||
uint32 walSegSize;
|
||||
} ProposerGreeting;
|
||||
} ProposerGreetingV2;
|
||||
|
||||
typedef struct AcceptorProposerMessage
|
||||
{
|
||||
uint64 tag;
|
||||
uint8 tag;
|
||||
} AcceptorProposerMessage;
|
||||
|
||||
/*
|
||||
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
|
||||
* Acceptor -> Proposer initial response: the highest term acceptor voted for,
|
||||
* its node id and configuration.
|
||||
*/
|
||||
typedef struct AcceptorGreeting
|
||||
{
|
||||
AcceptorProposerMessage apm;
|
||||
term_t term;
|
||||
NNodeId nodeId;
|
||||
MembershipConfiguration mconf;
|
||||
term_t term;
|
||||
} AcceptorGreeting;
|
||||
|
||||
/*
|
||||
* Proposer -> Acceptor vote request.
|
||||
*/
|
||||
typedef struct VoteRequest
|
||||
{
|
||||
ProposerAcceptorMessage pam; /* message tag */
|
||||
Generation generation; /* membership conf generation */
|
||||
term_t term;
|
||||
} VoteRequest;
|
||||
|
||||
/* protocol v2 variant, kept while wp supports it */
|
||||
typedef struct VoteRequestV2
|
||||
{
|
||||
uint64 tag;
|
||||
term_t term;
|
||||
pg_uuid_t proposerId; /* for monitoring/debugging */
|
||||
} VoteRequest;
|
||||
} VoteRequestV2;
|
||||
|
||||
/* Element of term switching chain. */
|
||||
typedef struct TermSwitchEntry
|
||||
@@ -203,8 +269,15 @@ typedef struct TermHistory
|
||||
typedef struct VoteResponse
|
||||
{
|
||||
AcceptorProposerMessage apm;
|
||||
|
||||
/*
|
||||
* Membership conf generation. It's not strictly required because on
|
||||
* mismatch safekeeper is expected to ERROR the connection, but let's
|
||||
* sanity check it.
|
||||
*/
|
||||
Generation generation;
|
||||
term_t term;
|
||||
uint64 voteGiven;
|
||||
uint8 voteGiven;
|
||||
|
||||
/*
|
||||
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
|
||||
@@ -214,7 +287,6 @@ typedef struct VoteResponse
|
||||
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for*
|
||||
* recovery of some safekeeper */
|
||||
TermHistory termHistory;
|
||||
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
|
||||
} VoteResponse;
|
||||
|
||||
/*
|
||||
@@ -223,20 +295,37 @@ typedef struct VoteResponse
|
||||
*/
|
||||
typedef struct ProposerElected
|
||||
{
|
||||
uint64 tag;
|
||||
AcceptorProposerMessage apm;
|
||||
Generation generation; /* membership conf generation */
|
||||
term_t term;
|
||||
/* proposer will send since this point */
|
||||
XLogRecPtr startStreamingAt;
|
||||
/* history of term switches up to this proposer */
|
||||
TermHistory *termHistory;
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
} ProposerElected;
|
||||
|
||||
/*
|
||||
* Header of request with WAL message sent from proposer to safekeeper.
|
||||
*/
|
||||
typedef struct AppendRequestHeader
|
||||
{
|
||||
AcceptorProposerMessage apm;
|
||||
Generation generation; /* membership conf generation */
|
||||
term_t term; /* term of the proposer */
|
||||
XLogRecPtr beginLsn; /* start position of message in WAL */
|
||||
XLogRecPtr endLsn; /* end position of message in WAL */
|
||||
XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
|
||||
|
||||
/*
|
||||
* minimal LSN which may be needed for recovery of some safekeeper (end
|
||||
* lsn + 1 of last chunk streamed to everyone)
|
||||
*/
|
||||
XLogRecPtr truncateLsn;
|
||||
/* in the AppendRequest message, WAL data follows */
|
||||
} AppendRequestHeader;
|
||||
|
||||
/* protocol v2 variant, kept while wp supports it */
|
||||
typedef struct AppendRequestHeaderV2
|
||||
{
|
||||
uint64 tag;
|
||||
term_t term; /* term of the proposer */
|
||||
@@ -256,7 +345,8 @@ typedef struct AppendRequestHeader
|
||||
*/
|
||||
XLogRecPtr truncateLsn;
|
||||
pg_uuid_t proposerId; /* for monitoring/debugging */
|
||||
} AppendRequestHeader;
|
||||
/* in the AppendRequest message, WAL data follows */
|
||||
} AppendRequestHeaderV2;
|
||||
|
||||
/*
|
||||
* Hot standby feedback received from replica
|
||||
@@ -309,6 +399,13 @@ typedef struct AppendResponse
|
||||
{
|
||||
AcceptorProposerMessage apm;
|
||||
|
||||
/*
|
||||
* Membership conf generation. It's not strictly required because on
|
||||
* mismatch safekeeper is expected to ERROR the connection, but let's
|
||||
* sanity check it.
|
||||
*/
|
||||
Generation generation;
|
||||
|
||||
/*
|
||||
* Current term of the safekeeper; if it is higher than proposer's, the
|
||||
* compute is out of date.
|
||||
@@ -644,6 +741,8 @@ typedef struct WalProposerConfig
|
||||
/* Will be passed to safekeepers in greet request. */
|
||||
TimeLineID pgTimeline;
|
||||
|
||||
int proto_version;
|
||||
|
||||
#ifdef WALPROPOSER_LIB
|
||||
void *callback_data;
|
||||
#endif
|
||||
@@ -656,11 +755,14 @@ typedef struct WalProposerConfig
|
||||
typedef struct WalProposer
|
||||
{
|
||||
WalProposerConfig *config;
|
||||
int n_safekeepers;
|
||||
/* Current walproposer membership configuration */
|
||||
MembershipConfiguration mconf;
|
||||
|
||||
/* (n_safekeepers / 2) + 1 */
|
||||
int quorum;
|
||||
|
||||
/* Number of occupied slots in safekeepers[] */
|
||||
int n_safekeepers;
|
||||
Safekeeper safekeeper[MAX_SAFEKEEPERS];
|
||||
|
||||
/* WAL has been generated up to this point */
|
||||
@@ -670,6 +772,7 @@ typedef struct WalProposer
|
||||
XLogRecPtr commitLsn;
|
||||
|
||||
ProposerGreeting greetRequest;
|
||||
ProposerGreetingV2 greetRequestV2;
|
||||
|
||||
/* Vote request for safekeeper */
|
||||
VoteRequest voteRequest;
|
||||
|
||||
@@ -117,14 +117,13 @@ pq_getmsgbytes(StringInfo msg, int datalen)
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_getmsgstring - get a null-terminated text string (with conversion)
|
||||
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
|
||||
*
|
||||
* May return a pointer directly into the message buffer, or a pointer
|
||||
* to a palloc'd conversion result.
|
||||
* Returns a pointer directly into the message buffer.
|
||||
* --------------------------------
|
||||
*/
|
||||
const char *
|
||||
pq_getmsgstring(StringInfo msg)
|
||||
pq_getmsgrawstring(StringInfo msg)
|
||||
{
|
||||
char *str;
|
||||
int slen;
|
||||
@@ -155,6 +154,45 @@ pq_getmsgend(StringInfo msg)
|
||||
ExceptionalCondition("invalid msg format", __FILE__, __LINE__);
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_sendbytes - append raw data to a StringInfo buffer
|
||||
* --------------------------------
|
||||
*/
|
||||
void
|
||||
pq_sendbytes(StringInfo buf, const void *data, int datalen)
|
||||
{
|
||||
/* use variant that maintains a trailing null-byte, out of caution */
|
||||
appendBinaryStringInfo(buf, data, datalen);
|
||||
}
|
||||
|
||||
/* --------------------------------
|
||||
* pq_send_ascii_string - append a null-terminated text string (without conversion)
|
||||
*
|
||||
* This function intentionally bypasses encoding conversion, instead just
|
||||
* silently replacing any non-7-bit-ASCII characters with question marks.
|
||||
* It is used only when we are having trouble sending an error message to
|
||||
* the client with normal localization and encoding conversion. The caller
|
||||
* should already have taken measures to ensure the string is just ASCII;
|
||||
* the extra work here is just to make certain we don't send a badly encoded
|
||||
* string to the client (which might or might not be robust about that).
|
||||
*
|
||||
* NB: passed text string must be null-terminated, and so is the data
|
||||
* sent to the frontend.
|
||||
* --------------------------------
|
||||
*/
|
||||
void
|
||||
pq_send_ascii_string(StringInfo buf, const char *str)
|
||||
{
|
||||
while (*str)
|
||||
{
|
||||
char ch = *str++;
|
||||
|
||||
if (IS_HIGHBIT_SET(ch))
|
||||
ch = '?';
|
||||
appendStringInfoCharMacro(buf, ch);
|
||||
}
|
||||
appendStringInfoChar(buf, '\0');
|
||||
}
|
||||
|
||||
/*
|
||||
* Produce a C-string representation of a TimestampTz.
|
||||
|
||||
@@ -59,9 +59,11 @@
|
||||
|
||||
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
|
||||
|
||||
/* GUCs */
|
||||
char *wal_acceptors_list = "";
|
||||
int wal_acceptor_reconnect_timeout = 1000;
|
||||
int wal_acceptor_connection_timeout = 10000;
|
||||
int safekeeper_proto_version = 2;
|
||||
|
||||
/* Set to true in the walproposer bgw. */
|
||||
static bool am_walproposer;
|
||||
@@ -126,6 +128,7 @@ init_walprop_config(bool syncSafekeepers)
|
||||
else
|
||||
walprop_config.systemId = 0;
|
||||
walprop_config.pgTimeline = walprop_pg_get_timeline_id();
|
||||
walprop_config.proto_version = safekeeper_proto_version;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -219,25 +222,37 @@ nwp_register_gucs(void)
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MS,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"neon.safekeeper_proto_version",
|
||||
"Version of compute <-> safekeeper protocol.",
|
||||
"Used while migrating from 2 to 3.",
|
||||
&safekeeper_proto_version,
|
||||
2, 0, INT_MAX,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
|
||||
{
|
||||
int n_safekeepers = 0;
|
||||
char *curr_sk = safekeepers_list;
|
||||
int n_safekeepers = 0;
|
||||
char *curr_sk = safekeepers_list;
|
||||
|
||||
for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma)
|
||||
{
|
||||
if (++n_safekeepers >= MAX_SAFEKEEPERS) {
|
||||
if (++n_safekeepers >= MAX_SAFEKEEPERS)
|
||||
{
|
||||
wpg_log(FATAL, "too many safekeepers");
|
||||
}
|
||||
|
||||
coma = strchr(coma, ',');
|
||||
safekeepers[n_safekeepers-1] = curr_sk;
|
||||
safekeepers[n_safekeepers - 1] = curr_sk;
|
||||
|
||||
if (coma != NULL) {
|
||||
if (coma != NULL)
|
||||
{
|
||||
*coma++ = '\0';
|
||||
}
|
||||
}
|
||||
@@ -252,10 +267,10 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
|
||||
static bool
|
||||
safekeepers_cmp(char *old, char *new)
|
||||
{
|
||||
char *safekeepers_old[MAX_SAFEKEEPERS];
|
||||
char *safekeepers_new[MAX_SAFEKEEPERS];
|
||||
int len_old = 0;
|
||||
int len_new = 0;
|
||||
char *safekeepers_old[MAX_SAFEKEEPERS];
|
||||
char *safekeepers_new[MAX_SAFEKEEPERS];
|
||||
int len_old = 0;
|
||||
int len_new = 0;
|
||||
|
||||
len_old = split_safekeepers_list(old, safekeepers_old);
|
||||
len_new = split_safekeepers_list(new, safekeepers_new);
|
||||
@@ -292,7 +307,8 @@ assign_neon_safekeepers(const char *newval, void *extra)
|
||||
if (!am_walproposer)
|
||||
return;
|
||||
|
||||
if (!newval) {
|
||||
if (!newval)
|
||||
{
|
||||
/* should never happen */
|
||||
wpg_log(FATAL, "neon.safekeepers is empty");
|
||||
}
|
||||
@@ -301,11 +317,11 @@ assign_neon_safekeepers(const char *newval, void *extra)
|
||||
newval_copy = pstrdup(newval);
|
||||
oldval = pstrdup(wal_acceptors_list);
|
||||
|
||||
/*
|
||||
/*
|
||||
* TODO: restarting through FATAL is stupid and introduces 1s delay before
|
||||
* next bgw start. We should refactor walproposer to allow graceful exit and
|
||||
* thus remove this delay.
|
||||
* XXX: If you change anything here, sync with test_safekeepers_reconfigure_reorder.
|
||||
* next bgw start. We should refactor walproposer to allow graceful exit
|
||||
* and thus remove this delay. XXX: If you change anything here, sync with
|
||||
* test_safekeepers_reconfigure_reorder.
|
||||
*/
|
||||
if (!safekeepers_cmp(oldval, newval_copy))
|
||||
{
|
||||
@@ -454,7 +470,8 @@ backpressure_throttling_impl(void)
|
||||
memcpy(new_status, old_status, len);
|
||||
snprintf(new_status + len, 64, "backpressure throttling: lag %lu", lag);
|
||||
set_ps_display(new_status);
|
||||
new_status[len] = '\0'; /* truncate off " backpressure ..." to later reset the ps */
|
||||
new_status[len] = '\0'; /* truncate off " backpressure ..." to later
|
||||
* reset the ps */
|
||||
|
||||
elog(DEBUG2, "backpressure throttling: lag %lu", lag);
|
||||
start = GetCurrentTimestamp();
|
||||
@@ -621,7 +638,7 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos)
|
||||
wpg_log(LOG, "WAL proposer starts streaming at %X/%X",
|
||||
LSN_FORMAT_ARGS(startpos));
|
||||
cmd.slotname = WAL_PROPOSER_SLOT_NAME;
|
||||
cmd.timeline = wp->greetRequest.timeline;
|
||||
cmd.timeline = wp->config->pgTimeline;
|
||||
cmd.startpoint = startpos;
|
||||
StartProposerReplication(wp, &cmd);
|
||||
}
|
||||
@@ -1963,10 +1980,11 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||
FullTransactionId xmin = hsFeedback.xmin;
|
||||
FullTransactionId catalog_xmin = hsFeedback.catalog_xmin;
|
||||
FullTransactionId next_xid = ReadNextFullTransactionId();
|
||||
|
||||
/*
|
||||
* Page server is updating nextXid in checkpoint each 1024 transactions,
|
||||
* so feedback xmin can be actually larger then nextXid and
|
||||
* function TransactionIdInRecentPast return false in this case,
|
||||
* Page server is updating nextXid in checkpoint each 1024
|
||||
* transactions, so feedback xmin can be actually larger then nextXid
|
||||
* and function TransactionIdInRecentPast return false in this case,
|
||||
* preventing update of slot's xmin.
|
||||
*/
|
||||
if (FullTransactionIdPrecedes(next_xid, xmin))
|
||||
|
||||
@@ -88,13 +88,12 @@ fn bench_process_msg(c: &mut Criterion) {
|
||||
let (lsn, record) = walgen.next().expect("endless WAL");
|
||||
ProposerAcceptorMessage::AppendRequest(AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
generation: 0,
|
||||
term: 1,
|
||||
term_start_lsn: Lsn(0),
|
||||
begin_lsn: lsn,
|
||||
end_lsn: lsn + record.len() as u64,
|
||||
commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
|
||||
truncate_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
},
|
||||
wal_data: record,
|
||||
})
|
||||
@@ -160,13 +159,12 @@ fn bench_wal_acceptor(c: &mut Criterion) {
|
||||
.take(n)
|
||||
.map(|(lsn, record)| AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
generation: 0,
|
||||
term: 1,
|
||||
term_start_lsn: Lsn(0),
|
||||
begin_lsn: lsn,
|
||||
end_lsn: lsn + record.len() as u64,
|
||||
commit_lsn: Lsn(0),
|
||||
truncate_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
},
|
||||
wal_data: record,
|
||||
})
|
||||
@@ -262,13 +260,12 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
|
||||
runtime.block_on(async {
|
||||
let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
generation: 0,
|
||||
term: 1,
|
||||
term_start_lsn: Lsn(0),
|
||||
begin_lsn: lsn,
|
||||
end_lsn: lsn + record.len() as u64,
|
||||
commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
|
||||
truncate_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
},
|
||||
wal_data: record,
|
||||
});
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
use anyhow::Context;
|
||||
use postgres_backend::QueryError;
|
||||
use safekeeper_api::membership::Configuration;
|
||||
use safekeeper_api::membership::{Configuration, INVALID_GENERATION};
|
||||
use safekeeper_api::{ServerInfo, Term};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@@ -133,10 +133,10 @@ async fn send_proposer_elected(
|
||||
let history = TermHistory(history_entries);
|
||||
|
||||
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
generation: INVALID_GENERATION,
|
||||
term,
|
||||
start_streaming_at: lsn,
|
||||
term_history: history,
|
||||
timeline_start_lsn: lsn,
|
||||
});
|
||||
|
||||
tli.process_msg(&proposer_elected_request).await?;
|
||||
@@ -170,13 +170,12 @@ pub async fn append_logical_message(
|
||||
|
||||
let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
generation: INVALID_GENERATION,
|
||||
term: msg.term,
|
||||
term_start_lsn: begin_lsn,
|
||||
begin_lsn,
|
||||
end_lsn,
|
||||
commit_lsn,
|
||||
truncate_lsn: msg.truncate_lsn,
|
||||
proposer_uuid: [0u8; 16],
|
||||
},
|
||||
wal_data,
|
||||
});
|
||||
|
||||
@@ -281,7 +281,7 @@ impl SafekeeperPostgresHandler {
|
||||
tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
|
||||
r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
|
||||
r = network_write(pgb, reply_rx, pageserver_feedback_rx, proto_version) => r,
|
||||
_ = timeline_cancel.cancelled() => {
|
||||
return Err(CopyStreamHandlerEnd::Cancelled);
|
||||
}
|
||||
@@ -342,8 +342,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
|
||||
let tli = match next_msg {
|
||||
ProposerAcceptorMessage::Greeting(ref greeting) => {
|
||||
info!(
|
||||
"start handshake with walproposer {} sysid {} timeline {}",
|
||||
self.peer_addr, greeting.system_id, greeting.tli,
|
||||
"start handshake with walproposer {} sysid {}",
|
||||
self.peer_addr, greeting.system_id,
|
||||
);
|
||||
let server_info = ServerInfo {
|
||||
pg_version: greeting.pg_version,
|
||||
@@ -459,6 +459,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
pgb_writer: &mut PostgresBackend<IO>,
|
||||
mut reply_rx: Receiver<AcceptorProposerMessage>,
|
||||
mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
|
||||
proto_version: u32,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
|
||||
@@ -496,7 +497,7 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
};
|
||||
|
||||
buf.clear();
|
||||
msg.serialize(&mut buf)?;
|
||||
msg.serialize(&mut buf, proto_version)?;
|
||||
pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::{fmt, pin::pin};
|
||||
use anyhow::{bail, Context};
|
||||
use futures::StreamExt;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use safekeeper_api::membership::INVALID_GENERATION;
|
||||
use safekeeper_api::models::{PeerInfo, TimelineStatus};
|
||||
use safekeeper_api::Term;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
@@ -267,7 +268,10 @@ async fn recover(
|
||||
);
|
||||
|
||||
// Now understand our term history.
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term });
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
|
||||
generation: INVALID_GENERATION,
|
||||
term: donor.term,
|
||||
});
|
||||
let vote_response = match tli
|
||||
.process_msg(&vote_request)
|
||||
.await
|
||||
@@ -302,10 +306,10 @@ async fn recover(
|
||||
|
||||
// truncate WAL locally
|
||||
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
generation: INVALID_GENERATION,
|
||||
term: donor.term,
|
||||
start_streaming_at: last_common_point.lsn,
|
||||
term_history: donor_th,
|
||||
timeline_start_lsn: Lsn::INVALID,
|
||||
});
|
||||
// Successful ProposerElected handling always returns None. If term changed,
|
||||
// we'll find out that during the streaming. Note: it is expected to get
|
||||
@@ -434,13 +438,12 @@ async fn network_io(
|
||||
match msg {
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
let ar_hdr = AppendRequestHeader {
|
||||
generation: INVALID_GENERATION,
|
||||
term: donor.term,
|
||||
term_start_lsn: Lsn::INVALID, // unused
|
||||
begin_lsn: Lsn(xlog_data.wal_start()),
|
||||
end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64,
|
||||
commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it
|
||||
truncate_lsn: Lsn::INVALID, // do not attempt to advance
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let ar = AppendRequest {
|
||||
h: ar_hdr,
|
||||
|
||||
@@ -5,6 +5,11 @@ use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
|
||||
use safekeeper_api::membership;
|
||||
use safekeeper_api::membership::Generation;
|
||||
use safekeeper_api::membership::MemberSet;
|
||||
use safekeeper_api::membership::SafekeeperId;
|
||||
use safekeeper_api::membership::INVALID_GENERATION;
|
||||
use safekeeper_api::models::HotStandbyFeedback;
|
||||
use safekeeper_api::Term;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -12,6 +17,7 @@ use std::cmp::max;
|
||||
use std::cmp::min;
|
||||
use std::fmt;
|
||||
use std::io::Read;
|
||||
use std::str::FromStr;
|
||||
use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
|
||||
use tracing::*;
|
||||
@@ -29,7 +35,8 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
pub const SK_PROTOCOL_VERSION: u32 = 2;
|
||||
pub const SK_PROTO_VERSION_2: u32 = 2;
|
||||
pub const SK_PROTO_VERSION_3: u32 = 3;
|
||||
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||
@@ -56,8 +63,28 @@ impl TermHistory {
|
||||
TermHistory(Vec::new())
|
||||
}
|
||||
|
||||
// Parse TermHistory as n_entries followed by TermLsn pairs
|
||||
// Parse TermHistory as n_entries followed by TermLsn pairs in network order.
|
||||
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
|
||||
let n_entries = bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "TermHistory misses len")?;
|
||||
let mut res = Vec::with_capacity(n_entries as usize);
|
||||
for i in 0..n_entries {
|
||||
let term = bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| format!("TermHistory pos {} misses term", i))?;
|
||||
let lsn = bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| format!("TermHistory pos {} misses lsn", i))?
|
||||
.into();
|
||||
res.push(TermLsn { term, lsn })
|
||||
}
|
||||
Ok(TermHistory(res))
|
||||
}
|
||||
|
||||
// Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
|
||||
// TODO remove once v2 protocol is fully dropped.
|
||||
pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
|
||||
if bytes.remaining() < 4 {
|
||||
bail!("TermHistory misses len");
|
||||
}
|
||||
@@ -197,6 +224,18 @@ impl AcceptorState {
|
||||
/// Initial Proposer -> Acceptor message
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ProposerGreeting {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub mconf: membership::Configuration,
|
||||
/// Postgres server version
|
||||
pub pg_version: u32,
|
||||
pub system_id: SystemId,
|
||||
pub wal_seg_size: u32,
|
||||
}
|
||||
|
||||
/// V2 of the message; exists as a struct because we (de)serialized it as is.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ProposerGreetingV2 {
|
||||
/// proposer-acceptor protocol version
|
||||
pub protocol_version: u32,
|
||||
/// Postgres server version
|
||||
@@ -213,27 +252,35 @@ pub struct ProposerGreeting {
|
||||
/// (acceptor voted for).
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct AcceptorGreeting {
|
||||
term: u64,
|
||||
node_id: NodeId,
|
||||
mconf: membership::Configuration,
|
||||
term: u64,
|
||||
}
|
||||
|
||||
/// Vote request sent from proposer to safekeepers
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct VoteRequest {
|
||||
pub generation: Generation,
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
/// V2 of the message; exists as a struct because we (de)serialized it as is.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct VoteRequestV2 {
|
||||
pub term: Term,
|
||||
}
|
||||
|
||||
/// Vote itself, sent from safekeeper to proposer
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct VoteResponse {
|
||||
generation: Generation, // membership conf generation
|
||||
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
vote_given: u64, // fixme u64 due to padding
|
||||
vote_given: bool,
|
||||
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
|
||||
// proposer to choose the most advanced one.
|
||||
pub flush_lsn: Lsn,
|
||||
truncate_lsn: Lsn,
|
||||
pub term_history: TermHistory,
|
||||
timeline_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -242,10 +289,10 @@ pub struct VoteResponse {
|
||||
*/
|
||||
#[derive(Debug)]
|
||||
pub struct ProposerElected {
|
||||
pub generation: Generation, // membership conf generation
|
||||
pub term: Term,
|
||||
pub start_streaming_at: Lsn,
|
||||
pub term_history: TermHistory,
|
||||
pub timeline_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// Request with WAL message sent from proposer to safekeeper. Along the way it
|
||||
@@ -257,6 +304,22 @@ pub struct AppendRequest {
|
||||
}
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct AppendRequestHeader {
|
||||
pub generation: Generation, // membership conf generation
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
/// start position of message in WAL
|
||||
pub begin_lsn: Lsn,
|
||||
/// end position of message in WAL
|
||||
pub end_lsn: Lsn,
|
||||
/// LSN committed by quorum of safekeepers
|
||||
pub commit_lsn: Lsn,
|
||||
/// minimal LSN which may be needed by proposer to perform recovery of some safekeeper
|
||||
pub truncate_lsn: Lsn,
|
||||
}
|
||||
|
||||
/// V2 of the message; exists as a struct because we (de)serialized it as is.
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct AppendRequestHeaderV2 {
|
||||
// safekeeper's current term; if it is higher than proposer's, the compute is out of date.
|
||||
pub term: Term,
|
||||
// TODO: remove this field from the protocol, it in unused -- LSN of term
|
||||
@@ -277,6 +340,9 @@ pub struct AppendRequestHeader {
|
||||
/// Report safekeeper state to proposer
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct AppendResponse {
|
||||
// Membership conf generation. Not strictly required because on mismatch
|
||||
// connection is reset, but let's sanity check it.
|
||||
generation: Generation,
|
||||
// Current term of the safekeeper; if it is higher than proposer's, the
|
||||
// compute is out of date.
|
||||
pub term: Term,
|
||||
@@ -293,8 +359,9 @@ pub struct AppendResponse {
|
||||
}
|
||||
|
||||
impl AppendResponse {
|
||||
fn term_only(term: Term) -> AppendResponse {
|
||||
fn term_only(generation: Generation, term: Term) -> AppendResponse {
|
||||
AppendResponse {
|
||||
generation,
|
||||
term,
|
||||
flush_lsn: Lsn(0),
|
||||
commit_lsn: Lsn(0),
|
||||
@@ -315,72 +382,316 @@ pub enum ProposerAcceptorMessage {
|
||||
FlushWAL,
|
||||
}
|
||||
|
||||
impl ProposerAcceptorMessage {
|
||||
/// Parse proposer message.
|
||||
pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
|
||||
if proto_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
"incompatible protocol version {}, expected {}",
|
||||
proto_version,
|
||||
SK_PROTOCOL_VERSION
|
||||
);
|
||||
/// Augment Bytes with fallible get_uN where N is number of bytes methods.
|
||||
/// All reads are in network (big endian) order.
|
||||
trait BytesF {
|
||||
fn get_u8_f(&mut self) -> Result<u8>;
|
||||
fn get_u16_f(&mut self) -> Result<u16>;
|
||||
fn get_u32_f(&mut self) -> Result<u32>;
|
||||
fn get_u64_f(&mut self) -> Result<u64>;
|
||||
}
|
||||
|
||||
impl BytesF for Bytes {
|
||||
fn get_u8_f(&mut self) -> Result<u8> {
|
||||
if self.is_empty() {
|
||||
bail!("no bytes left, expected 1");
|
||||
}
|
||||
// xxx using Reader is inefficient but easy to work with bincode
|
||||
let mut stream = msg_bytes.reader();
|
||||
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
|
||||
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let msg = ProposerGreeting::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::Greeting(msg))
|
||||
}
|
||||
'v' => {
|
||||
let msg = VoteRequest::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(msg))
|
||||
}
|
||||
'e' => {
|
||||
let mut msg_bytes = stream.into_inner();
|
||||
if msg_bytes.remaining() < 16 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let term = msg_bytes.get_u64_le();
|
||||
let start_streaming_at = msg_bytes.get_u64_le().into();
|
||||
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
|
||||
if msg_bytes.remaining() < 8 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let timeline_start_lsn = msg_bytes.get_u64_le().into();
|
||||
let msg = ProposerElected {
|
||||
term,
|
||||
start_streaming_at,
|
||||
timeline_start_lsn,
|
||||
term_history,
|
||||
Ok(self.get_u8())
|
||||
}
|
||||
fn get_u16_f(&mut self) -> Result<u16> {
|
||||
if self.remaining() < 2 {
|
||||
bail!("no bytes left, expected 2");
|
||||
}
|
||||
Ok(self.get_u16())
|
||||
}
|
||||
fn get_u32_f(&mut self) -> Result<u32> {
|
||||
if self.remaining() < 4 {
|
||||
bail!("only {} bytes left, expected 4", self.remaining());
|
||||
}
|
||||
Ok(self.get_u32())
|
||||
}
|
||||
fn get_u64_f(&mut self) -> Result<u64> {
|
||||
if self.remaining() < 8 {
|
||||
bail!("only {} bytes left, expected 8", self.remaining());
|
||||
}
|
||||
Ok(self.get_u64())
|
||||
}
|
||||
}
|
||||
|
||||
impl ProposerAcceptorMessage {
|
||||
/// Read cstring from Bytes.
|
||||
fn get_cstr(buf: &mut Bytes) -> Result<String> {
|
||||
let pos = buf
|
||||
.iter()
|
||||
.position(|x| *x == 0)
|
||||
.ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
|
||||
let result = buf.split_to(pos);
|
||||
buf.advance(1); // drop the null terminator
|
||||
match std::str::from_utf8(&result) {
|
||||
Ok(s) => Ok(s.to_string()),
|
||||
Err(e) => bail!("invalid utf8 in cstring: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Read membership::Configuration from Bytes.
|
||||
fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
|
||||
let generation = buf.get_u32_f().with_context(|| "reading generation")?;
|
||||
let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
|
||||
// Main member set must have at least someone in valid configuration.
|
||||
// Empty conf is allowed until we fully migrate.
|
||||
if generation != INVALID_GENERATION && members_len == 0 {
|
||||
bail!("empty members_len");
|
||||
}
|
||||
let mut members = MemberSet::empty();
|
||||
for i in 0..members_len {
|
||||
let id = buf
|
||||
.get_u64_f()
|
||||
.with_context(|| format!("reading member {} node_id", i))?;
|
||||
let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
|
||||
let pg_port = buf
|
||||
.get_u16_f()
|
||||
.with_context(|| format!("reading member {} port", i))?;
|
||||
let sk = SafekeeperId {
|
||||
id: NodeId(id),
|
||||
host,
|
||||
pg_port,
|
||||
};
|
||||
members.add(sk)?;
|
||||
}
|
||||
let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
|
||||
// Non joint conf.
|
||||
if new_members_len == 0 {
|
||||
Ok(membership::Configuration {
|
||||
generation,
|
||||
members,
|
||||
new_members: None,
|
||||
})
|
||||
} else {
|
||||
let mut new_members = MemberSet::empty();
|
||||
for i in 0..new_members_len {
|
||||
let id = buf
|
||||
.get_u64_f()
|
||||
.with_context(|| format!("reading new member {} node_id", i))?;
|
||||
let host = Self::get_cstr(buf)
|
||||
.with_context(|| format!("reading new member {} host", i))?;
|
||||
let pg_port = buf
|
||||
.get_u16_f()
|
||||
.with_context(|| format!("reading new member {} port", i))?;
|
||||
let sk = SafekeeperId {
|
||||
id: NodeId(id),
|
||||
host,
|
||||
pg_port,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Elected(msg))
|
||||
new_members.add(sk)?;
|
||||
}
|
||||
'a' => {
|
||||
// read header followed by wal data
|
||||
let hdr = AppendRequestHeader::des_from(&mut stream)?;
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.context("begin_lsn > end_lsn in AppendRequest")?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
Ok(membership::Configuration {
|
||||
generation,
|
||||
members,
|
||||
new_members: Some(new_members),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse proposer message.
|
||||
pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
|
||||
if proto_version == SK_PROTO_VERSION_3 {
|
||||
if msg_bytes.is_empty() {
|
||||
bail!("ProposerAcceptorMessage is not complete: missing tag");
|
||||
}
|
||||
let tag = msg_bytes.get_u8_f().with_context(|| {
|
||||
"ProposerAcceptorMessage is not complete: missing tag".to_string()
|
||||
})? as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let tenant_id_str =
|
||||
Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
|
||||
let tenant_id = TenantId::from_str(&tenant_id_str)?;
|
||||
let timeline_id_str =
|
||||
Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
|
||||
let timeline_id = TimelineId::from_str(&timeline_id_str)?;
|
||||
let mconf = Self::get_mconf(&mut msg_bytes)?;
|
||||
let pg_version = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading pg_version")?;
|
||||
let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
|
||||
let wal_seg_size = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading wal_seg_size")?;
|
||||
let g = ProposerGreeting {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
mconf,
|
||||
pg_version,
|
||||
system_id,
|
||||
wal_seg_size,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Greeting(g))
|
||||
}
|
||||
'v' => {
|
||||
let generation = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading generation")?;
|
||||
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
|
||||
let v = VoteRequest { generation, term };
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(v))
|
||||
}
|
||||
'e' => {
|
||||
let generation = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading generation")?;
|
||||
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
|
||||
let start_streaming_at: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading start_streaming_at")?
|
||||
.into();
|
||||
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
|
||||
let msg = ProposerElected {
|
||||
generation,
|
||||
term,
|
||||
start_streaming_at,
|
||||
term_history,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Elected(msg))
|
||||
}
|
||||
'a' => {
|
||||
let generation = msg_bytes
|
||||
.get_u32_f()
|
||||
.with_context(|| "reading generation")?;
|
||||
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
|
||||
let begin_lsn: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading begin_lsn")?
|
||||
.into();
|
||||
let end_lsn: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading end_lsn")?
|
||||
.into();
|
||||
let commit_lsn: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading commit_lsn")?
|
||||
.into();
|
||||
let truncate_lsn: Lsn = msg_bytes
|
||||
.get_u64_f()
|
||||
.with_context(|| "reading truncate_lsn")?
|
||||
.into();
|
||||
let hdr = AppendRequestHeader {
|
||||
generation,
|
||||
term,
|
||||
begin_lsn,
|
||||
end_lsn,
|
||||
commit_lsn,
|
||||
truncate_lsn,
|
||||
};
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.context("begin_lsn > end_lsn in AppendRequest")?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
}
|
||||
if msg_bytes.remaining() < rec_size {
|
||||
bail!(
|
||||
"reading WAL: only {} bytes left, wanted {}",
|
||||
msg_bytes.remaining(),
|
||||
rec_size
|
||||
);
|
||||
}
|
||||
let wal_data = msg_bytes.copy_to_bytes(rec_size);
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
|
||||
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
|
||||
stream.read_exact(&mut wal_data_vec)?;
|
||||
let wal_data = Bytes::from(wal_data_vec);
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
} else if proto_version == SK_PROTO_VERSION_2 {
|
||||
// xxx using Reader is inefficient but easy to work with bincode
|
||||
let mut stream = msg_bytes.reader();
|
||||
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
|
||||
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
|
||||
let g = ProposerGreeting {
|
||||
tenant_id: msgv2.tenant_id,
|
||||
timeline_id: msgv2.timeline_id,
|
||||
mconf: membership::Configuration {
|
||||
generation: INVALID_GENERATION,
|
||||
members: MemberSet::empty(),
|
||||
new_members: None,
|
||||
},
|
||||
pg_version: msgv2.pg_version,
|
||||
system_id: msgv2.system_id,
|
||||
wal_seg_size: msgv2.wal_seg_size,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Greeting(g))
|
||||
}
|
||||
'v' => {
|
||||
let msg = VoteRequestV2::des_from(&mut stream)?;
|
||||
let v = VoteRequest {
|
||||
generation: INVALID_GENERATION,
|
||||
term: msg.term,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(v))
|
||||
}
|
||||
'e' => {
|
||||
let mut msg_bytes = stream.into_inner();
|
||||
if msg_bytes.remaining() < 16 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let term = msg_bytes.get_u64_le();
|
||||
let start_streaming_at = msg_bytes.get_u64_le().into();
|
||||
let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
|
||||
if msg_bytes.remaining() < 8 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let _timeline_start_lsn = msg_bytes.get_u64_le();
|
||||
let msg = ProposerElected {
|
||||
generation: INVALID_GENERATION,
|
||||
term,
|
||||
start_streaming_at,
|
||||
term_history,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Elected(msg))
|
||||
}
|
||||
'a' => {
|
||||
// read header followed by wal data
|
||||
let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?;
|
||||
let hdr = AppendRequestHeader {
|
||||
generation: INVALID_GENERATION,
|
||||
term: hdrv2.term,
|
||||
begin_lsn: hdrv2.begin_lsn,
|
||||
end_lsn: hdrv2.end_lsn,
|
||||
commit_lsn: hdrv2.commit_lsn,
|
||||
truncate_lsn: hdrv2.truncate_lsn,
|
||||
};
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.context("begin_lsn > end_lsn in AppendRequest")?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
}
|
||||
|
||||
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
|
||||
stream.read_exact(&mut wal_data_vec)?;
|
||||
let wal_data = Bytes::from(wal_data_vec);
|
||||
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
}
|
||||
} else {
|
||||
bail!("unsupported protocol version {}", proto_version);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -394,36 +705,21 @@ impl ProposerAcceptorMessage {
|
||||
// We explicitly list all fields, to draw attention here when new fields are added.
|
||||
let mut size = BASE_SIZE;
|
||||
size += match self {
|
||||
Self::Greeting(ProposerGreeting {
|
||||
protocol_version: _,
|
||||
pg_version: _,
|
||||
proposer_id: _,
|
||||
system_id: _,
|
||||
timeline_id: _,
|
||||
tenant_id: _,
|
||||
tli: _,
|
||||
wal_seg_size: _,
|
||||
}) => 0,
|
||||
Self::Greeting(_) => 0,
|
||||
|
||||
Self::VoteRequest(VoteRequest { term: _ }) => 0,
|
||||
Self::VoteRequest(_) => 0,
|
||||
|
||||
Self::Elected(ProposerElected {
|
||||
term: _,
|
||||
start_streaming_at: _,
|
||||
term_history: _,
|
||||
timeline_start_lsn: _,
|
||||
}) => 0,
|
||||
Self::Elected(_) => 0,
|
||||
|
||||
Self::AppendRequest(AppendRequest {
|
||||
h:
|
||||
AppendRequestHeader {
|
||||
generation: _,
|
||||
term: _,
|
||||
term_start_lsn: _,
|
||||
begin_lsn: _,
|
||||
end_lsn: _,
|
||||
commit_lsn: _,
|
||||
truncate_lsn: _,
|
||||
proposer_uuid: _,
|
||||
},
|
||||
wal_data,
|
||||
}) => wal_data.len(),
|
||||
@@ -431,13 +727,12 @@ impl ProposerAcceptorMessage {
|
||||
Self::NoFlushAppendRequest(AppendRequest {
|
||||
h:
|
||||
AppendRequestHeader {
|
||||
generation: _,
|
||||
term: _,
|
||||
term_start_lsn: _,
|
||||
begin_lsn: _,
|
||||
end_lsn: _,
|
||||
commit_lsn: _,
|
||||
truncate_lsn: _,
|
||||
proposer_uuid: _,
|
||||
},
|
||||
wal_data,
|
||||
}) => wal_data.len(),
|
||||
@@ -458,45 +753,118 @@ pub enum AcceptorProposerMessage {
|
||||
}
|
||||
|
||||
impl AcceptorProposerMessage {
|
||||
/// Serialize acceptor -> proposer message.
|
||||
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
|
||||
match self {
|
||||
AcceptorProposerMessage::Greeting(msg) => {
|
||||
buf.put_u64_le('g' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.node_id.0);
|
||||
}
|
||||
AcceptorProposerMessage::VoteResponse(msg) => {
|
||||
buf.put_u64_le('v' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.vote_given);
|
||||
buf.put_u64_le(msg.flush_lsn.into());
|
||||
buf.put_u64_le(msg.truncate_lsn.into());
|
||||
buf.put_u32_le(msg.term_history.0.len() as u32);
|
||||
for e in &msg.term_history.0 {
|
||||
buf.put_u64_le(e.term);
|
||||
buf.put_u64_le(e.lsn.into());
|
||||
}
|
||||
buf.put_u64_le(msg.timeline_start_lsn.into());
|
||||
}
|
||||
AcceptorProposerMessage::AppendResponse(msg) => {
|
||||
buf.put_u64_le('a' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.flush_lsn.into());
|
||||
buf.put_u64_le(msg.commit_lsn.into());
|
||||
buf.put_i64_le(msg.hs_feedback.ts);
|
||||
buf.put_u64_le(msg.hs_feedback.xmin);
|
||||
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
|
||||
fn put_cstr(buf: &mut BytesMut, s: &str) {
|
||||
buf.put_slice(s.as_bytes());
|
||||
buf.put_u8(0); // null terminator
|
||||
}
|
||||
|
||||
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
|
||||
// if it is not present.
|
||||
if let Some(ref msg) = msg.pageserver_feedback {
|
||||
msg.serialize(buf);
|
||||
}
|
||||
}
|
||||
/// Serialize membership::Configuration into buf.
|
||||
fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
|
||||
buf.put_u32(mconf.generation);
|
||||
buf.put_u32(mconf.members.m.len() as u32);
|
||||
for sk in &mconf.members.m {
|
||||
buf.put_u64(sk.id.0);
|
||||
Self::put_cstr(buf, &sk.host);
|
||||
buf.put_u16(sk.pg_port);
|
||||
}
|
||||
if let Some(ref new_members) = mconf.new_members {
|
||||
buf.put_u32(new_members.m.len() as u32);
|
||||
for sk in &new_members.m {
|
||||
buf.put_u64(sk.id.0);
|
||||
Self::put_cstr(buf, &sk.host);
|
||||
buf.put_u16(sk.pg_port);
|
||||
}
|
||||
} else {
|
||||
buf.put_u32(0);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
/// Serialize acceptor -> proposer message.
|
||||
pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
|
||||
if proto_version == SK_PROTO_VERSION_3 {
|
||||
match self {
|
||||
AcceptorProposerMessage::Greeting(msg) => {
|
||||
buf.put_u8(b'g');
|
||||
buf.put_u64(msg.node_id.0);
|
||||
Self::serialize_mconf(buf, &msg.mconf);
|
||||
buf.put_u64(msg.term)
|
||||
}
|
||||
AcceptorProposerMessage::VoteResponse(msg) => {
|
||||
buf.put_u8(b'v');
|
||||
buf.put_u32(msg.generation);
|
||||
buf.put_u64(msg.term);
|
||||
buf.put_u8(msg.vote_given as u8);
|
||||
buf.put_u64(msg.flush_lsn.into());
|
||||
buf.put_u64(msg.truncate_lsn.into());
|
||||
buf.put_u32(msg.term_history.0.len() as u32);
|
||||
for e in &msg.term_history.0 {
|
||||
buf.put_u64(e.term);
|
||||
buf.put_u64(e.lsn.into());
|
||||
}
|
||||
}
|
||||
AcceptorProposerMessage::AppendResponse(msg) => {
|
||||
buf.put_u8(b'a');
|
||||
buf.put_u32(msg.generation);
|
||||
buf.put_u64(msg.term);
|
||||
buf.put_u64(msg.flush_lsn.into());
|
||||
buf.put_u64(msg.commit_lsn.into());
|
||||
buf.put_i64(msg.hs_feedback.ts);
|
||||
buf.put_u64(msg.hs_feedback.xmin);
|
||||
buf.put_u64(msg.hs_feedback.catalog_xmin);
|
||||
|
||||
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
|
||||
// if it is not present.
|
||||
if let Some(ref msg) = msg.pageserver_feedback {
|
||||
msg.serialize(buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
// TODO remove 3 after converting all msgs
|
||||
} else if proto_version == SK_PROTO_VERSION_2 {
|
||||
match self {
|
||||
AcceptorProposerMessage::Greeting(msg) => {
|
||||
buf.put_u64_le('g' as u64);
|
||||
// v2 didn't have mconf and fields were reordered
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.node_id.0);
|
||||
}
|
||||
AcceptorProposerMessage::VoteResponse(msg) => {
|
||||
// v2 didn't have generation, had u64 vote_given and timeline_start_lsn
|
||||
buf.put_u64_le('v' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.vote_given as u64);
|
||||
buf.put_u64_le(msg.flush_lsn.into());
|
||||
buf.put_u64_le(msg.truncate_lsn.into());
|
||||
buf.put_u32_le(msg.term_history.0.len() as u32);
|
||||
for e in &msg.term_history.0 {
|
||||
buf.put_u64_le(e.term);
|
||||
buf.put_u64_le(e.lsn.into());
|
||||
}
|
||||
// removed timeline_start_lsn
|
||||
buf.put_u64_le(0);
|
||||
}
|
||||
AcceptorProposerMessage::AppendResponse(msg) => {
|
||||
// v2 didn't have generation
|
||||
buf.put_u64_le('a' as u64);
|
||||
buf.put_u64_le(msg.term);
|
||||
buf.put_u64_le(msg.flush_lsn.into());
|
||||
buf.put_u64_le(msg.commit_lsn.into());
|
||||
buf.put_i64_le(msg.hs_feedback.ts);
|
||||
buf.put_u64_le(msg.hs_feedback.xmin);
|
||||
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
|
||||
|
||||
// AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback
|
||||
// if it is not present.
|
||||
if let Some(ref msg) = msg.pageserver_feedback {
|
||||
msg.serialize(buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
bail!("unsupported protocol version {}", proto_version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -593,14 +961,6 @@ where
|
||||
&mut self,
|
||||
msg: &ProposerGreeting,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
// Check protocol compatibility
|
||||
if msg.protocol_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
"incompatible protocol version {}, expected {}",
|
||||
msg.protocol_version,
|
||||
SK_PROTOCOL_VERSION
|
||||
);
|
||||
}
|
||||
/* Postgres major version mismatch is treated as fatal error
|
||||
* because safekeepers parse WAL headers and the format
|
||||
* may change between versions.
|
||||
@@ -655,15 +1015,16 @@ where
|
||||
self.state.finish_change(&state).await?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"processed greeting from walproposer {}, sending term {:?}",
|
||||
msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
|
||||
self.state.acceptor_state.term
|
||||
);
|
||||
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
|
||||
term: self.state.acceptor_state.term,
|
||||
let apg = AcceptorGreeting {
|
||||
node_id: self.node_id,
|
||||
})))
|
||||
mconf: self.state.mconf.clone(),
|
||||
term: self.state.acceptor_state.term,
|
||||
};
|
||||
info!(
|
||||
"processed greeting {:?} from walproposer, sending {:?}",
|
||||
msg, apg
|
||||
);
|
||||
Ok(Some(AcceptorProposerMessage::Greeting(apg)))
|
||||
}
|
||||
|
||||
/// Give vote for the given term, if we haven't done that previously.
|
||||
@@ -684,12 +1045,12 @@ where
|
||||
self.wal_store.flush_wal().await?;
|
||||
// initialize with refusal
|
||||
let mut resp = VoteResponse {
|
||||
generation: self.state.mconf.generation,
|
||||
term: self.state.acceptor_state.term,
|
||||
vote_given: false as u64,
|
||||
vote_given: false,
|
||||
flush_lsn: self.flush_lsn(),
|
||||
truncate_lsn: self.state.inmem.peer_horizon_lsn,
|
||||
term_history: self.get_term_history(),
|
||||
timeline_start_lsn: self.state.timeline_start_lsn,
|
||||
};
|
||||
if self.state.acceptor_state.term < msg.term {
|
||||
let mut state = self.state.start_change();
|
||||
@@ -698,15 +1059,16 @@ where
|
||||
self.state.finish_change(&state).await?;
|
||||
|
||||
resp.term = self.state.acceptor_state.term;
|
||||
resp.vote_given = true as u64;
|
||||
resp.vote_given = true;
|
||||
}
|
||||
info!("processed VoteRequest for term {}: {:?}", msg.term, &resp);
|
||||
info!("processed {:?}: sending {:?}", msg, &resp);
|
||||
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
|
||||
}
|
||||
|
||||
/// Form AppendResponse from current state.
|
||||
fn append_response(&self) -> AppendResponse {
|
||||
let ar = AppendResponse {
|
||||
generation: self.state.mconf.generation,
|
||||
term: self.state.acceptor_state.term,
|
||||
flush_lsn: self.flush_lsn(),
|
||||
commit_lsn: self.state.commit_lsn,
|
||||
@@ -805,18 +1167,22 @@ where
|
||||
// Here we learn initial LSN for the first time, set fields
|
||||
// interested in that.
|
||||
|
||||
if state.timeline_start_lsn == Lsn(0) {
|
||||
// Remember point where WAL begins globally.
|
||||
state.timeline_start_lsn = msg.timeline_start_lsn;
|
||||
info!(
|
||||
"setting timeline_start_lsn to {:?}",
|
||||
state.timeline_start_lsn
|
||||
);
|
||||
if let Some(start_lsn) = msg.term_history.0.first() {
|
||||
if state.timeline_start_lsn == Lsn(0) {
|
||||
// Remember point where WAL begins globally. In the future it
|
||||
// will be intialized immediately on timeline creation.
|
||||
state.timeline_start_lsn = start_lsn.lsn;
|
||||
info!(
|
||||
"setting timeline_start_lsn to {:?}",
|
||||
state.timeline_start_lsn
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if state.peer_horizon_lsn == Lsn(0) {
|
||||
// Update peer_horizon_lsn as soon as we know where timeline starts.
|
||||
// It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn.
|
||||
state.peer_horizon_lsn = msg.timeline_start_lsn;
|
||||
state.peer_horizon_lsn = state.timeline_start_lsn;
|
||||
}
|
||||
if state.local_start_lsn == Lsn(0) {
|
||||
state.local_start_lsn = msg.start_streaming_at;
|
||||
@@ -896,7 +1262,10 @@ where
|
||||
|
||||
// If our term is higher, immediately refuse the message.
|
||||
if self.state.acceptor_state.term > msg.h.term {
|
||||
let resp = AppendResponse::term_only(self.state.acceptor_state.term);
|
||||
let resp = AppendResponse::term_only(
|
||||
self.state.mconf.generation,
|
||||
self.state.acceptor_state.term,
|
||||
);
|
||||
return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
|
||||
}
|
||||
|
||||
@@ -924,10 +1293,8 @@ where
|
||||
);
|
||||
}
|
||||
|
||||
// Now we know that we are in the same term as the proposer,
|
||||
// processing the message.
|
||||
|
||||
self.state.inmem.proposer_uuid = msg.h.proposer_uuid;
|
||||
// Now we know that we are in the same term as the proposer, process the
|
||||
// message.
|
||||
|
||||
// do the job
|
||||
if !msg.wal_data.is_empty() {
|
||||
@@ -1097,10 +1464,13 @@ mod tests {
|
||||
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
|
||||
|
||||
// check voting for 1 is ok
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
|
||||
generation: 0,
|
||||
term: 1,
|
||||
});
|
||||
let mut vote_resp = sk.process_msg(&vote_request).await;
|
||||
match vote_resp.unwrap() {
|
||||
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
|
||||
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
|
||||
r => panic!("unexpected response: {:?}", r),
|
||||
}
|
||||
|
||||
@@ -1115,7 +1485,7 @@ mod tests {
|
||||
// and ensure voting second time for 1 is not ok
|
||||
vote_resp = sk.process_msg(&vote_request).await;
|
||||
match vote_resp.unwrap() {
|
||||
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
|
||||
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
|
||||
r => panic!("unexpected response: {:?}", r),
|
||||
}
|
||||
}
|
||||
@@ -1130,13 +1500,12 @@ mod tests {
|
||||
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
|
||||
|
||||
let mut ar_hdr = AppendRequestHeader {
|
||||
generation: 0,
|
||||
term: 2,
|
||||
term_start_lsn: Lsn(3),
|
||||
begin_lsn: Lsn(1),
|
||||
end_lsn: Lsn(2),
|
||||
commit_lsn: Lsn(0),
|
||||
truncate_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let mut append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
@@ -1144,6 +1513,7 @@ mod tests {
|
||||
};
|
||||
|
||||
let pem = ProposerElected {
|
||||
generation: 0,
|
||||
term: 2,
|
||||
start_streaming_at: Lsn(1),
|
||||
term_history: TermHistory(vec![
|
||||
@@ -1156,7 +1526,6 @@ mod tests {
|
||||
lsn: Lsn(3),
|
||||
},
|
||||
]),
|
||||
timeline_start_lsn: Lsn(1),
|
||||
};
|
||||
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
|
||||
.await
|
||||
@@ -1191,26 +1560,25 @@ mod tests {
|
||||
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
|
||||
|
||||
let pem = ProposerElected {
|
||||
generation: 0,
|
||||
term: 1,
|
||||
start_streaming_at: Lsn(1),
|
||||
term_history: TermHistory(vec![TermLsn {
|
||||
term: 1,
|
||||
lsn: Lsn(1),
|
||||
}]),
|
||||
timeline_start_lsn: Lsn(1),
|
||||
};
|
||||
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let ar_hdr = AppendRequestHeader {
|
||||
generation: 0,
|
||||
term: 1,
|
||||
term_start_lsn: Lsn(3),
|
||||
begin_lsn: Lsn(1),
|
||||
end_lsn: Lsn(2),
|
||||
commit_lsn: Lsn(0),
|
||||
truncate_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
};
|
||||
let append_request = AppendRequest {
|
||||
h: ar_hdr.clone(),
|
||||
|
||||
@@ -73,10 +73,10 @@ impl Env {
|
||||
// Emulate an initial election.
|
||||
safekeeper
|
||||
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
generation: 0,
|
||||
term: 1,
|
||||
start_streaming_at: start_lsn,
|
||||
term_history: TermHistory(vec![(1, start_lsn).into()]),
|
||||
timeline_start_lsn: start_lsn,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -142,13 +142,12 @@ impl Env {
|
||||
|
||||
let req = AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
generation: 0,
|
||||
term: 1,
|
||||
term_start_lsn: start_lsn,
|
||||
begin_lsn: lsn,
|
||||
end_lsn: lsn + record.len() as u64,
|
||||
commit_lsn: lsn,
|
||||
truncate_lsn: Lsn(0),
|
||||
proposer_uuid: [0; 16],
|
||||
},
|
||||
wal_data: record,
|
||||
};
|
||||
|
||||
@@ -15,9 +15,7 @@ use desim::{
|
||||
};
|
||||
use http::Uri;
|
||||
use safekeeper::{
|
||||
safekeeper::{
|
||||
ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION,
|
||||
},
|
||||
safekeeper::{ProposerAcceptorMessage, SafeKeeper, SK_PROTO_VERSION_3, UNKNOWN_SERVER_VERSION},
|
||||
state::{TimelinePersistentState, TimelineState},
|
||||
timeline::TimelineError,
|
||||
wal_storage::Storage,
|
||||
@@ -287,7 +285,7 @@ impl ConnState {
|
||||
bail!("finished processing START_REPLICATION")
|
||||
}
|
||||
|
||||
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?;
|
||||
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTO_VERSION_3)?;
|
||||
debug!("got msg: {:?}", msg);
|
||||
self.process(msg, global)
|
||||
} else {
|
||||
@@ -403,7 +401,7 @@ impl ConnState {
|
||||
// TODO: if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
|
||||
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
reply.serialize(&mut buf)?;
|
||||
reply.serialize(&mut buf, SK_PROTO_VERSION_3)?;
|
||||
|
||||
self.tcp.send(AnyMessage::Bytes(buf.into()));
|
||||
}
|
||||
|
||||
@@ -6,9 +6,14 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
|
||||
|
||||
def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
|
||||
def check_tenant(
|
||||
env: NeonEnv, pageserver_http: PageserverHttpClient, safekeeper_proto_version: int
|
||||
):
|
||||
tenant_id, timeline_id = env.create_tenant()
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
config_lines = [
|
||||
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
|
||||
]
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=config_lines)
|
||||
# we rely upon autocommit after each statement
|
||||
res_1 = endpoint.safe_psql_many(
|
||||
queries=[
|
||||
@@ -33,7 +38,14 @@ def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
|
||||
|
||||
|
||||
@pytest.mark.parametrize("num_timelines,num_safekeepers", [(3, 1)])
|
||||
def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int):
|
||||
# Test both proto versions until we fully migrate.
|
||||
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
|
||||
def test_normal_work(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
num_timelines: int,
|
||||
num_safekeepers: int,
|
||||
safekeeper_proto_version: int,
|
||||
):
|
||||
"""
|
||||
Basic test:
|
||||
* create new tenant with a timeline
|
||||
@@ -52,4 +64,4 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
for _ in range(num_timelines):
|
||||
check_tenant(env, pageserver_http)
|
||||
check_tenant(env, pageserver_http, safekeeper_proto_version)
|
||||
|
||||
@@ -538,13 +538,16 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
|
||||
asyncio.run(run_recovery_uncommitted(env))
|
||||
|
||||
|
||||
async def run_wal_truncation(env: NeonEnv):
|
||||
async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
(sk1, sk2, sk3) = env.safekeepers
|
||||
|
||||
ep = env.endpoints.create_start("main")
|
||||
config_lines = [
|
||||
f"neon.safekeeper_proto_version = {safekeeper_proto_version}",
|
||||
]
|
||||
ep = env.endpoints.create_start("main", config_lines=config_lines)
|
||||
ep.safe_psql("create table t (key int, value text)")
|
||||
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
|
||||
|
||||
@@ -571,6 +574,7 @@ async def run_wal_truncation(env: NeonEnv):
|
||||
sk2.start()
|
||||
ep = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=config_lines,
|
||||
)
|
||||
ep.safe_psql("insert into t select generate_series(1, 200), 'payload'")
|
||||
|
||||
@@ -589,11 +593,13 @@ async def run_wal_truncation(env: NeonEnv):
|
||||
|
||||
# Simple deterministic test creating tail of WAL on safekeeper which is
|
||||
# truncated when majority without this sk elects walproposer starting earlier.
|
||||
def test_wal_truncation(neon_env_builder: NeonEnvBuilder):
|
||||
# Test both proto versions until we fully migrate.
|
||||
@pytest.mark.parametrize("safekeeper_proto_version", [2, 3])
|
||||
def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
asyncio.run(run_wal_truncation(env))
|
||||
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
|
||||
|
||||
|
||||
async def run_segment_init_failure(env: NeonEnv):
|
||||
|
||||
Reference in New Issue
Block a user