From f18cc808f09adcc5fd570cdb2a5bddd2c77a0da9 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 14 Jul 2025 12:47:26 +0200 Subject: [PATCH] pageserver/client_grpc: reap idle channels immediately (#12587) ## Problem It can take 3x the idle timeout to reap a channel. We have to wait for the idle timeout to trigger first for the stream, then the client, then the channel. Touches #11735. ## Summary of changes Reap empty channels immediately, and rely indirectly on the channel/stream timeouts. This can still lead to 2x the idle timeout for streams (first stream then client), but that's okay -- if the stream closes abruptly (e.g. due to timeout or error) we want to keep the client around in the pool for a while. --- pageserver/client_grpc/src/pool.rs | 66 +++++++++--------------------- 1 file changed, 19 insertions(+), 47 deletions(-) diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 906872e091..4a29252cd9 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -9,19 +9,20 @@ //! //! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients //! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a -//! per-channel client limit. Channels may be closed when they are no longer used by any clients. +//! per-channel client limit. Channels are closed immediately when empty, and indirectly rely on +//! client/stream idle timeouts. //! //! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared) //! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a -//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed -//! from the pool after some time, to free up the channel. +//! single caller at a time, and is returned to the pool when dropped. Idle clients are removed +//! from the pool after a while to free up resources. //! //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the //! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it //! returns a guard that can be used to send a single request, to properly enforce queue depth and //! route responses. Internally, the pool will reuse or spin up a suitable stream for the request, //! possibly pipelining multiple requests from multiple callers on the same stream (up to some -//! queue depth). Idle streams may be removed from the pool after a while to free up the client. +//! queue depth). Idle streams are removed from the pool after a while to free up resources. //! //! Each channel corresponds to one TCP connection. Each client unary request and each stream //! corresponds to one HTTP/2 stream and server task. @@ -48,14 +49,12 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -/// Reap channels/clients/streams that have been idle for this long. +/// Reap clients/streams that have been idle for this long. Channels are reaped immediately when +/// empty, and indirectly rely on the client/stream idle timeouts. /// -/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to -/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle. -/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we -/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to -/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty -/// channels, and/or stream pool clients. +/// A stream's client will be reaped after 2x the idle threshold (first stream the client), but +/// that's okay -- if the stream closes abruptly (e.g. due to timeout or cancellation), we want to +/// keep its client around in the pool for a while. const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) { false => Duration::from_secs(180), true => Duration::from_secs(1), // exercise reaping in tests @@ -83,8 +82,6 @@ pub struct ChannelPool { max_clients_per_channel: NonZero, /// Open channels. channels: Mutex>, - /// Reaps idle channels. - idle_reaper: Reaper, /// Channel ID generator. next_channel_id: AtomicUsize, } @@ -96,9 +93,6 @@ struct ChannelEntry { channel: Channel, /// Number of clients using this channel. clients: usize, - /// The channel has been idle (no clients) since this time. None if channel is in use. - /// INVARIANT: Some if clients == 0, otherwise None. - idle_since: Option, } impl ChannelPool { @@ -108,15 +102,12 @@ impl ChannelPool { E: TryInto + Send + Sync + 'static, >::Error: std::error::Error + Send + Sync, { - let pool = Arc::new(Self { + Ok(Arc::new(Self { endpoint: endpoint.try_into()?, max_clients_per_channel, channels: Mutex::default(), - idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), next_channel_id: AtomicUsize::default(), - }); - pool.idle_reaper.spawn(&pool); - Ok(pool) + })) } /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel. @@ -137,22 +128,17 @@ impl ChannelPool { let mut channels = self.channels.lock().unwrap(); // Try to find an existing channel with available capacity. We check entries in BTreeMap - // order, to fill up the lower-ordered channels first. The ClientPool also prefers clients - // with lower-ordered channel IDs first. This will cluster clients in lower-ordered + // order, to fill up the lower-ordered channels first. The client/stream pools also prefer + // clients with lower-ordered channel IDs first. This will cluster clients in lower-ordered // channels, and free up higher-ordered channels such that they can be reaped. for (&id, entry) in channels.iter_mut() { assert!( entry.clients <= self.max_clients_per_channel.get(), "channel overflow" ); - assert_eq!( - entry.idle_since.is_some(), - entry.clients == 0, - "incorrect channel idle state" - ); + assert_ne!(entry.clients, 0, "empty channel not reaped"); if entry.clients < self.max_clients_per_channel.get() { entry.clients += 1; - entry.idle_since = None; return ChannelGuard { pool: Arc::downgrade(self), id, @@ -169,7 +155,6 @@ impl ChannelPool { let entry = ChannelEntry { channel: channel.clone(), clients: 1, // account for the guard below - idle_since: None, }; channels.insert(id, entry); @@ -181,20 +166,6 @@ impl ChannelPool { } } -impl Reapable for ChannelPool { - /// Reaps channels that have been idle since before the cutoff. - fn reap_idle(&self, cutoff: Instant) { - self.channels.lock().unwrap().retain(|_, entry| { - let Some(idle_since) = entry.idle_since else { - assert_ne!(entry.clients, 0, "empty channel not marked idle"); - return true; - }; - assert_eq!(entry.clients, 0, "idle channel has clients"); - idle_since >= cutoff - }) - } -} - /// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`, /// since the gRPC client requires an owned `Channel`. pub struct ChannelGuard { @@ -211,7 +182,7 @@ impl ChannelGuard { } } -/// Returns the channel to the pool. +/// Returns the channel to the pool. The channel is closed when empty. impl Drop for ChannelGuard { fn drop(&mut self) { let Some(pool) = self.pool.upgrade() else { @@ -220,11 +191,12 @@ impl Drop for ChannelGuard { let mut channels = pool.channels.lock().unwrap(); let entry = channels.get_mut(&self.id).expect("unknown channel"); - assert!(entry.idle_since.is_none(), "active channel marked idle"); assert!(entry.clients > 0, "channel underflow"); entry.clients -= 1; + + // Reap empty channels immediately. if entry.clients == 0 { - entry.idle_since = Some(Instant::now()); // mark channel as idle + channels.remove(&self.id); } } }