Compare commits

...

9 Commits

Author SHA1 Message Date
Elizabeth Murray
04440343f8 Pagebench with grpc option. Note that grpc is on port 51050, so requires a connstring to be set. 2025-05-28 14:44:28 -07:00
Elizabeth Murray
578b7f1668 Remove "pub" for module module in pageserver_page_api. 2025-05-28 13:10:09 -07:00
Elizabeth Murray
97f18dd013 Remove unnecessary whitespace. 2025-05-28 12:54:53 -07:00
Elizabeth Murray
c8abe7e90f Remove unnecessary model changes. 2025-05-28 12:53:28 -07:00
Elizabeth Murray
7160fd16cd Response to review comments, code cleanup. 2025-05-28 12:40:21 -07:00
Elizabeth Murray
13b9d4cb67 Merge branch 'main' into elizabeth/communicator-grpc-minimal-domain-client 2025-05-28 09:40:47 -07:00
Elizabeth Murray
f0982f9a0a Clean up dependencies. 2025-05-28 08:51:01 -07:00
Elizabeth Murray
1634af6d10 Move conversion from string out of the auth interceptor. 2025-05-28 08:45:20 -07:00
Elizabeth Murray
53c1a7ca7f Add minimal GRPC client code that will be used for pagebench. 2025-05-28 08:09:45 -07:00
6 changed files with 497 additions and 83 deletions

90
Cargo.lock generated
View File

