Files
neon/pageserver/page_api/src/model.rs
Erik Grinaker 194b9ffc41 pageserver: remove gRPC CheckRelExists (#12616)
## Problem

Postgres will often immediately follow a relation existence check with a
relation size query. This incurs two roundtrips, and may prevent
effective caching.

See [Slack
thread](https://databricks.slack.com/archives/C091SDX74SC/p1751951732136139).

Touches #11728.

## Summary of changes

For the gRPC API:

* Add an `allow_missing` parameter to `GetRelSize`, which returns
`missing=true` instead of a `NotFound` error.
* Remove `CheckRelExists`.

There are no changes to libpq behavior.
2025-07-21 11:43:26 +00:00

824 lines
26 KiB
Rust

//! Structs representing the canonical page service API.
//!
//! These mirror the autogenerated Protobuf types. The differences are:
//!
//! - Types that are in fact required by the API are not Options. The protobuf "required"
//! attribute is deprecated and 'prost' marks a lot of members as optional because of that.
//! (See <https://github.com/tokio-rs/prost/issues/800> for a gripe on this)
//!
//! - Use more precise datatypes, e.g. Lsn and uints shorter than 32 bits.
//!
//! - Validate protocol invariants, via try_from() and try_into().
//!
//! Validation only happens on the receiver side, i.e. when converting from Protobuf to domain
//! types. This is where it matters -- the Protobuf types are less strict than the domain types, and
//! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g.
//! stream combinators without dealing with errors, and avoids validating the same message twice.
use std::fmt::Display;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use postgres_ffi_types::Oid;
// TODO: split out Lsn, RelTag, SlruKind and other basic types to a separate crate, to avoid
// pulling in all of their other crate dependencies when building the client.
use utils::lsn::Lsn;
use crate::proto;
/// A protocol error. Typically returned via try_from() or try_into().
#[derive(thiserror::Error, Clone, Debug)]
pub enum ProtocolError {
#[error("field '{0}' has invalid value '{1}'")]
Invalid(&'static str, String),
#[error("required field '{0}' is missing")]
Missing(&'static str),
}
impl ProtocolError {
/// Helper to generate a new ProtocolError::Invalid for the given field and value.
pub fn invalid(field: &'static str, value: impl std::fmt::Debug) -> Self {
Self::Invalid(field, format!("{value:?}"))
}
}
impl From<ProtocolError> for tonic::Status {
fn from(err: ProtocolError) -> Self {
tonic::Status::invalid_argument(format!("{err}"))
}
}
/// The LSN a request should read at.
#[derive(Clone, Copy, Debug, Default)]
pub struct ReadLsn {
/// The request's read LSN.
pub request_lsn: Lsn,
/// If given, the caller guarantees that the page has not been modified since this LSN. Must be
/// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
/// without waiting for the request LSN to arrive. If not given, the request will read at the
/// request_lsn and wait for it to arrive if necessary. Valid for all request types.
///
/// It is undefined behaviour to make a request such that the page was, in fact, modified
/// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
/// error, or it might return the old page version or the new page version. Setting
/// not_modified_since_lsn equal to request_lsn is always safe, but can lead to unnecessary
/// waiting.
pub not_modified_since_lsn: Option<Lsn>,
}
impl Display for ReadLsn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let req_lsn = self.request_lsn;
if let Some(mod_lsn) = self.not_modified_since_lsn {
write!(f, "{req_lsn}>={mod_lsn}")
} else {
req_lsn.fmt(f)
}
}
}
impl TryFrom<proto::ReadLsn> for ReadLsn {
type Error = ProtocolError;
fn try_from(pb: proto::ReadLsn) -> Result<Self, Self::Error> {
if pb.request_lsn == 0 {
return Err(ProtocolError::invalid("request_lsn", pb.request_lsn));
}
if pb.not_modified_since_lsn > pb.request_lsn {
return Err(ProtocolError::invalid(
"not_modified_since_lsn",
pb.not_modified_since_lsn,
));
}
Ok(Self {
request_lsn: Lsn(pb.request_lsn),
not_modified_since_lsn: match pb.not_modified_since_lsn {
0 => None,
lsn => Some(Lsn(lsn)),
},
})
}
}
impl From<ReadLsn> for proto::ReadLsn {
fn from(read_lsn: ReadLsn) -> Self {
Self {
request_lsn: read_lsn.request_lsn.0,
not_modified_since_lsn: read_lsn.not_modified_since_lsn.unwrap_or_default().0,
}
}
}
// RelTag is defined in pageserver_api::reltag.
pub type RelTag = pageserver_api::reltag::RelTag;
impl TryFrom<proto::RelTag> for RelTag {
type Error = ProtocolError;
fn try_from(pb: proto::RelTag) -> Result<Self, Self::Error> {
Ok(Self {
spcnode: pb.spc_oid,
dbnode: pb.db_oid,
relnode: pb.rel_number,
forknum: pb
.fork_number
.try_into()
.map_err(|_| ProtocolError::invalid("fork_number", pb.fork_number))?,
})
}
}
impl From<RelTag> for proto::RelTag {
fn from(rel_tag: RelTag) -> Self {
Self {
spc_oid: rel_tag.spcnode,
db_oid: rel_tag.dbnode,
rel_number: rel_tag.relnode,
fork_number: rel_tag.forknum as u32,
}
}
}
/// Requests a base backup.
#[derive(Clone, Copy, Debug)]
pub struct GetBaseBackupRequest {
/// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
pub lsn: Option<Lsn>,
/// If true, logical replication slots will not be created.
pub replica: bool,
/// If true, include relation files in the base backup. Mainly for debugging and tests.
pub full: bool,
/// Compression algorithm to use. Base backups send a compressed payload instead of using gRPC
/// compression, so that we can cache compressed backups on the server.
pub compression: BaseBackupCompression,
}
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
Ok(Self {
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
replica: pb.replica,
full: pb.full,
compression: pb.compression.try_into()?,
})
}
}
impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
fn from(request: GetBaseBackupRequest) -> Self {
Self {
lsn: request.lsn.unwrap_or_default().0,
replica: request.replica,
full: request.full,
compression: request.compression.into(),
}
}
}
/// Base backup compression algorithm.
#[derive(Clone, Copy, Debug)]
pub enum BaseBackupCompression {
None,
Gzip,
}
impl TryFrom<proto::BaseBackupCompression> for BaseBackupCompression {
type Error = ProtocolError;
fn try_from(pb: proto::BaseBackupCompression) -> Result<Self, Self::Error> {
match pb {
proto::BaseBackupCompression::Unknown => Err(ProtocolError::invalid("compression", pb)),
proto::BaseBackupCompression::None => Ok(Self::None),
proto::BaseBackupCompression::Gzip => Ok(Self::Gzip),
}
}
}
impl TryFrom<i32> for BaseBackupCompression {
type Error = ProtocolError;
fn try_from(compression: i32) -> Result<Self, Self::Error> {
proto::BaseBackupCompression::try_from(compression)
.map_err(|_| ProtocolError::invalid("compression", compression))
.and_then(Self::try_from)
}
}
impl From<BaseBackupCompression> for proto::BaseBackupCompression {
fn from(compression: BaseBackupCompression) -> Self {
match compression {
BaseBackupCompression::None => Self::None,
BaseBackupCompression::Gzip => Self::Gzip,
}
}
}
impl From<BaseBackupCompression> for i32 {
fn from(compression: BaseBackupCompression) -> Self {
proto::BaseBackupCompression::from(compression).into()
}
}
pub type GetBaseBackupResponseChunk = Bytes;
impl TryFrom<proto::GetBaseBackupResponseChunk> for GetBaseBackupResponseChunk {
type Error = ProtocolError;
fn try_from(pb: proto::GetBaseBackupResponseChunk) -> Result<Self, Self::Error> {
if pb.chunk.is_empty() {
return Err(ProtocolError::Missing("chunk"));
}
Ok(pb.chunk)
}
}
impl From<GetBaseBackupResponseChunk> for proto::GetBaseBackupResponseChunk {
fn from(chunk: GetBaseBackupResponseChunk) -> Self {
Self { chunk }
}
}
/// Requests the size of a database, as # of bytes. Only valid on shard 0, other shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetDbSizeRequest {
pub read_lsn: ReadLsn,
pub db_oid: Oid,
}
impl TryFrom<proto::GetDbSizeRequest> for GetDbSizeRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetDbSizeRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
db_oid: pb.db_oid,
})
}
}
impl From<GetDbSizeRequest> for proto::GetDbSizeRequest {
fn from(request: GetDbSizeRequest) -> Self {
Self {
read_lsn: Some(request.read_lsn.into()),
db_oid: request.db_oid,
}
}
}
pub type GetDbSizeResponse = u64;
impl From<proto::GetDbSizeResponse> for GetDbSizeResponse {
fn from(pb: proto::GetDbSizeResponse) -> Self {
pb.num_bytes
}
}
impl From<GetDbSizeResponse> for proto::GetDbSizeResponse {
fn from(num_bytes: GetDbSizeResponse) -> Self {
Self { num_bytes }
}
}
/// Requests one or more pages.
#[derive(Clone, Debug, Default)]
pub struct GetPageRequest {
/// A request ID. Will be included in the response. Should be unique for in-flight requests on
/// the stream.
pub request_id: RequestID,
/// The request class.
pub request_class: GetPageClass,
/// The LSN to read at.
pub read_lsn: ReadLsn,
/// The relation to read from.
pub rel: RelTag,
/// Page numbers to read. Must belong to the remote shard.
///
/// Multiple pages will be executed as a single batch by the Pageserver, amortizing layer access
/// costs and parallelizing them. This may increase the latency of any individual request, but
/// improves the overall latency and throughput of the batch as a whole.
pub block_numbers: Vec<u32>,
}
impl TryFrom<proto::GetPageRequest> for GetPageRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetPageRequest) -> Result<Self, Self::Error> {
if pb.block_number.is_empty() {
return Err(ProtocolError::Missing("block_number"));
}
Ok(Self {
request_id: pb
.request_id
.ok_or(ProtocolError::Missing("request_id"))?
.into(),
request_class: pb.request_class.into(),
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
block_numbers: pb.block_number,
})
}
}
impl From<GetPageRequest> for proto::GetPageRequest {
fn from(request: GetPageRequest) -> Self {
Self {
request_id: Some(request.request_id.into()),
request_class: request.request_class.into(),
read_lsn: Some(request.read_lsn.into()),
rel: Some(request.rel.into()),
block_number: request.block_numbers,
}
}
}
/// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RequestID {
/// The base request ID.
pub id: u64,
// The request attempt. Starts at 0, incremented on each retry.
pub attempt: u32,
}
impl RequestID {
/// Creates a new RequestID with the given ID and an initial attempt of 0.
pub fn new(id: u64) -> Self {
Self { id, attempt: 0 }
}
}
impl Display for RequestID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.id, self.attempt)
}
}
impl From<proto::RequestId> for RequestID {
fn from(pb: proto::RequestId) -> Self {
Self {
id: pb.id,
attempt: pb.attempt,
}
}
}
impl From<u64> for RequestID {
fn from(id: u64) -> Self {
Self::new(id)
}
}
impl From<RequestID> for proto::RequestId {
fn from(request_id: RequestID) -> Self {
Self {
id: request_id.id,
attempt: request_id.attempt,
}
}
}
/// A GetPage request class.
#[derive(Clone, Copy, Debug, Default, strum_macros::Display)]
pub enum GetPageClass {
/// Unknown class. For backwards compatibility: used when an older client version sends a class
/// that a newer server version has removed.
Unknown,
/// A normal request. This is the default.
#[default]
Normal,
/// A prefetch request. NB: can only be classified on pg < 18.
Prefetch,
/// A background request (e.g. vacuum).
Background,
}
impl From<proto::GetPageClass> for GetPageClass {
fn from(pb: proto::GetPageClass) -> Self {
match pb {
proto::GetPageClass::Unknown => Self::Unknown,
proto::GetPageClass::Normal => Self::Normal,
proto::GetPageClass::Prefetch => Self::Prefetch,
proto::GetPageClass::Background => Self::Background,
}
}
}
impl From<i32> for GetPageClass {
fn from(class: i32) -> Self {
proto::GetPageClass::try_from(class)
.unwrap_or(proto::GetPageClass::Unknown)
.into()
}
}
impl From<GetPageClass> for proto::GetPageClass {
fn from(class: GetPageClass) -> Self {
match class {
GetPageClass::Unknown => Self::Unknown,
GetPageClass::Normal => Self::Normal,
GetPageClass::Prefetch => Self::Prefetch,
GetPageClass::Background => Self::Background,
}
}
}
impl From<GetPageClass> for i32 {
fn from(class: GetPageClass) -> Self {
proto::GetPageClass::from(class).into()
}
}
/// A GetPage response.
///
/// A batch response will contain all of the requested pages. We could eagerly emit individual pages
/// as soon as they are ready, but on a readv() Postgres holds buffer pool locks on all pages in the
/// batch and we'll only return once the entire batch is ready, so no one can make use of the
/// individual pages.
#[derive(Clone, Debug)]
pub struct GetPageResponse {
/// The original request's ID.
pub request_id: RequestID,
/// The response status code. If not OK, the `rel` and `pages` fields will be empty.
pub status_code: GetPageStatusCode,
/// A string describing the status, if any.
pub reason: Option<String>,
/// The relation that the pages belong to.
pub rel: RelTag,
// The page(s), in the same order as the request.
pub pages: Vec<Page>,
}
impl TryFrom<proto::GetPageResponse> for GetPageResponse {
type Error = ProtocolError;
fn try_from(pb: proto::GetPageResponse) -> Result<Self, ProtocolError> {
Ok(Self {
request_id: pb
.request_id
.ok_or(ProtocolError::Missing("request_id"))?
.into(),
status_code: pb.status_code.into(),
reason: Some(pb.reason).filter(|r| !r.is_empty()),
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
pages: pb.page.into_iter().map(Page::from).collect(),
})
}
}
impl From<GetPageResponse> for proto::GetPageResponse {
fn from(response: GetPageResponse) -> Self {
Self {
request_id: Some(response.request_id.into()),
status_code: response.status_code.into(),
reason: response.reason.unwrap_or_default(),
rel: Some(response.rel.into()),
page: response.pages.into_iter().map(proto::Page::from).collect(),
}
}
}
impl GetPageResponse {
/// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a
/// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a
/// GetPageResponse with a non-OK status code instead.
#[allow(clippy::result_large_err)]
pub fn try_from_status(
status: tonic::Status,
request_id: RequestID,
) -> Result<Self, tonic::Status> {
// We shouldn't see an OK status here, because we're emitting an error.
debug_assert_ne!(status.code(), tonic::Code::Ok);
if status.code() == tonic::Code::Ok {
return Err(tonic::Status::internal(format!(
"unexpected OK status: {status:?}",
)));
}
// If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request
// error and we should return a tonic::Status to terminate the stream.
let Ok(status_code) = status.code().try_into() else {
return Err(status);
};
// Return a GetPageResponse for the status.
Ok(Self {
request_id,
status_code,
reason: Some(status.message().to_string()),
rel: RelTag::default(),
pages: Vec::new(),
})
}
}
// A page.
#[derive(Clone, Debug)]
pub struct Page {
/// The page number.
pub block_number: u32,
/// The materialized page image, as an 8KB byte vector.
pub image: Bytes,
}
impl From<proto::Page> for Page {
fn from(pb: proto::Page) -> Self {
Self {
block_number: pb.block_number,
image: pb.image,
}
}
}
impl From<Page> for proto::Page {
fn from(page: Page) -> Self {
Self {
block_number: page.block_number,
image: page.image,
}
}
}
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
/// (potentially shared by many backends), and a gRPC status response would terminate the stream so
/// we send GetPageResponse messages with these codes instead.
#[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)]
pub enum GetPageStatusCode {
/// Unknown status. For forwards compatibility: used when an older client version receives a new
/// status code from a newer server version.
Unknown,
/// The request was successful.
Ok,
/// The page did not exist. The tenant/timeline/shard has already been validated during stream
/// setup.
NotFound,
/// The request was invalid.
InvalidRequest,
/// The request failed due to an internal server error.
InternalError,
/// The tenant is rate limited. Slow down and retry later.
SlowDown,
}
impl From<proto::GetPageStatusCode> for GetPageStatusCode {
fn from(pb: proto::GetPageStatusCode) -> Self {
match pb {
proto::GetPageStatusCode::Unknown => Self::Unknown,
proto::GetPageStatusCode::Ok => Self::Ok,
proto::GetPageStatusCode::NotFound => Self::NotFound,
proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
proto::GetPageStatusCode::InternalError => Self::InternalError,
proto::GetPageStatusCode::SlowDown => Self::SlowDown,
}
}
}
impl From<i32> for GetPageStatusCode {
fn from(status_code: i32) -> Self {
proto::GetPageStatusCode::try_from(status_code)
.unwrap_or(proto::GetPageStatusCode::Unknown)
.into()
}
}
impl From<GetPageStatusCode> for proto::GetPageStatusCode {
fn from(status_code: GetPageStatusCode) -> Self {
match status_code {
GetPageStatusCode::Unknown => Self::Unknown,
GetPageStatusCode::Ok => Self::Ok,
GetPageStatusCode::NotFound => Self::NotFound,
GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
GetPageStatusCode::InternalError => Self::InternalError,
GetPageStatusCode::SlowDown => Self::SlowDown,
}
}
}
impl From<GetPageStatusCode> for i32 {
fn from(status_code: GetPageStatusCode) -> Self {
proto::GetPageStatusCode::from(status_code).into()
}
}
impl TryFrom<tonic::Code> for GetPageStatusCode {
type Error = tonic::Code;
fn try_from(code: tonic::Code) -> Result<Self, Self::Error> {
use tonic::Code;
let status_code = match code {
Code::Ok => Self::Ok,
// These are per-request errors, which should be returned as GetPageResponses.
Code::AlreadyExists => Self::InvalidRequest,
Code::DataLoss => Self::InternalError,
Code::FailedPrecondition => Self::InvalidRequest,
Code::InvalidArgument => Self::InvalidRequest,
Code::Internal => Self::InternalError,
Code::NotFound => Self::NotFound,
Code::OutOfRange => Self::InvalidRequest,
Code::ResourceExhausted => Self::SlowDown,
// These should terminate the stream by returning a tonic::Status.
Code::Aborted
| Code::Cancelled
| Code::DeadlineExceeded
| Code::PermissionDenied
| Code::Unauthenticated
| Code::Unavailable
| Code::Unimplemented
| Code::Unknown => return Err(code),
};
Ok(status_code)
}
}
impl From<GetPageStatusCode> for tonic::Code {
fn from(status_code: GetPageStatusCode) -> Self {
use tonic::Code;
match status_code {
GetPageStatusCode::Unknown => Code::Unknown,
GetPageStatusCode::Ok => Code::Ok,
GetPageStatusCode::NotFound => Code::NotFound,
GetPageStatusCode::InvalidRequest => Code::InvalidArgument,
GetPageStatusCode::InternalError => Code::Internal,
GetPageStatusCode::SlowDown => Code::ResourceExhausted,
}
}
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
// shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetRelSizeRequest {
pub read_lsn: ReadLsn,
pub rel: RelTag,
/// If true, return missing=true for missing relations instead of a NotFound error.
pub allow_missing: bool,
}
impl TryFrom<proto::GetRelSizeRequest> for GetRelSizeRequest {
type Error = ProtocolError;
fn try_from(proto: proto::GetRelSizeRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: proto
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
rel: proto.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
allow_missing: proto.allow_missing,
})
}
}
impl From<GetRelSizeRequest> for proto::GetRelSizeRequest {
fn from(request: GetRelSizeRequest) -> Self {
Self {
read_lsn: Some(request.read_lsn.into()),
rel: Some(request.rel.into()),
allow_missing: request.allow_missing,
}
}
}
/// The size of a relation as number of blocks, or None if `allow_missing=true` and the relation
/// does not exist.
///
/// INVARIANT: never None if `allow_missing=false` (returns `NotFound` error instead).
pub type GetRelSizeResponse = Option<u32>;
impl From<proto::GetRelSizeResponse> for GetRelSizeResponse {
fn from(pb: proto::GetRelSizeResponse) -> Self {
(!pb.missing).then_some(pb.num_blocks)
}
}
impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
fn from(resp: GetRelSizeResponse) -> Self {
Self {
num_blocks: resp.unwrap_or_default(),
missing: resp.is_none(),
}
}
}
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetSlruSegmentRequest {
pub read_lsn: ReadLsn,
pub kind: SlruKind,
pub segno: u32,
}
impl TryFrom<proto::GetSlruSegmentRequest> for GetSlruSegmentRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetSlruSegmentRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
kind: u8::try_from(pb.kind)
.ok()
.and_then(SlruKind::from_repr)
.ok_or_else(|| ProtocolError::invalid("slru_kind", pb.kind))?,
segno: pb.segno,
})
}
}
impl From<GetSlruSegmentRequest> for proto::GetSlruSegmentRequest {
fn from(request: GetSlruSegmentRequest) -> Self {
Self {
read_lsn: Some(request.read_lsn.into()),
kind: request.kind as u32,
segno: request.segno,
}
}
}
pub type GetSlruSegmentResponse = Bytes;
impl TryFrom<proto::GetSlruSegmentResponse> for GetSlruSegmentResponse {
type Error = ProtocolError;
fn try_from(pb: proto::GetSlruSegmentResponse) -> Result<Self, Self::Error> {
if pb.segment.is_empty() {
return Err(ProtocolError::Missing("segment"));
}
Ok(pb.segment)
}
}
impl From<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
fn from(segment: GetSlruSegmentResponse) -> Self {
Self { segment }
}
}
// SlruKind is defined in pageserver_api::reltag.
pub type SlruKind = pageserver_api::reltag::SlruKind;
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
/// collect the LSN until the lease expires.
pub struct LeaseLsnRequest {
/// The LSN to lease.
pub lsn: Lsn,
}
impl TryFrom<proto::LeaseLsnRequest> for LeaseLsnRequest {
type Error = ProtocolError;
fn try_from(pb: proto::LeaseLsnRequest) -> Result<Self, Self::Error> {
if pb.lsn == 0 {
return Err(ProtocolError::Missing("lsn"));
}
Ok(Self { lsn: Lsn(pb.lsn) })
}
}
impl From<LeaseLsnRequest> for proto::LeaseLsnRequest {
fn from(request: LeaseLsnRequest) -> Self {
Self { lsn: request.lsn.0 }
}
}
/// Lease expiration time. If the lease could not be granted because the LSN has already been
/// garbage collected, a FailedPrecondition status will be returned instead.
pub type LeaseLsnResponse = SystemTime;
impl TryFrom<proto::LeaseLsnResponse> for LeaseLsnResponse {
type Error = ProtocolError;
fn try_from(pb: proto::LeaseLsnResponse) -> Result<Self, Self::Error> {
let expires = pb.expires.ok_or(ProtocolError::Missing("expires"))?;
UNIX_EPOCH
.checked_add(Duration::new(expires.seconds as u64, expires.nanos as u32))
.ok_or_else(|| ProtocolError::invalid("expires", expires))
}
}
impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
fn from(response: LeaseLsnResponse) -> Self {
let expires = response.duration_since(UNIX_EPOCH).unwrap_or_default();
Self {
expires: Some(prost_types::Timestamp {
seconds: expires.as_secs() as i64,
nanos: expires.subsec_nanos() as i32,
}),
}
}
}