Compare commits

...

16 Commits

Author SHA1 Message Date
Alexander Bayandin
d1bcecebde test_pageserver: make compatible with the latest code 2023-12-20 19:53:34 +00:00
Alexander Bayandin
e49f2d72af test_pageserver: add snapshotting_env fixture 2023-12-20 17:45:54 +00:00
Alexander Bayandin
c3ab86dc6a Add NeonBenchmarker#record_pagebench_results method 2023-12-20 17:45:45 +00:00
Christian Schwarz
163083665e python script to duplicate tenants 2023-12-20 16:31:22 +00:00
Christian Schwarz
8dcf78d1af WIP: performance test that uses the getpage benchmark 2023-12-20 16:31:22 +00:00
Christian Schwarz
5f7e821a62 make CI happy 2023-12-20 15:53:21 +00:00
Christian Schwarz
c417a23dd0 pagebench: factor out the concept of thread local stats 2023-12-18 18:32:22 +00:00
Christian Schwarz
20e5e9dd16 pagebench: finish trigger initial logical size calculation benchmark 2023-12-18 18:32:22 +00:00
Christian Schwarz
24c72db5ff pagebench: centralize target discovery 2023-12-18 18:32:22 +00:00
Christian Schwarz
6aee8511f7 pagebench: getpage: WIP: when auto-discovering timelines, add ability to limit 2023-12-18 18:32:22 +00:00
Christian Schwarz
ad2091bdd0 pagebench: WIP: command to trigger initial logical size calculation 2023-12-18 18:32:21 +00:00
Christian Schwarz
573d4752e6 pagebench: add a 'getpage@lsn' benchmark 2023-12-18 18:32:21 +00:00
Christian Schwarz
136bec6014 pagebench: add a 'basebackup' benchmark 2023-12-18 18:32:21 +00:00
Christian Schwarz
0f8b4faa50 pagebench: scaffold 2023-12-18 18:32:21 +00:00
Christian Schwarz
5b42949531 Merge branch 'main' into problame/benchmarking/pr/timeline-ids-in-tenant-details 2023-12-18 19:22:19 +01:00
Christian Schwarz
4a6dfb0ccb include timeline ids in tenant details response 2023-12-18 15:12:48 +00:00
32 changed files with 1535 additions and 20 deletions

36
Cargo.lock generated
View File

