//! Structs representing the canonical page service API. //! //! These mirror the autogenerated Protobuf types. The differences are: //! //! - Types that are in fact required by the API are not Options. The protobuf "required" //! attribute is deprecated and 'prost' marks a lot of members as optional because of that. //! (See for a gripe on this) //! //! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits. //! //! - Validate protocol invariants, via try_from() and try_into(). //! //! Validation only happens on the receiver side, i.e. when converting from Protobuf to domain //! types. This is where it matters -- the Protobuf types are less strict than the domain types, and //! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g. //! stream combinators without dealing with errors, and avoids validating the same message twice. use std::fmt::Display; use bytes::Bytes; use postgres_ffi::Oid; // TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid // pulling in all of their other crate dependencies when building the client. use utils::lsn::Lsn; use crate::proto; /// A protocol error. Typically returned via try_from() or try_into(). #[derive(thiserror::Error, Clone, Debug)] pub enum ProtocolError { #[error("field '{0}' has invalid value '{1}'")] Invalid(&'static str, String), #[error("required field '{0}' is missing")] Missing(&'static str), } impl ProtocolError { /// Helper to generate a new ProtocolError::Invalid for the given field and value. pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self { Self::Invalid(field, format!("{value:?}")) } } impl From for tonic::Status { fn from(err: ProtocolError) -> Self { tonic::Status::invalid_argument(format!("{err}")) } } /// The LSN a request should read at. #[derive(Clone, Copy, Debug)] pub struct ReadLsn { /// The request's read LSN. pub request_lsn: Lsn, /// If given, the caller guarantees that the page has not been modified since this LSN. Must be /// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page /// without waiting for the request LSN to arrive. If not given, the request will read at the /// request_lsn and wait for it to arrive if necessary. Valid for all request types. /// /// It is undefined behaviour to make a request such that the page was, in fact, modified /// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an /// error, or it might return the old page version or the new page version. Setting /// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary /// waiting. pub not_modified_since_lsn: Option, } impl Display for ReadLsn { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let req_lsn = self.request_lsn; if let Some(mod_lsn) = self.not_modified_since_lsn { write!(f, "{req_lsn}>={mod_lsn}") } else { req_lsn.fmt(f) } } } impl TryFrom for ReadLsn { type Error = ProtocolError; fn try_from(pb: proto::ReadLsn) -> Result { if pb.request_lsn == 0 { return Err(ProtocolError::invalid("request_lsn", pb.request_lsn)); } if pb.not_modified_since_lsn > pb.request_lsn { return Err(ProtocolError::invalid( "not_modified_since_lsn", pb.not_modified_since_lsn, )); } Ok(Self { request_lsn: Lsn(pb.request_lsn), not_modified_since_lsn: match pb.not_modified_since_lsn { 0 => None, lsn => Some(Lsn(lsn)), }, }) } } impl From for proto::ReadLsn { fn from(read_lsn: ReadLsn) -> Self { Self { request_lsn: read_lsn.request_lsn.0, not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0, } } } // RelTag is defined in pageserver_api::reltag. pub type RelTag = pageserver_api::reltag::RelTag; impl TryFrom for RelTag { type Error = ProtocolError; fn try_from(pb: proto::RelTag) -> Result { Ok(Self { spcnode: pb.spc_oid, dbnode: pb.db_oid, relnode: pb.rel_number, forknum: pb .fork_number .try_into() .map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?, }) } } impl From for proto::RelTag { fn from(rel_tag: RelTag) -> Self { Self { spc_oid: rel_tag.spcnode, db_oid: rel_tag.dbnode, rel_number: rel_tag.relnode, fork_number: rel_tag.forknum as u32, } } } /// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error. #[derive(Clone, Copy, Debug)] pub struct CheckRelExistsRequest { pub read_lsn: ReadLsn, pub rel: RelTag, } impl TryFrom for CheckRelExistsRequest { type Error = ProtocolError; fn try_from(pb: proto::CheckRelExistsRequest) -> Result { Ok(Self { read_lsn: pb .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, }) } } impl From for proto::CheckRelExistsRequest { fn from(request: CheckRelExistsRequest) -> Self { Self { read_lsn: Some(request.read_lsn.into()), rel: Some(request.rel.into()), } } } pub type CheckRelExistsResponse = bool; impl From for CheckRelExistsResponse { fn from(pb: proto::CheckRelExistsResponse) -> Self { pb.exists } } impl From for proto::CheckRelExistsResponse { fn from(exists: CheckRelExistsResponse) -> Self { Self { exists } } } /// Requests a base backup. #[derive(Clone, Copy, Debug)] pub struct GetBaseBackupRequest { /// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver. pub lsn: Option, /// If true, logical replication slots will not be created. pub replica: bool, /// If true, include relation files in the base backup. Mainly for debugging and tests. pub full: bool, } impl From for GetBaseBackupRequest { fn from(pb: proto::GetBaseBackupRequest) -> Self { Self { lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)), replica: pb.replica, full: pb.full, } } } impl From for proto::GetBaseBackupRequest { fn from(request: GetBaseBackupRequest) -> Self { Self { lsn: request.lsn.unwrap_or_default().0, replica: request.replica, full: request.full, } } } pub type GetBaseBackupResponseChunk = Bytes; impl TryFrom for GetBaseBackupResponseChunk { type Error = ProtocolError; fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result { if pb.chunk.is_empty() { return Err(ProtocolError::Missing("chunk")); } Ok(pb.chunk) } } impl From for proto::GetBaseBackupResponseChunk { fn from(chunk: GetBaseBackupResponseChunk) -> Self { Self { chunk } } } /// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error. #[derive(Clone, Copy, Debug)] pub struct GetDbSizeRequest { pub read_lsn: ReadLsn, pub db_oid: Oid, } impl TryFrom for GetDbSizeRequest { type Error = ProtocolError; fn try_from(pb: proto::GetDbSizeRequest) -> Result { Ok(Self { read_lsn: pb .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, db_oid: pb.db_oid, }) } } impl From for proto::GetDbSizeRequest { fn from(request: GetDbSizeRequest) -> Self { Self { read_lsn: Some(request.read_lsn.into()), db_oid: request.db_oid, } } } pub type GetDbSizeResponse = u64; impl From for GetDbSizeResponse { fn from(pb: proto::GetDbSizeResponse) -> Self { pb.num_bytes } } impl From for proto::GetDbSizeResponse { fn from(num_bytes: GetDbSizeResponse) -> Self { Self { num_bytes } } } /// Requests one or more pages. #[derive(Clone, Debug)] pub struct GetPageRequest { /// A request ID. Will be included in the response. Should be unique for in-flight requests on /// the stream. pub request_id: RequestID, /// The request class. pub request_class: GetPageClass, /// The LSN to read at. pub read_lsn: ReadLsn, /// The relation to read from. pub rel: RelTag, /// Page numbers to read. Must belong to the remote shard. /// /// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access /// costs and parallelizing them. This may increase the latency of any individual request, but /// improves the overall latency and throughput of the batch as a whole. pub block_numbers: Vec, } impl TryFrom for GetPageRequest { type Error = ProtocolError; fn try_from(pb: proto::GetPageRequest) -> Result { if pb.block_number.is_empty() { return Err(ProtocolError::Missing("block_number")); } Ok(Self { request_id: pb.request_id, request_class: pb.request_class.into(), read_lsn: pb .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, block_numbers: pb.block_number, }) } } impl From for proto::GetPageRequest { fn from(request: GetPageRequest) -> Self { Self { request_id: request.request_id, request_class: request.request_class.into(), read_lsn: Some(request.read_lsn.into()), rel: Some(request.rel.into()), block_number: request.block_numbers, } } } /// A GetPage request ID. pub type RequestID = u64; /// A GetPage request class. #[derive(Clone, Copy, Debug)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed. Unknown, /// A normal request. This is the default. Normal, /// A prefetch request. NB: can only be classified on pg < 18. Prefetch, /// A background request (e.g. vacuum). Background, } impl From for GetPageClass { fn from(pb: proto::GetPageClass) -> Self { match pb { proto::GetPageClass::Unknown => Self::Unknown, proto::GetPageClass::Normal => Self::Normal, proto::GetPageClass::Prefetch => Self::Prefetch, proto::GetPageClass::Background => Self::Background, } } } impl From for GetPageClass { fn from(class: i32) -> Self { proto::GetPageClass::try_from(class) .unwrap_or(proto::GetPageClass::Unknown) .into() } } impl From for proto::GetPageClass { fn from(class: GetPageClass) -> Self { match class { GetPageClass::Unknown => Self::Unknown, GetPageClass::Normal => Self::Normal, GetPageClass::Prefetch => Self::Prefetch, GetPageClass::Background => Self::Background, } } } impl From for i32 { fn from(class: GetPageClass) -> Self { proto::GetPageClass::from(class).into() } } /// A GetPage response. /// /// A batch response will contain all of the requested pages. We could eagerly emit individual pages /// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the /// batch and we'll only return once the entire batch is ready, so no one can make use of the /// individual pages. #[derive(Clone, Debug)] pub struct GetPageResponse { /// The original request's ID. pub request_id: RequestID, /// The response status code. pub status_code: GetPageStatusCode, /// A string describing the status, if any. pub reason: Option, /// The 8KB page images, in the same order as the request. Empty if status != OK. pub page_images: Vec, } impl From for GetPageResponse { fn from(pb: proto::GetPageResponse) -> Self { Self { request_id: pb.request_id, status_code: pb.status_code.into(), reason: Some(pb.reason).filter(|r| !r.is_empty()), page_images: pb.page_image, } } } impl From for proto::GetPageResponse { fn from(response: GetPageResponse) -> Self { Self { request_id: response.request_id, status_code: response.status_code.into(), reason: response.reason.unwrap_or_default(), page_image: response.page_images, } } } impl GetPageResponse { /// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a /// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a /// GetPageResponse with a non-OK status code instead. #[allow(clippy::result_large_err)] pub fn try_from_status( status: tonic::Status, request_id: RequestID, ) -> Result { // We shouldn't see an OK status here, because we're emitting an error. debug_assert_ne!(status.code(), tonic::Code::Ok); if status.code() == tonic::Code::Ok { return Err(tonic::Status::internal(format!( "unexpected OK status: {status:?}", ))); } // If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request // error and we should return a tonic::Status to terminate the stream. let Ok(status_code) = status.code().try_into() else { return Err(status); }; // Return a GetPageResponse for the status. Ok(Self { request_id, status_code, reason: Some(status.message().to_string()), page_images: Vec::new(), }) } } /// A GetPage response status code. /// /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream /// (potentially shared by many backends), and a gRPC status response would terminate the stream so /// we send GetPageResponse messages with these codes instead. #[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)] pub enum GetPageStatusCode { /// Unknown status. For forwards compatibility: used when an older client version receives a new /// status code from a newer server version. Unknown, /// The request was successful. Ok, /// The page did not exist. The tenant/timeline/shard has already been validated during stream /// setup. NotFound, /// The request was invalid. InvalidRequest, /// The request failed due to an internal server error. InternalError, /// The tenant is rate limited. Slow down and retry later. SlowDown, } impl From for GetPageStatusCode { fn from(pb: proto::GetPageStatusCode) -> Self { match pb { proto::GetPageStatusCode::Unknown => Self::Unknown, proto::GetPageStatusCode::Ok => Self::Ok, proto::GetPageStatusCode::NotFound => Self::NotFound, proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest, proto::GetPageStatusCode::InternalError => Self::InternalError, proto::GetPageStatusCode::SlowDown => Self::SlowDown, } } } impl From for GetPageStatusCode { fn from(status_code: i32) -> Self { proto::GetPageStatusCode::try_from(status_code) .unwrap_or(proto::GetPageStatusCode::Unknown) .into() } } impl From for proto::GetPageStatusCode { fn from(status_code: GetPageStatusCode) -> Self { match status_code { GetPageStatusCode::Unknown => Self::Unknown, GetPageStatusCode::Ok => Self::Ok, GetPageStatusCode::NotFound => Self::NotFound, GetPageStatusCode::InvalidRequest => Self::InvalidRequest, GetPageStatusCode::InternalError => Self::InternalError, GetPageStatusCode::SlowDown => Self::SlowDown, } } } impl From for i32 { fn from(status_code: GetPageStatusCode) -> Self { proto::GetPageStatusCode::from(status_code).into() } } impl TryFrom for GetPageStatusCode { type Error = tonic::Code; fn try_from(code: tonic::Code) -> Result { use tonic::Code; let status_code = match code { Code::Ok => Self::Ok, // These are per-request errors, which should be returned as GetPageResponses. Code::AlreadyExists => Self::InvalidRequest, Code::DataLoss => Self::InternalError, Code::FailedPrecondition => Self::InvalidRequest, Code::InvalidArgument => Self::InvalidRequest, Code::Internal => Self::InternalError, Code::NotFound => Self::NotFound, Code::OutOfRange => Self::InvalidRequest, Code::ResourceExhausted => Self::SlowDown, // These should terminate the stream by returning a tonic::Status. Code::Aborted | Code::Cancelled | Code::DeadlineExceeded | Code::PermissionDenied | Code::Unauthenticated | Code::Unavailable | Code::Unimplemented | Code::Unknown => return Err(code), }; Ok(status_code) } } // Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other // shards will error. #[derive(Clone, Copy, Debug)] pub struct GetRelSizeRequest { pub read_lsn: ReadLsn, pub rel: RelTag, } impl TryFrom for GetRelSizeRequest { type Error = ProtocolError; fn try_from(proto: proto::GetRelSizeRequest) -> Result { Ok(Self { read_lsn: proto .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, }) } } impl From for proto::GetRelSizeRequest { fn from(request: GetRelSizeRequest) -> Self { Self { read_lsn: Some(request.read_lsn.into()), rel: Some(request.rel.into()), } } } pub type GetRelSizeResponse = u32; impl From for GetRelSizeResponse { fn from(proto: proto::GetRelSizeResponse) -> Self { proto.num_blocks } } impl From for proto::GetRelSizeResponse { fn from(num_blocks: GetRelSizeResponse) -> Self { Self { num_blocks } } } /// Requests an SLRU segment. Only valid on shard 0, other shards will error. #[derive(Clone, Copy, Debug)] pub struct GetSlruSegmentRequest { pub read_lsn: ReadLsn, pub kind: SlruKind, pub segno: u32, } impl TryFrom for GetSlruSegmentRequest { type Error = ProtocolError; fn try_from(pb: proto::GetSlruSegmentRequest) -> Result { Ok(Self { read_lsn: pb .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, kind: u8::try_from(pb.kind) .ok() .and_then(SlruKind::from_repr) .ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?, segno: pb.segno, }) } } impl From for proto::GetSlruSegmentRequest { fn from(request: GetSlruSegmentRequest) -> Self { Self { read_lsn: Some(request.read_lsn.into()), kind: request.kind as u32, segno: request.segno, } } } pub type GetSlruSegmentResponse = Bytes; impl TryFrom for GetSlruSegmentResponse { type Error = ProtocolError; fn try_from(pb: proto::GetSlruSegmentResponse) -> Result { if pb.segment.is_empty() { return Err(ProtocolError::Missing("segment")); } Ok(pb.segment) } } impl From for proto::GetSlruSegmentResponse { fn from(segment: GetSlruSegmentResponse) -> Self { Self { segment } } } // SlruKind is defined in pageserver_api::reltag. pub type SlruKind = pageserver_api::reltag::SlruKind;