diff --git a/pageserver/src/bin/psbench.rs b/pageserver/src/bin/psbench.rs
new file mode 100644
index 0000000000..4bc80caccc
--- /dev/null
+++ b/pageserver/src/bin/psbench.rs
@@ -0,0 +1,162 @@
+//! Pageserver benchmark tool
+//!
+//! Usually it's easier to write python perf tests, but here the performance
+//! of the tester matters, and the API is easier to work with from rust.
+use std::{io::{BufRead, BufReader, Cursor}, net::SocketAddr};
+use bytes::{Bytes, BytesMut};
+use clap::{App, Arg};
+use std::fs::File;
+use zenith_utils::{GIT_VERSION, pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage}};
+
+use anyhow::Result;
+
+/// Opens `file_name` and returns an iterator over its lines.
+///
+/// # Panics
+///
+/// Panics if the file cannot be opened and, while iterating, if a line
+/// cannot be read.
+pub fn read_lines_buffered(file_name: &str) -> impl Iterator<Item = String> {
+    let file = File::open(file_name)
+        .unwrap_or_else(|e| panic!("failed to open {}: {}", file_name, e));
+    BufReader::new(file)
+        .lines()
+        .map(|line| line.expect("failed to read line"))
+}
+
+/// Sends a single `get_page` request to a pageserver and checks the reply.
+///
+/// The request is built from the first `wal-at-lsn-modified-page <lsn> <page>`
+/// line of the log file given as the first argument; the reply must be a
+/// `CopyData` message holding tag byte 102 followed by an 8 KiB page.
+#[tokio::main]
+async fn main() -> Result<()> {
+    use anyhow::Context;
+    use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
+    let arg_matches = App::new("psbench")
+        .about("Pageserver benchmark tool")
+        .version(GIT_VERSION)
+        .arg(
+            Arg::new("path")
+                .help("Path to the pageserver log file to read page accesses from")
+                .required(true)
+                .index(1),
+        )
+        .arg(
+            Arg::new("ps_connstr")
+                .help("Connection string to pageserver")
+                .required(true)
+                .index(2),
+        )
+        .arg(
+            Arg::new("tenant_hex")
+                .help("Tenant id passed to the pagestream command")
+                .required(true)
+                .index(3),
+        )
+        .arg(
+            Arg::new("timeline")
+                .help("Timeline id passed to the pagestream command")
+                .required(true)
+                .index(4),
+        )
+        .get_matches();
+
+    // All four arguments are declared `required(true)` above.
+    let log_file = arg_matches.value_of("path").unwrap();
+    let ps_connstr = arg_matches.value_of("ps_connstr").unwrap();
+    let tenant_hex = arg_matches.value_of("tenant_hex").unwrap();
+    let timeline = arg_matches.value_of("timeline").unwrap();
+
+    // Parse every `wal-at-lsn-modified-page <hex lsn> <hex page>` log line.
+    let lsn_page_pairs = read_lines_buffered(log_file)
+        .filter_map(|line| {
+            line.strip_prefix("wal-at-lsn-modified-page ")
+                .map(|rest| rest.to_string())
+        })
+        .map(|rest| -> Result<(Vec<u8>, Vec<u8>)> {
+            let (lsn, page) = rest
+                .split_once(" ")
+                .with_context(|| format!("malformed log entry {:?}", rest))?;
+            let lsn = hex::decode(lsn).with_context(|| format!("bad lsn hex {:?}", lsn))?;
+            anyhow::ensure!(lsn.len() == 8, "lsn must be 8 bytes, got {}", lsn.len());
+            let page = hex::decode(page).with_context(|| format!("bad page hex {:?}", page))?;
+            anyhow::ensure!(page.len() == 17, "page must be 17 bytes, got {}", page.len());
+            Ok((lsn, page))
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // Only the first entry is replayed for now; an empty log is an error
+    // rather than an out-of-bounds panic.
+    let (some_lsn, some_page) = lsn_page_pairs
+        .into_iter()
+        .next()
+        .with_context(|| format!("no wal-at-lsn-modified-page entries in {}", log_file))?;
+
+    // Connect to the pageserver named by `ps_connstr` instead of a
+    // hard-coded address, so the required argument is actually honoured.
+    let pg_config: tokio_postgres::Config = ps_connstr
+        .parse()
+        .context("invalid pageserver connection string")?;
+    let host = match pg_config.get_hosts().first() {
+        Some(tokio_postgres::config::Host::Tcp(host)) => host.as_str(),
+        _ => anyhow::bail!("ps_connstr must name a TCP host"),
+    };
+    // 5432 is the port tokio_postgres itself assumes when none is given.
+    let port = pg_config.get_ports().first().copied().unwrap_or(5432);
+
+    let mut socket = tokio::net::TcpStream::connect((host, port)).await?;
+    println!("AYY got socket");
+    let (client, conn) = pg_config
+        .connect_raw(&mut socket, tokio_postgres::NoTls)
+        .await?;
+
+    let query = format!("pagestream {} {}", tenant_hex, timeline);
+    tokio::select! {
+        _ = conn => anyhow::bail!("pageserver connection ended before pagestream started"),
+        _ = client.query(query.as_str(), &[]) => (),
+    };
+
+    println!("AYYYYYYYYYYYY");
+
+    let msg = {
+        use bytes::buf::BufMut;
+
+        let mut query = BytesMut::with_capacity(2 + some_lsn.len() + some_page.len());
+        query.put_u8(2); // Specifies get_page query
+        query.put_u8(0); // Specifies this is not a "latest page" query
+        query.put_slice(&some_lsn);
+        query.put_slice(&some_page);
+        let query = query.freeze();
+
+        let mut buf = BytesMut::new();
+        let copy_msg = BeMessage::CopyData(&query);
+        BeMessage::write(&mut buf, &copy_msg)?;
+        buf.freeze()
+    };
+
+    // `write` may stop after a partial write; `write_all` sends the whole message.
+    socket.write_all(&msg).await?;
+
+    let response = match FeMessage::read_fut(&mut socket).await? {
+        Some(FeMessage::CopyData(page)) => page,
+        _ => anyhow::bail!("expected a CopyData response from the pageserver"),
+    };
+
+    let _page = {
+        let mut cursor = Cursor::new(response);
+        let tag = cursor.read_u8().await?;
+        anyhow::ensure!(tag == 102, "unexpected response tag {}", tag);
+
+        let mut page = Vec::<u8>::new();
+        cursor.read_to_end(&mut page).await?;
+        anyhow::ensure!(page.len() == 8 * 1024, "expected 8192 page bytes, got {}", page.len());
+
+        page
+    };
+
+    print!("yay done");
+
+    Ok(())
+}
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 1962c9bbd3..47a326db56 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -21,6 +21,7 @@
 //! redo Postgres process, but some records it can handle directly with
 //! bespoken Rust code.
 
+use chrono::format::format;
 use postgres_ffi::nonrelfile_utils::clogpage_precedes;
 use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment;
 use std::cmp::min;
@@ -270,6 +271,27 @@ impl WalIngest {
         // Iterate through all the blocks that the record modifies, and
         // "put" a separate copy of the record for each block.
         for blk in decoded.blocks.iter() {
+
+            // Log "<lsn> <page>" as hex for each modified block; psbench reads
+            // these `wal-at-lsn-modified-page` lines back to build requests.
+            let lsn_hex = {
+                use bytes::BufMut;
+                let mut bytes = BytesMut::new();
+                bytes.put_u64(lsn.0);
+                hex::encode(bytes.freeze())
+            };
+            let page_hex = {
+                use bytes::BufMut;
+                let mut page = BytesMut::new();
+                page.put_u32(blk.rnode_spcnode);
+                page.put_u32(blk.rnode_dbnode);
+                page.put_u32(blk.rnode_relnode);
+                page.put_u8(blk.forknum);
+                page.put_u32(blk.blkno);
+                hex::encode(page.freeze())
+            };
+            println!("wal-at-lsn-modified-page {} {}", lsn_hex, page_hex);
+
             self.ingest_decoded_block(timeline, lsn, &decoded, blk)?;
         }
 
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index ec570a7dac..3c14ee6031 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -613,6 +613,20 @@ class ZenithEnv:
         """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """
         return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
 
+    def run_psbench(self, timeline):
+        """
+        Run the psbench binary on this env's pageserver log and connection
+        string. Raises subprocess.CalledProcessError if psbench fails.
+        """
+        ps_log_filename = os.path.join(self.repo_dir, "pageserver.log")
+        ps_connstr = self.pageserver.connstr()
+        psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
+        tenant_hex = self.initial_tenant.hex
+        print("AAAAAAAA", ps_connstr)
+        args = [psbench_binpath, ps_log_filename, ps_connstr, tenant_hex, timeline]
+        # check=True so that a failing psbench also fails the calling test.
+        subprocess.run(args, check=True)
+
     @cached_property
     def auth_keys(self) -> AuthKeys:
         pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()
diff --git a/test_runner/performance/test_pageserver.py b/test_runner/performance/test_pageserver.py
new file mode 100644
index 0000000000..35c5d12090
--- /dev/null
+++ b/test_runner/performance/test_pageserver.py
@@ -0,0 +1,46 @@
+from contextlib import closing
+from fixtures.zenith_fixtures import ZenithEnv
+from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker
+
+
+def _get_page():
+    """Placeholder; documents the get_page request layout psbench sends."""
+    # u8 tag: 2, big endian
+    # u8 latest
+    # u64 lsn
+    # reltag:
+    #   u32 spcnode
+    #   u32 dbnode
+    #   u32 relnode
+    #   u8 forknum
+    # u32 blkno
+    pass
+
+
+def test_get_page(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker):
+    """Create a small table, then replay a logged page access with psbench."""
+    env = zenith_simple_env
+    # Create a branch for us
+    env.zenith_cli.create_branch("test_pageserver", "empty")
+    pg = env.postgres.create_start('test_pageserver')
+    timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]
+
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute('create table t (i integer);')
+            cur.execute('insert into t values (generate_series(1,3));')
+
+            cur.execute("select * from t;")
+            res = cur.fetchall()
+
+            cur.execute("select pg_relation_filepath('t');")
+            res = cur.fetchall()
+            print(res)
+
+    env.run_psbench(timeline)
+
+    # TODO send query to pageserver, see what is logged
+    # TODO send queries on these pages
+    # 1. Craft binary message
+    # 2. Send as postgres query
+    # TODO maybe make rust program for this side of the protocol?