@@ -701,7 +701,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"itoa",
"matchit",
@@ -718,7 +718,7 @@ dependencies = [
"sync_wrapper 1.0.1",
"tokio",
"tokio-tungstenite 0.26.1",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -761,7 +761,7 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
]
@@ -1337,7 +1337,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"tower 0.5.2",
"tower",
"tower-http",
"tower-otel",
"tracing",
@@ -2066,7 +2066,7 @@ dependencies = [
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tower",
"tracing",
"utils",
"workspace_hack",
@@ -2330,7 +2330,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"pin-project",
"rand 0.8.5",
@@ -2883,9 +2883,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.8.0"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "httpdate"
@@ -2935,9 +2935,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.4.1"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
dependencies = [
"bytes",
"futures-channel",
@@ -2977,7 +2977,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"rustls 0.22.4",
"rustls-pki-types",
@@ -2992,7 +2992,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -3001,20 +3001,20 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.7"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.4.1",
"hyper 1.6.0",
"libc",
"pin-project-lite",
"socket2",
"tokio",
"tower 0.4.13",
"tower-service",
"tracing",
]
@@ -4236,6 +4236,7 @@ name = "pagebench"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"camino",
"clap",
"futures",
@@ -4244,12 +4245,15 @@ dependencies = [
"humantime-serde",
"pageserver_api",
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tonic 0.13.1",
"tracing",
"utils",
"workspace_hack",
@@ -4432,6 +4436,21 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"http 1.1.0",
"pageserver_page_api",
"thiserror 1.0.69",
"tokio",
"tonic 0.13.1",
"tracing",
"utils",
]
[[package]]
name = "pageserver_compaction"
version = "0.1.0"
@@ -5208,7 +5227,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"indexmap 2.9.0",
"ipnet",
@@ -5604,7 +5623,7 @@ dependencies = [
"http-body-util",
"http-types",
"humantime-serde",
"hyper 1.4.1",
"hyper 1.6.0",
"itertools 0.10.5",
"metrics",
"once_cell",
@@ -5644,7 +5663,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-rustls 0.26.0",
"hyper-util",
"ipnet",
@@ -5701,7 +5720,7 @@ dependencies = [
"futures",
"getrandom 0.2.11",
"http 1.1.0",
"hyper 1.4.1",
"hyper 1.6.0",
"parking_lot 0.11.2",
"reqwest",
"reqwest-middleware",
@@ -6642,12 +6661,12 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.5"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -6713,7 +6732,7 @@ dependencies = [
"http-body-util",
"http-utils",
"humantime",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"metrics",
"once_cell",
@@ -7542,7 +7561,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-timeout",
"hyper-util",
"percent-encoding",
@@ -7553,7 +7572,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -7586,21 +7605,6 @@ dependencies = [
"tonic 0.13.1",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower"
version = "0.5.2"
@@ -8591,7 +8595,7 @@ dependencies = [
"hex",
"hmac",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"indexmap 2.9.0",
"itertools 0.12.1",
@@ -8645,7 +8649,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"toml_edit",
"tower 0.5.2",
"tower",
"tracing",
"tracing-core",
"tracing-log",

View File

@@ -8,6 +8,7 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/client_grpc",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
@@ -254,6 +255,7 @@ metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }

View File

@@ -0,0 +1,16 @@
[package]
name = "pageserver_client_grpc"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
bytes.workspace = true
futures.workspace = true
http.workspace = true
thiserror.workspace = true
tonic.workspace = true
tracing.workspace = true
pageserver_page_api.workspace = true
utils.workspace = true
tokio.workspace = true

View File

@@ -0,0 +1,192 @@
//!
//! Pageserver gRPC client library
//!
//! This library provides a gRPC client for the pageserver for the
//! communicator project.
//!
//! This library is a work in progress.
//!
//!
use std::collections::HashMap;
use bytes::Bytes;
use futures::{StreamExt};
use thiserror::Error;
use tonic::metadata::AsciiMetadataValue;
use pageserver_page_api::proto;
use pageserver_page_api::proto::PageServiceClient;
use utils::shard::ShardIndex;
use std::fmt::Debug;
use tracing::error;
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
#[derive(Error, Debug)]
pub enum PageserverClientError {
#[error("could not connect to service: {0}")]
ConnectError(#[from] tonic::transport::Error),
#[error("could not perform request: {0}`")]
RequestError(#[from] tonic::Status),
#[error("protocol error: {0}")]
ProtocolError(#[from] pageserver_page_api::ProtocolError),
#[error("could not perform request: {0}`")]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("could not perform request: {0}`")]
Other(String),
}
pub struct PageserverClient {
endpoint_map: HashMap<ShardIndex, Endpoint>,
channels: tokio::sync::RwLock<HashMap<ShardIndex, Channel>>,
auth_interceptor: AuthInterceptor,
}
impl PageserverClient {
/// TODO: this doesn't currently react to changes in the shard map.
pub fn new(
tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>,
shard_map: HashMap<ShardIndex, String>,
) -> Result<Self, PageserverClientError> {
let endpoint_map: HashMap<ShardIndex, Endpoint> = shard_map
.into_iter()
.map(|(shard, url)| {
let endpoint = Endpoint::from_shared(url)
.map_err(|_e| PageserverClientError::Other("Unable to parse endpoint {url}".to_string()))?;
Ok::<(ShardIndex, Endpoint), PageserverClientError>((shard, endpoint))
})
.collect::<Result<_, _>>()?;
Ok(Self {
endpoint_map,
channels: RwLock::new(HashMap::new()),
auth_interceptor: AuthInterceptor::new(
tenant_id,
timeline_id,
auth_token,
),
})
}
//
// TODO: This opens a new gRPC stream for every request, which is extremely inefficient
pub async fn get_page(
&self,
shard: ShardIndex,
request: pageserver_page_api::GetPageRequest,
) -> Result<Vec<Bytes>, PageserverClientError> {
// FIXME: calculate the shard number correctly
let chan = self.get_client(shard).await?;
let mut client =
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
let request = proto::GetPageRequest::try_from(request)?;
let request_stream = futures::stream::once(std::future::ready(request));
let mut response_stream = client
.get_pages(tonic::Request::new(request_stream))
.await?
.into_inner();
let Some(response) = response_stream.next().await else {
return Err(PageserverClientError::Other(
"no response received for getpage request".to_string(),
));
};
match response {
Err(status) => {
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
let response: pageserver_page_api::GetPageResponse = resp.try_into().unwrap();
return Ok(response.page_images.to_vec());
}
}
}
//
// TODO: this should use a connection pool with concurrency limits,
// not a single connection to the shard.
//
async fn get_client(&self, shard: ShardIndex) -> Result<Channel, PageserverClientError> {
// Get channel from the hashmap
let mut channels = self.channels.write();
if let Some(channel) = channels.await.get(&shard) {
return Ok(channel.clone());
}
// Create a new channel if it doesn't exist
let shard_endpoint = self
.endpoint_map
.get(&shard);
let endpoint = match shard_endpoint{
Some(_endpoint) => _endpoint,
None => {
error!("Shard {shard} not found in shard map");
return Err(PageserverClientError::Other(format!(
"Shard {shard} not found in shard map"
)));
}
};
let channel = endpoint.connect().await?;
channels = self.channels.write();
channels.await.insert(shard, channel.clone());
Ok(channel.clone())
}
}
/// Inject tenant_id, timeline_id and authentication token to all pageserver requests.
#[derive(Clone)]
struct AuthInterceptor {
tenant_id: AsciiMetadataValue,
shard_id: Option<AsciiMetadataValue>,
timeline_id: AsciiMetadataValue,
auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
}
impl AuthInterceptor {
fn new(tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>) -> Self {
Self {
tenant_id: tenant_id,
shard_id: None,
timeline_id: timeline_id,
auth_header: auth_token
.map(|t| format!("Bearer {t}"))
.map(|t| t.parse().expect("could not parse auth token")),
}
}
fn for_shard(&self, shard_id: ShardIndex) -> Self {
let mut with_shard = self.clone();
with_shard.shard_id = Some(
shard_id
.to_string()
.parse()
.expect("could not parse shard id"),
);
with_shard
}
}
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut()
.insert("neon-tenant-id", self.tenant_id.clone());
if let Some(shard_id) = &self.shard_id {
req.metadata_mut().insert("neon-shard-id", shard_id.clone());
}
req.metadata_mut()
.insert("neon-timeline-id", self.timeline_id.clone());
if let Some(auth_header) = &self.auth_header {
req.metadata_mut()
.insert("authorization", auth_header.clone());
}
Ok(req)
}
}

View File

@@ -20,9 +20,13 @@ serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true
tonic.workspace = true
tokio-util.workspace = true
async-trait = "0.1"
pageserver_client.workspace = true
pageserver_api.workspace = true
pageserver_client_grpc.workspace = true
pageserver_page_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -6,25 +6,40 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tonic::metadata::AsciiMetadataValue;
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::shard::TenantShardId;
use pageserver_client::page_service::PagestreamClient;
use rand::prelude::*;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::id::TenantTimelineId;
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use futures::{
future::BoxFuture,
stream::FuturesOrdered,
FutureExt, StreamExt,
};
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
use async_trait::async_trait;
use rand::distributions::weighted::WeightedIndex;
use utils::shard::ShardIndex;
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "false")]
grpc: bool,
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
@@ -303,7 +318,19 @@ async fn main_impl(
.unwrap();
Box::pin(async move {
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
if args.grpc {
let grpc = GrpcProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
client_proto(args, grpc, worker_id, ss, cancel, rps_period, ranges, weights).await
} else {
let pg = PgProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
client_proto(args, pg, worker_id, ss, cancel, rps_period, ranges, weights).await
}
})
};
@@ -354,9 +381,208 @@ async fn main_impl(
anyhow::Ok(())
}
/// Common interface for both Pg and Grpc versions.
#[async_trait]
trait Protocol: Send {
/// Constructor/factory.
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self
where
Self: Sized;
async fn client_libpq(
/// Fire off a “get page” request and store the start time.
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
);
/// Wait for the next response and return its start time.
async fn get_start_time(&mut self) -> Instant;
/// How many in-flight requests do we have?
fn len(&self) -> usize;
}
///////////////////////////////////////////////////////////////////////////////
// PgProtocol
///////////////////////////////////////////////////////////////////////////////
struct PgProtocol {
libpq_pagestream: PagestreamClient,
libpq_vector: VecDeque<Instant>,
}
#[async_trait]
impl Protocol for PgProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let client = pageserver_client::page_service::Client::new(conn_string)
.await
.unwrap()
.pagestream(tenant_id, timeline_id)
.await
.unwrap();
Self {
libpq_pagestream: client,
libpq_vector: VecDeque::new(),
}
}
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
) {
// build your PagestreamGetPageRequest exactly as before…
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key.to_rel_block().unwrap();
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
let _ = self.libpq_pagestream.getpage_send(req).await;
self.libpq_vector.push_back(start);
}
async fn get_start_time(&mut self) -> Instant {
let start = self.libpq_vector.pop_front().unwrap();
let _ = self.libpq_pagestream.getpage_recv().await;
start
}
fn len(&self) -> usize {
self.libpq_vector.len()
}
}
///////////////////////////////////////////////////////////////////////////////
// GrpcProtocol
///////////////////////////////////////////////////////////////////////////////
type GetPageFut = BoxFuture<'static, (Instant, Option<pageserver_client_grpc::PageserverClientError>)>;
struct GrpcProtocol {
grpc_page_client: Arc<pageserver_client_grpc::PageserverClient>,
grpc_vector: FuturesOrdered<GetPageFut>,
}
#[async_trait]
impl Protocol for GrpcProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let shard_map = std::collections::HashMap::from([(
ShardIndex::unsharded(),
conn_string.clone(),
)]);
let tenant_ascii : AsciiMetadataValue = tenant_id.to_string().parse().unwrap();
let timeline_ascii : AsciiMetadataValue = timeline_id.to_string().parse().unwrap();
let client = pageserver_client_grpc::PageserverClient::new(
tenant_ascii,
timeline_ascii,
None,
shard_map,
).unwrap();
Self {
grpc_page_client: Arc::new(client),
grpc_vector: FuturesOrdered::new(),
}
}
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
) {
// build your GetPageRequest exactly as before…
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key.to_rel_block().unwrap();
pageserver_page_api::GetPageRequest {
request_id: 0,
request_class: pageserver_page_api::GetPageClass::Normal,
read_lsn: pageserver_page_api::ReadLsn {
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since_lsn: Some(r.timeline_lsn),
},
rel: pageserver_page_api::RelTag {
spcnode: rel_tag.spcnode,
dbnode: rel_tag.dbnode,
relnode: rel_tag.relnode,
forknum: rel_tag.forknum,
},
block_numbers: vec![block_no].into(),
}
};
let client_clone = self.grpc_page_client.clone();
let getpage_fut : GetPageFut = async move {
let result = client_clone.get_page(ShardIndex::unsharded(), req).await;
match result {
Ok(_) => {
(start, None)
}
Err(e) => {
(start, Some(e))
}
}
}.boxed();
self.grpc_vector.push_back(getpage_fut);
}
async fn get_start_time(&mut self) -> Instant {
let (start, err) = self.grpc_vector.next().await.unwrap();
if let Some(e) = err {
tracing::error!("getpage request failed: {e}");
}
start
}
fn len(&self) -> usize {
self.grpc_vector.len()
}
}
async fn client_proto(
args: &Args,
mut protocol: impl Protocol,
worker_id: WorkerId,
shared_state: Arc<SharedState>,
cancel: CancellationToken,
@@ -364,18 +590,11 @@ async fn client_libpq(
ranges: Vec<KeyRange>,
weights: rand::distributions::weighted::WeightedIndex<i128>,
) {
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
.await
.unwrap();
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
@@ -390,37 +609,12 @@ async fn client_libpq(
ticks_processed = periods_passed_until_now;
}
while inflight.len() < args.queue_depth.get() {
while protocol.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
client.getpage_send(req).await.unwrap();
inflight.push_back(start);
protocol.add_to_inflight(start, args, ranges.clone(), weights.clone()).await;
}
let start = inflight.pop_front().unwrap();
client.getpage_recv().await.unwrap();
let start = protocol.get_start_time().await;
let end = Instant::now();
shared_state.live_stats.request_done();
ticks_processed += 1;
@@ -436,9 +630,11 @@ async fn client_libpq(
if let Some(period) = &rps_period {
let next_at = client_start
+ Duration::from_micros(
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
tokio::time::sleep_until(next_at.into()).await;
}
}
}