tests: update batching perf test workload to include scattered LSNs (#11391)

The batching perf test workload is currently read-only sequential scans.
However, realistic workloads have concurrent writes (to other pages)
going on.

This PR simulates concurrent writes to other pages by emitting logical
replication messages.

These degrade the achieved batching factor, for the reason see
- https://github.com/neondatabase/neon/issues/10765

PR 
- https://github.com/neondatabase/neon/pull/11494

will fix this problem and get batching factor back up.

---------

Co-authored-by: Vlad Lazar <vlad@neon.tech>
This commit is contained in:
Christian Schwarz
2025-04-11 11:55:49 +02:00
committed by GitHub
parent 8884865bca
commit 979fa0682b
2 changed files with 65 additions and 26 deletions

View File

@@ -2884,13 +2884,14 @@ class NeonPageserver(PgProtocol, LogUtils):
self,
immediate: bool = False,
timeout_in_seconds: int | None = None,
extra_env_vars: dict[str, str] | None = None,
):
"""
High level wrapper for restart: restarts the process, and waits for
tenant state to stabilize.
"""
self.stop(immediate=immediate)
self.start(timeout_in_seconds=timeout_in_seconds)
self.start(timeout_in_seconds=timeout_in_seconds, extra_env_vars=extra_env_vars)
self.quiesce_tenants()
def quiesce_tenants(self):

View File

@@ -1,5 +1,8 @@
import concurrent.futures
import dataclasses
import json
import re
import threading
import time
from dataclasses import dataclass
from pathlib import Path
@@ -31,15 +34,15 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
mode: str = "pipelined"
EXECUTION = ["concurrent-futures", "tasks"]
EXECUTION = ["concurrent-futures"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 2, 4, 8, 16, 32]:
BATCHABLE: list[PageServicePipeliningConfig] = []
for max_batch_size in [32]:
for execution in EXECUTION:
BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
@@ -47,19 +50,6 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
@pytest.mark.parametrize(
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# non-batchable workloads
# (A separate benchmark will consider latency).
*[
(
50,
config,
TARGET_RUNTIME,
1,
128,
f"not batchable {dataclasses.asdict(config)}",
)
for config in NON_BATCHABLE
],
# batchable workloads should show throughput and CPU efficiency improvements
*[
(
@@ -137,7 +127,14 @@ def test_throughput(
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
# minimal lfc & small shared buffers to force requests to pageserver
"neon.max_file_cache_size=1MB",
"shared_buffers=10MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -155,7 +152,6 @@ def test_throughput(
tablesize = tablesize_mib * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?
#
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
@@ -211,31 +207,73 @@ def test_throughput(
).value,
)
def workload() -> Metrics:
def workload(disruptor_started: threading.Event) -> Metrics:
disruptor_started.wait()
start = time.time()
iters = 0
while time.time() - start < target_runtime or iters < 2:
log.info("Seqscan %d", iters)
if iters == 1:
# round zero for warming up
before = get_metrics()
cur.execute(
"select clear_buffer_cache()"
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
cur.execute("select sum(data::bigint) from t")
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
iters += 1
after = get_metrics()
return (after - before).normalize(iters - 1)
def disruptor(disruptor_started: threading.Event, stop_disruptor: threading.Event):
conn = endpoint.connect()
cur = conn.cursor()
iters = 0
while True:
cur.execute("SELECT pg_logical_emit_message(true, 'test', 'advancelsn')")
if stop_disruptor.is_set():
break
disruptor_started.set()
iters += 1
time.sleep(0.001)
return iters
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
)
env.pageserver.restart()
metrics = workload()
# set trace for log analysis below
env.pageserver.restart(extra_env_vars={"RUST_LOG": "info,pageserver::page_service=trace"})
log.info("Starting workload")
with concurrent.futures.ThreadPoolExecutor() as executor:
disruptor_started = threading.Event()
stop_disruptor = threading.Event()
disruptor_fut = executor.submit(disruptor, disruptor_started, stop_disruptor)
workload_fut = executor.submit(workload, disruptor_started)
metrics = workload_fut.result()
stop_disruptor.set()
ndisruptions = disruptor_fut.result()
log.info("Disruptor issued %d disrupting requests", ndisruptions)
log.info("Results: %s", metrics)
since_last_start: list[str] = []
for line in env.pageserver.logfile.read_text().splitlines():
if "git:" in line:
since_last_start = []
since_last_start.append(line)
stopping_batching_because_re = re.compile(
r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
)
reasons_for_stopping_batching = {}
for line in since_last_start:
match = stopping_batching_because_re.search(line)
if match:
if match.group(1) not in reasons_for_stopping_batching:
reasons_for_stopping_batching[match.group(1)] = 0
reasons_for_stopping_batching[match.group(1)] += 1
log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
#
# Sanity-checks on the collected data
#