diff --git a/libs/safekeeper_api/src/membership.rs b/libs/safekeeper_api/src/membership.rs index 8b14a4f290..2f20ec5f94 100644 --- a/libs/safekeeper_api/src/membership.rs +++ b/libs/safekeeper_api/src/membership.rs @@ -68,14 +68,12 @@ impl Display for SafekeeperId { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(transparent)] pub struct MemberSet { - pub members: Vec, + pub m: Vec, } impl MemberSet { pub fn empty() -> Self { - MemberSet { - members: Vec::new(), - } + MemberSet { m: Vec::new() } } pub fn new(members: Vec) -> anyhow::Result { @@ -83,11 +81,11 @@ impl MemberSet { if hs.len() != members.len() { bail!("duplicate safekeeper id in the set {:?}", members); } - Ok(MemberSet { members }) + Ok(MemberSet { m: members }) } pub fn contains(&self, sk: &SafekeeperId) -> bool { - self.members.iter().any(|m| m.id == sk.id) + self.m.iter().any(|m| m.id == sk.id) } pub fn add(&mut self, sk: SafekeeperId) -> anyhow::Result<()> { @@ -97,7 +95,7 @@ impl MemberSet { sk.id, self )); } - self.members.push(sk); + self.m.push(sk); Ok(()) } } @@ -105,11 +103,7 @@ impl MemberSet { impl Display for MemberSet { /// Display as a comma separated list of members. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let sks_str = self - .members - .iter() - .map(|m| m.to_string()) - .collect::>(); + let sks_str = self.m.iter().map(|sk| sk.to_string()).collect::>(); write!(f, "({})", sks_str.join(", ")) } } diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index ba75171db2..60b606c64a 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -215,6 +215,7 @@ impl Wrapper { syncSafekeepers: config.sync_safekeepers, systemId: 0, pgTimeline: 1, + proto_version: 3, callback_data, }; let c_config = Box::into_raw(Box::new(c_config)); @@ -276,6 +277,7 @@ mod tests { use core::panic; use std::{ cell::Cell, + ffi::CString, sync::{atomic::AtomicUsize, mpsc::sync_channel}, }; @@ -496,57 +498,64 @@ mod tests { // Messages definitions are at walproposer.h // xxx: it would be better to extract them from safekeeper crate and // use serialization/deserialization here. - let greeting_tag = (b'g' as u64).to_ne_bytes(); - let proto_version = 2_u32.to_ne_bytes(); - let pg_version: [u8; 4] = PG_VERSION_NUM.to_ne_bytes(); - let proposer_id = [0; 16]; - let system_id = 0_u64.to_ne_bytes(); - let tenant_id = ttid.tenant_id.as_arr(); - let timeline_id = ttid.timeline_id.as_arr(); - let pg_tli = 1_u32.to_ne_bytes(); - let wal_seg_size = 16777216_u32.to_ne_bytes(); + let greeting_tag = (b'g').to_be_bytes(); + let tenant_id = CString::new(ttid.tenant_id.to_string()) + .unwrap() + .into_bytes_with_nul(); + let timeline_id = CString::new(ttid.timeline_id.to_string()) + .unwrap() + .into_bytes_with_nul(); + let mconf_gen = 0_u32.to_be_bytes(); + let mconf_members_len = 0_u32.to_be_bytes(); + let mconf_members_new_len = 0_u32.to_be_bytes(); + let pg_version: [u8; 4] = PG_VERSION_NUM.to_be_bytes(); + let system_id = 0_u64.to_be_bytes(); + let wal_seg_size = 16777216_u32.to_be_bytes(); + let proposer_greeting = [ greeting_tag.as_slice(), - proto_version.as_slice(), - pg_version.as_slice(), - proposer_id.as_slice(), - system_id.as_slice(), tenant_id.as_slice(), timeline_id.as_slice(), - pg_tli.as_slice(), + mconf_gen.as_slice(), + mconf_members_len.as_slice(), + mconf_members_new_len.as_slice(), + pg_version.as_slice(), + system_id.as_slice(), wal_seg_size.as_slice(), ] .concat(); - let voting_tag = (b'v' as u64).to_ne_bytes(); - let vote_request_term = 3_u64.to_ne_bytes(); - let proposer_id = [0; 16]; + let voting_tag = (b'v').to_be_bytes(); + let vote_request_term = 3_u64.to_be_bytes(); let vote_request = [ voting_tag.as_slice(), + mconf_gen.as_slice(), vote_request_term.as_slice(), - proposer_id.as_slice(), ] .concat(); - let acceptor_greeting_term = 2_u64.to_ne_bytes(); - let acceptor_greeting_node_id = 1_u64.to_ne_bytes(); + let acceptor_greeting_term = 2_u64.to_be_bytes(); + let acceptor_greeting_node_id = 1_u64.to_be_bytes(); let acceptor_greeting = [ greeting_tag.as_slice(), - acceptor_greeting_term.as_slice(), acceptor_greeting_node_id.as_slice(), + mconf_gen.as_slice(), + mconf_members_len.as_slice(), + mconf_members_new_len.as_slice(), + acceptor_greeting_term.as_slice(), ] .concat(); - let vote_response_term = 3_u64.to_ne_bytes(); - let vote_given = 1_u64.to_ne_bytes(); - let flush_lsn = 0x539_u64.to_ne_bytes(); - let truncate_lsn = 0x539_u64.to_ne_bytes(); - let th_len = 1_u32.to_ne_bytes(); - let th_term = 2_u64.to_ne_bytes(); - let th_lsn = 0x539_u64.to_ne_bytes(); - let timeline_start_lsn = 0x539_u64.to_ne_bytes(); + let vote_response_term = 3_u64.to_be_bytes(); + let vote_given = 1_u8.to_be_bytes(); + let flush_lsn = 0x539_u64.to_be_bytes(); + let truncate_lsn = 0x539_u64.to_be_bytes(); + let th_len = 1_u32.to_be_bytes(); + let th_term = 2_u64.to_be_bytes(); + let th_lsn = 0x539_u64.to_be_bytes(); let vote_response = [ voting_tag.as_slice(), + mconf_gen.as_slice(), vote_response_term.as_slice(), vote_given.as_slice(), flush_lsn.as_slice(), @@ -554,7 +563,6 @@ mod tests { th_len.as_slice(), th_term.as_slice(), th_lsn.as_slice(), - timeline_start_lsn.as_slice(), ] .concat(); diff --git a/pgxn/neon/neon_utils.c b/pgxn/neon/neon_utils.c index 1fb4ed9522..1fad44bd58 100644 --- a/pgxn/neon/neon_utils.c +++ b/pgxn/neon/neon_utils.c @@ -51,6 +51,26 @@ HexDecodeString(uint8 *result, char *input, int nbytes) return true; } +/* -------------------------------- + * pq_getmsgint16 - get a binary 2-byte int from a message buffer + * -------------------------------- + */ +uint16 +pq_getmsgint16(StringInfo msg) +{ + return pq_getmsgint(msg, 2); +} + +/* -------------------------------- + * pq_getmsgint32 - get a binary 4-byte int from a message buffer + * -------------------------------- + */ +uint32 +pq_getmsgint32(StringInfo msg) +{ + return pq_getmsgint(msg, 4); +} + /* -------------------------------- * pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order * -------------------------------- diff --git a/pgxn/neon/neon_utils.h b/pgxn/neon/neon_utils.h index 89683714f1..7480ac28cc 100644 --- a/pgxn/neon/neon_utils.h +++ b/pgxn/neon/neon_utils.h @@ -8,6 +8,8 @@ #endif bool HexDecodeString(uint8 *result, char *input, int nbytes); +uint16 pq_getmsgint16(StringInfo msg); +uint32 pq_getmsgint32(StringInfo msg); uint32 pq_getmsgint32_le(StringInfo msg); uint64 pq_getmsgint64_le(StringInfo msg); void pq_sendint32_le(StringInfo buf, uint32 i); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7472fd6afc..d7604e30d7 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -70,6 +70,7 @@ static bool SendAppendRequests(Safekeeper *sk); static bool RecvAppendResponses(Safekeeper *sk); static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp); static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp); +static void PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version); static void HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk); static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size); static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg); @@ -81,6 +82,8 @@ static char *FormatSafekeeperState(Safekeeper *sk); static void AssertEventsOkForState(uint32 events, Safekeeper *sk); static char *FormatEvents(WalProposer *wp, uint32 events); static void UpdateDonorShmem(WalProposer *wp); +static char *MembershipConfigurationToString(MembershipConfiguration *mconf); +static void MembershipConfigurationFree(MembershipConfiguration *mconf); WalProposer * WalProposerCreate(WalProposerConfig *config, walproposer_api api) @@ -137,25 +140,21 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) } wp->quorum = wp->n_safekeepers / 2 + 1; + if (wp->config->proto_version != 2 && wp->config->proto_version != 3) + wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version); + wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version); + /* Fill the greeting package */ - wp->greetRequest.tag = 'g'; - wp->greetRequest.protocolVersion = SK_PROTOCOL_VERSION; - wp->greetRequest.pgVersion = PG_VERSION_NUM; - wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId)); - wp->greetRequest.systemId = wp->config->systemId; - if (!wp->config->neon_timeline) - wp_log(FATAL, "neon.timeline_id is not provided"); - if (*wp->config->neon_timeline != '\0' && - !HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16)) - wp_log(FATAL, "could not parse neon.timeline_id, %s", wp->config->neon_timeline); + wp->greetRequest.pam.tag = 'g'; if (!wp->config->neon_tenant) wp_log(FATAL, "neon.tenant_id is not provided"); - if (*wp->config->neon_tenant != '\0' && - !HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16)) - wp_log(FATAL, "could not parse neon.tenant_id, %s", wp->config->neon_tenant); - - wp->greetRequest.timeline = wp->config->pgTimeline; - wp->greetRequest.walSegSize = wp->config->wal_segment_size; + wp->greetRequest.tenant_id = wp->config->neon_tenant; + if (!wp->config->neon_timeline) + wp_log(FATAL, "neon.timeline_id is not provided"); + wp->greetRequest.timeline_id = wp->config->neon_timeline; + wp->greetRequest.pg_version = PG_VERSION_NUM; + wp->greetRequest.system_id = wp->config->systemId; + wp->greetRequest.wal_seg_size = wp->config->wal_segment_size; wp->api.init_event_set(wp); @@ -165,12 +164,14 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) void WalProposerFree(WalProposer *wp) { + MembershipConfigurationFree(&wp->mconf); for (int i = 0; i < wp->n_safekeepers; i++) { Safekeeper *sk = &wp->safekeeper[i]; Assert(sk->outbuf.data != NULL); pfree(sk->outbuf.data); + MembershipConfigurationFree(&sk->greetResponse.mconf); if (sk->voteResponse.termHistory.entries) pfree(sk->voteResponse.termHistory.entries); sk->voteResponse.termHistory.entries = NULL; @@ -308,6 +309,7 @@ ShutdownConnection(Safekeeper *sk) sk->state = SS_OFFLINE; sk->streamingAt = InvalidXLogRecPtr; + MembershipConfigurationFree(&sk->greetResponse.mconf); if (sk->voteResponse.termHistory.entries) pfree(sk->voteResponse.termHistory.entries); sk->voteResponse.termHistory.entries = NULL; @@ -598,11 +600,14 @@ static void SendStartWALPush(Safekeeper *sk) { WalProposer *wp = sk->wp; +#define CMD_LEN 512 + char cmd[CMD_LEN]; - if (!wp->api.conn_send_query(sk, "START_WAL_PUSH")) + snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version); + if (!wp->api.conn_send_query(sk, cmd)) { - wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", - sk->host, sk->port, wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to send '%s' query to safekeeper %s:%s: %s", + cmd, sk->host, sk->port, wp->api.conn_error_message(sk)); ShutdownConnection(sk); return; } @@ -658,23 +663,33 @@ RecvStartWALPushResult(Safekeeper *sk) /* * Start handshake: first of all send information about the - * safekeeper. After sending, we wait on SS_HANDSHAKE_RECV for + * walproposer. After sending, we wait on SS_HANDSHAKE_RECV for * a response to finish the handshake. */ static void SendProposerGreeting(Safekeeper *sk) { + WalProposer *wp = sk->wp; + char *mconf_toml = MembershipConfigurationToString(&wp->greetRequest.mconf); + + wp_log(LOG, "sending ProposerGreeting to safekeeper %s:%s with mconf = %s", sk->host, sk->port, mconf_toml); + pfree(mconf_toml); + + PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->greetRequest, + &sk->outbuf, wp->config->proto_version); + /* * On failure, logging & resetting the connection is handled. We just need * to handle the control flow. */ - BlockingWrite(sk, &sk->wp->greetRequest, sizeof(sk->wp->greetRequest), SS_HANDSHAKE_RECV); + BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV); } static void RecvAcceptorGreeting(Safekeeper *sk) { WalProposer *wp = sk->wp; + char *mconf_toml; /* * If our reading doesn't immediately succeed, any necessary error @@ -685,7 +700,10 @@ RecvAcceptorGreeting(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse)) return; - wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, term=" INT64_FORMAT, sk->host, sk->port, sk->greetResponse.term); + mconf_toml = MembershipConfigurationToString(&sk->greetResponse.mconf); + wp_log(LOG, "received AcceptorGreeting from safekeeper %s:%s, node_id = %lu, mconf = %s, term=" UINT64_FORMAT, + sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term); + pfree(mconf_toml); /* Protocol is all good, move to voting. */ sk->state = SS_VOTING; @@ -707,12 +725,9 @@ RecvAcceptorGreeting(Safekeeper *sk) wp->propTerm++; wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); - wp->voteRequest = (VoteRequest) - { - .tag = 'v', - .term = wp->propTerm - }; - memcpy(wp->voteRequest.proposerId.data, wp->greetRequest.proposerId.data, UUID_LEN); + wp->voteRequest.pam.tag = 'v'; + wp->voteRequest.generation = wp->mconf.generation; + wp->voteRequest.term = wp->propTerm; } } else if (sk->greetResponse.term > wp->propTerm) @@ -759,12 +774,14 @@ SendVoteRequest(Safekeeper *sk) { WalProposer *wp = sk->wp; - /* We have quorum for voting, send our vote request */ - wp_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term); - /* On failure, logging & resetting is handled */ - if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT)) - return; + PAMessageSerialize(wp, (ProposerAcceptorMessage *) &wp->voteRequest, + &sk->outbuf, wp->config->proto_version); + /* We have quorum for voting, send our vote request */ + wp_log(LOG, "requesting vote from %s:%s for generation %u term " UINT64_FORMAT, sk->host, sk->port, + wp->voteRequest.generation, wp->voteRequest.term); + /* On failure, logging & resetting is handled */ + BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_WAIT_VERDICT); /* If successful, wait for read-ready with SS_WAIT_VERDICT */ } @@ -778,11 +795,12 @@ RecvVoteResponse(Safekeeper *sk) return; wp_log(LOG, - "got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X", - sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory), + "got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X", + sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term, + sk->voteResponse.voteGiven, + GetHighestTerm(&sk->voteResponse.termHistory), LSN_FORMAT_ARGS(sk->voteResponse.flushLsn), - LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn), - LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn)); + LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn)); /* * In case of acceptor rejecting our vote, bail out, but only if either it @@ -847,9 +865,9 @@ HandleElectedProposer(WalProposer *wp) * otherwise we must be sync-safekeepers and we have nothing to do then. * * Proceeding is not only pointless but harmful, because we'd give - * safekeepers term history starting with 0/0. These hacks will go away once - * we disable implicit timeline creation on safekeepers and create it with - * non zero LSN from the start. + * safekeepers term history starting with 0/0. These hacks will go away + * once we disable implicit timeline creation on safekeepers and create it + * with non zero LSN from the start. */ if (wp->propEpochStartLsn == InvalidXLogRecPtr) { @@ -942,7 +960,6 @@ DetermineEpochStartLsn(WalProposer *wp) wp->propEpochStartLsn = InvalidXLogRecPtr; wp->donorEpoch = 0; wp->truncateLsn = InvalidXLogRecPtr; - wp->timelineStartLsn = InvalidXLogRecPtr; for (int i = 0; i < wp->n_safekeepers; i++) { @@ -959,20 +976,6 @@ DetermineEpochStartLsn(WalProposer *wp) wp->donor = i; } wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); - - if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr) - { - /* timelineStartLsn should be the same everywhere or unknown */ - if (wp->timelineStartLsn != InvalidXLogRecPtr && - wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn) - { - wp_log(WARNING, - "inconsistent timelineStartLsn: current %X/%X, received %X/%X", - LSN_FORMAT_ARGS(wp->timelineStartLsn), - LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn)); - } - wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn; - } } } @@ -995,22 +998,11 @@ DetermineEpochStartLsn(WalProposer *wp) if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers) { wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp); - if (wp->timelineStartLsn == InvalidXLogRecPtr) - { - wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp); - } wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); } pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn); - /* - * Safekeepers are setting truncateLsn after timelineStartLsn is known, so - * it should never be zero at this point, if we know timelineStartLsn. - * - * timelineStartLsn can be zero only on the first syncSafekeepers run. - */ - Assert((wp->truncateLsn != InvalidXLogRecPtr) || - (wp->config->syncSafekeepers && wp->truncateLsn == wp->timelineStartLsn)); + Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers); /* * We will be generating WAL since propEpochStartLsn, so we should set @@ -1053,10 +1045,11 @@ DetermineEpochStartLsn(WalProposer *wp) if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp)) { /* - * However, allow to proceed if last_log_term on the node which gave - * the highest vote (i.e. point where we are going to start writing) - * actually had been won by me; plain restart of walproposer not - * intervened by concurrent compute which wrote WAL is ok. + * However, allow to proceed if last_log_term on the node which + * gave the highest vote (i.e. point where we are going to start + * writing) actually had been won by me; plain restart of + * walproposer not intervened by concurrent compute which wrote + * WAL is ok. * * This avoids compute crash after manual term_bump. */ @@ -1126,14 +1119,8 @@ SendProposerElected(Safekeeper *sk) { /* safekeeper is empty or no common point, start from the beginning */ sk->startStreamingAt = wp->propTermHistory.entries[0].lsn; - wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u", - sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries); - - /* - * wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline - * is created manually (test_s3_wal_replay) - */ - Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr); + wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, termHistory.n_entries=%u", + sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), wp->propTermHistory.n_entries); } else { @@ -1158,29 +1145,19 @@ SendProposerElected(Safekeeper *sk) Assert(sk->startStreamingAt <= wp->availableLsn); - msg.tag = 'e'; + msg.apm.tag = 'e'; + msg.generation = wp->mconf.generation; msg.term = wp->propTerm; msg.startStreamingAt = sk->startStreamingAt; msg.termHistory = &wp->propTermHistory; - msg.timelineStartLsn = wp->timelineStartLsn; lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0; wp_log(LOG, - "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X", - sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn)); - - resetStringInfo(&sk->outbuf); - pq_sendint64_le(&sk->outbuf, msg.tag); - pq_sendint64_le(&sk->outbuf, msg.term); - pq_sendint64_le(&sk->outbuf, msg.startStreamingAt); - pq_sendint32_le(&sk->outbuf, msg.termHistory->n_entries); - for (int i = 0; i < msg.termHistory->n_entries; i++) - { - pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].term); - pq_sendint64_le(&sk->outbuf, msg.termHistory->entries[i].lsn); - } - pq_sendint64_le(&sk->outbuf, msg.timelineStartLsn); + "sending elected msg to node " UINT64_FORMAT " generation=%u term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s", + sk->greetResponse.nodeId, msg.generation, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), + lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port); + PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version); if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH)) return; @@ -1246,14 +1223,13 @@ static void PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn) { Assert(endLsn >= beginLsn); - req->tag = 'a'; + req->apm.tag = 'a'; + req->generation = wp->mconf.generation; req->term = wp->propTerm; - req->epochStartLsn = wp->propEpochStartLsn; req->beginLsn = beginLsn; req->endLsn = endLsn; req->commitLsn = wp->commitLsn; req->truncateLsn = wp->truncateLsn; - req->proposerId = wp->greetRequest.proposerId; } /* @@ -1354,7 +1330,8 @@ SendAppendRequests(Safekeeper *sk) resetStringInfo(&sk->outbuf); /* write AppendRequest header */ - appendBinaryStringInfo(&sk->outbuf, (char *) req, sizeof(AppendRequestHeader)); + PAMessageSerialize(wp, (ProposerAcceptorMessage *) req, &sk->outbuf, wp->config->proto_version); + /* prepare for reading WAL into the outbuf */ enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); sk->active_state = SS_ACTIVE_READ_WAL; } @@ -1367,14 +1344,17 @@ SendAppendRequests(Safekeeper *sk) req = &sk->appendRequest; req_len = req->endLsn - req->beginLsn; - /* We send zero sized AppenRequests as heartbeats; don't wal_read for these. */ + /* + * We send zero sized AppenRequests as heartbeats; don't wal_read + * for these. + */ if (req_len > 0) { switch (wp->api.wal_read(sk, - &sk->outbuf.data[sk->outbuf.len], - req->beginLsn, - req_len, - &errmsg)) + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req_len, + &errmsg)) { case NEON_WALREAD_SUCCESS: break; @@ -1382,7 +1362,7 @@ SendAppendRequests(Safekeeper *sk) return true; case NEON_WALREAD_ERROR: wp_log(WARNING, "WAL reading for node %s:%s failed: %s", - sk->host, sk->port, errmsg); + sk->host, sk->port, errmsg); ShutdownConnection(sk); return false; default: @@ -1470,11 +1450,11 @@ RecvAppendResponses(Safekeeper *sk) * Term has changed to higher one, probably another compute is * running. If this is the case we could PANIC as well because * likely it inserted some data and our basebackup is unsuitable - * anymore. However, we also bump term manually (term_bump endpoint) - * on safekeepers for migration purposes, in this case we do want - * compute to stay alive. So restart walproposer with FATAL instead - * of panicking; if basebackup is spoiled next election will notice - * this. + * anymore. However, we also bump term manually (term_bump + * endpoint) on safekeepers for migration purposes, in this case + * we do want compute to stay alive. So restart walproposer with + * FATAL instead of panicking; if basebackup is spoiled next + * election will notice this. */ wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us", sk->host, sk->port, @@ -1509,7 +1489,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese for (i = 0; i < nkeys; i++) { - const char *key = pq_getmsgstring(reply_message); + const char *key = pq_getmsgrawstring(reply_message); unsigned int value_len = pq_getmsgint(reply_message, sizeof(int32)); if (strcmp(key, "current_timeline_size") == 0) @@ -1750,6 +1730,208 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk) } } +/* Serialize MembershipConfiguration into buf. */ +static void +MembershipConfigurationSerialize(MembershipConfiguration *mconf, StringInfo buf) +{ + uint32 i; + + pq_sendint32(buf, mconf->generation); + + pq_sendint32(buf, mconf->members.len); + for (i = 0; i < mconf->members.len; i++) + { + pq_sendint64(buf, mconf->members.m[i].node_id); + pq_send_ascii_string(buf, mconf->members.m[i].host); + pq_sendint16(buf, mconf->members.m[i].port); + } + + /* + * There is no special mark for absent new_members; zero members in + * invalid, so zero len means absent. + */ + pq_sendint32(buf, mconf->new_members.len); + for (i = 0; i < mconf->new_members.len; i++) + { + pq_sendint64(buf, mconf->new_members.m[i].node_id); + pq_send_ascii_string(buf, mconf->new_members.m[i].host); + pq_sendint16(buf, mconf->new_members.m[i].port); + } +} + +/* Serialize proposer -> acceptor message into buf using specified version */ +static void +PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version) +{ + /* both version are supported currently until we fully migrate to 3 */ + Assert(proto_version == 3 || proto_version == 2); + + resetStringInfo(buf); + + if (proto_version == 3) + { + /* + * v2 sends structs for some messages as is, so commonly send tag only + * for v3 + */ + pq_sendint8(buf, msg->tag); + + switch (msg->tag) + { + case 'g': + { + ProposerGreeting *m = (ProposerGreeting *) msg; + + pq_send_ascii_string(buf, m->tenant_id); + pq_send_ascii_string(buf, m->timeline_id); + MembershipConfigurationSerialize(&m->mconf, buf); + pq_sendint32(buf, m->pg_version); + pq_sendint64(buf, m->system_id); + pq_sendint32(buf, m->wal_seg_size); + break; + } + case 'v': + { + VoteRequest *m = (VoteRequest *) msg; + + pq_sendint32(buf, m->generation); + pq_sendint64(buf, m->term); + break; + + } + case 'e': + { + ProposerElected *m = (ProposerElected *) msg; + + pq_sendint32(buf, m->generation); + pq_sendint64(buf, m->term); + pq_sendint64(buf, m->startStreamingAt); + pq_sendint32(buf, m->termHistory->n_entries); + for (uint32 i = 0; i < m->termHistory->n_entries; i++) + { + pq_sendint64(buf, m->termHistory->entries[i].term); + pq_sendint64(buf, m->termHistory->entries[i].lsn); + } + break; + } + case 'a': + { + /* + * Note: this serializes only AppendRequestHeader, caller + * is expected to append WAL data later. + */ + AppendRequestHeader *m = (AppendRequestHeader *) msg; + + pq_sendint32(buf, m->generation); + pq_sendint64(buf, m->term); + pq_sendint64(buf, m->beginLsn); + pq_sendint64(buf, m->endLsn); + pq_sendint64(buf, m->commitLsn); + pq_sendint64(buf, m->truncateLsn); + break; + } + default: + wp_log(FATAL, "unexpected message type %c to serialize", msg->tag); + } + return; + } + + if (proto_version == 2) + { + switch (msg->tag) + { + case 'g': + { + /* v2 sent struct as is */ + ProposerGreeting *m = (ProposerGreeting *) msg; + ProposerGreetingV2 greetRequestV2; + + /* Fill also v2 struct. */ + greetRequestV2.tag = 'g'; + greetRequestV2.protocolVersion = proto_version; + greetRequestV2.pgVersion = m->pg_version; + + /* + * v3 removed this field because it's easier to pass as + * libq or START_WAL_PUSH options + */ + memset(&greetRequestV2.proposerId, 0, sizeof(greetRequestV2.proposerId)); + greetRequestV2.systemId = wp->config->systemId; + if (*m->timeline_id != '\0' && + !HexDecodeString(greetRequestV2.timeline_id, m->timeline_id, 16)) + wp_log(FATAL, "could not parse neon.timeline_id, %s", m->timeline_id); + if (*m->tenant_id != '\0' && + !HexDecodeString(greetRequestV2.tenant_id, m->tenant_id, 16)) + wp_log(FATAL, "could not parse neon.tenant_id, %s", m->tenant_id); + + greetRequestV2.timeline = wp->config->pgTimeline; + greetRequestV2.walSegSize = wp->config->wal_segment_size; + + pq_sendbytes(buf, (char *) &greetRequestV2, sizeof(greetRequestV2)); + break; + } + case 'v': + { + /* v2 sent struct as is */ + VoteRequest *m = (VoteRequest *) msg; + VoteRequestV2 voteRequestV2; + + voteRequestV2.tag = m->pam.tag; + voteRequestV2.term = m->term; + /* removed field */ + memset(&voteRequestV2.proposerId, 0, sizeof(voteRequestV2.proposerId)); + pq_sendbytes(buf, (char *) &voteRequestV2, sizeof(voteRequestV2)); + break; + } + case 'e': + { + ProposerElected *m = (ProposerElected *) msg; + + pq_sendint64_le(buf, m->apm.tag); + pq_sendint64_le(buf, m->term); + pq_sendint64_le(buf, m->startStreamingAt); + pq_sendint32_le(buf, m->termHistory->n_entries); + for (int i = 0; i < m->termHistory->n_entries; i++) + { + pq_sendint64_le(buf, m->termHistory->entries[i].term); + pq_sendint64_le(buf, m->termHistory->entries[i].lsn); + } + pq_sendint64_le(buf, 0); /* removed timeline_start_lsn */ + break; + } + case 'a': + + /* + * Note: this serializes only AppendRequestHeader, caller is + * expected to append WAL data later. + */ + { + /* v2 sent struct as is */ + AppendRequestHeader *m = (AppendRequestHeader *) msg; + AppendRequestHeaderV2 appendRequestHeaderV2; + + appendRequestHeaderV2.tag = m->apm.tag; + appendRequestHeaderV2.term = m->term; + appendRequestHeaderV2.epochStartLsn = 0; /* removed field */ + appendRequestHeaderV2.beginLsn = m->beginLsn; + appendRequestHeaderV2.endLsn = m->endLsn; + appendRequestHeaderV2.commitLsn = m->commitLsn; + appendRequestHeaderV2.truncateLsn = m->truncateLsn; + /* removed field */ + memset(&appendRequestHeaderV2.proposerId, 0, sizeof(appendRequestHeaderV2.proposerId)); + + pq_sendbytes(buf, (char *) &appendRequestHeaderV2, sizeof(appendRequestHeaderV2)); + break; + } + + default: + wp_log(FATAL, "unexpected message type %c to serialize", msg->tag); + } + return; + } + wp_log(FATAL, "unexpected proto_version %d", proto_version); +} + /* * Try to read CopyData message from i'th safekeeper, resetting connection on * failure. @@ -1779,6 +1961,37 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) return false; } +/* Deserialize membership configuration from buf to mconf. */ +static void +MembershipConfigurationDeserialize(MembershipConfiguration *mconf, StringInfo buf) +{ + uint32 i; + + mconf->generation = pq_getmsgint32(buf); + mconf->members.len = pq_getmsgint32(buf); + mconf->members.m = palloc0(sizeof(SafekeeperId) * mconf->members.len); + for (i = 0; i < mconf->members.len; i++) + { + const char *buf_host; + + mconf->members.m[i].node_id = pq_getmsgint64(buf); + buf_host = pq_getmsgrawstring(buf); + strlcpy(mconf->members.m[i].host, buf_host, sizeof(mconf->members.m[i].host)); + mconf->members.m[i].port = pq_getmsgint16(buf); + } + mconf->new_members.len = pq_getmsgint32(buf); + mconf->new_members.m = palloc0(sizeof(SafekeeperId) * mconf->new_members.len); + for (i = 0; i < mconf->new_members.len; i++) + { + const char *buf_host; + + mconf->new_members.m[i].node_id = pq_getmsgint64(buf); + buf_host = pq_getmsgrawstring(buf); + strlcpy(mconf->new_members.m[i].host, buf_host, sizeof(mconf->new_members.m[i].host)); + mconf->new_members.m[i].port = pq_getmsgint16(buf); + } +} + /* * Read next message with known type into provided struct, by reading a CopyData * block from the safekeeper's postgres connection, returning whether the read @@ -1787,6 +2000,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) * If the read needs more polling, we return 'false' and keep the state * unmodified, waiting until it becomes read-ready to try again. If it fully * failed, a warning is emitted and the connection is reset. + * + * Note: it pallocs if needed, i.e. for AcceptorGreeting and VoteResponse fields. */ static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) @@ -1795,82 +2010,154 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) char *buf; int buf_size; - uint64 tag; + uint8 tag; StringInfoData s; if (!(AsyncRead(sk, &buf, &buf_size))) return false; + sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp); /* parse it */ s.data = buf; s.len = buf_size; + s.maxlen = buf_size; s.cursor = 0; - tag = pq_getmsgint64_le(&s); - if (tag != anymsg->tag) + if (wp->config->proto_version == 3) { - wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, - sk->port, FormatSafekeeperState(sk)); - ResetConnection(sk); - return false; - } - sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp); - switch (tag) - { - case 'g': - { - AcceptorGreeting *msg = (AcceptorGreeting *) anymsg; - - msg->term = pq_getmsgint64_le(&s); - msg->nodeId = pq_getmsgint64_le(&s); - pq_getmsgend(&s); - return true; - } - - case 'v': - { - VoteResponse *msg = (VoteResponse *) anymsg; - - msg->term = pq_getmsgint64_le(&s); - msg->voteGiven = pq_getmsgint64_le(&s); - msg->flushLsn = pq_getmsgint64_le(&s); - msg->truncateLsn = pq_getmsgint64_le(&s); - msg->termHistory.n_entries = pq_getmsgint32_le(&s); - msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries); - for (int i = 0; i < msg->termHistory.n_entries; i++) + tag = pq_getmsgbyte(&s); + if (tag != anymsg->tag) + { + wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, + sk->port, FormatSafekeeperState(sk)); + ResetConnection(sk); + return false; + } + switch (tag) + { + case 'g': { - msg->termHistory.entries[i].term = pq_getmsgint64_le(&s); - msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s); + AcceptorGreeting *msg = (AcceptorGreeting *) anymsg; + + msg->nodeId = pq_getmsgint64(&s); + MembershipConfigurationDeserialize(&msg->mconf, &s); + msg->term = pq_getmsgint64(&s); + pq_getmsgend(&s); + return true; } - msg->timelineStartLsn = pq_getmsgint64_le(&s); - pq_getmsgend(&s); - return true; - } + case 'v': + { + VoteResponse *msg = (VoteResponse *) anymsg; - case 'a': - { - AppendResponse *msg = (AppendResponse *) anymsg; + msg->generation = pq_getmsgint32(&s); + msg->term = pq_getmsgint64(&s); + msg->voteGiven = pq_getmsgbyte(&s); + msg->flushLsn = pq_getmsgint64(&s); + msg->truncateLsn = pq_getmsgint64(&s); + msg->termHistory.n_entries = pq_getmsgint32(&s); + msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries); + for (uint32 i = 0; i < msg->termHistory.n_entries; i++) + { + msg->termHistory.entries[i].term = pq_getmsgint64(&s); + msg->termHistory.entries[i].lsn = pq_getmsgint64(&s); + } + pq_getmsgend(&s); + return true; + } + case 'a': + { + AppendResponse *msg = (AppendResponse *) anymsg; - msg->term = pq_getmsgint64_le(&s); - msg->flushLsn = pq_getmsgint64_le(&s); - msg->commitLsn = pq_getmsgint64_le(&s); - msg->hs.ts = pq_getmsgint64_le(&s); - msg->hs.xmin.value = pq_getmsgint64_le(&s); - msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s); - if (s.len > s.cursor) - ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback); - else - msg->ps_feedback.present = false; - pq_getmsgend(&s); - return true; - } - - default: - { - Assert(false); - return false; - } + msg->generation = pq_getmsgint32(&s); + msg->term = pq_getmsgint64(&s); + msg->flushLsn = pq_getmsgint64(&s); + msg->commitLsn = pq_getmsgint64(&s); + msg->hs.ts = pq_getmsgint64(&s); + msg->hs.xmin.value = pq_getmsgint64(&s); + msg->hs.catalog_xmin.value = pq_getmsgint64(&s); + if (s.len > s.cursor) + ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback); + else + msg->ps_feedback.present = false; + pq_getmsgend(&s); + return true; + } + default: + { + wp_log(FATAL, "unexpected message tag %c to read", (char) tag); + return false; + } + } } + else if (wp->config->proto_version == 2) + { + tag = pq_getmsgint64_le(&s); + if (tag != anymsg->tag) + { + wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, + sk->port, FormatSafekeeperState(sk)); + ResetConnection(sk); + return false; + } + switch (tag) + { + case 'g': + { + AcceptorGreeting *msg = (AcceptorGreeting *) anymsg; + + msg->term = pq_getmsgint64_le(&s); + msg->nodeId = pq_getmsgint64_le(&s); + pq_getmsgend(&s); + return true; + } + + case 'v': + { + VoteResponse *msg = (VoteResponse *) anymsg; + + msg->term = pq_getmsgint64_le(&s); + msg->voteGiven = pq_getmsgint64_le(&s); + msg->flushLsn = pq_getmsgint64_le(&s); + msg->truncateLsn = pq_getmsgint64_le(&s); + msg->termHistory.n_entries = pq_getmsgint32_le(&s); + msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries); + for (int i = 0; i < msg->termHistory.n_entries; i++) + { + msg->termHistory.entries[i].term = pq_getmsgint64_le(&s); + msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s); + } + pq_getmsgint64_le(&s); /* timelineStartLsn */ + pq_getmsgend(&s); + return true; + } + + case 'a': + { + AppendResponse *msg = (AppendResponse *) anymsg; + + msg->term = pq_getmsgint64_le(&s); + msg->flushLsn = pq_getmsgint64_le(&s); + msg->commitLsn = pq_getmsgint64_le(&s); + msg->hs.ts = pq_getmsgint64_le(&s); + msg->hs.xmin.value = pq_getmsgint64_le(&s); + msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s); + if (s.len > s.cursor) + ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback); + else + msg->ps_feedback.present = false; + pq_getmsgend(&s); + return true; + } + + default: + { + wp_log(FATAL, "unexpected message tag %c to read", (char) tag); + return false; + } + } + } + wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version); + return false; /* keep the compiler quiet */ } /* @@ -2246,3 +2533,45 @@ FormatEvents(WalProposer *wp, uint32 events) return (char *) &return_str; } + +/* Dump mconf as toml for observability / debugging. Result is palloc'ed. */ +static char * +MembershipConfigurationToString(MembershipConfiguration *mconf) +{ + StringInfoData s; + uint32 i; + + initStringInfo(&s); + appendStringInfo(&s, "{gen = %u", mconf->generation); + appendStringInfoString(&s, ", members = ["); + for (i = 0; i < mconf->members.len; i++) + { + if (i > 0) + appendStringInfoString(&s, ", "); + appendStringInfo(&s, "{node_id = %lu", mconf->members.m[i].node_id); + appendStringInfo(&s, ", host = %s", mconf->members.m[i].host); + appendStringInfo(&s, ", port = %u }", mconf->members.m[i].port); + } + appendStringInfo(&s, "], new_members = ["); + for (i = 0; i < mconf->new_members.len; i++) + { + if (i > 0) + appendStringInfoString(&s, ", "); + appendStringInfo(&s, "{node_id = %lu", mconf->new_members.m[i].node_id); + appendStringInfo(&s, ", host = %s", mconf->new_members.m[i].host); + appendStringInfo(&s, ", port = %u }", mconf->new_members.m[i].port); + } + appendStringInfoString(&s, "]}"); + return s.data; +} + +static void +MembershipConfigurationFree(MembershipConfiguration *mconf) +{ + if (mconf->members.m) + pfree(mconf->members.m); + mconf->members.m = NULL; + if (mconf->new_members.m) + pfree(mconf->new_members.m); + mconf->new_members.m = NULL; +} diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index d8c44f8182..eee55f924f 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -12,9 +12,6 @@ #include "neon_walreader.h" #include "pagestore_client.h" -#define SK_MAGIC 0xCafeCeefu -#define SK_PROTOCOL_VERSION 2 - #define MAX_SAFEKEEPERS 32 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL * message */ @@ -143,12 +140,71 @@ typedef uint64 term_t; /* neon storage node id */ typedef uint64 NNodeId; +/* + * Number uniquely identifying safekeeper membership configuration. + * This and following structs pair ones in membership.rs. + */ +typedef uint32 Generation; + +typedef struct SafekeeperId +{ + NNodeId node_id; + char host[MAXCONNINFO]; + uint16 port; +} SafekeeperId; + +/* Set of safekeepers. */ +typedef struct MemberSet +{ + uint32 len; /* number of members */ + SafekeeperId *m; /* ids themselves */ +} MemberSet; + +/* Timeline safekeeper membership configuration. */ +typedef struct MembershipConfiguration +{ + Generation generation; + MemberSet members; + /* Has 0 n_members in non joint conf. */ + MemberSet new_members; +} MembershipConfiguration; + /* * Proposer <-> Acceptor messaging. */ +typedef struct ProposerAcceptorMessage +{ + uint8 tag; +} ProposerAcceptorMessage; + /* Initial Proposer -> Acceptor message */ typedef struct ProposerGreeting +{ + ProposerAcceptorMessage pam; /* message tag */ + + /* + * tenant/timeline ids as C strings with standard hex notation for ease of + * printing. In principle they are not strictly needed as ttid is also + * passed as libpq options. + */ + char *tenant_id; + char *timeline_id; + /* Full conf is carried to allow safekeeper switch */ + MembershipConfiguration mconf; + + /* + * pg_version and wal_seg_size are used for timeline creation until we + * fully migrate to doing externally. systemId is only used as a sanity + * cross check. + */ + uint32 pg_version; /* in PG_VERSION_NUM format */ + uint64 system_id; /* Postgres system identifier. */ + uint32 wal_seg_size; +} ProposerGreeting; + +/* protocol v2 variant, kept while wp supports it */ +typedef struct ProposerGreetingV2 { uint64 tag; /* message tag */ uint32 protocolVersion; /* proposer-safekeeper protocol version */ @@ -159,32 +215,42 @@ typedef struct ProposerGreeting uint8 tenant_id[16]; TimeLineID timeline; uint32 walSegSize; -} ProposerGreeting; +} ProposerGreetingV2; typedef struct AcceptorProposerMessage { - uint64 tag; + uint8 tag; } AcceptorProposerMessage; /* - * Acceptor -> Proposer initial response: the highest term acceptor voted for. + * Acceptor -> Proposer initial response: the highest term acceptor voted for, + * its node id and configuration. */ typedef struct AcceptorGreeting { AcceptorProposerMessage apm; - term_t term; NNodeId nodeId; + MembershipConfiguration mconf; + term_t term; } AcceptorGreeting; /* * Proposer -> Acceptor vote request. */ typedef struct VoteRequest +{ + ProposerAcceptorMessage pam; /* message tag */ + Generation generation; /* membership conf generation */ + term_t term; +} VoteRequest; + +/* protocol v2 variant, kept while wp supports it */ +typedef struct VoteRequestV2 { uint64 tag; term_t term; pg_uuid_t proposerId; /* for monitoring/debugging */ -} VoteRequest; +} VoteRequestV2; /* Element of term switching chain. */ typedef struct TermSwitchEntry @@ -203,8 +269,15 @@ typedef struct TermHistory typedef struct VoteResponse { AcceptorProposerMessage apm; + + /* + * Membership conf generation. It's not strictly required because on + * mismatch safekeeper is expected to ERROR the connection, but let's + * sanity check it. + */ + Generation generation; term_t term; - uint64 voteGiven; + uint8 voteGiven; /* * Safekeeper flush_lsn (end of WAL) + history of term switches allow @@ -214,7 +287,6 @@ typedef struct VoteResponse XLogRecPtr truncateLsn; /* minimal LSN which may be needed for* * recovery of some safekeeper */ TermHistory termHistory; - XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */ } VoteResponse; /* @@ -223,20 +295,37 @@ typedef struct VoteResponse */ typedef struct ProposerElected { - uint64 tag; + AcceptorProposerMessage apm; + Generation generation; /* membership conf generation */ term_t term; /* proposer will send since this point */ XLogRecPtr startStreamingAt; /* history of term switches up to this proposer */ TermHistory *termHistory; - /* timeline globally starts at this LSN */ - XLogRecPtr timelineStartLsn; } ProposerElected; /* * Header of request with WAL message sent from proposer to safekeeper. */ typedef struct AppendRequestHeader +{ + AcceptorProposerMessage apm; + Generation generation; /* membership conf generation */ + term_t term; /* term of the proposer */ + XLogRecPtr beginLsn; /* start position of message in WAL */ + XLogRecPtr endLsn; /* end position of message in WAL */ + XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */ + + /* + * minimal LSN which may be needed for recovery of some safekeeper (end + * lsn + 1 of last chunk streamed to everyone) + */ + XLogRecPtr truncateLsn; + /* in the AppendRequest message, WAL data follows */ +} AppendRequestHeader; + +/* protocol v2 variant, kept while wp supports it */ +typedef struct AppendRequestHeaderV2 { uint64 tag; term_t term; /* term of the proposer */ @@ -256,7 +345,8 @@ typedef struct AppendRequestHeader */ XLogRecPtr truncateLsn; pg_uuid_t proposerId; /* for monitoring/debugging */ -} AppendRequestHeader; + /* in the AppendRequest message, WAL data follows */ +} AppendRequestHeaderV2; /* * Hot standby feedback received from replica @@ -309,6 +399,13 @@ typedef struct AppendResponse { AcceptorProposerMessage apm; + /* + * Membership conf generation. It's not strictly required because on + * mismatch safekeeper is expected to ERROR the connection, but let's + * sanity check it. + */ + Generation generation; + /* * Current term of the safekeeper; if it is higher than proposer's, the * compute is out of date. @@ -644,6 +741,8 @@ typedef struct WalProposerConfig /* Will be passed to safekeepers in greet request. */ TimeLineID pgTimeline; + int proto_version; + #ifdef WALPROPOSER_LIB void *callback_data; #endif @@ -656,11 +755,14 @@ typedef struct WalProposerConfig typedef struct WalProposer { WalProposerConfig *config; - int n_safekeepers; + /* Current walproposer membership configuration */ + MembershipConfiguration mconf; /* (n_safekeepers / 2) + 1 */ int quorum; + /* Number of occupied slots in safekeepers[] */ + int n_safekeepers; Safekeeper safekeeper[MAX_SAFEKEEPERS]; /* WAL has been generated up to this point */ @@ -670,6 +772,7 @@ typedef struct WalProposer XLogRecPtr commitLsn; ProposerGreeting greetRequest; + ProposerGreetingV2 greetRequestV2; /* Vote request for safekeeper */ VoteRequest voteRequest; diff --git a/pgxn/neon/walproposer_compat.c b/pgxn/neon/walproposer_compat.c index 35d984c52e..a986160224 100644 --- a/pgxn/neon/walproposer_compat.c +++ b/pgxn/neon/walproposer_compat.c @@ -117,14 +117,13 @@ pq_getmsgbytes(StringInfo msg, int datalen) } /* -------------------------------- - * pq_getmsgstring - get a null-terminated text string (with conversion) + * pq_getmsgrawstring - get a null-terminated text string - NO conversion * - * May return a pointer directly into the message buffer, or a pointer - * to a palloc'd conversion result. + * Returns a pointer directly into the message buffer. * -------------------------------- */ const char * -pq_getmsgstring(StringInfo msg) +pq_getmsgrawstring(StringInfo msg) { char *str; int slen; @@ -155,6 +154,45 @@ pq_getmsgend(StringInfo msg) ExceptionalCondition("invalid msg format", __FILE__, __LINE__); } +/* -------------------------------- + * pq_sendbytes - append raw data to a StringInfo buffer + * -------------------------------- + */ +void +pq_sendbytes(StringInfo buf, const void *data, int datalen) +{ + /* use variant that maintains a trailing null-byte, out of caution */ + appendBinaryStringInfo(buf, data, datalen); +} + +/* -------------------------------- + * pq_send_ascii_string - append a null-terminated text string (without conversion) + * + * This function intentionally bypasses encoding conversion, instead just + * silently replacing any non-7-bit-ASCII characters with question marks. + * It is used only when we are having trouble sending an error message to + * the client with normal localization and encoding conversion. The caller + * should already have taken measures to ensure the string is just ASCII; + * the extra work here is just to make certain we don't send a badly encoded + * string to the client (which might or might not be robust about that). + * + * NB: passed text string must be null-terminated, and so is the data + * sent to the frontend. + * -------------------------------- + */ +void +pq_send_ascii_string(StringInfo buf, const char *str) +{ + while (*str) + { + char ch = *str++; + + if (IS_HIGHBIT_SET(ch)) + ch = '?'; + appendStringInfoCharMacro(buf, ch); + } + appendStringInfoChar(buf, '\0'); +} /* * Produce a C-string representation of a TimestampTz. diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 86444084ff..b21184de57 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -59,9 +59,11 @@ #define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot" +/* GUCs */ char *wal_acceptors_list = ""; int wal_acceptor_reconnect_timeout = 1000; int wal_acceptor_connection_timeout = 10000; +int safekeeper_proto_version = 2; /* Set to true in the walproposer bgw. */ static bool am_walproposer; @@ -126,6 +128,7 @@ init_walprop_config(bool syncSafekeepers) else walprop_config.systemId = 0; walprop_config.pgTimeline = walprop_pg_get_timeline_id(); + walprop_config.proto_version = safekeeper_proto_version; } /* @@ -219,25 +222,37 @@ nwp_register_gucs(void) PGC_SIGHUP, GUC_UNIT_MS, NULL, NULL, NULL); + + DefineCustomIntVariable( + "neon.safekeeper_proto_version", + "Version of compute <-> safekeeper protocol.", + "Used while migrating from 2 to 3.", + &safekeeper_proto_version, + 2, 0, INT_MAX, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); } static int split_safekeepers_list(char *safekeepers_list, char *safekeepers[]) { - int n_safekeepers = 0; - char *curr_sk = safekeepers_list; + int n_safekeepers = 0; + char *curr_sk = safekeepers_list; for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma) { - if (++n_safekeepers >= MAX_SAFEKEEPERS) { + if (++n_safekeepers >= MAX_SAFEKEEPERS) + { wpg_log(FATAL, "too many safekeepers"); } coma = strchr(coma, ','); - safekeepers[n_safekeepers-1] = curr_sk; + safekeepers[n_safekeepers - 1] = curr_sk; - if (coma != NULL) { + if (coma != NULL) + { *coma++ = '\0'; } } @@ -252,10 +267,10 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[]) static bool safekeepers_cmp(char *old, char *new) { - char *safekeepers_old[MAX_SAFEKEEPERS]; - char *safekeepers_new[MAX_SAFEKEEPERS]; - int len_old = 0; - int len_new = 0; + char *safekeepers_old[MAX_SAFEKEEPERS]; + char *safekeepers_new[MAX_SAFEKEEPERS]; + int len_old = 0; + int len_new = 0; len_old = split_safekeepers_list(old, safekeepers_old); len_new = split_safekeepers_list(new, safekeepers_new); @@ -292,7 +307,8 @@ assign_neon_safekeepers(const char *newval, void *extra) if (!am_walproposer) return; - if (!newval) { + if (!newval) + { /* should never happen */ wpg_log(FATAL, "neon.safekeepers is empty"); } @@ -301,11 +317,11 @@ assign_neon_safekeepers(const char *newval, void *extra) newval_copy = pstrdup(newval); oldval = pstrdup(wal_acceptors_list); - /* + /* * TODO: restarting through FATAL is stupid and introduces 1s delay before - * next bgw start. We should refactor walproposer to allow graceful exit and - * thus remove this delay. - * XXX: If you change anything here, sync with test_safekeepers_reconfigure_reorder. + * next bgw start. We should refactor walproposer to allow graceful exit + * and thus remove this delay. XXX: If you change anything here, sync with + * test_safekeepers_reconfigure_reorder. */ if (!safekeepers_cmp(oldval, newval_copy)) { @@ -454,7 +470,8 @@ backpressure_throttling_impl(void) memcpy(new_status, old_status, len); snprintf(new_status + len, 64, "backpressure throttling: lag %lu", lag); set_ps_display(new_status); - new_status[len] = '\0'; /* truncate off " backpressure ..." to later reset the ps */ + new_status[len] = '\0'; /* truncate off " backpressure ..." to later + * reset the ps */ elog(DEBUG2, "backpressure throttling: lag %lu", lag); start = GetCurrentTimestamp(); @@ -621,7 +638,7 @@ walprop_pg_start_streaming(WalProposer *wp, XLogRecPtr startpos) wpg_log(LOG, "WAL proposer starts streaming at %X/%X", LSN_FORMAT_ARGS(startpos)); cmd.slotname = WAL_PROPOSER_SLOT_NAME; - cmd.timeline = wp->greetRequest.timeline; + cmd.timeline = wp->config->pgTimeline; cmd.startpoint = startpos; StartProposerReplication(wp, &cmd); } @@ -1963,10 +1980,11 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk) FullTransactionId xmin = hsFeedback.xmin; FullTransactionId catalog_xmin = hsFeedback.catalog_xmin; FullTransactionId next_xid = ReadNextFullTransactionId(); + /* - * Page server is updating nextXid in checkpoint each 1024 transactions, - * so feedback xmin can be actually larger then nextXid and - * function TransactionIdInRecentPast return false in this case, + * Page server is updating nextXid in checkpoint each 1024 + * transactions, so feedback xmin can be actually larger then nextXid + * and function TransactionIdInRecentPast return false in this case, * preventing update of slot's xmin. */ if (FullTransactionIdPrecedes(next_xid, xmin)) diff --git a/safekeeper/benches/receive_wal.rs b/safekeeper/benches/receive_wal.rs index 19c6662e74..1c0ae66f01 100644 --- a/safekeeper/benches/receive_wal.rs +++ b/safekeeper/benches/receive_wal.rs @@ -13,6 +13,7 @@ use safekeeper::safekeeper::{ AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, }; use safekeeper::test_utils::Env; +use safekeeper_api::membership::SafekeeperGeneration as Generation; use tokio::io::AsyncWriteExt as _; use utils::id::{NodeId, TenantTimelineId}; use utils::lsn::Lsn; @@ -88,13 +89,12 @@ fn bench_process_msg(c: &mut Criterion) { let (lsn, record) = walgen.next().expect("endless WAL"); ProposerAcceptorMessage::AppendRequest(AppendRequest { h: AppendRequestHeader { + generation: Generation::new(0), term: 1, - term_start_lsn: Lsn(0), begin_lsn: lsn, end_lsn: lsn + record.len() as u64, commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }, wal_data: record, }) @@ -160,13 +160,12 @@ fn bench_wal_acceptor(c: &mut Criterion) { .take(n) .map(|(lsn, record)| AppendRequest { h: AppendRequestHeader { + generation: Generation::new(0), term: 1, - term_start_lsn: Lsn(0), begin_lsn: lsn, end_lsn: lsn + record.len() as u64, commit_lsn: Lsn(0), truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }, wal_data: record, }) @@ -262,13 +261,12 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) { runtime.block_on(async { let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest { h: AppendRequestHeader { + generation: Generation::new(0), term: 1, - term_start_lsn: Lsn(0), begin_lsn: lsn, end_lsn: lsn + record.len() as u64, commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }, wal_data: record, }); diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 19e17c4a75..8d7c1109ad 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -8,7 +8,7 @@ use anyhow::Context; use postgres_backend::QueryError; -use safekeeper_api::membership::Configuration; +use safekeeper_api::membership::{Configuration, INVALID_GENERATION}; use safekeeper_api::{ServerInfo, Term}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -133,10 +133,10 @@ async fn send_proposer_elected( let history = TermHistory(history_entries); let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected { + generation: INVALID_GENERATION, term, start_streaming_at: lsn, term_history: history, - timeline_start_lsn: lsn, }); tli.process_msg(&proposer_elected_request).await?; @@ -170,13 +170,12 @@ pub async fn append_logical_message( let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest { h: AppendRequestHeader { + generation: INVALID_GENERATION, term: msg.term, - term_start_lsn: begin_lsn, begin_lsn, end_lsn, commit_lsn, truncate_lsn: msg.truncate_lsn, - proposer_uuid: [0u8; 16], }, wal_data, }); diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index cb42f6f414..a94e6930e1 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -281,7 +281,7 @@ impl SafekeeperPostgresHandler { tokio::select! { // todo: add read|write .context to these errors r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r, - r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r, + r = network_write(pgb, reply_rx, pageserver_feedback_rx, proto_version) => r, _ = timeline_cancel.cancelled() => { return Err(CopyStreamHandlerEnd::Cancelled); } @@ -342,8 +342,8 @@ impl NetworkReader<'_, IO> { let tli = match next_msg { ProposerAcceptorMessage::Greeting(ref greeting) => { info!( - "start handshake with walproposer {} sysid {} timeline {}", - self.peer_addr, greeting.system_id, greeting.tli, + "start handshake with walproposer {} sysid {}", + self.peer_addr, greeting.system_id, ); let server_info = ServerInfo { pg_version: greeting.pg_version, @@ -459,6 +459,7 @@ async fn network_write( pgb_writer: &mut PostgresBackend, mut reply_rx: Receiver, mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver, + proto_version: u32, ) -> Result<(), CopyStreamHandlerEnd> { let mut buf = BytesMut::with_capacity(128); @@ -496,7 +497,7 @@ async fn network_write( }; buf.clear(); - msg.serialize(&mut buf)?; + msg.serialize(&mut buf, proto_version)?; pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?; } } diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 35394eb6ed..3e9080ebbe 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -7,6 +7,7 @@ use std::{fmt, pin::pin}; use anyhow::{bail, Context}; use futures::StreamExt; use postgres_protocol::message::backend::ReplicationMessage; +use safekeeper_api::membership::INVALID_GENERATION; use safekeeper_api::models::{PeerInfo, TimelineStatus}; use safekeeper_api::Term; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -267,7 +268,10 @@ async fn recover( ); // Now understand our term history. - let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: donor.term }); + let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { + generation: INVALID_GENERATION, + term: donor.term, + }); let vote_response = match tli .process_msg(&vote_request) .await @@ -302,10 +306,10 @@ async fn recover( // truncate WAL locally let pe = ProposerAcceptorMessage::Elected(ProposerElected { + generation: INVALID_GENERATION, term: donor.term, start_streaming_at: last_common_point.lsn, term_history: donor_th, - timeline_start_lsn: Lsn::INVALID, }); // Successful ProposerElected handling always returns None. If term changed, // we'll find out that during the streaming. Note: it is expected to get @@ -437,13 +441,12 @@ async fn network_io( match msg { ReplicationMessage::XLogData(xlog_data) => { let ar_hdr = AppendRequestHeader { + generation: INVALID_GENERATION, term: donor.term, - term_start_lsn: Lsn::INVALID, // unused begin_lsn: Lsn(xlog_data.wal_start()), end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64, commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it truncate_lsn: Lsn::INVALID, // do not attempt to advance - proposer_uuid: [0; 16], }; let ar = AppendRequest { h: ar_hdr, diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index f816f8459a..f429cafed2 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -5,6 +5,11 @@ use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::{TimeLineID, MAX_SEND_SIZE}; +use safekeeper_api::membership; +use safekeeper_api::membership::MemberSet; +use safekeeper_api::membership::SafekeeperGeneration as Generation; +use safekeeper_api::membership::SafekeeperId; +use safekeeper_api::membership::INVALID_GENERATION; use safekeeper_api::models::HotStandbyFeedback; use safekeeper_api::Term; use serde::{Deserialize, Serialize}; @@ -12,6 +17,7 @@ use std::cmp::max; use std::cmp::min; use std::fmt; use std::io::Read; +use std::str::FromStr; use storage_broker::proto::SafekeeperTimelineInfo; use tracing::*; @@ -29,7 +35,8 @@ use utils::{ lsn::Lsn, }; -pub const SK_PROTOCOL_VERSION: u32 = 2; +pub const SK_PROTO_VERSION_2: u32 = 2; +pub const SK_PROTO_VERSION_3: u32 = 3; pub const UNKNOWN_SERVER_VERSION: u32 = 0; #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] @@ -56,8 +63,28 @@ impl TermHistory { TermHistory(Vec::new()) } - // Parse TermHistory as n_entries followed by TermLsn pairs + // Parse TermHistory as n_entries followed by TermLsn pairs in network order. pub fn from_bytes(bytes: &mut Bytes) -> Result { + let n_entries = bytes + .get_u32_f() + .with_context(|| "TermHistory misses len")?; + let mut res = Vec::with_capacity(n_entries as usize); + for i in 0..n_entries { + let term = bytes + .get_u64_f() + .with_context(|| format!("TermHistory pos {} misses term", i))?; + let lsn = bytes + .get_u64_f() + .with_context(|| format!("TermHistory pos {} misses lsn", i))? + .into(); + res.push(TermLsn { term, lsn }) + } + Ok(TermHistory(res)) + } + + // Parse TermHistory as n_entries followed by TermLsn pairs in LE order. + // TODO remove once v2 protocol is fully dropped. + pub fn from_bytes_le(bytes: &mut Bytes) -> Result { if bytes.remaining() < 4 { bail!("TermHistory misses len"); } @@ -197,6 +224,18 @@ impl AcceptorState { /// Initial Proposer -> Acceptor message #[derive(Debug, Deserialize)] pub struct ProposerGreeting { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub mconf: membership::Configuration, + /// Postgres server version + pub pg_version: u32, + pub system_id: SystemId, + pub wal_seg_size: u32, +} + +/// V2 of the message; exists as a struct because we (de)serialized it as is. +#[derive(Debug, Deserialize)] +pub struct ProposerGreetingV2 { /// proposer-acceptor protocol version pub protocol_version: u32, /// Postgres server version @@ -213,27 +252,35 @@ pub struct ProposerGreeting { /// (acceptor voted for). #[derive(Debug, Serialize)] pub struct AcceptorGreeting { - term: u64, node_id: NodeId, + mconf: membership::Configuration, + term: u64, } /// Vote request sent from proposer to safekeepers -#[derive(Debug, Deserialize)] +#[derive(Debug)] pub struct VoteRequest { + pub generation: Generation, + pub term: Term, +} + +/// V2 of the message; exists as a struct because we (de)serialized it as is. +#[derive(Debug, Deserialize)] +pub struct VoteRequestV2 { pub term: Term, } /// Vote itself, sent from safekeeper to proposer #[derive(Debug, Serialize)] pub struct VoteResponse { + generation: Generation, // membership conf generation pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. - vote_given: u64, // fixme u64 due to padding + vote_given: bool, // Safekeeper flush_lsn (end of WAL) + history of term switches allow // proposer to choose the most advanced one. pub flush_lsn: Lsn, truncate_lsn: Lsn, pub term_history: TermHistory, - timeline_start_lsn: Lsn, } /* @@ -242,10 +289,10 @@ pub struct VoteResponse { */ #[derive(Debug)] pub struct ProposerElected { + pub generation: Generation, // membership conf generation pub term: Term, pub start_streaming_at: Lsn, pub term_history: TermHistory, - pub timeline_start_lsn: Lsn, } /// Request with WAL message sent from proposer to safekeeper. Along the way it @@ -257,6 +304,22 @@ pub struct AppendRequest { } #[derive(Debug, Clone, Deserialize)] pub struct AppendRequestHeader { + pub generation: Generation, // membership conf generation + // safekeeper's current term; if it is higher than proposer's, the compute is out of date. + pub term: Term, + /// start position of message in WAL + pub begin_lsn: Lsn, + /// end position of message in WAL + pub end_lsn: Lsn, + /// LSN committed by quorum of safekeepers + pub commit_lsn: Lsn, + /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper + pub truncate_lsn: Lsn, +} + +/// V2 of the message; exists as a struct because we (de)serialized it as is. +#[derive(Debug, Clone, Deserialize)] +pub struct AppendRequestHeaderV2 { // safekeeper's current term; if it is higher than proposer's, the compute is out of date. pub term: Term, // TODO: remove this field from the protocol, it in unused -- LSN of term @@ -277,6 +340,9 @@ pub struct AppendRequestHeader { /// Report safekeeper state to proposer #[derive(Debug, Serialize, Clone)] pub struct AppendResponse { + // Membership conf generation. Not strictly required because on mismatch + // connection is reset, but let's sanity check it. + generation: Generation, // Current term of the safekeeper; if it is higher than proposer's, the // compute is out of date. pub term: Term, @@ -293,8 +359,9 @@ pub struct AppendResponse { } impl AppendResponse { - fn term_only(term: Term) -> AppendResponse { + fn term_only(generation: Generation, term: Term) -> AppendResponse { AppendResponse { + generation, term, flush_lsn: Lsn(0), commit_lsn: Lsn(0), @@ -315,72 +382,322 @@ pub enum ProposerAcceptorMessage { FlushWAL, } -impl ProposerAcceptorMessage { - /// Parse proposer message. - pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result { - if proto_version != SK_PROTOCOL_VERSION { - bail!( - "incompatible protocol version {}, expected {}", - proto_version, - SK_PROTOCOL_VERSION - ); +/// Augment Bytes with fallible get_uN where N is number of bytes methods. +/// All reads are in network (big endian) order. +trait BytesF { + fn get_u8_f(&mut self) -> Result; + fn get_u16_f(&mut self) -> Result; + fn get_u32_f(&mut self) -> Result; + fn get_u64_f(&mut self) -> Result; +} + +impl BytesF for Bytes { + fn get_u8_f(&mut self) -> Result { + if self.is_empty() { + bail!("no bytes left, expected 1"); } - // xxx using Reader is inefficient but easy to work with bincode - let mut stream = msg_bytes.reader(); - // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is - let tag = stream.read_u64::()? as u8 as char; - match tag { - 'g' => { - let msg = ProposerGreeting::des_from(&mut stream)?; - Ok(ProposerAcceptorMessage::Greeting(msg)) - } - 'v' => { - let msg = VoteRequest::des_from(&mut stream)?; - Ok(ProposerAcceptorMessage::VoteRequest(msg)) - } - 'e' => { - let mut msg_bytes = stream.into_inner(); - if msg_bytes.remaining() < 16 { - bail!("ProposerElected message is not complete"); - } - let term = msg_bytes.get_u64_le(); - let start_streaming_at = msg_bytes.get_u64_le().into(); - let term_history = TermHistory::from_bytes(&mut msg_bytes)?; - if msg_bytes.remaining() < 8 { - bail!("ProposerElected message is not complete"); - } - let timeline_start_lsn = msg_bytes.get_u64_le().into(); - let msg = ProposerElected { - term, - start_streaming_at, - timeline_start_lsn, - term_history, + Ok(self.get_u8()) + } + fn get_u16_f(&mut self) -> Result { + if self.remaining() < 2 { + bail!("no bytes left, expected 2"); + } + Ok(self.get_u16()) + } + fn get_u32_f(&mut self) -> Result { + if self.remaining() < 4 { + bail!("only {} bytes left, expected 4", self.remaining()); + } + Ok(self.get_u32()) + } + fn get_u64_f(&mut self) -> Result { + if self.remaining() < 8 { + bail!("only {} bytes left, expected 8", self.remaining()); + } + Ok(self.get_u64()) + } +} + +impl ProposerAcceptorMessage { + /// Read cstring from Bytes. + fn get_cstr(buf: &mut Bytes) -> Result { + let pos = buf + .iter() + .position(|x| *x == 0) + .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?; + let result = buf.split_to(pos); + buf.advance(1); // drop the null terminator + match std::str::from_utf8(&result) { + Ok(s) => Ok(s.to_string()), + Err(e) => bail!("invalid utf8 in cstring: {}", e), + } + } + + /// Read membership::Configuration from Bytes. + fn get_mconf(buf: &mut Bytes) -> Result { + let generation = Generation::new(buf.get_u32_f().with_context(|| "reading generation")?); + let members_len = buf.get_u32_f().with_context(|| "reading members_len")?; + // Main member set must have at least someone in valid configuration. + // Empty conf is allowed until we fully migrate. + if generation != INVALID_GENERATION && members_len == 0 { + bail!("empty members_len"); + } + let mut members = MemberSet::empty(); + for i in 0..members_len { + let id = buf + .get_u64_f() + .with_context(|| format!("reading member {} node_id", i))?; + let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?; + let pg_port = buf + .get_u16_f() + .with_context(|| format!("reading member {} port", i))?; + let sk = SafekeeperId { + id: NodeId(id), + host, + pg_port, + }; + members.add(sk)?; + } + let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?; + // Non joint conf. + if new_members_len == 0 { + Ok(membership::Configuration { + generation, + members, + new_members: None, + }) + } else { + let mut new_members = MemberSet::empty(); + for i in 0..new_members_len { + let id = buf + .get_u64_f() + .with_context(|| format!("reading new member {} node_id", i))?; + let host = Self::get_cstr(buf) + .with_context(|| format!("reading new member {} host", i))?; + let pg_port = buf + .get_u16_f() + .with_context(|| format!("reading new member {} port", i))?; + let sk = SafekeeperId { + id: NodeId(id), + host, + pg_port, }; - Ok(ProposerAcceptorMessage::Elected(msg)) + new_members.add(sk)?; } - 'a' => { - // read header followed by wal data - let hdr = AppendRequestHeader::des_from(&mut stream)?; - let rec_size = hdr - .end_lsn - .checked_sub(hdr.begin_lsn) - .context("begin_lsn > end_lsn in AppendRequest")? - .0 as usize; - if rec_size > MAX_SEND_SIZE { - bail!( - "AppendRequest is longer than MAX_SEND_SIZE ({})", - MAX_SEND_SIZE - ); + Ok(membership::Configuration { + generation, + members, + new_members: Some(new_members), + }) + } + } + + /// Parse proposer message. + pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result { + if proto_version == SK_PROTO_VERSION_3 { + if msg_bytes.is_empty() { + bail!("ProposerAcceptorMessage is not complete: missing tag"); + } + let tag = msg_bytes.get_u8_f().with_context(|| { + "ProposerAcceptorMessage is not complete: missing tag".to_string() + })? as char; + match tag { + 'g' => { + let tenant_id_str = + Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?; + let tenant_id = TenantId::from_str(&tenant_id_str)?; + let timeline_id_str = + Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?; + let timeline_id = TimelineId::from_str(&timeline_id_str)?; + let mconf = Self::get_mconf(&mut msg_bytes)?; + let pg_version = msg_bytes + .get_u32_f() + .with_context(|| "reading pg_version")?; + let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?; + let wal_seg_size = msg_bytes + .get_u32_f() + .with_context(|| "reading wal_seg_size")?; + let g = ProposerGreeting { + tenant_id, + timeline_id, + mconf, + pg_version, + system_id, + wal_seg_size, + }; + Ok(ProposerAcceptorMessage::Greeting(g)) } + 'v' => { + let generation = Generation::new( + msg_bytes + .get_u32_f() + .with_context(|| "reading generation")?, + ); + let term = msg_bytes.get_u64_f().with_context(|| "reading term")?; + let v = VoteRequest { generation, term }; + Ok(ProposerAcceptorMessage::VoteRequest(v)) + } + 'e' => { + let generation = Generation::new( + msg_bytes + .get_u32_f() + .with_context(|| "reading generation")?, + ); + let term = msg_bytes.get_u64_f().with_context(|| "reading term")?; + let start_streaming_at: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading start_streaming_at")? + .into(); + let term_history = TermHistory::from_bytes(&mut msg_bytes)?; + let msg = ProposerElected { + generation, + term, + start_streaming_at, + term_history, + }; + Ok(ProposerAcceptorMessage::Elected(msg)) + } + 'a' => { + let generation = Generation::new( + msg_bytes + .get_u32_f() + .with_context(|| "reading generation")?, + ); + let term = msg_bytes.get_u64_f().with_context(|| "reading term")?; + let begin_lsn: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading begin_lsn")? + .into(); + let end_lsn: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading end_lsn")? + .into(); + let commit_lsn: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading commit_lsn")? + .into(); + let truncate_lsn: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading truncate_lsn")? + .into(); + let hdr = AppendRequestHeader { + generation, + term, + begin_lsn, + end_lsn, + commit_lsn, + truncate_lsn, + }; + let rec_size = hdr + .end_lsn + .checked_sub(hdr.begin_lsn) + .context("begin_lsn > end_lsn in AppendRequest")? + .0 as usize; + if rec_size > MAX_SEND_SIZE { + bail!( + "AppendRequest is longer than MAX_SEND_SIZE ({})", + MAX_SEND_SIZE + ); + } + if msg_bytes.remaining() < rec_size { + bail!( + "reading WAL: only {} bytes left, wanted {}", + msg_bytes.remaining(), + rec_size + ); + } + let wal_data = msg_bytes.copy_to_bytes(rec_size); + let msg = AppendRequest { h: hdr, wal_data }; - let mut wal_data_vec: Vec = vec![0; rec_size]; - stream.read_exact(&mut wal_data_vec)?; - let wal_data = Bytes::from(wal_data_vec); - let msg = AppendRequest { h: hdr, wal_data }; - - Ok(ProposerAcceptorMessage::AppendRequest(msg)) + Ok(ProposerAcceptorMessage::AppendRequest(msg)) + } + _ => bail!("unknown proposer-acceptor message tag: {}", tag), } - _ => bail!("unknown proposer-acceptor message tag: {}", tag), + } else if proto_version == SK_PROTO_VERSION_2 { + // xxx using Reader is inefficient but easy to work with bincode + let mut stream = msg_bytes.reader(); + // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is + let tag = stream.read_u64::()? as u8 as char; + match tag { + 'g' => { + let msgv2 = ProposerGreetingV2::des_from(&mut stream)?; + let g = ProposerGreeting { + tenant_id: msgv2.tenant_id, + timeline_id: msgv2.timeline_id, + mconf: membership::Configuration { + generation: INVALID_GENERATION, + members: MemberSet::empty(), + new_members: None, + }, + pg_version: msgv2.pg_version, + system_id: msgv2.system_id, + wal_seg_size: msgv2.wal_seg_size, + }; + Ok(ProposerAcceptorMessage::Greeting(g)) + } + 'v' => { + let msg = VoteRequestV2::des_from(&mut stream)?; + let v = VoteRequest { + generation: INVALID_GENERATION, + term: msg.term, + }; + Ok(ProposerAcceptorMessage::VoteRequest(v)) + } + 'e' => { + let mut msg_bytes = stream.into_inner(); + if msg_bytes.remaining() < 16 { + bail!("ProposerElected message is not complete"); + } + let term = msg_bytes.get_u64_le(); + let start_streaming_at = msg_bytes.get_u64_le().into(); + let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?; + if msg_bytes.remaining() < 8 { + bail!("ProposerElected message is not complete"); + } + let _timeline_start_lsn = msg_bytes.get_u64_le(); + let msg = ProposerElected { + generation: INVALID_GENERATION, + term, + start_streaming_at, + term_history, + }; + Ok(ProposerAcceptorMessage::Elected(msg)) + } + 'a' => { + // read header followed by wal data + let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?; + let hdr = AppendRequestHeader { + generation: INVALID_GENERATION, + term: hdrv2.term, + begin_lsn: hdrv2.begin_lsn, + end_lsn: hdrv2.end_lsn, + commit_lsn: hdrv2.commit_lsn, + truncate_lsn: hdrv2.truncate_lsn, + }; + let rec_size = hdr + .end_lsn + .checked_sub(hdr.begin_lsn) + .context("begin_lsn > end_lsn in AppendRequest")? + .0 as usize; + if rec_size > MAX_SEND_SIZE { + bail!( + "AppendRequest is longer than MAX_SEND_SIZE ({})", + MAX_SEND_SIZE + ); + } + + let mut wal_data_vec: Vec = vec![0; rec_size]; + stream.read_exact(&mut wal_data_vec)?; + let wal_data = Bytes::from(wal_data_vec); + + let msg = AppendRequest { h: hdr, wal_data }; + + Ok(ProposerAcceptorMessage::AppendRequest(msg)) + } + _ => bail!("unknown proposer-acceptor message tag: {}", tag), + } + } else { + bail!("unsupported protocol version {}", proto_version); } } @@ -394,36 +711,21 @@ impl ProposerAcceptorMessage { // We explicitly list all fields, to draw attention here when new fields are added. let mut size = BASE_SIZE; size += match self { - Self::Greeting(ProposerGreeting { - protocol_version: _, - pg_version: _, - proposer_id: _, - system_id: _, - timeline_id: _, - tenant_id: _, - tli: _, - wal_seg_size: _, - }) => 0, + Self::Greeting(_) => 0, - Self::VoteRequest(VoteRequest { term: _ }) => 0, + Self::VoteRequest(_) => 0, - Self::Elected(ProposerElected { - term: _, - start_streaming_at: _, - term_history: _, - timeline_start_lsn: _, - }) => 0, + Self::Elected(_) => 0, Self::AppendRequest(AppendRequest { h: AppendRequestHeader { + generation: _, term: _, - term_start_lsn: _, begin_lsn: _, end_lsn: _, commit_lsn: _, truncate_lsn: _, - proposer_uuid: _, }, wal_data, }) => wal_data.len(), @@ -431,13 +733,12 @@ impl ProposerAcceptorMessage { Self::NoFlushAppendRequest(AppendRequest { h: AppendRequestHeader { + generation: _, term: _, - term_start_lsn: _, begin_lsn: _, end_lsn: _, commit_lsn: _, truncate_lsn: _, - proposer_uuid: _, }, wal_data, }) => wal_data.len(), @@ -458,45 +759,118 @@ pub enum AcceptorProposerMessage { } impl AcceptorProposerMessage { - /// Serialize acceptor -> proposer message. - pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> { - match self { - AcceptorProposerMessage::Greeting(msg) => { - buf.put_u64_le('g' as u64); - buf.put_u64_le(msg.term); - buf.put_u64_le(msg.node_id.0); - } - AcceptorProposerMessage::VoteResponse(msg) => { - buf.put_u64_le('v' as u64); - buf.put_u64_le(msg.term); - buf.put_u64_le(msg.vote_given); - buf.put_u64_le(msg.flush_lsn.into()); - buf.put_u64_le(msg.truncate_lsn.into()); - buf.put_u32_le(msg.term_history.0.len() as u32); - for e in &msg.term_history.0 { - buf.put_u64_le(e.term); - buf.put_u64_le(e.lsn.into()); - } - buf.put_u64_le(msg.timeline_start_lsn.into()); - } - AcceptorProposerMessage::AppendResponse(msg) => { - buf.put_u64_le('a' as u64); - buf.put_u64_le(msg.term); - buf.put_u64_le(msg.flush_lsn.into()); - buf.put_u64_le(msg.commit_lsn.into()); - buf.put_i64_le(msg.hs_feedback.ts); - buf.put_u64_le(msg.hs_feedback.xmin); - buf.put_u64_le(msg.hs_feedback.catalog_xmin); + fn put_cstr(buf: &mut BytesMut, s: &str) { + buf.put_slice(s.as_bytes()); + buf.put_u8(0); // null terminator + } - // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback - // if it is not present. - if let Some(ref msg) = msg.pageserver_feedback { - msg.serialize(buf); - } - } + /// Serialize membership::Configuration into buf. + fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) { + buf.put_u32(mconf.generation.into_inner()); + buf.put_u32(mconf.members.m.len() as u32); + for sk in &mconf.members.m { + buf.put_u64(sk.id.0); + Self::put_cstr(buf, &sk.host); + buf.put_u16(sk.pg_port); } + if let Some(ref new_members) = mconf.new_members { + buf.put_u32(new_members.m.len() as u32); + for sk in &new_members.m { + buf.put_u64(sk.id.0); + Self::put_cstr(buf, &sk.host); + buf.put_u16(sk.pg_port); + } + } else { + buf.put_u32(0); + } + } - Ok(()) + /// Serialize acceptor -> proposer message. + pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> { + if proto_version == SK_PROTO_VERSION_3 { + match self { + AcceptorProposerMessage::Greeting(msg) => { + buf.put_u8(b'g'); + buf.put_u64(msg.node_id.0); + Self::serialize_mconf(buf, &msg.mconf); + buf.put_u64(msg.term) + } + AcceptorProposerMessage::VoteResponse(msg) => { + buf.put_u8(b'v'); + buf.put_u32(msg.generation.into_inner()); + buf.put_u64(msg.term); + buf.put_u8(msg.vote_given as u8); + buf.put_u64(msg.flush_lsn.into()); + buf.put_u64(msg.truncate_lsn.into()); + buf.put_u32(msg.term_history.0.len() as u32); + for e in &msg.term_history.0 { + buf.put_u64(e.term); + buf.put_u64(e.lsn.into()); + } + } + AcceptorProposerMessage::AppendResponse(msg) => { + buf.put_u8(b'a'); + buf.put_u32(msg.generation.into_inner()); + buf.put_u64(msg.term); + buf.put_u64(msg.flush_lsn.into()); + buf.put_u64(msg.commit_lsn.into()); + buf.put_i64(msg.hs_feedback.ts); + buf.put_u64(msg.hs_feedback.xmin); + buf.put_u64(msg.hs_feedback.catalog_xmin); + + // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback + // if it is not present. + if let Some(ref msg) = msg.pageserver_feedback { + msg.serialize(buf); + } + } + } + Ok(()) + // TODO remove 3 after converting all msgs + } else if proto_version == SK_PROTO_VERSION_2 { + match self { + AcceptorProposerMessage::Greeting(msg) => { + buf.put_u64_le('g' as u64); + // v2 didn't have mconf and fields were reordered + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.node_id.0); + } + AcceptorProposerMessage::VoteResponse(msg) => { + // v2 didn't have generation, had u64 vote_given and timeline_start_lsn + buf.put_u64_le('v' as u64); + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.vote_given as u64); + buf.put_u64_le(msg.flush_lsn.into()); + buf.put_u64_le(msg.truncate_lsn.into()); + buf.put_u32_le(msg.term_history.0.len() as u32); + for e in &msg.term_history.0 { + buf.put_u64_le(e.term); + buf.put_u64_le(e.lsn.into()); + } + // removed timeline_start_lsn + buf.put_u64_le(0); + } + AcceptorProposerMessage::AppendResponse(msg) => { + // v2 didn't have generation + buf.put_u64_le('a' as u64); + buf.put_u64_le(msg.term); + buf.put_u64_le(msg.flush_lsn.into()); + buf.put_u64_le(msg.commit_lsn.into()); + buf.put_i64_le(msg.hs_feedback.ts); + buf.put_u64_le(msg.hs_feedback.xmin); + buf.put_u64_le(msg.hs_feedback.catalog_xmin); + + // AsyncReadMessage in walproposer.c will not try to decode pageserver_feedback + // if it is not present. + if let Some(ref msg) = msg.pageserver_feedback { + msg.serialize(buf); + } + } + } + Ok(()) + } else { + bail!("unsupported protocol version {}", proto_version); + } } } @@ -593,14 +967,6 @@ where &mut self, msg: &ProposerGreeting, ) -> Result> { - // Check protocol compatibility - if msg.protocol_version != SK_PROTOCOL_VERSION { - bail!( - "incompatible protocol version {}, expected {}", - msg.protocol_version, - SK_PROTOCOL_VERSION - ); - } /* Postgres major version mismatch is treated as fatal error * because safekeepers parse WAL headers and the format * may change between versions. @@ -655,15 +1021,16 @@ where self.state.finish_change(&state).await?; } - info!( - "processed greeting from walproposer {}, sending term {:?}", - msg.proposer_id.map(|b| format!("{:X}", b)).join(""), - self.state.acceptor_state.term - ); - Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting { - term: self.state.acceptor_state.term, + let apg = AcceptorGreeting { node_id: self.node_id, - }))) + mconf: self.state.mconf.clone(), + term: self.state.acceptor_state.term, + }; + info!( + "processed greeting {:?} from walproposer, sending {:?}", + msg, apg + ); + Ok(Some(AcceptorProposerMessage::Greeting(apg))) } /// Give vote for the given term, if we haven't done that previously. @@ -684,12 +1051,12 @@ where self.wal_store.flush_wal().await?; // initialize with refusal let mut resp = VoteResponse { + generation: self.state.mconf.generation, term: self.state.acceptor_state.term, - vote_given: false as u64, + vote_given: false, flush_lsn: self.flush_lsn(), truncate_lsn: self.state.inmem.peer_horizon_lsn, term_history: self.get_term_history(), - timeline_start_lsn: self.state.timeline_start_lsn, }; if self.state.acceptor_state.term < msg.term { let mut state = self.state.start_change(); @@ -698,15 +1065,16 @@ where self.state.finish_change(&state).await?; resp.term = self.state.acceptor_state.term; - resp.vote_given = true as u64; + resp.vote_given = true; } - info!("processed VoteRequest for term {}: {:?}", msg.term, &resp); + info!("processed {:?}: sending {:?}", msg, &resp); Ok(Some(AcceptorProposerMessage::VoteResponse(resp))) } /// Form AppendResponse from current state. fn append_response(&self) -> AppendResponse { let ar = AppendResponse { + generation: self.state.mconf.generation, term: self.state.acceptor_state.term, flush_lsn: self.flush_lsn(), commit_lsn: self.state.commit_lsn, @@ -805,18 +1173,22 @@ where // Here we learn initial LSN for the first time, set fields // interested in that. - if state.timeline_start_lsn == Lsn(0) { - // Remember point where WAL begins globally. - state.timeline_start_lsn = msg.timeline_start_lsn; - info!( - "setting timeline_start_lsn to {:?}", - state.timeline_start_lsn - ); + if let Some(start_lsn) = msg.term_history.0.first() { + if state.timeline_start_lsn == Lsn(0) { + // Remember point where WAL begins globally. In the future it + // will be intialized immediately on timeline creation. + state.timeline_start_lsn = start_lsn.lsn; + info!( + "setting timeline_start_lsn to {:?}", + state.timeline_start_lsn + ); + } } + if state.peer_horizon_lsn == Lsn(0) { // Update peer_horizon_lsn as soon as we know where timeline starts. // It means that peer_horizon_lsn cannot be zero after we know timeline_start_lsn. - state.peer_horizon_lsn = msg.timeline_start_lsn; + state.peer_horizon_lsn = state.timeline_start_lsn; } if state.local_start_lsn == Lsn(0) { state.local_start_lsn = msg.start_streaming_at; @@ -896,7 +1268,10 @@ where // If our term is higher, immediately refuse the message. if self.state.acceptor_state.term > msg.h.term { - let resp = AppendResponse::term_only(self.state.acceptor_state.term); + let resp = AppendResponse::term_only( + self.state.mconf.generation, + self.state.acceptor_state.term, + ); return Ok(Some(AcceptorProposerMessage::AppendResponse(resp))); } @@ -924,10 +1299,8 @@ where ); } - // Now we know that we are in the same term as the proposer, - // processing the message. - - self.state.inmem.proposer_uuid = msg.h.proposer_uuid; + // Now we know that we are in the same term as the proposer, process the + // message. // do the job if !msg.wal_data.is_empty() { @@ -1097,10 +1470,13 @@ mod tests { let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); // check voting for 1 is ok - let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); + let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { + generation: Generation::new(0), + term: 1, + }); let mut vote_resp = sk.process_msg(&vote_request).await; match vote_resp.unwrap() { - Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given), r => panic!("unexpected response: {:?}", r), } @@ -1115,7 +1491,7 @@ mod tests { // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request).await; match vote_resp.unwrap() { - Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given), r => panic!("unexpected response: {:?}", r), } } @@ -1130,13 +1506,12 @@ mod tests { let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); let mut ar_hdr = AppendRequestHeader { + generation: Generation::new(0), term: 2, - term_start_lsn: Lsn(3), begin_lsn: Lsn(1), end_lsn: Lsn(2), commit_lsn: Lsn(0), truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }; let mut append_request = AppendRequest { h: ar_hdr.clone(), @@ -1144,6 +1519,7 @@ mod tests { }; let pem = ProposerElected { + generation: Generation::new(0), term: 2, start_streaming_at: Lsn(1), term_history: TermHistory(vec![ @@ -1156,7 +1532,6 @@ mod tests { lsn: Lsn(3), }, ]), - timeline_start_lsn: Lsn(1), }; sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) .await @@ -1191,26 +1566,25 @@ mod tests { let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); let pem = ProposerElected { + generation: Generation::new(0), term: 1, start_streaming_at: Lsn(1), term_history: TermHistory(vec![TermLsn { term: 1, lsn: Lsn(1), }]), - timeline_start_lsn: Lsn(1), }; sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) .await .unwrap(); let ar_hdr = AppendRequestHeader { + generation: Generation::new(0), term: 1, - term_start_lsn: Lsn(3), begin_lsn: Lsn(1), end_lsn: Lsn(2), commit_lsn: Lsn(0), truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }; let append_request = AppendRequest { h: ar_hdr.clone(), diff --git a/safekeeper/src/test_utils.rs b/safekeeper/src/test_utils.rs index 79ceddd366..32af4537d3 100644 --- a/safekeeper/src/test_utils.rs +++ b/safekeeper/src/test_utils.rs @@ -14,6 +14,7 @@ use crate::wal_backup::remote_timeline_path; use crate::{control_file, receive_wal, wal_storage, SafeKeeperConf}; use camino_tempfile::Utf8TempDir; use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator}; +use safekeeper_api::membership::SafekeeperGeneration as Generation; use tokio::fs::create_dir_all; use utils::id::{NodeId, TenantTimelineId}; use utils::lsn::Lsn; @@ -73,10 +74,10 @@ impl Env { // Emulate an initial election. safekeeper .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected { + generation: Generation::new(0), term: 1, start_streaming_at: start_lsn, term_history: TermHistory(vec![(1, start_lsn).into()]), - timeline_start_lsn: start_lsn, })) .await?; @@ -146,13 +147,12 @@ impl Env { let req = AppendRequest { h: AppendRequestHeader { + generation: Generation::new(0), term: 1, - term_start_lsn: start_lsn, begin_lsn: lsn, end_lsn: lsn + record.len() as u64, commit_lsn: lsn, truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }, wal_data: record, }; diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 0023a4d22a..b9dfabe0d7 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -15,9 +15,7 @@ use desim::{ }; use http::Uri; use safekeeper::{ - safekeeper::{ - ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION, - }, + safekeeper::{ProposerAcceptorMessage, SafeKeeper, SK_PROTO_VERSION_3, UNKNOWN_SERVER_VERSION}, state::{TimelinePersistentState, TimelineState}, timeline::TimelineError, wal_storage::Storage, @@ -287,7 +285,7 @@ impl ConnState { bail!("finished processing START_REPLICATION") } - let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?; + let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTO_VERSION_3)?; debug!("got msg: {:?}", msg); self.process(msg, global) } else { @@ -403,7 +401,7 @@ impl ConnState { // TODO: if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn let mut buf = BytesMut::with_capacity(128); - reply.serialize(&mut buf)?; + reply.serialize(&mut buf, SK_PROTO_VERSION_3)?; self.tcp.send(AnyMessage::Bytes(buf.into())); } diff --git a/test_runner/regress/test_normal_work.py b/test_runner/regress/test_normal_work.py index ae2d171058..c8458b963e 100644 --- a/test_runner/regress/test_normal_work.py +++ b/test_runner/regress/test_normal_work.py @@ -6,9 +6,14 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.pageserver.http import PageserverHttpClient -def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient): +def check_tenant( + env: NeonEnv, pageserver_http: PageserverHttpClient, safekeeper_proto_version: int +): tenant_id, timeline_id = env.create_tenant() - endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) + config_lines = [ + f"neon.safekeeper_proto_version = {safekeeper_proto_version}", + ] + endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=config_lines) # we rely upon autocommit after each statement res_1 = endpoint.safe_psql_many( queries=[ @@ -33,7 +38,14 @@ def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient): @pytest.mark.parametrize("num_timelines,num_safekeepers", [(3, 1)]) -def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_safekeepers: int): +# Test both proto versions until we fully migrate. +@pytest.mark.parametrize("safekeeper_proto_version", [2, 3]) +def test_normal_work( + neon_env_builder: NeonEnvBuilder, + num_timelines: int, + num_safekeepers: int, + safekeeper_proto_version: int, +): """ Basic test: * create new tenant with a timeline @@ -52,4 +64,4 @@ def test_normal_work(neon_env_builder: NeonEnvBuilder, num_timelines: int, num_s pageserver_http = env.pageserver.http_client() for _ in range(num_timelines): - check_tenant(env, pageserver_http) + check_tenant(env, pageserver_http, safekeeper_proto_version) diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 936c774657..56539a0a08 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -539,13 +539,16 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder): asyncio.run(run_recovery_uncommitted(env)) -async def run_wal_truncation(env: NeonEnv): +async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int): tenant_id = env.initial_tenant timeline_id = env.initial_timeline (sk1, sk2, sk3) = env.safekeepers - ep = env.endpoints.create_start("main") + config_lines = [ + f"neon.safekeeper_proto_version = {safekeeper_proto_version}", + ] + ep = env.endpoints.create_start("main", config_lines=config_lines) ep.safe_psql("create table t (key int, value text)") ep.safe_psql("insert into t select generate_series(1, 100), 'payload'") @@ -572,6 +575,7 @@ async def run_wal_truncation(env: NeonEnv): sk2.start() ep = env.endpoints.create_start( "main", + config_lines=config_lines, ) ep.safe_psql("insert into t select generate_series(1, 200), 'payload'") @@ -590,11 +594,13 @@ async def run_wal_truncation(env: NeonEnv): # Simple deterministic test creating tail of WAL on safekeeper which is # truncated when majority without this sk elects walproposer starting earlier. -def test_wal_truncation(neon_env_builder: NeonEnvBuilder): +# Test both proto versions until we fully migrate. +@pytest.mark.parametrize("safekeeper_proto_version", [2, 3]) +def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_version: int): neon_env_builder.num_safekeepers = 3 env = neon_env_builder.init_start() - asyncio.run(run_wal_truncation(env)) + asyncio.run(run_wal_truncation(env, safekeeper_proto_version)) async def run_segment_init_failure(env: NeonEnv):