The `None` pipelining configuration in the benchmark would fall back to the default
configuration instead of the serial configuration; fix that by making the serial mode
an explicit variant.
This commit is contained in:
Christian Schwarz
2024-11-29 13:35:24 +01:00
parent 27c72e4ff3
commit dfcbb139fb
4 changed files with 67 additions and 51 deletions

View File

@@ -109,7 +109,7 @@ pub struct ConfigToml {
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
#[serde(skip_serializing_if = "Option::is_none")]
pub no_sync: Option<bool>,
pub page_service_pipelining: Option<PageServicePipeliningConfig>,
pub page_service_pipelining: PageServicePipeliningConfig,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -127,16 +127,23 @@ pub struct DiskUsageEvictionTaskConfig {
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfig {
pub enum PageServicePipeliningConfig {
Serial,
Pipelined(PageServicePipeliningConfigPipelined),
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfigPipelined {
/// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
pub execution: PageServiceProtocolPipelinedExecutionStrategy,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PageServiceProtocolPipeliningMode {
pub enum PageServiceProtocolPipelinedExecutionStrategy {
ConcurrentFutures,
Tasks,
}
@@ -415,10 +422,12 @@ impl Default for ConfigToml {
virtual_file_io_mode: None,
tenant_config: TenantConfigToml::default(),
no_sync: None,
page_service_pipelining: Some(PageServicePipeliningConfig {
max_batch_size: NonZeroUsize::new(32).unwrap(),
protocol_pipelining_mode: PageServiceProtocolPipeliningMode::ConcurrentFutures,
}),
page_service_pipelining: PageServicePipeliningConfig::Pipelined(
PageServicePipeliningConfigPipelined {
max_batch_size: NonZeroUsize::new(32).unwrap(),
execution: PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures,
},
),
}
}
}

View File

@@ -183,7 +183,7 @@ pub struct PageServerConf {
/// Optionally disable disk syncs (unsafe!)
pub no_sync: bool,
pub page_service_pipelining: Option<pageserver_api::config::PageServicePipeliningConfig>,
pub page_service_pipelining: pageserver_api::config::PageServicePipeliningConfig,
}
/// Token for authentication to safekeepers

View File

@@ -7,7 +7,10 @@ use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode};
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -159,7 +162,7 @@ pub async fn libpq_listener_main(
auth: Option<Arc<SwappableJwtAuth>>,
listener: tokio::net::TcpListener,
auth_type: AuthType,
pipelining_config: Option<PageServicePipeliningConfig>,
pipelining_config: PageServicePipeliningConfig,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
) -> Connections {
@@ -218,7 +221,7 @@ async fn page_service_conn_main(
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
pipelining_config: Option<PageServicePipeliningConfig>,
pipelining_config: PageServicePipeliningConfig,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> ConnectionHandlerResult {
@@ -320,7 +323,7 @@ struct PageServerHandler {
/// None only while pagestream protocol is being processed.
timeline_handles: Option<TimelineHandles>,
pipelining_config: Option<PageServicePipeliningConfig>,
pipelining_config: PageServicePipeliningConfig,
}
struct TimelineHandles {
@@ -571,7 +574,7 @@ impl PageServerHandler {
pub fn new(
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: Option<PageServicePipeliningConfig>,
pipelining_config: PageServicePipeliningConfig,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> Self {
@@ -1003,7 +1006,7 @@ impl PageServerHandler {
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
Some(pipelining_config) => {
PageServicePipeliningConfig::Pipelined(pipelining_config) => {
self.handle_pagerequests_pipelined(
pgb,
pgb_reader,
@@ -1016,7 +1019,7 @@ impl PageServerHandler {
)
.await
}
None => {
PageServicePipeliningConfig::Serial => {
self.handle_pagerequests_serial(
pgb,
pgb_reader,
@@ -1104,7 +1107,7 @@ impl PageServerHandler {
timeline_id: TimelineId,
mut timeline_handles: TimelineHandles,
request_span: Span,
pipelining_config: PageServicePipeliningConfig,
pipelining_config: PageServicePipeliningConfigPipelined,
ctx: &RequestContext,
) -> (
(PostgresBackendReader<IO>, TimelineHandles),
@@ -1162,9 +1165,9 @@ impl PageServerHandler {
// the batch that was in flight when the Batcher encountered an error,
// thereby behaving identically to a serial implementation.
let PageServicePipeliningConfig {
let PageServicePipeliningConfigPipelined {
max_batch_size,
protocol_pipelining_mode,
execution,
} = pipelining_config;
// Macro to _define_ a pipeline stage.
@@ -1285,11 +1288,11 @@ impl PageServerHandler {
// Execute the stages.
//
match protocol_pipelining_mode {
PageServiceProtocolPipeliningMode::ConcurrentFutures => {
match execution {
PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
tokio::join!(read_messages, executor)
}
PageServiceProtocolPipeliningMode::Tasks => {
PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
// These tasks are not tracked anywhere.
let read_messages_task = tokio::spawn(read_messages);
let (read_messages_task_res, executor_res_) =

View File

@@ -3,7 +3,7 @@ import json
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional, Union
from typing import Any, Union
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
@@ -16,21 +16,31 @@ TARGET_RUNTIME = 30
@dataclass
class PageServicePipeliningConfig:
pass
@dataclass
class PageServicePipeliningConfigSerial(PageServicePipeliningConfig):
mode: str = "serial"
@dataclass
class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
max_batch_size: int
protocol_pipelining_mode: str
execution: str
mode: str = "pipelined"
PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"]
EXECUTION = ["concurrent-futures", "tasks"]
NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
for execution in EXECUTION:
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
for execution in EXECUTION:
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@pytest.mark.parametrize(
@@ -45,7 +55,7 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
TARGET_RUNTIME,
1,
128,
f"not batchable {dataclasses.asdict(config) if config else None}",
f"not batchable {dataclasses.asdict(config)}",
)
for config in NON_BATCHABLE
],
@@ -57,7 +67,7 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
TARGET_RUNTIME,
100,
128,
f"batchable {dataclasses.asdict(config) if config else None}",
f"batchable {dataclasses.asdict(config)}",
)
for config in BATCHABLE
],
@@ -67,7 +77,7 @@ def test_throughput(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
tablesize_mib: int,
pipelining_config: None | PageServicePipeliningConfig,
pipelining_config: PageServicePipeliningConfig,
target_runtime: int,
effective_io_concurrency: int,
readhead_buffer_size: int,
@@ -99,20 +109,18 @@ def test_throughput(
params.update(
{
"tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
"pipelining_enabled": (1 if pipelining_config else 0, {}),
# target_runtime is just a polite ask to the workload to run for this long
"effective_io_concurrency": (effective_io_concurrency, {}),
"readhead_buffer_size": (readhead_buffer_size, {}),
# name is not a metric, we just use it to identify the test easily in the `test_...[...]`` notation
}
)
if pipelining_config:
params.update(
{
f"pipelining_config.{k}": (v, {})
for k, v in dataclasses.asdict(pipelining_config).items()
}
)
params.update(
{
f"pipelining_config.{k}": (v, {})
for k, v in dataclasses.asdict(pipelining_config).items()
}
)
log.info("params: %s", params)
@@ -224,8 +232,6 @@ def test_throughput(
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
if pipelining_config is not None
else {}
)
env.pageserver.restart()
metrics = workload()
@@ -255,23 +261,21 @@ def test_throughput(
)
PRECISION_CONFIGS: list[Optional[PageServicePipeliningConfig]] = [None]
PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
PRECISION_CONFIGS.append(
PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)
)
for execution in EXECUTION:
PRECISION_CONFIGS.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@pytest.mark.parametrize(
"pipelining_config,name",
[(config, f"{dataclasses.asdict(config) if config else None}") for config in PRECISION_CONFIGS],
[(config, f"{dataclasses.asdict(config)}") for config in PRECISION_CONFIGS],
)
def test_latency(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
pipelining_config: Optional[PageServicePipeliningConfig],
pipelining_config: PageServicePipeliningConfig,
name: str,
):
"""