@@ -18,11 +18,27 @@
 //!   from the pool after a while to free up resources.
 //!
 //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
-//!   ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it
-//!   returns a guard that can be used to send a single request, to properly enforce queue depth and
-//!   route responses. Internally, the pool will reuse or spin up a suitable stream for the request,
-//!   possibly pipelining multiple requests from multiple callers on the same stream (up to some
-//!   queue depth). Idle streams are removed from the pool after a while to free up resources.
+//!   ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a
+//!   time, and is returned to the pool when dropped. Idle streams are removed from the pool after
+//!   a while to free up resources.
+//!
+//! The stream only supports sending a single, synchronous request at a time, and does not support
+//! pipelining multiple requests from different callers onto the same stream -- instead, we scale
+//! out concurrent streams to improve throughput. There are many reasons for this design choice:
+//!
+//! * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by
+//!   a single server task, which may block e.g. on layer downloads, LSN waits, etc.
+//!
+//! * Cancellation becomes trivial, by closing the stream. Otherwise, if a caller goes away
+//!   (e.g. because of a timeout), the request would still be processed by the server and block
+//!   requests behind it in the stream. It might even block its own timeout retry.
+//!
+//! * Stream scheduling becomes significantly simpler and cheaper.
+//!
+//! * Individual callers can still use client-side batching for pipelining.
+//!
+//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB
+//!   per stream (2.6 GB for 100,000 streams), so we can afford to scale out.
 //!
 //! Each channel corresponds to one TCP connection. Each client unary request and each stream
 //! corresponds to one HTTP/2 stream and server task.
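
// A hedged illustration of the call pattern described above. The pool setup and
// the request value are assumptions for illustration, not part of this diff:
//
//     let mut guard = stream_pool.get().await?;  // reuses an idle stream or opens one
//     let resp = guard.send(request).await?;     // single synchronous request
//     drop(guard);                               // returns the stream to the pool if reusable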
@@ -30,20 +46,20 @@
 //! TODO: error handling (including custom error types).
 //! TODO: observability.
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 use std::num::NonZero;
 use std::ops::{Deref, DerefMut};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::pin::Pin;
 use std::sync::{Arc, Mutex, Weak};
 use std::time::{Duration, Instant};
 
-use futures::StreamExt as _;
-use tokio::sync::mpsc::{Receiver, Sender};
-use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
+use futures::{Stream, StreamExt as _};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};
+use tokio_stream::wrappers::WatchStream;
 use tokio_util::sync::CancellationToken;
 use tonic::codec::CompressionEncoding;
 use tonic::transport::{Channel, Endpoint};
 use tracing::{error, warn};
 
 use pageserver_page_api as page_api;
 use utils::id::{TenantId, TimelineId};
@@ -225,8 +241,7 @@ pub struct ClientPool {
     ///
     /// The first client in the map will be acquired next. The map is sorted by client ID, which in
     /// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
-    /// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle
-    /// clients are reaped.
+    /// lower-ordered channels. This allows us to free up and reap higher-ordered channels.
     idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
     /// Reaps idle clients.
     idle_reaper: Reaper,
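
// A standalone sketch (std only) of the ordering property this relies on: if
// IDs sort as (channel_id, client_id), popping the first idle entry always
// prefers the lowest-ordered channel, so higher-ordered channels drain out and
// can be reaped. The tuple ID layout here is an assumption for illustration.
use std::collections::BTreeMap;

fn main() {
    let mut idle = BTreeMap::new();
    idle.insert((2, 1), "client on channel 2");
    idle.insert((1, 3), "client on channel 1");
    idle.insert((1, 7), "another client on channel 1");

    // Tuples compare lexicographically, so channel 1 is drained first.
    let (id, _) = idle.pop_first().unwrap();
    assert_eq!(id, (1, 3));
}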
@@ -282,7 +297,7 @@ impl ClientPool {
     /// This is moderately performance-sensitive. It is called for every unary request, but these
     /// establish a new gRPC stream per request so they're already expensive. GetPage requests use
     /// the `StreamPool` instead.
-    pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
+    pub async fn get(self: &Arc<Self>) -> tonic::Result<ClientGuard> {
         // Acquire a permit if the pool is bounded.
         let mut permit = None;
         if let Some(limiter) = self.limiter.clone() {
@@ -300,7 +315,7 @@ impl ClientPool {
             });
         }
 
-        // Slow path: construct a new client.
+        // Construct a new client.
         let mut channel_guard = self.channel_pool.get();
         let client = page_api::Client::new(
             channel_guard.take(),
@@ -309,7 +324,8 @@ impl ClientPool {
             self.shard_id,
             self.auth_token.clone(),
             self.compression,
-        )?;
+        )
+        .map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))?;
 
         Ok(ClientGuard {
             pool: Arc::downgrade(self),
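
// The hunk above changes `get()` to return tonic::Result, so non-gRPC
// construction errors must be folded into a `tonic::Status`. A minimal
// standalone sketch of that pattern (the io::Error is illustrative):
fn create() -> tonic::Result<()> {
    Err(std::io::Error::other("connection refused"))
        .map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))
}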
@@ -379,287 +395,187 @@ impl Drop for ClientGuard {
 /// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
 /// acquires a client from the inner `ClientPool` for the stream's lifetime.
 ///
-/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send
-/// a single request and await the response. Internally, requests are multiplexed across streams and
-/// channels. This allows proper queue depth enforcement and response routing.
+/// Individual streams only send a single request at a time, and do not pipeline multiple callers
+/// onto the same stream. Instead, we scale out the number of concurrent streams. This is primarily
+/// to eliminate head-of-line blocking. See the module documentation for more details.
 ///
 /// TODO: consider making this generic over request and response types; not currently needed.
 pub struct StreamPool {
     /// The client pool to acquire clients from. Must be unbounded.
     client_pool: Arc<ClientPool>,
-    /// All pooled streams.
+    /// Idle pooled streams. Acquired streams are removed from here and returned on drop.
     ///
-    /// Incoming requests will be sent over an existing stream with available capacity. If all
-    /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
-    /// stream has an associated Tokio task that processes requests and responses.
-    streams: Mutex<HashMap<StreamID, StreamEntry>>,
-    /// The max number of concurrent streams, or None if unbounded.
-    max_streams: Option<NonZero<usize>>,
-    /// The max number of concurrent requests per stream.
-    max_queue_depth: NonZero<usize>,
-    /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
-    /// None if the pool is unbounded.
+    /// The first stream in the map will be acquired next. The map is sorted by stream ID, which is
+    /// equivalent to the client ID and in turn sorted by its channel ID. This way we prefer
+    /// acquiring idle streams from lower-ordered channels, which allows us to free up and reap
+    /// higher-ordered channels.
+    idle: Mutex<BTreeMap<StreamID, StreamEntry>>,
+    /// Limits the max number of concurrent streams. None if the pool is unbounded.
     limiter: Option<Arc<Semaphore>>,
     /// Reaps idle streams.
     idle_reaper: Reaper,
-    /// Stream ID generator.
-    next_stream_id: AtomicUsize,
 }
 
-type StreamID = usize;
-type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>;
-type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>;
-type ResponseSender = oneshot::Sender<tonic::Result<page_api::GetPageResponse>>;
+/// The stream ID. Reuses the inner client ID.
+type StreamID = ClientID;
 
 /// A pooled stream.
 struct StreamEntry {
-    /// Sends caller requests to the stream task. The stream task exits when this is dropped.
-    sender: RequestSender,
-    /// Number of in-flight requests on this stream.
-    queue_depth: usize,
-    /// The time when this stream went idle (queue_depth == 0).
-    /// INVARIANT: Some if queue_depth == 0, otherwise None.
-    idle_since: Option<Instant>,
+    /// The bidirectional stream.
+    stream: BiStream,
+    /// The time when this stream was last used, i.e. when it was put back into `StreamPool::idle`.
+    idle_since: Instant,
 }
 
+/// A bidirectional GetPage stream and its client. Can send requests and receive responses.
+struct BiStream {
+    /// The owning client. Holds onto the channel slot while the stream is alive.
+    client: ClientGuard,
+    /// Stream for sending requests. Uses a watch channel, so it can only send a single request at a
+    /// time, and the caller must await the response before sending another request. This is
+    /// enforced by `StreamGuard::send`.
+    sender: watch::Sender<page_api::GetPageRequest>,
+    /// Stream for receiving responses.
+    receiver: Pin<Box<dyn Stream<Item = tonic::Result<page_api::GetPageResponse>> + Send>>,
+}
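
// A standalone sketch (tokio + tokio-stream only) of the watch-channel
// behaviour the new BiStream relies on: the channel holds a single slot, and a
// stream built with `from_changes` only observes the latest value. Sending a
// second request before the first is consumed would overwrite it, which is why
// `StreamGuard::send` enforces one request at a time. Values are illustrative.
use tokio::sync::watch;
use tokio_stream::{StreamExt as _, wrappers::WatchStream};

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0u32);
    // from_changes skips the initial value; the stream yields values sent later.
    let mut requests = WatchStream::from_changes(rx);

    tx.send(1).unwrap();
    assert_eq!(requests.next().await, Some(1));

    // Two unconsumed sends: only the latest survives the slot.
    tx.send(2).unwrap();
    tx.send(3).unwrap();
    assert_eq!(requests.next().await, Some(3));
}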
 
 impl StreamPool {
-    /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth`
-    /// concurrent requests on each stream, and use up to `max_streams` concurrent streams.
+    /// Creates a new stream pool, using the given client pool. It will use up to `max_streams`
+    /// concurrent streams.
     ///
     /// The client pool must be unbounded. The stream pool will enforce its own limits, and because
     /// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
     /// The stream pool should generally have its own dedicated client pool (but it can share a
     /// channel pool with others since these are always unbounded).
-    pub fn new(
-        client_pool: Arc<ClientPool>,
-        max_streams: Option<NonZero<usize>>,
-        max_queue_depth: NonZero<usize>,
-    ) -> Arc<Self> {
+    pub fn new(client_pool: Arc<ClientPool>, max_streams: Option<NonZero<usize>>) -> Arc<Self> {
         assert!(client_pool.limiter.is_none(), "bounded client pool");
         let pool = Arc::new(Self {
             client_pool,
-            streams: Mutex::default(),
-            limiter: max_streams.map(|max_streams| {
-                Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
-            }),
-            max_streams,
-            max_queue_depth,
+            idle: Mutex::default(),
+            limiter: max_streams.map(|max_streams| Arc::new(Semaphore::new(max_streams.get()))),
             idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
-            next_stream_id: AtomicUsize::default(),
         });
         pool.idle_reaper.spawn(&pool);
         pool
     }
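
// A standalone sketch (tokio only) of the bounding scheme above: one semaphore
// permit per stream, held by the acquiring guard and released on drop. With
// max_streams = 2, a third acquisition waits until a permit is returned.
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let limiter = Arc::new(Semaphore::new(2));

    let a = limiter.clone().acquire_owned().await.expect("never closed");
    let _b = limiter.clone().acquire_owned().await.expect("never closed");
    assert_eq!(limiter.available_permits(), 0);

    drop(a); // dropping the permit frees a slot
    let _c = limiter.clone().acquire_owned().await.expect("never closed");
}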
 
-    /// Acquires an available stream from the pool, or spins up a new stream async if all streams
-    /// are full. Returns a guard that can be used to send a single request on the stream and await
-    /// the response, with queue depth quota already acquired. Blocks if the pool is at capacity
-    /// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight).
+    /// Acquires an available stream from the pool, or spins up a new stream if all streams are
+    /// full. Returns a guard that can be used to send requests and await the responses. Blocks if
+    /// the pool is full.
     ///
     /// This is very performance-sensitive, as it is on the GetPage hot path.
     ///
-    /// TODO: this must do something more sophisticated for performance. We want:
-    ///
-    /// * Cheap, concurrent access in the common case where we can use a pooled stream.
-    /// * Quick acquisition of pooled streams with available capacity.
-    /// * Prefer streams that belong to lower-numbered channels, to reap idle channels.
-    /// * Prefer filling up existing streams' queue depth before spinning up new streams.
-    /// * Don't hold a lock while spinning up new streams.
-    /// * Allow concurrent clients to join onto streams while they're spun up.
-    /// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
-    ///
-    /// For now, we just do something simple but inefficient (linear scan under mutex).
-    pub async fn get(self: &Arc<Self>) -> StreamGuard {
+    /// TODO: is a `Mutex<BTreeMap>` performant enough? Will it become too contended? We can't
+    /// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first
+    /// to free up higher-ordered channels.
+    pub async fn get(self: &Arc<Self>) -> tonic::Result<StreamGuard> {
         // Acquire a permit if the pool is bounded.
         let mut permit = None;
         if let Some(limiter) = self.limiter.clone() {
             permit = Some(limiter.acquire_owned().await.expect("never closed"));
         }
-        let mut streams = self.streams.lock().unwrap();
 
-        // Look for a pooled stream with available capacity.
-        for (&id, entry) in streams.iter_mut() {
-            assert!(
-                entry.queue_depth <= self.max_queue_depth.get(),
-                "stream queue overflow"
-            );
-            assert_eq!(
-                entry.idle_since.is_some(),
-                entry.queue_depth == 0,
-                "incorrect stream idle state"
-            );
-            if entry.queue_depth < self.max_queue_depth.get() {
-                entry.queue_depth += 1;
-                entry.idle_since = None;
-                return StreamGuard {
-                    pool: Arc::downgrade(self),
-                    id,
-                    sender: entry.sender.clone(),
-                    permit,
-                };
-            }
-        }
+        // Fast path: acquire an idle stream from the pool.
+        if let Some((_, entry)) = self.idle.lock().unwrap().pop_first() {
+            return Ok(StreamGuard {
+                pool: Arc::downgrade(self),
+                stream: Some(entry.stream),
+                can_reuse: true,
+                permit,
+            });
+        }
 
-        // No available stream, spin up a new one. We install the stream entry in the pool first and
-        // return the guard, while spinning up the stream task async. This allows other callers to
-        // join onto this stream and also create additional streams concurrently if this fills up.
-        let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
-        let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
-        let entry = StreamEntry {
-            sender: req_tx.clone(),
-            queue_depth: 1, // reserve quota for this caller
-            idle_since: None,
-        };
-        streams.insert(id, entry);
+        // Spin up a new stream. Uses a watch channel to send a single request at a time, since
+        // `StreamGuard::send` enforces this anyway and it avoids unnecessary channel overhead.
+        let mut client = self.client_pool.get().await?;
 
-        if let Some(max_streams) = self.max_streams {
-            assert!(streams.len() <= max_streams.get(), "stream overflow");
-        };
+        let (req_tx, req_rx) = watch::channel(page_api::GetPageRequest::default());
+        let req_stream = WatchStream::from_changes(req_rx);
+        let resp_stream = client.get_pages(req_stream).await?;
 
-        let client_pool = self.client_pool.clone();
-        let pool = Arc::downgrade(self);
-
-        tokio::spawn(async move {
-            if let Err(err) = Self::run_stream(client_pool, req_rx).await {
-                error!("stream failed: {err}");
-            }
-            // Remove stream from pool on exit. Weak reference to avoid holding the pool alive.
-            if let Some(pool) = pool.upgrade() {
-                let entry = pool.streams.lock().unwrap().remove(&id);
-                assert!(entry.is_some(), "unknown stream ID: {id}");
-            }
-        });
-
-        StreamGuard {
+        Ok(StreamGuard {
             pool: Arc::downgrade(self),
-            id,
-            sender: req_tx,
+            stream: Some(BiStream {
+                client,
+                sender: req_tx,
+                receiver: Box::pin(resp_stream),
+            }),
+            can_reuse: true,
             permit,
-        }
+        })
     }
-
-    /// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
-    /// bidirectional GetPage stream, then forwards requests and responses between callers and the
-    /// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be
-    /// atomic with pool stream acquisition.
-    ///
-    /// The task exits when the request channel is closed, or on a stream error. The caller is
-    /// responsible for removing the stream from the pool on exit.
-    async fn run_stream(
-        client_pool: Arc<ClientPool>,
-        mut caller_rx: RequestReceiver,
-    ) -> anyhow::Result<()> {
-        // Acquire a client from the pool and create a stream.
-        let mut client = client_pool.get().await?;
-
-        // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could
-        // theoretically deadlock if both the client and server block on sends (since we're not
-        // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and
-        // low queue depths, but it was seen to happen with the libpq protocol so better safe than
-        // sorry. It should never buffer more than the queue depth anyway, but using an unbounded
-        // channel guarantees that it will never block.
-        let (req_tx, req_rx) = mpsc::unbounded_channel();
-        let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
-        let mut resp_stream = client.get_pages(req_stream).await?;
-
-        // Track caller response channels by request ID. If the task returns early, these response
-        // channels will be dropped and the waiting callers will receive an error.
-        //
-        // NB: this will leak entries if the server doesn't respond to a request (by request ID).
-        // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and
-        // block further use. But we could consider reaping closed channels after some time.
-        let mut callers = HashMap::new();
-
-        // Process requests and responses.
-        loop {
-            tokio::select! {
-                // Receive requests from callers and send them to the stream.
-                req = caller_rx.recv() => {
-                    // Shut down if request channel is closed.
-                    let Some((req, resp_tx)) = req else {
-                        return Ok(());
-                    };
-
-                    // Store the response channel by request ID.
-                    if callers.contains_key(&req.request_id) {
-                        // Error on request ID duplicates. Ignore callers that went away.
-                        _ = resp_tx.send(Err(tonic::Status::invalid_argument(
-                            format!("duplicate request ID: {}", req.request_id),
-                        )));
-                        continue;
-                    }
-                    callers.insert(req.request_id, resp_tx);
-
-                    // Send the request on the stream. Bail out if the stream is closed.
-                    req_tx.send(req).map_err(|_| {
-                        tonic::Status::unavailable("stream closed")
-                    })?;
-                }
-
-                // Receive responses from the stream and send them to callers.
-                resp = resp_stream.next() => {
-                    // Shut down if the stream is closed, and bail out on stream errors.
-                    let Some(resp) = resp.transpose()? else {
-                        return Ok(())
-                    };
-
-                    // Send the response to the caller. Ignore errors if the caller went away.
-                    let Some(resp_tx) = callers.remove(&resp.request_id) else {
-                        warn!("received response for unknown request ID: {}", resp.request_id);
-                        continue;
-                    };
-                    _ = resp_tx.send(Ok(resp));
-                }
-            }
-        }
-    }
 }
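
// For contrast with the new design, a standalone sketch (tokio only) of the
// request-ID routing that the removed `run_stream` performed: callers enqueue
// (request, oneshot responder) pairs, and a task routes each response back to
// its caller by ID. IDs and payloads are illustrative.
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    let (req_tx, mut req_rx) = mpsc::channel::<(u64, oneshot::Sender<String>)>(8);

    tokio::spawn(async move {
        let mut callers: HashMap<u64, oneshot::Sender<String>> = HashMap::new();
        while let Some((id, resp_tx)) = req_rx.recv().await {
            callers.insert(id, resp_tx);
            // In the real task, responses arrived from the server stream; here
            // we answer immediately just to show the routing by request ID.
            if let Some(tx) = callers.remove(&id) {
                let _ = tx.send(format!("response {id}"));
            }
        }
    });

    let (resp_tx, resp_rx) = oneshot::channel();
    req_tx.send((42, resp_tx)).await.unwrap();
    assert_eq!(resp_rx.await.unwrap(), "response 42");
}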
 
 impl Reapable for StreamPool {
     /// Reaps streams that have been idle since before the cutoff.
     fn reap_idle(&self, cutoff: Instant) {
-        self.streams.lock().unwrap().retain(|_, entry| {
-            let Some(idle_since) = entry.idle_since else {
-                assert_ne!(entry.queue_depth, 0, "empty stream not marked idle");
-                return true;
-            };
-            assert_eq!(entry.queue_depth, 0, "idle stream has requests");
-            idle_since >= cutoff
-        });
+        self.idle
+            .lock()
+            .unwrap()
+            .retain(|_, entry| entry.idle_since >= cutoff);
     }
 }
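
// A standalone sketch (std only) of the reap above: drop every idle entry
// whose idle_since lies before the cutoff. Keys and durations are illustrative.
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

fn main() {
    let t0 = Instant::now();
    let mut idle: BTreeMap<u64, Instant> = BTreeMap::new();
    idle.insert(1, t0); // went idle at t0: stale
    idle.insert(2, t0 + Duration::from_secs(600)); // went idle later: fresh

    // Reap everything that went idle before the cutoff.
    let cutoff = t0 + Duration::from_secs(300);
    idle.retain(|_, idle_since| *idle_since >= cutoff);
    assert_eq!(idle.keys().copied().collect::<Vec<_>>(), vec![2]);
}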
 
-/// A pooled stream reference. Can be used to send a single request, to properly enforce queue
-/// depth. Queue depth is already reserved and will be returned on drop.
+/// A stream acquired from the pool. Returned to the pool when dropped, unless there are still
+/// in-flight requests on the stream, or the stream failed.
 pub struct StreamGuard {
     pool: Weak<StreamPool>,
-    id: StreamID,
-    sender: RequestSender,
+    stream: Option<BiStream>, // Some until dropped
+    can_reuse: bool,          // returned to pool if true
     permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
 }
 
 impl StreamGuard {
-    /// Sends a request on the stream and awaits the response. Consumes the guard, since it's only
-    /// valid for a single request (to enforce queue depth). This also drops the guard on return and
-    /// returns the queue depth quota to the pool.
+    /// Sends a request on the stream and awaits the response. If the future is dropped before it
+    /// resolves (e.g. due to a timeout or cancellation), the stream will be closed to cancel the
+    /// request and is not returned to the pool. The same is true if the stream errors, in which
+    /// case the caller can't send further requests on the stream.
     ///
-    /// The `GetPageRequest::request_id` must be unique across in-flight requests.
+    /// We only support sending a single request at a time, to eliminate head-of-line blocking. See
+    /// module documentation for details.
     ///
     /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
     /// to avoid tearing down the stream for per-request errors. Callers must check this.
     pub async fn send(
-        self,
+        &mut self,
         req: page_api::GetPageRequest,
     ) -> tonic::Result<page_api::GetPageResponse> {
-        let (resp_tx, resp_rx) = oneshot::channel();
-        self.sender
-            .send((req, resp_tx))
-            .await
-            .map_err(|_| tonic::Status::unavailable("stream closed"))?;
-
-        resp_rx
-            .await
-            .map_err(|_| tonic::Status::unavailable("stream closed"))?
+        let req_id = req.request_id;
+        let stream = self.stream.as_mut().expect("not dropped");
+
+        // Mark the stream as not reusable while the request is in flight. We can't return the
+        // stream to the pool until we receive the response, to avoid head-of-line blocking and
+        // stale responses. Failed streams can't be reused either.
+        if !self.can_reuse {
+            return Err(tonic::Status::internal("stream can't be reused"));
+        }
+        self.can_reuse = false;
+
+        // Send the request and receive the response.
+        //
+        // NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests.
+        stream
+            .sender
+            .send(req)
+            .map_err(|_| tonic::Status::unavailable("stream closed"))?;
+
+        let resp = stream
+            .receiver
+            .next()
+            .await
+            .ok_or_else(|| tonic::Status::unavailable("stream closed"))??;
+
+        if resp.request_id != req_id {
+            return Err(tonic::Status::internal(format!(
+                "response ID {} does not match request ID {}",
+                resp.request_id, req_id
+            )));
+        }
+
+        // Success, mark the stream as reusable.
+        self.can_reuse = true;
+
+        Ok(resp)
     }
 }
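
// Caller-side implication of the semantics above, as a hedged sketch (the pool
// setup and timeout value are assumptions, not from this diff): dropping the
// send future cancels the request by abandoning the stream.
//
//     let mut guard = stream_pool.get().await?;
//     match tokio::time::timeout(Duration::from_secs(10), guard.send(req)).await {
//         Ok(resp) => { /* resp: tonic::Result<page_api::GetPageResponse> */ }
//         // The timeout dropped the send future mid-flight: `can_reuse` stays
//         // false, so dropping the guard closes the stream instead of
//         // returning it to the pool.
//         Err(_elapsed) => drop(guard),
//     }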
@@ -669,26 +585,21 @@ impl Drop for StreamGuard {
             return; // pool was dropped
         };
 
-        // Release the queue depth reservation on drop. This can prematurely decrement it if dropped
-        // before the response is received, but that's okay.
-        //
-        // TODO: actually, it's probably not okay. Queue depth release should be moved into the
-        // stream task, such that it continues to account for the queue depth slot until the server
-        // responds. Otherwise, if a slow request times out and keeps blocking the stream, the
-        // server will keep waiting on it and we can pile on subsequent requests (including the
-        // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking
-        // requests on e.g. LSN waits and layer downloads, instead returning early to free up the
-        // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line
-        // blocking. TBD.
-        let mut streams = pool.streams.lock().unwrap();
-        let entry = streams.get_mut(&self.id).expect("unknown stream");
-        assert!(entry.idle_since.is_none(), "active stream marked idle");
-        assert!(entry.queue_depth > 0, "stream queue underflow");
-        entry.queue_depth -= 1;
-        if entry.queue_depth == 0 {
-            entry.idle_since = Some(Instant::now()); // mark stream as idle
-        }
+        // If the stream isn't reusable, it can't be returned to the pool.
+        if !self.can_reuse {
+            return;
+        }
+
+        // Place the idle stream back into the pool.
+        let entry = StreamEntry {
+            stream: self.stream.take().expect("dropped once"),
+            idle_since: Instant::now(),
+        };
+        pool.idle
+            .lock()
+            .unwrap()
+            .insert(entry.stream.client.id, entry);
 
         _ = self.permit; // returned on drop, referenced for visibility
     }
 }
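
// A standalone sketch (std only) of the drop-based recycling above: a guard
// holds the pooled value and hands it back to a shared idle list on drop. The
// String stands in for the pooled stream; the types are illustrative.
use std::sync::{Arc, Mutex};

struct Pool {
    idle: Mutex<Vec<String>>,
}

struct Guard {
    pool: Arc<Pool>,
    value: Option<String>, // Some until dropped
}

impl Drop for Guard {
    fn drop(&mut self) {
        let value = self.value.take().expect("dropped once");
        self.pool.idle.lock().unwrap().push(value);
    }
}

fn main() {
    let pool = Arc::new(Pool { idle: Mutex::new(vec!["stream".to_string()]) });
    let guard = Guard {
        pool: pool.clone(),
        value: pool.idle.lock().unwrap().pop(),
    };
    assert!(pool.idle.lock().unwrap().is_empty());
    drop(guard); // returns the value to the idle list
    assert_eq!(pool.idle.lock().unwrap().len(), 1);
}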