diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 5ef09312a5..2223f6fe45 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -154,7 +154,7 @@ impl std::str::FromStr for Key { mod tests { use std::str::FromStr; - use crate::repository::Key; + use crate::key::Key; #[test] fn display_fromstr_bijection() { diff --git a/pageserver/src/bin/getpage_bench_libpq.rs b/pageserver/src/bin/getpage_bench_libpq.rs index d3b4a3bd1f..39f74e7caa 100644 --- a/pageserver/src/bin/getpage_bench_libpq.rs +++ b/pageserver/src/bin/getpage_bench_libpq.rs @@ -1,10 +1,10 @@ use anyhow::Context; use clap::Parser; +use pageserver::client::page_service::RelTagBlockNo; use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block}; use pageserver::repository; -use pageserver_api::reltag::RelTag; use rand::prelude::*; use tokio::sync::Barrier; use tracing::info; @@ -18,22 +18,6 @@ use std::sync::Arc; use tokio::task::JoinHandle; -struct KeyRange { - start: i128, - end: i128, -} - -impl KeyRange { - fn len(&self) -> i128 { - self.end - self.start - } -} - -struct RelTagBlockNo { - rel_tag: RelTag, - block_no: u32, -} - #[derive(clap::Parser)] struct Args { #[clap(long, default_value = "http://localhost:9898")] @@ -73,7 +57,9 @@ async fn main() -> anyhow::Result<()> { let args: &'static Args = Box::leak(Box::new(Args::parse())); - let client = Arc::new(mgmt_api_client::Client::new(args.mgmt_api_endpoint.clone())); + let client = Arc::new(pageserver::client::mgmt_api::Client::new( + args.mgmt_api_endpoint.clone(), + )); let mut tenants: Vec = if let Some(tenants) = &args.tenants { tenants.clone() @@ -154,7 +140,7 @@ async fn main() -> anyhow::Result<()> { fn timeline( args: &'static Args, - mgmt_api_client: Arc, + mgmt_api_client: Arc, tenant_id: TenantId, timeline_id: TimelineId, start_work_barrier: Arc, @@ -163,6 +149,18 @@ fn timeline( async move { let partitioning = mgmt_api_client.keyspace(tenant_id, timeline_id).await?; let lsn = partitioning.at_lsn; + + struct KeyRange { + start: i128, + end: i128, + } + + impl KeyRange { + fn len(&self) -> i128 { + self.end - self.start + } + } + let ranges = partitioning .keys .ranges @@ -199,7 +197,7 @@ fn timeline( let task = tokio::spawn({ let stats = Arc::clone(&stats); async move { - let mut getpage_client = getpage_client::Client::new( + let mut getpage_client = pageserver::client::page_service::Client::new( args.page_service_connstring.clone(), tenant_id, timeline_id, @@ -239,163 +237,3 @@ fn timeline( Ok(()) } } - -mod mgmt_api_client { - use anyhow::Context; - - use hyper::{client::HttpConnector, Uri}; - use utils::id::{TenantId, TimelineId}; - - pub(crate) struct Client { - mgmt_api_endpoint: String, - pub(crate) client: hyper::Client, - } - - impl Client { - pub fn new(mgmt_api_endpoint: String) -> Self { - Self { - mgmt_api_endpoint, - client: hyper::client::Client::new(), - } - } - - pub async fn list_tenants( - &self, - ) -> anyhow::Result> { - let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?; - let resp = self.client.get(uri).await?; - if !resp.status().is_success() { - anyhow::bail!("status error"); - } - let body = hyper::body::to_bytes(resp).await?; - Ok(serde_json::from_slice(&body)?) - } - - pub async fn list_timelines( - &self, - tenant_id: TenantId, - ) -> anyhow::Result> { - let uri = Uri::try_from(format!( - "{}/v1/tenant/{tenant_id}/timeline", - self.mgmt_api_endpoint - ))?; - let resp = self.client.get(uri).await?; - if !resp.status().is_success() { - anyhow::bail!("status error"); - } - let body = hyper::body::to_bytes(resp).await?; - Ok(serde_json::from_slice(&body)?) - } - - pub async fn keyspace( - &self, - tenant_id: TenantId, - timeline_id: TimelineId, - ) -> anyhow::Result { - let uri = Uri::try_from(format!( - "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true", - self.mgmt_api_endpoint - ))?; - let resp = self.client.get(uri).await?; - if !resp.status().is_success() { - anyhow::bail!("status error"); - } - let body = hyper::body::to_bytes(resp).await?; - Ok(serde_json::from_slice(&body).context("deserialize")?) - } - } -} - -mod getpage_client { - use std::pin::Pin; - - use futures::SinkExt; - use pageserver_api::models::{ - PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, - PagestreamGetPageResponse, - }; - use tokio::task::JoinHandle; - use tokio_stream::StreamExt; - use tokio_util::sync::CancellationToken; - use utils::{ - id::{TenantId, TimelineId}, - lsn::Lsn, - }; - - use crate::RelTagBlockNo; - - pub(crate) struct Client { - copy_both: Pin>>, - cancel_on_client_drop: Option, - conn_task: JoinHandle<()>, - } - - impl Client { - pub async fn new( - connstring: String, - tenant_id: TenantId, - timeline_id: TimelineId, - ) -> anyhow::Result { - let (client, connection) = - tokio_postgres::connect(&connstring, postgres::NoTls).await?; - - let conn_task_cancel = CancellationToken::new(); - let conn_task = tokio::spawn({ - let conn_task_cancel = conn_task_cancel.clone(); - async move { - tokio::select! { - _ = conn_task_cancel.cancelled() => { } - res = connection => { - res.unwrap(); - } - } - } - }); - - let copy_both: tokio_postgres::CopyBothDuplex = client - .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}")) - .await?; - - Ok(Self { - copy_both: Box::pin(copy_both), - conn_task, - cancel_on_client_drop: Some(conn_task_cancel.drop_guard()), - }) - } - - pub async fn shutdown(mut self) { - let _ = self.cancel_on_client_drop.take(); - self.conn_task.await.unwrap(); - } - - pub async fn getpage( - &mut self, - key: RelTagBlockNo, - lsn: Lsn, - ) -> anyhow::Result { - let req = PagestreamGetPageRequest { - latest: false, - rel: key.rel_tag, - blkno: key.block_no, - lsn, - }; - let req = PagestreamFeMessage::GetPage(req); - let req: bytes::Bytes = req.serialize(); - // let mut req = tokio_util::io::ReaderStream::new(&req); - let mut req = tokio_stream::once(Ok(req)); - - self.copy_both.send_all(&mut req).await?; - - let next: Option> = self.copy_both.next().await; - let next = next.unwrap().unwrap(); - - match PagestreamBeMessage::deserialize(next)? { - PagestreamBeMessage::Exists(_) => todo!(), - PagestreamBeMessage::Nblocks(_) => todo!(), - PagestreamBeMessage::GetPage(p) => Ok(p), - PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), - PagestreamBeMessage::DbSize(_) => todo!(), - } - } - } -} diff --git a/pageserver/src/client.rs b/pageserver/src/client.rs new file mode 100644 index 0000000000..4a3f4dea47 --- /dev/null +++ b/pageserver/src/client.rs @@ -0,0 +1,2 @@ +pub mod mgmt_api; +pub mod page_service; diff --git a/pageserver/src/client/mgmt_api.rs b/pageserver/src/client/mgmt_api.rs new file mode 100644 index 0000000000..9de0e533a8 --- /dev/null +++ b/pageserver/src/client/mgmt_api.rs @@ -0,0 +1,61 @@ +use anyhow::Context; + +use hyper::{client::HttpConnector, Uri}; +use utils::id::{TenantId, TimelineId}; + +pub struct Client { + mgmt_api_endpoint: String, + client: hyper::Client, +} + +impl Client { + pub fn new(mgmt_api_endpoint: String) -> Self { + Self { + mgmt_api_endpoint, + client: hyper::client::Client::new(), + } + } + + pub async fn list_tenants(&self) -> anyhow::Result> { + let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body)?) + } + + pub async fn list_timelines( + &self, + tenant_id: TenantId, + ) -> anyhow::Result> { + let uri = Uri::try_from(format!( + "{}/v1/tenant/{tenant_id}/timeline", + self.mgmt_api_endpoint + ))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body)?) + } + + pub async fn keyspace( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result { + let uri = Uri::try_from(format!( + "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true", + self.mgmt_api_endpoint + ))?; + let resp = self.client.get(uri).await?; + if !resp.status().is_success() { + anyhow::bail!("status error"); + } + let body = hyper::body::to_bytes(resp).await?; + Ok(serde_json::from_slice(&body).context("deserialize")?) + } +} diff --git a/pageserver/src/client/page_service.rs b/pageserver/src/client/page_service.rs new file mode 100644 index 0000000000..6f32926380 --- /dev/null +++ b/pageserver/src/client/page_service.rs @@ -0,0 +1,92 @@ +use std::pin::Pin; + +use futures::SinkExt; +use pageserver_api::{models::{ + PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, +}, reltag::RelTag}; +use tokio::task::JoinHandle; +use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; +use utils::{ + id::{TenantId, TimelineId}, + lsn::Lsn, +}; + +pub struct Client { + copy_both: Pin>>, + cancel_on_client_drop: Option, + conn_task: JoinHandle<()>, +} + +pub struct RelTagBlockNo { + pub rel_tag: RelTag, + pub block_no: u32, +} + +impl Client { + pub async fn new( + connstring: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> anyhow::Result { + let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?; + + let conn_task_cancel = CancellationToken::new(); + let conn_task = tokio::spawn({ + let conn_task_cancel = conn_task_cancel.clone(); + async move { + tokio::select! { + _ = conn_task_cancel.cancelled() => { } + res = connection => { + res.unwrap(); + } + } + } + }); + + let copy_both: tokio_postgres::CopyBothDuplex = client + .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}")) + .await?; + + Ok(Self { + copy_both: Box::pin(copy_both), + conn_task, + cancel_on_client_drop: Some(conn_task_cancel.drop_guard()), + }) + } + + pub async fn shutdown(mut self) { + let _ = self.cancel_on_client_drop.take(); + self.conn_task.await.unwrap(); + } + + pub async fn getpage( + &mut self, + key: RelTagBlockNo, + lsn: Lsn, + ) -> anyhow::Result { + let req = PagestreamGetPageRequest { + latest: false, + rel: key.rel_tag, + blkno: key.block_no, + lsn, + }; + let req = PagestreamFeMessage::GetPage(req); + let req: bytes::Bytes = req.serialize(); + // let mut req = tokio_util::io::ReaderStream::new(&req); + let mut req = tokio_stream::once(Ok(req)); + + self.copy_both.send_all(&mut req).await?; + + let next: Option> = self.copy_both.next().await; + let next = next.unwrap().unwrap(); + + match PagestreamBeMessage::deserialize(next)? { + PagestreamBeMessage::Exists(_) => todo!(), + PagestreamBeMessage::Nblocks(_) => todo!(), + PagestreamBeMessage::GetPage(p) => Ok(p), + PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), + PagestreamBeMessage::DbSize(_) => todo!(), + } + } +} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3f74694ef2..88ad9d4d95 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -26,6 +26,7 @@ pub mod walrecord; pub mod walredo; pub mod failpoint_support; +pub mod client; use crate::task_mgr::TaskKind; use camino::Utf8Path;