@@ -2106,6 +2106,20 @@ dependencies = [
"hashbrown 0.13.2",
]
[[package]]
name = "hdrhistogram"
version = "7.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
dependencies = [
"base64 0.21.1",
"byteorder",
"crossbeam-channel",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "heapless"
version = "0.8.0"
@@ -3056,6 +3070,28 @@ dependencies = [
"sha2",
]
[[package]]
name = "pagebench"
version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"futures",
"hdrhistogram",
"humantime",
"humantime-serde",
"pageserver",
"pageserver_api",
"pageserver_client",
"rand 0.8.5",
"serde",
"serde_json",
"tokio",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "pagectl"
version = "0.1.0"

View File

@@ -6,6 +6,7 @@ members = [
"pageserver",
"pageserver/ctl",
"pageserver/client",
"pageserver/pagebench",
"proxy",
"safekeeper",
"storage_broker",
@@ -79,6 +80,7 @@ futures-util = "0.3"
git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hdrhistogram = "7.5.2"
hex = "0.4"
hex-literal = "0.4"
hmac = "0.12.1"

View File

@@ -368,6 +368,16 @@ pub struct TenantInfo {
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub attachment_status: TenantAttachmentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantDetails {
#[serde(flatten)]
pub tenant_info: TenantInfo,
pub timelines: Vec<TimelineId>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
@@ -914,6 +924,7 @@ mod tests {
state: TenantState::Active,
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_active = json!({
"id": original_active.id.to_string(),
@@ -934,6 +945,7 @@ mod tests {
},
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_broken = json!({
"id": original_broken.id.to_string(),

View File

@@ -81,6 +81,10 @@ impl TenantShardId {
pub fn is_zero(&self) -> bool {
self.shard_number == ShardNumber(0)
}
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
}
/// Formatting helper

View File

@@ -366,6 +366,47 @@ impl MonotonicCounter<Lsn> for RecordLsn {
}
}
/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
impl rand::distributions::uniform::SampleUniform for Lsn {
type Sampler = LsnSampler;
}
impl rand::distributions::uniform::UniformSampler for LsnSampler {
type X = Lsn;
fn new<B1, B2>(low: B1, high: B2) -> Self
where
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
{
Self(
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
low.borrow().0,
high.borrow().0,
),
)
}
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
where
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
{
Self(
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
low.borrow().0,
high.borrow().0,
),
)
}
fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
Lsn(self.0.sample(rng))
}
}
#[cfg(test)]
mod tests {
use crate::bin_ser::BeSer;

View File

@@ -64,6 +64,18 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn tenant_details(
&self,
tenant_id: TenantId,
) -> Result<pageserver_api::models::TenantDetails> {
let uri = format!("{}/v1/tenant/{tenant_id}", self.mgmt_api_endpoint);
self.get(uri)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn list_timelines(
&self,
tenant_id: TenantId,

View File

@@ -0,0 +1,26 @@
[package]
name = "pagebench"
version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow.workspace = true
clap.workspace = true
futures.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true
pageserver = { path = ".." }
pageserver_client.workspace = true
pageserver_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -0,0 +1,273 @@
use anyhow::Context;
use pageserver_client::page_service::BasebackupRequest;
use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{debug, info, instrument};
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use crate::cli;
use crate::util::tenant_timeline_id::TenantTimelineId;
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
/// basebackup@LatestLSN
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "localhost:64000")]
page_service_host_port: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
#[clap(long, default_value = "1.0")]
gzip_probability: f64,
#[clap(long)]
runtime: Option<humantime::Duration>,
#[clap(long)]
limit_to_first_n_targets: Option<usize>,
targets: Option<Vec<TenantTimelineId>>,
}
#[derive(Debug, Default)]
struct LiveStats {
completed_requests: AtomicU64,
}
impl LiveStats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
struct Target {
timeline: TenantTimelineId,
lsn_range: Option<Range<Lsn>>,
}
#[derive(serde::Serialize)]
struct Output {
total: request_stats::Output,
}
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
main_impl(args, thread_local_stats)
})
}
async fn main_impl(
args: Args,
all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
// discover targets
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
&mgmt_api_client,
cli::targets::Spec {
limit_to_first_n_targets: args.limit_to_first_n_targets,
targets: args.targets.clone(),
},
)
.await?;
let mut js = JoinSet::new();
for timeline in &timelines {
js.spawn({
let timeline = *timeline;
// FIXME: this triggers initial logical size calculation
// https://github.com/neondatabase/neon/issues/6168
let info = mgmt_api_client
.timeline_info(timeline.tenant_id, timeline.timeline_id)
.await
.unwrap();
async move {
anyhow::Ok(Target {
timeline,
// TODO: support lsn_range != latest LSN
lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
})
}
});
}
let mut all_targets: Vec<Target> = Vec::new();
while let Some(res) = js.join_next().await {
all_targets.push(res.unwrap().unwrap());
}
let live_stats = Arc::new(LiveStats::default());
let num_client_tasks = timelines.len();
let num_live_stats_dump = 1;
let num_work_sender_tasks = 1;
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
tokio::spawn({
let stats = Arc::clone(&live_stats);
let start_work_barrier = Arc::clone(&start_work_barrier);
async move {
start_work_barrier.wait().await;
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
info!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
Arc::clone(&live_stats),
)));
}
let work_sender = async move {
start_work_barrier.wait().await;
loop {
let (timeline, work) = {
let mut rng = rand::thread_rng();
let target = all_targets.choose(&mut rng).unwrap();
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
(
target.timeline,
Work {
lsn,
gzip: rng.gen_bool(args.gzip_probability),
},
)
};
let sender = work_senders.get(&timeline).unwrap();
// TODO: what if this blocks?
sender.send(work).await.ok().unwrap();
}
};
if let Some(runtime) = args.runtime {
match tokio::time::timeout(runtime.into(), work_sender).await {
Ok(()) => unreachable!("work sender never terminates"),
Err(_timeout) => {
// this implicitly drops the work_senders, making all the clients exit
}
}
} else {
work_sender.await;
unreachable!("work sender never terminates");
}
for t in tasks {
t.await.unwrap();
}
let output = Output {
total: {
let mut agg_stats = request_stats::Stats::new();
for stats in all_thread_local_stats.lock().unwrap().iter() {
let stats = stats.lock().unwrap();
agg_stats.add(&stats);
}
agg_stats.output()
},
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");
anyhow::Ok(())
}
#[derive(Copy, Clone)]
struct Work {
lsn: Option<Lsn>,
gzip: bool,
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
timeline: TenantTimelineId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<Work>,
all_work_done_barrier: Arc<Barrier>,
live_stats: Arc<LiveStats>,
) {
start_work_barrier.wait().await;
let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
))
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
let start = Instant::now();
let copy_out_stream = client
.basebackup(&BasebackupRequest {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
lsn,
gzip,
})
.await
.with_context(|| format!("start basebackup for {timeline}"))
.unwrap();
use futures::StreamExt;
let size = Arc::new(AtomicUsize::new(0));
copy_out_stream
.for_each({
|r| {
let size = Arc::clone(&size);
async move {
let size = Arc::clone(&size);
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
}
}
})
.await;
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
});
}
all_work_done_barrier.wait().await;
}

View File

@@ -0,0 +1 @@
pub(crate) mod targets;

View File

