make it configurable whether pipelining should use concurrent futures or tasks

Author: Christian Schwarz
Date: 2024-11-22 10:42:05 +01:00
Parent: 093674b2fb
Commit: c1e8347160
3 changed files with 126 additions and 44 deletions


@@ -129,8 +129,16 @@ pub struct DiskUsageEvictionTaskConfig {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfig {
// Causes runtime errors if larger than max get_vectored batch size.
/// Causes runtime errors if larger than max get_vectored batch size.
pub max_batch_size: NonZeroUsize,
pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PageServiceProtocolPipeliningMode {
ConcurrentFutures,
Tasks,
}
pub mod statvfs {

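The first hunk above adds the `protocol_pipelining_mode` field and its enum to `PageServicePipeliningConfig`. As a minimal, standalone sketch (not the actual pageserver_api code, which also derives Serialize and other traits), the following shows how the new section maps from TOML into the Rust types; the `page_service_pipelining` table name is taken from the Python test further down, and the `serde` (derive feature) and `toml` crates are assumed:

// Sketch only: mirrors the types added in this commit so the TOML-to-enum
// mapping is visible. Assumes serde (derive feature) and the toml crate.
use std::num::NonZeroUsize;

#[derive(Debug, PartialEq, Eq, serde::Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PageServicePipeliningConfig {
    pub max_batch_size: NonZeroUsize,
    pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PageServiceProtocolPipeliningMode {
    ConcurrentFutures,
    Tasks,
}

fn main() {
    // The kebab-case rename means the on-disk spellings are
    // "concurrent-futures" and "tasks", matching PROTOCOL_PIPELINING_MODES
    // in the Python test below.
    let cfg: PageServicePipeliningConfig = toml::from_str(
        r#"
            max_batch_size = 32
            protocol_pipelining_mode = "concurrent-futures"
        "#,
    )
    .expect("valid pipelining config");
    assert_eq!(
        cfg.protocol_pipelining_mode,
        PageServiceProtocolPipeliningMode::ConcurrentFutures
    );
}

Because of `deny_unknown_fields`, a misspelled key in the `page_service_pipelining` section is rejected when the config is parsed rather than silently ignored, and pipelining itself remains opt-in: the handler below still matches on an `Option<PageServicePipeliningConfig>`.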

@@ -7,7 +7,7 @@ use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::config::PageServicePipeliningConfig;
use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode};
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -757,6 +757,7 @@ impl PageServerHandler {
/// Post-condition: `maybe_carry` is Some()
#[instrument(skip_all, level = tracing::Level::TRACE)]
#[allow(clippy::boxed_local)]
fn pagestream_do_batch(
max_batch_size: NonZeroUsize,
maybe_carry: &mut Option<Box<BatchedFeMessage>>,
@@ -838,7 +839,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![
self.handle_get_rel_exists_request(&shard, &req, &ctx)
self.handle_get_rel_exists_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
@@ -849,7 +850,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![
self.handle_get_nblocks_request(&shard, &req, &ctx)
self.handle_get_nblocks_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
@@ -872,7 +873,7 @@ impl PageServerHandler {
&shard,
effective_request_lsn,
pages,
&ctx,
ctx,
)
.instrument(span.clone())
.await;
@@ -886,7 +887,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
(
vec![
self.handle_db_size_request(&shard, &req, &ctx)
self.handle_db_size_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
@@ -897,7 +898,7 @@ impl PageServerHandler {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
(
vec![
self.handle_get_slru_segment_request(&shard, &req, &ctx)
self.handle_get_slru_segment_request(&shard, &req, ctx)
.instrument(span.clone())
.await,
],
@@ -1009,8 +1010,8 @@ impl PageServerHandler {
.expect("implementation error: timeline_handles should not be locked");
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let (pgb_reader, timeline_handles) = match self.pipelining_config {
Some(PageServicePipeliningConfig { max_batch_size, .. }) => {
let (pgb_reader, timeline_handles) = match self.pipelining_config.clone() {
Some(pipelining_config) => {
self.handle_pagerequests_pipelined(
pgb,
pgb_reader,
@@ -1018,7 +1019,7 @@ impl PageServerHandler {
timeline_id,
timeline_handles,
request_span,
max_batch_size,
pipelining_config,
&ctx,
)
.await
@@ -1048,6 +1049,7 @@ impl PageServerHandler {
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn handle_pagerequests_serial<IO>(
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
@@ -1068,7 +1070,7 @@ impl PageServerHandler {
timeline_id,
&mut timeline_handles,
&self.cancel,
&ctx,
ctx,
request_span.clone(),
)
.await?;
@@ -1079,11 +1081,12 @@ impl PageServerHandler {
return Ok((pgb_reader, timeline_handles));
}
};
self.pagesteam_handle_batched_message(pgb_writer, *msg, &ctx)
self.pagesteam_handle_batched_message(pgb_writer, *msg, ctx)
.await?;
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_pagerequests_pipelined<IO>(
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
@@ -1092,12 +1095,17 @@ impl PageServerHandler {
timeline_id: TimelineId,
mut timeline_handles: TimelineHandles,
request_span: Span,
max_batch_size: NonZeroUsize,
pipelining_config: PageServicePipeliningConfig,
ctx: &RequestContext,
) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let PageServicePipeliningConfig {
max_batch_size,
protocol_pipelining_mode,
} = pipelining_config;
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let read_messages = {
let cancel = self.cancel.child_token();
@@ -1231,19 +1239,38 @@ impl PageServerHandler {
};
notify_batcher.notify_one();
debug!("processing batch");
self.pagesteam_handle_batched_message(pgb_writer, *batch, &ctx)
self.pagesteam_handle_batched_message(pgb_writer, *batch, ctx)
.await?;
}
Ok(())
};
let (read_message_task_res, _, executor_res): (_, (), _) =
tokio::join!(read_messages, batcher, executor);
let read_messages_res;
let executor_res;
match protocol_pipelining_mode {
PageServiceProtocolPipeliningMode::ConcurrentFutures => {
(read_messages_res, _, executor_res) =
tokio::join!(read_messages, batcher, executor);
}
PageServiceProtocolPipeliningMode::Tasks => {
// cancelled via sensitivity to self.cancel
let read_messages_task = tokio::task::spawn(read_messages);
// cancelled when it observes read_messages_task disconnect the channel
let batcher_task = tokio::task::spawn(batcher);
executor_res = executor.await;
read_messages_res = read_messages_task
.await
.context("read_messages task panicked, check logs for details")?;
let _: () = batcher_task
.await
.context("batcher task panicked, check logs for details")?;
}
}
match (read_message_task_res, executor_res) {
match (read_messages_res, executor_res) {
(Err(e), _) | (_, Err(e)) => {
let e: QueryError = e;
return Err(e);
Err(e)
}
(Ok((pgb_reader, timeline_handles)), Ok(())) => Ok((pgb_reader, timeline_handles)),
}
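This hunk is the core of the change. In `ConcurrentFutures` mode the `read_messages`, `batcher`, and `executor` futures all stay on the current task and are polled concurrently with `tokio::join!`; in `Tasks` mode, `read_messages` and `batcher` are spawned as separate tokio tasks (so they can run in parallel on other worker threads) while the executor keeps running inline, and the join handles are awaited afterwards. Below is a stripped-down sketch of the same pattern with two hypothetical stages, `produce` and `consume`, standing in for the real futures:

// Sketch of the two pipelining modes, with toy stages standing in for the
// pageserver's read_messages/batcher/executor futures. Assumes tokio with
// the rt-multi-thread, macros, and sync features enabled.
use tokio::sync::mpsc;

enum Mode {
    ConcurrentFutures, // both stages polled on the current task via join!
    Tasks,             // producer spawned as its own task; consumer stays inline
}

async fn produce(tx: mpsc::Sender<u32>) {
    for i in 0..4 {
        if tx.send(i).await.is_err() {
            return; // consumer went away
        }
    }
    // dropping tx closes the channel, which lets the consumer finish
}

async fn consume(mut rx: mpsc::Receiver<u32>) -> usize {
    let mut processed = 0;
    while let Some(i) = rx.recv().await {
        println!("processing {i}");
        processed += 1;
    }
    processed
}

async fn run(mode: Mode) -> usize {
    let (tx, rx) = mpsc::channel(1);
    let producer = produce(tx);
    let consumer = consume(rx);
    match mode {
        Mode::ConcurrentFutures => {
            // Concurrency without parallelism: both futures interleave on this task.
            let ((), processed) = tokio::join!(producer, consumer);
            processed
        }
        Mode::Tasks => {
            // Parallelism: the producer may run on another worker thread; a panic
            // there surfaces here as a JoinError instead of unwinding this task.
            let producer_task = tokio::task::spawn(producer);
            let processed = consumer.await;
            producer_task.await.expect("producer task panicked");
            processed
        }
    }
}

#[tokio::main]
async fn main() {
    assert_eq!(run(Mode::ConcurrentFutures).await, 4);
    assert_eq!(run(Mode::Tasks).await, 4);
}

The trade-off being made configurable: `join!` gives concurrency without parallelism (no extra tasks or cross-thread wakeups, but the stages share one scheduler slot), while spawned tasks can overlap across worker threads at the cost of spawn overhead and of panics surfacing as join errors, which is why the `Tasks` arm above wraps the awaits in `.context(...)`.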
@@ -1395,7 +1422,7 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
@@ -1425,7 +1452,7 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
@@ -1455,7 +1482,7 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
@@ -1513,7 +1540,7 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,


@@ -13,24 +13,61 @@ from fixtures.utils import humantime_to_ms
TARGET_RUNTIME = 5
MAX_BATCH_SIZES = [None, 1, 2, 4, 8, 16, 32]
@dataclass
class PageServicePipeliningConfig:
max_batch_size: int
protocol_pipelining_mode: str
PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"]
NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
for max_batch_size in [1, 32]:
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
@pytest.mark.parametrize(
"tablesize_mib, max_batch_size, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
# non-batchable workloads should perform identically modulo overheads of pipelining and batching.
# importantly, latency of pipelined configs should be no worse than non-pipelined
*[
(50, n, TARGET_RUNTIME, 1, 128, f"not batchable max batch size {n}") for n in MAX_BATCH_SIZES
(
50,
config,
TARGET_RUNTIME,
1,
128,
f"not batchable {dataclasses.asdict(config) if config else None}",
)
for config in NON_BATCHABLE
],
# batchable workloads should show throughput and CPU efficiency improvements
*[
(50, n, TARGET_RUNTIME, 100, 128, f"batchable max batch size {n}") for n in MAX_BATCH_SIZES
]
(
50,
config,
TARGET_RUNTIME,
100,
128,
f"batchable {dataclasses.asdict(config) if config else None}",
)
for config in BATCHABLE
],
],
)
def test_getpage_merge_smoke(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
tablesize_mib: int,
max_batch_size: Optional[int],
pipelining_config: None | PageServicePipeliningConfig,
target_runtime: int,
effective_io_concurrency: int,
readhead_buffer_size: int,
@@ -48,16 +85,20 @@ def test_getpage_merge_smoke(
params.update(
{
"tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
"max_batch_size": (
-1 if max_batch_size is None else max_batch_size,
{},
),
"pipelining_enabled": (1 if pipelining_config else 0, {}),
# target_runtime is just a polite ask to the workload to run for this long
"effective_io_concurrency": (effective_io_concurrency, {}),
"readhead_buffer_size": (readhead_buffer_size, {}),
# name is not a metric
}
)
if pipelining_config:
params.update(
{
f"pipelining_config.{k}": (v, {})
for k, v in dataclasses.asdict(pipelining_config).items()
}
)
log.info("params: %s", params)
@@ -167,9 +208,11 @@ def test_getpage_merge_smoke(
after = get_metrics()
return (after - before).normalize(iters - 1)
env.pageserver.patch_config_toml_nonrecursive({"page_service_pipelining": {
"max_batch_size": max_batch_size,
}} if max_batch_size is not None else {})
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
if pipelining_config is not None
else {}
)
env.pageserver.restart()
metrics = workload()
@@ -198,14 +241,20 @@ def test_getpage_merge_smoke(
)
@pytest.mark.parametrize(
"max_batch_size", [None, 1, 32]
)
PRECISION_CONFIGS: list[Optional[PageServicePipeliningConfig]] = [None]
for max_batch_size in [1, 32]:
for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
PRECISION_CONFIGS.append(
PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)
)
@pytest.mark.parametrize("pipelining_config", PRECISION_CONFIGS)
def test_timer_precision(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
max_batch_size: Optional[int],
pipelining_config: Optional[PageServicePipeliningConfig],
):
"""
Determine the batching timeout precision (mean latency) and tail latency impact.
@@ -221,10 +270,8 @@ def test_timer_precision(
#
def patch_ps_config(ps_config):
if max_batch_size is not None:
ps_config["page_service_pipelining"] = {
"max_batch_size": max_batch_size,
}
if pipelining_config is not None:
ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
neon_env_builder.pageserver_config_override = patch_ps_config