From aa7b32d892f913439fb9b3e8fa4394d0eceaa8ae Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 16 Mar 2022 18:42:58 -0400 Subject: [PATCH] Simplify --- pageserver/src/bin/psbench.rs | 88 ++++++++++++++++++++++++----------- 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/pageserver/src/bin/psbench.rs b/pageserver/src/bin/psbench.rs index 6a5124e41b..f9161ff2cd 100644 --- a/pageserver/src/bin/psbench.rs +++ b/pageserver/src/bin/psbench.rs @@ -2,9 +2,10 @@ //! //! Usually it's easier to write python perf tests, but here the performance //! of the tester matters, and the API is easier to work with from rust. -use std::{io::{BufRead, BufReader, Cursor}, net::SocketAddr}; +use std::{collections::HashMap, io::{BufRead, BufReader, Cursor}, net::SocketAddr}; +use byteorder::ReadBytesExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use bytes::{Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use clap::{App, Arg}; use std::fs::File; use zenith_utils::{GIT_VERSION, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}}; @@ -19,21 +20,16 @@ pub fn read_lines_buffered(file_name: &str) -> impl Iterator { pub async fn get_page( pagestream: &mut tokio::net::TcpStream, - lsn: &Vec, - page: &Vec, + lsn: &Lsn, + page: &Page, ) -> anyhow::Result> { let msg = { let query = { - use bytes::buf::BufMut; let mut query = BytesMut::new(); query.put_u8(2); // Specifies get_page query query.put_u8(0); // Specifies this is not a "latest page" query - for byte in lsn { - query.put_u8(*byte); - } - for byte in page { - query.put_u8(*byte); - } + query.put_u64(lsn.0); + page.write(&mut query); query.freeze() }; @@ -52,7 +48,7 @@ pub async fn get_page( let page = { let mut cursor = Cursor::new(response); - let tag = cursor.read_u8().await?; + let tag = AsyncReadExt::read_u8(&mut cursor).await?; if tag != 102 { panic!("AA"); } @@ -70,6 +66,41 @@ pub async fn get_page( Ok(page) } +#[derive(Copy, Clone)] +pub struct Lsn(pub u64); + +#[derive(Copy, Clone)] +pub struct Page { + spcnode: u32, + dbnode: u32, + relnode: u32, + forknum: u8, + blkno: u32, +} + +impl Page { + async fn read(buf: &mut Reader) -> Result + where + Reader: tokio::io::AsyncRead + Unpin, + { + let spcnode = buf.read_u32().await?; + let dbnode = buf.read_u32().await?; + let relnode = buf.read_u32().await?; + let forknum = buf.read_u8().await?; + let blkno = buf.read_u32().await?; + Ok(Page { spcnode, dbnode, relnode, forknum, blkno }) + } + + async fn write(&self, buf: &mut BytesMut) -> Result<()> { + buf.put_u32(self.spcnode); + buf.put_u32(self.dbnode); + buf.put_u32(self.relnode); + buf.put_u8(self.forknum); + buf.put_u32(self.blkno); + Ok(()) + } +} + #[tokio::main] async fn main() -> Result<()> { @@ -109,21 +140,19 @@ async fn main() -> Result<()> { let tenant_hex = arg_matches.value_of("tenant_hex").unwrap(); let timeline = arg_matches.value_of("timeline").unwrap(); - let lsn_page_pairs: Vec<_> = read_lines_buffered(log_file) - .filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string())) - .map(|rest| { - let (lsn, page) = rest.split_once(" ").unwrap(); - let lsn = hex::decode(lsn).unwrap(); - if lsn.len() != 8 { - panic!("AAA") - } - let page = hex::decode(page).unwrap(); - if page.len() != 17 { - panic!("AAA") - } - (lsn, page) - }) - .collect(); + let relevant = read_lines_buffered(log_file) .filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string())); + let mut lsn_page_pairs = Vec::<(Lsn, Page)>::new(); + for line in relevant { + let (lsn, page) = line.split_once(" ").unwrap(); + + let lsn = hex::decode(lsn)?; + let lsn = Lsn(AsyncReadExt::read_u64(&mut Cursor::new(lsn)).await?); + + let page = hex::decode(page)?; + let page = Page::read(&mut Cursor::new(page)).await?; + + lsn_page_pairs.push((lsn, page)) + } // Get raw TCP connection to the pageserver postgres protocol port let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?; @@ -145,5 +174,10 @@ async fn main() -> Result<()> { let (some_lsn, some_page) = lsn_page_pairs[0].clone(); let _page = get_page(&mut socket, &some_lsn, &some_page).await?; + // TODO + // 1. print writes per page + // 2. Generate high writes per page + // 3. Test runtime + Ok(()) }