pageserver: don't pass config to PageHandler (#11973)

## Problem

The gRPC page service API will require decoupling the `PageHandler` from
the libpq protocol implementation. As preparation for this, avoid
passing in the entire server config to `PageHandler`, and instead
explicitly pass in the relevant fields.

Touches https://github.com/neondatabase/neon/issues/11728.

## Summary of changes

* Change `PageHandler` to take a `GetVectoredConcurrentIo` instead of
the entire config.
* Change `IoConcurrency::spawn_from_conf` to take a
`GetVectoredConcurrentIo`.
This commit is contained in:
Erik Grinaker
2025-05-19 17:47:40 +02:00
committed by GitHub
parent 38dbc5f67f
commit f4150614d0
9 changed files with 22 additions and 21 deletions

View File

@@ -235,7 +235,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
ScatteredLsn,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited

View File

@@ -144,7 +144,7 @@ where
replica,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf,
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()

View File

@@ -3199,7 +3199,7 @@ async fn list_aux_files(
.await?;
let io_concurrency = IoConcurrency::spawn_from_conf(
state.conf,
state.conf.get_vectored_concurrent_io,
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
);

View File

@@ -18,7 +18,7 @@ use itertools::Itertools;
use jsonwebtoken::TokenData;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::rel_block_to_key;
@@ -331,10 +331,10 @@ async fn page_service_conn_main(
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(
conf,
tenant_manager,
auth,
pipelining_config,
conf.get_vectored_concurrent_io,
perf_span_fields,
connection_ctx,
cancel.clone(),
@@ -371,7 +371,6 @@ async fn page_service_conn_main(
}
struct PageServerHandler {
conf: &'static PageServerConf,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
@@ -389,6 +388,7 @@ struct PageServerHandler {
timeline_handles: Option<TimelineHandles>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
gate_guard: GateGuard,
}
@@ -844,17 +844,16 @@ impl BatchedFeMessage {
impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
perf_span_fields: ConnectionPerfSpanFields,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
) -> Self {
PageServerHandler {
conf,
auth,
claims: None,
connection_ctx,
@@ -862,6 +861,7 @@ impl PageServerHandler {
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
get_vectored_concurrent_io,
gate_guard,
}
}
@@ -1623,7 +1623,7 @@ impl PageServerHandler {
}
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.get_vectored_concurrent_io,
match self.gate_guard.try_clone() {
Ok(guard) => guard,
Err(_) => {

View File

@@ -586,7 +586,7 @@ impl Timeline {
// scan directory listing (new), merge with the old results
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -645,7 +645,7 @@ impl Timeline {
);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -885,7 +885,7 @@ impl Timeline {
);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,

View File

@@ -8596,8 +8596,10 @@ mod tests {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let io_concurrency =
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
let io_concurrency = IoConcurrency::spawn_from_conf(
tline.conf.get_vectored_concurrent_io,
tline.gate.enter().unwrap(),
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let mut res = tline

View File

@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
use super::PageReconstructError;
use super::layer_map::InMemoryLayerDesc;
use super::timeline::{GetVectoredError, ReadPath};
use crate::config::PageServerConf;
use crate::context::{
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
@@ -318,11 +318,10 @@ impl IoConcurrency {
}
pub(crate) fn spawn_from_conf(
conf: &'static PageServerConf,
conf: GetVectoredConcurrentIo,
gate_guard: GateGuard,
) -> IoConcurrency {
use pageserver_api::config::GetVectoredConcurrentIo;
let selected = match conf.get_vectored_concurrent_io {
let selected = match conf {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
};

View File

@@ -3530,7 +3530,7 @@ impl Timeline {
};
let io_concurrency = IoConcurrency::spawn_from_conf(
self_ref.conf,
self_ref.conf.get_vectored_concurrent_io,
self_ref
.gate
.enter()
@@ -5559,7 +5559,7 @@ impl Timeline {
});
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| CreateImageLayersError::Cancelled)?,

View File

@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
);
let io_concurrency = IoConcurrency::spawn_from_conf(
detached.conf,
detached.conf.get_vectored_concurrent_io,
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);