diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index e870cecc58..f25aff1110 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1057,7 +1057,7 @@ impl ComputeNode { }; let (reader, connected) = tokio::runtime::Handle::current().block_on(async move { - let mut client = page_api::Client::new( + let mut client = page_api::Client::connect( shard0_connstr, spec.tenant_id, spec.timeline_id, diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index 3346c18c0d..bb0828429d 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -192,7 +192,7 @@ fn acquire_lsn_lease_grpc( lsn: Lsn, ) -> Result> { tokio::runtime::Handle::current().block_on(async move { - let mut client = page_api::Client::new( + let mut client = page_api::Client::connect( connstring.to_string(), tenant_shard_id.tenant_id, timeline_id, diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index 65e41540b8..6523d00d3d 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -1,23 +1,151 @@ -use anyhow::Result; +use anyhow::Context as _; use futures::{Stream, StreamExt as _, TryStreamExt as _}; use tokio::io::AsyncRead; use tokio_util::io::StreamReader; +use tonic::codec::CompressionEncoding; use tonic::metadata::AsciiMetadataValue; -use tonic::metadata::errors::InvalidMetadataValue; -use tonic::transport::Channel; -use tonic::{Request, Streaming}; +use tonic::service::Interceptor; +use tonic::service::interceptor::InterceptedService; +use tonic::transport::{Channel, Endpoint}; -use utils::id::TenantId; -use utils::id::TimelineId; +use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -use crate::model; +use crate::model::*; use crate::proto; -/// -/// AuthInterceptor adds tenant, timeline, and auth header to the channel. These -/// headers are required at the pageserver. -/// +/// A basic Pageserver gRPC client, for a single tenant shard. This API uses native Rust domain +/// types from `model` rather than generated Protobuf types. +pub struct Client { + inner: proto::PageServiceClient>, +} + +impl Client { + /// Connects to the given gRPC endpoint. + pub async fn connect( + endpoint: E, + tenant_id: TenantId, + timeline_id: TimelineId, + shard_id: ShardIndex, + auth_token: Option, + compression: Option, + ) -> anyhow::Result + where + E: TryInto + Send + Sync + 'static, + >::Error: std::error::Error + Send + Sync, + { + let endpoint: Endpoint = endpoint.try_into().context("invalid endpoint")?; + let channel = endpoint.connect().await?; + Self::new( + channel, + tenant_id, + timeline_id, + shard_id, + auth_token, + compression, + ) + } + + /// Creates a new client using the given gRPC channel. + pub fn new( + channel: Channel, + tenant_id: TenantId, + timeline_id: TimelineId, + shard_id: ShardIndex, + auth_token: Option, + compression: Option, + ) -> anyhow::Result { + let auth = AuthInterceptor::new(tenant_id, timeline_id, shard_id, auth_token)?; + let mut inner = proto::PageServiceClient::with_interceptor(channel, auth); + + if let Some(compression) = compression { + // TODO: benchmark this (including network latency). + inner = inner + .accept_compressed(compression) + .send_compressed(compression); + } + + Ok(Self { inner }) + } + + /// Returns whether a relation exists. + pub async fn check_rel_exists( + &mut self, + req: CheckRelExistsRequest, + ) -> tonic::Result { + let req = proto::CheckRelExistsRequest::from(req); + let resp = self.inner.check_rel_exists(req).await?.into_inner(); + Ok(resp.into()) + } + + /// Fetches a base backup. + pub async fn get_base_backup( + &mut self, + req: GetBaseBackupRequest, + ) -> tonic::Result> { + let req = proto::GetBaseBackupRequest::from(req); + let chunks = self.inner.get_base_backup(req).await?.into_inner(); + Ok(StreamReader::new( + chunks + .map_ok(|resp| resp.chunk) + .map_err(std::io::Error::other), + )) + } + + /// Returns the total size of a database, as # of bytes. + pub async fn get_db_size(&mut self, req: GetDbSizeRequest) -> tonic::Result { + let req = proto::GetDbSizeRequest::from(req); + let resp = self.inner.get_db_size(req).await?.into_inner(); + Ok(resp.into()) + } + + /// Fetches pages. + /// + /// This is implemented as a bidirectional streaming RPC for performance. Per-request errors are + /// typically returned as status_code instead of errors, to avoid tearing down the entire stream + /// via a tonic::Status error. + pub async fn get_pages( + &mut self, + reqs: impl Stream + Send + 'static, + ) -> tonic::Result> + Send + 'static> { + let reqs = reqs.map(proto::GetPageRequest::from); + let resps = self.inner.get_pages(reqs).await?.into_inner(); + Ok(resps.map_ok(GetPageResponse::from)) + } + + /// Returns the size of a relation, as # of blocks. + pub async fn get_rel_size( + &mut self, + req: GetRelSizeRequest, + ) -> tonic::Result { + let req = proto::GetRelSizeRequest::from(req); + let resp = self.inner.get_rel_size(req).await?.into_inner(); + Ok(resp.into()) + } + + /// Fetches an SLRU segment. + pub async fn get_slru_segment( + &mut self, + req: GetSlruSegmentRequest, + ) -> tonic::Result { + let req = proto::GetSlruSegmentRequest::from(req); + let resp = self.inner.get_slru_segment(req).await?.into_inner(); + Ok(resp.try_into()?) + } + + /// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't + /// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards. + /// + /// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be + /// acquired because the LSN has already been garbage collected. + pub async fn lease_lsn(&mut self, req: LeaseLsnRequest) -> tonic::Result { + let req = proto::LeaseLsnRequest::from(req); + let resp = self.inner.lease_lsn(req).await?.into_inner(); + Ok(resp.try_into()?) + } +} + +/// Adds authentication metadata to gRPC requests. #[derive(Clone)] struct AuthInterceptor { tenant_id: AsciiMetadataValue, @@ -30,174 +158,29 @@ impl AuthInterceptor { fn new( tenant_id: TenantId, timeline_id: TimelineId, - auth_token: Option, shard_id: ShardIndex, - ) -> Result { - let tenant_ascii: AsciiMetadataValue = tenant_id.to_string().try_into()?; - let timeline_ascii: AsciiMetadataValue = timeline_id.to_string().try_into()?; - let shard_ascii: AsciiMetadataValue = shard_id.to_string().try_into()?; - - let auth_header: Option = match auth_token { - Some(token) => Some(format!("Bearer {token}").try_into()?), - None => None, - }; - + auth_token: Option, + ) -> anyhow::Result { Ok(Self { - tenant_id: tenant_ascii, - shard_id: shard_ascii, - timeline_id: timeline_ascii, - auth_header, + tenant_id: tenant_id.to_string().try_into()?, + timeline_id: timeline_id.to_string().try_into()?, + shard_id: shard_id.to_string().try_into()?, + auth_header: auth_token + .map(|token| format!("Bearer {token}").try_into()) + .transpose()?, }) } } -impl tonic::service::Interceptor for AuthInterceptor { - fn call(&mut self, mut req: tonic::Request<()>) -> Result, tonic::Status> { - req.metadata_mut() - .insert("neon-tenant-id", self.tenant_id.clone()); - req.metadata_mut() - .insert("neon-shard-id", self.shard_id.clone()); - req.metadata_mut() - .insert("neon-timeline-id", self.timeline_id.clone()); - if let Some(auth_header) = &self.auth_header { - req.metadata_mut() - .insert("authorization", auth_header.clone()); +impl Interceptor for AuthInterceptor { + fn call(&mut self, mut req: tonic::Request<()>) -> tonic::Result> { + let metadata = req.metadata_mut(); + metadata.insert("neon-tenant-id", self.tenant_id.clone()); + metadata.insert("neon-timeline-id", self.timeline_id.clone()); + metadata.insert("neon-shard-id", self.shard_id.clone()); + if let Some(ref auth_header) = self.auth_header { + metadata.insert("authorization", auth_header.clone()); } Ok(req) } } - -#[derive(Clone)] -pub struct Client { - client: proto::PageServiceClient< - tonic::service::interceptor::InterceptedService, - >, -} - -impl Client { - pub async fn new + Send + Sync + 'static>( - into_endpoint: T, - tenant_id: TenantId, - timeline_id: TimelineId, - shard_id: ShardIndex, - auth_header: Option, - compression: Option, - ) -> anyhow::Result { - let endpoint: tonic::transport::Endpoint = into_endpoint - .try_into() - .map_err(|_e| anyhow::anyhow!("failed to convert endpoint"))?; - let channel = endpoint.connect().await?; - let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id) - .map_err(|e| anyhow::anyhow!(e.to_string()))?; - let mut client = proto::PageServiceClient::with_interceptor(channel, auth); - - if let Some(compression) = compression { - // TODO: benchmark this (including network latency). - client = client - .accept_compressed(compression) - .send_compressed(compression); - } - - Ok(Self { client }) - } - - /// Returns whether a relation exists. - pub async fn check_rel_exists( - &mut self, - req: model::CheckRelExistsRequest, - ) -> Result { - let proto_req = proto::CheckRelExistsRequest::from(req); - - let response = self.client.check_rel_exists(proto_req).await?; - - let proto_resp = response.into_inner(); - Ok(proto_resp.into()) - } - - /// Fetches a base backup. - pub async fn get_base_backup( - &mut self, - req: model::GetBaseBackupRequest, - ) -> Result, tonic::Status> { - let req = proto::GetBaseBackupRequest::from(req); - let chunks = self.client.get_base_backup(req).await?.into_inner(); - let reader = StreamReader::new( - chunks - .map_ok(|resp| resp.chunk) - .map_err(std::io::Error::other), - ); - Ok(reader) - } - - /// Returns the total size of a database, as # of bytes. - pub async fn get_db_size( - &mut self, - req: model::GetDbSizeRequest, - ) -> Result { - let proto_req = proto::GetDbSizeRequest::from(req); - - let response = self.client.get_db_size(proto_req).await?; - Ok(response.into_inner().into()) - } - - /// Fetches pages. - /// - /// This is implemented as a bidirectional streaming RPC for performance. - /// Per-request errors are often returned as status_code instead of errors, - /// to avoid tearing down the entire stream via tonic::Status. - pub async fn get_pages( - &mut self, - inbound: ReqSt, - ) -> Result< - impl Stream> + Send + 'static, - tonic::Status, - > - where - ReqSt: Stream + Send + 'static, - { - let outbound_proto = inbound.map(|domain_req| domain_req.into()); - - let req_new = Request::new(outbound_proto); - - let response_stream: Streaming = - self.client.get_pages(req_new).await?.into_inner(); - - let domain_stream = response_stream.map_ok(model::GetPageResponse::from); - - Ok(domain_stream) - } - - /// Returns the size of a relation, as # of blocks. - pub async fn get_rel_size( - &mut self, - req: model::GetRelSizeRequest, - ) -> Result { - let proto_req = proto::GetRelSizeRequest::from(req); - let response = self.client.get_rel_size(proto_req).await?; - let proto_resp = response.into_inner(); - Ok(proto_resp.into()) - } - - /// Fetches an SLRU segment. - pub async fn get_slru_segment( - &mut self, - req: model::GetSlruSegmentRequest, - ) -> Result { - let proto_req = proto::GetSlruSegmentRequest::from(req); - let response = self.client.get_slru_segment(proto_req).await?; - Ok(response.into_inner().try_into()?) - } - - /// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't - /// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards. - /// - /// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be - /// acquired because the LSN has already been garbage collected. - pub async fn lease_lsn( - &mut self, - req: model::LeaseLsnRequest, - ) -> Result { - let req = proto::LeaseLsnRequest::from(req); - Ok(self.client.lease_lsn(req).await?.into_inner().try_into()?) - } -} diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 4b7a70504a..c14bb73136 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -326,7 +326,7 @@ impl GrpcClient { ttid: TenantTimelineId, compression: bool, ) -> anyhow::Result { - let inner = page_api::Client::new( + let inner = page_api::Client::connect( connstring.to_string(), ttid.tenant_id, ttid.timeline_id, diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index a297819e9b..f14caf548c 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -625,7 +625,7 @@ impl GrpcClient { ttid: TenantTimelineId, compression: bool, ) -> anyhow::Result { - let mut client = page_api::Client::new( + let mut client = page_api::Client::connect( connstring.to_string(), ttid.tenant_id, ttid.timeline_id,