mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Fix bugs
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
use bytes::BytesMut;
|
||||
use pageserver::page_service::PagestreamFeMessage;
|
||||
use std::{
|
||||
fs::{read_dir, File},
|
||||
@@ -9,7 +10,7 @@ use tokio::{io::AsyncWriteExt, net::TcpStream};
|
||||
|
||||
use clap::{App, Arg};
|
||||
use utils::{
|
||||
pq_proto::FeMessage,
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
};
|
||||
|
||||
@@ -46,10 +47,24 @@ impl PagestreamApi {
|
||||
}
|
||||
|
||||
async fn make_request(&mut self, msg: PagestreamFeMessage) -> anyhow::Result<()> {
|
||||
let msg_bytes = msg.serialize();
|
||||
self.stream.write_all(&msg_bytes).await?;
|
||||
let request = {
|
||||
let msg_bytes = msg.serialize();
|
||||
let mut buf = BytesMut::new();
|
||||
let copy_msg = BeMessage::CopyData(&msg_bytes);
|
||||
BeMessage::write(&mut buf, ©_msg)?;
|
||||
buf.freeze()
|
||||
};
|
||||
println!("sending msg");
|
||||
self.stream.write_all(&request).await?;
|
||||
println!("sent");
|
||||
|
||||
let _response = FeMessage::read_fut(&mut self.stream).await?;
|
||||
// TODO It's actually a be message, but it doesn't have a parser.
|
||||
// So error response (code b'E' parses incorrectly as FeExecuteMessage)
|
||||
let response = match FeMessage::read_fut(&mut self.stream).await? {
|
||||
Some(FeMessage::CopyData(page)) => page,
|
||||
r => panic!("Expected CopyData message, got: {:?}", r),
|
||||
};
|
||||
println!("received");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -61,7 +76,7 @@ async fn replay_trace<R: std::io::Read>(
|
||||
) -> anyhow::Result<()> {
|
||||
while let Ok(msg) = PagestreamFeMessage::parse(reader) {
|
||||
println!("Parsed message {:?}", msg);
|
||||
// pagestream.make_request(msg).await?;
|
||||
pagestream.make_request(msg).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -125,6 +125,7 @@ impl PagestreamFeMessage {
|
||||
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(0);
|
||||
bytes.put_u8(if req.latest { 1 } else { 0 });
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
@@ -151,6 +152,7 @@ impl PagestreamFeMessage {
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
}
|
||||
|
||||
Self::DbSize(req) => {
|
||||
|
||||
Reference in New Issue
Block a user