diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7ce69ab82b..23126fecf9 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1489,7 +1489,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese for (i = 0; i < nkeys; i++) { - const char *key = pq_getmsgstring(reply_message); + const char *key = pq_getmsgrawstring(reply_message); unsigned int value_len = pq_getmsgint(reply_message, sizeof(int32)); if (strcmp(key, "current_timeline_size") == 0) @@ -2157,6 +2157,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) } } wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version); + return false; /* keep the compiler quiet */ } /* diff --git a/pgxn/neon/walproposer_compat.c b/pgxn/neon/walproposer_compat.c index a3dfe1182e..a986160224 100644 --- a/pgxn/neon/walproposer_compat.c +++ b/pgxn/neon/walproposer_compat.c @@ -117,14 +117,13 @@ pq_getmsgbytes(StringInfo msg, int datalen) } /* -------------------------------- - * pq_getmsgstring - get a null-terminated text string (with conversion) + * pq_getmsgrawstring - get a null-terminated text string - NO conversion * - * May return a pointer directly into the message buffer, or a pointer - * to a palloc'd conversion result. + * Returns a pointer directly into the message buffer. * -------------------------------- */ const char * -pq_getmsgstring(StringInfo msg) +pq_getmsgrawstring(StringInfo msg) { char *str; int slen; @@ -166,6 +165,35 @@ pq_sendbytes(StringInfo buf, const void *data, int datalen) appendBinaryStringInfo(buf, data, datalen); } +/* -------------------------------- + * pq_send_ascii_string - append a null-terminated text string (without conversion) + * + * This function intentionally bypasses encoding conversion, instead just + * silently replacing any non-7-bit-ASCII characters with question marks. + * It is used only when we are having trouble sending an error message to + * the client with normal localization and encoding conversion. The caller + * should already have taken measures to ensure the string is just ASCII; + * the extra work here is just to make certain we don't send a badly encoded + * string to the client (which might or might not be robust about that). + * + * NB: passed text string must be null-terminated, and so is the data + * sent to the frontend. + * -------------------------------- + */ +void +pq_send_ascii_string(StringInfo buf, const char *str) +{ + while (*str) + { + char ch = *str++; + + if (IS_HIGHBIT_SET(ch)) + ch = '?'; + appendStringInfoCharMacro(buf, ch); + } + appendStringInfoChar(buf, '\0'); +} + /* * Produce a C-string representation of a TimestampTz. * diff --git a/safekeeper/benches/receive_wal.rs b/safekeeper/benches/receive_wal.rs index 19c6662e74..934930fde3 100644 --- a/safekeeper/benches/receive_wal.rs +++ b/safekeeper/benches/receive_wal.rs @@ -88,13 +88,12 @@ fn bench_process_msg(c: &mut Criterion) { let (lsn, record) = walgen.next().expect("endless WAL"); ProposerAcceptorMessage::AppendRequest(AppendRequest { h: AppendRequestHeader { + generation: 0, term: 1, - term_start_lsn: Lsn(0), begin_lsn: lsn, end_lsn: lsn + record.len() as u64, commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }, wal_data: record, }) @@ -160,13 +159,12 @@ fn bench_wal_acceptor(c: &mut Criterion) { .take(n) .map(|(lsn, record)| AppendRequest { h: AppendRequestHeader { + generation: 0, term: 1, - term_start_lsn: Lsn(0), begin_lsn: lsn, end_lsn: lsn + record.len() as u64, commit_lsn: Lsn(0), truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }, wal_data: record, }) @@ -262,13 +260,12 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) { runtime.block_on(async { let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest { h: AppendRequestHeader { + generation: 0, term: 1, - term_start_lsn: Lsn(0), begin_lsn: lsn, end_lsn: lsn + record.len() as u64, commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }, wal_data: record, }); diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index cb1b832f79..787edb9759 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -785,13 +785,13 @@ impl AcceptorProposerMessage { if proto_version == SK_PROTO_VERSION_3 { match self { AcceptorProposerMessage::Greeting(msg) => { - buf.put_u8('g' as u8); + buf.put_u8(b'g'); buf.put_u64(msg.node_id.0); Self::serialize_mconf(buf, &msg.mconf); buf.put_u64(msg.term) } AcceptorProposerMessage::VoteResponse(msg) => { - buf.put_u8('v' as u8); + buf.put_u8(b'v'); buf.put_u32(msg.generation); buf.put_u64(msg.term); buf.put_u8(msg.vote_given as u8); @@ -804,7 +804,7 @@ impl AcceptorProposerMessage { } } AcceptorProposerMessage::AppendResponse(msg) => { - buf.put_u8('a' as u8); + buf.put_u8(b'a'); buf.put_u32(msg.generation); buf.put_u64(msg.term); buf.put_u64(msg.flush_lsn.into()); @@ -1465,10 +1465,13 @@ mod tests { let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); // check voting for 1 is ok - let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); + let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { + generation: 0, + term: 1, + }); let mut vote_resp = sk.process_msg(&vote_request).await; match vote_resp.unwrap() { - Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given), r => panic!("unexpected response: {:?}", r), } @@ -1483,7 +1486,7 @@ mod tests { // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request).await; match vote_resp.unwrap() { - Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0), + Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given), r => panic!("unexpected response: {:?}", r), } } @@ -1498,13 +1501,12 @@ mod tests { let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); let mut ar_hdr = AppendRequestHeader { + generation: 0, term: 2, - term_start_lsn: Lsn(3), begin_lsn: Lsn(1), end_lsn: Lsn(2), commit_lsn: Lsn(0), truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }; let mut append_request = AppendRequest { h: ar_hdr.clone(), @@ -1512,6 +1514,7 @@ mod tests { }; let pem = ProposerElected { + generation: 0, term: 2, start_streaming_at: Lsn(1), term_history: TermHistory(vec![ @@ -1524,7 +1527,6 @@ mod tests { lsn: Lsn(3), }, ]), - timeline_start_lsn: Lsn(1), }; sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) .await @@ -1559,26 +1561,25 @@ mod tests { let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap(); let pem = ProposerElected { + generation: 0, term: 1, start_streaming_at: Lsn(1), term_history: TermHistory(vec![TermLsn { term: 1, lsn: Lsn(1), }]), - timeline_start_lsn: Lsn(1), }; sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) .await .unwrap(); let ar_hdr = AppendRequestHeader { + generation: 0, term: 1, - term_start_lsn: Lsn(3), begin_lsn: Lsn(1), end_lsn: Lsn(2), commit_lsn: Lsn(0), truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }; let append_request = AppendRequest { h: ar_hdr.clone(), diff --git a/safekeeper/src/test_utils.rs b/safekeeper/src/test_utils.rs index 4e851c5b3d..2372a306ae 100644 --- a/safekeeper/src/test_utils.rs +++ b/safekeeper/src/test_utils.rs @@ -73,10 +73,10 @@ impl Env { // Emulate an initial election. safekeeper .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected { + generation: 0, term: 1, start_streaming_at: start_lsn, term_history: TermHistory(vec![(1, start_lsn).into()]), - timeline_start_lsn: start_lsn, })) .await?; @@ -142,13 +142,12 @@ impl Env { let req = AppendRequest { h: AppendRequestHeader { + generation: 0, term: 1, - term_start_lsn: start_lsn, begin_lsn: lsn, end_lsn: lsn + record.len() as u64, commit_lsn: lsn, truncate_lsn: Lsn(0), - proposer_uuid: [0; 16], }, wal_data: record, }; diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index 0023a4d22a..b9dfabe0d7 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -15,9 +15,7 @@ use desim::{ }; use http::Uri; use safekeeper::{ - safekeeper::{ - ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION, - }, + safekeeper::{ProposerAcceptorMessage, SafeKeeper, SK_PROTO_VERSION_3, UNKNOWN_SERVER_VERSION}, state::{TimelinePersistentState, TimelineState}, timeline::TimelineError, wal_storage::Storage, @@ -287,7 +285,7 @@ impl ConnState { bail!("finished processing START_REPLICATION") } - let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?; + let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTO_VERSION_3)?; debug!("got msg: {:?}", msg); self.process(msg, global) } else { @@ -403,7 +401,7 @@ impl ConnState { // TODO: if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn let mut buf = BytesMut::with_capacity(128); - reply.serialize(&mut buf)?; + reply.serialize(&mut buf, SK_PROTO_VERSION_3)?; self.tcp.send(AnyMessage::Bytes(buf.into())); }