diff --git a/Cargo.lock b/Cargo.lock index bea8d3a7fd..2f36790d30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7564,6 +7564,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0d521ee4d9..df2064a4a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -201,7 +201,7 @@ tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.g tokio-io-timeout = "1.2.0" tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["sync"] } tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "io-util", "rt"] } toml = "0.8" diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index 7732585f7c..4b606d6939 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -32,21 +32,13 @@ const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); /// Max number of concurrent unary request clients per shard. const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); -/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage -/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`. +/// Max number of concurrent GetPage streams per shard. const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); -/// Max number of pipelined requests per stream. -const MAX_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(2).unwrap(); - /// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these -/// are more throughput-oriented, we have a smaller limit but higher queue depth. +/// are more throughput-oriented, we have a smaller limit. const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); -/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus -/// get a larger queue depth. -const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); - /// The overall request call timeout, including retries and pool acquisition. /// TODO: should we retry forever? Should the caller decide? const CALL_TIMEOUT: Duration = Duration::from_secs(60); @@ -272,7 +264,7 @@ impl PageserverClient { req: page_api::GetPageRequest, shard: &Shard, ) -> tonic::Result { - let stream = shard.stream(req.request_class.is_bulk()).await; + let mut stream = shard.stream(req.request_class.is_bulk()).await?; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -557,7 +549,6 @@ impl Shard { None, // unbounded, limited by stream pool ), Some(MAX_STREAMS), - MAX_STREAM_QUEUE_DEPTH, ); // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools @@ -573,7 +564,6 @@ impl Shard { None, // unbounded, limited by stream pool ), Some(MAX_BULK_STREAMS), - MAX_BULK_STREAM_QUEUE_DEPTH, ); Ok(Self { @@ -593,13 +583,12 @@ impl Shard { pin!(self.client_pool.get()), ) .await - .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) } /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream /// pool (e.g. for prefetches). #[instrument(skip_all, fields(bulk))] - async fn stream(&self, bulk: bool) -> StreamGuard { + async fn stream(&self, bulk: bool) -> tonic::Result { let pool = match bulk { false => &self.stream_pool, true => &self.bulk_stream_pool, diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 4a29252cd9..98a649b4c8 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -18,11 +18,27 @@ //! from the pool after a while to free up resources. //! //! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the -//! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it -//! returns a guard that can be used to send a single request, to properly enforce queue depth and -//! route responses. Internally, the pool will reuse or spin up a suitable stream for the request, -//! possibly pipelining multiple requests from multiple callers on the same stream (up to some -//! queue depth). Idle streams are removed from the pool after a while to free up resources. +//! ClientPool for the stream's lifetime. A stream can only be acquired by a single caller at a +//! time, and is returned to the pool when dropped. Idle streams are removed from the pool after +//! a while to free up resources. +//! +//! The stream only supports sending a single, synchronous request at a time, and does not support +//! pipelining multiple requests from different callers onto the same stream -- instead, we scale +//! out concurrent streams to improve throughput. There are many reasons for this design choice: +//! +//! * It (mostly) eliminates head-of-line blocking. A single stream is processed sequentially by +//! a single server task, which may block e.g. on layer downloads, LSN waits, etc. +//! +//! * Cancellation becomes trivial, by closing the stream. Otherwise, if a caller goes away +//! (e.g. because of a timeout), the request would still be processed by the server and block +//! requests behind it in the stream. It might even block its own timeout retry. +//! +//! * Stream scheduling becomes significantly simpler and cheaper. +//! +//! * Individual callers can still use client-side batching for pipelining. +//! +//! * Idle streams are cheap. Benchmarks show that an idle GetPage stream takes up about 26 KB +//! per stream (2.5 GB for 100,000 streams), so we can afford to scale out. //! //! Each channel corresponds to one TCP connection. Each client unary request and each stream //! corresponds to one HTTP/2 stream and server task. @@ -30,20 +46,20 @@ //! TODO: error handling (including custom error types). //! TODO: observability. -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; use std::num::NonZero; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::{Duration, Instant}; -use futures::StreamExt as _; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; +use futures::{Stream, StreamExt as _}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch}; +use tokio_stream::wrappers::WatchStream; use tokio_util::sync::CancellationToken; use tonic::codec::CompressionEncoding; use tonic::transport::{Channel, Endpoint}; -use tracing::{error, warn}; use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; @@ -225,8 +241,7 @@ pub struct ClientPool { /// /// The first client in the map will be acquired next. The map is sorted by client ID, which in /// turn is sorted by its channel ID, such that we prefer acquiring idle clients from - /// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle - /// clients are reaped. + /// lower-ordered channels. This allows us to free up and reap higher-ordered channels. idle: Mutex>, /// Reaps idle clients. idle_reaper: Reaper, @@ -282,7 +297,7 @@ impl ClientPool { /// This is moderately performance-sensitive. It is called for every unary request, but these /// establish a new gRPC stream per request so they're already expensive. GetPage requests use /// the `StreamPool` instead. - pub async fn get(self: &Arc) -> anyhow::Result { + pub async fn get(self: &Arc) -> tonic::Result { // Acquire a permit if the pool is bounded. let mut permit = None; if let Some(limiter) = self.limiter.clone() { @@ -300,7 +315,7 @@ impl ClientPool { }); } - // Slow path: construct a new client. + // Construct a new client. let mut channel_guard = self.channel_pool.get(); let client = page_api::Client::new( channel_guard.take(), @@ -309,7 +324,8 @@ impl ClientPool { self.shard_id, self.auth_token.clone(), self.compression, - )?; + ) + .map_err(|err| tonic::Status::internal(format!("failed to create client: {err}")))?; Ok(ClientGuard { pool: Arc::downgrade(self), @@ -379,287 +395,187 @@ impl Drop for ClientGuard { /// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream /// acquires a client from the inner `ClientPool` for the stream's lifetime. /// -/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send -/// a single request and await the response. Internally, requests are multiplexed across streams and -/// channels. This allows proper queue depth enforcement and response routing. +/// Individual streams only send a single request at a time, and do not pipeline multiple callers +/// onto the same stream. Instead, we scale out the number of concurrent streams. This is primarily +/// to eliminate head-of-line blocking. See the module documentation for more details. /// /// TODO: consider making this generic over request and response types; not currently needed. pub struct StreamPool { /// The client pool to acquire clients from. Must be unbounded. client_pool: Arc, - /// All pooled streams. + /// Idle pooled streams. Acquired streams are removed from here and returned on drop. /// - /// Incoming requests will be sent over an existing stream with available capacity. If all - /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each - /// stream has an associated Tokio task that processes requests and responses. - streams: Mutex>, - /// The max number of concurrent streams, or None if unbounded. - max_streams: Option>, - /// The max number of concurrent requests per stream. - max_queue_depth: NonZero, - /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`. - /// None if the pool is unbounded. + /// The first stream in the map will be acquired next. The map is sorted by stream ID, which is + /// equivalent to the client ID and in turn sorted by its channel ID. This way we prefer + /// acquiring idle streams from lower-ordered channels, which allows us to free up and reap + /// higher-ordered channels. + idle: Mutex>, + /// Limits the max number of concurrent streams. None if the pool is unbounded. limiter: Option>, /// Reaps idle streams. idle_reaper: Reaper, - /// Stream ID generator. - next_stream_id: AtomicUsize, } -type StreamID = usize; -type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>; -type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>; -type ResponseSender = oneshot::Sender>; +/// The stream ID. Reuses the inner client ID. +type StreamID = ClientID; +/// A pooled stream. struct StreamEntry { - /// Sends caller requests to the stream task. The stream task exits when this is dropped. - sender: RequestSender, - /// Number of in-flight requests on this stream. - queue_depth: usize, - /// The time when this stream went idle (queue_depth == 0). - /// INVARIANT: Some if queue_depth == 0, otherwise None. - idle_since: Option, + /// The bidirectional stream. + stream: BiStream, + /// The time when this stream was last used, i.e. when it was put back into `StreamPool::idle`. + idle_since: Instant, +} + +/// A bidirectional GetPage stream and its client. Can send requests and receive responses. +struct BiStream { + /// The owning client. Holds onto the channel slot while the stream is alive. + client: ClientGuard, + /// Stream for sending requests. Uses a watch channel, so it can only send a single request at a + /// time, and the caller must await the response before sending another request. This is + /// enforced by `StreamGuard::send`. + sender: watch::Sender, + /// Stream for receiving responses. + receiver: Pin> + Send>>, } impl StreamPool { - /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth` - /// concurrent requests on each stream, and use up to `max_streams` concurrent streams. + /// Creates a new stream pool, using the given client pool. It will use up to `max_streams` + /// concurrent streams. /// /// The client pool must be unbounded. The stream pool will enforce its own limits, and because /// streams are long-lived they can cause persistent starvation if they exhaust the client pool. /// The stream pool should generally have its own dedicated client pool (but it can share a /// channel pool with others since these are always unbounded). - pub fn new( - client_pool: Arc, - max_streams: Option>, - max_queue_depth: NonZero, - ) -> Arc { + pub fn new(client_pool: Arc, max_streams: Option>) -> Arc { assert!(client_pool.limiter.is_none(), "bounded client pool"); let pool = Arc::new(Self { client_pool, - streams: Mutex::default(), - limiter: max_streams.map(|max_streams| { - Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get())) - }), - max_streams, - max_queue_depth, + idle: Mutex::default(), + limiter: max_streams.map(|max_streams| Arc::new(Semaphore::new(max_streams.get()))), idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), - next_stream_id: AtomicUsize::default(), }); pool.idle_reaper.spawn(&pool); pool } - /// Acquires an available stream from the pool, or spins up a new stream async if all streams - /// are full. Returns a guard that can be used to send a single request on the stream and await - /// the response, with queue depth quota already acquired. Blocks if the pool is at capacity - /// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight). + /// Acquires an available stream from the pool, or spins up a new stream if all streams are + /// full. Returns a guard that can be used to send requests and await the responses. Blocks if + /// the pool is full. /// /// This is very performance-sensitive, as it is on the GetPage hot path. /// - /// TODO: this must do something more sophisticated for performance. We want: - /// - /// * Cheap, concurrent access in the common case where we can use a pooled stream. - /// * Quick acquisition of pooled streams with available capacity. - /// * Prefer streams that belong to lower-numbered channels, to reap idle channels. - /// * Prefer filling up existing streams' queue depth before spinning up new streams. - /// * Don't hold a lock while spinning up new streams. - /// * Allow concurrent clients to join onto streams while they're spun up. - /// * Allow spinning up multiple streams concurrently, but don't overshoot limits. - /// - /// For now, we just do something simple but inefficient (linear scan under mutex). - pub async fn get(self: &Arc) -> StreamGuard { + /// TODO: is a `Mutex` performant enough? Will it become too contended? We can't + /// trivially use e.g. DashMap or sharding, because we want to pop lower-ordered streams first + /// to free up higher-ordered channels. + pub async fn get(self: &Arc) -> tonic::Result { // Acquire a permit if the pool is bounded. let mut permit = None; if let Some(limiter) = self.limiter.clone() { permit = Some(limiter.acquire_owned().await.expect("never closed")); } - let mut streams = self.streams.lock().unwrap(); - // Look for a pooled stream with available capacity. - for (&id, entry) in streams.iter_mut() { - assert!( - entry.queue_depth <= self.max_queue_depth.get(), - "stream queue overflow" - ); - assert_eq!( - entry.idle_since.is_some(), - entry.queue_depth == 0, - "incorrect stream idle state" - ); - if entry.queue_depth < self.max_queue_depth.get() { - entry.queue_depth += 1; - entry.idle_since = None; - return StreamGuard { - pool: Arc::downgrade(self), - id, - sender: entry.sender.clone(), - permit, - }; - } + // Fast path: acquire an idle stream from the pool. + if let Some((_, entry)) = self.idle.lock().unwrap().pop_first() { + return Ok(StreamGuard { + pool: Arc::downgrade(self), + stream: Some(entry.stream), + can_reuse: true, + permit, + }); } - // No available stream, spin up a new one. We install the stream entry in the pool first and - // return the guard, while spinning up the stream task async. This allows other callers to - // join onto this stream and also create additional streams concurrently if this fills up. - let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); - let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get()); - let entry = StreamEntry { - sender: req_tx.clone(), - queue_depth: 1, // reserve quota for this caller - idle_since: None, - }; - streams.insert(id, entry); + // Spin up a new stream. Uses a watch channel to send a single request at a time, since + // `StreamGuard::send` enforces this anyway and it avoids unnecessary channel overhead. + let mut client = self.client_pool.get().await?; - if let Some(max_streams) = self.max_streams { - assert!(streams.len() <= max_streams.get(), "stream overflow"); - }; + let (req_tx, req_rx) = watch::channel(page_api::GetPageRequest::default()); + let req_stream = WatchStream::from_changes(req_rx); + let resp_stream = client.get_pages(req_stream).await?; - let client_pool = self.client_pool.clone(); - let pool = Arc::downgrade(self); - - tokio::spawn(async move { - if let Err(err) = Self::run_stream(client_pool, req_rx).await { - error!("stream failed: {err}"); - } - // Remove stream from pool on exit. Weak reference to avoid holding the pool alive. - if let Some(pool) = pool.upgrade() { - let entry = pool.streams.lock().unwrap().remove(&id); - assert!(entry.is_some(), "unknown stream ID: {id}"); - } - }); - - StreamGuard { + Ok(StreamGuard { pool: Arc::downgrade(self), - id, - sender: req_tx, + stream: Some(BiStream { + client, + sender: req_tx, + receiver: Box::pin(resp_stream), + }), + can_reuse: true, permit, - } - } - - /// Runs a stream task. This acquires a client from the `ClientPool` and establishes a - /// bidirectional GetPage stream, then forwards requests and responses between callers and the - /// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be - /// atomic with pool stream acquisition. - /// - /// The task exits when the request channel is closed, or on a stream error. The caller is - /// responsible for removing the stream from the pool on exit. - async fn run_stream( - client_pool: Arc, - mut caller_rx: RequestReceiver, - ) -> anyhow::Result<()> { - // Acquire a client from the pool and create a stream. - let mut client = client_pool.get().await?; - - // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could - // theoretically deadlock if both the client and server block on sends (since we're not - // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and - // low queue depths, but it was seen to happen with the libpq protocol so better safe than - // sorry. It should never buffer more than the queue depth anyway, but using an unbounded - // channel guarantees that it will never block. - let (req_tx, req_rx) = mpsc::unbounded_channel(); - let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); - let mut resp_stream = client.get_pages(req_stream).await?; - - // Track caller response channels by request ID. If the task returns early, these response - // channels will be dropped and the waiting callers will receive an error. - // - // NB: this will leak entries if the server doesn't respond to a request (by request ID). - // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and - // block further use. But we could consider reaping closed channels after some time. - let mut callers = HashMap::new(); - - // Process requests and responses. - loop { - tokio::select! { - // Receive requests from callers and send them to the stream. - req = caller_rx.recv() => { - // Shut down if request channel is closed. - let Some((req, resp_tx)) = req else { - return Ok(()); - }; - - // Store the response channel by request ID. - if callers.contains_key(&req.request_id) { - // Error on request ID duplicates. Ignore callers that went away. - _ = resp_tx.send(Err(tonic::Status::invalid_argument( - format!("duplicate request ID: {}", req.request_id), - ))); - continue; - } - callers.insert(req.request_id, resp_tx); - - // Send the request on the stream. Bail out if the stream is closed. - req_tx.send(req).map_err(|_| { - tonic::Status::unavailable("stream closed") - })?; - } - - // Receive responses from the stream and send them to callers. - resp = resp_stream.next() => { - // Shut down if the stream is closed, and bail out on stream errors. - let Some(resp) = resp.transpose()? else { - return Ok(()) - }; - - // Send the response to the caller. Ignore errors if the caller went away. - let Some(resp_tx) = callers.remove(&resp.request_id) else { - warn!("received response for unknown request ID: {}", resp.request_id); - continue; - }; - _ = resp_tx.send(Ok(resp)); - } - } - } + }) } } impl Reapable for StreamPool { /// Reaps streams that have been idle since before the cutoff. fn reap_idle(&self, cutoff: Instant) { - self.streams.lock().unwrap().retain(|_, entry| { - let Some(idle_since) = entry.idle_since else { - assert_ne!(entry.queue_depth, 0, "empty stream not marked idle"); - return true; - }; - assert_eq!(entry.queue_depth, 0, "idle stream has requests"); - idle_since >= cutoff - }); + self.idle + .lock() + .unwrap() + .retain(|_, entry| entry.idle_since >= cutoff); } } -/// A pooled stream reference. Can be used to send a single request, to properly enforce queue -/// depth. Queue depth is already reserved and will be returned on drop. +/// A stream acquired from the pool. Returned to the pool when dropped, unless there are still +/// in-flight requests on the stream, or the stream failed. pub struct StreamGuard { pool: Weak, - id: StreamID, - sender: RequestSender, + stream: Option, // Some until dropped + can_reuse: bool, // returned to pool if true permit: Option, // None if pool is unbounded } impl StreamGuard { - /// Sends a request on the stream and awaits the response. Consumes the guard, since it's only - /// valid for a single request (to enforce queue depth). This also drops the guard on return and - /// returns the queue depth quota to the pool. + /// Sends a request on the stream and awaits the response. If the future is dropped before it + /// resolves (e.g. due to a timeout or cancellation), the stream will be closed to cancel the + /// request and is not returned to the pool. The same is true if the stream errors, in which + /// case the caller can't send further requests on the stream. /// - /// The `GetPageRequest::request_id` must be unique across in-flight requests. + /// We only support sending a single request at a time, to eliminate head-of-line blocking. See + /// module documentation for details. /// /// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status` /// to avoid tearing down the stream for per-request errors. Callers must check this. pub async fn send( - self, + &mut self, req: page_api::GetPageRequest, ) -> tonic::Result { - let (resp_tx, resp_rx) = oneshot::channel(); + let req_id = req.request_id; + let stream = self.stream.as_mut().expect("not dropped"); - self.sender - .send((req, resp_tx)) - .await + // Mark the stream as not reusable while the request is in flight. We can't return the + // stream to the pool until we receive the response, to avoid head-of-line blocking and + // stale responses. Failed streams can't be reused either. + if !self.can_reuse { + return Err(tonic::Status::internal("stream can't be reused")); + } + self.can_reuse = false; + + // Send the request and receive the response. + // + // NB: this uses a watch channel, so it's unsafe to change this code to pipeline requests. + stream + .sender + .send(req) .map_err(|_| tonic::Status::unavailable("stream closed"))?; - resp_rx + let resp = stream + .receiver + .next() .await - .map_err(|_| tonic::Status::unavailable("stream closed"))? + .ok_or_else(|| tonic::Status::unavailable("stream closed"))??; + + if resp.request_id != req_id { + return Err(tonic::Status::internal(format!( + "response ID {} does not match request ID {}", + resp.request_id, req_id + ))); + } + + // Success, mark the stream as reusable. + self.can_reuse = true; + + Ok(resp) } } @@ -669,26 +585,21 @@ impl Drop for StreamGuard { return; // pool was dropped }; - // Release the queue depth reservation on drop. This can prematurely decrement it if dropped - // before the response is received, but that's okay. - // - // TODO: actually, it's probably not okay. Queue depth release should be moved into the - // stream task, such that it continues to account for the queue depth slot until the server - // responds. Otherwise, if a slow request times out and keeps blocking the stream, the - // server will keep waiting on it and we can pile on subsequent requests (including the - // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking - // requests on e.g. LSN waits and layer downloads, instead returning early to free up the - // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line - // blocking. TBD. - let mut streams = pool.streams.lock().unwrap(); - let entry = streams.get_mut(&self.id).expect("unknown stream"); - assert!(entry.idle_since.is_none(), "active stream marked idle"); - assert!(entry.queue_depth > 0, "stream queue underflow"); - entry.queue_depth -= 1; - if entry.queue_depth == 0 { - entry.idle_since = Some(Instant::now()); // mark stream as idle + // If the stream isn't reusable, it can't be returned to the pool. + if !self.can_reuse { + return; } + // Place the idle stream back into the pool. + let entry = StreamEntry { + stream: self.stream.take().expect("dropped once"), + idle_since: Instant::now(), + }; + pool.idle + .lock() + .unwrap() + .insert(entry.stream.client.id, entry); + _ = self.permit; // returned on drop, referenced for visibility } } diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index a9dd154285..76355ae546 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -49,7 +49,7 @@ impl From for tonic::Status { } /// The LSN a request should read at. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, Default)] pub struct ReadLsn { /// The request's read LSN. pub request_lsn: Lsn, @@ -329,7 +329,7 @@ impl From for proto::GetDbSizeResponse { } /// Requests one or more pages. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct GetPageRequest { /// A request ID. Will be included in the response. Should be unique for in-flight requests on /// the stream. @@ -430,12 +430,13 @@ impl From for proto::RequestId { } /// A GetPage request class. -#[derive(Clone, Copy, Debug, strum_macros::Display)] +#[derive(Clone, Copy, Debug, Default, strum_macros::Display)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed. Unknown, /// A normal request. This is the default. + #[default] Normal, /// A prefetch request. NB: can only be classified on pg < 18. Prefetch, diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index fc01deb92d..c61598cdf6 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -98,7 +98,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["full", "test-util"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } -tokio-stream = { version = "0.1", features = ["net"] } +tokio-stream = { version = "0.1", features = ["net", "sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] } toml_edit = { version = "0.22", features = ["serde"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }