From 66171a117b00fff67cf82097c173d6ccf7052c58 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 30 Apr 2025 15:31:11 +0200 Subject: [PATCH] page_api: add `GetPageRequestBatch` --- Cargo.lock | 1 + pageserver/client_grpc/src/lib.rs | 2 +- pageserver/page_api/Cargo.toml | 1 + pageserver/page_api/proto/page_service.proto | 13 ++++- pageserver/page_api/src/model.rs | 25 ++++++++++ .../pagebench/src/cmd/getpage_latest_lsn.rs | 3 +- pageserver/src/compute_service_grpc.rs | 48 ++++++++++--------- 7 files changed, 68 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 644d982829..e6a67d46ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4564,6 +4564,7 @@ name = "pageserver_page_api" version = "0.1.0" dependencies = [ "prost 0.13.3", + "smallvec", "thiserror 1.0.69", "tonic", "tonic-build", diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index 1301f9e1ec..b4c54d0a80 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -108,7 +108,7 @@ impl PageserverClient { pub async fn get_pages( &self, - requests: impl Stream + Send + 'static, + requests: impl Stream + Send + 'static, ) -> std::result::Result< tonic::Response>, PageserverClientError, diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml index 0d4652281e..8fd1f318b9 100644 --- a/pageserver/page_api/Cargo.toml +++ b/pageserver/page_api/Cargo.toml @@ -11,6 +11,7 @@ edition = "2024" utils.workspace = true prost.workspace = true +smallvec.workspace = true thiserror.workspace = true tonic.workspace = true diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto index 0afdcb3899..25185ae801 100644 --- a/pageserver/page_api/proto/page_service.proto +++ b/pageserver/page_api/proto/page_service.proto @@ -44,7 +44,7 @@ service PageService { // NB: a status response (e.g. for errors) will terminate the stream. The // stream may be shared by e.g. multiple Postgres backends, so we should avoid // this. Most errors are instead propagated in the GetPageResponse. - rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse); + rpc GetPages (stream GetPageRequestBatch) returns (stream GetPageResponse); // Fetches an SLRU segment. rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse); @@ -89,6 +89,17 @@ message GetPageRequest { uint32 block_number = 4; } +// A batch of GetPage requests. These will be executed as a single batch by the +// Pageserver, amortizing layer access costs and parallelizing them. This may +// increase the latency of any individual request, but improves the overall +// latency and throughput of the batch as a whole. +// +// Responses will be emitted individually, as soon as they are ready. They may +// be emitted in a different order than the requests. +message GetPageRequestBatch { + repeated GetPageRequest requests = 1; +} + // TODO: should this include page metadata, like reltag, LSN, and block number? message GetPageResponse { // The original request's ID. diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 6a112f5de2..a0d9ef8d50 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -12,6 +12,7 @@ //! TODO: these types should be used in the Pageserver for actual processing, //! instead of being cast into internal mirror types. +use smallvec::{SmallVec, smallvec}; use utils::lsn::Lsn; use crate::proto; @@ -55,6 +56,8 @@ pub struct GetPageRequest { pub block_number: u32, } +pub type GetPageRequestBatch = SmallVec<[GetPageRequest; 8]>; + #[derive(Clone, Debug)] pub struct GetPageResponse { pub page_image: std::vec::Vec, @@ -105,6 +108,28 @@ impl From for tonic::Status { } } +impl From for proto::GetPageRequestBatch { + fn from(value: GetPageRequestBatch) -> proto::GetPageRequestBatch { + proto::GetPageRequestBatch { + requests: (&value).iter().map(|r| r.into()).collect(), + } + } +} + +impl From for GetPageRequestBatch { + fn from(value: GetPageRequest) -> GetPageRequestBatch { + smallvec![value] + } +} + +impl From for proto::GetPageRequestBatch { + fn from(value: GetPageRequest) -> proto::GetPageRequestBatch { + proto::GetPageRequestBatch { + requests: vec![(&value).into()], + } + } +} + impl From<&RelTag> for proto::RelTag { fn from(value: &RelTag) -> proto::RelTag { proto::RelTag { diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index e8550eaa9e..372caee185 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -594,6 +594,7 @@ async fn client_grpc_stream( } // Send requests until the queue depth is reached + // TODO: use batching while inflight.len() < args.queue_depth.get() { let start = Instant::now(); let req = { @@ -624,7 +625,7 @@ async fn client_grpc_stream( block_number: block_no, } }; - request_tx.send((&req).into()).await.unwrap(); + request_tx.send(req.into()).await.unwrap(); inflight.push_back(start); } diff --git a/pageserver/src/compute_service_grpc.rs b/pageserver/src/compute_service_grpc.rs index 4305d289f8..fc49d6e07c 100644 --- a/pageserver/src/compute_service_grpc.rs +++ b/pageserver/src/compute_service_grpc.rs @@ -272,7 +272,7 @@ impl PageService for PageServiceService { async fn get_pages( &self, - request: tonic::Request>, + request: tonic::Request>, ) -> Result, tonic::Status> { let ttid = self.extract_ttid(request.metadata())?; let shard = self.extract_shard(request.metadata())?; @@ -283,35 +283,39 @@ impl PageService for PageServiceService { let mut request_stream = request.into_inner(); let response_stream = try_stream! { - while let Some(request) = request_stream.message().await? { - let guard = timeline + while let Some(batch) = request_stream.message().await? { + + // TODO: implement batching + for request in batch.requests { + let guard = timeline .gate .enter() .or(Err(tonic::Status::unavailable("timeline is shutting down")))?; - let request: model::GetPageRequest = (&request).try_into()?; - let rel = convert_reltag(&request.rel); - let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); - let lsn = Self::wait_or_get_last_lsn( - &timeline, - request.common.request_lsn, - request.common.not_modified_since_lsn, - &latest_gc_cutoff_lsn, - &ctx, - ) - .await?; - - let page_image = timeline - .get_rel_page_at_lsn( - rel, - request.block_number, - Version::Lsn(lsn), + let request: model::GetPageRequest = (&request).try_into()?; + let rel = convert_reltag(&request.rel); + let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); + let lsn = Self::wait_or_get_last_lsn( + &timeline, + request.common.request_lsn, + request.common.not_modified_since_lsn, + &latest_gc_cutoff_lsn, &ctx, - IoConcurrency::spawn_from_conf(conf, guard), ) .await?; - yield proto::GetPageResponse { id: request.id, page_image }; + let page_image = timeline + .get_rel_page_at_lsn( + rel, + request.block_number, + Version::Lsn(lsn), + &ctx, + IoConcurrency::spawn_from_conf(conf, guard), + ) + .await?; + + yield proto::GetPageResponse { id: request.id, page_image }; + } } };