pageserver: tighten up gRPC page_api::Client (#12396)

This patch tightens up `page_api::Client`. It's mostly superficial
changes, but also adds a new constructor that takes an existing gRPC
channel, for use with the communicator connection pool.
This commit is contained in:
Erik Grinaker
2025-07-08 20:15:13 +02:00
committed by GitHub
parent a06c560ad0
commit 81e7218c27
5 changed files with 159 additions and 176 deletions

View File

@@ -1057,7 +1057,7 @@ impl ComputeNode {
};
let (reader, connected) = tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::new(
let mut client = page_api::Client::connect(
shard0_connstr,
spec.tenant_id,
spec.timeline_id,

View File

@@ -192,7 +192,7 @@ fn acquire_lsn_lease_grpc(
lsn: Lsn,
) -> Result<Option<SystemTime>> {
tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::new(
let mut client = page_api::Client::connect(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,

View File

@@ -1,23 +1,151 @@
use anyhow::Result;
use anyhow::Context as _;
use futures::{Stream, StreamExt as _, TryStreamExt as _};
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;
use tonic::codec::CompressionEncoding;
use tonic::metadata::AsciiMetadataValue;
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::transport::Channel;
use tonic::{Request, Streaming};
use tonic::service::Interceptor;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::{Channel, Endpoint};
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
use crate::model;
use crate::model::*;
use crate::proto;
/// A basic Pageserver gRPC client, for a single tenant shard. This API uses native Rust domain
/// types from `model` rather than generated Protobuf types.
pub struct Client {
inner: proto::PageServiceClient<InterceptedService<Channel, AuthInterceptor>>,
}
impl Client {
/// Connects to the given gRPC endpoint.
pub async fn connect<E>(
endpoint: E,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self>
where
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
let endpoint: Endpoint = endpoint.try_into().context("invalid endpoint")?;
let channel = endpoint.connect().await?;
Self::new(
channel,
tenant_id,
timeline_id,
shard_id,
auth_token,
compression,
)
}
/// Creates a new client using the given gRPC channel.
pub fn new(
channel: Channel,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
let auth = AuthInterceptor::new(tenant_id, timeline_id, shard_id, auth_token)?;
let mut inner = proto::PageServiceClient::with_interceptor(channel, auth);
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
inner = inner
.accept_compressed(compression)
.send_compressed(compression);
}
Ok(Self { inner })
}
/// Returns whether a relation exists.
pub async fn check_rel_exists(
&mut self,
req: CheckRelExistsRequest,
) -> tonic::Result<CheckRelExistsResponse> {
let req = proto::CheckRelExistsRequest::from(req);
let resp = self.inner.check_rel_exists(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches a base backup.
pub async fn get_base_backup(
&mut self,
req: GetBaseBackupRequest,
) -> tonic::Result<impl AsyncRead + use<>> {
let req = proto::GetBaseBackupRequest::from(req);
let chunks = self.inner.get_base_backup(req).await?.into_inner();
Ok(StreamReader::new(
chunks
.map_ok(|resp| resp.chunk)
.map_err(std::io::Error::other),
))
}
/// Returns the total size of a database, as # of bytes.
pub async fn get_db_size(&mut self, req: GetDbSizeRequest) -> tonic::Result<GetDbSizeResponse> {
let req = proto::GetDbSizeRequest::from(req);
let resp = self.inner.get_db_size(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches pages.
///
/// AuthInterceptor adds tenant, timeline, and auth header to the channel. These
/// headers are required at the pageserver.
/// This is implemented as a bidirectional streaming RPC for performance. Per-request errors are
/// typically returned as status_code instead of errors, to avoid tearing down the entire stream
/// via a tonic::Status error.
pub async fn get_pages(
&mut self,
reqs: impl Stream<Item = GetPageRequest> + Send + 'static,
) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> {
let reqs = reqs.map(proto::GetPageRequest::from);
let resps = self.inner.get_pages(reqs).await?.into_inner();
Ok(resps.map_ok(GetPageResponse::from))
}
/// Returns the size of a relation, as # of blocks.
pub async fn get_rel_size(
&mut self,
req: GetRelSizeRequest,
) -> tonic::Result<GetRelSizeResponse> {
let req = proto::GetRelSizeRequest::from(req);
let resp = self.inner.get_rel_size(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches an SLRU segment.
pub async fn get_slru_segment(
&mut self,
req: GetSlruSegmentRequest,
) -> tonic::Result<GetSlruSegmentResponse> {
let req = proto::GetSlruSegmentRequest::from(req);
let resp = self.inner.get_slru_segment(req).await?.into_inner();
Ok(resp.try_into()?)
}
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
/// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
///
/// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
/// acquired because the LSN has already been garbage collected.
pub async fn lease_lsn(&mut self, req: LeaseLsnRequest) -> tonic::Result<LeaseLsnResponse> {
let req = proto::LeaseLsnRequest::from(req);
let resp = self.inner.lease_lsn(req).await?.into_inner();
Ok(resp.try_into()?)
}
}
/// Adds authentication metadata to gRPC requests.
#[derive(Clone)]
struct AuthInterceptor {
tenant_id: AsciiMetadataValue,
@@ -30,174 +158,29 @@ impl AuthInterceptor {
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
auth_token: Option<String>,
shard_id: ShardIndex,
) -> Result<Self, InvalidMetadataValue> {
let tenant_ascii: AsciiMetadataValue = tenant_id.to_string().try_into()?;
let timeline_ascii: AsciiMetadataValue = timeline_id.to_string().try_into()?;
let shard_ascii: AsciiMetadataValue = shard_id.to_string().try_into()?;
let auth_header: Option<AsciiMetadataValue> = match auth_token {
Some(token) => Some(format!("Bearer {token}").try_into()?),
None => None,
};
auth_token: Option<String>,
) -> anyhow::Result<Self> {
Ok(Self {
tenant_id: tenant_ascii,
shard_id: shard_ascii,
timeline_id: timeline_ascii,
auth_header,
tenant_id: tenant_id.to_string().try_into()?,
timeline_id: timeline_id.to_string().try_into()?,
shard_id: shard_id.to_string().try_into()?,
auth_header: auth_token
.map(|token| format!("Bearer {token}").try_into())
.transpose()?,
})
}
}
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut()
.insert("neon-tenant-id", self.tenant_id.clone());
req.metadata_mut()
.insert("neon-shard-id", self.shard_id.clone());
req.metadata_mut()
.insert("neon-timeline-id", self.timeline_id.clone());
if let Some(auth_header) = &self.auth_header {
req.metadata_mut()
.insert("authorization", auth_header.clone());
impl Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> tonic::Result<tonic::Request<()>> {
let metadata = req.metadata_mut();
metadata.insert("neon-tenant-id", self.tenant_id.clone());
metadata.insert("neon-timeline-id", self.timeline_id.clone());
metadata.insert("neon-shard-id", self.shard_id.clone());
if let Some(ref auth_header) = self.auth_header {
metadata.insert("authorization", auth_header.clone());
}
Ok(req)
}
}
#[derive(Clone)]
pub struct Client {
client: proto::PageServiceClient<
tonic::service::interceptor::InterceptedService<Channel, AuthInterceptor>,
>,
}
impl Client {
pub async fn new<T: TryInto<tonic::transport::Endpoint> + Send + Sync + 'static>(
into_endpoint: T,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_header: Option<String>,
compression: Option<tonic::codec::CompressionEncoding>,
) -> anyhow::Result<Self> {
let endpoint: tonic::transport::Endpoint = into_endpoint
.try_into()
.map_err(|_e| anyhow::anyhow!("failed to convert endpoint"))?;
let channel = endpoint.connect().await?;
let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id)
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let mut client = proto::PageServiceClient::with_interceptor(channel, auth);
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
client = client
.accept_compressed(compression)
.send_compressed(compression);
}
Ok(Self { client })
}
/// Returns whether a relation exists.
pub async fn check_rel_exists(
&mut self,
req: model::CheckRelExistsRequest,
) -> Result<model::CheckRelExistsResponse, tonic::Status> {
let proto_req = proto::CheckRelExistsRequest::from(req);
let response = self.client.check_rel_exists(proto_req).await?;
let proto_resp = response.into_inner();
Ok(proto_resp.into())
}
/// Fetches a base backup.
pub async fn get_base_backup(
&mut self,
req: model::GetBaseBackupRequest,
) -> Result<impl AsyncRead + use<>, tonic::Status> {
let req = proto::GetBaseBackupRequest::from(req);
let chunks = self.client.get_base_backup(req).await?.into_inner();
let reader = StreamReader::new(
chunks
.map_ok(|resp| resp.chunk)
.map_err(std::io::Error::other),
);
Ok(reader)
}
/// Returns the total size of a database, as # of bytes.
pub async fn get_db_size(
&mut self,
req: model::GetDbSizeRequest,
) -> Result<u64, tonic::Status> {
let proto_req = proto::GetDbSizeRequest::from(req);
let response = self.client.get_db_size(proto_req).await?;
Ok(response.into_inner().into())
}
/// Fetches pages.
///
/// This is implemented as a bidirectional streaming RPC for performance.
/// Per-request errors are often returned as status_code instead of errors,
/// to avoid tearing down the entire stream via tonic::Status.
pub async fn get_pages<ReqSt>(
&mut self,
inbound: ReqSt,
) -> Result<
impl Stream<Item = Result<model::GetPageResponse, tonic::Status>> + Send + 'static,
tonic::Status,
>
where
ReqSt: Stream<Item = model::GetPageRequest> + Send + 'static,
{
let outbound_proto = inbound.map(|domain_req| domain_req.into());
let req_new = Request::new(outbound_proto);
let response_stream: Streaming<proto::GetPageResponse> =
self.client.get_pages(req_new).await?.into_inner();
let domain_stream = response_stream.map_ok(model::GetPageResponse::from);
Ok(domain_stream)
}
/// Returns the size of a relation, as # of blocks.
pub async fn get_rel_size(
&mut self,
req: model::GetRelSizeRequest,
) -> Result<model::GetRelSizeResponse, tonic::Status> {
let proto_req = proto::GetRelSizeRequest::from(req);
let response = self.client.get_rel_size(proto_req).await?;
let proto_resp = response.into_inner();
Ok(proto_resp.into())
}
/// Fetches an SLRU segment.
pub async fn get_slru_segment(
&mut self,
req: model::GetSlruSegmentRequest,
) -> Result<model::GetSlruSegmentResponse, tonic::Status> {
let proto_req = proto::GetSlruSegmentRequest::from(req);
let response = self.client.get_slru_segment(proto_req).await?;
Ok(response.into_inner().try_into()?)
}
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
/// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
///
/// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
/// acquired because the LSN has already been garbage collected.
pub async fn lease_lsn(
&mut self,
req: model::LeaseLsnRequest,
) -> Result<model::LeaseLsnResponse, tonic::Status> {
let req = proto::LeaseLsnRequest::from(req);
Ok(self.client.lease_lsn(req).await?.into_inner().try_into()?)
}
}

View File

@@ -326,7 +326,7 @@ impl GrpcClient {
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = page_api::Client::new(
let inner = page_api::Client::connect(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,

View File

@@ -625,7 +625,7 @@ impl GrpcClient {
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let mut client = page_api::Client::new(
let mut client = page_api::Client::connect(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,