fixup 'pagebench: getpage: better logging about when load starts & cancellation'

This commit is contained in:
Christian Schwarz
2024-01-10 11:27:36 +00:00
parent 4dd33718e5
commit 5ec8e1a6fe

View File

@@ -216,7 +216,6 @@ async fn main_impl(
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_client_tasks + num_live_stats_dump + num_work_sender_tasks + num_main_impl,
));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
tokio::spawn({
let stats = Arc::clone(&live_stats);
@@ -236,25 +235,26 @@ async fn main_impl(
}
});
let mut work_senders = HashMap::new();
let cancel = CancellationToken::new();
let mut work_senders: HashMap<TenantTimelineId, _> = HashMap::new();
let mut tasks = Vec::new();
let cancel_clients = CancellationToken::new();
for tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
work_senders.insert(*tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
Arc::clone(&live_stats),
cancel_clients.child_token(),
cancel.clone(),
)));
}
let work_sender: Pin<Box<dyn Send + Future<Output = ()>>> = {
let start_work_barrier = start_work_barrier.clone();
let cancel = cancel.clone();
match args.per_target_rate_limit {
None => Box::pin(async move {
let weights = rand::distributions::weighted::WeightedIndex::new(
@@ -264,7 +264,7 @@ async fn main_impl(
start_work_barrier.wait().await;
loop {
while !cancel.is_cancelled() {
let (timeline, req) = {
let mut rng = rand::thread_rng();
let r = &all_ranges[weights.sample(&mut rng)];
@@ -284,12 +284,13 @@ async fn main_impl(
};
let sender = work_senders.get(&timeline).unwrap();
// TODO: what if this blocks?
sender.send(req).await.ok().unwrap();
if sender.send(req).await.is_err() {
assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
}
}
}),
Some(rps_limit) => Box::pin(async move {
let period = Duration::from_secs_f64(1.0 / (rps_limit as f64));
let make_timeline_task: &dyn Fn(
TenantTimelineId,
)
@@ -305,13 +306,14 @@ async fn main_impl(
)
.unwrap();
let cancel = cancel.clone();
Box::pin(async move {
let mut ticker = tokio::time::interval(period);
ticker.set_missed_tick_behavior(
/* TODO review this choice */
tokio::time::MissedTickBehavior::Burst,
);
loop {
while !cancel.is_cancelled() {
ticker.tick().await;
let req = {
let mut rng = rand::thread_rng();
@@ -328,14 +330,16 @@ async fn main_impl(
blkno: block_no,
}
};
sender.send(req).await.ok().unwrap();
if sender.send(req).await.is_err() {
assert!(cancel.is_cancelled(), "client has gone away unexpectedly");
}
}
})
};
let tasks: Vec<_> = work_senders
.keys()
.map(|tl| make_timeline_task(**tl))
.map(|tl| make_timeline_task(*tl))
.collect();
start_work_barrier.wait().await;
@@ -345,24 +349,23 @@ async fn main_impl(
}
};
info!("waiting for everything to become ready");
start_work_barrier.wait().await;
info!("workload is starting");
let work_sender_task = tokio::spawn(work_sender);
if let Some(runtime) = args.runtime {
match tokio::time::timeout(runtime.into(), work_sender).await {
Ok(()) => unreachable!("work sender never terminates"),
Err(_timeout) => {
info!("runtime over, cancelling clients");
// this implicitly drops the work_senders, making all the clients exit eventually
// however, work_senders can have a queue, so, also add a cancellation token
cancel_clients.cancel();
}
}
info!("waiting for everything to become ready");
start_work_barrier.wait().await;
info!("work started");
tokio::time::sleep(runtime.into()).await;
info!("runtime over, signalling cancellation");
cancel.cancel();
work_sender_task.await.unwrap();
info!("work sender exited");
} else {
work_sender.await;
work_sender_task.await.unwrap();
unreachable!("work sender never terminates");
}
info!("joining clients");
for t in tasks {
t.await.unwrap();
}
@@ -392,12 +395,9 @@ async fn client(
timeline: TenantTimelineId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<PagestreamGetPageRequest>,
all_work_done_barrier: Arc<Barrier>,
live_stats: Arc<LiveStats>,
cancel: CancellationToken,
) {
start_work_barrier.wait().await;
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
@@ -406,6 +406,8 @@ async fn client(
.await
.unwrap();
start_work_barrier.wait().await;
while let Some(req) =
tokio::select! { work = work.recv() => { work } , _ = cancel.cancelled() => { return; } }
{
@@ -423,6 +425,4 @@ async fn client(
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
});
}
all_work_done_barrier.wait().await;
}