@@ -0,0 +1,37 @@
use std::sync::Arc;
use pageserver_client::mgmt_api;
use tracing::info;
use crate::util::{
discover_timelines::get_pageserver_tenant_timelines, tenant_timeline_id::TenantTimelineId,
};
pub(crate) struct Spec {
pub(crate) limit_to_first_n_targets: Option<usize>,
pub(crate) targets: Option<Vec<TenantTimelineId>>,
}
pub(crate) async fn discover(
api_client: &Arc<mgmt_api::Client>,
spec: Spec,
) -> anyhow::Result<Vec<TenantTimelineId>> {
let mut timelines = if let Some(targets) = spec.targets {
targets
} else {
get_pageserver_tenant_timelines(api_client).await?
};
if let Some(limit) = spec.limit_to_first_n_targets {
timelines.sort(); // for determinism
timelines.truncate(limit);
if timelines.len() < limit {
anyhow::bail!("pageserver has less than limit_to_first_n_targets={limit} tenants");
}
}
info!("timelines:\n{:?}", timelines);
info!("number of timelines:\n{:?}", timelines.len());
Ok(timelines)
}

View File

@@ -0,0 +1,337 @@
use anyhow::Context;
use futures::future::join_all;
use pageserver::pgdatadir_mapping::key_to_rel_block;
use pageserver::repository;
use pageserver_api::key::is_rel_block_key;
use pageserver_client::page_service::RelTagBlockNo;
use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{info, instrument};
use std::collections::HashMap;
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::cli;
use crate::util::tenant_timeline_id::TenantTimelineId;
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
page_service_connstring: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
#[clap(long)]
runtime: Option<humantime::Duration>,
#[clap(long)]
per_target_rate_limit: Option<usize>,
#[clap(long)]
limit_to_first_n_targets: Option<usize>,
targets: Option<Vec<TenantTimelineId>>,
}
#[derive(Debug, Default)]
struct LiveStats {
completed_requests: AtomicU64,
}
impl LiveStats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[derive(Clone)]
struct KeyRange {
timeline: TenantTimelineId,
timeline_lsn: Lsn,
start: i128,
end: i128,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end - self.start
}
}
#[derive(serde::Serialize)]
struct Output {
total: request_stats::Output,
}
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
main_impl(args, thread_local_stats)
})
}
async fn main_impl(
args: Args,
all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
// discover targets
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
&mgmt_api_client,
cli::targets::Spec {
limit_to_first_n_targets: args.limit_to_first_n_targets,
targets: args.targets.clone(),
},
)
.await?;
let mut js = JoinSet::new();
for timeline in &timelines {
js.spawn({
let mgmt_api_client = Arc::clone(&mgmt_api_client);
let timeline = *timeline;
async move {
let partitioning = mgmt_api_client
.keyspace(timeline.tenant_id, timeline.timeline_id)
.await?;
let lsn = partitioning.at_lsn;
let ranges = partitioning
.keys
.ranges
.iter()
.filter_map(|r| {
let start = r.start;
let end = r.end;
// filter out non-relblock keys
match (is_rel_block_key(&start), is_rel_block_key(&end)) {
(true, true) => Some(KeyRange {
timeline,
timeline_lsn: lsn,
start: start.to_i128(),
end: end.to_i128(),
}),
(true, false) | (false, true) => {
unimplemented!("split up range")
}
(false, false) => None,
}
})
.collect::<Vec<_>>();
anyhow::Ok(ranges)
}
});
}
let mut all_ranges: Vec<KeyRange> = Vec::new();
while let Some(res) = js.join_next().await {
all_ranges.extend(res.unwrap().unwrap());
}
let live_stats = Arc::new(LiveStats::default());
let num_client_tasks = timelines.len();
let num_live_stats_dump = 1;
let num_work_sender_tasks = 1;
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
tokio::spawn({
let stats = Arc::clone(&live_stats);
let start_work_barrier = Arc::clone(&start_work_barrier);
async move {
start_work_barrier.wait().await;
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
info!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
Arc::clone(&live_stats),
)));
}
let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = match args.per_target_rate_limit {
None => Box::pin(async move {
let weights = rand::distributions::weighted::WeightedIndex::new(
all_ranges.iter().map(|v| v.len()),
)
.unwrap();
start_work_barrier.wait().await;
loop {
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &all_ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
let sender = work_senders.get(&range.timeline).unwrap();
// TODO: what if this blocks?
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
}),
Some(rps_limit) => Box::pin(async move {
let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
let make_timeline_task: &dyn Fn(
TenantTimelineId,
)
-> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
let sender = work_senders.get(&timeline).unwrap();
let ranges: Vec<KeyRange> = all_ranges
.iter()
.filter(|r| r.timeline == timeline)
.cloned()
.collect();
let weights = rand::distributions::weighted::WeightedIndex::new(
ranges.iter().map(|v| v.len()),
)
.unwrap();
Box::pin(async move {
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(
/* TODO review this choice */
tokio::time::MissedTickBehavior::Burst,
);
loop {
ticker.tick().await;
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) = key_to_rel_block(key)
.expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
})
};
let tasks: Vec<_> = work_senders
.keys()
.map(|tl| make_timeline_task(**tl))
.collect();
start_work_barrier.wait().await;
join_all(tasks).await;
}),
};
if let Some(runtime) = args.runtime {
match tokio::time::timeout(runtime.into(), work_sender).await {
Ok(()) => unreachable!("work sender never terminates"),
Err(_timeout) => {
// this implicitly drops the work_senders, making all the clients exit
}
}
} else {
work_sender.await;
unreachable!("work sender never terminates");
}
for t in tasks {
t.await.unwrap();
}
let output = Output {
total: {
let mut agg_stats = request_stats::Stats::new();
for stats in all_thread_local_stats.lock().unwrap().iter() {
let stats = stats.lock().unwrap();
agg_stats.add(&stats);
}
agg_stats.output()
},
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");
anyhow::Ok(())
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
timeline: TenantTimelineId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>,
all_work_done_barrier: Arc<Barrier>,
live_stats: Arc<LiveStats>,
) {
start_work_barrier.wait().await;
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(timeline.tenant_id, timeline.timeline_id)
.await
.unwrap();
while let Some((key, lsn)) = work.recv().await {
let start = Instant::now();
client
.getpage(key, lsn)
.await
.with_context(|| format!("getpage for {timeline}"))
.unwrap();
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
});
}
all_work_done_barrier.wait().await;
}

