diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index 274f036f3d..aa4774c056 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -121,7 +121,7 @@ impl Client { pub async fn get_base_backup( &mut self, req: model::GetBaseBackupRequest, - ) -> Result>, tonic::Status> { + ) -> Result> + 'static, tonic::Status> { let proto_req = proto::GetBaseBackupRequest::from(req); let response_stream: Streaming = diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 43ad92980c..8015db528d 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -1,20 +1,29 @@ use std::collections::HashMap; use std::num::NonZeroUsize; use std::ops::Range; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; -use anyhow::Context; +use anyhow::anyhow; +use futures::TryStreamExt as _; use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api::ForceAwaitLogicalSize; use pageserver_client::page_service::BasebackupRequest; +use pageserver_page_api as page_api; use rand::prelude::*; +use reqwest::Url; +use tokio::io::AsyncRead; use tokio::sync::Barrier; use tokio::task::JoinSet; +use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _}; +use tokio_util::io::StreamReader; +use tonic::async_trait; use tracing::{info, instrument}; use utils::id::TenantTimelineId; use utils::lsn::Lsn; +use utils::shard::ShardIndex; use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; @@ -24,14 +33,15 @@ use crate::util::{request_stats, tokio_thread_local_stats}; pub(crate) struct Args { #[clap(long, default_value = "http://localhost:9898")] mgmt_api_endpoint: String, - #[clap(long, default_value = "postgres://postgres@localhost:64000")] + /// The Pageserver to connect to. Use postgresql:// for libpq, or grpc:// for gRPC. + #[clap(long, default_value = "postgresql://postgres@localhost:64000")] page_service_connstring: String, #[clap(long)] pageserver_jwt: Option, #[clap(long, default_value = "1")] num_clients: NonZeroUsize, - #[clap(long, default_value = "1.0")] - gzip_probability: f64, + #[clap(long)] + no_compression: bool, #[clap(long)] runtime: Option, #[clap(long)] @@ -146,12 +156,23 @@ async fn main_impl( let mut work_senders = HashMap::new(); let mut tasks = Vec::new(); - for tl in &timelines { + let connurl = Url::parse(&args.page_service_connstring)?; + for &tl in &timelines { let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are work_senders.insert(tl, sender); - tasks.push(tokio::spawn(client( - args, - *tl, + + let client: Box = match connurl.scheme() { + "postgresql" | "postgres" => Box::new( + LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?, + ), + "grpc" => Box::new( + GrpcClient::new(&args.page_service_connstring, tl, !args.no_compression).await?, + ), + scheme => return Err(anyhow!("invalid scheme {scheme}")), + }; + + tasks.push(tokio::spawn(run_worker( + client, Arc::clone(&start_work_barrier), receiver, Arc::clone(&all_work_done_barrier), @@ -166,13 +187,7 @@ async fn main_impl( let mut rng = rand::thread_rng(); let target = all_targets.choose(&mut rng).unwrap(); let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r)); - ( - target.timeline, - Work { - lsn, - gzip: rng.gen_bool(args.gzip_probability), - }, - ) + (target.timeline, Work { lsn }) }; let sender = work_senders.get(&timeline).unwrap(); // TODO: what if this blocks? @@ -216,13 +231,11 @@ async fn main_impl( #[derive(Copy, Clone)] struct Work { lsn: Option, - gzip: bool, } #[instrument(skip_all)] -async fn client( - args: &'static Args, - timeline: TenantTimelineId, +async fn run_worker( + mut client: Box, start_work_barrier: Arc, mut work: tokio::sync::mpsc::Receiver, all_work_done_barrier: Arc, @@ -230,37 +243,14 @@ async fn client( ) { start_work_barrier.wait().await; - let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone()) - .await - .unwrap(); - - while let Some(Work { lsn, gzip }) = work.recv().await { + while let Some(Work { lsn }) = work.recv().await { let start = Instant::now(); - let copy_out_stream = client - .basebackup(&BasebackupRequest { - tenant_id: timeline.tenant_id, - timeline_id: timeline.timeline_id, - lsn, - gzip, - }) - .await - .with_context(|| format!("start basebackup for {timeline}")) - .unwrap(); + let stream = client.basebackup(lsn).await.unwrap(); - use futures::StreamExt; - let size = Arc::new(AtomicUsize::new(0)); - copy_out_stream - .for_each({ - |r| { - let size = Arc::clone(&size); - async move { - let size = Arc::clone(&size); - size.fetch_add(r.unwrap().len(), Ordering::Relaxed); - } - } - }) - .await; - info!("basebackup size is {} bytes", size.load(Ordering::Relaxed)); + let size = futures::io::copy(stream.compat(), &mut tokio::io::sink().compat_write()) + .await + .unwrap(); + info!("basebackup size is {size} bytes"); let elapsed = start.elapsed(); live_stats.inc(); STATS.with(|stats| { @@ -270,3 +260,94 @@ async fn client( all_work_done_barrier.wait().await; } + +/// A basebackup client. This allows switching out the client protocol implementation. +#[async_trait] +trait Client: Send { + async fn basebackup( + &mut self, + lsn: Option, + ) -> anyhow::Result>>; +} + +/// A libpq-based Pageserver client. +struct LibpqClient { + inner: pageserver_client::page_service::Client, + ttid: TenantTimelineId, + compression: bool, +} + +impl LibpqClient { + async fn new( + connstring: &str, + ttid: TenantTimelineId, + compression: bool, + ) -> anyhow::Result { + Ok(Self { + inner: pageserver_client::page_service::Client::new(connstring.to_string()).await?, + ttid, + compression, + }) + } +} + +#[async_trait] +impl Client for LibpqClient { + async fn basebackup( + &mut self, + lsn: Option, + ) -> anyhow::Result>> { + let req = BasebackupRequest { + tenant_id: self.ttid.tenant_id, + timeline_id: self.ttid.timeline_id, + lsn, + gzip: self.compression, + }; + let stream = self.inner.basebackup(&req).await?; + Ok(Box::pin(StreamReader::new( + stream.map_err(std::io::Error::other), + ))) + } +} + +/// A gRPC Pageserver client. +struct GrpcClient { + inner: page_api::Client, +} + +impl GrpcClient { + async fn new( + connstring: &str, + ttid: TenantTimelineId, + compression: bool, + ) -> anyhow::Result { + let inner = page_api::Client::new( + connstring.to_string(), + ttid.tenant_id, + ttid.timeline_id, + ShardIndex::unsharded(), + None, + compression.then_some(tonic::codec::CompressionEncoding::Zstd), + ) + .await?; + Ok(Self { inner }) + } +} + +#[async_trait] +impl Client for GrpcClient { + async fn basebackup( + &mut self, + lsn: Option, + ) -> anyhow::Result>> { + let req = page_api::GetBaseBackupRequest { + lsn, + replica: false, + full: false, + }; + let stream = self.inner.get_base_backup(req).await?; + Ok(Box::pin(StreamReader::new( + stream.map_err(std::io::Error::other), + ))) + } +}