This commit is contained in:
Christian Schwarz
2023-12-13 12:02:58 +00:00
parent b3c811731a
commit d5bb030ea1
7 changed files with 197 additions and 44 deletions

View File

@@ -375,6 +375,14 @@ pub struct TenantInfo {
pub generation: Option<u32>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantDetails {
#[serde(flatten)]
pub tenant_info: TenantInfo,
pub timelines: Vec<TimelineId>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {

View File

@@ -36,6 +36,8 @@ pub(crate) struct Args {
runtime: Option<humantime::Duration>,
#[clap(long)]
per_target_rate_limit: Option<usize>,
#[clap(long)]
limit_to_first_n_targets: Option<usize>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -213,23 +215,26 @@ async fn main_impl(
async move {
(
tenant_id,
mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
)
}
});
}
while let Some(res) = js.join_next().await {
let (tenant_id, tl_infos) = res.unwrap();
for tl in tl_infos {
let (tenant_id, details) = res.unwrap();
for timeline_id in details.timelines {
timelines.push(TenantTimelineId {
tenant_id,
timeline_id: tl.timeline_id,
timeline_id,
});
}
}
}
info!("timelines:\n{:?}", timelines);
info!("number of timelines:\n{:?}", timelines.len());
let mut js = JoinSet::new();
for timeline in &timelines {
@@ -345,40 +350,42 @@ async fn main_impl(
Some(rps_limit) => Box::pin(async move {
let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
let make_timeline_task: &dyn Fn(TenantTimelineId) -> Pin<Box<dyn Send + Future<Output = ()>>> =
&|timeline| {
let sender = work_senders.get(&timeline).unwrap();
let ranges: Vec<KeyRange> = all_ranges
.iter()
.filter(|r| r.timeline == timeline)
.cloned()
.collect();
let weights = rand::distributions::weighted::WeightedIndex::new(
ranges.iter().map(|v| v.len()),
)
.unwrap();
let make_timeline_task: &dyn Fn(
TenantTimelineId,
)
-> Pin<Box<dyn Send + Future<Output = ()>>> = &|timeline| {
let sender = work_senders.get(&timeline).unwrap();
let ranges: Vec<KeyRange> = all_ranges
.iter()
.filter(|r| r.timeline == timeline)
.cloned()
.collect();
let weights = rand::distributions::weighted::WeightedIndex::new(
ranges.iter().map(|v| v.len()),
)
.unwrap();
Box::pin(async move {
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(
/* TODO review this choice */
tokio::time::MissedTickBehavior::Burst,
);
loop {
ticker.tick().await;
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) = key_to_rel_block(key)
.expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
})
};
Box::pin(async move {
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(
/* TODO review this choice */
tokio::time::MissedTickBehavior::Burst,
);
loop {
ticker.tick().await;
let (range, key) = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
let (rel_tag, block_no) = key_to_rel_block(key)
.expect("we filter non-rel-block keys out above");
(r, RelTagBlockNo { rel_tag, block_no })
};
sender.send((key, range.timeline_lsn)).await.ok().unwrap();
}
})
};
let tasks: Vec<_> = work_senders
.keys()

View File

@@ -4,12 +4,14 @@ pub(crate) mod util;
mod basebackup;
mod getpage_latest_lsn;
mod trigger_initial_size_calculation;
/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
enum Args {
GetPageLatestLsn(getpage_latest_lsn::Args),
Basebackup(basebackup::Args),
TriggerInitialSizeCalculation(trigger_initial_size_calculation::Args)
}
fn main() {
@@ -17,6 +19,8 @@ fn main() {
match args {
Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
Args::Basebackup(args) => basebackup::main(args),
Args::TriggerInitialSizeCalculation(args) => trigger_initial_size_calculation::main(args),
}
.unwrap()
}

View File

@@ -0,0 +1,111 @@
use std::sync::Arc;
use tokio::task::{JoinError, JoinSet};
use crate::util::tenant_timeline_id::TenantTimelineId;
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "localhost:64000")]
page_service_host_port: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, help = "if specified, poll mgmt api to check whether init logical size calculation has completed")]
poll_for_completion: Option<Duration>,
targets: Option<Vec<TenantTimelineId>>,
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let _guard = logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,
)
.unwrap();
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let main_task = rt.spawn(main_impl(args));
rt.block_on(main_task).unwrap()
}
async fn main_impl(args: Args) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
None, // TODO: support jwt in args
));
// discover targets
let mut timelines: Vec<TenantTimelineId> = Vec::new();
if args.targets.is_some() {
timelines = args.targets.clone().unwrap();
} else {
let tenants: Vec<TenantId> = mgmt_api_client
.list_tenants()
.await?
.into_iter()
.map(|ti| ti.id)
.collect();
let mut js = JoinSet::new();
for tenant_id in tenants {
js.spawn({
let mgmt_api_client = Arc::clone(&mgmt_api_client);
async move {
(
tenant_id,
mgmt_api_client.tenant_details(tenant_id).await.unwrap(),
)
}
});
}
while let Some(res) = js.join_next().await {
let (tenant_id, details) = res.unwrap();
for timeline_id in details.timelines {
timelines.push(TenantTimelineId {
tenant_id,
timeline_id,
});
}
}
}
info!("timelines:\n{:?}", timelines);
// kick it off
let mut js = JoinSet::new();
for tl in timelines {
let mgmt_api_client = Arc::clone(&mgmt_api_client);
js.spawn(async move {
// TODO: API to explicitly trigger initial logical size computation
let mut info = mgmt_api_client
.timeline_info(tl.tenant_id, tl.timeline_id)
.await.unwrap();
if let Some(period) = args.poll_for_completion {
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay)
while info.current_logical_size{
ticker.tick().await;
mgmt_api_client.timeline_info(tenant_id, timeline_id)
}
}
})
}
while let Some(res) = js.join_next().await {
let _: () = res.unwrap();
}
}

View File

@@ -28,6 +28,19 @@ impl Client {
Ok(serde_json::from_slice(&body)?)
}
pub async fn tenant_details(
&self,
tenant_id: TenantId,
) -> anyhow::Result<pageserver_api::models::TenantDetails> {
let uri = Uri::try_from(format!("{}/v1/tenant/{tenant_id}", self.mgmt_api_endpoint))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn list_timelines(
&self,
tenant_id: TenantId,
@@ -45,7 +58,9 @@ impl Client {
}
pub async fn timeline_info(
&self, tenant_id: TenantId, timeline_id: TimelineId,
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<pageserver_api::models::TimelineInfo> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",

View File

@@ -12,6 +12,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
@@ -793,12 +794,15 @@ async fn tenant_status(
}
let state = tenant.current_state();
Result::<_, ApiError>::Ok(TenantInfo {
id: tenant_id,
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
Result::<_, ApiError>::Ok(TenantDetails {
tenant_info: TenantInfo {
id: tenant_id,
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
},
timelines: tenant.list_timeline_ids(),
})
}
.instrument(info_span!("tenant_status_handler", %tenant_id))

View File

@@ -1404,6 +1404,10 @@ impl Tenant {
.collect()
}
pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
self.timelines.lock().unwrap().keys().cloned().collect()
}
/// This is used to create the initial 'main' timeline during bootstrapping,
/// or when importing a new base backup. The caller is expected to load an
/// initial image of the datadir to the new timeline after this.