From ad5f16f72489897a5a64ac681428b41465b46024 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 17 Sep 2021 16:38:14 +0300 Subject: [PATCH] Improve the protocol between Postgres and page server. - Use different message formats for different kinds of response messages. - Add an Error message, for passing errors from page server to Postgres. Previously, we would respond to 'exists' request with 'false', and to 'nblocks' request with 0, if an error happened. Fix those to return an error message to the client. GetPage requests had a mechanism to return an error, but it was just a flag with no error message. - Add a flag to requests, to indicate that we actually want the latest page version on the timeline, and the LSN is just a hint that we know that there haven't been any modifications since that LSN. The flag isn't used for anything yet, but I'm planning to use it to fix https://github.com/zenithdb/zenith/issues/567 --- pageserver/src/page_service.rs | 242 ++++++++++++++++----------------- vendor/postgres | 2 +- 2 files changed, 119 insertions(+), 125 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f38a4b9eed..1947e4b059 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -42,66 +42,103 @@ use crate::PageServerConf; // Wrapped in libpq CopyData enum PagestreamFeMessage { - Exists(PagestreamRequest), - Nblocks(PagestreamRequest), - Read(PagestreamRequest), + Exists(PagestreamExistsRequest), + Nblocks(PagestreamNblocksRequest), + GetPage(PagestreamGetPageRequest), } // Wrapped in libpq CopyData enum PagestreamBeMessage { - Status(PagestreamStatusResponse), - Nblocks(PagestreamStatusResponse), - Read(PagestreamReadResponse), + Exists(PagestreamExistsResponse), + Nblocks(PagestreamNblocksResponse), + GetPage(PagestreamGetPageResponse), + Error(PagestreamErrorResponse), } #[derive(Debug)] -struct PagestreamRequest { - spcnode: u32, - dbnode: u32, - relnode: u32, - forknum: u8, - blkno: u32, +struct PagestreamExistsRequest { + latest: bool, lsn: Lsn, + rel: RelTag, } #[derive(Debug)] -struct PagestreamStatusResponse { - ok: bool, +struct PagestreamNblocksRequest { + latest: bool, + lsn: Lsn, + rel: RelTag, +} + +#[derive(Debug)] +struct PagestreamGetPageRequest { + latest: bool, + lsn: Lsn, + rel: RelTag, + blkno: u32, +} + +#[derive(Debug)] +struct PagestreamExistsResponse { + exists: bool, +} + +#[derive(Debug)] +struct PagestreamNblocksResponse { n_blocks: u32, } #[derive(Debug)] -struct PagestreamReadResponse { - ok: bool, - n_blocks: u32, +struct PagestreamGetPageResponse { page: Bytes, } +#[derive(Debug)] +struct PagestreamErrorResponse { + message: String, +} + impl PagestreamFeMessage { fn parse(mut body: Bytes) -> anyhow::Result { // TODO these gets can fail - let smgr_tag = body.get_u8(); - let zreq = PagestreamRequest { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), - blkno: body.get_u32(), - lsn: Lsn::from(body.get_u64()), - }; - + // these correspond to the ZenithMessageTag enum in pagestore_client.h + // // TODO: consider using protobuf or serde bincode for less error prone // serialization. - match smgr_tag { - 0 => Ok(PagestreamFeMessage::Exists(zreq)), - 1 => Ok(PagestreamFeMessage::Nblocks(zreq)), - 2 => Ok(PagestreamFeMessage::Read(zreq)), - _ => Err(anyhow!( - "unknown smgr message tag: {},'{:?}'", - smgr_tag, - body - )), + let msg_tag = body.get_u8(); + match msg_tag { + 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + })), + 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + })), + 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + blkno: body.get_u32(), + })), + _ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body), } } } @@ -111,24 +148,26 @@ impl PagestreamBeMessage { let mut bytes = BytesMut::new(); match self { - Self::Status(resp) => { + Self::Exists(resp) => { bytes.put_u8(100); /* tag from pagestore_client.h */ - bytes.put_u8(resp.ok as u8); - bytes.put_u32(resp.n_blocks); + bytes.put_u8(resp.exists as u8); } Self::Nblocks(resp) => { bytes.put_u8(101); /* tag from pagestore_client.h */ - bytes.put_u8(resp.ok as u8); bytes.put_u32(resp.n_blocks); } - Self::Read(resp) => { + Self::GetPage(resp) => { bytes.put_u8(102); /* tag from pagestore_client.h */ - bytes.put_u8(resp.ok as u8); - bytes.put_u32(resp.n_blocks); bytes.put(&resp.page[..]); } + + Self::Error(resp) => { + bytes.put_u8(103); /* tag from pagestore_client.h */ + bytes.put(resp.message.as_bytes()); + bytes.put_u8(0); // null terminator + } } bytes.into() @@ -254,13 +293,20 @@ impl PageServerHandler { PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME .with_label_values(&["get_rel_size"]) .observe_closure_duration(|| self.handle_get_nblocks_request(&*timeline, &req)), - PagestreamFeMessage::Read(req) => SMGR_QUERY_TIME + PagestreamFeMessage::GetPage(req) => SMGR_QUERY_TIME .with_label_values(&["get_page_at_lsn"]) .observe_closure_duration(|| { self.handle_get_page_at_lsn_request(&*timeline, &req) }), }; + let response = response.unwrap_or_else(|e| { + error!("error reading relation or page version: {}", e); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + }); + pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; } @@ -283,102 +329,50 @@ impl PageServerHandler { fn handle_get_rel_exists_request( &self, timeline: &dyn Timeline, - req: &PagestreamRequest, - ) -> PagestreamBeMessage { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); + req: &PagestreamExistsRequest, + ) -> Result { + let tag = RelishTag::Relation(req.rel); + let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?; - let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) { - Ok(lsn) => timeline.get_rel_exists(tag, lsn), - Err(e) => Err(e), - }; + let exists = timeline.get_rel_exists(tag, lsn)?; - PagestreamBeMessage::Status(match result { - Ok(exist) => PagestreamStatusResponse { - ok: exist, - n_blocks: 0, - }, - // On error, return false - Err(_) => PagestreamStatusResponse { - ok: false, - n_blocks: 0, - }, - }) + Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { + exists, + })) } fn handle_get_nblocks_request( &self, timeline: &dyn Timeline, - req: &PagestreamRequest, - ) -> PagestreamBeMessage { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); + req: &PagestreamNblocksRequest, + ) -> Result { + let tag = RelishTag::Relation(req.rel); + let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?; - let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) { - Ok(lsn) => timeline.get_relish_size(tag, lsn), - Err(e) => Err(e), - }; + let n_blocks = timeline.get_relish_size(tag, lsn)?; - PagestreamBeMessage::Nblocks(match result { - Ok(Some(n_blocks)) => PagestreamStatusResponse { ok: true, n_blocks }, - // Return 0 if relation is not found. - // This is what postgres smgr expects. - Ok(None) => PagestreamStatusResponse { - ok: true, - n_blocks: 0, - }, - // On error, also return 0 - Err(_) => PagestreamStatusResponse { - ok: true, - n_blocks: 0, - }, - }) + // Return 0 if relation is not found. + // This is what postgres smgr expects. + let n_blocks = n_blocks.unwrap_or(0); + + Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { + n_blocks, + })) } fn handle_get_page_at_lsn_request( &self, timeline: &dyn Timeline, - req: &PagestreamRequest, - ) -> PagestreamBeMessage { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); + req: &PagestreamGetPageRequest, + ) -> Result { + let tag = RelishTag::Relation(req.rel); + let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?; - let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) { - Ok(lsn) => timeline.get_page_at_lsn(tag, req.blkno, lsn), - Err(e) => Err(e), - }; + let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?; - PagestreamBeMessage::Read(match result { - Ok(page) => PagestreamReadResponse { - ok: true, - n_blocks: 0, - page, - }, - Err(e) => { - const ZERO_PAGE: [u8; 8192] = [0; 8192]; - error!("get_page_at_lsn: {}", e); - PagestreamReadResponse { - ok: false, - n_blocks: 0, - page: Bytes::from_static(&ZERO_PAGE), - } - } - }) + Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { + page, + })) } fn handle_basebackup_request( diff --git a/vendor/postgres b/vendor/postgres index a2e929e090..4787dcadfe 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit a2e929e090f3a6638917b9022edbe04006169d90 +Subproject commit 4787dcadfe64a7c705be86c8370682a6fc04062c