From c2814e9828fb004cacfe54177e1bb0c7decccb8f Mon Sep 17 00:00:00 2001
From: Bojan Serafimov
Date: Tue, 12 Apr 2022 13:27:18 -0400
Subject: [PATCH] Add get_page tests

---
 pageserver/src/bin/pageserver.rs           |   3 +
 pageserver/src/bin/psbench.rs              | 171 +++++++++++++++++++++
 pageserver/src/lib.rs                      |   1 +
 pageserver/src/wal_metadata.rs             |  98 ++++++++++++
 pageserver/src/walingest.rs                |   8 +
 test_runner/fixtures/zenith_fixtures.py    |   7 +
 test_runner/performance/test_pageserver.py |  46 ++++++
 7 files changed, 334 insertions(+)
 create mode 100644 pageserver/src/bin/psbench.rs
 create mode 100644 pageserver/src/wal_metadata.rs
 create mode 100644 test_runner/performance/test_pageserver.py

diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 0af96cff66..005be49c7e 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -185,6 +185,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {
     // Initialize logger
     let log_file = logging::init(LOG_FILE_NAME, daemonize)?;
 
+    // Initialize the wal metadata logger, if necessary
+    pageserver::wal_metadata::init(conf).expect("wal_metadata init failed");
+
     info!("version: {}", GIT_VERSION);
 
     // TODO: Check that it looks like a valid repository before going further
diff --git a/pageserver/src/bin/psbench.rs b/pageserver/src/bin/psbench.rs
new file mode 100644
index 0000000000..ce04bf3c04
--- /dev/null
+++ b/pageserver/src/bin/psbench.rs
@@ -0,0 +1,171 @@
+//! Pageserver benchmark tool
+//!
+//! Usually it's easier to write Python perf tests, but here the performance
+//! of the tester matters, and the pagestream API is easier to call from Rust.
+//!
+//! Usage: psbench <wal_metadata_file> <tenant_hex> <timeline>
+use bytes::{BufMut, BytesMut};
+use clap::{App, Arg};
+use pageserver::wal_metadata::{Page, WalEntryMetadata};
+use std::fs::File;
+use std::time::Instant;
+use std::{
+    collections::HashSet,
+    io::{BufRead, BufReader, Cursor},
+    time::Duration,
+};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use zenith_utils::{
+    lsn::Lsn,
+    pq_proto::{BeMessage, FeMessage},
+    GIT_VERSION,
+};
+
+use anyhow::Result;
+
+const BYTES_IN_PAGE: usize = 8 * 1024;
+
+pub fn read_lines_buffered(file_name: &str) -> impl Iterator<Item = String> {
+    BufReader::new(File::open(file_name).unwrap())
+        .lines()
+        .map(|result| result.unwrap())
+}
+
+pub async fn get_page(
+    pagestream: &mut tokio::net::TcpStream,
+    lsn: &Lsn,
+    page: &Page,
+    latest: bool,
+) -> anyhow::Result<Vec<u8>> {
+    let latest: u8 = if latest { 1 } else { 0 };
+    let msg = {
+        let query = {
+            let mut query = BytesMut::new();
+            query.put_u8(2); // Specifies get_page query
+            query.put_u8(latest);
+            query.put_u64(lsn.0);
+            page.write(&mut query).await?;
+            query.freeze()
+        };
+
+        let mut buf = BytesMut::new();
+        let copy_msg = BeMessage::CopyData(&query);
+        BeMessage::write(&mut buf, &copy_msg)?;
+        buf.freeze()
+    };
+
+    pagestream.write_all(&msg).await?;
+
+    let response = match FeMessage::read_fut(pagestream).await? {
+        Some(FeMessage::CopyData(page)) => page,
+        r => panic!("Expected CopyData message, got: {:?}", r),
+    };
+
+    let page = {
+        let mut cursor = Cursor::new(response);
+        let tag = AsyncReadExt::read_u8(&mut cursor).await?;
+
+        match tag {
+            // Tag 102 is a get_page response; the rest of the message is the page image
+            102 => {
+                let mut page = Vec::<u8>::new();
+                cursor.read_to_end(&mut page).await?;
+                if page.len() != BYTES_IN_PAGE {
+                    panic!("Expected 8 KiB page, got: {:?}", page.len());
+                }
+                page
+            }
+            // Tag 103 is an error response; the rest of the message is the error text
+            103 => {
+                let mut bytes = Vec::<u8>::new();
+                cursor.read_to_end(&mut bytes).await?;
+                let message = String::from_utf8(bytes)?;
+                panic!("Got error message: {}", message);
+            }
+            _ => panic!("Unhandled tag {:?}", tag),
+        }
+    };
+
+    Ok(page)
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let arg_matches = App::new("psbench")
+        .about("Pageserver benchmark tool")
+        .version(GIT_VERSION)
+        .arg(
+            Arg::new("wal_metadata_file")
+                .help("Path to wal metadata file")
+                .required(true)
+                .index(1),
+        )
+        .arg(Arg::new("tenant_hex").help("Tenant id (hex)").required(true).index(2))
+        .arg(Arg::new("timeline").help("Timeline id").required(true).index(3))
+        .get_matches();
+
+    let metadata_file = arg_matches.value_of("wal_metadata_file").unwrap();
+    let tenant_hex = arg_matches.value_of("tenant_hex").unwrap();
+    let timeline = arg_matches.value_of("timeline").unwrap();
+
+    // Parse log lines
+    let wal_metadata: Vec<WalEntryMetadata> = read_lines_buffered(metadata_file)
+        .map(|line| serde_json::from_str(&line).expect("corrupt metadata file"))
+        .collect();
+
+    // Get a raw TCP connection to the pageserver postgres protocol port
+    let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?;
+    let (client, conn) = tokio_postgres::Config::new()
+        .host("127.0.0.1")
+        .port(15000)
+        .dbname("postgres")
+        .user("zenith_admin")
+        .connect_raw(&mut socket, tokio_postgres::NoTls)
+        .await?;
+
+    // Enter pagestream protocol
+    let init_query = format!("pagestream {} {}", tenant_hex, timeline);
+    tokio::select! {
+        _ = conn => panic!("connection to pageserver closed unexpectedly"),
+        _ = client.query(init_query.as_str(), &[]) => (),
+    };
+
+    // Derive summary stats from the metadata
+    let total_wal_size: usize = wal_metadata.iter().map(|m| m.size).sum();
+    let affected_pages: HashSet<_> = wal_metadata
+        .iter()
+        .flat_map(|m| m.affected_pages.clone())
+        .collect();
+    let latest_lsn = wal_metadata.iter().map(|m| m.lsn).max().unwrap();
+
+    // Get the latest version of every affected page, timing each request
+    let mut durations: Vec<Duration> = vec![];
+    for page in &affected_pages {
+        let start = Instant::now();
+        let _page_bytes = get_page(&mut socket, &latest_lsn, page, true).await?;
+        let duration = start.elapsed();
+
+        durations.push(duration);
+    }
+
+    durations.sort();
+    // Format is optimized for easy parsing from benchmark_fixture.py
+    println!("test_param num_pages {}", affected_pages.len());
+    println!("test_param num_wal_entries {}", wal_metadata.len());
+    println!("test_param total_wal_size {} bytes", total_wal_size);
+    println!(
+        "lower_is_better fastest {} microseconds",
+        durations.first().unwrap().as_micros()
+    );
+    println!(
+        "lower_is_better median {} microseconds",
+        durations[durations.len() / 2].as_micros()
+    );
+    println!(
+        "lower_is_better p99 {} microseconds",
+        durations[durations.len() - 1 - durations.len() / 100].as_micros()
+    );
+    println!(
+        "lower_is_better slowest {} microseconds",
+        durations.last().unwrap().as_micros()
+    );
+
+    Ok(())
+}
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 6dddef5f27..e466656b9c 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -15,6 +15,7 @@ pub mod tenant_threads;
 pub mod thread_mgr;
 pub mod timelines;
 pub mod virtual_file;
+pub mod wal_metadata;
 pub mod walingest;
 pub mod walreceiver;
 pub mod walrecord;
diff --git a/pageserver/src/wal_metadata.rs b/pageserver/src/wal_metadata.rs
new file mode 100644
index 0000000000..3bc9566280
--- /dev/null
+++ b/pageserver/src/wal_metadata.rs
@@ -0,0 +1,98 @@
+//!
+//! Utils for logging wal metadata. Useful for tests only, and too expensive to run in prod.
+//!
+//! Ideally we'd get this wal metadata using pg_waldump from the compute pg_wal directory,
+//! but pg_waldump doesn't provide all the metadata we need. We could write a Rust program
+//! to analyze pg wal, but we'd need to port some C code for decoding wal files. This module
+//! is a temporary hack that allows us to print the metadata that the pageserver decodes
+//! using postgres_ffi::waldecoder.
+//!
+//! Logging wal metadata could add significant write overhead to the pageserver. Tests that
+//! rely on this should either spin up a dedicated pageserver for wal metadata logging, or
+//! only measure read performance.
+//!
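+//! Each line of the log file is one JSON-serialized `WalEntryMetadata`: the
+//! entry's `lsn`, its `size` in bytes, and the list of `affected_pages`, each
+//! identified by (spcnode, dbnode, relnode, forknum, blkno).
+//!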
+use anyhow::Result;
+use bytes::{BufMut, BytesMut};
+use once_cell::sync::OnceCell;
+use serde::{Deserialize, Serialize};
+use std::fs::File;
+use tokio::io::AsyncReadExt;
+use zenith_utils::lsn::Lsn;
+
+use crate::{config::PageServerConf, walrecord::DecodedBkpBlock};
+
+pub static WAL_METADATA_FILE: OnceCell<File> = OnceCell::new();
+
+#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, Serialize, Deserialize)]
+pub struct Page {
+    spcnode: u32,
+    dbnode: u32,
+    relnode: u32,
+    forknum: u8,
+    blkno: u32,
+}
+
+impl Page {
+    pub async fn read<Reader>(buf: &mut Reader) -> Result<Page>
+    where
+        Reader: tokio::io::AsyncRead + Unpin,
+    {
+        let spcnode = buf.read_u32().await?;
+        let dbnode = buf.read_u32().await?;
+        let relnode = buf.read_u32().await?;
+        let forknum = buf.read_u8().await?;
+        let blkno = buf.read_u32().await?;
+        Ok(Page {
+            spcnode,
+            dbnode,
+            relnode,
+            forknum,
+            blkno,
+        })
+    }
+
+    pub async fn write(&self, buf: &mut BytesMut) -> Result<()> {
+        buf.put_u32(self.spcnode);
+        buf.put_u32(self.dbnode);
+        buf.put_u32(self.relnode);
+        buf.put_u8(self.forknum);
+        buf.put_u32(self.blkno);
+        Ok(())
+    }
+}
+
+impl From<&DecodedBkpBlock> for Page {
+    fn from(blk: &DecodedBkpBlock) -> Self {
+        Page {
+            spcnode: blk.rnode_spcnode,
+            dbnode: blk.rnode_dbnode,
+            relnode: blk.rnode_relnode,
+            forknum: blk.forknum,
+            blkno: blk.blkno,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct WalEntryMetadata {
+    pub lsn: Lsn,
+    pub size: usize,
+    pub affected_pages: Vec<Page>,
+}
+
+pub fn init(conf: &'static PageServerConf) -> Result<()> {
+    let wal_metadata_path = conf.workdir.join("wal_metadata.log");
+    WAL_METADATA_FILE
+        .set(File::create(wal_metadata_path)?)
+        .expect("wal_metadata file already initialized");
+    Ok(())
+}
+
+pub fn write(wal_meta: WalEntryMetadata) -> Result<()> {
+    // Logging is a no-op if init() was never called
+    if let Some(mut file) = WAL_METADATA_FILE.get() {
+        let mut line = serde_json::to_string(&wal_meta)?;
+        line.push('\n');
+        std::io::prelude::Write::write_all(&mut file, line.as_bytes())?;
+    }
+    Ok(())
+}
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index c6c6e89854..01ee4ec407 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -82,6 +82,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
     ) -> Result<()> {
         let mut modification = timeline.begin_modification(lsn);
 
+        let recdata_len = recdata.len();
         let mut decoded = decode_wal_record(recdata);
         let mut buf = decoded.record.clone();
         buf.advance(decoded.main_data_offset);
@@ -249,6 +250,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
             self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
         }
 
+        // Emit wal entry metadata, if configured to do so
+        crate::wal_metadata::write(crate::wal_metadata::WalEntryMetadata {
+            lsn,
+            size: recdata_len,
+            affected_pages: decoded.blocks.iter().map(|blk| blk.into()).collect(),
+        })?;
+
         // If checkpoint data was updated, store the new version in the repository
         if self.checkpoint_modified {
             let new_checkpoint_bytes = self.checkpoint.encode();
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index a95809687a..7bab6d65c0 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -638,6 +638,13 @@ class ZenithEnv:
         """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """
         return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
 
+    def run_psbench(self, timeline):
+        wal_metadata_filename = os.path.join(self.repo_dir, "wal_metadata.log")
"wal_metadata.log") + psbench_binpath = os.path.join(str(zenith_binpath), 'psbench') + tenant_hex = self.initial_tenant.hex + args = [psbench_binpath, wal_metadata_filename, tenant_hex, timeline] + return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip() + @cached_property def auth_keys(self) -> AuthKeys: pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes() diff --git a/test_runner/performance/test_pageserver.py b/test_runner/performance/test_pageserver.py new file mode 100644 index 0000000000..059286857c --- /dev/null +++ b/test_runner/performance/test_pageserver.py @@ -0,0 +1,46 @@ +from contextlib import closing +from fixtures.zenith_fixtures import ZenithEnv, PgBin +from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker + + +def test_get_page(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker, pg_bin: PgBin): + env = zenith_simple_env + env.zenith_cli.create_branch("test_pageserver", "empty") + pg = env.postgres.create_start('test_pageserver') + tenant_hex = env.initial_tenant.hex + timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0] + + # Long-lived cursor, useful for flushing + psconn = env.pageserver.connect() + pscur = psconn.cursor() + + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + workload = "pgbench" + + print(f"Running workload {workload}") + if workload == "hot page": + cur.execute('create table t (i integer);') + cur.execute('insert into t values (0);') + for i in range(100000): + cur.execute(f'update t set i = {i};') + elif workload == "pgbench": + pg_bin.run_capture(['pgbench', '-s5', '-i', pg.connstr()]) + pg_bin.run_capture(['pgbench', '-c1', '-t5000', pg.connstr()]) + elif workload == "pgbench big": + pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()]) + pg_bin.run_capture(['pgbench', '-c1', '-t100000', pg.connstr()]) + elif workload == "pgbench long": + pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()]) + pg_bin.run_capture(['pgbench', '-c1', '-t1000000', pg.connstr()]) + + pscur.execute(f"checkpoint {env.initial_tenant.hex} {timeline} 0") + + output = env.run_psbench(timeline) + for line in output.split("\n"): + tokens = line.split(" ") + report = tokens[0] + name = tokens[1] + value = tokens[2] + unit = tokens[3] if len(tokens) > 3 else "" + zenbenchmark.record(name, value, unit, report=report)