mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
pageserver: support get_vectored_concurrent_io with gRPC (#12131)
## Problem The gRPC page service doesn't respect `get_vectored_concurrent_io` and always uses sequential IO. ## Summary of changes Spawn a sidecar task for concurrent IO when enabled. Cancellation will be addressed separately.
This commit is contained in:
@@ -819,6 +819,7 @@ fn start_pageserver(
|
||||
tenant_manager.clone(),
|
||||
grpc_auth,
|
||||
otel_guard.as_ref().map(|g| g.dispatch.clone()),
|
||||
conf.get_vectored_concurrent_io,
|
||||
grpc_listener,
|
||||
)?);
|
||||
}
|
||||
|
||||
@@ -178,6 +178,7 @@ pub fn spawn_grpc(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
listener: std::net::TcpListener,
|
||||
) -> anyhow::Result<CancellableTask> {
|
||||
let cancel = CancellationToken::new();
|
||||
@@ -214,6 +215,8 @@ pub fn spawn_grpc(
|
||||
let page_service_handler = GrpcPageServiceHandler {
|
||||
tenant_manager,
|
||||
ctx,
|
||||
gate_guard: gate.enter().expect("gate was just created"),
|
||||
get_vectored_concurrent_io,
|
||||
};
|
||||
|
||||
let observability_layer = ObservabilityLayer;
|
||||
@@ -497,10 +500,6 @@ async fn page_service_conn_main(
|
||||
}
|
||||
|
||||
/// Page service connection handler.
|
||||
///
|
||||
/// TODO: for gRPC, this will be shared by all requests from all connections.
|
||||
/// Decompose it into global state and per-connection/request state, and make
|
||||
/// libpq-specific options (e.g. pipelining) separate.
|
||||
struct PageServerHandler {
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
@@ -3362,6 +3361,8 @@ where
|
||||
pub struct GrpcPageServiceHandler {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
ctx: RequestContext,
|
||||
gate_guard: GateGuard,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
}
|
||||
|
||||
impl GrpcPageServiceHandler {
|
||||
@@ -3721,6 +3722,14 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
|
||||
.await?;
|
||||
|
||||
// Spawn an IoConcurrency sidecar, if enabled.
|
||||
let Ok(gate_guard) = self.gate_guard.try_clone() else {
|
||||
return Err(tonic::Status::unavailable("shutting down"));
|
||||
};
|
||||
let io_concurrency =
|
||||
IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);
|
||||
|
||||
// Spawn a task to handle the GetPageRequest stream.
|
||||
let span = Span::current();
|
||||
let ctx = self.ctx.attached_child();
|
||||
let mut reqs = req.into_inner();
|
||||
@@ -3731,8 +3740,7 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
.await?
|
||||
.downgrade();
|
||||
while let Some(req) = reqs.message().await? {
|
||||
// TODO: implement IoConcurrency sidecar.
|
||||
yield Self::get_page(&ctx, &timeline, req, IoConcurrency::Sequential)
|
||||
yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
|
||||
.instrument(span.clone()) // propagate request span
|
||||
.await?
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user