per-task & global mean + percentiles using hdrhistogram

known problem is: one hdrhistogram per task => too much memory usage
This commit is contained in:
Christian Schwarz
2023-11-24 12:34:47 +00:00
parent 857150dcee
commit 568f6ae332
5 changed files with 188 additions and 38 deletions

19
Cargo.lock generated
View File

@@ -1955,6 +1955,20 @@ dependencies = [
"hashbrown 0.13.2",
]
[[package]]
name = "hdrhistogram"
version = "7.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
dependencies = [
"base64 0.21.1",
"byteorder",
"crossbeam-channel",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "heapless"
version = "0.8.0"
@@ -2916,8 +2930,13 @@ version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"hdrhistogram",
"humantime",
"humantime-serde",
"pageserver",
"rand 0.8.5",
"serde",
"serde_json",
"tokio",
"tracing",
"utils",

View File

@@ -80,6 +80,7 @@ futures-util = "0.3"
git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hdrhistogram = "7.5.2"
hex = "0.4"
hex-literal = "0.4"
hmac = "0.12.1"

View File

@@ -8,7 +8,12 @@ edition = "2021"
[dependencies]
anyhow.workspace = true
clap.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true

View File

@@ -11,8 +11,7 @@ use utils::logging;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::task::JoinHandle;
use std::time::{Duration, Instant};
/// Measure performance of the GetPage API, targeting the latest LSN.
#[derive(clap::Parser)]
@@ -41,6 +40,92 @@ impl LiveStats {
}
}
#[derive(serde::Serialize)]
struct Output {
per_task: Vec<PerTaskOutput>,
total: PerTaskOutput,
}
const LATENCY_PERCENTILES: [f64; 3] = [99.0, 99.9, 99.99];
struct LatencyPercentiles {
latency_percentiles: [Duration; 3],
}
impl serde::Serialize for LatencyPercentiles {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
for p in LATENCY_PERCENTILES {
ser.serialize_entry(
&format!("p{p}"),
&format!(
"{}",
&humantime::format_duration(self.latency_percentiles[0])
),
)?;
}
ser.end()
}
}
#[derive(serde::Serialize)]
struct PerTaskOutput {
request_count: u64,
#[serde(with = "humantime_serde")]
latency_mean: Duration,
latency_percentiles: LatencyPercentiles,
}
struct PerTaskStats {
latency_histo: hdrhistogram::Histogram<u64>,
}
impl PerTaskStats {
fn new() -> Self {
Self {
// Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
// which would skew the benchmark results.
latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
}
}
fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
let micros: u64 = latency
.as_micros()
.try_into()
.context("latency greater than u64")?;
self.latency_histo
.record(micros)
.context("add to histogram")?;
Ok(())
}
fn output(&self) -> PerTaskOutput {
let latency_percentiles = std::array::from_fn(|idx| {
let micros = self
.latency_histo
.value_at_percentile(LATENCY_PERCENTILES[idx]);
Duration::from_micros(micros)
});
PerTaskOutput {
request_count: self.latency_histo.len(),
latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
latency_percentiles: LatencyPercentiles {
latency_percentiles,
},
}
}
fn add(&mut self, other: &Self) {
let Self {
ref mut latency_histo,
} = self;
latency_histo.add(&other.latency_histo).unwrap();
}
}
pub(crate) async fn main(args: Args) -> anyhow::Result<()> {
logging::init(
logging::LogFormat::Plain,
@@ -92,6 +177,7 @@ pub(crate) async fn main(args: Args) -> anyhow::Result<()> {
let num_work_tasks = tenant_timelines.len() * args.num_tasks;
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks + 1));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_work_tasks));
tokio::spawn({
let stats = Arc::clone(&stats);
@@ -113,22 +199,44 @@ pub(crate) async fn main(args: Args) -> anyhow::Result<()> {
let mut tasks = Vec::new();
for (tenant_id, timeline_id) in tenant_timelines {
let stats = Arc::clone(&stats);
let live_stats = Arc::clone(&stats);
let t = tokio::spawn(timeline(
args,
client.clone(),
tenant_id,
timeline_id,
Arc::clone(&start_work_barrier),
stats,
Arc::clone(&all_work_done_barrier),
live_stats,
));
tasks.push(t);
tasks.push(((tenant_id, timeline_id), t));
}
for t in tasks {
t.await.unwrap().unwrap();
let mut per_timeline: Vec<((TenantId, TimelineId), Vec<PerTaskStats>)> =
Vec::with_capacity(tasks.len());
for (key, task) in tasks {
per_timeline.push((key, task.await.unwrap().unwrap()));
}
let per_task: Vec<(_, _)> = per_timeline
.iter()
.flat_map(|(k, task_stats)| task_stats.into_iter().map(|s| (*k, s)))
.collect();
let output = Output {
total: {
let mut agg_stats = PerTaskStats::new();
for (_, stats) in &per_task {
agg_stats.add(stats);
}
agg_stats.output()
},
per_task: per_task.iter().map(|(_, s)| s.output()).collect(),
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");
anyhow::Ok(())
}
@@ -138,8 +246,9 @@ async fn timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
start_work_barrier: Arc<Barrier>,
all_work_done_barrier: Arc<Barrier>,
stats: Arc<LiveStats>,
) -> anyhow::Result<()> {
) -> anyhow::Result<Vec<PerTaskStats>> {
let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?;
let lsn = partitioning.at_lsn;
@@ -181,14 +290,16 @@ async fn timeline(
let ranges = Arc::new(ranges);
let weights = Arc::new(weights);
let mut tasks = Vec::<JoinHandle<()>>::new();
let mut tasks = Vec::new();
for _i in 0..args.num_tasks {
let ranges = ranges.clone();
let _weights = weights.clone();
let start_work_barrier = Arc::clone(&start_work_barrier);
let task = tokio::spawn({
let stats = Arc::clone(&stats);
let all_work_done_barrier = Arc::clone(&all_work_done_barrier);
let jh = tokio::spawn({
let live_stats = Arc::clone(&stats);
async move {
let mut getpage_client = pageserver::client::page_service::Client::new(
args.page_service_connstring.clone(),
@@ -197,6 +308,9 @@ async fn timeline(
)
.await
.unwrap();
let mut stats = PerTaskStats::new();
start_work_barrier.wait().await;
for _i in 0..args.num_requests {
let key = {
@@ -208,6 +322,7 @@ async fn timeline(
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
RelTagBlockNo { rel_tag, block_no }
};
let start = Instant::now();
getpage_client
.getpage(key, lsn)
.await
@@ -215,17 +330,23 @@ async fn timeline(
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
})
.unwrap();
stats.inc();
let elapsed = start.elapsed();
live_stats.inc();
stats.observe(elapsed).unwrap();
}
all_work_done_barrier.wait().await;
getpage_client.shutdown().await;
stats
}
});
tasks.push(task);
tasks.push(jh);
}
let mut ret = Vec::with_capacity(tasks.len());
for task in tasks {
task.await.unwrap();
ret.push(task.await.unwrap());
}
Ok(())
Ok(ret)
}

View File

@@ -1,4 +1,3 @@
import json
from pathlib import Path
import shutil
@@ -11,7 +10,10 @@ from fixtures.types import TenantId
from fixtures.log_helper import log
from fixtures.benchmark_fixture import NeonBenchmarker
def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, pg_bin: PgBin):
def test_getpage_throughput(
neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, pg_bin: PgBin
):
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
@@ -26,15 +28,15 @@ def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: Neon
# create our template tenant
tenant_config_mgmt_api = {
"gc_period" : '0s',
"checkpoint_timeout" : '3650 day',
"compaction_period" : '20 s',
"compaction_threshold" : 10,
"compaction_target_size" : 134217728,
"checkpoint_distance" : 268435456,
"image_creation_threshold" : 3,
"gc_period": "0s",
"checkpoint_timeout": "3650 day",
"compaction_period": "20 s",
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
tenant_config_cli = { k: str(v) for k, v in tenant_config_mgmt_api.items() }
tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}
template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli)
template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
@@ -46,18 +48,15 @@ def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: Neon
# stop PS just for good measure
env.pageserver.stop()
# duplicate the tenant in remote stora
# duplicate the tenant in remote storage
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
tenants = [template_tenant]
for i in range(0, 10):
new_tenant = TenantId.generate()
tenants.append(new_tenant)
log.info("Duplicating tenant #%s: %s", i, new_tenant)
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
@@ -86,7 +85,9 @@ def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: Neon
env.pageserver.start()
assert ps_http.tenant_list() == []
for tenant in tenants:
ps_http.tenant_attach(tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen+1)
ps_http.tenant_attach(
tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen + 1
)
for tenant in tenants:
wait_until_tenant_active(ps_http, tenant)
@@ -95,20 +96,23 @@ def test_getpage_throughput(neon_env_builder: NeonEnvBuilder, zenbenchmark: Neon
for tenant in tenants:
ps_http.download_all_layers(tenant, template_timeline)
# run the benchmark
# run the benchmark with one client per timeline, each doing 10k requests to random keys.
cmd = [
str(env.neon_binpath / "pagebench"),
"--mgmt-api-endpoint", ps_http.base_url,
"--page-service-connstring", env.pageserver.connstr(password=None),
"--num-tasks", "1",
"--num-requests", "10000",
"get-page-latest-lsn",
"--mgmt-api-endpoint",
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--num-tasks",
"1",
"--num-requests",
"10000",
*[str(tenant) for tenant in tenants],
]
basepath = pg_bin.run_capture(cmd)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, 'r') as f:
with open(results_path, "r") as f:
results = json.load(f)