mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
pagebench: WIP: command to trigger initial logical size calculation
This commit is contained in:
@@ -4,12 +4,14 @@ pub(crate) mod util;
|
||||
|
||||
mod basebackup;
|
||||
mod getpage_latest_lsn;
|
||||
mod trigger_initial_size_calculation;
|
||||
|
||||
/// Component-level performance test for pageserver.
|
||||
#[derive(clap::Parser)]
|
||||
enum Args {
|
||||
Basebackup(basebackup::Args),
|
||||
GetPageLatestLsn(getpage_latest_lsn::Args),
|
||||
TriggerInitialSizeCalculation(trigger_initial_size_calculation::Args),
|
||||
}
|
||||
|
||||
fn main() {
|
||||
@@ -17,6 +19,7 @@ fn main() {
|
||||
match args {
|
||||
Args::Basebackup(args) => basebackup::main(args),
|
||||
Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
|
||||
Args::TriggerInitialSizeCalculation(args) => trigger_initial_size_calculation::main(args),
|
||||
}
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
119
pageserver/pagebench/src/trigger_initial_size_calculation.rs
Normal file
119
pageserver/pagebench/src/trigger_initial_size_calculation.rs
Normal file
@@ -0,0 +1,119 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use humantime::Duration;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::info;
|
||||
use utils::{id::TenantId, logging};
|
||||
|
||||
use crate::util::tenant_timeline_id::TenantTimelineId;
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "localhost:64000")]
|
||||
page_service_host_port: String,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(
|
||||
long,
|
||||
help = "if specified, poll mgmt api to check whether init logical size calculation has completed"
|
||||
)]
|
||||
poll_for_completion: Option<Duration>,
|
||||
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
|
||||
logging::init(
|
||||
logging::LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stderr,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let main_task = rt.spawn(main_impl(args));
|
||||
rt.block_on(main_task).unwrap()
|
||||
}
|
||||
|
||||
async fn main_impl(args: Args) -> anyhow::Result<()> {
|
||||
let args: &'static Args = Box::leak(Box::new(args));
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
None, // TODO: support jwt in args
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let mut timelines: Vec<TenantTimelineId> = Vec::new();
|
||||
if args.targets.is_some() {
|
||||
timelines = args.targets.clone().unwrap();
|
||||
} else {
|
||||
let mut tenants: Vec<TenantId> = Vec::new();
|
||||
for ti in mgmt_api_client.list_tenants().await? {
|
||||
if !ti.id.is_unsharded() {
|
||||
anyhow::bail!(
|
||||
"only unsharded tenants are supported at this time: {}",
|
||||
ti.id
|
||||
);
|
||||
}
|
||||
tenants.push(ti.id.tenant_id)
|
||||
}
|
||||
let mut js = JoinSet::new();
|
||||
for tenant_id in tenants {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let (tenant_id, details) = res.unwrap();
|
||||
for timeline_id in details.timelines {
|
||||
timelines.push(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("timelines:\n{:?}", timelines);
|
||||
|
||||
// kick it off
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
for tl in timelines {
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
js.spawn(async move {
|
||||
// TODO: API to explicitly trigger initial logical size computation
|
||||
let mut _info = mgmt_api_client
|
||||
.timeline_info(tl.tenant_id, tl.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
if let Some(_period) = args.poll_for_completion {
|
||||
todo!("unimplemented: need to rebase for this");
|
||||
// let mut ticker = tokio::time::interval(period);
|
||||
// ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay)
|
||||
// while info.current_logical_size_is_accurate {
|
||||
// ticker.tick().await;
|
||||
// mgmt_api_client.timeline_info(tenant_id, timeline_id)
|
||||
// }
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let _: () = res.unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user