move test message tag to 99 and represent Fe message tag as enum, like we do for Be message

This commit is contained in:
Christian Schwarz
2025-01-16 18:07:56 +01:00
parent c19a16792a
commit 0c3ab9c494
2 changed files with 107 additions and 59 deletions

View File

@@ -1417,6 +1417,21 @@ pub enum PagestreamBeMessage {
Test(PagestreamTestResponse),
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamFeMessageTag {
Exists = 0,
Nblocks = 1,
GetPage = 2,
Error = 3,
DbSize = 4,
GetSlruSegment = 5,
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 99,
}
// Keep in sync with `pagestore_client.h`
#[repr(u8)]
enum PagestreamBeMessageTag {
@@ -1426,10 +1441,29 @@ enum PagestreamBeMessageTag {
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
/// Test message discrimimant is unstable
/* future tags above this line */
/// For testing purposes, not available in production.
#[cfg(feature = "testing")]
Test = 106,
Test = 199,
}
impl TryFrom<u8> for PagestreamFeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
match value {
0 => Ok(PagestreamFeMessageTag::Exists),
1 => Ok(PagestreamFeMessageTag::Nblocks),
2 => Ok(PagestreamFeMessageTag::GetPage),
3 => Ok(PagestreamFeMessageTag::Error),
4 => Ok(PagestreamFeMessageTag::DbSize),
5 => Ok(PagestreamFeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
99 => Ok(PagestreamFeMessageTag::Test),
_ => Err(value),
}
}
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
fn try_from(value: u8) -> Result<Self, u8> {
@@ -1441,7 +1475,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
#[cfg(feature = "testing")]
106 => Ok(PagestreamBeMessageTag::Test),
199 => Ok(PagestreamBeMessageTag::Test),
_ => Err(value),
}
}
@@ -1592,7 +1626,7 @@ impl PagestreamFeMessage {
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(PagestreamFeMessageTag::Exists as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1603,7 +1637,7 @@ impl PagestreamFeMessage {
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(PagestreamFeMessageTag::Nblocks as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1614,7 +1648,7 @@ impl PagestreamFeMessage {
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(PagestreamFeMessageTag::GetPage as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1626,7 +1660,7 @@ impl PagestreamFeMessage {
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(PagestreamFeMessageTag::DbSize as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1634,7 +1668,7 @@ impl PagestreamFeMessage {
}
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(PagestreamFeMessageTag::GetSlruSegment as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1643,7 +1677,7 @@ impl PagestreamFeMessage {
}
#[cfg(feature = "testing")]
Self::Test(req) => {
bytes.put_u8(5);
bytes.put_u8(PagestreamFeMessageTag::Test as u8);
bytes.put_u64(req.hdr.reqid);
bytes.put_u64(req.hdr.request_lsn.0);
bytes.put_u64(req.hdr.not_modified_since.0);
@@ -1679,56 +1713,66 @@ impl PagestreamFeMessage {
),
};
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
match PagestreamFeMessageTag::try_from(msg_tag)
.map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))?
{
PagestreamFeMessageTag::Exists => {
Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::Nblocks => {
Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
}))
}
PagestreamFeMessageTag::GetPage => {
Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
}))
}
PagestreamFeMessageTag::DbSize => {
Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,
not_modified_since,
},
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
}))
}
PagestreamFeMessageTag::GetSlruSegment => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
hdr: PagestreamRequest {
reqid,
@@ -1740,7 +1784,7 @@ impl PagestreamFeMessage {
},
)),
#[cfg(feature = "testing")]
5 => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
PagestreamFeMessageTag::Test => Ok(PagestreamFeMessage::Test(PagestreamTestRequest {
hdr: PagestreamRequest {
reqid,
request_lsn,

View File

@@ -34,6 +34,8 @@ typedef enum
T_NeonGetPageRequest,
T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
/* future tags above this line */
T_NeonTestRequest = 99, /* only in cfg(feature = "testing") */
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,
@@ -42,6 +44,8 @@ typedef enum
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
/* future tags above this line */
T_NeonTestResponse = 199, /* only in cfg(feature = "testing") */
} NeonMessageTag;
typedef uint64 NeonRequestId;