diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 701d3ed305..206a8c12ef 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + io::Read, num::{NonZeroU64, NonZeroUsize}, time::SystemTime, }; @@ -770,31 +771,44 @@ impl PagestreamBeMessage { pub fn deserialize(buf: Bytes) -> anyhow::Result { let mut buf = buf.reader(); let msg_tag = buf.read_u8()?; - match msg_tag { - 100 => todo!(), - 101 => todo!(), + let ok = match msg_tag { + 100 => { + let exists = buf.read_u8()?; + Self::Exists(PagestreamExistsResponse { + exists: exists != 0, + }) + } + 101 => { + let n_blocks = buf.read_u32::()?; + Self::Nblocks(PagestreamNblocksResponse { n_blocks }) + } 102 => { - let buf = buf.get_ref(); - /* TODO use constant */ - if buf.len() == 8192 { - Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { - page: buf.clone(), - })) - } else { - anyhow::bail!("invalid page size: {}", buf.len()); - } + let mut page = vec![0; 8192]; // TODO: use MaybeUninit + buf.read_exact(&mut page)?; + PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() }) } 103 => { let buf = buf.get_ref(); let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?; let rust_str = cstr.to_str()?; - Ok(PagestreamBeMessage::Error(PagestreamErrorResponse { + PagestreamBeMessage::Error(PagestreamErrorResponse { message: rust_str.to_owned(), - })) + }) + } + 104 => { + let db_size = buf.read_i64::()?; + Self::DbSize(PagestreamDbSizeResponse { db_size }) } - 104 => todo!(), _ => bail!("unknown tag: {:?}", msg_tag), + }; + let remaining = buf.into_inner(); + if !remaining.is_empty() { + anyhow::bail!( + "remaining bytes in msg with tag={msg_tag}: {}", + remaining.len() + ); } + Ok(ok) } }