diff --git a/pageserver/src/bin/replay.rs b/pageserver/src/bin/replay.rs index 7ccbe0911b..7d0009d757 100644 --- a/pageserver/src/bin/replay.rs +++ b/pageserver/src/bin/replay.rs @@ -1,3 +1,4 @@ +use bytes::BytesMut; use pageserver::page_service::PagestreamFeMessage; use std::{ fs::{read_dir, File}, @@ -9,7 +10,7 @@ use tokio::{io::AsyncWriteExt, net::TcpStream}; use clap::{App, Arg}; use utils::{ - pq_proto::FeMessage, + pq_proto::{BeMessage, FeMessage}, zid::{ZTenantId, ZTimelineId}, }; @@ -46,10 +47,24 @@ impl PagestreamApi { } async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> { - let msg_bytes = msg.serialize(); - self.stream.write_all(&msg_bytes).await?; + let request = { + let msg_bytes = msg.serialize(); + let mut buf = BytesMut::new(); + let copy_msg = BeMessage::CopyData(&msg_bytes); + BeMessage::write(&mut buf, ©_msg)?; + buf.freeze() + }; + println!("sending msg"); + self.stream.write_all(&request).await?; + println!("sent"); - let _response = FeMessage::read_fut(&mut self.stream).await?; + // TODO It's actually a be message, but it doesn't have a parser. + // So error response (code b'E' parses incorrectly as FeExecuteMessage) + let response = match FeMessage::read_fut(&mut self.stream).await? { + Some(FeMessage::CopyData(page)) => page, + r => panic!("Expected CopyData message, got: {:?}", r), + }; + println!("received"); Ok(()) } @@ -61,7 +76,7 @@ async fn replay_trace( ) -> anyhow::Result<()> { while let Ok(msg) = PagestreamFeMessage::parse(reader) { println!("Parsed message {:?}", msg); - // pagestream.make_request(msg).await?; + pagestream.make_request(msg).await?; } Ok(()) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 35c1567a59..3e2cc35d65 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -125,6 +125,7 @@ impl PagestreamFeMessage { match self { Self::Exists(req) => { + bytes.put_u8(0); bytes.put_u8(if req.latest { 1 } else { 0 }); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); @@ -151,6 +152,7 @@ impl PagestreamFeMessage { bytes.put_u32(req.rel.dbnode); bytes.put_u32(req.rel.relnode); bytes.put_u8(req.rel.forknum); + bytes.put_u32(req.blkno); } Self::DbSize(req) => {