pq bench: proper shutdown

This commit is contained in:
Christian Schwarz
2023-11-02 17:07:00 +00:00
committed by Christian Schwarz
parent fefdecd13b
commit 4ea2834711

View File

@@ -1,3 +1,4 @@
use anyhow::Context;
use clap::Parser;
use futures::{SinkExt, TryStreamExt};
use hyper::client::conn::Parts;
@@ -194,7 +195,7 @@ fn timeline(
let tenant_id = tenant_id.clone();
let timeline_id = timeline_id.clone();
let task = tokio::spawn(async move {
let mut client = getpage_client::Client::new(tenant_id, timeline_id)
let mut client = getpage_client::Client::new(tenant_id.clone(), timeline_id.clone())
.await
.unwrap();
for i in 0..args.num_requests {
@@ -211,8 +212,11 @@ fn timeline(
let (rel_tag, block_no) = key_to_rel_block(key).expect("we just checked");
RelTagBlockNo { rel_tag, block_no }
};
client.getpage(key, lsn).await.unwrap();
client.getpage(key, lsn).await.with_context(|| {
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
}).unwrap();
}
client.shutdown().await;
});
tasks.push(task);
}
@@ -239,12 +243,14 @@ mod getpage_client {
};
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;
use crate::RelTagBlockNo;
pub(crate) struct Client {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
@@ -257,9 +263,19 @@ mod getpage_client {
let (client, connection) =
tokio_postgres::connect("host=localhost port=64000", postgres::NoTls).await?;
let conn_task = tokio::spawn(async move {
let res = connection.await;
res.unwrap();
let conn_task_cancel = CancellationToken::new();
let conn_task = tokio::spawn({
let conn_task_cancel = conn_task_cancel.clone();
async move {
tokio::select! {
_ = conn_task_cancel.cancelled() => {
return;
}
res = connection => {
res.unwrap();
}
}
}
});
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = client
@@ -269,10 +285,16 @@ mod getpage_client {
Ok(Self {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
})
}
}
pub async fn shutdown(mut self) {
let _ = self.cancel_on_client_drop.take();
self.conn_task.await.unwrap();
}
pub async fn getpage(
&mut self,
key: RelTagBlockNo,
@@ -298,7 +320,7 @@ mod getpage_client {
PagestreamBeMessage::Exists(_) => todo!(),
PagestreamBeMessage::Nblocks(_) => todo!(),
PagestreamBeMessage::GetPage(p) => Ok(p),
PagestreamBeMessage::Error(_) => todo!(),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::DbSize(_) => todo!(),
}
}