neon/test_runner/performance/test_sharded_ingest.py
Fedor Dikarev 9a6ace9bde introduce new runners: unit-perf and use them for benchmark jobs (#11409)
## Problem
Benchmark results are inconsistent on the existing small-metal runners.

## Summary of changes
Introduce new `unit-perf` runners and run the benchmarks on them.

The new hardware has a slower, but consistent, CPU frequency when run with the default `schedutil` governor.
We therefore had to adjust some test cases' timeouts and add retry steps where hard-coded timeouts couldn't be increased without changing the system under test (see the sketch after the list below):
- [wait_for_last_record_lsn](6592d69a67/test_runner/fixtures/pageserver/utils.py (L193)): 1000s -> 2000s
- [test_branch_creation_many](https://github.com/neondatabase/neon/pull/11409/files#diff-2ebfe76f89004d563c7e53e3ca82462e1d85e92e6d5588e8e8f598bbe119e927): 1000s
- [test_ingest_insert_bulk](https://github.com/neondatabase/neon/pull/11409/files#diff-e90e685be4a87053bc264a68740969e6a8872c8897b8b748d0e8c5f683a68d9f):
  - with back throttling disabled, the compute becomes unresponsive for more than 60 seconds (PG's hard-coded client authentication connection timeout)
- [test_sharded_ingest](https://github.com/neondatabase/neon/pull/11409/files#diff-e8d870165bd44acb9a6d8350f8640b301c1385a4108430b8d6d659b697e4a3f1): 600s -> 1200s
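
For the simple cases, raising a timeout is just a change to the test's `pytest.mark.timeout` decorator. A minimal sketch of the `test_sharded_ingest` bump (the real, parametrized test is included in full further below):

```python
import pytest


# Doubled from 600s to 1200s to account for the slower (but consistent)
# CPU frequency on the unit-perf runners.
@pytest.mark.timeout(1200)
def test_sharded_ingest():
    ...
```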

Right now there are only 2 runners of this class. If we decide to go with them, we have to figure out how many runners of this type we need, so that jobs don't get stuck waiting for one to become available.

However, we have now decided to run these runners with the `performance` governor instead of `schedutil`.
This achieves almost the same performance as the previous runners while still producing consistent results for the same commit.

Related PR to activate the `performance` governor on these runners:
https://github.com/neondatabase/runner/pull/138
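
For reference (not part of this PR), the governor that is currently active on a Linux runner can be checked via the standard cpufreq sysfs interface; a minimal sketch:

```python
from pathlib import Path

# Print the scaling governor active on each CPU core, e.g. "schedutil" or "performance".
for cpu_dir in sorted(Path("/sys/devices/system/cpu").glob("cpu[0-9]*")):
    governor = cpu_dir / "cpufreq" / "scaling_governor"
    if governor.exists():
        print(cpu_dir.name, governor.read_text().strip())
```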

## Verification that it helps

### Analyzing runtimes on the new runners for the same commit

Table of runtimes for the same commit on different runners in this
[run](https://github.com/neondatabase/neon/actions/runs/14417589789):

| Run | Benchmarks (1) | Benchmarks (2) | Benchmarks (3) | Benchmarks (4) | Benchmarks (5) |
|--------|--------|---------|---------|---------|---------|
| 1 | 1950.37s | 6374.55s | 3646.15s | 4149.48s | 2330.22s |
| 2 | - | 6369.27s | 3666.65s | 4162.42s | 2329.23s |
| Delta % | - | 0.07 % | 0.5 % | 0.3 % | 0.04 % |
| with governor performance | 1519.57s | 4131.62s | - | - | - |
| second run gov. perf. | 1513.62s | 4134.67s | - | - | - |
| Delta % | 0.3 % | 0.07 % | - | - | - |
| speedup gov. performance | 22 % | 35 % | - | - | - |
| current desktop-class Hetzner runners (main) | 1487.10s | 3699.67s | - | - | - |
| slower than desktop class | 2 % | 12 % | - | - | - |


In summary, the runtimes for the same commit on this hardware vary by less than 1 %.

---------

Co-authored-by: BodoBolero <peterbendel@neon.tech>
2025-04-15 08:21:44 +00:00


from __future__ import annotations

from contextlib import closing

import pytest

from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn, TenantShardId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    tenant_get_shards,
    wait_for_last_flush_lsn,
)
@pytest.mark.timeout(1200)
@pytest.mark.parametrize("shard_count", [1, 8, 32])
@pytest.mark.parametrize(
    "wal_receiver_protocol",
    [
        "vanilla",
        "interpreted-bincode-compressed",
        "interpreted-protobuf-compressed",
    ],
)
def test_sharded_ingest(
    neon_env_builder: NeonEnvBuilder,
    zenbenchmark: NeonBenchmarker,
    shard_count: int,
    wal_receiver_protocol: str,
):
    """
    Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
    and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
    (shard_count=1) to the sharded case indicates the overhead of sharding.
    """

    ROW_COUNT = 100_000_000  # about 7 GB of WAL

    neon_env_builder.num_pageservers = shard_count
    env = neon_env_builder.init_configs()
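
    # Configure the WAL receiver protocol under test on each pageserver before starting the env.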
    for ps in env.pageservers:
        if wal_receiver_protocol == "vanilla":
            ps.patch_config_toml_nonrecursive(
                {
                    "wal_receiver_protocol": {
                        "type": "vanilla",
                    }
                }
            )
        elif wal_receiver_protocol == "interpreted-bincode-compressed":
            ps.patch_config_toml_nonrecursive(
                {
                    "wal_receiver_protocol": {
                        "type": "interpreted",
                        "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
                    }
                }
            )
        elif wal_receiver_protocol == "interpreted-protobuf-compressed":
            ps.patch_config_toml_nonrecursive(
                {
                    "wal_receiver_protocol": {
                        "type": "interpreted",
                        "args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
                    }
                }
            )
        else:
            raise AssertionError("Test must use explicit wal receiver protocol config")

    env.start()
    # Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure
    # the storage controller doesn't mess with shard placements.
    #
    # TODO: there should be a way to disable storage controller background reconciliations.
    # Currently, disabling reconciliation also disables foreground operations.
    tenant_id, timeline_id = env.create_tenant(shard_count=shard_count)

    for shard_number in range(0, shard_count):
        tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
        pageserver_id = shard_number + 1
        env.storage_controller.tenant_shard_migrate(tenant_shard_id, pageserver_id)

    shards = tenant_get_shards(env, tenant_id)
    env.storage_controller.reconcile_until_idle()
    assert tenant_get_shards(env, tenant_id) == shards, "shards moved"

    # Start the endpoint.
    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
    start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
    # Ingest data and measure WAL volume and duration.
    with closing(endpoint.connect()) as conn:
        with conn.cursor() as cur:
            log.info("Ingesting data")
            cur.execute("set statement_timeout = 0")
            cur.execute("create table huge (i int, j int)")
with zenbenchmark.record_duration("pageserver_ingest"):
with zenbenchmark.record_duration("wal_ingest"):
cur.execute(f"insert into huge values (generate_series(1, {ROW_COUNT}), 0)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
# Record metrics.
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
    total_ingested = 0
    total_records_received = 0
    ingested_by_ps = []
    for pageserver in env.pageservers:
        ingested = pageserver.http_client().get_metric_value(
            "pageserver_wal_ingest_bytes_received_total"
        )
        records_received = pageserver.http_client().get_metric_value(
            "pageserver_wal_ingest_records_received_total"
        )

        if ingested is None:
            ingested = 0

        if records_received is None:
            records_received = 0

        ingested_by_ps.append(
            (
                pageserver.id,
                {
                    "ingested": ingested,
                    "records_received": records_received,
                },
            )
        )

        total_ingested += int(ingested)
        total_records_received += int(records_received)

    total_ingested_mb = total_ingested / (1024 * 1024)
    zenbenchmark.record("wal_ingested", total_ingested_mb, "MB", MetricReport.LOWER_IS_BETTER)
    zenbenchmark.record(
        "records_received", total_records_received, "records", MetricReport.LOWER_IS_BETTER
    )
    ingested_by_ps.sort(key=lambda x: x[0])
    for _, stats in ingested_by_ps:
        for k in stats:
            if k != "records_received":
                stats[k] /= 1024**2

    log.info(f"WAL ingested by each pageserver {ingested_by_ps}")

    assert tenant_get_shards(env, tenant_id) == shards, "shards moved"

    # The pageservers can take a long time to shut down gracefully, presumably due to the upload
    # queue or compactions or something. Just stop them immediately, we don't care.
    env.stop(immediate=True)