diff --git a/Cargo.lock b/Cargo.lock index 1b6b423444..49e0416169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3139,6 +3139,7 @@ dependencies = [ "tokio", "tokio-io-timeout", "tokio-postgres", + "tokio-stream", "tokio-tar", "tokio-util", "toml_edit", diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a3029e67a5..075b0644ff 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -765,6 +765,36 @@ impl PagestreamBeMessage { bytes.into() } + + pub fn deserialize(buf: Bytes) -> anyhow::Result { + let mut buf = buf.reader(); + let msg_tag = buf.read_u8()?; + match msg_tag { + 100 => todo!(), + 101 => todo!(), + 102 => { + let buf = buf.get_ref(); + /* TODO use constant */ + if buf.len() == 8192 { + Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { + page: buf.clone(), + })) + } else { + anyhow::bail!("invalid page size: {}", buf.len()); + } + } + 103 => { + let buf = buf.get_ref(); + let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?; + let rust_str = cstr.to_str()?; + Ok(PagestreamBeMessage::Error(PagestreamErrorResponse { + message: rust_str.to_owned(), + })) + } + 104 => todo!(), + _ => bail!("unknown tag: {:?}", msg_tag), + } + } } #[cfg(test)] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 35c260740c..e0cbff3123 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -62,6 +62,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] } tokio-io-timeout.workspace = true tokio-postgres.workspace = true +tokio-stream.workspace = true tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } tracing.workspace = true diff --git a/pageserver/src/client/page_service.rs b/pageserver/src/client/page_service.rs new file mode 100644 index 0000000000..7be09bee0d --- /dev/null +++ b/pageserver/src/client/page_service.rs @@ -0,0 +1,145 @@ +use std::pin::Pin; + +use futures::SinkExt; +use pageserver_api::{ + models::{ + PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, + PagestreamGetPageResponse, + }, + reltag::RelTag, +}; +use tokio::task::JoinHandle; +use tokio_postgres::CopyOutStream; +use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; +use utils::{ + id::{TenantId, TimelineId}, + lsn::Lsn, +}; + +pub struct Client { + client: tokio_postgres::Client, + cancel_on_client_drop: Option, + conn_task: JoinHandle<()>, +} + +pub struct BasebackupRequest { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub lsn: Option, + pub gzip: bool, +} + +impl Client { + pub async fn new(connstring: String) -> anyhow::Result { + let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?; + + let conn_task_cancel = CancellationToken::new(); + let conn_task = tokio::spawn({ + let conn_task_cancel = conn_task_cancel.clone(); + async move { + tokio::select! { + _ = conn_task_cancel.cancelled() => { } + res = connection => { + res.unwrap(); + } + } + } + }); + Ok(Self { + cancel_on_client_drop: Some(conn_task_cancel.drop_guard()), + conn_task, + client, + }) + } + + pub async fn pagestream( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result { + let copy_both: tokio_postgres::CopyBothDuplex = self + .client + .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}")) + .await?; + let Client { + cancel_on_client_drop, + conn_task, + client: _, + } = self; + Ok(PagestreamClient { + copy_both: Box::pin(copy_both), + conn_task, + cancel_on_client_drop, + }) + } + + pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result { + let BasebackupRequest { + tenant_id, + timeline_id, + lsn, + gzip, + } = req; + let mut args = Vec::with_capacity(5); + args.push("basebackup".to_string()); + args.push(format!("{tenant_id}")); + args.push(format!("{timeline_id}")); + if let Some(lsn) = lsn { + args.push(format!("{lsn}")); + } + if *gzip { + args.push(format!("--gzip")) + } + Ok(self.client.copy_out(&args.join(" ")).await?) + } +} + +/// Create using [`Client::pagestream`]. +pub struct PagestreamClient { + copy_both: Pin>>, + cancel_on_client_drop: Option, + conn_task: JoinHandle<()>, +} + +pub struct RelTagBlockNo { + pub rel_tag: RelTag, + pub block_no: u32, +} + +impl PagestreamClient { + pub async fn shutdown(mut self) { + let _ = self.cancel_on_client_drop.take(); + self.conn_task.await.unwrap(); + } + + pub async fn getpage( + &mut self, + key: RelTagBlockNo, + lsn: Lsn, + ) -> anyhow::Result { + let req = PagestreamGetPageRequest { + latest: false, + rel: key.rel_tag, + blkno: key.block_no, + lsn, + }; + let req = PagestreamFeMessage::GetPage(req); + let req: bytes::Bytes = req.serialize(); + // let mut req = tokio_util::io::ReaderStream::new(&req); + let mut req = tokio_stream::once(Ok(req)); + + self.copy_both.send_all(&mut req).await?; + + let next: Option> = self.copy_both.next().await; + let next = next.unwrap().unwrap(); + + match PagestreamBeMessage::deserialize(next)? { + PagestreamBeMessage::Exists(_) => todo!(), + PagestreamBeMessage::Nblocks(_) => todo!(), + PagestreamBeMessage::GetPage(p) => Ok(p), + PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), + PagestreamBeMessage::DbSize(_) => todo!(), + } + } +}