page_api: add GetPageResponse::status

This commit is contained in:
Erik Grinaker
2025-04-30 16:48:45 +02:00
parent 66171a117b
commit 2c0d930e3d
7 changed files with 99 additions and 13 deletions

1
Cargo.lock generated
View File

@@ -4563,6 +4563,7 @@ dependencies = [
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"bytes",
"prost 0.13.3",
"smallvec",
"thiserror 1.0.69",

View File

@@ -28,6 +28,8 @@ pub enum PageserverClientError {
ConnectError(#[from] tonic::transport::Error),
#[error("could not perform request: {0}`")]
RequestError(#[from] tonic::Status),
#[error("protocol error: {0}")]
ProtocolError(#[from] ProtocolError),
#[error("could not perform request: {0}`")]
InvalidUri(#[from] http::uri::InvalidUri),
@@ -102,10 +104,22 @@ impl PageserverClient {
let request = proto::GetPageRequest::from(request);
let response = client.get_page(tonic::Request::new(request)).await?;
let response: GetPageResponse = response.into_inner().try_into()?;
if response.status != GetPageStatus::Ok {
return Err(PageserverClientError::RequestError(tonic::Status::new(
tonic::Code::Internal,
format!(
"{:?} {}",
response.status,
response.reason.unwrap_or_default()
),
)));
}
Ok(response.into_inner().page_image)
Ok(response.page_image)
}
// TODO: this should use model::GetPageRequest and GetPageResponse
pub async fn get_pages(
&self,
requests: impl Stream<Item = proto::GetPageRequestBatch> + Send + 'static,

View File

@@ -10,6 +10,7 @@ edition = "2024"
# TODO: move Lsn to separate crate? This draws in a lot more dependencies
utils.workspace = true
bytes.workspace = true
prost.workspace = true
smallvec.workspace = true
thiserror.workspace = true

View File

@@ -8,7 +8,6 @@
// - neon-priority: used e.g. for metrics ("normal" or "low"), prefetches would be "low"
//
// TODO:
// - Backpressure? Rate limiting?
// - Health checks?
// - Tracing? OpenTelemetry?
// - Compression?
@@ -21,12 +20,6 @@ service PageService {
// Returns the total size of a database, as # of bytes.
rpc DbSize (DbSizeRequest) returns (DbSizeResponse);
// Returns whether a relation exists.
rpc RelExists(RelExistsRequest) returns (RelExistsResponse);
// Returns the size of a relation, as # of blocks.
rpc RelSize (RelSizeRequest) returns (RelSizeResponse);
// Fetches a base backup.
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
@@ -48,6 +41,13 @@ service PageService {
// Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
// Returns whether a relation exists.
rpc RelExists(RelExistsRequest) returns (RelExistsResponse);
// Returns the size of a relation, as # of blocks.
rpc RelSize (RelSizeRequest) returns (RelSizeResponse);
}
message RequestCommon {
@@ -80,6 +80,7 @@ message RelSizeResponse {
uint32 num_blocks = 1;
}
// A single GetPage request.
message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream.
@@ -100,12 +101,36 @@ message GetPageRequestBatch {
repeated GetPageRequest requests = 1;
}
// A GetPage response. May be emitted out of order.
//
// TODO: should this include page metadata, like reltag, LSN, and block number?
//
// NOTE(review): as shown here the message declares `page_image` twice (field
// numbers 2 and 4) and field number 2 is also used by `status`. Presumably the
// `page_image = 2` line is the pre-change declaration and only the
// `status = 2` / `reason = 3` / `page_image = 4` layout is meant to remain --
// confirm against the actual .proto file.
message GetPageResponse {
// The original request's ID.
uint64 id = 1;
// The 8KB page image.
bytes page_image = 2;
// The response status code.
GetPageStatus status = 2;
// A string describing the status, if any.
optional string reason = 3;
// The 8KB page image. Empty if status != OK.
bytes page_image = 4;
}
// Status code carried by a GetPageResponse. Errors are reported in-band with
// this code instead of as gRPC statuses: GetPage uses a bidirectional stream,
// and sending a gRPC status would terminate that stream.
enum GetPageStatus {
  // Unknown status. Exists for backwards compatibility: it is the value used
  // when the server sends a status code that the client doesn't know about.
  UNKNOWN = 0;

  // The request was successful.
  OK = 1;

  // The page did not exist. The tenant/timeline/shard has already been
  // validated during stream setup.
  NOT_FOUND = 2;

  // The request was invalid.
  INVALID = 3;

  // The client is rate limited. Slow down and retry later.
  // TODO: should we use this?
  SLOW_DOWN = 4;
}
message DbSizeRequest {

View File

@@ -12,6 +12,7 @@
//! TODO: these types should be used in the Pageserver for actual processing,
//! instead of being cast into internal mirror types.
use bytes::Bytes;
use smallvec::{SmallVec, smallvec};
use utils::lsn::Lsn;
@@ -60,7 +61,18 @@ pub type GetPageRequestBatch = SmallVec<[GetPageRequest; 8]>;
/// A GetPage response in the domain model, produced from
/// `proto::GetPageResponse` through its `TryFrom` implementation.
#[derive(Clone, Debug)]
pub struct GetPageResponse {
// NOTE(review): `page_image` is declared twice in this view, once as
// `Vec<u8>` and once as `Bytes`. Presumably the `Vec<u8>` line is the
// pre-change declaration and only the `Bytes` one should remain -- confirm.
pub page_image: std::vec::Vec<u8>,
/// The ID of the original request, copied from the protobuf response.
pub id: u64,
/// The response status code.
pub status: GetPageStatus,
/// A string describing the status, if any.
pub reason: Option<String>,
/// The page image. The protobuf schema documents it as empty when the
/// status is not OK.
pub page_image: Bytes,
}
/// Status code of a [`GetPageResponse`].
///
/// Mirrors the protobuf `GetPageStatus` enum, minus its `UNKNOWN` value: the
/// conversion from the protobuf response rejects unknown statuses instead of
/// representing them here.
///
/// The enum is fieldless, so it is `Copy` and `Eq`: callers can compare and
/// pass statuses by value without cloning or moving out of a response.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GetPageStatus {
    /// The request was successful.
    Ok,
    /// The page did not exist.
    NotFound,
    /// The request was invalid.
    Invalid,
    /// The client is rate limited and should slow down and retry later.
    SlowDown,
}
#[derive(Clone, Debug)]
@@ -234,6 +246,29 @@ impl TryFrom<&proto::GetPageRequest> for GetPageRequest {
}
}
impl TryFrom<proto::GetPageResponse> for GetPageResponse {
    type Error = ProtocolError;

    /// Converts a protobuf GetPage response into the domain type.
    ///
    /// # Errors
    ///
    /// Returns `ProtocolError::InvalidValue("status")` when the wire status is
    /// either the explicit `Unknown` value or an integer that does not map to
    /// any known `proto::GetPageStatus` variant.
    fn try_from(value: proto::GetPageResponse) -> Result<GetPageResponse, ProtocolError> {
        // Decode the raw i32 through the generated `TryFrom<i32>` impl rather
        // than the deprecated `from_i32` helper. `Unknown` and unmapped
        // integers are rejected identically, so they share one arm.
        let status = match proto::GetPageStatus::try_from(value.status) {
            Ok(proto::GetPageStatus::Ok) => GetPageStatus::Ok,
            Ok(proto::GetPageStatus::NotFound) => GetPageStatus::NotFound,
            Ok(proto::GetPageStatus::Invalid) => GetPageStatus::Invalid,
            Ok(proto::GetPageStatus::SlowDown) => GetPageStatus::SlowDown,
            Ok(proto::GetPageStatus::Unknown) | Err(_) => {
                return Err(ProtocolError::InvalidValue("status"));
            }
        };

        Ok(GetPageResponse {
            id: value.id,
            status,
            reason: value.reason,
            page_image: value.page_image,
        })
    }
}
impl From<&DbSizeRequest> for proto::DbSizeRequest {
fn from(value: &DbSizeRequest) -> proto::DbSizeRequest {
proto::DbSizeRequest {

View File

@@ -14,6 +14,7 @@ use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::shard::TenantShardId;
use pageserver_page_api::model::{GetPageResponse, GetPageStatus};
use rand::prelude::*;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -631,7 +632,8 @@ async fn client_grpc_stream(
// Receive responses for the inflight requests
if let Some(response) = response_stream.next().await {
response.unwrap(); // Ensure the response is successful
let response: GetPageResponse = response.unwrap().try_into().unwrap();
assert_eq!(response.status, GetPageStatus::Ok);
let start = inflight.pop_front().unwrap();
let end = Instant::now();
shared_state.live_stats.request_done();

View File

@@ -263,6 +263,8 @@ impl PageService for PageServiceService {
Ok(tonic::Response::new(proto::GetPageResponse {
id: req.id,
status: proto::GetPageStatus::Ok as i32,
reason: None,
page_image,
}))
}
@@ -270,6 +272,7 @@ impl PageService for PageServiceService {
.await
}
// TODO: take and emit model types
async fn get_pages(
&self,
request: tonic::Request<tonic::Streaming<proto::GetPageRequestBatch>>,
@@ -314,7 +317,12 @@ impl PageService for PageServiceService {
)
.await?;
yield proto::GetPageResponse { id: request.id, page_image };
yield proto::GetPageResponse {
id: request.id,
status: proto::GetPageStatus::Ok as i32,
reason: None,
page_image,
};
}
}
};