From a3d1cf636b8c6e6f50d3aaf35059529093dc3772 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 22 Nov 2024 08:36:17 +0100 Subject: [PATCH] config changes to express pipelining config (not respected yet) --- libs/pageserver_api/src/config.rs | 14 ++++++++------ pageserver/src/config.rs | 6 +++--- pageserver/src/page_service.rs | 16 ++++++++++------ 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index ee20613d6d..29b59b8664 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -109,8 +109,7 @@ pub struct ConfigToml { pub virtual_file_io_mode: Option, #[serde(skip_serializing_if = "Option::is_none")] pub no_sync: Option, - #[serde(with = "humantime_serde")] - pub server_side_batch_timeout: Option, + pub page_service_pipelining: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -127,6 +126,12 @@ pub struct DiskUsageEvictionTaskConfig { pub eviction_order: EvictionOrder, } +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct PageServicePipeliningConfig { + pub max_batch_size: usize, +} + pub mod statvfs { pub mod mock { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -319,8 +324,6 @@ pub mod defaults { pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512; - - pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None; } impl Default for ConfigToml { @@ -401,10 +404,9 @@ impl Default for ConfigToml { ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB), l0_flush: None, virtual_file_io_mode: None, - server_side_batch_timeout: DEFAULT_SERVER_SIDE_BATCH_TIMEOUT - .map(|duration| humantime::parse_duration(duration).unwrap()), tenant_config: TenantConfigToml::default(), no_sync: None, + page_service_pipelining: None, } } } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f7be6ecaab..599fd32ce0 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -185,7 +185,7 @@ pub struct PageServerConf { /// Maximum amount of time for which a get page request request /// might be held up for request merging. - pub server_side_batch_timeout: Option, + pub page_service_pipelining: Option, } /// Token for authentication to safekeepers @@ -340,9 +340,9 @@ impl PageServerConf { concurrent_tenant_warmup, concurrent_tenant_size_logical_size_queries, virtual_file_io_engine, - server_side_batch_timeout, tenant_config, no_sync, + page_service_pipelining, } = config_toml; let mut conf = PageServerConf { @@ -382,7 +382,7 @@ impl PageServerConf { image_compression, timeline_offloading, ephemeral_bytes_per_memory_kb, - server_side_batch_timeout, + page_service_pipelining, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ef7d9de752..7dfd3888a1 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -7,6 +7,7 @@ use bytes::Buf; use futures::FutureExt; use itertools::Itertools; use once_cell::sync::OnceCell; +use pageserver_api::config::PageServicePipeliningConfig; use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -107,7 +108,7 @@ pub fn spawn( pg_auth, tcp_listener, conf.pg_auth_type, - conf.server_side_batch_timeout, + conf.page_service_pipelining.clone(), libpq_ctx, cancel.clone(), ) @@ -156,7 +157,7 @@ pub async fn libpq_listener_main( auth: Option>, listener: tokio::net::TcpListener, auth_type: AuthType, - server_side_batch_timeout: Option, + pipelining_config: Option, listener_ctx: RequestContext, listener_cancel: CancellationToken, ) -> Connections { @@ -187,7 +188,7 @@ pub async fn libpq_listener_main( local_auth, socket, auth_type, - server_side_batch_timeout, + pipelining_config.clone(), connection_ctx, connections_cancel.child_token(), )); @@ -215,7 +216,7 @@ async fn page_service_conn_main( auth: Option>, socket: tokio::net::TcpStream, auth_type: AuthType, - server_side_batch_timeout: Option, + pipelining_config: Option, connection_ctx: RequestContext, cancel: CancellationToken, ) -> ConnectionHandlerResult { @@ -269,7 +270,7 @@ async fn page_service_conn_main( let mut conn_handler = PageServerHandler::new( tenant_manager, auth, - server_side_batch_timeout, + pipelining_config, connection_ctx, cancel.clone(), ); @@ -316,6 +317,8 @@ struct PageServerHandler { /// None only while pagestream protocol is being processed. timeline_handles: Option, + + pipelining_config: Option, } struct TimelineHandles { @@ -566,7 +569,7 @@ impl PageServerHandler { pub fn new( tenant_manager: Arc, auth: Option>, - server_side_batch_timeout: Option, + pipelining_config: Option, connection_ctx: RequestContext, cancel: CancellationToken, ) -> Self { @@ -576,6 +579,7 @@ impl PageServerHandler { connection_ctx, timeline_handles: Some(TimelineHandles::new(tenant_manager)), cancel, + pipelining_config, } }