Add get_page tests

Bojan Serafimov
2022-04-12 13:27:18 -04:00
parent 07a9553700
commit c2814e9828
7 changed files with 334 additions and 0 deletions

View File

@@ -185,6 +185,9 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
    // Initialize logger
    let log_file = logging::init(LOG_FILE_NAME, daemonize)?;

    // Initialize wal metadata logger, if necessary
    pageserver::wal_metadata::init(conf).expect("wal_metadata init failed");

    info!("version: {}", GIT_VERSION);

    // TODO: Check that it looks like a valid repository before going further

View File

@@ -0,0 +1,171 @@
//! Pageserver benchmark tool
//!
//! Usually it's easier to write Python perf tests, but here the performance
//! of the tester itself matters, and the pagestream API is easier to call from Rust.
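//!
//! Invocation sketch (positional args as declared in main() below; the binary
//! name matches the `psbench` path used by the test fixture):
//!
//!     psbench <wal_metadata_file> <tenant_hex> <timeline>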

use anyhow::{bail, Result};
use bytes::{BufMut, BytesMut};
use clap::{App, Arg};
use pageserver::wal_metadata::{Page, WalEntryMetadata};
use std::{
    collections::HashSet,
    fs::File,
    io::{BufRead, BufReader, Cursor},
    time::{Duration, Instant},
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use zenith_utils::{
    lsn::Lsn,
    pq_proto::{BeMessage, FeMessage},
    GIT_VERSION,
};
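
// Postgres page size; every get_page response payload is checked against this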
const BYTES_IN_PAGE: usize = 8 * 1024;

pub fn read_lines_buffered(file_name: &str) -> impl Iterator<Item = String> {
    BufReader::new(File::open(file_name).unwrap())
        .lines()
        .map(|result| result.unwrap())
}

pub async fn get_page(
    pagestream: &mut tokio::net::TcpStream,
    lsn: &Lsn,
    page: &Page,
    latest: bool,
) -> Result<Vec<u8>> {
    let latest: u8 = if latest { 1 } else { 0 };
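
    // Request layout inside the CopyData payload, as built below:
    //   u8  tag    = 2   (get_page request)
    //   u8  latest       (1 = latest page version, 0 = the version at `lsn`)
    //   u64 lsn
    //   Page fields: spcnode u32, dbnode u32, relnode u32, forknum u8, blkno u32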
    let msg = {
        let query = {
            let mut query = BytesMut::new();
            query.put_u8(2); // Specifies get_page query
            query.put_u8(latest);
            query.put_u64(lsn.0);
            page.write(&mut query).await?;
            query.freeze()
        };

        let mut buf = BytesMut::new();
        let copy_msg = BeMessage::CopyData(&query);
        BeMessage::write(&mut buf, &copy_msg)?;
        buf.freeze()
    };
    pagestream.write_all(&msg).await?;

    let response = match FeMessage::read_fut(pagestream).await? {
        Some(FeMessage::CopyData(page)) => page,
        r => bail!("Expected CopyData message, got: {:?}", r),
    };
    let page = {
        let mut cursor = Cursor::new(response);
        let tag = AsyncReadExt::read_u8(&mut cursor).await?;
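
        // The tag byte tells how to interpret the rest of the message:
        // 102 carries the page image, 103 carries an error string.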
        match tag {
            102 => {
                let mut page = Vec::<u8>::new();
                cursor.read_to_end(&mut page).await?;
                if page.len() != BYTES_IN_PAGE {
                    bail!("Expected 8kb page, got: {:?}", page.len());
                }
                page
            }
            103 => {
                let mut bytes = Vec::<u8>::new();
                cursor.read_to_end(&mut bytes).await?;
                let message = String::from_utf8(bytes)?;
                bail!("Got error message: {}", message);
            }
            _ => bail!("Unhandled tag {:?}", tag),
        }
    };

    Ok(page)
}

#[tokio::main]
async fn main() -> Result<()> {
    let arg_matches = App::new("psbench")
        .about("Pageserver benchmark tool")
        .version(GIT_VERSION)
        .arg(
            Arg::new("wal_metadata_file")
                .help("Path to wal metadata file")
                .required(true)
                .index(1),
        )
        .arg(Arg::new("tenant_hex").help("Tenant id, in hex").required(true).index(2))
        .arg(Arg::new("timeline").help("Timeline id").required(true).index(3))
        .get_matches();

    let metadata_file = arg_matches.value_of("wal_metadata_file").unwrap();
    let tenant_hex = arg_matches.value_of("tenant_hex").unwrap();
    let timeline = arg_matches.value_of("timeline").unwrap();

    // Parse log lines
    let wal_metadata: Vec<WalEntryMetadata> = read_lines_buffered(metadata_file)
        .map(|line| serde_json::from_str(&line).expect("corrupt metadata file"))
        .collect();

    // Get raw TCP connection to the pageserver postgres protocol port
    let mut socket = tokio::net::TcpStream::connect("localhost:15000").await?;
    let (client, conn) = tokio_postgres::Config::new()
        .host("127.0.0.1")
        .port(15000)
        .dbname("postgres")
        .user("zenith_admin")
        .connect_raw(&mut socket, tokio_postgres::NoTls)
        .await?;
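
    // NB: tokio_postgres requires the connection future to be polled alongside
    // any client call, so race the two; if `conn` completes first, the server
    // hung up before the protocol switch.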
    // Enter pagestream protocol
    let init_query = format!("pagestream {} {}", tenant_hex, timeline);
    tokio::select! {
        _ = conn => panic!("connection closed before entering pagestream protocol"),
        _ = client.query(init_query.as_str(), &[]) => (),
    };

    // Derive some variables
    let total_wal_size: usize = wal_metadata.iter().map(|m| m.size).sum();
    let affected_pages: HashSet<_> = wal_metadata
        .iter()
        .flat_map(|m| m.affected_pages.clone())
        .collect();
    let latest_lsn = wal_metadata.iter().map(|m| m.lsn).max().unwrap();

    // Get all latest pages
    let mut durations: Vec<Duration> = vec![];
    for page in &affected_pages {
        let start = Instant::now();
        let _page_bytes = get_page(&mut socket, &latest_lsn, page, true).await?;
        let duration = start.elapsed();
        durations.push(duration);
    }
    durations.sort();

    // Format is optimized for easy parsing from benchmark_fixture.py
    println!("test_param num_pages {}", affected_pages.len());
    println!("test_param num_wal_entries {}", wal_metadata.len());
    println!("test_param total_wal_size {} bytes", total_wal_size);
    println!(
        "lower_is_better fastest {:?} microseconds",
        durations.first().unwrap().as_micros()
    );
    println!(
        "lower_is_better median {:?} microseconds",
        durations[durations.len() / 2].as_micros()
    );
    println!(
        "lower_is_better p99 {:?} microseconds",
        durations[durations.len() - 1 - durations.len() / 100].as_micros()
    );
    println!(
        "lower_is_better slowest {:?} microseconds",
        durations.last().unwrap().as_micros()
    );

    Ok(())
}

View File

@@ -15,6 +15,7 @@ pub mod tenant_threads;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;
pub mod wal_metadata;
pub mod walingest;
pub mod walreceiver;
pub mod walrecord;

View File

@@ -0,0 +1,98 @@
//!
//! Utils for logging wal metadata. Useful for tests only, and too expensive to run in prod.
//!
//! Ideally we'd get this wal metadata using pg_waldump from the compute pg_wal directory,
//! but pg_waldump doesn't provide all the metadata we need. We could write a Rust program
//! to analyze pg wal, but we'd need to port some C code for decoding wal files. This module
//! is a temporary hack that allows us to print the metadata that the pageserver decodes
//! using postgres_ffi::waldecoder.
//!
//! Logging wal metadata could add significant write overhead to the pageserver. Tests that
//! rely on this should either spin up a dedicated pageserver for wal metadata logging, or
//! only measure read performance.
//!
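//! Each entry is one JSON line in <workdir>/wal_metadata.log, e.g. (hypothetical
//! values; the exact lsn encoding is whatever zenith_utils::lsn::Lsn serializes to):
//!
//!     {"lsn":"0/16B59A8","size":121,"affected_pages":[{"spcnode":1663,"dbnode":13008,"relnode":16396,"forknum":0,"blkno":7}]}
//!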
use anyhow::Result;
use bytes::{BufMut, BytesMut};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::fs::File;
use tokio::io::AsyncReadExt;
use zenith_utils::lsn::Lsn;

use crate::{config::PageServerConf, walrecord::DecodedBkpBlock};

pub static WAL_METADATA_FILE: OnceCell<File> = OnceCell::new();
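
/// Identifies one page of one relation: spcnode/dbnode/relnode are the
/// tablespace, database, and relation OIDs, forknum selects the relation
/// fork, and blkno is the block number within that fork.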
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub struct Page {
    spcnode: u32,
    dbnode: u32,
    relnode: u32,
    forknum: u8,
    blkno: u32,
}

impl Page {
    pub async fn read<Reader>(buf: &mut Reader) -> Result<Page>
    where
        Reader: tokio::io::AsyncRead + Unpin,
    {
        let spcnode = buf.read_u32().await?;
        let dbnode = buf.read_u32().await?;
        let relnode = buf.read_u32().await?;
        let forknum = buf.read_u8().await?;
        let blkno = buf.read_u32().await?;
        Ok(Page {
            spcnode,
            dbnode,
            relnode,
            forknum,
            blkno,
        })
    }

    pub async fn write(&self, buf: &mut BytesMut) -> Result<()> {
        buf.put_u32(self.spcnode);
        buf.put_u32(self.dbnode);
        buf.put_u32(self.relnode);
        buf.put_u8(self.forknum);
        buf.put_u32(self.blkno);
        Ok(())
    }
}

impl From<&DecodedBkpBlock> for Page {
    fn from(blk: &DecodedBkpBlock) -> Self {
        Page {
            spcnode: blk.rnode_spcnode,
            dbnode: blk.rnode_dbnode,
            relnode: blk.rnode_relnode,
            forknum: blk.forknum,
            blkno: blk.blkno,
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WalEntryMetadata {
    pub lsn: Lsn,
    pub size: usize,
    pub affected_pages: Vec<Page>,
}

pub fn init(conf: &'static PageServerConf) -> Result<()> {
    let wal_metadata_path = conf.workdir.join("wal_metadata.log");
    WAL_METADATA_FILE
        .set(File::create(wal_metadata_path)?)
        .expect("wal_metadata file already initialized");
    Ok(())
}
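
/// Append one metadata entry as a JSON line. This is a no-op unless `init`
/// has been called first, so when logging is disabled the only overhead is
/// the OnceCell lookup.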
pub fn write(wal_meta: WalEntryMetadata) -> Result<()> {
    if let Some(mut file) = WAL_METADATA_FILE.get() {
        let mut line = serde_json::to_string(&wal_meta)?;
        line.push('\n');
        std::io::prelude::Write::write_all(&mut file, line.as_bytes())?;
    }
    Ok(())
}

View File

@@ -82,6 +82,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
    ) -> Result<()> {
        let mut modification = timeline.begin_modification(lsn);
        let recdata_len = recdata.len();
        let mut decoded = decode_wal_record(recdata);
        let mut buf = decoded.record.clone();
        buf.advance(decoded.main_data_offset);
@@ -249,6 +250,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
            self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
        }

        // Emit wal entry metadata, if configured to do so
        crate::wal_metadata::write(crate::wal_metadata::WalEntryMetadata {
            lsn,
            size: recdata_len,
            affected_pages: decoded.blocks.iter().map(|blk| blk.into()).collect(),
        })?;

        // If checkpoint data was updated, store the new version in the repository
        if self.checkpoint_modified {
            let new_checkpoint_bytes = self.checkpoint.encode();

View File

@@ -638,6 +638,13 @@ class ZenithEnv:
        """ Get list of safekeeper endpoints suitable for wal_acceptors GUC """
        return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])

    def run_psbench(self, timeline):
        wal_metadata_filename = os.path.join(self.repo_dir, "wal_metadata.log")
        psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
        tenant_hex = self.initial_tenant.hex
        args = [psbench_binpath, wal_metadata_filename, tenant_hex, timeline]
        return subprocess.run(args, capture_output=True, check=True).stdout.decode("UTF-8").strip()

    @cached_property
    def auth_keys(self) -> AuthKeys:
        pub = (Path(self.repo_dir) / 'auth_public_key.pem').read_bytes()

View File

@@ -0,0 +1,46 @@
from contextlib import closing

from fixtures.zenith_fixtures import ZenithEnv, PgBin
from fixtures.benchmark_fixture import MetricReport, ZenithBenchmarker


def test_get_page(zenith_simple_env: ZenithEnv, zenbenchmark: ZenithBenchmarker, pg_bin: PgBin):
    env = zenith_simple_env
    env.zenith_cli.create_branch("test_pageserver", "empty")
    pg = env.postgres.create_start('test_pageserver')

    tenant_hex = env.initial_tenant.hex
    timeline = pg.safe_psql("SHOW zenith.zenith_timeline")[0][0]

    # Long-lived cursor, useful for flushing
    psconn = env.pageserver.connect()
    pscur = psconn.cursor()

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            workload = "pgbench"
            print(f"Running workload {workload}")
            if workload == "hot page":
                cur.execute('create table t (i integer);')
                cur.execute('insert into t values (0);')
                for i in range(100000):
                    cur.execute(f'update t set i = {i};')
            elif workload == "pgbench":
                pg_bin.run_capture(['pgbench', '-s5', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t5000', pg.connstr()])
            elif workload == "pgbench big":
                pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t100000', pg.connstr()])
            elif workload == "pgbench long":
                pg_bin.run_capture(['pgbench', '-s100', '-i', pg.connstr()])
                pg_bin.run_capture(['pgbench', '-c1', '-t1000000', pg.connstr()])

    pscur.execute(f"checkpoint {env.initial_tenant.hex} {timeline} 0")

    output = env.run_psbench(timeline)
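    # psbench prints one metric per line: "<report> <name> <value> [<unit>]"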
    for line in output.split("\n"):
        tokens = line.split(" ")
        report = tokens[0]
        name = tokens[1]
        value = tokens[2]
        unit = tokens[3] if len(tokens) > 3 else ""
        zenbenchmark.record(name, value, unit, report=report)