From f954d5c501ea75c3bd4d4590721910c48288ceae Mon Sep 17 00:00:00 2001 From: Patrick Insinger Date: Mon, 17 May 2021 15:11:03 -0400 Subject: [PATCH] pageserver - separate pagestream messages --- pageserver/src/page_service.rs | 195 ++++++++++++++++++--------------- 1 file changed, 105 insertions(+), 90 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index aac202b97c..f259137571 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -43,13 +43,7 @@ enum FeMessage { Close(FeCloseMessage), Sync, Terminate, - - // - // All that messages are actually CopyData from libpq point of view. - // - ZenithExistsRequest(ZenithRequest), - ZenithNblocksRequest(ZenithRequest), - ZenithReadRequest(ZenithRequest), + CopyData(Bytes), } #[derive(Debug)] @@ -66,19 +60,27 @@ enum BeMessage { DataRow(Bytes), CommandComplete, ControlFile, + CopyData(Bytes), +} - // - // All that messages are actually CopyData from libpq point of view. - // - ZenithStatusResponse(ZenithStatusResponse), - ZenithNblocksResponse(ZenithStatusResponse), - ZenithReadResponse(ZenithReadResponse), +// Wrapped in libpq CopyData +enum PagestreamFeMessage { + Exists(PagestreamRequest), + Nblocks(PagestreamRequest), + Read(PagestreamRequest), +} + +// Wrapped in libpq CopyData +enum PagestreamBeMessage { + Status(PagestreamStatusResponse), + Nblocks(PagestreamStatusResponse), + Read(PagestreamReadResponse), } const HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(Bytes::from_static(b"hello world")); #[derive(Debug)] -struct ZenithRequest { +struct PagestreamRequest { spcnode: u32, dbnode: u32, relnode: u32, @@ -88,13 +90,13 @@ struct ZenithRequest { } #[derive(Debug)] -struct ZenithStatusResponse { +struct PagestreamStatusResponse { ok: bool, n_blocks: u32, } #[derive(Debug)] -struct ZenithReadResponse { +struct PagestreamReadResponse { ok: bool, n_blocks: u32, page: Bytes, @@ -339,7 +341,7 @@ impl FeMessage { let mut body_buf: Vec = vec![0; bodylen as usize]; stream.read_exact(&mut body_buf)?; - let mut body = Bytes::from(body_buf); + let body = Bytes::from(body_buf); // Parse it match tag { @@ -351,35 +353,70 @@ impl FeMessage { b'C' => Ok(Some(FeCloseMessage::parse(body)?)), b'S' => Ok(Some(FeMessage::Sync)), b'X' => Ok(Some(FeMessage::Terminate)), - b'd' => { - let smgr_tag = body.get_u8(); - let zreq = ZenithRequest { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), - blkno: body.get_u32(), - lsn: Lsn::from(body.get_u64()), - }; - - // TODO: consider using protobuf or serde bincode for less error prone - // serialization. - match smgr_tag { - 0 => Ok(Some(FeMessage::ZenithExistsRequest(zreq))), - 1 => Ok(Some(FeMessage::ZenithNblocksRequest(zreq))), - 2 => Ok(Some(FeMessage::ZenithReadRequest(zreq))), - _ => Err(anyhow!( - "unknown smgr message tag: {},'{:?}'", - smgr_tag, - body - )), - } - } + b'd' => Ok(Some(FeMessage::CopyData(body))), tag => Err(anyhow!("unknown message tag: {},'{:?}'", tag, body)), } } } +impl PagestreamFeMessage { + fn parse(mut body: Bytes) -> anyhow::Result { + // TODO these gets can fail + + let smgr_tag = body.get_u8(); + let zreq = PagestreamRequest { + spcnode: body.get_u32(), + dbnode: body.get_u32(), + relnode: body.get_u32(), + forknum: body.get_u8(), + blkno: body.get_u32(), + lsn: Lsn::from(body.get_u64()), + }; + + // TODO: consider using protobuf or serde bincode for less error prone + // serialization. + match smgr_tag { + 0 => Ok(PagestreamFeMessage::Exists(zreq)), + 1 => Ok(PagestreamFeMessage::Nblocks(zreq)), + 2 => Ok(PagestreamFeMessage::Read(zreq)), + _ => Err(anyhow!( + "unknown smgr message tag: {},'{:?}'", + smgr_tag, + body + )), + } + } +} + +impl PagestreamBeMessage { + fn serialize(&self) -> Bytes { + let mut bytes = BytesMut::new(); + + match self { + Self::Status(resp) => { + bytes.put_u8(100); /* tag from pagestore_client.h */ + bytes.put_u8(resp.ok as u8); + bytes.put_u32(resp.n_blocks); + } + + Self::Nblocks(resp) => { + bytes.put_u8(101); /* tag from pagestore_client.h */ + bytes.put_u8(resp.ok as u8); + bytes.put_u32(resp.n_blocks); + } + + Self::Read(resp) => { + bytes.put_u8(102); /* tag from pagestore_client.h */ + bytes.put_u8(resp.ok as u8); + bytes.put_u32(resp.n_blocks); + bytes.put(&resp.page[..]); + } + } + + bytes.into() + } +} + /////////////////////////////////////////////////////////////////////////////// /// @@ -534,30 +571,10 @@ impl Connection { self.stream.write_all(&b)?; } - BeMessage::ZenithStatusResponse(resp) => { + BeMessage::CopyData(data) => { self.stream.write_u8(b'd')?; - self.stream.write_u32::(4 + 1 + 1 + 4)?; - self.stream.write_u8(100)?; /* tag from pagestore_client.h */ - self.stream.write_u8(resp.ok as u8)?; - self.stream.write_u32::(resp.n_blocks)?; - } - - BeMessage::ZenithNblocksResponse(resp) => { - self.stream.write_u8(b'd')?; - self.stream.write_u32::(4 + 1 + 1 + 4)?; - self.stream.write_u8(101)?; /* tag from pagestore_client.h */ - self.stream.write_u8(resp.ok as u8)?; - self.stream.write_u32::(resp.n_blocks)?; - } - - BeMessage::ZenithReadResponse(resp) => { - self.stream.write_u8(b'd')?; - self.stream - .write_u32::(4 + 1 + 1 + 4 + resp.page.len() as u32)?; - self.stream.write_u8(102)?; /* tag from pagestore_client.h */ - self.stream.write_u8(resp.ok as u8)?; - self.stream.write_u32::(resp.n_blocks)?; - self.stream.write_all(&resp.page.clone())?; + self.stream.write_u32::(4 + data.len() as u32)?; + self.stream.write_all(&data)?; } } @@ -774,20 +791,18 @@ impl Connection { self.stream.write_i16::(0)?; /* numAttributes */ self.stream.flush()?; - loop { - let message = self.read_message()?; + while let Some(message) = self.read_message()? { + trace!("query({:?}): {:?}", timelineid, message); - if let Some(m) = &message { - trace!("query({:?}): {:?}", timelineid, m); + let copy_data_bytes = match message { + FeMessage::CopyData(bytes) => bytes, + _ => continue, }; - if message.is_none() { - // connection was closed - return Ok(()); - } + let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; - match message { - Some(FeMessage::ZenithExistsRequest(req)) => { + let response = match zenith_fe_msg { + PagestreamFeMessage::Exists(req) => { let tag = RelTag { spcnode: req.spcnode, dbnode: req.dbnode, @@ -797,12 +812,12 @@ impl Connection { let exist = timeline.get_relsize_exists(tag, req.lsn).unwrap_or(false); - self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { + PagestreamBeMessage::Status(PagestreamStatusResponse { ok: exist, n_blocks: 0, - }))? + }) } - Some(FeMessage::ZenithNblocksRequest(req)) => { + PagestreamFeMessage::Nblocks(req) => { let tag = RelTag { spcnode: req.spcnode, dbnode: req.dbnode, @@ -812,12 +827,9 @@ impl Connection { let n_blocks = timeline.get_relsize(tag, req.lsn).unwrap_or(0); - self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { - ok: true, - n_blocks, - }))? + PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) } - Some(FeMessage::ZenithReadRequest(req)) => { + PagestreamFeMessage::Read(req) => { let buf_tag = BufferTag { rel: RelTag { spcnode: req.spcnode, @@ -828,28 +840,31 @@ impl Connection { blknum: req.blkno, }; - let msg = match timeline.get_page_at_lsn(buf_tag, req.lsn) { - Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse { + let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) { + Ok(p) => PagestreamReadResponse { ok: true, n_blocks: 0, page: p, - }), + }, Err(e) => { const ZERO_PAGE: [u8; 8192] = [0; 8192]; error!("get_page_at_lsn: {}", e); - BeMessage::ZenithReadResponse(ZenithReadResponse { + PagestreamReadResponse { ok: false, n_blocks: 0, page: Bytes::from_static(&ZERO_PAGE), - }) + } } }; - self.write_message(&msg)? + PagestreamBeMessage::Read(read_response) } - _ => {} - } + }; + + self.write_message(&BeMessage::CopyData(response.serialize()))?; } + + Ok(()) } fn handle_basebackup_request(