mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
pagebench: centralize target discovery
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use anyhow::Context;
|
||||
use pageserver_client::page_service::BasebackupRequest;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use rand::prelude::*;
|
||||
@@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::cli;
|
||||
use crate::util::tenant_timeline_id::TenantTimelineId;
|
||||
|
||||
/// basebackup@LatestLSN
|
||||
@@ -34,6 +35,8 @@ pub(crate) struct Args {
|
||||
gzip_probability: f64,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long)]
|
||||
limit_to_first_n_targets: Option<usize>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
@@ -185,44 +188,14 @@ async fn main_impl(
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let mut timelines: Vec<TenantTimelineId> = Vec::new();
|
||||
if args.targets.is_some() {
|
||||
timelines = args.targets.clone().unwrap();
|
||||
} else {
|
||||
let mut tenants: Vec<TenantId> = Vec::new();
|
||||
for ti in mgmt_api_client.list_tenants().await? {
|
||||
if !ti.id.is_unsharded() {
|
||||
anyhow::bail!(
|
||||
"only unsharded tenants are supported at this time: {}",
|
||||
ti.id
|
||||
);
|
||||
}
|
||||
tenants.push(ti.id.tenant_id)
|
||||
}
|
||||
let mut js = JoinSet::new();
|
||||
for tenant_id in tenants {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let (tenant_id, tl_infos) = res.unwrap();
|
||||
for tl in tl_infos {
|
||||
timelines.push(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id: tl.timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("timelines:\n{:?}", timelines);
|
||||
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
cli::targets::Spec {
|
||||
limit_to_first_n_targets: args.limit_to_first_n_targets,
|
||||
targets: args.targets.clone(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
for timeline in &timelines {
|
||||
|
||||
1
pageserver/pagebench/src/cli.rs
Normal file
1
pageserver/pagebench/src/cli.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub(crate) mod targets;
|
||||
37
pageserver/pagebench/src/cli/targets.rs
Normal file
37
pageserver/pagebench/src/cli/targets.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_client::mgmt_api;
|
||||
use tracing::info;
|
||||
|
||||
use crate::util::{
|
||||
discover_timelines::get_pageserver_tenant_timelines, tenant_timeline_id::TenantTimelineId,
|
||||
};
|
||||
|
||||
pub(crate) struct Spec {
|
||||
pub(crate) limit_to_first_n_targets: Option<usize>,
|
||||
pub(crate) targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
pub(crate) async fn discover(
|
||||
api_client: &Arc<mgmt_api::Client>,
|
||||
spec: Spec,
|
||||
) -> anyhow::Result<Vec<TenantTimelineId>> {
|
||||
let mut timelines = if let Some(targets) = spec.targets {
|
||||
targets
|
||||
} else {
|
||||
get_pageserver_tenant_timelines(api_client).await?
|
||||
};
|
||||
|
||||
if let Some(limit) = spec.limit_to_first_n_targets {
|
||||
timelines.sort(); // for determinism
|
||||
timelines.truncate(limit);
|
||||
if timelines.len() < limit {
|
||||
anyhow::bail!("pageserver has less than limit_to_first_n_targets={limit} tenants");
|
||||
}
|
||||
}
|
||||
|
||||
info!("timelines:\n{:?}", timelines);
|
||||
info!("number of timelines:\n{:?}", timelines.len());
|
||||
|
||||
Ok(timelines)
|
||||
}
|
||||
@@ -4,7 +4,7 @@ use pageserver::pgdatadir_mapping::key_to_rel_block;
|
||||
use pageserver::repository;
|
||||
use pageserver_api::key::is_rel_block_key;
|
||||
use pageserver_client::page_service::RelTagBlockNo;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use rand::prelude::*;
|
||||
@@ -22,6 +22,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::cli;
|
||||
|
||||
use crate::util::tenant_timeline_id::TenantTimelineId;
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
@@ -31,6 +33,8 @@ pub(crate) struct Args {
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(long, default_value = "1")]
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
@@ -195,51 +199,18 @@ async fn main_impl(
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
None, // TODO: support jwt in args
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let mut timelines: Vec<TenantTimelineId> = Vec::new();
|
||||
if args.targets.is_some() {
|
||||
timelines = args.targets.clone().unwrap();
|
||||
} else {
|
||||
let mut tenants: Vec<TenantId> = Vec::new();
|
||||
for ti in mgmt_api_client.list_tenants().await? {
|
||||
if !ti.id.is_unsharded() {
|
||||
anyhow::bail!(
|
||||
"only unsharded tenants are supported at this time: {}",
|
||||
ti.id
|
||||
);
|
||||
}
|
||||
tenants.push(ti.id.tenant_id)
|
||||
}
|
||||
let mut js = JoinSet::new();
|
||||
for tenant_id in tenants {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let (tenant_id, details) = res.unwrap();
|
||||
for timeline_id in details.timelines {
|
||||
timelines.push(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("timelines:\n{:?}", timelines);
|
||||
info!("number of timelines:\n{:?}", timelines.len());
|
||||
|
||||
|
||||
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
cli::targets::Spec {
|
||||
limit_to_first_n_targets: args.limit_to_first_n_targets,
|
||||
targets: args.targets.clone(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
for timeline in &timelines {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use clap::Parser;
|
||||
|
||||
pub(crate) mod cli;
|
||||
pub(crate) mod util;
|
||||
|
||||
mod basebackup;
|
||||
|
||||
@@ -2,10 +2,10 @@ use std::sync::Arc;
|
||||
|
||||
use humantime::Duration;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::info;
|
||||
use utils::{id::TenantId, logging};
|
||||
|
||||
use crate::util::tenant_timeline_id::TenantTimelineId;
|
||||
use utils::logging;
|
||||
|
||||
use crate::{cli, util::tenant_timeline_id::TenantTimelineId};
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
@@ -20,7 +20,8 @@ pub(crate) struct Args {
|
||||
help = "if specified, poll mgmt api to check whether init logical size calculation has completed"
|
||||
)]
|
||||
poll_for_completion: Option<Duration>,
|
||||
|
||||
#[clap(long)]
|
||||
limit_to_first_n_targets: Option<usize>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
@@ -46,48 +47,18 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
None, // TODO: support jwt in args
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let mut timelines: Vec<TenantTimelineId> = Vec::new();
|
||||
if args.targets.is_some() {
|
||||
timelines = args.targets.clone().unwrap();
|
||||
} else {
|
||||
let mut tenants: Vec<TenantId> = Vec::new();
|
||||
for ti in mgmt_api_client.list_tenants().await? {
|
||||
if !ti.id.is_unsharded() {
|
||||
anyhow::bail!(
|
||||
"only unsharded tenants are supported at this time: {}",
|
||||
ti.id
|
||||
);
|
||||
}
|
||||
tenants.push(ti.id.tenant_id)
|
||||
}
|
||||
let mut js = JoinSet::new();
|
||||
for tenant_id in tenants {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let (tenant_id, details) = res.unwrap();
|
||||
for timeline_id in details.timelines {
|
||||
timelines.push(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("timelines:\n{:?}", timelines);
|
||||
let timelines: Vec<TenantTimelineId> = cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
cli::targets::Spec {
|
||||
limit_to_first_n_targets: args.limit_to_first_n_targets,
|
||||
targets: args.targets.clone(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// kick it off
|
||||
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
pub(crate) mod connstring;
|
||||
pub(crate) mod discover_timelines;
|
||||
pub(crate) mod tenant_timeline_id;
|
||||
|
||||
45
pageserver/pagebench/src/util/discover_timelines.rs
Normal file
45
pageserver/pagebench/src/util/discover_timelines.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use pageserver_client::mgmt_api;
|
||||
use tokio::task::JoinSet;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use super::tenant_timeline_id::TenantTimelineId;
|
||||
|
||||
pub(crate) async fn get_pageserver_tenant_timelines(
|
||||
api_client: &Arc<mgmt_api::Client>,
|
||||
) -> anyhow::Result<Vec<TenantTimelineId>> {
|
||||
let mut timelines: Vec<TenantTimelineId> = Vec::new();
|
||||
let mut tenants: Vec<TenantId> = Vec::new();
|
||||
for ti in api_client.list_tenants().await? {
|
||||
if !ti.id.is_unsharded() {
|
||||
anyhow::bail!(
|
||||
"only unsharded tenants are supported at this time: {}",
|
||||
ti.id
|
||||
);
|
||||
}
|
||||
tenants.push(ti.id.tenant_id)
|
||||
}
|
||||
let mut js = JoinSet::new();
|
||||
for tenant_id in tenants {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(api_client);
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let (tenant_id, details) = res.unwrap();
|
||||
for timeline_id in details.timelines {
|
||||
timelines.push(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(timelines)
|
||||
}
|
||||
@@ -3,7 +3,7 @@ use std::str::FromStr;
|
||||
use anyhow::Context;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)]
|
||||
pub(crate) struct TenantTimelineId {
|
||||
pub(crate) tenant_id: TenantId,
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
|
||||
Reference in New Issue
Block a user