From 1ec63bd6bc27d984a698390bd593305dcab8d625 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 2 Jul 2025 14:41:47 +0200 Subject: [PATCH] Misc pool improvements --- pageserver/client_grpc/src/client.rs | 44 ++-- pageserver/client_grpc/src/pool.rs | 312 +++++++++++++++------------ 2 files changed, 198 insertions(+), 158 deletions(-) diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 41ee43a732..abf3fe6b13 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -8,7 +8,7 @@ use utils::backoff; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamPool}; +use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool}; /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the /// basic `page_api::Client` gRPC client, and supports: @@ -93,10 +93,10 @@ impl PageserverClient { ) -> tonic::Result { // TODO: support multiple shards. let shard_id = ShardIndex::unsharded(); - let streams = self.get_shard_streams(shard_id)?; self.with_retries("get_page", async || { - let resp = streams.send(req.clone()).await?; + let stream = self.get_shard_stream(shard_id).await?; + let resp = stream.send(req.clone()).await?; if resp.status_code != page_api::GetPageStatusCode::Ok { return Err(tonic::Status::new( @@ -151,14 +151,16 @@ impl PageserverClient { .map_err(|err| tonic::Status::internal(format!("failed to acquire client: {err}"))) } - /// Returns the stream pool for the given shard. + /// Returns a pooled stream for the given shard. #[allow(clippy::result_large_err)] // TODO: revisit - fn get_shard_streams(&self, shard_id: ShardIndex) -> tonic::Result<&Arc> { - Ok(&self + async fn get_shard_stream(&self, shard_id: ShardIndex) -> tonic::Result { + Ok(self .pools .get(&shard_id) .ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))? - .streams) + .streams + .get() + .await) } /// Returns the shard index for shard 0. @@ -215,12 +217,10 @@ impl PageserverClient { /// /// TODO: consider separate pools for normal and bulk traffic, with different settings. struct ShardPools { - /// Manages gRPC channels (i.e. TCP connections) for this shard. - #[allow(unused)] - channels: Arc, - /// Manages gRPC clients for this shard, using `channels`. + /// Manages unary gRPC clients for this shard. clients: Arc, - /// Manages gRPC GetPage streams for this shard, using `clients`. + /// Manages gRPC GetPage streams for this shard. Uses a dedicated client pool, but shares the + /// channel pool with unary clients. streams: Arc, } @@ -233,20 +233,26 @@ impl ShardPools { shard_id: ShardIndex, auth_token: Option, ) -> anyhow::Result { + // Use a common channel pool for all clients, to multiplex unary and stream requests across + // the same TCP connections. The channel pool is unbounded (client pools are bounded). let channels = ChannelPool::new(url)?; + + // Dedicated client pool for unary requests. let clients = ClientPool::new( channels.clone(), tenant_id, timeline_id, shard_id, - auth_token, + auth_token.clone(), ); - let streams = StreamPool::new(clients.clone()); - Ok(Self { - channels, - clients, - streams, - }) + // Dedicated client pool for streams. If this shared a client pool with unary requests, + // long-lived streams could fill up the client pool and starve out unary requests. It + // shares the same underlying channel pool with unary clients though. + let stream_clients = + ClientPool::new(channels, tenant_id, timeline_id, shard_id, auth_token); + let streams = StreamPool::new(stream_clients); + + Ok(Self { clients, streams }) } } diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index cbcf26656e..ac7fe35dc7 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -1,8 +1,8 @@ //! This module provides various Pageserver gRPC client resource pools. //! //! These pools are designed to reuse gRPC resources (connections, clients, and streams) across -//! multiple callers (i.e. Postgres backends). This avoids the resource cost and latency of creating -//! a dedicated TCP connection and server task for every Postgres backend. +//! multiple concurrent callers (i.e. Postgres backends). This avoids the resource cost and latency +//! of creating dedicated TCP connections and server tasks for every Postgres backend. //! //! Each resource has its own, nested pool. The pools are custom-built for the properties of each //! resource -- they are different enough that a generic pool isn't suitable. @@ -18,14 +18,17 @@ //! //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from //! the ClientPool for the stream's lifetime. Internal streams are not exposed to callers; -//! instead, callers submit individual GetPage requests to the pool and await a response. -//! Internally, the pool will reuse or spin up a suitable stream for the request, possibly -//! pipelining multiple requests from multiple callers on the same stream (up to some queue -//! depth), and route the response back to the original caller. Idle streams may be removed from -//! the pool after some time, to free up the client. -//! +//! instead, it returns a guard can be used to send a single request, to properly enforce queue +//! depth and route responses. Internally, the pool will reuse or spin up a suitable stream for +//! the request, possibly pipelining multiple requests from multiple callers on the same stream +//! (up to some queue depth). Idle streams may be removed from the pool after some time, to free +//! up the client. +//! //! Each channel corresponds to one TCP connection. Each client unary request and each stream //! corresponds to one HTTP/2 stream and server task. +//! +//! TODO: error handling (including custom error types). +//! TODO: observability. use std::collections::{BTreeMap, HashMap}; use std::ops::{Deref, DerefMut}; @@ -33,17 +36,16 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; use futures::StreamExt as _; -use scopeguard::defer; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; use tonic::transport::{Channel, Endpoint}; -use tracing::warn; +use tracing::{error, warn}; use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -// TODO: tune these constants, and consider making them configurable. +// TODO: tune these constants, and make them configurable. /// Max number of concurrent clients per channel. /// @@ -51,8 +53,7 @@ use utils::shard::ShardIndex; /// with only streams. const CLIENTS_PER_CHANNEL: usize = 16; -/// Maximum number of concurrent clients per `ClientPool`. This bounds the number of channels as -/// CLIENT_LIMIT / CLIENTS_PER_CHANNEL. +/// Maximum number of concurrent clients per `ClientPool`. const CLIENT_LIMIT: usize = 64; /// Max number of pipelined requests per gRPC GetPage stream. @@ -62,18 +63,18 @@ const STREAM_QUEUE_DEPTH: usize = 2; /// stream multiplexing), up to `CLIENTS_PER_CHANNEL`. The pool does not limit the number of /// channels, and instead relies on `ClientPool` to limit the number of concurrent clients. /// -/// The pool is always wrapped in an outer `Arc`, to allow long-lived references from guards. -/// -/// Tonic will automatically retry the underlying connection if it fails, so there is no need -/// to re-establish connections on errors. +/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// /// TODO: reap idle channels. +/// TODO: consider prewarming a set of channels, to avoid initial connection latency. /// TODO: consider adding a circuit breaker for errors and fail fast. pub struct ChannelPool { /// Pageserver endpoint to connect to. endpoint: Endpoint, /// Open channels. channels: Mutex>, + /// Channel ID generator. + next_channel_id: AtomicUsize, } type ChannelID = usize; @@ -94,14 +95,16 @@ impl ChannelPool { { Ok(Arc::new(Self { endpoint: endpoint.try_into()?, - channels: Default::default(), + channels: Mutex::default(), + next_channel_id: AtomicUsize::default(), })) } /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel. /// - /// This never blocks (except for sync mutex acquisition). The channel is connected lazily on - /// first use, and the `ChannelPool` does not have a channel limit. + /// This never blocks (except for mutex acquisition). The channel is connected lazily on first + /// use, and the `ChannelPool` does not have a channel limit. Channels will be re-established + /// automatically on failure (TODO: verify). /// /// Callers should not clone the returned channel, and must hold onto the returned guard as long /// as the channel is in use. It is unfortunately not possible to enforce this: the Protobuf @@ -115,9 +118,9 @@ impl ChannelPool { let mut channels = self.channels.lock().unwrap(); // Try to find an existing channel with available capacity. We check entries in BTreeMap - // order, to fill up the lower-ordered channels first. The ClientPool also uses clients with - // lower-ordered channel IDs first. This will cluster clients in lower-ordered channels, and - // free up higher-ordered channels such that they can be reaped. + // order, to fill up the lower-ordered channels first. The ClientPool also prefers clients + // with lower-ordered channel IDs first. This will cluster clients in lower-ordered + // channels, and free up higher-ordered channels such that they can be reaped. for (&id, entry) in channels.iter_mut() { assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow"); if entry.clients < CLIENTS_PER_CHANNEL { @@ -130,11 +133,11 @@ impl ChannelPool { } } - // Create a new channel. We connect lazily on the first use, such that we don't block here - // and other clients can join onto the same channel while it's connecting. + // Create a new channel. We connect lazily on first use, such that we don't block here and + // other clients can join onto the same channel while it's connecting. let channel = self.endpoint.connect_lazy(); - let id = channels.keys().last().copied().unwrap_or_default(); + let id = self.next_channel_id.fetch_add(1, Ordering::Relaxed); let entry = ChannelEntry { channel: channel.clone(), clients: 1, // we're returning the guard below @@ -144,14 +147,13 @@ impl ChannelPool { ChannelGuard { pool: Arc::downgrade(self), id, - channel: Some(channel.clone()), + channel: Some(channel), } } } -/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`. -/// However, the caller must hold onto the guard as long as it's using the channel, and should not -/// clone it. +/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`, +/// since the gRPC client requires an owned `Channel`. pub struct ChannelGuard { pool: Weak, id: ChannelID, @@ -159,8 +161,8 @@ pub struct ChannelGuard { } impl ChannelGuard { - /// Returns the inner channel. Panics if called more than once. The caller must hold onto the - /// guard as long as the channel is in use, and should not clone it. + /// Returns the inner owned channel. Panics if called more than once. The caller must hold onto + /// the guard as long as the channel is in use, and should not clone it. pub fn take(&mut self) -> Channel { self.channel.take().expect("channel already taken") } @@ -180,14 +182,12 @@ impl Drop for ChannelGuard { } /// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner -/// `ChannelPool`. A client is only acquired by a single caller at a time. The pool limits the total +/// `ChannelPool`. A client is only given out to single caller at a time. The pool limits the total /// number of concurrent clients to `CLIENT_LIMIT` via semaphore. /// -/// The pool is always wrapped in an outer `Arc`, to allow long-lived references from guards. +/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// /// TODO: reap idle clients. -/// TODO: error handling (but channel will be reconnected automatically). -/// TODO: rate limiting. pub struct ClientPool { /// Tenant ID. tenant_id: TenantId, @@ -204,7 +204,7 @@ pub struct ClientPool { /// Idle pooled clients. Acquired clients are removed from here and returned on drop. /// /// The first client in the map will be acquired next. The map is sorted by client ID, which in - /// turn is sorted by the channel ID, such that we prefer acquiring idle clients from + /// turn is sorted by its channel ID, such that we prefer acquiring idle clients from /// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle /// clients are reaped. idle: Mutex>, @@ -241,12 +241,12 @@ impl ClientPool { }) } - /// Gets a client from the pool, or creates a new one if necessary. Blocks if the pool is at - /// `CLIENT_LIMIT`, but connection happens lazily (if needed). The client is returned to the - /// pool when the guard is dropped. + /// Gets a client from the pool, or creates a new one if necessary. Connections are established + /// lazily and does not block, but this call can block if the pool is at `CLIENT_LIMIT`. The + /// client is returned to the pool when the guard is dropped. /// /// This is moderately performance-sensitive. It is called for every unary request, but recall - /// that these establish a new gRPC stream per request so it's already expensive. GetPage + /// that these establish a new gRPC stream per request so they're already expensive. GetPage /// requests use the `StreamPool` instead. pub async fn get(self: &Arc) -> anyhow::Result { let permit = self @@ -291,7 +291,7 @@ impl ClientPool { } } -/// A client acquired from the pool. The inner client can be accessed via derefs. The client is +/// A client acquired from the pool. The inner client can be accessed via Deref. The client is /// returned to the pool when dropped. pub struct ClientGuard { pool: Weak, @@ -327,32 +327,30 @@ impl Drop for ClientGuard { }; pool.idle.lock().unwrap().insert(self.id, entry); - // The permit will be returned by its drop handler. Tag it here for visibility. - _ = self.permit; + _ = self.permit; // returned on drop, referenced for visibility } } /// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream /// acquires a client from the inner `ClientPool` for the stream's lifetime. /// -/// Individual streams are not exposed to callers -- instead, callers submit invididual requests to -/// the pool and await a response. Internally, requests are multiplexed across streams and channels. +/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send +/// a single request and await the response. Internally, requests are multiplexed across streams and +/// channels. This allows proper queue depth enforcement and response routing. /// /// TODO: reap idle streams. -/// TODO: error handling (but channel will be reconnected automatically). -/// TODO: rate limiting. /// TODO: consider making this generic over request and response types; not currently needed. pub struct StreamPool { /// The client pool to acquire clients from. client_pool: Arc, /// All pooled streams. /// - /// Incoming requests will be sent over an existing stream with available capacity, or a new - /// stream is spun up and added to the pool. Each stream has an associated Tokio task that - /// processes requests and responses. + /// Incoming requests will be sent over an existing stream with available capacity. If all + /// streams are full, a new one is spun up and added to the pool (up to the `ClientPool` limit). + /// Each stream has an associated Tokio task that processes requests and responses. streams: Arc>>, /// Limits the max number of concurrent requests (not streams). - limiter: Semaphore, + limiter: Arc, /// Stream ID generator. next_stream_id: AtomicUsize, } @@ -372,21 +370,23 @@ struct StreamEntry { impl StreamPool { /// Creates a new stream pool, using the given client pool. + /// + /// NB: the stream pool should use a dedicated client pool. Otherwise, long-lived streams may + /// fill up the client pool and starve out unary requests. Client pools can share the same + /// `ChannelPool` though, since the channel pool is unbounded. pub fn new(client_pool: Arc) -> Arc { Arc::new(Self { client_pool, streams: Arc::default(), - limiter: Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH), + limiter: Arc::new(Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH)), next_stream_id: AtomicUsize::default(), }) } - /// Sends a request via the stream pool and awaits the response. Blocks if the pool is at - /// capacity (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight). The - /// `GetPageRequest::request_id` must be unique across in-flight request. - /// - /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status` - /// to avoid tearing down the stream for per-request errors. Callers must check this. + /// Acquires an available stream from the pool, or spins up a new stream async if all streams + /// are full. Returns a guard that can be used to send a single request on the stream and await + /// the response, with queue depth quota already acquired. Blocks if the pool is at capacity + /// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight). /// /// This is very performance-sensitive, as it is on the GetPage hot path. /// @@ -400,95 +400,84 @@ impl StreamPool { /// * Allow spinning up multiple streams concurrently, but don't overshoot limits. /// /// For now, we just do something simple and functional, but very inefficient (linear scan). - pub async fn send( - &self, - req: page_api::GetPageRequest, - ) -> tonic::Result { - // Acquire a permit. For simplicity, we drop it when this method returns. This may exceed - // the queue depth if a caller goes away while a request is in flight, but that's okay. We - // do the same for queue depth tracking. - let _permit = self.limiter.acquire().await.expect("never closed"); + pub async fn get(&self) -> StreamGuard { + let permit = self + .limiter + .clone() + .acquire_owned() + .await + .expect("never closed"); + let mut streams = self.streams.lock().unwrap(); - // Acquire a stream sender. We increment and decrement the queue depth here while acquiring - // a stream, instead of in the stream task, to ensure we don't acquire a full stream. - #[allow(clippy::await_holding_lock)] // TODO: Clippy doesn't understand drop() - let (req_tx, queue_depth) = async { - let mut streams = self.streams.lock().unwrap(); - - // Try to find an existing stream with available capacity. - for entry in streams.values() { - assert!( - entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH, - "stream overflow" - ); - if entry - .queue_depth - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| { - // Increment the queue depth via compare-and-swap. - // TODO: review ordering. - (queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1) - }) - .is_ok() - { - return anyhow::Ok((entry.sender.clone(), entry.queue_depth.clone())); - } + // Look for a pooled stream with available capacity. + for entry in streams.values() { + assert!( + entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH, + "stream queue overflow" + ); + if entry + .queue_depth + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| { + // Increment the queue depth via compare-and-swap. + // TODO: review ordering. + (queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1) + }) + .is_ok() + { + return StreamGuard { + sender: entry.sender.clone(), + queue_depth: entry.queue_depth.clone(), + permit, + }; } - - // No available stream, spin up a new one. We install the stream entry first and release - // the lock, to allow other callers to join onto this stream and also create additional - // streams concurrently when this fills up. - let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); - let queue_depth = Arc::new(AtomicUsize::new(1)); // account for this request - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); - let entry = StreamEntry { - sender: req_tx.clone(), - queue_depth: queue_depth.clone(), - }; - streams.insert(id, entry); - - drop(streams); // drop lock before spinning up stream - - let client_pool = self.client_pool.clone(); - let streams = self.streams.clone(); - - tokio::spawn(async move { - if let Err(err) = Self::run_stream(client_pool, req_rx).await { - warn!("stream failed: {err}"); - } - // Remove stream from pool on exit. - let entry = streams.lock().unwrap().remove(&id); - assert!(entry.is_some(), "unknown stream ID: {id}"); - }); - - anyhow::Ok((req_tx, queue_depth)) } - .await - .map_err(|err| tonic::Status::internal(err.to_string()))?; - // Decrement the queue depth on return. This may prematurely decrement it if the caller goes - // away while the request is in flight, but that's okay. - defer!( - let prev_queue_depth = queue_depth.fetch_sub(1, Ordering::SeqCst); - assert!(prev_queue_depth > 0, "stream underflow"); - ); + // No available stream, spin up a new one. We install the stream entry in the pool first and + // return the guard, while spinning up the stream task async. This allows other callers to + // join onto this stream and also create additional streams concurrently if this fills up. + // + // NB: we have to be careful not to overshoot here. The semaphore limit is CLIENT_LIMIT * + // STREAM_QUEUE_DEPTH, but if we were to miss a concurrent queue depth allocation we'd try + // to spin up more streams than CLIENT_LIMIT and block on the client pool ~forever. Because + // we only acquire queue depth under lock. + let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); + let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller + let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); + let entry = StreamEntry { + sender: req_tx.clone(), + queue_depth: queue_depth.clone(), + }; + streams.insert(id, entry); - // Send the request and wait for the response. - let (resp_tx, resp_rx) = oneshot::channel(); + // NB: make sure we don't overshoot the client limit. The semaphore limit is CLIENT_LIMIT * + // STREAM_QUEUE_DEPTH, but if we were to misaccount queue depth we'd try to spin up more + // streams than CLIENT_LIMIT and block on the client pool ~forever. This should not be + // possible because we only acquire queue depth under lock. + assert!(streams.len() <= CLIENT_LIMIT, "stream overflow"); - req_tx - .send((req, resp_tx)) - .await - .map_err(|_| tonic::Status::unavailable("stream closed"))?; + let client_pool = self.client_pool.clone(); + let streams = self.streams.clone(); - resp_rx - .await - .map_err(|_| tonic::Status::unavailable("stream closed"))? + tokio::spawn(async move { + if let Err(err) = Self::run_stream(client_pool, req_rx).await { + error!("stream failed: {err}"); + } + // Remove stream from pool on exit. + let entry = streams.lock().unwrap().remove(&id); + assert!(entry.is_some(), "unknown stream ID: {id}"); + }); + + StreamGuard { + sender: req_tx, + queue_depth, + permit, + } } /// Runs a stream task. This acquires a client from the `ClientPool` and establishes a /// bidirectional GetPage stream, then forwards requests and responses between callers and the - /// stream. It does not track or enforce queue depths -- that's done by `send()` since it must - /// be atomic with pool stream acquisition. + /// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be + /// atomic with pool stream acquisition. /// /// The task exits when the request channel is closed, or on a stream error. The caller is /// responsible for removing the stream from the pool on exit. @@ -504,13 +493,13 @@ impl StreamPool { let mut resp_stream = client.get_pages(req_stream).await?; // Track caller response channels by request ID. If the task returns early, these response - // channels will be dropped and the callers will receive an error. + // channels will be dropped and the waiting callers will receive an error. let mut callers = HashMap::with_capacity(STREAM_QUEUE_DEPTH); // Process requests and responses. loop { // NB: this can trip if the server doesn't respond to a request, so only debug_assert. - debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream overflow"); + debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream queue overflow"); tokio::select! { // Receive requests from callers and send them to the stream. @@ -554,3 +543,48 @@ impl StreamPool { } } } + +/// A pooled stream reference. Can be used to send a single request, to properly enforce queue +/// depth. Queue depth is already reserved and will be returned on drop. +pub struct StreamGuard { + sender: RequestSender, + queue_depth: Arc, + permit: OwnedSemaphorePermit, +} + +impl StreamGuard { + /// Sends a request on the stream and awaits the response. Consumes the guard, since it's only + /// valid for a single request (to enforce queue depth). This also drops the guard on return and + /// returns the queue depth quota to the pool. + /// + /// The `GetPageRequest::request_id` must be unique across in-flight request. + /// + /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status` + /// to avoid tearing down the stream for per-request errors. Callers must check this. + pub async fn send( + self, + req: page_api::GetPageRequest, + ) -> tonic::Result { + let (resp_tx, resp_rx) = oneshot::channel(); + + self.sender + .send((req, resp_tx)) + .await + .map_err(|_| tonic::Status::unavailable("stream closed"))?; + + resp_rx + .await + .map_err(|_| tonic::Status::unavailable("stream closed"))? + } +} + +impl Drop for StreamGuard { + fn drop(&mut self) { + // Release the queue depth reservation on drop. This can prematurely decrement it if dropped + // before the response is received, but that's okay. + let prev_queue_depth = self.queue_depth.fetch_sub(1, Ordering::SeqCst); + assert!(prev_queue_depth > 0, "stream queue underflow"); + + _ = self.permit; // returned on drop, referenced for visibility + } +}