pageserver/client_grpc: reap idle channels immediately

This commit is contained in:
Erik Grinaker
2025-07-13 18:40:27 +02:00
parent 56eb511618
commit 87f01a25ab

View File

@@ -36,7 +36,8 @@
//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB
//! per stream (2.5 GB for 100,000 streams), so we can afford to scale out.
//!
//! Idle resources are removed from the pools periodically, to avoid holding onto server resources.
//! Idle clients/streams are removed from the pools periodically, to free up server resources.
//! Channels are reaped immediately when unused, and indirectly rely on client/stream idle timeouts.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
@@ -63,14 +64,12 @@ use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
/// Reap channels/clients/streams that have been idle for this long.
/// Reap clients/streams that have been idle for this long. Channels are reaped immediately when
/// empty, and indirectly rely on the client/stream idle timeouts.
///
/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to
/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle.
/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we
/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to
/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty
/// channels, and/or stream pool clients.
/// A stream's client will be reaped after 2x the idle threshold (first stream the client), but
/// that's okay -- if the stream closes abruptly (e.g. due to timeout or cancellation), we want to
/// keep its client around in the pool for a while.
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
false => Duration::from_secs(180),
true => Duration::from_secs(1), // exercise reaping in tests
@@ -98,8 +97,6 @@ pub struct ChannelPool {
max_clients_per_channel: NonZero<usize>,
/// Open channels.
channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
/// Reaps idle channels.
idle_reaper: Reaper,
/// Channel ID generator.
next_channel_id: AtomicUsize,
}
@@ -111,9 +108,6 @@ struct ChannelEntry {
channel: Channel,
/// Number of clients using this channel.
clients: usize,
/// The channel has been idle (no clients) since this time. None if channel is in use.
/// INVARIANT: Some if clients == 0, otherwise None.
idle_since: Option<Instant>,
}
impl ChannelPool {
@@ -123,15 +117,12 @@ impl ChannelPool {
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
let pool = Arc::new(Self {
Ok(Arc::new(Self {
endpoint: endpoint.try_into()?,
max_clients_per_channel,
channels: Mutex::default(),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_channel_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
Ok(pool)
}))
}
/// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
@@ -152,22 +143,17 @@ impl ChannelPool {
let mut channels = self.channels.lock().unwrap();
// Try to find an existing channel with available capacity. We check entries in BTreeMap
// order, to fill up the lower-ordered channels first. The ClientPool also prefers clients
// with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// order, to fill up the lower-ordered channels first. The client/stream pools also prefer
// clients with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// channels, and free up higher-ordered channels such that they can be reaped.
for (&id, entry) in channels.iter_mut() {
assert!(
entry.clients <= self.max_clients_per_channel.get(),
"channel overflow"
);
assert_eq!(
entry.idle_since.is_some(),
entry.clients == 0,
"incorrect channel idle state"
);
assert_ne!(entry.clients, 0, "empty channel not reaped");
if entry.clients < self.max_clients_per_channel.get() {
entry.clients += 1;
entry.idle_since = None;
return ChannelGuard {
pool: Arc::downgrade(self),
id,
@@ -184,7 +170,6 @@ impl ChannelPool {
let entry = ChannelEntry {
channel: channel.clone(),
clients: 1, // account for the guard below
idle_since: None,
};
channels.insert(id, entry);
@@ -196,20 +181,6 @@ impl ChannelPool {
}
}
impl Reapable for ChannelPool {
/// Reaps channels that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.channels.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.clients, 0, "empty channel not marked idle");
return true;
};
assert_eq!(entry.clients, 0, "idle channel has clients");
idle_since >= cutoff
})
}
}
/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
@@ -226,7 +197,7 @@ impl ChannelGuard {
}
}
/// Returns the channel to the pool.
/// Returns the channel to the pool. The channel is closed when empty.
impl Drop for ChannelGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
@@ -235,11 +206,12 @@ impl Drop for ChannelGuard {
let mut channels = pool.channels.lock().unwrap();
let entry = channels.get_mut(&self.id).expect("unknown channel");
assert!(entry.idle_since.is_none(), "active channel marked idle");
assert!(entry.clients > 0, "channel underflow");
entry.clients -= 1;
// Reap empty channels immediately.
if entry.clients == 0 {
entry.idle_since = Some(Instant::now()); // mark channel as idle
channels.remove(&self.id);
}
}
}