From 17ec37aab2e708b005d3bb6551c1098763e27afd Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 5 Jul 2025 14:08:28 +0300 Subject: [PATCH] Close gRPC getpage streams on shutdown Some tests were failing, because pageserver didn't shut down promptly. Tonic server graceful shutdown was a little too graceful; any open streams linger until they're closed. Check the cancellation token while waiting for next request, and close the stream if shutdown/cancellation was requested. --- pageserver/src/page_service.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 586f03b19c..ad50d32dc8 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3170,6 +3170,7 @@ where pub struct GrpcPageServiceHandler { tenant_manager: Arc, ctx: RequestContext, + cancel: CancellationToken, gate_guard: GateGuard, get_vectored_concurrent_io: GetVectoredConcurrentIo, } @@ -3222,6 +3223,7 @@ impl GrpcPageServiceHandler { let page_service_handler = GrpcPageServiceHandler { tenant_manager, ctx, + cancel: cancel.clone(), gate_guard: gate.enter().expect("gate was just created"), get_vectored_concurrent_io, }; @@ -3672,6 +3674,7 @@ impl proto::PageService for GrpcPageServiceHandler { // Spawn a task to handle the GetPageRequest stream. let span = Span::current(); let ctx = self.ctx.attached_child(); + let cancel = self.cancel.clone(); let mut reqs = req.into_inner(); let resps = async_stream::try_stream! { @@ -3679,7 +3682,20 @@ impl proto::PageService for GrpcPageServiceHandler { .get(ttid.tenant_id, ttid.timeline_id, shard_selector) .await? .downgrade(); - while let Some(req) = reqs.message().await? { + + loop { + let req = tokio::select! { + req = reqs.message() => req, + _ = cancel.cancelled() => { + tracing::info!("closing getpages stream due to shutdown"); + break; + }, + }; + let req = if let Some(req) = req? { + req + } else { + break; + }; let req_id = req.request_id; let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) .instrument(span.clone()) // propagate request span