page_service: rewrite batching to work without a timeout (#9851)

# Problem

The timeout-based batching adds latency to unbatchable workloads.

We can choose a short batching timeout (e.g. 10us) but that requires
high-resolution timers, which tokio doesn't have.
I thoroughly explored options to use OS timers (see
[this](https://github.com/neondatabase/neon/pull/9822) abandoned PR).
In short, it's not an attractive option because any timer implementation
adds non-trivial overheads.

# Solution

The insight is that, in the steady state of a batchable workload, the
time we spend in `get_vectored` will be hundreds of microseconds anyway.

If we prepare the next batch concurrently to `get_vectored`, we will
have a sizeable batch ready once `get_vectored` of the current batch is
done and do not need an explicit timeout.

This can be reasonably described as **pipelining of the protocol
handler**.

# Implementation

We model the sub-protocol handler for pagestream requests
(`handle_pagrequests`) as two futures that form a pipeline:

2. Batching: read requests from the connection and fill the current
batch
3. Execution: `take` the current batch, execute it using `get_vectored`,
and send the response.

The Reading and Batching stage are connected through a new type of
channel called `spsc_fold`.

See the long comment in the `handle_pagerequests_pipelined` for details.

# Changes

- Refactor `handle_pagerequests`
    - separate functions for
- reading one protocol message; produces a `BatchedFeMessage` with just
one page request in it
- batching; tried to merge an incoming `BatchedFeMessage` into an
existing `BatchedFeMessage`; returns `None` on success and returns back
the incoming message in case merging isn't possible
        - execution of a batched message
- unify the timeline handle acquisition & request span construction; it
now happen in the function that reads the protocol message
- Implement serial and pipelined model
    - serial: what we had before any of the batching changes
      - read one protocol message
      - execute protocol messages
    - pipelined: the design described above
- optionality for execution of the pipeline: either via concurrent
futures vs tokio tasks
- Pageserver config
  - remove batching timeout field
  - add ability to configure pipelining mode
- add ability to limit max batch size for pipelined configurations
(required for the rollout, cf
https://github.com/neondatabase/cloud/issues/20620 )
  - ability to configure execution mode
- Tests
  - remove `batch_timeout` parametrization
  - rename `test_getpage_merge_smoke` to `test_throughput`
- add parametrization to test different max batch sizes and execution
moes
  - rename `test_timer_precision` to `test_latency`
  - rename the test case file to `test_page_service_batching.py`
  - better descriptions of what the tests actually do

## On the holding The `TimelineHandle` in the pending batch

While batching, we hold the `TimelineHandle` in the pending batch.
Therefore, the timeline will not finish shutting down while we're
batching.

This is not a problem in practice because the concurrently ongoing
`get_vectored` call will fail quickly with an error indicating that the
timeline is shutting down.
This results in the Execution stage returning a `QueryError::Shutdown`,
which causes the pipeline / entire page service connection to shut down.
This drops all references to the
`Arc<Mutex<Option<Box<BatchedFeMessage>>>>` object, thereby dropping the
contained `TimelineHandle`s.

- => fixes https://github.com/neondatabase/neon/issues/9850

# Performance

Local run of the benchmarks, results in [this empty
commit](1cf5b1463f)
in the PR branch.

Key take-aways:
* `concurrent-futures` and `tasks` deliver identical `batching_factor`
* tail latency impact unknown, cf
https://github.com/neondatabase/neon/issues/9837
* `concurrent-futures` has higher throughput than `tasks` in all
workloads (=lower `time` metric)
* In unbatchable workloads, `concurrent-futures` has 5% higher
`CPU-per-throughput` than that of `tasks`, and 15% higher than that of
`serial`.
* In batchable-32 workload, `concurrent-futures` has 8% lower
`CPU-per-throughput` than that of `tasks` (comparison to tput of
`serial` is irrelevant)
* in unbatchable workloads, mean and tail latencies of
`concurrent-futures` is practically identical to `serial`, whereas
`tasks` adds 20-30us of overhead

Overall, `concurrent-futures` seems like a slightly more attractive
choice.

# Rollout

This change is disabled-by-default.

Rollout plan:
- https://github.com/neondatabase/cloud/issues/20620

# Refs

- epic: https://github.com/neondatabase/neon/issues/9376
- this sub-task: https://github.com/neondatabase/neon/issues/9377
- the abandoned attempt to improve batching timeout resolution:
https://github.com/neondatabase/neon/pull/9820
- closes https://github.com/neondatabase/neon/issues/9850
- fixes https://github.com/neondatabase/neon/issues/9835
This commit is contained in:
Christian Schwarz
2024-11-30 01:16:24 +01:00
committed by Ivan Efremov
parent 547b2d2827
commit 5dad89acd4
11 changed files with 1262 additions and 457 deletions

View File

@@ -3804,9 +3804,10 @@ class Endpoint(PgProtocol, LogUtils):
# shared_buffers = 512kB to make postgres use LFC intensively
# neon.max_file_cache_size and neon.file_cache size limit are
# set to 1MB because small LFC is better for testing (helps to find more problems)
lfc_path_escaped = str(lfc_path).replace("'", "''")
config_lines = [
"shared_buffers = 512kB",
f"neon.file_cache_path = '{self.lfc_path()}'",
f"neon.file_cache_path = '{lfc_path_escaped}'",
"neon.max_file_cache_size = 1MB",
"neon.file_cache_size_limit = 1MB",
] + config_lines

View File

@@ -11,36 +11,95 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.utils import humantime_to_ms
TARGET_RUNTIME = 60
TARGET_RUNTIME = 30
@dataclass
class PageServicePipeliningConfig:
pass
@dataclass
class PageServicePipeliningConfigSerial(PageServicePipeliningConfig):
mode: str = "serial"
@dataclass
class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
max_batch_size: int
execution: str
mode: str = "pipelined"
EXECUTION = ["concurrent-futures", "tasks"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
for execution in EXECUTION:
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@pytest.mark.skip("See https://github.com/neondatabase/neon/pull/9820#issue-2675856095")
@pytest.mark.parametrize(
"tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
(50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
(50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
(50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
# the next 4 cases demonstrate how batchable workloads benefit from batching
(50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
(50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
(50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
(50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
# non-batchable workloads
# (A separate benchmark will consider latency).
*[
(
50,
config,
TARGET_RUNTIME,
1,
128,
f"not batchable {dataclasses.asdict(config)}",
)
for config in NON_BATCHABLE
],
# batchable workloads should show throughput and CPU efficiency improvements
*[
(
50,
config,
TARGET_RUNTIME,
100,
128,
f"batchable {dataclasses.asdict(config)}",
)
for config in BATCHABLE
],
],
)
def test_getpage_merge_smoke(
def test_throughput(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
tablesize_mib: int,
batch_timeout: str | None,
pipelining_config: PageServicePipeliningConfig,
target_runtime: int,
effective_io_concurrency: int,
readhead_buffer_size: int,
name: str,
):
"""
Do a bunch of sequential scans and ensure that the pageserver does some merging.
Do a bunch of sequential scans with varying compute and pipelining configurations.
Primary performance metrics are the achieved batching factor and throughput (wall clock time).
Resource utilization is also interesting - we currently measure CPU time.
The test is a fixed-runtime based type of test (target_runtime).
Hence, the results are normalized to the number of iterations completed within target runtime.
If the compute doesn't provide pipeline depth (effective_io_concurrency=1),
performance should be about identical in all configurations.
Pipelining can still yield improvements in these scenarios because it parses the
next request while the current one is still being executed.
If the compute provides pipeline depth (effective_io_concurrency=100), then
pipelining configs, especially with max_batch_size>1 should yield dramatic improvements
in all performance metrics.
"""
#
@@ -51,14 +110,16 @@ def test_getpage_merge_smoke(
params.update(
{
"tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
"batch_timeout": (
-1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout),
{"unit": "us"},
),
# target_runtime is just a polite ask to the workload to run for this long
"effective_io_concurrency": (effective_io_concurrency, {}),
"readhead_buffer_size": (readhead_buffer_size, {}),
# name is not a metric
# name is not a metric, we just use it to identify the test easily in the `test_...[...]`` notation
}
)
params.update(
{
f"pipelining_config.{k}": (v, {})
for k, v in dataclasses.asdict(pipelining_config).items()
}
)
@@ -170,7 +231,9 @@ def test_getpage_merge_smoke(
after = get_metrics()
return (after - before).normalize(iters - 1)
env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout})
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
)
env.pageserver.restart()
metrics = workload()
@@ -199,23 +262,30 @@ def test_getpage_merge_smoke(
)
@pytest.mark.skip("See https://github.com/neondatabase/neon/pull/9820#issue-2675856095")
PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
PRECISION_CONFIGS.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@pytest.mark.parametrize(
"batch_timeout", [None, "10us", "20us", "50us", "100us", "200us", "500us", "1ms"]
"pipelining_config,name",
[(config, f"{dataclasses.asdict(config)}") for config in PRECISION_CONFIGS],
)
def test_timer_precision(
def test_latency(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
batch_timeout: str | None,
pipelining_config: PageServicePipeliningConfig,
name: str,
):
"""
Determine the batching timeout precision (mean latency) and tail latency impact.
Measure the latency impact of pipelining in an un-batchable workloads.
The baseline is `None`; an ideal batching timeout implementation would increase
the mean latency by exactly `batch_timeout`.
An ideal implementation should not increase average or tail latencies for such workloads.
That is not the case with the current implementation, will be addressed in future changes.
We don't have support in pagebench to create queue depth yet.
=> https://github.com/neondatabase/neon/issues/9837
"""
#
@@ -223,7 +293,8 @@ def test_timer_precision(
#
def patch_ps_config(ps_config):
ps_config["server_side_batch_timeout"] = batch_timeout
if pipelining_config is not None:
ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
neon_env_builder.pageserver_config_override = patch_ps_config