From d5bb030ea112ffd5552647b1d29cc3f83697be24 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 13 Dec 2023 12:02:58 +0000
Subject: [PATCH] WIP

---
 libs/pageserver_api/src/models.rs                  |   8 ++
 .../pagebench/src/getpage_latest_lsn.rs            |  81 +++++------
 pageserver/pagebench/src/main.rs                   |   4 +
 .../src/trigger_initial_size_calculation.rs        | 111 ++++++++++++++++++
 pageserver/src/client/mgmt_api.rs                  |  17 ++-
 pageserver/src/http/routes.rs                      |  16 ++-
 pageserver/src/tenant.rs                           |   4 +
 7 files changed, 197 insertions(+), 44 deletions(-)
 create mode 100644 pageserver/pagebench/src/trigger_initial_size_calculation.rs

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 8adb581707..c1c1917da5 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -375,6 +375,14 @@ pub struct TenantInfo {
     pub generation: Option<u32>,
 }
 
+#[derive(Serialize, Deserialize, Clone)]
+pub struct TenantDetails {
+    #[serde(flatten)]
+    pub tenant_info: TenantInfo,
+
+    pub timelines: Vec<TimelineId>,
+}
+
 /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct TimelineInfo {
diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs
index cb5fbb7abd..e83a8e66ca 100644
--- a/pageserver/pagebench/src/getpage_latest_lsn.rs
+++ b/pageserver/pagebench/src/getpage_latest_lsn.rs
@@ -36,6 +36,8 @@ pub(crate) struct Args {
     runtime: Option<humantime::Duration>,
     #[clap(long)]
     per_target_rate_limit: Option<usize>,
+    #[clap(long)]
+    limit_to_first_n_targets: Option<usize>,
     targets: Option<Vec<TenantTimelineId>>,
 }
 
@@ -213,23 +215,26 @@ async fn main_impl(
                 async move {
                     (
                         tenant_id,
-                        mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
+                        mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
                     )
                 }
             });
         }
         while let Some(res) = js.join_next().await {
-            let (tenant_id, tl_infos) = res.unwrap();
-            for tl in tl_infos {
+            let (tenant_id, details) = res.unwrap();
+            for timeline_id in details.timelines {
                 timelines.push(TenantTimelineId {
                     tenant_id,
-                    timeline_id: tl.timeline_id,
+                    timeline_id,
                 });
             }
         }
     }
 
     info!("timelines:\n{:?}", timelines);
+    info!("number of timelines: {}", timelines.len());
+
 
     let mut js = JoinSet::new();
     for timeline in &timelines {
@@ -345,40 +350,42 @@ async fn main_impl(
         Some(rps_limit) => Box::pin(async move {
             let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
-            let make_timeline_task: &dyn Fn(TenantTimelineId) -> Pin<Box<dyn Future<Output = ()>>> =
-                &|timeline| {
-                    let sender = work_senders.get(&timeline).unwrap();
-                    let ranges: Vec<KeyRange> = all_ranges
-                        .iter()
-                        .filter(|r| r.timeline == timeline)
-                        .cloned()
-                        .collect();
-                    let weights = rand::distributions::weighted::WeightedIndex::new(
-                        ranges.iter().map(|v| v.len()),
-                    )
-                    .unwrap();
+            let make_timeline_task: &dyn Fn(
+                TenantTimelineId,
+            )
+                -> Pin<Box<dyn Future<Output = ()>>> = &|timeline| {
+                let sender = work_senders.get(&timeline).unwrap();
+                let ranges: Vec<KeyRange> = all_ranges
+                    .iter()
+                    .filter(|r| r.timeline == timeline)
+                    .cloned()
+                    .collect();
+                let weights = rand::distributions::weighted::WeightedIndex::new(
+                    ranges.iter().map(|v| v.len()),
+                )
+                .unwrap();
 
-                    Box::pin(async move {
-                        let mut ticker = tokio::time::interval(period);
-                        ticker.set_missed_tick_behavior(
-                            /* TODO review this choice */
-                            tokio::time::MissedTickBehavior::Burst,
-                        );
-                        loop {
-                            ticker.tick().await;
-                            let (range, key) = {
-                                let mut rng = rand::thread_rng();
-                                let r = &ranges[weights.sample(&mut rng)];
-                                let key: i128 = rng.gen_range(r.start..r.end);
-                                let key = repository::Key::from_i128(key);
-                                let (rel_tag, block_no) = key_to_rel_block(key)
-                                    .expect("we filter non-rel-block keys out above");
-                                (r, RelTagBlockNo { rel_tag, block_no })
-                            };
-                            sender.send((key, range.timeline_lsn)).await.ok().unwrap();
-                        }
-                    })
-                };
+                Box::pin(async move {
+                    let mut ticker = tokio::time::interval(period);
+                    ticker.set_missed_tick_behavior(
+                        /* TODO review this choice */
+                        tokio::time::MissedTickBehavior::Burst,
+                    );
+                    loop {
+                        ticker.tick().await;
+                        let (range, key) = {
+                            let mut rng = rand::thread_rng();
+                            let r = &ranges[weights.sample(&mut rng)];
+                            let key: i128 = rng.gen_range(r.start..r.end);
+                            let key = repository::Key::from_i128(key);
+                            let (rel_tag, block_no) = key_to_rel_block(key)
+                                .expect("we filter non-rel-block keys out above");
+                            (r, RelTagBlockNo { rel_tag, block_no })
+                        };
+                        sender.send((key, range.timeline_lsn)).await.ok().unwrap();
+                    }
+                })
+            };
 
             let tasks: Vec<_> = work_senders
                 .keys()
diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs
index 1119d9f194..3f4e226f05 100644
--- a/pageserver/pagebench/src/main.rs
+++ b/pageserver/pagebench/src/main.rs
@@ -4,12 +4,14 @@ pub(crate) mod util;
 
 mod basebackup;
 mod getpage_latest_lsn;
+mod trigger_initial_size_calculation;
 
 /// Component-level performance test for pageserver.
 #[derive(clap::Parser)]
 enum Args {
     GetPageLatestLsn(getpage_latest_lsn::Args),
     Basebackup(basebackup::Args),
+    TriggerInitialSizeCalculation(trigger_initial_size_calculation::Args),
 }
 
 fn main() {
@@ -17,6 +19,8 @@ fn main() {
     match args {
         Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
         Args::Basebackup(args) => basebackup::main(args),
+        Args::TriggerInitialSizeCalculation(args) => trigger_initial_size_calculation::main(args),
+
     }
     .unwrap()
 }
diff --git a/pageserver/pagebench/src/trigger_initial_size_calculation.rs b/pageserver/pagebench/src/trigger_initial_size_calculation.rs
new file mode 100644
index 0000000000..f233551e44
--- /dev/null
+++ b/pageserver/pagebench/src/trigger_initial_size_calculation.rs
@@ -0,0 +1,111 @@
+use std::sync::Arc;
+
+use tokio::task::JoinSet;
+use tracing::info;
+use utils::id::TenantId;
+use utils::logging;
+
+use crate::util::tenant_timeline_id::TenantTimelineId;
+
+#[derive(clap::Parser)]
+pub(crate) struct Args {
+    #[clap(long, default_value = "http://localhost:9898")]
+    mgmt_api_endpoint: String,
+    #[clap(long, default_value = "localhost:64000")]
+    page_service_host_port: String,
+    #[clap(long)]
+    pageserver_jwt: Option<String>,
+    #[clap(long, help = "if specified, poll mgmt api to check whether init logical size calculation has completed")]
+    poll_for_completion: Option<humantime::Duration>,
+
+    targets: Option<Vec<TenantTimelineId>>,
+}
+
+pub(crate) fn main(args: Args) -> anyhow::Result<()> {
+    let _guard = logging::init(
+        logging::LogFormat::Plain,
+        logging::TracingErrorLayerEnablement::Disabled,
+        logging::Output::Stderr,
+    )
+    .unwrap();
+
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap();
+
+    let main_task = rt.spawn(main_impl(args));
+    rt.block_on(main_task).unwrap()
+}
+
+async fn main_impl(args: Args) -> anyhow::Result<()> {
+    let args: &'static Args = Box::leak(Box::new(args));
+
+    let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
+        args.mgmt_api_endpoint.clone(),
+        None, // TODO: support jwt in args
+    ));
+
+    // discover targets
+    let mut timelines: Vec<TenantTimelineId> = Vec::new();
+    if args.targets.is_some() {
+        timelines = args.targets.clone().unwrap();
+    } else {
+        let tenants: Vec<TenantId> = mgmt_api_client
+            .list_tenants()
+            .await?
+            .into_iter()
+            .map(|ti| ti.id)
+            .collect();
+        let mut js = JoinSet::new();
+        for tenant_id in tenants {
+            js.spawn({
+                let mgmt_api_client = Arc::clone(&mgmt_api_client);
+                async move {
+                    (
+                        tenant_id,
+                        mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
+                    )
+                }
+            });
+        }
+        while let Some(res) = js.join_next().await {
+            let (tenant_id, details) = res.unwrap();
+            for timeline_id in details.timelines {
+                timelines.push(TenantTimelineId {
+                    tenant_id,
+                    timeline_id,
+                });
+            }
+        }
+    }
+
+    info!("timelines:\n{:?}", timelines);
+
+    // kick it off
+
+    let mut js = JoinSet::new();
+    for tl in timelines {
+        let mgmt_api_client = Arc::clone(&mgmt_api_client);
+        js.spawn(async move {
+            // TODO: API to explicitly trigger initial logical size computation
+            let mut info = mgmt_api_client
+                .timeline_info(tl.tenant_id, tl.timeline_id)
+                .await
+                .unwrap();
+
+            if let Some(period) = &args.poll_for_completion {
+                let mut ticker = tokio::time::interval(**period);
+                ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+                // poll the timeline detail endpoint until an initial logical size is
+                // reported (TODO: find a better completion signal)
+                while info.current_logical_size.is_none() {
+                    ticker.tick().await;
+                    info = mgmt_api_client
+                        .timeline_info(tl.tenant_id, tl.timeline_id)
+                        .await
+                        .unwrap();
+                }
+            }
+        });
+    }
+    while let Some(res) = js.join_next().await {
+        let _: () = res.unwrap();
+    }
+
+    Ok(())
+}
diff --git a/pageserver/src/client/mgmt_api.rs b/pageserver/src/client/mgmt_api.rs
index c9a61ff64c..5dd9fb08ae 100644
--- a/pageserver/src/client/mgmt_api.rs
+++ b/pageserver/src/client/mgmt_api.rs
@@ -28,6 +28,19 @@ impl Client {
         Ok(serde_json::from_slice(&body)?)
     }
 
+    pub async fn tenant_details(
+        &self,
+        tenant_id: TenantId,
+    ) -> anyhow::Result<pageserver_api::models::TenantDetails> {
+        let uri = Uri::try_from(format!("{}/v1/tenant/{tenant_id}", self.mgmt_api_endpoint))?;
+        let resp = self.get(uri).await?;
+        if !resp.status().is_success() {
+            anyhow::bail!("status error: {}", resp.status());
+        }
+        let body = hyper::body::to_bytes(resp).await?;
+        Ok(serde_json::from_slice(&body)?)
+    }
+
     pub async fn list_timelines(
         &self,
         tenant_id: TenantId,
@@ -45,7 +58,9 @@ impl Client {
     }
 
     pub async fn timeline_info(
-        &self, tenant_id: TenantId, timeline_id: TimelineId,
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
     ) -> anyhow::Result<pageserver_api::models::TimelineInfo> {
         let uri = Uri::try_from(format!(
             "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index bc839f8aba..759df34d7e 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -12,6 +12,7 @@ use hyper::header;
 use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
 use metrics::launch_timestamp::LaunchTimestamp;
+use pageserver_api::models::TenantDetails;
 use pageserver_api::models::{
     DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
     TenantLoadRequest, TenantLocationConfigRequest,
@@ -793,12 +794,15 @@ async fn tenant_status(
         }
 
         let state = tenant.current_state();
-        Result::<_, ApiError>::Ok(TenantInfo {
-            id: tenant_id,
-            state: state.clone(),
-            current_physical_size: Some(current_physical_size),
-            attachment_status: state.attachment_status(),
-            generation: tenant.generation().into(),
+        Result::<_, ApiError>::Ok(TenantDetails {
+            tenant_info: TenantInfo {
+                id: tenant_id,
+                state: state.clone(),
+                current_physical_size: Some(current_physical_size),
+                attachment_status: state.attachment_status(),
+                generation: tenant.generation().into(),
+            },
+            timelines: tenant.list_timeline_ids(),
         })
     }
     .instrument(info_span!("tenant_status_handler", %tenant_id))
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 3da2d0d473..ba4c4e72f3 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -1404,6 +1404,10 @@ impl Tenant {
             .collect()
     }
 
+    pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
+        self.timelines.lock().unwrap().keys().cloned().collect()
+    }
+
     /// This is used to create the initial 'main' timeline during bootstrapping,
     /// or when importing a new base backup. The caller is expected to load an
     /// initial image of the datadir to the new timeline after this.
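Usage sketch, not part of the diff: the helper name `all_timelines` below is made up for illustration, but `Client::list_tenants`, the new `Client::tenant_details`, and the `TenantDetails` model are the ones used and added above. Because of `#[serde(flatten)]`, the JSON returned by GET /v1/tenant/{tenant_id} is the usual `TenantInfo` object with an extra `timelines` array, which is what `details.timelines` iterates over.

    use pageserver::client::mgmt_api::Client;
    use utils::id::{TenantId, TimelineId};

    /// Enumerate every timeline of every tenant via the extended
    /// GET /v1/tenant/{tenant_id} response.
    async fn all_timelines(client: &Client) -> anyhow::Result<Vec<(TenantId, TimelineId)>> {
        let mut out = Vec::new();
        for tenant in client.list_tenants().await? {
            // one request per tenant; the response now carries the timeline ids
            let details = client.tenant_details(tenant.id).await?;
            for timeline_id in details.timelines {
                out.push((tenant.id, timeline_id));
            }
        }
        Ok(out)
    }

This is the same discovery loop the two pagebench tools in this patch run when no explicit targets are passed; with clap's default name mangling the new tool should be reachable as the `trigger-initial-size-calculation` subcommand.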