diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index e790f4018e..393f89819a 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -143,7 +143,7 @@ impl PageserverClient { req: page_api::CheckRelExistsRequest, ) -> tonic::Result { self.retry - .with(async || { + .with(async |_| { // Relation metadata is only available on shard 0. let mut client = self.shards.load_full().get_zero().client().await?; client.check_rel_exists(req).await @@ -158,7 +158,7 @@ impl PageserverClient { req: page_api::GetDbSizeRequest, ) -> tonic::Result { self.retry - .with(async || { + .with(async |_| { // Relation metadata is only available on shard 0. let mut client = self.shards.load_full().get_zero().client().await?; client.get_db_size(req).await @@ -166,8 +166,9 @@ impl PageserverClient { .await } - /// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically - /// splits requests that straddle shard boundaries, and assembles the responses. + /// Fetches pages. The `request_id` must be unique across all in-flight requests, and the + /// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle + /// shard boundaries, and assembles the responses. /// /// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status` /// errors. All responses will have `GetPageStatusCode::Ok`. @@ -187,6 +188,10 @@ impl PageserverClient { if req.block_numbers.is_empty() { return Err(tonic::Status::invalid_argument("no block number")); } + // The request attempt must be 0. The client will increment it internally. + if req.request_id.attempt != 0 { + return Err(tonic::Status::invalid_argument("request attempt must be 0")); + } // The shards may change while we're fetching pages. We execute the request using a stable // view of the shards (especially important for requests that span shards), but retry the @@ -197,7 +202,11 @@ impl PageserverClient { // TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this // once we figure out how to handle these. self.retry - .with(async || Self::get_page_with_shards(req.clone(), &self.shards.load_full()).await) + .with(async |attempt| { + let mut req = req.clone(); + req.request_id.attempt = attempt as u32; + Self::get_page_with_shards(req, &self.shards.load_full()).await + }) .await } @@ -267,7 +276,7 @@ impl PageserverClient { req: page_api::GetRelSizeRequest, ) -> tonic::Result { self.retry - .with(async || { + .with(async |_| { // Relation metadata is only available on shard 0. let mut client = self.shards.load_full().get_zero().client().await?; client.get_rel_size(req).await @@ -282,7 +291,7 @@ impl PageserverClient { req: page_api::GetSlruSegmentRequest, ) -> tonic::Result { self.retry - .with(async || { + .with(async |_| { // SLRU segments are only available on shard 0. let mut client = self.shards.load_full().get_zero().client().await?; client.get_slru_segment(req).await diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 2dde40b5b4..906872e091 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -591,6 +591,10 @@ impl StreamPool { // Track caller response channels by request ID. If the task returns early, these response // channels will be dropped and the waiting callers will receive an error. + // + // NB: this will leak entries if the server doesn't respond to a request (by request ID). + // It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and + // block further use. But we could consider reaping closed channels after some time. let mut callers = HashMap::new(); // Process requests and responses. @@ -695,6 +699,15 @@ impl Drop for StreamGuard { // Release the queue depth reservation on drop. This can prematurely decrement it if dropped // before the response is received, but that's okay. + // + // TODO: actually, it's probably not okay. Queue depth release should be moved into the + // stream task, such that it continues to account for the queue depth slot until the server + // responds. Otherwise, if a slow request times out and keeps blocking the stream, the + // server will keep waiting on it and we can pile on subsequent requests (including the + // timeout retry) in the same stream and get blocked. But we may also want to avoid blocking + // requests on e.g. LSN waits and layer downloads, instead returning early to free up the + // stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line + // blocking. TBD. let mut streams = pool.streams.lock().unwrap(); let entry = streams.get_mut(&self.id).expect("unknown stream"); assert!(entry.idle_since.is_none(), "active stream marked idle"); diff --git a/pageserver/client_grpc/src/retry.rs b/pageserver/client_grpc/src/retry.rs index a4d4b19870..a1e0b8636f 100644 --- a/pageserver/client_grpc/src/retry.rs +++ b/pageserver/client_grpc/src/retry.rs @@ -23,14 +23,14 @@ impl Retry { /// If true, log successful requests. For debugging. const LOG_SUCCESS: bool = false; - /// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors, - /// using the current tracing span for context. + /// Runs the given async closure with timeouts and retries (exponential backoff), passing the + /// attempt number starting at 0. Logs errors, using the current tracing span for context. /// /// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default /// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`]. pub async fn with(&self, mut f: F) -> tonic::Result where - F: FnMut() -> O, + F: FnMut(usize) -> O, // takes attempt number, starting at 0 O: Future>, { let started = Instant::now(); @@ -47,7 +47,7 @@ impl Retry { } let request_started = Instant::now(); - tokio::time::timeout(Self::REQUEST_TIMEOUT, f()) + tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries)) .await .map_err(|_| { tonic::Status::deadline_exceeded(format!( diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto index 1d6c230916..b1f266d910 100644 --- a/pageserver/page_api/proto/page_service.proto +++ b/pageserver/page_api/proto/page_service.proto @@ -153,7 +153,7 @@ message GetDbSizeResponse { message GetPageRequest { // A request ID. Will be included in the response. Should be unique for // in-flight requests on the stream. - uint64 request_id = 1; + RequestID request_id = 1; // The request class. GetPageClass request_class = 2; // The LSN to read at. @@ -177,6 +177,14 @@ message GetPageRequest { repeated uint32 block_number = 5; } +// A Request ID. Should be unique for in-flight requests on a stream. Included in the response. +message RequestID { + // The base request ID. + uint64 id = 1; + // The request attempt. Starts at 0, incremented on each retry. + uint32 attempt = 2; +} + // A GetPageRequest class. Primarily intended for observability, but may also be // used for prioritization in the future. enum GetPageClass { @@ -199,7 +207,7 @@ enum GetPageClass { // the entire batch is ready, so no one can make use of the individual pages. message GetPageResponse { // The original request's ID. - uint64 request_id = 1; + RequestID request_id = 1; // The response status code. GetPageStatusCode status_code = 2; // A string describing the status, if any. diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index d0d3517d41..4db8237ad8 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -356,7 +356,10 @@ impl TryFrom for GetPageRequest { return Err(ProtocolError::Missing("block_number")); } Ok(Self { - request_id: pb.request_id, + request_id: pb + .request_id + .ok_or(ProtocolError::Missing("request_id"))? + .into(), request_class: pb.request_class.into(), read_lsn: pb .read_lsn @@ -371,7 +374,7 @@ impl TryFrom for GetPageRequest { impl From for proto::GetPageRequest { fn from(request: GetPageRequest) -> Self { Self { - request_id: request.request_id, + request_id: Some(request.request_id.into()), request_class: request.request_class.into(), read_lsn: Some(request.read_lsn.into()), rel: Some(request.rel.into()), @@ -380,8 +383,51 @@ impl From for proto::GetPageRequest { } } -/// A GetPage request ID. -pub type RequestID = u64; +/// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RequestID { + /// The base request ID. + pub id: u64, + // The request attempt. Starts at 0, incremented on each retry. + pub attempt: u32, +} + +impl RequestID { + /// Creates a new RequestID with the given ID and an initial attempt of 0. + pub fn new(id: u64) -> Self { + Self { id, attempt: 0 } + } +} + +impl Display for RequestID { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.id, self.attempt) + } +} + +impl From for RequestID { + fn from(pb: proto::RequestId) -> Self { + Self { + id: pb.id, + attempt: pb.attempt, + } + } +} + +impl From for RequestID { + fn from(id: u64) -> Self { + Self::new(id) + } +} + +impl From for proto::RequestId { + fn from(request_id: RequestID) -> Self { + Self { + id: request_id.id, + attempt: request_id.attempt, + } + } +} /// A GetPage request class. #[derive(Clone, Copy, Debug, strum_macros::Display)] @@ -467,7 +513,7 @@ pub struct GetPageResponse { impl From for GetPageResponse { fn from(pb: proto::GetPageResponse) -> Self { Self { - request_id: pb.request_id, + request_id: pb.request_id.unwrap_or_default().into(), status_code: pb.status_code.into(), reason: Some(pb.reason).filter(|r| !r.is_empty()), page_images: pb.page_image, @@ -478,7 +524,7 @@ impl From for GetPageResponse { impl From for proto::GetPageResponse { fn from(response: GetPageResponse) -> Self { Self { - request_id: response.request_id, + request_id: Some(response.request_id.into()), status_code: response.status_code.into(), reason: response.reason.unwrap_or_default(), page_image: response.page_images, diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 42c7e40489..b5c191e29a 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -674,7 +674,7 @@ impl Client for GrpcClient { blks: Vec, ) -> anyhow::Result<()> { let req = page_api::GetPageRequest { - request_id: req_id, + request_id: req_id.into(), request_class: page_api::GetPageClass::Normal, read_lsn: page_api::ReadLsn { request_lsn: req_lsn, @@ -694,7 +694,7 @@ impl Client for GrpcClient { "unexpected status code: {}", resp.status_code, ); - Ok((resp.request_id, resp.page_images)) + Ok((resp.request_id.id, resp.page_images)) } } @@ -740,7 +740,7 @@ impl Client for RichGrpcClient { blks: Vec, ) -> anyhow::Result<()> { let req = page_api::GetPageRequest { - request_id: req_id, + request_id: req_id.into(), request_class: page_api::GetPageClass::Normal, read_lsn: page_api::ReadLsn { request_lsn: req_lsn, @@ -761,6 +761,6 @@ impl Client for RichGrpcClient { async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec)> { let resp = self.requests.next().await.unwrap()?; - Ok((resp.request_id, resp.page_images)) + Ok((resp.request_id.id, resp.page_images)) } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ebb1addcdb..b2f6cd465d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3338,9 +3338,12 @@ impl GrpcPageServiceHandler { } /// Generates a PagestreamRequest header from a ReadLsn and request ID. - fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest { + fn make_hdr( + read_lsn: page_api::ReadLsn, + req_id: Option, + ) -> PagestreamRequest { PagestreamRequest { - reqid: req_id, + reqid: req_id.map(|r| r.id).unwrap_or_default(), request_lsn: read_lsn.request_lsn, not_modified_since: read_lsn .not_modified_since_lsn @@ -3450,7 +3453,7 @@ impl GrpcPageServiceHandler { batch.push(BatchedGetPageRequest { req: PagestreamGetPageRequest { - hdr: Self::make_hdr(req.read_lsn, req.request_id), + hdr: Self::make_hdr(req.read_lsn, Some(req.request_id)), rel: req.rel, blkno, }, @@ -3528,7 +3531,7 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(rel=%req.rel, lsn=%req.read_lsn); let req = PagestreamExistsRequest { - hdr: Self::make_hdr(req.read_lsn, 0), + hdr: Self::make_hdr(req.read_lsn, None), rel: req.rel, }; @@ -3678,7 +3681,7 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn); let req = PagestreamDbSizeRequest { - hdr: Self::make_hdr(req.read_lsn, 0), + hdr: Self::make_hdr(req.read_lsn, None), dbnode: req.db_oid, }; @@ -3728,7 +3731,7 @@ impl proto::PageService for GrpcPageServiceHandler { .await? .downgrade(); while let Some(req) = reqs.message().await? { - let req_id = req.request_id; + let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default(); let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) .instrument(span.clone()) // propagate request span .await; @@ -3767,7 +3770,7 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(rel=%req.rel, lsn=%req.read_lsn); let req = PagestreamNblocksRequest { - hdr: Self::make_hdr(req.read_lsn, 0), + hdr: Self::make_hdr(req.read_lsn, None), rel: req.rel, }; @@ -3800,7 +3803,7 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn); let req = PagestreamGetSlruSegmentRequest { - hdr: Self::make_hdr(req.read_lsn, 0), + hdr: Self::make_hdr(req.read_lsn, None), kind: req.kind as u8, segno: req.segno, };