mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
Improve the protocol between Postgres and page server.
- Use different message formats for different kinds of response messages. - Add an Error message, for passing errors from page server to Postgres. Previously, we would respond to 'exists' request with 'false', and to 'nblocks' request with 0, if an error happened. Fix those to return an error message to the client. GetPage requests had a mechanism to return an error, but it was just a flag with no error message. - Add a flag to requests, to indicate that we actually want the latest page version on the timeline, and the LSN is just a hint that we know that there haven't been any modifications since that LSN. The flag isn't used for anything yet, but I'm planning to use it to fix https://github.com/zenithdb/zenith/issues/567
This commit is contained in:
@@ -42,66 +42,103 @@ use crate::PageServerConf;
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
enum PagestreamFeMessage {
|
||||
Exists(PagestreamRequest),
|
||||
Nblocks(PagestreamRequest),
|
||||
Read(PagestreamRequest),
|
||||
Exists(PagestreamExistsRequest),
|
||||
Nblocks(PagestreamNblocksRequest),
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
enum PagestreamBeMessage {
|
||||
Status(PagestreamStatusResponse),
|
||||
Nblocks(PagestreamStatusResponse),
|
||||
Read(PagestreamReadResponse),
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
GetPage(PagestreamGetPageResponse),
|
||||
Error(PagestreamErrorResponse),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamRequest {
|
||||
spcnode: u32,
|
||||
dbnode: u32,
|
||||
relnode: u32,
|
||||
forknum: u8,
|
||||
blkno: u32,
|
||||
struct PagestreamExistsRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamStatusResponse {
|
||||
ok: bool,
|
||||
struct PagestreamNblocksRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamGetPageRequest {
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
rel: RelTag,
|
||||
blkno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamExistsResponse {
|
||||
exists: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamNblocksResponse {
|
||||
n_blocks: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamReadResponse {
|
||||
ok: bool,
|
||||
n_blocks: u32,
|
||||
struct PagestreamGetPageResponse {
|
||||
page: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamErrorResponse {
|
||||
message: String,
|
||||
}
|
||||
|
||||
impl PagestreamFeMessage {
|
||||
fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
|
||||
// TODO these gets can fail
|
||||
|
||||
let smgr_tag = body.get_u8();
|
||||
let zreq = PagestreamRequest {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
blkno: body.get_u32(),
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
};
|
||||
|
||||
// these correspond to the ZenithMessageTag enum in pagestore_client.h
|
||||
//
|
||||
// TODO: consider using protobuf or serde bincode for less error prone
|
||||
// serialization.
|
||||
match smgr_tag {
|
||||
0 => Ok(PagestreamFeMessage::Exists(zreq)),
|
||||
1 => Ok(PagestreamFeMessage::Nblocks(zreq)),
|
||||
2 => Ok(PagestreamFeMessage::Read(zreq)),
|
||||
_ => Err(anyhow!(
|
||||
"unknown smgr message tag: {},'{:?}'",
|
||||
smgr_tag,
|
||||
body
|
||||
)),
|
||||
let msg_tag = body.get_u8();
|
||||
match msg_tag {
|
||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
},
|
||||
})),
|
||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
},
|
||||
})),
|
||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
latest: body.get_u8() != 0,
|
||||
lsn: Lsn::from(body.get_u64()),
|
||||
rel: RelTag {
|
||||
spcnode: body.get_u32(),
|
||||
dbnode: body.get_u32(),
|
||||
relnode: body.get_u32(),
|
||||
forknum: body.get_u8(),
|
||||
},
|
||||
blkno: body.get_u32(),
|
||||
})),
|
||||
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -111,24 +148,26 @@ impl PagestreamBeMessage {
|
||||
let mut bytes = BytesMut::new();
|
||||
|
||||
match self {
|
||||
Self::Status(resp) => {
|
||||
Self::Exists(resp) => {
|
||||
bytes.put_u8(100); /* tag from pagestore_client.h */
|
||||
bytes.put_u8(resp.ok as u8);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
bytes.put_u8(resp.exists as u8);
|
||||
}
|
||||
|
||||
Self::Nblocks(resp) => {
|
||||
bytes.put_u8(101); /* tag from pagestore_client.h */
|
||||
bytes.put_u8(resp.ok as u8);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
}
|
||||
|
||||
Self::Read(resp) => {
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(102); /* tag from pagestore_client.h */
|
||||
bytes.put_u8(resp.ok as u8);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
bytes.put(&resp.page[..]);
|
||||
}
|
||||
|
||||
Self::Error(resp) => {
|
||||
bytes.put_u8(103); /* tag from pagestore_client.h */
|
||||
bytes.put(resp.message.as_bytes());
|
||||
bytes.put_u8(0); // null terminator
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -254,13 +293,20 @@ impl PageServerHandler {
|
||||
PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME
|
||||
.with_label_values(&["get_rel_size"])
|
||||
.observe_closure_duration(|| self.handle_get_nblocks_request(&*timeline, &req)),
|
||||
PagestreamFeMessage::Read(req) => SMGR_QUERY_TIME
|
||||
PagestreamFeMessage::GetPage(req) => SMGR_QUERY_TIME
|
||||
.with_label_values(&["get_page_at_lsn"])
|
||||
.observe_closure_duration(|| {
|
||||
self.handle_get_page_at_lsn_request(&*timeline, &req)
|
||||
}),
|
||||
};
|
||||
|
||||
let response = response.unwrap_or_else(|e| {
|
||||
error!("error reading relation or page version: {}", e);
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
});
|
||||
|
||||
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
|
||||
}
|
||||
|
||||
@@ -283,102 +329,50 @@ impl PageServerHandler {
|
||||
fn handle_get_rel_exists_request(
|
||||
&self,
|
||||
timeline: &dyn Timeline,
|
||||
req: &PagestreamRequest,
|
||||
) -> PagestreamBeMessage {
|
||||
let rel = RelTag {
|
||||
spcnode: req.spcnode,
|
||||
dbnode: req.dbnode,
|
||||
relnode: req.relnode,
|
||||
forknum: req.forknum,
|
||||
};
|
||||
let tag = RelishTag::Relation(rel);
|
||||
req: &PagestreamExistsRequest,
|
||||
) -> Result<PagestreamBeMessage> {
|
||||
let tag = RelishTag::Relation(req.rel);
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?;
|
||||
|
||||
let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) {
|
||||
Ok(lsn) => timeline.get_rel_exists(tag, lsn),
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
let exists = timeline.get_rel_exists(tag, lsn)?;
|
||||
|
||||
PagestreamBeMessage::Status(match result {
|
||||
Ok(exist) => PagestreamStatusResponse {
|
||||
ok: exist,
|
||||
n_blocks: 0,
|
||||
},
|
||||
// On error, return false
|
||||
Err(_) => PagestreamStatusResponse {
|
||||
ok: false,
|
||||
n_blocks: 0,
|
||||
},
|
||||
})
|
||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||
exists,
|
||||
}))
|
||||
}
|
||||
|
||||
fn handle_get_nblocks_request(
|
||||
&self,
|
||||
timeline: &dyn Timeline,
|
||||
req: &PagestreamRequest,
|
||||
) -> PagestreamBeMessage {
|
||||
let rel = RelTag {
|
||||
spcnode: req.spcnode,
|
||||
dbnode: req.dbnode,
|
||||
relnode: req.relnode,
|
||||
forknum: req.forknum,
|
||||
};
|
||||
let tag = RelishTag::Relation(rel);
|
||||
req: &PagestreamNblocksRequest,
|
||||
) -> Result<PagestreamBeMessage> {
|
||||
let tag = RelishTag::Relation(req.rel);
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?;
|
||||
|
||||
let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) {
|
||||
Ok(lsn) => timeline.get_relish_size(tag, lsn),
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
let n_blocks = timeline.get_relish_size(tag, lsn)?;
|
||||
|
||||
PagestreamBeMessage::Nblocks(match result {
|
||||
Ok(Some(n_blocks)) => PagestreamStatusResponse { ok: true, n_blocks },
|
||||
// Return 0 if relation is not found.
|
||||
// This is what postgres smgr expects.
|
||||
Ok(None) => PagestreamStatusResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
},
|
||||
// On error, also return 0
|
||||
Err(_) => PagestreamStatusResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
},
|
||||
})
|
||||
// Return 0 if relation is not found.
|
||||
// This is what postgres smgr expects.
|
||||
let n_blocks = n_blocks.unwrap_or(0);
|
||||
|
||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||
n_blocks,
|
||||
}))
|
||||
}
|
||||
|
||||
fn handle_get_page_at_lsn_request(
|
||||
&self,
|
||||
timeline: &dyn Timeline,
|
||||
req: &PagestreamRequest,
|
||||
) -> PagestreamBeMessage {
|
||||
let rel = RelTag {
|
||||
spcnode: req.spcnode,
|
||||
dbnode: req.dbnode,
|
||||
relnode: req.relnode,
|
||||
forknum: req.forknum,
|
||||
};
|
||||
let tag = RelishTag::Relation(rel);
|
||||
req: &PagestreamGetPageRequest,
|
||||
) -> Result<PagestreamBeMessage> {
|
||||
let tag = RelishTag::Relation(req.rel);
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?;
|
||||
|
||||
let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) {
|
||||
Ok(lsn) => timeline.get_page_at_lsn(tag, req.blkno, lsn),
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;
|
||||
|
||||
PagestreamBeMessage::Read(match result {
|
||||
Ok(page) => PagestreamReadResponse {
|
||||
ok: true,
|
||||
n_blocks: 0,
|
||||
page,
|
||||
},
|
||||
Err(e) => {
|
||||
const ZERO_PAGE: [u8; 8192] = [0; 8192];
|
||||
error!("get_page_at_lsn: {}", e);
|
||||
PagestreamReadResponse {
|
||||
ok: false,
|
||||
n_blocks: 0,
|
||||
page: Bytes::from_static(&ZERO_PAGE),
|
||||
}
|
||||
}
|
||||
})
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page,
|
||||
}))
|
||||
}
|
||||
|
||||
fn handle_basebackup_request(
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: a2e929e090...4787dcadfe
Reference in New Issue
Block a user