diff --git a/pageserver/src/bin/getpage_bench_libpq.rs b/pageserver/src/bin/getpage_bench_libpq.rs index 5746bf0493..8a1718a73b 100644 --- a/pageserver/src/bin/getpage_bench_libpq.rs +++ b/pageserver/src/bin/getpage_bench_libpq.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use clap::Parser; use futures::{SinkExt, TryStreamExt}; use hyper::client::conn::Parts; @@ -194,7 +195,7 @@ fn timeline( let tenant_id = tenant_id.clone(); let timeline_id = timeline_id.clone(); let task = tokio::spawn(async move { - let mut client = getpage_client::Client::new(tenant_id, timeline_id) + let mut client = getpage_client::Client::new(tenant_id.clone(), timeline_id.clone()) .await .unwrap(); for i in 0..args.num_requests { @@ -211,8 +212,11 @@ fn timeline( let (rel_tag, block_no) = key_to_rel_block(key).expect("we just checked"); RelTagBlockNo { rel_tag, block_no } }; - client.getpage(key, lsn).await.unwrap(); + client.getpage(key, lsn).await.with_context(|| { + format!("getpage for tenant {} timeline {}", tenant_id, timeline_id) + }).unwrap(); } + client.shutdown().await; }); tasks.push(task); } @@ -239,12 +243,14 @@ mod getpage_client { }; use tokio::task::JoinHandle; use tokio_stream::StreamExt; + use tokio_util::sync::CancellationToken; use utils::lsn::Lsn; use crate::RelTagBlockNo; pub(crate) struct Client { copy_both: Pin>>, + cancel_on_client_drop: Option, conn_task: JoinHandle<()>, } @@ -257,9 +263,19 @@ mod getpage_client { let (client, connection) = tokio_postgres::connect("host=localhost port=64000", postgres::NoTls).await?; - let conn_task = tokio::spawn(async move { - let res = connection.await; - res.unwrap(); + let conn_task_cancel = CancellationToken::new(); + let conn_task = tokio::spawn({ + let conn_task_cancel = conn_task_cancel.clone(); + async move { + tokio::select! { + _ = conn_task_cancel.cancelled() => { + return; + } + res = connection => { + res.unwrap(); + } + } + } }); let copy_both: tokio_postgres::CopyBothDuplex = client @@ -269,10 +285,16 @@ mod getpage_client { Ok(Self { copy_both: Box::pin(copy_both), conn_task, + cancel_on_client_drop: Some(conn_task_cancel.drop_guard()), }) } } + pub async fn shutdown(mut self) { + let _ = self.cancel_on_client_drop.take(); + self.conn_task.await.unwrap(); + } + pub async fn getpage( &mut self, key: RelTagBlockNo, @@ -298,7 +320,7 @@ mod getpage_client { PagestreamBeMessage::Exists(_) => todo!(), PagestreamBeMessage::Nblocks(_) => todo!(), PagestreamBeMessage::GetPage(p) => Ok(p), - PagestreamBeMessage::Error(_) => todo!(), + PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), PagestreamBeMessage::DbSize(_) => todo!(), } }