From d7ed9d8e0130f0bf486d4085da4281af67fb9400 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 16 Mar 2022 17:40:19 -0400 Subject: [PATCH] cleanup --- pageserver/src/bin/psbench.rs | 114 ++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/pageserver/src/bin/psbench.rs b/pageserver/src/bin/psbench.rs index 4bc80caccc..6a5124e41b 100644 --- a/pageserver/src/bin/psbench.rs +++ b/pageserver/src/bin/psbench.rs @@ -3,6 +3,7 @@ //! Usually it's easier to write python perf tests, but here the performance //! of the tester matters, and the API is easier to work with from rust. use std::{io::{BufRead, BufReader, Cursor}, net::SocketAddr}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use bytes::{Bytes, BytesMut}; use clap::{App, Arg}; use std::fs::File; @@ -16,6 +17,59 @@ pub fn read_lines_buffered(file_name: &str) -> impl Iterator { .map(|result| result.unwrap()) } +pub async fn get_page( + pagestream: &mut tokio::net::TcpStream, + lsn: &Vec, + page: &Vec, +) -> anyhow::Result> { + let msg = { + let query = { + use bytes::buf::BufMut; + let mut query = BytesMut::new(); + query.put_u8(2); // Specifies get_page query + query.put_u8(0); // Specifies this is not a "latest page" query + for byte in lsn { + query.put_u8(*byte); + } + for byte in page { + query.put_u8(*byte); + } + query.freeze() + }; + + let mut buf = BytesMut::new(); + let copy_msg = BeMessage::CopyData(&query); + BeMessage::write(&mut buf, ©_msg)?; + buf.freeze() + }; + + pagestream.write(&msg).await?; + + let response = match FeMessage::read_fut(pagestream).await? { + Some(FeMessage::CopyData(page)) => page, + _ => panic!("AAAAA"), + }; + + let page = { + let mut cursor = Cursor::new(response); + let tag = cursor.read_u8().await?; + if tag != 102 { + panic!("AA"); + } + + let mut page = Vec::::new(); + cursor.read_to_end(&mut page).await?; + dbg!(page.len()); + if page.len() != 8 * 1024 { + panic!("AA"); + } + + page + }; + + Ok(page) +} + #[tokio::main] async fn main() -> Result<()> { @@ -71,10 +125,8 @@ async fn main() -> Result<()> { }) .collect(); - let (some_lsn, some_page) = lsn_page_pairs[0].clone(); - + // Get raw TCP connection to the pageserver postgres protocol port let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?; - println!("AYY got socket"); let (client, conn) = tokio_postgres::Config::new() .host("127.0.0.1") .port(15000) @@ -83,61 +135,15 @@ async fn main() -> Result<()> { .connect_raw(&mut socket, tokio_postgres::NoTls) .await?; - let query = format!("pagestream {} {}", tenant_hex, timeline); + // Enter pagestream protocol + let init_query = format!("pagestream {} {}", tenant_hex, timeline); tokio::select! { _ = conn => panic!("AAAA"), - _ = client.query(query.as_str(), &[]) => (), + _ = client.query(init_query.as_str(), &[]) => (), }; - println!("AYYYYYYYYYYYY"); - - let msg = { - let query = { - use bytes::buf::BufMut; - let mut query = BytesMut::new(); - query.put_u8(2); // Specifies get_page query - query.put_u8(0); // Specifies this is not a "latest page" query - for byte in some_lsn { - query.put_u8(byte); - } - for byte in some_page { - query.put_u8(byte); - } - query.freeze() - }; - - let mut buf = BytesMut::new(); - let copy_msg = BeMessage::CopyData(&query); - BeMessage::write(&mut buf, ©_msg)?; - buf.freeze() - }; - - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - socket.write(&msg).await?; - - let response = match FeMessage::read_fut(&mut socket).await? { - Some(FeMessage::CopyData(page)) => page, - _ => panic!("AAAAA"), - }; - - let page = { - let mut cursor = Cursor::new(response); - let tag = cursor.read_u8().await?; - if tag != 102 { - panic!("AA"); - } - - let mut page = Vec::::new(); - cursor.read_to_end(&mut page).await?; - dbg!(page.len()); - if page.len() != 8 * 1024 { - panic!("AA"); - } - - page - }; - - print!("yay done"); + let (some_lsn, some_page) = lsn_page_pairs[0].clone(); + let _page = get_page(&mut socket, &some_lsn, &some_page).await?; Ok(()) }