diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs index 7a6e43c79c..8524aa21b0 100644 --- a/pageserver/pagebench/src/main.rs +++ b/pageserver/pagebench/src/main.rs @@ -4,12 +4,14 @@ pub(crate) mod util; mod basebackup; mod getpage_latest_lsn; +mod trigger_initial_size_calculation; /// Component-level performance test for pageserver. #[derive(clap::Parser)] enum Args { Basebackup(basebackup::Args), GetPageLatestLsn(getpage_latest_lsn::Args), + TriggerInitialSizeCalculation(trigger_initial_size_calculation::Args), } fn main() { @@ -17,6 +19,7 @@ fn main() { match args { Args::Basebackup(args) => basebackup::main(args), Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args), + Args::TriggerInitialSizeCalculation(args) => trigger_initial_size_calculation::main(args), } .unwrap() } diff --git a/pageserver/pagebench/src/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/trigger_initial_size_calculation.rs new file mode 100644 index 0000000000..504492b3df --- /dev/null +++ b/pageserver/pagebench/src/trigger_initial_size_calculation.rs @@ -0,0 +1,119 @@ +use std::sync::Arc; + +use humantime::Duration; +use tokio::task::JoinSet; +use tracing::info; +use utils::{id::TenantId, logging}; + +use crate::util::tenant_timeline_id::TenantTimelineId; + +#[derive(clap::Parser)] +pub(crate) struct Args { + #[clap(long, default_value = "http://localhost:9898")] + mgmt_api_endpoint: String, + #[clap(long, default_value = "localhost:64000")] + page_service_host_port: String, + #[clap(long)] + pageserver_jwt: Option, + #[clap( + long, + help = "if specified, poll mgmt api to check whether init logical size calculation has completed" + )] + poll_for_completion: Option, + + targets: Option>, +} + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { + logging::init( + logging::LogFormat::Plain, + logging::TracingErrorLayerEnablement::Disabled, + logging::Output::Stderr, + ) + .unwrap(); + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let main_task = rt.spawn(main_impl(args)); + rt.block_on(main_task).unwrap() +} + +async fn main_impl(args: Args) -> anyhow::Result<()> { + let args: &'static Args = Box::leak(Box::new(args)); + + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( + args.mgmt_api_endpoint.clone(), + None, // TODO: support jwt in args + )); + + // discover targets + let mut timelines: Vec = Vec::new(); + if args.targets.is_some() { + timelines = args.targets.clone().unwrap(); + } else { + let mut tenants: Vec = Vec::new(); + for ti in mgmt_api_client.list_tenants().await? { + if !ti.id.is_unsharded() { + anyhow::bail!( + "only unsharded tenants are supported at this time: {}", + ti.id + ); + } + tenants.push(ti.id.tenant_id) + } + let mut js = JoinSet::new(); + for tenant_id in tenants { + js.spawn({ + let mgmt_api_client = Arc::clone(&mgmt_api_client); + async move { + ( + tenant_id, + mgmt_api_client.tenant_details(tenant_id).await.unwrap(), + ) + } + }); + } + while let Some(res) = js.join_next().await { + let (tenant_id, details) = res.unwrap(); + for timeline_id in details.timelines { + timelines.push(TenantTimelineId { + tenant_id, + timeline_id, + }); + } + } + } + + info!("timelines:\n{:?}", timelines); + + // kick it off + + let mut js = JoinSet::new(); + for tl in timelines { + let mgmt_api_client = Arc::clone(&mgmt_api_client); + js.spawn(async move { + // TODO: API to explicitly trigger initial logical size computation + let mut _info = mgmt_api_client + .timeline_info(tl.tenant_id, tl.timeline_id) + .await + .unwrap(); + + if let Some(_period) = args.poll_for_completion { + todo!("unimplemented: need to rebase for this"); + // let mut ticker = tokio::time::interval(period); + // ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay) + // while info.current_logical_size_is_accurate { + // ticker.tick().await; + // mgmt_api_client.timeline_info(tenant_id, timeline_id) + // } + } + }); + } + while let Some(res) = js.join_next().await { + let _: () = res.unwrap(); + } + Ok(()) +}