mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
pagebench: add gRPC support for get-page-latest-lsn (#12077)
## Problem We need gRPC support in Pagebench to benchmark the new gRPC Pageserver implementation. Touches #11728. ## Summary of changes Adds a `Client` trait to make the client transport swappable, and a gRPC client via a `--protocol grpc` parameter. This must also specify the connstring with the gRPC port: ``` pagebench get-page-latest-lsn --protocol grpc --page-service-connstring grpc://localhost:51051 ``` The client is implemented using the raw Tonic-generated gRPC client, to minimize client overhead.
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -4236,6 +4236,7 @@ name = "pagebench"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"camino",
|
||||
"clap",
|
||||
"futures",
|
||||
@@ -4244,12 +4245,15 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_page_api",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
|
||||
@@ -2045,7 +2045,7 @@ pub enum PagestreamProtocolVersion {
|
||||
|
||||
pub type RequestId = u64;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamRequest {
|
||||
pub reqid: RequestId,
|
||||
pub request_lsn: Lsn,
|
||||
@@ -2064,7 +2064,7 @@ pub struct PagestreamNblocksRequest {
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
|
||||
@@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize};
|
||||
// FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
|
||||
// Then we could replace the custom Ord and PartialOrd implementations below with
|
||||
// deriving them. This will require changes in walredoproc.c.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
pub spcnode: Oid,
|
||||
|
||||
@@ -8,6 +8,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -15,14 +16,17 @@ hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace=true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
pageserver_client.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
utils = { path = "../../libs/utils/" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
@@ -7,11 +7,15 @@ use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
|
||||
use pageserver_api::models::{
|
||||
PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_page_api::proto;
|
||||
use rand::prelude::*;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -22,6 +26,12 @@ use utils::lsn::Lsn;
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
|
||||
#[derive(clap::ValueEnum, Clone, Debug)]
|
||||
enum Protocol {
|
||||
Libpq,
|
||||
Grpc,
|
||||
}
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
@@ -35,6 +45,8 @@ pub(crate) struct Args {
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long, value_enum, default_value = "libpq")]
|
||||
protocol: Protocol,
|
||||
/// Each client sends requests at the given rate.
|
||||
///
|
||||
/// If a request takes too long and we should be issuing a new request already,
|
||||
@@ -303,7 +315,20 @@ async fn main_impl(
|
||||
.unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
let client: Box<dyn Client> = match args.protocol {
|
||||
Protocol::Libpq => Box::new(
|
||||
LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
|
||||
Protocol::Grpc => Box::new(
|
||||
GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
};
|
||||
run_worker(args, client, ss, cancel, rps_period, ranges, weights).await
|
||||
})
|
||||
};
|
||||
|
||||
@@ -355,23 +380,15 @@ async fn main_impl(
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
async fn client_libpq(
|
||||
async fn run_worker(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
mut client: Box<dyn Client>,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
@@ -415,12 +432,12 @@ async fn client_libpq(
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
client.send_get_page(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
client.recv_get_page().await.unwrap();
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
@@ -442,3 +459,104 @@ async fn client_libpq(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A benchmark client, to allow switching out the transport protocol.
|
||||
///
|
||||
/// For simplicity, this just uses separate asynchronous send/recv methods. The send method could
|
||||
/// return a future that resolves when the response is received, but we don't really need it.
|
||||
#[async_trait]
|
||||
trait Client: Send {
|
||||
/// Sends an asynchronous GetPage request to the pageserver.
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>;
|
||||
|
||||
/// Receives the next GetPage response from the pageserver.
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse>;
|
||||
}
|
||||
|
||||
/// A libpq-based Pageserver client.
|
||||
struct LibpqClient {
|
||||
inner: pageserver_client::page_service::PagestreamClient,
|
||||
}
|
||||
|
||||
impl LibpqClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let inner = pageserver_client::page_service::Client::new(connstring)
|
||||
.await?
|
||||
.pagestream(ttid.tenant_id, ttid.timeline_id)
|
||||
.await?;
|
||||
Ok(Self { inner })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for LibpqClient {
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
self.inner.getpage_send(req).await
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
self.inner.getpage_recv().await
|
||||
}
|
||||
}
|
||||
|
||||
/// A gRPC client using the raw, no-frills gRPC client.
|
||||
struct GrpcClient {
|
||||
req_tx: tokio::sync::mpsc::Sender<proto::GetPageRequest>,
|
||||
resp_rx: tonic::Streaming<proto::GetPageResponse>,
|
||||
}
|
||||
|
||||
impl GrpcClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?;
|
||||
|
||||
// The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the
|
||||
// benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are
|
||||
// buffered by Tonic and the OS too.
|
||||
let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
|
||||
let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
|
||||
let mut req = tonic::Request::new(req_stream);
|
||||
let metadata = req.metadata_mut();
|
||||
metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?);
|
||||
metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?);
|
||||
metadata.insert("neon-shard-id", "0000".try_into()?);
|
||||
|
||||
let resp = client.get_pages(req).await?;
|
||||
let resp_stream = resp.into_inner();
|
||||
|
||||
Ok(Self {
|
||||
req_tx,
|
||||
resp_rx: resp_stream,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for GrpcClient {
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
let req = proto::GetPageRequest {
|
||||
request_id: 0,
|
||||
request_class: proto::GetPageClass::Normal as i32,
|
||||
read_lsn: Some(proto::ReadLsn {
|
||||
request_lsn: req.hdr.request_lsn.0,
|
||||
not_modified_since_lsn: req.hdr.not_modified_since.0,
|
||||
}),
|
||||
rel: Some(req.rel.into()),
|
||||
block_number: vec![req.blkno],
|
||||
};
|
||||
self.req_tx.send(req).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let resp = self.resp_rx.message().await?.unwrap();
|
||||
anyhow::ensure!(
|
||||
resp.status_code == proto::GetPageStatusCode::Ok as i32,
|
||||
"unexpected status code: {}",
|
||||
resp.status_code
|
||||
);
|
||||
Ok(PagestreamGetPageResponse {
|
||||
page: resp.page_image[0].clone(),
|
||||
req: PagestreamGetPageRequest::default(), // dummy
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user