convert ProposerGreeting

This commit is contained in:
Arseny Sher
2025-01-22 11:57:27 +01:00
parent 8be17724d8
commit c19a8b69f2
4 changed files with 251 additions and 86 deletions

View File

@@ -144,13 +144,12 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
/* Fill the greeting package */
wp->greetRequest.pam.tag = 'g';
wp->greetRequest.proto_version = wp->config->proto_version;
if (!wp->config->neon_tenant)
wp_log(FATAL, "neon.tenant_id is not provided");
wp->greetRequest.tenant_id = wp->config->neon_tenant;
if (!wp->config->neon_timeline)
wp_log(FATAL, "neon.timeline_id is not provided");
wp->greetRequest.timeline_id = wp->config->neon_tenant;
wp->greetRequest.timeline_id = wp->config->neon_timeline;
wp->greetRequest.pg_version = PG_VERSION_NUM;
wp->greetRequest.system_id = wp->config->systemId;
wp->greetRequest.wal_seg_size = wp->config->wal_segment_size;
@@ -1771,6 +1770,35 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
}
}
/* Serialize MembershipConfiguration into buf. */
static void
MembershipConfigurationSerialize(MembershipConfiguration *mconf, StringInfo buf)
{
int i;
pq_sendint32(buf, mconf->generation);
pq_sendint32(buf, mconf->members.len);
for (i = 0; i < mconf->members.len; i++)
{
pq_sendint64(buf, mconf->members.m[i].node_id);
pq_send_ascii_string(buf, mconf->members.m[i].host);
pq_sendint16(buf, mconf->members.m[i].port);
}
/*
* There is no special mark for absent new_members; zero members in
* invalid, so zero len means absent.
*/
pq_sendint32(buf, mconf->new_members.len);
for (i = 0; i < mconf->new_members.len; i++)
{
pq_sendint64(buf, mconf->new_members.m[i].node_id);
pq_send_ascii_string(buf, mconf->new_members.m[i].host);
pq_sendint16(buf, mconf->new_members.m[i].port);
}
}
/* Serialize proposer -> acceptor message into buf using specified version */
static void
PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version)
@@ -1780,58 +1808,54 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
resetStringInfo(buf);
/*
* v2 sends structs for some messages as is, so commonly send tag only for
* v3
*/
if (proto_version == 3)
pq_sendint8(buf, msg->tag);
switch (msg->tag)
if (proto_version == 3 && msg->tag == 'g')
//removeme tag check after converting all msgs
{
case 'g':
{
if (proto_version == 3)
/*
* v2 sends structs for some messages as is, so commonly send tag only
* for v3
*/
if (proto_version == 3)
pq_sendint8(buf, msg->tag);
switch (msg->tag)
{
case 'g':
{
ProposerGreeting *m = (ProposerGreeting *) msg;
pq_send_ascii_string(buf, m->tenant_id);
pq_send_ascii_string(buf, m->timeline_id);
MembershipConfigurationSerialize(&m->mconf, buf);
pq_sendint32(buf, m->pg_version);
pq_sendint64(buf, m->system_id);
pq_sendint32(buf, m->wal_seg_size);
break;
}
else if (proto_version == 2)
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
return;
}
if (proto_version == 2 || proto_version == 3) // TODO remove proto_version == 3 after converting all msgs
//removeme tag check after converting all msgs
{
switch (msg->tag)
{
case 'g':
{
/* v2 sent struct as is */
pq_sendbytes(buf, (char *) &wp->greetRequestV2, sizeof(wp->greetRequestV2));
break;
}
wp_log(FATAL, "unexpected proto_version %d", proto_version);
break; /* keep the compiler quiet */
}
case 'v':
{
if (proto_version == 3)
{
VoteRequest *m = (VoteRequest *) msg;
Assert(false);
break;
}
else if (proto_version == 2)
case 'v':
{
/* v2 sent struct as is */
pq_sendbytes(buf, (char *) &wp->voteRequest, sizeof(wp->voteRequest));
break;
}
wp_log(FATAL, "unexpected proto_version %d", proto_version);
break; /* keep the compiler quiet */
}
case 'e':
{
if (proto_version == 3)
{
Assert(false);
break;
}
else if (proto_version == 2)
case 'e':
{
ProposerElected *m = (ProposerElected *) msg;
@@ -1847,33 +1871,24 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
pq_sendint64_le(buf, m->timelineStartLsn);
break;
}
wp_log(FATAL, "unexpected proto_version %d", proto_version);
break; /* keep the compiler quiet */
}
case 'a':
{
case 'a':
/*
* Note: this serializes only AppendRequestHeader, caller is
* expected to append WAL data later.
*/
if (proto_version == 3)
{
Assert(false);
break;
}
else if (proto_version == 2)
{
/* v2 sent struct as is */
pq_sendbytes(buf, (char *) msg, sizeof(AppendRequestHeader));
break;
}
wp_log(FATAL, "unexpected proto_version %d", proto_version);
break; /* keep the compiler quiet */
}
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
return;
}
wp_log(FATAL, "unexpected proto_version %d", proto_version);
}
/*
@@ -1913,6 +1928,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
* If the read needs more polling, we return 'false' and keep the state
* unmodified, waiting until it becomes read-ready to try again. If it fully
* failed, a warning is emitted and the connection is reset.
*
* Note: it pallocs if needed, i.e. for AcceptorGreeting and VoteResponse fields.
*/
static bool
AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)

