diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1fc7e4eac7..2b266c6811 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -3218,13 +3218,25 @@ where pub struct GrpcPageServiceHandler { tenant_manager: Arc, ctx: RequestContext, + + /// Cancelled to shut down the server. Tonic will shut down in response to this, but wait for + /// in-flight requests to complete. Any tasks we spawn ourselves must respect this token. + cancel: CancellationToken, + + /// Any tasks we spawn ourselves should clone this gate guard, so that we can wait for them to + /// complete during shutdown. Request handlers implicitly hold this guard already. gate_guard: GateGuard, + + /// `get_vectored` concurrency setting. get_vectored_concurrent_io: GetVectoredConcurrentIo, } impl GrpcPageServiceHandler { /// Spawns a gRPC server for the page service. /// + /// Returns a `CancellableTask` handle that can be used to shut down the server. It waits for + /// any in-flight requests and tasks to complete first. + /// /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we /// need to reimplement the TCP+TLS accept loop ourselves. pub fn spawn( @@ -3234,12 +3246,15 @@ impl GrpcPageServiceHandler { get_vectored_concurrent_io: GetVectoredConcurrentIo, listener: std::net::TcpListener, ) -> anyhow::Result { + // Set up a cancellation token for shutting down the server, and a gate to wait for all + // requests and spawned tasks to complete. let cancel = CancellationToken::new(); + let gate = Gate::default(); + let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler) .download_behavior(DownloadBehavior::Download) .perf_span_dispatch(perf_trace_dispatch) .detached_child(); - let gate = Gate::default(); // Set up the TCP socket. We take a preconfigured TcpListener to bind the // port early during startup. @@ -3270,6 +3285,7 @@ impl GrpcPageServiceHandler { let page_service_handler = GrpcPageServiceHandler { tenant_manager, ctx, + cancel: cancel.clone(), gate_guard: gate.enter().expect("gate was just created"), get_vectored_concurrent_io, }; @@ -3306,19 +3322,20 @@ impl GrpcPageServiceHandler { .build_v1()?; let server = server.add_service(reflection_service); - // Spawn server task. + // Spawn server task. It runs until the cancellation token fires and in-flight requests and + // tasks complete. The `CancellableTask` will wait for the task's join handle, which + // implicitly waits for the gate to close. let task_cancel = cancel.clone(); let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( - "grpc listener", + "grpc pageservice listener", async move { - let result = server + server .serve_with_incoming_shutdown(incoming, task_cancel.cancelled()) - .await; - if result.is_ok() { - // TODO: revisit shutdown logic once page service is implemented. - gate.close().await; - } - result + .await?; + // Server exited cleanly. All requests should have completed by now. Wait for any + // spawned tasks to complete as well (e.g. IoConcurrency sidecars) via the gate. + gate.close().await; + anyhow::Ok(()) }, )); @@ -3508,7 +3525,10 @@ impl GrpcPageServiceHandler { /// Implements the gRPC page service. /// -/// TODO: cancellation. +/// On client disconnect (e.g. timeout or client shutdown), Tonic will drop the request handler +/// futures, so the read path must be cancellation-safe. On server shutdown, Tonic will wait for +/// in-flight requests to complete. +/// /// TODO: when the libpq impl is removed, remove the Pagestream types and inline the handler code. #[tonic::async_trait] impl proto::PageService for GrpcPageServiceHandler { @@ -3593,8 +3613,14 @@ impl proto::PageService for GrpcPageServiceHandler { // Spawn a task to run the basebackup. let span = Span::current(); + let gate_guard = self + .gate_guard + .try_clone() + .map_err(|_| tonic::Status::unavailable("shutting down"))?; let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE); let jh = tokio::spawn(async move { + let _gate_guard = gate_guard; // keep gate open until task completes + let gzip_level = match req.compression { page_api::BaseBackupCompression::None => None, // NB: using fast compression because it's on the critical path for compute @@ -3718,15 +3744,17 @@ impl proto::PageService for GrpcPageServiceHandler { .await?; // Spawn an IoConcurrency sidecar, if enabled. - let Ok(gate_guard) = self.gate_guard.try_clone() else { - return Err(tonic::Status::unavailable("shutting down")); - }; + let gate_guard = self + .gate_guard + .try_clone() + .map_err(|_| tonic::Status::unavailable("shutting down"))?; let io_concurrency = IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard); - // Spawn a task to handle the GetPageRequest stream. + // Construct the GetPageRequest stream handler. let span = Span::current(); let ctx = self.ctx.attached_child(); + let cancel = self.cancel.clone(); let mut reqs = req.into_inner(); let resps = async_stream::try_stream! { @@ -3734,7 +3762,19 @@ impl proto::PageService for GrpcPageServiceHandler { .get(ttid.tenant_id, ttid.timeline_id, shard_selector) .await? .downgrade(); - while let Some(req) = reqs.message().await? { + loop { + // NB: Tonic considers the entire stream to be an in-flight request and will wait + // for it to complete before shutting down. React to cancellation between requests. + let req = tokio::select! { + biased; + _ = cancel.cancelled() => Err(tonic::Status::unavailable("shutting down")), + + result = reqs.message() => match result { + Ok(Some(req)) => Ok(req), + Ok(None) => break, // client closed the stream + Err(err) => Err(err), + }, + }?; let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default(); let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) .instrument(span.clone()) // propagate request span diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index cda08f2cc4..8b76d980fc 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -286,6 +286,10 @@ impl Timeline { /// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages. /// /// The ordering of the returned vec corresponds to the ordering of `pages`. + /// + /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future + /// if the client goes away (e.g. due to timeout or cancellation). + /// TODO: verify that it actually is cancellation-safe. pub(crate) async fn get_rel_page_at_lsn_batched( &self, pages: impl ExactSizeIterator, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8f25555929..06e02a7386 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1324,6 +1324,9 @@ impl Timeline { /// /// This naive implementation will be replaced with a more efficient one /// which actually vectorizes the read path. + /// + /// NB: the read path must be cancellation-safe. The Tonic gRPC service will drop the future + /// if the client goes away (e.g. due to timeout or cancellation). pub(crate) async fn get_vectored( &self, query: VersionedKeySpaceQuery,