From 4fc3596677648f548cc4e8ac65a3fcbc746c090b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 5 Dec 2023 18:45:20 +0000 Subject: [PATCH] client & getpage bench: distinguish between page_service client and client in pagestream mode --- .../pagebench/src/getpage_latest_lsn.rs | 15 ++-- pageserver/src/client/page_service.rs | 81 +++++++++++++++---- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/pageserver/pagebench/src/getpage_latest_lsn.rs b/pageserver/pagebench/src/getpage_latest_lsn.rs index 02c094351f..ec0e345a19 100644 --- a/pageserver/pagebench/src/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/getpage_latest_lsn.rs @@ -376,13 +376,14 @@ async fn client( ) { start_work_barrier.wait().await; - let mut client = pageserver::client::page_service::Client::new( - args.page_service_connstring.clone(), - timeline.tenant_id, - timeline.timeline_id, - ) - .await - .unwrap(); + let client = + pageserver::client::page_service::Client::new(args.page_service_connstring.clone()) + .await + .unwrap(); + let mut client = client + .pagestream(timeline.tenant_id, timeline.timeline_id) + .await + .unwrap(); while let Some((key, lsn)) = work.recv().await { let start = Instant::now(); diff --git a/pageserver/src/client/page_service.rs b/pageserver/src/client/page_service.rs index 6d6821a798..7be09bee0d 100644 --- a/pageserver/src/client/page_service.rs +++ b/pageserver/src/client/page_service.rs @@ -9,6 +9,7 @@ use pageserver_api::{ reltag::RelTag, }; use tokio::task::JoinHandle; +use tokio_postgres::CopyOutStream; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use utils::{ @@ -17,22 +18,20 @@ use utils::{ }; pub struct Client { - copy_both: Pin>>, + client: tokio_postgres::Client, cancel_on_client_drop: Option, conn_task: JoinHandle<()>, } -pub struct RelTagBlockNo { - pub rel_tag: RelTag, - pub block_no: u32, +pub struct BasebackupRequest { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub lsn: Option, + pub gzip: bool, } impl Client { - pub async fn new( - connstring: String, - tenant_id: TenantId, - timeline_id: TimelineId, - ) -> anyhow::Result { + pub async fn new(connstring: String) -> anyhow::Result { let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?; let conn_task_cancel = CancellationToken::new(); @@ -47,18 +46,68 @@ impl Client { } } }); - - let copy_both: tokio_postgres::CopyBothDuplex = client - .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}")) - .await?; - Ok(Self { - copy_both: Box::pin(copy_both), - conn_task, cancel_on_client_drop: Some(conn_task_cancel.drop_guard()), + conn_task, + client, }) } + pub async fn pagestream( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result { + let copy_both: tokio_postgres::CopyBothDuplex = self + .client + .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}")) + .await?; + let Client { + cancel_on_client_drop, + conn_task, + client: _, + } = self; + Ok(PagestreamClient { + copy_both: Box::pin(copy_both), + conn_task, + cancel_on_client_drop, + }) + } + + pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result { + let BasebackupRequest { + tenant_id, + timeline_id, + lsn, + gzip, + } = req; + let mut args = Vec::with_capacity(5); + args.push("basebackup".to_string()); + args.push(format!("{tenant_id}")); + args.push(format!("{timeline_id}")); + if let Some(lsn) = lsn { + args.push(format!("{lsn}")); + } + if *gzip { + args.push(format!("--gzip")) + } + Ok(self.client.copy_out(&args.join(" ")).await?) + } +} + +/// Create using [`Client::pagestream`]. +pub struct PagestreamClient { + copy_both: Pin>>, + cancel_on_client_drop: Option, + conn_task: JoinHandle<()>, +} + +pub struct RelTagBlockNo { + pub rel_tag: RelTag, + pub block_no: u32, +} + +impl PagestreamClient { pub async fn shutdown(mut self) { let _ = self.cancel_on_client_drop.take(); self.conn_task.await.unwrap();