Add multiple clients

Bojan Serafimov
2022-04-27 09:26:56 -04:00
parent c2adb7ac2d
commit ccb5df93ef
5 changed files with 51 additions and 21 deletions
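The heart of the change: instead of one pagestream connection driving the whole test, main() now fans out `num_clients` copies of the workload with `futures::future::join_all` and flattens their latency samples into one report. A minimal, self-contained sketch of that pattern, with a sleep standing in for the real pagestream round-trip (the `fake_client` helper and sample counts are illustrative, not from the commit):

    // deps assumed: tokio = { version = "1", features = ["full"] }, futures = "0.3"
    use futures::future;
    use std::time::{Duration, Instant};

    // Stand-in for run_client(): time a batch of fake requests.
    async fn fake_client(requests: usize) -> Vec<Duration> {
        let mut latencies = Vec::with_capacity(requests);
        for _ in 0..requests {
            let start = Instant::now();
            tokio::time::sleep(Duration::from_millis(1)).await; // fake round-trip
            latencies.push(start.elapsed());
        }
        latencies
    }

    #[tokio::main]
    async fn main() {
        let num_clients = 8;
        // Drive all clients concurrently, then merge their samples.
        let latencies: Vec<Duration> =
            future::join_all((0..num_clients).map(|_| fake_client(100)))
                .await
                .into_iter()
                .flatten()
                .collect();
        println!("collected {} samples from {} clients", latencies.len(), num_clients);
    }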

View File

@@ -4,7 +4,9 @@
//! of the tester matters, and the pagestream API is easier to call from rust.
use bytes::{BufMut, BytesMut};
use clap::{Parser, Subcommand};
use futures::future;
use pageserver::wal_metadata::{Page, WalEntryMetadata};
use rand::thread_rng;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::time::Instant;
@@ -19,6 +21,7 @@ use zenith_utils::{
lsn::Lsn,
pq_proto::{BeMessage, FeMessage},
};
use rand::seq::SliceRandom;
use anyhow::Result;
@@ -108,6 +111,7 @@ impl PagestreamApi {
/// Parsed wal_metadata file with additional derived
/// statistics for convenience.
#[derive(Clone)]
struct Metadata {
// Parsed from metadata file
wal_metadata: Vec<WalEntryMetadata>,
@@ -146,6 +150,9 @@ impl Metadata {
/// Print results in a format readable by benchmark_fixture.py
fn report_latency(&self, latencies: &[Duration]) -> Result<()> {
let mut latencies: Vec<&Duration> = latencies.iter().collect();
latencies.sort();
println!("test_param num_pages {}", self.affected_pages.len());
println!("test_param num_wal_entries {}", self.wal_metadata.len());
println!("test_param total_wal_size {} bytes", self.total_wal_size);
@@ -157,6 +164,11 @@ impl Metadata {
"lower_is_better median {:?} microseconds",
latencies[latencies.len() / 2].as_micros()
);
println!(
"lower_is_better average {:.3} microseconds",
(latencies.iter().map(|l| l.as_micros()).sum::<u128>() as f64) / (latencies.len() as f64)
);
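// `latencies` was sorted ascending above, so indexing from the back picks
// the sample 1% from the top: an approximate 99th percentile.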
println!(
"lower_is_better p99 {:?} microseconds",
latencies[latencies.len() - 1 - latencies.len() / 100].as_micros()
@@ -172,47 +184,59 @@ impl Metadata {
/// Request the latest version of each page, one at a time in random order, and report latencies
async fn test_latest_pages(api: &mut PagestreamApi, metadata: &Metadata) -> Result<Vec<Duration>> {
let mut latencies: Vec<Duration> = vec![];
for page in &metadata.affected_pages {
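// Visit the pages in a random order rather than in metadata order.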
let mut page_order: Vec<&Page> = metadata.affected_pages.iter().collect();
page_order.shuffle(&mut thread_rng());
for page in page_order {
let start = Instant::now();
let _page_bytes = api.get_page(&metadata.latest_lsn, page, true).await?;
let duration = start.elapsed();
latencies.push(duration);
}
latencies.sort();
Ok(latencies)
}
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
struct Args {
wal_metadata_path: PathBuf,
// TODO get these from wal metadata
// TODO test multi-timeline parallel reads
tenant_hex: String,
timeline: String,
// TODO change to `clients_per_timeline`
#[clap(long, default_value = "1")]
num_clients: usize,
#[clap(subcommand)]
test: PsbenchTest,
}
#[derive(Subcommand, Debug)]
#[derive(Subcommand, Debug, Clone)]
enum PsbenchTest {
GetLatestPages,
}
async fn run_client(args: Args, metadata: Metadata) -> Vec<Duration> {
let mut pagestream = PagestreamApi::connect(&args.tenant_hex, &args.timeline).await.unwrap();
match args.test {
PsbenchTest::GetLatestPages => test_latest_pages(&mut pagestream, &metadata),
}
.await.unwrap()
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
let metadata = Metadata::build(&args.wal_metadata_path).unwrap();
// Initialize setup
let metadata = Metadata::build(&args.wal_metadata_path)?;
let mut pagestream = PagestreamApi::connect(&args.tenant_hex, &args.timeline).await?;
// TODO explicitly spawn a thread for each?
let latencies: Vec<Duration> = future::join_all((0..args.num_clients).map(|_| {
run_client(args.clone(), metadata.clone())
})).await.into_iter().flatten().collect();
// Run test
let latencies = match args.test {
PsbenchTest::GetLatestPages => test_latest_pages(&mut pagestream, &metadata),
}
.await?;
// Report results
metadata.report_latency(&latencies)?;
println!("test_param num_clients {}", args.num_clients);
metadata.report_latency(&latencies).unwrap();
Ok(())
}
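On the `TODO explicitly spawn a thread for each?` above: `join_all` polls every client future inside the one task running main(), so the clients interleave on a single thread. A hypothetical variant, not part of this commit, that gives each client its own tokio task and therefore a chance to run on separate worker threads:

    // deps assumed: tokio = { version = "1", features = ["full"] }, anyhow = "1"
    use std::time::Duration;

    // Stand-in for run_client(args, metadata); returns fabricated samples.
    async fn client_stub(id: u64) -> Vec<Duration> {
        vec![Duration::from_micros(100 + id)]
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let num_clients = 8;
        // One spawned task per client; spawn() returns JoinHandles immediately.
        let handles: Vec<_> = (0..num_clients).map(|i| tokio::spawn(client_stub(i))).collect();
        let mut latencies: Vec<Duration> = Vec::new();
        for handle in handles {
            latencies.extend(handle.await?); // JoinError converts into anyhow::Error
        }
        println!("collected {} samples", latencies.len());
        Ok(())
    }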

View File

@@ -73,7 +73,8 @@ impl From<&DecodedBkpBlock> for Page {
}
}
#[derive(Debug, Serialize, Deserialize)]
// TODO include tenant and timeline
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WalEntryMetadata {
pub lsn: Lsn,
pub size: usize,

View File

@@ -232,7 +232,7 @@ class ZenithBenchmarker:
'',
MetricReport.TEST_PARAM)
def record_psbench_result(self, psbench_output):
def record_psbench_result(self, prefix, psbench_output):
"""Record results from pageserver benchmarker."""
for line in psbench_output.split("\n"):
tokens = line.split(" ")
@@ -240,7 +240,7 @@ class ZenithBenchmarker:
name = tokens[1]
value = tokens[2]
unit = tokens[3] if len(tokens) > 3 else ""
self.record(name, value, unit, report=report)
self.record(f"{prefix}_{name}", value, unit, report=report)
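# e.g. with prefix "1_client", a "test_param num_pages 100" line is recorded as "1_client_num_pages"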
def get_io_writes(self, pageserver) -> int:
"""

View File

@@ -1271,13 +1271,15 @@ class PsbenchBin:
"""A helper class for running the pageserver benchmarker tool."""
wal_metadata_path: str
def test_latest_pages(self, tenant_hex: str, timeline: str) -> str:
def test_latest_pages(self, tenant_hex: str, timeline: str, num_clients=None) -> str:
num_clients = num_clients or 1
psbench_binpath = os.path.join(str(zenith_binpath), 'psbench')
args = [
psbench_binpath,
self.wal_metadata_path,
tenant_hex,
timeline,
f"--num-clients={num_clients}",
"get-latest-pages",
]
return subprocess.run(args, capture_output=True).stdout.decode("UTF-8").strip()

View File

@@ -50,5 +50,8 @@ def test_get_page(zenith_env_builder: ZenithEnvBuilder,
pscur.execute(f"checkpoint {env.initial_tenant.hex} {timeline} 0")
output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline)
zenbenchmark.record_psbench_result(output)
output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline, num_clients=1)
zenbenchmark.record_psbench_result("1_client", output)
output = psbench_bin.test_latest_pages(env.initial_tenant.hex, timeline, num_clients=8)
zenbenchmark.record_psbench_result("8_clients", output)