page_api: add GetPageRequestBatch

This commit is contained in:
Erik Grinaker
2025-04-30 15:31:11 +02:00
parent df2806e7a0
commit 66171a117b
7 changed files with 68 additions and 25 deletions

1
Cargo.lock generated
View File

@@ -4564,6 +4564,7 @@ name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"prost 0.13.3",
"smallvec",
"thiserror 1.0.69",
"tonic",
"tonic-build",

View File

@@ -108,7 +108,7 @@ impl PageserverClient {
pub async fn get_pages(
&self,
requests: impl Stream<Item = proto::GetPageRequest> + Send + 'static,
requests: impl Stream<Item = proto::GetPageRequestBatch> + Send + 'static,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<proto::GetPageResponse>>,
PageserverClientError,

View File

@@ -11,6 +11,7 @@ edition = "2024"
utils.workspace = true
prost.workspace = true
smallvec.workspace = true
thiserror.workspace = true
tonic.workspace = true

View File

@@ -44,7 +44,7 @@ service PageService {
// NB: a status response (e.g. for errors) will terminate the stream. The
// stream may be shared by e.g. multiple Postgres backends, so we should avoid
// this. Most errors are instead propagated in the GetPageResponse.
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
rpc GetPages (stream GetPageRequestBatch) returns (stream GetPageResponse);
// Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
@@ -89,6 +89,17 @@ message GetPageRequest {
uint32 block_number = 4;
}
// A batch of GetPage requests. These will be executed as a single batch by the
// Pageserver, amortizing layer access costs and parallelizing them. This may
// increase the latency of any individual request, but improves the overall
// latency and throughput of the batch as a whole.
//
// Responses will be emitted individually, as soon as they are ready. They may
// be emitted in a different order than the requests.
message GetPageRequestBatch {
repeated GetPageRequest requests = 1;
}
// TODO: should this include page metadata, like reltag, LSN, and block number?
message GetPageResponse {
// The original request's ID.

View File

@@ -12,6 +12,7 @@
//! TODO: these types should be used in the Pageserver for actual processing,
//! instead of being cast into internal mirror types.
use smallvec::{SmallVec, smallvec};
use utils::lsn::Lsn;
use crate::proto;
@@ -55,6 +56,8 @@ pub struct GetPageRequest {
pub block_number: u32,
}
pub type GetPageRequestBatch = SmallVec<[GetPageRequest; 8]>;
#[derive(Clone, Debug)]
pub struct GetPageResponse {
pub page_image: std::vec::Vec<u8>,
@@ -105,6 +108,28 @@ impl From<ProtocolError> for tonic::Status {
}
}
impl From<GetPageRequestBatch> for proto::GetPageRequestBatch {
fn from(value: GetPageRequestBatch) -> proto::GetPageRequestBatch {
proto::GetPageRequestBatch {
requests: (&value).iter().map(|r| r.into()).collect(),
}
}
}
impl From<GetPageRequest> for GetPageRequestBatch {
fn from(value: GetPageRequest) -> GetPageRequestBatch {
smallvec![value]
}
}
impl From<GetPageRequest> for proto::GetPageRequestBatch {
fn from(value: GetPageRequest) -> proto::GetPageRequestBatch {
proto::GetPageRequestBatch {
requests: vec![(&value).into()],
}
}
}
impl From<&RelTag> for proto::RelTag {
fn from(value: &RelTag) -> proto::RelTag {
proto::RelTag {

View File

@@ -594,6 +594,7 @@ async fn client_grpc_stream(
}
// Send requests until the queue depth is reached
// TODO: use batching
while inflight.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
@@ -624,7 +625,7 @@ async fn client_grpc_stream(
block_number: block_no,
}
};
request_tx.send((&req).into()).await.unwrap();
request_tx.send(req.into()).await.unwrap();
inflight.push_back(start);
}

View File

@@ -272,7 +272,7 @@ impl PageService for PageServiceService {
async fn get_pages(
&self,
request: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
request: tonic::Request<tonic::Streaming<proto::GetPageRequestBatch>>,
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
let ttid = self.extract_ttid(request.metadata())?;
let shard = self.extract_shard(request.metadata())?;
@@ -283,35 +283,39 @@ impl PageService for PageServiceService {
let mut request_stream = request.into_inner();
let response_stream = try_stream! {
while let Some(request) = request_stream.message().await? {
let guard = timeline
while let Some(batch) = request_stream.message().await? {
// TODO: implement batching
for request in batch.requests {
let guard = timeline
.gate
.enter()
.or(Err(tonic::Status::unavailable("timeline is shutting down")))?;
let request: model::GetPageRequest = (&request).try_into()?;
let rel = convert_reltag(&request.rel);
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
request.common.request_lsn,
request.common.not_modified_since_lsn,
&latest_gc_cutoff_lsn,
&ctx,
)
.await?;
let page_image = timeline
.get_rel_page_at_lsn(
rel,
request.block_number,
Version::Lsn(lsn),
let request: model::GetPageRequest = (&request).try_into()?;
let rel = convert_reltag(&request.rel);
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
request.common.request_lsn,
request.common.not_modified_since_lsn,
&latest_gc_cutoff_lsn,
&ctx,
IoConcurrency::spawn_from_conf(conf, guard),
)
.await?;
yield proto::GetPageResponse { id: request.id, page_image };
let page_image = timeline
.get_rel_page_at_lsn(
rel,
request.block_number,
Version::Lsn(lsn),
&ctx,
IoConcurrency::spawn_from_conf(conf, guard),
)
.await?;
yield proto::GetPageResponse { id: request.id, page_image };
}
}
};