fix unit tests

This commit is contained in:
Arseny Sher
2025-02-04 11:22:35 +01:00
parent 42059b027e
commit db97d3f140
6 changed files with 55 additions and 31 deletions

View File

@@ -1489,7 +1489,7 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
for (i = 0; i < nkeys; i++)
{
const char *key = pq_getmsgstring(reply_message);
const char *key = pq_getmsgrawstring(reply_message);
unsigned int value_len = pq_getmsgint(reply_message, sizeof(int32));
if (strcmp(key, "current_timeline_size") == 0)
@@ -2157,6 +2157,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
}
}
wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version);
return false; /* keep the compiler quiet */
}
/*

View File

@@ -117,14 +117,13 @@ pq_getmsgbytes(StringInfo msg, int datalen)
}
/* --------------------------------
* pq_getmsgstring - get a null-terminated text string (with conversion)
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
*
* May return a pointer directly into the message buffer, or a pointer
* to a palloc'd conversion result.
* Returns a pointer directly into the message buffer.
* --------------------------------
*/
const char *
pq_getmsgstring(StringInfo msg)
pq_getmsgrawstring(StringInfo msg)
{
char *str;
int slen;
@@ -166,6 +165,35 @@ pq_sendbytes(StringInfo buf, const void *data, int datalen)
appendBinaryStringInfo(buf, data, datalen);
}
/* --------------------------------
* pq_send_ascii_string - append a null-terminated text string (without conversion)
*
* This function intentionally bypasses encoding conversion, instead just
* silently replacing any non-7-bit-ASCII characters with question marks.
* It is used only when we are having trouble sending an error message to
* the client with normal localization and encoding conversion. The caller
* should already have taken measures to ensure the string is just ASCII;
* the extra work here is just to make certain we don't send a badly encoded
* string to the client (which might or might not be robust about that).
*
* NB: passed text string must be null-terminated, and so is the data
* sent to the frontend.
* --------------------------------
*/
void
pq_send_ascii_string(StringInfo buf, const char *str)
{
while (*str)
{
char ch = *str++;
if (IS_HIGHBIT_SET(ch))
ch = '?';
appendStringInfoCharMacro(buf, ch);
}
appendStringInfoChar(buf, '\0');
}
/*
* Produce a C-string representation of a TimestampTz.
*

View File

@@ -88,13 +88,12 @@ fn bench_process_msg(c: &mut Criterion) {
let (lsn, record) = walgen.next().expect("endless WAL");
ProposerAcceptorMessage::AppendRequest(AppendRequest {
h: AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
})
@@ -160,13 +159,12 @@ fn bench_wal_acceptor(c: &mut Criterion) {
.take(n)
.map(|(lsn, record)| AppendRequest {
h: AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
})
@@ -262,13 +260,12 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
runtime.block_on(async {
let reqgen = walgen.take(count).map(|(lsn, record)| AppendRequest {
h: AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: Lsn(0),
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: if commit { lsn } else { Lsn(0) }, // commit previous record
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
});

View File

@@ -785,13 +785,13 @@ impl AcceptorProposerMessage {
if proto_version == SK_PROTO_VERSION_3 {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u8('g' as u8);
buf.put_u8(b'g');
buf.put_u64(msg.node_id.0);
Self::serialize_mconf(buf, &msg.mconf);
buf.put_u64(msg.term)
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u8('v' as u8);
buf.put_u8(b'v');
buf.put_u32(msg.generation);
buf.put_u64(msg.term);
buf.put_u8(msg.vote_given as u8);
@@ -804,7 +804,7 @@ impl AcceptorProposerMessage {
}
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u8('a' as u8);
buf.put_u8(b'a');
buf.put_u32(msg.generation);
buf.put_u64(msg.term);
buf.put_u64(msg.flush_lsn.into());
@@ -1465,10 +1465,13 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest {
generation: 0,
term: 1,
});
let mut vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given),
r => panic!("unexpected response: {:?}", r),
}
@@ -1483,7 +1486,7 @@ mod tests {
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request).await;
match vote_resp.unwrap() {
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0),
Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(!resp.vote_given),
r => panic!("unexpected response: {:?}", r),
}
}
@@ -1498,13 +1501,12 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
let mut ar_hdr = AppendRequestHeader {
generation: 0,
term: 2,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
};
let mut append_request = AppendRequest {
h: ar_hdr.clone(),
@@ -1512,6 +1514,7 @@ mod tests {
};
let pem = ProposerElected {
generation: 0,
term: 2,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![
@@ -1524,7 +1527,6 @@ mod tests {
lsn: Lsn(3),
},
]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
@@ -1559,26 +1561,25 @@ mod tests {
let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
let pem = ProposerElected {
generation: 0,
term: 1,
start_streaming_at: Lsn(1),
term_history: TermHistory(vec![TermLsn {
term: 1,
lsn: Lsn(1),
}]),
timeline_start_lsn: Lsn(1),
};
sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
.await
.unwrap();
let ar_hdr = AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: Lsn(3),
begin_lsn: Lsn(1),
end_lsn: Lsn(2),
commit_lsn: Lsn(0),
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
};
let append_request = AppendRequest {
h: ar_hdr.clone(),

View File

@@ -73,10 +73,10 @@ impl Env {
// Emulate an initial election.
safekeeper
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
generation: 0,
term: 1,
start_streaming_at: start_lsn,
term_history: TermHistory(vec![(1, start_lsn).into()]),
timeline_start_lsn: start_lsn,
}))
.await?;
@@ -142,13 +142,12 @@ impl Env {
let req = AppendRequest {
h: AppendRequestHeader {
generation: 0,
term: 1,
term_start_lsn: start_lsn,
begin_lsn: lsn,
end_lsn: lsn + record.len() as u64,
commit_lsn: lsn,
truncate_lsn: Lsn(0),
proposer_uuid: [0; 16],
},
wal_data: record,
};

View File

@@ -15,9 +15,7 @@ use desim::{
};
use http::Uri;
use safekeeper::{
safekeeper::{
ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION,
},
safekeeper::{ProposerAcceptorMessage, SafeKeeper, SK_PROTO_VERSION_3, UNKNOWN_SERVER_VERSION},
state::{TimelinePersistentState, TimelineState},
timeline::TimelineError,
wal_storage::Storage,
@@ -287,7 +285,7 @@ impl ConnState {
bail!("finished processing START_REPLICATION")
}
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?;
let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTO_VERSION_3)?;
debug!("got msg: {:?}", msg);
self.process(msg, global)
} else {
@@ -403,7 +401,7 @@ impl ConnState {
// TODO: if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
let mut buf = BytesMut::with_capacity(128);
reply.serialize(&mut buf)?;
reply.serialize(&mut buf, SK_PROTO_VERSION_3)?;
self.tcp.send(AnyMessage::Bytes(buf.into()));
}