pageserver: initial gRPC page service implementation (#12094)

## Problem

We should expose the page service over gRPC.

Requires #12093.
Touches #11728.

## Summary of changes

This patch adds an initial page service implementation over gRPC. It
ties in with the existing `PageServerHandler` request logic, to avoid
the implementations drifting apart for the core read path.

This is just a bare-bones functional implementation. Several important
aspects have been omitted, and will be addressed in follow-up PRs:

* Limited observability: minimal tracing, no logging, limited metrics
and timing, etc.
* Rate limiting will currently block.
* No performance optimization.
* No cancellation handling.
* No tests.

I've only done rudimentary testing of this, but Pagebench passes at
least.
This commit is contained in:
Erik Grinaker
2025-06-02 19:15:18 +02:00
committed by GitHub
parent 781bf4945d
commit fc3994eb71
7 changed files with 723 additions and 170 deletions

View File

@@ -1934,7 +1934,7 @@ pub enum PagestreamFeMessage {
}
// Wrapped in libpq CopyData
#[derive(strum_macros::EnumProperty)]
#[derive(Debug, strum_macros::EnumProperty)]
pub enum PagestreamBeMessage {
Exists(PagestreamExistsResponse),
Nblocks(PagestreamNblocksResponse),

View File

@@ -184,12 +184,12 @@ pub enum SlruKind {
MultiXactOffsets,
}
impl SlruKind {
pub fn to_str(&self) -> &'static str {
impl fmt::Display for SlruKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Clog => "pg_xact",
Self::MultiXactMembers => "pg_multixact/members",
Self::MultiXactOffsets => "pg_multixact/offsets",
Self::Clog => write!(f, "pg_xact"),
Self::MultiXactMembers => write!(f, "pg_multixact/members"),
Self::MultiXactOffsets => write!(f, "pg_multixact/offsets"),
}
}
}

View File

@@ -10,6 +10,8 @@
//!
//! - Validate protocol invariants, via try_from() and try_into().
use std::fmt::Display;
use bytes::Bytes;
use postgres_ffi::Oid;
use smallvec::SmallVec;
@@ -48,7 +50,8 @@ pub struct ReadLsn {
pub request_lsn: Lsn,
/// If given, the caller guarantees that the page has not been modified since this LSN. Must be
/// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
/// without waiting for the request LSN to arrive. Valid for all request types.
/// without waiting for the request LSN to arrive. If not given, the request will read at the
/// request_lsn and wait for it to arrive if necessary. Valid for all request types.
///
/// It is undefined behaviour to make a request such that the page was, in fact, modified
/// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
@@ -58,6 +61,17 @@ pub struct ReadLsn {
pub not_modified_since_lsn: Option<Lsn>,
}
impl Display for ReadLsn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let req_lsn = self.request_lsn;
if let Some(mod_lsn) = self.not_modified_since_lsn {
write!(f, "{req_lsn}>={mod_lsn}")
} else {
req_lsn.fmt(f)
}
}
}
impl ReadLsn {
/// Validates the ReadLsn.
pub fn validate(&self) -> Result<(), ProtocolError> {
@@ -584,6 +598,7 @@ impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
type Error = ProtocolError;
fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
// TODO: can a segment legitimately be empty?
if segment.is_empty() {
return Err(ProtocolError::Missing("segment"));
}

View File

@@ -65,6 +65,30 @@ impl From<GetVectoredError> for BasebackupError {
}
}
impl From<BasebackupError> for postgres_backend::QueryError {
fn from(err: BasebackupError) -> Self {
use postgres_backend::QueryError;
use pq_proto::framed::ConnectionError;
match err {
BasebackupError::Client(err, _) => QueryError::Disconnected(ConnectionError::Io(err)),
BasebackupError::Server(err) => QueryError::Other(err),
BasebackupError::Shutdown => QueryError::Shutdown,
}
}
}
impl From<BasebackupError> for tonic::Status {
fn from(err: BasebackupError) -> Self {
use tonic::Code;
let code = match &err {
BasebackupError::Client(_, _) => Code::Cancelled,
BasebackupError::Server(_) => Code::Internal,
BasebackupError::Shutdown => Code::Unavailable,
};
tonic::Status::new(code, err.to_string())
}
}
/// Create basebackup with non-rel data in it.
/// Only include relational data if 'full_backup' is true.
///
@@ -248,7 +272,7 @@ where
async fn flush(&mut self) -> Result<(), BasebackupError> {
let nblocks = self.buf.len() / BLCKSZ as usize;
let (kind, segno) = self.current_segment.take().unwrap();
let segname = format!("{}/{:>04X}", kind.to_str(), segno);
let segname = format!("{kind}/{segno:>04X}");
let header = new_tar_header(&segname, self.buf.len() as u64)?;
self.ar
.append(&header, self.buf.as_slice())

View File

@@ -804,7 +804,7 @@ fn start_pageserver(
} else {
None
},
basebackup_cache.clone(),
basebackup_cache,
);
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
@@ -816,12 +816,10 @@ fn start_pageserver(
let mut page_service_grpc = None;
if let Some(grpc_listener) = grpc_listener {
page_service_grpc = Some(page_service::spawn_grpc(
conf,
tenant_manager.clone(),
grpc_auth,
otel_guard.as_ref().map(|g| g.dispatch.clone()),
grpc_listener,
basebackup_cache,
)?);
}

File diff suppressed because it is too large Load Diff

View File

@@ -950,6 +950,18 @@ pub(crate) enum WaitLsnError {
Timeout(String),
}
impl From<WaitLsnError> for tonic::Status {
fn from(err: WaitLsnError) -> Self {
use tonic::Code;
let code = match &err {
WaitLsnError::Timeout(_) => Code::Internal,
WaitLsnError::BadState(_) => Code::Internal,
WaitLsnError::Shutdown => Code::Unavailable,
};
tonic::Status::new(code, err.to_string())
}
}
// The impls below achieve cancellation mapping for errors.
// Perhaps there's a way of achieving this with less cruft.