Add grpc pagebench for communicator grpc.

This commit is contained in:
Elizabeth Murray
2025-05-28 06:14:56 -07:00
parent 5d538a9503
commit df32cc153c
5 changed files with 283 additions and 41 deletions

View File

@@ -8,6 +8,7 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/client_grpc",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
@@ -199,7 +200,7 @@ tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
tonic = { version = "0.13.1", default-features = false, features = ["gzip", "channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
@@ -254,6 +255,7 @@ metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }

View File

@@ -18,6 +18,6 @@ pub mod proto {
pub use page_service_server::{PageService, PageServiceServer};
}
mod model;
pub mod model;
pub use model::*;

View File

@@ -102,6 +102,15 @@ impl TryFrom<ReadLsn> for proto::ReadLsn {
}
}
impl From<&ReadLsn> for proto::ReadLsn {
fn from(value: &ReadLsn) -> proto::ReadLsn {
proto::ReadLsn {
request_lsn: value.request_lsn.into(),
not_modified_since_lsn: value.not_modified_since_lsn.unwrap_or_default().0,
}
}
}
// RelTag is defined in pageserver_api::reltag.
pub type RelTag = pageserver_api::reltag::RelTag;
@@ -132,6 +141,16 @@ impl From<RelTag> for proto::RelTag {
}
}
impl From<&RelTag> for proto::RelTag {
fn from(value: &RelTag) -> proto::RelTag {
proto::RelTag {
spc_oid: value.spcnode,
db_oid: value.dbnode,
rel_number: value.relnode,
fork_number: value.forknum as u32,
}
}
}
/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
#[derive(Clone, Copy, Debug)]
pub struct CheckRelExistsRequest {
@@ -153,6 +172,14 @@ impl TryFrom<proto::CheckRelExistsRequest> for CheckRelExistsRequest {
}
}
impl From<&CheckRelExistsRequest> for proto::CheckRelExistsRequest {
fn from(value: &CheckRelExistsRequest) -> proto::CheckRelExistsRequest {
proto::CheckRelExistsRequest {
read_lsn: Some((&value.read_lsn).into()),
rel: Some((&value.rel).into()),
}
}
}
pub type CheckRelExistsResponse = bool;
impl From<proto::CheckRelExistsResponse> for CheckRelExistsResponse {
@@ -190,6 +217,15 @@ impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
}
}
impl From<&GetBaseBackupRequest> for proto::GetBaseBackupRequest {
fn from(value: &GetBaseBackupRequest) -> proto::GetBaseBackupRequest {
proto::GetBaseBackupRequest {
read_lsn: Some((&value.read_lsn).into()),
replica: value.replica,
}
}
}
impl TryFrom<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
type Error = ProtocolError;
@@ -246,6 +282,14 @@ impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
}
}
impl From<&GetDbSizeRequest> for proto::GetDbSizeRequest {
fn from(value: &GetDbSizeRequest) -> proto::GetDbSizeRequest {
proto::GetDbSizeRequest {
read_lsn: Some((&value.read_lsn).into()),
db_oid: value.db_oid,
}
}
}
impl TryFrom<GetDbSizeRequest> for proto::GetDbSizeRequest {
type Error = ProtocolError;
@@ -311,6 +355,17 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
}
}
impl From<&GetPageRequest> for proto::GetPageRequest {
fn from(request: &GetPageRequest) -> proto::GetPageRequest {
proto::GetPageRequest {
request_id: request.request_id,
request_class: request.request_class.into(),
read_lsn: Some(request.read_lsn.try_into().unwrap()),
rel: Some(request.rel.into()),
block_number: request.block_numbers.clone().into_vec(),
}
}
}
impl TryFrom<GetPageRequest> for proto::GetPageRequest {
type Error = ProtocolError;
@@ -505,6 +560,14 @@ impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
}
}
impl From<&GetRelSizeRequest> for proto::GetRelSizeRequest {
fn from(value: &GetRelSizeRequest) -> proto::GetRelSizeRequest {
proto::GetRelSizeRequest {
read_lsn: Some((&value.read_lsn).into()),
rel: Some((&value.rel).into()),
}
}
}
impl TryFrom<GetRelSizeRequest> for proto::GetRelSizeRequest {
type Error = ProtocolError;

View File

@@ -10,12 +10,14 @@ license.workspace = true
anyhow.workspace = true
camino.workspace = true
clap.workspace = true
thiserror.workspace = true
futures.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
rand.workspace = true
reqwest.workspace=true
bytes.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
@@ -23,6 +25,8 @@ tokio.workspace = true
tokio-util.workspace = true
pageserver_client.workspace = true
pageserver_client_grpc.workspace = true
pageserver_api.workspace = true
pageserver_page_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -1,4 +1,4 @@
use std::collections::{HashSet, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
@@ -11,20 +11,32 @@ use camino::Utf8PathBuf;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_page_api::model::{GetPageClass};
use pageserver_client::page_service::PagestreamClient;
use pageserver_api::shard::TenantShardId;
use rand::prelude::*;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::id::TenantTimelineId;
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::ShardIndex;
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, StreamExt};
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "false")]
grpc: bool,
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
@@ -118,6 +130,24 @@ struct Output {
total: request_stats::Output,
}
enum ProtocolType {
Pg(PgProtocol),
Grpc(GrpcProtocol),
}
struct PgProtocol {
libpq_pagestream: PagestreamClient,
libpq_vector: VecDeque<Instant>,
}
type GetPageFut = BoxFuture<'static, (Instant, Option<pageserver_client_grpc::PageserverClientError>)>;
struct GrpcProtocol {
// mutex
grpc_page_client : Arc<pageserver_client_grpc::PageserverClient>,
grpc_vector: FuturesOrdered<GetPageFut>,
}
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
@@ -303,7 +333,22 @@ async fn main_impl(
.unwrap();
Box::pin(async move {
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
let protocol : ProtocolType;
if args.grpc {
let grpc = GrpcProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
protocol = ProtocolType::Grpc(grpc);
} else {
let pg = PgProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
protocol = ProtocolType::Pg(pg);
}
client_proto(args, protocol, worker_id, ss, cancel, rps_period, ranges, weights).await
})
};
@@ -355,8 +400,168 @@ async fn main_impl(
anyhow::Ok(())
}
async fn client_libpq(
impl PgProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let client = pageserver_client::page_service::Client::new(conn_string)
.await
.unwrap();
let client = client
.pagestream(tenant_id, timeline_id)
.await
.unwrap();
Self {
libpq_pagestream: client,
libpq_vector: VecDeque::new(),
}
}
}
impl GrpcProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let shard_map = HashMap::from([(
ShardIndex::unsharded(),
conn_string.clone(),
)]);
let client = pageserver_client_grpc::PageserverClient::new(
&tenant_id.to_string(),
&timeline_id.to_string(),
&None,
shard_map,
);
Self {
grpc_page_client: Arc::new(client),
grpc_vector: FuturesOrdered::new(),
}
}
}
impl ProtocolType {
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: rand::distributions::weighted::WeightedIndex<i128>,
) -> () {
match self {
ProtocolType::Grpc(g) => {
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
pageserver_page_api::model::GetPageRequest {
request_id: 0, // TODO
request_class: GetPageClass::Normal,
read_lsn: pageserver_page_api::model::ReadLsn {
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since_lsn: Some(r.timeline_lsn),
},
rel: pageserver_page_api::model::RelTag {
spcnode: rel_tag.spcnode,
dbnode: rel_tag.dbnode,
relnode: rel_tag.relnode,
forknum: rel_tag.forknum,
},
block_numbers: vec![block_no].into(),
}
};
let client_clone = g.grpc_page_client.clone();
let getpage_fut : GetPageFut = async move {
let result = client_clone.get_page(&req).await;
match result {
Ok(_) => {
(start, None)
}
Err(e) => {
(start, Some(e))
}
}
}.boxed();
g.grpc_vector.push_back(getpage_fut);
}
ProtocolType::Pg(p) => {
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
let _ = p.libpq_pagestream.getpage_send(req).await;
p.libpq_vector.push_back(start);
}
}
}
async fn get_start_time(&mut self) -> Instant {
match self {
ProtocolType::Grpc(g) => {
// Logic to get start time for grpc
let (start, result) = g.grpc_vector.next().await.unwrap();
match result {
None => {
// Request succeeded
}
Some(e) => {
tracing::error!("getpage request failed: {e}");
}
}
start
}
ProtocolType::Pg(p) => {
// Logic to get start time for pgstream
let start = p.libpq_vector.pop_front().unwrap();
let _ = p.libpq_pagestream.getpage_recv().await;
start
}
}
}
pub fn len(&self) -> usize {
match self {
ProtocolType::Grpc(g) => g.grpc_vector.len(),
ProtocolType::Pg(p) => p.libpq_vector.len(),
}
}
}
async fn client_proto(
args: &Args,
mut protocol: ProtocolType,
worker_id: WorkerId,
shared_state: Arc<SharedState>,
cancel: CancellationToken,
@@ -364,18 +569,11 @@ async fn client_libpq(
ranges: Vec<KeyRange>,
weights: rand::distributions::weighted::WeightedIndex<i128>,
) {
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
.await
.unwrap();
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
@@ -390,37 +588,12 @@ async fn client_libpq(
ticks_processed = periods_passed_until_now;
}
while inflight.len() < args.queue_depth.get() {
while protocol.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
client.getpage_send(req).await.unwrap();
inflight.push_back(start);
protocol.add_to_inflight(start, args, ranges.clone(), weights.clone()).await;
}
let start = inflight.pop_front().unwrap();
client.getpage_recv().await.unwrap();
let start = protocol.get_start_time().await;
let end = Instant::now();
shared_state.live_stats.request_done();
ticks_processed += 1;