From 6f0af96a54267b2e51d47672318ecafde8a2b03f Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 2 Jul 2025 10:59:40 +0200 Subject: [PATCH] Add new PageserverClient --- pageserver/client_grpc/src/client.rs | 252 +++++++++++++++++++++++++++ pageserver/client_grpc/src/lib.rs | 1 + pageserver/client_grpc/src/pool.rs | 41 +++-- pageserver/page_api/src/model.rs | 15 ++ 4 files changed, 291 insertions(+), 18 deletions(-) create mode 100644 pageserver/client_grpc/src/client.rs diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs new file mode 100644 index 0000000000..41ee43a732 --- /dev/null +++ b/pageserver/client_grpc/src/client.rs @@ -0,0 +1,252 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::ensure; +use pageserver_page_api as page_api; +use tokio_util::sync::CancellationToken; +use utils::backoff; +use utils::id::{TenantId, TimelineId}; +use utils::shard::ShardIndex; + +use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamPool}; + +/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the +/// basic `page_api::Client` gRPC client, and supports: +/// +/// * Sharded tenants across multiple Pageservers. +/// * Pooling of connections, clients, and streams for efficient resource use. +/// * Concurrent use by many callers. +/// * Internal handling of GetPage bidirectional streams. +/// * Automatic retries. +/// +/// TODO: this client does not support base backups or LSN leases, as these are only used by +/// compute_ctl. Consider adding this. +/// +/// TODO: use a proper error type. +pub struct PageserverClient { + /// Resource pools per shard. + pools: HashMap, +} + +impl PageserverClient { + /// Creates a new Pageserver client. + pub fn new( + tenant_id: TenantId, + timeline_id: TimelineId, + shard_map: HashMap, + auth_token: Option, + ) -> anyhow::Result { + // TODO: support multiple shards. + ensure!(shard_map.len() == 1, "multiple shard not supported"); + ensure!( + shard_map.keys().next() == Some(&ShardIndex::unsharded()), + "only unsharded tenant supported" + ); + + let mut pools = HashMap::new(); + for (shard_id, url) in shard_map { + let shard_pools = + ShardPools::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?; + pools.insert(shard_id, shard_pools); + } + + Ok(Self { pools }) + } + + /// Returns whether a relation exists. + pub async fn check_rel_exists( + &self, + req: page_api::CheckRelExistsRequest, + ) -> tonic::Result { + // Relation metadata is only available on shard 0. + let shard_id = self.shard_zero(); + + self.with_retries("check_rel_exists", async || { + let mut client = self.get_shard_client(shard_id).await?; + client.check_rel_exists(req).await + }) + .await + } + + /// Returns the total size of a database, as # of bytes. + pub async fn get_db_size( + &self, + req: page_api::GetDbSizeRequest, + ) -> tonic::Result { + // Relation metadata is only available on shard 0. + let shard_id = self.shard_zero(); + + self.with_retries("get_db_size", async || { + let mut client = self.get_shard_client(shard_id).await?; + client.get_db_size(req).await + }) + .await + } + + /// Fetches a page. The `request_id` must be unique across all in-flight requests. + /// + /// Unlike the `page_api::Client`, this client automatically converts `status_code` into + /// `tonic::Status` errors. All responses will have `GetPageStatusCode::Ok`. + pub async fn get_page( + &self, + req: page_api::GetPageRequest, + ) -> tonic::Result { + // TODO: support multiple shards. + let shard_id = ShardIndex::unsharded(); + let streams = self.get_shard_streams(shard_id)?; + + self.with_retries("get_page", async || { + let resp = streams.send(req.clone()).await?; + + if resp.status_code != page_api::GetPageStatusCode::Ok { + return Err(tonic::Status::new( + resp.status_code.into(), + resp.reason.unwrap_or_else(|| String::from("unknown error")), + )); + } + + Ok(resp) + }) + .await + } + + /// Returns the size of a relation, as # of blocks. + pub async fn get_rel_size( + &self, + req: page_api::GetRelSizeRequest, + ) -> tonic::Result { + // Relation metadata is only available on shard 0. + let shard_id = self.shard_zero(); + + self.with_retries("get_rel_size", async || { + let mut client = self.get_shard_client(shard_id).await?; + client.get_rel_size(req).await + }) + .await + } + + /// Fetches an SLRU segment. + pub async fn get_slru_segment( + &self, + req: page_api::GetSlruSegmentRequest, + ) -> tonic::Result { + // SLRU segments are only available on shard 0. + let shard_id = self.shard_zero(); + + self.with_retries("get_slru_segment", async || { + let mut client = self.get_shard_client(shard_id).await?; + client.get_slru_segment(req).await + }) + .await + } + + /// Returns a pooled `page_api::Client` for the given shard. + async fn get_shard_client(&self, shard_id: ShardIndex) -> tonic::Result { + self.pools + .get(&shard_id) + .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))? + .clients + .get() + .await + .map_err(|err| tonic::Status::internal(format!("failed to acquire client: {err}"))) + } + + /// Returns the stream pool for the given shard. + #[allow(clippy::result_large_err)] // TODO: revisit + fn get_shard_streams(&self, shard_id: ShardIndex) -> tonic::Result<&Arc> { + Ok(&self + .pools + .get(&shard_id) + .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))? + .streams) + } + + /// Returns the shard index for shard 0. + fn shard_zero(&self) -> ShardIndex { + // TODO: support multiple shards. + ShardIndex::unsharded() + } + + /// Runs the given closure with exponential backoff retries. + async fn with_retries(&self, name: &str, f: F) -> tonic::Result + where + F: FnMut() -> O, + O: Future>, + { + /// TODO: tune retry parameters (retry forever?). + /// TODO: add timeouts. + const WARN_THRESHOLD: u32 = 1; + const MAX_RETRIES: u32 = 10; + // TODO: cancellation. + let cancel = CancellationToken::new(); + + fn is_permanent(err: &tonic::Status) -> bool { + match err.code() { + // Not really an error, but whatever. Don't retry. + tonic::Code::Ok => true, + // These codes are transient, so retry them. + tonic::Code::Aborted => false, + tonic::Code::Cancelled => false, + tonic::Code::DeadlineExceeded => false, // maybe transient slowness + tonic::Code::Internal => false, // maybe transient failure + tonic::Code::ResourceExhausted => false, + tonic::Code::Unavailable => false, + tonic::Code::Unknown => false, // may as well retry + // The following codes will like continue to fail, so don't retry. + tonic::Code::AlreadyExists => true, + tonic::Code::DataLoss => true, + tonic::Code::FailedPrecondition => true, + tonic::Code::InvalidArgument => true, + tonic::Code::NotFound => true, + tonic::Code::OutOfRange => true, + tonic::Code::PermissionDenied => true, + tonic::Code::Unimplemented => true, + tonic::Code::Unauthenticated => true, + } + } + + backoff::retry(f, is_permanent, WARN_THRESHOLD, MAX_RETRIES, name, &cancel) + .await + .expect("never cancelled (for now)") + } +} + +/// Resource pools for a single shard. +/// +/// TODO: consider separate pools for normal and bulk traffic, with different settings. +struct ShardPools { + /// Manages gRPC channels (i.e. TCP connections) for this shard. + #[allow(unused)] + channels: Arc, + /// Manages gRPC clients for this shard, using `channels`. + clients: Arc, + /// Manages gRPC GetPage streams for this shard, using `clients`. + streams: Arc, +} + +impl ShardPools { + /// Creates a new set of resource pools for the given shard. + pub fn new( + url: String, + tenant_id: TenantId, + timeline_id: TimelineId, + shard_id: ShardIndex, + auth_token: Option, + ) -> anyhow::Result { + let channels = ChannelPool::new(url)?; + let clients = ClientPool::new( + channels.clone(), + tenant_id, + timeline_id, + shard_id, + auth_token, + ); + let streams = StreamPool::new(clients.clone()); + + Ok(Self { + channels, + clients, + streams, + }) + } +} diff --git a/pageserver/client_grpc/src/lib.rs b/pageserver/client_grpc/src/lib.rs index ea0d85b92e..ee773ec378 100644 --- a/pageserver/client_grpc/src/lib.rs +++ b/pageserver/client_grpc/src/lib.rs @@ -20,6 +20,7 @@ use pageserver_page_api::proto::PageServiceClient; use pageserver_page_api::*; use utils::shard::ShardIndex; +pub mod client; pub mod client_cache; pub mod pool; pub mod request_tracker; diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 4686853e83..1fe5c6958a 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -5,16 +5,16 @@ //! a dedicated TCP connection and server task for every Postgres backend. //! //! Each resource has its own, nested pool. The pools are custom-built for the properties of each -//! resource -- these are different enough that a generic pool isn't suitable. +//! resource -- they are different enough that a generic pool isn't suitable. //! //! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients //! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a -//! per-channel limit. Channels may be closed when they are no longer used by any clients. +//! per-channel client limit. Channels may be closed when they are no longer used by any clients. //! //! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared) -//! channel from the ChannelPool for client's lifetime. A client can only be acquired by a single -//! caller at a time, and is returned to the pool when dropped. Idle clients may be removed from -//! the pool after some time, to free up the channel. +//! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a +//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed +//! from the pool after some time, to free up the channel. //! //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from //! the ClientPool for the stream's lifetime. Internal streams are not exposed to callers; @@ -23,6 +23,9 @@ //! pipelining multiple requests from multiple callers on the same stream (up to some queue //! depth), and route the response back to the original caller. Idle streams may be removed from //! the pool after some time, to free up the client. +//! +//! Each channel corresponds to one TCP connection. Each client unary request and each stream +//! corresponds to one HTTP/2 stream and server task. use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; @@ -108,7 +111,7 @@ impl ChannelPool { /// NB: this is not very performance-sensitive. It is only called when creating a new client, /// and clients are cached and reused by ClientPool. The total number of channels will also be /// small. O(n) performance is therefore okay. - pub fn get(self: &Arc) -> anyhow::Result { + pub fn get(self: &Arc) -> ChannelGuard { let mut channels = self.channels.lock().unwrap(); // Try to find an existing channel with available capacity. We check entries in BTreeMap @@ -119,11 +122,11 @@ impl ChannelPool { assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow"); if entry.clients < CLIENTS_PER_CHANNEL { entry.clients += 1; - return Ok(ChannelGuard { + return ChannelGuard { pool: Arc::downgrade(self), id, channel: Some(entry.channel.clone()), - }); + }; } } @@ -138,11 +141,11 @@ impl ChannelPool { }; channels.insert(id, entry); - Ok(ChannelGuard { + ChannelGuard { pool: Arc::downgrade(self), id, channel: Some(channel.clone()), - }) + } } } @@ -239,7 +242,8 @@ impl ClientPool { } /// Gets a client from the pool, or creates a new one if necessary. Blocks if the pool is at - /// `CLIENT_LIMIT`. The client is returned to the pool when the guard is dropped. + /// `CLIENT_LIMIT`, but connection happens lazily (if needed). The client is returned to the + /// pool when the guard is dropped. /// /// This is moderately performance-sensitive. It is called for every unary request, but recall /// that these establish a new gRPC stream per request so it's already expensive. GetPage @@ -264,7 +268,7 @@ impl ClientPool { } // Slow path: construct a new client. - let mut channel_guard = self.channel_pool.get()?; + let mut channel_guard = self.channel_pool.get(); let client = page_api::Client::new( channel_guard.take(), self.tenant_id, @@ -368,13 +372,13 @@ struct StreamEntry { impl StreamPool { /// Creates a new stream pool, using the given client pool. - pub fn new(client_pool: Arc) -> Self { - Self { + pub fn new(client_pool: Arc) -> Arc { + Arc::new(Self { client_pool, streams: Arc::default(), limiter: Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH), next_stream_id: AtomicUsize::default(), - } + }) } /// Sends a request via the stream pool and awaits the response. Blocks if the pool is at @@ -402,8 +406,8 @@ impl StreamPool { // do the same for queue depth tracking. let _permit = self.limiter.acquire().await.expect("never closed"); - // Acquire a stream sender. We increment and decrement the queue depth here instead of in - // the stream task to ensure we don't exceed the queue depth limit. + // Acquire a stream sender. We increment and decrement the queue depth here while acquiring + // a stream, instead of in the stream task, to ensure we don't acquire a full stream. #[allow(clippy::await_holding_lock)] // TODO: Clippy doesn't understand drop() let (req_tx, queue_depth) = async { let mut streams = self.streams.lock().unwrap(); @@ -480,7 +484,8 @@ impl StreamPool { /// Runs a stream task. This acquires a client from the `ClientPool` and establishes a /// bidirectional GetPage stream, then forwards requests and responses between callers and the - /// stream. It does not track or enforce queue depths, see `send()`. + /// stream. It does not track or enforce queue depths -- that's done by `send()` since it must + /// be atomic with pool stream acquisition. /// /// The task exits when the request channel is closed, or on a stream error. The caller is /// responsible for removing the stream from the pool on exit. diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 4497fc6fc7..c5b6f06879 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -602,6 +602,21 @@ impl TryFrom for GetPageStatusCode { } } +impl From for tonic::Code { + fn from(status_code: GetPageStatusCode) -> Self { + use tonic::Code; + + match status_code { + GetPageStatusCode::Unknown => Code::Unknown, + GetPageStatusCode::Ok => Code::Ok, + GetPageStatusCode::NotFound => Code::NotFound, + GetPageStatusCode::InvalidRequest => Code::InvalidArgument, + GetPageStatusCode::InternalError => Code::Internal, + GetPageStatusCode::SlowDown => Code::ResourceExhausted, + } + } +} + // Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other // shards will error. #[derive(Clone, Copy, Debug)]