diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6c371cfef6..23146ac40e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3218,14 +3218,25 @@ where pub struct GrpcPageServiceHandler { tenant_manager: Arc, ctx: RequestContext, + + /// Cancelled to shut down the server. Tonic will shut down in response to this, but wait for + /// in-flight requests to complete. Any tasks we spawn ourselves must respect this token. cancel: CancellationToken, + + /// Any tasks we spawn ourselves should clone this gate guard, so that we can wait for them to + /// complete during shutdown. Request handlers implicitly hold this guard already. gate_guard: GateGuard, + + /// `get_vectored` concurrency setting. get_vectored_concurrent_io: GetVectoredConcurrentIo, } impl GrpcPageServiceHandler { /// Spawns a gRPC server for the page service. /// + /// Returns a `CancellableTask` handle that can be used to shut down the server. It waits for + /// any in-flight requests and tasks to complete first. + /// /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we /// need to reimplement the TCP+TLS accept loop ourselves. pub fn spawn( @@ -3235,12 +3246,15 @@ impl GrpcPageServiceHandler { get_vectored_concurrent_io: GetVectoredConcurrentIo, listener: std::net::TcpListener, ) -> anyhow::Result { + // Set up a cancellation token for shutting down the server, and a gate to wait for all + // requests and spawned tasks to complete. let cancel = CancellationToken::new(); + let gate = Gate::default(); + let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler) .download_behavior(DownloadBehavior::Download) .perf_span_dispatch(perf_trace_dispatch) .detached_child(); - let gate = Gate::default(); // Set up the TCP socket. We take a preconfigured TcpListener to bind the // port early during startup. @@ -3308,19 +3322,20 @@ impl GrpcPageServiceHandler { .build_v1()?; let server = server.add_service(reflection_service); - // Spawn server task. + // Spawn server task. It runs until the cancellation token fires and in-flight requests and + // tasks complete. The `CancellableTask` will wait for the task's join handle, which + // implicitly waits for the gate to close. let task_cancel = cancel.clone(); let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( - "grpc listener", + "grpc pageservice listener", async move { - let result = server + server .serve_with_incoming_shutdown(incoming, task_cancel.cancelled()) - .await; - if result.is_ok() { - // TODO: revisit shutdown logic once page service is implemented. - gate.close().await; - } - result + .await?; + // Server exited cleanly. All requests should have completed by now. Wait for any + // spawned tasks to complete as well (e.g. IoConcurrency sidecars) via the gate. + gate.close().await; + anyhow::Ok(()) }, )); @@ -3408,8 +3423,6 @@ impl GrpcPageServiceHandler { /// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send /// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or /// split them up in the client or server. - /// - /// TODO: verify that the given keys belong to this shard. #[instrument(skip_all, fields(req_id, rel, blkno, blks, req_lsn, mod_lsn))] async fn get_page( ctx: &RequestContext, @@ -3512,7 +3525,10 @@ impl GrpcPageServiceHandler { /// Implements the gRPC page service. /// -/// TODO: cancellation. +/// Tonic will drop the request handler futures if the client goes away (e.g. due to a timeout or +/// cancellation), so the read path must be cancellation-safe. On shutdown, Tonic will wait for +/// in-flight requests to complete. +/// /// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code. #[tonic::async_trait] impl proto::PageService for GrpcPageServiceHandler { @@ -3597,8 +3613,14 @@ impl proto::PageService for GrpcPageServiceHandler { // Spawn a task to run the basebackup. let span = Span::current(); + let gate_guard = self + .gate_guard + .try_clone() + .map_err(|_| tonic::Status::unavailable("shutting down"))?; let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE); let jh = tokio::spawn(async move { + let _gate_guard = gate_guard; // keep gate open until task completes + let gzip_level = match req.compression { page_api::BaseBackupCompression::None => None, // NB: using fast compression because it's on the critical path for compute @@ -3722,13 +3744,14 @@ impl proto::PageService for GrpcPageServiceHandler { .await?; // Spawn an IoConcurrency sidecar, if enabled. - let Ok(gate_guard) = self.gate_guard.try_clone() else { - return Err(tonic::Status::unavailable("shutting down")); - }; + let gate_guard = self + .gate_guard + .try_clone() + .map_err(|_| tonic::Status::unavailable("shutting down"))?; let io_concurrency = IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard); - // Spawn a task to handle the GetPageRequest stream. + // Construct the GetPageRequest stream handler. let span = Span::current(); let ctx = self.ctx.attached_child(); let cancel = self.cancel.clone(); @@ -3739,18 +3762,18 @@ impl proto::PageService for GrpcPageServiceHandler { .get(ttid.tenant_id, ttid.timeline_id, shard_selector) .await? .downgrade(); - loop { + // NB: Tonic considers the entire stream to be an in-flight request and will wait + // for it to complete before shutting down. React to cancellation between requests. let req = tokio::select! { - req = reqs.message() => req, - _ = cancel.cancelled() => { - tracing::info!("closing getpages stream due to shutdown"); - break; + result = reqs.message() => match result { + Ok(Some(req)) => Ok(req), + Ok(None) => break, // client closed the stream + Err(err) => Err(err), }, - }; - let Some(req) = req? else { break }; + _ = cancel.cancelled() => Err(tonic::Status::unavailable("shutting down")), + }?; let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default(); - let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) .instrument(span.clone()) // propagate request span .await; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 08828ec4eb..17401dbae8 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -286,6 +286,10 @@ impl Timeline { /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages. /// /// The ordering of the returned vec corresponds to the ordering of `pages`. + /// + /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future + /// if the client goes away (e.g. due to timeout or cancellation). + /// TODO: verify that it actually is cancellation-safe. pub(crate) async fn get_rel_page_at_lsn_batched( &self, pages: impl ExactSizeIterator, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8f25555929..06e02a7386 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1324,6 +1324,9 @@ impl Timeline { /// /// This naive implementation will be replaced with a more efficient one /// which actually vectorizes the read path. + /// + /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future + /// if the client goes away (e.g. due to timeout or cancellation). pub(crate) async fn get_vectored( &self, query: VersionedKeySpaceQuery,