Misc pool improvements

This commit is contained in:
Erik Grinaker
2025-07-02 14:41:47 +02:00
parent 7012b4aa90
commit 1ec63bd6bc
2 changed files with 198 additions and 158 deletions

View File

@@ -8,7 +8,7 @@ use utils::backoff;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamPool};
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
@@ -93,10 +93,10 @@ impl PageserverClient {
) -> tonic::Result<page_api::GetPageResponse> {
// TODO: support multiple shards.
let shard_id = ShardIndex::unsharded();
let streams = self.get_shard_streams(shard_id)?;
self.with_retries("get_page", async || {
let resp = streams.send(req.clone()).await?;
let stream = self.get_shard_stream(shard_id).await?;
let resp = stream.send(req.clone()).await?;
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
@@ -151,14 +151,16 @@ impl PageserverClient {
.map_err(|err| tonic::Status::internal(format!("failed to acquire client: {err}")))
}
/// Returns the stream pool for the given shard.
/// Returns a pooled stream for the given shard.
#[allow(clippy::result_large_err)] // TODO: revisit
fn get_shard_streams(&self, shard_id: ShardIndex) -> tonic::Result<&Arc<StreamPool>> {
Ok(&self
async fn get_shard_stream(&self, shard_id: ShardIndex) -> tonic::Result<StreamGuard> {
Ok(self
.pools
.get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))?
.streams)
.streams
.get()
.await)
}
/// Returns the shard index for shard 0.
@@ -215,12 +217,10 @@ impl PageserverClient {
///
/// TODO: consider separate pools for normal and bulk traffic, with different settings.
struct ShardPools {
/// Manages gRPC channels (i.e. TCP connections) for this shard.
#[allow(unused)]
channels: Arc<ChannelPool>,
/// Manages gRPC clients for this shard, using `channels`.
/// Manages unary gRPC clients for this shard.
clients: Arc<ClientPool>,
/// Manages gRPC GetPage streams for this shard, using `clients`.
/// Manages gRPC GetPage streams for this shard. Uses a dedicated client pool, but shares the
/// channel pool with unary clients.
streams: Arc<StreamPool>,
}
@@ -233,20 +233,26 @@ impl ShardPools {
shard_id: ShardIndex,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
// Use a common channel pool for all clients, to multiplex unary and stream requests across
// the same TCP connections. The channel pool is unbounded (client pools are bounded).
let channels = ChannelPool::new(url)?;
// Dedicated client pool for unary requests.
let clients = ClientPool::new(
channels.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token,
auth_token.clone(),
);
let streams = StreamPool::new(clients.clone());
Ok(Self {
channels,
clients,
streams,
})
// Dedicated client pool for streams. If this shared a client pool with unary requests,
// long-lived streams could fill up the client pool and starve out unary requests. It
// shares the same underlying channel pool with unary clients though.
let stream_clients =
ClientPool::new(channels, tenant_id, timeline_id, shard_id, auth_token);
let streams = StreamPool::new(stream_clients);
Ok(Self { clients, streams })
}
}

View File

@@ -1,8 +1,8 @@
//! This module provides various Pageserver gRPC client resource pools.
//!
//! These pools are designed to reuse gRPC resources (connections, clients, and streams) across
//! multiple callers (i.e. Postgres backends). This avoids the resource cost and latency of creating
//! a dedicated TCP connection and server task for every Postgres backend.
//! multiple concurrent callers (i.e. Postgres backends). This avoids the resource cost and latency
//! of creating dedicated TCP connections and server tasks for every Postgres backend.
//!
//! Each resource has its own, nested pool. The pools are custom-built for the properties of each
//! resource -- they are different enough that a generic pool isn't suitable.
@@ -18,14 +18,17 @@
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from
//! the ClientPool for the stream's lifetime. Internal streams are not exposed to callers;
//! instead, callers submit individual GetPage requests to the pool and await a response.
//! Internally, the pool will reuse or spin up a suitable stream for the request, possibly
//! pipelining multiple requests from multiple callers on the same stream (up to some queue
//! depth), and route the response back to the original caller. Idle streams may be removed from
//! the pool after some time, to free up the client.
//!
//! instead, it returns a guard can be used to send a single request, to properly enforce queue
//! depth and route responses. Internally, the pool will reuse or spin up a suitable stream for
//! the request, possibly pipelining multiple requests from multiple callers on the same stream
//! (up to some queue depth). Idle streams may be removed from the pool after some time, to free
//! up the client.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
//!
//! TODO: error handling (including custom error types).
//! TODO: observability.
use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
@@ -33,17 +36,16 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use futures::StreamExt as _;
use scopeguard::defer;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
use tonic::transport::{Channel, Endpoint};
use tracing::warn;
use tracing::{error, warn};
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
// TODO: tune these constants, and consider making them configurable.
// TODO: tune these constants, and make them configurable.
/// Max number of concurrent clients per channel.
///
@@ -51,8 +53,7 @@ use utils::shard::ShardIndex;
/// with only streams.
const CLIENTS_PER_CHANNEL: usize = 16;
/// Maximum number of concurrent clients per `ClientPool`. This bounds the number of channels as
/// CLIENT_LIMIT / CLIENTS_PER_CHANNEL.
/// Maximum number of concurrent clients per `ClientPool`.
const CLIENT_LIMIT: usize = 64;
/// Max number of pipelined requests per gRPC GetPage stream.
@@ -62,18 +63,18 @@ const STREAM_QUEUE_DEPTH: usize = 2;
/// stream multiplexing), up to `CLIENTS_PER_CHANNEL`. The pool does not limit the number of
/// channels, and instead relies on `ClientPool` to limit the number of concurrent clients.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived references from guards.
///
/// Tonic will automatically retry the underlying connection if it fails, so there is no need
/// to re-establish connections on errors.
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
/// TODO: reap idle channels.
/// TODO: consider prewarming a set of channels, to avoid initial connection latency.
/// TODO: consider adding a circuit breaker for errors and fail fast.
pub struct ChannelPool {
/// Pageserver endpoint to connect to.
endpoint: Endpoint,
/// Open channels.
channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
/// Channel ID generator.
next_channel_id: AtomicUsize,
}
type ChannelID = usize;
@@ -94,14 +95,16 @@ impl ChannelPool {
{
Ok(Arc::new(Self {
endpoint: endpoint.try_into()?,
channels: Default::default(),
channels: Mutex::default(),
next_channel_id: AtomicUsize::default(),
}))
}
/// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
///
/// This never blocks (except for sync mutex acquisition). The channel is connected lazily on
/// first use, and the `ChannelPool` does not have a channel limit.
/// This never blocks (except for mutex acquisition). The channel is connected lazily on first
/// use, and the `ChannelPool` does not have a channel limit. Channels will be re-established
/// automatically on failure (TODO: verify).
///
/// Callers should not clone the returned channel, and must hold onto the returned guard as long
/// as the channel is in use. It is unfortunately not possible to enforce this: the Protobuf
@@ -115,9 +118,9 @@ impl ChannelPool {
let mut channels = self.channels.lock().unwrap();
// Try to find an existing channel with available capacity. We check entries in BTreeMap
// order, to fill up the lower-ordered channels first. The ClientPool also uses clients with
// lower-ordered channel IDs first. This will cluster clients in lower-ordered channels, and
// free up higher-ordered channels such that they can be reaped.
// order, to fill up the lower-ordered channels first. The ClientPool also prefers clients
// with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// channels, and free up higher-ordered channels such that they can be reaped.
for (&id, entry) in channels.iter_mut() {
assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow");
if entry.clients < CLIENTS_PER_CHANNEL {
@@ -130,11 +133,11 @@ impl ChannelPool {
}
}
// Create a new channel. We connect lazily on the first use, such that we don't block here
// and other clients can join onto the same channel while it's connecting.
// Create a new channel. We connect lazily on first use, such that we don't block here and
// other clients can join onto the same channel while it's connecting.
let channel = self.endpoint.connect_lazy();
let id = channels.keys().last().copied().unwrap_or_default();
let id = self.next_channel_id.fetch_add(1, Ordering::Relaxed);
let entry = ChannelEntry {
channel: channel.clone(),
clients: 1, // we're returning the guard below
@@ -144,14 +147,13 @@ impl ChannelPool {
ChannelGuard {
pool: Arc::downgrade(self),
id,
channel: Some(channel.clone()),
channel: Some(channel),
}
}
}
/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`.
/// However, the caller must hold onto the guard as long as it's using the channel, and should not
/// clone it.
/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
pool: Weak<ChannelPool>,
id: ChannelID,
@@ -159,8 +161,8 @@ pub struct ChannelGuard {
}
impl ChannelGuard {
/// Returns the inner channel. Panics if called more than once. The caller must hold onto the
/// guard as long as the channel is in use, and should not clone it.
/// Returns the inner owned channel. Panics if called more than once. The caller must hold onto
/// the guard as long as the channel is in use, and should not clone it.
pub fn take(&mut self) -> Channel {
self.channel.take().expect("channel already taken")
}
@@ -180,14 +182,12 @@ impl Drop for ChannelGuard {
}
/// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner
/// `ChannelPool`. A client is only acquired by a single caller at a time. The pool limits the total
/// `ChannelPool`. A client is only given out to single caller at a time. The pool limits the total
/// number of concurrent clients to `CLIENT_LIMIT` via semaphore.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived references from guards.
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
/// TODO: reap idle clients.
/// TODO: error handling (but channel will be reconnected automatically).
/// TODO: rate limiting.
pub struct ClientPool {
/// Tenant ID.
tenant_id: TenantId,
@@ -204,7 +204,7 @@ pub struct ClientPool {
/// Idle pooled clients. Acquired clients are removed from here and returned on drop.
///
/// The first client in the map will be acquired next. The map is sorted by client ID, which in
/// turn is sorted by the channel ID, such that we prefer acquiring idle clients from
/// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
/// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle
/// clients are reaped.
idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
@@ -241,12 +241,12 @@ impl ClientPool {
})
}
/// Gets a client from the pool, or creates a new one if necessary. Blocks if the pool is at
/// `CLIENT_LIMIT`, but connection happens lazily (if needed). The client is returned to the
/// pool when the guard is dropped.
/// Gets a client from the pool, or creates a new one if necessary. Connections are established
/// lazily and does not block, but this call can block if the pool is at `CLIENT_LIMIT`. The
/// client is returned to the pool when the guard is dropped.
///
/// This is moderately performance-sensitive. It is called for every unary request, but recall
/// that these establish a new gRPC stream per request so it's already expensive. GetPage
/// that these establish a new gRPC stream per request so they're already expensive. GetPage
/// requests use the `StreamPool` instead.
pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
let permit = self
@@ -291,7 +291,7 @@ impl ClientPool {
}
}
/// A client acquired from the pool. The inner client can be accessed via derefs. The client is
/// A client acquired from the pool. The inner client can be accessed via Deref. The client is
/// returned to the pool when dropped.
pub struct ClientGuard {
pool: Weak<ClientPool>,
@@ -327,32 +327,30 @@ impl Drop for ClientGuard {
};
pool.idle.lock().unwrap().insert(self.id, entry);
// The permit will be returned by its drop handler. Tag it here for visibility.
_ = self.permit;
_ = self.permit; // returned on drop, referenced for visibility
}
}
/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
/// acquires a client from the inner `ClientPool` for the stream's lifetime.
///
/// Individual streams are not exposed to callers -- instead, callers submit invididual requests to
/// the pool and await a response. Internally, requests are multiplexed across streams and channels.
/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send
/// a single request and await the response. Internally, requests are multiplexed across streams and
/// channels. This allows proper queue depth enforcement and response routing.
///
/// TODO: reap idle streams.
/// TODO: error handling (but channel will be reconnected automatically).
/// TODO: rate limiting.
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
/// The client pool to acquire clients from.
client_pool: Arc<ClientPool>,
/// All pooled streams.
///
/// Incoming requests will be sent over an existing stream with available capacity, or a new
/// stream is spun up and added to the pool. Each stream has an associated Tokio task that
/// processes requests and responses.
/// Incoming requests will be sent over an existing stream with available capacity. If all
/// streams are full, a new one is spun up and added to the pool (up to the `ClientPool` limit).
/// Each stream has an associated Tokio task that processes requests and responses.
streams: Arc<Mutex<HashMap<StreamID, StreamEntry>>>,
/// Limits the max number of concurrent requests (not streams).
limiter: Semaphore,
limiter: Arc<Semaphore>,
/// Stream ID generator.
next_stream_id: AtomicUsize,
}
@@ -372,21 +370,23 @@ struct StreamEntry {
impl StreamPool {
/// Creates a new stream pool, using the given client pool.
///
/// NB: the stream pool should use a dedicated client pool. Otherwise, long-lived streams may
/// fill up the client pool and starve out unary requests. Client pools can share the same
/// `ChannelPool` though, since the channel pool is unbounded.
pub fn new(client_pool: Arc<ClientPool>) -> Arc<Self> {
Arc::new(Self {
client_pool,
streams: Arc::default(),
limiter: Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH),
limiter: Arc::new(Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH)),
next_stream_id: AtomicUsize::default(),
})
}
/// Sends a request via the stream pool and awaits the response. Blocks if the pool is at
/// capacity (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight). The
/// `GetPageRequest::request_id` must be unique across in-flight request.
///
/// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
/// to avoid tearing down the stream for per-request errors. Callers must check this.
/// Acquires an available stream from the pool, or spins up a new stream async if all streams
/// are full. Returns a guard that can be used to send a single request on the stream and await
/// the response, with queue depth quota already acquired. Blocks if the pool is at capacity
/// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight).
///
/// This is very performance-sensitive, as it is on the GetPage hot path.
///
@@ -400,95 +400,84 @@ impl StreamPool {
/// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
///
/// For now, we just do something simple and functional, but very inefficient (linear scan).
pub async fn send(
&self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
// Acquire a permit. For simplicity, we drop it when this method returns. This may exceed
// the queue depth if a caller goes away while a request is in flight, but that's okay. We
// do the same for queue depth tracking.
let _permit = self.limiter.acquire().await.expect("never closed");
pub async fn get(&self) -> StreamGuard {
let permit = self
.limiter
.clone()
.acquire_owned()
.await
.expect("never closed");
let mut streams = self.streams.lock().unwrap();
// Acquire a stream sender. We increment and decrement the queue depth here while acquiring
// a stream, instead of in the stream task, to ensure we don't acquire a full stream.
#[allow(clippy::await_holding_lock)] // TODO: Clippy doesn't understand drop()
let (req_tx, queue_depth) = async {
let mut streams = self.streams.lock().unwrap();
// Try to find an existing stream with available capacity.
for entry in streams.values() {
assert!(
entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH,
"stream overflow"
);
if entry
.queue_depth
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| {
// Increment the queue depth via compare-and-swap.
// TODO: review ordering.
(queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1)
})
.is_ok()
{
return anyhow::Ok((entry.sender.clone(), entry.queue_depth.clone()));
}
// Look for a pooled stream with available capacity.
for entry in streams.values() {
assert!(
entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH,
"stream queue overflow"
);
if entry
.queue_depth
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| {
// Increment the queue depth via compare-and-swap.
// TODO: review ordering.
(queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1)
})
.is_ok()
{
return StreamGuard {
sender: entry.sender.clone(),
queue_depth: entry.queue_depth.clone(),
permit,
};
}
// No available stream, spin up a new one. We install the stream entry first and release
// the lock, to allow other callers to join onto this stream and also create additional
// streams concurrently when this fills up.
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
let queue_depth = Arc::new(AtomicUsize::new(1)); // account for this request
let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH);
let entry = StreamEntry {
sender: req_tx.clone(),
queue_depth: queue_depth.clone(),
};
streams.insert(id, entry);
drop(streams); // drop lock before spinning up stream
let client_pool = self.client_pool.clone();
let streams = self.streams.clone();
tokio::spawn(async move {
if let Err(err) = Self::run_stream(client_pool, req_rx).await {
warn!("stream failed: {err}");
}
// Remove stream from pool on exit.
let entry = streams.lock().unwrap().remove(&id);
assert!(entry.is_some(), "unknown stream ID: {id}");
});
anyhow::Ok((req_tx, queue_depth))
}
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
// Decrement the queue depth on return. This may prematurely decrement it if the caller goes
// away while the request is in flight, but that's okay.
defer!(
let prev_queue_depth = queue_depth.fetch_sub(1, Ordering::SeqCst);
assert!(prev_queue_depth > 0, "stream underflow");
);
// No available stream, spin up a new one. We install the stream entry in the pool first and
// return the guard, while spinning up the stream task async. This allows other callers to
// join onto this stream and also create additional streams concurrently if this fills up.
//
// NB: we have to be careful not to overshoot here. The semaphore limit is CLIENT_LIMIT *
// STREAM_QUEUE_DEPTH, but if we were to miss a concurrent queue depth allocation we'd try
// to spin up more streams than CLIENT_LIMIT and block on the client pool ~forever. Because
// we only acquire queue depth under lock.
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller
let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH);
let entry = StreamEntry {
sender: req_tx.clone(),
queue_depth: queue_depth.clone(),
};
streams.insert(id, entry);
// Send the request and wait for the response.
let (resp_tx, resp_rx) = oneshot::channel();
// NB: make sure we don't overshoot the client limit. The semaphore limit is CLIENT_LIMIT *
// STREAM_QUEUE_DEPTH, but if we were to misaccount queue depth we'd try to spin up more
// streams than CLIENT_LIMIT and block on the client pool ~forever. This should not be
// possible because we only acquire queue depth under lock.
assert!(streams.len() <= CLIENT_LIMIT, "stream overflow");
req_tx
.send((req, resp_tx))
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?;
let client_pool = self.client_pool.clone();
let streams = self.streams.clone();
resp_rx
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?
tokio::spawn(async move {
if let Err(err) = Self::run_stream(client_pool, req_rx).await {
error!("stream failed: {err}");
}
// Remove stream from pool on exit.
let entry = streams.lock().unwrap().remove(&id);
assert!(entry.is_some(), "unknown stream ID: {id}");
});
StreamGuard {
sender: req_tx,
queue_depth,
permit,
}
}
/// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
/// bidirectional GetPage stream, then forwards requests and responses between callers and the
/// stream. It does not track or enforce queue depths -- that's done by `send()` since it must
/// be atomic with pool stream acquisition.
/// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be
/// atomic with pool stream acquisition.
///
/// The task exits when the request channel is closed, or on a stream error. The caller is
/// responsible for removing the stream from the pool on exit.
@@ -504,13 +493,13 @@ impl StreamPool {
let mut resp_stream = client.get_pages(req_stream).await?;
// Track caller response channels by request ID. If the task returns early, these response
// channels will be dropped and the callers will receive an error.
// channels will be dropped and the waiting callers will receive an error.
let mut callers = HashMap::with_capacity(STREAM_QUEUE_DEPTH);
// Process requests and responses.
loop {
// NB: this can trip if the server doesn't respond to a request, so only debug_assert.
debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream overflow");
debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream queue overflow");
tokio::select! {
// Receive requests from callers and send them to the stream.
@@ -554,3 +543,48 @@ impl StreamPool {
}
}
}
/// A pooled stream reference. Can be used to send a single request, to properly enforce queue
/// depth. Queue depth is already reserved and will be returned on drop.
pub struct StreamGuard {
sender: RequestSender,
queue_depth: Arc<AtomicUsize>,
permit: OwnedSemaphorePermit,
}
impl StreamGuard {
/// Sends a request on the stream and awaits the response. Consumes the guard, since it's only
/// valid for a single request (to enforce queue depth). This also drops the guard on return and
/// returns the queue depth quota to the pool.
///
/// The `GetPageRequest::request_id` must be unique across in-flight request.
///
/// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
/// to avoid tearing down the stream for per-request errors. Callers must check this.
pub async fn send(
self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
let (resp_tx, resp_rx) = oneshot::channel();
self.sender
.send((req, resp_tx))
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?;
resp_rx
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?
}
}
impl Drop for StreamGuard {
fn drop(&mut self) {
// Release the queue depth reservation on drop. This can prematurely decrement it if dropped
// before the response is received, but that's okay.
let prev_queue_depth = self.queue_depth.fetch_sub(1, Ordering::SeqCst);
assert!(prev_queue_depth > 0, "stream queue underflow");
_ = self.permit; // returned on drop, referenced for visibility
}
}