//! Structs representing the canonical page service API. //! //! These mirror the autogenerated Protobuf types. The differences are: //! //! - Types that are in fact required by the API are not Options. The protobuf "required" //! attribute is deprecated and 'prost' marks a lot of members as optional because of that. //! (See for a gripe on this) //! //! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits. //! //! - Validate protocol invariants, via try_from() and try_into(). //! //! Validation only happens on the receiver side, i.e. when converting from Protobuf to domain //! types. This is where it matters -- the Protobuf types are less strict than the domain types, and //! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g. //! stream combinators without dealing with errors, and avoids validating the same message twice. use std::fmt::Display; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bytes::Bytes; use postgres_ffi_types::Oid; // TODO: split out Lsn, RelTag, SlruKind and other basic types to a separate crate, to avoid // pulling in all of their other crate dependencies when building the client. use utils::lsn::Lsn; use crate::proto; /// A protocol error. Typically returned via try_from() or try_into(). #[derive(thiserror::Error, Clone, Debug)] pub enum ProtocolError { #[error("field '{0}' has invalid value '{1}'")] Invalid(&'static str, String), #[error("required field '{0}' is missing")] Missing(&'static str), } impl ProtocolError { /// Helper to generate a new ProtocolError::Invalid for the given field and value. pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self { Self::Invalid(field, format!("{value:?}")) } } impl From for tonic::Status { fn from(err: ProtocolError) -> Self { tonic::Status::invalid_argument(format!("{err}")) } } /// The LSN a request should read at. #[derive(Clone, Copy, Debug, Default)] pub struct ReadLsn { /// The request's read LSN. pub request_lsn: Lsn, /// If given, the caller guarantees that the page has not been modified since this LSN. Must be /// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page /// without waiting for the request LSN to arrive. If not given, the request will read at the /// request_lsn and wait for it to arrive if necessary. Valid for all request types. /// /// It is undefined behaviour to make a request such that the page was, in fact, modified /// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an /// error, or it might return the old page version or the new page version. Setting /// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary /// waiting. pub not_modified_since_lsn: Option, } impl Display for ReadLsn { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let req_lsn = self.request_lsn; if let Some(mod_lsn) = self.not_modified_since_lsn { write!(f, "{req_lsn}>={mod_lsn}") } else { req_lsn.fmt(f) } } } impl TryFrom for ReadLsn { type Error = ProtocolError; fn try_from(pb: proto::ReadLsn) -> Result { if pb.request_lsn == 0 { return Err(ProtocolError::invalid("request_lsn", pb.request_lsn)); } if pb.not_modified_since_lsn > pb.request_lsn { return Err(ProtocolError::invalid( "not_modified_since_lsn", pb.not_modified_since_lsn, )); } Ok(Self { request_lsn: Lsn(pb.request_lsn), not_modified_since_lsn: match pb.not_modified_since_lsn { 0 => None, lsn => Some(Lsn(lsn)), }, }) } } impl From for proto::ReadLsn { fn from(read_lsn: ReadLsn) -> Self { Self { request_lsn: read_lsn.request_lsn.0, not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0, } } } // RelTag is defined in pageserver_api::reltag. pub type RelTag = pageserver_api::reltag::RelTag; impl TryFrom for RelTag { type Error = ProtocolError; fn try_from(pb: proto::RelTag) -> Result { Ok(Self { spcnode: pb.spc_oid, dbnode: pb.db_oid, relnode: pb.rel_number, forknum: pb .fork_number .try_into() .map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?, }) } } impl From for proto::RelTag { fn from(rel_tag: RelTag) -> Self { Self { spc_oid: rel_tag.spcnode, db_oid: rel_tag.dbnode, rel_number: rel_tag.relnode, fork_number: rel_tag.forknum as u32, } } } /// Requests a base backup. #[derive(Clone, Copy, Debug)] pub struct GetBaseBackupRequest { /// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver. pub lsn: Option, /// If true, logical replication slots will not be created. pub replica: bool, /// If true, include relation files in the base backup. Mainly for debugging and tests. pub full: bool, /// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC /// compression, so that we can cache compressed backups on the server. pub compression: BaseBackupCompression, } impl TryFrom for GetBaseBackupRequest { type Error = ProtocolError; fn try_from(pb: proto::GetBaseBackupRequest) -> Result { Ok(Self { lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)), replica: pb.replica, full: pb.full, compression: pb.compression.try_into()?, }) } } impl From for proto::GetBaseBackupRequest { fn from(request: GetBaseBackupRequest) -> Self { Self { lsn: request.lsn.unwrap_or_default().0, replica: request.replica, full: request.full, compression: request.compression.into(), } } } /// Base backup compression algorithm. #[derive(Clone, Copy, Debug)] pub enum BaseBackupCompression { None, Gzip, } impl TryFrom for BaseBackupCompression { type Error = ProtocolError; fn try_from(pb: proto::BaseBackupCompression) -> Result { match pb { proto::BaseBackupCompression::Unknown => Err(ProtocolError::invalid("compression", pb)), proto::BaseBackupCompression::None => Ok(Self::None), proto::BaseBackupCompression::Gzip => Ok(Self::Gzip), } } } impl TryFrom for BaseBackupCompression { type Error = ProtocolError; fn try_from(compression: i32) -> Result { proto::BaseBackupCompression::try_from(compression) .map_err(|_| ProtocolError::invalid("compression", compression)) .and_then(Self::try_from) } } impl From for proto::BaseBackupCompression { fn from(compression: BaseBackupCompression) -> Self { match compression { BaseBackupCompression::None => Self::None, BaseBackupCompression::Gzip => Self::Gzip, } } } impl From for i32 { fn from(compression: BaseBackupCompression) -> Self { proto::BaseBackupCompression::from(compression).into() } } pub type GetBaseBackupResponseChunk = Bytes; impl TryFrom for GetBaseBackupResponseChunk { type Error = ProtocolError; fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result { if pb.chunk.is_empty() { return Err(ProtocolError::Missing("chunk")); } Ok(pb.chunk) } } impl From for proto::GetBaseBackupResponseChunk { fn from(chunk: GetBaseBackupResponseChunk) -> Self { Self { chunk } } } /// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error. #[derive(Clone, Copy, Debug)] pub struct GetDbSizeRequest { pub read_lsn: ReadLsn, pub db_oid: Oid, } impl TryFrom for GetDbSizeRequest { type Error = ProtocolError; fn try_from(pb: proto::GetDbSizeRequest) -> Result { Ok(Self { read_lsn: pb .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, db_oid: pb.db_oid, }) } } impl From for proto::GetDbSizeRequest { fn from(request: GetDbSizeRequest) -> Self { Self { read_lsn: Some(request.read_lsn.into()), db_oid: request.db_oid, } } } pub type GetDbSizeResponse = u64; impl From for GetDbSizeResponse { fn from(pb: proto::GetDbSizeResponse) -> Self { pb.num_bytes } } impl From for proto::GetDbSizeResponse { fn from(num_bytes: GetDbSizeResponse) -> Self { Self { num_bytes } } } /// Requests one or more pages. #[derive(Clone, Debug, Default)] pub struct GetPageRequest { /// A request ID. Will be included in the response. Should be unique for in-flight requests on /// the stream. pub request_id: RequestID, /// The request class. pub request_class: GetPageClass, /// The LSN to read at. pub read_lsn: ReadLsn, /// The relation to read from. pub rel: RelTag, /// Page numbers to read. Must belong to the remote shard. /// /// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access /// costs and parallelizing them. This may increase the latency of any individual request, but /// improves the overall latency and throughput of the batch as a whole. pub block_numbers: Vec, } impl TryFrom for GetPageRequest { type Error = ProtocolError; fn try_from(pb: proto::GetPageRequest) -> Result { if pb.block_number.is_empty() { return Err(ProtocolError::Missing("block_number")); } Ok(Self { request_id: pb .request_id .ok_or(ProtocolError::Missing("request_id"))? .into(), request_class: pb.request_class.into(), read_lsn: pb .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, block_numbers: pb.block_number, }) } } impl From for proto::GetPageRequest { fn from(request: GetPageRequest) -> Self { Self { request_id: Some(request.request_id.into()), request_class: request.request_class.into(), read_lsn: Some(request.read_lsn.into()), rel: Some(request.rel.into()), block_number: request.block_numbers, } } } /// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream. #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct RequestID { /// The base request ID. pub id: u64, // The request attempt. Starts at 0, incremented on each retry. pub attempt: u32, } impl RequestID { /// Creates a new RequestID with the given ID and an initial attempt of 0. pub fn new(id: u64) -> Self { Self { id, attempt: 0 } } } impl Display for RequestID { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}.{}", self.id, self.attempt) } } impl From for RequestID { fn from(pb: proto::RequestId) -> Self { Self { id: pb.id, attempt: pb.attempt, } } } impl From for RequestID { fn from(id: u64) -> Self { Self::new(id) } } impl From for proto::RequestId { fn from(request_id: RequestID) -> Self { Self { id: request_id.id, attempt: request_id.attempt, } } } /// A GetPage request class. #[derive(Clone, Copy, Debug, Default, strum_macros::Display)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed. Unknown, /// A normal request. This is the default. #[default] Normal, /// A prefetch request. NB: can only be classified on pg < 18. Prefetch, /// A background request (e.g. vacuum). Background, } impl From for GetPageClass { fn from(pb: proto::GetPageClass) -> Self { match pb { proto::GetPageClass::Unknown => Self::Unknown, proto::GetPageClass::Normal => Self::Normal, proto::GetPageClass::Prefetch => Self::Prefetch, proto::GetPageClass::Background => Self::Background, } } } impl From for GetPageClass { fn from(class: i32) -> Self { proto::GetPageClass::try_from(class) .unwrap_or(proto::GetPageClass::Unknown) .into() } } impl From for proto::GetPageClass { fn from(class: GetPageClass) -> Self { match class { GetPageClass::Unknown => Self::Unknown, GetPageClass::Normal => Self::Normal, GetPageClass::Prefetch => Self::Prefetch, GetPageClass::Background => Self::Background, } } } impl From for i32 { fn from(class: GetPageClass) -> Self { proto::GetPageClass::from(class).into() } } /// A GetPage response. /// /// A batch response will contain all of the requested pages. We could eagerly emit individual pages /// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the /// batch and we'll only return once the entire batch is ready, so no one can make use of the /// individual pages. #[derive(Clone, Debug)] pub struct GetPageResponse { /// The original request's ID. pub request_id: RequestID, /// The response status code. If not OK, the `rel` and `pages` fields will be empty. pub status_code: GetPageStatusCode, /// A string describing the status, if any. pub reason: Option, /// The relation that the pages belong to. pub rel: RelTag, // The page(s), in the same order as the request. pub pages: Vec, } impl TryFrom for GetPageResponse { type Error = ProtocolError; fn try_from(pb: proto::GetPageResponse) -> Result { Ok(Self { request_id: pb .request_id .ok_or(ProtocolError::Missing("request_id"))? .into(), status_code: pb.status_code.into(), reason: Some(pb.reason).filter(|r| !r.is_empty()), rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, pages: pb.page.into_iter().map(Page::from).collect(), }) } } impl From for proto::GetPageResponse { fn from(response: GetPageResponse) -> Self { Self { request_id: Some(response.request_id.into()), status_code: response.status_code.into(), reason: response.reason.unwrap_or_default(), rel: Some(response.rel.into()), page: response.pages.into_iter().map(proto::Page::from).collect(), } } } impl GetPageResponse { /// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a /// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a /// GetPageResponse with a non-OK status code instead. #[allow(clippy::result_large_err)] pub fn try_from_status( status: tonic::Status, request_id: RequestID, ) -> Result { // We shouldn't see an OK status here, because we're emitting an error. debug_assert_ne!(status.code(), tonic::Code::Ok); if status.code() == tonic::Code::Ok { return Err(tonic::Status::internal(format!( "unexpected OK status: {status:?}", ))); } // If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request // error and we should return a tonic::Status to terminate the stream. let Ok(status_code) = status.code().try_into() else { return Err(status); }; // Return a GetPageResponse for the status. Ok(Self { request_id, status_code, reason: Some(status.message().to_string()), rel: RelTag::default(), pages: Vec::new(), }) } } // A page. #[derive(Clone, Debug)] pub struct Page { /// The page number. pub block_number: u32, /// The materialized page image, as an 8KB byte vector. pub image: Bytes, } impl From for Page { fn from(pb: proto::Page) -> Self { Self { block_number: pb.block_number, image: pb.image, } } } impl From for proto::Page { fn from(page: Page) -> Self { Self { block_number: page.block_number, image: page.image, } } } /// A GetPage response status code. /// /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream /// (potentially shared by many backends), and a gRPC status response would terminate the stream so /// we send GetPageResponse messages with these codes instead. #[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)] pub enum GetPageStatusCode { /// Unknown status. For forwards compatibility: used when an older client version receives a new /// status code from a newer server version. Unknown, /// The request was successful. Ok, /// The page did not exist. The tenant/timeline/shard has already been validated during stream /// setup. NotFound, /// The request was invalid. InvalidRequest, /// The request failed due to an internal server error. InternalError, /// The tenant is rate limited. Slow down and retry later. SlowDown, } impl From for GetPageStatusCode { fn from(pb: proto::GetPageStatusCode) -> Self { match pb { proto::GetPageStatusCode::Unknown => Self::Unknown, proto::GetPageStatusCode::Ok => Self::Ok, proto::GetPageStatusCode::NotFound => Self::NotFound, proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest, proto::GetPageStatusCode::InternalError => Self::InternalError, proto::GetPageStatusCode::SlowDown => Self::SlowDown, } } } impl From for GetPageStatusCode { fn from(status_code: i32) -> Self { proto::GetPageStatusCode::try_from(status_code) .unwrap_or(proto::GetPageStatusCode::Unknown) .into() } } impl From for proto::GetPageStatusCode { fn from(status_code: GetPageStatusCode) -> Self { match status_code { GetPageStatusCode::Unknown => Self::Unknown, GetPageStatusCode::Ok => Self::Ok, GetPageStatusCode::NotFound => Self::NotFound, GetPageStatusCode::InvalidRequest => Self::InvalidRequest, GetPageStatusCode::InternalError => Self::InternalError, GetPageStatusCode::SlowDown => Self::SlowDown, } } } impl From for i32 { fn from(status_code: GetPageStatusCode) -> Self { proto::GetPageStatusCode::from(status_code).into() } } impl TryFrom for GetPageStatusCode { type Error = tonic::Code; fn try_from(code: tonic::Code) -> Result { use tonic::Code; let status_code = match code { Code::Ok => Self::Ok, // These are per-request errors, which should be returned as GetPageResponses. Code::AlreadyExists => Self::InvalidRequest, Code::DataLoss => Self::InternalError, Code::FailedPrecondition => Self::InvalidRequest, Code::InvalidArgument => Self::InvalidRequest, Code::Internal => Self::InternalError, Code::NotFound => Self::NotFound, Code::OutOfRange => Self::InvalidRequest, Code::ResourceExhausted => Self::SlowDown, // These should terminate the stream by returning a tonic::Status. Code::Aborted | Code::Cancelled | Code::DeadlineExceeded | Code::PermissionDenied | Code::Unauthenticated | Code::Unavailable | Code::Unimplemented | Code::Unknown => return Err(code), }; Ok(status_code) } } impl From for tonic::Code { fn from(status_code: GetPageStatusCode) -> Self { use tonic::Code; match status_code { GetPageStatusCode::Unknown => Code::Unknown, GetPageStatusCode::Ok => Code::Ok, GetPageStatusCode::NotFound => Code::NotFound, GetPageStatusCode::InvalidRequest => Code::InvalidArgument, GetPageStatusCode::InternalError => Code::Internal, GetPageStatusCode::SlowDown => Code::ResourceExhausted, } } } // Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other // shards will error. #[derive(Clone, Copy, Debug)] pub struct GetRelSizeRequest { pub read_lsn: ReadLsn, pub rel: RelTag, /// If true, return missing=true for missing relations instead of a NotFound error. pub allow_missing: bool, } impl TryFrom for GetRelSizeRequest { type Error = ProtocolError; fn try_from(proto: proto::GetRelSizeRequest) -> Result { Ok(Self { read_lsn: proto .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, allow_missing: proto.allow_missing, }) } } impl From for proto::GetRelSizeRequest { fn from(request: GetRelSizeRequest) -> Self { Self { read_lsn: Some(request.read_lsn.into()), rel: Some(request.rel.into()), allow_missing: request.allow_missing, } } } /// The size of a relation as number of blocks, or None if `allow_missing=true` and the relation /// does not exist. /// /// INVARIANT: never None if `allow_missing=false` (returns `NotFound` error instead). pub type GetRelSizeResponse = Option; impl From for GetRelSizeResponse { fn from(pb: proto::GetRelSizeResponse) -> Self { (!pb.missing).then_some(pb.num_blocks) } } impl From for proto::GetRelSizeResponse { fn from(resp: GetRelSizeResponse) -> Self { Self { num_blocks: resp.unwrap_or_default(), missing: resp.is_none(), } } } /// Requests an SLRU segment. Only valid on shard 0, other shards will error. #[derive(Clone, Copy, Debug)] pub struct GetSlruSegmentRequest { pub read_lsn: ReadLsn, pub kind: SlruKind, pub segno: u32, } impl TryFrom for GetSlruSegmentRequest { type Error = ProtocolError; fn try_from(pb: proto::GetSlruSegmentRequest) -> Result { Ok(Self { read_lsn: pb .read_lsn .ok_or(ProtocolError::Missing("read_lsn"))? .try_into()?, kind: u8::try_from(pb.kind) .ok() .and_then(SlruKind::from_repr) .ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?, segno: pb.segno, }) } } impl From for proto::GetSlruSegmentRequest { fn from(request: GetSlruSegmentRequest) -> Self { Self { read_lsn: Some(request.read_lsn.into()), kind: request.kind as u32, segno: request.segno, } } } pub type GetSlruSegmentResponse = Bytes; impl TryFrom for GetSlruSegmentResponse { type Error = ProtocolError; fn try_from(pb: proto::GetSlruSegmentResponse) -> Result { if pb.segment.is_empty() { return Err(ProtocolError::Missing("segment")); } Ok(pb.segment) } } impl From for proto::GetSlruSegmentResponse { fn from(segment: GetSlruSegmentResponse) -> Self { Self { segment } } } // SlruKind is defined in pageserver_api::reltag. pub type SlruKind = pageserver_api::reltag::SlruKind; /// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage /// collect the LSN until the lease expires. pub struct LeaseLsnRequest { /// The LSN to lease. pub lsn: Lsn, } impl TryFrom for LeaseLsnRequest { type Error = ProtocolError; fn try_from(pb: proto::LeaseLsnRequest) -> Result { if pb.lsn == 0 { return Err(ProtocolError::Missing("lsn")); } Ok(Self { lsn: Lsn(pb.lsn) }) } } impl From for proto::LeaseLsnRequest { fn from(request: LeaseLsnRequest) -> Self { Self { lsn: request.lsn.0 } } } /// Lease expiration time. If the lease could not be granted because the LSN has already been /// garbage collected, a FailedPrecondition status will be returned instead. pub type LeaseLsnResponse = SystemTime; impl TryFrom for LeaseLsnResponse { type Error = ProtocolError; fn try_from(pb: proto::LeaseLsnResponse) -> Result { let expires = pb.expires.ok_or(ProtocolError::Missing("expires"))?; UNIX_EPOCH .checked_add(Duration::new(expires.seconds as u64, expires.nanos as u32)) .ok_or_else(|| ProtocolError::invalid("expires", expires)) } } impl From for proto::LeaseLsnResponse { fn from(response: LeaseLsnResponse) -> Self { let expires = response.duration_since(UNIX_EPOCH).unwrap_or_default(); Self { expires: Some(prost_types::Timestamp { seconds: expires.as_secs() as i64, nanos: expires.subsec_nanos() as i32, }), } } }