mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
Compare commits
16 Commits
prewarm_us
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d1bcecebde | ||
|
|
e49f2d72af | ||
|
|
c3ab86dc6a | ||
|
|
163083665e | ||
|
|
8dcf78d1af | ||
|
|
5f7e821a62 | ||
|
|
c417a23dd0 | ||
|
|
20e5e9dd16 | ||
|
|
24c72db5ff | ||
|
|
6aee8511f7 | ||
|
|
ad2091bdd0 | ||
|
|
573d4752e6 | ||
|
|
136bec6014 | ||
|
|
0f8b4faa50 | ||
|
|
5b42949531 | ||
|
|
4a6dfb0ccb |
36
Cargo.lock
generated
36
Cargo.lock
generated
@@ -2106,6 +2106,20 @@ dependencies = [
|
||||
"hashbrown 0.13.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hdrhistogram"
|
||||
version = "7.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
|
||||
dependencies = [
|
||||
"base64 0.21.1",
|
||||
"byteorder",
|
||||
"crossbeam-channel",
|
||||
"flate2",
|
||||
"nom",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heapless"
|
||||
version = "0.8.0"
|
||||
@@ -3056,6 +3070,28 @@ dependencies = [
|
||||
"sha2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pagebench"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"futures",
|
||||
"hdrhistogram",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"pageserver",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pagectl"
|
||||
version = "0.1.0"
|
||||
|
||||
@@ -6,6 +6,7 @@ members = [
|
||||
"pageserver",
|
||||
"pageserver/ctl",
|
||||
"pageserver/client",
|
||||
"pageserver/pagebench",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"storage_broker",
|
||||
@@ -79,6 +80,7 @@ futures-util = "0.3"
|
||||
git-version = "0.3"
|
||||
hashbrown = "0.13"
|
||||
hashlink = "0.8.1"
|
||||
hdrhistogram = "7.5.2"
|
||||
hex = "0.4"
|
||||
hex-literal = "0.4"
|
||||
hmac = "0.12.1"
|
||||
|
||||
@@ -368,6 +368,16 @@ pub struct TenantInfo {
|
||||
/// If a layer is present in both local FS and S3, it counts only once.
|
||||
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
|
||||
pub attachment_status: TenantAttachmentStatus,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generation: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TenantDetails {
|
||||
#[serde(flatten)]
|
||||
pub tenant_info: TenantInfo,
|
||||
|
||||
pub timelines: Vec<TimelineId>,
|
||||
}
|
||||
|
||||
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
|
||||
@@ -914,6 +924,7 @@ mod tests {
|
||||
state: TenantState::Active,
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: None,
|
||||
};
|
||||
let expected_active = json!({
|
||||
"id": original_active.id.to_string(),
|
||||
@@ -934,6 +945,7 @@ mod tests {
|
||||
},
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: None,
|
||||
};
|
||||
let expected_broken = json!({
|
||||
"id": original_broken.id.to_string(),
|
||||
|
||||
@@ -81,6 +81,10 @@ impl TenantShardId {
|
||||
pub fn is_zero(&self) -> bool {
|
||||
self.shard_number == ShardNumber(0)
|
||||
}
|
||||
|
||||
pub fn is_unsharded(&self) -> bool {
|
||||
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Formatting helper
|
||||
|
||||
@@ -366,6 +366,47 @@ impl MonotonicCounter<Lsn> for RecordLsn {
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
|
||||
pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
|
||||
|
||||
impl rand::distributions::uniform::SampleUniform for Lsn {
|
||||
type Sampler = LsnSampler;
|
||||
}
|
||||
|
||||
impl rand::distributions::uniform::UniformSampler for LsnSampler {
|
||||
type X = Lsn;
|
||||
|
||||
fn new<B1, B2>(low: B1, high: B2) -> Self
|
||||
where
|
||||
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
{
|
||||
Self(
|
||||
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
|
||||
low.borrow().0,
|
||||
high.borrow().0,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
|
||||
where
|
||||
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
|
||||
{
|
||||
Self(
|
||||
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
|
||||
low.borrow().0,
|
||||
high.borrow().0,
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
|
||||
Lsn(self.0.sample(rng))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::bin_ser::BeSer;
|
||||
|
||||
@@ -64,6 +64,18 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn tenant_details(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<pageserver_api::models::TenantDetails> {
|
||||
let uri = format!("{}/v1/tenant/{tenant_id}", self.mgmt_api_endpoint);
|
||||
self.get(uri)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn list_timelines(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
26
pageserver/pagebench/Cargo.toml
Normal file
26
pageserver/pagebench/Cargo.toml
Normal file
@@ -0,0 +1,26 @@
|
||||
[package]
|
||||
name = "pagebench"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
pageserver = { path = ".." }
|
||||
pageserver_client.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
utils = { path = "../../libs/utils/" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
273
pageserver/pagebench/src/basebackup.rs
Normal file
273
pageserver/pagebench/src/basebackup.rs
Normal file
@@ -0,0 +1,273 @@
|
||||
use anyhow::Context;
|
||||
use pageserver_client::page_service::BasebackupRequest;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use rand::prelude::*;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{debug, info, instrument};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::cli;
|
||||
use crate::util::tenant_timeline_id::TenantTimelineId;
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
|
||||
/// basebackup@LatestLSN
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "localhost:64000")]
|
||||
page_service_host_port: String,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(long, default_value = "1")]
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long, default_value = "1.0")]
|
||||
gzip_probability: f64,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long)]
|
||||
limit_to_first_n_targets: Option<usize>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
}
|
||||
|
||||
impl LiveStats {
|
||||
fn inc(&self) {
|
||||
self.completed_requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
struct Target {
|
||||
timeline: TenantTimelineId,
|
||||
lsn_range: Option<Range<Lsn>>,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct Output {
|
||||
total: request_stats::Output,
|
||||
}
|
||||
|
||||
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
|
||||
|
||||
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
|
||||
tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
|
||||
main_impl(args, thread_local_stats)
|
||||
})
|
||||
}
|
||||
|
||||
async fn main_impl(
|
||||
args: Args,
|
||||
all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
|
||||
) -> anyhow::Result<()> {
|
||||
let args: &'static Args = Box::leak(Box::new(args));
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
cli::targets::Spec {
|
||||
limit_to_first_n_targets: args.limit_to_first_n_targets,
|
||||
targets: args.targets.clone(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let mut js = JoinSet::new();
|
||||
for timeline in &timelines {
|
||||
js.spawn({
|
||||
let timeline = *timeline;
|
||||
// FIXME: this triggers initial logical size calculation
|
||||
// https://github.com/neondatabase/neon/issues/6168
|
||||
let info = mgmt_api_client
|
||||
.timeline_info(timeline.tenant_id, timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
async move {
|
||||
anyhow::Ok(Target {
|
||||
timeline,
|
||||
// TODO: support lsn_range != latest LSN
|
||||
lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
|
||||
})
|
||||
}
|
||||
});
|
||||
}
|
||||
let mut all_targets: Vec<Target> = Vec::new();
|
||||
while let Some(res) = js.join_next().await {
|
||||
all_targets.push(res.unwrap().unwrap());
|
||||
}
|
||||
|
||||
let live_stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_client_tasks = timelines.len();
|
||||
let num_live_stats_dump = 1;
|
||||
let num_work_sender_tasks = 1;
|
||||
|
||||
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
|
||||
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
|
||||
));
|
||||
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
|
||||
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&live_stats);
|
||||
let start_work_barrier = Arc::clone(&start_work_barrier);
|
||||
async move {
|
||||
start_work_barrier.wait().await;
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
info!(
|
||||
"RPS: {:.0}",
|
||||
completed_requests as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut work_senders = HashMap::new();
|
||||
let mut tasks = Vec::new();
|
||||
for tl in &timelines {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
|
||||
work_senders.insert(tl, sender);
|
||||
tasks.push(tokio::spawn(client(
|
||||
args,
|
||||
*tl,
|
||||
Arc::clone(&start_work_barrier),
|
||||
receiver,
|
||||
Arc::clone(&all_work_done_barrier),
|
||||
Arc::clone(&live_stats),
|
||||
)));
|
||||
}
|
||||
|
||||
let work_sender = async move {
|
||||
start_work_barrier.wait().await;
|
||||
loop {
|
||||
let (timeline, work) = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let target = all_targets.choose(&mut rng).unwrap();
|
||||
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
|
||||
(
|
||||
target.timeline,
|
||||
Work {
|
||||
lsn,
|
||||
gzip: rng.gen_bool(args.gzip_probability),
|
||||
},
|
||||
)
|
||||
};
|
||||
let sender = work_senders.get(&timeline).unwrap();
|
||||
// TODO: what if this blocks?
|
||||
sender.send(work).await.ok().unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(runtime) = args.runtime {
|
||||
match tokio::time::timeout(runtime.into(), work_sender).await {
|
||||
Ok(()) => unreachable!("work sender never terminates"),
|
||||
Err(_timeout) => {
|
||||
// this implicitly drops the work_senders, making all the clients exit
|
||||
}
|
||||
}
|
||||
} else {
|
||||
work_sender.await;
|
||||
unreachable!("work sender never terminates");
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await.unwrap();
|
||||
}
|
||||
|
||||
let output = Output {
|
||||
total: {
|
||||
let mut agg_stats = request_stats::Stats::new();
|
||||
for stats in all_thread_local_stats.lock().unwrap().iter() {
|
||||
let stats = stats.lock().unwrap();
|
||||
agg_stats.add(&stats);
|
||||
}
|
||||
agg_stats.output()
|
||||
},
|
||||
};
|
||||
|
||||
let output = serde_json::to_string_pretty(&output).unwrap();
|
||||
println!("{output}");
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
struct Work {
|
||||
lsn: Option<Lsn>,
|
||||
gzip: bool,
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn client(
|
||||
args: &'static Args,
|
||||
timeline: TenantTimelineId,
|
||||
start_work_barrier: Arc<Barrier>,
|
||||
mut work: tokio::sync::mpsc::Receiver<Work>,
|
||||
all_work_done_barrier: Arc<Barrier>,
|
||||
live_stats: Arc<LiveStats>,
|
||||
) {
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
let client = pageserver_client::page_service::Client::new(crate::util::connstring::connstring(
|
||||
&args.page_service_host_port,
|
||||
args.pageserver_jwt.as_deref(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(Work { lsn, gzip }) = work.recv().await {
|
||||
let start = Instant::now();
|
||||
let copy_out_stream = client
|
||||
.basebackup(&BasebackupRequest {
|
||||
tenant_id: timeline.tenant_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
lsn,
|
||||
gzip,
|
||||
})
|
||||
.await
|
||||
.with_context(|| format!("start basebackup for {timeline}"))
|
||||
.unwrap();
|
||||
|
||||
use futures::StreamExt;
|
||||
let size = Arc::new(AtomicUsize::new(0));
|
||||
copy_out_stream
|
||||
.for_each({
|
||||
|r| {
|
||||
let size = Arc::clone(&size);
|
||||
async move {
|
||||
let size = Arc::clone(&size);
|
||||
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
|
||||
let elapsed = start.elapsed();
|
||||
live_stats.inc();
|
||||
STATS.with(|stats| {
|
||||
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
all_work_done_barrier.wait().await;
|
||||
}
|
||||
1
pageserver/pagebench/src/cli.rs
Normal file
1
pageserver/pagebench/src/cli.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub(crate) mod targets;
|
||||
37
pageserver/pagebench/src/cli/targets.rs
Normal file
37
pageserver/pagebench/src/cli/targets.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_client::mgmt_api;
|
||||
use tracing::info;
|
||||
|
||||
use crate::util::{
|
||||
discover_timelines::get_pageserver_tenant_timelines, tenant_timeline_id::TenantTimelineId,
|
||||
};
|
||||
|
||||
pub(crate) struct Spec {
|
||||
pub(crate) limit_to_first_n_targets: Option<usize>,
|
||||
pub(crate) targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
pub(crate) async fn discover(
|
||||
api_client: &Arc<mgmt_api::Client>,
|
||||
spec: Spec,
|
||||
) -> anyhow::Result<Vec<TenantTimelineId>> {
|
||||
let mut timelines = if let Some(targets) = spec.targets {
|
||||
targets
|
||||
} else {
|
||||
get_pageserver_tenant_timelines(api_client).await?
|
||||
};
|
||||
|
||||
if let Some(limit) = spec.limit_to_first_n_targets {
|
||||
timelines.sort(); // for determinism
|
||||
timelines.truncate(limit);
|
||||
if timelines.len() < limit {
|
||||
anyhow::bail!("pageserver has less than limit_to_first_n_targets={limit} tenants");
|
||||
}
|
||||
}
|
||||
|
||||
info!("timelines:\n{:?}", timelines);
|
||||
info!("number of timelines:\n{:?}", timelines.len());
|
||||
|
||||
Ok(timelines)
|
||||
}
|
||||
337
pageserver/pagebench/src/getpage_latest_lsn.rs
Normal file
337
pageserver/pagebench/src/getpage_latest_lsn.rs
Normal file
@@ -0,0 +1,337 @@
|
||||
use anyhow::Context;
|
||||
use futures::future::join_all;
|
||||
use pageserver::pgdatadir_mapping::key_to_rel_block;
|
||||
use pageserver::repository;
|
||||
use pageserver_api::key::is_rel_block_key;
|
||||
use pageserver_client::page_service::RelTagBlockNo;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use rand::prelude::*;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{info, instrument};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::cli;
|
||||
|
||||
use crate::util::tenant_timeline_id::TenantTimelineId;
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(long, default_value = "1")]
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long)]
|
||||
per_target_rate_limit: Option<usize>,
|
||||
#[clap(long)]
|
||||
limit_to_first_n_targets: Option<usize>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
}
|
||||
|
||||
impl LiveStats {
|
||||
fn inc(&self) {
|
||||
self.completed_requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct KeyRange {
|
||||
timeline: TenantTimelineId,
|
||||
timeline_lsn: Lsn,
|
||||
start: i128,
|
||||
end: i128,
|
||||
}
|
||||
|
||||
impl KeyRange {
|
||||
fn len(&self) -> i128 {
|
||||
self.end - self.start
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct Output {
|
||||
total: request_stats::Output,
|
||||
}
|
||||
|
||||
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
|
||||
|
||||
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
|
||||
tokio_thread_local_stats::main!(STATS, move |thread_local_stats| {
|
||||
main_impl(args, thread_local_stats)
|
||||
})
|
||||
}
|
||||
|
||||
async fn main_impl(
|
||||
args: Args,
|
||||
all_thread_local_stats: AllThreadLocalStats<request_stats::Stats>,
|
||||
) -> anyhow::Result<()> {
|
||||
let args: &'static Args = Box::leak(Box::new(args));
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
cli::targets::Spec {
|
||||
limit_to_first_n_targets: args.limit_to_first_n_targets,
|
||||
targets: args.targets.clone(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
for timeline in &timelines {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
let timeline = *timeline;
|
||||
async move {
|
||||
let partitioning = mgmt_api_client
|
||||
.keyspace(timeline.tenant_id, timeline.timeline_id)
|
||||
.await?;
|
||||
let lsn = partitioning.at_lsn;
|
||||
|
||||
let ranges = partitioning
|
||||
.keys
|
||||
.ranges
|
||||
.iter()
|
||||
.filter_map(|r| {
|
||||
let start = r.start;
|
||||
let end = r.end;
|
||||
// filter out non-relblock keys
|
||||
match (is_rel_block_key(&start), is_rel_block_key(&end)) {
|
||||
(true, true) => Some(KeyRange {
|
||||
timeline,
|
||||
timeline_lsn: lsn,
|
||||
start: start.to_i128(),
|
||||
end: end.to_i128(),
|
||||
}),
|
||||
(true, false) | (false, true) => {
|
||||
unimplemented!("split up range")
|
||||
}
|
||||
(false, false) => None,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
anyhow::Ok(ranges)
|
||||
}
|
||||
});
|
||||
}
|
||||
let mut all_ranges: Vec<KeyRange> = Vec::new();
|
||||
while let Some(res) = js.join_next().await {
|
||||
all_ranges.extend(res.unwrap().unwrap());
|
||||
}
|
||||
|
||||
let live_stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_client_tasks = timelines.len();
|
||||
let num_live_stats_dump = 1;
|
||||
let num_work_sender_tasks = 1;
|
||||
|
||||
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
|
||||
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
|
||||
));
|
||||
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
|
||||
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&live_stats);
|
||||
let start_work_barrier = Arc::clone(&start_work_barrier);
|
||||
async move {
|
||||
start_work_barrier.wait().await;
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
info!(
|
||||
"RPS: {:.0}",
|
||||
completed_requests as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut work_senders = HashMap::new();
|
||||
let mut tasks = Vec::new();
|
||||
for tl in &timelines {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
|
||||
work_senders.insert(tl, sender);
|
||||
tasks.push(tokio::spawn(client(
|
||||
args,
|
||||
*tl,
|
||||
Arc::clone(&start_work_barrier),
|
||||
receiver,
|
||||
Arc::clone(&all_work_done_barrier),
|
||||
Arc::clone(&live_stats),
|
||||
)));
|
||||
}
|
||||
|
||||
let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = match args.per_target_rate_limit {
|
||||
None => Box::pin(async move {
|
||||
let weights = rand::distributions::weighted::WeightedIndex::new(
|
||||
all_ranges.iter().map(|v| v.len()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
loop {
|
||||
let (range, key) = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &all_ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = repository::Key::from_i128(key);
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
(r, RelTagBlockNo { rel_tag, block_no })
|
||||
};
|
||||
let sender = work_senders.get(&range.timeline).unwrap();
|
||||
// TODO: what if this blocks?
|
||||
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
|
||||
}
|
||||
}),
|
||||
Some(rps_limit) => Box::pin(async move {
|
||||
let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
|
||||
|
||||
let make_timeline_task: &dyn Fn(
|
||||
TenantTimelineId,
|
||||
)
|
||||
-> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
|
||||
let sender = work_senders.get(&timeline).unwrap();
|
||||
let ranges: Vec<KeyRange> = all_ranges
|
||||
.iter()
|
||||
.filter(|r| r.timeline == timeline)
|
||||
.cloned()
|
||||
.collect();
|
||||
let weights = rand::distributions::weighted::WeightedIndex::new(
|
||||
ranges.iter().map(|v| v.len()),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
let mut ticker = tokio::time::interval(period);
|
||||
ticker.set_missed_tick_behavior(
|
||||
/* TODO review this choice */
|
||||
tokio::time::MissedTickBehavior::Burst,
|
||||
);
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
let (range, key) = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = repository::Key::from_i128(key);
|
||||
let (rel_tag, block_no) = key_to_rel_block(key)
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
(r, RelTagBlockNo { rel_tag, block_no })
|
||||
};
|
||||
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let tasks: Vec<_> = work_senders
|
||||
.keys()
|
||||
.map(|tl| make_timeline_task(**tl))
|
||||
.collect();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
join_all(tasks).await;
|
||||
}),
|
||||
};
|
||||
|
||||
if let Some(runtime) = args.runtime {
|
||||
match tokio::time::timeout(runtime.into(), work_sender).await {
|
||||
Ok(()) => unreachable!("work sender never terminates"),
|
||||
Err(_timeout) => {
|
||||
// this implicitly drops the work_senders, making all the clients exit
|
||||
}
|
||||
}
|
||||
} else {
|
||||
work_sender.await;
|
||||
unreachable!("work sender never terminates");
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await.unwrap();
|
||||
}
|
||||
|
||||
let output = Output {
|
||||
total: {
|
||||
let mut agg_stats = request_stats::Stats::new();
|
||||
for stats in all_thread_local_stats.lock().unwrap().iter() {
|
||||
let stats = stats.lock().unwrap();
|
||||
agg_stats.add(&stats);
|
||||
}
|
||||
agg_stats.output()
|
||||
},
|
||||
};
|
||||
|
||||
let output = serde_json::to_string_pretty(&output).unwrap();
|
||||
println!("{output}");
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn client(
|
||||
args: &'static Args,
|
||||
timeline: TenantTimelineId,
|
||||
start_work_barrier: Arc<Barrier>,
|
||||
mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>,
|
||||
all_work_done_barrier: Arc<Barrier>,
|
||||
live_stats: Arc<LiveStats>,
|
||||
) {
|
||||
start_work_barrier.wait().await;
|
||||
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(timeline.tenant_id, timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some((key, lsn)) = work.recv().await {
|
||||
let start = Instant::now();
|
||||
client
|
||||
.getpage(key, lsn)
|
||||
.await
|
||||
.with_context(|| format!("getpage for {timeline}"))
|
||||
.unwrap();
|
||||
let elapsed = start.elapsed();
|
||||
live_stats.inc();
|
||||
STATS.with(|stats| {
|
||||
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
all_work_done_barrier.wait().await;
|
||||
}
|
||||
34
pageserver/pagebench/src/main.rs
Normal file
34
pageserver/pagebench/src/main.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
use clap::Parser;
|
||||
use utils::logging;
|
||||
|
||||
pub(crate) mod cli;
|
||||
pub(crate) mod util;
|
||||
|
||||
mod basebackup;
|
||||
mod getpage_latest_lsn;
|
||||
mod trigger_initial_size_calculation;
|
||||
|
||||
/// Component-level performance test for pageserver.
|
||||
#[derive(clap::Parser)]
|
||||
enum Args {
|
||||
Basebackup(basebackup::Args),
|
||||
GetPageLatestLsn(getpage_latest_lsn::Args),
|
||||
TriggerInitialSizeCalculation(trigger_initial_size_calculation::Args),
|
||||
}
|
||||
|
||||
fn main() {
|
||||
logging::init(
|
||||
logging::LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stderr,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let args = Args::parse();
|
||||
match args {
|
||||
Args::Basebackup(args) => basebackup::main(args),
|
||||
Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
|
||||
Args::TriggerInitialSizeCalculation(args) => trigger_initial_size_calculation::main(args),
|
||||
}
|
||||
.unwrap()
|
||||
}
|
||||
86
pageserver/pagebench/src/trigger_initial_size_calculation.rs
Normal file
86
pageserver/pagebench/src/trigger_initial_size_calculation.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use humantime::Duration;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
use crate::{cli, util::tenant_timeline_id::TenantTimelineId};
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "localhost:64000")]
|
||||
page_service_host_port: String,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(
|
||||
long,
|
||||
help = "if specified, poll mgmt api to check whether init logical size calculation has completed"
|
||||
)]
|
||||
poll_for_completion: Option<Duration>,
|
||||
#[clap(long)]
|
||||
limit_to_first_n_targets: Option<usize>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let main_task = rt.spawn(main_impl(args));
|
||||
rt.block_on(main_task).unwrap()
|
||||
}
|
||||
|
||||
async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
let args: &'static Args = Box::leak(Box::new(args));
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
cli::targets::Spec {
|
||||
limit_to_first_n_targets: args.limit_to_first_n_targets,
|
||||
targets: args.targets.clone(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// kick it off
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
for tl in timelines {
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
js.spawn(async move {
|
||||
// TODO: API to explicitly trigger initial logical size computation.
|
||||
// Should probably also avoid making it a side effect of timeline details to trigger initial logical size calculation.
|
||||
// => https://github.com/neondatabase/neon/issues/6168
|
||||
let info = mgmt_api_client
|
||||
.timeline_info(tl.tenant_id, tl.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
if let Some(period) = args.poll_for_completion {
|
||||
let mut ticker = tokio::time::interval(period.into());
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
||||
let mut info = info;
|
||||
while !info.current_logical_size_is_accurate {
|
||||
ticker.tick().await;
|
||||
info = mgmt_api_client
|
||||
.timeline_info(tl.tenant_id, tl.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let _: () = res.unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
6
pageserver/pagebench/src/util.rs
Normal file
6
pageserver/pagebench/src/util.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
pub(crate) mod connstring;
|
||||
pub(crate) mod discover_timelines;
|
||||
pub(crate) mod request_stats;
|
||||
pub(crate) mod tenant_timeline_id;
|
||||
#[macro_use]
|
||||
pub(crate) mod tokio_thread_local_stats;
|
||||
8
pageserver/pagebench/src/util/connstring.rs
Normal file
8
pageserver/pagebench/src/util/connstring.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String {
|
||||
let colon_and_jwt = if let Some(jwt) = jwt {
|
||||
format!(":{jwt}") // TODO: urlescape
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
format!("postgres://postgres{colon_and_jwt}@{host_port}")
|
||||
}
|
||||
45
pageserver/pagebench/src/util/discover_timelines.rs
Normal file
45
pageserver/pagebench/src/util/discover_timelines.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_client::mgmt_api;
|
||||
use tokio::task::JoinSet;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use super::tenant_timeline_id::TenantTimelineId;
|
||||
|
||||
pub(crate) async fn get_pageserver_tenant_timelines(
|
||||
api_client: &Arc<mgmt_api::Client>,
|
||||
) -> anyhow::Result<Vec<TenantTimelineId>> {
|
||||
let mut timelines: Vec<TenantTimelineId> = Vec::new();
|
||||
let mut tenants: Vec<TenantId> = Vec::new();
|
||||
for ti in api_client.list_tenants().await? {
|
||||
if !ti.id.is_unsharded() {
|
||||
anyhow::bail!(
|
||||
"only unsharded tenants are supported at this time: {}",
|
||||
ti.id
|
||||
);
|
||||
}
|
||||
tenants.push(ti.id.tenant_id)
|
||||
}
|
||||
let mut js = JoinSet::new();
|
||||
for tenant_id in tenants {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(api_client);
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let (tenant_id, details) = res.unwrap();
|
||||
for timeline_id in details.timelines {
|
||||
timelines.push(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(timelines)
|
||||
}
|
||||
88
pageserver/pagebench/src/util/request_stats.rs
Normal file
88
pageserver/pagebench/src/util/request_stats.rs
Normal file
@@ -0,0 +1,88 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
pub(crate) struct Stats {
|
||||
latency_histo: hdrhistogram::Histogram<u64>,
|
||||
}
|
||||
|
||||
impl Stats {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
// Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
|
||||
// which would skew the benchmark results.
|
||||
latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
|
||||
}
|
||||
}
|
||||
pub(crate) fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
|
||||
let micros: u64 = latency
|
||||
.as_micros()
|
||||
.try_into()
|
||||
.context("latency greater than u64")?;
|
||||
self.latency_histo
|
||||
.record(micros)
|
||||
.context("add to histogram")?;
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) fn output(&self) -> Output {
|
||||
let latency_percentiles = std::array::from_fn(|idx| {
|
||||
let micros = self
|
||||
.latency_histo
|
||||
.value_at_percentile(LATENCY_PERCENTILES[idx]);
|
||||
Duration::from_micros(micros)
|
||||
});
|
||||
Output {
|
||||
request_count: self.latency_histo.len(),
|
||||
latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
|
||||
latency_percentiles: LatencyPercentiles {
|
||||
latency_percentiles,
|
||||
},
|
||||
}
|
||||
}
|
||||
pub(crate) fn add(&mut self, other: &Self) {
|
||||
let Self {
|
||||
ref mut latency_histo,
|
||||
} = self;
|
||||
latency_histo.add(&other.latency_histo).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Stats {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
|
||||
|
||||
struct LatencyPercentiles {
|
||||
latency_percentiles: [Duration; 4],
|
||||
}
|
||||
|
||||
impl serde::Serialize for LatencyPercentiles {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
|
||||
for p in LATENCY_PERCENTILES {
|
||||
ser.serialize_entry(
|
||||
&format!("p{p}"),
|
||||
&format!(
|
||||
"{}",
|
||||
&humantime::format_duration(self.latency_percentiles[0])
|
||||
),
|
||||
)?;
|
||||
}
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
pub(crate) struct Output {
|
||||
request_count: u64,
|
||||
#[serde(with = "humantime_serde")]
|
||||
latency_mean: Duration,
|
||||
latency_percentiles: LatencyPercentiles,
|
||||
}
|
||||
34
pageserver/pagebench/src/util/tenant_timeline_id.rs
Normal file
34
pageserver/pagebench/src/util/tenant_timeline_id.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
|
||||
pub(crate) struct TenantTimelineId {
|
||||
pub(crate) tenant_id: TenantId,
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
}
|
||||
|
||||
impl FromStr for TenantTimelineId {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let (tenant_id, timeline_id) = s
|
||||
.split_once('/')
|
||||
.context("tenant and timeline id must be separated by `/`")?;
|
||||
let tenant_id = TenantId::from_str(tenant_id)
|
||||
.with_context(|| format!("invalid tenant id: {tenant_id:?}"))?;
|
||||
let timeline_id = TimelineId::from_str(timeline_id)
|
||||
.with_context(|| format!("invalid timeline id: {timeline_id:?}"))?;
|
||||
Ok(Self {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TenantTimelineId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
|
||||
}
|
||||
}
|
||||
45
pageserver/pagebench/src/util/tokio_thread_local_stats.rs
Normal file
45
pageserver/pagebench/src/util/tokio_thread_local_stats.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
pub(crate) type ThreadLocalStats<T> = Arc<Mutex<T>>;
|
||||
pub(crate) type AllThreadLocalStats<T> = Arc<Mutex<Vec<ThreadLocalStats<T>>>>;
|
||||
|
||||
macro_rules! declare {
|
||||
($THREAD_LOCAL_NAME:ident: $T:ty) => {
|
||||
thread_local! {
|
||||
pub static $THREAD_LOCAL_NAME: std::cell::RefCell<crate::util::tokio_thread_local_stats::ThreadLocalStats<$T>> = std::cell::RefCell::new(
|
||||
std::sync::Arc::new(std::sync::Mutex::new(Default::default()))
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
pub(crate) use declare;
|
||||
|
||||
macro_rules! main {
|
||||
($THREAD_LOCAL_NAME:ident, $main_impl:expr) => {{
|
||||
let main_impl = $main_impl;
|
||||
let all = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.on_thread_start({
|
||||
let all = Arc::clone(&all);
|
||||
move || {
|
||||
// pre-initialize the thread local stats by accessesing them
|
||||
// (some stats like requests_stats::Stats are quite costly to initialize,
|
||||
// we don't want to pay that cost during the measurement period)
|
||||
$THREAD_LOCAL_NAME.with(|stats| {
|
||||
let stats: Arc<_> = Arc::clone(&*stats.borrow());
|
||||
all.lock().unwrap().push(stats);
|
||||
});
|
||||
}
|
||||
})
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let main_task = rt.spawn(main_impl(all));
|
||||
rt.block_on(main_task).unwrap()
|
||||
}};
|
||||
}
|
||||
|
||||
pub(crate) use main;
|
||||
@@ -267,7 +267,7 @@ async fn calculate_synthetic_size_worker(
|
||||
}
|
||||
};
|
||||
|
||||
for (tenant_shard_id, tenant_state) in tenants {
|
||||
for (tenant_shard_id, tenant_state, _gen) in tenants {
|
||||
if tenant_state != TenantState::Active {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -196,7 +196,7 @@ pub(super) async fn collect_all_metrics(
|
||||
}
|
||||
};
|
||||
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
|
||||
if state != TenantState::Active || !id.is_zero() {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -515,7 +515,7 @@ async fn collect_eviction_candidates(
|
||||
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
for (tenant_id, _state) in &tenants {
|
||||
for (tenant_id, _state, _gen) in &tenants {
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ use hyper::header;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::TenantDetails;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
|
||||
TenantLoadRequest, TenantLocationConfigRequest,
|
||||
@@ -844,11 +845,12 @@ async fn tenant_list_handler(
|
||||
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
|
||||
})?
|
||||
.iter()
|
||||
.map(|(id, state)| TenantInfo {
|
||||
.map(|(id, state, gen)| TenantInfo {
|
||||
id: *id,
|
||||
state: state.clone(),
|
||||
current_physical_size: None,
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: (*gen).into(),
|
||||
})
|
||||
.collect::<Vec<TenantInfo>>();
|
||||
|
||||
@@ -872,11 +874,15 @@ async fn tenant_status(
|
||||
}
|
||||
|
||||
let state = tenant.current_state();
|
||||
Result::<_, ApiError>::Ok(TenantInfo {
|
||||
id: tenant_shard_id,
|
||||
state: state.clone(),
|
||||
current_physical_size: Some(current_physical_size),
|
||||
attachment_status: state.attachment_status(),
|
||||
Result::<_, ApiError>::Ok(TenantDetails {
|
||||
tenant_info: TenantInfo {
|
||||
id: tenant_shard_id,
|
||||
state: state.clone(),
|
||||
current_physical_size: Some(current_physical_size),
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: tenant.generation().into(),
|
||||
},
|
||||
timelines: tenant.list_timeline_ids(),
|
||||
})
|
||||
}
|
||||
.instrument(info_span!("tenant_status_handler",
|
||||
|
||||
@@ -1776,6 +1776,7 @@ pub fn is_inherited_key(key: Key) -> bool {
|
||||
key != AUX_FILES_KEY
|
||||
}
|
||||
|
||||
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
|
||||
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
|
||||
Ok(match key.field1 {
|
||||
0x00 => (
|
||||
@@ -1790,7 +1791,6 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
|
||||
_ => anyhow::bail!("unexpected value kind 0x{:02x}", key.field1),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_rel_fsm_block_key(key: Key) -> bool {
|
||||
key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
|
||||
}
|
||||
|
||||
@@ -1552,6 +1552,10 @@ impl Tenant {
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
|
||||
self.timelines.lock().unwrap().keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// This is used to create the initial 'main' timeline during bootstrapping,
|
||||
/// or when importing a new base backup. The caller is expected to load an
|
||||
/// initial image of the datadir to the new timeline after this.
|
||||
@@ -1911,6 +1915,10 @@ impl Tenant {
|
||||
self.current_state() == TenantState::Active
|
||||
}
|
||||
|
||||
pub fn generation(&self) -> Generation {
|
||||
self.generation
|
||||
}
|
||||
|
||||
/// Changes tenant status to active, unless shutdown was already requested.
|
||||
///
|
||||
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
|
||||
|
||||
@@ -1511,8 +1511,8 @@ pub(crate) enum TenantMapListError {
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>, TenantMapListError>
|
||||
{
|
||||
pub(crate) async fn list_tenants(
|
||||
) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().unwrap();
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
@@ -1520,7 +1520,9 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>,
|
||||
};
|
||||
Ok(m.iter()
|
||||
.filter_map(|(id, tenant)| match tenant {
|
||||
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
|
||||
TenantSlot::Attached(tenant) => {
|
||||
Some((*id, tenant.current_state(), tenant.generation()))
|
||||
}
|
||||
TenantSlot::Secondary => None,
|
||||
TenantSlot::InProgress(_) => None,
|
||||
})
|
||||
|
||||
73
scripts/ps_duplicate_tenant.py
Normal file
73
scripts/ps_duplicate_tenant.py
Normal file
@@ -0,0 +1,73 @@
|
||||
# Usage from top of repo:
|
||||
# poetry run python3 ./scripts/ps_duplicate_tenant.py c66e2e233057f7f05563caff664ecb14 .neon/remote_storage_local_fs
|
||||
import argparse
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import sys
|
||||
|
||||
sys.path.append("test_runner")
|
||||
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.types import TenantId
|
||||
|
||||
parser = argparse.ArgumentParser(description="Duplicate tenant script.")
|
||||
parser.add_argument("initial_tenant", type=str, help="Initial tenant")
|
||||
parser.add_argument("remote_storage_local_fs_root", type=Path, help="Remote storage local fs root")
|
||||
parser.add_argument("--ncopies", type=int, help="Number of copies")
|
||||
parser.add_argument("--numthreads", type=int, default=1, help="Number of threads")
|
||||
parser.add_argument("--port", type=int, default=9898, help="Pageserver management api port")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
initial_tenant = args.initial_tenant
|
||||
remote_storage_local_fs_root: Path = args.remote_storage_local_fs_root
|
||||
ncopies = args.ncopies
|
||||
numthreads = args.numthreads
|
||||
|
||||
new_tenant = TenantId.generate()
|
||||
print(f"New tenant: {new_tenant}")
|
||||
|
||||
client = PageserverHttpClient(args.port, lambda: None)
|
||||
|
||||
src_tenant_gen = int(client.tenant_status(initial_tenant)["generation"])
|
||||
|
||||
assert remote_storage_local_fs_root.is_dir(), f"{remote_storage_local_fs_root} is not a directory"
|
||||
|
||||
src_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / initial_tenant / "timelines"
|
||||
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
|
||||
|
||||
dst_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / str(new_tenant) / "timelines"
|
||||
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
|
||||
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
|
||||
|
||||
for tl in src_timelines_dir.iterdir():
|
||||
src_tl_dir = src_timelines_dir / tl.name
|
||||
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
|
||||
dst_tl_dir = dst_timelines_dir / tl.name
|
||||
dst_tl_dir.mkdir(parents=False, exist_ok=False)
|
||||
for file in tl.iterdir():
|
||||
shutil.copy2(file, dst_tl_dir)
|
||||
if "__" in file.name:
|
||||
cmd = [
|
||||
"./target/debug/pagectl", # TODO: abstract this like the other binaries
|
||||
"layer",
|
||||
"rewrite-summary",
|
||||
str(dst_tl_dir / file.name),
|
||||
"--new-tenant-id",
|
||||
str(new_tenant),
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
client.tenant_attach(new_tenant, generation=src_tenant_gen)
|
||||
|
||||
while True:
|
||||
status = client.tenant_status(new_tenant)
|
||||
if status["state"]["slug"] == "Active":
|
||||
break
|
||||
print("Waiting for tenant to be active..., is: " + status["state"]["slug"])
|
||||
time.sleep(1)
|
||||
|
||||
print("Tenant is active: " + str(new_tenant))
|
||||
41
setup_bench_repo_dir.bash
Normal file
41
setup_bench_repo_dir.bash
Normal file
@@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
if [ "$(cat /sys/class/block/nvme1n1/device/model)" != "Amazon EC2 NVMe Instance Storage " ]; then
|
||||
echo "nvme1n1 is not Amazon EC2 NVMe Instance Storage: '$(cat /sys/class/block/nvme1n1/device/model)'"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
rmdir bench_repo_dir || true
|
||||
|
||||
sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
|
||||
|
||||
sudo mount /dev/nvme1n1 /mnt
|
||||
sudo chown -R "$(id -u)":"$(id -g)" /mnt
|
||||
|
||||
mkdir /mnt/bench_repo_dir
|
||||
mkdir bench_repo_dir
|
||||
sudo mount --bind /mnt/bench_repo_dir bench_repo_dir
|
||||
|
||||
mkdir /mnt/test_output
|
||||
|
||||
mkdir /mnt/many_tenants
|
||||
|
||||
echo run the following commands
|
||||
|
||||
cat <<EOF
|
||||
# test suite run
|
||||
export TEST_OUTPUT="/mnt/test_output"
|
||||
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_pageserver.py
|
||||
|
||||
# for interactive use
|
||||
export NEON_REPO_DIR="$(readlink -f ./bench_repo_dir)/repo"
|
||||
cargo build_testing --release
|
||||
./target/release/neon_local init
|
||||
# ... create tenant, seed it using pgbench
|
||||
# then duplicate the tenant using
|
||||
# poetry run python3 ./test_runner/duplicate_tenant.py TENANT_ID 200 8
|
||||
EOF
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# Type-related stuff
|
||||
from typing import Callable, ClassVar, Dict, Iterator, Optional
|
||||
from typing import Any, Callable, ClassVar, Dict, Iterator, Optional
|
||||
|
||||
import pytest
|
||||
from _pytest.config import Config
|
||||
@@ -20,6 +20,7 @@ from _pytest.terminal import TerminalReporter
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonPageserver
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.utils import humantime_to_ms
|
||||
|
||||
"""
|
||||
This file contains fixtures for micro-benchmarks.
|
||||
@@ -409,6 +410,34 @@ class NeonBenchmarker:
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
def record_pagebench_results(self, name: str, results: Dict[str, Any]):
|
||||
total = results["total"]
|
||||
|
||||
metric = "request_count"
|
||||
self.record(
|
||||
f"{name}.{metric}",
|
||||
total[metric],
|
||||
"",
|
||||
report=MetricReport.HIGHER_IS_BETTER,
|
||||
)
|
||||
|
||||
metric = "latency_mean"
|
||||
self.record(
|
||||
f"{name}.{metric}",
|
||||
humantime_to_ms(total[metric]),
|
||||
"ms",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
metric = "latency_percentiles"
|
||||
for k, v in total[metric].items():
|
||||
self.record(
|
||||
f"{name}.{metric}.{k}",
|
||||
humantime_to_ms(v),
|
||||
"ms",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def zenbenchmark(record_property: Callable[[str, object], None]) -> Iterator[NeonBenchmarker]:
|
||||
|
||||
@@ -772,13 +772,10 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
self.control_plane_api: Optional[str] = None
|
||||
self.attachment_service: Optional[NeonAttachmentService] = None
|
||||
if config.enable_generations:
|
||||
attachment_service_port = self.port_distributor.get_port()
|
||||
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
|
||||
else:
|
||||
self.control_plane_api = None
|
||||
self.attachment_service = None
|
||||
self.enable_generations()
|
||||
|
||||
# Create a config file corresponding to the options
|
||||
cfg: Dict[str, Any] = {
|
||||
@@ -847,6 +844,18 @@ class NeonEnv:
|
||||
log.info(f"Config: {cfg}")
|
||||
self.neon_cli.init(cfg)
|
||||
|
||||
def enable_generations(self, start=False):
|
||||
if not start:
|
||||
# TODO: assert that we haven't `self.start()`ed yet
|
||||
pass
|
||||
assert self.control_plane_api is None
|
||||
assert self.attachment_service is None
|
||||
attachment_service_port = self.port_distributor.get_port()
|
||||
self.control_plane_api = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service = NeonAttachmentService(self)
|
||||
if start:
|
||||
self.attachment_service.start()
|
||||
|
||||
def start(self):
|
||||
# Start up broker, pageserver and all safekeepers
|
||||
self.broker.try_start()
|
||||
@@ -1580,6 +1589,16 @@ class Pagectl(AbstractNeonCli):
|
||||
parsed = json.loads(res.stdout)
|
||||
return IndexPartDump.from_json(parsed)
|
||||
|
||||
# class GetpageBenchLibpq(AbstractNeonCli):
|
||||
# """
|
||||
# A typed wrapper around the `getpage_bench_libpq` CLI.
|
||||
# """
|
||||
#
|
||||
# COMMAND = "getpage_bench_libpq"
|
||||
#
|
||||
# def run(self):
|
||||
# pass
|
||||
|
||||
|
||||
class NeonAttachmentService:
|
||||
def __init__(self, env: NeonEnv):
|
||||
|
||||
@@ -397,3 +397,36 @@ def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
|
||||
}
|
||||
"""
|
||||
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr])
|
||||
|
||||
|
||||
def humantime_to_ms(humantime: str) -> float:
|
||||
"""
|
||||
Converts Rust humantime's output string to milliseconds.
|
||||
|
||||
humantime_to_ms("1h 1ms 406us") -> 3600001.406
|
||||
"""
|
||||
|
||||
unit_multiplier_map = {
|
||||
"ns": 1e-6,
|
||||
"us": 1e-3,
|
||||
"ms": 1,
|
||||
"s": 1e3,
|
||||
"m": 1e3 * 60,
|
||||
"h": 1e3 * 60 * 60,
|
||||
}
|
||||
matcher = re.compile(rf"^(\d+)({'|'.join(unit_multiplier_map.keys())})$")
|
||||
total_ms = 0.0
|
||||
|
||||
if humantime == "0":
|
||||
return total_ms
|
||||
|
||||
for item in humantime.split():
|
||||
if (match := matcher.search(item)) is not None:
|
||||
n, unit = match.groups()
|
||||
total_ms += int(n) * unit_multiplier_map[unit]
|
||||
else:
|
||||
raise ValueError(
|
||||
f"can't parse '{item}' (from string '{humantime}'), known units are {', '.join(unit_multiplier_map.keys())}."
|
||||
)
|
||||
|
||||
return round(total_ms, 3)
|
||||
|
||||
177
test_runner/performance/test_pageserver.py
Normal file
177
test_runner/performance/test_pageserver.py
Normal file
@@ -0,0 +1,177 @@
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, last_flush_lsn_upload
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def snapshotting_env(
|
||||
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_output_dir: Path
|
||||
) -> Tuple[NeonEnv, TimelineId, List[TenantId]]:
|
||||
"""
|
||||
The fixture prepares environment or restores it from a snapshot.
|
||||
|
||||
The logic is the following:
|
||||
- if the snapshot directory exists, the snapshot is restored from it
|
||||
- if there is no snapshot, the environment is initialized from scratch and stored in a snapshot
|
||||
- if the fixture is executed on CI (it has CI=true in the environment), the snapshot is not saved
|
||||
"""
|
||||
|
||||
snapshot_dir = test_output_dir.parent / f"snapshot-{test_output_dir.name}"
|
||||
save_snapshot = os.getenv("CI", "false") != "true"
|
||||
|
||||
neon_env_builder.enable_generations = True
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
# create our template tenant
|
||||
tenant_config_mgmt_api = {
|
||||
"gc_period": "0s",
|
||||
"checkpoint_timeout": "10 years",
|
||||
"compaction_period": "20 s",
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
|
||||
if snapshot_dir.exists():
|
||||
env = neon_env_builder.from_repo_dir(snapshot_dir)
|
||||
ps_http = env.pageserver.http_client()
|
||||
tenants = list({TenantId(t.name) for t in (snapshot_dir.glob("pageserver_*/tenants/*"))})
|
||||
template_timeline = env.initial_timeline
|
||||
|
||||
env.broker.try_start()
|
||||
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.start()
|
||||
|
||||
# Wait for the attachment service to start
|
||||
time.sleep(5)
|
||||
|
||||
for tenant in tenants:
|
||||
env.attachment_service.attach_hook_issue(tenant, 1)
|
||||
|
||||
env.pageserver.start()
|
||||
else:
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
remote_storage = env.pageserver_remote_storage
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
# clean up the useless default tenant
|
||||
ps_http.tenant_delete(env.initial_tenant)
|
||||
|
||||
tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}
|
||||
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(
|
||||
conf=tenant_config_cli, set_default=True
|
||||
)
|
||||
template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s50", ep.connstr()])
|
||||
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
|
||||
ps_http.tenant_detach(template_tenant)
|
||||
|
||||
# stop PS just for good measure
|
||||
env.pageserver.stop()
|
||||
|
||||
# duplicate the tenant in remote storage
|
||||
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
|
||||
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
|
||||
tenants = [template_tenant]
|
||||
for i in range(0, 200):
|
||||
new_tenant = TenantId.generate()
|
||||
tenants.append(new_tenant)
|
||||
log.info("Duplicating tenant #%s: %s", i, new_tenant)
|
||||
|
||||
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
|
||||
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
|
||||
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
|
||||
|
||||
for tl in src_timelines_dir.iterdir():
|
||||
src_tl_dir = src_timelines_dir / tl.name
|
||||
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
|
||||
dst_tl_dir = dst_timelines_dir / tl.name
|
||||
dst_tl_dir.mkdir(parents=False, exist_ok=False)
|
||||
for file in tl.iterdir():
|
||||
shutil.copy2(file, dst_tl_dir)
|
||||
if "__" in file.name:
|
||||
cmd: List[str] = [
|
||||
str(
|
||||
env.neon_binpath / "pagectl"
|
||||
), # TODO: abstract this like the other binaries
|
||||
"layer",
|
||||
"rewrite-summary",
|
||||
str(dst_tl_dir / file.name),
|
||||
"--new-tenant-id",
|
||||
str(new_tenant),
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
else:
|
||||
# index_part etc need no patching
|
||||
pass
|
||||
|
||||
env.pageserver.start()
|
||||
assert ps_http.tenant_list() == []
|
||||
for tenant in tenants:
|
||||
ps_http.tenant_attach(
|
||||
tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen + 1
|
||||
)
|
||||
|
||||
if save_snapshot and not snapshot_dir.exists():
|
||||
shutil.copytree(env.repo_dir, snapshot_dir)
|
||||
|
||||
for tenant in tenants:
|
||||
wait_until_tenant_active(ps_http, tenant)
|
||||
|
||||
# ensure all layers are resident for predictiable performance
|
||||
# TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
|
||||
for tenant in tenants:
|
||||
ps_http.download_all_layers(tenant, template_timeline)
|
||||
|
||||
return env, template_timeline, tenants
|
||||
|
||||
|
||||
def test_getpage_throughput(
|
||||
snapshotting_env: Tuple[NeonEnv, TimelineId, List[TenantId]],
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
env, template_timeline, tenants = snapshotting_env
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# run the benchmark with one client per timeline, each doing 10k requests to random keys.
|
||||
cmd = [
|
||||
str(env.neon_binpath / "pagebench"),
|
||||
"get-page-latest-lsn",
|
||||
"--mgmt-api-endpoint",
|
||||
ps_http.base_url,
|
||||
"--page-service-connstring",
|
||||
env.pageserver.connstr(password=None),
|
||||
"--runtime",
|
||||
"10s",
|
||||
*[f"{tenant}/{template_timeline}" for tenant in tenants],
|
||||
]
|
||||
log.info(f"command: {' '.join(cmd)}")
|
||||
basepath = pg_bin.run_capture(cmd, with_command_header=False)
|
||||
results_path = Path(basepath + ".stdout")
|
||||
log.info(f"Benchmark results at: {results_path}")
|
||||
|
||||
with open(results_path, "r") as f:
|
||||
results = json.load(f)
|
||||
|
||||
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
|
||||
|
||||
zenbenchmark.record_pagebench_results("get-page-latest-lsn", results)
|
||||
Reference in New Issue
Block a user