From 7cd0defaf0b71d72b8954915317dbad65f730143 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 26 May 2025 13:01:36 +0200 Subject: [PATCH] page_api: add Rust domain types (#11999) ## Problem For the gRPC Pageserver API, we should convert the Protobuf types to stricter, canonical Rust types. Touches https://github.com/neondatabase/neon/issues/11728. ## Summary of changes Adds Rust domain types that mirror the Protobuf types, with conversion and validation. --- Cargo.lock | 6 + pageserver/page_api/Cargo.toml | 6 + pageserver/page_api/src/lib.rs | 4 + pageserver/page_api/src/model.rs | 581 +++++++++++++++++++++++++++++++ 4 files changed, 597 insertions(+) create mode 100644 pageserver/page_api/src/model.rs diff --git a/Cargo.lock b/Cargo.lock index ddca5bbd3f..21c863ff95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4458,9 +4458,15 @@ dependencies = [ name = "pageserver_page_api" version = "0.1.0" dependencies = [ + "bytes", + "pageserver_api", + "postgres_ffi", "prost 0.13.5", + "smallvec", + "thiserror 1.0.69", "tonic 0.13.1", "tonic-build", + "utils", "workspace_hack", ] diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml index c237949226..4f62c77eb2 100644 --- a/pageserver/page_api/Cargo.toml +++ b/pageserver/page_api/Cargo.toml @@ -5,8 +5,14 @@ edition.workspace = true license.workspace = true [dependencies] +bytes.workspace = true +pageserver_api.workspace = true +postgres_ffi.workspace = true prost.workspace = true +smallvec.workspace = true +thiserror.workspace = true tonic.workspace = true +utils.workspace = true workspace_hack.workspace = true [build-dependencies] diff --git a/pageserver/page_api/src/lib.rs b/pageserver/page_api/src/lib.rs index 0b68d03aaa..f515f27f3e 100644 --- a/pageserver/page_api/src/lib.rs +++ b/pageserver/page_api/src/lib.rs @@ -17,3 +17,7 @@ pub mod proto { pub use page_service_client::PageServiceClient; pub use page_service_server::{PageService, PageServiceServer}; } + +mod model; + +pub use model::*; diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs new file mode 100644 index 0000000000..a83d0a5947 --- /dev/null +++ b/pageserver/page_api/src/model.rs @@ -0,0 +1,581 @@ +//! Structs representing the canonical page service API. +//! +//! These mirror the autogenerated Protobuf types. The differences are: +//! +//! - Types that are in fact required by the API are not Options. The protobuf "required" +//! attribute is deprecated and 'prost' marks a lot of members as optional because of that. +//! (See for a gripe on this) +//! +//! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits. +//! +//! - Validate protocol invariants, via try_from() and try_into(). + +use bytes::Bytes; +use postgres_ffi::Oid; +use smallvec::SmallVec; +// TODO: split out Lsn, RelTag, SlruKind, Oid and other basic types to a separate crate, to avoid +// pulling in all of their other crate dependencies when building the client. +use utils::lsn::Lsn; + +use crate::proto; + +/// A protocol error. Typically returned via try_from() or try_into(). +#[derive(thiserror::Error, Debug)] +pub enum ProtocolError { + #[error("field '{0}' has invalid value '{1}'")] + Invalid(&'static str, String), + #[error("required field '{0}' is missing")] + Missing(&'static str), +} + +impl ProtocolError { + /// Helper to generate a new ProtocolError::Invalid for the given field and value. + pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self { + Self::Invalid(field, format!("{value:?}")) + } +} + +/// The LSN a request should read at. +#[derive(Clone, Copy, Debug)] +pub struct ReadLsn { + /// The request's read LSN. + pub request_lsn: Lsn, + /// If given, the caller guarantees that the page has not been modified since this LSN. Must be + /// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page + /// without waiting for the request LSN to arrive. Valid for all request types. + /// + /// It is undefined behaviour to make a request such that the page was, in fact, modified + /// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an + /// error, or it might return the old page version or the new page version. Setting + /// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary + /// waiting. + pub not_modified_since_lsn: Option, +} + +impl ReadLsn { + /// Validates the ReadLsn. + pub fn validate(&self) -> Result<(), ProtocolError> { + if self.request_lsn == Lsn::INVALID { + return Err(ProtocolError::invalid("request_lsn", self.request_lsn)); + } + if self.not_modified_since_lsn > Some(self.request_lsn) { + return Err(ProtocolError::invalid( + "not_modified_since_lsn", + self.not_modified_since_lsn, + )); + } + Ok(()) + } +} + +impl TryFrom for ReadLsn { + type Error = ProtocolError; + + fn try_from(pb: proto::ReadLsn) -> Result { + let read_lsn = Self { + request_lsn: Lsn(pb.request_lsn), + not_modified_since_lsn: match pb.not_modified_since_lsn { + 0 => None, + lsn => Some(Lsn(lsn)), + }, + }; + read_lsn.validate()?; + Ok(read_lsn) + } +} + +impl TryFrom for proto::ReadLsn { + type Error = ProtocolError; + + fn try_from(read_lsn: ReadLsn) -> Result { + read_lsn.validate()?; + Ok(Self { + request_lsn: read_lsn.request_lsn.0, + not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0, + }) + } +} + +// RelTag is defined in pageserver_api::reltag. +pub type RelTag = pageserver_api::reltag::RelTag; + +impl TryFrom for RelTag { + type Error = ProtocolError; + + fn try_from(pb: proto::RelTag) -> Result { + Ok(Self { + spcnode: pb.spc_oid, + dbnode: pb.db_oid, + relnode: pb.rel_number, + forknum: pb + .fork_number + .try_into() + .map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?, + }) + } +} + +impl From for proto::RelTag { + fn from(rel_tag: RelTag) -> Self { + Self { + spc_oid: rel_tag.spcnode, + db_oid: rel_tag.dbnode, + rel_number: rel_tag.relnode, + fork_number: rel_tag.forknum as u32, + } + } +} + +/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error. +#[derive(Clone, Copy, Debug)] +pub struct CheckRelExistsRequest { + pub read_lsn: ReadLsn, + pub rel: RelTag, +} + +impl TryFrom for CheckRelExistsRequest { + type Error = ProtocolError; + + fn try_from(pb: proto::CheckRelExistsRequest) -> Result { + Ok(Self { + read_lsn: pb + .read_lsn + .ok_or(ProtocolError::Missing("read_lsn"))? + .try_into()?, + rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, + }) + } +} + +pub type CheckRelExistsResponse = bool; + +impl From for CheckRelExistsResponse { + fn from(pb: proto::CheckRelExistsResponse) -> Self { + pb.exists + } +} + +impl From for proto::CheckRelExistsResponse { + fn from(exists: CheckRelExistsResponse) -> Self { + Self { exists } + } +} + +/// Requests a base backup at a given LSN. +#[derive(Clone, Copy, Debug)] +pub struct GetBaseBackupRequest { + /// The LSN to fetch a base backup at. + pub read_lsn: ReadLsn, + /// If true, logical replication slots will not be created. + pub replica: bool, +} + +impl TryFrom for GetBaseBackupRequest { + type Error = ProtocolError; + + fn try_from(pb: proto::GetBaseBackupRequest) -> Result { + Ok(Self { + read_lsn: pb + .read_lsn + .ok_or(ProtocolError::Missing("read_lsn"))? + .try_into()?, + replica: pb.replica, + }) + } +} + +impl TryFrom for proto::GetBaseBackupRequest { + type Error = ProtocolError; + + fn try_from(request: GetBaseBackupRequest) -> Result { + Ok(Self { + read_lsn: Some(request.read_lsn.try_into()?), + replica: request.replica, + }) + } +} + +pub type GetBaseBackupResponseChunk = Bytes; + +impl TryFrom for GetBaseBackupResponseChunk { + type Error = ProtocolError; + + fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result { + if pb.chunk.is_empty() { + return Err(ProtocolError::Missing("chunk")); + } + Ok(pb.chunk) + } +} + +impl TryFrom for proto::GetBaseBackupResponseChunk { + type Error = ProtocolError; + + fn try_from(chunk: GetBaseBackupResponseChunk) -> Result { + if chunk.is_empty() { + return Err(ProtocolError::Missing("chunk")); + } + Ok(Self { chunk }) + } +} + +/// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error. +#[derive(Clone, Copy, Debug)] +pub struct GetDbSizeRequest { + pub read_lsn: ReadLsn, + pub db_oid: Oid, +} + +impl TryFrom for GetDbSizeRequest { + type Error = ProtocolError; + + fn try_from(pb: proto::GetDbSizeRequest) -> Result { + Ok(Self { + read_lsn: pb + .read_lsn + .ok_or(ProtocolError::Missing("read_lsn"))? + .try_into()?, + db_oid: pb.db_oid, + }) + } +} + +impl TryFrom for proto::GetDbSizeRequest { + type Error = ProtocolError; + + fn try_from(request: GetDbSizeRequest) -> Result { + Ok(Self { + read_lsn: Some(request.read_lsn.try_into()?), + db_oid: request.db_oid, + }) + } +} + +pub type GetDbSizeResponse = u64; + +impl From for GetDbSizeResponse { + fn from(pb: proto::GetDbSizeResponse) -> Self { + pb.num_bytes + } +} + +impl From for proto::GetDbSizeResponse { + fn from(num_bytes: GetDbSizeResponse) -> Self { + Self { num_bytes } + } +} + +/// Requests one or more pages. +#[derive(Clone, Debug)] +pub struct GetPageRequest { + /// A request ID. Will be included in the response. Should be unique for in-flight requests on + /// the stream. + pub request_id: RequestID, + /// The request class. + pub request_class: GetPageClass, + /// The LSN to read at. + pub read_lsn: ReadLsn, + /// The relation to read from. + pub rel: RelTag, + /// Page numbers to read. Must belong to the remote shard. + /// + /// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access + /// costs and parallelizing them. This may increase the latency of any individual request, but + /// improves the overall latency and throughput of the batch as a whole. + pub block_numbers: SmallVec<[u32; 1]>, +} + +impl TryFrom for GetPageRequest { + type Error = ProtocolError; + + fn try_from(pb: proto::GetPageRequest) -> Result { + if pb.block_number.is_empty() { + return Err(ProtocolError::Missing("block_number")); + } + Ok(Self { + request_id: pb.request_id, + request_class: pb.request_class.into(), + read_lsn: pb + .read_lsn + .ok_or(ProtocolError::Missing("read_lsn"))? + .try_into()?, + rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, + block_numbers: pb.block_number.into(), + }) + } +} + +impl TryFrom for proto::GetPageRequest { + type Error = ProtocolError; + + fn try_from(request: GetPageRequest) -> Result { + if request.block_numbers.is_empty() { + return Err(ProtocolError::Missing("block_number")); + } + Ok(Self { + request_id: request.request_id, + request_class: request.request_class.into(), + read_lsn: Some(request.read_lsn.try_into()?), + rel: Some(request.rel.into()), + block_number: request.block_numbers.into_vec(), + }) + } +} + +/// A GetPage request ID. +pub type RequestID = u64; + +/// A GetPage request class. +#[derive(Clone, Copy, Debug)] +pub enum GetPageClass { + /// Unknown status. For backwards compatibility: used when an older client version sends a class + /// that a newer server version has removed. + Unknown, + /// A normal request. This is the default. + Normal, + /// A prefetch request. NB: can only be classified on pg < 18. + Prefetch, + /// A background request (e.g. vacuum). + Background, +} + +impl From for GetPageClass { + fn from(pb: proto::GetPageClass) -> Self { + match pb { + proto::GetPageClass::Unknown => Self::Unknown, + proto::GetPageClass::Normal => Self::Normal, + proto::GetPageClass::Prefetch => Self::Prefetch, + proto::GetPageClass::Background => Self::Background, + } + } +} + +impl From for GetPageClass { + fn from(class: i32) -> Self { + proto::GetPageClass::try_from(class) + .unwrap_or(proto::GetPageClass::Unknown) + .into() + } +} + +impl From for proto::GetPageClass { + fn from(class: GetPageClass) -> Self { + match class { + GetPageClass::Unknown => Self::Unknown, + GetPageClass::Normal => Self::Normal, + GetPageClass::Prefetch => Self::Prefetch, + GetPageClass::Background => Self::Background, + } + } +} + +impl From for i32 { + fn from(class: GetPageClass) -> Self { + proto::GetPageClass::from(class).into() + } +} + +/// A GetPage response. +/// +/// A batch response will contain all of the requested pages. We could eagerly emit individual pages +/// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the +/// batch and we'll only return once the entire batch is ready, so no one can make use of the +/// individual pages. +#[derive(Clone, Debug)] +pub struct GetPageResponse { + /// The original request's ID. + pub request_id: RequestID, + /// The response status code. + pub status: GetPageStatus, + /// A string describing the status, if any. + pub reason: Option, + /// The 8KB page images, in the same order as the request. Empty if status != OK. + pub page_images: SmallVec<[Bytes; 1]>, +} + +impl From for GetPageResponse { + fn from(pb: proto::GetPageResponse) -> Self { + Self { + request_id: pb.request_id, + status: pb.status.into(), + reason: Some(pb.reason).filter(|r| !r.is_empty()), + page_images: pb.page_image.into(), + } + } +} + +impl From for proto::GetPageResponse { + fn from(response: GetPageResponse) -> Self { + Self { + request_id: response.request_id, + status: response.status.into(), + reason: response.reason.unwrap_or_default(), + page_image: response.page_images.into_vec(), + } + } +} + +/// A GetPage response status. +#[derive(Clone, Copy, Debug)] +pub enum GetPageStatus { + /// Unknown status. For forwards compatibility: used when an older client version receives a new + /// status code from a newer server version. + Unknown, + /// The request was successful. + Ok, + /// The page did not exist. The tenant/timeline/shard has already been validated during stream + /// setup. + NotFound, + /// The request was invalid. + Invalid, + /// The tenant is rate limited. Slow down and retry later. + SlowDown, +} + +impl From for GetPageStatus { + fn from(pb: proto::GetPageStatus) -> Self { + match pb { + proto::GetPageStatus::Unknown => Self::Unknown, + proto::GetPageStatus::Ok => Self::Ok, + proto::GetPageStatus::NotFound => Self::NotFound, + proto::GetPageStatus::Invalid => Self::Invalid, + proto::GetPageStatus::SlowDown => Self::SlowDown, + } + } +} + +impl From for GetPageStatus { + fn from(status: i32) -> Self { + proto::GetPageStatus::try_from(status) + .unwrap_or(proto::GetPageStatus::Unknown) + .into() + } +} + +impl From for proto::GetPageStatus { + fn from(status: GetPageStatus) -> Self { + match status { + GetPageStatus::Unknown => Self::Unknown, + GetPageStatus::Ok => Self::Ok, + GetPageStatus::NotFound => Self::NotFound, + GetPageStatus::Invalid => Self::Invalid, + GetPageStatus::SlowDown => Self::SlowDown, + } + } +} + +impl From for i32 { + fn from(status: GetPageStatus) -> Self { + proto::GetPageStatus::from(status).into() + } +} + +// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other +// shards will error. +pub struct GetRelSizeRequest { + pub read_lsn: ReadLsn, + pub rel: RelTag, +} + +impl TryFrom for GetRelSizeRequest { + type Error = ProtocolError; + + fn try_from(proto: proto::GetRelSizeRequest) -> Result { + Ok(Self { + read_lsn: proto + .read_lsn + .ok_or(ProtocolError::Missing("read_lsn"))? + .try_into()?, + rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?, + }) + } +} + +impl TryFrom for proto::GetRelSizeRequest { + type Error = ProtocolError; + + fn try_from(request: GetRelSizeRequest) -> Result { + Ok(Self { + read_lsn: Some(request.read_lsn.try_into()?), + rel: Some(request.rel.into()), + }) + } +} + +pub type GetRelSizeResponse = u32; + +impl From for GetRelSizeResponse { + fn from(proto: proto::GetRelSizeResponse) -> Self { + proto.num_blocks + } +} + +impl From for proto::GetRelSizeResponse { + fn from(num_blocks: GetRelSizeResponse) -> Self { + Self { num_blocks } + } +} + +/// Requests an SLRU segment. Only valid on shard 0, other shards will error. +pub struct GetSlruSegmentRequest { + pub read_lsn: ReadLsn, + pub kind: SlruKind, + pub segno: u32, +} + +impl TryFrom for GetSlruSegmentRequest { + type Error = ProtocolError; + + fn try_from(pb: proto::GetSlruSegmentRequest) -> Result { + Ok(Self { + read_lsn: pb + .read_lsn + .ok_or(ProtocolError::Missing("read_lsn"))? + .try_into()?, + kind: u8::try_from(pb.kind) + .ok() + .and_then(SlruKind::from_repr) + .ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?, + segno: pb.segno, + }) + } +} + +impl TryFrom for proto::GetSlruSegmentRequest { + type Error = ProtocolError; + + fn try_from(request: GetSlruSegmentRequest) -> Result { + Ok(Self { + read_lsn: Some(request.read_lsn.try_into()?), + kind: request.kind as u32, + segno: request.segno, + }) + } +} + +pub type GetSlruSegmentResponse = Bytes; + +impl TryFrom for GetSlruSegmentResponse { + type Error = ProtocolError; + + fn try_from(pb: proto::GetSlruSegmentResponse) -> Result { + if pb.segment.is_empty() { + return Err(ProtocolError::Missing("segment")); + } + Ok(pb.segment) + } +} + +impl TryFrom for proto::GetSlruSegmentResponse { + type Error = ProtocolError; + + fn try_from(segment: GetSlruSegmentResponse) -> Result { + if segment.is_empty() { + return Err(ProtocolError::Missing("segment")); + } + Ok(Self { segment }) + } +} + +// SlruKind is defined in pageserver_api::reltag. +pub type SlruKind = pageserver_api::reltag::SlruKind;