pageserver: improve gRPC cancellation

This commit is contained in:
Erik Grinaker
2025-07-17 12:34:46 +02:00
parent edcdd6ca9c
commit f765bd3677
3 changed files with 55 additions and 25 deletions

View File

@@ -3218,14 +3218,25 @@ where
pub struct GrpcPageServiceHandler {
/// Shared handle to the tenant manager.
/// NOTE(review): presumably used to resolve the tenant/timeline/shard that a request targets;
/// the lookup path is not visible in this view -- confirm against the handlers.
tenant_manager: Arc<TenantManager>,
/// Base request context for the handler. Request handlers derive their own context from it
/// via `attached_child()` (e.g. the GetPage stream handler).
ctx: RequestContext,
/// Cancelled to shut down the server. Tonic will shut down in response to this, but wait for
/// in-flight requests to complete. Any tasks we spawn ourselves must respect this token.
/// Long-lived streams (e.g. the GetPage stream) also select on it between requests, since
/// Tonic treats the whole stream as a single in-flight request.
cancel: CancellationToken,
/// Any tasks we spawn ourselves should clone this gate guard, so that we can wait for them to
/// complete during shutdown. Request handlers implicitly hold this guard already.
/// If `try_clone()` fails, handlers reject the request with an `unavailable` ("shutting down")
/// status.
gate_guard: GateGuard,
/// `get_vectored` concurrency setting. Passed to `IoConcurrency::spawn_from_conf` when setting
/// up the IoConcurrency sidecar for a GetPage stream.
get_vectored_concurrent_io: GetVectoredConcurrentIo,
}
impl GrpcPageServiceHandler {
/// Spawns a gRPC server for the page service.
///
/// Returns a `CancellableTask` handle that can be used to shut down the server. It waits for
/// any in-flight requests and tasks to complete first.
///
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn(
@@ -3235,12 +3246,15 @@ impl GrpcPageServiceHandler {
get_vectored_concurrent_io: GetVectoredConcurrentIo,
listener: std::net::TcpListener,
) -> anyhow::Result<CancellableTask> {
// Set up a cancellation token for shutting down the server, and a gate to wait for all
// requests and spawned tasks to complete.
let cancel = CancellationToken::new();
let gate = Gate::default();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch)
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
@@ -3308,19 +3322,20 @@ impl GrpcPageServiceHandler {
.build_v1()?;
let server = server.add_service(reflection_service);
// Spawn server task.
// Spawn server task. It runs until the cancellation token fires and in-flight requests and
// tasks complete. The `CancellableTask` will wait for the task's join handle, which
// implicitly waits for the gate to close.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
"grpc pageservice listener",
async move {
let result = server
server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
.await?;
// Server exited cleanly. All requests should have completed by now. Wait for any
// spawned tasks to complete as well (e.g. IoConcurrency sidecars) via the gate.
gate.close().await;
anyhow::Ok(())
},
));
@@ -3408,8 +3423,6 @@ impl GrpcPageServiceHandler {
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
/// split them up in the client or server.
///
/// TODO: verify that the given keys belong to this shard.
#[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))]
async fn get_page(
ctx: &RequestContext,
@@ -3512,7 +3525,10 @@ impl GrpcPageServiceHandler {
/// Implements the gRPC page service.
///
/// TODO: cancellation.
/// Tonic will drop the request handler futures if the client goes away (e.g. due to a timeout or
/// cancellation), so the read path must be cancellation-safe. On shutdown, Tonic will wait for
/// in-flight requests to complete.
///
/// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code.
#[tonic::async_trait]
impl proto::PageService for GrpcPageServiceHandler {
@@ -3597,8 +3613,14 @@ impl proto::PageService for GrpcPageServiceHandler {
// Spawn a task to run the basebackup.
let span = Span::current();
let gate_guard = self
.gate_guard
.try_clone()
.map_err(|_| tonic::Status::unavailable("shutting down"))?;
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
let jh = tokio::spawn(async move {
let _gate_guard = gate_guard; // keep gate open until task completes
let gzip_level = match req.compression {
page_api::BaseBackupCompression::None => None,
// NB: using fast compression because it's on the critical path for compute
@@ -3722,13 +3744,14 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?;
// Spawn an IoConcurrency sidecar, if enabled.
let Ok(gate_guard) = self.gate_guard.try_clone() else {
return Err(tonic::Status::unavailable("shutting down"));
};
let gate_guard = self
.gate_guard
.try_clone()
.map_err(|_| tonic::Status::unavailable("shutting down"))?;
let io_concurrency =
IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);
// Spawn a task to handle the GetPageRequest stream.
// Construct the GetPageRequest stream handler.
let span = Span::current();
let ctx = self.ctx.attached_child();
let cancel = self.cancel.clone();
@@ -3739,18 +3762,18 @@ impl proto::PageService for GrpcPageServiceHandler {
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
.await?
.downgrade();
loop {
// NB: Tonic considers the entire stream to be an in-flight request and will wait
// for it to complete before shutting down. React to cancellation between requests.
let req = tokio::select! {
req = reqs.message() => req,
_ = cancel.cancelled() => {
tracing::info!("closing getpages stream due to shutdown");
break;
result = reqs.message() => match result {
Ok(Some(req)) => Ok(req),
Ok(None) => break, // client closed the stream
Err(err) => Err(err),
},
};
let Some(req) = req? else { break };
_ = cancel.cancelled() => Err(tonic::Status::unavailable("shutting down")),
}?;
let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
.instrument(span.clone()) // propagate request span
.await;

View File

@@ -286,6 +286,10 @@ impl Timeline {
/// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
///
/// The ordering of the returned vec corresponds to the ordering of `pages`.
///
/// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
/// if the client goes away (e.g. due to timeout or cancellation).
/// TODO: verify that it actually is cancellation-safe.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,

View File

@@ -1324,6 +1324,9 @@ impl Timeline {
///
/// This naive implementation will be replaced with a more efficient one
/// which actually vectorizes the read path.
///
/// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future
/// if the client goes away (e.g. due to timeout or cancellation).
pub(crate) async fn get_vectored(
&self,
query: VersionedKeySpaceQuery,