client & getpage bench: distinguish between page_service client and client in pagestream mode

This commit is contained in:
Christian Schwarz
2023-12-05 18:45:20 +00:00
parent 60cc3a3397
commit 4fc3596677
2 changed files with 73 additions and 23 deletions

View File

@@ -376,13 +376,14 @@ async fn client(
) {
start_work_barrier.wait().await;
let mut client = pageserver::client::page_service::Client::new(
args.page_service_connstring.clone(),
timeline.tenant_id,
timeline.timeline_id,
)
.await
.unwrap();
let client =
pageserver::client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(timeline.tenant_id, timeline.timeline_id)
.await
.unwrap();
while let Some((key, lsn)) = work.recv().await {
let start = Instant::now();

View File

@@ -9,6 +9,7 @@ use pageserver_api::{
reltag::RelTag,
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{
@@ -17,22 +18,20 @@ use utils::{
};
pub struct Client {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
client: tokio_postgres::Client,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
pub struct RelTagBlockNo {
pub rel_tag: RelTag,
pub block_no: u32,
pub struct BasebackupRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>,
pub gzip: bool,
}
impl Client {
pub async fn new(
connstring: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<Self> {
pub async fn new(connstring: String) -> anyhow::Result<Self> {
let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
let conn_task_cancel = CancellationToken::new();
@@ -47,18 +46,68 @@ impl Client {
}
}
});
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.await?;
Ok(Self {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
conn_task,
client,
})
}
pub async fn pagestream(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<PagestreamClient> {
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
.client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.await?;
let Client {
cancel_on_client_drop,
conn_task,
client: _,
} = self;
Ok(PagestreamClient {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop,
})
}
pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
let BasebackupRequest {
tenant_id,
timeline_id,
lsn,
gzip,
} = req;
let mut args = Vec::with_capacity(5);
args.push("basebackup".to_string());
args.push(format!("{tenant_id}"));
args.push(format!("{timeline_id}"));
if let Some(lsn) = lsn {
args.push(format!("{lsn}"));
}
if *gzip {
args.push(format!("--gzip"))
}
Ok(self.client.copy_out(&args.join(" ")).await?)
}
}
/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
pub struct RelTagBlockNo {
pub rel_tag: RelTag,
pub block_no: u32,
}
impl PagestreamClient {
pub async fn shutdown(mut self) {
let _ = self.cancel_on_client_drop.take();
self.conn_task.await.unwrap();