diff --git a/Cargo.lock b/Cargo.lock index e6a67d46ea..a3cd4dae20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4563,6 +4563,7 @@ dependencies = [ name = "pageserver_page_api" version = "0.1.0" dependencies = [ + "bytes", "prost 0.13.3", "smallvec", "thiserror 1.0.69", diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index b4c54d0a80..dc4cd09ada 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -28,6 +28,8 @@ pub enum PageserverClientError { ConnectError(#[from] tonic::transport::Error), #[error("could not perform request: {0}`")] RequestError(#[from] tonic::Status), + #[error("protocol error: {0}")] + ProtocolError(#[from] ProtocolError), #[error("could not perform request: {0}`")] InvalidUri(#[from] http::uri::InvalidUri), @@ -102,10 +104,22 @@ impl PageserverClient { let request = proto::GetPageRequest::from(request); let response = client.get_page(tonic::Request::new(request)).await?; + let response: GetPageResponse = response.into_inner().try_into()?; + if response.status != GetPageStatus::Ok { + return Err(PageserverClientError::RequestError(tonic::Status::new( + tonic::Code::Internal, + format!( + "{:?} {}", + response.status, + response.reason.unwrap_or_default() + ), + ))); + } - Ok(response.into_inner().page_image) + Ok(response.page_image) } + // TODO: this should use model::GetPageRequest and GetPageResponse pub async fn get_pages( &self, requests: impl Stream + Send + 'static, diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml index 8fd1f318b9..04b206de65 100644 --- a/pageserver/page_api/Cargo.toml +++ b/pageserver/page_api/Cargo.toml @@ -10,6 +10,7 @@ edition = "2024" # TODO: move Lsn to separate crate? This draws in a lot more dependencies utils.workspace = true +bytes.workspace = true prost.workspace = true smallvec.workspace = true thiserror.workspace = true diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto index 25185ae801..7dd757d0cb 100644 --- a/pageserver/page_api/proto/page_service.proto +++ b/pageserver/page_api/proto/page_service.proto @@ -8,7 +8,6 @@ // - neon-priority: used e.g. for metrics ("normal" or "low"), prefetches would be "low" // // TODO: -// - Backpressure? Rate limiting? // - Health checks? // - Tracing? OpenTelemetry? // - Compression? @@ -21,12 +20,6 @@ service PageService { // Returns the total size of a database, as # of bytes. rpc DbSize (DbSizeRequest) returns (DbSizeResponse); - // Returns whether a relation exists. - rpc RelExists(RelExistsRequest) returns (RelExistsResponse); - - // Returns the size of a relation, as # of blocks. - rpc RelSize (RelSizeRequest) returns (RelSizeResponse); - // Fetches a base backup. rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk); @@ -48,6 +41,13 @@ service PageService { // Fetches an SLRU segment. rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse); + + // Returns whether a relation exists. + rpc RelExists(RelExistsRequest) returns (RelExistsResponse); + + // Returns the size of a relation, as # of blocks. + rpc RelSize (RelSizeRequest) returns (RelSizeResponse); + } message RequestCommon { @@ -80,6 +80,7 @@ message RelSizeResponse { uint32 num_blocks = 1; } +// A single GetPage request. message GetPageRequest { // A request ID. Will be included in the response. Should be unique for // in-flight requests on the stream. @@ -100,12 +101,36 @@ message GetPageRequestBatch { repeated GetPageRequest requests = 1; } +// A GetPage response. May be emitted out of order. +// // TODO: should this include page metadata, like reltag, LSN, and block number? message GetPageResponse { // The original request's ID. uint64 id = 1; - // The 8KB page image. - bytes page_image = 2; + // The response status code. + GetPageStatus status = 2; + // A string describing the status, if any. + optional string reason = 3; + // The 8KB page image. Empty if status != OK. + bytes page_image = 4; +} + +// A GetPageResponse status code. Since we use a bidirectional stream, we don't +// want to send errors as gRPC statuses, since this would terminate the stream. +enum GetPageStatus { + // Unknown status. For backwards compatibility: used when the server sends a + // status code that the client doesn't know about. + UNKNOWN = 0; + // The request was successful. + OK = 1; + // The page did not exist. The tenant/timeline/shard has already been + // validated during stream setup. + NOT_FOUND = 2; + // The request was invalid. + INVALID = 3; + // The client is rate limited. Slow down and retry later. + // TODO: should we use this? + SLOW_DOWN = 4; } message DbSizeRequest { diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index a0d9ef8d50..5f63ec82f5 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -12,6 +12,7 @@ //! TODO: these types should be used in the Pageserver for actual processing, //! instead of being cast into internal mirror types. +use bytes::Bytes; use smallvec::{SmallVec, smallvec}; use utils::lsn::Lsn; @@ -60,7 +61,18 @@ pub type GetPageRequestBatch = SmallVec<[GetPageRequest; 8]>; #[derive(Clone, Debug)] pub struct GetPageResponse { - pub page_image: std::vec::Vec, + pub id: u64, + pub status: GetPageStatus, + pub reason: Option, + pub page_image: Bytes, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum GetPageStatus { + Ok, + NotFound, + Invalid, + SlowDown, } #[derive(Clone, Debug)] @@ -234,6 +246,29 @@ impl TryFrom<&proto::GetPageRequest> for GetPageRequest { } } +impl TryFrom for GetPageResponse { + type Error = ProtocolError; + + fn try_from(value: proto::GetPageResponse) -> Result { + let status = match proto::GetPageStatus::from_i32(value.status) { + Some(proto::GetPageStatus::Unknown) => { + return Err(ProtocolError::InvalidValue("status")); + } + Some(proto::GetPageStatus::Ok) => GetPageStatus::Ok, + Some(proto::GetPageStatus::NotFound) => GetPageStatus::NotFound, + Some(proto::GetPageStatus::Invalid) => GetPageStatus::Invalid, + Some(proto::GetPageStatus::SlowDown) => GetPageStatus::SlowDown, + None => return Err(ProtocolError::InvalidValue("status")), + }; + Ok(GetPageResponse { + id: value.id, + status, + reason: value.reason, + page_image: value.page_image, + }) + } +} + impl From<&DbSizeRequest> for proto::DbSizeRequest { fn from(value: &DbSizeRequest) -> proto::DbSizeRequest { proto::DbSizeRequest { diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 372caee185..49908016eb 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -14,6 +14,7 @@ use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest}; use pageserver_api::shard::TenantShardId; +use pageserver_page_api::model::{GetPageResponse, GetPageStatus}; use rand::prelude::*; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -631,7 +632,8 @@ async fn client_grpc_stream( // Receive responses for the inflight requests if let Some(response) = response_stream.next().await { - response.unwrap(); // Ensure the response is successful + let response: GetPageResponse = response.unwrap().try_into().unwrap(); + assert_eq!(response.status, GetPageStatus::Ok); let start = inflight.pop_front().unwrap(); let end = Instant::now(); shared_state.live_stats.request_done(); diff --git a/pageserver/src/compute_service_grpc.rs b/pageserver/src/compute_service_grpc.rs index fc49d6e07c..6c99a58d8a 100644 --- a/pageserver/src/compute_service_grpc.rs +++ b/pageserver/src/compute_service_grpc.rs @@ -263,6 +263,8 @@ impl PageService for PageServiceService { Ok(tonic::Response::new(proto::GetPageResponse { id: req.id, + status: proto::GetPageStatus::Ok as i32, + reason: None, page_image, })) } @@ -270,6 +272,7 @@ impl PageService for PageServiceService { .await } + // TODO: take and emit model types async fn get_pages( &self, request: tonic::Request>, @@ -314,7 +317,12 @@ impl PageService for PageServiceService { ) .await?; - yield proto::GetPageResponse { id: request.id, page_image }; + yield proto::GetPageResponse { + id: request.id, + status: proto::GetPageStatus::Ok as i32, + reason: None, + page_image, + }; } } };