mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-25 23:29:59 +00:00
Domain client for Pageserver GRPC. (#12111)
Add domain client for new communicator GRPC types.
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -4465,11 +4465,14 @@ dependencies = [
|
|||||||
name = "pageserver_page_api"
|
name = "pageserver_page_api"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"futures",
|
||||||
"pageserver_api",
|
"pageserver_api",
|
||||||
"postgres_ffi",
|
"postgres_ffi",
|
||||||
"prost 0.13.5",
|
"prost 0.13.5",
|
||||||
"thiserror 1.0.69",
|
"thiserror 1.0.69",
|
||||||
|
"tokio",
|
||||||
"tonic 0.13.1",
|
"tonic 0.13.1",
|
||||||
"tonic-build",
|
"tonic-build",
|
||||||
"utils",
|
"utils",
|
||||||
|
|||||||
@@ -5,11 +5,14 @@ edition.workspace = true
|
|||||||
license.workspace = true
|
license.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
anyhow.workspace = true
|
||||||
bytes.workspace = true
|
bytes.workspace = true
|
||||||
|
futures.workspace = true
|
||||||
pageserver_api.workspace = true
|
pageserver_api.workspace = true
|
||||||
postgres_ffi.workspace = true
|
postgres_ffi.workspace = true
|
||||||
prost.workspace = true
|
prost.workspace = true
|
||||||
thiserror.workspace = true
|
thiserror.workspace = true
|
||||||
|
tokio.workspace = true
|
||||||
tonic.workspace = true
|
tonic.workspace = true
|
||||||
utils.workspace = true
|
utils.workspace = true
|
||||||
workspace_hack.workspace = true
|
workspace_hack.workspace = true
|
||||||
|
|||||||
191
pageserver/page_api/src/client.rs
Normal file
191
pageserver/page_api/src/client.rs
Normal file
@@ -0,0 +1,191 @@
|
|||||||
|
use std::convert::TryInto;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::TryStreamExt;
|
||||||
|
use futures::{Stream, StreamExt};
|
||||||
|
use tonic::metadata::AsciiMetadataValue;
|
||||||
|
use tonic::metadata::errors::InvalidMetadataValue;
|
||||||
|
use tonic::transport::Channel;
|
||||||
|
use tonic::{Request, Streaming};
|
||||||
|
|
||||||
|
use utils::id::TenantId;
|
||||||
|
use utils::id::TimelineId;
|
||||||
|
use utils::shard::ShardIndex;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
use crate::model;
|
||||||
|
use crate::proto;
|
||||||
|
|
||||||
|
///
|
||||||
|
/// AuthInterceptor adds tenant, timeline, and auth header to the channel. These
|
||||||
|
/// headers are required at the pageserver.
|
||||||
|
///
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct AuthInterceptor {
|
||||||
|
tenant_id: AsciiMetadataValue,
|
||||||
|
timeline_id: AsciiMetadataValue,
|
||||||
|
shard_id: AsciiMetadataValue,
|
||||||
|
auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AuthInterceptor {
|
||||||
|
fn new(
|
||||||
|
tenant_id: TenantId,
|
||||||
|
timeline_id: TimelineId,
|
||||||
|
auth_token: Option<String>,
|
||||||
|
shard_id: ShardIndex,
|
||||||
|
) -> Result<Self, InvalidMetadataValue> {
|
||||||
|
let tenant_ascii: AsciiMetadataValue = tenant_id.to_string().try_into()?;
|
||||||
|
let timeline_ascii: AsciiMetadataValue = timeline_id.to_string().try_into()?;
|
||||||
|
let shard_ascii: AsciiMetadataValue = shard_id.to_string().try_into()?;
|
||||||
|
|
||||||
|
let auth_header: Option<AsciiMetadataValue> = match auth_token {
|
||||||
|
Some(token) => Some(format!("Bearer {token}").try_into()?),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
tenant_id: tenant_ascii,
|
||||||
|
shard_id: shard_ascii,
|
||||||
|
timeline_id: timeline_ascii,
|
||||||
|
auth_header,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl tonic::service::Interceptor for AuthInterceptor {
    /// Stamps the stored tenant/shard/timeline (and optional authorization)
    /// metadata onto each outgoing request before it hits the wire.
    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
        let metadata = req.metadata_mut();
        metadata.insert("neon-tenant-id", self.tenant_id.clone());
        metadata.insert("neon-shard-id", self.shard_id.clone());
        metadata.insert("neon-timeline-id", self.timeline_id.clone());
        if let Some(auth_header) = &self.auth_header {
            metadata.insert("authorization", auth_header.clone());
        }
        Ok(req)
    }
}
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Client {
|
||||||
|
client: proto::PageServiceClient<
|
||||||
|
tonic::service::interceptor::InterceptedService<Channel, AuthInterceptor>,
|
||||||
|
>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
pub async fn new<T: TryInto<tonic::transport::Endpoint> + Send + Sync + 'static>(
|
||||||
|
into_endpoint: T,
|
||||||
|
tenant_id: TenantId,
|
||||||
|
timeline_id: TimelineId,
|
||||||
|
shard_id: ShardIndex,
|
||||||
|
auth_header: Option<String>,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
|
let endpoint: tonic::transport::Endpoint = into_endpoint
|
||||||
|
.try_into()
|
||||||
|
.map_err(|_e| anyhow::anyhow!("failed to convert endpoint"))?;
|
||||||
|
let channel = endpoint.connect().await?;
|
||||||
|
let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id)
|
||||||
|
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
|
||||||
|
let client = proto::PageServiceClient::with_interceptor(channel, auth);
|
||||||
|
|
||||||
|
Ok(Self { client })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns whether a relation exists.
|
||||||
|
pub async fn check_rel_exists(
|
||||||
|
&mut self,
|
||||||
|
req: model::CheckRelExistsRequest,
|
||||||
|
) -> Result<model::CheckRelExistsResponse, tonic::Status> {
|
||||||
|
let proto_req = proto::CheckRelExistsRequest::from(req);
|
||||||
|
|
||||||
|
let response = self.client.check_rel_exists(proto_req).await?;
|
||||||
|
|
||||||
|
let proto_resp = response.into_inner();
|
||||||
|
Ok(proto_resp.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetches a base backup.
|
||||||
|
pub async fn get_base_backup(
|
||||||
|
&mut self,
|
||||||
|
req: model::GetBaseBackupRequest,
|
||||||
|
) -> Result<impl Stream<Item = Result<Bytes, tonic::Status>>, tonic::Status> {
|
||||||
|
let proto_req = proto::GetBaseBackupRequest::from(req);
|
||||||
|
|
||||||
|
let response_stream: Streaming<proto::GetBaseBackupResponseChunk> =
|
||||||
|
self.client.get_base_backup(proto_req).await?.into_inner();
|
||||||
|
|
||||||
|
// TODO: Consider dechunking internally
|
||||||
|
let domain_stream = response_stream.map(|chunk_res| {
|
||||||
|
chunk_res.and_then(|proto_chunk| {
|
||||||
|
proto_chunk.try_into().map_err(|e| {
|
||||||
|
tonic::Status::internal(format!("Failed to convert response chunk: {}", e))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(domain_stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the total size of a database, as # of bytes.
|
||||||
|
pub async fn get_db_size(
|
||||||
|
&mut self,
|
||||||
|
req: model::GetDbSizeRequest,
|
||||||
|
) -> Result<u64, tonic::Status> {
|
||||||
|
let proto_req = proto::GetDbSizeRequest::from(req);
|
||||||
|
|
||||||
|
let response = self.client.get_db_size(proto_req).await?;
|
||||||
|
Ok(response.into_inner().into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetches pages.
|
||||||
|
///
|
||||||
|
/// This is implemented as a bidirectional streaming RPC for performance.
|
||||||
|
/// Per-request errors are often returned as status_code instead of errors,
|
||||||
|
/// to avoid tearing down the entire stream via tonic::Status.
|
||||||
|
pub async fn get_pages<ReqSt>(
|
||||||
|
&mut self,
|
||||||
|
inbound: ReqSt,
|
||||||
|
) -> Result<
|
||||||
|
impl Stream<Item = Result<model::GetPageResponse, tonic::Status>> + Send + 'static,
|
||||||
|
tonic::Status,
|
||||||
|
>
|
||||||
|
where
|
||||||
|
ReqSt: Stream<Item = model::GetPageRequest> + Send + 'static,
|
||||||
|
{
|
||||||
|
let outbound_proto = inbound.map(|domain_req| domain_req.into());
|
||||||
|
|
||||||
|
let req_new = Request::new(outbound_proto);
|
||||||
|
|
||||||
|
let response_stream: Streaming<proto::GetPageResponse> =
|
||||||
|
self.client.get_pages(req_new).await?.into_inner();
|
||||||
|
|
||||||
|
let domain_stream = response_stream.map_ok(model::GetPageResponse::from);
|
||||||
|
|
||||||
|
Ok(domain_stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the size of a relation, as # of blocks.
|
||||||
|
pub async fn get_rel_size(
|
||||||
|
&mut self,
|
||||||
|
req: model::GetRelSizeRequest,
|
||||||
|
) -> Result<model::GetRelSizeResponse, tonic::Status> {
|
||||||
|
let proto_req = proto::GetRelSizeRequest::from(req);
|
||||||
|
let response = self.client.get_rel_size(proto_req).await?;
|
||||||
|
let proto_resp = response.into_inner();
|
||||||
|
Ok(proto_resp.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetches an SLRU segment.
|
||||||
|
pub async fn get_slru_segment(
|
||||||
|
&mut self,
|
||||||
|
req: model::GetSlruSegmentRequest,
|
||||||
|
) -> Result<model::GetSlruSegmentResponse, tonic::Status> {
|
||||||
|
let proto_req = proto::GetSlruSegmentRequest::from(req);
|
||||||
|
let response = self.client.get_slru_segment(proto_req).await?;
|
||||||
|
Ok(response.into_inner().try_into()?)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -18,6 +18,8 @@ pub mod proto {
|
|||||||
pub use page_service_server::{PageService, PageServiceServer};
|
pub use page_service_server::{PageService, PageServiceServer};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mod client;
|
||||||
|
pub use client::Client;
|
||||||
mod model;
|
mod model;
|
||||||
|
|
||||||
pub use model::*;
|
pub use model::*;
|
||||||
|
|||||||
Reference in New Issue
Block a user