diff --git a/Cargo.lock b/Cargo.lock index 89351432c1..98a848840f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -701,7 +701,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "itoa", "matchit", @@ -2330,7 +2330,7 @@ dependencies = [ "futures-core", "futures-sink", "http-body-util", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "pin-project", "rand 0.8.5", @@ -2883,9 +2883,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.8.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" [[package]] name = "httpdate" @@ -2935,9 +2935,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" dependencies = [ "bytes", "futures-channel", @@ -2977,7 +2977,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "rustls 0.22.4", "rustls-pki-types", @@ -2992,7 +2992,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "pin-project-lite", "tokio", @@ -3001,20 +3001,20 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.7" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.4.1", + "hyper 1.6.0", + "libc", "pin-project-lite", "socket2", "tokio", - "tower 0.4.13", "tower-service", "tracing", ] @@ -4236,6 +4236,8 @@ name = "pagebench" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", + "bytes", "camino", "clap", "futures", @@ -4244,10 +4246,13 @@ dependencies = [ "humantime-serde", "pageserver_api", "pageserver_client", + "pageserver_client_grpc", + "pageserver_page_api", "rand 0.8.5", "reqwest", "serde", "serde_json", + "thiserror 1.0.69", "tokio", "tokio-util", "tracing", @@ -4432,6 +4437,29 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "pageserver_client_grpc" +version = "0.1.0" +dependencies = [ + "bytes", + "futures", + "http 1.1.0", + "hyper 1.6.0", + "hyper-util", + "metrics", + "pageserver_page_api", + "priority-queue", + "rand 0.8.5", + "thiserror 1.0.69", + "tokio", + "tokio-util", + "tonic 0.13.1", + "tower 0.4.13", + "tracing", + "utils", + "uuid", +] + [[package]] name = "pageserver_compaction" version = "0.1.0" @@ -5008,6 +5036,17 @@ dependencies = [ "elliptic-curve 0.13.8", ] +[[package]] +name = "priority-queue" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef08705fa1589a1a59aa924ad77d14722cb0cd97b67dd5004ed5f4a4873fce8d" +dependencies = [ + "autocfg", + "equivalent", + "indexmap 2.9.0", +] + [[package]] name = "proc-macro2" version = "1.0.94" @@ -5208,7 +5247,7 @@ dependencies = [ "humantime", "humantime-serde", "hyper 0.14.30", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "indexmap 2.9.0", "ipnet", @@ -5604,7 +5643,7 @@ dependencies = [ "http-body-util", "http-types", "humantime-serde", - "hyper 1.4.1", + "hyper 1.6.0", "itertools 0.10.5", "metrics", "once_cell", @@ -5644,7 +5683,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-rustls 0.26.0", "hyper-util", "ipnet", @@ -5701,7 +5740,7 @@ dependencies = [ "futures", "getrandom 0.2.11", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.6.0", "parking_lot 0.11.2", "reqwest", "reqwest-middleware", @@ -6642,12 +6681,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.5" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -6713,7 +6752,7 @@ dependencies = [ "http-body-util", "http-utils", "humantime", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "metrics", "once_cell", @@ -7538,11 +7577,12 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.4", "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-timeout", "hyper-util", "percent-encoding", @@ -7599,6 +7639,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -8591,7 +8632,7 @@ dependencies = [ "hex", "hmac", "hyper 0.14.30", - "hyper 1.4.1", + "hyper 1.6.0", "hyper-util", "indexmap 2.9.0", "itertools 0.12.1", diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index c8084ca2b3..354240a75e 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -12,7 +12,7 @@ use std::collections::HashMap; use bytes::Bytes; -use futures::{Stream, StreamExt}; +use futures::{StreamExt}; use thiserror::Error; use tonic::metadata::AsciiMetadataValue; @@ -27,7 +27,6 @@ use tracing::error; use tokio::sync::RwLock; - use tonic::transport::{Channel, Endpoint}; #[derive(Error, Debug)] pub enum PageserverClientError { @@ -37,10 +36,8 @@ pub enum PageserverClientError { RequestError(#[from] tonic::Status), #[error("protocol error: {0}")] ProtocolError(#[from] ProtocolError), - #[error("could not perform request: {0}`")] InvalidUri(#[from] http::uri::InvalidUri), - #[error("could not perform request: {0}`")] Other(String), } @@ -48,13 +45,9 @@ pub enum PageserverClientError { pub struct PageserverClient { _tenant_id: String, _timeline_id: String, - _auth_token: Option, - shard_map: HashMap, - channels: tokio::sync::RwLock>, - auth_interceptor: AuthInterceptor, } @@ -75,54 +68,6 @@ impl PageserverClient { auth_interceptor: AuthInterceptor::new(tenant_id, timeline_id, auth_token.as_deref()), } } - pub async fn process_check_rel_exists_request( - &self, - request: &CheckRelExistsRequest, - ) -> Result { - // Current sharding model assumes that all metadata is present only at shard 0. - let shard = ShardIndex::unsharded(); - let chan = self.get_client(shard).await; - - let mut client = - PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); - - let request = proto::CheckRelExistsRequest::from(request); - let response = client.check_rel_exists(tonic::Request::new(request)).await; - - match response { - Err(status) => { - return Err(PageserverClientError::RequestError(status)); - } - Ok(resp) => { - return Ok(resp.get_ref().exists); - } - } - } - - pub async fn process_get_rel_size_request( - &self, - request: &GetRelSizeRequest, - ) -> Result { - // Current sharding model assumes that all metadata is present only at shard 0. - let shard = ShardIndex::unsharded(); - let chan = self.get_client(shard).await; - - let mut client = - PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); - - let request = proto::GetRelSizeRequest::from(request); - let response = client.get_rel_size(tonic::Request::new(request)).await; - - match response { - Err(status) => { - return Err(PageserverClientError::RequestError(status)); - } - Ok(resp) => { - return Ok(resp.get_ref().num_blocks); - } - } - } - // // TODO: This opens a new gRPC stream for every request, which is extremely inefficient pub async fn get_page( @@ -133,12 +78,10 @@ impl PageserverClient { let shard = ShardIndex::unsharded(); let chan = self.get_client(shard).await; - let mut client = PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); let request = proto::GetPageRequest::from(request); - let request_stream = futures::stream::once(std::future::ready(request)); let mut response_stream = client @@ -163,92 +106,7 @@ impl PageserverClient { } } - // Open a stream for requesting pages - // - // TODO: This is a pretty low level interface, the caller should not need to be concerned - // with streams. But 'get_page' is currently very naive and inefficient. - pub async fn get_pages( - &self, - requests: impl Stream + Send + 'static, - ) -> std::result::Result< - tonic::Response>, - PageserverClientError, - > { - // FIXME: calculate the shard number correctly - let shard = ShardIndex::unsharded(); - let chan = self.get_client(shard).await; - let mut client = - PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); - - let response = client.get_pages(tonic::Request::new(requests)).await; - - match response { - Err(status) => { - return Err(PageserverClientError::RequestError(status)); - } - Ok(resp) => { - return Ok(resp); - } - } - } - - /// Process a request to get the size of a database. - pub async fn process_get_dbsize_request( - &self, - request: &GetDbSizeRequest, - ) -> Result { - // Current sharding model assumes that all metadata is present only at shard 0. - let shard = ShardIndex::unsharded(); - let chan = self.get_client(shard).await; - - let mut client = - PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); - - let request = proto::GetDbSizeRequest::from(request); - let response = client.get_db_size(tonic::Request::new(request)).await; - - match response { - Err(status) => { - return Err(PageserverClientError::RequestError(status)); - } - Ok(resp) => { - return Ok(resp.get_ref().num_bytes); - } - } - } - /// Process a request to get the size of a database. - pub async fn get_base_backup( - &self, - request: &GetBaseBackupRequest, - gzip: bool, - ) -> std::result::Result< - tonic::Response>, - PageserverClientError, - > { - // Current sharding model assumes that all metadata is present only at shard 0. - let shard = ShardIndex::unsharded(); - let chan = self.get_client(shard).await; - - let mut client = - PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard)); - - if gzip { - client = client.accept_compressed(tonic::codec::CompressionEncoding::Gzip); - } - - let request = proto::GetBaseBackupRequest::from(request); - let response = client.get_base_backup(tonic::Request::new(request)).await; - - match response { - Err(status) => { - return Err(PageserverClientError::RequestError(status)); - } - Ok(resp) => { - return Ok(resp); - } - } - } // // TODO: this should use a connection pool with concurrency limits, // not a single connection to the shard. @@ -267,7 +125,8 @@ impl PageserverClient { let attempt = Endpoint::from_shared(shard_url.clone()) .expect("invalid endpoint") - .connect().await; + .connect() + .await; match attempt { Ok(channel) => { @@ -276,6 +135,7 @@ impl PageserverClient { channel.clone() } Err(e) => { + // TODO: handle this more gracefully, e.g. with a connection pool retry panic!("Failed to connect to shard {shard}: {e}"); } } @@ -288,7 +148,6 @@ struct AuthInterceptor { tenant_id: AsciiMetadataValue, shard_id: Option, timeline_id: AsciiMetadataValue, - auth_header: Option, // including "Bearer " prefix } @@ -329,7 +188,6 @@ impl tonic::service::Interceptor for AuthInterceptor { req.metadata_mut() .insert("authorization", auth_header.clone()); } - Ok(req) } } diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 8d2e218247..baaf65942f 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -172,14 +172,6 @@ impl TryFrom for CheckRelExistsRequest { } } -impl From<&CheckRelExistsRequest> for proto::CheckRelExistsRequest { - fn from(value: &CheckRelExistsRequest) -> proto::CheckRelExistsRequest { - proto::CheckRelExistsRequest { - read_lsn: Some((&value.read_lsn).into()), - rel: Some((&value.rel).into()), - } - } -} pub type CheckRelExistsResponse = bool; impl From for CheckRelExistsResponse { @@ -217,15 +209,6 @@ impl TryFrom for GetBaseBackupRequest { } } -impl From<&GetBaseBackupRequest> for proto::GetBaseBackupRequest { - fn from(value: &GetBaseBackupRequest) -> proto::GetBaseBackupRequest { - proto::GetBaseBackupRequest { - read_lsn: Some((&value.read_lsn).into()), - replica: value.replica, - } - } -} - impl TryFrom for proto::GetBaseBackupRequest { type Error = ProtocolError; @@ -282,14 +265,6 @@ impl TryFrom for GetDbSizeRequest { } } -impl From<&GetDbSizeRequest> for proto::GetDbSizeRequest { - fn from(value: &GetDbSizeRequest) -> proto::GetDbSizeRequest { - proto::GetDbSizeRequest { - read_lsn: Some((&value.read_lsn).into()), - db_oid: value.db_oid, - } - } -} impl TryFrom for proto::GetDbSizeRequest { type Error = ProtocolError; @@ -560,14 +535,6 @@ impl TryFrom for GetRelSizeRequest { } } -impl From<&GetRelSizeRequest> for proto::GetRelSizeRequest { - fn from(value: &GetRelSizeRequest) -> proto::GetRelSizeRequest { - proto::GetRelSizeRequest { - read_lsn: Some((&value.read_lsn).into()), - rel: Some((&value.rel).into()), - } - } -} impl TryFrom for proto::GetRelSizeRequest { type Error = ProtocolError; diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 00add30363..7a23d2f35c 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -23,6 +23,7 @@ serde_json.workspace = true tracing.workspace = true tokio.workspace = true tokio-util.workspace = true +async-trait = "0.1" pageserver_client.workspace = true pageserver_client_grpc.workspace = true diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index c13bf31975..45a0aada29 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashSet, VecDeque}; use std::future::Future; use std::num::NonZeroUsize; use std::pin::Pin; @@ -131,23 +131,6 @@ struct Output { } -enum ProtocolType { - Pg(PgProtocol), - Grpc(GrpcProtocol), -} - -struct PgProtocol { - libpq_pagestream: PagestreamClient, - libpq_vector: VecDeque, -} -type GetPageFut = BoxFuture<'static, (Instant, Option)>; - -struct GrpcProtocol { - // mutex - grpc_page_client : Arc, - grpc_vector: FuturesOrdered, -} - tokio_thread_local_stats::declare!(STATS: request_stats::Stats); pub(crate) fn main(args: Args) -> anyhow::Result<()> { @@ -333,22 +316,20 @@ async fn main_impl( .unwrap(); Box::pin(async move { - let protocol : ProtocolType; if args.grpc { let grpc = GrpcProtocol::new( args.page_service_connstring.clone(), worker_id.timeline.tenant_id, worker_id.timeline.timeline_id).await; - protocol = ProtocolType::Grpc(grpc); + client_proto(args, grpc, worker_id, ss, cancel, rps_period, ranges, weights).await } else { let pg = PgProtocol::new( args.page_service_connstring.clone(), worker_id.timeline.tenant_id, worker_id.timeline.timeline_id).await; - protocol = ProtocolType::Pg(pg); + client_proto(args, pg, worker_id, ss, cancel, rps_period, ranges, weights).await } - client_proto(args, protocol, worker_id, ss, cancel, rps_period, ranges, weights).await }) }; @@ -399,8 +380,51 @@ async fn main_impl( anyhow::Ok(()) } +// src/protocol.rs +use async_trait::async_trait; +use rand::distributions::weighted::WeightedIndex; -impl PgProtocol { +// — your existing imports for PagestreamClient, PageserverClientError, KeyRange, etc. — + +/// Common interface for both Pg and Grpc versions. +#[async_trait] +trait Protocol: Send { + /// Constructor/factory. + async fn new( + conn_string: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Self + where + Self: Sized; + + /// Fire off a “get page” request and store the start time. + async fn add_to_inflight( + &mut self, + start: Instant, + args: &Args, + ranges: Vec, + weights: WeightedIndex, + ); + + /// Wait for the next response and return its start time. + async fn get_start_time(&mut self) -> Instant; + + /// How many in-flight requests do we have? + fn len(&self) -> usize; +} + +/////////////////////////////////////////////////////////////////////////////// +// PgProtocol +/////////////////////////////////////////////////////////////////////////////// + +struct PgProtocol { + libpq_pagestream: PagestreamClient, + libpq_vector: VecDeque, +} + +#[async_trait] +impl Protocol for PgProtocol { async fn new( conn_string: String, tenant_id: TenantId, @@ -408,8 +432,7 @@ impl PgProtocol { ) -> Self { let client = pageserver_client::page_service::Client::new(conn_string) .await - .unwrap(); - let client = client + .unwrap() .pagestream(tenant_id, timeline_id) .await .unwrap(); @@ -418,150 +441,151 @@ impl PgProtocol { libpq_vector: VecDeque::new(), } } -} - -impl GrpcProtocol { - async fn new( - conn_string: String, - tenant_id: TenantId, - timeline_id: TimelineId, - ) -> Self { - let shard_map = HashMap::from([( - ShardIndex::unsharded(), - conn_string.clone(), - )]); - let client = pageserver_client_grpc::PageserverClient::new( - &tenant_id.to_string(), - &timeline_id.to_string(), - &None, - shard_map, - ); - Self { - grpc_page_client: Arc::new(client), - grpc_vector: FuturesOrdered::new(), - } - } -} -impl ProtocolType { async fn add_to_inflight( &mut self, start: Instant, args: &Args, ranges: Vec, - weights: rand::distributions::weighted::WeightedIndex, - ) -> () { - match self { - ProtocolType::Grpc(g) => { - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(key.is_rel_block_key()); - let (rel_tag, block_no) = key - .to_rel_block() - .expect("we filter non-rel-block keys out above"); - pageserver_page_api::model::GetPageRequest { - request_id: 0, // TODO - request_class: GetPageClass::Normal, - read_lsn: pageserver_page_api::model::ReadLsn { - request_lsn: if rng.gen_bool(args.req_latest_probability) { - Lsn::MAX - } else { - r.timeline_lsn - }, - not_modified_since_lsn: Some(r.timeline_lsn), - }, - rel: pageserver_page_api::model::RelTag { - spcnode: rel_tag.spcnode, - dbnode: rel_tag.dbnode, - relnode: rel_tag.relnode, - forknum: rel_tag.forknum, - }, - block_numbers: vec![block_no].into(), - } - }; - let client_clone = g.grpc_page_client.clone(); - let getpage_fut : GetPageFut = async move { - let result = client_clone.get_page(&req).await; - match result { - Ok(_) => { - (start, None) - } - Err(e) => { - (start, Some(e)) - } - } - }.boxed(); - g.grpc_vector.push_back(getpage_fut); - + weights: WeightedIndex, + ) { + // build your PagestreamGetPageRequest exactly as before… + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + let (rel_tag, block_no) = key.to_rel_block().unwrap(); + PagestreamGetPageRequest { + hdr: PagestreamRequest { + reqid: 0, + request_lsn: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, + not_modified_since: r.timeline_lsn, + }, + rel: rel_tag, + blkno: block_no, } - ProtocolType::Pg(p) => { + }; - let req = { - let mut rng = rand::thread_rng(); - let r = &ranges[weights.sample(&mut rng)]; - let key: i128 = rng.gen_range(r.start..r.end); - let key = Key::from_i128(key); - assert!(key.is_rel_block_key()); - let (rel_tag, block_no) = key - .to_rel_block() - .expect("we filter non-rel-block keys out above"); - PagestreamGetPageRequest { - hdr: PagestreamRequest { - reqid: 0, - request_lsn: if rng.gen_bool(args.req_latest_probability) { - Lsn::MAX - } else { - r.timeline_lsn - }, - not_modified_since: r.timeline_lsn, - }, - rel: rel_tag, - blkno: block_no, - } - }; - let _ = p.libpq_pagestream.getpage_send(req).await; - p.libpq_vector.push_back(start); - } - } + let _ = self.libpq_pagestream.getpage_send(req).await; + self.libpq_vector.push_back(start); } async fn get_start_time(&mut self) -> Instant { - match self { - ProtocolType::Grpc(g) => { - // Logic to get start time for grpc - let (start, result) = g.grpc_vector.next().await.unwrap(); - match result { - None => { - // Request succeeded - } - Some(e) => { - tracing::error!("getpage request failed: {e}"); - } - } - - start - } - ProtocolType::Pg(p) => { - // Logic to get start time for pgstream - let start = p.libpq_vector.pop_front().unwrap(); - let _ = p.libpq_pagestream.getpage_recv().await; - start - } - } + let start = self.libpq_vector.pop_front().unwrap(); + let _ = self.libpq_pagestream.getpage_recv().await; + start } - pub fn len(&self) -> usize { - match self { - ProtocolType::Grpc(g) => g.grpc_vector.len(), - ProtocolType::Pg(p) => p.libpq_vector.len(), - } + + fn len(&self) -> usize { + self.libpq_vector.len() } } + +/////////////////////////////////////////////////////////////////////////////// +// GrpcProtocol +/////////////////////////////////////////////////////////////////////////////// +type GetPageFut = BoxFuture<'static, (Instant, Option)>; +struct GrpcProtocol { + grpc_page_client: Arc, + grpc_vector: FuturesOrdered, +} + +#[async_trait] +impl Protocol for GrpcProtocol { + async fn new( + conn_string: String, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Self { + let shard_map = std::collections::HashMap::from([( + ShardIndex::unsharded(), + conn_string.clone(), + )]); + let client = pageserver_client_grpc::PageserverClient::new( + &tenant_id.to_string(), + &timeline_id.to_string(), + &None, + shard_map, + ); + Self { + grpc_page_client: Arc::new(client), + grpc_vector: FuturesOrdered::new(), + } + } + + async fn add_to_inflight( + &mut self, + start: Instant, + args: &Args, + ranges: Vec, + weights: WeightedIndex, + ) { + // build your GetPageRequest exactly as before… + let req = { + let mut rng = rand::thread_rng(); + let r = &ranges[weights.sample(&mut rng)]; + let key: i128 = rng.gen_range(r.start..r.end); + let key = Key::from_i128(key); + assert!(key.is_rel_block_key()); + let (rel_tag, block_no) = key.to_rel_block().unwrap(); + pageserver_page_api::model::GetPageRequest { + request_id: 0, + request_class: GetPageClass::Normal, + read_lsn: pageserver_page_api::model::ReadLsn { + request_lsn: if rng.gen_bool(args.req_latest_probability) { + Lsn::MAX + } else { + r.timeline_lsn + }, + not_modified_since_lsn: Some(r.timeline_lsn), + }, + rel: pageserver_page_api::model::RelTag { + spcnode: rel_tag.spcnode, + dbnode: rel_tag.dbnode, + relnode: rel_tag.relnode, + forknum: rel_tag.forknum, + }, + block_numbers: vec![block_no].into(), + } + }; + + let client_clone = self.grpc_page_client.clone(); + let getpage_fut : GetPageFut = async move { + let result = client_clone.get_page(&req).await; + match result { + Ok(_) => { + (start, None) + } + Err(e) => { + (start, Some(e)) + } + } + }.boxed(); + self.grpc_vector.push_back(getpage_fut); + } + + async fn get_start_time(&mut self) -> Instant { + let (start, err) = self.grpc_vector.next().await.unwrap(); + if let Some(e) = err { + tracing::error!("getpage request failed: {e}"); + } + start + } + + fn len(&self) -> usize { + self.grpc_vector.len() + } +} + async fn client_proto( args: &Args, - mut protocol: ProtocolType, + mut protocol: impl Protocol, worker_id: WorkerId, shared_state: Arc, cancel: CancellationToken,