diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 284ebe55e2..e3261083e6 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1145,6 +1145,7 @@ SendProposerElected(Safekeeper *sk) Assert(sk->startStreamingAt <= wp->availableLsn); msg.apm.tag = 'e'; + msg.generation = wp->mconf.generation; msg.term = wp->propTerm; msg.startStreamingAt = sk->startStreamingAt; msg.termHistory = &wp->propTermHistory; @@ -1221,9 +1222,9 @@ static void PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn) { Assert(endLsn >= beginLsn); - req->tag = 'a'; + req->apm.tag = 'a'; + req->generation = wp->mconf.generation; req->term = wp->propTerm; - req->epochStartLsn = wp->propEpochStartLsn; req->beginLsn = beginLsn; req->endLsn = endLsn; req->commitLsn = wp->commitLsn; @@ -1766,8 +1767,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf resetStringInfo(buf); - /* removeme tag check after converting all msgs */ - if (proto_version == 3 && (msg->tag == 'g' || msg->tag == 'v' || msg->tag == 'e')) + if (proto_version == 3) { /* * v2 sends structs for some messages as is, so commonly send tag only @@ -1813,15 +1813,29 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf } break; } + case 'a': + { + /* + * Note: this serializes only AppendRequestHeader, caller is + * expected to append WAL data later. + */ + AppendRequestHeader *m = (AppendRequestHeader *) msg; + + pq_sendint32(buf, m->generation); + pq_sendint64(buf, m->term); + pq_sendint64(buf, m->beginLsn); + pq_sendint64(buf, m->endLsn); + pq_sendint64(buf, m->commitLsn); + pq_sendint64(buf, m->truncateLsn); + break; + } default: wp_log(FATAL, "unexpected message type %c to serialize", msg->tag); } return; } - if (proto_version == 2 || proto_version == 3) - /* TODO remove proto_version == 3 after converting all msgs */ - /* removeme tag check after converting all msgs */ + if (proto_version == 2) { switch (msg->tag) { @@ -1885,14 +1899,26 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf break; } case 'a': - /* * Note: this serializes only AppendRequestHeader, caller is * expected to append WAL data later. */ { /* v2 sent struct as is */ - pq_sendbytes(buf, (char *) msg, sizeof(AppendRequestHeader)); + AppendRequestHeader *m = (AppendRequestHeader *) msg; + AppendRequestHeaderV2 appendRequestHeaderV2; + + appendRequestHeaderV2.tag = m->apm.tag; + appendRequestHeaderV2.term = m->term; + appendRequestHeaderV2.epochStartLsn = 0; /* removed field */ + appendRequestHeaderV2.beginLsn = m->beginLsn; + appendRequestHeaderV2.endLsn = m->endLsn; + appendRequestHeaderV2.commitLsn = m->commitLsn; + appendRequestHeaderV2.truncateLsn = m->truncateLsn; + /* removed field */ + memset(&appendRequestHeaderV2.proposerId, 0, sizeof(appendRequestHeaderV2.proposerId)); + + pq_sendbytes(buf, (char *) &appendRequestHeaderV2, sizeof(appendRequestHeaderV2)); break; } diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 82a11fb99d..6dac23a4f3 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -308,6 +308,24 @@ typedef struct ProposerElected * Header of request with WAL message sent from proposer to safekeeper. */ typedef struct AppendRequestHeader +{ + AcceptorProposerMessage apm; + Generation generation; /* membership conf generation */ + term_t term; /* term of the proposer */ + XLogRecPtr beginLsn; /* start position of message in WAL */ + XLogRecPtr endLsn; /* end position of message in WAL */ + XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */ + + /* + * minimal LSN which may be needed for recovery of some safekeeper (end + * lsn + 1 of last chunk streamed to everyone) + */ + XLogRecPtr truncateLsn; + /* in the AppendRequest message, WAL data follows */ +} AppendRequestHeader; + +/* protocol v2 variant, kept while wp supports it */ +typedef struct AppendRequestHeaderV2 { uint64 tag; term_t term; /* term of the proposer */ @@ -328,7 +346,7 @@ typedef struct AppendRequestHeader XLogRecPtr truncateLsn; pg_uuid_t proposerId; /* for monitoring/debugging */ /* in the AppendRequest message, WAL data follows */ -} AppendRequestHeader; +} AppendRequestHeaderV2; /* * Hot standby feedback received from replica diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 9c788d9684..8d7c1109ad 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -170,13 +170,12 @@ pub async fn append_logical_message( let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest { h: AppendRequestHeader { + generation: INVALID_GENERATION, term: msg.term, - term_start_lsn: begin_lsn, begin_lsn, end_lsn, commit_lsn, truncate_lsn: msg.truncate_lsn, - proposer_uuid: [0u8; 16], }, wal_data, }); diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index b3a14b2dd0..d025e64986 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -438,13 +438,12 @@ async fn network_io( match msg { ReplicationMessage::XLogData(xlog_data) => { let ar_hdr = AppendRequestHeader { + generation: INVALID_GENERATION, term: donor.term, - term_start_lsn: Lsn::INVALID, // unused begin_lsn: Lsn(xlog_data.wal_start()), end_lsn: Lsn(xlog_data.wal_start()) + xlog_data.data().len() as u64, commit_lsn: Lsn::INVALID, // do not attempt to advance, peer communication anyway does it truncate_lsn: Lsn::INVALID, // do not attempt to advance - proposer_uuid: [0; 16], }; let ar = AppendRequest { h: ar_hdr, diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 7dc286944b..dc75b45e07 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -273,7 +273,7 @@ pub struct VoteRequestV2 { /// Vote itself, sent from safekeeper to proposer #[derive(Debug, Serialize)] pub struct VoteResponse { - generation: Generation, + generation: Generation, // membership conf generation pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. vote_given: bool, // Safekeeper flush_lsn (end of WAL) + history of term switches allow @@ -289,7 +289,7 @@ pub struct VoteResponse { */ #[derive(Debug)] pub struct ProposerElected { - pub generation: Generation, + pub generation: Generation, // membership conf generation pub term: Term, pub start_streaming_at: Lsn, pub term_history: TermHistory, @@ -304,6 +304,22 @@ pub struct AppendRequest { } #[derive(Debug, Clone, Deserialize)] pub struct AppendRequestHeader { + pub generation: Generation, // membership conf generation + // safekeeper's current term; if it is higher than proposer's, the compute is out of date. + pub term: Term, + /// start position of message in WAL + pub begin_lsn: Lsn, + /// end position of message in WAL + pub end_lsn: Lsn, + /// LSN committed by quorum of safekeepers + pub commit_lsn: Lsn, + /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper + pub truncate_lsn: Lsn, +} + +/// V2 of the message; exists as a struct because we (de)serialized it as is. +#[derive(Debug, Clone, Deserialize)] +pub struct AppendRequestHeaderV2 { // safekeeper's current term; if it is higher than proposer's, the compute is out of date. pub term: Term, // TODO: remove this field from the protocol, it in unused -- LSN of term @@ -474,9 +490,7 @@ impl ProposerAcceptorMessage { /// Parse proposer message. pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result { - // TODO remove after converting all msgs - let t = msg_bytes[0] as char; - if proto_version == SK_PROTO_VERSION_3 && (t == 'g' || t == 'v' || t == 'e') { + if proto_version == SK_PROTO_VERSION_3 { if msg_bytes.is_empty() { bail!("ProposerAcceptorMessage is not complete: missing tag"); } @@ -535,6 +549,58 @@ impl ProposerAcceptorMessage { }; Ok(ProposerAcceptorMessage::Elected(msg)) } + 'a' => { + let generation = msg_bytes + .get_u32_f() + .with_context(|| "reading generation")?; + let term = msg_bytes.get_u64_f().with_context(|| "reading term")?; + let begin_lsn: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading begin_lsn")? + .into(); + let end_lsn: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading end_lsn")? + .into(); + let commit_lsn: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading commit_lsn")? + .into(); + let truncate_lsn: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading truncate_lsn")? + .into(); + let hdr = AppendRequestHeader { + generation, + term, + begin_lsn, + end_lsn, + commit_lsn, + truncate_lsn, + }; + let rec_size = hdr + .end_lsn + .checked_sub(hdr.begin_lsn) + .context("begin_lsn > end_lsn in AppendRequest")? + .0 as usize; + if rec_size > MAX_SEND_SIZE { + bail!( + "AppendRequest is longer than MAX_SEND_SIZE ({})", + MAX_SEND_SIZE + ); + } + if msg_bytes.remaining() < rec_size { + bail!( + "reading WAL: only {} bytes left, wanted {}", + msg_bytes.remaining(), + rec_size + ); + } + let wal_data = msg_bytes.copy_to_bytes(rec_size); + let msg = AppendRequest { h: hdr, wal_data }; + + Ok(ProposerAcceptorMessage::AppendRequest(msg)) + } _ => bail!("unknown proposer-acceptor message tag: {}", tag), } // TODO remove proto_version == 3 after converting all msgs @@ -590,7 +656,15 @@ impl ProposerAcceptorMessage { } 'a' => { // read header followed by wal data - let hdr = AppendRequestHeader::des_from(&mut stream)?; + let hdrv2 = AppendRequestHeaderV2::des_from(&mut stream)?; + let hdr = AppendRequestHeader { + generation: INVALID_GENERATION, + term: hdrv2.term, + begin_lsn: hdrv2.begin_lsn, + end_lsn: hdrv2.end_lsn, + commit_lsn: hdrv2.commit_lsn, + truncate_lsn: hdrv2.truncate_lsn, + }; let rec_size = hdr .end_lsn .checked_sub(hdr.begin_lsn) @@ -606,6 +680,7 @@ impl ProposerAcceptorMessage { let mut wal_data_vec: Vec = vec![0; rec_size]; stream.read_exact(&mut wal_data_vec)?; let wal_data = Bytes::from(wal_data_vec); + let msg = AppendRequest { h: hdr, wal_data }; Ok(ProposerAcceptorMessage::AppendRequest(msg)) @@ -636,13 +711,12 @@ impl ProposerAcceptorMessage { Self::AppendRequest(AppendRequest { h: AppendRequestHeader { + generation: _, term: _, - term_start_lsn: _, begin_lsn: _, end_lsn: _, commit_lsn: _, truncate_lsn: _, - proposer_uuid: _, }, wal_data, }) => wal_data.len(), @@ -650,13 +724,12 @@ impl ProposerAcceptorMessage { Self::NoFlushAppendRequest(AppendRequest { h: AppendRequestHeader { + generation: _, term: _, - term_start_lsn: _, begin_lsn: _, end_lsn: _, commit_lsn: _, truncate_lsn: _, - proposer_uuid: _, }, wal_data, }) => wal_data.len(), @@ -1201,10 +1274,8 @@ where ); } - // Now we know that we are in the same term as the proposer, - // processing the message. - - self.state.inmem.proposer_uuid = msg.h.proposer_uuid; + // Now we know that we are in the same term as the proposer, process the + // message. // do the job if !msg.wal_data.is_empty() {