diff --git a/pageserver/src/bin/psbench.rs b/pageserver/src/bin/psbench.rs index bc327ae917..43a82090e0 100644 --- a/pageserver/src/bin/psbench.rs +++ b/pageserver/src/bin/psbench.rs @@ -24,62 +24,95 @@ use anyhow::Result; const BYTES_IN_PAGE: usize = 8 * 1024; -pub async fn get_page( - pagestream: &mut tokio::net::TcpStream, - lsn: &Lsn, - page: &Page, - latest: bool, -) -> anyhow::Result> { - let latest: u8 = if latest { 1 } else { 0 }; - let msg = { - let query = { - let mut query = BytesMut::new(); - query.put_u8(2); // Specifies get_page query - query.put_u8(latest); - query.put_u64(lsn.0); - page.write(&mut query).await?; - query.freeze() - }; - - let mut buf = BytesMut::new(); - let copy_msg = BeMessage::CopyData(&query); - BeMessage::write(&mut buf, ©_msg)?; - buf.freeze() - }; - - pagestream.write(&msg).await?; - - let response = match FeMessage::read_fut(pagestream).await? { - Some(FeMessage::CopyData(page)) => page, - r => panic!("Expected CopyData message, got: {:?}", r), - }; - - let page = { - let mut cursor = Cursor::new(response); - let tag = AsyncReadExt::read_u8(&mut cursor).await?; - - match tag { - 102 => { - let mut page = Vec::::new(); - cursor.read_to_end(&mut page).await?; - if page.len() != BYTES_IN_PAGE { - panic!("Expected 8kb page, got: {:?}", page.len()); - } - page - } - 103 => { - let mut bytes = Vec::::new(); - cursor.read_to_end(&mut bytes).await?; - let message = String::from_utf8(bytes)?; - panic!("Got error message: {}", message); - } - _ => panic!("Unhandled tag {:?}", tag), - } - }; - - Ok(page) +/// Client for the pageserver's pagestream API +struct PagestreamApi { + stream: TcpStream } +/// Good enough implementation for these tests +impl PagestreamApi { + async fn connect(tenant_hex: &str, timeline: &str) -> Result { + let mut stream = TcpStream::connect("localhost:15000").await?; + + // Connect to pageserver + // TODO read host, port, dbname, user from command line + let (client, conn) = tokio_postgres::Config::new() + .host("127.0.0.1") + .port(15000) + .dbname("postgres") + .user("zenith_admin") + .connect_raw(&mut stream, tokio_postgres::NoTls) + .await?; + + // Enter pagestream protocol + let init_query = format!("pagestream {} {}", tenant_hex, timeline); + tokio::select! { + _ = conn => panic!("AAAA"), + _ = client.query(init_query.as_str(), &[]) => (), + }; + + Ok(PagestreamApi{stream}) + } + + async fn get_page( + &mut self, + lsn: &Lsn, + page: &Page, + latest: bool, + ) -> anyhow::Result> { + let latest: u8 = if latest { 1 } else { 0 }; + let msg = { + let query = { + let mut query = BytesMut::new(); + query.put_u8(2); // Specifies get_page query + query.put_u8(latest); + query.put_u64(lsn.0); + page.write(&mut query).await?; + query.freeze() + }; + + let mut buf = BytesMut::new(); + let copy_msg = BeMessage::CopyData(&query); + BeMessage::write(&mut buf, ©_msg)?; + buf.freeze() + }; + + self.stream.write(&msg).await?; + + let response = match FeMessage::read_fut(&mut self.stream).await? { + Some(FeMessage::CopyData(page)) => page, + r => panic!("Expected CopyData message, got: {:?}", r), + }; + + let page = { + let mut cursor = Cursor::new(response); + let tag = AsyncReadExt::read_u8(&mut cursor).await?; + + match tag { + 102 => { + let mut page = Vec::::new(); + cursor.read_to_end(&mut page).await?; + if page.len() != BYTES_IN_PAGE { + panic!("Expected 8kb page, got: {:?}", page.len()); + } + page + } + 103 => { + let mut bytes = Vec::::new(); + cursor.read_to_end(&mut bytes).await?; + let message = String::from_utf8(bytes)?; + panic!("Got error message: {}", message); + } + _ => panic!("Unhandled tag {:?}", tag), + } + }; + + Ok(page) + } +} + +/// Parsed wal_metadata file with additional derived +/// statistics for convenience. struct Metadata { // Parsed from metadata file wal_metadata: Vec, @@ -91,6 +124,7 @@ struct Metadata { } impl Metadata { + /// Construct metadata object from wal_metadata file emitted by pageserver fn build(wal_metadata_path: &PathBuf) -> Result { let wal_metadata_file = File::open(wal_metadata_path) .expect("error opening wal_metadata"); @@ -117,42 +151,43 @@ impl Metadata { }) } - fn report_results(&self, durations: &Vec) -> Result<()> { - // Format is optimized for easy parsing from benchmark_fixture.py + /// Print results in a format readable by benchmark_fixture.py + fn report_latency(&self, latencies: &Vec) -> Result<()> { println!("test_param num_pages {}", self.affected_pages.len()); println!("test_param num_wal_entries {}", self.wal_metadata.len()); println!("test_param total_wal_size {} bytes", self.total_wal_size); println!( "lower_is_better fastest {:?} microseconds", - durations.first().unwrap().as_micros() + latencies.first().unwrap().as_micros() ); println!( "lower_is_better median {:?} microseconds", - durations[durations.len() / 2].as_micros() + latencies[latencies.len() / 2].as_micros() ); println!( "lower_is_better p99 {:?} microseconds", - durations[durations.len() - 1 - durations.len() / 100].as_micros() + latencies[latencies.len() - 1 - latencies.len() / 100].as_micros() ); println!( "lower_is_better slowest {:?} microseconds", - durations.last().unwrap().as_micros() + latencies.last().unwrap().as_micros() ); Ok(()) } } -async fn test_latest_pages(pagestream: &mut TcpStream, metadata: &Metadata) -> Result>{ - let mut durations: Vec = vec![]; +/// Sequentially get the latest version of each page and report latencies +async fn test_latest_pages(api: &mut PagestreamApi, metadata: &Metadata) -> Result>{ + let mut latencies: Vec = vec![]; for page in &metadata.affected_pages { let start = Instant::now(); - let _page_bytes = get_page(pagestream, &metadata.latest_lsn, &page, true).await?; + let _page_bytes = api.get_page(&metadata.latest_lsn, &page, true).await?; let duration = start.elapsed(); - durations.push(duration); + latencies.push(duration); } - durations.sort(); - Ok(durations) + latencies.sort(); + Ok(latencies) } #[derive(Parser, Debug)] @@ -174,31 +209,16 @@ enum PsbenchTest { async fn main() -> Result<()> { let args = Args::parse(); - // Parse wal metadata from file + // Initialize setup let metadata = Metadata::build(&args.wal_metadata_path)?; - - // Get raw TCP connection to the pageserver postgres protocol port - let mut pagestream = TcpStream::connect("localhost:15000").await?; - let (client, conn) = tokio_postgres::Config::new() - .host("127.0.0.1") - .port(15000) - .dbname("postgres") - .user("zenith_admin") - .connect_raw(&mut pagestream, tokio_postgres::NoTls) - .await?; - - // Enter pagestream protocol - let init_query = format!("pagestream {} {}", args.tenant_hex, args.timeline); - tokio::select! { - _ = conn => panic!("AAAA"), - _ = client.query(init_query.as_str(), &[]) => (), - }; + let mut pagestream = PagestreamApi::connect(&args.tenant_hex, &args.timeline).await?; // Run test - let durations = match args.test { + let latencies = match args.test { PsbenchTest::GetLatestPages => test_latest_pages(&mut pagestream, &metadata) }.await?; - metadata.report_results(&durations)?; + // Report results + metadata.report_latency(&latencies)?; Ok(()) }