diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f38a4b9eed..1947e4b059 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -42,66 +42,103 @@ use crate::PageServerConf; // Wrapped in libpq CopyData enum PagestreamFeMessage { - Exists(PagestreamRequest), - Nblocks(PagestreamRequest), - Read(PagestreamRequest), + Exists(PagestreamExistsRequest), + Nblocks(PagestreamNblocksRequest), + GetPage(PagestreamGetPageRequest), } // Wrapped in libpq CopyData enum PagestreamBeMessage { - Status(PagestreamStatusResponse), - Nblocks(PagestreamStatusResponse), - Read(PagestreamReadResponse), + Exists(PagestreamExistsResponse), + Nblocks(PagestreamNblocksResponse), + GetPage(PagestreamGetPageResponse), + Error(PagestreamErrorResponse), } #[derive(Debug)] -struct PagestreamRequest { - spcnode: u32, - dbnode: u32, - relnode: u32, - forknum: u8, - blkno: u32, +struct PagestreamExistsRequest { + latest: bool, lsn: Lsn, + rel: RelTag, } #[derive(Debug)] -struct PagestreamStatusResponse { - ok: bool, +struct PagestreamNblocksRequest { + latest: bool, + lsn: Lsn, + rel: RelTag, +} + +#[derive(Debug)] +struct PagestreamGetPageRequest { + latest: bool, + lsn: Lsn, + rel: RelTag, + blkno: u32, +} + +#[derive(Debug)] +struct PagestreamExistsResponse { + exists: bool, +} + +#[derive(Debug)] +struct PagestreamNblocksResponse { n_blocks: u32, } #[derive(Debug)] -struct PagestreamReadResponse { - ok: bool, - n_blocks: u32, +struct PagestreamGetPageResponse { page: Bytes, } +#[derive(Debug)] +struct PagestreamErrorResponse { + message: String, +} + impl PagestreamFeMessage { fn parse(mut body: Bytes) -> anyhow::Result { // TODO these gets can fail - let smgr_tag = body.get_u8(); - let zreq = PagestreamRequest { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), - blkno: body.get_u32(), - lsn: Lsn::from(body.get_u64()), - }; - + // these correspond to the ZenithMessageTag enum in pagestore_client.h + // // TODO: consider using protobuf or serde bincode for less error prone // serialization. - match smgr_tag { - 0 => Ok(PagestreamFeMessage::Exists(zreq)), - 1 => Ok(PagestreamFeMessage::Nblocks(zreq)), - 2 => Ok(PagestreamFeMessage::Read(zreq)), - _ => Err(anyhow!( - "unknown smgr message tag: {},'{:?}'", - smgr_tag, - body - )), + let msg_tag = body.get_u8(); + match msg_tag { + 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + })), + 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + })), + 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + latest: body.get_u8() != 0, + lsn: Lsn::from(body.get_u64()), + rel: RelTag { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + }, + blkno: body.get_u32(), + })), + _ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body), } } } @@ -111,24 +148,26 @@ impl PagestreamBeMessage { let mut bytes = BytesMut::new(); match self { - Self::Status(resp) => { + Self::Exists(resp) => { bytes.put_u8(100); /* tag from pagestore_client.h */ - bytes.put_u8(resp.ok as u8); - bytes.put_u32(resp.n_blocks); + bytes.put_u8(resp.exists as u8); } Self::Nblocks(resp) => { bytes.put_u8(101); /* tag from pagestore_client.h */ - bytes.put_u8(resp.ok as u8); bytes.put_u32(resp.n_blocks); } - Self::Read(resp) => { + Self::GetPage(resp) => { bytes.put_u8(102); /* tag from pagestore_client.h */ - bytes.put_u8(resp.ok as u8); - bytes.put_u32(resp.n_blocks); bytes.put(&resp.page[..]); } + + Self::Error(resp) => { + bytes.put_u8(103); /* tag from pagestore_client.h */ + bytes.put(resp.message.as_bytes()); + bytes.put_u8(0); // null terminator + } } bytes.into() @@ -254,13 +293,20 @@ impl PageServerHandler { PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME .with_label_values(&["get_rel_size"]) .observe_closure_duration(|| self.handle_get_nblocks_request(&*timeline, &req)), - PagestreamFeMessage::Read(req) => SMGR_QUERY_TIME + PagestreamFeMessage::GetPage(req) => SMGR_QUERY_TIME .with_label_values(&["get_page_at_lsn"]) .observe_closure_duration(|| { self.handle_get_page_at_lsn_request(&*timeline, &req) }), }; + let response = response.unwrap_or_else(|e| { + error!("error reading relation or page version: {}", e); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + }); + pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; } @@ -283,102 +329,50 @@ impl PageServerHandler { fn handle_get_rel_exists_request( &self, timeline: &dyn Timeline, - req: &PagestreamRequest, - ) -> PagestreamBeMessage { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); + req: &PagestreamExistsRequest, + ) -> Result { + let tag = RelishTag::Relation(req.rel); + let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?; - let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) { - Ok(lsn) => timeline.get_rel_exists(tag, lsn), - Err(e) => Err(e), - }; + let exists = timeline.get_rel_exists(tag, lsn)?; - PagestreamBeMessage::Status(match result { - Ok(exist) => PagestreamStatusResponse { - ok: exist, - n_blocks: 0, - }, - // On error, return false - Err(_) => PagestreamStatusResponse { - ok: false, - n_blocks: 0, - }, - }) + Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { + exists, + })) } fn handle_get_nblocks_request( &self, timeline: &dyn Timeline, - req: &PagestreamRequest, - ) -> PagestreamBeMessage { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); + req: &PagestreamNblocksRequest, + ) -> Result { + let tag = RelishTag::Relation(req.rel); + let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?; - let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) { - Ok(lsn) => timeline.get_relish_size(tag, lsn), - Err(e) => Err(e), - }; + let n_blocks = timeline.get_relish_size(tag, lsn)?; - PagestreamBeMessage::Nblocks(match result { - Ok(Some(n_blocks)) => PagestreamStatusResponse { ok: true, n_blocks }, - // Return 0 if relation is not found. - // This is what postgres smgr expects. - Ok(None) => PagestreamStatusResponse { - ok: true, - n_blocks: 0, - }, - // On error, also return 0 - Err(_) => PagestreamStatusResponse { - ok: true, - n_blocks: 0, - }, - }) + // Return 0 if relation is not found. + // This is what postgres smgr expects. + let n_blocks = n_blocks.unwrap_or(0); + + Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { + n_blocks, + })) } fn handle_get_page_at_lsn_request( &self, timeline: &dyn Timeline, - req: &PagestreamRequest, - ) -> PagestreamBeMessage { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); + req: &PagestreamGetPageRequest, + ) -> Result { + let tag = RelishTag::Relation(req.rel); + let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn)?; - let result = match Self::wait_or_get_last_lsn(timeline, req.lsn) { - Ok(lsn) => timeline.get_page_at_lsn(tag, req.blkno, lsn), - Err(e) => Err(e), - }; + let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?; - PagestreamBeMessage::Read(match result { - Ok(page) => PagestreamReadResponse { - ok: true, - n_blocks: 0, - page, - }, - Err(e) => { - const ZERO_PAGE: [u8; 8192] = [0; 8192]; - error!("get_page_at_lsn: {}", e); - PagestreamReadResponse { - ok: false, - n_blocks: 0, - page: Bytes::from_static(&ZERO_PAGE), - } - } - }) + Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { + page, + })) } fn handle_basebackup_request( diff --git a/vendor/postgres b/vendor/postgres index a2e929e090..4787dcadfe 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit a2e929e090f3a6638917b9022edbe04006169d90 +Subproject commit 4787dcadfe64a7c705be86c8370682a6fc04062c