pageserver - separate pagestream messages

This commit is contained in:
Patrick Insinger
2021-05-17 15:11:03 -04:00
committed by Patrick Insinger
parent ab2f0ad1a8
commit f954d5c501

View File

@@ -43,13 +43,7 @@ enum FeMessage {
Close(FeCloseMessage),
Sync,
Terminate,
//
// All that messages are actually CopyData from libpq point of view.
//
ZenithExistsRequest(ZenithRequest),
ZenithNblocksRequest(ZenithRequest),
ZenithReadRequest(ZenithRequest),
CopyData(Bytes),
}
#[derive(Debug)]
@@ -66,19 +60,27 @@ enum BeMessage {
DataRow(Bytes),
CommandComplete,
ControlFile,
CopyData(Bytes),
}
//
// All that messages are actually CopyData from libpq point of view.
//
ZenithStatusResponse(ZenithStatusResponse),
ZenithNblocksResponse(ZenithStatusResponse),
ZenithReadResponse(ZenithReadResponse),
// Wrapped in libpq CopyData
enum PagestreamFeMessage {
Exists(PagestreamRequest),
Nblocks(PagestreamRequest),
Read(PagestreamRequest),
}
// Wrapped in libpq CopyData
enum PagestreamBeMessage {
Status(PagestreamStatusResponse),
Nblocks(PagestreamStatusResponse),
Read(PagestreamReadResponse),
}
const HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(Bytes::from_static(b"hello world"));
#[derive(Debug)]
struct ZenithRequest {
struct PagestreamRequest {
spcnode: u32,
dbnode: u32,
relnode: u32,
@@ -88,13 +90,13 @@ struct ZenithRequest {
}
#[derive(Debug)]
struct ZenithStatusResponse {
struct PagestreamStatusResponse {
ok: bool,
n_blocks: u32,
}
#[derive(Debug)]
struct ZenithReadResponse {
struct PagestreamReadResponse {
ok: bool,
n_blocks: u32,
page: Bytes,
@@ -339,7 +341,7 @@ impl FeMessage {
let mut body_buf: Vec<u8> = vec![0; bodylen as usize];
stream.read_exact(&mut body_buf)?;
let mut body = Bytes::from(body_buf);
let body = Bytes::from(body_buf);
// Parse it
match tag {
@@ -351,35 +353,70 @@ impl FeMessage {
b'C' => Ok(Some(FeCloseMessage::parse(body)?)),
b'S' => Ok(Some(FeMessage::Sync)),
b'X' => Ok(Some(FeMessage::Terminate)),
b'd' => {
let smgr_tag = body.get_u8();
let zreq = ZenithRequest {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
blkno: body.get_u32(),
lsn: Lsn::from(body.get_u64()),
};
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
match smgr_tag {
0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))),
1 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))),
2 => Ok(Some(FeMessage::ZenithReadRequest(zreq))),
_ => Err(anyhow!(
"unknown smgr message tag: {},'{:?}'",
smgr_tag,
body
)),
}
}
b'd' => Ok(Some(FeMessage::CopyData(body))),
tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)),
}
}
}
impl PagestreamFeMessage {
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
let smgr_tag = body.get_u8();
let zreq = PagestreamRequest {
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
blkno: body.get_u32(),
lsn: Lsn::from(body.get_u64()),
};
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
match smgr_tag {
0 => Ok(PagestreamFeMessage::Exists(zreq)),
1 => Ok(PagestreamFeMessage::Nblocks(zreq)),
2 => Ok(PagestreamFeMessage::Read(zreq)),
_ => Err(anyhow!(
"unknown smgr message tag: {},'{:?}'",
smgr_tag,
body
)),
}
}
}
impl PagestreamBeMessage {
fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Status(resp) => {
bytes.put_u8(100); /* tag from pagestore_client.h */
bytes.put_u8(resp.ok as u8);
bytes.put_u32(resp.n_blocks);
}
Self::Nblocks(resp) => {
bytes.put_u8(101); /* tag from pagestore_client.h */
bytes.put_u8(resp.ok as u8);
bytes.put_u32(resp.n_blocks);
}
Self::Read(resp) => {
bytes.put_u8(102); /* tag from pagestore_client.h */
bytes.put_u8(resp.ok as u8);
bytes.put_u32(resp.n_blocks);
bytes.put(&resp.page[..]);
}
}
bytes.into()
}
}
///////////////////////////////////////////////////////////////////////////////
///
@@ -534,30 +571,10 @@ impl Connection {
self.stream.write_all(&b)?;
}
BeMessage::ZenithStatusResponse(resp) => {
BeMessage::CopyData(data) => {
self.stream.write_u8(b'd')?;
self.stream.write_u32::<BE>(4 + 1 + 1 + 4)?;
self.stream.write_u8(100)?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8)?;
self.stream.write_u32::<BE>(resp.n_blocks)?;
}
BeMessage::ZenithNblocksResponse(resp) => {
self.stream.write_u8(b'd')?;
self.stream.write_u32::<BE>(4 + 1 + 1 + 4)?;
self.stream.write_u8(101)?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8)?;
self.stream.write_u32::<BE>(resp.n_blocks)?;
}
BeMessage::ZenithReadResponse(resp) => {
self.stream.write_u8(b'd')?;
self.stream
.write_u32::<BE>(4 + 1 + 1 + 4 + resp.page.len() as u32)?;
self.stream.write_u8(102)?; /* tag from pagestore_client.h */
self.stream.write_u8(resp.ok as u8)?;
self.stream.write_u32::<BE>(resp.n_blocks)?;
self.stream.write_all(&resp.page.clone())?;
self.stream.write_u32::<BE>(4 + data.len() as u32)?;
self.stream.write_all(&data)?;
}
}
@@ -774,20 +791,18 @@ impl Connection {
self.stream.write_i16::<BE>(0)?; /* numAttributes */
self.stream.flush()?;
loop {
let message = self.read_message()?;
while let Some(message) = self.read_message()? {
trace!("query({:?}): {:?}", timelineid, message);
if let Some(m) = &message {
trace!("query({:?}): {:?}", timelineid, m);
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
_ => continue,
};
if message.is_none() {
// connection was closed
return Ok(());
}
let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
match message {
Some(FeMessage::ZenithExistsRequest(req)) => {
let response = match zenith_fe_msg {
PagestreamFeMessage::Exists(req) => {
let tag = RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
@@ -797,12 +812,12 @@ impl Connection {
let exist = timeline.get_relsize_exists(tag, req.lsn).unwrap_or(false);
self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse {
PagestreamBeMessage::Status(PagestreamStatusResponse {
ok: exist,
n_blocks: 0,
}))?
})
}
Some(FeMessage::ZenithNblocksRequest(req)) => {
PagestreamFeMessage::Nblocks(req) => {
let tag = RelTag {
spcnode: req.spcnode,
dbnode: req.dbnode,
@@ -812,12 +827,9 @@ impl Connection {
let n_blocks = timeline.get_relsize(tag, req.lsn).unwrap_or(0);
self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse {
ok: true,
n_blocks,
}))?
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
}
Some(FeMessage::ZenithReadRequest(req)) => {
PagestreamFeMessage::Read(req) => {
let buf_tag = BufferTag {
rel: RelTag {
spcnode: req.spcnode,
@@ -828,28 +840,31 @@ impl Connection {
blknum: req.blkno,
};
let msg = match timeline.get_page_at_lsn(buf_tag, req.lsn) {
Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse {
let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) {
Ok(p) => PagestreamReadResponse {
ok: true,
n_blocks: 0,
page: p,
}),
},
Err(e) => {
const ZERO_PAGE: [u8; 8192] = [0; 8192];
error!("get_page_at_lsn: {}", e);
BeMessage::ZenithReadResponse(ZenithReadResponse {
PagestreamReadResponse {
ok: false,
n_blocks: 0,
page: Bytes::from_static(&ZERO_PAGE),
})
}
}
};
self.write_message(&msg)?
PagestreamBeMessage::Read(read_response)
}
_ => {}
}
};
self.write_message(&BeMessage::CopyData(response.serialize()))?;
}
Ok(())
}
fn handle_basebackup_request(