diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index fcefaad8fa..51afd3a03d 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -500,6 +500,8 @@ class NeonEnvBuilder:
                  num_safekeepers: int = 1,
                  # Use non-standard SK ids to check for various parsing bugs
                  safekeepers_id_start: int = 0,
+                 # fsync is disabled by default to make the tests go faster
+                 safekeepers_enable_fsync: bool = False,
                  auth_enabled: bool = False,
                  rust_log_override: Optional[str] = None,
                  default_branch_name=DEFAULT_BRANCH_NAME):
@@ -513,6 +515,7 @@ class NeonEnvBuilder:
         self.pageserver_config_override = pageserver_config_override
         self.num_safekeepers = num_safekeepers
         self.safekeepers_id_start = safekeepers_id_start
+        self.safekeepers_enable_fsync = safekeepers_enable_fsync
         self.auth_enabled = auth_enabled
         self.default_branch_name = default_branch_name
         self.env: Optional[NeonEnv] = None
@@ -666,7 +669,7 @@ class NeonEnv:
             id = {id}
             pg_port = {port.pg}
             http_port = {port.http}
-            sync = false # Disable fsyncs to make the tests go faster""")
+            sync = {'true' if config.safekeepers_enable_fsync else 'false'}""")
             if config.auth_enabled:
                 toml += textwrap.dedent(f"""
                 auth_enabled = true
diff --git a/test_runner/performance/test_wal_backpressure.py b/test_runner/performance/test_wal_backpressure.py
new file mode 100644
index 0000000000..873d1132a7
--- /dev/null
+++ b/test_runner/performance/test_wal_backpressure.py
@@ -0,0 +1,264 @@
+import statistics
+import threading
+import time
+import timeit
+from typing import Callable
+
+import pytest
+from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
+from fixtures.compare_fixtures import NeonCompare, PgCompare, VanillaCompare
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder, PgBin
+from fixtures.utils import lsn_from_hex
+
+from performance.test_perf_pgbench import (get_durations_matrix, get_scales_matrix)
+
+
+@pytest.fixture(params=["vanilla", "neon_off", "neon_on"])
+# This fixture constructs a `PgCompare` interface for each of several builder configurations.
+# The builder parameters are encoded in the fixture's param.
+# For example, to build a `NeonCompare` interface, the corresponding fixture param should have
+# the format `neon_{safekeepers_enable_fsync}`.
+# Note that "_" is used here to separate builder parameters.
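+# Concretely, the three params map to the following builders:
+#   - "vanilla":  a `VanillaCompare` interface (vanilla Postgres),
+#   - "neon_off": a `NeonCompare` interface with `safekeepers_enable_fsync=False`,
+#   - "neon_on":  a `NeonCompare` interface with `safekeepers_enable_fsync=True`.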
+def pg_compare(request) -> PgCompare:
+    x = request.param.split("_")
+
+    if x[0] == "vanilla":
+        # `VanillaCompare` interface
+        fixture = request.getfixturevalue("vanilla_compare")
+        assert isinstance(fixture, VanillaCompare)
+
+        return fixture
+    else:
+        assert len(x) == 2, f"request param ({request.param}) should have a format of \
+            `neon_{{safekeepers_enable_fsync}}`"
+
+        # `NeonCompare` interface
+        neon_env_builder = request.getfixturevalue("neon_env_builder")
+        assert isinstance(neon_env_builder, NeonEnvBuilder)
+
+        zenbenchmark = request.getfixturevalue("zenbenchmark")
+        assert isinstance(zenbenchmark, NeonBenchmarker)
+
+        pg_bin = request.getfixturevalue("pg_bin")
+        assert isinstance(pg_bin, PgBin)
+
+        neon_env_builder.safekeepers_enable_fsync = x[1] == "on"
+
+        env = neon_env_builder.init_start()
+        env.neon_cli.create_branch("empty", ancestor_branch_name=DEFAULT_BRANCH_NAME)
+
+        branch_name = request.node.name
+        return NeonCompare(zenbenchmark, env, pg_bin, branch_name)
+
+
+def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_iters: int):
+    """Start an intensive write workload across multiple tables.
+
+    ## Single table workload:
+    At each step, insert `new_rows_each_update` new rows.
+    The variable `new_rows_each_update` is equal to `scale * 100_000`.
+    The number of steps is determined by the `num_iters` variable."""
+    new_rows_each_update = scale * 100_000
+
+    def start_single_table_workload(table_id: int):
+        for _ in range(num_iters):
+            with env.pg.connect().cursor() as cur:
+                cur.execute(
+                    f"INSERT INTO t{table_id} SELECT FROM generate_series(1,{new_rows_each_update})"
+                )
+
+    with env.record_duration("run_duration"):
+        threads = [
+            threading.Thread(target=start_single_table_workload, args=(i, ))
+            for i in range(n_tables)
+        ]
+
+        for thread in threads:
+            thread.start()
+        for thread in threads:
+            thread.join()
+
+
+@pytest.mark.parametrize("n_tables", [5])
+@pytest.mark.parametrize("scale", get_scales_matrix(5))
+@pytest.mark.parametrize("num_iters", [10])
+def test_heavy_write_workload(pg_compare: PgCompare, n_tables: int, scale: int, num_iters: int):
+    env = pg_compare
+
+    # Initialize test tables
+    with env.pg.connect().cursor() as cur:
+        for i in range(n_tables):
+            cur.execute(
+                f"CREATE TABLE t{i}(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
+            )
+            cur.execute(f"INSERT INTO t{i} (key) VALUES (0)")
+
+    workload_thread = threading.Thread(target=start_heavy_write_workload,
+                                       args=(env, n_tables, scale, num_iters))
+    workload_thread.start()
+
+    record_thread = threading.Thread(target=record_lsn_write_lag,
+                                     args=(env, lambda: workload_thread.is_alive()))
+    record_thread.start()
+
+    record_read_latency(env, lambda: workload_thread.is_alive(), "SELECT * from t0 where key = 0")
+    workload_thread.join()
+    record_thread.join()
+
+
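+# Run pgbench's built-in "simple update" scenario: -N skips updates to pgbench_tellers and
+# pgbench_branches, -j10/-c10 use 10 worker threads and 10 client connections, -T runs for
+# `duration` seconds, and -Mprepared uses prepared statements. With synchronous_commit=off,
+# commits return without waiting for the WAL to be flushed.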
+def start_pgbench_simple_update_workload(env: PgCompare, duration: int):
+    with env.record_duration("run_duration"):
+        env.pg_bin.run_capture([
+            'pgbench',
+            '-j10',
+            '-c10',
+            '-N',
+            f'-T{duration}',
+            '-Mprepared',
+            env.pg.connstr(options="-csynchronous_commit=off")
+        ])
+        env.flush()
+
+
+@pytest.mark.parametrize("scale", get_scales_matrix(100))
+@pytest.mark.parametrize("duration", get_durations_matrix())
+def test_pgbench_simple_update_workload(pg_compare: PgCompare, scale: int, duration: int):
+    env = pg_compare
+
+    # Initialize pgbench tables
+    env.pg_bin.run_capture(['pgbench', f'-s{scale}', '-i', env.pg.connstr()])
+    env.flush()
+
+    workload_thread = threading.Thread(target=start_pgbench_simple_update_workload,
+                                       args=(env, duration))
+    workload_thread.start()
+
+    record_thread = threading.Thread(target=record_lsn_write_lag,
+                                     args=(env, lambda: workload_thread.is_alive()))
+    record_thread.start()
+
+    record_read_latency(env,
+                        lambda: workload_thread.is_alive(),
+                        "SELECT * from pgbench_accounts where aid = 1")
+    workload_thread.join()
+    record_thread.join()
+
+
+def start_pgbench_intensive_initialization(env: PgCompare, scale: int):
+    with env.record_duration("run_duration"):
+        # Need to increase the statement timeout (default: 120s) because the
+        # initialization step can be slow with a large scale.
+        env.pg_bin.run_capture([
+            'pgbench',
+            f'-s{scale}',
+            '-i',
+            '-Idtg',
+            env.pg.connstr(options='-cstatement_timeout=300s')
+        ])
+
+
+@pytest.mark.parametrize("scale", get_scales_matrix(1000))
+def test_pgbench_intensive_init_workload(pg_compare: PgCompare, scale: int):
+    env = pg_compare
+    with env.pg.connect().cursor() as cur:
+        cur.execute("CREATE TABLE foo as select generate_series(1,100000)")
+
+    workload_thread = threading.Thread(target=start_pgbench_intensive_initialization,
+                                       args=(env, scale))
+    workload_thread.start()
+
+    record_thread = threading.Thread(target=record_lsn_write_lag,
+                                     args=(env, lambda: workload_thread.is_alive()))
+    record_thread.start()
+
+    record_read_latency(env, lambda: workload_thread.is_alive(), "SELECT count(*) from foo")
+    workload_thread.join()
+    record_thread.join()
+
+
+# Periodically sample the WAL write lag: the difference between the LSN the compute has
+# flushed (pg_current_wal_flush_lsn) and the LSN the pageserver has received, as reported
+# by backpressure_lsns() from the neon extension. This is a no-op for non-Neon setups.
+def record_lsn_write_lag(env: PgCompare, run_cond: Callable[[], bool], poll_interval: float = 1.0):
+    if not isinstance(env, NeonCompare):
+        return
+
+    lsn_write_lags = []
+    last_received_lsn = 0
+    last_pg_flush_lsn = 0
+
+    with env.pg.connect().cursor() as cur:
+        cur.execute("CREATE EXTENSION neon")
+
+        while run_cond():
+            cur.execute('''
+            select pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn),
+            pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn)),
+            pg_current_wal_flush_lsn(),
+            received_lsn
+            from backpressure_lsns();
+            ''')
+
+            res = cur.fetchone()
+            lsn_write_lags.append(res[0])
+
+            curr_received_lsn = lsn_from_hex(res[3])
+            lsn_process_speed = (curr_received_lsn - last_received_lsn) / (1024**2)
+            last_received_lsn = curr_received_lsn
+
+            curr_pg_flush_lsn = lsn_from_hex(res[2])
+            lsn_produce_speed = (curr_pg_flush_lsn - last_pg_flush_lsn) / (1024**2)
+            last_pg_flush_lsn = curr_pg_flush_lsn
+
+            log.info(
+                f"received_lsn_lag={res[1]}, pg_flush_lsn={res[2]}, received_lsn={res[3]}, lsn_process_speed={lsn_process_speed:.2f}MB/s, lsn_produce_speed={lsn_produce_speed:.2f}MB/s"
+            )
+
+            time.sleep(poll_interval)
+
+    env.zenbenchmark.record("lsn_write_lag_max",
+                            float(max(lsn_write_lags) / (1024**2)),
+                            "MB",
+                            MetricReport.LOWER_IS_BETTER)
+    env.zenbenchmark.record("lsn_write_lag_avg",
+                            float(statistics.mean(lsn_write_lags) / (1024**2)),
+                            "MB",
+                            MetricReport.LOWER_IS_BETTER)
+    env.zenbenchmark.record("lsn_write_lag_stdev",
+                            float(statistics.stdev(lsn_write_lags) / (1024**2)),
+                            "MB",
+                            MetricReport.LOWER_IS_BETTER)
+
+
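+# Repeatedly run `read_query` (roughly once per `read_interval` seconds) while `run_cond`
+# holds, and record the max/avg/stdev read latency. All callers pass the write workload's
+# liveness as `run_cond`, so the numbers reflect read behavior under concurrent writes.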
+def record_read_latency(env: PgCompare,
+                        run_cond: Callable[[], bool],
+                        read_query: str,
+                        read_interval: float = 1.0):
+    read_latencies = []
+
+    with env.pg.connect().cursor() as cur:
+        while run_cond():
+            try:
+                t1 = timeit.default_timer()
+                cur.execute(read_query)
+                t2 = timeit.default_timer()
+
+                log.info(
+                    f"Executed read query {read_query}, got {cur.fetchall()}, read time {t2-t1:.2f}s"
+                )
+                read_latencies.append(t2 - t1)
+            except Exception as err:
+                log.error(f"Got error when executing the read query: {err}")
+
+            time.sleep(read_interval)
+
+    env.zenbenchmark.record("read_latency_max",
+                            max(read_latencies),
+                            's',
+                            MetricReport.LOWER_IS_BETTER)
+    env.zenbenchmark.record("read_latency_avg",
+                            statistics.mean(read_latencies),
+                            's',
+                            MetricReport.LOWER_IS_BETTER)
+    env.zenbenchmark.record("read_latency_stdev",
+                            statistics.stdev(read_latencies),
+                            's',
+                            MetricReport.LOWER_IS_BETTER)
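
Note (illustration, not part of the patch): the three tests above share the same harness shape, so an additional backpressure scenario could reuse the helpers directly. A minimal sketch, where `start_my_workload` is a hypothetical function driving writes through `env`:

    @pytest.mark.parametrize("scale", get_scales_matrix(10))
    def test_my_backpressure_workload(pg_compare: PgCompare, scale: int):
        env = pg_compare

        # Set up whatever tables the workload needs.
        with env.pg.connect().cursor() as cur:
            cur.execute("CREATE TABLE my_table AS SELECT generate_series(1, 100000) AS id")

        # Drive the writes from one thread...
        workload_thread = threading.Thread(target=start_my_workload, args=(env, scale))
        workload_thread.start()

        # ...sample the LSN write lag from another (a no-op for vanilla Postgres)...
        record_thread = threading.Thread(target=record_lsn_write_lag,
                                         args=(env, lambda: workload_thread.is_alive()))
        record_thread.start()

        # ...and measure read latency on the main thread while the workload runs.
        record_read_latency(env, lambda: workload_thread.is_alive(), "SELECT count(*) FROM my_table")
        workload_thread.join()
        record_thread.join()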