View File

@@ -0,0 +1,34 @@
use clap::Parser;
use utils::logging;
pub(crate) mod cli;
pub(crate) mod util;
mod basebackup;
mod getpage_latest_lsn;
mod trigger_initial_size_calculation;
/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
enum Args {
Basebackup(basebackup::Args),
GetPageLatestLsn(getpage_latest_lsn::Args),
TriggerInitialSizeCalculation(trigger_initial_size_calculation::Args),
}
fn main() {
logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,
)
.unwrap();
let args = Args::parse();
match args {
Args::Basebackup(args) => basebackup::main(args),
Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
Args::TriggerInitialSizeCalculation(args) => trigger_initial_size_calculation::main(args),
}
.unwrap()
}

View File

@@ -0,0 +1,86 @@
use std::sync::Arc;
use humantime::Duration;
use tokio::task::JoinSet;
use crate::{cli, util::tenant_timeline_id::TenantTimelineId};
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "localhost:64000")]
page_service_host_port: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(
long,
help = "if specified, poll mgmt api to check whether init logical size calculation has completed"
)]
poll_for_completion: Option<Duration>,
#[clap(long)]
limit_to_first_n_targets: Option<usize>,
targets: Option<Vec<TenantTimelineId>>,
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let main_task = rt.spawn(main_impl(args));
rt.block_on(main_task).unwrap()
}
async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
// discover targets
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
&mgmt_api_client,
cli::targets::Spec {
limit_to_first_n_targets: args.limit_to_first_n_targets,
targets: args.targets.clone(),
},
)
.await?;
// kick it off
let mut js = JoinSet::new();
for tl in timelines {
let mgmt_api_client = Arc::clone(&mgmt_api_client);
js.spawn(async move {
// TODO: API to explicitly trigger initial logical size computation.
// Should probably also avoid making it a side effect of timeline details to trigger initial logical size calculation.
// => https://github.com/neondatabase/neon/issues/6168
let info = mgmt_api_client
.timeline_info(tl.tenant_id, tl.timeline_id)
.await
.unwrap();
if let Some(period) = args.poll_for_completion {
let mut ticker = tokio::time::interval(period.into());
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut info = info;
while !info.current_logical_size_is_accurate {
ticker.tick().await;
info = mgmt_api_client
.timeline_info(tl.tenant_id, tl.timeline_id)
.await
.unwrap();
}
}
});
}
while let Some(res) = js.join_next().await {
let _: () = res.unwrap();
}
Ok(())
}

View File

@@ -0,0 +1,6 @@
pub(crate) mod connstring;
pub(crate) mod discover_timelines;
pub(crate) mod request_stats;
pub(crate) mod tenant_timeline_id;
#[macro_use]
pub(crate) mod tokio_thread_local_stats;

View File

@@ -0,0 +1,8 @@
pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String {
let colon_and_jwt = if let Some(jwt) = jwt {
format!(":{jwt}") // TODO: urlescape
} else {
String::new()
};
format!("postgres://postgres{colon_and_jwt}@{host_port}")
}

View File

