diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 858d367abf..75a0596f58 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2884,13 +2884,14 @@ class NeonPageserver(PgProtocol, LogUtils):
         self,
         immediate: bool = False,
         timeout_in_seconds: int | None = None,
+        extra_env_vars: dict[str, str] | None = None,
     ):
         """
         High level wrapper for restart: restarts the process, and waits for
         tenant state to stabilize.
         """
         self.stop(immediate=immediate)
-        self.start(timeout_in_seconds=timeout_in_seconds)
+        self.start(timeout_in_seconds=timeout_in_seconds, extra_env_vars=extra_env_vars)
         self.quiesce_tenants()
 
     def quiesce_tenants(self):
diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py
index 2c27368001..5169add6cb 100644
--- a/test_runner/performance/pageserver/test_page_service_batching.py
+++ b/test_runner/performance/pageserver/test_page_service_batching.py
@@ -1,5 +1,8 @@
+import concurrent.futures
 import dataclasses
 import json
+import re
+import threading
 import time
 from dataclasses import dataclass
 from pathlib import Path
@@ -31,15 +34,15 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
     mode: str = "pipelined"
 
 
-EXECUTION = ["concurrent-futures", "tasks"]
+EXECUTION = ["concurrent-futures"]
 
 NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
 for max_batch_size in [1, 32]:
     for execution in EXECUTION:
         NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
 
-BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
-for max_batch_size in [1, 2, 4, 8, 16, 32]:
+BATCHABLE: list[PageServicePipeliningConfig] = []
+for max_batch_size in [32]:
     for execution in EXECUTION:
         BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
 
@@ -47,19 +50,6 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
 @pytest.mark.parametrize(
     "tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
     [
-        # non-batchable workloads
-        # (A separate benchmark will consider latency).
-        *[
-            (
-                50,
-                config,
-                TARGET_RUNTIME,
-                1,
-                128,
-                f"not batchable {dataclasses.asdict(config)}",
-            )
-            for config in NON_BATCHABLE
-        ],
         # batchable workloads should show throughput and CPU efficiency improvements
         *[
             (
@@ -137,7 +127,14 @@ def test_throughput(
     env = neon_env_builder.init_start()
     ps_http = env.pageserver.http_client()
 
-    endpoint = env.endpoints.create_start("main")
+    endpoint = env.endpoints.create_start(
+        "main",
+        config_lines=[
+            # minimal lfc & small shared buffers to force requests to pageserver
+            "neon.max_file_cache_size=1MB",
+            "shared_buffers=10MB",
+        ],
+    )
 
     conn = endpoint.connect()
     cur = conn.cursor()
@@ -155,7 +152,6 @@ def test_throughput(
     tablesize = tablesize_mib * 1024 * 1024
     npages = tablesize // (8 * 1024)
     cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
-    # TODO: can we force postgres to do sequential scans?
 
     #
     # Run the workload, collect `Metrics` before and after, calculate difference, normalize.
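Note: the extra_env_vars parameter threaded through NeonPageserver.restart() above, and used with RUST_LOG in the hunk below, follows the usual merge-into-the-child-environment pattern. A minimal standalone sketch of that pattern (an illustration with an assumed helper name, not the fixture's actual start() implementation):

    import os
    import subprocess


    def start_with_env(cmd: list[str], extra_env_vars: dict[str, str] | None = None) -> subprocess.Popen:
        # Inherit the parent environment, then let explicit overrides win,
        # e.g. RUST_LOG="info,pageserver::page_service=trace".
        env = dict(os.environ)
        env.update(extra_env_vars or {})
        return subprocess.Popen(cmd, env=env)
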
@@ -211,31 +207,73 @@ def test_throughput(
             ).value,
         )
 
-    def workload() -> Metrics:
+    def workload(disruptor_started: threading.Event) -> Metrics:
+        disruptor_started.wait()
         start = time.time()
         iters = 0
         while time.time() - start < target_runtime or iters < 2:
-            log.info("Seqscan %d", iters)
             if iters == 1:
                 # round zero for warming up
                 before = get_metrics()
-            cur.execute(
-                "select clear_buffer_cache()"
-            )  # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
             cur.execute("select sum(data::bigint) from t")
             assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
             iters += 1
         after = get_metrics()
         return (after - before).normalize(iters - 1)
 
+    def disruptor(disruptor_started: threading.Event, stop_disruptor: threading.Event):
+        conn = endpoint.connect()
+        cur = conn.cursor()
+        iters = 0
+        while True:
+            cur.execute("SELECT pg_logical_emit_message(true, 'test', 'advancelsn')")
+            if stop_disruptor.is_set():
+                break
+            disruptor_started.set()
+            iters += 1
+            time.sleep(0.001)
+        return iters
+
     env.pageserver.patch_config_toml_nonrecursive(
         {"page_service_pipelining": dataclasses.asdict(pipelining_config)}
     )
-    env.pageserver.restart()
-    metrics = workload()
+
+    # set trace for log analysis below
+    env.pageserver.restart(extra_env_vars={"RUST_LOG": "info,pageserver::page_service=trace"})
+
+    log.info("Starting workload")
+
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        disruptor_started = threading.Event()
+        stop_disruptor = threading.Event()
+        disruptor_fut = executor.submit(disruptor, disruptor_started, stop_disruptor)
+        workload_fut = executor.submit(workload, disruptor_started)
+        metrics = workload_fut.result()
+        stop_disruptor.set()
+        ndisruptions = disruptor_fut.result()
+        log.info("Disruptor issued %d disrupting requests", ndisruptions)
 
     log.info("Results: %s", metrics)
 
+    since_last_start: list[str] = []
+    for line in env.pageserver.logfile.read_text().splitlines():
+        if "git:" in line:
+            since_last_start = []
+        since_last_start.append(line)
+
+    stopping_batching_because_re = re.compile(
+        r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
+    )
+    reasons_for_stopping_batching = {}
+    for line in since_last_start:
+        match = stopping_batching_because_re.search(line)
+        if match:
+            if match.group(1) not in reasons_for_stopping_batching:
+                reasons_for_stopping_batching[match.group(1)] = 0
+            reasons_for_stopping_batching[match.group(1)] += 1
+
+    log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
+
     #
     # Sanity-checks on the collected data
     #
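Note: the reason tally added above can be expressed more compactly with collections.Counter. A standalone sketch of the same log analysis (the helper name is assumed; it relies on the "stopping batching because ..." trace lines and the "git:" startup banner that the test already depends on):

    import re
    from collections import Counter
    from pathlib import Path

    STOPPING_BATCHING_RE = re.compile(
        r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
    )


    def batching_stop_reasons(logfile: Path) -> Counter:
        # Keep only the lines logged since the most recent pageserver start,
        # identified by the "git:" banner, mirroring the loop in the test.
        since_last_start: list[str] = []
        for line in logfile.read_text().splitlines():
            if "git:" in line:
                since_last_start = []
            since_last_start.append(line)
        # Count each "stopping batching because <reason>" occurrence.
        return Counter(
            m.group(1)
            for line in since_last_start
            if (m := STOPPING_BATCHING_RE.search(line))
        )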