First version of a new benchmark to test larger OLTP workload (#11053)

## Problem

We want to support larger tenants (regarding logical database size,
number of transactions per second etc.) and should increase our test
coverage of OLTP transactions at larger scale.

## Summary of changes

Start a new benchmark that over time will add more OLTP tests at larger
scale.
This PR covers the first version and will be extended in further PRs.

Also fix some infrastructure:
- default for new connections and large tenants is to use connection
pooler pgbouncer, however our fixture always added
`statement_timeout=120` which is not compatible with pooler
[see](https://neon.tech/docs/connect/connection-errors#unsupported-startup-parameter)
- action to create branch timed out after 10 seconds and 10 retries but
for large tenants it can take longer so use increasing back-off for
retries

## Test run

https://github.com/neondatabase/neon/actions/runs/13593446706
This commit is contained in:
Peter Bendel
2025-03-03 16:25:48 +01:00
committed by GitHub
parent 38277497fd
commit a07599949f
8 changed files with 324 additions and 4 deletions

View File

@@ -253,10 +253,15 @@ class PgProtocol:
# enough for our tests, but if you need a longer, you can
# change it by calling "SET statement_timeout" after
# connecting.
# pooler does not support statement_timeout
# Check if the hostname contains the string 'pooler'
hostname = result.get("host", "")
log.info(f"Hostname: {hostname}")
options = result.get("options", "")
if "statement_timeout" not in options:
if "statement_timeout" not in options and "pooler" not in hostname:
options = f"-cstatement_timeout=120s {options}"
result["options"] = options
return result
# autocommit=True here by default because that's what we need most of the time

View File

@@ -0,0 +1,47 @@
\set event_type random(1,10)
\set service_key random(1, 3)
INSERT INTO webhook.incoming_webhooks (
created_at,
delivery_id,
upstream_emitted_at,
service_key,
event_id,
source,
body,
json,
additional_data,
is_body_encrypted,
event_type
) VALUES (
now(),
gen_random_uuid(),
now() - interval '10 minutes',
CASE :service_key::int
WHEN 1 THEN 'shopify'
WHEN 2 THEN 'stripe'
WHEN 3 THEN 'github'
END,
'evt_' || gen_random_uuid(), -- Ensures uniqueness
CASE :service_key::int
WHEN 1 THEN 'Shopify'
WHEN 2 THEN 'Stripe'
WHEN 3 THEN 'GitHub'
END,
'{"order_id": 987654, "customer": {"name": "John Doe", "email": "john.doe@example.com"}, "items": [{"product_id": 12345, "quantity": 2}, {"product_id": 67890, "quantity": 1}], "total": 199.99}',
'{"order_id": 987654, "customer": {"name": "John Doe", "email": "john.doe@example.com"}, "items": [{"product_id": 12345, "quantity": 2}, {"product_id": 67890, "quantity": 1}], "total": 199.99}'::jsonb,
'{"metadata": {"user_agent": "Mozilla/5.0", "ip_address": "203.0.113.42"}}'::jsonb,
false,
CASE :event_type::int
WHEN 1 THEN 'ORDER_PLACED'
WHEN 2 THEN 'ORDER_CANCELLED'
WHEN 3 THEN 'PAYMENT_SUCCESSFUL'
WHEN 4 THEN 'PAYMENT_FAILED'
WHEN 5 THEN 'CUSTOMER_CREATED'
WHEN 6 THEN 'CUSTOMER_UPDATED'
WHEN 7 THEN 'PRODUCT_UPDATED'
WHEN 8 THEN 'INVENTORY_LOW'
WHEN 9 THEN 'SHIPPING_DISPATCHED'
WHEN 10 THEN 'REFUND_ISSUED'
END
);

View File

@@ -0,0 +1,15 @@
-- Zipfian distributions model real-world access patterns where:
-- A few values (popular IDs) are accessed frequently.
-- Many values are accessed rarely.
-- This is useful for simulating realistic workloads, like webhook processing where recent events are more frequently accessed.
\set alpha 1.2
\set min_id 1
\set max_id 135000000
\set zipf_random_id random_zipfian(:min_id, :max_id, :alpha)
SELECT *
FROM webhook.incoming_webhooks
WHERE id = (:zipf_random_id)::bigint
LIMIT 1;

View File

@@ -0,0 +1,9 @@
-- select one of the most recent webhook records (created in the branch timeline during the bench run)
SELECT *
FROM webhook.incoming_webhooks
WHERE id = (
SELECT (floor(random() * (
(SELECT last_value FROM webhook.incoming_webhooks_id_seq) - 1350000001 + 1
) + 1350000001))::bigint
)
LIMIT 1;

View File

@@ -0,0 +1,90 @@
from __future__ import annotations
import os
import timeit
from pathlib import Path
import pytest
from fixtures.benchmark_fixture import PgBenchRunResult
from fixtures.compare_fixtures import PgCompare
from performance.test_perf_pgbench import get_durations_matrix, utc_now_timestamp
def get_custom_scripts(
default: str = "insert_webhooks.sql@2 select_any_webhook_with_skew.sql@4 select_recent_webhook.sql@4",
) -> list[str]:
# We parametrize each run with the custom scripts to run and their weights.
# The custom scripts and their weights are passed through TEST_PGBENCH_CUSTOM_SCRIPTS env variable.
# Delimit the custom scripts for one run by spaces and for different runs by commas, for example:
# "insert_webhooks.sql@2 select_any_webhook_with_skew.sql@4,insert_webhooks.sql@8 select_any_webhook_with_skew.sql@2"
# Databases/branches are pre-created and passed through BENCHMARK_CONNSTR env variable.
scripts = os.getenv("TEST_PGBENCH_CUSTOM_SCRIPTS", default=str(default))
rv = []
for s in scripts.split(","):
rv.append(s)
return rv
def run_test_pgbench(env: PgCompare, custom_scripts: str, duration: int):
password = env.pg.default_options.get("password", None)
options = env.pg.default_options.get("options", "")
# drop password from the connection string by passing password=None and set password separately
connstr = env.pg.connstr(password=None, options=options)
# if connstr does not contain pooler we can set statement_timeout to 0
if "pooler" not in connstr:
options = "-cstatement_timeout=0 " + env.pg.default_options.get("options", "")
connstr = env.pg.connstr(password=None, options=options)
script_args = [
"pgbench",
"-n", # no explicit vacuum before the test - we want to rely on auto-vacuum
"-M",
"prepared",
"--client=500",
"--jobs=100",
f"-T{duration}",
"-P60", # progress every minute
"--progress-timestamp",
]
for script in custom_scripts.split():
script_args.extend(["-f", f"test_runner/performance/large_synthetic_oltp/{script}"])
script_args.append(connstr)
run_pgbench(
env,
"custom-scripts",
script_args,
password=password,
)
def run_pgbench(env: PgCompare, prefix: str, cmdline, password: None):
environ: dict[str, str] = {}
if password is not None:
environ["PGPASSWORD"] = password
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = env.pg_bin.run_capture(cmdline, env=environ)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
env.flush()
stdout = Path(f"{out}.stdout").read_text()
res = PgBenchRunResult.parse_from_stdout(
stdout=stdout,
run_duration=run_duration,
run_start_timestamp=run_start_timestamp,
run_end_timestamp=run_end_timestamp,
)
env.zenbenchmark.record_pg_bench_result(prefix, res)
@pytest.mark.parametrize("custom_scripts", get_custom_scripts())
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_perf_oltp_large_tenant(remote_compare: PgCompare, custom_scripts: str, duration: int):
run_test_pgbench(remote_compare, custom_scripts, duration)
# todo: run re-index, analyze, vacuum, etc. after the test and measure and report its duration