pagebench: add basebackup gRPC support (#12250)

## Problem

Pagebench does not support gRPC for `basebackup` benchmarks.

Requires #12243.
Touches #11728.

## Summary of changes

Add gRPC support via gRPC connstrings, e.g. `pagebench basebackup
--page-service-connstring grpc://localhost:51051`.

Also change `--gzip-probability` to `--no-compression`, since this must
be specified per-client for gRPC.
This commit is contained in:
Erik Grinaker
2025-06-19 17:40:57 +02:00
committed by GitHub
parent a6d4de25cd
commit dc1625cd8e
2 changed files with 131 additions and 50 deletions

View File

@@ -121,7 +121,7 @@ impl Client {
pub async fn get_base_backup(
&mut self,
req: model::GetBaseBackupRequest,
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>>, tonic::Status> {
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>> + 'static, tonic::Status> {
let proto_req = proto::GetBaseBackupRequest::from(req);
let response_stream: Streaming<proto::GetBaseBackupResponseChunk> =

View File

@@ -1,20 +1,29 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::Context;
use anyhow::anyhow;
use futures::TryStreamExt as _;
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
use pageserver_client::page_service::BasebackupRequest;
use pageserver_page_api as page_api;
use rand::prelude::*;
use reqwest::Url;
use tokio::io::AsyncRead;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt as _};
use tokio_util::io::StreamReader;
use tonic::async_trait;
use tracing::{info, instrument};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
@@ -24,14 +33,15 @@ use crate::util::{request_stats, tokio_thread_local_stats};
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
/// The Pageserver to connect to. Use postgresql:// for libpq, or grpc:// for gRPC.
#[clap(long, default_value = "postgresql://postgres@localhost:64000")]
page_service_connstring: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
#[clap(long, default_value = "1.0")]
gzip_probability: f64,
#[clap(long)]
no_compression: bool,
#[clap(long)]
runtime: Option<humantime::Duration>,
#[clap(long)]
@@ -146,12 +156,23 @@ async fn main_impl(
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for tl in &timelines {
let connurl = Url::parse(&args.page_service_connstring)?;
for &tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
let client: Box<dyn Client> = match connurl.scheme() {
"postgresql" | "postgres" => Box::new(
LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
),
"grpc" => Box::new(
GrpcClient::new(&args.page_service_connstring, tl, !args.no_compression).await?,
),
scheme => return Err(anyhow!("invalid scheme {scheme}")),
};
tasks.push(tokio::spawn(run_worker(
client,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
@@ -166,13 +187,7 @@ async fn main_impl(
let mut rng = rand::thread_rng();
let target = all_targets.choose(&mut rng).unwrap();
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
(
target.timeline,
Work {
lsn,
gzip: rng.gen_bool(args.gzip_probability),
},
)
(target.timeline, Work { lsn })
};
let sender = work_senders.get(&timeline).unwrap();
// TODO: what if this blocks?
@@ -216,13 +231,11 @@ async fn main_impl(
#[derive(Copy, Clone)]
struct Work {
lsn: Option<Lsn>,
gzip: bool,
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
timeline: TenantTimelineId,
async fn run_worker(
mut client: Box<dyn Client>,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<Work>,
all_work_done_barrier: Arc<Barrier>,
@@ -230,37 +243,14 @@ async fn client(
) {
start_work_barrier.wait().await;
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
while let Some(Work { lsn }) = work.recv().await {
let start = Instant::now();
let copy_out_stream = client
.basebackup(&BasebackupRequest {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
lsn,
gzip,
})
.await
.with_context(|| format!("start basebackup for {timeline}"))
.unwrap();
let stream = client.basebackup(lsn).await.unwrap();
use futures::StreamExt;
let size = Arc::new(AtomicUsize::new(0));
copy_out_stream
.for_each({
|r| {
let size = Arc::clone(&size);
async move {
let size = Arc::clone(&size);
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
}
}
})
.await;
info!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let size = futures::io::copy(stream.compat(), &mut tokio::io::sink().compat_write())
.await
.unwrap();
info!("basebackup size is {size} bytes");
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
@@ -270,3 +260,94 @@ async fn client(
all_work_done_barrier.wait().await;
}
/// A basebackup client. This allows switching out the client protocol implementation.
#[async_trait]
trait Client: Send {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>>;
}
/// A libpq-based Pageserver client.
struct LibpqClient {
inner: pageserver_client::page_service::Client,
ttid: TenantTimelineId,
compression: bool,
}
impl LibpqClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
Ok(Self {
inner: pageserver_client::page_service::Client::new(connstring.to_string()).await?,
ttid,
compression,
})
}
}
#[async_trait]
impl Client for LibpqClient {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send + 'static>>> {
let req = BasebackupRequest {
tenant_id: self.ttid.tenant_id,
timeline_id: self.ttid.timeline_id,
lsn,
gzip: self.compression,
};
let stream = self.inner.basebackup(&req).await?;
Ok(Box::pin(StreamReader::new(
stream.map_err(std::io::Error::other),
)))
}
}
/// A gRPC Pageserver client.
struct GrpcClient {
inner: page_api::Client,
}
impl GrpcClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = page_api::Client::new(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,
ShardIndex::unsharded(),
None,
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
)
.await?;
Ok(Self { inner })
}
}
#[async_trait]
impl Client for GrpcClient {
async fn basebackup(
&mut self,
lsn: Option<Lsn>,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send + 'static>>> {
let req = page_api::GetBaseBackupRequest {
lsn,
replica: false,
full: false,
};
let stream = self.inner.get_base_backup(req).await?;
Ok(Box::pin(StreamReader::new(
stream.map_err(std::io::Error::other),
)))
}
}