From a4c76740c062828709f540edb42b56828731f350 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 17 Jun 2025 08:41:17 -0700 Subject: [PATCH] pageserver: emit gRPC GetPage errors as responses (#12255) ## Problem When converting `proto::GetPageRequest` into `page_api::GetPageRequest` and validating the request, errors are returned as `tonic::Status`. This will tear down the GetPage stream, which is disruptive and unnecessary. ## Summary of changes Emit invalid request errors as `GetPageResponse` with an appropriate `status_code` instead. Also move the conversion from `tonic::Status` to `GetPageResponse` out into the stream handler. --- pageserver/page_api/src/model.rs | 66 ++++++++++++++++++++++++++ pageserver/src/page_service.rs | 80 ++++++-------------------------- 2 files changed, 81 insertions(+), 65 deletions(-) diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 799f48712f..a01bba0572 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -417,6 +417,39 @@ impl From for proto::GetPageResponse { } } +impl GetPageResponse { + /// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a + /// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a + /// GetPageResponse with a non-OK status code instead. + #[allow(clippy::result_large_err)] + pub fn try_from_status( + status: tonic::Status, + request_id: RequestID, + ) -> Result { + // We shouldn't see an OK status here, because we're emitting an error. + debug_assert_ne!(status.code(), tonic::Code::Ok); + if status.code() == tonic::Code::Ok { + return Err(tonic::Status::internal(format!( + "unexpected OK status: {status:?}", + ))); + } + + // If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request + // error and we should return a tonic::Status to terminate the stream. + let Ok(status_code) = status.code().try_into() else { + return Err(status); + }; + + // Return a GetPageResponse for the status. + Ok(Self { + request_id, + status_code, + reason: Some(status.message().to_string()), + page_images: Vec::new(), + }) + } +} + /// A GetPage response status code. /// /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream @@ -480,6 +513,39 @@ impl From for i32 { } } +impl TryFrom for GetPageStatusCode { + type Error = tonic::Code; + + fn try_from(code: tonic::Code) -> Result { + use tonic::Code; + + let status_code = match code { + Code::Ok => Self::Ok, + + // These are per-request errors, which should be returned as GetPageResponses. + Code::AlreadyExists => Self::InvalidRequest, + Code::DataLoss => Self::InternalError, + Code::FailedPrecondition => Self::InvalidRequest, + Code::InvalidArgument => Self::InvalidRequest, + Code::Internal => Self::InternalError, + Code::NotFound => Self::NotFound, + Code::OutOfRange => Self::InvalidRequest, + Code::ResourceExhausted => Self::SlowDown, + + // These should terminate the stream by returning a tonic::Status. + Code::Aborted + | Code::Cancelled + | Code::DeadlineExceeded + | Code::PermissionDenied + | Code::Unauthenticated + | Code::Unavailable + | Code::Unimplemented + | Code::Unknown => return Err(code), + }; + Ok(status_code) + } +} + // Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other // shards will error. #[derive(Clone, Copy, Debug)] diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0521f5c556..79c4c0faa9 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -623,60 +623,6 @@ enum PageStreamError { BadRequest(Cow<'static, str>), } -impl PageStreamError { - /// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status - /// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a - /// convenience method for use from a get_pages gRPC stream. - #[allow(clippy::result_large_err)] - fn into_get_page_response( - self, - request_id: page_api::RequestID, - ) -> Result { - use page_api::GetPageStatusCode; - use tonic::Code; - - // We dispatch to Into first, and then map it to a GetPageResponse. - let status: tonic::Status = self.into(); - let status_code = match status.code() { - // We shouldn't see an OK status here, because we're emitting an error. - Code::Ok => { - debug_assert_ne!(status.code(), Code::Ok); - return Err(tonic::Status::internal(format!( - "unexpected OK status: {status:?}", - ))); - } - - // These are per-request errors, returned as GetPageResponses. - Code::AlreadyExists => GetPageStatusCode::InvalidRequest, - Code::DataLoss => GetPageStatusCode::InternalError, - Code::FailedPrecondition => GetPageStatusCode::InvalidRequest, - Code::InvalidArgument => GetPageStatusCode::InvalidRequest, - Code::Internal => GetPageStatusCode::InternalError, - Code::NotFound => GetPageStatusCode::NotFound, - Code::OutOfRange => GetPageStatusCode::InvalidRequest, - Code::ResourceExhausted => GetPageStatusCode::SlowDown, - - // These should terminate the stream. - Code::Aborted => return Err(status), - Code::Cancelled => return Err(status), - Code::DeadlineExceeded => return Err(status), - Code::PermissionDenied => return Err(status), - Code::Unauthenticated => return Err(status), - Code::Unavailable => return Err(status), - Code::Unimplemented => return Err(status), - Code::Unknown => return Err(status), - }; - - Ok(page_api::GetPageResponse { - request_id, - status_code, - reason: Some(status.message().to_string()), - page_images: Vec::new(), - } - .into()) - } -} - impl From for tonic::Status { fn from(err: PageStreamError) -> Self { use tonic::Code; @@ -3438,8 +3384,8 @@ impl GrpcPageServiceHandler { /// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC. /// - /// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse - /// with an appropriate status code instead. + /// NB: errors returned from here are intercepted in get_pages(), and may be converted to a + /// GetPageResponse with an appropriate status code to avoid terminating the stream. /// /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or @@ -3456,7 +3402,7 @@ impl GrpcPageServiceHandler { let ctx = ctx.with_scope_page_service_pagestream(&timeline); // Validate the request, decorate the span, and convert it to a Pagestream request. - let req: page_api::GetPageRequest = req.try_into()?; + let req = page_api::GetPageRequest::try_from(req)?; span_record!( req_id = %req.request_id, @@ -3467,7 +3413,7 @@ impl GrpcPageServiceHandler { ); let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard - let effective_lsn = match PageServerHandler::effective_request_lsn( + let effective_lsn = PageServerHandler::effective_request_lsn( &timeline, timeline.get_last_record_lsn(), req.read_lsn.request_lsn, @@ -3475,10 +3421,7 @@ impl GrpcPageServiceHandler { .not_modified_since_lsn .unwrap_or(req.read_lsn.request_lsn), &latest_gc_cutoff_lsn, - ) { - Ok(lsn) => lsn, - Err(err) => return err.into_get_page_response(req.request_id), - }; + )?; let mut batch = SmallVec::with_capacity(req.block_numbers.len()); for blkno in req.block_numbers { @@ -3535,7 +3478,7 @@ impl GrpcPageServiceHandler { "unexpected response: {resp:?}" ))); } - Err(err) => return err.err.into_get_page_response(req.request_id), + Err(err) => return Err(err.err.into()), }; } @@ -3745,9 +3688,16 @@ impl proto::PageService for GrpcPageServiceHandler { .await? .downgrade(); while let Some(req) = reqs.message().await? { - yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) + let req_id = req.request_id; + let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) .instrument(span.clone()) // propagate request span - .await? + .await; + yield match result { + Ok(resp) => resp, + // Convert per-request errors to GetPageResponses as appropriate, or terminate + // the stream with a tonic::Status. + Err(err) => page_api::GetPageResponse::try_from_status(err, req_id)?.into(), + } } };