diff --git a/Cargo.lock b/Cargo.lock
index 08aec2e443..d9caf22c93 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4615,6 +4615,7 @@ dependencies = [
  "pageserver_page_api",
  "priority-queue",
  "rand 0.8.5",
+ "scopeguard",
  "thiserror 1.0.69",
  "tokio",
  "tokio-stream",
diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml
index ae4ed72052..0e9bf9ccdf 100644
--- a/pageserver/client_grpc/Cargo.toml
+++ b/pageserver/client_grpc/Cargo.toml
@@ -20,6 +20,7 @@ hyper-util = "0.1.9"
 hyper = "1.6.0"
 metrics.workspace = true
 priority-queue = "2.3.1"
+scopeguard.workspace = true
 async-trait = { version = "0.1" }
 tokio-stream = "0.1"
 dashmap = "5"
diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs
index 38d387bc42..749ce2aeb0 100644
--- a/pageserver/client_grpc/src/pool.rs
+++ b/pageserver/client_grpc/src/pool.rs
@@ -1,103 +1,18 @@
-use std::collections::{BTreeMap, VecDeque};
-use std::future::Future;
+use std::collections::{BTreeMap, HashMap};
 use std::ops::{Deref, DerefMut};
-use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, Weak};
 
-use tokio::sync::{Semaphore, SemaphorePermit};
+use futures::StreamExt;
+use scopeguard::defer;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tonic::transport::{Channel, Endpoint};
 
-use pageserver_page_api as page_api;
+use pageserver_page_api::{self as page_api, GetPageRequest, GetPageResponse};
+use tracing::warn;
 use utils::id::{TenantId, TimelineId};
 use utils::shard::ShardIndex;
 
-/// Constructs new pool items.
-/// TODO: use a proper error type.
-type Maker<T> = Box<dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<T>>>> + Send + Sync>;
-
-/// A resource pool. This is used to manage gRPC channels, clients, and stream.
-///
-/// An item is only handed out to a single user at a time. New items will be created up to the pool
-/// limit, if specified.
-pub struct Pool<T> {
-    /// Creates new pool items.
-    maker: Maker<T>,
-    /// Idle items in the pool. Returned items are pushed to the front of the queue, so that the
-    /// oldest idle items are kept at the back.
-    ///
-    /// TODO: reap idle items after some time.
-    /// TODO: consider prewarming items.
-    idle: Arc<Mutex<VecDeque<T>>>,
-    /// Limits the max number of items managed by the pool.
-    limiter: Semaphore,
-}
-
-impl<T> Pool<T> {
-    /// Create a new pool with the specified limit.
-    pub fn new(maker: Maker<T>, limit: Option<usize>) -> Self {
-        Self {
-            maker,
-            idle: Default::default(),
-            limiter: Semaphore::new(limit.unwrap_or(Semaphore::MAX_PERMITS)),
-        }
-    }
-
-    /// Gets an item from the pool, or creates a new one if necessary. Blocks if the pool is at its
-    /// limit. The item is returned to the pool when the guard is dropped.
-    pub async fn get(&self) -> anyhow::Result<PoolGuard<'_, T>> {
-        let permit = self.limiter.acquire().await.expect("never closed");
-
-        // Acquire an idle item from the pool, or create a new one.
-        let item = self.idle.lock().unwrap().pop_front();
-        let item = match item {
-            Some(item) => item,
-            // TODO: if an item is returned while we're waiting, use the returned item instead.
-            None => (self.maker)().await?,
-        };
-
-        Ok(PoolGuard {
-            pool: self,
-            permit,
-            item: Some(item),
-        })
-    }
-}
-
-/// A guard for a pooled item.
-pub struct PoolGuard<'a, T> {
-    pool: &'a Pool<T>,
-    permit: SemaphorePermit<'a>,
-    item: Option<T>, // only None during drop
-}
-
-impl<T> Deref for PoolGuard<'_, T> {
-    type Target = T;
-
-    fn deref(&self) -> &Self::Target {
-        self.item.as_ref().expect("not dropped")
-    }
-}
-
-impl<T> DerefMut for PoolGuard<'_, T> {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        self.item.as_mut().expect("not dropped")
-    }
-}
-
-impl<T> Drop for PoolGuard<'_, T> {
-    fn drop(&mut self) {
-        // Return the item to the pool.
-        self.pool
-            .idle
-            .lock()
-            .unwrap()
-            .push_front(self.item.take().expect("only dropped once"));
-        // The permit will be returned by its drop handler. Tag it here for visibility.
-        _ = self.permit;
-    }
-}
-
 /// A gRPC channel pool. A channel is shared by many clients, using HTTP/2 stream multiplexing.
 /// This pool allows an unlimited number of channels. Concurrency is limited by ClientPool. It is
 /// not performance-critical, because clients (and thus channels) will be reused by ClientPool.
@@ -110,7 +25,7 @@ struct ChannelPool {
     /// Pageserver endpoint to connect to.
     endpoint: Endpoint,
     /// Open channels.
-    channels: Arc<Mutex<BTreeMap<ChannelID, ChannelEntry>>>,
+    channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
 }
 
 type ChannelID = usize;
@@ -128,14 +43,14 @@ impl ChannelPool {
     /// TODO: tune this.
     /// TODO: consider having separate limits for unary and streaming clients. This way, a channel
     /// that's full of streaming requests also has room for a few unary requests.
-    const CLIENTS_PER_CHANNEL: usize = 20;
+    const CLIENTS_PER_CHANNEL: usize = 16;
 
     /// Creates a new channel pool for the given Pageserver URL.
-    pub fn new(url: String) -> anyhow::Result<Self> {
-        Ok(Self {
+    pub fn new(url: String) -> anyhow::Result<Arc<Self>> {
+        Ok(Arc::new(Self {
             endpoint: Endpoint::from_shared(url)?,
             channels: Default::default(),
-        })
+        }))
     }
 
     /// Acquires a new gRPC channel.
@@ -143,7 +58,7 @@ impl ChannelPool {
     /// NB: this is not particularly performance-sensitive. It is called rarely since clients are
     /// cached and reused by ClientPool, and the number of channels will be small. O(n) performance
     /// is therefore okay.
-    pub fn get(&self) -> anyhow::Result<ChannelGuard<'_>> {
+    pub fn get(self: Arc<Self>) -> anyhow::Result<ChannelGuard> {
         let mut channels = self.channels.lock().unwrap();
 
         // Find an existing channel with available capacity. We check entries in BTreeMap order,
@@ -153,7 +68,7 @@ impl ChannelPool {
             if entry.clients < Self::CLIENTS_PER_CHANNEL {
                 entry.clients += 1;
                 return Ok(ChannelGuard {
-                    pool: self,
+                    pool: Arc::downgrade(&self),
                     id,
                     channel: Some(entry.channel.clone()),
                 });
@@ -165,7 +80,7 @@ impl ChannelPool {
         let id = channels.keys().last().copied().unwrap_or_default();
         let channel = self.endpoint.connect_lazy();
         let guard = ChannelGuard {
-            pool: self,
+            pool: Arc::downgrade(&self),
            id,
             channel: Some(channel.clone()),
         };
@@ -179,13 +94,13 @@ impl ChannelPool {
     }
 }
 
-struct ChannelGuard<'a> {
-    pool: &'a ChannelPool,
+struct ChannelGuard {
+    pool: Weak<ChannelPool>,
     id: ChannelID,
     channel: Option<Channel>,
 }
 
-impl<'a> ChannelGuard<'a> {
+impl ChannelGuard {
     /// Returns the inner channel. Can only be called once. The caller must hold onto the guard as
     /// long as the channel is in use, and should not clone it.
     ///
@@ -198,9 +113,12 @@ impl<'a> ChannelGuard<'a> {
     }
 
 /// Returns the channel to the pool.
-impl Drop for ChannelGuard<'_> {
+impl Drop for ChannelGuard {
     fn drop(&mut self) {
-        let mut channels = self.pool.channels.lock().unwrap();
+        let Some(pool) = self.pool.upgrade() else {
+            return; // pool was dropped
+        };
+        let mut channels = pool.channels.lock().unwrap();
         let entry = channels.get_mut(&self.id).expect("unknown channel");
         assert!(entry.clients > 0, "channel clients underflow");
         entry.clients -= 1;
@@ -208,7 +126,7 @@
 }
 
 /// A pool of gRPC clients.
-pub struct ClientPool<'a> {
+pub struct ClientPool {
     /// Tenant ID.
     tenant_id: TenantId,
     /// Timeline ID.
@@ -218,24 +136,26 @@ pub struct ClientPool<'a> {
     /// Authentication token, if any.
     auth_token: Option<String>,
     /// Channel pool.
-    channels: ChannelPool,
+    channels: Arc<ChannelPool>,
     /// Limits the max number of concurrent clients.
-    limiter: Semaphore,
-    /// Idle clients in the pool.
-    idle: Arc<Mutex<BTreeMap<ClientKey, ClientEntry<'a>>>>,
+    limiter: Arc<Semaphore>,
+    /// Idle clients in the pool. This is sorted by channel ID and client ID, such that we use idle
+    /// clients from the lower-numbered channels first. This allows us to reap the higher-numbered
+    /// channels as they become idle.
+    idle: Mutex<BTreeMap<ClientKey, ClientEntry>>,
     /// Unique client ID generator.
     next_client_id: AtomicUsize,
 }
 
 type ClientID = usize;
 type ClientKey = (ChannelID, ClientID);
 
-struct ClientEntry<'a> {
+struct ClientEntry {
     client: page_api::Client,
-    channel_guard: ChannelGuard<'a>,
+    channel_guard: ChannelGuard,
 }
 
-impl<'a> ClientPool<'a> {
-    const CLIENT_LIMIT: usize = 100; // TODO: make this configurable
+impl ClientPool {
+    const CLIENT_LIMIT: usize = 64; // TODO: make this configurable
 
     /// Creates a new client pool for the given Pageserver and tenant shard.
     pub fn new(
@@ -251,22 +171,27 @@ impl<'a> ClientPool<'a> {
             shard_id,
             auth_token,
             channels: ChannelPool::new(url)?,
-            idle: Arc::default(),
-            limiter: Semaphore::new(Self::CLIENT_LIMIT),
+            idle: Mutex::default(),
+            limiter: Arc::new(Semaphore::new(Self::CLIENT_LIMIT)),
             next_client_id: AtomicUsize::default(),
         })
     }
 
     /// Gets a client from the pool, or creates a new one if necessary. The client is returned to
     /// the pool when the guard is dropped.
-    pub async fn get(&'a self) -> anyhow::Result<ClientGuard<'a>> {
-        let permit = self.limiter.acquire().await.expect("never closed");
+    pub async fn get(self: Arc<Self>) -> anyhow::Result<ClientGuard> {
+        let permit = self
+            .limiter
+            .clone()
+            .acquire_owned()
+            .await
+            .expect("never closed");
 
         let mut idle = self.idle.lock().unwrap();
 
         // Fast path: acquire an idle client from the pool.
         if let Some(((_, id), entry)) = idle.pop_first() {
             return Ok(ClientGuard {
-                pool: self,
+                pool: Arc::downgrade(&self),
                 id,
                 client: Some(entry.client),
                 channel_guard: Some(entry.channel_guard),
@@ -275,7 +200,7 @@ impl<'a> ClientPool<'a> {
         }
 
         // Slow path: construct a new client.
-        let mut channel_guard = self.channels.get()?; // never blocks (lazy connection)
+        let mut channel_guard = self.channels.clone().get()?; // never blocks (lazy connection)
         let id = self.next_client_id.fetch_add(1, Ordering::Relaxed);
 
         let client = page_api::Client::new(
@@ -288,7 +213,7 @@ impl<'a> ClientPool<'a> {
         )?;
 
         Ok(ClientGuard {
-            pool: self,
+            pool: Arc::downgrade(&self),
             id,
             client: Some(client),
             channel_guard: Some(channel_guard),
@@ -297,18 +222,35 @@
     }
 }
 
-pub struct ClientGuard<'a> {
-    pool: &'a ClientPool<'a>,
+pub struct ClientGuard {
+    pool: Weak<ClientPool>,
     id: ClientID,
     client: Option<page_api::Client>,
-    channel_guard: Option<ChannelGuard<'a>>,
-    permit: SemaphorePermit<'a>,
+    channel_guard: Option<ChannelGuard>,
+    permit: OwnedSemaphorePermit,
+}
+
+impl Deref for ClientGuard {
+    type Target = page_api::Client;
+
+    fn deref(&self) -> &Self::Target {
+        self.client.as_ref().expect("not dropped")
+    }
+}
+
+impl DerefMut for ClientGuard {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.client.as_mut().expect("not dropped")
+    }
 }
 
 // Returns the client to the pool.
-impl Drop for ClientGuard<'_> {
+impl Drop for ClientGuard {
     fn drop(&mut self) {
-        let mut idle = self.pool.idle.lock().unwrap();
+        let Some(pool) = self.pool.upgrade() else {
+            return; // pool was dropped
+        };
+        let mut idle = pool.idle.lock().unwrap();
         let client = self.client.take().expect("dropped once");
         let channel_guard = self.channel_guard.take().expect("dropped once");
         let channel_id = channel_guard.id;
@@ -322,3 +264,191 @@
         _ = self.permit;
     }
 }
+
+/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams.
+/// TODO: consider making this generic over request and response types, but not currently needed.
+///
+/// Individual streams are not exposed to callers -- instead, callers can send individual requests
+/// to the pool and await a response. Internally, requests are multiplexed over streams and
+/// channels.
+pub struct StreamPool {
+    /// gRPC client pool.
+    clients: Arc<ClientPool>,
+    /// All pooled streams.
+    ///
+    /// TODO: this must use something more sophisticated. This is on the GetPage hot path, so we
+    /// want cheap concurrent access in the common case. We also want to prioritize using streams
+    /// that belong to lower-numbered channels and clients first, such that we can reap
+    /// higher-numbered channels and clients as they become idle. And we can't hold a lock on this
+    /// while we're spinning up new streams, but we want to install an entry prior to spinning it up
+    /// such that other requests can join onto it (we won't know the client/channel ID until we've
+    /// acquired a client from the client pool, which may block).
+    streams: Arc<Mutex<HashMap<StreamID, StreamEntry>>>,
+    /// Limits the max number of concurrent requests (not streams).
+    limiter: Semaphore,
+    /// Stream ID generator.
+    next_stream_id: AtomicUsize,
+}
+
+type StreamID = usize;
+type StreamSender = tokio::sync::mpsc::Sender<(GetPageRequest, ResponseSender)>;
+type StreamReceiver = tokio::sync::mpsc::Receiver<(GetPageRequest, ResponseSender)>;
+type ResponseSender = tokio::sync::oneshot::Sender<tonic::Result<GetPageResponse>>;
+
+struct StreamEntry {
+    /// The request stream sender. The stream task exits when this is dropped.
+    sender: StreamSender,
+    /// Number of in-flight requests on this stream.
+    queue_depth: Arc<AtomicUsize>,
+}
+
+impl StreamPool {
+    /// Max number of concurrent requests per stream.
+    const STREAM_QUEUE_DEPTH: usize = 2;
+    /// Max number of concurrent requests in flight.
+    const TOTAL_QUEUE_DEPTH: usize = ClientPool::CLIENT_LIMIT * Self::STREAM_QUEUE_DEPTH;
+
+    /// Creates a new stream pool, using the given client pool.
+    pub fn new(clients: Arc<ClientPool>) -> Self {
+        Self {
+            clients,
+            streams: Arc::default(),
+            limiter: Semaphore::new(Self::TOTAL_QUEUE_DEPTH),
+            next_stream_id: AtomicUsize::default(),
+        }
+    }
+
+    /// Sends a request via the stream pool, returning a response.
+    pub async fn send(&self, req: GetPageRequest) -> tonic::Result<GetPageResponse> {
+        // Acquire a permit. For simplicity, we drop it when this method returns, even if the
+        // request is still in flight because the caller went away. We do the same for queue depth.
+        let _permit = self.limiter.acquire().await.expect("never closed");
+
+        // Acquire a stream from the pool.
+        #[allow(clippy::await_holding_lock)] // TODO: Clippy doesn't understand drop()
+        let (req_tx, queue_depth) = async {
+            let mut streams = self.streams.lock().unwrap();
+
+            // Try to find an existing stream with available capacity.
+            for entry in streams.values() {
+                if entry
+                    .queue_depth
+                    // TODO: review ordering.
+                    .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |depth| {
+                        (depth < Self::STREAM_QUEUE_DEPTH).then_some(depth + 1)
+                    })
+                    .is_ok()
+                {
+                    return anyhow::Ok((entry.sender.clone(), entry.queue_depth.clone()));
+                }
+            }
+
+            // No available stream, spin up a new one. We install the stream entry first and release
+            // the lock. This will allow other requests to join onto this stream while we're
+            // spinning up the task, and also create additional streams concurrently when full.
+            let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
+            let queue_depth = Arc::new(AtomicUsize::new(1));
+            let (req_tx, req_rx) = tokio::sync::mpsc::channel(Self::STREAM_QUEUE_DEPTH);
+            streams.insert(
+                id,
+                StreamEntry {
+                    sender: req_tx.clone(),
+                    queue_depth: queue_depth.clone(),
+                },
+            );
+            drop(streams); // drop lock before spinning up task
+
+            let clients = self.clients.clone();
+            let streams = self.streams.clone();
+
+            tokio::spawn(async move {
+                if let Err(err) = Self::run_stream(clients, req_rx).await {
+                    warn!("stream failed: {err}");
+                }
+                // Remove stream from pool on exit.
+                let entry = streams.lock().unwrap().remove(&id);
+                assert!(entry.is_some(), "unknown stream ID: {id}");
+            });
+
+            anyhow::Ok((req_tx, queue_depth))
+        }
+        .await
+        .map_err(|err| tonic::Status::internal(err.to_string()))?;
+
+        // Decrement the queue depth on return. We incremented it above, so we also decrement it
+        // here, even though that could prematurely decrement it before the response arrives.
+        defer!(queue_depth.fetch_sub(1, Ordering::SeqCst););
+
+        // Send the request and wait for the response.
+        let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
+
+        req_tx
+            .send((req, resp_tx))
+            .await
+            .map_err(|_| tonic::Status::unavailable("stream closed"))?;
+
+        resp_rx
+            .await
+            .map_err(|_| tonic::Status::unavailable("stream closed"))?
+    }
+
+    /// Runs a stream task.
+    async fn run_stream(
+        client_pool: Arc<ClientPool>,
+        mut caller_rx: StreamReceiver,
+    ) -> anyhow::Result<()> {
+        // Acquire a client from the pool and create a stream.
+        let mut client_guard = client_pool.get().await?;
+        let client = client_guard.deref_mut();
+
+        let (req_tx, req_rx) = tokio::sync::mpsc::channel(Self::STREAM_QUEUE_DEPTH);
+        let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
+        let mut resp_stream = client.get_pages(req_stream).await?;
+
+        // Track caller response channels by request ID. If the task returns early, the response
+        // channels will be dropped and the callers will receive an error.
+        let mut callers = HashMap::with_capacity(Self::STREAM_QUEUE_DEPTH);
+
+        // Process requests and responses.
+        loop {
+            tokio::select! {
+                // Receive requests from callers and send them to the stream.
+                req = caller_rx.recv() => {
+                    // Shut down if input channel is closed.
+                    let Some((req, resp_tx)) = req else {
+                        return Ok(()); // stream closed
+                    };
+
+                    // Store the response channel by request ID.
+                    if callers.contains_key(&req.request_id) {
+                        _ = resp_tx.send(Err(tonic::Status::invalid_argument(
+                            format!("duplicate request ID: {}", req.request_id),
+                        )));
+                        continue;
+                    }
+                    callers.insert(req.request_id, resp_tx);
+
+                    // Send the request on the stream. Bail out on send errors.
+                    req_tx.send(req).await.map_err(|_| {
+                        tonic::Status::unavailable("stream closed")
+                    })?;
+                }
+
+                // Receive responses from the stream and send them to callers.
+                resp = resp_stream.next() => {
+                    // Shut down if the stream is closed, and bail out on stream errors.
+                    let Some(resp) = resp.transpose()? else {
+                        return Ok(())
+                    };
+
+                    // Send the response to the caller.
+                    let Some(resp_tx) = callers.remove(&resp.request_id) else {
+                        warn!("received response for unknown request ID: {}", resp.request_id);
+                        continue;
+                    };
+                    _ = resp_tx.send(Ok(resp)); // ignore error if caller went away
+                }
+            }
+        }
+    }
+}
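
Reviewer note: the three layers compose as ChannelPool (shared HTTP/2 connections) -> ClientPool (checked-out page_api::Client guards) -> StreamPool (multiplexed GetPage streams). A rough wiring sketch follows. The full parameter list of ClientPool::new is cut off by the hunk context above, so its shape here is a guess, and the crate path is assumed; treat this as illustrative only, not part of the diff.

// Hypothetical usage sketch, not part of this diff. Assumes the crate path
// pageserver_client_grpc::pool and a ClientPool::new(url, tenant_id,
// timeline_id, shard_id, auth_token) signature, which the hunk above elides.
use std::sync::Arc;

use pageserver_client_grpc::pool::{ClientPool, StreamPool};
use pageserver_page_api::GetPageRequest;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;

async fn get_page_example(
    tenant_id: TenantId,
    timeline_id: TimelineId,
    shard_id: ShardIndex,
    req: GetPageRequest,
) -> anyhow::Result<()> {
    // One ClientPool per tenant shard; it creates and owns the ChannelPool.
    let clients = Arc::new(ClientPool::new(
        "grpc://pageserver:51051".to_string(),
        tenant_id,
        timeline_id,
        shard_id,
        None, // auth token
    )?);

    // GetPage requests go through the StreamPool, which multiplexes them over
    // shared bidirectional streams and routes each response back to its caller
    // via a oneshot channel keyed by request_id.
    let streams = StreamPool::new(clients.clone());
    let _resp = streams.send(req).await?;

    // Unary RPCs check a client out of the pool directly; dropping the guard
    // returns the client (and its channel slot) to the pool.
    let _client = clients.clone().get().await?;

    Ok(())
}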
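The queue-depth reservation in send() deserves a closer look: fetch_update runs a compare-and-swap loop, so concurrent callers can each atomically claim a slot only while the depth is below STREAM_QUEUE_DEPTH, and can never push it past the cap. A minimal standalone illustration of the same pattern (the names here are illustrative, not from the diff):

// Demonstrates the CAS-based slot reservation used by StreamPool::send.
use std::sync::atomic::{AtomicUsize, Ordering};

/// Atomically reserves one slot if `depth` is below `cap`, mirroring the
/// fetch_update closure in the diff above.
fn try_reserve(depth: &AtomicUsize, cap: usize) -> bool {
    depth
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |d| {
            (d < cap).then_some(d + 1)
        })
        .is_ok()
}

fn main() {
    let depth = AtomicUsize::new(0);
    assert!(try_reserve(&depth, 2)); // 0 -> 1
    assert!(try_reserve(&depth, 2)); // 1 -> 2
    assert!(!try_reserve(&depth, 2)); // at capacity: closure returns None, no change
    depth.fetch_sub(1, Ordering::SeqCst); // release a slot, as the defer! does
    assert!(try_reserve(&depth, 2)); // 1 -> 2 again
}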