From cdc09bab0b5dd1fcf7a3e232f0f322922185f5a7 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 18 Dec 2023 16:04:03 +0000 Subject: [PATCH] pagebench: centralize target discovery --- pageserver/pagebench/src/basebackup.rs | 51 ++++------------- pageserver/pagebench/src/cli.rs | 1 + pageserver/pagebench/src/cli/targets.rs | 37 ++++++++++++ .../pagebench/src/getpage_latest_lsn.rs | 57 +++++-------------- pageserver/pagebench/src/main.rs | 1 + .../src/trigger_initial_size_calculation.rs | 57 +++++-------------- pageserver/pagebench/src/util.rs | 1 + .../pagebench/src/util/discover_timelines.rs | 45 +++++++++++++++ .../pagebench/src/util/tenant_timeline_id.rs | 2 +- 9 files changed, 126 insertions(+), 126 deletions(-) create mode 100644 pageserver/pagebench/src/cli.rs create mode 100644 pageserver/pagebench/src/cli/targets.rs create mode 100644 pageserver/pagebench/src/util/discover_timelines.rs diff --git a/pageserver/pagebench/src/basebackup.rs b/pageserver/pagebench/src/basebackup.rs index 3ffd6bfa80..b6c1080669 100644 --- a/pageserver/pagebench/src/basebackup.rs +++ b/pageserver/pagebench/src/basebackup.rs @@ -1,6 +1,6 @@ use anyhow::Context; use pageserver_client::page_service::BasebackupRequest; -use utils::id::TenantId; + use utils::lsn::Lsn; use rand::prelude::*; @@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use crate::cli; use crate::util::tenant_timeline_id::TenantTimelineId; /// basebackup@LatestLSN @@ -34,6 +35,8 @@ pub(crate) struct Args { gzip_probability: f64, #[clap(long)] runtime: Option, + #[clap(long)] + limit_to_first_n_targets: Option, targets: Option>, } @@ -185,44 +188,14 @@ async fn main_impl( )); // discover targets - let mut timelines: Vec = Vec::new(); - if args.targets.is_some() { - timelines = args.targets.clone().unwrap(); - } else { - let mut tenants: Vec = Vec::new(); - for ti in mgmt_api_client.list_tenants().await? { - if !ti.id.is_unsharded() { - anyhow::bail!( - "only unsharded tenants are supported at this time: {}", - ti.id - ); - } - tenants.push(ti.id.tenant_id) - } - let mut js = JoinSet::new(); - for tenant_id in tenants { - js.spawn({ - let mgmt_api_client = Arc::clone(&mgmt_api_client); - async move { - ( - tenant_id, - mgmt_api_client.list_timelines(tenant_id).await.unwrap(), - ) - } - }); - } - while let Some(res) = js.join_next().await { - let (tenant_id, tl_infos) = res.unwrap(); - for tl in tl_infos { - timelines.push(TenantTimelineId { - tenant_id, - timeline_id: tl.timeline_id, - }); - } - } - } - - info!("timelines:\n{:?}", timelines); + let timelines: Vec = cli::targets::discover( + &mgmt_api_client, + cli::targets::Spec { + limit_to_first_n_targets: args.limit_to_first_n_targets, + targets: args.targets.clone(), + }, + ) + .await?; let mut js = JoinSet::new(); for timeline in &timelines { diff --git a/pageserver/pagebench/src/cli.rs b/pageserver/pagebench/src/cli.rs new file mode 100644 index 0000000000..6026a191ee --- /dev/null +++ b/pageserver/pagebench/src/cli.rs @@ -0,0 +1 @@ +pub(crate) mod targets; diff --git a/pageserver/pagebench/src/cli/targets.rs b/pageserver/pagebench/src/cli/targets.rs new file mode 100644 index 0000000000..67822d1c7a --- /dev/null +++ b/pageserver/pagebench/src/cli/targets.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use pageserver_client::mgmt_api; +use tracing::info; + +use crate::util::{ + discover_timelines::get_pageserver_tenant_timelines, tenant_timeline_id::TenantTimelineId, +}; + +pub(crate) struct Spec { + pub(crate) limit_to_first_n_targets: Option, + pub(crate) targets: Option>, +} + +pub(crate) async fn discover( + api_client: &Arc, + spec: Spec, +) -> anyhow::Result> { + let mut timelines = if let Some(targets) = spec.targets { + targets + } else { + get_pageserver_tenant_timelines(api_client).await? + }; + + if let Some(limit) = spec.limit_to_first_n_targets { + timelines.sort(); // for determinism + timelines.truncate(limit); + if timelines.len() < limit { + anyhow::bail!("pageserver has less than limit_to_first_n_targets={limit} tenants"); + } + } + + info!("timelines:\n{:?}", timelines); + info!("number of timelines:\n{:?}", timelines.len()); + + Ok(timelines) +} diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs index 6cea13323d..fac3791db5 100644 --- a/pageserver/pagebench/src/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/getpage_latest_lsn.rs @@ -4,7 +4,7 @@ use pageserver::pgdatadir_mapping::key_to_rel_block; use pageserver::repository; use pageserver_api::key::is_rel_block_key; use pageserver_client::page_service::RelTagBlockNo; -use utils::id::TenantId; + use utils::lsn::Lsn; use rand::prelude::*; @@ -22,6 +22,8 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use crate::cli; + use crate::util::tenant_timeline_id::TenantTimelineId; /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. @@ -31,6 +33,8 @@ pub(crate) struct Args { mgmt_api_endpoint: String, #[clap(long, default_value = "postgres://postgres@localhost:64000")] page_service_connstring: String, + #[clap(long)] + pageserver_jwt: Option, #[clap(long, default_value = "1")] num_clients: NonZeroUsize, #[clap(long)] @@ -195,51 +199,18 @@ async fn main_impl( let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( args.mgmt_api_endpoint.clone(), - None, // TODO: support jwt in args + args.pageserver_jwt.as_deref(), )); // discover targets - let mut timelines: Vec = Vec::new(); - if args.targets.is_some() { - timelines = args.targets.clone().unwrap(); - } else { - let mut tenants: Vec = Vec::new(); - for ti in mgmt_api_client.list_tenants().await? { - if !ti.id.is_unsharded() { - anyhow::bail!( - "only unsharded tenants are supported at this time: {}", - ti.id - ); - } - tenants.push(ti.id.tenant_id) - } - let mut js = JoinSet::new(); - for tenant_id in tenants { - js.spawn({ - let mgmt_api_client = Arc::clone(&mgmt_api_client); - async move { - ( - tenant_id, - mgmt_api_client.tenant_details(tenant_id).await.unwrap(), - ) - } - }); - } - while let Some(res) = js.join_next().await { - let (tenant_id, details) = res.unwrap(); - for timeline_id in details.timelines { - timelines.push(TenantTimelineId { - tenant_id, - timeline_id, - }); - } - } - } - - info!("timelines:\n{:?}", timelines); - info!("number of timelines:\n{:?}", timelines.len()); - - + let timelines: Vec = cli::targets::discover( + &mgmt_api_client, + cli::targets::Spec { + limit_to_first_n_targets: args.limit_to_first_n_targets, + targets: args.targets.clone(), + }, + ) + .await?; let mut js = JoinSet::new(); for timeline in &timelines { diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs index 8524aa21b0..8e3a446d9d 100644 --- a/pageserver/pagebench/src/main.rs +++ b/pageserver/pagebench/src/main.rs @@ -1,5 +1,6 @@ use clap::Parser; +pub(crate) mod cli; pub(crate) mod util; mod basebackup; diff --git a/pageserver/pagebench/src/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/trigger_initial_size_calculation.rs index 504492b3df..ba0496f733 100644 --- a/pageserver/pagebench/src/trigger_initial_size_calculation.rs +++ b/pageserver/pagebench/src/trigger_initial_size_calculation.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use humantime::Duration; use tokio::task::JoinSet; -use tracing::info; -use utils::{id::TenantId, logging}; -use crate::util::tenant_timeline_id::TenantTimelineId; +use utils::logging; + +use crate::{cli, util::tenant_timeline_id::TenantTimelineId}; #[derive(clap::Parser)] pub(crate) struct Args { @@ -20,7 +20,8 @@ pub(crate) struct Args { help = "if specified, poll mgmt api to check whether init logical size calculation has completed" )] poll_for_completion: Option, - + #[clap(long)] + limit_to_first_n_targets: Option, targets: Option>, } @@ -46,48 +47,18 @@ async fn main_impl(args: Args) -> anyhow::Result<()> { let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( args.mgmt_api_endpoint.clone(), - None, // TODO: support jwt in args + args.pageserver_jwt.as_deref(), )); // discover targets - let mut timelines: Vec = Vec::new(); - if args.targets.is_some() { - timelines = args.targets.clone().unwrap(); - } else { - let mut tenants: Vec = Vec::new(); - for ti in mgmt_api_client.list_tenants().await? { - if !ti.id.is_unsharded() { - anyhow::bail!( - "only unsharded tenants are supported at this time: {}", - ti.id - ); - } - tenants.push(ti.id.tenant_id) - } - let mut js = JoinSet::new(); - for tenant_id in tenants { - js.spawn({ - let mgmt_api_client = Arc::clone(&mgmt_api_client); - async move { - ( - tenant_id, - mgmt_api_client.tenant_details(tenant_id).await.unwrap(), - ) - } - }); - } - while let Some(res) = js.join_next().await { - let (tenant_id, details) = res.unwrap(); - for timeline_id in details.timelines { - timelines.push(TenantTimelineId { - tenant_id, - timeline_id, - }); - } - } - } - - info!("timelines:\n{:?}", timelines); + let timelines: Vec = cli::targets::discover( + &mgmt_api_client, + cli::targets::Spec { + limit_to_first_n_targets: args.limit_to_first_n_targets, + targets: args.targets.clone(), + }, + ) + .await?; // kick it off diff --git a/pageserver/pagebench/src/util.rs b/pageserver/pagebench/src/util.rs index b9d857562f..d3ade58d33 100644 --- a/pageserver/pagebench/src/util.rs +++ b/pageserver/pagebench/src/util.rs @@ -1,2 +1,3 @@ pub(crate) mod connstring; +pub(crate) mod discover_timelines; pub(crate) mod tenant_timeline_id; diff --git a/pageserver/pagebench/src/util/discover_timelines.rs b/pageserver/pagebench/src/util/discover_timelines.rs new file mode 100644 index 0000000000..7464c2193a --- /dev/null +++ b/pageserver/pagebench/src/util/discover_timelines.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use pageserver_client::mgmt_api; +use tokio::task::JoinSet; +use utils::id::TenantId; + +use super::tenant_timeline_id::TenantTimelineId; + +pub(crate) async fn get_pageserver_tenant_timelines( + api_client: &Arc, +) -> anyhow::Result> { + let mut timelines: Vec = Vec::new(); + let mut tenants: Vec = Vec::new(); + for ti in api_client.list_tenants().await? { + if !ti.id.is_unsharded() { + anyhow::bail!( + "only unsharded tenants are supported at this time: {}", + ti.id + ); + } + tenants.push(ti.id.tenant_id) + } + let mut js = JoinSet::new(); + for tenant_id in tenants { + js.spawn({ + let mgmt_api_client = Arc::clone(api_client); + async move { + ( + tenant_id, + mgmt_api_client.tenant_details(tenant_id).await.unwrap(), + ) + } + }); + } + while let Some(res) = js.join_next().await { + let (tenant_id, details) = res.unwrap(); + for timeline_id in details.timelines { + timelines.push(TenantTimelineId { + tenant_id, + timeline_id, + }); + } + } + Ok(timelines) +} diff --git a/pageserver/pagebench/src/util/tenant_timeline_id.rs b/pageserver/pagebench/src/util/tenant_timeline_id.rs index 8bcbac8aef..dfc219d1ce 100644 --- a/pageserver/pagebench/src/util/tenant_timeline_id.rs +++ b/pageserver/pagebench/src/util/tenant_timeline_id.rs @@ -3,7 +3,7 @@ use std::str::FromStr; use anyhow::Context; use utils::id::{TenantId, TimelineId}; -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord)] pub(crate) struct TenantTimelineId { pub(crate) tenant_id: TenantId, pub(crate) timeline_id: TimelineId,