This commit is contained in:
Bojan Serafimov
2022-03-16 18:42:58 -04:00
parent d7ed9d8e01
commit aa7b32d892

View File

@@ -2,9 +2,10 @@
//!
//! Usually it's easier to write python perf tests, but here the performance
//! of the tester matters, and the API is easier to work with from rust.
use std::{io::{BufRead, BufReader, Cursor}, net::SocketAddr};
use std::{collections::HashMap, io::{BufRead, BufReader, Cursor}, net::SocketAddr};
use byteorder::ReadBytesExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use bytes::{Bytes, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use clap::{App, Arg};
use std::fs::File;
use zenith_utils::{GIT_VERSION, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}};
@@ -19,21 +20,16 @@ pub fn read_lines_buffered(file_name: &str) -> impl Iterator<Item = String> {
pub async fn get_page(
pagestream: &mut tokio::net::TcpStream,
lsn: &Vec<u8>,
page: &Vec<u8>,
lsn: &Lsn,
page: &Page,
) -> anyhow::Result<Vec<u8>> {
let msg = {
let query = {
use bytes::buf::BufMut;
let mut query = BytesMut::new();
query.put_u8(2); // Specifies get_page query
query.put_u8(0); // Specifies this is not a "latest page" query
for byte in lsn {
query.put_u8(*byte);
}
for byte in page {
query.put_u8(*byte);
}
query.put_u64(lsn.0);
page.write(&mut query);
query.freeze()
};
@@ -52,7 +48,7 @@ pub async fn get_page(
let page = {
let mut cursor = Cursor::new(response);
let tag = cursor.read_u8().await?;
let tag = AsyncReadExt::read_u8(&mut cursor).await?;
if tag != 102 {
panic!("AA");
}
@@ -70,6 +66,41 @@ pub async fn get_page(
Ok(page)
}
#[derive(Copy, Clone)]
pub struct Lsn(pub u64);
#[derive(Copy, Clone)]
pub struct Page {
spcnode: u32,
dbnode: u32,
relnode: u32,
forknum: u8,
blkno: u32,
}
impl Page {
async fn read<Reader>(buf: &mut Reader) -> Result<Page>
where
Reader: tokio::io::AsyncRead + Unpin,
{
let spcnode = buf.read_u32().await?;
let dbnode = buf.read_u32().await?;
let relnode = buf.read_u32().await?;
let forknum = buf.read_u8().await?;
let blkno = buf.read_u32().await?;
Ok(Page { spcnode, dbnode, relnode, forknum, blkno })
}
async fn write(&self, buf: &mut BytesMut) -> Result<()> {
buf.put_u32(self.spcnode);
buf.put_u32(self.dbnode);
buf.put_u32(self.relnode);
buf.put_u8(self.forknum);
buf.put_u32(self.blkno);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<()> {
@@ -109,21 +140,19 @@ async fn main() -> Result<()> {
let tenant_hex = arg_matches.value_of("tenant_hex").unwrap();
let timeline = arg_matches.value_of("timeline").unwrap();
let lsn_page_pairs: Vec<_> = read_lines_buffered(log_file)
.filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string()))
.map(|rest| {
let (lsn, page) = rest.split_once(" ").unwrap();
let lsn = hex::decode(lsn).unwrap();
if lsn.len() != 8 {
panic!("AAA")
}
let page = hex::decode(page).unwrap();
if page.len() != 17 {
panic!("AAA")
}
(lsn, page)
})
.collect();
let relevant = read_lines_buffered(log_file) .filter_map(|line| line.strip_prefix("wal-at-lsn-modified-page ").map(|x| x.to_string()));
let mut lsn_page_pairs = Vec::<(Lsn, Page)>::new();
for line in relevant {
let (lsn, page) = line.split_once(" ").unwrap();
let lsn = hex::decode(lsn)?;
let lsn = Lsn(AsyncReadExt::read_u64(&mut Cursor::new(lsn)).await?);
let page = hex::decode(page)?;
let page = Page::read(&mut Cursor::new(page)).await?;
lsn_page_pairs.push((lsn, page))
}
// Get raw TCP connection to the pageserver postgres protocol port
let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?;
@@ -145,5 +174,10 @@ async fn main() -> Result<()> {
let (some_lsn, some_page) = lsn_page_pairs[0].clone();
let _page = get_page(&mut socket, &some_lsn, &some_page).await?;
// TODO
// 1. print writes per page
// 2. Generate high writes per page
// 3. Test runtime
Ok(())
}