From c511786548c8f09048b09a33b0e560fe2e518a5f Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 6 Jun 2025 12:01:58 +0200 Subject: [PATCH] pageserver: move `spawn_grpc` to `GrpcPageServiceHandler::spawn` (#12147) Mechanical move, no logic changes. --- pageserver/src/bin/pageserver.rs | 3 +- pageserver/src/page_service.rs | 188 ++++++++++++++++--------------- 2 files changed, 97 insertions(+), 94 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index a1a95ad2d1..5cd865f53e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -23,6 +23,7 @@ use pageserver::deletion_queue::DeletionQueue; use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task}; use pageserver::feature_resolver::FeatureResolver; use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING}; +use pageserver::page_service::GrpcPageServiceHandler; use pageserver::task_mgr::{ BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME, }; @@ -814,7 +815,7 @@ fn start_pageserver( // necessary? let mut page_service_grpc = None; if let Some(grpc_listener) = grpc_listener { - page_service_grpc = Some(page_service::spawn_grpc( + page_service_grpc = Some(GrpcPageServiceHandler::spawn( tenant_manager.clone(), grpc_auth, otel_guard.as_ref().map(|g| g.dispatch.clone()), diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 4a1ddf09b5..d47f6bd095 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -169,99 +169,6 @@ pub fn spawn( Listener { cancel, task } } -/// Spawns a gRPC server for the page service. -/// -/// TODO: move this onto GrpcPageServiceHandler::spawn(). -/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we -/// need to reimplement the TCP+TLS accept loop ourselves. -pub fn spawn_grpc( - tenant_manager: Arc, - auth: Option>, - perf_trace_dispatch: Option, - get_vectored_concurrent_io: GetVectoredConcurrentIo, - listener: std::net::TcpListener, -) -> anyhow::Result { - let cancel = CancellationToken::new(); - let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler) - .download_behavior(DownloadBehavior::Download) - .perf_span_dispatch(perf_trace_dispatch) - .detached_child(); - let gate = Gate::default(); - - // Set up the TCP socket. We take a preconfigured TcpListener to bind the - // port early during startup. - let incoming = { - let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std - listener.set_nonblocking(true)?; - tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?) - .with_nodelay(Some(GRPC_TCP_NODELAY)) - .with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME)) - }; - - // Set up the gRPC server. - // - // TODO: consider tuning window sizes. - let mut server = tonic::transport::Server::builder() - .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL)) - .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT)) - .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS)); - - // Main page service stack. Uses a mix of Tonic interceptors and Tower layers: - // - // * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service. - // - // * Layers: allow async code, can run code after the service response. However, only has access - // to the raw HTTP request/response, not the gRPC types. - let page_service_handler = GrpcPageServiceHandler { - tenant_manager, - ctx, - gate_guard: gate.enter().expect("gate was just created"), - get_vectored_concurrent_io, - }; - - let observability_layer = ObservabilityLayer; - let mut tenant_interceptor = TenantMetadataInterceptor; - let mut auth_interceptor = TenantAuthInterceptor::new(auth); - - let page_service = tower::ServiceBuilder::new() - // Create tracing span and record request start time. - .layer(observability_layer) - // Intercept gRPC requests. - .layer(tonic::service::InterceptorLayer::new(move |mut req| { - // Extract tenant metadata. - req = tenant_interceptor.call(req)?; - // Authenticate tenant JWT token. - req = auth_interceptor.call(req)?; - Ok(req) - })) - .service(proto::PageServiceServer::new(page_service_handler)); - let server = server.add_service(page_service); - - // Reflection service for use with e.g. grpcurl. - let reflection_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET) - .build_v1()?; - let server = server.add_service(reflection_service); - - // Spawn server task. - let task_cancel = cancel.clone(); - let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( - "grpc listener", - async move { - let result = server - .serve_with_incoming_shutdown(incoming, task_cancel.cancelled()) - .await; - if result.is_ok() { - // TODO: revisit shutdown logic once page service is implemented. - gate.close().await; - } - result - }, - )); - - Ok(CancellableTask { task, cancel }) -} - impl Listener { pub async fn stop_accepting(self) -> Connections { self.cancel.cancel(); @@ -3366,6 +3273,101 @@ pub struct GrpcPageServiceHandler { } impl GrpcPageServiceHandler { + /// Spawns a gRPC server for the page service. + /// + /// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we + /// need to reimplement the TCP+TLS accept loop ourselves. + pub fn spawn( + tenant_manager: Arc, + auth: Option>, + perf_trace_dispatch: Option, + get_vectored_concurrent_io: GetVectoredConcurrentIo, + listener: std::net::TcpListener, + ) -> anyhow::Result { + let cancel = CancellationToken::new(); + let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler) + .download_behavior(DownloadBehavior::Download) + .perf_span_dispatch(perf_trace_dispatch) + .detached_child(); + let gate = Gate::default(); + + // Set up the TCP socket. We take a preconfigured TcpListener to bind the + // port early during startup. + let incoming = { + let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std + listener.set_nonblocking(true)?; + tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std( + listener, + )?) + .with_nodelay(Some(GRPC_TCP_NODELAY)) + .with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME)) + }; + + // Set up the gRPC server. + // + // TODO: consider tuning window sizes. + let mut server = tonic::transport::Server::builder() + .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL)) + .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT)) + .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS)); + + // Main page service stack. Uses a mix of Tonic interceptors and Tower layers: + // + // * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service. + // + // * Layers: allow async code, can run code after the service response. However, only has access + // to the raw HTTP request/response, not the gRPC types. + let page_service_handler = GrpcPageServiceHandler { + tenant_manager, + ctx, + gate_guard: gate.enter().expect("gate was just created"), + get_vectored_concurrent_io, + }; + + let observability_layer = ObservabilityLayer; + let mut tenant_interceptor = TenantMetadataInterceptor; + let mut auth_interceptor = TenantAuthInterceptor::new(auth); + + let page_service = tower::ServiceBuilder::new() + // Create tracing span and record request start time. + .layer(observability_layer) + // Intercept gRPC requests. + .layer(tonic::service::InterceptorLayer::new(move |mut req| { + // Extract tenant metadata. + req = tenant_interceptor.call(req)?; + // Authenticate tenant JWT token. + req = auth_interceptor.call(req)?; + Ok(req) + })) + // Run the page service. + .service(proto::PageServiceServer::new(page_service_handler)); + let server = server.add_service(page_service); + + // Reflection service for use with e.g. grpcurl. + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET) + .build_v1()?; + let server = server.add_service(reflection_service); + + // Spawn server task. + let task_cancel = cancel.clone(); + let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( + "grpc listener", + async move { + let result = server + .serve_with_incoming_shutdown(incoming, task_cancel.cancelled()) + .await; + if result.is_ok() { + // TODO: revisit shutdown logic once page service is implemented. + gate.close().await; + } + result + }, + )); + + Ok(CancellableTask { task, cancel }) + } + /// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of /// relations and their sizes, as well as SLRU segments and similar data. #[allow(clippy::result_large_err)]