@@ -0,0 +1,45 @@
use std::sync::Arc;
use pageserver_client::mgmt_api;
use tokio::task::JoinSet;
use utils::id::TenantId;
use super::tenant_timeline_id::TenantTimelineId;
pub(crate) async fn get_pageserver_tenant_timelines(
api_client: &Arc<mgmt_api::Client>,
) -> anyhow::Result<Vec<TenantTimelineId>> {
let mut timelines: Vec<TenantTimelineId> = Vec::new();
let mut tenants: Vec<TenantId> = Vec::new();
for ti in api_client.list_tenants().await? {
if !ti.id.is_unsharded() {
anyhow::bail!(
"only unsharded tenants are supported at this time: {}",
ti.id
);
}
tenants.push(ti.id.tenant_id)
}
let mut js = JoinSet::new();
for tenant_id in tenants {
js.spawn({
let mgmt_api_client = Arc::clone(api_client);
async move {
(
tenant_id,
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
)
}
});
}
while let Some(res) = js.join_next().await {
let (tenant_id, details) = res.unwrap();
for timeline_id in details.timelines {
timelines.push(TenantTimelineId {
tenant_id,
timeline_id,
});
}
}
Ok(timelines)
}

View File

@@ -0,0 +1,88 @@
use std::time::Duration;
use anyhow::Context;
pub(crate) struct Stats {
latency_histo: hdrhistogram::Histogram<u64>,
}
impl Stats {
pub(crate) fn new() -> Self {
Self {
// Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
// which would skew the benchmark results.
latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
}
}
pub(crate) fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
let micros: u64 = latency
.as_micros()
.try_into()
.context("latency greater than u64")?;
self.latency_histo
.record(micros)
.context("add to histogram")?;
Ok(())
}
pub(crate) fn output(&self) -> Output {
let latency_percentiles = std::array::from_fn(|idx| {
let micros = self
.latency_histo
.value_at_percentile(LATENCY_PERCENTILES[idx]);
Duration::from_micros(micros)
});
Output {
request_count: self.latency_histo.len(),
latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
latency_percentiles: LatencyPercentiles {
latency_percentiles,
},
}
}
pub(crate) fn add(&mut self, other: &Self) {
let Self {
ref mut latency_histo,
} = self;
latency_histo.add(&other.latency_histo).unwrap();
}
}
impl Default for Stats {
fn default() -> Self {
Self::new()
}
}
const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
struct LatencyPercentiles {
latency_percentiles: [Duration; 4],
}
impl serde::Serialize for LatencyPercentiles {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
for p in LATENCY_PERCENTILES {
ser.serialize_entry(
&format!("p{p}"),
&format!(
"{}",
&humantime::format_duration(self.latency_percentiles[0])
),
)?;
}
ser.end()
}
}
#[derive(serde::Serialize)]
pub(crate) struct Output {
request_count: u64,
#[serde(with = "humantime_serde")]
latency_mean: Duration,
latency_percentiles: LatencyPercentiles,
}

View File

@@ -0,0 +1,34 @@
use std::str::FromStr;
use anyhow::Context;
use utils::id::{TenantId, TimelineId};
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
pub(crate) struct TenantTimelineId {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
}
impl FromStr for TenantTimelineId {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (tenant_id, timeline_id) = s
.split_once('/')
.context("tenant and timeline id must be separated by `/`")?;
let tenant_id = TenantId::from_str(tenant_id)
.with_context(|| format!("invalid tenant id: {tenant_id:?}"))?;
let timeline_id = TimelineId::from_str(timeline_id)
.with_context(|| format!("invalid timeline id: {timeline_id:?}"))?;
Ok(Self {
tenant_id,
timeline_id,
})
}
}
impl std::fmt::Display for TenantTimelineId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
}
}

View File

@@ -0,0 +1,45 @@
pub(crate) type ThreadLocalStats<T> = Arc<Mutex<T>>;
pub(crate) type AllThreadLocalStats<T> = Arc<Mutex<Vec<ThreadLocalStats<T>>>>;
macro_rules! declare {
($THREAD_LOCAL_NAME:ident: $T:ty) => {
thread_local! {
pub static $THREAD_LOCAL_NAME: std::cell::RefCell<crate::util::tokio_thread_local_stats::ThreadLocalStats<$T>> = std::cell::RefCell::new(
std::sync::Arc::new(std::sync::Mutex::new(Default::default()))
);
}
};
}
use std::sync::{Arc, Mutex};
pub(crate) use declare;
macro_rules! main {
($THREAD_LOCAL_NAME:ident, $main_impl:expr) => {{
let main_impl = $main_impl;
let all = Arc::new(Mutex::new(Vec::new()));
let rt = tokio::runtime::Builder::new_multi_thread()
.on_thread_start({
let all = Arc::clone(&all);
move || {
// pre-initialize the thread local stats by accessesing them
// (some stats like requests_stats::Stats are quite costly to initialize,
// we don't want to pay that cost during the measurement period)
$THREAD_LOCAL_NAME.with(|stats| {
let stats: Arc<_> = Arc::clone(&*stats.borrow());
all.lock().unwrap().push(stats);
});
}
})
.enable_all()
.build()
.unwrap();
let main_task = rt.spawn(main_impl(all));
rt.block_on(main_task).unwrap()
}};
}
pub(crate) use main;

