From 979fa0682b09bcbac5d74d2f1082a12c173c676f Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 11 Apr 2025 11:55:49 +0200
Subject: [PATCH] tests: update batching perf test workload to include
 scattered LSNs (#11391)

The batching perf test workload currently consists of read-only
sequential scans. Realistic workloads, however, have concurrent writes
(to other pages) going on.

This PR simulates such concurrent writes by emitting logical
replication messages from a concurrent "disruptor" connection. These
degrade the achieved batching factor; for the reason why, see
https://github.com/neondatabase/neon/issues/10765.

PR https://github.com/neondatabase/neon/pull/11494 will fix this
problem and bring the batching factor back up.

---------

Co-authored-by: Vlad Lazar
---
 test_runner/fixtures/neon_fixtures.py        |  3 +-
 .../pageserver/test_page_service_batching.py | 88 +++++++++++++------
 2 files changed, 65 insertions(+), 26 deletions(-)

diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 858d367abf..75a0596f58 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2884,13 +2884,14 @@ class NeonPageserver(PgProtocol, LogUtils):
         self,
         immediate: bool = False,
         timeout_in_seconds: int | None = None,
+        extra_env_vars: dict[str, str] | None = None,
     ):
         """
         High level wrapper for restart: restarts the process, and waits for tenant state to
         stabilize.
         """
         self.stop(immediate=immediate)
-        self.start(timeout_in_seconds=timeout_in_seconds)
+        self.start(timeout_in_seconds=timeout_in_seconds, extra_env_vars=extra_env_vars)
         self.quiesce_tenants()
 
     def quiesce_tenants(self):
diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py
index 2c27368001..5169add6cb 100644
--- a/test_runner/performance/pageserver/test_page_service_batching.py
+++ b/test_runner/performance/pageserver/test_page_service_batching.py
@@ -1,5 +1,8 @@
+import concurrent.futures
 import dataclasses
 import json
+import re
+import threading
 import time
 from dataclasses import dataclass
 from pathlib import Path
@@ -31,15 +34,15 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
     mode: str = "pipelined"
 
 
-EXECUTION = ["concurrent-futures", "tasks"]
+EXECUTION = ["concurrent-futures"]
 
 NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
 for max_batch_size in [1, 32]:
     for execution in EXECUTION:
         NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
 
-BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
-for max_batch_size in [1, 2, 4, 8, 16, 32]:
+BATCHABLE: list[PageServicePipeliningConfig] = []
+for max_batch_size in [32]:
     for execution in EXECUTION:
         BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
 
@@ -47,19 +50,6 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
 @pytest.mark.parametrize(
     "tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
     [
-        # non-batchable workloads
-        # (A separate benchmark will consider latency).
-        *[
-            (
-                50,
-                config,
-                TARGET_RUNTIME,
-                1,
-                128,
-                f"not batchable {dataclasses.asdict(config)}",
-            )
-            for config in NON_BATCHABLE
-        ],
         # batchable workloads should show throughput and CPU efficiency improvements
         *[
             (
@@ -137,7 +127,14 @@ def test_throughput(
     env = neon_env_builder.init_start()
     ps_http = env.pageserver.http_client()
 
-    endpoint = env.endpoints.create_start("main")
+    endpoint = env.endpoints.create_start(
+        "main",
+        config_lines=[
+            # minimal lfc & small shared buffers to force requests to pageserver
+            "neon.max_file_cache_size=1MB",
+            "shared_buffers=10MB",
+        ],
+    )
 
     conn = endpoint.connect()
     cur = conn.cursor()
@@ -155,7 +152,6 @@ def test_throughput(
     tablesize = tablesize_mib * 1024 * 1024
     npages = tablesize // (8 * 1024)
     cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
-    # TODO: can we force postgres to do sequential scans?
     #
     # Run the workload, collect `Metrics` before and after, calculate difference, normalize.
     #
@@ -211,31 +207,73 @@ def test_throughput(
         ).value,
     )
 
-    def workload() -> Metrics:
+    def workload(disruptor_started: threading.Event) -> Metrics:
+        disruptor_started.wait()
         start = time.time()
         iters = 0
         while time.time() - start < target_runtime or iters < 2:
-            log.info("Seqscan %d", iters)
             if iters == 1:
                 # round zero for warming up
                 before = get_metrics()
-            cur.execute(
-                "select clear_buffer_cache()"
-            )  # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
             cur.execute("select sum(data::bigint) from t")
             assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
             iters += 1
         after = get_metrics()
         return (after - before).normalize(iters - 1)
 
+    def disruptor(disruptor_started: threading.Event, stop_disruptor: threading.Event):
+        conn = endpoint.connect()
+        cur = conn.cursor()
+        iters = 0
+        while True:
+            cur.execute("SELECT pg_logical_emit_message(true, 'test', 'advancelsn')")
+            if stop_disruptor.is_set():
+                break
+            disruptor_started.set()
+            iters += 1
+            time.sleep(0.001)
+        return iters
+
     env.pageserver.patch_config_toml_nonrecursive(
         {"page_service_pipelining": dataclasses.asdict(pipelining_config)}
     )
-    env.pageserver.restart()
-    metrics = workload()
+
+    # set trace for log analysis below
+    env.pageserver.restart(extra_env_vars={"RUST_LOG": "info,pageserver::page_service=trace"})
+
+    log.info("Starting workload")
+
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        disruptor_started = threading.Event()
+        stop_disruptor = threading.Event()
+        disruptor_fut = executor.submit(disruptor, disruptor_started, stop_disruptor)
+        workload_fut = executor.submit(workload, disruptor_started)
+        metrics = workload_fut.result()
+        stop_disruptor.set()
+        ndisruptions = disruptor_fut.result()
+        log.info("Disruptor issued %d disrupting requests", ndisruptions)
 
     log.info("Results: %s", metrics)
 
+    since_last_start: list[str] = []
+    for line in env.pageserver.logfile.read_text().splitlines():
+        if "git:" in line:
+            since_last_start = []
+        since_last_start.append(line)
+
+    stopping_batching_because_re = re.compile(
+        r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
+    )
+    reasons_for_stopping_batching = {}
+    for line in since_last_start:
+        match = stopping_batching_because_re.search(line)
+        if match:
+            if match.group(1) not in reasons_for_stopping_batching:
+                reasons_for_stopping_batching[match.group(1)] = 0
+            reasons_for_stopping_batching[match.group(1)] += 1
+
+    log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
+
     #
     # Sanity-checks on the collected data
     #
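
Note on why the disruptor lowers the batching factor: among the stop
reasons matched by the regex above is "LSN changed". Each
pg_logical_emit_message call writes a WAL record and advances the LSN,
so successive getpage requests resolve to different LSNs and the page
service has to close the current batch (see the linked issue #10765
for the full story). The self-contained sketch below illustrates the
effect; it is a simplified model, not the pageserver's actual batching
code, and GetPageRequest/batches are hypothetical names. It assumes
consecutive requests merge only while their effective LSN matches and
the batch is below max_batch_size:

    # Simplified model of page-service batching (hypothetical names,
    # not pageserver code): consecutive requests merge into one batch
    # only while their effective LSN matches and the batch has room.
    from dataclasses import dataclass


    @dataclass
    class GetPageRequest:
        page: int
        effective_lsn: int  # LSN the request resolves to on the server


    def batches(
        requests: list[GetPageRequest], max_batch_size: int
    ) -> list[list[GetPageRequest]]:
        out: list[list[GetPageRequest]] = []
        for req in requests:
            if (
                out
                and len(out[-1]) < max_batch_size
                and out[-1][-1].effective_lsn == req.effective_lsn
            ):
                out[-1].append(req)  # same LSN, room left: extend batch
            else:
                out.append([req])  # "LSN changed" / "batch size": new batch
        return out


    # Quiescent system: the LSN never moves, so 64 seqscan reads form
    # two full batches of 32 -> batching factor 32.
    quiet = [GetPageRequest(page=i, effective_lsn=100) for i in range(64)]
    assert [len(b) for b in batches(quiet, 32)] == [32, 32]

    # Disruptor advancing the LSN between reads: every batch has size 1
    # -> batching factor 1, the degradation this test now exercises.
    noisy = [GetPageRequest(page=i, effective_lsn=100 + i) for i in range(64)]
    assert all(len(b) == 1 for b in batches(noisy, 32))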
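
A small aside on the log analysis: the hand-rolled dict counting added
above can be written more compactly with collections.Counter while
keeping the exact same regex (the named alternatives matter: they
normalize known reasons to a fixed string before the catch-all .*
captures anything else verbatim). count_stop_reasons is a hypothetical
helper sketched here, not part of the patch:

    import re
    from collections import Counter

    stopping_batching_because_re = re.compile(
        r"stopping batching because (LSN changed|of batch size|"
        r"timeline object mismatch|batch key changed|"
        r"same page was requested at different LSNs|.*)"
    )


    def count_stop_reasons(log_lines: list[str]) -> Counter[str]:
        # Counter tallies each captured reason; missing keys read as 0.
        return Counter(
            m.group(1)
            for line in log_lines
            if (m := stopping_batching_because_re.search(line))
        )

    # Usage against the lines collected in the test:
    #   reasons_for_stopping_batching = count_stop_reasons(since_last_start)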