Files
neon/test_runner/performance/pageserver/test_page_service_batching.py
Christian Schwarz aa4ec11af9 page_service: rewrite batching to work without a timeout (#9851)
# Problem

The timeout-based batching adds latency to unbatchable workloads.

We can choose a short batching timeout (e.g. 10us) but that requires
high-resolution timers, which tokio doesn't have.
I thoroughly explored options to use OS timers (see
[this](https://github.com/neondatabase/neon/pull/9822) abandoned PR).
In short, it's not an attractive option because any timer implementation
adds non-trivial overheads.

# Solution

The insight is that, in the steady state of a batchable workload, the
time we spend in `get_vectored` will be hundreds of microseconds anyway.

If we prepare the next batch concurrently to `get_vectored`, we will
have a sizeable batch ready once `get_vectored` of the current batch is
done and do not need an explicit timeout.

This can be reasonably described as **pipelining of the protocol
handler**.

# Implementation

We model the sub-protocol handler for pagestream requests
(`handle_pagrequests`) as two futures that form a pipeline:

2. Batching: read requests from the connection and fill the current
batch
3. Execution: `take` the current batch, execute it using `get_vectored`,
and send the response.

The Reading and Batching stage are connected through a new type of
channel called `spsc_fold`.

See the long comment in the `handle_pagerequests_pipelined` for details.

# Changes

- Refactor `handle_pagerequests`
    - separate functions for
- reading one protocol message; produces a `BatchedFeMessage` with just
one page request in it
- batching; tried to merge an incoming `BatchedFeMessage` into an
existing `BatchedFeMessage`; returns `None` on success and returns back
the incoming message in case merging isn't possible
        - execution of a batched message
- unify the timeline handle acquisition & request span construction; it
now happen in the function that reads the protocol message
- Implement serial and pipelined model
    - serial: what we had before any of the batching changes
      - read one protocol message
      - execute protocol messages
    - pipelined: the design described above
- optionality for execution of the pipeline: either via concurrent
futures vs tokio tasks
- Pageserver config
  - remove batching timeout field
  - add ability to configure pipelining mode
- add ability to limit max batch size for pipelined configurations
(required for the rollout, cf
https://github.com/neondatabase/cloud/issues/20620 )
  - ability to configure execution mode
- Tests
  - remove `batch_timeout` parametrization
  - rename `test_getpage_merge_smoke` to `test_throughput`
- add parametrization to test different max batch sizes and execution
moes
  - rename `test_timer_precision` to `test_latency`
  - rename the test case file to `test_page_service_batching.py`
  - better descriptions of what the tests actually do

## On the holding The `TimelineHandle` in the pending batch

While batching, we hold the `TimelineHandle` in the pending batch.
Therefore, the timeline will not finish shutting down while we're
batching.

This is not a problem in practice because the concurrently ongoing
`get_vectored` call will fail quickly with an error indicating that the
timeline is shutting down.
This results in the Execution stage returning a `QueryError::Shutdown`,
which causes the pipeline / entire page service connection to shut down.
This drops all references to the
`Arc<Mutex<Option<Box<BatchedFeMessage>>>>` object, thereby dropping the
contained `TimelineHandle`s.

- => fixes https://github.com/neondatabase/neon/issues/9850

# Performance

Local run of the benchmarks, results in [this empty
commit](1cf5b1463f)
in the PR branch.

Key take-aways:
* `concurrent-futures` and `tasks` deliver identical `batching_factor`
* tail latency impact unknown, cf
https://github.com/neondatabase/neon/issues/9837
* `concurrent-futures` has higher throughput than `tasks` in all
workloads (=lower `time` metric)
* In unbatchable workloads, `concurrent-futures` has 5% higher
`CPU-per-throughput` than that of `tasks`, and 15% higher than that of
`serial`.
* In batchable-32 workload, `concurrent-futures` has 8% lower
`CPU-per-throughput` than that of `tasks` (comparison to tput of
`serial` is irrelevant)
* in unbatchable workloads, mean and tail latencies of
`concurrent-futures` is practically identical to `serial`, whereas
`tasks` adds 20-30us of overhead

Overall, `concurrent-futures` seems like a slightly more attractive
choice.

# Rollout

This change is disabled-by-default.

Rollout plan:
- https://github.com/neondatabase/cloud/issues/20620

# Refs

- epic: https://github.com/neondatabase/neon/issues/9376
- this sub-task: https://github.com/neondatabase/neon/issues/9377
- the abandoned attempt to improve batching timeout resolution:
https://github.com/neondatabase/neon/pull/9820
- closes https://github.com/neondatabase/neon/issues/9850
- fixes https://github.com/neondatabase/neon/issues/9835
2024-11-30 00:16:24 +00:00

379 lines
13 KiB
Python

import dataclasses
import json
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.utils import humantime_to_ms
TARGET_RUNTIME = 30
@dataclass
class PageServicePipeliningConfig:
pass
@dataclass
class PageServicePipeliningConfigSerial(PageServicePipeliningConfig):
mode: str = "serial"
@dataclass
class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
max_batch_size: int
execution: str
mode: str = "pipelined"
EXECUTION = ["concurrent-futures", "tasks"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
for execution in EXECUTION:
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@pytest.mark.parametrize(
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# non-batchable workloads
# (A separate benchmark will consider latency).
*[
(
50,
config,
TARGET_RUNTIME,
1,
128,
f"not batchable {dataclasses.asdict(config)}",
)
for config in NON_BATCHABLE
],
# batchable workloads should show throughput and CPU efficiency improvements
*[
(
50,
config,
TARGET_RUNTIME,
100,
128,
f"batchable {dataclasses.asdict(config)}",
)
for config in BATCHABLE
],
],
)
def test_throughput(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
tablesize_mib: int,
pipelining_config: PageServicePipeliningConfig,
target_runtime: int,
effective_io_concurrency: int,
readhead_buffer_size: int,
name: str,
):
"""
Do a bunch of sequential scans with varying compute and pipelining configurations.
Primary performance metrics are the achieved batching factor and throughput (wall clock time).
Resource utilization is also interesting - we currently measure CPU time.
The test is a fixed-runtime based type of test (target_runtime).
Hence, the results are normalized to the number of iterations completed within target runtime.
If the compute doesn't provide pipeline depth (effective_io_concurrency=1),
performance should be about identical in all configurations.
Pipelining can still yield improvements in these scenarios because it parses the
next request while the current one is still being executed.
If the compute provides pipeline depth (effective_io_concurrency=100), then
pipelining configs, especially with max_batch_size>1 should yield dramatic improvements
in all performance metrics.
"""
#
# record perf-related parameters as metrics to simplify processing of results
#
params: dict[str, tuple[float | int, dict[str, Any]]] = {}
params.update(
{
"tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
# target_runtime is just a polite ask to the workload to run for this long
"effective_io_concurrency": (effective_io_concurrency, {}),
"readhead_buffer_size": (readhead_buffer_size, {}),
# name is not a metric, we just use it to identify the test easily in the `test_...[...]`` notation
}
)
params.update(
{
f"pipelining_config.{k}": (v, {})
for k, v in dataclasses.asdict(pipelining_config).items()
}
)
log.info("params: %s", params)
for param, (value, kwargs) in params.items():
zenbenchmark.record(
param,
metric_value=value,
unit=kwargs.pop("unit", ""),
report=MetricReport.TEST_PARAM,
**kwargs,
)
#
# Setup
#
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
cur.execute(f"SET effective_io_concurrency={effective_io_concurrency}")
cur.execute(
f"SET neon.readahead_buffer_size={readhead_buffer_size}"
) # this is the current default value, but let's hard-code that
cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
log.info("Filling the table")
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
tablesize = tablesize_mib * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?
#
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
#
@dataclass
class Metrics:
time: float
pageserver_getpage_count: float
pageserver_vectored_get_count: float
compute_getpage_count: float
pageserver_cpu_seconds_total: float
def __sub__(self, other: "Metrics") -> "Metrics":
return Metrics(
time=self.time - other.time,
pageserver_getpage_count=self.pageserver_getpage_count
- other.pageserver_getpage_count,
pageserver_vectored_get_count=self.pageserver_vectored_get_count
- other.pageserver_vectored_get_count,
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
- other.pageserver_cpu_seconds_total,
)
def normalize(self, by) -> "Metrics":
return Metrics(
time=self.time / by,
pageserver_getpage_count=self.pageserver_getpage_count / by,
pageserver_vectored_get_count=self.pageserver_vectored_get_count / by,
compute_getpage_count=self.compute_getpage_count / by,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
)
def get_metrics() -> Metrics:
with conn.cursor() as cur:
cur.execute(
"select value from neon_perf_counters where metric='getpage_wait_seconds_count';"
)
compute_getpage_count = cur.fetchall()[0][0]
pageserver_metrics = ps_http.get_metrics()
return Metrics(
time=time.time(),
pageserver_getpage_count=pageserver_metrics.query_one(
"pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"}
).value,
pageserver_vectored_get_count=pageserver_metrics.query_one(
"pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"}
).value,
compute_getpage_count=compute_getpage_count,
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
"libmetrics_process_cpu_seconds_highres"
).value,
)
def workload() -> Metrics:
start = time.time()
iters = 0
while time.time() - start < target_runtime or iters < 2:
log.info("Seqscan %d", iters)
if iters == 1:
# round zero for warming up
before = get_metrics()
cur.execute(
"select clear_buffer_cache()"
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
cur.execute("select sum(data::bigint) from t")
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
iters += 1
after = get_metrics()
return (after - before).normalize(iters - 1)
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
)
env.pageserver.restart()
metrics = workload()
log.info("Results: %s", metrics)
#
# Sanity-checks on the collected data
#
# assert that getpage counts roughly match between compute and ps
assert metrics.pageserver_getpage_count == pytest.approx(
metrics.compute_getpage_count, rel=0.01
)
#
# Record the results
#
for metric, value in dataclasses.asdict(metrics).items():
zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
zenbenchmark.record(
"perfmetric.batching_factor",
metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count,
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
PRECISION_CONFIGS.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@pytest.mark.parametrize(
"pipelining_config,name",
[(config, f"{dataclasses.asdict(config)}") for config in PRECISION_CONFIGS],
)
def test_latency(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
pipelining_config: PageServicePipeliningConfig,
name: str,
):
"""
Measure the latency impact of pipelining in an un-batchable workloads.
An ideal implementation should not increase average or tail latencies for such workloads.
We don't have support in pagebench to create queue depth yet.
=> https://github.com/neondatabase/neon/issues/9837
"""
#
# Setup
#
def patch_ps_config(ps_config):
if pipelining_config is not None:
ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
neon_env_builder.pageserver_config_override = patch_ps_config
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
cur.execute("SET effective_io_concurrency=1")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
log.info("Filling the table")
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
tablesize = 50 * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?
cur.close()
conn.close()
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.stop()
for sk in env.safekeepers:
sk.stop()
#
# Run single-threaded pagebench (TODO: dedup with other benchmark code)
#
env.pageserver.allowed_errors.append(
# https://github.com/neondatabase/neon/issues/6925
r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*"
)
ps_http = env.pageserver.http_client()
cmd = [
str(env.neon_binpath / "pagebench"),
"get-page-latest-lsn",
"--mgmt-api-endpoint",
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--num-clients",
"1",
"--runtime",
"10s",
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path) as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
total = results["total"]
metric = "latency_mean"
zenbenchmark.record(
metric,
metric_value=humantime_to_ms(total[metric]),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
metric = "latency_percentiles"
for k, v in total[metric].items():
zenbenchmark.record(
f"{metric}.{k}",
metric_value=humantime_to_ms(v),
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)