mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Add new PageserverClient
This commit is contained in:
252
pageserver/client_grpc/src/client.rs
Normal file
252
pageserver/client_grpc/src/client.rs
Normal file
@@ -0,0 +1,252 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::ensure;
|
||||
use pageserver_page_api as page_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::shard::ShardIndex;
|
||||
|
||||
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamPool};
|
||||
|
||||
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
|
||||
/// basic `page_api::Client` gRPC client, and supports:
|
||||
///
|
||||
/// * Sharded tenants across multiple Pageservers.
|
||||
/// * Pooling of connections, clients, and streams for efficient resource use.
|
||||
/// * Concurrent use by many callers.
|
||||
/// * Internal handling of GetPage bidirectional streams.
|
||||
/// * Automatic retries.
|
||||
///
|
||||
/// TODO: this client does not support base backups or LSN leases, as these are only used by
|
||||
/// compute_ctl. Consider adding this.
|
||||
///
|
||||
/// TODO: use a proper error type.
|
||||
pub struct PageserverClient {
|
||||
/// Resource pools per shard.
|
||||
pools: HashMap<ShardIndex, ShardPools>,
|
||||
}
|
||||
|
||||
impl PageserverClient {
|
||||
/// Creates a new Pageserver client.
|
||||
pub fn new(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
shard_map: HashMap<ShardIndex, String>,
|
||||
auth_token: Option<String>,
|
||||
) -> anyhow::Result<Self> {
|
||||
// TODO: support multiple shards.
|
||||
ensure!(shard_map.len() == 1, "multiple shard not supported");
|
||||
ensure!(
|
||||
shard_map.keys().next() == Some(&ShardIndex::unsharded()),
|
||||
"only unsharded tenant supported"
|
||||
);
|
||||
|
||||
let mut pools = HashMap::new();
|
||||
for (shard_id, url) in shard_map {
|
||||
let shard_pools =
|
||||
ShardPools::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
|
||||
pools.insert(shard_id, shard_pools);
|
||||
}
|
||||
|
||||
Ok(Self { pools })
|
||||
}
|
||||
|
||||
/// Returns whether a relation exists.
|
||||
pub async fn check_rel_exists(
|
||||
&self,
|
||||
req: page_api::CheckRelExistsRequest,
|
||||
) -> tonic::Result<page_api::CheckRelExistsResponse> {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let shard_id = self.shard_zero();
|
||||
|
||||
self.with_retries("check_rel_exists", async || {
|
||||
let mut client = self.get_shard_client(shard_id).await?;
|
||||
client.check_rel_exists(req).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns the total size of a database, as # of bytes.
|
||||
pub async fn get_db_size(
|
||||
&self,
|
||||
req: page_api::GetDbSizeRequest,
|
||||
) -> tonic::Result<page_api::GetDbSizeResponse> {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let shard_id = self.shard_zero();
|
||||
|
||||
self.with_retries("get_db_size", async || {
|
||||
let mut client = self.get_shard_client(shard_id).await?;
|
||||
client.get_db_size(req).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Fetches a page. The `request_id` must be unique across all in-flight requests.
|
||||
///
|
||||
/// Unlike the `page_api::Client`, this client automatically converts `status_code` into
|
||||
/// `tonic::Status` errors. All responses will have `GetPageStatusCode::Ok`.
|
||||
pub async fn get_page(
|
||||
&self,
|
||||
req: page_api::GetPageRequest,
|
||||
) -> tonic::Result<page_api::GetPageResponse> {
|
||||
// TODO: support multiple shards.
|
||||
let shard_id = ShardIndex::unsharded();
|
||||
let streams = self.get_shard_streams(shard_id)?;
|
||||
|
||||
self.with_retries("get_page", async || {
|
||||
let resp = streams.send(req.clone()).await?;
|
||||
|
||||
if resp.status_code != page_api::GetPageStatusCode::Ok {
|
||||
return Err(tonic::Status::new(
|
||||
resp.status_code.into(),
|
||||
resp.reason.unwrap_or_else(|| String::from("unknown error")),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(resp)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns the size of a relation, as # of blocks.
|
||||
pub async fn get_rel_size(
|
||||
&self,
|
||||
req: page_api::GetRelSizeRequest,
|
||||
) -> tonic::Result<page_api::GetRelSizeResponse> {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let shard_id = self.shard_zero();
|
||||
|
||||
self.with_retries("get_rel_size", async || {
|
||||
let mut client = self.get_shard_client(shard_id).await?;
|
||||
client.get_rel_size(req).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Fetches an SLRU segment.
|
||||
pub async fn get_slru_segment(
|
||||
&self,
|
||||
req: page_api::GetSlruSegmentRequest,
|
||||
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
|
||||
// SLRU segments are only available on shard 0.
|
||||
let shard_id = self.shard_zero();
|
||||
|
||||
self.with_retries("get_slru_segment", async || {
|
||||
let mut client = self.get_shard_client(shard_id).await?;
|
||||
client.get_slru_segment(req).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Returns a pooled `page_api::Client` for the given shard.
|
||||
async fn get_shard_client(&self, shard_id: ShardIndex) -> tonic::Result<ClientGuard> {
|
||||
self.pools
|
||||
.get(&shard_id)
|
||||
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))?
|
||||
.clients
|
||||
.get()
|
||||
.await
|
||||
.map_err(|err| tonic::Status::internal(format!("failed to acquire client: {err}")))
|
||||
}
|
||||
|
||||
/// Returns the stream pool for the given shard.
|
||||
#[allow(clippy::result_large_err)] // TODO: revisit
|
||||
fn get_shard_streams(&self, shard_id: ShardIndex) -> tonic::Result<&Arc<StreamPool>> {
|
||||
Ok(&self
|
||||
.pools
|
||||
.get(&shard_id)
|
||||
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))?
|
||||
.streams)
|
||||
}
|
||||
|
||||
/// Returns the shard index for shard 0.
|
||||
fn shard_zero(&self) -> ShardIndex {
|
||||
// TODO: support multiple shards.
|
||||
ShardIndex::unsharded()
|
||||
}
|
||||
|
||||
/// Runs the given closure with exponential backoff retries.
|
||||
async fn with_retries<T, F, O>(&self, name: &str, f: F) -> tonic::Result<T>
|
||||
where
|
||||
F: FnMut() -> O,
|
||||
O: Future<Output = tonic::Result<T>>,
|
||||
{
|
||||
/// TODO: tune retry parameters (retry forever?).
|
||||
/// TODO: add timeouts.
|
||||
const WARN_THRESHOLD: u32 = 1;
|
||||
const MAX_RETRIES: u32 = 10;
|
||||
// TODO: cancellation.
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
fn is_permanent(err: &tonic::Status) -> bool {
|
||||
match err.code() {
|
||||
// Not really an error, but whatever. Don't retry.
|
||||
tonic::Code::Ok => true,
|
||||
// These codes are transient, so retry them.
|
||||
tonic::Code::Aborted => false,
|
||||
tonic::Code::Cancelled => false,
|
||||
tonic::Code::DeadlineExceeded => false, // maybe transient slowness
|
||||
tonic::Code::Internal => false, // maybe transient failure
|
||||
tonic::Code::ResourceExhausted => false,
|
||||
tonic::Code::Unavailable => false,
|
||||
tonic::Code::Unknown => false, // may as well retry
|
||||
// The following codes will like continue to fail, so don't retry.
|
||||
tonic::Code::AlreadyExists => true,
|
||||
tonic::Code::DataLoss => true,
|
||||
tonic::Code::FailedPrecondition => true,
|
||||
tonic::Code::InvalidArgument => true,
|
||||
tonic::Code::NotFound => true,
|
||||
tonic::Code::OutOfRange => true,
|
||||
tonic::Code::PermissionDenied => true,
|
||||
tonic::Code::Unimplemented => true,
|
||||
tonic::Code::Unauthenticated => true,
|
||||
}
|
||||
}
|
||||
|
||||
backoff::retry(f, is_permanent, WARN_THRESHOLD, MAX_RETRIES, name, &cancel)
|
||||
.await
|
||||
.expect("never cancelled (for now)")
|
||||
}
|
||||
}
|
||||
|
||||
/// Resource pools for a single shard.
|
||||
///
|
||||
/// TODO: consider separate pools for normal and bulk traffic, with different settings.
|
||||
struct ShardPools {
|
||||
/// Manages gRPC channels (i.e. TCP connections) for this shard.
|
||||
#[allow(unused)]
|
||||
channels: Arc<ChannelPool>,
|
||||
/// Manages gRPC clients for this shard, using `channels`.
|
||||
clients: Arc<ClientPool>,
|
||||
/// Manages gRPC GetPage streams for this shard, using `clients`.
|
||||
streams: Arc<StreamPool>,
|
||||
}
|
||||
|
||||
impl ShardPools {
|
||||
/// Creates a new set of resource pools for the given shard.
|
||||
pub fn new(
|
||||
url: String,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
shard_id: ShardIndex,
|
||||
auth_token: Option<String>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let channels = ChannelPool::new(url)?;
|
||||
let clients = ClientPool::new(
|
||||
channels.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard_id,
|
||||
auth_token,
|
||||
);
|
||||
let streams = StreamPool::new(clients.clone());
|
||||
|
||||
Ok(Self {
|
||||
channels,
|
||||
clients,
|
||||
streams,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,7 @@ use pageserver_page_api::proto::PageServiceClient;
|
||||
use pageserver_page_api::*;
|
||||
use utils::shard::ShardIndex;
|
||||
|
||||
pub mod client;
|
||||
pub mod client_cache;
|
||||
pub mod pool;
|
||||
pub mod request_tracker;
|
||||
|
||||
@@ -5,16 +5,16 @@
|
||||
//! a dedicated TCP connection and server task for every Postgres backend.
|
||||
//!
|
||||
//! Each resource has its own, nested pool. The pools are custom-built for the properties of each
|
||||
//! resource -- these are different enough that a generic pool isn't suitable.
|
||||
//! resource -- they are different enough that a generic pool isn't suitable.
|
||||
//!
|
||||
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
|
||||
//! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
|
||||
//! per-channel limit. Channels may be closed when they are no longer used by any clients.
|
||||
//! per-channel client limit. Channels may be closed when they are no longer used by any clients.
|
||||
//!
|
||||
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
|
||||
//! channel from the ChannelPool for client's lifetime. A client can only be acquired by a single
|
||||
//! caller at a time, and is returned to the pool when dropped. Idle clients may be removed from
|
||||
//! the pool after some time, to free up the channel.
|
||||
//! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
|
||||
//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed
|
||||
//! from the pool after some time, to free up the channel.
|
||||
//!
|
||||
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from
|
||||
//! the ClientPool for the stream's lifetime. Internal streams are not exposed to callers;
|
||||
@@ -23,6 +23,9 @@
|
||||
//! pipelining multiple requests from multiple callers on the same stream (up to some queue
|
||||
//! depth), and route the response back to the original caller. Idle streams may be removed from
|
||||
//! the pool after some time, to free up the client.
|
||||
//!
|
||||
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
|
||||
//! corresponds to one HTTP/2 stream and server task.
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
@@ -108,7 +111,7 @@ impl ChannelPool {
|
||||
/// NB: this is not very performance-sensitive. It is only called when creating a new client,
|
||||
/// and clients are cached and reused by ClientPool. The total number of channels will also be
|
||||
/// small. O(n) performance is therefore okay.
|
||||
pub fn get(self: &Arc<Self>) -> anyhow::Result<ChannelGuard> {
|
||||
pub fn get(self: &Arc<Self>) -> ChannelGuard {
|
||||
let mut channels = self.channels.lock().unwrap();
|
||||
|
||||
// Try to find an existing channel with available capacity. We check entries in BTreeMap
|
||||
@@ -119,11 +122,11 @@ impl ChannelPool {
|
||||
assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow");
|
||||
if entry.clients < CLIENTS_PER_CHANNEL {
|
||||
entry.clients += 1;
|
||||
return Ok(ChannelGuard {
|
||||
return ChannelGuard {
|
||||
pool: Arc::downgrade(self),
|
||||
id,
|
||||
channel: Some(entry.channel.clone()),
|
||||
});
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,11 +141,11 @@ impl ChannelPool {
|
||||
};
|
||||
channels.insert(id, entry);
|
||||
|
||||
Ok(ChannelGuard {
|
||||
ChannelGuard {
|
||||
pool: Arc::downgrade(self),
|
||||
id,
|
||||
channel: Some(channel.clone()),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,7 +242,8 @@ impl ClientPool {
|
||||
}
|
||||
|
||||
/// Gets a client from the pool, or creates a new one if necessary. Blocks if the pool is at
|
||||
/// `CLIENT_LIMIT`. The client is returned to the pool when the guard is dropped.
|
||||
/// `CLIENT_LIMIT`, but connection happens lazily (if needed). The client is returned to the
|
||||
/// pool when the guard is dropped.
|
||||
///
|
||||
/// This is moderately performance-sensitive. It is called for every unary request, but recall
|
||||
/// that these establish a new gRPC stream per request so it's already expensive. GetPage
|
||||
@@ -264,7 +268,7 @@ impl ClientPool {
|
||||
}
|
||||
|
||||
// Slow path: construct a new client.
|
||||
let mut channel_guard = self.channel_pool.get()?;
|
||||
let mut channel_guard = self.channel_pool.get();
|
||||
let client = page_api::Client::new(
|
||||
channel_guard.take(),
|
||||
self.tenant_id,
|
||||
@@ -368,13 +372,13 @@ struct StreamEntry {
|
||||
|
||||
impl StreamPool {
|
||||
/// Creates a new stream pool, using the given client pool.
|
||||
pub fn new(client_pool: Arc<ClientPool>) -> Self {
|
||||
Self {
|
||||
pub fn new(client_pool: Arc<ClientPool>) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
client_pool,
|
||||
streams: Arc::default(),
|
||||
limiter: Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH),
|
||||
next_stream_id: AtomicUsize::default(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Sends a request via the stream pool and awaits the response. Blocks if the pool is at
|
||||
@@ -402,8 +406,8 @@ impl StreamPool {
|
||||
// do the same for queue depth tracking.
|
||||
let _permit = self.limiter.acquire().await.expect("never closed");
|
||||
|
||||
// Acquire a stream sender. We increment and decrement the queue depth here instead of in
|
||||
// the stream task to ensure we don't exceed the queue depth limit.
|
||||
// Acquire a stream sender. We increment and decrement the queue depth here while acquiring
|
||||
// a stream, instead of in the stream task, to ensure we don't acquire a full stream.
|
||||
#[allow(clippy::await_holding_lock)] // TODO: Clippy doesn't understand drop()
|
||||
let (req_tx, queue_depth) = async {
|
||||
let mut streams = self.streams.lock().unwrap();
|
||||
@@ -480,7 +484,8 @@ impl StreamPool {
|
||||
|
||||
/// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
|
||||
/// bidirectional GetPage stream, then forwards requests and responses between callers and the
|
||||
/// stream. It does not track or enforce queue depths, see `send()`.
|
||||
/// stream. It does not track or enforce queue depths -- that's done by `send()` since it must
|
||||
/// be atomic with pool stream acquisition.
|
||||
///
|
||||
/// The task exits when the request channel is closed, or on a stream error. The caller is
|
||||
/// responsible for removing the stream from the pool on exit.
|
||||
|
||||
@@ -602,6 +602,21 @@ impl TryFrom<tonic::Code> for GetPageStatusCode {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetPageStatusCode> for tonic::Code {
|
||||
fn from(status_code: GetPageStatusCode) -> Self {
|
||||
use tonic::Code;
|
||||
|
||||
match status_code {
|
||||
GetPageStatusCode::Unknown => Code::Unknown,
|
||||
GetPageStatusCode::Ok => Code::Ok,
|
||||
GetPageStatusCode::NotFound => Code::NotFound,
|
||||
GetPageStatusCode::InvalidRequest => Code::InvalidArgument,
|
||||
GetPageStatusCode::InternalError => Code::Internal,
|
||||
GetPageStatusCode::SlowDown => Code::ResourceExhausted,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
|
||||
// shards will error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
|
||||
Reference in New Issue
Block a user