From 1f9a7d1cd0a94a7c539c4fc9ff194d4fdf2917c8 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 18 Dec 2023 19:17:19 +0100 Subject: [PATCH] add a Rust client for Pageserver page_service (#6128) Part of getpage@lsn benchmark epic: https://github.com/neondatabase/neon/issues/5771 Stacked atop https://github.com/neondatabase/neon/pull/6145 --- Cargo.lock | 9 ++ libs/pageserver_api/src/models.rs | 93 +++++++++++++++- pageserver/Cargo.toml | 1 + pageserver/client/Cargo.toml | 8 ++ pageserver/client/src/lib.rs | 1 + pageserver/client/src/page_service.rs | 151 ++++++++++++++++++++++++++ 6 files changed, 257 insertions(+), 6 deletions(-) create mode 100644 pageserver/client/src/page_service.rs diff --git a/Cargo.lock b/Cargo.lock index f931fd6c29..9a367effbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3145,6 +3145,7 @@ dependencies = [ "tokio", "tokio-io-timeout", "tokio-postgres", + "tokio-stream", "tokio-tar", "tokio-util", "toml_edit", @@ -3182,11 +3183,19 @@ dependencies = [ name = "pageserver_client" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", + "bytes", + "futures", "pageserver_api", + "postgres", "reqwest", "serde", "thiserror", + "tokio", + "tokio-postgres", + "tokio-stream", + "tokio-util", "utils", "workspace_hack", ] diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a78ba8ad94..0f5e202249 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -2,6 +2,7 @@ pub mod partitioning; use std::{ collections::HashMap, + io::Read, num::{NonZeroU64, NonZeroUsize}, time::SystemTime, }; @@ -19,7 +20,7 @@ use utils::{ use crate::{reltag::RelTag, shard::TenantShardId}; use anyhow::bail; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; /// The state of a tenant in this pageserver. /// @@ -576,6 +577,7 @@ pub enum PagestreamFeMessage { } // Wrapped in libpq CopyData +#[derive(strum_macros::EnumProperty)] pub enum PagestreamBeMessage { Exists(PagestreamExistsResponse), Nblocks(PagestreamNblocksResponse), @@ -584,6 +586,29 @@ pub enum PagestreamBeMessage { DbSize(PagestreamDbSizeResponse), } +// Keep in sync with `pagestore_client.h` +#[repr(u8)] +enum PagestreamBeMessageTag { + Exists = 100, + Nblocks = 101, + GetPage = 102, + Error = 103, + DbSize = 104, +} +impl TryFrom for PagestreamBeMessageTag { + type Error = u8; + fn try_from(value: u8) -> Result { + match value { + 100 => Ok(PagestreamBeMessageTag::Exists), + 101 => Ok(PagestreamBeMessageTag::Nblocks), + 102 => Ok(PagestreamBeMessageTag::GetPage), + 103 => Ok(PagestreamBeMessageTag::Error), + 104 => Ok(PagestreamBeMessageTag::DbSize), + _ => Err(value), + } + } +} + #[derive(Debug, PartialEq, Eq)] pub struct PagestreamExistsRequest { pub latest: bool, @@ -739,35 +764,91 @@ impl PagestreamBeMessage { pub fn serialize(&self) -> Bytes { let mut bytes = BytesMut::new(); + use PagestreamBeMessageTag as Tag; match self { Self::Exists(resp) => { - bytes.put_u8(100); /* tag from pagestore_client.h */ + bytes.put_u8(Tag::Exists as u8); bytes.put_u8(resp.exists as u8); } Self::Nblocks(resp) => { - bytes.put_u8(101); /* tag from pagestore_client.h */ + bytes.put_u8(Tag::Nblocks as u8); bytes.put_u32(resp.n_blocks); } Self::GetPage(resp) => { - bytes.put_u8(102); /* tag from pagestore_client.h */ + bytes.put_u8(Tag::GetPage as u8); bytes.put(&resp.page[..]); } Self::Error(resp) => { - bytes.put_u8(103); /* tag from pagestore_client.h */ + bytes.put_u8(Tag::Error as u8); bytes.put(resp.message.as_bytes()); bytes.put_u8(0); // null terminator } Self::DbSize(resp) => { - bytes.put_u8(104); /* tag from pagestore_client.h */ + bytes.put_u8(Tag::DbSize as u8); bytes.put_i64(resp.db_size); } } bytes.into() } + + pub fn deserialize(buf: Bytes) -> anyhow::Result { + let mut buf = buf.reader(); + let msg_tag = buf.read_u8()?; + + use PagestreamBeMessageTag as Tag; + let ok = + match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? { + Tag::Exists => { + let exists = buf.read_u8()?; + Self::Exists(PagestreamExistsResponse { + exists: exists != 0, + }) + } + Tag::Nblocks => { + let n_blocks = buf.read_u32::()?; + Self::Nblocks(PagestreamNblocksResponse { n_blocks }) + } + Tag::GetPage => { + let mut page = vec![0; 8192]; // TODO: use MaybeUninit + buf.read_exact(&mut page)?; + PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() }) + } + Tag::Error => { + let buf = buf.get_ref(); + let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?; + let rust_str = cstr.to_str()?; + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: rust_str.to_owned(), + }) + } + Tag::DbSize => { + let db_size = buf.read_i64::()?; + Self::DbSize(PagestreamDbSizeResponse { db_size }) + } + }; + let remaining = buf.into_inner(); + if !remaining.is_empty() { + anyhow::bail!( + "remaining bytes in msg with tag={msg_tag}: {}", + remaining.len() + ); + } + Ok(ok) + } + + pub fn kind(&self) -> &'static str { + match self { + Self::Exists(_) => "Exists", + Self::Nblocks(_) => "Nblocks", + Self::GetPage(_) => "GetPage", + Self::Error(_) => "Error", + Self::DbSize(_) => "DbSize", + } + } } #[cfg(test)] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 9e8172c6a1..980fbab22e 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -63,6 +63,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] } tokio-io-timeout.workspace = true tokio-postgres.workspace = true +tokio-stream.workspace = true tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } tracing.workspace = true diff --git a/pageserver/client/Cargo.toml b/pageserver/client/Cargo.toml index 4bd36185a6..0ed27602cd 100644 --- a/pageserver/client/Cargo.toml +++ b/pageserver/client/Cargo.toml @@ -12,3 +12,11 @@ reqwest.workspace = true utils.workspace = true serde.workspace = true workspace_hack = { version = "0.1", path = "../../workspace_hack" } +tokio-postgres.workspace = true +tokio-stream.workspace = true +tokio.workspace = true +futures.workspace = true +tokio-util.workspace = true +anyhow.workspace = true +postgres.workspace = true +bytes.workspace = true diff --git a/pageserver/client/src/lib.rs b/pageserver/client/src/lib.rs index 3963fd466c..4a3f4dea47 100644 --- a/pageserver/client/src/lib.rs +++ b/pageserver/client/src/lib.rs @@ -1 +1,2 @@ pub mod mgmt_api; +pub mod page_service; diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs new file mode 100644 index 0000000000..fc0d2311f7 --- /dev/null +++ b/pageserver/client/src/page_service.rs @@ -0,0 +1,151 @@ +use std::pin::Pin; + +use futures::SinkExt; +use pageserver_api::{ + models::{ + PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, + PagestreamGetPageResponse, + }, + reltag::RelTag, +}; +use tokio::task::JoinHandle; +use tokio_postgres::CopyOutStream; +use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; +use utils::{ + id::{TenantId, TimelineId}, + lsn::Lsn, +}; + +pub struct Client { + client: tokio_postgres::Client, + cancel_on_client_drop: Option, + conn_task: JoinHandle<()>, +} + +pub struct BasebackupRequest { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub lsn: Option, + pub gzip: bool, +} + +impl Client { + pub async fn new(connstring: String) -> anyhow::Result { + let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?; + + let conn_task_cancel = CancellationToken::new(); + let conn_task = tokio::spawn({ + let conn_task_cancel = conn_task_cancel.clone(); + async move { + tokio::select! { + _ = conn_task_cancel.cancelled() => { } + res = connection => { + res.unwrap(); + } + } + } + }); + Ok(Self { + cancel_on_client_drop: Some(conn_task_cancel.drop_guard()), + conn_task, + client, + }) + } + + pub async fn pagestream( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result { + let copy_both: tokio_postgres::CopyBothDuplex = self + .client + .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}")) + .await?; + let Client { + cancel_on_client_drop, + conn_task, + client: _, + } = self; + Ok(PagestreamClient { + copy_both: Box::pin(copy_both), + conn_task, + cancel_on_client_drop, + }) + } + + pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result { + let BasebackupRequest { + tenant_id, + timeline_id, + lsn, + gzip, + } = req; + let mut args = Vec::with_capacity(5); + args.push("basebackup".to_string()); + args.push(format!("{tenant_id}")); + args.push(format!("{timeline_id}")); + if let Some(lsn) = lsn { + args.push(format!("{lsn}")); + } + if *gzip { + args.push("--gzip".to_string()) + } + Ok(self.client.copy_out(&args.join(" ")).await?) + } +} + +/// Create using [`Client::pagestream`]. +pub struct PagestreamClient { + copy_both: Pin>>, + cancel_on_client_drop: Option, + conn_task: JoinHandle<()>, +} + +pub struct RelTagBlockNo { + pub rel_tag: RelTag, + pub block_no: u32, +} + +impl PagestreamClient { + pub async fn shutdown(mut self) { + let _ = self.cancel_on_client_drop.take(); + self.conn_task.await.unwrap(); + } + + pub async fn getpage( + &mut self, + key: RelTagBlockNo, + lsn: Lsn, + ) -> anyhow::Result { + let req = PagestreamGetPageRequest { + latest: false, + rel: key.rel_tag, + blkno: key.block_no, + lsn, + }; + let req = PagestreamFeMessage::GetPage(req); + let req: bytes::Bytes = req.serialize(); + // let mut req = tokio_util::io::ReaderStream::new(&req); + let mut req = tokio_stream::once(Ok(req)); + + self.copy_both.send_all(&mut req).await?; + + let next: Option> = self.copy_both.next().await; + let next: bytes::Bytes = next.unwrap()?; + + let msg = PagestreamBeMessage::deserialize(next)?; + match msg { + PagestreamBeMessage::GetPage(p) => Ok(p), + PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), + PagestreamBeMessage::Exists(_) + | PagestreamBeMessage::Nblocks(_) + | PagestreamBeMessage::DbSize(_) => { + anyhow::bail!( + "unexpected be message kind in response to getpage request: {}", + msg.kind() + ) + } + } + } +}