From d7eeb73f6fa8ae96c8311737a7b5c04b4b9c1e82 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Wed, 2 Nov 2022 23:44:07 -0400 Subject: [PATCH] Impl serialize for pagestream FeMessage (#2741) --- Cargo.lock | 1 + libs/pageserver_api/Cargo.toml | 1 + libs/pageserver_api/src/models.rs | 99 +++++++++++++++++++++++-------- pageserver/src/page_service.rs | 3 +- 4 files changed, 77 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01b8abda9a..98daddbd96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2174,6 +2174,7 @@ name = "pageserver_api" version = "0.1.0" dependencies = [ "anyhow", + "byteorder", "bytes", "const_format", "postgres_ffi", diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 9121cd4989..2102ae5373 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -9,6 +9,7 @@ serde_with = "2.0" const_format = "0.2.21" anyhow = { version = "1.0", features = ["backtrace"] } bytes = "1.0.1" +byteorder = "1.4.3" utils = { path = "../utils" } postgres_ffi = { path = "../postgres_ffi" } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 4360f76fd1..3453f9672a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1,5 +1,6 @@ use std::num::NonZeroU64; +use byteorder::{BigEndian, ReadBytesExt}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use utils::{ @@ -9,7 +10,7 @@ use utils::{ use crate::reltag::RelTag; use anyhow::bail; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; /// A state of a tenant in pageserver's memory. #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -296,52 +297,98 @@ pub struct PagestreamDbSizeResponse { } impl PagestreamFeMessage { - pub fn parse(mut body: Bytes) -> anyhow::Result { + pub fn serialize(&self) -> Bytes { + let mut bytes = BytesMut::new(); + + match self { + Self::Exists(req) => { + bytes.put_u8(0); + bytes.put_u8(if req.latest { 1 } else { 0 }); + bytes.put_u64(req.lsn.0); + bytes.put_u32(req.rel.spcnode); + bytes.put_u32(req.rel.dbnode); + bytes.put_u32(req.rel.relnode); + bytes.put_u8(req.rel.forknum); + } + + Self::Nblocks(req) => { + bytes.put_u8(1); + bytes.put_u8(if req.latest { 1 } else { 0 }); + bytes.put_u64(req.lsn.0); + bytes.put_u32(req.rel.spcnode); + bytes.put_u32(req.rel.dbnode); + bytes.put_u32(req.rel.relnode); + bytes.put_u8(req.rel.forknum); + } + + Self::GetPage(req) => { + bytes.put_u8(2); + bytes.put_u8(if req.latest { 1 } else { 0 }); + bytes.put_u64(req.lsn.0); + bytes.put_u32(req.rel.spcnode); + bytes.put_u32(req.rel.dbnode); + bytes.put_u32(req.rel.relnode); + bytes.put_u8(req.rel.forknum); + bytes.put_u32(req.blkno); + } + + Self::DbSize(req) => { + bytes.put_u8(3); + bytes.put_u8(if req.latest { 1 } else { 0 }); + bytes.put_u64(req.lsn.0); + bytes.put_u32(req.dbnode); + } + } + + bytes.into() + } + + pub fn parse(body: &mut R) -> anyhow::Result { // TODO these gets can fail // these correspond to the NeonMessageTag enum in pagestore_client.h // // TODO: consider using protobuf or serde bincode for less error prone // serialization. - let msg_tag = body.get_u8(); + let msg_tag = body.read_u8()?; match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::()?), rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::()?), rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, }, })), 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::()?), rel: RelTag { - spcnode: body.get_u32(), - dbnode: body.get_u32(), - relnode: body.get_u32(), - forknum: body.get_u8(), + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, }, - blkno: body.get_u32(), + blkno: body.read_u32::()?, })), 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - latest: body.get_u8() != 0, - lsn: Lsn::from(body.get_u64()), - dbnode: body.get_u32(), + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::()?), + dbnode: body.read_u32::()?, })), - _ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body), + _ => bail!("unknown smgr message tag: {:?}", msg_tag), } } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index aec91bc7f1..f83ab1929a 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,6 +10,7 @@ // use anyhow::{bail, ensure, Context, Result}; +use bytes::Buf; use bytes::Bytes; use futures::{Stream, StreamExt}; use pageserver_api::models::{ @@ -299,7 +300,7 @@ impl PageServerHandler { trace!("query: {copy_data_bytes:?}"); - let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; let response = match neon_fe_msg { PagestreamFeMessage::Exists(req) => {