mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
Pass proto selection to safekeeper, prepare parsing v3.
This commit is contained in:
@@ -317,70 +317,78 @@ pub enum ProposerAcceptorMessage {
|
||||
|
||||
impl ProposerAcceptorMessage {
|
||||
/// Parse proposer message.
|
||||
pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
|
||||
if proto_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
"incompatible protocol version {}, expected {}",
|
||||
proto_version,
|
||||
SK_PROTOCOL_VERSION
|
||||
);
|
||||
}
|
||||
// xxx using Reader is inefficient but easy to work with bincode
|
||||
let mut stream = msg_bytes.reader();
|
||||
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
|
||||
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let msg = ProposerGreeting::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::Greeting(msg))
|
||||
pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
|
||||
if proto_version == 3 {
|
||||
if msg_bytes.is_empty() {
|
||||
bail!("ProposerAcceptorMessage is not complete: missing tag");
|
||||
}
|
||||
'v' => {
|
||||
let msg = VoteRequest::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(msg))
|
||||
let tag = msg_bytes.get_u8() as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
bail!("not implemented");
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
}
|
||||
'e' => {
|
||||
let mut msg_bytes = stream.into_inner();
|
||||
if msg_bytes.remaining() < 16 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
} else if proto_version == 2 {
|
||||
// xxx using Reader is inefficient but easy to work with bincode
|
||||
let mut stream = msg_bytes.reader();
|
||||
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
|
||||
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
|
||||
match tag {
|
||||
'g' => {
|
||||
let msg = ProposerGreeting::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::Greeting(msg))
|
||||
}
|
||||
let term = msg_bytes.get_u64_le();
|
||||
let start_streaming_at = msg_bytes.get_u64_le().into();
|
||||
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
|
||||
if msg_bytes.remaining() < 8 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
'v' => {
|
||||
let msg = VoteRequest::des_from(&mut stream)?;
|
||||
Ok(ProposerAcceptorMessage::VoteRequest(msg))
|
||||
}
|
||||
let timeline_start_lsn = msg_bytes.get_u64_le().into();
|
||||
let msg = ProposerElected {
|
||||
term,
|
||||
start_streaming_at,
|
||||
timeline_start_lsn,
|
||||
term_history,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Elected(msg))
|
||||
}
|
||||
'a' => {
|
||||
// read header followed by wal data
|
||||
let hdr = AppendRequestHeader::des_from(&mut stream)?;
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.context("begin_lsn > end_lsn in AppendRequest")?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
'e' => {
|
||||
let mut msg_bytes = stream.into_inner();
|
||||
if msg_bytes.remaining() < 16 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let term = msg_bytes.get_u64_le();
|
||||
let start_streaming_at = msg_bytes.get_u64_le().into();
|
||||
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
|
||||
if msg_bytes.remaining() < 8 {
|
||||
bail!("ProposerElected message is not complete");
|
||||
}
|
||||
let timeline_start_lsn = msg_bytes.get_u64_le().into();
|
||||
let msg = ProposerElected {
|
||||
term,
|
||||
start_streaming_at,
|
||||
timeline_start_lsn,
|
||||
term_history,
|
||||
};
|
||||
Ok(ProposerAcceptorMessage::Elected(msg))
|
||||
}
|
||||
'a' => {
|
||||
// read header followed by wal data
|
||||
let hdr = AppendRequestHeader::des_from(&mut stream)?;
|
||||
let rec_size = hdr
|
||||
.end_lsn
|
||||
.checked_sub(hdr.begin_lsn)
|
||||
.context("begin_lsn > end_lsn in AppendRequest")?
|
||||
.0 as usize;
|
||||
if rec_size > MAX_SEND_SIZE {
|
||||
bail!(
|
||||
"AppendRequest is longer than MAX_SEND_SIZE ({})",
|
||||
MAX_SEND_SIZE
|
||||
);
|
||||
}
|
||||
|
||||
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
|
||||
stream.read_exact(&mut wal_data_vec)?;
|
||||
let wal_data = Bytes::from(wal_data_vec);
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
let mut wal_data_vec: Vec<u8> = vec![0; rec_size];
|
||||
stream.read_exact(&mut wal_data_vec)?;
|
||||
let wal_data = Bytes::from(wal_data_vec);
|
||||
let msg = AppendRequest { h: hdr, wal_data };
|
||||
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
Ok(ProposerAcceptorMessage::AppendRequest(msg))
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
}
|
||||
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
|
||||
} else {
|
||||
bail!("unsupported protocol version {}", proto_version);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user