View File

@@ -156,8 +156,8 @@ typedef struct SafekeeperId
/* Set of safekeepers. */
typedef struct MemberSet
{
uint32 n_members;
SafekeeperId *members;
uint32 len; /* number of members */
SafekeeperId *m; /* ids themselves */
} MemberSet;
/* Timeline safekeeper membership configuration. */
@@ -182,7 +182,6 @@ typedef struct ProposerAcceptorMessage
typedef struct ProposerGreeting
{
ProposerAcceptorMessage pam; /* message tag */
uint32 proto_version; /* proposer-safekeeper protocol version */
/*
* tenant/timeline ids as C strings with standard hex notation for ease of
@@ -191,6 +190,7 @@ typedef struct ProposerGreeting
*/
char *tenant_id;
char *timeline_id;
/* Full conf is carried to allow safekeeper switch */
MembershipConfiguration mconf;
/*

View File

@@ -342,8 +342,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
let tli = match next_msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with walproposer {} sysid {} timeline {}",
self.peer_addr, greeting.system_id, greeting.tli,
"start handshake with walproposer {} sysid {}",
self.peer_addr, greeting.system_id,
);
let server_info = ServerInfo {
pg_version: greeting.pg_version,

View File

@@ -5,6 +5,10 @@ use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
use safekeeper_api::membership;
use safekeeper_api::membership::MemberSet;
use safekeeper_api::membership::SafekeeperId;
use safekeeper_api::membership::INVALID_GENERATION;
use safekeeper_api::models::HotStandbyFeedback;
use safekeeper_api::Term;
use serde::{Deserialize, Serialize};
@@ -12,6 +16,7 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use std::str::FromStr;
use storage_broker::proto::SafekeeperTimelineInfo;
use tracing::*;
@@ -197,6 +202,18 @@ impl AcceptorState {
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
pub struct ProposerGreeting {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mconf: membership::Configuration,
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub wal_seg_size: u32,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
#[derive(Debug, Deserialize)]
pub struct ProposerGreetingV2 {
/// proposer-acceptor protocol version
pub protocol_version: u32,
/// Postgres server version
@@ -315,29 +332,178 @@ pub enum ProposerAcceptorMessage {
FlushWAL,
}
/// Augment Bytes with fallible get_uN where N is number of bytes methods.
/// All reads are in network (big endian) order.
trait BytesF {
fn get_u8_f(&mut self) -> Result<u8>;
fn get_u16_f(&mut self) -> Result<u16>;
fn get_u32_f(&mut self) -> Result<u32>;
fn get_u64_f(&mut self) -> Result<u64>;
}
impl BytesF for Bytes {
fn get_u8_f(&mut self) -> Result<u8> {
if self.is_empty() {
bail!("no bytes left, expected 1");
}
Ok(self.get_u8())
}
fn get_u16_f(&mut self) -> Result<u16> {
if self.is_empty() {
bail!("no bytes left, expected 2");
}
Ok(self.get_u16())
}
fn get_u32_f(&mut self) -> Result<u32> {
if self.remaining() < 4 {
bail!("only {} bytes left, expected 4", self.remaining());
}
Ok(self.get_u32())
}
fn get_u64_f(&mut self) -> Result<u64> {
if self.remaining() < 8 {
bail!("only {} bytes left, expected 8", self.remaining());
}
Ok(self.get_u64())
}
}
impl ProposerAcceptorMessage {
/// Read cstring from Bytes.
fn read_cstr(buf: &mut Bytes) -> Result<String> {
let pos = buf
.iter()
.position(|x| *x == 0)
.ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?;
let result = buf.split_to(pos);
buf.advance(1); // drop the null terminator
match std::str::from_utf8(&result) {
Ok(s) => Ok(s.to_string()),
Err(e) => bail!("invalid utf8 in cstring: {}", e),
}
}
/// Read membership::Configuration from Bytes.
fn read_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
let generation = buf.get_u32_f().with_context(|| "reading generation")?;
let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
// Main member set must have at least someone in valid configuration.
// Empty conf is allowed until we fully migrate.
if generation != INVALID_GENERATION && members_len == 0 {
bail!("empty members_len");
}
let mut members = MemberSet::empty();
for i in 0..members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading member {} node_id", i))?;
let host =
Self::read_cstr(buf).with_context(|| format!("reading member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading member {} port", i))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
pg_port,
};
members.add(sk)?;
}
let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?;
// Non joint conf.
if new_members_len == 0 {
Ok(membership::Configuration {
generation,
members,
new_members: None,
})
} else {
let mut new_members = MemberSet::empty();
for i in 0..new_members_len {
let id = buf
.get_u64_f()
.with_context(|| format!("reading new member {} node_id", i))?;
let host = Self::read_cstr(buf)
.with_context(|| format!("reading new member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading new member {} port", i))?;
let sk = SafekeeperId {
id: NodeId(id),
host,
pg_port,
};
new_members.add(sk)?;
}
Ok(membership::Configuration {
generation,
members,
new_members: Some(new_members),
})
}
}
/// Parse proposer message.
pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
if proto_version == 3 {
// TODO remove after converting all msgs
let t = msg_bytes[0] as char;
if proto_version == 3 && t == 'g' {
if msg_bytes.is_empty() {
bail!("ProposerAcceptorMessage is not complete: missing tag");
}
let tag = msg_bytes.get_u8() as char;
let tag = msg_bytes.get_u8_f().with_context(|| {
"ProposerAcceptorMessage is not complete: missing tag".to_string()
})? as char;
match tag {
'g' => {
bail!("not implemented");
let tenant_id_str =
Self::read_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
let tenant_id = TenantId::from_str(&tenant_id_str)?;
let timeline_id_str =
Self::read_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
let timeline_id = TimelineId::from_str(&timeline_id_str)?;
let mconf = Self::read_mconf(&mut msg_bytes)?;
let pg_version = msg_bytes
.get_u32_f()
.with_context(|| "reading pg_version")?;
let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?;
let wal_seg_size = msg_bytes
.get_u32_f()
.with_context(|| "reading wal_seg_size")?;
let g = ProposerGreeting {
tenant_id,
timeline_id,
mconf,
pg_version,
system_id,
wal_seg_size,
};
Ok(ProposerAcceptorMessage::Greeting(g))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
} else if proto_version == 2 {
// TODO remove proto_version == 3 after converting all msgs
} else if proto_version == 2 || proto_version == 3 {
// xxx using Reader is inefficient but easy to work with bincode
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
let msg = ProposerGreeting::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::Greeting(msg))
let msgv2 = ProposerGreetingV2::des_from(&mut stream)?;
let g = ProposerGreeting {
tenant_id: msgv2.tenant_id,
timeline_id: msgv2.timeline_id,
mconf: membership::Configuration {
generation: INVALID_GENERATION,
members: MemberSet::empty(),
new_members: None,
},
pg_version: msgv2.pg_version,
system_id: msgv2.system_id,
wal_seg_size: msgv2.wal_seg_size,
};
Ok(ProposerAcceptorMessage::Greeting(g))
}
'v' => {
let msg = VoteRequest::des_from(&mut stream)?;
@@ -402,16 +568,7 @@ impl ProposerAcceptorMessage {
// We explicitly list all fields, to draw attention here when new fields are added.
let mut size = BASE_SIZE;
size += match self {
Self::Greeting(ProposerGreeting {
protocol_version: _,
pg_version: _,
proposer_id: _,
system_id: _,
timeline_id: _,
tenant_id: _,
tli: _,
wal_seg_size: _,
}) => 0,
Self::Greeting(_) => 0,
Self::VoteRequest(VoteRequest { term: _ }) => 0,
@@ -601,14 +758,6 @@ where
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
// Check protocol compatibility
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
msg.protocol_version,
SK_PROTOCOL_VERSION
);
}
/* Postgres major version mismatch is treated as fatal error
* because safekeepers parse WAL headers and the format
* may change between versions.
@@ -664,9 +813,8 @@ where
}
info!(
"processed greeting from walproposer {}, sending term {:?}",
msg.proposer_id.map(|b| format!("{:X}", b)).join(""),
self.state.acceptor_state.term
"processed greeting {:?} from walproposer, sending term {:?}",
msg, self.state.acceptor_state.term
);
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.state.acceptor_state.term,