View File

@@ -267,7 +267,7 @@ async fn calculate_synthetic_size_worker(
}
};
for (tenant_shard_id, tenant_state) in tenants {
for (tenant_shard_id, tenant_state, _gen) in tenants {
if tenant_state != TenantState::Active {
continue;
}

View File

@@ -196,7 +196,7 @@ pub(super) async fn collect_all_metrics(
}
};
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
if state != TenantState::Active || !id.is_zero() {
None
} else {

View File

@@ -515,7 +515,7 @@ async fn collect_eviction_candidates(
let mut candidates = Vec::new();
for (tenant_id, _state) in &tenants {
for (tenant_id, _state, _gen) in &tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}

View File

@@ -14,6 +14,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
@@ -844,11 +845,12 @@ async fn tenant_list_handler(
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
})?
.iter()
.map(|(id, state)| TenantInfo {
.map(|(id, state, gen)| TenantInfo {
id: *id,
state: state.clone(),
current_physical_size: None,
attachment_status: state.attachment_status(),
generation: (*gen).into(),
})
.collect::<Vec<TenantInfo>>();
@@ -872,11 +874,15 @@ async fn tenant_status(
}
let state = tenant.current_state();
Result::<_, ApiError>::Ok(TenantInfo {
id: tenant_shard_id,
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
Result::<_, ApiError>::Ok(TenantDetails {
tenant_info: TenantInfo {
id: tenant_shard_id,
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
},
timelines: tenant.list_timeline_ids(),
})
}
.instrument(info_span!("tenant_status_handler",

View File

@@ -1776,6 +1776,7 @@ pub fn is_inherited_key(key: Key) -> bool {
key != AUX_FILES_KEY
}
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
Ok(match key.field1 {
0x00 => (
@@ -1790,7 +1791,6 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
_ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
})
}
pub fn is_rel_fsm_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
}

View File

@@ -1552,6 +1552,10 @@ impl Tenant {
.collect()
}
pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
self.timelines.lock().unwrap().keys().cloned().collect()
}
/// This is used to create the initial 'main' timeline during bootstrapping,
/// or when importing a new base backup. The caller is expected to load an
/// initial image of the datadir to the new timeline after this.
@@ -1911,6 +1915,10 @@ impl Tenant {
self.current_state() == TenantState::Active
}
pub fn generation(&self) -> Generation {
self.generation
}
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup

View File

@@ -1511,8 +1511,8 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>, TenantMapListError>
{
pub(crate) async fn list_tenants(
) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1520,7 +1520,9 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>,
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
TenantSlot::Attached(tenant) => {
Some((*id, tenant.current_state(), tenant.generation()))
}
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})

View File

@@ -0,0 +1,73 @@
# Usage from top of repo:
# poetry run python3 ./scripts/ps_duplicate_tenant.py c66e2e233057f7f05563caff664ecb14 .neon/remote_storage_local_fs
import argparse
import shutil
import subprocess
import time
from pathlib import Path
import sys
sys.path.append("test_runner")
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import TenantId
parser = argparse.ArgumentParser(description="Duplicate tenant script.")
parser.add_argument("initial_tenant", type=str, help="Initial tenant")
parser.add_argument("remote_storage_local_fs_root", type=Path, help="Remote storage local fs root")
parser.add_argument("--ncopies", type=int, help="Number of copies")
parser.add_argument("--numthreads", type=int, default=1, help="Number of threads")
parser.add_argument("--port", type=int, default=9898, help="Pageserver management api port")
args = parser.parse_args()
initial_tenant = args.initial_tenant
remote_storage_local_fs_root: Path = args.remote_storage_local_fs_root
ncopies = args.ncopies
numthreads = args.numthreads
new_tenant = TenantId.generate()
print(f"New tenant: {new_tenant}")
client = PageserverHttpClient(args.port, lambda: None)
src_tenant_gen = int(client.tenant_status(initial_tenant)["generation"])
assert remote_storage_local_fs_root.is_dir(), f"{remote_storage_local_fs_root} is not a directory"
src_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / initial_tenant / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
dst_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / str(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
for tl in src_timelines_dir.iterdir():
src_tl_dir = src_timelines_dir / tl.name
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
dst_tl_dir = dst_timelines_dir / tl.name
dst_tl_dir.mkdir(parents=False, exist_ok=False)
for file in tl.iterdir():
shutil.copy2(file, dst_tl_dir)
if "__" in file.name:
cmd = [
"./target/debug/pagectl", # TODO: abstract this like the other binaries
"layer",
"rewrite-summary",
str(dst_tl_dir / file.name),
"--new-tenant-id",
str(new_tenant),
]
subprocess.run(cmd, check=True)
client.tenant_attach(new_tenant, generation=src_tenant_gen)
while True:
status = client.tenant_status(new_tenant)
if status["state"]["slug"] == "Active":
break
print("Waiting for tenant to be active..., is: " + status["state"]["slug"])
time.sleep(1)
print("Tenant is active: " + str(new_tenant))

41
setup_bench_repo_dir.bash Normal file
View File

@@ -0,0 +1,41 @@
#!/usr/bin/env bash
set -euo pipefail
if [ "$(cat /sys/class/block/nvme1n1/device/model)" != "Amazon EC2 NVMe Instance Storage " ]; then
echo "nvme1n1 is not Amazon EC2 NVMe Instance Storage: '$(cat /sys/class/block/nvme1n1/device/model)'"
exit 1
fi
rmdir bench_repo_dir || true
sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
sudo mount /dev/nvme1n1 /mnt
sudo chown -R "$(id -u)":"$(id -g)" /mnt
mkdir /mnt/bench_repo_dir
mkdir bench_repo_dir
sudo mount --bind /mnt/bench_repo_dir bench_repo_dir
mkdir /mnt/test_output
mkdir /mnt/many_tenants
echo run the following commands
cat <<EOF
# test suite run
export TEST_OUTPUT="/mnt/test_output"
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_pageserver.py
# for interactive use
export NEON_REPO_DIR="$(readlink -f ./bench_repo_dir)/repo"
cargo build_testing --release
./target/release/neon_local init
# ... create tenant, seed it using pgbench
# then duplicate the tenant using
# poetry run python3 ./test_runner/duplicate_tenant.py TENANT_ID 200 8
EOF

View File

@@ -10,7 +10,7 @@ from datetime import datetime
from pathlib import Path
# Type-related stuff
from typing import Callable, ClassVar, Dict, Iterator, Optional
from typing import Any, Callable, ClassVar, Dict, Iterator, Optional
import pytest
from _pytest.config import Config
@@ -20,6 +20,7 @@ from _pytest.terminal import TerminalReporter
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
from fixtures.types import TenantId, TimelineId
from fixtures.utils import humantime_to_ms
"""
This file contains fixtures for micro-benchmarks.
@@ -409,6 +410,34 @@ class NeonBenchmarker:
report=MetricReport.LOWER_IS_BETTER,
)
def record_pagebench_results(self, name: str, results: Dict[str, Any]):
total = results["total"]
metric = "request_count"
self.record(
f"{name}.{metric}",
total[metric],
"",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "latency_mean"
self.record(
f"{name}.{metric}",
humantime_to_ms(total[metric]),
"ms",
report=MetricReport.LOWER_IS_BETTER,
)
metric = "latency_percentiles"
for k, v in total[metric].items():
self.record(
f"{name}.{metric}.{k}",
humantime_to_ms(v),
"ms",
report=MetricReport.LOWER_IS_BETTER,
)
@pytest.fixture(scope="function")
def zenbenchmark(record_property: Callable[[str, object], None]) -> Iterator[NeonBenchmarker]:

View File

@@ -772,13 +772,10 @@ class NeonEnv:
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
self.control_plane_api: Optional[str] = None
self.attachment_service: Optional[NeonAttachmentService] = None
if config.enable_generations:
attachment_service_port = self.port_distributor.get_port()
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
else:
self.control_plane_api = None
self.attachment_service = None
self.enable_generations()
# Create a config file corresponding to the options
cfg: Dict[str, Any] = {
@@ -847,6 +844,18 @@ class NeonEnv:
log.info(f"Config: {cfg}")
self.neon_cli.init(cfg)
def enable_generations(self, start=False):
if not start:
# TODO: assert that we haven't `self.start()`ed yet
pass
assert self.control_plane_api is None
assert self.attachment_service is None
attachment_service_port = self.port_distributor.get_port()
self.control_plane_api = f"http://127.0.0.1:{attachment_service_port}"
self.attachment_service = NeonAttachmentService(self)
if start:
self.attachment_service.start()
def start(self):
# Start up broker, pageserver and all safekeepers
self.broker.try_start()
@@ -1580,6 +1589,16 @@ class Pagectl(AbstractNeonCli):
parsed = json.loads(res.stdout)
return IndexPartDump.from_json(parsed)
# class GetpageBenchLibpq(AbstractNeonCli):
# """
# A typed wrapper around the `getpage_bench_libpq` CLI.
# """
#
# COMMAND = "getpage_bench_libpq"
#
# def run(self):
# pass
class NeonAttachmentService:
def __init__(self, env: NeonEnv):

View File

@@ -397,3 +397,36 @@ def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
}
"""
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr])
def humantime_to_ms(humantime: str) -> float:
"""
Converts Rust humantime's output string to milliseconds.
humantime_to_ms("1h 1ms 406us") -> 3600001.406
"""
unit_multiplier_map = {
"ns": 1e-6,
"us": 1e-3,
"ms": 1,
"s": 1e3,
"m": 1e3 * 60,
"h": 1e3 * 60 * 60,
}
matcher = re.compile(rf"^(\d+)({'|'.join(unit_multiplier_map.keys())})$")
total_ms = 0.0
if humantime == "0":
return total_ms
for item in humantime.split():
if (match := matcher.search(item)) is not None:
n, unit = match.groups()
total_ms += int(n) * unit_multiplier_map[unit]
else:
raise ValueError(
f"can't parse '{item}' (from string '{humantime}'), known units are {', '.join(unit_multiplier_map.keys())}."
)
return round(total_ms, 3)

View File

@@ -0,0 +1,177 @@
import json
import os
import shutil
import subprocess
import time
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, last_flush_lsn_upload
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId, TimelineId
@pytest.fixture(scope="function")
def snapshotting_env(
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_output_dir: Path
) -> Tuple[NeonEnv, TimelineId, List[TenantId]]:
"""
The fixture prepares environment or restores it from a snapshot.
The logic is the following:
- if the snapshot directory exists, the snapshot is restored from it
- if there is no snapshot, the environment is initialized from scratch and stored in a snapshot
- if the fixture is executed on CI (it has CI=true in the environment), the snapshot is not saved
"""
snapshot_dir = test_output_dir.parent / f"snapshot-{test_output_dir.name}"
save_snapshot = os.getenv("CI", "false") != "true"
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# create our template tenant
tenant_config_mgmt_api = {
"gc_period": "0s",
"checkpoint_timeout": "10 years",
"compaction_period": "20 s",
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
if snapshot_dir.exists():
env = neon_env_builder.from_repo_dir(snapshot_dir)
ps_http = env.pageserver.http_client()
tenants = list({TenantId(t.name) for t in (snapshot_dir.glob("pageserver_*/tenants/*"))})
template_timeline = env.initial_timeline
env.broker.try_start()
assert env.attachment_service is not None
env.attachment_service.start()
# Wait for the attachment service to start
time.sleep(5)
for tenant in tenants:
env.attachment_service.attach_hook_issue(tenant, 1)
env.pageserver.start()
else:
env = neon_env_builder.init_start()
remote_storage = env.pageserver_remote_storage
assert isinstance(remote_storage, LocalFsStorage)
ps_http = env.pageserver.http_client()
# clean up the useless default tenant
ps_http.tenant_delete(env.initial_tenant)
tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}
template_tenant, template_timeline = env.neon_cli.create_tenant(
conf=tenant_config_cli, set_default=True
)
template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", "-s50", ep.connstr()])
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
ps_http.tenant_detach(template_tenant)
# stop PS just for good measure
env.pageserver.stop()
# duplicate the tenant in remote storage
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
tenants = [template_tenant]
for i in range(0, 200):
new_tenant = TenantId.generate()
tenants.append(new_tenant)
log.info("Duplicating tenant #%s: %s", i, new_tenant)
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
for tl in src_timelines_dir.iterdir():
src_tl_dir = src_timelines_dir / tl.name
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
dst_tl_dir = dst_timelines_dir / tl.name
dst_tl_dir.mkdir(parents=False, exist_ok=False)
for file in tl.iterdir():
shutil.copy2(file, dst_tl_dir)
if "__" in file.name:
cmd: List[str] = [
str(
env.neon_binpath / "pagectl"
), # TODO: abstract this like the other binaries
"layer",
"rewrite-summary",
str(dst_tl_dir / file.name),
"--new-tenant-id",
str(new_tenant),
]
subprocess.run(cmd, check=True)
else:
# index_part etc need no patching
pass
env.pageserver.start()
assert ps_http.tenant_list() == []
for tenant in tenants:
ps_http.tenant_attach(
tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen + 1
)
if save_snapshot and not snapshot_dir.exists():
shutil.copytree(env.repo_dir, snapshot_dir)
for tenant in tenants:
wait_until_tenant_active(ps_http, tenant)
# ensure all layers are resident for predictiable performance
# TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
for tenant in tenants:
ps_http.download_all_layers(tenant, template_timeline)
return env, template_timeline, tenants
def test_getpage_throughput(
snapshotting_env: Tuple[NeonEnv, TimelineId, List[TenantId]],
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
):
env, template_timeline, tenants = snapshotting_env
ps_http = env.pageserver.http_client()
# run the benchmark with one client per timeline, each doing 10k requests to random keys.
cmd = [
str(env.neon_binpath / "pagebench"),
"get-page-latest-lsn",
"--mgmt-api-endpoint",
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--runtime",
"10s",
*[f"{tenant}/{template_timeline}" for tenant in tenants],
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
zenbenchmark.record_pagebench_results("get-page-latest-lsn", results)