diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 337aa135dc..be7e634d4c 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -819,6 +819,7 @@ fn start_pageserver( tenant_manager.clone(), grpc_auth, otel_guard.as_ref().map(|g| g.dispatch.clone()), + conf.get_vectored_concurrent_io, grpc_listener, )?); } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 77e5f0a92b..4a1ddf09b5 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -178,6 +178,7 @@ pub fn spawn_grpc( tenant_manager: Arc, auth: Option>, perf_trace_dispatch: Option, + get_vectored_concurrent_io: GetVectoredConcurrentIo, listener: std::net::TcpListener, ) -> anyhow::Result { let cancel = CancellationToken::new(); @@ -214,6 +215,8 @@ pub fn spawn_grpc( let page_service_handler = GrpcPageServiceHandler { tenant_manager, ctx, + gate_guard: gate.enter().expect("gate was just created"), + get_vectored_concurrent_io, }; let observability_layer = ObservabilityLayer; @@ -497,10 +500,6 @@ async fn page_service_conn_main( } /// Page service connection handler. -/// -/// TODO: for gRPC, this will be shared by all requests from all connections. -/// Decompose it into global state and per-connection/request state, and make -/// libpq-specific options (e.g. pipelining) separate. struct PageServerHandler { auth: Option>, claims: Option, @@ -3362,6 +3361,8 @@ where pub struct GrpcPageServiceHandler { tenant_manager: Arc, ctx: RequestContext, + gate_guard: GateGuard, + get_vectored_concurrent_io: GetVectoredConcurrentIo, } impl GrpcPageServiceHandler { @@ -3721,6 +3722,14 @@ impl proto::PageService for GrpcPageServiceHandler { .get(ttid.tenant_id, ttid.timeline_id, shard_selector) .await?; + // Spawn an IoConcurrency sidecar, if enabled. + let Ok(gate_guard) = self.gate_guard.try_clone() else { + return Err(tonic::Status::unavailable("shutting down")); + }; + let io_concurrency = + IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard); + + // Spawn a task to handle the GetPageRequest stream. let span = Span::current(); let ctx = self.ctx.attached_child(); let mut reqs = req.into_inner(); @@ -3731,8 +3740,7 @@ impl proto::PageService for GrpcPageServiceHandler { .await? .downgrade(); while let Some(req) = reqs.message().await? { - // TODO: implement IoConcurrency sidecar. - yield Self::get_page(&ctx, &timeline, req, IoConcurrency::Sequential) + yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone()) .instrument(span.clone()) // propagate request span .await? }