config changes to express pipelining config (not respected yet)

This commit is contained in:
Christian Schwarz
2024-11-22 08:36:17 +01:00
parent 89d9d16130
commit a3d1cf636b
3 changed files with 21 additions and 15 deletions

View File

@@ -109,8 +109,7 @@ pub struct ConfigToml {
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
#[serde(skip_serializing_if = "Option::is_none")]
pub no_sync: Option<bool>,
#[serde(with = "humantime_serde")]
pub server_side_batch_timeout: Option<Duration>,
pub page_service_pipelining: Option<PageServicePipeliningConfig>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -127,6 +126,12 @@ pub struct DiskUsageEvictionTaskConfig {
pub eviction_order: EvictionOrder,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfig {
pub max_batch_size: usize,
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -319,8 +324,6 @@ pub mod defaults {
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None;
}
impl Default for ConfigToml {
@@ -401,10 +404,9 @@ impl Default for ConfigToml {
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_io_mode: None,
server_side_batch_timeout: DEFAULT_SERVER_SIDE_BATCH_TIMEOUT
.map(|duration| humantime::parse_duration(duration).unwrap()),
tenant_config: TenantConfigToml::default(),
no_sync: None,
page_service_pipelining: None,
}
}
}

View File

@@ -185,7 +185,7 @@ pub struct PageServerConf {
/// Maximum amount of time for which a get page request request
/// might be held up for request merging.
pub server_side_batch_timeout: Option<Duration>,
pub page_service_pipelining: Option<pageserver_api::config::PageServicePipeliningConfig>,
}
/// Token for authentication to safekeepers
@@ -340,9 +340,9 @@ impl PageServerConf {
concurrent_tenant_warmup,
concurrent_tenant_size_logical_size_queries,
virtual_file_io_engine,
server_side_batch_timeout,
tenant_config,
no_sync,
page_service_pipelining,
} = config_toml;
let mut conf = PageServerConf {
@@ -382,7 +382,7 @@ impl PageServerConf {
image_compression,
timeline_offloading,
ephemeral_bytes_per_memory_kb,
server_side_batch_timeout,
page_service_pipelining,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -7,6 +7,7 @@ use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::PageServicePipeliningConfig;
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -107,7 +108,7 @@ pub fn spawn(
pg_auth,
tcp_listener,
conf.pg_auth_type,
conf.server_side_batch_timeout,
conf.page_service_pipelining.clone(),
libpq_ctx,
cancel.clone(),
)
@@ -156,7 +157,7 @@ pub async fn libpq_listener_main(
auth: Option<Arc<SwappableJwtAuth>>,
listener: tokio::net::TcpListener,
auth_type: AuthType,
server_side_batch_timeout: Option<Duration>,
pipelining_config: Option<PageServicePipeliningConfig>,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
) -> Connections {
@@ -187,7 +188,7 @@ pub async fn libpq_listener_main(
local_auth,
socket,
auth_type,
server_side_batch_timeout,
pipelining_config.clone(),
connection_ctx,
connections_cancel.child_token(),
));
@@ -215,7 +216,7 @@ async fn page_service_conn_main(
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
server_side_batch_timeout: Option<Duration>,
pipelining_config: Option<PageServicePipeliningConfig>,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> ConnectionHandlerResult {
@@ -269,7 +270,7 @@ async fn page_service_conn_main(
let mut conn_handler = PageServerHandler::new(
tenant_manager,
auth,
server_side_batch_timeout,
pipelining_config,
connection_ctx,
cancel.clone(),
);
@@ -316,6 +317,8 @@ struct PageServerHandler {
/// None only while pagestream protocol is being processed.
timeline_handles: Option<TimelineHandles>,
pipelining_config: Option<PageServicePipeliningConfig>,
}
struct TimelineHandles {
@@ -566,7 +569,7 @@ impl PageServerHandler {
pub fn new(
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
server_side_batch_timeout: Option<Duration>,
pipelining_config: Option<PageServicePipeliningConfig>,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> Self {
@@ -576,6 +579,7 @@ impl PageServerHandler {
connection_ctx,
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
}
}