mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 05:52:55 +00:00
Close gRPC getpage streams on shutdown
Some tests were failing, because pageserver didn't shut down promptly. Tonic server graceful shutdown was a little too graceful; any open streams linger until they're closed. Check the cancellation token while waiting for next request, and close the stream if shutdown/cancellation was requested.
This commit is contained in:
@@ -3170,6 +3170,7 @@ where
|
||||
pub struct GrpcPageServiceHandler {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
}
|
||||
@@ -3222,6 +3223,7 @@ impl GrpcPageServiceHandler {
|
||||
let page_service_handler = GrpcPageServiceHandler {
|
||||
tenant_manager,
|
||||
ctx,
|
||||
cancel: cancel.clone(),
|
||||
gate_guard: gate.enter().expect("gate was just created"),
|
||||
get_vectored_concurrent_io,
|
||||
};
|
||||
@@ -3672,6 +3674,7 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
// Spawn a task to handle the GetPageRequest stream.
|
||||
let span = Span::current();
|
||||
let ctx = self.ctx.attached_child();
|
||||
let cancel = self.cancel.clone();
|
||||
let mut reqs = req.into_inner();
|
||||
|
||||
let resps = async_stream::try_stream! {
|
||||
@@ -3679,7 +3682,20 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
|
||||
.await?
|
||||
.downgrade();
|
||||
while let Some(req) = reqs.message().await? {
|
||||
|
||||
loop {
|
||||
let req = tokio::select! {
|
||||
req = reqs.message() => req,
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("closing getpages stream due to shutdown");
|
||||
break;
|
||||
},
|
||||
};
|
||||
let req = if let Some(req) = req? {
|
||||
req
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
let req_id = req.request_id;
|
||||
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
|
||||
.instrument(span.clone()) // propagate request span
|
||||
|
||||
Reference in New Issue
Block a user