diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 30a3b267ff..9789d7dfa7 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -35,6 +35,7 @@ use zenith_utils::zid::{ZTenantId, ZTimelineId}; use crate::basebackup; use crate::branches; use crate::relish::*; +use crate::repository::Timeline; use crate::tenant_mgr; use crate::walreceiver; use crate::PageServerConf; @@ -248,80 +249,19 @@ impl PageServerHandler { let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; let response = match zenith_fe_msg { - PagestreamFeMessage::Exists(req) => { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); - - let exist = SMGR_QUERY_TIME - .with_label_values(&["get_rel_exists"]) - .observe_closure_duration(|| { - timeline.get_rel_exists(tag, req.lsn).unwrap_or(false) - }); - - PagestreamBeMessage::Status(PagestreamStatusResponse { - ok: exist, - n_blocks: 0, - }) - } - PagestreamFeMessage::Nblocks(req) => { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); - - let n_blocks = SMGR_QUERY_TIME - .with_label_values(&["get_rel_size"]) - .observe_closure_duration(|| { - // Return 0 if relation is not found. - // This is what postgres smgr expects. - timeline - .get_relish_size(tag, req.lsn) - .unwrap_or(Some(0)) - .unwrap_or(0) - }); - - PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) - } - PagestreamFeMessage::Read(req) => { - let rel = RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }; - let tag = RelishTag::Relation(rel); - - let read_response = SMGR_QUERY_TIME - .with_label_values(&["get_page_at_lsn"]) - .observe_closure_duration(|| { - match timeline.get_page_at_lsn(tag, req.blkno, req.lsn) { - Ok(p) => PagestreamReadResponse { - ok: true, - n_blocks: 0, - page: p, - }, - Err(e) => { - const ZERO_PAGE: [u8; 8192] = [0; 8192]; - error!("get_page_at_lsn: {}", e); - PagestreamReadResponse { - ok: false, - n_blocks: 0, - page: Bytes::from_static(&ZERO_PAGE), - } - } - } - }); - - PagestreamBeMessage::Read(read_response) - } + PagestreamFeMessage::Exists(req) => SMGR_QUERY_TIME + .with_label_values(&["get_rel_exists"]) + .observe_closure_duration(|| { + self.handle_get_rel_exists_request(&*timeline, &req) + }), + PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME + .with_label_values(&["get_rel_size"]) + .observe_closure_duration(|| self.handle_get_nblocks_request(&*timeline, &req)), + PagestreamFeMessage::Read(req) => SMGR_QUERY_TIME + .with_label_values(&["get_page_at_lsn"]) + .observe_closure_duration(|| { + self.handle_get_page_at_lsn_request(&*timeline, &req) + }), }; pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; @@ -330,6 +270,98 @@ impl PageServerHandler { Ok(()) } + fn handle_get_rel_exists_request( + &self, + timeline: &dyn Timeline, + req: &PagestreamRequest, + ) -> PagestreamBeMessage { + let rel = RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }; + let tag = RelishTag::Relation(rel); + + let result = timeline.get_rel_exists(tag, req.lsn); + + PagestreamBeMessage::Status(match result { + Ok(exist) => PagestreamStatusResponse { + ok: exist, + n_blocks: 0, + }, + // On error, return false + Err(_) => PagestreamStatusResponse { + ok: false, + n_blocks: 0, + }, + }) + } + + fn handle_get_nblocks_request( + &self, + timeline: &dyn Timeline, + req: &PagestreamRequest, + ) -> PagestreamBeMessage { + let rel = RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }; + let tag = RelishTag::Relation(rel); + + let result = timeline.get_relish_size(tag, req.lsn); + + PagestreamBeMessage::Nblocks(match result { + Ok(Some(n_blocks)) => PagestreamStatusResponse { ok: true, n_blocks }, + // Return 0 if relation is not found. + // This is what postgres smgr expects. + Ok(None) => PagestreamStatusResponse { + ok: true, + n_blocks: 0, + }, + // On error, also return 0 + Err(_) => PagestreamStatusResponse { + ok: true, + n_blocks: 0, + }, + }) + } + + fn handle_get_page_at_lsn_request( + &self, + timeline: &dyn Timeline, + req: &PagestreamRequest, + ) -> PagestreamBeMessage { + let rel = RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }; + let tag = RelishTag::Relation(rel); + + let result = timeline.get_page_at_lsn(tag, req.blkno, req.lsn); + + PagestreamBeMessage::Read(match result { + Ok(page) => PagestreamReadResponse { + ok: true, + n_blocks: 0, + page, + }, + Err(e) => { + const ZERO_PAGE: [u8; 8192] = [0; 8192]; + error!("get_page_at_lsn: {}", e); + PagestreamReadResponse { + ok: false, + n_blocks: 0, + page: Bytes::from_static(&ZERO_PAGE), + } + } + }) + } + fn handle_basebackup_request( &self, pgb: &mut PostgresBackend,